并发编程基础
APP自动化并发项目实战
项目步骤
- 获取设备信息并根据设备数量开启n个进程
- appium服务处理
pytest前置处理开启appium服务
pytest后置处理关闭appium服务 - pytest进行用例执行,并输出报告数据
- 等待进程执行完毕
- 生成每个设备的allure报告
- 开启allure服务,并展示报告内容
项目代码
from multiprocessing import Process
import pytest
from tools.decorator import appium
from tools.manage_devices import ManageDevices
from tools.server import AllureServer, AppiumServer
from tools.unit import file_path, logger
@appium
def run_pytest(obj):
print(obj)
port = obj["port"]
driver = str(obj) # pytest只接受字符串
TestCase = file_path("test_case", "test_login.py") # 指定执行测试用例
pytest.main(["-s", # -s打印内容
"-q", # -v输出详细用例执行信息; -q简化输出,只显示整体测试结果
# "-W", "ignore::DeprecationWarning", # 过滤警告
"--devices", driver, # 自定义参数
# TestCase, # 用例文件夹或具体用例文件
# "-m", "smoke" # 只执行标记用例
# "--reruns", "2", # 用例失败重试
"--alluredir", f"reports/{port}_result", "--clean-alluredir", # allure用例数据报告生成及路径和每次清理历史数据
# "--junit-xml", f"reports/{name}_result/result.xml", # xml报告及生成路径
"--durations=0" # 显示执行最慢的用例,n表示显示前几个(n=0表示所有)
])
# @appium # 进程调用函数无法使用装饰器,可以再后面方法使用
def process_devices(obj):
run_pytest(obj)
def run_process():
pool = ManageDevices().devices_pool() # 获取设备信息列表
device_port = []
process = []
if pool:
for i in pool:
device_port.append(i["port"])
obj = Process(target=process_devices, args=(i,)) # 创建线程
obj.start()
process.append(obj) # 加入线程池
for t in process:
t.join()
logger.info("进程全部执行结束")
# allure生成报表,并启动程序
for port in device_port:
create_path = f"./reports/{port}"
html_path = f"./reports/{port}_html"
# 创建html文件
AllureServer.create_html(create_path)
# 启动allure服务
AllureServer.start_allure(port=int(port) + 1000, path=html_path)
# # 30秒后关闭allure服务
# time.sleep(30)
# for port in device_port:
# AllureServer.stop_allure(port=int(port) + 1000)
else:
logger.info("未获取到设备信息")
if __name__ == '__main__':
run_process()
标签:appium,obj,进阶,allure,APP,用例,pytest,port
From: https://www.cnblogs.com/upstudy/p/18004473