11 Commits

Author SHA1 Message Date
300b5a92d4 refactor(): 优化测试用例数据的处理,优化代码结构
- 新增用例生成器和注册器
- 优化文件处理
2025-06-03 21:42:57 +08:00
2e9f1c12f7 feat,fix(): 优化funcs注册函数
- 优化 register 方法
2025-04-07 16:35:14 +08:00
4324cf37aa feat,fix(): 优化
- 优化 settings(使用环境变量)
- 修复bug
2025-03-19 17:03:13 +08:00
a50e00a4e1 refactor(): 优化文件读取,变量替换等
- 优化用例加载模块器
- 新增JSON文件读取模块
2025-03-09 17:23:25 +08:00
914b0301ba feat,fix(): 优化项目
- 优化yaml_processor(优化文件类型转换逻辑)
- 修复bug
2025-03-07 17:28:41 +08:00
a6996ed500 feat,fix(): 优化项目
- 优化yaml_processor(优化文件类型转换逻辑)
- 修复bug
2025-03-06 17:37:00 +08:00
31fad3f4e1 refactor(cases,yaml_processor): 优化测试用例加载功能以及文件加载功能
- 优化用例加载模块器
- 优化yaml文件读取模块
2025-03-06 00:26:43 +08:00
b8903798b8 refactor(files): 优化项目
- 重构files
- 新增yaml_processor(优化读取文件逻辑)
- 修复bug
2025-03-05 18:11:28 +08:00
698a95ac83 feat(funcs): 优化函数热加载
- 优化函数热加载模块funcs.py(由字典反射改为装饰器)
- 修复bug
2025-03-02 21:47:04 +08:00
1890918312 refactor(models): 优化项目
- 重构assert_all
- 优化目录结构
2025-02-28 17:48:20 +08:00
bc55dffe40 feat(): 优化项目
- 更新README
- 修复bug
2025-02-26 17:25:37 +08:00
28 changed files with 1213 additions and 243 deletions

5
.gitignore vendored
View File

@@ -2,4 +2,7 @@
.idea/
.venv/
poetry.lock
.pytest_cache/
.pytest_cache/
report/
temp/
logs/

View File

@@ -9,8 +9,26 @@
...
## 环境搭建
1安装JAVA
- 配置环境变量
```text
JAVA_HOME
java的安装路径
CLASSPATH
%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar
添加Path
%JAVA_HOME%\bin
%JAVA_HOME%\jre\bin
```
2安装allure
- 配置环境变量
```text
添加Path
allure安装目录\bin
```
...
## 使用方法

View File

@@ -0,0 +1,31 @@
feature: 页面状态
story: 状态
title: 查询状态信息
request:
method: get
url: /answer/api/v1/connector/info
headers:
Host: 119.91.19.171:40065
Accept-Language: en_US
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
Referer: http://119.91.19.171:40065/users/login
Accept-Encoding: gzip, deflate
extract: # 提取变量
msg:
- "json"
- "$.msg"
- 0
validate:
equals: # 断言相等
状态码等于200:
- Success.
- ${msg}
#parametrize: # 数据驱动测试
# - [ "title","username","password","msg" ] # 变量名
# - [ "测试1","user1","pass1","200" ] # 变量值
# - [ "测试2","user2","pass2","300" ] # 变量值
# - [ "测试3","user3","pass3","200" ] # 变量值
# - [ "测试4","user4","pass4","200" ] # 变量值

View File

@@ -0,0 +1,65 @@
{
"epic": "项目名称answer",
"feature": "页面状态",
"story": "状态",
"title": "查询状态信息",
"request": {
"method": "get",
"url": "/answer/api/v1/connector/info",
"headers": {
"Host": "119.91.19.171:40065",
"Accept-Language": "en_US",
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
"Referer": "http://119.91.19.171:40065/users/login",
"Accept-Encoding": "gzip, deflate"
}
},
"extract": {
"msg": [
"json",
"$.msg",
0
]
},
"validate": {
"equals": {
"状态码等于200": [
"Success.",
"Success."
]
}
},
"parametrize": [
[
"title",
"username",
"password",
"msg"
],
[
"测试1",
"user1",
"pass1",
"200"
],
[
"测试2",
"user2",
"pass2",
"300"
],
[
"测试3",
"user3",
"pass3",
"200"
],
[
"测试4",
"user4",
"pass4",
"200"
]
]
}

