首页 > 其他分享 >RocketMQ 多级存储设计与实现

RocketMQ 多级存储设计与实现

时间:2023-05-30 18:31:31浏览次数:58  
标签:存储 缓存 多级 消息 offset 上传 RocketMQ

作者:张森泽

随着 RocketMQ 5.1.0 的正式发布,多级存储作为 RocketMQ 一个新的独立模块到达了 Technical Preview 里程碑:允许用户将消息从本地磁盘卸载到其他更便宜的存储介质,可以用较低的成本延长消息保留时间。本文详细介绍 RocketMQ 多级存储设计与实现。

设计总览

RocketMQ 多级存储旨在不影响热数据读写的前提下将数据卸载到其他存储介质中,适用于两种场景:

  1. 冷热数据分离:RocketMQ 新近产生的消息会缓存在 page cache 中,我们称之为热数据;当缓存超过了内存的容量就会有热数据被换出成为冷数据。如果有少许消费者尝试消费冷数据就会从硬盘中重新加载冷数据到 page cache,这会导致读写 IO 竞争并挤压 page cache 的空间。而将冷数据的读取链路切换为多级存储就可以避免这个问题;
  2. 延长消息保留时间:将消息卸载到更大更便宜的存储介质中,可以用较低的成本实现更长的消息保存时间。同时多级存储支持为 topic 指定不同的消息保留时间,可以根据业务需要灵活配置消息 TTL。

RocketMQ 多级存储对比 Kafka 和 Pulsar 的实现最大的不同是我们使用准实时的方式上传消息,而不是等一个 CommitLog 写满后再上传,主要基于以下几点考虑:

  1. 均摊成本:RocketMQ 多级存储需要将全局 CommitLog 转换为 topic 维度并重新构建消息索引,一次性处理整个 CommitLog 文件会带来性能毛刺;
  2. 对小规格实例更友好:小规格实例往往配置较小的内存,这意味着热数据会更快换出成为冷数据,等待 CommitLog 写满再上传本身就有冷读风险。采取准实时上传的方式既能规避消息上传时的冷读风险,又能尽快使得冷数据可以从多级存储读取。

Quick Start

多级存储在设计上希望降低用户心智负担:用户无需变更客户端就能实现无感切换冷热数据读写链路,通过简单的修改服务端配置即可具备多级存储的能力,只需以下两步:

  1. 修改 Broker 配置,指定使用 org.apache.rocketmq.tieredstore.TieredMessageStore 作为 messageStorePlugIn
  2. 配置你想使用的储存介质,以卸载消息到其他硬盘为例:配置 tieredBackendServiceProvider 为 org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment,同时指定新储存的文件路径:tieredStoreFilepath

可选项:支持修改 tieredMetadataServiceProvider 切换元数据存储的实现,默认是基于 json 的文件存储

更多使用说明和配置项可以在 GitHub 上查看多级存储的 README \[ 1]

技术架构

RocketMQ 多级存储设计与实现_上传

architecture

  • 接入层:TieredMessageStore/TieredDispatcher/TieredMessageFetcher接入层实现 MessageStore 中的部分读写接口,并为他们增加了异步语意。TieredDispatcher 和 TieredMessageFetcher 分别实现了多级存储的上传/下载逻辑,相比于底层接口这里做了较多的性能优化:包括使用独立的线程池,避免慢 IO 阻塞访问热数据;使用预读缓存优化性能等。
  • 容器层:TieredCommitLog/TieredConsumeQueue/TieredIndexFile/TieredFileQueue容器层实现了和 DefaultMessageStore 类似的逻辑文件抽象,同样将文件划分为 CommitLog、ConsumeQueue、IndexFile,并且每种逻辑文件类型都通过 FileQueue 持有底层物理文件的引用。有所不同的是多级存储的 CommitLog 改为 queue 维度。
  • 驱动层:TieredFileSegment
    驱动层负责维护逻辑文件到物理文件的映射,通过实现 TieredStoreProvider 对接底层文件系统读写接口(Posix、S3、OSS、MinIO 等)。目前提供了 PosixFileSegment 的实现,可以将数据转移到其他硬盘或通过 fuse 挂载的对象存储上。

消息上传

RocketMQ 多级存储的消息上传是由 dispatch 机制触发的:初始化多级存储时会将 TieredDispatcher 注册为 CommitLog 的 dispacher。这样每当有消息发送到 Broker 会调用 TieredDispatcher 进行消息分发,TieredDispatcher 将该消息写入到 upload buffer 后立即返回成功。整个 dispatch 流程中不会有任何阻塞逻辑,确保不会影响本地 ConsumeQueue 的构建。

RocketMQ 多级存储设计与实现_多级_02

