首页 > 其他分享 >【RocketMQ】RocketMQ 5.0新特性(二)- Pop消费模式

【RocketMQ】RocketMQ 5.0新特性(二)- Pop消费模式

时间:2023-10-13 10:13:54浏览次数:39  
标签:5.0 消费 消费者 队列 Broker Pop ACK 消息 RocketMQ

Pop模式消费和消息粒度负载均衡

在RocketMQ 5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式。

  • Pull模式:消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从Broker拉取消息之后加入的,所以Pull模式下消费需要不断主动从Broker拉取消息。
  • Push模式:需要注册消息监听器,当有消息到达时会通过回调函数进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,底层依旧是消费者从Broker拉取数据然后触发回调函数进行消息消费,只不过不需要像Pull模式一样不断判断是否有消息到来。


注:图片来自RocketMQ官方文档

不过不管是Pull模式还是Push模式,在集群模式下,一个消息队列只能分配给同一个消费组内的某一个消费者进行消费,所以需要进行Rebalance负载均衡为每个消费者分配消息队列之后才可以进行消息消费。
Rebalance的工作是在每个消费者端进行的,消费端负责的工作太多,除了负载均衡还有消费位点管理等功能,如果新增一种语言的支持,就需要重新实现一遍对应的业务逻辑代码。

除此以外,在RocketMQ 5.0以前负载均衡是以消息队列为维度为每个消费者分配的,一个消息队列只能分给组内一个消费者消费,所以会存在以下问题:

(1)队列只能分给组内一个消费者消费,也就无法通过扩展消费者的数量来提升消费能力;
(2)消息队列数量与消费者数量比例不均衡时,可能会导致某些消费者没有消息队列可以分配或者某些消费者承担过多的消息队列,分配不均匀;
(3)如果某个消费者hang主,会导致分配到该消费者的消息队列中的消息无法消费,导致消息积压;

在RocketMQ 5.0增加了Pop模式消费,将负载均衡、消费位点管理等功能放到了Broker端,减少客户端的负担,使其变得轻量级,并且5.0之后支持消息粒度的负载均衡。

消息粒度负载均衡

对于PushConsumer和SimpleConsumer类型的消费者,默认且仅使用消息粒度负载均衡策略。

注:图片来自RocketMQ官方文档

消息粒度负载均衡策略中,同一消费组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给组内多个消费者共同消费。

消息粒度负载均衡策略保证同一个队列的消息可以被组内多个消费者共同处理,但是该策略使用的消息分配算法结果是随机的,不能指定消息被哪一个特定的消费者处理。当消费者获取到某条消息后,服务端会对该消息加锁,保证该消息对其他消费者不可见,直到消息消费成功或者超时,所以多个消费者同时消费同一个消息队列中的消息,服务端也可以保证消息不会被多个消费者重复消费。

消息粒度负载均衡策略适用于绝大多数在线处理的业务场景。

Pop消息消费

首先客户端(消费者)向服务端(Broker)发送Pop请求,Broker端收到请求后以Pop模式获取消息,之后返回给客户端,客户端消费消息成功之后,向Broker发送ACK请求确认消息消费成功。

当POP出一条消息之后,这条消息就会在一段时间内不可见,在这个时间段内,这条消息不会再被POP出来,如果在这个期间未能收到该消息的ACK请求,过了这个不可见的时间之后,消息就会恢复可见状态,重新被消费。

POP的消费位点由Broker保存和控制,并且POP模式可以使多个消费者端消费同一个消息队列中的消息,消费者端不再需要在本地做负载均衡分配消息队列,只需要调用服务端提供的POP接口获取消息进行消费即可,即便某个消费者hang住,其他消费者依旧可以继续消费队列中的数据,不会造成消息堆积。

POP消息在Broker端的实现

  1. Broker端在处理POP请求时,先在队列维度加锁,保证同一时间只有一个消费者可以从该队列中获取消息;

  2. Broker端会从队列中获取一批消息,并构建这批消息对应的CheckPoint信息保存在Broker中,之后会与ACK的消息进行匹配;
    CheckPoint主要包括消息的 Topic,ConsumerGroup,QueueId,offset,POPTime,msgCout,reviveQueueId等信息。

  3. CheckPoint会优先保存在内存中,如果在一段时间内收到了客户端的ACK消息,就会将对应的CheckPoint清除,并更新消费进度;

  4. 对于一段时间内为收到ACK消息的CheckPoint,会将其从内存中删除,然后发送到延时主题SCHEDULE_TOPIC_XXXX中,到达延时时间之后,消息会再被转发到REVIVE_TOPIC(会使用REVIVE_LOG_ + 集群名称作为主题)中,有一个线程去处理REVIVE_TOPIC中的数据,将里面的消息拉取放入到一个
    MAP中,如果后续收到对应的ACK消息,则会更新REVIVE_TOPIC主题中的消费位点标识消息消费完成,如果过了一定时间依旧未收到对应的ACK消息,会查找这个CheckPoint对应的真实消息,将其放入到重试队列中,等待客户端消费,所以消费者消费的时候有一定概率可以消费到重试队列中的消息。

