首页 > 其他分享 >Rocketmq 不同的topic要配不同的consumegroup

Rocketmq 不同的topic要配不同的consumegroup

时间:2024-05-10 09:02:11浏览次数:14  
标签:group sub 要配 topic boolean null data Rocketmq

Rocketmq 不同的topic要配不同的consumegroup

  使用Rocketmq一定要注意,如果项目中要订阅两个topic,一定要保证consumeGroup是两个不同的。

  这是因为,Consumer会定期发送心跳,默认是30s一次。心跳会像全部broker发送,心跳包内容包括groupname,topicname1。然后broker端会缓存这个信息,以groupname为key

  代码在 ClientManagerProcessor # heartBeat

复制代码
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            heartbeatData.getClientID(),
            request.getLanguage(),
            request.getVersion()
        );

        for (ConsumerData
                data : heartbeatData.getConsumerDataSet()) {
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                    data.getGroupName());
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = 0;
                if (data.isUnitMode()) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                }
                String newTopic = MixAll.getRetryTopic(data.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    subscriptionGroupConfig.getRetryQueueNums(),
                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }

            boolean changed = this.brokerController.getConsumerManager().registerConsumer(@1
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );

            if (changed) {
                log.info("registerConsumer info changed {} {}",
                    data.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                );
            }
        }

        for (ProducerData data : heartbeatData.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                clientChannelInfo);
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
复制代码

  代码很长,就不一一分析了。重点是 心跳里带着数据 

  heartbeatData.getConsumerDataSet()。

复制代码
public class ConsumerData {
    private String groupName;
    private ConsumeType consumeType;
    private MessageModel messageModel;
    private ConsumeFromWhere consumeFromWhere;
    private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();
    private boolean unitMode;
复制代码

  SubscriptionData 是关于topic的相关信息,里面最重要的就是topic

  @1处的代码

复制代码
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );
复制代码 复制代码
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) {
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }

        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        boolean r2 = consumerGroupInfo.updateSubscription(subList);
复制代码 复制代码
public boolean updateSubscription(final Set<SubscriptionData> subList) {
        boolean updated = false;

        for (SubscriptionData sub : subList) {
            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
            if (old == null) {
                SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
                if (null == prev) {
                    updated = true;
                    log.info("subscription changed, add new topic, group: {} {}",
                        this.groupName,
                        sub.toString());
                }
            } else if (sub.getSubVersion() > old.getSubVersion()) {
                if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                    log.info("subscription changed, group: {} OLD: {} NEW: {}",
                        this.groupName,
                        old.toString(),
                        sub.toString()
                    );
                }

                this.subscriptionTable.put(sub.getTopic(), sub);
            }
        }
复制代码

  从上面的代码看得出来,如果两个不同的topic,consmer客户端不小心配成了相同的consumeGroup,在broker端的缓存里,每次心跳就有可能覆盖之前的订阅信息。导致某一个consume消费不到自己要订阅的topic了

 

  

  标签: RocketMq 好文要顶 关注我 收藏该文 微信分享 MaXianZhe
粉丝 - 5 关注 - 5
    +加关注 0 0     升级成为会员   « 上一篇: SpringBoot之使用外部的启动类
» 下一篇: Seata AT模式全局锁源码分析

posted on 2021-08-13 16:24  MaXianZhe  阅读(2148)  评论(0)  编辑  收藏  举报

    刷新评论刷新页面返回顶部 升级成为园子VIP会员 编辑 预览   88ee990f-bb7c-489a-46fb-08d6d3fea897     自动补全

不改了 退出 订阅评论 我的博客

 

[Ctrl+Enter快捷键提交]

  【推荐】融资做与众不同的众包平台,让开发能力成为一种服务
