feat(session): 项目基本完成

- 新增热加载模块funcs.py
- 新增文件加载模块files.py
- 新增了日志打印
- 新增其他功能
This commit is contained in:
2025-02-23 22:46:33 +08:00
parent 129c845bd8
commit 913bb3f396
25 changed files with 989 additions and 2 deletions

11
commons/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: __init__.py.py
@date: 2024 2024/9/15 21:13
@desc:
"""

96
commons/cases.py Normal file
View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: cases.py
@date: 2024 2024/9/16 9:57
@desc: 动态生成用例
"""
from pathlib import Path
import logging
import allure
import pytest
from commons.files import YamlFile
from commons.models import CaseInfo
from commons.session import Session
from commons.exchange import Exchange
from commons import settings
logger = logging.getLogger(__name__)
session = Session(settings.base_url)
_case_path = Path(settings.case_path)
exchanger = Exchange(settings.exchanger)
@allure.epic("项目名称answer")
class TestAPI:
...
@classmethod
def find_yaml_case(cls, case_path: Path = _case_path):
"""
搜索和加载yaml文件
:return:
"""
yaml_path_list = case_path.glob("**/test_*.yaml") # 搜索当前目录及其子目录下以test_开头yaml为后缀的文件
for yaml_path in yaml_path_list:
logger.info(f"load file {yaml_path=}")
file = YamlFile(yaml_path) # 自动读取yaml文件
case_info = CaseInfo(**file) # 校验yaml格式
logger.debug(f"case_info={case_info.to_yaml()}") # 把case_info 转成字符串,然后记录日志
case_func = cls.new_case(case_info) # 从yaml格式转换为pytest格式
print(yaml_path.name)
setattr(cls, f"{yaml_path.name}", case_func) # 把pytest格式添加到类中
@classmethod
def new_case(cls, case_info: CaseInfo):
ddt_data = case_info.ddt()
print(ddt_data)
ddt_title = [data.title for data in ddt_data]
@allure.feature(case_info.feature)
@allure.story(case_info.story)
@pytest.mark.parametrize("case_info", ddt_data, ids=ddt_title)
def test_func(self, case_info: CaseInfo):
allure.dynamic.title(case_info.title)
logger.info(f"用例开始执行:{case_info.title}".center(80, "="))
# 0变量替换
new_case_info = exchanger.replace(case_info)
logger.info(f"1正在注入变量...")
# 1发送请求
logger.info(f"2正在请求接口...")
resp = session.request(**new_case_info.request)
logger.info(f"3正在提取变量...")
# 2保存变量(接口关联)
for var_name, extract_info in new_case_info.extract.items():
print(var_name, extract_info)
exchanger.extract(resp, var_name, *extract_info)
# 3断言
logger.info(f"4正在断言...")
assert_case_info = exchanger.replace(case_info) # 为断言加载变量
print(assert_case_info)
assert_case_info.assert_all() # 执行断言
logger.info(f"用例执行结束:{case_info.title}".center(80, "="))
return test_func
# TestAPI.find_yaml_case()
if __name__ == '__main__':
TestAPI.find_yaml_case()
# print(TestAPI.__dict__)

