首页 > 其他分享 >RabbitMQ的队列模式你真的懂吗

RabbitMQ的队列模式你真的懂吗

时间:2024-09-11 23:25:54浏览次数:10  
标签:NAME 队列 RabbitMQ 懂吗 消息 交换机 channel String

0 前言

官网描述六类工作队列模式:

  1. 简单队列模式:最简单的工作队列,一个消息生产者,一个消息消费者,一个队列。另称点对点模式
  2. 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。也称点对点模式
  3. 发布/订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者
  4. 路由模式:基于发布/订阅模式,有选择的接收消息,即通过 routing 路由进行匹配条件是否满足接收消息
  5. 主题模式:同样是在发布/订阅模式的基础上,根据主题匹配进行筛选是否接收消息,比第四类更灵活
  6. RPC模式:拥有请求/回复的。也就是有响应的,这是其它都没的

1 简单队列模式

1 实现功能

一个生产者 P 发送消息到队列 Q,一个消费者 C 接收:

Pro

Pro负责创建消息队列,并发送消息入列:

  1. 获取连接
  2. 创建通道
  3. 创建队列声明
  4. 发送消息
  5. 关闭队列
public class Producer {

    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection newConnection = MQConnectionUtils.newConnection();
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "我是生产者生成的消息";
        System.out.println("生产者发送消息:" + msg);
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        channel.close();
        newConnection.close();
    }
}

Con

  1. 获取连接
  2. 获取通道
  3. 监听队列
public class Customer {

    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("002");
        Connection newConnection = MQConnectionUtils.newConnection();
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

创建vhost

2 工作队列模式

将耗时的任务分发给多个消费者(工作者)。

主要解决:处理资源密集型任务,且还要等他完成。有了工作队列,就可将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可取出消息并完成工作。若启动了多个工作进程,则工作就可在多个进程间共享。

工作队列也称公平性队列模式,循环分发,若有两个消费者,默认RabbitMQ按序将每条消息发给下一个 Con,每个消费者获得相同数量的消息,即轮询。

Pro

创建50个消息

public class Producer2 {

    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection newConnection = MQConnectionUtils.newConnection();
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         /**保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
        channel.basicQos(1);
        for (int i = 1; i <= 50; i++) {
            String msg = "生产者消息_" + i;
            System.out.println("生产者发送消息:" + msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        channel.close();
        newConnection.close();
    }
}

Con

public class Customer2_1 {

    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("001");
        Connection newConnection = MQConnectionUtils.newConnection();
        final Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

循环分发

启动生产者

启动两个消费者

Pro发送了50条消息进入队列,而上方消费者启动图里很明显的看到轮询的效果,就是每个消费者会分到相同的队列任务。

公平分发

由于上方模拟的是非常简单的消息队列的消费,假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。如一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,咋办?

发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量,类似UDP,面向无连接。可用 basicQos,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。

final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);

消息持久化

背景

上边我们提到的公平分发是由消费者收取消息时确认解决的,但是这里面又会出现被 kill 的情况。

当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

但是在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。咋办?

参数配置

参数配置一:生产者创建队列声明时,修改第二个参数为 true

/**3.创建队列声明 */
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN

for (int i = 1; i <= 50; i++) {
    String msg = "生产者消息_" + i;
    System.out.println("生产者发送消息:" + msg);
    channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}

小结

