# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Transformers that prepare data for other estimators. This module is styled after
scikit-learn's preprocessing module: https://scikit-learn.org/stable/modules/preprocessing.html."""
from __future__ import annotations
import typing
from typing import cast, Iterable, List, Literal, Optional, Union
import bigframes_vendored.sklearn.preprocessing._data
import bigframes_vendored.sklearn.preprocessing._discretization
import bigframes_vendored.sklearn.preprocessing._encoder
import bigframes_vendored.sklearn.preprocessing._label
import bigframes_vendored.sklearn.preprocessing._polynomial
from bigframes.core import log_adapter
import bigframes.core.utils as core_utils
from bigframes.ml import base, core, globals, utils
import bigframes.pandas as bpd
@log_adapter.class_logger
class StandardScaler(
    base.Transformer,
    bigframes_vendored.sklearn.preprocessing._data.StandardScaler,
):
    __doc__ = bigframes_vendored.sklearn.preprocessing._data.StandardScaler.__doc__

    def __init__(self):
        # Fitted BQML transform-only model; None until fit() runs.
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()
        self._base_sql_generator = globals.base_sql_generator()

    def _keys(self):
        return (self._bqml_model,)

    def _compile_to_sql(
        self, X: bpd.DataFrame, columns: Optional[Iterable[str]] = None
    ) -> List[str]:
        """Build ML.STANDARD_SCALER SQL expressions for a BQML TRANSFORM clause.

        Args:
            X: DataFrame to transform.
            columns: columns to transform; defaults to every column of X.

        Returns:
            One SQL expression per scaled column.
        """
        if columns is None:
            columns = X.columns
        columns, _ = core_utils.get_standardized_ids(columns)
        sql_exprs = []
        for column in columns:
            sql_exprs.append(
                self._base_sql_generator.ml_standard_scaler(
                    column, f"standard_scaled_{column}"
                )
            )
        return sql_exprs

    @classmethod
    def _parse_from_sql(cls, sql: str) -> tuple[StandardScaler, str]:
        """Parse SQL to tuple(StandardScaler, column_label).

        Args:
            sql: SQL string of format "ML.STANDARD_SCALER({col_label}) OVER()"

        Returns:
            tuple(StandardScaler, column_label)
        """
        # TODO: use a real SQL parser instead of naive slicing.
        start = sql.find("(") + 1
        end = sql.find(")")
        return cls(), _unescape_id(sql[start:end])

    def fit(
        self,
        X: utils.ArrayType,
        y=None,  # ignored
    ) -> StandardScaler:
        (X,) = utils.batch_convert_to_dataframe(X)
        self._bqml_model = self._bqml_model_factory.create_model(
            X,
            options={"model_type": "transform_only"},
            transforms=self._compile_to_sql(X),
        )
        self._extract_output_names()
        return self
@log_adapter.class_logger
class MaxAbsScaler(
    base.Transformer,
    bigframes_vendored.sklearn.preprocessing._data.MaxAbsScaler,
):
    __doc__ = bigframes_vendored.sklearn.preprocessing._data.MaxAbsScaler.__doc__

    def __init__(self):
        # Fitted BQML transform-only model; None until fit() runs.
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()
        self._base_sql_generator = globals.base_sql_generator()

    def _keys(self):
        return (self._bqml_model,)

    def _compile_to_sql(
        self, X: bpd.DataFrame, columns: Optional[Iterable[str]] = None
    ) -> List[str]:
        """Build ML.MAX_ABS_SCALER SQL expressions for a BQML TRANSFORM clause.

        Args:
            X: DataFrame to transform.
            columns: columns to transform; defaults to every column of X.

        Returns:
            One SQL expression per scaled column.
        """
        if columns is None:
            columns = X.columns
        std_ids, _ = core_utils.get_standardized_ids(columns)
        return [
            self._base_sql_generator.ml_max_abs_scaler(col, f"max_abs_scaled_{col}")
            for col in std_ids
        ]

    @classmethod
    def _parse_from_sql(cls, sql: str) -> tuple[MaxAbsScaler, str]:
        """Parse SQL to tuple(MaxAbsScaler, column_label).

        Args:
            sql: SQL string of format "ML.MAX_ABS_SCALER({col_label}) OVER()"

        Returns:
            tuple(MaxAbsScaler, column_label)
        """
        # TODO: use a real SQL parser instead of naive slicing.
        start = sql.find("(") + 1
        end = sql.find(")")
        return cls(), _unescape_id(sql[start:end])

    def fit(
        self,
        X: utils.ArrayType,
        y=None,  # ignored
    ) -> MaxAbsScaler:
        (X,) = utils.batch_convert_to_dataframe(X)
        self._bqml_model = self._bqml_model_factory.create_model(
            X,
            options={"model_type": "transform_only"},
            transforms=self._compile_to_sql(X),
        )
        self._extract_output_names()
        return self
