首页 > 其他分享 >RocketMQ存储篇四:刷盘

RocketMQ存储篇四:刷盘

时间:2023-01-08 19:57:48浏览次数:34  
标签:存储 mappedFile int 内存 new CommitLog RocketMQ 刷盘

概览

RocketMQ 主从同步指的是消息发送到master的内存中,并且等到同步到slaver的内存才返回;
刷盘则是将内存中的消息写入磁盘,同样分为同步刷盘和异步刷盘。同步刷盘指一条消息写入磁盘才返回成功,异步刷盘指写入内存就返回成功,稍后异步线程刷盘。

上文说到消息append后会返回一个状态(PUT_OK或其他),然后处理刷盘

        public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
            // Synchronization flush
            if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                if (messageExt.isWaitStoreMsgOK()) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    flushDiskWatcher.add(request);
                    service.putRequest(request);
                    return request.future();
                } else {
                    service.wakeup();
                    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
                }
            }
            // Asynchronous flush
            else {
                if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
                    flushCommitLogService.wakeup();
                } else {
                    commitRealTimeService.wakeup();
                }
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }

调用链

1. BrokerstartUp.main()
2. createBrokerControlller()
3. controller.initialize()
4. this.messageStore = new DefaultMesageStore()
5. new CommitLog()
6. 初始化刷盘线程
   if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
          this.flushCommitLogService = new GroupCommitService();
   } else {
          this.flushCommitLogService = new FlushRealTimeService();
   }
    this.commitLogService = new CommitRealTimeService();
7. BrokerStartup.start()
8. messageStore.start()
9. commitLog.start()
10. 刷盘线程启动
    this.flushCommitLogService.start();
    if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        this.commitLogService.start();
    }

image

  1. 如果同步刷盘模式,启动GroupCommitService
  2. 如果异步刷盘模式,启动FlushRealtimeService
  3. 如果开启了堆外内存,启动CommitRealtimeService
public DefaultFlushManager() {
	if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
		this.flushCommitLogService = new CommitLog.GroupCommitService();
	} else {
		this.flushCommitLogService = new CommitLog.FlushRealTimeService();
	}

	this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}

@Override public void start() {
	this.flushCommitLogService.start();

	if (defaultMessageStore.isTransientStorePoolEnable()) {
		this.commitRealTimeService.start();
	}
}

思考:如果开启了堆外内存,还需要开启CommitRealTimeService,这个任务作用是什么?作用于同步刷盘模式还是异步刷盘模式?

同步刷盘线程GroupCommitService

很巧妙的机制,设置了两个阻塞队列,保证读刷盘请求和写刷盘请求始终是在不同的阻塞队列中的,就避免了加锁操作,每次刷盘完后交换两个引用
读写分离,防止锁竞争

 class GroupCommitService extends FlushCommitLogService {
      private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
      private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
      private final PutMessageSpinLock lock = new PutMessageSpinLock();

流程

@GroupCommitService#doCommit()

1. 判断这个请求是否已经刷过
  mappedFileQueue.getFlushWhere() > req.getNextOffset()
2. mappedFileQueue.flush(0)
3. 找到写的文件
   mappedFileQueue.findMappedFileByOffset(this.flushedWhere, this.flushwhere=0)
4. filechannel.force() or mappedByteBuffer.force()
5. 更新刷盘点this.flushedPosition.set(value)
    int offset = mappedFile.flush();
    long where =  mappedFile.getFileFromOffset() + offset()
6. 没有刷盘成功就重试一次
7. 唤醒结束线程req.wakeupCustomer(PUT_OK or Timeout)
8. 更新checkpoint时间点
9. 清空读队列

如果开启了堆外内存,追加消息的时候,使用堆外内存
开启堆外内存时,调用filechannel.force()
未开启时,调用mappedfilebuffer.force()

@MappedFileQueue#flush(0)

    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        // 根据offset找到mappedfile
		MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            // 刷盘指定页数的内存到磁盘,返回flushedOffset,如果参数为0表示,表示立即刷入,可以参考isAbleToFlush()
			int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
        return result;
    }

问题:

  1. mappedFile.getFileFromOffset()是什么值?
    返回的是该CommitLog的起始偏移量
    int offset = mappedFile.flush(0); // 这个返回的是writePosition或者commitedPosition,是一个相对值
    long where = mappedFile.getFileFromOffset() + offset; //实际的刷盘位置
  2. readOffset、writeOffset、commitedOffset分别表示什么含义
    DefaultMappedFile中有几个指针、wrotePosition, committedPosition, flushedPosition。
    其中:
    wrotePosition表示消息写入mappedfile中的位点(未提交刷盘请求、未刷盘)
    committedPosition表示提交刷盘请求的位点(未刷盘)
    flushedPosition表示已刷盘的位点

异步刷盘线程FlushRealTimeService

流程

1. 不停止就一直循环这个线程
2. 获取一些参数
  flushCommitLogTimed:标志使用await还是sleep来控制线程,默认falst使用await
  interval:刷盘间隔,默认500ms
  flushPhysicalQueueLeastPages:一次最少刷入4页
  flushPhysicalQueueThroughInterval:距离上次刷盘间隔最大默认10s
