延迟消息
延迟等级
官方默认设置了 18 哥延迟等级
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
发送延迟消息:按照默认顺序 1-18 数字就对应上面的延迟时间
Message msg = new Message (TOPIC, TAG, "OrderID199", "ok", getBytes(StandardCharsets.UTF_8));
//设置延迟等级
msg.setDelayTimeLevel(3);
producer.send(msg);
基本原理
延迟消息都会被存储到 RocketMQ 的一个内部 Topic:SCHEDULE_TOPIC_XXXX 中
SCHEDULE_TOPIC_XXXX 共有 18 个 MessageQueue:
- 对应延迟消息的 18 个等级,根据指定的 DelayTimeLevel 来决定选择哪个 MessageQueue
- 有一个定时任务,每 100 ms 执行一次判断 SCHEDULE_TOPIC_XXXX Topic 中的 MessageQueue 的消息是否到达延迟时间
- 若到达延迟时间,将 SCHEDULE_TOPIC_XXXX 中的消息投递到消息最初需要投递的 Topic 之中
为什么不支持任意时间?
RocketMQ 并不支持任意时间的延迟,可能的主要原因就是如果提供任意时间,就会涉及到消息的排序,会有一定的性能损耗
事务消息
RocketMQ 采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或失败的消息
基本流程
第一阶段:
- 发送 Message,Half Message ,即半事务消息
- 此类型的 Message 是不会被 Consumer 消费的
第二阶段:如果半事务消息投递成功,则会开始执行本地事务
分为如下三种 Case:
- 本地事务执行成功:会为 Broker 发送 commit 消息,被 commit 过后的 Message 才能被 Consumer 消费
- 本地事务执行失败:
- 会为 Broker 发送 rollback 消息,Broker 则会将刚刚投递的半事务消息删除,从而保证上下游数据的一致性
- 如果 Producer 实例或者网络出现问题,Producer 没能及时 de 将本地事务执行的结果通知 Broker,Broker 会通过扫描发现某条 Message 长时间处于半事务状态,Broker 会主动 de 给 Producer 询问此 Message 对应的事务状态
基本设计
采用 2PC 两阶段设计:
将 Message 原本真实的 Topic 和 MessageQueue 进行备份
- 存入到 PROPERTY_REALTOPC、PROPERTY_REAL_QUEUE_ID 中
将消息投递到一个内部 Topic 中 RMQ_SYS_TRANS_HALF_TOPIC,该队列专门存储事务消息
所有的 Half Message 全部都写入到 queueId 为 0 的 MessageQueue,因为一个 Topic 下只有 1 个 MessageQueue:
- 这个 Topic 下的所有 Message 就是全局有序的,ta 们会按照先来后到的顺序被消费
如果本地事务执行成功进行 Commit,则将 RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息投递到真实的 Topic 中,供后续流程执行
- 并删除这条 Half Message,但删除也是假删除,只是给 Message 打上一个删除的 tag
如果本地事务执行失败进行 rollback,则直接删除这条 Half Message,但删除也是假删除
如果本地事务吃吃没有返回结果(默认时间 6s),则会触发事务回查机制
- 执行回查之前需要校验检查次数是否达到了最大值(需要手动设置,没有默认值)
- 或者是当前 Half Message 存在是否超过了 Message 保存的上限,即 3 天
- 如果满足上面条件中的一种 Half Message 会被放进 TRANS_CHECK_MAX_TOPIC Topic 中
- 一旦判定为需要执行事务回查逻辑,那么当前这条 Half Message 就算已经被消费了
- 在没有达到最大的校验次数之前,都还需要将其投递到事务队列当中,以便下次重试时再次执行 Check 逻辑
- 如果回查成功,则删除投递的 Half Message
消息重试
重试时间
消息消费失败后,并不会立即重试,而是一个递增的时间间隔来进行重试的,重试次数默认为 16 次
只比延迟消息的时间间隔等级少了前两个,延迟消息总共有 18 个等级,而消息重试使用了延迟消息的第 3-18 等级
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
基本原理
重试的 Message,RocketMQ 的做法并不是将其投递回原来的 Topic,而是重试队列
每个 ConsumerGroup 都有自己的重试队列:
- 其名称是由特定的前缀拼接上 ConsumerGroup 所组成,默认 %RETR% + 消费者组名称
- 所有在 Consumer 启动时,就会同时消费其 ConsumerGroup 对应的重试队列和普通队列
消费失败的 Message,Consumer 会将其投回 Broker:
- 相当于这条 Message 已经被消费掉了,之后重试的只是内容相同,但实际不是同一个的 Message
- 然后会校验重试的次数,如果达到 16 次,则会进入死信队列,组成为 %DLQ% + 消费者组名称
- 未达到最大重试次数,则会根据重试间隔时间等级将其投递到延迟队列 SCHEDULE_TOPIC_XXXX 中
- 然后等到了延迟等级对应的时间后,在投递到 ConsumerGroup 所对应的重试队列当中,供后续消费
消息存储
整体架构
RocketMQ 的混合性存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog中)
针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构
Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中
核心步骤:
- 首先,生产者根据 topic 发送消息,消息存储在 commitLog中,1 G一个文件,当文件满了,写入下一个文件
- 其次,ReputMessageService 重写消息服务执行 2 个分发操作:
- 创建 ConsumerQueue 逻辑消费队列:
- 参数:commitLogOffset 物理偏移量、msgSize 消息长度、tagsCode tag 哈希
- 创建 IndexFile 索引文件:
- 以创建时的时间戳命名
- 创建 ConsumerQueue 逻辑消费队列:
- 最后,消费者根据 topic、tag 拉取消息消费,根据 key 查询消息
重要文件
commitLog 消息日志:
- 消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容
consumequeue 逻辑消费队列:
- 存储了 commitLog 的起始物理 offset,目的是提高消费的性能
indexFile 索引文件:
- 提供了一种可以通过 key 或者时间区间来查询消息的方法
consumequeue 文件:
consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为:
- 8 字节的 commmitLog 物理偏移量
- 4 字节 的消息长度
- 8 字节 tag hashcode
单个文件由 30w 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M
- 默认一个 topic 对应 4 个 queueId,即 4 个 messageQueue
每个 messageQueue 文件夹下有多个 consumeQueue,所以:messageQueue 1 :N consumeQueue
通信机制
通信架构图
基本通讯流程如下:
- Broker 启动后需要完成一次将自己注册至 NameServer 的操作,随后每隔 30s 时间定时向 NameServer 上报 Topic 路由信息
- 消息生成者 Producer 作为客户端发送消息时,需要根据消息的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息
- 如果没有则更新路由信息会从 NameServer 上重新拉取,同时 Producer 会默认每隔 30s 向 NameServer 拉取一次路由信息
- 消息生产者 Producer 根据获取的路由信息选择一个队列(MessageQueue) 进行消息发送
- Broker 作为消息的接收者接收消息并落盘存储
- 消息消费者 Consumer 根据获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费
为了实现客户端与服务器之间高效的数据请求与接收:
- RocketMQ-Remoting 包自定义了通信协议并在 Netty 的基础之上扩展了通信模块