@log_adapter.class_logger
class MinMaxScaler(
    base.Transformer,
    bigframes_vendored.sklearn.preprocessing._data.MinMaxScaler,
):
    __doc__ = bigframes_vendored.sklearn.preprocessing._data.MinMaxScaler.__doc__

    def __init__(self):
        # Fitted BQML transform-only model; None until fit() runs.
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()
        self._base_sql_generator = globals.base_sql_generator()

    def _keys(self):
        return (self._bqml_model,)

    def _compile_to_sql(
        self, X: bpd.DataFrame, columns: Optional[Iterable[str]] = None
    ) -> List[str]:
        """Build ML.MIN_MAX_SCALER SQL expressions for a BQML TRANSFORM clause.

        Args:
            X: DataFrame to transform.
            columns: columns to transform; defaults to every column of X.

        Returns:
            One SQL expression per scaled column.
        """
        if columns is None:
            columns = X.columns
        std_ids, _ = core_utils.get_standardized_ids(columns)
        return [
            self._base_sql_generator.ml_min_max_scaler(col, f"min_max_scaled_{col}")
            for col in std_ids
        ]

    @classmethod
    def _parse_from_sql(cls, sql: str) -> tuple[MinMaxScaler, str]:
        """Parse SQL to tuple(MinMaxScaler, column_label).

        Args:
            sql: SQL string of format "ML.MIN_MAX_SCALER({col_label}) OVER()"

        Returns:
            tuple(MinMaxScaler, column_label)
        """
        # TODO: use a real SQL parser instead of naive slicing.
        start = sql.find("(") + 1
        end = sql.find(")")
        return cls(), _unescape_id(sql[start:end])

    def fit(
        self,
        X: utils.ArrayType,
        y=None,  # ignored
    ) -> MinMaxScaler:
        (X,) = utils.batch_convert_to_dataframe(X)
        self._bqml_model = self._bqml_model_factory.create_model(
            X,
            options={"model_type": "transform_only"},
            transforms=self._compile_to_sql(X),
        )
        self._extract_output_names()
        return self
