请参考 https://www.cuiliangblog.cn/detail/article/17
使用 Python 的 requests 模块获取 Prometheus API 信息
Prometheus 提供的 HTTP API 接口:
/api/v1/targets
/api/v1/query?query=<expr>
/api/v1/query_range?query=<expr>&start=<startstamp>&end=<endstamp>&step=<step>
范例01
class Monitor:
    """Fetch scrape-target health from the Prometheus HTTP API."""

    # Seconds to wait for Prometheus before treating the request as failed.
    timeout = 10

    def __init__(self):
        # Base address of the Prometheus server.
        self.usr = PROMETHEUS_URL
        # Addresses of targets whose health is "up".
        self.up_list = []
        # Addresses of targets in any other health state.
        self.down_list = []

    def target(self):
        """
        Refresh the up/down target lists from ``/api/v1/targets``.

        Both lists are rebuilt on every call, so calling this method
        repeatedly does not accumulate duplicate addresses.

        :return: the list of "up" target addresses, or None when the
                 request fails or does not answer with HTTP 200
        """
        # Tolerate a base URL configured with or without a trailing slash.
        url = self.usr.rstrip('/') + '/api/v1/targets'
        try:
            response = requests.get(url, timeout=self.timeout)
        except requests.RequestException:
            print('Get targets status failed!')
            return None
        if response.status_code != 200:
            print('Get targets status failed!')
            return None

        up, down = [], []
        for item in response.json()['data']['activeTargets']:
            address = item['discoveredLabels']['__address__']
            if item['health'] == 'up':
                up.append(address)
            else:
                down.append(address)
        # Replace the contents in place so references held by callers
        # keep pointing at the live lists.
        self.up_list[:] = up
        self.down_list[:] = down
        return self.up_list
范例02
import requests
from ops_py.settings import PROMETHEUS_URL
class Monitor:
    """Run instant queries against the Prometheus HTTP API."""

    # Seconds to wait for Prometheus before treating the request as failed.
    timeout = 10

    def __init__(self):
        # Base address of the Prometheus server.
        self.usr = PROMETHEUS_URL
        # Addresses of targets whose health is "up".
        self.up_list = []
        # Addresses of targets in any other health state.
        self.down_list = []

    def getQueryValue(self, query):
        """
        Execute an instant query and return its first result.

        :param query: PromQL expression to evaluate
        :return: the first element of ``data.result``, or None when the
                 request fails, the status is not 200, or the result set
                 is empty
        """
        # Tolerate a base URL configured with or without a trailing slash.
        url = self.usr.rstrip('/') + '/api/v1/query'
        print(url + '?query=' + query)
        try:
            # Passing the expression via ``params`` lets requests
            # percent-encode characters such as '+', '&' and '#' that
            # would otherwise corrupt the query string.
            response = requests.get(
                url, params={'query': query}, timeout=self.timeout
            )
        except requests.RequestException:
            return None
        if response.status_code != 200:
            return None
        results = response.json()['data']['result']
        # An expression that matches no series yields an empty list.
        return results[0] if results else None

    def get_os_release(self, address):
        """
        Get the kernel release reported by ``node_uname_info``.

        :param address: value of the ``instance`` label to query
        :return: the ``release`` label value, or None when no data is
                 available
        """
        query = 'node_uname_info{job="linux",instance="' + address + '"}'
        result = self.getQueryValue(query)
        if result is None:
            return None
        return result['metric']['release']

    def get_up_time(self, address):
        """
        Get how long the system has been running.

        :param address: value of the ``instance`` label to query
        :return: uptime formatted as months/days/hours (a month is
                 counted as 30 days), or None when no data is available
        """
        query = 'sum(time()-node_boot_time_seconds{job="linux",instance="' + address + '"}) by (instance)'
        result = self.getQueryValue(query)
        if result is None:
            return None
        seconds = int(float(result['value'][1]))
        # Leftover seconds below one hour are dropped from the output.
        hours = seconds // 3600
        days, hours = divmod(hours, 24)
        months, days = divmod(days, 30)
        return str(months) + '月 ' + str(days) + '天 ' + str(hours) + '小时'

    def get_cpu_cores(self, address):
        """
        Get the number of CPU cores.

        Counts the ``node_cpu_seconds_total`` series with
        ``mode="system"`` for the instance.

        :param address: value of the ``instance`` label to query
        :return: the count as returned by Prometheus (a string), or None
                 when no data is available
        """
        query = 'count(node_cpu_seconds_total{job="linux",mode="system",instance="' + address + '"}) by (instance)'
        result = self.getQueryValue(query)
        if result is None:
            return None
        return result['value'][1]
