首页 > 系统相关 >Python监控Nginx 4、7层健康检查

Python监控Nginx 4、7层健康检查

时间:2023-07-20 20:11:40浏览次数:34  
标签:__ strem get Python self list up Nginx 健康检查

[root@acs-hk-ctos7-prod-01 scripts]# cat upstrem.py
#!/usr/bin/env python 
# -*- coding: utf-8 -*-
# @Time    : 2023/6/25 17:18
# @File    : nginx_upstram.py
# @Software: PyCharm

import requests
from urllib.request import urlopen
import os,socket,datetime,hashlib,base64,hmac,urllib
import time



def get_digest():
    timestamp = str(round(time.time() * 1000))
    #测试
    #secret = ''
    #生产
    secret = ''
    secret_enc = secret.encode('utf-8')  # utf-8编码
    string_to_sign = '{}\n{}'.format(timestamp, secret)  # 字符串格式化拼接
    string_to_sign_enc = string_to_sign.encode('utf-8')  # utf-8编码
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()  # HmacSHA256算法计算签名
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))  # Base64编码后进行urlEncode
    #  返回时间戳和计算好的编码拼接字符串,后面直接拼接到Webhook即可
    return f"&timestamp={timestamp}&sign={sign}"

def get_system_info():
    # 系统时间
    dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # 系统公网IP
    ip = requests.get('http://ifconfig.me').text.strip()

    # 内网IP
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(('8.8.8.8', 80))
    local_ip = s.getsockname()[0]
    s.close()

    #系统load
    system_load=os.popen("uptime|awk '{print $NF,$NF-1,$NF-2}'")
    load=system_load.read().strip()

    host_name=socket.gethostname()
    return dt, ip, local_ip,load,host_name


class Send_Dingding:
    def __init__(self, title,upst,stra):
        self.title = title
        self.upst=upst
        self.stra=stra

    def send_dingnding(self):
        MSG = "**告警主题**:" + "\n" + self.title + "\n\n" \
        ">当前时间:" + "\n" + get_system_info()[0] + "\n\n"\
        " >当前主机名:"  + "\n" + get_system_info()[4] + "\n\n" \
         " >当前服务器IP:" + "\n" + get_system_info()[1] + "\n" + get_system_info()[2] +"\n\n" \
         " >当前系统load:" + "\n" + str(get_system_info()[3]) + "\n\n" \
        "七层异常:" + "\n" + "\n" + str(self.upst) + "\n\n"\
        "四层异常:" + "\n" + "\n" + str(self.stra) + "\n\n"\


        data = {
            "msgtype": "markdown",
            "markdown": {
                "title": self.title,
                "text": MSG,

            },
            "at": {
                "atMobiles": [
                    "@"
                ],
                "isAtAll": False
            }
        }

# 测试
        #webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token='
# 生产
        webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token='
# 利用requests发送post请求
        rec = requests.post(webhook_url + get_digest(), json=data)



class Get_target:
    def __init__(self, url):
        self.url = url

    def Get_Upstream_Target(self):
        upstre = requests.get(self.url)
        upstre_json_str = upstre.json()
        up_list=[]
        for up_stre in upstre_json_str["servers"]["http"]:
            if up_stre["status"] == "down":
                up_list.append([up_stre["upstream"],up_stre["name"],up_stre["status"]])
                #up_text="Upstream:{}\tstaus: {}\thost: {}".format(up_stre["upstream"], up_stre['status'],up_stre["name"])
        if len(up_list) == 0:
            return  "程序正常"
        else:
            return up_list


    def Get_Stream_Target(self):
        stream = requests.get(self.url)
        stream_json = stream.json()
        st_list=[]
        for strem in stream_json['servers']["stream"]:
            if strem["status"] == "down":
                st_list.append([strem["upstream"],strem["status"],strem["name"]])
                #return "stream: {}\tstatus: {} \thost: {}".format(strem["upstream"], strem["status"], strem["name"])
        if len(st_list) == 0:
            return  "程序正常"
        else:
            return st_list


