首页 > 其他分享 >RabbitMq的死信队列

RabbitMq的死信队列

时间:2023-08-16 16:02:47浏览次数:40  
标签:exchange 队列 RabbitMq 死信 交换机 消息 public

参考博客:
https://blog.csdn.net/weixin_59074080/article/details/130673121
https://blog.csdn.net/m0_46979453/article/details/127229005
https://zhuanlan.zhihu.com/p/582787597?utm_id=0

什么是死信队列

正常情况下,一条消息自生产者发布到broke,然后转发到队列中,最后被订阅了(一般是绑定的路由键和发布消息时的路由键相同)该队列的消费者所消费。


消费者在消费生产者生产的消息时发生了某些特殊情况,导致消息无法被正常消费,这些消息会被重新发送到另一个交换机中,这个交换机就是DLX(死信交换机),绑定DLX的队列就称之为死信队列。


DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定

image

产生死信消息的原因

1、消息在队列的存活时间超过设置的生存时间(TTL)时间。

2、队列达到最大的长度 (队列容器已经满了。)

3、消费者消费多次消息失败,就会转移存放到死信队列中。

死信消息需要进行特殊处理,不然可能导致消息中间件不可用,或者业务没法正常提供服务。

如果我们配置了死信队列,那么死信消息将会被丢进死信队列中;如果没有配置死信队列,则该消息将会被丢弃。

如何配置死信交换机和死信队列

需要三步:

1、配置业务队列,绑定到业务交换机上。

这一步和死信队列没有关系,但是没有业务队列,也就没有所谓的死信队列,因此,将这一步排在第一。

2、为业务队列配置死信交换机和路由键。

没有死信交换机,也就没有所谓的死信队列,两者还要通过路由键绑定在一起。

3、为死信交换机配置死信队列。

死信消息最终是要传递到死信队列中的,因此必须配置一个死信队列。

所谓死信交换机(DLX),其实也是一个普通的交换机,它可以是任何类型的交换机,direct、topic、fanout、headers都可以。

配置死信交换机有两种方式:

1、通过rabbitmqctl命令配置

linux系统:

rabbitmqctl set_policy DLX ".*"  '{"dead-letter-exchange":"my-dlx", "dead-letter-routing-key":"rk001"}'

window系统:

rabbitmqctl set_policy DLX ".*"  "{""dead-letter-exchange"":""my-dlx"",""dead-letter-routing-key"":""rk001""}"

注意单引号和双引号的不同!

2、在声明队列时,通过队列参数x-dead-letter-exchange配置

// 声明交换机
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
// 设置死信交换机
args.put("x-dead-letter-exchange", "some.exchange.name");
// 设置死信路由键
args.put("x-dead-letter-routing-key", "some-routing-key");
// 声明队列
channel.queueDeclare("myqueue", false, false, false, args);

需要注意的是,当我们同时通过以上两种方式配置了私信交换机,通过队列参数配置的优先级更高,它会覆盖通过rabbitmqctl命令配置的信息。

但是,更推荐通过rabbitmqctl命令配置,因为可以在不重新部署应用程序就可以配置。


参数dead-letter-exchange:指定死信交换机。

参数dead-letter-routing-key:指定死信路由键,用于绑定死信交换机和死信队列。

所谓死信队列(DLQ),也是一个普通的队列,只不过它是和死信交换机绑定的而已,在声明队列的时候,通过x-dead-letter-exchange参数和x-dead-letter-routing-key指定死信交换机以及死信路由键即可。

死信队列的场景应用场景

消息本身存在错误的场景中,如参数校验异常、关键字段缺失等

对于这类消息,无论重新入队消费多少次,都不可能被消费成功的,而且频繁入队会导致后续的正常的消息没法被消费,甚至造成消息积压,影响服务的整体可用性。

所以,我们设置一个合理的重试次数N,在消息消费失败时,先不将消息放入死信队列,而是在重新入队N次之后,如果消息依然没有被成功消费,这时候再将该消息放入死信队列中。

合理的重试次数可以避免由于网络波动导致的短暂不可用,错误地将正常的消费直接放入死信队列的问题。对于这种情况,重试之后,消息一般都可以重新消费成功。

死信对消息的影响

假设,一条消息投递到了交换机exchange_a,并指定routing key 为 rk001。

