首页 > 其他分享 >SpringAMQP — RabbitMQ操作工具

SpringAMQP — RabbitMQ操作工具

时间:2024-11-09 16:15:51浏览次数:3  
标签:SpringAMQP 队列 绑定 direct RabbitMQ 交换机 消息 工具 路由

1. Spring AMQP 简介

Spring AMQP(Spring for Advanced Message Queuing Protocol) 是 Spring 框架的一个子项目,用于简化与消息代理(如 RabbitMQ)的集成。Spring AMQP 提供了基于 AMQP 协议的抽象层,使得 Java 程序员能够更轻松地使用消息队列完成异步通信、消息分发和数据流处理。Spring AMQP 的核心模块是 spring-rabbit,它封装了与 RabbitMQ 的交互。

 将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

官方地址:https://spring.io/projects/spring-amqp/

2. Spring AMQP 的核心组件

  • RabbitTemplate:Spring AMQP 提供的主要操作类,用于向队列发送和接收消息。它提供了简单且易用的 API 进行消息发送和接收。

  • AmqpAdmin:用于管理 RabbitMQ 的资源,例如创建和删除队列、交换机、绑定等。

  • @RabbitListener 和 @RabbitHandler:注解驱动的消息监听机制,可以在类或方法上使用 @RabbitListener 注解来监听指定队列的消息。

  • MessageConverter:用于将 Java 对象和消息之间进行相互转换,Spring AMQP 提供了多种消息转换器,例如 Jackson JSON、SimpleMessageConverter 等,方便消息的序列化和反序列化。

发送消息示例:

// 交换机名称
String exchangeName = "cyt.topic"; // 指定要发送消息的交换机名称。这里使用了一个名为 "cyt.topic" 的 Topic 类型交换机

// 消息内容
String message = "喜报!孙悟空大战哥斯拉,胜!"; // 要发送的消息内容,可以是任何文本,当前内容为示例新闻

// 发送消息到指定交换机和路由键
rabbitTemplate.convertAndSend(exchangeName, "china.news", message); 
// 使用 RabbitTemplate 将消息发送到指定的交换机。
// 参数解释:
// - exchangeName:要发送消息的交换机名称 "cyt.topic"。
// - "china.news":消息的路由键,适用于 Topic 交换机,用于匹配绑定队列的路由模式。
// - message:要发送的消息内容。
// 如果有队列绑定到 "cyt.topic" 交换机,并且匹配 "china.news" 的路由键(例如 "china.*"),
// 则消息会被路由到该队列并被消费者接收处理。

3. 交换机类型

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

3.1 Fanout交换机

  • 1) 可以有多个队列

  • 2) 每个队列都要绑定到Exchange(交换机)

  • 3) 生产者发送的消息,只能发送到交换机

  • 4) 交换机把消息发送给绑定过的所有队列

  • 5) 订阅队列的消费者都能拿到消息

3.2 Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

3.3 Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

4. 声明交换机和队列

程序启动时检查队列和交换机是否存在,如果不存在自动创建。

4.1 Direct模式的交换机和队列

