首页 > 其他分享 >SpringBoot 1x 系列之(九)Spring Boot与消息

SpringBoot 1x 系列之(九)Spring Boot与消息

时间:2024-02-28 10:15:44浏览次数:32  
标签:交换器 SpringBoot Exchange 队列 1x Boot RabbitMQ 消息 public

Spring Boot与消息

JMS、AMQP、RabbitMQ

1. 概述

消息服务的两个常见规范(消息代理规范):JMS、AMQP

JMS(Java Message Service)JAVA消息服务:

​ 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现

AMQP(Advanced Message Queuing Protocol)高级消息队列协议

​ 也是一个消息代理的规范,兼容JMS,RabbitMQ是AMQP的实现

JMS和AMQP的对比如下:

消息服务中间件的应用场景:异步处理、应用解耦、流量削峰

消息服务中的两个重要概念:

1)消息代理:消息中间件的服务器

2)目的地:

目的地有两种形式:

队列(queue):点对点消息通信(point-to-point)

​ 消息读取后被移出队列;消息有唯一的发送者和接受者(消息最终被谁消费谁就是接受者),但不一定只有一个接收者(凡是要接收指定消息的都是接收者)。

主题(topic):发布(publish)/订阅(subscribe)消息通信

​ 凡是接收者都可以订阅到消息,也可以有多个接收者。

Spring支持:

​ spring-jms提供了对JMS的支持

​ spring-rabbit提供了对AMQP的支持

​ 需要ConnectionFactory的实现来连接消息代理

​ 提供JmsTemplate、RabbitTemplate来发送消息

​ @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息

​ @EnableJms、@EnableRabbit开启支持

Spring Boot自动配置:

​ JmsAutoConfiguration

​ RabbitAutoConfiguration

2. RabbitMQ简介

erlang开发的AMQP实现

2.1 核心概念

1)Message:消息;消息生产者(Publisher)给消息代理(Broker)发送的数据内容,由消息头和消息体组成,消息体不透明,消息头由一些可选属性组成,比如routing-key(路由键)(routing-key用来指定发给谁)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2)Publisher:消息的生产者;一个向交换器发布消息的客户端应用程序。

3)Exchange:交换器;接收消息生产者(Publisher)发送的消息(Message)并将这些消息路由给服务器中的消息队列。有四种类型,direct(默认,实现JMS中的点对点消息模型),fanout, topic, 和headers,除direct外其他三种用于实现JMS中的发布/订阅消息模型。

4)Queue:消息队列;

5)Binding:绑定;交换器就是一个由绑定构成的路由表,也就是说每个绑定就标识着哪个路由键到哪个消息队列。Exchange 和Queue的绑定可以是多对多的关系。

6)Connection:网络连接;比如一个TCP连接

7)Channel:信道;一个TCP连接内部划分出多个信道,用于传输消息,节省资源。

8)Consumer:消息的消费者;一个从消息队列中取得消息的客户端应用程序。

9)Virtual Host:虚拟主机;RabbitMQ中划分为多个虚拟主机,每个虚拟主机就像一个个mini版的RabbitMQ,拥有自己的队列、交换器、绑定和权限机制。

10)Broker:消息代理;消息队列服务器实体

3. RabbitMQ运行机制

​ 生产者(Publisher)发送消息(Message)到消息代理(Broker)虚拟主机(Virtual Host)的交换器(Exchange),交换器(Exchange)根据消息(Message)中的路由键(routing-key)判断要把这个消息(Message)路由到哪个队列内,这个路由规则就是通过绑定关系(Binding)来表示的,当消息(Message)到达消息队列(Queue)以后,消费者(Consumer)就可以从消息队列(Queue)中取出消息(Message)了,取出的过程是这样的,首先消费者(Consumer)和消息代理(Broker)建立起TCP链接(Connection),并在一条TCP链接(Connection)中开辟多个信道(Channel),从消息队列(Queue)中拿到的消息(Message)通过信道(Channel)传输给消费者(Consumer)。

3.1 Exchange 类型

Exchange分发消息时根据类型的不同分发策略有区别

目前共四种类型:direct、fanout、topic、headers 。

​ headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了

3.1.1 Direct Exchange

​ 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。单播点对点消息模型

3.1.2 Fanout Exchange

​ 每个发到 Fanout 类型交换器的消息都会被转发到与该交换器绑定的所有队列上。广播发布/订阅消息模型

3.1.3 Topic Exchange

​ 根据路由键的规则匹配,有选择性的进行广播。单词之间用点分隔,#匹配0个或多个单词,*匹配一个单词。

4. 使用Docker安装RabbitMQ

镜像网站:https://hub.docker.com/_/rabbitmq?tab=tags

以management结尾的RabbitMQ镜像是带着web的管理界面的

#拉取镜像
docker pull rabbitmq:3-management
#启动镜像
#5672端口:客户端和RabbitMQ通信的端口
#15672端口:访问web管理页面端口
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 7d89798140f9

通过15672端口访问web管理页面,默认的用户名和密码为:guest

5. web端测试

