5 Commits

Author SHA1 Message Date
300b5a92d4 refactor(): 优化测试用例数据的处理,优化代码结构
- 新增用例生成器和注册器
- 优化文件处理
2025-06-03 21:42:57 +08:00
2e9f1c12f7 feat,fix(): 优化funcs注册函数
- 优化 register 方法
2025-04-07 16:35:14 +08:00
4324cf37aa feat,fix(): 优化
- 优化 settings(使用环境变量)
- 修复bug
2025-03-19 17:03:13 +08:00
a50e00a4e1 refactor(): 优化文件读取,变量替换等
- 优化用例加载模块器
- 新增JSON文件读取模块
2025-03-09 17:23:25 +08:00
914b0301ba feat,fix(): 优化项目
- 优化yaml_processor(优化文件类型转换逻辑)
- 修复bug
2025-03-07 17:28:41 +08:00
20 changed files with 710 additions and 439 deletions

View File

@@ -0,0 +1,65 @@
{
"epic": "项目名称answer",
"feature": "页面状态",
"story": "状态",
"title": "查询状态信息",
"request": {
"method": "get",
"url": "/answer/api/v1/connector/info",
"headers": {
"Host": "119.91.19.171:40065",
"Accept-Language": "en_US",
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
"Referer": "http://119.91.19.171:40065/users/login",
"Accept-Encoding": "gzip, deflate"
}
},
"extract": {
"msg": [
"json",
"$.msg",
0
]
},
"validate": {
"equals": {
"状态码等于200": [
"Success.",
"Success."
]
}
},
"parametrize": [
[
"title",
"username",
"password",
"msg"
],
[
"测试1",
"user1",
"pass1",
"200"
],
[
"测试2",
"user2",
"pass2",
"300"
],
[
"测试3",
"user3",
"pass3",
"200"
],
[
"测试4",
"user4",
"pass4",
"200"
]
]
}

