首页 > 其他分享 >SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQ

时间:2025-01-20 09:29:12浏览次数:3  
标签:SpringBoot exchange 队列 RabbitMQ 交换机 消息 整合 public

RabbitMQ

简介

消息中间件:它接收消息并且转发,就类似于一个快递站,卖家把快递通过快递站,送到我们的手上,MQ也是这样,接收并存储消息,再转发。
RabbitMQ在 2007 年由Rabbit科技有限公司发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出,里面存放的内容是message。

用途

MQ的用途最常用的有三个:流量削峰、应用解耦和异步处理。

流量削峰

流量削峰简单概括就是在访问量剧增的情况下,但是应用仍然不能停。比如“双十一”下单的人多,平时的服务没有办法一下子处理如此巨大的流量,那么就可以把这些订单放入到MQ中,使用MQ作缓冲,把一秒内下的订单分散到一段时间去处理,这样虽然用户那边过了十几秒才收到下单成功的消息,但总比直接下单失败要好很多,当然真正的双十一措施要远比这个复杂得多。

应用解耦

以电商中的订单服务举例,订单作为核心业务要涉及到支付、库存、物流等服务,若是不解耦其中一个子系统出了问题,下单就会失败,这显然是违背了高可用原则的。MQ就可以用于这些服务的解耦,将这些服务都放入到MQ中进行处理,这样即使某个服务突然挂了,订单服务也不会立刻失败,需要处理的内存被缓存在MQ中,当失败的服务恢复后可以继续处理订单业务。
MQ_jieou

异步调用

有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。
这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。
MQ_yibu

组件

  • Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker
  • Connection: publisher / consumer和 broker之间的TCP连接
  • Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TC PConnection的开销将是巨大的,效率也较低。Channel是在connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker识别 channel,所以channel 之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建TCP connection的开销
  • Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有: direct (point-to-point), topic(publish-subscribe) and fanout
  • Routing Key:生产者将消息发送到交换机时会携带一个key,来指定路由规则
  • binding Key:在绑定Exchange和Queue时,会指定一个BindingKey,生产者发送消息携带的RoutingKey会和bindingKey对比,若一致就将消息分发至这个队列
  • vHost 虚拟主机:每一个RabbitMQ服务器可以开设多个虚拟主机每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的 “交换机exchange、绑定Binding、队列Queue”,更重要的是每一个vhost拥有独立的权限机制,这样就能安全地使用一个RabbitMQ服务器来服务多个应用程序,其中每个vhost服务一个应用程序。

交换机类型

  • direct Exchange(直接交换机)
    匹配路由键,只有完全匹配消息才会被转发

  • Fanout Excange(扇出交换机)
    将消息发送至所有的队列

  • Topic Exchange(主题交换机)
    将路由按模式匹配,此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“.“匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。

  • Header Exchange
    在绑定Exchange和Queue的时候指定一组键值对,header为键,根据请求消息中携带的header进行路由

工作模式

  • simple(简单模式)
    一个生产者对应一个消费者。
  • Work queues(工作模式)
    一个生产者生产,多个消费者进行消费,一条消息消费一次。
  • Publish/Subscibe(发布订阅模式)
    生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息。
  • Routing(路由模式)
    生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列。
  • Topics(主题模式)
    生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者。

基础整合

引入maven依赖

<!--amqp依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQ的自动配置类RabbitAutoConfiguration自动配置了连接工厂ConnectionFactory。ConnectionFactory从配置RabbitProperties中获取连接信息完成连接到RabbitMQ服务器。程序中可以注入RabbitTemplate给RabbitMQ发送和接收消息。

全局配置文件中配置RabbitMQ连接信息

spring:
  rabbitmq:
    host: 192.168.0.117 #rabbitmq服务器地址
    port: 5672         #端口号
    username: guest    #用户名
    password: guest    #密码
    #virtual-host:     #虚拟主机

测试发送数据

