# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Series is a 1 dimensional data structure."""
from __future__ import annotations
import datetime
import functools
import inspect
import itertools
import numbers
import textwrap
import typing
from typing import (
Any,
Callable,
cast,
Iterable,
List,
Literal,
Mapping,
Optional,
overload,
Sequence,
Tuple,
Union,
)
import warnings
import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.series as vendored_pandas_series
import google.cloud.bigquery as bigquery
import numpy
import pandas
from pandas.api import extensions as pd_ext
import pyarrow as pa
import typing_extensions
import bigframes.core
from bigframes.core import agg_expressions, groupby, log_adapter
import bigframes.core.block_transforms as block_ops
import bigframes.core.blocks as blocks
import bigframes.core.expression as ex
import bigframes.core.identifiers as ids
import bigframes.core.indexers
import bigframes.core.indexes as indexes
import bigframes.core.ordering as order
import bigframes.core.scalar as scalars
import bigframes.core.utils as utils
import bigframes.core.validations as validations
import bigframes.core.window
from bigframes.core.window import rolling
import bigframes.core.window_spec as windows
import bigframes.dataframe
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.formatting_helpers as formatter
import bigframes.functions
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.operations.blob as blob
import bigframes.operations.datetimes as dt
import bigframes.operations.lists as lists
import bigframes.operations.plotting as plotting
import bigframes.operations.python_op_maps as python_ops
import bigframes.operations.structs as structs
import bigframes.session
if typing.TYPE_CHECKING:
import bigframes.geopandas.geoseries
import bigframes.operations.strings as strings
LevelType = typing.Union[str, int]
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
_bigquery_function_recommendation_message = (
"Your functions could not be applied directly to the Series."
" Try converting it to a BigFrames BigQuery function."
)
_list = list # Type alias to escape Series.list property
@log_adapter.class_logger
class Series(vendored_pandas_series.Series):
# Must be above 5000 for pandas to delegate to bigframes for binops
__pandas_priority__ = 13000
# Ensure mypy can more robustly determine the type of self._block since it
# gets set in various places.
_block: blocks.Block
def __init__(
self,
data=None,
index=None,
dtype: Optional[bigframes.dtypes.DtypeString | bigframes.dtypes.Dtype] = None,
name: str | None = None,
copy: Optional[bool] = None,
*,
session: Optional[bigframes.session.Session] = None,
):
self._query_job: Optional[bigquery.QueryJob] = None
import bigframes.pandas
# Ignore object dtype if provided, as it provides no additional
# information about what BigQuery type to use.
if dtype is not None and bigframes.dtypes.is_object_like(dtype):
dtype = None
read_pandas_func = (
session.read_pandas
if (session is not None)
else (lambda x: bigframes.pandas.read_pandas(x))
)
block: typing.Optional[blocks.Block] = None
if (name is not None) and not isinstance(name, typing.Hashable):
raise ValueError(
f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}"
)
if copy is not None and not copy:
raise ValueError(
f"Series constructor only supports copy=True. {constants.FEEDBACK_LINK}"
)
if isinstance(data, blocks.Block):
block = data
elif isinstance(data, bigframes.pandas.Series):
block = data._get_block()
        # Special case: data is a local scalar, but index is a BigFrames Index (potentially very large)
elif (
not utils.is_list_like(data) and not isinstance(data, indexes.Index)
) and isinstance(index, indexes.Index):
block = index._block
block, _ = block.create_constant(data)
block = block.with_column_labels([None])
# prevents no-op reindex later
index = None
elif isinstance(data, indexes.Index) or isinstance(index, indexes.Index):
data = indexes.Index(data, dtype=dtype, name=name, session=session)
            # dtype has already been applied above; set to None to avoid a re-cast later
            dtype = None
if data.nlevels != 1:
raise NotImplementedError("Cannot interpret multi-index as Series.")
# Reset index to promote index columns to value columns, set default index
data_block = data._block.reset_index(drop=False).with_column_labels(
data.names
)
if index is not None: # Align data and index by offset
bf_index = indexes.Index(index, session=session)
idx_block = bf_index._block.reset_index(
drop=False
) # reset to align by offsets, and then reset back
idx_cols = idx_block.value_columns
data_block, (l_mapping, _) = idx_block.join(data_block, how="left")
data_block = data_block.set_index([l_mapping[col] for col in idx_cols])
data_block = data_block.with_index_labels(bf_index.names)
# prevents no-op reindex later
index = None
block = data_block
if block:
assert len(block.value_columns) == 1
assert len(block.column_labels) == 1
if index is not None: # reindexing operation
bf_index = indexes.Index(index)
idx_block = bf_index._block
idx_cols = idx_block.index_columns
block, _ = idx_block.join(block, how="left")
block = block.with_index_labels(bf_index.names)
if name:
block = block.with_column_labels([name])
if dtype:
bf_dtype = bigframes.dtypes.bigframes_type(dtype)
block = block.multi_apply_unary_op(ops.AsTypeOp(to_type=bf_dtype))
else:
if isinstance(dtype, str) and dtype.lower() == "json":
dtype = bigframes.dtypes.JSON_DTYPE
pd_series = pandas.Series(
data=data,
index=index, # type:ignore
dtype=dtype, # type:ignore
name=name,
)
block = read_pandas_func(pd_series)._get_block() # type:ignore
assert block is not None
self._block: blocks.Block = block
self._block.session._register_object(self)
@property
def dt(self) -> dt.DatetimeMethods:
return dt.DatetimeMethods(self)
@property
def dtype(self):
bigframes.dtypes.warn_on_db_dtypes_json_dtype([self._dtype])
return self._dtype
@property
def dtypes(self):
bigframes.dtypes.warn_on_db_dtypes_json_dtype([self._dtype])
return self._dtype
@property
def geo(self) -> bigframes.geopandas.geoseries.GeoSeries:
"""
Accessor object for geography properties of the Series values.
Returns:
bigframes.geopandas.geoseries.GeoSeries:
An accessor containing geography methods.
"""
import bigframes.geopandas.geoseries
return bigframes.geopandas.geoseries.GeoSeries(self)
@property
@validations.requires_index
def loc(self) -> bigframes.core.indexers.LocSeriesIndexer:
return bigframes.core.indexers.LocSeriesIndexer(self)
@property
@validations.requires_ordering()
def iloc(self) -> bigframes.core.indexers.IlocSeriesIndexer:
return bigframes.core.indexers.IlocSeriesIndexer(self)
@property
@validations.requires_ordering()
def iat(self) -> bigframes.core.indexers.IatSeriesIndexer:
return bigframes.core.indexers.IatSeriesIndexer(self)
@property
@validations.requires_index
def at(self) -> bigframes.core.indexers.AtSeriesIndexer:
return bigframes.core.indexers.AtSeriesIndexer(self)
@property
def name(self) -> blocks.Label:
return self._name
@name.setter
def name(self, label: blocks.Label):
new_block = self._block.with_column_labels([label])
self._set_block(new_block)
@property
def shape(self) -> typing.Tuple[int]:
return (self._block.shape[0],)
@property
def size(self) -> int:
return self.shape[0]
@property
def ndim(self) -> int:
return 1
@property
def empty(self) -> bool:
return self.shape[0] == 0
@property
def hasnans(self) -> bool:
        # Note: hasnans is actually a null check; NaNs don't count as nulls for nullable float dtypes
return self.isnull().any()
@property
def values(self) -> numpy.ndarray:
return self.to_numpy()
@property
@validations.requires_index
def index(self) -> indexes.Index:
return indexes.Index.from_frame(self)
@validations.requires_index
def keys(self) -> indexes.Index:
return self.index
@property
def query_job(self) -> Optional[bigquery.QueryJob]:
"""BigQuery job metadata for the most recent query.
Returns:
The most recent `QueryJob
<https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob>`_.
"""
if self._query_job is None:
self._set_internal_query_job(self._compute_dry_run())
return self._query_job
@property
def struct(self) -> structs.StructAccessor:
return structs.StructAccessor(self)
@property
def list(self) -> lists.ListAccessor:
return lists.ListAccessor(self)
@property
def blob(self) -> blob.BlobAccessor:
return blob.BlobAccessor(self)
@property
@validations.requires_ordering()
def T(self) -> Series:
return self.transpose()
@property
def _info_axis(self) -> indexes.Index:
return self.index
@property
def _session(self) -> bigframes.Session:
return self._get_block().expr.session
@property
def _struct_fields(self) -> List[str]:
if not bigframes.dtypes.is_struct_like(self._dtype):
return []
struct_type = typing.cast(pa.StructType, self._dtype.pyarrow_dtype)
return [struct_type.field(i).name for i in range(struct_type.num_fields)]
@validations.requires_ordering()
def transpose(self) -> Series:
return self
def _set_internal_query_job(self, query_job: Optional[bigquery.QueryJob]):
self._query_job = query_job
def __len__(self):
return self.shape[0]
__len__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__len__)
def __iter__(self) -> typing.Iterator:
return itertools.chain.from_iterable(
map(lambda x: x.squeeze(axis=1), self._block.to_pandas_batches())
)
def __contains__(self, key) -> bool:
return key in self.index
def copy(self) -> Series:
return Series(self._block)
@overload
def rename(
self,
index: Union[blocks.Label, Mapping[Any, Any]] = None,
) -> Series:
...
@overload
def rename(
self,
index: Union[blocks.Label, Mapping[Any, Any]] = None,
*,
inplace: Literal[False],
**kwargs,
) -> Series:
...
@overload
def rename(
self,
index: Union[blocks.Label, Mapping[Any, Any]] = None,
*,
inplace: Literal[True],
**kwargs,
) -> None:
...
def rename(
self,
index: Union[blocks.Label, Mapping[Any, Any]] = None,
*,
inplace: bool = False,
**kwargs,
) -> Optional[Series]:
if len(kwargs) != 0:
raise NotImplementedError(
f"rename does not currently support any keyword arguments. {constants.FEEDBACK_LINK}"
)
# rename the index
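        # Illustrative sketch (assumed data): s.rename({1: 100}) rewrites, on
        # each index level, entries equal to the key (1) to the mapped value
        # (100), leaving all other index values untouched.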
if isinstance(index, Mapping):
index = typing.cast(Mapping[Any, Any], index)
block = self._block
for k, v in index.items():
new_idx_ids = []
for idx_id, idx_dtype in zip(block.index_columns, block.index.dtypes):
                    # Will throw if the key type isn't compatible with the index type, which would otherwise lead to invalid SQL.
block.create_constant(k, dtype=idx_dtype)
# Will throw if value type isn't compatible with index type.
block, const_id = block.create_constant(v, dtype=idx_dtype)
block, cond_id = block.project_expr(
ops.ne_op.as_expr(idx_id, ex.const(k))
)
block, new_idx_id = block.apply_ternary_op(
idx_id, cond_id, const_id, ops.where_op
)
new_idx_ids.append(new_idx_id)
block = block.drop_columns([const_id, cond_id])
block = block.set_index(new_idx_ids, index_labels=block.index.names)
if inplace:
self._block = block
return None
else:
return Series(block)
# rename the Series name
if isinstance(index, typing.Hashable):
# Python 3.9 doesn't allow isinstance of Optional
index = typing.cast(Optional[str], index)
block = self._block.with_column_labels([index])
if inplace:
self._block = block
return None
else:
return Series(block)
raise ValueError(f"Unsupported type of parameter index: {type(index)}")
@overload
def rename_axis(
self,
mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]],
) -> Series:
...
@overload
def rename_axis(
self,
mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]],
*,
inplace: Literal[False],
**kwargs,
) -> Series:
...
@overload
def rename_axis(
self,
mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]],
*,
inplace: Literal[True],
**kwargs,
) -> None:
...
@validations.requires_index
def rename_axis(
self,
mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]],
*,
inplace: bool = False,
**kwargs,
) -> Optional[Series]:
if len(kwargs) != 0:
raise NotImplementedError(
f"rename_axis does not currently support any keyword arguments. {constants.FEEDBACK_LINK}"
)
# limited implementation: the new index name is simply the 'mapper' parameter
if _is_list_like(mapper):
labels = mapper
else:
labels = [mapper]
block = self._block.with_index_labels(labels)
if inplace:
self._block = block
return None
else:
return Series(block)
def equals(
self, other: typing.Union[Series, bigframes.dataframe.DataFrame]
) -> bool:
# Must be same object type, same column dtypes, and same label values
if not isinstance(other, Series):
return False
return block_ops.equals(self._block, other._block)
@overload # type: ignore[override]
def reset_index(
self,
level: blocks.LevelsType = ...,
*,
name: typing.Optional[str] = ...,
drop: Literal[False] = ...,
inplace: Literal[False] = ...,
allow_duplicates: Optional[bool] = ...,
) -> bigframes.dataframe.DataFrame:
...
@overload
def reset_index(
self,
level: blocks.LevelsType = ...,
*,
name: typing.Optional[str] = ...,
drop: Literal[True] = ...,
inplace: Literal[False] = ...,
allow_duplicates: Optional[bool] = ...,
) -> Series:
...
@overload
def reset_index(
self,
level: blocks.LevelsType = ...,
*,
name: typing.Optional[str] = ...,
drop: bool = ...,
inplace: Literal[True] = ...,
allow_duplicates: Optional[bool] = ...,
) -> None:
...
@validations.requires_ordering()
def reset_index(
self,
level: blocks.LevelsType = None,
*,
name: typing.Optional[str] = None,
drop: bool = False,
inplace: bool = False,
allow_duplicates: Optional[bool] = None,
) -> bigframes.dataframe.DataFrame | Series | None:
if allow_duplicates is None:
allow_duplicates = False
block = self._block.reset_index(level, drop, allow_duplicates=allow_duplicates)
if drop:
if inplace:
self._set_block(block)
return None
return Series(block)
else:
if inplace:
raise ValueError(
"Series.reset_index cannot combine inplace=True and drop=False"
)
if name:
block = block.assign_label(self._value_column, name)
return bigframes.dataframe.DataFrame(block)
def __repr__(self) -> str:
# Protect against errors with uninitialized Series. See:
# https://github.com/googleapis/python-bigquery-dataframes/issues/728
if not hasattr(self, "_block"):
return object.__repr__(self)
# TODO(swast): Add a timeout here? If the query is taking a long time,
# maybe we just print the job metadata that we have so far?
# TODO(swast): Avoid downloading the whole series by using job
# metadata, like we do with DataFrame.
opts = bigframes.options.display
max_results = opts.max_rows
        # anywidget mode uses the same display logic as the "deferred" mode
# for faster execution
if opts.repr_mode in ("deferred", "anywidget"):
return formatter.repr_query_job(self._compute_dry_run())
self._cached()
pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results)
self._set_internal_query_job(query_job)
pd_series = pandas_df.iloc[:, 0]
import pandas.io.formats
        # Safe to mutate: this dict is owned by this code and does not affect global config
to_string_kwargs = pandas.io.formats.format.get_series_repr_params() # type: ignore
if len(self._block.index_columns) == 0:
to_string_kwargs.update({"index": False})
repr_string = pd_series.to_string(**to_string_kwargs)
return repr_string
def astype(
self,
dtype: Union[bigframes.dtypes.DtypeString, bigframes.dtypes.Dtype],
*,
errors: Literal["raise", "null"] = "raise",
) -> Series:
if errors not in ["raise", "null"]:
raise ValueError("Argument 'errors' must be one of 'raise' or 'null'")
dtype = bigframes.dtypes.bigframes_type(dtype)
return self._apply_unary_op(
bigframes.operations.AsTypeOp(to_type=dtype, safe=(errors == "null"))
)
def to_pandas(
self,
max_download_size: Optional[int] = None,
sampling_method: Optional[str] = None,
random_state: Optional[int] = None,
*,
ordered: bool = True,
dry_run: bool = False,
allow_large_results: Optional[bool] = None,
) -> pandas.Series:
"""Writes Series to pandas Series.
**Examples:**
            >>> import bigframes.pandas as bpd
            >>> s = bpd.Series([4, 3, 2])
Download the data from BigQuery and convert it into an in-memory pandas Series.
>>> s.to_pandas()
0 4
1 3
2 2
dtype: Int64
Estimate job statistics without processing or downloading data by using `dry_run=True`.
>>> s.to_pandas(dry_run=True) # doctest: +SKIP
columnCount 1
columnDtypes {None: Int64}
indexLevel 1
indexDtypes [Int64]
projectId bigframes-dev
location US
jobType QUERY
destinationTable {'projectId': 'bigframes-dev', 'datasetId': '_...
useLegacySql False
referencedTables None
totalBytesProcessed 0
cacheHit False
statementType SELECT
creationTime 2025-04-03 18:54:59.219000+00:00
dtype: object
Args:
max_download_size (int, default None):
.. deprecated:: 2.0.0
``max_download_size`` parameter is deprecated. Please use ``to_pandas_batches()``
method instead.
Download size threshold in MB. If ``max_download_size`` is exceeded when downloading data,
the data will be downsampled if ``bigframes.options.sampling.enable_downsampling`` is
``True``, otherwise, an error will be raised. If set to a value other than ``None``,
this will supersede the global config.
sampling_method (str, default None):
.. deprecated:: 2.0.0
``sampling_method`` parameter is deprecated. Please use ``sample()`` method instead.
Downsampling algorithms to be chosen from, the choices are: "head": This algorithm
returns a portion of the data from the beginning. It is fast and requires minimal
computations to perform the downsampling; "uniform": This algorithm returns uniform
random samples of the data. If set to a value other than None, this will supersede
the global config.
random_state (int, default None):
.. deprecated:: 2.0.0
``random_state`` parameter is deprecated. Please use ``sample()`` method instead.
The seed for the uniform downsampling algorithm. If provided, the uniform method may
take longer to execute and require more computation. If set to a value other than
None, this will supersede the global config.
ordered (bool, default True):
Determines whether the resulting pandas series will be ordered.
In some cases, unordered may result in a faster-executing query.
dry_run (bool, default False):
                If this argument is True, this method will not process the data. Instead, it returns
                a pandas Series containing dry run job statistics.
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.
Returns:
            pandas.Series: A pandas Series with all rows of this Series if the data_sampling_threshold_mb
                is not exceeded; otherwise, a pandas Series with downsampled rows of the Series. If dry_run
                is set to True, a pandas Series containing dry run statistics will be returned.
"""
if max_download_size is not None:
msg = bfe.format_message(
"DEPRECATED: The `max_download_size` parameters for `Series.to_pandas()` "
"are deprecated and will be removed soon. Please use `Series.to_pandas_batches()`."
)
warnings.warn(msg, category=FutureWarning)
if sampling_method is not None or random_state is not None:
msg = bfe.format_message(
"DEPRECATED: The `sampling_method` and `random_state` parameters for "
"`Series.to_pandas()` are deprecated and will be removed soon. "
"Please use `Series.sample().to_pandas()` instead for sampling."
)
warnings.warn(msg, category=FutureWarning)
if dry_run:
dry_run_stats, dry_run_job = self._block._compute_dry_run(
max_download_size=max_download_size,
sampling_method=sampling_method,
random_state=random_state,
ordered=ordered,
)
self._set_internal_query_job(dry_run_job)
return dry_run_stats
# Repeat the to_pandas() call to make mypy deduce type correctly, because mypy cannot resolve
# Literal[True/False] to bool
df, query_job = self._block.to_pandas(
max_download_size=max_download_size,
sampling_method=sampling_method,
random_state=random_state,
ordered=ordered,
allow_large_results=allow_large_results,
)
if query_job:
self._set_internal_query_job(query_job)
series = df.squeeze(axis=1)
series.name = self._name
return series
def to_pandas_batches(
self,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
*,
allow_large_results: Optional[bool] = None,
) -> Iterable[pandas.Series]:
"""Stream Series results to an iterable of pandas Series.
        page_size and max_results determine the size and number of batches;
see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result
**Examples:**
            >>> import bigframes.pandas as bpd
            >>> s = bpd.Series([4, 3, 2, 2, 3])
Iterate through the results in batches, limiting the total rows yielded
across all batches via `max_results`:
>>> for s_batch in s.to_pandas_batches(max_results=3):
... print(s_batch)
0 4
1 3
2 2
dtype: Int64
Alternatively, control the approximate size of each batch using `page_size`
and fetch batches manually using `next()`:
>>> it = s.to_pandas_batches(page_size=2)
>>> next(it)
0 4
1 3
dtype: Int64
>>> next(it)
2 2
3 2
dtype: Int64
Args:
page_size (int, default None):
The maximum number of rows of each batch. Non-positive values are ignored.
max_results (int, default None):
The maximum total number of rows of all batches.
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.
Returns:
Iterable[pandas.Series]:
An iterable of smaller Series which combine to
                form the original Series. Results stream from BigQuery;
see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable
"""
batches = self._block.to_pandas_batches(
page_size=page_size,
max_results=max_results,
allow_large_results=allow_large_results,
)
return map(lambda df: cast(pandas.Series, df.squeeze(1)), batches)
def _compute_dry_run(self) -> bigquery.QueryJob:
_, query_job = self._block._compute_dry_run((self._value_column,))
return query_job
def drop(
self,
labels: typing.Any = None,
*,
axis: typing.Union[int, str] = 0,
index: typing.Any = None,
columns: Union[blocks.Label, typing.Iterable[blocks.Label]] = None,
level: typing.Optional[LevelType] = None,
) -> Series:
if (labels is None) == (index is None):
raise ValueError("Must specify exactly one of 'labels' or 'index'")
if labels is not None:
index = labels
# ignore axis, columns params
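        # Illustrative sketch (assumed data): s.drop(2) keeps rows whose index
        # level != 2, while s.drop([2, 3]) builds an IsInOp membership mask
        # over [2, 3] and inverts it before filtering.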
block = self._block
level_id = self._resolve_levels(level or 0)[0]
if _is_list_like(index):
block, inverse_condition_id = block.apply_unary_op(
level_id, ops.IsInOp(values=tuple(index), match_nulls=True)
)
block, condition_id = block.apply_unary_op(
inverse_condition_id, ops.invert_op
)
else:
block, condition_id = block.project_expr(
ops.ne_op.as_expr(level_id, ex.const(index))
)
block = block.filter_by_id(condition_id, keep_null=True)
block = block.drop_columns([condition_id])
return Series(block.select_column(self._value_column))
@validations.requires_index
def droplevel(self, level: LevelsType, axis: int | str = 0):
resolved_level_ids = self._resolve_levels(level)
return Series(self._block.drop_levels(resolved_level_ids))
@validations.requires_index
def swaplevel(self, i: int = -2, j: int = -1):
level_i = self._block.index_columns[i]
level_j = self._block.index_columns[j]
mapping = {level_i: level_j, level_j: level_i}
reordering = [
mapping.get(index_id, index_id) for index_id in self._block.index_columns
]
return Series(self._block.reorder_levels(reordering))
@validations.requires_index
def reorder_levels(self, order: LevelsType, axis: int | str = 0):
resolved_level_ids = self._resolve_levels(order)
return Series(self._block.reorder_levels(resolved_level_ids))
def _resolve_levels(self, level: LevelsType) -> typing.Sequence[str]:
return self._block.index.resolve_level(level)
def between(self, left, right, inclusive="both"):
if inclusive not in ["both", "neither", "left", "right"]:
raise ValueError(
"Must set 'inclusive' to one of 'both', 'neither', 'left', or 'right'"
)
left_op = ops.ge_op if (inclusive in ["left", "both"]) else ops.gt_op
right_op = ops.le_op if (inclusive in ["right", "both"]) else ops.lt_op
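        # E.g. (illustrative): inclusive="left" evaluates as (self >= left) & (self < right)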
return self._apply_binary_op(left, left_op).__and__(
self._apply_binary_op(right, right_op)
)
def case_when(self, caselist) -> Series:
cases = []
for condition, output in itertools.chain(caselist, [(True, self)]):
cases.append(condition)
cases.append(output)
# In pandas, the default value if no case matches is the original value.
# This makes it impossible to change the type of the column, but if
# the condition is always True, we know it will match and no subsequent
# conditions matter (including the fallback to `self`). This break allows
# the type to change (see: internal issue 349926559).
if condition is True:
break
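        # Illustrative sketch (assumed data): caselist=[(s < 0, 0), (s > 10, 10)]
        # clips values below 0 and above 10, while the trailing (True, self)
        # case keeps all remaining values unchanged.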
return self._apply_nary_op(
ops.case_when_op,
cases,
# Self is already included in "others".
ignore_self=True,
).rename(self.name)
@validations.requires_ordering()
def cumsum(self) -> Series:
return self._apply_window_op(agg_ops.sum_op, windows.cumulative_rows())
@validations.requires_ordering()
def ffill(self, *, limit: typing.Optional[int] = None) -> Series:
window = windows.rows(start=None if limit is None else -limit, end=0)
return self._apply_window_op(agg_ops.LastNonNullOp(), window)
pad = ffill
pad.__doc__ = inspect.getdoc(vendored_pandas_series.Series.ffill)
@validations.requires_ordering()
def bfill(self, *, limit: typing.Optional[int] = None) -> Series:
window = windows.rows(start=0, end=limit)
return self._apply_window_op(agg_ops.FirstNonNullOp(), window)
@validations.requires_ordering()
def cummax(self) -> Series:
return self._apply_window_op(agg_ops.max_op, windows.cumulative_rows())
@validations.requires_ordering()
def cummin(self) -> Series:
return self._apply_window_op(agg_ops.min_op, windows.cumulative_rows())
@validations.requires_ordering()
def cumprod(self) -> Series:
return self._apply_window_op(agg_ops.product_op, windows.cumulative_rows())
@validations.requires_ordering()
def shift(self, periods: int = 1) -> Series:
window_spec = windows.rows()
return self._apply_window_op(agg_ops.ShiftOp(periods), window_spec)
@validations.requires_ordering()
def diff(self, periods: int = 1) -> Series:
window_spec = windows.rows()
return self._apply_window_op(agg_ops.DiffOp(periods), window_spec)
@validations.requires_ordering()
def pct_change(self, periods: int = 1) -> Series:
        # Future versions of pandas will not perform ffill automatically
series = self.ffill()
return Series(block_ops.pct_change(series._block, periods=periods))
@validations.requires_ordering()
def rank(
self,
axis=0,
method: str = "average",
numeric_only=False,
na_option: str = "keep",
ascending: bool = True,
pct: bool = False,
) -> Series:
return Series(
block_ops.rank(self._block, method, na_option, ascending, pct=pct)
)
def fillna(self, value=None) -> Series:
return self._apply_binary_op(value, ops.fillna_op)
def replace(
self, to_replace: typing.Any, value: typing.Any = None, *, regex: bool = False
):
if regex:
# No-op unless to_replace and series dtype are both string type
if not isinstance(to_replace, str) or not isinstance(
self.dtype, pandas.StringDtype
):
return self
return self._regex_replace(to_replace, value)
elif utils.is_dict_like(to_replace):
return self._mapping_replace(to_replace) # type: ignore
elif utils.is_list_like(to_replace):
replace_list = to_replace
else: # Scalar
replace_list = [to_replace]
replace_list = [
i for i in replace_list if bigframes.dtypes.is_compatible(i, self.dtype)
]
return self._simple_replace(replace_list, value) if replace_list else self
def _regex_replace(self, to_replace: str, value: str):
if not bigframes.dtypes.is_dtype(value, self.dtype):
raise NotImplementedError(
f"Cannot replace {self.dtype} elements with incompatible item {value} as mixed-type columns not supported. {constants.FEEDBACK_LINK}"
)
block, result_col = self._block.apply_unary_op(
self._value_column,
ops.RegexReplaceStrOp(to_replace, value),
result_label=self.name,
)
return Series(block.select_column(result_col))
def _simple_replace(self, to_replace_list: typing.Sequence, value):
result_type = bigframes.dtypes.is_compatible(value, self.dtype)
if not result_type:
raise NotImplementedError(
f"Cannot replace {self.dtype} elements with incompatible item {value} as mixed-type columns not supported. {constants.FEEDBACK_LINK}"
)
if result_type != self.dtype:
return self.astype(result_type)._simple_replace(to_replace_list, value)
block, cond = self._block.apply_unary_op(
self._value_column, ops.IsInOp(tuple(to_replace_list))
)
block, result_col = block.project_expr(
ops.where_op.as_expr(ex.const(value), cond, self._value_column), self.name
)
return Series(block.select_column(result_col))
def _mapping_replace(self, mapping: dict[typing.Hashable, typing.Hashable]):
if not mapping:
return self.copy()
tuples = []
lcd_types: list[typing.Optional[bigframes.dtypes.Dtype]] = []
for key, value in mapping.items():
lcd_type = bigframes.dtypes.is_compatible(key, self.dtype)
if not lcd_type:
continue
if not bigframes.dtypes.is_dtype(value, self.dtype):
raise NotImplementedError(
f"Cannot replace {self.dtype} elements with incompatible item {value} as mixed-type columns not supported. {constants.FEEDBACK_LINK}"
)
tuples.append((key, value))
lcd_types.append(lcd_type)
result_dtype = functools.reduce(
lambda t1, t2: bigframes.dtypes.lcd_type(t1, t2) if (t1 and t2) else None,
lcd_types,
self.dtype,
)
if not result_dtype:
raise NotImplementedError(
f"Cannot replace {self.dtype} elements with incompatible mapping {mapping} as mixed-type columns not supported. {constants.FEEDBACK_LINK}"
)
block, result = self._block.apply_unary_op(
self._value_column, ops.MapOp(tuple(tuples))
)
replaced = Series(block.select_column(result))
replaced.name = self.name
return replaced
@validations.requires_ordering()
@validations.requires_index
def interpolate(self, method: str = "linear") -> Series:
if method == "pad":
return self.ffill()
result = block_ops.interpolate(self._block, method)
return Series(result)
def dropna(
self,
*,
axis: int = 0,
inplace: bool = False,
how: typing.Optional[str] = None,
ignore_index: bool = False,
) -> Series:
if inplace:
raise NotImplementedError("'inplace'=True not supported")
result = block_ops.dropna(self._block, [self._value_column], how="any")
if ignore_index:
result = result.reset_index()
return Series(result)
@validations.requires_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW)
def head(self, n: int = 5) -> Series:
return typing.cast(Series, self.iloc[0:n])
@validations.requires_ordering()
def tail(self, n: int = 5) -> Series:
return typing.cast(Series, self.iloc[-n:])
def peek(
self, n: int = 5, *, force: bool = True, allow_large_results=None
) -> pandas.Series:
"""
Preview n arbitrary elements from the series without guarantees about row selection or ordering.
``Series.peek(force=False)`` will always be very fast, but will not succeed if data requires
        full data scanning. Using ``force=True`` will always succeed, but may need to perform full queries.
Query results will be cached so that future steps will benefit from these queries.
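        **Examples:**
        An illustrative sketch; which rows are returned is non-deterministic,
        so the output is skipped under doctest.
            >>> import bigframes.pandas as bpd
            >>> s = bpd.Series([4, 3, 2, 2, 3])
            >>> s.peek(2)  # doctest: +SKIP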
Args:
n (int, default 5):
The number of rows to select from the series. Which N rows are returned is non-deterministic.
force (bool, default True):
If the data cannot be peeked efficiently, the series will instead be fully materialized as part
of the operation if ``force=True``. If ``force=False``, the operation will throw a ValueError.
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.
Returns:
pandas.Series: A pandas Series with n rows.
Raises:
ValueError: If force=False and data cannot be efficiently peeked.
"""
maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results)
if maybe_result is None:
if force:
self._cached()
maybe_result = self._block.try_peek(
n, force=True, allow_large_results=allow_large_results
)
assert maybe_result is not None
else:
raise ValueError(
"Cannot peek efficiently when data has aggregates, joins or window functions applied. Use force=True to fully compute dataframe."
)
as_series = maybe_result.squeeze(axis=1)
as_series.name = self.name
return as_series
def item(self):
# Docstring is in third_party/bigframes_vendored/pandas/core/series.py
return self.peek(2).item()
def nlargest(self, n: int = 5, keep: str = "first") -> Series:
if keep not in ("first", "last", "all"):
raise ValueError("'keep must be one of 'first', 'last', or 'all'")
if keep != "all":
validations.enforce_ordered(self, "nlargest(keep != 'all')")
return Series(
block_ops.nlargest(self._block, n, [self._value_column], keep=keep)
)
def nsmallest(self, n: int = 5, keep: str = "first") -> Series:
if keep not in ("first", "last", "all"):
raise ValueError("'keep must be one of 'first', 'last', or 'all'")
if keep != "all":
validations.enforce_ordered(self, "nsmallest(keep != 'all')")
return Series(
block_ops.nsmallest(self._block, n, [self._value_column], keep=keep)
)
def isin(self, values) -> "Series":
if isinstance(values, Series):
return Series(self._block.isin(values._block))
if isinstance(values, indexes.Index):
return Series(self._block.isin(values.to_series()._block))
if not _is_list_like(values):
raise TypeError(
"only list-like objects are allowed to be passed to "
f"isin(), you passed a [{type(values).__name__}]"
)
return self._apply_unary_op(
ops.IsInOp(values=tuple(values), match_nulls=True)
).fillna(value=False)
def isna(self) -> "Series":
return self._apply_unary_op(ops.isnull_op)
isnull = isna
isnull.__doc__ = inspect.getdoc(vendored_pandas_series.Series.isna)
def notna(self) -> "Series":
return self._apply_unary_op(ops.notnull_op)
notnull = notna
notnull.__doc__ = inspect.getdoc(vendored_pandas_series.Series.notna)
def __and__(self, other: bool | int | Series) -> Series:
return self._apply_binary_op(other, ops.and_op)
__and__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__and__)
__rand__ = __and__
def __or__(self, other: bool | int | Series) -> Series:
return self._apply_binary_op(other, ops.or_op)
__or__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__or__)
__ror__ = __or__
def __xor__(self, other: bool | int | Series) -> Series:
return self._apply_binary_op(other, ops.xor_op)
    __xor__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__xor__)
__rxor__ = __xor__
def __add__(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self.add(other)
__add__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__add__)
def __radd__(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self.radd(other)
__radd__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__radd__)
def add(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self._apply_binary_op(other, ops.add_op)
def radd(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self._apply_binary_op(other, ops.add_op, reverse=True)
def __sub__(self, other: float | int | Series) -> Series:
return self.sub(other)
__sub__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__sub__)
def __rsub__(self, other: float | int | Series) -> Series:
return self.rsub(other)
__rsub__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rsub__)
def sub(self, other) -> Series:
return self._apply_binary_op(other, ops.sub_op)
def rsub(self, other) -> Series:
return self._apply_binary_op(other, ops.sub_op, reverse=True)
subtract = sub
subtract.__doc__ = inspect.getdoc(vendored_pandas_series.Series.sub)
def __mul__(self, other: float | int | Series) -> Series:
return self.mul(other)
__mul__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__mul__)
def __rmul__(self, other: float | int | Series) -> Series:
return self.rmul(other)
__rmul__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rmul__)
def mul(self, other: float | int | Series) -> Series:
return self._apply_binary_op(other, ops.mul_op)
def rmul(self, other: float | int | Series) -> Series:
return self._apply_binary_op(other, ops.mul_op, reverse=True)
multiply = mul
multiply.__doc__ = inspect.getdoc(vendored_pandas_series.Series.mul)
def __truediv__(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self.truediv(other)
__truediv__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__truediv__)
def __rtruediv__(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self.rtruediv(other)
__rtruediv__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rtruediv__)
def truediv(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self._apply_binary_op(other, ops.div_op)
def rtruediv(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self._apply_binary_op(other, ops.div_op, reverse=True)
truediv.__doc__ = inspect.getdoc(vendored_pandas_series.Series.truediv)
div = divide = truediv
rdiv = rtruediv
rdiv.__doc__ = inspect.getdoc(vendored_pandas_series.Series.rtruediv)
def __floordiv__(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self.floordiv(other)
__floordiv__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__floordiv__)
def __rfloordiv__(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self.rfloordiv(other)
__rfloordiv__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rfloordiv__)
def floordiv(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self._apply_binary_op(other, ops.floordiv_op)
def rfloordiv(self, other: float | int | pandas.Timedelta | Series) -> Series:
return self._apply_binary_op(other, ops.floordiv_op, reverse=True)
def __pow__(self, other: float | int | Series) -> Series:
return self.pow(other)
__pow__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__pow__)
def __rpow__(self, other: float | int | Series) -> Series:
return self.rpow(other)
__rpow__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rpow__)
def pow(self, other: float | int | Series) -> Series:
return self._apply_binary_op(other, ops.pow_op)
def rpow(self, other: float | int | Series) -> Series:
return self._apply_binary_op(other, ops.pow_op, reverse=True)
def __lt__(self, other: float | int | str | Series) -> Series:
return self.lt(other)
def __le__(self, other: float | int | str | Series) -> Series:
return self.le(other)
def lt(self, other) -> Series:
return self._apply_binary_op(other, ops.lt_op)
def le(self, other) -> Series:
return self._apply_binary_op(other, ops.le_op)
def __gt__(self, other: float | int | str | Series) -> Series:
return self.gt(other)
def __ge__(self, other: float | int | str | Series) -> Series:
return self.ge(other)
def gt(self, other) -> Series:
return self._apply_binary_op(other, ops.gt_op)
def ge(self, other) -> Series:
return self._apply_binary_op(other, ops.ge_op)
def __mod__(self, other) -> Series: # type: ignore
return self.mod(other)
__mod__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__mod__)
def __rmod__(self, other) -> Series: # type: ignore
return self.rmod(other)
__rmod__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rmod__)
def mod(self, other) -> Series: # type: ignore
return self._apply_binary_op(other, ops.mod_op)
def rmod(self, other) -> Series: # type: ignore
return self._apply_binary_op(other, ops.mod_op, reverse=True)
def divmod(self, other) -> Tuple[Series, Series]: # type: ignore
        # TODO(huanc): when self and other both have dtype int and other contains zeros,
        # the output should be dtype float; both floordiv and mod return dtype int in this case.
return (self.floordiv(other), self.mod(other))
def rdivmod(self, other) -> Tuple[Series, Series]: # type: ignore
        # TODO(huanc): when self and other both have dtype int and self contains zeros,
        # the output should be dtype float; both floordiv and mod return dtype int in this case.
return (self.rfloordiv(other), self.rmod(other))
def dot(self, other):
return (self * other).sum()
def __matmul__(self, other):
return self.dot(other)
__matmul__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__matmul__)
def __rmatmul__(self, other):
return self.dot(other)
__rmatmul__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rmatmul__)
def combine_first(self, other: Series) -> Series:
result = self._apply_binary_op(other, ops.coalesce_op)
result.name = self.name
return result
def update(self, other: Union[Series, Sequence, Mapping]) -> None:
result = self._apply_binary_op(
other, ops.coalesce_op, reverse=True, alignment="left"
)
self._set_block(result._get_block())
def __abs__(self) -> Series:
return self.abs()
__abs__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.abs)
def abs(self) -> Series:
return self._apply_unary_op(ops.abs_op)
def round(self, decimals=0) -> "Series":
return self._apply_binary_op(decimals, ops.round_op)
def corr(self, other: Series, method="pearson", min_periods=None) -> float:
# TODO(tbergeron): Validate early that both are numeric
# TODO(tbergeron): Handle partially-numeric columns
if method != "pearson":
raise NotImplementedError(
f"Only Pearson correlation is currently supported. {constants.FEEDBACK_LINK}"
)
if min_periods:
raise NotImplementedError(
f"min_periods not yet supported. {constants.FEEDBACK_LINK}"
)
return self._apply_binary_aggregation(other, agg_ops.CorrOp())
def autocorr(self, lag: int = 1) -> float:
return self.corr(self.shift(lag))
def cov(self, other: Series) -> float:
return self._apply_binary_aggregation(other, agg_ops.CovOp())
def all(self) -> bool:
return typing.cast(bool, self._apply_aggregation(agg_ops.all_op))
def any(self) -> bool:
return typing.cast(bool, self._apply_aggregation(agg_ops.any_op))
def count(self) -> int:
return typing.cast(int, self._apply_aggregation(agg_ops.count_op))
def nunique(self) -> int:
return typing.cast(int, self._apply_aggregation(agg_ops.nunique_op))
def max(self) -> scalars.Scalar:
return self._apply_aggregation(agg_ops.max_op)
def min(self) -> scalars.Scalar:
return self._apply_aggregation(agg_ops.min_op)
def std(self) -> float:
return typing.cast(float, self._apply_aggregation(agg_ops.std_op))
def var(self) -> float:
return typing.cast(float, self._apply_aggregation(agg_ops.var_op))
def _central_moment(self, n: int) -> float:
"""Useful helper for calculating central moment statistics"""
# Nth central moment is mean((x-mean(x))^n)
# See: https://en.wikipedia.org/wiki/Moment_(mathematics)
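        # E.g. n=2 gives the population variance: mean((x - mean(x))**2).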
mean_deltas = self - self.mean()
delta_powers = mean_deltas**n
return delta_powers.mean()
def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series:
if _is_list_like(func):
if self.dtype not in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE:
raise NotImplementedError(
f"Multiple aggregations only supported on numeric series. {constants.FEEDBACK_LINK}"
)
aggregations = [agg_ops.lookup_agg_func(f)[0] for f in func]
return Series(
self._block.summarize(
[self._value_column],
aggregations,
)
)
else:
return self._apply_aggregation(agg_ops.lookup_agg_func(func)[0])
aggregate = agg
aggregate.__doc__ = inspect.getdoc(vendored_pandas_series.Series.agg)
def describe(self) -> Series:
from bigframes.pandas.core.methods import describe
return cast(Series, describe.describe(self, include="all"))
def skew(self):
count = self.count()
if count < 3:
return pandas.NA
moment3 = self._central_moment(3)
moment2 = self.var() * (count - 1) / count # Convert sample var to pop var
# See G1 estimator:
# https://en.wikipedia.org/wiki/Skewness#Sample_skewness
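        # Worked sketch (illustrative): with count=5, the bias adjustment below
        # is sqrt(5 * 4) / 3 ≈ 1.49, scaling m3 / m2**1.5 up to the G1 estimate.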
numerator = moment3
denominator = moment2 ** (3 / 2)
adjustment = (count * (count - 1)) ** 0.5 / (count - 2)
return (numerator / denominator) * adjustment
def kurt(self):
count = self.count()
if count < 4:
return pandas.NA
moment4 = self._central_moment(4)
moment2 = self.var() * (count - 1) / count # Convert sample var to pop var
        # Kurtosis is often defined as the fourth standardized moment: moment(4)/moment(2)**2.
        # Pandas, however, uses Fisher's estimator, implemented below.
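        # Worked sketch (illustrative): with count=5, the adjustment term below
        # is 3 * (5 - 1)**2 / ((5 - 2) * (5 - 3)) = 8, centering a normal
        # sample's statistic near 0 (excess kurtosis).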
numerator = (count + 1) * (count - 1) * moment4
denominator = (count - 2) * (count - 3) * moment2**2
adjustment = 3 * (count - 1) ** 2 / ((count - 2) * (count - 3))
return (numerator / denominator) - adjustment
kurtosis = kurt
kurtosis.__doc__ = inspect.getdoc(vendored_pandas_series.Series.kurt)
def mode(self) -> Series:
block = self._block
        # Approach: count each value, then return each value for which count(x) == max(counts)
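        # E.g. for values [1, 1, 2, 3]: counts are {1: 2, 2: 1, 3: 1}, the max
        # count is 2, so only the value 1 survives the filter as the mode.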
block, agg_ids = block.aggregate(
by_column_ids=[self._value_column],
aggregations=(
agg_expressions.UnaryAggregation(
agg_ops.count_op, ex.deref(self._value_column)
),
),
)
value_count_col_id = agg_ids[0]
block, max_value_count_col_id = block.apply_window_op(
value_count_col_id,
agg_ops.max_op,
window_spec=windows.unbound(),
)
block, is_mode_col_id = block.apply_binary_op(
value_count_col_id,
max_value_count_col_id,
ops.eq_op,
)
block = block.filter_by_id(is_mode_col_id)
# use temporary name for reset_index to avoid collision, restore after dropping extra columns
block = (
block.with_index_labels(["mode_temp_internal"])
.order_by([order.ascending_over(self._value_column)])
.reset_index(drop=False)
)
block = block.select_column(self._value_column).with_column_labels([self.name])
mode_values_series = Series(block.select_column(self._value_column))
return typing.cast(Series, mode_values_series)
def mean(self) -> float:
return typing.cast(float, self._apply_aggregation(agg_ops.mean_op))
def quantile(self, q: Union[float, Sequence[float]] = 0.5) -> Union[Series, float]:
qs = tuple(q) if utils.is_list_like(q) else (q,)
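        # E.g. s.quantile([0.25, 0.75]) returns a Series indexed by quantile,
        # while s.quantile(0.5) collapses to a single float.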
result = block_ops.quantile(self._block, (self._value_column,), qs=qs)
if utils.is_list_like(q):
# Drop the first level, since only one column
result = result.with_column_labels(result.column_labels.droplevel(0))
result, index_col = result.create_constant(self.name, None)
result = result.set_index([index_col])
return Series(
result.transpose(original_row_index=pandas.Index([self.name]))
)
else:
return cast(float, Series(result).to_pandas().squeeze())
def sum(self) -> float:
return typing.cast(float, self._apply_aggregation(agg_ops.sum_op))
def prod(self) -> float:
return typing.cast(float, self._apply_aggregation(agg_ops.product_op))
product = prod
product.__doc__ = inspect.getdoc(vendored_pandas_series.Series.prod)
def __eq__(self, other: object) -> Series: # type: ignore
return self.eq(other)
def __ne__(self, other: object) -> Series: # type: ignore
return self.ne(other)
def __invert__(self) -> Series:
return self._apply_unary_op(ops.invert_op)
__invert__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__invert__)
def __pos__(self) -> Series:
return self._apply_unary_op(ops.pos_op)
def __neg__(self) -> Series:
return self._apply_unary_op(ops.neg_op)
def __dir__(self) -> List[str]:
return dir(type(self)) + self._struct_fields
def eq(self, other: object) -> Series:
# TODO: enforce stricter alignment
return self._apply_binary_op(other, ops.eq_op)
def ne(self, other: object) -> Series:
# TODO: enforce stricter alignment
return self._apply_binary_op(other, ops.ne_op)
def items(self):
for batch_df in self._block.to_pandas_batches():
assert (
batch_df.shape[1] == 1
), f"Expected 1 column in the dataframe, but got {batch_df.shape[1]}."
for item in batch_df.squeeze(axis=1).items():
yield item
def _apply_callable(self, condition):
""" "Executes the possible callable condition as needed."""
if callable(condition):
# When it's a bigframes function.
if hasattr(condition, "bigframes_bigquery_function"):
return self.apply(condition)
# When it's a plain Python function.
else:
return self.apply(condition, by_row=False)
# When it's not a callable.
return condition
def where(self, cond, other=None):
cond = self._apply_callable(cond)
other = self._apply_callable(other)
value_id, cond_id, other_id, block = self._align3(cond, other)
block, result_id = block.project_expr(
ops.where_op.as_expr(value_id, cond_id, other_id)
)
return Series(block.select_column(result_id).with_column_labels([self.name]))
def clip(self, lower=None, upper=None):
if lower is None and upper is None:
return self
if lower is None:
return self._apply_binary_op(upper, ops.minimum_op, alignment="left")
if upper is None:
return self._apply_binary_op(lower, ops.maximum_op, alignment="left")
# special rule to coerce scalar string args to date
value_id, lower_id, upper_id, block = self._align3(
lower, upper, cast_scalars=(bigframes.dtypes.is_date_like(self.dtype))
)
block, result_id = block.project_expr(
ops.clip_op.as_expr(value_id, lower_id, upper_id),
)
return Series(block.select_column(result_id).with_column_labels([self.name]))
@validations.requires_ordering()
def argmax(self) -> int:
block, row_nums = self._block.promote_offsets()
block = block.order_by(
[
order.descending_over(self._value_column),
order.ascending_over(row_nums),
]
)
return typing.cast(
scalars.Scalar, Series(block.select_column(row_nums)).iloc[0]
)
@validations.requires_ordering()
def argmin(self) -> int:
block, row_nums = self._block.promote_offsets()
block = block.order_by(
[
order.ascending_over(self._value_column),
order.ascending_over(row_nums),
]
)
return typing.cast(
scalars.Scalar, Series(block.select_column(row_nums)).iloc[0]
)
@validations.requires_index
def unstack(self, level: LevelsType = -1):
if isinstance(level, int) or isinstance(level, str):
level = [level]
block = self._block
if self.index.nlevels == 1:
raise ValueError("Series must have multi-index to unstack")
# Pivot by index levels
unstack_ids = self._resolve_levels(level)
block = block.reset_index(drop=False)
block = block.set_index(
[col for col in self._block.index_columns if col not in unstack_ids]
)
pivot_block = block.pivot(
columns=unstack_ids,
values=self._block.value_columns,
values_in_index=False,
)
return bigframes.dataframe.DataFrame(pivot_block)
@validations.requires_index
def idxmax(self) -> blocks.Label:
block = self._block.order_by(
[
order.descending_over(self._value_column),
*[
order.ascending_over(idx_col)
for idx_col in self._block.index_columns
],
]
)
block = block.slice(0, 1)
return indexes.Index(block).to_pandas()[0]
@validations.requires_index
def idxmin(self) -> blocks.Label:
block = self._block.order_by(
[
order.ascending_over(self._value_column),
*[
order.ascending_over(idx_col)
for idx_col in self._block.index_columns
],
]
)
block = block.slice(0, 1)
return indexes.Index(block).to_pandas()[0]
@property
@validations.requires_ordering()
def is_monotonic_increasing(self) -> bool:
return typing.cast(
bool, self._block.is_monotonic_increasing(self._value_column)
)
@property
@validations.requires_ordering()
def is_monotonic_decreasing(self) -> bool:
return typing.cast(
bool, self._block.is_monotonic_decreasing(self._value_column)
)
def __getitem__(self, indexer):
# TODO: enforce stricter alignment, should fail if indexer is missing any keys.
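        # Heuristic (illustrative): integer slices such as s[1:3], and integer
        # keys on a non-integer index, are treated positionally via iloc;
        # boolean Series align and filter; everything else goes through loc.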
use_iloc = (
isinstance(indexer, slice)
and all(
isinstance(x, numbers.Integral) or (x is None)
for x in [indexer.start, indexer.stop, indexer.step]
)
) or (
isinstance(indexer, numbers.Integral)
and not isinstance(self._block.index.dtypes[0], pandas.Int64Dtype)
)
if use_iloc:
return self.iloc[indexer]
if isinstance(indexer, Series):
(left, right, block) = self._align(indexer, "left")
block = block.filter(right)
block = block.select_column(left.id.name)
return Series(block)
return self.loc[indexer]
__getitem__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__getitem__)
def __getattr__(self, key: str):
# Protect against recursion errors with uninitialized Series objects.
# We use "_block" attribute to check whether the instance is initialized.
# See:
# https://github.com/googleapis/python-bigquery-dataframes/issues/728
# and
# https://nedbatchelder.com/blog/201010/surprising_getattr_recursion.html
if key == "_block":
raise AttributeError(key)
elif hasattr(pandas.Series, key):
log_adapter.submit_pandas_labels(
self._block.session.bqclient, self.__class__.__name__, key
)
raise AttributeError(
textwrap.dedent(
f"""
BigQuery DataFrames has not yet implemented an equivalent to
'pandas.Series.{key}'. {constants.FEEDBACK_LINK}
"""
)
)
elif key in self._struct_fields:
return self.struct.field(key)
else:
raise AttributeError(key)
def __setitem__(self, key, value) -> None:
"""Set item using direct assignment, delegating to .loc indexer."""
self.loc[key] = value
def _apply_aggregation(
self, op: agg_ops.UnaryAggregateOp | agg_ops.NullaryAggregateOp
) -> Any:
return self._block.get_stat(self._value_column, op)
def _apply_window_op(
self, op: agg_ops.UnaryWindowOp, window_spec: windows.WindowSpec
):
block = self._block
block, result_id = block.apply_window_op(
self._value_column, op, window_spec=window_spec, result_label=self.name
)
result = Series(block.select_column(result_id))
if op.skips_nulls:
return result.where(self.notna(), None)
else:
return result
def value_counts(
self,
normalize: bool = False,
sort: bool = True,
ascending: bool = False,
*,
dropna: bool = True,
):
block = block_ops.value_counts(
self._block,
[self._value_column],
normalize=normalize,
ascending=ascending,
drop_na=dropna,
)
return Series(block)
@typing.overload # type: ignore[override]
def sort_values(
self,
*,
axis=...,
inplace: Literal[True] = ...,
ascending: bool | typing.Sequence[bool] = ...,
kind: str = ...,
na_position: typing.Literal["first", "last"] = ...,
) -> None:
...
@typing.overload
def sort_values(
self,
*,
axis=...,
inplace: Literal[False] = ...,
ascending: bool | typing.Sequence[bool] = ...,
kind: str = ...,
na_position: typing.Literal["first", "last"] = ...,
) -> Series:
...
def sort_values(
self,
*,
axis=0,
inplace: bool = False,
ascending=True,
kind: str = "quicksort",
na_position: typing.Literal["first", "last"] = "last",
) -> Optional[Series]:
if axis != 0 and axis != "index":
raise ValueError(f"No axis named {axis} for object type Series")
if na_position not in ["first", "last"]:
raise ValueError("Param na_position must be one of 'first' or 'last'")
block = self._block.order_by(
[
order.ascending_over(self._value_column, (na_position == "last"))
if ascending
else order.descending_over(self._value_column, (na_position == "last"))
],
)
if inplace:
self._set_block(block)
return None
else:
return Series(block)
@typing.overload # type: ignore[override]
def sort_index(
self, *, axis=..., inplace: Literal[False] = ..., ascending=..., na_position=...
) -> Series:
...
@typing.overload
def sort_index(
self, *, axis=0, inplace: Literal[True] = ..., ascending=..., na_position=...
) -> None:
...
@validations.requires_index
def sort_index(
self, *, axis=0, inplace: bool = False, ascending=True, na_position="last"
) -> Optional[Series]:
# TODO(tbergeron): Support level parameter once multi-index introduced.
if axis != 0 and axis != "index":
raise ValueError(f"No axis named {axis} for object type Series")
if na_position not in ["first", "last"]:
raise ValueError("Param na_position must be one of 'first' or 'last'")
block = self._block
na_last = na_position == "last"
ordering = [
order.ascending_over(column, na_last)
if ascending
else order.descending_over(column, na_last)
for column in block.index_columns
]
block = block.order_by(ordering)
if inplace:
self._set_block(block)
return None
else:
return Series(block)
@validations.requires_ordering()
def rolling(
self,
window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
min_periods: int | None = None,
closed: Literal["right", "left", "both", "neither"] = "right",
) -> bigframes.core.window.Window:
if isinstance(window, int):
# Rows rolling
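            # E.g. (illustrative): s.rolling(window=3).sum() aggregates the
            # current row plus the two preceding rows (closed="right"), once
            # min_periods rows are available.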
window_spec = windows.WindowSpec(
bounds=windows.RowsWindowBounds.from_window_size(window, closed),
min_periods=window if min_periods is None else min_periods,
)
return bigframes.core.window.Window(
self._block, window_spec, self._block.value_columns, is_series=True
)
return rolling.create_range_window(
block=self._block,
window=window,
min_periods=min_periods,
closed=closed,
is_series=True,
)
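# Illustrative usage (a sketch, not part of the source; assumes a numeric
# Series `s`, with a datetime index for the range-based form):
#
#   s.rolling(window=3).mean()     # rows-based: the 3 preceding rows
#   s.rolling(window="3D").sum()   # range-based: a 3-day lookback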
@validations.requires_ordering()
def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window:
window_spec = windows.cumulative_rows(min_periods=min_periods)
return bigframes.core.window.Window(
self._block, window_spec, self._block.value_columns, is_series=True
)
def groupby(
self,
by: typing.Union[
blocks.Label, Series, typing.Sequence[typing.Union[blocks.Label, Series]]
] = None,
axis=0,
level: typing.Optional[
int | str | typing.Sequence[int] | typing.Sequence[str]
] = None,
as_index: bool = True,
*,
dropna: bool = True,
) -> bigframes.core.groupby.SeriesGroupBy:
if (by is not None) and (level is not None):
raise ValueError("Do not specify both 'by' and 'level'")
if not as_index:
raise ValueError("as_index=False only valid with DataFrame")
if axis:
raise ValueError("No axis named {} for object type Series".format(axis))
if by is not None:
return self._groupby_values(by, dropna)
if level is not None:
return self._groupby_level(level, dropna)
else:
raise TypeError("You have to supply one of 'by' and 'level'")
@validations.requires_index
def _groupby_level(
self,
level: int | str | typing.Sequence[int] | typing.Sequence[str],
dropna: bool = True,
) -> bigframes.core.groupby.SeriesGroupBy:
by_key_is_singular = not utils.is_list_like(level)
return groupby.SeriesGroupBy(
self._block,
self._value_column,
by_col_ids=self._resolve_levels(level),
value_name=self.name,
dropna=dropna,
by_key_is_singular=by_key_is_singular,
)
def _groupby_values(
self,
by: typing.Union[
blocks.Label, Series, typing.Sequence[typing.Union[blocks.Label, Series]]
],
dropna: bool = True,
) -> bigframes.core.groupby.SeriesGroupBy:
if not isinstance(by, Series) and _is_list_like(by):
by = list(by)
by_key_is_singular = False
else:
by = [typing.cast(typing.Union[blocks.Label, Series], by)]
by_key_is_singular = True
block = self._block
grouping_cols: typing.Sequence[str] = []
value_col = self._value_column
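# Series keys are aligned via join (remapping any previously collected
# column ids), while plain labels are resolved as index level names.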
for key in by:
if isinstance(key, Series):
block, (
get_column_left,
get_column_right,
) = block.join(key._block, how="inner" if dropna else "left")
value_col = get_column_left[value_col]
grouping_cols = [
*[get_column_left[value] for value in grouping_cols],
get_column_right[key._value_column],
]
else:
# Interpret as index level
matches = block.index_name_to_col_id.get(key, [])
if len(matches) != 1:
raise ValueError(
f"GroupBy key {key} does not match a unique index level. BigQuery DataFrames only interprets lists of strings as index level names, not directly as per-row group assignments."
)
grouping_cols = [*grouping_cols, matches[0]]
return groupby.SeriesGroupBy(
block,
value_col,
by_col_ids=grouping_cols,
value_name=self.name,
dropna=dropna,
by_key_is_singular=by_key_is_singular,
)
def apply(
self,
func,
by_row: typing.Union[typing.Literal["compat"], bool] = "compat",
*,
args: typing.Tuple = (),
) -> Series:
# Note: This signature differs from pandas.Series.apply. Specifically,
# `args` is keyword-only and `by_row` is a custom parameter here. Full
# alignment would involve breaking changes. However, given that by_row
# is not frequently used, we defer any such changes until there is a
# clear need based on user feedback.
#
# See pandas docs for reference:
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.apply.html
# TODO(shobs, b/274645634): Support convert_dtype, **kwargs
if by_row not in ["compat", False]:
raise ValueError("Param by_row must be one of 'compat' or False")
if not callable(func) and not isinstance(func, numpy.ufunc):
raise ValueError(
"Only a ufunc (a function that applies to the entire Series) or"
" a BigFrames BigQuery function that only works on single values"
" are supported."
)
if isinstance(func, bigframes.functions.BigqueryCallableRoutine):
# We are working with a BigQuery function at this point
if args:
result_series = self._apply_nary_op(
ops.NaryRemoteFunctionOp(function_def=func.udf_def), args
)
# TODO(jialuo): Investigate why `_apply_nary_op` drops the series
# `name`. Manually reassigning it here as a temporary fix.
result_series.name = self.name
else:
result_series = self._apply_unary_op(
ops.RemoteFunctionOp(function_def=func.udf_def, apply_on_null=True)
)
result_series = func._post_process_series(result_series)
return result_series
bf_op = python_ops.python_callable_to_op(func)
if bf_op and isinstance(bf_op, ops.UnaryOp):
return self._apply_unary_op(bf_op)
# It is neither a remote function nor a managed function.
# Then it must be a vectorized function that applies to the Series
# as a whole.
if by_row:
raise ValueError(
"You have passed a function as-is. If your intention is to "
"apply this function in a vectorized way (i.e. to the "
"entire Series as a whole, and you are sure that it "
"performs only the operations that are implemented for a "
"Series (e.g. a chain of arithmetic/logical operations, "
"such as `def foo(s): return s % 2 == 1`), please also "
"specify `by_row=False`. If your function contains "
"arbitrary code, it can only be applied to every element "
"in the Series individually, in which case you must "
"convert it to a BigFrames BigQuery function using "
"`bigframes.pandas.udf`, "
"or `bigframes.pandas.remote_function` before passing."
)
try:
return func(self) # type: ignore
except Exception as ex:
# This could happen if any operator in func is not supported on a
# Series. Guide the user toward a BigQuery function instead.
if hasattr(ex, "message"):
ex.message += f"\n{_bigquery_function_recommendation_message}"
raise
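# Illustrative usage (a sketch, not part of the source):
#
#   s.apply(lambda x: x % 2 == 1, by_row=False)  # vectorized: runs func(self)
#
# Per-element application requires first converting the callable with
# `bigframes.pandas.udf` or `bigframes.pandas.remote_function`, as the
# error message above suggests.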
def combine(
self,
other,
func,
) -> Series:
if not callable(func) and not isinstance(func, numpy.ufunc):
raise ValueError(
"Only a ufunc (a function that applies to the entire Series) or"
" a BigFrames BigQuery function that only works on single values"
" are supported."
)
if isinstance(func, bigframes.functions.BigqueryCallableRoutine):
result_series = self._apply_binary_op(
other, ops.BinaryRemoteFunctionOp(function_def=func.udf_def)
)
result_series = func._post_process_series(result_series)
return result_series
bf_op = python_ops.python_callable_to_op(func)
if bf_op and isinstance(bf_op, ops.BinaryOp):
result_series = self._apply_binary_op(other, bf_op)
return result_series
# Keep this in sync with .apply
try:
return func(self, other)
except Exception as ex:
# This could happen if any operator in func is not supported on a
# Series. Guide the user toward a BigQuery function instead.
if hasattr(ex, "message"):
ex.message += f"\n{_bigquery_function_recommendation_message}"
raise
@validations.requires_index
def add_prefix(self, prefix: str, axis: int | str | None = None) -> Series:
return Series(self._get_block().add_prefix(prefix))
@validations.requires_index
def add_suffix(self, suffix: str, axis: int | str | None = None) -> Series:
return Series(self._get_block().add_suffix(suffix))
def take(
self, indices: typing.Sequence[int], axis: int | str | None = 0, **kwargs
) -> Series:
if not utils.is_list_like(indices):
raise ValueError("indices should be a list-like object.")
return typing.cast(Series, self.iloc[indices])
def filter(
self,
items: typing.Optional[typing.Iterable] = None,
like: typing.Optional[str] = None,
regex: typing.Optional[str] = None,
axis: typing.Optional[typing.Union[str, int]] = None,
) -> Series:
if (axis is not None) and utils.get_axis_number(axis) != 0:
raise ValueError(f"Invalid axis for series: {axis}")
if sum([(items is not None), (like is not None), (regex is not None)]) != 1:
raise ValueError(
"Need to provide exactly one of 'items', 'like', or 'regex'"
)
if len(self._block.index_columns) > 1:
raise NotImplementedError(
f"Method filter does not support rows multiindex. {constants.FEEDBACK_LINK}"
)
if (like is not None) or (regex is not None):
block = self._block
block, label_string_id = block.apply_unary_op(
self._block.index_columns[0],
ops.AsTypeOp(to_type=pandas.StringDtype(storage="pyarrow")),
)
if like is not None:
block, mask_id = block.apply_unary_op(
label_string_id, ops.StrContainsOp(pat=like)
)
else: # regex
assert regex is not None
block, mask_id = block.apply_unary_op(
label_string_id, ops.StrContainsRegexOp(pat=regex)
)
block = block.filter_by_id(mask_id)
block = block.select_columns([self._value_column])
return Series(block)
elif items is not None:
# Behavior matches pandas 2.1+; older pandas versions would reindex instead
block = self._block
block, mask_id = block.apply_unary_op(
self._block.index_columns[0], ops.IsInOp(values=tuple(items))
)
block = block.filter_by_id(mask_id)
block = block.select_columns([self._value_column])
return Series(block)
else:
raise ValueError("Need to provide 'items', 'like', or 'regex'")
@validations.requires_index
def reindex(self, index=None, *, validate: typing.Optional[bool] = None):
if validate and not self.index.is_unique:
raise ValueError("Original index must be unique to reindex")
keep_original_names = False
if isinstance(index, indexes.Index):
new_indexer = bigframes.dataframe.DataFrame(data=index._block)[[]]
else:
if not isinstance(index, pandas.Index):
keep_original_names = True
index = pandas.Index(index)
if index.nlevels != self.index.nlevels:
raise NotImplementedError(
"Cannot reindex with index with different nlevels"
)
new_indexer = bigframes.dataframe.DataFrame(
index=index, session=self._get_block().expr.session
)[[]]
# Multi-index joins are sensitive to index names, so set them all to positional names first
result = new_indexer.rename_axis(range(new_indexer.index.nlevels)).join(
self.to_frame().rename_axis(range(self.index.nlevels)),
how="left",
)
# and then reset the names after the join
result_block = result.rename_axis(
self.index.names if keep_original_names else index.names
)._block
return Series(result_block)
@validations.requires_index
def reindex_like(self, other: Series, *, validate: typing.Optional[bool] = None):
return self.reindex(other.index, validate=validate)
def drop_duplicates(self, *, keep: str = "first") -> Series:
if keep is not False:
validations.enforce_ordered(self, "drop_duplicates(keep != False)")
block = block_ops.drop_duplicates(self._block, (self._value_column,), keep)
return Series(block)
def unique(self, keep_order=True) -> Series:
if keep_order:
validations.enforce_ordered(self, "unique(keep_order != False)")
return self.drop_duplicates()
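# Unordered path: aggregate by the value column and keep one row per
# group via ANY_VALUE, which does not require a total ordering.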
block, result = self._block.aggregate(
[self._value_column],
[
agg_expressions.UnaryAggregation(
agg_ops.AnyValueOp(), ex.deref(self._value_column)
)
],
column_labels=self._block.column_labels,
dropna=False,
)
return Series(block.select_columns(result).reset_index())
def duplicated(self, keep: str = "first") -> Series:
if keep is not False:
validations.enforce_ordered(self, "duplicated(keep != False)")
block, indicator = block_ops.indicate_duplicates(
self._block, (self._value_column,), keep
)
return Series(
block.select_column(
indicator,
).with_column_labels([self.name])
)
def mask(self, cond, other=None) -> Series:
cond = self._apply_callable(cond)
other = self._apply_callable(other)
if not isinstance(cond, Series):
raise TypeError(
f"Only bigframes series condition is supported, received {type(cond).__name__}. "
f"{constants.FEEDBACK_LINK}"
)
return self.where(~cond, other)
def to_frame(self, name: blocks.Label = None) -> bigframes.dataframe.DataFrame:
provided_name = name if name else self.name
# To be consistent with pandas, assign 0 as the column name when no name is available; 0 is the first element of a RangeIndex.
block = self._block.with_column_labels(
[provided_name] if provided_name else [0]
)
return bigframes.dataframe.DataFrame(block)
def to_csv(
self,
path_or_buf=None,
sep=",",
*,
header: bool = True,
index: bool = True,
allow_large_results: Optional[bool] = None,
) -> Optional[str]:
if utils.is_gcs_path(path_or_buf):
return self.to_frame().to_csv(
path_or_buf,
sep=sep,
header=header,
index=index,
allow_large_results=allow_large_results,
)
else:
pd_series = self.to_pandas(allow_large_results=allow_large_results)
return pd_series.to_csv(
path_or_buf=path_or_buf, sep=sep, header=header, index=index
)
def to_dict(
self,
into: type[dict] = dict,
*,
allow_large_results: Optional[bool] = None,
) -> typing.Mapping:
return typing.cast(dict, self.to_pandas(allow_large_results=allow_large_results).to_dict(into)) # type: ignore
def to_excel(
self, excel_writer, sheet_name="Sheet1", *, allow_large_results=None, **kwargs
) -> None:
return self.to_pandas(allow_large_results=allow_large_results).to_excel(
excel_writer, sheet_name, **kwargs
)
def to_json(
self,
path_or_buf=None,
orient: Optional[
typing.Literal["split", "records", "index", "columns", "values", "table"]
] = None,
*,
lines: bool = False,
index: bool = True,
allow_large_results: Optional[bool] = None,
) -> Optional[str]:
if utils.is_gcs_path(path_or_buf):
return self.to_frame().to_json(
path_or_buf=path_or_buf,
orient=orient,
lines=lines,
index=index,
allow_large_results=allow_large_results,
)
else:
pd_series = self.to_pandas(allow_large_results=allow_large_results)
return pd_series.to_json(
path_or_buf=path_or_buf, orient=orient, lines=lines, index=index # type: ignore
)
def to_latex(
self,
buf=None,
columns=None,
header=True,
index=True,
*,
allow_large_results=None,
**kwargs,
) -> typing.Optional[str]:
return self.to_pandas(allow_large_results=allow_large_results).to_latex(
buf, columns=columns, header=header, index=index, **kwargs
)
def tolist(
self,
*,
allow_large_results: Optional[bool] = None,
) -> _list:
return self.to_pandas(allow_large_results=allow_large_results).to_list()
to_list = tolist
to_list.__doc__ = inspect.getdoc(vendored_pandas_series.Series.tolist)
def to_markdown(
self,
buf: typing.IO[str] | None = None,
mode: str = "wt",
index: bool = True,
*,
allow_large_results: Optional[bool] = None,
**kwargs,
) -> typing.Optional[str]:
return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode=mode, index=index, **kwargs) # type: ignore
def to_numpy(
self,
dtype=None,
copy=False,
na_value=pd_ext.no_default,
*,
allow_large_results=None,
**kwargs,
) -> numpy.ndarray:
return self.to_pandas(allow_large_results=allow_large_results).to_numpy(
dtype, copy, na_value, **kwargs
)
def __array__(self, dtype=None, copy: Optional[bool] = None) -> numpy.ndarray:
if copy is False:
raise ValueError("Cannot convert to array without copy.")
return self.to_numpy(dtype=dtype)
__array__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__array__)
def to_pickle(self, path, *, allow_large_results=None, **kwargs) -> None:
return self.to_pandas(allow_large_results=allow_large_results).to_pickle(
path, **kwargs
)
def to_string(
self,
buf=None,
na_rep="NaN",
float_format=None,
header=True,
index=True,
length=False,
dtype=False,
name=False,
max_rows=None,
min_rows=None,
*,
allow_large_results=None,
) -> typing.Optional[str]:
return self.to_pandas(allow_large_results=allow_large_results).to_string(
buf,
na_rep,
float_format,
header,
index,
length,
dtype,
name,
max_rows,
min_rows,
)
def to_xarray(
self,
*,
allow_large_results: Optional[bool] = None,
):
return self.to_pandas(allow_large_results=allow_large_results).to_xarray()
def _throw_if_index_contains_duplicates(
self, error_message: typing.Optional[str] = None
) -> None:
if not self.index.is_unique:
error_message = (
error_message
if error_message
else "Index contains duplicate entries, but uniqueness is required."
)
raise pandas.errors.InvalidIndexError(error_message)
def map(
self,
arg: typing.Union[Mapping, Series, Callable],
na_action: Optional[str] = None,
*,
verify_integrity: bool = False,
) -> Series:
if na_action:
raise NotImplementedError(
f"Non-None na_action argument is not yet supported for Series.map. {constants.FEEDBACK_LINK}"
)
if isinstance(arg, Series):
if verify_integrity:
error_message = "When verify_integrity is True in Series.map, index of arg parameter must not have duplicate entries."
arg._throw_if_index_contains_duplicates(error_message=error_message)
map_df = bigframes.dataframe.DataFrame(arg._block)
map_df = map_df.rename(columns={arg.name: self.name})
elif isinstance(arg, Mapping):
map_df = bigframes.dataframe.DataFrame(
{"keys": list(arg.keys()), self.name: list(arg.values())}, # type: ignore
session=self._get_block().expr.session,
)
map_df = map_df.set_index("keys")
elif callable(arg):
# This is for remote functions and managed functions.
return self.apply(arg)
else:
# Mirroring pandas, call the uncallable object
arg() # throws TypeError: object is not callable
self_df = self.to_frame(name="series")
result_df = self_df.join(map_df, on="series")
return result_df[self.name]
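# Illustrative usage (a sketch, not part of the source): mapping and
# Series arguments are materialized as a DataFrame and joined on values.
#
#   s.map({"cat": "kitten", "dog": "puppy"})
#   s.map(other_series)  # looks up s's values in other_series' index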
@validations.requires_ordering()
def sample(
self,
n: Optional[int] = None,
frac: Optional[float] = None,
*,
random_state: Optional[int] = None,
sort: Optional[bool | Literal["random"]] = "random",
) -> Series:
if n is not None and frac is not None:
raise ValueError("Only one of 'n' or 'frac' parameter can be specified.")
ns = (n,) if n is not None else ()
fracs = (frac,) if frac is not None else ()
return Series(
self._block.split(ns=ns, fracs=fracs, random_state=random_state, sort=sort)[
0
]
)
def explode(self, *, ignore_index: Optional[bool] = False) -> Series:
return Series(
self._block.explode(
column_ids=[self._value_column], ignore_index=ignore_index
)
)
@validations.requires_ordering()
def resample(
self,
rule: str,
*,
closed: Optional[Literal["right", "left"]] = None,
label: Optional[Literal["right", "left"]] = None,
level: Optional[LevelsType] = None,
origin: Union[
Union[
pandas.Timestamp, datetime.datetime, numpy.datetime64, int, float, str
],
Literal["epoch", "start", "start_day", "end", "end_day"],
] = "start_day",
) -> bigframes.core.groupby.SeriesGroupBy:
block = self._block._generate_resample_label(
rule=rule,
closed=closed,
label=label,
on=None,
level=level,
origin=origin,
)
series = Series(block)
return series.groupby(level=0)
def __array_ufunc__(
self, ufunc: numpy.ufunc, method: str, *inputs, **kwargs
) -> Series:
"""Used to support numpy ufuncs.
See: https://numpy.org/doc/stable/reference/ufuncs.html
"""
# Only the __call__ ufunc method is supported, with at most two inputs and no kwargs
if method != "__call__" or len(inputs) > 2 or len(kwargs) > 0:
return NotImplemented
if len(inputs) == 1 and ufunc in ops.NUMPY_TO_OP:
return self._apply_unary_op(ops.NUMPY_TO_OP[ufunc])
if len(inputs) == 2 and ufunc in ops.NUMPY_TO_BINOP:
binop = ops.NUMPY_TO_BINOP[ufunc]
if inputs[0] is self:
return self._apply_binary_op(inputs[1], binop)
else:
return self._apply_binary_op(inputs[0], binop, reverse=True)
return NotImplemented
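# Illustrative usage (a sketch, not part of the source; assumes
# `import numpy as np` and that these ufuncs are registered in
# ops.NUMPY_TO_OP / ops.NUMPY_TO_BINOP):
#
#   np.sqrt(s)    # unary ufunc, dispatched to a unary op
#   np.add(1, s)  # binary ufunc with self on the right: reverse=True path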
@property
def plot(self):
return plotting.PlotAccessor(self)
def hist(
self, by: typing.Optional[typing.Sequence[str]] = None, bins: int = 10, **kwargs
):
return self.plot.hist(by=by, bins=bins, **kwargs)
hist.__doc__ = inspect.getdoc(plotting.PlotAccessor.hist)
def line(
self,
x: typing.Optional[typing.Hashable] = None,
y: typing.Optional[typing.Hashable] = None,
**kwargs,
):
return self.plot.line(x=x, y=y, **kwargs)
line.__doc__ = inspect.getdoc(plotting.PlotAccessor.line)
def area(
self,
x: typing.Optional[typing.Hashable] = None,
y: typing.Optional[typing.Hashable] = None,
stacked: bool = True,
**kwargs,
):
return self.plot.area(x=x, y=y, stacked=stacked, **kwargs)
area.__doc__ = inspect.getdoc(plotting.PlotAccessor.area)
def bar(
self,
x: typing.Optional[typing.Hashable] = None,
y: typing.Optional[typing.Hashable] = None,
**kwargs,
):
return self.plot.bar(x=x, y=y, **kwargs)
bar.__doc__ = inspect.getdoc(plotting.PlotAccessor.bar)
def _slice(
self,
start: typing.Optional[int] = None,
stop: typing.Optional[int] = None,
step: typing.Optional[int] = None,
) -> Series:
return Series(
self._block.slice(
start=start, stop=stop, step=step if (step is not None) else 1
).select_column(self._value_column),
)
def cache(self):
"""
Materializes the Series to a temporary table.
Useful if the series will be used multiple times, as this avoids recomputing the shared intermediate value.
Returns:
Series: Self
"""
# Do not use session-aware caching when caching is explicitly user-requested
return self._cached(force=True, session_aware=False)
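# Illustrative usage (a sketch, not part of the source):
#
#   s = s.cache()      # returns self, now backed by a temporary table
#   s.sum(); s.std()   # later computations reuse the materialized result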
def _cached(self, *, force: bool = True, session_aware: bool = True) -> Series:
self._block.cached(force=force, session_aware=session_aware)
return self
# Keep this at the bottom of the Series class to avoid
# confusing type checker by overriding str
@property
def str(self) -> strings.StringMethods:
import bigframes.operations.strings as strings
return strings.StringMethods(self)
@property
def _value_column(self) -> __builtins__.str:
return self._block.value_columns[0]
@property
def _name(self) -> blocks.Label:
return self._block.column_labels[0]
@property
def _dtype(self):
return self._block.dtypes[0]
def _set_block(self, block: blocks.Block):
self._block = block
def _get_block(self) -> blocks.Block:
return self._block
def _apply_unary_op(
self,
op: ops.UnaryOp,
) -> Series:
"""Applies a unary operator to the series."""
block, result_id = self._block.apply_unary_op(
self._value_column, op, result_label=self._name
)
return Series(block.select_column(result_id))
def _apply_binary_op(
self,
other: typing.Any,
op: ops.BinaryOp,
alignment: typing.Literal["outer", "left"] = "outer",
reverse: bool = False,
) -> Series:
"""Applies a binary operator to the series and other."""
if bigframes.core.convert.can_convert_to_series(other):
self_index = indexes.Index(self._block)
other_series = bigframes.core.convert.to_bf_series(
other, self_index, self._block.session
)
(self_col, other_col, block) = self._align(other_series, how=alignment)
name = self._name
# Drop name if both objects have name attr, but they don't match
if (
hasattr(other, "name")
and other_series.name != self._name
and alignment == "outer"
):
name = None
expr = op.as_expr(
other_col if reverse else self_col, self_col if reverse else other_col
)
block, result_id = block.project_expr(expr, name)
return Series(block.select_column(result_id))
else: # Scalar binop
name = self._name
expr = op.as_expr(
ex.const(other) if reverse else self._value_column,
self._value_column if reverse else ex.const(other),
)
block, result_id = self._block.project_expr(expr, name)
return Series(block.select_column(result_id))
def _apply_nary_op(
self,
op: ops.NaryOp,
others: Sequence[typing.Union[Series, scalars.Scalar]],
ignore_self=False,
):
"""Applies an n-ary operator to the series and others."""
values, block = self._align_n(
others, ignore_self=ignore_self, cast_scalars=False
)
block, result_id = block.project_expr(op.as_expr(*values))
return Series(block.select_column(result_id))
def _apply_binary_aggregation(
self, other: Series, stat: agg_ops.BinaryAggregateOp
) -> float:
(left, right, block) = self._align(other, how="outer")
assert isinstance(left, ex.DerefOp)
assert isinstance(right, ex.DerefOp)
return block.get_binary_stat(left.id.name, right.id.name, stat)
AlignedExprT = Union[ex.ScalarConstantExpression, ex.DerefOp]
@typing.overload
def _align(
self, other: Series, how="outer"
) -> tuple[ex.DerefOp, ex.DerefOp, blocks.Block,]:
...
@typing.overload
def _align(
self, other: typing.Union[Series, scalars.Scalar], how="outer"
) -> tuple[ex.DerefOp, AlignedExprT, blocks.Block,]:
...
def _align(
self, other: typing.Union[Series, scalars.Scalar], how="outer"
) -> tuple[ex.DerefOp, AlignedExprT, blocks.Block,]:
"""Aligns the series value with another scalar or series object. Returns new left column id, right column id and joined tabled expression."""
values, block = self._align_n(
[
other,
],
how,
)
return (typing.cast(ex.DerefOp, values[0]), values[1], block)
def _align3(  # type: ignore
self,
other1: Series | scalars.Scalar,
other2: Series | scalars.Scalar,
how="left",
cast_scalars: bool = True,
) -> tuple[ex.DerefOp, AlignedExprT, AlignedExprT, blocks.Block]:
"""Aligns the series value with two other scalars or series objects. Returns the new values and the joined table expression."""
values, block = self._align_n([other1, other2], how, cast_scalars=cast_scalars)
return (
typing.cast(ex.DerefOp, values[0]),
values[1],
values[2],
block,
)
def _align_n(
self,
others: typing.Sequence[typing.Union[Series, scalars.Scalar]],
how="outer",
ignore_self=False,
cast_scalars: bool = False,
) -> tuple[
typing.Sequence[Union[ex.ScalarConstantExpression, ex.DerefOp]],
blocks.Block,
]:
if ignore_self:
value_ids: List[Union[ex.ScalarConstantExpression, ex.DerefOp]] = []
else:
value_ids = [ex.deref(self._value_column)]
block = self._block
for other in others:
if isinstance(other, Series):
block, (
get_column_left,
get_column_right,
) = block.join(other._block, how=how)
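# The join assigns fresh column ids, so expressions collected in earlier
# iterations must be remapped from their old ids to the join's new ids.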
rebindings = {
ids.ColumnId(old): ids.ColumnId(new)
for old, new in get_column_left.items()
}
remapped_value_ids = (
value.remap_column_refs(rebindings) for value in value_ids
)
value_ids = [
*remapped_value_ids, # type: ignore
ex.deref(get_column_right[other._value_column]),
]
else:
# Will throw if can't interpret as scalar.
dtype = typing.cast(bigframes.dtypes.Dtype, self._dtype)
value_ids = [
*value_ids,
ex.const(other, dtype=dtype if cast_scalars else None),
]
return (value_ids, block)
def _throw_if_null_index(self, opname: __builtins__.str):
if len(self._block.index_columns) == 0:
raise bfe.NullIndexError(
f"Series cannot perform {opname} as it has no index. Set an index using set_index."
)
def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]:
return pandas.api.types.is_list_like(obj)