首页 > 编程语言 >RocketMq5.0 任意延迟时间 TimerMessageStore 源码解析

RocketMq5.0 任意延迟时间 TimerMessageStore 源码解析

时间:2023-07-06 14:57:45浏览次数:46  
标签:TimerMessageStore 插槽 延迟时间 消息 timerLog timerWheel 数据 源码

TimerMessageStore 简略介绍

  • 延迟队列 rmq_sys_wheel_timer
  • 指定时间的延迟消息。会先投递到 rmq_sys_wheel_timer 队列中
  • 然后由 TimerMessageStore 消费队列数据,将数据消费到 timerWheel 使用时间轮算法,实现秒级任务

TimerMessageStore 操作的文件

  • store\consumequeue\rmq_sys_wheel_timer 从队列中读取消息, 提取数据存到 timerlogtimerwheel
  • store\checkpoint 对应 TimerMessageStore#timerCheckpoint
    • lastReadTimeMs 上次消费的时间节点
    • lastTimerLogFlushPos 最后刷新 log的 pos
    • lastTimerQueueOffset 最后一次消费的队列节点
    • masterTimerQueueOffset 主 Broker 的队列消费节点
  • store\timerwheel 时间轮,内由 Slot 组成 结构如下
    • timeMs 消息到达时间
    • firstPos 开始的 pos
    • lastPos 结束的 pos 在 timerLog 中读取数据, 后面会讲具体逻辑
    • num 消息数量
    • magic no use now, just keep it
  • store\timerlog 对应 TimerMessageStore#timerCheckpoint
    里边也是由多个 mappedFile 组成。
    主要是存储原msg的数据,
    因为从 rmq_sys_wheel_timer 消费了之后,
    会存到 timerwheeltimerlog

TimerMessageStore 启动

  • enqueueGetService.start();
  • enqueuePutService.start();
  • dequeueWarmService.start();
  • dequeueGetService.start();
  • timerFlushService.start();
  • dequeueGetMessageServices[getThreadNum].start();
  • dequeuePutMessageServices[getThreadNum].start();

深入 TimerMessageStore 之 TimerEnqueueGetService

  • TimerMessageStore.this.enqueue 默认 100毫秒执行一次
  • 从 消息队列 rmq_sys_wheel_timer 消费数据 ps: currQueueOffsetcheckpoint 读取出来的
  • 将消费出来的数据, 封装成 TimerRequest 投入到 enqueuePutQueue
  • currQueueOffset + 1 进入下一个循环 消费下一个 offset 节点

深入 TimerMessageStore 之 TimerEnqueuePutService

  • 消费 enqueuePutQueue 中的数据
  • shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs 检查消费的消息是否已到达投递时间。
    • 到达时间。投递到 dequeuePutQueue.put(req);
    • 消息未到达时间 doEnqueue ->
      • timerWheel.getSlot(delayedTime) 获取延迟时间插槽。
      • 构建 ByteBuffer 投入 timerLog 中数据结构为:
      • |消息大小|前一个节点的pos|magic|log写入时间|延迟时间|offsetPy|sizePy|realTopic|0
      • timerLog.append 返回插入位置 ret
      • 构建 timerWheel |消息到达时间戳|firstPos|ret (timerLog.append返回位置)| 消息数量| 0|

深入 TimerMessageStore 之 TimerDequeueGetService

  • 消费 timerWheel 中的数据
  • 根据 currReadTimeMs 来获取 timerWheel 插槽数据
    • currReadTimeMs 初始化的时候 timerCheckpoint.getLastReadTimeMs() 读取的是上次最后消费的数据
    • 假设broker 宕机了一段时间。那么 currReadTimeMs 会按照上一次宕机的时间开始搜寻数据, 这样子宕机消息也不会丢失。会在启动的那段时间被投递出去
    • currReadTimeMsmoveReadTime 方法中会自增
  • timerWheel.getSlot(currReadTimeMs); 读取插槽数据
    • long currOffsetPy = slot.lastPos; 读取插槽属性, 最后一个pos节点
    • timerLog.getWholeBuffer(currOffsetPy) 根据 currOffsetPy 获取 SelectMappedBufferResult
    • timerLogSelectMappedBufferResult 中获取数据。
      • prevPos 上一个节点数据
      • enqueueTime 放入 timerLog 的时间
      • delayedTime 消息到达时间戳
      • offsetPy commitLog的数据位置
      • sizePy commitLog的数据大小
    • 构建 TimerRequest 讲消息投递到 dequeueGetQueue
    • currOffsetPy = prevPos 将位置移动到前一个,进行遍历

深入 TimerMessageStore 之 TimerDequeueGetMessageService

  • 默认有三个 TimerDequeueGetMessageService 实例同时消费 dequeueGetQueue
  • getMessageByCommitOffsetcommitLog 中读取原投递的消息数据
  • 读取 uniqkey 判断不在 deleteList 中的时候 将消息投递到 dequeuePutQueue 中去

深入 TimerMessageStore 之 TimerDequeuePutMessageService

  • 默认有三个 TimerDequeuePutMessageService 实例同时消费 dequeuePutQueue
  • convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic())); 将消息转换成原始的 topic 消息,清除无用属性
  • doPut -> messageStore.putMessage(message) 将消息投递到指定 messageQueue

