如果一个队列设置了死信交换机,该队列的消息就有了极大的可靠性保障,当出现以下情况时,消息就会投递到死信交换机中:
- 队列中的消息在被消费者处理后,抛出异常,返回了 nack 或者 reject
- 如果队列设置了 ttl 或者消息本身设置了 ttl ,消息因为超时而未消费
- 队列容量已经满了,后续发来的消息无法接收,直接被丢弃
有了死信交换机后,在保障消息的可靠性同时,也极大的简化了代码编写。死信交换机结合 ttl 队列也能够实现消息的延迟发送。由于延迟发送消息的使用场景非常广泛,因此 RabbitMQ 专门提供了延迟插件,进一步简化了延迟发送消息的代码。
队列满了,主要是因为服务器内存不足,这种情况其实是比较危险的,说明生产消息的速度超过了消费速度,已经造成了大量消息的累积。虽然 Spring AMQP 发送消息默认设置为持久化,但是当服务器内存不足时,会集中将一批消息持久化到硬盘中,此时会消耗一定的时间,造成 RabbitMQ 的不稳定,有可能会造成新生产的消息无法及时入队而丢失。当然增加消费者程序来提高消费速度是最佳解决方案,但是如果短期内无法增加消费者程序,我们可以将队列迅速变为惰性队列,将消息持续稳定的持久化到硬盘上,就可以确保消息可靠性,由于硬盘容量很大,将在足够长的时间内不会出现队列爆满的问题。
本篇博客主要通过代码的方式演示 RabbitMQ 以上消息可靠性保障措施,在博客的最后会提供源代码下载。
一、搭建工程
继续采用上篇博客通过 docker-compose 部署的 RabbitMQ 进行演示,搭建 SpringBoot 工程如下:
两个子工程没有引入任何依赖,主要使用父工程 pom 文件的依赖,父工程 pom 文件细节如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jobs</groupId>
<artifactId>rmq_dead_delay</artifactId>
<packaging>pom</packaging>
<version>1.0</version>
<modules>
<module>publish_msg</module>
<module>consumer_msg</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
</parent>
<dependencies>
<!--在此主要使用 lombok 自带的 log 方法-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--Spring AMQP 消息队列依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--引入单元测试依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
二、死信交换机
死信交换机是在队列上进行设置,只需要在队列上增加 2 个参数设置即可:
- x-dead-letter-exchange 参数设置死信交换机的名称
- x-dead-letter-routing-key 参数设置私信交换机的路由 key
在上篇博客中,消费者本地重试次数耗尽后,我们设置的策略是投递到新的交换机中。有了死信交换机后,我们就不需要这么做了,本地重试次数耗尽后,仍然使用默认的丢弃策略,此时被丢弃的消息会通过死信交换机投递到其它队列中进行处理。
//用于死信交换机投递消息,当别的队列中消息处理错误被丢弃,或者超时未消费,则在该队列中进行处理
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "error.queue"),
exchange = @Exchange(name = "test.exchange", type = ExchangeTypes.DIRECT),
key = "error"
))
public void listenerDeadQueue(String msg) {
log.info("接收到 dead.queue 消息:" + msg);
//在这里可以记录日志,或将消息存储到数据库中,后续由人工进行干预处理
}
//接收到消息后,处理程序会报异常
//在 application.yml 中配置了本地重试 3 次,如果都是失败,默认会丢弃消息,返回 basic.reject
//由于已经为该队列设置了死信交换机和相应的路由 key,因此最终失败的消息会投递到 error.queue 中
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "data.queue", arguments = {
//在队列上设置【死信交换机】和【路由key】
@Argument(name = "x-dead-letter-exchange", value = "test.exchange"),
@Argument(name = "x-dead-letter-routing-key", value = "error")}),
exchange = @Exchange(name = "test.exchange", type = ExchangeTypes.DIRECT),
key = "data"
))
public void listenerDataQueue(String msg) {
log.info("接收到 data.queue 消息:" + msg);
//以下代码会抛出异常
Integer result = 1 / 0;
}
发送消息进行测试的代码如下:
//发送到 dead.queue
//消费者接收到消息后,处理过程中出现异常,本地重试 3 次,一共处理 4 次都抛异常,
//最终由死信交换机投递到 error.queue
@Test
void publishDeadTest() {
String message = "dead message test";
String exchange = "test.exchange";
String rootingkey = "data";
//发送消息
rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}
三、TTL 队列
一个队列可以设置 ttl 时间,表示消息最多可以在队列中的存放时间,如果过期未消费就会被丢弃。对于一个消息来说,也可以设置 ttl 时间,表示该消息的存活时间,过期未消费也会被丢弃。如果队列和消息同时设置了 ttl 时间,则以最短的 ttl 时间为准。
package com.jobs.listener;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TTLQueueConfig {
//在 SpringAmqpListener 中通过注解绑定交换机和队列时,必须要有消息处理程序。
//为了测试发送到队列中的消息超时后,自动由死信交换机投递到指定队列中,我们不需要消息处理程序
//因此只能采用 @Bean 注解去声明交换机和队列并进行绑定。
//如果在 SpringAmqpListener 设置 ttl 队列的话,只需要给队列设置 x-message-ttl 参数即可
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.exchange");
}
@Bean
public Queue ttlQueue(){
return QueueBuilder
.durable("ttl.queue")
//设置队列中每条消息的存活时间只有 10 秒钟
.ttl(10000)
//给队列设置【死信交换机】和【路由key】
.deadLetterExchange("test.exchange")
.deadLetterRoutingKey("error")
.build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}
发送消息测试队列的 ttl 和消息的 ttl
//发送普通消息到 ttl.queue
//由于我们创建的 ttl.queue 具有以下 3 个特征:
//1 每条消息的存活时间只有 10 秒
//2 没有消息处理程序
//3 设置了死信交换机和路由 key
//因此 10 秒之后,消息就会被死信交换机投递到 error.queue
@Test
void publish_Normal_To_TTL_Test() {
String message = "normal message to ttl queue test";
String exchange = "ttl.exchange";
String rootingkey = "ttl";
//发送消息
rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}
//消息本身也可以设置 ttl 过期时间
//如果【消息】和【队列】都设置了 ttl 过期时间,以最短的 ttl 过期时间为准
//这里我们将消息的 ttl 过期时间设置为 5 秒,短于队列的 ttl 过期时间
//因此 5 秒之后,会被死信交换机投递到 error.queue 中
@Test
void publish_ttlmsg_To_TTL_Test() {
String exchange = "ttl.exchange";
String rootingkey = "ttl";
Message message = MessageBuilder
.withBody("ttl messsage to ttl queue test".getBytes(StandardCharsets.UTF_8))
//设置消息的 ttl 过期时间为 5 秒
.setExpiration("5000").build();
//发送消息
rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}
通过上面的代码和实际执行效果可以发现:死信交换机和 ttl 队列相结合可以实现延迟发送消息的功能。
四、延迟队列
通过死信交换机和 ttl 队列配合实现延迟发送消息的方案,太过于繁琐。由于延迟发送消息的场景比较广泛,RabbitMQ 提供了延迟插件,可以将任意交换机设置为延迟交换机。当发送给延迟交换机的消息包含 x-delay 头时,会先将消息持久化到硬盘中,时间到后再投递到相应的队列中,实现延迟发送消息的功能,大大简化了代码的实现方案。
首先我们需要到官网上下载与 RabbitMQ 版本相同的延迟插件,由于我们使用的 RabbitMQ 的版本是 3.12,因此我们需要下载 3.12 版本的延迟插件,由于插件的下载地址是 github,属于国外网站,偶尔才能访问,可以在不同的时间尝试访问。
RabbitMQ 延迟插件的下载地址为:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
安装插件非常简单,只需要将插件文件放到 RabbitMQ 的插件目录中,再运行启用插件的命令即可。由于我使用 docker-compose 部署的 RabbitMQ,容器中插件目录是 /plugins ,因此首先我们需要将插件文件拷贝到该目录中。
# 可以通过 xftp 将文件上传到 /app/rabbitmq 目录中
# 然后运行以下命令将插件文件拷贝到容器的 /plugins 目录中
# 备注:我启动的 RabbitMQ 的容器名称是 rabbitmq
docker cp /app/rabbitmq/rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins
然后进入容器中,使用 RabbitMQ 自带的命令启用插件即可:
# 进入容器中
docker exec -it rabbitmq bash
# 启用延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
在接收消息的处理方法上,我们声明一个新的交换机,通过 delayed = "true" 设置为延迟交换机即可:
//接收 header 设置了 x-delay 的延迟消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "test.queue"),
//给 rabbitmq 安装了 delay message 插件后,给交换机设置 delayed 为 true,就变成了延迟交换机
//当带有 x-delay 头的消息发送到该交换机后,消息先持久化到硬盘中,然后返回给生产者没有找到路由的错误异常,
//因此生产者如果监听了消息是否由交换机发送到队列的话,就会收到 routing not found 的错误消息
//此时生产者的回调方法中,需要判断返回的错误消息是否设置了 delay 以及 delay 值是否大于 0
exchange = @Exchange(name = "delay.exchange", delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String msg) {
log.info("接收到了 test.queue 的延迟消息:" + msg);
}
通过以下发送消息的代码进行测试验证:
//发送延迟消息测试
@Test
void publishDelayMessageTest() {
String exchange = "delay.exchange";
String rootingkey = "delay";
Message message = MessageBuilder
.withBody("delay messsage test".getBytes(StandardCharsets.UTF_8))
//延迟消息,必须要包含 x-delay 头,设置延迟时间,此处设置为延迟 5 秒发送
.setHeader("x-delay", 5000).build();
//如果发送者监听消息是否由交换机发送到队列时,必须给消息设置全局唯一 id
//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//rabbitTemplate.convertAndSend(exchange, rootingkey, message, correlationData);
//将消息发送给延迟交换机
rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}
五、惰性队列
从 RabbitMQ 的 3.6.0 版本开始,就增加了惰性队列的功能,具有如下特征:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持海量的消息存储,存取性能取决于硬盘性能
可以通过 RabbitMQ 自带的命令添加策略,将已存在的队列设置为惰性队列,比如:
# 通过正则表达式,将以 lazy 开头的队列,设置为惰性队列
rabbitmqctl set_policy Lazy "^lazy" '{"queue-mode":"lazy"}' --apply-to queues
在代码中,只需要给队列设置 x-queue-mode 参数值为 lazy 即可变为惰性队列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "lazy.queue",
//将队列设置为惰性队列,消息会持续稳定的存储到磁盘中,用于解决消息大量累积的问题
arguments = @Argument(name = "x-queue-mode", value = "lazy")),
exchange = @Exchange(name = "test.exchange"),
key = "lazy"
))
public void listenLazyQueue(String msg) throws InterruptedException {
log.info("接收到了 lazy.queue 的消息:" + msg);
//模拟处理消息比较慢,造成消息无法及时消费处理掉,在队列中进行累积
//如果采用惰性队列,则能够持续稳定的存储到硬盘中,防止消息丢失
Thread.sleep(5000);
}
发送消息给惰性队列进行测试,这里发送 100 万条消息进行测试:
//短时间内,发送 100 万条消息到惰性队列中,可通过 RabbitMQ 的 web 控制台观察持久化的稳定性
@Test
void publishLazyMessageTest() {
String exchange = "test.exchange";
String rootingkey = "lazy";
String message = "test lazy message %d";
for (int i = 0; i < 1000000; i++) {
//发送消息
rabbitTemplate.convertAndSend(exchange, rootingkey, String.format(message, i));
}
}
到此为止,已经介绍完毕,所有代码都经过测试无误,这里就不截图展示具体执行效果了,可自行下载源代码测试验证。
本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/rmq_dead_delay.zip