diff --git a/TestCases/test_1_user.json b/TestCases/test_1_user.json new file mode 100644 index 0000000..c0e12ba --- /dev/null +++ b/TestCases/test_1_user.json @@ -0,0 +1,65 @@ +{ + "epic": "项目名称:answer", + "feature": "页面状态", + "story": "状态", + "title": "查询状态信息", + "request": { + "method": "get", + "url": "/answer/api/v1/connector/info", + "headers": { + "Host": "119.91.19.171:40065", + "Accept-Language": "en_US", + "Accept": "application/json, text/plain, */*", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0", + "Referer": "http://119.91.19.171:40065/users/login", + "Accept-Encoding": "gzip, deflate" + } + }, + "extract": { + "msg": [ + "json", + "$.msg", + 0 + ] + }, + "validate": { + "equals": { + "状态码等于200": [ + "Success.", + "Success." + ] + } + }, + "parametrize": [ + [ + "title", + "username", + "password", + "msg" + ], + [ + "测试1", + "user1", + "pass1", + "200" + ], + [ + "测试2", + "user2", + "pass2", + "300" + ], + [ + "测试3", + "user3", + "pass3", + "200" + ], + [ + "测试4", + "user4", + "pass4", + "200" + ] + ] +} \ No newline at end of file diff --git a/commons/cases.py b/commons/cases.py index 460b495..ee7ee1f 100644 --- a/commons/cases.py +++ b/commons/cases.py @@ -16,7 +16,7 @@ import allure import pytest from commons import settings -from commons.file_processors.yaml_processor import YamlProcessor +from commons.file_processors.file_handle import FileHandle from commons.models import CaseInfo from commons.session import Session from commons.exchange import Exchange @@ -26,99 +26,96 @@ logger = logging.getLogger(__name__) session = Session(settings.base_url) -_case_path = Path(settings.case_path) +cases_dir = Path(settings.cases_dir) exchanger = Exchange(settings.exchanger) -@allure.epic("项目名称:answer") class TestAPI: @classmethod - def find_yaml_case(cls, case_path: Path = _case_path): + def find_test_cases(cls, case_dir: Path = cases_dir): """ 搜索和加载yaml文件 :return: """ - yaml_path_list = case_path.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件 - for yaml_path in yaml_path_list: - logger.info(f"加载文件:{yaml_path}") + case_path_list = case_dir.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件 + for case_path in case_path_list: + logger.info(f"加载文件:{case_path}") - file = YamlProcessor(yaml_path) # 自动读取yaml文件 - - case_info = CaseInfo(**file) # 校验yaml格式 - - logger.info(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志 - # case_info = {yaml_path.stem:case_info} - # logger.info(f"case_info_dict={case_info}") - case_func = cls.new_case(yaml_path.stem, file) # 从yaml格式转换为pytest格式 - print(yaml_path.stem) - setattr(cls, f"{yaml_path.stem}", case_func) # 把pytest格式添加到类中 + file = FileHandle(case_path) # 自动读取yaml文件 + try: + CaseInfo(**file) # 校验用例格式 + logger.info(f"case_info:{FileHandle.to_string(file)}") # 把case_info 转成字符串,然后记录日志 + case_func = cls.new_case(case_path.stem, file) # 转换为pytest格式 + # print(case_path.stem) + setattr(cls, f"{case_path.stem}", case_func) # 把pytest格式添加到类中 + except Exception as e: + logger.error(e) @classmethod - def new_case(cls,file_name, case_info: dict): - case_data = data_driver.DataDriver().generate_cases(file_name,case_info) - # ddt_data = case_info.ddt() - # keys_list = ['test_1_user[0]', 'test_1_user[1]', 'test_1_user[2]', 'test_1_user[3]'] - # titles = ['查询用户信息', '查询用户信息', '查询用户信息', '查询用户信息'] - keys_list = [] - titles = [] - for item in case_data: - # 遍历列表中的每个字典 - for key, value in item.items(): - print(f"key:{key}") - keys_list.append(key) - print(f"value:{value}") - # # 遍历内层字典(这里内层字典其实只有一个键值对) - titles.append(value['title']) - print(f"测试数据:{case_data}") - # item={'test_1_user[0]': {'feature': '特征', 'story': '事件', 'title': '查询用户信息', 'request': {'method': 'get', 'url': 'http://119.91.19.171:40065/answer/api/v1/connector/info', 'headers': {'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh_CN', 'Content-Type': 'application/json', 'Cookie': 'psession=33c6c2de-7e5d-40e2-9bbc-3c637a690c3f; lang=zh-CN; 3x-ui=MTcyNjU2NDcwOHxEWDhFQVFMX2dBQUJFQUVRQUFCMV80QUFBUVp6ZEhKcGJtY01EQUFLVEU5SFNVNWZWVk5GVWhoNExYVnBMMlJoZEdGaVlYTmxMMjF2WkdWc0xsVnpaWExfZ1FNQkFRUlZjMlZ5QWYtQ0FBRUVBUUpKWkFFRUFBRUlWWE5sY201aGJXVUJEQUFCQ0ZCaGMzTjNiM0prQVF3QUFRdE1iMmRwYmxObFkzSmxkQUVNQUFBQUdQLUNGUUVDQVFkNGRXa3lNREkwQVFkNGRXa3lNREkwQUE9PXwLOhLRIDjzvQ3oI-UF-GhkMheEENkxRJ8GkAZ79eFHvg==', 'Host': '119.91.19.171:40065', 'Origin': 'http://119.91.19.171:40065', 'Referer': 'http://119.91.19.171:40065/users/login', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0'}}, 'extract': {'code': ['json', '$.code', 0], 'msg': ['json', '$.msg', 0]}, 'validate': {'equals': {'状态码等于200': [200, 'code1']}, 'not_equals': {'状态码不等于404': [404, 'code1']}, 'contains': {'包含关系': [404, 'code1']}, 'not_contains': {'不包含关系': [404, 'code1']}}, 'parametrize': [['title', 'username', 'password', 'code'], ['测试1', 'user1', 'pass1', 'code1'], ['测试2', 'user2', 'pass2', 'code2'], ['测试3', 'user3', 'pass3', 'code3'], ['测试4', 'user4', 'pass4', 'code4']]}} - # ddt_title = [data.title for data in ddt_data] - # logger.info(f"{ddt_title=}") - logger.info(f"keys_list={keys_list}") - logger.info(f"titles={titles}") - logger.info(f"feature={case_info.get('feature')}") - logger.info(f"story={case_info.get('story')}") - @allure.feature(case_info.get("feature")) - @allure.story(case_info.get("story")) - @pytest.mark.parametrize("case_key", keys_list, ids=titles) - def test_func(self, case_key): - logger.info(f"case_key={case_key}") - item.get(case_key) - logger.info(f"========:{item}") - logger.info(f"========:{item.get(case_key)}") - allure.dynamic.title(case_info.get("title")) + def new_case(cls, file_name, case_info: dict): + test_case = data_driver.DataDriver().generate_cases(file_name, case_info) - logger.info(f"用例开始执行:{case_info.get('title')}".center(80, "=")) + keys_list = list(test_case.keys()) + logger.info(f"keys_list:{keys_list}") + + values_list = list(test_case.values()) + logger.info(f"测试用例列表:{values_list}") + + driver_title = [i.get("title") for i in values_list] + logger.info(f"driver_title={driver_title}") + + epic = case_info["epic"] if case_info["epic"] else settings.allure_epic + logger.info(f"epic:{epic}") + + feature = case_info["feature"] if case_info["feature"] else settings.allure_feature + logger.info(f"feature:{feature}") + + story = case_info["story"] if case_info["story"] else settings.allure_story + logger.info(f"story:{story}") + + @allure.epic(epic) + @allure.feature(feature) + @allure.story(story) + @pytest.mark.parametrize("case_key", keys_list, ids=driver_title) + def test_func(self, case_key): + logger.info(f"case_key:{case_key}") + + test_case_mapping = test_case.get(case_key) + logger.info(f"测试用例:{test_case_mapping}") + + allure.dynamic.title(test_case_mapping.get("title")) + + logger.info(f"用例开始执行:{test_case_mapping.get('title')}".center(80, "=")) # 0,变量替换 - new_case_info = exchanger.replace(case_info) + new_case_info = exchanger.replace(test_case_mapping) logger.info(f"1,正在注入变量...") - logger.info(f"new_case_info={new_case_info}") + logger.info(f"new_case_info:{new_case_info}") # 1,发送请求 logger.info(f"2,正在请求接口...") resp = session.request(**new_case_info.get("request")) logger.info(f"3,正在提取变量...") # 2,保存变量(接口关联) - # new_case_info = CaseInfo(**new_case_info) for var_name, extract_info in new_case_info.get("extract").items(): logger.info(f"保存变量:{var_name}{extract_info}") exchanger.extract(resp, var_name, *extract_info) # 3,断言 logger.info(f"4,正在断言...") - assert_case_info = exchanger.replace(case_info) # 为断言加载变量 - # logger.info(f"替换变量后:{assert_case_info}") + assert_case_info = exchanger.replace(test_case_mapping) # 为断言加载变量 + logger.info(f"替换变量后:{assert_case_info}") # assert_case_info.assert_all() # 执行断言 _validator = case_validator.CaseValidator() _validator.assert_all(assert_case_info.get("validate")) - logger.info(f"用例执行结束:{case_info.get('title')}".center(80, "=")) + logger.info(f"用例执行结束:{test_case_mapping.get('title')}".center(80, "=")) return test_func # TestAPI.find_yaml_case() if __name__ == '__main__': - TestAPI.find_yaml_case() + TestAPI.find_test_cases() # print(TestAPI.__dict__) diff --git a/commons/exchange.py b/commons/exchange.py index 51915ab..eb5dde7 100644 --- a/commons/exchange.py +++ b/commons/exchange.py @@ -13,21 +13,19 @@ import copy import json import logging import re +import jsonpath import allure from commons.templates import Template -import jsonpath - -from commons.file_processors.yaml_processor import YamlProcessor, StringOrDict -from commons.models import CaseInfo +from commons.file_processors.file_handle import FileHandle logger = logging.getLogger(__name__) class Exchange: def __init__(self, path): - self.file = YamlProcessor(path) + self.file = FileHandle(path) @allure.step("提取变量") def extract(self, resp, var_name, attr, expr: str, index): @@ -59,29 +57,17 @@ class Exchange: self.file[var_name] = value # 保存变量 self.file.save() # 持久化存储到文件 - # @allure.step("替换变量") - # def replace(self, case_info: CaseInfo): - # logger.info(case_info) - # # 1,将case_info转换为字符串 - # case_info_str = case_info.to_yaml() - # print(f"{case_info_str=}") - # # 2,替换字符串 - # case_info_str = Template(case_info_str).render(self.file) - # print(f"{case_info_str=}") - # # 3,将字符串转换成case_info - # new_case_info = case_info.by_yaml(case_info_str) - # return new_case_info @allure.step("替换变量") def replace(self, case_info: dict) -> dict: - logger.info(case_info) + logger.info(f"变量替换:{case_info}") # 1,将case_info转换为字符串 - case_info_str = YamlProcessor.to_string(case_info) + case_info_str = FileHandle.to_string(case_info) print(f"{case_info_str=}") # 2,替换字符串 case_info_str = Template(case_info_str).render(self.file) print(f"{case_info_str=}") # 3,将字符串转换成case_info - new_case_info = YamlProcessor.to_dict(case_info_str) + new_case_info = FileHandle.to_dict(case_info_str) return new_case_info @@ -97,7 +83,7 @@ if __name__ == '__main__': # print(mock_resp.text) # print(mock_resp.json()) - exchanger = Exchange(r"D:\CNWei\CNW\InterfaceAutoTest\extract.yaml") + exchanger = Exchange(r"E:\PyP\InterfaceAutoTest\extract.yaml") exchanger.extract(mock_resp, "name", "json", '$.name', 0) exchanger.extract(mock_resp, "age", "json", '$.age', 0) exchanger.extract(mock_resp, "data", "json", '$.data', 0) diff --git a/commons/file_processors/file_handle.py b/commons/file_processors/file_handle.py index 03d6e27..e311b58 100644 --- a/commons/file_processors/file_handle.py +++ b/commons/file_processors/file_handle.py @@ -9,38 +9,33 @@ @date: 2025/3/7 09:31 @desc: """ -from pathlib import Path -from typing import Union -from yaml_processor import YamlProcessor -from json_processor import JsonProcessor +from commons.file_processors.yaml_processor import YamlProcessor +from commons.file_processors.json_processor import JsonProcessor -class FileHandle: - def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None): - # self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 - # self.data: Union[dict, None] = data - self.processor = get_processor(filepath, data) +processors = { + 'yaml': YamlProcessor, + 'yml': YamlProcessor, + 'json': JsonProcessor, - def load(self) -> None: - self.processor.load() - - def to_string(self) -> str: - return self.processor.to_string() - - def to_dict(self, data: str) -> None: - self.processor.to_dict(data) - - def save(self, new_filepath: Union[str, Path, None] = None): - self.processor.save(new_filepath) +} -def get_processor(filepath, data): - ext = Path(filepath).suffix.lower()[1:] # 获取后缀名,如 'json' - processors = { - 'yaml': YamlProcessor, - 'yml': YamlProcessor, - 'json': JsonProcessor, - - } +def get_processor(ext): agent_model = processors.get(ext, YamlProcessor) # 代理模式 - return agent_model(filepath, data) # 默认回退到 Yaml + + return agent_model # 默认回退到 Yaml + + +FileHandle = get_processor("yaml") + +if __name__ == '__main__': + # 示例用法 + yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径 + yaml_file = FileHandle(yaml_path) + print(yaml_file) + print(type(yaml_file)) + file_string = FileHandle.to_string(yaml_file) + print(file_string) + file_dict = FileHandle.to_dict(file_string) + print(file_dict) diff --git a/commons/file_processors/json_processor.py b/commons/file_processors/json_processor.py index fceae06..521d89b 100644 --- a/commons/file_processors/json_processor.py +++ b/commons/file_processors/json_processor.py @@ -11,9 +11,8 @@ """ import logging from typing import Union -from dataclasses import dataclass, asdict, field from pathlib import Path -import yaml +import json from commons.file_processors.base import BaseFileProcessor logger = logging.getLogger(__name__) @@ -51,16 +50,17 @@ class JsonProcessor(BaseFileProcessor, dict): if self.filepath.exists(): try: with open(self.filepath, "r", encoding="utf-8") as f: - loaded_data = yaml.safe_load(f) or {} + loaded_data = json.load(f) or {} self.update(loaded_data) # 使用加载的数据更新字典 - except yaml.YAMLError as e: + except json.JSONDecodeError as e: logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}") # 保持字典为空 (已在开头 clear) else: logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.") # 保持字典为空 (已在开头 clear) - def to_string(self) -> str: + @staticmethod + def to_string(data: dict) -> str: """ 将字典 (自身) 转换为 YAML 格式的字符串。 @@ -68,17 +68,18 @@ class JsonProcessor(BaseFileProcessor, dict): YAML 格式的字符串。 """ try: - return yaml.safe_dump( - dict(self), # 使用dict转换为标准的字典 - allow_unicode=True, - sort_keys=False, - default_flow_style=False + return json.dumps( + dict(data), # 使用dict转换为标准的字典 + ensure_ascii=False, # 允许非ASCII字符 + # indent=4, # 美化输出,缩进4个空格 + sort_keys=False # 不排序键 ) except TypeError as e: - logger.error(f"将数据转换为 YAML 字符串时出错: {e}") + logger.error(f"将数据转换为 JSON 字符串时出错: {e}") return "" - def to_dict(self, data: str) -> None: + @staticmethod + def to_dict(data: str) -> None: """ 将 YAML 格式的字符串转换为字典,并更新当前字典的内容. @@ -86,12 +87,10 @@ class JsonProcessor(BaseFileProcessor, dict): data: YAML 格式的字符串。 """ try: - loaded_data = yaml.safe_load(data) or {} - self.clear() - self.update(loaded_data) # 清空并更新 - except yaml.YAMLError as e: - logger.error(f"将 YAML 字符串转换为字典时出错: {e}") - self.clear() # 出错时也清空 + loaded_data = json.loads(data) or {} + return loaded_data + except json.JSONDecodeError as e: + logger.error(f"将 JSON 字符串转换为字典时出错: {e}") def save(self, new_filepath: Union[str, Path, None] = None): """ @@ -104,108 +103,24 @@ class JsonProcessor(BaseFileProcessor, dict): try: with open(filepath, "w", encoding="utf-8") as f: - yaml.safe_dump( + json.dump( dict(self), # 使用dict转换为标准的字典 - stream=f, - allow_unicode=True, - sort_keys=False, - default_flow_style=False + f, + ensure_ascii=False, # 允许非ASCII字符 + indent=4, # 美化输出,缩进4个空格 + sort_keys=False # 不排序键 ) except (TypeError, OSError) as e: - logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}") - - -class StringOrDict: - @classmethod - def to_string(cls, data: dict) -> str: - """ - 将字典 (自身) 转换为 YAML 格式的字符串。 - - Returns: - YAML 格式的字符串。 - """ - try: - return yaml.safe_dump( - dict(data), # 使用dict转换为标准的字典 - allow_unicode=True, - sort_keys=False, - default_flow_style=False - ) - except TypeError as e: - logger.error(f"将数据转换为 YAML 字符串时出错: {e}") - return "" - - @classmethod - def to_dict(cls, data: str) -> Union[None, dict]: - """ - 将 YAML 格式的字符串转换为字典,并更新当前字典的内容. - - Args: - data: YAML 格式的字符串。 - """ - try: - loaded_data = yaml.safe_load(data) or {} - return loaded_data - except yaml.YAMLError as e: - logger.error(f"将 YAML 字符串转换为字典时出错: {e}") + logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}") if __name__ == '__main__': # 示例用法 - yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml' # 你的 YAML 文件路径 - yaml_file = YamlFile(yaml_path) - print(yaml_file) - print(type(yaml_file)) - - # # 直接像字典一样访问数据 - # print("加载的数据:", yaml_file) # 直接打印对象,就是打印字典内容 - # print("title:", yaml_file.get("title")) # 使用 get 方法 - # if "title" in yaml_file: # 使用 in 检查键 - # print("原始title:", yaml_file["title"]) # 使用方括号访问 - # yaml_file["title"] = "新的标题" # 使用方括号修改 - # print("修改后的title:", yaml_file["title"]) - # # - # yaml_file["new_key"] = "new_value" # 添加新的键值对 - # - # # 将字典转换为 YAML 字符串 - # yaml_string = yaml_file.to_string() - # print("\nYAML 字符串:", yaml_string) - # # - # # 将 YAML 字符串转换回字典 (并更新 yaml_file) - # yaml_file.to_dict(yaml_string) - # print("\n从字符串加载的数据:", yaml_file) - # - # # 保存修改后的数据 (覆盖原文件) - # yaml_file.save() - # - # # 保存到新文件 - # new_yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user_new.yaml' - # yaml_file.save(new_filepath=new_yaml_path) - - # 测试从字符串初始化 - # yaml_string2 = """ - # name: Test User - # age: 30 - # """ - - # yaml_file2 = YamlFile("test2.yaml", data=yaml.safe_load(yaml_string2)) # 从字符串初始化 - # print("\n从字符串初始化的 YamlFile:", yaml_file2) - # yaml_file2.save() # 保存到 test2.yaml - # - # 测试文件不存在的情形 - # non_existent_file = YamlFile("non_existent_file.yaml") - # print("\n加载不存在的文件:", non_existent_file) # 应该打印空字典 {} - # non_existent_file['a'] = 1 # 可以直接添加 - # print("\n加载不存在的文件:", non_existent_file) - -# if __name__ == '__main__': -# from commons.models import CaseInfo -# -# yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml' -# yaml_file = YamlFile(yaml_path) -# print(yaml_file.load()) -# # yaml_file.load() -# # case_info = CaseInfo(**yaml_file) -# # print(case_info) -# # yaml_file["title"] = "查询用户信息" -# # yaml_file.save() + json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径 + json_file = JsonProcessor(json_path) + print(json_file) + print(type(json_file)) + json_string = JsonProcessor.to_string(json_file) + JsonProcessor.to_dict(json_string) + print(json_string) + json_file.save() diff --git a/commons/file_processors/yaml_processor.py b/commons/file_processors/yaml_processor.py index f9b3166..8dcb80a 100644 --- a/commons/file_processors/yaml_processor.py +++ b/commons/file_processors/yaml_processor.py @@ -60,38 +60,6 @@ class YamlProcessor(BaseFileProcessor, dict): logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.") # 保持字典为空 (已在开头 clear) - # def to_string(self) -> str: - # """ - # 将字典 (自身) 转换为 YAML 格式的字符串。 - # - # Returns: - # YAML 格式的字符串。 - # """ - # try: - # return yaml.safe_dump( - # dict(self), # 使用dict转换为标准的字典 - # allow_unicode=True, - # sort_keys=False, - # default_flow_style=False - # ) - # except TypeError as e: - # logger.error(f"将数据转换为 YAML 字符串时出错: {e}") - # return "" - - # def to_dict(self, data: str) -> None: - # """ - # 将 YAML 格式的字符串转换为字典,并更新当前字典的内容. - # - # Args: - # data: YAML 格式的字符串。 - # """ - # try: - # loaded_data = yaml.safe_load(data) or {} - # self.clear() - # self.update(loaded_data) # 清空并更新 - # except yaml.YAMLError as e: - # logger.error(f"将 YAML 字符串转换为字典时出错: {e}") - # self.clear() # 出错时也清空 @staticmethod def to_string(data: dict) -> str: """ @@ -147,48 +115,12 @@ class YamlProcessor(BaseFileProcessor, dict): logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}") -class StringOrDict: - # @classmethod - # def to_string(cls, data: dict) -> str: - @staticmethod - def to_string(data: dict) -> str: - """ - 将字典 (自身) 转换为 YAML 格式的字符串。 - Returns: - YAML 格式的字符串。 - """ - try: - return yaml.safe_dump( - dict(data), # 使用dict转换为标准的字典 - allow_unicode=True, - sort_keys=False, - default_flow_style=False - ) - except TypeError as e: - logger.error(f"将数据转换为 YAML 字符串时出错: {e}") - return "" - - # @classmethod - # def to_dict(cls, data: str) -> Union[None, dict]: - @staticmethod - def to_dict(data: str) -> Union[None, dict]: - """ - 将 YAML 格式的字符串转换为字典,并更新当前字典的内容. - - Args: - data: YAML 格式的字符串。 - """ - try: - loaded_data = yaml.safe_load(data) or {} - return loaded_data - except yaml.YAMLError as e: - logger.error(f"将 YAML 字符串转换为字典时出错: {e}") if __name__ == '__main__': # 示例用法 - yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml' # 你的 YAML 文件路径 + yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径 yaml_file = YamlProcessor(yaml_path) print(yaml_file) print(type(yaml_file)) @@ -234,14 +166,3 @@ if __name__ == '__main__': # non_existent_file['a'] = 1 # 可以直接添加 # print("\n加载不存在的文件:", non_existent_file) -# if __name__ == '__main__': -# from commons.models import CaseInfo -# -# yaml_path = r'D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml' -# yaml_file = YamlFile(yaml_path) -# print(yaml_file.load()) -# # yaml_file.load() -# # case_info = CaseInfo(**yaml_file) -# # print(case_info) -# # yaml_file["title"] = "查询用户信息" -# # yaml_file.save() diff --git a/commons/funcs.py b/commons/funcs.py index 13924be..9a98b55 100644 --- a/commons/funcs.py +++ b/commons/funcs.py @@ -15,9 +15,9 @@ import time import urllib.parse import hashlib -# from commons.databases import db +from commons.databases import db -from commons.files import YamlFile +from commons.file_processors.file_handle import FileHandle from commons import settings logger = logging.getLogger(__name__) @@ -66,7 +66,7 @@ def sql(s: str) -> str: @Funcs.register("new_id") def new_id(): # 自增,永不重复 - id_file = YamlFile(settings.id_path) + id_file = FileHandle(settings.id_path) id_file["id"] += 1 id_file.save() @@ -76,7 +76,7 @@ def new_id(): def last_id() -> str: # 不自增,只返回结果 - id_file = YamlFile(settings.id_path) + id_file = FileHandle(settings.id_path) return id_file["id"] @Funcs.register("md5") diff --git a/commons/models.py b/commons/models.py index 0cdf0f9..c697f5f 100644 --- a/commons/models.py +++ b/commons/models.py @@ -10,14 +10,11 @@ @desc: 声明yaml用例格式 """ import logging -from dataclasses import dataclass, asdict, field +from dataclasses import dataclass, field -import allure import yaml -from commons.templates import Template from commons import settings -from utils import case_validator logger = logging.getLogger(__name__) @@ -33,79 +30,10 @@ class CaseInfo: feature: str = settings.allure_feature story: str = settings.allure_story - def to_yaml(self) -> str: - # 序列化成yaml字符串 - yaml_str = yaml.safe_dump( - asdict(self), - allow_unicode=True, # allow_unicode:使用unicode编码正常显示中文 - sort_keys=False) - return yaml_str - - @classmethod - def by_yaml(cls, yaml_str): - # 反序列化 - obj = cls(**yaml.safe_load(yaml_str)) - return obj - - # @allure.step("断言") - # def assert_all(self): - # _validator = case_validator.CaseValidator() - # # print(case_validator.VALIDATORS) - # - # - # - # _validator.assert_all(self.validate) - # for assert_type, assert_value in self.validate.items(): - # for msg, data in assert_value.items(): - # a, b = data[0], data[1] - # # print(assert_type, a, b, msg) - # match assert_type: - # case 'equals': - # logger.info(f"assert {a} == {b}, {msg}") - # assert a == b, msg - # case 'not_equals': - # logger.info(f"assert {a} != {b}, {msg}") - # assert a != b, msg - # case 'contains': - # logger.info(f"assert {a} in {b}, {msg}") - # assert a in b, msg - # case 'not_contains': - # logger.info(f"assert {a} not in {b}, {msg}") - # assert a not in b, msg - # case "xxxxx - - # def ddt(self) -> list: # 返回一个列表,列表中应该包含N个注入了变量的caseInfo - # case_list = [] - # if not self.parametrize: # 没有使用数据驱动测试 - # logger.info("1,执行这一步") - # # case_info_str = self.to_yaml() # 转字符串 - # # case_info_str = Template(case_info_str).render(d) # 输入变量 - # # case_info = self.by_yaml(case_info_str) # 转成类 - # # case_list.append(case_info) - # case_list.append(self) - # else: # 使用数据驱动测试 - # args_name = self.parametrize[0] - # args_value_list = self.parametrize[1:] - # for args_value in args_value_list: - # d = dict(zip(args_name, args_value)) - # print(f"D的值:{d}") - # # d 就是数据驱动测试的变量,应输入到用例中 - # case_info_str = self.to_yaml() # 转字符串 - # case_info_str = Template(case_info_str).render(d) # 输入变量 - # case_info = self.by_yaml(case_info_str) # 转成类 - # - # case_list.append(case_info) # 加入到返回值 - # return case_list - if __name__ == '__main__': with open(r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml', encoding='utf-8') as f: data = yaml.safe_load(f) # print(data) case_info = CaseInfo(**data) - s = case_info.to_yaml() - # print(s) - new_case_info = case_info.by_yaml(s) - # print(new_case_info) - ddt_ddt = case_info.ddt() - print(ddt_ddt) + diff --git a/commons/session.py b/commons/session.py index 4b326b5..9779eb2 100644 --- a/commons/session.py +++ b/commons/session.py @@ -9,14 +9,16 @@ @date: 2024 2024/9/12 21:56 @desc: """ -from urllib.parse import urljoin import logging +from urllib.parse import urljoin + import requests -import allure from requests import Response, PreparedRequest +import allure -logger = logging.getLogger("requests.session") +# logger = logging.getLogger("requests.session") +logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) diff --git a/commons/settings.py b/commons/settings.py index c6d1883..5b285d7 100644 --- a/commons/settings.py +++ b/commons/settings.py @@ -14,7 +14,7 @@ from pathlib import Path root_path = (Path(__file__)).resolve().parents[1] base_url = 'http://119.91.19.171:40065' -case_path = rf"{root_path}\TestCases\answer" +cases_dir = rf"{root_path}\TestCases\answer" exchanger = rf"{root_path}\extract.yaml" id_path = rf"{root_path}\id.yaml" diff --git a/commons/templates.py b/commons/templates.py index dea30d6..bf6fa67 100644 --- a/commons/templates.py +++ b/commons/templates.py @@ -13,7 +13,6 @@ import copy import logging import re import string -import inspect from commons.funcs import Funcs @@ -26,12 +25,6 @@ class Template(string.Template): 2,参数也可以是变量 """ - # FUNC_MAPPING = { - # "int": int, - # "float": float, - # "bool": bool - # } # 内置函数有的,直接放入mapping;内置函数没有的,在funcs中定义,自动放入mapping - call_pattern = re.compile(r"\${(?P.*?)\((?P.*?)\)}") def render(self, mapping: dict) -> str: @@ -68,28 +61,3 @@ class Template(string.Template): return self.call_pattern.sub(convert, template) - -# def hot_load(): -# from commons import funcs -# -# for func_name in dir(funcs): # 遍历模块中的所有函数 -# if func_name.startswith("_"): -# continue -# func_code = getattr(funcs, func_name) # 取到函数对象 -# # print(func_code) -# if callable(func_code): # 如果是一个可以调用的函数 -# Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中 -# print(Template.FUNC_MAPPING) -# # if inspect.isfunction(func_code): # 如果是一个可以调用的函数 -# # Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中 -# # print(Template.FUNC_MAPPING) - -# def hot_load(): -# from commons.funcs import Funcs -# -# print(Funcs.FUNC_MAPPING) -# # if inspect.isfunction(func_code): # 如果是一个可以调用的函数 -# # Template.FUNC_MAPPING[func_name] = func_code # 函数放到Template中 -# # print(Template.FUNC_MAPPING) -# -# hot_load() diff --git a/main.py b/main.py index 6edf32b..b5310e0 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ import pytest from commons.cases import TestAPI -TestAPI.find_yaml_case() # 加载yaml文件 +TestAPI.find_test_cases() # 加载yaml文件 if __name__ == '__main__': now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') diff --git a/pyproject.toml b/pyproject.toml index a2f6a83..e915c26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ jsonpath = "^0.82.2" pymysql = "^1.1.1" pytest-result-log = "^1.2.2" allure-pytest = "^2.13.5" +cryptography = "^44.0.2" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" diff --git a/utils/case_validator.py b/utils/case_validator.py index 7ad6389..67c76b8 100644 --- a/utils/case_validator.py +++ b/utils/case_validator.py @@ -77,4 +77,4 @@ if __name__ == '__main__': case_validator = CaseValidator() print(case_validator.VALIDATORS) - # case_validator.assert_all(mock_case.get("validate")) + case_validator.assert_all(mock_case.get("validate")) diff --git a/utils/data_driver.py b/utils/data_driver.py index c0d1a20..00592f4 100644 --- a/utils/data_driver.py +++ b/utils/data_driver.py @@ -11,39 +11,36 @@ """ from pathlib import Path -from commons.models import CaseInfo from commons.templates import Template -from commons.file_processors.yaml_processor import StringOrDict +from commons.file_processors.file_handle import FileHandle class DataDriver: @staticmethod - def generate_cases(file_name, case_info) -> list: + def generate_cases(file_name, case_info) -> dict: if not case_info.get("parametrize"): return {file_name + "[--]": case_info} - # cases = [] + cases = {} args_names = case_info.get("parametrize")[0] for i, args_values in enumerate(case_info.get("parametrize")[1:]): # print(args_values) context = dict(zip(args_names, args_values)) # print(context) - # rendered = Template(CaseInfo(**case_info).to_yaml()).render(context) - rendered = Template(StringOrDict.to_string(case_info)).render(context) - # cases.append({file_name + "[" + str(i) + "]": StringOrDict.to_dict(rendered)}) - yield {file_name + "[" + str(i) + "]": StringOrDict.to_dict(rendered)} - # return cases + rendered = Template(FileHandle.to_string(case_info)).render(context) + cases.update({file_name + "[" + str(i) + "]": FileHandle.to_dict(rendered)}) + + return cases if __name__ == '__main__': - from commons.file_processors.yaml_processor import YamlProcessor - file_path = Path(r"D:\CNWei\CNW\InterfaceAutoTest\TestCases\test_1_user.yaml") + file_path = Path(r"E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml") - file_obj = YamlProcessor(file_path) + file_obj = FileHandle(file_path) print(file_path.stem) - file_name = file_path.stem + file_name_ = file_path.stem # mock_case_info = { # "case_info0": { # "feature": "页面状态", @@ -78,24 +75,17 @@ if __name__ == '__main__': dd = DataDriver() # cases = dd.generate_cases(mock_case_info.get("case_info0")) - cases = dd.generate_cases(file_name, file_obj) - print(cases) - # print(len(cases)) - keys_list = [] - titles = [] - for item in cases: - # print(item) - # 遍历列表中的每个字典 - for key, value in item.items(): - # print(f"key:{key}") - keys_list.append(key) - # print(f"value:{value}") - # # 遍历内层字典(这里内层字典其实只有一个键值对) - titles.append(value['title']) - # print(item) + cases_ = dd.generate_cases(file_name_, file_obj) + print(cases_) + case_keys = list(cases_.keys()) + case_values = cases_.values() - print(keys_list) - print(titles) + print(case_keys) + print(case_values) + aa = [i.get("title") for i in case_values] + print(aa) + # print(list(case_values)[0]["feature"]) + print(file_obj["feature"]) + # print(list(case_values)[0]["story"]) + print(file_obj["story"]) - # ddt_title = [data.title for data in ddt_data] - # logger.info(f"{ddt_title=}")