- 引入 SmartInt 和 SmartDict 类型,支持 YAML 占位符与业务类型的自动转换。 - 优化 CaseInfo 互斥校验逻辑,确保 request 与 api_action 二选一。 - 统一使用 Pydantic V2 的 model_config 规范。 - 将变量替换时机提前至模型实例化之前,支持占位符在校验前完成真实值注入, 保证了 int/bool 等字段的类型转换正确性。 - 优化断言渲染时机,支持响应提取值关联。
181 lines
6.3 KiB
Python
181 lines
6.3 KiB
Python
#!/usr/bin/env python
|
||
# coding=utf-8
|
||
|
||
"""
|
||
@author: chen wei
|
||
@Software: PyCharm
|
||
@contact: t6i888@163.com
|
||
@file: templates.py
|
||
@date: 2024 2024/9/22 22:20
|
||
@desc:
|
||
"""
|
||
import copy
|
||
import logging
|
||
import re
|
||
import string
|
||
import ast
|
||
from typing import List, Any
|
||
|
||
from commons.funcs import Funcs
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class Template(string.Template):
|
||
"""
|
||
增强型模板引擎:
|
||
1. 兼容标准变量替换 ${var}
|
||
2. 支持带参数的函数调用 ${func(arg1, arg2)}
|
||
3. 支持变量嵌套作为函数参数 ${func(${var})}
|
||
"""
|
||
|
||
# call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
|
||
# call_pattern = re.compile(r"\$\{(?P<func_name>[a-zA-Z_]\w*)\((?P<func_args>.*)\)}")
|
||
# 匹配函数调用结构:${函数名(参数)}
|
||
# 分组:func_name (字母下划线开头), func_args (括号内的所有内容)
|
||
call_pattern = re.compile(r"\$\{(?P<func_name>[a-zA-Z_]\w*)\((?P<func_args>.*)\)}")
|
||
|
||
def render(self, mapping: dict) -> str:
|
||
"""
|
||
渲染入口
|
||
:param mapping: 变量缓存(来自 Exchange._variable_cache)
|
||
:return: 渲染后的字符串
|
||
"""
|
||
# 1. 第一步:利用原生 string.Template 替换基础变量
|
||
# 这一步会将参数中的 ${var} 预先替换为实际值,从而支持函数嵌套调用
|
||
s = self.safe_substitute(mapping) # 原有方法替换变量
|
||
# 2. 第二步:解析并执行函数调用
|
||
s = self.safe_substitute_funcs(s, mapping)
|
||
|
||
return s
|
||
|
||
@staticmethod
|
||
def _parse_args(args_str: str, mapping: dict) -> List[Any]:
|
||
"""
|
||
核心优化:安全拆分函数参数
|
||
利用正则预读,跳过引号内的逗号,解决 ${func('a,b', 123)} 的分割问题
|
||
"""
|
||
args_str = args_str.strip()
|
||
if not args_str:
|
||
return []
|
||
|
||
# 正则解析说明:匹配逗号,但该逗号后面必须有偶数个引号(说明逗号不在引号内)
|
||
raw_args = re.split(r',(?=(?:[^\'"]*[\'"][^\'"]*[\'"])*[^\'"]*$)', args_str)
|
||
|
||
processed_args = []
|
||
for arg in raw_args:
|
||
arg = arg.strip()
|
||
# 1. 处理带引号的字符串参数
|
||
if (arg.startswith("'") and arg.endswith("'")) or (arg.startswith('"') and arg.endswith('"')):
|
||
processed_args.append(arg[1:-1])
|
||
# 2. 处理数字类型
|
||
elif arg.isdigit():
|
||
processed_args.append(int(arg))
|
||
# 3. 处理布尔值
|
||
elif arg.lower() == "true":
|
||
processed_args.append(True)
|
||
elif arg.lower() == "false":
|
||
processed_args.append(False)
|
||
# 4. 如果在 mapping 中能找到(针对未经过第一步替换的情况),取其值
|
||
elif arg in mapping:
|
||
processed_args.append(mapping[arg])
|
||
# 5. 其他情况按原样字符串处理
|
||
else:
|
||
processed_args.append(arg)
|
||
|
||
return processed_args
|
||
|
||
def safe_substitute_funcs(self, template: str, mapping: dict) -> str:
|
||
"""
|
||
解析字符串中的函数名和参数,并将函数调用结果进行替换
|
||
:param template: 字符串
|
||
:param mapping: 上下文,提供要使用的函数和变量
|
||
:return: 替换后的结果
|
||
"""
|
||
|
||
# 合并函数映射和变量映射,作为统一上下文
|
||
# 使用解构赋值替代 deepcopy,提升性能
|
||
logger.info(f"mapping更新前: {mapping}")
|
||
render_context = {**Funcs.FUNC_MAPPING, **mapping}
|
||
logger.info(f"mapping更新后: {render_context}")
|
||
|
||
# mapping = copy.deepcopy(mapping)
|
||
# logger.info(f"mapping更新前: {mapping}")
|
||
# mapping.update(self.FUNC_MAPPING) # 合并两个mapping
|
||
# mapping.update(Funcs.FUNC_MAPPING) # 合并两个mapping
|
||
# logger.info(f"mapping更新后: {mapping}")
|
||
|
||
def convert(mo):
|
||
func_name = mo.group("func_name")
|
||
# func_args = mo.group("func_args").split(",")
|
||
func_args_str = mo.group("func_args")
|
||
|
||
func = render_context.get(func_name) # 读取指定函数
|
||
|
||
if not callable(func):
|
||
logger.warning(f"模板中的函数 '{func_name}' 未定义或不可调用")
|
||
return mo.group()
|
||
# 解析参数列表
|
||
args = self._parse_args(func_args_str, render_context)
|
||
|
||
try:
|
||
# 执行函数并强制转为字符串返回,以便 re.sub 替换
|
||
result = func(*args)
|
||
return str(result)
|
||
except Exception as e:
|
||
logger.error(f"执行函数 ${{{func_name}(...)}} 报错: {e}", exc_info=True)
|
||
return mo.group()
|
||
|
||
return self.call_pattern.sub(convert, template)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# 模拟 Funcs.FUNC_MAPPING
|
||
def mock_concat(a, b):
|
||
return f"{a}_{b}"
|
||
|
||
|
||
def mock_get_now():
|
||
return "2026-03-09"
|
||
|
||
|
||
def mock_add(x, y):
|
||
return x + y
|
||
|
||
|
||
# 注入模拟函数
|
||
Funcs.FUNC_MAPPING = {
|
||
"concat": mock_concat,
|
||
"now": mock_get_now,
|
||
"add": mock_add
|
||
}
|
||
|
||
# 模拟变量缓存
|
||
test_mapping = {
|
||
"env": "prod",
|
||
"num1": 10,
|
||
"num2": 20
|
||
}
|
||
|
||
test_cases = [
|
||
("场景A:标准变量", "Current env is ${env}", "Current env is prod"),
|
||
("场景B:无参数函数", "Date: ${now()}", "Date: 2026-03-09"),
|
||
("场景C:带参数函数(含逗号)", "Res: ${concat('hello,world', 'test')}", "Res: hello,world_test"),
|
||
("场景D:变量嵌套函数参数", "Sum: ${add(${num1}, ${num2})}", "Sum: 30"),
|
||
("场景E:混合模式", "URL: /${env}/api/${now()}", "URL: /prod/api/2026-03-09"),
|
||
("场景F:参数类型自动识别", "Value: ${add(5, 5)}", "Value: 10"), # 5应该被识别为int
|
||
]
|
||
|
||
print(f"{'测试场景':<25} | {'预期结果':<30} | {'实际结果'}")
|
||
print("-" * 80)
|
||
|
||
for scene, tpl_str, expected in test_cases:
|
||
actual = Template(tpl_str).render(test_mapping)
|
||
status = "✅" if str(actual) == str(expected) else "❌"
|
||
print(f"{scene:<25} | {expected:<30} | {actual} {status}")
|
||
|
||
# 特殊验证:嵌套失败回退
|
||
print("\n>>> 验证未定义函数回退:")
|
||
error_tpl = "Check: ${undefined_func()}"
|
||
print(f"结果: {Template(error_tpl).render(test_mapping)}")
|