首页 > 编程语言 >云计算-使用python发送日志易告警

云计算-使用python发送日志易告警

时间:2023-11-15 11:31:59浏览次数:37  
标签:logging python self error content Nginx time 日志 告警

现状

公司有使用日志易,对应用层监控,并实现了大屏告警。

云计算-使用python发送日志易告警_sql

同时也能够查询到相关的日志记录

云计算-使用python发送日志易告警_Nginx_02

相关的日志易查询语句如下:

#总量
logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` | stats count() as TotalCount
#错误数(大于500)
logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` AND Nginx.status:5?? | stats count() as cnt
#平均时长
logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats avg(Nginx.request_time) as avg_request_time | eval avg_request_time=if( avg_request_time==null,0,format("%.3f",avg_request_time))
#独立访客
logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND Nginx.domainname:* AND NOT (Nginx.domainname:"" OR Nginx.domainname"-" OR tag:api) | where len(Nginx.domainname)<=11 | stats dc(Nginx.domainname) as cnt
#最大响应时长
logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats max(Nginx.request_time) as max_request_time | eval max_request_time=if( max_request_time==null,0,format("%.3f",max_request_time))
#最小响应时长
logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats min(Nginx.request_time) as min_request_time | eval min_request_time=if( min_request_time==null,0,format("%.3f",min_request_time))
#错误数(4??)
logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` AND (Nginx.status:>401 AND Nginx.status:<500) | stats count() as cnt

但是现在比较操蛋的是,日志易因为种种原因,不能开放给我们使用,我们只有查看制度权限,只能看到图表,搜索到日志,但是没有告警发送,邮件汇总等信息,十分不方便。现在想用python爬取日志易的信息,通过邮件发送,同时发送钉钉告警

思路

1、通过python的request模块,调取日志易的API,形成表格,通过邮件发送报表

2、通过python读取日志易API,查询到5XX错误,通过钉钉发送告警

3、通过python直接请求接口,返回5XX错误,通过钉钉发送告警


邮件报表发送

配置文件

记录了邮件地址,日志易密码,接口url等信息

config.py

#日志易的域标识,就是id
uri="http://10.136.1.88:8080/v1/"
token="7a84d69cfa25b056ba9ff15fee5f32d2"
#账户
username="xxxxx"
#密码
passwd="xxxxx"

#系统名
"""
应用模块	
智能工单	sharedWorkOrder
移动门户	mbworkordertest
会议室预定	WSS\/meeting
打印归档	archivesManager
移动入职	newEmployeeEntry
小助手	sharedHelper
工资单咨询	mbSharedOrderPortal\/user?type=payrollCounseling
工资单重发	mbSharedOrderPortal\/user?type=payrollReissue
行政报修0	mbSharedOrderPortal\/user?type=adminFacilityRepairs
员工信息变更	mbSharedOrderPortal\/user?type=empMsgUpdate
移动端我的订单	mbSharedOrderPortal\/user?type=myOrder
新员工入职系统	Default\/NewEmployeelogin
数据平台生产地址	
离职管理	employeeResign
模板管理	templateManager
台账	ledgerManager
iam	iam
消息平台	msgService
getFileArchiveStatus异常	

访问脚本	
总量	logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` | stats count() as TotalCount
错误数(大于500)	logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` AND NOT ( Nginx.status:2?? OR Nginx.status:3?? OR Nginx.status:1??) | stats count() as cnt
平均时长	logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats avg(Nginx.request_time) as avg_request_time | eval avg_request_time=if( avg_request_time==null,0,format("%.3f",avg_request_time))
独立访客	logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND Nginx.domainname:* AND NOT (Nginx.domainname:"" OR Nginx.domainname"-" OR tag:api) | where len(Nginx.domainname)<=11 | stats dc(Nginx.domainname) as cnt
最大响应时长	logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats max(Nginx.request_time) as max_request_time | eval max_request_time=if( max_request_time==null,0,format("%.3f",max_request_time))
最小相应时长	logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats min(Nginx.request_time) as min_request_time | eval min_request_time=if( min_request_time==null,0,format("%.3f",min_request_time))
错误数(4??)	logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` AND  (Nginx.status:>401 AND Nginx.status:<500) | stats count() as cnt

"""
SysNameList={
"智能工单":"sharedWorkOrder",
"移动门户":"mbworkordertest",
"会议室预定":"WSS\/meeting",
"打印归档":"archivesManager",
"移动入职":"newEmployeeEntry",
"小助手":"sharedHelper",
"工资单查询":"mbSharedOrderPortal\/user?type=payrollCounseling",
"工资单重发":"mbSharedOrderPortal\/user?type=payrollReissue",
"行政报修":"mbSharedOrderPortal\/user?type=adminFacilityRepairs",
"员工信息变更":"mbSharedOrderPortal\/user?type=empMsgUpdate",
"移动端我的订单":"mbSharedOrderPortal\/user?type=myOrder",
"新员工入职":"Default\/NewEmployeelogin",
"离职管理":"employeeResign",
"模板管理":"templateManager",
"台账":"ledgerManager",
"IAM":"am",
"消息平台":"msgService",
"newEmployeeUtil":"newEmployeeUtil",
"ledgerManager":"ledgerManager",
"customerService":"customerService",
"psaDataUtil":"psaDataUtil",
"account":"account",
"sla":"sla",
"mbworkorder":"mbworkorder",
"messageService":"messageService",
}

#邮箱配置
host_server = 'xxxxx'
port='465'
sender = 'xxxxxx' 
pwd = 'xxxxx1' #邮箱密码
receiver = ['xxxxxxxx',] 

脚本主程序

main.py,定时每天早上9:30执行

import requests
import config
import json
import logging
import os
import sys
import re
from datetime import datetime
import time
import pandas as pd
import openpyxl
from openpyxl.styles import Font,Color
import smtplib, ssl
from email.mime.text import MIMEText #邮件正文
from email.mime.multipart import MIMEMultipart #邮件主体
from email.header import Header #邮件头,标题、收件人等
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders


#查询日志易API
#增加表格样式标记
#version 2.0
#yangchao

"""
获取信息
1、获取总量
2、错误数(大于500)
3、平均时长
4、独立访客
5、最大相应时长
6、最小相应时长
7、错误数(4??)

