首页 > 其他分享 >RocketMQ消费暂停问题分析

RocketMQ消费暂停问题分析

时间:2023-06-08 16:35:48浏览次数:39  
标签:消费 pcr 互斥 线程 消息 导流 暂停 RocketMQ

一、背景

客经使用rocketMq批量推送数据到pcr执行次贷策略引擎和互斥决策引擎,pcr将决策结果推送到前置路由。

二、问题现象描述

在客经推数据时,pcr-updateBorrowState消息积压越来越多,从日志上看,pcr不拉取消息,重启服务器后可以消费消息,过一会又消费变慢,不断重启才让所有消息消费完。

三、业务处理流程

1、 客经在hive筛选客户,经客经程序查出客户号,通过mq批量发送到pcr

2、 Pcr 接收批量客户,逐一执行次贷策略,并发送mq执行互斥策略

3、 将次贷策略输出的导流机构发送给adv执行导流路由

涉及到mq的topic:

pcr-hiveDiversionPolicyExecute:客经程序发送, pcr 消费后执行次贷策略引擎,每个消息包含100个客户号,每个消息间隔1s

pcr-updateBorrowState:pcr自产自销,执行互斥决策引擎的topic,每个消息一个客户

   

四、MQ部署架构

       MQ集群:主从模式,四台机器

 

   

五、原因分析

 

   

pullRequest对象:

分配队列时(rebanlance)会重新生成pullRequest对象

消息都取出后会更新偏移量,重新生成pullRequest对象

现象1、pcr-updateBorrowState消息积压:

原因:客经一个消息包含100个客户,每秒发送一次,即每秒推送一个消息(含100个客户号)到消息队列中,pcr的消费线程数为20个,在消费导流消息时会将实际执行导流的逻辑丢到只开了4个核心线程数的线程池中,一个消息100个客户对应100个任务在4个线程中执行,所以导流消息消费耗时较长,所以占用消费线程的时间较长。而每个客户执行完导流后还会发送消息执行互斥策略,因此导流消息和互斥策略消息比例为1:100,当大部分消费线程被导流消息占用,影响其他消息消费,而互斥策略消息生产速率是接近100个/s。所以互斥策略消息积压明显。因为客经发送的一个消息包含100个客户,平均每秒发送一次,及每秒推送100个客户到次贷导流的topic中,而pcr一次拉取32个消息,循环遍历32个消息,每个消息(含100个客户)开一个消费线程来消费,每个消费线程内部开4个线程并行处理,由于次导流消费耗时较长(1-10min),即使一分钟内完全消费完这32个消息,也远不及客经发送该topic的速度(一分钟可发送60个消息);

 

   

一个消息队列对应生成一个pullrequest对象,同一个消费组的所有topic消息(互斥消息、导流消息、或者其他消息)对应生成的pullRequest对象都放在一个队列中,由于客经不停的发送导流消息,就会导致pullRequest队列中连续的pullRequest对象全部是导流消息请求,从而消费线程逐渐全部被导流消息占满,其他消息(不仅仅pcr-updateBorrowState)就得不到资源消费,因为会全部积压;

现象2、 服务器不消费消息,重启才开始拉取消息:

原因:机器重启后会重新分配消息队列,此时会重新生成pullRequest对象,此时该对象中可能重新分派了互斥topic消息,此时该消息获得消费线程后就会被消费,因此可以观察到互斥消息积压下降,但很快当20个消费线程又都被占用来消费客经消息(导流消息消费很慢)时,就又回到了之前的场景,所以要不停重启才会消费其他消息

六、解决方案

1、互斥决策引擎mq(pcr-updateBorrowState)消费由单线程改为多线程处理

2、次贷导流由(100个客户/1s)下调频率

3、将导流的mq单独定义一个消费组,这样就可与线上其他topic隔离,并且也不共用主流程消费的线程

4、批量推送的数据只入库,消费线程不等直接返回;后续处理逻辑由调度捞取执行


作者:上好佳28
链接:https://www.jianshu.com/p/7d4969c1a535
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

