首页 > 其他分享 >RabbitMQ 死信交换机、延迟队列、惰性队列

RabbitMQ 死信交换机、延迟队列、惰性队列

时间:2023-10-05 10:56:13浏览次数:50  
标签:exchange 队列 RabbitMQ 死信 交换机 消息 ttl

如果一个队列设置了死信交换机,该队列的消息就有了极大的可靠性保障,当出现以下情况时,消息就会投递到死信交换机中:

  • 队列中的消息在被消费者处理后,抛出异常,返回了 nack 或者 reject
  • 如果队列设置了 ttl 或者消息本身设置了 ttl ,消息因为超时而未消费
  • 队列容量已经满了,后续发来的消息无法接收,直接被丢弃

有了死信交换机后,在保障消息的可靠性同时,也极大的简化了代码编写。死信交换机结合 ttl 队列也能够实现消息的延迟发送。由于延迟发送消息的使用场景非常广泛,因此 RabbitMQ 专门提供了延迟插件,进一步简化了延迟发送消息的代码。

队列满了,主要是因为服务器内存不足,这种情况其实是比较危险的,说明生产消息的速度超过了消费速度,已经造成了大量消息的累积。虽然 Spring AMQP 发送消息默认设置为持久化,但是当服务器内存不足时,会集中将一批消息持久化到硬盘中,此时会消耗一定的时间,造成 RabbitMQ 的不稳定,有可能会造成新生产的消息无法及时入队而丢失。当然增加消费者程序来提高消费速度是最佳解决方案,但是如果短期内无法增加消费者程序,我们可以将队列迅速变为惰性队列,将消息持续稳定的持久化到硬盘上,就可以确保消息可靠性,由于硬盘容量很大,将在足够长的时间内不会出现队列爆满的问题。

本篇博客主要通过代码的方式演示 RabbitMQ 以上消息可靠性保障措施,在博客的最后会提供源代码下载。


一、搭建工程

继续采用上篇博客通过 docker-compose 部署的 RabbitMQ 进行演示,搭建 SpringBoot 工程如下:

image

两个子工程没有引入任何依赖,主要使用父工程 pom 文件的依赖,父工程 pom 文件细节如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jobs</groupId>
    <artifactId>rmq_dead_delay</artifactId>
    <packaging>pom</packaging>
    <version>1.0</version>
    <modules>
        <module>publish_msg</module>
        <module>consumer_msg</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
    </parent>

    <dependencies>
        <!--在此主要使用 lombok 自带的 log 方法-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--Spring AMQP 消息队列依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--引入单元测试依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

二、死信交换机

死信交换机是在队列上进行设置,只需要在队列上增加 2 个参数设置即可:

  • x-dead-letter-exchange 参数设置死信交换机的名称
  • x-dead-letter-routing-key 参数设置私信交换机的路由 key

在上篇博客中,消费者本地重试次数耗尽后,我们设置的策略是投递到新的交换机中。有了死信交换机后,我们就不需要这么做了,本地重试次数耗尽后,仍然使用默认的丢弃策略,此时被丢弃的消息会通过死信交换机投递到其它队列中进行处理。

//用于死信交换机投递消息,当别的队列中消息处理错误被丢弃,或者超时未消费,则在该队列中进行处理
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "error.queue"),
    exchange = @Exchange(name = "test.exchange", type = ExchangeTypes.DIRECT),
    key = "error"
))
public void listenerDeadQueue(String msg) {
    log.info("接收到 dead.queue 消息:" + msg);
    //在这里可以记录日志,或将消息存储到数据库中,后续由人工进行干预处理
}

//接收到消息后,处理程序会报异常
//在 application.yml 中配置了本地重试 3 次,如果都是失败,默认会丢弃消息,返回 basic.reject
//由于已经为该队列设置了死信交换机和相应的路由 key,因此最终失败的消息会投递到 error.queue 中
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "data.queue", arguments = {
            //在队列上设置【死信交换机】和【路由key】
            @Argument(name = "x-dead-letter-exchange", value = "test.exchange"),
            @Argument(name = "x-dead-letter-routing-key", value = "error")}),
    exchange = @Exchange(name = "test.exchange", type = ExchangeTypes.DIRECT),
    key = "data"
))
public void listenerDataQueue(String msg) {
    log.info("接收到 data.queue 消息:" + msg);
    //以下代码会抛出异常
    Integer result = 1 / 0;
}