我们通过dead-letter-exchange为其设置了死信交换机exchange_dl,通过dead-letter-routing-key 为其设置了死信路由键 rk001_dl。

交换机的名称会改变

当这条消息由于消费失败,被投递到死信交换机之后,其交换机名称会变为exchange_dl,而不是原来的exchange_a。

路由键会改变

原来的routing key 是 rk001,消息投递到死信交换机之后,routing key变为 rk001_dl了。

路由键变化的前提是:我们通过参数 x-dead-letter-routing-key 指定死信路由键,死信路由键将会被替换成该参数对应的值。

如果没有通过dead-letter-routing-key 设置死信路由键,那么,该消息的路由键依然是原来的rk001。

消息头变化

一条消息变为死信消息之后,其Header中会有一些额外的参数。

  • x-first-death-exchange:第一次成为死信之前的交换机的名称。

  • x-first-death-reason:第一次成为死信的原因。

rejected:由于default-requeue-rejected 参数被设置为false,消息在重新进入队列时被拒绝。

expired :消息的存活时间超过了设置的过期时间。

maxlen : 队列内消息数量超过队列最大容量。

delivery_limit:消息返回的次数超过了限制(通过仲裁队列的策略参数delivery-limit设置)。
  • x-first-death-queue: 第一次成为死信之前的队列的名称

  • x-death: 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新。

[
    {
      "reason": "rejected",
      "count": 1,
      "exchange": "business.exchange",
      "time": 1669478090000,
      "routing-keys": [
        "rk.001"
      ],
      "queue": "business.queue"
    }
]

如上所示,x-death是一个json串,其有以下几个属性:

reason:该消息变为死信消息的原因

count:该消息投递到死信队列中的次数

exchange:该消息在投递到死信队列之前的交换机

time:该消息被投递到死信队列的时间戳

routing-keys:该消息在投递到死信队列之前的路由键

queue:该消息在投递到死信队列之前所在的队列

original-expiration:消息的原始过期属性。如果一个消息是因为超过存活时间而过期,会展示这个属性。另外,过期属性将从死信消息中删除,以防止其再次过期。

模拟死信队列

1、nack响应,且requeue = false

application.yml配置文件信息

spring:
  rabbitmq:
    host: 116.114.21.15
    port: 5672
    username: guest
    password: guest
    dynamic: true
    listener:
      simple:
        acknowledge-mode: manual
        default-requeue-rejected: false
    virtual-host: /

通过acknowledge-mode启用手动应答模式。

通过default-requeue-rejected设置消息被拒绝后,不再重新入队。

RabbitConfig

package com.panda.rabbitmq.config;

import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@Data
public class RabbitConfig {
    public static final String DEAD_LETTER_ROUTING_KEY = "rk.dead.letter.001";
    public static final String BUSINESS_ROUTING_KEY = "rk.001";
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    public static final String BUSINESS_QUEUE = "business.queue";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String BUSINESS_EXCHANGE = "business.exchange";

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * 声明业务队列的交换机
     */
    @Bean("businessExchange")
    public DirectExchange businessExchange() {
        return new DirectExchange(BUSINESS_EXCHANGE);
    }

    /**
     * 声明死信交换机
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明业务队列
     */
    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

    /**
     * 声明死信队列
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * 声明业务队列和业务交换机的绑定关系
     */
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_ROUTING_KEY);
    }

    /**
     * 声明死信队列和死信交换机的绑定关系
     */
    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
                                     @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
    }
}

配置业务交换机、业务队列、死信交换机、死信队列,并通过路由键将业务交换机和业务队列绑定在一起,将死信交换机和死信队列绑定在一起。

注意,我们在声明业务队列时,指定了两个参数x-dead-letter-exchange和x-dead-letter-routing-key,这个是关键,如果不配置这两个参数,那么在业务消息消费失败之后,是不会投递到死信交换机的。

BusinessProducer

业务生产者代码,发送业务消息。

import com.alibaba.fastjson.JSON;
import com.panda.rabbitmq.config.RabbitConfig;
import com.panda.rabbitmq.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@Slf4j
public class BusinessProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(Order order) {
        rabbitTemplate.convertAndSend(RabbitConfig.BUSINESS_EXCHANGE, RabbitConfig.BUSINESS_ROUTING_KEY, 
            JSON.toJSONString(order));
    }
}

BusinessConsumer

