支撑环境
JDK: java version "1.8.0_391"
应用框架: org.springframework.boot:2.7.17
RocketMQ客户端SDK: rocketmq-spring-boot-starter:2.2.3
生产者
消息提供者
添加依赖
implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.3'
添加配置
application.yml文件中添加Rocket相关配置
rocketmq:
#NameServer服务的ip端口
name-server: 127.0.0.1:9876
#生产者分组
producer:
group-name: tradeTxProducer
向Srping容器注入相关Bean
向Spring容器注入生产者和RocketMQTemplate的Bean对象;RocketMQTemplate用于管理生产者,提供与RocketMQ服务端的连接方法,早期版本的rocketmq-spring-boot-starter不需要自己提供RocketMQTemplate的Bean。
@Configuration
public class RocketMQConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group-name}")
private String producerGroupName;
/**
* 普通消息生产者
* @return
*/
@Bean
public DefaultMQProducer mqProducer() {
DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
producer.setNamesrvAddr(nameServer);
return producer;
}
/**
* RocketMQ普通消息连接组件
* @param mqProducer
* @return
*/
@Bean
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(mqProducer);
return rocketMQTemplate;
}
}
业务代码发送消息
在业务代码中添加RocketMQTemplate,通过RocketMQTemplate发送消息
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/msg/general")
public R<String> sendGeneralMsg(){
R<String> r = R.ok("普通消息推送成功");
String reqNo = TransactionIdGenerator.generateReqTransactionId();
// 创建 RocketMQ 的 短信Message 实例
Message<String> smsMessage = MessageBuilder
.withPayload("您购买的产品已预约成功,请及时打款完成交易。")
.setHeader(RocketMQHeaders.KEYS, "SMS_" + reqNo)
.build();
//发送短信通知消息
SendResult smsSendResult = rocketMQTemplate.syncSend("smsTopic:tradeBuyTag", smsMessage);
if (smsSendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
logger.info("产品预约成功客户提醒消息已经发送");
return r;
}
return R.failure("普通消息发送失败");
}
消费者
RocketMQ中顺序消息需要使用同步线程的消费者消费消息,其它类型的消息的消费者不做特别要求
添加依赖
implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.3'
添加配置
application.yml文件中添加Rocket相关配置
rocketmq:
#NameServer服务的ip端口
name-server: 127.0.0.1:9876
添加消息消费者
使用@RocketMQMessageListener注解,在注解中指定订阅的topic和消费者分组;
实现RocketMQListener接口,指定RocketMQListener
@Component
@RocketMQMessageListener(
topic = "smsTopic",
consumerGroup = "smsConsumerGroup"
)
public class SMSTopicListener implements RocketMQListener<MessageExt> {
private static final Logger logger = LoggerFactory.getLogger(SMSTopicListener.class);
@Override
public void onMessage(MessageExt msgExt) {
// 执行本地事务逻辑,返回事务状态
try {
String bodyStr = new String(msgExt.getBody());
logger.info("购买交易抢占基金份额消息消费开始:{}", bodyStr);
//TODO 消费过程
logger.info("购买交易抢占基金份额消息消费结束");
} catch (Exception e) {
logger.error("交易消息消费失败!", e);
throw e;
}
}
}
标签:RocketMQ,消费,return,rocketMQTemplate,RocketMQTemplate,消息,rocketmq,客户端
From: https://www.cnblogs.com/zly1015/p/17972188