# Source code for bigframes.core.indexes.base

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""An index based on a single column."""

from __future__ import annotations

import functools
import typing
from typing import cast, Hashable, Literal, Optional, overload, Sequence, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.indexes.base as vendored_pandas_index
import google.cloud.bigquery as bigquery
import numpy as np
import pandas

from bigframes import dtypes
import bigframes.core.agg_expressions as ex_types
import bigframes.core.block_transforms as block_ops
import bigframes.core.blocks as blocks
import bigframes.core.expression as ex
import bigframes.core.ordering as order
import bigframes.core.utils as utils
import bigframes.core.validations as validations
import bigframes.dtypes
import bigframes.formatting_helpers as formatter
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.series
import bigframes.session.execution_spec as ex_spec

if typing.TYPE_CHECKING:
    import bigframes.dataframe
    import bigframes.operations.strings
    import bigframes.series


class Index(vendored_pandas_index.Index):
    # User-facing documentation is inherited from the vendored pandas Index.
    __doc__ = vendored_pandas_index.Index.__doc__

    # Most recent QueryJob (if any) produced while materializing this index.
    _query_job = None
    # Backing block holding the index columns and the owning session.
    _block: blocks.Block
    # Set by from_frame(): the DataFrame/Series whose index this object views,
    # kept in sync on in-place renames.
    _linked_frame: Union[
        bigframes.dataframe.DataFrame, bigframes.series.Series, None
    ] = None

    # Must be above 5000 for pandas to delegate to bigframes for binops
    __pandas_priority__ = 12000

    # Overridden on __new__ to create subclasses like pandas does
    def __new__(
        cls,
        data=None,
        dtype=None,
        *,
        name=None,
        session=None,
    ):
        """Construct an Index from blocks, BigFrames objects, pandas objects,
        or raw data, dispatching to MultiIndex/DatetimeIndex subclasses based
        on the resulting block's index columns."""
        import bigframes.dataframe as df
        import bigframes.series as series

        if isinstance(data, blocks.Block):
            # A block already carries index columns; drop the value columns.
            block = data.select_columns([])
        elif isinstance(data, df.DataFrame):
            raise ValueError("Cannot construct index from dataframe.")
        elif isinstance(data, series.Series) or isinstance(data, Index):
            if isinstance(data, series.Series):
                block = data._block
                # Promote the series' value column to be the index.
                block = block.set_index(col_ids=[data._value_column])
            elif isinstance(data, Index):
                block = data._block
            index = Index(data=block)
            # Explicit name wins over the source object's name.
            name = data.name if name is None else name
            if name is not None:
                index.name = name
            if dtype is not None:
                bf_dtype = bigframes.dtypes.bigframes_type(dtype)
                index = index.astype(bf_dtype)
            block = index._block
        elif isinstance(data, pandas.Index):
            # Round-trip through a pandas DataFrame to reuse DataFrame ingestion.
            pd_df = pandas.DataFrame(index=data)
            block = df.DataFrame(pd_df, session=session)._block
        else:
            # "json" is a BigFrames-specific dtype spelling; translate before
            # handing off to pandas.
            if isinstance(dtype, str) and dtype.lower() == "json":
                dtype = bigframes.dtypes.JSON_DTYPE
            pd_index = pandas.Index(data=data, dtype=dtype, name=name)
            pd_df = pandas.DataFrame(index=pd_index)
            block = df.DataFrame(pd_df, session=session)._block

        # TODO: Support more index subtypes
        if len(block._index_columns) > 1:
            from bigframes.core.indexes.multi import MultiIndex

            klass: type[Index] = MultiIndex  # type hint to make mypy happy
        elif _should_create_datetime_index(block):
            from bigframes.core.indexes.datetimes import DatetimeIndex

            klass = DatetimeIndex
        else:
            klass = cls

        # object.__new__ avoids re-entering this constructor for the subclass.
        result = typing.cast(Index, object.__new__(klass))
        result._query_job = None
        result._block = block
        block.session._register_object(result)
        return result
    @classmethod
    def from_frame(
        cls, frame: Union[bigframes.series.Series, bigframes.dataframe.DataFrame]
    ) -> Index:
        # Build an Index that is a live view over a DataFrame/Series index;
        # _linked_frame lets in-place renames propagate back (see rename()).
        if len(frame._block.index_columns) == 0:
            raise bigframes.exceptions.NullIndexError(
                "Cannot access index properties with Null Index. Set an index using set_index."
            )
        # NOTE(review): this appears to re-check the same null-index condition
        # as the explicit len() guard above — confirm whether both are needed.
        frame._block._throw_if_null_index("from_frame")
        index = Index(frame._block)
        index._linked_frame = frame
        return index
    @property
    def _session(self):
        # Session is owned by the backing block.
        return self._block.session

    @property
    def name(self) -> blocks.Label:
        names = self.names
        if len(names) == 1:
            return self.names[0]
        else:
            # pandas returns None for MultiIndex.name.
            return None

    @name.setter
    def name(self, value: blocks.Label):
        self.names = [value]

    @property
    def names(self) -> typing.Sequence[blocks.Label]:
        # One label per index level, straight from the block.
        return self._block._index_labels

    @names.setter
    def names(self, values: typing.Sequence[blocks.Label]):
        # Delegates to rename(inplace=True) so linked frames stay in sync.
        self.rename(values, inplace=True)

    @property
    def nlevels(self) -> int:
        return len(self._block.index_columns)

    @property
    def values(self) -> np.ndarray:
        # Materializes the index locally.
        return self.to_numpy()

    @property
    def ndim(self) -> int:
        # Indexes are always one-dimensional.
        return 1

    @property
    def shape(self) -> typing.Tuple[int]:
        return (self._block.shape[0],)

    @property
    def dtype(self):
        # Multi-level indexes report object dtype, matching pandas.
        dtype = self._block.index.dtypes[0] if self.nlevels == 1 else np.dtype("O")
        bigframes.dtypes.warn_on_db_dtypes_json_dtype([dtype])
        return dtype

    @property
    def dtypes(self) -> pandas.Series:
        dtypes = self._block.index.dtypes
        bigframes.dtypes.warn_on_db_dtypes_json_dtype(dtypes)
        return pandas.Series(
            data=dtypes,
            index=typing.cast(typing.Tuple, self._block.index.names),
        )

    def __setitem__(self, key, value) -> None:
        """Index objects are immutable.

        Use Index constructor to create modified Index."""
        raise TypeError("Index does not support mutable operations")

    @property
    def size(self) -> int:
        return self.shape[0]

    @property
    def empty(self) -> bool:
        """Returns True if the Index is empty, otherwise returns False."""
        return self.shape[0] == 0

    @property
    @validations.requires_ordering()
    def is_monotonic_increasing(self) -> bool:
        return typing.cast(
            bool,
            self._block.is_monotonic_increasing(self._block.index_columns),
        )

    @property
    @validations.requires_ordering()
    def is_monotonic_decreasing(self) -> bool:
        return typing.cast(
            bool,
            self._block.is_monotonic_decreasing(self._block.index_columns),
        )

    @property
    def is_unique(self) -> bool:
        # TODO: Cache this at block level
        # Avoid circular imports
        return not self.has_duplicates

    @property
    def has_duplicates(self) -> bool:
        # TODO: Cache this at block level
        # Avoid circular imports
        import bigframes.core.block_transforms as block_ops
        import bigframes.dataframe as df

        # Mark duplicated index rows, then reduce with any().
        duplicates_block, indicator = block_ops.indicate_duplicates(
            self._block, self._block.index_columns
        )
        duplicates_block = duplicates_block.select_columns(
            [indicator]
        ).with_column_labels(["is_duplicate"])
        duplicates_df = df.DataFrame(duplicates_block)
        return duplicates_df["is_duplicate"].any()

    @property
    def T(self) -> Index:
        return self.transpose()

    @property
    def query_job(self) -> bigquery.QueryJob:
        """BigQuery job metadata for the most recent query.

        Returns:
            The most recent `QueryJob
            <https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob>`_.
        """
        if self._query_job is None:
            # No query has run yet; fall back to a dry-run job for metadata.
            _, query_job = self._block._compute_dry_run()
            self._query_job = query_job
        return self._query_job

    @property
    def str(self) -> bigframes.operations.strings.StringMethods:
        # Imported lazily to avoid a circular import at module load.
        import bigframes.operations.strings

        return bigframes.operations.strings.StringMethods(self)
    def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]:
        """Get integer location, slice or boolean mask for requested label.

        Args:
            key: The label to search for in the index.

        Returns:
            An integer, slice, or boolean mask representing the location(s) of the key.

        Raises:
            NotImplementedError: If the index has more than one level.
            KeyError: If the key is not found in the index.
        """
        if self.nlevels != 1:
            raise NotImplementedError("get_loc only supports single-level indexes")

        # Get the index column from the block
        index_column = self._block.index_columns[0]

        # Use promote_offsets to get row numbers (similar to argmax/argmin implementation)
        block_with_offsets, offsets_id = self._block.promote_offsets(
            "temp_get_loc_offsets_"
        )

        # Create expression to find matching positions
        match_expr = ops.eq_op.as_expr(ex.deref(index_column), ex.const(key))
        block_with_offsets, match_col_id = block_with_offsets.project_expr(match_expr)

        # Filter to only rows where the key matches
        filtered_block = block_with_offsets.filter_by_id(match_col_id)

        # Check if key exists at all by counting (this runs a query).
        count_agg = ex_types.UnaryAggregation(agg_ops.count_op, ex.deref(offsets_id))
        count_result = filtered_block._expr.aggregate([(count_agg, "count")])
        count_scalar = (
            self._block.session._executor.execute(
                count_result, ex_spec.ExecutionSpec(promise_under_10gb=True)
            )
            .batches()
            .to_py_scalar()
        )

        if count_scalar == 0:
            raise KeyError(f"'{key}' is not in index")

        # If only one match, return integer position (second query).
        if count_scalar == 1:
            min_agg = ex_types.UnaryAggregation(agg_ops.min_op, ex.deref(offsets_id))
            position_result = filtered_block._expr.aggregate([(min_agg, "position")])
            position_scalar = (
                self._block.session._executor.execute(
                    position_result, ex_spec.ExecutionSpec(promise_under_10gb=True)
                )
                .batches()
                .to_py_scalar()
            )
            return int(position_scalar)

        # Handle multiple matches based on index monotonicity, mirroring
        # pandas: a slice for monotonic indexes, a boolean mask otherwise.
        is_monotonic = self.is_monotonic_increasing or self.is_monotonic_decreasing
        if is_monotonic:
            return self._get_monotonic_slice(filtered_block, offsets_id)
        else:
            # Return boolean mask for non-monotonic duplicates
            mask_block = block_with_offsets.select_columns([match_col_id])
            mask_block = mask_block.reset_index(drop=True)
            result_series = bigframes.series.Series(mask_block)
            return result_series.astype("boolean")
