首页 > 其他分享 >如何解决消息队列的延时以及过期失效问题?

如何解决消息队列的延时以及过期失效问题?

时间:2022-08-29 00:33:42浏览次数:104  
标签:消费 积压 过期 数据 队列 mq 消息 延时 consumer

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

面试官心理分析

你看这问法,其实本质针对的场景,都是说,可能你的消费端出了问题,不消费了;或者消费的速度极其慢。接着就坑爹了,可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办?或者是这整个就积压了几个小时,你这个时候怎么办?或者是你积压的时间太长了,导致比如 RabbitMQ 设置了消息过期时间后就没了怎么办?

所以就这事儿,其实线上挺常见的,一般不出,一出就是大 case。一般常见于,举个例子,消费端每次消费之后要写 mysql,结果 mysql 挂了,消费端 hang 那儿了,不动了;或者是消费端出了个什么岔子,导致消费速度极其慢。

面试题剖析

关于这个事儿,我们一个一个来梳理吧,先假设一个场景,我们现在消费端出故障了,然后大量消息在 mq 里积压,现在出事故了,慌了。

大量消息在 mq 里积压了几个小时了还没解决

几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上 11 点多。这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。

一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。

一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

  • 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
  • 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
  • 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
  • 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

mq 中的消息过期失效了

假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。

这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上 12 点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。

假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

mq 都快写满了

如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。


对于 RocketMQ,官方针对消息积压问题,提供了解决方案。

1. 提高消费并行度

绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:

同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax 实现。

2. 批量方式消费

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer 的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

3. 跳过非重要消息

发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到 100000 条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:

public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
    long offset = msgs.get(0).getQueueOffset();
    String maxOffset =
            msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
    long diff = Long.parseLong(maxOffset) - offset;
    if (diff > 100000) {
        // TODO 消息堆积情况的特殊处理
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    // TODO 正常消费过程
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

4. 优化每条消息消费过程

举例如下,某条消息的消费过程如下:

  • 根据消息从 DB 查询【数据 1】
  • 根据消息从 DB 查询【数据 2】
  • 复杂的业务计算
  • 向 DB 插入【数据 3】
  • 向 DB 插入【数据 4】

这条消息的消费过程中有 4 次与 DB 的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把 DB 部署在 SSD 硬盘,相比于 SCSI 磁盘,前者的 RT 会小很多。

 

标签:消费,积压,过期,数据,队列,mq,消息,延时,consumer
From: https://www.cnblogs.com/niissan/p/16634568.html

相关文章

  • 共享栈和双端队列
    一、算法设计思想1.ABCD顺序入栈,任意时刻出栈,共多少种排列(Catalan数:(1/n+1)·C2nn)       一定不存在这种情况:i<j<k,Str[i]>Str[k]>Str[j]。只需要在全排列的基......
  • php中设置session过期时间方法
    php中设置session过期时间方法-php手册-PHP中文网 https://www.php.cn/php-notebook-45754.html在apache与php的环境中默认过期时间是20分钟左右,那么我们要怎么设置ses......
  • Kubernetes证书过期
    证书过期提示root@kubernetes#kubectlgetnodesUnabletoconnecttheserver:X509certificatehasexpiredorisnotyetvalid操作步骤1、查看证书有效期ku......
  • 如何保证消息队列的高可用?
    如何保证消息队列的高可用?面试官心理分析如果有人问到你MQ的知识,高可用是必问的。上一讲提到,MQ会导致系统可用性降低。所以只要你用了MQ,接下来问的一些要点肯......
  • 队列
    一、结构体定义1.顺序队typedefstruct{ intdata[maxSize]; intfront,rear;}SqQueue;2.链队(1)队结点类型typedefstructQNode{ intdata; structQNode*ne......
  • 消息队列面试题
             ......
  • 优先队列-多路归并系列题解
    373.查找和最小的K对数字问题描述给定两个以升序排列的整数数组nums1和nums2 , 以及一个整数k 。定义一对值 (u,v),其中第一个元素来自 nums1,第二个元素来......
  • asyncio队列 asyncio.Queue()
    importasyncio#如果maxsize小于等于零,则队列尺寸是无限的。#如果是大于0的整数,则当队列达到maxsize时,awaitput()将阻塞至某个元素被get()取出Q=async......
  • 优先队列-2386. 找出数组的第 K 大和
    问题描述给你一个整数数组nums和一个正整数k。你可以选择数组的任一子序列并且对其全部元素求和。数组的第k大和定义为:可以获得的第k个最大子序列和(子序......
  • LeetCode 232. 用栈实现队列
    思路:用两个栈实现队列pop操作,若out栈为空则先将in中元素push进out,再pop出out中元素peek操作,直接调用pop,在将pop出元素push进outclassMyQueue{public:stack<int......