概念
所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费
使用场景
1、订单在十分钟之内未支付则自动取消
2、预定会议后,需要在预定时间点前十分钟通知各个与会人员参加会议。
3、淘宝七天自动确认收货,自动评价功能等
TTL(消息存活时间)
TTL 是 RabbitMQ 中一个消息或者队列的属性
表示一条消息或是该队列中的所有消息的最大存活时间,单位是毫秒;目前有两种方法可以设置消息的 TTL。
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同。
如果两种方法一起使用,则消息的过期时间以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成“死信”。
当设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列则会被丢到死信队列中)
当设置了消息的 TTL 属性,那么消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意一点是,如果不设置 TTL,表示消息永远不会过期
RabbitMQ 中的 TTL
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间,
单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
- 消息TTL
- 队列TTL
场景
对于消息TTL来说,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者 之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间
场景:用户自定义发布文章,用户自定义闹钟提示
对于队列TTL来说,场景:需要固定的时间,例如订单支付,每个用户下单30分钟必须支付,否则取消。
如何使用,在整合springboot篇章讲解
解释
其实跟死信队列一样,大致流程就是 生产者发送消息到普通交换机,交换机与普通队列绑定,设置一个过期时间,当这个时间到了,通过参数使得普通队列的消息转发到死信队列,然后由死信消费者去消费。
整合springboot实现延迟队列
案例引入
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
引入依赖
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
application.yml文件
spring:
rabbitmq:
port: 5672
host: xxxxx
username: xxxx
password: xxxx
配置文件类
用于声明队列,交换机,路由key ,绑定关系等,这样就不需要消费者或者生产者端去声明了。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* TTL队列 配置文件类
*/
@Configuration
public class TtlQueueConfig {
// 普通交换机名称
public static final String X_EXCHANGE = "x";
// 普通队列名称
public static final String QA_QUEUE = "QA";
public static final String QB_QUEUE = "QB";
// 死信交换机名称
public static final String Y_DEAD_EXCHANGE = "Y";
// 死信交换机名称
public static final String QD_DEAD_QUEUE = "QD";
// 声明普通交换机 x
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
// 声明死信交换机 y
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_EXCHANGE);
}
// 声明普通队列 QA 需要携带TTL参数过期时间,同时要绑定死信交换机
// QA 10s中过期
@Bean("queueA")
public Queue queueA(){
Map<String,Object> arguments = new HashMap<>();
// 绑定死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE);
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","YD");
// 设置过期时间ttl
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(QA_QUEUE).withArguments(arguments).build();
}
// 声明普通队列QB ttl过期时间40s
@Bean("queueB")
public Queue queueB(){
Map<String,Object> arguments = new HashMap<>();
// 绑定死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE);
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","YD");
// 设置过期时间ttl
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QB_QUEUE).withArguments(arguments).build();
}
//声明死信队列 QD
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(QD_DEAD_QUEUE).build();
}
//绑定关系 X交换机绑定 QA routking 为 XA。
// @Qualifier 注解,可以根据声明的Bean名称进行自动注入
@Bean
public Binding queueABindX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange eXchange){
return BindingBuilder.bind(queueA).to(eXchange).with("XA");
}
// 绑定关系, X交换机绑定队列QB,routking 为 XB
@Bean
public Binding queueABindB(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange eXchange){
return BindingBuilder.bind(queueB).to(eXchange).with("XB");
}
// 绑定关系, X交换机绑定队列QB,routking 为 XB
@Bean
public Binding queueABindY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yXchange){
return BindingBuilder.bind(queueD).to(yXchange).with("YD");
}
}
生产者
/**
* 发送延迟消息
* http://localhost:8080/ttl/sendMsg/嘻嘻嘻
*
* 发送的消息 通过url 去体现, 如 嘻嘻嘻 既是消息
*/
@RestController
@Slf4j
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
// 这是spring 提供的 操作MQ的模板引擎
private RabbitTemplate rabbitTemplate;
//发消息
@GetMapping("sendMsg/{message}")
public void sendMSg(@PathVariable String message){
log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10秒的队列:"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40秒的队列:"+message);
}
}
消费者
/**
* 队列TTL 消费者
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
/**
为什么延迟队列,这是根据死信队列来的,死信队列形成的条件无非就是三种
1、消息延迟
2、拒收消息
3、队列长度
满足其中之一即可
当我们生产者发送消息给QA时,由于QA 没有消费此信息,10秒到期之后,自动发送到死信队列,由QD死信队列接收并且消费。这就是一种延迟消息。
*/
//接收消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws Exception{
String mes = new String(message.getBody(),"utf-8");
log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),mes);
}
}
测试
浏览器输入http://localhost:8080/ttl/sendMsg/你好啊
经过测试,我们可以指定队列的TTL过期时间,但是业务场景不一定都是固定的时间,例如如果用户需要自定义发布文章,自定义闹钟等场景,这个队列就不能使用了,所以我们可以使用消息TTL过期
消息TTL过期
在上诉代码架构的模型上在添加一个队列QC,绑定关系如下,该队列不设置TTL时间:
配置文件类代码
在原来的代码的基础中,在配置类中 声明一个QC ,同时进行绑定关系。即可
这个QC 队列 不用设置延迟消息时间,而是通过生产者来指定延迟消息的时间。这样就可以完成自定义延迟队列了。
// 优化代码-----创建一个普通队列QC
public static final String QUEUE_C = "QC";
//声明QC队列----->这里不用定义TTL时间,由生产者去定义
@Bean("queueC")
public Queue queueC(){
Map<String,Object> arguments = new HashMap<>(3);
// 设置 死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
//绑定关系
@Bean
public Binding xExchangeBindQC(@Qualifier("xExchange") DirectExchange eExchange,
@Qualifier("queueC") Queue queueC){
return BindingBuilder.bind(queueC).to(eExchange).with("XC");
}
生产者指定发送时间
//开始发送消息----》带有消息 和 TTL时间
@GetMapping("sendExporationMsg/{message}/{ttlTime}")
public void sendMSg(@PathVariable String message,@PathVariable String ttlTime){
log.info("当前时间:{},发送一条时长{}毫秒TTL信息给QC队列:{}",new Date().toString(),ttlTime,message);
rabbitTemplate.convertAndSend("X","XC","消息来自TTL为10秒的队列:"+message, msg->{
// 设置发送消息时候的延迟时长
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
由于队列的特性是先进先出,那么例如我发起一个20秒的ttl消息过期的信息 在发送一个2秒消息过期的信息,但是在接收的时候,是先等到20秒的消息,在接收到2秒的消息,就不能实现根据消息ttl的粒度去执行了。
Rabbitmq 插件实现延迟队列
上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间 及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。
安装延时队列插件
在官网上下载 https://www.rabbitmq.com/community-plugins.html,
下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
插件目录:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
执行下列命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安装延迟插件
重启:systemctl restart rabbitmq-server
代码架构图
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
可以看到,按照原来的方法,我们需要定义普通交换机,普通队列,死信交换机,死信队列
但是使用插件后,我们仅仅只需要定义交换机延迟,和一个队列就实现了延迟队列的功能了。
案例引入
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并 不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中。
声明交换机类型的时候,我们要注意,类型一定是 x-delayed-message
在springboot 中,由于这个类型是没有的,所以我们要自定义,通过CustomExchange 可以自定。
参数的说明:
- name:交换机的名称
- type:交换机的类型
- durable:是否持久化
- autoDelete:是否自动删除
- arguments:参数----》如TTL、死信交换机等。x-dead-letter-routing-key" 这样的。
- x-delayed-message---基于插件的交换机
@Configuration
public class DelayedQueueConfig {
// 定义 交换机延迟名称
public static final String DELAYED_EXCHANGE = "delayed.exchange";
// 定义routingkey
public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";
// 定义消息队列
public static final String DELAYED_QUEUE = "delayed.queue";
//声明交换机----交换机类型是:x-delayed-message---基于插件的交换机
@Bean
public CustomExchange delayedExchange(){
//自定义交换机
/**
* 1、交换机的名称
* 2、交换机的类型
* 3、是否需要持久化
* 4、是否需要自动删除
* 5、参数
*/
Map<String,Object> arguments = new HashMap<>();
// 设置 死信交换机
arguments.put("x-delayed-type","direct") ;
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
}
//声明队列
@Bean
public Queue delayedQueue(){
return QueueBuilder.durable(DELAYED_QUEUE).build();
}
//绑定
@Bean
public Binding delayedQueueBindCustomExchange(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange customExchange){
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者代码
// 开始发送消息,基于插件的消息 以及 延迟的时间
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
log.info("当前时间:{},发送一条时长{}毫秒TTL信息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);
rabbitTemplate.convertAndSend("delayed.exchange","delayed.routing.key",message,msg->{
// 发送消息的时候, 延迟时长 单位 ms
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
消费者代码
/**
* 基于插件的 延迟消息
*/
@Slf4j
@Component
public class DelayQueueConsumer {
//监听消息
@RabbitListener(queues = "delayed.queue")
public void receiveDelayQueue(Message message){
byte[] body = message.getBody();
String meg = new String(body);
log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),meg);
}
}
发起请求:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
第二个消息被先消费掉了,符合预期
总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
标签:队列,rabbitmq,死信,交换机,消息,TTL,public,延迟 From: https://www.cnblogs.com/zgf123/p/17674915.html