首页 > 其他分享 >死信队列

死信队列

时间:2022-08-15 14:48:41浏览次数:43  
标签:false String NORMAL 队列 死信 channel

1、死信的来源

队列中的消息可能会变成死信消息(dead-lettered),进而当以下几个事件任意一个发生时,消息将会被重新发送到一个交换机:

  • 消息被消费者使用basic.rejectbasic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged);
  • 消息由于消息有效期(per-message TTL)过期;
  • 消息由于队列超过其长度限制而被丢弃;
  • 注意,队列的有效期并不会导致其中的消息过期

2、死信流程

    c1:声明普通交换机、普通队列、死信交换机、死信队列、让普通队列与死信交换机进行捆绑(转发)

    C2:接收死信队列的消息。

3、代码

3.1 ConsumerTTL01代码

public class ConsumerTTL01 {

//普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE = "dead_exchange";

//普通交换机的routing key
public static final String NORMAL_ROUTING_KEY = "zhangsan";
//死信交换机的routing key
public static final String DEAD_ROUTING_KEY = "lisi";

//普通队列的名称
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {
//1、获取channel
Channel channel = RabbitMQUtils.getChannel();

//2、声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE , BuiltinExchangeType.DIRECT);
//3、声明队列
//3.1
普通队列设置参数,转发给死信交换机
Map <String, Object> arguments = new HashMap <>();
//过期时间
arguments.put("x-message-ttl" , 1000);
//设置绑定的交换机
arguments.put("x-dead-letter-exchange" , DEAD_EXCHANGE);
//设置绑定的routing key
arguments.put("x-dead-letter-routing-key" , DEAD_ROUTING_KEY);
//设置正常队列长度的限制,例如发送10个消息,6个为正常,4个为死信
arguments.put("x-max-length" , 6);

channel.queueDeclare(NORMAL_QUEUE , true , false , false , arguments);

//4、绑定交换机与队列
channel.queueBind(NORMAL_QUEUE , NORMAL_EXCHANGE , NORMAL_ROUTING_KEY);

channel.exchangeDeclare(DEAD_EXCHANGE , BuiltinExchangeType.DIRECT);
channel.queueDeclare(DEAD_QUEUE , true , false , false , null);
channel.queueBind(DEAD_QUEUE , DEAD_EXCHANGE , DEAD_ROUTING_KEY);

//5、接收消息
System.out.println("ConsumerTTL01等待接收消息...");

//消息传递时的回调
DeliverCallback deliverCallback = (String consumerTag , Delivery message) -> {
String msg = new String(message.getBody());

if(msg.equals("info5")) {

System.out.println("Consumer01接受的消息是:" + msg + ": 此消息是被C1拒绝的");

channel.basicReject(message.getEnvelope().getDeliveryTag() , false);

} else {
System.out.println("Consumer01接受的消息是:" + msg);

channel.basicAck(message.getEnvelope().getDeliveryTag() , false);

}
};

//消费者取消时的回调
CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("ConsumerTTL01通知消费者取消的回调接口....");
};

//开启手动应答
channel.basicConsume(NORMAL_QUEUE , false , deliverCallback , cancelCallback);

}
}

 

3.2 ConsumerTTL02 代码

 

public class ConsumerTTL02 {

//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {

    //1、获取channel
    Channel channel = RabbitMQUtils.getChannel();

    System.out.println("ConsumerTTL02等待接收死信消息...");

    //2、接收消息
    DeliverCallback deliverCallback = (consumerTag , message) -> {
     System.out.println("ConsumerTTL02接受的消息是:" + new String(message.getBody() , StandardCharsets.UTF_8));
    };

    CancelCallback cancelCallback = (String consumerTag) -> {
     System.out.println("ConsumerTTL02通知消费者取消的回调接口....");
    };

    channel.basicConsume(DEAD_QUEUE , true , deliverCallback , cancelCallback);

}
}

 

3.3 ProducerTTL 代码

 

 

public class ProducerTTL {

//生产者只需要和普通交换机进行绑定
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//普通交换机的routing key
public static final String NORMAL_ROUTING_KEY = "zhangsan";

public static void main(String[] args) throws IOException, TimeoutException {
    //1、获取channel
    Channel channel = RabbitMQUtils.getChannel();

    //2、死信消息 设置ttl时间 live to time 单位是ms
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("1000").build();

    for(int i = 0; i < 11; i++) {

     String msg = "info" + i;
     //3、发送消息
     channel.basicPublish(NORMAL_EXCHANGE , NORMAL_ROUTING_KEY , properties , msg.getBytes());
    }
}
}

 

3.4消息TTL过期

        模拟方式:启动消费者ConsumerTTL01后,关闭该消费者,模拟ConsumerTTL01收不到消息,到了消息过期时间后,消息自动转发到死信队列,由ConsumerTTL02执行

        设置消息有效期的方式:

            通过生产者设置:

// 2、死信消息 设置ttl时间 live to time 单位是ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("1000").build();

 

            通过消费者设置:

//过期时间
arguments.put("x-message-ttl" , 1000);
channel.queueDeclare(NORMAL_QUEUE , true , false , false , arguments);

 

3.5死信最大长度

        在将普通队列与死信队列进行绑定时,通过设置"x-max-length",确定队列的有效长度。

 

    //过期时间
    arguments.put("x-message-ttl", 10000);
    //设置绑定的交换机
    arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
    //设置绑定的routing key
    arguments.put("x-dead-letter-routing-key", DEAD_ROUTINGKEY);
    //设置正常队列长度的限制,例如发送10个消息,6个为正常,4个为死信
    arguments.put("x-max-length",6);
    
    channel.queueDeclare(NORMAL_QUEUE , false , false , false , arguments);

        注意:再展示时,需要先将ConsumerTTL01关闭,带ProducerTTL发送完消息后,将ConsumerTTL01打开。

3.6死信消息被拒

        ConsumerTTL01需要开启手动应答

 

    //消息传递时的回调
    DeliverCallback deliverCallback = (String consumerTag , Delivery message) -> {
     String msg = new String(message.getBody());

     if(msg.equals("info5")) {

        System.out.println("Consumer01接受的消息是:" + msg + ": 此消息是被C1拒绝的");

        channel.basicReject(message.getEnvelope().getDeliveryTag() , false);

     } else {
        System.out.println("Consumer01接受的消息是:" + msg);

        channel.basicAck(message.getEnvelope().getDeliveryTag() , false);

     }
    };
    
    //消费者取消时的回调
    CancelCallback cancelCallback = (String consumerTag) -> {
     System.out.println("ConsumerTTL01通知消费者取消的回调接口....");
    };

    //开启手动应答
    channel.basicConsume(NORMAL_QUEUE , false , deliverCallback , cancelCallback);

 

 

标签:false,String,NORMAL,队列,死信,channel
From: https://www.cnblogs.com/FISH-ROSE/p/16587461.html

相关文章

  • 双端队列简单实现
    设计循环双端队列classMyCircularDeque{privateint[]elements;privateintrear,front;privateintcapacity;publicMyCircularDeque(intk......
  • 24第四章:09_死信队列
    一、什么是死信队列当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会......
  • JDK数组阻塞队列源码深入剖析
    JDK数组阻塞队列源码深入剖析前言在前面一篇文章从零开始自己动手写阻塞队列当中我们仔细介绍了阻塞队列提供给我们的功能,以及他的实现原理,并且基于谈到的内容我们自己实......