首页 > 其他分享 >rabbitmq死信队列

rabbitmq死信队列

时间:2023-09-02 20:22:04浏览次数:27  
标签:队列 rabbitmq 死信 交换机 消息 channel arguments

死信的概念

死信队列(Dead Letter Queue)是指当消息无法被消费者正常消费时,将这些无法消费的消息发送到专门的死信队列中,以便进行进一步的处理。这种处理方式通常被称为“死信处理”。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效

在RabbitMQ中,死信队列通常用于以下几种情况:

  • 消息被拒绝:消费者在消费消息时,如果无法处理该消息,可以将该消息拒绝并返回到RabbitMQ。此时,RabbitMQ可以将这些被拒绝的消息发送到死信队列中,以便进行处理。

  • 消息过期:在发送消息时,可以设置消息的过期时间。如果消息在过期时间内没有被消费者消费,那么RabbitMQ会将这些过期的消息发送到死信队列中。

  • 队列达到最大长度:当队列达到最大长度时,新的消息将无法被发送到队列中。此时,RabbitMQ可以选择将这些无法发送的消息发送到死信队列中,以便进行进一步处理。

实战

image
说明:这里的交换机类型可以自己定义,不一定要求就是直连,根据业务场景选择不同的类型。
正常情况下,生产者发送消息到交换机上,然后到普通的队列,由消费者消费,但是当达到图中三个条件中的其中一个,此时消息会转为死信消息。由于消息不能丢失,要保持正常的消费,所以此时需要定义死信交换机和队列,由死信消费者去消费。

生产者

我将有关于队列的声明,交换机的声明,以及队列与交换机的绑定关系都写在生产者中。
在此之前还记得构建队列时,我们的有一个形参需要传递一个map嘛,这个map可以定义相关的参数
channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);
例如普通队列与死信交换机的绑定关系

 // 2.1 构建参数:指定某种条件下达成的死信消息
        Map<String, Object> arguments = new HashMap<>();
        // 过期时间  10s = 10000ms  可以由生产者中设定----一般由生产者发送
        // arguments.put("x-message-ttl",10000);
        //正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);  // 固定格式
        // 设置死信routingkey
        arguments.put("x-dead-letter-routing-key","del-queue");
        channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);

完整代码如下:

/**
 * 死信生产者
 * 定义相关队列、交换机等
 */
public class DieProducer {
    // 定义普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // 定义普通队列名称
    public static final String NORMAL_QUEUE = "normal_queue";
    // 定义 死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";
    // 定义 死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";

    // 初始化队列
    public static void init( Channel channel) throws Exception{
        // 1.声明普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 2.声明普通队列
        // 2.1 构建参数:指定某种条件下达成的死信消息
        Map<String, Object> arguments = new HashMap<>();
        // 过期时间  10s = 10000ms  可以由生产者中设定----一般由生产者发送
        arguments.put("x-message-ttl",10000);
        //正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);  // 固定格式
        // 设置死信routingkey
        arguments.put("x-dead-letter-routing-key","del-queue");
        channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);
        // 3.声明死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        // 4.声明死信队列
        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);

        //5.绑定队列与交换机的关系
        // 5.1 绑定普通队列与普通交换机的关系
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"h1");
        // 5.2 绑定死信队列与死信交换机的关系
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"del-queue");

    }

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        // 初始化操作
        init(channel);

        //发送消息
        for (int i = 1; i < 11 ; i++) {
            String message = "info "+i;
            channel.basicPublish(NORMAL_EXCHANGE,"h1",null,message.getBytes(StandardCharsets.UTF_8));
        }

    }
}

运行:
image

可以看到普通队列的消息都转发到了死信队列中了,这是因为设置了消息过期时间,从而满足了成为死信队列的条件之一

// 设置过期时间,当消息时间10秒还未消费,那么就成为死信队列
arguments.put("x-message-ttl",10000);
//正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);  // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","del-queue");

普通消费者

代码很简单,只需要接收消息就好了

/**
 * 普通消费者,用于消费普通消息队列
 */
public class DieNormalConsumer {

    // 定义普通队列名称
    public static final String NORMAL_QUEUE = "normal_queue";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        // 定义消息成功的回调
        DeliverCallback nackCallback = ( consumerTag,  message)->{
            //接收消息,打印到控制台
            System.out.println("接收到普通队列的消息:"+new String(message.getBody()));
        };

        channel.basicConsume(NORMAL_QUEUE,true,nackCallback,(consumerTag)->{});
    }
}

死信消费者

同样的,只需要接收消息就好了


/**
 * 死信消费者,用于消费死信消息队列
 */
public class DieDeadConsumer {
    // 定义 死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        // 定义消息成功的回调
        DeliverCallback nackCallback = ( consumerTag,  message)->{
            //接收消息,打印到控制台
            System.out.println("接收到死信队列的消息:"+new String(message.getBody()));
        };

