Source code for mops.base.driver_wrapper

from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any, ClassVar

from appium.webdriver.webdriver import WebDriver as AppiumDriver
from playwright.sync_api import (
    Browser as PlaywrightBrowser,
    BrowserContext as PlaywrightContext,
    Page as PlaywrightDriver,
)
from selenium.webdriver.remote.webdriver import WebDriver as SeleniumDriver

from mops.abstraction.driver_wrapper_abc import DriverWrapperABC
from mops.exceptions import DriverWrapperException
from mops.js_scripts import storage_get_items_js, storage_set_item_js
from mops.mixins.internal_mixin import InternalMixin
from mops.mixins.objects.box import Box
from mops.mixins.objects.driver import Driver
from mops.mixins.objects.visual_comaprison_mixin import hide_before_screenshot, reveal_after_screenshot
from mops.playwright.play_driver import PlayDriver
from mops.selenium.driver.mobile_driver import MobileDriver
from mops.selenium.driver.web_driver import WebDriver
from mops.utils.internal_utils import extract_named_objects, get_attributes_from_object
from mops.utils.logs import Logging, LogLevel
from mops.visual_comparison import VisualComparison

if TYPE_CHECKING:
    from typing import Self

    from PIL import Image

    from mops.base.element import Element
    from mops.mixins.objects.box import Box
    from mops.mixins.objects.driver import Driver


