# -*- coding: UTF-8 -*-
import datetime
import importlib
import os
import sys

import mysql.connector
import requests
importlib.reload(sys)
# Replace the Hadoop/Oozie addresses below with the real deployment addresses.
env_code = '生产环境 http://172.16.159.75:18888/'
hadoop_code = '生产环境hadoop http://172.16.159.75:50070/'
# Connection settings for the Oozie metadata database (passed to
# mysql.connector.connect). The password can be supplied through the
# OOZIE_MYSQL_PASSWORD environment variable so it does not have to live in
# source control.
# NOTE(review): drop the hard-coded fallback password once every deployment
# sets OOZIE_MYSQL_PASSWORD.
MYSQL_CONFIG = {
    'host': '172.16.159.72',
    'user': 'oozie',
    'password': os.environ.get('OOZIE_MYSQL_PASSWORD', 'oozie%$#@!eyushangshan'),
    'database': 'oozie',
    'port': 3306,
}
# DingTalk robot webhook. Set DINGTALK_ACCESS_TOKEN (or replace the "xxx"
# fallback) with the real robot access token.
url = ('https://oapi.dingtalk.com/robot/send?access_token='
       + os.environ.get('DINGTALK_ACCESS_TOKEN', 'xxx'))
# HDFS NameNode web-UI pages probed by hadooptaskMonitor.
hadoop_url = [
    "http://172.16.159.74:50070/dfshealth.html#tab-overview",
    "http://172.16.159.75:50070/dfshealth.html#tab-overview",
]
def connMysql():
    """Open a connection to the Oozie metadata database described by MYSQL_CONFIG.

    :returns: an open ``mysql.connector`` connection; the caller is
        responsible for closing it.
    """
    db = mysql.connector.connect(**MYSQL_CONFIG)
    # Log the host/database actually used instead of a hard-coded address,
    # so the message always matches MYSQL_CONFIG.
    print("连接上服务器:%s数据库:%s" % (MYSQL_CONFIG['host'], MYSQL_CONFIG['database']))
    return db
def failtaskMonitor(db):
    """Send one DingTalk alert per Oozie workflow killed in the last two days.

    Selects rows from the Oozie ``WF_JOBS`` table whose ``status`` is
    ``'KILLED'`` and whose ``end_time`` lies between now minus two days and
    now, then posts one markdown message per row through ``sendMsg``.

    NOTE(review): nothing records which jobs were already reported, so a
    killed job is alerted again on every run during that two-day window.

    :param db: open DB-API connection to the Oozie database (see connMysql).
    """
    sql = ("select app_name,user_name,end_time,start_time from WF_JOBS "
           "where status='KILLED' and end_time between "
           "date_add(now(),interval-2 day) and now()")
    cur = db.cursor()
    try:
        cur.execute(sql)
        datalist = cur.fetchall()
    finally:
        # Always release the cursor, even when the query raises.
        cur.close()
    for (app_name, user_name, end_time, start_time) in datalist:
        sendmsg = {
            "msgtype": "markdown",
            "markdown": {
                "title": "任务失败告警",
                # Alert timestamp is formatted as a date only (no clock time).
                "text": "告警时间: %s \n\n" % datetime.datetime.now().strftime('%Y-%m-%d') +
                        "告警环境:%s \n\n" % env_code +
                        "任务【%s】执行失败,请及时处理,并检查环境数据是否正常 \n\n" % app_name +
                        "**任务开始时间**: %s \n\n" % start_time +
                        "**任务结束时间**: %s \n\n" % end_time +
                        "**任务提交人**: %s \n\n" % user_name
            }
        }
        sendMsg(sendmsg)
