@Service @RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}") public class MsgListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener { private org.slf4j.Logger logger = LoggerFactory.getLogger(getClass()); @Override public void onMessage(MessageExt msg) { logger.debug("RECEIVE_MSG_BEGIN: " + msg.toString()); logger.debug(String.format("消费消息,消息ID:%s,消息KEY:%s,消息体:%s ", msg.getMsgId(), msg.getKeys(), new String(msg.getBody()))); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { consumer.setInstanceName("testTopic-tag1"); } }
原因
多消费组实例的场景下,只配置了一个通用的name server配置,导致有些消费组
consumer.setInstanceName("testTopic-tag1");正常连接到b-name server,但是使用默认配置连到了a-name server上面,导致关系错乱,无法正常消费消息
consumer.setInstanceName("testTopic-tag2");也是被错误的连接到了a-name server
在初始化的时候
问题排查思路
RocketMQ中,如果不同消费组消费同一个Topic,理论上每个消费组应该只消费该Topic的消息一次。然而,确实有可能出现某个消费组偶尔消费不到消息的情况,这可能是由以下原因导致的:
-
消费分组不正确:
确保你在创建消费者时指定了正确的消费组名称,并且这个消费组已经订阅了要消费的Topic。 -
消息过滤或Tag匹配问题:
如果消费者使用了消息过滤或者Tag匹配,那么只有满足过滤条件或者Tag匹配的消息才会被消费。检查你的消费代码,确保过滤条件和Tag匹配设置正确。 -
消费线程数不足或阻塞:
如果消费者的消费线程数不足或者消费过程中出现了阻塞,可能会导致部分消息未能及时消费。检查消费者的配置和日志,确保消费线程数足够并且没有异常情况。 -
消息堆积:
如果Broker中的消息积压严重,新产生的消息可能会被延迟投递,从而导致某个消费组暂时消费不到消息。 -
网络问题或 Broker 故障:
网络波动、Broker重启或者其他故障可能会影响消息的正常投递和消费。 -
消费位点问题:
消费者的消费位点(即消费进度)可能存在异常,导致某些消息未被正确消费。检查消费者的消费位点信息,确保其与实际消费进度一致。 -
系统负载过高:
如果系统负载过高,包括CPU、内存或者磁盘I/O等资源紧张,可能会影响RocketMQ的正常运行和消息投递。
为了解决这个问题,你可以按照以下步骤进行排查和处理:
- 确认消费组的配置和订阅关系是否正确。
- 检查消费者的消费代码,特别是消息过滤和Tag匹配的部分。
- 调整消费者的消费线程数和消费参数,以适应实际的负载和性能需求。
- 监控Broker的状态和网络连接,确保其正常运行。
本篇文章如有帮助到您,请给「翎野君」点个赞,感谢您的支持。
首发链接:https://www.cnblogs.com/lingyejun/p/18559591
标签:消费,消费者,topic,Tag,消息,msg,consumer,RocketMQ From: https://www.cnblogs.com/lingyejun/p/18559591