9 Commits

Author SHA1 Message Date
6393414ab2 feat,fix(core,docs): improve code comments in the core modules and add an architecture-improvement document
- Added detailed class and method docstrings to the main modules under core (models, context, creator, base_api, exchange, executor).
- Added the docs/架构改进.md file.
2026-03-18 11:26:55 +08:00
d05757f7cc feat(core): enhance Exchange with smart variable substitution and type preservation
- Improved conftest.py: added exception logging and test-report environment info.
- Other optimizations.
2026-03-16 19:15:01 +08:00
00791809df refactor: rework the execution engine into a context-driven architecture
- Reworked WorkflowExecutor and Exchange to support ExecutionEnv resource injection.
- Implemented session-level connection reuse and an in-memory mirror of the variable pool, eliminating redundant I/O.
- Introduced ChainMap for dynamic context switching, resolving the priority between parametrized variables and globally extracted variables.
- Hardened variable extraction and assertion logic to make cross-case variable flow reliable.
2026-03-14 11:45:52 +08:00
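The ChainMap-based priority override mentioned in this commit can be sketched with the standard library alone. The variable names below (`global_vars`, `case_vars`) are illustrative, not the project's actual identifiers:

```python
from collections import ChainMap

# Hypothetical variable pools: parametrized case variables must shadow
# globally extracted variables without mutating either dict.
global_vars = {"token": "abc", "user_id": 1}
case_vars = {"user_id": 42}  # injected for one parametrized case

# ChainMap searches its maps left to right, so case-level values win,
# and lookups fall through to the global pool otherwise.
ctx = ChainMap(case_vars, global_vars)

print(ctx["user_id"])  # case-level value shadows the global one
print(ctx["token"])    # falls through to the global pool
```

Because ChainMap stores references rather than copies, later extractions written into `global_vars` stay visible to `ctx` without rebuilding it.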
2116016a0d feat(executor): rework case loading and execution, supporting parametrized-variable priority
- Introduced a CaseEntity wrapper to decouple the data model from the execution context.
- Removed deepcopy in the loading phase, reducing memory usage for large parametrized case sets.
- Injected local variables in the perform phase so parametrized data takes priority over the global cache.
2026-03-11 17:11:19 +08:00
293b5160fe fix(exchange,case_validator),refactor(),feat(model): resolve the type conflict between Pydantic model initialization and variable placeholders, improve variable substitution, rework the CaseInfo model, and introduce deferred validation
- Introduced SmartInt and SmartDict types that convert automatically between YAML placeholders and business types.
- Improved the CaseInfo mutual-exclusion check so that exactly one of request and api_action is provided.
- Standardized on the Pydantic V2 model_config convention.
- Moved variable substitution ahead of model instantiation so placeholders are replaced with real values before validation, keeping type conversion correct for int/bool fields.
- Improved assertion rendering timing to support referencing response-extracted values.
2026-03-11 10:29:16 +08:00
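The SmartInt idea above — a field that tolerates an unresolved `${...}` placeholder until substitution has run — can be illustrated with a plain coercion function. This is a hedged stand-in for the concept, not the project's Pydantic implementation:

```python
def smart_int(value):
    """Coerce a value to int, but pass an unresolved ${...} placeholder
    through untouched so validation can happen after substitution.
    (Illustrative sketch of the SmartInt idea, not the project's code.)"""
    if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
        return value  # defer: the placeholder is resolved later
    return int(value)

print(smart_int("200"))      # coerced to an int
print(smart_int("${code}"))  # left untouched for later substitution
```

In Pydantic V2 the same check would typically live in a custom validator attached to the field type.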
69a96a0060 refactor(): rework dynamic case generation and decouple core components
- Split `CaseGenerator` into `CaseDataLoader` (data loading) and `CaseGenerator` (case construction), following the single-responsibility principle.
- Introduced `TestTemplateBase` as a clean method-mount container, keeping logic code out of the test cases.
- Improved the YAML parsing flow, fully separating file scanning, parametrization parsing, and pytest method construction.
- Improved decorator usage: Allure and pytest.mark.parametrize are now applied with the more readable @ syntax.
- Enhanced execution logging, recording richer runtime context via type annotations and instance references.
2026-03-06 15:07:22 +08:00
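The "method-mount container" pattern this commit describes — a bare class onto which generated test methods are attached — can be sketched as follows. The names and case structure here are assumptions for illustration, not the project's real API:

```python
class TestTemplateBase:
    """Pure method-mount container: carries no logic of its own (illustrative)."""

def make_test(case):
    # A factory is needed so each generated method closes over its own case.
    def test(self):
        assert case["expected"] == case["actual"]
    test.__name__ = f"test_{case['name']}"
    return test

cases = [
    {"name": "status", "expected": 200, "actual": 200},
    {"name": "msg", "expected": "Success.", "actual": "Success."},
]
for case in cases:
    fn = make_test(case)
    setattr(TestTemplateBase, fn.__name__, fn)  # mount onto the container

print(sorted(n for n in vars(TestTemplateBase) if n.startswith("test_")))
```

pytest collects such mounted methods like hand-written ones, which is what lets YAML cases become runnable tests without generated source files.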
300b5a92d4 refactor(): improve test-case data handling and code structure
- Added a case generator and a registrar.
- Improved file handling.
2025-06-03 21:42:57 +08:00
2e9f1c12f7 feat,fix(): improve the funcs registration function
- Improved the register method.
2025-04-07 16:35:14 +08:00
4324cf37aa feat,fix(): miscellaneous improvements
- Improved settings (now read from environment variables).
- Fixed bugs.
2025-03-19 17:03:13 +08:00
55 changed files with 2058 additions and 996 deletions

.gitignore

@@ -2,8 +2,26 @@
 .idea/
 .venv/
 poetry.lock
-.pytest_cache/
-report/
-temp/
 logs/
+# --- dependencies & environments ---
+.venv
+venv/
+node_modules/
+uv.lock
+# --- ignore outputs ---
+outputs/
+# --- Allure reports ---
+temp/
+reports/
+.allure/
+# --- pytest caches ---
+.pytest_cache/
+.allure_cache/
+# --- config files ---
 .env

.python-version (new file)

@@ -0,0 +1 @@
3.11


@@ -1,31 +0,0 @@
feature: Page status
story: Status
title: Query status info
request:
  method: get
  url: /answer/api/v1/connector/info
  headers:
    Host: 119.91.19.171:40065
    Accept-Language: en_US
    Accept: application/json, text/plain, */*
    User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
    Referer: http://119.91.19.171:40065/users/login
    Accept-Encoding: gzip, deflate
extract:  # extract variables
  msg:
    - "json"
    - "$.msg"
    - 0
validate:
  equals:  # assert equal
    Status code equals 200:
      - Success.
      - ${msg}
#parametrize:  # data-driven testing
#  - [ "title","username","password","msg" ]  # variable names
#  - [ "测试1","user1","pass1","200" ]  # variable values
#  - [ "测试2","user2","pass2","300" ]  # variable values
#  - [ "测试3","user3","pass3","200" ]  # variable values
#  - [ "测试4","user4","pass4","200" ]  # variable values
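The extract/validate sections of the YAML case above can be read as a tiny interpreter: pull a value out of the response, store it as a variable, then compare it in an assertion. A hedged stdlib-only sketch (the real framework uses a jsonpath library; a simple top-level key lookup stands in for `$.msg` here):

```python
import json

response_text = '{"msg": "Success.", "code": 200}'

def extract(resp_text, expr, index=0):
    """Toy stand-in for the '$.msg' jsonpath extraction above:
    supports only top-level '$.key' expressions."""
    data = json.loads(resp_text)
    key = expr.removeprefix("$.")
    value = data[key]
    return value[index] if isinstance(value, list) else value

variables = {"msg": extract(response_text, "$.msg")}

# The 'equals' assertion from the validate block: expected vs. extracted
expected, actual = "Success.", variables["msg"]
assert expected == actual
print("assertion passed:", actual)
```

The third element in the YAML extract list (`0`) maps to the `index` parameter, picking one match when the expression yields several.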


@@ -1,65 +0,0 @@
{
"epic": "Project answer",
"feature": "Page status",
"story": "Status",
"title": "Query status info",
"request": {
"method": "get",
"url": "/answer/api/v1/connector/info",
"headers": {
"Host": "119.91.19.171:40065",
"Accept-Language": "en_US",
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
"Referer": "http://119.91.19.171:40065/users/login",
"Accept-Encoding": "gzip, deflate"
}
},
"extract": {
"msg": [
"json",
"$.msg",
0
]
},
"validate": {
"equals": {
"Status code equals 200": [
"Success.",
"Success."
]
}
},
"parametrize": [
[
"title",
"username",
"password",
"msg"
],
[
"测试1",
"user1",
"pass1",
"200"
],
[
"测试2",
"user2",
"pass2",
"300"
],
[
"测试3",
"user3",
"pass3",
"200"
],
[
"测试4",
"user4",
"pass4",
"200"
]
]
}


@@ -1,51 +0,0 @@
feature: Feature
story: Story
title: Query user info
request:
  method: get
  url: http://119.91.19.171:40065/answer/api/v1/connector/info
  headers:
    Accept-Encoding: gzip, deflate
    Accept-Language: zh_CN
    Content-Type: application/json
    Cookie: psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==
    Host: 119.91.19.171:40065
    Origin: http://119.91.19.171:40065
    Referer: http://119.91.19.171:40065/users/login
    User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like
      Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0
extract:  # extract variables
  code:
    - "json"
    - "$.code"
    - 0
  msg:
    - "json"
    - "$.msg"
    - 0
validate:
  equals:  # assert equal
    Status code equals 200:
      - 200
      - ${code}
  not_equals:  # assert not equal
    Status code not equal to 404:
      - 404
      - ${code}
  contains:  # assert contains
    Containment:
      - 404
      - ${code}
  not_contains:  # assert does not contain
    Non-containment:
      - 404
      - ${code}
parametrize:  # data-driven testing
  - [ "title","username","password","code" ]  # variable names
  - [ "测试1","user1","pass1","code1" ]  # variable values
  - [ "测试2","user2","pass2","code2" ]  # variable values
  - [ "测试3","user3","pass3","code3" ]  # variable values
  - [ "测试4","user4","pass4","code4" ]  # variable values


@@ -1,15 +0,0 @@
title: Query user info
request:
  method: get
  url: "https://api.kuleu.com/api/action"
  headers:
    user-agent: 'Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(KHTML, like Gecko) Chrome / 128.0.0.0Safari / 537.36'
  params:
    text: ${url_unquote(code)}
    # data: ${code}
extract:
  status_code: [ json, $.data,0 ]
validate:
  codes: 200


@@ -1,30 +0,0 @@
title: Query user info
request:
  method: get
  url: http://119.91.19.171:40065/answer/api/v1/connector/info
  headers:
    Accept-Encoding: gzip, deflate
    Accept-Language: zh_CN
    Content-Type: application/json
    Cookie: psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==
    Host: 119.91.19.171:40065
    Origin: http://119.91.19.171:40065
    Referer: http://119.91.19.171:40065/users/login
    User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like
      Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0
extract:  # extract variables
  reason:
    - "json"
    - "$.reason"
    - 0
validate:
  # sql assertions
  contains:  # assert contains
    User exists in the database:
      - "ltcs"
      - ${sql(select username from user where id=1)}
  not_contains:  # assert does not contain
    User does not exist in the database:
      - "ltcs"
      - ${sql(select username from user where id=1)}


@@ -1,45 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: a_test_case.py
@date: 2024 2024/9/15 19:15
@desc:
"""
from requests import Session
import requests

session = Session()


def test_1():
    base_url = "https://jsonplaceholder.typicode.com"
    session.params = {
        'Content-Type': 'application/json;charset=utf-8'
    }
    url = f"{base_url}/users"
    payload = {}
    # response = requests.request("POST", url, headers=headers, data=payload)
    response = session.get(url, json=payload)
    print(response.json()[0]["username"])
    assert response.status_code == 200


def test_2():
    base_url = r'https://api.kuleu.com/api/action'
    params = {"text": "爱情"}
    header = {
        "user-agent": 'Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(KHTML, like Gecko) '
                      'Chrome / 128.0.0.0Safari / 537.36'
    }
    response = requests.get(base_url, headers=header, params=params)
    # print(response.text)
    print(response.json())
    print(response.request.url)
    assert response.status_code == 200

api.py

@@ -1,28 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: api.py
@date: 2024 2024/9/12 22:52
@desc:
"""
from commons.session import Session

# session = requests.session()
session = Session("https://jsonplaceholder.typicode.com")
session.params = {
    'Content-Type': 'application/json;charset=utf-8'
}
url = "/users"
payload = {}
# response = requests.request("POST", url, headers=headers, data=payload)
response = session.get(url, json=payload)
# print(response.text)
# print(response.url)
# print(response)

api/__init__.py

@@ -0,0 +1,11 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: __init__.py
@date: 2024 2024/9/15 21:13
@desc:
"""

api/user_api.py

@@ -0,0 +1,32 @@
#!/usr/bin/env python
# coding=utf-8
from core.base_api import BaseApi


class UserApi(BaseApi):
    """User-center business APIs"""

    def login(self, username, password):
        """Example login endpoint"""
        self._log_action("login", user=username)
        payload = {
            "username": username,
            "password": password
        }
        # call the request method inherited from session directly
        return self.session.request(
            method="POST",
            url="/api/v1/login",
            json=payload
        )

    def get_info(self, user_id: int):
        """Example: fetch user info"""
        self._log_action("get_info", uid=user_id)
        return self.session.request(
            method="GET",
            url=f"/api/v1/user/{user_id}"
        )


@@ -15,11 +15,11 @@ import logging
 import allure
 import pytest
-from commons import settings
-from commons.file_processors.file_handle import FileHandle
+from core import settings
+from commons.file_processors.yaml_processor import YamlProcessor as FileHandle
 from commons.models import CaseInfo
-from commons.session import Session
-from commons.exchange import Exchange
+from core.session import Session
+from core.exchange import Exchange
 from utils import data_driver, case_validator
 logger = logging.getLogger(__name__)


@@ -10,9 +10,10 @@
 @desc:
 """
 import logging
+import os
 import pymysql as MySQLdb
-from commons import settings
 logger = logging.getLogger(__name__)
@@ -32,14 +33,14 @@ class DBServer:
 db = DBServer(
-    host=settings.db_host,    # ip
-    port=3306,                # port
-    user='root',              # username
-    password='mysql_hNahSe',  # password
-    database='answer'         # database name
+    host=os.getenv("DB_HOST"),          # ip
+    port=os.getenv("DB_PORT"),          # port
+    user=os.getenv("DB_USER"),          # username
+    password=os.getenv("DB_PASSWORD"),  # password
+    database=os.getenv("DB_DATABASE")   # database name
 )
 if __name__ == '__main__':
     ...
-    res = db.execute_sql('select username from user where id=1;')
-    print(res[0])
+    # res = db.execute_sql('select username from user where id=1;')
+    # print(res[0])


@@ -1,111 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: exchange.py
@date: 2024 2024/9/18 21:58
@desc:
"""
import copy
import json
import logging
import re
import jsonpath
import allure
from commons.templates import Template
from commons.file_processors.file_handle import FileHandle

logger = logging.getLogger(__name__)


class Exchange:
    def __init__(self, path):
        self.file = FileHandle(path)

    @allure.step("extract variables")
    def extract(self, resp, var_name, attr, expr: str, index):
        resp = copy.deepcopy(resp)
        try:
            # resp.json is a method, not an attribute; replace it with its value by hand
            resp.json = resp.json()
        except json.decoder.JSONDecodeError:
            resp.json = {"msg": "is not json data"}
        data = getattr(resp, attr)
        if expr.startswith("/"):  # xpath
            res = None
        elif expr.startswith("$"):  # jsonpath
            data = dict(data)
            res = jsonpath.jsonpath(data, expr)
        else:  # regex
            res = re.findall(expr, str(data))
        # print(res)
        if res:  # data was found
            value = res[index]
        else:  # no data
            value = "not data"
        logger.debug(f"{var_name} = {value}")  # log the variable name and value
        self.file[var_name] = value  # store the variable
        self.file.save()  # persist to file

    @allure.step("replace variables")
    def replace(self, case_info: dict) -> dict:
        logger.info(f"Variable substitution: {case_info}")
        # 1. convert case_info to a string
        case_info_str = FileHandle.to_string(case_info)
        print(f"{case_info_str=}")
        # 2. substitute within the string
        case_info_str = Template(case_info_str).render(self.file)
        print(f"{case_info_str=}")
        # 3. convert the string back into case_info
        new_case_info = FileHandle.to_dict(case_info_str)
        return new_case_info


if __name__ == '__main__':
    class MockResponse:
        text = '{"name":"张三","age":"18","data":[3,4,5],"aaa":null}'

        def json(self):
            return json.loads(self.text)

    mock_resp = MockResponse()
    # print(mock_resp.text)
    # print(mock_resp.json())
    exchanger = Exchange(r"E:\PyP\InterfaceAutoTest\extract.yaml")
    exchanger.extract(mock_resp, "name", "json", '$.name', 0)
    exchanger.extract(mock_resp, "age", "json", '$.age', 0)
    exchanger.extract(mock_resp, "data", "json", '$.data', 0)
    exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0)
    # mock_case_info = CaseInfo(
    #     title="unit test",
    #     request={
    #         "data":
    #             {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
    #     },
    #     extract={},
    #     validate={}
    # )
    mock_case_info = {
        "title": "unit test",
        "request": {
            "data":
                {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
        },
        "extract": {},
        "validate": {}
    }
    new_mock_case_info = exchanger.replace(mock_case_info)
    print(new_mock_case_info)
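The stringify → render → parse-back round trip in `replace()` above can be demonstrated with the stdlib `string.Template` alone. This is a hedged sketch: the project's `Template` subclass also supports `${func(args)}` calls, which plain `substitute` does not:

```python
from string import Template

# Variable pool, as extract() would have persisted it.
variables = {"name": "张三", "age": "18"}

# Case data serialized to a string, placeholders included.
case_info_str = '{"title": "unit test", "data": {"name": "${name}", "age": "${age}"}}'

# Render: every ${var} is replaced by its value from the mapping.
rendered = Template(case_info_str).substitute(variables)
print(rendered)
```

Substituting on the serialized string (rather than walking the nested dict) is what lets placeholders appear at any depth without recursive traversal code.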


@@ -9,3 +9,14 @@
 @date: 2025/3/4 17:23
 @desc:
 """
+from .base_processor import BaseFileProcessor
+from .json_processor import JsonProcessor
+from .yaml_processor import YamlProcessor
+from .processor_factory import get_processor_class
+
+__all__ = [
+    "BaseFileProcessor",
+    "JsonProcessor",
+    "YamlProcessor",
+    "get_processor_class",
+]


@@ -10,6 +10,8 @@
 @desc:
 """
 import abc
+from pathlib import Path
+from typing import Union
 class BaseFileProcessor(abc.ABC):  # abstract base class defined with the abc module
@@ -17,25 +19,16 @@ class BaseFileProcessor(abc.ABC):
     Abstract base class for file processors.
     Defines the methods every subclass must implement.
     """
+    def __init__(self, filepath: Union[str, Path], **kwargs):
+        self.filepath: Path = Path(filepath)  # ensure filepath is a Path object
+
     @abc.abstractmethod
-    def load(self):
+    def load(self) -> dict:
         """Load."""
-        pass
+        raise NotImplementedError
-    @staticmethod
-    @abc.abstractmethod
-    def to_string(data: dict) -> str:
-        """Convert the file content to a string."""
-        pass
-    @staticmethod
-    @abc.abstractmethod
-    def to_dict(data: str) -> dict:
-        """Convert the file content to a dict."""
-        pass
+
     @abc.abstractmethod
-    def save(self, new_filepath=None):
+    def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
         """Save the data."""
-        pass
+        raise NotImplementedError
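A concrete subclass of the slimmed-down `BaseFileProcessor` contract above only needs `load` and `save`. A minimal sketch (the class name `TinyJsonProcessor` and the temp-file usage are illustrative, not from the project):

```python
import abc
import json
import tempfile
from pathlib import Path
from typing import Union

class BaseFileProcessor(abc.ABC):
    """Local restatement of the abstract base from the diff, for a runnable demo."""
    def __init__(self, filepath: Union[str, Path], **kwargs):
        self.filepath: Path = Path(filepath)

    @abc.abstractmethod
    def load(self) -> dict: ...

    @abc.abstractmethod
    def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None: ...

class TinyJsonProcessor(BaseFileProcessor):
    def load(self) -> dict:
        return json.loads(self.filepath.read_text(encoding="utf-8"))

    def save(self, data: dict, new_filepath=None) -> None:
        target = Path(new_filepath) if new_filepath else self.filepath
        target.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")

path = Path(tempfile.gettempdir()) / "tiny_case.json"
proc = TinyJsonProcessor(path)
proc.save({"title": "demo"})
print(proc.load())  # {'title': 'demo'}
```

Keeping `load`/`save` stateless (data in, data out) is what allowed the diff to drop the old dict inheritance and the static `to_string`/`to_dict` hooks.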


@@ -1,41 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: file_handle
@date: 2025/3/7 09:31
@desc:
"""
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor

processors = {
    'yaml': YamlProcessor,
    'yml': YamlProcessor,
    'json': JsonProcessor,
}


def get_processor(ext):
    agent_model = processors.get(ext, YamlProcessor)  # proxy pattern
    return agent_model  # fall back to Yaml by default


FileHandle = get_processor("yaml")

if __name__ == '__main__':
    # example usage
    yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml'  # path to your YAML file
    yaml_file = FileHandle(yaml_path)
    print(yaml_file)
    print(type(yaml_file))
    file_string = FileHandle.to_string(yaml_file)
    print(file_string)
    file_dict = FileHandle.to_dict(file_string)
    print(file_dict)


@@ -10,117 +10,77 @@
 @desc:
 """
 import logging
-from typing import Union
+from typing import Union, Any
 from pathlib import Path
 import json
-from commons.file_processors.base import BaseFileProcessor
+from commons.file_processors.base_processor import BaseFileProcessor
 logger = logging.getLogger(__name__)
-class JsonProcessor(BaseFileProcessor, dict):
+class JsonProcessor(BaseFileProcessor):
     """
-    A class for handling YAML files, inheriting from dict.
-    Provides loading from a file, saving to a file, conversion to and from
-    strings, and dict-style access to the YAML data.
+    A class for handling JSON files.
+    Provides loading JSON data from a file into a dict, and saving a dict as a JSON file.
     """
-    def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
+    def __init__(self, filepath: Union[str, Path], **kwargs):
         """
-        Initialize the YamlFile object.
+        Initialize the JsonFile object.
         Args:
             filepath: path to the file (a string or a pathlib.Path object).
-            data: optional initial data dict; if provided, it initializes the object,
-                  otherwise the data is loaded from filepath.
         """
-        super().__init__()  # initialize the dict parent class
-        self.filepath: Path = Path(filepath)  # ensure filepath is a Path object
-        if data is not None:
-            self.update(data)  # update the dict with the initial data
-        else:
-            self.load()  # otherwise, try loading from the file
+        super().__init__(filepath, **kwargs)
+        # self.filepath: Path = Path(filepath)  # ensure filepath is a Path object
-    def load(self) -> None:
+    def load(self) -> dict[str, Any]:
         """
-        Load data from the YAML file and update the dict.
-        If the file does not exist or loading fails, clear the dict and log a warning/error.
+        Load data from the JSON file.
+        :return:
         """
-        self.clear()  # clear existing data
-        if self.filepath.exists():
-            try:
-                with open(self.filepath, "r", encoding="utf-8") as f:
-                    loaded_data = json.load(f) or {}
-                self.update(loaded_data)  # update the dict with the loaded data
-            except json.JSONDecodeError as e:
-                logger.error(f"Error loading JSON file {self.filepath}: {e}")
-        else:
-            logger.warning(f"File {self.filepath} does not exist, dict left empty.")
-    @staticmethod
-    def to_string(data: dict) -> str:
-        """
-        Convert a dict to a JSON string.
-        """
-        try:
-            return json.dumps(
-                dict(data),          # convert to a plain dict
-                ensure_ascii=False,  # allow non-ASCII characters
-                sort_keys=False      # do not sort keys
-            )
-        except TypeError as e:
-            logger.error(f"Error converting data to a JSON string: {e}")
-            return ""
-    @staticmethod
-    def to_dict(data: str) -> None:
-        """
-        Convert a JSON string to a dict.
-        """
-        try:
-            loaded_data = json.loads(data) or {}
-            return loaded_data
-        except json.JSONDecodeError as e:
-            logger.error(f"Error converting the JSON string to a dict: {e}")
+        if not self.filepath.exists():
+            logger.warning(f"File {self.filepath} does not exist.")
+            raise FileNotFoundError(f"File {self.filepath} does not exist.")
+        try:
+            with open(self.filepath, "r", encoding="utf-8") as f:
+                loaded_data = json.load(f)
+            if not isinstance(loaded_data, dict):  # ensure a dict was loaded
+                logger.error(f"The root node of JSON file {self.filepath} is not a dict/mapping.")
+                raise ValueError(f"The root node of JSON file {self.filepath} is not a dict/mapping.")
+            return loaded_data
+        except json.JSONDecodeError as e:
+            logger.error(f"Error loading JSON file {self.filepath}: {e}")
+            raise e
-    def save(self, new_filepath: Union[str, Path, None] = None):
+    def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
         """
-        Save the dict data (self) to a JSON file.
+        Save the dict data to a JSON file.
         Args:
-            new_filepath: optional new file path; if None, overwrite the original file.
+            :param data: the dict to save.
+            :param new_filepath: optional new file path; if None, overwrite the original file.
         """
         filepath = Path(new_filepath) if new_filepath else self.filepath
+        filepath.parent.mkdir(parents=True, exist_ok=True)
         try:
             with open(filepath, "w", encoding="utf-8") as f:
                 json.dump(
-                    dict(self),  # convert to a plain dict
+                    data,
                     f,
                     ensure_ascii=False,  # allow non-ASCII characters
-                    indent=4,            # pretty-print with a 4-space indent
                     sort_keys=False      # do not sort keys
                 )
-        except (TypeError, OSError) as e:
+            logger.info(f"Data successfully saved to {filepath}")
+        except (TypeError, OSError, json.JSONDecodeError) as e:
             logger.error(f"Error saving JSON file {filepath}: {e}")
+            raise e
 if __name__ == '__main__':
     # example usage
     json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json'  # path to your JSON file
     json_file = JsonProcessor(json_path)
-    print(json_file)
+    print(json_file.load())
     print(type(json_file))
-    json_string = JsonProcessor.to_string(json_file)
-    JsonProcessor.to_dict(json_string)
-    print(json_string)
-    json_file.save()
+    # json_file.save()


@@ -0,0 +1,57 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: file_handle
@date: 2025/3/7 09:31
@desc:
"""
from pathlib import Path
from typing import Type, Union

from commons.file_processors.base_processor import BaseFileProcessor
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor

# type alias: maps a file suffix to a processor class
ProcessorMap = dict[str, Type[BaseFileProcessor]]

processors: ProcessorMap = {
    'yaml': YamlProcessor,
    'yml': YamlProcessor,
    'json': JsonProcessor,
}


class UnsupportedFileTypeError(Exception):
    """Raised when the file type is not supported."""
    pass


# def get_processor_class(file_suffix: str = "yaml") -> Type[BaseFileProcessor]:
def get_processor_class(fp: Union[Path, str]) -> 'BaseFileProcessor':
    fp = Path(fp)
    if fp.is_file():
        file_suffix = fp.suffix[1:]
        processor_class = processors.get(file_suffix.lower(), YamlProcessor)  # proxy pattern
        return processor_class(fp)  # fall back to Yaml by default
    else:
        raise UnsupportedFileTypeError(fp)


# FileHandle = get_processor("yaml")
if __name__ == '__main__':
    # example usage
    yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml'  # path to your YAML file
    # yaml_file = FileHandle(yaml_path)
    # print(yaml_file.load())
    # print(type(yaml_file))
    # file_suffix = Path(yaml_path).suffix[1:]
    # print(file_suffix)
    get_processor = get_processor_class(yaml_path)
    print(get_processor.load())


@@ -10,16 +10,22 @@
 @desc:
 """
 import logging
-from typing import Union
-from dataclasses import dataclass, asdict, field
+from typing import Union, Any
 from pathlib import Path
 import yaml
-from commons.file_processors.base import BaseFileProcessor
+from commons.file_processors.base_processor import BaseFileProcessor
 logger = logging.getLogger(__name__)
-class YamlProcessor(BaseFileProcessor, dict):
+class YamlLoadError(Exception):
+    """Custom YAML load error: raised on YAML syntax errors or data that does not match the expected structure."""
+    pass
+class YamlProcessor(BaseFileProcessor):
     """
     A class for handling YAML files, inheriting from dict.
     Provides loading from a file, saving to a file, and conversion to and from strings,
@@ -35,33 +41,45 @@ class YamlProcessor(BaseFileProcessor, dict):
         data: optional initial data dict; if provided, it initializes the YamlFile,
               otherwise the data is loaded from filepath.
         """
-        super().__init__()  # initialize the dict parent class
-        self.filepath: Path = Path(filepath)  # ensure filepath is a Path object
-        if data is not None:
-            self.update(data)  # update the dict with the initial data
-        else:
-            self.load()  # otherwise, try loading from the file
+        super().__init__(filepath=filepath)
+        self.filepath: Path = Path(filepath)  # ensure filepath is a Path object
-    def load(self) -> None:
+    def load(self) -> dict[str, Any]:
         """
-        Load data from the YAML file and update the dict.
-        If the file does not exist or loading fails, clear the dict and log a warning/error.
+        Load the YAML file and return a dict.
+        Returns:
+            Dict: the loaded data dict.
+        Raises:
+            YamlLoadError: if reading or parsing the file fails.
         """
-        self.clear()  # clear existing data
-        if self.filepath.exists():
-            try:
-                with open(self.filepath, "r", encoding="utf-8") as f:
-                    loaded_data = yaml.safe_load(f) or {}
-                self.update(loaded_data)  # update the dict with the loaded data
-            except yaml.YAMLError as e:
-                logger.error(f"Error loading YAML file {self.filepath}: {e}")
-        else:
-            logger.warning(f"File {self.filepath} does not exist, dict left empty.")
+        if not self.filepath.exists():
+            logger.error(f"❌ File not found: {self.filepath}")
+            return {}
+        try:
+            with open(self.filepath, "r", encoding="utf-8") as f:
+                content = yaml.safe_load(f)
+            # case 1: the file is empty
+            if content is None:
+                return {}
+            # case 2: valid YAML, but not a mapping (e.g. a plain string or list)
+            if not isinstance(content, dict):
+                raise YamlLoadError(f"Bad YAML top level: expected dict, got {type(content).__name__}")
+            return content
+        except yaml.YAMLError as e:
+            msg = f"YAML syntax error [{self.filepath.name}]: {e}"
+            logger.error(msg)
+            raise YamlLoadError(msg) from e
+        except Exception as e:
+            logger.error(f"📂 Filesystem read error: {e}")
+            raise
     @staticmethod
-    def to_string(data: dict) -> str:
+    def to_string(data: dict[str, Any]) -> str:
         """
         Convert a dict to a YAML-format string.
@@ -70,7 +88,7 @@ class YamlProcessor(BaseFileProcessor, dict):
         """
         try:
             return yaml.safe_dump(
-                dict(data),  # convert to a plain dict
+                data,
                 allow_unicode=True,
                 sort_keys=False,
                 default_flow_style=False
@@ -78,51 +96,60 @@ class YamlProcessor(BaseFileProcessor, dict):
         except TypeError as e:
             logger.error(f"Error converting data to a YAML string: {e}")
             return ""
+        except Exception as e:
+            logger.error(f"YAML serialization failed: {e}")
+            return ""
     @staticmethod
-    def to_dict(data: str) -> Union[None, dict]:
+    def from_string(yaml_str: str) -> Union[None, dict]:
         """
-        Convert a YAML-format string to a dict and update the current dict's contents.
+        Convert a YAML-format string to a dict.
         Args:
-            data: a YAML-format string.
+            yaml_str: a YAML-format string.
         """
         try:
-            loaded_data = yaml.safe_load(data) or {}
-            return loaded_data
+            data = yaml.safe_load(yaml_str)
+            return data if isinstance(data, dict) else {}
         except yaml.YAMLError as e:
-            logger.error(f"Error converting the YAML string to a dict: {e}")
+            logger.error(f"Failed to parse the YAML string: {e}")
+            return {}
-    def save(self, new_filepath: Union[str, Path, None] = None):
+    def save(self, data: dict[str, Any], new_filepath: Union[str, Path, None] = None):
         """
-        Save the dict data (self) to a YAML file.
+        Save the dict data to a YAML file.
         Args:
-            new_filepath: optional new file path; if None, overwrite the original file.
+            data: the dict data to save.
+            new_filepath: optional, save to a new path.
         """
-        filepath = Path(new_filepath) if new_filepath else self.filepath
+        target_path = Path(new_filepath) if new_filepath else self.filepath
         try:
-            with open(filepath, "w", encoding="utf-8") as f:
+            target_path.parent.mkdir(parents=True, exist_ok=True)
+            with open(target_path, "w", encoding="utf-8") as f:
                 yaml.safe_dump(
-                    dict(self),  # convert to a plain dict
+                    data,
                     stream=f,
                     allow_unicode=True,
                     sort_keys=False,
                     default_flow_style=False
                 )
-        except (TypeError, OSError) as e:
-            logger.error(f"Error saving YAML file {filepath}: {e}")
-        # todo: propagate failures to the caller instead of silently defaulting to {}
+            logger.debug(f"💾 Data successfully saved to: {target_path}")
+        except Exception as e:
+            logger.error(f"🚫 Failed to save YAML: {e}")
+            raise
 if __name__ == '__main__':
+    from core.settings import TEST_CASE_DIR
     # example usage
-    yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml'  # path to your YAML file
+    yaml_path = TEST_CASE_DIR / r'answer/test_1_status.yaml'  # path to your YAML file
     yaml_file = YamlProcessor(yaml_path)
-    print(yaml_file)
+    print(yaml_file.load())
+    print(yaml_file.to_string(yaml_file.load()))
     print(type(yaml_file))
     # # access the data directly like a dict
@@ -165,4 +192,3 @@ if __name__ == '__main__':
     # print("\nLoading a nonexistent file:", non_existent_file)  # should print an empty dict {}
     # non_existent_file['a'] = 1  # items can be added directly
     # print("\nLoading a nonexistent file:", non_existent_file)


@@ -1,68 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: files.py
@date: 2024 2024/9/15 21:28
@desc: 读取和保存yaml文件
"""
import logging
from pathlib import Path
import yaml

logger = logging.getLogger(__name__)


class YamlFile(dict):
    def __init__(self, path):
        super().__init__()  # initialize the dict parent class
        self.path = Path(path)
        self.load()  # chained load on init

    def load(self):
        if self.path.exists():
            with open(self.path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}  # load the data; an empty file yields an empty dict
            self.clear()  # clear the current instance
            self.update(data)  # refresh the dict contents
        else:
            logger.warning(f"File {self.path} not found, initialized empty.")
        return self  # allow chaining

    def to_yaml(self) -> str:
        return yaml.safe_dump(
            dict(self),
            allow_unicode=True,
            sort_keys=False
        )

    @classmethod
    def by_yaml(cls, yaml_str):
        data = yaml.safe_load(yaml_str) or {}
        return cls({**data})  # create an instance via the class method

    def save(self):
        with open(self.path, "w", encoding="utf-8") as f:
            yaml.safe_dump(
                dict(self),  # dump the instance itself (it inherits dict)
                stream=f,
                allow_unicode=True,
                sort_keys=False
            )
        return self  # allow chaining


if __name__ == '__main__':
    from commons.models import CaseInfo
    yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml'
    yaml_file = YamlFile(yaml_path)
    # yaml_file.load()
    case_info = CaseInfo(**yaml_file)
    yaml_file["title"] = "Query user info"
    yaml_file.save()


@@ -15,30 +15,32 @@ import time
 import urllib.parse
 import hashlib
-from commons.databases import db
-from commons.file_processors.file_handle import FileHandle
-from commons import settings
+# from commons.databases import db
+# from commons.file_processors.yaml_processor import YamlProcessor as get_processor_class
+from core import settings
 logger = logging.getLogger(__name__)
+
 class Funcs:
     FUNC_MAPPING = {
         "int": int,
         "float": float,
         "bool": bool
     }  # existing built-ins go straight into the mapping; anything else is defined in funcs and registered automatically
     @classmethod
-    def register(cls, name: str):
+    def register(cls, name: str | None = None):
         def decorator(func):
+            if name is None:
+                cls.FUNC_MAPPING[func.__name__] = func
             cls.FUNC_MAPPING[name] = func
             return func
         return decorator
 @Funcs.register("url_unquote")
 def url_unquote(s: str) -> str:
     return urllib.parse.unquote(s)
@@ -49,35 +51,43 @@ def to_string(s) -> str:
     # Convert the data to str.
     return f"'{s}'"
 @Funcs.register("time_str")
 def time_str() -> str:
     return str(time.time())
 @Funcs.register("add")
 def add(a, b):
     return str(int(a) + int(b))
-@Funcs.register("sql")
-def sql(s: str) -> str:
-    res = db.execute_sql(s)
-    return res[0][0]
+# @Funcs.register("sql")
+# def sql(s: str) -> str:
+#     res = db.execute_sql(s)
+#     return res[0][0]
-@Funcs.register("new_id")
-def new_id():
-    # auto-increment, never repeats
-    id_file = FileHandle(settings.id_path)
-    id_file["id"] += 1
-    id_file.save()
-    return id_file["id"]
+# @Funcs.register("new_id")
+# def new_id():
+#     # auto-increment, never repeats
+#     id_file = get_processor_class(settings.id_path)
+#     data = id_file.load()
+#     data["id"] += 1
+#     id_file.save(data)
+#     return data["id"]
-@Funcs.register("last_id")
-def last_id() -> str:
-    # no increment, just return the current value
-    id_file = FileHandle(settings.id_path)
-    return id_file["id"]
+# @Funcs.register("last_id")
+# def last_id() -> str:
+#     # no increment, just return the current value
+#     id_file = get_processor_class(settings.id_path)
+#     data = id_file.load()
+#     return data["id"]
@@ -86,6 +96,7 @@ def md5(content: str) -> str:
     result = hashlib.md5(content).hexdigest()
     return result
+
 @Funcs.register("base64_encode")
 def base64_encode(content: str) -> str:
     # 1. convert the plaintext to bytes
@@ -97,6 +108,7 @@ def base64_encode(content: str) -> str:
     return encode_str
+
 @Funcs.register("base64_decode")
 def base64_decode(content: str) -> str:
     # 1. convert the plaintext to bytes
@@ -108,15 +120,22 @@ def base64_decode(content: str) -> str:
     return decode_str
 @Funcs.register("rsa_encode")
 def rsa_encode(content: str) -> str:
     ...
 @Funcs.register("rsa_decode")
 def rsa_decode(content: str) -> str:
     ...
+@Funcs.register("gen_phone")
+def func_name_test():
+    return "I got replaced!!!"
 if __name__ == '__main__':
     # res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
     # print(res)


@@ -1,39 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: models.py
@date: 2024 2024/9/15 21:14
@desc: Declares the YAML test case schema
"""
import logging
from dataclasses import dataclass, field
import yaml
from commons import settings
logger = logging.getLogger(__name__)
@dataclass
class CaseInfo:
title: str
request: dict
extract: dict
validate: dict
parametrize: list = field(default_factory=list)
epic: str = settings.allure_epic
feature: str = settings.allure_feature
story: str = settings.allure_story
if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f)
# print(data)
case_info = CaseInfo(**data)

settings.py Deleted file
@@ -1,41 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: settings
@date: 2025/2/23 21:34
@desc:
"""
from pathlib import Path
import os
from dotenv import load_dotenv
load_dotenv()
root_path = (Path(__file__)).resolve().parents[1]
base_url = os.getenv("BASE_URL")
cases_dir = rf"{root_path}\TestCases\answer"
exchanger = rf"{root_path}\extract.yaml"
id_path = rf"{root_path}\id.yaml"
db_host = os.getenv("DB_HOST")          # host IP
db_port = os.getenv("DB_PORT")          # port
db_user = os.getenv("DB_USER")          # username
db_password = os.getenv("DB_PASSWORD")  # password
db_database = os.getenv("DB_DATABASE")  # database name
allure_epic: str = "Project: answer"
allure_feature: str = "Default feature"
allure_story: str = "Default story"
rsa_public = ""
rsa_private = ""
if __name__ == '__main__':
print(root_path)
print(base_url,db_host,db_port,db_user,db_password,db_database)

templates.py Deleted file
@@ -1,63 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: templates.py
@date: 2024 2024/9/22 22:20
@desc:
"""
import copy
import logging
import re
import string
from commons.funcs import Funcs
logger = logging.getLogger(__name__)
class Template(string.Template):
    """
    1. Supports function calls
    2. Arguments may themselves be variables
    """
    call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")

    def render(self, mapping: dict) -> str:
        s = self.safe_substitute(mapping)  # Replace variables via the native method
        s = self.safe_substitute_funcs(s, mapping)
        return s

    def safe_substitute_funcs(self, template, mapping) -> str:
        """
        Parse function names and arguments in the string and substitute each call with its result
        :param template: The input string
        :param mapping: Context providing the available functions and variables
        :return: The substituted result
        """
        mapping = copy.deepcopy(mapping)
        logger.info(f"mapping before update: {mapping}")
        # mapping.update(self.FUNC_MAPPING)  # Merge the two mappings
        mapping.update(Funcs.FUNC_MAPPING)  # Merge the two mappings
        logger.info(f"mapping after update: {mapping}")

        def convert(mo):
            func_name = mo.group("func_name")
            func_args = mo.group("func_args").split(",")
            func = mapping.get(func_name)  # Look up the registered function
            func_args_value = [mapping.get(arg, arg) for arg in func_args]
            if func_args_value == [""]:  # Handle zero-argument functions
                func_args_value = []
            if not callable(func):
                return mo.group()  # Leave non-callable pseudo-functions untouched
            else:
                return str(func(*func_args_value))  # Otherwise substitute the call result
        return self.call_pattern.sub(convert, template)

conftest.py Normal file
@@ -0,0 +1,139 @@
#!/usr/bin/env python
# coding=utf-8
"""
@desc: Pytest configuration: global fixtures and hook functions
"""
import platform
from typing import Any
import pytest
from pathlib import Path
import logging
from core.context import VariableStore, ExecutionEnv
from core.session import Session
from core.exchange import Exchange
from core.settings import EXTRACT_CACHE, base_url

logger = logging.getLogger(__name__)


# Register command-line options
def pytest_addoption(parser: Any) -> None:
    """
    Register custom command-line options.
    Lets users control test execution behaviour via CLI arguments.
    Args:
        parser: Pytest's command-line argument parser.
    """
    parser.addoption("--test_dir", action="store", default=None, help="Test case directory")
    parser.addoption("--env", action="store", default="test", help="Environment identifier (test/prod/dev)")


@pytest.fixture(scope="session")
def execution_context():
    """
    [Session-scoped fixture] Global execution context.
    Responsibilities:
    1. Lifecycle management: initializes the single Session, variable store, and Exchange instance.
    2. Resource reuse: keeps one HTTP connection pool to avoid repeated handshakes.
    3. Persistence: writes extracted variables back to disk when the run finishes.
    Yields:
        ExecutionEnv: An environment object holding session, store, and exchanger.
    """
    # Setup: load the environment
    store = VariableStore(EXTRACT_CACHE)
    exchanger = Exchange(variable_cache=store.store)
    session = Session(base_url)
    env = ExecutionEnv(session, store, exchanger)
    yield env  # Injected into test cases
    # Teardown: persist and clean up once
    store.persist()
    session.close()


def pytest_exception_interact(node: Any, call: Any, report: Any) -> None:
    """
    [Hook] Exception interaction hook.
    Triggered when a test fails (assertion error or unhandled exception).
    Captures the detailed traceback and formats it into the log so failures
    can be located quickly from the console or log files.
    Args:
        node: The test node (Item or Collector) where the exception occurred.
        call: Call info (contains excinfo, the exception details).
        report: The test report object.
    """
    if report.failed:
        # Fetch the detailed traceback (includes assert comparison details)
        # Styles: long, short, no-locals
        exc_info = call.excinfo.getrepr(style='short')
        logger.error(f"\n{'=' * 40} TEST FAILED {'=' * 40}\n"
                     f"Node: {node.name}\n"
                     f"Error:\n{exc_info}"
                     )
        logger.error("=" * 93 + "\n")


def pytest_sessionfinish(session: Any, exitstatus: int) -> None:
    """
    [Hook] Session-finish hook.
    Called after all tests have run. It:
    1. Logs a message at a level matching the exit status.
    2. Collects environment info (Base URL, Python version, OS, etc.).
    3. Generates `environment.properties` for the Allure report.
    Args:
        session: The pytest session object.
        exitstatus: The overall exit status code of the run.
    """
    match exitstatus:
        case pytest.ExitCode.OK:
            logger.info("All tests passed!")
        case pytest.ExitCode.TESTS_FAILED:
            logger.warning("Some test cases failed; please check the report.")
        case pytest.ExitCode.INTERRUPTED:
            logger.error("Test run interrupted by the user (Ctrl+C).")
        case pytest.ExitCode.INTERNAL_ERROR:
            logger.critical("Pytest hit an internal error!")
        case pytest.ExitCode.USAGE_ERROR:
            logger.error("Invalid pytest command-line arguments or usage.")
        case pytest.ExitCode.NO_TESTS_COLLECTED:
            logger.warning("No test cases were collected.")
        case _:
            logger.error(f"Unknown exit status code: {exitstatus}")

    report_dir = session.config.getoption("--alluredir")
    if not report_dir:
        return
    report_path = Path(report_dir)
    # Collect environment info (tailored to API automation)
    env_info = {
        "Base URL": base_url,
        "Environment": session.config.getoption("--env"),
        "Python Version": platform.python_version(),
        "OS System": platform.system(),
        "Project": "Interface Auto Test"
    }
    try:
        report_path.mkdir(parents=True, exist_ok=True)
        # Generate the environment.properties file
        env_file = report_path / "environment.properties"
        with env_file.open("w", encoding="utf-8") as f:
            for k, v in env_info.items():
                f.write(f"{k}={v}\n")
        logger.info("Allure environment info generated.")
    except Exception as e:
        logger.error(f"Failed to write environment properties: {e}")
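The properties format Allure consumes is plain `key=value` lines, one per entry. A standalone sketch of that write step (the values and the temp directory are illustrative, not the project's real configuration):

```python
from pathlib import Path
import tempfile

# Invented sample data standing in for the collected env_info
env_info = {"Base URL": "http://demo.local", "Project": "Interface Auto Test"}

report_path = Path(tempfile.mkdtemp()) / "allure-results"
report_path.mkdir(parents=True, exist_ok=True)

# Write one key=value pair per line, as Allure expects
env_file = report_path / "environment.properties"
with env_file.open("w", encoding="utf-8") as f:
    for k, v in env_info.items():
        f.write(f"{k}={v}\n")

print(env_file.read_text(encoding="utf-8"))
```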

core/base_api.py Normal file
@@ -0,0 +1,25 @@
#!/usr/bin/env python
# coding=utf-8
import logging
from core.session import Session
from core import settings


class BaseApi:
    """
    Base class for all API classes.
    Provides session management and logging for concrete business API classes to inherit.
    """

    def __init__(self, session: Session = None):
        """
        Initialize BaseApi.
        Args:
            session: HTTP session object. If omitted, a new Session is created with default settings.
        """
        self.session = session or Session(base_url=settings.base_url)
        self.logger = logging.getLogger(self.__class__.__name__)

    def _log_action(self, method_name: str, **kwargs):
        """Uniform action logging"""
        self.logger.info(f"Action: {method_name} | Params: {kwargs}")

core/context.py Normal file
@@ -0,0 +1,46 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com
@file: context
@date: 2026/3/14 09:07
@desc:
"""
from dataclasses import dataclass
from typing import Any
from pathlib import Path
from core.exchange import Exchange
from core.session import Session
from commons.file_processors.yaml_processor import YamlProcessor


class VariableStore:
    """In-memory variable store: the single point of contact between the L2 cache and disk"""

    def __init__(self, seed_file: Path):
        """
        Initialize the variable store.
        Args:
            seed_file: Path to the initial variable file (YAML) used to load seed data.
        """
        self.seed_file = seed_file
        self.processor = YamlProcessor(seed_file)
        # Loaded exactly once at startup
        self.store: dict[str, Any] = self.processor.load() or {}

    def persist(self):
        """Flush to disk once when the test run ends"""
        self.processor.save(self.store)


@dataclass
class ExecutionEnv:
    """Environment context: holds the shared resources"""
    session: Session
    store: VariableStore
    exchanger: "Exchange"

core/creator.py Normal file
@@ -0,0 +1,200 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com
@file: creator
@date: 2026/3/6 10:40
@desc:
"""
import logging
import allure
from pathlib import Path
from dataclasses import dataclass
from conftest import execution_context
from core import settings
from core.executor import WorkflowExecutor
from pydantic import ValidationError
from commons.file_processors.yaml_processor import YamlProcessor as FileHandle, YamlLoadError
from core.models import RawSchema  # The Pydantic model defined earlier
from typing import Any, List, Type, Generator, Union

logger = logging.getLogger(__name__)


@dataclass
class CaseEntity:
    """Case execution entity: decouples model data from execution context"""
    step_data: RawSchema
    row_context: dict[str, Any]


class TestTemplateBase:
    """
    The concrete test case container.
    This class holds no logic of its own; it only carries the
    test_* methods mounted onto it by the Loader.
    """
    pass


class CaseDataLoader:
    """
    Test case loader.
    Responsibilities: scan the file system -> load YAML -> expand parametrization -> wrap into CaseEntity objects
    """

    @staticmethod
    def fetch_yaml_files(cases_dir: Union[str, Path]) -> Generator[Path, None, None]:
        """Scan the directory and yield matching file paths"""
        base_path = Path(cases_dir)
        if not base_path.exists():
            logger.error(f"📂 Test directory does not exist: {base_path}")
            return
        # Match every YAML file whose name starts with test_
        yield from base_path.rglob("test_*.yaml")

    @classmethod
    def load_cases(cls, file_path: Path) -> List[CaseEntity]:
        """
        Load a single YAML file and convert it into a list of CaseEntity objects.
        Includes automatic expansion of parametrized data.
        """
        entities: List[CaseEntity] = []
        try:
            # 1. Load the raw dict via the refactored YamlProcessor
            processor = FileHandle(file_path)
            raw_data = processor.load()
            if not raw_data:
                return []
            entities = cls._parse_parametrize(raw_data)
        except YamlLoadError:
            # YamlProcessor already logged the error; just skip this file
            pass
        except ValidationError as e:
            logger.error(f"Case schema validation failed [{file_path.name}]:\n{e.json()}")
        except Exception as e:
            logger.error(f"Unexpected error while loading cases [{file_path.name}]: {e}")
        return entities

    @staticmethod
    def _parse_parametrize(raw_data: dict[str, Any]) -> List[CaseEntity]:
        """
        Parametrization logic: expand the parametrize block in raw_data into multiple CaseEntity instances
        """
        entities = []
        parametrize_data = raw_data.pop("parametrize", None)
        # 1. Instantiate the single template object (Pydantic validation)
        template_case = RawSchema.model_validate(raw_data)
        # 2. Check whether a parametrize block exists
        if parametrize_data and isinstance(parametrize_data, list) and len(parametrize_data) >= 2:
            # 3. Expand the parametrized rows
            headers = parametrize_data[0]
            for row in parametrize_data[1:]:
                row_map = dict(zip(headers, row))
                # Wrap into an entity. model_copy() keeps cases from sharing
                # one Pydantic instance and mutating each other.
                entities.append(CaseEntity(step_data=template_case.model_copy(), row_context=row_map))
        else:
            # Plain case: empty context
            entities.append(CaseEntity(step_data=template_case.model_copy(), row_context={}))
        return entities

    @classmethod
    def get_all_cases(cls, cases_dir: Union[str, Path]) -> List[CaseEntity]:
        """
        Bulk accessor, called by CaseGenerator
        """
        all_cases = []
        for file in cls.fetch_yaml_files(cases_dir):
            all_cases.extend(cls.load_cases(file))
        return all_cases


class CaseGenerator:
    """
    Responsibility 2: case construction factory.
    Turns data into pytest-decorated methods and mounts them on the target class.
    """

    @classmethod
    def build_and_register(cls, target_cls: Type[TestTemplateBase], cases_dir: Union[str, Path]):
        """
        Build test cases and register them on the target test class.
        Walks the case files under the given directory, parses the data,
        generates test methods, and binds them to target_cls dynamically.
        Args:
            target_cls: The target test class (usually inheriting TestTemplateBase).
            cases_dir: Directory containing the test case files.
        """
        # 1. Fetch the data through the Loader
        all_cases = CaseDataLoader.get_all_cases(cases_dir)
        for index, case_info in enumerate(all_cases):
            case_title = case_info.row_context.get("title") or case_info.step_data.title
            dynamic_test_method = cls._create_case_method(title=case_title, entity=case_info)
            safe_title = "".join([c if c.isalnum() else "_" for c in case_title])[:50]
            method_name = f"test_{index:03d}_{safe_title}"
            setattr(target_cls, method_name, dynamic_test_method)
            logger.debug(f"Successfully registered: {method_name}")

    @staticmethod
    def _create_case_method(title, entity: CaseEntity):
        """
        Wrap a concrete pytest execution node.
        Returns a closure containing the full test logic (Allure setup, logging, executor call).
        Args:
            title: Test case title.
            entity: Entity holding the case data and its context.
        Returns:
            function: A method pytest can discover and execute.
        """
        case_template = entity.step_data
        context = entity.row_context

        def build_actual_case(instance: TestTemplateBase, execution_context):
            # --- 1. Set Allure report attributes dynamically ---
            allure.dynamic.epic(case_template.epic or settings.allure_epic)
            allure.dynamic.feature(case_template.feature or settings.allure_feature)
            allure.dynamic.story(case_template.story or settings.allure_story)
            allure.dynamic.title(title)
            # Logging (use instance to tag the origin)
            logger.info(f"[Runner] Class: {instance.__class__.__name__} | Case: {title}")
            try:
                WorkflowExecutor.perform(case_template, execution_context, context=context)
            except Exception as e:
                # A more detailed runtime context snapshot could be recorded here
                logger.error(f"Case failed: {title} | Error: {e}")
                raise

        return build_actual_case


if __name__ == '__main__':
    from settings import TEST_CASE_DIR
    # print(CaseDataLoader.get_all_cases(TEST_CASE_DIR))
    # --- Bootstrap ---
    CaseGenerator.build_and_register(TestTemplateBase, settings.TEST_CASE_DIR)
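The header/rows expansion performed in `_parse_parametrize` can be reproduced in isolation. A minimal sketch of the same zip-based expansion (the table values are invented):

```python
# A parametrize block as it would appear in a YAML case file:
# first row = column headers, remaining rows = data
parametrize = [
    ["title", "user", "expect"],
    ["login ok", "admin", 200],
    ["login bad", "guest", 401],
]

# Split headers from data rows, then zip each row into a context dict
headers, *rows = parametrize
contexts = [dict(zip(headers, row)) for row in rows]

print(contexts[0])  # {'title': 'login ok', 'user': 'admin', 'expect': 200}
print(contexts[1])  # {'title': 'login bad', 'user': 'guest', 'expect': 401}
```

Each resulting dict becomes one `row_context`, which later overrides global variables for that single case.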

core/exchange.py Normal file
@@ -0,0 +1,209 @@
#!/usr/bin/env python
# coding=utf-8
"""
@desc: Variable exchanger: handles substitution and extraction
"""
import logging
import re
from typing import Any, Union, TypeVar
import jsonpath
from lxml import etree
from core.models import RawSchema
from core.settings import EXTRACT_CACHE
from core.templates import Template
from commons.file_processors.yaml_processor import YamlProcessor

logger = logging.getLogger(__name__)

# Generic type used to keep the return type consistent with the input
T = TypeVar("T", bound=Union[dict, list, str, Any])


class Exchange:
    """
    The variable exchanger.
    Manages the global variable cache; its core duties are:
    1. Extract: pull variables out of response objects.
    2. Replace: substitute variable placeholders in data with their real values.
    """

    def __init__(self, variable_cache: dict[str, Any]):
        """
        Initialize the exchanger.
        Args:
            variable_cache: Initial variable cache (passed by reference; mutations affect the source).
        """
        self._cache = variable_cache
        # Matches bare variables ${var}; excludes function calls ${func()}
        self.var_only_pattern = re.compile(r"^\$\{([a-zA-Z_]\w*)}$")

    @property
    def global_vars(self) -> dict:
        """Return the current global variable cache."""
        return self._cache

    @global_vars.setter
    def global_vars(self, global_vars: dict) -> None:
        """Set the global variable cache (used for context switching, e.g. a ChainMap merge)."""
        self._cache = global_vars

    def extract(self, resp: Any, var_name: str, attr: str, expr: str, index: int = 0):
        """
        Extract data from a response and update the cache (and file)
        :param resp: Response object
        :param var_name: Variable name
        :param attr: Attribute name (json, text, headers, ...)
        :param expr: Extraction expression ($.jsonpath, //xpath, regex)
        :param index: Result index
        """
        try:
            # Special-case resp.json, which is a method on Response objects
            target_data = getattr(resp, attr, None)
            if attr == "json":
                try:
                    target_data = resp.json()
                except Exception:
                    target_data = {"msg": "not json data"}
            if target_data is None:
                logger.warning(f"Extraction failed: response has no attribute '{attr}'")
                return
            value = None
            if expr.startswith("$"):  # JSONPath
                res = jsonpath.jsonpath(target_data, expr)
                if res:
                    value = res[index]
            elif expr.startswith("/") or expr.startswith("./"):  # XPath mode
                html_content = getattr(resp, "text", "")  # getattr guards against a missing attribute
                if not html_content:
                    logger.warning("XPath extraction failed: response text is empty")
                    return
                # Parse the text into an HTML tree
                tree = etree.HTML(html_content)
                res = tree.xpath(expr)
                if res:
                    # Take the node text, or fall back to the attribute value
                    target_node = res[index]
                    value = target_node.text if hasattr(target_node, 'text') else str(target_node)
            else:  # Regular expression
                res = re.findall(expr, str(target_data))
                if res:
                    value = res[index]
            if value is None:
                logger.warning(f"Variable [{var_name}] extracted no data with expression [{expr}]")
                value = "not data"
            self._cache[var_name] = value
            logger.info(f"Variable extracted: {var_name} -> {value} (Type: {type(value).__name__})")
        except Exception as e:
            logger.error(f"Exception while extracting variable [{var_name}]: {e}", exc_info=True)

    def _smart_replace(self, content: Any) -> Any:
        """
        Recursive replacement logic:
        - A pure placeholder ${token} returns the cached value with its original type (int, dict, list, ...)
        - A mixed string or function call is rendered to a string via Template
        """
        if isinstance(content, dict):
            return {k: self._smart_replace(v) for k, v in content.items()}
        elif isinstance(content, list):
            return [self._smart_replace(i) for i in content]
        elif isinstance(content, str):
            # A. Pure variable (skip Template so the original type survives)
            #    e.g. content = "${order_id}": if order_id is int 123, return 123
            full_match = self.var_only_pattern.fullmatch(content)
            if full_match:
                var_name = full_match.group(1)
                return self._cache.get(var_name, content)
            # B. Mixed text or function call
            #    e.g. "Bearer ${token}" or "${gen_phone()}"
            if "${" in content:
                # Delegate to the Template class
                return Template(content).render(self._cache)
        return content

    def replace(self, data: T) -> T:
        """
        Generic variable replacement entry point.
        Accepts dict, list, str, or a Pydantic model (dump it first).
        """
        if not data:
            return data
        logger.debug(f"Starting variable replacement; input type: {type(data).__name__}")
        rendered_data = self._smart_replace(data)
        return rendered_data


if __name__ == "__main__":
    from core.models import RawSchema, HttpAction

    file_handler = YamlProcessor(filepath=EXTRACT_CACHE)
    variable_cache_ = file_handler.load() or {}
    ex = Exchange(variable_cache_)

    # --- Scenario 1: extraction ---
    class MockResponse:
        def __init__(self):
            self.json_data = {"data": {"token": "auth_123", "user_id": 888}}
            self.text = "<html><body><div id='name'>ChenWei</div></body></html>"

        def json(self): return self.json_data

    mock_resp = MockResponse()
    print(">>> Extracting...")
    ex.extract(mock_resp, "token", "json", "$.data.token")
    ex.extract(mock_resp, "u_id", "json", "$.data.user_id")
    ex.extract(mock_resp, "user_name", "text", "//div[@id='name']")

    # --- Scenario 2: replacement with type preservation ---
    # Define a complex case
    raw_case = {
        "title": "Sample case",
        "action": {
            "method": "POST",
            "url": "http://api.com/${token}",  # mixed text -> becomes str
            "json_body": {
                "id": "${u_id}",               # pure variable -> stays int
                "name": "${user_name}",        # pure variable -> str
                "config": "${existing_var}"    # variable from the seed file
            },
            "timeout": "${existing_var}"       # numeric string -> Pydantic converts it back to int
        }
    }
    print("\n>>> Replacing...")
    new_case_one = ex.replace(raw_case)
    print(new_case_one)
    RawSchema(**new_case_one)
    print(new_case_one.get("action"))
    action = HttpAction(**new_case_one.get("action"))
    print(action)

    # --- Check the results ---
    print("\n--- Verification ---")
    print(f"URL (mixed text): {action.url} | type: {type(action.url)}")
    print(f"ID (type preserved): {action.json_body['id']} | type: {type(action.json_body['id'])}")
    print(f"Timeout (auto-converted): {action.timeout} | type: {type(action.timeout)}")
    assert isinstance(action.json_body['id'], int)
    assert action.url == "http://api.com/auth_123"
    assert action.timeout == 100
    print("\nAll Exchange scenarios passed!")
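The type-preserving branch of `_smart_replace` can be isolated from the rest of the framework. A minimal sketch (the cache contents are invented, and a plain `str.replace` loop stands in for the real `Template` rendering):

```python
import re

# Same pattern as Exchange.var_only_pattern: a string that is exactly one placeholder
var_only_pattern = re.compile(r"^\$\{([a-zA-Z_]\w*)}$")
cache = {"order_id": 123, "token": "auth_123"}

def smart_replace(content: str):
    m = var_only_pattern.fullmatch(content)
    if m:
        # Pure placeholder: return the cached value with its original type
        return cache.get(m.group(1), content)
    # Mixed text is rendered to a string (Template does this in the framework)
    for name, value in cache.items():
        content = content.replace("${%s}" % name, str(value))
    return content

print(smart_replace("${order_id}"))      # 123  (int preserved)
print(smart_replace("Bearer ${token}"))  # Bearer auth_123  (rendered to str)
```

This is why `"${order_id}"` can land in a JSON body as the integer 123, while `"Bearer ${token}"` stays a string.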

core/executor.py Normal file
@@ -0,0 +1,149 @@
#!/usr/bin/env python
# coding=utf-8
"""
@desc: Core test case execution engine
"""
import logging
import importlib
from typing import Any, List, Optional
from collections import ChainMap
from pydantic import TypeAdapter
from core import settings
from core.context import ExecutionEnv
from core.models import RawSchema, ValidateItem, HttpAction, ApiActionModel
from core.session import Session
from core.exchange import Exchange
from utils.case_validator import CaseValidator

logger = logging.getLogger(__name__)

# A reusable adapter (avoids repeated initialization cost)
VALIDATE_LIST_ADAPTER = TypeAdapter(List[ValidateItem])


class WorkflowExecutor:
    """
    The workflow executor.
    The core engine of test execution; it drives the full lifecycle of a single case:
    1. Context preparation (variable pool merge).
    2. Action routing and execution (HTTP request or PO method via reflection).
    3. Post-processing (variable extraction and assertion checks).
    """

    @classmethod
    def perform(cls, case_info: RawSchema, env: ExecutionEnv, context: Optional[dict[str, Any]] = None) -> Any:
        """Execute a single case; supports both direct requests and PO-style calls"""
        context = context or {}
        # --- Key point 1: back up and switch the context ---
        # Save the Exchange's current global dict reference
        original_cache = env.exchanger.global_vars
        # 1. Build the prioritized variable pool (parametrized vars > globally extracted vars)
        # ChainMap is the cheapest way to implement "local overrides global"
        combined_vars = ChainMap(context, original_cache)
        # Point the Exchange's internal cache at the merged pool temporarily
        env.exchanger.global_vars = combined_vars
        resp = None  # Initialize resp so it is defined even if an exception occurs
        try:
            # 2. Update the title dynamically (if context carries a title)
            current_title = context.get("title") or case_info.title
            logger.info(f"🚀 Running case: {current_title}")
            # RawSchema.action is a plain dict, so take a shallow copy rather than model_dump
            raw_action_dict = dict(case_info.action)
            rendered_action_dict = env.exchanger.replace(raw_action_dict)
            # --- 2. Decide the execution mode ---
            if case_info.is_po_mode():
                # Re-validate to repair types (e.g. ints inside params)
                rendered_action = ApiActionModel.model_validate(rendered_action_dict)
                # PO mode: invoke via reflection
                resp = cls._execute_po_method(rendered_action, env)
            else:
                # Request mode: issue the HTTP call directly
                rendered_request = HttpAction.model_validate(rendered_action_dict)
                request_kwargs = rendered_request.model_dump(by_alias=True, exclude_none=True)
                resp = env.session.request(**request_kwargs)
            # --- 3. Post-processing: extraction and assertions ---
            cls._post_process(resp, case_info, env, original_cache)
            return resp
        except Exception as e:
            logger.error(f"Case failed: {case_info.title} | Reason: {e}", exc_info=True)
            raise
        finally:
            # Safety net: always restore the environment, even when an exception escapes
            env.exchanger.global_vars = original_cache

    @staticmethod
    def _execute_po_method(action: ApiActionModel, env: ExecutionEnv):
        """Core reflection logic: load a class under the api/ package by name and invoke its method"""
        class_name = action.module
        method_name = action.method
        params = action.params or {}
        # 1. Resolve the module path, by priority:
        # Priority 1: explicit mapping (API_MAP)
        module_name = settings.API_MAP.get(class_name)
        # Priority 2: naming convention (UserAPI -> api.user_api)
        if not module_name:
            base_name = class_name.lower().replace('api', '')
            module_name = f"{settings.API_PACKAGE}.{base_name}_api"
        try:
            # 2. Import the module dynamically (assumed to live under the api package)
            module = importlib.import_module(module_name)
            # 3. Fetch the class and instantiate it
            cls = getattr(module, class_name)
            api_instance = cls(env.session)  # Pass the session in to keep one shared connection
            # 4. Call the method and return the result
            method = getattr(api_instance, method_name)
            logger.info(f"Calling business layer: {class_name}.{method_name} params: {params}")
            return method(**params)
        except ImportError as e:
            logger.error(f"Module import failed: no file found for '{module_name}'. Check the file name or settings.API_MAP.")
            raise e
        except AttributeError as e:
            logger.error(f"Member lookup failed: module '{module_name}' has no '{class_name}.{method_name}'")
            raise e
        except Exception as e:
            logger.error(f"Reflection call failed: {class_name}.{method_name} -> {e}")
            raise

    @classmethod
    def _post_process(cls, resp: Any, case_info: RawSchema, env: ExecutionEnv, original_cache: dict):
        """
        Unified post-processing: variable extraction (writes global) and assertions (reads local + global)
        """
        # Remember the current merged context (ChainMap) for the assertion step
        combined_vars = env.exchanger.global_vars
        # 1. Variable extraction (write operation)
        if case_info.extract:
            try:
                # Must switch back to original_cache so writes persist into the global pool
                env.exchanger.global_vars = original_cache
                for var_name, extract_info in case_info.extract.items():
                    env.exchanger.extract(resp, var_name, *extract_info)
            finally:
                # Switch back to combined_vars so later steps (assertions) keep the local context
                env.exchanger.global_vars = combined_vars
        # 2. Assertions (read operation)
        if case_info.validate_data:
            raw_validate_list = [
                item.model_dump(by_alias=True) if isinstance(item, ValidateItem) else item
                for item in case_info.validate_data
            ]
            rendered_validate_list = env.exchanger.replace(raw_validate_list)
            # Run through the adapter again to repair types (str -> int)
            final_validate_data = VALIDATE_LIST_ADAPTER.validate_python(rendered_validate_list)
            CaseValidator.validate(resp, final_validate_data)
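The "parametrized overrides global, writes stay local" behaviour that `perform` relies on comes straight from `collections.ChainMap`. A standalone demonstration (variable names invented):

```python
from collections import ChainMap

global_cache = {"token": "extracted-token", "env": "prod"}
row_context = {"token": "row-specific"}  # parametrized variables for one case

# Lookups try row_context first, then fall through to global_cache
combined = ChainMap(row_context, global_cache)
print(combined["token"])  # row-specific: the parametrized value wins
print(combined["env"])    # prod: falls through to the global cache

# Writes land in the first mapping only, so the global pool stays clean
combined["new_var"] = 1
print("new_var" in global_cache)  # False
```

This is also why the executor must temporarily switch back to the original cache before extraction: writing through the ChainMap would trap extracted variables in the per-case context.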

core/models.py Normal file
@@ -0,0 +1,144 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: models.py
@date: 2024/9/15 21:14
@desc: Declares the YAML test case schema
"""
import logging
from typing import List, Any
from pydantic import BaseModel, Field, ConfigDict

logger = logging.getLogger(__name__)


class HttpAction(BaseModel):
    """
    HTTP request action model.
    Defines every parameter needed to issue an HTTP request: method, URL, headers, params, body, etc.
    """
    method: str = Field(..., description="HTTP method: get, post, etc.")
    url: str = Field(..., description="API path or full URL")
    headers: dict[str, Any] | None = Field(default=None, description="HTTP request headers")
    params: dict[str, Any] | None = Field(default=None, description="URL query parameters")
    data: dict[str, Any] | None = None
    json_body: Any | None = Field(default=None, alias="json")
    timeout: int = 10
    files: dict[str, Any] | None = None
    model_config = ConfigDict(extra="allow", populate_by_name=True)


class ApiActionModel(BaseModel):
    """
    PO (Page Object) action model.
    Describes a call to a method wrapped in an API class, executed dynamically via reflection.
    """
    module: str = Field(..., alias="class", description="Name of the API class to call")
    method: str = Field(..., description="Method name on the class")
    params: dict[str, Any] = Field(default_factory=dict, description="Arguments passed to the method")
    model_config = ConfigDict(populate_by_name=True)


class ValidateItem(BaseModel):
    """
    Assertion item model.
    Defines a post-execution check: the field to inspect, the assertion method, and the expected value.
    """
    check: str = Field(..., description="Field or expression to check")
    assert_method: str = Field(alias="assert", default="equals")
    expect: Any = Field(..., description="Expected value")
    msg: str = Field(default="Assertion", description="Assertion description")
    model_config = ConfigDict(populate_by_name=True)


class RawSchema(BaseModel):
    """
    Raw test case data model.
    Mirrors the YAML case file structure: metadata, the action definition, variable extraction, and assertion rules.
    """
    title: str = Field(..., description="Case title")
    epic: str | None = None
    feature: str | None = None
    story: str | None = None
    # A single action field carries the business logic (HTTP or PO)
    action: dict[str, Any] = Field(description="Request content or PO action content")
    extract: dict[str, List[Any]] | None = Field(
        default=None,
        description="Extraction expressions, format: {var_name: [source, expression, index]}"
    )
    validate_data: List[Any] = Field(
        default_factory=list,
        alias="validate",
        description="Assertion info"
    )
    model_config = ConfigDict(
        extra="allow",                # Allow extra fields such as parametrize
        populate_by_name=True,        # Accept either the field name or the alias (e.g. class in YAML)
        arbitrary_types_allowed=True  # Allow non-standard types (e.g. custom class instances) in the model
    )

    def is_po_mode(self) -> bool:
        """Check whether this case uses PO mode"""
        return "class" in self.action or "module" in self.action


if __name__ == '__main__':
    # Sample data 1: standard request mode
    raw_case_1 = {
        "title": "Query status info",
        "action": {
            "method": "get",
            "url": "/api/v1/info",
            "headers": {"User-Agent": "pytest-ai"},
            "json": {"User-Agent": "pytest-ai"}
        },
        "validate": [
            {"check": "status_code", "assert": "equals", "expect": 200, "msg": "status code is 200"},
            {"check": "$.msg", "expect": "Success"}
        ]
    }
    # Sample data 2: PO mode (reflection call)
    raw_case_2 = {
        "title": "User login test",
        "action": {
            "class": "UserAPI",
            "method": "login",
            "params": {"user": "admin", "pwd": "123"}
        },
        "extract": {
            "token": ["json", "$.data.token", 0]
        }
    }
    print("--- Model validation tests ---\n")
    try:
        # Validate mode 1
        case1 = RawSchema(**raw_case_1)
        print(f"✅ Mode 1 (Request) passed: {case1.title}")
        print(f"   http: {case1.action}")
        print(f"   assertion rules: {len(case1.validate_data)}\n")
        # Validate mode 2
        case2 = RawSchema(**raw_case_2)
        print(f"✅ Mode 2 (PO Mode) passed: {case2.title}")
        print(f"   api: {case2.action}")
        print(f"   extraction rules: {len(case2.extract)}\n")
        # Invalid data (a case with neither request nor api_action can be checked further at the business layer).
        # Here we demonstrate Pydantic's automatic type handling.
        invalid_data = {"title": "Broken case", "action": {"url": "/api"}}  # method is missing
        print("--- Expected-failure test ---")
        RawSchema(**invalid_data)
    except Exception as e:
        print(f"❌ Expected validation failure: \n{e}")

core/session.py
@@ -6,7 +6,7 @@
@Software: PyCharm
@contact: t6i888@163.com
@file: session.py
@date: 2024/9/12 21:56
@desc:
"""
import logging
@@ -19,22 +19,69 @@ import allure
# logger = logging.getLogger("requests.session")
logger = logging.getLogger(__name__)
# logging.basicConfig(level=logging.INFO)


class Session(requests.Session):
    """
    Custom session manager, extending requests.Session.
    Enhancements:
    1. Base URL handling: relative paths are joined onto the base URL automatically.
    2. Allure integration: each request is wrapped as an Allure step.
    3. Logging: request/response headers, bodies, and status codes are logged in detail.
    """

    def __init__(self, base_url=None):
        """
        Initialize the session.
        Args:
            base_url: Base URL used to resolve relative request paths.
        """
        super().__init__()  # Run the parent initializer first
        self.base_url = base_url  # Then the subclass-specific setup

    @allure.step("Send request")
    def request(self, method, url: str, *args, **kwargs) -> Response:
        """
        Send an HTTP request (override).
        Logic:
        1. If url is a relative path, join it onto base_url.
        2. Record an Allure step.
        Args:
            method: Request method (GET, POST, etc.)
            url: Request URL (relative paths supported)
            *args: Positional args passed through to requests.Session.request
            **kwargs: Keyword args passed through to requests.Session.request
        Returns:
            Response: The response object
        """
        if not url.startswith("http"):
            # Prepend the base URL automatically
            url = urljoin(self.base_url, url)
        return super().request(method, url, *args, **kwargs)  # Delegate to the original implementation

    def send(self, request: PreparedRequest, *args, **kwargs) -> Response:
        """
        Send the underlying PreparedRequest (override).
        Logic:
        1. Log the request details (URL, headers, body)
        2. Perform the actual network call
        3. Log the response details (status, headers, body)
        Args:
            request: The prepared request object
            *args: Passed through
            **kwargs: Passed through
        Returns:
            Response: The response object
        """
        logger.info(f"Request >>>>>> endpoint = {request.method} {request.url}")
        logger.info(f"Request >>>>>> headers = {request.headers}")
        logger.info(f"Request >>>>>> body = {request.body}")

core/settings.py Normal file
@@ -0,0 +1,64 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: settings
@date: 2025/2/23 21:34
@desc:
"""
import os
from pathlib import Path
from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parents[1]
load_dotenv()

# --- Directory configuration ---
TEST_CASE_DIR = BASE_DIR / "test_cases"
OUTPUT_DIR = BASE_DIR / "outputs"
SCREENSHOT_DIR = OUTPUT_DIR / "screenshots"
LOG_DIR = OUTPUT_DIR / "logs"
LOG_BACKUP_DIR = LOG_DIR / "backups"
ALLURE_TEMP = BASE_DIR / "temp"
REPORT_DIR = BASE_DIR / "reports"
CONFIG_DIR = BASE_DIR / "config"
DATA_DIR = BASE_DIR / "data"
# Directories that must exist at startup
REQUIRED_DIRS = [LOG_DIR, LOG_BACKUP_DIR, ALLURE_TEMP, SCREENSHOT_DIR]
# Core API package path
API_PACKAGE = "api"
LOG_SOURCE = LOG_DIR / "pytest.log"
EXTRACT_CACHE = BASE_DIR / "data/extract.yaml"
# Optional explicit mapping (class name -> full module path) for irregular file names
API_MAP = {
    "UserAPI": "api.business.user",
    "OrderAPI": "api.v2.order_manager"
}
allure_epic: str = "Project: answer"
allure_feature: str = "Default feature"
allure_story: str = "Default story"
test_suffix = "yaml"
base_url = os.getenv("BASE_URL")
rsa_public = ""
rsa_private = ""

if __name__ == '__main__':
    print(BASE_DIR)

core/templates.py Normal file
@@ -0,0 +1,180 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: templates.py
@date: 2024 2024/9/22 22:20
@desc:
"""
import copy
import logging
import re
import string
import ast
from typing import List, Any
from commons.funcs import Funcs
logger = logging.getLogger(__name__)
class Template(string.Template):
"""
增强型模板引擎:
1. 兼容标准变量替换 ${var}
2. 支持带参数的函数调用 ${func(arg1, arg2)}
3. 支持变量嵌套作为函数参数 ${func(${var})}
"""
# call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
# call_pattern = re.compile(r"\$\{(?P<func_name>[a-zA-Z_]\w*)\((?P<func_args>.*)\)}")
# 匹配函数调用结构:${函数名(参数)}
# 分组func_name (字母下划线开头), func_args (括号内的所有内容)
call_pattern = re.compile(r"\$\{(?P<func_name>[a-zA-Z_]\w*)\((?P<func_args>.*)\)}")
def render(self, mapping: dict) -> str:
"""
渲染入口
:param mapping: 变量缓存(来自 Exchange._variable_cache
:return: 渲染后的字符串
"""
# 1. 第一步:利用原生 string.Template 替换基础变量
# 这一步会将参数中的 ${var} 预先替换为实际值,从而支持函数嵌套调用
s = self.safe_substitute(mapping) # 原有方法替换变量
# 2. 第二步:解析并执行函数调用
s = self.safe_substitute_funcs(s, mapping)
return s
@staticmethod
def _parse_args(args_str: str, mapping: dict) -> List[Any]:
"""
核心优化:安全拆分函数参数
利用正则预读,跳过引号内的逗号,解决 ${func('a,b', 123)} 的分割问题
"""
args_str = args_str.strip()
if not args_str:
return []
# 正则解析说明:匹配逗号,但该逗号后面必须有偶数个引号(说明逗号不在引号内)
raw_args = re.split(r',(?=(?:[^\'"]*[\'"][^\'"]*[\'"])*[^\'"]*$)', args_str)
processed_args = []
for arg in raw_args:
arg = arg.strip()
# 1. 处理带引号的字符串参数
if (arg.startswith("'") and arg.endswith("'")) or (arg.startswith('"') and arg.endswith('"')):
processed_args.append(arg[1:-1])
# 2. 处理数字类型
elif arg.isdigit():
processed_args.append(int(arg))
# 3. 处理布尔值
elif arg.lower() == "true":
processed_args.append(True)
elif arg.lower() == "false":
processed_args.append(False)
# 4. 如果在 mapping 中能找到(针对未经过第一步替换的情况),取其值
elif arg in mapping:
processed_args.append(mapping[arg])
# 5. 其他情况按原样字符串处理
else:
processed_args.append(arg)
return processed_args
def safe_substitute_funcs(self, template: str, mapping: dict) -> str:
"""
解析字符串中的函数名和参数,并将函数调用结果进行替换
:param template: 字符串
:param mapping: 上下文,提供要使用的函数和变量
:return: 替换后的结果
"""
# 合并函数映射和变量映射,作为统一上下文
# 使用解构赋值替代 deepcopy提升性能
logger.info(f"mapping更新前: {mapping}")
render_context = {**Funcs.FUNC_MAPPING, **mapping}
logger.info(f"mapping更新后: {render_context}")
# mapping = copy.deepcopy(mapping)
# logger.info(f"mapping更新前: {mapping}")
# mapping.update(self.FUNC_MAPPING) # 合并两个mapping
# mapping.update(Funcs.FUNC_MAPPING) # 合并两个mapping
# logger.info(f"mapping更新后: {mapping}")
def convert(mo):
func_name = mo.group("func_name")
# func_args = mo.group("func_args").split(",")
func_args_str = mo.group("func_args")
func = render_context.get(func_name) # 读取指定函数
if not callable(func):
logger.warning(f"模板中的函数 '{func_name}' 未定义或不可调用")
return mo.group()
# 解析参数列表
args = self._parse_args(func_args_str, render_context)
try:
# 执行函数并强制转为字符串返回,以便 re.sub 替换
result = func(*args)
return str(result)
except Exception as e:
logger.error(f"执行函数 ${{{func_name}(...)}} 报错: {e}", exc_info=True)
return mo.group()
return self.call_pattern.sub(convert, template)
if __name__ == '__main__':
# 模拟 Funcs.FUNC_MAPPING
def mock_concat(a, b):
return f"{a}_{b}"
def mock_get_now():
return "2026-03-09"
def mock_add(x, y):
return x + y
# 注入模拟函数
Funcs.FUNC_MAPPING = {
"concat": mock_concat,
"now": mock_get_now,
"add": mock_add
}
# 模拟变量缓存
test_mapping = {
"env": "prod",
"num1": 10,
"num2": 20
}
test_cases = [
("场景A标准变量", "Current env is ${env}", "Current env is prod"),
("场景B无参数函数", "Date: ${now()}", "Date: 2026-03-09"),
("场景C带参数函数(含逗号)", "Res: ${concat('hello,world', 'test')}", "Res: hello,world_test"),
("场景D变量嵌套函数参数", "Sum: ${add(${num1}, ${num2})}", "Sum: 30"),
("场景E混合模式", "URL: /${env}/api/${now()}", "URL: /prod/api/2026-03-09"),
("场景F参数类型自动识别", "Value: ${add(5, 5)}", "Value: 10"), # 5应该被识别为int
]
print(f"{'测试场景':<25} | {'预期结果':<30} | {'实际结果'}")
print("-" * 80)
for scene, tpl_str, expected in test_cases:
actual = Template(tpl_str).render(test_mapping)
status = "✓" if str(actual) == str(expected) else "✗"
print(f"{scene:<25} | {expected:<30} | {actual} {status}")
# 特殊验证:嵌套失败回退
print("\n>>> 验证未定义函数回退:")
error_tpl = "Check: ${undefined_func()}"
print(f"结果: {Template(error_tpl).render(test_mapping)}")

data/extract.yaml Normal file

@@ -0,0 +1 @@
existing_var: '100'

docs/README.md Normal file

@@ -0,0 +1,56 @@
# Project Structure Documentation
This document outlines the recommended structure for the Interface Automation Test project. A well-organized structure promotes maintainability, scalability, and collaboration.
## Directory Structure
Here is the proposed optimized directory structure:
```
/
|-- core/ # Main source code
| |-- api.py
| |-- main.py
| |-- luffy.py
| +-- ...
|
|-- tests/ # Test cases
| |-- a_test_case.py
| +-- ...
|
|-- config/ # Configuration files
| |-- id.yaml
| |-- extract.yaml
| +-- ...
|
|-- utils/ # Utility modules
|
|-- docs/ # Project documentation
| +-- README.md
|
|-- .gitignore # Git ignore file
|-- pytest.ini # Pytest configuration
|-- pyproject.toml # Python project configuration
|-- README.md # Main project README
```
## Description of Directories
* **`core/`**: This directory contains the core application logic for the interface tests. Files such as `api.py`, `main.py`, and `luffy.py`, which handle the main business logic, should reside here.
* **`tests/`**: This directory is for all the automated tests. Each test file should ideally correspond to a module or a feature.
* **`config/`**: This directory should store all configuration files, such as `id.yaml` and `extract.yaml`. This separation makes it easier to manage different environments (e.g., development, staging, production).
* **`utils/`**: This directory holds common utility functions and helper scripts that can be used across different parts of the project.
* **`docs/`**: This directory contains all project-related documentation, including this structure guide.
## Benefits of this Structure
* **Clarity**: A clear separation of concerns makes it easy to find code.
* **Maintainability**: Easier to maintain and refactor code without affecting other parts of the system.
* **Scalability**: The structure can easily scale as the project grows in complexity.
* **Collaboration**: New developers can quickly understand the project layout and start contributing.
We recommend moving the existing files into this new structure to improve the overall quality of the project.

docs/架构改进.md Normal file

@@ -0,0 +1,80 @@
# Architecture Improvement Proposals for the Automation Test Framework

Based on an in-depth analysis of the current `InterfaceAutoTest` codebase, this document collects architecture improvement proposals aimed at framework stability, extensibility, and ease of use.
## 1. Concurrency Support

### Current Issue
`VariableStore` currently uses plain file reads/writes (`extract.yaml`) to store global variables.
- When tests run in parallel with `pytest-xdist`, each worker process loads an independent in-memory copy of the variables.
- When the workers write back to the file at the end of a run, they overwrite one another, so extracted variables get lost or the data ends up inconsistent.

### Proposed Improvements
1. **Introduce a distributed cache (recommended)**:
   - Use **Redis** as the variable-storage backend.
   - Redis natively supports atomic operations and concurrent reads/writes, which cleanly solves cross-process data sharing.
2. **File locking (lightweight)**:
   - If Redis is not an option, wrap the read/write operations of `VariableStore` in a **file lock** (e.g. with the `filelock` library).
   - This reduces concurrency throughput but guarantees data consistency.
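The file-lock option can be sketched without any third-party dependency by using an atomically created lock file (`os.O_CREAT | os.O_EXCL`); the `filelock` package wraps the same idea more robustly. The store path and the `set_variable`/`get_variable` API below are illustrative, not the actual `VariableStore` interface:

```python
import os
import json
import time
import tempfile
from pathlib import Path
from contextlib import contextmanager

# Illustrative store location; the real framework uses data/extract.yaml.
STORE = Path(tempfile.gettempdir()) / "extract_demo.json"
LOCK_PATH = STORE.with_suffix(".lock")
# Clean slate for the demo (a real store would persist across runs).
STORE.unlink(missing_ok=True)
LOCK_PATH.unlink(missing_ok=True)

@contextmanager
def file_lock(lock_path: Path, timeout: float = 5.0):
    """Spin until the lock file is exclusively created; O_CREAT|O_EXCL is atomic."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() > deadline:
                raise TimeoutError(f"could not acquire {lock_path}")
            time.sleep(0.01)
    try:
        yield
    finally:
        os.close(fd)
        os.unlink(lock_path)

def set_variable(key, value):
    # The lock serializes the whole read-modify-write cycle across xdist workers.
    with file_lock(LOCK_PATH):
        data = json.loads(STORE.read_text()) if STORE.exists() else {}
        data[key] = value
        STORE.write_text(json.dumps(data))

def get_variable(key, default=None):
    with file_lock(LOCK_PATH):
        if not STORE.exists():
            return default
        return json.loads(STORE.read_text()).get(key, default)
```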
## 2. Configuration Management

### Current Issue
`settings.py` contains a large amount of hard-coded configuration (API mappings, log paths, etc.), and there is no support for dynamically switching between environments (Dev/Test/Prod).

### Proposed Improvements
1. **Per-environment config files**:
   - Create a `config/` directory and split configuration into `base_config.yaml`, `dev.yaml`, `prod.yaml`.
   - At runtime, select the environment via an environment variable (`ENV=prod`), load the matching file, and merge it over the base config.
2. **Environment-variable integration**:
   - Manage secrets and base paths in a `.env` file.
   - Load it at project startup with `python-dotenv`.
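A minimal sketch of the merge logic, assuming PyYAML (already a project dependency) and the file names proposed above; `load_config` and its flat, last-wins merge are illustrative:

```python
import os

import yaml  # PyYAML, already listed in the project dependencies

def load_config(config_dir="config"):
    """Merge base_config.yaml with the file selected by ENV (ENV=prod -> prod.yaml)."""
    env = os.getenv("ENV", "dev")
    merged = {}
    for name in ("base_config.yaml", f"{env}.yaml"):
        path = os.path.join(config_dir, name)
        if os.path.exists(path):
            with open(path, encoding="utf-8") as f:
                # later files override earlier keys, so the env file wins over base
                merged.update(yaml.safe_load(f) or {})
    return merged
```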
## 3. Extensibility & Hooks

### Current Issue
The execution pipeline of `WorkflowExecutor` (prepare -> request -> post-process) is fixed. Inserting custom logic — such as request signing/encryption or complex response decryption — is currently difficult.

### Proposed Improvement
Introduce a **hook** mechanism in the executor that allows callbacks to be registered:
- `before_request(request_data)`: called before the request is sent; used to modify headers or compute signatures.
- `after_response(response)`: called after the response arrives; used for global decryption or unified error-code checks.
- `before_case(context)` / `after_case(result)`: case-level setup/teardown.
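One possible shape for such a registry (all names here are a sketch, not the existing `WorkflowExecutor` API):

```python
class Hooks:
    """A tiny hook registry: callbacks per event, fired in registration order."""

    def __init__(self):
        self._hooks = {"before_request": [], "after_response": [],
                       "before_case": [], "after_case": []}

    def register(self, event):
        def decorator(fn):
            self._hooks[event].append(fn)
            return fn
        return decorator

    def fire(self, event, payload):
        # A hook may return a replacement payload (e.g. signed request data);
        # returning None keeps the current payload unchanged.
        for fn in self._hooks[event]:
            result = fn(payload)
            if result is not None:
                payload = result
        return payload

hooks = Hooks()

@hooks.register("before_request")
def sign_request(request_data):
    # Hypothetical signing hook: attach a signature header before sending.
    request_data.setdefault("headers", {})["X-Sign"] = "demo-signature"
    return request_data
```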
## 4. Security

### Current Issue
Sensitive data (passwords, secret keys) may be written in plain text in the YAML cases.

### Proposed Improvement
Extend the variable-substitution logic of the `Exchange` class to read from environment variables:
- **Syntax example**: `password: ${ENV:DB_PASSWORD}`
- The value is read from the system environment at runtime, so it never gets committed to the repository.
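A sketch of the proposed `${ENV:VAR}` syntax as a plain regex substitution pass (the pattern and function name are illustrative; in the framework this would live inside `Exchange`):

```python
import os
import re

# Matches ${ENV:VAR_NAME} where VAR_NAME is a valid environment-variable identifier.
ENV_PATTERN = re.compile(r"\$\{ENV:(?P<name>[A-Za-z_][A-Za-z0-9_]*)\}")

def resolve_env_placeholders(text: str) -> str:
    """Replace every ${ENV:VAR_NAME} with the value of that environment variable."""
    def convert(mo):
        value = os.getenv(mo.group("name"))
        if value is None:
            # Leave the placeholder intact so a missing variable is visible in logs.
            return mo.group()
        return value
    return ENV_PATTERN.sub(convert, text)
```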
## 5. Observability

### Current Issue
Although the `Session` class logs requests, under high concurrency or large log volumes it is hard to stitch together the full execution chain of a single case.

### Proposed Improvements
1. **End-to-end trace ID**:
   - Generate a unique `trace_id` when a case starts executing.
   - Inject it into the logging `extra` context so it appears on every log line.
   - Also add the `trace_id` to the HTTP request headers (e.g. `X-Trace-Id`) so the server side can correlate.
2. **Structured logging**:
   - Consider emitting logs as JSON for ingestion into ELK or similar log-analysis systems.
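The trace-ID idea can be sketched with a `contextvars` variable plus a logging filter; the pytest fixture that would set the trace at case start is omitted, and all names are illustrative:

```python
import logging
import uuid
from contextvars import ContextVar

# Per-case trace id; a conftest fixture would set this when each case starts.
trace_id_var = ContextVar("trace_id", default="-")

class TraceIdFilter(logging.Filter):
    def filter(self, record):
        # Exposed to formatters as %(trace_id)s on every log line.
        record.trace_id = trace_id_var.get()
        return True

def new_trace(headers: dict) -> dict:
    """Start a new trace and inject it into the outgoing request headers."""
    tid = uuid.uuid4().hex
    trace_id_var.set(tid)
    headers = dict(headers)  # do not mutate the caller's dict
    headers["X-Trace-Id"] = tid
    return headers
```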
## 6. Code Robustness

### Fix Recommendations
- **Attribute consistency**: review the PO-pattern reflection logic in `core/executor.py` and make sure attribute access matches the definitions in `core/models.py`.
  - `ApiActionModel` defines `module` (alias = `class`).
  - Ensure the executor uses `action.module` rather than `action.api_class`, to avoid an `AttributeError`.
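The alias behaviour can be reproduced with a minimal Pydantic v2 model (a hypothetical reconstruction — `core/models.py` itself is not shown in this diff):

```python
from pydantic import BaseModel, ConfigDict, Field

class ApiActionModel(BaseModel):
    """YAML key `class` maps onto the Python-safe attribute `module`."""
    model_config = ConfigDict(populate_by_name=True)
    module: str = Field(alias="class")
    method: str

# YAML data uses the alias key "class"; the attribute is accessed as .module.
action = ApiActionModel.model_validate({"class": "UserAPI",
                                        "method": "get_connector_info"})
```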
---

**Suggested implementation roadmap**:
1. Fix the robustness issue (attribute consistency) first.
2. Implement the configuration-management improvements to enable environment isolation.
3. Introduce Redis or file locking to solve the concurrency problem.
4. Incrementally add hooks and trace IDs.

docs/重构总结.md Normal file

@@ -0,0 +1,22 @@
# Refactor Summary: Upgrading to a "Model-Driven + Hybrid-Mode" Test Framework

The goal of this refactor was to upgrade the framework from loose dict-based operations to a strictly structured, easily extensible modern test framework. It rests on four pillars:

## 1. Core Driver: the Pydantic Model Layer

- **Goal**: replace fragile dict manipulation with strongly typed, validated model objects.
- **Implementation**: create `commons/models/case_model.py` and define the `CaseInfo` class.
- **Key benefits**:
  - **Robustness**: before execution, model instantiation strictly validates the fields, types, and structure in the YAML file, surfacing typos and format errors early.
  - **Maintainability**: code no longer contains "magic strings" like `case.get("request")`; attribute access such as `case.request` gives IDE autocompletion and is clearer and safer.
  - **Flexibility**: `alias` support decouples YAML field names (e.g. `validate`) from model attribute names (e.g. `validate_data`), keeping the model design Pythonic.

## 2. Execution Modes: Hybrid Mode

- **Goal**: let the framework serve both simple data-driven tests and complex business-flow tests.
- **Implementation**:
  - **YAML-driven mode**: keep and refine the `TestAPI` class, which scans `test_*.yaml` files under `tests/features/` and dynamically generates pytest cases. Ideal for single-endpoint, multi-scenario data validation.
  - **Manual-script mode**: allow hand-written `test_*.py` scripts under `tests/flows/`. Developers orchestrate complex, multi-endpoint business flows by importing business methods, just like ordinary pytest cases.

## 3. Architecture: a Clean Three-Layer Separation

- **Goal**: follow best practice, separate concerns, and keep the framework structure clear.
- **Implementation**:
  - **Data layer (YAML + Pydantic model)**: defines the test inputs and expected results (the *what*).
  - **Business/service layer (`api/*.py`)**: wraps raw HTTP requests into business-meaningful methods such as `api.auth.login()`; it defines how concrete business operations are performed (the *how*).
  - **Test layer (`TestAPI` or `test_*.py`)**: acts as the "director", fetching data from the data layer, calling business-layer methods, and performing the final assertions (the *verify*).

## 4. Context and State: a Unified Session and Variable Pool

- **Goal**: break down the data barrier between YAML-driven and manually scripted tests to enable true end-to-end flows.
- **Implementation**:
  - All tests, regardless of origin, share one `core.session.Session` instance, preserving cookies, headers, and other session state.
  - All tests share one `commons.exchange.Exchange` instance (the variable exchanger).
- **Key benefit**: a token obtained by a login in a `.py` script can be seamlessly injected into later YAML cases; conversely, an ID extracted by a YAML case can be used by a later `.py` script.

## Standard Execution Flow After the Refactor (YAML example)

1. **Load**: `TestAPI` scans and loads the `test_*.yaml` files.
2. **Data-drive**: `DataDriver` parses the YAML content into multiple independent, parameterized test cases.
3. **Execute**, inside the pytest `test_func`:
   a. **Variable substitution**: `exchanger.replace()` replaces `${variable}` placeholders in the case with real values.
   b. **Model validation**: `CaseInfo(**replaced_case_data)` instantiates the substituted dict as a `CaseInfo` model, validating the data. (This is the key difference from the old flow.)
   c. **Request**: send the request from the model's data: `session.request(**case.request.model_dump())`.
   d. **Variable extraction**: `exchanger.extract()` pulls data out of the response into the global variable pool.
   e. **Assertion**: `validator.assert_all(case.validate_data)` checks the assertions defined in the model.


@@ -1,7 +0,0 @@
name: 张三
age: '18'
data:
- 3
- 4
- 5
aaa: null


@@ -1,54 +0,0 @@
03/03/2025 05:34:28 PM [commons.cases] INFO cases.find_yaml_case:45 - 加载文件D:\CNWei\CNW\InterfaceAutoTest\TestCases\answer\test_1_status.yaml
03/03/2025 05:34:28 PM [commons.cases] INFO cases.find_yaml_case:50 - case_info=title: 查询状态信息
request:
method: get
url: /answer/api/v1/connector/info
headers:
Host: 119.91.19.171:40065
Accept-Language: en_US
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,
like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
Referer: http://119.91.19.171:40065/users/login
Accept-Encoding: gzip, deflate
extract:
msg:
- json
- $.msg
- 0
validate:
equals:
状态码等于200:
- Success.
- ${msg}
parametrize: []
epic: 项目名称answer
feature: 页面状态
story: 状态
03/03/2025 05:34:28 PM [commons.models] INFO models.ddt:81 - 1执行这一步
03/03/2025 05:34:28 PM [commons.cases] INFO cases.new_case:63 - ddt_title=['查询状态信息']
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_runtest_setup:122 - -----------------Start: main.py::TestAPI::test_1_status[查询状态信息]-----------------
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:71 - =================================用例开始执行:查询状态信息==================================
03/03/2025 05:34:28 PM [commons.exchange] INFO exchange.replace:64 - CaseInfo(title='查询状态信息', request={'method': 'get', 'url': '/answer/api/v1/connector/info', 'headers': {'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Referer': 'http://119.91.19.171:40065/users/login', 'Accept-Encoding': 'gzip, deflate'}}, extract={'msg': ['json', '$.msg', 0]}, validate={'equals': {'状态码等于200': ['Success.', '${msg}']}}, parametrize=[], epic='项目名称answer', feature='页面状态', story='状态')
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:51 - mapping更新前: {'msg': 'Success.', 'id': 12}
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:54 - mapping更新后: {'msg': 'Success.', 'id': 12, 'int': <class 'int'>, 'float': <class 'float'>, 'bool': <class 'bool'>, 'url_unquote': <function url_unquote at 0x00000299E6AAC0D0>, 'str': <function to_string at 0x00000299E6AAC160>, 'time_str': <function time_str at 0x00000299E6AAC1F0>, 'add': <function add at 0x00000299E6AAC280>, 'sql': <function sql at 0x00000299E6AAC310>, 'new_id': <function new_id at 0x00000299E6AAC3A0>, 'last_id': <function last_id at 0x00000299E6AAC430>, 'md5': <function md5 at 0x00000299E6AAC4C0>, 'base64_encode': <function base64_encode at 0x00000299E6AAC550>, 'base64_decode': <function base64_decode at 0x00000299E6AAC5E0>, 'rsa_encode': <function rsa_encode at 0x00000299E6AAC670>, 'rsa_decode': <function rsa_decode at 0x00000299E6AAC700>}
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:75 - 1正在注入变量...
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:78 - 2正在请求接口...
03/03/2025 05:34:28 PM [requests.session] INFO session.send:36 - 发送请求>>>>>> 接口地址 = GET http://119.91.19.171:40065/answer/api/v1/connector/info
03/03/2025 05:34:28 PM [requests.session] INFO session.send:37 - 发送请求>>>>>> 请求头 = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': 'application/json, text/plain, */*', 'Connection': 'keep-alive', 'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Referer': 'http://119.91.19.171:40065/users/login'}
03/03/2025 05:34:28 PM [requests.session] INFO session.send:38 - 发送请求>>>>>> 请求正文 = None
03/03/2025 05:34:28 PM [requests.session] INFO session.send:42 - 接收响应 <<<<<< 状态码 = 200
03/03/2025 05:34:28 PM [requests.session] INFO session.send:43 - 接收响应 <<<<<< 响应头 = {'Content-Type': 'application/json; charset=utf-8', 'Date': 'Mon, 03 Mar 2025 09:34:29 GMT', 'Content-Length': '63'}
03/03/2025 05:34:28 PM [requests.session] INFO session.send:44 - 接收响应 <<<<<< 响应正文 = {'code': 200, 'reason': 'base.success', 'msg': 'Success.', 'data': []}
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:81 - 3正在提取变量...
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:87 - 4正在断言...
03/03/2025 05:34:28 PM [commons.exchange] INFO exchange.replace:64 - CaseInfo(title='查询状态信息', request={'method': 'get', 'url': '/answer/api/v1/connector/info', 'headers': {'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Referer': 'http://119.91.19.171:40065/users/login', 'Accept-Encoding': 'gzip, deflate'}}, extract={'msg': ['json', '$.msg', 0]}, validate={'equals': {'状态码等于200': ['Success.', '${msg}']}}, parametrize=[], epic='项目名称answer', feature='页面状态', story='状态')
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:51 - mapping更新前: {'msg': 'Success.', 'id': 12}
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:54 - mapping更新后: {'msg': 'Success.', 'id': 12, 'int': <class 'int'>, 'float': <class 'float'>, 'bool': <class 'bool'>, 'url_unquote': <function url_unquote at 0x00000299E6AAC0D0>, 'str': <function to_string at 0x00000299E6AAC160>, 'time_str': <function time_str at 0x00000299E6AAC1F0>, 'add': <function add at 0x00000299E6AAC280>, 'sql': <function sql at 0x00000299E6AAC310>, 'new_id': <function new_id at 0x00000299E6AAC3A0>, 'last_id': <function last_id at 0x00000299E6AAC430>, 'md5': <function md5 at 0x00000299E6AAC4C0>, 'base64_encode': <function base64_encode at 0x00000299E6AAC550>, 'base64_decode': <function base64_decode at 0x00000299E6AAC5E0>, 'rsa_encode': <function rsa_encode at 0x00000299E6AAC670>, 'rsa_decode': <function rsa_decode at 0x00000299E6AAC700>}
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.assert_all:32 - 键equals{'状态码等于200': ['Success.', 'Success.']}
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.assert_all:34 - 获取到的断言:<function validate_equals at 0x00000299E6AAC940>
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.validate_equals:43 - assert Success. == Success., 状态码等于200执行这段代码
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:92 - =================================用例执行结束:查询状态信息==================================
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_result_log:190 - test status is PASSED (main.py::TestAPI::test_1_status[查询状态信息]):
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_runtest_teardown:128 - ------------------End: main.py::TestAPI::test_1_status[查询状态信息]------------------


@@ -1,64 +0,0 @@
import os
import time
from logging.handlers import TimedRotatingFileHandler
class LufffyTimedRotatingFileHandler(TimedRotatingFileHandler):
def doRollover(self):
"""
do a rollover; in this case, a date/time stamp is appended to the filename
when the rollover happens. However, you want the file to be named for the
start of the interval, not the current time. If there is a backup count,
then we have to get a list of matching filenames, sort them and remove
the one with the oldest suffix.
"""
if self.stream:
self.stream.close()
self.stream = None
# get the time that this sequence started at and make it a TimeTuple
currentTime = int(time.time())
dstNow = time.localtime(currentTime)[-1]
t = self.rolloverAt - self.interval
if self.utc:
timeTuple = time.gmtime(t)
else:
timeTuple = time.localtime(t)
dstThen = timeTuple[-1]
if dstNow != dstThen:
if dstNow:
addend = 3600
else:
addend = -3600
timeTuple = time.localtime(t + addend)
"""
dfn = self.rotation_filename(self.baseFilename + "." +
time.strftime(self.suffix, timeTuple))
if os.path.exists(dfn):
os.remove(dfn)
self.rotate(self.baseFilename, dfn)
"""
# 多进程会导致误删日志,将上面代码重写为如下代码(判断如果不存在则重命名)
# 注意:如果改写的代码会影响其他模块则不能采用该方法
dfn = self.rotation_filename(self.baseFilename + "." +
time.strftime(self.suffix, timeTuple))
if not os.path.exists(dfn):
self.rotate(self.baseFilename, dfn)
if self.backupCount > 0:
for s in self.getFilesToDelete():
os.remove(s)
if not self.delay:
self.stream = self._open()
newRolloverAt = self.computeRollover(currentTime)
while newRolloverAt <= currentTime:
newRolloverAt = newRolloverAt + self.interval
#If DST changes and midnight or weekly rollover, adjust for this.
if (self.when == 'MIDNIGHT' or self.when.startswith('W')) and not self.utc:
dstAtRollover = time.localtime(newRolloverAt)[-1]
if dstNow != dstAtRollover:
if not dstNow: # DST kicks in before next rollover, so we need to deduct an hour
addend = -3600
else: # DST bows out before next rollover, so we need to add an hour
addend = 3600
newRolloverAt += addend
self.rolloverAt = newRolloverAt

main.py

@@ -1,19 +1,90 @@
-import os
 import shutil
 import datetime
+from pathlib import Path
 import pytest
-from commons.cases import TestAPI
+from core.settings import LOG_SOURCE, LOG_BACKUP_DIR, ALLURE_TEMP
+# from core.enums import AppPlatform
+from utils.dirs_manager import ensure_dirs_ok
+from utils.report_handler import generate_allure_report
-TestAPI.find_test_cases()  # 加载yaml文件
-if __name__ == '__main__':
-    now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
-    # 1、启动框架生成临时文件
-    # -x表示有一个用例失败后面将不执行;-v表示展示用例名称;-c,配置文件所在目录,指定pytest.ini路径;--alluredir=temp,指定数据生成目录
-    pytest.main([__file__, "-x", "-v", "--alluredir=temp"])
-    # 2、生成HTML报告
-    os.system('allure generate temp -o report --clean')  # java程序只能借助操作系统执行
-    # 3、备份日志
-    # shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log")
+# netstat -ano | findstr :4723
+# taskkill /PID 12345 /F
+def _archive_logs():
+    """
+    在测试开始前,归档上一次运行的日志文件。
+    此时没有任何句柄占用,move 操作是 100% 安全的。
+    """
+    # 备份日志 (无论测试是否崩溃都执行)
+    if LOG_SOURCE.exists() and LOG_SOURCE.stat().st_size > 0:
+        now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
+        backup_path = LOG_BACKUP_DIR / f"pytest_{now}.log"
+        try:
+            # 移动并重命名
+            shutil.move(str(LOG_SOURCE), str(backup_path))
+            print(f"已自动归档上次运行的日志: {backup_path}")
+            _clean_old_logs(LOG_BACKUP_DIR)
+        except Exception as e:
+            print(f"归档旧日志失败 (可能被外部编辑器打开): {e}")
+    else:
+        print("未找到原始日志文件,跳过备份。")
+
+
+# 日志清理
+def _clean_old_logs(backup_dir, keep_count=10):
+    files = sorted(Path(backup_dir).glob("pytest_*.log"), key=lambda p: p.stat().st_mtime)
+    while len(files) > keep_count:
+        file_to_remove = files.pop(0)
+        try:
+            file_to_remove.unlink(missing_ok=True)
+        except OSError as e:
+            print(f"清理旧日志失败 {file_to_remove}: {e}")
+
+
+def _clean_temp_dirs():
+    """
+    可选:如果你想在测试前清理掉旧的临时文件
+    """
+    if ALLURE_TEMP.exists():
+        # 加上 ignore_errors 是为了防止文件被占用导致整个测试无法启动
+        shutil.rmtree(ALLURE_TEMP, ignore_errors=True)
+    ALLURE_TEMP.mkdir(parents=True, exist_ok=True)
+
+
+def main():
+    try:
+        # 1. 创建目录
+        ensure_dirs_ok()
+        # 2. 处理日志
+        _archive_logs()
+        # 3. 执行 Pytest
+        args = [
+            "test_cases",
+            "-x",  # 注意:-x 表示遇到错误立即停止,如果是全量回归建议去掉 -x
+            "-v",
+            f"--alluredir={ALLURE_TEMP}",
+            # f"--platform={AppPlatform.ANDROID.value}",
+            # "--caps_name=wan_android"
+        ]
+        pytest.main(args)
+        # 4. 生成报告
+        generate_allure_report()
+    except Exception as e:
+        print(f"自动化测试执行过程中发生异常: {e}")
+    finally:
+        print("Time-of-check to Time-of-use")
+
+
+if __name__ == "__main__":
+    main()


@@ -1,28 +1,24 @@
-[tool.poetry]
+[project]
 name = "interfaceautotest"
 version = "0.1.0"
 description = ""
-authors = ["NianJiu <t6i888@163.com>"]
 readme = "README.md"
+requires-python = ">=3.11"
-[tool.poetry.dependencies]
-python = "^3.10"
-requests = "^2.32.3"
-pyyaml = "^6.0.2"
-pytest = "^8.3.3"
-jsonpath = "^0.82.2"
-pymysql = "^1.1.1"
-pytest-result-log = "^1.2.2"
-allure-pytest = "^2.13.5"
-cryptography = "^44.0.2"
+dependencies = [
+    "requests>=2.32.3",
+    "pyyaml>=6.0.1",
+    "pytest>=8.3.3",
+    "jsonpath>=0.82.2",
+    "pymysql>=1.1.1",
+    "pytest-result-log>=1.2.2",
+    "allure-pytest>=2.13.5",
+    "cryptography>=44.0.2",
+    "python-dotenv>=0.9.9",
+    "pydantic>=2.12.5",
+    "lxml>=6.0.2",
+]
-[build-system]
-requires = ["poetry-core"]
-build-backend = "poetry.core.masonry.api"
-[[tool.poetry.source]]
-name = "tsinghua"
+[[tool.uv.index]]
 url = "https://pypi.tuna.tsinghua.edu.cn/simple"
-priority = "primary"
+default = true


@@ -1,10 +1,22 @@
 [pytest]
-addopts = -q --show-capture=no
+addopts = -q --show-capture=no --reruns 2 --reruns-delay 1
+# 1. 开启实时控制台日志
+log_cli = True
+log_cli_level = INFO
+log_cli_format = %(asctime)s %(levelname)-5s [%(name)s] - %(message)s
+log_cli_date_format = %H:%M:%S
+# 2. 开启日志文件记录
-log_file = logs/pytest.log
+log_file = outputs/logs/pytest.log
-log_file_level = info
+log_file_level = INFO
-log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s
+log_file_format = %(asctime)s %(levelname)-5s [%(name)s] %(module)s.%(funcName)s:%(lineno)d - %(message)s
-log_file_date_format = %m/%d/%Y %I:%M:%S %p
+log_file_date_format = %Y-%m-%d %H:%M:%S
+# 3. 基础配置
+# 解决中文测试用例显示为乱码(Unicode)的问题
-disable_test_id_escaping_and_forfeit_all_rights_to_community_support = true
+disable_test_id_escaping_and_forfeit_all_rights_to_community_support = True
+# 限制 Pytest 搜索范围,提升启动速度
+testpaths = test_cases
+python_files = test_*.py


@@ -0,0 +1,42 @@
feature: 页面状态
story: 状态
title: 查询状态信息
epic: 的点点滴滴
action:
method: get
url: /answer/api/v1/connector/info
headers:
Host: 119.91.19.171:40065
Accept-Language: en_US
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
Referer: http://119.91.19.171:40065/users/login
Accept-Encoding: gzip, deflate
json_body:
username: "${username}"
password: "${password}"
extract: # 提取变量
msg:
- "json"
- "$.msg"
- 0
validate:
- check: status_code
assert: ${status_assert} # <--- 动态断言方法
expect: ${status_expect} # <--- 动态期望值
msg: "校验接口状态码"
- check: message
assert: ${msg_assert} # <--- 动态断言方法
expect: ${msg_expect} # <--- 动态期望值
msg: "检查返回消息"
parametrize: # 数据驱动测试
# 定义列名,包括了断言方法和期望值
- [ "title", "username", "password", "status_assert", "status_expect", "msg_assert", "msg_expect" ]
# 定义每一行的数据,现在可以为每次运行指定不同的断言逻辑
- [ "场景1: 成功-状态码相等-消息包含Success", "user1", "pass1", "equals", 200, "contains", "Success" ]
- [ "场景2: 失败-状态码不相等-消息不包含Error", "user2", "pass2", "not_equals", 200, "not_contains", "Error" ]
- [ "场景3: 成功-状态码大于199-消息相等", "user3", "pass3", "greater_than", 199, "equals", "Success" ]
- [ "场景4: 失败-状态码小于500-消息为空", "user4", "pass4", "less_than", 500, "is_empty", "" ]


@@ -0,0 +1,25 @@
feature: 用户管理
story: 状态查询
title: ${title} # 引用参数化里的变量
epic: 混合模式示例
# 【关键改动】:不再写具体的 url, method, headers
# 而是指定要调用的 API 类和方法
api_action:
class: UserAPI
method: get_connector_info
params: # 传给 get_connector_info 方法的参数
username: ${username}
password: ${password}
extract:
msg: ["json", "$.msg", 0]
validate:
equals:
业务状态码校验: ["${msg}", "Success."]
parametrize:
- ["title", "username", "password", "msg"]
- ["测试1", "user1", "pass1", "Success."]
- ["测试2", "user2", "pass2", "Fail."]


@@ -0,0 +1,41 @@
feature: 页面状态
story: 状态
title: 查询状态信息
epic: 的点点滴滴
request:
method: get
url: /answer/api/v1/connector/info
headers:
Host: 119.91.19.171:40065
Accept-Language: en_US
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
Referer: http://119.91.19.171:40065/users/login
Accept-Encoding: gzip, deflate
json_body: { username: "${username}", password: "${password}" }
extract: # 提取变量
msg:
- "json"
- "$.msg"
- 0
validate:
- check: status_code
assert: ${status_assert} # <--- 动态断言方法
expect: ${status_expect} # <--- 动态期望值
msg: "校验接口状态码"
- check: message
assert: ${msg_assert} # <--- 动态断言方法
expect: ${msg_expect} # <--- 动态期望值
msg: "检查返回消息"
parametrize: # 数据驱动测试
# 定义列名,包括了断言方法和期望值
- [ "title", "username", "password", "status_assert", "status_expect", "msg_assert", "msg_expect" ]
# 定义每一行的数据,现在可以为每次运行指定不同的断言逻辑
- [ "场景1: 成功-状态码相等-消息包含Success", "user1", "pass1", "equals", 200, "contains", "Success" ]
- [ "场景2: 失败-状态码不相等-消息不包含Error", "user2", "pass2", "not_equals", 200, "not_contains", "Error" ]
- [ "场景3: 成功-状态码大于199-消息相等", "user3", "pass3", "greater_than", 199, "equals", "Success" ]
- [ "场景4: 失败-状态码小于500-消息为空", "user4", "pass4", "less_than", 500, "is_empty", "" ]


@@ -0,0 +1,30 @@
#!/usr/bin/env python
# coding=utf-8
import logging
from core import settings
from core.creator import CaseGenerator, TestTemplateBase
logger = logging.getLogger(__name__)
class TestRunner(TestTemplateBase):
"""
测试用例的动态容器 (Test Case Container)。
这是一个占位符类CaseGenerator 会扫描所有的 YAML 用例文件,
然后将每一个用例动态地生成为一个测试方法并挂载到这个类上。
Pytest 最终会发现并执行这些动态挂载的 test_* 方法。
"""
pass
try:
# --- 核心逻辑:动态生成测试用例 ---
# 当 Pytest 在“收集测试用例”阶段加载此模块时,下面的代码会立即执行。
logger.info("--- [Collector] 开始扫描并动态生成测试用例 ---")
CaseGenerator.build_and_register(target_cls=TestRunner, cases_dir=settings.TEST_CASE_DIR)
logger.info(f"--- [Collector] 测试用例生成完毕,已成功加载到 {TestRunner.__name__} ---")
except Exception as e:
logger.critical(f"--- [Collector] 动态生成测试用例时发生致命错误,测试执行中止 ---", exc_info=True)
# 抛出异常,让 pytest 捕获并报告为收集错误 (Collection Error)
raise RuntimeError("测试用例收集失败,请检查日志中的详细错误信息。") from e


@@ -15,7 +15,7 @@
 from dataclasses import dataclass, asdict, field
 import yaml
-from commons.models import CaseInfo
+from commons.models import TestCaseStruct
 class CaseParser:
@@ -23,15 +23,15 @@ class CaseParser:
     def to_yaml(case_data: dict) -> str:
         try:
-            CaseInfo(**case_data)
+            TestCaseStruct(**case_data)
         except TypeError as error:
             logging.error(error)
             raise error
         return yaml.safe_dump(case_data, allow_unicode=True, sort_keys=False)
     @staticmethod
-    def from_yaml(yaml_str: str) -> CaseInfo:
-        return CaseInfo(**yaml.safe_load(yaml_str))
+    def from_yaml(yaml_str: str) -> TestCaseStruct:
+        return TestCaseStruct(**yaml.safe_load(yaml_str))
 if __name__ == '__main__':


@@ -10,10 +10,18 @@
 @desc:
 """
 import logging
+from typing import List, Union, Any
+from pydantic import TypeAdapter
+from core.exchange import Exchange
+from core.models import ValidateItem
 logger = logging.getLogger(__name__)
+VALIDATE_LIST_ADAPTER = TypeAdapter(List[ValidateItem])
 class CaseValidator:
     VALIDATORS = {}
@@ -26,22 +34,46 @@ class CaseValidator:
         return decorator
     @classmethod
-    def assert_all(cls, validate: dict):
-        if not validate:
+    def validate(cls, response: Any, validate_list: List[ValidateItem]):
+        """
+        核心断言入口:适配 CaseInfo.validate_data (List[ValidateItem])
+        """
+        if not validate_list:
             return
-        for assert_type, cases in validate.items():
-            logger.info(f"键:{assert_type},值:{cases}")
-            validator = cls.VALIDATORS.get(assert_type)
-            logger.info(f"获取到的断言:{validator}")
-            if not validator:
-                raise KeyError(f"Unsupported validator: {assert_type}")
-            for msg, (a, b) in cases.items():
-                validator(a, b, msg)
+        for item in validate_list:
+            # 1. 提取模型中的数据
+            # 此时 item 已经是经过变量替换后的实体
+            actual = item.check
+            expect = item.expect
+            method = item.assert_method  # 即模型中的 alias="assert"
+            msg = item.msg or f"Assert {actual} {method} {expect}"
+            # 2. 获取对应的断言函数
+            validator = cls.VALIDATORS.get(method)
+            if not validator:
+                logger.error(f"❌ 不支持的断言方式: {method}")
+                raise KeyError(f"Unsupported validator: {method}")
+            # 3. 执行断言
+            try:
+                validator(actual, expect, msg)
+            except AssertionError as e:
+                logger.error(
+                    f"❌ 断言失败: {msg} | 实际值: {actual} ({type(actual)}), 期望值: {expect} ({type(expect)})")
+                raise e
 @CaseValidator.register('equals')
 def validate_equals(a, b, msg):
-    logger.info(f"assert {a} == {b}, {msg}执行这段代码")
+    logger.info(f"assert {a} == {b}, {msg} 执行这段代码")
+    print(f"assert {a} == {b}, {msg} 执行这段代码")
     assert a == b, msg
@@ -64,17 +96,12 @@ def validate_not_contains(a, b, msg):
 if __name__ == '__main__':
-    mock_case = {
-        "validate": {
-            "equals": {
-                "判断相等": ["Success.", "Success."]
-            },
-            "not_equals": {
-                "判断不相等": ["Success.", "Suc."]
-            }
-        }
-    }
+    resp = None
+    mock_case = [
+        {"check": 100, "expect": 100, "assert": "equals"},
+        {"check": "success", "expect": "success", "assert": "contains"}
+    ]
+    final_validate_list = VALIDATE_LIST_ADAPTER.validate_python(mock_case)
     case_validator = CaseValidator()
     print(case_validator.VALIDATORS)
-    case_validator.assert_all(mock_case.get("validate"))
+    case_validator.validate(resp, final_validate_list)


@@ -11,8 +11,8 @@
 """
 from pathlib import Path
-from commons.templates import Template
+from core.templates import Template
-from commons.file_processors.file_handle import FileHandle
+from commons.file_processors.yaml_processor import YamlProcessor as FileHandle
 class DataDriver:
@@ -36,7 +36,7 @@ class DataDriver:
 if __name__ == '__main__':
-    file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml")
+    file_path = Path(r"D:\CNWei\CNW\InterfaceAutoTest\test_cases\answer\test_1_status.yaml")
     file_obj = FileHandle(file_path)
     print(file_path.stem)

utils/dirs_manager.py Normal file

@@ -0,0 +1,32 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com
@file: dirs_manager
@date: 2026/2/3 10:52
@desc:
"""
from pathlib import Path
from core.settings import REQUIRED_DIRS
def ensure_dirs_ok():
"""
统一管理项目目录的创建逻辑
"""
for folder in REQUIRED_DIRS:
# 使用 exist_ok=True 避免并发冲突
folder.mkdir(parents=True, exist_ok=True)
def ensure_dir(path: Path) -> Path:
"""确保路径存在并返回路径本身"""
if not isinstance(path, Path):
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
return path

utils/report_handler.py Normal file

@@ -0,0 +1,48 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com
@file: report_handler
@date: 2026/2/3 13:51
@desc:
"""
import logging
import subprocess
import shutil
from core.settings import ALLURE_TEMP, REPORT_DIR
logger = logging.getLogger(__name__)
def generate_allure_report() -> bool:
"""
将 JSON 原始数据转换为 HTML 报告
"""
if not ALLURE_TEMP.exists() or not any(ALLURE_TEMP.iterdir()):
logger.warning("未发现 Allure 测试数据,跳过报告生成。")
return False
# 检查环境是否有 allure 命令行工具
if not shutil.which("allure"):
logger.error("系统未安装 Allure 命令行工具请先安装https://allurereport.org/docs/")
return False
try:
logger.info("正在生成 Allure HTML 报告...")
# --clean 会清理掉 REPORT_DIR 里的旧报告
subprocess.run(
f'allure generate "{ALLURE_TEMP}" -o "{REPORT_DIR}" --clean',
shell=True,
check=True,
capture_output=True,
text=True
)
logger.info(f"Allure 报告已生成至: {REPORT_DIR}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Allure 报告生成失败: {e.stderr}")
return False