参考:https://blog.csdn.net/weixin_42128977/article/details/126152834
https://cloud.tencent.com/developer/article/2310463?areaId=106001
场景
-
定时任务,比如任务A和任务B是同条流水线上的,当任务A完成了,一个小时后执行任务B
-
- 我们打车,在规定时间内,没有车主接单,那么平台就会推送消息给你,提示暂时没有车主接单。
-
- 网上支付场景,下单了,如果没有在规定时间付款,平台通常会发消息提示订单在有效期内没有支付完成,此订单自动取消之类的信息。
-
- 我们买东西,如果在一定时间内,没有对该订单进行评分,这时候平台会给一个默认分数给此订单。
-
重试业务,比如业务A需要调用其它服务,而服务出现问题,这时候就需要做业务重试
-
- 当分布式锁加锁失败时,将消息放入到延迟队列中处理
实现方式
- 基于消息的延迟:指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,或者定义时间轮,新消息落在指定位置;
- 基于队列的延迟: 设置不同延迟级别的队列,比如5s、1min、30mins、1h等,每个队列中消息的延迟时间都是相同的。
- 基于第一种不少组件都有实现方案,比如redis的sortset间接实现,kafka内部时间轮,rmq可安装插件实现。
- 第一种实时性高,不过主观看会比较依赖组件本身,但自己实现就得考虑持久化、高可用等问题,建议直接使用组件本身;
- 第二种方案可以基于组件去实现,通用性会高点,不过实时性不高,更适合用于重试业务场景。
- 当然Redis本身并不支持延迟队列,所以我们只是实现一个比较简单的延迟队列,而且Redis不太适合大量消息堆积,所以只适合比较简单的场景,然假如我们对消息的实时性以及可靠性要求非常高,可能就需要使用MQ或kafka来实现了。
redis实现延迟队列
基本原理
- 延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到 redis 中,然后通过轮询的方式,去不断的读取消息出来
- zset是按相关分数排序的唯一字符串(成员)的集合。当多个字符串具有相同的分数时,这些字符串按字典顺序排列。
实现思路
- 消息体设置有效期,设置好score,然后放入zset中
- 通过排名拉取消息
- 有效期到了,就把当前消息从zset中移除
zadd
ZADD key score member [[score member][score member] …]
将一个或多个 member 元素及其 score 值加入到有序集 key 当中。如果 key 不存在,则创建一个空的有序集并执行 ZADD 操作。如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值,并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。score 值可以是整数值或双精度浮点数。
zrangebyscore
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
- 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
- 具有相同 score 值的成员按字典序来排列
- 可选的 LIMIT 参数指定返回结果的数量及区间(就像SQL中的 SELECT LIMIT offset, count ),注意当 offset 很大时,定位 offset 的操作可能需要遍历整个有序集,此过程最坏复杂度为 O(N) 时间。
- 可选的 WITHSCORES 参数决定结果集是单单返回有序集的成员,还是将有序集成员及其 score 值一起返回。
zrem
ZREM key member [member …]
移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。当 key 存在但不是有序集类型时,返回一个错误。
redis延迟队列优缺点
优点
- Redis zset支持高性能的 score 排序。
- Redis是在内存上进行操作的,速度非常快。
- 支持指定消息 remove
- Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。
- Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性
缺点
- 使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题
- 没有重试机制 - 处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等
- 没有 ACK 机制 - 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了
消息中间件实现延时队列
Rabbitmq 延时队列
优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列,每一种延时就需要建立一个队列。因为当后面的消息比前面的消息先过期,还是只能等待前面的消息过期,这里的过期检测是惰性的。
使用: RabbitMQ 可以针对 Queue 设置 x-expires 或者针对 Message 设置 x-message-ttl ,来控制消息的生存时间(可以根据 Queue 来设置,也可以根据 message 设置), Queue 还可以配置 x-dead-letter-exchange 和 x-dead-letter-routing-key (可选)两个参数, 如果队列内出现了 dead letter ,则按照这两个参数重新路由转发到指定的队列,此时就可以实现延时队列了。
RocketMQ实现延时队列
rocketmq在发送延时消息时,是先把消息按照延迟时间段发送到指定的队列中(把延时时间段相同的消息放到同一个队列中,保证了消息处理的顺序性,可以让同一个队列中消息延时时间是相同的,整个RocketMQ中延时消息时按照递增顺序排序,保证信息处理的先后顺序性。)。之后,通过一个定时器来轮询处理这些队列里的信息,判断是否到期。对于到期的消息会发送到相应的处理队列中,进行处理。
Kafka实现延时队列
Kafka基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer),Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,可以进行相关的延时队列设置。
Netty实现延时队列
Netty也有基于时间轮算法来实现延时队列。Netty在构建延时队列主要用HashedWheelTimer,HashedWheelTimer底层数据结构是使用DelayedQueue,采用时间轮的算法来实现。