首页 > 其他分享 >RocketMQ之管理与监控

RocketMQ之管理与监控

时间:2023-05-06 19:12:57浏览次数:34  
标签:console 管理 mqadmin broker RocketMQ 监控 rocketmq

前言

首先提出我们的监控诉求,出现如下情况时,希望能够及时接收到系统告警通知:

  • RocketMQ服务宕机
  • RocketMQ消费者下线
  • RocketMQ消息出现长时间或者大量堆积

本文将通过修改rocketmq-console源码的方式,增加RocketMQ消费者下线和RocketMQ消息出现长时间或者大量堆积监控能力。

一、RocketMQ服务宕机监控告警

这一级别的监控,本质上而言是监控Linux上启动的Rocket MQ Java进程的运行情况。细分的话,需要监控以下两个维度:

  1. Linux Java进程的CPU使用率,内存使用量;
  2. Java进程本身的JVM的服务质量,GC,并发数,内存分布等

一般的公司在运维方面会有专门的监控组件,如zabbix会做统一处理。

例如这里用简单的shell脚本+钉钉组装的最简单的监控告警方式:
监控的方式有很多,比如简单点的,我们可以写一个shell脚本,监控执行rocketmqJava进程的存活状态,如果rocketmq crash了,发送告警:

#!/bin/bash
## monitor.sh
while true
do
    echo "开始监控rocket broker 进程..."
    PID=$(ps -ef | grep java | grep org.apache.rocketmq.broker.BrokerStartup | awk '{printf $2}');
    if [ -z $PID  ];then
        curl 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxx' -H 'Content-Type: application/json' \
        -d '
        {"msgtype": "text",
            "text": {
                 "content": "【172.xxx.xxx.xxx】rocketmq broker 进程不存在,可能宕机,请尽快排查!"
            }
        }'
    fi
    sleep 10
done

效果:

二、RocketMQ的集群组件组成

2.1、rocketmq-console web控制台介绍

官方提供了一个WEB项目,可以查看rocketmq数据和执行一些操作。incubator-rocketmq-externals,这个项目中有一个子模块叫“rocketmq-console”,这个便是管理控制台项目。Rocket-console做为rocketmq社区维护的产品需要从GitHub上下载,下载地址:https://github.com/apache/rocketmq-externals

各个功能的介绍:

  • OPS运维:NameServer的地址的管理
  • Dashboard驾驶舱:展示Broker和Topic的柱状图和折线图。
  • Cluster集群:Brokder的集群部署情况及每个Broker的详情。
  • Topic主题:对Topic的新增、修改、删除。对consumer管理、消息位点重置等。
  • Consumer消费者:consumer的一些状态的管理。
  • Producer生产者:producer的信息(ip、port、版本等)查看。
  • Message消息:根据Topic、Message Key、Message ID三项对Message分组查询。一般第一项根据Topic查询比较多。因为据说根据key去查询有坑。建议id和Topic,id因为唯一最简单。
  • MessageTrace消息轨迹:

一个完整的RocketMQ集群,一般组成关系如下图所示:

除了核心组成部分:Name Server和Broker Cluster之外,RocketMQ还提供了mqadmin工具,该工具的具体实现代码在RocketMQ tools模块(rocketmq-tools-xxxx.jar)中。但是mqadmin命令行在交互上不够友好,rocketmq-console作为一个社区项目,底层基于mqadmin核心库,用Spring Boot+Angularjs实现了一个RocketMQ Web管理端,开发运维人员可以轻松地使用此管理端完成日常运维操作。

三、mqadmin–提供一套命令行工具,做RocketMQ的日常管理维护

3.1 mqadmin 工具在哪儿?

mqadmin本质上是一个Java命令行工具,也就是说执行mqadmin的过程也是执行Java的过程,mqadmin的位置和runbroker和mqnamesrv并列:

3.2 mqadmin能做什么?

执行./mqadmin,会在命令行输出其指令列表:

[root@localhost bin]# ./mqadmin
The most commonly used mqadmin commands are:
   updateTopic          Update or create topic
   deleteTopic          Delete topic from broker and NameServer.
   updateSubGroup       Update or create subscription group
   deleteSubGroup       Delete subscription group from broker.
   updateBrokerConfig   Update broker's config
   updateTopicPerm      Update topic perm
   topicRoute           Examine topic route info
   topicStatus          Examine topic Status info
   topicClusterList     get cluster info for topic
   brokerStatus         Fetch broker runtime status data
   queryMsgById         Query Message by Id
   queryMsgByKey        Query Message by Key
   queryMsgByUniqueKey  Query Message by Unique key
   queryMsgByOffset     Query Message by offset
   printMsg             Print Message Detail
   printMsgByQueue      Print Message Detail
   sendMsgStatus        send msg to broker.
   brokerConsumeStats   Fetch broker consume stats data
   producerConnection   Query producer's socket connection and client version
   consumerConnection   Query consumer's socket connection, client version and subscription
   consumerProgress     Query consumers's progress, speed
   consumerStatus       Query consumer's internal data structure
   cloneGroupOffset     clone offset from other group.
   clusterList          List all of clusters
   topicList            Fetch all topic list from name server
   updateKvConfig       Create or update KV config.
   deleteKvConfig       Delete KV config.
   wipeWritePerm        Wipe write perm of broker in all name server
   resetOffsetByTime    Reset consumer offset by timestamp(without client restart).
   updateOrderConf      Create or update or delete order conf
   cleanExpiredCQ       Clean expired ConsumeQueue on broker.
   cleanUnusedTopic     Clean unused topic on broker.
   startMonitoring      Start Monitoring
   statsAll             Topic and Consumer tps stats
   allocateMQ           Allocate MQ
   checkMsgSendRT       check message send response time
   clusterRT            List All clusters Message Send RT
   getNamesrvConfig     Get configs of name server.
   updateNamesrvConfig  Update configs of name server.
   getBrokerConfig      Get broker config by cluster or special broker!
   queryCq              Query cq command.
   sendMessage          Send a message
   consumeMessage       Consume message
   updateAclConfig      Update acl config yaml file in broker
   deleteAccessConfig   Delete Acl Config Account in broker
   clusterAclConfigVersion List all of acl config version information in cluster
   updateGlobalWhiteAddr Update global white address for acl Config File in broker

See 'mqadmin help <command>' for more information on a specific command.
[root@localhost bin]#

例如:

通过命令行查询消息堆压:

[root@localhost rocketmq-4.5.2]# sh bin/mqadmin consumerProgress -n 10.200.110.46:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Group                           #Count #Version #Type #Model     #TPS #Diff Total
please_rename_unique_group_name_ 0      OFFLINE                   0    0
configurationfile-encryption-dem 0      OFFLINE                   0    0
groupnamedef                     1      V4_5_2   PUSH  CLUSTERING 0    0
TOOLS_CONSUMER                   0      OFFLINE                   0    0
[root@localhost rocketmq-4.5.2]# ls

具体每个指令的作用不是本文的重点,后续会开新的文章介绍~

四、使用rocketmq-console添加MQ监控告警

我们可以利用rocketmq-console做如下的监控:

  • RocketMQ消费者下线
  • RocketMQ消息出现长时间或者大量堆积

4.1 rocketmq-console的监控告警功能

作为mqadmin的GUI封装,rocketmq-console基本上具备了mqadmin的功能外,也提供了一些额外的功能,如dashboard面板统计。但是,作为开源源码部分,rocketmq-console将MQ监控功能做了隐藏,我们需要手动放开。如下是使用rocket-console的监控原理:

当此项功能被放开后,在Consumer菜单下,为每一个consumer-group的operation会增加MONITOR CONFIG选项,如下图所示:

指标名称 说明 备注
minCount 当前消费分组的机器数量最小阈值,低于此值将会告警
maxDiffTotal 当前消费分组允许的最大消息堆积量,高于辞职将会告警

4.2 如何开启rocketmq-console的监控告警功能

开源的rocketmq-console将此功能隐藏了,可以通过下载源码,并修改源码的方式支持。

4.2.1 下载源码

从github中获取源码,rocketmq-externals
地址:https://github.com/apache/rocketmq-externals

4.2.2 导入项目

项目导入后,如下图所示,rocketmq-console即为控制台代码

4.2.3 放开console控制台的监控参数配置

默认的rocketmq-console将此功能注释掉了,修改文件:

~/rocketmq-console/src/resources/static/view/pages/consumer.html,将如下图所示的代码放开即可。

4.2.4 开启定时任务监控,扫描实时数据,做阈值判断,告警提示

默认情况下,rocketmq-console只定义了定时任务入口,具体的策略没有任何处理,我们需要根据自己的需求加入自身的告警方式,比如:邮箱,钉钉,短信,微信等等。

其预留的定时任务实现类为:org.apache.rocketmq.console.task.MonitorTask

定时任务的扫描频率可根据自身系统要求考量设置。

@Component
@Slf4j
public class MonitorTask {

    @Resource
    private MonitorService monitorService;

    @Resource
    private ConsumerService consumerService;