"""

#配置日志
abspath=os.path.abspath(sys.argv[0])
current_path=os.path.dirname(abspath)
logging.basicConfig(filename=current_path+'/info.log',encoding='utf-8',level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')

#格式化时间
time_now=datetime.now()
format_date=time_now.strftime("%Y-%m-%d")


class Get_Api():
    """
    获取api信息
    """
    def __init__(self) -> None:
     """
     初始化信息
     """
     self.uri=config.uri
     self.token=config.token
     self.username=config.username
     self.passwd=config.passwd
     self.url=str(self.uri+self.token+"/"+self.username+"/spl/search")
     #请求头
     self.header={'Content-Type': 'application/json'}
     #Http的base认证
     self.auth=requests.auth.HTTPBasicAuth(self.username,self.passwd)

     #基本查询参数
     #任务名
     self.task_name={"task_name":"my_search"}
     #时间范围,一天前
     self.time_range={"time_range":"-1d,now"}
     #从首页显示
     self.page={"page":0}
     #分类
     self.category={"category":"search"}
     #查询语句
     self.query={"query":""}
     #构建查询参数字典
     self.params={}
     self.params.update(self.task_name)
     self.params.update(self.page)
     self.params.update(self.category)
     self.params.update(self.time_range)
     #base认证
     self.credentials=requests.auth.HTTPBasicAuth(self.username,self.passwd)

    def GetTotal(self,sysname:str)->int:
       """
       获取总量
       sysname:系统名
       """
       sql="logtype:Nginx AND Nginx.url:\/" +sysname+ "* AND NOT `JS_CSS_JPG_FONT` | stats count() as cnt"
       self.query["query"]=sql
       self.params.update(self.query)

       try:
            respson=requests.get(self.url,params=self.params,auth=self.credentials,headers=self.header)
           # respson=requests.get(query_url,auth=credentials,headers=self.header)
            content=respson.text
            if respson.status_code ==200 and "error_code" not in content:
               logging.info("查询总量成功")
            elif respson.status_code !=200 and "error_code"  in content:
               logging.error("查询总量失败%s:",content)
            else:
               logging.error("查询总量失败%s:",content)
  
       except Exception as e:
            logging.error("查询总量失败%s:",e)
            raise Exception("查询总量失败%s:",e)
        
       content_dict=json.loads(content)
       #pprint.pprint(content_dict)
       try:
            cnt=content_dict["result"]["sheets"]["rows"][0]["cnt"]
            print(cnt)
       except Exception as e:
            logging.error("转换查询总量失败:%s",e)
            raise Exception("转换查询总量失败:%s",e)
       info=sysname+":"+str(cnt)
       logging.info("转换查询总量成功:%s",info)
       return int(cnt)
    
    def GetErr5xx(self,sysname:str)->int:
       """
       获取错误数大于500
       计算错误占比
       sysname:系统名
       """
       sql="logtype:Nginx AND Nginx.url:\/" +sysname+"* AND NOT `JS_CSS_JPG_FONT` AND Nginx.status:5??   | stats count() as cnt"
       self.query["query"]=sql
       self.params.update(self.query)
       try:
            respson=requests.get(self.url,params=self.params,auth=self.credentials,headers=self.header)
            content=respson.text
            if respson.status_code ==200 and "error_code" not in content:
               logging.info("查询5xx成功")
            elif respson.status_code !=200 and "error_code"  in content:
               logging.error("查询5XX失败%s:",content)
            else:
               logging.error("查询5XX失败%s:",content)
  
       except Exception as e:
            logging.error("查询5XX失败%s:",e)
            raise Exception("查询5XX失败%s:",e)
        
       content_dict=json.loads(content)
       #pprint.pprint(content_dict)
       try:
            cnt=content_dict["result"]["sheets"]["rows"][0]["cnt"]
            print(cnt)
       except Exception as e:
            logging.error("转换查询5XX失败:%s",e)
            raise Exception("转换查询5XX失败:%s",e)
       info=sysname+":"+str(cnt)
       logging.info("转换查询5XX成功:%s",info)
       return int(cnt)

    
    def GetAvgResp(self,sysname:str)->float:
       """
       获取平均相应时长
       sysname:系统名
       """
       sql="logtype:Nginx AND Nginx.url:\/"+sysname+'* NOT `JS_CSS_JPG_FONT` | stats avg(Nginx.request_time) as avg_request_time | eval avg_request_time=if( avg_request_time==null,0,format("%.3f",avg_request_time))'

       self.query["query"]=sql
       self.params.update(self.query)
       try:
            respson=requests.get(self.url,params=self.params,auth=self.credentials,headers=self.header)
            content=respson.text
            if respson.status_code ==200 and "error_code" not in content:
               logging.info("查询平均时长成功")
            elif respson.status_code !=200 and "error_code"  in content:
               logging.error("查询平均时长失败%s:",content)
            else:
               logging.error("查询平均时长失败%s:",content)
  
       except Exception as e:
            logging.error("查询平均时长失败%s:",e)
            raise Exception("查询平均时长失败%s:",e)
        
       content_dict=json.loads(content)
       #pprint.pprint(content_dict)
       try:
            cnt=content_dict["result"]["sheets"]["rows"][0]["avg_request_time"]
            print(cnt)
       except Exception as e:
            logging.error("转换查询平均时长失败:%s",e)
            raise Exception("转换查询平均时长失败:%s",e)
       info=sysname+":"+str(cnt)
       logging.info("转换查询平均时长成功:%s",info)
       return float(cnt)
      
      

    def GetPv(self,sysname:str)->int:
       """
       获取独立访问数
       sysname:系统名
       """
       sql="logtype:Nginx AND Nginx.url:\/"+sysname+'* AND Nginx.domainname:* AND NOT (Nginx.domainname:"" OR Nginx.domainname"-" OR tag:api) | where len(Nginx.domainname)<=11 | stats dc(Nginx.domainname) as cnt'
       self.query["query"]=sql
       self.params.update(self.query)
       try:
            respson=requests.get(self.url,params=self.params,auth=self.credentials,headers=self.header)
            content=respson.text
            if respson.status_code ==200 and "error_code" not in content:
               logging.info("查询pv成功")
            elif respson.status_code !=200 and "error_code"  in content:
               logging.error("查询pv失败%s:",content)
            else:
               logging.error("查询pv失败%s:",content)
  
       except Exception as e:
            logging.error("查询pv失败%s:",e)
            raise Exception("查询pv失败%s:",e)
        
       content_dict=json.loads(content)
       #pprint.pprint(content_dict)
       try:
            cnt=content_dict["result"]["sheets"]["rows"][0]["cnt"]
            print(cnt)
       except Exception as e:
            logging.error("转换查询pv失败:%s",e)
            raise Exception("转换查询pv失败:%s",e)
       info=sysname+":"+str(cnt)
       logging.info("转换查询pv成功:%s",info)
       return int(cnt)
       
    
    
    def GetMaxResp(self,sysname:str)->float:
       """
       获取最大相应时长
       sysname:系统名
       """
       sql="logtype:Nginx AND Nginx.url:\/"+sysname+'* NOT `JS_CSS_JPG_FONT` | stats max(Nginx.request_time) as max_request_time | eval max_request_time=if( max_request_time==null,0,format("%.3f",max_request_time))'
       self.query["query"]=sql
       self.params.update(self.query)
       try:
            respson=requests.get(self.url,params=self.params,auth=self.credentials,headers=self.header)
            content=respson.text
            if respson.status_code ==200 and "error_code" not in content:
               logging.info("查询最大时长成功")
            elif respson.status_code !=200 and "error_code"  in content:
               logging.error("查询最大时长失败%s:",content)
            else:
               logging.error("查询最大时长失败%s:",content)
  
       except Exception as e:
            logging.error("查询最大时长失败%s:",e)
            raise Exception("查询最大时长失败%s:",e)
        
       content_dict=json.loads(content)
       #pprint.pprint(content_dict)
       try:
            cnt=content_dict["result"]["sheets"]["rows"][0]["max_request_time"]
            print(cnt)
       except Exception as e:
            logging.error("转换查询最大时长失败:%s",e)
            raise Exception("转换查询最大时长失败:%s",e)
       info=sysname+":"+str(cnt)
       logging.info("转换查询最大时长成功:%s",info)
       return float(cnt)
    


    def GetMinResp(self,sysname:str)->float:
       """
       获取最小相应时长
       sysname:系统名
       """
       sql="logtype:Nginx AND Nginx.url:\/"+sysname+'* NOT `JS_CSS_JPG_FONT` | stats min(Nginx.request_time) as min_request_time | eval min_request_time=if( min_request_time==null,0,format("%.3f",min_request_time))'
       self.query["query"]=sql
       self.params.update(self.query)
       try:
            respson=requests.get(self.url,params=self.params,auth=self.credentials,headers=self.header)
            content=respson.text
            if respson.status_code ==200 and "error_code" not in content:
               logging.info("查询最小时长成功")
            elif respson.status_code !=200 and "error_code"  in content:
               logging.error("查询最小时长失败%s:",content)
            else:
               logging.error("查询最小时长失败%s:",content)
  
       except Exception as e:
            logging.error("查询最小时长失败%s:",e)
            raise Exception("查询最小时长失败%s:",e)
        
       content_dict=json.loads(content)
       #pprint.pprint(content_dict)
       try:
            cnt=content_dict["result"]["sheets"]["rows"][0]["min_request_time"]
            print(cnt)
       except Exception as e:
            logging.error("转换查询最小时长失败:%s",e)
            raise Exception("转换查询最小时长失败:%s",e)
       info=sysname+":"+str(cnt)
       logging.info("转换查询最小时长成功:%s",info)
       return float(cnt)
    
      
    
    def GetErr4xx(self,sysname:str)->int:
       """
       获取最大相应时长
       sysname:系统名
       """
       sql="logtype:Nginx AND Nginx.url:\/"+sysname+'* AND NOT `JS_CSS_JPG_FONT` AND  (Nginx.status:>401 AND Nginx.status:<500) | stats count() as cnt'
       self.query["query"]=sql
       self.params.update(self.query)
       try:
            respson=requests.get(self.url,params=self.params,auth=self.credentials,headers=self.header)
            content=respson.text
            if respson.status_code ==200 and "error_code" not in content:
               logging.info("查询4xx成功")
            elif respson.status_code !=200 and "error_code"  in content:
               logging.error("查询4XX失败%s:",content)
            else:
               logging.error("查询4XX失败%s:",content)
  
       except Exception as e:
            logging.error("查询4XX失败%s:",e)
            raise Exception("查询4XX失败%s:",e)
        
       content_dict=json.loads(content)
       #pprint.pprint(content_dict)
       try:
            cnt=content_dict["result"]["sheets"]["rows"][0]["cnt"]
            print(cnt)
       except Exception as e:
            logging.error("转换查询4XX失败:%s",e)
            raise Exception("转换查询4XX失败:%s",e)
       info=sysname+":"+str(cnt)
       logging.info("转换查询4XX成功:%s",info)
       return int(cnt)
       


class Process_Msg():
    """
    1、转换字典为表格
    2、转换字典为html
    3、通过邮件发送html格式邮件和表格
    4、数值计算
    """
    pass
    def __init__(self,host_server,port,sender,pwd,receiver) -> None:
        #self.excel=None
        self.html=None
        self.new_html=None
        self.host_server=host_server
        self.port=port
        self.sender=sender
        self.pwd=pwd
        self.receiver=receiver
        self.file=None

    def process_msg(self,data1:dict,data2:dict,data3:dict,data4:dict,data5:dict,data6:dict,data7:dict,data8:dict,data9:dict,data10:dict):
        """
        转换为表格
        """
        data_dict={}
        try:
            data_dict["二级系统"]=data1
            data_dict["请求数"]=data2
            data_dict["平均响应时间"]=data3
            data_dict["最大响应时间"]=data4
            data_dict["最小响应时间"]=data5
            data_dict["用户数"]=data6
            data_dict["请求错误数(大于500)"]=data7
            data_dict["错误占比(500)"]=data8
            data_dict["请求错误数(大于401)"]=data9
            data_dict["错误占比(大于401)"]=data10
            df=pd.DataFrame.from_dict(data_dict,orient='index') #把字典转换为dict
            df=df.T #行列转换
            self.file=str(current_path+"/report"+format_date+".xlsx")
            #self.file=str(".report"+format_date+".xlsx")
            #self.html=str("report"+format_date+".html")
            self.html=str(current_path+"/report"+format_date+".html")
            if os.path.exists(self.file):
                os.remove(self.file)
            with pd.ExcelWriter(self.file,engine="openpyxl") as w: #写入excel表格
                df.to_excel(w,sheet_name="一天统计",index="SN") #保存到sheet
            #df.to_excel(self.excel,sheet_name="一天统计",index="SN")
            df.to_html(self.html,index=False)
            #self.html=df.to_html(index=False)
            #self.html=re.sub('\n+','',self.html)
       
        except Exception as e:
            logging.error("转换数据失败%s",e)
            raise Exception("数据转换失败%s",e)
        if self.file or self.html:
            logging.info("转换数据成功,生成excel和html!")
        return self.file,self.html
    
    def change_color(self) ->None:
        #标记表格颜色
        wb=openpyxl.load_workbook(self.file)
        wbsheet=wb["一天统计"]
        for row in wbsheet.iter_rows(min_row=2,max_row=wbsheet.max_row,min_col=4,max_col=4):
            for cell in row:
                if float(cell.value) >= 3.000:
                    cell.font= Font(color=Color(rgb="FF0000"))
        for row in wbsheet.iter_rows(min_row=2,max_row=wbsheet.max_row,min_col=5,max_col=5):
            for cell in row:
                if float(cell.value) >= 3.000:
                    cell.font= Font(color=Color(rgb="FF0000"))
        for row in wbsheet.iter_rows(min_row=2,max_row=wbsheet.max_row,min_col=9,max_col=9):
            for cell in row:
               if type(cell.value)==str:
                  value=float(cell.value.strip("%"))
                  if value >=10.000:
                     cell.font= Font(color=Color(rgb="FF0000")) 
               else:
                  value=float(cell.value)
                  if value >=10.000:
                      cell.font=Font(color=Color(rgb="FF0000"))
        for row in wbsheet.iter_rows(min_row=2,max_row=wbsheet.max_row,min_col=11,max_col=11):
            for cell in row:
               if type(value)==str:
                  value=float(cell.value.strip("%"))
                  if value >=10.000:
                     cell.font= Font(color=Color(rgb="FFFF00"))
               else:
                  value=float(value)
                  if value >=10.000:
                      cell.font= Font(color=Color(rgb="FFFF00")) 
        wb.save(filename=self.file)
        
        #改变html样式
        self.new_html=str(current_path+"/report-"+format_date+".html")
        result_list=[]
        with open (self.html,"r",encoding="utf-8") as f:
            lines=f.readlines()
            for line in lines:
                if "<td>" in line:
                    metric=(str(line.strip()).lstrip("<td>")).rstrip("</td>")
                    if "." in metric:
                        if "%" not in metric:
                           if float(metric) >= 3.000:
                               newline='<td style="background-color:rgb(255,0,0)">'+metric+'</td>\n'
                               result_list.append(newline)
                           else:
                               result_list.append(line)
                        elif "%" in metric:
                            metric=metric.strip('%')
                            if float(metric) >= 10.000:
                                newline='<td style="background-color:rgb(255,0,0)">'+metric+'%</td>\n'
                                result_list.append(newline)
                            else:
                                result_list.append(line)
                        else:
                            result_list.append(line)
                    else:
                        result_list.append(line)
                else:
                    result_list.append(line)
        #删除旧的html
        if os.path.exists(self.html):
            os.remove(self.html)
        with open(self.new_html,"w",encoding="utf-8") as file:
            for line in result_list:
                file.write(line)
         
            
                    
    
    def send_msg(self)->None:
        """
        通过邮件发送报表
        """
        with open(self.new_html,"r",encoding="utf-8") as f :
            html_content=f.read()
        mail_content="<h1>昨天报表情况</h1><p>以下是报表信息</p>"+html_content+"<p>详细见附件</p>"\
        +"<h2>模块信息</h2>"\
        +"<p>智能工单	sharedWorkOrder                                      </p>"\
        +"<p>移动门户	mbworkordertest                                      </p>"\
        +"<p>会议室预定	WSS\/meeting                                         </p>"\
        +"<p>打印归档	archivesManager                                      </p>"\
        +"<p>移动入职	newEmployeeEntry                                     </p>"\
        +"<p>小助手	sharedHelper                                         </p>"\
        +"<p>工资单咨询	mbSharedOrderPortal\/user?type=payrollCounseling     </p>"\
        +"<p>工资单重发	mbSharedOrderPortal\/user?type=payrollReissue        </p>"\
        +"<p>行政报修0	mbSharedOrderPortal\/user?type=adminFacilityRepairs  </p>"\
        +"<p>员工信息变更	mbSharedOrderPortal\/user?type=empMsgUpdate      </p>"\
        +"<p>移动端我的订单	mbSharedOrderPortal\/user?type=myOrder           </p>"\
        +"<p>新员工入职系统	Default\/NewEmployeelogin                        </p>"\
        +"<p>离职管理	employeeResign                                       </p>"\
        +"<p>模板管理	templateManager                                      </p>"\
        +"<p>台账	ledgerManager                                            </p>"\
        +"<p>iam	iam                                                      </p>"\
        +"<p>消息平台	msgService                                           </p>"\
        +"<p>newEmployeeUtil    newEmployeeUtil    </p>"\
        +"<p>ledgerManager    ledgerManager	</p>"\
        +"<p>customerService    customerService	</p>"\
        +"<p>psaDataUtil    psaDataUtil	</p>"\
        +"<p>账单    account	</p>"\
        +"<p>工单    sla		</p>"\
        +"<p>模板    mbworkorder		</p>"\
        +"<p>messageService    messageService		</p>"\
        +"<h2>查询语句</h2>"\
        +"<h3>总量</h3>"\
        +'<p>logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` | stats count() as TotalCount</p>'\
        +'<h3>错误数(大于500)</h3>'\
        +'<p>logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` AND  Nginx.status:5?? | stats count() as cnt</p>'\
        +'<h3>平均时长</h3>'	\
        +'<p>logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats avg(Nginx.request_time) as avg_request_time | eval avg_request_time=if( avg_request_time==null,0,format("%.3f",avg_request_time))</p>'\
        +'<h3>独立访客</h3>'\
        +'<p>logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND Nginx.domainname:* AND NOT (Nginx.domainname:"" OR Nginx.domainname"-" OR tag:api) | where len(Nginx.domainname)<=11 | stats dc(Nginx.domainname) as cnt</p>'\
        +'<h3>最大响应时长</h3>'\
        +'<p>logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats max(Nginx.request_time) as max_request_time | eval max_request_time=if( max_request_time==null,0,format("%.3f",max_request_time))</p>'\
        +'<h3>最小响应时长</h3>'\
        +'<p>logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats min(Nginx.request_time) as min_request_time | eval min_request_time=if( min_request_time==null,0,format("%.3f",min_request_time))</p>'\
        +'<h3>错误数(4??)</h3>'\
        +'<p>logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` AND  (Nginx.status:>401 AND Nginx.status:<500) | stats count() as cnt'\
        +'<p>请登录日志易平台详细查询http://10.136.1.88/auth/login/?SessionExpired=true&RefererUrl=%2Fdashboard%2F56%2F105%2F</p>'
        mail_title="日志易站点可用性自动化报表发送"
        #发送多个收件人
        msg=MIMEMultipart()
        msg["From"]=self.sender
        msg["To"]=','.join(self.receiver)
        msg["Subject"]=mail_title
        #添加邮件内容
        msg.attach(MIMEText(mail_content,'html','utf-8'))
        #添加邮件附件
        send_file=open(self.file,'rb')
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(send_file.read())
        encoders.encode_base64(part)
        part.add_header('Content-Disposition', "attachment; filename= %s" % self.file)
        msg.attach(part)
        send_file.close()


        #建立连接并发送邮件
        try:
            """
            with smtplib.SMTP(self.host_server,self.port) as server:
                server.starttls()
                server.login(self.sender,self.pwd)
                server.send_message(msg)
            """
            server=smtplib.SMTP_SSL(self.host_server,self.port)
            server.login(self.sender,self.pwd)
            server.sendmail(self.sender,self.receiver,msg.as_string())
            server.quit()
            
        except Exception as e:
            if e:
                logging.error("邮件发送失败:%s",e)
                raise Exception("邮件发送失败%s:",e)
        logging.info("邮件发送成功")

      #删除文件
        if os.path.exists(self.new_html):
            os.remove(self.new_html)
        if os.path.exists(self.file):
            os.remove(self.file)

 


if __name__=="__main__":
   start_time=time.time()
   #总量列表
   TotalList=[]
   #错误数大于500
   Err5xxList=[]
   #平均时长
   AvgRespList=[]
   #独立访问
   PvList=[]
   #最大响应时长
   MaxRespList=[]
   #最小响应时长
   MinRespList=[]
   #错误数4XX
   Err4xxList=[]
   #错误占比5XX
   Err5xxRateList=[]
   #错误占比4XX
   Err4xxRateList=[]
   #系统名称
   SysNameList=[]
   GetApi=Get_Api()
   logging.info("程序开始运行%s",format_date)
   #循环字典,读取所有系统指标
   for name,sysname in config.SysNameList.items():
      Total=(GetApi.GetTotal(sysname))
      if Total==0:
          TotalList.append(0)
          Err5xxRateList.append(0)
          Err4xxRateList.append(0)
          Err5xxList.append(0)
          Err4xxList.append(0)
      elif Total !=0:
          Err5xx=(GetApi.GetErr5xx(sysname))
          Err4xx=(GetApi.GetErr4xx(sysname))
          try:
            #计算5xx错误占比
            Err5xxRate="{:.3f}".format((Err5xx/Total)*100)
            Err5xxRate=Err5xxRate+"%"
          except Exception as e:
            logging.error("转换查询5XX占比失败:%s",e)
            raise Exception("转换查询5XX占比失败:%s",e)
          logging.info("转换查询5XX占比成功:%s:%s",sysname,Err5xxRate)
            #计算4xx错误占比
          try:
            Err4xxRate="{:.3f}".format((Err4xx/Total)*100)
            Err4xxRate=Err4xxRate+"%"
          except Exception as e:
            logging.error("转换查询5XX占比失败:%s",e)
            raise Exception("转换查询5XX占比失败:%s",e)
          logging.info("转换查询4XX占比成功:%s:%s",sysname,Err4xxRate)
          TotalList.append(Total)
          Err5xxRateList.append(Err5xxRate)
          Err4xxRateList.append(Err4xxRate)
          Err5xxList.append(Err5xx)
          Err4xxList.append(Err4xx)

      AvgRespList.append(GetApi.GetAvgResp(sysname=sysname))
      PvList.append(GetApi.GetPv(sysname))
      MaxRespList.append(GetApi.GetMaxResp(sysname))
      MinRespList.append(GetApi.GetMinResp(sysname))
      SysNameList.append(name)
      
   ProcessMsg=Process_Msg(host_server=config.host_server,
                          port=config.port,
                          sender=config.sender,
                          pwd=config.pwd,
                          receiver=config.receiver,
                          )
   ProcessMsg.process_msg(data1=SysNameList,data2=TotalList,data3=AvgRespList,
                          data4=MaxRespList,data5=MinRespList,data6=PvList,data7=Err5xxList,
                          data8=Err5xxRateList,data9=Err4xxList,data10=Err4xxRateList,
                          )
   ProcessMsg.change_color()
   ProcessMsg.send_msg()
  
   end_time=time.time()

   logging.info("程序结束运行,共耗时%s秒",str(end_time-start_time))

使用效果

云计算-使用python发送日志易告警_Nginx_03

云计算-使用python发送日志易告警_CSS_04

钉钉告警

配置文件

#日志易的域标识,就是id
rizhiyi_uri="http://10.136.1.88:8080/v1/"
rizhiyi_token="7a84d69cfa25b056ba9ff15fee5f32d2"
#账户
rizhiyi_username="xxxxxx"
#密码
rizhiyi_passwd="xxxxxx"

#系统名
"""
应用模块	
智能工单	sharedWorkOrder
移动门户	mbworkordertest
会议室预定	WSS\/meeting
打印归档	archivesManager
移动入职	newEmployeeEntry
小助手	sharedHelper
工资单咨询	mbSharedOrderPortal\/user?type=payrollCounseling
工资单重发	mbSharedOrderPortal\/user?type=payrollReissue
行政报修0	mbSharedOrderPortal\/user?type=adminFacilityRepairs
员工信息变更	mbSharedOrderPortal\/user?type=empMsgUpdate
移动端我的订单	mbSharedOrderPortal\/user?type=myOrder
新员工入职系统	Default\/NewEmployeelogin
数据平台生产地址	
离职管理	employeeResign
模板管理	templateManager
台账	ledgerManager
iam	iam
消息平台	msgService
getFileArchiveStatus异常	

