Rocketmq 不同的topic要配不同的consumegroup
使用Rocketmq一定要注意,如果项目中要订阅两个topic,一定要保证consumeGroup是两个不同的。
这是因为,Consumer会定期发送心跳,默认是30s一次。心跳会像全部broker发送,心跳包内容包括groupname,topicname1。然后broker端会缓存这个信息,以groupname为key
代码在 ClientManagerProcessor # heartBeat
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) { RemotingCommand response = RemotingCommand.createResponseCommand(null); HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( ctx.channel(), heartbeatData.getClientID(), request.getLanguage(), request.getVersion() ); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( data.getGroupName()); boolean isNotifyConsumerIdsChangedEnable = true; if (null != subscriptionGroupConfig) { isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); int topicSysFlag = 0; if (data.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } String newTopic = MixAll.getRetryTopic(data.getGroupName()); this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( newTopic, subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); } boolean changed = this.brokerController.getConsumerManager().registerConsumer(@1 data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable ); if (changed) { log.info("registerConsumer info changed {} {}", data.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()) ); } } for (ProducerData data : heartbeatData.getProducerDataSet()) { this.brokerController.getProducerManager().registerProducer(data.getGroupName(), clientChannelInfo); } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
代码很长,就不一一分析了。重点是 心跳里带着数据
heartbeatData.getConsumerDataSet()。
public class ConsumerData { private String groupName; private ConsumeType consumeType; private MessageModel messageModel; private ConsumeFromWhere consumeFromWhere; private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>(); private boolean unitMode;
SubscriptionData 是关于topic的相关信息,里面最重要的就是topic
@1处的代码
boolean changed = this.brokerController.getConsumerManager().registerConsumer( data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable );
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); boolean r2 = consumerGroupInfo.updateSubscription(subList);
public boolean updateSubscription(final Set<SubscriptionData> subList) { boolean updated = false; for (SubscriptionData sub : subList) { SubscriptionData old = this.subscriptionTable.get(sub.getTopic()); if (old == null) { SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub); if (null == prev) { updated = true; log.info("subscription changed, add new topic, group: {} {}", this.groupName, sub.toString()); } } else if (sub.getSubVersion() > old.getSubVersion()) { if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) { log.info("subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString() ); } this.subscriptionTable.put(sub.getTopic(), sub); } }
从上面的代码看得出来,如果两个不同的topic,consmer客户端不小心配成了相同的consumeGroup,在broker端的缓存里,每次心跳就有可能覆盖之前的订阅信息。导致某一个consume消费不到自己要订阅的topic了
标签: RocketMq 好文要顶 关注我 收藏该文 微信分享 MaXianZhe
粉丝 - 5 关注 - 5
+加关注 0 0 升级成为会员 « 上一篇: SpringBoot之使用外部的启动类
» 下一篇: Seata AT模式全局锁源码分析
posted on 2021-08-13 16:24 MaXianZhe 阅读(2148) 评论(0) 编辑 收藏 举报
刷新评论刷新页面返回顶部 升级成为园子VIP会员 编辑 预览 88ee990f-bb7c-489a-46fb-08d6d3fea897 自动补全[Ctrl+Enter快捷键提交]
【推荐】融资做与众不同的众包平台,让开发能力成为一种服务【推荐】园子周边第二季:更大的鼠标垫,没有logo的鼠标垫
【推荐】阿里云云市场联合博客园推出开发者商店,欢迎关注
【推荐】会员力量,点亮园子希望,期待您升级成为园子会员 编辑推荐:
· 压榨数据库的真实处理速度
· 深入剖析:如何使用 Pulsar 和 Arthas 高效排查消息队列延迟问题
· 「动画进阶」巧用 CSS/SVG 实现复杂线条光效动画
· 如何阅读 Paper
· 理解前端工程化
阅读排行:
· 压榨数据库的真实处理速度
· 使用 Docker 部署 TaleBook 私人书籍管理系统
· .NET有哪些好用的定时任务调度框架
· C#.Net筑基-基础知识
· 车牌识别控制台 可快速整合二次开发
标签:group,sub,要配,topic,boolean,null,data,Rocketmq From: https://www.cnblogs.com/fanwenyan/p/18183478