diff options
| author | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
|---|---|---|
| committer | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
| commit | 5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch) | |
| tree | 8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/dash/testing | |
| parent | b832d364da8c2efe09e3f75828caf73c50d01ce3 (diff) | |
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/dash/testing')
10 files changed, 1819 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/dash/testing/__init__.py b/venv/lib/python3.8/site-packages/dash/testing/__init__.py new file mode 100644 index 0000000..8d33331 --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/testing/__init__.py @@ -0,0 +1,17 @@ +from contextlib import contextmanager + +from .._callback_context import context_value as _ctx +from .._utils import AttributeDict as _AD + + +@contextmanager +def ignore_register_page(): + previous = _ctx.get() + copied = _AD(previous) + copied.ignore_register_page = True + _ctx.set(copied) + + try: + yield + finally: + _ctx.set(previous) diff --git a/venv/lib/python3.8/site-packages/dash/testing/application_runners.py b/venv/lib/python3.8/site-packages/dash/testing/application_runners.py new file mode 100644 index 0000000..dc88afe --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/testing/application_runners.py @@ -0,0 +1,533 @@ +import sys +import os +import time +import uuid +import shlex +import threading +import shutil +import subprocess +import logging +import inspect +import ctypes + +import runpy +import requests +import psutil + +# pylint: disable=no-member +import multiprocess + +from dash.testing.errors import ( + NoAppFoundError, + TestingTimeoutError, + ServerCloseError, + DashAppLoadingError, +) +from dash.testing import wait + +logger = logging.getLogger(__name__) + + +def import_app(app_file, application_name="app"): + """Import a dash application from a module. The import path is in dot + notation to the module. The variable named app will be returned. + + :Example: + + >>> app = import_app("my_app.app") + + Will import the application in module `app` of the package `my_app`. + + :param app_file: Path to the app (dot-separated). + :type app_file: str + :param application_name: The name of the dash application instance. + :raise: dash_tests.errors.NoAppFoundError + :return: App from module. + :rtype: dash.Dash + """ + try: + app_module = runpy.run_module(app_file) + app = app_module[application_name] + except KeyError as app_name_missing: + logger.exception("the app name cannot be found") + raise NoAppFoundError( + f"No dash `app` instance was found in {app_file}" + ) from app_name_missing + return app + + +class BaseDashRunner: + """Base context manager class for running applications.""" + + _next_port = 58050 + + def __init__(self, keep_open, stop_timeout, scheme="http", host="localhost"): + self.scheme = scheme + self.host = host + self.port = 8050 + self.started = None + self.keep_open = keep_open + self.stop_timeout = stop_timeout + self._tmp_app_path = None + + def start(self, *args, **kwargs): + raise NotImplementedError # pragma: no cover + + def stop(self): + raise NotImplementedError # pragma: no cover + + @staticmethod + def accessible(url): + try: + requests.get(url) + except requests.exceptions.RequestException: + return False + return True + + def __call__(self, *args, **kwargs): + return self.start(*args, **kwargs) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, traceback): + if self.started and not self.keep_open: + try: + logger.info("killing the app runner") + self.stop() + except TestingTimeoutError as cannot_stop_server: + raise ServerCloseError( + f"Cannot stop server within {self.stop_timeout}s timeout" + ) from cannot_stop_server + logger.info("__exit__ complete") + + @property + def url(self): + """The default server url.""" + return f"{self.scheme}://{self.host}:{self.port}" + + @property + def is_windows(self): + return sys.platform == "win32" + + @property + def tmp_app_path(self): + return self._tmp_app_path + + +class KillerThread(threading.Thread): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._old_threads = list(threading._active.keys()) # type: ignore[reportAttributeAccessIssue]; pylint: disable=W0212 + + def kill(self): + # Kill all the new threads. + for thread_id in list(threading._active): # type: ignore[reportAttributeAccessIssue]; pylint: disable=W0212 + if thread_id in self._old_threads: + continue + + res = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(thread_id), ctypes.py_object(SystemExit) + ) + if res == 0: + raise ValueError(f"Invalid thread id: {thread_id}") + if res > 1: + ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(thread_id), None + ) + raise SystemExit("Stopping thread failure") + + +class ThreadedRunner(BaseDashRunner): + """Runs a dash application in a thread. + + This is the default flavor to use in dash integration tests. + """ + + def __init__(self, keep_open=False, stop_timeout=3): + super().__init__(keep_open=keep_open, stop_timeout=stop_timeout) + self.thread = None + + def running_and_accessible(self, url): + if self.thread.is_alive(): # type: ignore[reportOptionalMemberAccess] + return self.accessible(url) + raise DashAppLoadingError("Thread is not alive.") + + # pylint: disable=arguments-differ + def start(self, app, start_timeout=3, **kwargs): + """Start the app server in threading flavor.""" + + def run(): + app.scripts.config.serve_locally = True + app.css.config.serve_locally = True + + options = kwargs.copy() + options["dev_tools_disable_version_check"] = True + + if "port" not in kwargs: + options["port"] = self.port = BaseDashRunner._next_port + BaseDashRunner._next_port += 1 + else: + self.port = options["port"] + + try: + app.run(threaded=True, **options) + except SystemExit: + logger.info("Server stopped") + except Exception as error: + logger.exception(error) + raise error + + retries = 0 + + while not self.started and retries < 3: + try: + if self.thread: + if self.thread.is_alive(): + self.stop() + else: + self.thread.kill() + + self.thread = KillerThread(target=run) + self.thread.daemon = True + self.thread.start() + # wait until server is able to answer http request + wait.until( + lambda: self.running_and_accessible(self.url), timeout=start_timeout + ) + self.started = self.thread.is_alive() + except Exception as err: # pylint: disable=broad-except + logger.exception(err) + self.started = False + retries += 1 + time.sleep(1) + + self.started = self.thread.is_alive() # type: ignore[reportOptionalMemberAccess] + if not self.started: + raise DashAppLoadingError("threaded server failed to start") + + def stop(self): + self.thread.kill() # type: ignore[reportOptionalMemberAccess] + self.thread.join() # type: ignore[reportOptionalMemberAccess] + wait.until_not(self.thread.is_alive, self.stop_timeout) # type: ignore[reportOptionalMemberAccess] + self.started = False + + +class MultiProcessRunner(BaseDashRunner): + def __init__(self, keep_open=False, stop_timeout=3): + super().__init__(keep_open, stop_timeout) + self.proc = None + + # pylint: disable=arguments-differ + def start(self, app, start_timeout=3, **kwargs): + self.port = kwargs.get("port", 8050) + + def target(): + app.scripts.config.serve_locally = True + app.css.config.serve_locally = True + + options = kwargs.copy() + + try: + app.run(threaded=True, **options) + except SystemExit: + logger.info("Server stopped") + raise + except Exception as error: + logger.exception(error) + raise error + + self.proc = multiprocess.Process(target=target) # type: ignore[reportAttributeAccessIssue]; pylint: disable=not-callable + self.proc.start() + + wait.until(lambda: self.accessible(self.url), timeout=start_timeout) + self.started = True + + def stop(self): + process = psutil.Process(self.proc.pid) # type: ignore[reportOptionalMemberAccess] + + for proc in process.children(recursive=True): + try: + proc.kill() + except psutil.NoSuchProcess: + pass + + try: + process.kill() + except psutil.NoSuchProcess: + pass + + try: + process.wait(1) + except (psutil.TimeoutExpired, psutil.NoSuchProcess): + pass + + +class ProcessRunner(BaseDashRunner): + """Runs a dash application in a waitress-serve subprocess. + + This flavor is closer to production environment but slower. + """ + + def __init__(self, keep_open=False, stop_timeout=3): + super().__init__(keep_open=keep_open, stop_timeout=stop_timeout) + self.proc = None + + # pylint: disable=arguments-differ + def start( + self, + app_module=None, + application_name="app", + raw_command=None, + port=8050, + start_timeout=3, + ): + """Start the server with waitress-serve in process flavor.""" + if not (app_module or raw_command): # need to set a least one + logging.error( + "the process runner needs to start with at least one valid command" + ) + return + self.port = port + args = shlex.split( + raw_command + if raw_command + else f"waitress-serve --listen=0.0.0.0:{port} {app_module}:{application_name}.server", + posix=not self.is_windows, + ) + + logger.debug("start dash process with %s", args) + + try: + self.proc = subprocess.Popen( # pylint: disable=consider-using-with + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + # wait until server is able to answer http request + wait.until(lambda: self.accessible(self.url), timeout=start_timeout) + + except (OSError, ValueError): + logger.exception("process server has encountered an error") + self.started = False + self.stop() + return + + self.started = True + + def stop(self): + if self.proc: + try: + logger.info("proc.terminate with pid %s", self.proc.pid) + self.proc.terminate() + if self.tmp_app_path and os.path.exists(self.tmp_app_path): + logger.debug("removing temporary app path %s", self.tmp_app_path) + shutil.rmtree(self.tmp_app_path) + + self.proc.communicate( + timeout=self.stop_timeout # pylint: disable=unexpected-keyword-arg + ) + except subprocess.TimeoutExpired: + logger.exception( + "subprocess terminate not success, trying to kill " + "the subprocess in a safe manner" + ) + self.proc.kill() + self.proc.communicate() + logger.info("process stop completes!") + + +class RRunner(ProcessRunner): + def __init__(self, keep_open=False, stop_timeout=3): + super().__init__(keep_open=keep_open, stop_timeout=stop_timeout) + self.proc = None + + # pylint: disable=arguments-differ + def start(self, app, start_timeout=2, cwd=None): # type: ignore[reportIncompatibleMethodOverride] + """Start the server with subprocess and Rscript.""" + + if os.path.isfile(app) and os.path.exists(app): + # app is already a file in a dir - use that as cwd + if not cwd: + cwd = os.path.dirname(app) + logger.info("RRunner inferred cwd from app path: %s", cwd) + else: + # app is a string chunk, we make a temporary folder to store app.R + # and its relevant assets + tmp_dir = "/tmp" if not self.is_windows else os.getenv("TEMP") + tmp_dir = str(tmp_dir) # to satisfy type checking + hex_id = uuid.uuid4().hex + self._tmp_app_path = os.path.join(tmp_dir, hex_id) + try: + os.mkdir(self.tmp_app_path) # type: ignore[reportArgumentType] + except OSError: + logger.exception("cannot make temporary folder %s", self.tmp_app_path) + path = os.path.join(self.tmp_app_path, "app.R") # type: ignore[reportCallIssue] + + logger.info("RRunner start => app is R code chunk") + logger.info("make a temporary R file for execution => %s", path) + logger.debug("content of the dashR app") + logger.debug("%s", app) + + with open(path, "w", encoding="utf-8") as fp: + fp.write(app) + + app = path + + # try to find the path to the calling script to use as cwd + if not cwd: + for entry in inspect.stack(): + if "/dash/testing/" not in entry[1].replace("\\", "/"): + cwd = os.path.dirname(os.path.realpath(entry[1])) + logger.warning("get cwd from inspect => %s", cwd) + break + if cwd: + logger.info("RRunner inferred cwd from the Python call stack: %s", cwd) + + # try copying all valid sub folders (i.e. assets) in cwd to tmp + # note that the R assets folder name can be any valid folder name + assets = [ + os.path.join(cwd, _) + for _ in os.listdir(cwd) + if not _.startswith("__") and os.path.isdir(os.path.join(cwd, _)) + ] + + for asset in assets: + target = os.path.join(self.tmp_app_path, os.path.basename(asset)) # type: ignore[reportCallIssue] + if os.path.exists(target): + logger.debug("delete existing target %s", target) + shutil.rmtree(target) + logger.debug("copying %s => %s", asset, self.tmp_app_path) + shutil.copytree(asset, target) + logger.debug("copied with %s", os.listdir(target)) + + else: + logger.warning( + "RRunner found no cwd in the Python call stack. " + "You may wish to specify an explicit working directory " + "using something like: " + "dashr.run_server(app, cwd=os.path.dirname(__file__))" + ) + + logger.info("Run dashR app with Rscript => %s", app) + + args = shlex.split( + f"Rscript -e 'source(\"{os.path.realpath(app)}\")'", + posix=not self.is_windows, + ) + logger.debug("start dash process with %s", args) + + try: + self.proc = subprocess.Popen( # pylint: disable=consider-using-with + args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=self.tmp_app_path if self.tmp_app_path else cwd, + ) + # wait until server is able to answer http request + wait.until(lambda: self.accessible(self.url), timeout=start_timeout) + + except (OSError, ValueError): + logger.exception("process server has encountered an error") + self.started = False + return + + self.started = True + + +class JuliaRunner(ProcessRunner): + def __init__(self, keep_open=False, stop_timeout=3): + super().__init__(keep_open=keep_open, stop_timeout=stop_timeout) + self.proc = None + + # pylint: disable=arguments-differ + def start(self, app, start_timeout=30, cwd=None): # type: ignore[reportIncompatibleMethodOverride] + """Start the server with subprocess and julia.""" + + if os.path.isfile(app) and os.path.exists(app): + # app is already a file in a dir - use that as cwd + if not cwd: + cwd = os.path.dirname(app) + logger.info("JuliaRunner inferred cwd from app path: %s", cwd) + else: + # app is a string chunk, we make a temporary folder to store app.jl + # and its relevant assets + tmp_dir = "/tmp" if not self.is_windows else os.getenv("TEMP") + assert isinstance(tmp_dir, str) # to satisfy typing + hex_id = uuid.uuid4().hex + self._tmp_app_path = os.path.join(tmp_dir, hex_id) + assert isinstance(self.tmp_app_path, str) # to satisfy typing + try: + os.mkdir(self.tmp_app_path) + except OSError: + logger.exception("cannot make temporary folder %s", self.tmp_app_path) + path = os.path.join(self.tmp_app_path, "app.jl") + + logger.info("JuliaRunner start => app is Julia code chunk") + logger.info("make a temporary Julia file for execution => %s", path) + logger.debug("content of the Dash.jl app") + logger.debug("%s", app) + + with open(path, "w", encoding="utf-8") as fp: + fp.write(app) + + app = path + + # try to find the path to the calling script to use as cwd + if not cwd: + for entry in inspect.stack(): + if "/dash/testing/" not in entry[1].replace("\\", "/"): + cwd = os.path.dirname(os.path.realpath(entry[1])) + logger.warning("get cwd from inspect => %s", cwd) + break + if cwd: + logger.info( + "JuliaRunner inferred cwd from the Python call stack: %s", cwd + ) + + # try copying all valid sub folders (i.e. assets) in cwd to tmp + # note that the R assets folder name can be any valid folder name + assets = [ + os.path.join(cwd, _) + for _ in os.listdir(cwd) + if not _.startswith("__") and os.path.isdir(os.path.join(cwd, _)) + ] + + for asset in assets: + target = os.path.join(self.tmp_app_path, os.path.basename(asset)) + if os.path.exists(target): + logger.debug("delete existing target %s", target) + shutil.rmtree(target) + logger.debug("copying %s => %s", asset, self.tmp_app_path) + shutil.copytree(asset, target) + logger.debug("copied with %s", os.listdir(target)) + + else: + logger.warning( + "JuliaRunner found no cwd in the Python call stack. " + "You may wish to specify an explicit working directory " + "using something like: " + "dashjl.run_server(app, cwd=os.path.dirname(__file__))" + ) + + logger.info("Run Dash.jl app with julia => %s", app) + + args = shlex.split( + f"julia --project {os.path.realpath(app)}", posix=not self.is_windows + ) + logger.debug("start Dash.jl process with %s", args) + + try: + self.proc = subprocess.Popen( # pylint: disable=consider-using-with + args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=self.tmp_app_path if self.tmp_app_path else cwd, + ) + # wait until server is able to answer http request + wait.until(lambda: self.accessible(self.url), timeout=start_timeout) + + except (OSError, ValueError): + logger.exception("process server has encountered an error") + self.started = False + return + + self.started = True diff --git a/venv/lib/python3.8/site-packages/dash/testing/browser.py b/venv/lib/python3.8/site-packages/dash/testing/browser.py new file mode 100644 index 0000000..b6b1ee1 --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/testing/browser.py @@ -0,0 +1,671 @@ +# pylint: disable=missing-docstring +import os +import sys +import time +import logging +from typing import Union, Optional +import warnings +import percy +import requests + +from selenium import webdriver +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.common.by import By +from selenium.webdriver.support.wait import WebDriverWait +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.common.action_chains import ActionChains + +from selenium.common.exceptions import ( + WebDriverException, + TimeoutException, + MoveTargetOutOfBoundsException, +) + +from dash.testing.wait import ( + text_to_equal, + style_to_equal, + class_to_equal, + contains_text, + contains_class, + until, +) +from dash.testing.dash_page import DashPageMixin +from dash.testing.errors import DashAppLoadingError, BrowserError, TestingTimeoutError +from dash.testing.consts import SELENIUM_GRID_DEFAULT + + +logger = logging.getLogger(__name__) + + +class Browser(DashPageMixin): + _url: str + + # pylint: disable=too-many-arguments + def __init__( + self, + browser: str, + remote: bool = False, + remote_url: Optional[str] = None, + headless: bool = False, + options: Optional[Union[dict, list]] = None, + download_path: str = "", + percy_run: bool = True, + percy_finalize: bool = True, + percy_assets_root: str = "", + wait_timeout: int = 10, + pause: bool = False, + ): + self._browser = browser.lower() + self._remote_url = remote_url + self._remote = ( + True if remote_url and remote_url != SELENIUM_GRID_DEFAULT else remote + ) + self._headless = headless + self._options = options + self._download_path = download_path + self._wait_timeout = wait_timeout + self._percy_finalize = percy_finalize + self._percy_run = percy_run + self._pause = pause + + self._driver = until(self.get_webdriver, timeout=1) + self._driver.implicitly_wait(2) + + self._wd_wait = WebDriverWait(self.driver, wait_timeout) + self._last_ts = 0 + self._url = "" + + self._window_idx = 0 # switch browser tabs + + if self._percy_run: + self.percy_runner = percy.Runner( + loader=percy.ResourceLoader( + webdriver=self.driver, + base_url="/assets", + root_dir=percy_assets_root, + ) + ) + self.percy_runner.initialize_build() + + logger.debug("initialize browser with arguments") + logger.debug(" headless => %s", self._headless) + logger.debug(" download_path => %s", self._download_path) + logger.debug(" percy asset root => %s", os.path.abspath(percy_assets_root)) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, traceback): + try: + self.driver.quit() + if self._percy_run and self._percy_finalize: + logger.info("percy runner finalize build now") + self.percy_runner.finalize_build() + else: + logger.info("percy finalize relies on CI job") + except WebDriverException: + logger.exception("webdriver quit was not successful") + except percy.errors.Error: # type: ignore[reportAttributeAccessIssue] + logger.exception("percy runner failed to finalize properly") + + def visit_and_snapshot( + self, + resource_path: str, + hook_id: str, + wait_for_callbacks=True, + convert_canvases=False, + assert_check=True, + stay_on_page=False, + widths=None, + ): + path = resource_path.lstrip("/") + try: + if path != resource_path: + logger.warning("we stripped the left '/' in resource_path") + self.server_url = self.server_url + self.driver.get(f"{self.server_url.rstrip('/')}/{path}") + + # wait for the hook_id to present and all callbacks get fired + self.wait_for_element_by_id(hook_id) + self.percy_snapshot( + path, + wait_for_callbacks=wait_for_callbacks, + convert_canvases=convert_canvases, + widths=widths, + ) + if assert_check: + assert not self.find_elements( + "div.dash-debug-alert" + ), "devtools should not raise an error alert" + if not stay_on_page: + self.driver.back() + except WebDriverException as e: + logger.exception("snapshot at resource %s error", path) + raise e + + def percy_snapshot( + self, name="", wait_for_callbacks=False, convert_canvases=False, widths=None + ): + """percy_snapshot - visual test api shortcut to `percy_runner.snapshot`. + It also combines the snapshot `name` with the Python version, + args: + - name: combined with the python version to give the final snapshot name + - wait_for_callbacks: default False, whether to wait for Dash callbacks, + after an extra second to ensure that any relevant callbacks have + been initiated + - convert_canvases: default False, whether to convert all canvas elements + in the DOM into static images for percy to see. They will be restored + after the snapshot is complete. + - widths: a list of pixel widths for percy to render the page with. Note + that this does not change the browser in which the DOM is constructed, + so the width will only affect CSS, not JS-driven layout. + Defaults to [1280] + """ + if widths is None: + widths = [1280] + try: + import asgiref # pylint: disable=unused-import, import-outside-toplevel # noqa: F401, C0415 + + name += "_async" + except ImportError: + pass + + logger.info("taking snapshot name => %s", name) + try: + if wait_for_callbacks: + # the extra one second sleep adds safe margin in the context + # of wait_for_callbacks + time.sleep(1) + until(self._wait_for_callbacks, timeout=40, poll=0.3) + except TestingTimeoutError: + # API will log the error but this TimeoutError should not block + # the test execution to continue and it will still do a snapshot + # as diff reference for the build run. + logger.error( + "wait_for_callbacks failed => status of invalid rqs %s", + self.redux_state_rqs, + ) + + if convert_canvases: + self.driver.execute_script( + """ + const stash = window._canvasStash = []; + Array.from(document.querySelectorAll('canvas')).forEach(c => { + const i = document.createElement('img'); + i.src = c.toDataURL(); + i.width = c.width; + i.height = c.height; + i.setAttribute('style', c.getAttribute('style')); + i.className = c.className; + i.setAttribute('data-canvasnum', stash.length); + stash.push(c); + c.parentElement.insertBefore(i, c); + c.parentElement.removeChild(c); + }); + """ + ) + + try: + self.percy_runner.snapshot(name=name, widths=widths) + except requests.HTTPError as err: + # Ignore retries. + if err.request.status_code != 400: # type: ignore[reportAttributeAccessIssue] + raise err + + if convert_canvases: + self.driver.execute_script( + """ + const stash = window._canvasStash; + Array.from( + document.querySelectorAll('img[data-canvasnum]') + ).forEach(i => { + const c = stash[+i.getAttribute('data-canvasnum')]; + i.parentElement.insertBefore(c, i); + i.parentElement.removeChild(i); + }); + delete window._canvasStash; + """ + ) + + def take_snapshot(self, name: str): + """Hook method to take snapshot when a selenium test fails. The + snapshot is placed under. + + - `/tmp/dash_artifacts` in linux + - `%TEMP` in windows + with a filename combining test case name and the + running selenium session id + """ + target = ( + "/tmp/dash_artifacts" if not self._is_windows() else os.getenv("TEMP", "") + ) + + if not os.path.exists(target): + try: + os.mkdir(target) + except OSError: + logger.exception("cannot make artifacts") + + self.driver.save_screenshot(f"{target}/{name}_{self.session_id}.png") + + def find_element(self, selector, attribute="CSS_SELECTOR"): + """find_element returns the first found element by the attribute `selector` + shortcut to `driver.find_element(By.CSS_SELECTOR, ...)`. + args: + - attribute: the attribute type to search for, aligns with the Selenium + API's `By` class. default "CSS_SELECTOR" + valid values: "CSS_SELECTOR", "ID", "NAME", "TAG_NAME", + "CLASS_NAME", "LINK_TEXT", "PARTIAL_LINK_TEXT", "XPATH" + """ + return self.driver.find_element(getattr(By, attribute.upper()), selector) + + def find_elements(self, selector, attribute="CSS_SELECTOR"): + """find_elements returns a list of all elements matching the attribute + `selector`. Shortcut to `driver.find_elements(By.CSS_SELECTOR, ...)`. + args: + - attribute: the attribute type to search for, aligns with the Selenium + API's `By` class. default "CSS_SELECTOR" + valid values: "CSS_SELECTOR", "ID", "NAME", "TAG_NAME", + "CLASS_NAME", "LINK_TEXT", "PARTIAL_LINK_TEXT", "XPATH" + """ + return self.driver.find_elements(getattr(By, attribute.upper()), selector) + + def _get_element(self, elem_or_selector): + if isinstance(elem_or_selector, str): + return self.find_element(elem_or_selector) + return elem_or_selector + + def _wait_for(self, method, timeout, msg): + """Abstract generic pattern for explicit WebDriverWait.""" + try: + _wait = ( + self._wd_wait + if timeout is None + else WebDriverWait(self.driver, timeout) + ) + logger.debug( + "method, timeout, poll => %s %s %s", + method, + _wait._timeout, # pylint: disable=protected-access + _wait._poll, # pylint: disable=protected-access + ) + + return _wait.until(method) + except Exception as err: + if callable(msg): + message = msg(self.driver) + else: + message = msg + raise TimeoutException(str(message)) from err + + def wait_for_element(self, selector, timeout=None): + """wait_for_element is shortcut to `wait_for_element_by_css_selector` + timeout if not set, equals to the fixture's `wait_timeout`.""" + return self.wait_for_element_by_css_selector(selector, timeout) + + def wait_for_element_by_css_selector(self, selector, timeout=None): + """Explicit wait until the element is present, timeout if not set, + equals to the fixture's `wait_timeout` shortcut to `WebDriverWait` with + `EC.presence_of_element_located`.""" + return self._wait_for( + EC.presence_of_element_located( + (By.CSS_SELECTOR, selector), + ), + timeout, + f"timeout {timeout or self._wait_timeout}s => waiting for selector {selector}", + ) + + def wait_for_no_elements(self, selector, timeout=None): + """Explicit wait until an element is NOT found. timeout defaults to + the fixture's `wait_timeout`.""" + until( + # if we use get_elements it waits a long time to see if they appear + # so this one calls out directly to execute_script + lambda: self.driver.execute_script( + f"return document.querySelectorAll('{selector}').length" + ) + == 0, + timeout or self._wait_timeout, + ) + + def wait_for_element_by_id(self, element_id, timeout=None): + """Explicit wait until the element is present, timeout if not set, + equals to the fixture's `wait_timeout` shortcut to `WebDriverWait` with + `EC.presence_of_element_located`.""" + return self._wait_for( + EC.presence_of_element_located( + (By.ID, element_id), + ), + timeout, + f"timeout {timeout or self._wait_timeout}s => waiting for element id {element_id}", + ) + + def wait_for_class_to_equal(self, selector, classname, timeout=None): + """Explicit wait until the element's class has expected `value` timeout + if not set, equals to the fixture's `wait_timeout` shortcut to + `WebDriverWait` with customized `class_to_equal` condition.""" + return self._wait_for( + method=class_to_equal(selector, classname), + timeout=timeout, + msg=f"classname => {classname} not found within {timeout or self._wait_timeout}s", + ) + + def wait_for_style_to_equal(self, selector, style, val, timeout=None): + """Explicit wait until the element's style has expected `value` timeout + if not set, equals to the fixture's `wait_timeout` shortcut to + `WebDriverWait` with customized `style_to_equal` condition.""" + return self._wait_for( + method=style_to_equal(selector, style, val), + timeout=timeout, + msg=f"style val => {style} {val} not found within {timeout or self._wait_timeout}s", + ) + + def wait_for_text_to_equal(self, selector, text, timeout=None): + """Explicit wait until the element's text equals the expected `text`. + + timeout if not set, equals to the fixture's `wait_timeout` + shortcut to `WebDriverWait` with customized `text_to_equal` + condition. + """ + method = text_to_equal(selector, text, timeout or self.wait_timeout) + + return self._wait_for( + method=method, + timeout=timeout, + msg=method.message, + ) + + def wait_for_contains_class(self, selector, classname, timeout=None): + """Explicit wait until the element's classes contains the expected `classname`. + + timeout if not set, equals to the fixture's `wait_timeout` + shortcut to `WebDriverWait` with customized `contains_class` + condition. + """ + return self._wait_for( + method=contains_class(selector, classname), + timeout=timeout, + msg=f"classname -> {classname} not found inside element within {timeout or self._wait_timeout}s", + ) + + def wait_for_contains_text(self, selector, text, timeout=None): + """Explicit wait until the element's text contains the expected `text`. + + timeout if not set, equals to the fixture's `wait_timeout` + shortcut to `WebDriverWait` with customized `contains_text` + condition. + """ + method = contains_text(selector, text, timeout or self.wait_timeout) + return self._wait_for( + method=method, + timeout=timeout, + msg=method.message, + ) + + def wait_for_page(self, url=None, timeout=10): + """wait_for_page navigates to the url in webdriver wait until the + renderer is loaded in browser. + + use the `server_url` if url is not provided. + """ + self.driver.get(self.server_url if url is None else url) + try: + self.wait_for_element_by_css_selector( + self.dash_entry_locator, timeout=timeout + ) + except TimeoutException as exc: + logger.exception("dash server is not loaded within %s seconds", timeout) + logs = "\n".join((str(log) for log in self.get_logs())) # type: ignore[reportOptionalIterable] + logger.debug(logs) + html = self.find_element("body").get_property("innerHTML") + raise DashAppLoadingError( + "the expected Dash react entry point cannot be loaded" + f" in browser\n HTML => {html}\n Console Logs => {logs}\n" + ) from exc + + if self._pause: + import pdb # pylint: disable=import-outside-toplevel + + pdb.set_trace() # pylint: disable=forgotten-debug-statement + + def select_dcc_dropdown(self, elem_or_selector, value=None, index=None): + dropdown = self._get_element(elem_or_selector) + dropdown.click() + + menu = dropdown.find_element(By.CSS_SELECTOR, "div.Select-menu-outer") + logger.debug("the available options are %s", "|".join(menu.text.split("\n"))) + + options = menu.find_elements(By.CSS_SELECTOR, "div.VirtualizedSelectOption") + if options: + if isinstance(index, int): + options[index].click() + return + + for option in options: + if option.text == value: + option.click() + return + + logger.error( + "cannot find matching option using value=%s or index=%s", value, index + ) + + def toggle_window(self): + """Switch between the current working window and the new opened one.""" + idx = (self._window_idx + 1) % 2 + self.switch_window(idx=idx) + self._window_idx += 1 + + def switch_window(self, idx=0): + """Switch to window by window index shortcut to + `driver.switch_to.window`.""" + if len(self.driver.window_handles) <= idx: + raise BrowserError("there is no second window in Browser") + + self.driver.switch_to.window(self.driver.window_handles[idx]) + + def open_new_tab(self, url=None): + """Open a new tab in browser url is not set, equals to `server_url`.""" + self.driver.execute_script( + f'window.open("{url or self.server_url}", "new window")' + ) + + def get_webdriver(self): + return getattr(self, f"_get_{self._browser}")() + + def _get_wd_options(self): + options = ( + self._options[0] + if self._options and isinstance(self._options, list) + else getattr(webdriver, self._browser).options.Options() + ) + + if self._headless: + options.add_argument("--headless") + + return options + + def _get_chrome(self): + options = self._get_wd_options() + + if "DASH_TEST_CHROMEPATH" in os.environ: + options.binary_location = os.environ["DASH_TEST_CHROMEPATH"] + + options.add_experimental_option( + "prefs", + { + "download.default_directory": self.download_path, + "download.prompt_for_download": False, + "download.directory_upgrade": True, + "safebrowsing.enabled": False, + "safebrowsing.disable_download_protection": True, + }, + ) + + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--no-sandbox") + options.add_argument("--disable-gpu") + options.add_argument("--remote-debugging-port=0") + + options.set_capability("goog:loggingPrefs", {"browser": "SEVERE"}) + + chrome = ( + webdriver.Remote(command_executor=self._remote_url, options=options) # type: ignore[reportAttributeAccessIssue] + if self._remote + else webdriver.Chrome(options=options) + ) + + # https://bugs.chromium.org/p/chromium/issues/detail?id=696481 + if self._headless: + # pylint: disable=protected-access + chrome.command_executor._commands["send_command"] = ( # type: ignore[reportArgumentType] + "POST", + "/session/$sessionId/chromium/send_command", + ) + params = { + "cmd": "Page.setDownloadBehavior", + "params": {"behavior": "allow", "downloadPath": self.download_path}, + } + res = chrome.execute("send_command", params) + logger.debug("enabled headless download returns %s", res) + + chrome.set_window_position(0, 0) + return chrome + + def _get_firefox(self): + options = self._get_wd_options() + + options.set_capability("marionette", True) + + options.set_preference("browser.download.dir", self.download_path) + options.set_preference("browser.download.folderList", 2) + options.set_preference( + "browser.helperApps.neverAsk.saveToDisk", + "application/octet-stream", # this MIME is generic for binary + ) + if not self._remote_url and self._remote: + raise TypeError("remote_url was not provided but required for Firefox") + + return ( + webdriver.Remote( + command_executor=self._remote_url, # type: ignore[reportTypeArgument] + options=options, + ) + if self._remote + else webdriver.Firefox(options=options) + ) + + @staticmethod + def _is_windows(): + return sys.platform == "win32" + + def multiple_click(self, elem_or_selector, clicks, delay=None): + """multiple_click click the element with number of `clicks`.""" + for _ in range(clicks): + self._get_element(elem_or_selector).click() + if delay: + time.sleep(delay) + + def clear_input(self, elem_or_selector): + """Simulate key press to clear the input.""" + elem = self._get_element(elem_or_selector) + logger.debug("clear input with %s => %s", elem_or_selector, elem) + ( + ActionChains(self.driver) + .move_to_element(elem) + .pause(0.2) + .click(elem) + .send_keys(Keys.END) + .key_down(Keys.SHIFT) + .send_keys(Keys.HOME) + .key_up(Keys.SHIFT) + .send_keys(Keys.DELETE) + ).perform() + + def zoom_in_graph_by_ratio( + self, elem_or_selector, start_fraction=0.5, zoom_box_fraction=0.2, compare=True + ): + """Zoom out a graph with a zoom box fraction of component dimension + default start at middle with a rectangle of 1/5 of the dimension use + `compare` to control if we check the svg get changed.""" + elem = self._get_element(elem_or_selector) + + prev = elem.get_attribute("innerHTML") + w, h = elem.size["width"], elem.size["height"] + try: + ActionChains(self.driver).move_to_element_with_offset( + elem, w * start_fraction, h * start_fraction + ).drag_and_drop_by_offset( + elem, w * zoom_box_fraction, h * zoom_box_fraction + ).perform() + except MoveTargetOutOfBoundsException: + logger.exception("graph offset outside of the boundary") + if compare: + assert prev != elem.get_attribute( + "innerHTML" + ), "SVG content should be different after zoom" + + def click_at_coord_fractions(self, elem_or_selector, fx, fy): + elem = self._get_element(elem_or_selector) + + ActionChains(self.driver).move_to_element_with_offset( + elem, elem.size["width"] * fx, elem.size["height"] * fy + ).click().perform() + + def get_logs(self): + """Return a list of `SEVERE` level logs after last reset time stamps + (default to 0, resettable by `reset_log_timestamp`. + + Chrome only + """ + if self._browser == "chrome": + return [ + entry + for entry in self.driver.get_log("browser") + if entry["timestamp"] > self._last_ts + ] + warnings.warn("get_logs always return None with webdrivers other than Chrome") + return None + + def reset_log_timestamp(self): + """reset_log_timestamp only work with chrome webdriver.""" + if self._browser == "chrome": + entries = self.driver.get_log("browser") + if entries: + self._last_ts = entries[-1]["timestamp"] + + @property + def driver(self): + """Expose the selenium webdriver as fixture property.""" + return self._driver + + @property + def session_id(self): + return self.driver.session_id + + @property + def server_url(self) -> str: + return self._url + + @server_url.setter + def server_url(self, value): + """Set the server url so the selenium is aware of the local server + port. + + It also implicitly calls `wait_for_page`. + """ + self._url = value + self.wait_for_page() + + @property + def download_path(self): + return self._download_path + + @property + def wait_timeout(self): + return self._wait_timeout + + @wait_timeout.setter + def wait_timeout(self, value): + self._wait_timeout = value + self._wd_wait = WebDriverWait(self.driver, value) diff --git a/venv/lib/python3.8/site-packages/dash/testing/composite.py b/venv/lib/python3.8/site-packages/dash/testing/composite.py new file mode 100644 index 0000000..8a3abee --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/testing/composite.py @@ -0,0 +1,46 @@ +from dash.testing.browser import Browser + + +class DashComposite(Browser): + def __init__(self, server, **kwargs): + super().__init__(**kwargs) + self.server = server + + def start_server(self, app, navigate=True, **kwargs): + """Start the local server with app.""" + + # start server with app and pass Dash arguments + self.server(app, **kwargs) + + if navigate: + # set the default server_url, it implicitly call wait_for_page + self.server_url = self.server.url + + +class DashRComposite(Browser): + def __init__(self, server, **kwargs): + super().__init__(**kwargs) + self.server = server + + def start_server(self, app, cwd=None): + + # start server with dashR app. The app sets its own run_server args + # on the R side, but we support overriding the automatic cwd + self.server(app, cwd=cwd) + + # set the default server_url, it implicitly call wait_for_page + self.server_url = self.server.url + + +class DashJuliaComposite(Browser): + def __init__(self, server, **kwargs): + super().__init__(**kwargs) + self.server = server + + def start_server(self, app, cwd=None): + # start server with Dash.jl app. The app sets its own run_server args + # on the Julia side, but we support overriding the automatic cwd + self.server(app, cwd=cwd) + + # set the default server_url, it implicitly call wait_for_page + self.server_url = self.server.url diff --git a/venv/lib/python3.8/site-packages/dash/testing/consts.py b/venv/lib/python3.8/site-packages/dash/testing/consts.py new file mode 100644 index 0000000..ea62a29 --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/testing/consts.py @@ -0,0 +1 @@ +SELENIUM_GRID_DEFAULT = "http://localhost:4444/wd/hub" diff --git a/venv/lib/python3.8/site-packages/dash/testing/dash_page.py b/venv/lib/python3.8/site-packages/dash/testing/dash_page.py new file mode 100644 index 0000000..2f4d69f --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/testing/dash_page.py @@ -0,0 +1,99 @@ +# type: ignore[reportAttributeAccessIssue] +# Ignore attribute access issues when type checking because mixin +# class depends on other class lineage to supply things. We could use +# a protocol definition here instead… + +from bs4 import BeautifulSoup + + +class DashPageMixin: + def _get_dash_dom_by_attribute(self, attr): + return BeautifulSoup( + self.find_element(self.dash_entry_locator).get_attribute(attr), "lxml" + ) + + @property + def devtools_error_count_locator(self): + return ".test-devtools-error-count" + + @property + def dash_entry_locator(self): + return "#react-entry-point" + + @property + def dash_outerhtml_dom(self): + return self._get_dash_dom_by_attribute("outerHTML") + + @property + def dash_innerhtml_dom(self): + return self._get_dash_dom_by_attribute("innerHTML") + + @property + def redux_state_paths(self): + return self.driver.execute_script( + """ + var p = window.store.getState().paths; + return {strs: p.strs, objs: p.objs} + """ + ) + + @property + def redux_state_rqs(self): + return self.driver.execute_script( + """ + + // Check for legacy `pendingCallbacks` store prop (compatibility for Dash matrix testing) + var pendingCallbacks = window.store.getState().pendingCallbacks; + if (pendingCallbacks) { + return pendingCallbacks.map(function(cb) { + var out = {}; + for (var key in cb) { + if (typeof cb[key] !== 'function') { out[key] = cb[key]; } + } + return out; + }); + } + + // Otherwise, use the new `callbacks` store prop + var callbacksState = Object.assign({}, window.store.getState().callbacks); + delete callbacksState.stored; + delete callbacksState.completed; + + return Array.prototype.concat.apply([], Object.values(callbacksState)); + """ + ) + + @property + def redux_state_is_loading(self): + return self.driver.execute_script( + """ + return window.store.getState().isLoading; + """ + ) + + @property + def window_store(self): + return self.driver.execute_script("return window.store") + + def _wait_for_callbacks(self): + return (not self.window_store) or self.redux_state_rqs == [] + + def get_local_storage(self, store_id="local"): + return self.driver.execute_script( + f"return JSON.parse(window.localStorage.getItem('{store_id}'));" + ) + + def get_session_storage(self, session_id="session"): + return self.driver.execute_script( + f"return JSON.parse(window.sessionStorage.getItem('{session_id}'));" + ) + + def clear_local_storage(self): + self.driver.execute_script("window.localStorage.clear()") + + def clear_session_storage(self): + self.driver.execute_script("window.sessionStorage.clear()") + + def clear_storage(self): + self.clear_local_storage() + self.clear_session_storage() diff --git a/venv/lib/python3.8/site-packages/dash/testing/errors.py b/venv/lib/python3.8/site-packages/dash/testing/errors.py new file mode 100644 index 0000000..772386d --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/testing/errors.py @@ -0,0 +1,26 @@ +class DashTestingError(Exception): + """Base error for pytest-dash.""" + + +class InvalidDriverError(DashTestingError): + """An invalid selenium driver was specified.""" + + +class NoAppFoundError(DashTestingError): + """No `app` was found in the file.""" + + +class DashAppLoadingError(DashTestingError): + """The dash app failed to load.""" + + +class ServerCloseError(DashTestingError): + """The server cannot be closed.""" + + +class TestingTimeoutError(DashTestingError): + """All timeout error about dash testing.""" + + +class BrowserError(DashTestingError): + """All browser relevant errors.""" diff --git a/venv/lib/python3.8/site-packages/dash/testing/newhooks.py b/venv/lib/python3.8/site-packages/dash/testing/newhooks.py new file mode 100644 index 0000000..07e850d --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/testing/newhooks.py @@ -0,0 +1,2 @@ +def pytest_setup_options(): + """Called before webdriver is initialized.""" diff --git a/venv/lib/python3.8/site-packages/dash/testing/plugin.py b/venv/lib/python3.8/site-packages/dash/testing/plugin.py new file mode 100644 index 0000000..1b917d7 --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/testing/plugin.py @@ -0,0 +1,263 @@ +# pylint: disable=missing-docstring,redefined-outer-name +from typing import Any + +import pytest +from .consts import SELENIUM_GRID_DEFAULT + + +# pylint: disable=too-few-public-methods +class MissingDashTesting: + def __init__(self, **kwargs): + raise Exception( + "dash[testing] was not installed. " + "Please install to use the dash testing fixtures." + ) + + def __enter__(self) -> Any: + """Implemented to satisfy type checking.""" + + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> bool: + """Implemented to satisfy type checking.""" + + return False + + +try: + from dash.testing.application_runners import ( + ThreadedRunner, + ProcessRunner, + RRunner, + JuliaRunner, + MultiProcessRunner, + ) + from dash.testing.browser import Browser + from dash.testing.composite import DashComposite, DashRComposite, DashJuliaComposite + + # pylint: disable=unused-import + import dash_testing_stub # noqa: F401 + + _installed = True +except ImportError: + # Running pytest without dash[testing] installed. + ThreadedRunner = MissingDashTesting + ProcessRunner = MissingDashTesting + MultiProcessRunner = MissingDashTesting + RRunner = MissingDashTesting + JuliaRunner = MissingDashTesting + Browser = MissingDashTesting + DashComposite = MissingDashTesting + DashRComposite = MissingDashTesting + DashJuliaComposite = MissingDashTesting + _installed = False + + +def pytest_addoption(parser): + if not _installed: + return + + dash = parser.getgroup("Dash", "Dash Integration Tests") + + dash.addoption( + "--webdriver", + choices=("Chrome", "Firefox"), + default="Chrome", + help="Name of the selenium driver to use", + ) + + dash.addoption( + "--remote", action="store_true", help="instruct pytest to use selenium grid" + ) + + dash.addoption( + "--remote-url", + action="store", + default=SELENIUM_GRID_DEFAULT, + help="set a different selenium grid remote url if other than default", + ) + + dash.addoption( + "--headless", action="store_true", help="set this flag to run in headless mode" + ) + + dash.addoption( + "--percy-assets", + action="store", + default="tests/assets", + help="configure how Percy will discover your app's assets", + ) + + dash.addoption( + "--nopercyfinalize", + action="store_false", + help="set this flag to control percy finalize at CI level", + ) + + dash.addoption( + "--pause", + action="store_true", + help="pause using pdb after opening the test app, so you can interact with it", + ) + + +@pytest.mark.tryfirst +def pytest_addhooks(pluginmanager): + if not _installed: + return + # https://github.com/pytest-dev/pytest-xdist/blob/974bd566c599dc6a9ea291838c6f226197208b46/xdist/plugin.py#L67 + # avoid warnings with pytest-2.8 + from dash.testing import newhooks # pylint: disable=import-outside-toplevel + + method = getattr(pluginmanager, "add_hookspecs", None) + if method is None: + method = pluginmanager.addhooks # pragma: no cover + method(newhooks) + + +@pytest.hookimpl(tryfirst=True, hookwrapper=True) +def pytest_runtest_makereport(item, call): # pylint: disable=unused-argument + # execute all other hooks to obtain the report object + outcome = yield + if not _installed: + return + rep = outcome.get_result() + + # we only look at actual failing test calls, not setup/teardown + if rep.when == "call" and rep.failed and hasattr(item, "funcargs"): + for name, fixture in item.funcargs.items(): + try: + if name in {"dash_duo", "dash_br", "dashr", "dashjl"}: + fixture.take_snapshot(item.name) + except Exception as e: # pylint: disable=broad-except + print(e) + + +############################################################################### +# Fixtures +############################################################################### + + +@pytest.fixture +def dash_thread_server() -> ThreadedRunner: # type: ignore[reportInvalidTypeForm] + """Start a local dash server in a new thread.""" + with ThreadedRunner() as starter: + yield starter + + +@pytest.fixture +def dash_process_server() -> ProcessRunner: # type: ignore[reportInvalidTypeForm] + """Start a Dash server with subprocess.Popen and waitress-serve.""" + with ProcessRunner() as starter: + yield starter + + +@pytest.fixture +def dash_multi_process_server() -> MultiProcessRunner: # type: ignore[reportInvalidTypeForm] + with MultiProcessRunner() as starter: + yield starter + + +@pytest.fixture +def dashr_server() -> RRunner: # type: ignore[reportInvalidTypeForm] + with RRunner() as starter: + yield starter + + +@pytest.fixture +def dashjl_server() -> JuliaRunner: # type: ignore[reportInvalidTypeForm] + with JuliaRunner() as starter: + yield starter + + +@pytest.fixture +def dash_br(request, tmpdir) -> Browser: # type: ignore[reportInvalidTypeForm] + with Browser( + browser=request.config.getoption("webdriver"), + remote=request.config.getoption("remote"), + remote_url=request.config.getoption("remote_url"), + headless=request.config.getoption("headless"), + options=request.config.hook.pytest_setup_options(), + download_path=tmpdir.mkdir("download").strpath, + percy_assets_root=request.config.getoption("percy_assets"), + percy_finalize=request.config.getoption("nopercyfinalize"), + pause=request.config.getoption("pause"), + ) as browser: + yield browser + + +@pytest.fixture +def dash_duo(request, dash_thread_server, tmpdir) -> DashComposite: # type: ignore[reportInvalidTypeForm] + with DashComposite( + server=dash_thread_server, + browser=request.config.getoption("webdriver"), + remote=request.config.getoption("remote"), + remote_url=request.config.getoption("remote_url"), + headless=request.config.getoption("headless"), + options=request.config.hook.pytest_setup_options(), + download_path=tmpdir.mkdir("download").strpath, + percy_assets_root=request.config.getoption("percy_assets"), + percy_finalize=request.config.getoption("nopercyfinalize"), + pause=request.config.getoption("pause"), + ) as dc: + yield dc + + +@pytest.fixture +def dash_duo_mp(request, dash_multi_process_server, tmpdir) -> DashComposite: # type: ignore[reportInvalidTypeForm] + with DashComposite( + server=dash_multi_process_server, + browser=request.config.getoption("webdriver"), + remote=request.config.getoption("remote"), + remote_url=request.config.getoption("remote_url"), + headless=request.config.getoption("headless"), + options=request.config.hook.pytest_setup_options(), + download_path=tmpdir.mkdir("download").strpath, + percy_assets_root=request.config.getoption("percy_assets"), + percy_finalize=request.config.getoption("nopercyfinalize"), + pause=request.config.getoption("pause"), + ) as dc: + yield dc + + +@pytest.fixture +def dashr(request, dashr_server, tmpdir) -> DashRComposite: # type: ignore[reportInvalidTypeForm] + with DashRComposite( + server=dashr_server, + browser=request.config.getoption("webdriver"), + remote=request.config.getoption("remote"), + remote_url=request.config.getoption("remote_url"), + headless=request.config.getoption("headless"), + options=request.config.hook.pytest_setup_options(), + download_path=tmpdir.mkdir("download").strpath, + percy_assets_root=request.config.getoption("percy_assets"), + percy_finalize=request.config.getoption("nopercyfinalize"), + pause=request.config.getoption("pause"), + ) as dc: + yield dc + + +@pytest.fixture +def dashjl(request, dashjl_server, tmpdir) -> DashJuliaComposite: # type: ignore[reportInvalidTypeForm] + with DashJuliaComposite( + server=dashjl_server, + browser=request.config.getoption("webdriver"), + remote=request.config.getoption("remote"), + remote_url=request.config.getoption("remote_url"), + headless=request.config.getoption("headless"), + options=request.config.hook.pytest_setup_options(), + download_path=tmpdir.mkdir("download").strpath, + percy_assets_root=request.config.getoption("percy_assets"), + percy_finalize=request.config.getoption("nopercyfinalize"), + pause=request.config.getoption("pause"), + ) as dc: + yield dc + + +@pytest.fixture +def diskcache_manager(): + from dash.background_callback import ( # pylint: disable=import-outside-toplevel + DiskcacheManager, + ) + + return DiskcacheManager() diff --git a/venv/lib/python3.8/site-packages/dash/testing/wait.py b/venv/lib/python3.8/site-packages/dash/testing/wait.py new file mode 100644 index 0000000..d9bd8b0 --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/testing/wait.py @@ -0,0 +1,161 @@ +# pylint: disable=too-few-public-methods +"""Utils methods for pytest-dash such wait_for wrappers.""" + +import time +import logging +from selenium.common.exceptions import WebDriverException +from selenium.webdriver.common.by import By +from dash.testing.errors import TestingTimeoutError + + +logger = logging.getLogger(__name__) + + +def until( + wait_cond, timeout, poll=0.1, msg="expected condition not met within timeout" +): # noqa: C0330 + res = wait_cond() + logger.debug( + "start wait.until with method, timeout, poll => %s %s %s", + wait_cond, + timeout, + poll, + ) + end_time = time.time() + timeout + while not res: + if time.time() > end_time: + raise TestingTimeoutError(msg) + time.sleep(poll) + res = wait_cond() + logger.debug("poll => %s", time.time()) + + return res + + +def until_not( + wait_cond, timeout, poll=0.1, msg="expected condition met within timeout" +): # noqa: C0330 + res = wait_cond() + logger.debug( + "start wait.until_not method, timeout, poll => %s %s %s", + wait_cond, + timeout, + poll, + ) + end_time = time.time() + timeout + while res: + if time.time() > end_time: + raise TestingTimeoutError(msg) + time.sleep(poll) + res = wait_cond() + logger.debug("poll => %s", time.time()) + + return res + + +class contains_text: + def __init__(self, selector, text, timeout): + self.selector = selector + self.text = text + self.timeout = timeout + + def __call__(self, driver): + try: + elem = driver.find_element(By.CSS_SELECTOR, self.selector) + logger.debug("contains text {%s} => expected %s", elem.text, self.text) + value = elem.get_attribute("value") + return self.text in str(elem.text) or ( + value is not None and self.text in str(value) + ) + except WebDriverException: + return False + + def message(self, driver): + try: + element = self._get_element(driver) + text = "found: " + str(element.text) or str(element.get_attribute("value")) + except WebDriverException: + text = f"{self.selector} not found" + return f"text -> {self.text} not found inside element within {self.timeout}s, {text}" + + def _get_element(self, driver): + return driver.find_element(By.CSS_SELECTOR, self.selector) + + +class contains_class: + def __init__(self, selector, classname): + self.selector = selector + self.classname = classname + + def __call__(self, driver): + try: + elem = driver.find_element(By.CSS_SELECTOR, self.selector) + classname = elem.get_attribute("class") + logger.debug( + "contains class {%s} => expected %s", classname, self.classname + ) + return self.classname in str(classname).split(" ") + except WebDriverException: + return False + + +class text_to_equal: + def __init__(self, selector, text, timeout): + self.selector = selector + self.text = text + self.timeout = timeout + + def __call__(self, driver): + try: + elem = self._get_element(driver) + logger.debug("text to equal {%s} => expected %s", elem.text, self.text) + value = elem.get_attribute("value") + return str(elem.text) == self.text or ( + value is not None and str(value) == self.text + ) + except WebDriverException: + return False + + def message(self, driver): + try: + element = self._get_element(driver) + text = "found: " + str(element.text) or str(element.get_attribute("value")) + except WebDriverException: + text = f"{self.selector} not found" + return f"text -> {self.text} not found within {self.timeout}s, {text}" + + def _get_element(self, driver): + return driver.find_element(By.CSS_SELECTOR, self.selector) + + +class style_to_equal: + def __init__(self, selector, style, val): + self.selector = selector + self.style = style + self.val = val + + def __call__(self, driver): + try: + elem = driver.find_element(By.CSS_SELECTOR, self.selector) + val = elem.value_of_css_property(self.style) + logger.debug("style to equal {%s} => expected %s", val, self.val) + return val == self.val + except WebDriverException: + return False + + +class class_to_equal: + def __init__(self, selector, classname): + self.selector = selector + self.classname = classname + + def __call__(self, driver): + try: + elem = driver.find_element(By.CSS_SELECTOR, self.selector) + classname = elem.get_attribute("class") + logger.debug( + "class to equal {%s} => expected %s", classname, self.classname + ) + return str(classname) == self.classname + except WebDriverException: + return False |
