首页 > 其他分享 >RocketMQ 必知概念

RocketMQ 必知概念

时间:2024-10-06 20:44:43浏览次数:14  
标签:Topic 队列 必知 Broker 重试 概念 消息 Message RocketMQ

延迟消息

延迟等级

官方默认设置了 18 哥延迟等级

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

发送延迟消息:按照默认顺序 1-18 数字就对应上面的延迟时间

Message msg = new Message (TOPIC, TAG, "OrderID199", "ok", getBytes(StandardCharsets.UTF_8));
//设置延迟等级
msg.setDelayTimeLevel(3);
producer.send(msg);

基本原理

延迟消息都会被存储到 RocketMQ 的一个内部 Topic:SCHEDULE_TOPIC_XXXX 中

SCHEDULE_TOPIC_XXXX 共有 18 个 MessageQueue:

  • 对应延迟消息的 18 个等级,根据指定的 DelayTimeLevel 来决定选择哪个 MessageQueue
  • 有一个定时任务,每 100 ms 执行一次判断 SCHEDULE_TOPIC_XXXX Topic 中的 MessageQueue 的消息是否到达延迟时间
  • 若到达延迟时间,将 SCHEDULE_TOPIC_XXXX 中的消息投递到消息最初需要投递的 Topic 之中

为什么不支持任意时间?

RocketMQ 并不支持任意时间的延迟,可能的主要原因就是如果提供任意时间,就会涉及到消息的排序,会有一定的性能损耗

事务消息

RocketMQ 采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或失败的消息

image-20241006131642263

基本流程

第一阶段:

  • 发送 Message,Half Message ,即半事务消息
  • 此类型的 Message 是不会被 Consumer 消费的

第二阶段:如果半事务消息投递成功,则会开始执行本地事务

分为如下三种 Case:

  • 本地事务执行成功:会为 Broker 发送 commit 消息,被 commit 过后的 Message 才能被 Consumer 消费
  • 本地事务执行失败:
    • 会为 Broker 发送 rollback 消息,Broker 则会将刚刚投递的半事务消息删除,从而保证上下游数据的一致性
    • 如果 Producer 实例或者网络出现问题,Producer 没能及时 de 将本地事务执行的结果通知 Broker,Broker 会通过扫描发现某条 Message 长时间处于半事务状态,Broker 会主动 de 给 Producer 询问此 Message 对应的事务状态

基本设计

采用 2PC 两阶段设计:

image-20241006133353449


将 Message 原本真实的 Topic 和 MessageQueue 进行备份

  • 存入到 PROPERTY_REALTOPC、PROPERTY_REAL_QUEUE_ID 中

将消息投递到一个内部 Topic 中 RMQ_SYS_TRANS_HALF_TOPIC,该队列专门存储事务消息

所有的 Half Message 全部都写入到 queueId 为 0 的 MessageQueue,因为一个 Topic 下只有 1 个 MessageQueue:

  • 这个 Topic 下的所有 Message 就是全局有序的,ta 们会按照先来后到的顺序被消费

如果本地事务执行成功进行 Commit,则将 RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息投递到真实的 Topic 中,供后续流程执行

  • 并删除这条 Half Message,但删除也是假删除,只是给 Message 打上一个删除的 tag

如果本地事务执行失败进行 rollback,则直接删除这条 Half Message,但删除也是假删除

如果本地事务吃吃没有返回结果(默认时间 6s),则会触发事务回查机制

  • 执行回查之前需要校验检查次数是否达到了最大值(需要手动设置,没有默认值)
  • 或者是当前 Half Message 存在是否超过了 Message 保存的上限,即 3 天
  • 如果满足上面条件中的一种 Half Message 会被放进 TRANS_CHECK_MAX_TOPIC Topic 中
  • 一旦判定为需要执行事务回查逻辑,那么当前这条 Half Message 就算已经被消费了
  • 在没有达到最大的校验次数之前,都还需要将其投递到事务队列当中,以便下次重试时再次执行 Check 逻辑
  • 如果回查成功,则删除投递的 Half Message

消息重试

重试时间

消息消费失败后,并不会立即重试,而是一个递增的时间间隔来进行重试的,重试次数默认为 16 次

只比延迟消息的时间间隔等级少了前两个,延迟消息总共有 18 个等级,而消息重试使用了延迟消息的第 3-18 等级

10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

基本原理

重试的 Message,RocketMQ 的做法并不是将其投递回原来的 Topic,而是重试队列

每个 ConsumerGroup 都有自己的重试队列:

  • 其名称是由特定的前缀拼接上 ConsumerGroup 所组成,默认 %RETR% + 消费者组名称
  • 所有在 Consumer 启动时,就会同时消费其 ConsumerGroup 对应的重试队列和普通队列