def _get_monotonic_slice( self, filtered_block, offsets_id: __builtins__.str ) -> slice: """Helper method to get a slice for monotonic duplicates with an optimized query.""" # Combine min and max aggregations into a single query for efficiency min_max_aggs = [ ( ex_types.UnaryAggregation(agg_ops.min_op, ex.deref(offsets_id)), "min_pos", ), ( ex_types.UnaryAggregation(agg_ops.max_op, ex.deref(offsets_id)), "max_pos", ), ] combined_result = filtered_block._expr.aggregate(min_max_aggs) # Execute query and extract positions result_df = ( self._block.session._executor.execute( combined_result, execution_spec=ex_spec.ExecutionSpec(promise_under_10gb=True), ) .batches() .to_pandas() ) min_pos = int(result_df["min_pos"].iloc[0]) max_pos = int(result_df["max_pos"].iloc[0]) # Create slice (stop is exclusive) return slice(min_pos, max_pos + 1) def __repr__(self) -> __builtins__.str: # Protect against errors with uninitialized Series. See: # https://github.com/googleapis/python-bigquery-dataframes/issues/728 if not hasattr(self, "_block"): return object.__repr__(self) # TODO(swast): Add a timeout here? If the query is taking a long time, # maybe we just print the job metadata that we have so far? # TODO(swast): Avoid downloading the whole series by using job # metadata, like we do with DataFrame. opts = bigframes.options.display max_results = opts.max_rows # anywdiget mode uses the same display logic as the "deferred" mode # for faster execution if opts.repr_mode in ("deferred", "anywidget"): _, dry_run_query_job = self._block._compute_dry_run() return formatter.repr_query_job(dry_run_query_job) pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results) self._query_job = query_job return repr(pandas_df.index)
[docs] def copy(self, name: Optional[Hashable] = None): copy_index = Index(self._block) if name is not None: copy_index.name = name return copy_index
[docs] def to_series( self, index: Optional[Index] = None, name: Optional[Hashable] = None ) -> bigframes.series.Series: if self.nlevels != 1: NotImplementedError( f"Converting multi-index to series is not yet supported. {constants.FEEDBACK_LINK}" ) import bigframes.series name = self.name if name is None else name if index is None: return bigframes.series.Series( data=self, index=self, name=str(name), session=self._session ) else: return bigframes.series.Series( data=self, index=Index(index, session=self._session), name=str(name), session=self._session, )
[docs] def get_level_values(self, level) -> Index: level_n = level if isinstance(level, int) else self.names.index(level) block = self._block.drop_levels( [self._block.index_columns[i] for i in range(self.nlevels) if i != level_n] ) return Index(block)
def _memory_usage(self) -> int: (n_rows,) = self.shape return sum( self.dtypes.map( lambda dtype: bigframes.dtypes.DTYPE_BYTE_SIZES.get(dtype, 8) * n_rows ) )
[docs] def transpose(self) -> Index: return self
    def sort_values(
        self,
        *,
        inplace: bool = False,
        ascending: bool = True,
        na_position: str = "last",
    ) -> Index:
        # NOTE(review): `inplace` is accepted but never read in this body —
        # a new, sorted Index is always returned. Confirm intended behavior.
        if na_position not in ["first", "last"]:
            raise ValueError("Param na_position must be one of 'first' or 'last'")
        na_last = na_position == "last"
        index_columns = self._block.index_columns
        # Apply the same direction and NA placement to every index level.
        ordering = [
            order.ascending_over(column, na_last)
            if ascending
            else order.descending_over(column, na_last)
            for column in index_columns
        ]
        return Index(self._block.order_by(ordering))
