这篇文章介绍在RocketMQ存储中使用到的一些概念,包括和零拷贝相关的MappedFile
看下MappedFile的类图结构,DefaultMappedFile实现了MappedFile接口,同时继承了ReferenceResource类,这个类中实现了统计mappedFile的引用次数及释放等操作
DefaultMappedFile是一个封装好的内存块,可以让开发者对内存操作。
MMAP机制
mmap机制通过将磁盘缓冲映射到用户进程缓冲区,从而避免了一次CPU拷贝,显著提高了io效率
参考
@DefaultMappedFile#init()
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
mappedFile如何分配的?
Broker启动的时候,会启动DefaultMessageStore和AllocateMappedFileService,其中后者是一个线程类,在while循环中执行mmapOperation()
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
mmapOperation()会从阻塞队列requestQueue中取出请求,然后创建mmapedFile,流程如下:
- 从优先队列中获requestQueue中获取文件创建请求对象
- 从ConcurrentMap requestTable中获取文件创建请求对象
- 如果两个对象不一致则中断
- 初始化MappedFile
- 如果开启了堆外内存并且异步刷盘模式,并且当前broker为master,则使用带transientPool模式创建mappedFile
- 否则使用mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());创建
- 预热MappedFile
是否预热由isWarmMapedFileEnable配置决定,默认为false
那么:优先队列和map中的请求是什么时候放进去的呢?
if (null == mappedFile || mappedFile.isFull()) {
// 获取到最后一个mappedFile
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
@MappedFileQueue#getLastMappedFile(0)流程
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
// 根据startOffset获取mappedfile时,由于startOffset不一定是在mappedFile的起始位置,必须减去偏移量
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) {
// 如果mappedFile满了,就创建新的mappedFiel
// isFull是根据这个mappedFile的writePositon指针判断的,如果等于文件大小说明满了
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
return tryCreateMappedFile(createOffset);
}
return mappedFileLast;
}
@MappedFileQueue@tryCreateMappedFile
创建mappedFile时,会传入两个offset,分别是当前映射文件的offset以及下一个映射文件的offset
- 如果allocateMappedFileService线程类不为空,将两个映射文件请求路径提交给线程类处理,否则直接创建一个mappedFile
- 如果是创建的第一个mappedFile,就标记firstCreateInQueue
- 将mappedFile放入MappedFiles中
最终会来到创建mappedfile的逻辑
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
@AllocateMappedFileService#putRequestAndReturnMappedFile()
这个方法的逻辑如下:
- 默认提交两个请求,如果开启了堆外内存需要重新计算canSubmitRequest
- 先根据nextFilePath创建AllocateRequest
- 再实例化第二个AllocateRequest
- 等待5s创建完成,返回mappedFile
- 这个过程中,只有nextFilePath需要等待5s,第二个allocateRequest会有后台线程(mmapOperation)循环获取并创建
看完了这个过程,为什么RocketMQ store需要同时创建两个MappedFile呢?
提前创建好mappedFile,等到数据需要用到时直接获取,不必阻塞store的线程