commit 92d06dd9cf1545421c7b50c881d556b304c824a2 Author: CNWei Date: Mon Apr 14 23:05:39 2025 +0800 init: 初始化项目 - 创建了基本的项目结构 - 添加了 .gitignore 文件 - 配置了基本的开发环境 - 添加清华镜像源 - 设置了基础的文件夹和文件(如 commons, utils, POM, pytest.ini) 项目说明: - [项目名称]:Web自动化测试 - [项目描述]:基于pytest,selenium的自动化测试工具 - [开发环境]:Python diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..c8cfe39 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.10 diff --git a/POM/conftest.py b/POM/conftest.py new file mode 100644 index 0000000..14a77b6 --- /dev/null +++ b/POM/conftest.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: conftest +@date: 2025/4/5 09:28 +@desc: +""" +import pytest +from selenium.webdriver import Chrome + + +@pytest.fixture() +def driver(): + _driver = Chrome() + yield _driver + _driver.quit() + + diff --git a/POM/login_page.py b/POM/login_page.py new file mode 100644 index 0000000..50baa62 --- /dev/null +++ b/POM/login_page.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: test_login +@date: 2025/4/4 15:42 +@desc: +""" +from time import sleep + +from selenium.webdriver import Chrome +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.webdriver import WebDriver +from commons.modules import Browser + +import pytest + +from commons.driver import KeyWordDriver + + +class LoginPage(KeyWordDriver): + url = "/users/login" + email = '//*[@id="email"]' + email_title = '//*[@id="root"]/div[1]/div/div/form/div[1]/label' + password = '//*[@id="pass"]' + login_submit = '//*[@id="root"]/div[1]/div/div/form/div[3]/button' + + # def __init__(self, browser: Browser): + # super().__init__(browser) + def __init__(self): + super().__init__() + + def login(self, email, password): + self.browser(Browser.EDGE) + self.get(self.url) + self.input(1, self.email, email) + self.input(By.XPATH, self.password, password) + text = self.get_text(By.XPATH, self.email_title) + print(text) + self.click(By.XPATH, self.login_submit) + sleep(10) + + +if __name__ == '__main__': + from commons.settings import configs + + _email = configs.username + _password = configs.password + login = LoginPage() + login.base_url(configs.base_url) + + login.login(_email, _password) diff --git a/POM/test_login.py b/POM/test_login.py new file mode 100644 index 0000000..e5f8aac --- /dev/null +++ b/POM/test_login.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: main +@date: 2025/4/4 17:52 +@desc: +""" +from time import sleep +import pytest +from selenium.webdriver import Chrome +from login_page import LoginPage +from commons.modules import Browser + + +@pytest.mark.parametrize("email, password", [("username", "password"), ("", "")]) +def test_login(driver, email, password): + login = LoginPage() + + login.login(email, password) + # sleep(10) + + +def test_logout_1(login_ok): + login = LoginPage() + login.browser(Browser.CHROME) + login.get("https://www.baidu.com/") + print("logout") + + +# @pytest.mark.usefixtures("login_ok") +def test_logout_2(): + login = LoginPage() + login.browser(Browser.CHROME) + login.get("https://www.baidu.com/") + print("logout") diff --git a/README.md b/README.md new file mode 100644 index 0000000..19f31cd --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +# Selenium + +1,配置webdriver的下载路径(环境变量) +```text +# windows +键:SE_CACHE_PATH +值:webdriver的缓存路径(如 D:/WebDriver) +``` +调用浏览器驱动的接口 + +接口文档:https://www.w3.org/TR/webdriver/#interface +> Selenium对浏览器所有的控制, +> - 通过接口调用实现(大部分) +> - 通过js实现(小部分) + +驱动地址: + +https://registry.npmmirror.com/binary.html + +pytest-xlsx +使用: + +excel文件需要以test_开头 \ No newline at end of file diff --git a/commons/assert_functions.py b/commons/assert_functions.py new file mode 100644 index 0000000..1e15d47 --- /dev/null +++ b/commons/assert_functions.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: assert_functions +@date: 2025/4/13 21:06 +@desc: +""" + +import logging +from dataclasses import dataclass +from typing import Dict, Callable, TypeVar, Tuple +from typing import Literal +from typing import Optional +from typing import Union + +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.webdriver.remote.webdriver import WebElement + +# from commons.driver import KeyWordDriver + +# from selenium.webdriver.support import expected_conditions as EC + +# __all__ = ["EC","custom_ec"] + +logger = logging.getLogger(__name__) + +T = TypeVar("T") +WebDriverOrWebElement = Union[WebDriver, WebElement] + +custom_asserts = { + +} + + +def register(name: str | None = None): + def decorator(func): + if name is not None: + custom_asserts[name] = func + + custom_asserts[func.__name__] = func + return func + + return decorator + + +@register() +def get_text_(driver,*args) -> T: + """ + 无参预期条件函数 + driver + - WebDriver的实例(Ie、Firefox、Chrome或远程)或 一个WebElement + - driver形参不可省略,即使不使用 + :return: + """ + _ = driver + logger.warning(f"{args=}") + list_ = [1, 2, 3, 4, 5, 6] + for item in list_: + logger.info(item) + if item == 5: + # logger.info(item) + return item + + +# @register() +# def get_value(locator: Tuple[str, str]) -> Callable[ +# [KeyWordDriver], Union[Literal[False], WebElement]]: +# """ +# 有参预期条件函数 +# :param locator: +# :return: +# """ +# +# def _predicate(driver: KeyWordDriver): +# try: +# return driver.find_element(*locator) +# except Exception as e: +# return False +# +# return _predicate diff --git a/commons/custom_expected_condition.py b/commons/custom_expected_condition.py new file mode 100644 index 0000000..139399e --- /dev/null +++ b/commons/custom_expected_condition.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: expected_conditional_function +@date: 2025/4/4 14:15 +@desc: +""" +import logging +from dataclasses import dataclass +from typing import Dict, Callable, TypeVar, Tuple +from typing import Literal +from typing import Optional +from typing import Union + +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.webdriver.remote.webdriver import WebElement + +from selenium.webdriver.support import expected_conditions as EC + +__all__ = ["EC","custom_ec"] + +logger = logging.getLogger(__name__) + +T = TypeVar("T") +WebDriverOrWebElement = Union[WebDriver, WebElement] + + +""" + 常用等待条件(expected_conditions)--来自EC模块 + presence_of_element_located: 元素存在于DOM。 + visibility_of_element_located: 元素可见。 + element_to_be_clickable: 元素可点击。 + title_contains: 页面标题包含特定文本。 + text_to_be_present_in_element: 元素包含特定文本。 +""" +# 自定义预期条件(Custom Expected Condition) +custom_ec = { + +} + + +def register(name: str | None = None): + def decorator(func): + if name is not None: + custom_ec[name] = func + + custom_ec[func.__name__] = func + return func + + return decorator + + +@register() +def examples_no_parameters(driver: WebDriverOrWebElement)->T: + """ + 无参预期条件函数 + driver + - WebDriver的实例(Ie、Firefox、Chrome或远程)或 一个WebElement + - driver形参不可省略,即使不使用 + :return: + """ + _ = driver + list_ = [1, 2, 3, 4, 5, 6] + for item in list_: + logger.info(item) + if item == 5: + return item + +@register() +def examples_have_parameters(locator: Tuple[str, str]) -> Callable[[WebDriverOrWebElement], Union[Literal[False], WebElement]]: + """ + 有参预期条件函数(暂不支持) + :param locator: + :return: + """ + def _predicate(driver: WebDriverOrWebElement): + try: + + return driver.find_element(*locator) + except Exception as e: + + return False + + return _predicate + +def func_3(mark): + ... + + +def func_4(mark): + ... + + +if __name__ == "__main__": + print(custom_ec) diff --git a/commons/driver.py b/commons/driver.py new file mode 100644 index 0000000..faa0324 --- /dev/null +++ b/commons/driver.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: key_driver +@date: 2025/4/2 21:59 +@desc: +""" + +import logging +from time import sleep +import secrets # 原生库,用于生成安全的随机数 +from typing import Optional, Callable, Union, Literal, Any, TypeVar +from urllib.parse import urljoin + +from selenium.webdriver import Chrome, Edge, Firefox, Safari, Ie +from selenium.webdriver.common.by import By +from selenium.webdriver.support.wait import WebDriverWait +from selenium.webdriver.support.ui import Select +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.webdriver.remote.webdriver import WebElement + +from commons.assert_functions import custom_asserts +from commons.custom_expected_condition import EC, custom_ec +from commons.modules import Browser, Locator +from commons.settings import configs, EXPLICIT_WAIT_TIMEOUT, SCREENSHOT_DIR + +logger = logging.getLogger(__name__) + +WebDriverOrWebElement = Union[WebDriver, WebElement] +D = TypeVar("D", bound=Union[WebDriver, WebElement]) +T = TypeVar("T") + + +# 触发器 trigger +# 浏览器 Browser +# 执行器 actuator +# 筛选器 Filters +# 查找器 finder +# 转换器 converter +def webdriver_finder(browser: str | Browser = Browser.CHROME) -> WebDriver: + match browser: + case Browser.CHROME: + return Chrome() + case Browser.FIREFOX: + return Firefox() + case Browser.IE: + return Ie() + case Browser.SAFARI: + return Safari() + case Browser.EDGE: + return Edge() + case _: + return Chrome() # 默认情况 + + +def by_converter(by_value: str | Locator): + try: + # 统一处理输入 + if isinstance(by_value, str): + by = Locator(by_value.lower().replace(' ', '')) + + # 创建对应的浏览器实例 + by = { + Locator.ID: By.ID, + Locator.NAME: By.NAME, + Locator.CLASS: By.CLASS_NAME, + Locator.TAG: By.TAG_NAME, + Locator.LINK_TEXT: By.LINK_TEXT, + Locator.PARTIAL_LINK_TEXT: By.PARTIAL_LINK_TEXT, + Locator.CSS: By.CSS_SELECTOR, + Locator.XPATH: By.XPATH, + }.get(by_value, By.XPATH) + return by + except ValueError: + return By.XPATH + + +class KeyWordDriver: + + # def __init__(self, browser: str | Browser): + # self.driver = self.webdriver_finder(browser) + def __init__(self): + self.driver: WebDriver | None = None + self._url: str | None = None + # self.temp_value = None + + def base_url(self, url: str, *args, **kwargs): + logger.info(f"前置URL: {url}") + self._url = url + # return url + + def browser(self, browser_name: str | Browser = Browser.CHROME, *args, **kwargs): + browser_name = Browser(browser_name.lower().replace(' ', '')) if isinstance(browser_name, str) else browser_name + match browser_name: + case Browser.CHROME: + logger.info(f"启动{Browser.CHROME}浏览器") + self.driver = Chrome() + # return Chrome() + case Browser.FIREFOX: + logger.info(f"启动{Browser.FIREFOX}浏览器") + self.driver = Firefox() + case Browser.IE: + logger.info(f"启动{Browser.IE}浏览器") + self.driver = Ie() + case Browser.SAFARI: + logger.info(f"启动{Browser.SAFARI}浏览器") + self.driver = Safari() + case Browser.EDGE: + logger.info(f"启动{Browser.EDGE}浏览器") + self.driver = Edge() + case _: + logger.info(f"启动默认浏览器: {Browser.CHROME}") + self.driver = Chrome() # 默认情况 + + def find_element(self, by: str = By.XPATH, value: Optional[str] = None, *args, **kwargs) -> WebElement: + by = by_converter(by) + + return self.driver.find_element(by, value) + + def delay(self, timeout: int | float): + sleep(timeout) + return self + + def implicit_wait(self, timeout: float, *args, **kwargs) -> None: + """ + 隐式等待 + :param timeout: 超时时间 + :param args: + :param kwargs: + :return: + """ + self.driver.implicitly_wait(timeout) + + def explicit_wait(self, method: T, timeout: float = EXPLICIT_WAIT_TIMEOUT, *args, **kwargs): + """ + 显示等待 + :param method: 可调用对象名 + :param timeout: 超时时间 + :param args: + :param kwargs: + :return: + """ + + try: + if isinstance(method, str): + method = custom_ec.get(method, (lambda _: False)) + logger.info(f"预期条件: {method.__name__}") + + return WebDriverWait(self.driver, timeout).until(method) + except TypeError as te: + logger.error(f"显示等待异常: {te}") + # self.driver.quit() + raise te + + def page_load_timeout(self, timeout: float, *args, **kwargs) -> None: + self.driver.set_page_load_timeout(timeout) + + def get(self, url, *args, **kwargs): + if self.driver is None: + self.browser(*args, **kwargs) + if not url.startswith("http"): + # 自动添加baseurl + url = urljoin(self._url, url) + logger.info(f"网址: {url}") + print(url) + self.driver.get(url) + + def maximize_window(self): + self.driver.maximize_window() + + def click(self, by, value, *args, **kwargs): + + by = by_converter(by) + mark = (by, value) + method = EC.element_to_be_clickable(mark) + + self.explicit_wait(method).click() + # self.find_element(by, value).click() + + def clear(self, by, value, *args, **kwargs): + + by = by_converter(by) + mark = (by, value) + method = EC.visibility_of_element_located(mark) + + self.explicit_wait(method).clear() + + # self.driver.find_element(by, value).clear() + + def input(self, by, value, content: str | None = None, *args, **kwargs): + + by = by_converter(by) + mark = (by, value) + method = EC.visibility_of_element_located(mark) + + self.explicit_wait(method).send_keys(content) + # self.find_element(by, value).send_keys(content) + + def get_text(self, by, value, *args, **kwargs): + """ + 获取元素文本 + :param by: + :param value: + :param args: + :param kwargs: + :return: + """ + by = by_converter(by) + mark = (by, value) + method = EC.visibility_of_element_located(mark) + + text = self.explicit_wait(method).text + # print(text) + return text + + def get_attribute(self, by, value, attributes: str, *args, **kwargs): + """ + 获取元素属性值(class,type,value,...) + :param by: + :param value: + :param attributes: + :param args: + :param kwargs: + :return: + """ + by = by_converter(by) + mark = (by, value) + method = EC.presence_of_element_located(mark) + + text = self.explicit_wait(method).get_attribute(attributes) + # print(text) + return text + + def enter_iframe(self, by, value, *args, **kwargs): + + by = by_converter(by) + element = self.find_element(by, value) + self.driver.switch_to.frame(element) + + def exit_iframe(self, *args, **kwargs): + self.driver.switch_to.default_content() + + def get_cookie(self, name, *args, **kwargs) -> Optional[dict]: + return self.driver.get_cookie(name) + + def add_cookie(self, cookie_dict, *args, **kwargs) -> None: + self.driver.add_cookie(cookie_dict) + + def screenshot_png(self, by, value, name: str | None = None, *args, **kwargs) -> None: + by = by_converter(by) + # mark = (by, value) + # method = EC.visibility_of_element_located(mark) + + if name is not None: + path = (configs.SCREENSHOT_DIR / f"{name}.png").as_posix() + else: + # 生成 8 个随机字节,然后转为 16 个字符的十六进制字符串 + random_hex = secrets.token_hex(8) # n=8 表示生成 8 字节 + path = (configs.SCREENSHOT_DIR / f"{random_hex}.png").as_posix() + + logger.warning(f"截图存放路径: {path}") + # self.explicit_wait(method).screenshot(path) + self.find_element(by, value).screenshot(path) + + def select(self, by, value, text, func_name, *args, **kwargs): + # 未完成 + by = by_converter(by) + mark = (by, value) + + method = EC.visibility_of_element_located(mark) + self.explicit_wait(method) + + def run_javascript(self, by, value, code: str, *args, **kwargs) -> Any: + # 还需打磨 + by = by_converter(by) + element = self.find_element(by, value) + return self.driver.execute_script(code, element) + + # def assert_text_equals(self, by, value,method, text, msg=None): + # by = by_converter(by) + # # _text = self.get_text(by, value) + # _text = getattr(self,method)(by, value) + # + # assert _text == text, msg + # def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T: + # try: + # value = method(self.driver) + # if value: + # return value + # except AssertionError as e: + # logger.error(e) + # + # def assert_text_equals(self, method, text, msg=""): + # if isinstance(method, str): + # method = custom_asserts.get(method, (lambda _: False)) + # logger.info(f"{method=}") + # # _text = self.until(method) + # _text = method(self.driver) + # logger.info(_text) + # assert _text == text, msg + def assert_text_equals(self, by, value, text): + by = by_converter(by) + _text = self.get_text(by, value) + assert _text == text, f"断言“{_text}”与“{text}”相等失败!!" + logger.info(f"断言: {_text} == {text}") + + def assert_text_not_equals(self, by, value, text): + by = by_converter(by) + _text = self.get_text(by, value) + assert _text != text, f"断言“{_text}”与“{text}”不相等失败!!" + + def assert_text_contains(self, by, value, text, msg=None): + by = by_converter(by) + _text = self.get_text(by, value) + assert _text in text, f"断言“{_text}”包含于“{text}”中失败!!" + + def assert_text_not_contains(self, by, value, text, msg=None): + by = by_converter(by) + _text = self.get_text(by, value) + assert _text not in text, f"断言“{_text}”不包含于“{text}”中失败!!" + + +if __name__ == '__main__': + from commons.settings import configs + el = KeyWordDriver() + el.base_url(configs.base_url) + # el.browser("chrome") + el.browser(Browser.EDGE) + el.get("/users/login") + el.input("", '//*[@id="email"]', configs.username) + el.input("", '//*[@id="pass"]', configs.password) + el.get_text("", '//*[@id="root"]/div[1]/div/div/form/div[1]/label') + el.click("", '//*[@id="root"]/div[1]/div/div/form/div[3]/button') + type_value = el.get_attribute("", '//*[@id="root"]/div[1]/div/div/form/div[3]/button', "type") + print(type_value) + el.assert_text_equals("", '//*[@id="root"]/div[1]/div/div/form/div[3]/button', "登录") + diff --git a/commons/funcs.py b/commons/funcs.py new file mode 100644 index 0000000..7f3d2cc --- /dev/null +++ b/commons/funcs.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: chen wei +@Software: PyCharm +@contact: t6i888@163.com +@file: funcs.py +@date: 2024 2024/9/22 22:46 +@desc: +""" +import base64 +import logging +import time +import urllib.parse +import hashlib + +# from commons.databases import db +# +from utils.file_processors.file_handle import FileHandle +from commons import settings + +logger = logging.getLogger(__name__) + + +class Funcs: + FUNC_MAPPING = { + "int": int, + "float": float, + "bool": bool + } # 内置函数有的,直接放入mapping;内置函数没有的,在funcs中定义,自动放入mapping + + @classmethod + def register(cls, name: str | None): + def decorator(func): + if name is None: + cls.FUNC_MAPPING[func.__name__] = func + cls.FUNC_MAPPING[name] = func + return func + + return decorator + + +@Funcs.register("url_unquote") +def url_unquote(s: str) -> str: + return urllib.parse.unquote(s) + + +@Funcs.register("str") +def to_string(s) -> str: + # 将数据转换为str类型。 + return f"'{s}'" + + +@Funcs.register("time_str") +def time_str() -> str: + return str(time.time()) + + +@Funcs.register("add") +def add(a, b): + return str(int(a) + int(b)) + + +# @Funcs.register("sql") +# def sql(s: str) -> str: +# res = db.execute_sql(s) +# +# return res[0][0] + + +@Funcs.register("new_id") +def new_id(): + # 自增,永不重复 + id_file = FileHandle(settings.ID_PATH) + id_file["id"] += 1 + id_file.save() + + return id_file["id"] + + +# @Funcs.register("last_id") +# def last_id() -> str: +# # 不自增,只返回结果 +# +# id_file = FileHandle(settings.id_path) +# return id_file["id"] + + +@Funcs.register("md5") +def md5(content: str) -> str: + # 1,原文转为字节 + content = content.encode("utf-8") + result = hashlib.md5(content).hexdigest() + return result + + +@Funcs.register("base64_encode") +def base64_encode(content: str) -> str: + # 1,原文转二进制 + content = content.encode("utf-8") + # 2,base64编码(二进制) + encode_value = base64.b64encode(content) + # 3,转为字符串 + encode_str = encode_value.decode("utf-8") + + return encode_str + + +@Funcs.register("base64_decode") +def base64_decode(content: str) -> str: + # 1,原文转二进制 + content = content.encode("utf-8") + # 2,base64解码(二进制) + decode_value = base64.b64decode(content) + # 3,转为字符串 + decode_str = decode_value.decode("utf-8") + + return decode_str + + +@Funcs.register("rsa_encode") +def rsa_encode(content: str) -> str: + ... + + +@Funcs.register("rsa_decode") +def rsa_decode(content: str) -> str: + ... + + +if __name__ == '__main__': + # res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82") + # print(res) + # print(f"计数器:{new_id()}") + # print(f"当前数值:{last_id()}") + print(Funcs().FUNC_MAPPING) diff --git a/commons/modules.py b/commons/modules.py new file mode 100644 index 0000000..62b215d --- /dev/null +++ b/commons/modules.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: modules +@date: 2025/4/5 20:29 +@desc: +""" + +from enum import Enum + + +class Browser(str, Enum): + CHROME = "chrome" + FIREFOX = "firefox" + IE = "ie" + SAFARI = "safari" + EDGE = "edge" + + +class Locator(str, Enum): + ID = "id", + NAME = "name", + CLASS = "class", + TAG = "tag", + LINK_TEXT = "link_text", + PARTIAL_LINK_TEXT = "partial_link_text", + CSS = "css", + XPATH = "xpath" + + +if __name__ == '__main__': + print(Browser.CHROME) + print(type(Browser.CHROME)) + print(Browser("chR ome ".lower().replace(' ', ''))) + match "chrome": + case Browser.CHROME: + print("我能执行!!!") diff --git a/commons/settings.py b/commons/settings.py new file mode 100644 index 0000000..c27f4b3 --- /dev/null +++ b/commons/settings.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: settings +@date: 2025/2/23 21:34 +@desc: +""" + +import logging +from pathlib import Path +from typing import Any + +import tomlkit + +ROOT_PATH = (Path(__file__)).resolve().parents[1] # 获取根路径(绝对路径) + +EXPLICIT_WAIT_TIMEOUT = 10 + +DATA_ROOT = ROOT_PATH / "data" + +if not DATA_ROOT.exists(): + DATA_ROOT.mkdir(parents=True, exist_ok=True) + +# 配置文件路 +CONF_PATH = Path(DATA_ROOT, "settings.toml") + +# 截图保存路径 +SCREENSHOT_DIR = Path(ROOT_PATH, "screenshot") + +if not SCREENSHOT_DIR.exists(): + SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True) + +# 测试案例路径 +CASES_DIR = Path(ROOT_PATH, "tests") + +# 变量替换 +EXCHANGER = Path(ROOT_PATH, "extract.toml") + +# 自增ID +ID_PATH = Path(ROOT_PATH, "id.toml") + +# 默认配置 +DEFAULT_CONF = { + "SCREENSHOT_DIR": SCREENSHOT_DIR, + "CASES_DIR": CASES_DIR, + "EXCHANGER": EXCHANGER, + "ID_PATH": ID_PATH, +} + + +class Settings: + """ + 配置管理类 (单例模式 Singleton Pattern)。 + + 优先从项目根目录下的 'settings.toml' 文件加载配置。 + 如果 'settings.toml' 文件不存在、无法解析或缺少某个配置项, + 则使用此类中定义的默认值。 + + 通过创建类的唯一实例 `settings` 来全局访问配置。 + """ + _instance = None + + # --- 单例实现 --- + def __new__(cls, *args, **kwargs): + """ + 实现单例模式。确保全局只有一个 Settings 实例。 + """ + if not cls._instance: + cls._instance = super().__new__(cls, *args, **kwargs) + # 初始化只进行一次 + cls._instance._initialized = False + return cls._instance + + def __init__(self): + """ + 初始化配置。仅在第一次创建实例时执行。 + """ + # 防止重复初始化 (单例模式下可能会被多次调用 __init__) + if self._initialized: + return + self._initialized = True + + logging.info("开始初始化配置...") + self._init_config() + self._load_config() + + def _init_config(self): + """初始化配置文件""" + if not CONF_PATH.exists(): + CONF_PATH.parent.mkdir(parents=True, exist_ok=True) + CONF_PATH.touch() + # self._save_config(DEFAULT_CONF) + return self + + def _load_config(self): + with open(CONF_PATH, 'r', encoding='utf-8') as f: + result = tomlkit.parse(f.read()) + logging.debug(f"加载 settings.toml 文件 ===> {result}") + + new_conf = DEFAULT_CONF | result + for key, value in new_conf.items(): + + self.__setattr__(key, value) + return self + + def _save_config(self, data: dict[str, Any]): + """保存配置到文件""" + for key, value in data.items(): + if isinstance(value, Path): + data[key] = value.as_posix() + with open(CONF_PATH, 'w', encoding='utf-8') as f: + tomlkit.dump(data=data, fp=f, sort_keys=False) + return self + + def __setattr__(self, key, value): + self.__dict__[key] = value + + def items(self): + conf: dict = {} + for key, value in self.__dict__.items(): + if not key.startswith('_'): + conf[key] = value + # return self.__dict__.items() + return conf + + +configs = Settings() + +# --- 用于直接运行此文件进行测试 --- +if __name__ == '__main__': + ... + print(configs.items()) + diff --git a/commons/templates.py b/commons/templates.py new file mode 100644 index 0000000..bf6fa67 --- /dev/null +++ b/commons/templates.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: chen wei +@Software: PyCharm +@contact: t6i888@163.com +@file: templates.py +@date: 2024 2024/9/22 22:20 +@desc: +""" +import copy +import logging +import re +import string +from commons.funcs import Funcs + + +logger = logging.getLogger(__name__) + + +class Template(string.Template): + """ + 1,支持函数调用 + 2,参数也可以是变量 + """ + + call_pattern = re.compile(r"\${(?P.*?)\((?P.*?)\)}") + + def render(self, mapping: dict) -> str: + s = self.safe_substitute(mapping) # 原有方法替换变量 + s = self.safe_substitute_funcs(s, mapping) + + return s + + def safe_substitute_funcs(self, template, mapping) -> str: + """ + 解析字符串中的函数名和参数,并将函数调用结果进行替换 + :param template: 字符串 + :param mapping: 上下文,提供要使用的函数和变量 + :return: 替换后的结果 + """ + mapping = copy.deepcopy(mapping) + logger.info(f"mapping更新前: {mapping}") + # mapping.update(self.FUNC_MAPPING) # 合并两个mapping + mapping.update(Funcs.FUNC_MAPPING) # 合并两个mapping + logger.info(f"mapping更新后: {mapping}") + def convert(mo): + func_name = mo.group("func_name") + func_args = mo.group("func_args").split(",") + func = mapping.get(func_name) # 读取指定函数 + func_args_value = [mapping.get(arg, arg) for arg in func_args] + + if func_args_value == [""]: # 处理没有参数的func + func_args_value = [] + + if not callable(func): + return mo.group() # 如果是不可调用的假函数,不进行替换 + else: + return str(func(*func_args_value)) # 否则用函数结果进行替换 + + return self.call_pattern.sub(convert, template) + diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..e06b280 --- /dev/null +++ b/conftest.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: conftest +@date: 2025/4/2 21:57 +@desc: +""" +import json +import logging +from itertools import chain +import inspect + +import allure +from pytest_xlsx.file import XlsxItem +import pytest +from selenium.webdriver import Chrome + +from POM.login_page import LoginPage +from commons.driver import KeyWordDriver +from utils.file_processors.file_handle import FileHandle +from commons.templates import Template +from commons.settings import configs + +logger = logging.getLogger(__name__) + + +# @pytest.fixture() +def login_ok(): + username = configs.username + password = configs.password + login = LoginPage() + logger.debug("fixture-login_ok 我被执行了!!!") + login.login(username, password) + yield login.driver + login.driver.quit() + + +file = FileHandle(configs.variable) + + +def use_cases_converter(xlsx_item: dict) -> list: + # 变量替换 + logger.info(f"变量替换前: {xlsx_item}") + step_str = FileHandle.to_string(xlsx_item) + xlsx_item = FileHandle.to_dict(Template(step_str).render(file)) + logger.info(f"变量替换: {xlsx_item}") + + new_values = [] + value = list(xlsx_item.values()) + for item in value: + if not isinstance(item, list): + new_values.append(item) + else: + new_values.extend(item) + + logger.info(f"转换后的测试用例: <===={new_values}") + return new_values + + +kwd = KeyWordDriver() + + +# hook(钩子) +def pytest_xlsx_run_step(item: XlsxItem): + step = item.current_step + logger.info(f"转换前的测试用例: ====>{step}") + step = use_cases_converter(step) + # print(step) + + content = slice(1, 2) + key = slice(2, 3) + values = slice(3, None) + + remark = step[content].pop() + keyword = step[key].pop() + _arges = [item for item in step[values] if item is not None] + + logger.info(f"标记: {keyword} -- 参数: {_arges}") + try: + func = getattr(kwd, keyword) + logger.info(f"当前执行的操作:{func.__name__}") + func(*_arges) + except AttributeError as e: + logger.error(f'标记 {keyword} 错误: {e}') + raise e + + if remark: + # 截图 + png = kwd.driver.get_screenshot_as_png() # 保留为二进制格式 + # 将截图报错附加到allure + allure.attach(png, remark, allure.attachment_type.PNG) + + +if __name__ == '__main__': + print(bool([1, 2, 3])) diff --git a/main.py b/main.py new file mode 100644 index 0000000..66156e9 --- /dev/null +++ b/main.py @@ -0,0 +1,22 @@ +import os +import shutil +import datetime +from pathlib import Path + +import pytest + +from commons.settings import configs + +# 指定测试目录 +CASES_DIR = configs.CASES_DIR + +if __name__ == '__main__': + now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + # 1,启动框架(生成临时文件) + # -x表示有一个用例失败后面将不执行;-v表示展示用例名称;-c,配置文件所在目录:指定pytest.ini路径;--alluredir=temp。指定数据生成目录 + pytest.main([str(CASES_DIR), "-x", "-v", "--alluredir=temp"]) + # 2,生成HTML报告 + os.system('allure generate temp -o report --clean') # java程序只能借助操作系统执行 + + # 3,备份日志 + # shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ea15532 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[project] +name = "webautotest" +version = "0.1.0" +description = "Web自动化测试框架" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ +] + +[dependency-groups] +dev = [ + "pydantic-settings>=2.8.1", + "pytest>=8.3.5", + "pytest-xlsx>=0.5.1", + "selenium>=4.30.0", + "tomlkit>=0.13.2", +] + +[[tool.uv.index]] +url = "https://mirrors.cloud.tencent.com/pypi/simple" +default = true diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..f49930f --- /dev/null +++ b/pytest.ini @@ -0,0 +1,10 @@ +[pytest] +addopts = -q --show-capture=no + + +log_file = logs/pytest.log +log_file_level = info +log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s +log_file_date_format = %m/%d/%Y %H:%M:%S %p + +disable_test_id_escaping_and_forfeit_all_rights_to_community_support = true \ No newline at end of file diff --git a/tests/test_login.xlsx b/tests/test_login.xlsx new file mode 100644 index 0000000..3b9772d Binary files /dev/null and b/tests/test_login.xlsx differ diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..7aa3b32 --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: __init__.py +@date: 2025/4/8 21:44 +@desc: +""" diff --git a/utils/file_processors/__init__.py b/utils/file_processors/__init__.py new file mode 100644 index 0000000..b525101 --- /dev/null +++ b/utils/file_processors/__init__.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: __init__.py +@date: 2025/3/4 17:23 +@desc: +""" diff --git a/utils/file_processors/base.py b/utils/file_processors/base.py new file mode 100644 index 0000000..28e9583 --- /dev/null +++ b/utils/file_processors/base.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: base +@date: 2025/3/4 17:23 +@desc: +""" +import abc + + +class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类 + """ + 文件处理器的抽象基类。 + 定义了所有子类必须实现的方法。 + """ + + @abc.abstractmethod + def load(self): + """加载.""" + pass + + @staticmethod + @abc.abstractmethod + def to_string(data: dict) -> str: + """将文件内容转换为字符串。""" + pass + + @staticmethod + @abc.abstractmethod + def to_dict(data: str) -> dict: + """将文件内容转换为字典。""" + pass + + @abc.abstractmethod + def save(self, new_filepath=None): + """将数据保存.""" + pass diff --git a/utils/file_processors/file_handle.py b/utils/file_processors/file_handle.py new file mode 100644 index 0000000..f74eee9 --- /dev/null +++ b/utils/file_processors/file_handle.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: file_handle +@date: 2025/3/7 09:31 +@desc: +""" + +from utils.file_processors.toml_processor import TomlProcessor +from utils.file_processors.yaml_processor import YamlProcessor +from utils.file_processors.json_processor import JsonProcessor +from commons.settings import configs +processors = { + 'toml': TomlProcessor,# 暂不支持 + 'yaml': YamlProcessor, + 'yml': YamlProcessor, + 'json': JsonProcessor, + +} + + +def get_processor(ext): + agent_model = processors.get(ext, YamlProcessor) # 代理模式 + + return agent_model # 默认回退到 Yaml + + +FileHandle = get_processor("yaml") + +if __name__ == '__main__': + # 示例用法 + yaml_file = FileHandle(configs.variable) + print(yaml_file) + print(type(yaml_file)) + file_string = FileHandle.to_string(yaml_file) + print(file_string) + file_dict = FileHandle.to_dict(file_string) + print(file_dict) diff --git a/utils/file_processors/json_processor.py b/utils/file_processors/json_processor.py new file mode 100644 index 0000000..b307b35 --- /dev/null +++ b/utils/file_processors/json_processor.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: yaml_processor +@date: 2025/3/4 17:28 +@desc: +""" +import logging +from typing import Union +from pathlib import Path +import json +from utils.file_processors.base import BaseFileProcessor + +logger = logging.getLogger(__name__) + + +class JsonProcessor(BaseFileProcessor, dict): + """ + 用于处理 YAML 文件的类,继承自 dict。 + 提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能, + 并可以直接像字典一样访问 YAML 数据。 + """ + + def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None): + """ + 初始化 YamlFile 对象。 + + Args: + filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象). + data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。 + 如果不提供,则尝试从 filepath 加载数据。 + """ + super().__init__() # 初始化父类 dict + self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 + if data is not None: + self.update(data) # 如果提供了初始数据,则更新字典 + else: + self.load() # 否则,尝试从文件加载 + + def load(self) -> None: + """ + 从 YAML 文件加载数据并更新字典。 + 如果文件不存在或加载失败,则清空字典并记录警告/错误。 + """ + self.clear() # 清空现有数据 + if self.filepath.exists(): + try: + with open(self.filepath, "r", encoding="utf-8") as f: + loaded_data = json.load(f) or {} + self.update(loaded_data) # 使用加载的数据更新字典 + except json.JSONDecodeError as e: + logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}") + # 保持字典为空 (已在开头 clear) + else: + logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.") + # 保持字典为空 (已在开头 clear) + + @staticmethod + def to_string(data: dict) -> str: + """ + 将字典 (自身) 转换为 YAML 格式的字符串。 + + Returns: + YAML 格式的字符串。 + """ + try: + return json.dumps( + dict(data), # 使用dict转换为标准的字典 + ensure_ascii=False, # 允许非ASCII字符 + # indent=4, # 美化输出,缩进4个空格 + sort_keys=False # 不排序键 + ) + except TypeError as e: + logger.error(f"将数据转换为 JSON 字符串时出错: {e}") + return "" + + @staticmethod + def to_dict(data: str) -> None: + """ + 将 YAML 格式的字符串转换为字典,并更新当前字典的内容. + + Args: + data: YAML 格式的字符串。 + """ + try: + loaded_data = json.loads(data) or {} + return loaded_data + except json.JSONDecodeError as e: + logger.error(f"将 JSON 字符串转换为字典时出错: {e}") + + def save(self, new_filepath: Union[str, Path, None] = None): + """ + 将字典数据 (自身) 保存到 YAML 文件。 + + Args: + new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。 + """ + filepath = Path(new_filepath) if new_filepath else self.filepath + + try: + with open(filepath, "w", encoding="utf-8") as f: + json.dump( + dict(self), # 使用dict转换为标准的字典 + f, + ensure_ascii=False, # 允许非ASCII字符 + indent=4, # 美化输出,缩进4个空格 + sort_keys=False # 不排序键 + ) + except (TypeError, OSError) as e: + logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}") + + +if __name__ == '__main__': + # 示例用法 + json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径 + json_file = JsonProcessor(json_path) + print(json_file) + print(type(json_file)) + json_string = JsonProcessor.to_string(json_file) + JsonProcessor.to_dict(json_string) + print(json_string) + json_file.save() diff --git a/utils/file_processors/toml_processor.py b/utils/file_processors/toml_processor.py new file mode 100644 index 0000000..35b8403 --- /dev/null +++ b/utils/file_processors/toml_processor.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: file_processing +@date: 2025/4/8 21:22 +@desc: +""" +from pathlib import Path +from tomlkit import parse, dumps +from tomlkit.toml_file import TOMLFile + +import logging +from typing import Union +from dataclasses import dataclass, asdict, field +from pathlib import Path +import tomlkit +from utils.file_processors.base import BaseFileProcessor + +logger = logging.getLogger(__name__) + + +class TomlProcessor(BaseFileProcessor, dict): + """ + 用于处理 YAML 文件的类,继承自 dict。 + 提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能, + 并可以直接像字典一样访问 YAML 数据。 + """ + + def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None): + """ + 初始化 YamlFile 对象。 + + Args: + filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象). + data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。 + 如果不提供,则尝试从 filepath 加载数据。 + """ + super().__init__() # 初始化父类 dict + self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 + if data is not None: + self.update(data) # 如果提供了初始数据,则更新字典 + else: + self.load() # 否则,尝试从文件加载 + + def load(self) -> None: + + self.clear() # 清空现有数据 + if self.filepath.exists(): + try: + with open(self.filepath, 'r', encoding='utf-8') as f: + result = tomlkit.parse(f.read()) or {} + # print(result) + self.update(result) + except Exception as e: + logger.error(f"加载 TOML 文件 {self.filepath} 时出错: {e}") + else: + logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.") + @staticmethod + def to_string(data: dict) -> str: + try: + return tomlkit.dumps( + dict(data), # 使用dict转换为标准的字典 + sort_keys=False + ) + except TypeError as e: + logger.error(f"将数据转换为 TOML 字符串时出错: {e}") + return "" + + @staticmethod + def to_dict(data: str) -> Union[None, dict]: + try: + loaded_data = tomlkit.loads(data) or {} + return loaded_data + except Exception as e: + logger.error(f"将 TOML 字符串转换为字典时出错: {e}") + + def save(self, new_filepath: Union[str, Path, None] = None): + + filepath = Path(new_filepath) if new_filepath else self.filepath + + try: + with open(filepath, "w", encoding="utf-8") as f: + tomlkit.dump( + dict(self), # 使用dict转换为标准的字典 + fp=f, + sort_keys=False, + ) + except (TypeError, OSError) as e: + logger.error(f"保存 TOML 文件 {filepath} 时出错: {e}") + + + +if __name__ == '__main__': + ... + toml_path =r'E:\PyP\WebAutoTest\data\var.toml' + toml_file = TomlProcessor(toml_path) + print(toml_file) + print(type(toml_file)) + print(toml_file.to_string(toml_file)) + print(toml_file.to_dict(toml_file.to_string(toml_file))) + print(toml_file.to_dict(toml_file.to_string(toml_file))) + toml_file.to_dict(toml_file.to_string(toml_file)) + # toml_file.save() + + print(toml_file.to_string( + {'用例ID': None, '内容': '打开浏览器', '标记': 'browser', '参数': 'edge', '_BlankField': [None, None]})) \ No newline at end of file diff --git a/utils/file_processors/yaml_processor.py b/utils/file_processors/yaml_processor.py new file mode 100644 index 0000000..b8aca75 --- /dev/null +++ b/utils/file_processors/yaml_processor.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python +# coding=utf-8 + +""" +@author: CNWei +@Software: PyCharm +@contact: t6i888@163.com +@file: yaml_processor +@date: 2025/3/4 17:28 +@desc: +""" +import logging +from typing import Union +from dataclasses import dataclass, asdict, field +from pathlib import Path +import yaml +from utils.file_processors.base import BaseFileProcessor + +logger = logging.getLogger(__name__) + + +class YamlProcessor(BaseFileProcessor, dict): + """ + 用于处理 YAML 文件的类,继承自 dict。 + 提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能, + 并可以直接像字典一样访问 YAML 数据。 + """ + + def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None): + """ + 初始化 YamlFile 对象。 + + Args: + filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象). + data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。 + 如果不提供,则尝试从 filepath 加载数据。 + """ + super().__init__() # 初始化父类 dict + self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象 + if data is not None: + self.update(data) # 如果提供了初始数据,则更新字典 + else: + self.load() # 否则,尝试从文件加载 + + def load(self) -> None: + """ + 从 YAML 文件加载数据并更新字典。 + 如果文件不存在或加载失败,则清空字典并记录警告/错误。 + """ + self.clear() # 清空现有数据 + if self.filepath.exists(): + try: + with open(self.filepath, "r", encoding="utf-8") as f: + loaded_data = yaml.safe_load(f) or {} + self.update(loaded_data) # 使用加载的数据更新字典 + except yaml.YAMLError as e: + logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}") + # 保持字典为空 (已在开头 clear) + else: + logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.") + # 保持字典为空 (已在开头 clear) + + @staticmethod + def to_string(data: dict) -> str: + """ + 将字典 (自身) 转换为 YAML 格式的字符串。 + + Returns: + YAML 格式的字符串。 + """ + try: + return yaml.safe_dump( + dict(data), # 使用dict转换为标准的字典 + allow_unicode=True, + sort_keys=False, + default_flow_style=False + ) + except TypeError as e: + logger.error(f"将数据转换为 YAML 字符串时出错: {e}") + return "" + + @staticmethod + def to_dict(data: str) -> Union[None, dict]: + """ + 将 YAML 格式的字符串转换为字典,并更新当前字典的内容. + + Args: + data: YAML 格式的字符串。 + """ + try: + loaded_data = yaml.safe_load(data) or {} + return loaded_data + except yaml.YAMLError as e: + logger.error(f"将 YAML 字符串转换为字典时出错: {e}") + + def save(self, new_filepath: Union[str, Path, None] = None): + """ + 将字典数据 (自身) 保存到 YAML 文件。 + + Args: + new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。 + """ + filepath = Path(new_filepath) if new_filepath else self.filepath + + try: + with open(filepath, "w", encoding="utf-8") as f: + yaml.safe_dump( + dict(self), # 使用dict转换为标准的字典 + stream=f, + allow_unicode=True, + sort_keys=False, + default_flow_style=False + ) + except (TypeError, OSError) as e: + logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}") + + + + + +if __name__ == '__main__': + # 示例用法 + yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径 + yaml_file = YamlProcessor(yaml_path) + print(yaml_file) + print(type(yaml_file)) +