发送消息进行测试的代码如下:

//发送到 dead.queue
//消费者接收到消息后,处理过程中出现异常,本地重试 3 次,一共处理 4 次都抛异常,
//最终由死信交换机投递到 error.queue
@Test
void publishDeadTest() {
    String message = "dead message test";
    String exchange = "test.exchange";
    String rootingkey = "data";
    //发送消息
    rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}

三、TTL 队列

一个队列可以设置 ttl 时间,表示消息最多可以在队列中的存放时间,如果过期未消费就会被丢弃。对于一个消息来说,也可以设置 ttl 时间,表示该消息的存活时间,过期未消费也会被丢弃。如果队列和消息同时设置了 ttl 时间,则以最短的 ttl 时间为准。

package com.jobs.listener;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TTLQueueConfig {

    //在 SpringAmqpListener 中通过注解绑定交换机和队列时,必须要有消息处理程序。
    //为了测试发送到队列中的消息超时后,自动由死信交换机投递到指定队列中,我们不需要消息处理程序
    //因此只能采用 @Bean 注解去声明交换机和队列并进行绑定。
    //如果在 SpringAmqpListener 设置 ttl 队列的话,只需要给队列设置 x-message-ttl 参数即可

    @Bean
    public DirectExchange ttlExchange(){
        return new DirectExchange("ttl.exchange");
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue")
                //设置队列中每条消息的存活时间只有 10 秒钟
                .ttl(10000)
                //给队列设置【死信交换机】和【路由key】
                .deadLetterExchange("test.exchange")
                .deadLetterRoutingKey("error")
                .build();
    }

    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }
}

发送消息测试队列的 ttl 和消息的 ttl

//发送普通消息到 ttl.queue
//由于我们创建的 ttl.queue 具有以下 3 个特征:
//1 每条消息的存活时间只有 10 秒
//2 没有消息处理程序
//3 设置了死信交换机和路由 key
//因此 10 秒之后,消息就会被死信交换机投递到 error.queue
@Test
void publish_Normal_To_TTL_Test() {
    String message = "normal message to ttl queue test";
    String exchange = "ttl.exchange";
    String rootingkey = "ttl";
    //发送消息
    rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}

//消息本身也可以设置 ttl 过期时间
//如果【消息】和【队列】都设置了 ttl 过期时间,以最短的 ttl 过期时间为准
//这里我们将消息的 ttl 过期时间设置为 5 秒,短于队列的 ttl 过期时间
//因此 5 秒之后,会被死信交换机投递到 error.queue 中
@Test
void publish_ttlmsg_To_TTL_Test() {
    String exchange = "ttl.exchange";
    String rootingkey = "ttl";
    Message message = MessageBuilder
        .withBody("ttl messsage to ttl queue test".getBytes(StandardCharsets.UTF_8))
        //设置消息的 ttl 过期时间为 5 秒
        .setExpiration("5000").build();
    //发送消息
    rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}

通过上面的代码和实际执行效果可以发现:死信交换机和 ttl 队列相结合可以实现延迟发送消息的功能。


四、延迟队列

通过死信交换机和 ttl 队列配合实现延迟发送消息的方案,太过于繁琐。由于延迟发送消息的场景比较广泛,RabbitMQ 提供了延迟插件,可以将任意交换机设置为延迟交换机。当发送给延迟交换机的消息包含 x-delay 头时,会先将消息持久化到硬盘中,时间到后再投递到相应的队列中,实现延迟发送消息的功能,大大简化了代码的实现方案。

首先我们需要到官网上下载与 RabbitMQ 版本相同的延迟插件,由于我们使用的 RabbitMQ 的版本是 3.12,因此我们需要下载 3.12 版本的延迟插件,由于插件的下载地址是 github,属于国外网站,偶尔才能访问,可以在不同的时间尝试访问。