由于一个消息队列中的消息可以被多个消费者消费,如果某个消费者在消费某条消息之后一直未发生ACK消息,那么Broker是如何管理消费进度的,比如队列1中有1、2、3、4、5条消息,此时有三个消费者1、2、3,分别分配到了队列中的1、2、3条消息,此时消费者1已经对消息1ACK完毕,消费者3也对消息3ACK完毕,消费者2一直未ACK消息2,那么Broker如何设置消费进度?

个人认为,在一段时间内消息2对应的CheckPoint未匹配到对应的ACK消息,为了保证消费可以继续向后消费消息,应该会推进消费进度跳过这个消息,对于消息2,会按照超时处理逻辑,将其对应的CheckPoint先放入延时队列,再放入REVIVE_TOPIC中,之后等待ACK,如果之后一直还未收到ACK再将其放入重试队列,等待重新消费。

参考
RocketMQ官方文档

RocketMQ 5.0 POP 消费模式探秘

标签:5.0,消费,消费者,队列,Broker,Pop,ACK,消息,RocketMQ
From: https://www.cnblogs.com/shanml/p/17726682.html

相关文章

  • 遥遥领先!青否数字人直播系统5.0发布,支持真人接管实时驱动!
    副标题:口播视频批量制作+7*24小时直播全套解决方案。正文:青否数字人SaaS系统5.0正式发布,王炸更新!提供口播视频批量制作+7*24小时直播全套解决方案。同时直播间支持真人开麦/输入文字选择音色接管,实时驱动直播间数字人回复。7*24小时直播青否数字人客户端选择克隆好的数字人主播,克隆......
  • 【RcoketMQ】RcoketMQ 5.0新特性(一)- Proxy
    为了向云原生演进,提高资源利用和弹性能力,RcoketMQ在5.0进行了架构的调整与升级,先来看新特性之一,增加了Proxy层。增加Proxy代理层计算存储分离计算存储分离是一种分层架构,将计算层与存储层分开。计算层指的是一些消耗计算资源的功能模块比如协议解析、消费管理等,存储指的是数据......
  • PCIe 5.0 SSD四合一!峰值带宽高达64GB/s
    华硕低调发布了新一代HyperM.2x16Gen5扩展卡,可以单卡安装最多四块PCIe5.0x4SSD。它采用了PCIe5.0x16系统级接口和服务器级PCB,可以提供64GB/s的峰值带宽,每块都可以跑到16GB/s。当然目前还没有能做到这一点的SSD,而考虑到带宽损耗,实际上也不可能做到。四块SSD均支持2242......
  • 【RocketMQ】RocketMQ存储结构设计
    CommitLog生产者向Broker发送的消息,会以顺序写的方式,写入CommitLog文件,CommitLog文件的根目录由配置参数storePathRootDir决定,默认每一个CommitLog的文件大小为1G,如果文件写满会新建一个CommitLog文件,以该文件中第一条消息的偏移量为文件名,小于20位用0补齐:比如第一个文件中第一......
  • 华硕推出Hyper M.2 x16 Gen5扩展卡:可装四个PCIe 5.0 SSD
    华硕在官网上架了HyperM.2x16Gen5扩展卡,是原有HyperM.2x16Gen4扩展卡的迭代产品,在带宽上实现了翻倍,可达到512Gbps。据介绍,HyperM.2x16Gen5扩展卡可安装四个PCIe5.0SSD,支持M.22242、2260、2280和22110规格的产品,这些M.2插槽都带有Q-Latch便捷卡扣,便于用户安装和拆卸......
  • popen用法
    函数名中的popen是一个标准C库函数,用于创建一个管道并启动另一个进程来执行一个shell命令。popen返回一个文件指针,可以用于读取或写入子进程的标准输入或输出流。#include<stdio.h>FILE*popen(constchar*command,constchar*mode);intpclose(FILE*stream);......
  • 【RocketMQ】Dledger模式下的日志复制
    RocketMQ在开启Dledger时,使用DLedgerCommitLog,其他情况使用的是CommitLog来管理消息的存储。在Dledger模式下,消息写入时Leader节点还需要将消息转发给Follower节点,有过半的节点响应成功,消息才算写入成功。Leader消息写入Dledger下有DLedgerMemoryStore(基于内存存储)和DLedgerMmap......
  • 2023-10-4 使用Arduino为esp8266烧录ps4 5.05适合的固件
    2023-10-4使用Arduino为esp8266烧录ps45.05适合的固件其实这是个伪需求,但都在我琢磨所有之后才发现,goldhen2.1之后的大版本对于505来说都是没什么实质意义,反而会引起死机等情况。想玩的游戏等降级补丁即可。当然本文记录如何通过arduino烧录你想要的插件1.解决:1-1.A......
  • PHP_POP链
    POP链审计做一题简单的POP链的题目来学习一下POP链的构造。<?php//flagisinflag.phphighlight_file(__FILE__);error_reporting(0);classModifier{private$var;publicfunctionappend($value){include($value);echo$flag;}......
  • Adobe_Photoshop_2024_25.0.0.37图文安装教程及下载
    Adobe_Photoshop_2024正式版,拥有之前beta版本的全部功能,包括但不限于内置AI绘图,一键抠图、移除工具、悬浮工具栏、图像扩展、填充式生成、调整预设等等。尤其是“生成式填充”和“生成式扩展”。除此之外,PS2024正式版还内置了NeuralFilters神经AI滤镜,这款插件用于图片的处理,它......