refactor: 优化代码

- 优化 部分核心功能实现。
- 新增 详细的文档字符串(Docstrings)和注释。
- 移除 代码中的冗余注释和无效代码。
This commit is contained in:
2026-02-02 17:48:30 +08:00
parent fd6f4fbcbe
commit 4de84039cb
10 changed files with 344 additions and 164 deletions

View File

@@ -20,8 +20,9 @@ import http.client
import socket
import json
from enum import Enum
from typing import List
from core.settings import BASE_DIR, APPIUM_HOST, APPIUM_PORT
from core.settings import BASE_DIR, APPIUM_HOST, APPIUM_PORT, MAX_RETRIES
logger = logging.getLogger(__name__)
@@ -41,20 +42,26 @@ class AppiumStatus(Enum):
class ServiceRole(Enum):
"""服务角色枚举:明确权责"""
MANAGED = "由脚本启动 (受控模式)" # 需要负责清理
EXTERNAL = "复用外部服务 (共享模式)" # 不负责清理
NULL = "无效服务 (空模式)" # 占位或失败状态
"""服务角色枚举:定义服务的所有权和生命周期"""
MANAGED = "由脚本启动 (托管模式)" # 由本脚本启动,负责清理
EXTERNAL = "复用外部服务 (共享模式)" # 复用现有服务,不负责清理
NULL = "无效服务 (空模式)" # 无效或未初始化的服务
def get_appium_command():
"""精确定位 Appium 执行文件,避免 npm 包装层"""
def resolve_appium_command() -> List[str]:
"""
解析 Appium 可执行文件的绝对路径。
优先查找项目 node_modules 下的本地安装版本,避免 npm 包装层带来的信号传递问题。
:return: 用于 subprocess 的命令列表
:raises SystemExit: 如果找不到 Appium 执行文件
"""
bin_name = "appium.cmd" if sys.platform == "win32" else "appium"
appium_bin = BASE_DIR / "node_modules" / ".bin" / bin_name
if not appium_bin.exists():
# 报错提示:找不到本地 Appium引导用户安装
logger.info(f"\n错误: 在路径 {appium_bin} 未找到 Appium 执行文件。")
logger.error(f"\n错误: 在路径 {appium_bin} 未找到 Appium 执行文件。")
logger.info("请确保已在项目目录下执行过: npm install appium")
sys.exit(1)
# 返回执行列表(用于 shell=False
@@ -62,37 +69,45 @@ def get_appium_command():
# 移除全局调用,防止 import 时因找不到 appium 而直接退出
# APP_CMD_LIST = get_appium_command()
# APP_CMD_LIST = resolve_appium_command()
def _cleanup_process_tree(process: subprocess.Popen = None):
"""核心清理逻辑:针对方案一的跨平台递归关闭"""
def _cleanup_process_tree(process: subprocess.Popen = None) -> None:
"""
核心清理逻辑:跨平台递归终止进程树。
:param process: subprocess.Popen 对象
"""
if not process or process.poll() is not None:
return
logger.info(f"正在关闭 Appium 进程树 (PID: {process.pid})...")
try:
if sys.platform == "win32":
# Windows 下使用 taskkill 强制关闭进程树 /T
subprocess.run(['taskkill', '/F', '/T', '/PID', str(process.pid)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.run(
['taskkill', '/F', '/T', '/PID', str(process.pid)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
else:
# Unix/Mac: 获取进程组 ID 并发送终止信号
# Unix/Linux: 获取进程组 ID 并发送终止信号
pgid = os.getpgid(process.pid)
os.killpg(pgid, signal.SIGTERM)
process.wait(timeout=5)
logger.info("所有相关进程已安全退出。")
logger.info("Appium 服务已停止,相关进程已安全退出。")
except Exception as e:
logger.error(f"关闭进程时遇到意外: {e}")
logger.error(f"停止服务时发生异常: {e}")
try:
process.kill()
logger.warning(f"已强制杀死进程: {e}")
except Exception as e:
logger.warning(f"强制关闭: {e}")
logger.debug(f"强制杀死进程失败 (可能已退出): {e}")
# 这里通常保持安静,因为我们已经尝试过清理了
logger.info("服务已完全清理。")
class AppiumService:
"""Appium 服务上下文容器"""
"""Appium 服务实例封装,用于管理服务生命周期"""
def __init__(self, role: ServiceRole, process: subprocess.Popen = None):
self.role = role
@@ -102,25 +117,47 @@ class AppiumService:
"""统一停止接口:根据角色决定是否关闭进程"""
match self.role:
case ServiceRole.EXTERNAL:
logger.info(f"--> [角色: {self.role.value}] 脚本退出,保留服务运行。")
logger.info(f"--> [角色: {self.role.value}] 脚本退出,保留外部服务运行。")
return
case ServiceRole.MANAGED:
_cleanup_process_tree(self.process)
case ServiceRole.NULL:
logger.info(f"--> [角色: {self.role.value}] 无需清理。")
logger.info(f"--> [角色: {self.role.value}] 无需执行清理。")
def __repr__(self):
return f"<AppiumService 角色='{self.role.value}' 端口={APPIUM_PORT}>"
def _check_port_availability() -> AppiumStatus:
"""辅助函数:检查端口是否被占用"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
sock.settimeout(1)
# 尝试绑定端口,如果成功说明端口空闲
sock.bind((APPIUM_HOST, APPIUM_PORT))
return AppiumStatus.OFFLINE # 真正未启动
except OSError:
# 绑定失败,说明端口被占用,但此前 HTTP 请求失败,说明不是 Appium
return AppiumStatus.CONFLICT # 端口被占用但没响应 HTTP
def get_appium_status() -> AppiumStatus:
"""深度探测 Appium 状态"""
conn = None
"""
检测 Appium 服务当前状态。
逻辑:
1. 尝试 HTTP 连接 /status 接口。
2. 如果连接成功,检查 ready 字段。
3. 如果连接被拒绝,尝试绑定端口以确认端口是否真正空闲。
:return: AppiumStatus 枚举
"""
connection = None
try:
# 1. 端口开启,尝试获取 Appium 状态接口
conn = http.client.HTTPConnection(APPIUM_HOST, APPIUM_PORT, timeout=2)
conn.request("GET", "/status")
response = conn.getresponse()
connection = http.client.HTTPConnection(APPIUM_HOST, APPIUM_PORT, timeout=2)
connection.request("GET", "/status")
response = connection.getresponse()
if response.status != 200:
return AppiumStatus.CONFLICT
@@ -134,95 +171,63 @@ def get_appium_status() -> AppiumStatus:
except (socket.error, ConnectionRefusedError):
# 3. 如果通信拒绝,检查端口是否真的空闲
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.settimeout(1)
s.bind((APPIUM_HOST, APPIUM_PORT))
return AppiumStatus.OFFLINE # 真正未启动
except OSError:
return AppiumStatus.CONFLICT # 端口被占用但没响应 HTTP
return _check_port_availability()
except (http.client.HTTPException, json.JSONDecodeError):
return AppiumStatus.UNKNOWN
except Exception as e:
logger.error(e)
logger.error(f"状态检测异常: {e}")
return AppiumStatus.ERROR
finally:
if conn: conn.close()
if connection: connection.close()
def start_appium_service() -> AppiumService:
"""管理 Appium 服务的生命周期"""
process = None # 1. 预先初始化变量,防止作用域错误
managed = False
# 轮询等待真正就绪
# 延迟获取命令,确保只在真正需要启动服务时检查环境
app_cmd_list = get_appium_command()
max_retries = 40
"""
管理 Appium 服务的生命周期
如果服务未启动,则启动本地服务;如果已启动,则复用。
for i in range(max_retries):
:return: AppiumService 对象
"""
process = None # 1. 预先初始化变量,防止作用域错误
is_managed = False
# 轮询等待真正就绪
# 延迟获取命令,确保只在真正需要启动服务时检查环境
cmd_args = resolve_appium_command()
for i in range(MAX_RETRIES):
status = get_appium_status()
match status: # Python 3.10+ 的模式匹配
case AppiumStatus.READY:
if managed:
if is_managed:
# 安全打印 PID
pid_str = f"PID: {process.pid}" if process else "EXTERNAL"
logger.info(f"Appium 已经完全就绪! ({pid_str})")
pid_info = f"PID: {process.pid}" if process else "EXTERNAL"
logger.info(f"Appium 服务启动成功! ({pid_info})")
return AppiumService(ServiceRole.MANAGED, process)
else:
logger.info(f"--> [复用] 有效的 Appium 服务已在运行 (Port: {APPIUM_PORT})")
logger.info("--> [注意] 脚本退出时将保留该服务,不会将其关闭。")
return AppiumService(ServiceRole.EXTERNAL, process)
case AppiumStatus.CONFLICT:
logger.warning(f"\n[!] 错误: 端口 {APPIUM_PORT} 被非 Appium 程序占用。")
logger.info("=" * 60)
logger.info("请手动执行以下命令释放端口后重试:")
if sys.platform == "win32":
logger.info(
f" CMD: for /f \"tokens=5\" %a in ('netstat -aon ^| findstr :{APPIUM_PORT}') do taskkill /F /PID %a")
else:
logger.info(f" Terminal: lsof -ti:{APPIUM_PORT} | xargs kill -9")
logger.info("=" * 60)
sys.exit(1)
_handle_port_conflict()
case AppiumStatus.OFFLINE:
if not is_managed:
process = _spawn_appium_process(cmd_args)
is_managed = True
if not managed:
logger.info("Appium 服务未启动...")
logger.info(f"正在准备启动本地 Appium 服务 (Port: {APPIUM_PORT})...")
# 注入环境变量,确保 Appium 寻找项目本地的驱动
env_vars = os.environ.copy()
env_vars["APPIUM_HOME"] = str(BASE_DIR)
try:
process = subprocess.Popen(
app_cmd_list,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env=env_vars,
cwd=BASE_DIR,
# Windows 和 Linux/Mac 的处理方式不同
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if sys.platform == "win32" else 0,
preexec_fn=os.setsid if sys.platform != "win32" else None
)
managed = True
except Exception as e:
logger.error(f"启动过程发生异常: {e}")
sys.exit(1)
else:
if process and process.poll() is not None:
logger.warning("Appium 进程启动后异常退出。")
sys.exit(1)
case AppiumStatus.INITIALIZING:
if managed and process and process.poll() is not None:
logger.warning("Appium 驱动加载期间进程崩溃。")
if is_managed and process and process.poll() is not None:
logger.warning("Appium 在初始化期间崩溃。")
_cleanup_process_tree(process)
sys.exit(1)
if i % 4 == 0: # 每 2 秒提醒一次,避免刷屏
logger.info("Appium 正在加载驱动/插件,请稍候...")
case AppiumStatus.ERROR:
logger.error("探测接口发生内部错误(可能是解析失败或严重网络异常),脚本终止。")
if managed and process:
if is_managed and process:
_cleanup_process_tree(process)
sys.exit(1)
case _:
@@ -236,13 +241,60 @@ def start_appium_service() -> AppiumService:
sys.exit(1)
def stop_appium_service(server: AppiumService):
# """安全关闭服务"""
server.stop()
def _handle_port_conflict():
logger.warning(f"\n[!] 错误: 端口 {APPIUM_PORT} 被占用。")
logger.info("=" * 60)
logger.info("请手动执行以下命令释放端口后重试:")
if sys.platform == "win32":
logger.info(
f" CMD: for /f \"tokens=5\" %a in ('netstat -aon ^| findstr :{APPIUM_PORT}') do taskkill /F /PID %a")
else:
logger.info(f" Terminal: lsof -ti:{APPIUM_PORT} | xargs kill -9")
logger.info("=" * 60)
sys.exit(1)
def _spawn_appium_process(cmd_args: List[str]) -> subprocess.Popen:
"""启动 Appium 子进程"""
logger.info(f"正在启动本地 Appium 服务 (Port: {APPIUM_PORT})...")
# 注入环境变量,确保 Appium 寻找项目本地的驱动
env_vars = os.environ.copy()
env_vars["APPIUM_HOME"] = str(BASE_DIR)
try:
return subprocess.Popen(
cmd_args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env=env_vars,
cwd=BASE_DIR,
# Windows 和 Linux/Mac 的处理方式不同
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if sys.platform == "win32" else 0,
preexec_fn=os.setsid if sys.platform != "win32" else None
)
except Exception as e:
logger.error(f"启动过程发生异常: {e}")
sys.exit(1)
def stop_appium_service(service: AppiumService):
"""
停止 Appium 服务。
:param service: AppiumService 对象
"""
service.stop()
# --- 装饰器实现 ---
def with_appium(func):
"""
装饰器:在函数执行前后自动启动和停止 Appium 服务。
适用于简单的脚本或调试场景。
:param func: 需要包装的函数
:return: 包装后的函数
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
service = start_appium_service()
@@ -257,18 +309,18 @@ def with_appium(func):
if __name__ == "__main__":
# 使用示例:作为一个上下文管理器或简单的生命周期示例
appium_proc = None
appium_service = None
try:
appium_proc = start_appium_service()
appium_service = start_appium_service()
print(f"\n[项目路径] {BASE_DIR}")
print("\n[提示] 现在可以手动或通过其他脚本运行测试用例。")
print("[提示] 按 Ctrl+C 停止由本脚本启动的服务。")
print(f"[服务状态] Appium 运行中 (Port: {APPIUM_PORT})")
print("[操作提示] 按 Ctrl+C 停止服务...")
# 保持运行,直到手动停止(在实际测试框架中,这里会被替换为测试执行逻辑)
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
print("\n收到停止信号...")
finally:
stop_appium_service(appium_proc)
stop_appium_service(appium_service)