aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/narwhals/_duckdb
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
committersotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
commit5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch)
tree8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/narwhals/_duckdb
parentb832d364da8c2efe09e3f75828caf73c50d01ce3 (diff)
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/narwhals/_duckdb')
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/__init__.py0
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/dataframe.py512
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py898
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_dt.py160
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_list.py18
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_str.py103
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_struct.py20
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/group_by.py31
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/namespace.py207
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/selectors.py31
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/series.py44
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/utils.py287
12 files changed, 2311 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/__init__.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/__init__.py
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/dataframe.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/dataframe.py
new file mode 100644
index 0000000..6b4b197
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/dataframe.py
@@ -0,0 +1,512 @@
+from __future__ import annotations
+
+import contextlib
+from functools import reduce
+from operator import and_
+from typing import TYPE_CHECKING, Any, Iterator, Mapping, Sequence
+
+import duckdb
+from duckdb import FunctionExpression, StarExpression
+
+from narwhals._duckdb.utils import (
+ DeferredTimeZone,
+ col,
+ evaluate_exprs,
+ generate_partition_by_sql,
+ lit,
+ native_to_narwhals_dtype,
+)
+from narwhals._utils import (
+ Implementation,
+ Version,
+ generate_temporary_column_name,
+ not_implemented,
+ parse_columns_to_drop,
+ parse_version,
+ validate_backend_version,
+)
+from narwhals.dependencies import get_duckdb
+from narwhals.exceptions import InvalidOperationError
+from narwhals.typing import CompliantLazyFrame
+
+if TYPE_CHECKING:
+ from types import ModuleType
+
+ import pandas as pd
+ import pyarrow as pa
+ from duckdb import Expression
+ from duckdb.typing import DuckDBPyType
+ from typing_extensions import Self, TypeIs
+
+ from narwhals._compliant.typing import CompliantDataFrameAny
+ from narwhals._duckdb.expr import DuckDBExpr
+ from narwhals._duckdb.group_by import DuckDBGroupBy
+ from narwhals._duckdb.namespace import DuckDBNamespace
+ from narwhals._duckdb.series import DuckDBInterchangeSeries
+ from narwhals._utils import _FullContext
+ from narwhals.dataframe import LazyFrame
+ from narwhals.dtypes import DType
+ from narwhals.stable.v1 import DataFrame as DataFrameV1
+ from narwhals.typing import AsofJoinStrategy, JoinStrategy, LazyUniqueKeepStrategy
+
+with contextlib.suppress(ImportError): # requires duckdb>=1.3.0
+ from duckdb import SQLExpression
+
+
class DuckDBLazyFrame(
    CompliantLazyFrame[
        "DuckDBExpr",
        "duckdb.DuckDBPyRelation",
        "LazyFrame[duckdb.DuckDBPyRelation] | DataFrameV1[duckdb.DuckDBPyRelation]",
    ]
):
    """Narwhals-compliant lazy frame wrapping a native ``duckdb.DuckDBPyRelation``.

    Every operation is translated into relational-algebra calls on the wrapped
    relation (or raw SQL where the Python API has gaps), so execution stays lazy
    until ``collect`` is called.
    """

    _implementation = Implementation.DUCKDB

    def __init__(
        self,
        df: duckdb.DuckDBPyRelation,
        *,
        backend_version: tuple[int, ...],
        version: Version,
    ) -> None:
        self._native_frame: duckdb.DuckDBPyRelation = df
        self._version = version
        self._backend_version = backend_version
        # Lazily-populated caches; see the `schema` / `columns` properties.
        self._cached_native_schema: dict[str, DuckDBPyType] | None = None
        self._cached_columns: list[str] | None = None
        validate_backend_version(self._implementation, self._backend_version)

    @staticmethod
    def _is_native(obj: duckdb.DuckDBPyRelation | Any) -> TypeIs[duckdb.DuckDBPyRelation]:
        """Return True if `obj` is a native DuckDB relation."""
        return isinstance(obj, duckdb.DuckDBPyRelation)

    @classmethod
    def from_native(
        cls, data: duckdb.DuckDBPyRelation, /, *, context: _FullContext
    ) -> Self:
        """Wrap a native relation, taking version info from `context`."""
        return cls(
            data, backend_version=context._backend_version, version=context._version
        )

    def to_narwhals(
        self, *args: Any, **kwds: Any
    ) -> LazyFrame[duckdb.DuckDBPyRelation] | DataFrameV1[duckdb.DuckDBPyRelation]:
        """Wrap self in the public narwhals object for the active API version.

        `*args`/`**kwds` are accepted for signature compatibility only; they are
        not used.
        """
        if self._version is Version.MAIN:
            return self._version.lazyframe(self, level="lazy")

        # stable.v1 exposes DuckDB frames at interchange level instead.
        from narwhals.stable.v1 import DataFrame as DataFrameV1

        return DataFrameV1(self, level="interchange")  # type: ignore[no-any-return]

    def __narwhals_dataframe__(self) -> Self:  # pragma: no cover
        # Keep around for backcompat.
        if self._version is not Version.V1:
            msg = "__narwhals_dataframe__ is not implemented for DuckDBLazyFrame"
            raise AttributeError(msg)
        return self

    def __narwhals_lazyframe__(self) -> Self:
        return self

    def __native_namespace__(self) -> ModuleType:
        return get_duckdb()  # type: ignore[no-any-return]

    def __narwhals_namespace__(self) -> DuckDBNamespace:
        # Local import to avoid a circular dependency at module load time.
        from narwhals._duckdb.namespace import DuckDBNamespace

        return DuckDBNamespace(
            backend_version=self._backend_version, version=self._version
        )

    def get_column(self, name: str) -> DuckDBInterchangeSeries:
        """Return a single column as an interchange-level series."""
        from narwhals._duckdb.series import DuckDBInterchangeSeries

        return DuckDBInterchangeSeries(self.native.select(name), version=self._version)

    def _iter_columns(self) -> Iterator[Expression]:
        # Yields one column-reference expression per column.
        for name in self.columns:
            yield col(name)

    def collect(
        self, backend: ModuleType | Implementation | str | None, **kwargs: Any
    ) -> CompliantDataFrameAny:
        """Materialise the relation into an eager frame for `backend`.

        `None` defaults to PyArrow. Raises ValueError for unsupported backends.
        """
        if backend is None or backend is Implementation.PYARROW:
            import pyarrow as pa  # ignore-banned-import

            from narwhals._arrow.dataframe import ArrowDataFrame

            return ArrowDataFrame(
                self.native.arrow(),
                backend_version=parse_version(pa),
                version=self._version,
                validate_column_names=True,
            )

        if backend is Implementation.PANDAS:
            import pandas as pd  # ignore-banned-import

            from narwhals._pandas_like.dataframe import PandasLikeDataFrame

            return PandasLikeDataFrame(
                self.native.df(),
                implementation=Implementation.PANDAS,
                backend_version=parse_version(pd),
                version=self._version,
                validate_column_names=True,
            )

        if backend is Implementation.POLARS:
            import polars as pl  # ignore-banned-import

            from narwhals._polars.dataframe import PolarsDataFrame

            return PolarsDataFrame(
                self.native.pl(), backend_version=parse_version(pl), version=self._version
            )

        msg = f"Unsupported `backend` value: {backend}"  # pragma: no cover
        raise ValueError(msg)  # pragma: no cover

    def head(self, n: int) -> Self:
        """Return the first `n` rows."""
        return self._with_native(self.native.limit(n))

    def simple_select(self, *column_names: str) -> Self:
        """Select columns by name only (no expression evaluation)."""
        return self._with_native(self.native.select(*column_names))

    def aggregate(self, *exprs: DuckDBExpr) -> Self:
        """Evaluate aggregation expressions, producing a single-row relation."""
        selection = [val.alias(name) for name, val in evaluate_exprs(self, *exprs)]
        return self._with_native(self.native.aggregate(selection))  # type: ignore[arg-type]

    def select(self, *exprs: DuckDBExpr) -> Self:
        """Evaluate expressions and project onto their outputs."""
        selection = (val.alias(name) for name, val in evaluate_exprs(self, *exprs))
        return self._with_native(self.native.select(*selection))

    def drop(self, columns: Sequence[str], *, strict: bool) -> Self:
        """Drop columns; with `strict`, missing names raise."""
        columns_to_drop = parse_columns_to_drop(self, columns, strict=strict)
        selection = (name for name in self.columns if name not in columns_to_drop)
        return self._with_native(self.native.select(*selection))

    def lazy(self, *, backend: Implementation | None = None) -> Self:
        # The `backend`` argument has no effect but we keep it here for
        # backwards compatibility because in `narwhals.stable.v1`
        # function `.from_native()` will return a DataFrame for DuckDB.

        if backend is not None:  # pragma: no cover
            msg = "`backend` argument is not supported for DuckDB"
            raise ValueError(msg)
        return self

    def with_columns(self, *exprs: DuckDBExpr) -> Self:
        """Add/replace columns, preserving existing column order.

        Replacements keep their original position; brand-new columns are
        appended at the end.
        """
        new_columns_map = dict(evaluate_exprs(self, *exprs))
        result = [
            new_columns_map.pop(name).alias(name)
            if name in new_columns_map
            else col(name)
            for name in self.columns
        ]
        result.extend(value.alias(name) for name, value in new_columns_map.items())
        return self._with_native(self.native.select(*result))

    def filter(self, predicate: DuckDBExpr) -> Self:
        # `[0]` is safe as the predicate's expression only returns a single column
        mask = predicate(self)[0]
        return self._with_native(self.native.filter(mask))

    @property
    def schema(self) -> dict[str, DType]:
        """Mapping of column name to narwhals dtype."""
        if self._cached_native_schema is None:
            # Note: prefer `self._cached_native_schema` over `functools.cached_property`
            # due to Python3.13 failures.
            self._cached_native_schema = dict(zip(self.columns, self.native.types))

        # NOTE(review): the cache populated above is not reused here; the
        # result below re-reads `self.native.columns`/`.types` on every call.
        deferred_time_zone = DeferredTimeZone(self.native)
        return {
            column_name: native_to_narwhals_dtype(
                duckdb_dtype, self._version, deferred_time_zone
            )
            for column_name, duckdb_dtype in zip(self.native.columns, self.native.types)
        }

    @property
    def columns(self) -> list[str]:
        """Column names, cached after first access."""
        if self._cached_columns is None:
            self._cached_columns = (
                list(self.schema)
                if self._cached_native_schema is not None
                else self.native.columns
            )
        return self._cached_columns

    def to_pandas(self) -> pd.DataFrame:
        # only if version is v1, keep around for backcompat
        import pandas as pd  # ignore-banned-import()

        if parse_version(pd) >= (1, 0, 0):
            return self.native.df()
        else:  # pragma: no cover
            msg = f"Conversion to pandas requires 'pandas>=1.0.0', found {pd.__version__}"
            raise NotImplementedError(msg)

    def to_arrow(self) -> pa.Table:
        # only if version is v1, keep around for backcompat
        return self.native.arrow()

    def _with_version(self, version: Version) -> Self:
        # Same relation, different narwhals API version.
        return self.__class__(
            self.native, version=version, backend_version=self._backend_version
        )

    def _with_native(self, df: duckdb.DuckDBPyRelation) -> Self:
        # Wrap a derived relation, propagating version info.
        return self.__class__(
            df, backend_version=self._backend_version, version=self._version
        )

    def group_by(
        self, keys: Sequence[str] | Sequence[DuckDBExpr], *, drop_null_keys: bool
    ) -> DuckDBGroupBy:
        """Start a group-by; aggregation happens on the returned object."""
        from narwhals._duckdb.group_by import DuckDBGroupBy

        return DuckDBGroupBy(self, keys, drop_null_keys=drop_null_keys)

    def rename(self, mapping: Mapping[str, str]) -> Self:
        """Rename columns per `mapping`; unmapped columns pass through."""
        df = self.native
        selection = (
            col(name).alias(mapping[name]) if name in mapping else col(name)
            for name in df.columns
        )
        return self._with_native(self.native.select(*selection))

    def join(
        self,
        other: Self,
        *,
        how: JoinStrategy,
        left_on: Sequence[str] | None,
        right_on: Sequence[str] | None,
        suffix: str,
    ) -> Self:
        """Join with `other`, suffixing clashing right-hand columns with `suffix`.

        Relations are aliased "lhs"/"rhs" so join keys can be disambiguated.
        """
        # narwhals "full" maps to DuckDB "outer".
        native_how = "outer" if how == "full" else how

        if native_how == "cross":
            if self._backend_version < (1, 1, 4):
                msg = f"'duckdb>=1.1.4' is required for cross-join, found version: {self._backend_version}"
                raise NotImplementedError(msg)
            rel = self.native.set_alias("lhs").cross(other.native.set_alias("rhs"))
        else:
            # help mypy
            assert left_on is not None  # noqa: S101
            assert right_on is not None  # noqa: S101
            # Double-quote names so special characters survive in SQL.
            it = (
                col(f'lhs."{left}"') == col(f'rhs."{right}"')
                for left, right in zip(left_on, right_on)
            )
            condition: Expression = reduce(and_, it)
            rel = self.native.set_alias("lhs").join(
                other.native.set_alias("rhs"),
                # NOTE: Fixed in `--pre` https://github.com/duckdb/duckdb/pull/16933
                condition=condition,  # type: ignore[arg-type, unused-ignore]
                how=native_how,
            )

        if native_how in {"inner", "left", "cross", "outer"}:
            # Keep all lhs columns; decide per rhs column whether to keep,
            # suffix, or drop (join keys are dropped unless they clash).
            select = [col(f'lhs."{x}"') for x in self.columns]
            for name in other.columns:
                col_in_lhs: bool = name in self.columns
                if native_how == "outer" and not col_in_lhs:
                    select.append(col(f'rhs."{name}"'))
                elif (native_how == "outer") or (
                    col_in_lhs and (right_on is None or name not in right_on)
                ):
                    select.append(col(f'rhs."{name}"').alias(f"{name}{suffix}"))
                elif right_on is None or name not in right_on:
                    select.append(col(name))
            res = rel.select(*select).set_alias(self.native.alias)
        else:  # semi, anti
            res = rel.select("lhs.*").set_alias(self.native.alias)

        return self._with_native(res)

    def join_asof(
        self,
        other: Self,
        *,
        left_on: str,
        right_on: str,
        by_left: Sequence[str] | None,
        by_right: Sequence[str] | None,
        strategy: AsofJoinStrategy,
        suffix: str,
    ) -> Self:
        """ASOF-join on `left_on`/`right_on`, optionally grouped by `by_*` keys.

        Only "backward" and "forward" strategies are supported.
        """
        # `lhs`/`rhs` are referenced by name inside the SQL string below
        # (duckdb.sql resolves Python locals as relations).
        lhs = self.native
        rhs = other.native
        conditions: list[Expression] = []
        if by_left is not None and by_right is not None:
            conditions.extend(
                col(f'lhs."{left}"') == col(f'rhs."{right}"')
                for left, right in zip(by_left, by_right)
            )
        else:
            by_left = by_right = []
        if strategy == "backward":
            conditions.append(col(f'lhs."{left_on}"') >= col(f'rhs."{right_on}"'))
        elif strategy == "forward":
            conditions.append(col(f'lhs."{left_on}"') <= col(f'rhs."{right_on}"'))
        else:
            msg = "Only 'backward' and 'forward' strategies are currently supported for DuckDB"
            raise NotImplementedError(msg)
        condition: Expression = reduce(and_, conditions)
        select = ["lhs.*"]
        for name in rhs.columns:
            if name in lhs.columns and (
                right_on is None or name not in {right_on, *by_right}
            ):
                select.append(f'rhs."{name}" as "{name}{suffix}"')
            elif right_on is None or name not in {right_on, *by_right}:
                select.append(str(col(name)))
        # Replace with Python API call once
        # https://github.com/duckdb/duckdb/discussions/16947 is addressed.
        query = f"""
            SELECT {",".join(select)}
            FROM lhs
            ASOF LEFT JOIN rhs
            ON {condition}
            """  # noqa: S608
        return self._with_native(duckdb.sql(query))

    def collect_schema(self) -> dict[str, DType]:
        return self.schema

    def unique(
        self, subset: Sequence[str] | None, *, keep: LazyUniqueKeepStrategy
    ) -> Self:
        """Drop duplicate rows.

        keep="any" retains one arbitrary row per duplicate group; keep="none"
        drops every row that has a duplicate. With keep="any" and no subset,
        falls through to native `unique` over all columns.
        """
        if subset_ := subset if keep == "any" else (subset or self.columns):
            if self._backend_version < (1, 3):
                msg = (
                    "At least version 1.3 of DuckDB is required for `unique` operation\n"
                    "with `subset` specified."
                )
                raise NotImplementedError(msg)
            # Sanitise input
            if error := self._check_columns_exist(subset_):
                raise error
            idx_name = generate_temporary_column_name(8, self.columns)
            count_name = generate_temporary_column_name(8, [*self.columns, idx_name])
            partition_by_sql = generate_partition_by_sql(*(subset_))
            # keep="none": keep rows whose group count == 1;
            # keep="any": keep rows whose row_number within the group == 1.
            name = count_name if keep == "none" else idx_name
            idx_expr = SQLExpression(
                f"{FunctionExpression('row_number')} over ({partition_by_sql})"
            ).alias(idx_name)
            count_expr = SQLExpression(
                f"{FunctionExpression('count', StarExpression())} over ({partition_by_sql})"
            ).alias(count_name)
            return self._with_native(
                self.native.select(StarExpression(), idx_expr, count_expr)
                .filter(col(name) == lit(1))
                .select(StarExpression(exclude=[count_name, idx_name]))
            )
        return self._with_native(self.native.unique(", ".join(self.columns)))

    def sort(self, *by: str, descending: bool | Sequence[bool], nulls_last: bool) -> Self:
        """Sort by the given columns with per-column descending flags."""
        if isinstance(descending, bool):
            # Broadcast a single flag to all sort keys.
            descending = [descending] * len(by)
        if nulls_last:
            it = (
                col(name).nulls_last() if not desc else col(name).desc().nulls_last()
                for name, desc in zip(by, descending)
            )
        else:
            it = (
                col(name).nulls_first() if not desc else col(name).desc().nulls_first()
                for name, desc in zip(by, descending)
            )
        return self._with_native(self.native.sort(*it))

    def drop_nulls(self, subset: Sequence[str] | None) -> Self:
        """Drop rows containing nulls in `subset` (or in any column)."""
        subset_ = subset if subset is not None else self.columns
        keep_condition = reduce(and_, (col(name).isnotnull() for name in subset_))
        return self._with_native(self.native.filter(keep_condition))

    def explode(self, columns: Sequence[str]) -> Self:
        """Explode a single List column into one row per element.

        Null/empty lists produce a single row with a null value.
        """
        dtypes = self._version.dtypes
        schema = self.collect_schema()
        for name in columns:
            dtype = schema[name]
            if dtype != dtypes.List:
                msg = (
                    f"`explode` operation not supported for dtype `{dtype}`, "
                    "expected List type"
                )
                raise InvalidOperationError(msg)

        if len(columns) != 1:
            msg = (
                "Exploding on multiple columns is not supported with DuckDB backend since "
                "we cannot guarantee that the exploded columns have matching element counts."
            )
            raise NotImplementedError(msg)

        col_to_explode = col(columns[0])
        rel = self.native
        original_columns = self.columns

        # NOTE(review): Python precedence parses this as
        # `(isnotnull() & len(...)) > lit(0)`, not `isnotnull() & (len(...) > 0)`
        # — `&` binds tighter than `>`. Verify this is intentional / that DuckDB
        # evaluates it as desired; if not, parenthesise the comparison.
        not_null_condition = col_to_explode.isnotnull() & FunctionExpression(
            "len", col_to_explode
        ) > lit(0)
        non_null_rel = rel.filter(not_null_condition).select(
            *(
                FunctionExpression("unnest", col_to_explode).alias(name)
                if name in columns
                else name
                for name in original_columns
            )
        )

        null_rel = rel.filter(~not_null_condition).select(
            *(
                lit(None).alias(name) if name in columns else name
                for name in original_columns
            )
        )

        return self._with_native(non_null_rel.union(null_rel))

    def unpivot(
        self,
        on: Sequence[str] | None,
        index: Sequence[str] | None,
        variable_name: str,
        value_name: str,
    ) -> Self:
        """Melt `on` columns into (`variable_name`, `value_name`) long format."""
        index_ = [] if index is None else index
        on_ = [c for c in self.columns if c not in index_] if on is None else on

        if variable_name == "":
            msg = "`variable_name` cannot be empty string for duckdb backend."
            raise NotImplementedError(msg)

        if value_name == "":
            msg = "`value_name` cannot be empty string for duckdb backend."
            raise NotImplementedError(msg)

        unpivot_on = ", ".join(str(col(name)) for name in on_)
        # `rel` is referenced by name inside the SQL string (duckdb.sql
        # resolves Python locals as relations), hence the noqa.
        rel = self.native  # noqa: F841
        # Replace with Python API once
        # https://github.com/duckdb/duckdb/discussions/16980 is addressed.
        query = f"""
            unpivot rel
            on {unpivot_on}
            into
                name "{variable_name}"
                value "{value_name}"
            """
        return self._with_native(
            duckdb.sql(query).select(*[*index_, variable_name, value_name])
        )

    gather_every = not_implemented.deprecated(
        "`LazyFrame.gather_every` is deprecated and will be removed in a future version."
    )
    tail = not_implemented.deprecated(
        "`LazyFrame.tail` is deprecated and will be removed in a future version."
    )
    with_row_index = not_implemented()
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py
new file mode 100644
index 0000000..b3d55f3
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py
@@ -0,0 +1,898 @@
+from __future__ import annotations
+
+import contextlib
+import operator
+from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence, cast
+
+from duckdb import CoalesceOperator, FunctionExpression, StarExpression
+from duckdb.typing import DuckDBPyType
+
+from narwhals._compliant import LazyExpr
+from narwhals._compliant.window import WindowInputs
+from narwhals._duckdb.expr_dt import DuckDBExprDateTimeNamespace
+from narwhals._duckdb.expr_list import DuckDBExprListNamespace
+from narwhals._duckdb.expr_str import DuckDBExprStringNamespace
+from narwhals._duckdb.expr_struct import DuckDBExprStructNamespace
+from narwhals._duckdb.utils import (
+ col,
+ generate_order_by_sql,
+ generate_partition_by_sql,
+ lit,
+ narwhals_to_native_dtype,
+ when,
+)
+from narwhals._expression_parsing import ExprKind
+from narwhals._utils import Implementation, not_implemented, requires
+
+if TYPE_CHECKING:
+ from duckdb import Expression
+ from typing_extensions import Self
+
+ from narwhals._compliant.typing import (
+ AliasNames,
+ EvalNames,
+ EvalSeries,
+ WindowFunction,
+ )
+ from narwhals._duckdb.dataframe import DuckDBLazyFrame
+ from narwhals._duckdb.namespace import DuckDBNamespace
+ from narwhals._expression_parsing import ExprMetadata
+ from narwhals._utils import Version, _FullContext
+ from narwhals.typing import (
+ FillNullStrategy,
+ IntoDType,
+ NonNestedLiteral,
+ NumericLiteral,
+ RankMethod,
+ RollingInterpolationMethod,
+ TemporalLiteral,
+ )
+
+ DuckDBWindowFunction = WindowFunction[DuckDBLazyFrame, Expression]
+ DuckDBWindowInputs = WindowInputs[Expression]
+
+
+with contextlib.suppress(ImportError): # requires duckdb>=1.3.0
+ from duckdb import SQLExpression
+
+
+class DuckDBExpr(LazyExpr["DuckDBLazyFrame", "Expression"]):
+ _implementation = Implementation.DUCKDB
+
+ def __init__(
+ self,
+ call: EvalSeries[DuckDBLazyFrame, Expression],
+ window_function: DuckDBWindowFunction | None = None,
+ *,
+ evaluate_output_names: EvalNames[DuckDBLazyFrame],
+ alias_output_names: AliasNames | None,
+ backend_version: tuple[int, ...],
+ version: Version,
+ ) -> None:
+ self._call = call
+ self._evaluate_output_names = evaluate_output_names
+ self._alias_output_names = alias_output_names
+ self._backend_version = backend_version
+ self._version = version
+ self._metadata: ExprMetadata | None = None
+ self._window_function: DuckDBWindowFunction | None = window_function
+
+ @property
+ def window_function(self) -> DuckDBWindowFunction:
+ def default_window_func(
+ df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
+ ) -> list[Expression]:
+ assert not window_inputs.order_by # noqa: S101
+ partition_by_sql = generate_partition_by_sql(*window_inputs.partition_by)
+ template = f"{{expr}} over ({partition_by_sql})"
+ return [SQLExpression(template.format(expr=expr)) for expr in self(df)]
+
+ return self._window_function or default_window_func
+
    def __call__(self, df: DuckDBLazyFrame) -> Sequence[Expression]:
        """Evaluate this expression against `df`, returning native expressions."""
        return self._call(df)

    def __narwhals_expr__(self) -> None: ...

    def __narwhals_namespace__(self) -> DuckDBNamespace:  # pragma: no cover
        # Unused, just for compatibility with PandasLikeExpr
        from narwhals._duckdb.namespace import DuckDBNamespace

        return DuckDBNamespace(
            backend_version=self._backend_version, version=self._version
        )