[docs] def astype( self, dtype, *, errors: Literal["raise", "null"] = "raise", ) -> Index: if errors not in ["raise", "null"]: raise ValueError("Argument 'errors' must be one of 'raise' or 'null'") if self.nlevels > 1: raise TypeError("Multiindex does not support 'astype'") dtype = bigframes.dtypes.bigframes_type(dtype) return self._apply_unary_expr( ops.AsTypeOp(to_type=dtype, safe=(errors == "null")).as_expr( ex.free_var("arg") ) )
[docs] def all(self) -> bool: if self.nlevels > 1: raise TypeError("Multiindex does not support 'all'") return typing.cast(bool, self._apply_aggregation(agg_ops.all_op))
[docs] def any(self) -> bool: if self.nlevels > 1: raise TypeError("Multiindex does not support 'any'") return typing.cast(bool, self._apply_aggregation(agg_ops.any_op))
[docs] def nunique(self) -> int: return typing.cast(int, self._apply_aggregation(agg_ops.nunique_op))
[docs] def max(self) -> typing.Any: return self._apply_aggregation(agg_ops.max_op)
[docs] def min(self) -> typing.Any: return self._apply_aggregation(agg_ops.min_op)
[docs] @validations.requires_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( [ *[order.descending_over(col) for col in self._block.index_columns], order.ascending_over(row_nums), ] ) import bigframes.series as series return typing.cast(int, series.Series(block.select_column(row_nums)).iloc[0])
[docs] @validations.requires_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( [ *[order.ascending_over(col) for col in self._block.index_columns], order.ascending_over(row_nums), ] ) import bigframes.series as series return typing.cast(int, series.Series(block.select_column(row_nums)).iloc[0])
    def value_counts(
        self,
        normalize: bool = False,
        sort: bool = True,
        ascending: bool = False,
        *,
        dropna: bool = True,
    ):
        # NOTE(review): `sort` is accepted for pandas compatibility but is not
        # forwarded to block_ops.value_counts here — confirm whether sorting
        # is implied downstream by `ascending`.
        block = block_ops.value_counts(
            self._block,
            self._block.index_columns,
            normalize=normalize,
            ascending=ascending,
            drop_na=dropna,
        )
        import bigframes.series as series

        return series.Series(block)
