首页 > 其他分享 >RabbitMQ高级特性 - 消息分发(限流、负载均衡)

RabbitMQ高级特性 - 消息分发(限流、负载均衡)

时间:2024-08-03 22:00:24浏览次数:11  
标签:负载 QOS 消费者 RabbitMQ 限流 MQConst 消息 message channel

文章目录

RabbitMQ 消息分发


概述

RabbitMQ 的队列在有多个消费者订阅时,默认会通过轮询的机制将消息分发给不同的消费者,但是有些消费者消费速度慢,有些消费者消费速度快,就会导致消费速度慢的消费者影响整个的任务的吞吐量下降.

例如,公司有1个正式员工和1个实习生,现在有 10 个任务分配平均给他们(各 5 个),而由于实习生干活比较慢,就会导致整个完成任务的吞吐量下降.

消息分发机制给 “正式工” 多分一些任务,给 “实习生” 少分一些任务.

如何实现消费分发机制(限制每个队列消息数量)

可以在配置文件中配置 prefetchCount(或者使用原生的 channel.basicQos(int prefetchCount)),来限制当前消息通道上(channel)的每一个消费所能保持的最大未确认消息的数量.

例如 prefetchCount 为 10,并且一个 channel 上有两个消费者,那么每个消费者都最多接收 10 条未确认的消息. 此时整个 channel 上未确认消息总数可能达到 20 条.

具体使用:例如配置 prefetch = 5,那么 RabbitMQ 就会为消费者计数. 发送一条消息计数+1,消费一条消息计数-1,当达到了上限5,mq队列 就不会再发送消息,直到消费者确认了某条消息(类似 TCP 中的华滑动窗口).

使用场景

限流

背景

假设,订单系统每秒最多处理 1000 请求,正常情况下,该订单系统可以满足日常使用.
但是在突发的秒杀场景下,请求瞬间增多,每秒 1w qps,这不得把订单系统打成筛子.

问题:mq 在中间的话,不是已经有削峰填谷的作用了么?为什么还要使用 mq 的 prefetch 限流机制?
尽管消息队列可以延缓高峰压力,但消费者的处理能力还是有限的(如果不配置 prefetch,消费者自身从队列中取消息的量是不可控的). 如果消费者一次性取走过多的消息,就可能会导致资源紧张. prefetch 限流就是用来控制每个消费者取消息的数量,确保消费者不会过载.

实现 demo

假设限制未确认消息上限为 5,发送消息数量为 20.

a)配置 prefetch 参数,设置应答方式为手动应答.

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: manual # 手动确认
        prefetch: 5

b)配置交换机队列

@Configuration
class MQConfig {

    @Bean
    fun transQueue() = Queue(MQConst.TRANS_QUEUE)

    @Bean
    fun qosExchange() = DirectExchange(MQConst.QOS_EXCHANGE)
    @Bean
    fun qosQueue() = Queue(MQConst.QOS_QUEUE)
    @Bean
    fun qosBinding(): Binding = BindingBuilder
        .bind(qosQueue())
        .to(qosExchange())
        .with(MQConst.QOS_BINDING_KEY)

}

c)接口(生产者)

@RestController
@RequestMapping("/mq")
class MQApi(
    val rabbitTemplate: RabbitTemplate,
) {
    @RequestMapping("/qos")
    fun qos(): String {
        for (i in 1..20) {
            rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")
        }
        return "ok"
    }

}

d)消费者

@Component
class QosListener {

    @RabbitListener(queues = [MQConst.QOS_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel
    ) {
        val deliverTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")
            // 这里不主动应答,模拟超长业务
            // channel.basicAck(deliverTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliverTag, false, true)
        }
    }

}

e)效果如下:
可以观察到,消费者只接收到 5 个消息,但由于没有主动应答,队列 就不会给消费者发送新的消息.
在这里插入图片描述
在这里插入图片描述

Ps:此时如果直接关闭程序,这 5 个为应答的消息就会重回队列,成为 Ready 状态.
如下可以直接清理掉这些消息:
在这里插入图片描述

非公平发送(负载均衡)

背景

假设有两个消费者,mq 默认会按照轮询的策略将消息分发给消费者.

*但有一个中情况就比较尴尬:打个比方 一个是正式工,另一个是实习生,正式工就处理的很快,而实习生就很慢,就会造成整个任务的进度被拖慢. *

因此我们可以通过 负载均衡 的方式,让处理的快的消费者多处理一些,处理慢的消费者少处理一些.

具体的:只需要配置 prefetch,并开启自动应答即可. 这样一来,处理的快的消费者,自动应答的就更快,接收的消息也就更多.

实现 demo

a)配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: manual # 手动确认
        prefetch: 1 # 具体配置为多少,需要根据实际业务以及系统承受能力(压测)

