6 Commits

Author SHA1 Message Date
a6996ed500 feat,fix(): 优化项目
- 优化yaml_processor(优化文件类型转换逻辑)
- 修复bug
2025-03-06 17:37:00 +08:00
31fad3f4e1 refactor(cases,yaml_processor): 优化测试用例加载功能以及文件加载功能
- 优化用例加载模块器
- 优化yaml文件读取模块
2025-03-06 00:26:43 +08:00
b8903798b8 refactor(files): 优化项目
- 重构files
- 新增yaml_processor(优化读取文件逻辑)
- 修复bug
2025-03-05 18:11:28 +08:00
698a95ac83 feat(funcs): 优化函数热加载
- 优化函数热加载模块funcs.py(由字典反射改为装饰器)
- 修复bug
2025-03-02 21:47:04 +08:00
1890918312 refactor(models): 优化项目
- 重构assert_all
- 优化目录结构
2025-02-28 17:48:20 +08:00
bc55dffe40 feat(): 优化项目
- 更新README
- 修复bug
2025-02-26 17:25:37 +08:00
22 changed files with 847 additions and 148 deletions

5
.gitignore vendored
View File

@@ -2,4 +2,7 @@
.idea/
.venv/
poetry.lock
.pytest_cache/
.pytest_cache/
report/
temp/
logs/

View File

@@ -9,8 +9,26 @@
...
## 环境搭建
1安装JAVA
- 配置环境变量
```text
JAVA_HOME
java的安装路径
CLASSPATH
%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar
添加Path
%JAVA_HOME%\bin
%JAVA_HOME%\jre\bin
```
2安装allure
- 配置环境变量
```text
添加Path
allure安装目录\bin
```
...
## 使用方法

View File

@@ -0,0 +1,31 @@
feature: 页面状态
story: 状态
title: 查询状态信息
request:
method: get
url: /answer/api/v1/connector/info
headers:
Host: 119.91.19.171:40065
Accept-Language: en_US
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
Referer: http://119.91.19.171:40065/users/login
Accept-Encoding: gzip, deflate
extract: # 提取变量
msg:
- "json"
- "$.msg"
- 0
validate:
equals: # 断言相等
状态码等于200:
- Success.
- ${msg}
#parametrize: # 数据驱动测试
# - [ "title","username","password","msg" ] # 变量名
# - [ "测试1","user1","pass1","200" ] # 变量值
# - [ "测试2","user2","pass2","300" ] # 变量值
# - [ "测试3","user3","pass3","200" ] # 变量值
# - [ "测试4","user4","pass4","200" ] # 变量值

View File

