首页 > 其他分享 >3分钟白话RocketMQ系列—— 如何存储消息

3分钟白话RocketMQ系列—— 如何存储消息

时间:2023-11-13 12:02:29浏览次数:38  
标签:存储 白话 写入 消息 磁盘 RocketMQ 刷盘


白话3分钟,快速了解RocketMQ如何存储消息。

看完如果不了解,欢迎来打我。

我们知道RocketMQ主要分为消息 生产、存储(消息堆积)、消费 三大块领域。

那接下来,我们白话一下,RocketMQ是如何存储消息的,揭秘消息存储全过程。

注意,如果白话中不小心提到相关代码配置与类名,请参考RocketMQ 4.9.4版本



关键字摘要

  • 存储模型与存储类型
  • 如何保证存储消息不丢失
  • 如何提高写入性能
  • 如何清理过期消息


存储模型是什么?有哪些存储类型?

RocketMQ使用了一种基于日志的存储方式,将消息以顺序写入的方式追加到文件中,从而实现高性能的消息存储和读取。

RocketMQ的消息存储方式可以分为两个类型:CommitLogConsumeQueue



还有一个文件类型是indexfile,主要用于控制台消息检索,不影响消息的写入与消费,我们就不展开了。



CommitLog

CommitLog文件存储了Producer端写入的消息主体内容,它以追加写入的方式将消息存储到磁盘上的文件中。

单个文件大小默认1G ,文件名长度为20位(左边补零,剩余为起始偏移量),当文件满了,写入下一个文件。

比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。

它的主要特点是:顺序写,但是随机读(被ConsumeQueue读取)。

虽然是随机读,但是利用package机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。

Broker单个实例下所有的队列共用一个日志数据文件CommitLog来存储。而Kafka采用的是独立型的存储结构,每个队列一个文件。



ConsumeQueue

ConsumeQueue文件是用于支持消息消费的存储结构。保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。消费者 通过 顺序读取 ConsumeQueue文件,可以快速定位到消息在CommitLog中的物理存储位置,从而实现快速消息的拉取和消费。从实际物理存储的角度来看,每个主题Topic下的每个队列Queue对应一个ConsumeQueue文件。

生产者端的消息是顺序写入CommitLog,消费者端是顺序读取ConsumeQueue。
但是根据ConsumeQueue的起始物理位置偏移量offset读取消息真实内容,实际是随机读取CommitLog。
实现了 消息生产与消息消费数据存储和数据索引 相互分离。



怎么保证存储消息不丢失?



刷盘机制

Broker在把消息写入日志文件的过程中,如果在刚收到消息时,Broker异常宕机了,那么内存中尚未写入磁盘的消息就会丢失了。

因此,RocketMQ持久化消息分为两种:同步刷盘和异步刷盘(默认配置)。

异步刷盘是指Broker收到消息后先存储到PageCache,然后立即通知Producer消息已存储成功,可以继续处理业务逻辑。

此后,Broker会启动一个异步线程将消息持久化到磁盘。然而,如果Broker在持久化到磁盘之前发生故障,消息将会丢失。

## 刷盘策略配置
flushDiskType = ASYNC_FLUSH

注意,写入PageCache后,应用服务宕机消息不丢失,只有机器断电或宕机会有少量消息丢失。

相比之下,同步刷盘的方式是在消息存储到缓存后不立即通知Producer,而是等待消息被持久化到磁盘后再通知Producer。
这种方式确保了消息不会丢失,但性能不如异步刷盘高。一般用于金融业务。

## 刷盘策略配置
flushDiskType = SYNC_FLUSH

在选择刷盘方式时,需要根据业务场景进行权衡。



主从同步机制

即使Broker采用同步刷盘策略,但如果刷盘完成后磁盘损坏,会导致所有存储在磁盘上的消息丢失。

即使采用了主从复制,如果主节点在刷盘完成后还没有来得及将数据同步给从节点就发生了磁盘故障,同样会导致数据丢失。