消费失败的 Message,Consumer 会将其投回 Broker:

  • 相当于这条 Message 已经被消费掉了,之后重试的只是内容相同,但实际不是同一个的 Message
  • 然后会校验重试的次数,如果达到 16 次,则会进入死信队列,组成为 %DLQ% + 消费者组名称
  • 未达到最大重试次数,则会根据重试间隔时间等级将其投递到延迟队列 SCHEDULE_TOPIC_XXXX 中
  • 然后等到了延迟等级对应的时间后,在投递到 ConsumerGroup 所对应的重试队列当中,供后续消费

消息存储

整体架构

RocketMQ 的混合性存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog中)

针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构

Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中

image-20241006191349416


核心步骤:

  1. 首先,生产者根据 topic 发送消息,消息存储在 commitLog中,1 G一个文件,当文件满了,写入下一个文件
  2. 其次,ReputMessageService 重写消息服务执行 2 个分发操作:
    • 创建 ConsumerQueue 逻辑消费队列:
      • 参数:commitLogOffset 物理偏移量、msgSize 消息长度、tagsCode tag 哈希
    • 创建 IndexFile 索引文件:
      • 以创建时的时间戳命名
  3. 最后,消费者根据 topic、tag 拉取消息消费,根据 key 查询消息

重要文件

commitLog 消息日志:

  • 消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容

consumequeue 逻辑消费队列:

  • 存储了 commitLog 的起始物理 offset,目的是提高消费的性能

indexFile 索引文件:

  • 提供了一种可以通过 key 或者时间区间来查询消息的方法

consumequeue 文件:

consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为:

  • 8 字节的 commmitLog 物理偏移量
  • 4 字节 的消息长度
  • 8 字节 tag hashcode

单个文件由 30w 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M

  • 默认一个 topic 对应 4 个 queueId,即 4 个 messageQueue

每个 messageQueue 文件夹下有多个 consumeQueue,所以:messageQueue 1 :N consumeQueue

image-20241006192406854

通信机制

通信架构图

image-20241006192645688

基本通讯流程如下:

  • Broker 启动后需要完成一次将自己注册至 NameServer 的操作,随后每隔 30s 时间定时向 NameServer 上报 Topic 路由信息
  • 消息生成者 Producer 作为客户端发送消息时,需要根据消息的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息
    • 如果没有则更新路由信息会从 NameServer 上重新拉取,同时 Producer 会默认每隔 30s 向 NameServer 拉取一次路由信息
  • 消息生产者 Producer 根据获取的路由信息选择一个队列(MessageQueue) 进行消息发送
    • Broker 作为消息的接收者接收消息并落盘存储
  • 消息消费者 Consumer 根据获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费

为了实现客户端与服务器之间高效的数据请求与接收:

  • RocketMQ-Remoting 包自定义了通信协议并在 Netty 的基础之上扩展了通信模块

标签:Topic,队列,必知,Broker,重试,概念,消息,Message,RocketMQ
From: https://www.cnblogs.com/zhzcc/p/18449383

相关文章

  • 重新定义记忆:语言模型中的概念抹除
    在这个快速发展的AI时代,人们越来越关注如何让机器学习模型具备更好的道德和安全性。尤其是在语言模型(LanguageModels,LMs)方面,如何有效地抹除有害或敏感的概念知识,已成为一个令人瞩目的研究课题。本文将围绕RohitGandikota等人提出的“语言记忆抹除”(ErasureofLanguageM......
  • linux系统相关概念与配置
    一.本地服务器:几个相关软件(一)vmware关键字:虚拟机(搭建虚拟环境)(二)MobaXterm关键字:连接(三)123445![](https://img2024.cnblogs.com/blog/3530901/202410/3530901-20241002141002999-137920567.jpg)![](https://img2024.cnblogs.com/blog/3530901/202410/3530901-20241002141037......
  • Meta:LLM语言概念推理基准Linguini
    ......
  • 网络基础概念
    1.协议的概念1.1什么是协议从应用的角度出发,协议可理解为“规则”,是数据传输和数据的解释的规则。假设,A、B双方欲传输文件。规定:第一次,传输文件名,接收方接收到文件名,应答OK给传输方;第二次,发送文件的尺寸,接收方接收到该数据再次应答一个OK;第三次,传输文件内容。同样,接收方......
  • 掌握RocketMQ消息中间件——基本概念和系统架构篇
    简述RcoketMQ概念:RocketMQ是一个开源的分布式消息中间件,由阿里巴巴开发并贡献给Apache软件基金会。它用于处理高吞吐量、低延迟的消息传递,并广泛应用于现代分布式系统中。1 基本概念1.1消息 (Message)    概念:消息是信息传递的物理载体,生产和消费数据的最小单位,......
  • 【多线程】多线程(1):概念,创建线程
    【多线程的概念】之前写过的所有代码,都只能使用“一个核心”,此时无论如何优化代码,最多只能用一个cpu核心,把这个核心跑满了,其他核心也是空闲着,通过写特殊的代码,把多个cpu核心都能应用起来,此为“并发编程”之前使用的并发模式为“多进程编程”,其在创建/销毁进程时开销较大,一旦......