首页 > 其他分享 >MQ【消息延迟解决方案】

MQ【消息延迟解决方案】

时间:2023-11-02 15:11:20浏览次数:42  
标签:消费 队列 解决方案 Kafka MQ 消息 监控 缓冲区 延迟

一、消息延迟如何监控

       1、消息队列提供的工具,通过监控消息的堆积来完成。

       2、通过生产监控消息对消息延时的监控。

二、详情

     /2.1、消息队列工具

           以kafka为例。不用版本消费者的消费进度不一样。

           在 Kafka0.9 之前的版本中,消费进度是存储在 ZooKeeper 中的,消费者在消费消息的时候先要从 ZooKeeper 中获取最新的消费进度,再从这个进度的基础上消费后面的消息。          

           在 Kafka0.9 版本之后,消费进度被迁入到 Kakfa 的一个专门的 topic 叫“__consumer_offsets”里面。

当然,作为一个成熟的组件,Kafka 也提供了一些工具来获取这个消费进度的信息帮助我们实现自己的监控,这个工具主要有两个:

           (1)、Kafka 提供了工具叫做“kafka-consumer-groups.sh”(它在 Kafka 安装包的 bin 目录下)。

  1. 前两列是队列的基本信息,包括topic名和分区名。
  2. 第三列是当前消费者的消费进度。
  3. 第四列是当前生产消息的总数。
  4. 第五列就是消费消息的堆积数(也就是第四列与第三列的差值)。

           (2)、JMX

                Kafka 通过 JMX 暴露了消息堆积的数据,然后我们就可以通过写代码将这个堆积数据发送到我们的监控系统中去。

       2.2、自己生成消息监控

            可以自定义一种特殊的消息,然后启动一个监控程序将消息定时循环的写入到消息队列中,这个消息可以是生成一个时间戳。同时这个消息是可以被消费者消费的,当业务消费到的时候就将其丢弃,而监控程序消费这个消息是,就将其生成时间和消费时间进行对比,如果超过了我们预设的一定阈值就像我们报警。

   建议:上面两种方式都是可以监控消息延迟的,但是在实际生产中,这里推荐将他们两者进行结合来使用,比如,我们先可以在监控程序中通过JMX获取消息堆积数据,然后发送到我们的dashboard 中;同时起一个探测进程确认消息的延迟情况是怎样的。

  三、减少消息延迟

         3.1、消费端

  1. 通过优化消费代码来提升性能。
  2. 增加消费者的数量 。

                 不过第二种方式并不是对于所有的消费队列有效的,它是受消费队列限制的,比如Kafka 是不能通过增加消费者数量来提升消费性能的。

               因为, 在 Kafka 中,一个 Topic 可以配置多个 Partition,数据会被平均或者按照生产者指定的方式写入到多个分区中,那么在消费的时候,Kafka 约定一个分区只能被一个消费者消费,为什么要这么设计呢?在我看来,如果有多个 consumer(消费者)可以消费一个分区的数据,那么在操作这个消费进度的时候就需要加锁,可能会对性能有一定的影响。

              所以,分区数量决定了消费的并行度,增加多余的消费者也是没有用处的,你可以通过增加分区来提高消费者的处理能力。

            既然如此,那我们在不增加分区的情况下该怎么去提升消费性能呢?

             我们虽然不能增加消费者,但是我们可以在消费者使用并行处理。所以我们就可以考虑使用多线程的方式来增加处理能力:

  1.  预先创建一个或者多个线程池;
  2.  拉取到消息丢到线程池中进行异步处理,将串行的消息消费变成了并行的;
  3.  不仅提高了吞吐量,还可以一次消费多拉取一些消息,分配给多个线程来处理 ;

       

     3.2、消息队列本身

             两个关键点:消息存储、零拷贝技术。

           【消息存储】:应当使用本地磁盘作为存储介质。Page Cache 的存在就可以提升消息的读取速度,即使要读取磁盘中的数据,由于消息的读取是顺序的并且不需要跨网络读取数据,所以读取消息的 QPS 肯定是比普通数据库高很多很多。

          【零拷贝技术】:说是零拷贝,其实我们不可能消灭数据的拷贝,只是尽量减少拷贝的次数。在读取消息队列的数据的时候,其实就是把磁盘中的数据通过网络发送给消费客户端,在实现上会有四次数据拷贝的步骤:

                         1、数据从磁盘拷贝到内核缓冲区;

                         2、系统调用将内核缓存区的数据拷贝到用户缓冲区;

                         3、用户缓冲区的数据被写入到 Socket 缓冲区中;

                         4、操作系统再将 Socket 缓冲区的数据拷贝到网卡的缓冲区中。

                  

 操作系统提供了 Sendfile 函数可以减少数据被拷贝的次数。使用了 Sendfile 之后,在内核缓冲区的数据不会被拷贝到用户缓冲区而是直接被拷贝到 Socket 缓冲区,节省了一次拷贝的过程提升了消息发送的性能。高级语言中对于 Sendfile 函数有封装,比如说在 Java 里面的 java.nio.channels.FileChannel 类就提供了 transferTo 方法提供了 Sendfile 的功能。

      队列经常会用在我们项目当中,做好数据堆积监控是关键。

       

 

