diff --git a/TestCases/answer/test_1_status.yaml b/TestCases/answer/test_1_status.yaml deleted file mode 100644 index 088fcac..0000000 --- a/TestCases/answer/test_1_status.yaml +++ /dev/null @@ -1,31 +0,0 @@ -feature: 页面状态 -story: 状态 -title: 查询状态信息 -request: - method: get - url: /answer/api/v1/connector/info - headers: - Host: 119.91.19.171:40065 - Accept-Language: en_US - Accept: application/json, text/plain, */* - User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0 - Referer: http://119.91.19.171:40065/users/login - Accept-Encoding: gzip, deflate -extract: # 提取变量 - msg: - - "json" - - "$.msg" - - 0 - -validate: - equals: # 断言相等 - 状态码等于200: - - Success. - - ${msg} - -#parametrize: # 数据驱动测试 -# - [ "title","username","password","msg" ] # 变量名 -# - [ "测试1","user1","pass1","200" ] # 变量值 -# - [ "测试2","user2","pass2","300" ] # 变量值 -# - [ "测试3","user3","pass3","200" ] # 变量值 -# - [ "测试4","user4","pass4","200" ] # 变量值 \ No newline at end of file diff --git a/TestCases/test_1_user.json b/TestCases/test_1_user.json deleted file mode 100644 index c0e12ba..0000000 --- a/TestCases/test_1_user.json +++ /dev/null @@ -1,65 +0,0 @@ -{ - "epic": "项目名称:answer", - "feature": "页面状态", - "story": "状态", - "title": "查询状态信息", - "request": { - "method": "get", - "url": "/answer/api/v1/connector/info", - "headers": { - "Host": "119.91.19.171:40065", - "Accept-Language": "en_US", - "Accept": "application/json, text/plain, */*", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0", - "Referer": "http://119.91.19.171:40065/users/login", - "Accept-Encoding": "gzip, deflate" - } - }, - "extract": { - "msg": [ - "json", - "$.msg", - 0 - ] - }, - "validate": { - "equals": { - "状态码等于200": [ - "Success.", - "Success." - ] - } - }, - "parametrize": [ - [ - "title", - "username", - "password", - "msg" - ], - [ - "测试1", - "user1", - "pass1", - "200" - ], - [ - "测试2", - "user2", - "pass2", - "300" - ], - [ - "测试3", - "user3", - "pass3", - "200" - ], - [ - "测试4", - "user4", - "pass4", - "200" - ] - ] -} \ No newline at end of file diff --git a/TestCases/test_1_user.yaml b/TestCases/test_1_user.yaml deleted file mode 100644 index a695b71..0000000 --- a/TestCases/test_1_user.yaml +++ /dev/null @@ -1,51 +0,0 @@ -feature: 特征 -story: 事件 -title: 查询用户信息 -request: - method: get - url: http://119.91.19.171:40065/answer/api/v1/connector/info - headers: - Accept-Encoding: gzip, deflate - Accept-Language: zh_CN - Content-Type: application/json - Cookie: psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg== - Host: 119.91.19.171:40065 - Origin: http://119.91.19.171:40065 - Referer: http://119.91.19.171:40065/users/login - User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like - Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0 -extract: # 提取变量 - code: - - "json" - - "$.code" - - 0 - msg: - - "json" - - "$.msg" - - 0 - - -validate: - equals: # 断言相等 - 状态码等于200: - - 200 - - ${code} - not_equals: # 断言不相等 - 状态码不等于404: - - 404 - - ${code} - contains: # 断言包含 - 包含关系: - - 404 - - ${code} - not_contains: # 断言不包含 - 不包含关系: - - 404 - - ${code} - -parametrize: # 数据驱动测试 - - [ "title","username","password","code" ] # 变量名 - - [ "测试1","user1","pass1","code1" ] # 变量值 - - [ "测试2","user2","pass2","code2" ] # 变量值 - - [ "测试3","user3","pass3","code3" ] # 变量值 - - [ "测试4","user4","pass4","code4" ] # 变量值 \ No newline at end of file diff --git a/TestCases/test_2_url.yaml b/TestCases/test_2_url.yaml deleted file mode 100644 index dd0165f..0000000 --- a/TestCases/test_2_url.yaml +++ /dev/null @@ -1,15 +0,0 @@ -title: 查询用户信息 - -request: - method: get - url: "https://api.kuleu.com/api/action" - headers: - user-agent: 'Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(KHTML, like Gecko) Chrome / 128.0.0.0Safari / 537.36' - params: - text: ${url_unquote(code)} -# data: ${code} -extract: - status_code: [ json, $.data,0 ] - -validate: - codes: 200 \ No newline at end of file diff --git a/TestCases/test_3_sql.yaml b/TestCases/test_3_sql.yaml deleted file mode 100644 index c906159..0000000 --- a/TestCases/test_3_sql.yaml +++ /dev/null @@ -1,30 +0,0 @@ -title: 查询用户信息 -request: - method: get - url: http://119.91.19.171:40065/answer/api/v1/connector/info - headers: - Accept-Encoding: gzip, deflate - Accept-Language: zh_CN - Content-Type: application/json - Cookie: psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg== - Host: 119.91.19.171:40065 - Origin: http://119.91.19.171:40065 - Referer: http://119.91.19.171:40065/users/login - User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like - Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0 -extract: # 提取变量 - reason: - - "json" - - "$.reason" - - 0 - -validate: - # 断言 sql - contains: # 断言包含 - 用户在数据库中: - - "ltcs" - - ${sql(select username from user where id=1)} - not_contains: # 断言包含 - 用户不存在在数据库中: - - "ltcs" - - ${sql(select username from user where id=1)} \ No newline at end of file diff --git a/a_test_case.py b/a_test_case.py deleted file mode 100644 index f680ad3..0000000 --- a/a_test_case.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python -# coding=utf-8 - -""" -@author: chen wei -@Software: PyCharm -@contact: t6i888@163.com -@file: a_test_case.py -@date: 2024 2024/9/15 19:15 -@desc: -""" -from requests import Session -import requests - -session = Session() - - -def test_1(): - base_url = "https://jsonplaceholder.typicode.com" - session.params = { - 'Content-Type': 'application/json;charset=utf-8' - } - - url = f"{base_url}/users" - - payload = {} - - # response = requests.request("POST", url, headers=headers, data=payload) - response = session.get(url, json=payload) - print(response.json()[0]["username"]) - assert response.status_code == 200 - - -def test_2(): - base_url = r'https://api.kuleu.com/api/action' - params = {"text": "爱情"} - header = { - "user-agent": 'Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(KHTML, like Gecko) ' - 'Chrome / 128.0.0.0Safari / 537.36' - } - response = requests.get(base_url, headers=header, params=params) - # print(response.text) - print(response.json()) - print(response.request.url) - assert response.status_code == 200 diff --git a/api.py b/api.py deleted file mode 100644 index fc09b3f..0000000 --- a/api.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env python -# coding=utf-8 - -""" -@author: chen wei -@Software: PyCharm -@contact: t6i888@163.com -@file: api.py -@date: 2024 2024/9/12 22:52 -@desc: -""" -from commons.session import Session - -# session = requests.session() -session = Session("https://jsonplaceholder.typicode.com") -session.params = { - 'Content-Type': 'application/json;charset=utf-8' -} - -url = "/users" - -payload = {} - -# response = requests.request("POST", url, headers=headers, data=payload) -response = session.get(url, json=payload) -# print(response.text) -# print(response.url) -# print(response) diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 0000000..94b455a --- /dev/null +++ b/api/__init__.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: chen wei +@Software: PyCharm +@contact: t6i888@163.com +@file: __init__.py +@date: 2024 2024/9/15 21:13 +@desc: +""" diff --git a/api/user_api.py b/api/user_api.py new file mode 100644 index 0000000..1cdfe64 --- /dev/null +++ b/api/user_api.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +# coding=utf-8 + +from core.base_api import BaseApi + + +class UserApi(BaseApi): + """用户中心业务接口""" + + def login(self, username, password): + """登录接口示例""" + self._log_action("login", user=username) + + payload = { + "username": username, + "password": password + } + # 直接调用继承自 session 的请求方法 + return self.session.request( + method="POST", + url="/api/v1/login", + json=payload + ) + + def get_info(self, user_id: int): + """获取用户信息示例""" + self._log_action("get_info", uid=user_id) + + return self.session.request( + method="GET", + url=f"/api/v1/user/{user_id}" + ) diff --git a/commons/cases.py b/commons/cases.py index c02f299..f3be1f1 100644 --- a/commons/cases.py +++ b/commons/cases.py @@ -9,23 +9,18 @@ @date: 2024 2024/9/16 9:57 @desc: 动态生成用例 """ -from dataclasses import asdict from pathlib import Path import logging -from typing import Union, Generator, Type -from unittest import TestCase import allure import pytest -from commons import settings -from commons.file_processors.processor_factory import get_processor_class -# from commons.models import CaseInfo -from commons.session import Session -from commons.exchange import Exchange -from commons.templates import Template -from commons.case_handler import TestCaseHandle -from utils import case_validator +from core import settings +from commons.file_processors.yaml_processor import YamlProcessor as FileHandle +from commons.models import CaseInfo +from core.session import Session +from core.exchange import Exchange +from utils import data_driver, case_validator logger = logging.getLogger(__name__) @@ -37,195 +32,90 @@ exchanger = Exchange(settings.exchanger) class TestAPI: + @classmethod - def run(cls, testcase_dir: Union[Path, str] = cases_dir): - for fp in CaseFinder(testcase_dir).find_testcases(): - print(fp.name) - case = CaseGenerator(fp).generate_testcases() - print(f"{case=}") - for i in case: - print(f"{i=}") - CaseRegister(cls).register_test_func(i) - # @classmethod - # def find_test_cases(cls, case_dir: Path = cases_dir): - # """ - # 搜索和加载yaml文件 - # :return: - # """ - # case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件 - # for case_path in case_path_list: - # logger.info(f"加载文件:{case_path}") - # - # file = FileHandle(case_path) # 自动读取yaml文件 - # try: - # CaseInfo(**file) # 校验用例格式 - # logger.info(f"case_info:{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志 - # case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式 - # # print(case_path.stem) - # setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中 - # except Exception as e: - # logger.error(e) - # - # @classmethod - # def new_case(cls, file_name, case_info: dict): - # test_case = data_driver.DataDriver().generate_cases(file_name, case_info) - # - # keys_list = list(test_case.keys()) - # logger.info(f"keys_list:{keys_list}") - # - # values_list = list(test_case.values()) - # logger.info(f"测试用例列表:{values_list}") - # - # driver_title = [i.get("title") for i in values_list] - # logger.info(f"driver_title={driver_title}") - # - # epic = case_info["epic"] if case_info["epic"] else settings.allure_epic - # logger.info(f"epic:{epic}") - # - # feature = case_info["feature"] if case_info["feature"] else settings.allure_feature - # logger.info(f"feature:{feature}") - # - # story = case_info["story"] if case_info["story"] else settings.allure_story - # logger.info(f"story:{story}") - # - # @allure.epic(epic) - # @allure.feature(feature) - # @allure.story(story) - # @pytest.mark.parametrize("case_key", keys_list, ids=driver_title) - # def test_func(self, case_key): - # logger.info(f"case_key:{case_key}") - # - # test_case_mapping = test_case.get(case_key) - # logger.info(f"测试用例:{test_case_mapping}") - # - # allure.dynamic.title(test_case_mapping.get("title")) - # - # logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "=")) - # - # # 0,变量替换 - # new_case_info = exchanger.replace(test_case_mapping) - # logger.info(f"1,正在注入变量...") - # logger.info(f"new_case_info:{new_case_info}") - # # 1,发送请求 - # logger.info(f"2,正在请求接口...") - # resp = session.request(**new_case_info.get("request")) - # - # logger.info(f"3,正在提取变量...") - # # 2,保存变量(接口关联) - # for var_name, extract_info in new_case_info.get("extract").items(): - # logger.info(f"保存变量:{var_name}{extract_info}") - # exchanger.extract(resp, var_name, *extract_info) - # # 3,断言 - # logger.info(f"4,正在断言...") - # assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量 - # logger.info(f"替换变量后:{assert_case_info}") - # # assert_case_info.assert_all() # 执行断言 - # _validator = case_validator.CaseValidator() - # _validator.assert_all(assert_case_info.get("validate")) - # - # logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "=")) - # - # return test_func + def find_test_cases(cls, case_dir: Path = cases_dir): + """ + 搜索和加载yaml文件 + :return: + """ + case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件 + for case_path in case_path_list: + logger.info(f"加载文件:{case_path}") + file = FileHandle(case_path) # 自动读取yaml文件 + try: + CaseInfo(**file) # 校验用例格式 + logger.info(f"case_info:{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志 + case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式 + # print(case_path.stem) + setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中 + except Exception as e: + logger.error(e) -class CaseFinder: - find_suffix: str = settings.test_suffix + @classmethod + def new_case(cls, file_name, case_info: dict): + test_case = data_driver.DataDriver().generate_cases(file_name, case_info) - def __init__(self, testcase_dir: Union[str, Path]): - if Path(testcase_dir).is_dir(): - self.testcase_dir: Path = Path(testcase_dir) - else: - raise FileNotFoundError("不是有效的目录") + keys_list = list(test_case.keys()) + logger.info(f"keys_list:{keys_list}") - def find_testcases(self) -> Generator[Path, None, None]: - testcase_files = self.testcase_dir.glob(f"**/test_*.{self.find_suffix}") - for fp in testcase_files: - logger.info(f"加载文件:{fp}") - yield fp + values_list = list(test_case.values()) + logger.info(f"测试用例列表:{values_list}") + driver_title = [i.get("title") for i in values_list] + logger.info(f"driver_title={driver_title}") -class CaseGenerator: + epic = case_info["epic"] if case_info["epic"] else settings.allure_epic + logger.info(f"epic:{epic}") - def __init__(self, fp: Union[str, Path]): - self.fp: Path = Path(fp) + feature = case_info["feature"] if case_info["feature"] else settings.allure_feature + logger.info(f"feature:{feature}") - def generate_testcases(self) -> Generator[dict, None, None]: - file_name = self.fp.stem + story = case_info["story"] if case_info["story"] else settings.allure_story + logger.info(f"story:{story}") - case_info_ = get_processor_class(self.fp).load() # 自动读取yaml文件 - case_info = TestCaseHandle.new(case_info_) + @allure.epic(epic) + @allure.feature(feature) + @allure.story(story) + @pytest.mark.parametrize("case_key", keys_list, ids=driver_title) + def test_func(self, case_key): + logger.info(f"case_key:{case_key}") - if not case_info.parametrize: - yield {file_name + "__": asdict(case_info)} - else: - cases = {} - args_names = case_info.parametrize[0] - for i, args_values in enumerate(case_info.parametrize[1:]): - # print(args_values) - context = dict(zip(args_names, args_values)) - print(context) - # rendered = Template(FileHandle.to_string(case_info)).render(context) - rendered = Template(case_info.to_string()).render(context) - # cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)}) - cases.update({file_name + "_" + str(i): case_info.to_dict(rendered)}) + test_case_mapping = test_case.get(case_key) + logger.info(f"测试用例:{test_case_mapping}") - yield cases + allure.dynamic.title(test_case_mapping.get("title")) + logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "=")) -class CaseRegister: - def __init__(self, register: Type["TestAPI"]): - self.register: Type["TestAPI"] = register + # 0,变量替换 + new_case_info = exchanger.replace(test_case_mapping) + logger.info(f"1,正在注入变量...") + logger.info(f"new_case_info:{new_case_info}") + # 1,发送请求 + logger.info(f"2,正在请求接口...") + resp = session.request(**new_case_info.get("request")) - def register_test_func(self, case: dict): - for test_filed_name, case_info in case.items(): - epic = case_info["epic"] if case_info["epic"] else settings.allure_epic - logger.info(f"epic:{epic}") + logger.info(f"3,正在提取变量...") + # 2,保存变量(接口关联) + for var_name, extract_info in new_case_info.get("extract").items(): + logger.info(f"保存变量:{var_name}{extract_info}") + exchanger.extract(resp, var_name, *extract_info) + # 3,断言 + logger.info(f"4,正在断言...") + assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量 + logger.info(f"替换变量后:{assert_case_info}") + # assert_case_info.assert_all() # 执行断言 + _validator = case_validator.CaseValidator() + _validator.assert_all(assert_case_info.get("validate")) - feature = case_info["feature"] if case_info["feature"] else settings.allure_feature - logger.info(f"feature:{feature}") + logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "=")) - story = case_info["story"] if case_info["story"] else settings.allure_story - logger.info(f"story:{story}") - - @allure.epic(epic) - @allure.feature(feature) - @allure.story(story) - def register_func(instance, testcase=case_info): - # allure.dynamic.epic(epic) - # allure.dynamic.feature(feature) - # allure.dynamic.story(story) - allure.dynamic.title(testcase.get("title")) - logger.info(f"用例开始执行:{testcase.get('title')}".center(80, "=")) - - # 0,变量替换 - new_case_info = exchanger.replace(testcase) - logger.info(f"1,正在注入变量...") - logger.info(f"new_case_info:{new_case_info}") - # 1,发送请求 - logger.info(f"2,正在请求接口...") - resp = session.request(**new_case_info.get("request")) - - logger.info(f"3,正在提取变量...") - # 2,保存变量(接口关联) - for var_name, extract_info in new_case_info.get("extract").items(): - logger.info(f"保存变量:{var_name}{extract_info}") - exchanger.extract(resp, var_name, *extract_info) - # 3,断言 - logger.info(f"4,正在断言...") - assert_case_info = exchanger.replace(testcase) # 为断言加载变量 - logger.info(f"替换变量后:{assert_case_info}") - # assert_case_info.assert_all() # 执行断言 - _validator = case_validator.CaseValidator() - _validator.assert_all(assert_case_info.get("validate")) - - logger.info(f"用例执行结束:{testcase.get('title')}".center(80, "=")) - - # return test_func - - setattr(self.register, test_filed_name, register_func) # 把pytest格式添加到类中 + return test_func # TestAPI.find_yaml_case() if __name__ == '__main__': - TestAPI.run(cases_dir) - print(TestAPI.__dict__) + TestAPI.find_test_cases() + # print(TestAPI.__dict__) diff --git a/commons/databases.py b/commons/databases.py index dc3474f..ba52d16 100644 --- a/commons/databases.py +++ b/commons/databases.py @@ -10,9 +10,10 @@ @desc: """ import logging +import os + import pymysql as MySQLdb -from commons import settings logger = logging.getLogger(__name__) @@ -32,11 +33,11 @@ class DBServer: db = DBServer( - host=settings.db_host, # ip - port=settings.db_port, # 端口 - user=settings.db_user, # 用户名 - password=settings.db_password, # 密码 - database=settings.db_database # 库名 + host=os.getenv("DB_HOST"), # ip + port=os.getenv("DB_PORT"), # 端口 + user=os.getenv("DB_USER"), # 用户名 + password=os.getenv("DB_PASSWORD"), # 密码 + database=os.getenv("DB_DATABASE") # 库名 ) if __name__ == '__main__': diff --git a/commons/exchange.py b/commons/exchange.py deleted file mode 100644 index 62dc88b..0000000 --- a/commons/exchange.py +++ /dev/null @@ -1,113 +0,0 @@ -#!/usr/bin/env python -# coding=utf-8 - -""" -@author: chen wei -@Software: PyCharm -@contact: t6i888@163.com -@file: exchange.py -@date: 2024 2024/9/18 21:58 -@desc: -""" -import copy -import json -import logging -import re -import jsonpath - -import allure - -from commons.templates import Template -from commons.file_processors.processor_factory import get_processor_class -from tests.b import TestCaseHandle - -logger = logging.getLogger(__name__) - - -class Exchange: - def __init__(self, path): - self.file = get_processor_class(path) - - @allure.step("提取变量") - def extract(self, resp, var_name, attr, expr: str, index) -> None: - - resp = copy.deepcopy(resp) - - try: - # resp中json是方法不是属性,需要手动更改为属性 - resp.json = resp.json() - except json.decoder.JSONDecodeError: - resp.json = {"msg": "is not json data"} - - data = getattr(resp, attr) - if expr.startswith("/"): # xpath - res = None - elif expr.startswith("$"): # jsonpath - data = dict(data) - res = jsonpath.jsonpath(data, expr) - else: # 正则 - res = re.findall(expr, str(data)) - # print(res) - if res: # 如果有数据 - value = res[index] - else: # 如果没有数据 - value = "not data" - - logger.debug(f"{var_name} = {value}") # 记录变量名和变量值 - data = self.file.load() - data[var_name] = value # 保存变量 - self.file.save(data) # 持久化存储到文件 - - @allure.step("替换变量") - def replace(self, case_info: dict) -> dict: - logger.info(f"变量替换:{case_info}") - # 1,将case_info转换为字符串 - data = TestCaseHandle(**case_info) - case_info_str = data.to_string() - print(f"{case_info_str=}") - # 2,替换字符串 - case_info_str = Template(case_info_str).render(self.file.load()) - print(f"{case_info_str=}") - # 3,将字符串转换成case_info - new_case_info = data.to_dict(case_info_str) - return new_case_info - - -if __name__ == '__main__': - class MockResponse: - text = '{"name":"张三","age":"18","data":[3,4,5],"aaa":null}' - - def json(self): - return json.loads(self.text) - - - mock_resp = MockResponse() - - # print(mock_resp.text) - # print(mock_resp.json()) - exchanger = Exchange(r"E:\PyP\InterfaceAutoTest\extract.yaml") - exchanger.extract(mock_resp, "name", "json", '$.name', 0) - exchanger.extract(mock_resp, "age", "json", '$.age', 0) - exchanger.extract(mock_resp, "data", "json", '$.data', 0) - exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0) - # mock_case_info = CaseInfo( - # title="单元测试", - # request={ - # "data": - # {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"} - # }, - # extract={}, - # validate={} - # ) - mock_case_info = { - "title": "单元测试", - "request": { - "data": - {"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"} - }, - "extract": {}, - "validate": {} - } - - new_mock_case_info = exchanger.replace(mock_case_info) - print(new_mock_case_info) diff --git a/commons/file_processors/yaml_processor.py b/commons/file_processors/yaml_processor.py index 7b64bdc..48df461 100644 --- a/commons/file_processors/yaml_processor.py +++ b/commons/file_processors/yaml_processor.py @@ -10,14 +10,21 @@ @desc: """ import logging -from typing import Union +from typing import Union, Any from pathlib import Path import yaml from commons.file_processors.base_processor import BaseFileProcessor + + logger = logging.getLogger(__name__) +class YamlLoadError(Exception): + """自定义 YAML 加载异常:当 YAML 语法错误或不符合业务结构时抛出""" + pass + + class YamlProcessor(BaseFileProcessor): """ 用于处理 YAML 文件的类,继承自 dict。 @@ -25,52 +32,101 @@ class YamlProcessor(BaseFileProcessor): 并可以直接像字典一样访问 YAML 数据。 """ - def __init__(self, filepath: Union[str, Path], **kwargs): + def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None): """ 初始化 YamlFile 对象。 - Args: filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象). + Args: + filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象). data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。 如果不提供,则尝试从 filepath 加载数据。 """ - super().__init__(filepath, **kwargs) - # self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 - def load(self) -> dict: + super().__init__(filepath=filepath) + + self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 + + def load(self) -> dict[str, Any]: """ - 从 YAML 文件加载数据 - :return: + 加载 YAML 文件并返回字典。 + + Returns: + Dict: 加载后的数据字典。 + + Raises: + YamlLoadError: 文件读取或解析过程中出现异常。 """ if not self.filepath.exists(): - logger.warning(f"文件 {self.filepath} 不存在.") - raise FileNotFoundError(f"文件 {self.filepath} 不存在.") - + logger.error(f"❌ 文件未找到: {self.filepath}") + return {} try: with open(self.filepath, "r", encoding="utf-8") as f: - loaded_data = yaml.safe_load(f) - if not isinstance(loaded_data, dict): # 确保加载的是字典 - logger.error(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.") - raise ValueError(f"YAML文件 {self.filepath} 的根节点不是一个字典/映射.") - return loaded_data + content = yaml.safe_load(f) + # 情况1:文件内容为空 + if content is None: + return {} + # 情况2:YAML 语法正确但不是字典(如单纯的字符串或列表) + if not isinstance(content, dict): + raise YamlLoadError(f"YAML 顶层格式错误:期望 dict,实际为 {type(content).__name__}") + return content except yaml.YAMLError as e: - logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}") - raise e + msg = f"❌ YAML 语法错误 [{self.filepath.name}]: {e}" + logger.error(msg) + raise YamlLoadError(msg) from e + except Exception as e: + logger.error(f"📂 读取文件系统异常: {e}") + raise - def save(self, data: dict, new_filepath: Union[str, Path, None] = None) -> None: + @staticmethod + def to_string(data: dict[str, Any]) -> str: """ - 将字典数据保存到 YAML 文件。 - :param data: - :param new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。 + 将字典 (自身) 转换为 YAML 格式的字符串。 + Returns: + YAML 格式的字符串。 """ - filepath = Path(new_filepath) if new_filepath else self.filepath - - # 确保目标目录存在 - filepath.parent.mkdir(parents=True, exist_ok=True) - try: - with open(filepath, "w", encoding="utf-8") as f: + return yaml.safe_dump( + data, + allow_unicode=True, + sort_keys=False, + default_flow_style=False + ) + except TypeError as e: + logger.error(f"将数据转换为 YAML 字符串时出错: {e}") + return "" + except Exception as e: + logger.error(f"序列化 YAML 失败: {e}") + return "" + + @staticmethod + def from_string(yaml_str: str) -> Union[None, dict]: + """ + 将 YAML 格式的字符串转换为字典,并更新当前字典的内容. + + Args: + yaml_str: YAML 格式的字符串。 + """ + try: + data = yaml.safe_load(yaml_str) + return data if isinstance(data, dict) else {} + except yaml.YAMLError as e: + logger.error(f"YAML 字符串解析失败: {e}") + return {} + + def save(self, data: dict[str, Any], new_filepath: Union[str, Path, None] = None): + """ + 将字典数据保存为 YAML 文件。 + + Args: + data: 要保存的字典数据。 + new_filepath: 可选,保存到新路径。 + """ + target_path = Path(new_filepath) if new_filepath else self.filepath + try: + target_path.parent.mkdir(parents=True, exist_ok=True) + with open(target_path, "w", encoding="utf-8") as f: yaml.safe_dump( data, stream=f, @@ -78,20 +134,22 @@ class YamlProcessor(BaseFileProcessor): sort_keys=False, default_flow_style=False ) - logger.info(f"数据已成功保存到 {filepath}") - except (TypeError, OSError, yaml.YAMLError) as e: - logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}") - raise e - - - + logger.debug(f"💾 数据已成功保存至: {target_path}") + except Exception as e: + logger.error(f"🚫 保存 YAML 失败: {e}") + raise + except (TypeError, OSError) as e: + logger.error(f"保存 YAML 文件 {self.filepath} 时出错: {e}") +# todo 需要将异常的情况返回给上层而不是默认处理为{} if __name__ == '__main__': + from core.settings import TEST_CASE_DIR # 示例用法 - yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径 + yaml_path = TEST_CASE_DIR / r'answer/test_1_status.yaml' # 你的 YAML 文件路径 yaml_file = YamlProcessor(yaml_path) print(yaml_file.load()) + print(yaml_file.to_string(yaml_file.load())) print(type(yaml_file)) # # 直接像字典一样访问数据 @@ -134,4 +192,3 @@ if __name__ == '__main__': # print("\n加载不存在的文件:", non_existent_file) # 应该打印空字典 {} # non_existent_file['a'] = 1 # 可以直接添加 # print("\n加载不存在的文件:", non_existent_file) - diff --git a/commons/files.py b/commons/files.py index ef7fafc..31c6cfa 100644 --- a/commons/files.py +++ b/commons/files.py @@ -10,7 +10,6 @@ @desc: 读取和保存yaml文件 """ import logging -from dataclasses import dataclass, asdict, field from pathlib import Path import yaml @@ -18,20 +17,31 @@ logger = logging.getLogger(__name__) class YamlFile(dict): - def __init__(self, path): - super().__init__() # 初始化父类 dict - self.path = Path(path) - self.load() # 链式初始化加载 + def __init__(self, path=None, data=None): + super().__init__() + self.path = Path(path) if path else None + + if data: + self.update(data) + elif self.path: + if self.path.is_dir(): + raise IsADirectoryError(f"The path {self.path} is a directory, not a file.") + self.load() def load(self): - if self.path.exists(): + if not self.path: + logger.warning("No path specified for YamlFile, cannot load.") + return self + + if self.path.exists() and self.path.is_file(): with open(self.path, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) or {} # 加载数据,空文件返回空字典 - self.clear() # 清空当前实例 - self.update(data) # 更新字典内容 + loaded_data = yaml.safe_load(f) or {} + self.clear() + self.update(loaded_data) else: - logger.warning(f"File {self.path} not found, initialized empty.") - return self # 链式调用 + logger.warning(f"File not found at {self.path}, YamlFile initialized as empty.") + self.clear() + return self def to_yaml(self) -> str: return yaml.safe_dump( @@ -40,29 +50,49 @@ class YamlFile(dict): sort_keys=False ) - @classmethod def by_yaml(cls, yaml_str): data = yaml.safe_load(yaml_str) or {} - return cls({**data}) # 通过类方法创建实例 + return cls(data=data) def save(self): + if not self.path: + raise ValueError("Cannot save YamlFile instance without a specified path.") + + # 确保父目录存在 + self.path.parent.mkdir(parents=True, exist_ok=True) + with open(self.path, "w", encoding="utf-8") as f: yaml.safe_dump( - dict(self), # 直接 dump 实例本身(已继承 dict) + dict(self), stream=f, allow_unicode=True, sort_keys=False ) - return self # 链式调用 + return self if __name__ == '__main__': - from commons.models import CaseInfo + from core.models import CaseInfo + from core.settings import TEST_CASE_DIR - yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml' - yaml_file = YamlFile(yaml_path) - # yaml_file.load() - case_info = CaseInfo(**yaml_file) - yaml_file["title"] = "查询用户信息" - yaml_file.save() + # 1. 创建一个用于测试的临时yaml文件 + dummy_path = TEST_CASE_DIR / "test_model_demo.yaml" + dummy_data = { + "title": "Get user info", + "request": {"method": "GET", "url": "/users/1"}, + "validate": [{"equals": ["status_code", 200]}] + } + YamlFile(path=dummy_path, data=dummy_data).save() + print(f"--- 已创建临时测试文件: {dummy_path}") + + # 2. 加载文件并使用Pydantic模型进行校验 + yaml_case = YamlFile(dummy_path) + print("\n--- 已加载YAML内容 ---\n", yaml_case.to_yaml()) + case_model = CaseInfo(**yaml_case) + print("\n--- Pydantic模型校验成功 ---") + print(case_model.model_dump_json(indent=2, by_alias=True)) + + # 3. 清理临时文件 + dummy_path.unlink() + print(f"\n--- 已清理临时文件: {dummy_path}") diff --git a/commons/funcs.py b/commons/funcs.py index 36827bd..9ef0212 100644 --- a/commons/funcs.py +++ b/commons/funcs.py @@ -15,10 +15,10 @@ import time import urllib.parse import hashlib -from commons.databases import db +# from commons.databases import db -from commons.file_processors.processor_factory import get_processor_class -from commons import settings +# from commons.file_processors.yaml_processor import YamlProcessor as get_processor_class +from core import settings logger = logging.getLogger(__name__) @@ -62,31 +62,31 @@ def add(a, b): return str(int(a) + int(b)) -@Funcs.register("sql") -def sql(s: str) -> str: - res = db.execute_sql(s) - - return res[0][0] +# @Funcs.register("sql") +# def sql(s: str) -> str: +# res = db.execute_sql(s) +# +# return res[0][0] -@Funcs.register("new_id") -def new_id(): - # 自增,永不重复 - id_file = get_processor_class(settings.id_path) - data = id_file.load() - data["id"] += 1 - id_file.save(data) - - return data["id"] +# @Funcs.register("new_id") +# def new_id(): +# # 自增,永不重复 +# id_file = get_processor_class(settings.id_path) +# data = id_file.load() +# data["id"] += 1 +# id_file.save(data) +# +# return data["id"] -@Funcs.register("last_id") -def last_id() -> str: - # 不自增,只返回结果 - - id_file = get_processor_class(settings.id_path) - data = id_file.load() - return data["id"] +# @Funcs.register("last_id") +# def last_id() -> str: +# # 不自增,只返回结果 +# +# id_file = get_processor_class(settings.id_path) +# data = id_file.load() +# return data["id"] @Funcs.register("md5") @@ -131,9 +131,9 @@ def rsa_decode(content: str) -> str: ... -@Funcs.register() +@Funcs.register("gen_phone") def func_name_test(): - ... + return "我被替换了!!!" if __name__ == '__main__': diff --git a/commons/models.py b/commons/models.py deleted file mode 100644 index f31dba0..0000000 --- a/commons/models.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python -# coding=utf-8 - -""" -@author: chen wei -@Software: PyCharm -@contact: t6i888@163.com -@file: models.py -@date: 2024 2024/9/15 21:14 -@desc: 声明yaml用例格式 -""" -import logging -from typing import Union, Optional -from dataclasses import dataclass, field - -import yaml - -from commons import settings - -logger = logging.getLogger(__name__) - - -@dataclass -class RequestModel: - method: str - url: str - headers: Optional[dict] = None - # body: Optional[Union[dict, str]] = None - params: Optional[Union[dict, str]] = None - - -@dataclass -class TestCaseModel: - title: str - request: RequestModel - extract: dict - validate: dict - parametrize: list = field(default_factory=list) - epic: str = field(default_factory=lambda: settings.allure_epic) - feature: str = field(default_factory=lambda: settings.allure_feature) - story: str = field(default_factory=lambda: settings.allure_story) - - def __post_init__(self): - # 必填字段非空校验 - if self.title is None: - raise ValueError("Title cannot be empty") - - # 校验RequestModel - if isinstance(self.request, dict): - try: - self.request = RequestModel(**self.request) # RequestModel 的 __post_init__ 会被调用 - except (TypeError, ValueError) as e: - raise ValueError(f"解析 'request' 字段失败: {e} (数据: {self.request})") from e - elif not isinstance(self.request, RequestModel): # 如果不是 dict 也不是 RequestModel - raise TypeError( - f"字段 'request' 必须是字典 (将在内部转换为 RequestModel) 或 RequestModel 实例, " - f"得到的是 {type(self.request).__name__}" - ) - - -if __name__ == '__main__': - with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f: - data = yaml.safe_load(f) - # print(data) - case_info = TestCaseModel(**data) diff --git a/commons/templates.py b/commons/templates.py deleted file mode 100644 index bf6fa67..0000000 --- a/commons/templates.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python -# coding=utf-8 - -""" -@author: chen wei -@Software: PyCharm -@contact: t6i888@163.com -@file: templates.py -@date: 2024 2024/9/22 22:20 -@desc: -""" -import copy -import logging -import re -import string -from commons.funcs import Funcs - - -logger = logging.getLogger(__name__) - - -class Template(string.Template): - """ - 1,支持函数调用 - 2,参数也可以是变量 - """ - - call_pattern = re.compile(r"\${(?P.*?)\((?P.*?)\)}") - - def render(self, mapping: dict) -> str: - s = self.safe_substitute(mapping) # 原有方法替换变量 - s = self.safe_substitute_funcs(s, mapping) - - return s - - def safe_substitute_funcs(self, template, mapping) -> str: - """ - 解析字符串中的函数名和参数,并将函数调用结果进行替换 - :param template: 字符串 - :param mapping: 上下文,提供要使用的函数和变量 - :return: 替换后的结果 - """ - mapping = copy.deepcopy(mapping) - logger.info(f"mapping更新前: {mapping}") - # mapping.update(self.FUNC_MAPPING) # 合并两个mapping - mapping.update(Funcs.FUNC_MAPPING) # 合并两个mapping - logger.info(f"mapping更新后: {mapping}") - def convert(mo): - func_name = mo.group("func_name") - func_args = mo.group("func_args").split(",") - func = mapping.get(func_name) # 读取指定函数 - func_args_value = [mapping.get(arg, arg) for arg in func_args] - - if func_args_value == [""]: # 处理没有参数的func - func_args_value = [] - - if not callable(func): - return mo.group() # 如果是不可调用的假函数,不进行替换 - else: - return str(func(*func_args_value)) # 否则用函数结果进行替换 - - return self.call_pattern.sub(convert, template) - diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..3a3c4bd --- /dev/null +++ b/conftest.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@desc: Pytest 配置文件,用于设置全局 Fixture 和钩子函数 +""" + +import pytest +from pathlib import Path +import logging + +from core import settings +from commons.files import YamlFile +from core.models import CaseInfo +from core.session import Session +from core.exchange import Exchange +from core.settings import EXTRACT_CACHE +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope="session") +def session(): + """全局共享的 Session Fixture""" + return Session(settings.base_url) + + +@pytest.fixture(scope="session") +def exchanger(): + """全局共享的 Exchange Fixture""" + return Exchange(EXTRACT_CACHE) + + +# @pytest.fixture(scope="session") +# def case_engine(session, exchanger): +# """全局共享的 CaseEngine Fixture""" +# return CaseEngine(session, exchanger) + + diff --git a/core/base_api.py b/core/base_api.py new file mode 100644 index 0000000..31b7895 --- /dev/null +++ b/core/base_api.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +# coding=utf-8 +import logging + +from core.session import Session +from core import settings + +class BaseApi: + def __init__(self, session: Session = None): + self.session = session or Session(base_url=settings.base_url) + self.logger = logging.getLogger(self.__class__.__name__) + + def _log_action(self, method_name: str, **kwargs): + """统一的动作日志记录""" + self.logger.info(f"执行动作: {method_name} | 参数: {kwargs}") \ No newline at end of file diff --git a/core/creator.py b/core/creator.py index a38fb80..c30cc2a 100644 --- a/core/creator.py +++ b/core/creator.py @@ -12,14 +12,16 @@ import logging import allure -import pytest from pathlib import Path from core import settings from core.executor import WorkflowExecutor from core.session import Session -from commons.exchange import Exchange -from commons.file_processors.yaml_processor import YamlProcessor as FileHandle -from typing import Any, Dict, List, Type, Generator, Tuple +from core.exchange import Exchange + +from pydantic import ValidationError +from commons.file_processors.yaml_processor import YamlProcessor as FileHandle,YamlLoadError +from core.models import CaseInfo # 导入之前定义的 Pydantic 模型 +from typing import Any, List, Type, Generator, Union logger = logging.getLogger(__name__) @@ -39,29 +41,107 @@ class TestTemplateBase: class CaseDataLoader: """ - 职责 1: 数据加载器 - 负责与底层存储(YAML文件)打交道,输出标准化的原始数据对象 + 测试用例加载器 + 职责:扫描文件系统 -> 载入 YAML -> 拆解参数化 -> 封装为 CaseInfo 模型 """ @staticmethod - def fetch_yaml_cases(cases_dir: str) -> Generator[Tuple[Path, Dict], None, None]: + def fetch_yaml_files(cases_dir: str) -> Generator[Path, None, None]: """扫描目录并迭代返回 (文件路径, 原始内容)""" - yaml_files = Path(cases_dir).glob("**/test_*.yaml") - for file_path in yaml_files: - yield file_path, FileHandle(file_path) + base_path = Path(cases_dir) + if not base_path.exists(): + logger.error(f"📂 测试目录不存在: {base_path}") + return + # 匹配所有以 test_ 开头的 yaml 文件 + yield from base_path.rglob("test_*.yaml") + + @classmethod + def load_cases(cls, file_path: Path) -> List[CaseInfo]: + """ + 加载单个 YAML 文件并转化为 CaseInfo 列表 + 包含参数化数据的自动拆解逻辑 + """ + cases = [] + try: + # 1. 使用重构后的 YamlProcessor 加载原始字典 + processor = FileHandle(file_path) + raw_data = processor.load() + + if not raw_data: + return [] + + # 2. 检查是否存在参数化字段 + if "parametrize" in raw_data and isinstance(raw_data["parametrize"], list): + cases.extend(cls._parse_parametrize(raw_data)) + else: + # 3. 普通单条用例封装 + cases.append(CaseInfo(**raw_data)) + + except YamlLoadError: + # YamlProcessor 已经记录了 error 日志,这里直接跳过 + pass + except ValidationError as e: + logger.error(f"用例格式校验失败 [{file_path.name}]:\n{e.json()}") + except Exception as e: + logger.error(f"加载用例发生未知异常 [{file_path.name}]: {e}") + + return cases @staticmethod - def parse_parametrize(raw_data: Dict, default_name: str) -> Tuple[List[str], List[List[Any]], List[str]]: - """解析参数化结构,返回 (字段名列表, 数据值列表, ID列表)""" - if "parametrize" in raw_data: - fields = raw_data["parametrize"][0] - values = raw_data["parametrize"][1:] - ids = [f"{v[0]}" for v in values] - else: - fields = ["case_data"] - values = [[raw_data]] - ids = [raw_data.get("title", default_name)] - return fields, values, ids + def _parse_parametrize(raw_data: dict[str, Any]) -> List[CaseInfo]: + """ + 解析参数化逻辑:将 raw_data 中的 parametrize 展开为多个 CaseInfo 实例 + """ + param_content = raw_data.pop("parametrize") + if len(param_content) < 2: + logger.warning(f"参数化数据不足(需包含 Header 和至少一行 Data): {raw_data.get('title')}") + return [CaseInfo(**raw_data)] + + # 第一行作为变量名 (Headers),后续作为数据行 + headers = param_content[0] + data_rows = param_content[1:] + + case_list = [] + for row in data_rows: + # 将变量名和对应行数据打包成字典,例如 {"username": "user1", "title": "测试1"} + row_map = dict(zip(headers, row)) + + # 深拷贝原始模板,避免多行数据互相干扰 + case_tmp = raw_data.copy() + + # 关键优化:如果参数化里包含 'title',自动更新顶层的 title 字段 + # 注意:此处仅做初步合并,更复杂的 ${var} 替换由 WorkflowExecutor 的 Exchanger 完成 + if "title" in row_map: + case_tmp["title"] = row_map["title"] + + # 将当前行的数据注入到 CaseInfo 中(此处可以暂存在字段中,或由执行器处理) + # 为了保持模型兼容,我们把 row_map 的信息合入 case_tmp + case_list.append(CaseInfo(**case_tmp)) + + return case_list + + @classmethod + def get_all_cases(cls, cases_dir: Union[str, Path]) -> List[CaseInfo]: + """ + 全量获取接口:供 CaseGenerator 调用 + """ + all_cases = [] + for file in cls.fetch_yaml_files(cases_dir): + all_cases.extend(cls.load_cases(file)) + return all_cases + + # @staticmethod + # def parse_parametrize(raw_data: Dict, default_name: str) -> Tuple[List[str], List[List[Any]], List[str]]: + # """解析参数化结构,返回 (字段名列表, 数据值列表, ID列表)""" + # if "parametrize" in raw_data: + # fields = raw_data["parametrize"][0] + # values = raw_data["parametrize"][1:] + # ids = [f"{v[0]}" for v in values] + # else: + # fields = ["case_data"] + # values = [[raw_data]] + # ids = [raw_data.get("title", default_name)] + # return fields, values, ids class CaseGenerator: @@ -73,43 +153,56 @@ class CaseGenerator: @classmethod def build_and_register(cls, target_cls: Type[TestTemplateBase], cases_dir: str): # 1. 通过 Loader 获取数据 - for file_path, raw_data in CaseDataLoader.fetch_yaml_cases(cases_dir): - # 2. 解析参数化信息 - fields, values, ids = CaseDataLoader.parse_parametrize(raw_data, file_path.stem) - - # 3. 生成执行函数 (闭包) - dynamic_test_method = cls._create_case_method(raw_data, fields, values, ids) - - # 4. 挂载 - method_name = f"test_{file_path.stem}" - setattr(target_cls, method_name, dynamic_test_method) + all_cases=CaseDataLoader.get_all_cases(cases_dir) + for index, case_info in enumerate(all_cases): + dynamic_test_method=cls._create_case_method(case_info) + # for file_path, raw_data in CaseDataLoader.get_all_cases(cases_dir): + # # 2. 解析参数化信息 + # fields, values, ids = CaseDataLoader.parse_parametrize(raw_data, file_path.stem) + # + # # 3. 生成执行函数 (闭包) + # dynamic_test_method = cls._create_case_method(raw_data, fields, values, ids) + # + # # 4. 挂载 + # method_name = f"test_{file_path.stem}" + method_name = f"test_case_{index}_{case_info.title[:20]}" + safe_name = "".join([c if c.isalnum() else "_" for c in method_name]) + # setattr(target_cls, method_name, dynamic_test_method) + setattr(target_cls, safe_name, dynamic_test_method) logger.debug(f"Successfully registered: {method_name}") @staticmethod - def _create_case_method(case_template: Dict, fields: List[str], values: List[Any], ids: List[str]): + # def _create_case_method(case_template: Dict, fields: List[str], values: List[Any], ids: List[str]): + def _create_case_method(case_template: CaseInfo): """封装具体的 pytest 执行节点""" # 预取 Allure 层级信息 - epic = case_template.get("epic", settings.allure_epic) - feature = case_template.get("feature", settings.allure_feature) - story = case_template.get("story", settings.allure_story) + # epic = case_template.get("epic", settings.allure_epic) + # feature = case_template.get("feature", settings.allure_feature) + # story = case_template.get("story", settings.allure_story) + epic = case_template.epic or settings.allure_epic + feature = case_template.feature or settings.allure_feature + story = case_template.story or settings.allure_story @allure.epic(epic) @allure.feature(feature) @allure.story(story) - @pytest.mark.parametrize("case_args", values, ids=ids) - def build_actual_case(instance: TestTemplateBase, case_args: List[Any]): + # @pytest.mark.parametrize("case_args", values, ids=ids) + # def build_actual_case(instance: TestTemplateBase, case_args: List[Any]): + def build_actual_case(instance: TestTemplateBase): # 数据组装 - current_params = dict(zip(fields, case_args)) - case_exec_data = {**case_template, **current_params} - case_title = current_params.get("title", "未命名用例") + # current_params = dict(zip(fields, case_args)) + # case_exec_data = {**case_template, **current_params} + # case_title = current_params.get("title", "未命名用例") + case_title = case_template.title or "未命名用例" # 日志记录 (利用 instance 标注来源) logger.info(f"🚀 [Runner] Class: {instance.__class__.__name__} | Case: {case_title}") # 执行与断言 allure.dynamic.title(case_title) - executor.perform(case_exec_data) + # executor.perform(case_exec_data) + executor.perform(case_template) # 手动链路装饰 (Allure) # run_actual_case = allure.epic(epic)(run_actual_case) @@ -120,5 +213,8 @@ class CaseGenerator: if __name__ == '__main__': + from settings import TEST_CASE_DIR + + print(CaseDataLoader.get_all_cases(TEST_CASE_DIR)) # --- 引导执行 --- - CaseGenerator.build_and_register(TestTemplateBase, settings.TEST_CASE_DIR) + # CaseGenerator.build_and_register(TestTemplateBase, settings.TEST_CASE_DIR) diff --git a/core/exchange.py b/core/exchange.py new file mode 100644 index 0000000..eb0baf3 --- /dev/null +++ b/core/exchange.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@desc: 变量交换器,用于数据替换和提取 +""" + +import logging +import re +from typing import Any, Union, TypeVar + +import jsonpath +from lxml import etree + + +from core.models import CaseInfo +from core.settings import EXTRACT_CACHE +from core.templates import Template +from commons.file_processors.yaml_processor import YamlProcessor + +logger = logging.getLogger(__name__) + +# 定义泛型,用于保持返回类型一致 +T = TypeVar("T", bound=Union[dict, list, str, Any]) + + +class Exchange: + def __init__(self, cache_path: str): + self.cache_path = cache_path + self.file_handler = YamlProcessor(filepath=self.cache_path) + # 1. 增加内存缓存,避免频繁磁盘 I/O + self._variable_cache = self.file_handler.load() or {} + # 匹配标准变量 ${var},排除函数调用 ${func()} + # self.var_only_pattern = re.compile(r"\$\{([a-zA-Z_]\w*)}") + self.var_only_pattern = re.compile(r"^\$\{([a-zA-Z_]\w*)}$") + + def extract(self, resp, var_name: str, attr: str, expr: str, index: int = 0): + """ + 从响应中提取数据并更新到缓存及文件 + :param resp: Response 对象 + :param var_name: 变量名 + :param attr: 属性名 (json, text, headers 等) + :param expr: 提取表达式 ($.jsonpath, //xpath, regex) + :param index: 索引 + """ + + try: + # 兼容处理 resp.json + target_data = getattr(resp, attr, None) + if attr == "json": + try: + target_data = resp.json() + except Exception: + target_data = {"msg": "not json data"} + + if target_data is None: + logger.warning(f"提取失败: 响应对象中不存在属性 '{attr}'") + return + + value = None + + if expr.startswith("$"): # JSONPath + res = jsonpath.jsonpath(target_data, expr) + if res: value = res[index] + elif expr.startswith("/") or expr.startswith("./"): # XPath 模式 + # 将文本解析为 HTML 树 + html_content = resp.text + tree = etree.HTML(html_content) + res = tree.xpath(expr) + if res: + # 获取节点文本或属性值 + target_node = res[index] + value = target_node.text if hasattr(target_node, 'text') else str(target_node) + else: # 正则 + res = re.findall(expr, str(target_data)) + if res: value = res[index] + + if value is None: + logger.warning(f"变量 [{var_name}] 未通过表达式 [{expr}] 提取到数据") + value = "not data" + + self._variable_cache[var_name] = value + self.file_handler.save(self._variable_cache) + logger.info(f"变量提取成功: {var_name} -> {value} (Type: {type(value).__name__})") + + except Exception as e: + logger.error(f"提取变量 [{var_name}] 过程中发生异常: {e}", exc_info=True) + + def _smart_replace(self, content: Any) -> Any: + """ + 递归替换逻辑: + - 如果是纯变量占位符 ${token},则返回变量在缓存中的原始类型 (int, dict, list 等) + - 如果是混合字符串或函数调用,则调用 Template 渲染为字符串 + """ + if isinstance(content, dict): + return {k: self._smart_replace(v) for k, v in content.items()} + elif isinstance(content, list): + return [self._smart_replace(i) for i in content] + elif isinstance(content, str): + # A. 场景:纯变量(为了保持类型,不走 Template 渲染成字符串) + # 例子:content = "${order_id}",如果 order_id 是 int 123,则返回 123 + full_match = self.var_only_pattern.fullmatch(content) + if full_match: + var_name = full_match.group(1) + + return self._variable_cache.get(var_name, content) + + # B. 场景:混合文本或函数调用 + # 例子:"Bearer ${token}" 或 "${gen_phone()}" + if "${" in content: + # 调用你提供的 Template 类 + return Template(content).render(self._variable_cache) + + return content + + def replace(self, data: T) -> T: + """ + 通用的变量替换入口 + 支持输入 dict, list, str 或 Pydantic Model (需先 dump) + """ + if not data: + return data + + logger.debug(f"开始变量替换,原始数据类型: {type(data).__name__}") + + rendered_data = self._smart_replace(data) + + return rendered_data + + +if __name__ == "__main__": + from core.models import CaseInfo, RequestModel + + # 模拟外部写入一个初始变量 + with open(EXTRACT_CACHE, "w") as f: + f.write("existing_var: '100'\n") + + ex = Exchange(EXTRACT_CACHE) + + + # --- 场景 1: 变量提取验证 --- + class MockResponse: + def __init__(self): + self.json_data = {"data": {"token": "auth_123", "user_id": 888}} + self.text = "
ChenWei
" + + def json(self): return self.json_data + + + mock_resp = MockResponse() + print(">>> 执行提取...") + ex.extract(mock_resp, "token", "json", "$.data.token") + ex.extract(mock_resp, "u_id", "json", "$.data.user_id") + ex.extract(mock_resp, "user_name", "text", "//div[@id='name']") + + # --- 场景 2: 变量替换与类型保持 --- + # 定义一个复杂的 CaseInfo + raw_case = { + "title": "测试用例", + "request": { + "method": "POST", + "url": "http://api.com/${token}", # 混合文本 -> 应转为 str + "json_body": { + "id": "${u_id}", # 纯变量 -> 应保持 int + "name": "${user_name}", # 纯变量 -> str + "config": "${existing_var}" # 初始文件变量 -> int + }, + "timeout": "${existing_var}" # 字符串形式的数字 -> Pydantic 应转回 int + } + } + + print("\n>>> 执行替换...") + new_case = ex.replace(raw_case) + print(new_case) + new_case = CaseInfo(**new_case) + + # --- 校验结果 --- + print("\n--- 验证结果 ---") + print(f"URL (混合文本): {new_case.request.url} | 类型: {type(new_case.request.url)}") + print(f"ID (类型保持): {new_case.request.json_body['id']} | 类型: {type(new_case.request.json_body['id'])}") + print(f"Timeout (自动转换): {new_case.request.timeout} | 类型: {type(new_case.request.timeout)}") + + assert isinstance(new_case.request.json_body['id'], int) + + assert new_case.request.url == "http://api.com/auth_123" + assert new_case.request.timeout == 100 + + # if os.path.exists(cache_path): os.remove(cache_path) + print("\nExchange 场景全部验证通过!") diff --git a/core/executor.py b/core/executor.py index fd20113..b52a77a 100644 --- a/core/executor.py +++ b/core/executor.py @@ -7,59 +7,82 @@ import logging import importlib -from typing import Any -from commons.models.case_model import CaseInfo +from typing import Any, List + +from pydantic import TypeAdapter + +from core import settings +from core.models import CaseInfo, ValidateItem, RequestModel, ApiActionModel from core.session import Session -from commons.exchange import Exchange -from commons.asserts import Asserts +from core.exchange import Exchange +from utils.case_validator import CaseValidator logger = logging.getLogger(__name__) +# 定义一个复用的适配器(减少初始化开销) +VALIDATE_LIST_ADAPTER = TypeAdapter(List[ValidateItem]) + class WorkflowExecutor: def __init__(self, session: Session, exchanger: Exchange): self.session = session self.exchanger = exchanger - def perform(self, case_data: dict) -> Any: + def perform(self, case_info: CaseInfo) -> Any: """执行单个用例:支持直接请求和PO模式调用""" - # 1. 变量替换(将 ${var} 替换为真实值) - rendered_case = self.exchanger.replace(case_data) + try: + # raw_data = case_info.model_dump(by_alias=True, exclude_none=True) + # 1. 变量替换(将 ${var} 替换为真实值) + # rendered_dict = self.exchanger.replace(raw_data) + # rendered_case = CaseInfo.model_validate(rendered_dict) + # --- 2. 决定执行模式 --- + if case_info.is_po_mode(): + # PO 模式:仅渲染 api_action + action_dict = case_info.api_action.model_dump(by_alias=True, exclude_none=True) + rendered_action_dict = self.exchanger.replace(action_dict) + # 重新校验以修复类型(如 params 里的 int) + rendered_action = ApiActionModel.model_validate(rendered_action_dict) + # PO 模式:反射调用 - # 2. 决定执行模式 - if "api_action" in rendered_case: - # --- PO 模式:反射调用业务层 --- - action = rendered_case["api_action"] - resp = self._execute_po_method( - class_name=action["class"], - method_name=action["method"], - params=action.get("params", {}) - ) - else: - # --- 数据驱动模式:直接发送请求 --- - # 使用 Pydantic 校验 request 结构 - case_info = CaseInfo(**rendered_case) - request_info = case_info.request.model_dump(by_alias=True, exclude_none=True) - resp = self.session.request(**request_info) + resp = self._execute_po_method(action=rendered_action) + else: + # 接口模式:直接请求 + # 直接将 RequestModel 转为字典传给 session.request + request_kwargs = case_info.request.model_dump(by_alias=True, exclude_none=True) + rendered_req_dict = self.exchanger.replace(request_kwargs) + rendered_request = RequestModel.model_validate(rendered_req_dict) - # 3. 提取变量 (接口关联) - if rendered_case.get("extract"): - for var_name, extract_info in rendered_case["extract"].items(): - self.exchanger.extract(resp, var_name, *extract_info) + request_kwargs = rendered_request.model_dump(by_alias=True, exclude_none=True) + resp = self.session.request(**request_kwargs) - # 4. 断言校验 - if rendered_case.get("validate"): - Asserts.validate(resp, rendered_case["validate"]) + # --- 3. 后置处理 (提取 & 断言) --- + self._post_process(resp, case_info) - return resp + return resp + except Exception as e: + logger.error(f"用例执行失败: {case_info.title} | 原因: {e}", exc_info=True) + raise - def _execute_po_method(self, class_name: str, method_name: str, params: dict): + def _execute_po_method(self, action: ApiActionModel): """核心反射逻辑:根据字符串动态加载 api/ 目录下的类并执行方法""" + class_name = action.api_class + method_name = action.method + params = action.params or {} + # 1. 确定模块路径:优先级策略 + # 优先级 1: 显式映射 (API_MAP) + module_name = settings.API_MAP.get(class_name) + + # 优先级 2: 规约命名 (UserAPI -> api.user_api) + if not module_name: + base_name = class_name.lower().replace('api', '') + module_name = f"{settings.API_PACKAGE}.{base_name}_api" + try: # 1. 动态导入模块(假设都在 api 目录下) # 例如 class_name 是 UserAPI,则尝试从 api.user 导入 # 这里简单处理,你可以根据你的文件名约定进一步优化逻辑 - module_name = f"api.{class_name.lower().replace('api', '')}" + # module_name = f"api.{class_name.lower().replace('api', '')}" + module = importlib.import_module(module_name) # 2. 获取类并实例化 @@ -68,8 +91,33 @@ class WorkflowExecutor: # 3. 调用方法并返回结果 method = getattr(api_instance, method_name) - logger.info(f"🚀 调用业务层: {class_name}.{method_name} 参数: {params}") + logger.info(f"调用业务层: {class_name}.{method_name} 参数: {params}") return method(**params) + except ImportError as e: + logger.error(f"模块导入失败: 在 '{module_name}' 未找到对应文件。请检查文件名或 settings.API_MAP 配置。") + raise e + except AttributeError as e: + logger.error(f"成员获取失败: 模块 '{module_name}' 中不存在类或方法 '{class_name}.{method_name}'。") + raise e except Exception as e: logger.error(f"反射调用失败: {class_name}.{method_name} -> {e}") - raise \ No newline at end of file + raise + + def _post_process(self, resp: Any, rendered_case: CaseInfo): + # 3. 提取变量 (接口关联) + if rendered_case.extract: + for var_name, extract_info in rendered_case.extract.items(): + self.exchanger.extract(resp, var_name, *extract_info) + + # 4. 断言校验 + if rendered_case.validate_data: + # raw_validate_list = [i.model_dump(by_alias=True) for i in rendered_case.validate_data] + raw_validate_list = [ + item.model_dump(by_alias=True) if isinstance(item, ValidateItem) else item + for item in rendered_case.validate_data + ] + + rendered_validate_list = self.exchanger.replace(raw_validate_list) + # 重新通过 Adapter 触发类型修复 (str -> int) + final_validate_data = VALIDATE_LIST_ADAPTER.validate_python(rendered_validate_list) + CaseValidator.validate(resp, final_validate_data) diff --git a/core/models.py b/core/models.py new file mode 100644 index 0000000..5f64e91 --- /dev/null +++ b/core/models.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: chen wei +@Software: PyCharm +@contact: t6i888@163.com +@file: models.py +@date: 2024 2024/9/15 21:14 +@desc: 声明yaml用例格式 +""" +import logging +from typing import List, Any, Optional, Union, Annotated + +import yaml +from pydantic import BaseModel, Field, ConfigDict, model_validator, field_validator, AfterValidator + +from core import settings + +logger = logging.getLogger(__name__) + + +def smart_cast_int(v: Any) -> Any: + if isinstance(v, str) and v.startswith("${") and v.endswith("}"): + return v + try: + return int(v) + except (ValueError, TypeError): + return v + + +def smart_cast_dict(v: Any) -> Any: + """确保字典格式,若是占位符(字符串形式)则放行""" + if isinstance(v, str) and v.startswith("${") and v.endswith("}"): + return v + if isinstance(v, dict) or v is None: + return v + return v # 也可以根据需求抛出异常 + + +# 使用 Annotated 定义带校验的类型 +SmartInt = Annotated[Union[int, str], AfterValidator(smart_cast_int)] +SmartDict = Annotated[Union[dict[str, Any], str], AfterValidator(smart_cast_dict)] + + +# --- 基础请求模型 (用于第一种示例:直接请求) --- +class RequestModel(BaseModel): + method: str = Field(..., description="HTTP 请求方法: get, post, etc.") + url: str = Field(..., description="接口路径或完整 URL") + params: Optional[SmartDict] = None + data: Optional[SmartDict] = None + json_body: Optional[Any] = Field(None, alias="json") + headers: Optional[SmartDict] = None + cookies: Optional[dict[str, str]] = None + timeout: SmartInt = Field(default=10) + files: Optional[SmartDict] = None + + model_config = ConfigDict(extra="allow", populate_by_name=True) # 允许扩展 requests 的其他参数 + + +# --- PO 动作模型 (用于第二种示例:业务层反射调用) --- +class ApiActionModel(BaseModel): + api_class: str = Field(..., alias="class", description="要调用的 API 类名") + method: str = Field(..., description="类中的方法名") + params: Optional[SmartDict] = Field(default_factory=dict, description="传给方法的参数") + + model_config = ConfigDict(populate_by_name=True) + + +# --- 新增:断言条目模型 --- +class ValidateItem(BaseModel): + check: Any = Field(..., description="要检查的字段或表达式") + expect: Any = Field(..., description="期望值") + assert_method: str = Field(default="equals", alias="assert", description="断言方法") + msg: Optional[str] = Field(default="Assertion", description="断言描述") + + model_config = ConfigDict(populate_by_name=True) + + +# --- 核心用例数据模型 --- +class CaseInfo(BaseModel): + # 公共元数据 + title: str = Field(..., description="用例标题") + epic: Optional[str] = None + feature: Optional[str] = None + story: Optional[str] = None + + # 核心逻辑分叉:可以是 request 对象,也可以是 api_action 对象 + # 根据 WorkflowExecutor 的逻辑,这里设为 Optional,但在具体校验时可以互斥 + request: Optional[RequestModel] = None + api_action: Optional[ApiActionModel] = None + + # 后置处理 + extract: Optional[dict[str, List[Any]]] = Field( + default=None, + description="变量提取表达式,格式: {变量名: [来源, 表达式, 索引]}" + ) + validate_data: Optional[List[Union[ValidateItem, dict[str, Any]]]] = Field( + default_factory=list, + alias="validate", + description="断言信息" + ) + + # 参数化(在 DataLoader 阶段会被拆解,但在初始加载时需要定义) + parametrize: Optional[List[List[Any]]] = None + + model_config = ConfigDict( + populate_by_name=True, # 无论是在代码中用 api_class 还是在 YAML 中用 class 赋值,Pydantic 都能正确识别。 + arbitrary_types_allowed=True # 允许在模型中使用非 Pydantic 标准类型(如自定义类实例) + ) + + # 核心优化:增加互斥校验 + @model_validator(mode='after') + def check_action_type(self) -> 'CaseInfo': + if not self.request and not self.api_action: + raise ValueError("用例必须包含 'request' 或 'api_action' 其中之一") + if self.request and self.api_action: + raise ValueError("'request' 和 'api_action' 不能同时存在") + return self + + def is_po_mode(self) -> bool: + """判断是否为 PO 模式""" + return self.api_action is not None + + +if __name__ == '__main__': + # 模拟数据 1:标准请求模式 + raw_case_1 = { + "title": "查询状态信息", + "request": { + "method": "get", + "url": "/api/v1/info", + "headers": {"User-Agent": "pytest-ai"} + }, + "validate": [ + {"check": "status_code", "assert": "equals", "expect": 200, "msg": "响应码200"}, + {"check": "$.msg", "expect": "Success"} + ] + } + + # 模拟数据 2:PO 模式 (反射调用) + raw_case_2 = { + "title": "用户登录测试", + "api_action": { + "class": "UserAPI", + "method": "login", + "params": {"user": "admin", "pwd": "123"} + }, + "extract": { + "token": ["json", "$.data.token", 0] + } + } + + print("--- 开始模型校验测试 ---\n") + + try: + # 验证模式 1 + case1 = CaseInfo(**raw_case_1) + print(f"✅ 模式1 (Request) 校验通过: {case1.title}") + print(f" 请求URL: {case1.request.url}") + print(f" 第一个断言方法: {case1.validate_data[0].assert_method}\n") + + # 验证模式 2 + case2 = CaseInfo(**raw_case_2) + print(f"✅ 模式2 (PO Mode) 校验通过: {case2.title}") + print(f" 调用类: {case2.api_action.api_class}") + print(f" 提取规则数: {len(case2.extract)}\n") + + # 验证非法数据(如:既没有 request 也没有 api_action 的情况可以在业务层进一步校验) + # 这里演示 Pydantic 自动类型转换 + invalid_data = {"title": "错误用例", "request": {"url": "/api"}} # 缺少 method + print("--- 预期失败测试 ---") + CaseInfo(**invalid_data) + + except Exception as e: + print(f"❌ 预期内的校验失败: \n{e}") diff --git a/core/settings.py b/core/settings.py index c9285d7..b798255 100644 --- a/core/settings.py +++ b/core/settings.py @@ -21,6 +21,8 @@ load_dotenv() # --- 目录配置 --- TEST_CASE_DIR = BASE_DIR / "test_cases" +EXTRACT_CACHE = BASE_DIR / "data/extract.yaml" + OUTPUT_DIR = BASE_DIR / "outputs" SCREENSHOT_DIR = OUTPUT_DIR / "screenshots" LOG_DIR = OUTPUT_DIR / "logs" @@ -31,19 +33,23 @@ REPORT_DIR = BASE_DIR / "reports" CONFIG_DIR = BASE_DIR / "config" DATA_DIR = BASE_DIR / "data" -test_suffix = "yaml" +# 核心 API 目录路径 +API_PACKAGE = "api" -base_url = os.getenv("BASE_URL") -db_host = os.getenv("DB_HOST") # ip -db_port = os.getenv("DB_PORT") # 端口 -db_user = os.getenv("DB_USER") # 用户名 -db_password = os.getenv("DB_PASSWORD") # 密码 -db_database = os.getenv("DB_DATABASE") +# 可选:显式映射(类名 -> 完整模块路径),解决文件名不规则的问题 +API_MAP = { + "UserAPI": "api.business.user", + "OrderAPI": "api.v2.order_manager" +} allure_epic: str = "项目名称:answer" allure_feature: str = "默认特征(feature)" allure_story: str = "默认事件(story)" +test_suffix = "yaml" + +base_url = os.getenv("BASE_URL") + rsa_public = "" rsa_private = "" diff --git a/core/templates.py b/core/templates.py new file mode 100644 index 0000000..6856b59 --- /dev/null +++ b/core/templates.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: chen wei +@Software: PyCharm +@contact: t6i888@163.com +@file: templates.py +@date: 2024 2024/9/22 22:20 +@desc: +""" +import copy +import logging +import re +import string +import ast +from typing import List, Any + +from commons.funcs import Funcs + +logger = logging.getLogger(__name__) + + +class Template(string.Template): + """ + 增强型模板引擎: + 1. 兼容标准变量替换 ${var} + 2. 支持带参数的函数调用 ${func(arg1, arg2)} + 3. 支持变量嵌套作为函数参数 ${func(${var})} + """ + + # call_pattern = re.compile(r"\${(?P.*?)\((?P.*?)\)}") + # call_pattern = re.compile(r"\$\{(?P[a-zA-Z_]\w*)\((?P.*)\)}") + # 匹配函数调用结构:${函数名(参数)} + # 分组:func_name (字母下划线开头), func_args (括号内的所有内容) + call_pattern = re.compile(r"\$\{(?P[a-zA-Z_]\w*)\((?P.*)\)}") + + def render(self, mapping: dict) -> str: + """ + 渲染入口 + :param mapping: 变量缓存(来自 Exchange._variable_cache) + :return: 渲染后的字符串 + """ + # 1. 第一步:利用原生 string.Template 替换基础变量 + # 这一步会将参数中的 ${var} 预先替换为实际值,从而支持函数嵌套调用 + s = self.safe_substitute(mapping) # 原有方法替换变量 + # 2. 第二步:解析并执行函数调用 + s = self.safe_substitute_funcs(s, mapping) + + return s + + @staticmethod + def _parse_args(args_str: str, mapping: dict) -> List[Any]: + """ + 核心优化:安全拆分函数参数 + 利用正则预读,跳过引号内的逗号,解决 ${func('a,b', 123)} 的分割问题 + """ + args_str = args_str.strip() + if not args_str: + return [] + + # 正则解析说明:匹配逗号,但该逗号后面必须有偶数个引号(说明逗号不在引号内) + raw_args = re.split(r',(?=(?:[^\'"]*[\'"][^\'"]*[\'"])*[^\'"]*$)', args_str) + + processed_args = [] + for arg in raw_args: + arg = arg.strip() + # 1. 处理带引号的字符串参数 + if (arg.startswith("'") and arg.endswith("'")) or (arg.startswith('"') and arg.endswith('"')): + processed_args.append(arg[1:-1]) + # 2. 处理数字类型 + elif arg.isdigit(): + processed_args.append(int(arg)) + # 3. 处理布尔值 + elif arg.lower() == "true": + processed_args.append(True) + elif arg.lower() == "false": + processed_args.append(False) + # 4. 如果在 mapping 中能找到(针对未经过第一步替换的情况),取其值 + elif arg in mapping: + processed_args.append(mapping[arg]) + # 5. 其他情况按原样字符串处理 + else: + processed_args.append(arg) + + return processed_args + + def safe_substitute_funcs(self, template: str, mapping: dict) -> str: + """ + 解析字符串中的函数名和参数,并将函数调用结果进行替换 + :param template: 字符串 + :param mapping: 上下文,提供要使用的函数和变量 + :return: 替换后的结果 + """ + + # 合并函数映射和变量映射,作为统一上下文 + # 使用解构赋值替代 deepcopy,提升性能 + logger.info(f"mapping更新前: {mapping}") + render_context = {**Funcs.FUNC_MAPPING, **mapping} + logger.info(f"mapping更新后: {render_context}") + + # mapping = copy.deepcopy(mapping) + # logger.info(f"mapping更新前: {mapping}") + # mapping.update(self.FUNC_MAPPING) # 合并两个mapping + # mapping.update(Funcs.FUNC_MAPPING) # 合并两个mapping + # logger.info(f"mapping更新后: {mapping}") + + def convert(mo): + func_name = mo.group("func_name") + # func_args = mo.group("func_args").split(",") + func_args_str = mo.group("func_args") + + func = render_context.get(func_name) # 读取指定函数 + + if not callable(func): + logger.warning(f"模板中的函数 '{func_name}' 未定义或不可调用") + return mo.group() + # 解析参数列表 + args = self._parse_args(func_args_str, render_context) + + try: + # 执行函数并强制转为字符串返回,以便 re.sub 替换 + result = func(*args) + return str(result) + except Exception as e: + logger.error(f"执行函数 ${{{func_name}(...)}} 报错: {e}", exc_info=True) + return mo.group() + + return self.call_pattern.sub(convert, template) + + +if __name__ == '__main__': + # 模拟 Funcs.FUNC_MAPPING + def mock_concat(a, b): + return f"{a}_{b}" + + + def mock_get_now(): + return "2026-03-09" + + + def mock_add(x, y): + return x + y + + + # 注入模拟函数 + Funcs.FUNC_MAPPING = { + "concat": mock_concat, + "now": mock_get_now, + "add": mock_add + } + + # 模拟变量缓存 + test_mapping = { + "env": "prod", + "num1": 10, + "num2": 20 + } + + test_cases = [ + ("场景A:标准变量", "Current env is ${env}", "Current env is prod"), + ("场景B:无参数函数", "Date: ${now()}", "Date: 2026-03-09"), + ("场景C:带参数函数(含逗号)", "Res: ${concat('hello,world', 'test')}", "Res: hello,world_test"), + ("场景D:变量嵌套函数参数", "Sum: ${add(${num1}, ${num2})}", "Sum: 30"), + ("场景E:混合模式", "URL: /${env}/api/${now()}", "URL: /prod/api/2026-03-09"), + ("场景F:参数类型自动识别", "Value: ${add(5, 5)}", "Value: 10"), # 5应该被识别为int + ] + + print(f"{'测试场景':<25} | {'预期结果':<30} | {'实际结果'}") + print("-" * 80) + + for scene, tpl_str, expected in test_cases: + actual = Template(tpl_str).render(test_mapping) + status = "✅" if str(actual) == str(expected) else "❌" + print(f"{scene:<25} | {expected:<30} | {actual} {status}") + + # 特殊验证:嵌套失败回退 + print("\n>>> 验证未定义函数回退:") + error_tpl = "Check: ${undefined_func()}" + print(f"结果: {Template(error_tpl).render(test_mapping)}") diff --git a/data/extract.yaml b/data/extract.yaml new file mode 100644 index 0000000..3591633 --- /dev/null +++ b/data/extract.yaml @@ -0,0 +1,4 @@ +existing_var: '100' +token: auth_123 +u_id: 888 +user_name: ChenWei diff --git a/id.yaml b/data/id.yaml similarity index 100% rename from id.yaml rename to data/id.yaml diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..e7724c3 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,56 @@ +# Project Structure Documentation + +This document outlines the recommended structure for the Interface Automation Test project. A well-organized structure promotes maintainability, scalability, and collaboration. + +## Directory Structure + +Here is the proposed optimized directory structure: + +``` +/ +|-- core/ # Main source code +| |-- api.py +| |-- main.py +| |-- luffy.py +| +-- ... +| +|-- tests/ # Test cases +| |-- a_test_case.py +| +-- ... +| +|-- config/ # Configuration files +| |-- id.yaml +| |-- extract.yaml +| +-- ... +| +|-- utils/ # Utility modules +| +|-- docs/ # Project documentation +| +-- README.md +| +|-- .gitignore # Git ignore file +|-- pytest.ini # Pytest configuration +|-- pyproject.toml # Python project configuration +|-- README.md # Main project README +``` + +## Description of Directories + +* **`core/`**: This directory contains the core application logic for the interface tests. Files like `api.py`, `main.py`, and `luffy.py` which handle the main business logic should reside here. + +* **`tests/`**: This directory is for all the automated tests. Each test file should ideally correspond to a module or a feature. + +* **`config/`**: This directory should store all configuration files, such as `id.yaml` and `extract.yaml`. This separation makes it easier to manage different environments (e.g., development, staging, production). + +* **`utils/`**: This directory holds common utility functions and helper scripts that can be used across different parts of the project. + +* **`docs/`**: This directory contains all project-related documentation, including this structure guide. + +## Benefits of this Structure + +* **Clarity**: A clear separation of concerns makes it easy to find code. +* **Maintainability**: Easier to maintain and refactor code without affecting other parts of the system. +* **Scalability**: The structure can easily scale as the project grows in complexity. +* **Collaboration**: New developers can quickly understand the project layout and start contributing. + +We recommend moving the existing files into this new structure to improve the overall quality of the project. diff --git a/extract.yaml b/extract.yaml deleted file mode 100644 index 862d6c6..0000000 --- a/extract.yaml +++ /dev/null @@ -1,7 +0,0 @@ -name: 张三 -age: '18' -data: -- 3 -- 4 -- 5 -aaa: null diff --git a/logs/pytest.log b/logs/pytest.log deleted file mode 100644 index a70ffbe..0000000 --- a/logs/pytest.log +++ /dev/null @@ -1,54 +0,0 @@ -03/03/2025 05:34:28 PM [commons.cases] INFO cases.find_yaml_case:45 - 加载文件:D:\CNWei\CNW\InterfaceAutoTest\TestCases\answer\test_1_status.yaml -03/03/2025 05:34:28 PM [commons.cases] INFO cases.find_yaml_case:50 - case_info=title: 查询状态信息 -request: - method: get - url: /answer/api/v1/connector/info - headers: - Host: 119.91.19.171:40065 - Accept-Language: en_US - Accept: application/json, text/plain, */* - User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, - like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0 - Referer: http://119.91.19.171:40065/users/login - Accept-Encoding: gzip, deflate -extract: - msg: - - json - - $.msg - - 0 -validate: - equals: - 状态码等于200: - - Success. - - ${msg} -parametrize: [] -epic: 项目名称:answer -feature: 页面状态 -story: 状态 - -03/03/2025 05:34:28 PM [commons.models] INFO models.ddt:81 - 1,执行这一步 -03/03/2025 05:34:28 PM [commons.cases] INFO cases.new_case:63 - ddt_title=['查询状态信息'] -03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_runtest_setup:122 - -----------------Start: main.py::TestAPI::test_1_status[查询状态信息]----------------- -03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:71 - =================================用例开始执行:查询状态信息================================== -03/03/2025 05:34:28 PM [commons.exchange] INFO exchange.replace:64 - CaseInfo(title='查询状态信息', request={'method': 'get', 'url': '/answer/api/v1/connector/info', 'headers': {'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Referer': 'http://119.91.19.171:40065/users/login', 'Accept-Encoding': 'gzip, deflate'}}, extract={'msg': ['json', '$.msg', 0]}, validate={'equals': {'状态码等于200': ['Success.', '${msg}']}}, parametrize=[], epic='项目名称:answer', feature='页面状态', story='状态') -03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:51 - mapping更新前: {'msg': 'Success.', 'id': 12} -03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:54 - mapping更新后: {'msg': 'Success.', 'id': 12, 'int': , 'float': , 'bool': , 'url_unquote': , 'str': , 'time_str': , 'add': , 'sql': , 'new_id': , 'last_id': , 'md5': , 'base64_encode': , 'base64_decode': , 'rsa_encode': , 'rsa_decode': } -03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:75 - 1,正在注入变量... -03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:78 - 2,正在请求接口... -03/03/2025 05:34:28 PM [requests.session] INFO session.send:36 - 发送请求>>>>>> 接口地址 = GET http://119.91.19.171:40065/answer/api/v1/connector/info -03/03/2025 05:34:28 PM [requests.session] INFO session.send:37 - 发送请求>>>>>> 请求头 = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': 'application/json, text/plain, */*', 'Connection': 'keep-alive', 'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Referer': 'http://119.91.19.171:40065/users/login'} -03/03/2025 05:34:28 PM [requests.session] INFO session.send:38 - 发送请求>>>>>> 请求正文 = None -03/03/2025 05:34:28 PM [requests.session] INFO session.send:42 - 接收响应 <<<<<< 状态码 = 200 -03/03/2025 05:34:28 PM [requests.session] INFO session.send:43 - 接收响应 <<<<<< 响应头 = {'Content-Type': 'application/json; charset=utf-8', 'Date': 'Mon, 03 Mar 2025 09:34:29 GMT', 'Content-Length': '63'} -03/03/2025 05:34:28 PM [requests.session] INFO session.send:44 - 接收响应 <<<<<< 响应正文 = {'code': 200, 'reason': 'base.success', 'msg': 'Success.', 'data': []} -03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:81 - 3,正在提取变量... -03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:87 - 4,正在断言... -03/03/2025 05:34:28 PM [commons.exchange] INFO exchange.replace:64 - CaseInfo(title='查询状态信息', request={'method': 'get', 'url': '/answer/api/v1/connector/info', 'headers': {'Host': '119.91.19.171:40065', 'Accept-Language': 'en_US', 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0', 'Referer': 'http://119.91.19.171:40065/users/login', 'Accept-Encoding': 'gzip, deflate'}}, extract={'msg': ['json', '$.msg', 0]}, validate={'equals': {'状态码等于200': ['Success.', '${msg}']}}, parametrize=[], epic='项目名称:answer', feature='页面状态', story='状态') -03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:51 - mapping更新前: {'msg': 'Success.', 'id': 12} -03/03/2025 05:34:28 PM [commons.templates] INFO templates.safe_substitute_funcs:54 - mapping更新后: {'msg': 'Success.', 'id': 12, 'int': , 'float': , 'bool': , 'url_unquote': , 'str': , 'time_str': , 'add': , 'sql': , 'new_id': , 'last_id': , 'md5': , 'base64_encode': , 'base64_decode': , 'rsa_encode': , 'rsa_decode': } -03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.assert_all:32 - 键:equals,值:{'状态码等于200': ['Success.', 'Success.']} -03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.assert_all:34 - 获取到的断言: -03/03/2025 05:34:28 PM [utils.case_validator] INFO case_validator.validate_equals:43 - assert Success. == Success., 状态码等于200执行这段代码 -03/03/2025 05:34:28 PM [commons.cases] INFO cases.test_func:92 - =================================用例执行结束:查询状态信息================================== -03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_result_log:190 - test status is PASSED (main.py::TestAPI::test_1_status[查询状态信息]): -03/03/2025 05:34:28 PM [pytest_result_log] INFO plugin.pytest_runtest_teardown:128 - ------------------End: main.py::TestAPI::test_1_status[查询状态信息]------------------ diff --git a/luffy.py b/luffy.py deleted file mode 100644 index 0a8e859..0000000 --- a/luffy.py +++ /dev/null @@ -1,64 +0,0 @@ -import os -import time -from logging.handlers import TimedRotatingFileHandler - - -class LufffyTimedRotatingFileHandler(TimedRotatingFileHandler): - def doRollover(self): - """ - do a rollover; in this case, a date/time stamp is appended to the filename - when the rollover happens. However, you want the file to be named for the - start of the interval, not the current time. If there is a backup count, - then we have to get a list of matching filenames, sort them and remove - the one with the oldest suffix. - """ - if self.stream: - self.stream.close() - self.stream = None - # get the time that this sequence started at and make it a TimeTuple - currentTime = int(time.time()) - dstNow = time.localtime(currentTime)[-1] - t = self.rolloverAt - self.interval - if self.utc: - timeTuple = time.gmtime(t) - else: - timeTuple = time.localtime(t) - dstThen = timeTuple[-1] - if dstNow != dstThen: - if dstNow: - addend = 3600 - else: - addend = -3600 - timeTuple = time.localtime(t + addend) - """ - dfn = self.rotation_filename(self.baseFilename + "." + - time.strftime(self.suffix, timeTuple)) - if os.path.exists(dfn): - os.remove(dfn) - self.rotate(self.baseFilename, dfn) - """ - # 多进程会导致误删日志,将上面代码重写为如下代码(判断如果不存在则重命名) - # 注意:如果改写的代码会影响其他模块则不能采用该方法 - dfn = self.rotation_filename(self.baseFilename + "." + - time.strftime(self.suffix, timeTuple)) - if not os.path.exists(dfn): - self.rotate(self.baseFilename, dfn) - - if self.backupCount > 0: - for s in self.getFilesToDelete(): - os.remove(s) - if not self.delay: - self.stream = self._open() - newRolloverAt = self.computeRollover(currentTime) - while newRolloverAt <= currentTime: - newRolloverAt = newRolloverAt + self.interval - #If DST changes and midnight or weekly rollover, adjust for this. - if (self.when == 'MIDNIGHT' or self.when.startswith('W')) and not self.utc: - dstAtRollover = time.localtime(newRolloverAt)[-1] - if dstNow != dstAtRollover: - if not dstNow: # DST kicks in before next rollover, so we need to deduct an hour - addend = -3600 - else: # DST bows out before next rollover, so we need to add an hour - addend = 3600 - newRolloverAt += addend - self.rolloverAt = newRolloverAt \ No newline at end of file diff --git a/main.py b/main.py index 9b1657f..be3fb54 100644 --- a/main.py +++ b/main.py @@ -1,19 +1,22 @@ import os -import shutil -import datetime import pytest -from commons.cases import TestAPI - -TestAPI.run() # 加载yaml文件 - if __name__ == '__main__': - now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') - # 1,启动框架(生成临时文件) - # -x表示有一个用例失败后面将不执行;-v表示展示用例名称;-c,配置文件所在目录:指定pytest.ini路径;--alluredir=temp。指定数据生成目录 - pytest.main([__file__, "-x", "-v","--alluredir=temp"]) - # 2,生成HTML报告 - os.system('allure generate temp -o report --clean') # java程序只能借助操作系统执行 - - # 3,备份日志 - # shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log") + # 定义报告和临时文件目录 + reports_dir = "reports" + temp_dir = os.path.join(reports_dir, "temp") + html_dir = os.path.join(reports_dir, "html") + + # 1. 执行 Pytest 测试 + # -v: 输出详细信息 + # --alluredir: 指定 Allure 临时数据目录 + # -c: 指定 pytest.ini 配置文件路径 + pytest.main([ + "-v", + "tests/", # 明确指定测试目录 + f"--alluredir={temp_dir}", + "-c=config/pytest.ini" + ]) + + # 2. 生成 Allure HTML 报告 + os.system(f'allure generate {temp_dir} -o {html_dir} --clean') diff --git a/pytest.ini b/pytest.ini index f6d23d8..57ff75e 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,5 @@ [pytest] +testpaths = tests addopts = -q --show-capture=no diff --git a/test_cases/answer/test_1_status.yaml b/test_cases/answer/test_1_status.yaml new file mode 100644 index 0000000..2d297d4 --- /dev/null +++ b/test_cases/answer/test_1_status.yaml @@ -0,0 +1,38 @@ +feature: 页面状态 +story: 状态 +title: 查询状态信息 +epic: 的点点滴滴 +request: + method: get + url: /answer/api/v1/connector/info + headers: + Host: 119.91.19.171:40065 + Accept-Language: en_US + Accept: application/json, text/plain, */* + User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0 + Referer: http://119.91.19.171:40065/users/login + Accept-Encoding: gzip, deflate +extract: # 提取变量 + msg: + - "json" + - "$.msg" + - 0 + +validate: + - check: status_code # 检查的对象(或是变量名) + assert: equals # 断言方法 + expect: 200 # 期望值 + msg: "校验接口状态码" # 描述(可选) + + - check: $.msg + assert: contains + expect: "Success" + msg: "检查返回消息" + +parametrize: # 数据驱动测试 + - [ "title","username","password","msg" ] # 变量名 + - [ "测试1","user1","pass1","200" ] # 变量值 + - [ "测试2","user2","pass2","300" ] # 变量值 + - [ "测试3","user3","pass3","200" ] # 变量值 + - [ "测试4","user4","pass4","200" ] # 变量值 + - [ "测试5","user5","pass5","200" ] # 变量值 \ No newline at end of file diff --git a/test_cases/po_model_test.yaml b/test_cases/po_model_test.yaml new file mode 100644 index 0000000..4e04465 --- /dev/null +++ b/test_cases/po_model_test.yaml @@ -0,0 +1,25 @@ +feature: 用户管理 +story: 状态查询 +title: ${title} # 引用参数化里的变量 +epic: 混合模式示例 + +# 【关键改动】:不再写具体的 url, method, headers +# 而是指定要调用的 API 类和方法 +api_action: + class: UserAPI + method: get_connector_info + params: # 传给 get_connector_info 方法的参数 + username: ${username} + password: ${password} + +extract: + msg: ["json", "$.msg", 0] + +validate: + equals: + 业务状态码校验: ["${msg}", "Success."] + +parametrize: + - ["title", "username", "password", "msg"] + - ["测试1", "user1", "pass1", "Success."] + - ["测试2", "user2", "pass2", "Fail."] \ No newline at end of file diff --git a/test_cases/request_model_test.yaml b/test_cases/request_model_test.yaml new file mode 100644 index 0000000..bfd3718 --- /dev/null +++ b/test_cases/request_model_test.yaml @@ -0,0 +1,36 @@ +feature: 页面状态 +story: 状态 +title: 查询状态信息 +epic: 的点点滴滴 +request: + method: get + url: /answer/api/v1/connector/info + headers: + Host: 119.91.19.171:40065 + Accept-Language: en_US + Accept: application/json, text/plain, */* + User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0 + Referer: http://119.91.19.171:40065/users/login + Accept-Encoding: gzip, deflate +extract: # 提取变量 + msg: + - "json" + - "$.msg" + - 0 + +validate: + - check: status_code # 检查的对象(或是变量名) + assert: equals # 断言方法 + expect: 200 # 期望值 + msg: "校验接口状态码" # 描述(可选) + + - check: $.msg + assert: contains + expect: "Success" + msg: "检查返回消息" +parametrize: # 数据驱动测试 + - [ "title","username","password","msg" ] # 变量名 + - [ "测试1","user1","pass1","200" ] # 变量值 + - [ "测试2","user2","pass2","300" ] # 变量值 + - [ "测试3","user3","pass3","200" ] # 变量值 + - [ "测试4","user4","pass4","200" ] # 变量值 \ No newline at end of file diff --git a/utils/case_parser.py b/utils/case_parser.py index 6c270c6..9c5a967 100644 --- a/utils/case_parser.py +++ b/utils/case_parser.py @@ -15,7 +15,7 @@ from dataclasses import dataclass, asdict, field import yaml -from commons.models import CaseInfo +from commons.models import TestCaseStruct class CaseParser: @@ -23,15 +23,15 @@ class CaseParser: def to_yaml(case_data: dict) -> str: try: - CaseInfo(**case_data) + TestCaseStruct(**case_data) except TypeError as error: logging.error(error) raise error return yaml.safe_dump(case_data, allow_unicode=True, sort_keys=False) @staticmethod - def from_yaml(yaml_str: str) -> CaseInfo: - return CaseInfo(**yaml.safe_load(yaml_str)) + def from_yaml(yaml_str: str) -> TestCaseStruct: + return TestCaseStruct(**yaml.safe_load(yaml_str)) if __name__ == '__main__': diff --git a/utils/case_validator.py b/utils/case_validator.py index 67c76b8..9d219b7 100644 --- a/utils/case_validator.py +++ b/utils/case_validator.py @@ -10,10 +10,18 @@ @desc: """ import logging +from typing import List, Union, Any + +from pydantic import TypeAdapter + +from core.exchange import Exchange +from core.models import ValidateItem logger = logging.getLogger(__name__) +VALIDATE_LIST_ADAPTER = TypeAdapter(List[ValidateItem]) + class CaseValidator: VALIDATORS = {} @@ -26,22 +34,46 @@ class CaseValidator: return decorator @classmethod - def assert_all(cls, validate: dict): - if not validate: + def validate(cls,response: Any, validate_list: List[ValidateItem]): + """ + 核心断言入口:适配 CaseInfo.validate_data (List[ValidateItem]) + """ + if not validate_list: return - for assert_type, cases in validate.items(): - logger.info(f"键:{assert_type},值:{cases}") - validator = cls.VALIDATORS.get(assert_type) - logger.info(f"获取到的断言:{validator}") + # dicts = [ + # item.model_dump(by_alias=True) if isinstance(item, ValidateItem) else item for item in validate_list + # ] + # rendered = exchanger.replace(dicts) + # # 触发 SmartInt/SmartDict 类型修复 + # final_list = VALIDATE_LIST_ADAPTER.validate_python(rendered) + + for item in validate_list: + # 1. 提取模型中的数据 + # 此时 final_case 里的 item 已经是经过变量替换后的实体 + actual = item.check + expect = item.expect + method = item.assert_method # 即模型中的 alias="assert" + msg = item.msg or f"Assert {actual} {method} {expect}" + + # 2. 获取对应的断言函数 + validator = cls.VALIDATORS.get(method) if not validator: - raise KeyError(f"Unsupported validator: {assert_type}") - for msg, (a, b) in cases.items(): - validator(a, b, msg) + logger.error(f"❌ 不支持的断言方式: {method}") + raise KeyError(f"Unsupported validator: {method}") + + # 3. 执行断言 + try: + validator(actual, expect, msg) + except AssertionError as e: + logger.error( + f"❌ 断言失败: {msg} | 实际值: {actual} ({type(actual)}), 期望值: {expect} ({type(expect)})") + raise e @CaseValidator.register('equals') def validate_equals(a, b, msg): - logger.info(f"assert {a} == {b}, {msg}执行这段代码") + logger.info(f"assert {a} == {b}, {msg} 执行这段代码") + print(f"assert {a} == {b}, {msg} 执行这段代码") assert a == b, msg @@ -64,17 +96,12 @@ def validate_not_contains(a, b, msg): if __name__ == '__main__': - mock_case = { - "validate": { - "equals": { - "判断相等": ["Success.", "Success."] - }, - "not_equals": { - "判断不相等": ["Success.", "Suc."] - } - } - } - + resp=None + mock_case = [ + {"check": 100, "expect": 100, "assert": "equals"}, + {"check": "success", "expect": "success", "assert": "contains"} +] + final_validate_list = VALIDATE_LIST_ADAPTER.validate_python(mock_case) case_validator = CaseValidator() print(case_validator.VALIDATORS) - case_validator.assert_all(mock_case.get("validate")) + case_validator.validate(resp,final_validate_list) diff --git a/utils/data_driver.py b/utils/data_driver.py index 00592f4..09265c7 100644 --- a/utils/data_driver.py +++ b/utils/data_driver.py @@ -11,8 +11,8 @@ """ from pathlib import Path -from commons.templates import Template -from commons.file_processors.file_handle import FileHandle +from core.templates import Template +from commons.file_processors.yaml_processor import YamlProcessor as FileHandle class DataDriver: @@ -36,7 +36,7 @@ class DataDriver: if __name__ == '__main__': - file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml") + file_path = Path(r"D:\CNWei\CNW\InterfaceAutoTest\test_cases\answer\test_1_status.yaml") file_obj = FileHandle(file_path) print(file_path.stem)