80
commons/case_handler.py Normal file
View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_handler
@date: 2025/5/26 22:13
@desc:
"""
import json
import logging
from dataclasses import dataclass, asdict
from commons.models import TestCaseModel
logger = logging.getLogger(__name__)
@dataclass
class TestCaseHandle(TestCaseModel):
@classmethod
def new(cls, testcase: dict) -> 'TestCaseHandle':
try:
instance = cls(**testcase)
return instance
except (TypeError, ValueError) as e:
logger.warning(f"解析错误:{e}")
raise e
def to_string(self) -> str:
"""
将 字典 转换为 json 格式的字符串。
:return:
json 格式的字符串。
"""
try:
res = json.dumps(asdict(self), ensure_ascii=False)
return res
except TypeError as e:
logger.error(f"将数据转换为 json 字符串时出错: {e}")
raise e
@staticmethod
def to_dict(json_str: str) -> dict:
"""
将 json 格式的字符串转换为 字典.
:param
json_str: json 格式的字符串。
:return:
"""
try:
res = json.loads(json_str)
return res
except json.JSONDecodeError as e:
logger.error(f"将 json 字符串转换为字典时出错: {e}")
raise e
if __name__ == '__main__':
from pathlib import Path
from commons.file_processors import processor_factory
test_data = Path(r"E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml")
yaml_data = processor_factory.get_processor_class(test_data)
case_info = TestCaseHandle.new(yaml_data.load())
print(case_info.to_string())
print(type(case_info.to_string()))
print(case_info.to_dict(case_info.to_string()))
print(type(case_info.to_dict(case_info.to_string())))
print(type(case_info))
print(case_info.parametrize)
for i in case_info.parametrize:
print(i)

View File

@@ -9,88 +9,223 @@
@date: 2024 2024/9/16 9:57
@desc: 动态生成用例
"""
from dataclasses import asdict
from pathlib import Path
import logging
from typing import Union, Generator, Type
from unittest import TestCase
import allure
import pytest
from commons.files import YamlFile
from commons.models import CaseInfo
from commons import settings
from commons.file_processors.processor_factory import get_processor_class
# from commons.models import CaseInfo
from commons.session import Session
from commons.exchange import Exchange
from commons import settings
from commons.templates import Template
from commons.case_handler import TestCaseHandle
from utils import case_validator
logger = logging.getLogger(__name__)
session = Session(settings.base_url)
cases_dir = Path(settings.cases_dir)
_case_path = Path(settings.case_path)
exchanger = Exchange(settings.exchanger)
@allure.epic("项目名称answer")
class TestAPI:
...
@classmethod
def find_yaml_case(cls, case_path: Path = _case_path):
"""
搜索和加载yaml文件
:return:
"""
yaml_path_list = case_path.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
for yaml_path in yaml_path_list:
logger.info(f"load file {yaml_path=}")
def run(cls, testcase_dir: Union[Path, str] = cases_dir):
for fp in CaseFinder(testcase_dir).find_testcases():
print(fp.name)
case = CaseGenerator(fp).generate_testcases()
print(f"{case=}")
for i in case:
print(f"{i=}")
CaseRegister(cls).register_test_func(i)
# @classmethod
# def find_test_cases(cls, case_dir: Path = cases_dir):
# """
# 搜索和加载yaml文件
# :return:
# """
# case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
# for case_path in case_path_list:
# logger.info(f"加载文件:{case_path}")
#
# file = FileHandle(case_path) # 自动读取yaml文件
# try:
# CaseInfo(**file) # 校验用例格式
# logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
# case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# # print(case_path.stem)
# setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
# except Exception as e:
# logger.error(e)
#
# @classmethod
# def new_case(cls, file_name, case_info: dict):
# test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
#
# keys_list = list(test_case.keys())
# logger.info(f"keys_list{keys_list}")
#
# values_list = list(test_case.values())
# logger.info(f"测试用例列表:{values_list}")
#
# driver_title = [i.get("title") for i in values_list]
# logger.info(f"driver_title={driver_title}")
#
# epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
# logger.info(f"epic{epic}")
#
# feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
# logger.info(f"feature{feature}")
#
# story = case_info["story"] if case_info["story"] else settings.allure_story
# logger.info(f"story{story}")
#
# @allure.epic(epic)
# @allure.feature(feature)
# @allure.story(story)
# @pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
# def test_func(self, case_key):
# logger.info(f"case_key{case_key}")
#
# test_case_mapping = test_case.get(case_key)
# logger.info(f"测试用例:{test_case_mapping}")
#
# allure.dynamic.title(test_case_mapping.get("title"))
#
# logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
#
# # 0变量替换
# new_case_info = exchanger.replace(test_case_mapping)
# logger.info(f"1正在注入变量...")
# logger.info(f"new_case_info{new_case_info}")
# # 1发送请求
# logger.info(f"2正在请求接口...")
# resp = session.request(**new_case_info.get("request"))
#
# logger.info(f"3正在提取变量...")
# # 2保存变量(接口关联)
# for var_name, extract_info in new_case_info.get("extract").items():
# logger.info(f"保存变量:{var_name}{extract_info}")
# exchanger.extract(resp, var_name, *extract_info)
# # 3断言
# logger.info(f"4正在断言...")
# assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
# logger.info(f"替换变量后:{assert_case_info}")
# # assert_case_info.assert_all() # 执行断言
# _validator = case_validator.CaseValidator()
# _validator.assert_all(assert_case_info.get("validate"))
#
# logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
#
# return test_func
file = YamlFile(yaml_path) # 自动读取yaml文件
case_info = CaseInfo(**file) # 校验yaml格式
logger.debug(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志
class CaseFinder:
find_suffix: str = settings.test_suffix
case_func = cls.new_case(case_info) # 从yaml格式转换为pytest格式
print(yaml_path.name)
setattr(cls, f"{yaml_path.name}", case_func) # 把pytest格式添加到类中
def __init__(self, testcase_dir: Union[str, Path]):
if Path(testcase_dir).is_dir():
self.testcase_dir: Path = Path(testcase_dir)
else:
raise FileNotFoundError("不是有效的目录")
@classmethod
def new_case(cls, case_info: CaseInfo):
ddt_data = case_info.ddt()
print(ddt_data)
ddt_title = [data.title for data in ddt_data]
def find_testcases(self) -> Generator[Path, None, None]:
testcase_files = self.testcase_dir.glob(f"**/test_*.{self.find_suffix}")
for fp in testcase_files:
logger.info(f"加载文件:{fp}")
yield fp
@allure.feature(case_info.feature)
@allure.story(case_info.story)
@pytest.mark.parametrize("case_info", ddt_data, ids=ddt_title)
def test_func(self, case_info: CaseInfo):
allure.dynamic.title(case_info.title)
logger.info(f"用例开始执行:{case_info.title}".center(80, "="))
class CaseGenerator:
# 0变量替换
new_case_info = exchanger.replace(case_info)
logger.info(f"1正在注入变量...")
def __init__(self, fp: Union[str, Path]):
self.fp: Path = Path(fp)
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.request)
def generate_testcases(self) -> Generator[dict, None, None]:
file_name = self.fp.stem
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
for var_name, extract_info in new_case_info.extract.items():
print(var_name, extract_info)
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(case_info) # 为断言加载变量
print(assert_case_info)
assert_case_info.assert_all() # 执行断言
case_info_ = get_processor_class(self.fp).load() # 自动读取yaml文件
case_info = TestCaseHandle.new(case_info_)
logger.info(f"用例执行结束:{case_info.title}".center(80, "="))
if not case_info.parametrize:
yield {file_name + "__": asdict(case_info)}
else:
cases = {}
args_names = case_info.parametrize[0]
for i, args_values in enumerate(case_info.parametrize[1:]):
# print(args_values)
context = dict(zip(args_names, args_values))
print(context)
# rendered = Template(FileHandle.to_string(case_info)).render(context)
rendered = Template(case_info.to_string()).render(context)
# cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
cases.update({file_name + "_" + str(i): case_info.to_dict(rendered)})
return test_func
yield cases
class CaseRegister:
def __init__(self, register: Type["TestAPI"]):
self.register: Type["TestAPI"] = register
def register_test_func(self, case: dict):
for test_filed_name, case_info in case.items():
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
logger.info(f"epic{epic}")
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
logger.info(f"feature{feature}")
story = case_info["story"] if case_info["story"] else settings.allure_story
logger.info(f"story{story}")
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
def register_func(instance, testcase=case_info):
# allure.dynamic.epic(epic)
# allure.dynamic.feature(feature)
# allure.dynamic.story(story)
allure.dynamic.title(testcase.get("title"))
logger.info(f"用例开始执行:{testcase.get('title')}".center(80, "="))
# 0变量替换
new_case_info = exchanger.replace(testcase)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info{new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request"))
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
for var_name, extract_info in new_case_info.get("extract").items():
logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(testcase) # 为断言加载变量
logger.info(f"替换变量后:{assert_case_info}")
# assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
logger.info(f"用例执行结束:{testcase.get('title')}".center(80, "="))
# return test_func
setattr(self.register, test_filed_name, register_func) # 把pytest格式添加到类中
# TestAPI.find_yaml_case()
if __name__ == '__main__':
TestAPI.find_yaml_case()
# print(TestAPI.__dict__)
TestAPI.run(cases_dir)
print(TestAPI.__dict__)

View File

@@ -33,13 +33,13 @@ class DBServer:
db = DBServer(
host=settings.db_host, # ip
port=3306, # 端口
user='root', # 用户名
password='mysql_hNahSe', # 密码
database='answer' # 库名
port=settings.db_port, # 端口
user=settings.db_user, # 用户名
password=settings.db_password, # 密码
database=settings.db_database # 库名
)
if __name__ == '__main__':
...
res = db.execute_sql('select username from user where id=1;')
print(res[0])
# res = db.execute_sql('select username from user where id=1;')
# print(res[0])

View File

@@ -13,33 +13,33 @@ import copy
import json
import logging
import re
import jsonpath
import allure
from commons.templates import Template
import jsonpath
from commons.files import YamlFile
from commons.models import CaseInfo
from commons.file_processors.processor_factory import get_processor_class
from tests.b import TestCaseHandle
logger = logging.getLogger(__name__)
class Exchange:
def __init__(self, path):
self.file = YamlFile(path)
self.file = get_processor_class(path)
@allure.step("提取变量")
def extract(self, resp, var_name, attr, expr: str, index):
# resp中json是方法不是属性需要手动更改为属性
def extract(self, resp, var_name, attr, expr: str, index) -> None:
resp = copy.deepcopy(resp)
try:
# resp中json是方法不是属性需要手动更改为属性
resp.json = resp.json()
except json.decoder.JSONDecodeError:
resp.json = {"msg": "is not json data"}
data = getattr(resp, attr)
# print(data)
if expr.startswith("/"): # xpath
res = None
elif expr.startswith("$"): # jsonpath
@@ -53,20 +53,23 @@ class Exchange:
else: # 如果没有数据
value = "not data"
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
data = self.file.load()
data[var_name] = value # 保存变量
self.file.save(data) # 持久化存储到文件
self.file[var_name] = value # 保存变量
self.file.save() # 持久化存储到文件
@allure.step("替换变量")
def replace(self, case_info: CaseInfo):
...
def replace(self, case_info: dict) -> dict:
logger.info(f"变量替换:{case_info}")
# 1将case_info转换为字符串
case_info_str = case_info.to_yaml()
data = TestCaseHandle(**case_info)
case_info_str = data.to_string()
print(f"{case_info_str=}")
# 2替换字符串
case_info_str = Template(case_info_str).render(self.file)
case_info_str = Template(case_info_str).render(self.file.load())
print(f"{case_info_str=}")
# 3将字符串转换成case_info
new_case_info = case_info.by_yaml(case_info_str)
new_case_info = data.to_dict(case_info_str)
return new_case_info
@@ -87,14 +90,24 @@ if __name__ == '__main__':
exchanger.extract(mock_resp, "age", "json", '$.age', 0)
exchanger.extract(mock_resp, "data", "json", '$.data', 0)
exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0)
case_info = CaseInfo(
title="单元测试",
request={
# mock_case_info = CaseInfo(
# title="单元测试",
# request={
# "data":
# {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
# },
# extract={},
# validate={}
# )
mock_case_info = {
"title": "单元测试",
"request": {
"data":
{"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
},
extract={},
validate={}
)
new_case_info = exchanger.replace(case_info)
print(new_case_info)
"extract": {},
"validate": {}
}
new_mock_case_info = exchanger.replace(mock_case_info)
print(new_mock_case_info)

View File

@@ -0,0 +1,22 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: __init__.py
@date: 2025/3/4 17:23
@desc:
"""
from .base_processor import BaseFileProcessor
from .json_processor import JsonProcessor
from .yaml_processor import YamlProcessor
from .processor_factory import get_processor_class
__all__ = [
"BaseFileProcessor",
"JsonProcessor",
"YamlProcessor",
"get_processor_class",
]

View File

@@ -0,0 +1,34 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: base
@date: 2025/3/4 17:23
@desc:
"""
import abc
from pathlib import Path
from typing import Union
class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
"""
文件处理器的抽象基类。
定义了所有子类必须实现的方法。
"""
def __init__(self, filepath: Union[str, Path], **kwargs):
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
@abc.abstractmethod
def load(self) -> dict:
"""加载."""
raise NotImplementedError
@abc.abstractmethod
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
"""将数据保存."""
raise NotImplementedError

View File

@@ -0,0 +1,86 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: yaml_processor
@date: 2025/3/4 17:28
@desc:
"""
import logging
from typing import Union, Any
from pathlib import Path
import json
from commons.file_processors.base_processor import BaseFileProcessor
logger = logging.getLogger(__name__)
class JsonProcessor(BaseFileProcessor):
"""
用于处理 JSON 文件的类。
提供了从文件加载 JSON 数据为字典,以及将字典保存为 JSON 文件的功能。
"""
def __init__(self, filepath: Union[str, Path], **kwargs):
"""
初始化 JsonFile 对象。
Args:
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
"""
super().__init__(filepath, **kwargs)
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
def load(self) -> dict[str, Any]:
"""
从 Json 文件加载数据。
:return:
"""
if not self.filepath.exists():
logger.warning(f"文件 {self.filepath} 不存在.")
raise FileNotFoundError(f"文件 {self.filepath} 不存在.")
try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = json.load(f)
if not isinstance(loaded_data, dict): # 确保加载的是字典
logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
return loaded_data
except json.JSONDecodeError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
raise e
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
"""
将字典数据保存到 json 文件。
Args:
:param data:
:param new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
"""
filepath = Path(new_filepath) if new_filepath else self.filepath
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(
data,
f,
ensure_ascii=False, # 允许非ASCII字符
sort_keys=False # 不排序键
)
logger.info(f"数据已成功保存到 {filepath}")
except (TypeError, OSError, json.JSONDecodeError) as e:
logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}")
raise e
if __name__ == '__main__':
# 示例用法
json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径
json_file = JsonProcessor(json_path)
print(json_file.load())
print(type(json_file))
# json_file.save()

View File

@@ -0,0 +1,57 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: file_handle
@date: 2025/3/7 09:31
@desc:
"""
from pathlib import Path
from typing import Type, Union
from commons.file_processors.base_processor import BaseFileProcessor
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor
# 类型别名,表示处理器类的字典
ProcessorMap = dict[str, Type[BaseFileProcessor]]
processors: ProcessorMap = {
'yaml': YamlProcessor,
'yml': YamlProcessor,
'json': JsonProcessor,
}
class UnsupportedFileTypeError(Exception):
"""当文件类型不被支持时抛出此异常。"""
pass
# def get_processor_class(file_suffix: str = "yaml") -> Type[BaseFileProcessor]:
def get_processor_class(fp: Union[Path, str]) -> 'BaseFileProcessor':
fp = Path(fp)
if fp.is_file():
file_suffix = fp.suffix[1:]
processor_class = processors.get(file_suffix.lower(), YamlProcessor) # 代理模式
return processor_class(fp) # 默认回退到 Yaml
else:
raise UnsupportedFileTypeError(fp)
# FileHandle = get_processor("yaml")
if __name__ == '__main__':
# 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
# yaml_file = FileHandle(yaml_path)
# print(yaml_file.load())
# print(type(yaml_file))
# file_suffix = Path(yaml_path).suffix[1:]
# print(file_suffix)
get_processor = get_processor_class(yaml_path)
print(get_processor.load())

View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: yaml_processor
@date: 2025/3/4 17:28
@desc:
"""
import logging
from typing import Union
from pathlib import Path
import yaml
from commons.file_processors.base_processor import BaseFileProcessor
logger = logging.getLogger(__name__)
class YamlProcessor(BaseFileProcessor):
"""
用于处理 YAML 文件的类,继承自 dict。
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
并可以直接像字典一样访问 YAML 数据。
"""
def __init__(self, filepath: Union[str, Path], **kwargs):
"""
初始化 YamlFile 对象。
Args: filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
如果不提供,则尝试从 filepath 加载数据。
"""
super().__init__(filepath, **kwargs)
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
def load(self) -> dict:
"""
从 YAML 文件加载数据
:return:
"""
if not self.filepath.exists():
logger.warning(f"文件 {self.filepath} 不存在.")
raise FileNotFoundError(f"文件 {self.filepath} 不存在.")
try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = yaml.safe_load(f)
if not isinstance(loaded_data, dict): # 确保加载的是字典
logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
return loaded_data
except yaml.YAMLError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
raise e
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
"""
将字典数据保存到 YAML 文件。
:param data:
:param new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
"""
filepath = Path(new_filepath) if new_filepath else self.filepath
# 确保目标目录存在
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
with open(filepath, "w", encoding="utf-8") as f:
yaml.safe_dump(
data,
stream=f,
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
logger.info(f"数据已成功保存到 {filepath}")
except (TypeError, OSError, yaml.YAMLError) as e:
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
raise e
if __name__ == '__main__':
# 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = YamlProcessor(yaml_path)
print(yaml_file.load())
print(type(yaml_file))
# # 直接像字典一样访问数据
# print("加载的数据:", yaml_file) # 直接打印对象,就是打印字典内容
# print("title:", yaml_file.get("title")) # 使用 get 方法
# if "title" in yaml_file: # 使用 in 检查键
# print("原始title:", yaml_file["title"]) # 使用方括号访问
# yaml_file["title"] = "新的标题" # 使用方括号修改
# print("修改后的title:", yaml_file["title"])
# #
# yaml_file["new_key"] = "new_value" # 添加新的键值对
#
# # 将字典转换为 YAML 字符串
# yaml_string = yaml_file.to_string()
# print("\nYAML 字符串:", yaml_string)
# #
# # 将 YAML 字符串转换回字典 (并更新 yaml_file)
# yaml_file.to_dict(yaml_string)
# print("\n从字符串加载的数据:", yaml_file)
#
# # 保存修改后的数据 (覆盖原文件)
# yaml_file.save()
#
# # 保存到新文件
# new_yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user_new.yaml'
# yaml_file.save(new_filepath=new_yaml_path)
# 测试从字符串初始化
# yaml_string2 = """
# name: Test User
# age: 30
# """
# yaml_file2 = YamlFile("test2.yaml", data=yaml.safe_load(yaml_string2)) # 从字符串初始化
# print("\n从字符串初始化的 YamlFile:", yaml_file2)
# yaml_file2.save() # 保存到 test2.yaml
#
# 测试文件不存在的情形
# non_existent_file = YamlFile("non_existent_file.yaml")
# print("\n加载不存在的文件:", non_existent_file) # 应该打印空字典 {}
# non_existent_file['a'] = 1 # 可以直接添加
# print("\n加载不存在的文件:", non_existent_file)

View File

@@ -10,34 +10,56 @@
@desc: 读取和保存yaml文件
"""
import logging
from dataclasses import dataclass, asdict, field
from pathlib import Path
import yaml
from commons.models import CaseInfo
logger = logging.getLogger(__name__)
class YamlFile(dict):
def __init__(self, path):
super().__init__()
self.path = path
self.load()
super().__init__() # 初始化父类 dict
self.path = Path(path)
self.load() # 链式初始化加载
def load(self):
with open(self.path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) # 字典
if data:
self.update(data) # 把两个字段的内容合并
if self.path.exists():
with open(self.path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {} # 加载数据,空文件返回空字典
self.clear() # 清空当前实例
self.update(data) # 更新字典内容
else:
logger.warning(f"File {self.path} not found, initialized empty.")
return self # 链式调用
def to_yaml(self) -> str:
return yaml.safe_dump(
dict(self),
allow_unicode=True,
sort_keys=False
)
@classmethod
def by_yaml(cls, yaml_str):
data = yaml.safe_load(yaml_str) or {}
return cls({**data}) # 通过类方法创建实例
def save(self):
with open(self.path, "w", encoding="utf-8") as f:
yaml.safe_dump(
dict(self),
dict(self), # 直接 dump 实例本身(已继承 dict
stream=f,
allow_unicode=True, # allow_unicode使用unicode编码正常显示中文
sort_keys=False) # sort_keys保持原有排序
allow_unicode=True,
sort_keys=False
)
return self # 链式调用
if __name__ == '__main__':
from commons.models import CaseInfo
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml'
yaml_file = YamlFile(yaml_path)
# yaml_file.load()

View File

@@ -17,46 +17,79 @@ import hashlib
from commons.databases import db
# from commons.files import YamlFile
from commons.file_processors.processor_factory import get_processor_class
from commons import settings
logger = logging.getLogger(__name__)
class Funcs:
FUNC_MAPPING = {
"int": int,
"float": float,
"bool": bool
} # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
@classmethod
def register(cls, name: str | None = None):
def decorator(func):
if name is None:
cls.FUNC_MAPPING[func.__name__] = func
cls.FUNC_MAPPING[name] = func
return func
return decorator
@Funcs.register("url_unquote")
def url_unquote(s: str) -> str:
return urllib.parse.unquote(s)
@Funcs.register("str")
def to_string(s) -> str:
# 将数据转换为str类型。
return f"'{s}'"
@Funcs.register("time_str")
def time_str() -> str:
return str(time.time())
@Funcs.register("add")
def add(a, b):
return str(int(a) + int(b))
@Funcs.register("sql")
def sql(s: str) -> str:
res = db.execute_sql(s)
return res[0][0]
@Funcs.register("new_id")
def new_id():
# 自增,永不重复
id_file = YamlFile(settings.id_path)
id_file["id"] += 1
id_file.save()
id_file = get_processor_class(settings.id_path)
data = id_file.load()
data["id"] += 1
id_file.save(data)
return id_file["id"]
return data["id"]
@Funcs.register("last_id")
def last_id() -> str:
# 不自增,只返回结果
id_file = YamlFile("id.yaml")
return id_file["id"]
id_file = get_processor_class(settings.id_path)
data = id_file.load()
return data["id"]
@Funcs.register("md5")
def md5(content: str) -> str:
# 1原文转为字节
content = content.encode("utf-8")
@@ -64,6 +97,7 @@ def md5(content: str) -> str:
return result
@Funcs.register("base64_encode")
def base64_encode(content: str) -> str:
# 1原文转二进制
content = content.encode("utf-8")
@@ -75,6 +109,7 @@ def base64_encode(content: str) -> str:
return encode_str
@Funcs.register("base64_decode")
def base64_decode(content: str) -> str:
# 1原文转二进制
content = content.encode("utf-8")
@@ -86,19 +121,24 @@ def base64_decode(content: str) -> str:
return decode_str
@Funcs.register("rsa_encode")
def rsa_encode(content: str) -> str:
...
@Funcs.register("rsa_decode")
def rsa_decode(content: str) -> str:
...
@Funcs.register()
def func_name_test():
...
if __name__ == '__main__':
# res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
# print(res)
a = "这是中文dddddd"
bb = base64_encode(a)
print(bb)
cc = base64_decode(bb)
print(cc)
# print(f"计数器:{new_id()}")
# print(f"当前数值:{last_id()}")
print(Funcs().FUNC_MAPPING)

View File

@@ -10,89 +10,56 @@
@desc: 声明yaml用例格式
"""
import logging
from dataclasses import dataclass, asdict
from typing import Union, Optional
from dataclasses import dataclass, field
import allure
import yaml
from commons.templates import Template
from commons import settings
logger = logging.getLogger(__name__)
@dataclass
class CaseInfo:
class RequestModel:
method: str
url: str
headers: Optional[dict] = None
# body: Optional[Union[dict, str]] = None
params: Optional[Union[dict, str]] = None
@dataclass
class TestCaseModel:
title: str
request: dict
request: RequestModel
extract: dict
validate: dict
parametrize: list = ""
epic: str = settings.allure_epic
feature: str = settings.allure_feature
story: str = settings.allure_story
parametrize: list = field(default_factory=list)
epic: str = field(default_factory=lambda: settings.allure_epic)
feature: str = field(default_factory=lambda: settings.allure_feature)
story: str = field(default_factory=lambda: settings.allure_story)
def to_yaml(self) -> str:
# 序列化成yaml字符串
yaml_str = yaml.safe_dump(
asdict(self),
allow_unicode=True, # allow_unicode使用unicode编码正常显示中文
sort_keys=False)
return yaml_str
def __post_init__(self):
# 必填字段非空校验
if self.title is None:
raise ValueError("Title cannot be empty")
@classmethod
def by_yaml(cls, yaml_str):
# 反序列化
obj = cls(**yaml.safe_load(yaml_str))
return obj
@allure.step("断言")
def assert_all(self):
if not self.validate:
return
for assert_type, assert_value in self.validate.items():
for msg, data in assert_value.items():
a, b = data[0], data[1]
# print(assert_type, a, b, msg)
match assert_type:
case 'equals':
logger.info(f"assert {a} == {b}, {msg}")
assert a == b, msg
case 'not_equals':
logger.info(f"assert {a} != {b}, {msg}")
assert a != b, msg
case 'contains':
logger.info(f"assert {a} in {b}, {msg}")
assert a in b, msg
case 'not_contains':
logger.info(f"assert {a} not in {b}, {msg}")
assert a not in b, msg
# case "xxxxx
def ddt(self) -> list: # 返回一个列表列表中应该包含N个注入了变量的caseInfo
case_list = []
if not self.parametrize: # 没有使用数据驱动测试
case_list.append('')
else: # 使用数据驱动测试
args_name = self.parametrize[0]
args_value_list = self.parametrize[1:]
for args_value in args_value_list:
d = dict(zip(args_name, args_value))
# d 就是数据驱动测试的变量,应输入到用例中
case_info_str = self.to_yaml() # 转字符串
case_info_str = Template(case_info_str).render(d) # 输入变量
case_info = self.by_yaml(case_info_str) # 转成类
case_list.append(case_info) # 加入到返回值
return case_list
# 校验RequestModel
if isinstance(self.request, dict):
try:
self.request = RequestModel(**self.request) # RequestModel 的 __post_init__ 会被调用
except (TypeError, ValueError) as e:
raise ValueError(f"解析 'request' 字段失败: {e} (数据: {self.request})") from e
elif not isinstance(self.request, RequestModel): # 如果不是 dict 也不是 RequestModel
raise TypeError(
f"字段 'request' 必须是字典 (将在内部转换为 RequestModel) 或 RequestModel 实例, "
f"得到的是 {type(self.request).__name__}"
)
if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml', encoding='utf-8') as f:
with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f)
# print(data)
case_info = CaseInfo(**data)
s = case_info.to_yaml()
print(s)
new_case_info = case_info.by_yaml(s)
print(new_case_info)
case_info = TestCaseModel(**data)

View File

@@ -9,14 +9,16 @@
@date: 2024 2024/9/12 21:56
@desc:
"""
from urllib.parse import urljoin
import logging
from urllib.parse import urljoin
import requests
import allure
from requests import Response, PreparedRequest
import allure
logger = logging.getLogger("requests.session")
# logger = logging.getLogger("requests.session")
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
@@ -41,7 +43,8 @@ class Session(requests.Session):
logger.info(f"接收响应 <<<<<< 状态码 = {resp.status_code}")
logger.info(f"接收响应 <<<<<< 响应头 = {resp.headers}")
logger.info(f"接收响应 <<<<<< 响应正文 = {resp.json()}")
logger.info(f"接收响应 <<<<<< 响应正文 = {resp.text}")
# logger.info(f"接收响应 <<<<<< 响应正文 = {resp.json()}")
return resp

View File

@@ -9,16 +9,26 @@
@date: 2025/2/23 21:34
@desc:
"""
base_url = 'http://127.0.0.1:8000'
case_path = r"E:\PyP\InterfaceAutoTest\TestCases"
exchanger = r"E:\PyP\InterfaceAutoTest\extract.yaml"
id_path =r"E:\PyP\InterfaceAutoTest\id.yaml"
from pathlib import Path
import os
from dotenv import load_dotenv
db_host = '119.91.19.171' # ip
db_port = 3306 # 端口
db_user = 'root' # 用户名
db_password = 'mysql_hNahSe' # 密码
db_database = 'answer' # 库名
load_dotenv()
root_path = (Path(__file__)).resolve().parents[1]
base_url = os.getenv("BASE_URL")
cases_dir = rf"{root_path}\TestCases\answer"
exchanger = rf"{root_path}\extract.yaml"
id_path = rf"{root_path}\id.yaml"
test_suffix = "yaml"
db_host = os.getenv("DB_HOST") # ip
db_port = os.getenv("DB_PORT") # 端口
db_user = os.getenv("DB_USER") # 用户名
db_password = os.getenv("DB_PASSWORD") # 密码
db_database = os.getenv("DB_DATABASE")
allure_epic: str = "项目名称answer"
allure_feature: str = "默认特征feature"
@@ -26,3 +36,7 @@ allure_story: str = "默认事件story"
rsa_public = ""
rsa_private = ""
if __name__ == '__main__':
print(root_path)
print(base_url,db_host,db_port,db_user,db_password,db_database)

View File

@@ -13,26 +13,17 @@ import copy
import logging
import re
import string
from commons.funcs import Funcs
logger = logging.getLogger(__name__)
def _str(s) -> str:
# 将数据转换为str类型。
return f"'{s}'"
class Template(string.Template):
"""
1支持函数调用
2参数也可以是变量
"""
func_mapping = {
"str": _str,
"int": int,
"float": float,
"bool": bool
} # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
@@ -50,15 +41,17 @@ class Template(string.Template):
:return: 替换后的结果
"""
mapping = copy.deepcopy(mapping)
mapping.update(self.func_mapping) # 合并两个mapping
logger.info(f"mapping更新前: {mapping}")
# mapping.update(self.FUNC_MAPPING) # 合并两个mapping
mapping.update(Funcs.FUNC_MAPPING) # 合并两个mapping
logger.info(f"mapping更新后: {mapping}")
def convert(mo):
func_name = mo.group("func_name")
func_args = mo.group("func_args").split(",")
func = mapping.get(func_name) # 读取指定函数
func_args_value = [mapping.get(arg, arg) for arg in func_args]
if func_args_value == [""]: # 处理没有参数的func
if func_args_value == [""]: # 处理没有参数的func
func_args_value = []
if not callable(func):
@@ -68,15 +61,3 @@ class Template(string.Template):
return self.call_pattern.sub(convert, template)
def hot_load():
from commons import funcs
for func_name in dir(funcs): # 遍历模块中的所有函数
if func_name.startswith("_"):
continue
func_code = getattr(funcs, func_name) # 取到函数对象
if callable(func_code): # 如果是一个可以调用的函数
Template.func_mapping[func_name] = func_code # 函数放到Template中
hot_load()

View File

@@ -1,3 +1,7 @@
code: 200
msg: 成功。
reason: base.success
name: 张三
age: '18'
data:
- 3
- 4
- 5
aaa: null

View File

@@ -1 +1 @@
"id":0
id: 13

View File

@@ -1,18 +1,54 @@
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_1_user.yaml')'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_2_url.yaml')'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_3_sql.yaml')'
''02/23/2025 10:17:34 PM' [pytest_result_log] INFO plugin.pytest_runtest_setup:122 - ---------------Start: main.py::TestAPI::test_1_user.yaml[查询用户信息0]---------------'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:67 - =================================用例开始执行:查询用户信息=================================='
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:71 - 1正在注入变量...'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:74 - 2正在请求接口...'
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:36 - 发送请求>>>>>> 接口地址 = GET http://119.91.19.171:40065/answer/api/v1/connector/info'
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:37 - 发送请求>>>>>> 请求头 = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Accept-Language': 'zh_CN', 'Content-Type': 'application/json', 'Cookie': 'psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==', 'Host': '119.91.19.171:40065', 'Origin': 'http://119.91.19.171:40065', 'Referer': 'http://119.91.19.171:40065/users/login'}'
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:38 - 发送请求>>>>>> 请求正文 = None '
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:42 - 接收响应 <<<<<< 状态码 = 200'
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:43 - 接收响应 <<<<<< 响应头 = {'Content-Type': 'application/json; charset=utf-8', 'Date': 'Sun, 23 Feb 2025 14:17:34 GMT', 'Content-Length': '64'}'
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:44 - 接收响应 <<<<<< 响应正文 = {'code': 200, 'reason': 'base.success', 'msg': '成功。', 'data': []}'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:77 - 3正在提取变量...'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:83 - 4正在断言...'
''02/23/2025 10:17:34 PM' [commons.models] INFO models.assert_all:59 - assert 200 == code1, 状态码等于200'
''02/23/2025 10:17:34 PM' [pytest_result_log] ERROR plugin.pytest_result_log:190 - test status is FAILED (main.py::TestAPI::test_1_user.yaml[查询用户信息0]): AssertionError'
''02/23/2025 10:17:34 PM' [pytest_result_log] INFO plugin.pytest_runtest_teardown:128 - ----------------End: main.py::TestAPI::test_1_user.yaml[查询用户信息0]----------------'
03/03/2025 05:34:28 PM [commons.cases] INFO cases.find_yaml_case:45 - 加载文件D:\CNWei\CNW\InterfaceAutoTest\TestCases\answer\test_1_status.yaml
03/03/2025 05:34:28 PM [commons.cases] INFO cases.find_yaml_case:50 - case_info=title: 查询状态信息
request:
method: get
url: /answer/api/v1/connector/info
headers:
Host: 119.91.19.171:40065
Accept-Language: en_US
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,
like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
Referer: http://119.91.19.171:40065/users/login
Accept-Encoding: gzip, deflate
extract:
msg:
- json
- $.msg
- 0
validate:
equals:
状态码等于200:
- Success.
- ${msg}
parametrize: []
epic: 项目名称answer
feature: 页面状态
story: 状态
03/03/2025 05:34:28 PM [commons.models] INFO models.ddt:81 - 1执行这一步
03/03/2025 05:34:28 PM [commons.cases] INFO cases.new_case:63 - ddt_title=['查询状态信息']
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_runtest_setup:122 - -----------------Start: main.py::TestAPI::test_1_status[查询状态信息]-----------------
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:71 - =================================用例开始执行:查询状态信息==================================
03/03/2025 05:34:28 PM [commons.exchange] INFO exchange.replace:64 - CaseInfo(title='查询状态信息', request={'method': 'get', 'url': '/answer/api/v1/connector/info', 'headers': {'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Referer': 'http://119.91.19.171:40065/users/login', 'Accept-Encoding': 'gzip, deflate'}}, extract={'msg': ['json', '$.msg', 0]}, validate={'equals': {'状态码等于200': ['Success.', '${msg}']}}, parametrize=[], epic='项目名称answer', feature='页面状态', story='状态')
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:51 - mapping更新前: {'msg': 'Success.', 'id': 12}
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:54 - mapping更新后: {'msg': 'Success.', 'id': 12, 'int': <class 'int'>, 'float': <class 'float'>, 'bool': <class 'bool'>, 'url_unquote': <function url_unquote at 0x00000299E6AAC0D0>, 'str': <function to_string at 0x00000299E6AAC160>, 'time_str': <function time_str at 0x00000299E6AAC1F0>, 'add': <function add at 0x00000299E6AAC280>, 'sql': <function sql at 0x00000299E6AAC310>, 'new_id': <function new_id at 0x00000299E6AAC3A0>, 'last_id': <function last_id at 0x00000299E6AAC430>, 'md5': <function md5 at 0x00000299E6AAC4C0>, 'base64_encode': <function base64_encode at 0x00000299E6AAC550>, 'base64_decode': <function base64_decode at 0x00000299E6AAC5E0>, 'rsa_encode': <function rsa_encode at 0x00000299E6AAC670>, 'rsa_decode': <function rsa_decode at 0x00000299E6AAC700>}
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:75 - 1正在注入变量...
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:78 - 2正在请求接口...
03/03/2025 05:34:28 PM [requests.session] INFO session.send:36 - 发送请求>>>>>> 接口地址 = GET http://119.91.19.171:40065/answer/api/v1/connector/info
03/03/2025 05:34:28 PM [requests.session] INFO session.send:37 - 发送请求>>>>>> 请求头 = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': 'application/json, text/plain, */*', 'Connection': 'keep-alive', 'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Referer': 'http://119.91.19.171:40065/users/login'}
03/03/2025 05:34:28 PM [requests.session] INFO session.send:38 - 发送请求>>>>>> 请求正文 = None
03/03/2025 05:34:28 PM [requests.session] INFO session.send:42 - 接收响应 <<<<<< 状态码 = 200
03/03/2025 05:34:28 PM [requests.session] INFO session.send:43 - 接收响应 <<<<<< 响应头 = {'Content-Type': 'application/json; charset=utf-8', 'Date': 'Mon, 03 Mar 2025 09:34:29 GMT', 'Content-Length': '63'}
03/03/2025 05:34:28 PM [requests.session] INFO session.send:44 - 接收响应 <<<<<< 响应正文 = {'code': 200, 'reason': 'base.success', 'msg': 'Success.', 'data': []}
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:81 - 3正在提取变量...
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:87 - 4正在断言...
03/03/2025 05:34:28 PM [commons.exchange] INFO exchange.replace:64 - CaseInfo(title='查询状态信息', request={'method': 'get', 'url': '/answer/api/v1/connector/info', 'headers': {'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Referer': 'http://119.91.19.171:40065/users/login', 'Accept-Encoding': 'gzip, deflate'}}, extract={'msg': ['json', '$.msg', 0]}, validate={'equals': {'状态码等于200': ['Success.', '${msg}']}}, parametrize=[], epic='项目名称answer', feature='页面状态', story='状态')
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:51 - mapping更新前: {'msg': 'Success.', 'id': 12}
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:54 - mapping更新后: {'msg': 'Success.', 'id': 12, 'int': <class 'int'>, 'float': <class 'float'>, 'bool': <class 'bool'>, 'url_unquote': <function url_unquote at 0x00000299E6AAC0D0>, 'str': <function to_string at 0x00000299E6AAC160>, 'time_str': <function time_str at 0x00000299E6AAC1F0>, 'add': <function add at 0x00000299E6AAC280>, 'sql': <function sql at 0x00000299E6AAC310>, 'new_id': <function new_id at 0x00000299E6AAC3A0>, 'last_id': <function last_id at 0x00000299E6AAC430>, 'md5': <function md5 at 0x00000299E6AAC4C0>, 'base64_encode': <function base64_encode at 0x00000299E6AAC550>, 'base64_decode': <function base64_decode at 0x00000299E6AAC5E0>, 'rsa_encode': <function rsa_encode at 0x00000299E6AAC670>, 'rsa_decode': <function rsa_decode at 0x00000299E6AAC700>}
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.assert_all:32 - 键equals{'状态码等于200': ['Success.', 'Success.']}
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.assert_all:34 - 获取到的断言:<function validate_equals at 0x00000299E6AAC940>
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.validate_equals:43 - assert Success. == Success., 状态码等于200执行这段代码
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:92 - =================================用例执行结束:查询状态信息==================================
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_result_log:190 - test status is PASSED (main.py::TestAPI::test_1_status[查询状态信息]):
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_runtest_teardown:128 - ------------------End: main.py::TestAPI::test_1_status[查询状态信息]------------------

View File

@@ -5,14 +5,15 @@ import pytest
from commons.cases import TestAPI
TestAPI.find_yaml_case() # 加载yaml文件
TestAPI.run() # 加载yaml文件
if __name__ == '__main__':
now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
# 1启动框架生成临时文件
pytest.main([__file__, "-x", "-v"]) # -x表示有一个用例失败后面将不执行;-v表示展示用例名称;-c,配置文件所在目录指定pytest.ini路径
# -x表示有一个用例失败后面将不执行;-v表示展示用例名称;-c,配置文件所在目录指定pytest.ini路径;--alluredir=temp。指定数据生成目录
pytest.main([__file__, "-x", "-v","--alluredir=temp"])
# 2生成HTML报告
os.system('allure generate temp -o report --clean') # java程序只能借助操作系统执行
# 3备份日志
shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log")
# shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log")

View File

@@ -16,6 +16,8 @@ jsonpath = "^0.82.2"
pymysql = "^1.1.1"
pytest-result-log = "^1.2.2"
allure-pytest = "^2.13.5"
cryptography = "^44.0.2"
dotenv = "^0.9.9"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -3,8 +3,8 @@ addopts = -q --show-capture=no
log_file = logs/pytest.log
log_file_level = info
log_file_format = '%(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s'
log_file_date_format = '%m/%d/%Y %I:%M:%S %p'
log_file_level = debug
log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s
log_file_date_format = %m/%d/%Y %I:%M:%S %p
disable_test_id_escaping_and_forfeit_all_rights_to_community_support = true

48
utils/case_parser.py Normal file
View File

@@ -0,0 +1,48 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_parser
@date: 2025/2/27 17:25
@desc:
"""
import logging
from dataclasses import dataclass, asdict, field
import yaml
from commons.models import CaseInfo
class CaseParser:
@staticmethod
def to_yaml(case_data: dict) -> str:
try:
CaseInfo(**case_data)
except TypeError as error:
logging.error(error)
raise error
return yaml.safe_dump(case_data, allow_unicode=True, sort_keys=False)
@staticmethod
def from_yaml(yaml_str: str) -> CaseInfo:
return CaseInfo(**yaml.safe_load(yaml_str))
if __name__ == '__main__':
with open(r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f)
print(data)
print(type(data))
# print(CaseInfo(**data))
case_parser = CaseParser()
case_data_ = case_parser.to_yaml(data)
# print(case_data_)
# case_parser.from_yaml(case_data_)
# print(type(case_data_))

80
utils/case_validator.py Normal file
View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_validator
@date: 2025/2/27 17:25
@desc:
"""
import logging
logger = logging.getLogger(__name__)
class CaseValidator:
VALIDATORS = {}
@classmethod
def register(cls, name: str):
def decorator(func):
cls.VALIDATORS[name] = func
return func
return decorator
@classmethod
def assert_all(cls, validate: dict):
if not validate:
return
for assert_type, cases in validate.items():
logger.info(f"键:{assert_type},值:{cases}")
validator = cls.VALIDATORS.get(assert_type)
logger.info(f"获取到的断言:{validator}")
if not validator:
raise KeyError(f"Unsupported validator: {assert_type}")
for msg, (a, b) in cases.items():
validator(a, b, msg)
@CaseValidator.register('equals')
def validate_equals(a, b, msg):
logger.info(f"assert {a} == {b}, {msg}执行这段代码")
assert a == b, msg
@CaseValidator.register('not_equals')
def validate_not_equals(a, b, msg):
logger.info(f"assert {a} != {b}, {msg}")
assert a != b, msg
@CaseValidator.register('contains')
def validate_contains(a, b, msg):
logger.info(f"assert {a} in {b}, {msg}")
assert a in b, msg
@CaseValidator.register('not_contains')
def validate_not_contains(a, b, msg):
logger.info(f"assert {a} not in {b}, {msg}")
assert a not in b, msg
if __name__ == '__main__':
mock_case = {
"validate": {
"equals": {
"判断相等": ["Success.", "Success."]
},
"not_equals": {
"判断不相等": ["Success.", "Suc."]
}
}
}
case_validator = CaseValidator()
print(case_validator.VALIDATORS)
case_validator.assert_all(mock_case.get("validate"))

91
utils/data_driver.py Normal file
View File

@@ -0,0 +1,91 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: data_driver
@date: 2025/3/3 10:56
@desc:
"""
from pathlib import Path
from commons.templates import Template
from commons.file_processors.file_handle import FileHandle
class DataDriver:
@staticmethod
def generate_cases(file_name, case_info) -> dict:
if not case_info.get("parametrize"):
return {file_name + "[--]": case_info}
cases = {}
args_names = case_info.get("parametrize")[0]
for i, args_values in enumerate(case_info.get("parametrize")[1:]):
# print(args_values)
context = dict(zip(args_names, args_values))
# print(context)
rendered = Template(FileHandle.to_string(case_info)).render(context)
cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
return cases
if __name__ == '__main__':
file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml")
file_obj = FileHandle(file_path)
print(file_path.stem)
file_name_ = file_path.stem
# mock_case_info = {
# "case_info0": {
# "feature": "页面状态",
# "story": "状态",
# "title": "查询状态信息",
# "request": "",
# "extract": "",
# "validate": "",
# "parametrize": [["title", "username", "password", "msg"], ["测试1", "user1", "pass1", "200"],
# ["测试2", "user2", "pass2", "300"]]
# },
# "case_info1": {
# "feature": "页面状态",
# "story": "状态",
# "title": "查询状态信息",
# "request": "",
# "extract": "",
# "validate": "",
# "parametrize": [1, 2, 3]
# },
# "case_info2": {
# "feature": "页面状态",
# "story": "状态",
# "title": "查询状态信息",
# "request": "",
# "extract": "",
# "validate": "",
# "parametrize": [1, 2, 3]
# }
#
# }
dd = DataDriver()
# cases = dd.generate_cases(mock_case_info.get("case_info0"))
cases_ = dd.generate_cases(file_name_, file_obj)
print(cases_)
case_keys = list(cases_.keys())
case_values = cases_.values()
print(case_keys)
print(case_values)
aa = [i.get("title") for i in case_values]
print(aa)
# print(list(case_values)[0]["feature"])
print(file_obj["feature"])
# print(list(case_values)[0]["story"])
print(file_obj["story"])