首页 > 其他分享 >详解RocketMQ 顺序消费机制

详解RocketMQ 顺序消费机制

时间:2023-05-30 15:24:25浏览次数:59  
标签:消费 分区 FIFO 订单 详解 消息 RocketMQ 顺序

摘要:顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。

本文分享自华为云社区《RocketMQ 顺序消费机制》,作者: 勇哥java实战分享 。

顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。

顺序消息分为分区顺序消息和全局顺序消息。

1、分区顺序消息

对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

  • 适用场景:适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
  • 示例:电商的订单创建,以订单 ID 作为 Sharding Key ,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

2、全局顺序消息

对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。

  • 适用场景:适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
  • 示例:在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。

全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。

因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。

消息的顺序需要由两个阶段保证:

  • 消息发送

如上图所示,A1、B1、A2、A3、B2、B3 是订单 A 和订单 B 的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单 A 的消息发送和消费都按照 A1、A2、A3 的顺序。

如果是普通消息,订单A 的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时 RocketMQ 支持将 Sharding Key 相同(例如同一订单号)的消息序路由到一个队列中。

  • 消息消费

消费者消费消息时,需要保证消息消费顺序和存储顺序一致,最终实现消费顺序和发布顺序的一致。

我们知道负载均衡服务是客户端开始消费的起点。在负载均衡阶段,并发消费和顺序消费并没有什么大的差别,最大的差别在于:向 Borker 申请锁 。

消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔20秒会重新尝试。

见上图,顺序消费核心流程如下:

1、 组装成消费对象

2、 将请求对象提交到消费线程池

和并发消费不同的是,这里的消费请求包含消费快照 processQueue ,消息队列 messageQueue 两个对象,并不对消息列表做任何处理。

3、 消费线程内,对消费队列加锁

4、 从消费快照中取得待消费的消息列表

消费快照 processQueue 对象里,创建了一个红黑树对象 consumingMsgOrderlyTreeMap 用于临时存储的待消费的消息。

5、 执行消息监听器

执行监听器逻辑容易理解,消费快照的消费锁 consumeLock的作用是:防止 Rebalance 线程把当前消费的 MessageQueue 对象移除掉。

6、 处理消费结果

消费成功时,首先计算需要提交的偏移量,然后更新本地消费进度。

消费失败时,分两种场景:

  • 假如已消费次数小于最大重试次数,则将放入对象 consumingMsgOrderlyTreeMap 用例临时存储的待消费的消息,重新加入到消费快照红黑树msgTreeMap中,然后使用定时任务尝试重新消费。
  • 假如已消费次数大于等于最大重试次数,则将失败消息发送到 Broker ,Broker 接收到消息后,会加入到死信队列里 , 最后计算需要提交的偏移量,然后更新本地消费进度。

我们做一个关于顺序消费的总结:

  1. 顺序消费需要由两个阶段消息发送消息消费协同配合,底层支撑依靠的是 RocketMQ 的存储模型;
  2. 顺序消费服务启动后,通过三把锁的机制,消息队列 messageQueue 的数据都会被消费者实例单线程的执行消费;
  3. 假如消费者扩容,消费者重启,或者 Broker 宕机 ,顺序消费也会有一定几率较短时间内乱序,所以消费者的业务逻辑还是要保障幂等

 

点击关注,第一时间了解华为云新鲜技术~

标签:消费,分区,FIFO,订单,详解,消息,RocketMQ,顺序
From: https://www.cnblogs.com/huaweiyun/p/17443332.html

相关文章

  • 一文详解 Sa-Token 中的 SaSession 对象
    Sa-Token是一个轻量级java权限认证框架,主要解决登录认证、权限认证、单点登录、OAuth2、微服务网关鉴权等一系列权限相关问题。Gitee开源地址:https://gitee.com/dromara/sa-token本文将详细介绍Sa-Token中的不同SaSession对象的区别,以及各种方便的存取值的方法。一......
  • 基于ZigBee3.0技术的数传电台功能使用详解
    一、ZigBee3.0数传电台功能简介1、4G DTU数传电台LINK灯详解基于zigbee3.0通信技术的4G DTU数传电台LINK灯用于指示模块当前网络状态,设备入网成功后LINK灯常亮,当设备没有网络时LINK灯熄灭;在协调器模式下,该引脚指示zigbee模块是否正常建立网络,协调器和路由器在配网模式下1Hz闪......
  • C++结构体对齐详解
    内存对齐是一种提高内存访问速度的策略,CPU在访问未对齐的内存可能需要经过两次的内存访问,而经过内存对齐一次就可以了cout<<"char:"<<sizeof(char)<<endl;cout<<"int:"<<sizeof(int)<<endl;cout<<"short:"<<sizeof(short)<<endl;cout&l......
  • SpringBoot集成RocketMQ,rocketmq_client.log日志文件配置
    SpringBoot项目集成rocketmq-client<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version></dependency>项目启动时会在${user.home}/logs目录下创建一个roc......
  • 《asyncio 系列》3. 详解 Socket(阻塞、非阻塞),以及和 asyncio 的搭配
    楔子在前面两篇文章中,我们介绍了协程、任务和事件循环,研究了如何同时运行长耗时的操作,并探索了一些可以优化此操作的asyncioAPI。然而,到目前为止,我们只是用asyncio.sleep函数模拟了长时间的操作。由于我们想要构建的不仅是演示应用程序,因此我们将使用一些真实世界的阻塞操作......
  • 剑指 Offer 58 - I. 翻转单词顺序
    题目描述:输入一个英文句子,翻转句子中单词的顺序,但单词内字符的顺序不变。为简单起见,标点符号和普通字母一样处理。例如输入字符串"Iamastudent.",则输出"student.aamI"。  方法:分割+倒序  classSolution{publicStringreverseWords(Strings){......
  • RocketMQ4.9.5集群部署
    RocketMQ集群部署背景:生产环境单机的MQ不具有高可用,所以我们应该部署成集群模式,这里给大家部署一个双主双从异步复制的Broker集群一、单机部署、部署前提参考https://www.cnblogs.com/hsyw/p/17428530.htmlhttps://www.cnblogs.com/hsyw/p/17429834.html二、集群部署......
  • web安全详解(渗透测试基础)
    一、Web基础知识1.http协议超文本传输协议是互联网上应用最广泛的一种网络协议。所有www文件都必须遵守的一个标准,是以ASCII码传输,建立在TCP/IP协议之上的应用层规范,简单点说就是一种固定的通讯规则。2.网络三种架构及特点网络应用程序架构包括三种:客户机/服务器结构(C/S)......
  • Kubernetes GoRoutineMap工具包代码详解
    1、概述GoRoutineMap定义了一种类型,可以运行具有名称的goroutine并跟踪它们的状态。它防止创建具有相同名称的多个goroutine,并且在上一个具有该名称的goroutine完成后的一段退避时间内可能阻止重新创建goroutine。使用GoRoutineMap场景:使用协程的方式运行函数逻辑,如果函......
  • k8s探针详解
    一、探针类型介绍:(1)、K8s中存在三种类型的探针:livenessprobe、readinessprobe和startup探针。每类探针都支持三种探测方法liveness探针:影响的是单个容器,如果检查失败,将杀死容器,根据pod的restartPolicy来操作。readiness探针:影响的是整个pod,即如果pod中有多个容器,只要有一......