diff options
| author | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
|---|---|---|
| committer | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
| commit | 5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch) | |
| tree | 8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/narwhals/_ibis/dataframe.py | |
| parent | b832d364da8c2efe09e3f75828caf73c50d01ce3 (diff) | |
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/narwhals/_ibis/dataframe.py')
| -rw-r--r-- | venv/lib/python3.8/site-packages/narwhals/_ibis/dataframe.py | 430 |
1 files changed, 430 insertions, 0 deletions
# Reconstructed from the diff of
# venv/lib/python3.8/site-packages/narwhals/_ibis/dataframe.py
from __future__ import annotations

import operator
from typing import (
    TYPE_CHECKING,
    Any,
    Iterable,
    Iterator,
    Literal,
    Mapping,
    Sequence,
    cast,
)

import ibis
import ibis.expr.types as ir

from narwhals._ibis.utils import evaluate_exprs, native_to_narwhals_dtype
from narwhals._utils import (
    Implementation,
    Version,
    not_implemented,
    parse_columns_to_drop,
    parse_version,
    validate_backend_version,
)
from narwhals.exceptions import ColumnNotFoundError, InvalidOperationError
from narwhals.typing import CompliantLazyFrame

if TYPE_CHECKING:
    from types import ModuleType

    import pandas as pd
    import pyarrow as pa
    from ibis.expr.operations import Binary
    from typing_extensions import Self, TypeAlias, TypeIs

    from narwhals._compliant.typing import CompliantDataFrameAny
    from narwhals._ibis.expr import IbisExpr
    from narwhals._ibis.group_by import IbisGroupBy
    from narwhals._ibis.namespace import IbisNamespace
    from narwhals._ibis.series import IbisInterchangeSeries
    from narwhals._utils import _FullContext
    from narwhals.dataframe import LazyFrame
    from narwhals.dtypes import DType
    from narwhals.stable.v1 import DataFrame as DataFrameV1
    from narwhals.typing import AsofJoinStrategy, JoinStrategy, LazyUniqueKeepStrategy

    JoinPredicates: TypeAlias = "Sequence[ir.BooleanColumn] | Sequence[str]"


