1、使用RabbitMq实现延时队列方法1
2、基于我们的业务我们使用下面这种方式实现延时队列
1、导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、编写rabbitmq的相关配置
2、编写配置类来创建队列、交换机、绑定关系
package com.gulimall.order.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 使用配置类快速创建交换机、队列、绑定关系
*/
@Configuration
public class MyRabbitMqConfig {
/**
* @Bean的作用是自动给rabbitmq创建我们的交换机、队列、绑定关系
* @return
*/
/**
* 监听我们的死信队列 【由于我们在配置文件中设置了手动ack消息,所以我们在这里通过Chnnel来接受消息】
* @return
*/
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity order, Channel channel, Message message) throws IOException {
String orderSn = order.getOrderSn();
System.out.println("收到过期消息:" + orderSn);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //手动确认消息
}
//一个交换机
@Bean
public Exchange orderEventExchange(){
/**
* 对于只有一个交换机的设计,我们一般创建一个topic交换机
* (String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
*/
return new TopicExchange("order-event-exchange", true, false);
}
//死信队列
@Bean
public Queue orderDelayQueue(){
/**
* 配置参数
* x-dead-letter-exchange: order-event-exchange
* x-dead-letter-routing-key: order.release.order
* x-message-ttl: 60000
*/
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");//死信路由到order-event-exchange交换机
arguments.put("x-dead-letter-routing-key", "order.release.order");//死信路由键
arguments.put("x-message-ttl", 60000);//消息过期时间
/**
* String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
*/
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
//普通队列
@Bean
public Queue orderReleaseOrderQueue(){
/**
* String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
*/
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
//交换机绑定队列
@Bean
public Binding orderCreateOrderBinding(){
/**
* String destination, DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments
*/
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
}
//交换机绑定队列
@Bean
public Binding orderReleaseOrderBinding(){
/**
* String destination, DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments
*/
return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
}
}
重启项目后,如果发现rabbitmq官网上面没有自动给我们创建交换机和队列,我们只需要给rabbitmq发送一个请求即可自动创建,我的请求如下