标签:消费,队列,解决方案,Kafka,MQ,消息,监控,缓冲区,延迟
From: https://www.cnblogs.com/xiaobaicai12138/p/17805462.html

相关文章

  • 真空充氮包装机远程监控运维平台解决方案
    真空充氮包装机是一种复合型的包装机械设备,用于包装食品、药品、化学品和其他物品。它通过将包装物品抽成真空,然后在其中注入氮气等方式,保护包装物品免受氧气、湿气和其他污染物的侵害,可以有效地延长包装物品的保质期,并提高它们的保存质量。 真空充氮包装机集制氮气、抽真空、充氮......
  • pytest + yaml 框架 -39.多账号操作解决方案
    前言最近有小伙伴提到,有写场景需要用到2个账号来回切换操作该如何解决。(备注:从v1.2.4以后新版本不再公开,新功能内部VIP学员可以使用,公开版本仅解决bug,不提供新功能了。)先获取账号token前面教程有讲到全局登录一次,后面所有的请求都会拿着全局登录的账号token去访问请求。......
  • javamail发送附件DataSource使用文件流解决方案
    问题:在使用james邮件服务器发送邮件时,附件是存储在华为云服务器上的,只能通过ApacheHttpClient去下载,存储在FTP上的文件同样会碰到这个问题。API上邮件添加附件的方法:/*************1.本地文件*************///将本地文件作为附件DataSourcedataSource=newFileDataSourc......
  • 羚通视频智能分析平台视频监控算法分析平台视频叠框显示慢且总是一闪一闪的,无法实时跟
    ​羚通视频智能分析平台是一款基于大数据和算法分析的综合性平台,它通过对视频数据的智能分析和处理,为用户提供全方位、多层次的监控服务。该平台集成了多种智能分析算法,可以自动识别和分析视频中的目标对象,如人脸、人体、烟火等。适用于各行各业。最近,有用户反馈,在使用羚通视频......
  • odigos 基于ebpf 以及OpenTelemetry 的分布式tracing 解决方案
    按照odigos官方的介绍是不需要进行代码的修改就可以实现方便的跨应用的分布式trace,目前支持java,python,net,go,js等语言目前看官方的介绍,安装是比较简单的(核心基于了k8s),目前官方文档比较清晰可以试用下说明目前开源分布式trace的工具是越来越多了,同时基于ebpf以及OpenTelemetry......
  • 【Mquant】2、量化平台的选择
    一、选择因素功能和工具集:量化平台应该提供丰富的功能和工具集,包括数据分析、策略回测、实时交易等。不同的平台可能有不同的特点和优势,可以根据自己的需求选择适合的平台。数据源和数据质量:量化交易离不开高质量的数据,因此选择一个平台时要考虑其数据源和数据质量。一些平......
  • NativeBuffering,一种高性能、零内存分配的序列化解决方案[性能测试篇]
    第一版的NativeBuffering([上篇]、[下篇])发布之后,我又对它作了多轮迭代,对性能作了较大的优化。比如确保所有类型的数据都是内存对齐的,内部采用了池化机器确保真正的“零内存分配”等。对于字典类型的数据成员,原来只是“表现得像个字段”,这次真正使用一段连续的内存构架了一个“哈希......
  • celery 进程如何做健康检查,判断 amqp 连接是不是断了?
    要检查Celery进程的健康状况并判断AMQP连接是否断开,可以参考以下方法:使用Heartbeat(心跳)机制:Celery提供了Heartbeat机制来确保AMQP连接的稳定性。你可以在Celery的配置文件中设置BROKER_HEARTBEAT参数来启用心跳检测。心跳检测会定期发送一条特殊的消息给AMQP服务器,如果服务器没有收......
  • 跨越禁区:性感的跨域解决方案揭秘
    跨域问题介绍跨域问题是由于浏览器的同源策略引起的,当一个请求的协议、域名或端口与当前页面不一致时,浏览器会拒绝接收响应。虽然服务器已经处理并响应了请求,但浏览器为了用户的安全,会单方面拒绝响应。为了解决这个问题,常见的有五种方法:JSONP、script标签、前端代理、Nginx代理和......
  • ActiveMQ是什么?-九五小庞
    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMSProvider实现。特点:1、支持多种语言编写客户端2、对spring的支持,很容易和spring整合3、支持多种传输协议:TCP,SSL,N......