from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any, ClassVar
from appium.webdriver.webdriver import WebDriver as AppiumDriver
from playwright.sync_api import (
Browser as PlaywrightBrowser,
BrowserContext as PlaywrightContext,
Page as PlaywrightDriver,
)
from selenium.webdriver.remote.webdriver import WebDriver as SeleniumDriver
from mops.abstraction.driver_wrapper_abc import DriverWrapperABC
from mops.exceptions import DriverWrapperException
from mops.js_scripts import storage_get_items_js, storage_set_item_js
from mops.mixins.internal_mixin import InternalMixin
from mops.mixins.objects.box import Box
from mops.mixins.objects.driver import Driver
from mops.mixins.objects.visual_comaprison_mixin import hide_before_screenshot, reveal_after_screenshot
from mops.playwright.play_driver import PlayDriver
from mops.selenium.driver.mobile_driver import MobileDriver
from mops.selenium.driver.web_driver import WebDriver
from mops.utils.internal_utils import extract_named_objects, get_attributes_from_object
from mops.utils.logs import Logging, LogLevel
from mops.visual_comparison import VisualComparison
if TYPE_CHECKING:
from typing import Self
from PIL import Image
from mops.base.element import Element
from mops.mixins.objects.box import Box
from mops.mixins.objects.driver import Driver
[docs]class DriverWrapperSessions:
all_sessions: ClassVar[list[DriverWrapper]] = []
[docs] @classmethod
def add_session(cls, driver_wrapper: DriverWrapper) -> None:
"""
Add a :obj:`.DriverWrapper` object to the session pool.
:param driver_wrapper: The :obj:`.DriverWrapper` instance to add to the pool.
:return: None
"""
cls.all_sessions.append(driver_wrapper)
[docs] @classmethod
def remove_session(cls, driver_wrapper: DriverWrapper) -> None:
"""
Remove a :obj:`.DriverWrapper` object from the session pool.
:param driver_wrapper: The :obj:`.DriverWrapper` instance to remove from the pool.
:return: None
"""
cls.all_sessions.remove(driver_wrapper)
[docs] @classmethod
def sessions_count(cls) -> int:
"""
Get the count of initialized :obj:`.DriverWrapper` objects.
:return: :obj:`int` - The number of initialized sessions.
"""
return len(cls.all_sessions)
[docs] @classmethod
def first_session(cls) -> DriverWrapper | None:
"""
Get the first :obj:`.DriverWrapper` object from the session pool.
:return: The first :obj:`.DriverWrapper` object in the pool, or `None` if no session exists.
:rtype: typing.Union[DriverWrapper, None]
"""
return cls.all_sessions[0] if cls.all_sessions else None
[docs] @classmethod
def is_connected(cls) -> bool:
"""
Check the connection status of any :obj:`.DriverWrapper` object in the pool.
:return: :obj:`bool` - :obj:`True` if at least one :obj:`.DriverWrapper` object is available,
otherwise :obj:`False`.
"""
return any(cls.all_sessions)
[docs]class DriverWrapper(InternalMixin, Logging, DriverWrapperABC):
"""
A wrapper class for managing web and mobile driver instances,
supporting Selenium, Appium, and Playwright.
This class serves as a crossroad for interacting with different driver types,
allowing for flexible management of web and mobile sessions.
It also provides platform-specific flags and information to assist with automation tasks.
"""
driver: SeleniumDriver | AppiumDriver | PlaywrightDriver
context: PlaywrightContext
browser: PlaywrightBrowser
_object: str = 'driver_wrapper'
_base_cls: type[PlayDriver, MobileDriver, WebDriver] = None
session: DriverWrapperSessions = DriverWrapperSessions
anchor: Element | None = None
is_desktop: bool = False
is_selenium: bool = False
is_playwright: bool = False
is_mobile_resolution: bool = False
is_appium: bool = False
is_mobile: bool = False
is_tablet: bool = False
is_ios: bool = False
is_ios_tablet: bool = False
is_ios_mobile: bool = False
is_android: bool = False
is_android_tablet: bool = False
is_android_mobile: bool = False
is_simulator: bool = False
is_real_device: bool = False
browser_name: str | None = None
def __new__(cls, *args: Any, **kwargs: Any) -> Self:
"""Create a new DriverWrapper instance or a shadow wrapper for multi-session support."""
if cls.session.sessions_count() == 0:
instance = super().__new__(cls)
else:
attrs = get_attributes_from_object(cls)
attrs.pop('_configured', None)
shadow_cls = type('ShadowDriverWrapper', (cls,), attrs)
instance = super().__new__(shadow_cls)
for name in extract_named_objects(instance, bool):
setattr(instance, name, False)
return instance
def __repr__(self):
cls = self.__class__
label = 'desktop'
if cls.is_android:
label = 'android'
elif cls.is_ios:
label = 'ios'
return f'{cls.__name__}({self.label}={self.driver}) at {hex(id(self))}, platform={label}'
[docs] def __init__(self, driver: Driver):
"""
Initialize the DriverWrapper instance based on the provided driver source.
This constructor sets up the driver wrapper, which can support
Appium, Selenium, or Playwright drivers.
It also manages session tracking and platform-specific configurations,
such as mobile resolution and platform type.
:param driver: :obj:`.Driver` object that holds appium / selenium / playwright driver to initialize
"""
self.__driver_container = driver
self.session.add_session(self)
self.label = f'{self.session.all_sessions.index(self) + 1}_driver'
self.__init_base_class__()
if driver.is_mobile_resolution:
self.is_mobile_resolution = True
self.is_desktop = False
self.is_mobile = True
[docs] def quit(self, silent: bool = False, trace_path: str = 'trace.zip') -> None:
"""
Quit the driver instance.
:param silent: If :obj:`True`, suppresses logging.
:type silent: bool
**Selenium/Appium:**
:param trace_path: Compatibility argument for Playwright.
:type trace_path: str
**Playwright:**
:param trace_path: Path to the trace file.
:type trace_path: str
:return: :obj:`None`
"""
if not silent:
self.log('Quit driver instance')
self._base_cls.quit(self, trace_path)
self.session.remove_session(self)
[docs] def save_screenshot(
self,
file_name: str,
screenshot_base: Image | bytes = None,
convert_type: str | None = None,
) -> Image:
"""
Take a full screenshot of the driver and save it to the specified path/filename.
:param file_name: Path or filename for the screenshot.
:type file_name: str
:param screenshot_base: Screenshot binary or image to use (optional).
:type screenshot_base: :obj:`bytes`, :class:`PIL.Image.Image`
:param convert_type: Image conversion type before saving (optional).
:type convert_type: str
:return: :class:`PIL.Image.Image`
"""
self.log('Save driver screenshot')
image_object = screenshot_base
if isinstance(screenshot_base, bytes) or screenshot_base is None:
image_object = self.screenshot_image(screenshot_base)
if convert_type:
image_object = image_object.convert(convert_type)
image_object.save(file_name)
return image_object
[docs] def assert_screenshot(
self,
filename: str = '',
test_name: str = '',
name_suffix: str = '',
threshold: float | None = None,
delay: float | None = None,
remove: Element | list[Element] = None,
cut_box: Box = None,
hide: Element | list[Element] = None,
) -> None:
"""
Assert that the given screenshot matches the currently taken screenshot.
:param filename: The full name of the screenshot file.
If empty - filename will be generated based on test name & :class:`Element` ``name`` argument & platform.
:type filename: str
:param test_name: The custom test name for generated filename.
If empty - it will be determined automatically.
:type test_name: str
:param name_suffix: A suffix to add to the filename.
Useful for distinguishing between positive and negative cases for the same :class:`Element` during one test.
:type name_suffix: str
:param threshold: The acceptable threshold for comparing screenshots.
If :obj:`None` - takes default threshold or calculate its automatically based on screenshot size.
:type threshold: typing.Optional[int or float]
:param delay: The delay in seconds before taking the screenshot.
If :obj:`None` - takes default delay.
:type delay: typing.Optional[int or float]
:param remove: :class:`Element` to remove from the screenshot.
Can be a single element or a list of elements.
:type remove: typing.Optional[Element or typing.List[Element]]
:param cut_box: A :class:`.Box` specifying a region to cut from the screenshot.
If :obj:`None`, no region is cut.
:type cut_box: typing.Optional[Box]
:param hide: :class:`Element` to hide in the screenshot.
Can be a single element or a list of elements.
:type hide: typing.Optional[Element or typing.List[Element]]
:return: :obj:`None`
"""
delay = delay or VisualComparison.default_delay
remove = [remove] if type(remove) is not list and remove else remove
hide_before_screenshot(hide, is_optional=False, dw=self)
self.wait(delay)
hide_before_screenshot(VisualComparison.always_hide, is_optional=True, dw=self)
VisualComparison(self).assert_screenshot(
filename=filename,
test_name=test_name,
name_suffix=name_suffix,
threshold=threshold,
remove=remove,
fill_background=False,
cut_box=cut_box,
)
reveal_after_screenshot(VisualComparison.always_hide, dw=self)
[docs] def soft_assert_screenshot(
self,
filename: str = '',
test_name: str = '',
name_suffix: str = '',
threshold: float | None = None,
delay: float | None = None,
remove: Element | list[Element] = None,
cut_box: Box = None,
hide: Element | list[Element] = None,
) -> tuple[bool, str]:
"""
Compare the currently taken screenshot to the expected screenshot and return a result.
:param filename: The full name of the screenshot file.
If empty - filename will be generated based on test name & :class:`Element` ``name`` argument & platform.
:type filename: str
:param test_name: The custom test name for generated filename.
If empty - it will be determined automatically.
:type test_name: str
:param name_suffix: A suffix to add to the filename.
Useful for distinguishing between positive and negative cases for the same :class:`Element` during one test.
:type name_suffix: str
:param threshold: The acceptable threshold for comparing screenshots.
If :obj:`None` - takes default threshold or calculate its automatically based on screenshot size.
:type threshold: typing.Optional[int or float]
:param delay: The delay in seconds before taking the screenshot.
If :obj:`None` - takes default delay.
:type delay: typing.Optional[int or float]
:param remove: :class:`Element` to remove from the screenshot.
:type remove: typing.Optional[Element or typing.List[Element]]
:param cut_box: A :class:`.Box` specifying a region to cut from the screenshot.
If :obj:`None`, no region is cut.
:type cut_box: typing.Optional[Box]
:param hide: :class:`Element` to hide in the screenshot.
Can be a single element or a list of elements.
:return: :class:`typing.Tuple` (:class:`bool`, :class:`str`) - result state and result message
"""
try:
self.assert_screenshot(filename, test_name, name_suffix, threshold, delay, remove, cut_box, hide)
except AssertionError as exc:
exc = str(exc)
self.log(exc, level=LogLevel.ERROR)
return False, exc
return True, 'No visual mismatch found for entire screen'
[docs] def set_local_storage_item(self, items: list[dict]) -> DriverWrapper:
"""
Set one or more items in localStorage.
Each dict must contain ``key`` and ``value`` fields.
:param items: A list of dicts with ``key`` and ``value``.
:type items: typing.List[dict]
:return: :obj:`.DriverWrapper` - The current instance of the driver wrapper.
"""
self.execute_script(storage_set_item_js, items, 'localStorage')
return self
[docs] def set_session_storage_item(self, items: list[dict]) -> DriverWrapper:
"""
Set one or more items in sessionStorage.
Each dict must contain ``key`` and ``value`` fields.
:param items: A list of dicts with ``key`` and ``value``.
:type items: typing.List[dict]
:return: :obj:`.DriverWrapper` - The current instance of the driver wrapper.
"""
self.execute_script(storage_set_item_js, items, 'sessionStorage')
return self
[docs] def get_local_storage_item(self, key: str) -> str | None:
"""
Retrieve a single item from localStorage by key.
:param key: The key to look up.
:type key: str
:return: The value string, or :obj:`None` if the key does not exist.
:rtype: typing.Union[str, None]
"""
return self.execute_script(f'return localStorage.getItem({json.dumps(key)})')
[docs] def get_session_storage_item(self, key: str) -> str | None:
"""
Retrieve a single item from sessionStorage by key.
:param key: The key to look up.
:type key: str
:return: The value string, or :obj:`None` if the key does not exist.
:rtype: typing.Union[str, None]
"""
return self.execute_script(f'return sessionStorage.getItem({json.dumps(key)})')
[docs] def get_local_storage_items(self) -> dict:
"""
Retrieve all items from localStorage as a dictionary.
:return: A dict mapping every key to its value.
:rtype: dict
"""
return self.execute_script(storage_get_items_js, 'localStorage')
[docs] def get_session_storage_items(self) -> dict:
"""
Retrieve all items from sessionStorage as a dictionary.
:return: A dict mapping every key to its value.
:rtype: dict
"""
return self.execute_script(storage_get_items_js, 'sessionStorage')
[docs] def remove_local_storage_item(self, key: str) -> DriverWrapper:
"""
Remove a single item from localStorage by key.
:param key: The key to remove.
:type key: str
:return: :obj:`.DriverWrapper` - The current instance of the driver wrapper.
"""
self.execute_script(f'localStorage.removeItem({json.dumps(key)})')
return self
[docs] def remove_session_storage_item(self, key: str) -> DriverWrapper:
"""
Remove a single item from sessionStorage by key.
:param key: The key to remove.
:type key: str
:return: :obj:`.DriverWrapper` - The current instance of the driver wrapper.
"""
self.execute_script(f'sessionStorage.removeItem({json.dumps(key)})')
return self
[docs] def clear_local_storage(self) -> DriverWrapper:
"""
Remove all items from localStorage.
:return: :obj:`.DriverWrapper` - The current instance of the driver wrapper.
"""
self.execute_script('localStorage.clear()')
return self
[docs] def clear_session_storage(self) -> DriverWrapper:
"""
Remove all items from sessionStorage.
:return: :obj:`.DriverWrapper` - The current instance of the driver wrapper.
"""
self.execute_script('sessionStorage.clear()')
return self
def __init_base_class__(self) -> None:
"""
Get driver wrapper class in according to given driver source, and set him as base class
:return: None
"""
source_driver = self.__driver_container.driver
if isinstance(source_driver, PlaywrightDriver):
self.is_playwright = True
self._base_cls = PlayDriver
elif isinstance(source_driver, AppiumDriver):
self.is_appium = True
self._base_cls = MobileDriver
elif isinstance(source_driver, SeleniumDriver):
self.is_selenium = True
self._base_cls = WebDriver
else:
msg = (
f'Cannot initialize {self.__class__.__name__}: '
f'unsupported driver type "{type(source_driver).__name__}". '
f'Expected Playwright, Appium or Selenium driver instance'
)
raise DriverWrapperException(msg)
self._set_static(self._base_cls)
self._base_cls.__init__(self, driver_container=self.__driver_container)
for name, value in self.__dict__.items():
setattr(self.__class__, name, value)