+
+ def _cum_window_func(
+ self,
+ *,
+ reverse: bool,
+ func_name: Literal["sum", "max", "min", "count", "product"],
+ ) -> DuckDBWindowFunction:
+ def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]:
+ order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=not reverse)
+ partition_by_sql = generate_partition_by_sql(*inputs.partition_by)
+ sql = (
+ f"{func_name} ({{expr}}) over ({partition_by_sql} {order_by_sql} "
+ "rows between unbounded preceding and current row)"
+ )
+ return [SQLExpression(sql.format(expr=expr)) for expr in self(df)]
+
+ return func
+
+ def _rolling_window_func(
+ self,
+ *,
+ func_name: Literal["sum", "mean", "std", "var"],
+ center: bool,
+ window_size: int,
+ min_samples: int,
+ ddof: int | None = None,
+ ) -> DuckDBWindowFunction:
+ supported_funcs = ["sum", "mean", "std", "var"]
+ if center:
+ half = (window_size - 1) // 2
+ remainder = (window_size - 1) % 2
+ start = f"{half + remainder} preceding"
+ end = f"{half} following"
+ else:
+ start = f"{window_size - 1} preceding"
+ end = "current row"
+
+ def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]:
+ order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True)
+ partition_by_sql = generate_partition_by_sql(*inputs.partition_by)
+ window = f"({partition_by_sql} {order_by_sql} rows between {start} and {end})"
+ if func_name in {"sum", "mean"}:
+ func_: str = func_name
+ elif func_name == "var" and ddof == 0:
+ func_ = "var_pop"
+ elif func_name in "var" and ddof == 1:
+ func_ = "var_samp"
+ elif func_name == "std" and ddof == 0:
+ func_ = "stddev_pop"
+ elif func_name == "std" and ddof == 1:
+ func_ = "stddev_samp"
+ elif func_name in {"var", "std"}: # pragma: no cover
+ msg = f"Only ddof=0 and ddof=1 are currently supported for rolling_{func_name}."
+ raise ValueError(msg)
+ else: # pragma: no cover
+ msg = f"Only the following functions are supported: {supported_funcs}.\nGot: {func_name}."
+ raise ValueError(msg)
+ condition_sql = f"count({{expr}}) over {window} >= {min_samples}"
+ value_sql = f"{func_}({{expr}}) over {window}"
+ return [
+ when(
+ SQLExpression(condition_sql.format(expr=expr)),
+ SQLExpression(value_sql.format(expr=expr)),
+ )
+ for expr in self(df)
+ ]
+
+ return func
+
+ def broadcast(self, kind: Literal[ExprKind.AGGREGATION, ExprKind.LITERAL]) -> Self:
+ if kind is ExprKind.LITERAL:
+ return self
+ if self._backend_version < (1, 3):
+ msg = "At least version 1.3 of DuckDB is required for binary operations between aggregates and columns."
+ raise NotImplementedError(msg)
+ return self.over([lit(1)], [])
+
+ @classmethod
+ def from_column_names(
+ cls,
+ evaluate_column_names: EvalNames[DuckDBLazyFrame],
+ /,
+ *,
+ context: _FullContext,
+ ) -> Self:
+ def func(df: DuckDBLazyFrame) -> list[Expression]:
+ return [col(name) for name in evaluate_column_names(df)]
+
+ return cls(
+ func,
+ evaluate_output_names=evaluate_column_names,
+ alias_output_names=None,
+ backend_version=context._backend_version,
+ version=context._version,
+ )
+
+ @classmethod
+ def from_column_indices(cls, *column_indices: int, context: _FullContext) -> Self:
+ def func(df: DuckDBLazyFrame) -> list[Expression]:
+ columns = df.columns
+ return [col(columns[i]) for i in column_indices]
+
+ return cls(
+ func,
+ evaluate_output_names=cls._eval_names_indices(column_indices),
+ alias_output_names=None,
+ backend_version=context._backend_version,
+ version=context._version,
+ )
+
+ def _callable_to_eval_series(
+ self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any
+ ) -> EvalSeries[DuckDBLazyFrame, Expression]:
+ def func(df: DuckDBLazyFrame) -> list[Expression]:
+ native_series_list = self(df)
+ other_native_series = {
+ key: df._evaluate_expr(value) if self._is_expr(value) else lit(value)
+ for key, value in expressifiable_args.items()
+ }
+ return [
+ call(native_series, **other_native_series)
+ for native_series in native_series_list
+ ]
+
+ return func
+
    def _push_down_window_function(
        self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any
    ) -> DuckDBWindowFunction:
        """Build a window function applying `call` after the window rewrite.

        Only valid when `call` is elementwise — see the note below.
        """

        def window_f(
            df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
        ) -> list[Expression]:
            # If a function `f` is elementwise, and `g` is another function, then
            # - `f(g) over (window)`
            # - `f(g over (window))`
            # are equivalent.
            # Make sure to only use this if `call` is elementwise!
            native_series_list = self.window_function(df, window_inputs)
            other_native_series = {
                key: df._evaluate_window_expr(value, window_inputs)
                if self._is_expr(value)
                else lit(value)
                for key, value in expressifiable_args.items()
            }
            return [
                call(native_series, **other_native_series)
                for native_series in native_series_list
            ]

        return window_f
