| author | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
|---|---|---|
| committer | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
| commit | 5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch) | |
| tree | 8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/dash/background_callback | |
| parent | b832d364da8c2efe09e3f75828caf73c50d01ce3 (diff) | |
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/dash/background_callback')

| -rw-r--r-- | venv/lib/python3.8/site-packages/dash/background_callback/__init__.py | 6 |
|---|---|---|
| -rw-r--r-- | venv/lib/python3.8/site-packages/dash/background_callback/_proxy_set_props.py | 18 |
| -rw-r--r-- | venv/lib/python3.8/site-packages/dash/background_callback/managers/__init__.py | 117 |
| -rw-r--r-- | venv/lib/python3.8/site-packages/dash/background_callback/managers/celery_manager.py | 263 |
| -rw-r--r-- | venv/lib/python3.8/site-packages/dash/background_callback/managers/diskcache_manager.py | 305 |

5 files changed, 709 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/dash/background_callback/__init__.py b/venv/lib/python3.8/site-packages/dash/background_callback/__init__.py
new file mode 100644
index 0000000..e4c6c35
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/background_callback/__init__.py
@@ -0,0 +1,6 @@
+from .managers.celery_manager import (  # noqa: F401,E402
+    CeleryManager,
+)
+from .managers.diskcache_manager import (  # noqa: F401,E402
+    DiskcacheManager,
+)
diff --git a/venv/lib/python3.8/site-packages/dash/background_callback/_proxy_set_props.py b/venv/lib/python3.8/site-packages/dash/background_callback/_proxy_set_props.py
new file mode 100644
index 0000000..4d89c5b
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/background_callback/_proxy_set_props.py
@@ -0,0 +1,18 @@
+class ProxySetProps(dict):
+    """
+    Defer dictionary item setter to run a custom function on change.
+    Used by background callback manager to save the `set_props` data.
+    """
+
+    def __init__(self, on_change):
+        super().__init__()
+        self.on_change = on_change
+        self._data = {}
+
+    def __setitem__(self, key, value):
+        self.on_change(key, value)
+        self._data.setdefault(key, {})
+        self._data[key] = {**self._data[key], **value}
+
+    def get(self, key, default=None):
+        return self._data.get(key, default)
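The `ProxySetProps` shim above forwards every assignment to an `on_change` hook while merging successive writes to the same key into a shadow dict that `get` reads from. A tiny behavioral sketch (the `"graph"` id and the recording lambda are illustrative, not from the commit):

```python
# Sketch of ProxySetProps as defined in the diff above; the component id
# "graph" and the recording lambda are illustrative assumptions.
from dash.background_callback._proxy_set_props import ProxySetProps

changes = []
props = ProxySetProps(on_change=lambda key, value: changes.append((key, value)))

props["graph"] = {"figure": {}}                  # on_change fires...
props["graph"] = {"style": {"display": "none"}}  # ...on every assignment

# Each write was forwarded to the hook, in order:
assert changes == [
    ("graph", {"figure": {}}),
    ("graph", {"style": {"display": "none"}}),
]

# Reads see the per-key merge accumulated in the shadow dict:
assert props.get("graph") == {"figure": {}, "style": {"display": "none"}}
```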
diff --git a/venv/lib/python3.8/site-packages/dash/background_callback/managers/__init__.py b/venv/lib/python3.8/site-packages/dash/background_callback/managers/__init__.py
new file mode 100644
index 0000000..7b3a0c1
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/background_callback/managers/__init__.py
@@ -0,0 +1,117 @@
+from abc import ABC
+import inspect
+import hashlib
+
+
+class BaseBackgroundCallbackManager(ABC):
+    UNDEFINED = object()
+
+    # Keep a reference to every manager so each callback can be registered with all of them.
+    managers = []
+
+    # Keep every function for late registering.
+    functions = []
+
+    def __init__(self, cache_by):
+        if cache_by is not None and not isinstance(cache_by, list):
+            cache_by = [cache_by]
+
+        self.cache_by = cache_by
+
+        BaseBackgroundCallbackManager.managers.append(self)
+
+        self.func_registry = {}
+
+        # Register all funcs that were added before instantiation.
+        # Ensures all celery tasks are registered.
+        for fdetails in self.functions:
+            self.register(*fdetails)
+
+    def terminate_job(self, job):
+        raise NotImplementedError
+
+    def terminate_unhealthy_job(self, job):
+        raise NotImplementedError
+
+    def job_running(self, job):
+        raise NotImplementedError
+
+    def make_job_fn(self, fn, progress, key=None):
+        raise NotImplementedError
+
+    def call_job_fn(self, key, job_fn, args, context):
+        raise NotImplementedError
+
+    def get_progress(self, key):
+        raise NotImplementedError
+
+    def result_ready(self, key):
+        raise NotImplementedError
+
+    def get_result(self, key, job):
+        raise NotImplementedError
+
+    def get_updated_props(self, key):
+        raise NotImplementedError
+
+    def build_cache_key(self, fn, args, cache_args_to_ignore, triggered):
+        fn_source = inspect.getsource(fn)
+
+        if not isinstance(cache_args_to_ignore, (list, tuple)):
+            cache_args_to_ignore = [cache_args_to_ignore]
+
+        if cache_args_to_ignore:
+            if isinstance(args, dict):
+                args = {k: v for k, v in args.items() if k not in cache_args_to_ignore}
+            else:
+                args = [
+                    arg for i, arg in enumerate(args) if i not in cache_args_to_ignore
+                ]
+
+        hash_dict = dict(args=args, fn_source=fn_source, triggered=triggered)
+
+        if self.cache_by is not None:
+            # Caching enabled
+            for i, cache_item in enumerate(self.cache_by):
+                # Call cache function
+                hash_dict[f"cache_key_{i}"] = cache_item()
+
+        return hashlib.sha256(str(hash_dict).encode("utf-8")).hexdigest()
+
+    def register(self, key, fn, progress):
+        self.func_registry[key] = self.make_job_fn(fn, progress, key)
+
+    @staticmethod
+    def register_func(fn, progress, callback_id):
+        key = BaseBackgroundCallbackManager.hash_function(fn, callback_id)
+        BaseBackgroundCallbackManager.functions.append(
+            (
+                key,
+                fn,
+                progress,
+            )
+        )
+
+        for manager in BaseBackgroundCallbackManager.managers:
+            manager.register(key, fn, progress)
+
+        return key
+
+    @staticmethod
+    def _make_progress_key(key):
+        return key + "-progress"
+
+    @staticmethod
+    def _make_set_props_key(key):
+        return f"{key}-set_props"
+
+    @staticmethod
+    def hash_function(fn, callback_id=""):
+        try:
+            fn_source = inspect.getsource(fn)
+            fn_str = fn_source
+        except OSError:  # pylint: disable=too-broad-exception
+            fn_str = getattr(fn, "__name__", "")
+        return hashlib.sha256(
+            callback_id.encode("utf-8") + fn_str.encode("utf-8")
+        ).hexdigest()
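Job and cache keys both come out of this base class: `register_func` keys a callback by `hash_function(fn, callback_id)`, while `build_cache_key` folds the argument values, triggered inputs, the callback's source text, and each `cache_by()` return value into a single SHA-256. A runnable sketch of the key scheme (the `current_session` helper, callback, and literal arguments are illustrative assumptions; run it as a script so `inspect.getsource` can read the callback's source):

```python
# Illustrative only: current_session and my_callback are stand-ins,
# not names from the commit above.
from dash.background_callback.managers import BaseBackgroundCallbackManager


def current_session():
    # A cache_by function: zero arguments, and its return value is hashed
    # into every cache key, giving each session its own cache slot.
    return "session-42"


def my_callback(n_clicks):
    return n_clicks


manager = BaseBackgroundCallbackManager(cache_by=[current_session])

# Different argument values produce different keys...
key_1 = manager.build_cache_key(my_callback, [1], [], ["btn.n_clicks"])
key_2 = manager.build_cache_key(my_callback, [2], [], ["btn.n_clicks"])
assert key_1 != key_2

# ...unless that argument's position is listed in cache_args_to_ignore.
key_3 = manager.build_cache_key(my_callback, [1], [0], ["btn.n_clicks"])
key_4 = manager.build_cache_key(my_callback, [2], [0], ["btn.n_clicks"])
assert key_3 == key_4
```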
diff --git a/venv/lib/python3.8/site-packages/dash/background_callback/managers/celery_manager.py b/venv/lib/python3.8/site-packages/dash/background_callback/managers/celery_manager.py
new file mode 100644
index 0000000..5aa1f57
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/background_callback/managers/celery_manager.py
@@ -0,0 +1,263 @@
+import json
+import traceback
+from contextvars import copy_context
+import asyncio
+from functools import partial
+
+from _plotly_utils.utils import PlotlyJSONEncoder
+
+from dash._callback_context import context_value
+from dash._utils import AttributeDict
+from dash.exceptions import PreventUpdate
+from dash.background_callback._proxy_set_props import ProxySetProps
+from dash.background_callback.managers import BaseBackgroundCallbackManager
+
+
+class CeleryManager(BaseBackgroundCallbackManager):
+    """Manage background execution of callbacks with a celery queue."""
+
+    def __init__(self, celery_app, cache_by=None, expire=None):
+        """
+        Background callback manager that runs callback logic on a celery task queue,
+        and stores results using a celery result backend.
+
+        :param celery_app:
+            A celery.Celery application instance that must be configured with a
+            result backend. See the celery documentation for information on
+            configuration options.
+        :param cache_by:
+            A list of zero-argument functions. When provided, caching is enabled and
+            the return values of these functions are combined with the callback
+            function's input arguments, triggered inputs and source code to generate cache keys.
+        :param expire:
+            If provided, a cache entry will be removed when it has not been accessed
+            for ``expire`` seconds. If not provided, the lifetime of cache entries
+            is determined by the default behavior of the celery result backend.
+        """
+        try:
+            import celery  # type: ignore[reportMissingImports]; pylint: disable=import-outside-toplevel,import-error
+            from celery.backends.base import (  # type: ignore[reportMissingImports]; pylint: disable=import-outside-toplevel,import-error
+                DisabledBackend,
+            )
+        except ImportError as missing_imports:
+            raise ImportError(
+                """\
+CeleryManager requires extra dependencies which can be installed doing
+
+    $ pip install "dash[celery]"\n"""
+            ) from missing_imports
+
+        if not isinstance(celery_app, celery.Celery):
+            raise ValueError("First argument must be a celery.Celery object")
+
+        if isinstance(celery_app.backend, DisabledBackend):
+            raise ValueError("Celery instance must be configured with a result backend")
+
+        self.handle = celery_app
+        self.expire = expire
+        super().__init__(cache_by)
+
+    def terminate_job(self, job):
+        if job is None:
+            return
+
+        self.handle.control.terminate(job)
+
+    def terminate_unhealthy_job(self, job):
+        task = self.get_task(job)
+        if task and task.status in ("FAILURE", "REVOKED"):
+            return self.terminate_job(job)
+        return False
+
+    def job_running(self, job):
+        future = self.get_task(job)
+        return future and future.status in (
+            "PENDING",
+            "RECEIVED",
+            "STARTED",
+            "RETRY",
+            "PROGRESS",
+        )
+
+    def make_job_fn(self, fn, progress, key=None):
+        return _make_job_fn(fn, self.handle, progress, key)
+
+    def get_task(self, job):
+        if job:
+            return self.handle.AsyncResult(job)
+
+        return None
+
+    def clear_cache_entry(self, key):
+        self.handle.backend.delete(key)
+
+    def call_job_fn(self, key, job_fn, args, context):
+        task = job_fn.delay(key, self._make_progress_key(key), args, context)
+        return task.task_id
+
+    def get_progress(self, key):
+        progress_key = self._make_progress_key(key)
+        progress_data = self.handle.backend.get(progress_key)
+        if progress_data:
+            self.handle.backend.delete(progress_key)
+            return json.loads(progress_data)
+
+        return None
+
+    def result_ready(self, key):
+        return self.handle.backend.get(key) is not None
+
+    def get_result(self, key, job):
+        # Get result value
+        result = self.handle.backend.get(key)
+        if result is None:
+            return self.UNDEFINED
+
+        result = json.loads(result)
+
+        # Clear result if not caching
+        if self.cache_by is None:
+            self.clear_cache_entry(key)
+        else:
+            if self.expire:
+                # Set/update expiration time
+                self.handle.backend.expire(key, self.expire)
+        self.clear_cache_entry(self._make_progress_key(key))
+
+        self.terminate_job(job)
+        return result
+
+    def get_updated_props(self, key):
+        updated_props = self.handle.backend.get(self._make_set_props_key(key))
+        if updated_props is None:
+            return {}
+
+        self.clear_cache_entry(key)
+
+        return json.loads(updated_props)
+
+
+def _make_job_fn(fn, celery_app, progress, key):  # pylint: disable=too-many-statements
+    cache = celery_app.backend
+
+    @celery_app.task(name=f"background_callback_{key}")
+    def job_fn(
+        result_key, progress_key, user_callback_args, context=None
+    ):  # pylint: disable=too-many-statements
+        def _set_progress(progress_value):
+            if not isinstance(progress_value, (list, tuple)):
+                progress_value = [progress_value]
+
+            cache.set(progress_key, json.dumps(progress_value, cls=PlotlyJSONEncoder))
+
+        maybe_progress = [_set_progress] if progress else []
+
+        def _set_props(_id, props):
+            cache.set(
+                f"{result_key}-set_props",
+                json.dumps({_id: props}, cls=PlotlyJSONEncoder),
+            )
+
+        ctx = copy_context()
+
+        def run():
+            c = AttributeDict(**context)  # type: ignore[reportCallIssue]
+            c.ignore_register_page = False
+            c.updated_props = ProxySetProps(_set_props)
+            context_value.set(c)
+            errored = False
+            user_callback_output = None  # to help type checking
+            try:
+                if isinstance(user_callback_args, dict):
+                    user_callback_output = fn(*maybe_progress, **user_callback_args)
+                elif isinstance(user_callback_args, (list, tuple)):
+                    user_callback_output = fn(*maybe_progress, *user_callback_args)
+                else:
+                    user_callback_output = fn(*maybe_progress, user_callback_args)
+            except PreventUpdate:
+                # Put NoUpdate dict directly to avoid circular imports.
+                errored = True
+                cache.set(
+                    result_key,
+                    json.dumps(
+                        {"_dash_no_update": "_dash_no_update"}, cls=PlotlyJSONEncoder
+                    ),
+                )
+            except Exception as err:  # pylint: disable=broad-except
+                errored = True
+                cache.set(
+                    result_key,
+                    json.dumps(
+                        {
+                            "background_callback_error": {
+                                "msg": str(err),
+                                "tb": traceback.format_exc(),
+                            }
+                        },
+                    ),
+                )
+
+            if not errored:
+                cache.set(
+                    result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder)
+                )
+
+        async def async_run():
+            c = AttributeDict(**context)
+            c.ignore_register_page = False
+            c.updated_props = ProxySetProps(_set_props)
+            context_value.set(c)
+            errored = False
+            try:
+                if isinstance(user_callback_args, dict):
+                    user_callback_output = await fn(
+                        *maybe_progress, **user_callback_args
+                    )
+                elif isinstance(user_callback_args, (list, tuple)):
+                    user_callback_output = await fn(
+                        *maybe_progress, *user_callback_args
+                    )
+                else:
+                    user_callback_output = await fn(*maybe_progress, user_callback_args)
+            except PreventUpdate:
+                # Put NoUpdate dict directly to avoid circular imports.
+                errored = True
+                cache.set(
+                    result_key,
+                    json.dumps(
+                        {"_dash_no_update": "_dash_no_update"}, cls=PlotlyJSONEncoder
+                    ),
+                )
+            except Exception as err:  # pylint: disable=broad-except
+                errored = True
+                cache.set(
+                    result_key,
+                    json.dumps(
+                        {
+                            "background_callback_error": {
+                                "msg": str(err),
+                                "tb": traceback.format_exc(),
+                            }
+                        },
+                    ),
+                )
+
+            if asyncio.iscoroutine(user_callback_output):
+                user_callback_output = await user_callback_output
+
+            if not errored:
+                cache.set(
+                    result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder)
+                )
+
+        if asyncio.iscoroutinefunction(fn):
+            func = partial(ctx.run, async_run)
+            asyncio.run(func())
+        else:
+            ctx.run(run)
+
+    return job_fn
+
+
+class CeleryLongCallbackManager(CeleryManager):
+    """Deprecated: use `from dash import CeleryManager` instead."""
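For context, this manager is wired into an app in the documented pattern: construct a `celery.Celery` with a broker and a result backend, hand it to `CeleryManager`, and run workers separately. A minimal sketch (the Redis URLs and module name are placeholder assumptions, not part of this commit):

```python
# Minimal wiring sketch for the manager above; broker/backend URLs
# and the module layout are illustrative assumptions.
from celery import Celery
from dash import Dash, CeleryManager

celery_app = Celery(
    __name__,
    broker="redis://127.0.0.1:6379/0",
    backend="redis://127.0.0.1:6379/1",  # a result backend is required
)
manager = CeleryManager(celery_app)

# Callbacks declared with background=True are serialized through
# PlotlyJSONEncoder and executed by a separately started worker, e.g.
#   $ celery -A my_app.celery_app worker --loglevel=INFO
app = Dash(__name__, background_callback_manager=manager)
```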
diff --git a/venv/lib/python3.8/site-packages/dash/background_callback/managers/diskcache_manager.py b/venv/lib/python3.8/site-packages/dash/background_callback/managers/diskcache_manager.py
new file mode 100644
index 0000000..094485a
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/dash/background_callback/managers/diskcache_manager.py
@@ -0,0 +1,305 @@
+import traceback
+from contextvars import copy_context
+import asyncio
+from functools import partial
+
+
+from . import BaseBackgroundCallbackManager
+from .._proxy_set_props import ProxySetProps
+from ..._callback_context import context_value
+from ..._utils import AttributeDict
+from ...exceptions import PreventUpdate
+
+_pending_value = "__$pending__"
+
+
+class DiskcacheManager(BaseBackgroundCallbackManager):
+    """Manage the background execution of callbacks with subprocesses and a diskcache result backend."""
+
+    def __init__(self, cache=None, cache_by=None, expire=None):
+        """
+        Background callback manager that runs callback logic in a subprocess and stores
+        results on disk using diskcache
+
+        :param cache:
+            A diskcache.Cache or diskcache.FanoutCache instance. See the diskcache
+            documentation for information on configuration options. If not provided,
+            a diskcache.Cache instance will be created with default values.
+        :param cache_by:
+            A list of zero-argument functions. When provided, caching is enabled and
+            the return values of these functions are combined with the callback
+            function's input arguments, triggered inputs and source code to generate cache keys.
+        :param expire:
+            If provided, a cache entry will be removed when it has not been accessed
+            for ``expire`` seconds. If not provided, the lifetime of cache entries
+            is determined by the default behavior of the ``cache`` instance.
+        """
+        try:
+            import diskcache  # type: ignore[reportMissingImports]; pylint: disable=import-outside-toplevel
+            import psutil  # noqa: F401,E402 pylint: disable=import-outside-toplevel,unused-import,unused-variable,import-error
+            import multiprocess  # noqa: F401,E402 pylint: disable=import-outside-toplevel,unused-import,unused-variable
+        except ImportError as missing_imports:
+            raise ImportError(
+                """\
+DiskcacheManager requires extra dependencies which can be installed doing
+
+    $ pip install "dash[diskcache]"\n"""
+            ) from missing_imports
+
+        if cache is None:
+            self.handle = diskcache.Cache()
+        else:
+            if not isinstance(cache, (diskcache.Cache, diskcache.FanoutCache)):
+                raise ValueError(
+                    "First argument must be a diskcache.Cache "
+                    "or diskcache.FanoutCache object"
+                )
+            self.handle = cache
+
+        self.expire = expire
+        super().__init__(cache_by)
+
+    def terminate_job(self, job):
+        import psutil  # pylint: disable=import-outside-toplevel,import-error
+
+        if job is None:
+            return
+
+        job = int(job)
+
+        # Use a diskcache transaction so multiple processes don't try to kill
+        # the process at the same time
+        with self.handle.transact():
+            if psutil.pid_exists(job):
+                process = psutil.Process(job)
+
+                for proc in process.children(recursive=True):
+                    try:
+                        proc.kill()
+                    except psutil.NoSuchProcess:
+                        pass
+
+                try:
+                    process.kill()
+                except psutil.NoSuchProcess:
+                    pass
+
+                try:
+                    process.wait(1)
+                except (psutil.TimeoutExpired, psutil.NoSuchProcess):
+                    pass
+
+    def terminate_unhealthy_job(self, job):
+        import psutil  # pylint: disable=import-outside-toplevel,import-error
+
+        job = int(job)
+
+        if job and psutil.pid_exists(job):
+            if not self.job_running(job):
+                self.terminate_job(job)
+                return True
+
+        return False
+
+    def job_running(self, job):
+        import psutil  # pylint: disable=import-outside-toplevel,import-error
+
+        job = int(job)
+
+        if job and psutil.pid_exists(job):
+            proc = psutil.Process(job)
+            return proc.status() != psutil.STATUS_ZOMBIE
+        return False
+
+    def make_job_fn(self, fn, progress, key=None):
+        return _make_job_fn(fn, self.handle, progress)
+
+    def clear_cache_entry(self, key):
+        self.handle.delete(key)
+
+    # noinspection PyUnresolvedReferences
+    def call_job_fn(self, key, job_fn, args, context):
+ """ + Call the job function, supporting both sync and async jobs. + Args: + key: Cache key for the job. + job_fn: The job function to execute. + args: Arguments for the job function. + context: Context for the job. + Returns: + The PID of the spawned process or None for async execution. + """ + # pylint: disable-next=import-outside-toplevel,no-name-in-module,import-error + from multiprocess import Process # type: ignore + + # pylint: disable-next=not-callable + process = Process( + target=job_fn, + args=(key, self._make_progress_key(key), args, context), + ) + process.start() + return process.pid + + @staticmethod + def _run_async_in_process(job_fn, key, args, context): + """ + Helper function to run an async job in a new process. + Args: + job_fn: The async job function. + key: Cache key for the job. + args: Arguments for the job function. + context: Context for the job. + """ + # Create a new event loop for the process + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Wrap the job function to include key and progress + async_job = partial(job_fn, key, args, context) + + try: + # Run the async job and wait for completion + loop.run_until_complete(async_job()) + except Exception as e: + # Handle errors, log them, and cache if necessary + raise Exception(str(e)) from e + finally: + loop.close() + + def get_progress(self, key): + progress_key = self._make_progress_key(key) + progress_data = self.handle.get(progress_key) + if progress_data: + self.handle.delete(progress_key) + + return progress_data + + def result_ready(self, key): + return self.handle.get(key) is not None + + def get_result(self, key, job): + # Get result value + result = self.handle.get(key, self.UNDEFINED) + if result is self.UNDEFINED: + return self.UNDEFINED + + # Clear result if not caching + if self.cache_by is None: + self.clear_cache_entry(key) + else: + if self.expire: + self.handle.touch(key, expire=self.expire) + + self.clear_cache_entry(self._make_progress_key(key)) + + if job: + self.terminate_job(job) + return result + + def get_updated_props(self, key): + set_props_key = self._make_set_props_key(key) + result = self.handle.get(set_props_key, self.UNDEFINED) + if result is self.UNDEFINED: + return {} + + self.clear_cache_entry(set_props_key) + + return result + + +# pylint: disable-next=too-many-statements +def _make_job_fn(fn, cache, progress): + # pylint: disable-next=too-many-statements + def job_fn(result_key, progress_key, user_callback_args, context): + def _set_progress(progress_value): + if not isinstance(progress_value, (list, tuple)): + progress_value = [progress_value] + + cache.set(progress_key, progress_value) + + maybe_progress = [_set_progress] if progress else [] + + def _set_props(_id, props): + cache.set(f"{result_key}-set_props", {_id: props}) + + ctx = copy_context() + + def run(): + c = AttributeDict(**context) + c.ignore_register_page = False + c.updated_props = ProxySetProps(_set_props) + context_value.set(c) + errored = False + user_callback_output = None # initialized to prevent type checker warnings + try: + if isinstance(user_callback_args, dict): + user_callback_output = fn(*maybe_progress, **user_callback_args) + elif isinstance(user_callback_args, (list, tuple)): + user_callback_output = fn(*maybe_progress, *user_callback_args) + else: + user_callback_output = fn(*maybe_progress, user_callback_args) + except PreventUpdate: + errored = True + cache.set(result_key, {"_dash_no_update": "_dash_no_update"}) + except Exception as err: # pylint: disable=broad-except 
+                errored = True
+                cache.set(
+                    result_key,
+                    {
+                        "background_callback_error": {
+                            "msg": str(err),
+                            "tb": traceback.format_exc(),
+                        }
+                    },
+                )
+
+            if not errored:
+                cache.set(result_key, user_callback_output)
+
+        async def async_run():
+            c = AttributeDict(**context)
+            c.ignore_register_page = False
+            c.updated_props = ProxySetProps(_set_props)
+            context_value.set(c)
+            errored = False
+            try:
+                if isinstance(user_callback_args, dict):
+                    user_callback_output = await fn(
+                        *maybe_progress, **user_callback_args
+                    )
+                elif isinstance(user_callback_args, (list, tuple)):
+                    user_callback_output = await fn(
+                        *maybe_progress, *user_callback_args
+                    )
+                else:
+                    user_callback_output = await fn(*maybe_progress, user_callback_args)
+            except PreventUpdate:
+                errored = True
+                cache.set(result_key, {"_dash_no_update": "_dash_no_update"})
+            except Exception as err:  # pylint: disable=broad-except
+                errored = True
+                cache.set(
+                    result_key,
+                    {
+                        "background_callback_error": {
+                            "msg": str(err),
+                            "tb": traceback.format_exc(),
+                        }
+                    },
+                )
+            if asyncio.iscoroutine(user_callback_output):
+                user_callback_output = await user_callback_output
+            if not errored:
+                cache.set(result_key, user_callback_output)
+
+        if asyncio.iscoroutinefunction(fn):
+            func = partial(ctx.run, async_run)
+            asyncio.run(func())
+        else:
+            ctx.run(run)
+
+    return job_fn
+
+
+class DiskcacheLongCallbackManager(DiskcacheManager):
+    """Deprecated: use `from dash import DiskcacheManager` instead."""
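The diskcache counterpart wires up the same way, and its `_set_progress`/`get_progress` plumbing above is what backs the `progress=` argument of a background callback. A runnable end-to-end sketch, assuming `dash[diskcache]` is installed; the component ids and the `./cache` directory are illustrative choices, not from this commit:

```python
# Sketch of the subprocess-based manager above; ids and paths are
# illustrative assumptions.
import time

import diskcache
from dash import Dash, DiskcacheManager, Input, Output, html

cache = diskcache.Cache("./cache")
manager = DiskcacheManager(cache)

app = Dash(__name__, background_callback_manager=manager)
app.layout = html.Div(
    [
        html.Button("Run", id="run"),
        html.Progress(id="bar", max="5"),
        html.Div(id="out"),
    ]
)


@app.callback(
    Output("out", "children"),
    Input("run", "n_clicks"),
    background=True,
    progress=Output("bar", "value"),
    prevent_initial_call=True,
)
def slow(set_progress, n_clicks):
    # set_progress maps to _set_progress above: each call writes the
    # "-progress" cache key that get_progress later drains.
    for i in range(5):
        time.sleep(1)
        set_progress(str(i + 1))
    return f"Finished run {n_clicks}"


if __name__ == "__main__":
    app.run(debug=True)
```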