45
commons/databases.py Normal file
View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: databases
@date: 2025/2/16 20:53
@desc:
"""
import logging
import pymysql as MySQLdb
from commons import settings
logger = logging.getLogger(__name__)
class DBServer:
def __init__(self, host, port, user, password, database):
self.db = MySQLdb.connect(host=host, port=port, user=user, password=password, database=database)
self.cursor = self.db.cursor() # 创建新的会话
def execute_sql(self, sql):
logger.info(f"执行sql{sql}")
self.cursor.execute(sql) # 执行sql命令
# res = self.cursor.fetchone() # 返回单行结果
res = self.cursor.fetchall() # 返回多行结果
return res
db = DBServer(
host=settings.db_host, # ip
port=3306, # 端口
user='root', # 用户名
password='mysql_hNahSe', # 密码
database='answer' # 库名
)
if __name__ == '__main__':
...
res = db.execute_sql('select username from user where id=1;')
print(res[0])

100
commons/exchange.py Normal file
View File

@@ -0,0 +1,100 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: exchange.py
@date: 2024 2024/9/18 21:58
@desc:
"""
import copy
import json
import logging
import re
import allure
from commons.templates import Template
import jsonpath
from commons.files import YamlFile
from commons.models import CaseInfo
logger = logging.getLogger(__name__)
class Exchange:
def __init__(self, path):
self.file = YamlFile(path)
@allure.step("提取变量")
def extract(self, resp, var_name, attr, expr: str, index):
# resp中json是方法不是属性需要手动更改为属性
resp = copy.deepcopy(resp)
try:
resp.json = resp.json()
except json.decoder.JSONDecodeError:
resp.json = {"msg": "is not json data"}
data = getattr(resp, attr)
# print(data)
if expr.startswith("/"): # xpath
res = None
elif expr.startswith("$"): # jsonpath
data = dict(data)
res = jsonpath.jsonpath(data, expr)
else: # 正则
res = re.findall(expr, str(data))
# print(res)
if res: # 如果有数据
value = res[index]
else: # 如果没有数据
value = "not data"
logger.debug(f"{var_name} = {value}") # 记录变量名和变量值
self.file[var_name] = value # 保存变量
self.file.save() # 持久化存储到文件
@allure.step("替换变量")
def replace(self, case_info: CaseInfo):
...
# 1将case_info转换为字符串
case_info_str = case_info.to_yaml()
# 2替换字符串
case_info_str = Template(case_info_str).render(self.file)
# 3将字符串转换成case_info
new_case_info = case_info.by_yaml(case_info_str)
return new_case_info
if __name__ == '__main__':
class MockResponse:
text = '{"name":"张三","age":"18","data":[3,4,5],"aaa":null}'
def json(self):
return json.loads(self.text)
mock_resp = MockResponse()
# print(mock_resp.text)
# print(mock_resp.json())
exchanger = Exchange(r"E:\PyP\InterfaceAutoTest\extract.yaml")
exchanger.extract(mock_resp, "name", "json", '$.name', 0)
exchanger.extract(mock_resp, "age", "json", '$.age', 0)
exchanger.extract(mock_resp, "data", "json", '$.data', 0)
exchanger.extract(mock_resp, "aaa", "json", '$.aaa', 0)
case_info = CaseInfo(
title="单元测试",
request={
"data":
{"name": "${name}", "age": "${str(age)}", "time": "${add(1,2)}"}
},
extract={},
validate={}
)
new_case_info = exchanger.replace(case_info)
print(new_case_info)

46
commons/files.py Normal file
View File

@@ -0,0 +1,46 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: files.py
@date: 2024 2024/9/15 21:28
@desc: 读取和保存yaml文件
"""
import logging
import yaml
from commons.models import CaseInfo
logger = logging.getLogger(__name__)
class YamlFile(dict):
def __init__(self, path):
super().__init__()
self.path = path
self.load()
def load(self):
with open(self.path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) # 字典
if data:
self.update(data) # 把两个字段的内容合并
def save(self):
with open(self.path, "w", encoding="utf-8") as f:
yaml.safe_dump(
dict(self),
stream=f,
allow_unicode=True, # allow_unicode使用unicode编码正常显示中文
sort_keys=False) # sort_keys保持原有排序
if __name__ == '__main__':
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml'
yaml_file = YamlFile(yaml_path)
# yaml_file.load()
case_info = CaseInfo(**yaml_file)
yaml_file["title"] = "查询用户信息"
yaml_file.save()

104
commons/funcs.py Normal file
View File

@@ -0,0 +1,104 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: funcs.py
@date: 2024 2024/9/22 22:46
@desc:
"""
import base64
import logging
import time
import urllib.parse
import hashlib
from commons.databases import db
# from commons.files import YamlFile
from commons import settings
logger = logging.getLogger(__name__)
def url_unquote(s: str) -> str:
return urllib.parse.unquote(s)
def time_str() -> str:
return str(time.time())
def add(a, b):
return str(int(a) + int(b))
def sql(s: str) -> str:
res = db.execute_sql(s)
return res[0][0]
def new_id():
# 自增,永不重复
id_file = YamlFile(settings.id_path)
id_file["id"] += 1
id_file.save()
return id_file["id"]
def last_id() -> str:
# 不自增,只返回结果
id_file = YamlFile("id.yaml")
return id_file["id"]
def md5(content: str) -> str:
# 1原文转为字节
content = content.encode("utf-8")
result = hashlib.md5(content).hexdigest()
return result
def base64_encode(content: str) -> str:
# 1原文转二进制
content = content.encode("utf-8")
# 2base64编码二进制
encode_value = base64.b64encode(content)
# 3转为字符串
encode_str = encode_value.decode("utf-8")
return encode_str
def base64_decode(content: str) -> str:
# 1原文转二进制
content = content.encode("utf-8")
# 2base64解码二进制
decode_value = base64.b64decode(content)
# 3转为字符串
decode_str = decode_value.decode("utf-8")
return decode_str
def rsa_encode(content: str) -> str:
...
def rsa_decode(content: str) -> str:
...
if __name__ == '__main__':
# res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
# print(res)
a = "这是中文dddddd"
bb = base64_encode(a)
print(bb)
cc = base64_decode(bb)
print(cc)