+
    def _with_callable(
        self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any
    ) -> Self:
        """Create expression from callable.

        Arguments:
            call: Callable from native Expression(s) to native Expression.
            expressifiable_args: arguments passed to `call` which should be parsed
                as expressions (e.g. in `nw.col('a').is_between('b', 'c')`).
        """
        return self.__class__(
            self._callable_to_eval_series(call, **expressifiable_args),
            evaluate_output_names=self._evaluate_output_names,
            alias_output_names=self._alias_output_names,
            backend_version=self._backend_version,
            version=self._version,
        )
+
    def _with_elementwise(
        self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any
    ) -> Self:
        """Like `_with_callable`, but also pushes `call` down into the window
        function (valid only for elementwise `call`)."""
        return self.__class__(
            self._callable_to_eval_series(call, **expressifiable_args),
            self._push_down_window_function(call, **expressifiable_args),
            evaluate_output_names=self._evaluate_output_names,
            alias_output_names=self._alias_output_names,
            backend_version=self._backend_version,
            version=self._version,
        )

    def _with_binary(self, op: Callable[..., Expression], other: Self | Any) -> Self:
        """Apply a binary operator `op` between self and `other` (elementwise)."""
        return self.__class__(
            self._callable_to_eval_series(op, other=other),
            self._push_down_window_function(op, other=other),
            evaluate_output_names=self._evaluate_output_names,
            alias_output_names=self._alias_output_names,
            backend_version=self._backend_version,
            version=self._version,
        )

    def _with_alias_output_names(self, func: AliasNames | None, /) -> Self:
        """Return a copy with a new output-name aliasing function."""
        return type(self)(
            self._call,
            self._window_function,
            evaluate_output_names=self._evaluate_output_names,
            alias_output_names=func,
            backend_version=self._backend_version,
            version=self._version,
        )

    def _with_window_function(self, window_function: DuckDBWindowFunction) -> Self:
        """Return a copy with `window_function` attached."""
        return self.__class__(
            self._call,
            window_function,
            evaluate_output_names=self._evaluate_output_names,
            alias_output_names=self._alias_output_names,
            backend_version=self._backend_version,
            version=self._version,
        )
+
    @classmethod
    def _alias_native(cls, expr: Expression, name: str) -> Expression:
        """Rename a native expression."""
        return expr.alias(name)

    # Binary operators delegate to DuckDB's overloaded Expression operators.
    # Reflected variants (`__r*__`) alias the result to "literal".
    def __and__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr & other, other)

    def __or__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr | other, other)

    def __add__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr + other, other)

    def __truediv__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr / other, other)

    def __rtruediv__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(
            lambda expr, other: other.__truediv__(expr), other
        ).alias("literal")

    def __floordiv__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr // other, other)

    def __rfloordiv__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(
            lambda expr, other: other.__floordiv__(expr), other
        ).alias("literal")

    def __mod__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr % other, other)

    def __rmod__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: other.__mod__(expr), other).alias(
            "literal"
        )

    def __sub__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr - other, other)

    def __rsub__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: other.__sub__(expr), other).alias(
            "literal"
        )

    def __mul__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr * other, other)

    def __pow__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr**other, other)

    def __rpow__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: other.__pow__(expr), other).alias(
            "literal"
        )

    def __lt__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr < other, other)

    def __gt__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr > other, other)

    def __le__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr <= other, other)

    def __ge__(self, other: DuckDBExpr) -> Self:
        return self._with_binary(lambda expr, other: expr >= other, other)

    # Intentionally non-standard: `==`/`!=` build expressions rather than
    # compare objects, hence the `type: ignore[override]`.
    def __eq__(self, other: DuckDBExpr) -> Self:  # type: ignore[override]
        return self._with_binary(lambda expr, other: expr == other, other)

    def __ne__(self, other: DuckDBExpr) -> Self:  # type: ignore[override]
        return self._with_binary(lambda expr, other: expr != other, other)

    def __invert__(self) -> Self:
        invert = cast("Callable[..., Expression]", operator.invert)
        return self._with_elementwise(invert)