TieredDispatcherTieredDispatcher 写入 upload buffer 的内容仅为消息的引用,不会将消息的 body 读入内存。因为多级储存以 queue 维度构建 CommitLog,此时需要重新生成 commitLog offset 字段。

RocketMQ 多级存储设计与实现_缓存_03

upload buffer触发 upload buffer 上传时读取到每条消息的 commitLog offset 字段时采用拼接的方式将新的 offset 嵌入到原消息中。

上传进度控制

每个队列都会有两个关键位点控制上传进度:

  1. dispatch offset:已经写入缓存但是未上传的消息位点
  2. commit offset:已上传的消息位点

RocketMQ 多级存储设计与实现_缓存_04

upload progress类比消费者,dispatch offset 相当于拉取消息的位点,commit offset 相当于确认消费的位点。commit offset 到 dispatch offset 之间的部分相当于已拉取未消费的消息。

消息读取

TieredMessageStore 实现了 MessageStore 中的消息读取相关接口,通过请求中的逻辑位点(queue offset)判断是否从多级存储中读取消息,根据配置(tieredStorageLevel)有四种策略:

  • DISABLE:禁止从多级存储中读取消息;
  • NOT\_IN\_DISK:不在 DefaultMessageStore 中的消息从多级存储中读取;
  • NOT\_IN\_MEM:不在 page cache 中的消息即冷数据从多级存储读取;
  • FORCE:强制所有消息从多级存储中读取,目前仅供测试使用。
/**
  * Asynchronous get message
  * @see #getMessage(String, String, int, long, int, MessageFilter) getMessage
  *
  * @param group Consumer group that launches this query.
  * @param topic Topic to query.
  * @param queueId Queue ID to query.
  * @param offset Logical offset to start from.
  * @param maxMsgNums Maximum count of messages to query.
  * @param messageFilter Message filter used to screen desired messages.
  * @return Matched messages.
  */
CompletableFuture<GetMessageResult> getMessageAsync(final String group, final String topic, final int queueId,
    final long offset, final int maxMsgNums, final MessageFilter messageFilter);

需要从多级存储中读取的消息会交由 TieredMessageFetcher 处理:首先校验参数是否合法,然后按照逻辑位点(queue offset)发起拉取请求。TieredConsumeQueue/TieredCommitLog 将逻辑位点换算为对应文件的物理位点从 TieredFileSegment 读取消息。

// TieredMessageFetcher#getMessageAsync similar with 
TieredMessageStore#getMessageAsync
public CompletableFuture<GetMessageResult> 
getMessageAsync(String group, String topic, int queueId,
        long queueOffset, int maxMsgNums, final MessageFilter messageFilter)

TieredFileSegment 维护每个储存在文件系统中的物理文件位点,并通过为不同存储介质实现的接口从中读取所需的数据。

/**
  * Get data from backend file system
  *
  * @param position the index from where the file will be read
  * @param length the data size will be read
  * @return data to be read
  */
CompletableFuture<ByteBuffer> read0(long position, int length);
预读缓存

TieredMessageFetcher 读取消息时会预读一部分消息供下次使用,这些消息暂存在预读缓存中。

protected final Cache<MessageCacheKey /* topic, queue id and queue offset */,
SelectMappedBufferResultWrapper /* message data */> readAheadCache;

预读缓存的设计参考了 TCP Tahoe 拥塞控制算法,每次预读的消息量类似拥塞窗口采用加法增、乘法减的机制控制:

  • 加法增:从最小窗口开始,每次增加等同于客户端 batchSize 的消息量。
  • 乘法减:当缓存的消息超过了缓存过期时间仍未被全部拉取,在清理缓存的同时会将下次预读消息量减半。

预读缓存支持在读取消息量较大时分片并发请求,以取得更大带宽和更小的延迟。

某个 topic 消息的预读缓存由消费这个 topic 的所有 group 共享,缓存失效策略为:

  1. 所有订阅这个 topic 的 group 都访问了缓存
  2. 到达缓存过期时间

故障恢复

上文中我们介绍上传进度由 commit offset 和 dispatch offset 控制。多级存储会为每个 topic、queue、fileSegment 创建元数据并持久化这两种位点。当 Broker 重启后会从元数据中恢复,继续从 commit offset 开始上传消息,之前缓存的消息会重新上传并不会丢失。

RocketMQ 多级存储设计与实现_缓存_05

开发计划

面向云原生的存储系统要最大化利用云上存储的价值,而对象存储正是云计算红利的体现。RocketMQ 多级存储希望一方面利用对象存储低成本的优势延长消息存储时间、拓展数据的价值;另一方面利用其共享存储的特性在多副本架构中兼得成本和数据可靠性,以及未来向 Serverless 架构演进。