@@ -14,24 +14,25 @@ import logging
import allure
import pytest
from commons.files import YamlFile
from commons import settings
from commons.file_processors.yaml_processor import YamlFile
from commons.models import CaseInfo
from commons.session import Session
from commons.exchange import Exchange
from commons import settings
from utils import data_driver
logger = logging.getLogger(__name__)
session = Session(settings.base_url)
_case_path = Path(settings.case_path)
exchanger = Exchange(settings.exchanger)
@allure.epic("项目名称answer")
class TestAPI:
...
@classmethod
def find_yaml_case(cls, case_path: Path = _case_path):
@@ -41,48 +42,71 @@ class TestAPI:
"""
yaml_path_list = case_path.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
for yaml_path in yaml_path_list:
logger.info(f"load file {yaml_path=}")
logger.info(f"加载文件:{yaml_path}")
file = YamlFile(yaml_path) # 自动读取yaml文件
case_info = CaseInfo(**file) # 校验yaml格式
logger.debug(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志
case_func = cls.new_case(case_info) # 从yaml格式转换为pytest格式
print(yaml_path.name)
setattr(cls, f"{yaml_path.name}", case_func) # 把pytest格式添加到类中
logger.info(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志
# case_info = {yaml_path.stem:case_info}
# logger.info(f"case_info_dict={case_info}")
case_func = cls.new_case(yaml_path.stem, file) # 从yaml格式转换为pytest格式
print(yaml_path.stem)
setattr(cls, f"{yaml_path.stem}", case_func) # 把pytest格式添加到类中
@classmethod
def new_case(cls, case_info: CaseInfo):
ddt_data = case_info.ddt()
print(ddt_data)
ddt_title = [data.title for data in ddt_data]
def new_case(cls,file_name, case_info: dict):
case_data = data_driver.DataDriver().generate_cases(file_name,case_info)
# ddt_data = case_info.ddt()
keys_list = ['test_1_user[0]', 'test_1_user[1]', 'test_1_user[2]', 'test_1_user[3]']
titles = ['查询用户信息', '查询用户信息', '查询用户信息', '查询用户信息']
for item in case_data:
# 遍历列表中的每个字典
for key, value in item.items():
print(f"key:{key}")
keys_list.append(key)
print(f"value:{value}")
# # 遍历内层字典(这里内层字典其实只有一个键值对)
titles.append(value['title'])
print(f"测试数据:{case_data}")
item={'test_1_user[0]': {'feature': '特征', 'story': '事件', 'title': '查询用户信息', 'request': {'method': 'get', 'url': 'http://119.91.19.171:40065/answer/api/v1/connector/info', 'headers': {'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh_CN', 'Content-Type': 'application/json', 'Cookie': 'psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==', 'Host': '119.91.19.171:40065', 'Origin': 'http://119.91.19.171:40065', 'Referer': 'http://119.91.19.171:40065/users/login', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0'}}, 'extract': {'code': ['json', '$.code', 0], 'msg': ['json', '$.msg', 0]}, 'validate': {'equals': {'状态码等于200': [200, 'code1']}, 'not_equals': {'状态码不等于404': [404, 'code1']}, 'contains': {'包含关系': [404, 'code1']}, 'not_contains': {'不包含关系': [404, 'code1']}}, 'parametrize': [['title', 'username', 'password', 'code'], ['测试1', 'user1', 'pass1', 'code1'], ['测试2', 'user2', 'pass2', 'code2'], ['测试3', 'user3', 'pass3', 'code3'], ['测试4', 'user4', 'pass4', 'code4']]}}
# ddt_title = [data.title for data in ddt_data]
# logger.info(f"{ddt_title=}")
logger.info(f"keys_list={keys_list}")
logger.info(f"titles={titles}")
logger.info(f"feature={case_info.get('feature')}")
logger.info(f"story={case_info.get('story')}")
@allure.feature(case_info.get("feature"))
@allure.story(case_info.get("story"))
@pytest.mark.parametrize("case_key", keys_list, ids=titles)
def test_func(self, case_key):
logger.info(f"case_key={case_key}")
item.get(case_key)
logger.info(f"========:{item}")
logger.info(f"========:{item.get(case_key)}")
allure.dynamic.title(case_info.get("title"))
@allure.feature(case_info.feature)
@allure.story(case_info.story)
@pytest.mark.parametrize("case_info", ddt_data, ids=ddt_title)
def test_func(self, case_info: CaseInfo):
allure.dynamic.title(case_info.title)
logger.info(f"用例开始执行:{case_info.title}".center(80, "="))
logger.info(f"用例开始执行:{case_info.get('title')}".center(80, "="))
# 0变量替换
new_case_info = exchanger.replace(case_info)
logger.info(f"1正在注入变量...")
logger.info(f"new_case_info={new_case_info}")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.request)
resp = session.request(**new_case_info.get("request"))
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
new_case_info = CaseInfo(**new_case_info)
for var_name, extract_info in new_case_info.extract.items():
print(var_name, extract_info)
# logger.info(f"保存变量:{var_name}{extract_info}")
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(case_info) # 为断言加载变量
print(assert_case_info)
# logger.info(f"替换变量后:{assert_case_info}")
assert_case_info.assert_all() # 执行断言
logger.info(f"用例执行结束:{case_info.title}".center(80, "="))

View File

@@ -19,7 +19,7 @@ import allure
from commons.templates import Template
import jsonpath
from commons.files import YamlFile
from commons.file_processors.yaml_processor import YamlFile,StringOrDict
from commons.models import CaseInfo
logger = logging.getLogger(__name__)
@@ -31,15 +31,16 @@ class Exchange:
@allure.step("提取变量")
def extract(self, resp, var_name, attr, expr: str, index):
# resp中json是方法不是属性需要手动更改为属性
resp = copy.deepcopy(resp)
try:
# resp中json是方法不是属性需要手动更改为属性
resp.json = resp.json()
except json.decoder.JSONDecodeError:
resp.json = {"msg": "is not json data"}
data = getattr(resp, attr)
# print(data)
if expr.startswith("/"): # xpath
res = None
elif expr.startswith("$"): # jsonpath
@@ -53,23 +54,36 @@ class Exchange:
else: # 如果没有数据
value = "not data"
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
self.file[var_name] = value # 保存变量
self.file.save() # 持久化存储到文件
@allure.step("替换变量")
def replace(self, case_info: CaseInfo):
...
# @allure.step("替换变量")
# def replace(self, case_info: CaseInfo):
# logger.info(case_info)
# # 1将case_info转换为字符串
# case_info_str = case_info.to_yaml()
# print(f"{case_info_str=}")
# # 2替换字符串
# case_info_str = Template(case_info_str).render(self.file)
# print(f"{case_info_str=}")
# # 3将字符串转换成case_info
# new_case_info = case_info.by_yaml(case_info_str)
# return new_case_info
@allure.step("替换变量")
def replace(self, case_info: dict):
logger.info(case_info)
# 1将case_info转换为字符串
case_info_str = case_info.to_yaml()
case_info_str = StringOrDict().to_string(case_info)
print(f"{case_info_str=}")
# 2替换字符串
case_info_str = Template(case_info_str).render(self.file)
print(f"{case_info_str=}")
# 3将字符串转换成case_info
new_case_info = case_info.by_yaml(case_info_str)
new_case_info = StringOrDict().to_dict(case_info_str)
return new_case_info
if __name__ == '__main__':
class MockResponse:
text = '{"name":"张三","age":"18","data":[3,4,5],"aaa":null}'
@@ -87,7 +101,7 @@ if __name__ == '__main__':
exchanger.extract(mock_resp, "age", "json", '$.age', 0)
exchanger.extract(mock_resp, "data", "json", '$.data', 0)
exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0)
case_info = CaseInfo(
mock_case_info = CaseInfo(
title="单元测试",
request={
"data":
@@ -96,5 +110,5 @@ if __name__ == '__main__':
extract={},
validate={}
)
new_case_info = exchanger.replace(case_info)
print(new_case_info)
new_mock_case_info = exchanger.replace(mock_case_info)
print(new_mock_case_info)

View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: __init__.py
@date: 2025/3/4 17:23
@desc:
"""

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: base
@date: 2025/3/4 17:23
@desc:
"""
import abc
class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
"""
文件处理器的抽象基类。
定义了所有子类必须实现的方法。
"""
@abc.abstractmethod
def load(self):
"""加载."""
pass
@abc.abstractmethod
def to_string(self) -> str:
"""将文件内容转换为字符串。"""
pass
@abc.abstractmethod
def to_dict(self, data: str) -> dict:
"""将文件内容转换为字典。"""
pass
@abc.abstractmethod
def save(self, new_filepath=None):
"""将数据保存."""
pass

View File

@@ -0,0 +1,211 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: yaml_processor
@date: 2025/3/4 17:28
@desc:
"""
import logging
from typing import Union
from dataclasses import dataclass, asdict, field
from pathlib import Path
import yaml
from commons.file_processors.base import BaseFileProcessor
logger = logging.getLogger(__name__)
class YamlFile(BaseFileProcessor, dict):
"""
用于处理 YAML 文件的类,继承自 dict。
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
并可以直接像字典一样访问 YAML 数据。
"""
def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
"""
初始化 YamlFile 对象。
Args:
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
如果不提供,则尝试从 filepath 加载数据。
"""
super().__init__() # 初始化父类 dict
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
if data is not None:
self.update(data) # 如果提供了初始数据,则更新字典
else:
self.load() # 否则,尝试从文件加载
def load(self) -> None:
"""
从 YAML 文件加载数据并更新字典。
如果文件不存在或加载失败,则清空字典并记录警告/错误。
"""
self.clear() # 清空现有数据
if self.filepath.exists():
try:
with open(self.filepath, "r", encoding="utf-8") as f:
loaded_data = yaml.safe_load(f) or {}
self.update(loaded_data) # 使用加载的数据更新字典
except yaml.YAMLError as e:
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
# 保持字典为空 (已在开头 clear)
else:
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
# 保持字典为空 (已在开头 clear)
def to_string(self) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
"""
try:
return yaml.safe_dump(
dict(self), # 使用dict转换为标准的字典
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return ""
def to_dict(self, data: str) -> None:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = yaml.safe_load(data) or {}
self.clear()
self.update(loaded_data) # 清空并更新
except yaml.YAMLError as e:
logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
self.clear() # 出错时也清空
def save(self, new_filepath: Union[str, Path, None] = None):
"""
将字典数据 (自身) 保存到 YAML 文件。
Args:
new_filepath: 可选参数,指定新的文件路径。如果为 None则覆盖原文件。
"""
filepath = Path(new_filepath) if new_filepath else self.filepath
try:
with open(filepath, "w", encoding="utf-8") as f:
yaml.safe_dump(
dict(self), # 使用dict转换为标准的字典
stream=f,
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
except (TypeError, OSError) as e:
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
class StringOrDict:
@classmethod
def to_string(cls, data: dict) -> str:
"""
将字典 (自身) 转换为 YAML 格式的字符串。
Returns:
YAML 格式的字符串。
"""
try:
return yaml.safe_dump(
dict(data), # 使用dict转换为标准的字典
allow_unicode=True,
sort_keys=False,
default_flow_style=False
)
except TypeError as e:
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
return ""
@classmethod
def to_dict(cls, data: str) -> Union[None, dict]:
"""
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
Args:
data: YAML 格式的字符串。
"""
try:
loaded_data = yaml.safe_load(data) or {}
return loaded_data
except yaml.YAMLError as e:
logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
if __name__ == '__main__':
# 示例用法
yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml' # 你的 YAML 文件路径
yaml_file = YamlFile(yaml_path)
print(yaml_file)
print(type(yaml_file))
# # 直接像字典一样访问数据
# print("加载的数据:", yaml_file) # 直接打印对象,就是打印字典内容
# print("title:", yaml_file.get("title")) # 使用 get 方法
# if "title" in yaml_file: # 使用 in 检查键
# print("原始title:", yaml_file["title"]) # 使用方括号访问
# yaml_file["title"] = "新的标题" # 使用方括号修改
# print("修改后的title:", yaml_file["title"])
# #
# yaml_file["new_key"] = "new_value" # 添加新的键值对
#
# # 将字典转换为 YAML 字符串
# yaml_string = yaml_file.to_string()
# print("\nYAML 字符串:", yaml_string)
# #
# # 将 YAML 字符串转换回字典 (并更新 yaml_file)
# yaml_file.to_dict(yaml_string)
# print("\n从字符串加载的数据:", yaml_file)
#
# # 保存修改后的数据 (覆盖原文件)
# yaml_file.save()
#
# # 保存到新文件
# new_yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user_new.yaml'
# yaml_file.save(new_filepath=new_yaml_path)
# 测试从字符串初始化
# yaml_string2 = """
# name: Test User
# age: 30
# """
# yaml_file2 = YamlFile("test2.yaml", data=yaml.safe_load(yaml_string2)) # 从字符串初始化
# print("\n从字符串初始化的 YamlFile:", yaml_file2)
# yaml_file2.save() # 保存到 test2.yaml
#
# 测试文件不存在的情形
# non_existent_file = YamlFile("non_existent_file.yaml")
# print("\n加载不存在的文件:", non_existent_file) # 应该打印空字典 {}
# non_existent_file['a'] = 1 # 可以直接添加
# print("\n加载不存在的文件:", non_existent_file)
# if __name__ == '__main__':
# from commons.models import CaseInfo
#
# yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml'
# yaml_file = YamlFile(yaml_path)
# print(yaml_file.load())
# # yaml_file.load()
# # case_info = CaseInfo(**yaml_file)
# # print(case_info)
# # yaml_file["title"] = "查询用户信息"
# # yaml_file.save()