所以我们可以配置同步机制,等待从节点复制完成主节点的消息后,才去通知Producer完成了消息存储。

## 主从同步策略配置
brokerRole=SYNC_MASTER



怎么提高存储写入性能?



零拷贝技术

RocketMQ通过使用内存映射文件(包括CommitLog、 ConsumeQueue等文件)来提高IO访问性能,也就是我们常说的零拷贝技术。

Java在NIO包里,引入了sendFile(FileChannel类)和MMAP(MappedByteBuffer类)两种实现方式的零拷贝技术。

主流的MQ都会使用零拷贝技术,来提升IO:

  • Kafka:record 的读和写都是基于 FileChannel。index 的读写则基于 MMAP。
  • RocketMQ:读取数据基于 MMAP,写入数据默认使用 MMAP。但可以通过修改配置

transientStorePoolEnable

  • 参数将其配置为使用 FileChannel。作者之所以这样设计,是为了避免 PageCache 的锁竞争,并通过两层架构实现读写分离。


缓冲池写入增强

在不开启RocketMQ的内存映射增强方案时,RocketMQ的读和写都只会简单直接使用MMAP。

但是,MappedByteBuffer也存在一些缺陷:

  • 使用虚拟内存,超过物理内存会导致内存交换,引起磁盘IO(可能非顺序IO)速度较慢。
  • 虚拟内存交换是受操作系统控制的,所以其他进程活动也会触发RocketMQ内存映射的交换。
  • 文件内存映射写入

PageCache

  • 时存在锁竞争,直接写入内存可避免竞争,在异步刷盘场景下速度更快。

为此,RocketMQ通过transientStorePoolEnable参数控制,对写入进行了优化。

如果开启了这个参数,会将写入拆分为两步, 写入缓冲区 + 异步刷盘 的增强策略。

## 刷盘策略配置
flushDiskType = ASYNC_FLUSH 
transientStorePoolEnable = true

MappedFile会提前申请一块直接内存用作缓冲区,放弃使用mmap直接写文件。数据先写入缓冲区,然后异步线程每200ms(且脏数据达到16K,commitCommitLogLeastPages = 4)将缓冲区的数据commit写入FileChannel。再唤醒定时服务(FlushRealTimeService类)将FileChannel里的数据持久化到磁盘。flush函数和commit一样也可以传入一个刷盘页数,当脏页数量达到16K时(flushLeastPages = 4),会进行刷盘操作,调用FileChannelforce将内存中的数据持久化到磁盘。

开启transientStorePoolEnable参数后,性能最好,但是相对来说持久化最不可靠



如何处理消息的过期和删除?

RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。

需要注意的是,在RocketMQ中,消息存储时长并不能完整控制消息的实际保存时间。

因为消息存储仍然使用本地磁盘,本地磁盘空间不足时,为保证服务稳定性消息仍然会被强制清理,导致消息的实际保存时长小于设置的保存时长。

建议在存储成本可控的前提下,尽可能延长消息存储时长。延长消息存储时长,可以为紧急故障恢复、应急问题排查和消息回溯带来更多的可操作空间。



总结

  • 存储模型与存储类型:

commitLog

  • 文件存储消息物理文件,

consumeQueue

  • 文件夹存储逻辑队列索引
  • 如何保证存储消息不丢失:同步&异步刷盘、主从消息同步
  • 如何提高写入性能:零拷贝技术MMAP和FileChannel、缓冲区增强 + 异步刷盘 策略
  • 如何清理过期消息:按存储时长清理消息

3分钟到了吗?应该对RocketMQ如何存储消息有全面了解了吧。
如果还想了解更多,欢迎关注下一期内容。


阿丸笔记(微信公众号:aone_note)



标签:存储,白话,写入,消息,磁盘,RocketMQ,刷盘
From: https://blog.51cto.com/u_15270048/8340484