+
    def abs(self) -> Self:
        # Elementwise, so the window-pushdown variant is safe.
        return self._with_elementwise(lambda expr: FunctionExpression("abs", expr))

    def mean(self) -> Self:
        return self._with_callable(lambda expr: FunctionExpression("mean", expr))
+
+ def skew(self) -> Self:
+ def func(expr: Expression) -> Expression:
+ count = FunctionExpression("count", expr)
+ # Adjust population skewness by correction factor to get sample skewness
+ sample_skewness = (
+ FunctionExpression("skewness", expr)
+ * (count - lit(2))
+ / FunctionExpression("sqrt", count * (count - lit(1)))
+ )
+ return when(count == lit(0), lit(None)).otherwise(
+ when(count == lit(1), lit(float("nan"))).otherwise(
+ when(count == lit(2), lit(0.0)).otherwise(sample_skewness)
+ )
+ )
+
+ return self._with_callable(func)
+
+ def median(self) -> Self:
+ return self._with_callable(lambda expr: FunctionExpression("median", expr))
+
    def all(self) -> Self:
        # SQL's `bool_and` returns NULL for an empty / all-NULL group, so
        # coalesce to TRUE.
        def f(expr: Expression) -> Expression:
            return CoalesceOperator(FunctionExpression("bool_and", expr), lit(True))  # noqa: FBT003

        def window_f(
            df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
        ) -> list[Expression]:
            # Windowed variant: the same aggregate evaluated per partition.
            pb = generate_partition_by_sql(*window_inputs.partition_by)
            return [
                CoalesceOperator(
                    SQLExpression(f"{FunctionExpression('bool_and', expr)} over ({pb})"),
                    lit(True),  # noqa: FBT003
                )
                for expr in self(df)
            ]

        return self._with_callable(f)._with_window_function(window_f)

    def any(self) -> Self:
        # SQL's `bool_or` returns NULL for an empty / all-NULL group, so
        # coalesce to FALSE.
        def f(expr: Expression) -> Expression:
            return CoalesceOperator(FunctionExpression("bool_or", expr), lit(False))  # noqa: FBT003

        def window_f(
            df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
        ) -> list[Expression]:
            pb = generate_partition_by_sql(*window_inputs.partition_by)
            return [
                CoalesceOperator(
                    SQLExpression(f"{FunctionExpression('bool_or', expr)} over ({pb})"),
                    lit(False),  # noqa: FBT003
                )
                for expr in self(df)
            ]

        return self._with_callable(f)._with_window_function(window_f)

    def quantile(
        self, quantile: float, interpolation: RollingInterpolationMethod
    ) -> Self:
        def func(expr: Expression) -> Expression:
            # `quantile_cont` is DuckDB's linearly-interpolated quantile; the
            # other interpolation strategies have no DuckDB equivalent here.
            if interpolation == "linear":
                return FunctionExpression("quantile_cont", expr, lit(quantile))
            msg = "Only linear interpolation methods are supported for DuckDB quantile."
            raise NotImplementedError(msg)

        return self._with_callable(func)

    def clip(
        self,
        lower_bound: Self | NumericLiteral | TemporalLiteral | None,
        upper_bound: Self | NumericLiteral | TemporalLiteral | None,
    ) -> Self:
        # clip(x, lo, hi) == greatest(least(x, hi), lo); one-sided clips use
        # a single greatest/least call.
        def _clip_lower(expr: Expression, lower_bound: Any) -> Expression:
            return FunctionExpression("greatest", expr, lower_bound)

        def _clip_upper(expr: Expression, upper_bound: Any) -> Expression:
            return FunctionExpression("least", expr, upper_bound)

        def _clip_both(
            expr: Expression, lower_bound: Any, upper_bound: Any
        ) -> Expression:
            return FunctionExpression(
                "greatest", FunctionExpression("least", expr, upper_bound), lower_bound
            )

        if lower_bound is None:
            return self._with_elementwise(_clip_upper, upper_bound=upper_bound)
        if upper_bound is None:
            return self._with_elementwise(_clip_lower, lower_bound=lower_bound)
        return self._with_elementwise(
            _clip_both, lower_bound=lower_bound, upper_bound=upper_bound
        )
+
    def sum(self) -> Self:
        # SQL's `sum` of an empty / all-NULL group is NULL; coalesce to 0.
        def f(expr: Expression) -> Expression:
            return CoalesceOperator(FunctionExpression("sum", expr), lit(0))

        def window_f(
            df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
        ) -> list[Expression]:
            pb = generate_partition_by_sql(*window_inputs.partition_by)
            return [
                CoalesceOperator(
                    SQLExpression(f"{FunctionExpression('sum', expr)} over ({pb})"),
                    lit(0),
                )
                for expr in self(df)
            ]

        return self._with_callable(f)._with_window_function(window_f)

    def n_unique(self) -> Self:
        def func(expr: Expression) -> Expression:
            # https://stackoverflow.com/a/79338887/4451315
            # Count distinct non-NULL values, then add 1 if any NULL is
            # present (the max(...) term evaluates to 1 only in that case).
            return FunctionExpression(
                "array_unique", FunctionExpression("array_agg", expr)
            ) + FunctionExpression(
                "max", when(expr.isnotnull(), lit(0)).otherwise(lit(1))
            )

        return self._with_callable(func)

    def count(self) -> Self:
        return self._with_callable(lambda expr: FunctionExpression("count", expr))

    def len(self) -> Self:
        # Bare `count()` counts rows regardless of NULLs in the column.
        return self._with_callable(lambda _expr: FunctionExpression("count"))

    def std(self, ddof: int) -> Self:
        if ddof == 0:
            return self._with_callable(
                lambda expr: FunctionExpression("stddev_pop", expr)
            )
        if ddof == 1:
            return self._with_callable(
                lambda expr: FunctionExpression("stddev_samp", expr)
            )

        def _std(expr: Expression) -> Expression:
            # Generic ddof: rescale the population statistic by sqrt(n/(n-ddof)).
            n_samples = FunctionExpression("count", expr)
            return (
                FunctionExpression("stddev_pop", expr)
                * FunctionExpression("sqrt", n_samples)
                / (FunctionExpression("sqrt", (n_samples - lit(ddof))))
            )

        return self._with_callable(_std)

    def var(self, ddof: int) -> Self:
        if ddof == 0:
            return self._with_callable(lambda expr: FunctionExpression("var_pop", expr))
        if ddof == 1:
            return self._with_callable(lambda expr: FunctionExpression("var_samp", expr))

        def _var(expr: Expression) -> Expression:
            # Generic ddof: rescale the population variance by n/(n-ddof).
            n_samples = FunctionExpression("count", expr)
            return (
                FunctionExpression("var_pop", expr) * n_samples / (n_samples - lit(ddof))
            )

        return self._with_callable(_var)

    def max(self) -> Self:
        return self._with_callable(lambda expr: FunctionExpression("max", expr))

    def min(self) -> Self:
        return self._with_callable(lambda expr: FunctionExpression("min", expr))

    def null_count(self) -> Self:
        # Sum of per-row NULL indicators (isnull cast to int).
        return self._with_callable(
            lambda expr: FunctionExpression("sum", expr.isnull().cast("int"))
        )
+
    @requires.backend_version((1, 3))
    def over(
        self, partition_by: Sequence[str | Expression], order_by: Sequence[str]
    ) -> Self:
        """Evaluate this expression as a window over `partition_by`/`order_by`."""

        def func(df: DuckDBLazyFrame) -> Sequence[Expression]:
            return self.window_function(df, WindowInputs(partition_by, order_by))

        return self.__class__(
            func,
            evaluate_output_names=self._evaluate_output_names,
            alias_output_names=self._alias_output_names,
            backend_version=self._backend_version,
            version=self._version,
        )

    def is_null(self) -> Self:
        return self._with_elementwise(lambda expr: expr.isnull())

    def is_nan(self) -> Self:
        return self._with_elementwise(lambda expr: FunctionExpression("isnan", expr))

    def is_finite(self) -> Self:
        return self._with_elementwise(lambda expr: FunctionExpression("isfinite", expr))

    def is_in(self, other: Sequence[Any]) -> Self:
        # Membership test: `contains(list_literal, value)` on a list literal.
        return self._with_elementwise(
            lambda expr: FunctionExpression("contains", lit(other), expr)
        )

    def round(self, decimals: int) -> Self:
        return self._with_elementwise(
            lambda expr: FunctionExpression("round", expr, lit(decimals))
        )

    @requires.backend_version((1, 3))
    def shift(self, n: int) -> Self:
        # `lag(expr, n)` within the window; the `{expr}` placeholder is filled
        # per output expression below.
        def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]:
            order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True)
            partition_by_sql = generate_partition_by_sql(*inputs.partition_by)
            sql = f"lag({{expr}}, {n}) over ({partition_by_sql} {order_by_sql})"
            return [SQLExpression(sql.format(expr=expr)) for expr in self(df)]

        return self._with_window_function(func)

    @requires.backend_version((1, 3))
    def is_first_distinct(self) -> Self:
        # A row is "first distinct" iff row_number() == 1 when partitioning by
        # (existing partitions + the value itself), ordered ascending.
        def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]:
            order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True)
            if inputs.partition_by:
                partition_by_sql = (
                    generate_partition_by_sql(*inputs.partition_by) + ", {expr}"
                )
            else:
                partition_by_sql = "partition by {expr}"
            sql = (
                f"{FunctionExpression('row_number')} "
                f"over({partition_by_sql} {order_by_sql})"
            )
            return [SQLExpression(sql.format(expr=expr)) == lit(1) for expr in self(df)]

        return self._with_window_function(func)

    @requires.backend_version((1, 3))
    def is_last_distinct(self) -> Self:
        # Mirror of `is_first_distinct` with descending order.
        def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]:
            order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=False)
            if inputs.partition_by:
                partition_by_sql = (
                    generate_partition_by_sql(*inputs.partition_by) + ", {expr}"
                )
            else:
                partition_by_sql = "partition by {expr}"
            sql = (
                f"{FunctionExpression('row_number')} "
                f"over({partition_by_sql} {order_by_sql})"
            )
            return [SQLExpression(sql.format(expr=expr)) == lit(1) for expr in self(df)]

        return self._with_window_function(func)

    @requires.backend_version((1, 3))
    def diff(self) -> Self:
        # expr - lag(expr): difference with the previous row in window order.
        def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]:
            order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True)
            partition_by_sql = generate_partition_by_sql(*inputs.partition_by)
            sql = f"lag({{expr}}) over ({partition_by_sql} {order_by_sql})"
            return [expr - SQLExpression(sql.format(expr=expr)) for expr in self(df)]

        return self._with_window_function(func)