标签:消费,pcr,互斥,线程,消息,导流,暂停,RocketMQ
From: https://www.cnblogs.com/yogayao/p/17466876.html

相关文章

  • 数学建模校赛---基于餐厅消费数据的隐形资助研究
    一、校赛初知1.1校赛题目近年来,随着精准资助的开展,如何准确判定高校家庭经济困难学生、切实完善精准资助手段,对高校资助工作具有重要意义。隐性资助是通过大数据挖掘的形式,找准家庭经济困难学生的行为或经济状况特征,隐形认定(识别)出困难学生,并通过隐形实施的方式(不评比,不公示)给予适......
  • RocketMQ 学习社区重磅上线!AI 互动,一秒了解 RocketMQ 功能源码
    作者:RocketMQ学习社区RocketMQ背景ApacheRocketMQ诞生至今,一直服务于100%阿里集团内部业务、阿里云以及开源社区数以万计的企业客户。历经十多年双十一严苛流量验证的RocketMQ,承载了超过万亿级消息规模的洪峰压力。2021年ApacheRocketMQ更是进入全新5.0时代。立足于企......
  • RocketMQ 学习社区重磅上线!AI 互动,一秒了解 RocketMQ 功能源码
    作者:RocketMQ学习社区RocketMQ背景ApacheRocketMQ诞生至今,一直服务于100%阿里集团内部业务、阿里云以及开源社区数以万计的企业客户。历经十多年双十一严苛流量验证的RocketMQ,承载了超过万亿级消息规模的洪峰压力。2021年ApacheRocketMQ更是进入全新5.0时代。立足......
  • rocketmq
    rocketmq在RocketMQ中,msgId和offsetMsgId是两个不同的概念。msgId是消息唯一标识符。具有全局唯一性,由RocketMQ在消息发送时自动生成。当消息被成功发送到Broker端后,msgId会随着消息一起存储在commitlog文件中,并且在消息消费时也可以用来查找和定位该消息。通常情......
  • RocketMQ:一个纯java的开源消息中间件--开发测试环境搭建
    一、简介  RocketMQ的前身是Metaq,当 Metaq 3.0发布时,产品名称改为RocketMQ    MetaQ2.x版本由于依赖了alibaba公司内部其他系统,对于公司外部用户使用不够友好,推荐使用3.0版本。   项目地址: https://github.com/alibaba/RocketMQ二、安装RocketMQ   安装Rocket......
  • RocketMQ入门(3)拉取消息
     http://www.changeself.net/archives/rocketmq%e5%85%a5%e9%97%a8%ef%bc%883%ef%bc%89%e6%8b%89%e5%8f%96%e6%b6%88%e6%81%af.html......
  • RocketMQ 脚本调优
    #!/bin/sh##ExecuteOnlyOnce#echo'vm.overcommit_memory=1'>>/etc/sysctl.confecho'vm.min_free_kbytes=5000000'>>/etc/sysctl.confecho'vm.drop_caches=1'>>/etc/sysctl.confecho'vm.zone_re......
  • KafKa消费开发
     KafKa消费开发配置以下代码需要写完整,不完整会出现中断,假死现象,长时间不处理问题。(实际项目代码)///<summary>///-offsets是自动提交的。///-consumer.Poll/OnMessage是用于消息消费的。///-......
  • 万字长文讲透 RocketMQ 4.X 消费逻辑
    RocketMQ是笔者非常喜欢的消息队列,4.9.X版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。这篇文章,笔者梳理了RocketMQ的消费逻辑,希望对大家有所启发。1架构概览在展开集群消费逻辑细节前,我们先对RocketMQ4.X架构做一个概览。整体架构中......
  • 万字长文讲透 RocketMQ 4.X 消费逻辑
    RocketMQ是笔者非常喜欢的消息队列,4.9.X版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。这篇文章,笔者梳理了RocketMQ的消费逻辑,希望对大家有所启发。1架构概览在展开集群消费逻辑细节前,我们先对RocketMQ4.X架构做一个概览。整体架构中......