Add the dependency to pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>
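The starter also needs to know where the name server is, and a producer group must be set before it will create a RocketMQTemplate. A minimal application.properties sketch follows; the address and the group name are assumptions, not values from this post:

```properties
# Assumed address: a name server on the local machine at the default port 9876.
rocketmq.name-server=127.0.0.1:9876
# Assumed name: replace with your own producer group.
rocketmq.producer.group=demo-producer-group
```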
Create the message consumers
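import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON; // assumption: JSON is fastjson's class; adjust if the project uses another library
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// RocketMQConstant and UserChange are the project's own classes.
// Each listener uses its own consumer group, so every group receives a copy of each message.
@Component
@Slf4j
public class MessageConsumerService {

    // Receives the payload deserialized into a UserChange object.
    @Component
    @RocketMQMessageListener(topic = RocketMQConstant.TOPIC, consumerGroup = "consumer-group1")
    public class Consumer1 implements RocketMQListener<UserChange> {
        @Override
        public void onMessage(UserChange message) {
            log.info("收到信息:{}", JSON.toJSONString(message)); // "收到信息" means "message received"
        }
    }

    // Receives the message body as a plain String.
    @Component
    @RocketMQMessageListener(topic = RocketMQConstant.TOPIC, consumerGroup = "consumer-group2")
    public class Consumer2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            log.info("收到信息:{}", message);
        }
    }

    // Receives the raw MessageExt, which also exposes keys, tags and other metadata.
    @Component
    @RocketMQMessageListener(topic = RocketMQConstant.TOPIC, consumerGroup = "consumer-group3")
    public class Consumer3 implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt message) {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            log.info("收到信息:{}", new String(message.getBody(), StandardCharsets.UTF_8));
        }
    }
}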
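The listings refer to RocketMQConstant and UserChange, which this post does not show. The sketch below gives hypothetical stand-ins so the examples can be read end to end: the constant values are invented, and the hand-written builder only imitates the builder() call used when sending (the real class may well use Lombok's @Builder instead). The no-argument constructor and setters are included on the assumption that the consumer side rebuilds the object from JSON.

```java
// Hypothetical stand-in: the topic and tag values are assumptions.
final class RocketMQConstant {
    static final String TOPIC = "user-change-topic";
    static final String TAGS = "password";

    private RocketMQConstant() {
    }
}

// Hypothetical stand-in for the payload type; declare it public in its own file in a real project.
class UserChange {
    private String userName;
    private String remark;

    // No-argument constructor so a JSON library can instantiate the class.
    public UserChange() {
    }

    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }
    public String getRemark() { return remark; }
    public void setRemark(String remark) { this.remark = remark; }

    static Builder builder() {
        return new Builder();
    }

    // Minimal fluent builder: collects the fields, then creates a new object in build().
    static final class Builder {
        private String userName;
        private String remark;

        Builder userName(String userName) { this.userName = userName; return this; }
        Builder remark(String remark) { this.remark = remark; return this; }

        UserChange build() {
            UserChange change = new UserChange();
            change.setUserName(userName);
            change.setRemark(remark);
            return change;
        }
    }
}
```

With these in place, UserChange.builder().userName("张三").remark("密码变更").build() yields an object whose getters return the two values passed in.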
Send a message
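import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {
    /**
     * The destination has two parts, topic and tags, in the form "topic:tags"; the tags part is optional.
     */
    private final String destination = RocketMQConstant.TOPIC + ":" + RocketMQConstant.TAGS;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a message synchronously.
     *
     * @return the send result reported by the broker
     */
    @GetMapping("send")
    public SendResult send() {
        // "张三" is a sample user name; "密码变更" means "password changed".
        UserChange change = UserChange.builder().userName("张三").remark("密码变更").build();
        Message<UserChange> message = MessageBuilder.withPayload(change)
                .setHeader(RocketMQHeaders.KEYS, "key")
                .build();
        return rocketMQTemplate.syncSend(destination, message);
    }
}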
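The "topic:tags" destination format can be wrapped in a small helper so that the colon is added only when tags are present. This is a sketch in plain Java; Destinations and its of method are hypothetical names and are not part of rocketmq-spring.

```java
// Hypothetical helper for building a destination string in the "topic:tags" form.
final class Destinations {
    private Destinations() {
    }

    // Returns "topic:tags", or just "topic" when no tags are given.
    static String of(String topic, String tags) {
        if (topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("topic is required");
        }
        return (tags == null || tags.isEmpty()) ? topic : topic + ":" + tags;
    }
}
```

The controller's field could then be written as Destinations.of(RocketMQConstant.TOPIC, RocketMQConstant.TAGS), and a topic-only destination as Destinations.of(RocketMQConstant.TOPIC, null).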
Log output (收到信息 means "message received"; all three consumer groups receive the message)
2023-05-06 19:45:15.042 [ConsumeMessageThread_consumer-group2_1] INFO com.zhi.demo.consumer.MessageConsumerService$Consumer2.onMessage 38 -- 收到信息:{"userName":"张三","remark":"密码变更"}
2023-05-06 19:45:15.042 [ConsumeMessageThread_consumer-group3_1] INFO com.zhi.demo.consumer.MessageConsumerService$Consumer3.onMessage 47 -- 收到信息:{"userName":"张三","remark":"密码变更"}
2023-05-06 19:45:15.080 [ConsumeMessageThread_consumer-group1_1] INFO com.zhi.demo.consumer.MessageConsumerService$Consumer1.onMessage 29 -- 收到信息:{"remark":"密码变更","userName":"张三"}
Common errors:
1. connect to 172.17.183.41:10911 failed
The firewall on the broker host must allow port 10911, the broker's default listen port:
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --reload
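2. sendDefaultImpl call timeout
The message send timed out. Increase the rocketmq.producer.send-message-timeout property; it is specified in milliseconds and defaults to 3000 (3 seconds).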
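For the timeout in item 2, the property can be set in application.properties. The 10-second value below is only an assumed example; pick a value that suits your network:

```properties
# Assumed value: allow up to 10 seconds per send (the unit is milliseconds).
rocketmq.producer.send-message-timeout=10000
```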
From: https://www.cnblogs.com/zhi-leaf/p/17378288.html