+
    # Cumulative aggregations: each delegates to `_cum_window_func`, which
    # builds the ordered (optionally reversed) window frame for `func_name`.

    @requires.backend_version((1, 3))
    def cum_sum(self, *, reverse: bool) -> Self:
        return self._with_window_function(
            self._cum_window_func(reverse=reverse, func_name="sum")
        )

    @requires.backend_version((1, 3))
    def cum_max(self, *, reverse: bool) -> Self:
        return self._with_window_function(
            self._cum_window_func(reverse=reverse, func_name="max")
        )

    @requires.backend_version((1, 3))
    def cum_min(self, *, reverse: bool) -> Self:
        return self._with_window_function(
            self._cum_window_func(reverse=reverse, func_name="min")
        )

    @requires.backend_version((1, 3))
    def cum_count(self, *, reverse: bool) -> Self:
        return self._with_window_function(
            self._cum_window_func(reverse=reverse, func_name="count")
        )

    @requires.backend_version((1, 3))
    def cum_prod(self, *, reverse: bool) -> Self:
        return self._with_window_function(
            self._cum_window_func(reverse=reverse, func_name="product")
        )

    # Rolling aggregations: each delegates to `_rolling_window_func`, which
    # builds the sliding frame (window_size/min_samples/center, plus ddof for
    # var/std).

    @requires.backend_version((1, 3))
    def rolling_sum(self, window_size: int, *, min_samples: int, center: bool) -> Self:
        return self._with_window_function(
            self._rolling_window_func(
                func_name="sum",
                center=center,
                window_size=window_size,
                min_samples=min_samples,
            )
        )

    @requires.backend_version((1, 3))
    def rolling_mean(self, window_size: int, *, min_samples: int, center: bool) -> Self:
        return self._with_window_function(
            self._rolling_window_func(
                func_name="mean",
                center=center,
                window_size=window_size,
                min_samples=min_samples,
            )
        )

    @requires.backend_version((1, 3))
    def rolling_var(
        self, window_size: int, *, min_samples: int, center: bool, ddof: int
    ) -> Self:
        return self._with_window_function(
            self._rolling_window_func(
                func_name="var",
                center=center,
                window_size=window_size,
                min_samples=min_samples,
                ddof=ddof,
            )
        )

    @requires.backend_version((1, 3))
    def rolling_std(
        self, window_size: int, *, min_samples: int, center: bool, ddof: int
    ) -> Self:
        return self._with_window_function(
            self._rolling_window_func(
                func_name="std",
                center=center,
                window_size=window_size,
                min_samples=min_samples,
                ddof=ddof,
            )
        )
+
    def fill_null(
        self,
        value: Self | NonNestedLiteral,
        strategy: FillNullStrategy | None,
        limit: int | None,
    ) -> Self:
        if strategy is not None:
            if self._backend_version < (1, 3):  # pragma: no cover
                msg = f"`fill_null` with `strategy={strategy}` is only available in 'duckdb>=1.3.0'."
                raise NotImplementedError(msg)

            def _fill_with_strategy(
                df: DuckDBLazyFrame, inputs: DuckDBWindowInputs
            ) -> Sequence[Expression]:
                order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True)
                partition_by_sql = generate_partition_by_sql(*inputs.partition_by)

                # forward-fill: last non-NULL value looking backwards;
                # backward-fill: first non-NULL value looking forwards.
                fill_func = "last_value" if strategy == "forward" else "first_value"
                # `limit` bounds how far the fill may reach within the frame.
                _limit = "unbounded" if limit is None else limit
                rows_between = (
                    f"{_limit} preceding and current row"
                    if strategy == "forward"
                    else f"current row and {_limit} following"
                )
                sql = (
                    f"{fill_func}({{expr}} ignore nulls) over "
                    f"({partition_by_sql} {order_by_sql} rows between {rows_between})"
                )
                return [SQLExpression(sql.format(expr=expr)) for expr in self(df)]

            return self._with_window_function(_fill_with_strategy)

        def _fill_constant(expr: Expression, value: Any) -> Expression:
            return CoalesceOperator(expr, value)

        return self._with_elementwise(_fill_constant, value=value)

    def cast(self, dtype: IntoDType) -> Self:
        def func(expr: Expression) -> Expression:
            # Translate the narwhals dtype to DuckDB's native type system.
            native_dtype = narwhals_to_native_dtype(dtype, self._version)
            return expr.cast(DuckDBPyType(native_dtype))

        return self._with_elementwise(func)

    @requires.backend_version((1, 3))
    def is_unique(self) -> Self:
        # A value is unique iff count(*) over (partition by value [, keys]) == 1.
        def _is_unique(expr: Expression, *partition_by: str | Expression) -> Expression:
            pb = generate_partition_by_sql(expr, *partition_by)
            sql = f"{FunctionExpression('count', col('*'))} over ({pb})"
            return SQLExpression(sql) == lit(1)

        def _unpartitioned_is_unique(expr: Expression) -> Expression:
            return _is_unique(expr)

        def _partitioned_is_unique(
            df: DuckDBLazyFrame, inputs: DuckDBWindowInputs
        ) -> Sequence[Expression]:
            assert not inputs.order_by  # noqa: S101
            return [_is_unique(expr, *inputs.partition_by) for expr in self(df)]

        return self._with_callable(_unpartitioned_is_unique)._with_window_function(
            _partitioned_is_unique
        )
+
    @requires.backend_version((1, 3))
    def rank(self, method: RankMethod, *, descending: bool) -> Self:
        # Map the rank method onto a SQL window function:
        #   min/max/average -> rank() (adjusted below), dense -> dense_rank(),
        #   ordinal -> row_number().
        if method in {"min", "max", "average"}:
            func = FunctionExpression("rank")
        elif method == "dense":
            func = FunctionExpression("dense_rank")
        else:  # method == "ordinal"
            func = FunctionExpression("row_number")

        def _rank(
            expr: Expression,
            *,
            descending: bool,
            partition_by: Sequence[str | Expression] | None = None,
        ) -> Expression:
            order_by_sql = (
                f"order by {expr} desc nulls last"
                if descending
                else f"order by {expr} asc nulls last"
            )
            count_expr = FunctionExpression("count", StarExpression())
            if partition_by is not None:
                window = f"{generate_partition_by_sql(*partition_by)} {order_by_sql}"
                count_window = f"{generate_partition_by_sql(*partition_by, expr)}"
            else:
                window = order_by_sql
                count_window = generate_partition_by_sql(expr)
            # "max" = rank + (ties - 1); "average" = rank + (ties - 1) / 2,
            # where ties = count(*) over (partition by the value itself).
            if method == "max":
                rank_expr = (
                    SQLExpression(f"{func} OVER ({window})")
                    + SQLExpression(f"{count_expr} over ({count_window})")
                    - lit(1)
                )
            elif method == "average":
                rank_expr = SQLExpression(f"{func} OVER ({window})") + (
                    SQLExpression(f"{count_expr} over ({count_window})") - lit(1)
                ) / lit(2.0)
            else:
                rank_expr = SQLExpression(f"{func} OVER ({window})")
            # NULL inputs keep a NULL rank (the `when` has no otherwise).
            return when(expr.isnotnull(), rank_expr)

        def _unpartitioned_rank(expr: Expression) -> Expression:
            return _rank(expr, descending=descending)

        def _partitioned_rank(
            df: DuckDBLazyFrame, inputs: DuckDBWindowInputs
        ) -> Sequence[Expression]:
            assert not inputs.order_by  # noqa: S101
            return [
                _rank(expr, descending=descending, partition_by=inputs.partition_by)
                for expr in self(df)
            ]

        return self._with_callable(_unpartitioned_rank)._with_window_function(
            _partitioned_rank
        )