[docs] def fillna(self, value=None) -> Index: if self.nlevels > 1: raise TypeError("Multiindex does not support 'fillna'") return self._apply_unary_expr( ops.fillna_op.as_expr(ex.free_var("arg"), ex.const(value)) )
    # Typed overloads for rename: without inplace (or inplace=False) a new
    # Index is returned; inplace=True mutates this object and returns None.
    @overload
    def rename(
        self,
        name: Union[blocks.Label, Sequence[blocks.Label]],
    ) -> Index:
        ...

    @overload
    def rename(
        self,
        name: Union[blocks.Label, Sequence[blocks.Label]],
        *,
        inplace: Literal[False],
    ) -> Index:
        ...

    @overload
    def rename(
        self,
        name: Union[blocks.Label, Sequence[blocks.Label]],
        *,
        inplace: Literal[True],
    ) -> None:
        ...
    def rename(
        self,
        name: Union[blocks.Label, Sequence[blocks.Label]],
        *,
        inplace: bool = False,
    ) -> Optional[Index]:
        # Tuples are allowed as a label, but we specifically exclude them here.
        # This is because tuples are hashable, but we want to treat them as a
        # sequence. If name is iterable, we want to assume we're working with a
        # MultiIndex. Unfortunately, strings are iterable and we don't want a
        # list of all the characters, so specifically exclude the non-tuple
        # hashables.
        if isinstance(name, blocks.Label) and not isinstance(name, tuple):
            names = [name]
        else:
            names = list(name)

        if len(names) != self.nlevels:
            raise ValueError("'name' must be same length as levels")

        new_block = self._block.with_index_labels(names)

        if inplace:
            # Propagate the rename to the DataFrame/Series this index views,
            # if any (set by from_frame).
            if self._linked_frame is not None:
                self._linked_frame._set_block(
                    self._linked_frame._block.with_index_labels(names)
                )
            self._block = new_block
            return None
        else:
            return Index(new_block)
    def drop(
        self,
        labels: typing.Any,
    ) -> Index:
        # ignore axis, columns params
        block = self._block
        # Only the first index level is matched against `labels`.
        level_id = self._block.index_columns[0]
        if utils.is_list_like(labels):
            # Membership test, then invert: keep rows NOT in `labels`.
            block, inverse_condition_id = block.apply_unary_op(
                level_id, ops.IsInOp(values=tuple(labels), match_nulls=True)
            )
            block, condition_id = block.apply_unary_op(
                inverse_condition_id, ops.invert_op
            )
        else:
            block, condition_id = block.project_expr(
                ops.ne_op.as_expr(level_id, ex.const(labels))
            )
        # keep_null=True retains rows where the comparison is NA.
        block = block.filter_by_id(condition_id, keep_null=True)
        block = block.drop_columns([condition_id])
        return Index(block)
