diff options
| author | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
|---|---|---|
| committer | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
| commit | 5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch) | |
| tree | 8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/narwhals/_duckdb | |
| parent | b832d364da8c2efe09e3f75828caf73c50d01ce3 (diff) | |
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/narwhals/_duckdb')
12 files changed, 2311 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/__init__.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/__init__.py diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/dataframe.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/dataframe.py new file mode 100644 index 0000000..6b4b197 --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/dataframe.py @@ -0,0 +1,512 @@ +from __future__ import annotations + +import contextlib +from functools import reduce +from operator import and_ +from typing import TYPE_CHECKING, Any, Iterator, Mapping, Sequence + +import duckdb +from duckdb import FunctionExpression, StarExpression + +from narwhals._duckdb.utils import ( + DeferredTimeZone, + col, + evaluate_exprs, + generate_partition_by_sql, + lit, + native_to_narwhals_dtype, +) +from narwhals._utils import ( + Implementation, + Version, + generate_temporary_column_name, + not_implemented, + parse_columns_to_drop, + parse_version, + validate_backend_version, +) +from narwhals.dependencies import get_duckdb +from narwhals.exceptions import InvalidOperationError +from narwhals.typing import CompliantLazyFrame + +if TYPE_CHECKING: + from types import ModuleType + + import pandas as pd + import pyarrow as pa + from duckdb import Expression + from duckdb.typing import DuckDBPyType + from typing_extensions import Self, TypeIs + + from narwhals._compliant.typing import CompliantDataFrameAny + from narwhals._duckdb.expr import DuckDBExpr + from narwhals._duckdb.group_by import DuckDBGroupBy + from narwhals._duckdb.namespace import DuckDBNamespace + from narwhals._duckdb.series import DuckDBInterchangeSeries + from narwhals._utils import _FullContext + from narwhals.dataframe import LazyFrame + from narwhals.dtypes import DType + from narwhals.stable.v1 import DataFrame as DataFrameV1 + from narwhals.typing import AsofJoinStrategy, JoinStrategy, LazyUniqueKeepStrategy + +with contextlib.suppress(ImportError): # requires duckdb>=1.3.0 + from duckdb import SQLExpression + + +class DuckDBLazyFrame( + CompliantLazyFrame[ + "DuckDBExpr", + "duckdb.DuckDBPyRelation", + "LazyFrame[duckdb.DuckDBPyRelation] | DataFrameV1[duckdb.DuckDBPyRelation]", + ] +): + _implementation = Implementation.DUCKDB + + def __init__( + self, + df: duckdb.DuckDBPyRelation, + *, + backend_version: tuple[int, ...], + version: Version, + ) -> None: + self._native_frame: duckdb.DuckDBPyRelation = df + self._version = version + self._backend_version = backend_version + self._cached_native_schema: dict[str, DuckDBPyType] | None = None + self._cached_columns: list[str] | None = None + validate_backend_version(self._implementation, self._backend_version) + + @staticmethod + def _is_native(obj: duckdb.DuckDBPyRelation | Any) -> TypeIs[duckdb.DuckDBPyRelation]: + return isinstance(obj, duckdb.DuckDBPyRelation) + + @classmethod + def from_native( + cls, data: duckdb.DuckDBPyRelation, /, *, context: _FullContext + ) -> Self: + return cls( + data, backend_version=context._backend_version, version=context._version + ) + + def to_narwhals( + self, *args: Any, **kwds: Any + ) -> LazyFrame[duckdb.DuckDBPyRelation] | DataFrameV1[duckdb.DuckDBPyRelation]: + if self._version is Version.MAIN: + return self._version.lazyframe(self, level="lazy") + + from narwhals.stable.v1 import DataFrame as DataFrameV1 + + return DataFrameV1(self, level="interchange") # type: ignore[no-any-return] + + def __narwhals_dataframe__(self) -> Self: # pragma: no cover + # Keep around for backcompat. + if self._version is not Version.V1: + msg = "__narwhals_dataframe__ is not implemented for DuckDBLazyFrame" + raise AttributeError(msg) + return self + + def __narwhals_lazyframe__(self) -> Self: + return self + + def __native_namespace__(self) -> ModuleType: + return get_duckdb() # type: ignore[no-any-return] + + def __narwhals_namespace__(self) -> DuckDBNamespace: + from narwhals._duckdb.namespace import DuckDBNamespace + + return DuckDBNamespace( + backend_version=self._backend_version, version=self._version + ) + + def get_column(self, name: str) -> DuckDBInterchangeSeries: + from narwhals._duckdb.series import DuckDBInterchangeSeries + + return DuckDBInterchangeSeries(self.native.select(name), version=self._version) + + def _iter_columns(self) -> Iterator[Expression]: + for name in self.columns: + yield col(name) + + def collect( + self, backend: ModuleType | Implementation | str | None, **kwargs: Any + ) -> CompliantDataFrameAny: + if backend is None or backend is Implementation.PYARROW: + import pyarrow as pa # ignore-banned-import + + from narwhals._arrow.dataframe import ArrowDataFrame + + return ArrowDataFrame( + self.native.arrow(), + backend_version=parse_version(pa), + version=self._version, + validate_column_names=True, + ) + + if backend is Implementation.PANDAS: + import pandas as pd # ignore-banned-import + + from narwhals._pandas_like.dataframe import PandasLikeDataFrame + + return PandasLikeDataFrame( + self.native.df(), + implementation=Implementation.PANDAS, + backend_version=parse_version(pd), + version=self._version, + validate_column_names=True, + ) + + if backend is Implementation.POLARS: + import polars as pl # ignore-banned-import + + from narwhals._polars.dataframe import PolarsDataFrame + + return PolarsDataFrame( + self.native.pl(), backend_version=parse_version(pl), version=self._version + ) + + msg = f"Unsupported `backend` value: {backend}" # pragma: no cover + raise ValueError(msg) # pragma: no cover + + def head(self, n: int) -> Self: + return self._with_native(self.native.limit(n)) + + def simple_select(self, *column_names: str) -> Self: + return self._with_native(self.native.select(*column_names)) + + def aggregate(self, *exprs: DuckDBExpr) -> Self: + selection = [val.alias(name) for name, val in evaluate_exprs(self, *exprs)] + return self._with_native(self.native.aggregate(selection)) # type: ignore[arg-type] + + def select(self, *exprs: DuckDBExpr) -> Self: + selection = (val.alias(name) for name, val in evaluate_exprs(self, *exprs)) + return self._with_native(self.native.select(*selection)) + + def drop(self, columns: Sequence[str], *, strict: bool) -> Self: + columns_to_drop = parse_columns_to_drop(self, columns, strict=strict) + selection = (name for name in self.columns if name not in columns_to_drop) + return self._with_native(self.native.select(*selection)) + + def lazy(self, *, backend: Implementation | None = None) -> Self: + # The `backend`` argument has no effect but we keep it here for + # backwards compatibility because in `narwhals.stable.v1` + # function `.from_native()` will return a DataFrame for DuckDB. + + if backend is not None: # pragma: no cover + msg = "`backend` argument is not supported for DuckDB" + raise ValueError(msg) + return self + + def with_columns(self, *exprs: DuckDBExpr) -> Self: + new_columns_map = dict(evaluate_exprs(self, *exprs)) + result = [ + new_columns_map.pop(name).alias(name) + if name in new_columns_map + else col(name) + for name in self.columns + ] + result.extend(value.alias(name) for name, value in new_columns_map.items()) + return self._with_native(self.native.select(*result)) + + def filter(self, predicate: DuckDBExpr) -> Self: + # `[0]` is safe as the predicate's expression only returns a single column + mask = predicate(self)[0] + return self._with_native(self.native.filter(mask)) + + @property + def schema(self) -> dict[str, DType]: + if self._cached_native_schema is None: + # Note: prefer `self._cached_native_schema` over `functools.cached_property` + # due to Python3.13 failures. + self._cached_native_schema = dict(zip(self.columns, self.native.types)) + + deferred_time_zone = DeferredTimeZone(self.native) + return { + column_name: native_to_narwhals_dtype( + duckdb_dtype, self._version, deferred_time_zone + ) + for column_name, duckdb_dtype in zip(self.native.columns, self.native.types) + } + + @property + def columns(self) -> list[str]: + if self._cached_columns is None: + self._cached_columns = ( + list(self.schema) + if self._cached_native_schema is not None + else self.native.columns + ) + return self._cached_columns + + def to_pandas(self) -> pd.DataFrame: + # only if version is v1, keep around for backcompat + import pandas as pd # ignore-banned-import() + + if parse_version(pd) >= (1, 0, 0): + return self.native.df() + else: # pragma: no cover + msg = f"Conversion to pandas requires 'pandas>=1.0.0', found {pd.__version__}" + raise NotImplementedError(msg) + + def to_arrow(self) -> pa.Table: + # only if version is v1, keep around for backcompat + return self.native.arrow() + + def _with_version(self, version: Version) -> Self: + return self.__class__( + self.native, version=version, backend_version=self._backend_version + ) + + def _with_native(self, df: duckdb.DuckDBPyRelation) -> Self: + return self.__class__( + df, backend_version=self._backend_version, version=self._version + ) + + def group_by( + self, keys: Sequence[str] | Sequence[DuckDBExpr], *, drop_null_keys: bool + ) -> DuckDBGroupBy: + from narwhals._duckdb.group_by import DuckDBGroupBy + + return DuckDBGroupBy(self, keys, drop_null_keys=drop_null_keys) + + def rename(self, mapping: Mapping[str, str]) -> Self: + df = self.native + selection = ( + col(name).alias(mapping[name]) if name in mapping else col(name) + for name in df.columns + ) + return self._with_native(self.native.select(*selection)) + + def join( + self, + other: Self, + *, + how: JoinStrategy, + left_on: Sequence[str] | None, + right_on: Sequence[str] | None, + suffix: str, + ) -> Self: + native_how = "outer" if how == "full" else how + + if native_how == "cross": + if self._backend_version < (1, 1, 4): + msg = f"'duckdb>=1.1.4' is required for cross-join, found version: {self._backend_version}" + raise NotImplementedError(msg) + rel = self.native.set_alias("lhs").cross(other.native.set_alias("rhs")) + else: + # help mypy + assert left_on is not None # noqa: S101 + assert right_on is not None # noqa: S101 + it = ( + col(f'lhs."{left}"') == col(f'rhs."{right}"') + for left, right in zip(left_on, right_on) + ) + condition: Expression = reduce(and_, it) + rel = self.native.set_alias("lhs").join( + other.native.set_alias("rhs"), + # NOTE: Fixed in `--pre` https://github.com/duckdb/duckdb/pull/16933 + condition=condition, # type: ignore[arg-type, unused-ignore] + how=native_how, + ) + + if native_how in {"inner", "left", "cross", "outer"}: + select = [col(f'lhs."{x}"') for x in self.columns] + for name in other.columns: + col_in_lhs: bool = name in self.columns + if native_how == "outer" and not col_in_lhs: + select.append(col(f'rhs."{name}"')) + elif (native_how == "outer") or ( + col_in_lhs and (right_on is None or name not in right_on) + ): + select.append(col(f'rhs."{name}"').alias(f"{name}{suffix}")) + elif right_on is None or name not in right_on: + select.append(col(name)) + res = rel.select(*select).set_alias(self.native.alias) + else: # semi, anti + res = rel.select("lhs.*").set_alias(self.native.alias) + + return self._with_native(res) + + def join_asof( + self, + other: Self, + *, + left_on: str, + right_on: str, + by_left: Sequence[str] | None, + by_right: Sequence[str] | None, + strategy: AsofJoinStrategy, + suffix: str, + ) -> Self: + lhs = self.native + rhs = other.native + conditions: list[Expression] = [] + if by_left is not None and by_right is not None: + conditions.extend( + col(f'lhs."{left}"') == col(f'rhs."{right}"') + for left, right in zip(by_left, by_right) + ) + else: + by_left = by_right = [] + if strategy == "backward": + conditions.append(col(f'lhs."{left_on}"') >= col(f'rhs."{right_on}"')) + elif strategy == "forward": + conditions.append(col(f'lhs."{left_on}"') <= col(f'rhs."{right_on}"')) + else: + msg = "Only 'backward' and 'forward' strategies are currently supported for DuckDB" + raise NotImplementedError(msg) + condition: Expression = reduce(and_, conditions) + select = ["lhs.*"] + for name in rhs.columns: + if name in lhs.columns and ( + right_on is None or name not in {right_on, *by_right} + ): + select.append(f'rhs."{name}" as "{name}{suffix}"') + elif right_on is None or name not in {right_on, *by_right}: + select.append(str(col(name))) + # Replace with Python API call once + # https://github.com/duckdb/duckdb/discussions/16947 is addressed. + query = f""" + SELECT {",".join(select)} + FROM lhs + ASOF LEFT JOIN rhs + ON {condition} + """ # noqa: S608 + return self._with_native(duckdb.sql(query)) + + def collect_schema(self) -> dict[str, DType]: + return self.schema + + def unique( + self, subset: Sequence[str] | None, *, keep: LazyUniqueKeepStrategy + ) -> Self: + if subset_ := subset if keep == "any" else (subset or self.columns): + if self._backend_version < (1, 3): + msg = ( + "At least version 1.3 of DuckDB is required for `unique` operation\n" + "with `subset` specified." + ) + raise NotImplementedError(msg) + # Sanitise input + if error := self._check_columns_exist(subset_): + raise error + idx_name = generate_temporary_column_name(8, self.columns) + count_name = generate_temporary_column_name(8, [*self.columns, idx_name]) + partition_by_sql = generate_partition_by_sql(*(subset_)) + name = count_name if keep == "none" else idx_name + idx_expr = SQLExpression( + f"{FunctionExpression('row_number')} over ({partition_by_sql})" + ).alias(idx_name) + count_expr = SQLExpression( + f"{FunctionExpression('count', StarExpression())} over ({partition_by_sql})" + ).alias(count_name) + return self._with_native( + self.native.select(StarExpression(), idx_expr, count_expr) + .filter(col(name) == lit(1)) + .select(StarExpression(exclude=[count_name, idx_name])) + ) + return self._with_native(self.native.unique(", ".join(self.columns))) + + def sort(self, *by: str, descending: bool | Sequence[bool], nulls_last: bool) -> Self: + if isinstance(descending, bool): + descending = [descending] * len(by) + if nulls_last: + it = ( + col(name).nulls_last() if not desc else col(name).desc().nulls_last() + for name, desc in zip(by, descending) + ) + else: + it = ( + col(name).nulls_first() if not desc else col(name).desc().nulls_first() + for name, desc in zip(by, descending) + ) + return self._with_native(self.native.sort(*it)) + + def drop_nulls(self, subset: Sequence[str] | None) -> Self: + subset_ = subset if subset is not None else self.columns + keep_condition = reduce(and_, (col(name).isnotnull() for name in subset_)) + return self._with_native(self.native.filter(keep_condition)) + + def explode(self, columns: Sequence[str]) -> Self: + dtypes = self._version.dtypes + schema = self.collect_schema() + for name in columns: + dtype = schema[name] + if dtype != dtypes.List: + msg = ( + f"`explode` operation not supported for dtype `{dtype}`, " + "expected List type" + ) + raise InvalidOperationError(msg) + + if len(columns) != 1: + msg = ( + "Exploding on multiple columns is not supported with DuckDB backend since " + "we cannot guarantee that the exploded columns have matching element counts." + ) + raise NotImplementedError(msg) + + col_to_explode = col(columns[0]) + rel = self.native + original_columns = self.columns + + not_null_condition = col_to_explode.isnotnull() & FunctionExpression( + "len", col_to_explode + ) > lit(0) + non_null_rel = rel.filter(not_null_condition).select( + *( + FunctionExpression("unnest", col_to_explode).alias(name) + if name in columns + else name + for name in original_columns + ) + ) + + null_rel = rel.filter(~not_null_condition).select( + *( + lit(None).alias(name) if name in columns else name + for name in original_columns + ) + ) + + return self._with_native(non_null_rel.union(null_rel)) + + def unpivot( + self, + on: Sequence[str] | None, + index: Sequence[str] | None, + variable_name: str, + value_name: str, + ) -> Self: + index_ = [] if index is None else index + on_ = [c for c in self.columns if c not in index_] if on is None else on + + if variable_name == "": + msg = "`variable_name` cannot be empty string for duckdb backend." + raise NotImplementedError(msg) + + if value_name == "": + msg = "`value_name` cannot be empty string for duckdb backend." + raise NotImplementedError(msg) + + unpivot_on = ", ".join(str(col(name)) for name in on_) + rel = self.native # noqa: F841 + # Replace with Python API once + # https://github.com/duckdb/duckdb/discussions/16980 is addressed. + query = f""" + unpivot rel + on {unpivot_on} + into + name "{variable_name}" + value "{value_name}" + """ + return self._with_native( + duckdb.sql(query).select(*[*index_, variable_name, value_name]) + ) + + gather_every = not_implemented.deprecated( + "`LazyFrame.gather_every` is deprecated and will be removed in a future version." + ) + tail = not_implemented.deprecated( + "`LazyFrame.tail` is deprecated and will be removed in a future version." + ) + with_row_index = not_implemented() diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py new file mode 100644 index 0000000..b3d55f3 --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py @@ -0,0 +1,898 @@ +from __future__ import annotations + +import contextlib +import operator +from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence, cast + +from duckdb import CoalesceOperator, FunctionExpression, StarExpression +from duckdb.typing import DuckDBPyType + +from narwhals._compliant import LazyExpr +from narwhals._compliant.window import WindowInputs +from narwhals._duckdb.expr_dt import DuckDBExprDateTimeNamespace +from narwhals._duckdb.expr_list import DuckDBExprListNamespace +from narwhals._duckdb.expr_str import DuckDBExprStringNamespace +from narwhals._duckdb.expr_struct import DuckDBExprStructNamespace +from narwhals._duckdb.utils import ( + col, + generate_order_by_sql, + generate_partition_by_sql, + lit, + narwhals_to_native_dtype, + when, +) +from narwhals._expression_parsing import ExprKind +from narwhals._utils import Implementation, not_implemented, requires + +if TYPE_CHECKING: + from duckdb import Expression + from typing_extensions import Self + + from narwhals._compliant.typing import ( + AliasNames, + EvalNames, + EvalSeries, + WindowFunction, + ) + from narwhals._duckdb.dataframe import DuckDBLazyFrame + from narwhals._duckdb.namespace import DuckDBNamespace + from narwhals._expression_parsing import ExprMetadata + from narwhals._utils import Version, _FullContext + from narwhals.typing import ( + FillNullStrategy, + IntoDType, + NonNestedLiteral, + NumericLiteral, + RankMethod, + RollingInterpolationMethod, + TemporalLiteral, + ) + + DuckDBWindowFunction = WindowFunction[DuckDBLazyFrame, Expression] + DuckDBWindowInputs = WindowInputs[Expression] + + +with contextlib.suppress(ImportError): # requires duckdb>=1.3.0 + from duckdb import SQLExpression + + +class DuckDBExpr(LazyExpr["DuckDBLazyFrame", "Expression"]): + _implementation = Implementation.DUCKDB + + def __init__( + self, + call: EvalSeries[DuckDBLazyFrame, Expression], + window_function: DuckDBWindowFunction | None = None, + *, + evaluate_output_names: EvalNames[DuckDBLazyFrame], + alias_output_names: AliasNames | None, + backend_version: tuple[int, ...], + version: Version, + ) -> None: + self._call = call + self._evaluate_output_names = evaluate_output_names + self._alias_output_names = alias_output_names + self._backend_version = backend_version + self._version = version + self._metadata: ExprMetadata | None = None + self._window_function: DuckDBWindowFunction | None = window_function + + @property + def window_function(self) -> DuckDBWindowFunction: + def default_window_func( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + assert not window_inputs.order_by # noqa: S101 + partition_by_sql = generate_partition_by_sql(*window_inputs.partition_by) + template = f"{{expr}} over ({partition_by_sql})" + return [SQLExpression(template.format(expr=expr)) for expr in self(df)] + + return self._window_function or default_window_func + + def __call__(self, df: DuckDBLazyFrame) -> Sequence[Expression]: + return self._call(df) + + def __narwhals_expr__(self) -> None: ... + + def __narwhals_namespace__(self) -> DuckDBNamespace: # pragma: no cover + # Unused, just for compatibility with PandasLikeExpr + from narwhals._duckdb.namespace import DuckDBNamespace + + return DuckDBNamespace( + backend_version=self._backend_version, version=self._version + ) + + def _cum_window_func( + self, + *, + reverse: bool, + func_name: Literal["sum", "max", "min", "count", "product"], + ) -> DuckDBWindowFunction: + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=not reverse) + partition_by_sql = generate_partition_by_sql(*inputs.partition_by) + sql = ( + f"{func_name} ({{expr}}) over ({partition_by_sql} {order_by_sql} " + "rows between unbounded preceding and current row)" + ) + return [SQLExpression(sql.format(expr=expr)) for expr in self(df)] + + return func + + def _rolling_window_func( + self, + *, + func_name: Literal["sum", "mean", "std", "var"], + center: bool, + window_size: int, + min_samples: int, + ddof: int | None = None, + ) -> DuckDBWindowFunction: + supported_funcs = ["sum", "mean", "std", "var"] + if center: + half = (window_size - 1) // 2 + remainder = (window_size - 1) % 2 + start = f"{half + remainder} preceding" + end = f"{half} following" + else: + start = f"{window_size - 1} preceding" + end = "current row" + + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True) + partition_by_sql = generate_partition_by_sql(*inputs.partition_by) + window = f"({partition_by_sql} {order_by_sql} rows between {start} and {end})" + if func_name in {"sum", "mean"}: + func_: str = func_name + elif func_name == "var" and ddof == 0: + func_ = "var_pop" + elif func_name in "var" and ddof == 1: + func_ = "var_samp" + elif func_name == "std" and ddof == 0: + func_ = "stddev_pop" + elif func_name == "std" and ddof == 1: + func_ = "stddev_samp" + elif func_name in {"var", "std"}: # pragma: no cover + msg = f"Only ddof=0 and ddof=1 are currently supported for rolling_{func_name}." + raise ValueError(msg) + else: # pragma: no cover + msg = f"Only the following functions are supported: {supported_funcs}.\nGot: {func_name}." + raise ValueError(msg) + condition_sql = f"count({{expr}}) over {window} >= {min_samples}" + value_sql = f"{func_}({{expr}}) over {window}" + return [ + when( + SQLExpression(condition_sql.format(expr=expr)), + SQLExpression(value_sql.format(expr=expr)), + ) + for expr in self(df) + ] + + return func + + def broadcast(self, kind: Literal[ExprKind.AGGREGATION, ExprKind.LITERAL]) -> Self: + if kind is ExprKind.LITERAL: + return self + if self._backend_version < (1, 3): + msg = "At least version 1.3 of DuckDB is required for binary operations between aggregates and columns." + raise NotImplementedError(msg) + return self.over([lit(1)], []) + + @classmethod + def from_column_names( + cls, + evaluate_column_names: EvalNames[DuckDBLazyFrame], + /, + *, + context: _FullContext, + ) -> Self: + def func(df: DuckDBLazyFrame) -> list[Expression]: + return [col(name) for name in evaluate_column_names(df)] + + return cls( + func, + evaluate_output_names=evaluate_column_names, + alias_output_names=None, + backend_version=context._backend_version, + version=context._version, + ) + + @classmethod + def from_column_indices(cls, *column_indices: int, context: _FullContext) -> Self: + def func(df: DuckDBLazyFrame) -> list[Expression]: + columns = df.columns + return [col(columns[i]) for i in column_indices] + + return cls( + func, + evaluate_output_names=cls._eval_names_indices(column_indices), + alias_output_names=None, + backend_version=context._backend_version, + version=context._version, + ) + + def _callable_to_eval_series( + self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any + ) -> EvalSeries[DuckDBLazyFrame, Expression]: + def func(df: DuckDBLazyFrame) -> list[Expression]: + native_series_list = self(df) + other_native_series = { + key: df._evaluate_expr(value) if self._is_expr(value) else lit(value) + for key, value in expressifiable_args.items() + } + return [ + call(native_series, **other_native_series) + for native_series in native_series_list + ] + + return func + + def _push_down_window_function( + self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any + ) -> DuckDBWindowFunction: + def window_f( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + # If a function `f` is elementwise, and `g` is another function, then + # - `f(g) over (window)` + # - `f(g over (window)) + # are equivalent. + # Make sure to only use with if `call` is elementwise! + native_series_list = self.window_function(df, window_inputs) + other_native_series = { + key: df._evaluate_window_expr(value, window_inputs) + if self._is_expr(value) + else lit(value) + for key, value in expressifiable_args.items() + } + return [ + call(native_series, **other_native_series) + for native_series in native_series_list + ] + + return window_f + + def _with_callable( + self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any + ) -> Self: + """Create expression from callable. + + Arguments: + call: Callable from compliant DataFrame to native Expression + expr_name: Expression name + expressifiable_args: arguments pass to expression which should be parsed + as expressions (e.g. in `nw.col('a').is_between('b', 'c')`) + """ + return self.__class__( + self._callable_to_eval_series(call, **expressifiable_args), + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) + + def _with_elementwise( + self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any + ) -> Self: + return self.__class__( + self._callable_to_eval_series(call, **expressifiable_args), + self._push_down_window_function(call, **expressifiable_args), + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) + + def _with_binary(self, op: Callable[..., Expression], other: Self | Any) -> Self: + return self.__class__( + self._callable_to_eval_series(op, other=other), + self._push_down_window_function(op, other=other), + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) + + def _with_alias_output_names(self, func: AliasNames | None, /) -> Self: + return type(self)( + self._call, + self._window_function, + evaluate_output_names=self._evaluate_output_names, + alias_output_names=func, + backend_version=self._backend_version, + version=self._version, + ) + + def _with_window_function(self, window_function: DuckDBWindowFunction) -> Self: + return self.__class__( + self._call, + window_function, + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) + + @classmethod + def _alias_native(cls, expr: Expression, name: str) -> Expression: + return expr.alias(name) + + def __and__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr & other, other) + + def __or__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr | other, other) + + def __add__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr + other, other) + + def __truediv__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr / other, other) + + def __rtruediv__(self, other: DuckDBExpr) -> Self: + return self._with_binary( + lambda expr, other: other.__truediv__(expr), other + ).alias("literal") + + def __floordiv__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr // other, other) + + def __rfloordiv__(self, other: DuckDBExpr) -> Self: + return self._with_binary( + lambda expr, other: other.__floordiv__(expr), other + ).alias("literal") + + def __mod__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr % other, other) + + def __rmod__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: other.__mod__(expr), other).alias( + "literal" + ) + + def __sub__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr - other, other) + + def __rsub__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: other.__sub__(expr), other).alias( + "literal" + ) + + def __mul__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr * other, other) + + def __pow__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr**other, other) + + def __rpow__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: other.__pow__(expr), other).alias( + "literal" + ) + + def __lt__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr < other, other) + + def __gt__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr > other, other) + + def __le__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr <= other, other) + + def __ge__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr >= other, other) + + def __eq__(self, other: DuckDBExpr) -> Self: # type: ignore[override] + return self._with_binary(lambda expr, other: expr == other, other) + + def __ne__(self, other: DuckDBExpr) -> Self: # type: ignore[override] + return self._with_binary(lambda expr, other: expr != other, other) + + def __invert__(self) -> Self: + invert = cast("Callable[..., Expression]", operator.invert) + return self._with_elementwise(invert) + + def abs(self) -> Self: + return self._with_elementwise(lambda expr: FunctionExpression("abs", expr)) + + def mean(self) -> Self: + return self._with_callable(lambda expr: FunctionExpression("mean", expr)) + + def skew(self) -> Self: + def func(expr: Expression) -> Expression: + count = FunctionExpression("count", expr) + # Adjust population skewness by correction factor to get sample skewness + sample_skewness = ( + FunctionExpression("skewness", expr) + * (count - lit(2)) + / FunctionExpression("sqrt", count * (count - lit(1))) + ) + return when(count == lit(0), lit(None)).otherwise( + when(count == lit(1), lit(float("nan"))).otherwise( + when(count == lit(2), lit(0.0)).otherwise(sample_skewness) + ) + ) + + return self._with_callable(func) + + def median(self) -> Self: + return self._with_callable(lambda expr: FunctionExpression("median", expr)) + + def all(self) -> Self: + def f(expr: Expression) -> Expression: + return CoalesceOperator(FunctionExpression("bool_and", expr), lit(True)) # noqa: FBT003 + + def window_f( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + pb = generate_partition_by_sql(*window_inputs.partition_by) + return [ + CoalesceOperator( + SQLExpression(f"{FunctionExpression('bool_and', expr)} over ({pb})"), + lit(True), # noqa: FBT003 + ) + for expr in self(df) + ] + + return self._with_callable(f)._with_window_function(window_f) + + def any(self) -> Self: + def f(expr: Expression) -> Expression: + return CoalesceOperator(FunctionExpression("bool_or", expr), lit(False)) # noqa: FBT003 + + def window_f( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + pb = generate_partition_by_sql(*window_inputs.partition_by) + return [ + CoalesceOperator( + SQLExpression(f"{FunctionExpression('bool_or', expr)} over ({pb})"), + lit(False), # noqa: FBT003 + ) + for expr in self(df) + ] + + return self._with_callable(f)._with_window_function(window_f) + + def quantile( + self, quantile: float, interpolation: RollingInterpolationMethod + ) -> Self: + def func(expr: Expression) -> Expression: + if interpolation == "linear": + return FunctionExpression("quantile_cont", expr, lit(quantile)) + msg = "Only linear interpolation methods are supported for DuckDB quantile." + raise NotImplementedError(msg) + + return self._with_callable(func) + + def clip( + self, + lower_bound: Self | NumericLiteral | TemporalLiteral | None, + upper_bound: Self | NumericLiteral | TemporalLiteral | None, + ) -> Self: + def _clip_lower(expr: Expression, lower_bound: Any) -> Expression: + return FunctionExpression("greatest", expr, lower_bound) + + def _clip_upper(expr: Expression, upper_bound: Any) -> Expression: + return FunctionExpression("least", expr, upper_bound) + + def _clip_both( + expr: Expression, lower_bound: Any, upper_bound: Any + ) -> Expression: + return FunctionExpression( + "greatest", FunctionExpression("least", expr, upper_bound), lower_bound + ) + + if lower_bound is None: + return self._with_elementwise(_clip_upper, upper_bound=upper_bound) + if upper_bound is None: + return self._with_elementwise(_clip_lower, lower_bound=lower_bound) + return self._with_elementwise( + _clip_both, lower_bound=lower_bound, upper_bound=upper_bound + ) + + def sum(self) -> Self: + def f(expr: Expression) -> Expression: + return CoalesceOperator(FunctionExpression("sum", expr), lit(0)) + + def window_f( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + pb = generate_partition_by_sql(*window_inputs.partition_by) + return [ + CoalesceOperator( + SQLExpression(f"{FunctionExpression('sum', expr)} over ({pb})"), + lit(0), + ) + for expr in self(df) + ] + + return self._with_callable(f)._with_window_function(window_f) + + def n_unique(self) -> Self: + def func(expr: Expression) -> Expression: + # https://stackoverflow.com/a/79338887/4451315 + return FunctionExpression( + "array_unique", FunctionExpression("array_agg", expr) + ) + FunctionExpression( + "max", when(expr.isnotnull(), lit(0)).otherwise(lit(1)) + ) + + return self._with_callable(func) + + def count(self) -> Self: + return self._with_callable(lambda expr: FunctionExpression("count", expr)) + + def len(self) -> Self: + return self._with_callable(lambda _expr: FunctionExpression("count")) + + def std(self, ddof: int) -> Self: + if ddof == 0: + return self._with_callable( + lambda expr: FunctionExpression("stddev_pop", expr) + ) + if ddof == 1: + return self._with_callable( + lambda expr: FunctionExpression("stddev_samp", expr) + ) + + def _std(expr: Expression) -> Expression: + n_samples = FunctionExpression("count", expr) + return ( + FunctionExpression("stddev_pop", expr) + * FunctionExpression("sqrt", n_samples) + / (FunctionExpression("sqrt", (n_samples - lit(ddof)))) + ) + + return self._with_callable(_std) + + def var(self, ddof: int) -> Self: + if ddof == 0: + return self._with_callable(lambda expr: FunctionExpression("var_pop", expr)) + if ddof == 1: + return self._with_callable(lambda expr: FunctionExpression("var_samp", expr)) + + def _var(expr: Expression) -> Expression: + n_samples = FunctionExpression("count", expr) + return ( + FunctionExpression("var_pop", expr) * n_samples / (n_samples - lit(ddof)) + ) + + return self._with_callable(_var) + + def max(self) -> Self: + return self._with_callable(lambda expr: FunctionExpression("max", expr)) + + def min(self) -> Self: + return self._with_callable(lambda expr: FunctionExpression("min", expr)) + + def null_count(self) -> Self: + return self._with_callable( + lambda expr: FunctionExpression("sum", expr.isnull().cast("int")) + ) + + @requires.backend_version((1, 3)) + def over( + self, partition_by: Sequence[str | Expression], order_by: Sequence[str] + ) -> Self: + def func(df: DuckDBLazyFrame) -> Sequence[Expression]: + return self.window_function(df, WindowInputs(partition_by, order_by)) + + return self.__class__( + func, + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) + + def is_null(self) -> Self: + return self._with_elementwise(lambda expr: expr.isnull()) + + def is_nan(self) -> Self: + return self._with_elementwise(lambda expr: FunctionExpression("isnan", expr)) + + def is_finite(self) -> Self: + return self._with_elementwise(lambda expr: FunctionExpression("isfinite", expr)) + + def is_in(self, other: Sequence[Any]) -> Self: + return self._with_elementwise( + lambda expr: FunctionExpression("contains", lit(other), expr) + ) + + def round(self, decimals: int) -> Self: + return self._with_elementwise( + lambda expr: FunctionExpression("round", expr, lit(decimals)) + ) + + @requires.backend_version((1, 3)) + def shift(self, n: int) -> Self: + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True) + partition_by_sql = generate_partition_by_sql(*inputs.partition_by) + sql = f"lag({{expr}}, {n}) over ({partition_by_sql} {order_by_sql})" + return [SQLExpression(sql.format(expr=expr)) for expr in self(df)] + + return self._with_window_function(func) + + @requires.backend_version((1, 3)) + def is_first_distinct(self) -> Self: + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True) + if inputs.partition_by: + partition_by_sql = ( + generate_partition_by_sql(*inputs.partition_by) + ", {expr}" + ) + else: + partition_by_sql = "partition by {expr}" + sql = ( + f"{FunctionExpression('row_number')} " + f"over({partition_by_sql} {order_by_sql})" + ) + return [SQLExpression(sql.format(expr=expr)) == lit(1) for expr in self(df)] + + return self._with_window_function(func) + + @requires.backend_version((1, 3)) + def is_last_distinct(self) -> Self: + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=False) + if inputs.partition_by: + partition_by_sql = ( + generate_partition_by_sql(*inputs.partition_by) + ", {expr}" + ) + else: + partition_by_sql = "partition by {expr}" + sql = ( + f"{FunctionExpression('row_number')} " + f"over({partition_by_sql} {order_by_sql})" + ) + return [SQLExpression(sql.format(expr=expr)) == lit(1) for expr in self(df)] + + return self._with_window_function(func) + + @requires.backend_version((1, 3)) + def diff(self) -> Self: + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True) + partition_by_sql = generate_partition_by_sql(*inputs.partition_by) + sql = f"lag({{expr}}) over ({partition_by_sql} {order_by_sql})" + return [expr - SQLExpression(sql.format(expr=expr)) for expr in self(df)] + + return self._with_window_function(func) + + @requires.backend_version((1, 3)) + def cum_sum(self, *, reverse: bool) -> Self: + return self._with_window_function( + self._cum_window_func(reverse=reverse, func_name="sum") + ) + + @requires.backend_version((1, 3)) + def cum_max(self, *, reverse: bool) -> Self: + return self._with_window_function( + self._cum_window_func(reverse=reverse, func_name="max") + ) + + @requires.backend_version((1, 3)) + def cum_min(self, *, reverse: bool) -> Self: + return self._with_window_function( + self._cum_window_func(reverse=reverse, func_name="min") + ) + + @requires.backend_version((1, 3)) + def cum_count(self, *, reverse: bool) -> Self: + return self._with_window_function( + self._cum_window_func(reverse=reverse, func_name="count") + ) + + @requires.backend_version((1, 3)) + def cum_prod(self, *, reverse: bool) -> Self: + return self._with_window_function( + self._cum_window_func(reverse=reverse, func_name="product") + ) + + @requires.backend_version((1, 3)) + def rolling_sum(self, window_size: int, *, min_samples: int, center: bool) -> Self: + return self._with_window_function( + self._rolling_window_func( + func_name="sum", + center=center, + window_size=window_size, + min_samples=min_samples, + ) + ) + + @requires.backend_version((1, 3)) + def rolling_mean(self, window_size: int, *, min_samples: int, center: bool) -> Self: + return self._with_window_function( + self._rolling_window_func( + func_name="mean", + center=center, + window_size=window_size, + min_samples=min_samples, + ) + ) + + @requires.backend_version((1, 3)) + def rolling_var( + self, window_size: int, *, min_samples: int, center: bool, ddof: int + ) -> Self: + return self._with_window_function( + self._rolling_window_func( + func_name="var", + center=center, + window_size=window_size, + min_samples=min_samples, + ddof=ddof, + ) + ) + + @requires.backend_version((1, 3)) + def rolling_std( + self, window_size: int, *, min_samples: int, center: bool, ddof: int + ) -> Self: + return self._with_window_function( + self._rolling_window_func( + func_name="std", + center=center, + window_size=window_size, + min_samples=min_samples, + ddof=ddof, + ) + ) + + def fill_null( + self, + value: Self | NonNestedLiteral, + strategy: FillNullStrategy | None, + limit: int | None, + ) -> Self: + if strategy is not None: + if self._backend_version < (1, 3): # pragma: no cover + msg = f"`fill_null` with `strategy={strategy}` is only available in 'duckdb>=1.3.0'." + raise NotImplementedError(msg) + + def _fill_with_strategy( + df: DuckDBLazyFrame, inputs: DuckDBWindowInputs + ) -> Sequence[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True) + partition_by_sql = generate_partition_by_sql(*inputs.partition_by) + + fill_func = "last_value" if strategy == "forward" else "first_value" + _limit = "unbounded" if limit is None else limit + rows_between = ( + f"{_limit} preceding and current row" + if strategy == "forward" + else f"current row and {_limit} following" + ) + sql = ( + f"{fill_func}({{expr}} ignore nulls) over " + f"({partition_by_sql} {order_by_sql} rows between {rows_between})" + ) + return [SQLExpression(sql.format(expr=expr)) for expr in self(df)] + + return self._with_window_function(_fill_with_strategy) + + def _fill_constant(expr: Expression, value: Any) -> Expression: + return CoalesceOperator(expr, value) + + return self._with_elementwise(_fill_constant, value=value) + + def cast(self, dtype: IntoDType) -> Self: + def func(expr: Expression) -> Expression: + native_dtype = narwhals_to_native_dtype(dtype, self._version) + return expr.cast(DuckDBPyType(native_dtype)) + + return self._with_elementwise(func) + + @requires.backend_version((1, 3)) + def is_unique(self) -> Self: + def _is_unique(expr: Expression, *partition_by: str | Expression) -> Expression: + pb = generate_partition_by_sql(expr, *partition_by) + sql = f"{FunctionExpression('count', col('*'))} over ({pb})" + return SQLExpression(sql) == lit(1) + + def _unpartitioned_is_unique(expr: Expression) -> Expression: + return _is_unique(expr) + + def _partitioned_is_unique( + df: DuckDBLazyFrame, inputs: DuckDBWindowInputs + ) -> Sequence[Expression]: + assert not inputs.order_by # noqa: S101 + return [_is_unique(expr, *inputs.partition_by) for expr in self(df)] + + return self._with_callable(_unpartitioned_is_unique)._with_window_function( + _partitioned_is_unique + ) + + @requires.backend_version((1, 3)) + def rank(self, method: RankMethod, *, descending: bool) -> Self: + if method in {"min", "max", "average"}: + func = FunctionExpression("rank") + elif method == "dense": + func = FunctionExpression("dense_rank") + else: # method == "ordinal" + func = FunctionExpression("row_number") + + def _rank( + expr: Expression, + *, + descending: bool, + partition_by: Sequence[str | Expression] | None = None, + ) -> Expression: + order_by_sql = ( + f"order by {expr} desc nulls last" + if descending + else f"order by {expr} asc nulls last" + ) + count_expr = FunctionExpression("count", StarExpression()) + if partition_by is not None: + window = f"{generate_partition_by_sql(*partition_by)} {order_by_sql}" + count_window = f"{generate_partition_by_sql(*partition_by, expr)}" + else: + window = order_by_sql + count_window = generate_partition_by_sql(expr) + if method == "max": + rank_expr = ( + SQLExpression(f"{func} OVER ({window})") + + SQLExpression(f"{count_expr} over ({count_window})") + - lit(1) + ) + elif method == "average": + rank_expr = SQLExpression(f"{func} OVER ({window})") + ( + SQLExpression(f"{count_expr} over ({count_window})") - lit(1) + ) / lit(2.0) + else: + rank_expr = SQLExpression(f"{func} OVER ({window})") + return when(expr.isnotnull(), rank_expr) + + def _unpartitioned_rank(expr: Expression) -> Expression: + return _rank(expr, descending=descending) + + def _partitioned_rank( + df: DuckDBLazyFrame, inputs: DuckDBWindowInputs + ) -> Sequence[Expression]: + assert not inputs.order_by # noqa: S101 + return [ + _rank(expr, descending=descending, partition_by=inputs.partition_by) + for expr in self(df) + ] + + return self._with_callable(_unpartitioned_rank)._with_window_function( + _partitioned_rank + ) + + def log(self, base: float) -> Self: + def _log(expr: Expression) -> Expression: + log = FunctionExpression("log", expr) + return ( + when(expr < lit(0), lit(float("nan"))) + .when(expr == lit(0), lit(float("-inf"))) + .otherwise(log / FunctionExpression("log", lit(base))) + ) + + return self._with_elementwise(_log) + + def exp(self) -> Self: + def _exp(expr: Expression) -> Expression: + return FunctionExpression("exp", expr) + + return self._with_elementwise(_exp) + + @property + def str(self) -> DuckDBExprStringNamespace: + return DuckDBExprStringNamespace(self) + + @property + def dt(self) -> DuckDBExprDateTimeNamespace: + return DuckDBExprDateTimeNamespace(self) + + @property + def list(self) -> DuckDBExprListNamespace: + return DuckDBExprListNamespace(self) + + @property + def struct(self) -> DuckDBExprStructNamespace: + return DuckDBExprStructNamespace(self) + + drop_nulls = not_implemented() + unique = not_implemented() diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_dt.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_dt.py new file mode 100644 index 0000000..68f0f9b --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_dt.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Sequence + +from duckdb import FunctionExpression + +from narwhals._duckdb.utils import UNITS_DICT, fetch_rel_time_zone, lit +from narwhals._duration import parse_interval_string +from narwhals._utils import not_implemented + +if TYPE_CHECKING: + from duckdb import Expression + + from narwhals._duckdb.dataframe import DuckDBLazyFrame + from narwhals._duckdb.expr import DuckDBExpr + + +class DuckDBExprDateTimeNamespace: + def __init__(self, expr: DuckDBExpr) -> None: + self._compliant_expr = expr + + def year(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("year", expr) + ) + + def month(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("month", expr) + ) + + def day(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("day", expr) + ) + + def hour(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("hour", expr) + ) + + def minute(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("minute", expr) + ) + + def second(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("second", expr) + ) + + def millisecond(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("millisecond", expr) + - FunctionExpression("second", expr) * lit(1_000) + ) + + def microsecond(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("microsecond", expr) + - FunctionExpression("second", expr) * lit(1_000_000) + ) + + def nanosecond(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("nanosecond", expr) + - FunctionExpression("second", expr) * lit(1_000_000_000) + ) + + def to_string(self, format: str) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("strftime", expr, lit(format)) + ) + + def weekday(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("isodow", expr) + ) + + def ordinal_day(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("dayofyear", expr) + ) + + def date(self) -> DuckDBExpr: + return self._compliant_expr._with_callable(lambda expr: expr.cast("date")) + + def total_minutes(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("datepart", lit("minute"), expr) + ) + + def total_seconds(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: lit(60) * FunctionExpression("datepart", lit("minute"), expr) + + FunctionExpression("datepart", lit("second"), expr) + ) + + def total_milliseconds(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: lit(60_000) * FunctionExpression("datepart", lit("minute"), expr) + + FunctionExpression("datepart", lit("millisecond"), expr) + ) + + def total_microseconds(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: lit(60_000_000) + * FunctionExpression("datepart", lit("minute"), expr) + + FunctionExpression("datepart", lit("microsecond"), expr) + ) + + def truncate(self, every: str) -> DuckDBExpr: + multiple, unit = parse_interval_string(every) + if multiple != 1: + # https://github.com/duckdb/duckdb/issues/17554 + msg = f"Only multiple 1 is currently supported for DuckDB.\nGot {multiple!s}." + raise ValueError(msg) + if unit == "ns": + msg = "Truncating to nanoseconds is not yet supported for DuckDB." + raise NotImplementedError(msg) + format = lit(UNITS_DICT[unit]) + + def _truncate(expr: Expression) -> Expression: + return FunctionExpression("date_trunc", format, expr) + + return self._compliant_expr._with_callable(_truncate) + + def _no_op_time_zone(self, time_zone: str) -> DuckDBExpr: + def func(df: DuckDBLazyFrame) -> Sequence[Expression]: + native_series_list = self._compliant_expr(df) + conn_time_zone = fetch_rel_time_zone(df.native) + if conn_time_zone != time_zone: + msg = ( + "DuckDB stores the time zone in the connection, rather than in the " + f"data type, so changing the timezone to anything other than {conn_time_zone} " + " (the current connection time zone) is not supported." + ) + raise NotImplementedError(msg) + return native_series_list + + return self._compliant_expr.__class__( + func, + evaluate_output_names=self._compliant_expr._evaluate_output_names, + alias_output_names=self._compliant_expr._alias_output_names, + backend_version=self._compliant_expr._backend_version, + version=self._compliant_expr._version, + ) + + def convert_time_zone(self, time_zone: str) -> DuckDBExpr: + return self._no_op_time_zone(time_zone) + + def replace_time_zone(self, time_zone: str | None) -> DuckDBExpr: + if time_zone is None: + return self._compliant_expr._with_callable( + lambda _input: _input.cast("timestamp") + ) + else: + return self._no_op_time_zone(time_zone) + + total_nanoseconds = not_implemented() diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_list.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_list.py new file mode 100644 index 0000000..60562e5 --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_list.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from duckdb import FunctionExpression + +if TYPE_CHECKING: + from narwhals._duckdb.expr import DuckDBExpr + + +class DuckDBExprListNamespace: + def __init__(self, expr: DuckDBExpr) -> None: + self._compliant_expr = expr + + def len(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("len", expr) + ) diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_str.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_str.py new file mode 100644 index 0000000..6a9d8c2 --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_str.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from duckdb import FunctionExpression + +from narwhals._duckdb.utils import lit +from narwhals._utils import not_implemented + +if TYPE_CHECKING: + from duckdb import Expression + + from narwhals._duckdb.expr import DuckDBExpr + + +class DuckDBExprStringNamespace: + def __init__(self, expr: DuckDBExpr) -> None: + self._compliant_expr = expr + + def starts_with(self, prefix: str) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("starts_with", expr, lit(prefix)) + ) + + def ends_with(self, suffix: str) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("ends_with", expr, lit(suffix)) + ) + + def contains(self, pattern: str, *, literal: bool) -> DuckDBExpr: + def func(expr: Expression) -> Expression: + if literal: + return FunctionExpression("contains", expr, lit(pattern)) + return FunctionExpression("regexp_matches", expr, lit(pattern)) + + return self._compliant_expr._with_callable(func) + + def slice(self, offset: int, length: int) -> DuckDBExpr: + def func(expr: Expression) -> Expression: + offset_lit = lit(offset) + return FunctionExpression( + "array_slice", + expr, + lit(offset + 1) + if offset >= 0 + else FunctionExpression("length", expr) + offset_lit + lit(1), + FunctionExpression("length", expr) + if length is None + else lit(length) + offset_lit, + ) + + return self._compliant_expr._with_callable(func) + + def split(self, by: str) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("str_split", expr, lit(by)) + ) + + def len_chars(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("length", expr) + ) + + def to_lowercase(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("lower", expr) + ) + + def to_uppercase(self) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("upper", expr) + ) + + def strip_chars(self, characters: str | None) -> DuckDBExpr: + import string + + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression( + "trim", expr, lit(string.whitespace if characters is None else characters) + ) + ) + + def replace_all(self, pattern: str, value: str, *, literal: bool) -> DuckDBExpr: + if not literal: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression( + "regexp_replace", expr, lit(pattern), lit(value), lit("g") + ) + ) + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("replace", expr, lit(pattern), lit(value)) + ) + + def to_datetime(self, format: str | None) -> DuckDBExpr: + if format is None: + msg = "Cannot infer format with DuckDB backend, please specify `format` explicitly." + raise NotImplementedError(msg) + + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("strptime", expr, lit(format)) + ) + + replace = not_implemented() diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_struct.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_struct.py new file mode 100644 index 0000000..3124204 --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr_struct.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from duckdb import FunctionExpression + +from narwhals._duckdb.utils import lit + +if TYPE_CHECKING: + from narwhals._duckdb.expr import DuckDBExpr + + +class DuckDBExprStructNamespace: + def __init__(self, expr: DuckDBExpr) -> None: + self._compliant_expr = expr + + def field(self, name: str) -> DuckDBExpr: + return self._compliant_expr._with_callable( + lambda expr: FunctionExpression("struct_extract", expr, lit(name)) + ).alias(name) diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/group_by.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/group_by.py new file mode 100644 index 0000000..8fa2978 --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/group_by.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from itertools import chain +from typing import TYPE_CHECKING, Sequence + +from narwhals._compliant import LazyGroupBy + +if TYPE_CHECKING: + from duckdb import Expression # noqa: F401 + + from narwhals._duckdb.dataframe import DuckDBLazyFrame + from narwhals._duckdb.expr import DuckDBExpr + + +class DuckDBGroupBy(LazyGroupBy["DuckDBLazyFrame", "DuckDBExpr", "Expression"]): + def __init__( + self, + df: DuckDBLazyFrame, + keys: Sequence[DuckDBExpr] | Sequence[str], + /, + *, + drop_null_keys: bool, + ) -> None: + frame, self._keys, self._output_key_names = self._parse_keys(df, keys=keys) + self._compliant_frame = frame.drop_nulls(self._keys) if drop_null_keys else frame + + def agg(self, *exprs: DuckDBExpr) -> DuckDBLazyFrame: + agg_columns = list(chain(self._keys, self._evaluate_exprs(exprs))) + return self.compliant._with_native( + self.compliant.native.aggregate(agg_columns) # type: ignore[arg-type] + ).rename(dict(zip(self._keys, self._output_key_names))) diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/namespace.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/namespace.py new file mode 100644 index 0000000..3616f2e --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/namespace.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +import operator +from functools import reduce +from itertools import chain +from typing import TYPE_CHECKING, Callable, Iterable, Sequence + +import duckdb +from duckdb import CoalesceOperator, Expression, FunctionExpression +from duckdb.typing import BIGINT, VARCHAR + +from narwhals._compliant import LazyNamespace, LazyThen, LazyWhen +from narwhals._duckdb.dataframe import DuckDBLazyFrame +from narwhals._duckdb.expr import DuckDBExpr +from narwhals._duckdb.selectors import DuckDBSelectorNamespace +from narwhals._duckdb.utils import concat_str, lit, narwhals_to_native_dtype, when +from narwhals._expression_parsing import ( + combine_alias_output_names, + combine_evaluate_output_names, +) +from narwhals._utils import Implementation + +if TYPE_CHECKING: + from narwhals._duckdb.expr import DuckDBWindowInputs + from narwhals._utils import Version + from narwhals.typing import ConcatMethod, IntoDType, NonNestedLiteral + + +class DuckDBNamespace( + LazyNamespace[DuckDBLazyFrame, DuckDBExpr, duckdb.DuckDBPyRelation] +): + _implementation: Implementation = Implementation.DUCKDB + + def __init__(self, *, backend_version: tuple[int, ...], version: Version) -> None: + self._backend_version = backend_version + self._version = version + + @property + def selectors(self) -> DuckDBSelectorNamespace: + return DuckDBSelectorNamespace.from_namespace(self) + + @property + def _expr(self) -> type[DuckDBExpr]: + return DuckDBExpr + + @property + def _lazyframe(self) -> type[DuckDBLazyFrame]: + return DuckDBLazyFrame + + def _with_elementwise( + self, func: Callable[[Iterable[Expression]], Expression], *exprs: DuckDBExpr + ) -> DuckDBExpr: + def call(df: DuckDBLazyFrame) -> list[Expression]: + cols = (col for _expr in exprs for col in _expr(df)) + return [func(cols)] + + def window_function( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + cols = ( + col for _expr in exprs for col in _expr.window_function(df, window_inputs) + ) + return [func(cols)] + + return self._expr( + call=call, + window_function=window_function, + evaluate_output_names=combine_evaluate_output_names(*exprs), + alias_output_names=combine_alias_output_names(*exprs), + backend_version=self._backend_version, + version=self._version, + ) + + def concat( + self, items: Iterable[DuckDBLazyFrame], *, how: ConcatMethod + ) -> DuckDBLazyFrame: + native_items = [item._native_frame for item in items] + items = list(items) + first = items[0] + schema = first.schema + if how == "vertical" and not all(x.schema == schema for x in items[1:]): + msg = "inputs should all have the same schema" + raise TypeError(msg) + res = reduce(lambda x, y: x.union(y), native_items) + return first._with_native(res) + + def concat_str( + self, *exprs: DuckDBExpr, separator: str, ignore_nulls: bool + ) -> DuckDBExpr: + def func(df: DuckDBLazyFrame) -> list[Expression]: + cols = list(chain.from_iterable(expr(df) for expr in exprs)) + if not ignore_nulls: + null_mask_result = reduce(operator.or_, (s.isnull() for s in cols)) + cols_separated = [ + y + for x in [ + (col.cast(VARCHAR),) + if i == len(cols) - 1 + else (col.cast(VARCHAR), lit(separator)) + for i, col in enumerate(cols) + ] + for y in x + ] + return [when(~null_mask_result, concat_str(*cols_separated))] + else: + return [concat_str(*cols, separator=separator)] + + return self._expr( + call=func, + evaluate_output_names=combine_evaluate_output_names(*exprs), + alias_output_names=combine_alias_output_names(*exprs), + backend_version=self._backend_version, + version=self._version, + ) + + def all_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr: + def func(cols: Iterable[Expression]) -> Expression: + return reduce(operator.and_, cols) + + return self._with_elementwise(func, *exprs) + + def any_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr: + def func(cols: Iterable[Expression]) -> Expression: + return reduce(operator.or_, cols) + + return self._with_elementwise(func, *exprs) + + def max_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr: + def func(cols: Iterable[Expression]) -> Expression: + return FunctionExpression("greatest", *cols) + + return self._with_elementwise(func, *exprs) + + def min_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr: + def func(cols: Iterable[Expression]) -> Expression: + return FunctionExpression("least", *cols) + + return self._with_elementwise(func, *exprs) + + def sum_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr: + def func(cols: Iterable[Expression]) -> Expression: + return reduce(operator.add, (CoalesceOperator(col, lit(0)) for col in cols)) + + return self._with_elementwise(func, *exprs) + + def mean_horizontal(self, *exprs: DuckDBExpr) -> DuckDBExpr: + def func(cols: Iterable[Expression]) -> Expression: + cols = list(cols) + return reduce( + operator.add, (CoalesceOperator(col, lit(0)) for col in cols) + ) / reduce(operator.add, (col.isnotnull().cast(BIGINT) for col in cols)) + + return self._with_elementwise(func, *exprs) + + def when(self, predicate: DuckDBExpr) -> DuckDBWhen: + return DuckDBWhen.from_expr(predicate, context=self) + + def lit(self, value: NonNestedLiteral, dtype: IntoDType | None) -> DuckDBExpr: + def func(_df: DuckDBLazyFrame) -> list[Expression]: + if dtype is not None: + return [ + lit(value).cast( + narwhals_to_native_dtype(dtype, version=self._version) # type: ignore[arg-type] + ) + ] + return [lit(value)] + + return self._expr( + func, + evaluate_output_names=lambda _df: ["literal"], + alias_output_names=None, + backend_version=self._backend_version, + version=self._version, + ) + + def len(self) -> DuckDBExpr: + def func(_df: DuckDBLazyFrame) -> list[Expression]: + return [FunctionExpression("count")] + + return self._expr( + call=func, + evaluate_output_names=lambda _df: ["len"], + alias_output_names=None, + backend_version=self._backend_version, + version=self._version, + ) + + +class DuckDBWhen(LazyWhen["DuckDBLazyFrame", Expression, DuckDBExpr]): + @property + def _then(self) -> type[DuckDBThen]: + return DuckDBThen + + def __call__(self, df: DuckDBLazyFrame) -> Sequence[Expression]: + self.when = when + self.lit = lit + return super().__call__(df) + + def _window_function( + self, df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> Sequence[Expression]: + self.when = when + self.lit = lit + return super()._window_function(df, window_inputs) + + +class DuckDBThen(LazyThen["DuckDBLazyFrame", Expression, DuckDBExpr], DuckDBExpr): ... diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/selectors.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/selectors.py new file mode 100644 index 0000000..ea1c6ba --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/selectors.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from narwhals._compliant import CompliantSelector, LazySelectorNamespace +from narwhals._duckdb.expr import DuckDBExpr + +if TYPE_CHECKING: + from duckdb import Expression # noqa: F401 + + from narwhals._duckdb.dataframe import DuckDBLazyFrame # noqa: F401 + + +class DuckDBSelectorNamespace(LazySelectorNamespace["DuckDBLazyFrame", "Expression"]): + @property + def _selector(self) -> type[DuckDBSelector]: + return DuckDBSelector + + +class DuckDBSelector( # type: ignore[misc] + CompliantSelector["DuckDBLazyFrame", "Expression"], DuckDBExpr +): + def _to_expr(self) -> DuckDBExpr: + return DuckDBExpr( + self._call, + self._window_function, + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/series.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/series.py new file mode 100644 index 0000000..5b284b9 --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/series.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from narwhals._duckdb.utils import DeferredTimeZone, native_to_narwhals_dtype +from narwhals.dependencies import get_duckdb + +if TYPE_CHECKING: + from types import ModuleType + + import duckdb + from typing_extensions import Never, Self + + from narwhals._utils import Version + from narwhals.dtypes import DType + + +class DuckDBInterchangeSeries: + def __init__(self, df: duckdb.DuckDBPyRelation, version: Version) -> None: + self._native_series = df + self._version = version + + def __narwhals_series__(self) -> Self: + return self + + def __native_namespace__(self) -> ModuleType: + return get_duckdb() # type: ignore[no-any-return] + + @property + def dtype(self) -> DType: + return native_to_narwhals_dtype( + self._native_series.types[0], + self._version, + DeferredTimeZone(self._native_series), + ) + + def __getattr__(self, attr: str) -> Never: + msg = ( # pragma: no cover + f"Attribute {attr} is not supported for interchange-level dataframes.\n\n" + "If you would like to see this kind of object better supported in " + "Narwhals, please open a feature request " + "at https://github.com/narwhals-dev/narwhals/issues." + ) + raise NotImplementedError(msg) # pragma: no cover diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/utils.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/utils.py new file mode 100644 index 0000000..c5d4872 --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/utils.py @@ -0,0 +1,287 @@ +from __future__ import annotations + +from functools import lru_cache +from typing import TYPE_CHECKING, Any + +import duckdb + +from narwhals._utils import Version, isinstance_or_issubclass + +if TYPE_CHECKING: + from duckdb import DuckDBPyRelation, Expression + from duckdb.typing import DuckDBPyType + + from narwhals._duckdb.dataframe import DuckDBLazyFrame + from narwhals._duckdb.expr import DuckDBExpr + from narwhals.dtypes import DType + from narwhals.typing import IntoDType + +UNITS_DICT = { + "y": "year", + "q": "quarter", + "mo": "month", + "d": "day", + "h": "hour", + "m": "minute", + "s": "second", + "ms": "millisecond", + "us": "microsecond", + "ns": "nanosecond", +} + +col = duckdb.ColumnExpression +"""Alias for `duckdb.ColumnExpression`.""" + +lit = duckdb.ConstantExpression +"""Alias for `duckdb.ConstantExpression`.""" + +when = duckdb.CaseExpression +"""Alias for `duckdb.CaseExpression`.""" + + +def concat_str(*exprs: Expression, separator: str = "") -> Expression: + """Concatenate many strings, NULL inputs are skipped. + + Wraps [concat] and [concat_ws] `FunctionExpression`(s). + + Arguments: + exprs: Native columns. + separator: String that will be used to separate the values of each column. + + Returns: + A new native expression. + + [concat]: https://duckdb.org/docs/stable/sql/functions/char.html#concatstring- + [concat_ws]: https://duckdb.org/docs/stable/sql/functions/char.html#concat_wsseparator-string- + """ + return ( + duckdb.FunctionExpression("concat_ws", lit(separator), *exprs) + if separator + else duckdb.FunctionExpression("concat", *exprs) + ) + + +def evaluate_exprs( + df: DuckDBLazyFrame, /, *exprs: DuckDBExpr +) -> list[tuple[str, Expression]]: + native_results: list[tuple[str, Expression]] = [] + for expr in exprs: + native_series_list = expr._call(df) + output_names = expr._evaluate_output_names(df) + if expr._alias_output_names is not None: + output_names = expr._alias_output_names(output_names) + if len(output_names) != len(native_series_list): # pragma: no cover + msg = f"Internal error: got output names {output_names}, but only got {len(native_series_list)} results" + raise AssertionError(msg) + native_results.extend(zip(output_names, native_series_list)) + return native_results + + +class DeferredTimeZone: + """Object which gets passed between `native_to_narwhals_dtype` calls. + + DuckDB stores the time zone in the connection, rather than in the dtypes, so + this ensures that when calculating the schema of a dataframe with multiple + timezone-aware columns, that the connection's time zone is only fetched once. + + Note: we cannot make the time zone a cached `DuckDBLazyFrame` property because + the time zone can be modified after `DuckDBLazyFrame` creation: + + ```python + df = nw.from_native(rel) + print(df.collect_schema()) + rel.query("set timezone = 'Asia/Kolkata'") + print(df.collect_schema()) # should change to reflect new time zone + ``` + """ + + _cached_time_zone: str | None = None + + def __init__(self, rel: DuckDBPyRelation) -> None: + self._rel = rel + + @property + def time_zone(self) -> str: + """Fetch relation time zone (if it wasn't calculated already).""" + if self._cached_time_zone is None: + self._cached_time_zone = fetch_rel_time_zone(self._rel) + return self._cached_time_zone + + +def native_to_narwhals_dtype( + duckdb_dtype: DuckDBPyType, version: Version, deferred_time_zone: DeferredTimeZone +) -> DType: + duckdb_dtype_id = duckdb_dtype.id + dtypes = version.dtypes + + # Handle nested data types first + if duckdb_dtype_id == "list": + return dtypes.List( + native_to_narwhals_dtype(duckdb_dtype.child, version, deferred_time_zone) + ) + + if duckdb_dtype_id == "struct": + children = duckdb_dtype.children + return dtypes.Struct( + [ + dtypes.Field( + name=child[0], + dtype=native_to_narwhals_dtype(child[1], version, deferred_time_zone), + ) + for child in children + ] + ) + + if duckdb_dtype_id == "array": + child, size = duckdb_dtype.children + shape: list[int] = [size[1]] + + while child[1].id == "array": + child, size = child[1].children + shape.insert(0, size[1]) + + inner = native_to_narwhals_dtype(child[1], version, deferred_time_zone) + return dtypes.Array(inner=inner, shape=tuple(shape)) + + if duckdb_dtype_id == "enum": + if version is Version.V1: + return dtypes.Enum() # type: ignore[call-arg] + categories = duckdb_dtype.children[0][1] + return dtypes.Enum(categories=categories) + + if duckdb_dtype_id == "timestamp with time zone": + return dtypes.Datetime(time_zone=deferred_time_zone.time_zone) + + return _non_nested_native_to_narwhals_dtype(duckdb_dtype_id, version) + + +def fetch_rel_time_zone(rel: duckdb.DuckDBPyRelation) -> str: + result = rel.query( + "duckdb_settings()", "select value from duckdb_settings() where name = 'TimeZone'" + ).fetchone() + assert result is not None # noqa: S101 + return result[0] # type: ignore[no-any-return] + + +@lru_cache(maxsize=16) +def _non_nested_native_to_narwhals_dtype(duckdb_dtype_id: str, version: Version) -> DType: + dtypes = version.dtypes + return { + "hugeint": dtypes.Int128(), + "bigint": dtypes.Int64(), + "integer": dtypes.Int32(), + "smallint": dtypes.Int16(), + "tinyint": dtypes.Int8(), + "uhugeint": dtypes.UInt128(), + "ubigint": dtypes.UInt64(), + "uinteger": dtypes.UInt32(), + "usmallint": dtypes.UInt16(), + "utinyint": dtypes.UInt8(), + "double": dtypes.Float64(), + "float": dtypes.Float32(), + "varchar": dtypes.String(), + "date": dtypes.Date(), + "timestamp": dtypes.Datetime(), + "boolean": dtypes.Boolean(), + "interval": dtypes.Duration(), + "decimal": dtypes.Decimal(), + "time": dtypes.Time(), + "blob": dtypes.Binary(), + }.get(duckdb_dtype_id, dtypes.Unknown()) + + +def narwhals_to_native_dtype(dtype: IntoDType, version: Version) -> str: # noqa: C901, PLR0912, PLR0915 + dtypes = version.dtypes + if isinstance_or_issubclass(dtype, dtypes.Decimal): + msg = "Casting to Decimal is not supported yet." + raise NotImplementedError(msg) + if isinstance_or_issubclass(dtype, dtypes.Float64): + return "DOUBLE" + if isinstance_or_issubclass(dtype, dtypes.Float32): + return "FLOAT" + if isinstance_or_issubclass(dtype, dtypes.Int128): + return "INT128" + if isinstance_or_issubclass(dtype, dtypes.Int64): + return "BIGINT" + if isinstance_or_issubclass(dtype, dtypes.Int32): + return "INTEGER" + if isinstance_or_issubclass(dtype, dtypes.Int16): + return "SMALLINT" + if isinstance_or_issubclass(dtype, dtypes.Int8): + return "TINYINT" + if isinstance_or_issubclass(dtype, dtypes.UInt128): + return "UINT128" + if isinstance_or_issubclass(dtype, dtypes.UInt64): + return "UBIGINT" + if isinstance_or_issubclass(dtype, dtypes.UInt32): + return "UINTEGER" + if isinstance_or_issubclass(dtype, dtypes.UInt16): # pragma: no cover + return "USMALLINT" + if isinstance_or_issubclass(dtype, dtypes.UInt8): # pragma: no cover + return "UTINYINT" + if isinstance_or_issubclass(dtype, dtypes.String): + return "VARCHAR" + if isinstance_or_issubclass(dtype, dtypes.Boolean): # pragma: no cover + return "BOOLEAN" + if isinstance_or_issubclass(dtype, dtypes.Time): + return "TIME" + if isinstance_or_issubclass(dtype, dtypes.Binary): + return "BLOB" + if isinstance_or_issubclass(dtype, dtypes.Categorical): + msg = "Categorical not supported by DuckDB" + raise NotImplementedError(msg) + if isinstance_or_issubclass(dtype, dtypes.Enum): + if version is Version.V1: + msg = "Converting to Enum is not supported in narwhals.stable.v1" + raise NotImplementedError(msg) + if isinstance(dtype, dtypes.Enum): + categories = "'" + "', '".join(dtype.categories) + "'" + return f"ENUM ({categories})" + msg = "Can not cast / initialize Enum without categories present" + raise ValueError(msg) + + if isinstance_or_issubclass(dtype, dtypes.Datetime): + _time_unit = dtype.time_unit + _time_zone = dtype.time_zone + msg = "todo" + raise NotImplementedError(msg) + if isinstance_or_issubclass(dtype, dtypes.Duration): # pragma: no cover + _time_unit = dtype.time_unit + msg = "todo" + raise NotImplementedError(msg) + if isinstance_or_issubclass(dtype, dtypes.Date): # pragma: no cover + return "DATE" + if isinstance_or_issubclass(dtype, dtypes.List): + inner = narwhals_to_native_dtype(dtype.inner, version) + return f"{inner}[]" + if isinstance_or_issubclass(dtype, dtypes.Struct): # pragma: no cover + inner = ", ".join( + f'"{field.name}" {narwhals_to_native_dtype(field.dtype, version)}' + for field in dtype.fields + ) + return f"STRUCT({inner})" + if isinstance_or_issubclass(dtype, dtypes.Array): # pragma: no cover + shape = dtype.shape + duckdb_shape_fmt = "".join(f"[{item}]" for item in shape) + inner_dtype: Any = dtype + for _ in shape: + inner_dtype = inner_dtype.inner + duckdb_inner = narwhals_to_native_dtype(inner_dtype, version) + return f"{duckdb_inner}{duckdb_shape_fmt}" + msg = f"Unknown dtype: {dtype}" # pragma: no cover + raise AssertionError(msg) + + +def generate_partition_by_sql(*partition_by: str | Expression) -> str: + if not partition_by: + return "" + by_sql = ", ".join([f"{col(x) if isinstance(x, str) else x}" for x in partition_by]) + return f"partition by {by_sql}" + + +def generate_order_by_sql(*order_by: str, ascending: bool) -> str: + if ascending: + by_sql = ", ".join([f"{col(x)} asc nulls first" for x in order_by]) + else: + by_sql = ", ".join([f"{col(x)} desc nulls last" for x in order_by]) + return f"order by {by_sql}" |
