# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import re
from typing import Generic, Hashable, Literal, Optional, TypeVar, Union
import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.strings.accessor as vendorstr
from bigframes.core import log_adapter
import bigframes.core.indexes.base as indices
import bigframes.dataframe as df
import bigframes.operations as ops
from bigframes.operations._op_converters import convert_index, convert_slice
import bigframes.operations.aggregations as agg_ops
import bigframes.series as series
# Maps from python to re2
# Keys are Python ``re`` module flags; values are the single-letter flag names
# that _parse_flags joins into an inline "(?...)" group prepended to a pattern.
REGEXP_FLAGS = {
    re.IGNORECASE: "i",
    re.MULTILINE: "m",
    re.DOTALL: "s",
}

# Type variable constrained to the two container types that StringMethods is
# parameterized over: a Series or an Index.
T = TypeVar("T", series.Series, indices.Index)
@log_adapter.class_logger
class StringMethods(vendorstr.StringMethods, Generic[T]):
    # The class-level documentation is taken verbatim from the vendored class.
    __doc__ = vendorstr.StringMethods.__doc__

    # NOTE(review): most methods below deliberately have no docstring of their
    # own -- presumably their user-facing documentation is inherited from the
    # vendored base class (confirm). Implementation notes are therefore written
    # as ``#`` comments so they cannot shadow an inherited docstring.

    def __init__(self, data: T):
        # The wrapped Series/Index; every method applies an op to this object.
        self._data: T = data

    def __getitem__(self, key: Union[int, slice]) -> T:
        # Supports ``obj.str[i]`` and ``obj.str[a:b]``. convert_index /
        # convert_slice turn the Python key into the op applied to the data.
        if isinstance(key, int):
            return self._data._apply_unary_op(convert_index(key))
        elif isinstance(key, slice):
            return self._data._apply_unary_op(convert_slice(key))
        else:
            raise ValueError(f"key must be an int or slice, got {type(key).__name__}")

    def find(
        self,
        sub: str,
        start: Optional[int] = None,
        end: Optional[int] = None,
    ) -> T:
        # start/end are forwarded unchanged (None when the caller omits them).
        return self._data._apply_unary_op(
            ops.StrFindOp(substr=sub, start=start, end=end)
        )

    def len(self) -> T:
        return self._data._apply_unary_op(ops.len_op)

    def lower(self) -> T:
        return self._data._apply_unary_op(ops.lower_op)

    def reverse(self) -> T:
        """Reverse strings in the Series.

        **Examples:**

            >>> import bigframes.pandas as bpd
            >>> import pandas as pd
            >>> s = bpd.Series(["apple", "banana", "", pd.NA])
            >>> s.str.reverse()
            0     elppa
            1    ananab
            2
            3      <NA>
            dtype: string

        Returns:
            bigframes.series.Series: A Series in which every string element
                has its characters in reverse order.
        """
        # reverse method is in ibis, not pandas.
        return self._data._apply_unary_op(ops.reverse_op)

    def slice(
        self,
        start: Optional[int] = None,
        stop: Optional[int] = None,
    ) -> T:
        # The public ``stop`` argument maps onto the op's ``end`` field.
        return self._data._apply_unary_op(ops.StrSliceOp(start=start, end=stop))

    def strip(self, to_strip: Optional[str] = None) -> T:
        # With no explicit character set, strip space, newline and tab.
        return self._data._apply_unary_op(
            ops.StrStripOp(to_strip=" \n\t" if to_strip is None else to_strip)
        )

    def upper(self) -> T:
        return self._data._apply_unary_op(ops.upper_op)

    def isnumeric(self) -> T:
        return self._data._apply_unary_op(ops.isnumeric_op)

    def isalpha(
        self,
    ) -> T:
        return self._data._apply_unary_op(ops.isalpha_op)

    def isdigit(
        self,
    ) -> T:
        return self._data._apply_unary_op(ops.isdigit_op)

    def isdecimal(
        self,
    ) -> T:
        return self._data._apply_unary_op(ops.isdecimal_op)

    def isalnum(
        self,
    ) -> T:
        return self._data._apply_unary_op(ops.isalnum_op)

    def isspace(
        self,
    ) -> T:
        return self._data._apply_unary_op(ops.isspace_op)

    def islower(
        self,
    ) -> T:
        return self._data._apply_unary_op(ops.islower_op)

    def isupper(
        self,
    ) -> T:
        return self._data._apply_unary_op(ops.isupper_op)

    def rstrip(self, to_strip: Optional[str] = None) -> T:
        # Same default character set as strip(): space, newline and tab.
        return self._data._apply_unary_op(
            ops.StrRstripOp(to_strip=" \n\t" if to_strip is None else to_strip)
        )

    def lstrip(self, to_strip: Optional[str] = None) -> T:
        # Same default character set as strip(): space, newline and tab.
        return self._data._apply_unary_op(
            ops.StrLstripOp(to_strip=" \n\t" if to_strip is None else to_strip)
        )

    def repeat(self, repeats: int) -> T:
        return self._data._apply_unary_op(ops.StrRepeatOp(repeats=repeats))

    def capitalize(self) -> T:
        return self._data._apply_unary_op(ops.capitalize_op)

    def match(self, pat, case: bool = True, flags: int = 0) -> T:
        # \A anchors start of entire string rather than start of any line in multiline mode.
        # The pattern is wrapped in a non-capturing group so the anchor applies
        # to every top-level alternative ("a|b" becomes \A(?:a|b)) rather than
        # only to the first one.
        adj_pat = rf"\A(?:{pat})"
        return self.contains(pat=adj_pat, case=case, flags=flags)

    def fullmatch(self, pat, case: bool = True, flags: int = 0) -> T:
        # \A anchors start of entire string rather than start of any line in multiline mode
        # \z likewise anchors to the end of the entire multiline string
        # The non-capturing group keeps both anchors bound to the whole pattern
        # even when it contains a top-level alternation such as "a|b".
        adj_pat = rf"\A(?:{pat})\z"
        return self.contains(pat=adj_pat, case=case, flags=flags)

    def get(self, i: int) -> T:
        return self._data._apply_unary_op(ops.StrGetOp(i=i))

    def pad(self, width, side="left", fillchar=" ") -> T:
        # ``width`` is passed to the op as its ``length`` field.
        return self._data._apply_unary_op(
            ops.StrPadOp(length=width, fillchar=fillchar, side=side)
        )

    def ljust(self, width, fillchar=" ") -> T:
        # Left-justifying the text means the padding goes on the right.
        return self._data._apply_unary_op(
            ops.StrPadOp(length=width, fillchar=fillchar, side="right")
        )

    def rjust(self, width, fillchar=" ") -> T:
        # Right-justifying the text means the padding goes on the left.
        return self._data._apply_unary_op(
            ops.StrPadOp(length=width, fillchar=fillchar, side="left")
        )

    def contains(
        self, pat, case: bool = True, flags: int = 0, *, regex: bool = True
    ) -> T:
        if not case:
            # Case-insensitive matching is implemented through the regex path
            # with IGNORECASE. A literal pattern (regex=False) is escaped first
            # so it still matches literally instead of being reinterpreted as
            # a regular expression.
            if not regex:
                pat = re.escape(pat)
            return self.contains(pat=pat, flags=flags | re.IGNORECASE, regex=True)
        if regex:
            # Flags are expressed as an inline "(?...)" group prepended to the
            # pattern; _parse_flags returns None when there is nothing to add.
            re2flags = _parse_flags(flags)
            if re2flags:
                pat = re2flags + pat
            return self._data._apply_unary_op(ops.StrContainsRegexOp(pat=pat))
        else:
            return self._data._apply_unary_op(ops.StrContainsOp(pat=pat))

    def replace(
        self,
        pat: Union[str, re.Pattern],
        repl: str,
        *,
        case: Optional[bool] = None,
        flags: int = 0,
        regex: bool = False,
    ) -> T:
        # Normalize a compiled pattern to its source string and merge its
        # compile-time flags with the explicitly passed ones.
        if isinstance(pat, re.Pattern):
            assert isinstance(pat.pattern, str)
            pat_str = pat.pattern
            flags = pat.flags | flags
        else:
            pat_str = pat

        if case is False:
            # Case-insensitive replacement goes through the regex path with
            # IGNORECASE. A literal string pattern (regex=False) is escaped
            # first so it keeps its literal meaning.
            if not regex and not isinstance(pat, re.Pattern):
                pat_str = re.escape(pat_str)
            return self.replace(pat_str, repl, flags=flags | re.IGNORECASE, regex=True)
        if regex:
            # Flags are expressed as an inline "(?...)" group prepended to the
            # pattern; _parse_flags returns None when there is nothing to add.
            re2flags = _parse_flags(flags)
            if re2flags:
                pat_str = re2flags + pat_str
            return self._data._apply_unary_op(
                ops.RegexReplaceStrOp(pat=pat_str, repl=repl)
            )
        else:
            if isinstance(pat, re.Pattern):
                raise ValueError(
                    "Must set 'regex'=True if using compiled regex pattern."
                )
            return self._data._apply_unary_op(ops.ReplaceStrOp(pat=pat_str, repl=repl))

    def startswith(
        self,
        pat: Union[str, tuple[str, ...]],
    ) -> T:
        # The op always receives a tuple; a single prefix is wrapped in one.
        if not isinstance(pat, tuple):
            pat = (pat,)
        return self._data._apply_unary_op(ops.StartsWithOp(pat=pat))

    def endswith(
        self,
        pat: Union[str, tuple[str, ...]],
    ) -> T:
        # The op always receives a tuple; a single suffix is wrapped in one.
        if not isinstance(pat, tuple):
            pat = (pat,)
        return self._data._apply_unary_op(ops.EndsWithOp(pat=pat))

    def split(
        self,
        pat: str = " ",
        regex: Union[bool, None] = None,
    ) -> T:
        # Only literal separators are supported. An explicit regex=True, or an
        # unspecified regex combined with a multi-character separator, is
        # rejected; callers must pass regex=False for such separators.
        if regex is True or (regex is None and len(pat) > 1):
            raise NotImplementedError(
                "Regular expressions aren't currently supported. Please set "
                + f"`regex=False` and try again. {constants.FEEDBACK_LINK}"
            )
        return self._data._apply_unary_op(ops.StringSplitOp(pat=pat))

    def zfill(self, width: int) -> T:
        return self._data._apply_unary_op(ops.ZfillOp(width=width))

    def center(self, width: int, fillchar: str = " ") -> T:
        # Centering is padding on both sides.
        return self._data._apply_unary_op(
            ops.StrPadOp(length=width, fillchar=fillchar, side="both")
        )

    def cat(
        self,
        others: Union[str, indices.Index, series.Series],
        *,
        join: Literal["outer", "left"] = "left",
    ) -> T:
        # ``join`` is forwarded as the alignment mode of the binary op.
        return self._data._apply_binary_op(others, ops.strconcat_op, alignment=join)

    def join(self, sep: str) -> T:
        # Reduces each array element with a string aggregation using ``sep``.
        return self._data._apply_unary_op(
            ops.ArrayReduceOp(aggregation=agg_ops.StringAggOp(sep=sep))
        )

    def to_blob(self, connection: Optional[str] = None) -> T:
        """Create a BigFrames Blob series from a series of URIs.

        .. note::
            BigFrames Blob is subject to the "Pre-GA Offerings Terms" in the General Service Terms section of the
            Service Specific Terms(https://cloud.google.com/terms/service-terms#1). Pre-GA products and features are available "as is"
            and might have limited support. For more information, see the launch stage descriptions
            (https://cloud.google.com/products#product-launch-stages).

        Args:
            connection (str or None, default None):
                Connection to connect with remote service. str of the format <PROJECT_NUMBER/PROJECT_ID>.<LOCATION>.<CONNECTION_ID>.
                If None, use default connection in session context. BigQuery DataFrame will try to create the connection and attach
                permission if the connection isn't fully set up.

        Returns:
            bigframes.series.Series: Blob Series.
        """
        session = self._data._block.session
        # The session turns the (possibly None) connection argument into the
        # value that is used as the second operand of the op below.
        connection = session._create_bq_connection(connection=connection)
        return self._data._apply_binary_op(connection, ops.obj_make_ref_op)
