首页 > 其他分享 >RocketMQ - 消费者进度保存机制

RocketMQ - 消费者进度保存机制

时间:2023-03-01 09:12:50浏览次数:30  
标签:持久 MessageQueue void Broker 保存 进度 位点 final RocketMQ

RocketMQ设计了远程位点管理和本地位点管理两种位点管理方式。集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中;广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中


/**
 * Offset store interface
 */
public interface OffsetStore {
    /**
     * 加载位点信息
     */
    void load() throws MQClientException;

    /**
     * 更新缓存位点信息
     */
    void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);

    /**
     * 读取本地位点信息
     *
     * @return The fetched offset
     */
    long readOffset(final MessageQueue mq, final ReadOffsetType type);

    /**
     * 持久化全部队列的位点信息
     */
    void persistAll(final Set<MessageQueue> mqs);

    /**
     * 持久化某一个队列的位点信息
     */
    void persist(final MessageQueue mq);

    /**
     * 删除某一个队列的位点信息
     */
    void removeOffset(MessageQueue mq);

    /**
     * 复制一份缓存位点信息
     * @return The cloned offset table of given topic
     */
    Map<MessageQueue, Long> cloneOffsetTable(String topic);

    /**
     * 将本地消费位点持久化到Broker中
     * @param mq
     * @param offset
     * @param isOneway
     */
    void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException;
}

客户端消费进度保存也叫消费进度持久化,开源RocketMQ 4.2.0支持定时持久化和不定时持久化两种方式

定时持久化位点实现方法是org.apache.rocketmq.client.impl.factory.MQClientInstance.startScheduledTask()

定时持久化位点逻辑是通过定时任务来实现的,在启动程序10s后,会定时调用持久化方法MQClientInstance.this.persistAllConsumerOffset(),持久化每一个消费者消费的每一个MessageQueue的消费进度。

不定时持久化也叫Pull-And-Commit,也就是在执行Pull方法的同时,把队列最新消费位点信息发给Broker,具体实现代码在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage()方法中

该方法中有两处持久化位点信息
第一处,在拉取完成后,如果拉取位点非法,则此时客户端会主动提交一次最新的消费位点信息给Broker,以便下次能使用正确的位点拉取消息,该处更新位点信息

第二处,在执行消息拉取动作时,如果是集群消费,并且本地位点值大于0,那么把最新的位点上传给Broker

代码中通过commitOffsetEnable、sysFlag两个字段表示是否可以上报消费位点给Broker。在执行Pull请求时,将sysFlag作为网络请求的消息头传递给Broker,Broker中处理该字段的逻辑在org.apache.rocketmq.broker.processor.PullMessageProcessor.processRequest()方法中

hasCommitOffsetFlag:Pull请求中的sysFlag参数,是决定Broker是否执行持久化消费位点的一个因素。
brokerAllowSuspend:Broker是否能挂起。如果Broker是挂起状态,将不能持久化位点。
storeOffsetEnable:True表示Broker需要持久化消费位点,False则不用持久化位点。

标签:持久,MessageQueue,void,Broker,保存,进度,位点,final,RocketMQ
From: https://www.cnblogs.com/vipsoft/p/17162770.html

相关文章

  • 金蝶VB插件--单据保存前检查
    金蝶VB插件--单据保存前检查--转自https://bbs.csdn.net/topics/360161119?list=lzvb代码引用k3classEvents'-----以下是代码'实现一个很简单的功能'--单据体分录[F......
  • 目标责任成本数据无法保存的情况反查-案例:英德路灯项目
    起因事件的起因源于“英德路灯”项目的无法保存。会提示其他支出板块超额了。已签合同大于支出,存在没有合同走账的情况。解决方案从入口查:查找对应板块所有汇总板块......
  • 转载:pageOffice插件 springboot实现服务器上Word文档在线打开编辑保存
    pageOffice插件springboot实现服务器上Word文档在线打开编辑保存需求:在oa系统上,想实现在线,服务器上doc,docx文档,在web打开,编辑。编辑后,可以再同步保存到服务器端。......
  • RocketMQ - 消费者Rebalance机制
    客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费的......
  • 学习进度
    今天学习了5小时的Java,总代码量1368行,我对课上的英语单词链进行了深度的练习。除此之外,在晚上的时间,我对androidstudio进行了学习。我安装了sqlite数据库,并且实现了在Andro......
  • C# SmoothProgressBar自定义进度条控件
    usingSystem;usingSystem.Collections;usingSystem.ComponentModel;usingSystem.Drawing;usingSystem.Data;usingSystem.Windows.Forms;namespaceSmoothProgres......
  • 用dask并行把大量文本数据读入numpy并分批保存
    导入包importnumpyasnpimportosimportdask看看文件格式和file_list=os.listdir('train_data')print(len(file_list))print(file_list[:100])delayed读......
  • mq超时异常org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC:
       mq生产环境正常生产和消费都挺稳定的,99.999%应该都没问题的,比较稳定。今天刚好碰到过一例因为写超时导致异常问题。   2023-02-2321:19:58.449TID:8b......
  • RocketMQ 5.0 vs 4.9.X 图解架构对比
    本文作者:李伟,ApacheRocketMQCommitter,RocketMQPython客户端项目Owner,ApacheDorisContributor,腾讯云数据库开发工程师。01RocketMQ4.9.X架构在4.9.X中每个组件和......
  • 【RocketMQ】Dledger日志复制源码分析
    消息存储在【RocketMQ】消息的存储一文中提到,Broker收到消息后会调用CommitLog的asyncPutMessage方法写入消息,在DLedger模式下使用的是DLedgerCommitLog,进入asyncPutMess......