首页 > 其他分享 >Rabbitmq中的死信队列

Rabbitmq中的死信队列

时间:2024-08-03 21:23:37浏览次数:16  
标签:false String EXCHANGE 队列 Rabbitmq 死信 channel

背景

        RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

原理

        死信队列和普通队列区别不是很大

        普通与死信队列都有自己独立的交换机和路由key、队列和消费者。

区别:

1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到

普通队列中缓存起来,普通队列对应有自己独立普通消费者。

2.如果生产者投递消息到

死信队列和普通队列区别不是很大

普通与死信队列都有自己独立的交换机和路由key、队列和消费者。

区别:

1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到

普通队列中缓存起来,普通队列对应有自己独立普通消费者。

2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费

的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机

对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。

普通队列中,普通队列发现该消息一直没有被消费者消费

的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机

对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。

产生死信队列的原因

工具类:

public class RabbitMqUtils {
    //得到一个连接的 channel
    public static Channel getChannel() throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        //创建连接
        Connection connection = factory.newConnection();
        //创建通道
        Channel channel = connection.createChannel();
        return channel;
    }
}

  1. 消息投递到MQ中存放 消息已经过期  消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。

消息 TTL 过期

生产者代码:

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //设置消息的 TTL 时间
            AMQP.BasicProperties properties = new
                    AMQP.BasicProperties().builder().expiration("10000").build();
            //该信息是用作演示队列个数限制
            for (int i = 1; i <11 ; i++) {
                String message="info"+i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
                        message.getBytes());
                System.out.println("生产者发送消息:"+message);
            }
        }
    }
}

消费端:

普通消费:

public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定死信交换机与 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");

        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息"+message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

死信消费:

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收死信队列消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收死信队列的消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

运行结果查询:

  1.启动消费端->启动死信队列->启动生产者 查看结果

  2.关闭消费端->启动生产者 再次查看结果    结果原因:消费端队列关闭,消息时间过期会进入死信队列

2. 队列达到最大的长度 (队列容器已经满了)

生产者代码:

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

            //该信息是用作演示队列个数限制
            for (int i = 1; i <11 ; i++) {
                String message="info"+i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null,
                        message.getBytes());
                System.out.println("生产者发送消息:"+message);
            }
        }
    }
}

消费端:

普通消费:

public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定死信交换机与 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        params.put("x-max-length",6); //设置队列长度为6

        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息"+message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

死信消费:

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收死信队列消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收死信队列的消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

运行结果查询:

  1.启动消费端->启动死信队列->启动生产者 查看结果

         结果原因:消费端队列只能存储6条消息,剩下的消息进入死信队列

3. 消费者消费多次消息失败,就会转移存放到死信队列中

注意:与其他不同的是,需要开启手动应答 

生产者代码:

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //该信息是用作演示队列个数限制
            for (int i = 1; i <11 ; i++) {
                String message="info"+i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null,
                        message.getBytes());
                System.out.println("生产者发送消息:"+message);
            }
        }
    }
}

消费端:

普通消费:

public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定死信交换机与 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");

        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            if(message.equals("info5")){
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            }else {
                System.out.println("Consumer01 接收到消息"+message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
        });
    }
}

死信消费:

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收死信队列消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收死信队列的消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

运行结果查询:

  1.启动消费端->启动死信队列->启动生产者 查看结果

         结果原因:消费端队列消息中为info5时,消费端拒绝签收,只能进入死信队列

标签:false,String,EXCHANGE,队列,Rabbitmq,死信,channel
From: https://blog.csdn.net/m0_57921272/article/details/140895661

相关文章

  • RabbitMQ知识总结(基本原理+高级特性)
    文章收录在网站:http://hardyfish.top/文章收录在网站:http://hardyfish.top/文章收录在网站:http://hardyfish.top/文章收录在网站:http://hardyfish.top/基本原理消息的可靠性投递RabbitMQ消息的投递路径为:生产者------>交换机------>队列------>消费者在Ra......
  • JAVA中实现队列和栈(Deque接口和ArrayDeque类)
    用什么来实现队列和栈首先JAVA中有一个Queue接口,用来实现队列。Deque其实就是双端队列,代表两端都可进可出的队列。ArrayDeque就是用数组来实现这个双端队列。(Deque由于是接口,只可以用于声明对象,但是没办法实例化,实例化还是要使用ArrayDeque类)这时可能就会产生疑惑,队列有了,......
  • 【leetcode232:用栈实现队列】
    leetcode232:用栈实现队列题目:请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作(push、pop、peek、empty):实现MyQueue类:voidpush(intx)将元素x推到队列的末尾intpop()从队列的开头移除并返回元素intpeek()返回队列开头的元素booleanemp......
  • 用队列实现栈
    请你仅使用两个队列实现一个后入先出(LIFO)的栈,并支持普通栈的全部四种操作(push、top、pop 和 empty)。实现 MyStack 类:voidpush(intx) 将元素x压入栈顶。intpop() 移除并返回栈顶元素。inttop() 返回栈顶元素。booleanempty() 如果栈是空的,返回 true ;否则......
  • 线性表之--栈和队列
    1. 栈的表示和实现1.1栈的概念及结构栈:一种特殊的线性表,其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶,另一端称为栈底。栈中的数据元素遵守后进先出LIFO(LastInFirstOut)的原则。压栈:栈的插入操作叫做进栈/压......
  • 代码随想录算法训练营第二十五天|134. 加油站、135. 分发糖果、860.柠檬水找零、406.
    写代码的第二十五天继续贪心!!gogogo!134.加油站思路贪心算法总让我有种脑子知道每次怎么计算,但是写不出来,也想不出贪心贪在哪里了,就只是觉得应该这么做。。。。。本题中大家可以按照自己的计算方法一步一步模拟一下这个过程,然后会发现其实每次都是要计算每站剩余的油量,......
  • Java/SpringCloud/RabbitMq/无感实现消息携带用户信息 --- 逻辑讲解、代码实现、图形
    一、需求:依据黑马商城hmall为例,用户下单创建订单后,交易服务trade-service向交换机topic发送消息,交换机topic路由到队列,购物车服务cart-service监听此队列,实现自动清空购物车。改造下单功能,将基于OpenFeign的清理购物车同步调用,改为基于RabbitMQ的异步通知:定义t......
  • 栈和队列概念及相关实现
    栈在线性逻辑结构中,可以是顺序存储和链式存储两种方式,其操作可以在线性表中的任意位置实现,可能不符合实际应用需求。优化的线性表,其中包含栈和队列(注意:一下代码均以C语言实现)1栈的概述所谓的栈,也称为堆栈,是一个特殊的线性逻辑关系。只能在固定端实现栈的读写访问,其固定......
  • .NET使用RabbitMQ发送消息
    usingRabbitMQ.Client;usingSystem;usingSystem.Collections.Generic;usingSystem.Linq;usingSystem.Text;usingSystem.Threading.Tasks;namespaceCommon{publicclassRabbitMQSender{privatestaticRabbitMQSenderinstance;pr......
  • 生命队列交换机方法
    在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。因此推荐的做法是由程序启动时检查队列......