Spring Boot与消息
JMS、AMQP、RabbitMQ
1. 概述
消息服务的两个常见规范(消息代理规范):JMS、AMQP
JMS(Java Message Service)JAVA消息服务:
基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
AMQP(Advanced Message Queuing Protocol)高级消息队列协议
也是一个消息代理的规范,兼容JMS,RabbitMQ是AMQP的实现
JMS和AMQP的对比如下:
消息服务中间件的应用场景:异步处理、应用解耦、流量削峰
消息服务中的两个重要概念:
1)消息代理:消息中间件的服务器
2)目的地:
目的地有两种形式:
队列(queue):点对点消息通信(point-to-point)
消息读取后被移出队列;消息有唯一的发送者和接受者(消息最终被谁消费谁就是接受者),但不一定只有一个接收者(凡是要接收指定消息的都是接收者)。
主题(topic):发布(publish)/订阅(subscribe)消息通信
凡是接收者都可以订阅到消息,也可以有多个接收者。
Spring支持:
spring-jms提供了对JMS的支持
spring-rabbit提供了对AMQP的支持
需要ConnectionFactory的实现来连接消息代理
提供JmsTemplate、RabbitTemplate来发送消息
@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
@EnableJms、@EnableRabbit开启支持
Spring Boot自动配置:
JmsAutoConfiguration
RabbitAutoConfiguration
2. RabbitMQ简介
由erlang
开发的AMQP
实现
2.1 核心概念
1)Message:消息;消息生产者(Publisher)给消息代理(Broker)发送的数据内容,由消息头和消息体组成,消息体不透明,消息头由一些可选属性组成,比如routing-key(路由键)(routing-key用来指定发给谁)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
2)Publisher:消息的生产者;一个向交换器发布消息的客户端应用程序。
3)Exchange:交换器;接收消息生产者(Publisher)发送的消息(Message)并将这些消息路由给服务器中的消息队列。有四种类型,direct(默认,实现JMS中的点对点消息模型),fanout, topic, 和headers,除direct外其他三种用于实现JMS中的发布/订阅消息模型。
4)Queue:消息队列;
5)Binding:绑定;交换器就是一个由绑定构成的路由表,也就是说每个绑定就标识着哪个路由键到哪个消息队列。Exchange 和Queue的绑定可以是多对多的关系。
6)Connection:网络连接;比如一个TCP连接
7)Channel:信道;一个TCP连接内部划分出多个信道,用于传输消息,节省资源。
8)Consumer:消息的消费者;一个从消息队列中取得消息的客户端应用程序。
9)Virtual Host:虚拟主机;RabbitMQ中划分为多个虚拟主机,每个虚拟主机就像一个个mini版的RabbitMQ,拥有自己的队列、交换器、绑定和权限机制。
10)Broker:消息代理;消息队列服务器实体
3. RabbitMQ运行机制
生产者(Publisher)发送消息(Message)到消息代理(Broker)虚拟主机(Virtual Host)的交换器(Exchange),交换器(Exchange)根据消息(Message)中的路由键(routing-key)判断要把这个消息(Message)路由到哪个队列内,这个路由规则就是通过绑定关系(Binding)来表示的,当消息(Message)到达消息队列(Queue)以后,消费者(Consumer)就可以从消息队列(Queue)中取出消息(Message)了,取出的过程是这样的,首先消费者(Consumer)和消息代理(Broker)建立起TCP链接(Connection),并在一条TCP链接(Connection)中开辟多个信道(Channel),从消息队列(Queue)中拿到的消息(Message)通过信道(Channel)传输给消费者(Consumer)。
3.1 Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别
目前共四种类型:direct、fanout、topic、headers 。
headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
3.1.1 Direct Exchange
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。单播
,点对点消息模型
3.1.2 Fanout Exchange
每个发到 Fanout 类型交换器的消息都会被转发到与该交换器绑定的所有队列上。广播
,发布/订阅消息模型
3.1.3 Topic Exchange
根据路由键的规则匹配,有选择性的进行广播。单词之间用点分隔,#匹配0个或多个单词,*匹配一个单词。
4. 使用Docker安装RabbitMQ
镜像网站:https://hub.docker.com/_/rabbitmq?tab=tags
以management结尾的RabbitMQ镜像是带着web的管理界面的
#拉取镜像
docker pull rabbitmq:3-management
#启动镜像
#5672端口:客户端和RabbitMQ通信的端口
#15672端口:访问web管理页面端口
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 7d89798140f9
通过15672端口访问web管理页面,默认的用户名和密码为:guest
5. web端测试
新建三个Exchange,分别是exchange.direct、exchange.fanout、exchange.topic,type分别是direct、fanout和topic,新建四个Queue,分别是atguigu、atguigu.news、atguigu.emps、gulixueyuan.news,分别进行Exchange和Queue的绑定,并发送消息进行测试。通过web管理页面进行操作即可。
不给出完整的测试过程,只有几个需要注意的点:
1)对于Fanout Exchange,无论绑定中设置的Routing key和消息头中的Routing key是否匹配,消息都会发送到所有绑定中的消息队列。
2)对于Topic Exchange,绑定中的Routing key命名和对应的消息队列命名之间是没有任何关系的,只会根据消息头中的Routing key和绑定中的Routing key进行匹配,然后发送消息到绑定中对应的消息队列中。
3)添加新的Exchange时会有一个Durability字段,含义如下
Durability:是否是可持久化的
Durable:RabbitMQ重启,交换器还在
Transient:否则,重启后就不存在了
6. RabbitMQ整合
6.1 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.2 全局配置
常用的配置项有以下几种:
spring:
rabbitmq:
host: 192.168.37.128
username: guest
password: guest
# virtual-host和port默认配置就是正确的,如果有差异才需要配置
# virtual-host: /
# port: 5672
6.3 自动配置
自动配置类:RabbitAutoConfiguration
1)自动配置了连接工厂ConnectionFactory
2)RabbitProperties 封装了RabbitMQ的配置
3)RabbitTemplate:给RabbitMQ发送和接受消息;
//Message需要自己构造一个;定制消息体内容和消息头
rabbitTemplate.send( exchage , routeKey, message);
//object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
rabbitTemplate.convertAndSend( exchage, routeKey, object);
4)AmqpAdmin:RabbitMQ系统管理功能组件,创建和删除消息队列、交换器、绑定等
6.4 单播
@RunWith(SpringRunner.class)
@SpringBootTest
public class CRUDTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testRabbitMQ(){
Map<String, Object> map = new HashMap<String, Object>();
map.put("1", "HelloWorld");
map.put("我来啦", Arrays.asList("dshgfdshfsgf",123));
rabbitTemplate.convertAndSend("exchange.direct", "atguigu.emps", map);
}
@Test
public void testRevieve(){
//recieveAndConvert:消息体直接转化为我们想要的对象
Object o = rabbitTemplate.receiveAndConvert("atguigu.emps");
System.out.println(o.getClass());
System.out.println(o);
}
}
默认存入消息队列中的数据如下,样式不好看
将数据自动转为JSON发送出去
自定义一个配置类,如下,我们不用默认的MessageConverter
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
更换MessageConverter后,还支持发送自定义的对象
@RunWith(SpringRunner.class)
@SpringBootTest
public class CRUDTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testRabbitMQ(){
Map<String, Object> map = new HashMap<String, Object>();
map.put("1", "HelloWorld");
map.put("我来啦", Arrays.asList("dshgfdshfsgf",123));
rabbitTemplate.convertAndSend("exchange.direct", "atguigu.emps", new Department(12, "开发部"));
}
@Test
public void testRevieve(){
Object o = rabbitTemplate.receiveAndConvert("atguigu.emps");
System.out.println(o.getClass());
System.out.println(o);
}
}
6.5 广播
@RunWith(SpringRunner.class)
@SpringBootTest
public class CRUDTest {
@Autowired
RabbitTemplate rabbitTemplate;
//广播,只需要将exchange替换为Fanout类型的exchange
@Test
public void testGuangBo(){
rabbitTemplate.convertAndSend("exchange.fanout", "", new Department(12, "开发部"));
}
}
6.6 应用解耦场景应用
业务描述:通过testGuangBo测试方法描述订单系统,通过DepartmentService描述库存系统,DepartmentService监听某个消息队列,如果订单系统(testGuangBo)向消息队列中放消息,DepartmentService自动取出消息并调用相应的方法。
1)首先需要在主配置类中开启基于注解的RabbitMQ模式
@EnableRabbit //开启基于注解的RabbitMQ模式
@EnableCaching
@MapperScan("com.atguigu.springboot.mapper")
@SpringBootApplication
public class SpringBootCacheApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootCacheApplication.class, args);
}
}
2)订单系统
//描述订单系统行为
@Test
public void testGuangBo(){
rabbitTemplate.convertAndSend("exchange.direct", "atguigu", new Department(12, "开发部"));
}
3)库存系统
//描述库存系统行为
//@RabbitListener(queues = "atguigu"):指定监听atguigu消息队列,当消息队列中有消息来的时候就调用该方法,参数Department是由消息反序列化得到的
@RabbitListener(queues = "atguigu")
public void listenDepartment(Department department){
System.out.println("收到的信息是:" + department);
}
两个系统分别执行,当消息队列中有消息过来时,就会调用相应的方法。
上述是库存系统中方法的参数为对象时的情景,如果想要获取请求头/请求体信息,可以将库存系统改写为如下
@RabbitListener(queues = "atguigu")
public void listenDept(Message message){
//获取请求体信息
System.out.println(message.getBody());
//获取请求头信息
System.out.println(message.getMessageProperties());
}
6.7 AmqpAdmin管理组件的使用
@Test
public void testAmqpAdmin(){
//AmqpAdmin中declareXXX方法是创建方法
amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange"));
amqpAdmin.declareQueue(new Queue("amqpAdmin.queue"));
//"amqpAdmin.queue":目的地
//Binding.DestinationType.QUEUE:目的地类型,消息队列或交换器
//"amqpAdmin.exchange":要为哪个交换器添加绑定
//"hhh":路由键
//null:参数头信息
amqpAdmin.declareBinding(new Binding("amqpAdmin.queue", Binding.DestinationType.QUEUE, "amqpAdmin.exchange", "hhh", null));
}
标签:交换器,SpringBoot,Exchange,队列,1x,Boot,RabbitMQ,消息,public
From: https://www.cnblogs.com/wzzzj/p/18039118