首页 > 其他分享 >【RocketMQ】消息的消费总结

【RocketMQ】消息的消费总结

时间:2023-09-19 14:33:29浏览次数:33  
标签:总结 消费 队列 Broker 拉取 消息 RocketMQ 延迟

消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求ConsumeRequest将消费请求提交到线程池处理,否则需要分批构建进行提交。

消息消费

在消息被提交到线程池后进行处理时,会调用消息监听器的consumeMessage进行消息消费,它返回消息的消费结果状态,状态有两种分别为CONSUME_SUCCESSRECONSUME_LATER

  • CONSUME_SUCCESS:表示消息消费成功。
  • RECONSUME_LATER:表示消费失败,稍后延迟重新进行消费。

处理消息消费结果

设置ackIndex

在消息消费完毕之后,会根据consumeMessage方法返回的结果状态进行处理,对ackIndex的值进行设置,ackIndex的值用于在下一步中处理消费失败的消息。

前面可知消费结果状态有以下两种:

  • CONSUME_SUCCESS:消息消费成功,此时ackIndex设置为消费的总消息个数 - 1,表示消息都消费成功。
  • RECONSUME_LATER:消息消费失败,延迟进行消费,此时ackIndex值为-1。

二、处理消费失败的消息

广播模式

广播模式下,如果消息消费失败,只将失败的消息打印出来不做其他处理。

集群模式

开启for循环,初始值为i = ackIndex + 1,结束条件为i < consumeRequest.getMsgs().size(),上面可知ackIndex有两种情况:

消费成功:ackIndex值为消息大小-1,此时ackIndex + 1的值等于消息的个数大小,不满足for循环的执行条件,相当于消息都消费成功,不需要进行失败的消息处理。
延迟消费:ackIndex值为-1,此时ackIndex+1为0,满足for循环的执行条件,从第一条消息开始遍历到最后一条消息,向Broker发送CONSUMER_SEND_MSG_BACK请求,如果发送成功Broker会根据延迟等级,放入不同的延迟队列中,到达延迟时间后,消费者将会重新进行拉取,如果发送失败,消费次数加1,并加入到失败消息列表中,稍后重新提交到消息消费线程池进行处理。

发送CONSUMER_SEND_MSG_BACK请求

延迟级别

RocketMQ的延迟级别对应的延迟时间常量定义如下:

public class MessageStoreConfig {
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}

延迟级别与延迟时间对应关系:
延迟级别0 ---> 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费
延迟级别1 ---> 延迟时间5s
延迟级别2 ---> 延迟时间10s
...
以此类推,最大的延迟时间为2h。

在向Broker发送CONSUMER_SEND_MSG_BACK请求的时候,会从上下文中获取设置的延迟级别(默认为0,也就是延迟1s),然后设置以下信息,向Broker发送请求:

  • 设置请求类型,请求类型为CONSUMER_SEND_MSG_BACK
  • 设置消费者组名称;
  • 设置消息在CommitLog中的偏移量;
  • 设置延迟级别;
  • 设置消息的ID;
  • 设置该消息的最大消费次数;

Broker对CONSUMER_SEND_MSG_BACK请求处理

Broker对CONSUMER_SEND_MSG_BACK类型的请求处理逻辑如下:

  1. 根据消费组获取该消费者组的订阅信息配置;
  2. 根据消费者组名称获取对应的重试主题;
  3. 从该消费者组的重试队列中随机选取一个队列;
  4. 根据消息在CommitLog中的偏移量从commitLog文件中获取消息内容;
  5. 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0:
    • 如果条件满足,表示需要把消息放入到死信队列DLQ中,此时从死信队列中随机选取一个队列;
    • 如果条件不满足,判断延迟级别是否为0,如果为0的话,会使用消息的消费次数作 + 3为新的延迟级别进行延迟消费;
  6. 新建消息对象MessageExtBrokerInner,设置消息的相关信息,此时相当于生成了一个全新的消息(会设置之前消息的ID),重新添加到CommitLog中,消息主题的设置有两种情况:
    • 达到了加入DLQ队列的条件,此时主题为DLQ主题(%DLQ% + 消费组名称),消息之后会添加到选取的DLQ队列中;
    • 未达到DLQ队列的条件,设置延迟级别,使用重试主题(%RETRY% + 消费组名称),之后将消息投递到此主题下的队列中;
  7. 调用asyncPutMessage存储消息;

