- 引入 SmartInt 和 SmartDict 类型,支持 YAML 占位符与业务类型的自动转换。 - 优化 CaseInfo 互斥校验逻辑,确保 request 与 api_action 二选一。 - 统一使用 Pydantic V2 的 model_config 规范。 - 将变量替换时机提前至模型实例化之前,支持占位符在校验前完成真实值注入, 保证了 int/bool 等字段的类型转换正确性。 - 优化断言渲染时机,支持响应提取值关联。
190 lines
6.8 KiB
Python
190 lines
6.8 KiB
Python
#!/usr/bin/env python
|
||
# coding=utf-8
|
||
|
||
"""
|
||
@desc: 变量交换器,用于数据替换和提取
|
||
"""
|
||
|
||
import logging
|
||
import re
|
||
from typing import Any, Union, TypeVar
|
||
|
||
import jsonpath
|
||
from lxml import etree
|
||
|
||
|
||
from core.models import CaseInfo
|
||
from core.settings import EXTRACT_CACHE
|
||
from core.templates import Template
|
||
from commons.file_processors.yaml_processor import YamlProcessor
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 定义泛型,用于保持返回类型一致
|
||
T = TypeVar("T", bound=Union[dict, list, str, Any])
|
||
|
||
|
||
class Exchange:
|
||
def __init__(self, cache_path: str):
|
||
self.cache_path = cache_path
|
||
self.file_handler = YamlProcessor(filepath=self.cache_path)
|
||
# 1. 增加内存缓存,避免频繁磁盘 I/O
|
||
self._variable_cache = self.file_handler.load() or {}
|
||
# 匹配标准变量 ${var},排除函数调用 ${func()}
|
||
# self.var_only_pattern = re.compile(r"\$\{([a-zA-Z_]\w*)}")
|
||
self.var_only_pattern = re.compile(r"^\$\{([a-zA-Z_]\w*)}$")
|
||
|
||
def extract(self, resp, var_name: str, attr: str, expr: str, index: int = 0):
|
||
"""
|
||
从响应中提取数据并更新到缓存及文件
|
||
:param resp: Response 对象
|
||
:param var_name: 变量名
|
||
:param attr: 属性名 (json, text, headers 等)
|
||
:param expr: 提取表达式 ($.jsonpath, //xpath, regex)
|
||
:param index: 索引
|
||
"""
|
||
|
||
try:
|
||
# 兼容处理 resp.json
|
||
target_data = getattr(resp, attr, None)
|
||
if attr == "json":
|
||
try:
|
||
target_data = resp.json()
|
||
except Exception:
|
||
target_data = {"msg": "not json data"}
|
||
|
||
if target_data is None:
|
||
logger.warning(f"提取失败: 响应对象中不存在属性 '{attr}'")
|
||
return
|
||
|
||
value = None
|
||
|
||
if expr.startswith("$"): # JSONPath
|
||
res = jsonpath.jsonpath(target_data, expr)
|
||
if res: value = res[index]
|
||
elif expr.startswith("/") or expr.startswith("./"): # XPath 模式
|
||
# 将文本解析为 HTML 树
|
||
html_content = resp.text
|
||
tree = etree.HTML(html_content)
|
||
res = tree.xpath(expr)
|
||
if res:
|
||
# 获取节点文本或属性值
|
||
target_node = res[index]
|
||
value = target_node.text if hasattr(target_node, 'text') else str(target_node)
|
||
else: # 正则
|
||
res = re.findall(expr, str(target_data))
|
||
if res: value = res[index]
|
||
|
||
if value is None:
|
||
logger.warning(f"变量 [{var_name}] 未通过表达式 [{expr}] 提取到数据")
|
||
value = "not data"
|
||
|
||
self._variable_cache[var_name] = value
|
||
self.file_handler.save(self._variable_cache)
|
||
logger.info(f"变量提取成功: {var_name} -> {value} (Type: {type(value).__name__})")
|
||
|
||
except Exception as e:
|
||
logger.error(f"提取变量 [{var_name}] 过程中发生异常: {e}", exc_info=True)
|
||
|
||
def _smart_replace(self, content: Any) -> Any:
|
||
"""
|
||
递归替换逻辑:
|
||
- 如果是纯变量占位符 ${token},则返回变量在缓存中的原始类型 (int, dict, list 等)
|
||
- 如果是混合字符串或函数调用,则调用 Template 渲染为字符串
|
||
"""
|
||
if isinstance(content, dict):
|
||
return {k: self._smart_replace(v) for k, v in content.items()}
|
||
elif isinstance(content, list):
|
||
return [self._smart_replace(i) for i in content]
|
||
elif isinstance(content, str):
|
||
# A. 场景:纯变量(为了保持类型,不走 Template 渲染成字符串)
|
||
# 例子:content = "${order_id}",如果 order_id 是 int 123,则返回 123
|
||
full_match = self.var_only_pattern.fullmatch(content)
|
||
if full_match:
|
||
var_name = full_match.group(1)
|
||
|
||
return self._variable_cache.get(var_name, content)
|
||
|
||
# B. 场景:混合文本或函数调用
|
||
# 例子:"Bearer ${token}" 或 "${gen_phone()}"
|
||
if "${" in content:
|
||
# 调用你提供的 Template 类
|
||
return Template(content).render(self._variable_cache)
|
||
|
||
return content
|
||
|
||
def replace(self, data: T) -> T:
|
||
"""
|
||
通用的变量替换入口
|
||
支持输入 dict, list, str 或 Pydantic Model (需先 dump)
|
||
"""
|
||
if not data:
|
||
return data
|
||
|
||
logger.debug(f"开始变量替换,原始数据类型: {type(data).__name__}")
|
||
|
||
rendered_data = self._smart_replace(data)
|
||
|
||
return rendered_data
|
||
|
||
|
||
if __name__ == "__main__":
|
||
from core.models import CaseInfo, RequestModel
|
||
|
||
# 模拟外部写入一个初始变量
|
||
with open(EXTRACT_CACHE, "w") as f:
|
||
f.write("existing_var: '100'\n")
|
||
|
||
ex = Exchange(EXTRACT_CACHE)
|
||
|
||
|
||
# --- 场景 1: 变量提取验证 ---
|
||
class MockResponse:
|
||
def __init__(self):
|
||
self.json_data = {"data": {"token": "auth_123", "user_id": 888}}
|
||
self.text = "<html><body><div id='name'>ChenWei</div></body></html>"
|
||
|
||
def json(self): return self.json_data
|
||
|
||
|
||
mock_resp = MockResponse()
|
||
print(">>> 执行提取...")
|
||
ex.extract(mock_resp, "token", "json", "$.data.token")
|
||
ex.extract(mock_resp, "u_id", "json", "$.data.user_id")
|
||
ex.extract(mock_resp, "user_name", "text", "//div[@id='name']")
|
||
|
||
# --- 场景 2: 变量替换与类型保持 ---
|
||
# 定义一个复杂的 CaseInfo
|
||
raw_case = {
|
||
"title": "测试用例",
|
||
"request": {
|
||
"method": "POST",
|
||
"url": "http://api.com/${token}", # 混合文本 -> 应转为 str
|
||
"json_body": {
|
||
"id": "${u_id}", # 纯变量 -> 应保持 int
|
||
"name": "${user_name}", # 纯变量 -> str
|
||
"config": "${existing_var}" # 初始文件变量 -> int
|
||
},
|
||
"timeout": "${existing_var}" # 字符串形式的数字 -> Pydantic 应转回 int
|
||
}
|
||
}
|
||
|
||
print("\n>>> 执行替换...")
|
||
new_case = ex.replace(raw_case)
|
||
print(new_case)
|
||
new_case = CaseInfo(**new_case)
|
||
|
||
# --- 校验结果 ---
|
||
print("\n--- 验证结果 ---")
|
||
print(f"URL (混合文本): {new_case.request.url} | 类型: {type(new_case.request.url)}")
|
||
print(f"ID (类型保持): {new_case.request.json_body['id']} | 类型: {type(new_case.request.json_body['id'])}")
|
||
print(f"Timeout (自动转换): {new_case.request.timeout} | 类型: {type(new_case.request.timeout)}")
|
||
|
||
assert isinstance(new_case.request.json_body['id'], int)
|
||
|
||
assert new_case.request.url == "http://api.com/auth_123"
|
||
assert new_case.request.timeout == 100
|
||
|
||
# if os.path.exists(cache_path): os.remove(cache_path)
|
||
print("\nExchange 场景全部验证通过!")
|