[docs] def dropna(self, how: typing.Literal["all", "any"] = "any") -> Index: if how not in ("any", "all"): raise ValueError("'how' must be one of 'any', 'all'") result = block_ops.dropna(self._block, self._block.index_columns, how=how) return Index(result)
    def drop_duplicates(self, *, keep: str = "first") -> Index:
        # Like pandas, `keep` may also be the literal False (drop all
        # duplicated values); only the "first"/"last" modes need a defined
        # row ordering.
        if keep is not False:
            validations.enforce_ordered(self, "drop_duplicates")
        block = block_ops.drop_duplicates(self._block, self._block.index_columns, keep)
        return Index(block)
[docs] def unique(self, level: Hashable | int | None = None) -> Index: if level is None: return self.drop_duplicates() return self.get_level_values(level).drop_duplicates()
[docs] def isin(self, values) -> Index: import bigframes.series as series if isinstance(values, (series.Series, Index)): return Index(self.to_series().isin(values)) if not utils.is_list_like(values): raise TypeError( "only list-like objects are allowed to be passed to " f"isin(), you passed a [{type(values).__name__}]" ) return self._apply_unary_expr( ops.IsInOp(values=tuple(values), match_nulls=True).as_expr( ex.free_var("arg") ) ).fillna(value=False)
    def __contains__(self, key) -> bool:
        hash(key)  # to throw for unhashable values
        if self.nlevels == 0:
            return False
        # Normalize scalars (and scalars on a 1-level index) to a 1-tuple so
        # the per-level loop below works uniformly.
        if (not isinstance(key, tuple)) or (self.nlevels == 1):
            key = (key,)

        match_exprs = []
        for key_part, index_col, dtype in zip(
            key, self._block.index_columns, self._block.index.dtypes
        ):
            # Incompatible key type for this level means no match is possible.
            key_type = bigframes.dtypes.is_compatible(key_part, dtype)
            if key_type is None:
                return False
            key_expr = ex.const(key_part, key_type)
            match_expr = ops.eq_null_match_op.as_expr(ex.deref(index_col), key_expr)
            match_exprs.append(match_expr)

        # All levels must match on the same row.
        match_expr_final = functools.reduce(ops.and_op.as_expr, match_exprs)
        block, match_col = self._block.project_expr(match_expr_final)
        return cast(bool, block.get_stat(match_col, agg_ops.AnyOp()))

    def _apply_unary_op(self, op: ops.UnaryOp) -> Index:
        # Convenience wrapper: lift a UnaryOp into a one-free-variable expression.
        return self._apply_unary_expr(op.as_expr(ex.free_var("input")))

    def _apply_unary_expr(
        self,
        op: ex.Expression,
    ) -> Index:
        """Applies a unary operator to the index."""
        if len(op.free_variables) != 1:
            raise ValueError("Expression must have exactly 1 unbound variable.")
        unbound_variable = op.free_variables[0]

        # Apply the expression to each index level, then rebuild the index
        # from the result columns, preserving the original labels.
        block = self._block
        result_ids = []
        for col in self._block.index_columns:
            block, result_id = block.project_expr(
                op.bind_variables({unbound_variable: ex.deref(col)})
            )
            result_ids.append(result_id)

        block = block.set_index(result_ids, index_labels=self._block.index.names)
        return Index(block)

    def _apply_aggregation(self, op: agg_ops.UnaryAggregateOp) -> typing.Any:
        # Single-level only; used by all/any/min/max/nunique.
        if self.nlevels > 1:
            raise NotImplementedError(f"Multiindex does not yet support {op.name}")
        column_id = self._block.index_columns[0]
        return self._block.get_stat(column_id, op)

    def __getitem__(self, key: int) -> typing.Any:
        # Only integer positional access is supported.
        if isinstance(key, int):
            if key != -1:
                result_pd_df, _ = self._block.slice(key, key + 1, 1).to_pandas()
            else:  # special case, want [-1:] instead of [-1:0]
                result_pd_df, _ = self._block.slice(key).to_pandas()
            if result_pd_df.index.empty:
                raise IndexError("single positional indexer is out-of-bounds")
            return result_pd_df.index[0]
        else:
            raise NotImplementedError(f"Index key not supported {key}")

    # Typed overloads for to_pandas: dry_run=False returns a pandas.Index,
    # dry_run=True returns a pandas.Series of dry-run statistics.
    @overload
    def to_pandas(  # type: ignore[overload-overlap]
        self,
        *,
        allow_large_results: Optional[bool] = ...,
        dry_run: Literal[False] = ...,
    ) -> pandas.Index:
        ...

    @overload
    def to_pandas(
        self, *, allow_large_results: Optional[bool] = ..., dry_run: Literal[True] = ...
    ) -> pandas.Series:
        ...
    def to_pandas(
        self,
        *,
        allow_large_results: Optional[bool] = None,
        dry_run: bool = False,
    ) -> pandas.Index | pandas.Series:
        """Gets the Index as a pandas Index.

        Args:
            allow_large_results (bool, default None):
                If not None, overrides the global setting to allow or disallow
                large query results over the default size limit of 10 GB.
            dry_run (bool, default False):
                If this argument is true, this method will not process the
                data. Instead, it returns a Pandas series containing dtype and
                the amount of bytes to be processed.

        Returns:
            pandas.Index | pandas.Series:
                A pandas Index with all of the labels from this Index. If dry
                run is set to True, returns a Series containing dry run
                statistics.
        """
        if dry_run:
            dry_run_stats, dry_run_job = self._block.index._compute_dry_run(
                ordered=True
            )
            self._query_job = dry_run_job
            return dry_run_stats

        df, query_job = self._block.index.to_pandas(
            ordered=True, allow_large_results=allow_large_results
        )
        # Record the job for the query_job property; may be absent (e.g. when
        # results were served without running a new job).
        if query_job:
            self._query_job = query_job
        return df
