Compare commits
11 Commits
master
...
feature-in
| Author | SHA1 | Date | |
|---|---|---|---|
| 300b5a92d4 | |||
| 2e9f1c12f7 | |||
| 4324cf37aa | |||
| a50e00a4e1 | |||
| 914b0301ba | |||
| a6996ed500 | |||
| 31fad3f4e1 | |||
| b8903798b8 | |||
| 698a95ac83 | |||
| 1890918312 | |||
| bc55dffe40 |
5
.gitignore
vendored
5
.gitignore
vendored
@@ -2,4 +2,7 @@
|
||||
.idea/
|
||||
.venv/
|
||||
poetry.lock
|
||||
.pytest_cache/
|
||||
.pytest_cache/
|
||||
report/
|
||||
temp/
|
||||
logs/
|
||||
20
README.md
20
README.md
@@ -9,8 +9,26 @@
|
||||
...
|
||||
|
||||
## 环境搭建
|
||||
1,安装JAVA
|
||||
- 配置环境变量
|
||||
```text
|
||||
JAVA_HOME
|
||||
java的安装路径
|
||||
CLASSPATH
|
||||
%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar
|
||||
|
||||
添加Path
|
||||
%JAVA_HOME%\bin
|
||||
%JAVA_HOME%\jre\bin
|
||||
```
|
||||
|
||||
2,安装allure
|
||||
- 配置环境变量
|
||||
```text
|
||||
添加Path
|
||||
allure安装目录\bin
|
||||
```
|
||||
|
||||
...
|
||||
|
||||
## 使用方法
|
||||
|
||||
|
||||
31
TestCases/answer/test_1_status.yaml
Normal file
31
TestCases/answer/test_1_status.yaml
Normal file
@@ -0,0 +1,31 @@
|
||||
feature: 页面状态
|
||||
story: 状态
|
||||
title: 查询状态信息
|
||||
request:
|
||||
method: get
|
||||
url: /answer/api/v1/connector/info
|
||||
headers:
|
||||
Host: 119.91.19.171:40065
|
||||
Accept-Language: en_US
|
||||
Accept: application/json, text/plain, */*
|
||||
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
|
||||
Referer: http://119.91.19.171:40065/users/login
|
||||
Accept-Encoding: gzip, deflate
|
||||
extract: # 提取变量
|
||||
msg:
|
||||
- "json"
|
||||
- "$.msg"
|
||||
- 0
|
||||
|
||||
validate:
|
||||
equals: # 断言相等
|
||||
状态码等于200:
|
||||
- Success.
|
||||
- ${msg}
|
||||
|
||||
#parametrize: # 数据驱动测试
|
||||
# - [ "title","username","password","msg" ] # 变量名
|
||||
# - [ "测试1","user1","pass1","200" ] # 变量值
|
||||
# - [ "测试2","user2","pass2","300" ] # 变量值
|
||||
# - [ "测试3","user3","pass3","200" ] # 变量值
|
||||
# - [ "测试4","user4","pass4","200" ] # 变量值
|
||||
65
TestCases/test_1_user.json
Normal file
65
TestCases/test_1_user.json
Normal file
@@ -0,0 +1,65 @@
|
||||
{
|
||||
"epic": "项目名称:answer",
|
||||
"feature": "页面状态",
|
||||
"story": "状态",
|
||||
"title": "查询状态信息",
|
||||
"request": {
|
||||
"method": "get",
|
||||
"url": "/answer/api/v1/connector/info",
|
||||
"headers": {
|
||||
"Host": "119.91.19.171:40065",
|
||||
"Accept-Language": "en_US",
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
|
||||
"Referer": "http://119.91.19.171:40065/users/login",
|
||||
"Accept-Encoding": "gzip, deflate"
|
||||
}
|
||||
},
|
||||
"extract": {
|
||||
"msg": [
|
||||
"json",
|
||||
"$.msg",
|
||||
0
|
||||
]
|
||||
},
|
||||
"validate": {
|
||||
"equals": {
|
||||
"状态码等于200": [
|
||||
"Success.",
|
||||
"Success."
|
||||
]
|
||||
}
|
||||
},
|
||||
"parametrize": [
|
||||
[
|
||||
"title",
|
||||
"username",
|
||||
"password",
|
||||
"msg"
|
||||
],
|
||||
[
|
||||
"测试1",
|
||||
"user1",
|
||||
"pass1",
|
||||
"200"
|
||||
],
|
||||
[
|
||||
"测试2",
|
||||
"user2",
|
||||
"pass2",
|
||||
"300"
|
||||
],
|
||||
[
|
||||
"测试3",
|
||||
"user3",
|
||||
"pass3",
|
||||
"200"
|
||||
],
|
||||
[
|
||||
"测试4",
|
||||
"user4",
|
||||
"pass4",
|
||||
"200"
|
||||
]
|
||||
]
|
||||
}
|
||||
80
commons/case_handler.py
Normal file
80
commons/case_handler.py
Normal file
@@ -0,0 +1,80 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: case_handler
|
||||
@date: 2025/5/26 22:13
|
||||
@desc:
|
||||
"""
|
||||
import json
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, asdict
|
||||
from commons.models import TestCaseModel
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TestCaseHandle(TestCaseModel):
|
||||
|
||||
@classmethod
|
||||
def new(cls, testcase: dict) -> 'TestCaseHandle':
|
||||
|
||||
try:
|
||||
instance = cls(**testcase)
|
||||
return instance
|
||||
except (TypeError, ValueError) as e:
|
||||
logger.warning(f"解析错误:{e}")
|
||||
raise e
|
||||
|
||||
def to_string(self) -> str:
|
||||
"""
|
||||
将 字典 转换为 json 格式的字符串。
|
||||
|
||||
:return:
|
||||
json 格式的字符串。
|
||||
"""
|
||||
try:
|
||||
res = json.dumps(asdict(self), ensure_ascii=False)
|
||||
return res
|
||||
except TypeError as e:
|
||||
logger.error(f"将数据转换为 json 字符串时出错: {e}")
|
||||
raise e
|
||||
|
||||
@staticmethod
|
||||
def to_dict(json_str: str) -> dict:
|
||||
"""
|
||||
将 json 格式的字符串转换为 字典.
|
||||
|
||||
:param
|
||||
json_str: json 格式的字符串。
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
res = json.loads(json_str)
|
||||
return res
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"将 json 字符串转换为字典时出错: {e}")
|
||||
raise e
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from pathlib import Path
|
||||
from commons.file_processors import processor_factory
|
||||
|
||||
test_data = Path(r"E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml")
|
||||
yaml_data = processor_factory.get_processor_class(test_data)
|
||||
case_info = TestCaseHandle.new(yaml_data.load())
|
||||
print(case_info.to_string())
|
||||
print(type(case_info.to_string()))
|
||||
print(case_info.to_dict(case_info.to_string()))
|
||||
print(type(case_info.to_dict(case_info.to_string())))
|
||||
print(type(case_info))
|
||||
print(case_info.parametrize)
|
||||
|
||||
for i in case_info.parametrize:
|
||||
print(i)
|
||||
239
commons/cases.py
239
commons/cases.py
@@ -9,88 +9,223 @@
|
||||
@date: 2024 2024/9/16 9:57
|
||||
@desc: 动态生成用例
|
||||
"""
|
||||
from dataclasses import asdict
|
||||
from pathlib import Path
|
||||
import logging
|
||||
from typing import Union, Generator, Type
|
||||
from unittest import TestCase
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
from commons.files import YamlFile
|
||||
from commons.models import CaseInfo
|
||||
|
||||
from commons import settings
|
||||
from commons.file_processors.processor_factory import get_processor_class
|
||||
# from commons.models import CaseInfo
|
||||
from commons.session import Session
|
||||
from commons.exchange import Exchange
|
||||
from commons import settings
|
||||
from commons.templates import Template
|
||||
from commons.case_handler import TestCaseHandle
|
||||
from utils import case_validator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
session = Session(settings.base_url)
|
||||
|
||||
cases_dir = Path(settings.cases_dir)
|
||||
|
||||
_case_path = Path(settings.case_path)
|
||||
exchanger = Exchange(settings.exchanger)
|
||||
|
||||
|
||||
@allure.epic("项目名称:answer")
|
||||
class TestAPI:
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def find_yaml_case(cls, case_path: Path = _case_path):
|
||||
"""
|
||||
搜索和加载yaml文件
|
||||
:return:
|
||||
"""
|
||||
yaml_path_list = case_path.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
|
||||
for yaml_path in yaml_path_list:
|
||||
logger.info(f"load file {yaml_path=}")
|
||||
def run(cls, testcase_dir: Union[Path, str] = cases_dir):
|
||||
for fp in CaseFinder(testcase_dir).find_testcases():
|
||||
print(fp.name)
|
||||
case = CaseGenerator(fp).generate_testcases()
|
||||
print(f"{case=}")
|
||||
for i in case:
|
||||
print(f"{i=}")
|
||||
CaseRegister(cls).register_test_func(i)
|
||||
# @classmethod
|
||||
# def find_test_cases(cls, case_dir: Path = cases_dir):
|
||||
# """
|
||||
# 搜索和加载yaml文件
|
||||
# :return:
|
||||
# """
|
||||
# case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
|
||||
# for case_path in case_path_list:
|
||||
# logger.info(f"加载文件:{case_path}")
|
||||
#
|
||||
# file = FileHandle(case_path) # 自动读取yaml文件
|
||||
# try:
|
||||
# CaseInfo(**file) # 校验用例格式
|
||||
# logger.info(f"case_info:{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
|
||||
# case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
|
||||
# # print(case_path.stem)
|
||||
# setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
|
||||
# except Exception as e:
|
||||
# logger.error(e)
|
||||
#
|
||||
# @classmethod
|
||||
# def new_case(cls, file_name, case_info: dict):
|
||||
# test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
|
||||
#
|
||||
# keys_list = list(test_case.keys())
|
||||
# logger.info(f"keys_list:{keys_list}")
|
||||
#
|
||||
# values_list = list(test_case.values())
|
||||
# logger.info(f"测试用例列表:{values_list}")
|
||||
#
|
||||
# driver_title = [i.get("title") for i in values_list]
|
||||
# logger.info(f"driver_title={driver_title}")
|
||||
#
|
||||
# epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
|
||||
# logger.info(f"epic:{epic}")
|
||||
#
|
||||
# feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
|
||||
# logger.info(f"feature:{feature}")
|
||||
#
|
||||
# story = case_info["story"] if case_info["story"] else settings.allure_story
|
||||
# logger.info(f"story:{story}")
|
||||
#
|
||||
# @allure.epic(epic)
|
||||
# @allure.feature(feature)
|
||||
# @allure.story(story)
|
||||
# @pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
|
||||
# def test_func(self, case_key):
|
||||
# logger.info(f"case_key:{case_key}")
|
||||
#
|
||||
# test_case_mapping = test_case.get(case_key)
|
||||
# logger.info(f"测试用例:{test_case_mapping}")
|
||||
#
|
||||
# allure.dynamic.title(test_case_mapping.get("title"))
|
||||
#
|
||||
# logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
|
||||
#
|
||||
# # 0,变量替换
|
||||
# new_case_info = exchanger.replace(test_case_mapping)
|
||||
# logger.info(f"1,正在注入变量...")
|
||||
# logger.info(f"new_case_info:{new_case_info}")
|
||||
# # 1,发送请求
|
||||
# logger.info(f"2,正在请求接口...")
|
||||
# resp = session.request(**new_case_info.get("request"))
|
||||
#
|
||||
# logger.info(f"3,正在提取变量...")
|
||||
# # 2,保存变量(接口关联)
|
||||
# for var_name, extract_info in new_case_info.get("extract").items():
|
||||
# logger.info(f"保存变量:{var_name}{extract_info}")
|
||||
# exchanger.extract(resp, var_name, *extract_info)
|
||||
# # 3,断言
|
||||
# logger.info(f"4,正在断言...")
|
||||
# assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
|
||||
# logger.info(f"替换变量后:{assert_case_info}")
|
||||
# # assert_case_info.assert_all() # 执行断言
|
||||
# _validator = case_validator.CaseValidator()
|
||||
# _validator.assert_all(assert_case_info.get("validate"))
|
||||
#
|
||||
# logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
|
||||
#
|
||||
# return test_func
|
||||
|
||||
file = YamlFile(yaml_path) # 自动读取yaml文件
|
||||
case_info = CaseInfo(**file) # 校验yaml格式
|
||||
|
||||
logger.debug(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志
|
||||
class CaseFinder:
|
||||
find_suffix: str = settings.test_suffix
|
||||
|
||||
case_func = cls.new_case(case_info) # 从yaml格式转换为pytest格式
|
||||
print(yaml_path.name)
|
||||
setattr(cls, f"{yaml_path.name}", case_func) # 把pytest格式添加到类中
|
||||
def __init__(self, testcase_dir: Union[str, Path]):
|
||||
if Path(testcase_dir).is_dir():
|
||||
self.testcase_dir: Path = Path(testcase_dir)
|
||||
else:
|
||||
raise FileNotFoundError("不是有效的目录")
|
||||
|
||||
@classmethod
|
||||
def new_case(cls, case_info: CaseInfo):
|
||||
ddt_data = case_info.ddt()
|
||||
print(ddt_data)
|
||||
ddt_title = [data.title for data in ddt_data]
|
||||
def find_testcases(self) -> Generator[Path, None, None]:
|
||||
testcase_files = self.testcase_dir.glob(f"**/test_*.{self.find_suffix}")
|
||||
for fp in testcase_files:
|
||||
logger.info(f"加载文件:{fp}")
|
||||
yield fp
|
||||
|
||||
@allure.feature(case_info.feature)
|
||||
@allure.story(case_info.story)
|
||||
@pytest.mark.parametrize("case_info", ddt_data, ids=ddt_title)
|
||||
def test_func(self, case_info: CaseInfo):
|
||||
allure.dynamic.title(case_info.title)
|
||||
|
||||
logger.info(f"用例开始执行:{case_info.title}".center(80, "="))
|
||||
class CaseGenerator:
|
||||
|
||||
# 0,变量替换
|
||||
new_case_info = exchanger.replace(case_info)
|
||||
logger.info(f"1,正在注入变量...")
|
||||
def __init__(self, fp: Union[str, Path]):
|
||||
self.fp: Path = Path(fp)
|
||||
|
||||
# 1,发送请求
|
||||
logger.info(f"2,正在请求接口...")
|
||||
resp = session.request(**new_case_info.request)
|
||||
def generate_testcases(self) -> Generator[dict, None, None]:
|
||||
file_name = self.fp.stem
|
||||
|
||||
logger.info(f"3,正在提取变量...")
|
||||
# 2,保存变量(接口关联)
|
||||
for var_name, extract_info in new_case_info.extract.items():
|
||||
print(var_name, extract_info)
|
||||
exchanger.extract(resp, var_name, *extract_info)
|
||||
# 3,断言
|
||||
logger.info(f"4,正在断言...")
|
||||
assert_case_info = exchanger.replace(case_info) # 为断言加载变量
|
||||
print(assert_case_info)
|
||||
assert_case_info.assert_all() # 执行断言
|
||||
case_info_ = get_processor_class(self.fp).load() # 自动读取yaml文件
|
||||
case_info = TestCaseHandle.new(case_info_)
|
||||
|
||||
logger.info(f"用例执行结束:{case_info.title}".center(80, "="))
|
||||
if not case_info.parametrize:
|
||||
yield {file_name + "__": asdict(case_info)}
|
||||
else:
|
||||
cases = {}
|
||||
args_names = case_info.parametrize[0]
|
||||
for i, args_values in enumerate(case_info.parametrize[1:]):
|
||||
# print(args_values)
|
||||
context = dict(zip(args_names, args_values))
|
||||
print(context)
|
||||
# rendered = Template(FileHandle.to_string(case_info)).render(context)
|
||||
rendered = Template(case_info.to_string()).render(context)
|
||||
# cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
|
||||
cases.update({file_name + "_" + str(i): case_info.to_dict(rendered)})
|
||||
|
||||
return test_func
|
||||
yield cases
|
||||
|
||||
|
||||
class CaseRegister:
|
||||
def __init__(self, register: Type["TestAPI"]):
|
||||
self.register: Type["TestAPI"] = register
|
||||
|
||||
def register_test_func(self, case: dict):
|
||||
for test_filed_name, case_info in case.items():
|
||||
epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
|
||||
logger.info(f"epic:{epic}")
|
||||
|
||||
feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
|
||||
logger.info(f"feature:{feature}")
|
||||
|
||||
story = case_info["story"] if case_info["story"] else settings.allure_story
|
||||
logger.info(f"story:{story}")
|
||||
|
||||
@allure.epic(epic)
|
||||
@allure.feature(feature)
|
||||
@allure.story(story)
|
||||
def register_func(instance, testcase=case_info):
|
||||
# allure.dynamic.epic(epic)
|
||||
# allure.dynamic.feature(feature)
|
||||
# allure.dynamic.story(story)
|
||||
allure.dynamic.title(testcase.get("title"))
|
||||
logger.info(f"用例开始执行:{testcase.get('title')}".center(80, "="))
|
||||
|
||||
# 0,变量替换
|
||||
new_case_info = exchanger.replace(testcase)
|
||||
logger.info(f"1,正在注入变量...")
|
||||
logger.info(f"new_case_info:{new_case_info}")
|
||||
# 1,发送请求
|
||||
logger.info(f"2,正在请求接口...")
|
||||
resp = session.request(**new_case_info.get("request"))
|
||||
|
||||
logger.info(f"3,正在提取变量...")
|
||||
# 2,保存变量(接口关联)
|
||||
for var_name, extract_info in new_case_info.get("extract").items():
|
||||
logger.info(f"保存变量:{var_name}{extract_info}")
|
||||
exchanger.extract(resp, var_name, *extract_info)
|
||||
# 3,断言
|
||||
logger.info(f"4,正在断言...")
|
||||
assert_case_info = exchanger.replace(testcase) # 为断言加载变量
|
||||
logger.info(f"替换变量后:{assert_case_info}")
|
||||
# assert_case_info.assert_all() # 执行断言
|
||||
_validator = case_validator.CaseValidator()
|
||||
_validator.assert_all(assert_case_info.get("validate"))
|
||||
|
||||
logger.info(f"用例执行结束:{testcase.get('title')}".center(80, "="))
|
||||
|
||||
# return test_func
|
||||
|
||||
setattr(self.register, test_filed_name, register_func) # 把pytest格式添加到类中
|
||||
|
||||
|
||||
# TestAPI.find_yaml_case()
|
||||
if __name__ == '__main__':
|
||||
TestAPI.find_yaml_case()
|
||||
# print(TestAPI.__dict__)
|
||||
TestAPI.run(cases_dir)
|
||||
print(TestAPI.__dict__)
|
||||
|
||||
@@ -33,13 +33,13 @@ class DBServer:
|
||||
|
||||
db = DBServer(
|
||||
host=settings.db_host, # ip
|
||||
port=3306, # 端口
|
||||
user='root', # 用户名
|
||||
password='mysql_hNahSe', # 密码
|
||||
database='answer' # 库名
|
||||
port=settings.db_port, # 端口
|
||||
user=settings.db_user, # 用户名
|
||||
password=settings.db_password, # 密码
|
||||
database=settings.db_database # 库名
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
...
|
||||
res = db.execute_sql('select username from user where id=1;')
|
||||
print(res[0])
|
||||
# res = db.execute_sql('select username from user where id=1;')
|
||||
# print(res[0])
|
||||
|
||||
@@ -13,33 +13,33 @@ import copy
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import jsonpath
|
||||
|
||||
import allure
|
||||
|
||||
from commons.templates import Template
|
||||
import jsonpath
|
||||
|
||||
from commons.files import YamlFile
|
||||
from commons.models import CaseInfo
|
||||
from commons.file_processors.processor_factory import get_processor_class
|
||||
from tests.b import TestCaseHandle
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Exchange:
|
||||
def __init__(self, path):
|
||||
self.file = YamlFile(path)
|
||||
self.file = get_processor_class(path)
|
||||
|
||||
@allure.step("提取变量")
|
||||
def extract(self, resp, var_name, attr, expr: str, index):
|
||||
# resp中json是方法不是属性,需要手动更改为属性
|
||||
def extract(self, resp, var_name, attr, expr: str, index) -> None:
|
||||
|
||||
resp = copy.deepcopy(resp)
|
||||
|
||||
try:
|
||||
# resp中json是方法不是属性,需要手动更改为属性
|
||||
resp.json = resp.json()
|
||||
except json.decoder.JSONDecodeError:
|
||||
resp.json = {"msg": "is not json data"}
|
||||
|
||||
data = getattr(resp, attr)
|
||||
# print(data)
|
||||
if expr.startswith("/"): # xpath
|
||||
res = None
|
||||
elif expr.startswith("$"): # jsonpath
|
||||
@@ -53,20 +53,23 @@ class Exchange:
|
||||
else: # 如果没有数据
|
||||
value = "not data"
|
||||
|
||||
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
|
||||
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
|
||||
data = self.file.load()
|
||||
data[var_name] = value # 保存变量
|
||||
self.file.save(data) # 持久化存储到文件
|
||||
|
||||
self.file[var_name] = value # 保存变量
|
||||
self.file.save() # 持久化存储到文件
|
||||
@allure.step("替换变量")
|
||||
def replace(self, case_info: CaseInfo):
|
||||
...
|
||||
|
||||
def replace(self, case_info: dict) -> dict:
|
||||
logger.info(f"变量替换:{case_info}")
|
||||
# 1,将case_info转换为字符串
|
||||
case_info_str = case_info.to_yaml()
|
||||
data = TestCaseHandle(**case_info)
|
||||
case_info_str = data.to_string()
|
||||
print(f"{case_info_str=}")
|
||||
# 2,替换字符串
|
||||
case_info_str = Template(case_info_str).render(self.file)
|
||||
case_info_str = Template(case_info_str).render(self.file.load())
|
||||
print(f"{case_info_str=}")
|
||||
# 3,将字符串转换成case_info
|
||||
new_case_info = case_info.by_yaml(case_info_str)
|
||||
new_case_info = data.to_dict(case_info_str)
|
||||
return new_case_info
|
||||
|
||||
|
||||
@@ -87,14 +90,24 @@ if __name__ == '__main__':
|
||||
exchanger.extract(mock_resp, "age", "json", '$.age', 0)
|
||||
exchanger.extract(mock_resp, "data", "json", '$.data', 0)
|
||||
exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0)
|
||||
case_info = CaseInfo(
|
||||
title="单元测试",
|
||||
request={
|
||||
# mock_case_info = CaseInfo(
|
||||
# title="单元测试",
|
||||
# request={
|
||||
# "data":
|
||||
# {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
|
||||
# },
|
||||
# extract={},
|
||||
# validate={}
|
||||
# )
|
||||
mock_case_info = {
|
||||
"title": "单元测试",
|
||||
"request": {
|
||||
"data":
|
||||
{"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
|
||||
},
|
||||
extract={},
|
||||
validate={}
|
||||
)
|
||||
new_case_info = exchanger.replace(case_info)
|
||||
print(new_case_info)
|
||||
"extract": {},
|
||||
"validate": {}
|
||||
}
|
||||
|
||||
new_mock_case_info = exchanger.replace(mock_case_info)
|
||||
print(new_mock_case_info)
|
||||
|
||||
22
commons/file_processors/__init__.py
Normal file
22
commons/file_processors/__init__.py
Normal file
@@ -0,0 +1,22 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: __init__.py
|
||||
@date: 2025/3/4 17:23
|
||||
@desc:
|
||||
"""
|
||||
from .base_processor import BaseFileProcessor
|
||||
from .json_processor import JsonProcessor
|
||||
from .yaml_processor import YamlProcessor
|
||||
from .processor_factory import get_processor_class
|
||||
|
||||
__all__ = [
|
||||
"BaseFileProcessor",
|
||||
"JsonProcessor",
|
||||
"YamlProcessor",
|
||||
"get_processor_class",
|
||||
]
|
||||
34
commons/file_processors/base_processor.py
Normal file
34
commons/file_processors/base_processor.py
Normal file
@@ -0,0 +1,34 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: base
|
||||
@date: 2025/3/4 17:23
|
||||
@desc:
|
||||
"""
|
||||
import abc
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
|
||||
class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
|
||||
"""
|
||||
文件处理器的抽象基类。
|
||||
定义了所有子类必须实现的方法。
|
||||
"""
|
||||
def __init__(self, filepath: Union[str, Path], **kwargs):
|
||||
|
||||
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
|
||||
|
||||
@abc.abstractmethod
|
||||
def load(self) -> dict:
|
||||
"""加载."""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
|
||||
"""将数据保存."""
|
||||
raise NotImplementedError
|
||||
86
commons/file_processors/json_processor.py
Normal file
86
commons/file_processors/json_processor.py
Normal file
@@ -0,0 +1,86 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: yaml_processor
|
||||
@date: 2025/3/4 17:28
|
||||
@desc:
|
||||
"""
|
||||
import logging
|
||||
from typing import Union, Any
|
||||
from pathlib import Path
|
||||
import json
|
||||
from commons.file_processors.base_processor import BaseFileProcessor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JsonProcessor(BaseFileProcessor):
|
||||
"""
|
||||
用于处理 JSON 文件的类。
|
||||
提供了从文件加载 JSON 数据为字典,以及将字典保存为 JSON 文件的功能。
|
||||
"""
|
||||
|
||||
def __init__(self, filepath: Union[str, Path], **kwargs):
|
||||
"""
|
||||
初始化 JsonFile 对象。
|
||||
|
||||
Args:
|
||||
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
|
||||
"""
|
||||
super().__init__(filepath, **kwargs)
|
||||
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
|
||||
|
||||
def load(self) -> dict[str, Any]:
|
||||
"""
|
||||
从 Json 文件加载数据。
|
||||
:return:
|
||||
"""
|
||||
if not self.filepath.exists():
|
||||
logger.warning(f"文件 {self.filepath} 不存在.")
|
||||
raise FileNotFoundError(f"文件 {self.filepath} 不存在.")
|
||||
try:
|
||||
with open(self.filepath, "r", encoding="utf-8") as f:
|
||||
loaded_data = json.load(f)
|
||||
if not isinstance(loaded_data, dict): # 确保加载的是字典
|
||||
logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
|
||||
raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
|
||||
return loaded_data
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
|
||||
raise e
|
||||
|
||||
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
|
||||
"""
|
||||
将字典数据保存到 json 文件。
|
||||
|
||||
Args:
|
||||
:param data:
|
||||
:param new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。
|
||||
"""
|
||||
filepath = Path(new_filepath) if new_filepath else self.filepath
|
||||
filepath.parent.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
json.dump(
|
||||
data,
|
||||
f,
|
||||
ensure_ascii=False, # 允许非ASCII字符
|
||||
sort_keys=False # 不排序键
|
||||
)
|
||||
logger.info(f"数据已成功保存到 {filepath}")
|
||||
except (TypeError, OSError, json.JSONDecodeError) as e:
|
||||
logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}")
|
||||
raise e
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 示例用法
|
||||
json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径
|
||||
json_file = JsonProcessor(json_path)
|
||||
print(json_file.load())
|
||||
print(type(json_file))
|
||||
# json_file.save()
|
||||
57
commons/file_processors/processor_factory.py
Normal file
57
commons/file_processors/processor_factory.py
Normal file
@@ -0,0 +1,57 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: file_handle
|
||||
@date: 2025/3/7 09:31
|
||||
@desc:
|
||||
"""
|
||||
from pathlib import Path
|
||||
from typing import Type, Union
|
||||
|
||||
from commons.file_processors.base_processor import BaseFileProcessor
|
||||
from commons.file_processors.yaml_processor import YamlProcessor
|
||||
from commons.file_processors.json_processor import JsonProcessor
|
||||
|
||||
# 类型别名,表示处理器类的字典
|
||||
ProcessorMap = dict[str, Type[BaseFileProcessor]]
|
||||
processors: ProcessorMap = {
|
||||
'yaml': YamlProcessor,
|
||||
'yml': YamlProcessor,
|
||||
'json': JsonProcessor,
|
||||
|
||||
}
|
||||
|
||||
|
||||
class UnsupportedFileTypeError(Exception):
|
||||
"""当文件类型不被支持时抛出此异常。"""
|
||||
pass
|
||||
|
||||
|
||||
# def get_processor_class(file_suffix: str = "yaml") -> Type[BaseFileProcessor]:
|
||||
def get_processor_class(fp: Union[Path, str]) -> 'BaseFileProcessor':
|
||||
fp = Path(fp)
|
||||
if fp.is_file():
|
||||
file_suffix = fp.suffix[1:]
|
||||
processor_class = processors.get(file_suffix.lower(), YamlProcessor) # 代理模式
|
||||
|
||||
return processor_class(fp) # 默认回退到 Yaml
|
||||
else:
|
||||
raise UnsupportedFileTypeError(fp)
|
||||
|
||||
|
||||
# FileHandle = get_processor("yaml")
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 示例用法
|
||||
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
|
||||
# yaml_file = FileHandle(yaml_path)
|
||||
# print(yaml_file.load())
|
||||
# print(type(yaml_file))
|
||||
# file_suffix = Path(yaml_path).suffix[1:]
|
||||
# print(file_suffix)
|
||||
get_processor = get_processor_class(yaml_path)
|
||||
print(get_processor.load())
|
||||
137
commons/file_processors/yaml_processor.py
Normal file
137
commons/file_processors/yaml_processor.py
Normal file
@@ -0,0 +1,137 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: yaml_processor
|
||||
@date: 2025/3/4 17:28
|
||||
@desc:
|
||||
"""
|
||||
import logging
|
||||
from typing import Union
|
||||
from pathlib import Path
|
||||
import yaml
|
||||
from commons.file_processors.base_processor import BaseFileProcessor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class YamlProcessor(BaseFileProcessor):
|
||||
"""
|
||||
用于处理 YAML 文件的类,继承自 dict。
|
||||
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
|
||||
并可以直接像字典一样访问 YAML 数据。
|
||||
"""
|
||||
|
||||
def __init__(self, filepath: Union[str, Path], **kwargs):
|
||||
"""
|
||||
初始化 YamlFile 对象。
|
||||
|
||||
Args: filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
|
||||
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
|
||||
如果不提供,则尝试从 filepath 加载数据。
|
||||
"""
|
||||
super().__init__(filepath, **kwargs)
|
||||
# self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
|
||||
|
||||
def load(self) -> dict:
|
||||
"""
|
||||
从 YAML 文件加载数据
|
||||
:return:
|
||||
"""
|
||||
if not self.filepath.exists():
|
||||
logger.warning(f"文件 {self.filepath} 不存在.")
|
||||
raise FileNotFoundError(f"文件 {self.filepath} 不存在.")
|
||||
|
||||
try:
|
||||
with open(self.filepath, "r", encoding="utf-8") as f:
|
||||
loaded_data = yaml.safe_load(f)
|
||||
if not isinstance(loaded_data, dict): # 确保加载的是字典
|
||||
logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
|
||||
raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.")
|
||||
return loaded_data
|
||||
|
||||
except yaml.YAMLError as e:
|
||||
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
|
||||
raise e
|
||||
|
||||
def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None:
|
||||
"""
|
||||
将字典数据保存到 YAML 文件。
|
||||
:param data:
|
||||
:param new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。
|
||||
|
||||
"""
|
||||
filepath = Path(new_filepath) if new_filepath else self.filepath
|
||||
|
||||
# 确保目标目录存在
|
||||
filepath.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
yaml.safe_dump(
|
||||
data,
|
||||
stream=f,
|
||||
allow_unicode=True,
|
||||
sort_keys=False,
|
||||
default_flow_style=False
|
||||
)
|
||||
logger.info(f"数据已成功保存到 {filepath}")
|
||||
except (TypeError, OSError, yaml.YAMLError) as e:
|
||||
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
|
||||
raise e
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 示例用法
|
||||
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
|
||||
yaml_file = YamlProcessor(yaml_path)
|
||||
print(yaml_file.load())
|
||||
print(type(yaml_file))
|
||||
|
||||
# # 直接像字典一样访问数据
|
||||
# print("加载的数据:", yaml_file) # 直接打印对象,就是打印字典内容
|
||||
# print("title:", yaml_file.get("title")) # 使用 get 方法
|
||||
# if "title" in yaml_file: # 使用 in 检查键
|
||||
# print("原始title:", yaml_file["title"]) # 使用方括号访问
|
||||
# yaml_file["title"] = "新的标题" # 使用方括号修改
|
||||
# print("修改后的title:", yaml_file["title"])
|
||||
# #
|
||||
# yaml_file["new_key"] = "new_value" # 添加新的键值对
|
||||
#
|
||||
# # 将字典转换为 YAML 字符串
|
||||
# yaml_string = yaml_file.to_string()
|
||||
# print("\nYAML 字符串:", yaml_string)
|
||||
# #
|
||||
# # 将 YAML 字符串转换回字典 (并更新 yaml_file)
|
||||
# yaml_file.to_dict(yaml_string)
|
||||
# print("\n从字符串加载的数据:", yaml_file)
|
||||
#
|
||||
# # 保存修改后的数据 (覆盖原文件)
|
||||
# yaml_file.save()
|
||||
#
|
||||
# # 保存到新文件
|
||||
# new_yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user_new.yaml'
|
||||
# yaml_file.save(new_filepath=new_yaml_path)
|
||||
|
||||
# 测试从字符串初始化
|
||||
# yaml_string2 = """
|
||||
# name: Test User
|
||||
# age: 30
|
||||
# """
|
||||
|
||||
# yaml_file2 = YamlFile("test2.yaml", data=yaml.safe_load(yaml_string2)) # 从字符串初始化
|
||||
# print("\n从字符串初始化的 YamlFile:", yaml_file2)
|
||||
# yaml_file2.save() # 保存到 test2.yaml
|
||||
#
|
||||
# 测试文件不存在的情形
|
||||
# non_existent_file = YamlFile("non_existent_file.yaml")
|
||||
# print("\n加载不存在的文件:", non_existent_file) # 应该打印空字典 {}
|
||||
# non_existent_file['a'] = 1 # 可以直接添加
|
||||
# print("\n加载不存在的文件:", non_existent_file)
|
||||
|
||||
@@ -10,34 +10,56 @@
|
||||
@desc: 读取和保存yaml文件
|
||||
"""
|
||||
import logging
|
||||
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from pathlib import Path
|
||||
import yaml
|
||||
from commons.models import CaseInfo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class YamlFile(dict):
|
||||
def __init__(self, path):
|
||||
super().__init__()
|
||||
self.path = path
|
||||
self.load()
|
||||
super().__init__() # 初始化父类 dict
|
||||
self.path = Path(path)
|
||||
self.load() # 链式初始化加载
|
||||
|
||||
def load(self):
|
||||
with open(self.path, "r", encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f) # 字典
|
||||
if data:
|
||||
self.update(data) # 把两个字段的内容合并
|
||||
if self.path.exists():
|
||||
with open(self.path, "r", encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f) or {} # 加载数据,空文件返回空字典
|
||||
self.clear() # 清空当前实例
|
||||
self.update(data) # 更新字典内容
|
||||
else:
|
||||
logger.warning(f"File {self.path} not found, initialized empty.")
|
||||
return self # 链式调用
|
||||
|
||||
def to_yaml(self) -> str:
|
||||
return yaml.safe_dump(
|
||||
dict(self),
|
||||
allow_unicode=True,
|
||||
sort_keys=False
|
||||
)
|
||||
|
||||
|
||||
@classmethod
|
||||
def by_yaml(cls, yaml_str):
|
||||
data = yaml.safe_load(yaml_str) or {}
|
||||
return cls({**data}) # 通过类方法创建实例
|
||||
|
||||
def save(self):
|
||||
with open(self.path, "w", encoding="utf-8") as f:
|
||||
yaml.safe_dump(
|
||||
dict(self),
|
||||
dict(self), # 直接 dump 实例本身(已继承 dict)
|
||||
stream=f,
|
||||
allow_unicode=True, # allow_unicode:使用unicode编码正常显示中文
|
||||
sort_keys=False) # sort_keys:保持原有排序
|
||||
allow_unicode=True,
|
||||
sort_keys=False
|
||||
)
|
||||
return self # 链式调用
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from commons.models import CaseInfo
|
||||
|
||||
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml'
|
||||
yaml_file = YamlFile(yaml_path)
|
||||
# yaml_file.load()
|
||||
|
||||
@@ -17,46 +17,79 @@ import hashlib
|
||||
|
||||
from commons.databases import db
|
||||
|
||||
# from commons.files import YamlFile
|
||||
from commons.file_processors.processor_factory import get_processor_class
|
||||
from commons import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Funcs:
|
||||
FUNC_MAPPING = {
|
||||
"int": int,
|
||||
"float": float,
|
||||
"bool": bool
|
||||
} # 内置函数有的,直接放入mapping;内置函数没有的,在funcs中定义,自动放入mapping
|
||||
|
||||
@classmethod
|
||||
def register(cls, name: str | None = None):
|
||||
def decorator(func):
|
||||
if name is None:
|
||||
cls.FUNC_MAPPING[func.__name__] = func
|
||||
cls.FUNC_MAPPING[name] = func
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
@Funcs.register("url_unquote")
|
||||
def url_unquote(s: str) -> str:
|
||||
return urllib.parse.unquote(s)
|
||||
|
||||
|
||||
@Funcs.register("str")
|
||||
def to_string(s) -> str:
|
||||
# 将数据转换为str类型。
|
||||
return f"'{s}'"
|
||||
|
||||
|
||||
@Funcs.register("time_str")
|
||||
def time_str() -> str:
|
||||
return str(time.time())
|
||||
|
||||
|
||||
@Funcs.register("add")
|
||||
def add(a, b):
|
||||
return str(int(a) + int(b))
|
||||
|
||||
|
||||
@Funcs.register("sql")
|
||||
def sql(s: str) -> str:
|
||||
res = db.execute_sql(s)
|
||||
|
||||
return res[0][0]
|
||||
|
||||
|
||||
@Funcs.register("new_id")
|
||||
def new_id():
|
||||
# 自增,永不重复
|
||||
id_file = YamlFile(settings.id_path)
|
||||
id_file["id"] += 1
|
||||
id_file.save()
|
||||
id_file = get_processor_class(settings.id_path)
|
||||
data = id_file.load()
|
||||
data["id"] += 1
|
||||
id_file.save(data)
|
||||
|
||||
return id_file["id"]
|
||||
return data["id"]
|
||||
|
||||
|
||||
@Funcs.register("last_id")
|
||||
def last_id() -> str:
|
||||
# 不自增,只返回结果
|
||||
|
||||
id_file = YamlFile("id.yaml")
|
||||
return id_file["id"]
|
||||
id_file = get_processor_class(settings.id_path)
|
||||
data = id_file.load()
|
||||
return data["id"]
|
||||
|
||||
|
||||
@Funcs.register("md5")
|
||||
def md5(content: str) -> str:
|
||||
# 1,原文转为字节
|
||||
content = content.encode("utf-8")
|
||||
@@ -64,6 +97,7 @@ def md5(content: str) -> str:
|
||||
return result
|
||||
|
||||
|
||||
@Funcs.register("base64_encode")
|
||||
def base64_encode(content: str) -> str:
|
||||
# 1,原文转二进制
|
||||
content = content.encode("utf-8")
|
||||
@@ -75,6 +109,7 @@ def base64_encode(content: str) -> str:
|
||||
return encode_str
|
||||
|
||||
|
||||
@Funcs.register("base64_decode")
|
||||
def base64_decode(content: str) -> str:
|
||||
# 1,原文转二进制
|
||||
content = content.encode("utf-8")
|
||||
@@ -86,19 +121,24 @@ def base64_decode(content: str) -> str:
|
||||
return decode_str
|
||||
|
||||
|
||||
@Funcs.register("rsa_encode")
|
||||
def rsa_encode(content: str) -> str:
|
||||
...
|
||||
|
||||
|
||||
@Funcs.register("rsa_decode")
|
||||
def rsa_decode(content: str) -> str:
|
||||
...
|
||||
|
||||
|
||||
@Funcs.register()
|
||||
def func_name_test():
|
||||
...
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
|
||||
# print(res)
|
||||
a = "这是中文dddddd"
|
||||
bb = base64_encode(a)
|
||||
print(bb)
|
||||
cc = base64_decode(bb)
|
||||
print(cc)
|
||||
# print(f"计数器:{new_id()}")
|
||||
# print(f"当前数值:{last_id()}")
|
||||
print(Funcs().FUNC_MAPPING)
|
||||
|
||||
@@ -10,89 +10,56 @@
|
||||
@desc: 声明yaml用例格式
|
||||
"""
|
||||
import logging
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import Union, Optional
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import allure
|
||||
import yaml
|
||||
|
||||
from commons.templates import Template
|
||||
from commons import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CaseInfo:
|
||||
class RequestModel:
|
||||
method: str
|
||||
url: str
|
||||
headers: Optional[dict] = None
|
||||
# body: Optional[Union[dict, str]] = None
|
||||
params: Optional[Union[dict, str]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class TestCaseModel:
|
||||
title: str
|
||||
request: dict
|
||||
request: RequestModel
|
||||
extract: dict
|
||||
validate: dict
|
||||
parametrize: list = ""
|
||||
epic: str = settings.allure_epic
|
||||
feature: str = settings.allure_feature
|
||||
story: str = settings.allure_story
|
||||
parametrize: list = field(default_factory=list)
|
||||
epic: str = field(default_factory=lambda: settings.allure_epic)
|
||||
feature: str = field(default_factory=lambda: settings.allure_feature)
|
||||
story: str = field(default_factory=lambda: settings.allure_story)
|
||||
|
||||
def to_yaml(self) -> str:
|
||||
# 序列化成yaml字符串
|
||||
yaml_str = yaml.safe_dump(
|
||||
asdict(self),
|
||||
allow_unicode=True, # allow_unicode:使用unicode编码正常显示中文
|
||||
sort_keys=False)
|
||||
return yaml_str
|
||||
def __post_init__(self):
|
||||
# 必填字段非空校验
|
||||
if self.title is None:
|
||||
raise ValueError("Title cannot be empty")
|
||||
|
||||
@classmethod
|
||||
def by_yaml(cls, yaml_str):
|
||||
# 反序列化
|
||||
obj = cls(**yaml.safe_load(yaml_str))
|
||||
return obj
|
||||
|
||||
@allure.step("断言")
|
||||
def assert_all(self):
|
||||
if not self.validate:
|
||||
return
|
||||
for assert_type, assert_value in self.validate.items():
|
||||
for msg, data in assert_value.items():
|
||||
a, b = data[0], data[1]
|
||||
# print(assert_type, a, b, msg)
|
||||
match assert_type:
|
||||
case 'equals':
|
||||
logger.info(f"assert {a} == {b}, {msg}")
|
||||
assert a == b, msg
|
||||
case 'not_equals':
|
||||
logger.info(f"assert {a} != {b}, {msg}")
|
||||
assert a != b, msg
|
||||
case 'contains':
|
||||
logger.info(f"assert {a} in {b}, {msg}")
|
||||
assert a in b, msg
|
||||
case 'not_contains':
|
||||
logger.info(f"assert {a} not in {b}, {msg}")
|
||||
assert a not in b, msg
|
||||
# case "xxxxx
|
||||
|
||||
def ddt(self) -> list: # 返回一个列表,列表中应该包含N个注入了变量的caseInfo
|
||||
case_list = []
|
||||
if not self.parametrize: # 没有使用数据驱动测试
|
||||
case_list.append('')
|
||||
else: # 使用数据驱动测试
|
||||
args_name = self.parametrize[0]
|
||||
args_value_list = self.parametrize[1:]
|
||||
for args_value in args_value_list:
|
||||
d = dict(zip(args_name, args_value))
|
||||
# d 就是数据驱动测试的变量,应输入到用例中
|
||||
case_info_str = self.to_yaml() # 转字符串
|
||||
case_info_str = Template(case_info_str).render(d) # 输入变量
|
||||
case_info = self.by_yaml(case_info_str) # 转成类
|
||||
|
||||
case_list.append(case_info) # 加入到返回值
|
||||
return case_list
|
||||
# 校验RequestModel
|
||||
if isinstance(self.request, dict):
|
||||
try:
|
||||
self.request = RequestModel(**self.request) # RequestModel 的 __post_init__ 会被调用
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError(f"解析 'request' 字段失败: {e} (数据: {self.request})") from e
|
||||
elif not isinstance(self.request, RequestModel): # 如果不是 dict 也不是 RequestModel
|
||||
raise TypeError(
|
||||
f"字段 'request' 必须是字典 (将在内部转换为 RequestModel) 或 RequestModel 实例, "
|
||||
f"得到的是 {type(self.request).__name__}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
with open(r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml', encoding='utf-8') as f:
|
||||
with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
|
||||
data = yaml.safe_load(f)
|
||||
# print(data)
|
||||
case_info = CaseInfo(**data)
|
||||
s = case_info.to_yaml()
|
||||
print(s)
|
||||
new_case_info = case_info.by_yaml(s)
|
||||
print(new_case_info)
|
||||
case_info = TestCaseModel(**data)
|
||||
|
||||
@@ -9,14 +9,16 @@
|
||||
@date: 2024 2024/9/12 21:56
|
||||
@desc:
|
||||
"""
|
||||
from urllib.parse import urljoin
|
||||
import logging
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import requests
|
||||
import allure
|
||||
from requests import Response, PreparedRequest
|
||||
import allure
|
||||
|
||||
|
||||
logger = logging.getLogger("requests.session")
|
||||
# logger = logging.getLogger("requests.session")
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
||||
@@ -41,7 +43,8 @@ class Session(requests.Session):
|
||||
|
||||
logger.info(f"接收响应 <<<<<< 状态码 = {resp.status_code}")
|
||||
logger.info(f"接收响应 <<<<<< 响应头 = {resp.headers}")
|
||||
logger.info(f"接收响应 <<<<<< 响应正文 = {resp.json()}")
|
||||
logger.info(f"接收响应 <<<<<< 响应正文 = {resp.text}")
|
||||
# logger.info(f"接收响应 <<<<<< 响应正文 = {resp.json()}")
|
||||
|
||||
return resp
|
||||
|
||||
|
||||
@@ -9,16 +9,26 @@
|
||||
@date: 2025/2/23 21:34
|
||||
@desc:
|
||||
"""
|
||||
base_url = 'http://127.0.0.1:8000'
|
||||
case_path = r"E:\PyP\InterfaceAutoTest\TestCases"
|
||||
exchanger = r"E:\PyP\InterfaceAutoTest\extract.yaml"
|
||||
id_path =r"E:\PyP\InterfaceAutoTest\id.yaml"
|
||||
from pathlib import Path
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
db_host = '119.91.19.171' # ip
|
||||
db_port = 3306 # 端口
|
||||
db_user = 'root' # 用户名
|
||||
db_password = 'mysql_hNahSe' # 密码
|
||||
db_database = 'answer' # 库名
|
||||
load_dotenv()
|
||||
|
||||
root_path = (Path(__file__)).resolve().parents[1]
|
||||
|
||||
base_url = os.getenv("BASE_URL")
|
||||
cases_dir = rf"{root_path}\TestCases\answer"
|
||||
exchanger = rf"{root_path}\extract.yaml"
|
||||
id_path = rf"{root_path}\id.yaml"
|
||||
|
||||
test_suffix = "yaml"
|
||||
|
||||
db_host = os.getenv("DB_HOST") # ip
|
||||
db_port = os.getenv("DB_PORT") # 端口
|
||||
db_user = os.getenv("DB_USER") # 用户名
|
||||
db_password = os.getenv("DB_PASSWORD") # 密码
|
||||
db_database = os.getenv("DB_DATABASE")
|
||||
|
||||
allure_epic: str = "项目名称:answer"
|
||||
allure_feature: str = "默认特征(feature)"
|
||||
@@ -26,3 +36,7 @@ allure_story: str = "默认事件(story)"
|
||||
|
||||
rsa_public = ""
|
||||
rsa_private = ""
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(root_path)
|
||||
print(base_url,db_host,db_port,db_user,db_password,db_database)
|
||||
|
||||
@@ -13,26 +13,17 @@ import copy
|
||||
import logging
|
||||
import re
|
||||
import string
|
||||
from commons.funcs import Funcs
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _str(s) -> str:
|
||||
# 将数据转换为str类型。
|
||||
return f"'{s}'"
|
||||
|
||||
|
||||
class Template(string.Template):
|
||||
"""
|
||||
1,支持函数调用
|
||||
2,参数也可以是变量
|
||||
"""
|
||||
func_mapping = {
|
||||
"str": _str,
|
||||
"int": int,
|
||||
"float": float,
|
||||
"bool": bool
|
||||
} # 内置函数有的,直接放入mapping;内置函数没有的,在funcs中定义,自动放入mapping
|
||||
|
||||
call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
|
||||
|
||||
@@ -50,15 +41,17 @@ class Template(string.Template):
|
||||
:return: 替换后的结果
|
||||
"""
|
||||
mapping = copy.deepcopy(mapping)
|
||||
mapping.update(self.func_mapping) # 合并两个mapping
|
||||
|
||||
logger.info(f"mapping更新前: {mapping}")
|
||||
# mapping.update(self.FUNC_MAPPING) # 合并两个mapping
|
||||
mapping.update(Funcs.FUNC_MAPPING) # 合并两个mapping
|
||||
logger.info(f"mapping更新后: {mapping}")
|
||||
def convert(mo):
|
||||
func_name = mo.group("func_name")
|
||||
func_args = mo.group("func_args").split(",")
|
||||
func = mapping.get(func_name) # 读取指定函数
|
||||
func_args_value = [mapping.get(arg, arg) for arg in func_args]
|
||||
|
||||
if func_args_value == [""]: # 处理没有参数的func
|
||||
if func_args_value == [""]: # 处理没有参数的func
|
||||
func_args_value = []
|
||||
|
||||
if not callable(func):
|
||||
@@ -68,15 +61,3 @@ class Template(string.Template):
|
||||
|
||||
return self.call_pattern.sub(convert, template)
|
||||
|
||||
|
||||
def hot_load():
|
||||
from commons import funcs
|
||||
for func_name in dir(funcs): # 遍历模块中的所有函数
|
||||
if func_name.startswith("_"):
|
||||
continue
|
||||
func_code = getattr(funcs, func_name) # 取到函数对象
|
||||
if callable(func_code): # 如果是一个可以调用的函数
|
||||
Template.func_mapping[func_name] = func_code # 函数放到Template中
|
||||
|
||||
|
||||
hot_load()
|
||||
|
||||
10
extract.yaml
10
extract.yaml
@@ -1,3 +1,7 @@
|
||||
code: 200
|
||||
msg: 成功。
|
||||
reason: base.success
|
||||
name: 张三
|
||||
age: '18'
|
||||
data:
|
||||
- 3
|
||||
- 4
|
||||
- 5
|
||||
aaa: null
|
||||
|
||||
@@ -1,18 +1,54 @@
|
||||
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_1_user.yaml')'
|
||||
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_2_url.yaml')'
|
||||
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.find_yaml_case:44 - load file yaml_path=WindowsPath('E:/PyP/InterfaceAutoTest/TestCases/test_3_sql.yaml')'
|
||||
''02/23/2025 10:17:34 PM' [pytest_result_log] INFO plugin.pytest_runtest_setup:122 - ---------------Start: main.py::TestAPI::test_1_user.yaml[查询用户信息0]---------------'
|
||||
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:67 - =================================用例开始执行:查询用户信息=================================='
|
||||
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:71 - 1,正在注入变量...'
|
||||
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:74 - 2,正在请求接口...'
|
||||
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:36 - 发送请求>>>>>> 接口地址 = GET http://119.91.19.171:40065/answer/api/v1/connector/info'
|
||||
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:37 - 发送请求>>>>>> 请求头 = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Accept-Language': 'zh_CN', 'Content-Type': 'application/json', 'Cookie': 'psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==', 'Host': '119.91.19.171:40065', 'Origin': 'http://119.91.19.171:40065', 'Referer': 'http://119.91.19.171:40065/users/login'}'
|
||||
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:38 - 发送请求>>>>>> 请求正文 = None '
|
||||
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:42 - 接收响应 <<<<<< 状态码 = 200'
|
||||
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:43 - 接收响应 <<<<<< 响应头 = {'Content-Type': 'application/json; charset=utf-8', 'Date': 'Sun, 23 Feb 2025 14:17:34 GMT', 'Content-Length': '64'}'
|
||||
''02/23/2025 10:17:34 PM' [requests.session] INFO session.send:44 - 接收响应 <<<<<< 响应正文 = {'code': 200, 'reason': 'base.success', 'msg': '成功。', 'data': []}'
|
||||
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:77 - 3,正在提取变量...'
|
||||
''02/23/2025 10:17:34 PM' [commons.cases] INFO cases.test_func:83 - 4,正在断言...'
|
||||
''02/23/2025 10:17:34 PM' [commons.models] INFO models.assert_all:59 - assert 200 == code1, 状态码等于200'
|
||||
''02/23/2025 10:17:34 PM' [pytest_result_log] ERROR plugin.pytest_result_log:190 - test status is FAILED (main.py::TestAPI::test_1_user.yaml[查询用户信息0]): AssertionError'
|
||||
''02/23/2025 10:17:34 PM' [pytest_result_log] INFO plugin.pytest_runtest_teardown:128 - ----------------End: main.py::TestAPI::test_1_user.yaml[查询用户信息0]----------------'
|
||||
03/03/2025 05:34:28 PM [commons.cases] INFO cases.find_yaml_case:45 - 加载文件:D:\CNWei\CNW\InterfaceAutoTest\TestCases\answer\test_1_status.yaml
|
||||
03/03/2025 05:34:28 PM [commons.cases] INFO cases.find_yaml_case:50 - case_info=title: 查询状态信息
|
||||
request:
|
||||
method: get
|
||||
url: /answer/api/v1/connector/info
|
||||
headers:
|
||||
Host: 119.91.19.171:40065
|
||||
Accept-Language: en_US
|
||||
Accept: application/json, text/plain, */*
|
||||
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,
|
||||
like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0
|
||||
Referer: http://119.91.19.171:40065/users/login
|
||||
Accept-Encoding: gzip, deflate
|
||||
extract:
|
||||
msg:
|
||||
- json
|
||||
- $.msg
|
||||
- 0
|
||||
validate:
|
||||
equals:
|
||||
状态码等于200:
|
||||
- Success.
|
||||
- ${msg}
|
||||
parametrize: []
|
||||
epic: 项目名称:answer
|
||||
feature: 页面状态
|
||||
story: 状态
|
||||
|
||||
03/03/2025 05:34:28 PM [commons.models] INFO models.ddt:81 - 1,执行这一步
|
||||
03/03/2025 05:34:28 PM [commons.cases] INFO cases.new_case:63 - ddt_title=['查询状态信息']
|
||||
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_runtest_setup:122 - -----------------Start: main.py::TestAPI::test_1_status[查询状态信息]-----------------
|
||||
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:71 - =================================用例开始执行:查询状态信息==================================
|
||||
03/03/2025 05:34:28 PM [commons.exchange] INFO exchange.replace:64 - CaseInfo(title='查询状态信息', request={'method': 'get', 'url': '/answer/api/v1/connector/info', 'headers': {'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Referer': 'http://119.91.19.171:40065/users/login', 'Accept-Encoding': 'gzip, deflate'}}, extract={'msg': ['json', '$.msg', 0]}, validate={'equals': {'状态码等于200': ['Success.', '${msg}']}}, parametrize=[], epic='项目名称:answer', feature='页面状态', story='状态')
|
||||
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:51 - mapping更新前: {'msg': 'Success.', 'id': 12}
|
||||
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:54 - mapping更新后: {'msg': 'Success.', 'id': 12, 'int': <class 'int'>, 'float': <class 'float'>, 'bool': <class 'bool'>, 'url_unquote': <function url_unquote at 0x00000299E6AAC0D0>, 'str': <function to_string at 0x00000299E6AAC160>, 'time_str': <function time_str at 0x00000299E6AAC1F0>, 'add': <function add at 0x00000299E6AAC280>, 'sql': <function sql at 0x00000299E6AAC310>, 'new_id': <function new_id at 0x00000299E6AAC3A0>, 'last_id': <function last_id at 0x00000299E6AAC430>, 'md5': <function md5 at 0x00000299E6AAC4C0>, 'base64_encode': <function base64_encode at 0x00000299E6AAC550>, 'base64_decode': <function base64_decode at 0x00000299E6AAC5E0>, 'rsa_encode': <function rsa_encode at 0x00000299E6AAC670>, 'rsa_decode': <function rsa_decode at 0x00000299E6AAC700>}
|
||||
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:75 - 1,正在注入变量...
|
||||
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:78 - 2,正在请求接口...
|
||||
03/03/2025 05:34:28 PM [requests.session] INFO session.send:36 - 发送请求>>>>>> 接口地址 = GET http://119.91.19.171:40065/answer/api/v1/connector/info
|
||||
03/03/2025 05:34:28 PM [requests.session] INFO session.send:37 - 发送请求>>>>>> 请求头 = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': 'application/json, text/plain, */*', 'Connection': 'keep-alive', 'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Referer': 'http://119.91.19.171:40065/users/login'}
|
||||
03/03/2025 05:34:28 PM [requests.session] INFO session.send:38 - 发送请求>>>>>> 请求正文 = None
|
||||
03/03/2025 05:34:28 PM [requests.session] INFO session.send:42 - 接收响应 <<<<<< 状态码 = 200
|
||||
03/03/2025 05:34:28 PM [requests.session] INFO session.send:43 - 接收响应 <<<<<< 响应头 = {'Content-Type': 'application/json; charset=utf-8', 'Date': 'Mon, 03 Mar 2025 09:34:29 GMT', 'Content-Length': '63'}
|
||||
03/03/2025 05:34:28 PM [requests.session] INFO session.send:44 - 接收响应 <<<<<< 响应正文 = {'code': 200, 'reason': 'base.success', 'msg': 'Success.', 'data': []}
|
||||
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:81 - 3,正在提取变量...
|
||||
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:87 - 4,正在断言...
|
||||
03/03/2025 05:34:28 PM [commons.exchange] INFO exchange.replace:64 - CaseInfo(title='查询状态信息', request={'method': 'get', 'url': '/answer/api/v1/connector/info', 'headers': {'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Referer': 'http://119.91.19.171:40065/users/login', 'Accept-Encoding': 'gzip, deflate'}}, extract={'msg': ['json', '$.msg', 0]}, validate={'equals': {'状态码等于200': ['Success.', '${msg}']}}, parametrize=[], epic='项目名称:answer', feature='页面状态', story='状态')
|
||||
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:51 - mapping更新前: {'msg': 'Success.', 'id': 12}
|
||||
03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:54 - mapping更新后: {'msg': 'Success.', 'id': 12, 'int': <class 'int'>, 'float': <class 'float'>, 'bool': <class 'bool'>, 'url_unquote': <function url_unquote at 0x00000299E6AAC0D0>, 'str': <function to_string at 0x00000299E6AAC160>, 'time_str': <function time_str at 0x00000299E6AAC1F0>, 'add': <function add at 0x00000299E6AAC280>, 'sql': <function sql at 0x00000299E6AAC310>, 'new_id': <function new_id at 0x00000299E6AAC3A0>, 'last_id': <function last_id at 0x00000299E6AAC430>, 'md5': <function md5 at 0x00000299E6AAC4C0>, 'base64_encode': <function base64_encode at 0x00000299E6AAC550>, 'base64_decode': <function base64_decode at 0x00000299E6AAC5E0>, 'rsa_encode': <function rsa_encode at 0x00000299E6AAC670>, 'rsa_decode': <function rsa_decode at 0x00000299E6AAC700>}
|
||||
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.assert_all:32 - 键:equals,值:{'状态码等于200': ['Success.', 'Success.']}
|
||||
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.assert_all:34 - 获取到的断言:<function validate_equals at 0x00000299E6AAC940>
|
||||
03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.validate_equals:43 - assert Success. == Success., 状态码等于200执行这段代码
|
||||
03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:92 - =================================用例执行结束:查询状态信息==================================
|
||||
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_result_log:190 - test status is PASSED (main.py::TestAPI::test_1_status[查询状态信息]):
|
||||
03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_runtest_teardown:128 - ------------------End: main.py::TestAPI::test_1_status[查询状态信息]------------------
|
||||
|
||||
7
main.py
7
main.py
@@ -5,14 +5,15 @@ import pytest
|
||||
|
||||
from commons.cases import TestAPI
|
||||
|
||||
TestAPI.find_yaml_case() # 加载yaml文件
|
||||
TestAPI.run() # 加载yaml文件
|
||||
|
||||
if __name__ == '__main__':
|
||||
now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
|
||||
# 1,启动框架(生成临时文件)
|
||||
pytest.main([__file__, "-x", "-v"]) # -x表示有一个用例失败后面将不执行;-v表示展示用例名称;-c,配置文件所在目录:指定pytest.ini路径
|
||||
# -x表示有一个用例失败后面将不执行;-v表示展示用例名称;-c,配置文件所在目录:指定pytest.ini路径;--alluredir=temp。指定数据生成目录
|
||||
pytest.main([__file__, "-x", "-v","--alluredir=temp"])
|
||||
# 2,生成HTML报告
|
||||
os.system('allure generate temp -o report --clean') # java程序只能借助操作系统执行
|
||||
|
||||
# 3,备份日志
|
||||
shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log")
|
||||
# shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log")
|
||||
|
||||
@@ -16,6 +16,8 @@ jsonpath = "^0.82.2"
|
||||
pymysql = "^1.1.1"
|
||||
pytest-result-log = "^1.2.2"
|
||||
allure-pytest = "^2.13.5"
|
||||
cryptography = "^44.0.2"
|
||||
dotenv = "^0.9.9"
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
@@ -3,8 +3,8 @@ addopts = -q --show-capture=no
|
||||
|
||||
|
||||
log_file = logs/pytest.log
|
||||
log_file_level = info
|
||||
log_file_format = '%(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s'
|
||||
log_file_date_format = '%m/%d/%Y %I:%M:%S %p'
|
||||
log_file_level = debug
|
||||
log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s
|
||||
log_file_date_format = %m/%d/%Y %I:%M:%S %p
|
||||
|
||||
disable_test_id_escaping_and_forfeit_all_rights_to_community_support = true
|
||||
48
utils/case_parser.py
Normal file
48
utils/case_parser.py
Normal file
@@ -0,0 +1,48 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: case_parser
|
||||
@date: 2025/2/27 17:25
|
||||
@desc:
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, asdict, field
|
||||
|
||||
import yaml
|
||||
|
||||
from commons.models import CaseInfo
|
||||
|
||||
|
||||
class CaseParser:
|
||||
@staticmethod
|
||||
def to_yaml(case_data: dict) -> str:
|
||||
try:
|
||||
|
||||
CaseInfo(**case_data)
|
||||
except TypeError as error:
|
||||
logging.error(error)
|
||||
raise error
|
||||
return yaml.safe_dump(case_data, allow_unicode=True, sort_keys=False)
|
||||
|
||||
@staticmethod
|
||||
def from_yaml(yaml_str: str) -> CaseInfo:
|
||||
return CaseInfo(**yaml.safe_load(yaml_str))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
with open(r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
print(data)
|
||||
print(type(data))
|
||||
# print(CaseInfo(**data))
|
||||
case_parser = CaseParser()
|
||||
case_data_ = case_parser.to_yaml(data)
|
||||
# print(case_data_)
|
||||
# case_parser.from_yaml(case_data_)
|
||||
# print(type(case_data_))
|
||||
80
utils/case_validator.py
Normal file
80
utils/case_validator.py
Normal file
@@ -0,0 +1,80 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: case_validator
|
||||
@date: 2025/2/27 17:25
|
||||
@desc:
|
||||
"""
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CaseValidator:
|
||||
VALIDATORS = {}
|
||||
|
||||
@classmethod
|
||||
def register(cls, name: str):
|
||||
def decorator(func):
|
||||
cls.VALIDATORS[name] = func
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
@classmethod
|
||||
def assert_all(cls, validate: dict):
|
||||
if not validate:
|
||||
return
|
||||
for assert_type, cases in validate.items():
|
||||
logger.info(f"键:{assert_type},值:{cases}")
|
||||
validator = cls.VALIDATORS.get(assert_type)
|
||||
logger.info(f"获取到的断言:{validator}")
|
||||
if not validator:
|
||||
raise KeyError(f"Unsupported validator: {assert_type}")
|
||||
for msg, (a, b) in cases.items():
|
||||
validator(a, b, msg)
|
||||
|
||||
|
||||
@CaseValidator.register('equals')
|
||||
def validate_equals(a, b, msg):
|
||||
logger.info(f"assert {a} == {b}, {msg}执行这段代码")
|
||||
assert a == b, msg
|
||||
|
||||
|
||||
@CaseValidator.register('not_equals')
|
||||
def validate_not_equals(a, b, msg):
|
||||
logger.info(f"assert {a} != {b}, {msg}")
|
||||
assert a != b, msg
|
||||
|
||||
|
||||
@CaseValidator.register('contains')
|
||||
def validate_contains(a, b, msg):
|
||||
logger.info(f"assert {a} in {b}, {msg}")
|
||||
assert a in b, msg
|
||||
|
||||
|
||||
@CaseValidator.register('not_contains')
|
||||
def validate_not_contains(a, b, msg):
|
||||
logger.info(f"assert {a} not in {b}, {msg}")
|
||||
assert a not in b, msg
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
mock_case = {
|
||||
"validate": {
|
||||
"equals": {
|
||||
"判断相等": ["Success.", "Success."]
|
||||
},
|
||||
"not_equals": {
|
||||
"判断不相等": ["Success.", "Suc."]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case_validator = CaseValidator()
|
||||
print(case_validator.VALIDATORS)
|
||||
case_validator.assert_all(mock_case.get("validate"))
|
||||
91
utils/data_driver.py
Normal file
91
utils/data_driver.py
Normal file
@@ -0,0 +1,91 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: data_driver
|
||||
@date: 2025/3/3 10:56
|
||||
@desc:
|
||||
"""
|
||||
from pathlib import Path
|
||||
|
||||
from commons.templates import Template
|
||||
from commons.file_processors.file_handle import FileHandle
|
||||
|
||||
|
||||
class DataDriver:
|
||||
@staticmethod
|
||||
def generate_cases(file_name, case_info) -> dict:
|
||||
|
||||
if not case_info.get("parametrize"):
|
||||
return {file_name + "[--]": case_info}
|
||||
|
||||
cases = {}
|
||||
args_names = case_info.get("parametrize")[0]
|
||||
for i, args_values in enumerate(case_info.get("parametrize")[1:]):
|
||||
# print(args_values)
|
||||
context = dict(zip(args_names, args_values))
|
||||
# print(context)
|
||||
rendered = Template(FileHandle.to_string(case_info)).render(context)
|
||||
cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)})
|
||||
|
||||
return cases
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml")
|
||||
|
||||
file_obj = FileHandle(file_path)
|
||||
print(file_path.stem)
|
||||
file_name_ = file_path.stem
|
||||
# mock_case_info = {
|
||||
# "case_info0": {
|
||||
# "feature": "页面状态",
|
||||
# "story": "状态",
|
||||
# "title": "查询状态信息",
|
||||
# "request": "",
|
||||
# "extract": "",
|
||||
# "validate": "",
|
||||
# "parametrize": [["title", "username", "password", "msg"], ["测试1", "user1", "pass1", "200"],
|
||||
# ["测试2", "user2", "pass2", "300"]]
|
||||
# },
|
||||
# "case_info1": {
|
||||
# "feature": "页面状态",
|
||||
# "story": "状态",
|
||||
# "title": "查询状态信息",
|
||||
# "request": "",
|
||||
# "extract": "",
|
||||
# "validate": "",
|
||||
# "parametrize": [1, 2, 3]
|
||||
# },
|
||||
# "case_info2": {
|
||||
# "feature": "页面状态",
|
||||
# "story": "状态",
|
||||
# "title": "查询状态信息",
|
||||
# "request": "",
|
||||
# "extract": "",
|
||||
# "validate": "",
|
||||
# "parametrize": [1, 2, 3]
|
||||
# }
|
||||
#
|
||||
# }
|
||||
|
||||
dd = DataDriver()
|
||||
# cases = dd.generate_cases(mock_case_info.get("case_info0"))
|
||||
cases_ = dd.generate_cases(file_name_, file_obj)
|
||||
print(cases_)
|
||||
case_keys = list(cases_.keys())
|
||||
case_values = cases_.values()
|
||||
|
||||
print(case_keys)
|
||||
print(case_values)
|
||||
aa = [i.get("title") for i in case_values]
|
||||
print(aa)
|
||||
# print(list(case_values)[0]["feature"])
|
||||
print(file_obj["feature"])
|
||||
# print(list(case_values)[0]["story"])
|
||||
print(file_obj["story"])
|
||||
|
||||
Reference in New Issue
Block a user