MQ-消息队列简单来说就是将“消息”放到“队列”中,然后慢慢处理队列中的消息。
完成延迟功能总体的思路是将消息放到队列中,为消息设置过期时间,不直接处理这个队列中的消息,
等到消息过期,将它转到另一个队列进行处理,从而完成延迟功能。
基本概念
1. 队列
队列是RabbitMQ的内部对象,用来存储消息。多个消费者可以订阅同一个队列。
2. 死信队列
消息变成死信的几种情况:
1. 消息被拒绝,并且requeue为false。
2. 消息过期
3. 队列达到最大长度
这里采取第二种方式,设置过期时间。
3. 交换器
交换器常用类型: direct、topic、fanout、headers。
路由键和队列之间的匹配规则取决于交换机的类型。
-
对于 direct 交换机:
- 当一个队列通过绑定键(binding key)与 direct 交换机绑定时,只有消息的路由键与绑定键完全匹配时,消息才会被路由到该队列。
-
对于 fanout 交换机:
- fanout 交换机会将消息广播给所有与之绑定的队列,忽略消息的路由键。
-
对于 topic 交换机:
- topic 交换机使用通配符匹配路由键和绑定键之间的关系。
- 路由键可以包含一个或多个单词(以点分隔),例如 "stock.usd.nyse"。
- 绑定键可以使用以下通配符进行匹配:
*
:匹配一个单词。#
:匹配零个或多个单词。
- 例如,绑定键 "stock.*.nyse" 可以匹配 "stock.usd.nyse",但不匹配 "stock.eur.nyse"。
-
对于 headers 交换机:
- headers 交换机使用消息的头部信息来匹配队列。
- 消息的头部信息是一组键值对。
- 当消息的头部信息与队列绑定时指定的键值对完全匹配时,消息才会被路由到该队列。
根据交换机类型和绑定规则,RabbitMQ 可以灵活地将消息路由到与之匹配的队列中。根据实际需求,我们可以选择合适的交换机类型和绑定规则来实现灵活的消息路由。
绑定
RabbitMQ通过绑定将消息路由到指定队列,Binding
的作用是将交换器和队列关联起来。
代码
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
@Configuration
public class RabbitMQConfig {
/**
* orderQueue是正常的队列,用于接收实时信息
* @return Queue
*/
@Bean
public Queue orderQueue() {
return new Queue("orderQueue");
}
/**
* orderDelayQueue是延迟队列,用于接收延迟信息
* @return Queue
*/
@Bean
public Queue orderDelayQueue() {
HashMap<String, Object> args = new HashMap<>();
// 通过x-dead-letter-exchange设置为死信队列
args.put("x-dead-letter-exchange", "");
// 设置死信路由键,死信会被路由到orderQueue中
args.put("x-dead-letter-routing-key", "orderQueue");
// 设置消息过期时间,单位毫秒
args.put("x-message-ttl", 60000);
return new Queue("orderDelayQueue", true, false, false, args);
}
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("orderExchange");
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("orderRoutingKey");
}
@Bean
public Binding orderDelayBinding() {
return BindingBuilder.bind(orderDelayQueue()).to(orderExchange()).with("orderDelayRoutingKey");
}
}
连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
发送消息
@Service
public class OrderService {
private final RabbitTemplate rabbitTemplate;
@Autowired
public OrderService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendOrder(String params) {
var id = "1";
rabbitTemplate.convertAndSend("orderExchange", "orderRoutingKey", params, message -> {
message.getMessageProperties().setMessageId(id);
return message;
});
}
public void sendDelayedOrder(Date pickUpTime, String param) {
var id = "2";
// 发送消息到orderDelayQueue队列中
rabbitTemplate.convertAndSend("orderExchange", "orderDelayRoutingKey", param, message -> {
// 设置消息过期时间,单位毫秒
message.getMessageProperties().setExpiration(String.valueOf(getMilSecond(pickUpTime)));
message.getMessageProperties().setMessageId(id);
return message;
});
}
public long getMilSecond(Date pickupTime) {
long deliveryTime = pickupTime.getTime();
// 计算过期时间的毫秒数
return deliveryTime - System.currentTimeMillis();
}
}
消费信息
@Service
public class RabbitMQService {
@RabbitListener(queues = "orderQueue")
public void deal1(String params, Channel channel, Message message) {
System.out.println("orderQueue" + params);
}
@RabbitListener(queues = "orderQueue")
public void deal2(String params, Channel channel, Message message) {
System.out.println("orderQueue" + params);
}
标签:return,队列,Spring,绑定,Boot,RabbitMQ,消息,public,路由
From: https://www.cnblogs.com/shames/p/17822450.html