View File

@@ -10,34 +10,56 @@
@desc: 读取和保存yaml文件
"""
import logging
from dataclasses import dataclass, asdict, field
from pathlib import Path
import yaml
from commons.models import CaseInfo
logger = logging.getLogger(__name__)
class YamlFile(dict):
def __init__(self, path):
super().__init__()
self.path = path
self.load()
super().__init__() # 初始化父类 dict
self.path = Path(path)
self.load() # 链式初始化加载
def load(self):
with open(self.path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) # 字典
if data:
self.update(data) # 把两个字段的内容合并
if self.path.exists():
with open(self.path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {} # 加载数据,空文件返回空字典
self.clear() # 清空当前实例
self.update(data) # 更新字典内容
else:
logger.warning(f"File {self.path} not found, initialized empty.")
return self # 链式调用
def to_yaml(self) -> str:
return yaml.safe_dump(
dict(self),
allow_unicode=True,
sort_keys=False
)
@classmethod
def by_yaml(cls, yaml_str):
data = yaml.safe_load(yaml_str) or {}
return cls({**data}) # 通过类方法创建实例
def save(self):
with open(self.path, "w", encoding="utf-8") as f:
yaml.safe_dump(
dict(self),
dict(self), # 直接 dump 实例本身(已继承 dict
stream=f,
allow_unicode=True, # allow_unicode使用unicode编码正常显示中文
sort_keys=False) # sort_keys保持原有排序
allow_unicode=True,
sort_keys=False
)
return self # 链式调用
if __name__ == '__main__':
from commons.models import CaseInfo
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml'
yaml_file = YamlFile(yaml_path)
# yaml_file.load()

View File

@@ -15,32 +15,55 @@ import time
import urllib.parse
import hashlib
from commons.databases import db
# from commons.databases import db
# from commons.files import YamlFile
from commons.files import YamlFile
from commons import settings
logger = logging.getLogger(__name__)
class Funcs:
FUNC_MAPPING = {
"int": int,
"float": float,
"bool": bool
} # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
@classmethod
def register(cls, name: str):
def decorator(func):
cls.FUNC_MAPPING[name] = func
return func
return decorator
@Funcs.register("url_unquote")
def url_unquote(s: str) -> str:
return urllib.parse.unquote(s)
@Funcs.register("str")
def to_string(s) -> str:
# 将数据转换为str类型。
return f"'{s}'"
@Funcs.register("time_str")
def time_str() -> str:
return str(time.time())
@Funcs.register("add")
def add(a, b):
return str(int(a) + int(b))
@Funcs.register("sql")
def sql(s: str) -> str:
res = db.execute_sql(s)
return res[0][0]
@Funcs.register("new_id")
def new_id():
# 自增,永不重复
id_file = YamlFile(settings.id_path)
@@ -49,21 +72,21 @@ def new_id():
return id_file["id"]
@Funcs.register("last_id")
def last_id() -> str:
# 不自增,只返回结果
id_file = YamlFile("id.yaml")
id_file = YamlFile(settings.id_path)
return id_file["id"]
@Funcs.register("md5")
def md5(content: str) -> str:
# 1原文转为字节
content = content.encode("utf-8")
result = hashlib.md5(content).hexdigest()
return result
@Funcs.register("base64_encode")
def base64_encode(content: str) -> str:
# 1原文转二进制
content = content.encode("utf-8")
@@ -74,7 +97,7 @@ def base64_encode(content: str) -> str:
return encode_str
@Funcs.register("base64_decode")
def base64_decode(content: str) -> str:
# 1原文转二进制
content = content.encode("utf-8")
@@ -85,11 +108,11 @@ def base64_decode(content: str) -> str:
return decode_str
@Funcs.register("rsa_encode")
def rsa_encode(content: str) -> str:
...
@Funcs.register("rsa_decode")
def rsa_decode(content: str) -> str:
...
@@ -97,8 +120,6 @@ def rsa_decode(content: str) -> str:
if __name__ == '__main__':
# res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
# print(res)
a = "这是中文dddddd"
bb = base64_encode(a)
print(bb)
cc = base64_decode(bb)
print(cc)
# print(f"计数器:{new_id()}")
# print(f"当前数值:{last_id()}")
print(Funcs().FUNC_MAPPING)

View File

@@ -10,13 +10,14 @@
@desc: 声明yaml用例格式
"""
import logging
from dataclasses import dataclass, asdict
from dataclasses import dataclass, asdict, field
import allure
import yaml
from commons.templates import Template
from commons import settings
from utils import case_validator
logger = logging.getLogger(__name__)
@@ -27,7 +28,7 @@ class CaseInfo:
request: dict
extract: dict
validate: dict
parametrize: list = ""
parametrize: list = field(default_factory=list)
epic: str = settings.allure_epic
feature: str = settings.allure_feature
story: str = settings.allure_story
@@ -48,36 +49,47 @@ class CaseInfo:
@allure.step("断言")
def assert_all(self):
_validator = case_validator.CaseValidator()
# print(case_validator.VALIDATORS)
if not self.validate:
return
for assert_type, assert_value in self.validate.items():
for msg, data in assert_value.items():
a, b = data[0], data[1]
# print(assert_type, a, b, msg)
match assert_type:
case 'equals':
logger.info(f"assert {a} == {b}, {msg}")
assert a == b, msg
case 'not_equals':
logger.info(f"assert {a} != {b}, {msg}")
assert a != b, msg
case 'contains':
logger.info(f"assert {a} in {b}, {msg}")
assert a in b, msg
case 'not_contains':
logger.info(f"assert {a} not in {b}, {msg}")
assert a not in b, msg
# case "xxxxx
_validator.assert_all(self.validate)
# for assert_type, assert_value in self.validate.items():
# for msg, data in assert_value.items():
# a, b = data[0], data[1]
# # print(assert_type, a, b, msg)
# match assert_type:
# case 'equals':
# logger.info(f"assert {a} == {b}, {msg}")
# assert a == b, msg
# case 'not_equals':
# logger.info(f"assert {a} != {b}, {msg}")
# assert a != b, msg
# case 'contains':
# logger.info(f"assert {a} in {b}, {msg}")
# assert a in b, msg
# case 'not_contains':
# logger.info(f"assert {a} not in {b}, {msg}")
# assert a not in b, msg
# case "xxxxx
def ddt(self) -> list: # 返回一个列表列表中应该包含N个注入了变量的caseInfo
case_list = []
if not self.parametrize: # 没有使用数据驱动测试
case_list.append('')
logger.info("1执行这一步")
# case_info_str = self.to_yaml() # 转字符串
# case_info_str = Template(case_info_str).render(d) # 输入变量
# case_info = self.by_yaml(case_info_str) # 转成类
# case_list.append(case_info)
case_list.append(self)
else: # 使用数据驱动测试
args_name = self.parametrize[0]
args_value_list = self.parametrize[1:]
for args_value in args_value_list:
d = dict(zip(args_name, args_value))
print(f"D的值{d}")
# d 就是数据驱动测试的变量,应输入到用例中
case_info_str = self.to_yaml() # 转字符串
case_info_str = Template(case_info_str).render(d) # 输入变量
@@ -88,11 +100,13 @@ class CaseInfo:
if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml', encoding='utf-8') as f:
with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f)
# print(data)
case_info = CaseInfo(**data)
s = case_info.to_yaml()
print(s)
# print(s)
new_case_info = case_info.by_yaml(s)
print(new_case_info)
# print(new_case_info)
ddt_ddt = case_info.ddt()
print(ddt_ddt)

