死信的概念
死信队列(Dead Letter Queue)是指当消息无法被消费者正常消费时,将这些无法消费的消息发送到专门的死信队列中,以便进行进一步的处理。这种处理方式通常被称为“死信处理”。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效
在RabbitMQ中,死信队列通常用于以下几种情况:
-
消息被拒绝:消费者在消费消息时,如果无法处理该消息,可以将该消息拒绝并返回到RabbitMQ。此时,RabbitMQ可以将这些被拒绝的消息发送到死信队列中,以便进行处理。
-
消息过期:在发送消息时,可以设置消息的过期时间。如果消息在过期时间内没有被消费者消费,那么RabbitMQ会将这些过期的消息发送到死信队列中。
-
队列达到最大长度:当队列达到最大长度时,新的消息将无法被发送到队列中。此时,RabbitMQ可以选择将这些无法发送的消息发送到死信队列中,以便进行进一步处理。
实战
说明:这里的交换机类型可以自己定义,不一定要求就是直连,根据业务场景选择不同的类型。
正常情况下,生产者发送消息到交换机上,然后到普通的队列,由消费者消费,但是当达到图中三个条件中的其中一个,此时消息会转为死信消息。由于消息不能丢失,要保持正常的消费,所以此时需要定义死信交换机和队列,由死信消费者去消费。
生产者
我将有关于队列的声明,交换机的声明,以及队列与交换机的绑定关系都写在生产者中。
在此之前还记得构建队列时,我们的有一个形参需要传递一个map嘛,这个map可以定义相关的参数
channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);
例如普通队列与死信交换机的绑定关系
// 2.1 构建参数:指定某种条件下达成的死信消息
Map<String, Object> arguments = new HashMap<>();
// 过期时间 10s = 10000ms 可以由生产者中设定----一般由生产者发送
// arguments.put("x-message-ttl",10000);
//正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","del-queue");
channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);
完整代码如下:
/**
* 死信生产者
* 定义相关队列、交换机等
*/
public class DieProducer {
// 定义普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 定义普通队列名称
public static final String NORMAL_QUEUE = "normal_queue";
// 定义 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
// 定义 死信队列名称
public static final String DEAD_QUEUE = "dead_queue";
// 初始化队列
public static void init( Channel channel) throws Exception{
// 1.声明普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 2.声明普通队列
// 2.1 构建参数:指定某种条件下达成的死信消息
Map<String, Object> arguments = new HashMap<>();
// 过期时间 10s = 10000ms 可以由生产者中设定----一般由生产者发送
arguments.put("x-message-ttl",10000);
//正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","del-queue");
channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);
// 3.声明死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
// 4.声明死信队列
channel.queueDeclare(DEAD_QUEUE,true,false,false,null);
//5.绑定队列与交换机的关系
// 5.1 绑定普通队列与普通交换机的关系
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"h1");
// 5.2 绑定死信队列与死信交换机的关系
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"del-queue");
}
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
// 初始化操作
init(channel);
//发送消息
for (int i = 1; i < 11 ; i++) {
String message = "info "+i;
channel.basicPublish(NORMAL_EXCHANGE,"h1",null,message.getBytes(StandardCharsets.UTF_8));
}
}
}
运行:
可以看到普通队列的消息都转发到了死信队列中了,这是因为设置了消息过期时间,从而满足了成为死信队列的条件之一
// 设置过期时间,当消息时间10秒还未消费,那么就成为死信队列
arguments.put("x-message-ttl",10000);
//正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","del-queue");
普通消费者
代码很简单,只需要接收消息就好了
/**
* 普通消费者,用于消费普通消息队列
*/
public class DieNormalConsumer {
// 定义普通队列名称
public static final String NORMAL_QUEUE = "normal_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
// 定义消息成功的回调
DeliverCallback nackCallback = ( consumerTag, message)->{
//接收消息,打印到控制台
System.out.println("接收到普通队列的消息:"+new String(message.getBody()));
};
channel.basicConsume(NORMAL_QUEUE,true,nackCallback,(consumerTag)->{});
}
}
死信消费者
同样的,只需要接收消息就好了
/**
* 死信消费者,用于消费死信消息队列
*/
public class DieDeadConsumer {
// 定义 死信队列名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
// 定义消息成功的回调
DeliverCallback nackCallback = ( consumerTag, message)->{
//接收消息,打印到控制台
System.out.println("接收到死信队列的消息:"+new String(message.getBody()));
};
channel.basicConsume(DEAD_QUEUE,true,nackCallback,(consumerTag)->{});
}
}
以上代码实现是基于TTL消息过期时间导致普通队列转换为死信队列的场景
我们知道,成为死信只需要满足三个条件中的其中一个即可
-
消息被拒绝:消费者在消费消息时,如果无法处理该消息,可以将该消息拒绝并返回到RabbitMQ。此时,RabbitMQ可以将这些被拒绝的消息发送到死信队列中,以便进行处理。
-
消息过期:在发送消息时,可以设置消息的过期时间。如果消息在过期时间内没有被消费者消费,那么RabbitMQ会将这些过期的消息发送到死信队列中。
-
队列达到最大长度:当队列达到最大长度时,新的消息将无法被发送到队列中。此时,RabbitMQ可以选择将这些无法发送的消息发送到死信队列中,以便进行进一步处理。
队列达到最大长度成为死信
只需要在上面的案例中修改声明时的参数列表即可
//3.声明普通队列:因为如果消息称为死信,那么是由队列发送给死信交换机的。所以需要用到 arguments
Map<String, Object> arguments = new HashMap<>();
//正常队列设置过期之后的死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","lisi");
arguments.put("x-max-length",6); // 表示 队列最大长度为6,超过即为死信队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
例如我循环发送10条信息,但是队列最大长度为6,在未消费的情况下,剩余的4个消息就会成为死信
消息被拒成为死信
例如在消费者方,指定某个消息我不接收,那么该消息就会转到死信队列中
// 接收消息成功的回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
// 拒收 消息 info5,同时将他送会死信队列中
if(message.equals("info5")){
System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01 接收到消息"+message);
// 开启逐一应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
总结参数
以上三个案例用到的参数:
arguments.put("x-message-ttl",10000);
//正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","del-queue");
arguments.put("x-max-length",6); // 表示 队列最大长度为6,超过即为死信队列
标签:队列,rabbitmq,死信,交换机,消息,channel,arguments
From: https://www.cnblogs.com/zgf123/p/17674163.html