3. 超时判断
  将flushPhysicalQueueLeastPages设置为0,表示一有数据就刷盘
  将lastFlushTimestamp设置为现在
4. 等待500ms
5. 刷盘
    mappedFileQueue.flush(),与同步刷盘一样
6.如果线程终止了,就重试10次
  for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
    result = CommitLog.this.mappedFileQueue.flush(0);
    CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
  }  

问题

  1. 异步刷盘和同步刷盘的区别体现在什么地方?
    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
    protected void commit0() {
        int writePos = WROTE_POSITION_UPDATER.get(this);
        int lastCommittedPosition = COMMITTED_POSITION_UPDATER.get(this);

        if (writePos - lastCommittedPosition > 0) {
            try {
		// 消息都是追加到writeBuffer中的,调用write写入映射内存(磁盘文件)
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);
                COMMITTED_POSITION_UPDATER.set(this, writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }

堆外刷盘线程CommitRealtimeSerivce

理解

就是将消息写道堆外内存,fileChannel中;读消息从内存中,这样就是一个刷盘的读写分离。
优势就是提高了刷盘效率;缺点就是可能会丢失数据

流程

1. 不停止就一直循环这个线程
2. 获取一些参数
  interval:刷盘的时间间隔,默认为200ms
  commitDataLeastPages:一次刷盘的页数,默认为4页
  getCommitCommitLogThoroughInterval:刷盘间隔,默认为200ms
3. 超时判断
  将flushPhysicalQueueLeastPages设置为0,表示一有数据就刷盘
  将lastFlushTimestamp设置为现在
4. 提交数据
   mappedFileQueue.commit();
5. 找到写入位置findMappedFileFromOffset()
6. 写入数据并更新刷盘位置
  int offset = mappedFile.commit(commitLeastPage);
  long where = mappedFile.getFileFromoffset() + offset;
7. 判断是否可以提交
    如果commitDataLeastPage大于0,
    write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages
    如果等于0,表示有数据就提交  
8. 创建writeBuffer共享缓存区
9. 通过channel刷盘
10.更新commitedPosition  
11.返回刷盘结果result,如果失败就唤醒flushCommitLogService线程
12.仍然失败则重试10次

开启对外内存后,消息先存到writeBuffer,然后通过channel刷盘到磁盘中

protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

标签:存储,mappedFile,int,内存,new,CommitLog,RocketMQ,刷盘
From: https://www.cnblogs.com/okdogngge/p/16071181.html

相关文章

  • 配置iSCSI部署网络存储
    iSCSI互联网小型计算机系统接口是由IBM下属的两大研发机构AImaden和Haifa研究中心共同开发的,是一个供硬件设备使用的、可在IP协议上层运行的SCSI指令集,是一种开放的基于I......
  • JS存储
    StoragelocalStorage永久性的存储方法属性length方法Storage.key(index):返回存储中的第n个key名称Storage.getItem(key)Storage.setItem(key,value)Stor......
  • 32、商品服务--品牌管理--云存储的开通与使用
    我们新增的品牌logo,希望存储起来。如果存储到我们的服务器,如果我们这个服务部署了多个服务器,这就导致每个服务器的数据不一致。所以我们将前端提交的数据统一上传到一个地......
  • p17常见的几种调用约定以及float型转化为内存存储格式
    常见的几种调用约定--cdecl(栈外平衡)--stdcall(内平栈)--fastcall找到main入口ctrl+n找到Getversion然后f8找3函数(3push1call)float型转化为内存存储格式8.258:100......
  • 数据的存储(C语言进阶)
    数据类型介绍内置数据类型的归类整型在内存中的存储:①原码、反码、补码②大小端字节序③char的存储内容浮点型在内存中的存储自学b站“鹏哥C语言”笔记。一、数据类型介绍......
  • 客服系统即时通讯IM开发(三)访客实现一对一聊天-访客生成唯一id标识存储到全局变量【唯
    在访客进入聊天界面的时候,就要调用接口生成一个唯一ID标识然后前端链接WebSocket的时候,传递这个访客ID进来 如果你想在前端访客连接时生成一个UUID,可以使用Go语言的......
  • dremio 系统内部存储插件与自定义存储插件加载的区别
    dremio整体包含了两大类存储扩展,系统内部使用的,以及用户开发的,整体区别系统的目前是在dremio自己启动的时候就会注册以及使用的,比如加速反射的,home,元数据存储插件用......
  • MYSQL 创建(存储过程)定时任务
    代码如下:--查看MYSQL事件调度器:(这个调度器主要是监视一个事件是否要调用,要创建事件,必须打开调度器。)showvariableslike'event_scheduler';--创建存储过程:CREATEPROCED......
  • 三 docker存储和网络
    docker数据管理数据卷volumes可供一个或多个容器使用的特殊目录,可以在容器之间共享,对数据卷的修改会立即生效且不会影响镜像,与容器声明周期独立,即容器删除数据卷也可......
  • MTK存储说明
    @目录简介说明RAM和ROM的分配编译完后的空间查看简介MTK存储功能说明说明RAM和ROM的分配MTK项目里面,存储部分只需要关注ROM和RAM即可到对应的工程下面pro/FF741_CQ2......