实现功能
- 测试数据隔离: 测试前后进行数据库备份/还原
- 接口直接的数据依赖: 需要B接口使用A接口响应中的某个字段作为参数
- 对接数据库: 讲数据库的查询结果可直接用于断言操作
- 动态多断言: 可(多个)动态提取实际预期结果与指定的预期结果进行比较断言操作
- 自定义扩展方法: 在用例中使用自定义方法(如:获取当前时间戳...)的返回值
- 邮件发送:将allure报告压缩后已附件形式发送
- 企业微信推送:搭配jenkins发送allure测试报告地址
- 钉钉推送:搭配jenkins发送allure测试报告地址
- 接口录制:录制指定包含url的接口,生成用例数据
目录结构
├─api │ └─client.py # 请求封装 ├─backup_sqls │ └─xxx.sql # 数据库备份文件 ├─config │ └─config.yaml # 配置文件 ├─data │ └─test_data.xlsx # 用例文件 ├─log │ └─run...x.log # 日志文件 ├─report │ ├─data │ └─html # allure报告 ├─test │ ├─conftest.py # 依赖对象初始化 │ └─test_api.py # 测试文件 ├─tools # 工具包 │ ├─__init__.py # 常用方法封装 │ ├─data_clearing.py # 数据隔离 │ ├─data_process.py # 依赖数据处理 │ ├─db.py # 数据库连接对象 │ ├─hooks.py # 自定义扩展方法(可用于用例)文件 │ ├─read_file.py # 用例、配置项读取 │ ├─recording.py # 接口录制,写入用例文件 │ ├─send_dd.py # 给钉钉发送测试报告 │ ├─send_vx.py # 给企业微信发送测试报告 │ └─send_email.py # 邮件发送、报告压缩 ├─项目实战接口文档.md # 配套项目相关接口文档 ├─requirements.txt # 项目依赖库文件 └─run.py # 主启动文件
主要代码
from typing import Any from requests import Session from tools import allure_step, allure_title, logger, allure_step_no from tools.data_process import DataProcess class Transmission: PARAMS: str = "params" DATA: str = "data" JSON: str = "json" class Client(Session): def action(self, case: list, env: str = "dev") -> Any: """处理case数据,转换成可用数据发送请求 :param case: 读取出来的每一行用例内容,可进行解包 :param env: 环境名称 默认使用config.yaml server下的 dev 后面的基准地址 return: 响应结果, 预期结果 """ ( _, case_title, header, path, method, parametric_key, file_obj, data, extra, sql, expect, ) = case logger.debug( f"用例进行处理前数据: \n 接口路径: {path} \n 请求参数: {data} \n 提取参数: {extra} \n 后置sql: {sql} \n 预期结果: {expect} \n " ) # allure报告 用例标题 allure_title(case_title) # 处理url、header、data、file、的前置方法 url = DataProcess.handle_path(path, env) header = DataProcess.handle_header(header) data = DataProcess.handle_data(data) allure_step("请求数据", data) file = DataProcess.handler_files(file_obj) # 发送请求 response = self._request(url, method, parametric_key, header, data, file) # 提取参数 DataProcess.handle_extra(extra, response) return response, expect, sql def _request( self, url, method, parametric_key, header=None, data=None, file=None ) -> dict: """ :param method: 请求方法 :param url: 请求url :param parametric_key: 入参关键字, params(查询参数类型,明文传输,一般在url?参数名=参数值), data(一般用于form表单类型参数) json(一般用于json类型请求参数) :param data: 参数数据,默认等于None :param file: 文件对象 :param header: 请求头 :return: 返回res对象 """ if parametric_key == Transmission.PARAMS: extra_args = {Transmission.PARAMS: data} elif parametric_key == Transmission.DATA: extra_args = {Transmission.DATA: data} elif parametric_key == Transmission.JSON: extra_args = {Transmission.JSON: data} else: raise ValueError("可选关键字为params, json, data") res = self.request( method=method, url=url, files=file, headers=header, verify=False,**extra_args ) response = res.json() logger.info( f"\n最终请求地址:{res.url}\n请求方法:{method}\n请求头:{header}\n请求参数:{data}\n上传文件:{file}\n响应数据:{response}" ) allure_step_no(f"响应耗时(s): {res.elapsed.total_seconds()}") allure_step("响应结果", response) return response client = Client()client.py
server: # 本地接口服务 test: http://127.0.0.1:8888/ # 正式环境地址 dev: ******* # 基准的请求头信息 request_headers: Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36 file_path: test_case: data/case_data.xls report: report/ log: log/run{time}.log email: # 发件人邮箱 user: ******* # 发件人邮箱授权码 password: ******** # 邮箱host host: smtp.qq.com contents: 解压apiAutoReport.zip(接口测试报告)后,请使用已安装Live Server 插件的VsCode,打开解压目录下的index.html查看报告 # 收件人邮箱 addressees: ["*******", "********"] title: 接口自动化测试报告(见附件) # 附件 enclosures: report.zip dingding: # jenkins登录地址 jenkins_url: "http://192.168.1.107:8080/" # job名称 job_name: "job/轻萤/" weixin: # Corpid是企业号的标识 Corpid: "********" # Secret是管理组凭证密钥 Secret: "**********" # 应用ID Agentid: "1000002" # 部门id Partyid: '1' # jenkins登录地址 jenkins_url: "http://192.168.1.107:8080/" # job名称 job_name: "job/轻萤/" # 数据库校验- mysql database: host: ********* port: 3306 user: root # 不用''会被解析成int类型数据 password: 'root123' db_name: py_test charset: utf8mb4 # 数据库所在的服务器配置 ssh_server: port: 22 username: root password: 'root123' # 私有密钥文件路径 private_key_file: # 私钥密码 privat_passowrd: # 如果使用的docker容器部署mysql服务,需要传入mysql的容器id/name mysql_container: mysql8 # 数据库备份文件导出的本地路径, 需要保证存在该文件夹 sql_data_file: backup_sqls/config.yaml
用例文件: case_data.xls
import pytest from tools.data_clearing import DataClearing from tools.db import DB from tools.read_file import ReadFile # @pytest.fixture(scope="session") # def data_clearing(): # """数据清洗""" # DataClearing.server_init() # # 1. 备份数据库 # DataClearing.backup_mysql() # yield # # 2. 恢复数据库 # DataClearing.recovery_mysql() # DataClearing.close_client() # # # # 若不需要数据清洗功能,请把data_clearing去掉 # @pytest.fixture(scope="session") # def get_db(data_clearing): # """关于其作用域请移步查看官方文档""" # try: # db = DB() # yield db # finally: # db.close() # 不使用数据清洗 请把 下面代码解除注释 上面的get_db函数注释 # @pytest.fixture(scope="session") # def get_db(): # """关于其作用域请移步查看官方文档""" # try: # db = DB() # yield db # finally: # db.close() @pytest.fixture(params=ReadFile.read_testcase()) def cases(request): """用例数据,测试方法参数入参该方法名 cases即可,实现同样的参数化 目前来看相较于@pytest.mark.parametrize 更简洁。 """ return request.paramconftest.py
from .conftest import pytest from api import client from tools.data_process import DataProcess # https://www.cnblogs.com/shouhu/p/12392917.html # reruns 重试次数 reruns_delay 次数之间的延时设置(单位:秒) # 失败重跑,会影响总测试时长,如不需要 将 @pytest.mark.flaky(reruns=3, reruns_delay=5) 注释即可 # @pytest.mark.flaky(reruns=2, reruns_delay=1) # def test_main(cases, get_db): # 使用数据库功能(包含sql查询,数据备份,数据恢复) # # 此处的cases入参来自与 conftest.py 文件中 cases函数,与直接使用 @pytest.mark.parametrize # # 有着差不多的效果 # # 发送请求 # response, expect, sql = client.action(cases) # # 执行sql # DataProcess.handle_sql(sql, get_db) # # 断言操作 # DataProcess.assert_result(response, expect) def test_main(cases): # 不使用数据库功能 # 发送请求 response, expect, sql = client.action(cases) # 断言操作 DataProcess.assert_result(response, expect)test_api.py
import re from string import Template from typing import Any import allure from jsonpath import jsonpath from loguru import logger from tools.hooks import * def exec_func(func: str) -> str: """执行函数(exec可以执行Python代码) :params func 字符的形式调用函数 : return 返回的将是个str类型的结果 """ # 得到一个局部的变量字典,来修正exec函数中的变量,在其他函数内部使用不到的问题 loc = locals() exec(f"result = {func}") return str(loc['result']) def extractor(obj: dict, expr: str = '.') -> Any: """ 根据表达式提取字典中的value,表达式, . 提取字典所有内容, $.case 提取一级字典case, $.case.data 提取case字典下的data :param obj :json/dict类型数据 :param expr: 表达式, . 提取字典所有内容, $.case 提取一级字典case, $.case.data 提取case字典下的data $.0.1 提取字典中的第一个列表中的第二个的值 """ try: result = jsonpath(obj, expr)[0] except Exception as e: logger.error(f'{expr} - 提取不到内容,丢给你一个错误!{e}') result = expr return result def rep_expr(content: str, data: dict) -> str: """从请求参数的字符串中,使用正则的方法找出合适的字符串内容并进行替换 :param content: 原始的字符串内容 :param data: 提取的参数变量池 return content: 替换表达式后的字符串 """ content = Template(content).safe_substitute(data) for func in re.findall('\\${(.*?)}', content): try: content = content.replace('${%s}' % func, exec_func(func)) except Exception as e: logger.error(e) return content def convert_json(dict_str: str) -> dict: """ :param dict_str: 长得像字典的字符串 return json格式的内容 """ try: if 'None' in dict_str: dict_str = dict_str.replace('None', 'null') elif 'True' in dict_str: dict_str = dict_str.replace('True', 'true') elif 'False' in dict_str: dict_str = dict_str.replace('False', 'false') dict_str = json.loads(dict_str) except Exception as e: if 'null' in dict_str: dict_str = dict_str.replace('null', 'None') elif 'true' in dict_str: dict_str = dict_str.replace('true', 'True') elif 'false' in dict_str: dict_str = dict_str.replace('false', 'False') dict_str = eval(dict_str) logger.error(e) return dict_str def allure_title(title: str) -> None: """allure中显示的用例标题""" allure.dynamic.title(title) def allure_step(step: str, var: str) -> None: """ :param step: 步骤及附件名称 :param var: 附件内容 """ with allure.step(step): allure.attach( json.dumps( var, ensure_ascii=False, indent=4), step, allure.attachment_type.JSON) def allure_step_no(step: str): """ 无附件的操作步骤 :param step: 步骤名称 :return: """ with allure.step(step): pass_init_.py
import os from datetime import datetime import paramiko from tools.read_file import ReadFile from tools import logger class ServerTools: def __init__( self, host: str, port: int = 22, username: str = "root", password: str = None, private_key_file: str = None, privat_passowrd: str = None): # 进行SSH连接 self.trans = paramiko.Transport((host, port)) self.host = host if password is None: self.trans.connect( username=username, pkey=paramiko.RSAKey.from_private_key_file( private_key_file, privat_passowrd)) else: self.trans.connect(username=username, password=password) # 将sshclient的对象的transport指定为以上的trans self.ssh = paramiko.SSHClient() logger.success("SSH客户端创建成功.") self.ssh._transport = self.trans # 创建SFTP客户端 self.ftp_client = paramiko.SFTPClient.from_transport(self.trans) logger.success("SFTP客户端创建成功.") def execute_cmd(self, cmd: str): """ :param cmd: 服务器下对应的命令 """ stdin, stdout, stderr = self.ssh.exec_command(cmd) error = stderr.read().decode() logger.info(f"输入命令: {cmd} -> 输出结果: {stdout.read().decode()}") logger.error(f"异常信息: {error}") return error def files_action( self, post: bool, local_path: str = os.getcwd(), remote_path: str = "/root"): """ :param post: 动作 为 True 就是上传, False就是下载 :param local_path: 本地的文件路径, 默认当前脚本所在的工作目录 :param remote_path: 服务器上的文件路径,默认在/root目录下 """ if post: # 上传文件 self.ftp_client.put( localpath=local_path, remotepath=f"{remote_path}{os.path.split(local_path)[1]}") logger.info( f"文件上传成功: {local_path} -> {self.host}:{remote_path}{os.path.split(local_path)[1]}") else: # 下载文件 file_path = local_path + os.path.split(remote_path)[1] self.ftp_client.get(remotepath=remote_path, localpath=file_path) logger.info(f"文件下载成功: {self.host}:{remote_path} -> {file_path}") def ssh_close(self): """关闭连接""" self.trans.close() logger.info("已关闭SSH连接...") class DataClearing: settings = ReadFile.read_config('$.database') server_settings = settings.get('ssh_server') server = None # 导出的sql文件名称及后缀 file_name = f"{settings.get('db_name')}_{datetime.now().strftime('%Y-%m-%dT%H_%M_%S')}.sql" @classmethod def server_init(cls, settings=settings, server_settings=server_settings): cls.server = ServerTools( host=settings.get('host'), port=server_settings.get('port'), username=server_settings.get('username'), password=server_settings.get('password'), private_key_file=server_settings.get('private_key_file'), privat_passowrd=server_settings.get('privat_passowrd')) # 新建backup_sql文件夹在服务器上,存放导出的sql文件 cls.server.execute_cmd("mkdir backup_sql") @classmethod def backup_mysql(cls): """ 备份数据库, 会分别备份在数据库所在服务器的/root/backup_sql/目录下, 与当前项目文件目录下的 backup_sqls 每次备份生成一个数据库名_当前年_月_日T_时_分_秒, 支持linux 服务器上安装的mysql服务(本人未调试),以及linux中docker部署的mysql备份 """ if cls.server_settings.get('mysql_container') is None: cmd = f"mysqldump -h127.0.0.1 -u{cls.settings.get('username')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > {cls.file_name}" else: # 将mysql服务的容器中的指定数据库导出, 参考文章 # https://www.cnblogs.com/wangsongbai/p/12666368.html cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysqldump -h127.0.0.1 -u{cls.settings.get('user')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > /root/backup_sql/{cls.file_name}" cls.server.execute_cmd(cmd) cls.server.files_action(0, f"{cls.server_settings.get('sql_data_file')}", f"/root/backup_sql/{cls.file_name}") @classmethod def recovery_mysql( cls, sql_file: str = file_name, database: str = settings.get('db_name')): """ 恢复数据库, 从服务器位置(/root/backup_sql/) 或者本地(../backup_sqls)上传, 传入的需要是.sql文件 :param sql_file: .sql数据库备份文件, 默认就是导出的sql文件名称, 默认文件名称是导出的sql文件 :param database: 恢复的数据库名称,默认是备份数据库(config.yaml中的db_name) """ result = cls.server.execute_cmd(f"ls -l /root/backup_sql/{sql_file}") if "No such file or directory" in result: # 本地上传 cls.server.files_action( 1, f"../backup_sqls/{sql_file}", "/root/backup_sql/") cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysql -u{cls.settings.get('user')} -p{cls.settings.get('password')} {database} < /root/backup_sql/{sql_file}" cls.server.execute_cmd(cmd) @classmethod def close_client(cls): cls.server.ssh_close()data_clearing.py
import re from tools import logger, extractor, convert_json, rep_expr, allure_step, allure_step_no, get_time, get_path from tools.db import DB from tools.read_file import ReadFile class DataProcess: # 存放提取参数的池子 extra_pool = {} header = ReadFile.read_config('$.request_headers') @classmethod def handle_path(cls, path_str: str, env: str) -> str: """路径参数处理 :param path_str: 带提取表达式的字符串 /${id}/state/${create_time} :param env: 环境名称, 对应的是环境基准地址 上述内容表示,从extra_pool字典里取到key为id 对应的值,假设是500,后面${create_time} 类似, 假设其值为 1605711095 最终提取结果 return /511/state/1605711095 """ url = ReadFile.read_config( f'$.server.{env}') + rep_expr(path_str, cls.extra_pool) allure_step_no(f'请求地址: {url}') return url @classmethod def handle_header(cls, header_str: str) -> dict: """处理header, 将用例中的表达式处理后 追加到基础header中 :header_str: 用例栏中的header return header: """ if header_str == '': header_str = '{}' cls.header.update(cls.handle_data(header_str)) allure_step('请求头', cls.header) return cls.header @classmethod def handler_files(cls, file_obj: str) -> object: """file对象处理方法 :param file_obj: 上传文件使用,格式:接口中文件参数的名称:"文件路径地址"/["文件地址1", "文件地址2"] 实例- 单个文件: &file&D: """ if file_obj != '': for k, v in convert_json(file_obj).items(): # 多文件上传 if isinstance(v, list): files = [] for path in v: files.append((k, (open(path, 'rb')))) else: # 单文件上传 l = re.findall(r"\${(.+?)}", str(v)) for i in l: if i == 'path': v = str(v).replace("${" + i + "}", get_path()+'/data/upload_data/') files = {k: open(v, 'rb')} allure_step('上传文件', file_obj) return files @classmethod def handle_data(cls, variable: str) -> dict: """请求数据处理 :param variable: 请求数据,传入的是可转换字典/json的字符串,其中可以包含变量表达式 return 处理之后的json/dict类型的字典数据 """ if variable != '': data = rep_expr(variable, cls.extra_pool) l = re.findall(r"\${(.+?)}", str(data)) for i in l: if i == 'timestamp': data = str(data).replace("${" + i + "}", get_time()) variable = convert_json(data) return variable @classmethod def handle_sql(cls, sql: str, db: DB): """ 处理sql,如果sql执行的结果不会空,执行sql的结果和参数池合并 :param sql: 支持单条或者多条sql,其中多条sql使用 ; 进行分割 多条sql,在用例中填写方式如下select * from user; select * from goods 每条sql语句之间需要使用 ; 来分割 单条sql,select * from user 或者 select * from user; :param db: 数据库连接对象 :return: """ sql = rep_expr(sql, cls.extra_pool) for sql in sql.split(";"): sql = sql.strip() if sql == '': continue # 查后置sql result = db.execute_sql(sql) allure_step(f'执行sql: {sql}', result) logger.info(f'执行sql: {sql} \n 结果: {result}') if result is not None: # 将查询结果添加到响应字典里面,作用在,接口响应的内容某个字段 直接和数据库某个字段比对,在预期结果中 # 使用同样的语法提取即可 cls.extra_pool.update(result) @classmethod def handle_extra(cls, extra_str: str, response: dict): """ 处理提取参数栏 :param extra_str: excel中 提取参数栏内容,需要是 {"参数名": "jsonpath提取式"} 可以有多个 :param response: 当前用例的响应结果字典 """ if extra_str != '': extra_dict = convert_json(extra_str) for k, v in extra_dict.items(): cls.extra_pool[k] = extractor(response, v) logger.info(f'加入依赖字典,key: {k}, 对应value: {v}') @classmethod def assert_result(cls, response: dict, expect_str: str): """ 预期结果实际结果断言方法 :param response: 实际响应结果 :param expect_str: 预期响应内容,从excel中读取 return None """ # 后置sql变量转换 allure_step("当前可用参数池", cls.extra_pool) expect_str = rep_expr(expect_str, cls.extra_pool) expect_dict = convert_json(expect_str) index = 0 for k, v in expect_dict.items(): # 获取需要断言的实际结果部分 actual = extractor(response, k) index += 1 logger.info( f'第{index}个断言,实际结果:{actual} | 预期结果:{v} \n断言结果 {actual == v}') allure_step(f'第{index}个断言', f'实际结果:{actual} = 预期结果:{v}') try: assert actual == v except AssertionError: raise AssertionError( f'第{index}个断言失败 -|- 实际结果:{actual} || 预期结果: {v}')data_process.py
import json from datetime import datetime from typing import Union import pymysql from tools.read_file import ReadFile class DB: mysql = ReadFile.read_config('$.database') def __init__(self): """ 初始化数据库连接,并指定查询的结果集以字典形式返回 """ self.connection = pymysql.connect( host=self.mysql['host'], port=self.mysql['port'], user=self.mysql['user'], password=self.mysql['password'], db=self.mysql['db_name'], charset=self.mysql.get('charset', 'utf8mb4'), cursorclass=pymysql.cursors.DictCursor ) def execute_sql(self, sql: str) -> Union[dict, None]: """ 执行sql语句方法,查询所有结果的sql只会返回一条结果( 比如说: 使用select * from cases , 结果将只会返回第一条数据 {'id': 1, 'name': 'updatehahaha', 'path': None, 'body': None, 'expected': '{"msg": "你好"}', 'api_id': 1, 'create_at': '2021-05-17 17:23:54', 'update_at': '2021-05-17 17:23:54'} ),支持select, delete, insert, update :param sql: sql语句 :return: select 语句 如果有结果则会返回 对应结果字典,delete,insert,update 将返回None """ with self.connection.cursor() as cursor: cursor.execute(sql) result = cursor.fetchone() # 使用commit解决查询数据出现概率查错问题 self.connection.commit() return self.verify(result) def verify(self, result: dict) -> Union[dict, None]: """验证结果能否被json.dumps序列化""" # 尝试变成字符串,解决datetime 无法被json 序列化问题 try: json.dumps(result) except TypeError: # TypeError: Object of type datetime is not JSON serializable for k, v in result.items(): if isinstance(v, datetime): result[k] = str(v) return result def close(self): """关闭数据库连接""" self.connection.close() if __name__ == '__main__': print(DB().execute_sql("SELECT * FROM user where name like '%test2%'"))db.py
import json import os import time def get_current_highest(): """获取当前时间戳""" return int(time.time()) def sum_data(a, b): """计算函数""" return a + b def set_token(token: str): """设置token,直接返回字典""" return {"Authorization": token} def get_time(): '''获取当前时间,格式YYYYmmddHHMMSSxx''' return time.strftime('%Y%m%d%H%M%S',time.localtime(time.time())) def get_path(): # 获取当前路径 #curpath = os.path.dirname(os.path.realpath(__file__)) #获取上级目录 curpath = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) return curpath if __name__ == '__main__': print(get_path())hooks.py
import yaml import xlrd from tools import extractor from pathlib import Path class ReadFile: config_dict = None config_path = f"{str(Path(__file__).parent.parent)}/config/config.yaml" @classmethod def get_config_dict(cls) -> dict: """读取配置文件,并且转换成字典 return cls.config_dict """ if cls.config_dict is None: # 指定编码格式解决,win下跑代码抛出错误 with open(cls.config_path, "r", encoding="utf-8") as file: cls.config_dict = yaml.load(file.read(), Loader=yaml.FullLoader) return cls.config_dict @classmethod def read_config(cls, expr: str = ".") -> dict: """默认读取config目录下的config.yaml配置文件,根据传递的expr jsonpath表达式可任意返回任何配置项 :param expr: 提取表达式, 使用jsonpath语法,默认值提取整个读取的对象 return 根据表达式返回的值 """ return extractor(cls.get_config_dict(), expr) @classmethod def read_testcase(cls): """ 读取excel格式的测试用例,返回一个生成器对象 :return 生成器 """ book = xlrd.open_workbook(cls.read_config("$.file_path.test_case")) # 读取第一个sheet页 table = book.sheet_by_index(0) for norw in range(1, table.nrows): # 每行第4列 是否运行 if table.cell_value(norw, 4) != "否": # 每行第4列等于否将不读取内容 value = table.row_values(norw) value.pop(4) yield value if __name__ == '__main__': passread_file.py
# 获取jenkins构建信息和本次报告地址 import os import jenkins import json import urllib3 from tools import get_path from tools.read_file import ReadFile dingding = ReadFile.read_config('$.dingding') # # jenkins登录地址 # jenkins_url = "http://192.168.1.107:8080/" jenkins_url= dingding['jenkins_url'] # 获取jenkins对象 server = jenkins.Jenkins(jenkins_url, username='root', password='******') # job名称 # job_name = "job/轻萤/" job_name = dingding['job_name'] # job的url地址 job_url = jenkins_url + job_name # 获取最后一次构建 job_last_build_url = server.get_info(job_name)['lastBuild']['url'] # 报告地址 report_url = job_last_build_url + 'allure' ''' 钉钉推送方法: 读取report文件中"prometheusData.txt",循环遍历获取需要的值。 使用钉钉机器人的接口,拼接后推送text ''' def DingTalkSend(): d = {} # 获取项目上级路径 path = get_path() # 打开prometheusData 获取需要发送的信息 f = open(path + r'/report\html\export/prometheusData.txt', 'r') for lines in f: for c in lines: launch_name = lines.strip('\n').split(' ')[0] num = lines.strip('\n').split(' ')[1] d.update({launch_name: num}) print(d) f.close() retries_run = d.get('launch_retries_run') # 运行总数 print('运行总数:{}'.format(retries_run)) status_passed = d.get('launch_status_passed') # 通过数量 print('通过数量:{}'.format(status_passed)) status_failed = d.get('launch_status_failed') # 不通过数量 print('通过数量:{}'.format(status_failed)) # 钉钉推送 url = '*******' # webhook con = {"msgtype": "text", "text": { "content": "轻萤脚本执行完成。" "\n测试概述:" "\n运行总数:" + retries_run + "\n通过数量:" + status_passed + "\n失败数量:" + status_failed + "\n构建地址:\n" + job_url + "\n报告地址:\n" + report_url } } urllib3.disable_warnings() http = urllib3.PoolManager() jd = json.dumps(con) jd = bytes(jd, 'utf-8') http.request('POST', url, body=jd, headers={'Content-Type': 'application/json'}) if __name__ == '__main__': DingTalkSend()send_dd.py
import os import jenkins import requests, json import urllib3 from tools import get_path from tools.read_file import ReadFile urllib3.disable_warnings() weixin = ReadFile.read_config('$.weixin') ###填写参数### # Corpid是企业号的标识 Corpid = weixin['Corpid'] # Secret是管理组凭证密钥 Secret = weixin['Secret'] # 应用ID Agentid = weixin['Agentid'] # 部门id Partyid = weixin['Partyid'] # jenkins登录地址 jenkins_url = weixin['jenkins_url'] # 获取jenkins对象 server = jenkins.Jenkins(jenkins_url, username='root', password='*****') # job名称 job_name = weixin['job_name'] # job的url地址 job_url = jenkins_url + job_name # 获取最后一次构建 job_last_build_url = server.get_info(job_name)['lastBuild']['url'] # 报告地址 report_url = job_last_build_url + 'allure' def GetTokenFromServer(Corpid, Secret): """获取access_token""" Url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" Data = { "corpid": Corpid, "corpsecret": Secret } r = requests.get(url=Url, params=Data, verify=False) print(r.json()) if r.json()['errcode'] != 0: return False else: Token = r.json()['access_token'] return Token def SendMessage(Partyid): """发送消息""" Token = GetTokenFromServer(Corpid, Secret) d = {} # 获取项目上级路径 path = get_path() # 打开prometheusData 获取需要发送的信息 f = open(path + r'/report/html/export/prometheusData.txt', 'r') for lines in f: for c in lines: launch_name = lines.strip('\n').split(' ')[0] num = lines.strip('\n').split(' ')[1] d.update({launch_name: num}) print(d) f.close() retries_run = d.get('launch_retries_run') # 运行总数 print('运行总数:{}'.format(retries_run)) status_passed = d.get('launch_status_passed') # 通过数量 print('通过数量:{}'.format(status_passed)) status_failed = d.get('launch_status_failed') # 不通过数量 print('通过数量:{}'.format(status_failed)) # 发送消息 Url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % Token Data = { "toparty": Partyid, "msgtype": "text", "agentid": Agentid, "text": { "content": "轻萤脚本执行完成。" "\n测试概述:" "\n运行总数:" + retries_run + "\n通过数量:" + status_passed + "\n失败数量:" + status_failed + "\n构建地址:\n" + job_url + "\n报告地址:\n" + report_url }, "safe": "0" } r = requests.post(url=Url, data=json.dumps(Data), verify=False) # 如果发送失败,将重试三次 n = 1 while r.json()['errcode'] != 0 and n < 4: n = n + 1 Token = GetTokenFromServer(Corpid, Secret) if Token: Url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % Token r = requests.post(url=Url, data=json.dumps(Data), verify=False) print(r.json()) return r.json() if __name__ == '__main__': Status = SendMessage(Partyid) print(Status)send_vx.py
import yagmail from tools import logger import zipfile import os from tools.read_file import ReadFile file_path = ReadFile.read_config('$.file_path') email = ReadFile.read_config('$.email') class EmailServe: @staticmethod def zip_report(file_path: str, out_path: str): """ 压缩指定文件夹 :param file_path: 目标文件夹路径 :param out_path: 压缩文件保存路径+xxxx.zip :return: 无 """ file_path = f"{file_path}html" zip = zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) for path, dirnames, filenames in os.walk(file_path): # 去掉目标跟路径,只对目标文件夹下边的文件及文件夹进行压缩 fpath = path.replace(file_path, '') fpath = fpath and fpath + os.sep or '' for filename in filenames: zip.write( os.path.join( path, filename), os.path.join(fpath, filename)) zip.close() @staticmethod def send_email(setting: dict, file_path): """ 入参一个字典 :param user: 发件人邮箱 :param password: 邮箱授权码 :param host: 发件人使用的邮箱服务 例如:smtp.163.com :param contents: 内容 :param addressees: 收件人列表 :param title: 邮件标题 :param enclosures: 附件列表 :param file_path: 需要压缩的文件夹 :return: """ EmailServe.zip_report( file_path=file_path, out_path=setting['enclosures']) yag = yagmail.SMTP( setting['user'], setting['password'], setting['host']) # 发送邮件 yag.send( setting['addressees'], setting['title'], setting['contents'], setting['enclosures']) # 关闭服务 yag.close() logger.info("邮件发送成功!") if __name__ == '__main__': #EmailServe.zip_report('../report/html', 'report.zip') EmailServe.send_email(email, file_path['report'])send_email.py
import os import shutil from test.conftest import pytest from tools import logger from tools.read_file import ReadFile from tools.send_email import EmailServe file_path = ReadFile.read_config('$.file_path') email = ReadFile.read_config('$.email') def run(): if os.path.exists('report/'): shutil.rmtree(path='report/') # 解决 issues 句柄无效 logger.remove() logger.add(file_path['log'], enqueue=True, encoding='utf-8') logger.info(""" _ _ _ _____ _ __ _ _ __ (_) / \\ _ _| |_ __|_ _|__ ___| |_ / _` | '_ \\| | / _ \\| | | | __/ _ \\| |/ _ \\/ __| __| | (_| | |_) | |/ ___ \\ |_| | || (_) | | __/\\__ \\ |_ \\__,_| .__/|_/_/ \\_\\__,_|\\__\\___/|_|\\___||___/\\__| |_| Starting ... ... ... """) pytest.main( args=[ 'test/test_api.py', f'--alluredir={file_path["report"]}/data']) # 自动以服务形式打开报告 # os.system(f'allure serve {file_path["report"]}/data') # 本地生成报告 os.system( f'allure generate {file_path["report"]}/data -o {file_path["report"]}/html --clean') logger.success('报告已生成') # 发送邮件带附件报告 # EmailServe.send_email(email, file_path['report']) # # 删除本地附件 # os.remove(email['enclosures']) if __name__ == '__main__': run()run.py
搜索
复制
<iframe height="240" width="320"></iframe> 标签:allure,str,import,Pytest,file,sql,path,requests,data From: https://www.cnblogs.com/MrqiuS/p/16714392.html