98
commons/models.py Normal file
View File

@@ -0,0 +1,98 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: models.py
@date: 2024 2024/9/15 21:14
@desc: 声明yaml用例格式
"""
import logging
from dataclasses import dataclass, asdict
import allure
import yaml
from commons.templates import Template
from commons import settings
logger = logging.getLogger(__name__)
@dataclass
class CaseInfo:
title: str
request: dict
extract: dict
validate: dict
parametrize: list = ""
epic: str = settings.allure_epic
feature: str = settings.allure_feature
story: str = settings.allure_story
def to_yaml(self) -> str:
# 序列化成yaml字符串
yaml_str = yaml.safe_dump(
asdict(self),
allow_unicode=True, # allow_unicode使用unicode编码正常显示中文
sort_keys=False)
return yaml_str
@classmethod
def by_yaml(cls, yaml_str):
# 反序列化
obj = cls(**yaml.safe_load(yaml_str))
return obj
@allure.step("断言")
def assert_all(self):
if not self.validate:
return
for assert_type, assert_value in self.validate.items():
for msg, data in assert_value.items():
a, b = data[0], data[1]
# print(assert_type, a, b, msg)
match assert_type:
case 'equals':
logger.info(f"assert {a} == {b}, {msg}")
assert a == b, msg
case 'not_equals':
logger.info(f"assert {a} != {b}, {msg}")
assert a != b, msg
case 'contains':
logger.info(f"assert {a} in {b}, {msg}")
assert a in b, msg
case 'not_contains':
logger.info(f"assert {a} not in {b}, {msg}")
assert a not in b, msg
# case "xxxxx
def ddt(self) -> list: # 返回一个列表列表中应该包含N个注入了变量的caseInfo
case_list = []
if not self.parametrize: # 没有使用数据驱动测试
case_list.append('')
else: # 使用数据驱动测试
args_name = self.parametrize[0]
args_value_list = self.parametrize[1:]
for args_value in args_value_list:
d = dict(zip(args_name, args_value))
# d 就是数据驱动测试的变量,应输入到用例中
case_info_str = self.to_yaml() # 转字符串
case_info_str = Template(case_info_str).render(d) # 输入变量
case_info = self.by_yaml(case_info_str) # 转成类
case_list.append(case_info) # 加入到返回值
return case_list
if __name__ == '__main__':
with open(r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.yaml', encoding='utf-8') as f:
data = yaml.safe_load(f)
# print(data)
case_info = CaseInfo(**data)
s = case_info.to_yaml()
print(s)
new_case_info = case_info.by_yaml(s)
print(new_case_info)

50
commons/session.py Normal file
View File

@@ -0,0 +1,50 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: session.py
@date: 2024 2024/9/12 21:56
@desc:
"""
from urllib.parse import urljoin
import logging
import requests
import allure
from requests import Response, PreparedRequest
logger = logging.getLogger("requests.session")
logging.basicConfig(level=logging.INFO)
class Session(requests.Session):
def __init__(self, base_url=None):
super().__init__() # 先执行父类的初始化
self.base_url = base_url # 在执行子类的初始化操作
@allure.step("发送请求")
def request(self, method, url: str, *args, **kwargs) -> Response:
if not url.startswith("http"):
# 自动添加baseurl
url = urljoin(self.base_url, url)
return super().request(method, url, *args, **kwargs) # 按照原有方式执行
def send(self, request: PreparedRequest, *args, **kwargs) -> Response:
logger.info(f"发送请求>>>>>> 接口地址 = {request.method} {request.url}")
logger.info(f"发送请求>>>>>> 请求头 = {request.headers}")
logger.info(f"发送请求>>>>>> 请求正文 = {request.body} ")
resp = super().send(request, **kwargs) # 按照原有方式发送请求
logger.info(f"接收响应 <<<<<< 状态码 = {resp.status_code}")
logger.info(f"接收响应 <<<<<< 响应头 = {resp.headers}")
logger.info(f"接收响应 <<<<<< 响应正文 = {resp.json()}")
return resp
if __name__ == '__main__':
...

