这篇文章记录RocketMQ大致的存储流程,源码为5.0.1-SNAPSHOT
入口从Broker模块的SendMessageProcessor开始
@SendMessageProcessor#sendMessage()
5.0中将putmessage中的一些校验逻辑下沉到了hook中
for (PutMessageHook putMessageHook : putMessageHookList) {
PutMessageResult handleResult = putMessageHook.executeBeforePutMessage(msg);
if (handleResult != null) {
return CompletableFuture.completedFuture(handleResult);
}
}
- store服务关闭会fail
- slave as master模式中slave不可用会fail
- store不可写会fail
- 非重试topic超过最大长度会失败(Byte 127)
- topic长度超过255会失败
- msg body为空会fail
- pagecache busy会fail
然后会调用commitlog中的putMessage方法
重点看一下这里面的逻辑
@commitLog#asyncPutMessage(msg)
- 编码消息
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
public PutMessageResult encode(MessageExtBrokerInner msgInner) {
this.byteBuf.clear();
//忽略不重要逻辑
// 1 TOTALSIZE
this.byteBuf.writeInt(msgLen);
// 2 MAGICCODE
this.byteBuf.writeInt(msgInner.getVersion().getMagicCode());
// 3 BODYCRC
this.byteBuf.writeInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.byteBuf.writeInt(msgInner.getQueueId());
// 5 FLAG
this.byteBuf.writeInt(msgInner.getFlag());
// 6 QUEUEOFFSET 这里先填占位,后面doappend的时候再写入真实的queueOffset
this.byteBuf.writeLong(queueOffset);
// 7 PHYSICALOFFSET, need update later;这里也是先占位,然后doappend的时候更新消息的真实offset
this.byteBuf.writeLong(0);
// 8 SYSFLAG
this.byteBuf.writeInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.byteBuf.writeLong(msgInner.getBornTimestamp());
// 10 BORNHOST
ByteBuffer bornHostBytes = msgInner.getBornHostBytes();
this.byteBuf.writeBytes(bornHostBytes.array());
// 11 STORETIMESTAMP
this.byteBuf.writeLong(msgInner.getStoreTimestamp()); //后续更新
// 12 STOREHOSTADDRESS
ByteBuffer storeHostBytes = msgInner.getStoreHostBytes(); //..
this.byteBuf.writeBytes(storeHostBytes.array());
// 13 RECONSUMETIMES
this.byteBuf.writeInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.byteBuf.writeInt(bodyLength);
if (bodyLength > 0)
this.byteBuf.writeBytes(msgInner.getBody());
// 16 TOPIC
if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) {
this.byteBuf.writeShort((short) topicLength);
} else {
this.byteBuf.writeByte((byte) topicLength);
}
this.byteBuf.writeBytes(topicData);
// 17 PROPERTIES
this.byteBuf.writeShort((short) propertiesLength);
if (propertiesLength > 0)
this.byteBuf.writeBytes(propertiesData);
return null;
}
这里表明了RocketMQ中消息的定义,组成结构为
-
对topic-queue加锁
对putMessageLock加锁 -
获取mappedFile
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
关于commitlog、mappedFile等介绍,参见文章《》 -
写入mappedFile
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
核心方法,传入一个msg,回调函数以及上下文,将消息写入pagecache,然后调用callback中的doAppend方法写入 -
数据统计
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes()); -
处理刷盘以及主从同步
handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);