一、消息队列的概念及应用场景
什么是消息队列
消息是在不同应用间传递的数据。这里的消息可以非常简单,比如只包含字符串,也可以非常复杂,包含多个嵌套的对象。消息队列(Message Queue)简单来说就是一种应用程序间的通讯方式,消息发送后立即返回,然后由消息系统保证消息的可靠性传输,消息生产者只需要把消息发到 MQ 中就可以了,不需要关心消息的消费,同样,消息消费者只管从 MQ 中拉取消息而不管是谁生产的消息,通过这样的一个“互相不知道对象存在”模式,将消息的生产者和消息的消费者解耦了。
什么场景下考虑使用消息队列
从上面可以知道,消息队列是一种应用间的异步协作机制,那么我们什么时候需要用到 MQ 呢?以常见的订单系统为例,当用户点击「下单」后的业务逻辑可能包括:扣减库存、生成相应订单数据、发短信通知等。在项目和业务发展初期上面这些逻辑可能放在一起执行,随着业务的发展订单量的增加,需要提升系统服务的性能,此时就可以将一些不需要立即生效的操作拆分出来异步执行,比如发送短信通知等。这种场景下就可以使用 MQ ,在下单主流程(比如扣减库存、生成订单数据等)完成之后发送一条消息到 MQ 让主流程快速走完,然后由另外一个线程拉取 MQ 的消息,执行相应的业务逻辑。这里的例子主要是用消息队列来解耦。
二、RabbitMQ 的基本概念和术语
名称 | 描述 |
---|---|
Message 消息 | 消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其它消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 |
Publisher 消息生产者 | 一个向交换机发送消息的客户端应用程序。 |
Exchange 交换器 | 用来接收生产者发送过来的消息,并将这些消息发送给服务器中的队列。 |
Binding 绑定 | 用于消息队列和交换器之间的关联,一个绑定就是一个基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 |
Queue 消息队列 | 用来保存消息直到发送给消费者,它是消息的容器,也是消息的终点,一个消息可投入一个或多个队列,消息一直在队列里面,等待消费者连接到这个队列并将其取走。 |
Connection 网络连接 | 比如一个 TCP 连接。 |
Channel 信道 | 多路复用连接中的一条独立双向数据流通道,信道是建立在真实 TCP 连接内的虚拟连接,AMQP 命令都是通过信道发送出去的,不管是发布消息、订阅消息还是接收消息,这些动作都是通过信道完成的。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 |
Consumer 消息的消费者 | 一个从消息队列中获取消息的客户端应用程序。 |
Virtual Host 虚拟主机 | 表示一批交换器、消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的对服务器域。每个 vhost 本质上是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 |
三、消息模式
- Direct Exchange(直接交换机):消息的 routing key(路由键)与 binding key(绑定键)完全匹配时才会被发送到对应的队列中。适合点对点通信,也支持广播。
- Fanout Exchange(扇形交换机):将所有发送到该交换机的消息广播到所有与该交换机绑定的队列中。适合广播通信。
- Topic Exchange(主题交换机):根据消息的 routing key 与 binding key 的模式匹配度,将消息发送到一个或多个队列中。支持模糊匹配,适合多条件匹配。
- Headers Exchange(头交换机):根据消息头(headers)中的属性(键值对)来匹配消息,并将消息发送到匹配的队列中。支持多条件匹配,但比较复杂,不常用。
四、Docker安装RabbitMQ
//查找镜像
docker search rabbitmq
//拉取镜像
//安装name为rabbitmq的这里是直接安装最新的,如果需要安装其他版本在rabbitmq后面跟上版本号即可 ;(如:docker pull rabbitMq:3.8.3-management)
docker pull rabbitmq
//创建挂载目录
mkdir /usr/rabbitMQ
cd /usr/rabbitMQ
mkdir config
//启动创建容器,并挂载配置文件(systemctl restart docker:测试随着docker重启服务)
docker run --name RabbitMQ -p 5672:5672 -p 15672:15672 -v /usr/rabbitMQ/config/:/etc/rabbitmq/ -d --restart=always rabbitmq
//启动可视化插件
//查看当前启动镜像
docker ps
//查看运行中的容器(4369 -- erlang发现端口5672 --client端通信端口,应用访问端口15672 -- 管理界面ui端口,控制台Web端口号25672 -- server间内部通信端口
docker exec -it ad60b1064cef /bin/bash #进入RabbitMQ镜像 ad60b1064cef:容器id
rabbitmq-plugins enable rabbitmq_management #启动可视化插件
//外部访问需要添加用户及权限(添加完用户端之后就可以在web端登录MQ,IP+端口15672)
//添加账号 rabbitmqctl add_user 账号 密码
rabbitmqctl add_user root root
//设置权限 rabbitmqctl set_permissions -p / 账号 ".*" ".*" ".*"
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
//设置角色rabbitmqctl set_user_tags 账号 administrator
rabbitmqctl set_user_tags root administrator
安装日志插件
//查看当前启动镜像
docker ps
//查看运行中的容器(4369 -- erlang发现端口5672 --client端通信端口,应用访问端口15672 -- 管理界面ui端口,控制台Web端口号25672 -- server间内部通信端口
docker exec -it ad60b1064cef /bin/bash #进入RabbitMQ镜像 ad60b1064cef:容器id
//列出Trace插件
rabbitmq-plugins list
//启动Trace插件
rabbitmqctl trace_on
//启动日志插件命令
rabbitmq-plugins enable rabbitmq_tracing
五、Springboot整合RabbitMQ
简单使用
1、配置 Pom 包,主要是添加 spring-boot-starter-amqp
的支持
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置文件
配置 RabbitMQ 的安装地址、端口以及账户信息
spring.application.name=Spring-boot-rabbitmq
spring.rabbitmq.host=192.168.0.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
3、队列配置
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("hello");
}
}
3、发送者
rabbitTemplate 是 Spring Boot 提供的默认实现
@component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}
4、接收者
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
5、测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {
@Autowired
private HelloSender helloSender;
@Test
public void hello() throws Exception {
helloSender.send();
}
}
注意,发送者和接收者的 queue name 必须一致,不然不能接收
多对多使用
一个发送者,N 个接收者或者 N 个发送者和 N 个接收者会出现什么情况呢?
一对多发送
对上面的代码进行了小改造,接收端注册了两个 Receiver,Receiver1 和 Receiver2,发送端加入参数计数,接收端打印接收到的参数,下面是测试代码,发送一百条消息,来观察两个接收端的执行效果
@Test
public void oneToMany() throws Exception {
for (int i=0;i<100;i++){
neoSender.send(i);
}
}
结果如下:
Receiver 1: Spring boot neo queue ****** 11
Receiver 2: Spring boot neo queue ****** 12
Receiver 2: Spring boot neo queue ****** 14
Receiver 1: Spring boot neo queue ****** 13
Receiver 2: Spring boot neo queue ****** 15
Receiver 1: Spring boot neo queue ****** 16
Receiver 1: Spring boot neo queue ****** 18
Receiver 2: Spring boot neo queue ****** 17
Receiver 2: Spring boot neo queue ****** 19
Receiver 1: Spring boot neo queue ****** 20
根据返回结果得到以下结论
一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中
多对多发送
复制了一份发送者,加入标记,在一百个循环中相互交替发送
@Test
public void manyToMany() throws Exception {
for (int i=0;i<100;i++){
neoSender.send(i);
neoSender2.send(i);
}
}
结果如下:
Receiver 1: Spring boot neo queue ****** 20
Receiver 2: Spring boot neo queue ****** 20
Receiver 1: Spring boot neo queue ****** 21
Receiver 2: Spring boot neo queue ****** 21
Receiver 1: Spring boot neo queue ****** 22
Receiver 2: Spring boot neo queue ****** 22
Receiver 1: Spring boot neo queue ****** 23
Receiver 2: Spring boot neo queue ****** 23
Receiver 1: Spring boot neo queue ****** 24
Receiver 2: Spring boot neo queue ****** 24
Receiver 1: Spring boot neo queue ****** 25
Receiver 2: Spring boot neo queue ****** 25
结论:和一对多一样,接收端仍然会均匀接收到消息
高级使用
对象的支持
Spring Boot 以及完美的支持对象的发送和接收,不需要格外的配置。
//发送者
public void send(User user) {
System.out.println("Sender object: " + user.toString());
this.rabbitTemplate.convertAndSend("object", user);
}
...
//接收者
@RabbitHandler
public void process(User user) {
System.out.println("Receiver object : " + user);
}
结果如下:
Sender object: User{name='neo', pass='123456'}
Receiver object : User{name='neo', pass='123456'}
Topic Exchange
topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key 自由的绑定不同的队列
首先对 topic 规则配置,这里使用两个队列来测试
@Configuration
public class TopicRabbitConfig {
final static String message = "topic.message";
final static String messages = "topic.messages";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
使用 queueMessages 同时匹配两个队列,queueMessage 只匹配 “topic.message” 队列
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
}
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
Fanout Exchange
Fanout 就是我们熟悉的广播模式或者订阅模式,给 Fanout 交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
这里使用了 A、B、C 三个队列绑定到 Fanout 交换机上面,发送端的 routing_key 写任何字符都会被忽略:
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
结果如下:
Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg
结果说明,绑定到 fanout 交换机上面的队列都收到了消息
标签:neo,Spring,boot,Rabbitmq,queue,消息,Receiver From: https://www.cnblogs.com/pekah/p/17233920.html