// 消费者1 - 监听绑定的 direct.queue1 队列
@RabbitListener(bindings = @QueueBinding(
    // 定义并绑定队列
    value = @Queue(name = "direct.queue1"), // 创建一个名称为 direct.queue1 的队列
    // 定义并绑定交换机
    exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
    // 绑定的路由键
    key = {"red", "blue"} // 路由键,指定该消费者会接收 "red" 和 "blue" 路由键的消息
))
public void listenDirectQueue1(String msg){
    // 接收到消息时打印输出
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

// 消费者2 - 监听绑定的 direct.queue2 队列
@RabbitListener(bindings = @QueueBinding(
    // 定义并绑定队列
    value = @Queue(name = "direct.queue2"), // 创建一个名称为 direct.queue2 的队列
    // 定义并绑定交换机
    exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
    // 绑定的路由键
    key = {"red", "yellow"} // 路由键,指定该消费者会接收 "red" 和 "yellow" 路由键的消息
))
public void listenDirectQueue2(String msg){
    // 接收到消息时打印输出
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

4.2 Topic模式的交换机和队列

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

5. 消息转换器

Spring的消息发送代码接收的消息体是一个Object。

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

5.1 引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

5.2 配置JSON转换器

配置消息转换器,在服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

6. 使用示例

6.1 引入依赖

  <!--消息发送-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

6.2 配置

spring:
  rabbitmq:
    host: 192.168.1.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /cyt# 虚拟主机
    username: cyt# 用户名
    password: 123 # 密码

6.3 接收消息

在服务中定义一个消息监听类:

package com.cyt.trade.listener;

@Component
@RequiredArgsConstructor
public class PayStatusListener {

    // 注入订单服务,用于更新订单状态
    private final IOrderService orderService;

    /**
     * 监听支付成功的消息队列,当接收到支付成功的消息时,更新订单状态。
     * 
     * @RabbitListener 注解用于声明这是一个消息监听方法。
     * @QueueBinding 注解定义队列、交换机和路由键的绑定关系。
     *   - @Queue 用于定义队列的属性,例如队列名和是否持久化。
     *   - @Exchange 用于定义交换机的属性,例如交换机名和类型。
     *   - key 指定路由键,将匹配该路由键的消息发送到此队列。
     *
     * @param orderId 支付成功的订单ID,从消息中提取的参数。
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "trade.pay.success.queue", durable = "true"), // 定义队列名称和持久化
            exchange = @Exchange(name = "pay.topic"), // 指定交换机名称
            key = "pay.success" // 路由键,用于匹配支付成功的消息
    ))
    public void listenPaySuccess(Long orderId) {
        // 调用订单服务,更新订单为支付成功状态
        orderService.markOrderPaySuccess(orderId);
    }
}

6.4 发送消息

在需要发送消息的地方使用rabbitTemplate即可发送消息。

rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());

标签:SpringAMQP,队列,绑定,direct,RabbitMQ,交换机,消息,工具,路由
From: https://blog.csdn.net/qq_46637011/article/details/143618414

相关文章

  • 腾讯课堂视频课件课程下载工具,如何在电脑端下载腾讯课堂视频课程课件资料到本地?
    一.安装腾讯课堂课程下载器1.获取学无止下载器https://www.xuewuzhi.cn/keqq_downloader 2.下载安装后,然后点击桌面快捷方式运行即可。注意:杀毒软件可能会阻止外部exe文件运行,并将其当做成病毒,直接添加信任即可,本软件绝对没有木马病毒。二.使用说明1.学无止下载器介绍......
  • StarUML建模工具安装学习与汉化最新零基础详细教程【一键式下载】(适用于Windows、MacO
    StarUML破解安装下载教程前言:StarUML破解与汉化安装下载教程,仅供学习研究和交流使用,禁止作为商业用途或其他非法用途!仓库作者:X1a0He,经仓库作者授权使用。目录StarUML破解安装下载教程1.下载准备1.1一键式准备【懒人准备】1.2学习式准备1.2.1学习准备2.window......
  • 一文彻底弄懂JUC工具包的CountDownLatch的设计理念与底层原理
    CountDownLatch是Java并发包(java.util.concurrent)中的一个同步辅助类,它允许一个或多个线程等待一组操作完成。一、设计理念CountDownLatch是基于AQS(AbstractQueuedSynchronizer)实现的。其核心思想是维护一个倒计数,每次倒计数减少到零时,等待的线程才会继续执行。它的主要设......
  • 进程工具类 - C#小函数类推荐
          此文记录的是进程操作的类库。/***进程工具类AustinLiu刘恒辉ProjectManagerandSoftwareDesignerE-Mail:[email protected]:http://lzhdim.cnblogs.comDate:2024-01-1515:18:00使用方法例子:foreac......
  • Nginx代理访问RabbitMQ Management UI
    RabbitMQ官方文档说明如下:UsingaReverseProxyinfrontoftheHTTPAPIItmaybenecessarytoputareverseproxyinfrontofaRabbitMQcluster.ReverseproxysetupforRabbitMQmayrequirecarefulhandlingofencodedslashesinpathsifdefaultvirtualhos......
  • k4yt3x/video2x:视频和图像无损放大工具
    该项目集成了多种超分辨率算法(如Waifu2x、Anime4K、Real-ESRGAN),能够有效提高视频和图像的分辨率,并提供了图形界面(GUI)、Docker和命令行界面(CLI)的使用方式。links:https://hellogithub.com/repository/33efae8614d5435eb5f2db98d53d4fa7https://github.com/k4yt3x/video......
  • OSSFileBrowse:OSS存储桶遍历漏洞利用工具
    简介:由于经常遇到存储桶遍历漏洞,直接访问文件是下载,不方便预览,且甲方要求证明该存储桶的危害,因此该工具应运而生。使用javafx做图形化,kkFileView做文件预览接口。使用:命令行运行:java-Dfile.encoding=UTF-8-jarOSSFileBrowse-1.0-SNAPSHOT.jar或者直接点击run.bat文件。......
  • NetExec:新型内网/域渗透工具
    免责声明仅供安全研究与学习之用,若将工具做其他用途,由使用者承担全部法律及连带责任,作者及发布者不承担任何法律及连带责任。简介:NetExec是一款强大的自动化网络安全评估和漏洞测试工具,作为已停止维护的CrackMapExec(CME)的现代替代品,它已被渗透测试人员和红队成员广泛采用......
  • MySQLMonitor: 黑盒测试Mysql实时监控辅助工具
    MySQLMonitorMySQL实时监控工具(代码审计、黑盒测试辅助工具)使用1.自行打包使用gitclonehttps://github.com/fupinglee/MySQLMonitorcdMySQLMonitormvncleanpackage-DskipTests=true打开target下的jar文件即可执行2.直接下载使用https://github.com/fupinglee/......
  • 最实用的隐私测试工具操作手册来了,错过你就亏了
    注:本工具仅适用于未加固的安卓debug包APK    在开始之前,建议大家先回顾一下我们之前发布的关于隐私合规检测的文章。本次分享的隐私测试工具和以往的xpose隐私检测方法,有很大区别,一个对比后支持范围和准确性,另外一个就是操作简便,兼容性强,报告查看方便;    旧......