import com.alibaba.fastjson.JSON;
import com.panda.rabbitmq.config.RabbitConfig;
import com.panda.rabbitmq.entity.Order;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class BusinessConsumer {
    @RabbitListener(queues = RabbitConfig.BUSINESS_QUEUE)
    public void receive(Message message, Channel channel) throws IOException {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        log.info("收到业务消息:{}", order);
        log.info("业务消息附带的头信息: {}", JSON.toJSONString(message.getMessageProperties().getHeaders()));
        try {
            if (StringUtils.isBlank(order.getStatus())) {
                throw new IllegalArgumentException("order's status can not be null!");
            }
        } catch (Exception e) {
            log.error("业务消息消费失败:{}", e.getMessage());
            // 消息消费异常后,返回一个nack响应
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

业务消费者代码。

通过@RabbitListener注解,标注一个方法为消费者。

在这个方法中,我们首先将消息打印出来,然后故意抛出一个IllegalArgumentException(其实是发送消息时,故意没有指定status),模拟消费失败的情况。

最后捕获这个异常,通过channel.basicNack给一个nack响应。

DeadLetterConsumer

import com.alibaba.fastjson.JSON;
import com.panda.rabbitmq.config.RabbitConfig;
import com.panda.rabbitmq.entity.Order;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class DeadLetterConsumer {
    @RabbitListener(queues = RabbitConfig.DEAD_LETTER_QUEUE)
    public void receiveA(Message message, Channel channel) throws IOException {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        log.info("收到死信消息:{}", order);
        log.info("死信消息附带的头信息: {}", JSON.toJSONString(message.getMessageProperties().getHeaders()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

死信消费者。

简单的打印出进入到死信队列的消息,然后手动给一个ack应答(在实际业务中,肯定不能这么简单的处理,但这里不是我们关注的重点)。

TestController

import com.panda.rabbitmq.deadletter.BusinessProducer;
import com.panda.rabbitmq.entity.Order;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("test")
public class TestController {

    @Resource
    private BusinessProducer businessProducer;

    @RequestMapping(value = "sendMsg")
    public void sendMsg() {
        Order order = new Order();
        order.setId("20221126000001");
        order.setType("1");
        businessProducer.send(order);
    }
}

简单的测试接口,发送一条消息,故意不设置status属性,为了模拟业务消费者消费失败。

image

测试结果

2022-11-26 23:54:50.061  INFO 32848 --- [ntContainer#0-1] c.p.r.deadletter.BusinessConsumer        : 
收到业务消息:Order(id=20221126000001, type=1, status=null)
2022-11-26 23:54:50.067  INFO 32848 --- [ntContainer#0-1] c.p.r.deadletter.BusinessConsumer        : 
业务消息附带的头信息: {}
2022-11-26 23:54:50.071 ERROR 32848 --- [ntContainer#0-1] c.p.r.deadletter.BusinessConsumer        : 
业务消息消费失败:order's status can not be null!
2022-11-26 23:54:50.084  INFO 32848 --- [ntContainer#1-1] c.p.r.deadletter.DeadLetterConsumer      : 
收到死信消息:Order(id=20221126000001, type=1, status=null)
2022-11-26 23:54:50.086  INFO 32848 --- [ntContainer#1-1] c.p.r.deadletter.DeadLetterConsumer      : 
死信消息附带的头信息: 
{
  "x-first-death-exchange": "business.exchange",
  "x-death": [
    {
      "reason": "rejected",
      "count": 1,
      "exchange": "business.exchange",
      "time": 1669478090000,
      "routing-keys": [
        "rk.001"
      ],
      "queue": "business.queue"
    }
  ],
  "x-first-death-reason": "rejected",
  "x-first-death-queue": "business.queue"
}

从上面显示的结果可以看出,业务消息在消费时失败了,而后被投递到死信交换机,因此,该死信消息才能被死信消费者所消费。

业务消息并没有附带任何头信息!而死信消息却附带了很多头信息。这些都头信息就是上面的【消息头变化】一节讲述的

代码示例——消息的存活时间超过设置的生存时间

application.yml

配置同上面的例子相同,不再赘述。

RabbitConfig

import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@Data
public class RabbitConfig {
    public static final String DEAD_LETTER_ROUTING_KEY = "rk.dead.letter.001";
    public static final String BUSINESS_ROUTING_KEY = "rk.001";
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    public static final String BUSINESS_QUEUE = "business.queue";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String BUSINESS_EXCHANGE = "business.exchange";

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * 声明业务队列的交换机
     */
    @Bean("businessExchange")
    public DirectExchange businessExchange() {
        return new DirectExchange(BUSINESS_EXCHANGE);
    }

    /**
     * 声明死信交换机
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明业务队列
     */
    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置消息的过期时间 单位:ms(毫秒)
//        args.put("x-message-ttl", 5000);
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

    /**
     * 声明死信队列
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * 声明业务队列和业务交换机的绑定关系
     */
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_ROUTING_KEY);
    }

    /**
     * 声明死信队列和死信交换机的绑定关系
     */
    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
                                     @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
    }
}

在声明业务队列时,指定了两个参数x-dead-letter-exchange和x-dead-letter-routing-key,这个是关键,如果不配置这两个参数,那么在业务消息消费失败之后,是不会投递到死信交换机的。

另外,我们其实还配置了x-message-ttl参数,该参数指定队列中消息的过期时间,单位是毫秒。不过我们先把它注释掉。

BusinessProducer

业务生产者代码,发送业务消息。

在发送消息的同时,指定该消息的过期时间(5秒)。

@Component
@Slf4j
public class BusinessProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(Order order) {
        MessagePostProcessor messagePostProcessor = message -> {
            // 设置消息过期时间为5秒
            message.getMessageProperties().setExpiration("5000");
            return message;
        };
        log.info("开始发送业务消息");
        rabbitTemplate.convertAndSend(RabbitConfig.BUSINESS_EXCHANGE, RabbitConfig.BUSINESS_ROUTING_KEY,
                JSON.toJSONString(order), messagePostProcessor);
    }
}

BusinessConsumer

将@Component注解和 @RabbitListener注解全都注释掉,这样就没有消费者消费业务队列中的消息,在超过我们设置的超时时间之后,消息会进入到死信队列中。

//@Component
@Slf4j
public class BusinessConsumer {
//    @RabbitListener(queues = RabbitConfig.BUSINESS_QUEUE)
    public void receive(Message message, Channel channel) throws IOException {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        log.info("收到业务消息:{}", order);
        log.info("业务消息附带的头信息: {}", JSON.toJSONString(message.getMessageProperties().getHeaders()));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("业务消息消费失败:{}", e.getMessage());
            // 消息消费异常后,返回一个nack响应
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

DeadLetterConsumer

配置同上面的例子相同,不再赘述。

TestController

配置同上面的例子相同,不再赘述。

测试结果

2023-08-1611: 11: 19.553INFO17776---[nio-8081-exec-1]e.d.c.r.deadLetter2.BusinessProducer: 开始发送业务消息2023-08-1611: 11: 24.711INFO17776---[ntContainer#2-1]e.d.c.r.deadLetter2.DeadLetterConsumer: 收到死信消息:Order(id=20221126000001,
type=1,
status=null)2023-08-1611: 11: 24.713INFO17776---[ntContainer#2-1]e.d.c.r.deadLetter2.DeadLetterConsumer: 死信消息附带的头信息:{
	"x-first-death-exchange": "business.exchange",
	"x-death": [{
		"reason": "expired",
		"original-expiration": "5000",
		"count": 1,
		"exchange": "business.exchange",
		"time": 1692155485000,
		"routing-keys": ["rk.001"],
		"queue": "business.queue"
	}],
	"x-first-death-reason": "expired",
	"x-first-death-queue": "business.queue"
}

如上所示。发送消息的时间是2022-11-27 00:37:56.910,由于没有消费者(我们注释掉了),该消息在超时之后,应该进入到死信队列中。

在上面的结果中,我们可以清楚地看到,收到死信消息的时间是2022-11-27 00:38:01.994,发送消息的时间和死信消费者消费死信消息的时间,比我们设置的超时时间(5秒)多一点点。

而且,从"reason":"expired"也可以看出,消息进入死信交换机的原因是超时(expired)了。

另外,由于该消息是由于超时进入死信队列的,所以x-death属性中有original-expiration属性,这一点和我们在上面的【消息头变化】一节的分析也是一致的。

注意点

在配置消息的过期时间时,有两种配置方式,一种是上面我们在发送消息时配置,如下所示:

public void send(Order order) {
        MessagePostProcessor messagePostProcessor = message -> {
            // 设置消息过期时间为5秒
            message.getMessageProperties().setExpiration("5000");
            return message;
        };
        log.info("开始发送业务消息");
        rabbitTemplate.convertAndSend(RabbitConfig.BUSINESS_EXCHANGE, RabbitConfig.BUSINESS_ROUTING_KEY,
                JSON.toJSONString(order), messagePostProcessor);
    }

一种是在声明队列时,通过x-message-ttl参数指定消息的超时时间,如下所示:

   /**
     * 声明业务队列
     */
    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置消息的过期时间 单位:ms(毫秒)
        args.put("x-message-ttl", 5000);
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

这两种方式的区别是:

前者是指定单个消息的过期时间。而后者是指定整个队列中所有消息的超时时间。

代码示例——超出队列的最大长度限制

application.yml

配置同上面的例子相同,不再赘述。

RabbitConfig

import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@Data
public class RabbitConfig {
    public static final String DEAD_LETTER_ROUTING_KEY = "rk.dead.letter.001";
    public static final String BUSINESS_ROUTING_KEY = "rk.001";
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    public static final String BUSINESS_QUEUE = "business.queue";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String BUSINESS_EXCHANGE = "business.exchange";

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * 声明业务队列的交换机
     */
    @Bean("businessExchange")
    public DirectExchange businessExchange() {
        return new DirectExchange(BUSINESS_EXCHANGE);
    }

    /**
     * 声明死信交换机
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明业务队列
     */
    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置消息的过期时间 单位:ms(毫秒)
//        args.put("x-message-ttl", 5000);
        // 设置队列中最大的消息容量
        args.put("x-max-length", 5);
        // 指定队列中消息达到最大限制之后的行为
        // 可选参数有:
        // drop-head(删除队列头部的消息)、
        // reject-publish(最近发来的消息将被丢弃)、
        // reject-publish-dlx(拒绝发送消息到死信交换器)
        // 注意,类型为quorum的队列只支持drop-head
        // x-overflow属性默认的处理策略是丢掉队列的头部的消息,或者将队列头部的消息投递到死信交换机
        // args.put("x-overflow", "reject-publish");
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

    /**
     * 声明死信队列
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * 声明业务队列和业务交换机的绑定关系
     */
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_ROUTING_KEY);
    }

    /**
     * 声明死信队列和死信交换机的绑定关系
     */
    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
                                     @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
    }
}

注意,我们在声明业务队列时,指定了两个参数x-dead-letter-exchange和x-dead-letter-routing-key,这个是关键,如果不配置这两个参数,那么在业务消息消费失败之后,是不会投递到死信交换机的。

我们还配置了x-max-length参数,该参数指定队列中消息的最大容量(消息的条数,不是消息的字节大小)为5,即该队列中同时可以最多容纳5条消息。

另外,我们还配置了x-overflow参数,该参数指定了队列中的消息达到最大容量之后,再接收到消息时的处理策略,暂时先注释掉。

BusinessProduer

@Component
@Slf4j
public class BusinessProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(Order order) {
        log.info("开始发送业务消息");
        rabbitTemplate.convertAndSend(RabbitConfig.BUSINESS_EXCHANGE, RabbitConfig.BUSINESS_ROUTING_KEY,
                JSON.toJSONString(order));
    }
}

生产者,发送一条消息。

BusinessConsumer

//@Component
@Slf4j
public class BusinessConsumer {
    //    @RabbitListener(queues = RabbitConfig.BUSINESS_QUEUE)
    public void receive(Message message, Channel channel) throws IOException {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        log.info("收到业务消息:{}", order);
        log.info("业务消息附带的头信息: {}", JSON.toJSONString(message.getMessageProperties().getHeaders()));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("业务消息消费失败:{}", e.getMessage());
            // 消息消费异常后,返回一个nack响应
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

将@Component注解和 @RabbitListener注解全都注释掉,这样就没有消费者消费业务队列中的消息,当我们发送多条消息,超过指定的最大容量之后的消息就会进入到死信队列中。

DeadLetterConsumer

配置同上面的例子相同,不再赘述。

TestController

@RestController
@RequestMapping("test")
public class TestController {

    @Resource
    private BusinessProducer businessProducer;

    @RequestMapping(value = "sendMsg")
    public void sendMsg() {
        for (int i = 1; i <= 6; i++) {
            Order order = new Order();
            order.setId("2022112600000" + i);
            order.setType(i + "");
            order.setStatus(i + "");
            businessProducer.send(order);
        }
    }
}

我们在sendMsg方法中,循环发送6条消息,由于我们设置了消息的最大容量是5,且没有消费者,所以有一条消息会进入到死信队列中。

测试结果

2023-08-16 15:36:31.546  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.634  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.667  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.667  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.699  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.700  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.728  INFO 18504 --- [ntContainer#2-1] e.d.c.r.deadLetter3.DeadLetterConsumer   : 收到死信消息:Order(id=20221126000001, type=1, status=1)
2023-08-16 15:36:31.731  INFO 18504 --- [ntContainer#2-1] e.d.c.r.deadLetter3.DeadLetterConsumer   : 死信消息附带的头信息: {"x-first-death-exchange":"business.exchange","x-death":[{"reason":"maxlen","count":1,"exchange":"business.exchange","time":1692171392000,"routing-keys":["rk.001"],"queue":"business.queue"}],"x-first-death-reason":"maxlen","x-first-death-queue":"business.queue"}

日志打印了6次“开始发送业务消息”,说明我们发送了6条消息,而死信消费者消费了一条消息,而且消费的是最先发送到队列中的消息,也就是队列头部的消息。

也就是说当队列中消息容量达到最大值之后,如果依然有消息投递到该队列中,那么队列头部的消息会被投递到死信交换机中!

而且,从"reason":"maxlen",也可以看出,消息进入死信交换机的原因是超出最大限制(maxlen)了。

如果我们把RabbitConfig中配置改成如下:

/**
     * 声明业务队列
     */
    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置消息的过期时间 单位:ms(毫秒)
        // args.put("x-message-ttl", 5000);
        // 设置队列中最大的消息容量
        args.put("x-max-length", 5);
        // 指定队列中消息达到最大限制之后的行为
        // 可选参数有:
        // drop-head(删除队列头部的消息)、
        // reject-publish(最近发来的消息将被丢弃)、
        // reject-publish-dlx(拒绝发送消息到死信交换器)
        // 注意,类型为quorum的队列只支持drop-head
        // x-overflow属性默认的处理策略是drop-head,即丢掉队列的头部的消息,或者将队列头部的消息投递到死信交换机
        args.put("x-overflow", "reject-publish");
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

即,给队列设置参数x-overflow,其属性值为reject-publish,结果如何呢?

2023-08-16 15:41:26.136  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:41:26.230  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:41:26.280  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:41:26.280  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:41:26.366  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:41:26.367  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息

IDEA控制台信息如上,可以看出6条消息确实是发送出去了,但是没有死信消费者的消费信息,说明多余那一条消息没有被投递到死信交换机之中。

我们再看RabbitMQ控制台,如下图所示,可以看出第6条消息被抛弃了(不是队头,是队尾的那条消息)。

image

踩坑

Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue 'business.queue' in vhost '/': received the value '5' of type 'longstr' but current is none, class-id=50, method-id=10)

错误原因:

  RabbitMQ中已存在这个队列,但在启动的项目中对这个队列的属性进行了修改。RabbitMQ中的队列一经声明,其属性不可修改。

解决方法:

  删除该队列并重新声明

注意点

设置队列容量的最大限制有两种方式:

一,通过rabbitmqct命令,如下所说:

rabbitmqctl set_policy ploicy_name "queue_name" '{"max-length-bytes":1048576}'

二、通过x-max-length参数,如下所示。

    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置消息的过期时间 单位:ms(毫秒)
        // args.put("x-message-ttl", 5000);
        // 设置队列中最大的消息容量
        args.put("x-max-length", 5);
        // 指定队列中消息达到最大限制之后的行为
        // 可选参数有:
        // drop-head(删除队列头部的消息)、
        // reject-publish(最近发来的消息将被丢弃)、
        // reject-publish-dlx(拒绝发送消息到死信交换器)
        // 注意,类型为quorum的队列只支持drop-head
        // x-overflow属性默认的处理策略是drop-head,即丢掉队列的头部的消息,或者将队列头部的消息投递到死信交换机
        args.put("x-overflow", "reject-publish");
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

注意,如果我们同时使用这两种方式设置了队列的最大长度,那么较小的值将被使用!另外,只有处于ready状态(在RabbitMQ中,消息有2种状态:ready 和 unacked)的消息被计数,未被确认的消息不会被计数受到limit的限制。

标签:exchange,队列,RabbitMq,死信,交换机,消息,public
From: https://www.cnblogs.com/d111991/p/17577713.html

相关文章

  • 【单调队列】 单调队列的“扫描线”理解
    【单调队列】单调队列的“扫描线”理解  “如果一个选手比你小还比你强,你就可以退役了。”——单调队列的原理比你强,而且比你影响时间更长。某种意义上,数学思维是生活中的思考的延伸。  算法学习笔记(66):单调队列。引用Pecco的算法笔记。  在这里给出一种扫描线......
  • 3.1 C++ STL 双向队列容器
    双向队列容器(Deque)是C++STL中的一种数据结构,是一种双端队列,允许在容器的两端进行快速插入和删除操作,可以看作是一种动态数组的扩展,支持随机访问,同时提供了高效的在队列头尾插入和删除元素的操作。Deque双向队列容器与Vector非常相似,它不但可以在数组尾部插入和删除元素,还可以在......
  • 3.1 C++ STL 双向队列容器
    双向队列容器(Deque)是C++STL中的一种数据结构,是一种双端队列,允许在容器的两端进行快速插入和删除操作,可以看作是一种动态数组的扩展,支持随机访问,同时提供了高效的在队列头尾插入和删除元素的操作。Deque双向队列容器与Vector非常相似,它不但可以在数组尾部插入和删除元素,还可以在......
  • 代码随想录算法训练营第十三天|单调数列:滑动窗口最大值(力扣239.)、优先级队列:前k个高
    单调数列:滑动窗口最大值(力扣239.)给定滑动窗口的范围,求每个滑动窗口范围内的最大值使用单调队列实现对于最大值数字前面的数字不存入数列,对于最大值数字后面的数字存入数列中单调队列中数字的大小呈递减顺序pop(value):如果窗口移除的元素等于单调队列的队口元素,则pop;否则什......
  • RabbitMQ如何保证顺序消费
    面试官:你能说说RabbitMQ是如何保证消息顺序消费的吗?老任:如果我们想要保证消息是按照顺序进行发送的,发送到队列后,队列的消息应该是先进先出的,我们只需要一个队列配置一个消费者即可(窃喜中......)。面试官:我们的项目一般都是集群部署的,一个队列就会有多个消费者,怎么实现一个队列中所......
  • Redis专题-队列
    Redis专题-队列首先,想一想Redis适合做消息队列吗?1、消息队列的消息存取需求是什么?redis中的解决方案是什么?无非就是下面这几点:0、数据可以顺序读取1、支持阻塞等待拉取消息2、支持发布/订阅模式3、重新消费4、消息不丢失5、消息可堆积那我们来看看redis怎么满足这些需......
  • 《高级程序员 面试攻略 》RabbitMQ 如何实现可靠性
    RabbitMQ提供了多种机制来实现消息传递的可靠性。下面是一些常见的方法:1.持久化消息:RabbitMQ允许将消息标记为持久化,以确保即使在发生故障或重启后,消息也不会丢失。通过将消息的`deliverymode`设置为2,可以将消息标记为持久化消息。1.持久化队列:创建持久化队列可以确保即使......
  • 7.5 C/C++ 实现链表队列
    链表队列是一种基于链表实现的队列,相比于顺序队列而言,链表队列不需要预先申请固定大小的内存空间,可以根据需要动态申请和释放内存。在链表队列中,每个节点包含一个数据元素和一个指向下一个节点的指针,头节点表示队头,尾节点表示队尾,入队操作在队尾插入元素,出队操作在队头删除元素,队......
  • 循环队列
    为了避免当只有一个元素时,队头和队尾重合使得处理变得麻烦,所以引入两个指针front和rear。front即队头指针指向队头元素,rear即队尾指针指向队尾元素的下一个元素。这样当front等于rear是,不是队列中有一个元素,而是表示空队列。假设数组长度为5,空队列即初始状态如图所示,front和rear都......
  • 剑指 Offer 09. 用两个栈实现队列
    用两个栈实现一个队列。队列的声明如下,请实现它的两个函数appendTail和deleteHead,分别完成在队列尾部插入整数和在队列头部删除整数的功能。(若队列中没有元素,deleteHead操作返回-1)示例1:输入:["CQueue","appendTail","deleteHead","deleteHead","deleteHead"][[],[3],......