    //@Scheduled(cron = "* * * * * ?")
    //定时任务的扫描频率可根据自身系统要求考量设置
    public void scanProblemConsumeGroup() {
        for (Map.Entry<String, ConsumerMonitorConfig> configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) {
            GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey());
            if (consumeInfo.getCount() < configEntry.getValue().getMinCount() 
                    || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) {
                log.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo));
               // notify the alert system
               //根据自身的要求加如通知方式
            }
        }
    }
}

4.2.5 部署运行

build修改后的代码,生成可运行的jar包,然后部署运行

4.2.6 样例

经笔者改造后的console的控制台可以显示出‘MONITOR CONFIG’配置项:

钉钉告警样例:

五、总结

rocketmq-console作为开发运维人员监控MQ的便捷入口,可根据自身要求改造rocketmq-consolerocketmq-console服务本身可以调用所有mqadmin的所有能力,项目本身基于angularjs +spring boot,作为java开发人员来说拓展成本也比较低。不过前期需要对RocketMQ的一些概念和各种衡量标准要有明确的认知。

标签:console,管理,mqadmin,broker,RocketMQ,监控,rocketmq
From: https://www.cnblogs.com/ciel717/p/17363790.html

相关文章

  • RocketMQ之消息发送源码分析
    一、概述负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ支持三种消息发送方式:同步消息发送(sync):当Producer发送消息到Broker时会同步等待消息处理结果;异步消息发送(async):当Producer发送消息到Broker时......
  • RocketMQ之消息接收源码分析
    一、概述对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:Push方式:由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端......
  • C/C++活动管理程序[2023-05-06]
    C/C++活动管理程序[2023-05-06]设计一个活动管理程序,该程序具有以下功能:(1)多用管理,用户登录:输入用户名和密码,密码正确才允许登录。(2)可以创建活动,设定活动的内容,活动的人数,时间,要求等;(3)登录的所有用户可以查看当前所有正在征集的活动,并可以选择参加;(4)活动的创建者,可......
  • RocketMQ单机版安装
    1、下载最新的安装包  github下载地址:https://github.com/apache/rocketmq/releases。本文安装版本为:rocketmq-all-5.1.0-bin-release.zip2、安装JDK3、上传并解压安装包#从本地电脑上传安装包到Linux服务器scpE:\[email protected]......
  • 28基于java的简单酒店数据管理
    本文章介绍一个基于java的简单酒店数据管理系统项目介绍该项目适用于初学java后,需要一个小练手的javaweb项目,该项目是只有一个酒店数据表,然后实现对该酒店增加,修改,删除和分页查询的小案例,虽然项目不是很复杂,但麻雀虽小但五脏俱全,适合于个人学习适用。项目使用的技术架构后......
  • 系统集成09-Samba文件共享服务器管理
    系统集成09-Samba文件共享服务器管理1Samba文件共享服务器搭建Samba文件共享服务介绍Samba是一个能让Linux系统应用Microsoft网络通讯协议的软件,SMB(ServerMessageBlock)服务器消息块。Samba最大的功能是可以用于Linux与windows系统直接的文件共享和打印共享,既可以用于Window......
  • 系统集成07-系统管理
    系统集成07-系统管理1任务管理什么是任务管理?在系统运维过程中,可能需要在某个预设的时间执行特定任务比如定时发送邮件、备份并清空日志文件等任务任务的内容可以看作是一系列命令或者一个脚本,我们则需要在特定时间去执行它任务管理分为计划执行和定期执行1.1计划执行......
  • ICT实战系统集成-LAB9-Samba文件共享服务器管理
    ICT实战系统集成-LAB9-Samba文件共享服务器管理实验步骤1安装samba服务端安装samba、samba-common、samba-clientyuminstallsambasamba-commonsamba-client启动samba:ststemctlstartsmb.service查看samba状态2添加系统组share,添加系统账号xiaoming、hanmeimei......
  • ICT实战系统集成-LAB6-openEuler管理文件系统及存储
    LAB6-openEuler管理文件系统及存储1实验要求1.1添加两块scsi硬盘,大小分别为10G1.2对新添加的硬盘1(如:/dev/sdb)进行MBR分区、格式化、挂载1、使用fdisk对/dev/sdb进行分区:/dev/sdb1为主分区1大小2G、/dev/sb2为扩展分区大小8G,在/dev/sb2的基础上建立扩展分区/dev/sdb5,大小......
  • ICT实战系统集成-LAB5-OpenEuler软件管理
    系统集成-LAB5-OpenEuler软件管理1实验要求任务一:使用rpm包安装zziplib工具1、完成安装2、查询zziplib工具是否安装成功3、查询zziplib工具的文件列表和完整目录4、查询zziplib工具的详细信息5、对zziplib工具进行卸载任务二:使用yum/dnf安装java-1.8.01、完成yum/dnf源......