refactor: 优化日志系统及自动化备份机制

- 替换 loguru 为原生 logging 库(与pytest兼容性更好)。
- 更新 pytest.ini 统一配置日志格式和基础命令。
- 优化 main.py 增加测试后的日志自动备份与定期清理功能。
- 新增 settings.py 实现配置解耦
- 更新 .gitignore
This commit is contained in:
2026-01-19 17:55:50 +08:00
parent 9355a576a9
commit a53a26766d
9 changed files with 263 additions and 146 deletions

28
.gitignore vendored
View File

@@ -5,10 +5,26 @@ build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
node_modules
.idea
uv.lock
# --- 依赖与环境 ---
.venv
venv/
node_modules/
uv.lock
# --- 屏蔽outputs ---
outputs/logs/*.log
outputs/logs/backups/*
outputs/screenshots/
# --- Allure 报告 ---
temp/
report/
.allure/
# --- pytest缓存 ---
.pytest_cache/
.allure_cache/
# --- 配置文件 ---
.env

11
core/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com,chenwei@zygj.com
@file: __init__.py
@date: 2026/1/16 10:49
@desc:
"""

125
core/driver.py Normal file
View File

@@ -0,0 +1,125 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com,chenwei@zygj.com
@file: driver
@date: 2026/1/16 10:49
@desc:
"""
import logging
from enum import Enum
from typing import Optional, Type
from appium import webdriver
from appium.options.android import UiAutomator2Options
from appium.options.ios import XCUITestOptions
from appium.options.common.base import AppiumOptions
from appium.webdriver.webdriver import ExtensionBase
from appium.webdriver.client_config import AppiumClientConfig
logger = logging.getLogger(__name__)
class AppPlatform(Enum):
ANDROID = "android"
IOS = "ios"
class AppDriver:
def __init__(self):
self.driver: Optional[webdriver.Remote] = None
self._host = "127.0.0.1"
self._port = 4723
def server_config(self):
"""配置服务端信息"""
# self._host = host
# self._port = port
logger.info(f"Appium Server 指向 -> {self._host}:{self._port}")
def connect(self, platform: str | AppPlatform, caps: dict,
extensions: list[Type[ExtensionBase]] | None = None,
client_config: AppiumClientConfig | None = None) -> 'AppDriver':
"""
参照 KeyWordDriver 逻辑,但强化了配置校验和异常处理
"""
# 1. 统一格式化平台名称
platform_name = platform.value if isinstance(platform, AppPlatform) else platform.lower().strip()
url = f"http://{self._host}:{self._port}"
# 2. 预校验:如果已经有 driver 正在运行,先清理(防止 Session 冲突)
if self.driver:
logger.warning("发现旧的 Driver 实例尚未关闭,正在强制重置...")
self.quit()
try:
# 3. 匹配平台并加载 Options
match platform_name:
case AppPlatform.ANDROID.value:
logger.info(f"正在初始化 Android 会话...")
options: AppiumOptions = UiAutomator2Options().load_capabilities(caps)
case AppPlatform.IOS.value:
logger.info(f"正在初始化 iOS 会话...")
options: AppiumOptions = XCUITestOptions().load_capabilities(caps)
case _:
# 优化:不再默认返回 Android而是显式报错 (Fail Fast)
msg = f"不支持的平台类型: [{platform_name}]。当前仅支持: [android, ios]"
logger.error(msg)
raise ValueError(msg)
# 4. 创建连接
self.driver = webdriver.Remote(
command_executor=url,
options=options,
extensions=extensions,
client_config=client_config
)
logger.info(f"已成功连接到 {platform_name.upper()} 设备 (SessionID: {self.driver.session_id})")
return self
except Exception as e:
logger.error(f"驱动连接失败!底层错误信息: {e}")
# 确保失败后清理现场
self.driver = None
raise ConnectionError(f"无法连接到 Appium 服务,请检查端口 {self._port} 或设备状态。") from e
# --- 开始封装操作方法 ---
def click(self, locator: tuple):
"""封装点击locator 格式如 (AppiumBy.ID, "xxx")"""
logger.info(f"点击元素: {locator}")
self.driver.find_element(*locator).click()
def input(self, locator: tuple, text: str):
"""封装输入"""
logger.info(f"对元素 {locator} 输入内容: {text}")
el = self.driver.find_element(*locator)
el.clear()
el.send_keys(text)
def quit(self):
"""安全退出"""
if self.driver:
try:
# 获取 session_id 用于日志追踪
sid = self.driver.session_id
self.driver.quit()
logger.info(f"已安全断开连接 (Session: {sid})")
except Exception as e:
logger.warning(f"断开连接时发生异常 (可能服务已预先关闭): {e}")
finally:
self.driver = None
else:
logger.debug("没有正在运行的 Driver 实例需要关闭。")
@property
def is_alive(self) -> bool:
"""判断当前驱动是否可用"""
return self.driver is not None and self.driver.session_id is not None

View File

@@ -10,6 +10,7 @@
@desc:
"""
import functools
import logging
import signal
import subprocess
import time
@@ -18,16 +19,11 @@ import sys
import http.client
import socket
import json
from collections import namedtuple
from pathlib import Path
from enum import Enum
# --- 核心配置 ---
# 使用 pathlib 获取当前脚本所在的绝对路径
BASE_DIR = Path(__file__).resolve().parent
from core.settings import BASE_DIR, APPIUM_HOST, APPIUM_PORT
APPIUM_HOST = "127.0.0.1"
APPIUM_PORT = 4723
logger = logging.getLogger(__name__)
# 使用 npm run 确保 APPIUM_HOME=. 变量和本地版本生效
@@ -58,8 +54,8 @@ def get_appium_command():
if not appium_bin.exists():
# 报错提示:找不到本地 Appium引导用户安装
print(f"\n错误: 在路径 {appium_bin} 未找到 Appium 执行文件。")
print("💡 请确保已在项目目录下执行过: npm install appium")
logger.info(f"\n错误: 在路径 {appium_bin} 未找到 Appium 执行文件。")
logger.info("请确保已在项目目录下执行过: npm install appium")
sys.exit(1)
# 返回执行列表(用于 shell=False
return [str(appium_bin), "-p", str(APPIUM_PORT)]
@@ -73,7 +69,7 @@ def _cleanup_process_tree(process: subprocess.Popen = None):
"""核心清理逻辑:针对方案一的跨平台递归关闭"""
if not process or process.poll() is not None:
return
print(f"\n🛑 正在关闭 Appium 进程树 (PID: {process.pid})...")
logger.info(f"正在关闭 Appium 进程树 (PID: {process.pid})...")
try:
if sys.platform == "win32":
# Windows 下使用 taskkill 强制关闭进程树 /T
@@ -85,15 +81,15 @@ def _cleanup_process_tree(process: subprocess.Popen = None):
os.killpg(pgid, signal.SIGTERM)
process.wait(timeout=5)
print("所有相关进程已安全退出。")
logger.info("所有相关进程已安全退出。")
except Exception as e:
print(f"⚠️ 关闭进程时遇到意外: {e}")
logger.error(f"关闭进程时遇到意外: {e}")
try:
process.kill()
except Exception as e:
print(f"强制: {e}")
logger.warning(f"强制关闭: {e}")
# 这里通常保持安静,因为我们已经尝试过清理了
print("服务已完全清理。")
logger.info("服务已完全清理。")
class AppiumService:
@@ -107,12 +103,12 @@ class AppiumService:
"""统一停止接口:根据角色决定是否关闭进程"""
match self.role:
case ServiceRole.EXTERNAL:
print(f"--> [角色: {self.role.value}] 脚本退出,保留原服务运行。")
logger.info(f"--> [角色: {self.role.value}] 脚本退出,保留原服务运行。")
return
case ServiceRole.MANAGED:
_cleanup_process_tree(self.process)
case ServiceRole.NULL:
print(f"--> [角色: {self.role.value}] 无需清理。")
logger.info(f"--> [角色: {self.role.value}] 无需清理。")
def __repr__(self):
return f"<AppiumService 角色='{self.role.value}' 端口={APPIUM_PORT}>"
@@ -149,7 +145,7 @@ def get_appium_status() -> AppiumStatus:
except (http.client.HTTPException, json.JSONDecodeError):
return AppiumStatus.UNKNOWN
except Exception as e:
print(e)
logger.error(e)
return AppiumStatus.ERROR
finally:
if conn: conn.close()
@@ -169,28 +165,28 @@ def start_appium_service() -> AppiumService:
if managed:
# 安全打印 PID
pid_str = f"PID: {process.pid}" if process else "EXTERNAL"
print(f"Appium 已经完全就绪! ({pid_str})")
logger.info(f"Appium 已经完全就绪! ({pid_str})")
return AppiumService(ServiceRole.MANAGED, process)
else:
print(f"--> [复用] 有效的 Appium 服务已在运行 (Port: {APPIUM_PORT})")
print(" [注意] 脚本退出时将保留该服务,不会将其关闭。")
logger.info(f"--> [复用] 有效的 Appium 服务已在运行 (Port: {APPIUM_PORT})")
logger.info("--> [注意] 脚本退出时将保留该服务,不会将其关闭。")
return AppiumService(ServiceRole.EXTERNAL, process)
case AppiumStatus.CONFLICT:
print(f"\n[!] 错误: 端口 {APPIUM_PORT} 被非 Appium 程序占用。")
print("=" * 60)
print("请手动执行以下命令释放端口后重试:")
logger.warning(f"\n[!] 错误: 端口 {APPIUM_PORT} 被非 Appium 程序占用。")
logger.info("=" * 60)
logger.info("请手动执行以下命令释放端口后重试:")
if sys.platform == "win32":
print(
logger.info(
f" CMD: for /f \"tokens=5\" %a in ('netstat -aon ^| findstr :{APPIUM_PORT}') do taskkill /F /PID %a")
else:
print(f" Terminal: lsof -ti:{APPIUM_PORT} | xargs kill -9")
print("=" * 60)
logger.info(f" Terminal: lsof -ti:{APPIUM_PORT} | xargs kill -9")
logger.info("=" * 60)
sys.exit(1)
case AppiumStatus.OFFLINE:
if not managed:
print("🔌 Appium 未启动")
print(f"🚀 正在准备启动本地 Appium 服务 (Port: {APPIUM_PORT})...")
logger.info("Appium 服务未启动...")
logger.info(f"正在准备启动本地 Appium 服务 (Port: {APPIUM_PORT})...")
# 注入环境变量,确保 Appium 寻找项目本地的驱动
env_vars = os.environ.copy()
@@ -209,31 +205,31 @@ def start_appium_service() -> AppiumService:
)
managed = True
except Exception as e:
print(f"启动过程发生异常: {e}")
logger.error(f"启动过程发生异常: {e}")
sys.exit(1)
else:
if process and process.poll() is not None:
print("Appium 进程启动后异常退出。")
logger.warning("Appium 进程启动后异常退出。")
sys.exit(1)
case AppiumStatus.INITIALIZING:
if managed and process and process.poll() is not None:
print("Appium 驱动加载期间进程崩溃。")
logger.warning("Appium 驱动加载期间进程崩溃。")
_cleanup_process_tree(process)
sys.exit(1)
if i % 4 == 0: # 每 2 秒提醒一次,避免刷屏
print("Appium 正在加载驱动/插件,请稍候...")
logger.info("Appium 正在加载驱动/插件,请稍候...")
case AppiumStatus.ERROR:
print("探测接口发生内部错误(可能是解析失败或严重网络异常),脚本终止。")
logger.error("探测接口发生内部错误(可能是解析失败或严重网络异常),脚本终止。")
if managed and process:
_cleanup_process_tree(process)
sys.exit(1)
case _:
print("Appium 启动异常")
logger.error("Appium 启动异常")
sys.exit(1)
time.sleep(0.5)
print("启动超时Appium 在规定时间内未完成初始化。")
logger.warning("启动超时Appium 在规定时间内未完成初始化。")
_cleanup_process_tree(process)
sys.exit(1)
@@ -242,15 +238,18 @@ def stop_appium_service(server: AppiumService):
# """安全关闭服务"""
server.stop()
# --- 装饰器实现 ---
def with_appium(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
service = start_appium_service()
try:
return func(service, *args, **kwargs)
# return func(service, *args, **kwargs)
return func(*args, **kwargs)
finally:
service.stop()
return wrapper

58
main.py
View File

@@ -1,42 +1,38 @@
import time
import os
import shutil
import datetime
from pathlib import Path
from appium import webdriver
from appium.options.android import UiAutomator2Options
import pytest
from run_appium import start_appium_service, stop_appium_service, with_appium
from core.settings import LOG_SOURCE, LOG_BACKUP_DIR, ALLURE_TEMP, REPORT_DIR
# 日志自动清理
def _clean_old_logs(backup_dir, keep_count=10):
files = sorted(Path(backup_dir).glob("pytest_*.log"), key=os.path.getmtime)
while len(files) > keep_count:
os.remove(files.pop(0))
@with_appium
def main(service):
print(f"正在测试,服务模式: {service.role}")
# 简单操作示例
driver = None
def main():
try:
# 在自动化套件启动前执行
# proc = start_appium_service()
# 2. 执行 Pytest
# 建议保留你之前配置的 -s -v 等参数
exit_code = pytest.main(["test_cases", "-x", "-v", f"--alluredir={ALLURE_TEMP}"])
# 配置Android设备参数
capabilities = dict(
platformName='Android',
automationName='uiautomator2',
deviceName='Android',
appPackage='com.android.settings',
appActivity='.Settings'
)
# 3. 生成报告
if ALLURE_TEMP.exists():
os.system(f'allure generate {ALLURE_TEMP} -o {REPORT_DIR} --clean')
# 转换capabilities为Appium Options
options = UiAutomator2Options().load_capabilities(capabilities)
# 连接Appium服务器
# driver = webdriver.Remote('http://localhost:4723', options=options)
driver = webdriver.Remote('http://127.0.0.1:4723', options=options)
time.sleep(1)
print("当前Activity:", driver.current_activity)
finally:
driver.quit()
# 在自动化套件结束后执行
# stop_appium_service(proc)
print("Hello from AppAutoTest!")
# 4. 备份日志 (无论测试是否崩溃都执行)
if LOG_SOURCE.exists():
now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = LOG_BACKUP_DIR / f"pytest_{now}.log"
shutil.copy2(LOG_SOURCE, backup_path)
print(f"日志已备份至: {backup_path}")
_clean_old_logs(LOG_BACKUP_DIR)
else:
print("未找到原始日志文件,跳过备份。")
if __name__ == "__main__":

View File

@@ -1,12 +1,14 @@
[project]
name = "appautotest"
version = "0.1.0"
description = "Add your description here"
description = "App自动化测试框架"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"allure-pytest==2.13.5",
"appium-python-client>=5.2.4",
"loguru>=0.7.3",
"pytest>=8.3.5",
]
[[tool.uv.index]]

11
test_cases/conftest.py Normal file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com,chenwei@zygj.com
@file: conftest
@date: 2026/1/19 14:08
@desc:
"""

11
utils/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei,ChenWei
@Software: PyCharm
@contact: t6g888@163.com,chenwei@zygj.com
@file: __init__.py
@date: 2026/1/16 09:06
@desc:
"""

View File

@@ -9,61 +9,21 @@
@date: 2026/1/15 11:30
@desc:
"""
import sys
import time
import functools
from pathlib import Path
import inspect
from loguru import logger
import logging
from core.settings import LOG_DIR
# 1. 确定日志存储路径
LOG_DIR = Path(__file__).parent.parent / "logs"
LOG_DIR.mkdir(exist_ok=True)
LOG_DIR.mkdir(parents=True, exist_ok=True)
# 2. 统一定义日志格式 (美化版)
# <green> 等标签是控制台颜色,文件日志中会自动剥离颜色代码
LOG_FORMAT = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<magenta>{extra[source]: <8}</magenta> | "
"<cyan>{module}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level>"
)
logger = logging.getLogger(__name__)
def setup_logger():
"""
只需在项目入口调用一次。
如果是简单的自动化脚本,甚至可以直接在模块内执行。
"""
# 移除 Loguru 默认的控制台处理器(避免重复打印)
logger.remove()
# 添加自定义控制台输出
logger.add(
sys.stdout,
format=LOG_FORMAT,
level="INFO",
colorize=True,
# 默认给一个 'Global' 的 device 标签
filter=lambda record: record["extra"].setdefault("source", "System")
)
# 添加按天滚动的日志文件
logger.add(
str(LOG_DIR / "appium_{time:YYYY-MM-DD}.log"),
format=LOG_FORMAT,
level="DEBUG",
rotation="00:00", # 每天午夜滚动
retention="10 days", # 保留最近10天
compression="zip", # 旧日志自动压缩
encoding="utf-8",
enqueue=True # 开启队列模式,确保多线程下日志不串行
)
# --- 核心特性 1装饰器集成 ---
def trace_step(step_desc="", source: str = 'task'):
# --- 核心特性:装饰器集成 ---
def trace_step(step_desc="", source='wrapper'):
"""
通用执行追踪装饰器:
1. 智能识别并过滤 self/cls 参数
@@ -74,17 +34,14 @@ def trace_step(step_desc="", source: str = 'task'):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# --- 智能参数解析 ---
# 获取函数的签名
# 获取对应的 logger
_logger = logging.getLogger(source)
# _logger = logging.getLogger(f"{source}.{func.__name__}")
# 参数解析 (跳过 self/cls)
sig = inspect.signature(func)
params = list(sig.parameters.values())
# 检查第一个参数名是否为 'self' 或 'cls'
# 这样既兼容了 PageObject 的实例方法,也兼容了纯函数
if params and params[0].name in ('self', 'cls'):
display_args = args[1:]
else:
display_args = args
display_args = args[1:] if params and params[0].name in ('self', 'cls') else args
# 格式化参数显示,方便阅读
args_repr = [repr(a) for a in display_args]
@@ -93,32 +50,21 @@ def trace_step(step_desc="", source: str = 'task'):
func_name = f"{func.__module__}.{func.__name__}"
# 使用 bind 临时改变这一步的 source 标签
_logger = logger.bind(source=source)
# 使用关联的上下文 logger
# logger.info(f"🚀 [步骤开始] {step_desc} | 执行方法: {func_name} | 参数: {display_args} {kwargs}")
_logger.info(f"🚀 [步骤开始] {step_desc} | 方法: {func_name}({all_params})")
_logger.info(f"[步骤开始] | {step_desc} | 方法: {func_name}({all_params})")
start_t = time.perf_counter()
try:
result = func(*args, **kwargs)
duration = time.perf_counter() - start_t
_logger.success(f"[步骤成功] {step_desc} | 耗时: {duration:.2f}s | 返回: {result!r}")
_logger.info(f"[步骤成功] {step_desc} | 耗时: {duration:.2f}s | 返回: {result!r}")
return result
except Exception as e:
duration = time.perf_counter() - start_t
_logger.error(
f"[步骤失败] {step_desc} | 耗时: {duration:.2f}s | 异常: {type(e).__name__}: {e}")
# logging.exception 关键点:它会自动把详细的堆栈信息写入日志文件
_logger.error(f"[步骤失败] {step_desc}| 耗时: {duration:.2f}s|异常: {type(e).__name__}")
# f"[步骤失败] {step_desc} | 耗时: {duration:.2f}s | 异常: {type(e).__name__}: {e}")
raise e
return wrapper
return decorator
# 初始化
setup_logger()
# 导出供外部使用
__all__ = ["logger", "trace_step"]