RabbitMQ
消息队列是实现应用程序与应用程序进行通信的中间件产品,可以实现各个微服务之间的异步操作。
好处:降低系统的耦合度、(用户服务的)快速响应、削峰限流、减少并发压力、便于系统功能的拓展。
RabbitMQ的体系结构介绍
1.Producer为生产者(发送消息),通过Channel与消息中间件Broker连接。
2.Broker中包含多个Virtual Host,每一个Virtual Host就是一个虚拟分组,用户在自己的Virtual Host中使用RabbitMQ组件。在实际开发中,通过Virtual Host区分不同项目、不同功能。
3.Virtual Host包含多个Exchange交换机,Exchange交换机是消息达到Broker的第一站。
4.Queue,消息队列,是消息的容器。消息放在这里等待被消费端取走。Exchange绑定一个或多个队列进行消息的发送。
5.Consumer消费者,通过Channel与Broker连接,监听一个或多个队列,若队列中有消息则会取走消费,(若成功消费发送确认信息)同时在队列中删除消息。
安装
# 拉取镜像
docker pull rabbitmq:3.13-management
# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management
后台管理
http://192.168.200.100:15672为后台管理界面,可以创建队列,监控消息等。
1.RabbitMQ工作模式
RabbitMQ官网列举了7种RabbitMQ用法,其中Publish/Subscribe、Routing和Topic最为常见。
1.1 Publish/Subscribe 发布订阅模式
生产者不是把消息直接发送到队列,而是发送到交换机,交换机接收消息,而如何处理消息取决于交换机的类型。
- 交换机有如下3种常见类型
1.Fanout:广播,将消息发送给所有绑定到交换机的队列
2.Direct:定向,把消息交给符合指定routing key的队列
3.Topic:通配符,把消息交给符合routing pattern(路由模式)的队列。
消息发送到Fanout交换机上,就会以广播的形式发送给所有已绑定队列,而监听对应队列的消费者则可以从队列中取走消息。 - 注意
- 前两种简单的工作模式不是没有交换机,而是采用了默认的交换机。
- 若多个服务监听同一个队列,则会对消息产生争抢,谁抢到谁消费。
1.2 Routing
- 基本概念
1.通过路由绑定的方式,把交换机和队列关联起来,交换机和队列通过路由键(string)进行绑定。
2.生产者发送消息时不仅要指定交换机,还要指定路由键。
3.交换机接收到消息会发送到路由键绑定的队列
4..交换机的类型为:Direct
5.队列绑定交换机的时候需要指定routing key
1.3 Topics
- 基本概念
Topic类型与Direct相比,都是可以根据RoutingKey把消息通过路由发送到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用
通配符。 - 注意事项
1.Routingkey一般都是由一个或多个单词组成,多个单词之间以.分割,例如:item.insert。
2.通配符规则:#:匹配零个或多个词; *:匹配一个词
1.4 其他
RPC远程过程调用
本质上是同步调用,不是经典的消息队列的工作方式,和我们使用OpenFeign调用远程接口一样。
- 补充
RPC远程过程调用,对于Java而言简单来说就是远程方法调用,A服务想要调用B服务(在不同的线程甚至不同的服务器上)的方法,就要使用RPC。
A通过socket、websocket或者消息中间件等方法传递给B中方法所需的参数,b在本地调用后得到结果再返回,就完成了远程过程调用。
Publisher Confirms
发送端消息确认,再后续消息可靠性详细介绍。
2.RabbitMQ整合SpringBoot
2.1 大致流程
- RabbitMQ服务器启动RabbitMQ
- 建module、改pom、配yml、主程序
相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置项
spring:
rabbitmq:
host: xxx
port: xxx
username:xxx
password: xxx
virtual-host: /
logging:
level:
com.atguigu.mq.listener.MyMessageListener: info
- 配置类
用作RabbitTemplate的增强 - 业务代码开发
发送消息:使用RabbitTemplate
接收消息:使用@RabbitListener注解(processMessage方法的注解)
@QueueBinding绑定基本信息
@Queue给value赋值,QUEUE_NAME队列名,durable 为是否持久化
@Exchange给exchange 赋值,EXCHANGE_DIRECT为交换机名
key = {ROUTING_KEY} 设置路由键,此处用{}是因为可能有多个路由键,要以数组的方式赋值
3.消息可靠性
3.1 生产端确认
问题:生产端消息未发送成功,导致信息丢失
解决1:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送。
解决2:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机,再由备份交换机传输到绑定的队列上
3.2 持久化
问题:消息正常发送到队列,还未消费,MQ服务器宕机,导致信息丢失。
解决:消息持久化到硬盘上,服务器重启后直接加载,不会导致消息丢失。
创建交换机和队列时,默认为持久化队列,会自动对其中的消息进行持久化,无需专门设置。
若对消息的重要性要求低,可使用临时的交换机和队列(transient),此时消息不会被持久化,而且重启服务后临时的交换机与队列都会消失。
3.3 消费端确认
问题:生产者取走消息但消费失败,导致消息失效
解决:消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息。消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)
补充
1. 交付标签机制deliveryTag
每一个消息进入队列时,broker都会生成一个唯一标识deliveryTag,是一个64位整数,消息向消费者端进行投递时会携带该信息。
消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作,例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。而deliveryTag作为消息的唯一标识就很好的满足了这个需求。
对于不同队列中的同一条消息,会产生不同的deliveryTag!
2. 消息确认相关方法参数说明:multiple
在返回消息是参数multile若为true,则会处理当前消息及其在队列中之前的消息,而为了避免不必要的差错,我们一般都设置为false。
此时,NACK、Reject类似,都是返回否定消息,而reject不允许设置multile。
4.消费端限流
问题:若消费者端因为某些原因如 执行效率较慢、生产者生产消息的速度相对快、消费者端重启遇到了大量的积压消息等,都会一次性取走过多的消息导致消费者端的压力过大。
解决:配置prefetch 进行削峰限流
限制每次去走的消息数量,减缓消费者端的服务压力。
spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 设置消费者端的回复确认为手动回复(默认为自动)!!
prefetch: 10 # 设置每次最多从消息队列服务器取回多少消息
5.消息超时
1.队列层面设置:该队列中所有的消息都会设置有效时间
2.消息层面设置
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
@Test
public void testSendMessageTTL() {
// 1、创建消息后置处理器对象
MessagePostProcessor messagePostProcessor = (Message message) -> {
// 设定 TTL 时间,以毫秒为单位
message.getMessageProperties().setExpiration("5000");
return message;
};
// 2、发送消息
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello atguigu", messagePostProcessor);
}
若消息与队列都设置了有效时间,按照短的计算。
6.死信
6.1 产生死信的三个原因
- 1.消息超时未消费
- 2.消费端取走后消费失败basicNack()/basicReject()且拒绝重新放入队列(requeue=false)
- 3.溢出:消息数量超过队列容纳极限(会将队头消息挤入死信队列)
6.2 死信交换机与死信队列
死信交换机与死信队列其实与常规的无异,若想要将某个队列的死信传入死信队列,需要以下步骤:
1.创建死信交换机与死信队列并绑定
2.创建常规交换机与交换队列并绑定
3.设置正常队列的dead-letter-exchange和dead-letter-routing-key参数,与死信交换机绑定
4.消费端监听死信队列,对死信做特殊处理
7.延迟队列
7.1 基于死信的延迟队列
如果想要做延时任务,可利用常规队列设置超时时间,再绑定死信队列,消费端只监听死信队列,从而实现目的。
7.2 延迟插件 rabbitmq_delayed_message_exchange
插件网址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下载启用后,在managementUI中创建交换机时,延时交换机类型设置为x-delayed-message,并额外设置x-delayed-type为direct、fanout、topic其中的一种。
生产者端正常发送带有ttl的消息给延时交换机
效果:使用rabbitmq_delayed_message_exchange插件后,即使该延时消息成功发送到队列上,也会导致returnedMessage()方法执行
8.事务
- 总结:
1.在生产者端使用事务消息和消费端没有关系
2.在生产者端使用事务消息仅仅是控制事务内的消息是否发送
3.提交事务就把事务内所有消息都发送到交换机
4.回滚事务则事务内任何消息都不会被发送
5.MQ的事务对消费者端无效!
9.惰性队列
9.1 惰性队列的特征
- 1.接收到消息后直接存入磁盘而非内存
- 2.消费者要消费消息时才会从磁盘中读取并加载到内存
- 3.支持数百万条的消息存储
9.2 如何创建惰性队列?
9.2.1 基于策略方式设定
# 登录Docker容器
docker exec -it rabbitmq /bin/bash
# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
- rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量
- set_policy是子命令,表示设置策略
- Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的
- "^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置
- '{"queue-mode":"lazy"}'是一个JSON格式的参数设置指定了队列的模式为"lazy"
- –-apply-to参数指定该策略将应用于队列(queues)级别
- 命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列
9.2.2 在声明队列时使用参数设定
应用场景
由于各种原因,如消费者离线/崩溃/停机进行维护、突然出现消息进入高峰,生产者的速度超过了消费者、消费者比正常情况慢等原因导致的消息堆积。
10.优先级队列
设置优先级之后:优先级高的消息更大几率先投递而不是完全的先进先出。
10.1 如何创建优先级队列?
在创建队列时,设置x-max-priority(最大优先级数)即可,默认是0。
当x-max-priority为0时,给消息设置优先级也不会生效。
10.2 如何给消息设置优先级?
@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{
message.getMessageProperties().setPriority(1); //设置优先级为1
return message;
});
}
11.集群
采用MQ集群可以避免单点故障 大流量场景分摊负载 数据同步等功能。
创建集群
详见 Operation016-Cluster.md 文档。
异地容灾
1.联邦交换机
通过网络将两个关联的交换机消息进行传输同步(两个都有)。
2.Shovel插件
通过网络将一部分消息全部挖到另一台关联的服务器上,要配置shovel的消息源和接收者。
对比
- Shovel和Federation的主要区别:
- Shovel更简洁一些
- Federation更倾向于跨集群使用,而Shovel是否跨集群都可以
- Shovel源队列中的消息经过数据转移后相当于被消费了
12 java代码sample
1.配置类,用于增强RabbitTemplate,使得消息确认的消息可以自己发送,(同时要注意更新配置项acknowledge-mode: manual )
@Configuration
@Slf4j
//ConfirmCallback、ReturnsCallback 是 RabbitTemplate 的内部类(接口)
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
//自动注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//增强RabbitTemplate
@PostConstruct //在组件创建完成后立刻执行该方法,要求方法没有返回值,非私有,无参数
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this); //参数要求传入ConfirmCallback实现类,this就是
rabbitTemplate.setReturnsCallback(this); //参数要求传入ReturnsCallback实现类,this就是
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 消息发送到交换机成功或失败时调用这个方法
log.info("confirm() 回调函数打印 CorrelationData:" + correlationData);
log.info("confirm() 回调函数打印 ack:" + ack);
log.info("confirm() 回调函数打印 cause:" + cause);
}
@Override
public void returnedMessage(ReturnedMessage returned) {
// 发送到队列失败时才调用这个方法
log.info("returnedMessage() 回调函数 消息主体: " + new String(returned.getMessage().getBody()));
log.info("returnedMessage() 回调函数 应答码: " + returned.getReplyCode());
log.info("returnedMessage() 回调函数 描述:" + returned.getReplyText());
log.info("returnedMessage() 回调函数 消息使用的交换器 exchange : " + returned.getExchange());
log.info("returnedMessage() 回调函数 消息使用的路由键 routing : " + returned.getRoutingKey());
}
}
2.生产者模块
@SpringBootTest
public class RabbitMQTest {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String EXCHANGE_TIMEOUT = "exchange.test.timeout";
public static final String ROUTING_KEY = "order";
public static final String ROUTING_KEY_TIMEOUT = "routing.key.test.timeout";
public static final String EXCHANGE_NORMAL = "exchange.normal.video";
public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
public static final String EXCHANGE_DELAY = "exchange.test.delay";
public static final String ROUTING_KEY_DELAY = "routing.key.test.delay";
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test01SendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY + "~", "Message Test Confirm~~~ ~~~");
}
@Test
public void test02SendMessage() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "Test Prefetch " + i);
}
}
@Test
public void test03SendMessage() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout " + i);
}
}
@Test
public void test04SendMessage() {
// 创建消息后置处理器对象
MessagePostProcessor postProcessor = message -> {
// 设置消息的过期时间,单位是毫秒
message.getMessageProperties().setExpiration("7000");
return message;
};
rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout", postProcessor);
}
@Test
public void testSendMultiMessage() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况2:消息数量超过队列的最大容量" + i);
}
}
@Test
public void test05SendMessageDelay() {
// 创建消息后置处理器对象
MessagePostProcessor postProcessor = message -> {
// 设置消息过期时间(以毫秒为单位)
// x-delay 参数必须基于 x-delayed-message-exchange 插件才能生效
message.getMessageProperties().setHeader("x-delay", "10000");
return message;
};
// 发送消息
rabbitTemplate.convertAndSend(
EXCHANGE_DELAY,
ROUTING_KEY_DELAY,
"Test delay message by plugin " + new SimpleDateFormat("HH:mm:ss").format(new Date()),
postProcessor);
}
public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
@Test
public void test06SendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "message test proirity 3", message -> {
// 消息本身的优先级数值
// 切记:不能超过 x-max-priority: 10
message.getMessageProperties().setPriority(3);
return message;
});
}
}
3.消费者模块
package com.atguigu.mq.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class MyMessageListener {
public static final String QUEUE_NAME = "queue.order";
public static final String QUEUE_NORMAL = "queue.normal.video";
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
public static final String QUEUE_DELAY = "queue.test.delay";
// @RabbitListener(queues = {QUEUE_NAME})
public void processMessage(String dataString, Message message, Channel channel) throws IOException {
// 获取当前消息的 deliveryTag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 核心操作
log.info("消费端 消息内容:" + dataString);
System.out.println(10 / 0);
// 核心操作成功:返回 ACK 信息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 获取当前消息是否是重复投递的
// redelivered 为 true:说明当前消息已经重复投递过一次了
// redelivered 为 false:说明当前消息是第一次投递
Boolean redelivered = message.getMessageProperties().getRedelivered();
// 核心操作失败:返回 NACK 信息
// requeue 参数:控制消息是否重新放回队列
// 取值为 true:重新放回队列,broker 会重新投递这个消息
// 取值为 false:不重新放回队列,broker 会丢弃这个消息
if (redelivered) {
// 如果当前消息已经是重复投递的,说明此前已经重试过一次啦,所以 requeue 设置为 false,表示不重新放回队列
channel.basicNack(deliveryTag, false, false);
} else {
// 如果当前消息是第一次投递,说明当前代码是第一次抛异常,尚未重试,所以 requeue 设置为 true,表示重新放回队列在投递一次
channel.basicNack(deliveryTag, false, true);
}
// reject 表示拒绝
// 辨析:basicNack() 和 basicReject() 方法区别
// basicNack()能控制是否批量操作
// basicReject()不能控制是否批量操作
// channel.basicReject(deliveryTag, true);
}
}
@RabbitListener(queues = {QUEUE_NAME})
public void processMessageTestPrefetch(String dataString, Message message, Channel channel) throws IOException, InterruptedException {
log.info("消费端 消息内容:" + dataString);
TimeUnit.SECONDS.sleep(1);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {
// 监听正常队列,但是拒绝消息
log.info("★[normal]消息接收到,但我拒绝。");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
// 监听死信队列
log.info("★[dead letter]dataString = " + dataString);
log.info("★[dead letter]我是死信监听方法,我接收到了死信消息");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = {QUEUE_DELAY})
public void processMessageDelay(String dataString, Message message, Channel channel) throws IOException {
log.info("[delay message][消息本身]" + dataString);
log.info("[delay message][当前时间]" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
public static final String QUEUE_PRIORITY = "queue.test.priority";
@RabbitListener(queues = {QUEUE_PRIORITY})
public void processMessagePriority(String dataString, Message message, Channel channel) throws IOException {
log.info("[priority]" + dataString);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
标签:队列,笔记,死信,交换机,RabbitMQ,message,public,消息
From: https://www.cnblogs.com/zlzw1/p/18361670