前提:RocketMQ 的部署环境可用
1 依赖包
<!-- Spring Boot starter for RocketMQ; the Java examples below import RocketMQTemplate and the listener annotations from org.apache.rocketmq.spring. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
<!-- RocketMQ client library declared explicitly at version 4.9.3. NOTE(review): presumably pins the client version used by the starter; confirm against the resolved dependency tree. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
2 yml 中的 RocketMQ 配置
rocketmq:
  # Separate multiple name servers with ';'
  name-server: 10.22.15.61:9876;10.22.15.60:9876
  producer:
    # Producer group
    group: cip-group
    # Timeout for sending a message, default 3000
    sendMessageTimeout: 3000
    # Number of retries when a send fails, default 2
    retryTimesWhenSendFailed: 2
    # Number of retries when an asynchronous send fails, default 2
    retryTimesWhenSendAsyncFailed: 2
3 注入 RocketMQTemplate 后即可发送消息
// Inject the RocketMQTemplate bean; every send example in this document goes through it.
@Autowired
private RocketMQTemplate rocketMQTemplate;
4 发送消息例子
package com.bs.it.esi.controller;
import com.alibaba.fastjson.JSON;
import com.bs.it.esi.config.IdWorker;
import com.bs.platform.core.annotations.WhiteList;
import com.bs.platform.core.vo.CommonResult;
import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiOperation;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
/**
 * Demo REST controller that publishes test messages through {@link RocketMQTemplate}.
 *
 * <p>Each endpoint under {@code /rocketmq} demonstrates one sending style: plain synchronous
 * send, transactional send, ordered send and batch send. Send results are printed to standard
 * output.
 */
@RestController
@RequestMapping("/rocketmq")
@Api(tags = "rocket 消息发送测试")
public class RocketMqDemo {

    /** Topic every endpoint publishes to (the plain send appends the tag {@code AA}). */
    private static final String TOPIC = "rocket-topic-7";

    // Injected but not referenced by any endpoint in this class.
    @Autowired
    private IdWorker idWorker;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // Injected but not referenced by any endpoint in this class.
    @Autowired
    private MQAdmin mqAdmin;

    /**
     * Synchronously sends five test messages to {@code rocket-topic-7} with tag {@code AA}.
     *
     * @param id path variable; not used by the current implementation
     * @return a result carrying a fixed success text
     */
    @GetMapping("send/{id}")
    @WhiteList
    @ApiOperation("发送消息")
    public CommonResult send(@PathVariable("id") String id) throws UnsupportedEncodingException {
        // Headers can be attached fluently: MessageBuilder.withPayload(...).setHeader(key, value).
        // Alternatives to the call below: rocketMQTemplate.send(topic, message), or
        // rocketMQTemplate.syncSend(topic, message, 3000) to pass an explicit timeout.
        for (int i = 0; i < 5; i++) {
            SendResult sendResult = rocketMQTemplate.syncSend(
                    TOPIC + ":AA", MessageBuilder.withPayload("mq发送消息测试AA" + i).build());
            System.out.println(sendResult);
        }
        return new CommonResult("发送成功!!");
    }

