#!/usr/bin/env python
# coding=utf-8
"""
@author: CNWei
@Software: PyCharm
@contact: t6i888@163.com
@file: key_driver
@date: 2025/4/2 21:59
@desc: Keyword-driven wrapper around Selenium WebDriver.
"""
import logging
from time import sleep
import secrets  # stdlib; used to generate secure random values
from typing import Optional, Callable, Union, Literal, Any, TypeVar
from urllib.parse import urljoin

from selenium.webdriver import Chrome, Edge, Firefox, Safari, Ie
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support.ui import Select
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webdriver import WebElement

from commons.assert_functions import custom_asserts
from commons.custom_expected_condition import EC, custom_ec
from commons.modules import Browser, Locator
from commons.settings import configs, EXPLICIT_WAIT_TIMEOUT, SCREENSHOT_DIR

logger = logging.getLogger(__name__)

WebDriverOrWebElement = Union[WebDriver, WebElement]
D = TypeVar("D", bound=Union[WebDriver, WebElement])
T = TypeVar("T")

# Naming glossary used in this project:
# trigger, browser, actuator, filters, finder, converter

# Browser enum member -> zero-argument factory producing the matching driver.
# Shared by `webdriver_finder` and `KeyWordDriver.browser` so both agree.
_BROWSER_FACTORIES: dict[Browser, Callable[[], WebDriver]] = {
    Browser.CHROME: Chrome,
    Browser.FIREFOX: Firefox,
    Browser.IE: Ie,
    Browser.SAFARI: Safari,
    Browser.EDGE: Edge,
}

# Project-level locator name -> Selenium locator strategy.
_LOCATOR_TO_BY: dict[Locator, str] = {
    Locator.ID: By.ID,
    Locator.NAME: By.NAME,
    Locator.CLASS: By.CLASS_NAME,
    Locator.TAG: By.TAG_NAME,
    Locator.LINK_TEXT: By.LINK_TEXT,
    Locator.PARTIAL_LINK_TEXT: By.PARTIAL_LINK_TEXT,
    Locator.CSS: By.CSS_SELECTOR,
    Locator.XPATH: By.XPATH,
}

# Strategies that are already valid Selenium `By` values; these are passed
# through unchanged so that converting twice is harmless.
_SELENIUM_BY_VALUES = frozenset(_LOCATOR_TO_BY.values())


def webdriver_finder(browser: str | Browser = Browser.CHROME) -> WebDriver:
    """Create a new WebDriver instance for the requested browser.

    :param browser: a ``Browser`` member or its name as a string; strings
        are lower-cased and stripped of spaces before the lookup.
    :return: a freshly started driver; Chrome when the name is unknown.
    """
    if isinstance(browser, str) and not isinstance(browser, Browser):
        try:
            browser = Browser(browser.lower().replace(' ', ''))
        except ValueError:
            # Unknown name: keep the historical fallback to Chrome.
            return Chrome()
    return _BROWSER_FACTORIES.get(browser, Chrome)()


def by_converter(by_value: str | Locator) -> str:
    """Translate a locator name into a Selenium ``By`` strategy.

    The conversion is idempotent: a value that already is a Selenium ``By``
    strategy is returned unchanged, so callers may safely convert a value
    that a previous call has already converted.

    :param by_value: a ``Locator`` member, a locator name (case and spaces
        are ignored), or a Selenium ``By`` value.
    :return: the matching ``By`` strategy; ``By.XPATH`` when the input is
        not recognised.
    """
    if isinstance(by_value, Locator):
        return _LOCATOR_TO_BY.get(by_value, By.XPATH)
    if not isinstance(by_value, str):
        return By.XPATH
    if by_value in _SELENIUM_BY_VALUES:
        return by_value
    try:
        locator = Locator(by_value.lower().replace(' ', ''))
    except ValueError:
        # Not a known locator name: default strategy.
        return By.XPATH
    # Look up the *normalized* locator, not the raw input.
    return _LOCATOR_TO_BY.get(locator, By.XPATH)


