Files
InterfaceAutoTest/commons/cases.py
CNWei 300b5a92d4 refactor(): 优化测试用例数据的处理,优化代码结构
- 新增用例生成器和注册器
- 优化文件处理
2025-06-03 21:42:57 +08:00

232 lines
9.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: cases.py
@date: 2024/9/16 9:57
@desc: Dynamically generate test cases
"""
from dataclasses import asdict
from pathlib import Path
import logging
from typing import Union, Generator, Type
from unittest import TestCase
import allure
import pytest
from commons import settings
from commons.file_processors.processor_factory import get_processor_class
# from commons.models import CaseInfo
from commons.session import Session
from commons.exchange import Exchange
from commons.templates import Template
from commons.case_handler import TestCaseHandle
from utils import case_validator
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Shared Session built from settings.base_url; the generated test functions
# send their requests through it.
session = Session(settings.base_url)
# Default directory that TestAPI.run searches for case files.
cases_dir = Path(settings.cases_dir)
# Shared Exchange built from settings.exchanger; the generated test functions
# use it to replace variables in a case and to extract values from responses.
exchanger = Exchange(settings.exchanger)
class TestAPI:
    """Host class that dynamically generated test functions are attached to.

    The class starts out empty; :meth:`run` discovers case files and has
    ``CaseRegister`` set one attribute per generated test case on it.
    """

    @classmethod
    def run(cls, testcase_dir: Union[Path, str] = cases_dir):
        """Discover case files under *testcase_dir* and register their tests on this class.

        :param testcase_dir: directory searched for case files; defaults to
            the module-level ``cases_dir``.
        :raises FileNotFoundError: propagated from ``CaseFinder`` when
            *testcase_dir* is not an existing directory.
        """
        # The registrar only stores the target class, so one instance can be
        # reused for every case file.
        registrar = CaseRegister(cls)
        for case_file in CaseFinder(testcase_dir).find_testcases():
            # Diagnostics go through the module logger instead of stdout so
            # they obey the project's logging configuration.
            logger.debug("generating test cases from %s", case_file.name)
            for case_mapping in CaseGenerator(case_file).generate_testcases():
                logger.debug("registering test cases: %s", case_mapping)
                registrar.register_test_func(case_mapping)
# @classmethod
# def find_test_cases(cls, case_dir: Path = cases_dir):
# """
# 搜索和加载yaml文件
# :return:
# """
# case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
# for case_path in case_path_list:
# logger.info(f"加载文件:{case_path}")
#
# file = FileHandle(case_path) # 自动读取yaml文件
# try:
# CaseInfo(**file) # 校验用例格式
# logger.info(f"case_info{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志
# case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式
# # print(case_path.stem)
# setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中
# except Exception as e:
# logger.error(e)
#
# @classmethod
# def new_case(cls, file_name, case_info: dict):
# test_case = data_driver.DataDriver().generate_cases(file_name, case_info)
#
# keys_list = list(test_case.keys())
# logger.info(f"keys_list{keys_list}")
#
# values_list = list(test_case.values())
# logger.info(f"测试用例列表:{values_list}")
#
# driver_title = [i.get("title") for i in values_list]
# logger.info(f"driver_title={driver_title}")
#
# epic = case_info["epic"] if case_info["epic"] else settings.allure_epic
# logger.info(f"epic{epic}")
#
# feature = case_info["feature"] if case_info["feature"] else settings.allure_feature
# logger.info(f"feature{feature}")
#
# story = case_info["story"] if case_info["story"] else settings.allure_story
# logger.info(f"story{story}")
#
# @allure.epic(epic)
# @allure.feature(feature)
# @allure.story(story)
# @pytest.mark.parametrize("case_key", keys_list, ids=driver_title)
# def test_func(self, case_key):
# logger.info(f"case_key{case_key}")
#
# test_case_mapping = test_case.get(case_key)
# logger.info(f"测试用例:{test_case_mapping}")
#
# allure.dynamic.title(test_case_mapping.get("title"))
#
# logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "="))
#
# # 0变量替换
# new_case_info = exchanger.replace(test_case_mapping)
# logger.info(f"1正在注入变量...")
# logger.info(f"new_case_info{new_case_info}")
# # 1发送请求
# logger.info(f"2正在请求接口...")
# resp = session.request(**new_case_info.get("request"))
#
# logger.info(f"3正在提取变量...")
# # 2保存变量(接口关联)
# for var_name, extract_info in new_case_info.get("extract").items():
# logger.info(f"保存变量:{var_name}{extract_info}")
# exchanger.extract(resp, var_name, *extract_info)
# # 3断言
# logger.info(f"4正在断言...")
# assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量
# logger.info(f"替换变量后:{assert_case_info}")
# # assert_case_info.assert_all() # 执行断言
# _validator = case_validator.CaseValidator()
# _validator.assert_all(assert_case_info.get("validate"))
#
# logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "="))
#
# return test_func
class CaseFinder:
    """Locate test-case files below a directory."""

    # File extension (without the leading dot) of case files, read from settings.
    find_suffix: str = settings.test_suffix

    def __init__(self, testcase_dir: Union[str, Path]):
        """Remember the directory to search.

        :param testcase_dir: directory that contains the case files.
        :raises FileNotFoundError: if *testcase_dir* is not an existing directory.
        """
        directory = Path(testcase_dir)
        if not directory.is_dir():
            raise FileNotFoundError("不是有效的目录")
        self.testcase_dir: Path = directory

    def find_testcases(self) -> Generator[Path, None, None]:
        """Yield every ``test_*.<find_suffix>`` file below the directory, recursively.

        ``Path.glob`` gives no ordering guarantee, so the matches are sorted
        to make the order in which cases are later registered reproducible
        across runs and platforms.

        :return: generator of matching file paths in sorted order.
        """
        pattern = f"**/test_*.{self.find_suffix}"
        for case_file in sorted(self.testcase_dir.glob(pattern)):
            logger.info(f"加载文件:{case_file}")
            yield case_file
