概览
RocketMQ 主从同步指的是消息发送到master的内存中,并且等到同步到slaver的内存才返回;
刷盘则是将内存中的消息写入磁盘,同样分为同步刷盘和异步刷盘。同步刷盘指一条消息写入磁盘才返回成功,异步刷盘指写入内存就返回成功,稍后异步线程刷盘。
上文说到消息append后会返回一个状态(PUT_OK或其他),然后处理刷盘
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
flushDiskWatcher.add(request);
service.putRequest(request);
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitRealTimeService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
调用链
1. BrokerstartUp.main()
2. createBrokerControlller()
3. controller.initialize()
4. this.messageStore = new DefaultMesageStore()
5. new CommitLog()
6. 初始化刷盘线程
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
7. BrokerStartup.start()
8. messageStore.start()
9. commitLog.start()
10. 刷盘线程启动
this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
- 如果同步刷盘模式,启动GroupCommitService
- 如果异步刷盘模式,启动FlushRealtimeService
- 如果开启了堆外内存,启动CommitRealtimeService
public DefaultFlushManager() {
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new CommitLog.GroupCommitService();
} else {
this.flushCommitLogService = new CommitLog.FlushRealTimeService();
}
this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}
@Override public void start() {
this.flushCommitLogService.start();
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitRealTimeService.start();
}
}
思考:如果开启了堆外内存,还需要开启CommitRealTimeService,这个任务作用是什么?作用于同步刷盘模式还是异步刷盘模式?
同步刷盘线程GroupCommitService
很巧妙的机制,设置了两个阻塞队列,保证读刷盘请求和写刷盘请求始终是在不同的阻塞队列中的,就避免了加锁操作,每次刷盘完后交换两个引用
读写分离,防止锁竞争
class GroupCommitService extends FlushCommitLogService {
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
private final PutMessageSpinLock lock = new PutMessageSpinLock();
流程
@GroupCommitService#doCommit()
1. 判断这个请求是否已经刷过
mappedFileQueue.getFlushWhere() > req.getNextOffset()
2. mappedFileQueue.flush(0)
3. 找到写的文件
mappedFileQueue.findMappedFileByOffset(this.flushedWhere, this.flushwhere=0)
4. filechannel.force() or mappedByteBuffer.force()
5. 更新刷盘点this.flushedPosition.set(value)
int offset = mappedFile.flush();
long where = mappedFile.getFileFromOffset() + offset()
6. 没有刷盘成功就重试一次
7. 唤醒结束线程req.wakeupCustomer(PUT_OK or Timeout)
8. 更新checkpoint时间点
9. 清空读队列
如果开启了堆外内存,追加消息的时候,使用堆外内存
开启堆外内存时,调用filechannel.force()
未开启时,调用mappedfilebuffer.force()
@MappedFileQueue#flush(0)
public boolean flush(final int flushLeastPages) {
boolean result = true;
// 根据offset找到mappedfile
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
// 刷盘指定页数的内存到磁盘,返回flushedOffset,如果参数为0表示,表示立即刷入,可以参考isAbleToFlush()
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
问题:
- mappedFile.getFileFromOffset()是什么值?
返回的是该CommitLog的起始偏移量
int offset = mappedFile.flush(0); // 这个返回的是writePosition或者commitedPosition,是一个相对值
long where = mappedFile.getFileFromOffset() + offset; //实际的刷盘位置 - readOffset、writeOffset、commitedOffset分别表示什么含义
DefaultMappedFile中有几个指针、wrotePosition, committedPosition, flushedPosition。
其中:
wrotePosition表示消息写入mappedfile中的位点(未提交刷盘请求、未刷盘)
committedPosition表示提交刷盘请求的位点(未刷盘)
flushedPosition表示已刷盘的位点
异步刷盘线程FlushRealTimeService
流程
1. 不停止就一直循环这个线程
2. 获取一些参数
flushCommitLogTimed:标志使用await还是sleep来控制线程,默认falst使用await
interval:刷盘间隔,默认500ms
flushPhysicalQueueLeastPages:一次最少刷入4页
flushPhysicalQueueThroughInterval:距离上次刷盘间隔最大默认10s
3. 超时判断
将flushPhysicalQueueLeastPages设置为0,表示一有数据就刷盘
将lastFlushTimestamp设置为现在
4. 等待500ms
5. 刷盘
mappedFileQueue.flush(),与同步刷盘一样
6.如果线程终止了,就重试10次
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
问题
- 异步刷盘和同步刷盘的区别体现在什么地方?
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
protected void commit0() {
int writePos = WROTE_POSITION_UPDATER.get(this);
int lastCommittedPosition = COMMITTED_POSITION_UPDATER.get(this);
if (writePos - lastCommittedPosition > 0) {
try {
// 消息都是追加到writeBuffer中的,调用write写入映射内存(磁盘文件)
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
COMMITTED_POSITION_UPDATER.set(this, writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
堆外刷盘线程CommitRealtimeSerivce
理解
就是将消息写道堆外内存,fileChannel中;读消息从内存中,这样就是一个刷盘的读写分离。
优势就是提高了刷盘效率;缺点就是可能会丢失数据
流程
1. 不停止就一直循环这个线程
2. 获取一些参数
interval:刷盘的时间间隔,默认为200ms
commitDataLeastPages:一次刷盘的页数,默认为4页
getCommitCommitLogThoroughInterval:刷盘间隔,默认为200ms
3. 超时判断
将flushPhysicalQueueLeastPages设置为0,表示一有数据就刷盘
将lastFlushTimestamp设置为现在
4. 提交数据
mappedFileQueue.commit();
5. 找到写入位置findMappedFileFromOffset()
6. 写入数据并更新刷盘位置
int offset = mappedFile.commit(commitLeastPage);
long where = mappedFile.getFileFromoffset() + offset;
7. 判断是否可以提交
如果commitDataLeastPage大于0,
write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages
如果等于0,表示有数据就提交
8. 创建writeBuffer共享缓存区
9. 通过channel刷盘
10.更新commitedPosition
11.返回刷盘结果result,如果失败就唤醒flushCommitLogService线程
12.仍然失败则重试10次
开启对外内存后,消息先存到writeBuffer,然后通过channel刷盘到磁盘中
protected void commit0(final int commitLeastPages) {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
if (writePos - this.committedPosition.get() > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
标签:存储,mappedFile,int,内存,new,CommitLog,RocketMQ,刷盘
From: https://www.cnblogs.com/okdogngge/p/16071181.html