RabbitMQ 延迟插件的下载地址为:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

安装插件非常简单,只需要将插件文件放到 RabbitMQ 的插件目录中,再运行启用插件的命令即可。由于我使用 docker-compose 部署的 RabbitMQ,容器中插件目录是 /plugins ,因此首先我们需要将插件文件拷贝到该目录中。

# 可以通过 xftp 将文件上传到 /app/rabbitmq 目录中
# 然后运行以下命令将插件文件拷贝到容器的 /plugins 目录中
# 备注:我启动的 RabbitMQ 的容器名称是 rabbitmq
docker cp /app/rabbitmq/rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins

然后进入容器中,使用 RabbitMQ 自带的命令启用插件即可:

# 进入容器中
docker exec -it rabbitmq bash
# 启用延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在接收消息的处理方法上,我们声明一个新的交换机,通过 delayed = "true" 设置为延迟交换机即可:

//接收 header 设置了 x-delay 的延迟消息
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "test.queue"),
    //给 rabbitmq 安装了 delay message 插件后,给交换机设置 delayed 为 true,就变成了延迟交换机
    //当带有 x-delay 头的消息发送到该交换机后,消息先持久化到硬盘中,然后返回给生产者没有找到路由的错误异常,
    //因此生产者如果监听了消息是否由交换机发送到队列的话,就会收到 routing not found 的错误消息
    //此时生产者的回调方法中,需要判断返回的错误消息是否设置了 delay 以及 delay 值是否大于 0
    exchange = @Exchange(name = "delay.exchange", delayed = "true"),
    key = "delay"
))
public void listenDelayExchange(String msg) {
    log.info("接收到了 test.queue 的延迟消息:" + msg);
}

通过以下发送消息的代码进行测试验证:

//发送延迟消息测试
@Test
void publishDelayMessageTest() {
    String exchange = "delay.exchange";
    String rootingkey = "delay";
    Message message = MessageBuilder
        .withBody("delay messsage test".getBytes(StandardCharsets.UTF_8))
        //延迟消息,必须要包含 x-delay 头,设置延迟时间,此处设置为延迟 5 秒发送
        .setHeader("x-delay", 5000).build();
    //如果发送者监听消息是否由交换机发送到队列时,必须给消息设置全局唯一 id
    //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    //rabbitTemplate.convertAndSend(exchange, rootingkey, message, correlationData);

    //将消息发送给延迟交换机
    rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}

五、惰性队列

从 RabbitMQ 的 3.6.0 版本开始,就增加了惰性队列的功能,具有如下特征:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持海量的消息存储,存取性能取决于硬盘性能

可以通过 RabbitMQ 自带的命令添加策略,将已存在的队列设置为惰性队列,比如:

# 通过正则表达式,将以 lazy 开头的队列,设置为惰性队列
rabbitmqctl set_policy Lazy "^lazy" '{"queue-mode":"lazy"}' --apply-to queues

在代码中,只需要给队列设置 x-queue-mode 参数值为 lazy 即可变为惰性队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "lazy.queue",
            //将队列设置为惰性队列,消息会持续稳定的存储到磁盘中,用于解决消息大量累积的问题
            arguments = @Argument(name = "x-queue-mode", value = "lazy")),
    exchange = @Exchange(name = "test.exchange"),
    key = "lazy"
))
public void listenLazyQueue(String msg) throws InterruptedException {
    log.info("接收到了 lazy.queue 的消息:" + msg);
    //模拟处理消息比较慢,造成消息无法及时消费处理掉,在队列中进行累积
    //如果采用惰性队列,则能够持续稳定的存储到硬盘中,防止消息丢失
    Thread.sleep(5000);
}

发送消息给惰性队列进行测试,这里发送 100 万条消息进行测试:

