首页 > 其他分享 >RocketMQ存储篇一:概览

RocketMQ存储篇一:概览

时间:2023-01-01 00:13:55浏览次数:37  
标签:存储 mappedFile msgInner 概览 msg byteBuf writeInt fail RocketMQ

这篇文章记录RocketMQ大致的存储流程,源码为5.0.1-SNAPSHOT

入口从Broker模块的SendMessageProcessor开始
@SendMessageProcessor#sendMessage()
image

5.0中将putmessage中的一些校验逻辑下沉到了hook中

for (PutMessageHook putMessageHook : putMessageHookList) {
            PutMessageResult handleResult = putMessageHook.executeBeforePutMessage(msg);
            if (handleResult != null) {
                return CompletableFuture.completedFuture(handleResult);
            }
        }
  1. store服务关闭会fail
  2. slave as master模式中slave不可用会fail
  3. store不可写会fail
  4. 非重试topic超过最大长度会失败(Byte 127)
  5. topic长度超过255会失败
  6. msg body为空会fail
  7. pagecache busy会fail

然后会调用commitlog中的putMessage方法
image
重点看一下这里面的逻辑
@commitLog#asyncPutMessage(msg)

  1. 编码消息
    PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
    public PutMessageResult encode(MessageExtBrokerInner msgInner) {
        this.byteBuf.clear();
		//忽略不重要逻辑
        // 1 TOTALSIZE
        this.byteBuf.writeInt(msgLen);
        // 2 MAGICCODE
        this.byteBuf.writeInt(msgInner.getVersion().getMagicCode());
        // 3 BODYCRC
        this.byteBuf.writeInt(msgInner.getBodyCRC());
        // 4 QUEUEID
        this.byteBuf.writeInt(msgInner.getQueueId());
        // 5 FLAG
        this.byteBuf.writeInt(msgInner.getFlag());
        // 6 QUEUEOFFSET 这里先填占位,后面doappend的时候再写入真实的queueOffset
        this.byteBuf.writeLong(queueOffset);
        // 7 PHYSICALOFFSET, need update later;这里也是先占位,然后doappend的时候更新消息的真实offset
        this.byteBuf.writeLong(0);
        // 8 SYSFLAG
        this.byteBuf.writeInt(msgInner.getSysFlag());
        // 9 BORNTIMESTAMP
        this.byteBuf.writeLong(msgInner.getBornTimestamp());

        // 10 BORNHOST
        ByteBuffer bornHostBytes = msgInner.getBornHostBytes();
        this.byteBuf.writeBytes(bornHostBytes.array());

        // 11 STORETIMESTAMP
        this.byteBuf.writeLong(msgInner.getStoreTimestamp()); //后续更新

        // 12 STOREHOSTADDRESS
        ByteBuffer storeHostBytes = msgInner.getStoreHostBytes();  //..
        this.byteBuf.writeBytes(storeHostBytes.array());

        // 13 RECONSUMETIMES
        this.byteBuf.writeInt(msgInner.getReconsumeTimes());
        // 14 Prepared Transaction Offset
        this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset());
        // 15 BODY
        this.byteBuf.writeInt(bodyLength);
        if (bodyLength > 0)
            this.byteBuf.writeBytes(msgInner.getBody());

        // 16 TOPIC
        if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) {
            this.byteBuf.writeShort((short) topicLength);
        } else {
            this.byteBuf.writeByte((byte) topicLength);
        }
        this.byteBuf.writeBytes(topicData);

        // 17 PROPERTIES
        this.byteBuf.writeShort((short) propertiesLength);
        if (propertiesLength > 0)
            this.byteBuf.writeBytes(propertiesData);

        return null;
    }

这里表明了RocketMQ中消息的定义,组成结构为
image

  1. 对topic-queue加锁
    对putMessageLock加锁

  2. 获取mappedFile
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    关于commitlog、mappedFile等介绍,参见文章《》

  3. 写入mappedFile
    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
    核心方法,传入一个msg,回调函数以及上下文,将消息写入pagecache,然后调用callback中的doAppend方法写入

  4. 数据统计
    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

  5. 处理刷盘以及主从同步
    handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);

标签:存储,mappedFile,msgInner,概览,msg,byteBuf,writeInt,fail,RocketMQ
From: https://www.cnblogs.com/okdogngge/p/17017433.html

相关文章

  • containerd容器存储探究
    ContainerD容器目录结构探究启动容器作为开始,我们需要去启动一个容器。你可以通过命令行的方式来启动一个容器,例如:ctripulldocker.io/library/nginx:alpinectrc......
  • Flink Forward Asia 2022 主论坛概览
    2022 年 11 月 26-27 日,Flink Forward Asia(FFA)峰会成功举行。Flink Forward Asia 是由 Apache 软件基金会官方授权、由阿里云承办的技术峰会,是目前国内最大的 ......
  • 微人事中的存储过程
    前言说起存储过程,可能少数人跟我一样是第一次听说;没关系,以下这篇博客,我会简要地讲述存储过程的作用、使用以及项目中的应用1.存储过程作用和使用存储过程是由一系列......
  • 第九章《字符串》第3节:String类对象的存储方式
    ​大多数情况下,程序员都会用String类对象表示一个字符串。虚拟机在存储String类对象时会创建一个常量池,把符合条件的对象都存储到常量池中。所谓常量池是指一块用于保存对象......
  • 从USB存储设备启动树莓派
    设置USB启动  当前环境使用的树莓派版本为:RaspberryPi3B,并且已经在SD卡中烧录系统;1.使用SD卡烧录RaspberryPiOS。  可以只使用RaspberryPiOSLite,无桌面环境;......
  • 使用 Docker Hub 完美地存储 Helm 图表实战
    使用DockerHub完美地存储Helm图表实战​​Helm​​是Kubernetes的包管理器。它是一个开源容器编排系统。它通过提供一种简单的方法来定义、安装和升级复杂的Kubern......
  • 操作系统-分页管理存储的实现
    前言我们从前面的cache高数缓存中知道了,CPU的访问cache部分的过程(也就是下图的青色部分的过程),这个章节我们将会学习访问快表和缺页部分的处理.下面这个分......
  • 华为云对象存储,助力企业驶入“数据快车道”
    随着企业数据信息增加,企业服务器硬盘逐渐无法满足庞大的数据信息存储业务,越来越多的企业开始将数据开始向云上迁移,而企业用户对云上存储空间的访问通常是通过公网网络连接方......
  • 华为云对象存储OBS,助力企业降本增效
    信息化时代,互联网信息技术在企业发展中扮演着越来越重要的角色,对于网络信息数据的依赖程度也在不断提高。企业可以通过使用计算机进行信息管理,将各种资源有效利用起来。但是......
  • 解决“双十一”电商行业数据存储难题,华为云OBS值得期待
    叫外卖、网约车、刷短视频、双十一网购……这些我们再熟悉不过的生活场景,其实背后都离不开云服务、大数据存储等底层云计算技术的支持。简单来说,只要用户有刷视频、叫外卖等......