[docs]class DriverWrapperSessions: all_sessions: ClassVar[list[DriverWrapper]] = []
[docs] @classmethod def add_session(cls, driver_wrapper: DriverWrapper) -> None: """ Add a :obj:`.DriverWrapper` object to the session pool. :param driver_wrapper: The :obj:`.DriverWrapper` instance to add to the pool. :return: None """ cls.all_sessions.append(driver_wrapper)
[docs] @classmethod def remove_session(cls, driver_wrapper: DriverWrapper) -> None: """ Remove a :obj:`.DriverWrapper` object from the session pool. :param driver_wrapper: The :obj:`.DriverWrapper` instance to remove from the pool. :return: None """ cls.all_sessions.remove(driver_wrapper)
[docs] @classmethod def sessions_count(cls) -> int: """ Get the count of initialized :obj:`.DriverWrapper` objects. :return: :obj:`int` - The number of initialized sessions. """ return len(cls.all_sessions)
[docs] @classmethod def first_session(cls) -> DriverWrapper | None: """ Get the first :obj:`.DriverWrapper` object from the session pool. :return: The first :obj:`.DriverWrapper` object in the pool, or `None` if no session exists. :rtype: typing.Union[DriverWrapper, None] """ return cls.all_sessions[0] if cls.all_sessions else None
[docs] @classmethod def is_connected(cls) -> bool: """ Check the connection status of any :obj:`.DriverWrapper` object in the pool. :return: :obj:`bool` - :obj:`True` if at least one :obj:`.DriverWrapper` object is available, otherwise :obj:`False`. """ return any(cls.all_sessions)
[docs]class DriverWrapper(InternalMixin, Logging, DriverWrapperABC): """ A wrapper class for managing web and mobile driver instances, supporting Selenium, Appium, and Playwright. This class serves as a crossroad for interacting with different driver types, allowing for flexible management of web and mobile sessions. It also provides platform-specific flags and information to assist with automation tasks. """ driver: SeleniumDriver | AppiumDriver | PlaywrightDriver context: PlaywrightContext browser: PlaywrightBrowser _object: str = 'driver_wrapper' _base_cls: type[PlayDriver, MobileDriver, WebDriver] = None session: DriverWrapperSessions = DriverWrapperSessions anchor: Element | None = None is_desktop: bool = False is_selenium: bool = False is_playwright: bool = False is_mobile_resolution: bool = False is_appium: bool = False is_mobile: bool = False is_tablet: bool = False is_ios: bool = False is_ios_tablet: bool = False is_ios_mobile: bool = False is_android: bool = False is_android_tablet: bool = False is_android_mobile: bool = False is_simulator: bool = False is_real_device: bool = False browser_name: str | None = None def __new__(cls, *args: Any, **kwargs: Any) -> Self: """Create a new DriverWrapper instance or a shadow wrapper for multi-session support.""" if cls.session.sessions_count() == 0: instance = super().__new__(cls) else: attrs = get_attributes_from_object(cls) attrs.pop('_configured', None) shadow_cls = type('ShadowDriverWrapper', (cls,), attrs) instance = super().__new__(shadow_cls) for name in extract_named_objects(instance, bool): setattr(instance, name, False) return instance def __repr__(self): cls = self.__class__ label = 'desktop' if cls.is_android: label = 'android' elif cls.is_ios: label = 'ios' return f'{cls.__name__}({self.label}={self.driver}) at {hex(id(self))}, platform={label}'
[docs] def __init__(self, driver: Driver): """ Initialize the DriverWrapper instance based on the provided driver source. This constructor sets up the driver wrapper, which can support Appium, Selenium, or Playwright drivers. It also manages session tracking and platform-specific configurations, such as mobile resolution and platform type. :param driver: :obj:`.Driver` object that holds appium / selenium / playwright driver to initialize """ self.__driver_container = driver self.session.add_session(self) self.label = f'{self.session.all_sessions.index(self) + 1}_driver' self.__init_base_class__() if driver.is_mobile_resolution: self.is_mobile_resolution = True self.is_desktop = False self.is_mobile = True
[docs] def quit(self, silent: bool = False, trace_path: str = 'trace.zip') -> None: """ Quit the driver instance. :param silent: If :obj:`True`, suppresses logging. :type silent: bool **Selenium/Appium:** :param trace_path: Compatibility argument for Playwright. :type trace_path: str **Playwright:** :param trace_path: Path to the trace file. :type trace_path: str :return: :obj:`None` """ if not silent: self.log('Quit driver instance') self._base_cls.quit(self, trace_path) self.session.remove_session(self)
[docs] def save_screenshot( self, file_name: str, screenshot_base: Image | bytes = None, convert_type: str | None = None, ) -> Image: """ Take a full screenshot of the driver and save it to the specified path/filename. :param file_name: Path or filename for the screenshot. :type file_name: str :param screenshot_base: Screenshot binary or image to use (optional). :type screenshot_base: :obj:`bytes`, :class:`PIL.Image.Image` :param convert_type: Image conversion type before saving (optional). :type convert_type: str :return: :class:`PIL.Image.Image` """ self.log('Save driver screenshot') image_object = screenshot_base if isinstance(screenshot_base, bytes) or screenshot_base is None: image_object = self.screenshot_image(screenshot_base) if convert_type: image_object = image_object.convert(convert_type) image_object.save(file_name) return image_object
[docs] def get_scroll_position(self) -> int: """ Return the current vertical scroll position of the page. :return: :class:`int` - Current vertical scroll offset in pixels. """ return self.execute_script('return window.pageYOffset')
[docs] def assert_screenshot( self, filename: str = '', test_name: str = '', name_suffix: str = '', threshold: float | None = None, delay: float | None = None, remove: Element | list[Element] = None, cut_box: Box = None, hide: Element | list[Element] = None, ) -> None: """ Assert that the given screenshot matches the currently taken screenshot. :param filename: The full name of the screenshot file. If empty - filename will be generated based on test name & :class:`Element` ``name`` argument & platform. :type filename: str :param test_name: The custom test name for generated filename. If empty - it will be determined automatically. :type test_name: str :param name_suffix: A suffix to add to the filename. Useful for distinguishing between positive and negative cases for the same :class:`Element` during one test. :type name_suffix: str :param threshold: The acceptable threshold for comparing screenshots. If :obj:`None` - takes default threshold or calculate its automatically based on screenshot size. :type threshold: typing.Optional[int or float] :param delay: The delay in seconds before taking the screenshot. If :obj:`None` - takes default delay. :type delay: typing.Optional[int or float] :param remove: :class:`Element` to remove from the screenshot. Can be a single element or a list of elements. :type remove: typing.Optional[Element or typing.List[Element]] :param cut_box: A :class:`.Box` specifying a region to cut from the screenshot. If :obj:`None`, no region is cut. :type cut_box: typing.Optional[Box] :param hide: :class:`Element` to hide in the screenshot. Can be a single element or a list of elements. :type hide: typing.Optional[Element or typing.List[Element]] :return: :obj:`None` """ delay = delay or VisualComparison.default_delay remove = [remove] if type(remove) is not list and remove else remove hide_before_screenshot(hide, is_optional=False, dw=self) self.wait(delay) hide_before_screenshot(VisualComparison.always_hide, is_optional=True, dw=self) VisualComparison(self).assert_screenshot( filename=filename, test_name=test_name, name_suffix=name_suffix, threshold=threshold, remove=remove, fill_background=False, cut_box=cut_box, ) reveal_after_screenshot(VisualComparison.always_hide, dw=self)
[docs] def soft_assert_screenshot( self, filename: str = '', test_name: str = '', name_suffix: str = '', threshold: float | None = None, delay: float | None = None, remove: Element | list[Element] = None, cut_box: Box = None, hide: Element | list[Element] = None, ) -> tuple[bool, str]: """ Compare the currently taken screenshot to the expected screenshot and return a result. :param filename: The full name of the screenshot file. If empty - filename will be generated based on test name & :class:`Element` ``name`` argument & platform. :type filename: str :param test_name: The custom test name for generated filename. If empty - it will be determined automatically. :type test_name: str :param name_suffix: A suffix to add to the filename. Useful for distinguishing between positive and negative cases for the same :class:`Element` during one test. :type name_suffix: str :param threshold: The acceptable threshold for comparing screenshots. If :obj:`None` - takes default threshold or calculate its automatically based on screenshot size. :type threshold: typing.Optional[int or float] :param delay: The delay in seconds before taking the screenshot. If :obj:`None` - takes default delay. :type delay: typing.Optional[int or float] :param remove: :class:`Element` to remove from the screenshot. :type remove: typing.Optional[Element or typing.List[Element]] :param cut_box: A :class:`.Box` specifying a region to cut from the screenshot. If :obj:`None`, no region is cut. :type cut_box: typing.Optional[Box] :param hide: :class:`Element` to hide in the screenshot. Can be a single element or a list of elements. :return: :class:`typing.Tuple` (:class:`bool`, :class:`str`) - result state and result message """ try: self.assert_screenshot(filename, test_name, name_suffix, threshold, delay, remove, cut_box, hide) except AssertionError as exc: exc = str(exc) self.log(exc, level=LogLevel.ERROR) return False, exc return True, 'No visual mismatch found for entire screen'
[docs] def set_local_storage_item(self, items: list[dict]) -> DriverWrapper: """ Set one or more items in localStorage. Each dict must contain ``key`` and ``value`` fields. :param items: A list of dicts with ``key`` and ``value``. :type items: typing.List[dict] :return: :obj:`.DriverWrapper` - The current instance of the driver wrapper. """ self.execute_script(storage_set_item_js, items, 'localStorage') return self
[docs] def set_session_storage_item(self, items: list[dict]) -> DriverWrapper: """ Set one or more items in sessionStorage. Each dict must contain ``key`` and ``value`` fields. :param items: A list of dicts with ``key`` and ``value``. :type items: typing.List[dict] :return: :obj:`.DriverWrapper` - The current instance of the driver wrapper. """ self.execute_script(storage_set_item_js, items, 'sessionStorage') return self
[docs] def get_local_storage_item(self, key: str) -> str | None: """ Retrieve a single item from localStorage by key. :param key: The key to look up. :type key: str :return: The value string, or :obj:`None` if the key does not exist. :rtype: typing.Union[str, None] """ return self.execute_script(f'return localStorage.getItem({json.dumps(key)})')
[docs] def get_session_storage_item(self, key: str) -> str | None: """ Retrieve a single item from sessionStorage by key. :param key: The key to look up. :type key: str :return: The value string, or :obj:`None` if the key does not exist. :rtype: typing.Union[str, None] """ return self.execute_script(f'return sessionStorage.getItem({json.dumps(key)})')
[docs] def get_local_storage_items(self) -> dict: """ Retrieve all items from localStorage as a dictionary. :return: A dict mapping every key to its value. :rtype: dict """ return self.execute_script(storage_get_items_js, 'localStorage')
[docs] def get_session_storage_items(self) -> dict: """ Retrieve all items from sessionStorage as a dictionary. :return: A dict mapping every key to its value. :rtype: dict """ return self.execute_script(storage_get_items_js, 'sessionStorage')
[docs] def remove_local_storage_item(self, key: str) -> DriverWrapper: """ Remove a single item from localStorage by key. :param key: The key to remove. :type key: str :return: :obj:`.DriverWrapper` - The current instance of the driver wrapper. """ self.execute_script(f'localStorage.removeItem({json.dumps(key)})') return self
[docs] def remove_session_storage_item(self, key: str) -> DriverWrapper: """ Remove a single item from sessionStorage by key. :param key: The key to remove. :type key: str :return: :obj:`.DriverWrapper` - The current instance of the driver wrapper. """ self.execute_script(f'sessionStorage.removeItem({json.dumps(key)})') return self
[docs] def clear_local_storage(self) -> DriverWrapper: """ Remove all items from localStorage. :return: :obj:`.DriverWrapper` - The current instance of the driver wrapper. """ self.execute_script('localStorage.clear()') return self
[docs] def clear_session_storage(self) -> DriverWrapper: """ Remove all items from sessionStorage. :return: :obj:`.DriverWrapper` - The current instance of the driver wrapper. """ self.execute_script('sessionStorage.clear()') return self
def __init_base_class__(self) -> None: """ Get driver wrapper class in according to given driver source, and set him as base class :return: None """ source_driver = self.__driver_container.driver if isinstance(source_driver, PlaywrightDriver): self.is_playwright = True self._base_cls = PlayDriver elif isinstance(source_driver, AppiumDriver): self.is_appium = True self._base_cls = MobileDriver elif isinstance(source_driver, SeleniumDriver): self.is_selenium = True self._base_cls = WebDriver else: msg = ( f'Cannot initialize {self.__class__.__name__}: ' f'unsupported driver type "{type(source_driver).__name__}". ' f'Expected Playwright, Appium or Selenium driver instance' ) raise DriverWrapperException(msg) self._set_static(self._base_cls) self._base_cls.__init__(self, driver_container=self.__driver_container) for name, value in self.__dict__.items(): setattr(self.__class__, name, value)