首页 > 其他分享 >“RabbitMQ入门指南:从入门到起飞,这一篇就够!打造高效消息通信系统的第一步“。

“RabbitMQ入门指南:从入门到起飞,这一篇就够!打造高效消息通信系统的第一步“。

时间:2024-06-12 20:57:54浏览次数:11  
标签:入门 队列 RabbitMQ 交换机 就够 msg public 消息

1.前言

        RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)的标准,并用Erlang语言编写。作为消息代理,RabbitMQ接收、存储和转发消息,帮助应用程序之间实现异步通信。它提供了一个强大而灵活的消息传递机制,可以在分布式系统中可靠地传递消息,确保消息的顺序性和可靠性。

        RabbitMQ的核心概念包括生产者、消费者、交换机、队列和绑定。生产者负责发送消息,消费者负责接收消息,交换机负责接收来自生产者的消息并将它们路由到一个或多个队列,队列存储消息直到消费者准备接收它们,而绑定则定义了交换机和队列之间的关系。

        RabbitMQ具有许多特性,包括可靠性、灵活的路由、集群和高可用性、可扩展性、管理界面、多种协议支持和可编程性。它被广泛应用于构建分布式系统中的消息队列、异步任务处理、日志收集、事件驱动架构等场景,是一个强大而受欢迎的消息中间件解决方案。

                1.1 前置知识

1. 同步通信 和 异步通信

        微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢? 

解读:

  • 同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。

  • 异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。

两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。

所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。

同步通信:服务返回响应后才可以进行后续的操作。

存在的问题:

  • 扩展性差
    • 随着业务规模扩大,产品的功能也在不断完善。每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则(面向修改关闭,面向拓展开放),拓展性不好。

  • 性能下降
    • 我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和。

  • 级联失败 
    • 由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。

      这其实就是同步调用的级联失败问题。

    • 比如:比如说支付成功,短信发送出现问题了,就给我们退款了。

    • 级联失败雪崩:由于一系列问题或错误的积累,最终导致系统或项目崩溃或失败的现象。

异步调用: 只发送通知,发送完就可以结束了,具体你有没有收到,什么时候收到,我不关心。

介绍:

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方

  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器

  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。

这样,发送消息的人和接收消息的人就完全解耦了。

优势:

  • 耦合度更低

  • 性能更好

  • 业务拓展性强

  • 故障隔离,避免级联失败

  • 消峰

    • 消峰的原理就是全部都放在消息队列,里面后续的业务慢慢的取

缺点:

  • 完全依赖于Broker的可靠性、安全性和性能

  • 架构复杂,后期维护和调试麻烦

        1.2 不同MQ之间的对比 

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.

目比较常见的MQ实现:

  • ActiveMQ

  • RabbitMQ

  • RocketMQ

  • Kafka

几种常见MQ的对比:

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

2. RabbitMQ的安装

        2.1 执行Docker命令

docker run -d \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_VHOST=default_vhost \
  -e RABBITMQ_DEFAULT_USER=default_user \
  -e RABBITMQ_DEFAULT_PASS=default_pass \
  --hostname my_rabbitmq \
  --name rabbitmq \
  rabbitmq
  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

参数说明:

  • docker run: 运行 Docker 容器的命令。

  • -d: 在后台运行容器,即以守护进程的方式运行容器。

  • -p 15672:15672 -p 5672:5672: 将容器的端口 15672(RabbitMQ 控制台 Web 界面的端口)和 5672(RabbitMQ 应用访问的端口)映射到主机的对应端口。这样可以通过主机的这些端口来访问 RabbitMQ。

  • -e RABBITMQ_DEFAULT_VHOST=my_vhost: 设置 RabbitMQ 默认的虚拟机名为 my_vhost。虚拟机(vhost)是 RabbitMQ 中用于隔离不同应用程序或服务的逻辑隔离单元。

  • -e RABBITMQ_DEFAULT_USER=admin: 设置 RabbitMQ 默认的用户名为 admin

  • -e RABBITMQ_DEFAULT_PASS=123456: 设置 RabbitMQ 默认的用户密码为 123456。

  • --hostname myRabbit: 指定容器的主机名为 myRabbit。在 RabbitMQ 中,节点名称被用于存储数据,而默认情况下会使用主机名。因此,在此设置了容器的主机名。

  • --name rabbitmq: 设置容器的名称为 rabbitmq

  • rabbitmq: 指定要使用的容器镜像为 rabbitmq,这是 RabbitMQ 官方提供的 Docker 镜像。