        channel.basicConsume(DEAD_QUEUE,true,nackCallback,(consumerTag)->{});
    }
}

以上代码实现是基于TTL消息过期时间导致普通队列转换为死信队列的场景
我们知道,成为死信只需要满足三个条件中的其中一个即可

  • 消息被拒绝:消费者在消费消息时,如果无法处理该消息,可以将该消息拒绝并返回到RabbitMQ。此时,RabbitMQ可以将这些被拒绝的消息发送到死信队列中,以便进行处理。

  • 消息过期:在发送消息时,可以设置消息的过期时间。如果消息在过期时间内没有被消费者消费,那么RabbitMQ会将这些过期的消息发送到死信队列中。

  • 队列达到最大长度:当队列达到最大长度时,新的消息将无法被发送到队列中。此时,RabbitMQ可以选择将这些无法发送的消息发送到死信队列中,以便进行进一步处理。

队列达到最大长度成为死信

只需要在上面的案例中修改声明时的参数列表即可

//3.声明普通队列:因为如果消息称为死信,那么是由队列发送给死信交换机的。所以需要用到 arguments
Map<String, Object> arguments = new HashMap<>();
//正常队列设置过期之后的死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);  // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","lisi");
arguments.put("x-max-length",6);  // 表示 队列最大长度为6,超过即为死信队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

例如我循环发送10条信息,但是队列最大长度为6,在未消费的情况下,剩余的4个消息就会成为死信

消息被拒成为死信

例如在消费者方,指定某个消息我不接收,那么该消息就会转到死信队列中

// 接收消息成功的回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    // 拒收 消息 info5,同时将他送会死信队列中
      if(message.equals("info5")){
             System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
             //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
             channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
      }else {
             System.out.println("Consumer01 接收到消息"+message);
             // 开启逐一应答
             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
         }
 };

总结参数

以上三个案例用到的参数:

 arguments.put("x-message-ttl",10000);
 //正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);  // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","del-queue");
arguments.put("x-max-length",6);  // 表示 队列最大长度为6,超过即为死信队列

标签:队列,rabbitmq,死信,交换机,消息,channel,arguments
From: https://www.cnblogs.com/zgf123/p/17674163.html

相关文章

  • docker 安装rabbitmq
    dockerpullrabbitmqdockerrun-d--hostnamemyrabbitmq--namerabbitmq-p15672:15672-p5672:5672rabbitmqdockerexec-itrabbitmq/bin/bashrabbitmq-pluginsenablerabbitmq_management可以通过访问http://localhost-ip:15672,访问web界面,这里的用户名和密......
  • RabbitMQ交换机
    概念RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。......
  • portswigger——Response queue poisoning(响应队列中毒)_02
    响应队列中毒响应队列中毒是一种强大的请求走私攻击形式,它会导致前端服务器开始将响应从后端映射到错误的请求。实际上,这意味着同一前端/后端连接的所有用户都将获得针对其他人的持久响应。这是通过走私一个完整的请求来实现的,从而在前端服务器只期望一个响应时从后端引发两个响......
  • Redis队列Stream&Redis多线程详解(8)
    Redis目前最新版本为Redis-6.2.6,考虑到实际的情况,本次课程会以CentOS7下Redis-6.2.4版本进行讲解。下载地址:https://redis.io/download安装运行Redis很简单,在Linux下执行上面的4条命令即可,同时前面的课程已经有完整的视频讲解,请到网盘中下载观看,并自行安装。如安装过程出......
  • rabbitmq消息持久化
    概念消息是可以持久化保存的,持久的目的是为了处理任务丢失情况的,采用持久化可以保证消息存储,且消息不被丢失。默认情况下RabbitMQ退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。代码......
  • 数据结构与算法——栈和队列<也不过如此>
    ......
  • std模版库 队列、优先队列、双端队列
    queue为单端队列deque为双端队列priority_queue为优先队列includeincludepriority_queue<int,vector,less>//最大堆默认为对大堆也即和priority_queue等价priority_queue<int,vector,greater>//最小堆......
  • linux内核等待队列详解
    https://www.cnblogs.com/xinghuo123/p/13347964.html等待队列用于使得进程等待某一特定事件的发生,无需频繁的轮询,进程在等待周期中睡眠,当时间发生后由内核自动唤醒。1数据结构1.1等待队列头等待队列结构如下,因为每个等待队列都可以再中断时被修改,因此,在操作等待队列之前必......
  • RabbitMQ Stream类型队列
    RabbitMQ提供了三种类型的队列:ClassicQuorumStream官方文档对于流队列的描述是:高性能、可持久化、可复制、非破坏性消费、只追加写入的日志使用场景:一个队列将同一条消息分发给不同消费者可重复消费消息更高的性能存储大量消息而不影响性能更高的吞......
  • RabbitMQ快速入门--简单队列模型
             ......