首页 > 其他分享 >RocketMQ笔记(十一):消息存储删除机制

RocketMQ笔记(十一):消息存储删除机制

时间:2023-05-05 09:35:37浏览次数:41  
标签:文件 存储 删除 笔记 消息 ConsumeQueue CommitLog RocketMQ

  RocketMQ的消息采用文件进行持久化存储。

1、存储目录详情

  RocketMQ中默认文件存储位置/root/store,文件详情如下

 0

  commitLog:消息存储目录

  config:运行期间一些配置信息

  consumerqueue:消息消费队列存储目录

  index:消息索引文件存储目录

  checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。

2、消息存储结构

  RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

 CommitLog:存储消息的元数据。

 ConsumerQueue:存储消息在CommitLog的索引。

 IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程。

2.1、物理存储文件 - CommitLog

  CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享。在CommitLog 中,一个消息的存储长度是不固定的, RocketMQ采取一些机制,尽量向CommitLog 中顺序写 ,但是随机读。commitlog 文件默认大小为lG ,可通过在 broker 置文件中设置 mappedFileSizeCommitLog属性来改变默认大小。

  Commitlog文件存储的逻辑视图如下,每条消息的前面4个字节存储该条消息的总长度。但是一个消息的存储长度是不固定的。

 0

每个 CommitLog 文件的大小为 1G,一般情况下第一个 CommitLog 的起始偏移量为 0,第二个 CommitLog 的起始偏移量为 1073741824 (1G = 1073741824byte)。

 0

  每台Rocket只会往一个commitlog文件中写,写完一个接着写下一个。

  indexFile 和 ComsumerQueue 中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个 CommitLog 文件上。

2.2、消息逻辑队列 - ConsumeQueue

  ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。

consmequeue文件夹详情:

 0

 0

  ConsumeQueue中存储的是消息条目,为了加速 ConsumeQueue 消息条目的检索速度与节省磁盘空间,每一个 Consumequeue条目不会存储消息的全量信息,消息条目如下:

 0

  ConsumeQueue 即为Commitlog 文件的索引文件, 其构建机制是 当消息到达 Commitlog 文件后 由专门的线程产生消息转发任务,从而构建消息消费队列文件(ConsumeQueue )与下文提到的索引文件。

存储机制这样设计有以下几个好处:

1、CommitLog 顺序写 ,可以大大提高写入效率

  顺序写的速度远超随机写的速度。

2、页缓存机制

  RocketMq读取消息是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁盘读取,作为 cache 存到内存中,加速后续的读取速度。

  为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构 ,因为ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。

  为了保证 CommitLog和ConsumeQueue 的一致性, CommitLog 里存储了 Consume Queues 、Message Key、 Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 commitLog 完全恢复出来。

2.3、索引文件 - indexFie

  RocketMQ支持通过MessageID或者MessageKey来查询消息;使用ID查询时,因ID就是用broker+offset生成的(这里msgId指的是服务端的),所以很容易就找到对应的commitLog文件来读取消息。对于用MessageKey来查询消息,RocketMQ则通过构建一个index来提高读取速度。

  index 存的是索引文件,这个文件用来加快消息查询的速度。消息消费队列 RocketMQ 专门为消息订阅构建的索引文件 ,提高根据主题与消息检索消息的速度 ,使用Hash索引机制,具体是Hash槽与Hash冲突的链表结构。

 0

2.4、配置信息 - Config

  config 文件夹中 存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。

 0

  topics.json :topic 配置属性

  subscriptionGroup.json:消息消费组配置信息。

  delayOffset.json:延时消息队列拉取进度。

  consumerOffset.json:集群消费模式消息消进度。

  consumerFilter.json:主题消息过滤信息。

2.5、文件检测点 - checkpoint

  checkpoint :文件检测点,存储 commitlog 文件最后一次刷盘时间戳、 consumequeue最后一次刷盘时间、 index 索引文件最后一次刷盘时间戳。

3、消息删除机制

  由于 RocketMQ 操作 CommitLog,ConsumeQueue文件是基于内存映射机制并在启动的时候会加载 commitlog,ConsumeQueue 目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。

  删除过程分别执行清理消息存储文件( Commitlog )与消息消费 队列文件( ConsumeQueue 文件), 消息消费队列文件与消息存储文件( Commitlog )共用一套过期文件机制。

  RocketMQ 清除过期文件的方法是 :如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除, RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为 42小时(不同版本的默认值不同,这里以4.4.0为例) ,通过在 Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。

  触发文件清除操作的是一个定时任务,而且只有定时任务,文件过期删除定时任务的周期由该删除决定,默认每10s执行一次。

