
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""DataFrame is a two dimensional data structure."""

from __future__ import annotations

import datetime
import inspect
import itertools
import json
import re
import sys
import textwrap
import traceback
import typing
from typing import (
    Any,
    Callable,
    cast,
    Dict,
    Hashable,
    Iterable,
    List,
    Literal,
    Mapping,
    Optional,
    overload,
    Sequence,
    Tuple,
    Union,
)
import warnings

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.frame as vendored_pandas_frame
import bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing
import google.api_core.exceptions
import google.cloud.bigquery as bigquery
import numpy
import pandas
from pandas.api import extensions as pd_ext
import pandas.io.formats.format
import pyarrow
import tabulate

import bigframes._config.display_options as display_options
import bigframes.constants
import bigframes.core
from bigframes.core import agg_expressions, log_adapter
import bigframes.core.block_transforms as block_ops
import bigframes.core.blocks as blocks
import bigframes.core.convert
import bigframes.core.explode
import bigframes.core.expression as ex
import bigframes.core.groupby as groupby
import bigframes.core.guid
import bigframes.core.indexers as indexers
import bigframes.core.indexes as indexes
import bigframes.core.interchange
import bigframes.core.ordering as order
import bigframes.core.utils as utils
import bigframes.core.validations as validations
import bigframes.core.window
from bigframes.core.window import rolling
import bigframes.core.window_spec as windows
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.formatting_helpers as formatter
import bigframes.functions
from bigframes.functions import function_typing
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.operations.ai
import bigframes.operations.plotting as plotting
import bigframes.operations.semantics
import bigframes.operations.structs
import bigframes.series
import bigframes.session._io.bigquery
import bigframes.session.execution_spec as ex_spec

if typing.TYPE_CHECKING:
    from _typeshed import SupportsRichComparison

    import bigframes.session

    SingleItemValue = Union[
        bigframes.series.Series, int, float, str, pandas.Timedelta, Callable
    ]
    MultiItemValue = Union[
        "DataFrame", Sequence[int | float | str | pandas.Timedelta | Callable]
    ]

LevelType = typing.Hashable
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]

ERROR_IO_ONLY_GS_PATHS = f"Only Google Cloud Storage (gs://...) paths are supported. {constants.FEEDBACK_LINK}"
ERROR_IO_REQUIRES_WILDCARD = (
    "Google Cloud Storage path must contain a wildcard '*' character. See: "
    "https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#export_data_statement"
    f"{constants.FEEDBACK_LINK}"
)


# Inherits from pandas DataFrame so that we can use the same docstrings.
@log_adapter.class_logger
class DataFrame(vendored_pandas_frame.DataFrame):
    __doc__ = vendored_pandas_frame.DataFrame.__doc__

    # Internal flag to disable caching entirely.
    _disable_cache_override: bool = False

    # Must be above 5000 for pandas to delegate to bigframes for binops
    __pandas_priority__ = 15000
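    # Because __pandas_priority__ is set above pandas' own priority, binary
    # operations that mix a pandas object with a bigframes DataFrame resolve
    # to the bigframes implementation. A minimal sketch (illustrative only;
    # assumes `import bigframes.pandas as bpd` and a configured session):
    #
    #   import pandas as pd
    #
    #   bf_df = bpd.DataFrame({"a": [1, 2, 3]})
    #   result = pd.Series([10, 20, 30]) + bf_df  # dispatches to DataFrame.__radd__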
    def __init__(
        self,
        data=None,
        index: vendored_pandas_typing.Axes | None = None,
        columns: vendored_pandas_typing.Axes | None = None,
        dtype: typing.Optional[
            bigframes.dtypes.DtypeString | bigframes.dtypes.Dtype
        ] = None,
        copy: typing.Optional[bool] = None,
        *,
        session: typing.Optional[bigframes.session.Session] = None,
    ):
        global bigframes

        self._query_job: Optional[bigquery.QueryJob] = None

        if copy is not None and not copy:
            raise ValueError(
                f"DataFrame constructor only supports copy=True. {constants.FEEDBACK_LINK}"
            )
        # Ignore object dtype if provided, as it provides no additional
        # information about what BigQuery type to use.
        if dtype is not None and bigframes.dtypes.is_object_like(dtype):
            dtype = None

        # Check to see if constructing from BigQuery-backed objects before
        # falling back to pandas constructor
        block = None
        if isinstance(data, blocks.Block):
            block = data
        elif isinstance(data, DataFrame):
            block = data._get_block()
        # Dict of Series
        elif (
            utils.is_dict_like(data)
            and len(data) >= 1
            and any(
                isinstance(data[key], bigframes.series.Series) for key in data.keys()
            )
        ):
            if not all(
                isinstance(data[key], bigframes.series.Series) for key in data.keys()
            ):
                # TODO(tbergeron): Support local list/series data by converting to memtable.
                raise NotImplementedError(
                    f"Cannot mix Series with other types. {constants.FEEDBACK_LINK}"
                )
            keys = list(data.keys())
            first_label, first_series = keys[0], data[keys[0]]
            block = (
                typing.cast(bigframes.series.Series, first_series)
                ._get_block()
                .with_column_labels([first_label])
            )

            for key in keys[1:]:
                other = typing.cast(bigframes.series.Series, data[key])
                other_block = other._block.with_column_labels([key])
                # Pandas will keep original sorting if all indices are aligned.
                # We cannot detect this easily however, and so always sort on index
                block, _ = block.join(  # type:ignore
                    other_block, how="outer", sort=True
                )

        if block:
            if index is not None:
                bf_index = indexes.Index(index)
                idx_block = bf_index._block
                idx_cols = idx_block.index_columns
                block, (_, r_mapping) = block.reset_index().join(
                    bf_index._block.reset_index(), how="inner"
                )
                block = block.set_index([r_mapping[idx_col] for idx_col in idx_cols])
            if columns:
                column_ids = [
                    block.resolve_label_exact_or_error(label) for label in list(columns)
                ]
                block = block.select_columns(column_ids)  # type:ignore
            if dtype:
                bf_dtype = bigframes.dtypes.bigframes_type(dtype)
                block = block.multi_apply_unary_op(ops.AsTypeOp(to_type=bf_dtype))

        else:
            if isinstance(dtype, str) and dtype.lower() == "json":
                dtype = bigframes.dtypes.JSON_DTYPE

            import bigframes.pandas

            pd_dataframe = pandas.DataFrame(
                data=data,
                index=index,  # type:ignore
                columns=columns,  # type:ignore
                dtype=dtype,  # type:ignore
            )
            if session:
                block = session.read_pandas(pd_dataframe)._get_block()
            else:
                block = bigframes.pandas.read_pandas(pd_dataframe)._get_block()

        # We use _block as an indicator in __getattr__ and __setattr__ to see
        # if the object is fully initialized, so make sure we set the _block
        # attribute last.
        self._block = block
        self._block.session._register_object(self)
    def __dir__(self):
        return dir(type(self)) + [
            label
            for label in self._block.column_labels
            if label and isinstance(label, str)
        ]

    def _ipython_key_completions_(self) -> List[str]:
        return list(
            [
                label
                for label in self._block.column_labels
                if label and isinstance(label, str)
            ]
        )

    def _find_indices(
        self,
        columns: Union[blocks.Label, Sequence[blocks.Label]],
        tolerance: bool = False,
    ) -> Sequence[int]:
        """Find corresponding indices in df._block.column_labels for column name(s).
        Order is kept the same as input names order.

        Args:
            columns: column name(s)
            tolerance: True to pass through columns not found. False to raise
                ValueError.
        """
        col_ids = self._sql_names(columns, tolerance)
        return [self._block.value_columns.index(col_id) for col_id in col_ids]

    def _resolve_label_exact(self, label) -> Optional[str]:
        return self._block.resolve_label_exact(label)

    def _sql_names(
        self,
        columns: Union[blocks.Label, Sequence[blocks.Label], pandas.Index],
        tolerance: bool = False,
    ) -> Sequence[str]:
        """Retrieve sql name (column name in BQ schema) of column(s)."""
        labels = (
            columns
            if utils.is_list_like(columns) and not isinstance(columns, tuple)
            else [columns]
        )  # type:ignore
        results: Sequence[str] = []
        for label in labels:
            col_ids = self._block.label_to_col_id.get(label, [])
            if not tolerance and len(col_ids) == 0:
                raise ValueError(f"Column name {label} doesn't exist")
            results = (*results, *col_ids)
        return results

    @property
    @validations.requires_index
    def index(
        self,
    ) -> indexes.Index:
        return indexes.Index.from_frame(self)

    @index.setter
    def index(self, value):
        # TODO: Handle assigning MultiIndex
        result = self._assign_single_item("_new_bf_index", value).set_index(
            "_new_bf_index"
        )
        self._set_block(result._get_block())
        self.index.name = value.name if hasattr(value, "name") else None

    @property
    @validations.requires_index
    def loc(self) -> indexers.LocDataFrameIndexer:
        return indexers.LocDataFrameIndexer(self)

    @property
    @validations.requires_ordering()
    def iloc(self) -> indexers.ILocDataFrameIndexer:
        return indexers.ILocDataFrameIndexer(self)

    @property
    @validations.requires_ordering()
    def iat(self) -> indexers.IatDataFrameIndexer:
        return indexers.IatDataFrameIndexer(self)

    @property
    @validations.requires_index
    def at(self) -> indexers.AtDataFrameIndexer:
        return indexers.AtDataFrameIndexer(self)

    @property
    def dtypes(self) -> pandas.Series:
        dtypes = self._block.dtypes
        bigframes.dtypes.warn_on_db_dtypes_json_dtype(dtypes)
        return pandas.Series(data=dtypes, index=self._block.column_labels)

    @property
    def columns(self) -> pandas.Index:
        return self.dtypes.index

    @columns.setter
    def columns(self, labels: pandas.Index):
        new_block = self._block.with_column_labels(labels)
        self._set_block(new_block)

    @property
    def shape(self) -> Tuple[int, int]:
        return self._block.shape

    @property
    def size(self) -> int:
        rows, cols = self.shape
        return rows * cols

    @property
    def ndim(self) -> int:
        return 2

    @property
    def empty(self) -> bool:
        return self.size == 0

    @property
    def values(self) -> numpy.ndarray:
        return self.to_numpy()

    @property
    def bqclient(self) -> bigquery.Client:
        """BigQuery REST API Client the DataFrame uses for operations."""
        return self._session.bqclient

    @property
    def _session(self) -> bigframes.Session:
        return self._get_block().expr.session

    @property
    def _has_index(self) -> bool:
        return len(self._block.index_columns) > 0

    @property
    @validations.requires_ordering()
    def T(self) -> DataFrame:
        return DataFrame(self._get_block().transpose())
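    # Illustrative property usage (assumes `import bigframes.pandas as bpd`
    # and a configured session; hypothetical data):
    #
    #   df = bpd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    #   df.shape   # (3, 2)
    #   df.size    # 6
    #   df.ndim    # 2
    #   df.dtypes  # pandas Series mapping column labels to BigQuery DataFrames dtypes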
    @validations.requires_index
    @validations.requires_ordering()
    def transpose(self) -> DataFrame:
        return self.T
    def __len__(self):
        rows, _ = self.shape
        return rows

    __len__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__len__)

    def __iter__(self):
        return iter(self.columns)

    def __contains__(self, key) -> bool:
        return key in self.columns
    def astype(
        self,
        dtype: Union[
            bigframes.dtypes.DtypeString,
            bigframes.dtypes.Dtype,
            type,
            dict[str, Union[bigframes.dtypes.DtypeString, bigframes.dtypes.Dtype]],
        ],
        *,
        errors: Literal["raise", "null"] = "raise",
    ) -> DataFrame:
        if errors not in ["raise", "null"]:
            raise ValueError("Arg 'errors' must be one of 'raise' or 'null'")

        safe_cast = errors == "null"

        if isinstance(dtype, dict):
            result = self.copy()
            for col, to_type in dtype.items():
                result[col] = result[col].astype(to_type)
            return result

        dtype = bigframes.dtypes.bigframes_type(dtype)
        return self._apply_unary_op(ops.AsTypeOp(dtype, safe_cast))
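    # Illustrative astype usage (hypothetical column names; assumes a
    # configured session):
    #
    #   df = bpd.DataFrame({"a": [1, 2], "b": ["1.5", "oops"]})
    #   df.astype({"a": "Float64"})                  # per-column mapping path
    #   df[["b"]].astype("Float64", errors="null")   # failed casts become NULL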
    def _should_sql_have_index(self) -> bool:
        """Should the SQL we pass to BQML and other I/O include the index?"""
        return self._has_index and (
            self.index.name is not None or len(self.index.names) > 1
        )

    def _to_placeholder_table(self, dry_run: bool = False) -> bigquery.TableReference:
        """Compiles this DataFrame's expression tree to SQL and saves it to a
        (temporary) view or table (in the case of a dry run).
        """
        return self._block.to_placeholder_table(
            include_index=self._should_sql_have_index(), dry_run=dry_run
        )

    def _to_sql_query(
        self, include_index: bool, enable_cache: bool = True
    ) -> Tuple[str, list[str], list[blocks.Label]]:
        """Compiles this DataFrame's expression tree to SQL, optionally
        including index columns.

        Args:
            include_index (bool):
                whether to include index columns.

        Returns:
            Tuple[sql_string, index_column_id_list, index_column_label_list]:
                If include_index is set to False, index_column_id_list and
                index_column_label_list return empty lists.
        """
        return self._block.to_sql_query(include_index, enable_cache=enable_cache)

    @property
    def sql(self) -> str:
        """Compiles this DataFrame's expression tree to SQL.

        Returns:
            str:
                string representing the compiled SQL.
        """
        try:
            include_index = self._should_sql_have_index()
            sql, _, _ = self._to_sql_query(include_index=include_index)
            return sql
        except AttributeError as e:
            # Workaround for a development-mode debugging issue:
            # An `AttributeError` originating *inside* this @property getter
            # (e.g., due to a typo or referencing a non-existent attribute)
            # can be mistakenly intercepted by the class's __getattr__ method
            # if one is defined. We catch the AttributeError and raise
            # SyntaxError instead to make it clear the error originates *here*
            # in the property implementation.
            # See: https://stackoverflow.com/questions/50542177/correct-handling-of-attributeerror-in-getattr-when-using-property
            raise SyntaxError(
                "AttributeError encountered. Please check the implementation for incorrect attribute access."
            ) from e

    @property
    def query_job(self) -> Optional[bigquery.QueryJob]:
        """BigQuery job metadata for the most recent query.

        Returns:
            None or google.cloud.bigquery.QueryJob:
                The most recent `QueryJob
                <https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob>`_.
        """
        if self._query_job is None:
            self._set_internal_query_job(self._compute_dry_run())
        return self._query_job
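    # The compiled SQL can be handed to other BigQuery tooling. A sketch
    # (illustrative; the table name is hypothetical):
    #
    #   df = bpd.read_gbq("project.dataset.table")
    #   query = df.sql      # compiled SELECT statement for this frame
    #   job = df.query_job  # metadata for the most recent query, if any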
    def memory_usage(self, index: bool = True):
        n_rows, _ = self.shape
        # Like pandas, treat all variable-size objects as just 8-byte
        # pointers, ignoring actual object sizes.
        column_sizes = self.dtypes.map(
            lambda dtype: bigframes.dtypes.DTYPE_BYTE_SIZES.get(dtype, 8) * n_rows
        )
        if index and self._has_index:
            index_size = pandas.Series([self.index._memory_usage()], index=["Index"])
            column_sizes = pandas.concat([index_size, column_sizes])
        return column_sizes
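    # Worked example of the estimate above (illustrative only): for a
    # 1,000-row frame with one Int64 column and one string column, each
    # column is estimated at 8 bytes * 1,000 rows = 8,000 bytes, since
    # variable-size values are treated as 8-byte pointers rather than
    # measured individually.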
    def info(
        self,
        verbose: Optional[bool] = None,
        buf=None,
        max_cols: Optional[int] = None,
        memory_usage: Optional[bool] = None,
        show_counts: Optional[bool] = None,
    ):
        obuf = buf or sys.stdout

        n_rows, n_columns = self.shape

        max_cols = (
            max_cols
            if max_cols is not None
            else bigframes.options.display.max_info_columns
        )

        show_all_columns = verbose if verbose is not None else (n_columns < max_cols)

        obuf.write(f"{type(self)}\n")

        if self._block.has_index:
            index_type = "MultiIndex" if self.index.nlevels > 1 else "Index"

            index_stats = f"{n_rows} entries"
            if n_rows > 0:
                # These accesses are kind of expensive; maybe we should try to
                # skip them?
                first_index = self.index[0]
                last_index = self.index[-1]
                index_stats += f", {first_index} to {last_index}"

            obuf.write(f"{index_type}: {index_stats}\n")
        else:
            obuf.write("NullIndex\n")

        if n_columns == 0:
            # We don't display any more information if the dataframe has no
            # columns.
            obuf.write("Empty DataFrame\n")
            return

        dtype_strings = self.dtypes.astype("string")
        if show_all_columns:
            obuf.write(f"Data columns (total {n_columns} columns):\n")
            column_info = self.columns.to_frame(name="Column")

            max_rows = bigframes.options.display.max_info_rows
            too_many_rows = n_rows > max_rows if max_rows is not None else False

            if show_counts if show_counts is not None else (not too_many_rows):
                non_null_counts = self.count().to_pandas()
                column_info["Non-Null Count"] = non_null_counts.map(
                    lambda x: f"{int(x)} non-null"
                )

            column_info["Dtype"] = dtype_strings

            column_info = column_info.reset_index(drop=True)
            column_info.index.name = "#"

            column_info_formatted = tabulate.tabulate(column_info, headers="keys")  # type: ignore
            obuf.write(column_info_formatted)
            obuf.write("\n")

        else:  # Just number of columns and first, last
            obuf.write(
                f"Columns: {n_columns} entries, {self.columns[0]} to {self.columns[-1]}\n"
            )
        dtype_counts = dtype_strings.value_counts().sort_index(ascending=True).items()
        dtype_counts_formatted = ", ".join(
            f"{dtype}({count})" for dtype, count in dtype_counts
        )
        obuf.write(f"dtypes: {dtype_counts_formatted}\n")

        show_memory = (
            memory_usage
            if memory_usage is not None
            else bigframes.options.display.memory_usage
        )
        if show_memory:
            # TODO: Convert to different units (kb, mb, etc.)
            obuf.write(f"memory usage: {self.memory_usage().sum()} bytes\n")
    def select_dtypes(self, include=None, exclude=None) -> DataFrame:
        # Create empty pandas dataframe with same schema and then leverage
        # actual pandas implementation
        as_pandas = pandas.DataFrame(
            {
                col_id: pandas.Series([], dtype=dtype)
                for col_id, dtype in zip(self._block.value_columns, self._block.dtypes)
            }
        )
        selected_columns = tuple(
            as_pandas.select_dtypes(include=include, exclude=exclude).columns
        )
        return DataFrame(self._block.select_columns(selected_columns))
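    # Illustrative select_dtypes usage (hypothetical column names); since the
    # selection is delegated to pandas on an empty same-schema frame, pandas
    # dtype selectors apply:
    #
    #   df = bpd.DataFrame({"a": [1, 2], "b": ["x", "y"], "c": [1.0, 2.0]})
    #   df.select_dtypes(include="number")    # keeps "a" and "c"
    #   df.select_dtypes(include=["Int64"])   # keeps only "a"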
    def _set_internal_query_job(self, query_job: Optional[bigquery.QueryJob]):
        self._query_job = query_job

    @overload
    def __getitem__(
        self,
        key: bigframes.series.Series,
    ) -> DataFrame:
        ...

    @overload
    def __getitem__(
        self,
        key: slice,
    ) -> DataFrame:
        ...

    @overload
    def __getitem__(
        self,
        key: List[str],
    ) -> DataFrame:
        ...

    @overload
    def __getitem__(
        self,
        key: List[blocks.Label],
    ) -> DataFrame:
        ...

    @overload
    def __getitem__(self, key: pandas.Index) -> DataFrame:
        ...

    @overload
    def __getitem__(
        self,
        key: blocks.Label,
    ) -> bigframes.series.Series:
        ...

    def __getitem__(
        self,
        key: Union[
            blocks.Label,
            List[str],
            List[blocks.Label],
            # Index of column labels can be treated the same as a sequence of column labels.
            pandas.Index,
            bigframes.series.Series,
            slice,
        ],
    ):
        # No return type annotations (like pandas) as type cannot always be
        # determined statically.
        # NOTE: This implements the operations described in
        # https://pandas.pydata.org/docs/getting_started/intro_tutorials/03_subset_data.html
        if isinstance(key, bigframes.series.Series):
            return self._getitem_bool_series(key)

        if isinstance(key, slice):
            return self.iloc[key]

        # TODO(tswast): Fix this pylance warning: Class overlaps "Hashable"
        # unsafely and could produce a match at runtime
        if isinstance(key, blocks.Label):
            return self._getitem_label(key)

        if utils.is_list_like(key):
            return self._getitem_columns(key)
        else:
            # TODO(tswast): What case is this supposed to be handling?
            return self._getitem_columns([cast(Hashable, key)])

    __getitem__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__getitem__)

    def _getitem_columns(self, key: Sequence[blocks.Label]) -> DataFrame:
        selected_ids: Tuple[str, ...] = ()
        for label in key:
            col_ids = self._block.label_to_col_id[label]
            selected_ids = (*selected_ids, *col_ids)
        return DataFrame(self._block.select_columns(selected_ids))

    def _getitem_label(self, key: blocks.Label):
        col_ids = self._block.cols_matching_label(key)
        if len(col_ids) == 0:
            raise KeyError(
                f"{key} not found in DataFrame columns: {self._block.column_labels}"
            )
        block = self._block.select_columns(col_ids)
        if isinstance(self.columns, pandas.MultiIndex):
            # A MultiIndex should drop levels if the key does not select an
            # entire label.
            key_levels = len(key) if isinstance(key, tuple) else 1
            index_levels = self.columns.nlevels
            if key_levels < index_levels:
                block = block.with_column_labels(
                    block.column_labels.droplevel(list(range(key_levels)))
                )
                # Force return DataFrame in this case, even if only single column
                return DataFrame(block)

        if len(col_ids) == 1:
            return bigframes.series.Series(block)
        return DataFrame(block)

    # Bool Series selects rows
    def _getitem_bool_series(self, key: bigframes.series.Series) -> DataFrame:
        if not key.dtype == pandas.BooleanDtype():
            raise NotImplementedError(
                f"Only boolean series currently supported for indexing. {constants.FEEDBACK_LINK}"
            )
        # TODO: enforce stricter alignment
        combined_index, (
            get_column_left,
            get_column_right,
        ) = self._block.join(key._block, how="left")
        block = combined_index
        filter_col_id = get_column_right[key._value_column]
        block = block.filter_by_id(filter_col_id)
        block = block.drop_columns([filter_col_id])
        return DataFrame(block)

    def __getattr__(self, key: str):
        # To allow subclasses to set private attributes before the class is
        # fully initialized, protect against recursion errors with
        # uninitialized DataFrame objects. Note: this comes at the downside
        # that columns with a leading `_` won't be treated as columns.
        #
        # See:
        # https://github.com/googleapis/python-bigquery-dataframes/issues/728
        # and
        # https://nedbatchelder.com/blog/201010/surprising_getattr_recursion.html
        if key == "_block":
            raise AttributeError(key)

        if key in self._block.column_labels:
            return self.__getitem__(key)

        if hasattr(pandas.DataFrame, key):
            log_adapter.submit_pandas_labels(
                self._block.expr.session.bqclient, self.__class__.__name__, key
            )
            raise AttributeError(
                textwrap.dedent(
                    f"""
                    BigQuery DataFrames has not yet implemented an equivalent to
                    'pandas.DataFrame.{key}'. {constants.FEEDBACK_LINK}
                    """
                )
            )
        raise AttributeError(key)

    def __setattr__(self, key: str, value):
        if key == "_block":
            object.__setattr__(self, key, value)
            return

        # To allow subclasses to set private attributes before the class is
        # fully initialized, assume anything set before `_block` is initialized
        # is a regular attribute.
        if not hasattr(self, "_block"):
            object.__setattr__(self, key, value)
            return

        # If someone has a column named the same as a normal attribute
        # (e.g. index), we want to set the normal attribute, not the column.
        # To do that, check if there is a normal attribute by using
        # __getattribute__ (not __getattr__, because that includes columns).
        # If that returns a value without raising, then we know this is a
        # normal attribute and we should prefer that.
        try:
            object.__getattribute__(self, key)
            return object.__setattr__(self, key, value)
        except AttributeError:
            pass

        # If we made it here, then we know that it's not a regular attribute
        # already, so it might be a column to update. Note: we don't allow
        # adding new columns using __setattr__, only __setitem__, that way we
        # can still add regular new attributes.
        if key in self._block.column_labels:
            self[key] = value
        else:
            object.__setattr__(self, key, value)

    def __repr__(self) -> str:
        """Converts a DataFrame to a string. Calls to_pandas.

        Only represents the first `bigframes.options.display.max_rows`.
        """
        # Protect against errors with uninitialized DataFrame. See:
        # https://github.com/googleapis/python-bigquery-dataframes/issues/728
        if not hasattr(self, "_block"):
            return object.__repr__(self)

        opts = bigframes.options.display
        max_results = opts.max_rows
        # anywidget mode uses the same display logic as the "deferred" mode
        # for faster execution.
        if opts.repr_mode in ("deferred", "anywidget"):
            return formatter.repr_query_job(self._compute_dry_run())

        # TODO(swast): pass max_columns and get the true column count back. Maybe
        # get 1 more column than we have requested so that pandas can add the
        # ... for us?
        pandas_df, row_count, query_job = self._block.retrieve_repr_request_results(
            max_results
        )

        self._set_internal_query_job(query_job)

        column_count = len(pandas_df.columns)

        with display_options.pandas_repr(opts):
            import pandas.io.formats

            # Safe to mutate this: this dict is owned by this code and does
            # not affect global config.
            to_string_kwargs = (
                pandas.io.formats.format.get_dataframe_repr_params()  # type: ignore
            )
            if not self._has_index:
                to_string_kwargs.update({"index": False})
            repr_string = pandas_df.to_string(**to_string_kwargs)

        # Modify the end of the string to reflect count.
        lines = repr_string.split("\n")
        pattern = re.compile("\\[[0-9]+ rows x [0-9]+ columns\\]")
        if pattern.match(lines[-1]):
            lines = lines[:-2]

        if row_count > len(lines) - 1:
            lines.append("...")

        lines.append("")
        lines.append(f"[{row_count} rows x {column_count} columns]")
        return "\n".join(lines)

    def _repr_html_(self) -> str:
        """
        Returns an html string primarily for use by notebooks for displaying
        a representation of the DataFrame.
        Displays 20 rows by default since many notebooks are not configured
        for large tables.
        """
        opts = bigframes.options.display
        max_results = opts.max_rows
        if opts.repr_mode == "deferred":
            return formatter.repr_query_job(self._compute_dry_run())

        # Process blob columns first, regardless of display mode.
        self._cached()
        df = self.copy()
        if bigframes.options.display.blob_display:
            blob_cols = [
                series_name
                for series_name, series in df.items()
                if series.dtype == bigframes.dtypes.OBJ_REF_DTYPE
            ]
            for col in blob_cols:
                # TODO(garrettwu): It is not necessary to get access URLs for
                # all the rows. Update when there is a way to get URLs from
                # local data.
                df[col] = df[col].blob._get_runtime(mode="R", with_metadata=True)
        else:
            blob_cols = []

        if opts.repr_mode == "anywidget":
            try:
                from IPython.display import display as ipython_display

                from bigframes import display

                # Always create a new widget instance for each display call.
                # This ensures that each cell gets its own widget and prevents
                # unintended sharing between cells.
                widget = display.TableWidget(df.copy())

                ipython_display(widget)
                return ""  # Return empty string since we used display()
            except (AttributeError, ValueError, ImportError):
                # Fallback if anywidget is not available.
                warnings.warn(
                    "Anywidget mode is not available. "
                    "Please `pip install anywidget traitlets` or `pip install 'bigframes[anywidget]'` to use interactive tables. "
                    f"Falling back to deferred mode. Error: {traceback.format_exc()}"
                )
                return formatter.repr_query_job(self._compute_dry_run())

        # Continue with regular HTML rendering for non-anywidget modes.
        # TODO(swast): pass max_columns and get the true column count back. Maybe
        # get 1 more column than we have requested so that pandas can add the
        # ... for us?
        pandas_df, row_count, query_job = df._block.retrieve_repr_request_results(
            max_results
        )

        self._set_internal_query_job(query_job)

        column_count = len(pandas_df.columns)

        with display_options.pandas_repr(opts):
            # Allows previewing images in the DataFrame. The implementation
            # changes the string repr as well, so that it doesn't truncate
            # strings or escape HTML characters such as "<" and ">". We may
            # need to implement a full-fledged repr module to better support
            # types not in pandas.
            if bigframes.options.display.blob_display and blob_cols:

                def obj_ref_rt_to_html(obj_ref_rt) -> str:
                    obj_ref_rt_json = json.loads(obj_ref_rt)
                    obj_ref_details = obj_ref_rt_json["objectref"]["details"]
                    if "gcs_metadata" in obj_ref_details:
                        gcs_metadata = obj_ref_details["gcs_metadata"]
                        content_type = typing.cast(
                            str, gcs_metadata.get("content_type", "")
                        )
                        if content_type.startswith("image"):
                            size_str = ""
                            if bigframes.options.display.blob_display_width:
                                size_str = f' width="{bigframes.options.display.blob_display_width}"'
                            if bigframes.options.display.blob_display_height:
                                size_str = (
                                    size_str
                                    + f' height="{bigframes.options.display.blob_display_height}"'
                                )
                            url = obj_ref_rt_json["access_urls"]["read_url"]
                            return f'<img src="{url}"{size_str}>'

                    return f'uri: {obj_ref_rt_json["objectref"]["uri"]}, authorizer: {obj_ref_rt_json["objectref"]["authorizer"]}'

                formatters = {blob_col: obj_ref_rt_to_html for blob_col in blob_cols}

                # Set max_colwidth so as not to truncate the image url.
                with pandas.option_context("display.max_colwidth", None):
                    max_rows = pandas.get_option("display.max_rows")
                    max_cols = pandas.get_option("display.max_columns")
                    show_dimensions = pandas.get_option("display.show_dimensions")
                    html_string = pandas_df.to_html(
                        escape=False,
                        notebook=True,
                        max_rows=max_rows,
                        max_cols=max_cols,
                        show_dimensions=show_dimensions,
                        formatters=formatters,  # type: ignore
                    )
            else:
                # _repr_html_ stub is missing so mypy thinks it's a Series. Ignore mypy.
                html_string = pandas_df._repr_html_()  # type:ignore

        html_string += f"[{row_count} rows x {column_count} columns in total]"
        return html_string

    def __delitem__(self, key: str):
        df = self.drop(columns=[key])
        self._set_block(df._get_block())

    def __setitem__(
        self,
        key: str | list[str] | pandas.Index,
        value: SingleItemValue | MultiItemValue,
    ):
        if isinstance(key, (list, pandas.Index)):
            df = self._assign_multi_items(key, value)
        else:
            df = self._assign_single_item(key, value)
        self._set_block(df._get_block())

    __setitem__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__setitem__)

    def _apply_binop(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        op,
        axis: str | int = "columns",
        how: str = "outer",
        reverse: bool = False,
    ):
        if isinstance(other, bigframes.dtypes.LOCAL_SCALAR_TYPES):
            return self._apply_scalar_binop(other, op, reverse=reverse)
        elif isinstance(other, DataFrame):
            return self._apply_dataframe_binop(other, op, how=how, reverse=reverse)
        elif isinstance(other, pandas.DataFrame):
            return self._apply_dataframe_binop(
                DataFrame(other), op, how=how, reverse=reverse
            )
        elif utils.get_axis_number(axis) == 0:
            return self._apply_series_binop_axis_0(other, op, how, reverse)
        elif utils.get_axis_number(axis) == 1:
            return self._apply_series_binop_axis_1(other, op, how, reverse)
        raise NotImplementedError(
            f"binary operation is not implemented on the second operand of type {type(other).__name__}."
f"{constants.FEEDBACK_LINK}" ) def _apply_scalar_binop( self, other: bigframes.dtypes.LOCAL_SCALAR_TYPE, op: ops.BinaryOp, reverse: bool = False, ) -> DataFrame: if reverse: expr = op.as_expr( left_input=ex.const(other), right_input=ex.free_var("var1"), ) else: expr = op.as_expr( left_input=ex.free_var("var1"), right_input=ex.const(other), ) return DataFrame(self._block.multi_apply_unary_op(expr)) def _apply_series_binop_axis_0( self, other, op: ops.BinaryOp, how: str = "outer", reverse: bool = False, ) -> DataFrame: bf_series = bigframes.core.convert.to_bf_series( other, self.index if self._has_index else None, self._session ) aligned_block, columns, expr_pairs = self._block._align_axis_0( bf_series._block, how=how ) result = aligned_block._apply_binop( op, inputs=expr_pairs, labels=columns, reverse=reverse ) return DataFrame(result) def _apply_series_binop_axis_1( self, other, op: ops.BinaryOp, how: str = "outer", reverse: bool = False, ) -> DataFrame: """Align dataframe with pandas series by inlining series values as literals.""" # If we already know the transposed schema (from the transpose cache), we don't need to materialize rows from other # Instead, can fully defer execution (as a cross-join) if ( isinstance(other, bigframes.series.Series) and other._block._transpose_cache is not None ): aligned_block, columns, expr_pairs = self._block._align_series_block_axis_1( other._block, how=how ) else: # Fallback path, materialize `other` locally pd_series = bigframes.core.convert.to_pd_series(other, self.columns) aligned_block, columns, expr_pairs = self._block._align_pd_series_axis_1( pd_series, how=how ) result = aligned_block._apply_binop( op, inputs=expr_pairs, labels=columns, reverse=reverse ) return DataFrame(result) def _apply_dataframe_binop( self, other: DataFrame, op: ops.BinaryOp, how: str = "outer", reverse: bool = False, ) -> DataFrame: aligned_block, columns, expr_pairs = self._block._align_both_axes( other._block, how=how ) result = aligned_block._apply_binop( op, inputs=expr_pairs, labels=columns, reverse=reverse ) return DataFrame(result)
    def eq(self, other: typing.Any, axis: str | int = "columns") -> DataFrame:
        return self._apply_binop(other, ops.eq_op, axis=axis)
    def __eq__(self, other) -> DataFrame:  # type: ignore
        return self.eq(other)

    __eq__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__eq__)
    def ne(self, other: typing.Any, axis: str | int = "columns") -> DataFrame:
        return self._apply_binop(other, ops.ne_op, axis=axis)
    def __ne__(self, other) -> DataFrame:  # type: ignore
        return self.ne(other)

    __ne__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__ne__)

    def __invert__(self) -> DataFrame:
        return self._apply_unary_op(ops.invert_op)

    __invert__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__invert__)
    def le(self, other: typing.Any, axis: str | int = "columns") -> DataFrame:
        return self._apply_binop(other, ops.le_op, axis=axis)
    def __le__(self, other) -> DataFrame:
        return self.le(other)

    __le__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__le__)
    def lt(self, other: typing.Any, axis: str | int = "columns") -> DataFrame:
        return self._apply_binop(other, ops.lt_op, axis=axis)
    def __lt__(self, other) -> DataFrame:
        return self.lt(other)

    __lt__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__lt__)
    def ge(self, other: typing.Any, axis: str | int = "columns") -> DataFrame:
        return self._apply_binop(other, ops.ge_op, axis=axis)
    def __ge__(self, other) -> DataFrame:
        return self.ge(other)

    __ge__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__ge__)
    def gt(self, other: typing.Any, axis: str | int = "columns") -> DataFrame:
        return self._apply_binop(other, ops.gt_op, axis=axis)
    def __gt__(self, other) -> DataFrame:
        return self.gt(other)

    __gt__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__gt__)
    def add(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        # TODO(swast): Support fill_value parameter.
        # TODO(swast): Support level parameter with MultiIndex.
        return self._apply_binop(other, ops.add_op, axis=axis)
    def radd(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        # TODO(swast): Support fill_value parameter.
        # TODO(swast): Support level parameter with MultiIndex.
        return self._apply_binop(other, ops.add_op, axis=axis, reverse=True)
    def __add__(self, other) -> DataFrame:
        return self.add(other)

    __add__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__add__)

    def __radd__(self, other) -> DataFrame:
        return self.radd(other)

    __radd__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__radd__)
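    # Dispatch sketch for the arithmetic helpers above (illustrative only;
    # hypothetical data, assumes a configured session). Each line maps to a
    # branch of _apply_binop:
    #
    #   df = bpd.DataFrame({"a": [1, 2], "b": [10, 20]})
    #   df + 1                   # scalar path (_apply_scalar_binop)
    #   df.add(df)               # frame path (_apply_dataframe_binop)
    #   df.add(df["a"], axis=0)  # series path, aligned by index (_apply_series_binop_axis_0)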
    def sub(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        return self._apply_binop(other, ops.sub_op, axis=axis)
    subtract = sub
    subtract.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.sub)

    def __sub__(self, other):
        return self.sub(other)

    __sub__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__sub__)
    def rsub(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        return self._apply_binop(other, ops.sub_op, axis=axis, reverse=True)
    def __rsub__(self, other):
        return self.rsub(other)

    __rsub__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__rsub__)
    def mul(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        return self._apply_binop(other, ops.mul_op, axis=axis)
    multiply = mul
    multiply.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.mul)

    def __mul__(self, other):
        return self.mul(other)

    __mul__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__mul__)
    def rmul(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        return self.mul(other, axis=axis)
    def __rmul__(self, other):
        return self.rmul(other)

    __rmul__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__rmul__)
    def truediv(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        return self._apply_binop(other, ops.div_op, axis=axis)
    truediv.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.truediv)
    div = divide = truediv

    def __truediv__(self, other):
        return self.truediv(other)

    __truediv__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__truediv__)
    def rtruediv(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        return self._apply_binop(other, ops.div_op, axis=axis, reverse=True)
    rdiv = rtruediv
    rdiv.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.rtruediv)

    def __rtruediv__(self, other):
        return self.rtruediv(other)

    __rtruediv__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__rtruediv__)
    def floordiv(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        return self._apply_binop(other, ops.floordiv_op, axis=axis)
    def __floordiv__(self, other):
        return self.floordiv(other)

    __floordiv__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__floordiv__)
    def rfloordiv(
        self,
        other: float | int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        return self._apply_binop(other, ops.floordiv_op, axis=axis, reverse=True)
    def __rfloordiv__(self, other):
        return self.rfloordiv(other)

    __rfloordiv__.__doc__ = inspect.getdoc(
        vendored_pandas_frame.DataFrame.__rfloordiv__
    )
    def mod(  # type: ignore
        self,
        other: int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        return self._apply_binop(other, ops.mod_op, axis=axis)
    def __mod__(self, other):
        return self.mod(other)

    __mod__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__mod__)
    def rmod(  # type: ignore
        self,
        other: int | bigframes.series.Series | DataFrame,
        axis: str | int = "columns",
    ) -> DataFrame:
        return self._apply_binop(other, ops.mod_op, axis=axis, reverse=True)
    def __rmod__(self, other):
        return self.rmod(other)

    __rmod__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__rmod__)
    def pow(
        self, other: int | bigframes.series.Series, axis: str | int = "columns"
    ) -> DataFrame:
        return self._apply_binop(other, ops.pow_op, axis=axis)
    def __pow__(self, other):
        return self.pow(other)

    __pow__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__pow__)
    def rpow(
        self, other: int | bigframes.series.Series, axis: str | int = "columns"
    ) -> DataFrame:
        return self._apply_binop(other, ops.pow_op, axis=axis, reverse=True)
    def __rpow__(self, other):
        return self.rpow(other)

    __rpow__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__rpow__)

    def __and__(self, other: bool | int | bigframes.series.Series) -> DataFrame:
        return self._apply_binop(other, ops.and_op)

    __and__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__and__)

    __rand__ = __and__

    def __or__(self, other: bool | int | bigframes.series.Series) -> DataFrame:
        return self._apply_binop(other, ops.or_op)

    __or__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__or__)

    __ror__ = __or__

    def __xor__(self, other: bool | int | bigframes.series.Series) -> DataFrame:
        return self._apply_binop(other, ops.xor_op)

    __xor__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__xor__)

    __rxor__ = __xor__

    def __pos__(self) -> DataFrame:
        return self._apply_unary_op(ops.pos_op)

    def __neg__(self) -> DataFrame:
        return self._apply_unary_op(ops.neg_op)

    def __abs__(self) -> DataFrame:
        return self._apply_unary_op(ops.abs_op)

    __abs__.__doc__ = abs.__doc__
    def align(
        self,
        other: typing.Union[DataFrame, bigframes.series.Series],
        join: str = "outer",
        axis: typing.Union[str, int, None] = None,
    ) -> typing.Tuple[
        typing.Union[DataFrame, bigframes.series.Series],
        typing.Union[DataFrame, bigframes.series.Series],
    ]:
        axis_n = utils.get_axis_number(axis) if axis else None
        if axis_n == 1 and isinstance(other, bigframes.series.Series):
            raise NotImplementedError(
                f"align with series and axis=1 not supported. {constants.FEEDBACK_LINK}"
            )
        left_block, right_block = block_ops.align(
            self._block, other._block, join=join, axis=axis
        )
        return DataFrame(left_block), other.__class__(right_block)
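    # Illustrative align usage (hypothetical data; assumes a configured
    # session): the two results share the union of the input indexes.
    #
    #   left = bpd.DataFrame({"a": [1, 2]}, index=[0, 1])
    #   right = bpd.DataFrame({"a": [3, 4]}, index=[1, 2])
    #   l2, r2 = left.align(right, join="outer")  # both now indexed by [0, 1, 2]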
    def update(self, other, join: str = "left", overwrite=True, filter_func=None):
        other = other if isinstance(other, DataFrame) else DataFrame(other)
        if join != "left":
            raise ValueError("Only 'left' join supported for update")

        if filter_func is not None:  # Will always take other if possible

            def update_func(
                left: bigframes.series.Series, right: bigframes.series.Series
            ) -> bigframes.series.Series:
                return left.mask(right.notna() & filter_func(left), right)

        elif overwrite:

            def update_func(
                left: bigframes.series.Series, right: bigframes.series.Series
            ) -> bigframes.series.Series:
                return left.mask(right.notna(), right)

        else:

            def update_func(
                left: bigframes.series.Series, right: bigframes.series.Series
            ) -> bigframes.series.Series:
                return left.mask(left.isna(), right)

        result = self.combine(other, update_func, how=join)

        self._set_block(result._block)
    def combine(
        self,
        other: DataFrame,
        func: typing.Callable[
            [bigframes.series.Series, bigframes.series.Series], bigframes.series.Series
        ],
        fill_value=None,
        overwrite: bool = True,
        *,
        how: str = "outer",
    ) -> DataFrame:
        l_aligned, r_aligned = block_ops.align(self._block, other._block, join=how)

        other_missing_labels = self._block.column_labels.difference(
            other._block.column_labels
        )

        l_frame = DataFrame(l_aligned)
        r_frame = DataFrame(r_aligned)
        results = []
        for (label, lseries), (_, rseries) in zip(l_frame.items(), r_frame.items()):
            if not ((label in other_missing_labels) and not overwrite):
                if fill_value is not None:
                    result = func(
                        lseries.fillna(fill_value), rseries.fillna(fill_value)
                    )
                else:
                    result = func(lseries, rseries)
            else:
                result = (
                    lseries.fillna(fill_value) if fill_value is not None else lseries
                )
            results.append(result)

        if all([isinstance(val, bigframes.series.Series) for val in results]):
            import bigframes.core.reshape.api as rs

            return rs.concat(results, axis=1)
        else:
            raise ValueError("'func' must return Series")
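    # Illustrative combine usage (hypothetical data): `func` receives one
    # aligned Series from each frame and must return a Series, here choosing
    # the element-wise minimum via Series.mask (the same primitive used by
    # update above).
    #
    #   df1 = bpd.DataFrame({"a": [0, 5], "b": [4, 4]})
    #   df2 = bpd.DataFrame({"a": [1, 1], "b": [3, 3]})
    #   take_smaller = lambda s1, s2: s1.mask(s1 > s2, s2)
    #   df1.combine(df2, take_smaller)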
    def combine_first(self, other: DataFrame):
        return self._apply_dataframe_binop(other, ops.fillna_op)
    def _fast_stat_matrix(self, op: agg_ops.BinaryAggregateOp) -> DataFrame:
        """Faster corr, cov calculations, but creates more sql text, so cannot
        scale to many columns."""
        assert len(self.columns) * len(self.columns) < bigframes.constants.MAX_COLUMNS
        orig_columns = self.columns
        frame = self.copy()
        # Replace column names with 0 to n - 1 to keep order
        # and avoid the influence of duplicated column name.
        frame.columns = pandas.Index(range(len(orig_columns)))
        frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
        block = frame._block

        aggregations = [
            agg_expressions.BinaryAggregation(
                op, ex.deref(left_col), ex.deref(right_col)
            )
            for left_col in block.value_columns
            for right_col in block.value_columns
        ]
        # Append an integer level so that column labels are guaranteed unique.
        uniq_orig_columns = utils.combine_indices(
            orig_columns, pandas.Index(range(len(orig_columns)))
        )
        labels = utils.cross_indices(uniq_orig_columns, uniq_orig_columns)

        block, _ = block.aggregate(aggregations=aggregations, column_labels=labels)

        block = block.stack(levels=orig_columns.nlevels + 1)
        # The aggregate operation created an index level with just 0; drop it.
        # Also drop the last level of each axis, which was added only to
        # guarantee uniqueness.
        return DataFrame(block).droplevel(0).droplevel(-1, axis=0).droplevel(-1, axis=1)
    def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFrame:
        if method != "pearson":
            raise NotImplementedError(
                f"Only Pearson correlation is currently supported. {constants.FEEDBACK_LINK}"
            )
        if min_periods:
            raise NotImplementedError(
                f"min_periods not yet supported. {constants.FEEDBACK_LINK}"
            )

        if not numeric_only:
            frame = self._raise_on_non_numeric("corr")
        else:
            frame = self._drop_non_numeric()

        if len(frame.columns) <= 30:
            return frame._fast_stat_matrix(agg_ops.CorrOp())

        frame = frame.copy()
        orig_columns = frame.columns
        # Replace column names with 0 to n - 1 to keep order
        # and avoid the influence of duplicated column name.
        frame.columns = pandas.Index(range(len(orig_columns)))
        frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
        block = frame._block

        # A new column that uniquely identifies each row.
        block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx")

        val_col_ids = [
            col_id for col_id in block.value_columns if col_id != ordering_col
        ]

        block = block.melt(
            [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value"
        )
        block = block.merge(
            block,
            left_join_ids=[ordering_col],
            right_join_ids=[ordering_col],
            how="inner",
            sort=False,
        )

        frame = DataFrame(block).dropna(
            subset=["_bigframes_value_x", "_bigframes_value_y"]
        )

        paired_mean_frame = (
            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
            .agg(
                _bigframes_paired_mean_x=bigframes.pandas.NamedAgg(
                    column="_bigframes_value_x", aggfunc="mean"
                ),
                _bigframes_paired_mean_y=bigframes.pandas.NamedAgg(
                    column="_bigframes_value_y", aggfunc="mean"
                ),
            )
            .reset_index()
        )

        frame = frame.merge(
            paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"]
        )
        frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"]
        frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"]

        frame["_bigframes_dividend"] = (
            frame["_bigframes_value_x"] * frame["_bigframes_value_y"]
        )
        frame["_bigframes_x_square"] = (
            frame["_bigframes_value_x"] * frame["_bigframes_value_x"]
        )
        frame["_bigframes_y_square"] = (
            frame["_bigframes_value_y"] * frame["_bigframes_value_y"]
        )

        result = (
            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
            .agg(
                _bigframes_dividend_sum=bigframes.pandas.NamedAgg(
                    column="_bigframes_dividend", aggfunc="sum"
                ),
                _bigframes_x_square_sum=bigframes.pandas.NamedAgg(
                    column="_bigframes_x_square", aggfunc="sum"
                ),
                _bigframes_y_square_sum=bigframes.pandas.NamedAgg(
                    column="_bigframes_y_square", aggfunc="sum"
                ),
            )
            .reset_index()
        )
        result["_bigframes_corr"] = result["_bigframes_dividend_sum"] / (
            (
                result["_bigframes_x_square_sum"] * result["_bigframes_y_square_sum"]
            )._apply_unary_op(ops.sqrt_op)
        )
        result = result._pivot(
            index="_bigframes_variable_x",
            columns="_bigframes_variable_y",
            values="_bigframes_corr",
        )

        map_data = {
            f"_bigframes_level_{i}": orig_columns.get_level_values(i)
            for i in range(orig_columns.nlevels)
        }
        map_data["_bigframes_keys"] = range(len(orig_columns))
        map_df = bigframes.dataframe.DataFrame(
            map_data,
            session=self._get_block().expr.session,
        ).set_index("_bigframes_keys")
        result = result.join(map_df).sort_index()
        index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)]
        result = result.set_index(index_columns)
        result.index.names = orig_columns.names
        result.columns = orig_columns

        return result
    def cov(self, *, numeric_only: bool = False) -> DataFrame:
        if not numeric_only:
            frame = self._raise_on_non_numeric("cov")
        else:
            frame = self._drop_non_numeric()

        if len(frame.columns) <= 30:
            return frame._fast_stat_matrix(agg_ops.CovOp())

        frame = frame.copy()
        orig_columns = frame.columns
        # Replace column names with 0 to n - 1 to keep order
        # and avoid the influence of duplicated column name.
        frame.columns = pandas.Index(range(len(orig_columns)))
        frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
        block = frame._block

        # A new column that uniquely identifies each row.
        block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx")

        val_col_ids = [
            col_id for col_id in block.value_columns if col_id != ordering_col
        ]

        block = block.melt(
            [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value"
        )

        block = block.merge(
            block,
            left_join_ids=[ordering_col],
            right_join_ids=[ordering_col],
            how="inner",
            sort=False,
        )

        frame = DataFrame(block).dropna(
            subset=["_bigframes_value_x", "_bigframes_value_y"]
        )

        paired_mean_frame = (
            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
            .agg(
                _bigframes_paired_mean_x=bigframes.pandas.NamedAgg(
                    column="_bigframes_value_x", aggfunc="mean"
                ),
                _bigframes_paired_mean_y=bigframes.pandas.NamedAgg(
                    column="_bigframes_value_y", aggfunc="mean"
                ),
            )
            .reset_index()
        )

        frame = frame.merge(
            paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"]
        )
        frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"]
        frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"]

        frame["_bigframes_dividend"] = (
            frame["_bigframes_value_x"] * frame["_bigframes_value_y"]
        )

        result = (
            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
            .agg(
                _bigframes_dividend_sum=bigframes.pandas.NamedAgg(
                    column="_bigframes_dividend", aggfunc="sum"
                ),
                _bigframes_dividend_count=bigframes.pandas.NamedAgg(
                    column="_bigframes_dividend", aggfunc="count"
                ),
            )
            .reset_index()
        )
        result["_bigframes_cov"] = result["_bigframes_dividend_sum"] / (
            result["_bigframes_dividend_count"] - 1
        )
        result = result._pivot(
            index="_bigframes_variable_x",
            columns="_bigframes_variable_y",
            values="_bigframes_cov",
        )

        map_data = {
            f"_bigframes_level_{i}": orig_columns.get_level_values(i)
            for i in range(orig_columns.nlevels)
        }
        map_data["_bigframes_keys"] = range(len(orig_columns))
        map_df = bigframes.dataframe.DataFrame(
            map_data,
            session=self._get_block().expr.session,
        ).set_index("_bigframes_keys")
        result = result.join(map_df).sort_index()
        index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)]
        result = result.set_index(index_columns)
        result.index.names = orig_columns.names
        result.columns = orig_columns

        return result
    def corrwith(
        self,
        other: typing.Union[DataFrame, bigframes.series.Series],
        *,
        numeric_only: bool = False,
    ):
        other_frame = other if isinstance(other, DataFrame) else other.to_frame()
        if numeric_only:
            l_frame = self._drop_non_numeric()
            r_frame = other_frame._drop_non_numeric()
        else:
            l_frame = self._raise_on_non_numeric("corrwith")
            r_frame = other_frame._raise_on_non_numeric("corrwith")

        l_block = l_frame.astype(bigframes.dtypes.FLOAT_DTYPE)._block
        r_block = r_frame.astype(bigframes.dtypes.FLOAT_DTYPE)._block
        if isinstance(other, DataFrame):
            block, labels, expr_pairs = l_block._align_both_axes(r_block, how="inner")
        else:
            assert isinstance(other, bigframes.series.Series)
            block, labels, expr_pairs = l_block._align_axis_0(r_block, how="inner")

        na_cols = l_block.column_labels.join(
            r_block.column_labels, how="outer"
        ).difference(labels)

        block, _ = block.aggregate(
            aggregations=tuple(
                agg_expressions.BinaryAggregation(agg_ops.CorrOp(), left_ex, right_ex)
                for left_ex, right_ex in expr_pairs
            ),
            column_labels=labels,
        )
        block = block.project_exprs(
            (ex.const(float("nan")),) * len(na_cols), labels=na_cols
        )
        block = block.transpose(
            original_row_index=pandas.Index([None]), single_row_mode=True
        )

        return bigframes.pandas.Series(block)
    def __dataframe__(
        self, nan_as_null: bool = False, allow_copy: bool = True
    ) -> bigframes.core.interchange.InterchangeDataFrame:
        return bigframes.core.interchange.InterchangeDataFrame._from_bigframes(self)
    def to_arrow(
        self,
        *,
        ordered: bool = True,
        allow_large_results: Optional[bool] = None,
    ) -> pyarrow.Table:
        """Write DataFrame to an Arrow table / record batch.

        Args:
            ordered (bool, default True):
                Determines whether the resulting Arrow table will be ordered.
                In some cases, unordered may result in a faster-executing query.
            allow_large_results (bool, default None):
                If not None, overrides the global setting to allow or disallow
                large query results over the default size limit of 10 GB.

        Returns:
            pyarrow.Table: A pyarrow Table with all rows and columns of this
                DataFrame.
        """
        msg = bfe.format_message(
            "to_arrow is in preview. Types and unnamed or duplicate-name columns "
            "may change in the future."
        )
        warnings.warn(msg, category=bfe.PreviewWarning)

        pa_table, query_job = self._block.to_arrow(
            ordered=ordered, allow_large_results=allow_large_results
        )
        if query_job:
            self._set_internal_query_job(query_job)
        return pa_table
    @overload
    def to_pandas(  # type: ignore[overload-overlap]
        self,
        max_download_size: Optional[int] = ...,
        sampling_method: Optional[str] = ...,
        random_state: Optional[int] = ...,
        *,
        ordered: bool = ...,
        dry_run: Literal[False] = ...,
        allow_large_results: Optional[bool] = ...,
    ) -> pandas.DataFrame:
        ...

    @overload
    def to_pandas(
        self,
        max_download_size: Optional[int] = ...,
        sampling_method: Optional[str] = ...,
        random_state: Optional[int] = ...,
        *,
        ordered: bool = ...,
        dry_run: Literal[True] = ...,
        allow_large_results: Optional[bool] = ...,
    ) -> pandas.Series:
        ...
    def to_pandas(
        self,
        max_download_size: Optional[int] = None,
        sampling_method: Optional[str] = None,
        random_state: Optional[int] = None,
        *,
        ordered: bool = True,
        dry_run: bool = False,
        allow_large_results: Optional[bool] = None,
    ) -> pandas.DataFrame | pandas.Series:
        """Write DataFrame to pandas DataFrame.

        **Examples:**

            >>> df = bpd.DataFrame({'col': [4, 2, 2]})

        Download the data from BigQuery and convert it into an in-memory pandas
        DataFrame.

            >>> df.to_pandas()
               col
            0    4
            1    2
            2    2

        Estimate job statistics without processing or downloading data by using
        `dry_run=True`.

            >>> df.to_pandas(dry_run=True)  # doctest: +SKIP
            columnCount                                                       1
            columnDtypes                                         {'col': Int64}
            indexLevel                                                        1
            indexDtypes                                                 [Int64]
            projectId                                             bigframes-dev
            location                                                         US
            jobType                                                       QUERY
            destinationTable    {'projectId': 'bigframes-dev', 'datasetId': '_...
            useLegacySql                                                  False
            referencedTables                                               None
            totalBytesProcessed                                               0
            cacheHit                                                      False
            statementType                                                SELECT
            creationTime                       2025-04-02 20:17:12.038000+00:00
            dtype: object

        Args:
            max_download_size (int, default None):
                .. deprecated:: 2.0.0
                    ``max_download_size`` parameter is deprecated. Please use
                    ``to_pandas_batches()`` method instead.

                Download size threshold in MB. If ``max_download_size`` is
                exceeded when downloading data, the data will be downsampled
                if ``bigframes.options.sampling.enable_downsampling`` is
                ``True``, otherwise, an error will be raised. If set to a
                value other than ``None``, this will supersede the global
                config.
            sampling_method (str, default None):
                .. deprecated:: 2.0.0
                    ``sampling_method`` parameter is deprecated. Please use
                    ``sample()`` method instead.

                Downsampling algorithms to be chosen from, the choices are:
                "head": This algorithm returns a portion of the data from the
                beginning. It is fast and requires minimal computations to
                perform the downsampling; "uniform": This algorithm returns
                uniform random samples of the data. If set to a value other
                than None, this will supersede the global config.
            random_state (int, default None):
                .. deprecated:: 2.0.0
                    ``random_state`` parameter is deprecated. Please use
                    ``sample()`` method instead.

                The seed for the uniform downsampling algorithm. If provided,
                the uniform method may take longer to execute and require more
                computation. If set to a value other than None, this will
                supersede the global config.
            ordered (bool, default True):
                Determines whether the resulting pandas dataframe will be
                ordered. In some cases, unordered may result in a
                faster-executing query.
            dry_run (bool, default False):
                If this argument is true, this method will not process the
                data. Instead, it returns a pandas Series containing dry run
                statistics.
            allow_large_results (bool, default None):
                If not None, overrides the global setting to allow or disallow
                large query results over the default size limit of 10 GB.

        Returns:
            pandas.DataFrame: A pandas DataFrame with all rows and columns of
                this DataFrame if the data_sampling_threshold_mb is not
                exceeded; otherwise, a pandas DataFrame with downsampled rows
                and all columns of this DataFrame. If dry_run is set, a pandas
                Series containing dry run statistics will be returned.
        """
        if max_download_size is not None:
            msg = bfe.format_message(
                "DEPRECATED: The `max_download_size` parameter for `DataFrame.to_pandas()` "
                "is deprecated and will be removed soon. Please use `DataFrame.to_pandas_batches()`."
            )
            warnings.warn(msg, category=FutureWarning)
        if sampling_method is not None or random_state is not None:
            msg = bfe.format_message(
                "DEPRECATED: The `sampling_method` and `random_state` parameters for "
                "`DataFrame.to_pandas()` are deprecated and will be removed soon. "
                "Please use `DataFrame.sample().to_pandas()` instead for sampling."
            )
            warnings.warn(msg, category=FutureWarning, stacklevel=2)

        if dry_run:
            dry_run_stats, dry_run_job = self._block._compute_dry_run(
                max_download_size=max_download_size,
                sampling_method=sampling_method,
                random_state=random_state,
                ordered=ordered,
            )
            self._set_internal_query_job(dry_run_job)
            return dry_run_stats

        df, query_job = self._block.to_pandas(
            max_download_size=max_download_size,
            sampling_method=sampling_method,
            random_state=random_state,
            ordered=ordered,
            allow_large_results=allow_large_results,
        )
        if query_job:
            self._set_internal_query_job(query_job)
        return df.set_axis(self._block.column_labels, axis=1, copy=False)
    def to_pandas_batches(
        self,
        page_size: Optional[int] = None,
        max_results: Optional[int] = None,
        *,
        allow_large_results: Optional[bool] = None,
    ) -> Iterable[pandas.DataFrame]:
        """Stream DataFrame results to an iterable of pandas DataFrame.

        page_size and max_results determine the size and number of batches,
        see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result

        **Examples:**

            >>> df = bpd.DataFrame({'col': [4, 3, 2, 2, 3]})

        Iterate through the results in batches, limiting the total rows
        yielded across all batches via `max_results`:

            >>> for df_batch in df.to_pandas_batches(max_results=3):
            ...     print(df_batch)
               col
            0    4
            1    3
            2    2

        Alternatively, control the approximate size of each batch using
        `page_size` and fetch batches manually using `next()`:

            >>> it = df.to_pandas_batches(page_size=2)
            >>> next(it)
               col
            0    4
            1    3
            >>> next(it)
               col
            2    2
            3    2

        Args:
            page_size (int, default None):
                The maximum number of rows of each batch. Non-positive values
                are ignored.
            max_results (int, default None):
                The maximum total number of rows of all batches.
            allow_large_results (bool, default None):
                If not None, overrides the global setting to allow or disallow
                large query results over the default size limit of 10 GB.

        Returns:
            Iterable[pandas.DataFrame]:
                An iterable of smaller dataframes which combine to form the
                original dataframe. Results stream from bigquery, see
                https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable
        """
        return self._to_pandas_batches(
            page_size=page_size,
            max_results=max_results,
            allow_large_results=allow_large_results,
        )
def _to_pandas_batches( self, page_size: Optional[int] = None, max_results: Optional[int] = None, *, allow_large_results: Optional[bool] = None, ) -> blocks.PandasBatches: return self._block.to_pandas_batches( page_size=page_size, max_results=max_results, allow_large_results=allow_large_results, ) def _compute_dry_run(self) -> bigquery.QueryJob: _, query_job = self._block._compute_dry_run() return query_job
[docs] def copy(self) -> DataFrame: return DataFrame(self._block)
[docs] @validations.requires_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW) def head(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[:n])
[docs] @validations.requires_ordering() def tail(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[-n:])
[docs] def peek(
    self, n: int = 5, *, force: bool = True, allow_large_results=None
) -> pandas.DataFrame:
    """
    Preview n arbitrary rows from the dataframe. No guarantees about row selection or ordering.
    ``DataFrame.peek(force=False)`` will always be very fast, but will not succeed if data requires
    full data scanning. Using ``force=True`` will always succeed, but may run queries.
    Query results will be cached so that future steps will benefit from these queries.

    Args:
        n (int, default 5):
            The number of rows to select from the dataframe. Which N rows are returned is non-deterministic.
        force (bool, default True):
            If the data cannot be peeked efficiently, the dataframe will instead be fully materialized as part
            of the operation if ``force=True``. If ``force=False``, the operation will throw a ValueError.
        allow_large_results (bool, default None):
            If not None, overrides the global setting to allow or disallow large query results
            over the default size limit of 10 GB.
    Returns:
        pandas.DataFrame: A pandas DataFrame with n rows.

    Raises:
        ValueError: If force=False and data cannot be efficiently peeked.
    """
    maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results)
    if maybe_result is None:
        if force:
            self._cached()
            maybe_result = self._block.try_peek(
                n, force=True, allow_large_results=allow_large_results
            )
            assert maybe_result is not None
        else:
            raise ValueError(
                "Cannot peek efficiently when data has aggregates, joins or window functions applied. Use force=True to fully compute dataframe."
            )
    return maybe_result.set_axis(self._block.column_labels, axis=1, copy=False)
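# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): `peek` returns a small, non-deterministic sample
# as a local pandas DataFrame, and `force=False` fails fast instead of
# materializing expensive intermediate results.
#
#   df = bpd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
#   preview = df.peek(2)              # pandas DataFrame with 2 arbitrary rows
#   cheap = df.peek(2, force=False)   # raises ValueError if not efficient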
[docs] def nlargest(
    self,
    n: int,
    columns: typing.Union[blocks.Label, typing.Sequence[blocks.Label]],
    keep: str = "first",
) -> DataFrame:
    if keep not in ("first", "last", "all"):
        raise ValueError("'keep' must be one of 'first', 'last', or 'all'")
    if keep != "all":
        validations.enforce_ordered(self, "nlargest")
    column_ids = self._sql_names(columns)
    return DataFrame(block_ops.nlargest(self._block, n, column_ids, keep=keep))
[docs] def nsmallest(
    self,
    n: int,
    columns: typing.Union[blocks.Label, typing.Sequence[blocks.Label]],
    keep: str = "first",
) -> DataFrame:
    if keep not in ("first", "last", "all"):
        raise ValueError("'keep' must be one of 'first', 'last', or 'all'")
    if keep != "all":
        validations.enforce_ordered(self, "nsmallest")
    column_ids = self._sql_names(columns)
    return DataFrame(block_ops.nsmallest(self._block, n, column_ids, keep=keep))
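# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): `nlargest`/`nsmallest` select the top or bottom n
# rows by one or more columns; `keep="all"` retains ties for the last slot.
#
#   df = bpd.DataFrame({"score": [3, 9, 9, 1], "id": [10, 20, 30, 40]})
#   df.nlargest(2, "score")              # top 2 rows by score
#   df.nlargest(1, "score", keep="all")  # keeps both tied rows
#   df.nsmallest(1, ["score", "id"])     # ties broken by the next column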
[docs] def insert( self, loc: int, column: blocks.Label, value: SingleItemValue, allow_duplicates: bool = False, ): column_count = len(self.columns) if loc > column_count: raise IndexError( f"Column index {loc} is out of bounds with {column_count} total columns." ) if (column in self.columns) and not allow_duplicates: raise ValueError(f"cannot insert {column}, already exists") temp_column = bigframes.core.guid.generate_guid(prefix=str(column)) df = self._assign_single_item(temp_column, value) block = df._get_block() value_columns = typing.cast(List, block.value_columns) value_columns, new_column = value_columns[:-1], value_columns[-1] value_columns.insert(loc, new_column) block = block.select_columns(value_columns) block = block.rename(columns={temp_column: column}) self._set_block(block)
@overload def drop( self, labels: typing.Any = None, *, axis: typing.Union[int, str] = 0, index: typing.Any = None, columns: Union[blocks.Label, Sequence[blocks.Label]] = None, level: typing.Optional[LevelType] = None, inplace: Literal[False] = False, ) -> DataFrame: ... @overload def drop( self, labels: typing.Any = None, *, axis: typing.Union[int, str] = 0, index: typing.Any = None, columns: Union[blocks.Label, Sequence[blocks.Label]] = None, level: typing.Optional[LevelType] = None, inplace: Literal[True], ) -> None: ...
[docs] def drop(
    self,
    labels: typing.Any = None,
    *,
    axis: typing.Union[int, str] = 0,
    index: typing.Any = None,
    columns: Union[blocks.Label, Sequence[blocks.Label]] = None,
    level: typing.Optional[LevelType] = None,
    inplace: bool = False,
) -> Optional[DataFrame]:
    if labels:
        if index or columns:
            raise ValueError("Cannot specify both 'labels' and 'index'/'columns'")
        axis_n = utils.get_axis_number(axis)
        if axis_n == 0:
            index = labels
        else:
            columns = labels

    block = self._block
    if index is not None:
        self._throw_if_null_index("drop(axis=0)")
        level_id = self._resolve_levels(level or 0)[0]

        if utils.is_list_like(index):
            # Only tuple is treated as multi-index value combinations
            if isinstance(index, tuple):
                if level is not None:
                    raise ValueError("Multi-index tuple can't specify level.")
                condition_id = None
                for i, idx in enumerate(index):
                    level_id = self._resolve_levels(i)[0]
                    block, condition_id_cur = block.project_expr(
                        ops.ne_op.as_expr(level_id, ex.const(idx))
                    )
                    if condition_id:
                        block, condition_id = block.apply_binary_op(
                            condition_id, condition_id_cur, ops.or_op
                        )
                    else:
                        condition_id = condition_id_cur
                condition_id = typing.cast(str, condition_id)
            else:
                block, inverse_condition_id = block.apply_unary_op(
                    level_id, ops.IsInOp(values=tuple(index), match_nulls=True)
                )
                block, condition_id = block.apply_unary_op(
                    inverse_condition_id, ops.invert_op
                )
        elif isinstance(index, indexes.Index):
            dropped_block = self._drop_by_index(index)._get_block()
            if inplace:
                self._set_block(dropped_block)
                return None
            return DataFrame(dropped_block)
        else:
            block, condition_id = block.project_expr(
                ops.ne_op.as_expr(level_id, ex.const(index))
            )
        block = block.filter_by_id(condition_id, keep_null=True).select_columns(
            self._block.value_columns
        )
    if columns:
        block = block.drop_columns(self._sql_names(columns))
    if index is None and not columns:
        raise ValueError("Must specify 'labels' or 'index'/'columns'")
    if inplace:
        self._set_block(block)
        return None
    else:
        return DataFrame(block)
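# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): the three spellings of `drop` accepted above.
#
#   df = bpd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
#   df.drop(columns=["b"])       # drop by column label
#   df.drop(index=[0, 2])        # drop by index value
#   df.drop(labels="a", axis=1)  # labels + axis, same as columns=["a"]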
def _drop_by_index(self, index: indexes.Index) -> DataFrame: block = index._block block, ordering_col = block.promote_offsets() joined_index, (get_column_left, get_column_right) = self._block.join(block) new_ordering_col = get_column_right[ordering_col] drop_block = joined_index drop_block, drop_col = drop_block.apply_unary_op( new_ordering_col, ops.isnull_op, ) drop_block = drop_block.filter_by_id(drop_col) original_columns = [ get_column_left[column] for column in self._block.value_columns ] drop_block = drop_block.select_columns(original_columns) return DataFrame(drop_block)
[docs] def droplevel(self, level: LevelsType, axis: int | str = 0): axis_n = utils.get_axis_number(axis) if axis_n == 0: resolved_level_ids = self._resolve_levels(level) return DataFrame(self._block.drop_levels(resolved_level_ids)) else: if isinstance(self.columns, pandas.MultiIndex): new_df = self.copy() new_df.columns = self.columns.droplevel(level) return new_df else: raise ValueError("Columns must be a multiindex to drop levels.")
[docs] def swaplevel(self, i: int = -2, j: int = -1, axis: int | str = 0): axis_n = utils.get_axis_number(axis) if axis_n == 0: level_i = self._block.index_columns[i] level_j = self._block.index_columns[j] mapping = {level_i: level_j, level_j: level_i} reordering = [ mapping.get(index_id, index_id) for index_id in self._block.index_columns ] return DataFrame(self._block.reorder_levels(reordering)) else: if isinstance(self.columns, pandas.MultiIndex): new_df = self.copy() new_df.columns = self.columns.swaplevel(i, j) return new_df else: raise ValueError("Columns must be a multiindex to reorder levels.")
[docs] def reorder_levels(self, order: LevelsType, axis: int | str = 0): axis_n = utils.get_axis_number(axis) if axis_n == 0: resolved_level_ids = self._resolve_levels(order) return DataFrame(self._block.reorder_levels(resolved_level_ids)) else: if isinstance(self.columns, pandas.MultiIndex): new_df = self.copy() new_df.columns = self.columns.reorder_levels(order) return new_df else: raise ValueError("Columns must be a multiindex to reorder levels.")
def _resolve_levels(self, level: LevelsType) -> typing.Sequence[str]: return self._block.index.resolve_level(level) @overload def rename(self, *, columns: Mapping[blocks.Label, blocks.Label]) -> DataFrame: ... @overload def rename( self, *, columns: Mapping[blocks.Label, blocks.Label], inplace: Literal[False] ) -> DataFrame: ... @overload def rename( self, *, columns: Mapping[blocks.Label, blocks.Label], inplace: Literal[True] ) -> None: ...
[docs] def rename( self, *, columns: Mapping[blocks.Label, blocks.Label], inplace: bool = False ) -> Optional[DataFrame]: block = self._block.rename(columns=columns) if inplace: self._block = block return None else: return DataFrame(block)
@overload def rename_axis( self, mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]], ) -> DataFrame: ... @overload def rename_axis( self, mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]], *, inplace: Literal[False], **kwargs, ) -> DataFrame: ... @overload def rename_axis( self, mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]], *, inplace: Literal[True], **kwargs, ) -> None: ...
[docs] def rename_axis( self, mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]], *, inplace: bool = False, **kwargs, ) -> Optional[DataFrame]: if len(kwargs) != 0: raise NotImplementedError( f"rename_axis does not currently support any keyword arguments. {constants.FEEDBACK_LINK}" ) # limited implementation: the new index name is simply the 'mapper' parameter if utils.is_list_like(mapper): labels = mapper else: labels = [mapper] block = self._block.with_index_labels(labels) if inplace: self._block = block return None else: return DataFrame(block)
[docs] @validations.requires_ordering() def equals(self, other: typing.Union[bigframes.series.Series, DataFrame]) -> bool: # Must be same object type, same column dtypes, and same label values if not isinstance(other, DataFrame): return False return block_ops.equals(self._block, other._block)
[docs] def assign(self, **kwargs) -> DataFrame: # TODO(garrettwu) Support list-like values. Requires ordering. # TODO(garrettwu) Support callable values. cur = self for k, v in kwargs.items(): cur = cur._assign_single_item(k, v) return cur
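# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): `assign` accepts scalars, Series, list-likes, and
# callables, applied left to right, so a later callable can refer to a column
# created by an earlier keyword.
#
#   df = bpd.DataFrame({"a": [1, 2, 3]})
#   df = df.assign(b=10, c=df["a"] * 2, d=lambda x: x["a"] + x["c"])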
def _assign_single_item( self, k: str, v: SingleItemValue | MultiItemValue, ) -> DataFrame: if isinstance(v, bigframes.series.Series): return self._assign_series_join_on_index(k, v) elif isinstance(v, bigframes.dataframe.DataFrame): v_df_col_count = len(v._block.value_columns) if v_df_col_count != 1: raise ValueError( f"Cannot set a DataFrame with {v_df_col_count} columns to the single column {k}" ) return self._assign_series_join_on_index(k, v[v.columns[0]]) elif callable(v): copy = self.copy() copy[k] = v(copy) return copy elif utils.is_list_like(v): return self._assign_single_item_listlike(k, v) else: return self._assign_scalar(k, v) # type: ignore def _assign_multi_items( self, k: list[str] | pandas.Index, v: SingleItemValue | MultiItemValue, ) -> DataFrame: value_sources: Sequence[Any] = [] if isinstance(v, DataFrame): value_sources = [v[col] for col in v.columns] elif isinstance(v, bigframes.series.Series): # For behavior consistency with Pandas. raise ValueError("Columns must be same length as key") elif isinstance(v, Sequence): value_sources = v else: # We assign the same scalar value to all target columns. value_sources = [v] * len(k) if len(value_sources) != len(k): raise ValueError("Columns must be same length as key") # Repeatedly assign columns in order. result = self._assign_single_item(k[0], value_sources[0]) for target, source in zip(k[1:], value_sources[1:]): result = result._assign_single_item(target, source) return result def _assign_single_item_listlike(self, k: str, v: Sequence) -> DataFrame: given_rows = len(v) actual_rows = len(self) assigning_to_empty_df = len(self.columns) == 0 and actual_rows == 0 if not assigning_to_empty_df and given_rows != actual_rows: raise ValueError( f"Length of values ({given_rows}) does not match length of index ({actual_rows})" ) local_df = DataFrame({k: v}, session=self._get_block().expr.session) # local_df is likely (but not guaranteed) to be cached locally # since the original list came from memory and so is probably < MAX_INLINE_DF_SIZE new_column_block = local_df._block original_index_column_ids = self._block.index_columns self_block = self._block.reset_index(drop=False) if assigning_to_empty_df: if len(self._block.index_columns) > 1: # match error raised by pandas here raise ValueError( "Assigning listlike to a first column under multiindex is not supported." 
) result_block = new_column_block.with_index_labels(self._block.index.names) result_block = result_block.with_column_labels([k]) else: result_block, ( get_column_left, get_column_right, ) = self_block.join(new_column_block, how="left", block_identity_join=True) result_block = result_block.set_index( [get_column_left[col_id] for col_id in original_index_column_ids], index_labels=self._block.index.names, ) src_col = get_column_right[new_column_block.value_columns[0]] # Check to see if key exists, and modify in place col_ids = self._block.cols_matching_label(k) for col_id in col_ids: result_block = result_block.copy_values( src_col, get_column_left[col_id] ) if len(col_ids) > 0: result_block = result_block.drop_columns([src_col]) return DataFrame(result_block) def _assign_scalar(self, label: str, value: Union[int, float, str]) -> DataFrame: col_ids = self._block.cols_matching_label(label) block, constant_col_id = self._block.create_constant(value, label) for col_id in col_ids: block = block.copy_values(constant_col_id, col_id) if len(col_ids) > 0: block = block.drop_columns([constant_col_id]) return DataFrame(block) def _assign_series_join_on_index( self, label: str, series: bigframes.series.Series ) -> DataFrame: block, (get_column_left, get_column_right) = self._block.join( series._block, how="left" ) column_ids = [ get_column_left[col_id] for col_id in self._block.cols_matching_label(label) ] source_column = get_column_right[series._value_column] # Replace each column matching the label for column_id in column_ids: block = block.copy_values(source_column, column_id).assign_label( column_id, label ) if not column_ids: # Append case, so new column needs appropriate label block = block.assign_label(source_column, label) else: # Update case, remove after copying into columns block = block.drop_columns([source_column]) return DataFrame(block.with_index_labels(self._block.index.names)) @overload # type: ignore[override] def reset_index( self, level: blocks.LevelsType = ..., drop: bool = ..., inplace: Literal[False] = ..., col_level: Union[int, str] = ..., col_fill: Hashable = ..., allow_duplicates: Optional[bool] = ..., names: Union[None, Hashable, Sequence[Hashable]] = ..., ) -> DataFrame: ... @overload def reset_index( self, level: blocks.LevelsType = ..., drop: bool = ..., inplace: Literal[True] = ..., col_level: Union[int, str] = ..., col_fill: Hashable = ..., allow_duplicates: Optional[bool] = ..., names: Union[None, Hashable, Sequence[Hashable]] = ..., ) -> None: ... @overload def reset_index( self, level: blocks.LevelsType = None, drop: bool = False, inplace: bool = ..., col_level: Union[int, str] = ..., col_fill: Hashable = ..., allow_duplicates: Optional[bool] = ..., names: Union[None, Hashable, Sequence[Hashable]] = ..., ) -> Optional[DataFrame]: ...
[docs] def reset_index( self, level: blocks.LevelsType = None, drop: bool = False, inplace: bool = False, col_level: Union[int, str] = 0, col_fill: Hashable = "", allow_duplicates: Optional[bool] = None, names: Union[None, Hashable, Sequence[Hashable]] = None, ) -> Optional[DataFrame]: block = self._block if names: if isinstance(names, blocks.Label) and not isinstance(names, tuple): names = [names] else: names = list(names) if len(names) != self.index.nlevels: raise ValueError("'names' must be same length as levels") block = block.with_index_labels(names) if allow_duplicates is None: allow_duplicates = False block = block.reset_index( level, drop, col_level=col_level, col_fill=col_fill, allow_duplicates=allow_duplicates, ) if inplace: self._set_block(block) return None else: return DataFrame(block)
[docs] def set_index( self, keys: typing.Union[blocks.Label, typing.Sequence[blocks.Label]], append: bool = False, drop: bool = True, ) -> DataFrame: if not utils.is_list_like(keys): keys = typing.cast(typing.Sequence[blocks.Label], (keys,)) else: keys = typing.cast(typing.Sequence[blocks.Label], tuple(keys)) col_ids = [self._resolve_label_exact(key) for key in keys] missing = [keys[i] for i in range(len(col_ids)) if col_ids[i] is None] if len(missing) > 0: raise KeyError(f"None of {missing} are in the columns") # convert col_ids to non-optional strs since we just determined they are not None col_ids_strs: List[str] = [col_id for col_id in col_ids if col_id is not None] return DataFrame(self._block.set_index(col_ids_strs, append=append, drop=drop))
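# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): a `set_index` / `reset_index` round trip.
#
#   df = bpd.DataFrame({"k": ["x", "y"], "v": [1, 2]})
#   indexed = df.set_index("k")                     # 'k' becomes the index
#   appended = indexed.set_index("v", append=True)  # MultiIndex (k, v)
#   flat = appended.reset_index()                   # back to value columns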
@overload # type: ignore[override] def sort_index( self, *, ascending: bool = ..., inplace: Literal[False] = ..., na_position: Literal["first", "last"] = ..., ) -> DataFrame: ... @overload def sort_index( self, *, ascending: bool = ..., inplace: Literal[True] = ..., na_position: Literal["first", "last"] = ..., ) -> None: ...
[docs] def sort_index( self, *, axis: Union[int, str] = 0, ascending: bool = True, inplace: bool = False, na_position: Literal["first", "last"] = "last", ) -> Optional[DataFrame]: if utils.get_axis_number(axis) == 0: if na_position not in ["first", "last"]: raise ValueError("Param na_position must be one of 'first' or 'last'") na_last = na_position == "last" index_columns = self._block.index_columns ordering = [ order.ascending_over(column, na_last) if ascending else order.descending_over(column, na_last) for column in index_columns ] block = self._block.order_by(ordering) else: # axis=1 _, indexer = self.columns.sort_values( return_indexer=True, ascending=ascending, na_position=na_position # type: ignore ) block = self._block.select_columns( [self._block.value_columns[i] for i in indexer] ) if inplace: self._set_block(block) return None else: return DataFrame(block)
@overload # type: ignore[override] def sort_values( self, by: str | typing.Sequence[str], *, inplace: Literal[False] = ..., ascending: bool | typing.Sequence[bool] = ..., kind: str = ..., na_position: typing.Literal["first", "last"] = ..., ) -> DataFrame: ... @overload def sort_values( self, by: str | typing.Sequence[str], *, inplace: Literal[True] = ..., ascending: bool | typing.Sequence[bool] = ..., kind: str = ..., na_position: typing.Literal["first", "last"] = ..., ) -> None: ...
[docs] def sort_values( self, by: str | typing.Sequence[str], *, inplace: bool = False, ascending: bool | typing.Sequence[bool] = True, kind: str = "quicksort", na_position: typing.Literal["first", "last"] = "last", ) -> Optional[DataFrame]: if isinstance(by, (bigframes.series.Series, indexes.Index, DataFrame)): raise KeyError( f"Invalid key type: {type(by).__name__}. Please provide valid column name(s)." ) if na_position not in {"first", "last"}: raise ValueError("Param na_position must be one of 'first' or 'last'") sort_labels = list(by) if utils.is_list_like(by) else [by] sort_column_ids = self._sql_names(sort_labels) len_by = len(sort_labels) if not isinstance(ascending, bool): if len(ascending) != len_by: raise ValueError("Length of 'ascending' must equal length of 'by'") sort_directions = ascending else: sort_directions = (ascending,) * len_by ordering = [] for i in range(len(sort_labels)): column_id = sort_column_ids[i] is_ascending = sort_directions[i] na_last = na_position == "last" ordering.append( order.ascending_over(column_id, na_last) if is_ascending else order.descending_over(column_id, na_last) ) block = self._block.order_by(ordering) if inplace: self._set_block(block) return None else: return DataFrame(block)
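# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): multi-key sorts with per-key directions, and NULL
# placement via `na_position`.
#
#   df = bpd.DataFrame({"a": [2, 1, 2], "b": [9, 8, 7]})
#   df.sort_values(["a", "b"], ascending=[True, False])
#   df.sort_values("a", na_position="first")  # NULLs sorted first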
[docs] def eval(self, expr: str) -> DataFrame: import bigframes.core.eval as bf_eval return bf_eval.eval(self, expr, target=self)
[docs] def query(self, expr: str) -> DataFrame: import bigframes.core.eval as bf_eval eval_result = bf_eval.eval(self, expr, target=None) return self[eval_result]
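# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): `eval` computes expressions over column names,
# while `query` filters rows by a boolean expression.
#
#   df = bpd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
#   df.eval("c = a + b")         # adds a derived column 'c'
#   df.query("a > 1 and b < 6")  # keeps matching rows only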
[docs] def value_counts(
    self,
    subset: typing.Union[blocks.Label, typing.Sequence[blocks.Label]] = None,
    normalize: bool = False,
    sort: bool = True,
    ascending: bool = False,
    dropna: bool = True,
):
    # 'sort'=False allows arbitrary sorting, so we will sort anyway and ignore the param
    columns = self._sql_names(subset) if subset else self._block.value_columns
    block = block_ops.value_counts(
        self._block,
        columns,
        normalize=normalize,
        sort=sort,
        ascending=ascending,
        drop_na=dropna,
    )
    return bigframes.series.Series(block)
[docs] def add_prefix(self, prefix: str, axis: int | str | None = None) -> DataFrame: axis = 1 if axis is None else axis return DataFrame(self._get_block().add_prefix(prefix, axis))
[docs] def add_suffix(self, suffix: str, axis: int | str | None = None) -> DataFrame: axis = 1 if axis is None else axis return DataFrame(self._get_block().add_suffix(suffix, axis))
[docs] def take( self, indices: typing.Sequence[int], axis: int | str | None = 0, **kwargs ) -> DataFrame: if not utils.is_list_like(indices): raise ValueError("indices should be a list-like object.") if axis == 0 or axis == "index": return self.iloc[indices] elif axis == 1 or axis == "columns": return self.iloc[:, indices] else: raise ValueError(f"No axis named {axis} for object type DataFrame")
[docs] def filter( self, items: typing.Optional[typing.Iterable] = None, like: typing.Optional[str] = None, regex: typing.Optional[str] = None, axis: int | str | None = None, ) -> DataFrame: if sum([(items is not None), (like is not None), (regex is not None)]) != 1: raise ValueError( "Need to provide exactly one of 'items', 'like', or 'regex'" ) axis_n = utils.get_axis_number(axis) if (axis is not None) else 1 if axis_n == 0: # row labels return self._filter_rows(items, like, regex) else: # column labels return self._filter_columns(items, like, regex)
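# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): exactly one of `items`, `like`, or `regex` may be
# given; the default axis selects column labels.
#
#   df = bpd.DataFrame({"one": [1], "two": [2], "three": [3]})
#   df.filter(items=["one", "three"])  # select by exact label
#   df.filter(like="t")                # labels containing "t"
#   df.filter(regex="^t")              # labels matching the regex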
def _filter_rows( self, items: typing.Optional[typing.Iterable] = None, like: typing.Optional[str] = None, regex: typing.Optional[str] = None, ) -> DataFrame: if len(self._block.index_columns) > 1: raise NotImplementedError( f"Method filter does not support rows multiindex. {constants.FEEDBACK_LINK}" ) if (like is not None) or (regex is not None): block = self._block block, label_string_id = block.apply_unary_op( self._block.index_columns[0], ops.AsTypeOp(to_type=pandas.StringDtype(storage="pyarrow")), ) if like is not None: block, mask_id = block.apply_unary_op( label_string_id, ops.StrContainsOp(pat=like) ) else: # regex assert regex is not None block, mask_id = block.apply_unary_op( label_string_id, ops.StrContainsRegexOp(pat=regex) ) block = block.filter_by_id(mask_id) block = block.select_columns(self._block.value_columns) return DataFrame(block) elif items is not None: # Behavior matches pandas 2.1+, older pandas versions would reindex block = self._block block, mask_id = block.apply_unary_op( self._block.index_columns[0], ops.IsInOp(values=tuple(items)) ) block = block.filter_by_id(mask_id) block = block.select_columns(self._block.value_columns) return DataFrame(block) else: raise ValueError("Need to provide 'items', 'like', or 'regex'") def _filter_columns( self, items: typing.Optional[typing.Iterable] = None, like: typing.Optional[str] = None, regex: typing.Optional[str] = None, ) -> DataFrame: if (like is not None) or (regex is not None): def label_filter(label): label_str = label if isinstance(label, str) else str(label) if like: return like in label_str else: # regex # TODO(b/340891296): fix type error return re.match(regex, label_str) is not None # type: ignore cols = [ col_id for col_id, label in zip(self._block.value_columns, self.columns) if label_filter(label) ] return DataFrame(self._block.select_columns(cols)) if items is not None: # Behavior matches pandas 2.1+, older pandas versions would reorder using order of items new_columns = self.columns.intersection(pandas.Index(items)) return self.reindex(columns=new_columns) else: raise ValueError("Need to provide 'items', 'like', or 'regex'")
[docs] def reindex(
    self,
    labels=None,
    *,
    index=None,
    columns=None,
    axis: typing.Optional[typing.Union[str, int]] = None,
    validate: typing.Optional[bool] = None,
):
    if labels:
        if index or columns:
            raise ValueError("Cannot specify both 'labels' and 'index'/'columns'")
        axis_n = utils.get_axis_number(axis) if (axis is not None) else 0
        if axis_n == 0:
            index = labels
        else:
            columns = labels

    if (index is not None) and (columns is not None):
        return self._reindex_columns(columns)._reindex_rows(
            index, validate=validate or False
        )
    if index is not None:
        return self._reindex_rows(index, validate=validate or False)
    if columns is not None:
        return self._reindex_columns(columns)
@validations.requires_index
def _reindex_rows(
    self,
    index,
    *,
    validate: typing.Optional[bool] = None,
):
    if validate and not self.index.is_unique:
        raise ValueError("Original index must be unique to reindex")
    keep_original_names = False
    if isinstance(index, indexes.Index):
        new_indexer = DataFrame(data=index._block)[[]]
    else:
        if not isinstance(index, pandas.Index):
            keep_original_names = True
            index = pandas.Index(index)
        if index.nlevels != self.index.nlevels:
            raise NotImplementedError(
                "Cannot reindex with index with different nlevels"
            )
        new_indexer = DataFrame(index=index, session=self._session)[[]]
    # multiindex join is sensitive to index names, so we will set all these
    result = new_indexer.rename_axis(range(new_indexer.index.nlevels)).join(
        self.rename_axis(range(self.index.nlevels)),
        how="left",
    )
    # and then reset the names after the join
    return result.rename_axis(
        self.index.names if keep_original_names else index.names
    )

def _reindex_columns(self, columns):
    block = self._block
    new_column_index, indexer = self.columns.reindex(columns)
    if indexer is None:
        # The new index is the same as the old one. Do nothing.
        return self
    result_cols = []
    for label, index in zip(columns, indexer):
        if index >= 0:
            result_cols.append(self._block.value_columns[index])
        else:
            block, null_col = block.create_constant(
                pandas.NA, label, dtype=pandas.Float64Dtype()
            )
            result_cols.append(null_col)
    result_df = DataFrame(block.select_columns(result_cols))
    result_df.columns = new_column_index
    return result_df
[docs] @validations.requires_index def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = None): return self.reindex(index=other.index, columns=other.columns, validate=validate)
[docs] @validations.requires_ordering() @validations.requires_index def interpolate(self, method: str = "linear") -> DataFrame: if method == "pad": return self.ffill() result = block_ops.interpolate(self._block, method) return DataFrame(result)
[docs] def fillna(self, value=None) -> DataFrame: return self._apply_binop(value, ops.fillna_op, how="left")
[docs] def replace( self, to_replace: typing.Any, value: typing.Any = None, *, regex: bool = False ): if utils.is_dict_like(value): return self.apply( lambda x: x.replace( to_replace=to_replace, value=value[x.name], regex=regex ) if (x.name in value) else x ) return self.apply( lambda x: x.replace(to_replace=to_replace, value=value, regex=regex) )
[docs] @validations.requires_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = windows.rows(start=None if limit is None else -limit, end=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window)
[docs] @validations.requires_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = windows.rows(start=0, end=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window)
[docs] def isin(self, values) -> DataFrame: if utils.is_dict_like(values): block = self._block result_ids = [] for col, label in zip(self._block.value_columns, self._block.column_labels): if label in values.keys(): value_for_key = values[label] block, result_id = block.apply_unary_op( col, ops.IsInOp(values=tuple(value_for_key), match_nulls=True), label, ) result_ids.append(result_id) else: block, result_id = block.create_constant( False, label=label, dtype=pandas.BooleanDtype() ) result_ids.append(result_id) return DataFrame(block.select_columns(result_ids)) elif utils.is_list_like(values): return self._apply_unary_op( ops.IsInOp(values=tuple(values), match_nulls=True) ) else: raise TypeError( "only list-like objects are allowed to be passed to " f"isin(), you passed a [{type(values).__name__}]" )
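# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): a list-like applies to every column; a dict-like
# checks only the listed columns and yields False everywhere else.
#
#   df = bpd.DataFrame({"a": [1, 2], "b": [3, 4]})
#   df.isin([1, 3])      # elementwise membership for all columns
#   df.isin({"a": [1]})  # column 'b' becomes all False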
[docs] def keys(self) -> pandas.Index: return self.columns
[docs] def items(self): column_ids = self._block.value_columns column_labels = self._block.column_labels for col_id, col_label in zip(column_ids, column_labels): yield col_label, bigframes.series.Series(self._block.select_column(col_id))
[docs] def iterrows(self) -> Iterable[tuple[typing.Any, pandas.Series]]: for df in self.to_pandas_batches(): for item in df.iterrows(): yield item
[docs] def itertuples( self, index: bool = True, name: typing.Optional[str] = "Pandas" ) -> Iterable[tuple[typing.Any, ...]]: for df in self.to_pandas_batches(): for item in df.itertuples(index=index, name=name): yield item
def _apply_callable(self, condition):
    """Evaluates ``condition`` against this DataFrame if it is callable;
    otherwise returns it unchanged."""
    if callable(condition):
        # When it's a bigframes function.
        if hasattr(condition, "bigframes_bigquery_function"):
            return self.apply(condition, axis=1)
        # When it's a plain Python function.
        return condition(self)
    # When it's not a callable.
    return condition
[docs] def where(self, cond, other=None):
    if self.columns.nlevels > 1:
        raise NotImplementedError(
            "The dataframe.where() method does not support multi-index columns."
        )
    # Execute it with the DataFrame when cond and/or other is callable.
    # It can be either a plain Python function or remote/managed function.
    cond = self._apply_callable(cond)
    other = self._apply_callable(other)

    if isinstance(other, bigframes.series.Series):
        raise ValueError("Series is not a supported replacement type!")

    aligned_block, (_, _) = self._block.join(cond._block, how="left")
    # No left join is needed when 'other' is None or constant.
    if isinstance(other, bigframes.dataframe.DataFrame):
        aligned_block, (_, _) = aligned_block.join(other._block, how="left")

    self_len = len(self._block.value_columns)
    cond_len = len(cond._block.value_columns)
    ids = aligned_block.value_columns[:self_len]
    labels = aligned_block.column_labels[:self_len]
    self_col = {x: ex.deref(y) for x, y in zip(labels, ids)}

    if isinstance(cond, bigframes.series.Series):
        # This is when 'cond' is a valid series.
        y = aligned_block.value_columns[self_len]
        cond_col = {x: ex.deref(y) for x in self_col.keys()}
    else:
        # This is when 'cond' is a dataframe.
        ids = aligned_block.value_columns[self_len : self_len + cond_len]
        labels = aligned_block.column_labels[self_len : self_len + cond_len]
        cond_col = {x: ex.deref(y) for x, y in zip(labels, ids)}

    if isinstance(other, DataFrame):
        other_len = len(self._block.value_columns)
        ids = aligned_block.value_columns[-other_len:]
        labels = aligned_block.column_labels[-other_len:]
        other_col = {x: ex.deref(y) for x, y in zip(labels, ids)}
    else:
        # This is when 'other' is None or constant.
        labels = aligned_block.column_labels[:self_len]
        other_col = {x: ex.const(other) for x in labels}  # type: ignore

    result_series = {}
    for x, self_id in self_col.items():
        cond_id = cond_col[x] if x in cond_col else ex.const(False)
        other_id = other_col[x] if x in other_col else ex.const(None)
        result_block, result_id = aligned_block.project_expr(
            ops.where_op.as_expr(self_id, cond_id, other_id)
        )
        series = bigframes.series.Series(
            result_block.select_column(result_id).with_column_labels([x])
        )
        result_series[x] = series

    result = DataFrame(result_series)
    result.columns.name = self.columns.name
    result.columns.names = self.columns.names
    return result
[docs] def mask(self, cond, other=None): return self.where(~self._apply_callable(cond), other=other)
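# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): `where` keeps values where the condition is True,
# and `mask` is its complement; both accept callable conditions.
#
#   df = bpd.DataFrame({"a": [1, -2, 3]})
#   df.where(df > 0, 0)          # negatives replaced by 0
#   df.mask(lambda x: x < 0, 0)  # same result via mask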
[docs] def dropna( self, *, axis: int | str = 0, how: str = "any", thresh: typing.Optional[int] = None, subset: typing.Union[None, blocks.Label, Sequence[blocks.Label]] = None, inplace: bool = False, ignore_index=False, ) -> DataFrame: if inplace: raise NotImplementedError( f"'inplace'=True not supported. {constants.FEEDBACK_LINK}" ) # Check if both thresh and how are explicitly provided if thresh is not None: # cannot specify both thresh and how parameters if how != "any": raise TypeError( "You cannot set both the how and thresh arguments at the same time." ) else: # Only validate 'how' when thresh is not provided if how not in ("any", "all"): raise ValueError("'how' must be one of 'any', 'all'") axis_n = utils.get_axis_number(axis) if subset is not None and axis_n != 0: raise NotImplementedError( f"subset only supported when axis=0. {constants.FEEDBACK_LINK}" ) if axis_n == 0: # subset needs to be converted into column IDs, not column labels. if subset is None: subset_ids = None elif not utils.is_list_like(subset): subset_ids = [id_ for id_ in self._block.label_to_col_id[subset]] else: subset_ids = [ id_ for label in subset for id_ in self._block.label_to_col_id[label] ] result = block_ops.dropna( self._block, self._block.value_columns, how=how, thresh=thresh, subset=subset_ids, ) # type: ignore if ignore_index: result = result.reset_index() return DataFrame(result) else: if thresh is not None: # Keep columns with at least 'thresh' non-null values notnull_block = self._block.multi_apply_unary_op(ops.notnull_op) notnull_counts = DataFrame(notnull_block).sum().to_pandas() keep_columns = [ col for col, count in zip(self._block.value_columns, notnull_counts) if count >= thresh ] else: isnull_block = self._block.multi_apply_unary_op(ops.isnull_op) if how == "any": null_locations = DataFrame(isnull_block).any().to_pandas() else: # 'all' null_locations = DataFrame(isnull_block).all().to_pandas() keep_columns = [ col for col, to_drop in zip(self._block.value_columns, null_locations) if not to_drop ] return DataFrame(self._block.select_columns(keep_columns))
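# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): `how` and `thresh` are mutually exclusive, and
# `axis=1` drops columns instead of rows.
#
#   df = bpd.DataFrame({"a": [1, None], "b": [None, None]})
#   df.dropna()                  # drop rows with any NULL
#   df.dropna(how="all")         # drop rows that are entirely NULL
#   df.dropna(axis=1, thresh=1)  # keep columns with >= 1 non-NULL value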
[docs] def any( self, *, axis: typing.Union[str, int] = 0, bool_only: bool = False, ) -> bigframes.series.Series: if not bool_only: frame = self._raise_on_non_boolean("any") else: frame = self._drop_non_bool() block = frame._block.aggregate_all_and_stack(agg_ops.any_op, axis=axis) return bigframes.series.Series(block)
[docs] def all( self, axis: typing.Union[str, int] = 0, *, bool_only: bool = False ) -> bigframes.series.Series: if not bool_only: frame = self._raise_on_non_boolean("all") else: frame = self._drop_non_bool() block = frame._block.aggregate_all_and_stack(agg_ops.all_op, axis=axis) return bigframes.series.Series(block)
[docs] def sum( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False ) -> bigframes.series.Series: if not numeric_only: frame = self._raise_on_non_numeric("sum") else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.sum_op, axis=axis) return bigframes.series.Series(block)
[docs] def mean( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False ) -> bigframes.series.Series: if not numeric_only: frame = self._raise_on_non_numeric("mean") else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.mean_op, axis=axis) return bigframes.series.Series(block)
[docs] def median( self, *, numeric_only: bool = False, exact: bool = True ) -> bigframes.series.Series: if not numeric_only: frame = self._raise_on_non_numeric("median") else: frame = self._drop_non_numeric() if exact: result = frame.quantile() result.name = None return result else: block = frame._block.aggregate_all_and_stack(agg_ops.median_op) return bigframes.series.Series(block)
[docs] def quantile(
    self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
):
    if not numeric_only:
        frame = self._raise_on_non_numeric("quantile")
    else:
        frame = self._drop_non_numeric()
    multi_q = utils.is_list_like(q)
    result = block_ops.quantile(
        frame._block, frame._block.value_columns, qs=tuple(q) if multi_q else (q,)  # type: ignore
    )
    if multi_q:
        return DataFrame(result.stack()).droplevel(0)
    else:
        # Drop the last level, which contains q, unnecessary since only one q
        result = result.with_column_labels(result.column_labels.droplevel(-1))
        result, index_col = result.create_constant(q, None)
        result = result.set_index([index_col])
        return bigframes.series.Series(
            result.transpose(original_row_index=pandas.Index([q]))
        )
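# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): a scalar q yields a Series, while a list of
# quantiles yields a DataFrame with one row per q.
#
#   df = bpd.DataFrame({"a": [1, 2, 3, 4]})
#   df.quantile(0.5)           # Series: median per column
#   df.quantile([0.25, 0.75])  # DataFrame: one row per quantile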
[docs] def std( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False ) -> bigframes.series.Series: if not numeric_only: frame = self._raise_on_non_numeric("std") else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.std_op, axis=axis) return bigframes.series.Series(block)
[docs] def var( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False ) -> bigframes.series.Series: if not numeric_only: frame = self._raise_on_non_numeric("var") else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.var_op, axis=axis) return bigframes.series.Series(block)
[docs] def min( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False ) -> bigframes.series.Series: if not numeric_only: frame = self._raise_on_non_numeric("min") else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.min_op, axis=axis) return bigframes.series.Series(block)
[docs] def max( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False ) -> bigframes.series.Series: if not numeric_only: frame = self._raise_on_non_numeric("max") else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.max_op, axis=axis) return bigframes.series.Series(block)
[docs] def prod( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False ) -> bigframes.series.Series: if not numeric_only: frame = self._raise_on_non_numeric("prod") else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.product_op, axis=axis) return bigframes.series.Series(block)
product = prod product.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.prod)
[docs] def count(self, *, numeric_only: bool = False) -> bigframes.series.Series: if not numeric_only: frame = self else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.count_op) return bigframes.series.Series(block)
[docs] def nunique(self) -> bigframes.series.Series: block = self._block.aggregate_all_and_stack(agg_ops.nunique_op) return bigframes.series.Series(block)
[docs] def agg(self, func) -> DataFrame | bigframes.series.Series: if utils.is_dict_like(func): # Must check dict-like first because dictionaries are list-like # according to Pandas. aggs = [] labels = [] funcnames = [] for col_label, agg_func in func.items(): agg_func_list = agg_func if utils.is_list_like(agg_func) else [agg_func] col_id = self._block.resolve_label_exact(col_label) if col_id is None: raise KeyError(f"Column {col_label} does not exist") for agg_func in agg_func_list: op_and_label = agg_ops.lookup_agg_func(agg_func) agg_expr = ( agg_expressions.UnaryAggregation( op_and_label[0], ex.deref(col_id) ) if isinstance(op_and_label[0], agg_ops.UnaryAggregateOp) else agg_expressions.NullaryAggregation(op_and_label[0]) ) aggs.append(agg_expr) labels.append(col_label) funcnames.append(op_and_label[1]) # if any list in dict values, format output differently if any(utils.is_list_like(v) for v in func.values()): new_index, _ = self.columns.reindex(labels) new_index = utils.combine_indices(new_index, pandas.Index(funcnames)) agg_block, _ = self._block.aggregate( aggregations=aggs, column_labels=new_index ) return DataFrame(agg_block).stack().droplevel(0, axis="index") else: new_index, _ = self.columns.reindex(labels) agg_block, _ = self._block.aggregate( aggregations=aggs, column_labels=new_index ) return bigframes.series.Series( agg_block.transpose( single_row_mode=True, original_row_index=pandas.Index([None]) ) ) elif utils.is_list_like(func): aggregations = [agg_ops.lookup_agg_func(f)[0] for f in func] for dtype, agg in itertools.product(self.dtypes, aggregations): agg.output_type( dtype ) # Raises exception if the agg does not support the dtype. return DataFrame( self._block.summarize( self._block.value_columns, aggregations, ) ) else: # function name string return bigframes.series.Series( self._block.aggregate_all_and_stack(agg_ops.lookup_agg_func(func)[0]) )
aggregate = agg aggregate.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.agg)
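# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): the string, list, and dict forms of `agg`.
#
#   df = bpd.DataFrame({"a": [1, 2], "b": [3, 4]})
#   df.agg("sum")                              # Series of per-column sums
#   df.agg(["min", "max"])                     # DataFrame: one row per function
#   df.agg({"a": "sum", "b": ["min", "max"]})  # per-column function lists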
[docs] @validations.requires_index @validations.requires_ordering() def idxmin(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmin(self._block))
[docs] @validations.requires_index @validations.requires_ordering() def idxmax(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmax(self._block))
[docs] @validations.requires_ordering() def melt( self, id_vars: typing.Optional[typing.Iterable[typing.Hashable]] = None, value_vars: typing.Optional[typing.Iterable[typing.Hashable]] = None, var_name: typing.Union[ typing.Hashable, typing.Sequence[typing.Hashable] ] = None, value_name: typing.Hashable = "value", ): if var_name is None: # Determine default var_name. Attempt to use column labels if they are unique if self.columns.nlevels > 1: if len(set(self.columns.names)) == len(self.columns.names): var_name = self.columns.names else: var_name = [f"variable_{i}" for i in range(len(self.columns.names))] else: var_name = self.columns.name or "variable" var_name = tuple(var_name) if utils.is_list_like(var_name) else (var_name,) if id_vars is not None: id_col_ids = [self._resolve_label_exact(col) for col in id_vars] else: id_col_ids = [] if value_vars is not None: val_col_ids = [self._resolve_label_exact(col) for col in value_vars] else: val_col_ids = [ col_id for col_id in self._block.value_columns if col_id not in id_col_ids ] return DataFrame( self._block.melt(id_col_ids, val_col_ids, var_name, value_name) )
[docs] def describe(self, include: None | Literal["all"] = None) -> DataFrame: from bigframes.pandas.core.methods import describe return typing.cast(DataFrame, describe.describe(self, include))
[docs] def skew(self, *, numeric_only: bool = False): if not numeric_only: frame = self._raise_on_non_numeric("skew") else: frame = self._drop_non_numeric() result_block = block_ops.skew(frame._block, frame._block.value_columns) return bigframes.series.Series(result_block)
[docs] def kurt(self, *, numeric_only: bool = False): if not numeric_only: frame = self._raise_on_non_numeric("kurt") else: frame = self._drop_non_numeric() result_block = block_ops.kurt(frame._block, frame._block.value_columns) return bigframes.series.Series(result_block)
kurtosis = kurt kurtosis.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.kurt) def _pivot( self, *, columns: typing.Union[blocks.Label, Sequence[blocks.Label]], columns_unique_values: typing.Optional[ typing.Union[pandas.Index, Sequence[object]] ] = None, index: typing.Optional[ typing.Union[blocks.Label, Sequence[blocks.Label]] ] = None, values: typing.Optional[ typing.Union[blocks.Label, Sequence[blocks.Label]] ] = None, ) -> DataFrame: if index: block = self.set_index(index)._block else: block = self._block column_ids = self._sql_names(columns) if values: value_col_ids = self._sql_names(values) else: value_col_ids = [ col for col in block.value_columns if col not in column_ids ] pivot_block = block.pivot( columns=column_ids, values=value_col_ids, columns_unique_values=columns_unique_values, values_in_index=utils.is_list_like(values), ) return DataFrame(pivot_block)
[docs] def pivot( self, *, columns: typing.Union[blocks.Label, Sequence[blocks.Label]], index: typing.Optional[ typing.Union[blocks.Label, Sequence[blocks.Label]] ] = None, values: typing.Optional[ typing.Union[blocks.Label, Sequence[blocks.Label]] ] = None, ) -> DataFrame: return self._pivot(columns=columns, index=index, values=values)
[docs] def pivot_table(
    self,
    values: typing.Optional[
        typing.Union[blocks.Label, Sequence[blocks.Label]]
    ] = None,
    index: typing.Optional[
        typing.Union[blocks.Label, Sequence[blocks.Label]]
    ] = None,
    columns: typing.Union[blocks.Label, Sequence[blocks.Label]] = None,
    aggfunc: str = "mean",
    fill_value=None,
    margins: bool = False,
    dropna: bool = True,
    margins_name: Hashable = "All",
    observed: bool = False,
    sort: bool = True,
) -> DataFrame:
    if margins:
        raise NotImplementedError(
            f"DataFrame.pivot_table margins arg not supported. {constants.FEEDBACK_LINK}"
        )
    if not dropna:
        raise NotImplementedError(
            f"DataFrame.pivot_table dropna arg not supported. {constants.FEEDBACK_LINK}"
        )
    if margins_name != "All":
        raise NotImplementedError(
            f"DataFrame.pivot_table margins_name arg not supported. {constants.FEEDBACK_LINK}"
        )
    if observed:
        raise NotImplementedError(
            f"DataFrame.pivot_table observed arg not supported. {constants.FEEDBACK_LINK}"
        )

    if isinstance(index, Iterable) and not (
        isinstance(index, blocks.Label) and index in self.columns
    ):
        index = list(index)
    else:
        index = [index]

    if isinstance(columns, Iterable) and not (
        isinstance(columns, blocks.Label) and columns in self.columns
    ):
        columns = list(columns)
    else:
        columns = [columns]

    if isinstance(values, Iterable) and not (
        isinstance(values, blocks.Label) and values in self.columns
    ):
        values = list(values)
    else:
        values = [values]

    # Unlike pivot, pivot_table has values always ordered.
    values.sort(key=lambda val: typing.cast("SupportsRichComparison", val))

    keys = index + columns
    agged = self.groupby(keys, dropna=True)[values].agg(aggfunc)

    if isinstance(agged, bigframes.series.Series):
        agged = agged.to_frame()

    agged = agged.dropna(how="all")

    if len(values) == 1:
        agged = agged.rename(columns={agged.columns[0]: values[0]})

    agged = agged.reset_index()

    pivoted = agged.pivot(
        columns=columns,
        index=index,
        values=values if len(values) > 1 else None,
    )
    if fill_value is not None:
        pivoted = pivoted.fillna(fill_value)
    if sort:
        pivoted = pivoted.sort_index()

    # TODO: Remove the reordering step once the issue is resolved.
    # The pivot_table method results in multi-index columns that are always ordered.
    # However, the order of the pivoted result columns is not guaranteed to be sorted.
    # Sort and reorder.
    return pivoted.sort_index(axis=1)  # type: ignore
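# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): `pivot_table` groups by the index and columns
# keys, aggregates the values, then pivots; unsupported pandas arguments
# raise NotImplementedError as shown above.
#
#   df = bpd.DataFrame({
#       "city": ["A", "A", "B"],
#       "year": [2023, 2024, 2023],
#       "sales": [10, 20, 30],
#   })
#   df.pivot_table(values="sales", index="city", columns="year",
#                  aggfunc="sum", fill_value=0)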
[docs] def stack(self, level: LevelsType = -1): if not isinstance(self.columns, pandas.MultiIndex): if level not in [0, -1, self.columns.name]: raise IndexError(f"Invalid level {level} for single-level index") return self._stack_mono() return self._stack_multi(level)
def _stack_mono(self): result_block = self._block.stack() return bigframes.series.Series(result_block) def _stack_multi(self, level: LevelsType = -1): n_levels = self.columns.nlevels if not utils.is_list_like(level): level = [level] level_indices = [] for level_ref in level: if isinstance(level_ref, int): if level_ref < 0: level_indices.append(n_levels + level_ref) else: level_indices.append(level_ref) else: # str level_indices.append(self.columns.names.index(level_ref)) # type: ignore new_order = [ *[i for i in range(n_levels) if i not in level_indices], *level_indices, ] original_columns = typing.cast(pandas.MultiIndex, self.columns) new_columns = original_columns.reorder_levels(new_order) block = self._block.with_column_labels(new_columns) block = block.stack(levels=len(level)) return DataFrame(block)
[docs] @validations.requires_index @validations.requires_ordering() def unstack(self, level: LevelsType = -1): if not utils.is_list_like(level): level = [level] block = self._block # Special case, unstack with mono-index transpose into a series if self.index.nlevels == 1: block = block.stack(how="right", levels=self.columns.nlevels) return bigframes.series.Series(block) # Pivot by index levels unstack_ids = self._resolve_levels(level) block = block.reset_index(drop=False) block = block.set_index( [col for col in self._block.index_columns if col not in unstack_ids] ) pivot_block = block.pivot( columns=unstack_ids, values=self._block.value_columns, values_in_index=True, ) return DataFrame(pivot_block)
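# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): `stack` moves column labels into the row index,
# and `unstack` pivots an index level back into columns; with a single-level
# index and columns, both produce a Series.
#
#   df = bpd.DataFrame({"a": [1, 2], "b": [3, 4]})
#   stacked = df.stack()    # Series indexed by (row, column label)
#   wide = df.unstack()     # Series, the mono-index transpose case above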
def _drop_non_numeric(self, permissive=True) -> DataFrame: numeric_types = ( set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) if permissive else set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE) ) non_numeric_cols = [ col_id for col_id, dtype in zip(self._block.value_columns, self._block.dtypes) if dtype not in numeric_types ] return DataFrame(self._block.drop_columns(non_numeric_cols)) def _drop_non_bool(self) -> DataFrame: non_bool_cols = [ col_id for col_id, dtype in zip(self._block.value_columns, self._block.dtypes) if dtype not in bigframes.dtypes.BOOL_BIGFRAMES_TYPES ] return DataFrame(self._block.drop_columns(non_bool_cols)) def _raise_on_non_numeric(self, op: str): if not all( dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE for dtype in self._block.dtypes ): raise NotImplementedError( f"'{op}' does not support non-numeric columns. " f"Set 'numeric_only'=True to ignore non-numeric columns. {constants.FEEDBACK_LINK}" ) return self def _raise_on_non_boolean(self, op: str): if not all( dtype in bigframes.dtypes.BOOL_BIGFRAMES_TYPES for dtype in self._block.dtypes ): raise NotImplementedError( f"'{op}' does not support non-bool columns. " f"Set 'bool_only'=True to ignore non-bool columns. {constants.FEEDBACK_LINK}" ) return self
[docs] def merge( self, right: DataFrame, how: Literal[ "inner", "left", "outer", "right", "cross", ] = "inner", on: Union[blocks.Label, Sequence[blocks.Label], None] = None, *, left_on: Union[blocks.Label, Sequence[blocks.Label], None] = None, right_on: Union[blocks.Label, Sequence[blocks.Label], None] = None, left_index: bool = False, right_index: bool = False, sort: bool = False, suffixes: tuple[str, str] = ("_x", "_y"), ) -> DataFrame: from bigframes.core.reshape import merge return merge.merge( self, right, how, on, left_on=left_on, right_on=right_on, left_index=left_index, right_index=right_index, sort=sort, suffixes=suffixes, )
[docs] def join(
    self,
    other: Union[DataFrame, bigframes.series.Series],
    on: Optional[str] = None,
    how: str = "left",
    lsuffix: str = "",
    rsuffix: str = "",
) -> DataFrame:
    if isinstance(other, bigframes.series.Series):
        other = other.to_frame()

    left, right = self, other

    col_intersection = left.columns.intersection(right.columns)
    if not col_intersection.empty:
        if lsuffix == rsuffix == "":
            raise ValueError(
                f"columns overlap but no suffix specified: {col_intersection}"
            )

    if how == "cross":
        if on is not None:
            raise ValueError("'on' is not supported for cross join.")
        result_block = left._block.merge(
            right._block,
            left_join_ids=[],
            right_join_ids=[],
            suffixes=(lsuffix, rsuffix),
            how="cross",
            sort=True,
        )
        return DataFrame(result_block)

    # Join left columns with right index
    if on is not None:
        if left._has_index and (on in left.index.names):
            if on in left.columns:
                raise ValueError(
                    f"'{on}' is both an index level and a column label, which is ambiguous."
                )
            else:
                raise NotImplementedError(
                    f"Joining on index level '{on}' is not yet supported. {constants.FEEDBACK_LINK}"
                )
        if (left.columns == on).sum() > 1:
            raise ValueError(f"The column label '{on}' is not unique.")

        if other._block.index.nlevels != 1:
            raise ValueError(
                "Join on columns must match the index level of the other "
                "DataFrame. Joining on a column when the other DataFrame has "
                "a multi-level index is not yet supported."
            )
        return self._join_on_key(
            other,
            on=on,
            how=how,
            lsuffix=lsuffix,
            rsuffix=rsuffix,
            should_duplicate_on_key=(on in col_intersection),
        )

    # Join left index with right index
    if left._block.index.nlevels != right._block.index.nlevels:
        raise ValueError("Index to join on must have the same number of levels.")

    return left._perform_join_by_index(right, how=how)._add_join_suffix(
        left.columns, right.columns, lsuffix=lsuffix, rsuffix=rsuffix
    )
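# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): an index-to-index join with suffixes for the
# overlapping column label.
#
#   left = bpd.DataFrame({"v": [1, 2]}, index=[10, 20])
#   right = bpd.DataFrame({"v": [3, 4]}, index=[20, 30])
#   left.join(right, how="outer", lsuffix="_l", rsuffix="_r")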
def _join_on_key( self, other: DataFrame, on: str, how: str, lsuffix: str, rsuffix: str, should_duplicate_on_key: bool, ) -> DataFrame: left, right = self.copy(), other # Replace all columns names with unique names for reordering. left_col_original_names = left.columns on_col_name = "bigframes_left_col_on" dup_on_col_name = "bigframes_left_col_on_dup" left_col_temp_names = [ f"bigframes_left_col_name_{i}" if col_name != on else on_col_name for i, col_name in enumerate(left_col_original_names) ] left.columns = pandas.Index(left_col_temp_names) # if on column is also in right df, we need to duplicate the column # and set it to be the first column if should_duplicate_on_key: left[dup_on_col_name] = left[on_col_name] on_col_name = dup_on_col_name left_col_temp_names = [on_col_name] + left_col_temp_names left = left[left_col_temp_names] # Switch left index with on column left_idx_original_names = left.index.names if left._has_index else () left_idx_names_in_cols = [ f"bigframes_left_idx_name_{i}" for i in range(len(left_idx_original_names)) ] if left._has_index: left.index.names = left_idx_names_in_cols left = left.reset_index(drop=False) left = left.set_index(on_col_name) right_col_original_names = right.columns right_col_temp_names = [ f"bigframes_right_col_name_{i}" for i in range(len(right_col_original_names)) ] right.columns = pandas.Index(right_col_temp_names) # Join on index and switch back combined_df = left._perform_join_by_index(right, how=how) combined_df.index.name = on_col_name combined_df = combined_df.reset_index(drop=False) combined_df = combined_df.set_index(left_idx_names_in_cols) # To be consistent with Pandas if combined_df._has_index: combined_df.index.names = ( left_idx_original_names if how in ("inner", "left") else ([None] * len(combined_df.index.names)) ) # Reorder columns combined_df = combined_df[left_col_temp_names + right_col_temp_names] return combined_df._add_join_suffix( left_col_original_names, right_col_original_names, lsuffix=lsuffix, rsuffix=rsuffix, extra_col=on if on_col_name == dup_on_col_name else None, ) def _perform_join_by_index( self, other: Union[DataFrame, indexes.Index], *, how: str = "left", always_order: bool = False, ): block, _ = self._block.join( other._block, how=how, block_identity_join=True, always_order=always_order ) return DataFrame(block) def _add_join_suffix( self, left_columns, right_columns, lsuffix: str = "", rsuffix: str = "", extra_col: typing.Optional[str] = None, ): """Applies suffixes to overlapping column names to mimic a pandas join. This method identifies columns that are common to both a "left" and "right" set of columns and renames them using the provided suffixes. Columns that are not in the intersection are kept with their original names. Args: left_columns (pandas.Index): The column labels from the left DataFrame. right_columns (pandas.Index): The column labels from the right DataFrame. lsuffix (str): The suffix to apply to overlapping column names from the left side. rsuffix (str): The suffix to apply to overlapping column names from the right side. extra_col (typing.Optional[str]): An optional column name to prepend to the final list of columns. This argument is used specifically to match the behavior of a pandas join. When a join key (i.e., the 'on' column) exists in both the left and right DataFrames, pandas creates two versions of that column: one copy keeps its original name and is placed as the first column, while the other instances receive the normal suffix. 
Passing the join key's name here replicates that behavior. Returns: DataFrame: A new DataFrame with the columns renamed to resolve overlaps. """ combined_df = self.copy() col_intersection = left_columns.intersection(right_columns) final_col_names = [] if extra_col is None else [extra_col] for col_name in left_columns: if col_name in col_intersection: final_col_names.append(f"{col_name}{lsuffix}") else: final_col_names.append(col_name) for col_name in right_columns: if col_name in col_intersection: final_col_names.append(f"{col_name}{rsuffix}") else: final_col_names.append(col_name) combined_df.columns = pandas.Index(final_col_names) return combined_df
[docs] @validations.requires_ordering() def rolling( self, window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str, min_periods=None, on: str | None = None, closed: Literal["right", "left", "both", "neither"] = "right", ) -> bigframes.core.window.Window: if isinstance(window, int): window_def = windows.WindowSpec( bounds=windows.RowsWindowBounds.from_window_size(window, closed), min_periods=min_periods if min_periods is not None else window, ) skip_agg_col_id = ( None if on is None else self._block.resolve_label_exact_or_error(on) ) return bigframes.core.window.Window( self._block, window_def, self._block.value_columns, skip_agg_column_id=skip_agg_col_id, ) return rolling.create_range_window( self._block, window, min_periods=min_periods, on=on, closed=closed, is_series=False, )
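# Illustrative example (a minimal sketch; assumes `bpd` is `bigframes.pandas`
# with an active session): an integer window counts rows, while a string or
# timedelta window defines a range window over the `on` column.
#
#   df = bpd.DataFrame({"v": [1, 2, 3]})
#   df.rolling(window=2).sum()                  # 2-row windows
#   df.rolling(window=2, min_periods=1).sum()   # partial windows allowed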
    @validations.requires_ordering()
    def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window:
        window = windows.cumulative_rows(min_periods=min_periods)
        return bigframes.core.window.Window(
            self._block, window, self._block.value_columns
        )
    def groupby(
        self,
        by: typing.Union[
            blocks.Label,
            bigframes.series.Series,
            typing.Sequence[typing.Union[blocks.Label, bigframes.series.Series]],
            None,
        ] = None,
        *,
        level: typing.Optional[LevelsType] = None,
        as_index: bool = True,
        dropna: bool = True,
    ) -> groupby.DataFrameGroupBy:
        if (by is not None) and (level is not None):
            raise ValueError("Do not specify both 'by' and 'level'")
        if by is not None:
            return self._groupby_series(by, as_index=as_index, dropna=dropna)
        if level is not None:
            return self._groupby_level(level, as_index=as_index, dropna=dropna)
        else:
            raise TypeError("You have to supply one of 'by' and 'level'")
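    # Illustrative usage sketch (not part of the module source). `by` accepts
    # a column label, a Series, or a sequence mixing both; `level` instead
    # selects index levels, and the two are mutually exclusive:
    #
    #     df = bpd.DataFrame({"k": ["a", "a", "b"], "v": [1, 2, 3]})
    #     df.groupby("k").sum()            # group on a column label
    #     df.groupby(level=0).count()      # group on the index
    #     df.groupby(df["v"] > 1).count()  # group on a computed Series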
    @validations.requires_index
    def _groupby_level(
        self,
        level: LevelsType,
        as_index: bool = True,
        dropna: bool = True,
    ):
        if utils.is_list_like(level):
            by_key_is_singular = False
        else:
            by_key_is_singular = True
        return groupby.DataFrameGroupBy(
            self._block,
            by_col_ids=self._resolve_levels(level),
            as_index=as_index,
            dropna=dropna,
            by_key_is_singular=by_key_is_singular,
        )

    def _groupby_series(
        self,
        by: typing.Union[
            blocks.Label,
            bigframes.series.Series,
            typing.Sequence[typing.Union[blocks.Label, bigframes.series.Series]],
        ],
        as_index: bool = True,
        dropna: bool = True,
    ):
        # Pandas makes a distinction between groupby with a list of keys
        # versus groupby with a single item in some methods, like __iter__.
        if not isinstance(by, bigframes.series.Series) and utils.is_list_like(by):
            by = list(by)
            by_key_is_singular = False
        else:
            by = [typing.cast(typing.Union[blocks.Label, bigframes.series.Series], by)]
            by_key_is_singular = True

        block = self._block
        col_ids: typing.Sequence[str] = []
        for key in by:
            if isinstance(key, bigframes.series.Series):
                block, (
                    get_column_left,
                    get_column_right,
                ) = block.join(key._block, how="inner" if dropna else "left")
                col_ids = [
                    *[get_column_left[value] for value in col_ids],
                    get_column_right[key._value_column],
                ]
            else:
                # Interpret as index level or column name
                col_matches = block.label_to_col_id.get(key, [])
                level_matches = block.index_name_to_col_id.get(key, [])
                matches = [*col_matches, *level_matches]
                if len(matches) != 1:
                    raise ValueError(
                        f"GroupBy key {key} does not match a unique column or "
                        "index level. BigQuery DataFrames only interprets lists "
                        "of strings as column or index names, not directly as "
                        "per-row group assignments."
                    )
                col_ids = [*col_ids, matches[0]]

        return groupby.DataFrameGroupBy(
            block,
            by_col_ids=col_ids,
            as_index=as_index,
            dropna=dropna,
            by_key_is_singular=by_key_is_singular,
        )
    def abs(self) -> DataFrame:
        return self._apply_unary_op(ops.abs_op)
    def round(self, decimals: Union[int, dict[Hashable, int]] = 0) -> DataFrame:
        is_mapping = utils.is_dict_like(decimals)
        if not (is_mapping or isinstance(decimals, int)):
            raise TypeError("'decimals' must be either a dict-like or integer.")

        block = self._block
        exprs = []
        for label, col_id, dtype in zip(
            block.column_labels, block.value_columns, block.dtypes
        ):
            if dtype in set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) - {
                bigframes.dtypes.BOOL_DTYPE
            }:
                if is_mapping:
                    if label in decimals:  # type: ignore
                        exprs.append(
                            ops.round_op.as_expr(
                                col_id,
                                ex.const(
                                    decimals[label], dtype=bigframes.dtypes.INT_DTYPE  # type: ignore
                                ),
                            )
                        )
                    else:
                        exprs.append(ex.deref(col_id))
                else:
                    exprs.append(
                        ops.round_op.as_expr(
                            col_id,
                            ex.const(
                                typing.cast(int, decimals),
                                dtype=bigframes.dtypes.INT_DTYPE,
                            ),
                        )
                    )
            else:
                exprs.append(ex.deref(col_id))

        return DataFrame(
            block.project_exprs(exprs, labels=block.column_labels, drop=True)
        )
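    # Illustrative usage sketch (not part of the module source). A dict maps
    # labels to per-column precision; unlisted and non-numeric columns pass
    # through unchanged:
    #
    #     df = bpd.DataFrame({"a": [1.234, 5.678], "b": [9.876, 5.432]})
    #     df.round(1)         # every numeric column to 1 decimal place
    #     df.round({"a": 2})  # only column "a" is rounded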
    def isna(self) -> DataFrame:
        return self._apply_unary_op(ops.isnull_op)

    isnull = isna
    isnull.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.isna)

    def notna(self) -> DataFrame:
        return self._apply_unary_op(ops.notnull_op)

    notnull = notna
    notnull.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.notna)
    @validations.requires_ordering()
    def cumsum(self):
        is_numeric_types = [
            (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE)
            for _, dtype in self.dtypes.items()
        ]
        if not all(is_numeric_types):
            raise ValueError("All values must be numeric to apply cumsum.")
        return self._apply_window_op(
            agg_ops.sum_op,
            windows.cumulative_rows(),
        )

    @validations.requires_ordering()
    def cumprod(self) -> DataFrame:
        is_numeric_types = [
            (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE)
            for _, dtype in self.dtypes.items()
        ]
        if not all(is_numeric_types):
            raise ValueError("All values must be numeric to apply cumprod.")
        return self._apply_window_op(
            agg_ops.product_op,
            windows.cumulative_rows(),
        )

    @validations.requires_ordering()
    def cummin(self) -> DataFrame:
        return self._apply_window_op(
            agg_ops.min_op,
            windows.cumulative_rows(),
        )

    @validations.requires_ordering()
    def cummax(self) -> DataFrame:
        return self._apply_window_op(
            agg_ops.max_op,
            windows.cumulative_rows(),
        )
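    # Illustrative usage sketch (not part of the module source). All four
    # cumulative methods are thin wrappers over the same unbounded-preceding
    # row window; only the aggregation op differs:
    #
    #     df = bpd.DataFrame({"x": [1, 2, 3]})
    #     df.cumsum()   # 1, 3, 6
    #     df.cumprod()  # 1, 2, 6
    #     df.cummax()   # 1, 2, 3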
    @validations.requires_ordering()
    def shift(self, periods: int = 1) -> DataFrame:
        window_spec = windows.rows()
        return self._apply_window_op(agg_ops.ShiftOp(periods), window_spec)

    @validations.requires_ordering()
    def diff(self, periods: int = 1) -> DataFrame:
        window_spec = windows.rows()
        return self._apply_window_op(agg_ops.DiffOp(periods), window_spec)
    @validations.requires_ordering()
    def pct_change(self, periods: int = 1) -> DataFrame:
        # Future versions of pandas will not perform ffill automatically
        df = self.ffill()
        return DataFrame(block_ops.pct_change(df._block, periods=periods))
    def _apply_window_op(
        self,
        op: agg_ops.UnaryWindowOp,
        window_spec: windows.WindowSpec,
    ):
        block, result_ids = self._block.multi_apply_window_op(
            self._block.value_columns,
            op,
            window_spec=window_spec,
        )
        if op.skips_nulls:
            block = block.project_exprs(
                tuple(
                    bigframes.operations.where_op.as_expr(
                        r_col,
                        bigframes.operations.notnull_op.as_expr(og_col),
                        ex.const(None),
                    )
                    for og_col, r_col in zip(self._block.value_columns, result_ids)
                ),
                labels=self._block.column_labels,
                drop=True,
            )
        else:
            block = block.select_columns(result_ids)
        return DataFrame(block)
    @validations.requires_ordering()
    def sample(
        self,
        n: Optional[int] = None,
        frac: Optional[float] = None,
        *,
        random_state: Optional[int] = None,
        sort: Optional[bool | Literal["random"]] = "random",
    ) -> DataFrame:
        if n is not None and frac is not None:
            raise ValueError("Only one of 'n' or 'frac' parameter can be specified.")

        ns = (n,) if n is not None else ()
        fracs = (frac,) if frac is not None else ()
        return DataFrame(
            self._block.split(
                ns=ns, fracs=fracs, random_state=random_state, sort=sort
            )[0]
        )
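    # Illustrative usage sketch (not part of the module source). Exactly one
    # of `n` or `frac` may be given; `random_state` makes the draw
    # reproducible, and `sort` controls the ordering of the sampled rows
    # (default "random"):
    #
    #     df.sample(n=100, random_state=42)
    #     df.sample(frac=0.1)  # roughly 10% of rows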
    def explode(
        self,
        column: typing.Union[blocks.Label, typing.Sequence[blocks.Label]],
        *,
        ignore_index: Optional[bool] = False,
    ) -> DataFrame:
        column_labels = bigframes.core.explode.check_column(column)

        column_ids = [self._resolve_label_exact(label) for label in column_labels]
        missing = [
            column_labels[i] for i in range(len(column_ids)) if column_ids[i] is None
        ]
        if len(missing) > 0:
            raise KeyError(f"None of {missing} are in the columns")

        return DataFrame(
            self._block.explode(
                column_ids=typing.cast(typing.Sequence[str], tuple(column_ids)),
                ignore_index=ignore_index,
            )
        )
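    # Illustrative usage sketch (not part of the module source). Each element
    # of a list-valued column becomes its own row; the other columns are
    # repeated for every emitted element:
    #
    #     df = bpd.DataFrame({"id": [1, 2], "tags": [["a", "b"], ["c"]]})
    #     df.explode("tags")  # -> rows (1, "a"), (1, "b"), (2, "c")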
    def _split(
        self,
        ns: Iterable[int] = (),
        fracs: Iterable[float] = (),
        *,
        random_state: Optional[int] = None,
    ) -> List[DataFrame]:
        """Internal function to support splitting a DataFrame into multiple
        parts along the index axis.

        At most one of ns and fracs can be passed in. If neither is given,
        defaults to ns = (1,). Returns a list of sampled DataFrames.
        """
        blocks = self._block.split(ns=ns, fracs=fracs, random_state=random_state)
        return [DataFrame(block) for block in blocks]
    @validations.requires_ordering()
    def resample(
        self,
        rule: str,
        *,
        closed: Optional[Literal["right", "left"]] = None,
        label: Optional[Literal["right", "left"]] = None,
        on: blocks.Label = None,
        level: Optional[LevelsType] = None,
        origin: Union[
            pandas.Timestamp,
            datetime.datetime,
            numpy.datetime64,
            int,
            float,
            str,
            Literal["epoch", "start", "start_day", "end", "end_day"],
        ] = "start_day",
    ) -> bigframes.core.groupby.DataFrameGroupBy:
        block = self._block._generate_resample_label(
            rule=rule,
            closed=closed,
            label=label,
            on=on,
            level=level,
            origin=origin,
        )
        df = DataFrame(block)
        return df.groupby(level=0)
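    # Illustrative usage sketch (not part of the module source). Resampling
    # relabels rows into fixed-width time bins and then delegates to a
    # level-0 groupby:
    #
    #     df = bpd.DataFrame(
    #         {
    #             "ts": bpd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-08"]),
    #             "v": [1, 2, 3],
    #         }
    #     ).set_index("ts")
    #     df.resample("W").sum()  # weekly totals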
    @classmethod
    def from_dict(
        cls,
        data: dict,
        orient: str = "columns",
        dtype=None,
        columns=None,
    ) -> DataFrame:
        return cls(pandas.DataFrame.from_dict(data, orient, dtype, columns))  # type: ignore

    @classmethod
    def from_records(
        cls,
        data,
        index=None,
        exclude=None,
        columns=None,
        coerce_float: bool = False,
        nrows: int | None = None,
    ) -> DataFrame:
        return cls(
            pandas.DataFrame.from_records(
                data, index, exclude, columns, coerce_float, nrows
            )
        )
    def to_csv(
        self,
        path_or_buf=None,
        sep=",",
        *,
        header: bool = True,
        index: bool = True,
        allow_large_results: Optional[bool] = None,
    ) -> Optional[str]:
        # TODO(swast): Can we support partition columns argument?
        # TODO(chelsealin): Support local file paths.
        # TODO(swast): Some warning that wildcard is recommended for large
        # query results? See:
        # https://cloud.google.com/bigquery/docs/exporting-data#limit_the_exported_file_size
        if not utils.is_gcs_path(path_or_buf):
            pd_df = self.to_pandas(allow_large_results=allow_large_results)
            return pd_df.to_csv(path_or_buf, sep=sep, header=header, index=index)
        if "*" not in path_or_buf:
            raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

        export_array, id_overrides = self._prepare_export(
            index=index and self._has_index,
            ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
        )
        options: dict[str, Union[bool, str]] = {
            "field_delimiter": sep,
            "header": header,
        }
        result = self._session._executor.execute(
            export_array.rename_columns(id_overrides),
            ex_spec.ExecutionSpec(
                ex_spec.GcsOutputSpec(
                    uri=path_or_buf, format="csv", export_options=tuple(options.items())
                )
            ),
        )
        self._set_internal_query_job(result.query_job)
        return None
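    # Illustrative usage sketch (not part of the module source; bucket name is
    # a placeholder). A local path or buffer materializes through to_pandas();
    # a gs:// destination must contain a wildcard so BigQuery can shard the
    # export across files:
    #
    #     df.to_csv("gs://my-bucket/exports/data-*.csv", index=False)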
    def to_json(
        self,
        path_or_buf=None,
        orient: Optional[
            Literal["split", "records", "index", "columns", "values", "table"]
        ] = None,
        *,
        lines: bool = False,
        index: bool = True,
        allow_large_results: Optional[bool] = None,
    ) -> Optional[str]:
        # TODO(swast): Can we support partition columns argument?
        if not utils.is_gcs_path(path_or_buf):
            pd_df = self.to_pandas(allow_large_results=allow_large_results)
            return pd_df.to_json(
                path_or_buf,
                orient=orient,
                lines=lines,
                index=index,
                default_handler=str,
            )
        if "*" not in path_or_buf:
            raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

        # TODO(ashleyxu) Support lines=False for small tables with arrays and
        # TO_JSON_STRING. See:
        # https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#to_json_string
        if lines is False:
            raise NotImplementedError(
                f"Only newline-delimited JSON is supported. Add `lines=True` to your function call. {constants.FEEDBACK_LINK}"
            )
        if lines is True and orient != "records":
            raise ValueError(
                "'lines' keyword is only valid when 'orient' is 'records'."
            )

        export_array, id_overrides = self._prepare_export(
            index=index and self._has_index,
            ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
        )
        result = self._session._executor.execute(
            export_array.rename_columns(id_overrides),
            ex_spec.ExecutionSpec(
                ex_spec.GcsOutputSpec(uri=path_or_buf, format="json", export_options=())
            ),
        )
        self._set_internal_query_job(result.query_job)
        return None
    def to_gbq(
        self,
        destination_table: Optional[str] = None,
        *,
        if_exists: Optional[Literal["fail", "replace", "append"]] = None,
        index: bool = True,
        ordering_id: Optional[str] = None,
        clustering_columns: Union[pandas.Index, Iterable[typing.Hashable]] = (),
        labels: dict[str, str] = {},
    ) -> str:
        index = index and self._has_index
        temp_table_ref = None

        if destination_table is None:
            if if_exists is not None and if_exists != "replace":
                raise ValueError(
                    f"Got invalid value {repr(if_exists)} for if_exists. "
                    "When no destination table is specified, a new table is always created. "
                    "None or 'replace' are the only valid options in this case."
                )
            if_exists = "replace"

            # The client code owns this table reference now
            temp_table_ref = (
                self._session._anon_dataset_manager.generate_unique_resource_id()
            )
            destination_table = f"{temp_table_ref.project}.{temp_table_ref.dataset_id}.{temp_table_ref.table_id}"

        table_parts = destination_table.split(".")
        default_project = self._block.expr.session.bqclient.project

        if len(table_parts) == 2:
            destination_dataset = f"{default_project}.{table_parts[0]}"
        elif len(table_parts) == 3:
            destination_dataset = f"{table_parts[0]}.{table_parts[1]}"
        else:
            raise ValueError(
                f"Got invalid value for destination_table {repr(destination_table)}. "
                "Should be of the form 'datasetId.tableId' or 'projectId.datasetId.tableId'."
            )

        if if_exists is None:
            if_exists = "fail"

        valid_if_exists = ["fail", "replace", "append"]
        if if_exists not in valid_if_exists:
            raise ValueError(
                f"Got invalid value {repr(if_exists)} for if_exists. "
                f"Valid options include None or one of {valid_if_exists}."
            )

        try:
            self._session.bqclient.get_dataset(destination_dataset)
        except google.api_core.exceptions.NotFound:
            self._session.bqclient.create_dataset(destination_dataset, exists_ok=True)

        clustering_fields = self._map_clustering_columns(
            clustering_columns, index=index
        )

        export_array, id_overrides = self._prepare_export(
            index=index and self._has_index, ordering_id=ordering_id
        )
        destination: bigquery.table.TableReference = (
            bigquery.table.TableReference.from_string(
                destination_table,
                default_project=default_project,
            )
        )
        result = self._session._executor.execute(
            export_array.rename_columns(id_overrides),
            ex_spec.ExecutionSpec(
                ex_spec.TableOutputSpec(
                    destination,
                    cluster_cols=tuple(clustering_fields),
                    if_exists=if_exists,
                )
            ),
        )
        assert result.query_job is not None
        self._set_internal_query_job(result.query_job)

        # The query job should have finished, so there should always be a
        # result table.
        result_table = result.query_job.destination
        assert result_table is not None

        if temp_table_ref:
            bigframes.session._io.bigquery.set_table_expiration(
                self._session.bqclient,
                temp_table_ref,
                datetime.datetime.now(datetime.timezone.utc)
                + bigframes.constants.DEFAULT_EXPIRATION,
            )

        if len(labels) != 0:
            table = bigquery.Table(result_table)
            table.labels = labels
            self._session.bqclient.update_table(table, ["labels"])

        return destination_table
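    # Illustrative usage sketch (not part of the module source; table, dataset,
    # and column names are placeholders). Omitting `destination_table` writes
    # to a session-managed temporary table with a default expiration and
    # returns its generated id:
    #
    #     df.to_gbq(
    #         "my_dataset.my_table",
    #         if_exists="replace",
    #         clustering_columns=["customer_id"],
    #     )
    #     tmp_table_id = df.to_gbq()  # anonymous temp table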
    def to_numpy(
        self,
        dtype=None,
        copy=False,
        na_value=pd_ext.no_default,
        *,
        allow_large_results=None,
        **kwargs,
    ) -> numpy.ndarray:
        return self.to_pandas(allow_large_results=allow_large_results).to_numpy(
            dtype, copy, na_value, **kwargs
        )

    def __array__(self, dtype=None, copy: Optional[bool] = None) -> numpy.ndarray:
        if copy is False:
            raise ValueError("Cannot convert to array without copy.")
        return self.to_numpy(dtype=dtype)

    __array__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__array__)
    def to_parquet(
        self,
        path=None,
        *,
        compression: Optional[Literal["snappy", "gzip"]] = "snappy",
        index: bool = True,
        allow_large_results: Optional[bool] = None,
    ) -> Optional[bytes]:
        # TODO(swast): Can we support partition columns argument?
        # TODO(chelsealin): Support local file paths.
        # TODO(swast): Some warning that wildcard is recommended for large
        # query results? See:
        # https://cloud.google.com/bigquery/docs/exporting-data#limit_the_exported_file_size
        if not utils.is_gcs_path(path):
            pd_df = self.to_pandas(allow_large_results=allow_large_results)
            return pd_df.to_parquet(path, compression=compression, index=index)
        if "*" not in path:
            raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

        if compression not in {None, "snappy", "gzip"}:
            raise ValueError(f"'{compression}' is not valid for compression")

        export_options: Dict[str, Union[bool, str]] = {}
        if compression:
            export_options["compression"] = compression.upper()

        export_array, id_overrides = self._prepare_export(
            index=index and self._has_index,
            ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
        )
        result = self._session._executor.execute(
            export_array.rename_columns(id_overrides),
            ex_spec.ExecutionSpec(
                ex_spec.GcsOutputSpec(
                    uri=path,
                    format="parquet",
                    export_options=tuple(export_options.items()),
                )
            ),
        )
        self._set_internal_query_job(result.query_job)
        return None
    def to_dict(
        self,
        orient: Literal[
            "dict", "list", "series", "split", "tight", "records", "index"
        ] = "dict",
        into: type[dict] = dict,
        *,
        allow_large_results: Optional[bool] = None,
        **kwargs,
    ) -> dict | list[dict]:
        return self.to_pandas(allow_large_results=allow_large_results).to_dict(orient=orient, into=into, **kwargs)  # type: ignore

    def to_excel(
        self,
        excel_writer,
        sheet_name: str = "Sheet1",
        *,
        allow_large_results: Optional[bool] = None,
        **kwargs,
    ) -> None:
        return self.to_pandas(allow_large_results=allow_large_results).to_excel(
            excel_writer, sheet_name, **kwargs
        )

    def to_latex(
        self,
        buf=None,
        columns: Sequence | None = None,
        header: bool | Sequence[str] = True,
        index: bool = True,
        *,
        allow_large_results: Optional[bool] = None,
        **kwargs,
    ) -> str | None:
        return self.to_pandas(allow_large_results=allow_large_results).to_latex(
            buf, columns=columns, header=header, index=index, **kwargs  # type: ignore
        )

    def to_records(
        self,
        index: bool = True,
        column_dtypes=None,
        index_dtypes=None,
        *,
        allow_large_results=None,
    ) -> numpy.recarray:
        return self.to_pandas(allow_large_results=allow_large_results).to_records(
            index, column_dtypes, index_dtypes
        )
    def to_string(
        self,
        buf=None,
        columns: Sequence[str] | None = None,
        col_space=None,
        header: bool | Sequence[str] = True,
        index: bool = True,
        na_rep: str = "NaN",
        formatters=None,
        float_format=None,
        sparsify: bool | None = None,
        index_names: bool = True,
        justify: str | None = None,
        max_rows: int | None = None,
        max_cols: int | None = None,
        show_dimensions: bool = False,
        decimal: str = ".",
        line_width: int | None = None,
        min_rows: int | None = None,
        max_colwidth: int | None = None,
        encoding: str | None = None,
        *,
        allow_large_results: Optional[bool] = None,
    ) -> str | None:
        return self.to_pandas(allow_large_results=allow_large_results).to_string(
            buf,
            columns=columns,  # type: ignore
            col_space=col_space,
            header=header,  # type: ignore
            index=index,
            na_rep=na_rep,
            formatters=formatters,
            float_format=float_format,
            sparsify=sparsify,
            index_names=index_names,
            justify=justify,
            max_rows=max_rows,
            max_cols=max_cols,
            show_dimensions=show_dimensions,
            decimal=decimal,
            line_width=line_width,
            min_rows=min_rows,
            max_colwidth=max_colwidth,
            encoding=encoding,
        )

    def to_html(
        self,
        buf=None,
        columns: Sequence[str] | None = None,
        col_space=None,
        header: bool = True,
        index: bool = True,
        na_rep: str = "NaN",
        formatters=None,
        float_format=None,
        sparsify: bool | None = None,
        index_names: bool = True,
        justify: str | None = None,
        max_rows: int | None = None,
        max_cols: int | None = None,
        show_dimensions: bool = False,
        decimal: str = ".",
        bold_rows: bool = True,
        classes: str | list | tuple | None = None,
        escape: bool = True,
        notebook: bool = False,
        border: int | None = None,
        table_id: str | None = None,
        render_links: bool = False,
        encoding: str | None = None,
        *,
        allow_large_results: bool | None = None,
    ) -> str:
        return self.to_pandas(allow_large_results=allow_large_results).to_html(
            buf,
            columns=columns,  # type: ignore
            col_space=col_space,
            header=header,
            index=index,
            na_rep=na_rep,
            formatters=formatters,
            float_format=float_format,
            sparsify=sparsify,
            index_names=index_names,
            justify=justify,  # type: ignore
            max_rows=max_rows,
            max_cols=max_cols,
            show_dimensions=show_dimensions,
            decimal=decimal,
            bold_rows=bold_rows,
            classes=classes,
            escape=escape,
            notebook=notebook,
            border=border,
            table_id=table_id,
            render_links=render_links,
            encoding=encoding,
        )
    def to_markdown(
        self,
        buf=None,
        mode: str = "wt",
        index: bool = True,
        *,
        allow_large_results: Optional[bool] = None,
        **kwargs,
    ) -> str | None:
        return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode=mode, index=index, **kwargs)  # type: ignore

    def to_pickle(self, path, *, allow_large_results=None, **kwargs) -> None:
        return self.to_pandas(allow_large_results=allow_large_results).to_pickle(
            path, **kwargs
        )

    def to_orc(self, path=None, *, allow_large_results=None, **kwargs) -> bytes | None:
        as_pandas = self.to_pandas(allow_large_results=allow_large_results)
        # to_orc only works with a default index
        as_pandas_default_index = as_pandas.reset_index()
        return as_pandas_default_index.to_orc(path, **kwargs)
    def _apply_unary_op(self, operation: ops.UnaryOp) -> DataFrame:
        block = self._block.multi_apply_unary_op(operation)
        return DataFrame(block)

    def _map_clustering_columns(
        self,
        clustering_columns: Union[pandas.Index, Iterable[typing.Hashable]],
        index: bool,
    ) -> List[str]:
        """Maps the provided clustering columns to the existing columns in the
        DataFrame."""

        def map_columns_on_occurrence(columns):
            mapped_columns = []
            for col in clustering_columns:
                if col in columns:
                    count = columns.count(col)
                    mapped_columns.extend([col] * count)
            return mapped_columns

        if not clustering_columns:
            return []

        if len(list(clustering_columns)) != len(set(clustering_columns)):
            raise ValueError("Duplicates are not supported in clustering_columns")

        all_possible_columns = (
            (set(self.columns) | set(self.index.names)) if index else set(self.columns)
        )
        missing_columns = set(clustering_columns) - all_possible_columns
        if missing_columns:
            raise ValueError(
                f"Clustering columns not found in DataFrame: {missing_columns}"
            )

        clustering_columns_for_df = map_columns_on_occurrence(
            list(self._block.column_labels)
        )
        clustering_columns_for_index = (
            map_columns_on_occurrence(list(self.index.names)) if index else []
        )

        (
            clustering_columns_for_df,
            clustering_columns_for_index,
        ) = utils.get_standardized_ids(
            clustering_columns_for_df, clustering_columns_for_index
        )

        return clustering_columns_for_index + clustering_columns_for_df

    def _prepare_export(
        self, index: bool, ordering_id: Optional[str]
    ) -> Tuple[bigframes.core.ArrayValue, Dict[str, str]]:
        array_value = self._block.expr

        new_col_labels, new_idx_labels = utils.get_standardized_ids(
            self._block.column_labels, self._block.index.names
        )

        columns = list(self._block.value_columns)
        column_labels = new_col_labels
        # This code drops unnamed indexes to keep consistent with the behavior
        # of most pandas write APIs. The exception is `pandas.to_csv`, which
        # keeps unnamed indexes as `Unnamed: 0`.
        # TODO(chelsealin): check if this works for multiple indexes.
        if index and self.index.name is not None:
            columns.extend(self._block.index_columns)
            column_labels.extend(new_idx_labels)
        else:
            array_value = array_value.drop_columns(self._block.index_columns)

        # Make columns in SQL reflect _labels_ not _ids_. Note: This may use
        # the arbitrary unicode column labels feature in BigQuery, which is
        # currently (June 2023) in preview.
        id_overrides = {
            col_id: col_label
            for col_id, col_label in zip(columns, column_labels)
            if (col_id != col_label)
        }

        if ordering_id is not None:
            array_value, internal_ordering_id = array_value.promote_offsets()
            id_overrides[internal_ordering_id] = ordering_id
        return array_value, id_overrides
    def map(self, func, na_action: Optional[str] = None) -> DataFrame:
        if not isinstance(func, bigframes.functions.BigqueryCallableRoutine):
            raise TypeError("the first argument must be callable")

        if na_action not in {None, "ignore"}:
            raise ValueError(f"na_action={na_action} not supported")

        # TODO(shobs): Support **kwargs
        return self._apply_unary_op(
            ops.RemoteFunctionOp(
                function_def=func.udf_def, apply_on_null=(na_action is None)
            )
        )
    def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
        # With a BigFrames BigQuery function, the DataFrame '.apply' method is
        # specifically designed for row-wise or column-wise operations, where
        # the input to the applied function should be a Series, not a scalar.

        if utils.get_axis_number(axis) == 1:
            msg = bfe.format_message(
                "DataFrame.apply with parameter axis=1 scenario is in preview."
            )
            warnings.warn(msg, category=bfe.FunctionAxisOnePreviewWarning)

            if not isinstance(
                func,
                (
                    bigframes.functions.BigqueryCallableRoutine,
                    bigframes.functions.BigqueryCallableRowRoutine,
                ),
            ):
                raise ValueError(
                    "For axis=1 a BigFrames BigQuery function must be used."
                )

            if func.is_row_processor:
                # Early check whether the dataframe dtypes are currently supported
                # in the bigquery function.
                # NOTE: Keep in sync with the value converters used in the gcf code
                # generated in function_template.py
                bigquery_function_supported_dtypes = (
                    bigframes.dtypes.INT_DTYPE,
                    bigframes.dtypes.FLOAT_DTYPE,
                    bigframes.dtypes.BOOL_DTYPE,
                    bigframes.dtypes.BYTES_DTYPE,
                    bigframes.dtypes.STRING_DTYPE,
                )
                supported_dtypes_types = tuple(
                    type(dtype)
                    for dtype in bigquery_function_supported_dtypes
                    if not isinstance(dtype, pandas.ArrowDtype)
                )
                # Check ArrowDtype separately since multiple BigQuery types map to
                # ArrowDtype, including BYTES and TIMESTAMP.
                supported_arrow_types = tuple(
                    dtype.pyarrow_dtype
                    for dtype in bigquery_function_supported_dtypes
                    if isinstance(dtype, pandas.ArrowDtype)
                )
                supported_dtypes_hints = tuple(
                    str(dtype) for dtype in bigquery_function_supported_dtypes
                )

                for dtype in self.dtypes:
                    if (
                        # Not one of the pandas/numpy types.
                        not isinstance(dtype, supported_dtypes_types)
                        # And not one of the arrow types.
                        and not (
                            isinstance(dtype, pandas.ArrowDtype)
                            and any(
                                dtype.pyarrow_dtype.equals(arrow_type)
                                for arrow_type in supported_arrow_types
                            )
                        )
                    ):
                        raise NotImplementedError(
                            f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1."
                            f" Supported dtypes are {supported_dtypes_hints}."
                        )

                # Serialize the rows as json values
                block = self._get_block()
                rows_as_json_series = bigframes.series.Series(
                    block._get_rows_as_json_values()
                )

                # Apply the function
                if args:
                    result_series = rows_as_json_series._apply_nary_op(
                        ops.NaryRemoteFunctionOp(function_def=func.udf_def),
                        list(args),
                    )
                else:
                    result_series = rows_as_json_series._apply_unary_op(
                        ops.RemoteFunctionOp(
                            function_def=func.udf_def, apply_on_null=True
                        )
                    )
            else:
                # This is a special case where we are providing a
                # not-pandas-like extension. If the bigquery function can take
                # one or more params (excluding the args) then we assume that
                # here the user intention is to use the column values of the
                # dataframe as arguments to the function. For this to work the
                # following conditions must be true:
                # 1. The number of input params (excluding the args) in the
                #    function must be the same as the number of columns in the
                #    dataframe.
                # 2. The dtypes of the columns in the dataframe must be
                #    compatible with the data types of the input params.
                # 3. The order of the columns in the dataframe must correspond
                #    to the order of the input params in the function.
                udf_input_dtypes = func.udf_def.signature.bf_input_types
                if not args and len(udf_input_dtypes) != len(self.columns):
                    raise ValueError(
                        f"Parameter count mismatch: BigFrames BigQuery function"
                        f" expected {len(udf_input_dtypes)} parameters but"
                        f" received {len(self.columns)} DataFrame columns."
                    )
                if args and len(udf_input_dtypes) != len(self.columns) + len(args):
                    raise ValueError(
                        f"Parameter count mismatch: BigFrames BigQuery function"
                        f" expected {len(udf_input_dtypes)} parameters but"
                        f" received {len(self.columns) + len(args)} values"
                        f" ({len(self.columns)} DataFrame columns and"
                        f" {len(args)} args)."
                    )
                end_slice = -len(args) if args else None
                if udf_input_dtypes[:end_slice] != tuple(self.dtypes.to_list()):
                    raise ValueError(
                        f"Data type mismatch for DataFrame columns:"
                        f" Expected {udf_input_dtypes[:end_slice]}"
                        f" Received {tuple(self.dtypes)}."
                    )
                if args:
                    bq_types = (
                        function_typing.sdk_type_from_python_type(type(arg))
                        for arg in args
                    )
                    args_dtype = tuple(
                        function_typing.sdk_type_to_bf_type(bq_type)
                        for bq_type in bq_types
                    )
                    if udf_input_dtypes[end_slice:] != args_dtype:
                        raise ValueError(
                            f"Data type mismatch for 'args' parameter:"
                            f" Expected {udf_input_dtypes[end_slice:]}"
                            f" Received {args_dtype}."
                        )

                series_list = [self[col] for col in self.columns]
                op_list = series_list[1:] + list(args)
                result_series = series_list[0]._apply_nary_op(
                    ops.NaryRemoteFunctionOp(function_def=func.udf_def), op_list
                )
            result_series.name = None

            result_series = func._post_process_series(result_series)
            return result_series

        # At this point a column-wise or element-wise bigquery function
        # operation would be performed (not supported).
        if hasattr(func, "bigframes_bigquery_function"):
            raise formatter.create_exception_with_feedback_link(
                NotImplementedError,
                "BigFrames DataFrame '.apply()' does not support BigFrames "
                "BigQuery function for column-wise (i.e. with axis=0) "
                "operations, please use a regular python function instead. For "
                "element-wise operations of the BigFrames BigQuery function, "
                "please use '.map()'.",
            )

        # Per-column apply
        results = {name: func(col, *args, **kwargs) for name, col in self.items()}

        if all(
            [
                isinstance(val, bigframes.series.Series) or utils.is_list_like(val)
                for val in results.values()
            ]
        ):
            return DataFrame(data=results)
        else:
            return pandas.Series(data=results)
    def drop_duplicates(
        self,
        subset: typing.Union[blocks.Label, typing.Sequence[blocks.Label]] = None,
        *,
        keep: str = "first",
    ) -> DataFrame:
        if keep is not False:
            validations.enforce_ordered(self, "drop_duplicates(keep != False)")
        if subset is None:
            column_ids = self._block.value_columns
        elif utils.is_list_like(subset):
            column_ids = [
                id for label in subset for id in self._block.label_to_col_id[label]
            ]
        else:
            # interpret as single label
            column_ids = self._block.label_to_col_id[typing.cast(blocks.Label, subset)]
        block = block_ops.drop_duplicates(self._block, column_ids, keep)
        return DataFrame(block)
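    # Illustrative usage sketch (not part of the module source). `keep=False`
    # drops every member of a duplicate group and is the only variant that
    # works on unordered frames, since "first"/"last" need a row order:
    #
    #     df.drop_duplicates(subset=["k"])              # keep first occurrence
    #     df.drop_duplicates(subset=["k"], keep=False)  # drop all duplicates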
    def duplicated(self, subset=None, keep: str = "first") -> bigframes.series.Series:
        if keep is not False:
            validations.enforce_ordered(self, "duplicated(keep != False)")
        if subset is None:
            column_ids = self._block.value_columns
        else:
            column_ids = [
                id for label in subset for id in self._block.label_to_col_id[label]
            ]
        block, indicator = block_ops.indicate_duplicates(self._block, column_ids, keep)
        return bigframes.series.Series(
            block.select_column(
                indicator,
            )
        )

    def rank(
        self,
        axis=0,
        method: str = "average",
        numeric_only=False,
        na_option: str = "keep",
        ascending=True,
        pct: bool = False,
    ) -> DataFrame:
        df = self._drop_non_numeric() if numeric_only else self
        return DataFrame(
            block_ops.rank(df._block, method, na_option, ascending, pct=pct)
        )
    def first_valid_index(self):
        return None
    applymap = map
    applymap.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.map)

    def _slice(
        self,
        start: typing.Optional[int] = None,
        stop: typing.Optional[int] = None,
        step: typing.Optional[int] = None,
    ) -> DataFrame:
        block = self._block.slice(
            start=start, stop=stop, step=step if (step is not None) else 1
        )
        return DataFrame(block)

    def __array_ufunc__(
        self, ufunc: numpy.ufunc, method: str, *inputs, **kwargs
    ) -> DataFrame:
        """Used to support numpy ufuncs.
        See: https://numpy.org/doc/stable/reference/ufuncs.html
        """
        if method != "__call__" or len(inputs) > 2 or len(kwargs) > 0:
            return NotImplemented

        if len(inputs) == 1 and ufunc in ops.NUMPY_TO_OP:
            return self._apply_unary_op(ops.NUMPY_TO_OP[ufunc])
        if len(inputs) == 2 and ufunc in ops.NUMPY_TO_BINOP:
            binop = ops.NUMPY_TO_BINOP[ufunc]
            if inputs[0] is self:
                return self._apply_binop(inputs[1], binop)
            else:
                return self._apply_binop(inputs[0], binop, reverse=True)

        return NotImplemented

    def _set_block(self, block: blocks.Block):
        self._block = block

    def _get_block(self) -> blocks.Block:
        return self._block
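    # Illustrative usage sketch (not part of the module source). The
    # __array_ufunc__ hook above lets registered numpy ufuncs dispatch to
    # BigQuery operators instead of pulling data client-side:
    #
    #     import numpy as np
    #     np.log(df)     # unary ufunc -> ops.NUMPY_TO_OP lookup
    #     np.add(df, 1)  # binary ufunc -> ops.NUMPY_TO_BINOP lookup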
    def cache(self):
        """
        Materializes the DataFrame to a temporary table.

        Useful if the dataframe will be used multiple times, as this will
        avoid recomputing the shared intermediate value.

        Returns:
            bigframes.pandas.DataFrame: DataFrame
        """
        return self._cached(force=True)
    def _cached(self, *, force: bool = False) -> DataFrame:
        """Materialize dataframe to a temporary table.

        No-op if the dataframe represents a trivial transformation of an
        existing materialization. Force=True is used for BQML integration,
        where we need to copy data rather than use a snapshot.
        """
        if self._disable_cache_override:
            return self
        self._block.cached(force=force)
        return self

    _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries")
    @validations.requires_ordering()
    def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries:
        if not isinstance(other, (DataFrame, bigframes.series.Series)):
            raise NotImplementedError(
                f"Only DataFrame or Series operand is supported. {constants.FEEDBACK_LINK}"
            )

        if len(self.index.names) > 1 or len(other.index.names) > 1:
            raise NotImplementedError(
                f"Multi-index input is not supported. {constants.FEEDBACK_LINK}"
            )

        if len(self.columns.names) > 1 or (
            isinstance(other, DataFrame) and len(other.columns.names) > 1
        ):
            raise NotImplementedError(
                f"Multi-level column input is not supported. {constants.FEEDBACK_LINK}"
            )

        # Convert the dataframes into cell-value-decomposed representation,
        # i.e. each cell value is present in a separate row.
        row_id = "row"
        col_id = "col"
        val_id = "val"
        left_suffix = "_left"
        right_suffix = "_right"
        cvd_columns = [row_id, col_id, val_id]

        def get_left_id(id):
            return f"{id}{left_suffix}"

        def get_right_id(id):
            return f"{id}{right_suffix}"

        other_frame = other if isinstance(other, DataFrame) else other.to_frame()

        left = self.stack().reset_index()
        left.columns = cvd_columns

        right = other_frame.stack().reset_index()
        right.columns = cvd_columns

        merged = left.merge(
            right,
            left_on=col_id,
            right_on=row_id,
            suffixes=(left_suffix, right_suffix),
        )

        left_row_id = get_left_id(row_id)
        right_col_id = get_right_id(col_id)

        aggregated = (
            merged.assign(
                val=merged[get_left_id(val_id)] * merged[get_right_id(val_id)]
            )[[left_row_id, right_col_id, val_id]]
            .groupby([left_row_id, right_col_id])
            .sum(numeric_only=True)
        )
        aggregated_noindex = aggregated.reset_index()
        aggregated_noindex.columns = cvd_columns
        result = aggregated_noindex._pivot(
            columns=col_id, columns_unique_values=other_frame.columns, index=row_id
        )

        # Set the index names to match the left-side matrix
        result.index.names = self.index.names

        # Pivot orders the result columns alphabetically, but they should
        # match the columns of the right-side matrix, so reorder them to
        # agree with it.
        if not result.columns.difference(other_frame.columns).empty:
            raise RuntimeError(
                f"Could not construct all columns. {constants.FEEDBACK_LINK}"
            )
        result = result[other_frame.columns]

        if isinstance(other, bigframes.series.Series):
            # There should be exactly one column in the result
            result = result[result.columns[0]].rename()

        return result
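    # Illustrative usage sketch (not part of the module source). `dot` is also
    # reachable through the `@` operator via __matmul__ below:
    #
    #     a = bpd.DataFrame([[1, 2], [3, 4]])
    #     b = bpd.DataFrame([[5, 6], [7, 8]])
    #     a.dot(b)  # matrix product
    #     a @ b     # equivalent spelling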
    @property
    def plot(self):
        return plotting.PlotAccessor(self)
    def hist(
        self, by: typing.Optional[typing.Sequence[str]] = None, bins: int = 10, **kwargs
    ):
        return self.plot.hist(by=by, bins=bins, **kwargs)

    hist.__doc__ = inspect.getdoc(plotting.PlotAccessor.hist)

    def line(
        self,
        x: typing.Optional[typing.Hashable] = None,
        y: typing.Optional[typing.Hashable] = None,
        **kwargs,
    ):
        return self.plot.line(x=x, y=y, **kwargs)

    line.__doc__ = inspect.getdoc(plotting.PlotAccessor.line)

    def area(
        self,
        x: typing.Optional[typing.Hashable] = None,
        y: typing.Optional[typing.Hashable] = None,
        stacked: bool = True,
        **kwargs,
    ):
        return self.plot.area(x=x, y=y, stacked=stacked, **kwargs)

    area.__doc__ = inspect.getdoc(plotting.PlotAccessor.area)

    def bar(
        self,
        x: typing.Optional[typing.Hashable] = None,
        y: typing.Optional[typing.Hashable] = None,
        **kwargs,
    ):
        return self.plot.bar(x=x, y=y, **kwargs)

    bar.__doc__ = inspect.getdoc(plotting.PlotAccessor.bar)

    def scatter(
        self,
        x: typing.Optional[typing.Hashable] = None,
        y: typing.Optional[typing.Hashable] = None,
        s: typing.Union[typing.Hashable, typing.Sequence[typing.Hashable]] = None,
        c: typing.Union[typing.Hashable, typing.Sequence[typing.Hashable]] = None,
        **kwargs,
    ):
        return self.plot.scatter(x=x, y=y, s=s, c=c, **kwargs)

    scatter.__doc__ = inspect.getdoc(plotting.PlotAccessor.scatter)

    def __matmul__(self, other) -> DataFrame:
        return self.dot(other)

    __matmul__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__matmul__)

    @property
    def struct(self):
        return bigframes.operations.structs.StructFrameAccessor(self)

    def _throw_if_null_index(self, opname: str):
        if not self._has_index:
            raise bigframes.exceptions.NullIndexError(
                f"DataFrame cannot perform {opname} as it has no index. Set an index using set_index."
            )

    @property
    def semantics(self):
        msg = bfe.format_message(
            "The 'semantics' property will be removed. Please use 'bigframes.bigquery.ai' instead."
        )
        warnings.warn(msg, category=FutureWarning)
        return bigframes.operations.semantics.Semantics(self)

    @property
    def ai(self):
        """Returns the accessor for AI operators."""
        msg = bfe.format_message(
            "The 'ai' property will be removed. Please use 'bigframes.bigquery.ai' instead."
        )
        warnings.warn(msg, category=FutureWarning)
        return bigframes.operations.ai.AIAccessor(self)