From 69b449f5b6efd5609742f785dd00afd2705ac647 Mon Sep 17 00:00:00 2001 From: CNWei Date: Fri, 23 Jan 2026 11:16:57 +0800 Subject: [PATCH] =?UTF-8?q?feat(driver,custom=5Fexpected=5Fconditions):=20?= =?UTF-8?q?=E5=A2=9E=E5=BC=BA=E6=98=BE=E5=BC=8F=E7=AD=89=E5=BE=85,=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=87=AA=E5=AE=9A=E4=B9=89=E6=9D=A1=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 引入 custom_expected_conditions 模块,允许通过字符串别名调用。 - 重构 CoreDriver,所有元素查找和操作统一调用 explicit_wait,确保日志和等待逻辑的一致性。 代码。 --- conftest.py | 13 +- core/custom_expected_conditions.py | 184 +++++++++++++++++++++++++++++ core/driver.py | 70 +++++++++-- core/run_appium.py | 10 +- core/settings.py | 9 ++ utils/decorators.py | 29 ++++- 6 files changed, 284 insertions(+), 31 deletions(-) create mode 100644 core/custom_expected_conditions.py diff --git a/conftest.py b/conftest.py index 3de9641..3ede409 100644 --- a/conftest.py +++ b/conftest.py @@ -14,6 +14,7 @@ import logging import pytest from core.run_appium import start_appium_service, stop_appium_service from core.driver import CoreDriver +from core.settings import ANDROID_CAPS @pytest.fixture(scope="session") @@ -38,17 +39,9 @@ def driver(app_server): # 实例化你提供的类结构 app_helper = CoreDriver() - # 配置Android设备参数 - capabilities = dict( - platformName='Android', - automationName='uiautomator2', - deviceName='Android', - appPackage='com.android.settings', - appActivity='.Settings' - ) - # 连接并获取原生 driver 实例 - app_helper.connect(platform="android", caps=capabilities) + # 这里可以根据需要扩展,比如通过命令行参数选择平台 + app_helper.connect(platform="android", caps=ANDROID_CAPS) yield app_helper diff --git a/core/custom_expected_conditions.py b/core/custom_expected_conditions.py new file mode 100644 index 0000000..e30102e --- /dev/null +++ b/core/custom_expected_conditions.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei,ChenWei +@Software: PyCharm +@contact: t6g888@163.com +@file: custom_expected_conditions +@date: 2026/1/22 16:13 +@desc: 自定义预期条件 (Expected Conditions) +用于 WebDriverWait 的显式等待判断逻辑。 +""" + +import logging +from typing import Tuple, Any, Union +from appium.webdriver.webdriver import WebDriver +from selenium.webdriver.support import expected_conditions as EC +from selenium.common.exceptions import StaleElementReferenceException, NoSuchElementException +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.webelement import WebElement + + +logger = logging.getLogger(__name__) + +""" + 常用等待条件(expected_conditions)--来自EC模块 + presence_of_element_located: 元素存在于DOM。 + visibility_of_element_located: 元素可见。 + element_to_be_clickable: 元素可点击。 + title_contains: 页面标题包含特定文本。 + text_to_be_present_in_element: 元素包含特定文本。 +""" + + +# 自定义预期条件(Custom Expected Condition) +class BaseCondition: + """基础条件类:负责统一的 WebDriverWait 协议实现和异常拦截""" + + def __call__(self, driver: WebDriver): + try: + return self.check(driver) + except (NoSuchElementException, StaleElementReferenceException): + return False + + def check(self, driver: WebDriver): + raise NotImplementedError("子类必须实现 check 方法") + + +EC_MAPPING: dict[str, Any] = {} + + +def register(name: str = None): + """ + 强大的注册装饰器: + 1. @register() -> 使用函数名注册 + 2. @register("alias") -> 使用别名注册 + 3. register("name", func) -> 手动注入 + """ + + def decorator(item): + reg_name = name or item.__name__ + EC_MAPPING[reg_name] = item + return item + + return decorator + + +@register("toast_visible") +class ToastVisible(BaseCondition): + def __init__(self, text: str, partial: Union[str, bool] = True): + self.text = text + # 处理从装饰器传来的字符串 "true"/"false" + if isinstance(partial, str): + self.partial = partial.lower() != "false" + else: + self.partial = partial + + def check(self, driver: WebDriver): + # 注意:这里不再需要显式 try-except,BaseCondition 会处理 + xpath = f"//*[contains(@text, '{self.text}')]" if self.partial else f"//*[@text='{self.text}']" + element = driver.find_element(By.XPATH, xpath) + return element if element.is_displayed() else False + + +@register("attr_contains") +class ElementHasAttribute(BaseCondition): + # 扁平化参数以支持字符串调用: "attr_contains:id,btn_id,checked,true" + def __init__(self, by: str, value: str, attribute: str, expect_value: str): + self.locator = (by, value) + self.attribute = attribute + self.value = expect_value + + def check(self, driver: WebDriver): + element = driver.find_element(*self.locator) + attr_value = element.get_attribute(self.attribute) + return element if (attr_value and self.value in attr_value) else False + + +@register() +class ElementCountAtLeast(BaseCondition): + """检查页面上匹配定位符的元素数量是否至少为 N 个""" + + def __init__(self, by: str, value: str, count: Union[str, int]): + self.locator = (by, value) + # 确保字符串参数转为整数 + self.count = int(count) + + def check(self, driver: WebDriver) -> bool | list[WebElement]: + elements = driver.find_elements(*self.locator) + if len(elements) >= self.count: + return elements + return False + + +@register() # 使用函数名 is_element_present 注册 +def is_element_present(by: str, value: str): + locator = (by, value) + + def _predicate(driver): + try: + return driver.find_element(*locator) + except Exception as e: + logger.warning(f"{__name__}异常:{e}") + return False + + return _predicate + + +@register() +def system_ready(api_client): + def _predicate(_): # 忽略传入的 driver + + try: + return api_client.get_status() == "OK" + except Exception as e: + logger.warning(f"{__name__}异常:{e}") + return False + + return _predicate + + +def get_condition(method: Union[str, Any], *args, **kwargs): + """ + 智能获取预期条件: + 1. 如果 method 是字符串,先查自定义 EC_MAPPING + 2. 如果自定义里没有,去官方 selenium.webdriver.support.expected_conditions 找 + 3. 如果 method 本身就是 Callable (比如 EC.presence_of_element_located),直接透传 + """ + + # 情况 A: 如果传入的是官方 EC 对象或自定义函数实例,直接返回 + if callable(method) and not isinstance(method, type): + return method + + # 情况 B: 如果传入的是字符串别名 + if isinstance(method, str): + # 1. 尝试从自定义映射查找 + if method in EC_MAPPING: + target = EC_MAPPING[method] + # 2. 尝试从官方 EC 库查找 + elif hasattr(EC, method): + target = getattr(EC, method) + else: + raise ValueError(f"找不到预期条件: {method}. 请检查拼写或是否已注册。") + + # 实例化并返回 (无论是类还是闭包工厂) + return target(*args, **kwargs) + + # 情况 C: 传入的是类名本身 + if isinstance(method, type): + return method(*args, **kwargs) + + raise TypeError(f"不支持的条件类型: {type(method)}") + + +if __name__ == "__main__": + # print(EC_MAPPING) + cond1 = get_condition("toast_visible", "保存成功") + print(cond1) + # 调用闭包生成的条件 + cond2 = get_condition("is_element_present", (By.ID, "submit")) + print(cond2) + cond3 = get_condition(EC.presence_of_element_located, (By.ID, "submit")) + print(cond3) + # WebDriverWait(driver, 10).until(cond1) diff --git a/core/driver.py b/core/driver.py index 12c8236..61d0e21 100644 --- a/core/driver.py +++ b/core/driver.py @@ -4,7 +4,7 @@ """ @author: CNWei,ChenWei @Software: PyCharm -@contact: t6g888@163.com,chenwei@zygj.com +@contact: t6g888@163.com @file: driver @date: 2026/1/16 10:49 @desc: Appium 核心驱动封装,提供统一的 API 用于 Appium 会话管理和元素操作。 @@ -113,7 +113,7 @@ class CoreDriver: raise ConnectionError(f"无法连接到 Appium 服务,请检查端口 {self._port} 或设备状态。") from e # --- 核心操作 --- - def find_element(self, by, value, timeout: Optional[float] = None) -> WebElement: + def find_element(self, by: str, value: str, timeout: Optional[float] = None) -> WebElement: """ 内部通用查找(显式等待) :param by: 定位策略 @@ -123,8 +123,21 @@ class CoreDriver: """ by = by_converter(by) mark = (by, value) - wait_timeout = timeout if timeout is not None else EXPLICIT_WAIT_TIMEOUT - return WebDriverWait(self.driver, wait_timeout).until(EC.presence_of_element_located(mark)) + method = EC.presence_of_element_located(mark) + return self.explicit_wait(method, timeout) + + def find_elements(self, by: str, value: str, timeout: Optional[float] = None) -> list[WebElement]: + """ + 内部通用查找(显式等待) + :param by: 定位策略 + :param value: 定位值 + :param timeout: 等待超时时间 (秒)。如果为 None, 则使用全局默认超时. + :return: list[WebElement]. + """ + by = by_converter(by) + mark = (by, value) + method = EC.presence_of_all_elements_located(mark) + return self.explicit_wait(method, timeout) def delay(self, timeout: int | float) -> 'CoreDriver': """ @@ -151,6 +164,19 @@ class CoreDriver: Union[T, WebElement]: """ 执行显式等待,直到满足某个条件或超时。 + + 使用示例: + 1. 使用原生 Selenium EC: + driver.explicit_wait(EC.presence_of_element_located((By.ID, "el_id"))) + + 2. 使用自定义字符串别名 (支持参数传递): + # 检查 Toast 消息 + driver.explicit_wait("toast_visible:登录成功") + # 检查元素属性 (格式: key:by,value,attr,expect) + driver.explicit_wait("attr_contains:id,btn_submit,checked,true") + # 检查元素数量 + driver.explicit_wait("count_at_least:xpath,//android.widget.TextView,3") + :param method: EC等待条件(Callable) 或 自定义等待条件的名称(str) :param timeout: 超时时间 (秒)。如果为 None, 则使用全局默认超时. :return: 等待条件的执行结果 (通常是 WebElement 或 bool) @@ -162,6 +188,9 @@ class CoreDriver: func_name = getattr(method, '__name__', repr(method)) logger.info(f"执行显式等待: {func_name}, 超时: {wait_timeout}s") return WebDriverWait(self.driver, wait_timeout).until(method) + except TimeoutException: + logger.error(f"等待超时: {wait_timeout}s 内未满足条件 {method}") + raise except TypeError as te: logger.error(f"显示等待异常: {te}") # self.driver.quit() @@ -175,7 +204,7 @@ class CoreDriver: wait_timeout = timeout if timeout is not None else EXPLICIT_WAIT_TIMEOUT self.driver.set_page_load_timeout(wait_timeout) - def click(self, by, value, timeout: Optional[float] = None) -> 'CoreDriver': + def click(self, by: str, value: str, timeout: Optional[float] = None) -> 'CoreDriver': """ 查找元素并执行点击操作。 内置显式等待,确保元素可点击。 @@ -191,7 +220,7 @@ class CoreDriver: self.explicit_wait(method, timeout).click() return self - def clear(self, by, value, timeout: Optional[float] = None) -> 'CoreDriver': + def clear(self, by: str, value: str, timeout: Optional[float] = None) -> 'CoreDriver': """ 查找元素并清空其内容。 内置显式等待,确保元素可见。 @@ -207,19 +236,22 @@ class CoreDriver: self.explicit_wait(method, timeout).clear() return self - def input(self, by, value, text, timeout: Optional[float] = None) -> 'CoreDriver': + def input(self, by: str, value: str, text: str, sensitive: bool = False, + timeout: Optional[float] = None) -> 'CoreDriver': """ 查找元素并输入文本。 内置显式等待,确保元素可见。 :param by: 定位策略。 :param value: 定位值。 :param text: 要输入的文本。 + :param sensitive: 是否为敏感信息(如密码),如果是,日志中将掩码显示。 :param timeout: 等待超时时间。 :return: self """ by = by_converter(by) mark = (by, value) - logger.info(f"输入文本到 {mark}: '{text}'") + display_text = "******" if sensitive else text + logger.info(f"输入文本到 {mark}: '{display_text}'") method = EC.visibility_of_element_located(mark) self.explicit_wait(method, timeout).send_keys(text) return self @@ -241,7 +273,7 @@ class CoreDriver: except TimeoutException: return False - def get_text(self, by, value, timeout: Optional[float] = None) -> str: + def get_text(self, by: str, value: str, timeout: Optional[float] = None) -> str: """ 获取元素文本 :param by: 定位策略。 @@ -257,6 +289,22 @@ class CoreDriver: logger.info(f"获取到的文本{text}") return text + def get_attribute(self, by: str, value: str, name: str, timeout: Optional[float] = None) -> str: + """ + 获取元素属性 + :param by: 定位策略。 + :param value: 定位值。 + :param timeout: 等待超时时间。 + :param name: 属性名称 (如 'checked', 'enabled', 'resource-id') + """ + by = by_converter(by) + mark = (by, value) + method = EC.presence_of_element_located(mark) + element = self.explicit_wait(method, timeout) + attr_value = element.get_attribute(name) + logger.info(f"获取属性 {name} of {mark}: {attr_value}") + return attr_value + @property def session_id(self): """获取当前 Appium 会话的 Session ID。""" @@ -363,7 +411,7 @@ class CoreDriver: return self - def swipe_by_percent(self, start_xp, start_yp, end_xp, end_yp, duration: int = 1000) -> 'CoreDriver': + def swipe_by_percent(self, start_xp:int, start_yp:int, end_xp:int, end_yp:int, duration: int = 1000) -> 'CoreDriver': """ 按屏幕比例滑动 (0.5 = 50%) """ @@ -384,7 +432,7 @@ class CoreDriver: return self.driver is not None and self.driver.session_id is not None # --- 断言逻辑 --- - def assert_text(self, by, value, expected_text, timeout: Optional[float] = None) -> 'CoreDriver': + def assert_text(self, by:str, value:str, expected_text:str, timeout: Optional[float] = None) -> 'CoreDriver': """ 断言元素的文本内容是否符合预期。 :param by: 定位策略。 diff --git a/core/run_appium.py b/core/run_appium.py index fa04bf8..f6b1017 100644 --- a/core/run_appium.py +++ b/core/run_appium.py @@ -61,9 +61,8 @@ def get_appium_command(): return [str(appium_bin), "-p", str(APPIUM_PORT)] -# 在全局或 start_appium_service 中获取命令 -APP_CMD_LIST = get_appium_command() - +# 移除全局调用,防止 import 时因找不到 appium 而直接退出 +# APP_CMD_LIST = get_appium_command() def _cleanup_process_tree(process: subprocess.Popen = None): """核心清理逻辑:针对方案一的跨平台递归关闭""" @@ -156,6 +155,9 @@ def start_appium_service() -> AppiumService: process = None # 1. 预先初始化变量,防止作用域错误 managed = False # 轮询等待真正就绪 + + # 延迟获取命令,确保只在真正需要启动服务时检查环境 + app_cmd_list = get_appium_command() max_retries = 40 for i in range(max_retries): @@ -194,7 +196,7 @@ def start_appium_service() -> AppiumService: try: process = subprocess.Popen( - APP_CMD_LIST, + app_cmd_list, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=env_vars, diff --git a/core/settings.py b/core/settings.py index b5ca9cd..7358861 100644 --- a/core/settings.py +++ b/core/settings.py @@ -38,3 +38,12 @@ APPIUM_SERVER = "http://127.0.0.1:4723" # --- 核心配置 --- APPIUM_HOST = "127.0.0.1" APPIUM_PORT = 4723 + +# --- 设备能力配置 (Capabilities) --- +ANDROID_CAPS = { + "platformName": "Android", + "automationName": "uiautomator2", + "deviceName": "Android", + "appPackage": "com.android.settings", + "appActivity": ".Settings", +} diff --git a/utils/decorators.py b/utils/decorators.py index 47b1ddc..7d64550 100644 --- a/utils/decorators.py +++ b/utils/decorators.py @@ -2,19 +2,36 @@ import logging from functools import wraps from typing import Union, Callable +from core.custom_expected_conditions import get_condition + logger = logging.getLogger(__name__) + def resolve_wait_method(func): """ 装饰器:将字符串形式的等待条件解析为可调用的 EC 对象 """ + @wraps(func) def wrapper(self, method: Union[Callable, str], *args, **kwargs): if isinstance(method, str): - # TODO: 这里可以接入 custom_ec 字典进行查找 - # method = custom_ec.get(method, lambda x: False) - logger.info(f"解析命名等待条件: '{method}'") - # 保持原有逻辑作为占位 - method = lambda _: False + # 解析格式 "key:arg1,arg2" 或 仅 "key" + ec_name = method + ec_args = [] + + if ":" in method: + ec_name, params = method.split(":", 1) + if params: + ec_args = params.split(",") + + # 委托给 core.custom_expected_conditions.get_condition 处理 + try: + logger.info(f"解析命名等待条件: '{ec_name}' 参数: {ec_args}") + method = get_condition(ec_name, *ec_args) + except Exception as e: + logger.error(f"解析等待条件 '{method}' 失败: {e}") + raise e + return func(self, method, *args, **kwargs) - return wrapper \ No newline at end of file + + return wrapper