3.1、过期判断

  文件删除主要是由这个配置属性:fileReservedTime:文件保留时间。也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件, 可以删除。

另外还有其他两个配置参数:

  deletePhysicFilesInterval:删除物理文件的时间间隔(默认是100MS),在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,因此删除一个文件后需要间隔deletePhysicFilesInterval这个时间再删除另外一个文件,由于删除文件是一个非常耗费IO的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。

  destroyMapedFileIntervalForcibly:在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳 destroyMapedFileIntervalForcibly这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少1000,直到引用 小于等于 0为止,即可删除该文件。

3.2、删除条件

3.2.1、指定删除文件的时间点

  RocketMQ 通过 deleteWhen 设置一天的固定时间执行一次。删除过期文件操作, 默认为凌晨4点。

3.2.2、磁盘空间是否充足

  若磁盘空间不充足(DiskSpaceCleanForciblyRatio。磁盘空间强制删除文件水位。默认是85),会触发过期文件删除操作。

3.2.3、RocketMQ的磁盘配置参数

  1、物理使用率大于diskSpaceWarningLevelRatio(默认90%可通过参数设置),则会阻止新消息的插入。

  2、物理磁盘使用率小于diskMaxUsedSpaceRatio(默认75%) 表示磁盘使用正常。

 

标签:文件,存储,删除,笔记,消息,ConsumeQueue,CommitLog,RocketMQ
From: https://www.cnblogs.com/RunningSnails/p/17373140.html

相关文章

  • OpenResty学习笔记03:深入体验WAF
    一.WAF概况  二.Lua介绍  三.文件说明  四.引用关系  五.测试&体验  六.本篇总结  ......
  • RocketMQ笔记(十):事务消息
    事务消息官网:RocketMQ官网-事务消息。一、什么是事务消息事务消息是RocketMQ提供的一种消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。二、事务消息的原理2.1、事务消息的生命周期2.1.1、初始化半事务消息被生产者构建并完成初始化,待发......
  • RocketMQ笔记(八):顺序消息
    一、什么是顺序消息消息有序指的是可以按照消息的发送顺序来消费(FIFO)。顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消息的顺序关系通过消......
  • RocketMQ笔记(九):延时/定时消息
    一、什么是延时/定时消息定时/延时消息为RocketMQ中提供的一种消息类型。定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递(被消费者消费),......
  • 工厂模式笔记
    参考教程主要参考了抽象工厂模式和工厂模式-简单工厂、工厂方法、抽象工厂解析代码部分要生产的产品packagefun.seolas.factory.simple;publicclassProduct{}/***形状产品*/interfaceShape{voiddraw();}classCircleimplementsShape{@Ov......
  • Django笔记三十五之admin后台界面介绍
    本文首发于公众号:Hunter后端原文链接:Django笔记三十五之admin后台界面介绍这一篇介绍一下Django的后台界面使用。Django自带了一套后台管理界面,可用于我们直接操作数据库数据,本篇笔记目录如下:创建后台账号以及登录操作注册后台显示的数据表列表字段的显示操作字段值......
  • 【动手学深度学习】第十二章笔记:异步计算、数据并行
    为了更好的阅读体验,请点击这里12.1编译器和解释器原书主要关注的是命令式编程(imperativeprogramming)。Python是一种解释性语言,因此没有编译器给代码优化,代码会跑得很慢。12.1.1符号式编程考虑另一种选择符号式编程(symbolicprogramming),即代码通常只在完全定义了过程之后才......
  • 【C++学习笔记】类的长度
    //空类长度是1由于可以初始化,所以必须有一个长度1class空类{}//一个函数长度是1其实函数不占长度,多个函数,长度还是为1,为了初始化,必须有一个长度。class一个函数{voidTest();}//一个虚函数类由于有一个虚函数表,所以必须长度为4,多个虚函数,也是4class一个虚函数类......
  • 再谈USB存储设备的使用痕迹
    近来有小伙伴在看了我先前的文章-<如何检视USB存储设备的使用记录>,如下所示:https://www.cnblogs.com/pieces0310/p/15943567.html 仍然想要进一步知道如何具体操作,因此,我就再次进行说明好了~ 首先,关于你提到的问题,其实操作系统的机制本就不在为操作行为留下记录,说穿了......
  • NTT笔记
    NTT笔记前言:这个算法是与FFT类似的,本片不会再从头讲起,建议先去补补课《FFT笔记》。本文只会讲一下互相关联的地方与一些不同的地方。建议:在电脑前放好演算纸和笔。注:本篇文章是我这个小蒟弱写的,真正的dalao请看个玩笑便好,不必争论对错(但是欢迎指出文章存在的小错误)。NT......