diff --git a/commons/case_handler.py b/commons/case_handler.py new file mode 100644 index 0000000..b04c065 --- /dev/null +++ b/commons/case_handler.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: case_handler +@date: 2025/5/26 22:13 +@desc: +""" +import json + +import logging +from dataclasses import dataclass, asdict +from commons.models import TestCaseModel + +logger = logging.getLogger(__name__) + + +@dataclass +class TestCaseHandle(TestCaseModel): + + @classmethod + def new(cls, testcase: dict) -> 'TestCaseHandle': + + try: + instance = cls(**testcase) + return instance + except (TypeError, ValueError) as e: + logger.warning(f"解析错误:{e}") + raise e + + def to_string(self) -> str: + """ + 将 字典 转换为 json 格式的字符串。 + + :return: + json 格式的字符串。 + """ + try: + res = json.dumps(asdict(self), ensure_ascii=False) + return res + except TypeError as e: + logger.error(f"将数据转换为 json 字符串时出错: {e}") + raise e + + @staticmethod + def to_dict(json_str: str) -> dict: + """ + 将 json 格式的字符串转换为 字典. + + :param + json_str: json 格式的字符串。 + :return: + """ + try: + res = json.loads(json_str) + return res + except json.JSONDecodeError as e: + logger.error(f"将 json 字符串转换为字典时出错: {e}") + raise e + + +if __name__ == '__main__': + from pathlib import Path + from commons.file_processors import processor_factory + + test_data = Path(r"E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml") + yaml_data = processor_factory.get_processor_class(test_data) + case_info = TestCaseHandle.new(yaml_data.load()) + print(case_info.to_string()) + print(type(case_info.to_string())) + print(case_info.to_dict(case_info.to_string())) + print(type(case_info.to_dict(case_info.to_string()))) + print(type(case_info)) + print(case_info.parametrize) + + for i in case_info.parametrize: + print(i) diff --git a/commons/cases.py b/commons/cases.py index ee7ee1f..c02f299 100644 --- a/commons/cases.py +++ b/commons/cases.py @@ -9,18 +9,23 @@ @date: 2024 2024/9/16 9:57 @desc: 动态生成用例 """ +from dataclasses import asdict from pathlib import Path import logging +from typing import Union, Generator, Type +from unittest import TestCase import allure import pytest from commons import settings -from commons.file_processors.file_handle import FileHandle -from commons.models import CaseInfo +from commons.file_processors.processor_factory import get_processor_class +# from commons.models import CaseInfo from commons.session import Session from commons.exchange import Exchange -from utils import data_driver, case_validator +from commons.templates import Template +from commons.case_handler import TestCaseHandle +from utils import case_validator logger = logging.getLogger(__name__) @@ -32,90 +37,195 @@ exchanger = Exchange(settings.exchanger) class TestAPI: - @classmethod - def find_test_cases(cls, case_dir: Path = cases_dir): - """ - 搜索和加载yaml文件 - :return: - """ - case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件 - for case_path in case_path_list: - logger.info(f"加载文件:{case_path}") + def run(cls, testcase_dir: Union[Path, str] = cases_dir): + for fp in CaseFinder(testcase_dir).find_testcases(): + print(fp.name) + case = CaseGenerator(fp).generate_testcases() + print(f"{case=}") + for i in case: + print(f"{i=}") + CaseRegister(cls).register_test_func(i) + # @classmethod + # def find_test_cases(cls, case_dir: Path = cases_dir): + # """ + # 搜索和加载yaml文件 + # :return: + # """ + # case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件 + # for case_path in case_path_list: + # logger.info(f"加载文件:{case_path}") + # + # file = FileHandle(case_path) # 自动读取yaml文件 + # try: + # CaseInfo(**file) # 校验用例格式 + # logger.info(f"case_info:{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志 + # case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式 + # # print(case_path.stem) + # setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中 + # except Exception as e: + # logger.error(e) + # + # @classmethod + # def new_case(cls, file_name, case_info: dict): + # test_case = data_driver.DataDriver().generate_cases(file_name, case_info) + # + # keys_list = list(test_case.keys()) + # logger.info(f"keys_list:{keys_list}") + # + # values_list = list(test_case.values()) + # logger.info(f"测试用例列表:{values_list}") + # + # driver_title = [i.get("title") for i in values_list] + # logger.info(f"driver_title={driver_title}") + # + # epic = case_info["epic"] if case_info["epic"] else settings.allure_epic + # logger.info(f"epic:{epic}") + # + # feature = case_info["feature"] if case_info["feature"] else settings.allure_feature + # logger.info(f"feature:{feature}") + # + # story = case_info["story"] if case_info["story"] else settings.allure_story + # logger.info(f"story:{story}") + # + # @allure.epic(epic) + # @allure.feature(feature) + # @allure.story(story) + # @pytest.mark.parametrize("case_key", keys_list, ids=driver_title) + # def test_func(self, case_key): + # logger.info(f"case_key:{case_key}") + # + # test_case_mapping = test_case.get(case_key) + # logger.info(f"测试用例:{test_case_mapping}") + # + # allure.dynamic.title(test_case_mapping.get("title")) + # + # logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "=")) + # + # # 0,变量替换 + # new_case_info = exchanger.replace(test_case_mapping) + # logger.info(f"1,正在注入变量...") + # logger.info(f"new_case_info:{new_case_info}") + # # 1,发送请求 + # logger.info(f"2,正在请求接口...") + # resp = session.request(**new_case_info.get("request")) + # + # logger.info(f"3,正在提取变量...") + # # 2,保存变量(接口关联) + # for var_name, extract_info in new_case_info.get("extract").items(): + # logger.info(f"保存变量:{var_name}{extract_info}") + # exchanger.extract(resp, var_name, *extract_info) + # # 3,断言 + # logger.info(f"4,正在断言...") + # assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量 + # logger.info(f"替换变量后:{assert_case_info}") + # # assert_case_info.assert_all() # 执行断言 + # _validator = case_validator.CaseValidator() + # _validator.assert_all(assert_case_info.get("validate")) + # + # logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "=")) + # + # return test_func - file = FileHandle(case_path) # 自动读取yaml文件 - try: - CaseInfo(**file) # 校验用例格式 - logger.info(f"case_info:{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志 - case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式 - # print(case_path.stem) - setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中 - except Exception as e: - logger.error(e) - @classmethod - def new_case(cls, file_name, case_info: dict): - test_case = data_driver.DataDriver().generate_cases(file_name, case_info) +class CaseFinder: + find_suffix: str = settings.test_suffix - keys_list = list(test_case.keys()) - logger.info(f"keys_list:{keys_list}") + def __init__(self, testcase_dir: Union[str, Path]): + if Path(testcase_dir).is_dir(): + self.testcase_dir: Path = Path(testcase_dir) + else: + raise FileNotFoundError("不是有效的目录") - values_list = list(test_case.values()) - logger.info(f"测试用例列表:{values_list}") + def find_testcases(self) -> Generator[Path, None, None]: + testcase_files = self.testcase_dir.glob(f"**/test_*.{self.find_suffix}") + for fp in testcase_files: + logger.info(f"加载文件:{fp}") + yield fp - driver_title = [i.get("title") for i in values_list] - logger.info(f"driver_title={driver_title}") - epic = case_info["epic"] if case_info["epic"] else settings.allure_epic - logger.info(f"epic:{epic}") +class CaseGenerator: - feature = case_info["feature"] if case_info["feature"] else settings.allure_feature - logger.info(f"feature:{feature}") + def __init__(self, fp: Union[str, Path]): + self.fp: Path = Path(fp) - story = case_info["story"] if case_info["story"] else settings.allure_story - logger.info(f"story:{story}") + def generate_testcases(self) -> Generator[dict, None, None]: + file_name = self.fp.stem - @allure.epic(epic) - @allure.feature(feature) - @allure.story(story) - @pytest.mark.parametrize("case_key", keys_list, ids=driver_title) - def test_func(self, case_key): - logger.info(f"case_key:{case_key}") + case_info_ = get_processor_class(self.fp).load() # 自动读取yaml文件 + case_info = TestCaseHandle.new(case_info_) - test_case_mapping = test_case.get(case_key) - logger.info(f"测试用例:{test_case_mapping}") + if not case_info.parametrize: + yield {file_name + "__": asdict(case_info)} + else: + cases = {} + args_names = case_info.parametrize[0] + for i, args_values in enumerate(case_info.parametrize[1:]): + # print(args_values) + context = dict(zip(args_names, args_values)) + print(context) + # rendered = Template(FileHandle.to_string(case_info)).render(context) + rendered = Template(case_info.to_string()).render(context) + # cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)}) + cases.update({file_name + "_" + str(i): case_info.to_dict(rendered)}) - allure.dynamic.title(test_case_mapping.get("title")) + yield cases - logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "=")) - # 0,变量替换 - new_case_info = exchanger.replace(test_case_mapping) - logger.info(f"1,正在注入变量...") - logger.info(f"new_case_info:{new_case_info}") - # 1,发送请求 - logger.info(f"2,正在请求接口...") - resp = session.request(**new_case_info.get("request")) +class CaseRegister: + def __init__(self, register: Type["TestAPI"]): + self.register: Type["TestAPI"] = register - logger.info(f"3,正在提取变量...") - # 2,保存变量(接口关联) - for var_name, extract_info in new_case_info.get("extract").items(): - logger.info(f"保存变量:{var_name}{extract_info}") - exchanger.extract(resp, var_name, *extract_info) - # 3,断言 - logger.info(f"4,正在断言...") - assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量 - logger.info(f"替换变量后:{assert_case_info}") - # assert_case_info.assert_all() # 执行断言 - _validator = case_validator.CaseValidator() - _validator.assert_all(assert_case_info.get("validate")) + def register_test_func(self, case: dict): + for test_filed_name, case_info in case.items(): + epic = case_info["epic"] if case_info["epic"] else settings.allure_epic + logger.info(f"epic:{epic}") - logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "=")) + feature = case_info["feature"] if case_info["feature"] else settings.allure_feature + logger.info(f"feature:{feature}") - return test_func + story = case_info["story"] if case_info["story"] else settings.allure_story + logger.info(f"story:{story}") + + @allure.epic(epic) + @allure.feature(feature) + @allure.story(story) + def register_func(instance, testcase=case_info): + # allure.dynamic.epic(epic) + # allure.dynamic.feature(feature) + # allure.dynamic.story(story) + allure.dynamic.title(testcase.get("title")) + logger.info(f"用例开始执行:{testcase.get('title')}".center(80, "=")) + + # 0,变量替换 + new_case_info = exchanger.replace(testcase) + logger.info(f"1,正在注入变量...") + logger.info(f"new_case_info:{new_case_info}") + # 1,发送请求 + logger.info(f"2,正在请求接口...") + resp = session.request(**new_case_info.get("request")) + + logger.info(f"3,正在提取变量...") + # 2,保存变量(接口关联) + for var_name, extract_info in new_case_info.get("extract").items(): + logger.info(f"保存变量:{var_name}{extract_info}") + exchanger.extract(resp, var_name, *extract_info) + # 3,断言 + logger.info(f"4,正在断言...") + assert_case_info = exchanger.replace(testcase) # 为断言加载变量 + logger.info(f"替换变量后:{assert_case_info}") + # assert_case_info.assert_all() # 执行断言 + _validator = case_validator.CaseValidator() + _validator.assert_all(assert_case_info.get("validate")) + + logger.info(f"用例执行结束:{testcase.get('title')}".center(80, "=")) + + # return test_func + + setattr(self.register, test_filed_name, register_func) # 把pytest格式添加到类中 # TestAPI.find_yaml_case() if __name__ == '__main__': - TestAPI.find_test_cases() - # print(TestAPI.__dict__) + TestAPI.run(cases_dir) + print(TestAPI.__dict__) diff --git a/commons/databases.py b/commons/databases.py index fe65d82..dc3474f 100644 --- a/commons/databases.py +++ b/commons/databases.py @@ -33,13 +33,13 @@ class DBServer: db = DBServer( host=settings.db_host, # ip - port=3306, # 端口 - user='root', # 用户名 - password='mysql_hNahSe', # 密码 - database='answer' # 库名 + port=settings.db_port, # 端口 + user=settings.db_user, # 用户名 + password=settings.db_password, # 密码 + database=settings.db_database # 库名 ) if __name__ == '__main__': ... - res = db.execute_sql('select username from user where id=1;') - print(res[0]) + # res = db.execute_sql('select username from user where id=1;') + # print(res[0]) diff --git a/commons/exchange.py b/commons/exchange.py index eb5dde7..62dc88b 100644 --- a/commons/exchange.py +++ b/commons/exchange.py @@ -18,17 +18,18 @@ import jsonpath import allure from commons.templates import Template -from commons.file_processors.file_handle import FileHandle +from commons.file_processors.processor_factory import get_processor_class +from tests.b import TestCaseHandle logger = logging.getLogger(__name__) class Exchange: def __init__(self, path): - self.file = FileHandle(path) + self.file = get_processor_class(path) @allure.step("提取变量") - def extract(self, resp, var_name, attr, expr: str, index): + def extract(self, resp, var_name, attr, expr: str, index) -> None: resp = copy.deepcopy(resp) @@ -53,21 +54,22 @@ class Exchange: value = "not data" logger.debug(f"{var_name} = {value}") # 记录变量名和变量值 - - self.file[var_name] = value # 保存变量 - self.file.save() # 持久化存储到文件 + data = self.file.load() + data[var_name] = value # 保存变量 + self.file.save(data) # 持久化存储到文件 @allure.step("替换变量") def replace(self, case_info: dict) -> dict: logger.info(f"变量替换:{case_info}") # 1,将case_info转换为字符串 - case_info_str = FileHandle.to_string(case_info) + data = TestCaseHandle(**case_info) + case_info_str = data.to_string() print(f"{case_info_str=}") # 2,替换字符串 - case_info_str = Template(case_info_str).render(self.file) + case_info_str = Template(case_info_str).render(self.file.load()) print(f"{case_info_str=}") # 3,将字符串转换成case_info - new_case_info = FileHandle.to_dict(case_info_str) + new_case_info = data.to_dict(case_info_str) return new_case_info diff --git a/commons/file_processors/__init__.py b/commons/file_processors/__init__.py index b525101..9aa0097 100644 --- a/commons/file_processors/__init__.py +++ b/commons/file_processors/__init__.py @@ -9,3 +9,14 @@ @date: 2025/3/4 17:23 @desc: """ +from .base_processor import BaseFileProcessor +from .json_processor import JsonProcessor +from .yaml_processor import YamlProcessor +from .processor_factory import get_processor_class + +__all__ = [ + "BaseFileProcessor", + "JsonProcessor", + "YamlProcessor", + "get_processor_class", +] \ No newline at end of file diff --git a/commons/file_processors/base.py b/commons/file_processors/base_processor.py similarity index 54% rename from commons/file_processors/base.py rename to commons/file_processors/base_processor.py index 28e9583..af8e4f4 100644 --- a/commons/file_processors/base.py +++ b/commons/file_processors/base_processor.py @@ -10,6 +10,8 @@ @desc: """ import abc +from pathlib import Path +from typing import Union class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类 @@ -17,25 +19,16 @@ class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类 文件处理器的抽象基类。 定义了所有子类必须实现的方法。 """ + def __init__(self, filepath: Union[str, Path], **kwargs): + + self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 @abc.abstractmethod - def load(self): + def load(self) -> dict: """加载.""" - pass - - @staticmethod - @abc.abstractmethod - def to_string(data: dict) -> str: - """将文件内容转换为字符串。""" - pass - - @staticmethod - @abc.abstractmethod - def to_dict(data: str) -> dict: - """将文件内容转换为字典。""" - pass + raise NotImplementedError @abc.abstractmethod - def save(self, new_filepath=None): + def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None: """将数据保存.""" - pass + raise NotImplementedError diff --git a/commons/file_processors/file_handle.py b/commons/file_processors/file_handle.py deleted file mode 100644 index e311b58..0000000 --- a/commons/file_processors/file_handle.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python -# coding=utf-8 - -""" -@author: CNWei -@Software: PyCharm -@contact: t6i888@163.com -@file: file_handle -@date: 2025/3/7 09:31 -@desc: -""" - -from commons.file_processors.yaml_processor import YamlProcessor -from commons.file_processors.json_processor import JsonProcessor - -processors = { - 'yaml': YamlProcessor, - 'yml': YamlProcessor, - 'json': JsonProcessor, - -} - - -def get_processor(ext): - agent_model = processors.get(ext, YamlProcessor) # 代理模式 - - return agent_model # 默认回退到 Yaml - - -FileHandle = get_processor("yaml") - -if __name__ == '__main__': - # 示例用法 - yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径 - yaml_file = FileHandle(yaml_path) - print(yaml_file) - print(type(yaml_file)) - file_string = FileHandle.to_string(yaml_file) - print(file_string) - file_dict = FileHandle.to_dict(file_string) - print(file_dict) diff --git a/commons/file_processors/json_processor.py b/commons/file_processors/json_processor.py index 521d89b..5c86f98 100644 --- a/commons/file_processors/json_processor.py +++ b/commons/file_processors/json_processor.py @@ -10,117 +10,77 @@ @desc: """ import logging -from typing import Union +from typing import Union, Any from pathlib import Path import json -from commons.file_processors.base import BaseFileProcessor +from commons.file_processors.base_processor import BaseFileProcessor logger = logging.getLogger(__name__) -class JsonProcessor(BaseFileProcessor, dict): +class JsonProcessor(BaseFileProcessor): """ - 用于处理 YAML 文件的类,继承自 dict。 - 提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能, - 并可以直接像字典一样访问 YAML 数据。 + 用于处理 JSON 文件的类。 + 提供了从文件加载 JSON 数据为字典,以及将字典保存为 JSON 文件的功能。 """ - def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None): + def __init__(self, filepath: Union[str, Path], **kwargs): """ - 初始化 YamlFile 对象。 + 初始化 JsonFile 对象。 Args: filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象). - data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。 - 如果不提供,则尝试从 filepath 加载数据。 """ - super().__init__() # 初始化父类 dict - self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 - if data is not None: - self.update(data) # 如果提供了初始数据,则更新字典 - else: - self.load() # 否则,尝试从文件加载 + super().__init__(filepath, **kwargs) + # self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 - def load(self) -> None: + def load(self) -> dict[str, Any]: """ - 从 YAML 文件加载数据并更新字典。 - 如果文件不存在或加载失败,则清空字典并记录警告/错误。 - """ - self.clear() # 清空现有数据 - if self.filepath.exists(): - try: - with open(self.filepath, "r", encoding="utf-8") as f: - loaded_data = json.load(f) or {} - self.update(loaded_data) # 使用加载的数据更新字典 - except json.JSONDecodeError as e: - logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}") - # 保持字典为空 (已在开头 clear) - else: - logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.") - # 保持字典为空 (已在开头 clear) - - @staticmethod - def to_string(data: dict) -> str: - """ - 将字典 (自身) 转换为 YAML 格式的字符串。 - - Returns: - YAML 格式的字符串。 + 从 Json 文件加载数据。 + :return: """ + if not self.filepath.exists(): + logger.warning(f"文件 {self.filepath} 不存在.") + raise FileNotFoundError(f"文件 {self.filepath} 不存在.") try: - return json.dumps( - dict(data), # 使用dict转换为标准的字典 - ensure_ascii=False, # 允许非ASCII字符 - # indent=4, # 美化输出,缩进4个空格 - sort_keys=False # 不排序键 - ) - except TypeError as e: - logger.error(f"将数据转换为 JSON 字符串时出错: {e}") - return "" - - @staticmethod - def to_dict(data: str) -> None: - """ - 将 YAML 格式的字符串转换为字典,并更新当前字典的内容. - - Args: - data: YAML 格式的字符串。 - """ - try: - loaded_data = json.loads(data) or {} + with open(self.filepath, "r", encoding="utf-8") as f: + loaded_data = json.load(f) + if not isinstance(loaded_data, dict): # 确保加载的是字典 + logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.") + raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.") return loaded_data except json.JSONDecodeError as e: - logger.error(f"将 JSON 字符串转换为字典时出错: {e}") + logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}") + raise e - def save(self, new_filepath: Union[str, Path, None] = None): + def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None: """ - 将字典数据 (自身) 保存到 YAML 文件。 + 将字典数据保存到 json 文件。 Args: - new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。 + :param data: + :param new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。 """ filepath = Path(new_filepath) if new_filepath else self.filepath - + filepath.parent.mkdir(parents=True, exist_ok=True) try: with open(filepath, "w", encoding="utf-8") as f: json.dump( - dict(self), # 使用dict转换为标准的字典 + data, f, ensure_ascii=False, # 允许非ASCII字符 - indent=4, # 美化输出,缩进4个空格 sort_keys=False # 不排序键 ) - except (TypeError, OSError) as e: + logger.info(f"数据已成功保存到 {filepath}") + except (TypeError, OSError, json.JSONDecodeError) as e: logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}") + raise e if __name__ == '__main__': # 示例用法 json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径 json_file = JsonProcessor(json_path) - print(json_file) + print(json_file.load()) print(type(json_file)) - json_string = JsonProcessor.to_string(json_file) - JsonProcessor.to_dict(json_string) - print(json_string) - json_file.save() + # json_file.save() diff --git a/commons/file_processors/processor_factory.py b/commons/file_processors/processor_factory.py new file mode 100644 index 0000000..1a03900 --- /dev/null +++ b/commons/file_processors/processor_factory.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: file_handle +@date: 2025/3/7 09:31 +@desc: +""" +from pathlib import Path +from typing import Type, Union + +from commons.file_processors.base_processor import BaseFileProcessor +from commons.file_processors.yaml_processor import YamlProcessor +from commons.file_processors.json_processor import JsonProcessor + +# 类型别名,表示处理器类的字典 +ProcessorMap = dict[str, Type[BaseFileProcessor]] +processors: ProcessorMap = { + 'yaml': YamlProcessor, + 'yml': YamlProcessor, + 'json': JsonProcessor, + +} + + +class UnsupportedFileTypeError(Exception): + """当文件类型不被支持时抛出此异常。""" + pass + + +# def get_processor_class(file_suffix: str = "yaml") -> Type[BaseFileProcessor]: +def get_processor_class(fp: Union[Path, str]) -> 'BaseFileProcessor': + fp = Path(fp) + if fp.is_file(): + file_suffix = fp.suffix[1:] + processor_class = processors.get(file_suffix.lower(), YamlProcessor) # 代理模式 + + return processor_class(fp) # 默认回退到 Yaml + else: + raise UnsupportedFileTypeError(fp) + + +# FileHandle = get_processor("yaml") + +if __name__ == '__main__': + # 示例用法 + yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径 + # yaml_file = FileHandle(yaml_path) + # print(yaml_file.load()) + # print(type(yaml_file)) + # file_suffix = Path(yaml_path).suffix[1:] + # print(file_suffix) + get_processor = get_processor_class(yaml_path) + print(get_processor.load()) diff --git a/commons/file_processors/yaml_processor.py b/commons/file_processors/yaml_processor.py index 8dcb80a..7b64bdc 100644 --- a/commons/file_processors/yaml_processor.py +++ b/commons/file_processors/yaml_processor.py @@ -11,108 +11,77 @@ """ import logging from typing import Union -from dataclasses import dataclass, asdict, field from pathlib import Path import yaml -from commons.file_processors.base import BaseFileProcessor +from commons.file_processors.base_processor import BaseFileProcessor logger = logging.getLogger(__name__) -class YamlProcessor(BaseFileProcessor, dict): +class YamlProcessor(BaseFileProcessor): """ 用于处理 YAML 文件的类,继承自 dict。 提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能, 并可以直接像字典一样访问 YAML 数据。 """ - def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None): + def __init__(self, filepath: Union[str, Path], **kwargs): """ 初始化 YamlFile 对象。 - Args: - filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象). + Args: filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象). data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。 如果不提供,则尝试从 filepath 加载数据。 """ - super().__init__() # 初始化父类 dict - self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 - if data is not None: - self.update(data) # 如果提供了初始数据,则更新字典 - else: - self.load() # 否则,尝试从文件加载 + super().__init__(filepath, **kwargs) + # self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 - def load(self) -> None: + def load(self) -> dict: """ - 从 YAML 文件加载数据并更新字典。 - 如果文件不存在或加载失败,则清空字典并记录警告/错误。 + 从 YAML 文件加载数据 + :return: """ - self.clear() # 清空现有数据 - if self.filepath.exists(): - try: - with open(self.filepath, "r", encoding="utf-8") as f: - loaded_data = yaml.safe_load(f) or {} - self.update(loaded_data) # 使用加载的数据更新字典 - except yaml.YAMLError as e: - logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}") - # 保持字典为空 (已在开头 clear) - else: - logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.") - # 保持字典为空 (已在开头 clear) + if not self.filepath.exists(): + logger.warning(f"文件 {self.filepath} 不存在.") + raise FileNotFoundError(f"文件 {self.filepath} 不存在.") - @staticmethod - def to_string(data: dict) -> str: - """ - 将字典 (自身) 转换为 YAML 格式的字符串。 - - Returns: - YAML 格式的字符串。 - """ try: - return yaml.safe_dump( - dict(data), # 使用dict转换为标准的字典 - allow_unicode=True, - sort_keys=False, - default_flow_style=False - ) - except TypeError as e: - logger.error(f"将数据转换为 YAML 字符串时出错: {e}") - return "" - - @staticmethod - def to_dict(data: str) -> Union[None, dict]: - """ - 将 YAML 格式的字符串转换为字典,并更新当前字典的内容. - - Args: - data: YAML 格式的字符串。 - """ - try: - loaded_data = yaml.safe_load(data) or {} + with open(self.filepath, "r", encoding="utf-8") as f: + loaded_data = yaml.safe_load(f) + if not isinstance(loaded_data, dict): # 确保加载的是字典 + logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.") + raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.") return loaded_data + except yaml.YAMLError as e: - logger.error(f"将 YAML 字符串转换为字典时出错: {e}") + logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}") + raise e - def save(self, new_filepath: Union[str, Path, None] = None): + def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None: """ - 将字典数据 (自身) 保存到 YAML 文件。 + 将字典数据保存到 YAML 文件。 + :param data: + :param new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。 - Args: - new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。 """ filepath = Path(new_filepath) if new_filepath else self.filepath + # 确保目标目录存在 + filepath.parent.mkdir(parents=True, exist_ok=True) + try: with open(filepath, "w", encoding="utf-8") as f: yaml.safe_dump( - dict(self), # 使用dict转换为标准的字典 + data, stream=f, allow_unicode=True, sort_keys=False, default_flow_style=False ) - except (TypeError, OSError) as e: + logger.info(f"数据已成功保存到 {filepath}") + except (TypeError, OSError, yaml.YAMLError) as e: logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}") + raise e @@ -122,7 +91,7 @@ if __name__ == '__main__': # 示例用法 yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径 yaml_file = YamlProcessor(yaml_path) - print(yaml_file) + print(yaml_file.load()) print(type(yaml_file)) # # 直接像字典一样访问数据 diff --git a/commons/funcs.py b/commons/funcs.py index b6913e2..36827bd 100644 --- a/commons/funcs.py +++ b/commons/funcs.py @@ -17,7 +17,7 @@ import hashlib from commons.databases import db -from commons.file_processors.file_handle import FileHandle +from commons.file_processors.processor_factory import get_processor_class from commons import settings logger = logging.getLogger(__name__) @@ -72,19 +72,21 @@ def sql(s: str) -> str: @Funcs.register("new_id") def new_id(): # 自增,永不重复 - id_file = FileHandle(settings.id_path) - id_file["id"] += 1 - id_file.save() + id_file = get_processor_class(settings.id_path) + data = id_file.load() + data["id"] += 1 + id_file.save(data) - return id_file["id"] + return data["id"] @Funcs.register("last_id") def last_id() -> str: # 不自增,只返回结果 - id_file = FileHandle(settings.id_path) - return id_file["id"] + id_file = get_processor_class(settings.id_path) + data = id_file.load() + return data["id"] @Funcs.register("md5") @@ -127,10 +129,13 @@ def rsa_encode(content: str) -> str: @Funcs.register("rsa_decode") def rsa_decode(content: str) -> str: ... + + @Funcs.register() def func_name_test(): ... + if __name__ == '__main__': # res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82") # print(res) diff --git a/commons/models.py b/commons/models.py index c697f5f..f31dba0 100644 --- a/commons/models.py +++ b/commons/models.py @@ -10,6 +10,7 @@ @desc: 声明yaml用例格式 """ import logging +from typing import Union, Optional from dataclasses import dataclass, field import yaml @@ -20,20 +21,45 @@ logger = logging.getLogger(__name__) @dataclass -class CaseInfo: +class RequestModel: + method: str + url: str + headers: Optional[dict] = None + # body: Optional[Union[dict, str]] = None + params: Optional[Union[dict, str]] = None + + +@dataclass +class TestCaseModel: title: str - request: dict + request: RequestModel extract: dict validate: dict parametrize: list = field(default_factory=list) - epic: str = settings.allure_epic - feature: str = settings.allure_feature - story: str = settings.allure_story + epic: str = field(default_factory=lambda: settings.allure_epic) + feature: str = field(default_factory=lambda: settings.allure_feature) + story: str = field(default_factory=lambda: settings.allure_story) + + def __post_init__(self): + # 必填字段非空校验 + if self.title is None: + raise ValueError("Title cannot be empty") + + # 校验RequestModel + if isinstance(self.request, dict): + try: + self.request = RequestModel(**self.request) # RequestModel 的 __post_init__ 会被调用 + except (TypeError, ValueError) as e: + raise ValueError(f"解析 'request' 字段失败: {e} (数据: {self.request})") from e + elif not isinstance(self.request, RequestModel): # 如果不是 dict 也不是 RequestModel + raise TypeError( + f"字段 'request' 必须是字典 (将在内部转换为 RequestModel) 或 RequestModel 实例, " + f"得到的是 {type(self.request).__name__}" + ) if __name__ == '__main__': with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f: data = yaml.safe_load(f) # print(data) - case_info = CaseInfo(**data) - + case_info = TestCaseModel(**data) diff --git a/commons/settings.py b/commons/settings.py index 8034566..3b55e0c 100644 --- a/commons/settings.py +++ b/commons/settings.py @@ -22,6 +22,8 @@ cases_dir = rf"{root_path}\TestCases\answer" exchanger = rf"{root_path}\extract.yaml" id_path = rf"{root_path}\id.yaml" +test_suffix = "yaml" + db_host = os.getenv("DB_HOST") # ip db_port = os.getenv("DB_PORT") # 端口 db_user = os.getenv("DB_USER") # 用户名 diff --git a/main.py b/main.py index b5310e0..9b1657f 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ import pytest from commons.cases import TestAPI -TestAPI.find_test_cases() # 加载yaml文件 +TestAPI.run() # 加载yaml文件 if __name__ == '__main__': now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') diff --git a/pyproject.toml b/pyproject.toml index e915c26..949a5c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ pymysql = "^1.1.1" pytest-result-log = "^1.2.2" allure-pytest = "^2.13.5" cryptography = "^44.0.2" +dotenv = "^0.9.9" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" diff --git a/pytest.ini b/pytest.ini index db1ffae..556fa84 100644 --- a/pytest.ini +++ b/pytest.ini @@ -3,7 +3,7 @@ addopts = -q --show-capture=no log_file = logs/pytest.log -log_file_level = info +log_file_level = debug log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s log_file_date_format = %m/%d/%Y %I:%M:%S %p