refactor(): 优化测试用例数据的处理,优化代码结构

- 新增用例生成器和注册器
- 优化文件处理
This commit is contained in:
2025-06-03 21:42:57 +08:00
parent 2e9f1c12f7
commit 300b5a92d4
16 changed files with 468 additions and 293 deletions

80
commons/case_handler.py Normal file
View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_handler
@date: 2025/5/26 22:13
@desc:
"""
import json
import logging
from dataclasses import dataclass, asdict
from commons.models import TestCaseModel
logger = logging.getLogger(__name__)
@dataclass
class TestCaseHandle(TestCaseModel):
@classmethod
def new(cls, testcase: dict) -> 'TestCaseHandle':
try:
instance = cls(**testcase)
return instance
except (TypeError, ValueError) as e:
logger.warning(f"解析错误:{e}")
raise e
def to_string(self) -> str:
"""
将 字典 转换为 json 格式的字符串。
:return:
json 格式的字符串。
"""
try:
res = json.dumps(asdict(self), ensure_ascii=False)
return res
except TypeError as e:
logger.error(f"将数据转换为 json 字符串时出错: {e}")
raise e
@staticmethod
def to_dict(json_str: str) -> dict:
"""
将 json 格式的字符串转换为 字典.
:param
json_str: json 格式的字符串。
:return:
"""
try:
res = json.loads(json_str)
return res
except json.JSONDecodeError as e:
logger.error(f"将 json 字符串转换为字典时出错: {e}")
raise e
if __name__ == '__main__':
from pathlib import Path
from commons.file_processors import processor_factory
test_data = Path(r"E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml")
yaml_data = processor_factory.get_processor_class(test_data)
case_info = TestCaseHandle.new(yaml_data.load())
print(case_info.to_string())
print(type(case_info.to_string()))
print(case_info.to_dict(case_info.to_string()))
print(type(case_info.to_dict(case_info.to_string())))
print(type(case_info))
print(case_info.parametrize)
for i in case_info.parametrize:
print(i)

View File

@@ -9,18 +9,23 @@
@date: 2024 2024/9/16 9:57
@desc: 动态生成用例
"""
from dataclasses import asdict
from pathlib import Path
import logging
from typing import Union, Generator, Type
from unittest import TestCase
import allure
import pytest
from commons import settings
from commons.file_processors.file_handle import FileHandle
from commons.models import CaseInfo
from commons.file_processors.processor_factory import get_processor_class
# from commons.models import CaseInfo
from commons.session import Session
from commons.exchange import Exchange
from utils import data_driver, case_validator
from commons.templates import Template
from commons.case_handler import TestCaseHandle
from utils import case_validator
logger = logging.getLogger(__name__)
@@ -32,90 +37,195 @@ exchanger = Exchange(settings.exchanger)
class TestAPI:
@classmethod
def find_test_cases(cls, case_dir: Path = cases_dir):
"""
搜索和加载yaml文件
:return:
"""
case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
for case_path in case_path_list:
logger.info(f"加载文件:{case_path}")
def run(cls, testcase_dir: Union[Path, str] = cases_dir):
for fp in CaseFinder(testcase_dir).find_testcases():
print(fp.name)
case = CaseGenerator(fp).generate_testcases()
print(f"{case=}")
for i in case:
print(f"{i=}")
CaseRegister(cls).register_test_func(i)
# @classmethod
# def find_test_cases(cls, case_dir: Path = cases_dir):
# """
# 搜索和加载yaml文件
# :return:
# """
# case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
# for case_path in case_path_list:
# logger.info(f"加载文件:{case_path}")
#
# file = FileHandle(case_path) # 自动读取yaml文件
# try:
# CaseInfo(**file) # 校验用例格式
# logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
# case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# # print(case_path.stem)
# setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
# except Exception as e:
# logger.error(e)
#
# @classmethod
# def new_case(cls, file_name, case_info: dict):
# test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
#
# keys_list = list(test_case.keys())
# logger.info(f"keys_list{keys_list}")
#
# values_list = list(test_case.values())
# logger.info(f"测试用例列表:{values_list}")
#
# driver_title = [i.get("title") for i in values_list]
# logger.info(f"driver_title={driver_title}")
#
# epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
# logger.info(f"epic{epic}")
#
# feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
# logger.info(f"feature{feature}")
#
# story = case_info["story"] if case_info["story"] else settings.allure_story
# logger.info(f"story{story}")
#
# @allure.epic(epic)
# @allure.feature(feature)
# @allure.story(story)
# @pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
# def test_func(self, case_key):
# logger.info(f"case_key{case_key}")
#
# test_case_mapping = test_case.get(case_key)
# logger.info(f"测试用例:{test_case_mapping}")
#
# allure.dynamic.title(test_case_mapping.get("title"))
#
# logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
#
# # 0变量替换
# new_case_info = exchanger.replace(test_case_mapping)
# logger.info(f"1正在注入变量...")
# logger.info(f"new_case_info{new_case_info}")
# # 1发送请求
# logger.info(f"2正在请求接口...")
# resp = session.request(**new_case_info.get("request"))
#
# logger.info(f"3正在提取变量...")
# # 2保存变量(接口关联)
# for var_name, extract_info in new_case_info.get("extract").items():
# logger.info(f"保存变量:{var_name}{extract_info}")
# exchanger.extract(resp, var_name, *extract_info)
# # 3断言
# logger.info(f"4正在断言...")
# assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
# logger.info(f"替换变量后:{assert_case_info}")
# # assert_case_info.assert_all() # 执行断言
# _validator = case_validator.CaseValidator()
# _validator.assert_all(assert_case_info.get("validate"))
#
# logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
#
# return test_func
file = FileHandle(case_path) # 自动读取yaml文件
try:
CaseInfo(**file) # 校验用例格式
logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# print(case_path.stem)
setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
except Exception as e:
logger.error(e)
@classmethod
def new_case(cls, file_name, case_info: dict):
test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
class CaseFinder:
find_suffix: str = settings.test_suffix
keys_list = list(test_case.keys())
logger.info(f"keys_list{keys_list}")
def __init__(self, testcase_dir: Union[str, Path]):
if Path(testcase_dir).is_dir():
self.testcase_dir: Path = Path(testcase_dir)
else:
raise FileNotFoundError("不是有效的目录")
values_list = list(test_case.values())
logger.info(f"测试用例列表:{values_list}")
def find_testcases(self) -> Generator[Path, None, None]:
testcase_files = self.testcase_dir.glob(f"**/test_*.{self.find_suffix}")
for fp in testcase_files:
logger.info(f"加载文件:{fp}")
yield fp
driver_title = [i.get("title") for i in values_list]
logger.info(f"driver_title={driver_title}")
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
logger.info(f"epic{epic}")
class CaseGenerator:
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
logger.info(f"feature{feature}")
def __init__(self, fp: Union[str, Path]):
self.fp: Path = Path(fp)
story = case_info["story"] if case_info["story"] else settings.allure_story
logger.info(f"story{story}")
def generate_testcases(self) -> Generator[dict, None, None]:
file_name = self.fp.stem
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
@pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
def test_func(self, case_key):
logger.info(f"case_key{case_key}")
case_info_ = get_processor_class(self.fp).load() # 自动读取yaml文件
case_info = TestCaseHandle.new(case_info_)
test_case_mapping = test_case.get(case_key)
logger.info(f"测试用例:{test_case_mapping}")
if not case_info.parametrize:
yield {file_name + "__": asdict(case_info)}
else:
cases = {}
args_names = case_info.parametrize[0]
for i, args_values in enumerate(case_info.parametrize[1:]):
# print(args_values)
context = dict(zip(args_names, args_values))
print(context)
# rendered = Template(FileHandle.to_string(case_info)).render(context)
rendered = Template(case_info.to_string()).render(context)
# cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
cases.update({file_name + "_" + str(i): case_info.to_dict(rendered)})
allure.dynamic.title(test_case_mapping.get("title"))
yield cases
logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
# 0变量替换
new_case_info = exchanger.replace(test_case_mapping)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info{new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request"))
class CaseRegister:
def __init__(self, register: Type["TestAPI"]):
self.register: Type["TestAPI"] = register
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
for var_name, extract_info in new_case_info.get("extract").items():
logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
logger.info(f"替换变量后:{assert_case_info}")
# assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
def register_test_func(self, case: dict):
for test_filed_name, case_info in case.items():
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
logger.info(f"epic{epic}")
logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
logger.info(f"feature{feature}")
return test_func
story = case_info["story"] if case_info["story"] else settings.allure_story
logger.info(f"story{story}")
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
def register_func(instance, testcase=case_info):
# allure.dynamic.epic(epic)
# allure.dynamic.feature(feature)
# allure.dynamic.story(story)
allure.dynamic.title(testcase.get("title"))
logger.info(f"用例开始执行:{testcase.get('title')}".center(80, "="))
# 0变量替换
new_case_info = exchanger.replace(testcase)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info{new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request"))
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
for var_name, extract_info in new_case_info.get("extract").items():
logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(testcase) # 为断言加载变量
logger.info(f"替换变量后:{assert_case_info}")
# assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
logger.info(f"用例执行结束:{testcase.get('title')}".center(80, "="))
# return test_func
setattr(self.register, test_filed_name, register_func) # 把pytest格式添加到类中
# TestAPI.find_yaml_case()
if __name__ == '__main__':
TestAPI.find_test_cases()
# print(TestAPI.__dict__)
TestAPI.run(cases_dir)
print(TestAPI.__dict__)

View File

@@ -33,13 +33,13 @@ class DBServer:
db = DBServer(
host=settings.db_host, # ip
port=3306, # 端口
user='root', # 用户名
password='mysql_hNahSe', # 密码
database='answer' # 库名
port=settings.db_port, # 端口
user=settings.db_user, # 用户名
password=settings.db_password, # 密码
database=settings.db_database # 库名
)
if __name__ == '__main__':
...
res = db.execute_sql('select username from user where id=1;')
print(res[0])
# res = db.execute_sql('select username from user where id=1;')
# print(res[0])

View File

@@ -18,17 +18,18 @@ import jsonpath
import allure
from commons.templates import Template
from commons.file_processors.file_handle import FileHandle
from commons.file_processors.processor_factory import get_processor_class
from tests.b import TestCaseHandle
logger = logging.getLogger(__name__)
class Exchange:
def __init__(self, path):
self.file = FileHandle(path)
self.file = get_processor_class(path)
@allure.step("提取变量")
def extract(self, resp, var_name, attr, expr: str, index):
def extract(self, resp, var_name, attr, expr: str, index) -> None:
resp = copy.deepcopy(resp)
@@ -53,21 +54,22 @@ class Exchange:
value = "not data"
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
self.file[var_name] = value # 保存变量
self.file.save() # 持久化存储到文件
data = self.file.load()
data[var_name] = value # 保存变量
self.file.save(data) # 持久化存储到文件
@allure.step("替换变量")
def replace(self, case_info: dict) -> dict:
logger.info(f"变量替换:{case_info}")
# 1将case_info转换为字符串
case_info_str = FileHandle.to_string(case_info)
data = TestCaseHandle(**case_info)
case_info_str = data.to_string()
print(f"{case_info_str=}")
# 2替换字符串
case_info_str = Template(case_info_str).render(self.file)
case_info_str = Template(case_info_str).render(self.file.load())
print(f"{case_info_str=}")
# 3将字符串转换成case_info
new_case_info = FileHandle.to_dict(case_info_str)
new_case_info = data.to_dict(case_info_str)
return new_case_info

View File

@@ -9,3 +9,14 @@
@date: 2025/3/4 17:23
@desc:
"""
from .base_processor import BaseFileProcessor
from .json_processor import JsonProcessor
from .yaml_processor import YamlProcessor
from .processor_factory import get_processor_class
__all__ = [
"BaseFileProcessor",
"JsonProcessor",
"YamlProcessor",
"get_processor_class",
]

View File

@@ -10,6 +10,8 @@
@desc:
"""
import abc
from pathlib import Path
from typing import Union
class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
@@ -17,25 +19,16 @@ class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
文件处理器的抽象基类
定义了所有子类必须实现的方法
"""
def __init__(self, filepath: Union[str, Path], **kwargs):
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
@abc.abstractmethod
def load(self):
def load(self) -> dict:
"""加载."""
pass
@staticmethod
@abc.abstractmethod
def to_string(data: dict) -> str:
"""将文件内容转换为字符串。"""
pass
@staticmethod
@abc.abstractmethod
def to_dict(data: str) -> dict:
"""将文件内容转换为字典。"""
pass
raise NotImplementedError
@abc.abstractmethod
def save(self, new_filepath=None):
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
"""将数据保存."""
pass
raise NotImplementedError

View File

@@ -1,41 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: file_handle
@date: 2025/3/7 09:31
@desc:
"""
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor
processors = {
'yaml': YamlProcessor,
'yml': YamlProcessor,
'json': JsonProcessor,
}
def get_processor(ext):
agent_model = processors.get(ext, YamlProcessor) # 代理模式
return agent_model # 默认回退到 Yaml
FileHandle = get_processor("yaml")
if __name__ == '__main__':
# 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = FileHandle(yaml_path)
print(yaml_file)
print(type(yaml_file))
file_string = FileHandle.to_string(yaml_file)
print(file_string)
file_dict = FileHandle.to_dict(file_string)
print(file_dict)

View File

@@ -10,117 +10,77 @@
@desc:
"""
import logging
from typing import Union
from typing import Union, Any
from pathlib import Path
import json
from commons.file_processors.base import BaseFileProcessor
from commons.file_processors.base_processor import BaseFileProcessor
logger = logging.getLogger(__name__)
class JsonProcessor(BaseFileProcessor, dict):
class JsonProcessor(BaseFileProcessor):
"""
用于处理 YAML 文件的类,继承自 dict
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
并可以直接像字典一样访问 YAML 数据。
用于处理 JSON 文件的类。
提供了从文件加载 JSON 数据为字典,以及将字典保存为 JSON 文件的功能
"""
def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
def __init__(self, filepath: Union[str, Path], **kwargs):
"""
初始化 YamlFile 对象。
初始化 JsonFile 对象。
Args:
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
如果不提供,则尝试从 filepath 加载数据。
"""
super().__init__() # 初始化父类 dict
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
if data is not None:
self.update(data) # 如果提供了初始数据,则更新字典
else:
self.load() # 否则,尝试从文件加载
super().__init__(filepath, **kwargs)
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
def load(self) -> None:
def load(self) -> dict[str, Any]:
"""
YAML 文件加载数据并更新字典
如果文件不存在或加载失败,则清空字典并记录警告/错误。
"""
self.clear() # 清空现有数据
if self.filepath.exists():
try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = json.load(f) or {}
self.update(loaded_data) # 使用加载的数据更新字典
except json.JSONDecodeError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
# 保持字典为空 (已在开头 clear)
else:
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
# 保持字典为空 (已在开头 clear)
@staticmethod
def to_string(data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
Json 文件加载数据。
:return:
"""
if not self.filepath.exists():
logger.warning(f"文件 {self.filepath} 不存在.")
raise FileNotFoundError(f"文件 {self.filepath} 不存在.")
try:
return json.dumps(
dict(data), # 使用dict转换为标准的字典
ensure_ascii=False, # 允许非ASCII字符
# indent=4, # 美化输出缩进4个空格
sort_keys=False # 不排序键
)
except TypeError as e:
logger.error(f"将数据转换为 JSON 字符串时出错: {e}")
return ""
@staticmethod
def to_dict(data: str) -> None:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = json.loads(data) or {}
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = json.load(f)
if not isinstance(loaded_data, dict): # 确保加载的是字典
logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
return loaded_data
except json.JSONDecodeError as e:
logger.error(f"将 JSON 字符串转换为字典时出错: {e}")
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
raise e
def save(self, new_filepath: Union[str, Path, None] = None):
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
"""
将字典数据 (自身) 保存到 YAML 文件。
将字典数据保存到 json 文件。
Args:
new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
:param data:
:param new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
"""
filepath = Path(new_filepath) if new_filepath else self.filepath
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(
dict(self), # 使用dict转换为标准的字典
data,
f,
ensure_ascii=False, # 允许非ASCII字符
indent=4, # 美化输出缩进4个空格
sort_keys=False # 不排序键
)
except (TypeError, OSError) as e:
logger.info(f"数据已成功保存到 {filepath}")
except (TypeError, OSError, json.JSONDecodeError) as e:
logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}")
raise e
if __name__ == '__main__':
# 示例用法
json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径
json_file = JsonProcessor(json_path)
print(json_file)
print(json_file.load())
print(type(json_file))
json_string = JsonProcessor.to_string(json_file)
JsonProcessor.to_dict(json_string)
print(json_string)
json_file.save()
# json_file.save()

View File

@@ -0,0 +1,57 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: file_handle
@date: 2025/3/7 09:31
@desc:
"""
from pathlib import Path
from typing import Type, Union
from commons.file_processors.base_processor import BaseFileProcessor
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor
# 类型别名,表示处理器类的字典
ProcessorMap = dict[str, Type[BaseFileProcessor]]
processors: ProcessorMap = {
'yaml': YamlProcessor,
'yml': YamlProcessor,
'json': JsonProcessor,
}
class UnsupportedFileTypeError(Exception):
"""当文件类型不被支持时抛出此异常。"""
pass
# def get_processor_class(file_suffix: str = "yaml") -> Type[BaseFileProcessor]:
def get_processor_class(fp: Union[Path, str]) -> 'BaseFileProcessor':
fp = Path(fp)
if fp.is_file():
file_suffix = fp.suffix[1:]
processor_class = processors.get(file_suffix.lower(), YamlProcessor) # 代理模式
return processor_class(fp) # 默认回退到 Yaml
else:
raise UnsupportedFileTypeError(fp)
# FileHandle = get_processor("yaml")
if __name__ == '__main__':
# 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
# yaml_file = FileHandle(yaml_path)
# print(yaml_file.load())
# print(type(yaml_file))
# file_suffix = Path(yaml_path).suffix[1:]
# print(file_suffix)
get_processor = get_processor_class(yaml_path)
print(get_processor.load())

View File

@@ -11,108 +11,77 @@
"""
import logging
from typing import Union
from dataclasses import dataclass, asdict, field
from pathlib import Path
import yaml
from commons.file_processors.base import BaseFileProcessor
from commons.file_processors.base_processor import BaseFileProcessor
logger = logging.getLogger(__name__)
class YamlProcessor(BaseFileProcessor, dict):
class YamlProcessor(BaseFileProcessor):
"""
用于处理 YAML 文件的类,继承自 dict。
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
并可以直接像字典一样访问 YAML 数据。
"""
def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
def __init__(self, filepath: Union[str, Path], **kwargs):
"""
初始化 YamlFile 对象。
Args:
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
Args: filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
如果不提供,则尝试从 filepath 加载数据。
"""
super().__init__() # 初始化父类 dict
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
if data is not None:
self.update(data) # 如果提供了初始数据,则更新字典
else:
self.load() # 否则,尝试从文件加载
super().__init__(filepath, **kwargs)
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
def load(self) -> None:
def load(self) -> dict:
"""
从 YAML 文件加载数据并更新字典。
如果文件不存在或加载失败,则清空字典并记录警告/错误。
从 YAML 文件加载数据
:return:
"""
self.clear() # 清空现有数据
if self.filepath.exists():
try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = yaml.safe_load(f) or {}
self.update(loaded_data) # 使用加载的数据更新字典
except yaml.YAMLError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
# 保持字典为空 (已在开头 clear)
else:
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
# 保持字典为空 (已在开头 clear)
if not self.filepath.exists():
logger.warning(f"文件 {self.filepath} 不存在.")
raise FileNotFoundError(f"文件 {self.filepath} 不存在.")
@staticmethod
def to_string(data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
"""
try:
return yaml.safe_dump(
dict(data), # 使用dict转换为标准的字典
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return ""
@staticmethod
def to_dict(data: str) -> Union[None, dict]:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = yaml.safe_load(data) or {}
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = yaml.safe_load(f)
if not isinstance(loaded_data, dict): # 确保加载的是字典
logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
return loaded_data
except yaml.YAMLError as e:
logger.error(f" YAML 字符串转换为字典时出错: {e}")
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
raise e
def save(self, new_filepath: Union[str, Path, None] = None):
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
"""
将字典数据 (自身) 保存到 YAML 文件。
将字典数据保存到 YAML 文件。
:param data:
:param new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
Args:
new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
"""
filepath = Path(new_filepath) if new_filepath else self.filepath
# 确保目标目录存在
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
with open(filepath, "w", encoding="utf-8") as f:
yaml.safe_dump(
dict(self), # 使用dict转换为标准的字典
data,
stream=f,
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
except (TypeError, OSError) as e:
logger.info(f"数据已成功保存到 {filepath}")
except (TypeError, OSError, yaml.YAMLError) as e:
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
raise e
@@ -122,7 +91,7 @@ if __name__ == '__main__':
# 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = YamlProcessor(yaml_path)
print(yaml_file)
print(yaml_file.load())
print(type(yaml_file))
# # 直接像字典一样访问数据

View File

@@ -17,7 +17,7 @@ import hashlib
from commons.databases import db
from commons.file_processors.file_handle import FileHandle
from commons.file_processors.processor_factory import get_processor_class
from commons import settings
logger = logging.getLogger(__name__)
@@ -72,19 +72,21 @@ def sql(s: str) -> str:
@Funcs.register("new_id")
def new_id():
# 自增,永不重复
id_file = FileHandle(settings.id_path)
id_file["id"] += 1
id_file.save()
id_file = get_processor_class(settings.id_path)
data = id_file.load()
data["id"] += 1
id_file.save(data)
return id_file["id"]
return data["id"]
@Funcs.register("last_id")
def last_id() -> str:
# 不自增,只返回结果
id_file = FileHandle(settings.id_path)
return id_file["id"]
id_file = get_processor_class(settings.id_path)
data = id_file.load()
return data["id"]
@Funcs.register("md5")
@@ -127,10 +129,13 @@ def rsa_encode(content: str) -> str:
@Funcs.register("rsa_decode")
def rsa_decode(content: str) -> str:
...
@Funcs.register()
def func_name_test():
...
if __name__ == '__main__':
# res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
# print(res)

View File

@@ -10,6 +10,7 @@
@desc: 声明yaml用例格式
"""
import logging
from typing import Union, Optional
from dataclasses import dataclass, field
import yaml
@@ -20,20 +21,45 @@ logger = logging.getLogger(__name__)
@dataclass
class CaseInfo:
class RequestModel:
method: str
url: str
headers: Optional[dict] = None
# body: Optional[Union[dict, str]] = None
params: Optional[Union[dict, str]] = None
@dataclass
class TestCaseModel:
title: str
request: dict
request: RequestModel
extract: dict
validate: dict
parametrize: list = field(default_factory=list)
epic: str = settings.allure_epic
feature: str = settings.allure_feature
story: str = settings.allure_story
epic: str = field(default_factory=lambda: settings.allure_epic)
feature: str = field(default_factory=lambda: settings.allure_feature)
story: str = field(default_factory=lambda: settings.allure_story)
def __post_init__(self):
# 必填字段非空校验
if self.title is None:
raise ValueError("Title cannot be empty")
# 校验RequestModel
if isinstance(self.request, dict):
try:
self.request = RequestModel(**self.request) # RequestModel 的 __post_init__ 会被调用
except (TypeError, ValueError) as e:
raise ValueError(f"解析 'request' 字段失败: {e} (数据: {self.request})") from e
elif not isinstance(self.request, RequestModel): # 如果不是 dict 也不是 RequestModel
raise TypeError(
f"字段 'request' 必须是字典 (将在内部转换为 RequestModel) 或 RequestModel 实例, "
f"得到的是 {type(self.request).__name__}"
)
if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f)
# print(data)
case_info = CaseInfo(**data)
case_info = TestCaseModel(**data)

View File

@@ -22,6 +22,8 @@ cases_dir = rf"{root_path}\TestCases\answer"
exchanger = rf"{root_path}\extract.yaml"
id_path = rf"{root_path}\id.yaml"
test_suffix = "yaml"
db_host = os.getenv("DB_HOST") # ip
db_port = os.getenv("DB_PORT") # 端口
db_user = os.getenv("DB_USER") # 用户名

View File

@@ -5,7 +5,7 @@ import pytest
from commons.cases import TestAPI
TestAPI.find_test_cases() # 加载yaml文件
TestAPI.run() # 加载yaml文件
if __name__ == '__main__':
now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

View File

@@ -17,6 +17,7 @@ pymysql = "^1.1.1"
pytest-result-log = "^1.2.2"
allure-pytest = "^2.13.5"
cryptography = "^44.0.2"
dotenv = "^0.9.9"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -3,7 +3,7 @@ addopts = -q --show-capture=no
log_file = logs/pytest.log
log_file_level = info
log_file_level = debug
log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s
log_file_date_format = %m/%d/%Y %I:%M:%S %p