首页 > 编程语言 >【RocketMQ】【源码】延迟消息实现原理

【RocketMQ】【源码】延迟消息实现原理

时间:2023-09-15 09:11:34浏览次数:42  
标签:msgInner TOPIC 源码 消息 offset msg RocketMQ 延迟

RocketMQ设定了延迟级别可以让消息延迟消费,延迟消息会使用SCHEDULE_TOPIC_XXXX这个主题,每个延迟等级对应一个消息队列,并且与普通消息一样,会保存每个消息队列的消费进度(delayOffset.json中的offsetTable):

public class MessageStoreConfig {
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}

延迟级别与延迟时间对应关系:
延迟级别0 ---> 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费
延迟级别1 ---> 延迟时间5s
延迟级别2 ---> 延迟时间10s
...
以此类推,最大的延迟时间为2h。

延迟消息

使用延迟消息时,只需设定延迟级别即可,Broker在存储的时候会判断是否设定了延迟级别,如果设置了延迟级别就按延迟消息来处理,由【消息的存储】文章可知,消息存储之前会进入到asyncPutMessage方法中,延迟消息的处理就是在这里做的,处理逻辑如下:

  1. 判断消息的延迟级别是否超过了最大延迟级别,如果超过了就使用最大延迟级别;

  2. 获取RMQ_SYS_SCHEDULE_TOPIC,它是在TopicValidator中定义的常量,值为SCHEDULE_TOPIC_XXXX:

    public class TopicValidator {
        // ...
        public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    }
    
  3. 根据延迟级别选取对应的队列,一般会把相同延迟级别的消息放在同一个队列中;

  4. 将消息原本的TOPIC和队列ID设置到消息属性中;

  5. 更改消息队列的主题为RMQ_SYS_SCHEDULE_TOPIC,所以延迟消息的主题最终被设置为RMQ_SYS_SCHEDULE_TOPIC,会将消息投递到延迟队列中;

public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // ...
        // 获取事务类型
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        // 如果未使用事务或者提交事务
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 判断延迟级别
            if (msg.getDelayTimeLevel() > 0) {
                // 如果超过了最大延迟级别
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // 获取RMQ_SYS_SCHEDULE_TOPIC
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 根据延迟级别选取对应的队列
                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // 将消息原本的TOPIC和队列ID设置到消息属性中
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 设置SCHEDULE_TOPIC
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        // ...
    }
}

延迟消息被投递到延迟队列中之后,会由定时任务去处理队列中的消息,接下来就去看下定时任务的处理过程。

注册定时任务

Broker启动的时候会调用ScheduleMessageServicestart方法,start方法中为不同的延迟级别创建了对应的定时任务来处理延迟消息,然后从offsetTable中获取当前延迟等级对应那个消息队列的消费进度,如果未获取到,则使用0,从队列的第一条消息开始处理,然后创建定时任务DeliverDelayedMessageTimerTask,可以看到首次是延迟1000ms执行:

public class ScheduleMessageService extends ConfigManager {
    // 首次执行延迟的时间
    private static final long FIRST_DELAY_TIME = 1000L;
    public void start() {
        if (started.compareAndSet(false, true)) {
            super.load();
            this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
            if (this.enableAsyncDeliver) {
                this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
            }
            // 遍历所有的延迟级别
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) { // 如果获取的消费进度为空
                    offset = 0L; // 默认为0,从第一条消息开始处理
                }
                if (timeDelay != null) {
                    if (this.enableAsyncDeliver) {
                        this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                    }
                    // 为每个延迟级别创建对应的定时任务
                    this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                }
            }
            // ...
        }
    }
}

运行定时任务

DeliverDelayedMessageTimerTaskScheduleMessageService的内部类,它实现了Runnable接口,在run方法中调用了executeOnTimeup方法来处理延迟消息:

public class ScheduleMessageService extends ConfigManager {
    class DeliverDelayedMessageTimerTask implements Runnable {
        @Override
        public void run() {
            try {
                if (isStarted()) {
                    // 执行任务
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
            }
        }
    }
}

executeOnTimeup方法的处理逻辑如下:

  1. 根据主题名称以及延迟等级获取ConsumeQueue,如果获取为空,会重新创建一个任务提交到线程池中,延迟时间为DELAY_FOR_A_WHILE,延迟一段时间后重新执行;
  2. 根据当前延迟消息队列的消费进度,从ConsumeQueue获取数据,如果获取为空,处理同上,重新创建一个任务延迟一段时间之后重新执行;
  3. 因为队列中的消息是按写入顺序进行存储的,所以根据偏移量获取到的第一条消息开始,向后处理:
    (1)获取消息存储时间戳
    (2)根据延迟等级和消息的存储时间戳计算消息的到期时间
    (3)获取当前时间,使用当前时间减去消息的到期时间
    • 如果值大于0,表示还未到达指定的延迟时间,需要继续等待,重新创建一个任务延迟一段时间之后重新执行;
    • 如果值小于等于0,表示已经到达了指定的延迟时间,会调用messageTimeup对消息处理,恢复消息原本的Topic;
  4. 根据是否开启了异步来决定同步投递消息还是异步投递消息,这一步会将消息投递到原本Topic中的消息队列,之后与普通消息的存储流程一致;
public class ScheduleMessageService extends ConfigManager {
    class DeliverDelayedMessageTimerTask implements Runnable {
        public void executeOnTimeup() {
            // 根据主题名称以及延迟等级获取ConsumeQueue
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
            // 如果ConsumeQueue为空,新建定时任务等待下次执行
            if (cq == null) {
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
                return;
            }
            // 根据偏移量从ConsumeQueue获取数据
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ == null) {
                // ...

                // 如果获取为空,新建定时任务等待下次执行
                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
                return;
            }
            long nextOffset = this.offset;
            try {
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // 开始处理延迟消息
                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 获取消息在CommitLog中的偏移量
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    // 消息大小
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // tag哈希值
                    long tagsCode = bufferCQ.getByteBuffer().getLong();
                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            // 获取消息存储时间戳
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            // 根据延迟等级和消息的存储时间计算消息的到期时间
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }
                    // 获取当前时间
                    long now = System.currentTimeMillis();
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // 计算消息的到期时间
                    long countdown = deliverTimestamp - now;
                    // 如果大于0,表示还未到达指定的延迟时间,需要继续等待
                    if (countdown > 0) {
                        // 新建定时任务等待下次执行
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }
                    // 走到这里,表示已经到了消息的延迟时间,从CommitLog取出消息
                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                    if (msgExt == null) {
                        continue;
                    }
                    // 处理消息,这里会恢复消息原本的Topic
                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                            msgInner.getTopic(), msgInner);
                        continue;
                    }

