首页 > 其他分享 >RocketMQ笔记(八):顺序消息

RocketMQ笔记(八):顺序消息

时间:2023-05-05 09:13:53浏览次数:47  
标签:消费 indentDemo 笔记 顺序 消息 import RocketMQ

一、什么是顺序消息

  消息有序指的是可以按照消息的发送顺序来消费(FIFO)。

  顺序消息是 RocketMQ 提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

二、顺序消息的特性

  1、消息消费失败或消费超时,会触发服务端重试逻辑,重试消息属于新的消息,原消息的生命周期已结束;

  2、顺序消息消费失败进行消费重试时,为保障消息的顺序性,后续消息不可被消费,必须等待前面的消息消费完成后才能被处理。

  3、顺序消息仅支持使用MessageType为FIFO的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。

三、RocketMQ如何保证消息的顺序

  RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

  RocketMQ在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);RocketMQ在消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。

  全局有序:发送和消费参与的queue只有一个,则是全局有序。

  分区有序:多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

四、RocketMQ保证消息顺序的条件

RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。

1、生产顺序性

  RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。

1、生产者顺序发送的前提

1.1、单一生产者

  消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。

1.2、串行发送

  RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

2、生产者顺序发送

满足上述条件,将顺序消息发送至 RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

  ·相同消息组的消息按照先后顺序被存储在同一个队列。

  ·不同消息组的消息可以混合在同一个队列中,且不保证连续。

  

  如上图所示,消息组1和消息组4的消息混合存储在队列1中,RocketMQ 保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。

2、消费顺序性

  RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。

2.1、投递顺序

  RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递。

  消费者类型为PushConsumer时, RocketMQ 保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由应用程序保证。

2.2、有限重试

  RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

  对于严格保证消费顺序的场景,需要合理设置重试次数,避免消息乱序。

3、生产顺序性和消费顺序性组合

  如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。

  一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。

生产顺序 消费顺序 顺序性效果
设置消息组,保证消息顺序发送。 顺序消费 按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。
设置消息组,保证消息顺序发送。 并发消费 并发消费,尽可能按时间顺序处理。
未设置消息组,消息乱序发送。 顺序消费 按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。
未设置消息组,消息乱序发送。 并发消费 并发消费,尽可能按照时间顺序处理。

五、顺序消息的生命周期

  

1、初始化

  消息被生产者构建并完成初始化,待发送到RocketMQ服务端。

2、待消费

  消息被发送到服务端,等待消费者消费。

3、消费中

  消息被消费者火球,RocketMQ会等待消费者完成消息并提交消费结果,若一定时间后未收到消费者的响应,RocketMQ会对消息进行重试处理。

4、消费提交

  消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已被处理(消费成功/失败)。RocketMQ默认支持保留消息,此时消息数据不会被立即删除,逻辑标记已消费。

在消息保存时间到期或存储空间不足被删除前,消费者可以回溯消息重新消费。

5、消息删除

  RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

六、顺序消息示例demo

  工具类详见:RocketMQ笔记(六):示例代码工具类

1、订单实体

 1 /**
 2  * @Description: 订单信息
 3  */
 4 public class Indent {
 5 
 6     /**
 7      * 订单ID
 8      */
 9     private String orderId;
10 
11     /**
12      * 顺序
13      */
14     private String order;
15 
16     public Indent(String orderId, String order) {
17         this.orderId = orderId;
18         this.order = order;
19     }
20 
21     public String getOrderId() {
22         return orderId;
23     }
24 
25     public void setOrderId(String orderId) {
26         this.orderId = orderId;
27     }
28 
29     public String getOrder() {
30         return order;
31     }
32 
33     public void setOrder(String order) {
34         this.order = order;
35     }
36 }

2、顺序消息生产者

 

 1 import com.snails.rmq.common.RMQConstant;
 2 import com.snails.rmq.utils.ClientUtils;
 3 import com.snails.rmq.utils.DateUtils;
 4 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 5 import org.apache.rocketmq.client.producer.MessageQueueSelector;
 6 import org.apache.rocketmq.client.producer.SendResult;
 7 import org.apache.rocketmq.common.message.Message;
 8 import org.apache.rocketmq.common.message.MessageQueue;
 9 import java.util.ArrayList;