asyncPutMessage方法中,会对延迟级别进行判断,如果延迟时间级别大于0,说明消息需要延迟消费,此时做如下处理:

  1. 获取延迟消息的主题名称,RocketMQ对延迟消息有一个默认的主题名称SCHEDULE_TOPIC_XXXX;
  2. 根据消息设置的延迟级别,获取对应的延迟队列,SCHEDULE_TOPIC_XXXX主题下,会根据延迟级别创建对应的消息队列,所以这一步会根据消息的延迟级别投递到对应的队列中;
  3. 在消息属性中,设置消息原本的主题名称和消息队列,然后将消息当前的Topic改成RMQ_SYS_SCHEDULE_TOPIC

总结
消费者在消息消费失败的时候,会向Broker发送CONSUMER_SEND_MSG_BACK请求,在请求处理中会判断消息的消费次数是否大于最大的消费次数,如果超过最大消费次数,会将消息投递到死信队列中。
如果未达到最大的消费次数,会根据请求中设置的延迟级别,重新生成一条消息,使用重试主题(%RETRY% + 消费组名称),并随机选取一个队列投递消息,延迟进行消费,不过消息不会立刻投递到队列中,在消息存储之前会对延迟级别进行判断,如果需要延迟消费,会使用RocketMQ默认创建的SCHEDULE_TOPIC_XXXX主题,先根据延迟级别将消息投递到对应的延迟队列中,然后由一个定时任务去检测这个主题下的消息,当消息到达延迟的时间后,再将消息取出投递到原本主题下的消息队列中,之后的流程就与普通消息的存储一致,将消息存入CommitLog中,再创建对应的ConsumeQueue数据,消费者就可以拉取到消息重新进行消费。

消费者在启动的时候,会处理订阅的Topic数据,如果是集群模式,会自动添加重试主题的订阅(%RETRY% + 消费组名称),然后就可以从重试主题中拉取到对应的重试消息进行消费。

更新拉取偏移量

以上步骤处理完毕后,首先调用removeMessage从处理队列中移除消息并返回拉取消息的偏移量,然后更新拉取偏移量。

RocketMQ消费模式分为广播模式和集群模式,广播模式下消费进度保存在每个消费者端,集群模式下消费进度保存在Broker端。

广播模式
广播模式对应的OffSetStore实现类为LocalFileOffsetStore,使用了一个ConcurrentMap类型的变量offsetTable存储每个消息队列对应的拉取偏移量,KEY为消息队列,value为该消息队列对应的拉取偏移量。
在更新拉取进度的时候,对offsetTable中的值进行更新,需要注意这里只是更新了offsetTable中的数据,并没有持久化到磁盘。

集群模式
集群模式对应的实现类为RemoteBrokerOffsetStore,更新进度与广播模式下的更新类似,都是只更新了offsetTable中的数据。

持久化的触发
消费者在启动的时候注册了定时任务,定时将消息拉取进度进行持久化,对于广播模式,将每个消息队列对应的拉取偏移量持久化到本地文件即可,对于集群模式,由于拉取进度保存在Broker端,所以需要向Broker发送请求进行持久化。

RocketMQ消息的消费相关源码可参考:【RocketMQ】【源码】消息的消费

标签:总结,消费,队列,Broker,拉取,消息,RocketMQ,延迟
From: https://www.cnblogs.com/shanml/p/17700669.html

