第一篇介绍了存储的概览,定位到消息追加消息处细看一下
获取mappedFile ——> 追加消息 ——> 处理结果
@DefaultMappedFile#appendMsgInner()
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
int currentPos = WROTE_POSITION_UPDATER.get(this);
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = appendMessageBuffer().slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
// traditional batch message
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBrokerInner) {
// traditional single message or newly introduced inner-batch message
// 使用回调函数来处理append消息的过程
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
@DefaultAppendMessageCallback#doAppend()
处理逻辑
- 根据bytebuffer中的wroteposition以及mappedfile开头获取物理offset
long wroteOffset = fileFromOffset + byteBuffer.position(); - 构造msgId
- 判断mappedfile是否有足够的空间写msg, 如果没有,就返回一个mappedFile用完的状态,重新分配mappedfile并写入,如果空间足够,就修改encode阶段消息中的一些假值并返回PUT_OK状态
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) - 根据put的状态来做处理(刷盘以及ha)
那么,消息到底追加到了哪里呢?
标签:存储,currentPos,fileSize,AppendMessageResult,messageExt,追加,byteBuffer,RocketMQ,res From: https://www.cnblogs.com/okdogngge/p/17027809.html