Source code for bigframes.series

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Series is a 1 dimensional data structure."""

from __future__ import annotations

import datetime
import functools
import inspect
import itertools
import numbers
import textwrap
import typing
from typing import (
    Any,
    Callable,
    cast,
    Iterable,
    List,
    Literal,
    Mapping,
    Optional,
    overload,
    Sequence,
    Tuple,
    Union,
)
import warnings

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.series as vendored_pandas_series
import google.cloud.bigquery as bigquery
import numpy
import pandas
from pandas.api import extensions as pd_ext
import pyarrow as pa
import typing_extensions

import bigframes.core
from bigframes.core import agg_expressions, groupby
import bigframes.core.block_transforms as block_ops
import bigframes.core.blocks as blocks
import bigframes.core.expression as ex
import bigframes.core.identifiers as ids
import bigframes.core.indexers
import bigframes.core.indexes as indexes
from bigframes.core.logging import log_adapter
import bigframes.core.ordering as order
import bigframes.core.scalar as scalars
import bigframes.core.utils as utils
import bigframes.core.validations as validations
import bigframes.core.window
from bigframes.core.window import rolling
import bigframes.core.window_spec as windows
import bigframes.dataframe
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.formatting_helpers as formatter
import bigframes.functions
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.operations.blob as blob
import bigframes.operations.datetimes as dt
import bigframes.operations.lists as lists
import bigframes.operations.plotting as plotting
import bigframes.operations.python_op_maps as python_ops
import bigframes.operations.structs as structs
import bigframes.session

if typing.TYPE_CHECKING:
    import bigframes.geopandas.geoseries
    import bigframes.operations.strings as strings


LevelType = typing.Union[str, int]
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]


_bigquery_function_recommendation_message = (
    "Your function could not be applied directly to the Series."
    " Try converting it to a BigFrames BigQuery function."
)

_list = list  # Type alias to escape Series.list property


@log_adapter.class_logger
class Series(vendored_pandas_series.Series):
    # Must be above 5000 for pandas to delegate to bigframes for binops
    __pandas_priority__ = 13000

    # Ensure mypy can more robustly determine the type of self._block since it
    # gets set in various places.
    _block: blocks.Block
[docs] def __init__( self, data=None, index=None, dtype: Optional[bigframes.dtypes.DtypeString | bigframes.dtypes.Dtype] = None, name: str | None = None, copy: Optional[bool] = None, *, session: Optional[bigframes.session.Session] = None, ): self._query_job: Optional[bigquery.QueryJob] = None import bigframes.pandas # Ignore object dtype if provided, as it provides no additional # information about what BigQuery type to use. if dtype is not None and bigframes.dtypes.is_object_like(dtype): dtype = None read_pandas_func = ( session.read_pandas if (session is not None) else (lambda x: bigframes.pandas.read_pandas(x)) ) block: typing.Optional[blocks.Block] = None if (name is not None) and not isinstance(name, typing.Hashable): raise ValueError( f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}" ) if copy is not None and not copy: raise ValueError( f"Series constructor only supports copy=True. {constants.FEEDBACK_LINK}" ) if isinstance(data, blocks.Block): block = data elif isinstance(data, bigframes.pandas.Series): block = data._get_block() # special case where data is local scalar, but index is bigframes index (maybe very big) elif ( not utils.is_list_like(data) and not isinstance(data, indexes.Index) ) and isinstance(index, indexes.Index): block = index._block block, _ = block.create_constant(data) block = block.with_column_labels([None]) # prevents no-op reindex later index = None elif isinstance(data, indexes.Index) or isinstance(index, indexes.Index): data = indexes.Index(data, dtype=dtype, name=name, session=session) # set to none as it has already been applied, avoid re-cast later if data.nlevels != 1: raise NotImplementedError("Cannot interpret multi-index as Series.") # Reset index to promote index columns to value columns, set default index data_block = data._block.reset_index(drop=False).with_column_labels( data.names ) if index is not None: # Align data and index by offset bf_index = indexes.Index(index, session=session) idx_block = bf_index._block.reset_index( drop=False ) # reset to align by offsets, and then reset back idx_cols = idx_block.value_columns data_block, (l_mapping, _) = idx_block.join(data_block, how="left") data_block = data_block.set_index([l_mapping[col] for col in idx_cols]) data_block = data_block.with_index_labels(bf_index.names) # prevents no-op reindex later index = None block = data_block if block: assert len(block.value_columns) == 1 assert len(block.column_labels) == 1 if index is not None: # reindexing operation bf_index = indexes.Index(index) idx_block = bf_index._block idx_cols = idx_block.index_columns block, _ = idx_block.join(block, how="left") block = block.with_index_labels(bf_index.names) if name: block = block.with_column_labels([name]) if dtype: bf_dtype = bigframes.dtypes.bigframes_type(dtype) block = block.multi_apply_unary_op(ops.AsTypeOp(to_type=bf_dtype)) else: if isinstance(dtype, str) and dtype.lower() == "json": dtype = bigframes.dtypes.JSON_DTYPE pd_series = pandas.Series( data=data, index=index, # type:ignore dtype=dtype, # type:ignore name=name, ) block = read_pandas_func(pd_series)._get_block() # type:ignore assert block is not None self._block: blocks.Block = block self._block.session._register_object(self)
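    # Illustrative usage sketch (not part of the original source): how the constructor
    # above behaves, assuming `bigframes.pandas` is imported as `bpd` (as in the
    # docstring examples elsewhere in this module) and a session is already configured.
    #
    #     >>> import bigframes.pandas as bpd
    #     >>> s = bpd.Series([1, 2, 3], name="n", dtype="Int64")  # local data goes through read_pandas
    #     >>> s.name
    #     'n'
    #     >>> bpd.Series([1, 2, 3], copy=False)  # only copy=True semantics are supported
    #     Traceback (most recent call last):
    #         ...
    #     ValueError: Series constructor only supports copy=True. ...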
@property def dt(self) -> dt.DatetimeMethods: return dt.DatetimeMethods(self) @property def dtype(self): bigframes.dtypes.warn_on_db_dtypes_json_dtype([self._dtype]) return self._dtype @property def dtypes(self): bigframes.dtypes.warn_on_db_dtypes_json_dtype([self._dtype]) return self._dtype @property def geo(self) -> bigframes.geopandas.geoseries.GeoSeries: """ Accessor object for geography properties of the Series values. Returns: bigframes.geopandas.geoseries.GeoSeries: An accessor containing geography methods. """ import bigframes.geopandas.geoseries return bigframes.geopandas.geoseries.GeoSeries(self) @property @validations.requires_index def loc(self) -> bigframes.core.indexers.LocSeriesIndexer: return bigframes.core.indexers.LocSeriesIndexer(self) @property @validations.requires_ordering() def iloc(self) -> bigframes.core.indexers.IlocSeriesIndexer: return bigframes.core.indexers.IlocSeriesIndexer(self) @property @validations.requires_ordering() def iat(self) -> bigframes.core.indexers.IatSeriesIndexer: return bigframes.core.indexers.IatSeriesIndexer(self) @property @validations.requires_index def at(self) -> bigframes.core.indexers.AtSeriesIndexer: return bigframes.core.indexers.AtSeriesIndexer(self) @property def name(self) -> blocks.Label: return self._name @name.setter def name(self, label: blocks.Label): new_block = self._block.with_column_labels([label]) self._set_block(new_block) @property def shape(self) -> typing.Tuple[int]: return (self._block.shape[0],) @property def size(self) -> int: return self.shape[0] @property def ndim(self) -> int: return 1 @property def empty(self) -> bool: return self.shape[0] == 0 @property def hasnans(self) -> bool: # Note, hasnans is actually a null check, and NaNs don't count for nullable float return self.isnull().any() @property def values(self) -> numpy.ndarray: return self.to_numpy() @property @validations.requires_index def index(self) -> indexes.Index: return indexes.Index.from_frame(self)
[docs] @validations.requires_index def keys(self) -> indexes.Index: return self.index
@property def query_job(self) -> Optional[bigquery.QueryJob]: """BigQuery job metadata for the most recent query. Returns: The most recent `QueryJob <https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob>`_. """ if self._query_job is None: self._set_internal_query_job(self._compute_dry_run()) return self._query_job @property def struct(self) -> structs.StructAccessor: return structs.StructAccessor(self) @property def list(self) -> lists.ListAccessor: return lists.ListAccessor(self) @property def blob(self) -> blob.BlobAccessor: return blob.BlobAccessor(self) @property @validations.requires_ordering() def T(self) -> Series: return self.transpose() @property def _info_axis(self) -> indexes.Index: return self.index @property def _session(self) -> bigframes.Session: return self._get_block().expr.session @property def _struct_fields(self) -> List[str]: if not bigframes.dtypes.is_struct_like(self._dtype): return [] struct_type = typing.cast(pa.StructType, self._dtype.pyarrow_dtype) return [struct_type.field(i).name for i in range(struct_type.num_fields)]
[docs] @validations.requires_ordering() def transpose(self) -> Series: return self
def _set_internal_query_job(self, query_job: Optional[bigquery.QueryJob]): self._query_job = query_job def __len__(self): return self.shape[0] __len__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__len__) def __iter__(self) -> typing.Iterator: return itertools.chain.from_iterable( map(lambda x: x.squeeze(axis=1), self._block.to_pandas_batches()) ) def __contains__(self, key) -> bool: return key in self.index
[docs] def copy(self) -> Series: return Series(self._block)
@overload def rename( self, index: Union[blocks.Label, Mapping[Any, Any]] = None, ) -> Series: ... @overload def rename( self, index: Union[blocks.Label, Mapping[Any, Any]] = None, *, inplace: Literal[False], **kwargs, ) -> Series: ... @overload def rename( self, index: Union[blocks.Label, Mapping[Any, Any]] = None, *, inplace: Literal[True], **kwargs, ) -> None: ...
[docs] def rename( self, index: Union[blocks.Label, Mapping[Any, Any]] = None, *, inplace: bool = False, **kwargs, ) -> Optional[Series]: if len(kwargs) != 0: raise NotImplementedError( f"rename does not currently support any keyword arguments. {constants.FEEDBACK_LINK}" ) # rename the index if isinstance(index, Mapping): index = typing.cast(Mapping[Any, Any], index) block = self._block for k, v in index.items(): new_idx_ids = [] for idx_id, idx_dtype in zip(block.index_columns, block.index.dtypes): # Will throw if key type isn't compatible with index type, which leads to invalid SQL. block.create_constant(k, dtype=idx_dtype) # Will throw if value type isn't compatible with index type. block, const_id = block.create_constant(v, dtype=idx_dtype) block, cond_id = block.project_expr( ops.ne_op.as_expr(idx_id, ex.const(k)) ) block, new_idx_id = block.apply_ternary_op( idx_id, cond_id, const_id, ops.where_op ) new_idx_ids.append(new_idx_id) block = block.drop_columns([const_id, cond_id]) block = block.set_index(new_idx_ids, index_labels=block.index.names) if inplace: self._block = block return None else: return Series(block) # rename the Series name if isinstance(index, typing.Hashable): # Python 3.9 doesn't allow isinstance of Optional index = typing.cast(Optional[str], index) block = self._block.with_column_labels([index]) if inplace: self._block = block return None else: return Series(block) raise ValueError(f"Unsupported type of parameter index: {type(index)}")
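    # Illustrative sketch of the two `rename` branches above (hashable label vs. a
    # mapping over index values); assumes `s` is a small integer-indexed Series.
    #
    #     >>> s.rename("m").name           # a hashable label renames the Series itself
    #     'm'
    #     >>> s.rename({0: 100})           # a mapping rewrites matching index values
    #     >>> s.rename("m", inplace=True)  # inplace=True mutates the Series and returns None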
@overload def rename_axis( self, mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]], ) -> Series: ... @overload def rename_axis( self, mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]], *, inplace: Literal[False], **kwargs, ) -> Series: ... @overload def rename_axis( self, mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]], *, inplace: Literal[True], **kwargs, ) -> None: ...
[docs] @validations.requires_index def rename_axis( self, mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]], *, inplace: bool = False, **kwargs, ) -> Optional[Series]: if len(kwargs) != 0: raise NotImplementedError( f"rename_axis does not currently support any keyword arguments. {constants.FEEDBACK_LINK}" ) # limited implementation: the new index name is simply the 'mapper' parameter if _is_list_like(mapper): labels = mapper else: labels = [mapper] block = self._block.with_index_labels(labels) if inplace: self._block = block return None else: return Series(block)
[docs] def equals( self, other: typing.Union[Series, bigframes.dataframe.DataFrame] ) -> bool: # Must be same object type, same column dtypes, and same label values if not isinstance(other, Series): return False return block_ops.equals(self._block, other._block)
@overload # type: ignore[override] def reset_index( self, level: blocks.LevelsType = ..., *, name: typing.Optional[str] = ..., drop: Literal[False] = ..., inplace: Literal[False] = ..., allow_duplicates: Optional[bool] = ..., ) -> bigframes.dataframe.DataFrame: ... @overload def reset_index( self, level: blocks.LevelsType = ..., *, name: typing.Optional[str] = ..., drop: Literal[True] = ..., inplace: Literal[False] = ..., allow_duplicates: Optional[bool] = ..., ) -> Series: ... @overload def reset_index( self, level: blocks.LevelsType = ..., *, name: typing.Optional[str] = ..., drop: bool = ..., inplace: Literal[True] = ..., allow_duplicates: Optional[bool] = ..., ) -> None: ...
[docs] @validations.requires_ordering() def reset_index( self, level: blocks.LevelsType = None, *, name: typing.Optional[str] = None, drop: bool = False, inplace: bool = False, allow_duplicates: Optional[bool] = None, ) -> bigframes.dataframe.DataFrame | Series | None: if allow_duplicates is None: allow_duplicates = False block = self._block.reset_index(level, drop, allow_duplicates=allow_duplicates) if drop: if inplace: self._set_block(block) return None return Series(block) else: if inplace: raise ValueError( "Series.reset_index cannot combine inplace=True and drop=False" ) if name: block = block.assign_label(self._value_column, name) return bigframes.dataframe.DataFrame(block)
def _repr_mimebundle_(self, include=None, exclude=None): """ Custom display method for IPython/Jupyter environments. This is called by IPython's display system when the object is displayed. """ # TODO(b/467647693): Anywidget integration has been tested in Jupyter, VS Code, and # BQ Studio, but there is a known compatibility issue with Marimo that needs to be addressed. from bigframes.display import html return html.repr_mimebundle(self, include=include, exclude=exclude) def __repr__(self) -> str: # Protect against errors with uninitialized Series. See: # https://github.com/googleapis/python-bigquery-dataframes/issues/728 if not hasattr(self, "_block"): return object.__repr__(self) # TODO(swast): Add a timeout here? If the query is taking a long time, # maybe we just print the job metadata that we have so far? # TODO(swast): Avoid downloading the whole series by using job # metadata, like we do with DataFrame. opts = bigframes.options.display if opts.repr_mode == "deferred": return formatter.repr_query_job(self._compute_dry_run()) self._cached() pandas_df, row_count, query_job = self._block.retrieve_repr_request_results( opts.max_rows ) self._set_internal_query_job(query_job) from bigframes.display import plaintext return plaintext.create_text_representation( pandas_df, row_count, is_series=True, has_index=len(self._block.index_columns) > 0, )
[docs] def astype( self, dtype: Union[bigframes.dtypes.DtypeString, bigframes.dtypes.Dtype], *, errors: Literal["raise", "null"] = "raise", ) -> Series: if errors not in ["raise", "null"]: raise ValueError("Argument 'errors' must be one of 'raise' or 'null'") dtype = bigframes.dtypes.bigframes_type(dtype) return self._apply_unary_op( bigframes.operations.AsTypeOp(to_type=dtype, safe=(errors == "null")) )
[docs] def to_pandas( self, max_download_size: Optional[int] = None, sampling_method: Optional[str] = None, random_state: Optional[int] = None, *, ordered: bool = True, dry_run: bool = False, allow_large_results: Optional[bool] = None, ) -> pandas.Series: """Writes Series to pandas Series. **Examples:** >>> s = bpd.Series([4, 3, 2]) Download the data from BigQuery and convert it into an in-memory pandas Series. >>> s.to_pandas() 0 4 1 3 2 2 dtype: Int64 Estimate job statistics without processing or downloading data by using `dry_run=True`. >>> s.to_pandas(dry_run=True) # doctest: +SKIP columnCount 1 columnDtypes {None: Int64} indexLevel 1 indexDtypes [Int64] projectId bigframes-dev location US jobType QUERY destinationTable {'projectId': 'bigframes-dev', 'datasetId': '_... useLegacySql False referencedTables None totalBytesProcessed 0 cacheHit False statementType SELECT creationTime 2025-04-03 18:54:59.219000+00:00 dtype: object Args: max_download_size (int, default None): .. deprecated:: 2.0.0 ``max_download_size`` parameter is deprecated. Please use ``to_pandas_batches()`` method instead. Download size threshold in MB. If ``max_download_size`` is exceeded when downloading data, the data will be downsampled if ``bigframes.options.sampling.enable_downsampling`` is ``True``, otherwise, an error will be raised. If set to a value other than ``None``, this will supersede the global config. sampling_method (str, default None): .. deprecated:: 2.0.0 ``sampling_method`` parameter is deprecated. Please use ``sample()`` method instead. Downsampling algorithms to be chosen from, the choices are: "head": This algorithm returns a portion of the data from the beginning. It is fast and requires minimal computations to perform the downsampling; "uniform": This algorithm returns uniform random samples of the data. If set to a value other than None, this will supersede the global config. random_state (int, default None): .. deprecated:: 2.0.0 ``random_state`` parameter is deprecated. Please use ``sample()`` method instead. The seed for the uniform downsampling algorithm. If provided, the uniform method may take longer to execute and require more computation. If set to a value other than None, this will supersede the global config. ordered (bool, default True): Determines whether the resulting pandas series will be ordered. In some cases, unordered may result in a faster-executing query. dry_run (bool, default False): If this argument is true, this method will not process the data. Instead, it returns a Pandas Series containing dry run job statistics allow_large_results (bool, default None): If not None, overrides the global setting to allow or disallow large query results over the default size limit of 10 GB. Returns: pandas.Series: A pandas Series with all rows of this Series if the data_sampling_threshold_mb is not exceeded; otherwise, a pandas Series with downsampled rows of the DataFrame. If dry_run is set to True, a pandas Series containing dry run statistics will be returned. """ if max_download_size is not None: msg = bfe.format_message( "DEPRECATED: The `max_download_size` parameters for `Series.to_pandas()` " "are deprecated and will be removed soon. Please use `Series.to_pandas_batches()`." ) warnings.warn(msg, category=FutureWarning) if sampling_method is not None or random_state is not None: msg = bfe.format_message( "DEPRECATED: The `sampling_method` and `random_state` parameters for " "`Series.to_pandas()` are deprecated and will be removed soon. 
" "Please use `Series.sample().to_pandas()` instead for sampling." ) warnings.warn(msg, category=FutureWarning) if dry_run: dry_run_stats, dry_run_job = self._block._compute_dry_run( max_download_size=max_download_size, sampling_method=sampling_method, random_state=random_state, ordered=ordered, ) self._set_internal_query_job(dry_run_job) return dry_run_stats # Repeat the to_pandas() call to make mypy deduce type correctly, because mypy cannot resolve # Literal[True/False] to bool df, query_job = self._block.to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, random_state=random_state, ordered=ordered, allow_large_results=allow_large_results, ) if query_job: self._set_internal_query_job(query_job) series = df.squeeze(axis=1) series.name = self._name return series
[docs] def to_pandas_batches( self, page_size: Optional[int] = None, max_results: Optional[int] = None, *, allow_large_results: Optional[bool] = None, ) -> Iterable[pandas.Series]: """Stream Series results to an iterable of pandas Series. page_size and max_results determine the size and number of batches, see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result **Examples:** >>> s = bpd.Series([4, 3, 2, 2, 3]) Iterate through the results in batches, limiting the total rows yielded across all batches via `max_results`: >>> for s_batch in s.to_pandas_batches(max_results=3): ... print(s_batch) 0 4 1 3 2 2 dtype: Int64 Alternatively, control the approximate size of each batch using `page_size` and fetch batches manually using `next()`: >>> it = s.to_pandas_batches(page_size=2) >>> next(it) 0 4 1 3 dtype: Int64 >>> next(it) 2 2 3 2 dtype: Int64 Args: page_size (int, default None): The maximum number of rows of each batch. Non-positive values are ignored. max_results (int, default None): The maximum total number of rows of all batches. allow_large_results (bool, default None): If not None, overrides the global setting to allow or disallow large query results over the default size limit of 10 GB. Returns: Iterable[pandas.Series]: An iterable of smaller Series which combine to form the original Series. Results stream from bigquery, see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable """ batches = self._block.to_pandas_batches( page_size=page_size, max_results=max_results, allow_large_results=allow_large_results, ) return map(lambda df: cast(pandas.Series, df.squeeze(1)), batches)
def _compute_dry_run(self) -> bigquery.QueryJob: _, query_job = self._block._compute_dry_run((self._value_column,)) return query_job
[docs] def drop( self, labels: typing.Any = None, *, axis: typing.Union[int, str] = 0, index: typing.Any = None, columns: Union[blocks.Label, typing.Iterable[blocks.Label]] = None, level: typing.Optional[LevelType] = None, ) -> Series: if (labels is None) == (index is None): raise ValueError("Must specify exactly one of 'labels' or 'index'") if labels is not None: index = labels # ignore axis, columns params block = self._block level_id = self._resolve_levels(level or 0)[0] if _is_list_like(index): block, inverse_condition_id = block.apply_unary_op( level_id, ops.IsInOp(values=tuple(index), match_nulls=True) ) block, condition_id = block.apply_unary_op( inverse_condition_id, ops.invert_op ) else: block, condition_id = block.project_expr( ops.ne_op.as_expr(level_id, ex.const(index)) ) block = block.filter_by_id(condition_id, keep_null=True) block = block.drop_columns([condition_id]) return Series(block.select_column(self._value_column))
[docs] @validations.requires_index def droplevel(self, level: LevelsType, axis: int | str = 0): resolved_level_ids = self._resolve_levels(level) return Series(self._block.drop_levels(resolved_level_ids))
[docs] @validations.requires_index def swaplevel(self, i: int = -2, j: int = -1): level_i = self._block.index_columns[i] level_j = self._block.index_columns[j] mapping = {level_i: level_j, level_j: level_i} reordering = [ mapping.get(index_id, index_id) for index_id in self._block.index_columns ] return Series(self._block.reorder_levels(reordering))
[docs] @validations.requires_index def reorder_levels(self, order: LevelsType, axis: int | str = 0): resolved_level_ids = self._resolve_levels(order) return Series(self._block.reorder_levels(resolved_level_ids))
def _resolve_levels(self, level: LevelsType) -> typing.Sequence[str]: return self._block.index.resolve_level(level)
[docs] def between(self, left, right, inclusive="both"): if inclusive not in ["both", "neither", "left", "right"]: raise ValueError( "Must set 'inclusive' to one of 'both', 'neither', 'left', or 'right'" ) left_op = ops.ge_op if (inclusive in ["left", "both"]) else ops.gt_op right_op = ops.le_op if (inclusive in ["right", "both"]) else ops.lt_op return self._apply_binary_op(left, left_op).__and__( self._apply_binary_op(right, right_op) )
[docs] def case_when(self, caselist) -> Series: cases = [] for condition, output in itertools.chain(caselist, [(True, self)]): cases.append(condition) cases.append(output) # In pandas, the default value if no case matches is the original value. # This makes it impossible to change the type of the column, but if # the condition is always True, we know it will match and no subsequent # conditions matter (including the fallback to `self`). This break allows # the type to change (see: internal issue 349926559). if condition is True: break return self._apply_nary_op( ops.case_when_op, cases, # Self is already included in "others". ignore_self=True, ).rename(self.name)
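    # Illustrative sketch of `case_when` as implemented above: `caselist` is an
    # iterable of (condition, output) pairs, and unmatched rows fall back to the
    # original values via the implicit `(True, self)` case appended above.
    #
    #     >>> s = bpd.Series([10, -5, 3])
    #     >>> s.case_when([(s < 0, 0)])              # clamp negatives to 0, keep the rest
    #     >>> s.case_when([(s < 0, 0), (s > 5, 5)])  # first matching condition wins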
[docs] @validations.requires_ordering() def cumsum(self) -> Series: return self._apply_window_op(agg_ops.sum_op, windows.cumulative_rows())
[docs] @validations.requires_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> Series: window = windows.rows(start=None if limit is None else -limit, end=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window)
pad = ffill pad.__doc__ = inspect.getdoc(vendored_pandas_series.Series.ffill)
[docs] @validations.requires_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> Series: window = windows.rows(start=0, end=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window)
[docs] @validations.requires_ordering() def cummax(self) -> Series: return self._apply_window_op(agg_ops.max_op, windows.cumulative_rows())
[docs] @validations.requires_ordering() def cummin(self) -> Series: return self._apply_window_op(agg_ops.min_op, windows.cumulative_rows())
[docs] @validations.requires_ordering() def cumprod(self) -> Series: return self._apply_window_op(agg_ops.product_op, windows.cumulative_rows())
[docs] @validations.requires_ordering() def shift(self, periods: int = 1) -> Series: window_spec = windows.rows() return self._apply_window_op(agg_ops.ShiftOp(periods), window_spec)
[docs] @validations.requires_ordering() def diff(self, periods: int = 1) -> Series: window_spec = windows.rows() return self._apply_window_op(agg_ops.DiffOp(periods), window_spec)
    @validations.requires_ordering()
    def pct_change(self, periods: int = 1) -> Series:
        # Future versions of pandas will not perform ffill automatically
        series = self.ffill()
        return Series(block_ops.pct_change(series._block, periods=periods))
[docs] @validations.requires_ordering() def rank( self, axis=0, method: str = "average", numeric_only=False, na_option: str = "keep", ascending: bool = True, pct: bool = False, ) -> Series: return Series( block_ops.rank(self._block, method, na_option, ascending, pct=pct) )
[docs] def fillna(self, value=None) -> Series: return self._apply_binary_op(value, ops.fillna_op)
[docs] def replace( self, to_replace: typing.Any, value: typing.Any = None, *, regex: bool = False ): if regex: # No-op unless to_replace and series dtype are both string type if not isinstance(to_replace, str) or not isinstance( self.dtype, pandas.StringDtype ): return self return self._regex_replace(to_replace, value) elif utils.is_dict_like(to_replace): return self._mapping_replace(to_replace) # type: ignore elif utils.is_list_like(to_replace): replace_list = to_replace else: # Scalar replace_list = [to_replace] replace_list = [ i for i in replace_list if bigframes.dtypes.is_compatible(i, self.dtype) ] return self._simple_replace(replace_list, value) if replace_list else self
def _regex_replace(self, to_replace: str, value: str): if not bigframes.dtypes.is_dtype(value, self.dtype): raise NotImplementedError( f"Cannot replace {self.dtype} elements with incompatible item {value} as mixed-type columns not supported. {constants.FEEDBACK_LINK}" ) block, result_col = self._block.apply_unary_op( self._value_column, ops.RegexReplaceStrOp(to_replace, value), result_label=self.name, ) return Series(block.select_column(result_col)) def _simple_replace(self, to_replace_list: typing.Sequence, value): result_type = bigframes.dtypes.is_compatible(value, self.dtype) if not result_type: raise NotImplementedError( f"Cannot replace {self.dtype} elements with incompatible item {value} as mixed-type columns not supported. {constants.FEEDBACK_LINK}" ) if result_type != self.dtype: return self.astype(result_type)._simple_replace(to_replace_list, value) block, cond = self._block.apply_unary_op( self._value_column, ops.IsInOp(tuple(to_replace_list)) ) block, result_col = block.project_expr( ops.where_op.as_expr(ex.const(value), cond, self._value_column), self.name ) return Series(block.select_column(result_col)) def _mapping_replace(self, mapping: dict[typing.Hashable, typing.Hashable]): if not mapping: return self.copy() tuples = [] lcd_types: list[typing.Optional[bigframes.dtypes.Dtype]] = [] for key, value in mapping.items(): lcd_type = bigframes.dtypes.is_compatible(key, self.dtype) if not lcd_type: continue if not bigframes.dtypes.is_dtype(value, self.dtype): raise NotImplementedError( f"Cannot replace {self.dtype} elements with incompatible item {value} as mixed-type columns not supported. {constants.FEEDBACK_LINK}" ) tuples.append((key, value)) lcd_types.append(lcd_type) result_dtype = functools.reduce( lambda t1, t2: bigframes.dtypes.lcd_type(t1, t2) if (t1 and t2) else None, lcd_types, self.dtype, ) if not result_dtype: raise NotImplementedError( f"Cannot replace {self.dtype} elements with incompatible mapping {mapping} as mixed-type columns not supported. {constants.FEEDBACK_LINK}" ) block, result = self._block.apply_unary_op( self._value_column, ops.MapOp(tuple(tuples)) ) replaced = Series(block.select_column(result)) replaced.name = self.name return replaced
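    # Illustrative sketch of the `replace` dispatch above (scalar, list-like, mapping,
    # and regex forms); assumes a string-dtype Series `s`.
    #
    #     >>> s = bpd.Series(["aa", "ab", "zz"])
    #     >>> s.replace("zz", "yy")              # scalar -> _simple_replace
    #     >>> s.replace(["aa", "ab"], "xx")      # list-like -> _simple_replace
    #     >>> s.replace({"aa": "a", "zz": "z"})  # mapping -> _mapping_replace
    #     >>> s.replace("^a", "A", regex=True)   # regex -> _regex_replace (string dtype only)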
[docs] @validations.requires_ordering() @validations.requires_index def interpolate(self, method: str = "linear") -> Series: if method == "pad": return self.ffill() result = block_ops.interpolate(self._block, method) return Series(result)
[docs] def dropna( self, *, axis: int = 0, inplace: bool = False, how: typing.Optional[str] = None, ignore_index: bool = False, ) -> Series: if inplace: raise NotImplementedError("'inplace'=True not supported") result = block_ops.dropna(self._block, [self._value_column], how="any") if ignore_index: result = result.reset_index() return Series(result)
[docs] @validations.requires_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW) def head(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[0:n])
[docs] @validations.requires_ordering() def tail(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[-n:])
    def peek(
        self, n: int = 5, *, force: bool = True, allow_large_results=None
    ) -> pandas.Series:
        """
        Preview n arbitrary elements from the series without guarantees about row
        selection or ordering.

        ``Series.peek(force=False)`` will always be very fast, but will not succeed
        if data requires full data scanning. Using ``force=True`` will always
        succeed, but may require running queries. Query results will be cached so
        that future steps will benefit from these queries.

        Args:
            n (int, default 5):
                The number of rows to select from the series. Which N rows are
                returned is non-deterministic.
            force (bool, default True):
                If the data cannot be peeked efficiently, the series will instead be
                fully materialized as part of the operation if ``force=True``. If
                ``force=False``, the operation will throw a ValueError.
            allow_large_results (bool, default None):
                If not None, overrides the global setting to allow or disallow large
                query results over the default size limit of 10 GB.
        Returns:
            pandas.Series: A pandas Series with n rows.

        Raises:
            ValueError: If force=False and data cannot be efficiently peeked.
        """
        maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results)
        if maybe_result is None:
            if force:
                self._cached()
                maybe_result = self._block.try_peek(
                    n, force=True, allow_large_results=allow_large_results
                )
                assert maybe_result is not None
            else:
                raise ValueError(
                    "Cannot peek efficiently when data has aggregates, joins or "
                    "window functions applied. Use force=True to fully compute the Series."
                )
        as_series = maybe_result.squeeze(axis=1)
        as_series.name = self.name
        return as_series
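    # Illustrative sketch of the `peek` semantics described in the docstring above.
    #
    #     >>> s.peek(3)               # up to 3 arbitrary rows, materializing if needed
    #     >>> s.peek(3, force=False)  # raises ValueError if a cheap peek is impossible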
[docs] def item(self): # Docstring is in third_party/bigframes_vendored/pandas/core/series.py return self.peek(2).item()
    def nlargest(self, n: int = 5, keep: str = "first") -> Series:
        if keep not in ("first", "last", "all"):
            raise ValueError("'keep' must be one of 'first', 'last', or 'all'")
        if keep != "all":
            validations.enforce_ordered(self, "nlargest(keep != 'all')")
        return Series(
            block_ops.nlargest(self._block, n, [self._value_column], keep=keep)
        )
    def nsmallest(self, n: int = 5, keep: str = "first") -> Series:
        if keep not in ("first", "last", "all"):
            raise ValueError("'keep' must be one of 'first', 'last', or 'all'")
        if keep != "all":
            validations.enforce_ordered(self, "nsmallest(keep != 'all')")
        return Series(
            block_ops.nsmallest(self._block, n, [self._value_column], keep=keep)
        )
[docs] def isin(self, values) -> "Series": if isinstance(values, Series): return Series(self._block.isin(values._block)) if isinstance(values, indexes.Index): return Series(self._block.isin(values.to_series()._block)) if not _is_list_like(values): raise TypeError( "only list-like objects are allowed to be passed to " f"isin(), you passed a [{type(values).__name__}]" ) return self._apply_unary_op( ops.IsInOp(values=tuple(values), match_nulls=True) ).fillna(value=False)
[docs] def isna(self) -> "Series": return self._apply_unary_op(ops.isnull_op)
isnull = isna isnull.__doc__ = inspect.getdoc(vendored_pandas_series.Series.isna)
[docs] def notna(self) -> "Series": return self._apply_unary_op(ops.notnull_op)
    notnull = notna
    notnull.__doc__ = inspect.getdoc(vendored_pandas_series.Series.notna)

    def __and__(self, other: bool | int | Series) -> Series:
        return self._apply_binary_op(other, ops.and_op)

    __and__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__and__)

    __rand__ = __and__

    def __or__(self, other: bool | int | Series) -> Series:
        return self._apply_binary_op(other, ops.or_op)

    __or__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__or__)

    __ror__ = __or__

    def __xor__(self, other: bool | int | Series) -> Series:
        return self._apply_binary_op(other, ops.xor_op)

    __xor__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__xor__)

    __rxor__ = __xor__

    def __add__(self, other: float | int | pandas.Timedelta | Series) -> Series:
        return self.add(other)

    __add__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__add__)

    def __radd__(self, other: float | int | pandas.Timedelta | Series) -> Series:
        return self.radd(other)

    __radd__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__radd__)
[docs] def add(self, other: float | int | pandas.Timedelta | Series) -> Series: return self._apply_binary_op(other, ops.add_op)
[docs] def radd(self, other: float | int | pandas.Timedelta | Series) -> Series: return self._apply_binary_op(other, ops.add_op, reverse=True)
def __sub__(self, other: float | int | Series) -> Series: return self.sub(other) __sub__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__sub__) def __rsub__(self, other: float | int | Series) -> Series: return self.rsub(other) __rsub__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rsub__)
[docs] def sub(self, other) -> Series: return self._apply_binary_op(other, ops.sub_op)
[docs] def rsub(self, other) -> Series: return self._apply_binary_op(other, ops.sub_op, reverse=True)
subtract = sub subtract.__doc__ = inspect.getdoc(vendored_pandas_series.Series.sub) def __mul__(self, other: float | int | Series) -> Series: return self.mul(other) __mul__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__mul__) def __rmul__(self, other: float | int | Series) -> Series: return self.rmul(other) __rmul__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rmul__)
[docs] def mul(self, other: float | int | Series) -> Series: return self._apply_binary_op(other, ops.mul_op)
[docs] def rmul(self, other: float | int | Series) -> Series: return self._apply_binary_op(other, ops.mul_op, reverse=True)
multiply = mul multiply.__doc__ = inspect.getdoc(vendored_pandas_series.Series.mul) def __truediv__(self, other: float | int | pandas.Timedelta | Series) -> Series: return self.truediv(other) __truediv__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__truediv__) def __rtruediv__(self, other: float | int | pandas.Timedelta | Series) -> Series: return self.rtruediv(other) __rtruediv__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rtruediv__)
[docs] def truediv(self, other: float | int | pandas.Timedelta | Series) -> Series: return self._apply_binary_op(other, ops.div_op)
[docs] def rtruediv(self, other: float | int | pandas.Timedelta | Series) -> Series: return self._apply_binary_op(other, ops.div_op, reverse=True)
truediv.__doc__ = inspect.getdoc(vendored_pandas_series.Series.truediv) div = divide = truediv rdiv = rtruediv rdiv.__doc__ = inspect.getdoc(vendored_pandas_series.Series.rtruediv) def __floordiv__(self, other: float | int | pandas.Timedelta | Series) -> Series: return self.floordiv(other) __floordiv__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__floordiv__) def __rfloordiv__(self, other: float | int | pandas.Timedelta | Series) -> Series: return self.rfloordiv(other) __rfloordiv__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rfloordiv__)
[docs] def floordiv(self, other: float | int | pandas.Timedelta | Series) -> Series: return self._apply_binary_op(other, ops.floordiv_op)
[docs] def rfloordiv(self, other: float | int | pandas.Timedelta | Series) -> Series: return self._apply_binary_op(other, ops.floordiv_op, reverse=True)
def __pow__(self, other: float | int | Series) -> Series: return self.pow(other) __pow__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__pow__) def __rpow__(self, other: float | int | Series) -> Series: return self.rpow(other) __rpow__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rpow__)
[docs] def pow(self, other: float | int | Series) -> Series: return self._apply_binary_op(other, ops.pow_op)
[docs] def rpow(self, other: float | int | Series) -> Series: return self._apply_binary_op(other, ops.pow_op, reverse=True)
def __lt__(self, other: float | int | str | Series) -> Series: return self.lt(other) def __le__(self, other: float | int | str | Series) -> Series: return self.le(other)
[docs] def lt(self, other) -> Series: return self._apply_binary_op(other, ops.lt_op)
[docs] def le(self, other) -> Series: return self._apply_binary_op(other, ops.le_op)
def __gt__(self, other: float | int | str | Series) -> Series: return self.gt(other) def __ge__(self, other: float | int | str | Series) -> Series: return self.ge(other)
[docs] def gt(self, other) -> Series: return self._apply_binary_op(other, ops.gt_op)
[docs] def ge(self, other) -> Series: return self._apply_binary_op(other, ops.ge_op)
def __mod__(self, other) -> Series: # type: ignore return self.mod(other) __mod__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__mod__) def __rmod__(self, other) -> Series: # type: ignore return self.rmod(other) __rmod__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rmod__)
[docs] def mod(self, other) -> Series: # type: ignore return self._apply_binary_op(other, ops.mod_op)
[docs] def rmod(self, other) -> Series: # type: ignore return self._apply_binary_op(other, ops.mod_op, reverse=True)
    def divmod(self, other) -> Tuple[Series, Series]:  # type: ignore
        # TODO(huanc): when self and other both have dtype int and other contains
        # zeros, the output should be dtype float; both floordiv and mod return
        # dtype int in this case.
        return (self.floordiv(other), self.mod(other))
    def rdivmod(self, other) -> Tuple[Series, Series]:  # type: ignore
        # TODO(huanc): when self and other both have dtype int and self contains
        # zeros, the output should be dtype float; both floordiv and mod return
        # dtype int in this case.
        return (self.rfloordiv(other), self.rmod(other))
[docs] def dot(self, other): return (self * other).sum()
def __matmul__(self, other): return self.dot(other) __matmul__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__matmul__) def __rmatmul__(self, other): return self.dot(other) __rmatmul__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rmatmul__)
[docs] def combine_first(self, other: Series) -> Series: result = self._apply_binary_op(other, ops.coalesce_op) result.name = self.name return result
[docs] def update(self, other: Union[Series, Sequence, Mapping]) -> None: result = self._apply_binary_op( other, ops.coalesce_op, reverse=True, alignment="left" ) self._set_block(result._get_block())
def __abs__(self) -> Series: return self.abs() __abs__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.abs)
[docs] def abs(self) -> Series: return self._apply_unary_op(ops.abs_op)
[docs] def round(self, decimals=0) -> "Series": return self._apply_binary_op(decimals, ops.round_op)
[docs] def corr(self, other: Series, method="pearson", min_periods=None) -> float: # TODO(tbergeron): Validate early that both are numeric # TODO(tbergeron): Handle partially-numeric columns if method != "pearson": raise NotImplementedError( f"Only Pearson correlation is currently supported. {constants.FEEDBACK_LINK}" ) if min_periods: raise NotImplementedError( f"min_periods not yet supported. {constants.FEEDBACK_LINK}" ) return self._apply_binary_aggregation(other, agg_ops.CorrOp())
[docs] def autocorr(self, lag: int = 1) -> float: return self.corr(self.shift(lag))
[docs] def cov(self, other: Series) -> float: return self._apply_binary_aggregation(other, agg_ops.CovOp())
[docs] def all(self) -> bool: return typing.cast(bool, self._apply_aggregation(agg_ops.all_op))
[docs] def any(self) -> bool: return typing.cast(bool, self._apply_aggregation(agg_ops.any_op))
[docs] def count(self) -> int: return typing.cast(int, self._apply_aggregation(agg_ops.count_op))
[docs] def nunique(self) -> int: return typing.cast(int, self._apply_aggregation(agg_ops.nunique_op))
[docs] def max(self) -> scalars.Scalar: return self._apply_aggregation(agg_ops.max_op)
[docs] def min(self) -> scalars.Scalar: return self._apply_aggregation(agg_ops.min_op)
[docs] def std(self) -> float: return typing.cast(float, self._apply_aggregation(agg_ops.std_op))
[docs] def var(self) -> float: return typing.cast(float, self._apply_aggregation(agg_ops.var_op))
def _central_moment(self, n: int) -> float: """Useful helper for calculating central moment statistics""" # Nth central moment is mean((x-mean(x))^n) # See: https://en.wikipedia.org/wiki/Moment_(mathematics) mean_deltas = self - self.mean() delta_powers = mean_deltas**n return delta_powers.mean()
[docs] def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series: if _is_list_like(func): if self.dtype not in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE: raise NotImplementedError( f"Multiple aggregations only supported on numeric series. {constants.FEEDBACK_LINK}" ) aggregations = [agg_ops.lookup_agg_func(f)[0] for f in func] return Series( self._block.summarize( [self._value_column], aggregations, ) ) else: return self._apply_aggregation(agg_ops.lookup_agg_func(func)[0])
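    # Illustrative sketch of `agg` above: a single function name yields a scalar,
    # while a list of names yields a Series of results (numeric dtypes only for the
    # list form).
    #
    #     >>> s = bpd.Series([1, 2, 3])
    #     >>> s.agg("sum")            # scalar result
    #     >>> s.agg(["min", "max"])   # Series of aggregation results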
aggregate = agg aggregate.__doc__ = inspect.getdoc(vendored_pandas_series.Series.agg)
[docs] def describe(self) -> Series: from bigframes.pandas.core.methods import describe return cast(Series, describe.describe(self, include="all"))
[docs] def skew(self): count = self.count() if count < 3: return pandas.NA moment3 = self._central_moment(3) moment2 = self.var() * (count - 1) / count # Convert sample var to pop var # See G1 estimator: # https://en.wikipedia.org/wiki/Skewness#Sample_skewness numerator = moment3 denominator = moment2 ** (3 / 2) adjustment = (count * (count - 1)) ** 0.5 / (count - 2) return (numerator / denominator) * adjustment
    def kurt(self):
        count = self.count()
        if count < 4:
            return pandas.NA
        moment4 = self._central_moment(4)
        moment2 = self.var() * (count - 1) / count  # Convert sample var to pop var

        # Kurtosis is often defined as the second standardized moment: moment(4)/moment(2)**2
        # Pandas however uses Fisher's estimator, implemented below
        numerator = (count + 1) * (count - 1) * moment4
        denominator = (count - 2) * (count - 3) * moment2**2
        adjustment = 3 * (count - 1) ** 2 / ((count - 2) * (count - 3))

        return (numerator / denominator) - adjustment
kurtosis = kurt kurtosis.__doc__ = inspect.getdoc(vendored_pandas_series.Series.kurt)
[docs] def mode(self) -> Series: block = self._block # Approach: Count each value, return each value for which count(x) == max(counts)) block = block.aggregate( by_column_ids=[self._value_column], aggregations=( agg_expressions.UnaryAggregation( agg_ops.count_op, ex.deref(self._value_column) ), ), ) value_count_col_id = block.value_columns[0] block, max_value_count_col_id = block.apply_window_op( value_count_col_id, agg_ops.max_op, window_spec=windows.unbound(), ) block, is_mode_col_id = block.apply_binary_op( value_count_col_id, max_value_count_col_id, ops.eq_op, ) block = block.filter_by_id(is_mode_col_id) # use temporary name for reset_index to avoid collision, restore after dropping extra columns block = ( block.with_index_labels(["mode_temp_internal"]) .order_by([order.ascending_over(self._value_column)]) .reset_index(drop=False) ) block = block.select_column(self._value_column).with_column_labels([self.name]) mode_values_series = Series(block.select_column(self._value_column)) return typing.cast(Series, mode_values_series)
[docs] def mean(self) -> float: return typing.cast(float, self._apply_aggregation(agg_ops.mean_op))
[docs] def median(self, *, exact: bool = True) -> float: if exact: return typing.cast(float, self.quantile(0.5)) else: return typing.cast(float, self._apply_aggregation(agg_ops.median_op))
[docs] def quantile(self, q: Union[float, Sequence[float]] = 0.5) -> Union[Series, float]: qs = tuple(q) if utils.is_list_like(q) else (q,) result = block_ops.quantile(self._block, (self._value_column,), qs=qs) if utils.is_list_like(q): # Drop the first level, since only one column result = result.with_column_labels(result.column_labels.droplevel(0)) result, index_col = result.create_constant(self.name, None) result = result.set_index([index_col]) return Series( result.transpose(original_row_index=pandas.Index([self.name])) ) else: return cast(float, Series(result).to_pandas().squeeze())
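    # Illustrative sketch of `quantile` above: a scalar `q` returns a float, while a
    # list-like `q` returns a Series with one row per requested quantile.
    #
    #     >>> s.quantile(0.5)            # exact median as a scalar
    #     >>> s.quantile([0.25, 0.75])   # Series of quantile results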
[docs] def sum(self) -> float: return typing.cast(float, self._apply_aggregation(agg_ops.sum_op))
[docs] def prod(self) -> float: return typing.cast(float, self._apply_aggregation(agg_ops.product_op))
product = prod product.__doc__ = inspect.getdoc(vendored_pandas_series.Series.prod) def __eq__(self, other: object) -> Series: # type: ignore return self.eq(other) def __ne__(self, other: object) -> Series: # type: ignore return self.ne(other) def __invert__(self) -> Series: return self._apply_unary_op(ops.invert_op) __invert__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__invert__) def __pos__(self) -> Series: return self._apply_unary_op(ops.pos_op) def __neg__(self) -> Series: return self._apply_unary_op(ops.neg_op) def __dir__(self) -> List[str]: return dir(type(self)) + self._struct_fields
[docs] def eq(self, other: object) -> Series: # TODO: enforce stricter alignment return self._apply_binary_op(other, ops.eq_op)
[docs] def ne(self, other: object) -> Series: # TODO: enforce stricter alignment return self._apply_binary_op(other, ops.ne_op)
[docs] def items(self): for batch_df in self._block.to_pandas_batches(): assert ( batch_df.shape[1] == 1 ), f"Expected 1 column in the dataframe, but got {batch_df.shape[1]}." for item in batch_df.squeeze(axis=1).items(): yield item
    def _apply_callable(self, condition):
        """Evaluates the condition if it is callable; otherwise returns it unchanged."""
        if callable(condition):
            # When it's a bigframes function.
            if hasattr(condition, "bigframes_bigquery_function"):
                return self.apply(condition)
            # When it's a plain Python function.
            else:
                return self.apply(condition, by_row=False)
        # When it's not a callable.
        return condition
[docs] def where(self, cond, other=None): cond = self._apply_callable(cond) other = self._apply_callable(other) value_id, cond_id, other_id, block = self._align3(cond, other) block, result_id = block.project_expr( ops.where_op.as_expr(value_id, cond_id, other_id) ) return Series(block.select_column(result_id).with_column_labels([self.name]))
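    # Illustrative sketch of `where` above; plain callables are evaluated through
    # `_apply_callable` (vectorized via `apply(..., by_row=False)`).
    #
    #     >>> s = bpd.Series([1, -2, 3])
    #     >>> s.where(s > 0)               # non-matching rows become nulls
    #     >>> s.where(s > 0, 0)            # non-matching rows become 0
    #     >>> s.where(lambda x: x > 0, 0)  # callable condition, applied vectorized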
[docs] def clip(self, lower=None, upper=None): if lower is None and upper is None: return self if lower is None: return self._apply_binary_op(upper, ops.minimum_op, alignment="left") if upper is None: return self._apply_binary_op(lower, ops.maximum_op, alignment="left") # special rule to coerce scalar string args to date value_id, lower_id, upper_id, block = self._align3( lower, upper, cast_scalars=(bigframes.dtypes.is_date_like(self.dtype)) ) block, result_id = block.project_expr( ops.clip_op.as_expr(value_id, lower_id, upper_id), ) return Series(block.select_column(result_id).with_column_labels([self.name]))
[docs] @validations.requires_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( [ order.descending_over(self._value_column), order.ascending_over(row_nums), ] ) return typing.cast( scalars.Scalar, Series(block.select_column(row_nums)).iloc[0] )
[docs] @validations.requires_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( [ order.ascending_over(self._value_column), order.ascending_over(row_nums), ] ) return typing.cast( scalars.Scalar, Series(block.select_column(row_nums)).iloc[0] )
[docs] @validations.requires_index def unstack(self, level: LevelsType = -1): if isinstance(level, int) or isinstance(level, str): level = [level] block = self._block if self.index.nlevels == 1: raise ValueError("Series must have multi-index to unstack") # Pivot by index levels unstack_ids = self._resolve_levels(level) block = block.reset_index(drop=False) block = block.set_index( [col for col in self._block.index_columns if col not in unstack_ids] ) pivot_block = block.pivot( columns=unstack_ids, values=self._block.value_columns, values_in_index=False, ) return bigframes.dataframe.DataFrame(pivot_block)
[docs] @validations.requires_index def idxmax(self) -> blocks.Label: block = self._block.order_by( [ order.descending_over(self._value_column), *[ order.ascending_over(idx_col) for idx_col in self._block.index_columns ], ] ) block = block.slice(0, 1) return indexes.Index(block).to_pandas()[0]
[docs] @validations.requires_index def idxmin(self) -> blocks.Label: block = self._block.order_by( [ order.ascending_over(self._value_column), *[ order.ascending_over(idx_col) for idx_col in self._block.index_columns ], ] ) block = block.slice(0, 1) return indexes.Index(block).to_pandas()[0]
@property @validations.requires_ordering() def is_monotonic_increasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_increasing(self._value_column) ) @property @validations.requires_ordering() def is_monotonic_decreasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_decreasing(self._value_column) ) def __getitem__(self, indexer): # TODO: enforce stricter alignment, should fail if indexer is missing any keys. use_iloc = ( isinstance(indexer, slice) and all( isinstance(x, numbers.Integral) or (x is None) for x in [indexer.start, indexer.stop, indexer.step] ) ) or ( isinstance(indexer, numbers.Integral) and not isinstance(self._block.index.dtypes[0], pandas.Int64Dtype) ) if use_iloc: return self.iloc[indexer] if isinstance(indexer, Series): (left, right, block) = self._align(indexer, "left") block = block.filter(right) block = block.select_column(left.id.name) return Series(block) return self.loc[indexer] __getitem__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__getitem__) def __getattr__(self, key: str): # Protect against recursion errors with uninitialized Series objects. # We use "_block" attribute to check whether the instance is initialized. # See: # https://github.com/googleapis/python-bigquery-dataframes/issues/728 # and # https://nedbatchelder.com/blog/201010/surprising_getattr_recursion.html if key == "_block": raise AttributeError(key) elif hasattr(pandas.Series, key): log_adapter.submit_pandas_labels( self._block.session.bqclient, self.__class__.__name__, key ) raise AttributeError( textwrap.dedent( f""" BigQuery DataFrames has not yet implemented an equivalent to 'pandas.Series.{key}'. {constants.FEEDBACK_LINK} """ ) ) elif key in self._struct_fields: return self.struct.field(key) else: raise AttributeError(key) def __setitem__(self, key, value) -> None: """Set item using direct assignment, delegating to .loc indexer.""" self.loc[key] = value def _apply_aggregation( self, op: agg_ops.UnaryAggregateOp | agg_ops.NullaryAggregateOp ) -> Any: return self._block.get_stat(self._value_column, op) def _apply_window_op( self, op: agg_ops.UnaryWindowOp, window_spec: windows.WindowSpec ): block = self._block block, result_id = block.apply_window_op( self._value_column, op, window_spec=window_spec, result_label=self.name ) result = Series(block.select_column(result_id)) if op.skips_nulls: return result.where(self.notna(), None) else: return result
[docs] def value_counts( self, normalize: bool = False, sort: bool = True, ascending: bool = False, *, dropna: bool = True, ): block = block_ops.value_counts( self._block, [self._value_column], normalize=normalize, ascending=ascending, drop_na=dropna, ) return Series(block)
@typing.overload # type: ignore[override] def sort_values( self, *, axis=..., inplace: Literal[True] = ..., ascending: bool | typing.Sequence[bool] = ..., kind: str = ..., na_position: typing.Literal["first", "last"] = ..., ) -> None: ... @typing.overload def sort_values( self, *, axis=..., inplace: Literal[False] = ..., ascending: bool | typing.Sequence[bool] = ..., kind: str = ..., na_position: typing.Literal["first", "last"] = ..., ) -> Series: ...
[docs] def sort_values( self, *, axis=0, inplace: bool = False, ascending=True, kind: str = "quicksort", na_position: typing.Literal["first", "last"] = "last", ) -> Optional[Series]: if axis != 0 and axis != "index": raise ValueError(f"No axis named {axis} for object type Series") if na_position not in ["first", "last"]: raise ValueError("Param na_position must be one of 'first' or 'last'") block = self._block.order_by( [ order.ascending_over(self._value_column, (na_position == "last")) if ascending else order.descending_over(self._value_column, (na_position == "last")) ], ) if inplace: self._set_block(block) return None else: return Series(block)
@typing.overload # type: ignore[override] def sort_index( self, *, axis=..., inplace: Literal[False] = ..., ascending=..., na_position=... ) -> Series: ... @typing.overload def sort_index( self, *, axis=0, inplace: Literal[True] = ..., ascending=..., na_position=... ) -> None: ...
[docs] @validations.requires_index def sort_index( self, *, axis=0, inplace: bool = False, ascending=True, na_position="last" ) -> Optional[Series]: # TODO(tbergeron): Support level parameter once multi-index introduced. if axis != 0 and axis != "index": raise ValueError(f"No axis named {axis} for object type Series") if na_position not in ["first", "last"]: raise ValueError("Param na_position must be one of 'first' or 'last'") block = self._block na_last = na_position == "last" ordering = [ order.ascending_over(column, na_last) if ascending else order.descending_over(column, na_last) for column in block.index_columns ] block = block.order_by(ordering) if inplace: self._set_block(block) return None else: return Series(block)
[docs] @validations.requires_ordering() def rolling( self, window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str, min_periods: int | None = None, closed: Literal["right", "left", "both", "neither"] = "right", ) -> bigframes.core.window.Window: if isinstance(window, int): # Rows rolling window_spec = windows.WindowSpec( bounds=windows.RowsWindowBounds.from_window_size(window, closed), min_periods=window if min_periods is None else min_periods, ) return bigframes.core.window.Window( self._block, window_spec, self._block.value_columns, is_series=True ) return rolling.create_range_window( block=self._block, window=window, min_periods=min_periods, closed=closed, is_series=True, )
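    # Illustrative sketch of the two window kinds handled above: an integer window
    # builds row-based bounds, while a time-like window builds a range window (the
    # range form assumes the Series has a suitable datetime-like index).
    #
    #     >>> s.rolling(window=3).mean()    # fixed-size rows window
    #     >>> s.rolling(window="3D").sum()  # range window keyed on a datetime index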
[docs] @validations.requires_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window_spec = windows.cumulative_rows(min_periods=min_periods) return bigframes.core.window.Window( self._block, window_spec, self._block.value_columns, is_series=True )
[docs]
    def groupby(
        self,
        by: typing.Union[
            blocks.Label, Series, typing.Sequence[typing.Union[blocks.Label, Series]]
        ] = None,
        axis=0,
        level: typing.Optional[
            int | str | typing.Sequence[int] | typing.Sequence[str]
        ] = None,
        as_index: bool = True,
        *,
        dropna: bool = True,
    ) -> bigframes.core.groupby.SeriesGroupBy:
        if (by is not None) and (level is not None):
            raise ValueError("Do not specify both 'by' and 'level'")
        if not as_index:
            raise ValueError("as_index=False only valid with DataFrame")
        if axis:
            raise ValueError("No axis named {} for object type Series".format(axis))
        if by is not None:
            return self._groupby_values(by, dropna)
        if level is not None:
            return self._groupby_level(level, dropna)
        else:
            raise TypeError("You have to supply one of 'by' and 'level'")
@validations.requires_index def _groupby_level( self, level: int | str | typing.Sequence[int] | typing.Sequence[str], dropna: bool = True, ) -> bigframes.core.groupby.SeriesGroupBy: if utils.is_list_like(level): by_key_is_singular = False else: by_key_is_singular = True return groupby.SeriesGroupBy( self._block, self._value_column, by_col_ids=self._resolve_levels(level), value_name=self.name, dropna=dropna, by_key_is_singular=by_key_is_singular, ) def _groupby_values( self, by: typing.Union[ blocks.Label, Series, typing.Sequence[typing.Union[blocks.Label, Series]] ], dropna: bool = True, ) -> bigframes.core.groupby.SeriesGroupBy: if not isinstance(by, Series) and _is_list_like(by): by = list(by) by_key_is_singular = False else: by = [typing.cast(typing.Union[blocks.Label, Series], by)] by_key_is_singular = True block = self._block grouping_cols: typing.Sequence[str] = [] value_col = self._value_column for key in by: if isinstance(key, Series): block, ( get_column_left, get_column_right, ) = block.join(key._block, how="inner" if dropna else "left") value_col = get_column_left[value_col] grouping_cols = [ *[get_column_left[value] for value in grouping_cols], get_column_right[key._value_column], ] else: # Interpret as index level matches = block.index_name_to_col_id.get(key, []) if len(matches) != 1: raise ValueError( f"GroupBy key {key} does not match a unique index level. BigQuery DataFrames only interprets lists of strings as index level names, not directly as per-row group assignments." ) grouping_cols = [*grouping_cols, matches[0]] return groupby.SeriesGroupBy( block, value_col, by_col_ids=grouping_cols, value_name=self.name, dropna=dropna, by_key_is_singular=by_key_is_singular, )
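    # Editorial usage sketch (not part of the library source) of the two
    # grouping paths above: a key Series goes through _groupby_values, an
    # index level name/number through _groupby_level. Assumes
    # `import bigframes.pandas as bpd`.
    #
    #   s = bpd.Series([1, 2, 3, 4], index=["a", "a", "b", "b"])
    #   s.groupby(level=0).sum()     # group by index level 0
    #   s.groupby(s >= 3).count()    # group by an aligned boolean key Series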
[docs] def apply( self, func, by_row: typing.Union[typing.Literal["compat"], bool] = "compat", *, args: typing.Tuple = (), ) -> Series: # Note: This signature differs from pandas.Series.apply. Specifically, # `args` is keyword-only and `by_row` is a custom parameter here. Full # alignment would involve breaking changes. However, given that by_row # is not frequently used, we defer any such changes until there is a # clear need based on user feedback. # # See pandas docs for reference: # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.apply.html # TODO(shobs, b/274645634): Support convert_dtype, **kwargs # is actually a ternary op if by_row not in ["compat", False]: raise ValueError("Param by_row must be one of 'compat' or False") if not callable(func) and not isinstance(func, numpy.ufunc): raise ValueError( "Only a ufunc (a function that applies to the entire Series) or" " a BigFrames BigQuery function that only works on single values" " are supported." ) if isinstance(func, bigframes.functions.BigqueryCallableRoutine): # We are working with bigquery function at this point if args: result_series = self._apply_nary_op( ops.NaryRemoteFunctionOp(function_def=func.udf_def), args ) # TODO(jialuo): Investigate why `_apply_nary_op` drops the series # `name`. Manually reassigning it here as a temporary fix. result_series.name = self.name else: result_series = self._apply_unary_op( ops.RemoteFunctionOp(function_def=func.udf_def, apply_on_null=True) ) result_series = func._post_process_series(result_series) return result_series bf_op = python_ops.python_callable_to_op(func) if bf_op and isinstance(bf_op, ops.UnaryOp): return self._apply_unary_op(bf_op) # It is neither a remote function nor a managed function. # Then it must be a vectorized function that applies to the Series # as a whole. if by_row: raise ValueError( "You have passed a function as-is. If your intention is to " "apply this function in a vectorized way (i.e. to the " "entire Series as a whole, and you are sure that it " "performs only the operations that are implemented for a " "Series (e.g. a chain of arithmetic/logical operations, " "such as `def foo(s): return s % 2 == 1`), please also " "specify `by_row=False`. If your function contains " "arbitrary code, it can only be applied to every element " "in the Series individually, in which case you must " "convert it to a BigFrames BigQuery function using " "`bigframes.pandas.udf`, " "or `bigframes.pandas.remote_function` before passing." ) try: return func(self) # type: ignore except Exception as ex: # This could happen if any of the operators in func is not # supported on a Series. Let's guide the customer to use a # bigquery function instead if hasattr(ex, "message"): ex.message += f"\n{_bigquery_function_recommendation_message}" raise
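    # Editorial usage sketch (not part of the library source): apply() accepts
    # either a BigQuery function created with bigframes.pandas.remote_function
    # / bigframes.pandas.udf (applied element-wise), or a plain callable that
    # is vectorized over the whole Series when by_row=False. The function name
    # below is hypothetical and the exact decorator arguments depend on your
    # cloud setup; assumes `import bigframes.pandas as bpd`.
    #
    #   s.apply(lambda x: x % 2 == 1, by_row=False)   # vectorized over the Series
    #
    #   @bpd.remote_function()
    #   def add_one(x: float) -> float:
    #       return x + 1
    #
    #   s.apply(add_one)                              # element-wise in BigQuery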
[docs] def combine( self, other, func, ) -> Series: if not callable(func) and not isinstance(func, numpy.ufunc): raise ValueError( "Only a ufunc (a function that applies to the entire Series) or" " a BigFrames BigQuery function that only works on single values" " are supported." ) if isinstance(func, bigframes.functions.BigqueryCallableRoutine): result_series = self._apply_binary_op( other, ops.BinaryRemoteFunctionOp(function_def=func.udf_def) ) result_series = func._post_process_series(result_series) return result_series bf_op = python_ops.python_callable_to_op(func) if bf_op and isinstance(bf_op, ops.BinaryOp): result_series = self._apply_binary_op(other, bf_op) return result_series # Keep this in sync with .apply try: return func(self, other) except Exception as ex: # This could happen if any of the operators in func is not # supported on a Series. Let's guide the customer to use a # bigquery function instead if hasattr(ex, "message"): ex.message += f"\n{_bigquery_function_recommendation_message}" raise
[docs] @validations.requires_index def add_prefix(self, prefix: str, axis: int | str | None = None) -> Series: return Series(self._get_block().add_prefix(prefix))
[docs] @validations.requires_index def add_suffix(self, suffix: str, axis: int | str | None = None) -> Series: return Series(self._get_block().add_suffix(suffix))
[docs] def take( self, indices: typing.Sequence[int], axis: int | str | None = 0, **kwargs ) -> Series: if not utils.is_list_like(indices): raise ValueError("indices should be a list-like object.") return typing.cast(Series, self.iloc[indices])
[docs] def filter( self, items: typing.Optional[typing.Iterable] = None, like: typing.Optional[str] = None, regex: typing.Optional[str] = None, axis: typing.Optional[typing.Union[str, int]] = None, ) -> Series: if (axis is not None) and utils.get_axis_number(axis) != 0: raise ValueError(f"Invalid axis for series: {axis}") if sum([(items is not None), (like is not None), (regex is not None)]) != 1: raise ValueError( "Need to provide exactly one of 'items', 'like', or 'regex'" ) if len(self._block.index_columns) > 1: raise NotImplementedError( f"Method filter does not support rows multiindex. {constants.FEEDBACK_LINK}" ) if (like is not None) or (regex is not None): block = self._block block, label_string_id = block.apply_unary_op( self._block.index_columns[0], ops.AsTypeOp(to_type=pandas.StringDtype(storage="pyarrow")), ) if like is not None: block, mask_id = block.apply_unary_op( label_string_id, ops.StrContainsOp(pat=like) ) else: # regex assert regex is not None block, mask_id = block.apply_unary_op( label_string_id, ops.StrContainsRegexOp(pat=regex) ) block = block.filter_by_id(mask_id) block = block.select_columns([self._value_column]) return Series(block) elif items is not None: # Behavior matches pandas 2.1+, older pandas versions would reindex block = self._block block, mask_id = block.apply_unary_op( self._block.index_columns[0], ops.IsInOp(values=tuple(items)) ) block = block.filter_by_id(mask_id) block = block.select_columns([self._value_column]) return Series(block) else: raise ValueError("Need to provide 'items', 'like', or 'regex'")
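    # Editorial usage sketch (not part of the library source): exactly one of
    # items / like / regex may be given; like and regex match against the
    # (stringified) index labels, while items keeps the listed labels.
    #
    #   s.filter(items=["a", "c"])     # keep labels "a" and "c"
    #   s.filter(like="2024")          # keep labels containing "2024"
    #   s.filter(regex=r"^tmp_")       # keep labels matching the regex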
[docs]
    @validations.requires_index
    def reindex(self, index=None, *, validate: typing.Optional[bool] = None):
        if validate and not self.index.is_unique:
            raise ValueError("Original index must be unique to reindex")
        keep_original_names = False
        if isinstance(index, indexes.Index):
            new_indexer = bigframes.dataframe.DataFrame(data=index._block)[[]]
        else:
            if not isinstance(index, pandas.Index):
                keep_original_names = True
                index = pandas.Index(index)
            if index.nlevels != self.index.nlevels:
                raise NotImplementedError(
                    "Cannot reindex with index with different nlevels"
                )
            new_indexer = bigframes.dataframe.DataFrame(
                index=index, session=self._get_block().expr.session
            )[[]]
        # The multi-index join is sensitive to index names, so set them all here
        result = new_indexer.rename_axis(range(new_indexer.index.nlevels)).join(
            self.to_frame().rename_axis(range(self.index.nlevels)),
            how="left",
        )
        # and then reset the names after the join
        result_block = result.rename_axis(
            self.index.names if keep_original_names else index.names
        )._block
        return Series(result_block)
[docs] @validations.requires_index def reindex_like(self, other: Series, *, validate: typing.Optional[bool] = None): return self.reindex(other.index, validate=validate)
[docs] def drop_duplicates(self, *, keep: str = "first") -> Series: block = block_ops.drop_duplicates(self._block, (self._value_column,), keep) return Series(block)
[docs] def unique(self, keep_order=True) -> Series: if keep_order: validations.enforce_ordered(self, "unique(keep_order != False)") return self.drop_duplicates() block = self._block.aggregate( [ agg_expressions.UnaryAggregation( agg_ops.AnyValueOp(), ex.deref(self._value_column) ) ], [self._value_column], column_labels=self._block.column_labels, dropna=False, ) return Series(block.reset_index())
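    # Editorial usage sketch (not part of the library source): with the default
    # keep_order=True, unique() requires an ordered series and preserves
    # first-seen order via drop_duplicates(); keep_order=False uses an
    # unordered aggregation, which can be cheaper on large data.
    #
    #   s.unique()                   # order-preserving, needs ordering
    #   s.unique(keep_order=False)   # unordered distinct values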
[docs] def duplicated(self, keep: str = "first") -> Series: block, indicator = block_ops.indicate_duplicates( self._block, (self._value_column,), keep ) return Series( block.select_column( indicator, ).with_column_labels([self.name]) )
[docs] def mask(self, cond, other=None) -> Series: cond = self._apply_callable(cond) other = self._apply_callable(other) if not isinstance(cond, Series): raise TypeError( f"Only bigframes series condition is supported, received {type(cond).__name__}. " f"{constants.FEEDBACK_LINK}" ) return self.where(~cond, other)
[docs] def to_frame(self, name: blocks.Label = None) -> bigframes.dataframe.DataFrame: provided_name = name if name else self.name # To be consistent with Pandas, it assigns 0 as the column name if missing. 0 is the first element of RangeIndex. block = self._block.with_column_labels( [provided_name] if provided_name else [0] ) return bigframes.dataframe.DataFrame(block)
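    # Editorial usage sketch (not part of the library source): as in pandas, an
    # unnamed series becomes a one-column DataFrame whose column label is 0.
    # Assumes `import bigframes.pandas as bpd`.
    #
    #   bpd.Series([1, 2]).to_frame()            # column labeled 0
    #   bpd.Series([1, 2]).to_frame(name="x")    # column labeled "x"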
[docs] def to_csv( self, path_or_buf=None, sep=",", *, header: bool = True, index: bool = True, allow_large_results: Optional[bool] = None, ) -> Optional[str]: if utils.is_gcs_path(path_or_buf): return self.to_frame().to_csv( path_or_buf, sep=sep, header=header, index=index, allow_large_results=allow_large_results, ) else: pd_series = self.to_pandas(allow_large_results=allow_large_results) return pd_series.to_csv( path_or_buf=path_or_buf, sep=sep, header=header, index=index )
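    # Editorial usage sketch (not part of the library source): a gs:// path is
    # exported via the DataFrame path (written by BigQuery), while any other
    # destination is handled by downloading to pandas first. The bucket name
    # below is hypothetical.
    #
    #   s.to_csv("gs://my-bucket/series-*.csv")   # exported from BigQuery
    #   csv_text = s.to_csv()                     # materialized via pandas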
[docs] def to_dict( self, into: type[dict] = dict, *, allow_large_results: Optional[bool] = None, ) -> typing.Mapping: return typing.cast(dict, self.to_pandas(allow_large_results=allow_large_results).to_dict(into=into)) # type: ignore
[docs] def to_excel( self, excel_writer, sheet_name="Sheet1", *, allow_large_results=None, **kwargs ) -> None: return self.to_pandas(allow_large_results=allow_large_results).to_excel( excel_writer, sheet_name, **kwargs )
[docs] def to_json( self, path_or_buf=None, orient: Optional[ typing.Literal["split", "records", "index", "columns", "values", "table"] ] = None, *, lines: bool = False, index: bool = True, allow_large_results: Optional[bool] = None, ) -> Optional[str]: if utils.is_gcs_path(path_or_buf): return self.to_frame().to_json( path_or_buf=path_or_buf, orient=orient, lines=lines, index=index, allow_large_results=allow_large_results, ) else: pd_series = self.to_pandas(allow_large_results=allow_large_results) return pd_series.to_json( path_or_buf=path_or_buf, orient=orient, lines=lines, index=index # type: ignore )
[docs] def to_latex( self, buf=None, columns=None, header=True, index=True, *, allow_large_results=None, **kwargs, ) -> typing.Optional[str]: return self.to_pandas(allow_large_results=allow_large_results).to_latex( buf, columns=columns, header=header, index=index, **kwargs )
[docs] def tolist( self, *, allow_large_results: Optional[bool] = None, ) -> _list: return self.to_pandas(allow_large_results=allow_large_results).to_list()
to_list = tolist to_list.__doc__ = inspect.getdoc(vendored_pandas_series.Series.tolist)
[docs] def to_markdown( self, buf: typing.IO[str] | None = None, mode: str = "wt", index: bool = True, *, allow_large_results: Optional[bool] = None, **kwargs, ) -> typing.Optional[str]: return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode=mode, index=index, **kwargs) # type: ignore
[docs] def to_numpy( self, dtype=None, copy=False, na_value=pd_ext.no_default, *, allow_large_results=None, **kwargs, ) -> numpy.ndarray: return self.to_pandas(allow_large_results=allow_large_results).to_numpy( dtype, copy, na_value, **kwargs )
def __array__(self, dtype=None, copy: Optional[bool] = None) -> numpy.ndarray: if copy is False: raise ValueError("Cannot convert to array without copy.") return self.to_numpy(dtype=dtype) __array__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__array__)
[docs] def to_pickle(self, path, *, allow_large_results=None, **kwargs) -> None: return self.to_pandas(allow_large_results=allow_large_results).to_pickle( path, **kwargs )
[docs] def to_string( self, buf=None, na_rep="NaN", float_format=None, header=True, index=True, length=False, dtype=False, name=False, max_rows=None, min_rows=None, *, allow_large_results=None, ) -> typing.Optional[str]: return self.to_pandas(allow_large_results=allow_large_results).to_string( buf, na_rep, float_format, header, index, length, dtype, name, max_rows, min_rows, )
[docs] def to_xarray( self, *, allow_large_results: Optional[bool] = None, ): return self.to_pandas(allow_large_results=allow_large_results).to_xarray()
def _throw_if_index_contains_duplicates( self, error_message: typing.Optional[str] = None ) -> None: if not self.index.is_unique: error_message = ( error_message if error_message else "Index contains duplicate entries, but uniqueness is required." ) raise pandas.errors.InvalidIndexError(error_message)
[docs]
    def map(
        self,
        arg: typing.Union[Mapping, Series, Callable],
        na_action: Optional[str] = None,
        *,
        verify_integrity: bool = False,
    ) -> Series:
        if na_action:
            raise NotImplementedError(
                f"Non-None na_action argument is not yet supported for Series.map. {constants.FEEDBACK_LINK}"
            )
        if isinstance(arg, Series):
            if verify_integrity:
                error_message = "When verify_integrity is True in Series.map, index of arg parameter must not have duplicate entries."
                arg._throw_if_index_contains_duplicates(error_message=error_message)
            map_df = bigframes.dataframe.DataFrame(arg._block)
            map_df = map_df.rename(columns={arg.name: self.name})
        elif isinstance(arg, Mapping):
            map_df = bigframes.dataframe.DataFrame(
                {"keys": list(arg.keys()), self.name: list(arg.values())},  # type: ignore
                session=self._get_block().expr.session,
            )
            map_df = map_df.set_index("keys")
        elif callable(arg):
            # This covers BigQuery remote functions and managed functions.
            return self.apply(arg)
        else:
            # Mirroring pandas: calling the uncallable object
            arg()  # raises "TypeError: object is not callable"

        self_df = self.to_frame(name="series")
        result_df = self_df.join(map_df, on="series")
        return result_df[self.name]
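    # Editorial usage sketch (not part of the library source): arg can be a
    # dict-like (looked up via a join on its keys), another Series (joined on
    # its index), or a callable (delegated to apply()). Assumes
    # `import bigframes.pandas as bpd`.
    #
    #   s = bpd.Series(["cat", "dog", "cat"])
    #   s.map({"cat": "kitten", "dog": "puppy"})   # dict lookup, misses become NA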
[docs] @validations.requires_ordering() def sample( self, n: Optional[int] = None, frac: Optional[float] = None, *, random_state: Optional[int] = None, sort: Optional[bool | Literal["random"]] = "random", ) -> Series: if n is not None and frac is not None: raise ValueError("Only one of 'n' or 'frac' parameter can be specified.") ns = (n,) if n is not None else () fracs = (frac,) if frac is not None else () return Series( self._block.split(ns=ns, fracs=fracs, random_state=random_state, sort=sort)[ 0 ] )
[docs] def explode(self, *, ignore_index: Optional[bool] = False) -> Series: return Series( self._block.explode( column_ids=[self._value_column], ignore_index=ignore_index ) )
[docs] @validations.requires_ordering() def resample( self, rule: str, *, closed: Optional[Literal["right", "left"]] = None, label: Optional[Literal["right", "left"]] = None, level: Optional[LevelsType] = None, origin: Union[ Union[ pandas.Timestamp, datetime.datetime, numpy.datetime64, int, float, str ], Literal["epoch", "start", "start_day", "end", "end_day"], ] = "start_day", ) -> bigframes.core.groupby.SeriesGroupBy: block = self._block._generate_resample_label( rule=rule, closed=closed, label=label, on=None, level=level, origin=origin, ) series = Series(block) return series.groupby(level=0)
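    # Editorial usage sketch (not part of the library source): resample()
    # relabels the (datetime-like) index into buckets of `rule` and returns a
    # groupby over the new labels; it requires an ordered series with a
    # timestamp index.
    #
    #   s.resample("7D").sum()                    # weekly buckets
    #   s.resample("D", origin="start").mean()    # daily buckets anchored at start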
def __array_ufunc__(
        self, ufunc: numpy.ufunc, method: str, *inputs, **kwargs
    ) -> Series:
        """Used to support numpy ufuncs.
        See: https://numpy.org/doc/stable/reference/ufuncs.html
        """
        # Only the __call__ method is supported, with at most two inputs and
        # no keyword arguments.
        if method != "__call__" or len(inputs) > 2 or len(kwargs) > 0:
            return NotImplemented

        if len(inputs) == 1 and ufunc in ops.NUMPY_TO_OP:
            return self._apply_unary_op(ops.NUMPY_TO_OP[ufunc])
        if len(inputs) == 2 and ufunc in ops.NUMPY_TO_BINOP:
            binop = ops.NUMPY_TO_BINOP[ufunc]
            if inputs[0] is self:
                return self._apply_binary_op(inputs[1], binop)
            else:
                return self._apply_binary_op(inputs[0], binop, reverse=True)

        return NotImplemented

    @property
    def plot(self):
        return plotting.PlotAccessor(self)
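    # Editorial usage sketch (not part of the library source): supported numpy
    # ufuncs are translated into BigQuery operations through the NUMPY_TO_OP /
    # NUMPY_TO_BINOP tables used in __array_ufunc__ above, so they stay deferred.
    #
    #   import numpy as np
    #   np.log(s)        # unary ufunc -> unary op, if np.log is mapped
    #   np.add(s, 1)     # binary ufunc -> binary op, if np.add is mapped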
[docs] def hist( self, by: typing.Optional[typing.Sequence[str]] = None, bins: int = 10, **kwargs ): return self.plot.hist(by=by, bins=bins, **kwargs)
hist.__doc__ = inspect.getdoc(plotting.PlotAccessor.hist)
[docs] def line( self, x: typing.Optional[typing.Hashable] = None, y: typing.Optional[typing.Hashable] = None, **kwargs, ): return self.plot.line(x=x, y=y, **kwargs)
line.__doc__ = inspect.getdoc(plotting.PlotAccessor.line)
[docs] def area( self, x: typing.Optional[typing.Hashable] = None, y: typing.Optional[typing.Hashable] = None, stacked: bool = True, **kwargs, ): return self.plot.area(x=x, y=y, stacked=stacked, **kwargs)
area.__doc__ = inspect.getdoc(plotting.PlotAccessor.area)
[docs] def bar( self, x: typing.Optional[typing.Hashable] = None, y: typing.Optional[typing.Hashable] = None, **kwargs, ): return self.plot.bar(x=x, y=y, **kwargs)
bar.__doc__ = inspect.getdoc(plotting.PlotAccessor.bar) def _slice( self, start: typing.Optional[int] = None, stop: typing.Optional[int] = None, step: typing.Optional[int] = None, ) -> Series: return Series( self._block.slice( start=start, stop=stop, step=step if (step is not None) else 1 ).select_column(self._value_column), )
[docs]
    def cache(self):
        """
        Materializes the Series to a temporary table.

        Useful if the series will be used multiple times, as this avoids
        recomputing the shared intermediate value.

        Returns:
            Series: Self
        """
        # Do not use session-aware caching when the user explicitly requests
        # materialization.
        return self._cached(force=True, session_aware=False)
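    # Editorial usage sketch (not part of the library source): caching is
    # useful before reusing an expensive intermediate result several times.
    # The upstream computation below is hypothetical.
    #
    #   expensive = raw.str.len() * 2       # hypothetical upstream computation
    #   expensive = expensive.cache()       # materialize once to a temp table
    #   expensive.sum(), expensive.mean()   # downstream reads reuse the cache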
def _cached(self, *, force: bool = True, session_aware: bool = True) -> Series: self._block.cached(force=force, session_aware=session_aware) return self # Keep this at the bottom of the Series class to avoid # confusing type checker by overriding str @property def str(self) -> strings.StringMethods: import bigframes.operations.strings as strings return strings.StringMethods(self) @property def _value_column(self) -> __builtins__.str: return self._block.value_columns[0] @property def _name(self) -> blocks.Label: return self._block.column_labels[0] @property def _dtype(self): return self._block.dtypes[0] def _set_block(self, block: blocks.Block): self._block = block def _get_block(self) -> blocks.Block: return self._block def _apply_unary_op( self, op: ops.UnaryOp, ) -> Series: """Applies a unary operator to the series.""" block, result_id = self._block.apply_unary_op( self._value_column, op, ) return Series(block.select_column(result_id), name=self.name) # type: ignore def _apply_binary_op( self, other: typing.Any, op: ops.BinaryOp, alignment: typing.Literal["outer", "left"] = "outer", reverse: bool = False, ) -> Series: """Applies a binary operator to the series and other.""" if bigframes.core.convert.can_convert_to_series(other): self_index = indexes.Index(self._block) other_series = bigframes.core.convert.to_bf_series( other, self_index, self._block.session ) (self_col, other_col, block) = self._align(other_series, how=alignment) name = self._name # Drop name if both objects have name attr, but they don't match if ( hasattr(other, "name") and other_series.name != self._name and alignment == "outer" ): name = None expr = op.as_expr( other_col if reverse else self_col, self_col if reverse else other_col ) block, result_id = block.project_expr(expr) block = block.select_column(result_id).with_column_labels([name]) return Series(block) # type: ignore else: # Scalar binop name = self._name expr = op.as_expr( ex.const(other) if reverse else self._value_column, self._value_column if reverse else ex.const(other), ) block, result_id = self._block.project_expr(expr) block = block.select_column(result_id).with_column_labels([name]) return Series(block) # type: ignore def _apply_nary_op( self, op: ops.NaryOp, others: Sequence[typing.Union[Series, scalars.Scalar]], ignore_self=False, ): """Applies an n-ary operator to the series and others.""" values, block = self._align_n( others, ignore_self=ignore_self, cast_scalars=False ) block, result_id = block.project_expr(op.as_expr(*values)) return Series(block.select_column(result_id)) def _apply_binary_aggregation( self, other: Series, stat: agg_ops.BinaryAggregateOp ) -> float: (left, right, block) = self._align(other, how="outer") assert isinstance(left, ex.DerefOp) assert isinstance(right, ex.DerefOp) return block.get_binary_stat(left.id.name, right.id.name, stat) AlignedExprT = Union[ex.ScalarConstantExpression, ex.DerefOp] @typing.overload def _align( self, other: Series, how="outer" ) -> tuple[ex.DerefOp, ex.DerefOp, blocks.Block,]: ... @typing.overload def _align( self, other: typing.Union[Series, scalars.Scalar], how="outer" ) -> tuple[ex.DerefOp, AlignedExprT, blocks.Block,]: ... def _align( self, other: typing.Union[Series, scalars.Scalar], how="outer" ) -> tuple[ex.DerefOp, AlignedExprT, blocks.Block,]: """Aligns the series value with another scalar or series object. 
        Returns new left column id, right column id and joined table expression."""
        values, block = self._align_n(
            [
                other,
            ],
            how,
        )
        return (typing.cast(ex.DerefOp, values[0]), values[1], block)

    def _align3(
        self,
        other1: Series | scalars.Scalar,
        other2: Series | scalars.Scalar,
        how="left",
        cast_scalars: bool = True,
    ) -> tuple[ex.DerefOp, AlignedExprT, AlignedExprT, blocks.Block]:  # type: ignore
        """Aligns the series value with 2 other scalars or series objects.

        Returns new values and joined table expression."""
        values, block = self._align_n([other1, other2], how, cast_scalars=cast_scalars)
        return (
            typing.cast(ex.DerefOp, values[0]),
            values[1],
            values[2],
            block,
        )

    def _align_n(
        self,
        others: typing.Sequence[typing.Union[Series, scalars.Scalar]],
        how="outer",
        ignore_self=False,
        cast_scalars: bool = False,
    ) -> tuple[
        typing.Sequence[Union[ex.ScalarConstantExpression, ex.DerefOp]],
        blocks.Block,
    ]:
        if ignore_self:
            value_ids: List[Union[ex.ScalarConstantExpression, ex.DerefOp]] = []
        else:
            value_ids = [ex.deref(self._value_column)]

        block = self._block
        for other in others:
            if isinstance(other, Series):
                block, (
                    get_column_left,
                    get_column_right,
                ) = block.join(other._block, how=how)
                rebindings = {
                    ids.ColumnId(old): ids.ColumnId(new)
                    for old, new in get_column_left.items()
                }
                remapped_value_ids = (
                    value.remap_column_refs(rebindings) for value in value_ids
                )
                value_ids = [
                    *remapped_value_ids,  # type: ignore
                    ex.deref(get_column_right[other._value_column]),
                ]
            else:
                # Will throw if can't interpret as scalar.
                dtype = typing.cast(bigframes.dtypes.Dtype, self._dtype)
                value_ids = [
                    *value_ids,
                    ex.const(other, dtype=dtype if cast_scalars else None),
                ]
        return (value_ids, block)

    def _throw_if_null_index(self, opname: __builtins__.str):
        if len(self._block.index_columns) == 0:
            raise bigframes.exceptions.NullIndexError(
                f"Series cannot perform {opname} as it has no index. Set an index using set_index."
            )
def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]: return pandas.api.types.is_list_like(obj)