相关文章

  • 升讯威在线客服系统的并发高性能数据处理技术:高性能OSS文件存储
    我在业余时间开发维护了一款免费开源的升讯威在线客服系统,也收获了许多用户。对我来说,只要能获得用户的认可,就是我最大的动力。最近客服系统成功经受住了客户现场组织的压力测试,获得了客户的认可。客户组织多名客服上线后,所有员工同一时间打开访客页面疯狂不停的给在线客服发消......
  • Filebeat采集通用基于S3协议的对象存储作为输入源
    一背景随着云计算和大数据技术的快速发展,越来越多的组织和企业选择将数据存储在云端的对象存储服务中。对象存储服务提供了高可靠性、可扩展性和低成本的存储解决方案,因此成为了许多应用场景的首选。S3(SimpleStorageService)是亚马逊AWS提供的一种对象存储服务,许多云厂商也提供了......
  • 数据存储和检索:B-tree 和 LSM-tree
     本文主要介绍数据库的核心数据结构索引的实现方式:B+tree和LSM-tree。实际上,数据库是可以不存在索引结构的,遍历数据库总归可以实现数据库的查询,但是,如果数据量很大,这种低效的做法是不可接受的,那么自然而然,牺牲部分空间换取时间被提出和接受,即保留额外的元数据,实现数据......
  • innodb存储引擎了解
    mysql常用的存储引擎分为innodb和myisam其中innodb具有支持事务,执行行级锁,支持MVCC,外键,自动增长列,崩溃恢复等特性。并且mysql在5.5.5之后是数据的默认存储引擎文件:mysql的数据都存放的data文件中,其中日志文件包括错误日志,慢查询日志,查询日志还有二进制日志慢查询日志默认时间......
  • 智能物联网时代里信息存储、处理和传输方式的变化浅谈
    智能物联网时代里信息存储、处理和传输方式的变化浅谈在智能物联网时代,信息存储、处理和传输的方式将发生重大变化。以下是一些可能的变化:1、存储方式的变化:随着物联网设备数量的增加,数据量也将急剧增加。传统的中心化数据存储方式将无法满足大量设备的数据存储需求。因此,分布式存......
  • 智能物联网时代里信息存储、处理和传输方式的变化浅谈
    智能物联网时代里信息存储、处理和传输方式的变化浅谈在智能物联网时代,信息存储、处理和传输的方式将发生重大变化。以下是一些可能的变化:1、存储方式的变化:随着物联网设备数量的增加,数据量也将急剧增加。传统的中心化数据存储方式将无法满足大量设备的数据存储需求。因此,分布式存......
  • 前端存储:localStorage、sessionStorage
    IDE:HBuilderX3.8.12-- 序章前端存储数据的方式有以下几种:JavaScriptCookieWeb存储localStoragesessionStorageHTML5WebSQL数据库IndexedDB 本文测试其中的Web存储:localStorage、sessionStorage。添加数据查看数据普通数据JSON数据删除数据清......
  • 存储过程中的日期格式化
    小例子DECLARE v_last_week_dayvarchar2(100);--上周第一天 v_last_monthvarchar2(100);--上周月份BEGIN selectto_char(trunc(sysdate,'iw')-7,'yyyy-mm-dd')intov_last_week_dayfromdual; selectto_char(trunc(sysdate,'iw')-7,'y......
  • redis 类型Hash 中value存储空间大小
    在Redis中,Hash数据类型的存储空间大小取决于存储在Hash中的键值对的数量以及每个键值对的键和值的大小。Redis内部会根据实际存储的数据进行动态分配内存,因此存储空间大小是可变的。下面是关于Hash数据类型中value存储空间大小的一些考虑因素:键值对数量:Hash中的键值对数量是主要影......
  • MySQL的存储函数、MySQL的触发器、MySQL的索引
    MySQL的存储函数概述MySQL存储函数(自定义函数),函数一般用于计算和返回一个值,可以将经常需要使用的计算或功能写成一个函数。存储函数和存储过程一样,都是在数据库中定义一些SQL语句的集合。存储函数与存储过程的区别:存储函数有且只有一个返回值,而存储过程可以有多个返回值,也可以没......