首页 > 其他分享 >18-RabbitMQ高级特性-死信队列

18-RabbitMQ高级特性-死信队列

时间:2022-10-04 23:33:53浏览次数:73  
标签:exchange 队列 18 RabbitMQ 死信 import dlx channel

死信队列

死信队列: DLX, Dead-Letter-Exchange

  • 利用DLX, 当消息在一个队列中变成死信(dead message)之后, 它能被重新publish到另一个Exchange, 这个Exchange就是DLX
  • DLX也是一个正常的Exchange, 和一般的Exchange没有区别, 他能在任何的队列上被指定, 实际上就是设置某个队列的属性
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去, 进而被路由到另一个队列
  • 可以监听这个队列的消息做相应的处理, 这个特征可以弥补RabbitMQ3.0以前支持的immediate参数功能

消息变成死信的情况

  • 消息被拒绝(basic.reject/basic.nack), 并且requeue=false(关闭重回队列)
  • 消息TTL过期
  • 队列达到最大长度

死信队列的设置

  • 首先需要设置死信队列的Exchange和Queue, 然后进行绑定
    • Exchange: dlx.exchange
    • Queue: dlx.queue
    • RoutingKey: #
  • 然后进行正常的声明交换机, 队列, 绑定, 只不过需要在队列上添加一个参数即可
    • arguments.put("x-dead-letter-exchange","dlx.exchange");
  • 这样消息在过期, requeue=false, 队列在达到最大长度时, 消息就直接路由到死信队列了!

死信队列代码实现

消费者

package com.dance.redis.mq.rabbit.dlx;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
 
public class Receiver4DLXtExchange {
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        // 声明正常的 exchange queue 路由规则
        String queueName = "test_dlx_queue";
        String exchangeName = "test_dlx_exchange";
        String routingKey = "group.*";
        RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_TOPIC);

        //    注意在这里要加一个特殊的属性arguments: x-dead-letter-exchange
        Map<String, Object> arguments = new HashMap<>();
        // 指定死信队列
        arguments.put("x-dead-letter-exchange", "dlx.exchange");
        // 指定死信队列的路由规则
        arguments.put("x-dead-letter-routing-key", "dlx.*");
        RabbitMQHelper.queueDeclare(channel,queueName,true,arguments);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 声明死信队列
        // dlx declare:
        RabbitMQHelper.exchangeDeclare(channel,"dlx.exchange",RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
        channel.queueDeclare("dlx.queue", false, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                // 拒绝并且不重回队列, 这样就会进入死信队列
                channel.basicReject(envelope.getDeliveryTag(),false);
            }
        };
        //    参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, false, consumer);
        //等待回调函数执行完毕之后,关闭资源。
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
}

生产者

package com.dance.redis.mq.rabbit.dlx;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class Sender4DLXExchange {


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_dlx_exchange";
        String routingKey = "group.dlx";
        Map<String, Object> headers = new HashMap<>();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .headers(headers).build();
        String msg = "Hello World RabbitMQ 4 DLX Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
    }

}

死信队列测试

启动消费者

启动生产者

查看消费者

应为直接拒绝了, 所以没有消费, 查看控制台

已经被加入到死信队列中了, 为啥是3呢, 应为我之前测试了两次, 这个时候, 如果是写业务的话, 就可以通过消费死信队列的消息, 完成消费失败的, 或者过期的补偿了~

我这里只是用了拒绝策略, TTL过期和队列满, 都会进入, 可以自己试一下

标签:exchange,队列,18,RabbitMQ,死信,import,dlx,channel
From: https://www.cnblogs.com/flower-dance/p/16754826.html

相关文章

  • 07-RabbitMQ核心API-Direct Exchange
    DirectExchange简介所有发送到directexchange的消息被转发到Routekey中指定的Queue注意:Direct模式可以使用RabbitMQ自带的Exchange(defaultexchange),所以不需......
  • 08-RabbitMQ核心API-Topic Exchange
    TopicExchange简介所有发送到TopicExchange的消息被转发到所有关心RouteKey中指定Topic的Queue上Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个T......
  • 09-RabbitMQ核心API-Fanout Exchange
    FanoutExchange简介不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上Fanout交换机转发消息是最快的......
  • 10-RabbitMQ核心API-其他[Binding, Queue, Message, Virtual host]
    Binding绑定关系Exchange和Exchange,Queue之间的连接关系Binding中可以包含RouteKey或者参数Queue消息队列,实际存储消息数据Durability:是否持久化,Durable......
  • 11-RabbitMQ高级特性-消息如何保证100%的投递成功
    消息如何保证100%的投递成功什么是生产端的可靠性投递保障消息的成功发出保障MQ节点的成功接收发送端收到MQ节点(Broker)确认应答完善的消息进行补偿机制BAT/TMD......
  • 12-RabbitMQ高级特性-在海量订单产生的业务高峰期, 如何避免消息的重复消费问题
    幂等性概念详解幂等性是什么可以借鉴数据库的乐观锁机制比如执行一条更新库存的SQLupdatet_repssetcount=count-1,version=version+1whereversion......
  • 13-RabbitMQ高级特性-Confirm确认消息
    Confirm确认消息理解Confirm消息确认机制消息的确认,是指投递消息后,如果Broker收到消息,则会给我们生产者一个应答生产者进行接收应答用来确定这条消息是否正常的......
  • 04-基于CentOS7安装RabbitMQ3.10.7
    RabbitMQ安装与入门安装与启动我实在是找不到这么老的版本了,直接用最新版本的,按照道理来说,新版本是兼容老版本的官网地址https://www.rabbitmq.com/Erlang安......
  • 05-RabbitMQ控制台入门及其Java简单操作
    MQ控制台简单操作建立Exchange新建Exchange成功新建Queue新建Queue成功建立Exchange与Queue的关系建立关系成功路由键:就是指发送到Exchange的消息,通......
  • 06-RabbitMQ核心API-Exchange
    Exchange流程图接收消息,并根据路由键转发消息所绑定的队列Exchange属性属性含义name交换机名称type交换机类型[direct|topic|fanout......