  • 循环分发:消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息
  • 消息持久化:服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会丢失
  • 公平分发:指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题

3 发布订阅模式

工作队列模式是直接在生产者与消费者里声明好一个队列,消息就只会对应同类型的消费者。这种只处理同种类型的消息有弊端。

3.1 案例

门户网站,用户注册完后一般都会发送消息通知用户注册结果。如在一个系统中,用户注册信息有邮箱、手机号,在注册完后会向邮箱和手机号都发送注册完成信息。

利用 MQ 实现业务异步处理,若用工作队列,就声明一个注册信息队列。注册完成后生产者向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息。但实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应放在一块处理。

这时就可利用发布/订阅模式将消息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱、手机),并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发给对应的消费者:

只需简单的将队列绑定到交换机。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列。就像子网广播,每台子网内的主机都获得一份复制的消息。

3.2 啥是发布订阅模式

可将消息发送给不同类型的消费者。即发布一次,消费多个:

X表示交换机、红色表示队列。

展示邮件、短信的例子,通过绑定到一个交换机,但是

3.3 实战

public class ProducerFanout {

    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        /** 1.创建新的连接 */
        Connection connection = MQConnectionUtils.newConnection();
        /** 2.创建通道 */
        Channel channel = connection.createChannel();
        /** 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /** 4.发送消息 */
        for (int i = 0; i < 10; i++)
        {
            String message = "用户注册消息:" + i;
            System.out.println("[send]:" + message);
          	// 第二个参数为空类似于表示全局广播,只要绑定到该队列上的消费者理论上是都可收到
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
            try {
                Thread.sleep(5 * i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        /** 5.关闭通道、连接 */
        channel.close();
        connection.close();
        /** 注意:如果消费没有绑定交换机和队列,则消息会丢失 */
    }
}

邮件消费者

public class ConsumerEmailFanout {

    private static final String QUEUE_NAME = "consumerFanout_email";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者启动");
        /* 1.创建新的连接 */
        Connection connection = MQConnectionUtils.newConnection();
        /* 2.创建通道 */
        Channel channel = connection.createChannel();
        /* 3.消费者关联队列 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
      	// 第三个参数置为空时,可以接收到生产者所有的消息(生产者 routingKey 参数为空时)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取生产者消息:" + msg);
            }
        };
        /* 5.消费者监听队列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

短信消费者

public class ConsumerSMSFanout {

    private static final String QUEUE_NAME = "ConsumerFanout_sms";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者启动");
        /* 1.创建新的连接 */
        Connection connection = MQConnectionUtils.newConnection();
        /* 2.创建通道 */
        Channel channel = connection.createChannel();
        /* 3.消费者关联队列 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
      	// 第三个参数置为空时,可接收到生产者所有的消息(生产者 routingKey 参数为空时)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取生产者消息:" + msg);
            }
        };
        /* 5.消费者监听队列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

运行

先运行两个con,再运行pro。如没有提前将队列绑定到交换机,直接运行pro,消息是不会发到任何队列里的。

生产者

短信消费者

邮件消费者

小结

相比工作模式,发布订阅模式引入了交换机,类型上更灵活。

pro不是直接操作队列,而是将数据发给交换机,由交换机将数据发给与之绑定的队列。从不加特定参数的运行结果中可以看到,两种类型的消费者(email,sms)都收到相同数量消息。

必须声明交换机,并设置模式:channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),fanout 指分发模式(将每一条消息都发送到与交换机绑定的队列)

队列必须绑定交换机:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

生产者发送消息到交换机,多个消费者声明多个队列,与交换机进行绑定,队列中的消息可以被所有消费者消费,类似QQ群消息

4 路由模式

就是发布订阅模式(Publish/Subscribe Pattern)中的直连交换机(Direct Exchange)。一种基于路由键(Routing Key)来路由消息的模式。在这种模式下,生产者发送消息时会指定一个路由键,交换机会根据这个路由键将消息路由到与之匹配的队列。

Pro

使用 channel.basicPublish 方法发送消息,并指定交换机名称和路由键。交换机会根据路由键将消息路由到与之匹配的队列。

Con

在消费者代码中,我们声明了一个直接交换机(direct 类型),并绑定了一个队列。在绑定队列时,我们使用 channel.queueBind 方法,并指定交换机名称、队列名称和路由键。交换机会根据路由键将消息路由到与之匹配的队列。

特点

  • 路由键匹配:消息的路由键必须与队列绑定的路由键完全匹配,才能将消息路由到该队列。
  • 直接交换机:直接交换机根据路由键进行精确匹配,适用于需要精确控制消息路由的场景。

通过这种方式,路由模式可以实现基于路由键的精确消息路由,适用于需要将消息发送到特定队列的场景。

5 主题模式

属于发布订阅模式的TopicExchange(主题交换机)。Queue 通过 routing key 绑定到 TopicExchange,当消息到达TopicExchange后,TopicEkchange 根据消息的 routing key 将消息路由到一个或者多个Queue。

关注我,紧跟本系列专栏文章,咱们下篇再续!

作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。

各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。

负责:

  • 中央/分销预订系统性能优化
  • 活动&券等营销中台建设
  • 交易平台及数据中台等架构和开发设计
  • 车联网核心平台-物联网连接平台、大数据平台架构设计及优化
  • LLM Agent应用开发
  • 区块链应用开发
  • 大数据开发挖掘经验
  • 推荐系统项目

目前主攻市级软件项目设计、构建服务全社会的应用系统。

参考:

本文由博客一文多发平台 OpenWrite 发布!

标签:NAME,队列,RabbitMQ,懂吗,消息,交换机,channel,String
From: https://www.cnblogs.com/JavaEdge/p/18409222

相关文章

  • 单调队列优化 DP
    单调队列优化DP回忆单调队列的作用,\(O(n)\)求出每一个大小为\(K\)的窗口中的最大、最小值。以最大值为例,我们可以得到如下DP转移方程:\[dp[i]=\max(val[j])+base[i],i-j\leqK\]其中\(base[i]\)是一个仅与\(i\)有关的式子,不受\(j\)影响,且可以预处理得到;而\(val[j]......
  • 单调队列优化 dp
    1.概念单调队列优化的本质是借助单调性,及时排除不可能的决策,保持候选集合的秩序性。2.例题P1714切蛋糕题目大意:给定一个序列,找出长度不超过\(m\)的连续子序列,使得子序列中所有数的和最大。思路:要求区间和,首先求出前缀和,然后考虑朴素dp,不难想到用\(dp[i]\)表示包含......
  • RabbitMQ备忘录
    介绍RabbitMQ是一个开源的消息代理软件,支持多种消息协议。它允许不同的应用程序通过消息队列进行通信,促进了系统之间的解耦和异步处理。1.解耦解耦是指将系统中的不同组件分离,使它们可以独立开发和部署。RabbitMQ通过消息队列实现了解耦,生产者和消费者不需要直接知道彼此的存......
  • RabbitMQ的 RPC 消息模式你会了吗?
    前文学习了如何使用工作队列在多个工作者之间分配耗时的任务。若需要在远程计算机上运行一个函数并等待结果呢?这种模式通常被称为远程过程调用(RPC)。本节使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有耗时的任务可以分配,因此我们将创建一......
  • RabbitMQ的 RPC 消息模式你会了吗?
    前文学习了如何使用工作队列在多个工作者之间分配耗时的任务。若需要在远程计算机上运行一个函数并等待结果呢?这种模式通常被称为远程过程调用(RPC)。本节使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有耗时的任务可以分配,因此我们将创建一......
  • 基于数组的循环队列
    基于数组的循环队列关键点在于:当元素总数超过队列长度后,出队、入队等行为如何避免数组越界问题。环绕数组的逻辑结构确实可以类比时钟,当指针走到最后一个刻度(比如12小时制的12点),再往前走时,指针会回到最开始的刻度(即1点),而不是继续前进到一个不存在的位置。 以12小时制时钟为......
  • 进程间通信之消息队列详解
    目录一、什么是消息队列?二、消息队列的优缺点优点:缺点:三、消息队列的实现原理四、消息队列的使用场景五、消息队列的编程实现(C语言示例)1.创建消息队列2.发送消息3.接收消息4.删除消息队列六、总结        在现代操作系统中,进程间通信(IPC)是一个至关......
  • Go语言中的队列与栈:基础与实践
    在日常编程中,数据结构是不可或缺的一部分。无论是开发复杂的系统,还是编写小型工具,选择合适的数据结构都能显著提高程序的效率和可读性。在Go语言中,队列和栈是两种常用的基础数据结构。本文将详细介绍如何在Go中实现队列与栈,并补充一些扩展内容,以帮助大家更好地理解和应用这......
  • 代码随想录day 10-栈和队列2
    题目1150.逆波兰表达式求值给你一个字符串数组tokens,表示一个根据逆波兰表示法表示的算术表达式。请你计算该表达式。返回一个表示表达式值的整数。注意:有效的算符为'+'、'-'、'*'和'/'。每个操作数(运算对象)都可以是一个整数或者另一个表达式。两个整数之间的除......
  • 代码随想录day9-栈和队列1
    题目1232.用栈实现队列请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作(push、pop、peek、empty):实现MyQueue类:voidpush(intx)将元素x推到队列的末尾intpop()从队列的开头移除并返回元素intpeek()返回队列开头的元素booleanempty()如......