TimerFlushService

  • timerLog 刷盘
  • timerWheel 刷盘
  • timerCheckpoint 刷盘

TimerMessageStore 初始化加载源码

  • timerLog.load() 加载文件
  • timerMetrics.load 加载文件
  • recover ->
    • recoverAndRevise(lastFlushPos, true) ps: (用于 timerWheltimerLog 的数据保持一致刷新)
      • lastFlushPos 最后一次刷盘的位置, 其实最终是拿到 timerlog -> mappedFile 的第几个文件
      • 遍历这个 mappedFile 的数据
      • timerWheel.reviseSlot 修改插槽数据。 检查这个时间的插槽是否已经有填充数据。
        • 如果有的话,刷新 lastPos (顺序遍历。这里最终还是会是最后一个 lastPos)
        • 如果不存在插槽数据 则插入插槽数据 putSlot
    • reviseQueueOffset(processOffset); 读取 timerLog 最后一个数据, 为了校验最后一个数据是否正常,是否能读取到消息。
    • 确认 currQueueOffset 数据
    • 确认 currReadTimeMs 数据

标签:TimerMessageStore,插槽,延迟时间,消息,timerLog,timerWheel,数据,源码
From: https://www.cnblogs.com/yunlongn/p/17532116.html

相关文章

  • 直播平台源码,默认页面几秒后自动跳转另一页面
    直播平台源码,默认页面几秒后自动跳转另一页面publicclassMainActivityextendsAppCompatActivity{   privatestaticfinallongDELAY=1000;  privateTimerTasktask;   @Override  protectedvoidonCreate(BundlesavedInstanceState){    ......
  • GDAL源码剖析与开发指南 - 李民录 - 2014
    本书适合地理信息系统和遥感等相关专业应用的开发人员阅读参考。本书中大部分的示例代码都是使用C/C++语言编写,有一定C/C++语言基础的读者能够快速上手开发相关应用。目录第1章GDAL简介.................1第2章OGR空间参考.............42第3章OGR库说明........................
  • Red Hat 开始限制 RHEL 源码可得性
    导读RedHat宣布,其企业发行版RHEL(RedHatEnterprise Linux)相关源码现在只能通过CentOS Stream进行公开访问,但付费客户和合作伙伴仍可以通过RedHatCustomerPortal访问源代码。此举将加大社区发行版,如AlmaLinux、RockyLinux以及OracleLinux等提供1:1二进制兼容构建......
  • TensorRT源码编译
    目录1.参考资料2.源码编译2.1.下载TensorRT2.2.TensorRTOSS编译2.2.1.clone指定分支2.2.2.环境依赖2.2.3.编译附1.安装指定版本的cudatoolkit2.安装指定版本的cudnn1.参考资料tensorrt编译https://zhuanlan.zhihu.com/p/346307138tensorrt相关指南https://zhuanlan.zhihu.c......
  • 关于调试gmsh源码过程中产生的gmsh.dll和gmsh.pdb文件无法匹配,进而导致无法载入pdb文
    省流版由于ALL_BUILD会将对应于gmsh.exe的调试文件gmsh.pdb附在对应于gmsh.dll的调试文件gmsh.pdb文件,进而导致gmsh.pdb无法和gmsh.dll文件进行版本匹配,进而导致无法载入,进而导致无法调试gmsh源码;解决办法:将对应于gmsh.exe的gmsh.pdb改为其他任意命名即可;或者仅仅生成gms......
  • 直播源码开发,文字垂直滚动、纵向走马灯
    直播源码开发,文字垂直滚动、纵向走马灯方法一、使用系统控件ViewFlipper方式:布局文件: <ViewFlipper    android:id="@+id/view_flipper"    android:layout_width="300dp"    android:layout_height="35dp"    android:layout_centerInParen......
  • 直播商城源码,加载网页、html文件显示加载进度
    直播商城源码,加载网页、html文件显示加载进度新建加载WebViewActivity新建WebViewActivity加载网页html文件 classWebViewActivity:AppCompatActivity(){     overridefunonCreate(savedInstanceState:Bundle?){    super.onCreate(savedInstanceSta......
  • 我坚定的认为,这个源码肯定是有 BUG 的!
    你好呀,我是歪歪。上周我不是发了《我试图给你分享一种自适应的负载均衡。》这篇文章嘛,里面一种叫做“自适应负载均衡”的负载均衡策略,核心思路就是从多个服务提供者中随机选择两个出来,然后继续选择两者中“负载”最小的那个节点。前几天有读者看了文章后找到我,提出了两个问题。......
  • 语音直播源码知识分享:探索新的沟通方式
     语音直播是一种借助在线平台或应用程序进行实时语音传输的形式,它在互联网发展的背景下逐渐兴起,并受到越来越多人的关注和喜爱。 随着互联网的快速发展和社交媒体的普及,传统文字、图像的沟通方式已经无法满足人们日益增长的交流需求。在这个背景下,语音直播作为一种创新的沟通......
  • 客服系统机器人源码-微信专属个人助理可对接GPT
    我们大家都见过很多微信机器人,对接了GPT以后效果非常好,可以作为微信群里的助理,帮助我们回答各种问题 现在我来分享一下微信机器人的源码,该源码是golang开发,可以实现模拟个人微信桌面版登录,监听到微信的各种消息。下面就是完整代码packagemainimport("fmt""gi......