class KeyWordDriver:
    """Keyword-style facade over a Selenium WebDriver.

    Each public method is one keyword. Most accept and ignore extra
    ``*args`` / ``**kwargs`` so a generic dispatcher can call them with
    surplus arguments. The driver is created lazily by ``browser`` (or by
    ``get`` when no driver exists yet).
    """

    def __init__(self):
        self.driver: WebDriver | None = None  # set by `browser`
        self._url: str | None = None  # base URL set by `base_url`

    def base_url(self, url: str, *args, **kwargs):
        """Remember ``url`` as the base that ``get`` joins relative paths to."""
        logger.info(f"前置URL: {url}")
        self._url = url

    def browser(self, browser_name: str | Browser = Browser.CHROME, *args, **kwargs):
        """Start a browser and store its driver on ``self.driver``.

        :param browser_name: a ``Browser`` member or its name as a string;
            strings are lower-cased and stripped of spaces.
        :raises ValueError: when a string does not name a ``Browser`` member.
        """
        if isinstance(browser_name, str) and not isinstance(browser_name, Browser):
            browser_name = Browser(browser_name.lower().replace(' ', ''))
        factory = _BROWSER_FACTORIES.get(browser_name)
        if factory is None:
            # A member without a dedicated driver falls back to Chrome.
            logger.info(f"启动默认浏览器: {Browser.CHROME}")
            factory = Chrome
        else:
            logger.info(f"启动{browser_name}浏览器")
        self.driver = factory()

    def find_element(self, by: str = By.XPATH, value: Optional[str] = None, *args, **kwargs) -> WebElement:
        """Locate a single element immediately (no explicit wait)."""
        return self.driver.find_element(by_converter(by), value)

    def delay(self, timeout: int | float):
        """Sleep for ``timeout`` seconds and return ``self`` for chaining."""
        sleep(timeout)
        return self

    def implicit_wait(self, timeout: float, *args, **kwargs) -> None:
        """Set the driver's implicit wait.

        :param timeout: timeout passed to ``implicitly_wait``.
        """
        self.driver.implicitly_wait(timeout)

    def explicit_wait(self, method: Callable[[WebDriver], Any] | str,
                      timeout: float = EXPLICIT_WAIT_TIMEOUT, *args, **kwargs):
        """Wait until an expected condition yields a truthy value.

        :param method: a condition callable taking the driver, or the name
            of an entry in ``custom_ec``. An unknown name resolves to a
            condition that is never satisfied, so the wait runs to timeout.
        :param timeout: maximum time to wait.
        :return: whatever the condition returned.
        :raises TypeError: logged and re-raised when raised inside the wait.
        """
        try:
            if isinstance(method, str):
                method = custom_ec.get(method, (lambda _: False))
            # Condition objects (class instances, partials) may lack __name__.
            condition_name = getattr(method, "__name__", type(method).__name__)
            logger.info(f"预期条件: {condition_name}")
            return WebDriverWait(self.driver, timeout).until(method)
        except TypeError as te:
            logger.error(f"显示等待异常: {te}")
            raise  # bare raise keeps the original traceback

    def page_load_timeout(self, timeout: float, *args, **kwargs) -> None:
        """Set the driver's page-load timeout."""
        self.driver.set_page_load_timeout(timeout)

    def get(self, url, *args, **kwargs):
        """Open ``url``, starting a browser first if none is running.

        A URL that does not start with ``http`` is joined onto the base URL
        stored by ``base_url``. Extra arguments are forwarded to ``browser``
        when a driver has to be created.
        """
        if self.driver is None:
            self.browser(*args, **kwargs)
        if not url.startswith("http"):
            # Relative path: prefix it with the configured base URL.
            url = urljoin(self._url, url)
        logger.info(f"网址: {url}")
        self.driver.get(url)

    def maximize_window(self):
        """Maximize the current browser window."""
        self.driver.maximize_window()

    def _wait_for(self, condition: Callable[[tuple], Any], by, value):
        """Build ``condition`` for the locator and wait for it explicitly."""
        locator = (by_converter(by), value)
        return self.explicit_wait(condition(locator))

    def click(self, by, value, *args, **kwargs):
        """Wait for the element to be clickable, then click it."""
        self._wait_for(EC.element_to_be_clickable, by, value).click()

    def clear(self, by, value, *args, **kwargs):
        """Wait for the element to be visible, then clear its content."""
        self._wait_for(EC.visibility_of_element_located, by, value).clear()

    def input(self, by, value, content: str | None = None, *args, **kwargs):
        """Wait for the element to be visible, then send ``content`` to it."""
        self._wait_for(EC.visibility_of_element_located, by, value).send_keys(content)

    def get_text(self, by, value, *args, **kwargs):
        """Return the text of the element once it is visible.

        :param by: locator strategy name (see ``by_converter``).
        :param value: locator expression.
        :return: the element's ``text``.
        """
        return self._wait_for(EC.visibility_of_element_located, by, value).text

    def get_attribute(self, by, value, attributes: str, *args, **kwargs):
        """Return an attribute (class, type, value, ...) of the element.

        Waits for the element to be present; it does not have to be visible.

        :param by: locator strategy name (see ``by_converter``).
        :param value: locator expression.
        :param attributes: name of the attribute to read.
        :return: the result of ``get_attribute`` on the element.
        """
        element = self._wait_for(EC.presence_of_element_located, by, value)
        return element.get_attribute(attributes)

    def enter_iframe(self, by, value, *args, **kwargs):
        """Switch the driver's context into the located frame element."""
        # `find_element` performs the locator conversion itself.
        self.driver.switch_to.frame(self.find_element(by, value))

    def exit_iframe(self, *args, **kwargs):
        """Switch the driver's context back to the top-level document."""
        self.driver.switch_to.default_content()

    def get_cookie(self, name, *args, **kwargs) -> Optional[dict]:
        """Return the cookie called ``name`` as reported by the driver."""
        return self.driver.get_cookie(name)

    def add_cookie(self, cookie_dict, *args, **kwargs) -> None:
        """Add ``cookie_dict`` to the current browser session."""
        self.driver.add_cookie(cookie_dict)

    def screenshot_png(self, by, value, name: str | None = None, *args, **kwargs) -> None:
        """Save a PNG screenshot of the located element.

        :param by: locator strategy name (see ``by_converter``).
        :param value: locator expression.
        :param name: file name without extension; when omitted, a random
            16-character hex name is generated.
        """
        # 8 random bytes -> 16 hexadecimal characters.
        file_stem = name if name is not None else secrets.token_hex(8)
        # NOTE(review): this reads `configs.SCREENSHOT_DIR` although a
        # module-level `SCREENSHOT_DIR` is also imported - confirm which
        # one is intended.
        path = (configs.SCREENSHOT_DIR / f"{file_stem}.png").as_posix()
        logger.warning(f"截图存放路径: {path}")
        self.find_element(by, value).screenshot(path)

    def select(self, by, value, text, func_name, *args, **kwargs):
        """Wait for the element to be visible.

        TODO: unfinished - ``text`` and ``func_name`` are not used yet and
        no option is actually selected.
        """
        self._wait_for(EC.visibility_of_element_located, by, value)

    def run_javascript(self, by, value, code: str, *args, **kwargs) -> Any:
        """Run ``code`` in the page with the located element as its argument.

        TODO: still needs polishing.

        :return: whatever ``execute_script`` returns.
        """
        element = self.find_element(by, value)
        return self.driver.execute_script(code, element)

    # TODO: an alternative design that resolves assertions by name through
    # `custom_asserts` was sketched here but is not implemented.

    def assert_text_equals(self, by, value, text, msg=None):
        """Assert that the element's text equals ``text``.

        :param msg: optional message overriding the default failure text.
        """
        _text = self.get_text(by, value)
        assert _text == text, msg or f"断言“{_text}”与“{text}”相等失败!!"
        logger.info(f"断言: {_text} == {text}")

    def assert_text_not_equals(self, by, value, text, msg=None):
        """Assert that the element's text differs from ``text``.

        :param msg: optional message overriding the default failure text.
        """
        _text = self.get_text(by, value)
        assert _text != text, msg or f"断言“{_text}”与“{text}”不相等失败!!"

    def assert_text_contains(self, by, value, text, msg=None):
        """Assert that the element's text is a substring of ``text``.

        Note the direction: the element text must be contained in ``text``.

        :param msg: optional message overriding the default failure text.
        """
        _text = self.get_text(by, value)
        assert _text in text, msg or f"断言“{_text}”包含于“{text}”中失败!!"

    def assert_text_not_contains(self, by, value, text, msg=None):
        """Assert that the element's text is not a substring of ``text``.

        :param msg: optional message overriding the default failure text.
        """
        _text = self.get_text(by, value)
        assert _text not in text, msg or f"断言“{_text}”不包含于“{text}”中失败!!"


if __name__ == '__main__':
    from commons.settings import configs

    # Manual smoke run against the configured site's login page.
    el = KeyWordDriver()
    el.base_url(configs.base_url)
    el.browser(Browser.EDGE)
    el.get("/users/login")
    el.input("", '//*[@id="email"]', configs.username)
    el.input("", '//*[@id="pass"]', configs.password)
    el.get_text("", '//*[@id="root"]/div[1]/div/div/form/div[1]/label')
    el.click("", '//*[@id="root"]/div[1]/div/div/form/div[3]/button')
    type_value = el.get_attribute("", '//*[@id="root"]/div[1]/div/div/form/div[3]/button', "type")
    print(type_value)
    el.assert_text_equals("", '//*[@id="root"]/div[1]/div/div/form/div[3]/button', "登录")