10 import java.util.List;
11 
12 /**
13  * @Description: 发送顺序消息
14  */
15 public class OrderProducer {
16 
17     public static void main(String[] args) throws Exception {
18         // 获取生产者实例
19         DefaultMQProducer producer =
20                 ClientUtils.gainProducerInstance(RMQConstant.ORDER_GROUP, RMQConstant.NAEMSRV_ADDR);
21         // 消息标签
22         String[] tags = new String[]{RMQConstant.MSG_TAG_A, RMQConstant.MSG_TAG_B, RMQConstant.MSG_TAG_C};
23         // 订单列表
24         List<Indent> indentList = OrderProducer.buildIndents();
25 
26         for (int i = 0; i < indentList.size(); i++) {
27             // 获取消息
28             String msgBody = "顺序消息, ".concat(DateUtils.now().concat(":")) + indentList.get(i);
29             Message msg = new Message(RMQConstant.ORDER_TOPIC,
30                     tags[i % tags.length], "RMQKEYS" + i, msgBody.getBytes());
31 
32             // 发送消息
33             SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
34                 @Override
35                 public MessageQueue select(List<MessageQueue> mqList, Message msg, Object obj) {
36                     //根据id选择要发送的消息队列queue
37                     long indentId = (Long) obj;
38                     long index = indentId % mqList.size();
39                     return mqList.get((int) index);
40                 }
41             }, Long.valueOf(indentList.get(i).getOrderId()));
42 
43             // 发送结果打印
44             System.out.printf("发送状态:%s, 消息队列id:%d, 消息内容:%s %n",
45                     sendResult.getSendStatus(),
46                     sendResult.getMessageQueue().getQueueId(),
47                     msgBody);
48         }
49 
50         // 关闭生产实例
51         ClientUtils.shutdownProducer(producer);
52     }
53 
54 
55     /**
56      * 生成模拟订单数据
57      */
58     private static List<Indent> buildIndents() {
59         List<Indent> indentList = new ArrayList<Indent>();
60 
61         Indent indentDemo = new Indent("15103111039", "0");
62         indentList.add(indentDemo);
63 
64         indentDemo = new Indent("15103111065", "1");
65         indentList.add(indentDemo);
66 
67         indentDemo = new Indent("15103111039", "2");
68         indentList.add(indentDemo);
69 
70         indentDemo = new Indent("15103117235", "3");
71         indentList.add(indentDemo);
72 
73         indentDemo = new Indent("15103111065", "4");
74         indentList.add(indentDemo);
75 
76         indentDemo = new Indent("15103117235", "5");
77         indentList.add(indentDemo);
78 
79         indentDemo = new Indent("15103111065", "6");
80         indentList.add(indentDemo);
81 
82         indentDemo = new Indent("15103111039", "7");
83         indentList.add(indentDemo);
84 
85         indentDemo = new Indent("15103117235", "8");
86         indentList.add(indentDemo);
87 
88         indentDemo = new Indent("15103111039", "9");
89         indentList.add(indentDemo);
90 
91         return indentList;
92     }
93 }

 

3、顺序消息消费者

 1 import com.snails.rmq.common.RMQConstant;
 2 import com.snails.rmq.utils.ClientUtils;
 3 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 4 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 5 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 6 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 7 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 8 import org.apache.rocketmq.common.message.MessageExt;
 9 
10 import java.util.List;
11 import java.util.Random;
12 import java.util.concurrent.TimeUnit;
13 
14 /**
15  * SUCCESS 确认消费
16  * SUSPEND_CURRENT_QUEUE_A_MOMENT  稍后消费
17  *
18  * @Description: 消费顺序消息
19  */
20 public class OrderConsumer {
21 
22     public static void main(String[] args) {
23         // 获取消费者实例
24         String subExpression = RMQConstant.MSG_TAG_A.concat(" || ").concat(RMQConstant.MSG_TAG_B).concat(" || ").concat(RMQConstant.MSG_TAG_C);
25         DefaultMQPushConsumer consumer =
26                 ClientUtils.gainConsumerInstance(RMQConstant.ORDER_GROUP, RMQConstant.NAEMSRV_ADDR, RMQConstant.ORDER_TOPIC, subExpression);
27 
28         /**
29          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
30          * 如果非第一次启动,那么按照上次消费的位置继续消费
31          */
32         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
33 
34         // 注册回调实现类来处理从broker拉取回来的消息
35         consumer.registerMessageListener(new MessageListenerOrderly() {
36 
37             Random random = new Random();
38 
39             @Override
40             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
41                 context.setAutoCommit(true);
42                 for (MessageExt msg : msgs) {
43                     // 每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
44                     System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
45                     try {
46                         // TODO 业务逻辑处理
47                         TimeUnit.SECONDS.sleep(random.nextInt(10));
48                     } catch (Exception e) {
49                         e.printStackTrace();
50                     }
51                 }
52 
53                 return ConsumeOrderlyStatus.SUCCESS;
54             }
55         });
56 
57         // 启动消费者实例
58         ClientUtils.startupConsumer(consumer);
59     }
60 }

 

