Alertmanager配置
# alertmanager.yml
# Alertmanager configuration: every alert group goes to the DingTalk webhook
# and then (because of `continue: true`) also to the custom e-mail webhook.
global:
  resolve_timeout: 5m
  smtp_smarthost: 'smtp.163.com:25'
  smtp_from: '[email protected]'
  smtp_auth_username: '[email protected]'
  smtp_auth_password: '11111111'
  smtp_require_tls: false

templates:
  - '/root/prom/alertmanager-0.26.0.linux-amd64/email.tmpl'

route:
  group_by: ['alertname']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 1h
  receiver: 'dingding.webhook1'
  routes:
    - receiver: 'dingding.webhook1'
      # Keep evaluating the following sibling routes after this one matches.
      continue: true
    - receiver: 'email.webhook1'

receivers:
  # NOTE(review): no route in this file references the 'email' receiver.
  - name: 'email'
    email_configs:
      - to: '[email protected]'
        html: '{{ template "email.to.html" . }}'
        send_resolved: true
  - name: 'dingding.webhook1'
    webhook_configs:
      - url: 'http://10.30.92.71:8060/dingtalk/webhook1/send'
        send_resolved: true
  - name: 'email.webhook1'
    webhook_configs:
      # Address of the custom web service API; Alertmanager automatically
      # POSTs the alert data to this URL.
      - url: 'http://10.30.92.71:5000'
        send_resolved: true
自研组件开发
# -*- coding: utf-8 -*-
"""Flask webhook that forwards Alertmanager notifications as e-mails.

Alertmanager POSTs its JSON notification payload to ``/``. Every alert in
the payload is rendered as plain text and the result is mailed through the
SMTP account configured below.
"""
import os
import json
import socket
import smtplib
from email.mime.text import MIMEText

import requests
import arrow
import socks
from flask import Flask
from flask import request

app = Flask(__name__)

# Replace the global socket class so that connections opened afterwards
# (including the SMTP session) go through the SOCKS5 proxy.
socks.set_default_proxy(socks.SOCKS5, "10.30.90.11", 1080)
socket.socket = socks.socksocket

# NOTE(review): credentials are hard-coded; consider reading them from the
# environment or a secrets store.
smtp_server = "smtp.163.com"
smtp_port = 25
sender_email = "[email protected]"
sender_password = "111111"

# Display format and Chinese labels used when rendering an alert.
_TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss ZZ'
_STATUS_ZH = {'firing': '报警', 'resolved': '恢复'}


def _local_time(timestamp):
    """Return *timestamp* converted to Asia/Shanghai and formatted for display."""
    return arrow.get(timestamp).to('Asia/Shanghai').format(_TIME_FORMAT)


def _format_alert(alert):
    """Render a single alert dict from the payload as a plain-text section.

    Missing labels or annotations are shown as ``'null'`` instead of raising
    ``KeyError``. Resolved alerts get an extra line with the end time.
    """
    labels = alert.get('labels', {})
    annotations = alert.get('annotations', {})
    status = alert.get('status', 'null')

    # Prefer the "message" annotation, then "description", else 'null'.
    message = annotations.get(
        'message', annotations.get('description', 'null'))

    title = '【%s】 %s 有新的报警' % (
        _STATUS_ZH.get(status, status), labels.get('alertname', 'null'))

    lines = [
        title,
        '',
        '**告警级别**: %s' % labels.get('severity', 'null'),
        '**告警类型**: %s' % labels.get('alertname', 'null'),
        '**告警主机**: %s' % labels.get('instance', 'null'),
        '**告警详情**: %s' % message,
        '**告警状态**: %s' % status,
        '**触发时间**: %s' % _local_time(alert['startsAt']),
    ]
    if status == 'resolved':
        lines.append('**触发结束时间**: %s' % _local_time(alert['endsAt']))
    return '\n'.join(lines)


def makealertdata(data):
    """Build the e-mail body for an Alertmanager webhook payload.

    Args:
        data: JSON document (``str`` or ``bytes``) as posted by Alertmanager;
            its ``alerts`` list holds the individual alerts.

    Returns:
        One text section per alert, separated by blank lines. An empty
        string when the payload contains no alerts.

    Raises:
        ValueError: if *data* is not valid JSON.
        KeyError: if an alert lacks ``startsAt`` (or ``endsAt`` when resolved).
    """
    payload = json.loads(data)
    return '\n\n'.join(
        _format_alert(alert) for alert in payload.get('alerts', []))


def send_alert(data):
    """Mail the rendered alerts contained in the webhook payload *data*.

    Nothing is sent when the payload has no alerts.
    """
    body = makealertdata(data)
    if not body:
        return

    msg = MIMEText(body, _charset='utf-8')
    msg['Subject'] = '测试邮件主题'
    msg['From'] = sender_email
    msg['To'] = '[email protected]'

    # Open a fresh SMTP session for every mail: a connection created once at
    # import time is never closed and breaks as soon as the server drops the
    # idle session.
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.login(sender_email, sender_password)
        server.sendmail(sender_email, msg['To'], msg.as_string())
    print("邮件已成功发送!")


@app.route('/', methods=['POST', 'GET'])
def send():
    """Handle the webhook: mail the alerts on POST, return a greeting on GET."""
    if request.method == 'POST':
        # Receive the data submitted automatically by Alertmanager.
        post_data = request.get_data()
        print(post_data)
        send_alert(post_data)
        return 'success'
    else:
        return 'weclome to use prometheus alertmanager email webhook server!'


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

# Snippet caption from the original post: python-flask-web
接口联调
标签:AlertManager,自研,send,import,组件,output,告警,data,email From: https://www.cnblogs.com/yxh168/p/18003477