import json标签:服务,--,接口,re,json,url,return,data,check From: https://www.cnblogs.com/xieweiwen/p/18121286
from USSyunwei.bin import *
import requests
from USSapi.bascidata import *
# 接口服务
def Api_server(port_name, Goal_hierarchy="content", assert_data="", passError=False, **kwargs):
"""
port_name:接口名称,
header:请求头,
data:json数据,如果传了data接口请求数据从data来,不从模板取数据,
url_path:请求路径,
assert_data:断言,
requests_type:请求方式,不传默认get,
Goal_hierarchy:目标数据层级,请求后得到返回值,目标数据再哪个key里面,默认在content里面,如果目标数据仅在第一层 可传空字符串,但是不能不传,
token:令牌,
issendheader:是否输入请求头,默认为否
**kwargs:接口参数,可以指定参数名传,也可以传data字典,
passError:是否需要输出打印警告词条,默认需要
"""
if "header" in kwargs.keys():
header = kwargs["header"]
kwargs.pop("header")
else:
header = {
"Authorization": f"{token}",
"Content-Type": "application/json;charset=UTF-8",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"Cookie": "sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22ex_yong.zeng%22%2C%22first_id%22%3A%2218cce911e5d1c8d-020ee3b22e7e8f6-4c657b58-1327104-18cce911e5e1dd2%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMThjY2U5MTFlNWQxYzhkLTAyMGVlM2IyMmU3ZThmNi00YzY1N2I1OC0xMzI3MTA0LTE4Y2NlOTExZTVlMWRkMiIsIiRpZGVudGl0eV9sb2dpbl9pZCI6ImV4X3lvbmcuemVuZyJ9%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22ex_yong.zeng%22%7D%2C%22%24device_id%22%3A%2218cce911e5d1c8d-020ee3b22e7e8f6-4c657b58-1327104-18cce911e5e1dd2%22%7D"
}
if "data" in kwargs.keys():
data = url_list(port_name, **kwargs["data"], issenddata=True)
else:
data = url_list(port_name, **kwargs, issenddata=False)
# 判断返回的data是否是正常返回
try:
if type(data[1][1]) != dict:
return False, data[1][1]
except:
return False, f"{data}下标1里面没有下标1"
# url
url = data[0][0]
# 请求方式
requests_type = data[0][1]
# 断言
if assert_data=="":
retext = data[0][2]
else:
retext=assert_data
# 请求数据
if data[1][0]:
data = data[1][1]
else:
return False, data[1][1]
# 1、如果请求data有/?&等符号时优先用parm格式请求
match=["/","?","&","?"]
# print(url)
#如果data里面的数据有关键字并且data里面的数据不是字典的话
if any(c in str(data.values()) for c in match) and type(data[f"{port_name}"])!=dict:
if requests_type == "get":
re_check = requests.get(url=url + data[f"{port_name}"], headers=header)
print(url + data[f"{port_name}"])
elif requests_type == "post":
re_check = requests.post(url=url + data[f"{port_name}"], headers=header)
elif requests_type == "put":
re_check = requests.put(url=url + data[f"{port_name}"], headers=header)
else:
return False, "请求方式不存在"
re_check_json = json.loads(re_check.text)
if Goal_hierarchy != "":
#进行断言
if len(re_check_json[f"{Goal_hierarchy}"]) == 1 and retext in str(re_check_json):
return True, re_check_json
elif len(re_check_json[f"{Goal_hierarchy}"]) == 1 and retext not in str(re_check_json):
print("断言失败请确认数据是否正确")
return False, re_check_json
else:
if passError:
pass
else:
print(f"########################警告!-{port_name}数据不唯一如确认无误请忽略#################")
return True, re_check_json
else:
if retext in str(re_check_json):
return True, re_check_json
else :
print("########################断言失败请确认数据是否正确########################")
return False, re_check_json
# 如果是json格式或者数据是字典的话
else:
try:
# 2、以正常的方式请求data和url分开请求
if requests_type == "get":
re_check1 = requests.get(url=url, data=json.dumps(data[f"{port_name}"]), headers=header)
elif requests_type == "post":
re_check1 = requests.post(url=url, data=json.dumps(data[f"{port_name}"]), headers=header)
elif requests_type == "put":
re_check1 = requests.put(url=url, json=data[f"{port_name}"], headers=header)
else:
return False, "请求方式不存在"
re_check1 = json.loads(re_check1.text)
#判断token是否有效
if "过期" in str(re_check1):
return False, re_check1
if retext in str(re_check1):
if Goal_hierarchy== "":
return True,re_check1
else:
re_check1changge = len(re_check1[f"{Goal_hierarchy}"])
if re_check1changge==1:
return True, re_check1
else:
if Goal_hierarchy == "":
print(f"########################{port_name}断言失败,接口数据已返回########################")
return False, re_check1
else:
re_check1changge = len(re_check1[f"{Goal_hierarchy}"])
if re_check1changge == 0:#优先返回接口请求为空的,因为为空大概率是正确的请求
return False, re_check1
elif re_check1changge == 1:
print(f"########################{port_name}断言失败,接口数据已返回########################")
return False, re_check1
except:
re_check1 = {}
re_check1changge = 999
# 把json字典里面的接口名称的data数据的接口名称组装成param的格式
try:
url = f"{url}{Api_bodytoparam(data[f'{port_name}'])}"
if requests_type == "get":
re_check2 = requests.get(url=url, headers=header)
elif requests_type == "post":
re_check2 = requests.post(url=url, headers=header)
elif requests_type == "put":
re_check2 = requests.put(url=url, headers=header)
else:
return False, "请求方式不存在"
re_check2 = json.loads(re_check2.text)
if "过期" in str(re_check2):
return False, re_check2
if retext in str(re_check2):
if Goal_hierarchy == "":
return True, re_check2
else:
re_check2changge = len(re_check2[f"{Goal_hierarchy}"])
if re_check2changge == 1:
return True, re_check2
else:
if Goal_hierarchy == "":
print(f"########################{port_name}断言失败,接口数据已返回########################")
return False, re_check2
else:
re_check2changge = len(re_check2[f"{Goal_hierarchy}"])
if re_check2changge == 0:
return False, re_check2
elif re_check2changge == 1:
print(f"########################{port_name}断言失败,接口数据已返回########################")
return False, re_check2
except:
re_check2 = {}
re_check2changge = 999
# 把data里面的数据组装成parm格式
try:
url = f"{url}{Api_bodytoparam(data)}"
if requests_type == "get":
re_check3 = requests.get(url=url, headers=header)
elif requests_type == "post":
re_check3 = requests.post(url=url, headers=header)
elif requests_type == "put":
re_check3 = requests.put(url=url, headers=header)
else:
return False, "请求方式不存在"
re_check3 = json.loads(re_check3.text)
if "过期" in str(re_check3):
return False, re_check3
if retext in str(re_check3):
if Goal_hierarchy == "":
return True, re_check3
else:
re_check3changge = len(re_check3[f"{Goal_hierarchy}"])
if re_check3changge == 0:
return True, re_check3
elif re_check3changge == 1:
return True, re_check3
else:
if Goal_hierarchy == "":
print(f"########################{port_name}断言失败,接口数据已返回########################")
return False, re_check3
else:
re_check3changge = len(re_check3[f"{Goal_hierarchy}"])
if re_check3changge == 1:
print(f"########################{port_name}断言失败,接口数据已返回########################")
return False, re_check3
except:
re_check3 = {}
re_check3changge = 999
changge = [re_check3changge, re_check2changge, re_check1changge]
changgedict = {re_check3changge: re_check3, re_check2changge: re_check2, re_check1changge: re_check1}
# 有成功得并且成功返回得数量只有一条,返回一条也有可能是失败的,因为有可能数据只有一条
if 1 in changge :
re_check_json = changgedict[1]
return False, re_check_json
# 有成功得但是返回得数量有多条
elif (re_check3changge not in (999, 0) or re_check2changge not in (999, 0) or re_check1changge not in (999, 0)) and (retext in str(re_check3) or retext in str(re_check2) or retext in str(re_check1)) :
for re_check_jsonnum in changge:
if re_check_jsonnum != 999 and re_check_jsonnum != 0:
if passError:
pass
else:
print(f"########################警告!-{port_name}数据不唯一如确认无误请忽略#################")
re_check_json = changgedict[re_check_jsonnum]
return True, re_check_json
# 有0的则取为0的
elif 0 in changge:
re_check_json = changgedict[0]
if passError:
pass
else:
print(f"########################警告-{port_name}接口返回为空########################")
return False, re_check_json
#没有返回成功并且返回得数量有多条
elif (re_check3changge not in (999, 0) or re_check2changge not in (999, 0) or re_check1changge not in (999, 0)) and retext not in str(re_check3changge) :
for re_check_jsonnum in changge:
if re_check_jsonnum != 999 and re_check_jsonnum != 0:
print(f"########################警告-{port_name}断言失败,并且返回了多条数据########################")
re_check_json = changgedict[re_check_jsonnum]
return False, re_check_json
# 最后返回全部失败的返回第一条
else:
re_check_json = changgedict[re_check3changge]
print("########################接口异常,请联系管理员########################")
return False, re_check_json