@log_adapter.class_logger
class KBinsDiscretizer(
    base.Transformer,
    bigframes_vendored.sklearn.preprocessing._discretization.KBinsDiscretizer,
):
    __doc__ = (
        bigframes_vendored.sklearn.preprocessing._discretization.KBinsDiscretizer.__doc__
    )

    def __init__(
        self,
        n_bins: int = 5,
        strategy: Literal["uniform", "quantile"] = "quantile",
    ):
        if n_bins < 2:
            raise ValueError(
                f"n_bins has to be larger than or equal to 2, input is {n_bins}."
            )
        self.n_bins = n_bins
        self.strategy = strategy
        # Fitted BQML transform-only model; None until fit() runs.
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()
        self._base_sql_generator = globals.base_sql_generator()

    def _keys(self):
        # Identity for equality/hashing: fitted model plus hyperparameters.
        return (self._bqml_model, self.n_bins, self.strategy)

    def _compile_to_sql(
        self, X: bpd.DataFrame, columns: Optional[Iterable[str]] = None
    ) -> List[str]:
        """Compile this transformer to a list of SQL expressions that can be included in
        a BQML TRANSFORM clause

        Args:
            X: DataFrame to transform.
            columns: transform columns. If None, transform all columns in X.

        Returns: a list of SQL expressions, one per column."""
        if columns is None:
            columns = X.columns
        columns, _ = core_utils.get_standardized_ids(columns)
        array_split_points = {}
        if self.strategy == "uniform":
            # Uniform bins need explicit split points derived from each
            # column's observed min/max (this triggers computation on X).
            for column in columns:
                min_value = X[column].min()
                max_value = X[column].max()
                bin_size = (max_value - min_value) / self.n_bins
                # Emits n_bins - 1 split points.
                # NOTE(review): the first split point equals the column minimum
                # (i starts at 0, not 1) — presumably intentional for
                # ML.BUCKETIZE's boundary semantics; confirm against BQML docs.
                array_split_points[column] = [
                    min_value + i * bin_size for i in range(self.n_bins - 1)
                ]
            return [
                self._base_sql_generator.ml_bucketize(
                    column, array_split_points[column], f"kbinsdiscretizer_{column}"
                )
                for column in columns
            ]
        elif self.strategy == "quantile":
            # Quantile bin boundaries are computed server-side by
            # ML.QUANTILE_BUCKETIZE, so no statistics are needed here.
            return [
                self._base_sql_generator.ml_quantile_bucketize(
                    column, self.n_bins, f"kbinsdiscretizer_{column}"
                )
                for column in columns
            ]
        else:
            raise ValueError(
                f"strategy should be set 'quantile' or 'uniform', but your input is {self.strategy}."
            )

    @classmethod
    def _parse_from_sql(cls, sql: str) -> tuple[KBinsDiscretizer, str]:
        """Parse SQL to tuple(KBinsDiscretizer, column_label).

        Args:
            sql: SQL string of format "ML.BUCKETIZE({col_label}, array_split_points, FALSE)"
                or ML.QUANTILE_BUCKETIZE({col_label}, num_bucket) OVER()"

        Returns:
            tuple(KBinsDiscretizer, column_label)"""
        # Contents of the outermost parentheses: "{col_label}, {args...}".
        s = sql[sql.find("(") + 1 : sql.find(")")]
        col_label = s[: s.find(",")]
        if sql.startswith("ML.QUANTILE_BUCKETIZE"):
            # Second argument is the bucket count; int() tolerates the
            # leading space left by the ", " separator.
            num_bins = s.split(",")[1]
            return cls(int(num_bins), "quantile"), _unescape_id(col_label)
        else:
            # ML.BUCKETIZE: recover n_bins from the split-point array.
            # k split points have k - 1 commas, and correspond to k + 1 bins
            # (mirrors _compile_to_sql, which emits n_bins - 1 points).
            array_split_points = s[s.find("[") + 1 : s.find("]")]
            n_bins = array_split_points.count(",") + 2
            return cls(n_bins, "uniform"), _unescape_id(col_label)

    def fit(
        self,
        X: utils.ArrayType,
        y=None,  # ignored
    ) -> KBinsDiscretizer:
        (X,) = utils.batch_convert_to_dataframe(X)
        transform_sqls = self._compile_to_sql(X)
        # Train a transform-only BQML model that materializes the bucketizing.
        self._bqml_model = self._bqml_model_factory.create_model(
            X,
            options={"model_type": "transform_only"},
            transforms=transform_sqls,
        )
        # Record the transform's output column names from the trained model.
        self._extract_output_names()
        return self
