1 Commits

Author SHA1 Message Date
ffbde1158b feat,fix(): 优化
- 优化 settings(使用环境变量)
- 修复bug
2025-03-19 16:53:21 +08:00
17 changed files with 307 additions and 494 deletions

1
.gitignore vendored
View File

@@ -6,3 +6,4 @@ poetry.lock
report/ report/
temp/ temp/
logs/ logs/
.env

View File

@@ -1,80 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_handler
@date: 2025/5/26 22:13
@desc:
"""
import json
import logging
from dataclasses import dataclass, asdict
from commons.models import TestCaseModel
logger = logging.getLogger(__name__)
@dataclass
class TestCaseHandle(TestCaseModel):
    """Parsing/serialization helpers layered on top of TestCaseModel.

    Provides a validating constructor (`new`) plus JSON round-trip helpers
    (`to_string` / `to_dict`) used by the case generator.
    """

    @classmethod
    def new(cls, testcase: dict) -> 'TestCaseHandle':
        """Build an instance from a raw test-case mapping.

        Args:
            testcase: Keyword data matching the fields of TestCaseModel.

        Returns:
            A validated TestCaseHandle instance.

        Raises:
            TypeError, ValueError: Propagated when the mapping does not fit
                the model; a warning is logged first.
        """
        try:
            return cls(**testcase)
        except (TypeError, ValueError) as e:
            logger.warning(f"解析错误:{e}")
            # Bare `raise` preserves the original traceback (re-raising `e`
            # would add this frame as the apparent origin).
            raise

    def to_string(self) -> str:
        """Serialize this dataclass to a JSON string.

        Returns:
            JSON-formatted string (non-ASCII characters kept as-is).

        Raises:
            TypeError: When a field is not JSON-serializable (logged).
        """
        try:
            return json.dumps(asdict(self), ensure_ascii=False)
        except TypeError as e:
            logger.error(f"将数据转换为 json 字符串时出错: {e}")
            raise

    @staticmethod
    def to_dict(json_str: str) -> dict:
        """Parse a JSON string back into a dictionary.

        Args:
            json_str: A JSON-formatted string.

        Returns:
            The decoded dictionary.

        Raises:
            json.JSONDecodeError: When the string is not valid JSON (logged).
        """
        try:
            return json.loads(json_str)
        except json.JSONDecodeError as e:
            logger.error(f"将 json 字符串转换为字典时出错: {e}")
            raise
if __name__ == '__main__':
    # Manual smoke test: load a YAML case file and exercise the helpers.
    from pathlib import Path
    from commons.file_processors import processor_factory

    demo_file = Path(r"E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml")
    loader = processor_factory.get_processor_class(demo_file)
    case_info = TestCaseHandle.new(loader.load())

    serialized = case_info.to_string()
    print(serialized)
    print(type(serialized))

    round_tripped = case_info.to_dict(serialized)
    print(round_tripped)
    print(type(round_tripped))

    print(type(case_info))
    print(case_info.parametrize)
    for row in case_info.parametrize:
        print(row)

View File

@@ -9,23 +9,18 @@
@date: 2024 2024/9/16 9:57 @date: 2024 2024/9/16 9:57
@desc: 动态生成用例 @desc: 动态生成用例
""" """
from dataclasses import asdict
from pathlib import Path from pathlib import Path
import logging import logging
from typing import Union, Generator, Type
from unittest import TestCase
import allure import allure
import pytest import pytest
from commons import settings from commons import settings
from commons.file_processors.processor_factory import get_processor_class from commons.file_processors.file_handle import FileHandle
# from commons.models import CaseInfo from commons.models import CaseInfo
from commons.session import Session from commons.session import Session
from commons.exchange import Exchange from commons.exchange import Exchange
from commons.templates import Template from utils import data_driver, case_validator
from commons.case_handler import TestCaseHandle
from utils import case_validator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -37,195 +32,90 @@ exchanger = Exchange(settings.exchanger)
class TestAPI: class TestAPI:
@classmethod @classmethod
def run(cls, testcase_dir: Union[Path, str] = cases_dir): def find_test_cases(cls, case_dir: Path = cases_dir):
for fp in CaseFinder(testcase_dir).find_testcases(): """
print(fp.name) 搜索和加载yaml文件
case = CaseGenerator(fp).generate_testcases() :return:
print(f"{case=}") """
for i in case: case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
print(f"{i=}") for case_path in case_path_list:
CaseRegister(cls).register_test_func(i) logger.info(f"加载文件:{case_path}")
# @classmethod
# def find_test_cases(cls, case_dir: Path = cases_dir):
# """
# 搜索和加载yaml文件
# :return:
# """
# case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
# for case_path in case_path_list:
# logger.info(f"加载文件:{case_path}")
#
# file = FileHandle(case_path) # 自动读取yaml文件
# try:
# CaseInfo(**file) # 校验用例格式
# logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
# case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# # print(case_path.stem)
# setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
# except Exception as e:
# logger.error(e)
#
# @classmethod
# def new_case(cls, file_name, case_info: dict):
# test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
#
# keys_list = list(test_case.keys())
# logger.info(f"keys_list{keys_list}")
#
# values_list = list(test_case.values())
# logger.info(f"测试用例列表:{values_list}")
#
# driver_title = [i.get("title") for i in values_list]
# logger.info(f"driver_title={driver_title}")
#
# epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
# logger.info(f"epic{epic}")
#
# feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
# logger.info(f"feature{feature}")
#
# story = case_info["story"] if case_info["story"] else settings.allure_story
# logger.info(f"story{story}")
#
# @allure.epic(epic)
# @allure.feature(feature)
# @allure.story(story)
# @pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
# def test_func(self, case_key):
# logger.info(f"case_key{case_key}")
#
# test_case_mapping = test_case.get(case_key)
# logger.info(f"测试用例:{test_case_mapping}")
#
# allure.dynamic.title(test_case_mapping.get("title"))
#
# logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
#
# # 0变量替换
# new_case_info = exchanger.replace(test_case_mapping)
# logger.info(f"1正在注入变量...")
# logger.info(f"new_case_info{new_case_info}")
# # 1发送请求
# logger.info(f"2正在请求接口...")
# resp = session.request(**new_case_info.get("request"))
#
# logger.info(f"3正在提取变量...")
# # 2保存变量(接口关联)
# for var_name, extract_info in new_case_info.get("extract").items():
# logger.info(f"保存变量:{var_name}{extract_info}")
# exchanger.extract(resp, var_name, *extract_info)
# # 3断言
# logger.info(f"4正在断言...")
# assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
# logger.info(f"替换变量后:{assert_case_info}")
# # assert_case_info.assert_all() # 执行断言
# _validator = case_validator.CaseValidator()
# _validator.assert_all(assert_case_info.get("validate"))
#
# logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
#
# return test_func
file = FileHandle(case_path) # 自动读取yaml文件
try:
CaseInfo(**file) # 校验用例格式
logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# print(case_path.stem)
setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
except Exception as e:
logger.error(e)
class CaseFinder: @classmethod
find_suffix: str = settings.test_suffix def new_case(cls, file_name, case_info: dict):
test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
def __init__(self, testcase_dir: Union[str, Path]): keys_list = list(test_case.keys())
if Path(testcase_dir).is_dir(): logger.info(f"keys_list{keys_list}")
self.testcase_dir: Path = Path(testcase_dir)
else:
raise FileNotFoundError("不是有效的目录")
def find_testcases(self) -> Generator[Path, None, None]: values_list = list(test_case.values())
testcase_files = self.testcase_dir.glob(f"**/test_*.{self.find_suffix}") logger.info(f"测试用例列表:{values_list}")
for fp in testcase_files:
logger.info(f"加载文件:{fp}")
yield fp
driver_title = [i.get("title") for i in values_list]
logger.info(f"driver_title={driver_title}")
class CaseGenerator: epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
logger.info(f"epic{epic}")
def __init__(self, fp: Union[str, Path]): feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
self.fp: Path = Path(fp) logger.info(f"feature{feature}")
def generate_testcases(self) -> Generator[dict, None, None]: story = case_info["story"] if case_info["story"] else settings.allure_story
file_name = self.fp.stem logger.info(f"story{story}")
case_info_ = get_processor_class(self.fp).load() # 自动读取yaml文件 @allure.epic(epic)
case_info = TestCaseHandle.new(case_info_) @allure.feature(feature)
@allure.story(story)
@pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
def test_func(self, case_key):
logger.info(f"case_key{case_key}")
if not case_info.parametrize: test_case_mapping = test_case.get(case_key)
yield {file_name + "__": asdict(case_info)} logger.info(f"测试用例:{test_case_mapping}")
else:
cases = {}
args_names = case_info.parametrize[0]
for i, args_values in enumerate(case_info.parametrize[1:]):
# print(args_values)
context = dict(zip(args_names, args_values))
print(context)
# rendered = Template(FileHandle.to_string(case_info)).render(context)
rendered = Template(case_info.to_string()).render(context)
# cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
cases.update({file_name + "_" + str(i): case_info.to_dict(rendered)})
yield cases allure.dynamic.title(test_case_mapping.get("title"))
logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
class CaseRegister: # 0变量替换
def __init__(self, register: Type["TestAPI"]): new_case_info = exchanger.replace(test_case_mapping)
self.register: Type["TestAPI"] = register logger.info(f"1正在注入变量...")
logger.info(f"new_case_info{new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request"))
def register_test_func(self, case: dict): logger.info(f"3正在提取变量...")
for test_filed_name, case_info in case.items(): # 2保存变量(接口关联)
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic for var_name, extract_info in new_case_info.get("extract").items():
logger.info(f"epic{epic}") logger.info(f"保存变量{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
logger.info(f"替换变量后:{assert_case_info}")
# assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
logger.info(f"feature{feature}")
story = case_info["story"] if case_info["story"] else settings.allure_story return test_func
logger.info(f"story{story}")
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
def register_func(instance, testcase=case_info):
# allure.dynamic.epic(epic)
# allure.dynamic.feature(feature)
# allure.dynamic.story(story)
allure.dynamic.title(testcase.get("title"))
logger.info(f"用例开始执行:{testcase.get('title')}".center(80, "="))
# 0变量替换
new_case_info = exchanger.replace(testcase)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info{new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request"))
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
for var_name, extract_info in new_case_info.get("extract").items():
logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(testcase) # 为断言加载变量
logger.info(f"替换变量后:{assert_case_info}")
# assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
logger.info(f"用例执行结束:{testcase.get('title')}".center(80, "="))
# return test_func
setattr(self.register, test_filed_name, register_func) # 把pytest格式添加到类中
# TestAPI.find_yaml_case() # TestAPI.find_yaml_case()
if __name__ == '__main__': if __name__ == '__main__':
TestAPI.run(cases_dir) TestAPI.find_test_cases()
print(TestAPI.__dict__) # print(TestAPI.__dict__)

View File

@@ -33,13 +33,13 @@ class DBServer:
db = DBServer( db = DBServer(
host=settings.db_host, # ip host=settings.db_host, # ip
port=settings.db_port, # 端口 port=3306, # 端口
user=settings.db_user, # 用户名 user='root', # 用户名
password=settings.db_password, # 密码 password='mysql_hNahSe', # 密码
database=settings.db_database # 库名 database='answer' # 库名
) )
if __name__ == '__main__': if __name__ == '__main__':
... ...
# res = db.execute_sql('select username from user where id=1;') res = db.execute_sql('select username from user where id=1;')
# print(res[0]) print(res[0])

View File

@@ -18,18 +18,17 @@ import jsonpath
import allure import allure
from commons.templates import Template from commons.templates import Template
from commons.file_processors.processor_factory import get_processor_class from commons.file_processors.file_handle import FileHandle
from tests.b import TestCaseHandle
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Exchange: class Exchange:
def __init__(self, path): def __init__(self, path):
self.file = get_processor_class(path) self.file = FileHandle(path)
@allure.step("提取变量") @allure.step("提取变量")
def extract(self, resp, var_name, attr, expr: str, index) -> None: def extract(self, resp, var_name, attr, expr: str, index):
resp = copy.deepcopy(resp) resp = copy.deepcopy(resp)
@@ -54,22 +53,21 @@ class Exchange:
value = "not data" value = "not data"
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值 logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
data = self.file.load()
data[var_name] = value # 保存变量 self.file[var_name] = value # 保存变量
self.file.save(data) # 持久化存储到文件 self.file.save() # 持久化存储到文件
@allure.step("替换变量") @allure.step("替换变量")
def replace(self, case_info: dict) -> dict: def replace(self, case_info: dict) -> dict:
logger.info(f"变量替换:{case_info}") logger.info(f"变量替换:{case_info}")
# 1将case_info转换为字符串 # 1将case_info转换为字符串
data = TestCaseHandle(**case_info) case_info_str = FileHandle.to_string(case_info)
case_info_str = data.to_string()
print(f"{case_info_str=}") print(f"{case_info_str=}")
# 2替换字符串 # 2替换字符串
case_info_str = Template(case_info_str).render(self.file.load()) case_info_str = Template(case_info_str).render(self.file)
print(f"{case_info_str=}") print(f"{case_info_str=}")
# 3将字符串转换成case_info # 3将字符串转换成case_info
new_case_info = data.to_dict(case_info_str) new_case_info = FileHandle.to_dict(case_info_str)
return new_case_info return new_case_info

View File

@@ -9,14 +9,3 @@
@date: 2025/3/4 17:23 @date: 2025/3/4 17:23
@desc: @desc:
""" """
from .base_processor import BaseFileProcessor
from .json_processor import JsonProcessor
from .yaml_processor import YamlProcessor
from .processor_factory import get_processor_class
__all__ = [
"BaseFileProcessor",
"JsonProcessor",
"YamlProcessor",
"get_processor_class",
]

View File

@@ -10,8 +10,6 @@
@desc: @desc:
""" """
import abc import abc
from pathlib import Path
from typing import Union
class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类 class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
@@ -19,16 +17,25 @@ class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
文件处理器的抽象基类 文件处理器的抽象基类
定义了所有子类必须实现的方法 定义了所有子类必须实现的方法
""" """
def __init__(self, filepath: Union[str, Path], **kwargs):
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
@abc.abstractmethod @abc.abstractmethod
def load(self) -> dict: def load(self):
"""加载.""" """加载."""
raise NotImplementedError pass
@staticmethod
@abc.abstractmethod
def to_string(data: dict) -> str:
"""将文件内容转换为字符串。"""
pass
@staticmethod
@abc.abstractmethod
def to_dict(data: str) -> dict:
"""将文件内容转换为字典。"""
pass
@abc.abstractmethod @abc.abstractmethod
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None: def save(self, new_filepath=None):
"""将数据保存.""" """将数据保存."""
raise NotImplementedError pass

View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: file_handle
@date: 2025/3/7 09:31
@desc:
"""
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor
# Registry mapping a file extension (without the leading dot) to the
# processor class that knows how to read/write that format.
processors = {
    'yaml': YamlProcessor,
    'yml': YamlProcessor,
    'json': JsonProcessor,
}
def get_processor(ext):
    """Return the processor class registered for the given extension.

    Args:
        ext: File extension; case-insensitive, with or without a leading
            dot (e.g. "yaml", ".YAML", "json").

    Returns:
        The matching processor class from the `processors` registry,
        falling back to YamlProcessor for unknown extensions.
    """
    # Normalize so ".YAML" / "Yaml" / "yaml" all resolve to the same entry;
    # plain "yaml" still works unchanged (backward compatible).
    key = str(ext).lower().lstrip(".")
    return processors.get(key, YamlProcessor)  # default fallback: YAML


# Project-wide default file handle class (YAML-based).
FileHandle = get_processor("yaml")
if __name__ == '__main__':
    # Example usage: load a YAML file and round-trip it through string form.
    demo_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml'
    handle = FileHandle(demo_path)
    print(handle)
    print(type(handle))
    serialized = FileHandle.to_string(handle)
    print(serialized)
    print(FileHandle.to_dict(serialized))

View File

@@ -10,77 +10,117 @@
@desc: @desc:
""" """
import logging import logging
from typing import Union, Any from typing import Union
from pathlib import Path from pathlib import Path
import json import json
from commons.file_processors.base_processor import BaseFileProcessor from commons.file_processors.base import BaseFileProcessor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class JsonProcessor(BaseFileProcessor): class JsonProcessor(BaseFileProcessor, dict):
""" """
用于处理 JSON 文件的类。 用于处理 YAML 文件的类,继承自 dict
提供了从文件加载 JSON 数据为字典,以及将字典保存为 JSON 文件的功能 提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
并可以直接像字典一样访问 YAML 数据。
""" """
def __init__(self, filepath: Union[str, Path], **kwargs): def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
""" """
初始化 JsonFile 对象。 初始化 YamlFile 对象。
Args: Args:
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象). filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
如果不提供,则尝试从 filepath 加载数据。
""" """
super().__init__(filepath, **kwargs) super().__init__() # 初始化父类 dict
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
if data is not None:
self.update(data) # 如果提供了初始数据,则更新字典
else:
self.load() # 否则,尝试从文件加载
def load(self) -> dict[str, Any]: def load(self) -> None:
""" """
Json 文件加载数据。 YAML 文件加载数据并更新字典
:return: 如果文件不存在或加载失败,则清空字典并记录警告/错误。
"""
self.clear() # 清空现有数据
if self.filepath.exists():
try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = json.load(f) or {}
self.update(loaded_data) # 使用加载的数据更新字典
except json.JSONDecodeError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
# 保持字典为空 (已在开头 clear)
else:
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
# 保持字典为空 (已在开头 clear)
@staticmethod
def to_string(data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
""" """
if not self.filepath.exists():
logger.warning(f"文件 {self.filepath} 不存在.")
raise FileNotFoundError(f"文件 {self.filepath} 不存在.")
try: try:
with open(self.filepath, "r", encoding="utf-8") as f: return json.dumps(
loaded_data = json.load(f) dict(data), # 使用dict转换为标准的字典
if not isinstance(loaded_data, dict): # 确保加载的是字典 ensure_ascii=False, # 允许非ASCII字符
logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.") # indent=4, # 美化输出缩进4个空格
raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.") sort_keys=False # 不排序键
return loaded_data )
except json.JSONDecodeError as e: except TypeError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}") logger.error(f"将数据转换为 JSON 字符串时出错: {e}")
raise e return ""
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None: @staticmethod
def to_dict(data: str) -> None:
""" """
字典数据保存到 json 文件。 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args: Args:
:param data: data: YAML 格式的字符串。
:param new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。 """
try:
loaded_data = json.loads(data) or {}
return loaded_data
except json.JSONDecodeError as e:
logger.error(f"将 JSON 字符串转换为字典时出错: {e}")
def save(self, new_filepath: Union[str, Path, None] = None):
"""
将字典数据 (自身) 保存到 YAML 文件。
Args:
new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
""" """
filepath = Path(new_filepath) if new_filepath else self.filepath filepath = Path(new_filepath) if new_filepath else self.filepath
filepath.parent.mkdir(parents=True, exist_ok=True)
try: try:
with open(filepath, "w", encoding="utf-8") as f: with open(filepath, "w", encoding="utf-8") as f:
json.dump( json.dump(
data, dict(self), # 使用dict转换为标准的字典
f, f,
ensure_ascii=False, # 允许非ASCII字符 ensure_ascii=False, # 允许非ASCII字符
indent=4, # 美化输出缩进4个空格
sort_keys=False # 不排序键 sort_keys=False # 不排序键
) )
logger.info(f"数据已成功保存到 {filepath}") except (TypeError, OSError) as e:
except (TypeError, OSError, json.JSONDecodeError) as e:
logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}") logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}")
raise e
if __name__ == '__main__': if __name__ == '__main__':
# 示例用法 # 示例用法
json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径 json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径
json_file = JsonProcessor(json_path) json_file = JsonProcessor(json_path)
print(json_file.load()) print(json_file)
print(type(json_file)) print(type(json_file))
# json_file.save() json_string = JsonProcessor.to_string(json_file)
JsonProcessor.to_dict(json_string)
print(json_string)
json_file.save()

