refactor(): 优化文件读取,变量替换等

- 优化用例加载模块器
- 新增JSON文件读取模块
This commit is contained in:
2025-03-09 17:23:25 +08:00
parent 914b0301ba
commit a50e00a4e1
15 changed files with 219 additions and 451 deletions

View File

@@ -0,0 +1,65 @@
{
"epic": "项目名称answer",
"feature": "页面状态",
"story": "状态",
"title": "查询状态信息",
"request": {
"method": "get",
"url": "/answer/api/v1/connector/info",
"headers": {
"Host": "119.91.19.171:40065",
"Accept-Language": "en_US",
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
"Referer": "http://119.91.19.171:40065/users/login",
"Accept-Encoding": "gzip, deflate"
}
},
"extract": {
"msg": [
"json",
"$.msg",
0
]
},
"validate": {
"equals": {
"状态码等于200": [
"Success.",
"Success."
]
}
},
"parametrize": [
[
"title",
"username",
"password",
"msg"
],
[
"测试1",
"user1",
"pass1",
"200"
],
[
"测试2",
"user2",
"pass2",
"300"
],
[
"测试3",
"user3",
"pass3",
"200"
],
[
"测试4",
"user4",
"pass4",
"200"
]
]
}

View File

@@ -16,7 +16,7 @@ import allure
import pytest
from commons import settings
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.file_handle import FileHandle
from commons.models import CaseInfo
from commons.session import Session
from commons.exchange import Exchange
@@ -26,99 +26,96 @@ logger = logging.getLogger(__name__)
session = Session(settings.base_url)
_case_path = Path(settings.case_path)
cases_dir = Path(settings.cases_dir)
exchanger = Exchange(settings.exchanger)
@allure.epic("项目名称answer")
class TestAPI:
@classmethod
def find_yaml_case(cls, case_path: Path = _case_path):
def find_test_cases(cls, case_dir: Path = cases_dir):
"""
搜索和加载yaml文件
:return:
"""
yaml_path_list = case_path.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
for yaml_path in yaml_path_list:
logger.info(f"加载文件:{yaml_path}")
case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
for case_path in case_path_list:
logger.info(f"加载文件:{case_path}")
file = YamlProcessor(yaml_path) # 自动读取yaml文件
case_info = CaseInfo(**file) # 校验yaml格式
logger.info(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志
# case_info = {yaml_path.stem:case_info}
# logger.info(f"case_info_dict={case_info}")
case_func = cls.new_case(yaml_path.stem, file) # 从yaml格式转换为pytest格式
print(yaml_path.stem)
setattr(cls, f"{yaml_path.stem}", case_func) # 把pytest格式添加到类中
file = FileHandle(case_path) # 自动读取yaml文件
try:
CaseInfo(**file) # 校验用例格式
logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# print(case_path.stem)
setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
except Exception as e:
logger.error(e)
@classmethod
def new_case(cls,file_name, case_info: dict):
case_data = data_driver.DataDriver().generate_cases(file_name,case_info)
# ddt_data = case_info.ddt()
# keys_list = ['test_1_user[0]', 'test_1_user[1]', 'test_1_user[2]', 'test_1_user[3]']
# titles = ['查询用户信息', '查询用户信息', '查询用户信息', '查询用户信息']
keys_list = []
titles = []
for item in case_data:
# 遍历列表中的每个字典
for key, value in item.items():
print(f"key:{key}")
keys_list.append(key)
print(f"value:{value}")
# # 遍历内层字典(这里内层字典其实只有一个键值对)
titles.append(value['title'])
print(f"测试数据:{case_data}")
# item={'test_1_user[0]': {'feature': '特征', 'story': '事件', 'title': '查询用户信息', 'request': {'method': 'get', 'url': 'http://119.91.19.171:40065/answer/api/v1/connector/info', 'headers': {'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh_CN', 'Content-Type': 'application/json', 'Cookie': 'psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==', 'Host': '119.91.19.171:40065', 'Origin': 'http://119.91.19.171:40065', 'Referer': 'http://119.91.19.171:40065/users/login', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0'}}, 'extract': {'code': ['json', '$.code', 0], 'msg': ['json', '$.msg', 0]}, 'validate': {'equals': {'状态码等于200': [200, 'code1']}, 'not_equals': {'状态码不等于404': [404, 'code1']}, 'contains': {'包含关系': [404, 'code1']}, 'not_contains': {'不包含关系': [404, 'code1']}}, 'parametrize': [['title', 'username', 'password', 'code'], ['测试1', 'user1', 'pass1', 'code1'], ['测试2', 'user2', 'pass2', 'code2'], ['测试3', 'user3', 'pass3', 'code3'], ['测试4', 'user4', 'pass4', 'code4']]}}
# ddt_title = [data.title for data in ddt_data]
# logger.info(f"{ddt_title=}")
logger.info(f"keys_list={keys_list}")
logger.info(f"titles={titles}")
logger.info(f"feature={case_info.get('feature')}")
logger.info(f"story={case_info.get('story')}")
@allure.feature(case_info.get("feature"))
@allure.story(case_info.get("story"))
@pytest.mark.parametrize("case_key", keys_list, ids=titles)
def test_func(self, case_key):
logger.info(f"case_key={case_key}")
item.get(case_key)
logger.info(f"========:{item}")
logger.info(f"========:{item.get(case_key)}")
allure.dynamic.title(case_info.get("title"))
def new_case(cls, file_name, case_info: dict):
test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
logger.info(f"用例开始执行:{case_info.get('title')}".center(80, "="))
keys_list = list(test_case.keys())
logger.info(f"keys_list{keys_list}")
values_list = list(test_case.values())
logger.info(f"测试用例列表:{values_list}")
driver_title = [i.get("title") for i in values_list]
logger.info(f"driver_title={driver_title}")
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
logger.info(f"epic{epic}")
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
logger.info(f"feature{feature}")
story = case_info["story"] if case_info["story"] else settings.allure_story
logger.info(f"story{story}")
@allure.epic(epic)
@allure.feature(feature)
@allure.story(story)
@pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
def test_func(self, case_key):
logger.info(f"case_key{case_key}")
test_case_mapping = test_case.get(case_key)
logger.info(f"测试用例:{test_case_mapping}")
allure.dynamic.title(test_case_mapping.get("title"))
logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
# 0变量替换
new_case_info = exchanger.replace(case_info)
new_case_info = exchanger.replace(test_case_mapping)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info={new_case_info}")
logger.info(f"new_case_info{new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.get("request"))
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
# new_case_info = CaseInfo(**new_case_info)
for var_name, extract_info in new_case_info.get("extract").items():
logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(case_info) # 为断言加载变量
# logger.info(f"替换变量后:{assert_case_info}")
assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
logger.info(f"替换变量后:{assert_case_info}")
# assert_case_info.assert_all() # 执行断言
_validator = case_validator.CaseValidator()
_validator.assert_all(assert_case_info.get("validate"))
logger.info(f"用例执行结束:{case_info.get('title')}".center(80, "="))
logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
return test_func
# TestAPI.find_yaml_case()
if __name__ == '__main__':
TestAPI.find_yaml_case()
TestAPI.find_test_cases()
# print(TestAPI.__dict__)

View File

@@ -13,21 +13,19 @@ import copy
import json
import logging
import re
import jsonpath
import allure
from commons.templates import Template
import jsonpath
from commons.file_processors.yaml_processor import YamlProcessor, StringOrDict
from commons.models import CaseInfo
from commons.file_processors.file_handle import FileHandle
logger = logging.getLogger(__name__)
class Exchange:
def __init__(self, path):
self.file = YamlProcessor(path)
self.file = FileHandle(path)
@allure.step("提取变量")
def extract(self, resp, var_name, attr, expr: str, index):
@@ -59,29 +57,17 @@ class Exchange:
self.file[var_name] = value # 保存变量
self.file.save() # 持久化存储到文件
# @allure.step("替换变量")
# def replace(self, case_info: CaseInfo):
# logger.info(case_info)
# # 1将case_info转换为字符串
# case_info_str = case_info.to_yaml()
# print(f"{case_info_str=}")
# # 2替换字符串
# case_info_str = Template(case_info_str).render(self.file)
# print(f"{case_info_str=}")
# # 3将字符串转换成case_info
# new_case_info = case_info.by_yaml(case_info_str)
# return new_case_info
@allure.step("替换变量")
def replace(self, case_info: dict) -> dict:
logger.info(case_info)
logger.info(f"变量替换:{case_info}")
# 1将case_info转换为字符串
case_info_str = YamlProcessor.to_string(case_info)
case_info_str = FileHandle.to_string(case_info)
print(f"{case_info_str=}")
# 2替换字符串
case_info_str = Template(case_info_str).render(self.file)
print(f"{case_info_str=}")
# 3将字符串转换成case_info
new_case_info = YamlProcessor.to_dict(case_info_str)
new_case_info = FileHandle.to_dict(case_info_str)
return new_case_info
@@ -97,7 +83,7 @@ if __name__ == '__main__':
# print(mock_resp.text)
# print(mock_resp.json())
exchanger = Exchange(r"D:\CNWei\CNW\InterfaceAutoTest\extract.yaml")
exchanger = Exchange(r"E:\PyP\InterfaceAutoTest\extract.yaml")
exchanger.extract(mock_resp, "name", "json", '$.name', 0)
exchanger.extract(mock_resp, "age", "json", '$.age', 0)
exchanger.extract(mock_resp, "data", "json", '$.data', 0)

View File

@@ -9,38 +9,33 @@
@date: 2025/3/7 09:31
@desc:
"""
from pathlib import Path
from typing import Union
from yaml_processor import YamlProcessor
from json_processor import JsonProcessor
from commons.file_processors.yaml_processor import YamlProcessor
from commons.file_processors.json_processor import JsonProcessor
class FileHandle:
def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
# self.data: Union[dict, None] = data
self.processor = get_processor(filepath, data)
processors = {
'yaml': YamlProcessor,
'yml': YamlProcessor,
'json': JsonProcessor,
def load(self) -> None:
self.processor.load()
def to_string(self) -> str:
return self.processor.to_string()
def to_dict(self, data: str) -> None:
self.processor.to_dict(data)
def save(self, new_filepath: Union[str, Path, None] = None):
self.processor.save(new_filepath)
}
def get_processor(filepath, data):
ext = Path(filepath).suffix.lower()[1:] # 获取后缀名,如 'json'
processors = {
'yaml': YamlProcessor,
'yml': YamlProcessor,
'json': JsonProcessor,
}
def get_processor(ext):
agent_model = processors.get(ext, YamlProcessor) # 代理模式
return agent_model(filepath, data) # 默认回退到 Yaml
return agent_model # 默认回退到 Yaml
FileHandle = get_processor("yaml")
if __name__ == '__main__':
# 示例用法
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = FileHandle(yaml_path)
print(yaml_file)
print(type(yaml_file))
file_string = FileHandle.to_string(yaml_file)
print(file_string)
file_dict = FileHandle.to_dict(file_string)
print(file_dict)

View File

@@ -11,9 +11,8 @@
"""
import logging
from typing import Union
from dataclasses import dataclass, asdict, field
from pathlib import Path
import yaml
import json
from commons.file_processors.base import BaseFileProcessor
logger = logging.getLogger(__name__)
@@ -51,16 +50,17 @@ class JsonProcessor(BaseFileProcessor, dict):
if self.filepath.exists():
try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = yaml.safe_load(f) or {}
loaded_data = json.load(f) or {}
self.update(loaded_data) # 使用加载的数据更新字典
except yaml.YAMLError as e:
except json.JSONDecodeError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
# 保持字典为空 (已在开头 clear)
else:
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
# 保持字典为空 (已在开头 clear)
def to_string(self) -> str:
@staticmethod
def to_string(data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
@@ -68,17 +68,18 @@ class JsonProcessor(BaseFileProcessor, dict):
YAML 格式的字符串。
"""
try:
return yaml.safe_dump(
dict(self), # 使用dict转换为标准的字典
allow_unicode=True,
sort_keys=False,
default_flow_style=False
return json.dumps(
dict(data), # 使用dict转换为标准的字典
ensure_ascii=False, # 允许非ASCII字符
# indent=4, # 美化输出缩进4个空格
sort_keys=False # 不排序键
)
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
logger.error(f"将数据转换为 JSON 字符串时出错: {e}")
return ""
def to_dict(self, data: str) -> None:
@staticmethod
def to_dict(data: str) -> None:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
@@ -86,12 +87,10 @@ class JsonProcessor(BaseFileProcessor, dict):
data: YAML 格式的字符串。
"""
try:
loaded_data = yaml.safe_load(data) or {}
self.clear()
self.update(loaded_data) # 清空并更新
except yaml.YAMLError as e:
logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
self.clear() # 出错时也清空
loaded_data = json.loads(data) or {}
return loaded_data
except json.JSONDecodeError as e:
logger.error(f"将 JSON 字符串转换为字典时出错: {e}")
def save(self, new_filepath: Union[str, Path, None] = None):
"""
@@ -104,108 +103,24 @@ class JsonProcessor(BaseFileProcessor, dict):
try:
with open(filepath, "w", encoding="utf-8") as f:
yaml.safe_dump(
json.dump(
dict(self), # 使用dict转换为标准的字典
stream=f,
allow_unicode=True,
sort_keys=False,
default_flow_style=False
f,
ensure_ascii=False, # 允许非ASCII字符
indent=4, # 美化输出缩进4个空格
sort_keys=False # 不排序键
)
except (TypeError, OSError) as e:
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
class StringOrDict:
@classmethod
def to_string(cls, data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
"""
try:
return yaml.safe_dump(
dict(data), # 使用dict转换为标准的字典
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return ""
@classmethod
def to_dict(cls, data: str) -> Union[None, dict]:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = yaml.safe_load(data) or {}
return loaded_data
except yaml.YAMLError as e:
logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}")
if __name__ == '__main__':
# 示例用法
yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml' # 你的 YAML 文件路径
yaml_file = YamlFile(yaml_path)
print(yaml_file)
print(type(yaml_file))
# # 直接像字典一样访问数据
# print("加载的数据:", yaml_file) # 直接打印对象,就是打印字典内容
# print("title:", yaml_file.get("title")) # 使用 get 方法
# if "title" in yaml_file: # 使用 in 检查键
# print("原始title:", yaml_file["title"]) # 使用方括号访问
# yaml_file["title"] = "新的标题" # 使用方括号修改
# print("修改后的title:", yaml_file["title"])
# #
# yaml_file["new_key"] = "new_value" # 添加新的键值对
#
# # 将字典转换为 YAML 字符串
# yaml_string = yaml_file.to_string()
# print("\nYAML 字符串:", yaml_string)
# #
# # 将 YAML 字符串转换回字典 (并更新 yaml_file)
# yaml_file.to_dict(yaml_string)
# print("\n从字符串加载的数据:", yaml_file)
#
# # 保存修改后的数据 (覆盖原文件)
# yaml_file.save()
#
# # 保存到新文件
# new_yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user_new.yaml'
# yaml_file.save(new_filepath=new_yaml_path)
# 测试从字符串初始化
# yaml_string2 = """
# name: Test User
# age: 30
# """
# yaml_file2 = YamlFile("test2.yaml", data=yaml.safe_load(yaml_string2)) # 从字符串初始化
# print("\n从字符串初始化的 YamlFile:", yaml_file2)
# yaml_file2.save() # 保存到 test2.yaml
#
# 测试文件不存在的情形
# non_existent_file = YamlFile("non_existent_file.yaml")
# print("\n加载不存在的文件:", non_existent_file) # 应该打印空字典 {}
# non_existent_file['a'] = 1 # 可以直接添加
# print("\n加载不存在的文件:", non_existent_file)
# if __name__ == '__main__':
# from commons.models import CaseInfo
#
# yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml'
# yaml_file = YamlFile(yaml_path)
# print(yaml_file.load())
# # yaml_file.load()
# # case_info = CaseInfo(**yaml_file)
# # print(case_info)
# # yaml_file["title"] = "查询用户信息"
# # yaml_file.save()
json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径
json_file = JsonProcessor(json_path)
print(json_file)
print(type(json_file))
json_string = JsonProcessor.to_string(json_file)
JsonProcessor.to_dict(json_string)
print(json_string)
json_file.save()

View File

@@ -60,38 +60,6 @@ class YamlProcessor(BaseFileProcessor, dict):
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
# 保持字典为空 (已在开头 clear)
# def to_string(self) -> str:
# """
# 将字典 (自身) 转换为 YAML 格式的字符串。
#
# Returns:
# YAML 格式的字符串。
# """
# try:
# return yaml.safe_dump(
# dict(self), # 使用dict转换为标准的字典
# allow_unicode=True,
# sort_keys=False,
# default_flow_style=False
# )
# except TypeError as e:
# logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
# return ""
# def to_dict(self, data: str) -> None:
# """
# 将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
#
# Args:
# data: YAML 格式的字符串。
# """
# try:
# loaded_data = yaml.safe_load(data) or {}
# self.clear()
# self.update(loaded_data) # 清空并更新
# except yaml.YAMLError as e:
# logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
# self.clear() # 出错时也清空
@staticmethod
def to_string(data: dict) -> str:
"""
@@ -147,48 +115,12 @@ class YamlProcessor(BaseFileProcessor, dict):
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
class StringOrDict:
# @classmethod
# def to_string(cls, data: dict) -> str:
@staticmethod
def to_string(data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
"""
try:
return yaml.safe_dump(
dict(data), # 使用dict转换为标准的字典
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return ""
# @classmethod
# def to_dict(cls, data: str) -> Union[None, dict]:
@staticmethod
def to_dict(data: str) -> Union[None, dict]:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = yaml.safe_load(data) or {}
return loaded_data
except yaml.YAMLError as e:
logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
if __name__ == '__main__':
# 示例用法
yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml' # 你的 YAML 文件路径
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
yaml_file = YamlProcessor(yaml_path)
print(yaml_file)
print(type(yaml_file))
@@ -234,14 +166,3 @@ if __name__ == '__main__':
# non_existent_file['a'] = 1 # 可以直接添加
# print("\n加载不存在的文件:", non_existent_file)
# if __name__ == '__main__':
# from commons.models import CaseInfo
#
# yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml'
# yaml_file = YamlFile(yaml_path)
# print(yaml_file.load())
# # yaml_file.load()
# # case_info = CaseInfo(**yaml_file)
# # print(case_info)
# # yaml_file["title"] = "查询用户信息"
# # yaml_file.save()

View File

@@ -15,9 +15,9 @@ import time
import urllib.parse
import hashlib
# from commons.databases import db
from commons.databases import db
from commons.files import YamlFile
from commons.file_processors.file_handle import FileHandle
from commons import settings
logger = logging.getLogger(__name__)
@@ -66,7 +66,7 @@ def sql(s: str) -> str:
@Funcs.register("new_id")
def new_id():
# 自增,永不重复
id_file = YamlFile(settings.id_path)
id_file = FileHandle(settings.id_path)
id_file["id"] += 1
id_file.save()
@@ -76,7 +76,7 @@ def new_id():
def last_id() -> str:
# 不自增,只返回结果
id_file = YamlFile(settings.id_path)
id_file = FileHandle(settings.id_path)
return id_file["id"]
@Funcs.register("md5")

View File

@@ -10,14 +10,11 @@
@desc: 声明yaml用例格式
"""
import logging
from dataclasses import dataclass, asdict, field
from dataclasses import dataclass, field
import allure
import yaml
from commons.templates import Template
from commons import settings
from utils import case_validator
logger = logging.getLogger(__name__)
@@ -33,79 +30,10 @@ class CaseInfo:
feature: str = settings.allure_feature
story: str = settings.allure_story
def to_yaml(self) -> str:
# 序列化成yaml字符串
yaml_str = yaml.safe_dump(
asdict(self),
allow_unicode=True, # allow_unicode使用unicode编码正常显示中文
sort_keys=False)
return yaml_str
@classmethod
def by_yaml(cls, yaml_str):
# 反序列化
obj = cls(**yaml.safe_load(yaml_str))
return obj
# @allure.step("断言")
# def assert_all(self):
# _validator = case_validator.CaseValidator()
# # print(case_validator.VALIDATORS)
#
#
#
# _validator.assert_all(self.validate)
# for assert_type, assert_value in self.validate.items():
# for msg, data in assert_value.items():
# a, b = data[0], data[1]
# # print(assert_type, a, b, msg)
# match assert_type:
# case 'equals':
# logger.info(f"assert {a} == {b}, {msg}")
# assert a == b, msg
# case 'not_equals':
# logger.info(f"assert {a} != {b}, {msg}")
# assert a != b, msg
# case 'contains':
# logger.info(f"assert {a} in {b}, {msg}")
# assert a in b, msg
# case 'not_contains':
# logger.info(f"assert {a} not in {b}, {msg}")
# assert a not in b, msg
# case "xxxxx
# def ddt(self) -> list: # 返回一个列表列表中应该包含N个注入了变量的caseInfo
# case_list = []
# if not self.parametrize: # 没有使用数据驱动测试
# logger.info("1执行这一步")
# # case_info_str = self.to_yaml() # 转字符串
# # case_info_str = Template(case_info_str).render(d) # 输入变量
# # case_info = self.by_yaml(case_info_str) # 转成类
# # case_list.append(case_info)
# case_list.append(self)
# else: # 使用数据驱动测试
# args_name = self.parametrize[0]
# args_value_list = self.parametrize[1:]
# for args_value in args_value_list:
# d = dict(zip(args_name, args_value))
# print(f"D的值{d}")
# # d 就是数据驱动测试的变量,应输入到用例中
# case_info_str = self.to_yaml() # 转字符串
# case_info_str = Template(case_info_str).render(d) # 输入变量
# case_info = self.by_yaml(case_info_str) # 转成类
#
# case_list.append(case_info) # 加入到返回值
# return case_list
if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f)
# print(data)
case_info = CaseInfo(**data)
s = case_info.to_yaml()
# print(s)
new_case_info = case_info.by_yaml(s)
# print(new_case_info)
ddt_ddt = case_info.ddt()
print(ddt_ddt)

View File

@@ -9,14 +9,16 @@
@date: 2024 2024/9/12 21:56
@desc:
"""
from urllib.parse import urljoin
import logging
from urllib.parse import urljoin
import requests
import allure
from requests import Response, PreparedRequest
import allure
logger = logging.getLogger("requests.session")
# logger = logging.getLogger("requests.session")
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

View File

@@ -14,7 +14,7 @@ from pathlib import Path
root_path = (Path(__file__)).resolve().parents[1]
base_url = 'http://119.91.19.171:40065'
case_path = rf"{root_path}\TestCases\answer"
cases_dir = rf"{root_path}\TestCases\answer"
exchanger = rf"{root_path}\extract.yaml"
id_path = rf"{root_path}\id.yaml"

View File

@@ -13,7 +13,6 @@ import copy
import logging
import re
import string
import inspect
from commons.funcs import Funcs
@@ -26,12 +25,6 @@ class Template(string.Template):
2参数也可以是变量
"""
# FUNC_MAPPING = {
# "int": int,
# "float": float,
# "bool": bool
# } # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
def render(self, mapping: dict) -> str:
@@ -68,28 +61,3 @@ class Template(string.Template):
return self.call_pattern.sub(convert, template)
# def hot_load():
# from commons import funcs
#
# for func_name in dir(funcs): # 遍历模块中的所有函数
# if func_name.startswith("_"):
# continue
# func_code = getattr(funcs, func_name) # 取到函数对象
# # print(func_code)
# if callable(func_code): # 如果是一个可以调用的函数
# Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# print(Template.FUNC_MAPPING)
# # if inspect.isfunction(func_code): # 如果是一个可以调用的函数
# # Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# # print(Template.FUNC_MAPPING)
# def hot_load():
# from commons.funcs import Funcs
#
# print(Funcs.FUNC_MAPPING)
# # if inspect.isfunction(func_code): # 如果是一个可以调用的函数
# # Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# # print(Template.FUNC_MAPPING)
#
# hot_load()

View File

@@ -5,7 +5,7 @@ import pytest
from commons.cases import TestAPI
TestAPI.find_yaml_case() # 加载yaml文件
TestAPI.find_test_cases() # 加载yaml文件
if __name__ == '__main__':
now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

View File

@@ -16,6 +16,7 @@ jsonpath = "^0.82.2"
pymysql = "^1.1.1"
pytest-result-log = "^1.2.2"
allure-pytest = "^2.13.5"
cryptography = "^44.0.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -77,4 +77,4 @@ if __name__ == '__main__':
case_validator = CaseValidator()
print(case_validator.VALIDATORS)
# case_validator.assert_all(mock_case.get("validate"))
case_validator.assert_all(mock_case.get("validate"))

View File

@@ -11,39 +11,36 @@
"""
from pathlib import Path
from commons.models import CaseInfo
from commons.templates import Template
from commons.file_processors.yaml_processor import StringOrDict
from commons.file_processors.file_handle import FileHandle
class DataDriver:
@staticmethod
def generate_cases(file_name, case_info) -> list:
def generate_cases(file_name, case_info) -> dict:
if not case_info.get("parametrize"):
return {file_name + "[--]": case_info}
# cases = []
cases = {}
args_names = case_info.get("parametrize")[0]
for i, args_values in enumerate(case_info.get("parametrize")[1:]):
# print(args_values)
context = dict(zip(args_names, args_values))
# print(context)
# rendered = Template(CaseInfo(**case_info).to_yaml()).render(context)
rendered = Template(StringOrDict.to_string(case_info)).render(context)
# cases.append({file_name + "[" + str(i) + "]": StringOrDict.to_dict(rendered)})
yield {file_name + "[" + str(i) + "]": StringOrDict.to_dict(rendered)}
# return cases
rendered = Template(FileHandle.to_string(case_info)).render(context)
cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
return cases
if __name__ == '__main__':
from commons.file_processors.yaml_processor import YamlProcessor
file_path = Path(r"D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml")
file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml")
file_obj = YamlProcessor(file_path)
file_obj = FileHandle(file_path)
print(file_path.stem)
file_name = file_path.stem
file_name_ = file_path.stem
# mock_case_info = {
# "case_info0": {
# "feature": "页面状态",
@@ -78,24 +75,17 @@ if __name__ == '__main__':
dd = DataDriver()
# cases = dd.generate_cases(mock_case_info.get("case_info0"))
cases = dd.generate_cases(file_name, file_obj)
print(cases)
# print(len(cases))
keys_list = []
titles = []
for item in cases:
# print(item)
# 遍历列表中的每个字典
for key, value in item.items():
# print(f"key:{key}")
keys_list.append(key)
# print(f"value:{value}")
# # 遍历内层字典(这里内层字典其实只有一个键值对)
titles.append(value['title'])
# print(item)
cases_ = dd.generate_cases(file_name_, file_obj)
print(cases_)
case_keys = list(cases_.keys())
case_values = cases_.values()
print(keys_list)
print(titles)
print(case_keys)
print(case_values)
aa = [i.get("title") for i in case_values]
print(aa)
# print(list(case_values)[0]["feature"])
print(file_obj["feature"])
# print(list(case_values)[0]["story"])
print(file_obj["story"])
# ddt_title = [data.title for data in ddt_data]
# logger.info(f"{ddt_title=}")