首页 > 其他分享 >RocketMQ存储篇二:相关概念介绍(MappedFile)

RocketMQ存储篇二:相关概念介绍(MappedFile)

时间:2023-01-04 15:35:53浏览次数:57  
标签:存储 mappedFileLast 创建 mappedFile 线程 startOffset RocketMQ MappedFile

这篇文章介绍在RocketMQ存储中使用到的一些概念,包括和零拷贝相关的MappedFile

看下MappedFile的类图结构,DefaultMappedFile实现了MappedFile接口,同时继承了ReferenceResource类,这个类中实现了统计mappedFile的引用次数及释放等操作
DefaultMappedFile是一个封装好的内存块,可以让开发者对内存操作。

image

MMAP机制

mmap机制通过将磁盘缓冲映射到用户进程缓冲区,从而避免了一次CPU拷贝,显著提高了io效率
参考

@DefaultMappedFile#init()

this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();

mappedFile如何分配的?
Broker启动的时候,会启动DefaultMessageStore和AllocateMappedFileService,其中后者是一个线程类,在while循环中执行mmapOperation()

    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped() && this.mmapOperation()) {

        }
        log.info(this.getServiceName() + " service end");
    }

mmapOperation()会从阻塞队列requestQueue中取出请求,然后创建mmapedFile,流程如下:

  1. 从优先队列中获requestQueue中获取文件创建请求对象
  2. 从ConcurrentMap requestTable中获取文件创建请求对象
  3. 如果两个对象不一致则中断
  4. 初始化MappedFile
    1. 如果开启了堆外内存并且异步刷盘模式,并且当前broker为master,则使用带transientPool模式创建mappedFile
    2. 否则使用mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());创建
  5. 预热MappedFile
    是否预热由isWarmMapedFileEnable配置决定,默认为false

那么:优先队列和map中的请求是什么时候放进去的呢?

if (null == mappedFile || mappedFile.isFull()) {
	// 获取到最后一个mappedFile
	mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}

@MappedFileQueue#getLastMappedFile(0)流程

    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        long createOffset = -1;
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {
			// 根据startOffset获取mappedfile时,由于startOffset不一定是在mappedFile的起始位置,必须减去偏移量
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) {
			// 如果mappedFile满了,就创建新的mappedFiel
			// isFull是根据这个mappedFile的writePositon指针判断的,如果等于文件大小说明满了
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }

        if (createOffset != -1 && needCreate) {
            return tryCreateMappedFile(createOffset);
        }

        return mappedFileLast;
    }

@MappedFileQueue@tryCreateMappedFile
创建mappedFile时,会传入两个offset,分别是当前映射文件的offset以及下一个映射文件的offset

  1. 如果allocateMappedFileService线程类不为空,将两个映射文件请求路径提交给线程类处理,否则直接创建一个mappedFile
  2. 如果是创建的第一个mappedFile,就标记firstCreateInQueue
  3. 将mappedFile放入MappedFiles中

最终会来到创建mappedfile的逻辑

        if (this.allocateMappedFileService != null) {
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
        } else {
            try {
                mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

@AllocateMappedFileService#putRequestAndReturnMappedFile()
这个方法的逻辑如下:

  1. 默认提交两个请求,如果开启了堆外内存需要重新计算canSubmitRequest
  2. 先根据nextFilePath创建AllocateRequest
  3. 再实例化第二个AllocateRequest
  4. 等待5s创建完成,返回mappedFile
  5. 这个过程中,只有nextFilePath需要等待5s,第二个allocateRequest会有后台线程(mmapOperation)循环获取并创建

看完了这个过程,为什么RocketMQ store需要同时创建两个MappedFile呢?
提前创建好mappedFile,等到数据需要用到时直接获取,不必阻塞store的线程

标签:存储,mappedFileLast,创建,mappedFile,线程,startOffset,RocketMQ,MappedFile
From: https://www.cnblogs.com/okdogngge/p/17017626.html

相关文章

  • RocketMQ 5.0 多语言客户端的设计与实现
    本文作者:古崟佑,阿里云中间件开发。 RocketMQ5.0版本拥有非常多新特性,比如存储计算分离、batch能力的提升等,它是具有里程碑意义的版本。提到新版本,我们往往会首先......
  • C语言数据的存储
    前言之前写过一篇关于​​C语言内存管理​​的文章,对在C语言中使用内存中需要注意的一些问题和解决办法做了一些总结。实际上,内存终归是要存储数据的,这次对C语言中的数据存......
  • RocketMQ 5.0 多语言客户端的设计与实现
    本文作者:古崟佑,阿里云中间件开发。RocketMQ5.0版本拥有非常多新特性,比如存储计算分离、batch能力的提升等,它是具有里程碑意义的版本。提到新版本,我们往往会首先想到服务......
  • RocketMQ Compaction Topic的设计与实现
    本文作者:刘涛,阿里云智能技术专家。01 CompactionTopic介绍一般来说,消息队列提供的数据过期机制有如下几种,比如有基于时间的过期机制——数据保存多长时间后即进行清理,也有......
  • RocketMQ Compaction Topic的设计与实现
    本文作者:刘涛,阿里云智能技术专家。 01 CompactionTopic介绍一般来说,消息队列提供的数据过期机制有如下几种,比如有基于时间的过期机制——数据保存多长时间后即进行......
  • 浅谈存储系统:LSM 树设计原理
    我在上篇文章ApachePulsar的架构设计中介绍了Pulsar存算分离的架构,其中broker只负责计算,由BookKeeper负责底层的存储,我还画了这样一张图说明BookKeeper读写分......
  • ceph 集群维护 存储池管理 用户认证
    添加主机 添加磁盘  删除磁盘ceph集群维护:http://docs.ceph.org.cn/rados/#ceph集群配置、部署与运维通过套接字进行单机管理每个节点上,每个osd都会生成 soc......
  • AT24C02数据存储+普中51单片机+江科大自化协
    1系统原理图 2现象 当按键Key1被按下时,LCD1602显示的数值加1,每按下一次,自增1;当按键Key2被按下时,LCD1602显示的数值自减1;当按键Key3被按下时,单片机将LCD1602显......
  • Curve 分布式存储在 KubeSphere 中的实践
    作者:尹珉,KubeSphere社区用户委员会杭州站站长Curve介绍Curve是网易开发的现代存储系统,目前支持文件存储(CurveFS)和块存储(CurveBS)。现在它作为一个沙盒项目托管在......
  • Curve 分布式存储在 KubeSphere 中的实践
    Curve介绍Curve是网易开发的现代存储系统,目前支持文件存储(CurveFS)和块存储(CurveBS)。现在它作为一个沙盒项目托管在CNCF。Curve是一个高性能、轻量级操作、本......