aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/dash/testing
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
committersotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
commit5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch)
tree8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/dash/testing
parentb832d364da8c2efe09e3f75828caf73c50d01ce3 (diff)
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/dash/testing')
-rw-r--r--venv/lib/python3.8/site-packages/dash/testing/__init__.py17
-rw-r--r--venv/lib/python3.8/site-packages/dash/testing/application_runners.py533
-rw-r--r--venv/lib/python3.8/site-packages/dash/testing/browser.py671
-rw-r--r--venv/lib/python3.8/site-packages/dash/testing/composite.py46
-rw-r--r--venv/lib/python3.8/site-packages/dash/testing/consts.py1
-rw-r--r--venv/lib/python3.8/site-packages/dash/testing/dash_page.py99
-rw-r--r--venv/lib/python3.8/site-packages/dash/testing/errors.py26
-rw-r--r--venv/lib/python3.8/site-packages/dash/testing/newhooks.py2
-rw-r--r--venv/lib/python3.8/site-packages/dash/testing/plugin.py263
-rw-r--r--venv/lib/python3.8/site-packages/dash/testing/wait.py161
10 files changed, 1819 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/dash/testing/__init__.py b/venv/lib/python3.8/site-packages/dash/testing/__init__.py
new file mode 100644
index 0000000..8d33331
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/testing/__init__.py
@@ -0,0 +1,17 @@
+from contextlib import contextmanager
+
+from .._callback_context import context_value as _ctx
+from .._utils import AttributeDict as _AD
+
+
+@contextmanager
+def ignore_register_page():
+ previous = _ctx.get()
+ copied = _AD(previous)
+ copied.ignore_register_page = True
+ _ctx.set(copied)
+
+ try:
+ yield
+ finally:
+ _ctx.set(previous)
diff --git a/venv/lib/python3.8/site-packages/dash/testing/application_runners.py b/venv/lib/python3.8/site-packages/dash/testing/application_runners.py
new file mode 100644
index 0000000..dc88afe
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/testing/application_runners.py
@@ -0,0 +1,533 @@
+import sys
+import os
+import time
+import uuid
+import shlex
+import threading
+import shutil
+import subprocess
+import logging
+import inspect
+import ctypes
+
+import runpy
+import requests
+import psutil
+
+# pylint: disable=no-member
+import multiprocess
+
+from dash.testing.errors import (
+ NoAppFoundError,
+ TestingTimeoutError,
+ ServerCloseError,
+ DashAppLoadingError,
+)
+from dash.testing import wait
+
+logger = logging.getLogger(__name__)
+
+
+def import_app(app_file, application_name="app"):
+ """Import a dash application from a module. The import path is in dot
+ notation to the module. The variable named app will be returned.
+
+ :Example:
+
+ >>> app = import_app("my_app.app")
+
+ Will import the application in module `app` of the package `my_app`.
+
+ :param app_file: Path to the app (dot-separated).
+ :type app_file: str
+ :param application_name: The name of the dash application instance.
+ :raise: dash_tests.errors.NoAppFoundError
+ :return: App from module.
+ :rtype: dash.Dash
+ """
+ try:
+ app_module = runpy.run_module(app_file)
+ app = app_module[application_name]
+ except KeyError as app_name_missing:
+ logger.exception("the app name cannot be found")
+ raise NoAppFoundError(
+ f"No dash `app` instance was found in {app_file}"
+ ) from app_name_missing
+ return app
+
+
+class BaseDashRunner:
+ """Base context manager class for running applications."""
+
+ _next_port = 58050
+
+ def __init__(self, keep_open, stop_timeout, scheme="http", host="localhost"):
+ self.scheme = scheme
+ self.host = host
+ self.port = 8050
+ self.started = None
+ self.keep_open = keep_open
+ self.stop_timeout = stop_timeout
+ self._tmp_app_path = None
+
+ def start(self, *args, **kwargs):
+ raise NotImplementedError # pragma: no cover
+
+ def stop(self):
+ raise NotImplementedError # pragma: no cover
+
+ @staticmethod
+ def accessible(url):
+ try:
+ requests.get(url)
+ except requests.exceptions.RequestException:
+ return False
+ return True
+
+ def __call__(self, *args, **kwargs):
+ return self.start(*args, **kwargs)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, traceback):
+ if self.started and not self.keep_open:
+ try:
+ logger.info("killing the app runner")
+ self.stop()
+ except TestingTimeoutError as cannot_stop_server:
+ raise ServerCloseError(
+ f"Cannot stop server within {self.stop_timeout}s timeout"
+ ) from cannot_stop_server
+ logger.info("__exit__ complete")
+
+ @property
+ def url(self):
+ """The default server url."""
+ return f"{self.scheme}://{self.host}:{self.port}"
+
+ @property
+ def is_windows(self):
+ return sys.platform == "win32"
+
+ @property
+ def tmp_app_path(self):
+ return self._tmp_app_path
+
+
+class KillerThread(threading.Thread):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self._old_threads = list(threading._active.keys()) # type: ignore[reportAttributeAccessIssue]; pylint: disable=W0212
+
+ def kill(self):
+ # Kill all the new threads.
+ for thread_id in list(threading._active): # type: ignore[reportAttributeAccessIssue]; pylint: disable=W0212
+ if thread_id in self._old_threads:
+ continue
+
+ res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
+ ctypes.c_long(thread_id), ctypes.py_object(SystemExit)
+ )
+ if res == 0:
+ raise ValueError(f"Invalid thread id: {thread_id}")
+ if res > 1:
+ ctypes.pythonapi.PyThreadState_SetAsyncExc(
+ ctypes.c_long(thread_id), None
+ )
+ raise SystemExit("Stopping thread failure")
+
+
+class ThreadedRunner(BaseDashRunner):
+ """Runs a dash application in a thread.
+
+ This is the default flavor to use in dash integration tests.
+ """
+
+ def __init__(self, keep_open=False, stop_timeout=3):
+ super().__init__(keep_open=keep_open, stop_timeout=stop_timeout)
+ self.thread = None
+
+ def running_and_accessible(self, url):
+ if self.thread.is_alive(): # type: ignore[reportOptionalMemberAccess]
+ return self.accessible(url)
+ raise DashAppLoadingError("Thread is not alive.")
+
+ # pylint: disable=arguments-differ
+ def start(self, app, start_timeout=3, **kwargs):
+ """Start the app server in threading flavor."""
+
+ def run():
+ app.scripts.config.serve_locally = True
+ app.css.config.serve_locally = True
+
+ options = kwargs.copy()
+ options["dev_tools_disable_version_check"] = True
+
+ if "port" not in kwargs:
+ options["port"] = self.port = BaseDashRunner._next_port
+ BaseDashRunner._next_port += 1
+ else:
+ self.port = options["port"]
+
+ try:
+ app.run(threaded=True, **options)
+ except SystemExit:
+ logger.info("Server stopped")
+ except Exception as error:
+ logger.exception(error)
+ raise error
+
+ retries = 0
+
+ while not self.started and retries < 3:
+ try:
+ if self.thread:
+ if self.thread.is_alive():
+ self.stop()
+ else:
+ self.thread.kill()
+
+ self.thread = KillerThread(target=run)
+ self.thread.daemon = True
+ self.thread.start()
+ # wait until server is able to answer http request
+ wait.until(
+ lambda: self.running_and_accessible(self.url), timeout=start_timeout
+ )
+ self.started = self.thread.is_alive()
+ except Exception as err: # pylint: disable=broad-except
+ logger.exception(err)
+ self.started = False
+ retries += 1
+ time.sleep(1)
+
+ self.started = self.thread.is_alive() # type: ignore[reportOptionalMemberAccess]
+ if not self.started:
+ raise DashAppLoadingError("threaded server failed to start")
+
+ def stop(self):
+ self.thread.kill() # type: ignore[reportOptionalMemberAccess]
+ self.thread.join() # type: ignore[reportOptionalMemberAccess]
+ wait.until_not(self.thread.is_alive, self.stop_timeout) # type: ignore[reportOptionalMemberAccess]
+ self.started = False
+
+
+class MultiProcessRunner(BaseDashRunner):
+ def __init__(self, keep_open=False, stop_timeout=3):
+ super().__init__(keep_open, stop_timeout)
+ self.proc = None
+
+ # pylint: disable=arguments-differ
+ def start(self, app, start_timeout=3, **kwargs):
+ self.port = kwargs.get("port", 8050)
+
+ def target():
+ app.scripts.config.serve_locally = True
+ app.css.config.serve_locally = True
+
+ options = kwargs.copy()
+
+ try:
+ app.run(threaded=True, **options)
+ except SystemExit:
+ logger.info("Server stopped")
+ raise
+ except Exception as error:
+ logger.exception(error)
+ raise error
+
+ self.proc = multiprocess.Process(target=target) # type: ignore[reportAttributeAccessIssue]; pylint: disable=not-callable
+ self.proc.start()
+
+ wait.until(lambda: self.accessible(self.url), timeout=start_timeout)
+ self.started = True
+
+ def stop(self):
+ process = psutil.Process(self.proc.pid) # type: ignore[reportOptionalMemberAccess]
+
+ for proc in process.children(recursive=True):
+ try:
+ proc.kill()
+ except psutil.NoSuchProcess:
+ pass
+
+ try:
+ process.kill()
+ except psutil.NoSuchProcess:
+ pass
+
+ try:
+ process.wait(1)
+ except (psutil.TimeoutExpired, psutil.NoSuchProcess):
+ pass
+
+
+class ProcessRunner(BaseDashRunner):
+ """Runs a dash application in a waitress-serve subprocess.
+
+ This flavor is closer to production environment but slower.
+ """
+
+ def __init__(self, keep_open=False, stop_timeout=3):
+ super().__init__(keep_open=keep_open, stop_timeout=stop_timeout)
+ self.proc = None
+
+ # pylint: disable=arguments-differ
+ def start(
+ self,
+ app_module=None,
+ application_name="app",
+ raw_command=None,
+ port=8050,
+ start_timeout=3,
+ ):
+ """Start the server with waitress-serve in process flavor."""
+ if not (app_module or raw_command): # need to set a least one
+ logging.error(
+ "the process runner needs to start with at least one valid command"
+ )
+ return
+ self.port = port
+ args = shlex.split(
+ raw_command
+ if raw_command
+ else f"waitress-serve --listen=0.0.0.0:{port} {app_module}:{application_name}.server",
+ posix=not self.is_windows,
+ )
+
+ logger.debug("start dash process with %s", args)
+
+ try:
+ self.proc = subprocess.Popen( # pylint: disable=consider-using-with
+ args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+ )
+ # wait until server is able to answer http request
+ wait.until(lambda: self.accessible(self.url), timeout=start_timeout)
+
+ except (OSError, ValueError):
+ logger.exception("process server has encountered an error")
+ self.started = False
+ self.stop()
+ return
+
+ self.started = True
+
+ def stop(self):
+ if self.proc:
+ try:
+ logger.info("proc.terminate with pid %s", self.proc.pid)
+ self.proc.terminate()
+ if self.tmp_app_path and os.path.exists(self.tmp_app_path):
+ logger.debug("removing temporary app path %s", self.tmp_app_path)
+ shutil.rmtree(self.tmp_app_path)
+
+ self.proc.communicate(
+ timeout=self.stop_timeout # pylint: disable=unexpected-keyword-arg
+ )
+ except subprocess.TimeoutExpired:
+ logger.exception(
+ "subprocess terminate not success, trying to kill "
+ "the subprocess in a safe manner"
+ )
+ self.proc.kill()
+ self.proc.communicate()
+ logger.info("process stop completes!")
+
+
+class RRunner(ProcessRunner):
+ def __init__(self, keep_open=False, stop_timeout=3):
+ super().__init__(keep_open=keep_open, stop_timeout=stop_timeout)
+ self.proc = None
+
+ # pylint: disable=arguments-differ
+ def start(self, app, start_timeout=2, cwd=None): # type: ignore[reportIncompatibleMethodOverride]
+ """Start the server with subprocess and Rscript."""
+
+ if os.path.isfile(app) and os.path.exists(app):
+ # app is already a file in a dir - use that as cwd
+ if not cwd:
+ cwd = os.path.dirname(app)
+ logger.info("RRunner inferred cwd from app path: %s", cwd)
+ else:
+ # app is a string chunk, we make a temporary folder to store app.R
+ # and its relevant assets
+ tmp_dir = "/tmp" if not self.is_windows else os.getenv("TEMP")
+ tmp_dir = str(tmp_dir) # to satisfy type checking
+ hex_id = uuid.uuid4().hex
+ self._tmp_app_path = os.path.join(tmp_dir, hex_id)
+ try:
+ os.mkdir(self.tmp_app_path) # type: ignore[reportArgumentType]
+ except OSError:
+ logger.exception("cannot make temporary folder %s", self.tmp_app_path)
+ path = os.path.join(self.tmp_app_path, "app.R") # type: ignore[reportCallIssue]
+
+ logger.info("RRunner start => app is R code chunk")
+ logger.info("make a temporary R file for execution => %s", path)
+ logger.debug("content of the dashR app")
+ logger.debug("%s", app)
+
+ with open(path, "w", encoding="utf-8") as fp:
+ fp.write(app)
+
+ app = path
+
+ # try to find the path to the calling script to use as cwd
+ if not cwd:
+ for entry in inspect.stack():
+ if "/dash/testing/" not in entry[1].replace("\\", "/"):
+ cwd = os.path.dirname(os.path.realpath(entry[1]))
+ logger.warning("get cwd from inspect => %s", cwd)
+ break
+ if cwd:
+ logger.info("RRunner inferred cwd from the Python call stack: %s", cwd)
+
+ # try copying all valid sub folders (i.e. assets) in cwd to tmp
+ # note that the R assets folder name can be any valid folder name
+ assets = [
+ os.path.join(cwd, _)
+ for _ in os.listdir(cwd)
+ if not _.startswith("__") and os.path.isdir(os.path.join(cwd, _))
+ ]
+
+ for asset in assets:
+ target = os.path.join(self.tmp_app_path, os.path.basename(asset)) # type: ignore[reportCallIssue]
+ if os.path.exists(target):
+ logger.debug("delete existing target %s", target)
+ shutil.rmtree(target)
+ logger.debug("copying %s => %s", asset, self.tmp_app_path)
+ shutil.copytree(asset, target)
+ logger.debug("copied with %s", os.listdir(target))
+
+ else:
+ logger.warning(
+ "RRunner found no cwd in the Python call stack. "
+ "You may wish to specify an explicit working directory "
+ "using something like: "
+ "dashr.run_server(app, cwd=os.path.dirname(__file__))"
+ )
+
+ logger.info("Run dashR app with Rscript => %s", app)
+
+ args = shlex.split(
+ f"Rscript -e 'source(\"{os.path.realpath(app)}\")'",
+ posix=not self.is_windows,
+ )
+ logger.debug("start dash process with %s", args)
+
+ try:
+ self.proc = subprocess.Popen( # pylint: disable=consider-using-with
+ args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ cwd=self.tmp_app_path if self.tmp_app_path else cwd,
+ )
+ # wait until server is able to answer http request
+ wait.until(lambda: self.accessible(self.url), timeout=start_timeout)
+
+ except (OSError, ValueError):
+ logger.exception("process server has encountered an error")
+ self.started = False
+ return
+
+ self.started = True
+
+
+class JuliaRunner(ProcessRunner):
+ def __init__(self, keep_open=False, stop_timeout=3):
+ super().__init__(keep_open=keep_open, stop_timeout=stop_timeout)
+ self.proc = None
+
+ # pylint: disable=arguments-differ
+ def start(self, app, start_timeout=30, cwd=None): # type: ignore[reportIncompatibleMethodOverride]
+ """Start the server with subprocess and julia."""
+
+ if os.path.isfile(app) and os.path.exists(app):
+ # app is already a file in a dir - use that as cwd
+ if not cwd:
+ cwd = os.path.dirname(app)
+ logger.info("JuliaRunner inferred cwd from app path: %s", cwd)
+ else:
+ # app is a string chunk, we make a temporary folder to store app.jl
+ # and its relevant assets
+ tmp_dir = "/tmp" if not self.is_windows else os.getenv("TEMP")
+ assert isinstance(tmp_dir, str) # to satisfy typing
+ hex_id = uuid.uuid4().hex
+ self._tmp_app_path = os.path.join(tmp_dir, hex_id)
+ assert isinstance(self.tmp_app_path, str) # to satisfy typing
+ try:
+ os.mkdir(self.tmp_app_path)
+ except OSError:
+ logger.exception("cannot make temporary folder %s", self.tmp_app_path)
+ path = os.path.join(self.tmp_app_path, "app.jl")
+
+ logger.info("JuliaRunner start => app is Julia code chunk")
+ logger.info("make a temporary Julia file for execution => %s", path)
+ logger.debug("content of the Dash.jl app")
+ logger.debug("%s", app)
+
+ with open(path, "w", encoding="utf-8") as fp:
+ fp.write(app)
+
+ app = path
+
+ # try to find the path to the calling script to use as cwd
+ if not cwd:
+ for entry in inspect.stack():
+ if "/dash/testing/" not in entry[1].replace("\\", "/"):
+ cwd = os.path.dirname(os.path.realpath(entry[1]))
+ logger.warning("get cwd from inspect => %s", cwd)
+ break
+ if cwd:
+ logger.info(
+ "JuliaRunner inferred cwd from the Python call stack: %s", cwd
+ )
+
+ # try copying all valid sub folders (i.e. assets) in cwd to tmp
+ # note that the R assets folder name can be any valid folder name
+ assets = [
+ os.path.join(cwd, _)
+ for _ in os.listdir(cwd)
+ if not _.startswith("__") and os.path.isdir(os.path.join(cwd, _))
+ ]
+
+ for asset in assets:
+ target = os.path.join(self.tmp_app_path, os.path.basename(asset))
+ if os.path.exists(target):
+ logger.debug("delete existing target %s", target)
+ shutil.rmtree(target)
+ logger.debug("copying %s => %s", asset, self.tmp_app_path)
+ shutil.copytree(asset, target)
+ logger.debug("copied with %s", os.listdir(target))
+
+ else:
+ logger.warning(
+ "JuliaRunner found no cwd in the Python call stack. "
+ "You may wish to specify an explicit working directory "
+ "using something like: "
+ "dashjl.run_server(app, cwd=os.path.dirname(__file__))"
+ )
+
+ logger.info("Run Dash.jl app with julia => %s", app)
+
+ args = shlex.split(
+ f"julia --project {os.path.realpath(app)}", posix=not self.is_windows
+ )
+ logger.debug("start Dash.jl process with %s", args)
+
+ try:
+ self.proc = subprocess.Popen( # pylint: disable=consider-using-with
+ args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ cwd=self.tmp_app_path if self.tmp_app_path else cwd,
+ )
+ # wait until server is able to answer http request
+ wait.until(lambda: self.accessible(self.url), timeout=start_timeout)
+
+ except (OSError, ValueError):
+ logger.exception("process server has encountered an error")
+ self.started = False
+ return
+
+ self.started = True
diff --git a/venv/lib/python3.8/site-packages/dash/testing/browser.py b/venv/lib/python3.8/site-packages/dash/testing/browser.py
new file mode 100644
index 0000000..b6b1ee1
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/testing/browser.py
@@ -0,0 +1,671 @@
+# pylint: disable=missing-docstring
+import os
+import sys
+import time
+import logging
+from typing import Union, Optional
+import warnings
+import percy
+import requests
+
+from selenium import webdriver
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support.wait import WebDriverWait
+from selenium.webdriver.common.keys import Keys
+from selenium.webdriver.common.action_chains import ActionChains
+
+from selenium.common.exceptions import (
+ WebDriverException,
+ TimeoutException,
+ MoveTargetOutOfBoundsException,
+)
+
+from dash.testing.wait import (
+ text_to_equal,
+ style_to_equal,
+ class_to_equal,
+ contains_text,
+ contains_class,
+ until,
+)
+from dash.testing.dash_page import DashPageMixin
+from dash.testing.errors import DashAppLoadingError, BrowserError, TestingTimeoutError
+from dash.testing.consts import SELENIUM_GRID_DEFAULT
+
+
+logger = logging.getLogger(__name__)
+
+
+class Browser(DashPageMixin):
+ _url: str
+
+ # pylint: disable=too-many-arguments
+ def __init__(
+ self,
+ browser: str,
+ remote: bool = False,
+ remote_url: Optional[str] = None,
+ headless: bool = False,
+ options: Optional[Union[dict, list]] = None,
+ download_path: str = "",
+ percy_run: bool = True,
+ percy_finalize: bool = True,
+ percy_assets_root: str = "",
+ wait_timeout: int = 10,
+ pause: bool = False,
+ ):
+ self._browser = browser.lower()
+ self._remote_url = remote_url
+ self._remote = (
+ True if remote_url and remote_url != SELENIUM_GRID_DEFAULT else remote
+ )
+ self._headless = headless
+ self._options = options
+ self._download_path = download_path
+ self._wait_timeout = wait_timeout
+ self._percy_finalize = percy_finalize
+ self._percy_run = percy_run
+ self._pause = pause
+
+ self._driver = until(self.get_webdriver, timeout=1)
+ self._driver.implicitly_wait(2)
+
+ self._wd_wait = WebDriverWait(self.driver, wait_timeout)
+ self._last_ts = 0
+ self._url = ""
+
+ self._window_idx = 0 # switch browser tabs
+
+ if self._percy_run:
+ self.percy_runner = percy.Runner(
+ loader=percy.ResourceLoader(
+ webdriver=self.driver,
+ base_url="/assets",
+ root_dir=percy_assets_root,
+ )
+ )
+ self.percy_runner.initialize_build()
+
+ logger.debug("initialize browser with arguments")
+ logger.debug(" headless => %s", self._headless)
+ logger.debug(" download_path => %s", self._download_path)
+ logger.debug(" percy asset root => %s", os.path.abspath(percy_assets_root))
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, traceback):
+ try:
+ self.driver.quit()
+ if self._percy_run and self._percy_finalize:
+ logger.info("percy runner finalize build now")
+ self.percy_runner.finalize_build()
+ else:
+ logger.info("percy finalize relies on CI job")
+ except WebDriverException:
+ logger.exception("webdriver quit was not successful")
+ except percy.errors.Error: # type: ignore[reportAttributeAccessIssue]
+ logger.exception("percy runner failed to finalize properly")
+
+ def visit_and_snapshot(
+ self,
+ resource_path: str,
+ hook_id: str,
+ wait_for_callbacks=True,
+ convert_canvases=False,
+ assert_check=True,
+ stay_on_page=False,
+ widths=None,
+ ):
+ path = resource_path.lstrip("/")
+ try:
+ if path != resource_path:
+ logger.warning("we stripped the left '/' in resource_path")
+ self.server_url = self.server_url
+ self.driver.get(f"{self.server_url.rstrip('/')}/{path}")
+
+ # wait for the hook_id to present and all callbacks get fired
+ self.wait_for_element_by_id(hook_id)
+ self.percy_snapshot(
+ path,
+ wait_for_callbacks=wait_for_callbacks,
+ convert_canvases=convert_canvases,
+ widths=widths,
+ )
+ if assert_check:
+ assert not self.find_elements(
+ "div.dash-debug-alert"
+ ), "devtools should not raise an error alert"
+ if not stay_on_page:
+ self.driver.back()
+ except WebDriverException as e:
+ logger.exception("snapshot at resource %s error", path)
+ raise e
+
+ def percy_snapshot(
+ self, name="", wait_for_callbacks=False, convert_canvases=False, widths=None
+ ):
+ """percy_snapshot - visual test api shortcut to `percy_runner.snapshot`.
+ It also combines the snapshot `name` with the Python version,
+ args:
+ - name: combined with the python version to give the final snapshot name
+ - wait_for_callbacks: default False, whether to wait for Dash callbacks,
+ after an extra second to ensure that any relevant callbacks have
+ been initiated
+ - convert_canvases: default False, whether to convert all canvas elements
+ in the DOM into static images for percy to see. They will be restored
+ after the snapshot is complete.
+ - widths: a list of pixel widths for percy to render the page with. Note
+ that this does not change the browser in which the DOM is constructed,
+ so the width will only affect CSS, not JS-driven layout.
+ Defaults to [1280]
+ """
+ if widths is None:
+ widths = [1280]
+ try:
+ import asgiref # pylint: disable=unused-import, import-outside-toplevel # noqa: F401, C0415
+
+ name += "_async"
+ except ImportError:
+ pass
+
+ logger.info("taking snapshot name => %s", name)
+ try:
+ if wait_for_callbacks:
+ # the extra one second sleep adds safe margin in the context
+ # of wait_for_callbacks
+ time.sleep(1)
+ until(self._wait_for_callbacks, timeout=40, poll=0.3)
+ except TestingTimeoutError:
+ # API will log the error but this TimeoutError should not block
+ # the test execution to continue and it will still do a snapshot
+ # as diff reference for the build run.
+ logger.error(
+ "wait_for_callbacks failed => status of invalid rqs %s",
+ self.redux_state_rqs,
+ )
+
+ if convert_canvases:
+ self.driver.execute_script(
+ """
+ const stash = window._canvasStash = [];
+ Array.from(document.querySelectorAll('canvas')).forEach(c => {
+ const i = document.createElement('img');
+ i.src = c.toDataURL();
+ i.width = c.width;
+ i.height = c.height;
+ i.setAttribute('style', c.getAttribute('style'));
+ i.className = c.className;
+ i.setAttribute('data-canvasnum', stash.length);
+ stash.push(c);
+ c.parentElement.insertBefore(i, c);
+ c.parentElement.removeChild(c);
+ });
+ """
+ )
+
+ try:
+ self.percy_runner.snapshot(name=name, widths=widths)
+ except requests.HTTPError as err:
+ # Ignore retries.
+ if err.request.status_code != 400: # type: ignore[reportAttributeAccessIssue]
+ raise err
+
+ if convert_canvases:
+ self.driver.execute_script(
+ """
+ const stash = window._canvasStash;
+ Array.from(
+ document.querySelectorAll('img[data-canvasnum]')
+ ).forEach(i => {
+ const c = stash[+i.getAttribute('data-canvasnum')];
+ i.parentElement.insertBefore(c, i);
+ i.parentElement.removeChild(i);
+ });
+ delete window._canvasStash;
+ """
+ )
+
+ def take_snapshot(self, name: str):
+ """Hook method to take snapshot when a selenium test fails. The
+ snapshot is placed under.
+
+ - `/tmp/dash_artifacts` in linux
+ - `%TEMP` in windows
+ with a filename combining test case name and the
+ running selenium session id
+ """
+ target = (
+ "/tmp/dash_artifacts" if not self._is_windows() else os.getenv("TEMP", "")
+ )
+
+ if not os.path.exists(target):
+ try:
+ os.mkdir(target)
+ except OSError:
+ logger.exception("cannot make artifacts")
+
+ self.driver.save_screenshot(f"{target}/{name}_{self.session_id}.png")
+
+ def find_element(self, selector, attribute="CSS_SELECTOR"):
+ """find_element returns the first found element by the attribute `selector`
+ shortcut to `driver.find_element(By.CSS_SELECTOR, ...)`.
+ args:
+ - attribute: the attribute type to search for, aligns with the Selenium
+ API's `By` class. default "CSS_SELECTOR"
+ valid values: "CSS_SELECTOR", "ID", "NAME", "TAG_NAME",
+ "CLASS_NAME", "LINK_TEXT", "PARTIAL_LINK_TEXT", "XPATH"
+ """
+ return self.driver.find_element(getattr(By, attribute.upper()), selector)
+
+ def find_elements(self, selector, attribute="CSS_SELECTOR"):
+ """find_elements returns a list of all elements matching the attribute
+ `selector`. Shortcut to `driver.find_elements(By.CSS_SELECTOR, ...)`.
+ args:
+ - attribute: the attribute type to search for, aligns with the Selenium
+ API's `By` class. default "CSS_SELECTOR"
+ valid values: "CSS_SELECTOR", "ID", "NAME", "TAG_NAME",
+ "CLASS_NAME", "LINK_TEXT", "PARTIAL_LINK_TEXT", "XPATH"
+ """
+ return self.driver.find_elements(getattr(By, attribute.upper()), selector)
+
+ def _get_element(self, elem_or_selector):
+ if isinstance(elem_or_selector, str):
+ return self.find_element(elem_or_selector)
+ return elem_or_selector
+
+ def _wait_for(self, method, timeout, msg):
+ """Abstract generic pattern for explicit WebDriverWait."""
+ try:
+ _wait = (
+ self._wd_wait
+ if timeout is None
+ else WebDriverWait(self.driver, timeout)
+ )
+ logger.debug(
+ "method, timeout, poll => %s %s %s",
+ method,
+ _wait._timeout, # pylint: disable=protected-access
+ _wait._poll, # pylint: disable=protected-access
+ )
+
+ return _wait.until(method)
+ except Exception as err:
+ if callable(msg):
+ message = msg(self.driver)
+ else:
+ message = msg
+ raise TimeoutException(str(message)) from err
+
+ def wait_for_element(self, selector, timeout=None):
+ """wait_for_element is shortcut to `wait_for_element_by_css_selector`
+ timeout if not set, equals to the fixture's `wait_timeout`."""
+ return self.wait_for_element_by_css_selector(selector, timeout)
+
+ def wait_for_element_by_css_selector(self, selector, timeout=None):
+ """Explicit wait until the element is present, timeout if not set,
+ equals to the fixture's `wait_timeout` shortcut to `WebDriverWait` with
+ `EC.presence_of_element_located`."""
+ return self._wait_for(
+ EC.presence_of_element_located(
+ (By.CSS_SELECTOR, selector),
+ ),
+ timeout,
+ f"timeout {timeout or self._wait_timeout}s => waiting for selector {selector}",
+ )
+
+ def wait_for_no_elements(self, selector, timeout=None):
+ """Explicit wait until an element is NOT found. timeout defaults to
+ the fixture's `wait_timeout`."""
+ until(
+ # if we use get_elements it waits a long time to see if they appear
+ # so this one calls out directly to execute_script
+ lambda: self.driver.execute_script(
+ f"return document.querySelectorAll('{selector}').length"
+ )
+ == 0,
+ timeout or self._wait_timeout,
+ )
+
+ def wait_for_element_by_id(self, element_id, timeout=None):
+ """Explicit wait until the element is present, timeout if not set,
+ equals to the fixture's `wait_timeout` shortcut to `WebDriverWait` with
+ `EC.presence_of_element_located`."""
+ return self._wait_for(
+ EC.presence_of_element_located(
+ (By.ID, element_id),
+ ),
+ timeout,
+ f"timeout {timeout or self._wait_timeout}s => waiting for element id {element_id}",
+ )
+
+ def wait_for_class_to_equal(self, selector, classname, timeout=None):
+ """Explicit wait until the element's class has expected `value` timeout
+ if not set, equals to the fixture's `wait_timeout` shortcut to
+ `WebDriverWait` with customized `class_to_equal` condition."""
+ return self._wait_for(
+ method=class_to_equal(selector, classname),
+ timeout=timeout,
+ msg=f"classname => {classname} not found within {timeout or self._wait_timeout}s",
+ )
+
+ def wait_for_style_to_equal(self, selector, style, val, timeout=None):
+ """Explicit wait until the element's style has expected `value` timeout
+ if not set, equals to the fixture's `wait_timeout` shortcut to
+ `WebDriverWait` with customized `style_to_equal` condition."""
+ return self._wait_for(
+ method=style_to_equal(selector, style, val),
+ timeout=timeout,
+ msg=f"style val => {style} {val} not found within {timeout or self._wait_timeout}s",
+ )
+
+ def wait_for_text_to_equal(self, selector, text, timeout=None):
+ """Explicit wait until the element's text equals the expected `text`.
+
+ timeout if not set, equals to the fixture's `wait_timeout`
+ shortcut to `WebDriverWait` with customized `text_to_equal`
+ condition.
+ """
+ method = text_to_equal(selector, text, timeout or self.wait_timeout)
+
+ return self._wait_for(
+ method=method,
+ timeout=timeout,
+ msg=method.message,
+ )
+
+ def wait_for_contains_class(self, selector, classname, timeout=None):
+ """Explicit wait until the element's classes contains the expected `classname`.
+
+ timeout if not set, equals to the fixture's `wait_timeout`
+ shortcut to `WebDriverWait` with customized `contains_class`
+ condition.
+ """
+ return self._wait_for(
+ method=contains_class(selector, classname),
+ timeout=timeout,
+ msg=f"classname -> {classname} not found inside element within {timeout or self._wait_timeout}s",
+ )
+
+ def wait_for_contains_text(self, selector, text, timeout=None):
+ """Explicit wait until the element's text contains the expected `text`.
+
+ timeout if not set, equals to the fixture's `wait_timeout`
+ shortcut to `WebDriverWait` with customized `contains_text`
+ condition.
+ """
+ method = contains_text(selector, text, timeout or self.wait_timeout)
+ return self._wait_for(
+ method=method,
+ timeout=timeout,
+ msg=method.message,
+ )
+
+ def wait_for_page(self, url=None, timeout=10):
+ """wait_for_page navigates to the url in webdriver wait until the
+ renderer is loaded in browser.
+
+ use the `server_url` if url is not provided.
+ """
+ self.driver.get(self.server_url if url is None else url)
+ try:
+ self.wait_for_element_by_css_selector(
+ self.dash_entry_locator, timeout=timeout
+ )
+ except TimeoutException as exc:
+ logger.exception("dash server is not loaded within %s seconds", timeout)
+ logs = "\n".join((str(log) for log in self.get_logs())) # type: ignore[reportOptionalIterable]
+ logger.debug(logs)
+ html = self.find_element("body").get_property("innerHTML")
+ raise DashAppLoadingError(
+ "the expected Dash react entry point cannot be loaded"
+ f" in browser\n HTML => {html}\n Console Logs => {logs}\n"
+ ) from exc
+
+ if self._pause:
+ import pdb # pylint: disable=import-outside-toplevel
+
+ pdb.set_trace() # pylint: disable=forgotten-debug-statement
+
+ def select_dcc_dropdown(self, elem_or_selector, value=None, index=None):
+ dropdown = self._get_element(elem_or_selector)
+ dropdown.click()
+
+ menu = dropdown.find_element(By.CSS_SELECTOR, "div.Select-menu-outer")
+ logger.debug("the available options are %s", "|".join(menu.text.split("\n")))
+
+ options = menu.find_elements(By.CSS_SELECTOR, "div.VirtualizedSelectOption")
+ if options:
+ if isinstance(index, int):
+ options[index].click()
+ return
+
+ for option in options:
+ if option.text == value:
+ option.click()
+ return
+
+ logger.error(
+ "cannot find matching option using value=%s or index=%s", value, index
+ )
+
+ def toggle_window(self):
+ """Switch between the current working window and the new opened one."""
+ idx = (self._window_idx + 1) % 2
+ self.switch_window(idx=idx)
+ self._window_idx += 1
+
+ def switch_window(self, idx=0):
+ """Switch to window by window index shortcut to
+ `driver.switch_to.window`."""
+ if len(self.driver.window_handles) <= idx:
+ raise BrowserError("there is no second window in Browser")
+
+ self.driver.switch_to.window(self.driver.window_handles[idx])
+
+ def open_new_tab(self, url=None):
+ """Open a new tab in browser url is not set, equals to `server_url`."""
+ self.driver.execute_script(
+ f'window.open("{url or self.server_url}", "new window")'
+ )
+
+ def get_webdriver(self):
+ return getattr(self, f"_get_{self._browser}")()
+
+ def _get_wd_options(self):
+ options = (
+ self._options[0]
+ if self._options and isinstance(self._options, list)
+ else getattr(webdriver, self._browser).options.Options()
+ )
+
+ if self._headless:
+ options.add_argument("--headless")
+
+ return options
+
+ def _get_chrome(self):
+ options = self._get_wd_options()
+
+ if "DASH_TEST_CHROMEPATH" in os.environ:
+ options.binary_location = os.environ["DASH_TEST_CHROMEPATH"]
+
+ options.add_experimental_option(
+ "prefs",
+ {
+ "download.default_directory": self.download_path,
+ "download.prompt_for_download": False,
+ "download.directory_upgrade": True,
+ "safebrowsing.enabled": False,
+ "safebrowsing.disable_download_protection": True,
+ },
+ )
+
+ options.add_argument("--disable-dev-shm-usage")
+ options.add_argument("--no-sandbox")
+ options.add_argument("--disable-gpu")
+ options.add_argument("--remote-debugging-port=0")
+
+ options.set_capability("goog:loggingPrefs", {"browser": "SEVERE"})
+
+ chrome = (
+ webdriver.Remote(command_executor=self._remote_url, options=options) # type: ignore[reportAttributeAccessIssue]
+ if self._remote
+ else webdriver.Chrome(options=options)
+ )
+
+ # https://bugs.chromium.org/p/chromium/issues/detail?id=696481
+ if self._headless:
+ # pylint: disable=protected-access
+ chrome.command_executor._commands["send_command"] = ( # type: ignore[reportArgumentType]
+ "POST",
+ "/session/$sessionId/chromium/send_command",
+ )
+ params = {
+ "cmd": "Page.setDownloadBehavior",
+ "params": {"behavior": "allow", "downloadPath": self.download_path},
+ }
+ res = chrome.execute("send_command", params)
+ logger.debug("enabled headless download returns %s", res)
+
+ chrome.set_window_position(0, 0)
+ return chrome
+
+ def _get_firefox(self):
+ options = self._get_wd_options()
+
+ options.set_capability("marionette", True)
+
+ options.set_preference("browser.download.dir", self.download_path)
+ options.set_preference("browser.download.folderList", 2)
+ options.set_preference(
+ "browser.helperApps.neverAsk.saveToDisk",
+ "application/octet-stream", # this MIME is generic for binary
+ )
+ if not self._remote_url and self._remote:
+ raise TypeError("remote_url was not provided but required for Firefox")
+
+ return (
+ webdriver.Remote(
+ command_executor=self._remote_url, # type: ignore[reportTypeArgument]
+ options=options,
+ )
+ if self._remote
+ else webdriver.Firefox(options=options)
+ )
+
+ @staticmethod
+ def _is_windows():
+ return sys.platform == "win32"
+
+ def multiple_click(self, elem_or_selector, clicks, delay=None):
+ """multiple_click click the element with number of `clicks`."""
+ for _ in range(clicks):
+ self._get_element(elem_or_selector).click()
+ if delay:
+ time.sleep(delay)
+
+ def clear_input(self, elem_or_selector):
+ """Simulate key press to clear the input."""
+ elem = self._get_element(elem_or_selector)
+ logger.debug("clear input with %s => %s", elem_or_selector, elem)
+ (
+ ActionChains(self.driver)
+ .move_to_element(elem)
+ .pause(0.2)
+ .click(elem)
+ .send_keys(Keys.END)
+ .key_down(Keys.SHIFT)
+ .send_keys(Keys.HOME)
+ .key_up(Keys.SHIFT)
+ .send_keys(Keys.DELETE)
+ ).perform()
+
+ def zoom_in_graph_by_ratio(
+ self, elem_or_selector, start_fraction=0.5, zoom_box_fraction=0.2, compare=True
+ ):
+ """Zoom out a graph with a zoom box fraction of component dimension
+ default start at middle with a rectangle of 1/5 of the dimension use
+ `compare` to control if we check the svg get changed."""
+ elem = self._get_element(elem_or_selector)
+
+ prev = elem.get_attribute("innerHTML")
+ w, h = elem.size["width"], elem.size["height"]
+ try:
+ ActionChains(self.driver).move_to_element_with_offset(
+ elem, w * start_fraction, h * start_fraction
+ ).drag_and_drop_by_offset(
+ elem, w * zoom_box_fraction, h * zoom_box_fraction
+ ).perform()
+ except MoveTargetOutOfBoundsException:
+ logger.exception("graph offset outside of the boundary")
+ if compare:
+ assert prev != elem.get_attribute(
+ "innerHTML"
+ ), "SVG content should be different after zoom"
+
+ def click_at_coord_fractions(self, elem_or_selector, fx, fy):
+ elem = self._get_element(elem_or_selector)
+
+ ActionChains(self.driver).move_to_element_with_offset(
+ elem, elem.size["width"] * fx, elem.size["height"] * fy
+ ).click().perform()
+
+ def get_logs(self):
+ """Return a list of `SEVERE` level logs after last reset time stamps
+ (default to 0, resettable by `reset_log_timestamp`.
+
+ Chrome only
+ """
+ if self._browser == "chrome":
+ return [
+ entry
+ for entry in self.driver.get_log("browser")
+ if entry["timestamp"] > self._last_ts
+ ]
+ warnings.warn("get_logs always return None with webdrivers other than Chrome")
+ return None
+
+ def reset_log_timestamp(self):
+ """reset_log_timestamp only work with chrome webdriver."""
+ if self._browser == "chrome":
+ entries = self.driver.get_log("browser")
+ if entries:
+ self._last_ts = entries[-1]["timestamp"]
+
+ @property
+ def driver(self):
+ """Expose the selenium webdriver as fixture property."""
+ return self._driver
+
+ @property
+ def session_id(self):
+ return self.driver.session_id
+
+ @property
+ def server_url(self) -> str:
+ return self._url
+
+ @server_url.setter
+ def server_url(self, value):
+ """Set the server url so the selenium is aware of the local server
+ port.
+
+ It also implicitly calls `wait_for_page`.
+ """
+ self._url = value
+ self.wait_for_page()
+
+ @property
+ def download_path(self):
+ return self._download_path
+
+ @property
+ def wait_timeout(self):
+ return self._wait_timeout
+
+ @wait_timeout.setter
+ def wait_timeout(self, value):
+ self._wait_timeout = value
+ self._wd_wait = WebDriverWait(self.driver, value)
diff --git a/venv/lib/python3.8/site-packages/dash/testing/composite.py b/venv/lib/python3.8/site-packages/dash/testing/composite.py
new file mode 100644
index 0000000..8a3abee
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/testing/composite.py
@@ -0,0 +1,46 @@
+from dash.testing.browser import Browser
+
+
+class DashComposite(Browser):
+ def __init__(self, server, **kwargs):
+ super().__init__(**kwargs)
+ self.server = server
+
+ def start_server(self, app, navigate=True, **kwargs):
+ """Start the local server with app."""
+
+ # start server with app and pass Dash arguments
+ self.server(app, **kwargs)
+
+ if navigate:
+ # set the default server_url, it implicitly call wait_for_page
+ self.server_url = self.server.url
+
+
+class DashRComposite(Browser):
+ def __init__(self, server, **kwargs):
+ super().__init__(**kwargs)
+ self.server = server
+
+ def start_server(self, app, cwd=None):
+
+ # start server with dashR app. The app sets its own run_server args
+ # on the R side, but we support overriding the automatic cwd
+ self.server(app, cwd=cwd)
+
+ # set the default server_url, it implicitly call wait_for_page
+ self.server_url = self.server.url
+
+
+class DashJuliaComposite(Browser):
+ def __init__(self, server, **kwargs):
+ super().__init__(**kwargs)
+ self.server = server
+
+ def start_server(self, app, cwd=None):
+ # start server with Dash.jl app. The app sets its own run_server args
+ # on the Julia side, but we support overriding the automatic cwd
+ self.server(app, cwd=cwd)
+
+ # set the default server_url, it implicitly call wait_for_page
+ self.server_url = self.server.url
diff --git a/venv/lib/python3.8/site-packages/dash/testing/consts.py b/venv/lib/python3.8/site-packages/dash/testing/consts.py
new file mode 100644
index 0000000..ea62a29
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/testing/consts.py
@@ -0,0 +1 @@
+SELENIUM_GRID_DEFAULT = "http://localhost:4444/wd/hub"
diff --git a/venv/lib/python3.8/site-packages/dash/testing/dash_page.py b/venv/lib/python3.8/site-packages/dash/testing/dash_page.py
new file mode 100644
index 0000000..2f4d69f
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/testing/dash_page.py
@@ -0,0 +1,99 @@
+# type: ignore[reportAttributeAccessIssue]
+# Ignore attribute access issues when type checking because mixin
+# class depends on other class lineage to supply things. We could use
+# a protocol definition here instead…
+
+from bs4 import BeautifulSoup
+
+
+class DashPageMixin:
+ def _get_dash_dom_by_attribute(self, attr):
+ return BeautifulSoup(
+ self.find_element(self.dash_entry_locator).get_attribute(attr), "lxml"
+ )
+
+ @property
+ def devtools_error_count_locator(self):
+ return ".test-devtools-error-count"
+
+ @property
+ def dash_entry_locator(self):
+ return "#react-entry-point"
+
+ @property
+ def dash_outerhtml_dom(self):
+ return self._get_dash_dom_by_attribute("outerHTML")
+
+ @property
+ def dash_innerhtml_dom(self):
+ return self._get_dash_dom_by_attribute("innerHTML")
+
+ @property
+ def redux_state_paths(self):
+ return self.driver.execute_script(
+ """
+ var p = window.store.getState().paths;
+ return {strs: p.strs, objs: p.objs}
+ """
+ )
+
+ @property
+ def redux_state_rqs(self):
+ return self.driver.execute_script(
+ """
+
+ // Check for legacy `pendingCallbacks` store prop (compatibility for Dash matrix testing)
+ var pendingCallbacks = window.store.getState().pendingCallbacks;
+ if (pendingCallbacks) {
+ return pendingCallbacks.map(function(cb) {
+ var out = {};
+ for (var key in cb) {
+ if (typeof cb[key] !== 'function') { out[key] = cb[key]; }
+ }
+ return out;
+ });
+ }
+
+ // Otherwise, use the new `callbacks` store prop
+ var callbacksState = Object.assign({}, window.store.getState().callbacks);
+ delete callbacksState.stored;
+ delete callbacksState.completed;
+
+ return Array.prototype.concat.apply([], Object.values(callbacksState));
+ """
+ )
+
+ @property
+ def redux_state_is_loading(self):
+ return self.driver.execute_script(
+ """
+ return window.store.getState().isLoading;
+ """
+ )
+
+ @property
+ def window_store(self):
+ return self.driver.execute_script("return window.store")
+
+ def _wait_for_callbacks(self):
+ return (not self.window_store) or self.redux_state_rqs == []
+
+ def get_local_storage(self, store_id="local"):
+ return self.driver.execute_script(
+ f"return JSON.parse(window.localStorage.getItem('{store_id}'));"
+ )
+
+ def get_session_storage(self, session_id="session"):
+ return self.driver.execute_script(
+ f"return JSON.parse(window.sessionStorage.getItem('{session_id}'));"
+ )
+
+ def clear_local_storage(self):
+ self.driver.execute_script("window.localStorage.clear()")
+
+ def clear_session_storage(self):
+ self.driver.execute_script("window.sessionStorage.clear()")
+
+ def clear_storage(self):
+ self.clear_local_storage()
+ self.clear_session_storage()
diff --git a/venv/lib/python3.8/site-packages/dash/testing/errors.py b/venv/lib/python3.8/site-packages/dash/testing/errors.py
new file mode 100644
index 0000000..772386d
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/testing/errors.py
@@ -0,0 +1,26 @@
+class DashTestingError(Exception):
+ """Base error for pytest-dash."""
+
+
+class InvalidDriverError(DashTestingError):
+ """An invalid selenium driver was specified."""
+
+
+class NoAppFoundError(DashTestingError):
+ """No `app` was found in the file."""
+
+
+class DashAppLoadingError(DashTestingError):
+ """The dash app failed to load."""
+
+
+class ServerCloseError(DashTestingError):
+ """The server cannot be closed."""
+
+
+class TestingTimeoutError(DashTestingError):
+ """All timeout error about dash testing."""
+
+
+class BrowserError(DashTestingError):
+ """All browser relevant errors."""
diff --git a/venv/lib/python3.8/site-packages/dash/testing/newhooks.py b/venv/lib/python3.8/site-packages/dash/testing/newhooks.py
new file mode 100644
index 0000000..07e850d
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/testing/newhooks.py
@@ -0,0 +1,2 @@
+def pytest_setup_options():
+ """Called before webdriver is initialized."""
diff --git a/venv/lib/python3.8/site-packages/dash/testing/plugin.py b/venv/lib/python3.8/site-packages/dash/testing/plugin.py
new file mode 100644
index 0000000..1b917d7
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/testing/plugin.py
@@ -0,0 +1,263 @@
+# pylint: disable=missing-docstring,redefined-outer-name
+from typing import Any
+
+import pytest
+from .consts import SELENIUM_GRID_DEFAULT
+
+
+# pylint: disable=too-few-public-methods
+class MissingDashTesting:
+ def __init__(self, **kwargs):
+ raise Exception(
+ "dash[testing] was not installed. "
+ "Please install to use the dash testing fixtures."
+ )
+
+ def __enter__(self) -> Any:
+ """Implemented to satisfy type checking."""
+
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
+ """Implemented to satisfy type checking."""
+
+ return False
+
+
+try:
+ from dash.testing.application_runners import (
+ ThreadedRunner,
+ ProcessRunner,
+ RRunner,
+ JuliaRunner,
+ MultiProcessRunner,
+ )
+ from dash.testing.browser import Browser
+ from dash.testing.composite import DashComposite, DashRComposite, DashJuliaComposite
+
+ # pylint: disable=unused-import
+ import dash_testing_stub # noqa: F401
+
+ _installed = True
+except ImportError:
+ # Running pytest without dash[testing] installed.
+ ThreadedRunner = MissingDashTesting
+ ProcessRunner = MissingDashTesting
+ MultiProcessRunner = MissingDashTesting
+ RRunner = MissingDashTesting
+ JuliaRunner = MissingDashTesting
+ Browser = MissingDashTesting
+ DashComposite = MissingDashTesting
+ DashRComposite = MissingDashTesting
+ DashJuliaComposite = MissingDashTesting
+ _installed = False
+
+
+def pytest_addoption(parser):
+ if not _installed:
+ return
+
+ dash = parser.getgroup("Dash", "Dash Integration Tests")
+
+ dash.addoption(
+ "--webdriver",
+ choices=("Chrome", "Firefox"),
+ default="Chrome",
+ help="Name of the selenium driver to use",
+ )
+
+ dash.addoption(
+ "--remote", action="store_true", help="instruct pytest to use selenium grid"
+ )
+
+ dash.addoption(
+ "--remote-url",
+ action="store",
+ default=SELENIUM_GRID_DEFAULT,
+ help="set a different selenium grid remote url if other than default",
+ )
+
+ dash.addoption(
+ "--headless", action="store_true", help="set this flag to run in headless mode"
+ )
+
+ dash.addoption(
+ "--percy-assets",
+ action="store",
+ default="tests/assets",
+ help="configure how Percy will discover your app's assets",
+ )
+
+ dash.addoption(
+ "--nopercyfinalize",
+ action="store_false",
+ help="set this flag to control percy finalize at CI level",
+ )
+
+ dash.addoption(
+ "--pause",
+ action="store_true",
+ help="pause using pdb after opening the test app, so you can interact with it",
+ )
+
+
+@pytest.mark.tryfirst
+def pytest_addhooks(pluginmanager):
+ if not _installed:
+ return
+ # https://github.com/pytest-dev/pytest-xdist/blob/974bd566c599dc6a9ea291838c6f226197208b46/xdist/plugin.py#L67
+ # avoid warnings with pytest-2.8
+ from dash.testing import newhooks # pylint: disable=import-outside-toplevel
+
+ method = getattr(pluginmanager, "add_hookspecs", None)
+ if method is None:
+ method = pluginmanager.addhooks # pragma: no cover
+ method(newhooks)
+
+
+@pytest.hookimpl(tryfirst=True, hookwrapper=True)
+def pytest_runtest_makereport(item, call): # pylint: disable=unused-argument
+ # execute all other hooks to obtain the report object
+ outcome = yield
+ if not _installed:
+ return
+ rep = outcome.get_result()
+
+ # we only look at actual failing test calls, not setup/teardown
+ if rep.when == "call" and rep.failed and hasattr(item, "funcargs"):
+ for name, fixture in item.funcargs.items():
+ try:
+ if name in {"dash_duo", "dash_br", "dashr", "dashjl"}:
+ fixture.take_snapshot(item.name)
+ except Exception as e: # pylint: disable=broad-except
+ print(e)
+
+
+###############################################################################
+# Fixtures
+###############################################################################
+
+
+@pytest.fixture
+def dash_thread_server() -> ThreadedRunner: # type: ignore[reportInvalidTypeForm]
+ """Start a local dash server in a new thread."""
+ with ThreadedRunner() as starter:
+ yield starter
+
+
+@pytest.fixture
+def dash_process_server() -> ProcessRunner: # type: ignore[reportInvalidTypeForm]
+ """Start a Dash server with subprocess.Popen and waitress-serve."""
+ with ProcessRunner() as starter:
+ yield starter
+
+
+@pytest.fixture
+def dash_multi_process_server() -> MultiProcessRunner: # type: ignore[reportInvalidTypeForm]
+ with MultiProcessRunner() as starter:
+ yield starter
+
+
+@pytest.fixture
+def dashr_server() -> RRunner: # type: ignore[reportInvalidTypeForm]
+ with RRunner() as starter:
+ yield starter
+
+
+@pytest.fixture
+def dashjl_server() -> JuliaRunner: # type: ignore[reportInvalidTypeForm]
+ with JuliaRunner() as starter:
+ yield starter
+
+
+@pytest.fixture
+def dash_br(request, tmpdir) -> Browser: # type: ignore[reportInvalidTypeForm]
+ with Browser(
+ browser=request.config.getoption("webdriver"),
+ remote=request.config.getoption("remote"),
+ remote_url=request.config.getoption("remote_url"),
+ headless=request.config.getoption("headless"),
+ options=request.config.hook.pytest_setup_options(),
+ download_path=tmpdir.mkdir("download").strpath,
+ percy_assets_root=request.config.getoption("percy_assets"),
+ percy_finalize=request.config.getoption("nopercyfinalize"),
+ pause=request.config.getoption("pause"),
+ ) as browser:
+ yield browser
+
+
+@pytest.fixture
+def dash_duo(request, dash_thread_server, tmpdir) -> DashComposite: # type: ignore[reportInvalidTypeForm]
+ with DashComposite(
+ server=dash_thread_server,
+ browser=request.config.getoption("webdriver"),
+ remote=request.config.getoption("remote"),
+ remote_url=request.config.getoption("remote_url"),
+ headless=request.config.getoption("headless"),
+ options=request.config.hook.pytest_setup_options(),
+ download_path=tmpdir.mkdir("download").strpath,
+ percy_assets_root=request.config.getoption("percy_assets"),
+ percy_finalize=request.config.getoption("nopercyfinalize"),
+ pause=request.config.getoption("pause"),
+ ) as dc:
+ yield dc
+
+
+@pytest.fixture
+def dash_duo_mp(request, dash_multi_process_server, tmpdir) -> DashComposite: # type: ignore[reportInvalidTypeForm]
+ with DashComposite(
+ server=dash_multi_process_server,
+ browser=request.config.getoption("webdriver"),
+ remote=request.config.getoption("remote"),
+ remote_url=request.config.getoption("remote_url"),
+ headless=request.config.getoption("headless"),
+ options=request.config.hook.pytest_setup_options(),
+ download_path=tmpdir.mkdir("download").strpath,
+ percy_assets_root=request.config.getoption("percy_assets"),
+ percy_finalize=request.config.getoption("nopercyfinalize"),
+ pause=request.config.getoption("pause"),
+ ) as dc:
+ yield dc
+
+
+@pytest.fixture
+def dashr(request, dashr_server, tmpdir) -> DashRComposite: # type: ignore[reportInvalidTypeForm]
+ with DashRComposite(
+ server=dashr_server,
+ browser=request.config.getoption("webdriver"),
+ remote=request.config.getoption("remote"),
+ remote_url=request.config.getoption("remote_url"),
+ headless=request.config.getoption("headless"),
+ options=request.config.hook.pytest_setup_options(),
+ download_path=tmpdir.mkdir("download").strpath,
+ percy_assets_root=request.config.getoption("percy_assets"),
+ percy_finalize=request.config.getoption("nopercyfinalize"),
+ pause=request.config.getoption("pause"),
+ ) as dc:
+ yield dc
+
+
+@pytest.fixture
+def dashjl(request, dashjl_server, tmpdir) -> DashJuliaComposite: # type: ignore[reportInvalidTypeForm]
+ with DashJuliaComposite(
+ server=dashjl_server,
+ browser=request.config.getoption("webdriver"),
+ remote=request.config.getoption("remote"),
+ remote_url=request.config.getoption("remote_url"),
+ headless=request.config.getoption("headless"),
+ options=request.config.hook.pytest_setup_options(),
+ download_path=tmpdir.mkdir("download").strpath,
+ percy_assets_root=request.config.getoption("percy_assets"),
+ percy_finalize=request.config.getoption("nopercyfinalize"),
+ pause=request.config.getoption("pause"),
+ ) as dc:
+ yield dc
+
+
+@pytest.fixture
+def diskcache_manager():
+ from dash.background_callback import ( # pylint: disable=import-outside-toplevel
+ DiskcacheManager,
+ )
+
+ return DiskcacheManager()
diff --git a/venv/lib/python3.8/site-packages/dash/testing/wait.py b/venv/lib/python3.8/site-packages/dash/testing/wait.py
new file mode 100644
index 0000000..d9bd8b0
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/testing/wait.py
@@ -0,0 +1,161 @@
+# pylint: disable=too-few-public-methods
+"""Utils methods for pytest-dash such wait_for wrappers."""
+
+import time
+import logging
+from selenium.common.exceptions import WebDriverException
+from selenium.webdriver.common.by import By
+from dash.testing.errors import TestingTimeoutError
+
+
+logger = logging.getLogger(__name__)
+
+
+def until(
+ wait_cond, timeout, poll=0.1, msg="expected condition not met within timeout"
+): # noqa: C0330
+ res = wait_cond()
+ logger.debug(
+ "start wait.until with method, timeout, poll => %s %s %s",
+ wait_cond,
+ timeout,
+ poll,
+ )
+ end_time = time.time() + timeout
+ while not res:
+ if time.time() > end_time:
+ raise TestingTimeoutError(msg)
+ time.sleep(poll)
+ res = wait_cond()
+ logger.debug("poll => %s", time.time())
+
+ return res
+
+
+def until_not(
+ wait_cond, timeout, poll=0.1, msg="expected condition met within timeout"
+): # noqa: C0330
+ res = wait_cond()
+ logger.debug(
+ "start wait.until_not method, timeout, poll => %s %s %s",
+ wait_cond,
+ timeout,
+ poll,
+ )
+ end_time = time.time() + timeout
+ while res:
+ if time.time() > end_time:
+ raise TestingTimeoutError(msg)
+ time.sleep(poll)
+ res = wait_cond()
+ logger.debug("poll => %s", time.time())
+
+ return res
+
+
+class contains_text:
+ def __init__(self, selector, text, timeout):
+ self.selector = selector
+ self.text = text
+ self.timeout = timeout
+
+ def __call__(self, driver):
+ try:
+ elem = driver.find_element(By.CSS_SELECTOR, self.selector)
+ logger.debug("contains text {%s} => expected %s", elem.text, self.text)
+ value = elem.get_attribute("value")
+ return self.text in str(elem.text) or (
+ value is not None and self.text in str(value)
+ )
+ except WebDriverException:
+ return False
+
+ def message(self, driver):
+ try:
+ element = self._get_element(driver)
+ text = "found: " + str(element.text) or str(element.get_attribute("value"))
+ except WebDriverException:
+ text = f"{self.selector} not found"
+ return f"text -> {self.text} not found inside element within {self.timeout}s, {text}"
+
+ def _get_element(self, driver):
+ return driver.find_element(By.CSS_SELECTOR, self.selector)
+
+
+class contains_class:
+ def __init__(self, selector, classname):
+ self.selector = selector
+ self.classname = classname
+
+ def __call__(self, driver):
+ try:
+ elem = driver.find_element(By.CSS_SELECTOR, self.selector)
+ classname = elem.get_attribute("class")
+ logger.debug(
+ "contains class {%s} => expected %s", classname, self.classname
+ )
+ return self.classname in str(classname).split(" ")
+ except WebDriverException:
+ return False
+
+
+class text_to_equal:
+ def __init__(self, selector, text, timeout):
+ self.selector = selector
+ self.text = text
+ self.timeout = timeout
+
+ def __call__(self, driver):
+ try:
+ elem = self._get_element(driver)
+ logger.debug("text to equal {%s} => expected %s", elem.text, self.text)
+ value = elem.get_attribute("value")
+ return str(elem.text) == self.text or (
+ value is not None and str(value) == self.text
+ )
+ except WebDriverException:
+ return False
+
+ def message(self, driver):
+ try:
+ element = self._get_element(driver)
+ text = "found: " + str(element.text) or str(element.get_attribute("value"))
+ except WebDriverException:
+ text = f"{self.selector} not found"
+ return f"text -> {self.text} not found within {self.timeout}s, {text}"
+
+ def _get_element(self, driver):
+ return driver.find_element(By.CSS_SELECTOR, self.selector)
+
+
+class style_to_equal:
+ def __init__(self, selector, style, val):
+ self.selector = selector
+ self.style = style
+ self.val = val
+
+ def __call__(self, driver):
+ try:
+ elem = driver.find_element(By.CSS_SELECTOR, self.selector)
+ val = elem.value_of_css_property(self.style)
+ logger.debug("style to equal {%s} => expected %s", val, self.val)
+ return val == self.val
+ except WebDriverException:
+ return False
+
+
+class class_to_equal:
+ def __init__(self, selector, classname):
+ self.selector = selector
+ self.classname = classname
+
+ def __call__(self, driver):
+ try:
+ elem = driver.find_element(By.CSS_SELECTOR, self.selector)
+ classname = elem.get_attribute("class")
+ logger.debug(
+ "class to equal {%s} => expected %s", classname, self.classname
+ )
+ return str(classname) == self.classname
+ except WebDriverException:
+ return False