tag 过滤

多级存储拉取消息时没有计算消息的 tag 是否匹配,tag 过滤交给客户端处理。这样会带来额外的网络开销,计划后续在服务端增加 tag 过滤能力。

广播消费以及多个消费进度不同的消费者

预读缓存失效需要所有订阅这个 topic 的 group 都访问了缓存,这在多个 group 消费进度不一致的情况下很难触发,导致无用的消息在缓存中堆积。需要计算出每个 group 的消费 qps 来估算某个 group 能否在缓存失效前用上缓存的消息。如果缓存的消息预期在失效前都不会被再次访问,那么它应该被立即过期。相应的对于广播消费,消息的过期策略应被优化为所有 Client 都读取这条消息后才失效。

和高可用架构的融合

目前主要面临以下三个问题:

  1. 元数据同步:如何可靠的在多个节点间同步元数据,slave 晋升时如何校准和补全缺失的元数据;
  2. 禁止上传超过 confirm offset 的消息:为了避免消息回退,上传的最大 offset 不能超过 confirm offset;
  3. slave 晋升时快速启动多级存储:只有 master 节点具有写权限,在 slave 节点晋升后需要快速拉起多级存储断点续传。

相关链接:

\[1] README

<https://github.com/apache/rocketmq/blob/develop/tieredstore/README.md>

点击此处查看消息队列 RocketMQ 产品详情

标签:存储,缓存,多级,消息,offset,上传,RocketMQ
From: https://blog.51cto.com/u_13778063/6381240

相关文章

  • Javascript上传文件到阿里OSS存储,并支持进度查看
    现在使用js上传文件的插件有很多,例如:plupload等等今天我记录一下使用原生js的上传文件,并且支持进度查看,下面直接上代码:html代码:<inputtype="file"onchange="uploadMedia(this)"><aid="showProgress"style="display:none;"href="#"></a>预览效......
  • ROCKETMQ
    配置环境变量(ROCKETMQ_HOME)修改runserver.cmd1、进入bin目录下找到runserver.cmd文件,用编辑器打开,因为RocketMQ默认需要2g运行内存,做为测试用,就只要最低配置就好了,注释原有的NameServer的配置,在其前面加上rem注释掉,remset"JAVA_OPT=%JAVA_OPT%-server-Xms2g-Xmx2g-Xmn1......
  • jquery本地存储的数据格式只能是字符串,如需存储对象,需要转换后存储
    <!DOCTYPEhtml><htmllang="en"> <head> <metacharset="UTF-8"> <title>Title</title> <scriptsrc="js/jquery-3.5.1.min.js"></script> </head> <body> <scri......
  • 解决es存储的日志显示不完整问题
    背景:通过服务的实时日志和从kibana中查询到的不一致,实时日志会显示的更多。解决方法:修改fluentdconfigmap按照下图的方式fluentd配置中filter下,如图下图位置增加以下配置`#Concatenatemulti-linelogs(>=16KB)<filterkubernetes.**>@typeconcat......
  • RocketMQ使用实例
    下面是一个使用Java实现的RocketMQ示例代码,用于发送和消费消息:首先,您需要下载并安装RocketMQ,并启动NameServer和Broker。接下来,您可以使用以下示例代码来发送和消费消息:Producer.java文件:importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.ro......
  • 详解RocketMQ 顺序消费机制
    摘要:顺序消息是指对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。本文分享自华为云社区《RocketMQ顺序消费机制》,作者:勇哥java实战分享。顺序消息是指对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则......
  • store文件夹 vue_vue-cli2使用store存储全局变量
    1.引入store安装引入vuex,在main.js里面:importstorefrom'./store'//store引入newVue({el:'#app',router,store,//store引入components:{App},template:''})在store文件夹下创建index.js入口文件,添加下面内容:importVuefrom'vue';im......
  • SpringBoot集成RocketMQ,rocketmq_client.log日志文件配置
    SpringBoot项目集成rocketmq-client<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version></dependency>项目启动时会在${user.home}/logs目录下创建一个roc......
  • RocketMQ4.9.5集群部署
    RocketMQ集群部署背景:生产环境单机的MQ不具有高可用,所以我们应该部署成集群模式,这里给大家部署一个双主双从异步复制的Broker集群一、单机部署、部署前提参考https://www.cnblogs.com/hsyw/p/17428530.htmlhttps://www.cnblogs.com/hsyw/p/17429834.html二、集群部署......
  • RocketMQ 顺序消费机制
    顺序消息是指对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。顺序消息分为分区顺序消息和全局顺序消息。1、分区顺序消息对于指定的一个Topic,所有消息根据ShardingKey进行区块分区,同一个分区内的消息按......