首页 > 其他分享 >RocketMQ 顺序消费机制

RocketMQ 顺序消费机制

时间:2023-05-29 17:58:02浏览次数:36  
标签:顺序 队列 分区 订单 消费 消息 机制 RocketMQ

顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。

顺序消息分为分区顺序消息全局顺序消息

1、分区顺序消息

对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

  • 适用场景:适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
  • 示例:电商的订单创建,以订单 ID 作为 Sharding Key ,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

2、全局顺序消息

对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。

  • 适用场景:适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
  • 示例:在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。

全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同

因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高

消息的顺序需要由两个阶段保证:

  • 消息发送

    如上图所示,A1、B1、A2、A3、B2、B3 是订单 A 和订单 B 的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单 A 的消息发送和消费都按照 A1、A2、A3 的顺序。

    如果是普通消息,订单A 的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时 RocketMQ 支持将 Sharding Key 相同(例如同一订单号)的消息序路由到一个队列中。

    生产者顺序发送消息封装

  • 消息消费

    消费者消费消息时,使用单线程消费重平衡已分配的消息队列,保证消息消费顺序和存储顺序一致,最终实现消费顺序和发布顺序的一致。

我们知道负载均衡服务是客户端开始消费的起点。在负载均衡阶段,并发消费和顺序消费并没有什么大的差别,最大的差别在于:向 Borker 申请锁

消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔20秒会重新尝试。

s

见上图,顺序消费核心流程如下:

1、 组装成消费对象

2、 将请求对象提交到消费线程池

和并发消费不同的是,这里的消费请求包含消费快照 processQueue ,消息队列 messageQueue 两个对象,并不对消息列表做任何处理。

3、 消费线程内,对消费队列加锁

4、 从消费快照中取得待消费的消息列表

消费快照 processQueue 对象里,创建了一个红黑树对象 consumingMsgOrderlyTreeMap 用于临时存储的待消费的消息。

5、 执行消息监听器

执行监听器逻辑容易理解,消费快照的消费锁 consumeLock的作用是:防止 Rebalance 线程把当前消费的 MessageQueue 对象移除掉。

6、 处理消费结果

消费成功时,首先获取需要提交的偏移量,然后更新本地消费进度。

消费失败时,分两种场景:

  • 假如已消费次数小于最大重试次数,则将放入对象 consumingMsgOrderlyTreeMap 用例临时存储的待消费的消息,重新加入到消费快照红黑树 msgTreeMap中,然后使用定时任务尝试重新消费。
  • 假如已消费次数大于等于最大重试次数,则将失败消息发送到 Broker ,Broker 接收到消息后,会加入到死信队列里 , 获取需要提交的偏移量,然后更新本地消费进度。

我们做一个关于顺序消费的总结:

  1. 顺序消费需要由两个阶段消息发送消息消费协同配合,底层支撑依靠的是 RocketMQ 的存储模型;
  2. 顺序消费服务启动后,通过三把锁的机制,使得消费者实例单线程的消费重平衡分配的消费队列;
  3. 假如发生扩容,消费者重启,或者 Broker 宕机 ,顺序消费也会有一定几率较短时间内乱序,所以消费者的业务逻辑还是要保障幂等

标签:顺序,队列,分区,订单,消费,消息,机制,RocketMQ
From: https://www.cnblogs.com/makemylife/p/17441210.html

相关文章

  • 转:全面了解MSSQL锁机制以及应用
    转自:https://juejin.cn/post/68449038447243427981.锁概念及锁应用1.1锁的概念当用户并发对数据库进行操作时会带来数据不一致的问题,例如:更新丢失(两个用户读同一个数据并进行修改,一个用户破坏了另一个用户的修改结果)脏读(读出尚未提交事务的数据,产生了脏读)不可重复读(用户......
  • 转:SqlServer锁机制与实践
    转自:https://www.cnblogs.com/wangweitr/p/7158023.html在如今这个云计算,大数据,移动互联网大行其道的时代,各种NoSQL数据库MongoDb、redis、HBase等使用的越来越广泛,大有替代关系型数据库的趋势。但是关系型数据库真的已经落伍了吗?答案是否定的。非关系型数据库不支持ACID属性,不支......
  • 内存管理机制
    Python使用自动内存管理机制,具体来说是使用垃圾回收(GarbageCollection)来管理内存。Python中的垃圾回收器负责跟踪不再使用的对象,并在适当的时候释放它们所占用的内存。Python的内存管理机制主要基于引用计数(ReferenceCounting)和循环垃圾收集(CycleGarbageCollection)。引用......
  • Pytest - Fixture(6) - 作用域混用/执行顺序/依赖关系
    Pytest-Fixture作用域混用若测试用例调用多个不同级别的作用域,都会同时生效:conftest.pyimportpytest"""会话级别fixture,作用域当前目录"""@pytest.fixture(scope="session")deflogin_session():"""作用于整个py文件"""pr......
  • 虚函数、纯虚函数、多态与虚表机制详解
    虚函数在类的定义中,前面有virtual关键字的成员函数就是虚函数注:派生类中的成员函数与基类中虚函数同名且参数相同的函数,不加virtual也会自动变成虚函数纯虚函数与抽象类没有函数体的虚函数叫做纯虚函数,包含纯虚函数的类叫抽象类。 例如上面Base中的Examp就是一个纯虚函......
  • 新建T1,T2,T3线程,如何保证它们执行的顺序性
    在多线程中有多种方法让线程按特定顺序执行,可以用线程类的join()方法在一个线程中启动另一个线程,另外一个线程完成该线程继续执行。  ......
  • 几种同步互斥机制的异同
    同步和互斥的区别同步某些进程为完成同一任务需要分工协作,由于合作的每一个进程都是独立地以不可预知的速度推进,这就需要相互协作的进程在某些协调点上协调各自的工作。当合作进程中的一个到达协调点后,在尚未得到其伙伴进程发来的消息或信号之前应阻塞自己,直到其他合作进程发来......
  • 深入理解浏览器的缓存机制
    一、前言缓存可以说是性能优化中简单高效的一种优化方式了。一个优秀的缓存策略可以缩短网页请求资源的距离,减少延迟,并且由于缓存文件可以重复利用,还可以减少带宽,降低网络负荷。对于一个数据请求来说,可以分为发起网络请求、后端处理、浏览器响应三个步骤。浏览器缓存可以帮助我们在......
  • 【基于容器的部署、扩展和管理】3.5 高可用性和故障恢复机制
    3.5高可用性和故障恢复机制云原生的高可用性是指在云原生环境中,通过自动化工具和技术手段,实现软件发布的高可用性机制。其主要思想是通过自动化部署、自动化监控、自动化修复等手段,提高软件系统的可用性和稳定性,从而减少系统故障和停机时间。故障恢复机制是指在云原生环境中,当系......
  • Task机制
    来源:[.NET]Thread与Task的区别-大杂草-博客园(cnblogs.com)(12条消息)C#多线程七任务Task的简单理解与运用一_c#task_一梭键盘任平生的博客-CSDN博客以下几张图片能够清晰看出task运行大概原理Thread  Task     ThreadPool的运行原理 ​ Tas......