80
commons/case_handler.py Normal file
View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_handler
@date: 2025/5/26 22:13
@desc:
"""
import json
import logging
from dataclasses import dataclass, asdict
from commons.models import TestCaseModel
logger = logging.getLogger(__name__)
@dataclass
class TestCaseHandle(TestCaseModel):
@classmethod
def new(cls, testcase: dict) -> 'TestCaseHandle':
try:
instance = cls(**testcase)
return instance
except (TypeError, ValueError) as e:
logger.warning(f"解析错误:{e}")
raise e
def to_string(self) -> str:
"""
将 字典 转换为 json 格式的字符串。
:return:
json 格式的字符串。
"""
try:
res = json.dumps(asdict(self), ensure_ascii=False)
return res
except TypeError as e:
logger.error(f"将数据转换为 json 字符串时出错: {e}")
raise e
@staticmethod
def to_dict(json_str: str) -> dict:
"""
将 json 格式的字符串转换为 字典.
:param
json_str: json 格式的字符串。
:return:
"""
try:
res = json.loads(json_str)
return res
except json.JSONDecodeError as e:
logger.error(f"将 json 字符串转换为字典时出错: {e}")
raise e
if __name__ == '__main__':
from pathlib import Path
from commons.file_processors import processor_factory
test_data = Path(r"E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml")
yaml_data = processor_factory.get_processor_class(test_data)
case_info = TestCaseHandle.new(yaml_data.load())
print(case_info.to_string())
print(type(case_info.to_string()))
print(case_info.to_dict(case_info.to_string()))
print(type(case_info.to_dict(case_info.to_string())))
print(type(case_info))
print(case_info.parametrize)
for i in case_info.parametrize:
print(i)

View File

@@ -9,112 +9,223 @@
@date: 2024 2024/9/16 9:57 @date: 2024 2024/9/16 9:57
@desc: 动态生成用例 @desc: 动态生成用例
""" """
from dataclasses import asdict
from pathlib import Path from pathlib import Path
import logging import logging
from typing import Union, Generator, Type
from unittest import TestCase
import allure import allure
import pytest import pytest
from commons import settings from commons import settings
from commons.file_processors.yaml_processor import YamlFile from commons.file_processors.processor_factory import get_processor_class
from commons.models import CaseInfo # from commons.models import CaseInfo
from commons.session import Session from commons.session import Session
from commons.exchange import Exchange from commons.exchange import Exchange
from utils import data_driver from commons.templates import Template
from commons.case_handler import TestCaseHandle
from utils import case_validator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
session = Session(settings.base_url) session = Session(settings.base_url)
_case_path = Path(settings.case_path) cases_dir = Path(settings.cases_dir)
exchanger = Exchange(settings.exchanger) exchanger = Exchange(settings.exchanger)
@allure.epic("项目名称answer")
class TestAPI: class TestAPI:
@classmethod @classmethod
def find_yaml_case(cls, case_path: Path = _case_path): def run(cls, testcase_dir: Union[Path, str] = cases_dir):
""" for fp in CaseFinder(testcase_dir).find_testcases():
搜索和加载yaml文件 print(fp.name)
:return: case = CaseGenerator(fp).generate_testcases()
""" print(f"{case=}")
yaml_path_list = case_path.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件 for i in case:
for yaml_path in yaml_path_list: print(f"{i=}")
logger.info(f"加载文件:{yaml_path}") CaseRegister(cls).register_test_func(i)
# @classmethod
# def find_test_cases(cls, case_dir: Path = cases_dir):
# """
# 搜索和加载yaml文件
# :return:
# """
# case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
# for case_path in case_path_list:
# logger.info(f"加载文件:{case_path}")
#
# file = FileHandle(case_path) # 自动读取yaml文件
# try:
# CaseInfo(**file) # 校验用例格式
# logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
# case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# # print(case_path.stem)
# setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
# except Exception as e:
# logger.error(e)
#
# @classmethod
# def new_case(cls, file_name, case_info: dict):
# test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
#
# keys_list = list(test_case.keys())
# logger.info(f"keys_list{keys_list}")
#
# values_list = list(test_case.values())
# logger.info(f"测试用例列表:{values_list}")
#
# driver_title = [i.get("title") for i in values_list]
# logger.info(f"driver_title={driver_title}")
#
# epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
# logger.info(f"epic{epic}")
#
# feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
# logger.info(f"feature{feature}")
#
# story = case_info["story"] if case_info["story"] else settings.allure_story
# logger.info(f"story{story}")
#
# @allure.epic(epic)
# @allure.feature(feature)
# @allure.story(story)
# @pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
# def test_func(self, case_key):
# logger.info(f"case_key{case_key}")
#
# test_case_mapping = test_case.get(case_key)
# logger.info(f"测试用例:{test_case_mapping}")
#
# allure.dynamic.title(test_case_mapping.get("title"))
#
# logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
#
# # 0变量替换
# new_case_info = exchanger.replace(test_case_mapping)
# logger.info(f"1正在注入变量...")
# logger.info(f"new_case_info{new_case_info}")
# # 1发送请求
# logger.info(f"2正在请求接口...")
# resp = session.request(**new_case_info.get("request"))
#
# logger.info(f"3正在提取变量...")
# # 2保存变量(接口关联)
# for var_name, extract_info in new_case_info.get("extract").items():
# logger.info(f"保存变量:{var_name}{extract_info}")
# exchanger.extract(resp, var_name, *extract_info)
# # 3断言
# logger.info(f"4正在断言...")
# assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
# logger.info(f"替换变量后:{assert_case_info}")
# # assert_case_info.assert_all() # 执行断言
# _validator = case_validator.CaseValidator()
# _validator.assert_all(assert_case_info.get("validate"))
#
# logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
#
# return test_func
file = YamlFile(yaml_path) # 自动读取yaml文件
case_info = CaseInfo(**file) # 校验yaml格式 class CaseFinder:
find_suffix: str = settings.test_suffix
logger.info(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志 def __init__(self, testcase_dir: Union[str, Path]):
# case_info = {yaml_path.stem:case_info} if Path(testcase_dir).is_dir():
# logger.info(f"case_info_dict={case_info}") self.testcase_dir: Path = Path(testcase_dir)
case_func = cls.new_case(yaml_path.stem, file) # 从yaml格式转换为pytest格式 else:
print(yaml_path.stem) raise FileNotFoundError("不是有效的目录")
setattr(cls, f"{yaml_path.stem}", case_func) # 把pytest格式添加到类中
@classmethod def find_testcases(self) -> Generator[Path, None, None]:
def new_case(cls,file_name, case_info: dict): testcase_files = self.testcase_dir.glob(f"**/test_*.{self.find_suffix}")
case_data = data_driver.DataDriver().generate_cases(file_name,case_info) for fp in testcase_files:
# ddt_data = case_info.ddt() logger.info(f"加载文件:{fp}")
keys_list = ['test_1_user[0]', 'test_1_user[1]', 'test_1_user[2]', 'test_1_user[3]'] yield fp
titles = ['查询用户信息', '查询用户信息', '查询用户信息', '查询用户信息']
for item in case_data:
# 遍历列表中的每个字典
for key, value in item.items():
print(f"key:{key}")
keys_list.append(key)
print(f"value:{value}")
# # 遍历内层字典(这里内层字典其实只有一个键值对)
titles.append(value['title'])
print(f"测试数据:{case_data}")
item={'test_1_user[0]': {'feature': '特征', 'story': '事件', 'title': '查询用户信息', 'request': {'method': 'get', 'url': 'http://119.91.19.171:40065/answer/api/v1/connector/info', 'headers': {'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh_CN', 'Content-Type': 'application/json', 'Cookie': 'psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==', 'Host': '119.91.19.171:40065', 'Origin': 'http://119.91.19.171:40065', 'Referer': 'http://119.91.19.171:40065/users/login', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0'}}, 'extract': {'code': ['json', '$.code', 0], 'msg': ['json', '$.msg', 0]}, 'validate': {'equals': {'状态码等于200': [200, 'code1']}, 'not_equals': {'状态码不等于404': [404, 'code1']}, 'contains': {'包含关系': [404, 'code1']}, 'not_contains': {'不包含关系': [404, 'code1']}}, 'parametrize': [['title', 'username', 'password', 'code'], ['测试1', 'user1', 'pass1', 'code1'], ['测试2', 'user2', 'pass2', 'code2'], ['测试3', 'user3', 'pass3', 'code3'], ['测试4', 'user4', 'pass4', 'code4']]}}
# ddt_title = [data.title for data in ddt_data]
# logger.info(f"{ddt_title=}")
logger.info(f"keys_list={keys_list}")
logger.info(f"titles={titles}")
logger.info(f"feature={case_info.get('feature')}")
logger.info(f"story={case_info.get('story')}")
@allure.feature(case_info.get("feature"))
@allure.story(case_info.get("story"))
@pytest.mark.parametrize("case_key", keys_list, ids=titles)
def test_func(self, case_key):
logger.info(f"case_key={case_key}")
item.get(case_key)
logger.info(f"========:{item}")
logger.info(f"========:{item.get(case_key)}")
allure.dynamic.title(case_info.get("title"))
logger.info(f"用例开始执行:{case_info.get('title')}".center(80, "="))
# 0变量替换 class CaseGenerator:
new_case_info = exchanger.replace(case_info)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info={new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request"))
logger.info(f"3正在提取变量...") def __init__(self, fp: Union[str, Path]):
# 2保存变量(接口关联) self.fp: Path = Path(fp)
new_case_info = CaseInfo(**new_case_info)
for var_name, extract_info in new_case_info.extract.items():
# logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(case_info) # 为断言加载变量
# logger.info(f"替换变量后:{assert_case_info}")
assert_case_info.assert_all() # 执行断言
logger.info(f"用例执行结束:{case_info.title}".center(80, "=")) def generate_testcases(self) -> Generator[dict, None, None]:
file_name = self.fp.stem
return test_func case_info_ = get_processor_class(self.fp).load() # 自动读取yaml文件
case_info = TestCaseHandle.new(case_info_)
if not case_info.parametrize:
yield {file_name + "__": asdict(case_info)}
else:
cases = {}
args_names = case_info.parametrize[0]
for i, args_values in enumerate(case_info.parametrize[1:]):
# print(args_values)
context = dict(zip(args_names, args_values))
print(context)
# rendered = Template(FileHandle.to_string(case_info)).render(context)
rendered = Template(case_info.to_string()).render(context)
# cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
cases.update({file_name + "_" + str(i): case_info.to_dict(rendered)})
yield cases
class CaseRegister:
def __init__(self, register: Type["TestAPI"]):
self.register: Type["TestAPI"] = register
def register_test_func(self, case: dict):
for test_filed_name, case_info in case.items():
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
logger.info(f"epic{epic}")
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
logger.info(f"feature{feature}")
story = case_info["story"] if case_info["story"] else settings.allure_story
logger.info(f"story{story}")
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
def register_func(instance, testcase=case_info):
# allure.dynamic.epic(epic)
# allure.dynamic.feature(feature)
# allure.dynamic.story(story)
allure.dynamic.title(testcase.get("title"))
logger.info(f"用例开始执行:{testcase.get('title')}".center(80, "="))
# 0变量替换
new_case_info = exchanger.replace(testcase)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info{new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request"))
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
for var_name, extract_info in new_case_info.get("extract").items():
logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(testcase) # 为断言加载变量
logger.info(f"替换变量后:{assert_case_info}")
# assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
logger.info(f"用例执行结束:{testcase.get('title')}".center(80, "="))
# return test_func
setattr(self.register, test_filed_name, register_func) # 把pytest格式添加到类中
# TestAPI.find_yaml_case() # TestAPI.find_yaml_case()
if __name__ == '__main__': if __name__ == '__main__':
TestAPI.find_yaml_case() TestAPI.run(cases_dir)
# print(TestAPI.__dict__) print(TestAPI.__dict__)

View File

@@ -33,13 +33,13 @@ class DBServer:
db = DBServer( db = DBServer(
host=settings.db_host, # ip host=settings.db_host, # ip
port=3306, # 端口 port=settings.db_port, # 端口
user='root', # 用户名 user=settings.db_user, # 用户名
password='mysql_hNahSe', # 密码 password=settings.db_password, # 密码
database='answer' # 库名 database=settings.db_database # 库名
) )
if __name__ == '__main__': if __name__ == '__main__':
... ...
res = db.execute_sql('select username from user where id=1;') # res = db.execute_sql('select username from user where id=1;')
print(res[0]) # print(res[0])

View File

@@ -13,24 +13,23 @@ import copy
import json import json
import logging import logging
import re import re
import jsonpath
import allure import allure
from commons.templates import Template from commons.templates import Template
import jsonpath from commons.file_processors.processor_factory import get_processor_class
from tests.b import TestCaseHandle
from commons.file_processors.yaml_processor import YamlFile,StringOrDict
from commons.models import CaseInfo
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Exchange: class Exchange:
def __init__(self, path): def __init__(self, path):
self.file = YamlFile(path) self.file = get_processor_class(path)
@allure.step("提取变量") @allure.step("提取变量")
def extract(self, resp, var_name, attr, expr: str, index): def extract(self, resp, var_name, attr, expr: str, index) -> None:
resp = copy.deepcopy(resp) resp = copy.deepcopy(resp)
@@ -55,35 +54,25 @@ class Exchange:
value = "not data" value = "not data"
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值 logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
data = self.file.load()
data[var_name] = value # 保存变量
self.file.save(data) # 持久化存储到文件
self.file[var_name] = value # 保存变量
self.file.save() # 持久化存储到文件
# @allure.step("替换变量")
# def replace(self, case_info: CaseInfo):
# logger.info(case_info)
# # 1将case_info转换为字符串
# case_info_str = case_info.to_yaml()
# print(f"{case_info_str=}")
# # 2替换字符串
# case_info_str = Template(case_info_str).render(self.file)
# print(f"{case_info_str=}")
# # 3将字符串转换成case_info
# new_case_info = case_info.by_yaml(case_info_str)
# return new_case_info
@allure.step("替换变量") @allure.step("替换变量")
def replace(self, case_info: dict): def replace(self, case_info: dict) -> dict:
logger.info(case_info) logger.info(f"变量替换:{case_info}")
# 1将case_info转换为字符串 # 1将case_info转换为字符串
case_info_str = StringOrDict().to_string(case_info) data = TestCaseHandle(**case_info)
case_info_str = data.to_string()
print(f"{case_info_str=}") print(f"{case_info_str=}")
# 2替换字符串 # 2替换字符串
case_info_str = Template(case_info_str).render(self.file) case_info_str = Template(case_info_str).render(self.file.load())
print(f"{case_info_str=}") print(f"{case_info_str=}")
# 3将字符串转换成case_info # 3将字符串转换成case_info
new_case_info = StringOrDict().to_dict(case_info_str) new_case_info = data.to_dict(case_info_str)
return new_case_info return new_case_info
if __name__ == '__main__': if __name__ == '__main__':
class MockResponse: class MockResponse:
text = '{"name":"张三","age":"18","data":[3,4,5],"aaa":null}' text = '{"name":"张三","age":"18","data":[3,4,5],"aaa":null}'
@@ -101,14 +90,24 @@ if __name__ == '__main__':
exchanger.extract(mock_resp, "age", "json", '$.age', 0) exchanger.extract(mock_resp, "age", "json", '$.age', 0)
exchanger.extract(mock_resp, "data", "json", '$.data', 0) exchanger.extract(mock_resp, "data", "json", '$.data', 0)
exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0) exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0)
mock_case_info = CaseInfo( # mock_case_info = CaseInfo(
title="单元测试", # title="单元测试",
request={ # request={
# "data":
# {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
# },
# extract={},
# validate={}
# )
mock_case_info = {
"title": "单元测试",
"request": {
"data": "data":
{"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"} {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
}, },
extract={}, "extract": {},
validate={} "validate": {}
) }
new_mock_case_info = exchanger.replace(mock_case_info) new_mock_case_info = exchanger.replace(mock_case_info)
print(new_mock_case_info) print(new_mock_case_info)

View File

@@ -9,3 +9,14 @@
@date: 2025/3/4 17:23 @date: 2025/3/4 17:23
@desc: @desc:
""" """
from .base_processor import BaseFileProcessor
from .json_processor import JsonProcessor
from .yaml_processor import YamlProcessor
from .processor_factory import get_processor_class
__all__ = [
"BaseFileProcessor",
"JsonProcessor",
"YamlProcessor",
"get_processor_class",
]

View File

@@ -10,6 +10,8 @@
@desc: @desc:
""" """
import abc import abc
from pathlib import Path
from typing import Union
class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类 class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
@@ -17,23 +19,16 @@ class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
文件处理器的抽象基类 文件处理器的抽象基类
定义了所有子类必须实现的方法 定义了所有子类必须实现的方法
""" """
def __init__(self, filepath: Union[str, Path], **kwargs):
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
@abc.abstractmethod @abc.abstractmethod
def load(self): def load(self) -> dict:
"""加载.""" """加载."""
pass raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def to_string(self) -> str: def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
"""将文件内容转换为字符串。"""
pass
@abc.abstractmethod
def to_dict(self, data: str) -> dict:
"""将文件内容转换为字典。"""
pass
@abc.abstractmethod
def save(self, new_filepath=None):
"""将数据保存.""" """将数据保存."""
pass raise NotImplementedError

View File

@@ -0,0 +1,86 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: yaml_processor
@date: 2025/3/4 17:28
@desc:
"""
import logging
from typing import Union, Any
from pathlib import Path
import json
from commons.file_processors.base_processor import BaseFileProcessor
logger = logging.getLogger(__name__)
class JsonProcessor(BaseFileProcessor):
"""
用于处理 JSON 文件的类。
提供了从文件加载 JSON 数据为字典,以及将字典保存为 JSON 文件的功能。
"""
def __init__(self, filepath: Union[str, Path], **kwargs):
"""
初始化 JsonFile 对象。
Args:
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
"""
super().__init__(filepath, **kwargs)
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
def load(self) -> dict[str, Any]:
"""
从 Json 文件加载数据。
:return:
"""
if not self.filepath.exists():
logger.warning(f"文件 {self.filepath} 不存在.")
raise FileNotFoundError(f"文件 {self.filepath} 不存在.")
try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = json.load(f)
if not isinstance(loaded_data, dict): # 确保加载的是字典
logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
return loaded_data
except json.JSONDecodeError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
raise e
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
"""
将字典数据保存到 json 文件。
Args:
:param data:
:param new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
"""
filepath = Path(new_filepath) if new_filepath else self.filepath
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(
data,
f,
ensure_ascii=False, # 允许非ASCII字符
sort_keys=False # 不排序键
)
logger.info(f"数据已成功保存到 {filepath}")
except (TypeError, OSError, json.JSONDecodeError) as e:
logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}")
raise e
if __name__ == '__main__':
# 示例用法
json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径
json_file = JsonProcessor(json_path)
print(json_file.load())
print(type(json_file))
# json_file.save()

View File

@@ -0,0 +1,57 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: file_handle
@date: 2025/3/7 09:31
@desc:
"""
from pathlib import Path
from typing import Type, Union
from commons.file_processors.base_processor import BaseFileProcessor
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor
# 类型别名,表示处理器类的字典
ProcessorMap = dict[str, Type[BaseFileProcessor]]
processors: ProcessorMap = {
'yaml': YamlProcessor,
'yml': YamlProcessor,
'json': JsonProcessor,
}
class UnsupportedFileTypeError(Exception):
"""当文件类型不被支持时抛出此异常。"""
pass
# def get_processor_class(file_suffix: str = "yaml") -> Type[BaseFileProcessor]:
def get_processor_class(fp: Union[Path, str]) -> 'BaseFileProcessor':
fp = Path(fp)
if fp.is_file():
file_suffix = fp.suffix[1:]
processor_class = processors.get(file_suffix.lower(), YamlProcessor) # 代理模式
return processor_class(fp) # 默认回退到 Yaml
else:
raise UnsupportedFileTypeError(fp)
# FileHandle = get_processor("yaml")
if __name__ == '__main__':
# 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
# yaml_file = FileHandle(yaml_path)
# print(yaml_file.load())
# print(type(yaml_file))
# file_suffix = Path(yaml_path).suffix[1:]
# print(file_suffix)
get_processor = get_processor_class(yaml_path)
print(get_processor.load())

View File

@@ -11,150 +11,87 @@
""" """
import logging import logging
from typing import Union from typing import Union
from dataclasses import dataclass, asdict, field
from pathlib import Path from pathlib import Path
import yaml import yaml
from commons.file_processors.base import BaseFileProcessor from commons.file_processors.base_processor import BaseFileProcessor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class YamlFile(BaseFileProcessor, dict): class YamlProcessor(BaseFileProcessor):
""" """
用于处理 YAML 文件的类,继承自 dict。 用于处理 YAML 文件的类,继承自 dict。
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能, 提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
并可以直接像字典一样访问 YAML 数据。 并可以直接像字典一样访问 YAML 数据。
""" """
def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None): def __init__(self, filepath: Union[str, Path], **kwargs):
""" """
初始化 YamlFile 对象。 初始化 YamlFile 对象。
Args: Args: filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。 data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
如果不提供,则尝试从 filepath 加载数据。 如果不提供,则尝试从 filepath 加载数据。
""" """
super().__init__() # 初始化父类 dict super().__init__(filepath, **kwargs)
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 # self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
if data is not None:
self.update(data) # 如果提供了初始数据,则更新字典
else:
self.load() # 否则,尝试从文件加载
def load(self) -> None: def load(self) -> dict:
""" """
从 YAML 文件加载数据并更新字典。 从 YAML 文件加载数据
如果文件不存在或加载失败,则清空字典并记录警告/错误。 :return:
""" """
self.clear() # 清空现有数据 if not self.filepath.exists():
if self.filepath.exists(): logger.warning(f"文件 {self.filepath} 不存在.")
try: raise FileNotFoundError(f"文件 {self.filepath} 不存在.")
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = yaml.safe_load(f) or {}
self.update(loaded_data) # 使用加载的数据更新字典
except yaml.YAMLError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
# 保持字典为空 (已在开头 clear)
else:
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
# 保持字典为空 (已在开头 clear)
def to_string(self) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
"""
try: try:
return yaml.safe_dump( with open(self.filepath, "r", encoding="utf-8") as f:
dict(self), # 使用dict转换为标准的字典 loaded_data = yaml.safe_load(f)
allow_unicode=True, if not isinstance(loaded_data, dict): # 确保加载的是字典
sort_keys=False, logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
default_flow_style=False raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
) return loaded_data
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return ""
def to_dict(self, data: str) -> None:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = yaml.safe_load(data) or {}
self.clear()
self.update(loaded_data) # 清空并更新
except yaml.YAMLError as e: except yaml.YAMLError as e:
logger.error(f" YAML 字符串转换为字典时出错: {e}") logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
self.clear() # 出错时也清空 raise e
def save(self, new_filepath: Union[str, Path, None] = None): def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
""" """
将字典数据 (自身) 保存到 YAML 文件。 将字典数据保存到 YAML 文件。
:param data:
:param new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
Args:
new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
""" """
filepath = Path(new_filepath) if new_filepath else self.filepath filepath = Path(new_filepath) if new_filepath else self.filepath
# 确保目标目录存在
filepath.parent.mkdir(parents=True, exist_ok=True)
try: try:
with open(filepath, "w", encoding="utf-8") as f: with open(filepath, "w", encoding="utf-8") as f:
yaml.safe_dump( yaml.safe_dump(
dict(self), # 使用dict转换为标准的字典 data,
stream=f, stream=f,
allow_unicode=True, allow_unicode=True,
sort_keys=False, sort_keys=False,
default_flow_style=False default_flow_style=False
) )
except (TypeError, OSError) as e: logger.info(f"数据已成功保存到 {filepath}")
except (TypeError, OSError, yaml.YAMLError) as e:
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}") logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
raise e
class StringOrDict:
@classmethod
def to_string(cls, data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
"""
try:
return yaml.safe_dump(
dict(data), # 使用dict转换为标准的字典
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return ""
@classmethod
def to_dict(cls, data: str) -> Union[None, dict]:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = yaml.safe_load(data) or {}
return loaded_data
except yaml.YAMLError as e:
logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
if __name__ == '__main__': if __name__ == '__main__':
# 示例用法 # 示例用法
yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml' # 你的 YAML 文件路径 yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = YamlFile(yaml_path) yaml_file = YamlProcessor(yaml_path)
print(yaml_file) print(yaml_file.load())
print(type(yaml_file)) print(type(yaml_file))
# # 直接像字典一样访问数据 # # 直接像字典一样访问数据
@@ -198,14 +135,3 @@ if __name__ == '__main__':
# non_existent_file['a'] = 1 # 可以直接添加 # non_existent_file['a'] = 1 # 可以直接添加
# print("\n加载不存在的文件:", non_existent_file) # print("\n加载不存在的文件:", non_existent_file)
# if __name__ == '__main__':
# from commons.models import CaseInfo
#
# yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml'
# yaml_file = YamlFile(yaml_path)
# print(yaml_file.load())
# # yaml_file.load()
# # case_info = CaseInfo(**yaml_file)
# # print(case_info)
# # yaml_file["title"] = "查询用户信息"
# # yaml_file.save()

View File

@@ -15,30 +15,32 @@ import time
import urllib.parse import urllib.parse
import hashlib import hashlib
# from commons.databases import db from commons.databases import db
from commons.files import YamlFile from commons.file_processors.processor_factory import get_processor_class
from commons import settings from commons import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Funcs:
class Funcs:
FUNC_MAPPING = { FUNC_MAPPING = {
"int": int, "int": int,
"float": float, "float": float,
"bool": bool "bool": bool
} # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping } # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
@classmethod @classmethod
def register(cls, name: str): def register(cls, name: str | None = None):
def decorator(func): def decorator(func):
if name is None:
cls.FUNC_MAPPING[func.__name__] = func
cls.FUNC_MAPPING[name] = func cls.FUNC_MAPPING[name] = func
return func return func
return decorator return decorator
@Funcs.register("url_unquote") @Funcs.register("url_unquote")
def url_unquote(s: str) -> str: def url_unquote(s: str) -> str:
return urllib.parse.unquote(s) return urllib.parse.unquote(s)
@@ -49,35 +51,43 @@ def to_string(s) -> str:
# 将数据转换为str类型。 # 将数据转换为str类型。
return f"'{s}'" return f"'{s}'"
@Funcs.register("time_str") @Funcs.register("time_str")
def time_str() -> str: def time_str() -> str:
return str(time.time()) return str(time.time())
@Funcs.register("add") @Funcs.register("add")
def add(a, b): def add(a, b):
return str(int(a) + int(b)) return str(int(a) + int(b))
@Funcs.register("sql") @Funcs.register("sql")
def sql(s: str) -> str: def sql(s: str) -> str:
res = db.execute_sql(s) res = db.execute_sql(s)
return res[0][0] return res[0][0]
@Funcs.register("new_id") @Funcs.register("new_id")
def new_id(): def new_id():
# 自增,永不重复 # 自增,永不重复
id_file = YamlFile(settings.id_path) id_file = get_processor_class(settings.id_path)
id_file["id"] += 1 data = id_file.load()
id_file.save() data["id"] += 1
id_file.save(data)
return data["id"]
return id_file["id"]
@Funcs.register("last_id") @Funcs.register("last_id")
def last_id() -> str: def last_id() -> str:
# 不自增,只返回结果 # 不自增,只返回结果
id_file = YamlFile(settings.id_path) id_file = get_processor_class(settings.id_path)
return id_file["id"] data = id_file.load()
return data["id"]
@Funcs.register("md5") @Funcs.register("md5")
def md5(content: str) -> str: def md5(content: str) -> str:
@@ -86,6 +96,7 @@ def md5(content: str) -> str:
result = hashlib.md5(content).hexdigest() result = hashlib.md5(content).hexdigest()
return result return result
@Funcs.register("base64_encode") @Funcs.register("base64_encode")
def base64_encode(content: str) -> str: def base64_encode(content: str) -> str:
# 1原文转二进制 # 1原文转二进制
@@ -97,6 +108,7 @@ def base64_encode(content: str) -> str:
return encode_str return encode_str
@Funcs.register("base64_decode") @Funcs.register("base64_decode")
def base64_decode(content: str) -> str: def base64_decode(content: str) -> str:
# 1原文转二进制 # 1原文转二进制
@@ -108,18 +120,25 @@ def base64_decode(content: str) -> str:
return decode_str return decode_str
@Funcs.register("rsa_encode") @Funcs.register("rsa_encode")
def rsa_encode(content: str) -> str: def rsa_encode(content: str) -> str:
... ...
@Funcs.register("rsa_decode") @Funcs.register("rsa_decode")
def rsa_decode(content: str) -> str: def rsa_decode(content: str) -> str:
... ...
@Funcs.register()
def func_name_test():
...
if __name__ == '__main__': if __name__ == '__main__':
# res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82") # res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
# print(res) # print(res)
# print(f"计数器:{new_id()}") # print(f"计数器:{new_id()}")
# print(f"当前数值:{last_id()}") # print(f"当前数值:{last_id()}")
print(Funcs().FUNC_MAPPING) print(Funcs().FUNC_MAPPING)

View File

@@ -10,103 +10,56 @@
@desc: 声明yaml用例格式 @desc: 声明yaml用例格式
""" """
import logging import logging
from dataclasses import dataclass, asdict, field from typing import Union, Optional
from dataclasses import dataclass, field
import allure
import yaml import yaml
from commons.templates import Template
from commons import settings from commons import settings
from utils import case_validator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass @dataclass
class CaseInfo: class RequestModel:
method: str
url: str
headers: Optional[dict] = None
# body: Optional[Union[dict, str]] = None
params: Optional[Union[dict, str]] = None
@dataclass
class TestCaseModel:
title: str title: str
request: dict request: RequestModel
extract: dict extract: dict
validate: dict validate: dict
parametrize: list = field(default_factory=list) parametrize: list = field(default_factory=list)
epic: str = settings.allure_epic epic: str = field(default_factory=lambda: settings.allure_epic)
feature: str = settings.allure_feature feature: str = field(default_factory=lambda: settings.allure_feature)
story: str = settings.allure_story story: str = field(default_factory=lambda: settings.allure_story)
def to_yaml(self) -> str: def __post_init__(self):
# 序列化成yaml字符串 # 必填字段非空校验
yaml_str = yaml.safe_dump( if self.title is None:
asdict(self), raise ValueError("Title cannot be empty")
allow_unicode=True, # allow_unicode使用unicode编码正常显示中文
sort_keys=False)
return yaml_str
@classmethod # 校验RequestModel
def by_yaml(cls, yaml_str): if isinstance(self.request, dict):
# 反序列化 try:
obj = cls(**yaml.safe_load(yaml_str)) self.request = RequestModel(**self.request) # RequestModel 的 __post_init__ 会被调用
return obj except (TypeError, ValueError) as e:
raise ValueError(f"解析 'request' 字段失败: {e} (数据: {self.request})") from e
@allure.step("断言") elif not isinstance(self.request, RequestModel): # 如果不是 dict 也不是 RequestModel
def assert_all(self): raise TypeError(
_validator = case_validator.CaseValidator() f"字段 'request' 必须是字典 (将在内部转换为 RequestModel) 或 RequestModel 实例, "
# print(case_validator.VALIDATORS) f"得到的是 {type(self.request).__name__}"
)
if not self.validate:
return
_validator.assert_all(self.validate)
# for assert_type, assert_value in self.validate.items():
# for msg, data in assert_value.items():
# a, b = data[0], data[1]
# # print(assert_type, a, b, msg)
# match assert_type:
# case 'equals':
# logger.info(f"assert {a} == {b}, {msg}")
# assert a == b, msg
# case 'not_equals':
# logger.info(f"assert {a} != {b}, {msg}")
# assert a != b, msg
# case 'contains':
# logger.info(f"assert {a} in {b}, {msg}")
# assert a in b, msg
# case 'not_contains':
# logger.info(f"assert {a} not in {b}, {msg}")
# assert a not in b, msg
# case "xxxxx
def ddt(self) -> list: # 返回一个列表列表中应该包含N个注入了变量的caseInfo
case_list = []
if not self.parametrize: # 没有使用数据驱动测试
logger.info("1执行这一步")
# case_info_str = self.to_yaml() # 转字符串
# case_info_str = Template(case_info_str).render(d) # 输入变量
# case_info = self.by_yaml(case_info_str) # 转成类
# case_list.append(case_info)
case_list.append(self)
else: # 使用数据驱动测试
args_name = self.parametrize[0]
args_value_list = self.parametrize[1:]
for args_value in args_value_list:
d = dict(zip(args_name, args_value))
print(f"D的值{d}")
# d 就是数据驱动测试的变量,应输入到用例中
case_info_str = self.to_yaml() # 转字符串
case_info_str = Template(case_info_str).render(d) # 输入变量
case_info = self.by_yaml(case_info_str) # 转成类
case_list.append(case_info) # 加入到返回值
return case_list
if __name__ == '__main__': if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f: with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f) data = yaml.safe_load(f)
# print(data) # print(data)
case_info = CaseInfo(**data) case_info = TestCaseModel(**data)
s = case_info.to_yaml()
# print(s)
new_case_info = case_info.by_yaml(s)
# print(new_case_info)
ddt_ddt = case_info.ddt()
print(ddt_ddt)

View File

@@ -9,14 +9,16 @@
@date: 2024 2024/9/12 21:56 @date: 2024 2024/9/12 21:56
@desc: @desc:
""" """
from urllib.parse import urljoin
import logging import logging
from urllib.parse import urljoin
import requests import requests
import allure
from requests import Response, PreparedRequest from requests import Response, PreparedRequest
import allure
logger = logging.getLogger("requests.session") # logger = logging.getLogger("requests.session")
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)

View File

@@ -10,19 +10,25 @@
@desc: @desc:
""" """
from pathlib import Path from pathlib import Path
import os
from dotenv import load_dotenv
load_dotenv()
root_path = (Path(__file__)).resolve().parents[1] root_path = (Path(__file__)).resolve().parents[1]
base_url = 'http://119.91.19.171:40065' base_url = os.getenv("BASE_URL")
case_path = rf"{root_path}\TestCases\answer" cases_dir = rf"{root_path}\TestCases\answer"
exchanger = rf"{root_path}\extract.yaml" exchanger = rf"{root_path}\extract.yaml"
id_path = rf"{root_path}\id.yaml" id_path = rf"{root_path}\id.yaml"
db_host = '119.91.19.171' # ip test_suffix = "yaml"
db_port = 3306 # 端口
db_user = 'root' # 用户名 db_host = os.getenv("DB_HOST") # ip
db_password = 'mysql_hNahSe' # 密码 db_port = os.getenv("DB_PORT") # 端口
db_database = 'answer' # db_user = os.getenv("DB_USER") # 用户
db_password = os.getenv("DB_PASSWORD") # 密码
db_database = os.getenv("DB_DATABASE")
allure_epic: str = "项目名称answer" allure_epic: str = "项目名称answer"
allure_feature: str = "默认特征feature" allure_feature: str = "默认特征feature"
@@ -31,6 +37,6 @@ allure_story: str = "默认事件story"
rsa_public = "" rsa_public = ""
rsa_private = "" rsa_private = ""
if __name__ == '__main__': if __name__ == '__main__':
print(root_path) print(root_path)
print(base_url,db_host,db_port,db_user,db_password,db_database)

View File

@@ -13,7 +13,6 @@ import copy
import logging import logging
import re import re
import string import string
import inspect
from commons.funcs import Funcs from commons.funcs import Funcs
@@ -26,12 +25,6 @@ class Template(string.Template):
2参数也可以是变量 2参数也可以是变量
""" """
# FUNC_MAPPING = {
# "int": int,
# "float": float,
# "bool": bool
# } # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}") call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
def render(self, mapping: dict) -> str: def render(self, mapping: dict) -> str:
@@ -68,28 +61,3 @@ class Template(string.Template):
return self.call_pattern.sub(convert, template) return self.call_pattern.sub(convert, template)
# def hot_load():
# from commons import funcs
#
# for func_name in dir(funcs): # 遍历模块中的所有函数
# if func_name.startswith("_"):
# continue
# func_code = getattr(funcs, func_name) # 取到函数对象
# # print(func_code)
# if callable(func_code): # 如果是一个可以调用的函数
# Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# print(Template.FUNC_MAPPING)
# # if inspect.isfunction(func_code): # 如果是一个可以调用的函数
# # Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# # print(Template.FUNC_MAPPING)
# def hot_load():
# from commons.funcs import Funcs
#
# print(Funcs.FUNC_MAPPING)
# # if inspect.isfunction(func_code): # 如果是一个可以调用的函数
# # Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# # print(Template.FUNC_MAPPING)
#
# hot_load()

View File

@@ -5,7 +5,7 @@ import pytest
from commons.cases import TestAPI from commons.cases import TestAPI
TestAPI.find_yaml_case() # 加载yaml文件 TestAPI.run() # 加载yaml文件
if __name__ == '__main__': if __name__ == '__main__':
now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

View File

@@ -16,6 +16,8 @@ jsonpath = "^0.82.2"
pymysql = "^1.1.1" pymysql = "^1.1.1"
pytest-result-log = "^1.2.2" pytest-result-log = "^1.2.2"
allure-pytest = "^2.13.5" allure-pytest = "^2.13.5"
cryptography = "^44.0.2"
dotenv = "^0.9.9"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api" build-backend = "poetry.core.masonry.api"

View File

@@ -3,7 +3,7 @@ addopts = -q --show-capture=no
log_file = logs/pytest.log log_file = logs/pytest.log
log_file_level = info log_file_level = debug
log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s
log_file_date_format = %m/%d/%Y %I:%M:%S %p log_file_date_format = %m/%d/%Y %I:%M:%S %p

View File

@@ -27,7 +27,8 @@ class CaseValidator:
@classmethod @classmethod
def assert_all(cls, validate: dict): def assert_all(cls, validate: dict):
if not validate:
return
for assert_type, cases in validate.items(): for assert_type, cases in validate.items():
logger.info(f"键:{assert_type},值:{cases}") logger.info(f"键:{assert_type},值:{cases}")
validator = cls.VALIDATORS.get(assert_type) validator = cls.VALIDATORS.get(assert_type)
@@ -76,4 +77,4 @@ if __name__ == '__main__':
case_validator = CaseValidator() case_validator = CaseValidator()
print(case_validator.VALIDATORS) print(case_validator.VALIDATORS)
# case_validator.assert_all(mock_case.get("validate")) case_validator.assert_all(mock_case.get("validate"))

View File

@@ -11,91 +11,81 @@
""" """
from pathlib import Path from pathlib import Path
from commons.models import CaseInfo
from commons.templates import Template from commons.templates import Template
from commons.file_processors.yaml_processor import StringOrDict from commons.file_processors.file_handle import FileHandle
class DataDriver: class DataDriver:
@staticmethod @staticmethod
def generate_cases(file_name, case_info) -> list: def generate_cases(file_name, case_info) -> dict:
if not case_info.get("parametrize"): if not case_info.get("parametrize"):
return {file_name + "[--]": case_info} return {file_name + "[--]": case_info}
# cases = [] cases = {}
args_names = case_info.get("parametrize")[0] args_names = case_info.get("parametrize")[0]
for i, args_values in enumerate(case_info.get("parametrize")[1:]): for i, args_values in enumerate(case_info.get("parametrize")[1:]):
# print(args_values) # print(args_values)
context = dict(zip(args_names, args_values)) context = dict(zip(args_names, args_values))
# print(context) # print(context)
# rendered = Template(CaseInfo(**case_info).to_yaml()).render(context) rendered = Template(FileHandle.to_string(case_info)).render(context)
rendered = Template(StringOrDict.to_string(case_info)).render(context) cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
# cases.append({file_name + "[" + str(i) + "]": StringOrDict.to_dict(rendered)})
yield {file_name + "[" + str(i) + "]": StringOrDict.to_dict(rendered)} return cases
# return cases
if __name__ == '__main__': if __name__ == '__main__':
from commons.file_processors.yaml_processor import YamlFile
file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml") file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml")
file_obj = YamlFile(file_path) file_obj = FileHandle(file_path)
# print(file_path.stem) print(file_path.stem)
file_name = file_path.stem file_name_ = file_path.stem
mock_case_info = { # mock_case_info = {
"case_info0": { # "case_info0": {
"feature": "页面状态", # "feature": "页面状态",
"story": "状态", # "story": "状态",
"title": "查询状态信息", # "title": "查询状态信息",
"request": "", # "request": "",
"extract": "", # "extract": "",
"validate": "", # "validate": "",
"parametrize": [["title", "username", "password", "msg"], ["测试1", "user1", "pass1", "200"], # "parametrize": [["title", "username", "password", "msg"], ["测试1", "user1", "pass1", "200"],
["测试2", "user2", "pass2", "300"]] # ["测试2", "user2", "pass2", "300"]]
}, # },
"case_info1": { # "case_info1": {
"feature": "页面状态", # "feature": "页面状态",
"story": "状态", # "story": "状态",
"title": "查询状态信息", # "title": "查询状态信息",
"request": "", # "request": "",
"extract": "", # "extract": "",
"validate": "", # "validate": "",
"parametrize": [1, 2, 3] # "parametrize": [1, 2, 3]
}, # },
"case_info2": { # "case_info2": {
"feature": "页面状态", # "feature": "页面状态",
"story": "状态", # "story": "状态",
"title": "查询状态信息", # "title": "查询状态信息",
"request": "", # "request": "",
"extract": "", # "extract": "",
"validate": "", # "validate": "",
"parametrize": [1, 2, 3] # "parametrize": [1, 2, 3]
} # }
#
} # }
dd = DataDriver() dd = DataDriver()
# cases = dd.generate_cases(mock_case_info.get("case_info0")) # cases = dd.generate_cases(mock_case_info.get("case_info0"))
cases = dd.generate_cases(file_name, file_obj) cases_ = dd.generate_cases(file_name_, file_obj)
# print(cases) print(cases_)
# print(len(cases)) case_keys = list(cases_.keys())
keys_list = [] case_values = cases_.values()
titles = []
for item in cases:
print(item)
# 遍历列表中的每个字典
for key, value in item.items():
print(f"key:{key}")
keys_list.append(key)
print(f"value:{value}")
# # 遍历内层字典(这里内层字典其实只有一个键值对)
titles.append(value['title'])
print(item)
print(keys_list) print(case_keys)
print(titles) print(case_values)
aa = [i.get("title") for i in case_values]
print(aa)
# print(list(case_values)[0]["feature"])
print(file_obj["feature"])
# print(list(case_values)[0]["story"])
print(file_obj["story"])
# ddt_title = [data.title for data in ddt_data]
# logger.info(f"{ddt_title=}")