# Source code for bigframes.ml.decomposition

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Matrix Decomposition models. This module is styled after Scikit-Learn's decomposition module:
https://scikit-learn.org/stable/modules/decomposition.html."""

from __future__ import annotations

from typing import List, Literal, Optional, Union

import bigframes_vendored.sklearn.decomposition._mf
import bigframes_vendored.sklearn.decomposition._pca
from google.cloud import bigquery

from bigframes.core import log_adapter
from bigframes.ml import base, core, globals, utils
import bigframes.pandas as bpd
import bigframes.session

_BQML_PARAMS_MAPPING = {
    "svd_solver": "pcaSolver",
    "feedback_type": "feedbackType",
    "num_factors": "numFactors",
    "user_col": "userColumn",
    "item_col": "itemColumn",
    "_input_label_columns": "inputLabelColumns",
    "l2_reg": "l2Regularization",
}


@log_adapter.class_logger
class PCA(
    base.UnsupervisedTrainablePredictor,
    bigframes_vendored.sklearn.decomposition._pca.PCA,
):
    # The user-facing docstring is taken from the vendored scikit-learn-style
    # class; public methods below intentionally carry no docstrings of their
    # own so that the vendored documentation is inherited.
    __doc__ = bigframes_vendored.sklearn.decomposition._pca.PCA.__doc__

    def __init__(
        self,
        n_components: Optional[Union[int, float]] = None,
        *,
        svd_solver: Literal["full", "randomized", "auto"] = "auto",
    ):
        self.n_components = n_components
        self.svd_solver = svd_solver
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()

    @classmethod
    def _from_bq(
        cls, session: bigframes.session.Session, bq_model: bigquery.Model
    ) -> PCA:
        """Rebuild a PCA estimator around an existing BigQuery ML model.

        Hyperparameters are recovered from the training options of the
        model's last training run.
        """
        assert bq_model.model_type == "PCA"

        kwargs = utils.retrieve_params_from_bq_model(
            cls, bq_model, _BQML_PARAMS_MAPPING
        )

        last_fitting = bq_model.training_runs[-1]["trainingOptions"]
        # n_components is stored under one of two BQML options depending on
        # whether it was a component count or an explained-variance ratio.
        if "numPrincipalComponents" in last_fitting:
            kwargs["n_components"] = int(last_fitting["numPrincipalComponents"])
        elif "pcaExplainedVarianceRatio" in last_fitting:
            kwargs["n_components"] = float(last_fitting["pcaExplainedVarianceRatio"])

        model = cls(**kwargs)
        model._bqml_model = core.BqmlModel(session, bq_model)
        return model

    @property
    def _bqml_options(self) -> dict:
        """The model options as they will be set for BQML.

        A value of ``n_components`` in (0, 1) is sent as an explained-variance
        ratio; a value >= 1 is truncated to an int and sent as a component
        count.

        Raises:
            ValueError: If ``n_components`` is unset or not positive.
        """
        options: dict = {
            "model_type": "PCA",
            "pca_solver": self.svd_solver,
        }
        # Explicit checks instead of ``assert`` so validation survives
        # ``python -O``; a non-positive value previously produced options with
        # neither component setting and failed later inside BigQuery.
        if self.n_components is None:
            raise ValueError("n_components must be set before fitting the model.")
        if self.n_components <= 0:
            raise ValueError(
                f"n_components must be a positive number, but is {self.n_components}."
            )

        if self.n_components < 1:
            options["pca_explained_variance_ratio"] = float(self.n_components)
        else:
            options["num_principal_components"] = int(self.n_components)
        return options

    def _fit(
        self,
        X: utils.ArrayType,
        y=None,
        transforms: Optional[List[str]] = None,
    ) -> PCA:
        """Train a BQML PCA model on ``X``; ``y`` is accepted and ignored."""
        (X,) = utils.batch_convert_to_dataframe(X)

        # To mimic sklearn's behavior
        if self.n_components is None:
            self.n_components = min(X.shape)

        self._bqml_model = self._bqml_model_factory.create_model(
            X_train=X,
            transforms=transforms,
            options=self._bqml_options,
        )
        return self

    @property
    def components_(self) -> bpd.DataFrame:
        # Principal components as reported by the fitted BQML model.
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before calling components_.")

        return self._bqml_model.principal_components()

    @property
    def explained_variance_(self) -> bpd.DataFrame:
        # BQML's per-component eigenvalue, renamed to the sklearn attribute name.
        if not self._bqml_model:
            raise RuntimeError(
                "A model must be fitted before calling explained_variance_."
            )

        return self._bqml_model.principal_component_info()[
            ["principal_component_id", "eigenvalue"]
        ].rename(columns={"eigenvalue": "explained_variance"})

    @property
    def explained_variance_ratio_(self) -> bpd.DataFrame:
        if not self._bqml_model:
            raise RuntimeError(
                "A model must be fitted before calling explained_variance_ratio_."
            )

        return self._bqml_model.principal_component_info()[
            ["principal_component_id", "explained_variance_ratio"]
        ]

    def predict(self, X: utils.ArrayType) -> bpd.DataFrame:
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before predict")

        # Bind the input to the same session as the fitted model.
        (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session)

        return self._bqml_model.predict(X)

    def detect_anomalies(
        self,
        X: utils.ArrayType,
        *,
        contamination: float = 0.1,
    ) -> bpd.DataFrame:
        """Detect the anomaly data points of the input.

        Args:
            X (bigframes.dataframe.DataFrame or bigframes.series.Series):
                Series or a DataFrame to detect anomalies.
            contamination (float, default 0.1):
                Identifies the proportion of anomalies in the training dataset
                that are used to create the model. The value must be in the
                range [0, 0.5].

        Returns:
            bigframes.dataframe.DataFrame: detected DataFrame."""
        if contamination < 0.0 or contamination > 0.5:
            raise ValueError(
                f"contamination must be [0.0, 0.5], but is {contamination}."
            )

        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before detect_anomalies")

        (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session)

        return self._bqml_model.detect_anomalies(
            X, options={"contamination": contamination}
        )

    def to_gbq(self, model_name: str, replace: bool = False) -> PCA:
        """Save the model to BigQuery.

        Args:
            model_name (str):
                The name of the model.
            replace (bool, default False):
                Determine whether to replace if the model already exists.
                Default to False.

        Returns:
            PCA: Saved model."""
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before it can be saved")

        # Copy the trained model under the new name, then load it back so the
        # returned estimator points at the saved copy.
        new_model = self._bqml_model.copy(model_name, replace)
        return new_model.session.read_gbq_model(model_name)

    def score(
        self,
        X=None,
        y=None,
    ) -> bpd.DataFrame:
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before score")

        # TODO(b/291973741): X param is ignored. Update BQML supports input in ML.EVALUATE.
        return self._bqml_model.evaluate()