访问脚本	
总量	logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` | stats count() as TotalCount
错误数(大于500)	logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` AND NOT ( Nginx.status:2?? OR Nginx.status:3?? OR Nginx.status:1??) | stats count() as cnt
平均时长	logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats avg(Nginx.request_time) as avg_request_time | eval avg_request_time=if( avg_request_time==null,0,format("%.3f",avg_request_time))
独立访客	logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND Nginx.domainname:* AND NOT (Nginx.domainname:"" OR Nginx.domainname"-" OR tag:api) | where len(Nginx.domainname)<=11 | stats dc(Nginx.domainname) as cnt
最大响应时长	logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats max(Nginx.request_time) as max_request_time | eval max_request_time=if( max_request_time==null,0,format("%.3f",max_request_time))
最小相应时长	logtype:Nginx AND Nginx.url:\/$SubMenuName$* NOT `JS_CSS_JPG_FONT` | stats min(Nginx.request_time) as min_request_time | eval min_request_time=if( min_request_time==null,0,format("%.3f",min_request_time))
错误数(4??)	logtype:Nginx AND Nginx.url:\/$SubMenuName$* AND NOT `JS_CSS_JPG_FONT` AND  (Nginx.status:>401 AND Nginx.status:<500) | stats count() as cnt