​ 新建三个Exchange,分别是exchange.direct、exchange.fanout、exchange.topic,type分别是direct、fanout和topic,新建四个Queue,分别是atguigu、atguigu.news、atguigu.emps、gulixueyuan.news,分别进行Exchange和Queue的绑定,并发送消息进行测试。通过web管理页面进行操作即可。

不给出完整的测试过程,只有几个需要注意的点:

1)对于Fanout Exchange,无论绑定中设置的Routing key和消息头中的Routing key是否匹配,消息都会发送到所有绑定中的消息队列。

2)对于Topic Exchange,绑定中的Routing key命名和对应的消息队列命名之间是没有任何关系的,只会根据消息头中的Routing key和绑定中的Routing key进行匹配,然后发送消息到绑定中对应的消息队列中。

3)添加新的Exchange时会有一个Durability字段,含义如下

Durability:是否是可持久化的

​ Durable:RabbitMQ重启,交换器还在

​ Transient:否则,重启后就不存在了

6. RabbitMQ整合

6.1 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

6.2 全局配置

常用的配置项有以下几种:

spring:
  rabbitmq:
    host: 192.168.37.128
    username: guest
    password: guest
# virtual-host和port默认配置就是正确的,如果有差异才需要配置
#    virtual-host: /
#    port: 5672

6.3 自动配置

自动配置类:RabbitAutoConfiguration
1)自动配置了连接工厂ConnectionFactory
2)RabbitProperties 封装了RabbitMQ的配置
3)RabbitTemplate:给RabbitMQ发送和接受消息;

//Message需要自己构造一个;定制消息体内容和消息头
rabbitTemplate.send( exchage , routeKey, message);

//object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
rabbitTemplate.convertAndSend( exchage, routeKey, object);

4)AmqpAdmin:RabbitMQ系统管理功能组件,创建和删除消息队列、交换器、绑定等

6.4 单播

@RunWith(SpringRunner.class)
@SpringBootTest
public class CRUDTest {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void testRabbitMQ(){
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("1", "HelloWorld");
        map.put("我来啦", Arrays.asList("dshgfdshfsgf",123));
        rabbitTemplate.convertAndSend("exchange.direct", "atguigu.emps", map);
    }

    @Test
    public void testRevieve(){
      	//recieveAndConvert:消息体直接转化为我们想要的对象
        Object o = rabbitTemplate.receiveAndConvert("atguigu.emps");
        System.out.println(o.getClass());
        System.out.println(o);
    }
}

默认存入消息队列中的数据如下,样式不好看

将数据自动转为JSON发送出去

自定义一个配置类,如下,我们不用默认的MessageConverter

@Configuration
public class RabbitMQConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

更换MessageConverter后,还支持发送自定义的对象

@RunWith(SpringRunner.class)
@SpringBootTest
public class CRUDTest {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void testRabbitMQ(){
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("1", "HelloWorld");
        map.put("我来啦", Arrays.asList("dshgfdshfsgf",123));
        rabbitTemplate.convertAndSend("exchange.direct", "atguigu.emps", new Department(12, "开发部"));
    }

    @Test
    public void testRevieve(){
        Object o = rabbitTemplate.receiveAndConvert("atguigu.emps");
        System.out.println(o.getClass());
        System.out.println(o);
    }
}

6.5 广播

@RunWith(SpringRunner.class)
@SpringBootTest
public class CRUDTest {

    @Autowired
    RabbitTemplate rabbitTemplate;
  
  	//广播,只需要将exchange替换为Fanout类型的exchange
    @Test
    public void testGuangBo(){
        rabbitTemplate.convertAndSend("exchange.fanout", "", new Department(12, "开发部"));
    }
}

6.6 应用解耦场景应用

​ 业务描述:通过testGuangBo测试方法描述订单系统,通过DepartmentService描述库存系统,DepartmentService监听某个消息队列,如果订单系统(testGuangBo)向消息队列中放消息,DepartmentService自动取出消息并调用相应的方法。

1)首先需要在主配置类中开启基于注解的RabbitMQ模式

@EnableRabbit	//开启基于注解的RabbitMQ模式
@EnableCaching
@MapperScan("com.atguigu.springboot.mapper")
@SpringBootApplication
public class SpringBootCacheApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootCacheApplication.class, args);
    }
}

2)订单系统

//描述订单系统行为
@Test
public void testGuangBo(){
    rabbitTemplate.convertAndSend("exchange.direct", "atguigu", new Department(12, "开发部"));
}

3)库存系统

//描述库存系统行为
//@RabbitListener(queues = "atguigu"):指定监听atguigu消息队列,当消息队列中有消息来的时候就调用该方法,参数Department是由消息反序列化得到的
@RabbitListener(queues = "atguigu")
public void listenDepartment(Department department){
    System.out.println("收到的信息是:" + department);
}

两个系统分别执行,当消息队列中有消息过来时,就会调用相应的方法。

上述是库存系统中方法的参数为对象时的情景,如果想要获取请求头/请求体信息,可以将库存系统改写为如下