@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendDirect(){
        // Message需要自己构造一个;定义消息体内容和消息头
        // rabbitTemplate.send(exchage,routeKey,message);
        HashMap<String, Object> map = new HashMap<>();
        map.put("name", "baobao");
        map.put("age", 18);
        map.put("list", Arrays.asList(1,2,3,4,5));
        // 将map对象序列化以后,以baobao为路由键发送到exchange.direct,exchange会根据路由键将消息路由到具体的队列
        rabbitTemplate.convertAndSend("exchange.direct", "baobao", map);
    }
}

clipboard-1591066767967

clipboard-1591066771100
可以发现默认发给RabbitMQ的数据以jdk的方式进行序列化,并且消息头中的content_type保存了消息体的类型。

测试接收数据

@Test
public void testReceiveDirect(){
    // 从指定队列中接收数据
    Object data = rabbitTemplate.receiveAndConvert("baobao");
    System.out.println(data.getClass());
    System.out.println(data);
}

clipboard-1591066802619

自定义json序列化

SpringBoot中,默认发送的对象实例是以JDK序列化的,这总序列化不仅不容易查看消息,还占用较大的内存。

我们可以自己定制序列化方式,这里以json为例:

在自定义配置文件中添加一个自己的MessageConverter,类型是Jackson2JsonMessageConverter

@Configuration
public class MyRabbitConfig {
    // 添加json格式序列化器
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

进行测试

// 测试广播和json序列化
@Test
public void testJsonSerilize(){
    // 广播可以不需要传路由键
    rabbitTemplate.convertAndSend("exchange.fanout", "", new Person("文亮", 18));
}

clipboard-1591066900812
可以发现消息头中的content_type属性保存了消息体的类型为application/json,__TypeId__属性保存了javabean的全类名,用于反序列化

监听消息

@RabbitListener

先在主程序上加@EnableRabbit,开启基于注解的RabbitMQ模式

@SpringBootApplication
@EnableRabbit
public class MainApplicationMQ {
    public static void main(String[] args) {
        SpringApplication.run(MainApplicationMQ.class, args);
    }
}

编写1个Service,声明一个监听方法,方法上标注@RabbitListener,传入需要监听的队列名。监听方法可以接收的参数如下(无需保证参数顺序):

