首页 > 其他分享 >(转载)【RocketMQ 课程笔记】16.实现集群消费模式与广播消费模式

(转载)【RocketMQ 课程笔记】16.实现集群消费模式与广播消费模式

时间:2022-09-28 11:58:33浏览次数:52  
标签:INFO consumemode 21 16 模式 itlaoqi rocketmq com RocketMQ

集群消费模式与广播消费模式

环境准备

生产者CmProducer

生产者是一致的,循环生成10条普通消息投给给Broker,主题为:cm-sample-data ,Tag:test ,Key:n

@Slf4j
public class CmProducer {
    public static void main(String[] args) {
        //DefaultMQProducer用于发送非事务消息
        DefaultMQProducer producer = new DefaultMQProducer("cm-producergroup");
        //注册NameServer地址
        producer.setNamesrvAddr("192.168.31.103:9876");
        //异步发送失败后Producer自动重试2次
        producer.setRetryTimesWhenSendAsyncFailed(2);
        try {
            //启动生产者实例
            producer.start();
            for(Integer i = 0 ; i < 10 ; i++) {
            //消息数据
            String data = "第" + i + "条消息数据";
            //消息主题
            Message message = new Message("cm-sample-data", "test", i.toString(),
            data.getBytes());
            //发送结果
            SendResult result = producer.send(message);
            log.info("Broker响应:" + result);
            }
        }catch (Exception e){
        	e.printStackTrace();
        }finally {
            try {
                //关闭连接
                producer.shutdown();
                log.info("连接已关闭");
            }catch (Exception e){
            	e.printStackTrace();
            }
        }
    }
}

集群模式消费者

代码分析

@Slf4j
public class CmClusterConsumer {
    public static void main(String[] args) throws Exception {
        // 声明并初始化一个 consumer
        // 需要一个 consumer group 名字作为构造方法的参数
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cmcluster-consumer-group");
        // 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
        consumer.setNamesrvAddr("192.168.31.103:9876");
        //设置为集群模式(负载均衡)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
        consumer.subscribe("cm-sample-data", "*");
        // 注册消息监听者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach(msg->{
                    log.info("收到消息:" + new String(msg.getBody()));
                });
                // 返回消费状态
                // CONSUME_SUCCESS 消费成功
                // RECONSUME_LATER 消费失败,需要稍后重新消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 调用 start() 方法启动 consumer
        consumer.start();
        log.info("集群消费者启动成功,正在监听新消息");
    }
}

运行结果

启动1-4个实例:
实例1:

21:54:58.944 [main] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 集群消费者启动成功,正在监听新消息
21:55:08.963 [ConsumeMessageThread_3] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 收到消息:第2条消息数据
21:55:08.979 [ConsumeMessageThread_5] INFO om.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 收到消息:第6条消息数据

实例2:

21:55:01.010 [main] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 集群消费者启动成功,正在监听新消息
21:55:08.949 [ConsumeMessageThread_1] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 收到消息:第0条消息数据
21:55:08.949 [ConsumeMessageThread_3] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 收到消息:第4条消息数据
21:55:08.985 [ConsumeMessageThread_4] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 收到消息:第8条消息数据

实例3:

21:55:02.987 [main] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 集群消费者启动成功,正在监听新消息
21:55:08.965 [ConsumeMessageThread_1] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 收到消息:第1条消息数据
21:55:08.978 [ConsumeMessageThread_2] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 收到消息:第5条消息数据
21:55:08.988 [ConsumeMessageThread_3] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 收到消息:第9条消息数据

实例4:

21:55:04.490 [main] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 集群消费者启动成功,正在监听新消息
21:55:08.978 [ConsumeMessageThread_1] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 收到消息:第3条消息数据
21:55:08.982 [ConsumeMessageThread_2] INFO com.itlaoqi.rocketmq.consumemode.CmClusterConsumer - 收到消息:第7条消息数据  

​ 随机消费 ?

广播模式消费者

源码分析

区别只有setMessageModel方法传入BROADCASTING常量,其他没有任何变化