范例03
import time
import requests
from ops_py.settings import PROMETHEUS_URL
class Monitor:
    """Fetch server monitoring data from the Prometheus HTTP API."""

    # Seconds to wait for Prometheus before treating the request as failed.
    timeout = 10

    def __init__(self):
        # Base address of the Prometheus server.
        self.usr = PROMETHEUS_URL
        # Addresses of targets whose health is "up".
        self.up_list = []
        # Addresses of targets in any other health state.
        self.down_list = []

    def timeQuery(self, start_time, end_time):
        """
        Build the time-range part of a ``query_range`` request.

        The range is divided into 9 steps. The step is clamped to at
        least 1 second because a zero step is not a valid resolution.

        :param start_time: range start, formatted "%Y-%m-%d %H:%M:%S"
                           (interpreted in local time)
        :param end_time: range end, same format
        :return: string of the form "&start=<ts>&end=<ts>&step=<seconds>"
        """
        fmt = "%Y-%m-%d %H:%M:%S"
        start = int(time.mktime(time.strptime(start_time, fmt)))
        end = int(time.mktime(time.strptime(end_time, fmt)))
        step = max(1, (end - start) // 9)
        return '&start=' + str(start) + '&end=' + str(end) + '&step=' + str(step)

    def target(self):
        """
        Refresh the up/down target lists from ``/api/v1/targets``.

        Both lists are rebuilt on every call, so calling this method
        repeatedly does not accumulate duplicate addresses.

        :return: the list of "up" target addresses, or None when the
                 request fails or does not answer with HTTP 200
        """
        # Tolerate a base URL configured with or without a trailing slash.
        url = self.usr.rstrip('/') + '/api/v1/targets'
        try:
            response = requests.get(url, timeout=self.timeout)
        except requests.RequestException:
            print('Get targets status failed!')
            return None
        if response.status_code != 200:
            print('Get targets status failed!')
            return None

        up, down = [], []
        for item in response.json()['data']['activeTargets']:
            address = item['discoveredLabels']['__address__']
            if item['health'] == 'up':
                up.append(address)
            else:
                down.append(address)
        # Replace the contents in place so references held by callers
        # keep pointing at the live lists.
        self.up_list[:] = up
        self.down_list[:] = down
        return self.up_list

    def getQueryValue(self, query):
        """
        Execute an instant query and return its first result.

        :param query: PromQL expression to evaluate
        :return: the first element of ``data.result``, or None when the
                 request fails, the status is not 200, or the result set
                 is empty
        """
        url = self.usr.rstrip('/') + '/api/v1/query'
        try:
            # ``params`` makes requests percent-encode the expression.
            response = requests.get(
                url, params={'query': query}, timeout=self.timeout
            )
        except requests.RequestException:
            return None
        if response.status_code != 200:
            return None
        results = response.json()['data']['result']
        return results[0] if results else None

    def getQueryRange(self, query, time_range):
        """
        Execute a range query.

        :param query: PromQL expression to evaluate
        :param time_range: "&start=..&end=..&step=.." string as produced
                           by :meth:`timeQuery`
        :return: the ``data.result`` list, or None when the request fails
                 or the status is not 200
        """
        from urllib.parse import parse_qsl

        # Tolerate a base URL configured with or without a trailing slash.
        url = self.usr.rstrip('/') + '/api/v1/query_range'
        print(url + '?query=' + query + time_range)
        # Send everything through ``params`` so the PromQL expression is
        # percent-encoded instead of being pasted raw into the URL.
        params = {'query': query}
        params.update(parse_qsl(time_range.lstrip('&')))
        try:
            response = requests.get(url, params=params, timeout=self.timeout)
        except requests.RequestException:
            return None
        if response.status_code != 200:
            return None
        return response.json()['data']['result']

    def get_cpu_use_rate(self, *params):
        """
        Get CPU usage (user mode, 2-minute rate).

        Single value  -- call with (address)
        Chart for all instances -- call with (start_time, end_time)

        :return: "<value>%" rounded to two decimals for the single-value
                 form, the raw range-query result for the chart form, or
                 None when no data is available or the argument count is
                 unsupported
        """
        if len(params) == 1:
            address = params[0]
            query = 'avg(rate(node_cpu_seconds_total{job="linux",instance="'+address+'",mode="user"}[2m])) by (instance) *100'
            result = self.getQueryValue(query)
            if result is None:
                return None
            value = round(float(result['value'][1]), 2)
            return str(value) + '%'
        if len(params) == 2:
            query = 'avg(rate(node_cpu_seconds_total{job="linux",mode="user"}[2m])) by (instance) *100'
            time_range = self.timeQuery(params[0], params[1])
            return self.getQueryRange(query, time_range)
        print('异常参数')
        return None
标签:return,Python,self,address,prometheus,result,time,query,告警
From: https://www.cnblogs.com/faberbeta/p/16932674.html