Source code for bigframes.ml.compose

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Build composite transformers on heterogeneous data. This module is styled
after scikit-Learn's compose module:
https://scikit-learn.org/stable/modules/classes.html#module-sklearn.compose."""

from __future__ import annotations

import re
import types
import typing
from typing import cast, Iterable, List, Optional, Set, Tuple, Union

from bigframes_vendored import constants
import bigframes_vendored.sklearn.compose._column_transformer
from google.cloud import bigquery

from bigframes.core import log_adapter
import bigframes.core.compile.googlesql as sql_utils
import bigframes.core.utils as core_utils
from bigframes.ml import base, core, globals, impute, preprocessing, utils
import bigframes.pandas as bpd

_BQML_TRANSFROM_TYPE_MAPPING = types.MappingProxyType(
    {
        "ML.STANDARD_SCALER": preprocessing.StandardScaler,
        "ML.ONE_HOT_ENCODER": preprocessing.OneHotEncoder,
        "ML.MAX_ABS_SCALER": preprocessing.MaxAbsScaler,
        "ML.MIN_MAX_SCALER": preprocessing.MinMaxScaler,
        "ML.BUCKETIZE": preprocessing.KBinsDiscretizer,
        "ML.QUANTILE_BUCKETIZE": preprocessing.KBinsDiscretizer,
        "ML.LABEL_ENCODER": preprocessing.LabelEncoder,
        "ML.POLYNOMIAL_EXPAND": preprocessing.PolynomialFeatures,
        "ML.IMPUTER": impute.SimpleImputer,
    }
)


