首页 > 其他分享 >一文讲透 RocketMQ 消费者是如何负载均衡的

一文讲透 RocketMQ 消费者是如何负载均衡的

时间:2023-05-01 22:33:59浏览次数:38  
标签:负载 消费 消费者 队列 消息 均衡 讲透 RocketMQ

RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。

集群消费同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。

广播消费:当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

我们重点讲解下集群消费的消费流程 ,因为集群消费是使用最普遍的消费模式,理解了集群消费,广播消费也就能顺理成章的掌握了。

集群消费示例代码里,启动消费者,我们需要配置三个核心属性:消费组名订阅主题消息监听器,最后调用 start 方法启动。

消费者启动后,我们可以将整个流程简化成:

4 负载均衡

消费端的负载均衡是指将 Broker 端中多个队列按照某种算法分配给同一个消费组中的不同消费者

负载均衡是每个客户端独立进行计算,那么何时触发呢 ?

  • 消费端启动时,立即进行负载均衡;

  • 消费端定时任务每隔 20 秒触发负载均衡;

  • 消费者上下线,Broker 端通知消费者触发负载均衡。

负载均衡流程如下:

1、发送心跳

消费者启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(消息消费分组名称订阅关系集合消息通信模式客户端实例编号等信息)。

Broker 端在收到消费者的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量 consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量 channelInfoTable 中,为之后做消费端的负载均衡提供可以依据的元数据信息。

2、启动负载均衡服务

下图展示了按照主题负载均衡的代码片段:

负载均衡服务会根据消费模式为”广播模式”还是“集群模式”做不同的逻辑处理,这里主要来看下集群模式下的主要处理流程:

(1) 获取该主题下的消息消费队列集合;

(2) 查询 Broker 端获取该消费组下消费者 Id 列表;

(3) 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列;

这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range ,最后遍历整个 range 而计算出当前消费端应该分配到的记录。

(4) 分配到的消息队列集合与 processQueueTable 做一个过滤比对操作

消费者实例内 ,processQueueTable 对象存储着当前负载均衡的队列 ,以及该队列的消费快照。

标红的部分表示与分配到的消息队列集合互不包含,则需要将这些红色队列 Dropped 属性为 true , 然后从 processQueueTable 对象中移除。

绿色的部分表示与分配到的消息队列集合的交集,processQueueTable 对象中已经存在该队列。

黄色的部分表示这些队列需要添加到 processQueueTable 对象中,创建这些队列的消费快照。最后创建拉取消息请求列表,并将请求分发到消息拉取服务,进入拉取消息环节。

标签:负载,消费,消费者,队列,消息,均衡,讲透,RocketMQ
From: https://www.cnblogs.com/makemylife/p/17367118.html

相关文章

  • RocketMQ(一):基本概念
    RocketMQ官方文档地址:RocketMQ官网文档地址。一、什么是RocketMQRocketMQ是一款分布式、队列模型的消息中间件。二、RocketMQ的基本概念2.1、Topic-主题Topic是RocketMQ中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息,通过TopicName来做唯一标识和区......
  • 服务器之各种压力测试-网络、硬盘、redis、rocketmq等
    网络测试-iperf安装工具yuminstalliperf-y服务端iperf-s-p12345-i1-M客户端iperf-c服务端ip-p12345-i1-t50-w20K结果如下:......
  • Nginx七层反向代理和负载均衡
    Nginx七层反向代理和负载均衡介绍Nginx不仅是一个出色的web软件,其七层代理和负载均衡也是相当出色。Nginx做前端代理,当用户请求服务时,可以根据url进行判断,然后分配到不同的后台webserver上。Nginx的负载均衡实现原理:首先在http模块中配置使用upstream模块定义后台的webserver的池......
  • 久壳教你F5负载均衡设备故障怎么处理!
    2022年,我司技术工程师在进行第四季度设备巡检时,就在客户公司发现了一个负载均衡设备故障。l 确认问题 我们发现有一台F5负载均衡设备,状态出现异常,出现所有的业务健康检查均未通过、设备HA状态异常现象,如下图所示: 经确认,1.1.1.2与1.1.1.1两台设备互为主备:在故障出现前,1.1.1.......
  • 压力测试与负载测试
    什么是压力测试 顾名思义:压力测试,就是 被测试的系统,在一定的访问压力下,看程序运行是否稳定/服务器运行是否稳定(资源占用情况)比如:2000个用户同时到一个购物网站购物,这些用户打开页面的速度是否会变慢,或者网站是否会奔溃 做压力测试的常用工具做压力测试,一般要使用工具,人......
  • 【kafka】-分区-消费端负载均衡
    一.为什么kafka要做分区?因为当一台机器有可能扛不住(类比:就像redis集群中的redis-cluster一样,一个master抗不住写,那么就多个master去抗写),把一个队列的单一master变成多个master,即一台机器扛不住qps,那么我就用多台机器扛qps,把一个队列的流量均匀分散在多台机器上不就可以了么。 ......
  • nginx负载均衡
     //默认为轮询,权重默认值为1upstreamservers{server192.168.1.101:80weight=4down;//不参与随机server192.168.1.102:80weight=2;server192.168.1.102:80weight=1backup;//备用}ip_hash:根据客户端的IP地址转发同一台服务器,可以保存会话。least_co......
  • No qualifying bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' av
    2023-04-2418:50:39.372WARN26732---[main]ConfigServletWebServerApplicationContext:Exceptionencounteredduringcontextinitialization-cancellingrefreshattempt:org.springframework.beans.factory.BeanCreationException:Errorcreating......
  • 一文详解RocketMQ-Spring的源码解析与实战
    摘要:这篇文章主要介绍SpringBoot项目使用rocketmq-springSDK实现消息收发的操作流程,同时笔者会从开发者的角度解读SDK的设计逻辑。本文分享自华为云社区《RocketMQ-Spring:实战与源码解析一网打尽》,作者:勇哥java实战分享。RocketMQ是大家耳熟能详的消息队列,开源项目......
  • SpringCloud 微服务 负载均衡问题 坑死老子了!(铁大软工刘雪丰)
    fetch-registry:true是默认的,刚开始学,你会发现虽然能运行,但是会抛异常,所以我就改成false了。改为false确实不抛异常。但是!!!,如果用负载均衡改进代码,就必须设为true,因为它会报错:Noinstancesavailablefor...,连运行都运行不了。我在网上找了很久,防火墙,依赖重复,依赖版本等方法我都试......