"""
SysNameList={
#"智能工单":"sharedWorkOrder",
"移动门户":"mbworkordertest",
"会议室预定":"WSS\/meeting",
"打印归档":"archivesManager",
"移动入职":"newEmployeeEntry",
"小助手":"sharedHelper",
"工资单查询":"mbSharedOrderPortal\/user?type=payrollCounseling",
"工资单重发":"mbSharedOrderPortal\/user?type=payrollReissue",
"行政报修":"mbSharedOrderPortal\/user?type=adminFacilityRepairs",
"员工信息变更":"mbSharedOrderPortal\/user?type=empMsgUpdate",
"移动端我的订单":"mbSharedOrderPortal\/user?type=myOrder",
"新员工入职":"Default\/NewEmployeelogin",
"离职管理":"employeeResign",
"模板管理":"templateManager",
"台账":"ledgerManager",
"IAM":"am",
"消息平台":"msgService",
"newEmployeeUtil":"newEmployeeUtil",
"customerService":"customerService",
"psaDataUtil":"psaDataUtil",
"account":"account",
"sla":"sla",
"mbworkorder":"mbworkorder",
"messageService":"messageService",
"法大大offer":"fdd-off",
"法大大入职":"fdd-entry"
}

#邮箱配置
mail_host_server = 'xxxxxx'
mail_port='465'
mail_sender = 'xxxxx' 
mail_pwd = 'xxxxx' #邮箱密码
mail_receiver = ['xxxxxx,]

#接口字典
#只用用curl命令检查api接口路径
api_interface_dict={
    "account_login":"https://ipsapro.isoftstone.com/account/apiAccount/security/login",  #账单
    "sla_login":"https://ipsapro.isoftstone.com/sla/security/login",    #工单
    "sharedWorkOrder":"https://ipsapro.isoftstone.com/sharedWorkOrder/order/order_add", #智能工单
    "archivesManage":"https://ipsapro.isoftstone.com/archivesManager/archivesManager/api", #打印归档
    "newEmployeeEntry":"http://i.isoftstone.com/newEmployeeEntry/api", #移动入职
    "mbSharedOrderPortal\/user?type=payrollCounseling":"http://i.isoftstone.com/sidc/mbSharedOrderPortal/user?type=payrollCounseling", #工资单查询
    "mbSharedOrderPortal\/user?type=payrollReissue":"http://i.isoftstone.com/sidc/mbSharedOrderPortal/user?type=payrollReissue", #工资单重发
    "mbSharedOrderPortal\/user?type=adminFacilityRepairs":"http://i.isoftstone.com/sidc/mbSharedOrderPortal/mbSharedOrderPortal/user?type=adminFacilityRepairs", #行政保修
    "mbSharedOrderPortal\/user?type=empMsgUpdate":"http://i.isoftstone.com/sidc/mbSharedOrderPortal/user?type=empMsgUpdate" ,#员工信息变更
    "mbSharedOrderPortal\/user?type=myOrder":"http://i.isoftstone.com/sidc/mbSharedOrderPortal/user?type=myOrder",#移动端我的订单
    "employeeResign":"http://i.isoftstone.com/employeeResign/api",#离职管理
    "ledgerManager":"https://ipsapro.isoftstone.com/ledgerManager/api",#台账管理
    "msgService":"https://ipsapro.isoftstone.com/msgService/api",#消息平台
    "ledgerManager":"https://ipsapro.isoftstone.com/ledgerManager/api",    
}
##账单系统接口
#account_login="https://ipsapro.isoftstone.com/account/apiAccount/security/login"
#account_loginPwd= "xxxxxx"
#account_userName= "admin"
#account_sendOrderUrl="https://ipsapro.isoftstone.com/account/apiAccount/anonymousWeb/synchronizeOrder"
#account_sendWorkOrderUrl="https://ipsapro.isoftstone.com/account/apiAccount/anonymousWeb/synchronizeOrderWorker"
#account_sendWorkOrderInputIUrl="https://ipsapro.isoftstone.com/account/apiAccount/anonymousWeb/synchronizeOrderWorkerInputInfoaccount"

##工单系统接口
#sla_loginPwd= "xxxxxx"
#sla_userName= "admin"
#sla_login="https://ipsapro.isoftstone.com/sla/security/login"
#sla_sendCreateOrderUrl="https://ipsapro.isoftstone.com/sla/slaAnonymousWeb/synchronizeOrder"
#sla_sendCreateWorkOrderUrl="https://ipsapro.isoftstone.com/sla/slaAnonymousWeb/synchronizeOrderWorker"
#sla_sendChangeStatusOrderUrl="https://ipsapro.isoftstone.com/sla/slaAnonymousWeb/synchronizeOrderStatusChange"
#sla_sendChangeStatusWorkOrderUrl="https://ipsapro.isoftstone.com/sla/slaAnonymousWeb/synchronizeOrderWorkerStatusChange"
#sla_orderChangeUrl="https://ipsapro.isoftstone.com/sla/slaAnonymousWeb/synchronizeOrderDeadlineDateChange"


	

#钉钉接口,发送告警用
dingding_token="https:/xxxxxxxxxxxxx"

脚本主程序

main.py,定时执行,每周一-周五执行,每隔10分钟执行

#获取日志易信息,统计查询大于500的错误,发送钉钉
#直接使用curl命令调用接口,大于500,发送钉钉
import subprocess
import requests
import config
import json
import logging
import os
import sys
import re
from datetime import datetime
import time
import pandas as pd
import openpyxl
from openpyxl.styles import Font,Color
import smtplib, ssl
from email.mime.text import MIMEText #邮件正文
from email.mime.multipart import MIMEMultipart #邮件主体
from email.header import Header #邮件头,标题、收件人等
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders


#查询日志易API
#增加表格样式标记
#version 2.0
#yangchao

"""
获取信息
1、获取总量
2、错误数(大于500)
3、平均时长
4、独立访客
5、最大相应时长
6、最小相应时长
7、错误数(4??)