class CaseGenerator:
    """Expand one case file into a mapping of test names to case data."""

    def __init__(self, fp: Union[str, Path]):
        """Store the case file path.

        :param fp: path of the case file to expand.
        """
        self.fp: Path = Path(fp)

    def generate_testcases(self) -> Generator[dict, None, None]:
        """Yield one dict mapping generated test names to case dictionaries.

        The file is loaded through the processor selected by
        ``get_processor_class`` and wrapped with ``TestCaseHandle.new``.

        * Without ``parametrize`` data the dict holds a single entry keyed
          ``"<file stem>__"`` whose value is ``asdict(case_info)``.
        * With ``parametrize`` data the first row supplies the argument
          names and each following row produces one entry keyed
          ``"<file stem>_<row index>"``; the case is rendered through
          ``Template`` with that row's name/value context.

        :return: generator that yields exactly one dict.
        """
        file_name = self.fp.stem
        raw_case = get_processor_class(self.fp).load()
        case_info = TestCaseHandle.new(raw_case)

        if not case_info.parametrize:
            yield {file_name + "__": asdict(case_info)}
            return

        # First row: argument names; remaining rows: one value set per case.
        args_names, *value_rows = case_info.parametrize
        cases = {}
        for index, args_values in enumerate(value_rows):
            context = dict(zip(args_names, args_values))
            # Diagnostics go through the module logger rather than stdout.
            logger.debug("parametrize context for %s_%s: %s", file_name, index, context)
            rendered = Template(case_info.to_string()).render(context)
            cases[f"{file_name}_{index}"] = case_info.to_dict(rendered)
        yield cases
class CaseRegister:
    """Attach generated test functions to a target test class."""

    def __init__(self, register: Type["TestAPI"]):
        """Store the class that will receive the generated test functions.

        :param register: class on which the test functions are set as attributes.
        """
        self.register: Type["TestAPI"] = register

    def register_test_func(self, case: dict):
        """Create one test function per entry of *case* and set it on the target class.

        :param case: mapping of attribute name -> case dictionary. Each case
            dictionary is read for ``epic``, ``feature``, ``story`` and
            ``title``; after variable replacement its ``request``,
            ``extract`` and ``validate`` entries drive the test.
        """
        for test_filed_name, case_info in case.items():
            # Fall back to the project-wide allure labels when the case omits
            # a label or leaves it empty.
            epic = case_info.get("epic") or settings.allure_epic
            logger.info(f"epic{epic}")
            feature = case_info.get("feature") or settings.allure_feature
            logger.info(f"feature{feature}")
            story = case_info.get("story") or settings.allure_story
            logger.info(f"story{story}")

            # ``testcase`` is bound as a default argument so each generated
            # function keeps its own case data instead of the loop's last value.
            @allure.epic(epic)
            @allure.feature(feature)
            @allure.story(story)
            def register_func(instance, testcase=case_info):
                title = testcase.get("title")
                allure.dynamic.title(title)
                logger.info(f"用例开始执行:{title}".center(80, "="))

                # Step 0: replace variables in the case.
                new_case_info = exchanger.replace(testcase)
                logger.info("1正在注入变量...")
                logger.info(f"new_case_info{new_case_info}")

                # Step 1: send the request.
                logger.info("2正在请求接口...")
                resp = session.request(**new_case_info.get("request"))

                # Step 2: save variables extracted from the response so that
                # later cases can use them. A case without an ``extract``
                # section simply has nothing to save.
                logger.info("3正在提取变量...")
                extract_rules = new_case_info.get("extract") or {}
                for var_name, extract_info in extract_rules.items():
                    logger.info(f"保存变量:{var_name}{extract_info}")
                    exchanger.extract(resp, var_name, *extract_info)

                # Step 3: replace variables again so the assertions see the
                # values extracted above, then run them.
                logger.info("4正在断言...")
                assert_case_info = exchanger.replace(testcase)
                logger.info(f"替换变量后:{assert_case_info}")
                _validator = case_validator.CaseValidator()
                _validator.assert_all(assert_case_info.get("validate"))

                logger.info(f"用例执行结束:{title}".center(80, "="))

            # Expose the generated function as an attribute of the target class.
            setattr(self.register, test_filed_name, register_func)
# TestAPI.find_yaml_case()
if __name__ == "__main__":
    # Manual run: register every case found under cases_dir, then print the
    # class namespace so the generated attributes can be inspected.
    TestAPI.run(cases_dir)
    print(vars(TestAPI))