【推荐】园子周边第二季:更大的鼠标垫,没有logo的鼠标垫
【推荐】阿里云云市场联合博客园推出开发者商店,欢迎关注
【推荐】会员力量,点亮园子希望,期待您升级成为园子会员   编辑推荐:
· 压榨数据库的真实处理速度
· 深入剖析:如何使用 Pulsar 和 Arthas 高效排查消息队列延迟问题
· 「动画进阶」巧用 CSS/SVG 实现复杂线条光效动画
· 如何阅读 Paper
· 理解前端工程化
阅读排行:
· 压榨数据库的真实处理速度
· 使用 Docker 部署 TaleBook 私人书籍管理系统
· .NET有哪些好用的定时任务调度框架
· C#.Net筑基-基础知识
· 车牌识别控制台 可快速整合二次开发
 

标签:group,sub,要配,topic,boolean,null,data,Rocketmq
From: https://www.cnblogs.com/fanwenyan/p/18183478

相关文章

  • RocketMQ模型和生产实践
    RocketMQ的客户端编程模型相对⽐较固定,基本都有⼀个固定的步骤。掌握这个固定步骤,对于学习其他复杂的消息模型也是很有帮助的。消息⽣产者的固定步骤1.创建消息⽣产者producer,并指定⽣产者组名2.指定Nameserver地址,可以在代码中固定写IP,也可以通过配置项来写,最好是配置项,这样更......
  • RocketMQ 事件驱动:云时代的事件驱动有啥不同?
    前言:从初代开源消息队列崛起,到PC互联网、移动互联网爆发式发展,再到如今IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了30多个年头。目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往......
  • Apache RocketMQ ACL 2.0 全新升级
    作者:徒钟引言RocketMQ作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ现有的AC......
  • redis-nginx-consul-rocketmq-主机巡检脚本
    架构介绍:6台服务器(192.55.11.1192.55.11.2192.55.11.3192.55.11.4192.55.11.5192.55.11.6)192.55.11.1 192.55.11.2只需巡检主机的cpu、内存、磁盘192.55.11.3需巡检主机的cpu、内存、磁盘,Nginx的连接数192.55.11.4 192.55.11.5需巡检主机的cpu、内存、磁盘,consul日志......
  • rocketMQ一
    参考:图灵课堂:https://vip.tulingxueyuan.cn MQ简介MQ:MessageQueue,消息队列。是在互联网中使用非常广泛的一系列服务中间件。这个词可以分两个部分来看,一是Message:消息。消息是在不同进程之间传递的数据。这些进程可以部署在同一台机器上,也可以分布在不同机器上。二是Queue......
  • 如何实现一个简单易用的 RocketMQ SDK
    2018年,做为架构负责人,接到一个架构需求:实现一个简单易用的RocketMQSDK。因为各个团队RocketMQ原生客户端配置起来千奇百怪,有的配置存在风险,各团队负责人都需要一个简洁易用的RocketMQSDK。我立马调研相关开源的方案,当时RocketMQ-Spring项目并没有开源,而阿里云的ONS......
  • 快速入门一篇搞定RocketMq-实现微服务实战落地
    1、RocketMq介绍RocketMQ起源于阿里巴巴,最初是为了解决邮件系统的高可靠性和高性能而设计的。在2016年开源分布式消息中间件,并逐渐成为Apache顶级项目。现在是Apache的一个顶级项目,在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转,性能稳定、高效。官网地址:https://......
  • kafka核心概念Broker、Topic、Partition和Replication
    在Kafka中,Broker、Topic、Partition和Replication是四个核心概念,它们各自扮演了不同的角色并共同协作以确保数据的可靠性、可扩展性和高性能。以下是关于这四个概念的详细解释:Broker(代理)*Broker是Kafka集群中的一个节点,负责存储和转发消息。Kafka集群由多个Broker组成。*Brok......
  • RocketMQ生产者启动源码
    核心代码初始化Default生产者DefaultMQProducerproducer=newDefaultMQProducer(PRODUCER_GROUP);设置NameAddr地址producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();分析newDefaultMQProducer(PRODUCER_GROUP)publicDefaultMQProducer(finalStringp......
  • Docker安装RocketMQ
    https://blog.csdn.net/qq_43600166/article/details/136187969 前提条件需要安装dockerhttps://yeasy.gitbook.io/docker_practice/install/centos NameServer1.拉取容器dockerpullrocketmqinc/rocketmq2.创建NameServer容器创建一个新的容器并指定RocketMQ的镜像......