15 Commits

Author SHA1 Message Date
00791809df refactor: restructure the execution engine into a context-driven architecture
- Extend WorkflowExecutor and Exchange to support ExecutionEnv resource injection.
- Add session-level connection reuse and an in-memory mirror of the variable pool, eliminating redundant I/O.
- Introduce ChainMap for dynamic context switching, resolving priority overrides between parametrized variables and globally extracted variables.
- Harden variable extraction and assertion logic to keep cross-case variable flow reliable.
2026-03-14 11:45:52 +08:00
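The ChainMap-based priority lookup this commit describes can be sketched as follows; the variable names here are illustrative, not the project's actual identifiers:

```python
from collections import ChainMap

# Globally extracted variables (values pulled from earlier responses).
global_vars = {"token": "abc123", "user_id": 1}

# Parametrized variables for the current case; listed first in the
# ChainMap so they shadow same-named globals.
case_vars = {"user_id": 42}

context = ChainMap(case_vars, global_vars)
assert context["user_id"] == 42      # parametrized value wins
assert context["token"] == "abc123"  # globals remain visible
```

Because ChainMap searches its maps in order without copying them, swapping the front map is enough to switch contexts between cases.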
2116016a0d feat(executor): rework case loading and execution to support parametrized-variable priority
- Introduce a CaseEntity wrapper that decouples the data model from the execution context.
- Drop the deepcopy at load time, cutting memory usage for large parametrized suites.
- Inject local variables during the perform phase so parametrized data takes priority over the global cache.
2026-03-11 17:11:19 +08:00
293b5160fe fix(exchange, case_validator), feat(model): resolve type conflicts between Pydantic model initialization and variable placeholders, improve variable substitution, refactor the CaseInfo model, and introduce deferred validation
- Introduce SmartInt and SmartDict types that convert YAML placeholders to the business types automatically.
- Tighten the CaseInfo mutual-exclusion check so that exactly one of request and api_action is given.
- Standardize on the Pydantic V2 model_config convention.
- Move variable substitution ahead of model instantiation so placeholders are resolved to real values before validation, keeping type conversion correct for int/bool fields.
- Defer assertion rendering so assertions can reference values extracted from the response.
2026-03-11 10:29:16 +08:00
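One possible shape for the SmartInt idea from this commit, sketched with Pydantic V2; `_smart_int` and the `Expect` model are hypothetical names, not the project's actual implementation:

```python
from typing import Annotated, Union

from pydantic import BaseModel, BeforeValidator


def _smart_int(v):
    # Leave ${...} placeholders untouched; coerce numeric strings to int.
    if isinstance(v, str):
        return v if v.startswith("${") else int(v)
    return v


# Accepts either a real int or a not-yet-resolved placeholder string.
SmartInt = Annotated[Union[int, str], BeforeValidator(_smart_int)]


class Expect(BaseModel):
    status_code: SmartInt


assert Expect(status_code="200").status_code == 200
assert Expect(status_code="${code}").status_code == "${code}"
```

The BeforeValidator runs ahead of the union check, so a field can hold a placeholder string during loading and a proper int once substitution has happened.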
69a96a0060 refactor: rework dynamic case generation and decouple core components
- Split `CaseGenerator` into `CaseDataLoader` (data loading) and `CaseGenerator` (case construction), following the single-responsibility principle.
- Introduce `TestTemplateBase` as a clean method-mounting container so logic code does not leak into test cases.
- Separate YAML parsing into distinct steps: file scanning, parametrization parsing, and pytest method construction.
- Switch to the plain @ decorator syntax for Allure and pytest.mark.parametrize.
- Enrich execution logs with type annotations and instance references for more runtime context.
2026-03-06 15:07:22 +08:00
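The method-mounting approach, building test functions from case data and attaching them to a clean container class, can be sketched roughly like this (names are illustrative, and the stub body stands in for the real request/assert logic):

```python
# Sketch: build pytest-style test methods from case data and mount them
# on a clean container class via setattr.
class TestTemplateBase:
    """Pure method-mounting container; holds no logic of its own."""


def make_test(title: str, expected_status: int):
    def test_func(self):
        # A real implementation would send the request and assert on the response.
        assert expected_status in (200, 404)
    test_func.__doc__ = title
    return test_func


cases = [("query user info", 200), ("missing user", 404)]
for i, (title, status) in enumerate(cases):
    setattr(TestTemplateBase, f"test_generated_{i}", make_test(title, status))

assert hasattr(TestTemplateBase, "test_generated_0")
assert TestTemplateBase.test_generated_1.__doc__ == "missing user"
```

pytest collects the mounted `test_*` methods exactly as if they had been written by hand, which is what keeps the container class free of generation logic.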
300b5a92d4 refactor: improve test-case data handling and code structure
- Add a case generator and a case registrar
- Improve file handling
2025-06-03 21:42:57 +08:00
2e9f1c12f7 feat,fix: improve the funcs registration helper
- Improve the register method
2025-04-07 16:35:14 +08:00
4324cf37aa feat,fix: general improvements
- Move settings to environment variables
- Fix bugs
2025-03-19 17:03:13 +08:00
a50e00a4e1 refactor: improve file reading, variable substitution, and more
- Improve the case loader module
- Add a JSON file reader module
2025-03-09 17:23:25 +08:00
914b0301ba feat,fix: project improvements
- Improve yaml_processor (better file-type conversion logic)
- Fix bugs
2025-03-07 17:28:41 +08:00
a6996ed500 feat,fix: project improvements
- Improve yaml_processor (better file-type conversion logic)
- Fix bugs
2025-03-06 17:37:00 +08:00
31fad3f4e1 refactor(cases,yaml_processor): improve test-case loading and file loading
- Improve the case loader module
- Improve the YAML file reader module
2025-03-06 00:26:43 +08:00
b8903798b8 refactor(files): project improvements
- Refactor files
- Add yaml_processor (better file-reading logic)
- Fix bugs
2025-03-05 18:11:28 +08:00
698a95ac83 feat(funcs): improve function hot-loading
- Rework the funcs.py hot-loading module from dict-based reflection to decorator registration
- Fix bugs
2025-03-02 21:47:04 +08:00
1890918312 refactor(models): project improvements
- Refactor assert_all
- Improve the directory layout
2025-02-28 17:48:20 +08:00
bc55dffe40 feat: project improvements
- Update the README
- Fix bugs
2025-02-26 17:25:37 +08:00
51 changed files with 2335 additions and 694 deletions

.gitignore vendored

@@ -2,4 +2,26 @@
.idea/
.venv/
poetry.lock
logs/
# --- Dependencies & environments ---
.venv
venv/
node_modules/
uv.lock
# --- Ignore outputs ---
outputs/
# --- Allure reports ---
temp/
reports/
.allure/
# --- pytest cache ---
.pytest_cache/
.allure_cache/
# --- Config files ---
.env

.python-version Normal file

@@ -0,0 +1 @@
3.11


@@ -9,8 +9,26 @@
...
## Environment setup
1. Install Java
- Configure environment variables
```text
JAVA_HOME
path to the Java installation
CLASSPATH
%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar
Add to Path:
%JAVA_HOME%\bin
%JAVA_HOME%\jre\bin
```
2. Install Allure
- Configure environment variables
```text
Add to Path:
<Allure install dir>\bin
```
...
## Usage


@@ -1,51 +0,0 @@
feature: Feature
story: Story
title: Query user info
request:
  method: get
  url: http://119.91.19.171:40065/answer/api/v1/connector/info
  headers:
    Accept-Encoding: gzip, deflate
    Accept-Language: zh_CN
    Content-Type: application/json
    Cookie: psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==
    Host: 119.91.19.171:40065
    Origin: http://119.91.19.171:40065
    Referer: http://119.91.19.171:40065/users/login
    User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like
      Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0
extract: # extract variables
  code:
    - "json"
    - "$.code"
    - 0
  msg:
    - "json"
    - "$.msg"
    - 0
validate:
  equals: # assert equal
    Status code equals 200:
      - 200
      - ${code}
  not_equals: # assert not equal
    Status code is not 404:
      - 404
      - ${code}
  contains: # assert contains
    Contains relation:
      - 404
      - ${code}
  not_contains: # assert does not contain
    Does not contain relation:
      - 404
      - ${code}
parametrize: # data-driven testing
  - [ "title", "username", "password", "code" ]  # variable names
  - [ "case 1", "user1", "pass1", "code1" ]  # variable values
  - [ "case 2", "user2", "pass2", "code2" ]  # variable values
  - [ "case 3", "user3", "pass3", "code3" ]  # variable values
  - [ "case 4", "user4", "pass4", "code4" ]  # variable values
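A `parametrize` block like the one above (first row = variable names, remaining rows = one data set per case) expands into one case per row; a minimal sketch of that expansion, with illustrative literals:

```python
# First row = variable names, remaining rows = one data set per case.
parametrize = [
    ["title", "username", "password", "code"],
    ["case 1", "user1", "pass1", "code1"],
    ["case 2", "user2", "pass2", "code2"],
]

names, *rows = parametrize
cases = [dict(zip(names, row)) for row in rows]

assert len(cases) == 2
assert cases[0] == {"title": "case 1", "username": "user1",
                    "password": "pass1", "code": "code1"}
```

Each resulting dict is then injected into the case template, which is how one YAML file yields several pytest cases.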


@@ -1,15 +0,0 @@
title: Query user info
request:
  method: get
  url: "https://api.kuleu.com/api/action"
  headers:
    user-agent: 'Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(KHTML, like Gecko) Chrome / 128.0.0.0Safari / 537.36'
  params:
    text: ${url_unquote(code)}
  # data: ${code}
extract:
  status_code: [ json, $.data, 0 ]
validate:
  codes: 200


@@ -1,30 +0,0 @@
title: Query user info
request:
  method: get
  url: http://119.91.19.171:40065/answer/api/v1/connector/info
  headers:
    Accept-Encoding: gzip, deflate
    Accept-Language: zh_CN
    Content-Type: application/json
    Cookie: psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==
    Host: 119.91.19.171:40065
    Origin: http://119.91.19.171:40065
    Referer: http://119.91.19.171:40065/users/login
    User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like
      Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0
extract: # extract variables
  reason:
    - "json"
    - "$.reason"
    - 0
validate:
  # SQL assertions
  contains: # assert contains
    User exists in the database:
      - "ltcs"
      - ${sql(select username from user where id=1)}
  not_contains: # assert does not contain
    User does not exist in the database:
      - "ltcs"
      - ${sql(select username from user where id=1)}


@@ -1,45 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: a_test_case.py
@date: 2024 2024/9/15 19:15
@desc:
"""
from requests import Session
import requests

session = Session()


def test_1():
    base_url = "https://jsonplaceholder.typicode.com"
    session.params = {
        'Content-Type': 'application/json;charset=utf-8'
    }
    url = f"{base_url}/users"
    payload = {}
    # response = requests.request("POST", url, headers=headers, data=payload)
    response = session.get(url, json=payload)
    print(response.json()[0]["username"])
    assert response.status_code == 200


def test_2():
    base_url = r'https://api.kuleu.com/api/action'
    params = {"text": "爱情"}
    header = {
        "user-agent": 'Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(KHTML, like Gecko) '
                      'Chrome / 128.0.0.0Safari / 537.36'
    }
    response = requests.get(base_url, headers=header, params=params)
    # print(response.text)
    print(response.json())
    print(response.request.url)
    assert response.status_code == 200

api.py

@@ -1,28 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: api.py
@date: 2024 2024/9/12 22:52
@desc:
"""
from commons.session import Session
# session = requests.session()
session = Session("https://jsonplaceholder.typicode.com")
session.params = {
'Content-Type': 'application/json;charset=utf-8'
}
url = "/users"
payload = {}
# response = requests.request("POST", url, headers=headers, data=payload)
response = session.get(url, json=payload)
# print(response.text)
# print(response.url)
# print(response)

api/__init__.py Normal file

@@ -0,0 +1,11 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: __init__.py
@date: 2024 2024/9/15 21:13
@desc:
"""

api/user_api.py Normal file

@@ -0,0 +1,32 @@
#!/usr/bin/env python
# coding=utf-8
from core.base_api import BaseApi


class UserApi(BaseApi):
    """User-center business APIs"""

    def login(self, username, password):
        """Example login endpoint"""
        self._log_action("login", user=username)
        payload = {
            "username": username,
            "password": password
        }
        # Call the request method inherited from the session
        return self.session.request(
            method="POST",
            url="/api/v1/login",
            json=payload
        )

    def get_info(self, user_id: int):
        """Example endpoint for fetching user info"""
        self._log_action("get_info", uid=user_id)
        return self.session.request(
            method="GET",
            url=f"/api/v1/user/{user_id}"
        )

commons/case_handler.py Normal file

@@ -0,0 +1,80 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_handler
@date: 2025/5/26 22:13
@desc:
"""
import json
import logging
from dataclasses import dataclass, asdict
from commons.models import TestCaseModel
logger = logging.getLogger(__name__)
@dataclass
class TestCaseHandle(TestCaseModel):

    @classmethod
    def new(cls, testcase: dict) -> 'TestCaseHandle':
        try:
            return cls(**testcase)
        except (TypeError, ValueError) as e:
            logger.warning(f"Parse error: {e}")
            raise

    def to_string(self) -> str:
        """
        Serialize this instance to a JSON string.
        :return: a JSON-formatted string.
        """
        try:
            return json.dumps(asdict(self), ensure_ascii=False)
        except TypeError as e:
            logger.error(f"Error converting data to a JSON string: {e}")
            raise

    @staticmethod
    def to_dict(json_str: str) -> dict:
        """
        Parse a JSON string into a dict.
        :param json_str: a JSON-formatted string.
        :return: the parsed dict.
        """
        try:
            return json.loads(json_str)
        except json.JSONDecodeError as e:
            logger.error(f"Error converting the JSON string to a dict: {e}")
            raise


if __name__ == '__main__':
    from pathlib import Path
    from commons.file_processors import processor_factory

    test_data = Path(r"E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml")
    yaml_data = processor_factory.get_processor_class(test_data)
    case_info = TestCaseHandle.new(yaml_data.load())
    print(case_info.to_string())
    print(type(case_info.to_string()))
    print(case_info.to_dict(case_info.to_string()))
    print(type(case_info.to_dict(case_info.to_string())))
    print(type(case_info))
    print(case_info.parametrize)
    for i in case_info.parametrize:
        print(i)


@@ -14,83 +14,108 @@ import logging
import allure
import pytest
from commons.files import YamlFile
from core import settings
from commons.file_processors.yaml_processor import YamlProcessor as FileHandle
from commons.models import CaseInfo
from commons.session import Session
from commons.exchange import Exchange
from commons import settings
from core.session import Session
from core.exchange import Exchange
from utils import data_driver, case_validator
logger = logging.getLogger(__name__)
session = Session(settings.base_url)
cases_dir = Path(settings.cases_dir)
_case_path = Path(settings.case_path)
exchanger = Exchange(settings.exchanger)
@allure.epic("项目名称answer")
class TestAPI:
...
@classmethod
def find_yaml_case(cls, case_path: Path = _case_path):
def find_test_cases(cls, case_dir: Path = cases_dir):
"""
搜索和加载yaml文件
:return:
"""
yaml_path_list = case_path.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
for yaml_path in yaml_path_list:
logger.info(f"load file {yaml_path=}")
case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
for case_path in case_path_list:
logger.info(f"加载文件:{case_path}")
file = YamlFile(yaml_path) # 自动读取yaml文件
case_info = CaseInfo(**file) # 校验yaml格式
logger.debug(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志
case_func = cls.new_case(case_info) # 从yaml格式转换为pytest格式
print(yaml_path.name)
setattr(cls, f"{yaml_path.name}", case_func) # 把pytest格式添加到类中
file = FileHandle(case_path) # 自动读取yaml文件
try:
CaseInfo(**file) # 校验用例格式
logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# print(case_path.stem)
setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
except Exception as e:
logger.error(e)
@classmethod
def new_case(cls, case_info: CaseInfo):
ddt_data = case_info.ddt()
print(ddt_data)
ddt_title = [data.title for data in ddt_data]
def new_case(cls, file_name, case_info: dict):
test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
@allure.feature(case_info.feature)
@allure.story(case_info.story)
@pytest.mark.parametrize("case_info", ddt_data, ids=ddt_title)
def test_func(self, case_info: CaseInfo):
allure.dynamic.title(case_info.title)
keys_list = list(test_case.keys())
logger.info(f"keys_list{keys_list}")
logger.info(f"用例开始执行:{case_info.title}".center(80, "="))
values_list = list(test_case.values())
logger.info(f"测试用例列表:{values_list}")
driver_title = [i.get("title") for i in values_list]
logger.info(f"driver_title={driver_title}")
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
logger.info(f"epic{epic}")
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
logger.info(f"feature{feature}")
story = case_info["story"] if case_info["story"] else settings.allure_story
logger.info(f"story{story}")
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
@pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
def test_func(self, case_key):
logger.info(f"case_key{case_key}")
test_case_mapping = test_case.get(case_key)
logger.info(f"测试用例:{test_case_mapping}")
allure.dynamic.title(test_case_mapping.get("title"))
logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
# 0变量替换
new_case_info = exchanger.replace(case_info)
new_case_info = exchanger.replace(test_case_mapping)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info{new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.request)
resp = session.request(**new_case_info.get("request"))
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
for var_name, extract_info in new_case_info.extract.items():
print(var_name, extract_info)
for var_name, extract_info in new_case_info.get("extract").items():
logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(case_info) # 为断言加载变量
print(assert_case_info)
assert_case_info.assert_all() # 执行断言
assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
logger.info(f"替换变量后:{assert_case_info}")
# assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
logger.info(f"用例执行结束:{case_info.title}".center(80, "="))
logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
return test_func
# TestAPI.find_yaml_case()
if __name__ == '__main__':
TestAPI.find_yaml_case()
TestAPI.find_test_cases()
# print(TestAPI.__dict__)


@@ -10,9 +10,10 @@
@desc:
"""
import logging
import os
import pymysql as MySQLdb
from commons import settings
logger = logging.getLogger(__name__)
@@ -32,14 +33,14 @@ class DBServer:
db = DBServer(
host=settings.db_host, # ip
port=3306, # 端口
user='root', # 用户名
password='mysql_hNahSe', # 密码
database='answer' # 库名
host=os.getenv("DB_HOST"), # ip
port=os.getenv("DB_PORT"), # 端口
user=os.getenv("DB_USER"), # 用户名
password=os.getenv("DB_PASSWORD"), # 密码
database=os.getenv("DB_DATABASE") # 库名
)
if __name__ == '__main__':
...
res = db.execute_sql('select username from user where id=1;')
print(res[0])
# res = db.execute_sql('select username from user where id=1;')
# print(res[0])


@@ -1,100 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: exchange.py
@date: 2024 2024/9/18 21:58
@desc:
"""
import copy
import json
import logging
import re
import allure
from commons.templates import Template
import jsonpath
from commons.files import YamlFile
from commons.models import CaseInfo
logger = logging.getLogger(__name__)
class Exchange:
    def __init__(self, path):
        self.file = YamlFile(path)

    @allure.step("extract variable")
    def extract(self, resp, var_name, attr, expr: str, index):
        # resp.json is a method, not an attribute; replace it with its value
        resp = copy.deepcopy(resp)
        try:
            resp.json = resp.json()
        except json.decoder.JSONDecodeError:
            resp.json = {"msg": "is not json data"}
        data = getattr(resp, attr)
        if expr.startswith("/"):  # xpath (not implemented)
            res = None
        elif expr.startswith("$"):  # jsonpath
            data = dict(data)
            res = jsonpath.jsonpath(data, expr)
        else:  # regular expression
            res = re.findall(expr, str(data))
        if res:  # a match was found
            value = res[index]
        else:  # no match
            value = "not data"
        logger.debug(f"{var_name} = {value}")  # log the variable name and value
        self.file[var_name] = value  # store the variable
        self.file.save()  # persist to file

    @allure.step("replace variables")
    def replace(self, case_info: CaseInfo):
        # 1. Serialize case_info to a string
        case_info_str = case_info.to_yaml()
        # 2. Substitute variables in the string
        case_info_str = Template(case_info_str).render(self.file)
        # 3. Deserialize the string back into a CaseInfo
        return case_info.by_yaml(case_info_str)


if __name__ == '__main__':
    class MockResponse:
        text = '{"name":"张三","age":"18","data":[3,4,5],"aaa":null}'

        def json(self):
            return json.loads(self.text)

    mock_resp = MockResponse()
    exchanger = Exchange(r"E:\PyP\InterfaceAutoTest\extract.yaml")
    exchanger.extract(mock_resp, "name", "json", '$.name', 0)
    exchanger.extract(mock_resp, "age", "json", '$.age', 0)
    exchanger.extract(mock_resp, "data", "json", '$.data', 0)
    exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0)
    case_info = CaseInfo(
        title="unit test",
        request={
            "data":
                {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
        },
        extract={},
        validate={}
    )
    new_case_info = exchanger.replace(case_info)
    print(new_case_info)


@@ -0,0 +1,22 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: __init__.py
@date: 2025/3/4 17:23
@desc:
"""
from .base_processor import BaseFileProcessor
from .json_processor import JsonProcessor
from .yaml_processor import YamlProcessor
from .processor_factory import get_processor_class
__all__ = [
"BaseFileProcessor",
"JsonProcessor",
"YamlProcessor",
"get_processor_class",
]


@@ -0,0 +1,34 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: base
@date: 2025/3/4 17:23
@desc:
"""
import abc
from pathlib import Path
from typing import Union
class BaseFileProcessor(abc.ABC):  # abstract base class via the abc module
    """
    Abstract base class for file processors.
    Defines the methods every subclass must implement.
    """

    def __init__(self, filepath: Union[str, Path], **kwargs):
        self.filepath: Path = Path(filepath)  # ensure filepath is a Path object

    @abc.abstractmethod
    def load(self) -> dict:
        """Load data from the file."""
        raise NotImplementedError

    @abc.abstractmethod
    def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
        """Save data to the file."""
        raise NotImplementedError


@@ -0,0 +1,86 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: json_processor
@date: 2025/3/4 17:28
@desc:
"""
import logging
from typing import Union, Any
from pathlib import Path
import json
from commons.file_processors.base_processor import BaseFileProcessor
logger = logging.getLogger(__name__)
class JsonProcessor(BaseFileProcessor):
"""
用于处理 JSON 文件的类。
提供了从文件加载 JSON 数据为字典,以及将字典保存为 JSON 文件的功能。
"""
    def __init__(self, filepath: Union[str, Path], **kwargs):
        """
        Initialize a JsonProcessor.
        Args:
            filepath: path to the JSON file (str or pathlib.Path).
        """
        super().__init__(filepath, **kwargs)

    def load(self) -> dict[str, Any]:
        """
        Load data from a JSON file.
        :return: the loaded dict.
        """
        if not self.filepath.exists():
            logger.warning(f"File {self.filepath} does not exist.")
            raise FileNotFoundError(f"File {self.filepath} does not exist.")
        try:
            with open(self.filepath, "r", encoding="utf-8") as f:
                loaded_data = json.load(f)
            if not isinstance(loaded_data, dict):  # ensure the root node is a mapping
                logger.error(f"Root of JSON file {self.filepath} is not a dict/mapping.")
                raise ValueError(f"Root of JSON file {self.filepath} is not a dict/mapping.")
            return loaded_data
        except json.JSONDecodeError as e:
            logger.error(f"Error loading JSON file {self.filepath}: {e}")
            raise
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
"""
将字典数据保存到 json 文件。
Args:
:param data:
:param new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
"""
filepath = Path(new_filepath) if new_filepath else self.filepath
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(
data,
f,
ensure_ascii=False, # 允许非ASCII字符
sort_keys=False # 不排序键
)
logger.info(f"数据已成功保存到 {filepath}")
except (TypeError, OSError, json.JSONDecodeError) as e:
logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}")
raise e
if __name__ == '__main__':
# 示例用法
json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径
json_file = JsonProcessor(json_path)
print(json_file.load())
print(type(json_file))
# json_file.save()


@@ -0,0 +1,57 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: file_handle
@date: 2025/3/7 09:31
@desc:
"""
from pathlib import Path
from typing import Type, Union
from commons.file_processors.base_processor import BaseFileProcessor
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor
# Type alias: maps file suffixes to processor classes
ProcessorMap = dict[str, Type[BaseFileProcessor]]
processors: ProcessorMap = {
    'yaml': YamlProcessor,
    'yml': YamlProcessor,
    'json': JsonProcessor,
}


class UnsupportedFileTypeError(Exception):
    """Raised when the file type is not supported."""
    pass


def get_processor_class(fp: Union[Path, str]) -> BaseFileProcessor:
    fp = Path(fp)
    if fp.is_file():
        file_suffix = fp.suffix[1:]
        # Factory lookup; unknown suffixes fall back to YamlProcessor
        processor_class = processors.get(file_suffix.lower(), YamlProcessor)
        return processor_class(fp)
    else:
        raise UnsupportedFileTypeError(fp)
# FileHandle = get_processor("yaml")
if __name__ == '__main__':
# 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
# yaml_file = FileHandle(yaml_path)
# print(yaml_file.load())
# print(type(yaml_file))
# file_suffix = Path(yaml_path).suffix[1:]
# print(file_suffix)
get_processor = get_processor_class(yaml_path)
print(get_processor.load())


@@ -0,0 +1,194 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: yaml_processor
@date: 2025/3/4 17:28
@desc:
"""
import logging
from typing import Union, Any
from pathlib import Path
import yaml
from commons.file_processors.base_processor import BaseFileProcessor
logger = logging.getLogger(__name__)
class YamlLoadError(Exception):
    """Raised when YAML has a syntax error or does not match the expected structure."""
    pass


class YamlProcessor(BaseFileProcessor):
    """
    Processor for YAML files.
    Loads from and saves to files and converts between dicts and YAML strings.
    """

    def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
        """
        Initialize a YamlProcessor.
        Args:
            filepath: path to the YAML file (str or pathlib.Path).
            data: optional initial data dict (currently unused; kept for
                backward compatibility).
        """
        super().__init__(filepath=filepath)
        self.filepath: Path = Path(filepath)  # ensure filepath is a Path object
    def load(self) -> dict[str, Any]:
        """
        Load the YAML file and return a dict.
        Returns:
            dict: the loaded data.
        Raises:
            YamlLoadError: the file could not be read or parsed.
        """
        if not self.filepath.exists():
            logger.error(f"❌ File not found: {self.filepath}")
            return {}
        try:
            with open(self.filepath, "r", encoding="utf-8") as f:
                content = yaml.safe_load(f)
            # Case 1: the file is empty
            if content is None:
                return {}
            # Case 2: valid YAML, but the root is not a mapping (e.g. a plain string or list)
            if not isinstance(content, dict):
                raise YamlLoadError(f"Bad YAML root: expected dict, got {type(content).__name__}")
            return content
        except yaml.YAMLError as e:
            msg = f"❌ YAML syntax error [{self.filepath.name}]: {e}"
            logger.error(msg)
            raise YamlLoadError(msg) from e
        except OSError as e:
            logger.error(f"📂 Filesystem error while reading: {e}")
            raise
    @staticmethod
    def to_string(data: dict[str, Any]) -> str:
        """
        Serialize a dict to a YAML string.
        Returns:
            the YAML string, or "" on failure.
        """
        try:
            return yaml.safe_dump(
                data,
                allow_unicode=True,
                sort_keys=False,
                default_flow_style=False
            )
        except (TypeError, yaml.YAMLError) as e:
            logger.error(f"Failed to serialize data to a YAML string: {e}")
            return ""

    @staticmethod
    def from_string(yaml_str: str) -> dict:
        """
        Parse a YAML string into a dict.
        Args:
            yaml_str: the YAML string.
        """
        try:
            data = yaml.safe_load(yaml_str)
            return data if isinstance(data, dict) else {}
        except yaml.YAMLError as e:
            logger.error(f"Failed to parse YAML string: {e}")
            return {}
    def save(self, data: dict[str, Any], new_filepath: Union[str, Path, None] = None):
        """
        Save a dict as a YAML file.
        Args:
            data: the dict to save.
            new_filepath: optional new target path; defaults to the original path.
        """
        target_path = Path(new_filepath) if new_filepath else self.filepath
        try:
            target_path.parent.mkdir(parents=True, exist_ok=True)
            with open(target_path, "w", encoding="utf-8") as f:
                yaml.safe_dump(
                    data,
                    stream=f,
                    allow_unicode=True,
                    sort_keys=False,
                    default_flow_style=False
                )
            logger.debug(f"💾 Data saved to: {target_path}")
        except (TypeError, OSError) as e:
            # Specific failures first; re-raise so callers see the error
            # instead of silently getting an empty result.
            logger.error(f"Error saving YAML file {target_path}: {e}")
            raise
        except Exception as e:
            logger.error(f"🚫 Failed to save YAML: {e}")
            raise
if __name__ == '__main__':
from core.settings import TEST_CASE_DIR
# 示例用法
yaml_path = TEST_CASE_DIR / r'answer/test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = YamlProcessor(yaml_path)
print(yaml_file.load())
print(yaml_file.to_string(yaml_file.load()))
print(type(yaml_file))
# # 直接像字典一样访问数据
# print("加载的数据:", yaml_file) # 直接打印对象,就是打印字典内容
# print("title:", yaml_file.get("title")) # 使用 get 方法
# if "title" in yaml_file: # 使用 in 检查键
# print("原始title:", yaml_file["title"]) # 使用方括号访问
# yaml_file["title"] = "新的标题" # 使用方括号修改
# print("修改后的title:", yaml_file["title"])
# #
# yaml_file["new_key"] = "new_value" # 添加新的键值对
#
# # 将字典转换为 YAML 字符串
# yaml_string = yaml_file.to_string()
# print("\nYAML 字符串:", yaml_string)
# #
# # 将 YAML 字符串转换回字典 (并更新 yaml_file)
# yaml_file.to_dict(yaml_string)
# print("\n从字符串加载的数据:", yaml_file)
#
# # 保存修改后的数据 (覆盖原文件)
# yaml_file.save()
#
# # 保存到新文件
# new_yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user_new.yaml'
# yaml_file.save(new_filepath=new_yaml_path)
# 测试从字符串初始化
# yaml_string2 = """
# name: Test User
# age: 30
# """
# yaml_file2 = YamlFile("test2.yaml", data=yaml.safe_load(yaml_string2)) # 从字符串初始化
# print("\n从字符串初始化的 YamlFile:", yaml_file2)
# yaml_file2.save() # 保存到 test2.yaml
#
# 测试文件不存在的情形
# non_existent_file = YamlFile("non_existent_file.yaml")
# print("\n加载不存在的文件:", non_existent_file) # 应该打印空字典 {}
# non_existent_file['a'] = 1 # 可以直接添加
# print("\n加载不存在的文件:", non_existent_file)


@@ -10,37 +10,89 @@
@desc: 读取和保存yaml文件
"""
import logging
from pathlib import Path
import yaml
from commons.models import CaseInfo
logger = logging.getLogger(__name__)
class YamlFile(dict):
def __init__(self, path):
def __init__(self, path=None, data=None):
super().__init__()
self.path = path
self.path = Path(path) if path else None
if data:
self.update(data)
elif self.path:
if self.path.is_dir():
raise IsADirectoryError(f"The path {self.path} is a directory, not a file.")
self.load()
def load(self):
if not self.path:
logger.warning("No path specified for YamlFile, cannot load.")
return self
if self.path.exists() and self.path.is_file():
with open(self.path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) # 字典
if data:
self.update(data) # 把两个字段的内容合并
loaded_data = yaml.safe_load(f) or {}
self.clear()
self.update(loaded_data)
else:
logger.warning(f"File not found at {self.path}, YamlFile initialized as empty.")
self.clear()
return self
def to_yaml(self) -> str:
return yaml.safe_dump(
dict(self),
allow_unicode=True,
sort_keys=False
)
@classmethod
def by_yaml(cls, yaml_str):
data = yaml.safe_load(yaml_str) or {}
return cls(data=data)
def save(self):
if not self.path:
raise ValueError("Cannot save YamlFile instance without a specified path.")
# 确保父目录存在
self.path.parent.mkdir(parents=True, exist_ok=True)
with open(self.path, "w", encoding="utf-8") as f:
yaml.safe_dump(
dict(self),
stream=f,
allow_unicode=True, # allow_unicode使用unicode编码正常显示中文
sort_keys=False) # sort_keys保持原有排序
allow_unicode=True,
sort_keys=False
)
return self
if __name__ == '__main__':
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml'
yaml_file = YamlFile(yaml_path)
# yaml_file.load()
case_info = CaseInfo(**yaml_file)
yaml_file["title"] = "查询用户信息"
yaml_file.save()
from core.models import CaseInfo
from core.settings import TEST_CASE_DIR
# 1. 创建一个用于测试的临时yaml文件
dummy_path = TEST_CASE_DIR / "test_model_demo.yaml"
dummy_data = {
"title": "Get user info",
"request": {"method": "GET", "url": "/users/1"},
"validate": [{"equals": ["status_code", 200]}]
}
YamlFile(path=dummy_path, data=dummy_data).save()
print(f"--- 已创建临时测试文件: {dummy_path}")
# 2. 加载文件并使用Pydantic模型进行校验
yaml_case = YamlFile(dummy_path)
print("\n--- 已加载YAML内容 ---\n", yaml_case.to_yaml())
case_model = CaseInfo(**yaml_case)
print("\n--- Pydantic模型校验成功 ---")
print(case_model.model_dump_json(indent=2, by_alias=True))
# 3. 清理临时文件
dummy_path.unlink()
print(f"\n--- 已清理临时文件: {dummy_path}")


@@ -15,48 +15,81 @@ import time
import urllib.parse
import hashlib
from commons.databases import db
# from commons.databases import db
# from commons.files import YamlFile
from commons import settings
# from commons.file_processors.yaml_processor import YamlProcessor as get_processor_class
from core import settings
logger = logging.getLogger(__name__)
class Funcs:
    FUNC_MAPPING = {
        "int": int,
        "float": float,
        "bool": bool
    }  # built-ins go straight into the mapping; everything else is defined in funcs and auto-registered
    @classmethod
    def register(cls, name: str | None = None):
        def decorator(func):
            # Fall back to the function's own name when no explicit name is given.
            if name is None:
                cls.FUNC_MAPPING[func.__name__] = func
            else:
                cls.FUNC_MAPPING[name] = func
            return func
        return decorator
@Funcs.register("url_unquote")
def url_unquote(s: str) -> str:
return urllib.parse.unquote(s)
@Funcs.register("str")
def to_string(s) -> str:
# 将数据转换为str类型。
return f"'{s}'"
@Funcs.register("time_str")
def time_str() -> str:
return str(time.time())
@Funcs.register("add")
def add(a, b):
return str(int(a) + int(b))
def sql(s: str) -> str:
res = db.execute_sql(s)
return res[0][0]
# @Funcs.register("sql")
# def sql(s: str) -> str:
# res = db.execute_sql(s)
#
# return res[0][0]
def new_id():
# 自增,永不重复
id_file = YamlFile(settings.id_path)
id_file["id"] += 1
id_file.save()
return id_file["id"]
# @Funcs.register("new_id")
# def new_id():
# # 自增,永不重复
# id_file = get_processor_class(settings.id_path)
# data = id_file.load()
# data["id"] += 1
# id_file.save(data)
#
# return data["id"]
def last_id() -> str:
# 不自增,只返回结果
id_file = YamlFile("id.yaml")
return id_file["id"]
# @Funcs.register("last_id")
# def last_id() -> str:
# # 不自增,只返回结果
#
# id_file = get_processor_class(settings.id_path)
# data = id_file.load()
# return data["id"]
@Funcs.register("md5")
def md5(content: str) -> str:
# 1原文转为字节
content = content.encode("utf-8")
@@ -64,6 +97,7 @@ def md5(content: str) -> str:
return result
@Funcs.register("base64_encode")
def base64_encode(content: str) -> str:
# 1原文转二进制
content = content.encode("utf-8")
@@ -75,6 +109,7 @@ def base64_encode(content: str) -> str:
return encode_str
@Funcs.register("base64_decode")
def base64_decode(content: str) -> str:
# 1原文转二进制
content = content.encode("utf-8")
@@ -86,19 +121,24 @@ def base64_decode(content: str) -> str:
return decode_str
@Funcs.register("rsa_encode")
def rsa_encode(content: str) -> str:
...
@Funcs.register("rsa_decode")
def rsa_decode(content: str) -> str:
...
@Funcs.register("gen_phone")
def func_name_test():
return "我被替换了!!!"
if __name__ == '__main__':
# res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
# print(res)
a = "这是中文dddddd"
bb = base64_encode(a)
print(bb)
cc = base64_decode(bb)
print(cc)
# print(f"计数器:{new_id()}")
# print(f"当前数值:{last_id()}")
print(Funcs().FUNC_MAPPING)
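The registered functions are invoked from `${...}` placeholders in the case files; a minimal sketch of how such a substitution can work (the regex, the `render` helper, and the inline `FUNC_MAPPING` here are illustrative, not the project's actual Template implementation):

```python
import re

# Illustrative registry; the real project builds this via Funcs.register.
FUNC_MAPPING = {"add": lambda a, b: str(int(a) + int(b))}


def render(template: str, variables: dict) -> str:
    def repl(m):
        expr = m.group(1)
        call = re.fullmatch(r"(\w+)\((.*)\)", expr)
        if call and call.group(1) in FUNC_MAPPING:  # function-call placeholder
            args = [a.strip() for a in call.group(2).split(",")] if call.group(2) else []
            return str(FUNC_MAPPING[call.group(1)](*args))
        return str(variables.get(expr, m.group(0)))  # plain variable placeholder

    return re.sub(r"\$\{([^}]*)\}", repl, template)


assert render("sum=${add(1,2)}", {}) == "sum=3"
assert render("hi ${name}", {"name": "Tom"}) == "hi Tom"
```

Unresolved placeholders are left in place, which is what lets substitution run again later once more variables have been extracted.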


@@ -1,98 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: models.py
@date: 2024 2024/9/15 21:14
@desc: 声明yaml用例格式
"""
import logging
from dataclasses import dataclass, asdict
import allure
import yaml
from commons.templates import Template
from commons import settings
logger = logging.getLogger(__name__)
@dataclass
class CaseInfo:
title: str
request: dict
extract: dict
validate: dict
parametrize: list = ""
epic: str = settings.allure_epic
feature: str = settings.allure_feature
story: str = settings.allure_story
def to_yaml(self) -> str:
# Serialize to a YAML string
yaml_str = yaml.safe_dump(
asdict(self),
allow_unicode=True,  # keep non-ASCII characters readable instead of escaped
sort_keys=False)
return yaml_str
@classmethod
def by_yaml(cls, yaml_str):
# Deserialize from a YAML string
obj = cls(**yaml.safe_load(yaml_str))
return obj
@allure.step("Assertions")
def assert_all(self):
if not self.validate:
return
for assert_type, assert_value in self.validate.items():
for msg, data in assert_value.items():
a, b = data[0], data[1]
# print(assert_type, a, b, msg)
match assert_type:
case 'equals':
logger.info(f"assert {a} == {b}, {msg}")
assert a == b, msg
case 'not_equals':
logger.info(f"assert {a} != {b}, {msg}")
assert a != b, msg
case 'contains':
logger.info(f"assert {a} in {b}, {msg}")
assert a in b, msg
case 'not_contains':
logger.info(f"assert {a} not in {b}, {msg}")
assert a not in b, msg
# case "...": add more assertion types here as needed
def ddt(self) -> list:  # returns a list of N CaseInfo objects with variables injected
case_list = []
if not self.parametrize:  # not data-driven
case_list.append('')
else:  # data-driven
args_name = self.parametrize[0]
args_value_list = self.parametrize[1:]
for args_value in args_value_list:
d = dict(zip(args_name, args_value))
# d holds the data-driven variables to inject into the case
case_info_str = self.to_yaml()  # to string
case_info_str = Template(case_info_str).render(d)  # inject variables
case_info = self.by_yaml(case_info_str)  # back to an instance
case_list.append(case_info)  # collect result
return case_list
if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f)
# print(data)
case_info = CaseInfo(**data)
s = case_info.to_yaml()
print(s)
new_case_info = case_info.by_yaml(s)
print(new_case_info)


@@ -1,28 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: settings
@date: 2025/2/23 21:34
@desc:
"""
base_url = 'http://127.0.0.1:8000'
case_path = r"E:\PyP\InterfaceAutoTest\TestCases"
exchanger = r"E:\PyP\InterfaceAutoTest\extract.yaml"
id_path = r"E:\PyP\InterfaceAutoTest\id.yaml"
db_host = '119.91.19.171'  # IP address
db_port = 3306  # port
db_user = 'root'  # username
db_password = 'mysql_hNahSe'  # password
db_database = 'answer'  # database name
allure_epic: str = "项目名称answer"
allure_feature: str = "默认特征feature"
allure_story: str = "默认事件story"
rsa_public = ""
rsa_private = ""


@@ -1,82 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: templates.py
@date: 2024 2024/9/22 22:20
@desc:
"""
import copy
import logging
import re
import string
logger = logging.getLogger(__name__)
def _str(s) -> str:
# Wrap the value in quotes so it renders as a string literal
return f"'{s}'"
class Template(string.Template):
"""
1. Supports function calls
2. Arguments may themselves be variables
"""
func_mapping = {
"str": _str,
"int": int,
"float": float,
"bool": bool
}  # Builtins go straight into the mapping; anything else is defined in funcs and auto-registered
call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
def render(self, mapping: dict) -> str:
s = self.safe_substitute(mapping)  # substitute plain variables with the stock method
s = self.safe_substitute_funcs(s, mapping)
return s
def safe_substitute_funcs(self, template, mapping) -> str:
"""
Parse function names and arguments in the string and substitute each call with its result
:param template: the template string
:param mapping: context providing the functions and variables to use
:return: the substituted result
"""
mapping = copy.deepcopy(mapping)
mapping.update(self.func_mapping)  # merge the two mappings
def convert(mo):
func_name = mo.group("func_name")
func_args = mo.group("func_args").split(",")
func = mapping.get(func_name)  # look up the function
func_args_value = [mapping.get(arg, arg) for arg in func_args]
if func_args_value == [""]:  # handle zero-argument calls
func_args_value = []
if not callable(func):
return mo.group()  # leave non-callable "functions" untouched
else:
return str(func(*func_args_value))  # otherwise substitute the call result
return self.call_pattern.sub(convert, template)
def hot_load():
from commons import funcs
for func_name in dir(funcs):  # iterate over everything in the module
if func_name.startswith("_"):
continue
func_code = getattr(funcs, func_name)  # fetch the attribute
if callable(func_code):  # only register callables
Template.func_mapping[func_name] = func_code  # expose it to Template
hot_load()

conftest.py Normal file

@@ -0,0 +1,60 @@
#!/usr/bin/env python
# coding=utf-8
"""
@desc: Pytest 配置文件,用于设置全局 Fixture 和钩子函数
"""
import pytest
from pathlib import Path
import logging
from core import settings
from commons.files import YamlFile
from core.context import VariableStore, ExecutionEnv
from core.executor import WorkflowExecutor
from core.models import RawSchema
from core.session import Session
from core.exchange import Exchange
from core.settings import EXTRACT_CACHE
logger = logging.getLogger(__name__)
@pytest.fixture(scope="session")
def api_env():
"""
Session-scoped resource provider:
1. One shared Session for the whole run (connection-pool reuse)
2. Variable pool mirrored in memory (cuts repeated disk I/O)
"""
# Setup: load the environment
store = VariableStore(settings.DATA_DIR / "extract.yaml")
exchanger = Exchange(variable_cache=store.store)
session = Session(settings.base_url)
# ExecutionEnv declares three fields (session, store, exchanger);
# WorkflowExecutor is used via classmethods, so no instance is injected.
env = ExecutionEnv(session, store, exchanger)
yield env  # injected into the test cases
# Teardown: persist and clean up once
store.persist()
session.close()
@pytest.fixture(scope="session")
def session():
"""Globally shared Session fixture"""
return Session(settings.base_url)
@pytest.fixture(scope="session")
def exchanger():
"""Globally shared Exchange fixture"""
return Exchange(EXTRACT_CACHE)
# @pytest.fixture(scope="session")
# def case_engine(session, exchanger):
# """Globally shared CaseEngine fixture"""
# return CaseEngine(session, exchanger)
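The `api_env` fixture above relies on pytest's setup/yield/teardown contract. As a rough sketch (no pytest involved, names hypothetical), the lifecycle behaves like a plain generator that pytest advances once before the first test and resumes after the last:

```python
# Minimal sketch of the setup/yield/teardown lifecycle used by the api_env
# fixture above, modelled as a plain generator (no pytest required).
def api_env_lifecycle(events):
    events.append("setup")      # load VariableStore, open Session
    yield "env"                 # the value injected into each test
    events.append("teardown")   # store.persist(), session.close()

events = []
gen = api_env_lifecycle(events)
env = next(gen)                 # pytest runs this once per session
# ... all tests in the session run here, sharing `env` ...
try:
    next(gen)                   # pytest resumes the generator for teardown
except StopIteration:
    pass
print(events)  # ['setup', 'teardown']
```

This is why `store.persist()` and `session.close()` in the fixture run exactly once, after the whole session.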

core/base_api.py Normal file

@@ -0,0 +1,15 @@
#!/usr/bin/env python
# coding=utf-8
import logging
from core.session import Session
from core import settings
class BaseApi:
def __init__(self, session: Session = None):
self.session = session or Session(base_url=settings.base_url)
self.logger = logging.getLogger(self.__class__.__name__)
def _log_action(self, method_name: str, **kwargs):
"""Uniform action logging"""
self.logger.info(f"Action: {method_name} | params: {kwargs}")

core/context.py Normal file

@@ -0,0 +1,40 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com
@file: context
@date: 2026/3/14 09:07
@desc:
"""
from dataclasses import dataclass
from typing import Dict, Any
from pathlib import Path
from core.exchange import Exchange
from core.session import Session
from commons.file_processors.yaml_processor import YamlProcessor
class VariableStore:
"""In-memory variable store: the single point of contact between the L2 cache and disk"""
def __init__(self, seed_file: Path):
self.seed_file = seed_file
self.processor = YamlProcessor(seed_file)
# Loaded from disk only once, at startup
self.store: Dict[str, Any] = self.processor.load() or {}
def persist(self):
"""Flush everything to disk once, when the test run ends"""
self.processor.save(self.store)
@dataclass
class ExecutionEnv:
"""Execution context: holds the shared resources"""
session: Session
store: VariableStore
exchanger: "Exchange"

core/creator.py Normal file

@@ -0,0 +1,179 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com
@file: creator
@date: 2026/3/6 10:40
@desc:
"""
import logging
import allure
from pathlib import Path
from dataclasses import dataclass
from core import settings
from core.executor import WorkflowExecutor
from pydantic import ValidationError
from commons.file_processors.yaml_processor import YamlProcessor as FileHandle, YamlLoadError
from core.models import RawSchema  # the Pydantic model defined earlier
from typing import Any, List, Type, Generator, Union
logger = logging.getLogger(__name__)
@dataclass
class CaseEntity:
"""Case execution entity: decouples model data from execution context"""
step_data: RawSchema
row_context: dict[str, Any]
class TestTemplateBase:
"""
Concrete test case container.
Holds no logic of its own; it only carries the test_* methods mounted by the loader.
"""
pass
class CaseDataLoader:
"""
Test case loader.
Responsibilities: scan the filesystem -> load YAML -> expand parametrization -> wrap into CaseEntity objects
"""
@staticmethod
def fetch_yaml_files(cases_dir: Union[str, Path]) -> Generator[Path, None, None]:
"""Scan the directory and yield matching file paths"""
base_path = Path(cases_dir)
if not base_path.exists():
logger.error(f"📂 test case directory does not exist: {base_path}")
return
# Match every YAML file whose name starts with test_
yield from base_path.rglob("test_*.yaml")
@classmethod
def load_cases(cls, file_path: Path) -> List[CaseEntity]:
"""
Load a single YAML file and convert it into a list of CaseEntity objects,
including automatic expansion of parametrized data.
"""
entities: List[CaseEntity] = []
try:
# 1. Load the raw dict via the refactored YamlProcessor
processor = FileHandle(file_path)
raw_data = processor.load()
if not raw_data:
return []
entities = cls._parse_parametrize(raw_data)
except YamlLoadError:
# YamlProcessor already logged the error; skip this file
pass
except ValidationError as e:
logger.error(f"Case schema validation failed [{file_path.name}]:\n{e.json()}")
except Exception as e:
logger.error(f"Unexpected error while loading cases [{file_path.name}]: {e}")
return entities
@staticmethod
def _parse_parametrize(raw_data: dict[str, Any]) -> List[CaseEntity]:
"""
Expand the parametrize block in raw_data into multiple CaseEntity instances
"""
entities = []
parametrize_data = raw_data.pop("parametrize", None)
# 1. Validate the single template object (Pydantic)
template_case = RawSchema.model_validate(raw_data)
# 2. Check for a parametrize block
if parametrize_data and isinstance(parametrize_data, list) and len(parametrize_data) >= 2:
# 3. Expand the rows: row 0 is the header, the rest are data rows
headers = parametrize_data[0]
for row in parametrize_data[1:]:
row_map = dict(zip(headers, row))
# Wrap as an entity; model_copy() keeps cases from sharing (and
# accidentally mutating) one Pydantic model instance
entities.append(CaseEntity(step_data=template_case.model_copy(), row_context=row_map))
else:
# Plain case: empty row context
entities.append(CaseEntity(step_data=template_case.model_copy(), row_context={}))
return entities
@classmethod
def get_all_cases(cls, cases_dir: Union[str, Path]) -> List[CaseEntity]:
"""
Bulk retrieval entry point, consumed by CaseGenerator
"""
all_cases = []
for file in cls.fetch_yaml_files(cases_dir):
all_cases.extend(cls.load_cases(file))
return all_cases
class CaseGenerator:
"""
Responsibility 2: case construction factory.
Turns loaded data into pytest-decorated methods and mounts them on the target class.
"""
@classmethod
def build_and_register(cls, target_cls: Type[TestTemplateBase], cases_dir: Union[str, Path]):
# 1. Fetch data via the loader
all_cases = CaseDataLoader.get_all_cases(cases_dir)
for index, case_info in enumerate(all_cases):
case_title = case_info.row_context.get("title") or case_info.step_data.title
dynamic_test_method = cls._create_case_method(title=case_title, entity=case_info)
safe_title = "".join([c if c.isalnum() else "_" for c in case_title])[:50]
method_name = f"test_{index:03d}_{safe_title}"
setattr(target_cls, method_name, dynamic_test_method)
logger.debug(f"Successfully registered: {method_name}")
@staticmethod
def _create_case_method(title, entity: CaseEntity):
"""Wrap one concrete pytest execution node"""
case_template = entity.step_data
context = entity.row_context
def build_actual_case(instance: TestTemplateBase, api_env):
# --- 1. Set Allure report attributes dynamically ---
allure.dynamic.epic(case_template.epic or settings.allure_epic)
allure.dynamic.feature(case_template.feature or settings.allure_feature)
allure.dynamic.story(case_template.story or settings.allure_story)
allure.dynamic.title(title)
# Log the source class via the bound instance
logger.info(f"[Runner] Class: {instance.__class__.__name__} | Case: {title}")
try:
WorkflowExecutor.perform(case_template, api_env, context=context)
except Exception as e:
# A more detailed snapshot of the runtime context could be logged here
logger.error(f"Case execution failed: {title} | error: {e}")
raise
return build_actual_case
if __name__ == '__main__':
# --- Bootstrap ---
# `settings` is already imported at the top of this module
CaseGenerator.build_and_register(TestTemplateBase, settings.TEST_CASE_DIR)
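The parametrize expansion in `CaseDataLoader._parse_parametrize` boils down to zipping a header row against each data row; a standalone sketch of that step:

```python
# Standalone sketch of the expansion performed by _parse_parametrize:
# row 0 is the header row, every following row becomes one row_context
# dict, i.e. one generated test case.
parametrize = [
    ["user", "pwd"],      # header row
    ["admin", "123"],     # case 1
    ["guest", "456"],     # case 2
]
headers, rows = parametrize[0], parametrize[1:]
contexts = [dict(zip(headers, row)) for row in rows]
print(contexts)
# [{'user': 'admin', 'pwd': '123'}, {'user': 'guest', 'pwd': '456'}]
```

Each resulting dict becomes the `row_context` of one `CaseEntity`, while the shared template is duplicated via `model_copy()`.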

core/exchange.py Normal file

@@ -0,0 +1,195 @@
#!/usr/bin/env python
# coding=utf-8
"""
@desc: Variable exchanger for data substitution and extraction
"""
import logging
import re
from typing import Any, Union, TypeVar
import jsonpath
from lxml import etree
from core.models import RawSchema
from core.settings import EXTRACT_CACHE
from core.templates import Template
from commons.file_processors.yaml_processor import YamlProcessor
logger = logging.getLogger(__name__)
# Generic type used to keep the return type consistent with the input
T = TypeVar("T", bound=Union[dict, list, str, Any])
class Exchange:
def __init__(self, variable_cache: dict[str, Any]):
self._cache = variable_cache
# Match plain variables ${var}; function calls ${func()} are excluded
self.var_only_pattern = re.compile(r"^\$\{([a-zA-Z_]\w*)}$")
@property
def global_vars(self) -> dict:
return self._cache
@global_vars.setter
def global_vars(self, global_vars: dict) -> None:
self._cache = global_vars
def extract(self, resp: Any, var_name: str, attr: str, expr: str, index: int = 0):
"""
Extract data from the response and update the cache
:param resp: Response object
:param var_name: variable name
:param attr: attribute to read (json, text, headers, ...)
:param expr: extraction expression ($.jsonpath, //xpath, regex)
:param index: result index
"""
try:
# Special-case resp.json, which is a method rather than an attribute
target_data = getattr(resp, attr, None)
if attr == "json":
try:
target_data = resp.json()
except Exception:
target_data = {"msg": "not json data"}
if target_data is None:
logger.warning(f"Extraction failed: response has no attribute '{attr}'")
return
value = None
if expr.startswith("$"):  # JSONPath
res = jsonpath.jsonpath(target_data, expr)
if res: value = res[index]
elif expr.startswith("/") or expr.startswith("./"):  # XPath
html_content = getattr(resp, "text", "")  # guard with getattr
if not html_content:
logger.warning("XPath extraction failed: response text is empty")
return
# Parse the text into an HTML tree
tree = etree.HTML(html_content)
res = tree.xpath(expr)
if res:
# Take the node text, or stringify attribute values
target_node = res[index]
value = target_node.text if hasattr(target_node, 'text') else str(target_node)
else:  # regex
res = re.findall(expr, str(target_data))
if res: value = res[index]
if value is None:
logger.warning(f"Variable [{var_name}] extracted no data via expression [{expr}]")
value = "not data"
self._cache[var_name] = value
logger.info(f"Variable extracted: {var_name} -> {value} (type: {type(value).__name__})")
except Exception as e:
logger.error(f"Exception while extracting variable [{var_name}]: {e}", exc_info=True)
def _smart_replace(self, content: Any) -> Any:
"""
Recursive substitution rule:
- A pure placeholder string ${token} returns the cached value with its original type (int, dict, list, ...)
- A mixed string or function call is rendered to a string via Template
"""
if isinstance(content, dict):
return {k: self._smart_replace(v) for k, v in content.items()}
elif isinstance(content, list):
return [self._smart_replace(i) for i in content]
elif isinstance(content, str):
# A. Pure variable: skip Template so the cached value keeps its type
# e.g. content = "${order_id}"; if order_id is the int 123, return 123
full_match = self.var_only_pattern.fullmatch(content)
if full_match:
var_name = full_match.group(1)
return self._cache.get(var_name, content)
# B. Mixed text or function call
# e.g. "Bearer ${token}" or "${gen_phone()}"
if "${" in content:
# Render through the Template engine
return Template(content).render(self._cache)
return content
def replace(self, data: T) -> T:
"""
Generic variable substitution entry point.
Accepts dict, list, str, or a dumped Pydantic model.
"""
if not data:
return data
logger.debug(f"Starting substitution, input type: {type(data).__name__}")
rendered_data = self._smart_replace(data)
return rendered_data
if __name__ == "__main__":
from core.models import RawSchema, HttpAction
file_handler = YamlProcessor(filepath=EXTRACT_CACHE)
variable_cache_ = file_handler.load() or {}
ex = Exchange(variable_cache_)
# --- Scenario 1: extraction ---
class MockResponse:
def __init__(self):
self.json_data = {"data": {"token": "auth_123", "user_id": 888}}
self.text = "<html><body><div id='name'>ChenWei</div></body></html>"
def json(self): return self.json_data
mock_resp = MockResponse()
print(">>> Extracting...")
ex.extract(mock_resp, "token", "json", "$.data.token")
ex.extract(mock_resp, "u_id", "json", "$.data.user_id")
ex.extract(mock_resp, "user_name", "text", "//div[@id='name']")
# --- Scenario 2: substitution with type preservation ---
# Build a fairly involved case payload
raw_case = {
"title": "Sample case",
"action": {
"method": "POST",
"url": "http://api.com/${token}",  # mixed text -> becomes str
"json_body": {
"id": "${u_id}",  # pure variable -> stays int
"name": "${user_name}",  # pure variable -> str
"config": "${existing_var}"  # variable seeded from file -> int
},
"timeout": "${existing_var}"  # numeric string -> Pydantic converts back to int
}
}
print("\n>>> Substituting...")
new_case_one = ex.replace(raw_case)
print(new_case_one)
RawSchema(**new_case_one)
print(new_case_one.get("action"))
action = HttpAction(**new_case_one.get("action"))
print(action)
# --- Verify results ---
print("\n--- Verification ---")
print(f"URL (mixed text): {action.url} | type: {type(action.url)}")
print(f"ID (type preserved): {action.json_body['id']} | type: {type(action.json_body['id'])}")
print(f"Timeout (auto-converted): {action.timeout} | type: {type(action.timeout)}")
assert isinstance(action.json_body['id'], int)
assert action.url == "http://api.com/auth_123"
assert action.timeout == 100
print("\nAll Exchange scenarios passed!")
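The type-preserving rule in `_smart_replace` can be illustrated in isolation; this is a simplified re-implementation of the idea (not the class above), with a hypothetical two-entry cache:

```python
import re

# Simplified re-implementation of the Exchange._smart_replace rule:
# a string that is exactly "${var}" returns the cached value with its
# original type, while mixed text is rendered down to a plain string.
VAR_ONLY = re.compile(r"^\$\{([a-zA-Z_]\w*)}$")
cache = {"u_id": 888, "token": "auth_123"}  # hypothetical variable cache

def smart_replace(value: str):
    m = VAR_ONLY.fullmatch(value)
    if m:  # pure placeholder -> type preserved
        return cache.get(m.group(1), value)
    # mixed text -> every placeholder stringified in place
    return re.sub(r"\$\{(\w+)}",
                  lambda mo: str(cache.get(mo.group(1), mo.group())), value)

print(smart_replace("${u_id}"))          # 888 (stays an int)
print(smart_replace("Bearer ${token}"))  # Bearer auth_123 (str)
```

Keeping the pure-placeholder path away from string templating is what lets downstream Pydantic models see an `int` for `"${u_id}"` instead of `"888"`.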

core/executor.py Normal file

@@ -0,0 +1,142 @@
#!/usr/bin/env python
# coding=utf-8
"""
@desc: Core test case execution engine
"""
import logging
import importlib
from typing import Any, List, Optional
from collections import ChainMap
from pydantic import TypeAdapter
from core import settings
from core.context import ExecutionEnv
from core.models import RawSchema, ValidateItem, HttpAction, ApiActionModel
from core.session import Session
from core.exchange import Exchange
from utils.case_validator import CaseValidator
logger = logging.getLogger(__name__)
# A reusable adapter (avoids re-initialisation per call)
VALIDATE_LIST_ADAPTER = TypeAdapter(List[ValidateItem])
class WorkflowExecutor:
@classmethod
def perform(cls, case_info: RawSchema, env: ExecutionEnv, context: Optional[dict[str, Any]] = None) -> Any:
"""Execute a single case; supports both direct requests and PO-mode calls"""
context = context or {}
# --- Key point 1: back up and switch the variable context ---
# Keep a reference to Exchange's current global dict
original_cache = env.exchanger.global_vars
# 1. Build the prioritised variable pool (parametrized vars > globally extracted vars).
# ChainMap is the cheapest way to let locals shadow globals.
combined_vars = ChainMap(context, original_cache)
# Point Exchange's internal cache at the merged pool for the duration of the case
env.exchanger.global_vars = combined_vars
resp = None  # pre-initialise so the except path never sees an unbound name
try:
# 2. Refresh the title if the row context provides one
current_title = context.get("title") or case_info.title
logger.info(f"🚀 Executing case: {current_title}")
raw_action_dict = case_info.action.model_dump(by_alias=True, exclude_none=True)
rendered_action_dict = env.exchanger.replace(raw_action_dict)
# --- 2. Choose the execution mode ---
if case_info.is_po_mode():
# Re-validate to repair types (e.g. ints inside params)
rendered_action = ApiActionModel.model_validate(rendered_action_dict)
# PO mode: reflective call into the API layer
resp = cls._execute_po_method(rendered_action, env)
else:
# HTTP mode: issue the request directly
rendered_request = HttpAction.model_validate(rendered_action_dict)
request_kwargs = rendered_request.model_dump(by_alias=True, exclude_none=True)
resp = env.session.request(**request_kwargs)
# --- 3. Post-processing: extraction and assertions ---
cls._post_process(resp, case_info, env, original_cache)
return resp
except Exception as e:
logger.error(f"Case execution failed: {case_info.title} | reason: {e}", exc_info=True)
raise
finally:
# Safety net: restore the original cache even when an exception is raised
env.exchanger.global_vars = original_cache
@staticmethod
def _execute_po_method(action: ApiActionModel, env: ExecutionEnv):
"""Core reflection logic: dynamically load a class under the api/ package and invoke a method"""
class_name = action.module  # ApiActionModel stores the class name in its `module` field (alias "class")
method_name = action.method
params = action.params or {}
# 1. Resolve the module path, in priority order:
# Priority 1: explicit mapping (API_MAP)
module_name = settings.API_MAP.get(class_name)
# Priority 2: naming convention (UserAPI -> api.user_api)
if not module_name:
base_name = class_name.lower().replace('api', '')
module_name = f"{settings.API_PACKAGE}.{base_name}_api"
try:
# 1. Import the module dynamically
module = importlib.import_module(module_name)
# 2. Fetch the class and instantiate it with the shared session
api_cls = getattr(module, class_name)
api_instance = api_cls(env.session)
# 3. Invoke the method and return its result
method = getattr(api_instance, method_name)
logger.info(f"Calling business layer: {class_name}.{method_name} params: {params}")
return method(**params)
except ImportError as e:
logger.error(f"Module import failed: no file found for '{module_name}'. Check the file name or settings.API_MAP.")
raise e
except AttributeError as e:
logger.error(f"Attribute lookup failed: module '{module_name}' has no '{class_name}.{method_name}'")
raise e
except Exception as e:
logger.error(f"Reflective call failed: {class_name}.{method_name} -> {e}")
raise
@classmethod
def _post_process(cls, resp: Any, case_info: RawSchema, env: ExecutionEnv, original_cache: dict):
"""
Unified post-processing: variable extraction (writes globals) and assertions (read locals + globals)
"""
# Remember the current merged context (ChainMap) for the assertion phase
combined_vars = env.exchanger.global_vars
# 1. Variable extraction (write operation)
if case_info.extract:
try:
# Must switch back to original_cache so writes persist into the global pool
env.exchanger.global_vars = original_cache
for var_name, extract_info in case_info.extract.items():
env.exchanger.extract(resp, var_name, *extract_info)
finally:
# Switch back so later steps (assertions) still see local variables
env.exchanger.global_vars = combined_vars
# 2. Assertions (read operation)
if case_info.validate_data:
raw_validate_list = [
item.model_dump(by_alias=True) if isinstance(item, ValidateItem) else item
for item in case_info.validate_data
]
rendered_validate_list = env.exchanger.replace(raw_validate_list)
# Run back through the adapter to repair types (str -> int)
final_validate_data = VALIDATE_LIST_ADAPTER.validate_python(rendered_validate_list)
CaseValidator.validate(resp, final_validate_data)
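The variable-priority scheme in `perform` hinges on `collections.ChainMap`: the parametrized row dict is searched before the global cache, while writes to the underlying global dict stay visible through the chain. A minimal demonstration:

```python
from collections import ChainMap

# Minimal demonstration of the priority scheme used in WorkflowExecutor.perform:
# parametrized row variables shadow globally extracted ones, and writes to the
# underlying global dict remain visible through the chain.
global_vars = {"token": "global_token", "env": "prod"}
row_context = {"token": "row_token"}          # this case's parametrize row

combined = ChainMap(row_context, global_vars)
print(combined["token"])    # row_token (local shadows global)
print(combined["env"])      # prod (falls through to globals)

global_vars["user_id"] = 42                   # what extract() effectively does
print(combined["user_id"])  # 42 (immediately visible through the chain)
```

This is also why `_post_process` switches back to `original_cache` before extracting: writing through the ChainMap itself would land in the row-local dict and be lost after the case ends.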

core/models.py Normal file

@@ -0,0 +1,128 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: models.py
@date: 2024 2024/9/15 21:14
@desc: Declares the YAML test case schema
"""
import logging
from typing import List, Any
from pydantic import BaseModel, Field, ConfigDict
logger = logging.getLogger(__name__)
class HttpAction(BaseModel):
method: str = Field(..., description="HTTP method: get, post, etc.")
url: str = Field(..., description="API path or full URL")
headers: dict[str, Any] | None = Field(default=None, description="HTTP request headers")
params: dict[str, Any] | None = Field(default=None, description="URL query parameters")
data: dict[str, Any] | None = None
json_body: Any | None = Field(default=None, alias="json")
timeout: int = 10
files: dict[str, Any] | None = None
model_config = ConfigDict(extra="allow", populate_by_name=True)
class ApiActionModel(BaseModel):
module: str = Field(..., alias="class", description="Name of the API class to call")
method: str = Field(..., description="Method name on that class")
params: dict[str, Any] = Field(default_factory=dict, description="Arguments passed to the method")
model_config = ConfigDict(populate_by_name=True)
class ValidateItem(BaseModel):
check: str = Field(..., description="Field or expression to check")
assert_method: str = Field(alias="assert", default="equals")
expect: Any = Field(..., description="Expected value")
msg: str = Field(default="Assertion", description="Assertion description")
model_config = ConfigDict(populate_by_name=True)
class RawSchema(BaseModel):
title: str = Field(..., description="Case title")
epic: str | None = None
feature: str | None = None
story: str | None = None
# A single `action` field carries the business payload (HTTP or PO)
action: dict[str, Any] = Field(description="Request content or PO action content")
extract: dict[str, List[Any]] | None = Field(
default=None,
description="Extraction expressions, format: {var_name: [source, expression, index]}"
)
validate_data: List[Any] = Field(
default_factory=list,
alias="validate",
description="Assertion rules"
)
model_config = ConfigDict(extra="allow",  # tolerate extra fields such as parametrize
populate_by_name=True,  # accept both the field name and its alias (e.g. `module` in code, `class` in YAML)
arbitrary_types_allowed=True  # permit non-Pydantic types (e.g. custom class instances)
)
def is_po_mode(self) -> bool:
"""Whether this case runs in PO mode"""
return "class" in self.action or "module" in self.action
if __name__ == '__main__':
# Sample data 1: standard request mode
raw_case_1 = {
"title": "Query status info",
"action": {
"method": "get",
"url": "/api/v1/info",
"headers": {"User-Agent": "pytest-ai"},
"json": {"User-Agent": "pytest-ai"}
},
"validate": [
{"check": "status_code", "assert": "equals", "expect": 200, "msg": "status code is 200"},
{"check": "$.msg", "expect": "Success"}
]
}
# Sample data 2: PO mode (reflective call)
raw_case_2 = {
"title": "User login test",
"action": {
"class": "UserAPI",
"method": "login",
"params": {"user": "admin", "pwd": "123"}
},
"extract": {
"token": ["json", "$.data.token", 0]
}
}
print("--- Model validation tests ---\n")
try:
# Validate mode 1
case1 = RawSchema(**raw_case_1)
print(f"✅ Mode 1 (Request) passed: {case1.title}")
print(f"http: {case1.action}")
print(f"assertion rules: {len(case1.validate_data)}\n")
# Validate mode 2
case2 = RawSchema(**raw_case_2)
print(f"✅ Mode 2 (PO) passed: {case2.title}")
print(f"api: {case2.action}")
print(f"extraction rules: {len(case2.extract)}\n")
# Invalid data (a case with neither request nor PO action can be rejected at the business layer);
# here we just demonstrate Pydantic's validation error
invalid_data = {"title": "bad case", "action": {"url": "/api"}}  # missing method
print("--- Expected failure ---")
RawSchema(**invalid_data)
except Exception as e:
print(f"❌ Expected validation failure: \n{e}")


@@ -9,14 +9,16 @@
@date: 2024 2024/9/12 21:56
@desc:
"""
from urllib.parse import urljoin
import logging
from urllib.parse import urljoin
import requests
import allure
from requests import Response, PreparedRequest
import allure
logger = logging.getLogger("requests.session")
# logger = logging.getLogger("requests.session")
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
@@ -41,7 +43,8 @@ class Session(requests.Session):
logger.info(f"Response <<<<<< status = {resp.status_code}")
logger.info(f"Response <<<<<< headers = {resp.headers}")
logger.info(f"Response <<<<<< body = {resp.json()}")
logger.info(f"Response <<<<<< body = {resp.text}")
# logger.info(f"Response <<<<<< body = {resp.json()}")
return resp

core/settings.py Normal file

@@ -0,0 +1,64 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: settings
@date: 2025/2/23 21:34
@desc:
"""
import os
from pathlib import Path
from dotenv import load_dotenv
BASE_DIR = Path(__file__).resolve().parents[1]
load_dotenv()
# --- Directory configuration ---
TEST_CASE_DIR = BASE_DIR / "test_cases"
OUTPUT_DIR = BASE_DIR / "outputs"
SCREENSHOT_DIR = OUTPUT_DIR / "screenshots"
LOG_DIR = OUTPUT_DIR / "logs"
LOG_BACKUP_DIR = LOG_DIR / "backups"
ALLURE_TEMP = BASE_DIR / "temp"
REPORT_DIR = BASE_DIR / "reports"
CONFIG_DIR = BASE_DIR / "config"
DATA_DIR = BASE_DIR / "data"
# Directories created on startup
REQUIRED_DIRS = [LOG_DIR, LOG_BACKUP_DIR, ALLURE_TEMP, SCREENSHOT_DIR]
# Root package for the API layer
API_PACKAGE = "api"
LOG_SOURCE = LOG_DIR / "pytest.log"
EXTRACT_CACHE = BASE_DIR / "data/extract.yaml"
# Optional explicit mapping (class name -> full module path) for irregular file names
API_MAP = {
"UserAPI": "api.business.user",
"OrderAPI": "api.v2.order_manager"
}
allure_epic: str = "项目名称answer"
allure_feature: str = "默认特征feature"
allure_story: str = "默认事件story"
test_suffix = "yaml"
base_url = os.getenv("BASE_URL")
rsa_public = ""
rsa_private = ""
if __name__ == '__main__':
print(BASE_DIR)

core/templates.py Normal file

@@ -0,0 +1,180 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: templates.py
@date: 2024 2024/9/22 22:20
@desc:
"""
import logging
import re
import string
from typing import List, Any
from commons.funcs import Funcs
logger = logging.getLogger(__name__)
class Template(string.Template):
"""
Enhanced template engine:
1. Standard variable substitution ${var}
2. Function calls with arguments ${func(arg1, arg2)}
3. Variables nested as function arguments ${func(${var})}
"""
# Matches a function-call structure: ${func_name(args)}
# Groups: func_name (starts with a letter or underscore), func_args (everything inside the parentheses)
call_pattern = re.compile(r"\$\{(?P<func_name>[a-zA-Z_]\w*)\((?P<func_args>.*)\)}")
def render(self, mapping: dict) -> str:
"""
Render entry point
:param mapping: variable cache (from Exchange's variable cache)
:return: the rendered string
"""
# 1. First pass: native string.Template substitution of plain variables.
# This pre-replaces ${var} inside arguments, enabling nested calls.
s = self.safe_substitute(mapping)
# 2. Second pass: parse and execute function calls
s = self.safe_substitute_funcs(s, mapping)
return s
@staticmethod
def _parse_args(args_str: str, mapping: dict) -> List[Any]:
"""
Safely split function arguments.
A look-ahead regex skips commas inside quotes, so ${func('a,b', 123)} splits correctly.
"""
args_str = args_str.strip()
if not args_str:
return []
# Split on a comma only when it is followed by an even number of quotes (i.e. the comma sits outside quotes)
raw_args = re.split(r',(?=(?:[^\'"]*[\'"][^\'"]*[\'"])*[^\'"]*$)', args_str)
processed_args = []
for arg in raw_args:
arg = arg.strip()
# 1. Quoted string argument
if (arg.startswith("'") and arg.endswith("'")) or (arg.startswith('"') and arg.endswith('"')):
processed_args.append(arg[1:-1])
# 2. Integer literal
elif arg.isdigit():
processed_args.append(int(arg))
# 3. Boolean literal
elif arg.lower() == "true":
processed_args.append(True)
elif arg.lower() == "false":
processed_args.append(False)
# 4. Known variable name (covers args not replaced in the first pass)
elif arg in mapping:
processed_args.append(mapping[arg])
# 5. Anything else is kept as a raw string
else:
processed_args.append(arg)
return processed_args
def safe_substitute_funcs(self, template: str, mapping: dict) -> str:
"""
Parse function names and arguments in the string and substitute each call with its result
:param template: the template string
:param mapping: context providing the functions and variables to use
:return: the substituted result
"""
# Merge the function mapping and the variable mapping into one context.
# Dict unpacking replaces the previous deepcopy for performance.
render_context = {**Funcs.FUNC_MAPPING, **mapping}
logger.debug(f"render context: {render_context}")
def convert(mo):
func_name = mo.group("func_name")
func_args_str = mo.group("func_args")
func = render_context.get(func_name)  # look up the function
if not callable(func):
logger.warning(f"Template function '{func_name}' is undefined or not callable")
return mo.group()
# Parse the argument list
args = self._parse_args(func_args_str, render_context)
try:
# Execute and force the result to str so re.sub can substitute it
result = func(*args)
return str(result)
except Exception as e:
logger.error(f"Error executing ${{{func_name}(...)}}: {e}", exc_info=True)
return mo.group()
return self.call_pattern.sub(convert, template)
if __name__ == '__main__':
# Stub out Funcs.FUNC_MAPPING
def mock_concat(a, b):
return f"{a}_{b}"
def mock_get_now():
return "2026-03-09"
def mock_add(x, y):
return x + y
# Inject the stub functions
Funcs.FUNC_MAPPING = {
"concat": mock_concat,
"now": mock_get_now,
"add": mock_add
}
# Stub variable cache
test_mapping = {
"env": "prod",
"num1": 10,
"num2": 20
}
test_cases = [
("A: plain variable", "Current env is ${env}", "Current env is prod"),
("B: zero-arg function", "Date: ${now()}", "Date: 2026-03-09"),
("C: args containing a comma", "Res: ${concat('hello,world', 'test')}", "Res: hello,world_test"),
("D: variables nested as args", "Sum: ${add(${num1}, ${num2})}", "Sum: 30"),
("E: mixed mode", "URL: /${env}/api/${now()}", "URL: /prod/api/2026-03-09"),
("F: argument type inference", "Value: ${add(5, 5)}", "Value: 10"),  # 5 should be parsed as int
]
print(f"{'Scenario':<30} | {'Expected':<30} | {'Actual'}")
print("-" * 80)
for scene, tpl_str, expected in test_cases:
actual = Template(tpl_str).render(test_mapping)
status = "✅" if str(actual) == str(expected) else "❌"
print(f"{scene:<30} | {expected:<30} | {actual} {status}")
# Special check: undefined functions fall back to the raw text
print("\n>>> Undefined-function fallback:")
error_tpl = "Check: ${undefined_func()}"
print(f"Result: {Template(error_tpl).render(test_mapping)}")
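The quote-aware split used by `Template._parse_args` can be checked in isolation; the pattern below is the same one the class uses:

```python
import re

# Isolated check of the quote-aware split in Template._parse_args:
# a comma counts as a separator only when an even number of quote
# characters follows it, i.e. when the comma is not inside a quoted argument.
SPLIT = r',(?=(?:[^\'"]*[\'"][^\'"]*[\'"])*[^\'"]*$)'
parts = [p.strip() for p in re.split(SPLIT, "'hello,world', 'test', 123")]
print(parts)  # ["'hello,world'", "'test'", '123']
```

The comma inside `'hello,world'` leaves an odd number of trailing quotes, so the look-ahead rejects it; the two commas between arguments split as expected.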

data/extract.yaml Normal file

@@ -0,0 +1 @@
existing_var: '100'

data/id.yaml Normal file

@@ -0,0 +1 @@
id: 13

docs/README.md Normal file

@@ -0,0 +1,56 @@
# Project Structure Documentation
This document outlines the recommended structure for the Interface Automation Test project. A well-organized structure promotes maintainability, scalability, and collaboration.
## Directory Structure
Here is the proposed optimized directory structure:
```
/
|-- core/ # Main source code
| |-- api.py
| |-- main.py
| |-- luffy.py
| +-- ...
|
|-- tests/ # Test cases
| |-- a_test_case.py
| +-- ...
|
|-- config/ # Configuration files
| |-- id.yaml
| |-- extract.yaml
| +-- ...
|
|-- utils/ # Utility modules
|
|-- docs/ # Project documentation
| +-- README.md
|
|-- .gitignore # Git ignore file
|-- pytest.ini # Pytest configuration
|-- pyproject.toml # Python project configuration
|-- README.md # Main project README
```
## Description of Directories
* **`core/`**: This directory contains the core application logic for the interface tests. Files like `api.py`, `main.py`, and `luffy.py` which handle the main business logic should reside here.
* **`tests/`**: This directory is for all the automated tests. Each test file should ideally correspond to a module or a feature.
* **`config/`**: This directory should store all configuration files, such as `id.yaml` and `extract.yaml`. This separation makes it easier to manage different environments (e.g., development, staging, production).
* **`utils/`**: This directory holds common utility functions and helper scripts that can be used across different parts of the project.
* **`docs/`**: This directory contains all project-related documentation, including this structure guide.
## Benefits of this Structure
* **Clarity**: A clear separation of concerns makes it easy to find code.
* **Maintainability**: Easier to maintain and refactor code without affecting other parts of the system.
* **Scalability**: The structure can easily scale as the project grows in complexity.
* **Collaboration**: New developers can quickly understand the project layout and start contributing.
We recommend moving the existing files into this new structure to improve the overall quality of the project.


@@ -1,3 +0,0 @@
code: 200
msg: 成功。
reason: base.success


@@ -1 +0,0 @@
"id":0


@@ -1,18 +0,0 @@
02/23/2025 10:17:34 PM [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_1_user.yaml')
02/23/2025 10:17:34 PM [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_2_url.yaml')
02/23/2025 10:17:34 PM [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_3_sql.yaml')
02/23/2025 10:17:34 PM [pytest_result_log] INFO plugin.pytest_runtest_setup:122 - ---------------Start: main.py::TestAPI::test_1_user.yaml[查询用户信息0]---------------
02/23/2025 10:17:34 PM [commons.cases] INFO cases.test_func:67 - =================================用例开始执行:查询用户信息==================================
02/23/2025 10:17:34 PM [commons.cases] INFO cases.test_func:71 - 1正在注入变量...
02/23/2025 10:17:34 PM [commons.cases] INFO cases.test_func:74 - 2正在请求接口...
02/23/2025 10:17:34 PM [requests.session] INFO session.send:36 - 发送请求>>>>>> 接口地址 = GET http://119.91.19.171:40065/answer/api/v1/connector/info
02/23/2025 10:17:34 PM [requests.session] INFO session.send:37 - 发送请求>>>>>> 请求头 = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Accept-Language': 'zh_CN', 'Content-Type': 'application/json', 'Cookie': 'psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==', 'Host': '119.91.19.171:40065', 'Origin': 'http://119.91.19.171:40065', 'Referer': 'http://119.91.19.171:40065/users/login'}
02/23/2025 10:17:34 PM [requests.session] INFO session.send:38 - 发送请求>>>>>> 请求正文 = None
02/23/2025 10:17:34 PM [requests.session] INFO session.send:42 - 接收响应 <<<<<< 状态码 = 200
02/23/2025 10:17:34 PM [requests.session] INFO session.send:43 - 接收响应 <<<<<< 响应头 = {'Content-Type': 'application/json; charset=utf-8', 'Date': 'Sun, 23 Feb 2025 14:17:34 GMT', 'Content-Length': '64'}
02/23/2025 10:17:34 PM [requests.session] INFO session.send:44 - 接收响应 <<<<<< 响应正文 = {'code': 200, 'reason': 'base.success', 'msg': '成功。', 'data': []}
02/23/2025 10:17:34 PM [commons.cases] INFO cases.test_func:77 - 3正在提取变量...
02/23/2025 10:17:34 PM [commons.cases] INFO cases.test_func:83 - 4正在断言...
02/23/2025 10:17:34 PM [commons.models] INFO models.assert_all:59 - assert 200 == code1, 状态码等于200
02/23/2025 10:17:34 PM [pytest_result_log] ERROR plugin.pytest_result_log:190 - test status is FAILED (main.py::TestAPI::test_1_user.yaml[查询用户信息0]): AssertionError
02/23/2025 10:17:34 PM [pytest_result_log] INFO plugin.pytest_runtest_teardown:128 - ----------------End: main.py::TestAPI::test_1_user.yaml[查询用户信息0]----------------


@@ -1,64 +0,0 @@
import os
import time
from logging.handlers import TimedRotatingFileHandler


class LufffyTimedRotatingFileHandler(TimedRotatingFileHandler):
    def doRollover(self):
        """
        do a rollover; in this case, a date/time stamp is appended to the filename
        when the rollover happens. However, you want the file to be named for the
        start of the interval, not the current time. If there is a backup count,
        then we have to get a list of matching filenames, sort them and remove
        the one with the oldest suffix.
        """
        if self.stream:
            self.stream.close()
            self.stream = None
        # get the time that this sequence started at and make it a TimeTuple
        currentTime = int(time.time())
        dstNow = time.localtime(currentTime)[-1]
        t = self.rolloverAt - self.interval
        if self.utc:
            timeTuple = time.gmtime(t)
        else:
            timeTuple = time.localtime(t)
            dstThen = timeTuple[-1]
            if dstNow != dstThen:
                if dstNow:
                    addend = 3600
                else:
                    addend = -3600
                timeTuple = time.localtime(t + addend)
        """
        dfn = self.rotation_filename(self.baseFilename + "." +
                                     time.strftime(self.suffix, timeTuple))
        if os.path.exists(dfn):
            os.remove(dfn)
        self.rotate(self.baseFilename, dfn)
        """
        # Under multiple processes the stock code above can delete logs by mistake;
        # it is rewritten below to rename only when the target does not yet exist.
        # Note: do not take this approach if the rewrite would affect other modules.
        dfn = self.rotation_filename(self.baseFilename + "." +
                                     time.strftime(self.suffix, timeTuple))
        if not os.path.exists(dfn):
            self.rotate(self.baseFilename, dfn)
        if self.backupCount > 0:
            for s in self.getFilesToDelete():
                os.remove(s)
        if not self.delay:
            self.stream = self._open()
        newRolloverAt = self.computeRollover(currentTime)
        while newRolloverAt <= currentTime:
            newRolloverAt = newRolloverAt + self.interval
        # If DST changes and midnight or weekly rollover, adjust for this.
        if (self.when == 'MIDNIGHT' or self.when.startswith('W')) and not self.utc:
            dstAtRollover = time.localtime(newRolloverAt)[-1]
            if dstNow != dstAtRollover:
                if not dstNow:  # DST kicks in before next rollover, so we need to deduct an hour
                    addend = -3600
                else:  # DST bows out before next rollover, so we need to add an hour
                    addend = 3600
                newRolloverAt += addend
        self.rolloverAt = newRolloverAt
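The multi-process note in `doRollover` above boils down to one rule: rename the live file to its dated name only if no other process has already done so. Isolated as a standalone helper (the `safe_rotate` name and signature are illustrative, not from the project):

```python
import os
import shutil

def safe_rotate(base: str, dated: str) -> bool:
    """Rename `base` to `dated` unless another process already rotated.

    Returns True when this call performed the rotation. Unconditionally
    renaming (the stdlib default removes an existing `dated` first) would
    let a second worker delete the log the first worker just archived.
    """
    if os.path.exists(dated):
        return False  # another process won the race; leave our file alone
    shutil.move(base, dated)
    return True
```

The remaining caveat, as the comment in the file says, is that a worker losing the race keeps writing to a fresh `base` file, so the behavior differs from the stock handler and must be acceptable to every module sharing the log.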

main.py

@@ -1,18 +1,90 @@
import os
import shutil
import datetime
from pathlib import Path

import pytest

from commons.cases import TestAPI
from core.settings import LOG_SOURCE, LOG_BACKUP_DIR, ALLURE_TEMP
# from core.enums import AppPlatform
from utils.dirs_manager import ensure_dirs_ok
from utils.report_handler import generate_allure_report

TestAPI.find_yaml_case()  # load the YAML case files

if __name__ == '__main__':
    now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    # 1. run the framework and produce the temporary result files
    pytest.main([__file__, "-x", "-v"])  # -x: stop after the first failure; -v: show case names; -c: point at the directory holding pytest.ini
    # 2. generate the HTML report
    os.system('allure generate temp -o report --clean')  # Allure is a Java program, so it has to be invoked through the OS
    # netstat -ano | findstr :4723
    # taskkill /PID 12345 /F
    # 3. back up the logs
    shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log")


def _archive_logs():
    """
    Archive the log file of the previous run before the tests start.
    No handle is holding the file at this point, so the move is completely safe.
    """
    # 4. back up the logs (runs whether or not the tests crashed)
    if LOG_SOURCE.exists() and LOG_SOURCE.stat().st_size > 0:
        now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = LOG_BACKUP_DIR / f"pytest_{now}.log"
        try:
            # move and rename
            shutil.move(str(LOG_SOURCE), str(backup_path))
            print(f"Automatically archived the previous run's log: {backup_path}")
            # shutil.copy2(LOG_SOURCE, backup_path)
            # print(f"Log backed up to: {backup_path}")
            _clean_old_logs(LOG_BACKUP_DIR)
        except Exception as e:
            print(f"Failed to archive the old log (it may be open in an external editor): {e}")
    else:
        print("No original log file found; skipping the backup.")


# log cleanup
def _clean_old_logs(backup_dir, keep_count=10):
    files = sorted(Path(backup_dir).glob("pytest_*.log"), key=lambda p: p.stat().st_mtime)
    while len(files) > keep_count:
        file_to_remove = files.pop(0)
        try:
            file_to_remove.unlink(missing_ok=True)
        except OSError as e:
            print(f"Failed to remove old log {file_to_remove}: {e}")


def _clean_temp_dirs():
    """
    Optional: remove stale temporary files before the tests run.
    """
    if ALLURE_TEMP.exists():
        shutil.rmtree(ALLURE_TEMP)
        # ignore_errors keeps a locked file from blocking the whole run
        shutil.rmtree(ALLURE_TEMP, ignore_errors=True)
    ALLURE_TEMP.mkdir(parents=True, exist_ok=True)


def main():
    try:
        # 1. create the directories
        ensure_dirs_ok()
        # 2. handle the logs
        _archive_logs()
        # 3. run pytest
        args = [
            "test_cases",
            "-x",  # note: -x stops at the first failure; drop it for a full regression run
            "-v",
            f"--alluredir={ALLURE_TEMP}",
            # f"--platform={AppPlatform.ANDROID.value}",
            # "--caps_name=wan_android"
        ]
        pytest.main(args)
        # 4. generate the report
        generate_allure_report()
    except Exception as e:
        print(f"An exception occurred while running the automated tests: {e}")
    finally:
        print("Time-of-check to Time-of-use")


if __name__ == "__main__":
    main()


@@ -1,27 +1,24 @@
-[tool.poetry]
+[project]
 name = "interfaceautotest"
 version = "0.1.0"
 description = ""
 authors = ["NianJiu <t6i888@163.com>"]
 readme = "README.md"
-[tool.poetry.dependencies]
-python = "^3.10"
+requires-python = ">=3.11"
+dependencies = [
+    "requests>=2.32.3",
+    "pyyaml>=6.0.1",
+    "pytest>=8.3.3",
+    "jsonpath>=0.82.2",
+    "pymysql>=1.1.1",
+    "pytest-result-log>=1.2.2",
+    "allure-pytest>=2.13.5",
+    "cryptography>=44.0.2",
+    "python-dotenv>=0.9.9",
+    "pydantic>=2.12.5",
+    "lxml>=6.0.2",
+]
-requests = "^2.32.3"
-pyyaml = "^6.0.2"
-pytest = "^8.3.3"
-jsonpath = "^0.82.2"
-pymysql = "^1.1.1"
-pytest-result-log = "^1.2.2"
-allure-pytest = "^2.13.5"
-[build-system]
-requires = ["poetry-core"]
-build-backend = "poetry.core.masonry.api"
-[[tool.poetry.source]]
-name = "tsinghua"
+[[tool.uv.index]]
 url = "https://pypi.tuna.tsinghua.edu.cn/simple"
-priority = "primary"
+default = true


@@ -1,10 +1,22 @@
 [pytest]
-addopts = -q --show-capture=no
+addopts = -q --show-capture=no --reruns 2 --reruns-delay 1
+# 1. enable live console logging
+log_cli = True
+log_cli_level = INFO
+log_cli_format = %(asctime)s %(levelname)-5s [%(name)s] - %(message)s
+log_cli_date_format = %H:%M:%S
-log_file = logs/pytest.log
-log_file_level = info
-log_file_format = '%(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s'
-log_file_date_format = '%m/%d/%Y %I:%M:%S %p'
+# 2. enable logging to a file
+log_file = outputs/logs/pytest.log
+log_file_level = INFO
+log_file_format = %(asctime)s %(levelname)-5s [%(name)s] %(module)s.%(funcName)s:%(lineno)d - %(message)s
+log_file_date_format = %Y-%m-%d %H:%M:%S
-disable_test_id_escaping_and_forfeit_all_rights_to_community_support = true
+# 3. basic settings
+# keep Chinese test case IDs from being escaped to Unicode sequences
+disable_test_id_escaping_and_forfeit_all_rights_to_community_support = True
+# limit where pytest searches, to speed up startup
+testpaths = test_cases
+python_files = test_*.py
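One dependency note: the `--reruns` and `--reruns-delay` flags added to `addopts` are provided by the `pytest-rerunfailures` plugin, which does not appear in the `pyproject.toml` dependency list shown earlier in this commit. If the plugin is in fact absent from the environment, pytest aborts at startup with an unrecognized-argument error; the fix would be one extra entry (the version pin here is an assumption):

```toml
dependencies = [
    # ...existing entries...
    "pytest-rerunfailures>=14.0",
]
```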


@@ -0,0 +1,42 @@
feature: Page status
story: Status
title: Query status information
epic: Bits and pieces
action:
  method: get
  url: /answer/api/v1/connector/info
  headers:
    Host: 119.91.19.171:40065
    Accept-Language: en_US
    Accept: application/json, text/plain, */*
    User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
    Referer: http://119.91.19.171:40065/users/login
    Accept-Encoding: gzip, deflate
  json_body:
    username: "${username}"
    password: "${password}"
extract: # extract variables
  msg:
    - "json"
    - "$.msg"
    - 0
validate:
  - check: status_code
    assert: ${status_assert} # <--- dynamic assertion method
    expect: ${status_expect} # <--- dynamic expected value
    msg: "Validate the response status code"
  - check: message
    assert: ${msg_assert} # <--- dynamic assertion method
    expect: ${msg_expect} # <--- dynamic expected value
    msg: "Check the returned message"
parametrize: # data-driven testing
  # column names, including the assertion method and the expected value
  - [ "title", "username", "password", "status_assert", "status_expect", "msg_assert", "msg_expect" ]
  # one data row per run, so each run can apply different assertion logic
  - [ "Scenario 1: pass - status code equals - message contains Success", "user1", "pass1", "equals", 200, "contains", "Success" ]
  - [ "Scenario 2: fail - status code not equal - message does not contain Error", "user2", "pass2", "not_equals", 200, "not_contains", "Error" ]
  - [ "Scenario 3: pass - status code greater than 199 - message equals", "user3", "pass3", "greater_than", 199, "equals", "Success" ]
  - [ "Scenario 4: fail - status code less than 500 - message empty", "user4", "pass4", "less_than", 500, "is_empty", "" ]


@@ -0,0 +1,25 @@
feature: User management
story: Status query
title: ${title} # references the parametrized variable
epic: Mixed-mode example
# Key change: no concrete url, method, or headers here.
# Instead, name the API class and method to call.
api_action:
  class: UserAPI
  method: get_connector_info
  params: # arguments passed to the get_connector_info method
    username: ${username}
    password: ${password}
extract:
  msg: ["json", "$.msg", 0]
validate:
  equals:
    business status code check: ["${msg}", "Success."]
parametrize:
  - ["title", "username", "password", "msg"]
  - ["Test 1", "user1", "pass1", "Success."]
  - ["Test 2", "user2", "pass2", "Fail."]


@@ -0,0 +1,41 @@
feature: Page status
story: Status
title: Query status information
epic: Bits and pieces
request:
  method: get
  url: /answer/api/v1/connector/info
  headers:
    Host: 119.91.19.171:40065
    Accept-Language: en_US
    Accept: application/json, text/plain, */*
    User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
    Referer: http://119.91.19.171:40065/users/login
    Accept-Encoding: gzip, deflate
  json_body: { username: "${username}", password: "${password}" }
extract: # extract variables
  msg:
    - "json"
    - "$.msg"
    - 0
validate:
  - check: status_code
    assert: ${status_assert} # <--- dynamic assertion method
    expect: ${status_expect} # <--- dynamic expected value
    msg: "Validate the response status code"
  - check: message
    assert: ${msg_assert} # <--- dynamic assertion method
    expect: ${msg_expect} # <--- dynamic expected value
    msg: "Check the returned message"
parametrize: # data-driven testing
  # column names, including the assertion method and the expected value
  - [ "title", "username", "password", "status_assert", "status_expect", "msg_assert", "msg_expect" ]
  # one data row per run, so each run can apply different assertion logic
  - [ "Scenario 1: pass - status code equals - message contains Success", "user1", "pass1", "equals", 200, "contains", "Success" ]
  - [ "Scenario 2: fail - status code not equal - message does not contain Error", "user2", "pass2", "not_equals", 200, "not_contains", "Error" ]
  - [ "Scenario 3: pass - status code greater than 199 - message equals", "user3", "pass3", "greater_than", 199, "equals", "Success" ]
  - [ "Scenario 4: fail - status code less than 500 - message empty", "user4", "pass4", "less_than", 500, "is_empty", "" ]


@@ -0,0 +1,30 @@
#!/usr/bin/env python
# coding=utf-8
import logging

from core import settings
from core.creator import CaseGenerator, TestTemplateBase

logger = logging.getLogger(__name__)


class TestRunner(TestTemplateBase):
    """
    Dynamic container for test cases (Test Case Container).

    This is a placeholder class: CaseGenerator scans all the YAML case files
    and dynamically generates a test method for each case, mounting it on this
    class. Pytest then discovers and runs those dynamically mounted test_*
    methods.
    """
    pass


try:
    # --- Core logic: dynamically generate the test cases ---
    # This code runs as soon as pytest imports the module during collection.
    logger.info("--- [Collector] Scanning and dynamically generating test cases ---")
    CaseGenerator.build_and_register(target_cls=TestRunner, cases_dir=settings.TEST_CASE_DIR)
    logger.info(f"--- [Collector] Test case generation finished; cases loaded onto {TestRunner.__name__} ---")
except Exception as e:
    logger.critical("--- [Collector] Fatal error while generating test cases; aborting the run ---", exc_info=True)
    # Re-raise so pytest reports this as a collection error.
    raise RuntimeError("Test case collection failed; see the log for details.") from e
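`CaseGenerator.build_and_register` itself is not part of this diff. The dynamic-mount pattern it names — turning case data into `test_*` methods on a container class before pytest collects the module — can be sketched as follows; every name here (`mount_cases`, `Container`, the case dict shape) is hypothetical:

```python
def mount_cases(target_cls, cases: dict) -> None:
    """Attach one test method per case so pytest discovers them by name."""
    for case_id, case_data in cases.items():
        def test_func(self, _data=case_data):  # default arg freezes the closure
            assert _data["expect"] == _data["actual"]
        test_func.__name__ = f"test_{case_id}"
        setattr(target_cls, test_func.__name__, test_func)

class Container:
    pass

mount_cases(Container, {"ping": {"expect": 1, "actual": 1}})
```

Because the mounting happens at import time, any failure here surfaces as a pytest collection error, which is exactly why the module above wraps it in try/except and re-raises.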

utils/case_parser.py

@@ -0,0 +1,48 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_parser
@date: 2025/2/27 17:25
@desc:
"""
import logging

import yaml

from commons.models import TestCaseStruct


class CaseParser:
    @staticmethod
    def to_yaml(case_data: dict) -> str:
        # Validate the structure before serializing.
        try:
            TestCaseStruct(**case_data)
        except TypeError as error:
            logging.error(error)
            raise error
        return yaml.safe_dump(case_data, allow_unicode=True, sort_keys=False)

    @staticmethod
    def from_yaml(yaml_str: str) -> TestCaseStruct:
        return TestCaseStruct(**yaml.safe_load(yaml_str))


if __name__ == '__main__':
    with open(r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    print(data)
    print(type(data))
    # print(CaseInfo(**data))
    case_parser = CaseParser()
    case_data_ = case_parser.to_yaml(data)
    # print(case_data_)
    # case_parser.from_yaml(case_data_)
    # print(type(case_data_))

utils/case_validator.py

@@ -0,0 +1,107 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_validator
@date: 2025/2/27 17:25
@desc:
"""
import logging
from typing import Any, List

from pydantic import TypeAdapter

from core.exchange import Exchange
from core.models import ValidateItem

logger = logging.getLogger(__name__)

VALIDATE_LIST_ADAPTER = TypeAdapter(List[ValidateItem])


class CaseValidator:
    VALIDATORS = {}

    @classmethod
    def register(cls, name: str):
        def decorator(func):
            cls.VALIDATORS[name] = func
            return func
        return decorator

    @classmethod
    def validate(cls, response: Any, validate_list: List[ValidateItem]):
        """
        Core assertion entry point: adapts CaseInfo.validate_data (List[ValidateItem]).
        """
        if not validate_list:
            return
        # dicts = [
        #     item.model_dump(by_alias=True) if isinstance(item, ValidateItem) else item for item in validate_list
        # ]
        # rendered = exchanger.replace(dicts)
        # # trigger the SmartInt/SmartDict type repair
        # final_list = VALIDATE_LIST_ADAPTER.validate_python(rendered)
        for item in validate_list:
            # 1. pull the data out of the model; by this point every item
            #    has already had its variables substituted
            actual = item.check
            expect = item.expect
            method = item.assert_method  # the model field aliased to "assert"
            msg = item.msg or f"Assert {actual} {method} {expect}"
            # 2. look up the matching assertion function
            validator = cls.VALIDATORS.get(method)
            if not validator:
                logger.error(f"❌ Unsupported assertion method: {method}")
                raise KeyError(f"Unsupported validator: {method}")
            # 3. run the assertion
            try:
                validator(actual, expect, msg)
            except AssertionError as e:
                logger.error(
                    f"❌ Assertion failed: {msg} | actual: {actual} ({type(actual)}), expected: {expect} ({type(expect)})")
                raise e


@CaseValidator.register('equals')
def validate_equals(a, b, msg):
    logger.info(f"assert {a} == {b}, {msg}")
    print(f"assert {a} == {b}, {msg}")
    assert a == b, msg


@CaseValidator.register('not_equals')
def validate_not_equals(a, b, msg):
    logger.info(f"assert {a} != {b}, {msg}")
    assert a != b, msg


@CaseValidator.register('contains')
def validate_contains(a, b, msg):
    logger.info(f"assert {a} in {b}, {msg}")
    assert a in b, msg


@CaseValidator.register('not_contains')
def validate_not_contains(a, b, msg):
    logger.info(f"assert {a} not in {b}, {msg}")
    assert a not in b, msg


if __name__ == '__main__':
    resp = None
    mock_case = [
        {"check": 100, "expect": 100, "assert": "equals"},
        {"check": "success", "expect": "success", "assert": "contains"}
    ]
    final_validate_list = VALIDATE_LIST_ADAPTER.validate_python(mock_case)
    case_validator = CaseValidator()
    print(case_validator.VALIDATORS)
    case_validator.validate(resp, final_validate_list)
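The parametrized YAML earlier in this diff references `greater_than`, `less_than`, and `is_empty`, none of which are among the four validators registered above, so those rows would raise `Unsupported validator` at runtime. Extending the registry uses the same decorator; a self-contained sketch with a stand-in class (the real one lives in `utils/case_validator.py`, and the three function bodies below are assumptions about the intended semantics):

```python
class MiniValidator:
    """Stand-in for CaseValidator: the same decorator-based registry."""
    VALIDATORS = {}

    @classmethod
    def register(cls, name):
        def decorator(func):
            cls.VALIDATORS[name] = func
            return func
        return decorator

@MiniValidator.register("greater_than")
def validate_greater_than(a, b, msg):
    assert a > b, msg

@MiniValidator.register("less_than")
def validate_less_than(a, b, msg):
    assert a < b, msg

@MiniValidator.register("is_empty")
def validate_is_empty(a, _b, msg):
    # The expect column is unused; "", [], {} and None all count as empty.
    assert not a, msg
```

Because registration happens at import time, simply defining these next to the existing validators makes the YAML scenarios resolvable.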

utils/data_driver.py

@@ -0,0 +1,91 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: data_driver
@date: 2025/3/3 10:56
@desc:
"""
from pathlib import Path

from core.templates import Template
from commons.file_processors.yaml_processor import YamlProcessor as FileHandle


class DataDriver:
    @staticmethod
    def generate_cases(file_name, case_info) -> dict:
        # No parametrize block: emit the case unchanged under a "[--]" suffix.
        if not case_info.get("parametrize"):
            return {file_name + "[--]": case_info}
        cases = {}
        args_names = case_info.get("parametrize")[0]
        for i, args_values in enumerate(case_info.get("parametrize")[1:]):
            # Zip the header row with this data row into a render context.
            context = dict(zip(args_names, args_values))
            rendered = Template(FileHandle.to_string(case_info)).render(context)
            cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
        return cases


if __name__ == '__main__':
    file_path = Path(r"D:\CNWei\CNW\InterfaceAutoTest\test_cases\answer\test_1_status.yaml")
    file_obj = FileHandle(file_path)
    print(file_path.stem)
    file_name_ = file_path.stem
    # mock_case_info = {
    #     "case_info0": {
    #         "feature": "Page status",
    #         "story": "Status",
    #         "title": "Query status information",
    #         "request": "",
    #         "extract": "",
    #         "validate": "",
    #         "parametrize": [["title", "username", "password", "msg"], ["Test 1", "user1", "pass1", "200"],
    #                         ["Test 2", "user2", "pass2", "300"]]
    #     },
    #     "case_info1": {
    #         "feature": "Page status",
    #         "story": "Status",
    #         "title": "Query status information",
    #         "request": "",
    #         "extract": "",
    #         "validate": "",
    #         "parametrize": [1, 2, 3]
    #     },
    #     "case_info2": {
    #         "feature": "Page status",
    #         "story": "Status",
    #         "title": "Query status information",
    #         "request": "",
    #         "extract": "",
    #         "validate": "",
    #         "parametrize": [1, 2, 3]
    #     }
    # }
    dd = DataDriver()
    # cases = dd.generate_cases(mock_case_info.get("case_info0"))
    cases_ = dd.generate_cases(file_name_, file_obj)
    print(cases_)
    case_keys = list(cases_.keys())
    case_values = cases_.values()
    print(case_keys)
    print(case_values)
    aa = [i.get("title") for i in case_values]
    print(aa)
    # print(list(case_values)[0]["feature"])
    print(file_obj["feature"])
    # print(list(case_values)[0]["story"])
    print(file_obj["story"])