View File

@@ -1,57 +0,0 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: file_handle
@date: 2025/3/7 09:31
@desc:
"""
from pathlib import Path
from typing import Type, Union
from commons.file_processors.base_processor import BaseFileProcessor
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor
# Type alias: maps a file-extension string to its processor class.
ProcessorMap = dict[str, Type[BaseFileProcessor]]

# Registry of supported extensions (without the leading dot).
processors: ProcessorMap = {
    'yaml': YamlProcessor,
    'yml': YamlProcessor,
    'json': JsonProcessor,
}
class UnsupportedFileTypeError(Exception):
    """Raised when a path's file type is not supported by any registered processor."""
# def get_processor_class(file_suffix: str = "yaml") -> Type[BaseFileProcessor]:
def get_processor_class(fp: Union[Path, str]) -> 'BaseFileProcessor':
    """Instantiate the processor appropriate for a file's extension.

    Args:
        fp: Path to an existing file (str or Path).

    Returns:
        A processor instance bound to `fp`; unknown extensions fall back
        to YamlProcessor.

    Raises:
        UnsupportedFileTypeError: If `fp` does not point to an existing file.
    """
    fp = Path(fp)
    # Guard clause: fail fast with a descriptive message instead of raising
    # the bare Path object (which produced opaque error output).
    if not fp.is_file():
        raise UnsupportedFileTypeError(f"不是有效的文件: {fp}")
    file_suffix = fp.suffix[1:]  # strip the leading dot
    processor_class = processors.get(file_suffix.lower(), YamlProcessor)  # registry lookup; default YAML
    return processor_class(fp)