    /**
     * Sends {@code mes} as a transactional message.
     *
     * <p>A transactional message ties the send to a local (database) operation so that both
     * succeed or both fail; one must never succeed while the other fails.
     *
     * @param mes payload of the message
     * @return a result carrying a fixed completion text
     */
    @GetMapping("sendTransaction")
    @WhiteList
    @ApiOperation("事务消息")
    public CommonResult sendTransaction(@RequestParam(name = "mes") String mes) throws UnsupportedEncodingException {
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
                TOPIC, MessageBuilder.withPayload(mes).build(), null);
        System.out.println("-----" + result);
        return new CommonResult("发送完成");
    }

    /**
     * Sends five messages in order by routing all of them with the same hash key {@code "2"}.
     *
     * @param mes request parameter; not used by the current implementation
     * @return an empty result
     */
    @GetMapping("sendselector")
    @WhiteList
    @ApiOperation("发送顺序消息")
    public CommonResult sendselector(@RequestParam(name = "mes") String mes) {
        // NOTE(review): this replaces the selector on the injected template on every request;
        // consider configuring it once at startup instead.
        rocketMQTemplate.setMessageQueueSelector(RocketMqDemo::selectQueueByHashKey);
        for (int i = 0; i < 5; i++) {
            SendResult sendResult = rocketMQTemplate.syncSendOrderly(
                    TOPIC, MessageBuilder.withPayload("发送的顺序消息" + i).build(), "2");
            System.out.println("sendResult: " + sendResult);
        }
        return new CommonResult();
    }

    /**
     * Builds ten test messages and sends them in batches of at most 300 messages each.
     *
     * @param mes request parameter; not used by the current implementation
     * @return an empty result
     */
    @GetMapping("batchSend")
    @WhiteList
    @ApiOperation("批量的消息")
    public CommonResult batchSend(@RequestParam(name = "mes") String mes) {
        List<org.springframework.messaging.Message<String>> messages = buildBatchMessages(10);
        List<List<org.springframework.messaging.Message<String>>> batches = Lists.partition(messages, 300);
        batches.forEach(batch -> {
            SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, batch, 20000);
            System.out.println("--------------------------" + sendResult);
        });
        return new CommonResult();
    }

    /**
     * Builds ten test messages and sends them as a single batch with a 10000 timeout.
     *
     * @param mes request parameter; not used by the current implementation
     * @return an empty result
     */
    @GetMapping("batchCount")
    @WhiteList
    @ApiOperation("控制消息消费的个数")
    public CommonResult batchCount(@RequestParam(name = "mes") String mes) {
        List<org.springframework.messaging.Message<String>> messages = buildBatchMessages(10);
        SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, messages, 10000);
        System.out.println(sendResult);
        return new CommonResult();
    }

    /** Picks the queue whose index is the hash of {@code arg} reduced modulo the queue count. */
    private static MessageQueue selectQueueByHashKey(List<MessageQueue> mqs, Message msg, Object arg) {
        // floorMod keeps the index in [0, size) even when hashCode() is negative;
        // the plain % operator would yield a negative index and mqs.get would throw.
        int queueIndex = Math.floorMod(arg.toString().hashCode(), mqs.size());
        System.out.println("获取到的队列为:" + queueIndex + " msg "
                + new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        return mqs.get(queueIndex);
    }

    /** Creates {@code count} numbered test messages for the batch endpoints. */
    private static List<org.springframework.messaging.Message<String>> buildBatchMessages(int count) {
        List<org.springframework.messaging.Message<String>> messages = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            messages.add(MessageBuilder.withPayload("批量发送消息的测试。。 :" + i).build());
        }
        return messages;
    }
}
其中,事务消息需要另外实现一个 RocketMQLocalTransactionListener 接口的类:
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
 * Demo local-transaction listener for transactional messages.
 *
 * <p>The outcome of the "local transaction" is derived purely from the length of the message
 * text so that all three transaction states can be exercised from the demo endpoint.
 */
@Component
@RocketMQTransactionListener()
public class MYRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener {

    /**
     * Decides the local transaction state from the payload text length.
     *
     * @param message the message being sent in the transaction
     * @param o the extra argument passed by the sender; not used here
     * @return {@code ROLLBACK} when the text is exactly 8 characters long, {@code COMMIT} when
     *     it is otherwise longer than 5 characters, {@code UNKNOWN} in every other case
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("开始执行本地事务---");
        System.out.println("message: " + message);
        int textLength = payloadText(message).length();
        // The rollback case is tested first: a length of 8 also satisfies "> 5", so testing
        // the commit case first would make rollback unreachable. The previous code compared
        // the length of byte[].toString() (an identity string), which was not deterministic.
        if (textLength == 8) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        if (textLength > 5) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    /**
     * Answers a transaction status check-back; this demo always commits.
     *
     * @param message the message whose transaction state is being checked
     * @return always {@code COMMIT}
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("消息的回查----");
        return RocketMQLocalTransactionState.COMMIT;
    }

    /** Returns the payload as text, decoding {@code byte[]} payloads as UTF-8. */
    private static String payloadText(Message message) {
        Object payload = message.getPayload();
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, java.nio.charset.StandardCharsets.UTF_8);
        }
        return String.valueOf(payload);
    }
}
消费者监听消息:
import com.bs.platform.core.redis.RedisCacheUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Consumer that listens on topic {@code fdc_draw_chart_topic} in consumer group
 * {@code fdc-group} and logs the body of every received message.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "fdc_draw_chart_topic",
        consumerGroup = "fdc-group")
// Consume modes: CONCURRENTLY = concurrent (messages unordered), ORDERLY = single thread (ordered).
// No consumeMode is set here, so the annotation's default applies.
public class RocketConsumer implements RocketMQListener<MessageExt> {

    // Only referenced by the disabled retry demo inside onMessage.
    @Autowired
    private RedisCacheUtils redisCacheUtils;

    /**
     * Logs the message body decoded as UTF-8 text.
     *
     * @param messageExt the received message
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        // Disabled retry demo: fail the first delivery so the message is pulled and consumed again.
        // Object aaa = redisCacheUtils.get("aaa");
        // if (aaa == null) {
        //     redisCacheUtils.set("aaa", "1", 10000);
        //     log.info("processing failed, message will be pulled and consumed again");
        //     throw new RuntimeException("something went wrong");
        // }
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("----------------------------------------------------------------- :{}", body);
    }
}
更加细致的消息发送和监听用法可以参考官网,欢迎交流。