3 Commits

Author SHA1 Message Date
ffbde1158b feat,fix(): 优化
- 优化 settings(使用环境变量)
- 修复bug
2025-03-19 16:53:21 +08:00
a50e00a4e1 refactor(): 优化文件读取,变量替换等
- 优化用例加载模块器
- 新增JSON文件读取模块
2025-03-09 17:23:25 +08:00
914b0301ba feat,fix(): 优化项目
- 优化yaml_processor(优化文件类型转换逻辑)
- 修复bug
2025-03-07 17:28:41 +08:00
17 changed files with 414 additions and 330 deletions

3
.gitignore vendored
View File

@@ -5,4 +5,5 @@ poetry.lock
.pytest_cache/ .pytest_cache/
report/ report/
temp/ temp/
logs/ logs/
.env

View File

@@ -0,0 +1,65 @@
{
"epic": "项目名称answer",
"feature": "页面状态",
"story": "状态",
"title": "查询状态信息",
"request": {
"method": "get",
"url": "/answer/api/v1/connector/info",
"headers": {
"Host": "119.91.19.171:40065",
"Accept-Language": "en_US",
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
"Referer": "http://119.91.19.171:40065/users/login",
"Accept-Encoding": "gzip, deflate"
}
},
"extract": {
"msg": [
"json",
"$.msg",
0
]
},
"validate": {
"equals": {
"状态码等于200": [
"Success.",
"Success."
]
}
},
"parametrize": [
[
"title",
"username",
"password",
"msg"
],
[
"测试1",
"user1",
"pass1",
"200"
],
[
"测试2",
"user2",
"pass2",
"300"
],
[
"测试3",
"user3",
"pass3",
"200"
],
[
"测试4",
"user4",
"pass4",
"200"
]
]
}

View File