//短时间内,发送 100 万条消息到惰性队列中,可通过 RabbitMQ 的 web 控制台观察持久化的稳定性
@Test
void publishLazyMessageTest() {
    String exchange = "test.exchange";
    String rootingkey = "lazy";
    String message = "test lazy message %d";
    for (int i = 0; i < 1000000; i++) {
        //发送消息
        rabbitTemplate.convertAndSend(exchange, rootingkey, String.format(message, i));
    }
}

到此为止,已经介绍完毕,所有代码都经过测试无误,这里就不截图展示具体执行效果了,可自行下载源代码测试验证。

本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/rmq_dead_delay.zip

标签:exchange,队列,RabbitMQ,死信,交换机,消息,ttl
From: https://www.cnblogs.com/studyjobs/p/17743146.html

相关文章

  • RabbitMQ 消息发送和消费的可靠性保障
    在一些比较重要的场景中,我们必须要保障RabbitMQ消息的可靠性,也就是发送给rabbitmq的消息必须最终成功,消费者接收消息进行处理也必须最终成功。即使是中间失败了,也必须要有其它保障措施,哪怕最后进行人工进行干预处理。消息出现丢失的场景主要有:发送消息时丢失:比如消息发送到......
  • 【UVA 12100】Printer Queue 题解(队列+优先队列)
    计算机科学学生会中唯一的打印机正经历着极其繁重的工作量。有时,打印机队列中有100个作业,您可能需要等待数小时才能获得一页输出。由于某些作业比其他作业更重要,黑客将军为打印作业队列发明并实现了一个简单的优先级系统。现在,为每个作业分配1到9之间的优先级(9是最高优先级,1是最低......
  • [数据结构和算法] 堆/优先队列的实现
    预备知识:完全二叉树可以用数组表示:从下标0开始存储数据:左子节点=2*父节点+1,右子节点=2*父节点+2;从下标1开始存储数据:左子结点=2*父节点,右子节点=2*父节点+1;堆:大根堆:父节点的值大于等于左右子节点的值;小根堆:父节点的值小于等于左右子节点的值;......
  • 【数据结构】2.栈和队列
    1.栈1.1栈的抽象父类#pragmaoncetemplate<classT>classStack{public://析构函数virtual~Stack(){}//栈是否为空virtualboolempty()const=0;//栈的大小virtualintsize()const=0;//栈顶元素virtualT&top()=0......
  • 【数据结构】栈、队列和数组
    栈、队列和数组栈队列数组数组的顺序表示和实现顺序表中查找和修改数组元素矩阵的压缩存储特殊矩阵稀疏矩阵栈初始化#defineMaxSize50//栈中元素的最大个数typedefcharElemType;//数据结构typedefstruct{inttop;//栈顶指针ElemTypedata[MaxSize];//存放栈中......
  • Java语言通过三种方法来实现队列
    队列关于作者作者介绍......
  • 多重背包单调队列优化
    引用自:动态规划-背包问题(01背包、完全背包、多重背包)#include<cstdio>#include<algorithm>#include<cstring>usingnamespacestd;constintmaxn=100005;intn,m,cnt;intv[102],num[102],dp[maxn];structQueue{intpos,val;}q[maxn];intmain(){......
  • 使用链表模拟队列和栈
    使用链表模拟队列案例1//创建节点类publicclassNode{intn;Nodenext;}//编写方法publicclassQueue{Nodehead=newNode();Nodelast=newNode();privateintlen=0;publicintsize(){returnthis.len;}......
  • 使用数组模拟队列和栈
    使用数组模拟队列案例1publicclassQueue{privateint[]num=newint[5];privateintlen=0;publicintsize(){returnthis.len;}//添加publicintadd(intn){if(len<num.length){num[len]=n;......
  • 浅谈数据结构栈与队列
    栈1.栈的基本概念栈(Stack):是只允许在一端进行插入或删除的线性表。首先栈是一种线性表,但限定这种线性表只能在某一端进行插入和删除操作。不能插入和删除的一端为栈底(Bottom)栈顶(top):线性表允许进行插入删除的那一端栈底(bottom):固定的,不允许进行插入和删除的那一端空栈:不含任何元素的......