常用的应用场景
死信队列常常用作延时关闭订单(如订单的超时后的取消订单等),虽然小项目中可以用定时轮询的方法进行检查,但是数据量一旦比较大时,定时轮询将给数据库带来不小的压力,而且定时间隔无法进行动态调整,特别是一个系统中,同时存在好几个定时器的时候,就显得非常的麻烦,同时给数据库造成巨大的访问压力。这时候就可以使RabbitMQ的死信队列。
概念解释
DLX
- Dead Letter Exchange 的缩写
- DLX也叫死信邮箱(网上的译法),死信交换机(字面翻译)。归根结底就是一个交换机,当队列中出现死信时,通过这个交换机将死信重新发送到死信队列中(指定好rabbitmq会自动发送)。
什么是死信
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
死信交换机
- 在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后在业务队列出现死信的时候就会将数据发送到死信队列。
什么是死信队列
- 死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已。
消息变成死信后,会被重新投递(publish)到另一个交换机上(Exchange),这个交换机就被称作DLX及死信交换机,然后交换机根据绑定规则转发到对应的队列上,监听该队列就可以被重新消费。
生产者-->发送消息-->交换机-->队列-->变成死信队列-->DLX交换机-->队列-->监听-->消费者
示例代码
添加依赖
implementation 'org.springframework.boot:spring-boot-starter-amqp'
添加配置
spring:
rabbitmq:
host: 10.0.0.19
port: 5672
username: guiyun
password: 111222
添加配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author Guiyun
* @date 2020/1/7 下午 1:51
*/
@Configuration
public class RabbitConfig{
/**
* 死信队列 交换机标识符
*/
private static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列 交换机绑定键标识符
*/
private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
/**
* 创建死信交换机
* @return
*/
@Bean("deadLetterExchange")
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
}
/**
* 创建一个死信队列.
* @return
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>(2);
//声明 死信交换机
args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE");
//声明 死信路由键
args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R");
return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
}
/**
* 定义死信队列转发队列.
* @return the queue
*/
@Bean("redirectQueue")
public Queue redirectQueue() {
return QueueBuilder.durable("REDIRECT_QUEUE").build();
}
/**
* 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
* @return the binding
*/
@Bean
public Binding deadLetterBinding() {
return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
}
/**
* 死信路由通过 KEY_R 绑定键绑定到死信队列上.
* @return the binding
*/
@Bean
public Binding redirectBinding() {
return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
}
}
生产者向业务队列发送消息
@Autowired
RabbitTemplate rabbitTemplate;
public void spend(JSONObject data) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置编码
messageProperties.setContentEncoding("utf-8");
// 设置过期时间5秒
messageProperties.setExpiration(5000);
return message;
};
rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", data, messagePostProcessor, correlationData);
}
死信队列消费者
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author Guiyun
* @date 2020/1/7 上午 11:09
*/
@Component
@RabbitListener(queues = "DL_QUEUE")
public class DeadLetterConsumer {
private static final Logger log = LoggerFactory.getLogger(DeadLetterConsumer.class);
@RabbitListener(queues = {"REDIRECT_QUEUE"})
public void redirect(JSONObject json) throws IOException {
log.info(json.toJSONString());
}
}