标签:消费,indentDemo,笔记,顺序,消息,import,RocketMQ
From: https://www.cnblogs.com/RunningSnails/p/17373095.html

相关文章

  • RocketMQ笔记(九):延时/定时消息
    一、什么是延时/定时消息定时/延时消息为RocketMQ中提供的一种消息类型。定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递(被消费者消费),......
  • 工厂模式笔记
    参考教程主要参考了抽象工厂模式和工厂模式-简单工厂、工厂方法、抽象工厂解析代码部分要生产的产品packagefun.seolas.factory.simple;publicclassProduct{}/***形状产品*/interfaceShape{voiddraw();}classCircleimplementsShape{@Ov......
  • Django笔记三十五之admin后台界面介绍
    本文首发于公众号:Hunter后端原文链接:Django笔记三十五之admin后台界面介绍这一篇介绍一下Django的后台界面使用。Django自带了一套后台管理界面,可用于我们直接操作数据库数据,本篇笔记目录如下:创建后台账号以及登录操作注册后台显示的数据表列表字段的显示操作字段值......
  • 【动手学深度学习】第十二章笔记:异步计算、数据并行
    为了更好的阅读体验,请点击这里12.1编译器和解释器原书主要关注的是命令式编程(imperativeprogramming)。Python是一种解释性语言,因此没有编译器给代码优化,代码会跑得很慢。12.1.1符号式编程考虑另一种选择符号式编程(symbolicprogramming),即代码通常只在完全定义了过程之后才......
  • ds:顺序表删除重复元素的算法
    算法思想:1.遍历顺序表、移动元素(把未匹配到目标数据的元素前移i-k个位置)intk=0;inti=0;k用来计数,i用来扫描顺序表。当匹配到目标元素时k++,未匹配到目标元素时就i++遍历,并且要将未匹配到的元素前移i-k个位置。2.修改顺序表的length为length-k 例:删除顺序表中值为x的所有......
  • 【C++学习笔记】类的长度
    //空类长度是1由于可以初始化,所以必须有一个长度1class空类{}//一个函数长度是1其实函数不占长度,多个函数,长度还是为1,为了初始化,必须有一个长度。class一个函数{voidTest();}//一个虚函数类由于有一个虚函数表,所以必须长度为4,多个虚函数,也是4class一个虚函数类......
  • NTT笔记
    NTT笔记前言:这个算法是与FFT类似的,本片不会再从头讲起,建议先去补补课《FFT笔记》。本文只会讲一下互相关联的地方与一些不同的地方。建议:在电脑前放好演算纸和笔。注:本篇文章是我这个小蒟弱写的,真正的dalao请看个玩笑便好,不必争论对错(但是欢迎指出文章存在的小错误)。NT......
  • 「学习笔记」可持久化线段树
    可持久化数据结构(Persistentdatastructure)总是可以保留每一个历史版本,并且支持操作的不可变特性(immutable)。主席树全称是可持久化权值线段树,给定\(n\)个整数构成的序列\(a\),将对于指定的闭区间\(\left[l,r\right]\)查询其区间内的第\(k\)小值。可持久化线段......
  • STL源码分析读书笔记
    主要是关于标准库容器的整理空间配置器主要看SGI的实现,有两个空间配置器_malloc_alloc_template<0>__default_alloc_template<...>用户可以选择单独使用第一个分配器,或者一起使用两个分配器。当用户选择使用两个分配器时,编译器会分别将上述两个分配器typedef成malloc_a......
  • 23.4.24前学习笔记
    可通过document.documentElement.scrollTop=0控制返回页面顶部 scrollTo方法 window.scrollTo(x,y)//控制页面移动到哪  页面尺寸事件 window.addEventListener('resize',function(){    //改变屏幕尺寸时发生变化,可代替媒体查询    letw=documen......