@Slf4j
public class CmBroadcastConsumer {
    public static void main(String[] args) throws Exception {
        //...其余代码完全一样
        //设置为广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //...其余代码完全一样
    }
}

运行结果

1-4个实例均消费到10条消息,不过不同实例之间获取消息的前后顺序均有差别。

21:59:10.398 [main] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 广播消费者启动成功,正在监听新消息
21:59:16.379 [ConsumeMessageThread_5] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第4条消息数据
21:59:16.380 [ConsumeMessageThread_3] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第2条消息数据
21:59:16.380 [ConsumeMessageThread_1] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第0条消息数据
21:59:16.380 [ConsumeMessageThread_4] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第3条消息数据
21:59:16.380 [ConsumeMessageThread_2] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第1条消息数据
21:59:16.381 [ConsumeMessageThread_6] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第5条消息数据
21:59:16.386 [ConsumeMessageThread_7] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第6条消息数据
21:59:16.389 [ConsumeMessageThread_8] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第7条消息数据
21:59:16.399 [ConsumeMessageThread_9] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第8条消息数据
21:59:16.399 [ConsumeMessageThread_10] INFO com.itlaoqi.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第9条消息数据  

Process finished with exit code -1  

标签:INFO,consumemode,21,16,模式,itlaoqi,rocketmq,com,RocketMQ
From: https://www.cnblogs.com/JamKing/p/16737483.html

相关文章

  • 16 、zedboard之纯PL按键实验
    实验基本目的:实验手册PL的流水灯,基本流程参考手册;逻辑代码解析:`timescale1ns/1ps////Company://Engineer:////CreateDate:03/11/202007:37:12PM//Design......
  • 链接服务器读取Mysql---出现消息 7347,级别 16,状态 1,第 13 行 链接服务器 '****' 的 OL
    可以毫不夸张的说:“网上所有搜索出来的答案,都没有解决我的问题”,我是采用以下的方式处理此异常,借此宝地mark一下  今天使用链接服务器查询Mysql数据库时,出现以下问题......
  • 汇编实验:按15行×16列的表格形式显示ASCII码为10H—100H的所有字符
    上海大学 计算机学院《汇编语言程序设计实验》报告          实验名称:      第三周实验        一、实验任务1.完成资料里的实验任......
  • P1600 [NOIP2016 提高组] 天天爱跑步
    P1600NOIP2016提高组天天爱跑步LCA+桶点击查看代码///*考虑上行的情况(u,v)中u被i看到<=>1.u∈{i的子树} 2.lca(u,v)不属于{i的子树} 3.de......
  • 模板方法设计模式基础知识!
    模板方法设计模式该设计模式解决的问题是:具有固定算法(步骤)的应用。但这些算法步骤,又针对不同的用户(情况)具有不同的实现方式。在该设计模式中,具有两大类方法:模板方法,步......
  • (转载)【RocketMQ 课程笔记】15.消费者概述
    消费者概述几个关键概念消费者组:一个逻辑概念,在使用消费者时需要指定一个组名。一个消费者组可以订阅多个Topic。消费者实例:一个消费者组程序部署了多个进程,每个进程......
  • DevOps:定义汽车软件新开发模式
    当前,全球汽车产业正在经历从传统工业向数字化转型的大变革,智能化、数字化、信息化正在成为汽车电子行业转型发展的必由之路。“软件定义汽车”(SoftwareDefinedVehicles,SD......
  • 如何正确安装VMware16以及配置Centos7
    如何正确安装VMware16以及配置Centos7https://blog.csdn.net/m0_53533877/article/details/123442079?ops_request_misc=&request_id=&biz_id=102&utm_term=vmware16%E6%9......
  • 装饰者模式
    装饰者模式统一接口packagedecoratortypedecoratorinterface{getPrice()uint8getDesc()string}炒饭(被装饰者)packagedecoratortypericestruct......
  • 设计模式之策略模式
    在一个收银系统中,如果普通用户、中级会员、高级会员分别对应着不同的优惠策略,常规编程就要使用一系列的判断语句,判断用户类型,这种情况下就可以使用策略模式。一、概念理解......