Prometheus+Grafana监控flink任务指标
前期准备
Prometheus 是一款基于时序数据库的开源监控告警系统,由go语言开发,Prometheus的基本原理是通过HTTP协议周期性抓取被监控组件的状态,任意组件只要提供对应的HTTP接口就可以接入监控。
Grafana 是一款采用Go语言编写的开源应用,前端由React框架开发,是一个跨平台的开源的度量分析和可视化工具,可以通过将采集的数据查询然后可视化的展示,并及时通知。
Pushgateway 是 Prometheus 生态系统中的一个组件,它的设计目的是允许任务将指标数据推送到 Pushgateway,然后由 Prometheus 从 Pushgateway 中拉取这些数据。
这里对 Pushgateway 的安装作简要介绍:
Pushgateway 安装
下载安装包
下载 pushgateway,下载地址如下:
https://github.com/prometheus/pushgateway/releases
解压安装
解压文件
tar -zxvf pushgateway-1.10.0.linux-amd64.tar.gz -C /opt/
mv pushgateway-1.10.0.linux-amd64/ pushgateway
启动pushgateway
nohup ./pushgateway > pushgateway_run.log 2>&1 &
ss -lntp | grep <pid> # 默认端口9091
LISTEN 0 32768 [::]:9091 [::]:* users:(("pushgateway",pid=27981,fd=3))
常用启动选项,Pushgateway 支持多种启动选项,可以通过命令行参数来配置它的行为。以下是一些常用的选项:
-
--web.listen-address
: 指定 Pushgateway 监听的地址和端口。默认是:9091
。./pushgateway --web.listen-address=":9091"
-
--web.telemetry-path
: 指定 Prometheus 抓取指标的路径。默认是/metrics
。./pushgateway --web.telemetry-path="/metrics"
-
--persistence.file
: 指定用于持久化指标的文件路径。Pushgateway 可以将接收到的指标持久化到文件中。./pushgateway --persistence.file="/path/to/persistence.file"
-
--log.level
: 设置日志级别。可选值包括debug
,info
,warn
,error
,fatal
。./pushgateway --log.level="info"
为prometheus配置文件添加监控项
- job_name: 'pushgateway'
honor_labels: true # 避免覆盖 Pushgateway 中的标签
static_configs:
- targets: ['172.16.24.101:9091'] # Pushgateway 的地址和端口
labels:
instance: pushgateway配置
保存配置后,重启prometheus服务(或 使用热加载 curl -X POST http://<Prometheus-IP>:9090/-/reload
)。
监控flink任务指标
Flink 提供的 Metrics 可以在 Flink 内部收集一些指标,通过 Dashboard 展示这些指标让开发人员更好地理解作业或集群的状态。
Metrics介绍
Metrics 的类型如下:
-
常用的如 Counter,写过 mapreduce 作业的开发人员就应该很熟悉 Counter,其实含义都是一样的,就是对一个计数器进行累加,即对于多条数据和多兆数据一直往上加的过程。
-
Gauge,Gauge 是最简单的 Metrics,它反映一个值。比如要看现在 Java heap 内存用了多少,就可以每次实时的暴露一个 Gauge,Gauge 当前的值就是heap使用的量。
-
Meter,Meter 是指统计吞吐量和单位时间内发生“事件”的次数。它相当于求一种速率,即事件次数除以使用的时间。
-
Histogram,Histogram 比较复杂,也并不常用,Histogram 用于统计一些数据的分布,比如说 Quantile、Mean、StdDev、Max、Min 等。Metric 在 Flink 内部有多层结构,以 Group 的方式组织,它并不是一个扁平化的结构,Metric Group + Metric Name 是 Metrics 的唯一标识。
Prometheus+PushGateway服务启动
Pushgateway作为独立的服务,位于被采集监控指标的应用程序和Prometheus之间,应用程序主动推送指标到Pushgateway,然后Pushgateway作为target被prometheus抓取这些指标。
Prometheus在正常情况下是采用拉模式从产生metrics的作业或者exporter(比如专门监控主机的NodeExporter)拉取监控数据。但是我们要监控的是 Flink on YARN 作业,想要让Prometheus自动发现作业的提交、结束以及自动拉取数据显然是比较困难的。PushGateway就是一个中转组件,通过配置Flink on YARN作业将metric推到PushGateway,Prometheus再从PushGateway拉取就可以了。
要实现监控 Flink on YARN 作业,需启动 pushgateway 服务,需修改prometheus配置文件 prometheus.yml 并重启prometheus服务,使其监控 pushgateway 。
pushgateway 服务启动:
prometheus 监控 pushgateway :
flink-metrics依赖
从Apache Flink的源码结构我们可以看到,Flink官方支持Prometheus,并且提供了对接Prometheus的jar包,很方便就可以集成。
flink-metrics-prometheus依赖坐标:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-prometheus</artifactId>
<version>${flink.version}</version>
</dependency>
可以在 flink 项目代码里添加一个自定义的指标值:
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext().getMetricGroup().counter("自定义的flink任务指标");
}
@Override
public String map(Event value) throws Exception {
this.counter.inc();
}
// scala写法
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
this.counter = getRuntimeContext.getMetricGroup.counter("自定义的flink任务指标")
}
@throws[Exception]
override def map(value: Event) = {
this.counter.inc()
}
打包后的jar包:
如果使用flink run模式提交任务则需要将 flink-metrics-prometheus jar包上传至flink安装目录 /opt/module/flink/lib 或 plugins 目录下。
如果使用flink run-application模式,则把jar上传至-Dyarn.provided.lib.dirs指定的HDFS目录即可。
修改Flink配置文件
增加flink与pushgateway相关配置:
# 进入到 Flink 的 conf 目录,修改 flink-conf.yaml
# [root@bigdata conf]$ vim flink-conf.yaml
#
# 添加如下配置:
##### 与 Prometheus 集 成 配 置 #####
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
# PushGateway 的主机名与端口号
metrics.reporter.promgateway.host: 172.16.24.101
metrics.reporter.promgateway.port: 9091
# Flink metric在前端展示的标签(前缀)与随机后缀
metrics.reporter.promgateway.jobName: flink-metrics-test
# 是否为推送的指标任务名称(job name)添加一个随机后缀,避免不同flink任务之间的指标冲突。
metrics.reporter.promgateway.randomJobNameSuffix: true
# 在Flink任务或应用程序关闭时,是否从 Pushgateway 中删除已经推送的指标。
metrics.reporter.promgateway.deleteOnShutdown: false
# 设置分组键,包含Flink默认标签+自定义的分组键标签,如:my_metric{job="my-flink-job", instance="localhost:9091", k1="v1", k2="v2"} 10
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
# 指定Flink向Prometheus Pushgateway推送指标的时间间隔。
metrics.reporter.promgateway.interval: 30 SECONDS
提交一个测试的flink任务,打开 prometheus 搜索 flink 相关指标:
Grafana的Flink模版
Grafana 模板页面上搜索flink,选择一个prometheus的flink监控仪表盘模板。
https://grafana.com/grafana/dashboards/?search=flink
搜索flink,会出现非常多的模版。这里有一个问题就是,这些模版开发时间都比较早,随着flink版本的迭代,有一些指标名称进行过更改,对于没有图表显示的需要自行查找指标,并进行修改。
把模版导入Grafana,生成对应的仪表盘展示界面:
————————————————
参考文章:
如何在CDP中使用Prometheus&Grafana对Flink任务进行监控和告警
标签:flink,Flink,Grafana,metrics,Prometheus,pushgateway,Pushgateway From: https://www.cnblogs.com/cyanty/p/18653598