def delaytaskMonitor(db):
    """Send one DingTalk alert per Oozie workflow running for over 60 minutes.

    Selects rows from the Oozie ``WF_JOBS`` table whose ``status`` is
    ``'RUNNING'``, whose ``start_time`` is more than 60 minutes ago, and whose
    ``app_name`` is not ``'error_log_track_workflow'`` (excluded by the query),
    and posts one markdown message per row through ``sendMsg``.

    NOTE(review): a long-running job is re-alerted on every invocation for as
    long as it keeps running.

    :param db: open DB-API connection to the Oozie database (see connMysql).
    """
    sql = ("select app_name,user_name,start_time,"
           "TIMESTAMPDIFF(MINUTE, start_time, now()) as diff from WF_JOBS "
           "where status='RUNNING' and TIMESTAMPDIFF(MINUTE, start_time, now()) >60 "
           "and app_name <> 'error_log_track_workflow'")
    cur = db.cursor()
    try:
        cur.execute(sql)
        datalist = cur.fetchall()
    finally:
        # Always release the cursor, even when the query raises.
        cur.close()
    for (app_name, user_name, start_time, diff) in datalist:
        sendmsg = {
            "msgtype": "markdown",
            "markdown": {
                "title": "任务延迟告警",
                # Alert timestamp is formatted as a date only (no clock time).
                "text": "告警时间: %s \n\n" % datetime.datetime.now().strftime('%Y-%m-%d') +
                        "告警环境:%s \n\n" % env_code +
                        "任务【%s】执行超时,请优化\n\n" % app_name +
                        "**任务已执行**: %s 分钟\n\n" % diff +
                        "**任务开始时间**: %s \n\n" % start_time +
                        "**任务提交人**: %s \n\n" % user_name
            }
        }
        sendMsg(sendmsg)
def hadooptaskMonitor(url, timeout=10):
    """Probe HDFS NameNode web UIs and send one DingTalk alert if any is unhealthy.

    An address counts as unhealthy when the HTTP request fails (connection
    refused, DNS failure, timeout, ...) or answers with a status other than
    200. At most one alert is sent per call.

    :param url: sequence of NameNode web-UI URLs to probe (the script passes
        the module-level ``hadoop_url``). Every entry is checked.
    :param timeout: seconds to wait for each HTTP request before treating the
        address as unhealthy.
    """
    def _probe_ok(address):
        # A stopped NameNode usually refuses the connection instead of
        # returning an error status; count request errors as failures so the
        # monitor alerts rather than crashing before it can report anything.
        try:
            return requests.get(address, timeout=timeout).status_code == 200
        except requests.RequestException:
            return False

    failed = [address for address in url if not _probe_ok(address)]
    if failed:
        sendmsg = {
            "msgtype": "markdown",
            "markdown": {
                "title": "hadoop故障",
                "text": "告警时间: %s \n\n" % datetime.datetime.now().strftime('%Y-%m-%d') +
                        "告警环境:%s \n\n" % hadoop_code +
                        "hadoop出现严重故障,请检查hadoop状态! \n\n"
            }
        }
        sendMsg(sendmsg)
def sendMsg(sendmsg, timeout=10):
    """Post a message payload to the DingTalk robot webhook ``url``.

    Delivery problems (network errors, timeouts, non-JSON replies, or a
    non-zero ``errcode``) are printed rather than raised. This prevents one
    failed notification from aborting the remaining alerts and monitors in
    the run.

    :param sendmsg: DingTalk robot message payload, sent as the JSON body.
    :param timeout: seconds to wait for the webhook to respond.
    """
    try:
        req = requests.post(url, json=sendmsg, timeout=timeout)
        # ValueError covers invalid-JSON bodies across requests versions.
        result = req.json()
    except (requests.RequestException, ValueError) as err:
        print('notify dingtalk error: %s' % err)
        return
    # .get: an unexpected reply without 'errcode' is reported, not a KeyError.
    errcode = result.get('errcode')
    if errcode != 0:
        print('notify dingtalk error: %s %s' % (errcode, result.get('errmsg')))
if __name__ == '__main__':
    db = connMysql()
    try:
        # Failed (KILLED) task monitoring.
        failtaskMonitor(db)
        # Delayed (long-running) task monitoring; runs on every invocation.
        # NOTE(review): an earlier revision ran this only once per hour (when
        # datetime.datetime.now().minute == 0). Restore that gate if this
        # script is scheduled more often than hourly and repeats are unwanted.
        delaytaskMonitor(db)
    finally:
        # Release the database connection even if a monitor raises.
        db.close()
    # HDFS NameNode health check (does not use the database).
    hadooptaskMonitor(hadoop_url)
# 标签 (tags): name,db,hadoop,datetime,任务,监控,time,now
# From: https://www.cnblogs.com/wes1502/p/17480479.html