综上所述,该命令的作用是以后台方式启动一个 RabbitMQ 容器,配置了默认的虚拟机名、用户名和密码,并将容器的端口映射到主机上,使得可以通过主机访问 RabbitMQ 控制台和应用服务。

        2.2 设置开机自启动

docker update rabbitmq --restart=always

         2.3 启动 rabbitmq_management

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management

 为什么要启动这个:

这个命令的作用是在名为“rabbitmq”的Docker容器中启用RabbitMQ管理插件。具体来说,它启用了RabbitMQ的管理插件,这个插件提供了一个Web界面,可以通过浏览器来管理RabbitMQ服务器。
通常情况下,启用管理插件是为了方便地监控和管理RabbitMQ服务器,可以通过浏览器访问http://<RabbitMQ服务器的IP地址>:15672来打开RabbitMQ的管理界面。

3. RabbitMQ 核心模块介绍

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方 (发送给交换机)

  • consumer:消费者,也就是消费消息的一方(和队列进行绑定(监听))

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

    • 交换机只能路由消息,无法存储消息
    • 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定 
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,因为RabiitMQ性能很强,单个项目使用会造成巨大的浪费,所以多个项目,实现一套MQ,virtual host就是为了不同交换机产生隔离(和容器概念一样)

4. 数据隔离 

点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:

  • Nameitheima,也就是用户名

  • Tagsadministrator,说明itheima用户是超级管理员,拥有所有权限

  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。

  • 给每个项目创建不同的virtual host,将每个项目的数据隔离。

5.SpringAMQP 

        Spring AMQP(Spring for Advanced Message Queuing Protocol)是 Spring 框架的一个模块,用于与 AMQP(Advanced Message Queuing Protocol)兼容的消息中间件进行集成。AMQP 是一种消息协议,用于在分布式应用程序之间传递消息。

        将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

        但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

5. 1 导入依赖

<!--    引入SpringAMQP的依赖,包含RabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

5. 2 控制台新建一个队列

5. 3 消息发送

首先配置MQ的地址,在配置文件中添加配置

spring:
  rabbitmq:
    username: windStop  # RabbitMQ用户名
    host: 8.130.10.216  # RabbitMQ主机地址
    password: 123       # RabbitMQ密码
    port: 5672          # RabbitMQ端口号
    virtual-host: /windStop  # RabbitMQ虚拟主机

然后编写测试类ConsumerApplicationTest,并利用RabbitTemplate 实现消息发送。

RabbitTemplate 是一个RabbitMQ的模板类,用于发送消息到RabbitMQ队列或者交换机。

@SpringBootTest
public class ConsumerApplicationTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void ConsumerTest(){
        // 定义要发送的队列
        String queue = "simple.queue";
        // 定义要发送的信息
        String message = "你好啊, spring AMPQ";
        rabbitTemplate.convertAndSend(queue,message);
    }
}

 5.4  消息接收

 上同,首先配置MQ的地址。

spring:
  rabbitmq:
    username: windStop  # RabbitMQ用户名
    host: 8.130.10.216  # RabbitMQ主机地址
    password: 123       # RabbitMQ密码
    port: 5672          # RabbitMQ端口号
    virtual-host: /windStop  # RabbitMQ虚拟主机
 

listener包中新建一个类SpringRabbitListener

@Component
@Slf4j
public class SpringRabbitListener {

    /**
     * 接收到的消息会以String类型的msg参数传入方法中
     * @param msg
     */
    @RabbitListener(queues = "simple.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
    public void listenSimpleQueueMessage(String msg){
        log.info("Spring 消费者接收到的消息:{}",msg);
    }
} 

5.5  总结一下核心步骤 

Spring AMQP收发消息的步骤如下:

  1. 引入spring-boot-starter-amqp依赖。
  2. 配置RabbitMQ服务端信息,包括用户名、密码、主机地址、端口号等。
  3. 使用RabbitTemplate来发送消息到RabbitMQ服务器。
  4. 使用@RabbitListener注解声明要监听的队列,并编写相应的方法来处理接收到的消息内容。

6.  Work Queues

概念:任务模型。简单来说就是让多个消费者绑定到一个队列中,共同消费队列中的信息。

介绍一下:生产者消费者模型

  • 生产者和消费者之间解耦:生产者和消费者之间通过一个共享的缓冲区(队列)来进行通信,彼此不直接依赖。
  • 异步性:生产者可以持续不断地生成数据,而消费者可以独立地处理这些数据,实现异步处理。
  • 实现多线程并发:生产者和消费者可以在不同的线程中运行,提高系统的吞吐量和效率。

     在RabbitMQ中,生产者将消息发送到队列中,而消费者则从队列中获取消息进行处理,实现了生产者消费者模型的应用。

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了

存在的问题: 

        发送完消息会被哪个消费者处理呢?还是所有消费者都会处理?怎么分配?

        6.1 入门案例

6.1.1 创建队列

首先,我们在控制台创建一个新的队列,命名为work.queue

6.1.2 定义两个消费者模型
/**
 * 接收到的消息会以String类型的msg参数传入方法中
 * @param msg
 */
@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage1(String msg){
    System.out.println("消费者1接收到消息:" + msg + "," + LocalDateTime.now());
}

@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage2(String msg){
    System.err.println("消费者2接收到消息:" + msg + "," + LocalDateTime.now());
}
6.1.3 生产者发送五十条信息            
@Test
public void workQueueTest(){
    // 定义要发送的队列
    String queue = "work.queue";
    //循环发送五十条数据
    for (int i = 1; i <= 50; i++) {
        String message = "你好, SpringAMQP" + i;
        rabbitTemplate.convertAndSend(queue,message);
    }
}

通过输出结果可以分析答案是:

        消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。每个都是收到一条。

6.1.4 模拟快慢消费者
/**
 * 接收到的消息会以String类型的msg参数传入方法中
 * @param msg
 */
@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:" + msg + "," + LocalDateTime.now());
    Thread.sleep(50);//通过睡眠短时间模拟快消费者
}

@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
    System.err.println("消费者2接收到消息:" + msg + "," + LocalDateTime.now());
    Thread.sleep(500);//通过睡眠长时间模拟慢消费者
}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了50毫秒,相当于每秒钟处理20个消息

  • 消费者2 sleep了500毫秒,相当于每秒处理2个消息

        也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。 

        6.2 能者多劳 (动态分配权重)

        默认情况下,RabbitMO的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没

有考虑到消费者是否已经处理完消息,可能出现消息堆积。

        默认情况下:无论你有没有处理完都给你分配。设置为1,就是处理完才给你分配。

分配消费者的预取限制

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息 

 这是一个消费者的预取(Prefetch)限制设置。它定义了消费者在从 RabbitMQ 服务器获取消息时一次预取的消息数量。 

 如果不设置。消费者默认的预取限制将会是无限制的,即一次性获取尽可能多的消息。

  1. 资源过度占用: 如果消费者一次性获取大量消息,但处理消息的速度较慢,就会导致大量消息堆积在消费者端,消耗大量内存和其他系统资源。这可能导致系统的负载急剧增加,甚至导致系统崩溃。

  2. 不可控的消费行为: 一次性获取大量消息会导致消费者处理速度不可控,快速消费完部分消息后,可能会因为处理时间长的消息而导致整体处理速度下降。

  3. 不公平的消息分发: 如果一次性获取大量消息,可能会导致消息在消费者之间分布不均匀,一些消费者可能会快速处理完消息而另一些消费者处理速度较慢,从而导致消息处理效率不高。

  4. 消息积压和延迟: 一次性获取大量消息可能导致消息积压,影响系统对消息的实时处理能力,也会增加消息的处理延

综上所述,合适的预取限制可以帮助控制系统资源的使用,确保消息的平稳处理,避免系统崩溃和消息处理效率低下的问题。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