相关文章

  • 嵌入式三级知识点总结最终章
    181. 操作系统为软件系统提供了多任务运行环境等等而不是板级支持包,BSP运行之前,调试工具不能够用,BSP调试分两步 最小系统和外围设备驱动程序调试。182. RAM访问速度要比ROM快很多。183. U-Boot能够支持多种体系结构的处理器但是每种结构有其自身的版本。184. VXworks(微内核)......
  • 全网最详细的OSPF原理总结,看这篇就够了!
    大家好,我的网工朋友。OSPF是一种基于链路状态的路由协议,也是专为IP开发的路由协议,直接运行在IP层上面。它从设计上保证了无路由环路。除此之外,IS-IS也是很常见的链路状态协议。为什么会出现OSPF?作为目前主流的IGP协议,OSPF主要是为了解决RIP的三大问题而出现的,比如:收敛很慢、容......
  • Glide源码阅读之工厂模式4总结
    工厂模式的应用比较多;变化形态也是各种各样。但经过这段时间的解读。大概可以用浓缩为1、不是使用new创建对象;2、没有明显build方法创建对象;3、带xxxFactory的几乎都满足工厂模式。当然第3点比较明显。如果不带xxxFactory等这样的标识那看看是否有implements、extends。而且内......
  • Glide源码阅读之策略模式4总结
    《Android源码设计模式解析与实践》定义策略模式定义了一系列的算法,并将每一个算法封装起来,而且使它们还可以相互替换。策略模式让算法独立于使用它的客户而独立变化使用场景针对同一类型问题的多种处理方式。仅仅是具体行为有差别时需要安全地封装多种同一类型的操作时出现同一抽......
  • 迭代器、生成器、模块和包知识点总结
    第一部分:迭代器 例1. for....in运行机制li=[1,2,3,4]#在列表中取值从第一个取到最后一个结束#foriinli:#print(i)#1,2,3,4i=0whilei<len(li):#索引#print(i)#输出索引0,1,2,3print(li[i])#取列表值i+=1print(i)#i=4的时......
  • BCB图像处理总结
    近期遇到了一下图像处理的问题,特总结一下 BCB截图一、画布(Canvas)Graphics::TBitmap*bmp=newGraphics::TBitmap;TCanvas*canvas=newTCanvas;HDChdc=GetDC(NULL);canvas->Handle=hdc;bmp->Width=Screen->Width;bmp->Height=Screen->Height;bmp->Canvas->CopyRect(Rect......
  • 一次访问Redis延时高问题排查与总结
    一次访问Redis延时高问题排查与总结https://mp.weixin.qq.com/s/f3dQIC4DBhWibyXQKBXrzg实战总结|一次访问Redis延时高问题排查与总结(续)https://mp.weixin.qq.com/s/trbGNYZPEfzaAMz6kZ_YKg翻译搜索复制......
  • Whisper + NemoASR + ChatGPT 实现语言转文字、说话人识别、内容总结等功能
    引言2023年,IT领域的焦点无疑是ChatGPT,然而,同属OpenAI的开源产品Whisper似乎鲜少引起足够的注意。Whisper是一款自动语音识别系统,可以识别来自99种不同语言的语音并将其转录为文字。如果说ChatGPT为计算机赋予了大脑,那么Whisper则为其赋予了耳朵。想象一下,在企业应用领域,我们能......
  • Java语言基础知识全总结
    一.Java的优点1.      跨平台性。一次编译,到处运行。Java编译器会将Java代码编译成能在JVM上直接运行的字节码文件,C++会将源代码编译成可执行的二进制代码文件,所以C++执行速度快2.      纯面向对象。Java所有的代码都必须在类中书写。C++兼具面向对象和面向过程的特......
  • 9月18日总结
    一.今天做了什么今天上午进行了传统工程实训,先在工程训练中心,老师向我们详细讲解了车铣刨磨的过程,然后老师演示了车床铣床磨床的使用,又演示了数控机床的使用。接着我们去科技楼,老师用半自动数控机床加工了一个葫芦,又用激光的机器切铁片和塑料片。最后用激光雕刻水晶送给我们班做......