@@ -16,105 +16,106 @@ import allure
import pytest import pytest
from commons import settings from commons import settings
from commons.file_processors.yaml_processor import YamlFile from commons.file_processors.file_handle import FileHandle
from commons.models import CaseInfo from commons.models import CaseInfo
from commons.session import Session from commons.session import Session
from commons.exchange import Exchange from commons.exchange import Exchange
from utils import data_driver from utils import data_driver, case_validator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
session = Session(settings.base_url) session = Session(settings.base_url)
_case_path = Path(settings.case_path) cases_dir = Path(settings.cases_dir)
exchanger = Exchange(settings.exchanger) exchanger = Exchange(settings.exchanger)
@allure.epic("项目名称answer")
class TestAPI: class TestAPI:
@classmethod @classmethod
def find_yaml_case(cls, case_path: Path = _case_path): def find_test_cases(cls, case_dir: Path = cases_dir):
""" """
搜索和加载yaml文件 搜索和加载yaml文件
:return: :return:
""" """
yaml_path_list = case_path.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件 case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
for yaml_path in yaml_path_list: for case_path in case_path_list:
logger.info(f"加载文件:{yaml_path}") logger.info(f"加载文件:{case_path}")
file = YamlFile(yaml_path) # 自动读取yaml文件 file = FileHandle(case_path) # 自动读取yaml文件
try:
case_info = CaseInfo(**file) # 校验yaml格式 CaseInfo(**file) # 校验用例格式
logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
logger.info(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志 case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# case_info = {yaml_path.stem:case_info} # print(case_path.stem)
# logger.info(f"case_info_dict={case_info}") setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
case_func = cls.new_case(yaml_path.stem, file) # 从yaml格式转换为pytest格式 except Exception as e:
print(yaml_path.stem) logger.error(e)
setattr(cls, f"{yaml_path.stem}", case_func) # 把pytest格式添加到类中
@classmethod @classmethod
def new_case(cls,file_name, case_info: dict): def new_case(cls, file_name, case_info: dict):
case_data = data_driver.DataDriver().generate_cases(file_name,case_info) test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
# ddt_data = case_info.ddt()
keys_list = ['test_1_user[0]', 'test_1_user[1]', 'test_1_user[2]', 'test_1_user[3]']
titles = ['查询用户信息', '查询用户信息', '查询用户信息', '查询用户信息']
for item in case_data:
# 遍历列表中的每个字典
for key, value in item.items():
print(f"key:{key}")
keys_list.append(key)
print(f"value:{value}")
# # 遍历内层字典(这里内层字典其实只有一个键值对)
titles.append(value['title'])
print(f"测试数据:{case_data}")
item={'test_1_user[0]': {'feature': '特征', 'story': '事件', 'title': '查询用户信息', 'request': {'method': 'get', 'url': 'http://119.91.19.171:40065/answer/api/v1/connector/info', 'headers': {'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh_CN', 'Content-Type': 'application/json', 'Cookie': 'psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==', 'Host': '119.91.19.171:40065', 'Origin': 'http://119.91.19.171:40065', 'Referer': 'http://119.91.19.171:40065/users/login', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0'}}, 'extract': {'code': ['json', '$.code', 0], 'msg': ['json', '$.msg', 0]}, 'validate': {'equals': {'状态码等于200': [200, 'code1']}, 'not_equals': {'状态码不等于404': [404, 'code1']}, 'contains': {'包含关系': [404, 'code1']}, 'not_contains': {'不包含关系': [404, 'code1']}}, 'parametrize': [['title', 'username', 'password', 'code'], ['测试1', 'user1', 'pass1', 'code1'], ['测试2', 'user2', 'pass2', 'code2'], ['测试3', 'user3', 'pass3', 'code3'], ['测试4', 'user4', 'pass4', 'code4']]}}
# ddt_title = [data.title for data in ddt_data]
# logger.info(f"{ddt_title=}")
logger.info(f"keys_list={keys_list}")
logger.info(f"titles={titles}")
logger.info(f"feature={case_info.get('feature')}")
logger.info(f"story={case_info.get('story')}")
@allure.feature(case_info.get("feature"))
@allure.story(case_info.get("story"))
@pytest.mark.parametrize("case_key", keys_list, ids=titles)
def test_func(self, case_key):
logger.info(f"case_key={case_key}")
item.get(case_key)
logger.info(f"========:{item}")
logger.info(f"========:{item.get(case_key)}")
allure.dynamic.title(case_info.get("title"))
logger.info(f"用例开始执行:{case_info.get('title')}".center(80, "=")) keys_list = list(test_case.keys())
logger.info(f"keys_list{keys_list}")
values_list = list(test_case.values())
logger.info(f"测试用例列表:{values_list}")
driver_title = [i.get("title") for i in values_list]
logger.info(f"driver_title={driver_title}")
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
logger.info(f"epic{epic}")
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
logger.info(f"feature{feature}")
story = case_info["story"] if case_info["story"] else settings.allure_story
logger.info(f"story{story}")
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
@pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
def test_func(self, case_key):
logger.info(f"case_key{case_key}")
test_case_mapping = test_case.get(case_key)
logger.info(f"测试用例:{test_case_mapping}")
allure.dynamic.title(test_case_mapping.get("title"))
logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
# 0变量替换 # 0变量替换
new_case_info = exchanger.replace(case_info) new_case_info = exchanger.replace(test_case_mapping)
logger.info(f"1正在注入变量...") logger.info(f"1正在注入变量...")
logger.info(f"new_case_info={new_case_info}") logger.info(f"new_case_info{new_case_info}")
# 1发送请求 # 1发送请求
logger.info(f"2正在请求接口...") logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request")) resp = session.request(**new_case_info.get("request"))
logger.info(f"3正在提取变量...") logger.info(f"3正在提取变量...")
# 2保存变量(接口关联) # 2保存变量(接口关联)
new_case_info = CaseInfo(**new_case_info) for var_name, extract_info in new_case_info.get("extract").items():
for var_name, extract_info in new_case_info.extract.items(): logger.info(f"保存变量:{var_name}{extract_info}")
# logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info) exchanger.extract(resp, var_name, *extract_info)
# 3断言 # 3断言
logger.info(f"4正在断言...") logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(case_info) # 为断言加载变量 assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
# logger.info(f"替换变量后:{assert_case_info}") logger.info(f"替换变量后:{assert_case_info}")
assert_case_info.assert_all() # 执行断言 # assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
logger.info(f"用例执行结束:{case_info.title}".center(80, "=")) logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
return test_func return test_func
# TestAPI.find_yaml_case() # TestAPI.find_yaml_case()
if __name__ == '__main__': if __name__ == '__main__':
TestAPI.find_yaml_case() TestAPI.find_test_cases()
# print(TestAPI.__dict__) # print(TestAPI.__dict__)

View File

@@ -13,21 +13,19 @@ import copy
import json import json
import logging import logging
import re import re
import jsonpath
import allure import allure
from commons.templates import Template from commons.templates import Template
import jsonpath from commons.file_processors.file_handle import FileHandle
from commons.file_processors.yaml_processor import YamlFile,StringOrDict
from commons.models import CaseInfo
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Exchange: class Exchange:
def __init__(self, path): def __init__(self, path):
self.file = YamlFile(path) self.file = FileHandle(path)
@allure.step("提取变量") @allure.step("提取变量")
def extract(self, resp, var_name, attr, expr: str, index): def extract(self, resp, var_name, attr, expr: str, index):
@@ -59,31 +57,20 @@ class Exchange:
self.file[var_name] = value # 保存变量 self.file[var_name] = value # 保存变量
self.file.save() # 持久化存储到文件 self.file.save() # 持久化存储到文件
# @allure.step("替换变量")
# def replace(self, case_info: CaseInfo):
# logger.info(case_info)
# # 1将case_info转换为字符串
# case_info_str = case_info.to_yaml()
# print(f"{case_info_str=}")
# # 2替换字符串
# case_info_str = Template(case_info_str).render(self.file)
# print(f"{case_info_str=}")
# # 3将字符串转换成case_info
# new_case_info = case_info.by_yaml(case_info_str)
# return new_case_info
@allure.step("替换变量") @allure.step("替换变量")
def replace(self, case_info: dict): def replace(self, case_info: dict) -> dict:
logger.info(case_info) logger.info(f"变量替换:{case_info}")
# 1将case_info转换为字符串 # 1将case_info转换为字符串
case_info_str = StringOrDict().to_string(case_info) case_info_str = FileHandle.to_string(case_info)
print(f"{case_info_str=}") print(f"{case_info_str=}")
# 2替换字符串 # 2替换字符串
case_info_str = Template(case_info_str).render(self.file) case_info_str = Template(case_info_str).render(self.file)
print(f"{case_info_str=}") print(f"{case_info_str=}")
# 3将字符串转换成case_info # 3将字符串转换成case_info
new_case_info = StringOrDict().to_dict(case_info_str) new_case_info = FileHandle.to_dict(case_info_str)
return new_case_info return new_case_info
if __name__ == '__main__': if __name__ == '__main__':
class MockResponse: class MockResponse:
text = '{"name":"张三","age":"18","data":[3,4,5],"aaa":null}' text = '{"name":"张三","age":"18","data":[3,4,5],"aaa":null}'
@@ -101,14 +88,24 @@ if __name__ == '__main__':
exchanger.extract(mock_resp, "age", "json", '$.age', 0) exchanger.extract(mock_resp, "age", "json", '$.age', 0)
exchanger.extract(mock_resp, "data", "json", '$.data', 0) exchanger.extract(mock_resp, "data", "json", '$.data', 0)
exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0) exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0)
mock_case_info = CaseInfo( # mock_case_info = CaseInfo(
title="单元测试", # title="单元测试",
request={ # request={
# "data":
# {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
# },
# extract={},
# validate={}
# )
mock_case_info = {
"title": "单元测试",
"request": {
"data": "data":
{"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"} {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
}, },
extract={}, "extract": {},
validate={} "validate": {}
) }
new_mock_case_info = exchanger.replace(mock_case_info) new_mock_case_info = exchanger.replace(mock_case_info)
print(new_mock_case_info) print(new_mock_case_info)

View File

@@ -23,13 +23,15 @@ class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
"""加载.""" """加载."""
pass pass
@staticmethod
@abc.abstractmethod @abc.abstractmethod
def to_string(self) -> str: def to_string(data: dict) -> str:
"""将文件内容转换为字符串。""" """将文件内容转换为字符串。"""
pass pass
@staticmethod
@abc.abstractmethod @abc.abstractmethod
def to_dict(self, data: str) -> dict: def to_dict(data: str) -> dict:
"""将文件内容转换为字典。""" """将文件内容转换为字典。"""
pass pass

View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: file_handle
@date: 2025/3/7 09:31
@desc:
"""
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor
processors = {
'yaml': YamlProcessor,
'yml': YamlProcessor,
'json': JsonProcessor,
}
def get_processor(ext):
agent_model = processors.get(ext, YamlProcessor) # 代理模式
return agent_model # 默认回退到 Yaml
FileHandle = get_processor("yaml")
if __name__ == '__main__':
# 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = FileHandle(yaml_path)
print(yaml_file)
print(type(yaml_file))
file_string = FileHandle.to_string(yaml_file)
print(file_string)
file_dict = FileHandle.to_dict(file_string)
print(file_dict)

View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: yaml_processor
@date: 2025/3/4 17:28
@desc:
"""
import logging
from typing import Union
from pathlib import Path
import json
from commons.file_processors.base import BaseFileProcessor
logger = logging.getLogger(__name__)
class JsonProcessor(BaseFileProcessor, dict):
"""
用于处理 YAML 文件的类,继承自 dict。
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
并可以直接像字典一样访问 YAML 数据。
"""
def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
"""
初始化 YamlFile 对象。
Args:
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
如果不提供,则尝试从 filepath 加载数据。
"""
super().__init__() # 初始化父类 dict
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
if data is not None:
self.update(data) # 如果提供了初始数据,则更新字典
else:
self.load() # 否则,尝试从文件加载
def load(self) -> None:
"""
从 YAML 文件加载数据并更新字典。
如果文件不存在或加载失败,则清空字典并记录警告/错误。
"""
self.clear() # 清空现有数据
if self.filepath.exists():
try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = json.load(f) or {}
self.update(loaded_data) # 使用加载的数据更新字典
except json.JSONDecodeError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
# 保持字典为空 (已在开头 clear)
else:
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
# 保持字典为空 (已在开头 clear)
@staticmethod
def to_string(data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
"""
try:
return json.dumps(
dict(data), # 使用dict转换为标准的字典
ensure_ascii=False, # 允许非ASCII字符
# indent=4, # 美化输出缩进4个空格
sort_keys=False # 不排序键
)
except TypeError as e:
logger.error(f"将数据转换为 JSON 字符串时出错: {e}")
return ""
@staticmethod
def to_dict(data: str) -> None:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = json.loads(data) or {}
return loaded_data
except json.JSONDecodeError as e:
logger.error(f"将 JSON 字符串转换为字典时出错: {e}")
def save(self, new_filepath: Union[str, Path, None] = None):
"""
将字典数据 (自身) 保存到 YAML 文件。
Args:
new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
"""
filepath = Path(new_filepath) if new_filepath else self.filepath
try:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(
dict(self), # 使用dict转换为标准的字典
f,
ensure_ascii=False, # 允许非ASCII字符
indent=4, # 美化输出缩进4个空格
sort_keys=False # 不排序键
)
except (TypeError, OSError) as e:
logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}")
if __name__ == '__main__':
# 示例用法
json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径
json_file = JsonProcessor(json_path)
print(json_file)
print(type(json_file))
json_string = JsonProcessor.to_string(json_file)
JsonProcessor.to_dict(json_string)
print(json_string)
json_file.save()

View File

@@ -19,7 +19,7 @@ from commons.file_processors.base import BaseFileProcessor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class YamlFile(BaseFileProcessor, dict): class YamlProcessor(BaseFileProcessor, dict):
""" """
用于处理 YAML 文件的类,继承自 dict。 用于处理 YAML 文件的类,继承自 dict。
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能, 提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
@@ -60,7 +60,8 @@ class YamlFile(BaseFileProcessor, dict):
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.") logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
# 保持字典为空 (已在开头 clear) # 保持字典为空 (已在开头 clear)
def to_string(self) -> str: @staticmethod
def to_string(data: dict) -> str:
""" """
将字典 (自身) 转换为 YAML 格式的字符串。 将字典 (自身) 转换为 YAML 格式的字符串。
@@ -69,7 +70,7 @@ class YamlFile(BaseFileProcessor, dict):
""" """
try: try:
return yaml.safe_dump( return yaml.safe_dump(
dict(self), # 使用dict转换为标准的字典 dict(data), # 使用dict转换为标准的字典
allow_unicode=True, allow_unicode=True,
sort_keys=False, sort_keys=False,
default_flow_style=False default_flow_style=False
@@ -78,7 +79,8 @@ class YamlFile(BaseFileProcessor, dict):
logger.error(f"将数据转换为 YAML 字符串时出错: {e}") logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return "" return ""
def to_dict(self, data: str) -> None: @staticmethod
def to_dict(data: str) -> Union[None, dict]:
""" """
将 YAML 格式的字符串转换为字典,并更新当前字典的内容. 将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
@@ -87,11 +89,9 @@ class YamlFile(BaseFileProcessor, dict):
""" """
try: try:
loaded_data = yaml.safe_load(data) or {} loaded_data = yaml.safe_load(data) or {}
self.clear() return loaded_data
self.update(loaded_data) # 清空并更新
except yaml.YAMLError as e: except yaml.YAMLError as e:
logger.error(f"将 YAML 字符串转换为字典时出错: {e}") logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
self.clear() # 出错时也清空
def save(self, new_filepath: Union[str, Path, None] = None): def save(self, new_filepath: Union[str, Path, None] = None):
""" """
@@ -115,45 +115,13 @@ class YamlFile(BaseFileProcessor, dict):
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}") logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
class StringOrDict:
@classmethod
def to_string(cls, data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
"""
try:
return yaml.safe_dump(
dict(data), # 使用dict转换为标准的字典
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return ""
@classmethod
def to_dict(cls, data: str) -> Union[None, dict]:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = yaml.safe_load(data) or {}
return loaded_data
except yaml.YAMLError as e:
logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
if __name__ == '__main__': if __name__ == '__main__':
# 示例用法 # 示例用法
yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml' # 你的 YAML 文件路径 yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = YamlFile(yaml_path) yaml_file = YamlProcessor(yaml_path)
print(yaml_file) print(yaml_file)
print(type(yaml_file)) print(type(yaml_file))
@@ -198,14 +166,3 @@ if __name__ == '__main__':
# non_existent_file['a'] = 1 # 可以直接添加 # non_existent_file['a'] = 1 # 可以直接添加
# print("\n加载不存在的文件:", non_existent_file) # print("\n加载不存在的文件:", non_existent_file)
# if __name__ == '__main__':
# from commons.models import CaseInfo
#
# yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml'
# yaml_file = YamlFile(yaml_path)
# print(yaml_file.load())
# # yaml_file.load()
# # case_info = CaseInfo(**yaml_file)
# # print(case_info)
# # yaml_file["title"] = "查询用户信息"
# # yaml_file.save()

View File

@@ -15,9 +15,9 @@ import time
import urllib.parse import urllib.parse
import hashlib import hashlib
# from commons.databases import db from commons.databases import db
from commons.files import YamlFile from commons.file_processors.file_handle import FileHandle
from commons import settings from commons import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -66,7 +66,7 @@ def sql(s: str) -> str:
@Funcs.register("new_id") @Funcs.register("new_id")
def new_id(): def new_id():
# 自增,永不重复 # 自增,永不重复
id_file = YamlFile(settings.id_path) id_file = FileHandle(settings.id_path)
id_file["id"] += 1 id_file["id"] += 1
id_file.save() id_file.save()
@@ -76,7 +76,7 @@ def new_id():
def last_id() -> str: def last_id() -> str:
# 不自增,只返回结果 # 不自增,只返回结果
id_file = YamlFile(settings.id_path) id_file = FileHandle(settings.id_path)
return id_file["id"] return id_file["id"]
@Funcs.register("md5") @Funcs.register("md5")

View File

@@ -10,14 +10,11 @@
@desc: 声明yaml用例格式 @desc: 声明yaml用例格式
""" """
import logging import logging
from dataclasses import dataclass, asdict, field from dataclasses import dataclass, field
import allure
import yaml import yaml
from commons.templates import Template
from commons import settings from commons import settings
from utils import case_validator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -33,80 +30,10 @@ class CaseInfo:
feature: str = settings.allure_feature feature: str = settings.allure_feature
story: str = settings.allure_story story: str = settings.allure_story
def to_yaml(self) -> str:
# 序列化成yaml字符串
yaml_str = yaml.safe_dump(
asdict(self),
allow_unicode=True, # allow_unicode使用unicode编码正常显示中文
sort_keys=False)
return yaml_str
@classmethod
def by_yaml(cls, yaml_str):
# 反序列化
obj = cls(**yaml.safe_load(yaml_str))
return obj
@allure.step("断言")
def assert_all(self):
_validator = case_validator.CaseValidator()
# print(case_validator.VALIDATORS)
if not self.validate:
return
_validator.assert_all(self.validate)
# for assert_type, assert_value in self.validate.items():
# for msg, data in assert_value.items():
# a, b = data[0], data[1]
# # print(assert_type, a, b, msg)
# match assert_type:
# case 'equals':
# logger.info(f"assert {a} == {b}, {msg}")
# assert a == b, msg
# case 'not_equals':
# logger.info(f"assert {a} != {b}, {msg}")
# assert a != b, msg
# case 'contains':
# logger.info(f"assert {a} in {b}, {msg}")
# assert a in b, msg
# case 'not_contains':
# logger.info(f"assert {a} not in {b}, {msg}")
# assert a not in b, msg
# case "xxxxx
def ddt(self) -> list: # 返回一个列表列表中应该包含N个注入了变量的caseInfo
case_list = []
if not self.parametrize: # 没有使用数据驱动测试
logger.info("1执行这一步")
# case_info_str = self.to_yaml() # 转字符串
# case_info_str = Template(case_info_str).render(d) # 输入变量
# case_info = self.by_yaml(case_info_str) # 转成类
# case_list.append(case_info)
case_list.append(self)
else: # 使用数据驱动测试
args_name = self.parametrize[0]
args_value_list = self.parametrize[1:]
for args_value in args_value_list:
d = dict(zip(args_name, args_value))
print(f"D的值{d}")
# d 就是数据驱动测试的变量,应输入到用例中
case_info_str = self.to_yaml() # 转字符串
case_info_str = Template(case_info_str).render(d) # 输入变量
case_info = self.by_yaml(case_info_str) # 转成类
case_list.append(case_info) # 加入到返回值
return case_list
if __name__ == '__main__': if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f: with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f) data = yaml.safe_load(f)
# print(data) # print(data)
case_info = CaseInfo(**data) case_info = CaseInfo(**data)
s = case_info.to_yaml()
# print(s)
new_case_info = case_info.by_yaml(s)
# print(new_case_info)
ddt_ddt = case_info.ddt()
print(ddt_ddt)

View File

@@ -9,14 +9,16 @@
@date: 2024 2024/9/12 21:56 @date: 2024 2024/9/12 21:56
@desc: @desc:
""" """
from urllib.parse import urljoin
import logging import logging
from urllib.parse import urljoin
import requests import requests
import allure
from requests import Response, PreparedRequest from requests import Response, PreparedRequest
import allure
logger = logging.getLogger("requests.session") # logger = logging.getLogger("requests.session")
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)

View File

@@ -10,19 +10,23 @@
@desc: @desc:
""" """
from pathlib import Path from pathlib import Path
import os
from dotenv import load_dotenv
load_dotenv()
root_path = (Path(__file__)).resolve().parents[1] root_path = (Path(__file__)).resolve().parents[1]
base_url = 'http://119.91.19.171:40065' base_url = os.getenv("BASE_URL")
case_path = rf"{root_path}\TestCases\answer" cases_dir = rf"{root_path}\TestCases\answer"
exchanger = rf"{root_path}\extract.yaml" exchanger = rf"{root_path}\extract.yaml"
id_path = rf"{root_path}\id.yaml" id_path = rf"{root_path}\id.yaml"
db_host = '119.91.19.171' # ip db_host = os.getenv("DB_HOST") # ip
db_port = 3306 # 端口 db_port = os.getenv("DB_PORT") # 端口
db_user = 'root' # 用户名 db_user = os.getenv("DB_USER") # 用户名
db_password = 'mysql_hNahSe' # 密码 db_password = os.getenv("DB_PASSWORD") # 密码
db_database = 'answer' # 库名 db_database = os.getenv("DB_DATABASE") # 库名
allure_epic: str = "项目名称answer" allure_epic: str = "项目名称answer"
allure_feature: str = "默认特征feature" allure_feature: str = "默认特征feature"
@@ -33,4 +37,5 @@ rsa_private = ""
if __name__ == '__main__': if __name__ == '__main__':
print(root_path) print(root_path)
print(base_url,db_host,db_port,db_user,db_password,db_database)

View File

@@ -13,7 +13,6 @@ import copy
import logging import logging
import re import re
import string import string
import inspect
from commons.funcs import Funcs from commons.funcs import Funcs
@@ -26,12 +25,6 @@ class Template(string.Template):
2参数也可以是变量 2参数也可以是变量
""" """
# FUNC_MAPPING = {
# "int": int,
# "float": float,
# "bool": bool
# } # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}") call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
def render(self, mapping: dict) -> str: def render(self, mapping: dict) -> str:
@@ -68,28 +61,3 @@ class Template(string.Template):
return self.call_pattern.sub(convert, template) return self.call_pattern.sub(convert, template)
# def hot_load():
# from commons import funcs
#
# for func_name in dir(funcs): # 遍历模块中的所有函数
# if func_name.startswith("_"):
# continue
# func_code = getattr(funcs, func_name) # 取到函数对象
# # print(func_code)
# if callable(func_code): # 如果是一个可以调用的函数
# Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# print(Template.FUNC_MAPPING)
# # if inspect.isfunction(func_code): # 如果是一个可以调用的函数
# # Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# # print(Template.FUNC_MAPPING)
# def hot_load():
# from commons.funcs import Funcs
#
# print(Funcs.FUNC_MAPPING)
# # if inspect.isfunction(func_code): # 如果是一个可以调用的函数
# # Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# # print(Template.FUNC_MAPPING)
#
# hot_load()

View File

@@ -5,7 +5,7 @@ import pytest
from commons.cases import TestAPI from commons.cases import TestAPI
TestAPI.find_yaml_case() # 加载yaml文件 TestAPI.find_test_cases() # 加载yaml文件
if __name__ == '__main__': if __name__ == '__main__':
now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

View File

@@ -16,6 +16,7 @@ jsonpath = "^0.82.2"
pymysql = "^1.1.1" pymysql = "^1.1.1"
pytest-result-log = "^1.2.2" pytest-result-log = "^1.2.2"
allure-pytest = "^2.13.5" allure-pytest = "^2.13.5"
cryptography = "^44.0.2"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api" build-backend = "poetry.core.masonry.api"

View File

@@ -27,7 +27,8 @@ class CaseValidator:
@classmethod @classmethod
def assert_all(cls, validate: dict): def assert_all(cls, validate: dict):
if not validate:
return
for assert_type, cases in validate.items(): for assert_type, cases in validate.items():
logger.info(f"键:{assert_type},值:{cases}") logger.info(f"键:{assert_type},值:{cases}")
validator = cls.VALIDATORS.get(assert_type) validator = cls.VALIDATORS.get(assert_type)
@@ -76,4 +77,4 @@ if __name__ == '__main__':
case_validator = CaseValidator() case_validator = CaseValidator()
print(case_validator.VALIDATORS) print(case_validator.VALIDATORS)
# case_validator.assert_all(mock_case.get("validate")) case_validator.assert_all(mock_case.get("validate"))

View File

@@ -11,91 +11,81 @@
""" """
from pathlib import Path from pathlib import Path
from commons.models import CaseInfo
from commons.templates import Template from commons.templates import Template
from commons.file_processors.yaml_processor import StringOrDict from commons.file_processors.file_handle import FileHandle
class DataDriver: class DataDriver:
@staticmethod @staticmethod
def generate_cases(file_name, case_info) -> list: def generate_cases(file_name, case_info) -> dict:
if not case_info.get("parametrize"): if not case_info.get("parametrize"):
return {file_name + "[--]": case_info} return {file_name + "[--]": case_info}
# cases = [] cases = {}
args_names = case_info.get("parametrize")[0] args_names = case_info.get("parametrize")[0]
for i, args_values in enumerate(case_info.get("parametrize")[1:]): for i, args_values in enumerate(case_info.get("parametrize")[1:]):
# print(args_values) # print(args_values)
context = dict(zip(args_names, args_values)) context = dict(zip(args_names, args_values))
# print(context) # print(context)
# rendered = Template(CaseInfo(**case_info).to_yaml()).render(context) rendered = Template(FileHandle.to_string(case_info)).render(context)
rendered = Template(StringOrDict.to_string(case_info)).render(context) cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
# cases.append({file_name + "[" + str(i) + "]": StringOrDict.to_dict(rendered)})
yield {file_name + "[" + str(i) + "]": StringOrDict.to_dict(rendered)} return cases
# return cases
if __name__ == '__main__': if __name__ == '__main__':
from commons.file_processors.yaml_processor import YamlFile
file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml") file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml")
file_obj = YamlFile(file_path) file_obj = FileHandle(file_path)
# print(file_path.stem) print(file_path.stem)
file_name = file_path.stem file_name_ = file_path.stem
mock_case_info = { # mock_case_info = {
"case_info0": { # "case_info0": {
"feature": "页面状态", # "feature": "页面状态",
"story": "状态", # "story": "状态",
"title": "查询状态信息", # "title": "查询状态信息",
"request": "", # "request": "",
"extract": "", # "extract": "",
"validate": "", # "validate": "",
"parametrize": [["title", "username", "password", "msg"], ["测试1", "user1", "pass1", "200"], # "parametrize": [["title", "username", "password", "msg"], ["测试1", "user1", "pass1", "200"],
["测试2", "user2", "pass2", "300"]] # ["测试2", "user2", "pass2", "300"]]
}, # },
"case_info1": { # "case_info1": {
"feature": "页面状态", # "feature": "页面状态",
"story": "状态", # "story": "状态",
"title": "查询状态信息", # "title": "查询状态信息",
"request": "", # "request": "",
"extract": "", # "extract": "",
"validate": "", # "validate": "",
"parametrize": [1, 2, 3] # "parametrize": [1, 2, 3]
}, # },
"case_info2": { # "case_info2": {
"feature": "页面状态", # "feature": "页面状态",
"story": "状态", # "story": "状态",
"title": "查询状态信息", # "title": "查询状态信息",
"request": "", # "request": "",
"extract": "", # "extract": "",
"validate": "", # "validate": "",
"parametrize": [1, 2, 3] # "parametrize": [1, 2, 3]
} # }
#
} # }
dd = DataDriver() dd = DataDriver()
# cases = dd.generate_cases(mock_case_info.get("case_info0")) # cases = dd.generate_cases(mock_case_info.get("case_info0"))
cases = dd.generate_cases(file_name, file_obj) cases_ = dd.generate_cases(file_name_, file_obj)
# print(cases) print(cases_)
# print(len(cases)) case_keys = list(cases_.keys())
keys_list = [] case_values = cases_.values()
titles = []
for item in cases:
print(item)
# 遍历列表中的每个字典
for key, value in item.items():
print(f"key:{key}")
keys_list.append(key)
print(f"value:{value}")
# # 遍历内层字典(这里内层字典其实只有一个键值对)
titles.append(value['title'])
print(item)
print(keys_list) print(case_keys)
print(titles) print(case_values)
aa = [i.get("title") for i in case_values]
print(aa)
# print(list(case_values)[0]["feature"])
print(file_obj["feature"])
# print(list(case_values)[0]["story"])
print(file_obj["story"])
# ddt_title = [data.title for data in ddt_data]
# logger.info(f"{ddt_title=}")