@log_adapter.class_logger
class OneHotEncoder(
    base.Transformer,
    bigframes_vendored.sklearn.preprocessing._encoder.OneHotEncoder,
):
    # BQML max value https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-one-hot-encoder#syntax
    TOP_K_DEFAULT = 1000000
    FREQUENCY_THRESHOLD_DEFAULT = 0

    __doc__ = bigframes_vendored.sklearn.preprocessing._encoder.OneHotEncoder.__doc__

    # All estimators must implement __init__ to document their parameters, even
    # if they don't have any
    def __init__(
        self,
        drop: Optional[Literal["most_frequent"]] = None,
        min_frequency: Optional[int] = None,
        max_categories: Optional[int] = None,
    ):
        if max_categories is not None and max_categories < 2:
            raise ValueError(
                f"max_categories has to be larger than or equal to 2, input is {max_categories}."
            )
        self.drop = drop
        self.min_frequency = min_frequency
        self.max_categories = max_categories
        # Fitted BQML transform-only model; None until fit() runs.
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()
        self._base_sql_generator = globals.base_sql_generator()

    def _keys(self):
        return (self._bqml_model, self.drop, self.min_frequency, self.max_categories)

    def _compile_to_sql(
        self, X: bpd.DataFrame, columns: Optional[Iterable[str]] = None
    ) -> List[str]:
        """Build ML.ONE_HOT_ENCODER SQL expressions for a BQML TRANSFORM clause.

        Args:
            X: DataFrame to transform.
            columns: columns to transform; defaults to every column of X.

        Returns:
            One SQL expression per encoded column.
        """
        if columns is None:
            columns = X.columns
        columns, _ = core_utils.get_standardized_ids(columns)

        if self.drop is None:
            drop = "none"
        else:
            drop = self.drop

        # Minus one here since BQML's implementation always includes index 0,
        # and top_k is on top of that.
        if self.max_categories is None:
            top_k = OneHotEncoder.TOP_K_DEFAULT
        else:
            top_k = self.max_categories - 1

        if self.min_frequency is None:
            frequency_threshold = OneHotEncoder.FREQUENCY_THRESHOLD_DEFAULT
        else:
            frequency_threshold = self.min_frequency

        sql_exprs = []
        for column in columns:
            sql_exprs.append(
                self._base_sql_generator.ml_one_hot_encoder(
                    column, drop, top_k, frequency_threshold, f"onehotencoded_{column}"
                )
            )
        return sql_exprs

    @classmethod
    def _parse_from_sql(cls, sql: str) -> tuple[OneHotEncoder, str]:
        """Parse SQL to tuple(OneHotEncoder, column_label).

        Args:
            sql: SQL string of format "ML.ONE_HOT_ENCODER({col_label}, '{drop}', {top_k}, {frequency_threshold}) OVER() "

        Returns:
            tuple(OneHotEncoder, column_label)
        """
        args = sql[sql.find("(") + 1 : sql.find(")")]
        col_label, drop_str, top_k, frequency_threshold = args.split(", ")
        if drop_str.lower() == "'most_frequent'":
            drop = cast(Literal["most_frequent"], "most_frequent")
        else:
            drop = None
        # Invert the "-1" applied to max_categories when the SQL was generated.
        max_categories = int(top_k) + 1
        min_frequency = int(frequency_threshold)
        return cls(drop, min_frequency, max_categories), _unescape_id(col_label)

    def fit(
        self,
        X: utils.ArrayType,
        y=None,  # ignored
    ) -> OneHotEncoder:
        (X,) = utils.batch_convert_to_dataframe(X)
        self._bqml_model = self._bqml_model_factory.create_model(
            X,
            options={"model_type": "transform_only"},
            transforms=self._compile_to_sql(X),
        )
        self._extract_output_names()
        return self
@log_adapter.class_logger
class LabelEncoder(
    base.LabelTransformer,
    bigframes_vendored.sklearn.preprocessing._label.LabelEncoder,
):
    # BQML max value https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-one-hot-encoder#syntax
    TOP_K_DEFAULT = 1000000
    FREQUENCY_THRESHOLD_DEFAULT = 0

    __doc__ = bigframes_vendored.sklearn.preprocessing._label.LabelEncoder.__doc__

    # All estimators must implement __init__ to document their parameters, even
    # if they don't have any
    def __init__(
        self,
        min_frequency: Optional[int] = None,
        max_categories: Optional[int] = None,
    ):
        if max_categories is not None and max_categories < 2:
            raise ValueError(
                f"max_categories has to be larger than or equal to 2, input is {max_categories}."
            )
        self.min_frequency = min_frequency
        self.max_categories = max_categories
        # Fitted BQML transform-only model; None until fit() runs.
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()
        self._base_sql_generator = globals.base_sql_generator()

    def _keys(self):
        return (self._bqml_model, self.min_frequency, self.max_categories)

    def _compile_to_sql(
        self, X: bpd.DataFrame, columns: Optional[Iterable[str]] = None
    ) -> List[str]:
        """Build ML.LABEL_ENCODER SQL expressions for a BQML TRANSFORM clause.

        Args:
            X: DataFrame to transform.
            columns: columns to transform; defaults to every column of X.

        Returns:
            One SQL expression per encoded column.
        """
        if columns is None:
            columns = X.columns
        columns, _ = core_utils.get_standardized_ids(columns)

        # Minus one here since BQML's implementation always includes index 0,
        # and top_k is on top of that.
        if self.max_categories is None:
            top_k = LabelEncoder.TOP_K_DEFAULT
        else:
            top_k = self.max_categories - 1

        if self.min_frequency is None:
            frequency_threshold = LabelEncoder.FREQUENCY_THRESHOLD_DEFAULT
        else:
            frequency_threshold = self.min_frequency

        sql_exprs = []
        for column in columns:
            sql_exprs.append(
                self._base_sql_generator.ml_label_encoder(
                    column, top_k, frequency_threshold, f"labelencoded_{column}"
                )
            )
        return sql_exprs

    @classmethod
    def _parse_from_sql(cls, sql: str) -> tuple[LabelEncoder, str]:
        """Parse SQL to tuple(LabelEncoder, column_label).

        Args:
            sql: SQL string of format "ML.LabelEncoder({col_label}, {top_k}, {frequency_threshold}) OVER() "

        Returns:
            tuple(LabelEncoder, column_label)
        """
        args = sql[sql.find("(") + 1 : sql.find(")")]
        col_label, top_k, frequency_threshold = args.split(", ")
        # Invert the "-1" applied to max_categories when the SQL was generated.
        max_categories = int(top_k) + 1
        min_frequency = int(frequency_threshold)
        return cls(min_frequency, max_categories), _unescape_id(col_label)

    def fit(
        self,
        y: utils.ArrayType,
    ) -> LabelEncoder:
        (y,) = utils.batch_convert_to_dataframe(y)
        self._bqml_model = self._bqml_model_factory.create_model(
            y,
            options={"model_type": "transform_only"},
            transforms=self._compile_to_sql(y),
        )
        self._extract_output_names()
        return self
