* coding: UTF-8 *
"""
@project -> file : city-test -> swagger_api_parse_backup
@Author : qinmin.vendor
@Date : 2023/1/12 17:55
@Desc :
"""
import copy
import json
import os.path
from utils.operation_datas import operationExcle
from utils.request_main import requestMain
from utils.data_util import dataUtil
class swaggerApiParse():
excel_path='./'
excle_name='maint-api.xlsx'
requst=requestMain()
_data=dataUtil()
url='你的swagger对应的api地址'
api_data=requst.get_main(url=url,data=None,headers=None).json()
api = api_data['paths'] # 取swagger文件内容中的path,文件中path是键名
api_uri_list = [] # 创建接口地址空列表
api_method_list = [] # 创建请求方式空列表
api_name_list = [] # 创建接口描述空列表
api_params_path_list = []
api_params_path_list_descs = []
api_params_body_list = []
api_params_body_list_descs = []
api_uri=""
# 定义接口参数描述中各个类型的默认值
api_params_ref_result_properties_type_default={
"array":[],
"string":"",
"int":0,
"integer":0,
"number":0,
"null":None,
"float":0.0,
"double":0.0,
"bool":False,
"boolean":False,
"object":{},
"set":{},
}
@classmethod
def get_info_info(cls):
'''解析api数据'''
def dispose_api_params_desc_result(api_params,api_params_ref_result,api_data):
'''处理请求参数中的key,转为jsonpath提取数据并返回'''
def replece_key(api_params_ref_json_path,api_data,split_symbol='.'):
'''处理引用为标准的jsonpath,并替换原dict key'''
if api_params_ref_json_path.startswith('$.'):
api_params_ref_json_path_split = api_params_ref_json_path.split(split_symbol)
# print("api_params_ref_json_path:",api_params_ref_json_path)
# print("api_params_ref_json_path_split:",api_params_ref_json_path_split)
if api_params_ref_json_path_split[1] in api_data.keys():
for _key in list(api_data[api_params_ref_json_path_split[1]].keys()):
if str(_key).replace("«","").replace("»","").startswith(api_params_ref_json_path_split[2]):
# 问题:/v1/devices/gateway/call : jsonpath出现空格, /v1/stakes/{sectionId}/list-page jsonpath出现: "«" 、"»",没有写入api-params数据
# 原因:api_data与cls.api_data是共用的一个内存地址,所以只要当当前的api触发key修改后,后面的api读取到的数据也是修改后的
if not api_data[api_params_ref_json_path_split[1]].get(api_params_ref_json_path_split[2]):
api_data[api_params_ref_json_path_split[1]][api_params_ref_json_path_split[2]] = \
api_data[api_params_ref_json_path_split[1]][_key]
del api_data[api_params_ref_json_path_split[1]][_key]
return api_data
api_params_ref_is_array=False #标识接口参数是否为array
if not api_params:
return "描述key为空",api_params_ref_result,api_params_ref_is_array
'''解析请求参数为json数据的接口'''
api_params_ref = api_params['schema']['$ref'] if api_params.get('schema') and api_params.get('schema').get(
'$ref') else api_params
if not str(api_params_ref).startswith('#'):
'''解析请求参数格式为非json的数据,例如:array[object]'''
# print(api_params)
api_params_ref_type = api_params['schema']['type'] if api_params.get('schema') and api_params.get(
'schema').get('type') else {}
if api_params_ref_type:
if api_params_ref_type in cls.api_params_ref_result_properties_type_default.keys():
api_params_ref = api_params['schema']['items']['$ref'] if api_params.get(
'schema') and api_params.get('schema').get('items') else api_params
if api_params_ref_type == 'array':
api_params_ref_is_array = True
else:
api_params_ref_type = api_params['type'] if api_params.get('type') and api_params.get(
'items') else {}
if api_params_ref_type:
if api_params_ref_type in cls.api_params_ref_result_properties_type_default.keys():
api_params_ref = api_params['items']['$ref'] if api_params.get(
'items') and api_params.get('items').get('$ref') else api_params
if api_params_ref_type == 'array':
api_params_ref_is_array = True
api_params_ref_json_path = str(api_params_ref).replace('#', '$').replace('/', '.')
if " " in api_params_ref_json_path:
api_params_ref_json_path_all = api_params_ref_json_path.split(' ')
api_params_ref_json_path = api_params_ref_json_path_all[0]
api_data=replece_key(api_params_ref_json_path=api_params_ref_json_path,api_data=api_data)
elif "«" in api_params_ref_json_path or "»" in api_params_ref_json_path:
api_params_ref_json_path=api_params_ref_json_path.replace("«","").replace("»","")
api_data=replece_key(api_params_ref_json_path=api_params_ref_json_path,api_data=api_data)
if api_params_ref_json_path.startswith("$."):
api_params_ref_result = cls._data.json_path_parse_public(json_obj=api_data, json_path=api_params_ref_json_path)
# print(api_params_ref_result)
api_params_ref_result = api_params_ref_result[0] if api_params_ref_result and len(api_params_ref_result)==1 else api_params_ref_result
return api_params_ref,api_params_ref_result,api_params_ref_is_array
def api_params_desc_convert_to_params(api_params_ref_result:dict,api_data:dict):
'''将接口示例对象转换为可用的请求参数'''
def recursion_get_api_child_parms(properties_key, api_params_ref_result):
'''递归解析子节点参数信息'''
if api_params_ref_result_properties[i].get('items'):
api_params_ref, api_params_ref_result, api_params_ref_is_array = dispose_api_params_desc_result(
api_params=api_params_ref_result_properties[i],
api_params_ref_result=api_params_ref_result, api_data=api_data)
# api_params_ref没有#时,代表不再有json参数嵌套,不在调用
if str(api_params_ref).startswith("#/"):
api_params = api_params_desc_convert_to_params(api_params_ref_result=api_params_ref_result,
api_data=cls.api_data)
properties_child_dict = {properties_key: api_params}
return api_params_ref_is_array,properties_child_dict
return False,{}
_api_params_ref_result_properties_dict = dict()
if (not api_params_ref_result) or isinstance(api_params_ref_result,str) :
return _api_params_ref_result_properties_dict
api_params_ref_result_format=api_params_ref_result['type'] if api_params_ref_result.get('type') else api_params_ref_result
if api_params_ref_result_format=='object':
api_params_ref_result_properties=api_params_ref_result['properties'] if api_params_ref_result.get('properties') else api_params_ref_result
for i in api_params_ref_result_properties:
if api_params_ref_result_properties[i].get('items'):
api_params_ref_is_array,properties_child_dict=recursion_get_api_child_parms(properties_key=i,api_params_ref_result=api_params_ref_result)
if api_params_ref_is_array:
for cd in properties_child_dict:
if not isinstance(properties_child_dict[cd],(list,tuple,set)):
properties_child_dict[cd]=[properties_child_dict[cd]]
_api_params_ref_result_properties_dict.update(properties_child_dict)
api_params_ref_result_properties_type=api_params_ref_result_properties[i]['type'] if \
api_params_ref_result_properties[i].get('type') else 'string'
_api_params_ref_result_properties_dict[i]=\
cls.api_params_ref_result_properties_type_default[api_params_ref_result_properties_type]
# print(_api_params_ref_result_properties_dict)
return _api_params_ref_result_properties_dict
def dispost_output_api_params(api_params,api_params_type,api_params_ref_result):
'''api_params_type跟据输出最终需要的api_params'''
if not api_params:
return "描述key为空",[],{}
__api_params=copy.deepcopy(api_params)
api_params=[i for i in api_params if api_params_type in i.values()]
if api_params_type=='body':
api_params = api_params[0] if api_params else {}
'''处理body参数'''
api_params_ref,api_params_ref_result,api_params_ref_is_array=dispose_api_params_desc_result(
api_params=api_params,api_params_ref_result=api_params_ref_result,api_data=cls.api_data)
# #将api参数转换为最终需要的格式
api_params_out=api_params_desc_convert_to_params(api_params_ref_result=api_params_ref_result,api_data=cls.api_data)
if api_params_ref_is_array and ( not isinstance(api_params_out,(list,tuple))):
api_params_out=[api_params_out]
return api_params_ref,api_params_ref_result,api_params_out
elif api_params_type in ('path','query'):
'''处理path参数'''
api_params_ref="参数为path的请求没有描述key(不存在外部引用)"
if not api_params:
if api_params_type=='path':
api_params_type='query'
elif api_params_type=='query':
api_params_type='path'
api_params = [i for i in __api_params if api_params_type in i.values()]
api_params_ref_result=api_params
api_params_out=dict()
for api_param_dict in api_params:
params_type=api_param_dict['type'] if api_param_dict.get('type') else 'string'
params_filed=api_param_dict['name'] if api_param_dict.get('name') else None
if params_filed :
api_params_out[params_filed]=cls.api_params_ref_result_properties_type_default[params_type]
return api_params_ref,api_params_ref_result,api_params_out
for path in cls.api.keys(): # 循环取key
values = cls.api[path] # 根据key获取值
cls.api_uri=path
api_params_ref_result = "描述结果为空" # 保存请求示例中的额外嵌套的信息
for i in values.keys():
cls.api_uri_list.append(path) # 将path写入接口地址列表中
api_method = i # 获取请求方式,文件中请求方式是key
cls.api_method_list.append(api_method) # 将method写入请求方式列表中
tags = values[api_method]['tags'][0] # 获取接口分类
api_params=values[api_method]['parameters'] if values[api_method].get('parameters') else {}
api_params_ref_path,api_params_ref_result_path,api_params_path=dispost_output_api_params(
api_params=api_params,api_params_type='path',api_params_ref_result=api_params_ref_result)
api_params_ref_body,api_params_ref_result_body,api_params_body=dispost_output_api_params(
api_params=api_params,api_params_type='body',api_params_ref_result=api_params_ref_result)
cls.api_params_path_list_descs.append(f"{api_params_ref_path}:->>{json.dumps(api_params_ref_result_path,ensure_ascii=False)}")
cls.api_params_path_list.append(json.dumps(api_params_path,ensure_ascii=False))
cls.api_params_body_list_descs.append(f"{api_params_ref_body}:->>{json.dumps(api_params_ref_result_body,ensure_ascii=False)}")
cls.api_params_body_list.append(json.dumps(api_params_body,ensure_ascii=False))
summary = values[api_method]['summary'] # 获取接口描述
cls.api_name_list.append(tags + '_' + summary)
@classmethod
def rm_old_file(cls):
'''删除旧数据,避免数据写入重复'''
if os.path.isfile(f'{os.path.join(cls.excel_path,cls.excle_name)}'):
print(f'删除文件:{os.path.join(cls.excel_path,cls.excle_name)}')
os.remove(f'{os.path.join(cls.excel_path,cls.excle_name)}') #删除文件
## shutil.rmtree('./') #删除文件夹及子目录下的所有文件
@classmethod
def write_data_to_excle(cls):
'''将api信息写入到excel'''
op_ex = operationExcle(excle_path=cls.excel_path, excle_name=cls.excle_name)
# 写入excel首行
first_line=['case','uri','method','params_path','params_path_desc','params_body','params_body_desc','expected','skip']
op_ex.write_values(first_line)
# # 写入api信息到excle
for i in range(len(cls.api_uri_list)): # 将接口path循环写入第一列
api_infos=[cls.api_name_list[i],cls.api_uri_list[i],
cls.api_method_list[i],cls.api_params_path_list[i],
cls.api_params_path_list_descs[i],cls.api_params_body_list[i],
cls.api_params_body_list_descs[i],'{"code": 200}',0]
op_ex.write_values(api_infos)
op_ex.save_workbook(cls.excle_name) # 保存文件
if __name__=="__main__":
sa=swaggerApiParse()
sa.get_info_info()
sa.rm_old_file()
sa.write_data_to_excle()
用到的依赖
operationExcle:基于openpyxl封装
import demjson
import os
import openpyxl
demjson:可以处理不规则的json字符串
常用的两个方法, 一个是 encode, 一个是 decode
encode 将 Python 对象编码成 JSON 字符串
decode 将已编码的 JSON 字符串解码为 Python 对象
class operationExcle(object):
def __init__(self, excle_path, excle_name):
self.excle_path=excle_path
self.excle_name=excle_name
if self.excle_path is None:
self.excle_path = os.path.join(os.path.dirname(__file__), '../config')
if self.excle_name is None:
self.excle_name='DataCase_ALL.xlsx'
self.file_address = os.path.join(self.excle_path, self.excle_name)
# # 类实例化时即完成对工作薄对象进行实例化,访问写入的数据不能立马生效的问题
if not os.path.exists(self.excle_path):
os.makedirs(self.excle_path)
if not os.path.exists(self.file_address):
# 创建工作薄
new_book = openpyxl.Workbook()
new_book.save(self.file_address)
self.workbook=openpyxl.load_workbook(self.file_address,data_only=True) #data_only=True,读取数据时保留exlce单元格格式
self.data=self.get_data()
self.__lines=self.get_lines()
# 写入数据
def write_value(self, row, col, value):
'''写入excle数据row,col,value'''
# 设定单元格的值,三种方式
# sheet.cell(row=2, column=5).value = 99
# sheet.cell(row=3, column=5, value=100)
# ws['A4'] = 4 # write
data=self.get_data()
data.cell(row=row,column=col).value=value
# self.save_workbook()
def write_values(self,values:(list,tuple,dict)):
'''
使用list、tuple、dict批量写入数据至excle
Args:
values:
Returns:
'''
data=self.get_data()
data.append(values)
# self.save_workbook()
#保存
def save_workbook(self,file_address=None):
if file_address is None:
file_address=self.file_address
else:
try:
file_address=os.path.join(self.excle_path,file_address)
except BaseException as error:
print("保存的文件路径还是不要乱来哦")
raise Exception(error)
# print(file_address)
self.workbook.save(file_address)
# 获取工作簿对象并返回,property将方法转换为属性
# 获取sheets的内容
def get_data(self,sheet_id=None):
data=self.workbook
if sheet_id is None:
sheet_id=0
else:
sheet_id=int(sheet_id)
# raise TypeError("sheet_id类型错误")
tables = data[(data.sheetnames[sheet_id])]
# 返回数据前先保存关闭
return tables
# 获取单元格的行数
def get_lines(self,isRow=True):
tables = self.data
'''max_row:获取行数,max_column:获取列数
isRow:默认获取最大行数,flase:获取最大列数'''
if isRow:
lines=tables.max_row
else:
lines = tables.max_column
return lines
# 获取有效的行数,过滤一行数据全是None获取空格的情况
def get_valid_lines(self):
valid_lines=0
for i in range(1,self.__lines+1):
line_data=self.get_row_data(i)[0] #行数据第一个字段为空或者为空格,当做无效行
if line_data and str(line_data).strip():
valid_lines+=1
else:
break
self.__lines=valid_lines
return self.__lines
# 获取某一个单元格的内容
def get_cell_value(self, row, col):
cell_value=self.data.cell(row=row,column=col).value
# 也可以使用:cell_value=self.data['A1'].value
return cell_value
# 根据行列返回表单内容
# 根据对应的caseid找到对应行的内容
def get_row_data(self, case_id):
row_num = self.get_row_num(case_id)
rows_data = self.get_row_values(row_num)
return rows_data
# 根据对应的caseid找到相应的行号
def get_row_num(self, case_id):
'''用例起始行为2,所以这里需要指定now初始值为2'''
try:
num = 2
cols_data = self.get_cols_data(1)
cols_data=[int(i) for i in cols_data]
try:
case_id=int(case_id)
if case_id<=self.__lines:
if cols_data:
for col_data in cols_data:
if case_id == col_data:
return num
num = num + 1
else:
return None
else:
print('依赖caseId不能大于用例总行数')
return None
except TypeError as typeerror:
# print(typeerror)
return None
except (ValueError,TypeError) as e:
# print("excle 第一行内容不是int")
return case_id
# 根据行号找到该行内容
def get_row_values(self, row):
cols=self.get_lines(isRow=False)#获取最大列
rowdata=[]
for i in range(1,cols+1):
cellvalue=self.data.cell(row=row,column=i).value
rowdata.append(cellvalue)
return rowdata
# 获取某一列的内容
def get_cols_data(self, col=None):
rows = self.__lines#获取最大行
columndata=[]
for i in range(2,rows+1):
if col != None:
cellvalue = self.data.cell(row=i,column=col).value
else:
cellvalue=self.data.cell(row=i,column=1).value
columndata.append(cellvalue)
return columndata
requestMain:基于request封装
import json
import traceback
import requests
from utils.operation_logging import operationLogging
class requestMain(object):
# requests.packages.urilib3.disable_warnings() #禁用提醒
log=operationLogging()
# instance=None
# def __new__(cls, *args, **kwargs):
# if not cls.instance:
# cls.__session = requests.session()
# cls.request = None
# cls.__response = None
# cls.url = None
# cls.instance=super().__new__(cls)
# return cls.instance
def __init__(self):
self.__session = requests.session()
self.request = None
self.__response = None
self.url = None
@classmethod
def check_headers_files(cls,files,headers):
'''
检查headers与files的格式是否合规
'''
if not (files and len(files) <= 4 and isinstance(files, dict)):
files=None
if not headers:
headers=None
return files,headers
def get_main(self, url, data, headers, params=None, files=None,cookies=None): # 封装get请求
# verify:验证——(可选)要么是布尔型,在这种情况下,它控制我们是否验证服务器的TLS证书或字符串,在这种情况下,它必须是通往CA捆绑包的路径。默认值为True
# res=self.__session.get(uri=uri,params=data,headers=headers,verify=false)
# get请求请求参数尽量不要编码,防止会有一些错误,这里手动处理一下错误
try:
res = self.__session.get(url=url, params=params,data=data, headers=headers, files=files, verify=False,cookies=cookies)
return res
except:
self.log.log_main('error', False, None, f"get请求发生错误:{traceback.format_exc()}")
def post_main(self,url, data, headers, params=None, files=None,cookies=None): # 封装post请求
try:
res = self.__session.post(url=url, params=params,data=data,headers=headers, files=files, verify=False,cookies=cookies)
return res
except:
self.log.log_main('error', False, None, f"post请求发生错误:{traceback.format_exc()}")
def put_main(self,url, data, headers, params=None, files=None,cookies=None): # 封装put请求
try:
res = self.__session.put(url=url, params=params,data=data, headers=headers, files=files, verify=False,cookies=cookies)
return res
except:
self.log.log_main('error', False, None, f"put请求发生错误:{traceback.format_exc()}")
def delete_main(self,url, data, headers, params=None, files=None,cookies=None): # 封装put请求
try:
res = self.__session.delete(url=url, params=params,data=data, headers=headers, files=files, verify=False,cookies=cookies)
return res
except:
self.log.log_main('error', False, None, f"delete请求发生错误:{traceback.format_exc()}")
def run_main(self, host,uri, method,params=None,body=None, headers=None, files=None,cookies=None, res_format='json',is_log_out=False): # 封装主请求
'''参数1:请求方式,参数2:请求data,参数3:请求信息头,参数4:返回的数据格式'''
# 相关源码:
# ''' :param files: (optional) Dictionary of ``'name': file-like-objects`` (or ``{'name': file-tuple}``) for multipart encoding upload.
# ``file-tuple`` can be a 2-tuple ``('filename', fileobj)``, 3-tuple ``('filename', fileobj, 'content_type')``
# or a 4-tuple ``('filename', fileobj, 'content_type', custom_headers)``, where ``'content-type'`` is a string
# defining the content type of the given file and ``custom_headers`` a dict-like object containing additional headers
# to add for the file. '''
# files参数示例:
# files={'file': ('git.docx', open('C:/Users/Acer/Downloads/git.docx', 'rb'))}
if not (host.startswith('http://') or host.startswith('https://')) :
self.log.log_main('error', False, None, f"host:{host},格式错误")
raise Exception(f"host:{host},格式错误")
if host.endswith('/') and uri.startswith('/'):
host=host[:-1]
if not (host.endswith('/') or uri.startswith('/')) :
uri='/'+uri
if str(body).startswith("{") and str(body).endswith("}") and isinstance(body,dict):
body=json.dumps(body)
files,headers=self.check_headers_files(files=files,headers=headers)
self.url=host+uri
if method.lower() == 'get' :
res = self.get_main(url=self.url, params=params, data=body, headers=headers,files=files,cookies=cookies)
elif method.lower() == 'post' :
res = self.post_main(url=self.url, params=params, data=body, headers=headers, files=files,cookies=cookies)
elif method.lower() == 'put' :
res = self.put_main(url=self.url, params=params, data=body, headers=headers, files=files,cookies=cookies)
elif method.lower() == 'delete' :
res = self.delete_main(url=self.url, params=params, data=body, headers=headers, files=files,cookies=cookies)
else:
self.log.log_main('error',False,None,f"不支持的请求方式:{method}")
# res=None
raise Exception("暂不支持的请求方式")
self.request=res
# self.log.log_main('info',False,None,f"request:{self.request}")
try:
if res_format.lower() == 'json' : # 以json格式返回数据
'''ensure_ascii:处理json编码问题(中文乱码),separators:消除json中的所有空格'''
response = res.json()
elif res_format.lower() in ('text','str'): # 以文本格式返回数据
response = res.text
elif res_format.lower() == 'content' : # 以二进制形式返回响应数据
response = res.content
else: # 以json格式返回数据
response = res.json()
self.__response=response
return response
except BaseException as e:
if is_log_out:
self.log.log_main('error', False, None, traceback.format_exc())
return res.text
finally:
if is_log_out:
self.log.log_main('info', False, None, f'请求url:{self.url}')
self.log.log_main('info', False, None, f'请求method:{method}')
self.log.log_main('info', False, None, f'请求body:{body}')
self.log.log_main('info', False, None, f'请求headers:{headers}')
self.log.log_main('info', False, None, f'请求响应:{self.__response}')
self.log.log_main('info', False, None, f'请求响应code:{self.request.status_code}')
def get_response_with_status_code(self):
return int(self.request.status_code)
def get_response(self):
return self.__response
if __name__ == '__main__':
r = requestMain()
dataUtil:个人封装的一些常用的数据处理的方法
import traceback
from utils.operation_logging import operationLogging
from jsonpath_rw_ext import parse
from utils.time_utils import timeUtil
class dataUtil(timeUtil):
log=operationLogging()
# 返回依赖数据
def depend_data_parse(self,dependkey,response_data,index='one'):
__dict={}#存放字典
'''处理依赖'''
if dependkey:
# 匹配字典key
depend_data_index = dependkey.rfind('.')
depend_data_str = dependkey[depend_data_index + 1:]
try:
math_value = self.json_path_parse_public(json_path=dependkey,json_obj=response_data)
if math_value:
if index=='one':
math_value=math_value[0]
__dict[depend_data_str]=math_value
return __dict
else:
return None
except IndexError as indexerror:
return None
else:
return None
# 根据jsonpath表达式获取json对象公共方法,部分功能还需要测试
def json_path_parse_public(self,json_path,json_obj,get_index:bool=False):
if json_path:
# 定义要获取的key
# 定义响应数据,key从响应数据里获取
# print(madle)
# math.value返回的是一个list,可以使用索引访问特定的值jsonpath_rw的作用就相当于从json里面提取响应的字段值
try:
json_exe = parse(json_path)
madle = json_exe.find(json_obj)
math_value = [i.value for i in madle]
if get_index:
return math_value[0]#返回匹配结果第0个元素
return math_value
except IndexError :
self.log.log_main('info',False,None,f"json_obj:{json_obj}提取失败,json_path:{json_path}")
return []
except Exception :
self.log.log_main('error', False, None, f"json_obj:{json_obj}提取失败,json_path:{json_path},具体错误,{traceback.format_exc()}")
return []
else:
return []
if __name__ == "__main__":
du=dataUtil()
print(du.json_path_parse_public(json_path="$.definitions.Page«object»", json_obj={}))
operationLogging:基于loguru封装
import os
from utils.time_utils import timeUtil
from loguru import logger
# 发生Error日志时,发邮件进行警报
from utils.send_email import sendEmail
class operationLogging():
__time = timeUtil()
__email = sendEmail()
def __init__(self):
self.__log_path = self.log_path()
def log_path(self):
"""Get log directory"""
path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "log")
if not os.path.exists(path):
os.mkdir(path)
return path
@classmethod
def json_log_msg(cls,*log_msg):
try:
if not isinstance(log_msg,(tuple,list,set)):
return log_msg
log_msg=','.join(log_msg)
except:
log_msg=','.join([str(i) for i in log_msg])
finally:
return log_msg
def log_main(self,log_level,is_send_email=False,user_list=None,*log_msg):
'''
Args:
log_level:
*log_msg:
Returns:
'''
log_msg=self.json_log_msg(*log_msg)
__log_name=os.path.join(self.__log_path,f"{log_level}_{self.__time.genrate_mongo_iso_date(isDate=True,isMongo=False)}.log")
logger_conf = {
"format": '{time:YYYY-MM-DD HH:mm:ss} | {level} | {module}:{function}:{line} process_id:{process} thread_id:{thread}, log_content:{message}',
# 配置按大小滚动:rotation, 压缩日志:compression, 多进程安全:enqueue=True
"rotation":"10 MB","compression":"zip","enqueue":True,
# 错误堆栈
"backtrace" : True, "diagnose" : True,
'encoding':'utf-8',
'level':'INFO', #级别等于或高于info的日志才会记录日志到文件中
}
logger.add(__log_name,**logger_conf)
if log_level not in ('debug','info','warning','error'):
log_level='info'
if log_level=='debug':
logger.debug(log_msg)
elif log_level=='info':
logger.info(log_msg)
elif log_level=='warning':
logger.warning(log_msg)
else:
logger.error(log_msg)
if is_send_email:
self.__email.send_msg(sub="日志告警",msg=log_msg,user_list=user_list)
logger.remove()
if __name__=="__main__":
op_log=operationLogging()
op_log.log_main('info',True,None,'1','2','3',(1,2,34))
最终效果: