aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/dash/background_callback/managers
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
committersotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
commit5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch)
tree8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/dash/background_callback/managers
parentb832d364da8c2efe09e3f75828caf73c50d01ce3 (diff)
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/dash/background_callback/managers')
-rw-r--r--venv/lib/python3.8/site-packages/dash/background_callback/managers/__init__.py117
-rw-r--r--venv/lib/python3.8/site-packages/dash/background_callback/managers/celery_manager.py263
-rw-r--r--venv/lib/python3.8/site-packages/dash/background_callback/managers/diskcache_manager.py305
3 files changed, 685 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/dash/background_callback/managers/__init__.py b/venv/lib/python3.8/site-packages/dash/background_callback/managers/__init__.py
new file mode 100644
index 0000000..7b3a0c1
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/background_callback/managers/__init__.py
@@ -0,0 +1,117 @@
+from abc import ABC
+import inspect
+import hashlib
+
+
+class BaseBackgroundCallbackManager(ABC):
+ UNDEFINED = object()
+
+ # Keep a ref to all the ref to register every callback to every manager.
+ managers = []
+
+ # Keep every function for late registering.
+ functions = []
+
+ def __init__(self, cache_by):
+ if cache_by is not None and not isinstance(cache_by, list):
+ cache_by = [cache_by]
+
+ self.cache_by = cache_by
+
+ BaseBackgroundCallbackManager.managers.append(self)
+
+ self.func_registry = {}
+
+ # Register all funcs that were added before instantiation.
+ # Ensure all celery task are registered.
+ for fdetails in self.functions:
+ self.register(*fdetails)
+
+ def terminate_job(self, job):
+ raise NotImplementedError
+
+ def terminate_unhealthy_job(self, job):
+ raise NotImplementedError
+
+ def job_running(self, job):
+ raise NotImplementedError
+
+ def make_job_fn(self, fn, progress, key=None):
+ raise NotImplementedError
+
+ def call_job_fn(self, key, job_fn, args, context):
+ raise NotImplementedError
+
+ def get_progress(self, key):
+ raise NotImplementedError
+
+ def result_ready(self, key):
+ raise NotImplementedError
+
+ def get_result(self, key, job):
+ raise NotImplementedError
+
+ def get_updated_props(self, key):
+ raise NotImplementedError
+
+ def build_cache_key(self, fn, args, cache_args_to_ignore, triggered):
+ fn_source = inspect.getsource(fn)
+
+ if not isinstance(cache_args_to_ignore, (list, tuple)):
+ cache_args_to_ignore = [cache_args_to_ignore]
+
+ if cache_args_to_ignore:
+ if isinstance(args, dict):
+ args = {k: v for k, v in args.items() if k not in cache_args_to_ignore}
+ else:
+ args = [
+ arg for i, arg in enumerate(args) if i not in cache_args_to_ignore
+ ]
+
+ hash_dict = dict(args=args, fn_source=fn_source, triggered=triggered)
+
+ if self.cache_by is not None:
+ # Caching enabled
+ for i, cache_item in enumerate(self.cache_by):
+ # Call cache function
+ hash_dict[f"cache_key_{i}"] = cache_item()
+
+ return hashlib.sha256(str(hash_dict).encode("utf-8")).hexdigest()
+
+ def register(self, key, fn, progress):
+ self.func_registry[key] = self.make_job_fn(fn, progress, key)
+
+ @staticmethod
+ def register_func(fn, progress, callback_id):
+ key = BaseBackgroundCallbackManager.hash_function(fn, callback_id)
+ BaseBackgroundCallbackManager.functions.append(
+ (
+ key,
+ fn,
+ progress,
+ )
+ )
+
+ for manager in BaseBackgroundCallbackManager.managers:
+ manager.register(key, fn, progress)
+
+ return key
+
+ @staticmethod
+ def _make_progress_key(key):
+ return key + "-progress"
+
+ @staticmethod
+ def _make_set_props_key(key):
+ return f"{key}-set_props"
+
+ @staticmethod
+ def hash_function(fn, callback_id=""):
+ try:
+ fn_source = inspect.getsource(fn)
+ fn_str = fn_source
+ except OSError: # pylint: disable=too-broad-exception
+ fn_str = getattr(fn, "__name__", "")
+ return hashlib.sha256(
+ callback_id.encode("utf-8") + fn_str.encode("utf-8")
+ ).hexdigest()
diff --git a/venv/lib/python3.8/site-packages/dash/background_callback/managers/celery_manager.py b/venv/lib/python3.8/site-packages/dash/background_callback/managers/celery_manager.py
new file mode 100644
index 0000000..5aa1f57
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/background_callback/managers/celery_manager.py
@@ -0,0 +1,263 @@
+import json
+import traceback
+from contextvars import copy_context
+import asyncio
+from functools import partial
+
+from _plotly_utils.utils import PlotlyJSONEncoder
+
+from dash._callback_context import context_value
+from dash._utils import AttributeDict
+from dash.exceptions import PreventUpdate
+from dash.background_callback._proxy_set_props import ProxySetProps
+from dash.background_callback.managers import BaseBackgroundCallbackManager
+
+
+class CeleryManager(BaseBackgroundCallbackManager):
+ """Manage background execution of callbacks with a celery queue."""
+
+ def __init__(self, celery_app, cache_by=None, expire=None):
+ """
+ Background callback manager that runs callback logic on a celery task queue,
+ and stores results using a celery result backend.
+
+ :param celery_app:
+ A celery.Celery application instance that must be configured with a
+ result backend. See the celery documentation for information on
+ configuration options.
+ :param cache_by:
+ A list of zero-argument functions. When provided, caching is enabled and
+ the return values of these functions are combined with the callback
+ function's input arguments, triggered inputs and source code to generate cache keys.
+ :param expire:
+ If provided, a cache entry will be removed when it has not been accessed
+ for ``expire`` seconds. If not provided, the lifetime of cache entries
+ is determined by the default behavior of the celery result backend.
+ """
+ try:
+ import celery # type: ignore[reportMissingImports]; pylint: disable=import-outside-toplevel,import-error
+ from celery.backends.base import ( # type: ignore[reportMissingImports]; pylint: disable=import-outside-toplevel,import-error
+ DisabledBackend,
+ )
+ except ImportError as missing_imports:
+ raise ImportError(
+ """\
+CeleryManager requires extra dependencies which can be installed doing
+
+ $ pip install "dash[celery]"\n"""
+ ) from missing_imports
+
+ if not isinstance(celery_app, celery.Celery):
+ raise ValueError("First argument must be a celery.Celery object")
+
+ if isinstance(celery_app.backend, DisabledBackend):
+ raise ValueError("Celery instance must be configured with a result backend")
+
+ self.handle = celery_app
+ self.expire = expire
+ super().__init__(cache_by)
+
+ def terminate_job(self, job):
+ if job is None:
+ return
+
+ self.handle.control.terminate(job)
+
+ def terminate_unhealthy_job(self, job):
+ task = self.get_task(job)
+ if task and task.status in ("FAILURE", "REVOKED"):
+ return self.terminate_job(job)
+ return False
+
+ def job_running(self, job):
+ future = self.get_task(job)
+ return future and future.status in (
+ "PENDING",
+ "RECEIVED",
+ "STARTED",
+ "RETRY",
+ "PROGRESS",
+ )
+
+ def make_job_fn(self, fn, progress, key=None):
+ return _make_job_fn(fn, self.handle, progress, key)
+
+ def get_task(self, job):
+ if job:
+ return self.handle.AsyncResult(job)
+
+ return None
+
+ def clear_cache_entry(self, key):
+ self.handle.backend.delete(key)
+
+ def call_job_fn(self, key, job_fn, args, context):
+ task = job_fn.delay(key, self._make_progress_key(key), args, context)
+ return task.task_id
+
+ def get_progress(self, key):
+ progress_key = self._make_progress_key(key)
+ progress_data = self.handle.backend.get(progress_key)
+ if progress_data:
+ self.handle.backend.delete(progress_key)
+ return json.loads(progress_data)
+
+ return None
+
+ def result_ready(self, key):
+ return self.handle.backend.get(key) is not None
+
+ def get_result(self, key, job):
+ # Get result value
+ result = self.handle.backend.get(key)
+ if result is None:
+ return self.UNDEFINED
+
+ result = json.loads(result)
+
+ # Clear result if not caching
+ if self.cache_by is None:
+ self.clear_cache_entry(key)
+ else:
+ if self.expire:
+ # Set/update expiration time
+ self.handle.backend.expire(key, self.expire)
+ self.clear_cache_entry(self._make_progress_key(key))
+
+ self.terminate_job(job)
+ return result
+
+ def get_updated_props(self, key):
+ updated_props = self.handle.backend.get(self._make_set_props_key(key))
+ if updated_props is None:
+ return {}
+
+ self.clear_cache_entry(key)
+
+ return json.loads(updated_props)
+
+
+def _make_job_fn(fn, celery_app, progress, key): # pylint: disable=too-many-statements
+ cache = celery_app.backend
+
+ @celery_app.task(name=f"background_callback_{key}")
+ def job_fn(
+ result_key, progress_key, user_callback_args, context=None
+ ): # pylint: disable=too-many-statements
+ def _set_progress(progress_value):
+ if not isinstance(progress_value, (list, tuple)):
+ progress_value = [progress_value]
+
+ cache.set(progress_key, json.dumps(progress_value, cls=PlotlyJSONEncoder))
+
+ maybe_progress = [_set_progress] if progress else []
+
+ def _set_props(_id, props):
+ cache.set(
+ f"{result_key}-set_props",
+ json.dumps({_id: props}, cls=PlotlyJSONEncoder),
+ )
+
+ ctx = copy_context()
+
+ def run():
+ c = AttributeDict(**context) # type: ignore[reportCallIssue]
+ c.ignore_register_page = False
+ c.updated_props = ProxySetProps(_set_props)
+ context_value.set(c)
+ errored = False
+ user_callback_output = None # to help type checking
+ try:
+ if isinstance(user_callback_args, dict):
+ user_callback_output = fn(*maybe_progress, **user_callback_args)
+ elif isinstance(user_callback_args, (list, tuple)):
+ user_callback_output = fn(*maybe_progress, *user_callback_args)
+ else:
+ user_callback_output = fn(*maybe_progress, user_callback_args)
+ except PreventUpdate:
+ # Put NoUpdate dict directly to avoid circular imports.
+ errored = True
+ cache.set(
+ result_key,
+ json.dumps(
+ {"_dash_no_update": "_dash_no_update"}, cls=PlotlyJSONEncoder
+ ),
+ )
+ except Exception as err: # pylint: disable=broad-except
+ errored = True
+ cache.set(
+ result_key,
+ json.dumps(
+ {
+ "background_callback_error": {
+ "msg": str(err),
+ "tb": traceback.format_exc(),
+ }
+ },
+ ),
+ )
+
+ if not errored:
+ cache.set(
+ result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder)
+ )
+
+ async def async_run():
+ c = AttributeDict(**context)
+ c.ignore_register_page = False
+ c.updated_props = ProxySetProps(_set_props)
+ context_value.set(c)
+ errored = False
+ try:
+ if isinstance(user_callback_args, dict):
+ user_callback_output = await fn(
+ *maybe_progress, **user_callback_args
+ )
+ elif isinstance(user_callback_args, (list, tuple)):
+ user_callback_output = await fn(
+ *maybe_progress, *user_callback_args
+ )
+ else:
+ user_callback_output = await fn(*maybe_progress, user_callback_args)
+ except PreventUpdate:
+ # Put NoUpdate dict directly to avoid circular imports.
+ errored = True
+ cache.set(
+ result_key,
+ json.dumps(
+ {"_dash_no_update": "_dash_no_update"}, cls=PlotlyJSONEncoder
+ ),
+ )
+ except Exception as err: # pylint: disable=broad-except
+ errored = True
+ cache.set(
+ result_key,
+ json.dumps(
+ {
+ "background_callback_error": {
+ "msg": str(err),
+ "tb": traceback.format_exc(),
+ }
+ },
+ ),
+ )
+
+ if asyncio.iscoroutine(user_callback_output):
+ user_callback_output = await user_callback_output
+
+ if not errored:
+ cache.set(
+ result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder)
+ )
+
+ if asyncio.iscoroutinefunction(fn):
+ func = partial(ctx.run, async_run)
+ asyncio.run(func())
+ else:
+ ctx.run(run)
+
+ return job_fn
+
+
+class CeleryLongCallbackManager(CeleryManager):
+ """Deprecated: use `from dash import CeleryManager` instead."""
diff --git a/venv/lib/python3.8/site-packages/dash/background_callback/managers/diskcache_manager.py b/venv/lib/python3.8/site-packages/dash/background_callback/managers/diskcache_manager.py
new file mode 100644
index 0000000..094485a
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/background_callback/managers/diskcache_manager.py
@@ -0,0 +1,305 @@
+import traceback
+from contextvars import copy_context
+import asyncio
+from functools import partial
+
+
+from . import BaseBackgroundCallbackManager
+from .._proxy_set_props import ProxySetProps
+from ..._callback_context import context_value
+from ..._utils import AttributeDict
+from ...exceptions import PreventUpdate
+
+_pending_value = "__$pending__"
+
+
+class DiskcacheManager(BaseBackgroundCallbackManager):
+ """Manage the background execution of callbacks with subprocesses and a diskcache result backend."""
+
+ def __init__(self, cache=None, cache_by=None, expire=None):
+ """
+ Background callback manager that runs callback logic in a subprocess and stores
+ results on disk using diskcache
+
+ :param cache:
+ A diskcache.Cache or diskcache.FanoutCache instance. See the diskcache
+ documentation for information on configuration options. If not provided,
+ a diskcache.Cache instance will be created with default values.
+ :param cache_by:
+ A list of zero-argument functions. When provided, caching is enabled and
+ the return values of these functions are combined with the callback
+ function's input arguments, triggered inputs and source code to generate cache keys.
+ :param expire:
+ If provided, a cache entry will be removed when it has not been accessed
+ for ``expire`` seconds. If not provided, the lifetime of cache entries
+ is determined by the default behavior of the ``cache`` instance.
+ """
+ try:
+ import diskcache # type: ignore[reportMissingImports]; pylint: disable=import-outside-toplevel
+ import psutil # noqa: F401,E402 pylint: disable=import-outside-toplevel,unused-import,unused-variable,import-error
+ import multiprocess # noqa: F401,E402 pylint: disable=import-outside-toplevel,unused-import,unused-variable
+ except ImportError as missing_imports:
+ raise ImportError(
+ """\
+DiskcacheManager requires extra dependencies which can be installed doing
+
+ $ pip install "dash[diskcache]"\n"""
+ ) from missing_imports
+
+ if cache is None:
+ self.handle = diskcache.Cache()
+ else:
+ if not isinstance(cache, (diskcache.Cache, diskcache.FanoutCache)):
+ raise ValueError(
+ "First argument must be a diskcache.Cache "
+ "or diskcache.FanoutCache object"
+ )
+ self.handle = cache
+
+ self.expire = expire
+ super().__init__(cache_by)
+
+ def terminate_job(self, job):
+ import psutil # pylint: disable=import-outside-toplevel,import-error
+
+ if job is None:
+ return
+
+ job = int(job)
+
+ # Use diskcache transaction so multiple process don't try to kill the
+ # process at the same time
+ with self.handle.transact():
+ if psutil.pid_exists(job):
+ process = psutil.Process(job)
+
+ for proc in process.children(recursive=True):
+ try:
+ proc.kill()
+ except psutil.NoSuchProcess:
+ pass
+
+ try:
+ process.kill()
+ except psutil.NoSuchProcess:
+ pass
+
+ try:
+ process.wait(1)
+ except (psutil.TimeoutExpired, psutil.NoSuchProcess):
+ pass
+
+ def terminate_unhealthy_job(self, job):
+ import psutil # pylint: disable=import-outside-toplevel,import-error
+
+ job = int(job)
+
+ if job and psutil.pid_exists(job):
+ if not self.job_running(job):
+ self.terminate_job(job)
+ return True
+
+ return False
+
+ def job_running(self, job):
+ import psutil # pylint: disable=import-outside-toplevel,import-error
+
+ job = int(job)
+
+ if job and psutil.pid_exists(job):
+ proc = psutil.Process(job)
+ return proc.status() != psutil.STATUS_ZOMBIE
+ return False
+
+ def make_job_fn(self, fn, progress, key=None):
+ return _make_job_fn(fn, self.handle, progress)
+
+ def clear_cache_entry(self, key):
+ self.handle.delete(key)
+
+ # noinspection PyUnresolvedReferences
+ def call_job_fn(self, key, job_fn, args, context):
+ """
+ Call the job function, supporting both sync and async jobs.
+ Args:
+ key: Cache key for the job.
+ job_fn: The job function to execute.
+ args: Arguments for the job function.
+ context: Context for the job.
+ Returns:
+ The PID of the spawned process or None for async execution.
+ """
+ # pylint: disable-next=import-outside-toplevel,no-name-in-module,import-error
+ from multiprocess import Process # type: ignore
+
+ # pylint: disable-next=not-callable
+ process = Process(
+ target=job_fn,
+ args=(key, self._make_progress_key(key), args, context),
+ )
+ process.start()
+ return process.pid
+
+ @staticmethod
+ def _run_async_in_process(job_fn, key, args, context):
+ """
+ Helper function to run an async job in a new process.
+ Args:
+ job_fn: The async job function.
+ key: Cache key for the job.
+ args: Arguments for the job function.
+ context: Context for the job.
+ """
+ # Create a new event loop for the process
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+
+ # Wrap the job function to include key and progress
+ async_job = partial(job_fn, key, args, context)
+
+ try:
+ # Run the async job and wait for completion
+ loop.run_until_complete(async_job())
+ except Exception as e:
+ # Handle errors, log them, and cache if necessary
+ raise Exception(str(e)) from e
+ finally:
+ loop.close()
+
+ def get_progress(self, key):
+ progress_key = self._make_progress_key(key)
+ progress_data = self.handle.get(progress_key)
+ if progress_data:
+ self.handle.delete(progress_key)
+
+ return progress_data
+
+ def result_ready(self, key):
+ return self.handle.get(key) is not None
+
+ def get_result(self, key, job):
+ # Get result value
+ result = self.handle.get(key, self.UNDEFINED)
+ if result is self.UNDEFINED:
+ return self.UNDEFINED
+
+ # Clear result if not caching
+ if self.cache_by is None:
+ self.clear_cache_entry(key)
+ else:
+ if self.expire:
+ self.handle.touch(key, expire=self.expire)
+
+ self.clear_cache_entry(self._make_progress_key(key))
+
+ if job:
+ self.terminate_job(job)
+ return result
+
+ def get_updated_props(self, key):
+ set_props_key = self._make_set_props_key(key)
+ result = self.handle.get(set_props_key, self.UNDEFINED)
+ if result is self.UNDEFINED:
+ return {}
+
+ self.clear_cache_entry(set_props_key)
+
+ return result
+
+
+# pylint: disable-next=too-many-statements
+def _make_job_fn(fn, cache, progress):
+ # pylint: disable-next=too-many-statements
+ def job_fn(result_key, progress_key, user_callback_args, context):
+ def _set_progress(progress_value):
+ if not isinstance(progress_value, (list, tuple)):
+ progress_value = [progress_value]
+
+ cache.set(progress_key, progress_value)
+
+ maybe_progress = [_set_progress] if progress else []
+
+ def _set_props(_id, props):
+ cache.set(f"{result_key}-set_props", {_id: props})
+
+ ctx = copy_context()
+
+ def run():
+ c = AttributeDict(**context)
+ c.ignore_register_page = False
+ c.updated_props = ProxySetProps(_set_props)
+ context_value.set(c)
+ errored = False
+ user_callback_output = None # initialized to prevent type checker warnings
+ try:
+ if isinstance(user_callback_args, dict):
+ user_callback_output = fn(*maybe_progress, **user_callback_args)
+ elif isinstance(user_callback_args, (list, tuple)):
+ user_callback_output = fn(*maybe_progress, *user_callback_args)
+ else:
+ user_callback_output = fn(*maybe_progress, user_callback_args)
+ except PreventUpdate:
+ errored = True
+ cache.set(result_key, {"_dash_no_update": "_dash_no_update"})
+ except Exception as err: # pylint: disable=broad-except
+ errored = True
+ cache.set(
+ result_key,
+ {
+ "background_callback_error": {
+ "msg": str(err),
+ "tb": traceback.format_exc(),
+ }
+ },
+ )
+
+ if not errored:
+ cache.set(result_key, user_callback_output)
+
+ async def async_run():
+ c = AttributeDict(**context)
+ c.ignore_register_page = False
+ c.updated_props = ProxySetProps(_set_props)
+ context_value.set(c)
+ errored = False
+ try:
+ if isinstance(user_callback_args, dict):
+ user_callback_output = await fn(
+ *maybe_progress, **user_callback_args
+ )
+ elif isinstance(user_callback_args, (list, tuple)):
+ user_callback_output = await fn(
+ *maybe_progress, *user_callback_args
+ )
+ else:
+ user_callback_output = await fn(*maybe_progress, user_callback_args)
+ except PreventUpdate:
+ errored = True
+ cache.set(result_key, {"_dash_no_update": "_dash_no_update"})
+ except Exception as err: # pylint: disable=broad-except
+ errored = True
+ cache.set(
+ result_key,
+ {
+ "background_callback_error": {
+ "msg": str(err),
+ "tb": traceback.format_exc(),
+ }
+ },
+ )
+ if asyncio.iscoroutine(user_callback_output):
+ user_callback_output = await user_callback_output
+ if not errored:
+ cache.set(result_key, user_callback_output)
+
+ if asyncio.iscoroutinefunction(fn):
+ func = partial(ctx.run, async_run)
+ asyncio.run(func())
+ else:
+ ctx.run(run)
+
+ return job_fn
+
+
+class DiskcacheLongCallbackManager(DiskcacheManager):
+ """Deprecated: use `from dash import DiskcacheManager` instead."""