fix(exchange,case_validator),refactor(),feat(model): 解决 Pydantic 模型初始化与变量占位符的类型冲突,优化变量替换逻辑,重构 CaseInfo 模型并引入延迟校验机制

- 引入 SmartInt 和 SmartDict 类型,支持 YAML 占位符与业务类型的自动转换。
- 优化 CaseInfo 互斥校验逻辑,确保 request 与 api_action 二选一。
- 统一使用 Pydantic V2 的 model_config 规范。
- 将变量替换时机提前至模型实例化之前,支持占位符在校验前完成真实值注入,
保证了 int/bool 等字段的类型转换正确性。
- 优化断言渲染时机,支持响应提取值关联。
This commit is contained in:
2026-03-11 10:29:16 +08:00
parent 69a96a0060
commit 293b5160fe
39 changed files with 1359 additions and 1031 deletions

View File

@@ -9,23 +9,18 @@
@date: 2024 2024/9/16 9:57
@desc: 动态生成用例
"""
from dataclasses import asdict
from pathlib import Path
import logging
from typing import Union, Generator, Type
from unittest import TestCase
import allure
import pytest
from commons import settings
from commons.file_processors.processor_factory import get_processor_class
# from commons.models import CaseInfo
from commons.session import Session
from commons.exchange import Exchange
from commons.templates import Template
from commons.case_handler import TestCaseHandle
from utils import case_validator
from core import settings
from commons.file_processors.yaml_processor import YamlProcessor as FileHandle
from commons.models import CaseInfo
from core.session import Session
from core.exchange import Exchange
from utils import data_driver, case_validator
logger = logging.getLogger(__name__)
@@ -37,195 +32,90 @@ exchanger = Exchange(settings.exchanger)
class TestAPI:
@classmethod
def run(cls, testcase_dir: Union[Path, str] = cases_dir):
for fp in CaseFinder(testcase_dir).find_testcases():
print(fp.name)
case = CaseGenerator(fp).generate_testcases()
print(f"{case=}")
for i in case:
print(f"{i=}")
CaseRegister(cls).register_test_func(i)
# @classmethod
# def find_test_cases(cls, case_dir: Path = cases_dir):
# """
# 搜索和加载yaml文件
# :return:
# """
# case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
# for case_path in case_path_list:
# logger.info(f"加载文件:{case_path}")
#
# file = FileHandle(case_path) # 自动读取yaml文件
# try:
# CaseInfo(**file) # 校验用例格式
# logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
# case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# # print(case_path.stem)
# setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
# except Exception as e:
# logger.error(e)
#
# @classmethod
# def new_case(cls, file_name, case_info: dict):
# test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
#
# keys_list = list(test_case.keys())
# logger.info(f"keys_list{keys_list}")
#
# values_list = list(test_case.values())
# logger.info(f"测试用例列表:{values_list}")
#
# driver_title = [i.get("title") for i in values_list]
# logger.info(f"driver_title={driver_title}")
#
# epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
# logger.info(f"epic{epic}")
#
# feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
# logger.info(f"feature{feature}")
#
# story = case_info["story"] if case_info["story"] else settings.allure_story
# logger.info(f"story{story}")
#
# @allure.epic(epic)
# @allure.feature(feature)
# @allure.story(story)
# @pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
# def test_func(self, case_key):
# logger.info(f"case_key{case_key}")
#
# test_case_mapping = test_case.get(case_key)
# logger.info(f"测试用例:{test_case_mapping}")
#
# allure.dynamic.title(test_case_mapping.get("title"))
#
# logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
#
# # 0变量替换
# new_case_info = exchanger.replace(test_case_mapping)
# logger.info(f"1正在注入变量...")
# logger.info(f"new_case_info{new_case_info}")
# # 1发送请求
# logger.info(f"2正在请求接口...")
# resp = session.request(**new_case_info.get("request"))
#
# logger.info(f"3正在提取变量...")
# # 2保存变量(接口关联)
# for var_name, extract_info in new_case_info.get("extract").items():
# logger.info(f"保存变量:{var_name}{extract_info}")
# exchanger.extract(resp, var_name, *extract_info)
# # 3断言
# logger.info(f"4正在断言...")
# assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
# logger.info(f"替换变量后:{assert_case_info}")
# # assert_case_info.assert_all() # 执行断言
# _validator = case_validator.CaseValidator()
# _validator.assert_all(assert_case_info.get("validate"))
#
# logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
#
# return test_func
def find_test_cases(cls, case_dir: Path = cases_dir):
"""
搜索和加载yaml文件
:return:
"""
case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
for case_path in case_path_list:
logger.info(f"加载文件:{case_path}")
file = FileHandle(case_path) # 自动读取yaml文件
try:
CaseInfo(**file) # 校验用例格式
logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# print(case_path.stem)
setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
except Exception as e:
logger.error(e)
class CaseFinder:
find_suffix: str = settings.test_suffix
@classmethod
def new_case(cls, file_name, case_info: dict):
test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
def __init__(self, testcase_dir: Union[str, Path]):
if Path(testcase_dir).is_dir():
self.testcase_dir: Path = Path(testcase_dir)
else:
raise FileNotFoundError("不是有效的目录")
keys_list = list(test_case.keys())
logger.info(f"keys_list{keys_list}")
def find_testcases(self) -> Generator[Path, None, None]:
testcase_files = self.testcase_dir.glob(f"**/test_*.{self.find_suffix}")
for fp in testcase_files:
logger.info(f"加载文件:{fp}")
yield fp
values_list = list(test_case.values())
logger.info(f"测试用例列表:{values_list}")
driver_title = [i.get("title") for i in values_list]
logger.info(f"driver_title={driver_title}")
class CaseGenerator:
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
logger.info(f"epic{epic}")
def __init__(self, fp: Union[str, Path]):
self.fp: Path = Path(fp)
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
logger.info(f"feature{feature}")
def generate_testcases(self) -> Generator[dict, None, None]:
file_name = self.fp.stem
story = case_info["story"] if case_info["story"] else settings.allure_story
logger.info(f"story{story}")
case_info_ = get_processor_class(self.fp).load() # 自动读取yaml文件
case_info = TestCaseHandle.new(case_info_)
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
@pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
def test_func(self, case_key):
logger.info(f"case_key{case_key}")
if not case_info.parametrize:
yield {file_name + "__": asdict(case_info)}
else:
cases = {}
args_names = case_info.parametrize[0]
for i, args_values in enumerate(case_info.parametrize[1:]):
# print(args_values)
context = dict(zip(args_names, args_values))
print(context)
# rendered = Template(FileHandle.to_string(case_info)).render(context)
rendered = Template(case_info.to_string()).render(context)
# cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
cases.update({file_name + "_" + str(i): case_info.to_dict(rendered)})
test_case_mapping = test_case.get(case_key)
logger.info(f"测试用例:{test_case_mapping}")
yield cases
allure.dynamic.title(test_case_mapping.get("title"))
logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
class CaseRegister:
def __init__(self, register: Type["TestAPI"]):
self.register: Type["TestAPI"] = register
# 0变量替换
new_case_info = exchanger.replace(test_case_mapping)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info{new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request"))
def register_test_func(self, case: dict):
for test_filed_name, case_info in case.items():
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
logger.info(f"epic{epic}")
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
for var_name, extract_info in new_case_info.get("extract").items():
logger.info(f"保存变量{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
logger.info(f"替换变量后:{assert_case_info}")
# assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
logger.info(f"feature{feature}")
logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
story = case_info["story"] if case_info["story"] else settings.allure_story
logger.info(f"story{story}")
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
def register_func(instance, testcase=case_info):
# allure.dynamic.epic(epic)
# allure.dynamic.feature(feature)
# allure.dynamic.story(story)
allure.dynamic.title(testcase.get("title"))
logger.info(f"用例开始执行:{testcase.get('title')}".center(80, "="))
# 0变量替换
new_case_info = exchanger.replace(testcase)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info{new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request"))
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
for var_name, extract_info in new_case_info.get("extract").items():
logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(testcase) # 为断言加载变量
logger.info(f"替换变量后:{assert_case_info}")
# assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
logger.info(f"用例执行结束:{testcase.get('title')}".center(80, "="))
# return test_func
setattr(self.register, test_filed_name, register_func) # 把pytest格式添加到类中
return test_func
# TestAPI.find_yaml_case()
if __name__ == '__main__':
TestAPI.run(cases_dir)
print(TestAPI.__dict__)
TestAPI.find_test_cases()
# print(TestAPI.__dict__)

View File

@@ -10,9 +10,10 @@
@desc:
"""
import logging
import os
import pymysql as MySQLdb
from commons import settings
logger = logging.getLogger(__name__)
@@ -32,11 +33,11 @@ class DBServer:
db = DBServer(
host=settings.db_host, # ip
port=settings.db_port, # 端口
user=settings.db_user, # 用户名
password=settings.db_password, # 密码
database=settings.db_database # 库名
host=os.getenv("DB_HOST"), # ip
port=os.getenv("DB_PORT"), # 端口
user=os.getenv("DB_USER"), # 用户名
password=os.getenv("DB_PASSWORD"), # 密码
database=os.getenv("DB_DATABASE") # 库名
)
if __name__ == '__main__':

View File

@@ -1,113 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: exchange.py
@date: 2024 2024/9/18 21:58
@desc:
"""
import copy
import json
import logging
import re
import jsonpath
import allure
from commons.templates import Template
from commons.file_processors.processor_factory import get_processor_class
from tests.b import TestCaseHandle
logger = logging.getLogger(__name__)
class Exchange:
def __init__(self, path):
self.file = get_processor_class(path)
@allure.step("提取变量")
def extract(self, resp, var_name, attr, expr: str, index) -> None:
resp = copy.deepcopy(resp)
try:
# resp中json是方法不是属性需要手动更改为属性
resp.json = resp.json()
except json.decoder.JSONDecodeError:
resp.json = {"msg": "is not json data"}
data = getattr(resp, attr)
if expr.startswith("/"): # xpath
res = None
elif expr.startswith("$"): # jsonpath
data = dict(data)
res = jsonpath.jsonpath(data, expr)
else: # 正则
res = re.findall(expr, str(data))
# print(res)
if res: # 如果有数据
value = res[index]
else: # 如果没有数据
value = "not data"
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
data = self.file.load()
data[var_name] = value # 保存变量
self.file.save(data) # 持久化存储到文件
@allure.step("替换变量")
def replace(self, case_info: dict) -> dict:
logger.info(f"变量替换:{case_info}")
# 1将case_info转换为字符串
data = TestCaseHandle(**case_info)
case_info_str = data.to_string()
print(f"{case_info_str=}")
# 2替换字符串
case_info_str = Template(case_info_str).render(self.file.load())
print(f"{case_info_str=}")
# 3将字符串转换成case_info
new_case_info = data.to_dict(case_info_str)
return new_case_info
if __name__ == '__main__':
class MockResponse:
text = '{"name":"张三","age":"18","data":[3,4,5],"aaa":null}'
def json(self):
return json.loads(self.text)
mock_resp = MockResponse()
# print(mock_resp.text)
# print(mock_resp.json())
exchanger = Exchange(r"E:\PyP\InterfaceAutoTest\extract.yaml")
exchanger.extract(mock_resp, "name", "json", '$.name', 0)
exchanger.extract(mock_resp, "age", "json", '$.age', 0)
exchanger.extract(mock_resp, "data", "json", '$.data', 0)
exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0)
# mock_case_info = CaseInfo(
# title="单元测试",
# request={
# "data":
# {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
# },
# extract={},
# validate={}
# )
mock_case_info = {
"title": "单元测试",
"request": {
"data":
{"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
},
"extract": {},
"validate": {}
}
new_mock_case_info = exchanger.replace(mock_case_info)
print(new_mock_case_info)

View File

@@ -10,14 +10,21 @@
@desc:
"""
import logging
from typing import Union
from typing import Union, Any
from pathlib import Path
import yaml
from commons.file_processors.base_processor import BaseFileProcessor
logger = logging.getLogger(__name__)
class YamlLoadError(Exception):
"""自定义 YAML 加载异常:当 YAML 语法错误或不符合业务结构时抛出"""
pass
class YamlProcessor(BaseFileProcessor):
"""
用于处理 YAML 文件的类,继承自 dict。
@@ -25,52 +32,101 @@ class YamlProcessor(BaseFileProcessor):
并可以直接像字典一样访问 YAML 数据。
"""
def __init__(self, filepath: Union[str, Path], **kwargs):
def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
"""
初始化 YamlFile 对象。
Args: filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
Args:
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
如果不提供,则尝试从 filepath 加载数据。
"""
super().__init__(filepath, **kwargs)
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
def load(self) -> dict:
super().__init__(filepath=filepath)
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
def load(self) -> dict[str, Any]:
"""
YAML 文件加载数据
:return:
加载 YAML 文件并返回字典。
Returns:
Dict: 加载后的数据字典。
Raises:
YamlLoadError: 文件读取或解析过程中出现异常。
"""
if not self.filepath.exists():
logger.warning(f"文件 {self.filepath} 不存在.")
raise FileNotFoundError(f"文件 {self.filepath} 不存在.")
logger.error(f"❌ 文件未找到: {self.filepath}")
return {}
try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = yaml.safe_load(f)
if not isinstance(loaded_data, dict): # 确保加载的是字典
logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
return loaded_data
content = yaml.safe_load(f)
# 情况1文件内容为空
if content is None:
return {}
# 情况2YAML 语法正确但不是字典(如单纯的字符串或列表)
if not isinstance(content, dict):
raise YamlLoadError(f"YAML 顶层格式错误:期望 dict实际为 {type(content).__name__}")
return content
except yaml.YAMLError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
raise e
msg = f" YAML 语法错误 [{self.filepath.name}]: {e}"
logger.error(msg)
raise YamlLoadError(msg) from e
except Exception as e:
logger.error(f"📂 读取文件系统异常: {e}")
raise
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
@staticmethod
def to_string(data: dict[str, Any]) -> str:
"""
将字典数据保存到 YAML 文件
:param data:
:param new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
将字典 (自身) 转换为 YAML 格式的字符串
Returns:
YAML 格式的字符串。
"""
filepath = Path(new_filepath) if new_filepath else self.filepath
# 确保目标目录存在
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
with open(filepath, "w", encoding="utf-8") as f:
return yaml.safe_dump(
data,
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return ""
except Exception as e:
logger.error(f"序列化 YAML 失败: {e}")
return ""
@staticmethod
def from_string(yaml_str: str) -> Union[None, dict]:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
yaml_str: YAML 格式的字符串。
"""
try:
data = yaml.safe_load(yaml_str)
return data if isinstance(data, dict) else {}
except yaml.YAMLError as e:
logger.error(f"YAML 字符串解析失败: {e}")
return {}
def save(self, data: dict[str, Any], new_filepath: Union[str, Path, None] = None):
"""
将字典数据保存为 YAML 文件。
Args:
data: 要保存的字典数据。
new_filepath: 可选,保存到新路径。
"""
target_path = Path(new_filepath) if new_filepath else self.filepath
try:
target_path.parent.mkdir(parents=True, exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f:
yaml.safe_dump(
data,
stream=f,
@@ -78,20 +134,22 @@ class YamlProcessor(BaseFileProcessor):
sort_keys=False,
default_flow_style=False
)
logger.info(f"数据已成功保存 {filepath}")
except (TypeError, OSError, yaml.YAMLError) as e:
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
raise e
logger.debug(f"💾 数据已成功保存至: {target_path}")
except Exception as e:
logger.error(f"🚫 保存 YAML 失败: {e}")
raise
except (TypeError, OSError) as e:
logger.error(f"保存 YAML 文件 {self.filepath} 时出错: {e}")
# todo 需要将异常的情况返回给上层而不是默认处理为{}
if __name__ == '__main__':
from core.settings import TEST_CASE_DIR
# 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
yaml_path = TEST_CASE_DIR / r'answer/test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = YamlProcessor(yaml_path)
print(yaml_file.load())
print(yaml_file.to_string(yaml_file.load()))
print(type(yaml_file))
# # 直接像字典一样访问数据
@@ -134,4 +192,3 @@ if __name__ == '__main__':
# print("\n加载不存在的文件:", non_existent_file) # 应该打印空字典 {}
# non_existent_file['a'] = 1 # 可以直接添加
# print("\n加载不存在的文件:", non_existent_file)

View File

@@ -10,7 +10,6 @@
@desc: 读取和保存yaml文件
"""
import logging
from dataclasses import dataclass, asdict, field
from pathlib import Path
import yaml
@@ -18,20 +17,31 @@ logger = logging.getLogger(__name__)
class YamlFile(dict):
def __init__(self, path):
super().__init__() # 初始化父类 dict
self.path = Path(path)
self.load() # 链式初始化加载
def __init__(self, path=None, data=None):
super().__init__()
self.path = Path(path) if path else None
if data:
self.update(data)
elif self.path:
if self.path.is_dir():
raise IsADirectoryError(f"The path {self.path} is a directory, not a file.")
self.load()
def load(self):
if self.path.exists():
if not self.path:
logger.warning("No path specified for YamlFile, cannot load.")
return self
if self.path.exists() and self.path.is_file():
with open(self.path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {} # 加载数据,空文件返回空字典
self.clear() # 清空当前实例
self.update(data) # 更新字典内容
loaded_data = yaml.safe_load(f) or {}
self.clear()
self.update(loaded_data)
else:
logger.warning(f"File {self.path} not found, initialized empty.")
return self # 链式调用
logger.warning(f"File not found at {self.path}, YamlFile initialized as empty.")
self.clear()
return self
def to_yaml(self) -> str:
return yaml.safe_dump(
@@ -40,29 +50,49 @@ class YamlFile(dict):
sort_keys=False
)
@classmethod
def by_yaml(cls, yaml_str):
data = yaml.safe_load(yaml_str) or {}
return cls({**data}) # 通过类方法创建实例
return cls(data=data)
def save(self):
if not self.path:
raise ValueError("Cannot save YamlFile instance without a specified path.")
# 确保父目录存在
self.path.parent.mkdir(parents=True, exist_ok=True)
with open(self.path, "w", encoding="utf-8") as f:
yaml.safe_dump(
dict(self), # 直接 dump 实例本身(已继承 dict
dict(self),
stream=f,
allow_unicode=True,
sort_keys=False
)
return self # 链式调用
return self
if __name__ == '__main__':
from commons.models import CaseInfo
from core.models import CaseInfo
from core.settings import TEST_CASE_DIR
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml'
yaml_file = YamlFile(yaml_path)
# yaml_file.load()
case_info = CaseInfo(**yaml_file)
yaml_file["title"] = "查询用户信息"
yaml_file.save()
# 1. 创建一个用于测试的临时yaml文件
dummy_path = TEST_CASE_DIR / "test_model_demo.yaml"
dummy_data = {
"title": "Get user info",
"request": {"method": "GET", "url": "/users/1"},
"validate": [{"equals": ["status_code", 200]}]
}
YamlFile(path=dummy_path, data=dummy_data).save()
print(f"--- 已创建临时测试文件: {dummy_path}")
# 2. 加载文件并使用Pydantic模型进行校验
yaml_case = YamlFile(dummy_path)
print("\n--- 已加载YAML内容 ---\n", yaml_case.to_yaml())
case_model = CaseInfo(**yaml_case)
print("\n--- Pydantic模型校验成功 ---")
print(case_model.model_dump_json(indent=2, by_alias=True))
# 3. 清理临时文件
dummy_path.unlink()
print(f"\n--- 已清理临时文件: {dummy_path}")

View File

@@ -15,10 +15,10 @@ import time
import urllib.parse
import hashlib
from commons.databases import db
# from commons.databases import db
from commons.file_processors.processor_factory import get_processor_class
from commons import settings
# from commons.file_processors.yaml_processor import YamlProcessor as get_processor_class
from core import settings
logger = logging.getLogger(__name__)
@@ -62,31 +62,31 @@ def add(a, b):
return str(int(a) + int(b))
@Funcs.register("sql")
def sql(s: str) -> str:
res = db.execute_sql(s)
return res[0][0]
# @Funcs.register("sql")
# def sql(s: str) -> str:
# res = db.execute_sql(s)
#
# return res[0][0]
@Funcs.register("new_id")
def new_id():
# 自增,永不重复
id_file = get_processor_class(settings.id_path)
data = id_file.load()
data["id"] += 1
id_file.save(data)
return data["id"]
# @Funcs.register("new_id")
# def new_id():
# # 自增,永不重复
# id_file = get_processor_class(settings.id_path)
# data = id_file.load()
# data["id"] += 1
# id_file.save(data)
#
# return data["id"]
@Funcs.register("last_id")
def last_id() -> str:
# 不自增,只返回结果
id_file = get_processor_class(settings.id_path)
data = id_file.load()
return data["id"]
# @Funcs.register("last_id")
# def last_id() -> str:
# # 不自增,只返回结果
#
# id_file = get_processor_class(settings.id_path)
# data = id_file.load()
# return data["id"]
@Funcs.register("md5")
@@ -131,9 +131,9 @@ def rsa_decode(content: str) -> str:
...
@Funcs.register()
@Funcs.register("gen_phone")
def func_name_test():
...
return "我被替换了!!!"
if __name__ == '__main__':

View File

@@ -1,65 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: models.py
@date: 2024 2024/9/15 21:14
@desc: 声明yaml用例格式
"""
import logging
from typing import Union, Optional
from dataclasses import dataclass, field
import yaml
from commons import settings
logger = logging.getLogger(__name__)
@dataclass
class RequestModel:
method: str
url: str
headers: Optional[dict] = None
# body: Optional[Union[dict, str]] = None
params: Optional[Union[dict, str]] = None
@dataclass
class TestCaseModel:
title: str
request: RequestModel
extract: dict
validate: dict
parametrize: list = field(default_factory=list)
epic: str = field(default_factory=lambda: settings.allure_epic)
feature: str = field(default_factory=lambda: settings.allure_feature)
story: str = field(default_factory=lambda: settings.allure_story)
def __post_init__(self):
# 必填字段非空校验
if self.title is None:
raise ValueError("Title cannot be empty")
# 校验RequestModel
if isinstance(self.request, dict):
try:
self.request = RequestModel(**self.request) # RequestModel 的 __post_init__ 会被调用
except (TypeError, ValueError) as e:
raise ValueError(f"解析 'request' 字段失败: {e} (数据: {self.request})") from e
elif not isinstance(self.request, RequestModel): # 如果不是 dict 也不是 RequestModel
raise TypeError(
f"字段 'request' 必须是字典 (将在内部转换为 RequestModel) 或 RequestModel 实例, "
f"得到的是 {type(self.request).__name__}"
)
if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f)
# print(data)
case_info = TestCaseModel(**data)

View File

@@ -1,63 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: templates.py
@date: 2024 2024/9/22 22:20
@desc:
"""
import copy
import logging
import re
import string
from commons.funcs import Funcs
logger = logging.getLogger(__name__)
class Template(string.Template):
"""
1支持函数调用
2参数也可以是变量
"""
call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
def render(self, mapping: dict) -> str:
s = self.safe_substitute(mapping) # 原有方法替换变量
s = self.safe_substitute_funcs(s, mapping)
return s
def safe_substitute_funcs(self, template, mapping) -> str:
"""
解析字符串中的函数名和参数,并将函数调用结果进行替换
:param template: 字符串
:param mapping: 上下文,提供要使用的函数和变量
:return: 替换后的结果
"""
mapping = copy.deepcopy(mapping)
logger.info(f"mapping更新前: {mapping}")
# mapping.update(self.FUNC_MAPPING) # 合并两个mapping
mapping.update(Funcs.FUNC_MAPPING) # 合并两个mapping
logger.info(f"mapping更新后: {mapping}")
def convert(mo):
func_name = mo.group("func_name")
func_args = mo.group("func_args").split(",")
func = mapping.get(func_name) # 读取指定函数
func_args_value = [mapping.get(arg, arg) for arg in func_args]
if func_args_value == [""]: # 处理没有参数的func
func_args_value = []
if not callable(func):
return mo.group() # 如果是不可调用的假函数,不进行替换
else:
return str(func(*func_args_value)) # 否则用函数结果进行替换
return self.call_pattern.sub(convert, template)