refactor(models): 优化项目
- 重构assert_all - 优化目录结构
This commit is contained in:
@@ -14,11 +14,13 @@ import logging
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
|
||||
from commons import settings
|
||||
from commons.files import YamlFile
|
||||
from commons.models import CaseInfo
|
||||
from commons.session import Session
|
||||
from commons.exchange import Exchange
|
||||
from commons import settings
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -31,7 +33,6 @@ exchanger = Exchange(settings.exchanger)
|
||||
|
||||
@allure.epic("项目名称:answer")
|
||||
class TestAPI:
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def find_yaml_case(cls, case_path: Path = _case_path):
|
||||
@@ -41,12 +42,12 @@ class TestAPI:
|
||||
"""
|
||||
yaml_path_list = case_path.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
|
||||
for yaml_path in yaml_path_list:
|
||||
# logger.info(f"load file {yaml_path=}")
|
||||
logger.info(f"加载文件:{yaml_path}")
|
||||
|
||||
file = YamlFile(yaml_path) # 自动读取yaml文件
|
||||
case_info = CaseInfo(**file) # 校验yaml格式
|
||||
|
||||
# logger.info(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志
|
||||
logger.info(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志
|
||||
|
||||
case_func = cls.new_case(case_info) # 从yaml格式转换为pytest格式
|
||||
print(yaml_path.stem)
|
||||
@@ -59,6 +60,7 @@ class TestAPI:
|
||||
|
||||
ddt_title = [data.title for data in ddt_data]
|
||||
logger.info(f"{ddt_title=}")
|
||||
|
||||
@allure.feature(case_info.feature)
|
||||
@allure.story(case_info.story)
|
||||
@pytest.mark.parametrize("case_info", ddt_data, ids=ddt_title)
|
||||
|
||||
@@ -90,7 +90,7 @@ if __name__ == '__main__':
|
||||
exchanger.extract(mock_resp, "age", "json", '$.age', 0)
|
||||
exchanger.extract(mock_resp, "data", "json", '$.data', 0)
|
||||
exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0)
|
||||
case_info = CaseInfo(
|
||||
mock_case_info = CaseInfo(
|
||||
title="单元测试",
|
||||
request={
|
||||
"data":
|
||||
@@ -99,5 +99,5 @@ if __name__ == '__main__':
|
||||
extract={},
|
||||
validate={}
|
||||
)
|
||||
new_case_info = exchanger.replace(case_info)
|
||||
print(new_case_info)
|
||||
new_mock_case_info = exchanger.replace(mock_case_info)
|
||||
print(new_mock_case_info)
|
||||
|
||||
@@ -12,10 +12,10 @@
|
||||
import logging
|
||||
|
||||
import yaml
|
||||
from commons.models import CaseInfo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class YamlFile(dict):
|
||||
def __init__(self, path):
|
||||
super().__init__()
|
||||
@@ -34,10 +34,13 @@ class YamlFile(dict):
|
||||
dict(self),
|
||||
stream=f,
|
||||
allow_unicode=True, # allow_unicode:使用unicode编码正常显示中文
|
||||
sort_keys=False) # sort_keys:保持原有排序
|
||||
sort_keys=False # sort_keys:保持原有排序
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from commons.models import CaseInfo
|
||||
|
||||
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml'
|
||||
yaml_file = YamlFile(yaml_path)
|
||||
# yaml_file.load()
|
||||
|
||||
@@ -17,7 +17,7 @@ import hashlib
|
||||
|
||||
from commons.databases import db
|
||||
|
||||
# from commons.files import YamlFile
|
||||
from commons.files import YamlFile
|
||||
from commons import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -53,7 +53,7 @@ def new_id():
|
||||
def last_id() -> str:
|
||||
# 不自增,只返回结果
|
||||
|
||||
id_file = YamlFile("id.yaml")
|
||||
id_file = YamlFile(settings.id_path)
|
||||
return id_file["id"]
|
||||
|
||||
|
||||
@@ -97,8 +97,5 @@ def rsa_decode(content: str) -> str:
|
||||
if __name__ == '__main__':
|
||||
# res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
|
||||
# print(res)
|
||||
a = "这是中文dddddd"
|
||||
bb = base64_encode(a)
|
||||
print(bb)
|
||||
cc = base64_decode(bb)
|
||||
print(cc)
|
||||
print(f"计数器:{new_id()}")
|
||||
print(f"当前数值:{last_id()}")
|
||||
|
||||
@@ -10,13 +10,14 @@
|
||||
@desc: 声明yaml用例格式
|
||||
"""
|
||||
import logging
|
||||
from dataclasses import dataclass, asdict
|
||||
from dataclasses import dataclass, asdict, field
|
||||
|
||||
import allure
|
||||
import yaml
|
||||
|
||||
from commons.templates import Template
|
||||
from commons import settings
|
||||
from utils import case_validator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -27,7 +28,7 @@ class CaseInfo:
|
||||
request: dict
|
||||
extract: dict
|
||||
validate: dict
|
||||
parametrize: list = ""
|
||||
parametrize: list = field(default_factory=list)
|
||||
epic: str = settings.allure_epic
|
||||
feature: str = settings.allure_feature
|
||||
story: str = settings.allure_story
|
||||
@@ -48,26 +49,29 @@ class CaseInfo:
|
||||
|
||||
@allure.step("断言")
|
||||
def assert_all(self):
|
||||
if not self.validate:
|
||||
return
|
||||
for assert_type, assert_value in self.validate.items():
|
||||
for msg, data in assert_value.items():
|
||||
a, b = data[0], data[1]
|
||||
# print(assert_type, a, b, msg)
|
||||
match assert_type:
|
||||
case 'equals':
|
||||
logger.info(f"assert {a} == {b}, {msg}")
|
||||
assert a == b, msg
|
||||
case 'not_equals':
|
||||
logger.info(f"assert {a} != {b}, {msg}")
|
||||
assert a != b, msg
|
||||
case 'contains':
|
||||
logger.info(f"assert {a} in {b}, {msg}")
|
||||
assert a in b, msg
|
||||
case 'not_contains':
|
||||
logger.info(f"assert {a} not in {b}, {msg}")
|
||||
assert a not in b, msg
|
||||
# case "xxxxx
|
||||
_validator = case_validator.CaseValidator()
|
||||
# print(case_validator.VALIDATORS)
|
||||
_validator.assert_all(self.validate)
|
||||
# if not self.validate:
|
||||
# return
|
||||
# for assert_type, assert_value in self.validate.items():
|
||||
# for msg, data in assert_value.items():
|
||||
# a, b = data[0], data[1]
|
||||
# # print(assert_type, a, b, msg)
|
||||
# match assert_type:
|
||||
# case 'equals':
|
||||
# logger.info(f"assert {a} == {b}, {msg}")
|
||||
# assert a == b, msg
|
||||
# case 'not_equals':
|
||||
# logger.info(f"assert {a} != {b}, {msg}")
|
||||
# assert a != b, msg
|
||||
# case 'contains':
|
||||
# logger.info(f"assert {a} in {b}, {msg}")
|
||||
# assert a in b, msg
|
||||
# case 'not_contains':
|
||||
# logger.info(f"assert {a} not in {b}, {msg}")
|
||||
# assert a not in b, msg
|
||||
# case "xxxxx
|
||||
|
||||
def ddt(self) -> list: # 返回一个列表,列表中应该包含N个注入了变量的caseInfo
|
||||
case_list = []
|
||||
|
||||
@@ -13,15 +13,15 @@ from pathlib import Path
|
||||
|
||||
root_path = (Path(__file__)).resolve().parents[1]
|
||||
|
||||
base_url = 'http://127.0.0.1:40065'
|
||||
base_url = 'http://119.91.19.171:40065'
|
||||
case_path = rf"{root_path}\TestCases\answer"
|
||||
exchanger = rf"{root_path}\extract.yaml"
|
||||
id_path = rf"{root_path}\id.yaml"
|
||||
|
||||
db_host = '127.0.0.1' # ip
|
||||
db_host = '119.91.19.171' # ip
|
||||
db_port = 3306 # 端口
|
||||
db_user = 'root' # 用户名
|
||||
db_password = 'password' # 密码
|
||||
db_password = 'mysql_hNahSe' # 密码
|
||||
db_database = 'answer' # 库名
|
||||
|
||||
allure_epic: str = "项目名称:answer"
|
||||
|
||||
@@ -13,6 +13,7 @@ import copy
|
||||
import logging
|
||||
import re
|
||||
import string
|
||||
import inspect
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -58,7 +59,7 @@ class Template(string.Template):
|
||||
func = mapping.get(func_name) # 读取指定函数
|
||||
func_args_value = [mapping.get(arg, arg) for arg in func_args]
|
||||
|
||||
if func_args_value == [""]: # 处理没有参数的func
|
||||
if func_args_value == [""]: # 处理没有参数的func
|
||||
func_args_value = []
|
||||
|
||||
if not callable(func):
|
||||
@@ -75,8 +76,13 @@ def hot_load():
|
||||
if func_name.startswith("_"):
|
||||
continue
|
||||
func_code = getattr(funcs, func_name) # 取到函数对象
|
||||
# print(func_code)
|
||||
if callable(func_code): # 如果是一个可以调用的函数
|
||||
Template.func_mapping[func_name] = func_code # 函数放到Template中
|
||||
print(Template.func_mapping)
|
||||
# if inspect.isfunction(func_code): # 如果是一个可以调用的函数
|
||||
# Template.func_mapping[func_name] = func_code # 函数放到Template中
|
||||
# print(Template.func_mapping)
|
||||
|
||||
|
||||
hot_load()
|
||||
|
||||
Reference in New Issue
Block a user