@log_adapter.class_logger
class MatrixFactorization(
    base.UnsupervisedTrainablePredictor,
    bigframes_vendored.sklearn.decomposition._mf.MatrixFactorization,
):
    # The user-facing docstring is taken from the vendored scikit-learn-style
    # class; public methods below intentionally carry no docstrings of their
    # own so that the vendored documentation is inherited.
    __doc__ = bigframes_vendored.sklearn.decomposition._mf.MatrixFactorization.__doc__

    def __init__(
        self,
        *,
        feedback_type: Literal["explicit", "implicit"] = "explicit",
        num_factors: int,
        user_col: str,
        item_col: str,
        rating_col: str = "rating",
        # TODO: Add support for hyperparameter tuning.
        l2_reg: float = 1.0,
    ):
        # Hyperparameters are validated eagerly so misconfiguration surfaces
        # at construction time rather than as a BigQuery error during fit.

        # Checked before ``.lower()`` so a non-str raises TypeError like the
        # other parameters instead of an AttributeError.
        if not isinstance(feedback_type, str):
            raise TypeError(
                f"Expected feedback_type to be a str, but got {type(feedback_type)}."
            )
        feedback_type = feedback_type.lower()  # type: ignore
        if feedback_type not in ("explicit", "implicit"):
            raise ValueError("Expected feedback_type to be `explicit` or `implicit`.")
        self.feedback_type = feedback_type

        if not isinstance(num_factors, int):
            raise TypeError(
                f"Expected num_factors to be an int, but got {type(num_factors)}."
            )
        # Zero is rejected too, matching the "positive integer" contract.
        if num_factors <= 0:
            raise ValueError(
                f"Expected num_factors to be a positive integer, but got {num_factors}."
            )
        self.num_factors = num_factors

        if not isinstance(user_col, str):
            raise TypeError(f"Expected user_col to be a str, but got {type(user_col)}.")
        self.user_col = user_col

        if not isinstance(item_col, str):
            raise TypeError(f"Expected item_col to be a str, but got {type(item_col)}.")
        self.item_col = item_col

        if not isinstance(rating_col, str):
            raise TypeError(
                f"Expected rating_col to be a str, but got {type(rating_col)}."
            )
        # Stored as a one-element label list; exposed via the rating_col property.
        self._input_label_columns = [rating_col]

        if not isinstance(l2_reg, (float, int)):
            raise TypeError(
                f"Expected l2_reg to be a float or int, but got {type(l2_reg)}."
            )
        self.l2_reg = l2_reg
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()

    @property
    def rating_col(self) -> str:
        """str: The rating column name. Defaults to 'rating'."""
        return self._input_label_columns[0]

    @classmethod
    def _from_bq(
        cls, session: bigframes.session.Session, bq_model: bigquery.Model
    ) -> MatrixFactorization:
        """Rebuild a MatrixFactorization estimator around an existing BQML model."""
        assert bq_model.model_type == "MATRIX_FACTORIZATION"

        kwargs = utils.retrieve_params_from_bq_model(
            cls, bq_model, _BQML_PARAMS_MAPPING
        )

        model = cls(**kwargs)
        model._bqml_model = core.BqmlModel(session, bq_model)
        return model

    @property
    def _bqml_options(self) -> dict:
        """The model options as they will be set for BQML"""
        options: dict = {
            "model_type": "matrix_factorization",
            "feedback_type": self.feedback_type,
            "user_col": self.user_col,
            "item_col": self.item_col,
            "rating_col": self.rating_col,
            "l2_reg": self.l2_reg,
        }

        if self.num_factors is not None:
            options["num_factors"] = self.num_factors

        return options

    def _fit(
        self,
        X: utils.ArrayType,
        y=None,
        transforms: Optional[List[str]] = None,
    ) -> MatrixFactorization:
        """Train a BQML matrix factorization model on ``X``.

        Raises:
            ValueError: If ``y`` is given; the rating column is read from ``X``.
        """
        if y is not None:
            raise ValueError(
                "Label column not supported for Matrix Factorization model but y was not `None`"
            )

        (X,) = utils.batch_convert_to_dataframe(X)

        self._bqml_model = self._bqml_model_factory.create_model(
            X_train=X,
            transforms=transforms,
            options=self._bqml_options,
        )
        return self

    def predict(self, X: utils.ArrayType) -> bpd.DataFrame:
        # Predictions for this model are produced by BQML's recommend call.
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before recommend")

        (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session)

        return self._bqml_model.recommend(X)

    def to_gbq(self, model_name: str, replace: bool = False) -> MatrixFactorization:
        """Save the model to BigQuery.

        Args:
            model_name (str):
                The name of the model.
            replace (bool, default False):
                Determine whether to replace if the model already exists.
                Default to False.

        Returns:
            MatrixFactorization: Saved model."""
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before it can be saved")

        # Copy the trained model under the new name, then load it back so the
        # returned estimator points at the saved copy.
        new_model = self._bqml_model.copy(model_name, replace)
        return new_model.session.read_gbq_model(model_name)

    def score(
        self,
        X=None,
        y=None,
    ) -> bpd.DataFrame:
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before score")

        if X is not None and y is not None:
            X, y = utils.batch_convert_to_dataframe(
                X, y, session=self._bqml_model.session
            )
            input_data = X.join(y, how="outer")
        elif X is not None:
            # Apply the same conversion/session binding as predict; previously
            # X given alone was passed through unconverted.
            (input_data,) = utils.batch_convert_to_dataframe(
                X, session=self._bqml_model.session
            )
        else:
            # No X: evaluate without input data (y alone is ignored, as before).
            input_data = None

        return self._bqml_model.evaluate(input_data)