+
    def log(self, base: float) -> Self:
        def _log(expr: Expression) -> Expression:
            # Change of base: log(x)/log(base).  Domain edges follow float
            # math conventions: x < 0 -> NaN, x == 0 -> -inf.
            log = FunctionExpression("log", expr)
            return (
                when(expr < lit(0), lit(float("nan")))
                .when(expr == lit(0), lit(float("-inf")))
                .otherwise(log / FunctionExpression("log", lit(base)))
            )

        return self._with_elementwise(_log)

    def exp(self) -> Self:
        def _exp(expr: Expression) -> Expression:
            return FunctionExpression("exp", expr)

        return self._with_elementwise(_exp)

    # Accessor namespaces for dtype-specific methods.

    @property
    def str(self) -> DuckDBExprStringNamespace:
        return DuckDBExprStringNamespace(self)

    @property
    def dt(self) -> DuckDBExprDateTimeNamespace:
        return DuckDBExprDateTimeNamespace(self)

    @property
    def list(self) -> DuckDBExprListNamespace:
        return DuckDBExprListNamespace(self)

    @property
    def struct(self) -> DuckDBExprStructNamespace:
        return DuckDBExprStructNamespace(self)

    # Not supported by this backend.
    drop_nulls = not_implemented()
    unique = not_implemented()
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_dt.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_dt.py
new file mode 100644
index 0000000..68f0f9b
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_dt.py
@@ -0,0 +1,160 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Sequence
+
+from duckdb import FunctionExpression
+
+from narwhals._duckdb.utils import UNITS_DICT, fetch_rel_time_zone, lit
+from narwhals._duration import parse_interval_string
+from narwhals._utils import not_implemented
+
+if TYPE_CHECKING:
+ from duckdb import Expression
+
+ from narwhals._duckdb.dataframe import DuckDBLazyFrame
+ from narwhals._duckdb.expr import DuckDBExpr
+
+
class DuckDBExprDateTimeNamespace:
    """Namespace for datetime expression methods (``expr.dt.*``), DuckDB backend."""

    def __init__(self, expr: DuckDBExpr) -> None:
        self._compliant_expr = expr

    def year(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("year", expr)
        )

    def month(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("month", expr)
        )

    def day(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("day", expr)
        )

    def hour(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("hour", expr)
        )

    def minute(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("minute", expr)
        )

    def second(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("second", expr)
        )

    def millisecond(self) -> DuckDBExpr:
        # DuckDB's extractor appears to include the whole-seconds part, so the
        # seconds component is subtracted back out to get just milliseconds.
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("millisecond", expr)
            - FunctionExpression("second", expr) * lit(1_000)
        )

    def microsecond(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("microsecond", expr)
            - FunctionExpression("second", expr) * lit(1_000_000)
        )

    def nanosecond(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("nanosecond", expr)
            - FunctionExpression("second", expr) * lit(1_000_000_000)
        )

    def to_string(self, format: str) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("strftime", expr, lit(format))
        )

    def weekday(self) -> DuckDBExpr:
        # isodow: Monday = 1 ... Sunday = 7.
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("isodow", expr)
        )

    def ordinal_day(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("dayofyear", expr)
        )

    def date(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(lambda expr: expr.cast("date"))

    # NOTE(review): the total_* methods below only combine the minute-and-
    # smaller fields returned by `datepart`; larger duration components
    # (hours, days) appear unaccounted for — confirm upstream handling.

    def total_minutes(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("datepart", lit("minute"), expr)
        )

    def total_seconds(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: lit(60) * FunctionExpression("datepart", lit("minute"), expr)
            + FunctionExpression("datepart", lit("second"), expr)
        )

    def total_milliseconds(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: lit(60_000) * FunctionExpression("datepart", lit("minute"), expr)
            + FunctionExpression("datepart", lit("millisecond"), expr)
        )

    def total_microseconds(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: lit(60_000_000)
            * FunctionExpression("datepart", lit("minute"), expr)
            + FunctionExpression("datepart", lit("microsecond"), expr)
        )

    def truncate(self, every: str) -> DuckDBExpr:
        """Truncate datetimes down to the given interval (e.g. ``"1mo"``)."""
        multiple, unit = parse_interval_string(every)
        if multiple != 1:
            # https://github.com/duckdb/duckdb/issues/17554
            msg = f"Only multiple 1 is currently supported for DuckDB.\nGot {multiple!s}."
            raise ValueError(msg)
        if unit == "ns":
            msg = "Truncating to nanoseconds is not yet supported for DuckDB."
            raise NotImplementedError(msg)
        format = lit(UNITS_DICT[unit])

        def _truncate(expr: Expression) -> Expression:
            return FunctionExpression("date_trunc", format, expr)

        return self._compliant_expr._with_callable(_truncate)

    def _no_op_time_zone(self, time_zone: str) -> DuckDBExpr:
        # Validates that the requested zone matches the connection's zone and
        # returns the expression unchanged (DuckDB stores tz per connection).
        def func(df: DuckDBLazyFrame) -> Sequence[Expression]:
            native_series_list = self._compliant_expr(df)
            conn_time_zone = fetch_rel_time_zone(df.native)
            if conn_time_zone != time_zone:
                # NOTE(review): message renders with a double space before
                # "(the current ...)" — cosmetic only.
                msg = (
                    "DuckDB stores the time zone in the connection, rather than in the "
                    f"data type, so changing the timezone to anything other than {conn_time_zone} "
                    " (the current connection time zone) is not supported."
                )
                raise NotImplementedError(msg)
            return native_series_list

        return self._compliant_expr.__class__(
            func,
            evaluate_output_names=self._compliant_expr._evaluate_output_names,
            alias_output_names=self._compliant_expr._alias_output_names,
            backend_version=self._compliant_expr._backend_version,
            version=self._compliant_expr._version,
        )

    def convert_time_zone(self, time_zone: str) -> DuckDBExpr:
        return self._no_op_time_zone(time_zone)

    def replace_time_zone(self, time_zone: str | None) -> DuckDBExpr:
        if time_zone is None:
            # Casting to plain "timestamp" drops the time-zone attachment.
            return self._compliant_expr._with_callable(
                lambda _input: _input.cast("timestamp")
            )
        else:
            return self._no_op_time_zone(time_zone)

    total_nanoseconds = not_implemented()
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_list.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_list.py
new file mode 100644
index 0000000..60562e5
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_list.py
@@ -0,0 +1,18 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from duckdb import FunctionExpression
+
+if TYPE_CHECKING:
+ from narwhals._duckdb.expr import DuckDBExpr
+
+
class DuckDBExprListNamespace:
    """Namespace for list-typed expression methods (``expr.list.*``)."""

    def __init__(self, expr: DuckDBExpr) -> None:
        self._compliant_expr = expr

    def len(self) -> DuckDBExpr:
        """Return the number of elements in each list value."""

        def _list_len(expr: Expression) -> Expression:
            return FunctionExpression("len", expr)

        return self._compliant_expr._with_callable(_list_len)
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_str.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_str.py
new file mode 100644
index 0000000..6a9d8c2
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_str.py
@@ -0,0 +1,103 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from duckdb import FunctionExpression
+
+from narwhals._duckdb.utils import lit
+from narwhals._utils import not_implemented
+
+if TYPE_CHECKING:
+ from duckdb import Expression
+
+ from narwhals._duckdb.expr import DuckDBExpr
+
+
class DuckDBExprStringNamespace:
    """Namespace for string expression methods (``expr.str.*``), DuckDB backend."""

    def __init__(self, expr: DuckDBExpr) -> None:
        self._compliant_expr = expr

    def starts_with(self, prefix: str) -> DuckDBExpr:
        """Check whether each value starts with `prefix` (literal match)."""
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("starts_with", expr, lit(prefix))
        )

    def ends_with(self, suffix: str) -> DuckDBExpr:
        """Check whether each value ends with `suffix` (literal match)."""
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("ends_with", expr, lit(suffix))
        )

    def contains(self, pattern: str, *, literal: bool) -> DuckDBExpr:
        """Check for a substring (`literal=True`) or a regex match (`literal=False`)."""

        def func(expr: Expression) -> Expression:
            if literal:
                return FunctionExpression("contains", expr, lit(pattern))
            return FunctionExpression("regexp_matches", expr, lit(pattern))

        return self._compliant_expr._with_callable(func)

    def slice(self, offset: int, length: int | None) -> DuckDBExpr:
        """Extract a substring of up to `length` characters starting at `offset`.

        Fix: `length` was annotated as `int`, but the implementation explicitly
        handles `length is None` (slice to the end of the string), so the
        annotation is corrected to `int | None`.
        """

        def func(expr: Expression) -> Expression:
            offset_lit = lit(offset)
            # DuckDB's array_slice is 1-based; negative offsets count from the
            # end of the string via length(expr) + offset + 1.
            return FunctionExpression(
                "array_slice",
                expr,
                lit(offset + 1)
                if offset >= 0
                else FunctionExpression("length", expr) + offset_lit + lit(1),
                FunctionExpression("length", expr)
                if length is None
                else lit(length) + offset_lit,
            )

        return self._compliant_expr._with_callable(func)

    def split(self, by: str) -> DuckDBExpr:
        """Split each value on the literal separator `by`."""
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("str_split", expr, lit(by))
        )

    def len_chars(self) -> DuckDBExpr:
        """Return the character length of each value."""
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("length", expr)
        )

    def to_lowercase(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("lower", expr)
        )

    def to_uppercase(self) -> DuckDBExpr:
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("upper", expr)
        )

    def strip_chars(self, characters: str | None) -> DuckDBExpr:
        """Strip the given set of characters (default: ASCII whitespace) from both ends."""
        import string

        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression(
                "trim", expr, lit(string.whitespace if characters is None else characters)
            )
        )

    def replace_all(self, pattern: str, value: str, *, literal: bool) -> DuckDBExpr:
        """Replace all occurrences of `pattern` (regex unless `literal=True`)."""
        if not literal:
            # "g" flag: replace every match, not just the first.
            return self._compliant_expr._with_callable(
                lambda expr: FunctionExpression(
                    "regexp_replace", expr, lit(pattern), lit(value), lit("g")
                )
            )
        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("replace", expr, lit(pattern), lit(value))
        )

    def to_datetime(self, format: str | None) -> DuckDBExpr:
        """Parse strings to datetimes with an explicit `format` (inference unsupported)."""
        if format is None:
            msg = "Cannot infer format with DuckDB backend, please specify `format` explicitly."
            raise NotImplementedError(msg)

        return self._compliant_expr._with_callable(
            lambda expr: FunctionExpression("strptime", expr, lit(format))
        )

    replace = not_implemented()
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_struct.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_struct.py
new file mode 100644
index 0000000..3124204
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_struct.py
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from duckdb import FunctionExpression
+
+from narwhals._duckdb.utils import lit
+
+if TYPE_CHECKING:
+ from narwhals._duckdb.expr import DuckDBExpr
+
+
class DuckDBExprStructNamespace:
    """Namespace for struct-typed expression methods (``expr.struct.*``)."""

    def __init__(self, expr: DuckDBExpr) -> None:
        self._compliant_expr = expr

    def field(self, name: str) -> DuckDBExpr:
        """Extract the struct field `name`, naming the output after the field."""

        def _extract(expr: Expression) -> Expression:
            return FunctionExpression("struct_extract", expr, lit(name))

        result = self._compliant_expr._with_callable(_extract)
        return result.alias(name)
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/group_by.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/group_by.py
new file mode 100644
index 0000000..8fa2978
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/group_by.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from itertools import chain
+from typing import TYPE_CHECKING, Sequence
+
+from narwhals._compliant import LazyGroupBy
+
+if TYPE_CHECKING:
+ from duckdb import Expression # noqa: F401
+
+ from narwhals._duckdb.dataframe import DuckDBLazyFrame
+ from narwhals._duckdb.expr import DuckDBExpr
+
+
class DuckDBGroupBy(LazyGroupBy["DuckDBLazyFrame", "DuckDBExpr", "Expression"]):
    """Lazy group-by for the DuckDB backend."""

    def __init__(
        self,
        df: DuckDBLazyFrame,
        keys: Sequence[DuckDBExpr] | Sequence[str],
        /,
        *,
        drop_null_keys: bool,
    ) -> None:
        # Normalise expression/string keys into column names plus the output
        # names callers asked for.
        frame, self._keys, self._output_key_names = self._parse_keys(df, keys=keys)
        self._compliant_frame = frame.drop_nulls(self._keys) if drop_null_keys else frame

    def agg(self, *exprs: DuckDBExpr) -> DuckDBLazyFrame:
        """Aggregate `exprs` over the group keys, restoring requested key names."""
        agg_columns = list(chain(self._keys, self._evaluate_exprs(exprs)))
        return self.compliant._with_native(
            self.compliant.native.aggregate(agg_columns)  # type: ignore[arg-type]
        ).rename(dict(zip(self._keys, self._output_key_names)))
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/namespace.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/namespace.py
new file mode 100644
index 0000000..3616f2e
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/namespace.py
@@ -0,0 +1,207 @@
+from __future__ import annotations
+
+import operator
+from functools import reduce
+from itertools import chain
+from typing import TYPE_CHECKING, Callable, Iterable, Sequence
+
+import duckdb
+from duckdb import CoalesceOperator, Expression, FunctionExpression
+from duckdb.typing import BIGINT, VARCHAR
+
+from narwhals._compliant import LazyNamespace, LazyThen, LazyWhen
+from narwhals._duckdb.dataframe import DuckDBLazyFrame
+from narwhals._duckdb.expr import DuckDBExpr
+from narwhals._duckdb.selectors import DuckDBSelectorNamespace
+from narwhals._duckdb.utils import concat_str, lit, narwhals_to_native_dtype, when
+from narwhals._expression_parsing import (
+ combine_alias_output_names,
+ combine_evaluate_output_names,
+)
+from narwhals._utils import Implementation
+
+if TYPE_CHECKING:
+ from narwhals._duckdb.expr import DuckDBWindowInputs
+ from narwhals._utils import Version
+ from narwhals.typing import ConcatMethod, IntoDType, NonNestedLiteral
+
+
class DuckDBNamespace(
    LazyNamespace[DuckDBLazyFrame, DuckDBExpr, duckdb.DuckDBPyRelation]
):
    """Top-level expression namespace for the DuckDB backend."""

    _implementation: Implementation = Implementation.DUCKDB

    def __init__(self, *, backend_version: tuple[int, ...], version: Version) -> None:
        self._backend_version = backend_version
        self._version = version

    @property
    def selectors(self) -> DuckDBSelectorNamespace:
        return DuckDBSelectorNamespace.from_namespace(self)

    @property
    def _expr(self) -> type[DuckDBExpr]:
        return DuckDBExpr

    @property
    def _lazyframe(self) -> type[DuckDBLazyFrame]:
        return DuckDBLazyFrame

    def _with_elementwise(
        self, func: Callable[[Iterable[Expression]], Expression], *exprs: DuckDBExpr
    ) -> DuckDBExpr:
        """Combine `exprs` element-wise with `func`, in both plain and windowed form."""

        def call(df: DuckDBLazyFrame) -> list[Expression]:
            cols = (col for _expr in exprs for col in _expr(df))
            return [func(cols)]

        def window_function(
            df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
        ) -> list[Expression]:
            cols = (
                col for _expr in exprs for col in _expr.window_function(df, window_inputs)
            )
            return [func(cols)]

        return self._expr(
            call=call,
            window_function=window_function,
            evaluate_output_names=combine_evaluate_output_names(*exprs),
            alias_output_names=combine_alias_output_names(*exprs),
            backend_version=self._backend_version,
            version=self._version,
        )

    def concat(
        self, items: Iterable[DuckDBLazyFrame], *, how: ConcatMethod
    ) -> DuckDBLazyFrame:
        """Concatenate frames vertically via SQL UNION ALL.

        Fix: `items` was iterated twice (once for the native frames, once for
        `list(items)`), so a generator argument would leave `items` empty and
        `items[0]` would raise IndexError.  Materialize once, up front.
        """
        items = list(items)
        native_items = [item._native_frame for item in items]
        first = items[0]
        schema = first.schema
        if how == "vertical" and not all(x.schema == schema for x in items[1:]):
            msg = "inputs should all have the same schema"
            raise TypeError(msg)
        res = reduce(lambda x, y: x.union(y), native_items)
        return first._with_native(res)

    def concat_str(
        self, *exprs: DuckDBExpr, separator: str, ignore_nulls: bool
    ) -> DuckDBExpr:
        """Concatenate string columns with `separator`.

        With `ignore_nulls=False`, any NULL input makes the whole result NULL
        (handled via an explicit null mask, since SQL concat skips NULLs).
        """

        def func(df: DuckDBLazyFrame) -> list[Expression]:
            cols = list(chain.from_iterable(expr(df) for expr in exprs))
            if not ignore_nulls:
                null_mask_result = reduce(operator.or_, (s.isnull() for s in cols))
                # Interleave the separator between casted columns.
                cols_separated = [
                    y
                    for x in [
                        (col.cast(VARCHAR),)
                        if i == len(cols) - 1
                        else (col.cast(VARCHAR), lit(separator))
                        for i, col in enumerate(cols)
                    ]
                    for y in x
                ]
                return [when(~null_mask_result, concat_str(*cols_separated))]
            else:
                return [concat_str(*cols, separator=separator)]

        return self._expr(
            call=func,
            evaluate_output_names=combine_evaluate_output_names(*exprs),
            alias_output_names=combine_alias_output_names(*exprs),
            backend_version=self._backend_version,
            version=self._version,
        )

    def all_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr:
        def func(cols: Iterable[Expression]) -> Expression:
            return reduce(operator.and_, cols)

        return self._with_elementwise(func, *exprs)

    def any_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr:
        def func(cols: Iterable[Expression]) -> Expression:
            return reduce(operator.or_, cols)

        return self._with_elementwise(func, *exprs)

    def max_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr:
        def func(cols: Iterable[Expression]) -> Expression:
            return FunctionExpression("greatest", *cols)

        return self._with_elementwise(func, *exprs)

    def min_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr:
        def func(cols: Iterable[Expression]) -> Expression:
            return FunctionExpression("least", *cols)

        return self._with_elementwise(func, *exprs)

    def sum_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr:
        # NULLs count as 0 in the row-wise sum.
        def func(cols: Iterable[Expression]) -> Expression:
            return reduce(operator.add, (CoalesceOperator(col, lit(0)) for col in cols))

        return self._with_elementwise(func, *exprs)

    def mean_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr:
        # Row-wise mean over the non-NULL values: sum(coalesce(col, 0)) divided
        # by the count of non-NULL inputs.
        def func(cols: Iterable[Expression]) -> Expression:
            cols = list(cols)
            return reduce(
                operator.add, (CoalesceOperator(col, lit(0)) for col in cols)
            ) / reduce(operator.add, (col.isnotnull().cast(BIGINT) for col in cols))

        return self._with_elementwise(func, *exprs)

    def when(self, predicate: DuckDBExpr) -> DuckDBWhen:
        return DuckDBWhen.from_expr(predicate, context=self)

    def lit(self, value: NonNestedLiteral, dtype: IntoDType | None) -> DuckDBExpr:
        """A literal expression, optionally cast to `dtype`; named "literal"."""

        def func(_df: DuckDBLazyFrame) -> list[Expression]:
            if dtype is not None:
                return [
                    lit(value).cast(
                        narwhals_to_native_dtype(dtype, version=self._version)  # type: ignore[arg-type]
                    )
                ]
            return [lit(value)]

        return self._expr(
            func,
            evaluate_output_names=lambda _df: ["literal"],
            alias_output_names=None,
            backend_version=self._backend_version,
            version=self._version,
        )

    def len(self) -> DuckDBExpr:
        """Row count of the frame, named "len"."""

        def func(_df: DuckDBLazyFrame) -> list[Expression]:
            return [FunctionExpression("count")]

        return self._expr(
            call=func,
            evaluate_output_names=lambda _df: ["len"],
            alias_output_names=None,
            backend_version=self._backend_version,
            version=self._version,
        )