"""

#配置日志
abspath=os.path.abspath(sys.argv[0])
current_path=os.path.dirname(abspath)
logging.basicConfig(filename=current_path+'/info.log',encoding='utf-8',level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')

#格式化时间
time_now=datetime.now()
format_date=time_now.strftime("%Y-%m-%d")


class RiZhiYi_Api():
    """
    通过日志易获取的信息
    """
    def __init__(self) -> None:
        """
        初始化信息
        """
        self.Err5xx_dict={} #保存错误字典
        self.Err5xx_list=[]
        self.dingding_token=config.dingding_token

        self.uri=config.rizhiyi_uri
        self.token=config.rizhiyi_token
        self.username=config.rizhiyi_username
        self.passwd=config.rizhiyi_passwd
        self.url=str(self.uri+self.token+"/"+self.username+"/spl/search")
        #请求头
        self.header={'Content-Type': 'application/json'}
        #Http的base认证
        self.auth=requests.auth.HTTPBasicAuth(self.username,self.passwd)

        #基本查询参数
        #任务名
        self.task_name={"task_name":"my_search"}
        #时间范围,5分钟前
        self.time_range={"time_range":"-10m,now"}
        #从首页显示
        self.page={"page":0}
        #分类
        self.category={"category":"search"}
        #查询语句
        self.query={"query":""}
        #构建查询参数字典
        self.params={}
        self.params.update(self.task_name)
        self.params.update(self.page)
        self.params.update(self.category)
        self.params.update(self.time_range)
        #base认证
        self.credentials=requests.auth.HTTPBasicAuth(self.username,self.passwd)

    
    def GetErr5xx(self,sysname:str)->int:
        """
        获取错误数大于500
        计算错误占比
        sysname:系统名
        """
        sql="logtype:Nginx AND Nginx.url:\/" +sysname+"* AND NOT `JS_CSS_JPG_FONT` AND Nginx.status:5??   | stats count() as cnt"
        self.query["query"]=sql
        self.params.update(self.query)
        try:
            respson=requests.get(self.url,params=self.params,auth=self.credentials,headers=self.header)
            content=respson.text
            if respson.status_code ==200 and "error_code" not in content:
               #logging.info("查询5xx成功")
               pass
            elif respson.status_code !=200 and "error_code"  in content:
               logging.error("查询5XX失败%s:",content)
            else:
               logging.error("查询5XX失败%s:",content)
  
        except Exception as e:
            logging.error("查询5XX失败%s:",e)
            raise Exception("查询5XX失败%s:",e)
        
        content_dict=json.loads(content)
        #pprint.pprint(content_dict)
        try:
            cnt=content_dict["result"]["sheets"]["rows"][0]["cnt"]
            print(cnt)
        except Exception as e:
            logging.error("查询5XX失败:%s",e)
            raise Exception("查询5XX失败:%s",e)
        info=sysname+":"+str(cnt)
        #logging.info("查询5XX成功:%s",info)
        return int(cnt)
    

    def GetErr5xxUrl(self,sysname:str)->str:
        """
        获取错误数大于500的接口地址
        计算错误占比
        sysname:系统名
        """
        err_info=[]
        sql="logtype:Nginx AND Nginx.url:\/" +sysname+"* AND NOT `JS_CSS_JPG_FONT` AND Nginx.status:5??"
        self.query["query"]=sql
        self.params.update(self.query)
        respson=requests.get(self.url,params=self.params,auth=self.credentials,headers=self.header)
        content=respson.text
        content_dict=json.loads(content)
        #pprint.pprint(content_dict)
        try:
            count=content_dict["result"]["sheets"]["rows"][:]
            if len(count) !=0:
               for i in count:
                  message=i["raw_message"]
                  logging.info("查询5XX信息:%s",message)
                  if re.search("\[\d{2}\/[a-zA-Z]{3}\/\d{4}:\d{2}:\d{2}:\d{2} \+\d{4}\] (\w+) ([^ ]+) \S+ (\d{3})",message):
                    if re.search('"https([^"]*)"',message):
                       message_1=(re.search("\[\d{2}\/[a-zA-Z]{3}\/\d{4}:\d{2}:\d{2}:\d{2} \+\d{4}\] (\w+) ([^ ]+) \S+ (\d{3})",message)).group(0)
                       message_2=(re.search('"https([^"]*)"',message)).group(0)
                       message_new=message_1+" "+message_2
                       err_info.append(message_new)
            
        except Exception as e:
            logging.error("查询5XX信息失败:%s",e)
            raise Exception("查询5XX信息失败:%s",e)     
        str_message='\n'.join(err_info)   
        return str_message
    
    def SendDingDing(self)->None:
        #循环字典,读取指标
        for name,sysname in config.SysNameList.items():
            Err5xx=self.GetErr5xx(sysname)
            Err5xxUrl=self.GetErr5xxUrl(sysname)

            if Err5xx >=3:
                if Err5xxUrl != "":
                    logging.info("查询到5xx 大于3次:%s: %s",sysname,Err5xx)
                    #self.Err5xx_dict[name]="5XX错误次数:"+str(Err5xx)+",系统名:"+sysname +'详细的接口信息:\r'+Err5xxUrl
                    #self.Err5xx_dict[sysname]='详细的接口信息:\r'+Err5xxUrl
                    i=name+",系统名:"+sysname +" 5XX错误次数:"+str(Err5xx)+'\n详细的接口信息:\n'+Err5xxUrl
                    self.Err5xx_list.append(i)
        #error_str=json.dumps(self.Err5xx_dict)
        #error_str=str(self.Err5xx_dict)
                    error_str="\n".join(self.Err5xx_list)
            data={
                "msgtype":"text",
                "text":{
                "content":"根据日志易查询,当前系统过去10分钟,发现5xx错误大于3次!!!\n"+error_str+"\n请立即检查!!!\n" 
                    }
                }
            try:
                requests.post(self.dingding_token,json=data)
            except Exception as e:
                logging.error("通过日志易,发送钉钉消息失败:%s",e)       



class Curl_Api():
    #通过curl直接访问接口,返回
    def __init__(self) -> None:
        #读取配置文件
        self.api_interface_dict=config.api_interface_dict
        self.dingding_token=config.dingding_token
        self.error_list=[]
       
    
    def TestApi(self)->dict:
      for k,v in self.api_interface_dict.items():
        try:
           #command=subprocess.check_output(['curl','-X','GET','-H','"Content-Type: application/json"',v,'--insecure'])
           #command=subprocess.check_output(['curl',v,'--insecure'])
           command=subprocess.check_output(['curl','-I','-s','-w','"%{http_code}',v,'--insecure'])
           command_text=command.decode('utf-8')
           #command_text=command_text.replace("null","None")
           if command_text =="" or command_text==None:
               logging.error("接口没有返回值!:%s",v)
           else:
              if re.match("HTTP/[\d.]+\s+(\d+)",command_text):
                code=(re.match("HTTP/[\d.]+\s+(\d+)",command_text)).group(0)
                code_int=int(code.split(" ")[1].strip())
                #if re.match("\d{3}",code):
                #    code_int=int((re.match("\d{3}",code)).group(0))
                if code_int >=500:
                    self.error_list.append("接口url:"+v+",响应码:"+str(code_int))
                    logging.error("检测响应码大于500的接口:%s,响应码:%s",v,code)
                else:
                    self.error_list.append("接口url:"+v+",响应码:"+str(code_int))
                    logging.info("接口:%s,响应码:%s",v,code)
        except subprocess.CalledProcessError as e:
            logging.error("读取接口失败:%s",e)
            return self.error_list
          
    def SendDingDing(self):
      if len(self.error_list) !=0 :
        #or self.error_list !=[]: 
        #error_str=json.dumps(self.error_dict)
        error_str="\n".join(self.error_list)
        data={
            "msgtype":"text",
            "text":{
             "content":"通过curl命令检查,当前发生接口错误!!!\n"+error_str+"\n请立即检查!!!"
            }
        }
#      else:
#        data={
#            "msgtype":"text",
#            "text":{
#            "content":"测试一下!"
#            }
#        }
        try:
          requests.post(self.dingding_token,json=data)
        except Exception as e:
          logging.error("发送钉钉消息失败%s",e)       
                
 


if __name__=="__main__":
   start_time=time.time()
   logging.info("程序开始运行%s",format_date)
   
   RiZhiyiApi=RiZhiYi_Api()
   CurlApi=Curl_Api()
   RiZhiyiApi.SendDingDing()
   CurlApi.TestApi()
   CurlApi.SendDingDing()
   end_time=time.time()
   logging.info("程序结束运行,共耗时%s秒",str(end_time-start_time))

使用效果

云计算-使用python发送日志易告警_sql_05

云计算-使用python发送日志易告警_Nginx_06

详细的脚本参见代码库

https://gitee.com/yashirochaos/yangchao/tree/master/python3%E5%B7%A5%E4%BD%9C%E4%BD%BF%E7%94%A8/37%E6%97%A5%E5%BF%97%E6%8E%A5%E5%8F%A3%E6%9F%A5%E8%AF%A2


标签:logging,python,self,error,content,Nginx,time,日志,告警
From: https://blog.51cto.com/u_11555417/8386802

相关文章

  • Python+PlayWright+ Pytest + Allure 自动化学习路线
    前言对于自己写过文章的总结,并不代表最好的学习路线还未完结,努力更新中ing建议把每节的实战演练做一下 PlayWrightPlayWright-环境安装PlayWright-如何使用playwrighPlayWrigh-同步和异步运行PlayWright-深入异步PlayWright-元素定位PlayWright-文本输......
  • 如果我有一个项目,我git如何恢复到3个月前的日志,然后再强推到github项目上,但是项目的
       要将Git项目恢复到3个月前的状态,并将最新内容强制推送到GitHub项目上,可以按照以下步骤进行操作:首先,获取项目的提交历史。使用以下命令查看所有的提交记录:bashCopyCodegitlog复制你想要恢复到的目标提交的commithash(提交哈希值)。切换到一个全新的分支......
  • Filebeat 仅收集命名规则为 myapp_20231114.log 这种当月产生的日志,以及 myapp.log 新
    Filebeat仅收集命名规则为myapp_20231114.log这种当月产生的日志,以及myapp.log新产生的当天的日志,可以通过以下配置来实现:filebeat.inputs:-type:logenabled:truepaths:-/data/myapp/logs/myapp_{{now|date:'yyyyMM'}}*.log-/data/myapp/logs/m......
  • 《最新出炉》系列初窥篇-Python+Playwright自动化测试-28-处理日历时间控件-上篇
    1.简介我们在实际工作中,有可能遇到有些web产品,网页上有一些时间选择,然后支持按照不同时间段范围去筛选数据,例如:我们预定火车票或者预定酒店,需要选择发车日期或者酒店的入住与退房时间。宏哥早在之前的12306选出发站就简单的提到过,只不过是一带而过,今天就展开详细介绍一下。网页上......
  • Python+PlayWright+ Pytest + Allure 自动化学习路线
    前言对于自己写过文章的总结,并不代表最好的学习路线还未完结,努力更新中ing建议把每节的实战演练做一下 PlayWrightPlayWright-环境安装PlayWright-如何使用playwrighPlayWrigh-同步和异步运行PlayWright-深入异步PlayWright-元素定位PlayWright-文本输......
  • 定期删除日志shell脚本
    #!/bin/sh#dest:切割日志,只保留30天,每日00:10运行日志目录格式/var/log/YYYY-MM-DD#请赋予脚本执行权限!#crontab-e创建任务写入100***/当前脚本绝对路径#crontab-l查看任务#find递归找,所以删除后会报错找不到,其实已经删除输出重定向即可find/var/log-typ......
  • 用Python计算圆周率π
     fromrandomimportrandomfrommathimportsqrtfromtimeimport*fromtqdmimporttqdmDARTS=10000000hits=0.0clock()foriinrange(1,DARTS+1):x,y=random(),random()dist=sqrt(x**2+y**2)ifdist<=1.0:hits=hits+1pi=4*(hits/DARTS)forii......
  • CATIA——CATIA日志文件路径在哪里?CATIA点击出现黑框闪退,CATIA日志文件在哪里?CATIA启
    背景:CATIA点击出现黑框闪退,CATIA日志文件在哪里?CATIA启动失败,也没有报错,是什么原因?百度之后,说的检查显卡驱动程序、重新安装CATIA、缺少acadres.dll等方法,感觉都不适用。于是看到一条说是让检查CATIA日志,感觉可行。1、CATIA日志文件路径在哪里?(1)C:\Users\zhaojj01\AppData\Loca......
  • Win7安装Python库Pandas
    Win7只能安装Python3.8及以下版本,3.9版本及以上不支持Win7系统。环境:Win764位操作系统下载安装Python3.8.564位软件版本。然后离线安装pandas库。(1)Python下载地址https://www.python.org/downloads/windows/(2)依赖库下载地址https://www.lfd.uci.edu/~gohlke/pythonlibs......
  • Grafana新手教程-实现仪表盘创建和告警推送
    前言最近在使用Grafana的时候,发现Grafana功能比想象中要强大,除了配合Prometheus使用之外,他自身都可以做很多事情,可视化和监控平台,还可以直接根据用户自定义的告警规则完成告警和进行各种通知。于是在深入学习了一段时间之后,整理成此博文。温馨提示,本文约1.3w字,几十张示例图片并......