7. 交换机类型 

在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机。

  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。

  • Consumer:消费者,与以前一样,订阅队列,没有变化。当消费者处理完毕后,队列中存储的数据就会被删除。
     

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机。

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

        7.1 Fanout交换机 

在广播模式下,消息发送流程是这样的:复制成n份,发送给每一个队列。 

  • 1) 可以有多个队列

  • 2) 每个队列都要绑定到Exchange(交换机)

  • 3) 生产者发送的消息,只能发送到交换机

  • 4) 交换机把消息发送给绑定过的所有队列

  • 5) 订阅队列的消费者都能拿到消息

作用解析:分布式架构中,每个模块绑定一个队列,然后对于支付完成后,我们就可以广播给多个队列让他们进行处理,比如:支付后发信息通知,支付后添加积分。 

         7.1.1 代码实现
1. 创建队列:

2. 创建交换机:

3.  绑定队列和交换机之间的关系: 

4. 添加消费者:
@RabbitListener(queues = "fanout.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenFanoutQueueMessage1(String msg){
    System.err.println("消费者fanout1 接收到消息:" + msg + "," + LocalDateTime.now());
}

@RabbitListener(queues = "fanout.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenFanoutQueueMessage2(String msg) {
    System.err.println("消费者fanout2 接收到消息:" + msg + "," + LocalDateTime.now());
}
5.  添加生成者:
@Test
public void fanoutTest(){
    // 定义要发送的队列
    String exchangeNane = "windStop.fanout";
    // 定义要发送的信息
    String message = "大家好啊!";
    rabbitTemplate.convertAndSend(exchangeNane,null ,message);
}

交换机的作用是什么?

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • 不能缓存消息,路由失败,消息丢失

  • FanoutExchange的会将消息路由到每个绑定的队列 

7.2 Direct交换机 

        在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。按照规则进行路由。并且一个队列可以绑定多个规则(路由键)。

在Direct模型下:

  • 每一个Queue都与Exchange设置一个BindingKey(路由key)。
  • 发布者发送消息时,指定消息的RoutingKey。
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。

暗号一样,才会给你。 

        作用解析:分布式架构中,每个模块绑定一个队列,然后对于支付完成后,我们就可以广播给多个队列让他们进行处理,也有些操作不需要发给所有,就需要按照键匹配,支付后发信息通知,支付后添加积分。我取消支付就不需要这两种,但我还需要其他的支付的操作。

        7.2.1 代码实现

1. 创建队列

2. 创建交换机

3. 创建交换机和路由之间的关系

         3.1 进入交换机

        3.2 绑定关系并且指定 BindingKey

因为RabbitMQ官网没有设置同时绑定多个BindingKey,所以要想绑定多个BindingKey就要bind多次。

 

绑定成功后的页面

4. 创建消费者

/**
 * 订阅
 * @param msg 接收到的内容
 */
@RabbitListener(queues = "direct.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenDirectQueueMessage1(String msg){
    System.err.println("消费者fanout1 接收到消息:" + msg + "," + LocalDateTime.now());
}

@RabbitListener(queues = "direct.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenDirectQueueMessage2(String msg) {
    System.err.println("消费者fanout2 接收到消息:" + msg + "," + LocalDateTime.now());
}

5. 创建生产者

这个时候就需要指定第二个参数:

@Test
public void directTest(){
    // 定义要发送的队列
    String exchangeNane = "windStop.direct";
    // 定义要发送的信息
    String message = "红色:震惊,李旭居然是人!";
    rabbitTemplate.convertAndSend(exchangeNane,"red",message);
}

因为二者都绑定了,red这个路由key,所以direct.queue1和direct.queue2都能收到。

@Test
public void directTest2(){
    // 定义要发送的队列
    String exchangeNane = "windStop.direct";
    // 定义要发送的信息
    String message = "蓝色:明天就要上课了。";
    rabbitTemplate.convertAndSend(exchangeNane,"blue",message);
}

 因为只有direct.queue1绑定了blue这个路由key,所以只有direct.queue1能收到。

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

7.3. Topic交换机

