From 00791809df16e710d45869d524bdbb3d40414598 Mon Sep 17 00:00:00 2001 From: CNWei Date: Sat, 14 Mar 2026 11:45:52 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E9=87=8D=E6=9E=84=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E5=BC=95=E6=93=8E=E4=B8=BA=E4=B8=8A=E4=B8=8B=E6=96=87?= =?UTF-8?q?=E9=A9=B1=E5=8A=A8=E6=9E=B6=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 优化 WorkflowExecutor 与 Exchange支持 ExecutionEnv 资源注入。 - 实现 Session 级别连接复用与变量池内存镜像化,消除重复 I/O 开销。 - 引入 ChainMap 实现动态上下文切换,解决参数化变量与全局提取变量的优先级覆盖。 - 完善变量提取与断言逻辑,确保跨用例变量流转的可靠性。 --- conftest.py | 24 +++- core/context.py | 40 +++++++ core/creator.py | 170 +++++++++------------------ core/exchange.py | 74 ++++++------ core/executor.py | 96 +++++++-------- core/models.py | 126 ++++++-------------- data/extract.yaml | 3 - test_cases/answer/test_1_status.yaml | 2 +- test_cases/test_collector.py | 30 +++++ 9 files changed, 276 insertions(+), 289 deletions(-) create mode 100644 core/context.py create mode 100644 test_cases/test_collector.py diff --git a/conftest.py b/conftest.py index 3a3c4bd..5d48d96 100644 --- a/conftest.py +++ b/conftest.py @@ -11,13 +11,35 @@ import logging from core import settings from commons.files import YamlFile -from core.models import CaseInfo +from core.context import VariableStore, ExecutionEnv +from core.executor import WorkflowExecutor +from core.models import RawSchema from core.session import Session from core.exchange import Exchange from core.settings import EXTRACT_CACHE logger = logging.getLogger(__name__) +@pytest.fixture(scope="session") +def api_env(): + """ + 工业级资源调度器 + 1. 保持全局单 Session (连接池复用) + 2. 变量池 L2 内存镜像化 (减少 I/O) + """ + # Setup: 加载环境 + store = VariableStore(settings.DATA_DIR / "extract.yaml") + exchanger = Exchange(variable_cache=store.store) + session = Session(settings.base_url) + executor = WorkflowExecutor() + + env = ExecutionEnv(session, store, executor, exchanger) + + yield env # 注入到测试用例中 + + # Teardown: 统一持久化与清理 + store.persist() + session.close() @pytest.fixture(scope="session") def session(): """全局共享的 Session Fixture""" diff --git a/core/context.py b/core/context.py new file mode 100644 index 0000000..a64ca13 --- /dev/null +++ b/core/context.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei,ChenWei +@Software: PyCharm +@contact: t6g888@163.com +@file: context +@date: 2026/3/14 09:07 +@desc: +""" +from dataclasses import dataclass +from typing import Dict, Any +from pathlib import Path + +from core.exchange import Exchange +from core.session import Session +from commons.file_processors.yaml_processor import YamlProcessor + + +class VariableStore: + """内存变量仓库:负责 L2 缓存与磁盘的唯一交互""" + + def __init__(self, seed_file: Path): + self.seed_file = seed_file + self.processor = YamlProcessor(seed_file) + # 启动时仅加载一次 + self.store: Dict[str, Any] = self.processor.load() or {} + + def persist(self): + """测试结束时统一写盘""" + self.processor.save(self.store) + + +@dataclass +class ExecutionEnv: + """环境上下文:持有共享资源""" + session: Session + store: VariableStore + exchanger: "Exchange" diff --git a/core/creator.py b/core/creator.py index 19adfff..ec88ee5 100644 --- a/core/creator.py +++ b/core/creator.py @@ -16,25 +16,20 @@ from pathlib import Path from dataclasses import dataclass from core import settings from core.executor import WorkflowExecutor -from core.session import Session -from core.exchange import Exchange - from pydantic import ValidationError -from commons.file_processors.yaml_processor import YamlProcessor as FileHandle,YamlLoadError -from core.models import CaseInfo # 导入之前定义的 Pydantic 模型 +from commons.file_processors.yaml_processor import YamlProcessor as FileHandle, YamlLoadError +from core.models import RawSchema # 导入之前定义的 Pydantic 模型 from typing import Any, List, Type, Generator, Union logger = logging.getLogger(__name__) -# 初始化全局组件 -session = Session(settings.base_url) -exchanger = Exchange(settings.DATA_DIR / "extract.yaml") # 指向 data/extract.yaml -executor = WorkflowExecutor(session, exchanger) + + @dataclass class CaseEntity: """用例执行实体:解耦模型数据与执行上下文""" - step_data: CaseInfo + step_data: RawSchema row_context: dict[str, Any] @@ -53,7 +48,7 @@ class CaseDataLoader: """ @staticmethod - def fetch_yaml_files(cases_dir: str) -> Generator[Path, None, None]: + def fetch_yaml_files(cases_dir: Union[str, Path]) -> Generator[Path, None, None]: """扫描目录并迭代返回 (文件路径, 原始内容)""" base_path = Path(cases_dir) if not base_path.exists(): @@ -68,7 +63,7 @@ class CaseDataLoader: 加载单个 YAML 文件并转化为 CaseInfo 列表 包含参数化数据的自动拆解逻辑 """ - entities = [] + entities: List[CaseEntity] = [] try: # 1. 使用重构后的 YamlProcessor 加载原始字典 processor = FileHandle(file_path) @@ -76,67 +71,45 @@ class CaseDataLoader: if not raw_data: return [] - # 1. 提取参数化数据 - parametrize_data = raw_data.pop("parametrize", None) - # 2. 实例化唯一的模板对象 (Pydantic 校验) - # 此时占位符 ${var} 会被 SmartInt/SmartDict 校验器放行 - template_case = CaseInfo(**raw_data) - - # 2. 检查是否存在参数化字段 - if parametrize_data and isinstance(parametrize_data, list) and len(parametrize_data) >= 2: - # 3. 参数化拆分 - headers = parametrize_data[0] - for row in parametrize_data[1:]: - row_map = dict(zip(headers, row)) - # 包装为实体,存入引用而非副本 - entities.append(CaseEntity(step_data=template_case, row_context=row_map)) - else: - # 普通用例,上下文为空 - entities.append(CaseEntity(step_data=template_case, row_context={})) + entities = cls._parse_parametrize(raw_data) except YamlLoadError: # YamlProcessor 已经记录了 error 日志,这里直接跳过 pass except ValidationError as e: - logger.error(f"用例格式校验失败 [{file_path.name}]:\n{e.json()}") + logger.error(f"用例基础格式校验失败 [{file_path.name}]:\n{e.json()}") except Exception as e: logger.error(f"加载用例发生未知异常 [{file_path.name}]: {e}") return entities @staticmethod - def _parse_parametrize(raw_data: dict[str, Any]) -> List[CaseInfo]: + def _parse_parametrize(raw_data: dict[str, Any]) -> List[CaseEntity]: """ 解析参数化逻辑:将 raw_data 中的 parametrize 展开为多个 CaseInfo 实例 """ - param_content = raw_data.pop("parametrize") - if len(param_content) < 2: - logger.warning(f"参数化数据不足(需包含 Header 和至少一行 Data): {raw_data.get('title')}") - return [CaseInfo(**raw_data)] + entities = [] + parametrize_data = raw_data.pop("parametrize", None) - # 第一行作为变量名 (Headers),后续作为数据行 - headers = param_content[0] - data_rows = param_content[1:] + # 2. 实例化唯一的模板对象 (Pydantic 校验) + template_case = RawSchema.model_validate(raw_data) + # template_case = CaseTemplate(**raw_data) - case_list = [] - for row in data_rows: - # 将变量名和对应行数据打包成字典,例如 {"username": "user1", "title": "测试1"} - row_map = dict(zip(headers, row)) + # 2. 检查是否存在参数化字段 + if parametrize_data and isinstance(parametrize_data, list) and len(parametrize_data) >= 2: + # 3. 参数化拆分 + headers = parametrize_data[0] + for row in parametrize_data[1:]: + row_map = dict(zip(headers, row)) + # 包装为实体,存入引用而非副本 + # 修正: 使用 model_copy() 避免多个用例共享同一个 Pydantic 模型实例,防止意外修改 + entities.append(CaseEntity(step_data=template_case.model_copy(), row_context=row_map)) + else: + # 普通用例,上下文为空 + entities.append(CaseEntity(step_data=template_case.model_copy(), row_context={})) - # 深拷贝原始模板,避免多行数据互相干扰 - case_tmp = raw_data.copy() - - # 关键优化:如果参数化里包含 'title',自动更新顶层的 title 字段 - # 注意:此处仅做初步合并,更复杂的 ${var} 替换由 WorkflowExecutor 的 Exchanger 完成 - if "title" in row_map: - case_tmp["title"] = row_map["title"] - - # 将当前行的数据注入到 CaseInfo 中(此处可以暂存在字段中,或由执行器处理) - # 为了保持模型兼容,我们把 row_map 的信息合入 case_tmp - case_list.append(CaseInfo(**case_tmp)) - - return case_list + return entities @classmethod def get_all_cases(cls, cases_dir: Union[str, Path]) -> List[CaseEntity]: @@ -149,19 +122,6 @@ class CaseDataLoader: all_cases.extend(cls.load_cases(file)) return all_cases - # @staticmethod - # def parse_parametrize(raw_data: Dict, default_name: str) -> Tuple[List[str], List[List[Any]], List[str]]: - # """解析参数化结构,返回 (字段名列表, 数据值列表, ID列表)""" - # if "parametrize" in raw_data: - # fields = raw_data["parametrize"][0] - # values = raw_data["parametrize"][1:] - # ids = [f"{v[0]}" for v in values] - # else: - # fields = ["case_data"] - # values = [[raw_data]] - # ids = [raw_data.get("title", default_name)] - # return fields, values, ids - class CaseGenerator: """ @@ -170,67 +130,43 @@ class CaseGenerator: """ @classmethod - def build_and_register(cls, target_cls: Type[TestTemplateBase], cases_dir: str): + def build_and_register(cls, target_cls: Type[TestTemplateBase], cases_dir: Union[str, Path]): # 1. 通过 Loader 获取数据 - all_cases=CaseDataLoader.get_all_cases(cases_dir) + all_cases = CaseDataLoader.get_all_cases(cases_dir) for index, case_info in enumerate(all_cases): - dynamic_test_method=cls._create_case_method(case_info) - # for file_path, raw_data in CaseDataLoader.get_all_cases(cases_dir): - # # 2. 解析参数化信息 - # fields, values, ids = CaseDataLoader.parse_parametrize(raw_data, file_path.stem) - # - # # 3. 生成执行函数 (闭包) - # dynamic_test_method = cls._create_case_method(raw_data, fields, values, ids) - # - # # 4. 挂载 - # method_name = f"test_{file_path.stem}" case_title = case_info.row_context.get("title") or case_info.step_data.title - method_name = f"test_case_{index}_{case_info.title[:20]}" - safe_name = "".join([c if c.isalnum() else "_" for c in method_name]) - # setattr(target_cls, method_name, dynamic_test_method) - setattr(target_cls, safe_name, dynamic_test_method) + dynamic_test_method = cls._create_case_method(title=case_title, entity=case_info) + + safe_title = "".join([c if c.isalnum() else "_" for c in case_title])[:50] + method_name = f"test_{index:03d}_{safe_title}" + print(method_name) + setattr(target_cls, method_name, dynamic_test_method) + print(target_cls.__dict__) logger.debug(f"Successfully registered: {method_name}") @staticmethod - # def _create_case_method(case_template: Dict, fields: List[str], values: List[Any], ids: List[str]): - def _create_case_method(entity: CaseEntity): + def _create_case_method(title, entity: CaseEntity): """封装具体的 pytest 执行节点""" case_template = entity.step_data context = entity.row_context - # 预取 Allure 层级信息 - # epic = case_template.get("epic", settings.allure_epic) - # feature = case_template.get("feature", settings.allure_feature) - # story = case_template.get("story", settings.allure_story) - epic = case_template.epic or settings.allure_epic - feature = case_template.feature or settings.allure_feature - story = case_template.story or settings.allure_story - - @allure.epic(epic) - @allure.feature(feature) - @allure.story(story) - # @pytest.mark.parametrize("case_args", values, ids=ids) - # def build_actual_case(instance: TestTemplateBase, case_args: List[Any]): - def build_actual_case(instance: TestTemplateBase): - # 数据组装 - # current_params = dict(zip(fields, case_args)) - # case_exec_data = {**case_template, **current_params} - # case_title = current_params.get("title", "未命名用例") - case_title = context.get("title") or case_template.title or "未命名用例" + def build_actual_case(instance: TestTemplateBase, api_env): + # --- 1. 动态设置 Allure 报告属性 --- + allure.dynamic.epic(case_template.epic or settings.allure_epic) + allure.dynamic.feature(case_template.feature or settings.allure_feature) + allure.dynamic.story(case_template.story or settings.allure_story) + allure.dynamic.title(title) # 日志记录 (利用 instance 标注来源) - logger.info(f"🚀 [Runner] Class: {instance.__class__.__name__} | Case: {case_title}") + logger.info(f"[Runner] Class: {instance.__class__.__name__} | Case: {title}") + try: + WorkflowExecutor.perform(case_template, api_env, context=context) - # 执行与断言 - allure.dynamic.title(case_title) - # executor.perform(case_exec_data) - executor.perform(case_template,context=context) - - # 手动链路装饰 (Allure) - # run_actual_case = allure.epic(epic)(run_actual_case) - # run_actual_case = allure.feature(feature)(run_actual_case) - # run_actual_case = allure.story(story)(run_actual_case) + except Exception as e: + # 可以在这里记录更详细的运行上下文快照 + logger.error(f"Case 执行失败: {title} | 错误: {e}") + raise return build_actual_case @@ -238,6 +174,6 @@ class CaseGenerator: if __name__ == '__main__': from settings import TEST_CASE_DIR - print(CaseDataLoader.get_all_cases(TEST_CASE_DIR)) + # print(CaseDataLoader.get_all_cases(TEST_CASE_DIR)) # --- 引导执行 --- - # CaseGenerator.build_and_register(TestTemplateBase, settings.TEST_CASE_DIR) + CaseGenerator.build_and_register(TestTemplateBase, settings.TEST_CASE_DIR) diff --git a/core/exchange.py b/core/exchange.py index eb0baf3..d2de678 100644 --- a/core/exchange.py +++ b/core/exchange.py @@ -12,8 +12,7 @@ from typing import Any, Union, TypeVar import jsonpath from lxml import etree - -from core.models import CaseInfo +from core.models import RawSchema from core.settings import EXTRACT_CACHE from core.templates import Template from commons.file_processors.yaml_processor import YamlProcessor @@ -25,16 +24,20 @@ T = TypeVar("T", bound=Union[dict, list, str, Any]) class Exchange: - def __init__(self, cache_path: str): - self.cache_path = cache_path - self.file_handler = YamlProcessor(filepath=self.cache_path) - # 1. 增加内存缓存,避免频繁磁盘 I/O - self._variable_cache = self.file_handler.load() or {} + def __init__(self, variable_cache: dict[str, Any]): + self._cache = variable_cache # 匹配标准变量 ${var},排除函数调用 ${func()} - # self.var_only_pattern = re.compile(r"\$\{([a-zA-Z_]\w*)}") self.var_only_pattern = re.compile(r"^\$\{([a-zA-Z_]\w*)}$") - def extract(self, resp, var_name: str, attr: str, expr: str, index: int = 0): + @property + def global_vars(self) -> dict: + return self._cache + + @global_vars.setter + def global_vars(self, global_vars: dict) -> None: + self._cache = global_vars + + def extract(self, resp: Any, var_name: str, attr: str, expr: str, index: int = 0): """ 从响应中提取数据并更新到缓存及文件 :param resp: Response 对象 @@ -63,8 +66,12 @@ class Exchange: res = jsonpath.jsonpath(target_data, expr) if res: value = res[index] elif expr.startswith("/") or expr.startswith("./"): # XPath 模式 + html_content = getattr(resp, "text", "") # 使用 getattr 防护 + if not html_content: + logger.warning("XPath 提取失败:响应文本为空") + return # 将文本解析为 HTML 树 - html_content = resp.text + # html_content = resp.text tree = etree.HTML(html_content) res = tree.xpath(expr) if res: @@ -79,8 +86,7 @@ class Exchange: logger.warning(f"变量 [{var_name}] 未通过表达式 [{expr}] 提取到数据") value = "not data" - self._variable_cache[var_name] = value - self.file_handler.save(self._variable_cache) + self._cache[var_name] = value logger.info(f"变量提取成功: {var_name} -> {value} (Type: {type(value).__name__})") except Exception as e: @@ -103,13 +109,13 @@ class Exchange: if full_match: var_name = full_match.group(1) - return self._variable_cache.get(var_name, content) + return self._cache.get(var_name, content) # B. 场景:混合文本或函数调用 # 例子:"Bearer ${token}" 或 "${gen_phone()}" if "${" in content: # 调用你提供的 Template 类 - return Template(content).render(self._variable_cache) + return Template(content).render(self._cache) return content @@ -129,13 +135,12 @@ class Exchange: if __name__ == "__main__": - from core.models import CaseInfo, RequestModel + from core.models import RawSchema, HttpAction - # 模拟外部写入一个初始变量 - with open(EXTRACT_CACHE, "w") as f: - f.write("existing_var: '100'\n") + file_handler = YamlProcessor(filepath=EXTRACT_CACHE) + variable_cache_ = file_handler.load() or {} - ex = Exchange(EXTRACT_CACHE) + ex = Exchange(variable_cache_) # --- 场景 1: 变量提取验证 --- @@ -157,7 +162,7 @@ if __name__ == "__main__": # 定义一个复杂的 CaseInfo raw_case = { "title": "测试用例", - "request": { + "action": { "method": "POST", "url": "http://api.com/${token}", # 混合文本 -> 应转为 str "json_body": { @@ -170,20 +175,21 @@ if __name__ == "__main__": } print("\n>>> 执行替换...") - new_case = ex.replace(raw_case) - print(new_case) - new_case = CaseInfo(**new_case) - - # --- 校验结果 --- + new_case_one = ex.replace(raw_case) + print(new_case_one) + RawSchema(**new_case_one) + print(new_case_one.get("action")) + action = HttpAction(**new_case_one.get("action")) + print(action) + # # --- 校验结果 --- print("\n--- 验证结果 ---") - print(f"URL (混合文本): {new_case.request.url} | 类型: {type(new_case.request.url)}") - print(f"ID (类型保持): {new_case.request.json_body['id']} | 类型: {type(new_case.request.json_body['id'])}") - print(f"Timeout (自动转换): {new_case.request.timeout} | 类型: {type(new_case.request.timeout)}") + print(f"URL (混合文本): {action.url} | 类型: {type(action.url)}") + print(f"ID (类型保持): {action.json_body['id']} | 类型: {type(action.json_body['id'])}") + print(f"Timeout (自动转换): {action.timeout} | 类型: {type(action.timeout)}") + # # + assert isinstance(action.json_body['id'], int) + # # + assert action.url == "http://api.com/auth_123" + assert action.timeout == 100 - assert isinstance(new_case.request.json_body['id'], int) - - assert new_case.request.url == "http://api.com/auth_123" - assert new_case.request.timeout == 100 - - # if os.path.exists(cache_path): os.remove(cache_path) print("\nExchange 场景全部验证通过!") diff --git a/core/executor.py b/core/executor.py index 27d27b2..0c75d4c 100644 --- a/core/executor.py +++ b/core/executor.py @@ -8,11 +8,12 @@ import logging import importlib from typing import Any, List, Optional - +from collections import ChainMap from pydantic import TypeAdapter from core import settings -from core.models import CaseInfo, ValidateItem, RequestModel, ApiActionModel +from core.context import ExecutionEnv +from core.models import RawSchema, ValidateItem, HttpAction, ApiActionModel from core.session import Session from core.exchange import Exchange from utils.case_validator import CaseValidator @@ -24,61 +25,54 @@ VALIDATE_LIST_ADAPTER = TypeAdapter(List[ValidateItem]) class WorkflowExecutor: - def __init__(self, session: Session, exchanger: Exchange): - self.session = session - self.exchanger = exchanger - def perform(self, case_info: CaseInfo,context: Optional[dict[str, Any]] = None) -> Any: + @classmethod + def perform(cls, case_info: RawSchema, env: ExecutionEnv, context: Optional[dict[str, Any]] = None) -> Any: """执行单个用例:支持直接请求和PO模式调用""" context = context or {} + # --- 重点 1:备份并切换上下文 --- + # 保存 Exchange 当前的全局字典引用 + original_cache = env.exchanger.global_vars + # 1. 建立优先级变量池 (参数化变量 > 全局提取变量) + # ChainMap 是实现“局部覆盖全局”性能最好的方案 + combined_vars = ChainMap(context, original_cache) + # 将 Exchange 的内部缓存临时指向这个合并池 + env.exchanger.global_vars = combined_vars - # 1. 局部变量优先级注入 - # 备份全局缓存,将当前行数据合并进去 - old_cache = self.exchanger._variable_cache.copy() - self.exchanger._variable_cache.update(context) - + resp = None # 初始化 resp,避免异常时引用未定义 try: # 2. 动态更新标题(如果 context 中包含 title) current_title = context.get("title") or case_info.title logger.info(f"🚀 执行用例: {current_title}") - # raw_data = case_info.model_dump(by_alias=True, exclude_none=True) - # 1. 变量替换(将 ${var} 替换为真实值) - # rendered_dict = self.exchanger.replace(raw_data) - # rendered_case = CaseInfo.model_validate(rendered_dict) + raw_action_dict = case_info.action.model_dump(by_alias=True, exclude_none=True) + rendered_action_dict = env.exchanger.replace(raw_action_dict) + # --- 2. 决定执行模式 --- if case_info.is_po_mode(): - # PO 模式:仅渲染 api_action - action_dict = case_info.api_action.model_dump(by_alias=True, exclude_none=True) - rendered_action_dict = self.exchanger.replace(action_dict) # 重新校验以修复类型(如 params 里的 int) rendered_action = ApiActionModel.model_validate(rendered_action_dict) # PO 模式:反射调用 - - resp = self._execute_po_method(action=rendered_action) + resp = cls._execute_po_method(rendered_action, env) else: # 接口模式:直接请求 - # 直接将 RequestModel 转为字典传给 session.request - request_kwargs = case_info.request.model_dump(by_alias=True, exclude_none=True) - rendered_req_dict = self.exchanger.replace(request_kwargs) - rendered_request = RequestModel.model_validate(rendered_req_dict) - + rendered_request = HttpAction.model_validate(rendered_action_dict) request_kwargs = rendered_request.model_dump(by_alias=True, exclude_none=True) - resp = self.session.request(**request_kwargs) + resp = env.session.request(**request_kwargs) - # --- 3. 后置处理 (提取 & 断言) --- - self._post_process(resp, case_info) + # --- 3. 后处理:提取与断言 --- + cls._post_process(resp, case_info, env, original_cache) return resp except Exception as e: logger.error(f"用例执行失败: {case_info.title} | 原因: {e}", exc_info=True) raise finally: - # 4. 关键:清理现场,还原全局变量池 - self.exchanger._variable_cache = old_cache + # 兜底确保环境还原 (尽管 try 块中已经还原了一次,这里确保异常情况下也复位) + env.exchanger.global_vars = original_cache - - def _execute_po_method(self, action: ApiActionModel): + @staticmethod + def _execute_po_method(action: ApiActionModel, env: ExecutionEnv): """核心反射逻辑:根据字符串动态加载 api/ 目录下的类并执行方法""" class_name = action.api_class method_name = action.method @@ -94,15 +88,13 @@ class WorkflowExecutor: try: # 1. 动态导入模块(假设都在 api 目录下) - # 例如 class_name 是 UserAPI,则尝试从 api.user 导入 - # 这里简单处理,你可以根据你的文件名约定进一步优化逻辑 - # module_name = f"api.{class_name.lower().replace('api', '')}" module = importlib.import_module(module_name) # 2. 获取类并实例化 cls = getattr(module, class_name) - api_instance = cls(self.session) # 传入 session 保持会话统一 + + api_instance = cls(env.session) # 传入 session 保持会话统一 # 3. 调用方法并返回结果 method = getattr(api_instance, method_name) @@ -118,21 +110,33 @@ class WorkflowExecutor: logger.error(f"反射调用失败: {class_name}.{method_name} -> {e}") raise - def _post_process(self, resp: Any, rendered_case: CaseInfo): - # 3. 提取变量 (接口关联) - if rendered_case.extract: - for var_name, extract_info in rendered_case.extract.items(): - self.exchanger.extract(resp, var_name, *extract_info) + @classmethod + def _post_process(cls, resp: Any, case_info: RawSchema, env: ExecutionEnv, original_cache: dict): + """ + 统一后处理逻辑:处理变量提取(写全局)和断言校验(读局部+全局) + """ + # 记录当前的混合上下文 (ChainMap),供断言使用 + combined_vars = env.exchanger.global_vars - # 4. 断言校验 - if rendered_case.validate_data: - # raw_validate_list = [i.model_dump(by_alias=True) for i in rendered_case.validate_data] + # 1. 变量提取 (Write Operation) + if case_info.extract: + try: + # 必须切回 original_cache 才能持久化写入到全局变量池 + env.exchanger.global_vars = original_cache + for var_name, extract_info in case_info.extract.items(): + env.exchanger.extract(resp, var_name, *extract_info) + finally: + # 提取完成后,切回 combined_vars,防止后续逻辑(如断言)丢失局部变量上下文 + env.exchanger.global_vars = combined_vars + + # 2. 断言校验 (Read Operation) + if case_info.validate_data: raw_validate_list = [ item.model_dump(by_alias=True) if isinstance(item, ValidateItem) else item - for item in rendered_case.validate_data + for item in case_info.validate_data ] - rendered_validate_list = self.exchanger.replace(raw_validate_list) + rendered_validate_list = env.exchanger.replace(raw_validate_list) # 重新通过 Adapter 触发类型修复 (str -> int) final_validate_data = VALIDATE_LIST_ADAPTER.validate_python(rendered_validate_list) CaseValidator.validate(resp, final_validate_data) diff --git a/core/models.py b/core/models.py index 5f64e91..cc3d376 100644 --- a/core/models.py +++ b/core/models.py @@ -10,127 +10,79 @@ @desc: 声明yaml用例格式 """ import logging -from typing import List, Any, Optional, Union, Annotated +from typing import List, Any -import yaml -from pydantic import BaseModel, Field, ConfigDict, model_validator, field_validator, AfterValidator - -from core import settings +from pydantic import BaseModel, Field, ConfigDict logger = logging.getLogger(__name__) -def smart_cast_int(v: Any) -> Any: - if isinstance(v, str) and v.startswith("${") and v.endswith("}"): - return v - try: - return int(v) - except (ValueError, TypeError): - return v - - -def smart_cast_dict(v: Any) -> Any: - """确保字典格式,若是占位符(字符串形式)则放行""" - if isinstance(v, str) and v.startswith("${") and v.endswith("}"): - return v - if isinstance(v, dict) or v is None: - return v - return v # 也可以根据需求抛出异常 - - -# 使用 Annotated 定义带校验的类型 -SmartInt = Annotated[Union[int, str], AfterValidator(smart_cast_int)] -SmartDict = Annotated[Union[dict[str, Any], str], AfterValidator(smart_cast_dict)] - - -# --- 基础请求模型 (用于第一种示例:直接请求) --- -class RequestModel(BaseModel): +class HttpAction(BaseModel): method: str = Field(..., description="HTTP 请求方法: get, post, etc.") url: str = Field(..., description="接口路径或完整 URL") - params: Optional[SmartDict] = None - data: Optional[SmartDict] = None - json_body: Optional[Any] = Field(None, alias="json") - headers: Optional[SmartDict] = None - cookies: Optional[dict[str, str]] = None - timeout: SmartInt = Field(default=10) - files: Optional[SmartDict] = None + headers: dict[str, Any] | None = Field(default=None, description="HTTP 请求头") + params: dict[str, Any] | None = Field(default=None, description="URL 查询参数") + data: dict[str, Any] | None = None + json_body: Any | None = Field(default=None, alias="json") + timeout: int = 10 + files: dict[str, Any] | None = None - model_config = ConfigDict(extra="allow", populate_by_name=True) # 允许扩展 requests 的其他参数 + model_config = ConfigDict(extra="allow", populate_by_name=True) -# --- PO 动作模型 (用于第二种示例:业务层反射调用) --- class ApiActionModel(BaseModel): - api_class: str = Field(..., alias="class", description="要调用的 API 类名") + module: str = Field(..., alias="class", description="要调用的 API 类名") method: str = Field(..., description="类中的方法名") - params: Optional[SmartDict] = Field(default_factory=dict, description="传给方法的参数") + params: dict[str, Any] = Field(default_factory=dict, description="传给方法的参数") model_config = ConfigDict(populate_by_name=True) -# --- 新增:断言条目模型 --- class ValidateItem(BaseModel): - check: Any = Field(..., description="要检查的字段或表达式") + check: str = Field(..., description="要检查的字段或表达式") + assert_method: str = Field(alias="assert", default="equals") expect: Any = Field(..., description="期望值") - assert_method: str = Field(default="equals", alias="assert", description="断言方法") - msg: Optional[str] = Field(default="Assertion", description="断言描述") + msg: str = Field(default="Assertion", description="断言描述") model_config = ConfigDict(populate_by_name=True) -# --- 核心用例数据模型 --- -class CaseInfo(BaseModel): - # 公共元数据 +class RawSchema(BaseModel): title: str = Field(..., description="用例标题") - epic: Optional[str] = None - feature: Optional[str] = None - story: Optional[str] = None - - # 核心逻辑分叉:可以是 request 对象,也可以是 api_action 对象 - # 根据 WorkflowExecutor 的逻辑,这里设为 Optional,但在具体校验时可以互斥 - request: Optional[RequestModel] = None - api_action: Optional[ApiActionModel] = None - - # 后置处理 - extract: Optional[dict[str, List[Any]]] = Field( + epic: str | None = None + feature: str | None = None + story: str | None = None + # 统一使用 action 字段承载业务逻辑 (Http 或 PO) + action: dict[str, Any] = Field(description="请求内容或PO动作内容") + extract: dict[str, List[Any]] | None = Field( default=None, description="变量提取表达式,格式: {变量名: [来源, 表达式, 索引]}" ) - validate_data: Optional[List[Union[ValidateItem, dict[str, Any]]]] = Field( + validate_data: List[Any] = Field( default_factory=list, alias="validate", description="断言信息" ) - # 参数化(在 DataLoader 阶段会被拆解,但在初始加载时需要定义) - parametrize: Optional[List[List[Any]]] = None - - model_config = ConfigDict( - populate_by_name=True, # 无论是在代码中用 api_class 还是在 YAML 中用 class 赋值,Pydantic 都能正确识别。 - arbitrary_types_allowed=True # 允许在模型中使用非 Pydantic 标准类型(如自定义类实例) - ) - - # 核心优化:增加互斥校验 - @model_validator(mode='after') - def check_action_type(self) -> 'CaseInfo': - if not self.request and not self.api_action: - raise ValueError("用例必须包含 'request' 或 'api_action' 其中之一") - if self.request and self.api_action: - raise ValueError("'request' 和 'api_action' 不能同时存在") - return self + model_config = ConfigDict(extra="allow", + populate_by_name=True, # 无论是在代码中用 api_class 还是在 YAML 中用 class 赋值,Pydantic 都能正确识别。 + arbitrary_types_allowed=True # 允许在模型中使用非 Pydantic 标准类型(如自定义类实例) + ) # 允许参数化等额外字段 def is_po_mode(self) -> bool: """判断是否为 PO 模式""" - return self.api_action is not None + return "class" in self.action or "module" in self.action if __name__ == '__main__': # 模拟数据 1:标准请求模式 raw_case_1 = { "title": "查询状态信息", - "request": { + "action": { "method": "get", "url": "/api/v1/info", - "headers": {"User-Agent": "pytest-ai"} + "headers": {"User-Agent": "pytest-ai"}, + "json": {"User-Agent": "pytest-ai"} }, "validate": [ {"check": "status_code", "assert": "equals", "expect": 200, "msg": "响应码200"}, @@ -141,7 +93,7 @@ if __name__ == '__main__': # 模拟数据 2:PO 模式 (反射调用) raw_case_2 = { "title": "用户登录测试", - "api_action": { + "action": { "class": "UserAPI", "method": "login", "params": {"user": "admin", "pwd": "123"} @@ -155,22 +107,22 @@ if __name__ == '__main__': try: # 验证模式 1 - case1 = CaseInfo(**raw_case_1) + case1 = RawSchema(**raw_case_1) print(f"✅ 模式1 (Request) 校验通过: {case1.title}") - print(f" 请求URL: {case1.request.url}") - print(f" 第一个断言方法: {case1.validate_data[0].assert_method}\n") + print(f" http: {case1.action}") + print(f" 断言规则数: {len(case1.validate_data)}\n") # 验证模式 2 - case2 = CaseInfo(**raw_case_2) + case2 = RawSchema(**raw_case_2) print(f"✅ 模式2 (PO Mode) 校验通过: {case2.title}") - print(f" 调用类: {case2.api_action.api_class}") + print(f" api: {case2.action}") print(f" 提取规则数: {len(case2.extract)}\n") # 验证非法数据(如:既没有 request 也没有 api_action 的情况可以在业务层进一步校验) # 这里演示 Pydantic 自动类型转换 - invalid_data = {"title": "错误用例", "request": {"url": "/api"}} # 缺少 method + invalid_data = {"title": "错误用例", "action": {"url": "/api"}} # 缺少 method print("--- 预期失败测试 ---") - CaseInfo(**invalid_data) + RawSchema(**invalid_data) except Exception as e: print(f"❌ 预期内的校验失败: \n{e}") diff --git a/data/extract.yaml b/data/extract.yaml index 3591633..503493e 100644 --- a/data/extract.yaml +++ b/data/extract.yaml @@ -1,4 +1 @@ existing_var: '100' -token: auth_123 -u_id: 888 -user_name: ChenWei diff --git a/test_cases/answer/test_1_status.yaml b/test_cases/answer/test_1_status.yaml index 7af0400..4085d03 100644 --- a/test_cases/answer/test_1_status.yaml +++ b/test_cases/answer/test_1_status.yaml @@ -2,7 +2,7 @@ feature: 页面状态 story: 状态 title: 查询状态信息 epic: 的点点滴滴 -request: +action: method: get url: /answer/api/v1/connector/info headers: diff --git a/test_cases/test_collector.py b/test_cases/test_collector.py new file mode 100644 index 0000000..53c166a --- /dev/null +++ b/test_cases/test_collector.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python +# coding=utf-8 + +import logging +from core import settings +from core.creator import CaseGenerator, TestTemplateBase + +logger = logging.getLogger(__name__) + + +class TestRunner(TestTemplateBase): + """ + 测试用例的动态容器 (Test Case Container)。 + 这是一个占位符类,CaseGenerator 会扫描所有的 YAML 用例文件, + 然后将每一个用例动态地生成为一个测试方法并挂载到这个类上。 + Pytest 最终会发现并执行这些动态挂载的 test_* 方法。 + """ + pass + + +try: + # --- 核心逻辑:动态生成测试用例 --- + # 当 Pytest 在“收集测试用例”阶段加载此模块时,下面的代码会立即执行。 + logger.info("--- [Collector] 开始扫描并动态生成测试用例 ---") + CaseGenerator.build_and_register(target_cls=TestRunner, cases_dir=settings.TEST_CASE_DIR) + logger.info(f"--- [Collector] 测试用例生成完毕,已成功加载到 {TestRunner.__name__} ---") +except Exception as e: + logger.critical(f"--- [Collector] 动态生成测试用例时发生致命错误,测试执行中止 ---", exc_info=True) + # 抛出异常,让 pytest 捕获并报告为收集错误 (Collection Error) + raise RuntimeError("测试用例收集失败,请检查日志中的详细错误信息。") from e \ No newline at end of file