init: 初始化项目
- 创建了基本的项目结构 - 添加了 .gitignore 文件 - 配置了基本的开发环境 - 添加清华镜像源 - 设置了基础的文件夹和文件(如 commons, utils, POM, pytest.ini) 项目说明: - [项目名称]:Web自动化测试 - [项目描述]:基于pytest,selenium的自动化测试工具 - [开发环境]:Python
This commit is contained in:
1
.python-version
Normal file
1
.python-version
Normal file
@@ -0,0 +1 @@
|
||||
3.10
|
||||
22
POM/conftest.py
Normal file
22
POM/conftest.py
Normal file
@@ -0,0 +1,22 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: conftest
|
||||
@date: 2025/4/5 09:28
|
||||
@desc:
|
||||
"""
|
||||
import pytest
|
||||
from selenium.webdriver import Chrome
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def driver():
|
||||
_driver = Chrome()
|
||||
yield _driver
|
||||
_driver.quit()
|
||||
|
||||
|
||||
55
POM/login_page.py
Normal file
55
POM/login_page.py
Normal file
@@ -0,0 +1,55 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: test_login
|
||||
@date: 2025/4/4 15:42
|
||||
@desc:
|
||||
"""
|
||||
from time import sleep
|
||||
|
||||
from selenium.webdriver import Chrome
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from commons.modules import Browser
|
||||
|
||||
import pytest
|
||||
|
||||
from commons.driver import KeyWordDriver
|
||||
|
||||
|
||||
class LoginPage(KeyWordDriver):
|
||||
url = "/users/login"
|
||||
email = '//*[@id="email"]'
|
||||
email_title = '//*[@id="root"]/div[1]/div/div/form/div[1]/label'
|
||||
password = '//*[@id="pass"]'
|
||||
login_submit = '//*[@id="root"]/div[1]/div/div/form/div[3]/button'
|
||||
|
||||
# def __init__(self, browser: Browser):
|
||||
# super().__init__(browser)
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def login(self, email, password):
|
||||
self.browser(Browser.EDGE)
|
||||
self.get(self.url)
|
||||
self.input(1, self.email, email)
|
||||
self.input(By.XPATH, self.password, password)
|
||||
text = self.get_text(By.XPATH, self.email_title)
|
||||
print(text)
|
||||
self.click(By.XPATH, self.login_submit)
|
||||
sleep(10)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from commons.settings import configs
|
||||
|
||||
_email = configs.username
|
||||
_password = configs.password
|
||||
login = LoginPage()
|
||||
login.base_url(configs.base_url)
|
||||
|
||||
login.login(_email, _password)
|
||||
39
POM/test_login.py
Normal file
39
POM/test_login.py
Normal file
@@ -0,0 +1,39 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: main
|
||||
@date: 2025/4/4 17:52
|
||||
@desc:
|
||||
"""
|
||||
from time import sleep
|
||||
import pytest
|
||||
from selenium.webdriver import Chrome
|
||||
from login_page import LoginPage
|
||||
from commons.modules import Browser
|
||||
|
||||
|
||||
@pytest.mark.parametrize("email, password", [("username", "password"), ("<EMAIL>", "<PASSWORD>")])
|
||||
def test_login(driver, email, password):
|
||||
login = LoginPage()
|
||||
|
||||
login.login(email, password)
|
||||
# sleep(10)
|
||||
|
||||
|
||||
def test_logout_1(login_ok):
|
||||
login = LoginPage()
|
||||
login.browser(Browser.CHROME)
|
||||
login.get("https://www.baidu.com/")
|
||||
print("logout")
|
||||
|
||||
|
||||
# @pytest.mark.usefixtures("login_ok")
|
||||
def test_logout_2():
|
||||
login = LoginPage()
|
||||
login.browser(Browser.CHROME)
|
||||
login.get("https://www.baidu.com/")
|
||||
print("logout")
|
||||
23
README.md
Normal file
23
README.md
Normal file
@@ -0,0 +1,23 @@
|
||||
# Selenium
|
||||
|
||||
1,配置webdriver的下载路径(环境变量)
|
||||
```text
|
||||
# windows
|
||||
键:SE_CACHE_PATH
|
||||
值:webdriver的缓存路径(如 D:/WebDriver)
|
||||
```
|
||||
调用浏览器驱动的接口
|
||||
|
||||
接口文档:https://www.w3.org/TR/webdriver/#interface
|
||||
> Selenium对浏览器所有的控制,
|
||||
> - 通过接口调用实现(大部分)
|
||||
> - 通过js实现(小部分)
|
||||
|
||||
驱动地址:
|
||||
|
||||
https://registry.npmmirror.com/binary.html
|
||||
|
||||
pytest-xlsx
|
||||
使用:
|
||||
|
||||
excel文件需要以test_开头
|
||||
84
commons/assert_functions.py
Normal file
84
commons/assert_functions.py
Normal file
@@ -0,0 +1,84 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: assert_functions
|
||||
@date: 2025/4/13 21:06
|
||||
@desc:
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Callable, TypeVar, Tuple
|
||||
from typing import Literal
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.webdriver.remote.webdriver import WebElement
|
||||
|
||||
# from commons.driver import KeyWordDriver
|
||||
|
||||
# from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
# __all__ = ["EC","custom_ec"]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
T = TypeVar("T")
|
||||
WebDriverOrWebElement = Union[WebDriver, WebElement]
|
||||
|
||||
custom_asserts = {
|
||||
|
||||
}
|
||||
|
||||
|
||||
def register(name: str | None = None):
|
||||
def decorator(func):
|
||||
if name is not None:
|
||||
custom_asserts[name] = func
|
||||
|
||||
custom_asserts[func.__name__] = func
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
@register()
|
||||
def get_text_(driver,*args) -> T:
|
||||
"""
|
||||
无参预期条件函数
|
||||
driver
|
||||
- WebDriver的实例(Ie、Firefox、Chrome或远程)或 一个WebElement
|
||||
- driver形参不可省略,即使不使用
|
||||
:return:
|
||||
"""
|
||||
_ = driver
|
||||
logger.warning(f"{args=}")
|
||||
list_ = [1, 2, 3, 4, 5, 6]
|
||||
for item in list_:
|
||||
logger.info(item)
|
||||
if item == 5:
|
||||
# logger.info(item)
|
||||
return item
|
||||
|
||||
|
||||
# @register()
|
||||
# def get_value(locator: Tuple[str, str]) -> Callable[
|
||||
# [KeyWordDriver], Union[Literal[False], WebElement]]:
|
||||
# """
|
||||
# 有参预期条件函数
|
||||
# :param locator:
|
||||
# :return:
|
||||
# """
|
||||
#
|
||||
# def _predicate(driver: KeyWordDriver):
|
||||
# try:
|
||||
# return driver.find_element(*locator)
|
||||
# except Exception as e:
|
||||
# return False
|
||||
#
|
||||
# return _predicate
|
||||
99
commons/custom_expected_condition.py
Normal file
99
commons/custom_expected_condition.py
Normal file
@@ -0,0 +1,99 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: expected_conditional_function
|
||||
@date: 2025/4/4 14:15
|
||||
@desc:
|
||||
"""
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Callable, TypeVar, Tuple
|
||||
from typing import Literal
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.webdriver.remote.webdriver import WebElement
|
||||
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
__all__ = ["EC","custom_ec"]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
T = TypeVar("T")
|
||||
WebDriverOrWebElement = Union[WebDriver, WebElement]
|
||||
|
||||
|
||||
"""
|
||||
常用等待条件(expected_conditions)--来自EC模块
|
||||
presence_of_element_located: 元素存在于DOM。
|
||||
visibility_of_element_located: 元素可见。
|
||||
element_to_be_clickable: 元素可点击。
|
||||
title_contains: 页面标题包含特定文本。
|
||||
text_to_be_present_in_element: 元素包含特定文本。
|
||||
"""
|
||||
# 自定义预期条件(Custom Expected Condition)
|
||||
custom_ec = {
|
||||
|
||||
}
|
||||
|
||||
|
||||
def register(name: str | None = None):
|
||||
def decorator(func):
|
||||
if name is not None:
|
||||
custom_ec[name] = func
|
||||
|
||||
custom_ec[func.__name__] = func
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
@register()
|
||||
def examples_no_parameters(driver: WebDriverOrWebElement)->T:
|
||||
"""
|
||||
无参预期条件函数
|
||||
driver
|
||||
- WebDriver的实例(Ie、Firefox、Chrome或远程)或 一个WebElement
|
||||
- driver形参不可省略,即使不使用
|
||||
:return:
|
||||
"""
|
||||
_ = driver
|
||||
list_ = [1, 2, 3, 4, 5, 6]
|
||||
for item in list_:
|
||||
logger.info(item)
|
||||
if item == 5:
|
||||
return item
|
||||
|
||||
@register()
|
||||
def examples_have_parameters(locator: Tuple[str, str]) -> Callable[[WebDriverOrWebElement], Union[Literal[False], WebElement]]:
|
||||
"""
|
||||
有参预期条件函数(暂不支持)
|
||||
:param locator:
|
||||
:return:
|
||||
"""
|
||||
def _predicate(driver: WebDriverOrWebElement):
|
||||
try:
|
||||
|
||||
return driver.find_element(*locator)
|
||||
except Exception as e:
|
||||
|
||||
return False
|
||||
|
||||
return _predicate
|
||||
|
||||
def func_3(mark):
|
||||
...
|
||||
|
||||
|
||||
def func_4(mark):
|
||||
...
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(custom_ec)
|
||||
341
commons/driver.py
Normal file
341
commons/driver.py
Normal file
@@ -0,0 +1,341 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: key_driver
|
||||
@date: 2025/4/2 21:59
|
||||
@desc:
|
||||
"""
|
||||
|
||||
import logging
|
||||
from time import sleep
|
||||
import secrets # 原生库,用于生成安全的随机数
|
||||
from typing import Optional, Callable, Union, Literal, Any, TypeVar
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from selenium.webdriver import Chrome, Edge, Firefox, Safari, Ie
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.wait import WebDriverWait
|
||||
from selenium.webdriver.support.ui import Select
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.webdriver.remote.webdriver import WebElement
|
||||
|
||||
from commons.assert_functions import custom_asserts
|
||||
from commons.custom_expected_condition import EC, custom_ec
|
||||
from commons.modules import Browser, Locator
|
||||
from commons.settings import configs, EXPLICIT_WAIT_TIMEOUT, SCREENSHOT_DIR
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
WebDriverOrWebElement = Union[WebDriver, WebElement]
|
||||
D = TypeVar("D", bound=Union[WebDriver, WebElement])
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
# 触发器 trigger
|
||||
# 浏览器 Browser
|
||||
# 执行器 actuator
|
||||
# 筛选器 Filters
|
||||
# 查找器 finder
|
||||
# 转换器 converter
|
||||
def webdriver_finder(browser: str | Browser = Browser.CHROME) -> WebDriver:
|
||||
match browser:
|
||||
case Browser.CHROME:
|
||||
return Chrome()
|
||||
case Browser.FIREFOX:
|
||||
return Firefox()
|
||||
case Browser.IE:
|
||||
return Ie()
|
||||
case Browser.SAFARI:
|
||||
return Safari()
|
||||
case Browser.EDGE:
|
||||
return Edge()
|
||||
case _:
|
||||
return Chrome() # 默认情况
|
||||
|
||||
|
||||
def by_converter(by_value: str | Locator):
|
||||
try:
|
||||
# 统一处理输入
|
||||
if isinstance(by_value, str):
|
||||
by = Locator(by_value.lower().replace(' ', ''))
|
||||
|
||||
# 创建对应的浏览器实例
|
||||
by = {
|
||||
Locator.ID: By.ID,
|
||||
Locator.NAME: By.NAME,
|
||||
Locator.CLASS: By.CLASS_NAME,
|
||||
Locator.TAG: By.TAG_NAME,
|
||||
Locator.LINK_TEXT: By.LINK_TEXT,
|
||||
Locator.PARTIAL_LINK_TEXT: By.PARTIAL_LINK_TEXT,
|
||||
Locator.CSS: By.CSS_SELECTOR,
|
||||
Locator.XPATH: By.XPATH,
|
||||
}.get(by_value, By.XPATH)
|
||||
return by
|
||||
except ValueError:
|
||||
return By.XPATH
|
||||
|
||||
|
||||
class KeyWordDriver:
|
||||
|
||||
# def __init__(self, browser: str | Browser):
|
||||
# self.driver = self.webdriver_finder(browser)
|
||||
def __init__(self):
|
||||
self.driver: WebDriver | None = None
|
||||
self._url: str | None = None
|
||||
# self.temp_value = None
|
||||
|
||||
def base_url(self, url: str, *args, **kwargs):
|
||||
logger.info(f"前置URL: {url}")
|
||||
self._url = url
|
||||
# return url
|
||||
|
||||
def browser(self, browser_name: str | Browser = Browser.CHROME, *args, **kwargs):
|
||||
browser_name = Browser(browser_name.lower().replace(' ', '')) if isinstance(browser_name, str) else browser_name
|
||||
match browser_name:
|
||||
case Browser.CHROME:
|
||||
logger.info(f"启动{Browser.CHROME}浏览器")
|
||||
self.driver = Chrome()
|
||||
# return Chrome()
|
||||
case Browser.FIREFOX:
|
||||
logger.info(f"启动{Browser.FIREFOX}浏览器")
|
||||
self.driver = Firefox()
|
||||
case Browser.IE:
|
||||
logger.info(f"启动{Browser.IE}浏览器")
|
||||
self.driver = Ie()
|
||||
case Browser.SAFARI:
|
||||
logger.info(f"启动{Browser.SAFARI}浏览器")
|
||||
self.driver = Safari()
|
||||
case Browser.EDGE:
|
||||
logger.info(f"启动{Browser.EDGE}浏览器")
|
||||
self.driver = Edge()
|
||||
case _:
|
||||
logger.info(f"启动默认浏览器: {Browser.CHROME}")
|
||||
self.driver = Chrome() # 默认情况
|
||||
|
||||
def find_element(self, by: str = By.XPATH, value: Optional[str] = None, *args, **kwargs) -> WebElement:
|
||||
by = by_converter(by)
|
||||
|
||||
return self.driver.find_element(by, value)
|
||||
|
||||
def delay(self, timeout: int | float):
|
||||
sleep(timeout)
|
||||
return self
|
||||
|
||||
def implicit_wait(self, timeout: float, *args, **kwargs) -> None:
|
||||
"""
|
||||
隐式等待
|
||||
:param timeout: 超时时间
|
||||
:param args:
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
self.driver.implicitly_wait(timeout)
|
||||
|
||||
def explicit_wait(self, method: T, timeout: float = EXPLICIT_WAIT_TIMEOUT, *args, **kwargs):
|
||||
"""
|
||||
显示等待
|
||||
:param method: 可调用对象名
|
||||
:param timeout: 超时时间
|
||||
:param args:
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
|
||||
try:
|
||||
if isinstance(method, str):
|
||||
method = custom_ec.get(method, (lambda _: False))
|
||||
logger.info(f"预期条件: {method.__name__}")
|
||||
|
||||
return WebDriverWait(self.driver, timeout).until(method)
|
||||
except TypeError as te:
|
||||
logger.error(f"显示等待异常: {te}")
|
||||
# self.driver.quit()
|
||||
raise te
|
||||
|
||||
def page_load_timeout(self, timeout: float, *args, **kwargs) -> None:
|
||||
self.driver.set_page_load_timeout(timeout)
|
||||
|
||||
def get(self, url, *args, **kwargs):
|
||||
if self.driver is None:
|
||||
self.browser(*args, **kwargs)
|
||||
if not url.startswith("http"):
|
||||
# 自动添加baseurl
|
||||
url = urljoin(self._url, url)
|
||||
logger.info(f"网址: {url}")
|
||||
print(url)
|
||||
self.driver.get(url)
|
||||
|
||||
def maximize_window(self):
|
||||
self.driver.maximize_window()
|
||||
|
||||
def click(self, by, value, *args, **kwargs):
|
||||
|
||||
by = by_converter(by)
|
||||
mark = (by, value)
|
||||
method = EC.element_to_be_clickable(mark)
|
||||
|
||||
self.explicit_wait(method).click()
|
||||
# self.find_element(by, value).click()
|
||||
|
||||
def clear(self, by, value, *args, **kwargs):
|
||||
|
||||
by = by_converter(by)
|
||||
mark = (by, value)
|
||||
method = EC.visibility_of_element_located(mark)
|
||||
|
||||
self.explicit_wait(method).clear()
|
||||
|
||||
# self.driver.find_element(by, value).clear()
|
||||
|
||||
def input(self, by, value, content: str | None = None, *args, **kwargs):
|
||||
|
||||
by = by_converter(by)
|
||||
mark = (by, value)
|
||||
method = EC.visibility_of_element_located(mark)
|
||||
|
||||
self.explicit_wait(method).send_keys(content)
|
||||
# self.find_element(by, value).send_keys(content)
|
||||
|
||||
def get_text(self, by, value, *args, **kwargs):
|
||||
"""
|
||||
获取元素文本
|
||||
:param by:
|
||||
:param value:
|
||||
:param args:
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
by = by_converter(by)
|
||||
mark = (by, value)
|
||||
method = EC.visibility_of_element_located(mark)
|
||||
|
||||
text = self.explicit_wait(method).text
|
||||
# print(text)
|
||||
return text
|
||||
|
||||
def get_attribute(self, by, value, attributes: str, *args, **kwargs):
|
||||
"""
|
||||
获取元素属性值(class,type,value,...)
|
||||
:param by:
|
||||
:param value:
|
||||
:param attributes:
|
||||
:param args:
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
by = by_converter(by)
|
||||
mark = (by, value)
|
||||
method = EC.presence_of_element_located(mark)
|
||||
|
||||
text = self.explicit_wait(method).get_attribute(attributes)
|
||||
# print(text)
|
||||
return text
|
||||
|
||||
def enter_iframe(self, by, value, *args, **kwargs):
|
||||
|
||||
by = by_converter(by)
|
||||
element = self.find_element(by, value)
|
||||
self.driver.switch_to.frame(element)
|
||||
|
||||
def exit_iframe(self, *args, **kwargs):
|
||||
self.driver.switch_to.default_content()
|
||||
|
||||
def get_cookie(self, name, *args, **kwargs) -> Optional[dict]:
|
||||
return self.driver.get_cookie(name)
|
||||
|
||||
def add_cookie(self, cookie_dict, *args, **kwargs) -> None:
|
||||
self.driver.add_cookie(cookie_dict)
|
||||
|
||||
def screenshot_png(self, by, value, name: str | None = None, *args, **kwargs) -> None:
|
||||
by = by_converter(by)
|
||||
# mark = (by, value)
|
||||
# method = EC.visibility_of_element_located(mark)
|
||||
|
||||
if name is not None:
|
||||
path = (configs.SCREENSHOT_DIR / f"{name}.png").as_posix()
|
||||
else:
|
||||
# 生成 8 个随机字节,然后转为 16 个字符的十六进制字符串
|
||||
random_hex = secrets.token_hex(8) # n=8 表示生成 8 字节
|
||||
path = (configs.SCREENSHOT_DIR / f"{random_hex}.png").as_posix()
|
||||
|
||||
logger.warning(f"截图存放路径: {path}")
|
||||
# self.explicit_wait(method).screenshot(path)
|
||||
self.find_element(by, value).screenshot(path)
|
||||
|
||||
def select(self, by, value, text, func_name, *args, **kwargs):
|
||||
# 未完成
|
||||
by = by_converter(by)
|
||||
mark = (by, value)
|
||||
|
||||
method = EC.visibility_of_element_located(mark)
|
||||
self.explicit_wait(method)
|
||||
|
||||
def run_javascript(self, by, value, code: str, *args, **kwargs) -> Any:
|
||||
# 还需打磨
|
||||
by = by_converter(by)
|
||||
element = self.find_element(by, value)
|
||||
return self.driver.execute_script(code, element)
|
||||
|
||||
# def assert_text_equals(self, by, value,method, text, msg=None):
|
||||
# by = by_converter(by)
|
||||
# # _text = self.get_text(by, value)
|
||||
# _text = getattr(self,method)(by, value)
|
||||
#
|
||||
# assert _text == text, msg
|
||||
# def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
|
||||
# try:
|
||||
# value = method(self.driver)
|
||||
# if value:
|
||||
# return value
|
||||
# except AssertionError as e:
|
||||
# logger.error(e)
|
||||
#
|
||||
# def assert_text_equals(self, method, text, msg=""):
|
||||
# if isinstance(method, str):
|
||||
# method = custom_asserts.get(method, (lambda _: False))
|
||||
# logger.info(f"{method=}")
|
||||
# # _text = self.until(method)
|
||||
# _text = method(self.driver)
|
||||
# logger.info(_text)
|
||||
# assert _text == text, msg
|
||||
def assert_text_equals(self, by, value, text):
|
||||
by = by_converter(by)
|
||||
_text = self.get_text(by, value)
|
||||
assert _text == text, f"断言“{_text}”与“{text}”相等失败!!"
|
||||
logger.info(f"断言: {_text} == {text}")
|
||||
|
||||
def assert_text_not_equals(self, by, value, text):
|
||||
by = by_converter(by)
|
||||
_text = self.get_text(by, value)
|
||||
assert _text != text, f"断言“{_text}”与“{text}”不相等失败!!"
|
||||
|
||||
def assert_text_contains(self, by, value, text, msg=None):
|
||||
by = by_converter(by)
|
||||
_text = self.get_text(by, value)
|
||||
assert _text in text, f"断言“{_text}”包含于“{text}”中失败!!"
|
||||
|
||||
def assert_text_not_contains(self, by, value, text, msg=None):
|
||||
by = by_converter(by)
|
||||
_text = self.get_text(by, value)
|
||||
assert _text not in text, f"断言“{_text}”不包含于“{text}”中失败!!"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from commons.settings import configs
|
||||
el = KeyWordDriver()
|
||||
el.base_url(configs.base_url)
|
||||
# el.browser("chrome")
|
||||
el.browser(Browser.EDGE)
|
||||
el.get("/users/login")
|
||||
el.input("", '//*[@id="email"]', configs.username)
|
||||
el.input("", '//*[@id="pass"]', configs.password)
|
||||
el.get_text("", '//*[@id="root"]/div[1]/div/div/form/div[1]/label')
|
||||
el.click("", '//*[@id="root"]/div[1]/div/div/form/div[3]/button')
|
||||
type_value = el.get_attribute("", '//*[@id="root"]/div[1]/div/div/form/div[3]/button', "type")
|
||||
print(type_value)
|
||||
el.assert_text_equals("", '//*[@id="root"]/div[1]/div/div/form/div[3]/button', "登录")
|
||||
|
||||
137
commons/funcs.py
Normal file
137
commons/funcs.py
Normal file
@@ -0,0 +1,137 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: chen wei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: funcs.py
|
||||
@date: 2024 2024/9/22 22:46
|
||||
@desc:
|
||||
"""
|
||||
import base64
|
||||
import logging
|
||||
import time
|
||||
import urllib.parse
|
||||
import hashlib
|
||||
|
||||
# from commons.databases import db
|
||||
#
|
||||
from utils.file_processors.file_handle import FileHandle
|
||||
from commons import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Funcs:
|
||||
FUNC_MAPPING = {
|
||||
"int": int,
|
||||
"float": float,
|
||||
"bool": bool
|
||||
} # 内置函数有的,直接放入mapping;内置函数没有的,在funcs中定义,自动放入mapping
|
||||
|
||||
@classmethod
|
||||
def register(cls, name: str | None):
|
||||
def decorator(func):
|
||||
if name is None:
|
||||
cls.FUNC_MAPPING[func.__name__] = func
|
||||
cls.FUNC_MAPPING[name] = func
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
@Funcs.register("url_unquote")
|
||||
def url_unquote(s: str) -> str:
|
||||
return urllib.parse.unquote(s)
|
||||
|
||||
|
||||
@Funcs.register("str")
|
||||
def to_string(s) -> str:
|
||||
# 将数据转换为str类型。
|
||||
return f"'{s}'"
|
||||
|
||||
|
||||
@Funcs.register("time_str")
|
||||
def time_str() -> str:
|
||||
return str(time.time())
|
||||
|
||||
|
||||
@Funcs.register("add")
|
||||
def add(a, b):
|
||||
return str(int(a) + int(b))
|
||||
|
||||
|
||||
# @Funcs.register("sql")
|
||||
# def sql(s: str) -> str:
|
||||
# res = db.execute_sql(s)
|
||||
#
|
||||
# return res[0][0]
|
||||
|
||||
|
||||
@Funcs.register("new_id")
|
||||
def new_id():
|
||||
# 自增,永不重复
|
||||
id_file = FileHandle(settings.ID_PATH)
|
||||
id_file["id"] += 1
|
||||
id_file.save()
|
||||
|
||||
return id_file["id"]
|
||||
|
||||
|
||||
# @Funcs.register("last_id")
|
||||
# def last_id() -> str:
|
||||
# # 不自增,只返回结果
|
||||
#
|
||||
# id_file = FileHandle(settings.id_path)
|
||||
# return id_file["id"]
|
||||
|
||||
|
||||
@Funcs.register("md5")
|
||||
def md5(content: str) -> str:
|
||||
# 1,原文转为字节
|
||||
content = content.encode("utf-8")
|
||||
result = hashlib.md5(content).hexdigest()
|
||||
return result
|
||||
|
||||
|
||||
@Funcs.register("base64_encode")
|
||||
def base64_encode(content: str) -> str:
|
||||
# 1,原文转二进制
|
||||
content = content.encode("utf-8")
|
||||
# 2,base64编码(二进制)
|
||||
encode_value = base64.b64encode(content)
|
||||
# 3,转为字符串
|
||||
encode_str = encode_value.decode("utf-8")
|
||||
|
||||
return encode_str
|
||||
|
||||
|
||||
@Funcs.register("base64_decode")
|
||||
def base64_decode(content: str) -> str:
|
||||
# 1,原文转二进制
|
||||
content = content.encode("utf-8")
|
||||
# 2,base64解码(二进制)
|
||||
decode_value = base64.b64decode(content)
|
||||
# 3,转为字符串
|
||||
decode_str = decode_value.decode("utf-8")
|
||||
|
||||
return decode_str
|
||||
|
||||
|
||||
@Funcs.register("rsa_encode")
|
||||
def rsa_encode(content: str) -> str:
|
||||
...
|
||||
|
||||
|
||||
@Funcs.register("rsa_decode")
|
||||
def rsa_decode(content: str) -> str:
|
||||
...
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# res = url_unquote("%E6%88%90%E5%8A%9F%E3%80%82")
|
||||
# print(res)
|
||||
# print(f"计数器:{new_id()}")
|
||||
# print(f"当前数值:{last_id()}")
|
||||
print(Funcs().FUNC_MAPPING)
|
||||
41
commons/modules.py
Normal file
41
commons/modules.py
Normal file
@@ -0,0 +1,41 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: modules
|
||||
@date: 2025/4/5 20:29
|
||||
@desc:
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class Browser(str, Enum):
|
||||
CHROME = "chrome"
|
||||
FIREFOX = "firefox"
|
||||
IE = "ie"
|
||||
SAFARI = "safari"
|
||||
EDGE = "edge"
|
||||
|
||||
|
||||
class Locator(str, Enum):
|
||||
ID = "id",
|
||||
NAME = "name",
|
||||
CLASS = "class",
|
||||
TAG = "tag",
|
||||
LINK_TEXT = "link_text",
|
||||
PARTIAL_LINK_TEXT = "partial_link_text",
|
||||
CSS = "css",
|
||||
XPATH = "xpath"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(Browser.CHROME)
|
||||
print(type(Browser.CHROME))
|
||||
print(Browser("chR ome ".lower().replace(' ', '')))
|
||||
match "chrome":
|
||||
case Browser.CHROME:
|
||||
print("我能执行!!!")
|
||||
137
commons/settings.py
Normal file
137
commons/settings.py
Normal file
@@ -0,0 +1,137 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: settings
|
||||
@date: 2025/2/23 21:34
|
||||
@desc:
|
||||
"""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import tomlkit
|
||||
|
||||
ROOT_PATH = (Path(__file__)).resolve().parents[1] # 获取根路径(绝对路径)
|
||||
|
||||
EXPLICIT_WAIT_TIMEOUT = 10
|
||||
|
||||
DATA_ROOT = ROOT_PATH / "data"
|
||||
|
||||
if not DATA_ROOT.exists():
|
||||
DATA_ROOT.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 配置文件路
|
||||
CONF_PATH = Path(DATA_ROOT, "settings.toml")
|
||||
|
||||
# 截图保存路径
|
||||
SCREENSHOT_DIR = Path(ROOT_PATH, "screenshot")
|
||||
|
||||
if not SCREENSHOT_DIR.exists():
|
||||
SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 测试案例路径
|
||||
CASES_DIR = Path(ROOT_PATH, "tests")
|
||||
|
||||
# 变量替换
|
||||
EXCHANGER = Path(ROOT_PATH, "extract.toml")
|
||||
|
||||
# 自增ID
|
||||
ID_PATH = Path(ROOT_PATH, "id.toml")
|
||||
|
||||
# 默认配置
|
||||
DEFAULT_CONF = {
|
||||
"SCREENSHOT_DIR": SCREENSHOT_DIR,
|
||||
"CASES_DIR": CASES_DIR,
|
||||
"EXCHANGER": EXCHANGER,
|
||||
"ID_PATH": ID_PATH,
|
||||
}
|
||||
|
||||
|
||||
class Settings:
|
||||
"""
|
||||
配置管理类 (单例模式 Singleton Pattern)。
|
||||
|
||||
优先从项目根目录下的 'settings.toml' 文件加载配置。
|
||||
如果 'settings.toml' 文件不存在、无法解析或缺少某个配置项,
|
||||
则使用此类中定义的默认值。
|
||||
|
||||
通过创建类的唯一实例 `settings` 来全局访问配置。
|
||||
"""
|
||||
_instance = None
|
||||
|
||||
# --- 单例实现 ---
|
||||
def __new__(cls, *args, **kwargs):
|
||||
"""
|
||||
实现单例模式。确保全局只有一个 Settings 实例。
|
||||
"""
|
||||
if not cls._instance:
|
||||
cls._instance = super().__new__(cls, *args, **kwargs)
|
||||
# 初始化只进行一次
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
初始化配置。仅在第一次创建实例时执行。
|
||||
"""
|
||||
# 防止重复初始化 (单例模式下可能会被多次调用 __init__)
|
||||
if self._initialized:
|
||||
return
|
||||
self._initialized = True
|
||||
|
||||
logging.info("开始初始化配置...")
|
||||
self._init_config()
|
||||
self._load_config()
|
||||
|
||||
def _init_config(self):
|
||||
"""初始化配置文件"""
|
||||
if not CONF_PATH.exists():
|
||||
CONF_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
CONF_PATH.touch()
|
||||
# self._save_config(DEFAULT_CONF)
|
||||
return self
|
||||
|
||||
def _load_config(self):
|
||||
with open(CONF_PATH, 'r', encoding='utf-8') as f:
|
||||
result = tomlkit.parse(f.read())
|
||||
logging.debug(f"加载 settings.toml 文件 ===> {result}")
|
||||
|
||||
new_conf = DEFAULT_CONF | result
|
||||
for key, value in new_conf.items():
|
||||
|
||||
self.__setattr__(key, value)
|
||||
return self
|
||||
|
||||
def _save_config(self, data: dict[str, Any]):
|
||||
"""保存配置到文件"""
|
||||
for key, value in data.items():
|
||||
if isinstance(value, Path):
|
||||
data[key] = value.as_posix()
|
||||
with open(CONF_PATH, 'w', encoding='utf-8') as f:
|
||||
tomlkit.dump(data=data, fp=f, sort_keys=False)
|
||||
return self
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
self.__dict__[key] = value
|
||||
|
||||
def items(self):
|
||||
conf: dict = {}
|
||||
for key, value in self.__dict__.items():
|
||||
if not key.startswith('_'):
|
||||
conf[key] = value
|
||||
# return self.__dict__.items()
|
||||
return conf
|
||||
|
||||
|
||||
configs = Settings()
|
||||
|
||||
# --- 用于直接运行此文件进行测试 ---
|
||||
if __name__ == '__main__':
|
||||
...
|
||||
print(configs.items())
|
||||
|
||||
63
commons/templates.py
Normal file
63
commons/templates.py
Normal file
@@ -0,0 +1,63 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: chen wei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: templates.py
|
||||
@date: 2024 2024/9/22 22:20
|
||||
@desc:
|
||||
"""
|
||||
import copy
|
||||
import logging
|
||||
import re
|
||||
import string
|
||||
from commons.funcs import Funcs
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Template(string.Template):
|
||||
"""
|
||||
1,支持函数调用
|
||||
2,参数也可以是变量
|
||||
"""
|
||||
|
||||
call_pattern = re.compile(r"\${(?P<func_name>.*?)\((?P<func_args>.*?)\)}")
|
||||
|
||||
def render(self, mapping: dict) -> str:
|
||||
s = self.safe_substitute(mapping) # 原有方法替换变量
|
||||
s = self.safe_substitute_funcs(s, mapping)
|
||||
|
||||
return s
|
||||
|
||||
def safe_substitute_funcs(self, template, mapping) -> str:
|
||||
"""
|
||||
解析字符串中的函数名和参数,并将函数调用结果进行替换
|
||||
:param template: 字符串
|
||||
:param mapping: 上下文,提供要使用的函数和变量
|
||||
:return: 替换后的结果
|
||||
"""
|
||||
mapping = copy.deepcopy(mapping)
|
||||
logger.info(f"mapping更新前: {mapping}")
|
||||
# mapping.update(self.FUNC_MAPPING) # 合并两个mapping
|
||||
mapping.update(Funcs.FUNC_MAPPING) # 合并两个mapping
|
||||
logger.info(f"mapping更新后: {mapping}")
|
||||
def convert(mo):
|
||||
func_name = mo.group("func_name")
|
||||
func_args = mo.group("func_args").split(",")
|
||||
func = mapping.get(func_name) # 读取指定函数
|
||||
func_args_value = [mapping.get(arg, arg) for arg in func_args]
|
||||
|
||||
if func_args_value == [""]: # 处理没有参数的func
|
||||
func_args_value = []
|
||||
|
||||
if not callable(func):
|
||||
return mo.group() # 如果是不可调用的假函数,不进行替换
|
||||
else:
|
||||
return str(func(*func_args_value)) # 否则用函数结果进行替换
|
||||
|
||||
return self.call_pattern.sub(convert, template)
|
||||
|
||||
99
conftest.py
Normal file
99
conftest.py
Normal file
@@ -0,0 +1,99 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: conftest
|
||||
@date: 2025/4/2 21:57
|
||||
@desc:
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
from itertools import chain
|
||||
import inspect
|
||||
|
||||
import allure
|
||||
from pytest_xlsx.file import XlsxItem
|
||||
import pytest
|
||||
from selenium.webdriver import Chrome
|
||||
|
||||
from POM.login_page import LoginPage
|
||||
from commons.driver import KeyWordDriver
|
||||
from utils.file_processors.file_handle import FileHandle
|
||||
from commons.templates import Template
|
||||
from commons.settings import configs
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# @pytest.fixture()
|
||||
def login_ok():
|
||||
username = configs.username
|
||||
password = configs.password
|
||||
login = LoginPage()
|
||||
logger.debug("fixture-login_ok 我被执行了!!!")
|
||||
login.login(username, password)
|
||||
yield login.driver
|
||||
login.driver.quit()
|
||||
|
||||
|
||||
file = FileHandle(configs.variable)
|
||||
|
||||
|
||||
def use_cases_converter(xlsx_item: dict) -> list:
|
||||
# 变量替换
|
||||
logger.info(f"变量替换前: {xlsx_item}")
|
||||
step_str = FileHandle.to_string(xlsx_item)
|
||||
xlsx_item = FileHandle.to_dict(Template(step_str).render(file))
|
||||
logger.info(f"变量替换: {xlsx_item}")
|
||||
|
||||
new_values = []
|
||||
value = list(xlsx_item.values())
|
||||
for item in value:
|
||||
if not isinstance(item, list):
|
||||
new_values.append(item)
|
||||
else:
|
||||
new_values.extend(item)
|
||||
|
||||
logger.info(f"转换后的测试用例: <===={new_values}")
|
||||
return new_values
|
||||
|
||||
|
||||
kwd = KeyWordDriver()
|
||||
|
||||
|
||||
# hook(钩子)
|
||||
def pytest_xlsx_run_step(item: XlsxItem):
|
||||
step = item.current_step
|
||||
logger.info(f"转换前的测试用例: ====>{step}")
|
||||
step = use_cases_converter(step)
|
||||
# print(step)
|
||||
|
||||
content = slice(1, 2)
|
||||
key = slice(2, 3)
|
||||
values = slice(3, None)
|
||||
|
||||
remark = step[content].pop()
|
||||
keyword = step[key].pop()
|
||||
_arges = [item for item in step[values] if item is not None]
|
||||
|
||||
logger.info(f"标记: {keyword} -- 参数: {_arges}")
|
||||
try:
|
||||
func = getattr(kwd, keyword)
|
||||
logger.info(f"当前执行的操作:{func.__name__}")
|
||||
func(*_arges)
|
||||
except AttributeError as e:
|
||||
logger.error(f'标记 {keyword} 错误: {e}')
|
||||
raise e
|
||||
|
||||
if remark:
|
||||
# 截图
|
||||
png = kwd.driver.get_screenshot_as_png() # 保留为二进制格式
|
||||
# 将截图报错附加到allure
|
||||
allure.attach(png, remark, allure.attachment_type.PNG)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(bool([1, 2, 3]))
|
||||
22
main.py
Normal file
22
main.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import os
|
||||
import shutil
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from commons.settings import configs
|
||||
|
||||
# 指定测试目录
|
||||
CASES_DIR = configs.CASES_DIR
|
||||
|
||||
if __name__ == '__main__':
|
||||
now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
|
||||
# 1,启动框架(生成临时文件)
|
||||
# -x表示有一个用例失败后面将不执行;-v表示展示用例名称;-c,配置文件所在目录:指定pytest.ini路径;--alluredir=temp。指定数据生成目录
|
||||
pytest.main([str(CASES_DIR), "-x", "-v", "--alluredir=temp"])
|
||||
# 2,生成HTML报告
|
||||
os.system('allure generate temp -o report --clean') # java程序只能借助操作系统执行
|
||||
|
||||
# 3,备份日志
|
||||
# shutil.copy2("logs/pytest.log", f"logs/pytest_{now}.log")
|
||||
21
pyproject.toml
Normal file
21
pyproject.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
[project]
|
||||
name = "webautotest"
|
||||
version = "0.1.0"
|
||||
description = "Web自动化测试框架"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
]
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"pydantic-settings>=2.8.1",
|
||||
"pytest>=8.3.5",
|
||||
"pytest-xlsx>=0.5.1",
|
||||
"selenium>=4.30.0",
|
||||
"tomlkit>=0.13.2",
|
||||
]
|
||||
|
||||
[[tool.uv.index]]
|
||||
url = "https://mirrors.cloud.tencent.com/pypi/simple"
|
||||
default = true
|
||||
10
pytest.ini
Normal file
10
pytest.ini
Normal file
@@ -0,0 +1,10 @@
|
||||
[pytest]
|
||||
addopts = -q --show-capture=no
|
||||
|
||||
|
||||
log_file = logs/pytest.log
|
||||
log_file_level = info
|
||||
log_file_format = %(asctime)s [%(name)s] %(levelname)s %(module)s.%(funcName)s:%(lineno)d - %(message)s
|
||||
log_file_date_format = %m/%d/%Y %H:%M:%S %p
|
||||
|
||||
disable_test_id_escaping_and_forfeit_all_rights_to_community_support = true
|
||||
BIN
tests/test_login.xlsx
Normal file
BIN
tests/test_login.xlsx
Normal file
Binary file not shown.
11
utils/__init__.py
Normal file
11
utils/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: __init__.py
|
||||
@date: 2025/4/8 21:44
|
||||
@desc:
|
||||
"""
|
||||
11
utils/file_processors/__init__.py
Normal file
11
utils/file_processors/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: __init__.py
|
||||
@date: 2025/3/4 17:23
|
||||
@desc:
|
||||
"""
|
||||
41
utils/file_processors/base.py
Normal file
41
utils/file_processors/base.py
Normal file
@@ -0,0 +1,41 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: base
|
||||
@date: 2025/3/4 17:23
|
||||
@desc:
|
||||
"""
|
||||
import abc
|
||||
|
||||
|
||||
class BaseFileProcessor(abc.ABC): # 使用 abc 模块定义抽象基类
|
||||
"""
|
||||
文件处理器的抽象基类。
|
||||
定义了所有子类必须实现的方法。
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def load(self):
|
||||
"""加载."""
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def to_string(data: dict) -> str:
|
||||
"""将文件内容转换为字符串。"""
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def to_dict(data: str) -> dict:
|
||||
"""将文件内容转换为字典。"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def save(self, new_filepath=None):
|
||||
"""将数据保存."""
|
||||
pass
|
||||
42
utils/file_processors/file_handle.py
Normal file
42
utils/file_processors/file_handle.py
Normal file
@@ -0,0 +1,42 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: file_handle
|
||||
@date: 2025/3/7 09:31
|
||||
@desc:
|
||||
"""
|
||||
|
||||
from utils.file_processors.toml_processor import TomlProcessor
|
||||
from utils.file_processors.yaml_processor import YamlProcessor
|
||||
from utils.file_processors.json_processor import JsonProcessor
|
||||
from commons.settings import configs
|
||||
processors = {
|
||||
'toml': TomlProcessor,# 暂不支持
|
||||
'yaml': YamlProcessor,
|
||||
'yml': YamlProcessor,
|
||||
'json': JsonProcessor,
|
||||
|
||||
}
|
||||
|
||||
|
||||
def get_processor(ext):
|
||||
agent_model = processors.get(ext, YamlProcessor) # 代理模式
|
||||
|
||||
return agent_model # 默认回退到 Yaml
|
||||
|
||||
|
||||
FileHandle = get_processor("yaml")
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 示例用法
|
||||
yaml_file = FileHandle(configs.variable)
|
||||
print(yaml_file)
|
||||
print(type(yaml_file))
|
||||
file_string = FileHandle.to_string(yaml_file)
|
||||
print(file_string)
|
||||
file_dict = FileHandle.to_dict(file_string)
|
||||
print(file_dict)
|
||||
126
utils/file_processors/json_processor.py
Normal file
126
utils/file_processors/json_processor.py
Normal file
@@ -0,0 +1,126 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: yaml_processor
|
||||
@date: 2025/3/4 17:28
|
||||
@desc:
|
||||
"""
|
||||
import logging
|
||||
from typing import Union
|
||||
from pathlib import Path
|
||||
import json
|
||||
from utils.file_processors.base import BaseFileProcessor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JsonProcessor(BaseFileProcessor, dict):
|
||||
"""
|
||||
用于处理 YAML 文件的类,继承自 dict。
|
||||
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
|
||||
并可以直接像字典一样访问 YAML 数据。
|
||||
"""
|
||||
|
||||
def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
|
||||
"""
|
||||
初始化 YamlFile 对象。
|
||||
|
||||
Args:
|
||||
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
|
||||
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
|
||||
如果不提供,则尝试从 filepath 加载数据。
|
||||
"""
|
||||
super().__init__() # 初始化父类 dict
|
||||
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
|
||||
if data is not None:
|
||||
self.update(data) # 如果提供了初始数据,则更新字典
|
||||
else:
|
||||
self.load() # 否则,尝试从文件加载
|
||||
|
||||
def load(self) -> None:
|
||||
"""
|
||||
从 YAML 文件加载数据并更新字典。
|
||||
如果文件不存在或加载失败,则清空字典并记录警告/错误。
|
||||
"""
|
||||
self.clear() # 清空现有数据
|
||||
if self.filepath.exists():
|
||||
try:
|
||||
with open(self.filepath, "r", encoding="utf-8") as f:
|
||||
loaded_data = json.load(f) or {}
|
||||
self.update(loaded_data) # 使用加载的数据更新字典
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
|
||||
# 保持字典为空 (已在开头 clear)
|
||||
else:
|
||||
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
|
||||
# 保持字典为空 (已在开头 clear)
|
||||
|
||||
@staticmethod
|
||||
def to_string(data: dict) -> str:
|
||||
"""
|
||||
将字典 (自身) 转换为 YAML 格式的字符串。
|
||||
|
||||
Returns:
|
||||
YAML 格式的字符串。
|
||||
"""
|
||||
try:
|
||||
return json.dumps(
|
||||
dict(data), # 使用dict转换为标准的字典
|
||||
ensure_ascii=False, # 允许非ASCII字符
|
||||
# indent=4, # 美化输出,缩进4个空格
|
||||
sort_keys=False # 不排序键
|
||||
)
|
||||
except TypeError as e:
|
||||
logger.error(f"将数据转换为 JSON 字符串时出错: {e}")
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def to_dict(data: str) -> None:
|
||||
"""
|
||||
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
|
||||
|
||||
Args:
|
||||
data: YAML 格式的字符串。
|
||||
"""
|
||||
try:
|
||||
loaded_data = json.loads(data) or {}
|
||||
return loaded_data
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"将 JSON 字符串转换为字典时出错: {e}")
|
||||
|
||||
def save(self, new_filepath: Union[str, Path, None] = None):
|
||||
"""
|
||||
将字典数据 (自身) 保存到 YAML 文件。
|
||||
|
||||
Args:
|
||||
new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。
|
||||
"""
|
||||
filepath = Path(new_filepath) if new_filepath else self.filepath
|
||||
|
||||
try:
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
json.dump(
|
||||
dict(self), # 使用dict转换为标准的字典
|
||||
f,
|
||||
ensure_ascii=False, # 允许非ASCII字符
|
||||
indent=4, # 美化输出,缩进4个空格
|
||||
sort_keys=False # 不排序键
|
||||
)
|
||||
except (TypeError, OSError) as e:
|
||||
logger.error(f"保存 JSON 文件 {filepath} 时出错: {e}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 示例用法
|
||||
json_path = r'E:\PyP\InterfaceAutoTest\TestCases\test_1_user.json' # 你的 JSON 文件路径
|
||||
json_file = JsonProcessor(json_path)
|
||||
print(json_file)
|
||||
print(type(json_file))
|
||||
json_string = JsonProcessor.to_string(json_file)
|
||||
JsonProcessor.to_dict(json_string)
|
||||
print(json_string)
|
||||
json_file.save()
|
||||
110
utils/file_processors/toml_processor.py
Normal file
110
utils/file_processors/toml_processor.py
Normal file
@@ -0,0 +1,110 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: file_processing
|
||||
@date: 2025/4/8 21:22
|
||||
@desc:
|
||||
"""
|
||||
from pathlib import Path
|
||||
from tomlkit import parse, dumps
|
||||
from tomlkit.toml_file import TOMLFile
|
||||
|
||||
import logging
|
||||
from typing import Union
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from pathlib import Path
|
||||
import tomlkit
|
||||
from utils.file_processors.base import BaseFileProcessor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TomlProcessor(BaseFileProcessor, dict):
|
||||
"""
|
||||
用于处理 YAML 文件的类,继承自 dict。
|
||||
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
|
||||
并可以直接像字典一样访问 YAML 数据。
|
||||
"""
|
||||
|
||||
def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
|
||||
"""
|
||||
初始化 YamlFile 对象。
|
||||
|
||||
Args:
|
||||
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
|
||||
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
|
||||
如果不提供,则尝试从 filepath 加载数据。
|
||||
"""
|
||||
super().__init__() # 初始化父类 dict
|
||||
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
|
||||
if data is not None:
|
||||
self.update(data) # 如果提供了初始数据,则更新字典
|
||||
else:
|
||||
self.load() # 否则,尝试从文件加载
|
||||
|
||||
def load(self) -> None:
|
||||
|
||||
self.clear() # 清空现有数据
|
||||
if self.filepath.exists():
|
||||
try:
|
||||
with open(self.filepath, 'r', encoding='utf-8') as f:
|
||||
result = tomlkit.parse(f.read()) or {}
|
||||
# print(result)
|
||||
self.update(result)
|
||||
except Exception as e:
|
||||
logger.error(f"加载 TOML 文件 {self.filepath} 时出错: {e}")
|
||||
else:
|
||||
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
|
||||
@staticmethod
|
||||
def to_string(data: dict) -> str:
|
||||
try:
|
||||
return tomlkit.dumps(
|
||||
dict(data), # 使用dict转换为标准的字典
|
||||
sort_keys=False
|
||||
)
|
||||
except TypeError as e:
|
||||
logger.error(f"将数据转换为 TOML 字符串时出错: {e}")
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def to_dict(data: str) -> Union[None, dict]:
|
||||
try:
|
||||
loaded_data = tomlkit.loads(data) or {}
|
||||
return loaded_data
|
||||
except Exception as e:
|
||||
logger.error(f"将 TOML 字符串转换为字典时出错: {e}")
|
||||
|
||||
def save(self, new_filepath: Union[str, Path, None] = None):
|
||||
|
||||
filepath = Path(new_filepath) if new_filepath else self.filepath
|
||||
|
||||
try:
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
tomlkit.dump(
|
||||
dict(self), # 使用dict转换为标准的字典
|
||||
fp=f,
|
||||
sort_keys=False,
|
||||
)
|
||||
except (TypeError, OSError) as e:
|
||||
logger.error(f"保存 TOML 文件 {filepath} 时出错: {e}")
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
...
|
||||
toml_path =r'E:\PyP\WebAutoTest\data\var.toml'
|
||||
toml_file = TomlProcessor(toml_path)
|
||||
print(toml_file)
|
||||
print(type(toml_file))
|
||||
print(toml_file.to_string(toml_file))
|
||||
print(toml_file.to_dict(toml_file.to_string(toml_file)))
|
||||
print(toml_file.to_dict(toml_file.to_string(toml_file)))
|
||||
toml_file.to_dict(toml_file.to_string(toml_file))
|
||||
# toml_file.save()
|
||||
|
||||
print(toml_file.to_string(
|
||||
{'用例ID': None, '内容': '打开浏览器', '标记': 'browser', '参数': 'edge', '_BlankField': [None, None]}))
|
||||
127
utils/file_processors/yaml_processor.py
Normal file
127
utils/file_processors/yaml_processor.py
Normal file
@@ -0,0 +1,127 @@
|
||||
#!/usr/bin/env python
|
||||
# coding=utf-8
|
||||
|
||||
"""
|
||||
@author: CNWei
|
||||
@Software: PyCharm
|
||||
@contact: t6i888@163.com
|
||||
@file: yaml_processor
|
||||
@date: 2025/3/4 17:28
|
||||
@desc:
|
||||
"""
|
||||
import logging
|
||||
from typing import Union
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from pathlib import Path
|
||||
import yaml
|
||||
from utils.file_processors.base import BaseFileProcessor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class YamlProcessor(BaseFileProcessor, dict):
|
||||
"""
|
||||
用于处理 YAML 文件的类,继承自 dict。
|
||||
提供了从文件加载、保存到文件、转换为字符串和从字符串转换的功能,
|
||||
并可以直接像字典一样访问 YAML 数据。
|
||||
"""
|
||||
|
||||
def __init__(self, filepath: Union[str, Path], data: Union[dict, None] = None):
|
||||
"""
|
||||
初始化 YamlFile 对象。
|
||||
|
||||
Args:
|
||||
filepath: YAML 文件的路径 (可以是字符串或 pathlib.Path 对象).
|
||||
data: 可选的初始数据字典。如果提供,则用该字典初始化 YamlFile。
|
||||
如果不提供,则尝试从 filepath 加载数据。
|
||||
"""
|
||||
super().__init__() # 初始化父类 dict
|
||||
self.filepath: Path = Path(filepath) # 确保 filepath 是 Path 对象
|
||||
if data is not None:
|
||||
self.update(data) # 如果提供了初始数据,则更新字典
|
||||
else:
|
||||
self.load() # 否则,尝试从文件加载
|
||||
|
||||
def load(self) -> None:
|
||||
"""
|
||||
从 YAML 文件加载数据并更新字典。
|
||||
如果文件不存在或加载失败,则清空字典并记录警告/错误。
|
||||
"""
|
||||
self.clear() # 清空现有数据
|
||||
if self.filepath.exists():
|
||||
try:
|
||||
with open(self.filepath, "r", encoding="utf-8") as f:
|
||||
loaded_data = yaml.safe_load(f) or {}
|
||||
self.update(loaded_data) # 使用加载的数据更新字典
|
||||
except yaml.YAMLError as e:
|
||||
logger.error(f"加载 YAML 文件 {self.filepath} 时出错: {e}")
|
||||
# 保持字典为空 (已在开头 clear)
|
||||
else:
|
||||
logger.warning(f"文件 {self.filepath} 不存在, 字典保持为空.")
|
||||
# 保持字典为空 (已在开头 clear)
|
||||
|
||||
@staticmethod
|
||||
def to_string(data: dict) -> str:
|
||||
"""
|
||||
将字典 (自身) 转换为 YAML 格式的字符串。
|
||||
|
||||
Returns:
|
||||
YAML 格式的字符串。
|
||||
"""
|
||||
try:
|
||||
return yaml.safe_dump(
|
||||
dict(data), # 使用dict转换为标准的字典
|
||||
allow_unicode=True,
|
||||
sort_keys=False,
|
||||
default_flow_style=False
|
||||
)
|
||||
except TypeError as e:
|
||||
logger.error(f"将数据转换为 YAML 字符串时出错: {e}")
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def to_dict(data: str) -> Union[None, dict]:
|
||||
"""
|
||||
将 YAML 格式的字符串转换为字典,并更新当前字典的内容.
|
||||
|
||||
Args:
|
||||
data: YAML 格式的字符串。
|
||||
"""
|
||||
try:
|
||||
loaded_data = yaml.safe_load(data) or {}
|
||||
return loaded_data
|
||||
except yaml.YAMLError as e:
|
||||
logger.error(f"将 YAML 字符串转换为字典时出错: {e}")
|
||||
|
||||
def save(self, new_filepath: Union[str, Path, None] = None):
|
||||
"""
|
||||
将字典数据 (自身) 保存到 YAML 文件。
|
||||
|
||||
Args:
|
||||
new_filepath: 可选参数,指定新的文件路径。如果为 None,则覆盖原文件。
|
||||
"""
|
||||
filepath = Path(new_filepath) if new_filepath else self.filepath
|
||||
|
||||
try:
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
yaml.safe_dump(
|
||||
dict(self), # 使用dict转换为标准的字典
|
||||
stream=f,
|
||||
allow_unicode=True,
|
||||
sort_keys=False,
|
||||
default_flow_style=False
|
||||
)
|
||||
except (TypeError, OSError) as e:
|
||||
logger.error(f"保存 YAML 文件 {filepath} 时出错: {e}")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 示例用法
|
||||
yaml_path = r'E:\PyP\InterfaceAutoTest\TestCases\answer\test_1_status.yaml' # 你的 YAML 文件路径
|
||||
yaml_file = YamlProcessor(yaml_path)
|
||||
print(yaml_file)
|
||||
print(type(yaml_file))
|
||||
|
||||
Reference in New Issue
Block a user