if "__name__ == __main__":
    res = Get_target('http:///status?format=json')
    up = res.Get_Upstream_Target()
    st = res.Get_Stream_Target()

    if st and up != "程序正常":
        send=Send_Dingding("香港集群环境",up,st)
        send.send_dingnding()
    else:
        print("不告警")

  

标签:__,strem,get,Python,self,list,up,Nginx,健康检查
From: https://www.cnblogs.com/sseban/p/17569539.html

相关文章

  • Python基础day49
    overflow溢出属性值描述visible默认值。内容不会被修剪,会呈现在元素框之外hidden内容会被修剪,并且其余内容是不可见的scroll内容会被修剪,但是浏览器会显示滚动条以便查看其余的内容auto如果内容被修剪,则浏览器会显示滚动条以便查看其余的内容inherit规定......
  • PYTHON 没有实体类做修改
    PYTHON没有实体类做修改在Python中,我们经常需要对数据进行操作和修改。有时候,我们可能需要修改一个已经存在的数据对象,或者需要创建一个新的数据对象来存储修改后的数据。在其他编程语言中,通常会使用实体类来实现这些操作。实体类是一个包含属性和方法的数据结构,可以用来表示现实......
  • Linux python 查找模块和版本号
    LinuxPython查找模块和版本号作为一名经验丰富的开发者,你需要教会一位刚入行的小白如何在Linux环境下使用Python查找模块和版本号。以下是一份详细的步骤和相应的代码注释,帮助他完成这个任务。步骤步骤描述步骤一打开终端步骤二运行Python交互式解释器步骤三......
  • Python字典换行
    Python字典换行的实现作为一名经验丰富的开发者,我将教会你如何实现Python字典换行。在本文中,我将详细介绍整个过程,并提供每一步所需的代码和代码注释。流程概述实现Python字典换行的过程可以分为以下几个步骤:创建一个字典。将字典的键值对分为多行。编写代码实现字典换行。......
  • VM虚拟机配置python环境
    如何配置Python环境的虚拟机引言在软件开发中,配置适当的开发环境是非常重要的。Python作为一种常用的编程语言,为了更好地管理项目和依赖,我们经常使用虚拟机来创建独立的Python环境。本文将为刚入行的开发者介绍如何配置Python环境的虚拟机。整体流程下面的表格展示了配置Python......
  • Python字典编码
    Python字典编码的实现介绍在Python中,字典是一种非常重要和常用的数据结构。字典可以存储键值对的数据,通过键来快速访问和修改对应的值。字典编码是将一个字典转换为字符串的过程,这样可以方便地将字典存储到文件中或者传输给其他系统。本文将介绍Python字典编码的实现步骤以及具......
  • Python中字符串可以直接用大于号小于号吗
    Python中字符串可以直接用大于号小于号吗在Python中,字符串是一种不可变的数据类型。我们可以使用双引号或单引号来定义字符串,比如"HelloWorld"或'Pythonisawesome'。字符串在Python中非常常用,因为它们可以包含文本和字符数据。但是,对于字符串来说,我们不能直接使用大于号(>)和小......
  • Python中下顿号咋打
    如何在Python中输入下顿号作为一名经验丰富的开发者,我很高兴能够帮助你解决这个问题。下面是实现“Python中下顿号”的步骤:步骤操作步骤1安装Python开发环境步骤2创建Python脚本文件步骤3导入所需的模块步骤4使用下顿号现在让我们逐步进行操作。......
  • Python中如何将列表偶数值求和
    Python中如何将列表偶数值求和在实际的编程过程中,我们经常会遇到需要对一个列表中的偶数值进行求和的问题。Python作为一门强大的编程语言,提供了多种方法来解决这个问题。本文将介绍几种常用的方法,并提供示例来解决一个实际问题。问题描述假设我们有一个包含整数的列表,我们需要......
  • Python中如何查找到空格结束
    Python中如何查找到空格结束在Python中,我们经常需要处理字符串,并从中提取出特定的内容。如果我们想要从一个字符串中查找到空格结束,即从空格开始的位置,一直到下一个空格之前的内容,可以使用一些方法来解决。方法一:使用split()函数Python中的split()函数可以将字符串分割成一个列......