refactor: 重构执行引擎为上下文驱动架构

- 优化 WorkflowExecutor 与 Exchange支持 ExecutionEnv 资源注入。
 - 实现 Session 级别连接复用与变量池内存镜像化,消除重复 I/O 开销。
 - 引入 ChainMap 实现动态上下文切换,解决参数化变量与全局提取变量的优先级覆盖。
 - 完善变量提取与断言逻辑,确保跨用例变量流转的可靠性。
This commit is contained in:
2026-03-14 11:45:52 +08:00
parent 2116016a0d
commit 00791809df
9 changed files with 276 additions and 289 deletions

View File

@@ -11,13 +11,35 @@ import logging
from core import settings
from commons.files import YamlFile
from core.models import CaseInfo
from core.context import VariableStore, ExecutionEnv
from core.executor import WorkflowExecutor
from core.models import RawSchema
from core.session import Session
from core.exchange import Exchange
from core.settings import EXTRACT_CACHE
logger = logging.getLogger(__name__)
@pytest.fixture(scope="session")
def api_env():
"""
工业级资源调度器
1. 保持全局单 Session (连接池复用)
2. 变量池 L2 内存镜像化 (减少 I/O)
"""
# Setup: 加载环境
store = VariableStore(settings.DATA_DIR / "extract.yaml")
exchanger = Exchange(variable_cache=store.store)
session = Session(settings.base_url)
executor = WorkflowExecutor()
env = ExecutionEnv(session, store, executor, exchanger)
yield env # 注入到测试用例中
# Teardown: 统一持久化与清理
store.persist()
session.close()
@pytest.fixture(scope="session")
def session():
"""全局共享的 Session Fixture"""