b)生产者

    @RequestMapping("/qos")
    fun qos(): String {
        for (i in 1..20) {
            rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")
        }
        return "ok"
    }

c)两个消费者

@Component
class QosListener {

    @RabbitListener(queues = [MQConst.QOS_QUEUE])
    fun fastHandMessage(
        message: Message,
        channel: Channel
    ) {
        val deliverTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")
            Thread.sleep(1000)
            println("正式工: 任务处理完成!")
            channel.basicAck(deliverTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliverTag, false, true)
        }
    }

    @RabbitListener(queues = [MQConst.QOS_QUEUE])
    fun slowHandMessage(
        message: Message,
        channel: Channel
    ) {
        val deliverTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")
            Thread.sleep(2000)
            println("实习生: 任务处理完成!")
            channel.basicAck(deliverTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliverTag, false, true)
        }
    }

}

d)效果如下:
在这里插入图片描述

在这里插入图片描述

标签:负载,QOS,消费者,RabbitMQ,限流,MQConst,消息,message,channel
From: https://blog.csdn.net/CYK_byte/article/details/140891895

相关文章

  • Rabbitmq的几种工作模式
    工具类publicclassRabbitMQConnection{publicstaticConnectiongetConnection()throwsException{//1.创建connectionFactoryConnectionFactoryconnectionFactory=newConnectionFactory();//2.配置HostconnectionFactory.......
  • Rabbitmq中的死信队列
    背景        RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。原理        死信队列和普通队列区别不是很大        普通与死信队列都有自己独立的交换机和路由ke......
  • RabbitMQ知识总结(基本原理+高级特性)
    文章收录在网站:http://hardyfish.top/文章收录在网站:http://hardyfish.top/文章收录在网站:http://hardyfish.top/文章收录在网站:http://hardyfish.top/基本原理消息的可靠性投递RabbitMQ消息的投递路径为:生产者------>交换机------>队列------>消费者在Ra......
  • 【VMware VCF】VMware Cloud Foundation Part 06:部署 VI 工作负载域。
    VMwareCloudFoundation标准架构中,管理域和VI工作负载域需要分开部署,管理域是初始构建(Bring-up)中部署的一个工作负载域并且只有一个,管理域专门用于承载管理相关组件虚拟机。之前文章(VMwareCloudFoundationPart05:部署SDDC管理域。)已经完成了管理域的相关部署,现在我们需要......
  • Java/SpringCloud/RabbitMq/无感实现消息携带用户信息 --- 逻辑讲解、代码实现、图形
    一、需求:依据黑马商城hmall为例,用户下单创建订单后,交易服务trade-service向交换机topic发送消息,交换机topic路由到队列,购物车服务cart-service监听此队列,实现自动清空购物车。改造下单功能,将基于OpenFeign的清理购物车同步调用,改为基于RabbitMQ的异步通知:定义t......
  • .NET使用RabbitMQ发送消息
    usingRabbitMQ.Client;usingSystem;usingSystem.Collections.Generic;usingSystem.Linq;usingSystem.Text;usingSystem.Threading.Tasks;namespaceCommon{publicclassRabbitMQSender{privatestaticRabbitMQSenderinstance;pr......
  • 检测Linux服务器CPU、内存、负载、IO读写、机房带宽和服务器类型的脚本
    脚本内容:#!/usr/bin/envbash####RED='\033[0;31m'GREEN='\033[0;32m'YELLOW='\033[0;33m'SKYBLUE='\033[0;36m'PLAIN='\033[0m'about(){ echo"" echo"=============================......
  • rabbitmq单节点部署
    一:前置条件需要安装erlang#注意安装方式需要科学网络,不适用于离线安装1:配置erlang安装条件curl-shttps://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh|sudobash2:安装erlangyuminstall-yerlang3:检查erlang版本erl-v二:安装RabbitMQ1......
  • 借助 NGINX 对本地的 Kubernetes 服务进行自动化的 TCP 负载均衡
    原文作者:ChrisAkker-F5技术解决方案架构师,SteveWagner-F5NGINX解决方案架构师原文链接:借助NGINX对本地的Kubernetes服务进行自动化的TCP负载均衡转载来源:NGINX中文官网NGINX唯一中文官方社区,尽在 nginx.org.cn作为一名现代应用开发人员,您不仅使用一......
  • 什么是负载均衡?
    在当今数字化时代,随着网络应用和服务的不断发展,负载均衡已成为构建高性能、高可用性系统的关键技术之一。什么是负载均衡?负载均衡,从字面上理解,就是将负载(工作任务、网络流量等)均匀地分配到多个资源(服务器、网络链路等)上,以达到优化资源利用、提高系统性能和可靠性的目的。在一个......