首页 > 其他分享 >十四、消费模式和负载均衡策略

十四、消费模式和负载均衡策略

时间:2023-07-23 20:33:09浏览次数:37  
标签:负载 消费 消费者 组内 模式 消息 均衡 十四 策略

消费模式

RocketMQ消费者都是以消费组去消费消息。如果多个消费者设置了相同的Consumer Group,我们认为这些消费者在同一个消费组内。

在 Apache RocketMQ 有两种消费模式,分别是:

集群消费模式:当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。

广播消费模式:当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。

集群消费模式适用于每条消息只需要被处理一次的场景,也就是说整个消费组会Topic收到全量的消息,而消费组内的消费分担消费这些消息,因此可以通过扩缩消费者数量,来提升或降低消费能力,具体示例如下图所示,是最常见的消费方式。

广播消费模式适用于每条消息需要被消费组的每个消费者处理的场景,也就是说消费组内的每个消费者都会收到订阅Topic的全量消息,因此即使扩缩消费者数量也无法提升或降低消费能力,具体示例如下图所示。

消费模式由MessageModel 枚举表示:

public enum MessageModel {
    BROADCASTING("BROADCASTING"),
    CLUSTERING("CLUSTERING");

    private String modeCN;

    private MessageModel(String modeCN) {
        this.modeCN = modeCN;
    }

    public String getModeCN() {
        return this.modeCN;
    }
}

BROADCASTING是广播模式,CLUSTERING是集群模式。从DefaultMQPushConsumer构造函数可看出DefaultMQPushConsumer默认的消费模式是集群模式。

负载均衡策略

集群模式下,同一个消费组内的消费者会分担收到的全量消息,这里的分配策略是怎样的?如果扩容消费者是否一定能提升消费能力?

Apache RocketMQ 提供了多种集群模式下的分配策略,包括平均分配策略、机房优先分配策略、一致性hash分配策略等,可以通过如下代码进行设置相应负载均衡策略:

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());

默认的分配策略是平均分配,这也是最常见的策略。平均分配策略下消费组内的消费者会按照类似分页的策略均摊消费。

在平均分配的算法下,可以通过增加消费者的数量来提高消费的并行度。比如下图中,通过增加消费者来提高消费能力。

但也不是一味地增加消费者就能提升消费能力的,比如下图中Topic的总队列数小于消费者的数量时,消费者将分配不到队列,即使消费者再多也无法提升消费能力。

接下来看下平均队列负载均衡策略,首先将OldVersionConsumer的消费策略设置为集群模式

  consumer.setMessageModel(MessageModel.CLUSTERING);

新建Topic时默认读写队列各为16个,为了比较,发送消息时发送16条消息。在IDEA中启动两个消费者并发送16条消息:

两个消费者各消费8条数据。

 

再增加一个消费者,现在有三个消费者,重新发送16条消息:

第一个消费者消费了6条数据,第二个和第三个消费者各消费5条数据。

标签:负载,消费,消费者,组内,模式,消息,均衡,十四,策略
From: https://www.cnblogs.com/shigongp/p/17575805.html

相关文章

  • nginx+tomcat部署均衡+虚拟IP配置
    一、配置java环境更新数据源sudoaptupdate安装javasudoaptinstallopenjdk-java-8-jdk查看是否安装成功java-version二、Tomcat配置​ 直接访问tomcat官网(http://tomcat.apache.org/),下载需要的版本,我这里选择apache-tomcat-8.5.90版本操作如下:#安装需求的tomc......
  • 纳什均衡
    如何写收益矩阵两个参与方的收益矩阵具体例子:Amy和Bob是一对情侣,一起看足球Amy的收益是5,Bob收益是10,一起看芭蕾Amy的收益是10,Bob的收益是2,不一起行动的收益是0根据上面的信息我们可以写出如下收益矩阵:三个参与方的收益矩阵划线法找纯策略纳什均衡(两个参与方)挡左比横,挡......
  • 230722 做题记录 // 网络流二十四题 (1/24)
    知耻而后勇,物极必反。A.星际转移问题http://222.180.160.110:1024/contest/3952/problem/1如果就按照题目给的路线图,我们显然无法考虑到飞船到达的时刻。同时\(n\)和\(m\)又很小,我们就知道了,「人不能两次踏进同一条河流」,1时刻的站\(p\)和2时刻的站\(p\)也不能是......
  • PerfView专题 (第十四篇): 洞察那些 C# 代码中的短命线程
    一:背景1.讲故事这篇文章源自于分析一些疑难dump的思考而产生的灵感,在dump分析中经常要寻找的一个答案就是如何找到死亡线程的生前都做了一些什么?参考如下输出:0:001>!tThreadCount:22UnstartedThread:0BackgroundThread:1PendingThread:0DeadThread:......
  • nginx 负载均衡相关知识
    Nginx("enginex")是一个高性能的HTTP和反向代理服务器,也是一个IMAP/POP3/SMTP代理服务器。Nginx是由IgorSysoev为俄罗斯访问量第二的Rambler.ru站点开发的,第一个公开版本0.1.0发布于2004年10月4日。其将源代码以类BSD许可证的形式发布,因它的稳定性、丰富的功能集、......
  • crane-scheduler基于真实负载进行k8s调度
    介绍kubernetes的原生调度器只能通过资源请求来调度pod,这很容易造成一系列负载不均的问题:对于某些节点,实际负载与资源请求相差不大,这会导致很大概率出现稳定性问题。对于其他节点来说,实际负载远小于资源请求,这将导致资源的巨大浪费。为了解决这些问题,动态调度器根据实际的......
  • 伪负载均衡(拷贝多个流程
    数厂代码,未实现伪负载均衡//伪负载均衡/*;(()=>{letnodes=JSON.parse(JSON.stringify(data.nodes));letedges=JSON.parse(JSON.stringify(data.edges));//排查特殊节点并记录letstart_node=null,......
  • Nginx负载均衡配置
    Nginx负载均衡实现:需求:两台Web:192.168.1.2/192.168.1.3,nginx负载均衡服务器192.168.1.10Nginx服务器上实现web负载。 配置步骤:1.安装Nginx  (1)安装依赖包  yuminstall-ypcrepcre-developensslopenssl-develgccgccgcc-c++ncurses-develperl  use......
  • 《摆与混》第十四章--7月16日--周日
    周日,平淡却轻松;1.今天做了什么:今天10点起。洗漱后,觉得时间太晚就没有吃早餐,上午随便混了混,今天去外婆家吃饭(已经很久没去了),下午小小摆烂时间,学了会弥补一下上午,5点出发健身锻炼(周末也不断),晚上去了一家早就想去好吃的店,然后经典PTA,周末接着起飞呀!!!!2.解决了什么问题:Java课程推进,PTA......
  • 高级java高并发,高性能,分布式,高可用,负载均衡,系统架构实战
    提到锁,大家肯定想到的是sychronized关键字。是用它可以解决一切并发问题,但是,对于系统吞吐量要求更高的话,我们这提供几个小技巧。帮助大家减小锁颗粒度,提高并发能力。初级技巧-乐观锁乐观锁使用的场景是,读不会冲突,写会冲突。同时读的频率远大于写。悲观锁的实现: 悲观的认为所......