# FileHandle = get_processor("yaml")
if __name__ == '__main__':
    # Example usage: resolve the processor for a YAML file and load it.
    demo_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml'
    processor = get_processor_class(demo_path)
    print(processor.load())

View File

@@ -11,77 +11,108 @@
""" """
import logging import logging
from typing import Union from typing import Union
from dataclasses import dataclass, asdict, field
from pathlib import Path from pathlib import Path
import yaml import yaml
from commons.file_processors.base_processor import BaseFileProcessor from commons.file_processors.base import BaseFileProcessor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class YamlProcessor(BaseFileProcessor): class YamlProcessor(BaseFileProcessor, dict):
""" """
用于处理 YAML 文件的类,继承自 dict。 用于处理 YAML 文件的类,继承自 dict。
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能, 提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
并可以直接像字典一样访问 YAML 数据。 并可以直接像字典一样访问 YAML 数据。
""" """
def __init__(self, filepath: Union[str, Path], **kwargs): def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
""" """
初始化 YamlFile 对象。 初始化 YamlFile 对象。
Args: filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象). Args:
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。 data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
如果不提供,则尝试从 filepath 加载数据。 如果不提供,则尝试从 filepath 加载数据。
""" """
super().__init__(filepath, **kwargs) super().__init__() # 初始化父类 dict
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
if data is not None:
self.update(data) # 如果提供了初始数据,则更新字典
else:
self.load() # 否则,尝试从文件加载
def load(self) -> dict: def load(self) -> None:
""" """
从 YAML 文件加载数据 从 YAML 文件加载数据并更新字典。
:return: 如果文件不存在或加载失败,则清空字典并记录警告/错误。
""" """
if not self.filepath.exists(): self.clear() # 清空现有数据
logger.warning(f"文件 {self.filepath} 不存在.") if self.filepath.exists():
raise FileNotFoundError(f"文件 {self.filepath} 不存在.") try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = yaml.safe_load(f) or {}
self.update(loaded_data) # 使用加载的数据更新字典
except yaml.YAMLError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
# 保持字典为空 (已在开头 clear)
else:
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
# 保持字典为空 (已在开头 clear)
@staticmethod
def to_string(data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
"""
try: try:
with open(self.filepath, "r", encoding="utf-8") as f: return yaml.safe_dump(
loaded_data = yaml.safe_load(f) dict(data), # 使用dict转换为标准的字典
if not isinstance(loaded_data, dict): # 确保加载的是字典 allow_unicode=True,
logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.") sort_keys=False,
raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.") default_flow_style=False
return loaded_data )
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return ""
except yaml.YAMLError as e: @staticmethod
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}") def to_dict(data: str) -> Union[None, dict]:
raise e
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
""" """
字典数据保存到 YAML 文件。 YAML 格式的字符串转换为字典,并更新当前字典的内容.
:param data:
:param new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = yaml.safe_load(data) or {}
return loaded_data
except yaml.YAMLError as e:
logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
def save(self, new_filepath: Union[str, Path, None] = None):
"""
将字典数据 (自身) 保存到 YAML 文件。
Args:
new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
""" """
filepath = Path(new_filepath) if new_filepath else self.filepath filepath = Path(new_filepath) if new_filepath else self.filepath
# 确保目标目录存在
filepath.parent.mkdir(parents=True, exist_ok=True)
try: try:
with open(filepath, "w", encoding="utf-8") as f: with open(filepath, "w", encoding="utf-8") as f:
yaml.safe_dump( yaml.safe_dump(
data, dict(self), # 使用dict转换为标准的字典
stream=f, stream=f,
allow_unicode=True, allow_unicode=True,
sort_keys=False, sort_keys=False,
default_flow_style=False default_flow_style=False
) )
logger.info(f"数据已成功保存到 {filepath}") except (TypeError, OSError) as e:
except (TypeError, OSError, yaml.YAMLError) as e:
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}") logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
raise e
@@ -91,7 +122,7 @@ if __name__ == '__main__':
# 示例用法 # 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径 yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = YamlProcessor(yaml_path) yaml_file = YamlProcessor(yaml_path)
print(yaml_file.load()) print(yaml_file)
print(type(yaml_file)) print(type(yaml_file))
# # 直接像字典一样访问数据 # # 直接像字典一样访问数据