7.3.1 .说明

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如item.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;

  • china.weather 代表中国的天气消息;

  • japan.news 则代表日本新闻

  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:

    • china.news

    • china.weather

  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:

    • china.news

    • japan.news

接下来,我们就按照上图所示,来演示一下Topic交换机的用法。

首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。创建步骤和上述一样,最终结果如下:

7.3.2 创建消费者 
/**
 * 通配符订阅
 * @param msg 接收到的内容
 */
@RabbitListener(queues = "topic.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenTopicQueueMessage1(String msg){
    System.out.println("消费者topic1 接收到消息:" + msg + "," + LocalDateTime.now());
}

@RabbitListener(queues = "topic.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenTopicQueueMessage2(String msg) {
    System.err.println("消费者topic2 接收到消息:" + msg + "," + LocalDateTime.now());
}
7.3.3 创建生产者
@Test
public void testTopicExchange() {
    // 交换机名称
    String exchangeName = "windStop.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

 因为二者都匹配了,前者前缀匹配,后者后缀匹配,所以topic.queue1和topic.queue2都能收到。

@Test
public void testTopicExchange2() {
    // 交换机名称
    String exchangeName = "windStop.topic";
    // 消息
    String message = "今天天气真不错,我的心情好极了";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}

只有topic.queue1会匹配到。.weacher不符合topic.queue2的后缀要求。  

7.3.4 总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割

  • Topic交换机与队列绑定时的bindingKey可以指定通配符

  • #:代表0个或多个词

  • *:代表1个词

8. 声明队列和交换机

        在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,需要重新创建发布时候的RabiitMQ,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。-> 使用可视化面板创建 

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。-> 代码创建

         8.1 代码创建的基本API

声明队列和交换机:
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  1. Queue:用于声明队列,可以用工厂类QueueBuilder构建。
  2. Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建。
  3. Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建。
 1. 创建队列

2. 创建交换机

我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:

而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:

        3. fanout示例

        基于AMQP协议的消息队列系统,通过声明式的配置方式,RabbitMQ客户端会在应用启动时自动创建交换机和队列,并建立它们之间的对应关系,从而为应用程序提供便捷的消息队列支持。

@Configuration
public class FanoutConfig {
    // 声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("windStop.fanout");
    }
    // 声明第 1 个队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    //绑定队列 1 和 交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    // 声明第 2 个队列
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }

}
       4. direct示例

direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

这种方式,虽然可以实现但是很臃肿,每绑定一个BindingKey就需要多写个路由关系的方法。

8.2 基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

例如,我们同样声明Direct模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

是不是简单多了。

介绍一下:

  1. @RabbitListener(bindings = @QueueBinding(...)): 这里声明了一个RabbitMQ的消息监听器,通过bindings参数指定了队列绑定的相关配置。

  2. value = @Queue(name = "direct.queue1"): 在这里,我们声明了一个名为"direct.queue1"的队列。这表示我们将会监听这个特定的队列。

  3. exchange = @Exchange(name = "windStop.direct", type = ExchangeTypes.DIRECT): 这里声明了一个名为"windStop.direct"的订阅类型的交换机。订阅交换机(Direct Exchange)根据消息的routing key将消息路由到特定的队列。

  4. key = {"red", "blue"}: 这里指定了队列和交换机之间的绑定关系。对于队列"direct.queue1",它将会接收所有routing key为"red"或"blue"的消息。

再试试Topic模式:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

9. 总结

        这段文档内容非常全面地介绍了 RabbitMQ 的核心概念、安装、配置以及各种交换机类型的使用方法。它包括了 RabbitMQ 的前言介绍,不同类型的通信方式,不同 MQ 的对比,以及 RabbitMQ 的安装和核心模块介绍。同时也涵盖了 Spring AMQP 的使用方法和示例,以及 Work Queues、交换机类型(Fanout、Direct、Topic)的详细说明和代码实现。此外祝大家周末愉快!

标签:入门,队列,RabbitMQ,交换机,就够,msg,public,消息
From: https://blog.csdn.net/m0_74749208/article/details/139566178