def _parse_flags(flags: int) -> Optional[str]:
    """Translate Python ``re`` flags into an inline re2 flag group.

    Args:
        flags: Bitwise OR of ``re`` module flags.

    Returns:
        A prefix of the form ``"(?...)"`` containing the letters mapped in
        ``REGEXP_FLAGS`` for every flag that is set, or ``None`` when none of
        the mapped flags is set.

    Raises:
        NotImplementedError: If ``flags`` contains any bit other than the
            flags mapped in ``REGEXP_FLAGS`` and ``re.UNICODE``.
    """
    re2flags = []
    for reflag, re2flag in REGEXP_FLAGS.items():
        if flags & reflag:
            re2flags.append(re2flag)
            # The bit is known to be set, so XOR clears it.
            flags ^= reflag

    # re2 handles unicode fine by default
    # most compiled re in python will have unicode set
    # Test the bit itself: only clear re.U when it is actually present, so an
    # unsupported flag is never reported together with a spurious re.UNICODE.
    if flags & re.U:
        flags ^= re.U

    # Remaining flags couldn't be mapped to re2 engine
    if flags:
        raise NotImplementedError(
            f"Could not handle RegexFlag: {flags}. {constants.FEEDBACK_LINK}"
        )

    if re2flags:
        return "(?" + "".join(re2flags) + ")"
    return None