首页 > 其他分享 >【打怪升级】【rocketMq】如何处理积压消息

【打怪升级】【rocketMq】如何处理积压消息

时间:2023-03-08 15:57:30浏览次数:53  
标签:topic 消费 积压 消费者 队列 升级 消息 rocketMq

遇到消息积压,如何处理?

  • 什么是消息积压

    消费积压是一个很直接的概念,看图:

    

    当某一批对应的消息,生产者生产的速度大于消费者消费的速度时,就会发生消息积压。因为靠目前的消费者永远无法消费完生产的消息。

 

  • 如何处理消息积压

    由上图,可以看出要想让生产消费者保持一个能力的水平,有这么几点:

    生产者消费过快,是否会生产废弃的消息?生产者消息是否会重复?(https://www.cnblogs.com/oldEleven/p/17149787.html

    消费者是否没有拉取到消息,拉取消息被暂停?dorebalance?

    消费者消费能力太弱(这才是大部分人遇到的问题)


  • 优化生产端:一般来说,消息都是根据执行自己的业务后再到MQ的,如果说MQ前置的业务阻塞过于严重,可能导致消息生产跟不上,但一般来说这不是消息积压的主要影响,而是业务处理缓慢的原因之一。发送消息是否大量重复,一般来说不会,具体请参考(https://www.cnblogs.com/oldEleven/p/17149787.html),为什么消息会发生重复?还有一种是否产生大量废弃的消息?这种的情况比较少,例如某个程序员张三发现同步执行代码块太慢,它采取了 executor.submit( () -> sendMessage)这种不管业务是否正常,异步全部把消息发出去,这时候比如我们做了消息检查,或者消费幂等可能会发现很多被丢弃的消息。

      生产者发送一次消息,主要经过这几步骤:

      处理数据,组装,序列化

      网络传输

      broker接收到消息的处理、持久化

    那么,如果我们单条消息发送耗时长,我们是否可以采用批量发送的方式?这种方式各有利弊,取决于你的业务场景。

  • 消费端是否没有拉取到:dorebalance时会阻塞pullMessage向broker的数据,但是不会阻塞processQueue中缓存的拉取消息,这一点理应是不存在的,毕竟那么牛逼的团队设计出那么好的产品不会出这种问题的。具体的拉取可以参考(https://www.cnblogs.com/oldEleven/p/17149787.html),消费者的生命周期

   消费端,才是我们最常见的原因。

    如果消费性能只是很短暂的,或者因为一些复杂处理影响了消费的速度,在消费端恢复后,只要消费能力大于生产的能力,那么积压的消息是可以逐渐被消化的,但是这里要考虑消费的时间。

    如果消费能力一直比较低,时间长了系统就会出问题。大量积压消息处理不了,幸好这时rocket,如果这时rabbit,可能内存都被打透了 。所以我们一定要保证消费的能力一定要高于生产的能力。

      优化消费端代码,避免出现大量异常阻塞消费进度。

      可以做业务批量拉取,异步消息的处理,并且也可以设置consumer的异步线程数,提高消费端业务的相应。(异步线程是可以加速,但是会有丢失的风险,需要结合业务场景

      消费端水平扩容,这也是很常见的方式之一,因为某些系统是有高锋的,阿里的电商也不是每时每刻都保持着双十一的峰值把?这里说的扩容是指扩容consumer的实例。但是consumer扩容要注意一点,不要做无用的扩容!

    

     接着上消费者对应队列最基本的图,假设我们最开始有两台consumer实例,经过rebalance后,会将topic下的4个队列分配给2台consumer,如果采用默认分配规则,则:consumer1:queue1、queue2,consumer2:queue3、queue4。这时我发现消费能力比较差,我又新加了两个consumer在consumerGroup里,这时出发rebalance,consumer1再去处理balance时发现现在有4个消费者了,4个队列,我是消费者1,那么我分配到队列1的绑定,然后通知broker,那这样其实就是一个consumer对应一个queue,consumerGroup的消费能力增强了。但是,我又加了两个consumer5,consumer6。当再次balance时,因为rocket一个队列只能被一个消费者进行绑定消费,那么5和6的消费者将永远消费不到消息,这就是无用的扩容,没有消息会被他们处理,看着消费者集群是增加了,但是没有用!

    那么,如果在线上发生消息积压,我们需要扩容,我们应该怎么做呢?

    假如,我现在有4个队列的消息发生了积压,线上我又没有办法给它队列做扩容,这时候,我们应该做:

      新建一个topic,设置好topic的队列,例如里面我放了20个队列。

      启一个消费者,这个消费者从原来的topic消费,将消息放在新创建的topic里。这个消费的速度很快,因为它不做业务处理。

      给新的topic加消费者实例,让新增的消费者分配到新的topic消息中消费。

      

  • 短时间的消费积压

    短时间消费积压,例如某一个时刻业务量庞大,但是很短暂。这时扩容感觉不是很需要的情况下,不扩容又会积压消息,这时可以采取服务降级,将一些不重要的消息延后,或者延迟处理一些生产者,当然具体需要根据监控看一下到底是消费慢了还是生产快了,再做对应的处理。

 

标签:topic,消费,积压,消费者,队列,升级,消息,rocketMq
From: https://www.cnblogs.com/oldEleven/p/17149781.html

相关文章