28
commons/settings.py Normal file
View File

@@ -0,0 +1,28 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: settings
@date: 2025/2/23 21:34
@desc:
"""
base_url = 'http://127.0.0.1:8000'
case_path = r"E:\PyP\InterfaceAutoTest\TestCases"
exchanger = r"E:\PyP\InterfaceAutoTest\extract.yaml"
id_path =r"E:\PyP\InterfaceAutoTest\id.yaml"
db_host = '119.91.19.171' # ip
db_port = 3306 # 端口
db_user = 'root' # 用户名
db_password = 'mysql_hNahSe' # 密码
db_database = 'answer' # 库名
allure_epic: str = "项目名称answer"
allure_feature: str = "默认特征feature"
allure_story: str = "默认事件story"
rsa_public = ""
rsa_private = ""

82
commons/templates.py Normal file
View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: chen wei
@Software: PyCharm
@contact: t6i888@163.com
@file: templates.py
@date: 2024 2024/9/22 22:20
@desc:
"""
import copy
import logging
import re
import string
logger = logging.getLogger(__name__)
def _str(s) -> str:
# 将数据转换为str类型。
return f"'{s}'"
class Template(string.Template):
"""
1支持函数调用
2参数也可以是变量
"""
func_mapping = {
"str": _str,
"int": int,
"float": float,
"bool": bool
} # 内置函数有的直接放入mapping内置函数没有的在funcs中定义自动放入mapping
call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
def render(self, mapping: dict) -> str:
s = self.safe_substitute(mapping) # 原有方法替换变量
s = self.safe_substitute_funcs(s, mapping)
return s
def safe_substitute_funcs(self, template, mapping) -> str:
"""
解析字符串中的函数名和参数,并将函数调用结果进行替换
:param template: 字符串
:param mapping: 上下文,提供要使用的函数和变量
:return: 替换后的结果
"""
mapping = copy.deepcopy(mapping)
mapping.update(self.func_mapping) # 合并两个mapping
def convert(mo):
func_name = mo.group("func_name")
func_args = mo.group("func_args").split(",")
func = mapping.get(func_name) # 读取指定函数
func_args_value = [mapping.get(arg, arg) for arg in func_args]
if func_args_value == [""]: # 处理没有参数的func
func_args_value = []
if not callable(func):
return mo.group() # 如果是不可调用的假函数,不进行替换
else:
return str(func(*func_args_value)) # 否则用函数结果进行替换
return self.call_pattern.sub(convert, template)
def hot_load():
from commons import funcs
for func_name in dir(funcs): # 遍历模块中的所有函数
if func_name.startswith("_"):
continue
func_code = getattr(funcs, func_name) # 取到函数对象
if callable(func_code): # 如果是一个可以调用的函数
Template.func_mapping[func_name] = func_code # 函数放到Template中
hot_load()