@log_adapter.class_logger
class PolynomialFeatures(
    base.Transformer,
    bigframes_vendored.sklearn.preprocessing._polynomial.PolynomialFeatures,
):
    __doc__ = (
        bigframes_vendored.sklearn.preprocessing._polynomial.PolynomialFeatures.__doc__
    )

    def __init__(self, degree: int = 2):
        # Only degrees 1 through 4 are accepted.
        if degree not in range(1, 5):
            raise ValueError(f"degree has to be [1, 4], input is {degree}.")
        self.degree = degree
        # Fitted BQML transform-only model; None until fit() runs.
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()
        self._base_sql_generator = globals.base_sql_generator()

    def _keys(self):
        return (self._bqml_model, self.degree)

    def _compile_to_sql(
        self, X: bpd.DataFrame, columns: Optional[Iterable[str]] = None
    ) -> List[str]:
        """Build the ML.POLYNOMIAL_EXPAND SQL expression for a BQML TRANSFORM clause.

        Args:
            X: DataFrame to transform.
            columns: columns to transform; defaults to every column of X.

        Returns:
            A single-element list: one expression expands all columns together.
        """
        if columns is None:
            columns = X.columns
        columns, _ = core_utils.get_standardized_ids(columns)
        return [
            self._base_sql_generator.ml_polynomial_expand(
                columns, self.degree, "poly_feat"
            )
        ]

    @classmethod
    def _parse_from_sql(cls, sql: str) -> tuple[PolynomialFeatures, tuple[str, ...]]:
        """Parse SQL to tuple(PolynomialFeatures, column_labels).

        Args:
            sql: SQL string of format "ML.POLYNOMIAL_EXPAND(STRUCT(col_label0, col_label1, ...), degree)"

        Returns:
            tuple(PolynomialFeatures, column_labels)
        """
        struct_body = sql[sql.find("STRUCT(") + 7 : sql.find(")")]
        labels = tuple(
            _unescape_id(label.strip()) for label in struct_body.split(",")
        )
        # The degree is the last argument, after the final comma.
        degree = int(sql[sql.rfind(",") + 1 : sql.rfind(")")])
        return cls(degree), labels

    def fit(
        self,
        X: utils.ArrayType,
        y=None,  # ignored
    ) -> PolynomialFeatures:
        (X,) = utils.batch_convert_to_dataframe(X)
        self._bqml_model = self._bqml_model_factory.create_model(
            X,
            options={"model_type": "transform_only"},
            transforms=self._compile_to_sql(X),
        )
        self._extract_output_names()
        return self
def _unescape_id(id: str) -> str:
"""Very simple conversion to removed ` characters from ids.
A proper sql parser should be used instead.
"""
return id.removeprefix("`").removesuffix("`")
# Union of every preprocessing transformer defined in this module, for use in
# type annotations that accept any of them.
PreprocessingType = Union[
    OneHotEncoder,
    StandardScaler,
    MaxAbsScaler,
    MinMaxScaler,
    KBinsDiscretizer,
    LabelEncoder,
    PolynomialFeatures,
]