@RabbitListener(queues = "atguigu")
public void listenDept(Message message){
  	//获取请求体信息
    System.out.println(message.getBody());
  	//获取请求头信息
    System.out.println(message.getMessageProperties());
}

6.7 AmqpAdmin管理组件的使用

@Test
public void testAmqpAdmin(){
  	//AmqpAdmin中declareXXX方法是创建方法
    amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange"));
    amqpAdmin.declareQueue(new Queue("amqpAdmin.queue"));
  	//"amqpAdmin.queue":目的地
  	//Binding.DestinationType.QUEUE:目的地类型,消息队列或交换器
  	//"amqpAdmin.exchange":要为哪个交换器添加绑定
  	//"hhh":路由键
  	//null:参数头信息
    amqpAdmin.declareBinding(new Binding("amqpAdmin.queue", Binding.DestinationType.QUEUE, "amqpAdmin.exchange", "hhh", null));
}

标签:交换器,SpringBoot,Exchange,队列,1x,Boot,RabbitMQ,消息,public
From: https://www.cnblogs.com/wzzzj/p/18039118

相关文章

  • SpringBoot 1x 系列之(八)Spring Boot与缓存
    SpringBoot与缓存JSR-107、Spring缓存抽象、整合Redis缓存:加速系统访问,提升系统性能热点数据、临时数据(如验证码)1.JSR-1071.1背景统一缓存的开发规范及提升系统的扩展性,J2EE发布了JSR-107缓存规范1.2JSR107简介CacheManager与Cache的关系,类比连接池与连接涉及的包ja......
  • SpringBoot 1x 系列之(七)自定义starter
    自定义starterstarters原理、自定义starters如何自定义starter:​ 1、这个场景需要使用到的依赖是什么?​ 2、如何编写自动配置@Configuration//指定这个类是一个配置类@ConditionalOnXXX//在指定条件成立的情况下自动配置类生效@AutoConfigureAfter//指定自动配置类的......
  • SpringBoot 1x 系列之(六)Spring Boot启动配置原理
    SpringBoot启动配置原理启动原理、运行流程、自动配置原理几个重要的事件回调机制(这几个事件回调机制可供我们进行干预)配置在META-INF/spring.factoriesApplicationContextInitializerSpringApplicationRunListener只需要放在ioc容器中(@Component标注)ApplicationRunnerCo......
  • SpringBoot 1x 系列之(五)SpringBoot与数据访问
    SpringBoot与数据访问JDBC、MyBatis、SpringDataJPASpringBoot底层是使用的SpringData作为数据访问的默认处理方式。1.整合基本JDBC与数据源Pom依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId><......
  • SpringBoot 1x 系列之(四)Spring Boot与Web开发
    SpringBoot与Web开发Thymeleaf、Web定制、容器定制1.如何使用SpringBoot创建SpringBoot应用,选中我们需要的模块SpringBoot已经默认将这些场景配置好了,我们只需要在配置文件中指定少量配置就可以运行起来编写业务代码2.SpringBoot对静态资源的映射规则普通的web应用......
  • SpringBoot 1x 系列之(三)SpringBoot与日志
    SpringBoot与日志日志框架、日志配置1.日志框架JDBC和数据库驱动:JDBC是统一的接口层(抽象层),面向JDBC进行开发,而不直接面向数据库驱动,这样的好处是数据库驱动会不断的出现新产品,如果直接面向数据库驱动开发,那么,每次更换数据库驱动,开发的代码就要做相应的调整,而面向JDBC开发,不管......
  • SpringBoot 1x 系列之(二)SpringBoot 配置
    SpringBoot配置配置文件、加载顺序、配置原理1.配置文件SpringBoot默认使用两种类型的配置文件作为一个全局配置文件,配置文件名固定,用于修改SpringBoot自动配置的默认值application.propertiesapplication.y(a)ml1.1YAML简介YAML(YAMLAin'tMarkupLanguage)递归缩写......
  • SpringBoot 1x 系列之(一)SpringBoot 入门
    SpringBoot入门SpringBoot和微服务概念的简介、SpringBootHelloWorld入门程序、内部原理1.SpringBoot简介简化Spring应用开发(整个J2EE开发)的一个框架整个Spring技术栈的一个大整合.........J2EE开发的一站式解决方案注:SpringBoot使用嵌入式的Servlet容器,应用无......
  • spring boot 中使用MybatisPlus的自动填充createTime和updateTime
    首先需要在实体类的字段上加上注解,并且将类型更改为LocalDateTime@TableField(fill=FieldFill.INSERT)@JsonInclude(value=JsonInclude.Include.NON_NULL)@JsonFormat(pattern="yyyy-MM-ddHH:mm:ss")privateLocalDateTimecreateTime;@TableFie......
  • springboot学习过程中的特殊错误
     这是我在学习使用springboot过程中遇到的一个小问题,询问了gpt但是并没有解决我的报错,在网上浏览信息后最终知道了是哪里出了问题 就如这个好哥哥说的一样,mybatis自带的方法不会出现问题,所以问题出在了实体类定义上面,加了@Tableld的注解就解决了问题。......