40
core/context.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com
@file: context
@date: 2026/3/14 09:07
@desc:
"""
from dataclasses import dataclass
from typing import Dict, Any
from pathlib import Path
from core.exchange import Exchange
from core.session import Session
from commons.file_processors.yaml_processor import YamlProcessor
class VariableStore:
"""内存变量仓库:负责 L2 缓存与磁盘的唯一交互"""
def __init__(self, seed_file: Path):
self.seed_file = seed_file
self.processor = YamlProcessor(seed_file)
# 启动时仅加载一次
self.store: Dict[str, Any] = self.processor.load() or {}
def persist(self):
"""测试结束时统一写盘"""
self.processor.save(self.store)
@dataclass
class ExecutionEnv:
"""环境上下文:持有共享资源"""
session: Session
store: VariableStore
exchanger: "Exchange"

View File

@@ -16,25 +16,20 @@ from pathlib import Path
from dataclasses import dataclass
from core import settings
from core.executor import WorkflowExecutor
from core.session import Session
from core.exchange import Exchange
from pydantic import ValidationError
from commons.file_processors.yaml_processor import YamlProcessor as FileHandle,YamlLoadError
from core.models import CaseInfo # 导入之前定义的 Pydantic 模型
from commons.file_processors.yaml_processor import YamlProcessor as FileHandle, YamlLoadError
from core.models import RawSchema # 导入之前定义的 Pydantic 模型
from typing import Any, List, Type, Generator, Union
logger = logging.getLogger(__name__)
# 初始化全局组件
session = Session(settings.base_url)
exchanger = Exchange(settings.DATA_DIR / "extract.yaml") # 指向 data/extract.yaml
executor = WorkflowExecutor(session, exchanger)
@dataclass
class CaseEntity:
"""用例执行实体:解耦模型数据与执行上下文"""
step_data: CaseInfo
step_data: RawSchema
row_context: dict[str, Any]
@@ -53,7 +48,7 @@ class CaseDataLoader:
"""
@staticmethod
def fetch_yaml_files(cases_dir: str) -> Generator[Path, None, None]:
def fetch_yaml_files(cases_dir: Union[str, Path]) -> Generator[Path, None, None]:
"""扫描目录并迭代返回 (文件路径, 原始内容)"""
base_path = Path(cases_dir)
if not base_path.exists():
@@ -68,7 +63,7 @@ class CaseDataLoader:
加载单个 YAML 文件并转化为 CaseInfo 列表
包含参数化数据的自动拆解逻辑
"""
entities = []
entities: List[CaseEntity] = []
try:
# 1. 使用重构后的 YamlProcessor 加载原始字典
processor = FileHandle(file_path)
@@ -76,67 +71,45 @@ class CaseDataLoader:
if not raw_data:
return []
# 1. 提取参数化数据
parametrize_data = raw_data.pop("parametrize", None)
# 2. 实例化唯一的模板对象 (Pydantic 校验)
# 此时占位符 ${var} 会被 SmartInt/SmartDict 校验器放行
template_case = CaseInfo(**raw_data)
# 2. 检查是否存在参数化字段
if parametrize_data and isinstance(parametrize_data, list) and len(parametrize_data) >= 2:
# 3. 参数化拆分
headers = parametrize_data[0]
for row in parametrize_data[1:]:
row_map = dict(zip(headers, row))
# 包装为实体,存入引用而非副本
entities.append(CaseEntity(step_data=template_case, row_context=row_map))
else:
# 普通用例,上下文为空
entities.append(CaseEntity(step_data=template_case, row_context={}))
entities = cls._parse_parametrize(raw_data)
except YamlLoadError:
# YamlProcessor 已经记录了 error 日志,这里直接跳过
pass
except ValidationError as e:
logger.error(f"用例格式校验失败 [{file_path.name}]:\n{e.json()}")
logger.error(f"用例基础格式校验失败 [{file_path.name}]:\n{e.json()}")
except Exception as e:
logger.error(f"加载用例发生未知异常 [{file_path.name}]: {e}")
return entities
@staticmethod
def _parse_parametrize(raw_data: dict[str, Any]) -> List[CaseInfo]:
def _parse_parametrize(raw_data: dict[str, Any]) -> List[CaseEntity]:
"""
解析参数化逻辑:将 raw_data 中的 parametrize 展开为多个 CaseInfo 实例
"""
param_content = raw_data.pop("parametrize")
if len(param_content) < 2:
logger.warning(f"参数化数据不足(需包含 Header 和至少一行 Data: {raw_data.get('title')}")
return [CaseInfo(**raw_data)]
entities = []
parametrize_data = raw_data.pop("parametrize", None)
# 第一行作为变量名 (Headers),后续作为数据行
headers = param_content[0]
data_rows = param_content[1:]
# 2. 实例化唯一的模板对象 (Pydantic 校验)
template_case = RawSchema.model_validate(raw_data)
# template_case = CaseTemplate(**raw_data)
case_list = []
for row in data_rows:
# 将变量名和对应行数据打包成字典,例如 {"username": "user1", "title": "测试1"}
row_map = dict(zip(headers, row))
# 2. 检查是否存在参数化字段
if parametrize_data and isinstance(parametrize_data, list) and len(parametrize_data) >= 2:
# 3. 参数化拆分
headers = parametrize_data[0]
for row in parametrize_data[1:]:
row_map = dict(zip(headers, row))
# 包装为实体,存入引用而非副本
# 修正: 使用 model_copy() 避免多个用例共享同一个 Pydantic 模型实例,防止意外修改
entities.append(CaseEntity(step_data=template_case.model_copy(), row_context=row_map))
else:
# 普通用例,上下文为空
entities.append(CaseEntity(step_data=template_case.model_copy(), row_context={}))
# 深拷贝原始模板,避免多行数据互相干扰
case_tmp = raw_data.copy()
# 关键优化:如果参数化里包含 'title',自动更新顶层的 title 字段
# 注意:此处仅做初步合并,更复杂的 ${var} 替换由 WorkflowExecutor 的 Exchanger 完成
if "title" in row_map:
case_tmp["title"] = row_map["title"]
# 将当前行的数据注入到 CaseInfo 中(此处可以暂存在字段中,或由执行器处理)
# 为了保持模型兼容,我们把 row_map 的信息合入 case_tmp
case_list.append(CaseInfo(**case_tmp))
return case_list
return entities
@classmethod
def get_all_cases(cls, cases_dir: Union[str, Path]) -> List[CaseEntity]:
@@ -149,19 +122,6 @@ class CaseDataLoader:
all_cases.extend(cls.load_cases(file))
return all_cases
# @staticmethod
# def parse_parametrize(raw_data: Dict, default_name: str) -> Tuple[List[str], List[List[Any]], List[str]]:
# """解析参数化结构,返回 (字段名列表, 数据值列表, ID列表)"""
# if "parametrize" in raw_data:
# fields = raw_data["parametrize"][0]
# values = raw_data["parametrize"][1:]
# ids = [f"{v[0]}" for v in values]
# else:
# fields = ["case_data"]
# values = [[raw_data]]
# ids = [raw_data.get("title", default_name)]
# return fields, values, ids
class CaseGenerator:
"""
@@ -170,67 +130,43 @@ class CaseGenerator:
"""
@classmethod
def build_and_register(cls, target_cls: Type[TestTemplateBase], cases_dir: str):
def build_and_register(cls, target_cls: Type[TestTemplateBase], cases_dir: Union[str, Path]):
# 1. 通过 Loader 获取数据
all_cases=CaseDataLoader.get_all_cases(cases_dir)
all_cases = CaseDataLoader.get_all_cases(cases_dir)
for index, case_info in enumerate(all_cases):
dynamic_test_method=cls._create_case_method(case_info)
# for file_path, raw_data in CaseDataLoader.get_all_cases(cases_dir):
# # 2. 解析参数化信息
# fields, values, ids = CaseDataLoader.parse_parametrize(raw_data, file_path.stem)
#
# # 3. 生成执行函数 (闭包)
# dynamic_test_method = cls._create_case_method(raw_data, fields, values, ids)
#
# # 4. 挂载
# method_name = f"test_{file_path.stem}"
case_title = case_info.row_context.get("title") or case_info.step_data.title
method_name = f"test_case_{index}_{case_info.title[:20]}"
safe_name = "".join([c if c.isalnum() else "_" for c in method_name])
# setattr(target_cls, method_name, dynamic_test_method)
setattr(target_cls, safe_name, dynamic_test_method)
dynamic_test_method = cls._create_case_method(title=case_title, entity=case_info)
safe_title = "".join([c if c.isalnum() else "_" for c in case_title])[:50]
method_name = f"test_{index:03d}_{safe_title}"
print(method_name)
setattr(target_cls, method_name, dynamic_test_method)
print(target_cls.__dict__)
logger.debug(f"Successfully registered: {method_name}")
@staticmethod
# def _create_case_method(case_template: Dict, fields: List[str], values: List[Any], ids: List[str]):
def _create_case_method(entity: CaseEntity):
def _create_case_method(title, entity: CaseEntity):
"""封装具体的 pytest 执行节点"""
case_template = entity.step_data
context = entity.row_context
# 预取 Allure 层级信息
# epic = case_template.get("epic", settings.allure_epic)
# feature = case_template.get("feature", settings.allure_feature)
# story = case_template.get("story", settings.allure_story)
epic = case_template.epic or settings.allure_epic
feature = case_template.feature or settings.allure_feature
story = case_template.story or settings.allure_story
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
# @pytest.mark.parametrize("case_args", values, ids=ids)
# def build_actual_case(instance: TestTemplateBase, case_args: List[Any]):
def build_actual_case(instance: TestTemplateBase):
# 数据组装
# current_params = dict(zip(fields, case_args))
# case_exec_data = {**case_template, **current_params}
# case_title = current_params.get("title", "未命名用例")
case_title = context.get("title") or case_template.title or "未命名用例"
def build_actual_case(instance: TestTemplateBase, api_env):
# --- 1. 动态设置 Allure 报告属性 ---
allure.dynamic.epic(case_template.epic or settings.allure_epic)
allure.dynamic.feature(case_template.feature or settings.allure_feature)
allure.dynamic.story(case_template.story or settings.allure_story)
allure.dynamic.title(title)
# 日志记录 (利用 instance 标注来源)
logger.info(f"🚀 [Runner] Class: {instance.__class__.__name__} | Case: {case_title}")
logger.info(f"[Runner] Class: {instance.__class__.__name__} | Case: {title}")
try:
WorkflowExecutor.perform(case_template, api_env, context=context)
# 执行与断言
allure.dynamic.title(case_title)
# executor.perform(case_exec_data)
executor.perform(case_template,context=context)
# 手动链路装饰 (Allure)
# run_actual_case = allure.epic(epic)(run_actual_case)
# run_actual_case = allure.feature(feature)(run_actual_case)
# run_actual_case = allure.story(story)(run_actual_case)
except Exception as e:
# 可以在这里记录更详细的运行上下文快照
logger.error(f"Case 执行失败: {title} | 错误: {e}")
raise
return build_actual_case
@@ -238,6 +174,6 @@ class CaseGenerator:
if __name__ == '__main__':
from settings import TEST_CASE_DIR
print(CaseDataLoader.get_all_cases(TEST_CASE_DIR))
# print(CaseDataLoader.get_all_cases(TEST_CASE_DIR))
# --- 引导执行 ---
# CaseGenerator.build_and_register(TestTemplateBase, settings.TEST_CASE_DIR)
CaseGenerator.build_and_register(TestTemplateBase, settings.TEST_CASE_DIR)

View File

@@ -12,8 +12,7 @@ from typing import Any, Union, TypeVar
import jsonpath
from lxml import etree
from core.models import CaseInfo
from core.models import RawSchema
from core.settings import EXTRACT_CACHE
from core.templates import Template
from commons.file_processors.yaml_processor import YamlProcessor
@@ -25,16 +24,20 @@ T = TypeVar("T", bound=Union[dict, list, str, Any])
class Exchange:
def __init__(self, cache_path: str):
self.cache_path = cache_path
self.file_handler = YamlProcessor(filepath=self.cache_path)
# 1. 增加内存缓存,避免频繁磁盘 I/O
self._variable_cache = self.file_handler.load() or {}
def __init__(self, variable_cache: dict[str, Any]):
self._cache = variable_cache
# 匹配标准变量 ${var},排除函数调用 ${func()}
# self.var_only_pattern = re.compile(r"\$\{([a-zA-Z_]\w*)}")
self.var_only_pattern = re.compile(r"^\$\{([a-zA-Z_]\w*)}$")
def extract(self, resp, var_name: str, attr: str, expr: str, index: int = 0):
@property
def global_vars(self) -> dict:
return self._cache
@global_vars.setter
def global_vars(self, global_vars: dict) -> None:
self._cache = global_vars
def extract(self, resp: Any, var_name: str, attr: str, expr: str, index: int = 0):
"""
从响应中提取数据并更新到缓存及文件
:param resp: Response 对象
@@ -63,8 +66,12 @@ class Exchange:
res = jsonpath.jsonpath(target_data, expr)
if res: value = res[index]
elif expr.startswith("/") or expr.startswith("./"): # XPath 模式
html_content = getattr(resp, "text", "") # 使用 getattr 防护
if not html_content:
logger.warning("XPath 提取失败:响应文本为空")
return
# 将文本解析为 HTML 树
html_content = resp.text
# html_content = resp.text
tree = etree.HTML(html_content)
res = tree.xpath(expr)
if res:
@@ -79,8 +86,7 @@ class Exchange:
logger.warning(f"变量 [{var_name}] 未通过表达式 [{expr}] 提取到数据")
value = "not data"
self._variable_cache[var_name] = value
self.file_handler.save(self._variable_cache)
self._cache[var_name] = value
logger.info(f"变量提取成功: {var_name} -> {value} (Type: {type(value).__name__})")
except Exception as e:
@@ -103,13 +109,13 @@ class Exchange:
if full_match:
var_name = full_match.group(1)
return self._variable_cache.get(var_name, content)
return self._cache.get(var_name, content)
# B. 场景:混合文本或函数调用
# 例子:"Bearer ${token}" 或 "${gen_phone()}"
if "${" in content:
# 调用你提供的 Template 类
return Template(content).render(self._variable_cache)
return Template(content).render(self._cache)
return content
@@ -129,13 +135,12 @@ class Exchange:
if __name__ == "__main__":
from core.models import CaseInfo, RequestModel
from core.models import RawSchema, HttpAction
# 模拟外部写入一个初始变量
with open(EXTRACT_CACHE, "w") as f:
f.write("existing_var: '100'\n")
file_handler = YamlProcessor(filepath=EXTRACT_CACHE)
variable_cache_ = file_handler.load() or {}
ex = Exchange(EXTRACT_CACHE)
ex = Exchange(variable_cache_)
# --- 场景 1: 变量提取验证 ---
@@ -157,7 +162,7 @@ if __name__ == "__main__":
# 定义一个复杂的 CaseInfo
raw_case = {
"title": "测试用例",
"request": {
"action": {
"method": "POST",
"url": "http://api.com/${token}", # 混合文本 -> 应转为 str
"json_body": {
@@ -170,20 +175,21 @@ if __name__ == "__main__":
}
print("\n>>> 执行替换...")
new_case = ex.replace(raw_case)
print(new_case)
new_case = CaseInfo(**new_case)
# --- 校验结果 ---
new_case_one = ex.replace(raw_case)
print(new_case_one)
RawSchema(**new_case_one)
print(new_case_one.get("action"))
action = HttpAction(**new_case_one.get("action"))
print(action)
# # --- 校验结果 ---
print("\n--- 验证结果 ---")
print(f"URL (混合文本): {new_case.request.url} | 类型: {type(new_case.request.url)}")
print(f"ID (类型保持): {new_case.request.json_body['id']} | 类型: {type(new_case.request.json_body['id'])}")
print(f"Timeout (自动转换): {new_case.request.timeout} | 类型: {type(new_case.request.timeout)}")
print(f"URL (混合文本): {action.url} | 类型: {type(action.url)}")
print(f"ID (类型保持): {action.json_body['id']} | 类型: {type(action.json_body['id'])}")
print(f"Timeout (自动转换): {action.timeout} | 类型: {type(action.timeout)}")
# #
assert isinstance(action.json_body['id'], int)
# #
assert action.url == "http://api.com/auth_123"
assert action.timeout == 100
assert isinstance(new_case.request.json_body['id'], int)
assert new_case.request.url == "http://api.com/auth_123"
assert new_case.request.timeout == 100
# if os.path.exists(cache_path): os.remove(cache_path)
print("\nExchange 场景全部验证通过!")

View File

@@ -8,11 +8,12 @@
import logging
import importlib
from typing import Any, List, Optional
from collections import ChainMap
from pydantic import TypeAdapter
from core import settings
from core.models import CaseInfo, ValidateItem, RequestModel, ApiActionModel
from core.context import ExecutionEnv
from core.models import RawSchema, ValidateItem, HttpAction, ApiActionModel
from core.session import Session
from core.exchange import Exchange
from utils.case_validator import CaseValidator
@@ -24,61 +25,54 @@ VALIDATE_LIST_ADAPTER = TypeAdapter(List[ValidateItem])
class WorkflowExecutor:
def __init__(self, session: Session, exchanger: Exchange):
self.session = session
self.exchanger = exchanger
def perform(self, case_info: CaseInfo,context: Optional[dict[str, Any]] = None) -> Any:
@classmethod
def perform(cls, case_info: RawSchema, env: ExecutionEnv, context: Optional[dict[str, Any]] = None) -> Any:
"""执行单个用例支持直接请求和PO模式调用"""
context = context or {}
# --- 重点 1备份并切换上下文 ---
# 保存 Exchange 当前的全局字典引用
original_cache = env.exchanger.global_vars
# 1. 建立优先级变量池 (参数化变量 > 全局提取变量)
# ChainMap 是实现“局部覆盖全局”性能最好的方案
combined_vars = ChainMap(context, original_cache)
# 将 Exchange 的内部缓存临时指向这个合并池
env.exchanger.global_vars = combined_vars
# 1. 局部变量优先级注入
# 备份全局缓存,将当前行数据合并进去
old_cache = self.exchanger._variable_cache.copy()
self.exchanger._variable_cache.update(context)
resp = None # 初始化 resp避免异常时引用未定义
try:
# 2. 动态更新标题(如果 context 中包含 title
current_title = context.get("title") or case_info.title
logger.info(f"🚀 执行用例: {current_title}")
# raw_data = case_info.model_dump(by_alias=True, exclude_none=True)
# 1. 变量替换(将 ${var} 替换为真实值)
# rendered_dict = self.exchanger.replace(raw_data)
# rendered_case = CaseInfo.model_validate(rendered_dict)
raw_action_dict = case_info.action.model_dump(by_alias=True, exclude_none=True)
rendered_action_dict = env.exchanger.replace(raw_action_dict)
# --- 2. 决定执行模式 ---
if case_info.is_po_mode():
# PO 模式:仅渲染 api_action
action_dict = case_info.api_action.model_dump(by_alias=True, exclude_none=True)
rendered_action_dict = self.exchanger.replace(action_dict)
# 重新校验以修复类型(如 params 里的 int
rendered_action = ApiActionModel.model_validate(rendered_action_dict)
# PO 模式:反射调用
resp = self._execute_po_method(action=rendered_action)
resp = cls._execute_po_method(rendered_action, env)
else:
# 接口模式:直接请求
# 直接将 RequestModel 转为字典传给 session.request
request_kwargs = case_info.request.model_dump(by_alias=True, exclude_none=True)
rendered_req_dict = self.exchanger.replace(request_kwargs)
rendered_request = RequestModel.model_validate(rendered_req_dict)
rendered_request = HttpAction.model_validate(rendered_action_dict)
request_kwargs = rendered_request.model_dump(by_alias=True, exclude_none=True)
resp = self.session.request(**request_kwargs)
resp = env.session.request(**request_kwargs)
# --- 3. 后处理 (提取 & 断言) ---
self._post_process(resp, case_info)
# --- 3. 后处理:提取与断言 ---
cls._post_process(resp, case_info, env, original_cache)
return resp
except Exception as e:
logger.error(f"用例执行失败: {case_info.title} | 原因: {e}", exc_info=True)
raise
finally:
# 4. 关键:清理现场,还原全局变量池
self.exchanger._variable_cache = old_cache
# 兜底确保环境还原 (尽管 try 块中已经还原了一次,这里确保异常情况下也复位)
env.exchanger.global_vars = original_cache
def _execute_po_method(self, action: ApiActionModel):
@staticmethod
def _execute_po_method(action: ApiActionModel, env: ExecutionEnv):
"""核心反射逻辑:根据字符串动态加载 api/ 目录下的类并执行方法"""
class_name = action.api_class
method_name = action.method
@@ -94,15 +88,13 @@ class WorkflowExecutor:
try:
# 1. 动态导入模块(假设都在 api 目录下)
# 例如 class_name 是 UserAPI则尝试从 api.user 导入
# 这里简单处理,你可以根据你的文件名约定进一步优化逻辑
# module_name = f"api.{class_name.lower().replace('api', '')}"
module = importlib.import_module(module_name)
# 2. 获取类并实例化
cls = getattr(module, class_name)
api_instance = cls(self.session) # 传入 session 保持会话统一
api_instance = cls(env.session) # 传入 session 保持会话统一
# 3. 调用方法并返回结果
method = getattr(api_instance, method_name)
@@ -118,21 +110,33 @@ class WorkflowExecutor:
logger.error(f"反射调用失败: {class_name}.{method_name} -> {e}")
raise
def _post_process(self, resp: Any, rendered_case: CaseInfo):
# 3. 提取变量 (接口关联)
if rendered_case.extract:
for var_name, extract_info in rendered_case.extract.items():
self.exchanger.extract(resp, var_name, *extract_info)
@classmethod
def _post_process(cls, resp: Any, case_info: RawSchema, env: ExecutionEnv, original_cache: dict):
"""
统一后处理逻辑:处理变量提取(写全局)和断言校验(读局部+全局)
"""
# 记录当前的混合上下文 (ChainMap),供断言使用
combined_vars = env.exchanger.global_vars
# 4. 断言校验
if rendered_case.validate_data:
# raw_validate_list = [i.model_dump(by_alias=True) for i in rendered_case.validate_data]
# 1. 变量提取 (Write Operation)
if case_info.extract:
try:
# 必须切回 original_cache 才能持久化写入到全局变量池
env.exchanger.global_vars = original_cache
for var_name, extract_info in case_info.extract.items():
env.exchanger.extract(resp, var_name, *extract_info)
finally:
# 提取完成后,切回 combined_vars防止后续逻辑如断言丢失局部变量上下文
env.exchanger.global_vars = combined_vars
# 2. 断言校验 (Read Operation)
if case_info.validate_data:
raw_validate_list = [
item.model_dump(by_alias=True) if isinstance(item, ValidateItem) else item
for item in rendered_case.validate_data
for item in case_info.validate_data
]
rendered_validate_list = self.exchanger.replace(raw_validate_list)
rendered_validate_list = env.exchanger.replace(raw_validate_list)
# 重新通过 Adapter 触发类型修复 (str -> int)
final_validate_data = VALIDATE_LIST_ADAPTER.validate_python(rendered_validate_list)
CaseValidator.validate(resp, final_validate_data)

View File

@@ -10,127 +10,79 @@
@desc: 声明yaml用例格式
"""
import logging
from typing import List, Any, Optional, Union, Annotated
from typing import List, Any
import yaml
from pydantic import BaseModel, Field, ConfigDict, model_validator, field_validator, AfterValidator
from core import settings
from pydantic import BaseModel, Field, ConfigDict
logger = logging.getLogger(__name__)
def smart_cast_int(v: Any) -> Any:
if isinstance(v, str) and v.startswith("${") and v.endswith("}"):
return v
try:
return int(v)
except (ValueError, TypeError):
return v
def smart_cast_dict(v: Any) -> Any:
"""确保字典格式,若是占位符(字符串形式)则放行"""
if isinstance(v, str) and v.startswith("${") and v.endswith("}"):
return v
if isinstance(v, dict) or v is None:
return v
return v # 也可以根据需求抛出异常
# 使用 Annotated 定义带校验的类型
SmartInt = Annotated[Union[int, str], AfterValidator(smart_cast_int)]
SmartDict = Annotated[Union[dict[str, Any], str], AfterValidator(smart_cast_dict)]
# --- 基础请求模型 (用于第一种示例:直接请求) ---
class RequestModel(BaseModel):
class HttpAction(BaseModel):
method: str = Field(..., description="HTTP 请求方法: get, post, etc.")
url: str = Field(..., description="接口路径或完整 URL")
params: Optional[SmartDict] = None
data: Optional[SmartDict] = None
json_body: Optional[Any] = Field(None, alias="json")
headers: Optional[SmartDict] = None
cookies: Optional[dict[str, str]] = None
timeout: SmartInt = Field(default=10)
files: Optional[SmartDict] = None
headers: dict[str, Any] | None = Field(default=None, description="HTTP 请求头")
params: dict[str, Any] | None = Field(default=None, description="URL 查询参数")
data: dict[str, Any] | None = None
json_body: Any | None = Field(default=None, alias="json")
timeout: int = 10
files: dict[str, Any] | None = None
model_config = ConfigDict(extra="allow", populate_by_name=True) # 允许扩展 requests 的其他参数
model_config = ConfigDict(extra="allow", populate_by_name=True)
# --- PO 动作模型 (用于第二种示例:业务层反射调用) ---
class ApiActionModel(BaseModel):
api_class: str = Field(..., alias="class", description="要调用的 API 类名")
module: str = Field(..., alias="class", description="要调用的 API 类名")
method: str = Field(..., description="类中的方法名")
params: Optional[SmartDict] = Field(default_factory=dict, description="传给方法的参数")
params: dict[str, Any] = Field(default_factory=dict, description="传给方法的参数")
model_config = ConfigDict(populate_by_name=True)
# --- 新增:断言条目模型 ---
class ValidateItem(BaseModel):
check: Any = Field(..., description="要检查的字段或表达式")
check: str = Field(..., description="要检查的字段或表达式")
assert_method: str = Field(alias="assert", default="equals")
expect: Any = Field(..., description="期望值")
assert_method: str = Field(default="equals", alias="assert", description="断言方法")
msg: Optional[str] = Field(default="Assertion", description="断言描述")
msg: str = Field(default="Assertion", description="断言描述")
model_config = ConfigDict(populate_by_name=True)
# --- 核心用例数据模型 ---
class CaseInfo(BaseModel):
# 公共元数据
class RawSchema(BaseModel):
title: str = Field(..., description="用例标题")
epic: Optional[str] = None
feature: Optional[str] = None
story: Optional[str] = None
# 核心逻辑分叉:可以是 request 对象,也可以是 api_action 对象
# 根据 WorkflowExecutor 的逻辑,这里设为 Optional但在具体校验时可以互斥
request: Optional[RequestModel] = None
api_action: Optional[ApiActionModel] = None
# 后置处理
extract: Optional[dict[str, List[Any]]] = Field(
epic: str | None = None
feature: str | None = None
story: str | None = None
# 统一使用 action 字段承载业务逻辑 (Http 或 PO)
action: dict[str, Any] = Field(description="请求内容或PO动作内容")
extract: dict[str, List[Any]] | None = Field(
default=None,
description="变量提取表达式,格式: {变量名: [来源, 表达式, 索引]}"
)
validate_data: Optional[List[Union[ValidateItem, dict[str, Any]]]] = Field(
validate_data: List[Any] = Field(
default_factory=list,
alias="validate",
description="断言信息"
)
# 参数化(在 DataLoader 阶段会被拆解,但在初始加载时需要定义)
parametrize: Optional[List[List[Any]]] = None
model_config = ConfigDict(
populate_by_name=True, # 无论是在代码中用 api_class 还是在 YAML 中用 class 赋值Pydantic 都能正确识别。
arbitrary_types_allowed=True # 允许在模型中使用非 Pydantic 标准类型(如自定义类实例)
)
# 核心优化:增加互斥校验
@model_validator(mode='after')
def check_action_type(self) -> 'CaseInfo':
if not self.request and not self.api_action:
raise ValueError("用例必须包含 'request''api_action' 其中之一")
if self.request and self.api_action:
raise ValueError("'request''api_action' 不能同时存在")
return self
model_config = ConfigDict(extra="allow",
populate_by_name=True, # 无论是在代码中用 api_class 还是在 YAML 中用 class 赋值Pydantic 都能正确识别。
arbitrary_types_allowed=True # 允许在模型中使用非 Pydantic 标准类型(如自定义类实例)
) # 允许参数化等额外字段
def is_po_mode(self) -> bool:
"""判断是否为 PO 模式"""
return self.api_action is not None
return "class" in self.action or "module" in self.action
if __name__ == '__main__':
# 模拟数据 1标准请求模式
raw_case_1 = {
"title": "查询状态信息",
"request": {
"action": {
"method": "get",
"url": "/api/v1/info",
"headers": {"User-Agent": "pytest-ai"}
"headers": {"User-Agent": "pytest-ai"},
"json": {"User-Agent": "pytest-ai"}
},
"validate": [
{"check": "status_code", "assert": "equals", "expect": 200, "msg": "响应码200"},
@@ -141,7 +93,7 @@ if __name__ == '__main__':
# 模拟数据 2PO 模式 (反射调用)
raw_case_2 = {
"title": "用户登录测试",
"api_action": {
"action": {
"class": "UserAPI",
"method": "login",
"params": {"user": "admin", "pwd": "123"}
@@ -155,22 +107,22 @@ if __name__ == '__main__':
try:
# 验证模式 1
case1 = CaseInfo(**raw_case_1)
case1 = RawSchema(**raw_case_1)
print(f"✅ 模式1 (Request) 校验通过: {case1.title}")
print(f" 请求URL: {case1.request.url}")
print(f" 第一个断言方法: {case1.validate_data[0].assert_method}\n")
print(f" http: {case1.action}")
print(f" 断言规则数: {len(case1.validate_data)}\n")
# 验证模式 2
case2 = CaseInfo(**raw_case_2)
case2 = RawSchema(**raw_case_2)
print(f"✅ 模式2 (PO Mode) 校验通过: {case2.title}")
print(f" 调用类: {case2.api_action.api_class}")
print(f" api: {case2.action}")
print(f" 提取规则数: {len(case2.extract)}\n")
# 验证非法数据(如:既没有 request 也没有 api_action 的情况可以在业务层进一步校验)
# 这里演示 Pydantic 自动类型转换
invalid_data = {"title": "错误用例", "request": {"url": "/api"}} # 缺少 method
invalid_data = {"title": "错误用例", "action": {"url": "/api"}} # 缺少 method
print("--- 预期失败测试 ---")
CaseInfo(**invalid_data)
RawSchema(**invalid_data)
except Exception as e:
print(f"❌ 预期内的校验失败: \n{e}")

View File

@@ -1,4 +1 @@
existing_var: '100'
token: auth_123
u_id: 888
user_name: ChenWei

View File

@@ -2,7 +2,7 @@ feature: 页面状态
story: 状态
title: 查询状态信息
epic: 的点点滴滴
request:
action:
method: get
url: /answer/api/v1/connector/info
headers:

View File

@@ -0,0 +1,30 @@
#!/usr/bin/env python
# coding=utf-8
import logging
from core import settings
from core.creator import CaseGenerator, TestTemplateBase
logger = logging.getLogger(__name__)
class TestRunner(TestTemplateBase):
"""
测试用例的动态容器 (Test Case Container)。
这是一个占位符类CaseGenerator 会扫描所有的 YAML 用例文件,
然后将每一个用例动态地生成为一个测试方法并挂载到这个类上。
Pytest 最终会发现并执行这些动态挂载的 test_* 方法。
"""
pass
try:
# --- 核心逻辑:动态生成测试用例 ---
# 当 Pytest 在“收集测试用例”阶段加载此模块时,下面的代码会立即执行。
logger.info("--- [Collector] 开始扫描并动态生成测试用例 ---")
CaseGenerator.build_and_register(target_cls=TestRunner, cases_dir=settings.TEST_CASE_DIR)
logger.info(f"--- [Collector] 测试用例生成完毕,已成功加载到 {TestRunner.__name__} ---")
except Exception as e:
logger.critical(f"--- [Collector] 动态生成测试用例时发生致命错误,测试执行中止 ---", exc_info=True)
# 抛出异常,让 pytest 捕获并报告为收集错误 (Collection Error)
raise RuntimeError("测试用例收集失败,请检查日志中的详细错误信息。") from e