View File

@@ -17,30 +17,28 @@ import hashlib
from commons.databases import db from commons.databases import db
from commons.file_processors.processor_factory import get_processor_class from commons.file_processors.file_handle import FileHandle
from commons import settings from commons import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Funcs: class Funcs:
FUNC_MAPPING = { FUNC_MAPPING = {
"int": int, "int": int,
"float": float, "float": float,
"bool": bool "bool": bool
} # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping } # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
@classmethod @classmethod
def register(cls, name: str | None = None): def register(cls, name: str):
def decorator(func): def decorator(func):
if name is None:
cls.FUNC_MAPPING[func.__name__] = func
cls.FUNC_MAPPING[name] = func cls.FUNC_MAPPING[name] = func
return func return func
return decorator return decorator
@Funcs.register("url_unquote") @Funcs.register("url_unquote")
def url_unquote(s: str) -> str: def url_unquote(s: str) -> str:
return urllib.parse.unquote(s) return urllib.parse.unquote(s)
@@ -51,43 +49,35 @@ def to_string(s) -> str:
# 将数据转换为str类型。 # 将数据转换为str类型。
return f"'{s}'" return f"'{s}'"
@Funcs.register("time_str") @Funcs.register("time_str")
def time_str() -> str: def time_str() -> str:
return str(time.time()) return str(time.time())
@Funcs.register("add") @Funcs.register("add")
def add(a, b): def add(a, b):
return str(int(a) + int(b)) return str(int(a) + int(b))
@Funcs.register("sql") @Funcs.register("sql")
def sql(s: str) -> str: def sql(s: str) -> str:
res = db.execute_sql(s) res = db.execute_sql(s)
return res[0][0] return res[0][0]
@Funcs.register("new_id") @Funcs.register("new_id")
def new_id(): def new_id():
# 自增,永不重复 # 自增,永不重复
id_file = get_processor_class(settings.id_path) id_file = FileHandle(settings.id_path)
data = id_file.load() id_file["id"] += 1
data["id"] += 1 id_file.save()
id_file.save(data)
return data["id"]
return id_file["id"]
@Funcs.register("last_id") @Funcs.register("last_id")
def last_id() -> str: def last_id() -> str:
# 不自增,只返回结果 # 不自增,只返回结果
id_file = get_processor_class(settings.id_path) id_file = FileHandle(settings.id_path)
data = id_file.load() return id_file["id"]
return data["id"]
@Funcs.register("md5") @Funcs.register("md5")
def md5(content: str) -> str: def md5(content: str) -> str:
@@ -96,7 +86,6 @@ def md5(content: str) -> str:
result = hashlib.md5(content).hexdigest() result = hashlib.md5(content).hexdigest()
return result return result
@Funcs.register("base64_encode") @Funcs.register("base64_encode")
def base64_encode(content: str) -> str: def base64_encode(content: str) -> str:
# 1原文转二进制 # 1原文转二进制
@@ -108,7 +97,6 @@ def base64_encode(content: str) -> str:
return encode_str return encode_str
@Funcs.register("base64_decode") @Funcs.register("base64_decode")
def base64_decode(content: str) -> str: def base64_decode(content: str) -> str:
# 1原文转二进制 # 1原文转二进制
@@ -120,22 +108,15 @@ def base64_decode(content: str) -> str:
return decode_str return decode_str
@Funcs.register("rsa_encode") @Funcs.register("rsa_encode")
def rsa_encode(content: str) -> str: def rsa_encode(content: str) -> str:
... ...
@Funcs.register("rsa_decode") @Funcs.register("rsa_decode")
def rsa_decode(content: str) -> str: def rsa_decode(content: str) -> str:
... ...
@Funcs.register()
def func_name_test():
...
if __name__ == '__main__': if __name__ == '__main__':
# res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82") # res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
# print(res) # print(res)

View File

@@ -10,7 +10,6 @@
@desc: 声明yaml用例格式 @desc: 声明yaml用例格式
""" """
import logging import logging
from typing import Union, Optional
from dataclasses import dataclass, field from dataclasses import dataclass, field
import yaml import yaml
@@ -21,45 +20,20 @@ logger = logging.getLogger(__name__)
@dataclass @dataclass
class RequestModel: class CaseInfo:
method: str
url: str
headers: Optional[dict] = None
# body: Optional[Union[dict, str]] = None
params: Optional[Union[dict, str]] = None
@dataclass
class TestCaseModel:
title: str title: str
request: RequestModel request: dict
extract: dict extract: dict
validate: dict validate: dict
parametrize: list = field(default_factory=list) parametrize: list = field(default_factory=list)
epic: str = field(default_factory=lambda: settings.allure_epic) epic: str = settings.allure_epic
feature: str = field(default_factory=lambda: settings.allure_feature) feature: str = settings.allure_feature
story: str = field(default_factory=lambda: settings.allure_story) story: str = settings.allure_story
def __post_init__(self):
# 必填字段非空校验
if self.title is None:
raise ValueError("Title cannot be empty")
# 校验RequestModel
if isinstance(self.request, dict):
try:
self.request = RequestModel(**self.request) # RequestModel 的 __post_init__ 会被调用
except (TypeError, ValueError) as e:
raise ValueError(f"解析 'request' 字段失败: {e} (数据: {self.request})") from e
elif not isinstance(self.request, RequestModel): # 如果不是 dict 也不是 RequestModel
raise TypeError(
f"字段 'request' 必须是字典 (将在内部转换为 RequestModel) 或 RequestModel 实例, "
f"得到的是 {type(self.request).__name__}"
)
if __name__ == '__main__': if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f: with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f) data = yaml.safe_load(f)
# print(data) # print(data)
case_info = TestCaseModel(**data) case_info = CaseInfo(**data)