相关文章

  • HIVE从入门到精通------(1)hive的基本操作
    1.开启hive1.首先在master的/usr/local/soft/下启动hadoop:master:start-all.shstart-all.sh2.在另一个master(2)上监控hive日志:master(2):tail-F/tmp/root/hive.logtail-F/tmp/root/hive.log3.在master上启动元数据服务:......
  • Python学习笔记6:pychram相关知识及安装教程,后续需要学习的入门知识
    上篇文章说了,今天去公司重新装一下IDE,最后也是把过程这边再记录一下,有需要的可以参考一下。关于pychrampychram是什么?PyCharm是由JetBrains公司开发的一款流行的Python集成开发环境(IDE)。它专为Python语言设计,提供了许多方便的功能来帮助开发者编写、测试和调试Python代码......
  • DP经典问题----背包问题的代码实现(入门级)(C++/PYTHON)
    背包的状态转换方程i:表示物品序号j:表示背包大小W[i]:表示第i件物品的重量f[i,j]:表示在前i件物品中选择若干件放在承重为j的背包中,可以取得的最大价值f[i-1,j-Wi]:表示在前i-1件物品中选择若干件放在承重为j-Wi的背包中,可以取得的最大价值Pi(j>=Wi):表示第i件物品的价值,要......
  • 栈溢出漏洞利用二,ret2syscall,构造rop链条实现攻击(pwn入门)
    原理原理就直接参考别的大佬写的文章讲下了 参考文章:https://blog.csdn.net/qq_33948522/article/details/93880812ret2syscall,即控制程序执行系统调用,获取shellret2syscall通常采用execve(重点函数,32位调用号为0x0b,64位调用号为0x3b)ROPReturnOrientedProgramming,其......
  • kettle从入门到精通 第六十八课 ETL之kettle kettle随机数生成的一些方案
    1、在做ETL数据抽取的时候,会用到生成随机数的功能,今天我们一起来学习下如何生成随机数据。如下图所示 2、将生成随机数拉倒画布即可,然后设置字段名称和选择合适的类型,如下图所示:类型:随机数字:生成一个介于0和1之间的随机数随机整数:生成一个随机的32-bit整数随机字符串:基......
  • springboot rabbitmq如何保证消息顺序消费
    很多时候,消息的消费是不用保证顺序的,比如借助mq实现订单超时的处理。但有些时候,业务中可能会存在多个消息需要顺序处理的情况,比如生成订单和扣减库存消息,那肯定是先执行生成订单的操作,再执行扣减库存的操作。那么这种情况下,是如何保证消息顺序消费的呢?首先,为了效率,我们可以设置......
  • RabbitMQ-如何保证消息不丢失
    RabbitMQ常用于异步发送,mysql,redis,es之间的数据同步,分布式事务,削峰填谷等.....在微服务中,rabbitmq是我们经常用到的消息中间件。它能够异步的在各个业务之中进行消息的接受和发送,那么如何保证rabbitmq的消息不丢失就显得尤为重要。首先要分析问题,我们就要明确rabbitmq在什么时......
  • [转]Systemd 入门教程:命令篇
    原文地址:Systemd入门教程:命令篇-阮一峰的网络日志Systemd是Linux系统工具,用来启动守护进程,已成为大多数发行版的标准配置。本文介绍它的基本用法,分为上下两篇。今天介绍它的主要命令,下一篇介绍如何用于实战。一、由来历史上,Linux的启动一直采用init进程。下面的命令......
  • DockerCompose+Jenkins+Pipeline流水线打包SpringBoot项目(解压安装配置JDK、Maven等)
    场景DockerCompose中部署Jenkins(DockerDesktop在windows上数据卷映射):https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/139491855Docker+Jenkins+Gitee+Maven项目配置jdk、maven、gitee等拉取代码并自动构建以及遇到的那些坑:https://blog.csdn.net/BADAO_LIUMANG_......
  • Mybatis快速入门
    文章目录1.Mybatis概述1.1Mybatis概念1.2JDBC缺点1.3Mybatis优化2.Mybatis快速入门3.Mapper代理开发3.1Mapper代理开发概述3.2使用Mapper代理要求3.3案例4.核心配置文件4.1多环境配置4.2类型别名1.Mybatis概述1.1Mybatis概念MyBatis是一款......