首页 > 其他分享 >RocketMQ存储篇三:追加消息

RocketMQ存储篇三:追加消息

时间:2023-01-05 21:44:42浏览次数:38  
标签:存储 currentPos fileSize AppendMessageResult messageExt 追加 byteBuffer RocketMQ res

第一篇介绍了存储的概览,定位到消息追加消息处细看一下
获取mappedFile ——> 追加消息 ——> 处理结果
image

@DefaultMappedFile#appendMsgInner()

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
        assert messageExt != null;
        assert cb != null;

        int currentPos = WROTE_POSITION_UPDATER.get(this);

        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = appendMessageBuffer().slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
                // traditional batch message
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBatch) messageExt, putMessageContext);
            } else if (messageExt instanceof MessageExtBrokerInner) {
                // traditional single message or newly introduced inner-batch message
				// 使用回调函数来处理append消息的过程
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

@DefaultAppendMessageCallback#doAppend()
处理逻辑

  1. 根据bytebuffer中的wroteposition以及mappedfile开头获取物理offset
    long wroteOffset = fileFromOffset + byteBuffer.position();
  2. 构造msgId
  3. 判断mappedfile是否有足够的空间写msg, 如果没有,就返回一个mappedFile用完的状态,重新分配mappedfile并写入,如果空间足够,就修改encode阶段消息中的一些假值并返回PUT_OK状态
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank)
  4. 根据put的状态来做处理(刷盘以及ha)

那么,消息到底追加到了哪里呢?

标签:存储,currentPos,fileSize,AppendMessageResult,messageExt,追加,byteBuffer,RocketMQ,res
From: https://www.cnblogs.com/okdogngge/p/17027809.html

相关文章

  • 一步一步学爬虫(4)数据存储之MySQL存储
    (一步一步学爬虫(4)数据存储之MySQL存储)4.4MySQL存储  关系型数据库是基于关系模型的数据库,而关系模型是通过二维表来保存的,所以它的存储方式就是行列组成的表,每一列是......
  • Apache RocketMQ 斩获 InfoQ 2022 年度十大开源新锐项目
    以“深入数字经济·洞见技术价值”为主题的【InfoQ2022中国技术力量年终榜单】正式公布获奖名单。其中,ApacheRocketMQ以其卓越的易用性、社区活跃性、成熟度、产品优越性......
  • Apache RocketMQ 斩获 InfoQ 2022 年度十大开源新锐项目
    以“深入数字经济·洞见技术价值”为主题的【InfoQ2022中国技术力量年终榜单】正式公布获奖名单。其中,ApacheRocketMQ以其卓越的易用性、社区活跃性、成熟度、产品优越......
  • 洛谷P4017 最大食物链计数(追加上一篇文章的第二种方法 )
    上一篇不是说第二种方法RE了吗?害,其实,是我的问题我粗心了。------------------------------------------------------因为    这两处的循环应该到n而不是m。......
  • 一步一步学爬虫(4)数据存储之文本存储
    (一步一步学爬虫(4)数据存储之文本存储)4.1TXT纯文本文件存储  将数据保存到TXT文本的操作非常简单,而且TXT文本几乎兼容任何平台,但是这有个缺点,那就是不利于检索。所......
  • 一步一步学爬虫(4)数据存储之JSON存储
    (一步一步学爬虫(4)数据存储之JSON存储)4.2方便灵活的JSON文本文件存储  JSON,全称为JavaScriptObjectNotation,也就是JavaScript对象标记,它通过对象和数组的组合......
  • 一步一步学爬虫(4)数据存储之CSV文件存储
    (一步一步学爬虫(4)数据存储之CSV文件存储)4.3CSV文件存储CSV,全称Comma-SeparatedValues,中文叫做逗号分隔值或字符分隔值,其文件以纯文本形式存储表格数据。CSV文件是一个......
  • C#实现文件二进制存储
    privatestaticBinaryFormatterTransfer=newBinaryFormatter();publicintBinaryFileSav(){//-----二进制文件写入并存储......
  • 对象存储服务MinIO的基本用法
    本文重要内容主要介绍MinIO与SpringBoot项目整合时的基本用法,没有涉及较多的原理剖析,更注重于应用实践(功能实现)。与SpringBoot整合的完整实例(代码可直接复用):https://git......
  • 一文详解RocketMQ的存储模型
    摘要:RocketMQ优异的性能表现,必然绕不开其优秀的存储模型。本文分享自华为云社区《终于弄明白了RocketMQ的存储模型》,作者:勇哥java实战分享。RocketMQ优异的性能表现,......