[docs] def to_numpy(self, dtype=None, *, allow_large_results=None, **kwargs) -> np.ndarray: return self.to_pandas(allow_large_results=allow_large_results).to_numpy( dtype, **kwargs )
__array__ = to_numpy
[docs] def to_list(self, *, allow_large_results: Optional[bool] = None) -> list: return self.to_pandas(allow_large_results=allow_large_results).to_list()
def __len__(self): return self.shape[0]
    def item(self):
        # Docstring is in third_party/bigframes_vendored/pandas/core/indexes/base.py
        # NOTE(review): peek(2) presumably fetches at most two rows, letting
        # pandas' .item() raise when the index has more than one element —
        # confirm peek semantics.
        return self.to_series().peek(2).item()
    def __eq__(self, other) -> Index:  # type: ignore
        # Elementwise equality, pandas-style: returns an Index, not a bool.
        return self._apply_binary_op(other, ops.eq_op)

    def _apply_binary_op(
        self,
        other,
        op: ops.BinaryOp,
        alignment: typing.Literal["outer", "left"] = "outer",
    ) -> Index:
        # Note: alignment arg is for compatibility with accessors, is ignored as irrelevant for implicit joins.
        # TODO: Handle local objects, or objects not implicitly alignable? Gets ambiguous with partial ordering though
        if isinstance(other, (bigframes.series.Series, Index)):
            other = Index(other)
            if other.nlevels != self.nlevels:
                raise ValueError("Dimensions do not match")
            # Row-align the two expressions, then apply the op level by level.
            lexpr = self._block.expr
            rexpr = other._block.expr
            join_result = lexpr.try_row_join(rexpr)
            if join_result is None:
                raise ValueError("Cannot align objects")
            expr, (lmap, rmap) = join_result
            expr, res_ids = expr.compute_values(
                [
                    op.as_expr(lmap[lid], rmap[rid])
                    for lid, rid in zip(lexpr.column_ids, rexpr.column_ids)
                ]
            )
            # Keep labels only when both sides agree, matching pandas.
            labels = self.names if self.names == other.names else [None] * len(res_ids)
            return Index(
                blocks.Block(
                    expr.select_columns(res_ids),
                    index_columns=res_ids,
                    column_labels=[],
                    index_labels=labels,
                )
            )
        elif (
            isinstance(other, bigframes.dtypes.LOCAL_SCALAR_TYPES)
            and self.nlevels == 1
        ):
            # Scalar broadcast over a single-level index.
            block, id = self._block.project_expr(
                op.as_expr(self._block.index_columns[0], ex.const(other))
            )
            return Index(block.set_index([id], index_labels=self.names))
        elif isinstance(other, tuple) and len(other) == self.nlevels:
            # Tuple broadcast: one scalar per index level.
            block = self._block.project_exprs(
                [
                    op.as_expr(self._block.index_columns[i], ex.const(other[i]))
                    for i in range(self.nlevels)
                ],
                labels=[None] * self.nlevels,
                drop=True,
            )
            return Index(block.set_index(block.value_columns, index_labels=self.names))
        else:
            return NotImplemented
def _should_create_datetime_index(block: blocks.Block) -> bool:
    """True when the block has exactly one index level and it is datetime-like."""
    index_dtypes = block.index.dtypes
    return len(index_dtypes) == 1 and dtypes.is_datetime_like(index_dtypes[0])