View File

@@ -41,7 +41,8 @@ class Session(requests.Session):
logger.info(f"接收响应 <<<<<< 状态码 = {resp.status_code}")
logger.info(f"接收响应 <<<<<< 响应头 = {resp.headers}")
logger.info(f"接收响应 <<<<<< 响应正文 = {resp.json()}")
logger.info(f"接收响应 <<<<<< 响应正文 = {resp.text}")
# logger.info(f"接收响应 <<<<<< 响应正文 = {resp.json()}")
return resp

View File

@@ -9,10 +9,14 @@
@date: 2025/2/23 21:34
@desc:
"""
base_url = 'http://127.0.0.1:8000'
case_path = r"E:\PyP\InterfaceAutoTest\TestCases"
exchanger = r"E:\PyP\InterfaceAutoTest\extract.yaml"
id_path =r"E:\PyP\InterfaceAutoTest\id.yaml"
from pathlib import Path
root_path = (Path(__file__)).resolve().parents[1]
base_url = 'http://119.91.19.171:40065'
case_path = rf"{root_path}\TestCases\answer"
exchanger = rf"{root_path}\extract.yaml"
id_path = rf"{root_path}\id.yaml"
db_host = '119.91.19.171' # ip
db_port = 3306 # 端口
@@ -26,3 +30,7 @@ allure_story: str = "默认事件story"
rsa_public = ""
rsa_private = ""
if __name__ == '__main__':
print(root_path)

View File

@@ -13,26 +13,24 @@ import copy
import logging
import re
import string
import inspect
from commons.funcs import Funcs
logger = logging.getLogger(__name__)
def _str(s) -> str:
# 将数据转换为str类型。
return f"'{s}'"
class Template(string.Template):
"""
1支持函数调用
2参数也可以是变量
"""
func_mapping = {
"str": _str,
"int": int,
"float": float,
"bool": bool
} # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
# FUNC_MAPPING = {
# "int": int,
# "float": float,
# "bool": bool
# } # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
@@ -50,15 +48,17 @@ class Template(string.Template):
:return: 替换后的结果
"""
mapping = copy.deepcopy(mapping)
mapping.update(self.func_mapping) # 合并两个mapping
logger.info(f"mapping更新前: {mapping}")
# mapping.update(self.FUNC_MAPPING) # 合并两个mapping
mapping.update(Funcs.FUNC_MAPPING) # 合并两个mapping
logger.info(f"mapping更新后: {mapping}")
def convert(mo):
func_name = mo.group("func_name")
func_args = mo.group("func_args").split(",")
func = mapping.get(func_name) # 读取指定函数
func_args_value = [mapping.get(arg, arg) for arg in func_args]
if func_args_value == [""]: # 处理没有参数的func
if func_args_value == [""]: # 处理没有参数的func
func_args_value = []
if not callable(func):
@@ -69,14 +69,27 @@ class Template(string.Template):
return self.call_pattern.sub(convert, template)
def hot_load():
from commons import funcs
for func_name in dir(funcs): # 遍历模块中的所有函数
if func_name.startswith("_"):
continue
func_code = getattr(funcs, func_name) # 取到函数对象
if callable(func_code): # 如果是一个可以调用的函数
Template.func_mapping[func_name] = func_code # 函数放到Template中
# def hot_load():
# from commons import funcs
#
# for func_name in dir(funcs): # 遍历模块中的所有函数
# if func_name.startswith("_"):
# continue
# func_code = getattr(funcs, func_name) # 取到函数对象
# # print(func_code)
# if callable(func_code): # 如果是一个可以调用的函数
# Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# print(Template.FUNC_MAPPING)
# # if inspect.isfunction(func_code): # 如果是一个可以调用的函数
# # Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# # print(Template.FUNC_MAPPING)
hot_load()
# def hot_load():
# from commons.funcs import Funcs
#
# print(Funcs.FUNC_MAPPING)
# # if inspect.isfunction(func_code): # 如果是一个可以调用的函数
# # Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中
# # print(Template.FUNC_MAPPING)
#
# hot_load()