[docs] class SQLScalarColumnTransformer: r""" Wrapper for plain SQL code contained in a ColumnTransformer. Create a single column transformer in plain sql. This transformer can only be used inside ColumnTransformer. When creating an instance '{0}' can be used as placeholder for the column to transform: SQLScalarColumnTransformer("{0}+1") The default target column gets the prefix 'transformed\_' but can also be changed when creating an instance: SQLScalarColumnTransformer("{0}+1", "inc_{0}") **Examples:** >>> from bigframes.ml.compose import ColumnTransformer, SQLScalarColumnTransformer >>> import bigframes.pandas as bpd >>> df = bpd.DataFrame({'name': ["James", None, "Mary"], 'city': ["New York", "Boston", None]}) >>> col_trans = ColumnTransformer([ ... ("strlen", ... SQLScalarColumnTransformer("CASE WHEN {0} IS NULL THEN 15 ELSE LENGTH({0}) END"), ... ['name', 'city']), ... ]) >>> col_trans = col_trans.fit(df) >>> df_transformed = col_trans.transform(df) >>> df_transformed transformed_name transformed_city 0 5 8 1 15 6 2 4 15 <BLANKLINE> [3 rows x 2 columns] SQLScalarColumnTransformer can be combined with other transformers, like StandardScaler: >>> col_trans = ColumnTransformer([ ... ("identity", SQLScalarColumnTransformer("{0}", target_column="{0}"), ["col1", "col5"]), ... ("increment", SQLScalarColumnTransformer("{0}+1", target_column="inc_{0}"), "col2"), ... ("stdscale", preprocessing.StandardScaler(), "col3"), ... # ... ... ]) """
[docs] def __init__(self, sql: str, target_column: str = "transformed_{0}"): super().__init__() self._sql = sql # TODO: More robust unescaping self._target_column = target_column.replace("`", "")
def _compile_to_sql( self, X: bpd.DataFrame, columns: Optional[Iterable[str]] = None ) -> List[str]: if columns is None: columns = X.columns columns, _ = core_utils.get_standardized_ids(columns) result = [] for column in columns: current_sql = self._sql.format(sql_utils.identifier(column)) current_target_column = sql_utils.identifier( self._target_column.format(column) ) result.append(f"{current_sql} AS {current_target_column}") return result def __repr__(self): return f"SQLScalarColumnTransformer(sql='{self._sql}', target_column='{self._target_column}')" def __eq__(self, other) -> bool: return type(self) is type(other) and self._keys() == other._keys() def __hash__(self) -> int: return hash(self._keys()) def _keys(self): return (self._sql, self._target_column)
# Type hints for transformers contained in ColumnTransformer SingleColTransformer = Union[ preprocessing.PreprocessingType, impute.SimpleImputer, SQLScalarColumnTransformer, ]
[docs] @log_adapter.class_logger class ColumnTransformer( base.Transformer, bigframes_vendored.sklearn.compose._column_transformer.ColumnTransformer, ): __doc__ = ( bigframes_vendored.sklearn.compose._column_transformer.ColumnTransformer.__doc__ )
[docs] def __init__( self, transformers: Iterable[ Tuple[ str, SingleColTransformer, Union[str, Iterable[str]], ] ], ): # TODO: if any(transformers) has fitted raise warning self.transformers = list(transformers) self._bqml_model: Optional[core.BqmlModel] = None self._bqml_model_factory = globals.bqml_model_factory() # call self.transformers_ to check chained transformers self.transformers_
def _keys(self): return (self.transformers, self._bqml_model) @property def transformers_( self, ) -> List[Tuple[str, SingleColTransformer, str,]]: """The collection of transformers as tuples of (name, transformer, column).""" result: List[ Tuple[ str, SingleColTransformer, str, ] ] = [] for entry in self.transformers: name, transformer, column_or_columns = entry columns = ( column_or_columns if isinstance(column_or_columns, List) else [column_or_columns] ) for column in columns: result.append((name, transformer, column)) return result AS_FLEXNAME_SUFFIX_RX = re.compile("^(.*)\\bAS\\s*`[^`]+`\\s*$", re.IGNORECASE) @classmethod def _extract_from_bq_model( cls, bq_model: bigquery.Model, ) -> ColumnTransformer: """Extract transformers as ColumnTransformer obj from a BQ Model. Keep the _bqml_model field as None.""" assert "transformColumns" in bq_model._properties transformers_set: Set[ Tuple[ str, SingleColTransformer, Union[str, List[str]], ] ] = set() def camel_to_snake(name): name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name) return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower() output_names = [] for transform_col in bq_model._properties["transformColumns"]: transform_col_dict = cast(dict, transform_col) # pass the columns that are not transformed if "transformSql" not in transform_col_dict: continue transform_sql: str = transform_col_dict["transformSql"] # workaround for bug in bq_model returning " AS `...`" suffix for flexible names flex_name_match = cls.AS_FLEXNAME_SUFFIX_RX.match(transform_sql) if flex_name_match: transform_sql = flex_name_match.group(1) output_names.append(transform_col_dict["name"]) found_transformer = False for prefix in _BQML_TRANSFROM_TYPE_MAPPING: if transform_sql.startswith(prefix): transformer_cls = _BQML_TRANSFROM_TYPE_MAPPING[prefix] transformers_set.add( ( camel_to_snake(transformer_cls.__name__), # TODO: This is very fragile, use real SQL parser *transformer_cls._parse_from_sql(transform_sql), # type: ignore ) ) found_transformer = True break if not found_transformer: if transform_sql.startswith("ML."): raise NotImplementedError( f"Unsupported transformer type. {constants.FEEDBACK_LINK}" ) target_column = transform_col_dict["name"] sql_transformer = SQLScalarColumnTransformer( transform_sql.strip(), target_column=target_column ) input_column_name = f"?{target_column}" transformers_set.add( ( camel_to_snake(sql_transformer.__class__.__name__), sql_transformer, input_column_name, ) ) transformer = cls(transformers=list(transformers_set)) transformer._output_names = output_names return transformer def _merge( self, bq_model: bigquery.Model ) -> Union[ ColumnTransformer, Union[preprocessing.PreprocessingType, impute.SimpleImputer] ]: """Try to merge the column transformer to a simple transformer. Depends on all the columns in bq_model are transformed with the same transformer.""" transformers = self.transformers assert len(transformers) > 0 _, transformer_0, column_0 = transformers[0] if isinstance(transformer_0, SQLScalarColumnTransformer): return self # SQLScalarColumnTransformer only work inside ColumnTransformer feature_columns_sorted = sorted( [ cast(str, feature_column.name) for feature_column in bq_model.feature_columns ] ) if ( len(transformers) == 1 and isinstance(transformer_0, preprocessing.PolynomialFeatures) and sorted(column_0) == feature_columns_sorted ): transformer_0._output_names = self._output_names return transformer_0 if not isinstance(column_0, str): return self columns = [column_0] for _, transformer, column in transformers[1:]: if not isinstance(column, str): return self # all transformers are the same if transformer != transformer_0: return self columns.append(column) # all feature columns are transformed if sorted(columns) == feature_columns_sorted: transformer_0._output_names = self._output_names return transformer_0 return self def _compile_to_sql( self, X: bpd.DataFrame, ) -> List[str]: """Compile this transformer to a list of SQL expressions that can be included in a BQML TRANSFORM clause Args: X: DataFrame to transform. Returns: a list of sql_expr.""" result = [] for _, transformer, target_columns in self.transformers: if isinstance(target_columns, str): target_columns = [target_columns] result += transformer._compile_to_sql(X, target_columns) return result
[docs] def fit( self, X: utils.ArrayType, y=None, # ignored ) -> ColumnTransformer: (X,) = utils.batch_convert_to_dataframe(X) transform_sqls = self._compile_to_sql(X) self._bqml_model = self._bqml_model_factory.create_model( X, options={"model_type": "transform_only"}, transforms=transform_sqls, ) self._extract_output_names() return self
[docs] def transform(self, X: utils.ArrayType) -> bpd.DataFrame: if not self._bqml_model: raise RuntimeError("Must be fitted before transform") (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session) df = self._bqml_model.transform(X) return typing.cast( bpd.DataFrame, df[self._output_names], )