一、前言
1.1、产生背景
ElastAlert最初由Yelp开发并开源,旨在解决实时监控和告警的需求。由于Elasticsearch的日志处理能力强大,许多组织和企业使用它来存储和分析大量的日志数据。然而,仅仅存储和分析数据可能无法满足实时监控和快速响应的需求(XPACK收费),因此ElastAlert应运而生。
1.2、功能介绍
- 实时日志监控:ElastAlert可以实时监听Elasticsearch中的日志数据,支持灵活的查询语法来检索和筛选符合特定条件的日志事件。
- 告警规则定义:用户可以使用YAML配置文件定义告警规则,包括查询条件、时间窗口、触发条件等。
- 多种告警通知方式:ElastAlert支持多种告警通知方式,如电子邮件、Slack、PagerDuty等。
- 内置和自定义告警策略:ElastAlert内置了一些常见的告警策略及自定义告警模块,如持续时间、聚合计数、阈值等。
- 智能告警延迟和降噪:ElastAlert具有智能的延迟和降噪机制(时间窗口、事件频率等),避免因短暂的异常导致大量冗余告警。
二、ElastAlert部署配置
2.1、封装dingtalk告警模块
下载dingtalk组件:
$ wget https://xmars-devops.oss-cn-shanghai.aliyuncs.com/AliCloud/master.zip
增加dingtalk告警逻辑 dingtalk_alert.py
:
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import json
import requests
from elastalert.alerts import Alerter, DateTimeEncoder
from requests.exceptions import RequestException
from elastalert.util import EAException
import time
import hmac
import hashlib
import base64
import urllib.parse
class DingTalkAlerter(Alerter):
required_options = frozenset(['dingtalk_webhook', 'dingtalk_msgtype'])
def __init__(self, rule):
super(DingTalkAlerter, self).__init__(rule)
self.dingtalk_webhook_url = self.rule['dingtalk_webhook']
self.dingtalk_msgtype = self.rule.get('dingtalk_msgtype', 'text')
self.dingtalk_isAtAll = self.rule.get('dingtalk_isAtAll', False)
self.dingtalk_title = self.rule.get('dingtalk_title', '')
self.dingtalk_secret = self.rule.get('dingtalk_secret','')
def format_body(self, body):
return body.encode('utf8')
def alert(self, matches):
headers = {
"Content-Type": "application/json",
"Accept": "application/json;charset=utf-8"
}
body = self.create_alert_body(matches)
payload = {
"msgtype": self.dingtalk_msgtype,
"text": {
"content": body
},
"at": {
"isAtAll": False
}
}
if self.dingtalk_secret!="":
timestamp = str(round(time.time() * 1000))
secret = self.dingtalk_secret
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
self.dingtalk_webhook_url=self.dingtalk_webhook_url+"×tamp={}&sign={}".format(timestamp,sign)
try:
response = requests.post(self.dingtalk_webhook_url,
data=json.dumps(payload, cls=DateTimeEncoder),
headers=headers)
response.raise_for_status()
except RequestException as e:
raise EAException("Error request to Dingtalk: {0}".format(str(e)))
def get_info(self):
return {
"type": "dingtalk",
"dingtalk_webhook": self.dingtalk_webhook_url
}
pass
重新封装基于dingtalk模块的监控告警:
FROM jertel/elastalert-docker:0.2.4
ADD master.zip /opt/elastalert/
RUN cd /opt/elastalert;unzip master.zip;cd elastalert-dingtalk-plugin-master;pip3 install -i https://mirrors.aliyun.com/pypi/simple/ pyOpenSSL==16.2.0;pip3 install -i https://mirrors.aliyun.com/pypi/simple/ setuptools==46.1.3;cp -r elastalert_modules /usr/local/lib/python3.6/;cd /usr/local/lib/python3.6/elastalert_modules; rm -rf dingtalk_alert.py
ADD dingtalk_alert.py /usr/local/lib/python3.6/elastalert_modules/
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
镜像编译&推送
$ docker build -t harbor-local.kubernets.cn/library/elatrt:v22 .
$ docker push harbor-local.kubernets.cn/library/elatrt:v22
2.2、定制K8S方式部署
- configmap:配置文件 & Rules文件
- deployment:elastalert控制器文件
apiVersion: v1
kind: ConfigMap
metadata:
name: elastalert-config
namespace: logging
labels:
app: elastalert
data:
elastalert_config: |- # elastalert配置文件
---
rules_folder: /opt/rules # 指定规则的目录
scan_subdirectories: false
es_host: elasticsearch-master# 修改为当前集群的es链接地址
es_port: 9200
run_every: # 多久从 ES 中查询一次
seconds: 30
buffer_time: #向上翻30分钟查找
minutes: 30
writeback_index: elastalert #创建索引名字
use_ssl: False #ssl不做认证
verify_certs: True
alert_time_limit: # 失败重试限制
minutes: 2400
---
apiVersion: v1
kind: ConfigMap
metadata:
name: elastalert-rules
namespace: logging
labels:
app: elastalert
data:
rule_config.yaml: |- # elastalert规则文件
name: test-alert # 规则名字,唯一值
es_host: elasticsearch-master #es地址,k8s的es
es_port: 9200 #es端口
type: any #所有类型
index: k8s-* #要搜索的索引
num_events: 1
timeframe:
minutes: 1
#1分钟内,统计个数大于等于1个触发报警
filter:
- query:
query_string:
query: "ERROR" #key:value格式,匹配错误日志
alert:
- "elastalert_modules.dingtalk_alert.DingTalkAlerter" #钉钉模块
dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=064dda46xxxxxxe6760d70df9017a27332b3760d143d9ebdb248f0" #钉钉地址
dingtalk_sercurity_tpye: "sign" #钉钉加签格式
dingtalk_msgtype: "text" #消息类型
dingtalk_secret: "SEC82341xxxxxxx01a31xxxxa2bcbae91920ace2acc56cc023ac8" #钉钉加签
alert_subject: "EFK Error!!!" #报警信息
alert_text_type: alert_text_only
alert_text: | #和下面匹配key:value
EFK 日志报错, 参照如下信息进行定位!!!
time: {}
hostname: {}
podName: {}
nameSpaces: {}
message: {}
logIndex: {}
alert_text_args:
- "@timestamp"
- kubernetes.host
- kubernetes.pod_name
- kubernetes.namespace_name
- message
- kubernetes.labels.logIndex
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: elastalert
namespace: logging
labels:
app: elastalert
spec:
selector:
matchLabels:
app: elastalert
template:
metadata:
labels:
app: elastalert
spec:
containers:
- name: elastalert
image: harbor-local.kubernets.cn/library/elatrt:v22
imagePullPolicy: IfNotPresent
command: ["/opt/elastalert/run.sh"]
volumeMounts:
- name: config
mountPath: /opt/config
- name: rules
mountPath: /opt/rules
resources:
limits:
cpu: 512m
memory: 512Mi
requests:
cpu: 50m
memory: 256Mi
volumes:
- name: rules
configMap:
name: elastalert-rules
- name: config
configMap:
name: elastalert-config
items:
- key: elastalert_config
path: elastalert_config.yaml
应用部署:
$ kubectl apply -f elastalert.yaml
$ kubectl get pod -nlogging |grep elastalert
三、测试验证
部署测试pod
apiVersion: v1
kind: Pod
metadata:
name: counter
labels:
logging: "true" # 一定要具有该标签才会被采集
logIndex: "zhdya" # 指定索引名称
spec:
containers:
- name: count
image: busybox
args:
[
/bin/sh,
-c,
'i=0; while true; do echo "ERROR $i: $(date)"; i=$((i+1)); sleep 1; done',# 新增报错ERROR字段
]
Kinbana查看日志数据:
查看钉钉告警:
四、企业场景下的多类型告警方案
4.1、多索引,多匹配
name: MultipleErrorLogs
type: frequency
num_events: 5
timeframe:
minutes: 10
#10分钟内,统计个数大于等于5个触发报警
index:
- myapp-logs-*
- otherapp-logs-*
filter:
- query:
query_string:
query: "error OR exception OR warning"
default_operator: OR
- query:
query_string:
query: "something else"
default_operator: OR
- query:
query_string:
query: "another error"
default_operator: OR
alert:
- "email"
4.2、多个单索引,单匹配
name: ErrorAlert1
type: frequency
num_events: 1
timeframe:
minutes: 5
index: my-index-1
filter:
- query:
query_string:
query: "error OR exception"
default_operator: OR
alert:
- "email"
name: ErrorAlert2
type: frequency
num_events: 1
timeframe:
minutes: 5
index: my-index-2
filter:
- query:
query_string:
query: "file not found OR invalid argument"
default_operator: OR
alert:
- "email"
4.3、基于复杂场景下的configmap
apiVersion: v1
kind: ConfigMap
metadata:
name: elastalert-config
namespace: logging
labels:
app: elastalert
data:
elastalert_config: |- # elastalert配置文件
---
rules_folder: /opt/rules # 指定规则的目录
scan_subdirectories: false
es_host: elasticsearch-master
es_port: 9200
run_every: # 多久从 ES 中查询一次
minutes: 1
buffer_time: #向上翻30分钟查找
minutes: 30
writeback_index: elastalert #创建索引名字
use_ssl: False #ssl不做认证
verify_certs: True
alert_time_limit: # 失败重试限制
minutes: 2400
---
apiVersion: v1
kind: ConfigMap
metadata:
name: elastalert-rules
namespace: logging
labels:
app: elastalert
data:
rule_config.yaml: |- # elastalert规则文件
name: test-alert # 规则名字,唯一值
es_host: elasticsearch-master #es地址,k8s的es
es_port: 9200 #es端口
#type: any #所有类型
index: allapps-* #要搜索的索引
#策略规则,如果在10分钟内,匹配的个数大于等于5,那么就触发钉钉报警
type: frequency
num_events: 5
timeframe:
minutes: 10
filter:
- query:
query_string:
query: "\"获取AccessToken失败\""
alert:
- "elastalert_modules.dingtalk_alert.DingTalkAlerter" #钉钉模块
dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=cc67ce67cb556XXXXXXXda113c2d30f98e4647e704" #钉钉地址
dingtalk_sercurity_tpye: "sign" #钉钉加签格式,感觉可以不要
dingtalk_msgtype: "text" #发消息内容
dingtalk_secret: "SEC2dc9489ccac40XXXXXX4b066c6fd8fd54ffcaa834df2" #钉钉加签
alert_subject: "ERROR!!!" #报警信息
alert_text_type: alert_text_only
alert_text: | #和下面匹配key:value
日志监控
mess: {}
pod-name: {}
alert_text_args:
- message
- kubernetes.pod.name
rule_kafka.yaml: |- # elastalert规则文件
name: kafka-alert # 规则名字,唯一值
es_host: elasticsearch #es地址,k8s的es
es_port: 9200 #es端口
#type: any #所有类型
index: dtk-go-tb-order-* #要搜索的索引
type: frequency
num_events: 5
timeframe:
minutes: 10
filter:
- query:
query_string:
query: "\"写入Kafka消息彻底失败\""
alert:
- "elastalert_modules.dingtalk_alert.DingTalkAlerter" #钉钉模块
dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=cc67ce67cb55XXXXXX113c2d30ea521cf7e704" #钉钉地址
dingtalk_sercurity_tpye: "sign" #钉钉加签格式,感觉可以不要
dingtalk_msgtype: "text" #发消息内容
dingtalk_secret: "SEC2dc9489ccac40fb1XXXXXXX6fd8fd4ffcaa834df2" #钉钉加签
alert_subject: "ERROR!!!" #报警信息
alert_text_type: alert_text_only
alert_text: | #和下面匹配key:value
日志监控
mess: {}
pod-name: {}
alert_text_args:
- message
- kubernetes.pod.name
五、总结
- 部署方式:改良传统docker运行方式,适配当前EFLK模式,运行在Kubernetes集群;
- 规则定义:通过编写规则文件来定义要监控的指标或事件,并设置条件和阈值;
- 告警方式:提升并优化告警方式为快消息类型,提升效率;
- 调优和优化:可以根据企业实际情况进行参数调优。包括调整告警规则、优化查询性能、改进告警策略等;