View File

@@ -1,3 +1,7 @@
code: 200
msg: 成功。
reason: base.success
name: 张三
age: '18'
data:
- 3
- 4
- 5
aaa: null

View File

@@ -1 +1 @@
"id":0
id: 13

View File

@@ -1,18 +1,54 @@
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_1_user.yaml')'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_2_url.yaml')'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_3_sql.yaml')'
''02/23/2025 10:17:34 PM' [pytest_result_log] INFO plugin.pytest_runtest_setup:122 - ---------------Start: main.py::TestAPI::test_1_user.yaml[查询用户信息0]---------------'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:67 - =================================用例开始执行:查询用户信息=================================='
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:71 - 1正在注入变量...'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:74 - 2正在请求接口...'
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:36 - 发送请求>>>>>> 接口地址 = GET http://119.91.19.171:40065/answer/api/v1/connector/info'
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:37 - 发送请求>>>>>> 请求头 = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Accept-Language': 'zh_CN', 'Content-Type': 'application/json', 'Cookie': 'psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==', 'Host': '119.91.19.171:40065', 'Origin': 'http://119.91.19.171:40065', 'Referer': 'http://119.91.19.171:40065/users/login'}'
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:38 - 发送请求>>>>>> 请求正文 = None '
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:42 - 接收响应 <<<<<< 状态码 = 200'
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:43 - 接收响应 <<<<<< 响应头 = {'Content-Type': 'application/json; charset=utf-8', 'Date': 'Sun, 23 Feb 2025 14:17:34 GMT', 'Content-Length': '64'}'
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:44 - 接收响应 <<<<<< 响应正文 = {'code': 200, 'reason': 'base.success', 'msg': '成功。', 'data': []}'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:77 - 3正在提取变量...'
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:83 - 4正在断言...'
''02/23/2025 10:17:34 PM' [commons.models] INFO models.assert_all:59 - assert 200 == code1, 状态码等于200'
''02/23/2025 10:17:34 PM' [pytest_result_log] ERROR plugin.pytest_result_log:190 - test status is FAILED (main.py::TestAPI::test_1_user.yaml[查询用户信息0]): AssertionError'
''02/23/2025 10:17:34 PM' [pytest_result_log] INFO plugin.pytest_runtest_teardown:128 - ----------------End: main.py::TestAPI::test_1_user.yaml[查询用户信息0]----------------'
03/03/2025 05:34:28 PM [commons.cases] INFO cases.find_yaml_case:45 - 加载文件D:\CNWei\CNW\InterfaceAutoTest\TestCases\answer\test_1_status.yaml
03/03/2025 05:34:28 PM [commons.cases] INFO cases.find_yaml_case:50 - case_info=title: 查询状态信息
request:
method: get
url: /answer/api/v1/connector/info
headers:
Host: 119.91.19.171:40065
Accept-Language: en_US
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,
like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
Referer: http://119.91.19.171:40065/users/login
Accept-Encoding: gzip, deflate
extract:
msg:
- json
- $.msg
- 0
validate:
equals:
状态码等于200:
- Success.
- ${msg}
parametrize: []
epic: 项目名称answer
feature: 页面状态
story: 状态
03/03/2025 05:34:28 PM [commons.models] INFO models.ddt:81 - 1执行这一步
03/03/2025 05:34:28 PM [commons.cases] INFO cases.new_case:63 - ddt_title=['查询状态信息']
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_runtest_setup:122 - -----------------Start: main.py::TestAPI::test_1_status[查询状态信息]-----------------
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:71 - =================================用例开始执行:查询状态信息==================================
03/03/2025 05:34:28 PM [commons.exchange] INFO exchange.replace:64 - CaseInfo(title='查询状态信息', request={'method': 'get', 'url': '/answer/api/v1/connector/info', 'headers': {'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Referer': 'http://119.91.19.171:40065/users/login', 'Accept-Encoding': 'gzip, deflate'}}, extract={'msg': ['json', '$.msg', 0]}, validate={'equals': {'状态码等于200': ['Success.', '${msg}']}}, parametrize=[], epic='项目名称answer', feature='页面状态', story='状态')
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:51 - mapping更新前: {'msg': 'Success.', 'id': 12}
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:54 - mapping更新后: {'msg': 'Success.', 'id': 12, 'int': <class 'int'>, 'float': <class 'float'>, 'bool': <class 'bool'>, 'url_unquote': <function url_unquote at 0x00000299E6AAC0D0>, 'str': <function to_string at 0x00000299E6AAC160>, 'time_str': <function time_str at 0x00000299E6AAC1F0>, 'add': <function add at 0x00000299E6AAC280>, 'sql': <function sql at 0x00000299E6AAC310>, 'new_id': <function new_id at 0x00000299E6AAC3A0>, 'last_id': <function last_id at 0x00000299E6AAC430>, 'md5': <function md5 at 0x00000299E6AAC4C0>, 'base64_encode': <function base64_encode at 0x00000299E6AAC550>, 'base64_decode': <function base64_decode at 0x00000299E6AAC5E0>, 'rsa_encode': <function rsa_encode at 0x00000299E6AAC670>, 'rsa_decode': <function rsa_decode at 0x00000299E6AAC700>}
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:75 - 1正在注入变量...
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:78 - 2正在请求接口...
03/03/2025 05:34:28 PM [requests.session] INFO session.send:36 - 发送请求>>>>>> 接口地址 = GET http://119.91.19.171:40065/answer/api/v1/connector/info
03/03/2025 05:34:28 PM [requests.session] INFO session.send:37 - 发送请求>>>>>> 请求头 = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': 'application/json, text/plain, */*', 'Connection': 'keep-alive', 'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Referer': 'http://119.91.19.171:40065/users/login'}
03/03/2025 05:34:28 PM [requests.session] INFO session.send:38 - 发送请求>>>>>> 请求正文 = None
03/03/2025 05:34:28 PM [requests.session] INFO session.send:42 - 接收响应 <<<<<< 状态码 = 200
03/03/2025 05:34:28 PM [requests.session] INFO session.send:43 - 接收响应 <<<<<< 响应头 = {'Content-Type': 'application/json; charset=utf-8', 'Date': 'Mon, 03 Mar 2025 09:34:29 GMT', 'Content-Length': '63'}
03/03/2025 05:34:28 PM [requests.session] INFO session.send:44 - 接收响应 <<<<<< 响应正文 = {'code': 200, 'reason': 'base.success', 'msg': 'Success.', 'data': []}
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:81 - 3正在提取变量...
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:87 - 4正在断言...
03/03/2025 05:34:28 PM [commons.exchange] INFO exchange.replace:64 - CaseInfo(title='查询状态信息', request={'method': 'get', 'url': '/answer/api/v1/connector/info', 'headers': {'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Referer': 'http://119.91.19.171:40065/users/login', 'Accept-Encoding': 'gzip, deflate'}}, extract={'msg': ['json', '$.msg', 0]}, validate={'equals': {'状态码等于200': ['Success.', '${msg}']}}, parametrize=[], epic='项目名称answer', feature='页面状态', story='状态')
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:51 - mapping更新前: {'msg': 'Success.', 'id': 12}
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:54 - mapping更新后: {'msg': 'Success.', 'id': 12, 'int': <class 'int'>, 'float': <class 'float'>, 'bool': <class 'bool'>, 'url_unquote': <function url_unquote at 0x00000299E6AAC0D0>, 'str': <function to_string at 0x00000299E6AAC160>, 'time_str': <function time_str at 0x00000299E6AAC1F0>, 'add': <function add at 0x00000299E6AAC280>, 'sql': <function sql at 0x00000299E6AAC310>, 'new_id': <function new_id at 0x00000299E6AAC3A0>, 'last_id': <function last_id at 0x00000299E6AAC430>, 'md5': <function md5 at 0x00000299E6AAC4C0>, 'base64_encode': <function base64_encode at 0x00000299E6AAC550>, 'base64_decode': <function base64_decode at 0x00000299E6AAC5E0>, 'rsa_encode': <function rsa_encode at 0x00000299E6AAC670>, 'rsa_decode': <function rsa_decode at 0x00000299E6AAC700>}
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.assert_all:32 - 键equals{'状态码等于200': ['Success.', 'Success.']}
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.assert_all:34 - 获取到的断言:<function validate_equals at 0x00000299E6AAC940>
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.validate_equals:43 - assert Success. == Success., 状态码等于200执行这段代码
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:92 - =================================用例执行结束:查询状态信息==================================
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_result_log:190 - test status is PASSED (main.py::TestAPI::test_1_status[查询状态信息]):
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_runtest_teardown:128 - ------------------End: main.py::TestAPI::test_1_status[查询状态信息]------------------

View File

@@ -10,9 +10,10 @@ TestAPI.find_yaml_case() # 加载yaml文件
if __name__ == '__main__':
now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
# 1启动框架生成临时文件
pytest.main([__file__, "-x", "-v"]) # -x表示有一个用例失败后面将不执行;-v表示展示用例名称;-c,配置文件所在目录指定pytest.ini路径
# -x表示有一个用例失败后面将不执行;-v表示展示用例名称;-c,配置文件所在目录指定pytest.ini路径;--alluredir=temp。指定数据生成目录
pytest.main([__file__, "-x", "-v","--alluredir=temp"])
# 2生成HTML报告
os.system('allure generate temp -o report --clean') # java程序只能借助操作系统执行
# 3备份日志
shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log")
# shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log")

View File

@@ -4,7 +4,7 @@ addopts = -q --show-capture=no
log_file = logs/pytest.log
log_file_level = info
log_file_format = '%(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s'
log_file_date_format = '%m/%d/%Y %I:%M:%S %p'
log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s
log_file_date_format = %m/%d/%Y %I:%M:%S %p
disable_test_id_escaping_and_forfeit_all_rights_to_community_support = true

48
utils/case_parser.py Normal file
View File

@@ -0,0 +1,48 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_parser
@date: 2025/2/27 17:25
@desc:
"""
import logging
from dataclasses import dataclass, asdict, field
import yaml
from commons.models import CaseInfo
class CaseParser:
@staticmethod
def to_yaml(case_data: dict) -> str:
try:
CaseInfo(**case_data)
except TypeError as error:
logging.error(error)
raise error
return yaml.safe_dump(case_data, allow_unicode=True, sort_keys=False)
@staticmethod
def from_yaml(yaml_str: str) -> CaseInfo:
return CaseInfo(**yaml.safe_load(yaml_str))
if __name__ == '__main__':
with open(r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f)
print(data)
print(type(data))
# print(CaseInfo(**data))
case_parser = CaseParser()
case_data_ = case_parser.to_yaml(data)
# print(case_data_)
# case_parser.from_yaml(case_data_)
# print(type(case_data_))

79
utils/case_validator.py Normal file
View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: case_validator
@date: 2025/2/27 17:25
@desc:
"""
import logging
logger = logging.getLogger(__name__)
class CaseValidator:
VALIDATORS = {}
@classmethod
def register(cls, name: str):
def decorator(func):
cls.VALIDATORS[name] = func
return func
return decorator
@classmethod
def assert_all(cls, validate: dict):
for assert_type, cases in validate.items():
logger.info(f"键:{assert_type},值:{cases}")
validator = cls.VALIDATORS.get(assert_type)
logger.info(f"获取到的断言:{validator}")
if not validator:
raise KeyError(f"Unsupported validator: {assert_type}")
for msg, (a, b) in cases.items():
validator(a, b, msg)
@CaseValidator.register('equals')
def validate_equals(a, b, msg):
logger.info(f"assert {a} == {b}, {msg}执行这段代码")
assert a == b, msg
@CaseValidator.register('not_equals')
def validate_not_equals(a, b, msg):
logger.info(f"assert {a} != {b}, {msg}")
assert a != b, msg
@CaseValidator.register('contains')
def validate_contains(a, b, msg):
logger.info(f"assert {a} in {b}, {msg}")
assert a in b, msg
@CaseValidator.register('not_contains')
def validate_not_contains(a, b, msg):
logger.info(f"assert {a} not in {b}, {msg}")
assert a not in b, msg
if __name__ == '__main__':
mock_case = {
"validate": {
"equals": {
"判断相等": ["Success.", "Success."]
},
"not_equals": {
"判断不相等": ["Success.", "Suc."]
}
}
}
case_validator = CaseValidator()
print(case_validator.VALIDATORS)
# case_validator.assert_all(mock_case.get("validate"))

101
utils/data_driver.py Normal file
View File

@@ -0,0 +1,101 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: data_driver
@date: 2025/3/3 10:56
@desc:
"""
from pathlib import Path
from commons.models import CaseInfo
from commons.templates import Template
from commons.file_processors.yaml_processor import StringOrDict
class DataDriver:
@staticmethod
def generate_cases(file_name, case_info) -> list:
if not case_info.get("parametrize"):
return {file_name + "[--]": case_info}
# cases = []
args_names = case_info.get("parametrize")[0]
for i, args_values in enumerate(case_info.get("parametrize")[1:]):
# print(args_values)
context = dict(zip(args_names, args_values))
# print(context)
# rendered = Template(CaseInfo(**case_info).to_yaml()).render(context)
rendered = Template(StringOrDict.to_string(case_info)).render(context)
# cases.append({file_name + "[" + str(i) + "]": StringOrDict.to_dict(rendered)})
yield {file_name + "[" + str(i) + "]": StringOrDict.to_dict(rendered)}
# return cases
if __name__ == '__main__':
from commons.file_processors.yaml_processor import YamlFile
file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml")
file_obj = YamlFile(file_path)
# print(file_path.stem)
file_name = file_path.stem
mock_case_info = {
"case_info0": {
"feature": "页面状态",
"story": "状态",
"title": "查询状态信息",
"request": "",
"extract": "",
"validate": "",
"parametrize": [["title", "username", "password", "msg"], ["测试1", "user1", "pass1", "200"],
["测试2", "user2", "pass2", "300"]]
},
"case_info1": {
"feature": "页面状态",
"story": "状态",
"title": "查询状态信息",
"request": "",
"extract": "",
"validate": "",
"parametrize": [1, 2, 3]
},
"case_info2": {
"feature": "页面状态",
"story": "状态",
"title": "查询状态信息",
"request": "",
"extract": "",
"validate": "",
"parametrize": [1, 2, 3]
}
}
dd = DataDriver()
# cases = dd.generate_cases(mock_case_info.get("case_info0"))
cases = dd.generate_cases(file_name, file_obj)
# print(cases)
# print(len(cases))
keys_list = []
titles = []
for item in cases:
print(item)
# 遍历列表中的每个字典
for key, value in item.items():
print(f"key:{key}")
keys_list.append(key)
print(f"value:{value}")
# # 遍历内层字典(这里内层字典其实只有一个键值对)
titles.append(value['title'])
print(item)
print(keys_list)
print(titles)
# ddt_title = [data.title for data in ddt_data]
# logger.info(f"{ddt_title=}")