path: root/venv/lib/python3.8/site-packages/narwhals/_dask
author    sotech117 <michael_foiani@brown.edu>    2025-07-31 17:27:24 -0400
committer sotech117 <michael_foiani@brown.edu>    2025-07-31 17:27:24 -0400
commit    5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch)
tree      8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/narwhals/_dask
parent    b832d364da8c2efe09e3f75828caf73c50d01ce3 (diff)
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/narwhals/_dask')
-rw-r--r--  venv/lib/python3.8/site-packages/narwhals/_dask/__init__.py      0
-rw-r--r--  venv/lib/python3.8/site-packages/narwhals/_dask/dataframe.py   443
-rw-r--r--  venv/lib/python3.8/site-packages/narwhals/_dask/expr.py        675
-rw-r--r--  venv/lib/python3.8/site-packages/narwhals/_dask/expr_dt.py     162
-rw-r--r--  venv/lib/python3.8/site-packages/narwhals/_dask/expr_str.py     98
-rw-r--r--  venv/lib/python3.8/site-packages/narwhals/_dask/group_by.py    122
-rw-r--r--  venv/lib/python3.8/site-packages/narwhals/_dask/namespace.py   320
-rw-r--r--  venv/lib/python3.8/site-packages/narwhals/_dask/selectors.py    30
-rw-r--r--  venv/lib/python3.8/site-packages/narwhals/_dask/utils.py       160
9 files changed, 2010 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/narwhals/_dask/__init__.py b/venv/lib/python3.8/site-packages/narwhals/_dask/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_dask/__init__.py
diff --git a/venv/lib/python3.8/site-packages/narwhals/_dask/dataframe.py b/venv/lib/python3.8/site-packages/narwhals/_dask/dataframe.py
new file mode 100644
index 0000000..f03c763
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_dask/dataframe.py
@@ -0,0 +1,443 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Iterator, Mapping, Sequence
+
+import dask.dataframe as dd
+import pandas as pd
+
+from narwhals._dask.utils import add_row_index, evaluate_exprs
+from narwhals._pandas_like.utils import native_to_narwhals_dtype, select_columns_by_name
+from narwhals._utils import (
+ Implementation,
+ _remap_full_join_keys,
+ check_column_names_are_unique,
+ generate_temporary_column_name,
+ not_implemented,
+ parse_columns_to_drop,
+ parse_version,
+ validate_backend_version,
+)
+from narwhals.typing import CompliantLazyFrame
+
+if TYPE_CHECKING:
+ from types import ModuleType
+
+ import dask.dataframe.dask_expr as dx
+ from typing_extensions import Self, TypeIs
+
+ from narwhals._compliant.typing import CompliantDataFrameAny
+ from narwhals._dask.expr import DaskExpr
+ from narwhals._dask.group_by import DaskLazyGroupBy
+ from narwhals._dask.namespace import DaskNamespace
+ from narwhals._utils import Version, _FullContext
+ from narwhals.dataframe import LazyFrame
+ from narwhals.dtypes import DType
+ from narwhals.typing import AsofJoinStrategy, JoinStrategy, LazyUniqueKeepStrategy
+
+
+class DaskLazyFrame(
+ CompliantLazyFrame["DaskExpr", "dd.DataFrame", "LazyFrame[dd.DataFrame]"]
+):
+ def __init__(
+ self,
+ native_dataframe: dd.DataFrame,
+ *,
+ backend_version: tuple[int, ...],
+ version: Version,
+ ) -> None:
+ self._native_frame: dd.DataFrame = native_dataframe
+ self._backend_version = backend_version
+ self._implementation = Implementation.DASK
+ self._version = version
+ self._cached_schema: dict[str, DType] | None = None
+ self._cached_columns: list[str] | None = None
+ validate_backend_version(self._implementation, self._backend_version)
+
+ @staticmethod
+ def _is_native(obj: dd.DataFrame | Any) -> TypeIs[dd.DataFrame]:
+ return isinstance(obj, dd.DataFrame)
+
+ @classmethod
+ def from_native(cls, data: dd.DataFrame, /, *, context: _FullContext) -> Self:
+ return cls(
+ data, backend_version=context._backend_version, version=context._version
+ )
+
+ def to_narwhals(self) -> LazyFrame[dd.DataFrame]:
+ return self._version.lazyframe(self, level="lazy")
+
+ def __native_namespace__(self) -> ModuleType:
+ if self._implementation is Implementation.DASK:
+ return self._implementation.to_native_namespace()
+
+ msg = f"Expected dask, got: {type(self._implementation)}" # pragma: no cover
+ raise AssertionError(msg)
+
+ def __narwhals_namespace__(self) -> DaskNamespace:
+ from narwhals._dask.namespace import DaskNamespace
+
+ return DaskNamespace(backend_version=self._backend_version, version=self._version)
+
+ def __narwhals_lazyframe__(self) -> Self:
+ return self
+
+ def _with_version(self, version: Version) -> Self:
+ return self.__class__(
+ self.native, backend_version=self._backend_version, version=version
+ )
+
+ def _with_native(self, df: Any) -> Self:
+ return self.__class__(
+ df, backend_version=self._backend_version, version=self._version
+ )
+
+ def _iter_columns(self) -> Iterator[dx.Series]:
+ for _col, ser in self.native.items(): # noqa: PERF102
+ yield ser
+
+ def with_columns(self, *exprs: DaskExpr) -> Self:
+ new_series = evaluate_exprs(self, *exprs)
+ return self._with_native(self.native.assign(**dict(new_series)))
+
+ def collect(
+ self, backend: Implementation | None, **kwargs: Any
+ ) -> CompliantDataFrameAny:
+ result = self.native.compute(**kwargs)
+
+ if backend is None or backend is Implementation.PANDAS:
+ from narwhals._pandas_like.dataframe import PandasLikeDataFrame
+
+ return PandasLikeDataFrame(
+ result,
+ implementation=Implementation.PANDAS,
+ backend_version=parse_version(pd),
+ version=self._version,
+ validate_column_names=True,
+ )
+
+ if backend is Implementation.POLARS:
+ import polars as pl # ignore-banned-import
+
+ from narwhals._polars.dataframe import PolarsDataFrame
+
+ return PolarsDataFrame(
+ pl.from_pandas(result),
+ backend_version=parse_version(pl),
+ version=self._version,
+ )
+
+ if backend is Implementation.PYARROW:
+ import pyarrow as pa # ignore-banned-import
+
+ from narwhals._arrow.dataframe import ArrowDataFrame
+
+ return ArrowDataFrame(
+ pa.Table.from_pandas(result),
+ backend_version=parse_version(pa),
+ version=self._version,
+ validate_column_names=True,
+ )
+
+ msg = f"Unsupported `backend` value: {backend}" # pragma: no cover
+ raise ValueError(msg) # pragma: no cover
+
+ @property
+ def columns(self) -> list[str]:
+ if self._cached_columns is None:
+ self._cached_columns = (
+ list(self.schema)
+ if self._cached_schema is not None
+ else self.native.columns.tolist()
+ )
+ return self._cached_columns
+
+ def filter(self, predicate: DaskExpr) -> Self:
+ # `[0]` is safe as the predicate's expression only returns a single column
+ mask = predicate(self)[0]
+ return self._with_native(self.native.loc[mask])
+
+ def simple_select(self, *column_names: str) -> Self:
+ native = select_columns_by_name(
+ self.native, list(column_names), self._backend_version, self._implementation
+ )
+ return self._with_native(native)
+
+ def aggregate(self, *exprs: DaskExpr) -> Self:
+ new_series = evaluate_exprs(self, *exprs)
+ df = dd.concat([val.rename(name) for name, val in new_series], axis=1)
+ return self._with_native(df)
+
+ def select(self, *exprs: DaskExpr) -> Self:
+ new_series = evaluate_exprs(self, *exprs)
+ df = select_columns_by_name(
+ self.native.assign(**dict(new_series)),
+ [s[0] for s in new_series],
+ self._backend_version,
+ self._implementation,
+ )
+ return self._with_native(df)
+
+ def drop_nulls(self, subset: Sequence[str] | None) -> Self:
+ if subset is None:
+ return self._with_native(self.native.dropna())
+ plx = self.__narwhals_namespace__()
+ return self.filter(~plx.any_horizontal(plx.col(*subset).is_null()))
+
+ @property
+ def schema(self) -> dict[str, DType]:
+ if self._cached_schema is None:
+ native_dtypes = self.native.dtypes
+ self._cached_schema = {
+ col: native_to_narwhals_dtype(
+ native_dtypes[col], self._version, self._implementation
+ )
+ for col in self.native.columns
+ }
+ return self._cached_schema
+
+ def collect_schema(self) -> dict[str, DType]:
+ return self.schema
+
+ def drop(self, columns: Sequence[str], *, strict: bool) -> Self:
+ to_drop = parse_columns_to_drop(self, columns, strict=strict)
+
+ return self._with_native(self.native.drop(columns=to_drop))
+
+ def with_row_index(self, name: str) -> Self:
+ # Implementation is based on the following StackOverflow reply:
+ # https://stackoverflow.com/questions/60831518/in-dask-how-does-one-add-a-range-of-integersauto-increment-to-a-new-column/60852409#60852409
+ return self._with_native(
+ add_row_index(self.native, name, self._backend_version, self._implementation)
+ )
+
+ def rename(self, mapping: Mapping[str, str]) -> Self:
+ return self._with_native(self.native.rename(columns=mapping))
+
+ def head(self, n: int) -> Self:
+ return self._with_native(self.native.head(n=n, compute=False, npartitions=-1))
+
+ def unique(
+ self, subset: Sequence[str] | None, *, keep: LazyUniqueKeepStrategy
+ ) -> Self:
+ if subset and (error := self._check_columns_exist(subset)):
+ raise error
+ if keep == "none":
+ subset = subset or self.columns
+ token = generate_temporary_column_name(n_bytes=8, columns=subset)
+ ser = self.native.groupby(subset).size().rename(token)
+ ser = ser[ser == 1]
+ unique = ser.reset_index().drop(columns=token)
+ result = self.native.merge(unique, on=subset, how="inner")
+ else:
+ mapped_keep = {"any": "first"}.get(keep, keep)
+ result = self.native.drop_duplicates(subset=subset, keep=mapped_keep)
+ return self._with_native(result)
+
+ def sort(self, *by: str, descending: bool | Sequence[bool], nulls_last: bool) -> Self:
+ if isinstance(descending, bool):
+ ascending: bool | list[bool] = not descending
+ else:
+ ascending = [not d for d in descending]
+ position = "last" if nulls_last else "first"
+ return self._with_native(
+ self.native.sort_values(list(by), ascending=ascending, na_position=position)
+ )
+
+ def join( # noqa: C901
+ self,
+ other: Self,
+ *,
+ how: JoinStrategy,
+ left_on: Sequence[str] | None,
+ right_on: Sequence[str] | None,
+ suffix: str,
+ ) -> Self:
+ if how == "cross":
+ key_token = generate_temporary_column_name(
+ n_bytes=8, columns=[*self.columns, *other.columns]
+ )
+
+ return self._with_native(
+ self.native.assign(**{key_token: 0})
+ .merge(
+ other.native.assign(**{key_token: 0}),
+ how="inner",
+ left_on=key_token,
+ right_on=key_token,
+ suffixes=("", suffix),
+ )
+ .drop(columns=key_token)
+ )
+
+ if how == "anti":
+ indicator_token = generate_temporary_column_name(
+ n_bytes=8, columns=[*self.columns, *other.columns]
+ )
+
+ if right_on is None: # pragma: no cover
+ msg = "`right_on` cannot be `None` in anti-join"
+ raise TypeError(msg)
+ other_native = (
+ select_columns_by_name(
+ other.native,
+ list(right_on),
+ self._backend_version,
+ self._implementation,
+ )
+ .rename( # rename to avoid creating extra columns in join
+ columns=dict(zip(right_on, left_on)) # type: ignore[arg-type]
+ )
+ .drop_duplicates()
+ )
+ df = self.native.merge(
+ other_native,
+ how="outer",
+ indicator=indicator_token, # pyright: ignore[reportArgumentType]
+ left_on=left_on,
+ right_on=left_on,
+ )
+ return self._with_native(
+ df[df[indicator_token] == "left_only"].drop(columns=[indicator_token])
+ )
+
+ if how == "semi":
+ if right_on is None: # pragma: no cover
+ msg = "`right_on` cannot be `None` in semi-join"
+ raise TypeError(msg)
+ other_native = (
+ select_columns_by_name(
+ other.native,
+ list(right_on),
+ self._backend_version,
+ self._implementation,
+ )
+ .rename( # rename to avoid creating extra columns in join
+ columns=dict(zip(right_on, left_on)) # type: ignore[arg-type]
+ )
+ .drop_duplicates() # avoids potential rows duplication from inner join
+ )
+ return self._with_native(
+ self.native.merge(
+ other_native, how="inner", left_on=left_on, right_on=left_on
+ )
+ )
+
+ if how == "left":
+ result_native = self.native.merge(
+ other.native,
+ how="left",
+ left_on=left_on,
+ right_on=right_on,
+ suffixes=("", suffix),
+ )
+ extra = []
+ for left_key, right_key in zip(left_on, right_on): # type: ignore[arg-type]
+ if right_key != left_key and right_key not in self.columns:
+ extra.append(right_key)
+ elif right_key != left_key:
+ extra.append(f"{right_key}_right")
+ return self._with_native(result_native.drop(columns=extra))
+
+ if how == "full":
+ # dask does not retain keys post-join
+ # we must append the suffix to each key beforehand
+
+ # help mypy
+ assert left_on is not None # noqa: S101
+ assert right_on is not None # noqa: S101
+
+ right_on_mapper = _remap_full_join_keys(left_on, right_on, suffix)
+ other_native = other.native.rename(columns=right_on_mapper)
+ check_column_names_are_unique(other_native.columns)
+ right_on = list(right_on_mapper.values()) # we now have the suffixed keys
+ return self._with_native(
+ self.native.merge(
+ other_native,
+ left_on=left_on,
+ right_on=right_on,
+ how="outer",
+ suffixes=("", suffix),
+ )
+ )
+
+ return self._with_native(
+ self.native.merge(
+ other.native,
+ left_on=left_on,
+ right_on=right_on,
+ how=how,
+ suffixes=("", suffix),
+ )
+ )
+
+ def join_asof(
+ self,
+ other: Self,
+ *,
+ left_on: str,
+ right_on: str,
+ by_left: Sequence[str] | None,
+ by_right: Sequence[str] | None,
+ strategy: AsofJoinStrategy,
+ suffix: str,
+ ) -> Self:
+ plx = self.__native_namespace__()
+ return self._with_native(
+ plx.merge_asof(
+ self.native,
+ other.native,
+ left_on=left_on,
+ right_on=right_on,
+ left_by=by_left,
+ right_by=by_right,
+ direction=strategy,
+ suffixes=("", suffix),
+ )
+ )
+
+ def group_by(
+ self, keys: Sequence[str] | Sequence[DaskExpr], *, drop_null_keys: bool
+ ) -> DaskLazyGroupBy:
+ from narwhals._dask.group_by import DaskLazyGroupBy
+
+ return DaskLazyGroupBy(self, keys, drop_null_keys=drop_null_keys)
+
+ def tail(self, n: int) -> Self: # pragma: no cover
+ native_frame = self.native
+ n_partitions = native_frame.npartitions
+
+ if n_partitions == 1:
+ return self._with_native(self.native.tail(n=n, compute=False))
+ else:
+ msg = "`LazyFrame.tail` is not supported for Dask backend with multiple partitions."
+ raise NotImplementedError(msg)
+
+ def gather_every(self, n: int, offset: int) -> Self:
+ row_index_token = generate_temporary_column_name(n_bytes=8, columns=self.columns)
+ plx = self.__narwhals_namespace__()
+ return (
+ self.with_row_index(row_index_token)
+ .filter(
+ (plx.col(row_index_token) >= offset)
+ & ((plx.col(row_index_token) - offset) % n == 0)
+ )
+ .drop([row_index_token], strict=False)
+ )
+
+ def unpivot(
+ self,
+ on: Sequence[str] | None,
+ index: Sequence[str] | None,
+ variable_name: str,
+ value_name: str,
+ ) -> Self:
+ return self._with_native(
+ self.native.melt(
+ id_vars=index,
+ value_vars=on,
+ var_name=variable_name,
+ value_name=value_name,
+ )
+ )
+
+ explode = not_implemented()
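
Usage sketch (not part of the diff): `DaskLazyFrame` is internal and is normally reached by wrapping a native Dask DataFrame with narwhals' public API; `nw.from_native` and `nw.col` below are assumed from that public surface.

import dask.dataframe as dd
import narwhals as nw
import pandas as pd

# Build a small partitioned Dask DataFrame and wrap it in a narwhals LazyFrame.
ddf = dd.from_pandas(pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]}), npartitions=2)
lf = nw.from_native(ddf)

# `filter`/`select` dispatch to the DaskLazyFrame methods above; nothing is
# computed until the native frame is collected.
result = lf.filter(nw.col("a") > 1).select("a", "b").to_native()
print(result.compute())
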
diff --git a/venv/lib/python3.8/site-packages/narwhals/_dask/expr.py b/venv/lib/python3.8/site-packages/narwhals/_dask/expr.py
new file mode 100644
index 0000000..aa51997
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_dask/expr.py
@@ -0,0 +1,675 @@
+from __future__ import annotations
+
+import warnings
+from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence
+
+from narwhals._compliant import LazyExpr
+from narwhals._compliant.expr import DepthTrackingExpr
+from narwhals._dask.expr_dt import DaskExprDateTimeNamespace
+from narwhals._dask.expr_str import DaskExprStringNamespace
+from narwhals._dask.utils import (
+ add_row_index,
+ maybe_evaluate_expr,
+ narwhals_to_native_dtype,
+)
+from narwhals._expression_parsing import ExprKind, evaluate_output_names_and_aliases
+from narwhals._pandas_like.utils import native_to_narwhals_dtype
+from narwhals._utils import (
+ Implementation,
+ generate_temporary_column_name,
+ not_implemented,
+)
+from narwhals.exceptions import InvalidOperationError
+
+if TYPE_CHECKING:
+ import dask.dataframe.dask_expr as dx
+ from typing_extensions import Self
+
+ from narwhals._compliant.typing import AliasNames, EvalNames, EvalSeries, ScalarKwargs
+ from narwhals._dask.dataframe import DaskLazyFrame
+ from narwhals._dask.namespace import DaskNamespace
+ from narwhals._expression_parsing import ExprKind, ExprMetadata
+ from narwhals._utils import Version, _FullContext
+ from narwhals.typing import (
+ FillNullStrategy,
+ IntoDType,
+ NonNestedLiteral,
+ NumericLiteral,
+ RollingInterpolationMethod,
+ TemporalLiteral,
+ )
+
+
+class DaskExpr(
+ LazyExpr["DaskLazyFrame", "dx.Series"],
+ DepthTrackingExpr["DaskLazyFrame", "dx.Series"],
+):
+ _implementation: Implementation = Implementation.DASK
+
+ def __init__(
+ self,
+ call: EvalSeries[DaskLazyFrame, dx.Series],
+ *,
+ depth: int,
+ function_name: str,
+ evaluate_output_names: EvalNames[DaskLazyFrame],
+ alias_output_names: AliasNames | None,
+ backend_version: tuple[int, ...],
+ version: Version,
+ scalar_kwargs: ScalarKwargs | None = None,
+ ) -> None:
+ self._call = call
+ self._depth = depth
+ self._function_name = function_name
+ self._evaluate_output_names = evaluate_output_names
+ self._alias_output_names = alias_output_names
+ self._backend_version = backend_version
+ self._version = version
+ self._scalar_kwargs = scalar_kwargs or {}
+ self._metadata: ExprMetadata | None = None
+
+ def __call__(self, df: DaskLazyFrame) -> Sequence[dx.Series]:
+ return self._call(df)
+
+ def __narwhals_expr__(self) -> None: ...
+
+ def __narwhals_namespace__(self) -> DaskNamespace: # pragma: no cover
+ # Unused, just for compatibility with PandasLikeExpr
+ from narwhals._dask.namespace import DaskNamespace
+
+ return DaskNamespace(backend_version=self._backend_version, version=self._version)
+
+ def broadcast(self, kind: Literal[ExprKind.AGGREGATION, ExprKind.LITERAL]) -> Self:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ # result.loc[0][0] is a workaround for dask~<=2024.10.0/dask_expr~<=1.1.16
+ # that raised a KeyError for result[0] during collection.
+ return [result.loc[0][0] for result in self(df)]
+
+ return self.__class__(
+ func,
+ depth=self._depth,
+ function_name=self._function_name,
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=self._alias_output_names,
+ backend_version=self._backend_version,
+ version=self._version,
+ scalar_kwargs=self._scalar_kwargs,
+ )
+
+ @classmethod
+ def from_column_names(
+ cls: type[Self],
+ evaluate_column_names: EvalNames[DaskLazyFrame],
+ /,
+ *,
+ context: _FullContext,
+ function_name: str = "",
+ ) -> Self:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ try:
+ return [
+ df._native_frame[column_name]
+ for column_name in evaluate_column_names(df)
+ ]
+ except KeyError as e:
+ if error := df._check_columns_exist(evaluate_column_names(df)):
+ raise error from e
+ raise
+
+ return cls(
+ func,
+ depth=0,
+ function_name=function_name,
+ evaluate_output_names=evaluate_column_names,
+ alias_output_names=None,
+ backend_version=context._backend_version,
+ version=context._version,
+ )
+
+ @classmethod
+ def from_column_indices(cls, *column_indices: int, context: _FullContext) -> Self:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ return [df.native.iloc[:, i] for i in column_indices]
+
+ return cls(
+ func,
+ depth=0,
+ function_name="nth",
+ evaluate_output_names=cls._eval_names_indices(column_indices),
+ alias_output_names=None,
+ backend_version=context._backend_version,
+ version=context._version,
+ )
+
+ def _with_callable(
+ self,
+ # First argument to `call` should be `dx.Series`
+ call: Callable[..., dx.Series],
+ /,
+ expr_name: str = "",
+ scalar_kwargs: ScalarKwargs | None = None,
+ **expressifiable_args: Self | Any,
+ ) -> Self:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ native_results: list[dx.Series] = []
+ native_series_list = self._call(df)
+ other_native_series = {
+ key: maybe_evaluate_expr(df, value)
+ for key, value in expressifiable_args.items()
+ }
+ for native_series in native_series_list:
+ result_native = call(native_series, **other_native_series)
+ native_results.append(result_native)
+ return native_results
+
+ return self.__class__(
+ func,
+ depth=self._depth + 1,
+ function_name=f"{self._function_name}->{expr_name}",
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=self._alias_output_names,
+ backend_version=self._backend_version,
+ version=self._version,
+ scalar_kwargs=scalar_kwargs,
+ )
+
+ def _with_alias_output_names(self, func: AliasNames | None, /) -> Self:
+ return type(self)(
+ call=self._call,
+ depth=self._depth,
+ function_name=self._function_name,
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=func,
+ backend_version=self._backend_version,
+ version=self._version,
+ scalar_kwargs=self._scalar_kwargs,
+ )
+
+ def __add__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__add__(other), "__add__", other=other
+ )
+
+ def __sub__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__sub__(other), "__sub__", other=other
+ )
+
+ def __rsub__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: other - expr, "__rsub__", other=other
+ ).alias("literal")
+
+ def __mul__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__mul__(other), "__mul__", other=other
+ )
+
+ def __truediv__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__truediv__(other), "__truediv__", other=other
+ )
+
+ def __rtruediv__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: other / expr, "__rtruediv__", other=other
+ ).alias("literal")
+
+ def __floordiv__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__floordiv__(other), "__floordiv__", other=other
+ )
+
+ def __rfloordiv__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: other // expr, "__rfloordiv__", other=other
+ ).alias("literal")
+
+ def __pow__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__pow__(other), "__pow__", other=other
+ )
+
+ def __rpow__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: other**expr, "__rpow__", other=other
+ ).alias("literal")
+
+ def __mod__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__mod__(other), "__mod__", other=other
+ )
+
+ def __rmod__(self, other: Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: other % expr, "__rmod__", other=other
+ ).alias("literal")
+
+ def __eq__(self, other: DaskExpr) -> Self: # type: ignore[override]
+ return self._with_callable(
+ lambda expr, other: expr.__eq__(other), "__eq__", other=other
+ )
+
+ def __ne__(self, other: DaskExpr) -> Self: # type: ignore[override]
+ return self._with_callable(
+ lambda expr, other: expr.__ne__(other), "__ne__", other=other
+ )
+
+ def __ge__(self, other: DaskExpr | Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__ge__(other), "__ge__", other=other
+ )
+
+ def __gt__(self, other: DaskExpr) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__gt__(other), "__gt__", other=other
+ )
+
+ def __le__(self, other: DaskExpr) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__le__(other), "__le__", other=other
+ )
+
+ def __lt__(self, other: DaskExpr) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__lt__(other), "__lt__", other=other
+ )
+
+ def __and__(self, other: DaskExpr | Any) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__and__(other), "__and__", other=other
+ )
+
+ def __or__(self, other: DaskExpr) -> Self:
+ return self._with_callable(
+ lambda expr, other: expr.__or__(other), "__or__", other=other
+ )
+
+ def __invert__(self) -> Self:
+ return self._with_callable(lambda expr: expr.__invert__(), "__invert__")
+
+ def mean(self) -> Self:
+ return self._with_callable(lambda expr: expr.mean().to_series(), "mean")
+
+ def median(self) -> Self:
+ from narwhals.exceptions import InvalidOperationError
+
+ def func(s: dx.Series) -> dx.Series:
+ dtype = native_to_narwhals_dtype(s.dtype, self._version, Implementation.DASK)
+ if not dtype.is_numeric():
+ msg = "`median` operation not supported for non-numeric input type."
+ raise InvalidOperationError(msg)
+ return s.median_approximate().to_series()
+
+ return self._with_callable(func, "median")
+
+ def min(self) -> Self:
+ return self._with_callable(lambda expr: expr.min().to_series(), "min")
+
+ def max(self) -> Self:
+ return self._with_callable(lambda expr: expr.max().to_series(), "max")
+
+ def std(self, ddof: int) -> Self:
+ return self._with_callable(
+ lambda expr: expr.std(ddof=ddof).to_series(),
+ "std",
+ scalar_kwargs={"ddof": ddof},
+ )
+
+ def var(self, ddof: int) -> Self:
+ return self._with_callable(
+ lambda expr: expr.var(ddof=ddof).to_series(),
+ "var",
+ scalar_kwargs={"ddof": ddof},
+ )
+
+ def skew(self) -> Self:
+ return self._with_callable(lambda expr: expr.skew().to_series(), "skew")
+
+ def shift(self, n: int) -> Self:
+ return self._with_callable(lambda expr: expr.shift(n), "shift")
+
+ def cum_sum(self, *, reverse: bool) -> Self:
+ if reverse: # pragma: no cover
+ # https://github.com/dask/dask/issues/11802
+ msg = "`cum_sum(reverse=True)` is not supported with Dask backend"
+ raise NotImplementedError(msg)
+
+ return self._with_callable(lambda expr: expr.cumsum(), "cum_sum")
+
+ def cum_count(self, *, reverse: bool) -> Self:
+ if reverse: # pragma: no cover
+ msg = "`cum_count(reverse=True)` is not supported with Dask backend"
+ raise NotImplementedError(msg)
+
+ return self._with_callable(
+ lambda expr: (~expr.isna()).astype(int).cumsum(), "cum_count"
+ )
+
+ def cum_min(self, *, reverse: bool) -> Self:
+ if reverse: # pragma: no cover
+ msg = "`cum_min(reverse=True)` is not supported with Dask backend"
+ raise NotImplementedError(msg)
+
+ return self._with_callable(lambda expr: expr.cummin(), "cum_min")
+
+ def cum_max(self, *, reverse: bool) -> Self:
+ if reverse: # pragma: no cover
+ msg = "`cum_max(reverse=True)` is not supported with Dask backend"
+ raise NotImplementedError(msg)
+
+ return self._with_callable(lambda expr: expr.cummax(), "cum_max")
+
+ def cum_prod(self, *, reverse: bool) -> Self:
+ if reverse: # pragma: no cover
+ msg = "`cum_prod(reverse=True)` is not supported with Dask backend"
+ raise NotImplementedError(msg)
+
+ return self._with_callable(lambda expr: expr.cumprod(), "cum_prod")
+
+ def rolling_sum(self, window_size: int, *, min_samples: int, center: bool) -> Self:
+ return self._with_callable(
+ lambda expr: expr.rolling(
+ window=window_size, min_periods=min_samples, center=center
+ ).sum(),
+ "rolling_sum",
+ )
+
+ def rolling_mean(self, window_size: int, *, min_samples: int, center: bool) -> Self:
+ return self._with_callable(
+ lambda expr: expr.rolling(
+ window=window_size, min_periods=min_samples, center=center
+ ).mean(),
+ "rolling_mean",
+ )
+
+ def rolling_var(
+ self, window_size: int, *, min_samples: int, center: bool, ddof: int
+ ) -> Self:
+ if ddof == 1:
+ return self._with_callable(
+ lambda expr: expr.rolling(
+ window=window_size, min_periods=min_samples, center=center
+ ).var(),
+ "rolling_var",
+ )
+ else:
+ msg = "Dask backend only supports `ddof=1` for `rolling_var`"
+ raise NotImplementedError(msg)
+
+ def rolling_std(
+ self, window_size: int, *, min_samples: int, center: bool, ddof: int
+ ) -> Self:
+ if ddof == 1:
+ return self._with_callable(
+ lambda expr: expr.rolling(
+ window=window_size, min_periods=min_samples, center=center
+ ).std(),
+ "rolling_std",
+ )
+ else:
+ msg = "Dask backend only supports `ddof=1` for `rolling_std`"
+ raise NotImplementedError(msg)
+
+ def sum(self) -> Self:
+ return self._with_callable(lambda expr: expr.sum().to_series(), "sum")
+
+ def count(self) -> Self:
+ return self._with_callable(lambda expr: expr.count().to_series(), "count")
+
+ def round(self, decimals: int) -> Self:
+ return self._with_callable(lambda expr: expr.round(decimals), "round")
+
+ def unique(self) -> Self:
+ return self._with_callable(lambda expr: expr.unique(), "unique")
+
+ def drop_nulls(self) -> Self:
+ return self._with_callable(lambda expr: expr.dropna(), "drop_nulls")
+
+ def abs(self) -> Self:
+ return self._with_callable(lambda expr: expr.abs(), "abs")
+
+ def all(self) -> Self:
+ return self._with_callable(
+ lambda expr: expr.all(
+ axis=None, skipna=True, split_every=False, out=None
+ ).to_series(),
+ "all",
+ )
+
+ def any(self) -> Self:
+ return self._with_callable(
+ lambda expr: expr.any(axis=0, skipna=True, split_every=False).to_series(),
+ "any",
+ )
+
+ def fill_null(
+ self,
+ value: Self | NonNestedLiteral,
+ strategy: FillNullStrategy | None,
+ limit: int | None,
+ ) -> Self:
+ def func(expr: dx.Series) -> dx.Series:
+ if value is not None:
+ res_ser = expr.fillna(value)
+ else:
+ res_ser = (
+ expr.ffill(limit=limit)
+ if strategy == "forward"
+ else expr.bfill(limit=limit)
+ )
+ return res_ser
+
+ return self._with_callable(func, "fillna")
+
+ def clip(
+ self,
+ lower_bound: Self | NumericLiteral | TemporalLiteral | None,
+ upper_bound: Self | NumericLiteral | TemporalLiteral | None,
+ ) -> Self:
+ return self._with_callable(
+ lambda expr, lower_bound, upper_bound: expr.clip(
+ lower=lower_bound, upper=upper_bound
+ ),
+ "clip",
+ lower_bound=lower_bound,
+ upper_bound=upper_bound,
+ )
+
+ def diff(self) -> Self:
+ return self._with_callable(lambda expr: expr.diff(), "diff")
+
+ def n_unique(self) -> Self:
+ return self._with_callable(
+ lambda expr: expr.nunique(dropna=False).to_series(), "n_unique"
+ )
+
+ def is_null(self) -> Self:
+ return self._with_callable(lambda expr: expr.isna(), "is_null")
+
+ def is_nan(self) -> Self:
+ def func(expr: dx.Series) -> dx.Series:
+ dtype = native_to_narwhals_dtype(
+ expr.dtype, self._version, self._implementation
+ )
+ if dtype.is_numeric():
+ return expr != expr # pyright: ignore[reportReturnType] # noqa: PLR0124
+ msg = f"`.is_nan` only supported for numeric dtypes and not {dtype}, did you mean `.is_null`?"
+ raise InvalidOperationError(msg)
+
+ return self._with_callable(func, "is_null")
+
+ def len(self) -> Self:
+ return self._with_callable(lambda expr: expr.size.to_series(), "len")
+
+ def quantile(
+ self, quantile: float, interpolation: RollingInterpolationMethod
+ ) -> Self:
+ if interpolation == "linear":
+
+ def func(expr: dx.Series, quantile: float) -> dx.Series:
+ if expr.npartitions > 1:
+ msg = "`Expr.quantile` is not supported for Dask backend with multiple partitions."
+ raise NotImplementedError(msg)
+ return expr.quantile(
+ q=quantile, method="dask"
+ ).to_series() # pragma: no cover
+
+ return self._with_callable(func, "quantile", quantile=quantile)
+ else:
+ msg = "`higher`, `lower`, `midpoint`, `nearest` - interpolation methods are not supported by Dask. Please use `linear` instead."
+ raise NotImplementedError(msg)
+
+ def is_first_distinct(self) -> Self:
+ def func(expr: dx.Series) -> dx.Series:
+ _name = expr.name
+ col_token = generate_temporary_column_name(n_bytes=8, columns=[_name])
+ frame = add_row_index(
+ expr.to_frame(), col_token, self._backend_version, self._implementation
+ )
+ first_distinct_index = frame.groupby(_name).agg({col_token: "min"})[col_token]
+ return frame[col_token].isin(first_distinct_index)
+
+ return self._with_callable(func, "is_first_distinct")
+
+ def is_last_distinct(self) -> Self:
+ def func(expr: dx.Series) -> dx.Series:
+ _name = expr.name
+ col_token = generate_temporary_column_name(n_bytes=8, columns=[_name])
+ frame = add_row_index(
+ expr.to_frame(), col_token, self._backend_version, self._implementation
+ )
+ last_distinct_index = frame.groupby(_name).agg({col_token: "max"})[col_token]
+ return frame[col_token].isin(last_distinct_index)
+
+ return self._with_callable(func, "is_last_distinct")
+
+ def is_unique(self) -> Self:
+ def func(expr: dx.Series) -> dx.Series:
+ _name = expr.name
+ return (
+ expr.to_frame()
+ .groupby(_name, dropna=False)
+ .transform("size", meta=(_name, int))
+ == 1
+ )
+
+ return self._with_callable(func, "is_unique")
+
+ def is_in(self, other: Any) -> Self:
+ return self._with_callable(lambda expr: expr.isin(other), "is_in")
+
+ def null_count(self) -> Self:
+ return self._with_callable(
+ lambda expr: expr.isna().sum().to_series(), "null_count"
+ )
+
+ def over(self, partition_by: Sequence[str], order_by: Sequence[str]) -> Self:
+ # pandas is a required dependency of dask so it's safe to import this
+ from narwhals._pandas_like.group_by import PandasLikeGroupBy
+
+ if not partition_by:
+ assert order_by # noqa: S101
+
+ # This is something like `nw.col('a').cum_sum().order_by(key)`
+ # which we can always easily support, as it doesn't require grouping.
+ def func(df: DaskLazyFrame) -> Sequence[dx.Series]:
+ return self(df.sort(*order_by, descending=False, nulls_last=False))
+ elif not self._is_elementary(): # pragma: no cover
+ msg = (
+ "Only elementary expressions are supported for `.over` in dask.\n\n"
+ "Please see: "
+ "https://narwhals-dev.github.io/narwhals/concepts/improve_group_by_operation/"
+ )
+ raise NotImplementedError(msg)
+ elif order_by:
+ # Produces wrong results: https://github.com/dask/dask/issues/11806.
+ msg = "`over` with `order_by` is not yet supported in Dask."
+ raise NotImplementedError(msg)
+ else:
+ function_name = PandasLikeGroupBy._leaf_name(self)
+ try:
+ dask_function_name = PandasLikeGroupBy._REMAP_AGGS[function_name]
+ except KeyError:
+ # window functions are unsupported: https://github.com/dask/dask/issues/11806
+ msg = (
+ f"Unsupported function: {function_name} in `over` context.\n\n"
+ f"Supported functions are {', '.join(PandasLikeGroupBy._REMAP_AGGS)}\n"
+ )
+ raise NotImplementedError(msg) from None
+
+ def func(df: DaskLazyFrame) -> Sequence[dx.Series]:
+ output_names, aliases = evaluate_output_names_and_aliases(self, df, [])
+
+ with warnings.catch_warnings():
+ # https://github.com/dask/dask/issues/11804
+ warnings.filterwarnings(
+ "ignore",
+ message=".*`meta` is not specified",
+ category=UserWarning,
+ )
+ grouped = df.native.groupby(partition_by)
+ if dask_function_name == "size":
+ if len(output_names) != 1: # pragma: no cover
+ msg = "Safety check failed, please report a bug."
+ raise AssertionError(msg)
+ res_native = grouped.transform(
+ dask_function_name, **self._scalar_kwargs
+ ).to_frame(output_names[0])
+ else:
+ res_native = grouped[list(output_names)].transform(
+ dask_function_name, **self._scalar_kwargs
+ )
+ result_frame = df._with_native(
+ res_native.rename(columns=dict(zip(output_names, aliases)))
+ ).native
+ return [result_frame[name] for name in aliases]
+
+ return self.__class__(
+ func,
+ depth=self._depth + 1,
+ function_name=self._function_name + "->over",
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=self._alias_output_names,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def cast(self, dtype: IntoDType) -> Self:
+ def func(expr: dx.Series) -> dx.Series:
+ native_dtype = narwhals_to_native_dtype(dtype, self._version)
+ return expr.astype(native_dtype)
+
+ return self._with_callable(func, "cast")
+
+ def is_finite(self) -> Self:
+ import dask.array as da
+
+ return self._with_callable(da.isfinite, "is_finite")
+
+ def log(self, base: float) -> Self:
+ import dask.array as da
+
+ def _log(expr: dx.Series) -> dx.Series:
+ return da.log(expr) / da.log(base)
+
+ return self._with_callable(_log, "log")
+
+ def exp(self) -> Self:
+ import dask.array as da
+
+ return self._with_callable(da.exp, "exp")
+
+ @property
+ def str(self) -> DaskExprStringNamespace:
+ return DaskExprStringNamespace(self)
+
+ @property
+ def dt(self) -> DaskExprDateTimeNamespace:
+ return DaskExprDateTimeNamespace(self)
+
+ list = not_implemented() # pyright: ignore[reportAssignmentType]
+ struct = not_implemented() # pyright: ignore[reportAssignmentType]
+ rank = not_implemented() # pyright: ignore[reportAssignmentType]
+ _alias_native = not_implemented()
+ window_function = not_implemented() # pyright: ignore[reportAssignmentType]
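
Usage sketch (assumed public API, not part of the diff): aggregations such as `mean` and `std` reduce each column to a single-element series via the `.to_series()` pattern above, and `broadcast` handles the scalar-like results.

import dask.dataframe as dd
import narwhals as nw
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0]}), npartitions=2)
lf = nw.from_native(ddf)

# Each expression is evaluated lazily by DaskExpr.__call__ against the frame.
out = lf.select(
    nw.col("a").mean().alias("a_mean"),
    nw.col("a").std(ddof=1).alias("a_std"),
).to_native()
print(out.compute())
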
diff --git a/venv/lib/python3.8/site-packages/narwhals/_dask/expr_dt.py b/venv/lib/python3.8/site-packages/narwhals/_dask/expr_dt.py
new file mode 100644
index 0000000..14481a3
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_dask/expr_dt.py
@@ -0,0 +1,162 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from narwhals._duration import parse_interval_string
+from narwhals._pandas_like.utils import (
+ UNIT_DICT,
+ calculate_timestamp_date,
+ calculate_timestamp_datetime,
+ native_to_narwhals_dtype,
+)
+from narwhals._utils import Implementation
+
+if TYPE_CHECKING:
+ import dask.dataframe.dask_expr as dx
+
+ from narwhals._dask.expr import DaskExpr
+ from narwhals.typing import TimeUnit
+
+
+class DaskExprDateTimeNamespace:
+ def __init__(self, expr: DaskExpr) -> None:
+ self._compliant_expr = expr
+
+ def date(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(lambda expr: expr.dt.date, "date")
+
+ def year(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(lambda expr: expr.dt.year, "year")
+
+ def month(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(lambda expr: expr.dt.month, "month")
+
+ def day(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(lambda expr: expr.dt.day, "day")
+
+ def hour(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(lambda expr: expr.dt.hour, "hour")
+
+ def minute(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(lambda expr: expr.dt.minute, "minute")
+
+ def second(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(lambda expr: expr.dt.second, "second")
+
+ def millisecond(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.microsecond // 1000, "millisecond"
+ )
+
+ def microsecond(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.microsecond, "microsecond"
+ )
+
+ def nanosecond(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.microsecond * 1000 + expr.dt.nanosecond, "nanosecond"
+ )
+
+ def ordinal_day(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.dayofyear, "ordinal_day"
+ )
+
+ def weekday(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.weekday + 1, # Dask uses 0-6; narwhals expects 1-7
+ "weekday",
+ )
+
+ def to_string(self, format: str) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, format: expr.dt.strftime(format.replace("%.f", ".%f")),
+ "strftime",
+ format=format,
+ )
+
+ def replace_time_zone(self, time_zone: str | None) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, time_zone: expr.dt.tz_localize(None).dt.tz_localize(time_zone)
+ if time_zone is not None
+ else expr.dt.tz_localize(None),
+ "tz_localize",
+ time_zone=time_zone,
+ )
+
+ def convert_time_zone(self, time_zone: str) -> DaskExpr:
+ def func(s: dx.Series, time_zone: str) -> dx.Series:
+ dtype = native_to_narwhals_dtype(
+ s.dtype, self._compliant_expr._version, Implementation.DASK
+ )
+ if dtype.time_zone is None: # type: ignore[attr-defined]
+ return s.dt.tz_localize("UTC").dt.tz_convert(time_zone) # pyright: ignore[reportAttributeAccessIssue]
+ else:
+ return s.dt.tz_convert(time_zone) # pyright: ignore[reportAttributeAccessIssue]
+
+ return self._compliant_expr._with_callable(
+ func, "tz_convert", time_zone=time_zone
+ )
+
+ def timestamp(self, time_unit: TimeUnit) -> DaskExpr:
+ def func(s: dx.Series, time_unit: TimeUnit) -> dx.Series:
+ dtype = native_to_narwhals_dtype(
+ s.dtype, self._compliant_expr._version, Implementation.DASK
+ )
+ is_pyarrow_dtype = "pyarrow" in str(dtype)
+ mask_na = s.isna()
+ dtypes = self._compliant_expr._version.dtypes
+ if dtype == dtypes.Date:
+ # Date is only supported in pandas dtypes if pyarrow-backed
+ s_cast = s.astype("Int32[pyarrow]")
+ result = calculate_timestamp_date(s_cast, time_unit)
+ elif isinstance(dtype, dtypes.Datetime):
+ original_time_unit = dtype.time_unit
+ s_cast = (
+ s.astype("Int64[pyarrow]") if is_pyarrow_dtype else s.astype("int64")
+ )
+ result = calculate_timestamp_datetime(
+ s_cast, original_time_unit, time_unit
+ )
+ else:
+ msg = "Input should be either of Date or Datetime type"
+ raise TypeError(msg)
+ return result.where(~mask_na) # pyright: ignore[reportReturnType]
+
+ return self._compliant_expr._with_callable(func, "datetime", time_unit=time_unit)
+
+ def total_minutes(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.total_seconds() // 60, "total_minutes"
+ )
+
+ def total_seconds(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.total_seconds() // 1, "total_seconds"
+ )
+
+ def total_milliseconds(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.total_seconds() * 1000 // 1, "total_milliseconds"
+ )
+
+ def total_microseconds(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.total_seconds() * 1_000_000 // 1, "total_microseconds"
+ )
+
+ def total_nanoseconds(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.total_seconds() * 1_000_000_000 // 1, "total_nanoseconds"
+ )
+
+ def truncate(self, every: str) -> DaskExpr:
+ multiple, unit = parse_interval_string(every)
+ if unit in {"mo", "q", "y"}:
+ msg = f"Truncating to {unit} is not supported yet for dask."
+ raise NotImplementedError(msg)
+ freq = f"{multiple}{UNIT_DICT.get(unit, unit)}"
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.dt.floor(freq), "truncate"
+ )
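
Usage sketch for the datetime namespace (assumed public API, not part of the diff):

import dask.dataframe as dd
import narwhals as nw
import pandas as pd

ddf = dd.from_pandas(
    pd.DataFrame({"ts": pd.to_datetime(["2025-01-15 10:30:00", "2025-07-31 17:27:24"])}),
    npartitions=1,
)
lf = nw.from_native(ddf)

# dt.year maps to the native `expr.dt.year`; dt.weekday adds 1 to Dask's 0-6 numbering.
out = lf.select(
    nw.col("ts").dt.year().alias("year"),
    nw.col("ts").dt.weekday().alias("weekday"),
).to_native()
print(out.compute())
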
diff --git a/venv/lib/python3.8/site-packages/narwhals/_dask/expr_str.py b/venv/lib/python3.8/site-packages/narwhals/_dask/expr_str.py
new file mode 100644
index 0000000..b770b53
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_dask/expr_str.py
@@ -0,0 +1,98 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import dask.dataframe as dd
+
+if TYPE_CHECKING:
+ from narwhals._dask.expr import DaskExpr
+
+
+class DaskExprStringNamespace:
+ def __init__(self, expr: DaskExpr) -> None:
+ self._compliant_expr = expr
+
+ def len_chars(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(lambda expr: expr.str.len(), "len")
+
+ def replace(self, pattern: str, value: str, *, literal: bool, n: int) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, pattern, value, literal, n: expr.str.replace(
+ pattern, value, regex=not literal, n=n
+ ),
+ "replace",
+ pattern=pattern,
+ value=value,
+ literal=literal,
+ n=n,
+ )
+
+ def replace_all(self, pattern: str, value: str, *, literal: bool) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, pattern, value, literal: expr.str.replace(
+ pattern, value, n=-1, regex=not literal
+ ),
+ "replace",
+ pattern=pattern,
+ value=value,
+ literal=literal,
+ )
+
+ def strip_chars(self, characters: str | None) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, characters: expr.str.strip(characters),
+ "strip",
+ characters=characters,
+ )
+
+ def starts_with(self, prefix: str) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, prefix: expr.str.startswith(prefix), "starts_with", prefix=prefix
+ )
+
+ def ends_with(self, suffix: str) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, suffix: expr.str.endswith(suffix), "ends_with", suffix=suffix
+ )
+
+ def contains(self, pattern: str, *, literal: bool) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, pattern, literal: expr.str.contains(
+ pat=pattern, regex=not literal
+ ),
+ "contains",
+ pattern=pattern,
+ literal=literal,
+ )
+
+ def slice(self, offset: int, length: int | None) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, offset, length: expr.str.slice(
+ start=offset, stop=offset + length if length else None
+ ),
+ "slice",
+ offset=offset,
+ length=length,
+ )
+
+ def split(self, by: str) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, by: expr.str.split(pat=by), "split", by=by
+ )
+
+ def to_datetime(self, format: str | None) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr, format: dd.to_datetime(expr, format=format),
+ "to_datetime",
+ format=format,
+ )
+
+ def to_uppercase(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.str.upper(), "to_uppercase"
+ )
+
+ def to_lowercase(self) -> DaskExpr:
+ return self._compliant_expr._with_callable(
+ lambda expr: expr.str.lower(), "to_lowercase"
+ )
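
Usage sketch for the string namespace (assumed public API, not part of the diff):

import dask.dataframe as dd
import narwhals as nw
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"s": ["foo_bar", "baz"]}), npartitions=1)
lf = nw.from_native(ddf)

# str.contains maps to the native `expr.str.contains(pat=..., regex=not literal)` above.
out = lf.select(
    nw.col("s").str.to_uppercase().alias("upper"),
    nw.col("s").str.contains("bar").alias("has_bar"),
).to_native()
print(out.compute())
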
diff --git a/venv/lib/python3.8/site-packages/narwhals/_dask/group_by.py b/venv/lib/python3.8/site-packages/narwhals/_dask/group_by.py
new file mode 100644
index 0000000..d71c3fa
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_dask/group_by.py
@@ -0,0 +1,122 @@
+from __future__ import annotations
+
+from functools import partial
+from typing import TYPE_CHECKING, Any, Callable, ClassVar, Mapping, Sequence
+
+import dask.dataframe as dd
+
+from narwhals._compliant import DepthTrackingGroupBy
+from narwhals._expression_parsing import evaluate_output_names_and_aliases
+
+if TYPE_CHECKING:
+ import pandas as pd
+ from dask.dataframe.api import GroupBy as _DaskGroupBy
+ from pandas.core.groupby import SeriesGroupBy as _PandasSeriesGroupBy
+ from typing_extensions import TypeAlias
+
+ from narwhals._compliant.group_by import NarwhalsAggregation
+ from narwhals._dask.dataframe import DaskLazyFrame
+ from narwhals._dask.expr import DaskExpr
+
+ PandasSeriesGroupBy: TypeAlias = _PandasSeriesGroupBy[Any, Any]
+ _AggFn: TypeAlias = Callable[..., Any]
+
+else:
+ try:
+ import dask.dataframe.dask_expr as dx
+ except ModuleNotFoundError: # pragma: no cover
+ import dask_expr as dx
+ _DaskGroupBy = dx._groupby.GroupBy
+
+Aggregation: TypeAlias = "str | _AggFn"
+"""The name of an aggregation function, or the function itself."""
+
+
+def n_unique() -> dd.Aggregation:
+ def chunk(s: PandasSeriesGroupBy) -> pd.Series[Any]:
+ return s.nunique(dropna=False)
+
+ def agg(s0: PandasSeriesGroupBy) -> pd.Series[Any]:
+ return s0.sum()
+
+ return dd.Aggregation(name="nunique", chunk=chunk, agg=agg)
+
+
+def var(ddof: int) -> _AggFn:
+ return partial(_DaskGroupBy.var, ddof=ddof)
+
+
+def std(ddof: int) -> _AggFn:
+ return partial(_DaskGroupBy.std, ddof=ddof)
+
+
+class DaskLazyGroupBy(DepthTrackingGroupBy["DaskLazyFrame", "DaskExpr", Aggregation]):
+ _REMAP_AGGS: ClassVar[Mapping[NarwhalsAggregation, Aggregation]] = {
+ "sum": "sum",
+ "mean": "mean",
+ "median": "median",
+ "max": "max",
+ "min": "min",
+ "std": std,
+ "var": var,
+ "len": "size",
+ "n_unique": n_unique,
+ "count": "count",
+ }
+
+ def __init__(
+ self,
+ df: DaskLazyFrame,
+ keys: Sequence[DaskExpr] | Sequence[str],
+ /,
+ *,
+ drop_null_keys: bool,
+ ) -> None:
+ self._compliant_frame, self._keys, self._output_key_names = self._parse_keys(
+ df, keys=keys
+ )
+ self._grouped = self.compliant.native.groupby(
+ self._keys, dropna=drop_null_keys, observed=True
+ )
+
+ def agg(self, *exprs: DaskExpr) -> DaskLazyFrame:
+ from narwhals._dask.dataframe import DaskLazyFrame
+
+ if not exprs:
+ # No aggregation provided
+ return (
+ self.compliant.simple_select(*self._keys)
+ .unique(self._keys, keep="any")
+ .rename(dict(zip(self._keys, self._output_key_names)))
+ )
+
+ self._ensure_all_simple(exprs)
+ # This should be the fastpath, but cuDF is too far behind to use it.
+ # - https://github.com/rapidsai/cudf/issues/15118
+ # - https://github.com/rapidsai/cudf/issues/15084
+ simple_aggregations: dict[str, tuple[str, Aggregation]] = {}
+ exclude = (*self._keys, *self._output_key_names)
+ for expr in exprs:
+ output_names, aliases = evaluate_output_names_and_aliases(
+ expr, self.compliant, exclude
+ )
+ if expr._depth == 0:
+ # e.g. `agg(nw.len())`
+ column = self._keys[0]
+ agg_fn = self._remap_expr_name(expr._function_name)
+ simple_aggregations.update(dict.fromkeys(aliases, (column, agg_fn)))
+ continue
+
+ # e.g. `agg(nw.mean('a'))`
+ agg_fn = self._remap_expr_name(self._leaf_name(expr))
+ # evaluate callable aggregations (n_unique, std, var) lazily so we don't depend on dask globally
+ agg_fn = agg_fn(**expr._scalar_kwargs) if callable(agg_fn) else agg_fn
+ simple_aggregations.update(
+ (alias, (output_name, agg_fn))
+ for alias, output_name in zip(aliases, output_names)
+ )
+ return DaskLazyFrame(
+ self._grouped.agg(**simple_aggregations).reset_index(),
+ backend_version=self.compliant._backend_version,
+ version=self.compliant._version,
+ ).rename(dict(zip(self._keys, self._output_key_names)))
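
Usage sketch for the group-by path (assumed public API, not part of the diff): `mean` stays a plain string aggregation, while `n_unique` goes through the custom `dd.Aggregation` defined above.

import dask.dataframe as dd
import narwhals as nw
import pandas as pd

ddf = dd.from_pandas(
    pd.DataFrame({"g": ["x", "x", "y"], "v": [1.0, 2.0, 2.0]}), npartitions=2
)
lf = nw.from_native(ddf)

out = (
    lf.group_by("g")
    .agg(
        nw.col("v").mean().alias("v_mean"),         # remapped to the string "mean"
        nw.col("v").n_unique().alias("v_nunique"),  # remapped to the n_unique() Aggregation
    )
    .to_native()
)
print(out.compute())
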
diff --git a/venv/lib/python3.8/site-packages/narwhals/_dask/namespace.py b/venv/lib/python3.8/site-packages/narwhals/_dask/namespace.py
new file mode 100644
index 0000000..3e0506d
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_dask/namespace.py
@@ -0,0 +1,320 @@
+from __future__ import annotations
+
+import operator
+from functools import reduce
+from typing import TYPE_CHECKING, Iterable, Sequence, cast
+
+import dask.dataframe as dd
+import pandas as pd
+
+from narwhals._compliant import CompliantThen, CompliantWhen, LazyNamespace
+from narwhals._compliant.namespace import DepthTrackingNamespace
+from narwhals._dask.dataframe import DaskLazyFrame
+from narwhals._dask.expr import DaskExpr
+from narwhals._dask.selectors import DaskSelectorNamespace
+from narwhals._dask.utils import (
+ align_series_full_broadcast,
+ narwhals_to_native_dtype,
+ validate_comparand,
+)
+from narwhals._expression_parsing import (
+ ExprKind,
+ combine_alias_output_names,
+ combine_evaluate_output_names,
+)
+from narwhals._utils import Implementation
+
+if TYPE_CHECKING:
+ import dask.dataframe.dask_expr as dx
+
+ from narwhals._utils import Version
+ from narwhals.typing import ConcatMethod, IntoDType, NonNestedLiteral
+
+
+class DaskNamespace(
+ LazyNamespace[DaskLazyFrame, DaskExpr, dd.DataFrame],
+ DepthTrackingNamespace[DaskLazyFrame, DaskExpr],
+):
+ _implementation: Implementation = Implementation.DASK
+
+ @property
+ def selectors(self) -> DaskSelectorNamespace:
+ return DaskSelectorNamespace.from_namespace(self)
+
+ @property
+ def _expr(self) -> type[DaskExpr]:
+ return DaskExpr
+
+ @property
+ def _lazyframe(self) -> type[DaskLazyFrame]:
+ return DaskLazyFrame
+
+ def __init__(self, *, backend_version: tuple[int, ...], version: Version) -> None:
+ self._backend_version = backend_version
+ self._version = version
+
+ def lit(self, value: NonNestedLiteral, dtype: IntoDType | None) -> DaskExpr:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ if dtype is not None:
+ native_dtype = narwhals_to_native_dtype(dtype, self._version)
+ native_pd_series = pd.Series([value], dtype=native_dtype, name="literal")
+ else:
+ native_pd_series = pd.Series([value], name="literal")
+ npartitions = df._native_frame.npartitions
+ dask_series = dd.from_pandas(native_pd_series, npartitions=npartitions)
+ return [dask_series[0].to_series()]
+
+ return self._expr(
+ func,
+ depth=0,
+ function_name="lit",
+ evaluate_output_names=lambda _df: ["literal"],
+ alias_output_names=None,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def len(self) -> DaskExpr:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ # We don't allow dataframes with 0 columns, so `[0]` is safe.
+ return [df._native_frame[df.columns[0]].size.to_series()]
+
+ return self._expr(
+ func,
+ depth=0,
+ function_name="len",
+ evaluate_output_names=lambda _df: ["len"],
+ alias_output_names=None,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def all_horizontal(self, *exprs: DaskExpr) -> DaskExpr:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ series = align_series_full_broadcast(
+ df, *(s for _expr in exprs for s in _expr(df))
+ )
+ return [reduce(operator.and_, series)]
+
+ return self._expr(
+ call=func,
+ depth=max(x._depth for x in exprs) + 1,
+ function_name="all_horizontal",
+ evaluate_output_names=combine_evaluate_output_names(*exprs),
+ alias_output_names=combine_alias_output_names(*exprs),
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def any_horizontal(self, *exprs: DaskExpr) -> DaskExpr:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ series = align_series_full_broadcast(
+ df, *(s for _expr in exprs for s in _expr(df))
+ )
+ return [reduce(operator.or_, series)]
+
+ return self._expr(
+ call=func,
+ depth=max(x._depth for x in exprs) + 1,
+ function_name="any_horizontal",
+ evaluate_output_names=combine_evaluate_output_names(*exprs),
+ alias_output_names=combine_alias_output_names(*exprs),
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def sum_horizontal(self, *exprs: DaskExpr) -> DaskExpr:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ series = align_series_full_broadcast(
+ df, *(s for _expr in exprs for s in _expr(df))
+ )
+ return [dd.concat(series, axis=1).sum(axis=1)]
+
+ return self._expr(
+ call=func,
+ depth=max(x._depth for x in exprs) + 1,
+ function_name="sum_horizontal",
+ evaluate_output_names=combine_evaluate_output_names(*exprs),
+ alias_output_names=combine_alias_output_names(*exprs),
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def concat(
+ self, items: Iterable[DaskLazyFrame], *, how: ConcatMethod
+ ) -> DaskLazyFrame:
+ if not items:
+ msg = "No items to concatenate" # pragma: no cover
+ raise AssertionError(msg)
+ dfs = [i._native_frame for i in items]
+ cols_0 = dfs[0].columns
+ if how == "vertical":
+ for i, df in enumerate(dfs[1:], start=1):
+ cols_current = df.columns
+ if not (
+ (len(cols_current) == len(cols_0)) and (cols_current == cols_0).all()
+ ):
+ msg = (
+ "unable to vstack, column names don't match:\n"
+ f" - dataframe 0: {cols_0.to_list()}\n"
+ f" - dataframe {i}: {cols_current.to_list()}\n"
+ )
+ raise TypeError(msg)
+ return DaskLazyFrame(
+ dd.concat(dfs, axis=0, join="inner"),
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+ if how == "diagonal":
+ return DaskLazyFrame(
+ dd.concat(dfs, axis=0, join="outer"),
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ raise NotImplementedError
+
+ def mean_horizontal(self, *exprs: DaskExpr) -> DaskExpr:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ expr_results = [s for _expr in exprs for s in _expr(df)]
+ series = align_series_full_broadcast(df, *(s.fillna(0) for s in expr_results))
+ non_na = align_series_full_broadcast(
+ df, *(1 - s.isna() for s in expr_results)
+ )
+ num = reduce(lambda x, y: x + y, series) # pyright: ignore[reportOperatorIssue]
+ den = reduce(lambda x, y: x + y, non_na) # pyright: ignore[reportOperatorIssue]
+ return [cast("dx.Series", num / den)] # pyright: ignore[reportOperatorIssue]
+
+ return self._expr(
+ call=func,
+ depth=max(x._depth for x in exprs) + 1,
+ function_name="mean_horizontal",
+ evaluate_output_names=combine_evaluate_output_names(*exprs),
+ alias_output_names=combine_alias_output_names(*exprs),
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def min_horizontal(self, *exprs: DaskExpr) -> DaskExpr:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ series = align_series_full_broadcast(
+ df, *(s for _expr in exprs for s in _expr(df))
+ )
+
+ return [dd.concat(series, axis=1).min(axis=1)]
+
+ return self._expr(
+ call=func,
+ depth=max(x._depth for x in exprs) + 1,
+ function_name="min_horizontal",
+ evaluate_output_names=combine_evaluate_output_names(*exprs),
+ alias_output_names=combine_alias_output_names(*exprs),
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def max_horizontal(self, *exprs: DaskExpr) -> DaskExpr:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ series = align_series_full_broadcast(
+ df, *(s for _expr in exprs for s in _expr(df))
+ )
+
+ return [dd.concat(series, axis=1).max(axis=1)]
+
+ return self._expr(
+ call=func,
+ depth=max(x._depth for x in exprs) + 1,
+ function_name="max_horizontal",
+ evaluate_output_names=combine_evaluate_output_names(*exprs),
+ alias_output_names=combine_alias_output_names(*exprs),
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def when(self, predicate: DaskExpr) -> DaskWhen:
+ return DaskWhen.from_expr(predicate, context=self)
+
+ def concat_str(
+ self, *exprs: DaskExpr, separator: str, ignore_nulls: bool
+ ) -> DaskExpr:
+ def func(df: DaskLazyFrame) -> list[dx.Series]:
+ expr_results = [s for _expr in exprs for s in _expr(df)]
+ series = (
+ s.astype(str) for s in align_series_full_broadcast(df, *expr_results)
+ )
+ null_mask = [s.isna() for s in align_series_full_broadcast(df, *expr_results)]
+
+ if not ignore_nulls:
+ null_mask_result = reduce(operator.or_, null_mask)
+ result = reduce(lambda x, y: x + separator + y, series).where(
+ ~null_mask_result, None
+ )
+ else:
+ init_value, *values = [
+ s.where(~nm, "") for s, nm in zip(series, null_mask)
+ ]
+
+ separators = (
+ nm.map({True: "", False: separator}, meta=str)
+ for nm in null_mask[:-1]
+ )
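+                # Illustrative trace with separator="-": for row ("a", None, "c"),
+                # init_value="a", values=["", "c"], separators=["-", ""]
+                # (the separator following a null is dropped), so the result is
+                # "a" + "-" + "" + "" + "c" == "a-c".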
+ result = reduce(
+ operator.add, (s + v for s, v in zip(separators, values)), init_value
+ )
+
+ return [result]
+
+ return self._expr(
+ call=func,
+ depth=max(x._depth for x in exprs) + 1,
+ function_name="concat_str",
+ evaluate_output_names=getattr(
+ exprs[0], "_evaluate_output_names", lambda _df: ["literal"]
+ ),
+ alias_output_names=getattr(exprs[0], "_alias_output_names", None),
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+
+class DaskWhen(CompliantWhen[DaskLazyFrame, "dx.Series", DaskExpr]):
+ @property
+ def _then(self) -> type[DaskThen]:
+ return DaskThen
+
+ def __call__(self, df: DaskLazyFrame) -> Sequence[dx.Series]:
+ then_value = (
+ self._then_value(df)[0]
+ if isinstance(self._then_value, DaskExpr)
+ else self._then_value
+ )
+ otherwise_value = (
+ self._otherwise_value(df)[0]
+ if isinstance(self._otherwise_value, DaskExpr)
+ else self._otherwise_value
+ )
+
+ condition = self._condition(df)[0]
+        # re-evaluate the DataFrame if the condition aggregates, so that
+        # then/otherwise are evaluated against the aggregated frame
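+        # e.g. `when(col("a").sum() > 10)` produces a length-1 condition,
+        # which must be broadcast so then/otherwise align with that frame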
+ assert self._condition._metadata is not None # noqa: S101
+ if self._condition._metadata.is_scalar_like:
+ new_df = df._with_native(condition.to_frame())
+ condition = self._condition.broadcast(ExprKind.AGGREGATION)(df)[0]
+ df = new_df
+
+ if self._otherwise_value is None:
+ (condition, then_series) = align_series_full_broadcast(
+ df, condition, then_value
+ )
+ validate_comparand(condition, then_series)
+ return [then_series.where(condition)] # pyright: ignore[reportArgumentType]
+ (condition, then_series, otherwise_series) = align_series_full_broadcast(
+ df, condition, then_value, otherwise_value
+ )
+ validate_comparand(condition, then_series)
+ validate_comparand(condition, otherwise_series)
+ return [then_series.where(condition, otherwise_series)] # pyright: ignore[reportArgumentType]
+
+
+class DaskThen(CompliantThen[DaskLazyFrame, "dx.Series", DaskExpr], DaskExpr): ...
diff --git a/venv/lib/python3.8/site-packages/narwhals/_dask/selectors.py b/venv/lib/python3.8/site-packages/narwhals/_dask/selectors.py
new file mode 100644
index 0000000..218b1e3
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_dask/selectors.py
@@ -0,0 +1,30 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from narwhals._compliant import CompliantSelector, LazySelectorNamespace
+from narwhals._dask.expr import DaskExpr
+
+if TYPE_CHECKING:
+ import dask.dataframe.dask_expr as dx # noqa: F401
+
+ from narwhals._dask.dataframe import DaskLazyFrame # noqa: F401
+
+
+class DaskSelectorNamespace(LazySelectorNamespace["DaskLazyFrame", "dx.Series"]):
+ @property
+ def _selector(self) -> type[DaskSelector]:
+ return DaskSelector
+
+
+class DaskSelector(CompliantSelector["DaskLazyFrame", "dx.Series"], DaskExpr): # type: ignore[misc]
+ def _to_expr(self) -> DaskExpr:
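+        """Drop the selector wrapper, re-exposing this call as a plain DaskExpr."""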
+ return DaskExpr(
+ self._call,
+ depth=self._depth,
+ function_name=self._function_name,
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=self._alias_output_names,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
diff --git a/venv/lib/python3.8/site-packages/narwhals/_dask/utils.py b/venv/lib/python3.8/site-packages/narwhals/_dask/utils.py
new file mode 100644
index 0000000..fa2a2b0
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_dask/utils.py
@@ -0,0 +1,160 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Sequence
+
+from narwhals._pandas_like.utils import select_columns_by_name
+from narwhals._utils import (
+ Implementation,
+ Version,
+ isinstance_or_issubclass,
+ parse_version,
+)
+from narwhals.dependencies import get_pandas, get_pyarrow
+
+if TYPE_CHECKING:
+ import dask.dataframe as dd
+ import dask.dataframe.dask_expr as dx
+
+ from narwhals._dask.dataframe import DaskLazyFrame
+ from narwhals._dask.expr import DaskExpr
+ from narwhals.typing import IntoDType
+else:
+ try:
+ import dask.dataframe.dask_expr as dx
+ except ModuleNotFoundError: # pragma: no cover
+ import dask_expr as dx
+
+
+def maybe_evaluate_expr(df: DaskLazyFrame, obj: DaskExpr | object) -> dx.Series | object:
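+    """Return the single native series produced by a `DaskExpr` (the expression
+    is expected to yield exactly one), or pass any non-expression object
+    (e.g. a Python scalar) through unchanged."""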
+ from narwhals._dask.expr import DaskExpr
+
+ if isinstance(obj, DaskExpr):
+ results = obj._call(df)
+ assert len(results) == 1 # debug assertion # noqa: S101
+ return results[0]
+ return obj
+
+
+def evaluate_exprs(df: DaskLazyFrame, /, *exprs: DaskExpr) -> list[tuple[str, dx.Series]]:
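+    """Evaluate each expression against `df`, returning a flat list of
+    (output_name, native_series) pairs, one per produced column."""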
+ native_results: list[tuple[str, dx.Series]] = []
+ for expr in exprs:
+ native_series_list = expr(df)
+ aliases = expr._evaluate_aliases(df)
+ if len(aliases) != len(native_series_list): # pragma: no cover
+ msg = f"Internal error: got aliases {aliases}, but only got {len(native_series_list)} results"
+ raise AssertionError(msg)
+ native_results.extend(zip(aliases, native_series_list))
+ return native_results
+
+
+def align_series_full_broadcast(
+ df: DaskLazyFrame, *series: dx.Series | object
+) -> Sequence[dx.Series]:
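+    """Return the inputs as dask Series aligned with `df`, broadcasting any
+    scalar to a full-length column via a temporary `assign`."""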
+ return [
+ s if isinstance(s, dx.Series) else df._native_frame.assign(_tmp=s)["_tmp"]
+ for s in series
+ ] # pyright: ignore[reportReturnType]
+
+
+def add_row_index(
+ frame: dd.DataFrame,
+ name: str,
+ backend_version: tuple[int, ...],
+ implementation: Implementation,
+) -> dd.DataFrame:
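+    """Prepend a 0-based row-index column named `name`, computed as a
+    cumulative sum of ones minus one; `method="blelloch"` requests a
+    parallel prefix-sum scan across partitions."""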
+ original_cols = frame.columns
+ frame = frame.assign(**{name: 1})
+ return select_columns_by_name(
+ frame.assign(**{name: frame[name].cumsum(method="blelloch") - 1}),
+ [name, *original_cols],
+ backend_version,
+ implementation,
+ )
+
+
+def validate_comparand(lhs: dx.Series, rhs: dx.Series) -> None:
+ if not dx.expr.are_co_aligned(lhs._expr, rhs._expr): # pragma: no cover
+        # are_co_aligned is a function that cheaply checks whether two Dask
+        # expressions have the same index and therefore need no index alignment.
+        # If a Dask DataFrame is only ever operated on via expressions, this
+        # should always hold: expression outputs (by definition) all come from the
+        # same input dataframe, and Dask Series has no operations that
+        # change the index. Nonetheless, we keep this safety check.
+
+ # However, we still need to carefully vet which methods we support for Dask, to
+ # avoid issues where `are_co_aligned` doesn't do what we want it to do:
+ # https://github.com/dask/dask-expr/issues/1112.
+ msg = "Objects are not co-aligned, so this operation is not supported for Dask backend"
+ raise RuntimeError(msg)
+
+
+def narwhals_to_native_dtype(dtype: IntoDType, version: Version) -> Any: # noqa: C901, PLR0912
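+    """Map a narwhals dtype to a native pandas/dask dtype (usually a string,
+    e.g. `Int64` -> "int64", `Datetime` -> "datetime64[us]")."""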
+ dtypes = version.dtypes
+ if isinstance_or_issubclass(dtype, dtypes.Float64):
+ return "float64"
+ if isinstance_or_issubclass(dtype, dtypes.Float32):
+ return "float32"
+ if isinstance_or_issubclass(dtype, dtypes.Int64):
+ return "int64"
+ if isinstance_or_issubclass(dtype, dtypes.Int32):
+ return "int32"
+ if isinstance_or_issubclass(dtype, dtypes.Int16):
+ return "int16"
+ if isinstance_or_issubclass(dtype, dtypes.Int8):
+ return "int8"
+ if isinstance_or_issubclass(dtype, dtypes.UInt64):
+ return "uint64"
+ if isinstance_or_issubclass(dtype, dtypes.UInt32):
+ return "uint32"
+ if isinstance_or_issubclass(dtype, dtypes.UInt16):
+ return "uint16"
+ if isinstance_or_issubclass(dtype, dtypes.UInt8):
+ return "uint8"
+ if isinstance_or_issubclass(dtype, dtypes.String):
+ if (pd := get_pandas()) is not None and parse_version(pd) >= (2, 0, 0):
+ if get_pyarrow() is not None:
+ return "string[pyarrow]"
+ return "string[python]" # pragma: no cover
+ return "object" # pragma: no cover
+ if isinstance_or_issubclass(dtype, dtypes.Boolean):
+ return "bool"
+ if isinstance_or_issubclass(dtype, dtypes.Enum):
+ if version is Version.V1:
+ msg = "Converting to Enum is not supported in narwhals.stable.v1"
+ raise NotImplementedError(msg)
+ if isinstance(dtype, dtypes.Enum):
+ import pandas as pd
+
+ # NOTE: `pandas-stubs.core.dtypes.dtypes.CategoricalDtype.categories` is too narrow
+ # Should be one of the `ListLike*` types
+ # https://github.com/pandas-dev/pandas-stubs/blob/8434bde95460b996323cc8c0fea7b0a8bb00ea26/pandas-stubs/_typing.pyi#L497-L505
+ return pd.CategoricalDtype(dtype.categories, ordered=True) # pyright: ignore[reportArgumentType]
+        msg = "Cannot cast / initialize Enum without categories present"
+ raise ValueError(msg)
+
+ if isinstance_or_issubclass(dtype, dtypes.Categorical):
+ return "category"
+ if isinstance_or_issubclass(dtype, dtypes.Datetime):
+ return "datetime64[us]"
+ if isinstance_or_issubclass(dtype, dtypes.Date):
+ return "date32[day][pyarrow]"
+ if isinstance_or_issubclass(dtype, dtypes.Duration):
+ return "timedelta64[ns]"
+    if isinstance_or_issubclass(dtype, dtypes.List):  # pragma: no cover
+        msg = "Converting to List dtype is not supported yet"
+        raise NotImplementedError(msg)
+    if isinstance_or_issubclass(dtype, dtypes.Struct):  # pragma: no cover
+        msg = "Converting to Struct dtype is not supported yet"
+        raise NotImplementedError(msg)
+    if isinstance_or_issubclass(dtype, dtypes.Array):  # pragma: no cover
+        msg = "Converting to Array dtype is not supported yet"
+        raise NotImplementedError(msg)
+    if isinstance_or_issubclass(dtype, dtypes.Time):  # pragma: no cover
+        msg = "Converting to Time dtype is not supported yet"
+        raise NotImplementedError(msg)
+    if isinstance_or_issubclass(dtype, dtypes.Binary):  # pragma: no cover
+        msg = "Converting to Binary dtype is not supported yet"
+        raise NotImplementedError(msg)
+
+ msg = f"Unknown dtype: {dtype}" # pragma: no cover
+ raise AssertionError(msg)