+
+
class DuckDBWhen(LazyWhen["DuckDBLazyFrame", Expression, DuckDBExpr]):
    """`when/then/otherwise` builder for the DuckDB backend."""

    @property
    def _then(self) -> type[DuckDBThen]:
        return DuckDBThen

    def __call__(self, df: DuckDBLazyFrame) -> Sequence[Expression]:
        # Bind the DuckDB-specific `when`/`lit` helpers before delegating to
        # the generic LazyWhen implementation.
        self.when = when
        self.lit = lit
        return super().__call__(df)

    def _window_function(
        self, df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
    ) -> Sequence[Expression]:
        self.when = when
        self.lit = lit
        return super()._window_function(df, window_inputs)


# A resolved `then` branch; also a full DuckDBExpr so it can be chained.
class DuckDBThen(LazyThen["DuckDBLazyFrame", Expression, DuckDBExpr], DuckDBExpr): ...
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/selectors.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/selectors.py
new file mode 100644
index 0000000..ea1c6ba
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/selectors.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from narwhals._compliant import CompliantSelector, LazySelectorNamespace
+from narwhals._duckdb.expr import DuckDBExpr
+
+if TYPE_CHECKING:
+ from duckdb import Expression # noqa: F401
+
+ from narwhals._duckdb.dataframe import DuckDBLazyFrame # noqa: F401
+
+
+class DuckDBSelectorNamespace(LazySelectorNamespace["DuckDBLazyFrame", "Expression"]):
+ """Namespace exposing DuckDB-backed column selectors."""
+
+ @property
+ def _selector(self) -> type[DuckDBSelector]:
+ # Concrete selector class the generic namespace should instantiate.
+ return DuckDBSelector
+
+
+class DuckDBSelector( # type: ignore[misc]
+ CompliantSelector["DuckDBLazyFrame", "Expression"], DuckDBExpr
+):
+ """A column selector that can be demoted to a plain `DuckDBExpr`."""
+
+ def _to_expr(self) -> DuckDBExpr:
+ # Re-wrap the selector's callables as an ordinary expression, keeping
+ # output-name evaluation and aliasing behavior intact.
+ return DuckDBExpr(
+ self._call,
+ self._window_function,
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=self._alias_output_names,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/series.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/series.py
new file mode 100644
index 0000000..5b284b9
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/series.py
@@ -0,0 +1,44 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from narwhals._duckdb.utils import DeferredTimeZone, native_to_narwhals_dtype
+from narwhals.dependencies import get_duckdb
+
+if TYPE_CHECKING:
+ from types import ModuleType
+
+ import duckdb
+ from typing_extensions import Never, Self
+
+ from narwhals._utils import Version
+ from narwhals.dtypes import DType
+
+
+class DuckDBInterchangeSeries:
+ """Minimal interchange-level wrapper around a DuckDB relation's column.
+
+ Only dtype introspection is supported; any other attribute access raises
+ `NotImplementedError` via `__getattr__`.
+ """
+
+ def __init__(self, df: duckdb.DuckDBPyRelation, version: Version) -> None:
+ self._native_series = df
+ self._version = version
+
+ def __narwhals_series__(self) -> Self:
+ return self
+
+ def __native_namespace__(self) -> ModuleType:
+ return get_duckdb() # type: ignore[no-any-return]
+
+ @property
+ def dtype(self) -> DType:
+ # Uses `types[0]` — assumes the wrapped relation holds a single
+ # column; NOTE(review): confirm against the constructing caller.
+ return native_to_narwhals_dtype(
+ self._native_series.types[0],
+ self._version,
+ DeferredTimeZone(self._native_series),
+ )
+
+ def __getattr__(self, attr: str) -> Never:
+ # Anything beyond the interchange protocol is deliberately unsupported.
+ msg = ( # pragma: no cover
+ f"Attribute {attr} is not supported for interchange-level dataframes.\n\n"
+ "If you would like to see this kind of object better supported in "
+ "Narwhals, please open a feature request "
+ "at https://github.com/narwhals-dev/narwhals/issues."
+ )
+ raise NotImplementedError(msg) # pragma: no cover
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/utils.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/utils.py
new file mode 100644
index 0000000..c5d4872
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/utils.py
@@ -0,0 +1,287 @@
+from __future__ import annotations
+
+from functools import lru_cache
+from typing import TYPE_CHECKING, Any
+
+import duckdb
+
+from narwhals._utils import Version, isinstance_or_issubclass
+
+if TYPE_CHECKING:
+ from duckdb import DuckDBPyRelation, Expression
+ from duckdb.typing import DuckDBPyType
+
+ from narwhals._duckdb.dataframe import DuckDBLazyFrame
+ from narwhals._duckdb.expr import DuckDBExpr
+ from narwhals.dtypes import DType
+ from narwhals.typing import IntoDType
+
+# Maps narwhals-style duration-unit suffixes (e.g. "mo", "us") to the
+# date-part names DuckDB functions expect.
+UNITS_DICT = {
+ "y": "year",
+ "q": "quarter",
+ "mo": "month",
+ "d": "day",
+ "h": "hour",
+ "m": "minute",
+ "s": "second",
+ "ms": "millisecond",
+ "us": "microsecond",
+ "ns": "nanosecond",
+}
+
+# Short aliases for the DuckDB expression constructors used throughout
+# this backend.
+col = duckdb.ColumnExpression
+"""Alias for `duckdb.ColumnExpression`."""
+
+lit = duckdb.ConstantExpression
+"""Alias for `duckdb.ConstantExpression`."""
+
+when = duckdb.CaseExpression
+"""Alias for `duckdb.CaseExpression`."""
+
+
+def concat_str(*exprs: Expression, separator: str = "") -> Expression:
+ """Concatenate many strings, NULL inputs are skipped.
+
+ Wraps [concat] and [concat_ws] `FunctionExpression`(s).
+
+ Arguments:
+ exprs: Native columns.
+ separator: String that will be used to separate the values of each column.
+
+ Returns:
+ A new native expression.
+
+ [concat]: https://duckdb.org/docs/stable/sql/functions/char.html#concatstring-
+ [concat_ws]: https://duckdb.org/docs/stable/sql/functions/char.html#concat_wsseparator-string-
+ """
+ # `concat_ws` requires a separator argument, so plain `concat` covers the
+ # empty-separator case.
+ return (
+ duckdb.FunctionExpression("concat_ws", lit(separator), *exprs)
+ if separator
+ else duckdb.FunctionExpression("concat", *exprs)
+ )
+
+
+def evaluate_exprs(
+ df: DuckDBLazyFrame, /, *exprs: DuckDBExpr
+) -> list[tuple[str, Expression]]:
+ """Evaluate each expression against `df`, pairing results with output names.
+
+ Alias mappings (`_alias_output_names`) are applied when present; a
+ name/result count mismatch indicates an internal error.
+ """
+ native_results: list[tuple[str, Expression]] = []
+ for expr in exprs:
+ native_series_list = expr._call(df)
+ output_names = expr._evaluate_output_names(df)
+ if expr._alias_output_names is not None:
+ output_names = expr._alias_output_names(output_names)
+ if len(output_names) != len(native_series_list): # pragma: no cover
+ msg = f"Internal error: got output names {output_names}, but only got {len(native_series_list)} results"
+ raise AssertionError(msg)
+ native_results.extend(zip(output_names, native_series_list))
+ return native_results
+
+
+class DeferredTimeZone:
+ """Object which gets passed between `native_to_narwhals_dtype` calls.
+
+ DuckDB stores the time zone in the connection, rather than in the dtypes, so
+ this ensures that when calculating the schema of a dataframe with multiple
+ timezone-aware columns, that the connection's time zone is only fetched once.
+
+ Note: we cannot make the time zone a cached `DuckDBLazyFrame` property because
+ the time zone can be modified after `DuckDBLazyFrame` creation:
+
+ ```python
+ df = nw.from_native(rel)
+ print(df.collect_schema())
+ rel.query("set timezone = 'Asia/Kolkata'")
+ print(df.collect_schema()) # should change to reflect new time zone
+ ```
+ """
+
+ # Class-level default; the first `time_zone` access stores the fetched
+ # zone as an instance attribute.
+ _cached_time_zone: str | None = None
+
+ def __init__(self, rel: DuckDBPyRelation) -> None:
+ self._rel = rel
+
+ @property
+ def time_zone(self) -> str:
+ """Fetch relation time zone (if it wasn't calculated already)."""
+ if self._cached_time_zone is None:
+ self._cached_time_zone = fetch_rel_time_zone(self._rel)
+ return self._cached_time_zone
+
+
+def native_to_narwhals_dtype(
+ duckdb_dtype: DuckDBPyType, version: Version, deferred_time_zone: DeferredTimeZone
+) -> DType:
+ """Translate a native DuckDB type into the narwhals `DType` equivalent.
+
+ Nested types (list/struct/array) recurse into their children; timezone-aware
+ timestamps read the zone lazily via `deferred_time_zone` so the connection is
+ queried at most once per schema.
+ """
+ duckdb_dtype_id = duckdb_dtype.id
+ dtypes = version.dtypes
+
+ # Handle nested data types first
+ if duckdb_dtype_id == "list":
+ return dtypes.List(
+ native_to_narwhals_dtype(duckdb_dtype.child, version, deferred_time_zone)
+ )
+
+ if duckdb_dtype_id == "struct":
+ # `children` yields (field name, field type) pairs.
+ children = duckdb_dtype.children
+ return dtypes.Struct(
+ [
+ dtypes.Field(
+ name=child[0],
+ dtype=native_to_narwhals_dtype(child[1], version, deferred_time_zone),
+ )
+ for child in children
+ ]
+ )
+
+ if duckdb_dtype_id == "array":
+ # Unwrap nested fixed-size arrays one level at a time, collecting one
+ # size per level; each `children` pair is (child type, size).
+ child, size = duckdb_dtype.children
+ shape: list[int] = [size[1]]
+
+ while child[1].id == "array":
+ child, size = child[1].children
+ shape.insert(0, size[1])
+
+ inner = native_to_narwhals_dtype(child[1], version, deferred_time_zone)
+ return dtypes.Array(inner=inner, shape=tuple(shape))
+
+ if duckdb_dtype_id == "enum":
+ if version is Version.V1:
+ # stable.v1's `Enum` does not carry categories.
+ return dtypes.Enum() # type: ignore[call-arg]
+ categories = duckdb_dtype.children[0][1]
+ return dtypes.Enum(categories=categories)
+
+ if duckdb_dtype_id == "timestamp with time zone":
+ # Zone lives on the connection, not the dtype; fetched lazily (cached).
+ return dtypes.Datetime(time_zone=deferred_time_zone.time_zone)
+
+ return _non_nested_native_to_narwhals_dtype(duckdb_dtype_id, version)
+
+
+def fetch_rel_time_zone(rel: duckdb.DuckDBPyRelation) -> str:
+ """Read the connection's `TimeZone` setting via `duckdb_settings()`."""
+ result = rel.query(
+ "duckdb_settings()", "select value from duckdb_settings() where name = 'TimeZone'"
+ ).fetchone()
+ # The TimeZone setting always exists, so a row is guaranteed.
+ assert result is not None # noqa: S101
+ return result[0] # type: ignore[no-any-return]
+
+
+@lru_cache(maxsize=16)
+def _non_nested_native_to_narwhals_dtype(duckdb_dtype_id: str, version: Version) -> DType:
+ """Map a flat (non-nested) DuckDB type id to a narwhals dtype.
+
+ Cached because only a handful of distinct (id, version) pairs occur;
+ unrecognized ids fall back to `Unknown`.
+ """
+ dtypes = version.dtypes
+ return {
+ "hugeint": dtypes.Int128(),
+ "bigint": dtypes.Int64(),
+ "integer": dtypes.Int32(),
+ "smallint": dtypes.Int16(),
+ "tinyint": dtypes.Int8(),
+ "uhugeint": dtypes.UInt128(),
+ "ubigint": dtypes.UInt64(),
+ "uinteger": dtypes.UInt32(),
+ "usmallint": dtypes.UInt16(),
+ "utinyint": dtypes.UInt8(),
+ "double": dtypes.Float64(),
+ "float": dtypes.Float32(),
+ "varchar": dtypes.String(),
+ "date": dtypes.Date(),
+ "timestamp": dtypes.Datetime(),
+ "boolean": dtypes.Boolean(),
+ "interval": dtypes.Duration(),
+ "decimal": dtypes.Decimal(),
+ "time": dtypes.Time(),
+ "blob": dtypes.Binary(),
+ }.get(duckdb_dtype_id, dtypes.Unknown())
+
+
+def narwhals_to_native_dtype(dtype: IntoDType, version: Version) -> str: # noqa: C901, PLR0912, PLR0915
+ """Translate a narwhals dtype into a DuckDB type string usable in casts.
+
+ Raises:
+ NotImplementedError: for Decimal and Categorical, for Datetime/Duration
+ (not implemented yet), and for Enum under `narwhals.stable.v1`.
+ ValueError: for an `Enum` without concrete categories.
+ """
+ dtypes = version.dtypes
+ if isinstance_or_issubclass(dtype, dtypes.Decimal):
+ msg = "Casting to Decimal is not supported yet."
+ raise NotImplementedError(msg)
+ if isinstance_or_issubclass(dtype, dtypes.Float64):
+ return "DOUBLE"
+ if isinstance_or_issubclass(dtype, dtypes.Float32):
+ return "FLOAT"
+ if isinstance_or_issubclass(dtype, dtypes.Int128):
+ return "INT128"
+ if isinstance_or_issubclass(dtype, dtypes.Int64):
+ return "BIGINT"
+ if isinstance_or_issubclass(dtype, dtypes.Int32):
+ return "INTEGER"
+ if isinstance_or_issubclass(dtype, dtypes.Int16):
+ return "SMALLINT"
+ if isinstance_or_issubclass(dtype, dtypes.Int8):
+ return "TINYINT"
+ if isinstance_or_issubclass(dtype, dtypes.UInt128):
+ return "UINT128"
+ if isinstance_or_issubclass(dtype, dtypes.UInt64):
+ return "UBIGINT"
+ if isinstance_or_issubclass(dtype, dtypes.UInt32):
+ return "UINTEGER"
+ if isinstance_or_issubclass(dtype, dtypes.UInt16): # pragma: no cover
+ return "USMALLINT"
+ if isinstance_or_issubclass(dtype, dtypes.UInt8): # pragma: no cover
+ return "UTINYINT"
+ if isinstance_or_issubclass(dtype, dtypes.String):
+ return "VARCHAR"
+ if isinstance_or_issubclass(dtype, dtypes.Boolean): # pragma: no cover
+ return "BOOLEAN"
+ if isinstance_or_issubclass(dtype, dtypes.Time):
+ return "TIME"
+ if isinstance_or_issubclass(dtype, dtypes.Binary):
+ return "BLOB"
+ if isinstance_or_issubclass(dtype, dtypes.Categorical):
+ msg = "Categorical not supported by DuckDB"
+ raise NotImplementedError(msg)
+ if isinstance_or_issubclass(dtype, dtypes.Enum):
+ if version is Version.V1:
+ msg = "Converting to Enum is not supported in narwhals.stable.v1"
+ raise NotImplementedError(msg)
+ if isinstance(dtype, dtypes.Enum):
+ # NOTE(review): categories are interpolated without escaping embedded
+ # single quotes — a category containing `'` would break the type string.
+ categories = "'" + "', '".join(dtype.categories) + "'"
+ return f"ENUM ({categories})"
+ msg = "Can not cast / initialize Enum without categories present"
+ raise ValueError(msg)
+
+ if isinstance_or_issubclass(dtype, dtypes.Datetime):
+ _time_unit = dtype.time_unit
+ _time_zone = dtype.time_zone
+ msg = "todo"
+ raise NotImplementedError(msg)
+ if isinstance_or_issubclass(dtype, dtypes.Duration): # pragma: no cover
+ _time_unit = dtype.time_unit
+ msg = "todo"
+ raise NotImplementedError(msg)
+ if isinstance_or_issubclass(dtype, dtypes.Date): # pragma: no cover
+ return "DATE"
+ if isinstance_or_issubclass(dtype, dtypes.List):
+ inner = narwhals_to_native_dtype(dtype.inner, version)
+ return f"{inner}[]"
+ if isinstance_or_issubclass(dtype, dtypes.Struct): # pragma: no cover
+ inner = ", ".join(
+ f'"{field.name}" {narwhals_to_native_dtype(field.dtype, version)}'
+ for field in dtype.fields
+ )
+ return f"STRUCT({inner})"
+ if isinstance_or_issubclass(dtype, dtypes.Array): # pragma: no cover
+ shape = dtype.shape
+ # e.g. shape (3, 2) renders as "[3][2]" appended after the innermost type.
+ duckdb_shape_fmt = "".join(f"[{item}]" for item in shape)
+ inner_dtype: Any = dtype
+ for _ in shape:
+ inner_dtype = inner_dtype.inner
+ duckdb_inner = narwhals_to_native_dtype(inner_dtype, version)
+ return f"{duckdb_inner}{duckdb_shape_fmt}"
+ msg = f"Unknown dtype: {dtype}" # pragma: no cover
+ raise AssertionError(msg)
+
+
+def generate_partition_by_sql(*partition_by: str | Expression) -> str:
+ """Render a `partition by ...` clause; empty string when no keys are given.
+
+ String arguments are treated as column names; expressions render as-is.
+ """
+ if not partition_by:
+ return ""
+ by_sql = ", ".join([f"{col(x) if isinstance(x, str) else x}" for x in partition_by])
+ return f"partition by {by_sql}"
+
+
+def generate_order_by_sql(*order_by: str, ascending: bool) -> str:
+ """Render an `order by ...` clause over the given column names.
+
+ All keys share one direction: ascending places nulls first, descending
+ places nulls last.
+ """
+ if ascending:
+ by_sql = ", ".join([f"{col(x)} asc nulls first" for x in order_by])
+ else:
+ by_sql = ", ".join([f"{col(x)} desc nulls last" for x in order_by])
+ return f"order by {by_sql}"