  • Message对象:原生消息的详细信息,包括消息头+消息体
  • 自定义实体类对象:用消息体反序列化后得到的Javabean
  • Channel对象:当前传输的数据通道
@Service
public class PersonService {
    /**
     * 监听队列baobao中的消息,有消息会自动取出并回调该方法
     * @param message 原生消息的详细信息,包括消息头+消息体
     * @param person  从消息体中解码出的javabean
     * @param channel 当前传输的数据通道
     */
    @RabbitListener(queues = "baobao")
    public void listen(Message message, Person person, Channel channel){
        System.out.println(message);
        System.out.println(person);
        System.out.println(channel);
    }
}

启动该消费者,从队列中消费1条消息:
clipboard-1591066973657
控制台打印结果

// message
(Body:'{"name":"文亮","age":18}' MessageProperties [headers={__TypeId__=com.baobao.springbootdemo.mq.bean.Person}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=amq.fanout, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-81sPttG477H4uOtS_e6tHA, consumerQueue=baobao])
// person
Person{name='文亮', age=18}
// channel
Cached Rabbit Channel: AMQChannel(amqp://guest@192.168.56.55:5672/,1), conn: Proxy@21a02097 Shared Rabbit Connection: SimpleConnection@2496b460 [delegate=amqp://guest@192.168.56.55:5672/, localPort= 6257]

注意:

  • 如果只有一个消费客户端,那么rabbitmq默认会将队列中的所有一次性发到消费者,但是消费者接收到消息后只能1个1个处理,只有处理完1个消息(即监听方法运行完毕,哪怕执行时间很长),才能继续处理下一个消息
  • 如果启动多个客户端,都对应同一个监听消息的方法,那么对于同一个消息,只有1个客户端可以接收到
  • 监听方法中的消息实例对象要与发送端对应,比如发送端发送字节数组那么接收端也要声明为字节数组参数;发送端发送Person对象那么接收端也要声明为Person类型参数

@RabbitHandler

我们还可以采用@RabbitListener配合@RabbitHandler的方式完成对消息的监听:

  • @RabbitListener:标注在类上,指定监听哪些队列
  • @RabbitHandler:标注在每个接收并处理不同消息的重载方法上,区分处理不同类型的消息
@Service
@RabbitListener(queues = {"baobao","baobao.news","baobao.map"})
public class PersonService {
    @RabbitHandler
    public void handlePersonMsg(Person person){
        System.out.println(person);
    }

    @RabbitHandler
    public void handleUserMsg(User user){
        System.out.println(user);
    }
}

创建交换机、队列、绑定关系

利用AmqpAdmin

给程序中注入AmqpAdmin可以实现对RabbitMQ的管理,它的declareXXX方法可以创建exchange、queue、binding等

public class MQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    public void testAmqpAdmin(){
        // 创建一个Direct类型的exchange
        amqpAdmin.declareExchange(new DirectExchange("exchange.amqpadmin"));
        // 创建一个queue
        amqpAdmin.declareQueue(new Queue("queue.amqpadmin"));
        // 添加exchange和queue之间的绑定
        amqpAdmin.declareBinding(new Binding("queue.amqpadmin", Binding.DestinationType.QUEUE,
                "exchange.amqpadmin","queue.amqpadmin",null));
    }

使用amqpAdmin创建交换机、队列、绑定关系时,会先检查rabbitmq有是否已经存在对应的交换机、队列、绑定关系,如果不存在才创建,已存在就什么都不做

直接在容器中放置对象

另外还有一种方法可以创建Queue、exchange和绑定关系,直接在容器中放置即可。当执行任何操作rabbitmq的方法时,如果rabbitmq发现还没有队列、交换机或绑定关系,就会自动创建

@Configuration
public class MyRabbitConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 创建一个交换机:参数1 交换机名称,参数2 是否持久化,参数3 是否自动删除
    @Bean
    public Exchange exchange(){
        return new DirectExchange("test.direct", true, false);
    }

    // 创建一个队列:参数1 队列名称,参数2 是否持久化,参数3 是否排他,参数4 是否自动删除
    @Bean
    public Queue queue(){
        return new Queue("test.queue", true, false, false);
    }

    // 创建交换机和队列的绑定关系:参数1 绑定的目标,参数2 绑定的目标类型,参数3 交换机名称,参数4 路由键,参数5 绑定参数
    @Bean
    public Binding binding(){
        return new Binding("test.queue", Binding.DestinationType.QUEUE,"test.direct",
                "queue", null);
    }

注意:

  • 直接在容器中放置Bean相比于直接利用AmqpAdmin来创建交换机、队列、绑定关系的区别是,容器中放置Bean的方式是懒加载的,也就是说并不会在容器启动时就创建,而是等我们的应用第一次连接rabbitmq进行操作的时候才创建交换机、队列、绑定关系。其底层原理是:当连接第一次创建时,会回调连接创建的监听方法,从容器中查找所有Exchange、Queue和Binding对象,然后利用AmqpAdmin将它们进行创建。也就是说SpringBoot应用刚启动时是不会创建这些对象的,只有程序首次连接rabbitmq获取connection时才会创建
  • 只有rabbitmq中不存在对应的交换机、队列、绑定关系时才会创建,已存在就什么都不做

另外也可以利用Builder模式链式创建

// 利用Builder模式链式创建
@Bean
public Exchange exchange2(){
    // 默认就是非自动删除
    return ExchangeBuilder.directExchange("direct.exchange").durable(true).build();
}

@Bean 
public Queue queue2(){
    // 默认就是非自动删除,不排他
    return QueueBuilder.durable("queue").build();
}

@Bean 
public Binding binding2(@Qualifier("queue2") Queue queue,@Qualifier("exchange2") Exchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with("routingkey").noargs();
}

标签:SpringBoot,exchange,队列,RabbitMQ,交换机,消息,整合,public
From: https://blog.csdn.net/weixin_52093727/article/details/145253512

相关文章

  • Shell SpringBoot 操作
    通过shell脚本来操作SpringBoot,检查程序是否在运行,启动程序,停止程序,重启程序,输出程序状态#!/bin/bash#这里可替换为你自己的执行程序,其他代码无需更改APP_NAME="$2"APP_DIR=/application#APP_DIR=`pwd`#使用说明,用来提示输入参数usage(){echo"Usage:shxxx.sh[......
  • 基于SpringBoot+Vue的智慧党建平台设计与实现
    ......
  • 如何高效整合海量仪器数据?电子实验记录本给出答案
    实验记录是科研人员对实验过程与结果的忠实记录,维系着科研工作的严谨性与连贯性。各类仪器产生的实验数据量呈爆发式增长,其记录主要存在两大类模式,即传统的纸质模式和现代的电子化模式。一,纸质实验记录模式效率低纸质记录模式面对海量数据,其不利之处如下:1,手抄,速度太慢......
  • 2025毕设springboot 基于web的家教管理系统论文+源码
    系统程序文件列表开题报告内容研究背景在当今社会,随着教育需求的多元化和个性化发展,家教服务逐渐成为众多家庭补充学校教育、提升孩子学习成绩的重要途径。然而,传统的家教市场存在信息不对称、管理不规范、服务质量参差不齐等问题,给家长和家教老师带来了诸多不便。随着互联......
  • 2025毕设springboot 基于Web的多功能游戏平台设计与实现论文+源码
    系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展和普及,网络游戏已成为人们休闲娱乐的重要方式之一。传统的游戏平台大多局限于单一的游戏类型或服务商,无法满足用户多样化的游戏需求。与此同时,随着Web技术的不断进步,基于Web的游戏平台凭借其跨平台、易访问、......
  • 计算机毕业设计Springboot大学生创新教育平台 基于Springboot的大学生创新创业教育平
    计算机毕业设计Springboot大学生创新教育平台3ky241z7(配套有源码程序mysql数据库论文)本套源码可以先看具体功能演示视频领取,文末有联xi可分享随着全球经济和科技的快速发展,创新已成为推动社会进步的重要动力。大学生作为未来社会的主要建设者和创新者,其创新能力的培养和......
  • 计算机毕业设计Springboot服装租赁系统 基于Spring Boot框架的服装租赁平台开发 Sprin
    计算机毕业设计Springboot服装租赁系统4jrvt1zd(配套有源码程序mysql数据库论文)本套源码可以先看具体功能演示视频领取,文末有联xi可分享随着时尚潮流的快速更迭,人们对于服装的需求日益多样化,但购买大量服装不仅成本高昂,还可能造成资源浪费。服装租赁系统应运而生,它为用户......
  • windows快速部署minIO,springboot快速集成
    一、Windows快速部署1.在MinIO官网下载Windows版本2.只需要下载minIOserver即可 3.在下载好的文件夹下打开cmd我是下载到了D:\MinIOminio.exeserverD:\MinIO\Data--console-address":9000"--address":9090" 在浏览器输入localhost:9000即可正常使用4.每次都要......
  • 计算机毕业设计Springboot洗衣店管理系统 基于SpringBoot的智能洗衣店管理系统 洗衣店
    计算机毕业设计Springboot洗衣店管理系统74t7o5qc(配套有源码程序mysql数据库论文)本套源码可以先看具体功能演示视频领取,文末有联xi可分享随着人们生活水平的提高和时间成本的增加,越来越多的人选择将洗衣服务外包给洗衣店。洗衣店行业逐渐发展壮大,但同时也面临着管理难题......
  • 计算机毕业设计Springboot线上考试系统的设计与实现 基于Springboot的在线考试系统设
    计算机毕业设计Springboot线上考试系统的设计与实现vi4cf717(配套有源码程序mysql数据库论文)本套源码可以先看具体功能演示视频领取,文末有联xi可分享随着信息技术的迅猛发展,教育领域也在逐步实现信息化。线上考试系统作为这一趋势下的产物,能够充分利用计算机和网络资源,实......