class IbisLazyFrame(
    CompliantLazyFrame[
        "IbisExpr", "ir.Table", "LazyFrame[ir.Table] | DataFrameV1[ir.Table]"
    ]
):
    """Narwhals compliant lazy frame wrapping an Ibis table expression.

    Every transforming method builds a new Ibis table expression and wraps it
    in a new `IbisLazyFrame` via `_with_native`; the wrapped table is never
    modified in place.

    NOTE(review): `self.native` is not defined in this class; it is presumably
    provided by `CompliantLazyFrame` and returns `_native_frame` - confirm.
    """

    _implementation = Implementation.IBIS

    def __init__(
        self, df: ir.Table, *, backend_version: tuple[int, ...], version: Version
    ) -> None:
        """Wrap `df` and validate `backend_version` for the Ibis implementation."""
        self._native_frame: ir.Table = df
        self._version = version
        self._backend_version = backend_version
        # Lazily populated by the `schema` / `columns` properties.
        self._cached_schema: dict[str, DType] | None = None
        self._cached_columns: list[str] | None = None
        validate_backend_version(self._implementation, self._backend_version)

    @staticmethod
    def _is_native(obj: ir.Table | Any) -> TypeIs[ir.Table]:
        """Return whether `obj` is an Ibis table."""
        return isinstance(obj, ir.Table)

    @classmethod
    def from_native(cls, data: ir.Table, /, *, context: _FullContext) -> Self:
        """Build a frame from `data`, taking versions from `context`."""
        return cls(
            data, backend_version=context._backend_version, version=context._version
        )

    def to_narwhals(self) -> LazyFrame[ir.Table] | DataFrameV1[ir.Table]:
        """Wrap this frame in the public narwhals object for its version.

        `Version.MAIN` yields a lazy-level `LazyFrame`; any other version
        yields a `stable.v1` `DataFrame` at "interchange" level.
        """
        if self._version is Version.MAIN:
            return self._version.lazyframe(self, level="lazy")

        from narwhals.stable.v1 import DataFrame as DataFrameV1

        return DataFrameV1(self, level="interchange")

    def __narwhals_dataframe__(self) -> Self:  # pragma: no cover
        # Keep around for backcompat.
        if self._version is not Version.V1:
            msg = "__narwhals_dataframe__ is not implemented for IbisLazyFrame"
            raise AttributeError(msg)
        return self

    def __narwhals_lazyframe__(self) -> Self:
        return self

    def __native_namespace__(self) -> ModuleType:
        return ibis

    def __narwhals_namespace__(self) -> IbisNamespace:
        from narwhals._ibis.namespace import IbisNamespace

        return IbisNamespace(backend_version=self._backend_version, version=self._version)

    def get_column(self, name: str) -> IbisInterchangeSeries:
        """Return column `name` as an interchange-level series."""
        from narwhals._ibis.series import IbisInterchangeSeries

        return IbisInterchangeSeries(self.native.select(name), version=self._version)

    def _iter_columns(self) -> Iterator[ir.Expr]:
        """Yield each column expression of the native table, in column order."""
        for name in self.columns:
            yield self.native[name]

    def collect(
        self, backend: ModuleType | Implementation | str | None, **kwargs: Any
    ) -> CompliantDataFrameAny:
        """Materialise the table as an eager compliant dataframe.

        `backend` is compared by identity against `Implementation` members:
        `None` or `PYARROW` gives an `ArrowDataFrame`, `PANDAS` a
        `PandasLikeDataFrame`, and `POLARS` a `PolarsDataFrame`. `kwargs` is
        accepted but not used here.

        Raises:
            ValueError: If `backend` is none of the values above.
        """
        if backend is None or backend is Implementation.PYARROW:
            import pyarrow as pa  # ignore-banned-import

            from narwhals._arrow.dataframe import ArrowDataFrame

            return ArrowDataFrame(
                self.native.to_pyarrow(),
                backend_version=parse_version(pa),
                version=self._version,
                validate_column_names=True,
            )

        if backend is Implementation.PANDAS:
            import pandas as pd  # ignore-banned-import

            from narwhals._pandas_like.dataframe import PandasLikeDataFrame

            return PandasLikeDataFrame(
                self.native.to_pandas(),
                implementation=Implementation.PANDAS,
                backend_version=parse_version(pd),
                version=self._version,
                validate_column_names=True,
            )

        if backend is Implementation.POLARS:
            import polars as pl  # ignore-banned-import

            from narwhals._polars.dataframe import PolarsDataFrame

            return PolarsDataFrame(
                self.native.to_polars(),
                backend_version=parse_version(pl),
                version=self._version,
            )

        msg = f"Unsupported `backend` value: {backend}"  # pragma: no cover
        raise ValueError(msg)  # pragma: no cover

    def head(self, n: int) -> Self:
        """Return a frame wrapping `native.head(n)`."""
        return self._with_native(self.native.head(n))

    def simple_select(self, *column_names: str) -> Self:
        """Select columns by name only, without evaluating expressions."""
        return self._with_native(self.native.select(*column_names))

    def aggregate(self, *exprs: IbisExpr) -> Self:
        """Evaluate `exprs` and pass the named results to `native.aggregate`."""
        selection = [
            cast("ir.Scalar", val.name(name))
            for name, val in evaluate_exprs(self, *exprs)
        ]
        return self._with_native(self.native.aggregate(selection))

    def select(self, *exprs: IbisExpr) -> Self:
        """Evaluate `exprs` and select the resulting named columns.

        Raises:
            ValueError: If the expressions evaluate to no columns at all.
        """
        selection = [val.name(name) for name, val in evaluate_exprs(self, *exprs)]
        if not selection:
            msg = "At least one expression must be provided to `select` with the Ibis backend."
            raise ValueError(msg)

        t = self.native.select(*selection)
        return self._with_native(t)

    def drop(self, columns: Sequence[str], *, strict: bool) -> Self:
        """Drop `columns` by selecting every other column.

        `strict` is forwarded to `parse_columns_to_drop`.
        """
        columns_to_drop = parse_columns_to_drop(self, columns, strict=strict)
        selection = (col for col in self.columns if col not in columns_to_drop)
        return self._with_native(self.native.select(*selection))

    def lazy(self, *, backend: Implementation | None = None) -> Self:
        """Return `self`; only `backend=None` is accepted.

        Raises:
            ValueError: If `backend` is not `None`.
        """
        # The `backend` argument has no effect but we keep it here for
        # backwards compatibility because in `narwhals.stable.v1`
        # function `.from_native()` will return a DataFrame for Ibis.

        if backend is not None:  # pragma: no cover
            msg = "`backend` argument is not supported for Ibis"
            raise ValueError(msg)
        return self

    def with_columns(self, *exprs: IbisExpr) -> Self:
        """Add or replace columns via `native.mutate` using the evaluated `exprs`."""
        new_columns_map = dict(evaluate_exprs(self, *exprs))
        return self._with_native(self.native.mutate(**new_columns_map))

    def filter(self, predicate: IbisExpr) -> Self:
        """Filter the table with the boolean mask produced by `predicate`."""
        # `[0]` is safe as the predicate's expression only returns a single column
        mask = cast("ir.BooleanValue", predicate(self)[0])
        return self._with_native(self.native.filter(mask))

    @property
    def schema(self) -> dict[str, DType]:
        """Mapping of column name to narwhals dtype, computed once and cached."""
        if self._cached_schema is None:
            # Note: prefer `self._cached_schema` over `functools.cached_property`
            # due to Python3.13 failures.
            self._cached_schema = {
                name: native_to_narwhals_dtype(dtype, self._version)
                for name, dtype in self.native.schema().fields.items()
            }
        return self._cached_schema

    @property
    def columns(self) -> list[str]:
        """Column names, computed once and cached.

        Uses the cached schema's keys when it is already available; otherwise
        reads `native.columns` without converting any dtypes.
        """
        if self._cached_columns is None:
            self._cached_columns = (
                list(self.schema)
                if self._cached_schema is not None
                else list(self.native.columns)
            )
        return self._cached_columns

    def to_pandas(self) -> pd.DataFrame:
        """Return `native.to_pandas()`; requires pandas>=1.0.0.

        Raises:
            NotImplementedError: If the installed pandas is older than 1.0.0.
        """
        # only if version is v1, keep around for backcompat
        import pandas as pd  # ignore-banned-import()

        if parse_version(pd) >= (1, 0, 0):
            return self.native.to_pandas()
        else:  # pragma: no cover
            msg = f"Conversion to pandas requires pandas>=1.0.0, found {pd.__version__}"
            raise NotImplementedError(msg)

    def to_arrow(self) -> pa.Table:
        """Return `native.to_pyarrow()`."""
        # only if version is v1, keep around for backcompat
        return self.native.to_pyarrow()

    def _with_version(self, version: Version) -> Self:
        """Return a new frame over the same table with a different `version`."""
        return self.__class__(
            self.native, version=version, backend_version=self._backend_version
        )

    def _with_native(self, df: ir.Table) -> Self:
        """Return a new frame over `df`, keeping this frame's versions."""
        return self.__class__(
            df, backend_version=self._backend_version, version=self._version
        )

    def group_by(
        self, keys: Sequence[str] | Sequence[IbisExpr], *, drop_null_keys: bool
    ) -> IbisGroupBy:
        """Return an `IbisGroupBy` over this frame for `keys`."""
        from narwhals._ibis.group_by import IbisGroupBy

        return IbisGroupBy(self, keys, drop_null_keys=drop_null_keys)

    def rename(self, mapping: Mapping[str, str]) -> Self:
        """Rename columns per `mapping`; columns not in `mapping` keep their name."""

        def _rename(col: str) -> str:
            return mapping.get(col, col)

        return self._with_native(self.native.rename(_rename))

    @staticmethod
    def _join_drop_duplicate_columns(df: ir.Table, columns: Iterable[str], /) -> ir.Table:
        """Ibis adds a suffix to the right table col, even when it matches the left during a join."""
        duplicates = set(df.columns).intersection(columns)
        return df.drop(*duplicates) if duplicates else df

    def join(
        self,
        other: Self,
        *,
        how: JoinStrategy,
        left_on: Sequence[str] | None,
        right_on: Sequence[str] | None,
        suffix: str,
    ) -> Self:
        """Join this frame with `other`.

        `how="full"` is passed to Ibis as "outer". Right-hand columns whose
        names collide are renamed with `suffix` through Ibis' `rname`
        template. `left_on` / `right_on` must be provided for every strategy
        except "cross".
        """
        how_native = "outer" if how == "full" else how
        rname = "{name}" + suffix
        if other == self:
            # Ibis does not support self-references unless created as a view
            other = self._with_native(other.native.view())
        if how_native == "cross":
            joined = self.native.join(other.native, how=how_native, rname=rname)
            return self._with_native(joined)
        # help mypy
        assert left_on is not None  # noqa: S101
        assert right_on is not None  # noqa: S101
        predicates = self._convert_predicates(other, left_on, right_on)
        joined = self.native.join(other.native, predicates, how=how_native, rname=rname)
        if how_native == "left":
            right_names = (n + suffix for n in right_on)
            joined = self._join_drop_duplicate_columns(joined, right_names)
            # String predicates (same key names on both sides) carry no
            # separate right-hand key, so only expression predicates are
            # inspected.
            it = (cast("Binary", p.op()) for p in predicates if not isinstance(p, str))
            to_drop = []
            for pred in it:
                right = pred.right.name
                # Mirrors how polars works.
                if right not in self.columns and pred.left.name != right:
                    to_drop.append(right)
            if to_drop:
                joined = joined.drop(*to_drop)
        return self._with_native(joined)

    def join_asof(
        self,
        other: Self,
        *,
        left_on: str,
        right_on: str,
        by_left: Sequence[str] | None,
        by_right: Sequence[str] | None,
        strategy: AsofJoinStrategy,
        suffix: str,
    ) -> Self:
        """As-of join this frame with `other`.

        The "backward" strategy compares `left_on >= right_on` and "forward"
        compares `left_on <= right_on`. `by_left` / `by_right` become equality
        predicates only when both are given.

        Raises:
            NotImplementedError: For any strategy other than "backward" or
                "forward".
        """
        rname = "{name}" + suffix
        strategy_op = {"backward": operator.ge, "forward": operator.le}
        predicates: JoinPredicates = []
        if op := strategy_op.get(strategy):
            on: ir.BooleanColumn = op(self.native[left_on], other.native[right_on])
        else:
            msg = "Only `backward` and `forward` strategies are currently supported for Ibis"
            raise NotImplementedError(msg)
        if by_left is not None and by_right is not None:
            predicates = self._convert_predicates(other, by_left, by_right)
        joined = self.native.asof_join(other.native, on, predicates, rname=rname)
        joined = self._join_drop_duplicate_columns(joined, [right_on + suffix])
        if by_right is not None:
            right_names = (n + suffix for n in by_right)
            joined = self._join_drop_duplicate_columns(joined, right_names)
        return self._with_native(joined)

    def _convert_predicates(
        self, other: Self, left_on: Sequence[str], right_on: Sequence[str]
    ) -> JoinPredicates:
        """Build join predicates from paired key names.

        When both sides use the same names, the names themselves are returned;
        otherwise one equality expression is built per `(left, right)` pair.
        """
        if left_on == right_on:
            return left_on
        return [
            cast("ir.BooleanColumn", (self.native[left] == other.native[right]))
            for left, right in zip(left_on, right_on)
        ]

    def collect_schema(self) -> dict[str, DType]:
        """Return a new dict mapping column name to narwhals dtype."""
        # Reuse the cached `schema` rather than converting every dtype again;
        # copy so callers cannot mutate the cache.
        return dict(self.schema)

    def unique(
        self, subset: Sequence[str] | None, *, keep: LazyUniqueKeepStrategy
    ) -> Self:
        """Drop duplicate rows, optionally considering only `subset`.

        With `keep="any"` and no `subset`, this is a plain `distinct` over the
        table. For other `keep` values a missing `subset` falls back to all
        columns.

        Raises:
            ColumnNotFoundError: If `subset` names a column not in the frame.
        """
        if subset_ := subset if keep == "any" else (subset or self.columns):
            # Sanitise input
            if any(x not in self.columns for x in subset_):
                msg = f"Columns {set(subset_).difference(self.columns)} not found in {self.columns}."
                raise ColumnNotFoundError(msg)

            mapped_keep: dict[str, Literal["first"] | None] = {
                "any": "first",
                "none": None,
            }
            to_keep = mapped_keep[keep]
            return self._with_native(self.native.distinct(on=subset_, keep=to_keep))
        return self._with_native(self.native.distinct(on=subset))

    def sort(self, *by: str, descending: bool | Sequence[bool], nulls_last: bool) -> Self:
        """Order the table by the columns in `by`.

        A single `descending` flag applies to every column; a sequence gives
        one flag per column, by position. Each sort key is built with
        `nulls_first=not nulls_last`.
        """
        flags: Sequence[bool] = (
            [descending] * len(by) if isinstance(descending, bool) else descending
        )
        # Index `flags` instead of zipping so that a `descending` sequence
        # shorter than `by` still raises rather than silently dropping keys.
        sort_cols = [
            cast(
                "ir.Column",
                (ibis.desc if flags[i] else ibis.asc)(name, nulls_first=not nulls_last),
            )
            for i, name in enumerate(by)
        ]

        return self._with_native(self.native.order_by(*sort_cols))

    def drop_nulls(self, subset: Sequence[str] | None) -> Self:
        """Call `native.drop_null` on `subset`, or on all columns when it is `None`."""
        subset_ = subset if subset is not None else self.columns
        return self._with_native(self.native.drop_null(subset_))

    def explode(self, columns: Sequence[str]) -> Self:
        """Unnest a single List column, passing `keep_empty=True`.

        Raises:
            InvalidOperationError: If any of `columns` is not a List dtype.
            NotImplementedError: If `columns` does not contain exactly one
                column.
        """
        dtypes = self._version.dtypes
        schema = self.collect_schema()
        for col in columns:
            dtype = schema[col]

            if dtype != dtypes.List:
                msg = (
                    f"`explode` operation not supported for dtype `{dtype}`, "
                    "expected List type"
                )
                raise InvalidOperationError(msg)

        if len(columns) != 1:
            msg = (
                "Exploding on multiple columns is not supported with Ibis backend since "
                "we cannot guarantee that the exploded columns have matching element counts."
            )
            raise NotImplementedError(msg)

        return self._with_native(self.native.unnest(columns[0], keep_empty=True))

    def unpivot(
        self,
        on: Sequence[str] | None,
        index: Sequence[str] | None,
        variable_name: str,
        value_name: str,
    ) -> Self:
        """Reshape from wide to long format via `pivot_longer`.

        `on` defaults to every column not in `index`. The result keeps only
        the `index` columns followed by `variable_name` and `value_name`.
        """
        import ibis.selectors as s

        index_: Sequence[str] = [] if index is None else index
        on_: Sequence[str] = (
            [c for c in self.columns if c not in index_] if on is None else on
        )

        # Discard columns not in the index
        final_columns = list(dict.fromkeys([*index_, variable_name, value_name]))

        unpivoted = self.native.pivot_longer(
            s.cols(*on_), names_to=variable_name, values_to=value_name
        )
        return self._with_native(unpivoted.select(*final_columns))

    gather_every = not_implemented.deprecated(
        "`LazyFrame.gather_every` is deprecated and will be removed in a future version."
    )
    tail = not_implemented.deprecated(
        "`LazyFrame.tail` is deprecated and will be removed in a future version."
    )
    with_row_index = not_implemented()
