1、死信的来源
队列中的消息可能会变成死信消息(dead-lettered),进而当以下几个事件任意一个发生时,消息将会被重新发送到一个交换机:
- 消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged);
- 消息由于消息有效期(per-message TTL)过期;
- 消息由于队列超过其长度限制而被丢弃;
- 注意,队列的有效期并不会导致其中的消息过期
2、死信流程
c1:声明普通交换机、普通队列、死信交换机、死信队列、让普通队列与死信交换机进行捆绑(转发)
C2:接收死信队列的消息。
3、代码
3.1 ConsumerTTL01代码
public class ConsumerTTL01 {
//普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通交换机的routing key
public static final String NORMAL_ROUTING_KEY = "zhangsan";
//死信交换机的routing key
public static final String DEAD_ROUTING_KEY = "lisi";
//普通队列的名称
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1、获取channel
Channel channel = RabbitMQUtils.getChannel();
//2、声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE , BuiltinExchangeType.DIRECT);
//3、声明队列
//3.1普通队列设置参数,转发给死信交换机
Map <String, Object> arguments = new HashMap <>();
//过期时间
arguments.put("x-message-ttl" , 1000);
//设置绑定的交换机
arguments.put("x-dead-letter-exchange" , DEAD_EXCHANGE);
//设置绑定的routing key
arguments.put("x-dead-letter-routing-key" , DEAD_ROUTING_KEY);
//设置正常队列长度的限制,例如发送10个消息,6个为正常,4个为死信
arguments.put("x-max-length" , 6);
channel.queueDeclare(NORMAL_QUEUE , true , false , false , arguments);
//4、绑定交换机与队列
channel.queueBind(NORMAL_QUEUE , NORMAL_EXCHANGE , NORMAL_ROUTING_KEY);
channel.exchangeDeclare(DEAD_EXCHANGE , BuiltinExchangeType.DIRECT);
channel.queueDeclare(DEAD_QUEUE , true , false , false , null);
channel.queueBind(DEAD_QUEUE , DEAD_EXCHANGE , DEAD_ROUTING_KEY);
//5、接收消息
System.out.println("ConsumerTTL01等待接收消息...");
//消息传递时的回调
DeliverCallback deliverCallback = (String consumerTag , Delivery message) -> {
String msg = new String(message.getBody());
if(msg.equals("info5")) {
System.out.println("Consumer01接受的消息是:" + msg + ": 此消息是被C1拒绝的");
channel.basicReject(message.getEnvelope().getDeliveryTag() , false);
} else {
System.out.println("Consumer01接受的消息是:" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag() , false);
}
};
//消费者取消时的回调
CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("ConsumerTTL01通知消费者取消的回调接口....");
};
//开启手动应答
channel.basicConsume(NORMAL_QUEUE , false , deliverCallback , cancelCallback);
}
}
3.2 ConsumerTTL02 代码
public
class
ConsumerTTL02 { |
3.3 ProducerTTL 代码
public
class
ProducerTTL { |
3.4消息TTL过期
模拟方式:启动消费者ConsumerTTL01后,关闭该消费者,模拟ConsumerTTL01收不到消息,到了消息过期时间后,消息自动转发到死信队列,由ConsumerTTL02执行
设置消息有效期的方式:
通过生产者设置:
// 2、死信消息 设置ttl时间 live to time 单位是ms |
通过消费者设置:
//过期时间 |
3.5死信最大长度
在将普通队列与死信队列进行绑定时,通过设置"x-max-length",确定队列的有效长度。
//过期时间 |
注意:再展示时,需要先将ConsumerTTL01关闭,带ProducerTTL发送完消息后,将ConsumerTTL01打开。
3.6死信消息被拒
ConsumerTTL01需要开启手动应答
//消息传递时的回调 |
标签:false,String,NORMAL,队列,死信,channel From: https://www.cnblogs.com/FISH-ROSE/p/16587461.html