                    boolean deliverSuc;
                    // 投递消息到原本的主题中
                    if (ScheduleMessageService.this.enableAsyncDeliver) {
                        // 异步投递
                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                    } else {
                        // 同步投递
                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                    }
                    if (!deliverSuc) {
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }
                }
                // 计算下一条消息的偏移量
                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
            } catch (Exception e) {
                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
            } finally {
                bufferCQ.release();
            }

            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
        }

    }

    private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody()); // 设置消息体
        msgInner.setFlag(msgExt.getFlag()); // 设置falg
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        // ...
        msgInner.setWaitStoreMsgOK(false);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        // 恢复原本的Topic
        msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

        String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
        int queueId = Integer.parseInt(queueIdStr);
        msgInner.setQueueId(queueId);

        return msgInner;
    }
}

标签:msgInner,TOPIC,源码,消息,offset,msg,RocketMQ,延迟
From: https://www.cnblogs.com/shanml/p/17701063.html

相关文章

  • 每日一题:吃透大文件上传问题(附可运行的前后端源码)
    https://www.cnblogs.com/never404/p/17699440.html 在日常开发中,文件上传是常见的操作之一。文件上传技术使得用户可以方便地将本地文件上传到Web服务器上,这在许多场景下都是必需的,比如网盘上传、头像上传等。但是当我们需要上传比较大的文件的时候,容易碰到以下问题:上传时......
  • springboot整合rocketMQ——消费者
    依赖<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/......
  • 基于springboot学院宿舍管理系统-计算机毕业设计源码+LW文档
    摘要随着信息技术的发展,管理系统越来越成熟,各种企事业单位使用各种类型的管理系统来提高工作效率,从而降低手工劳动的弊端。我国政府一直以来都非常重视教育事业的发展,随着学生人数增加,学校对宿舍学生信息管理也变的困难。因此,高校提出通过开发宿舍管理系统来优化管理方案,对宿舍信......
  • 个性404页面HTML源码分享
    分享的HTML与上图内容一样,需要修改的小伙伴可以自行修改内容。<style><!--@importurl("https://fonts.googleapis.com/css?family=Share+Tech+Mono|Montserrat:700");*{margin:0;padding:0;border:0;font-size:100%;font:inherit;vertical-align:baseline......
  • 《Python数据处理》PDF电子书+源码
    本书采用基于项目的方法,介绍用Python完成数据获取、数据清洗、数据探索、数据呈现、数据规模化和自动化的过程。主要内容包括:Python基础知识,如何从CSV、Excel、XML、JSON和PDF文件中提取数据,如何获取与存储数据,各种数据清洗与分析技术,数据可视化方法,如何从网站和API中提取数据。下......
  • 《Python编程快速上手——让繁琐工作自动化》 原版电子书PDF+源码
    第11章从Web抓取信息第12章处理Excel电子表格第13章处理PDF和Word文档第14章处理CSV文件和JSON数据第15章保持时间、计划任务和启动程序第16章发送电子邮件和短信第17章操作图像第18章用GUI自动化控制键盘和鼠标附录A安装第三方模块附录B运行程序附录C习题答案下载:https......
  • Yolov5——训练目标检测模型详解(含完整源码)
    项目的克隆打开yolov5官网(官网地址),下载yolov5的项目:环境的安装(免额外安装CUDA和cudnn)打开anaconda的终端,创建新的名为yolov5的环境(python选择3.8版本):condacreate-nyolov5python=3.8执行如下命令,激活这个环境:condaactivateyolov5打开pytorch的官网,选择自己显卡......
  • 【语音处理】语音信号特技处理(延时、混响、滤波)附Matlab源码
    ✅作者简介:热爱科研的Matlab仿真开发者,修心和技术同步精进,matlab项目合作可私信。......
  • [SpringSecurity5.6.2源码分析八]:SecurityContextPersistenceFilter
    前言• 当我们不在其他线程而就在容器创建的线程中使用SecurityContextHolder.getContext()获取SecurityContext的时候,正常都能获取到• SecurityContext默认是放在线程中的,所以说在某个地方一定将SecurityContext放到线程中,而这个类就是SecurityContextPersistenceFilter1、Secu......
  • springboot智能3D人体导医系统源码
    智能3D人体导医系统源码医院智能导诊系统是在医疗中使用的引导患者自助就诊挂号,在就诊的过程中有许多患者不知道需要挂什么号,要看什么病,通过智能导诊系统,可输入自身疾病的症状表现,或选择身体部位,在经由智慧导诊系统多维度计算,精准推荐科室,引导患者挂号就诊,实现科学就诊,不再担心挂......