View File

@@ -22,13 +22,11 @@ cases_dir = rf"{root_path}\TestCases\answer"
exchanger = rf"{root_path}\extract.yaml" exchanger = rf"{root_path}\extract.yaml"
id_path = rf"{root_path}\id.yaml" id_path = rf"{root_path}\id.yaml"
test_suffix = "yaml" db_host = os.getenv("DB_HOST") # ip
db_host = os.getenv("DB_HOST") # ip
db_port = os.getenv("DB_PORT") # 端口 db_port = os.getenv("DB_PORT") # 端口
db_user = os.getenv("DB_USER") # 用户名 db_user = os.getenv("DB_USER") # 用户名
db_password = os.getenv("DB_PASSWORD") # 密码 db_password = os.getenv("DB_PASSWORD") # 密码
db_database = os.getenv("DB_DATABASE") db_database = os.getenv("DB_DATABASE") # 库名
allure_epic: str = "项目名称answer" allure_epic: str = "项目名称answer"
allure_feature: str = "默认特征feature" allure_feature: str = "默认特征feature"
@@ -37,6 +35,7 @@ allure_story: str = "默认事件story"
rsa_public = "" rsa_public = ""
rsa_private = "" rsa_private = ""
if __name__ == '__main__': if __name__ == '__main__':
print(root_path) print(root_path)
print(base_url,db_host,db_port,db_user,db_password,db_database) print(base_url,db_host,db_port,db_user,db_password,db_database)

View File

@@ -5,7 +5,7 @@ import pytest
from commons.cases import TestAPI from commons.cases import TestAPI
TestAPI.run() # 加载yaml文件 TestAPI.find_test_cases() # 加载yaml文件
if __name__ == '__main__': if __name__ == '__main__':
now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

View File

@@ -17,7 +17,6 @@ pymysql = "^1.1.1"
pytest-result-log = "^1.2.2" pytest-result-log = "^1.2.2"
allure-pytest = "^2.13.5" allure-pytest = "^2.13.5"
cryptography = "^44.0.2" cryptography = "^44.0.2"
dotenv = "^0.9.9"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api" build-backend = "poetry.core.masonry.api"

View File

@@ -3,7 +3,7 @@ addopts = -q --show-capture=no
log_file = logs/pytest.log log_file = logs/pytest.log
log_file_level = debug log_file_level = info
log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s
log_file_date_format = %m/%d/%Y %I:%M:%S %p log_file_date_format = %m/%d/%Y %I:%M:%S %p