Source code for bigframes.ml.linear_model

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Linear models. This module is styled after scikit-learn's linear_model module:
https://scikit-learn.org/stable/modules/linear_model.html."""

from __future__ import annotations

from typing import Dict, List, Literal, Optional, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.sklearn.linear_model._base
import bigframes_vendored.sklearn.linear_model._logistic
from google.cloud import bigquery

from bigframes.core import log_adapter
from bigframes.ml import base, core, globals, utils
import bigframes.pandas as bpd
import bigframes.session

# Keys are the keyword-argument names accepted by the estimator constructors in
# this module; values are the corresponding camelCase names. The mapping is
# handed to utils.retrieve_params_from_bq_model() by each estimator's _from_bq().
# NOTE(review): the values look like the field names of a BigQuery model's
# "trainingOptions" payload -- confirm against utils.retrieve_params_from_bq_model.
_BQML_PARAMS_MAPPING = {
    "optimize_strategy": "optimizationStrategy",
    "fit_intercept": "fitIntercept",
    "l1_reg": "l1Regularization",
    "l2_reg": "l2Regularization",
    "max_iterations": "maxIterations",
    "learning_rate_strategy": "learnRateStrategy",
    "learning_rate": "learnRate",
    "tol": "minRelativeProgress",
    "ls_init_learning_rate": "initialLearnRate",
    "warm_start": "warmStart",
    "calculate_p_values": "calculatePValues",
    "enable_global_explain": "enableGlobalExplain",
}


@log_adapter.class_logger
class LinearRegression(
    base.SupervisedTrainableWithEvaluationPredictor,
    bigframes_vendored.sklearn.linear_model._base.LinearRegression,
):
    __doc__ = bigframes_vendored.sklearn.linear_model._base.LinearRegression.__doc__

    # NOTE(review): methods below that carry no docstring of their own are
    # documented with `#` comments on purpose. They presumably pick up their
    # user-facing documentation from the vendored base class -- confirm before
    # adding docstrings here, since a local docstring would shadow it.

    # Stores every hyper-parameter verbatim on the instance; no validation is
    # performed here. The BQML model handle stays None until the estimator is
    # fitted (see _fit) or loaded from BigQuery (see _from_bq).
    def __init__(
        self,
        *,
        optimize_strategy: Literal[
            "auto_strategy", "batch_gradient_descent", "normal_equation"
        ] = "auto_strategy",
        fit_intercept: bool = True,
        l1_reg: Optional[float] = None,
        l2_reg: float = 0.0,
        max_iterations: int = 20,
        warm_start: bool = False,
        learning_rate: Optional[float] = None,
        learning_rate_strategy: Literal["line_search", "constant"] = "line_search",
        tol: float = 0.01,
        ls_init_learning_rate: Optional[float] = None,
        calculate_p_values: bool = False,
        enable_global_explain: bool = False,
    ):
        self.optimize_strategy = optimize_strategy
        self.fit_intercept = fit_intercept
        self.l1_reg = l1_reg
        self.l2_reg = l2_reg
        self.max_iterations = max_iterations
        self.warm_start = warm_start
        self.learning_rate = learning_rate
        self.learning_rate_strategy = learning_rate_strategy
        self.tol = tol
        self.ls_init_learning_rate = ls_init_learning_rate
        self.calculate_p_values = calculate_p_values
        self.enable_global_explain = enable_global_explain
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()

    # Alternate constructor: rebuilds an estimator around an existing BigQuery
    # model. Constructor kwargs are recovered from the model via
    # utils.retrieve_params_from_bq_model() and _BQML_PARAMS_MAPPING.
    @classmethod
    def _from_bq(
        cls, session: bigframes.session.Session, bq_model: bigquery.Model
    ) -> LinearRegression:
        assert bq_model.model_type == "LINEAR_REGRESSION"

        kwargs = utils.retrieve_params_from_bq_model(
            cls, bq_model, _BQML_PARAMS_MAPPING
        )

        model = cls(**kwargs)
        model._bqml_model = core.BqmlModel(session, bq_model)
        return model

    @property
    def _bqml_options(self) -> dict:
        """The model options as they will be set for BQML"""
        options = {
            "model_type": "LINEAR_REG",
            "data_split_method": "NO_SPLIT",
            "optimize_strategy": self.optimize_strategy,
            "fit_intercept": self.fit_intercept,
            "l2_reg": self.l2_reg,
            "max_iterations": self.max_iterations,
            "learn_rate_strategy": self.learning_rate_strategy,
            "min_rel_progress": self.tol,
            "calculate_p_values": self.calculate_p_values,
            "enable_global_explain": self.enable_global_explain,
        }
        # Optional hyper-parameters are only forwarded when explicitly set.
        if self.l1_reg is not None:
            options["l1_reg"] = self.l1_reg
        if self.learning_rate is not None:
            options["learn_rate"] = self.learning_rate
        if self.ls_init_learning_rate is not None:
            options["ls_init_learn_rate"] = self.ls_init_learning_rate
        # Even presenting warm_start returns error for NORMAL_EQUATION optimizer
        if self.warm_start:
            options["warm_start"] = self.warm_start

        return options

    # Trains a new BQML model from X / y and stores the handle on the instance.
    # Evaluation data is merged in only when BOTH X_eval and y_eval are given;
    # in that case utils.combine_training_and_evaluation_data() also returns the
    # options to use for training.
    def _fit(
        self,
        X: utils.ArrayType,
        y: utils.ArrayType,
        transforms: Optional[List[str]] = None,
        X_eval: Optional[utils.ArrayType] = None,
        y_eval: Optional[utils.ArrayType] = None,
    ) -> LinearRegression:
        X, y = utils.batch_convert_to_dataframe(X, y)

        bqml_options = self._bqml_options

        if X_eval is not None and y_eval is not None:
            X_eval, y_eval = utils.batch_convert_to_dataframe(X_eval, y_eval)
            X, y, bqml_options = utils.combine_training_and_evaluation_data(
                X, y, X_eval, y_eval, bqml_options
            )

        self._bqml_model = self._bqml_model_factory.create_model(
            X,
            y,
            transforms=transforms,
            options=bqml_options,
        )
        return self

    # Raises RuntimeError when called before the estimator has a BQML model.
    # X is converted using the fitted model's session before prediction.
    def predict(self, X: utils.ArrayType) -> bpd.DataFrame:
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before predict")

        (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session)

        return self._bqml_model.predict(X)

    def predict_explain(
        self,
        X: utils.ArrayType,
        *,
        top_k_features: int = 5,
    ) -> bpd.DataFrame:
        """
        Explain predictions for a linear regression model.

        .. note::
            Output matches that of the BigQuery ML.EXPLAIN_PREDICT function.
            See: https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-explain-predict

        Args:
            X (bigframes.dataframe.DataFrame or bigframes.series.Series or
                pandas.core.frame.DataFrame or pandas.core.series.Series):
                Series or a DataFrame to explain its predictions.
            top_k_features (int, default 5):
                an INT64 value that specifies how many top feature attribution
                pairs are generated for each row of input data. The features are
                ranked by the absolute values of their attributions.
                By default, top_k_features is set to 5. If its value is greater
                than the number of features in the training data, the
                attributions of all features are returned.

        Returns:
            bigframes.pandas.DataFrame:
                The predicted DataFrames with explanation columns.

        Raises:
            ValueError: If ``top_k_features`` is less than 1.
            RuntimeError: If the model has not been fitted.
        """
        if top_k_features < 1:
            raise ValueError(
                f"top_k_features must be at least 1, but is {top_k_features}."
            )

        if not self._bqml_model:
            # Name the operation that was actually attempted.
            raise RuntimeError("A model must be fitted before predict_explain")

        (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session)

        return self._bqml_model.explain_predict(
            X, options={"top_k_features": top_k_features}
        )

    def global_explain(
        self,
    ) -> bpd.DataFrame:
        """
        Provide explanations for an entire linear regression model.

        .. note::
            Output matches that of the BigQuery ML.GLOBAL_EXPLAIN function.
            See: https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-global-explain

        Returns:
            bigframes.pandas.DataFrame:
                Dataframes containing feature importance values and
                corresponding attributions, designed to provide a global
                explanation of feature influence.

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if not self._bqml_model:
            # Previously reported "before predict", which named the wrong call.
            raise RuntimeError("A model must be fitted before global_explain")

        return self._bqml_model.global_explain({})

    # Evaluates the fitted model on X joined with y (outer join).
    # Raises RuntimeError when called before the estimator has a BQML model.
    def score(
        self,
        X: utils.ArrayType,
        y: utils.ArrayType,
    ) -> bpd.DataFrame:
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before score")

        X, y = utils.batch_convert_to_dataframe(X, y, session=self._bqml_model.session)

        input_data = X.join(y, how="outer")
        return self._bqml_model.evaluate(input_data)

    def to_gbq(self, model_name: str, replace: bool = False) -> LinearRegression:
        """Save the model to BigQuery.

        Args:
            model_name (str):
                The name of the model.
            replace (bool, default False):
                Determine whether to replace if the model already exists.
                Default to False.

        Returns:
            LinearRegression: Saved model.

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before it can be saved")

        # The returned estimator is re-read from BigQuery under the new name
        # rather than being this instance.
        new_model = self._bqml_model.copy(model_name, replace)
        return new_model.session.read_gbq_model(model_name)
@log_adapter.class_logger
class LogisticRegression(
    base.SupervisedTrainableWithEvaluationPredictor,
    bigframes_vendored.sklearn.linear_model._logistic.LogisticRegression,
):
    __doc__ = (
        bigframes_vendored.sklearn.linear_model._logistic.LogisticRegression.__doc__
    )

    # NOTE(review): methods below that carry no docstring of their own are
    # documented with `#` comments on purpose. They presumably pick up their
    # user-facing documentation from the vendored base class -- confirm before
    # adding docstrings here, since a local docstring would shadow it.

    # TODO(ashleyxu) support class_weight in the constructor.
    # Stores every hyper-parameter verbatim on the instance; no validation is
    # performed here. Only class_weight == "balanced" has an effect on training
    # (it turns on auto_class_weights in _bqml_options).
    # NOTE(review): a dict class_weight is accepted here, is not forwarded by
    # _bqml_options, and is only rejected later in to_gbq().
    def __init__(
        self,
        *,
        optimize_strategy: Literal[
            "auto_strategy", "batch_gradient_descent"
        ] = "auto_strategy",
        fit_intercept: bool = True,
        l1_reg: Optional[float] = None,
        l2_reg: float = 0.0,
        max_iterations: int = 20,
        warm_start: bool = False,
        learning_rate: Optional[float] = None,
        learning_rate_strategy: Literal["line_search", "constant"] = "line_search",
        tol: float = 0.01,
        ls_init_learning_rate: Optional[float] = None,
        calculate_p_values: bool = False,
        enable_global_explain: bool = False,
        class_weight: Optional[Union[Literal["balanced"], Dict[str, float]]] = None,
    ):
        self.optimize_strategy = optimize_strategy
        self.fit_intercept = fit_intercept
        self.l1_reg = l1_reg
        self.l2_reg = l2_reg
        self.max_iterations = max_iterations
        self.warm_start = warm_start
        self.learning_rate = learning_rate
        self.learning_rate_strategy = learning_rate_strategy
        self.tol = tol
        self.ls_init_learning_rate = ls_init_learning_rate
        self.calculate_p_values = calculate_p_values
        self.enable_global_explain = enable_global_explain
        self.class_weight = class_weight
        self._auto_class_weight = class_weight == "balanced"
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()

    # Alternate constructor: rebuilds an estimator around an existing BigQuery
    # model. Constructor kwargs are recovered from the model via
    # utils.retrieve_params_from_bq_model() and _BQML_PARAMS_MAPPING, and
    # class_weight is restored from the last training run's options.
    @classmethod
    def _from_bq(
        cls, session: bigframes.session.Session, bq_model: bigquery.Model
    ) -> LogisticRegression:
        assert bq_model.model_type == "LOGISTIC_REGRESSION"

        kwargs = utils.retrieve_params_from_bq_model(
            cls, bq_model, _BQML_PARAMS_MAPPING
        )

        last_fitting = bq_model.training_runs[-1]["trainingOptions"]
        # Use .get(): a missing "autoClassWeights" entry is treated the same as
        # a false one instead of raising KeyError while loading the model.
        if last_fitting.get("autoClassWeights"):
            kwargs["class_weight"] = "balanced"
        # TODO(ashleyxu) support class_weight in the constructor.
        # if "labelClassWeights" in last_fitting:
        #     kwargs["class_weight"] = last_fitting["labelClassWeights"]

        model = cls(**kwargs)
        model._bqml_model = core.BqmlModel(session, bq_model)
        return model

    @property
    def _bqml_options(self) -> dict:
        """The model options as they will be set for BQML"""
        options = {
            "model_type": "LOGISTIC_REG",
            "data_split_method": "NO_SPLIT",
            "fit_intercept": self.fit_intercept,
            "auto_class_weights": self._auto_class_weight,
            "optimize_strategy": self.optimize_strategy,
            "l2_reg": self.l2_reg,
            "max_iterations": self.max_iterations,
            "learn_rate_strategy": self.learning_rate_strategy,
            "min_rel_progress": self.tol,
            "calculate_p_values": self.calculate_p_values,
            "enable_global_explain": self.enable_global_explain,
            # TODO(ashleyxu): support class_weight (struct array as dict in our API)
            # "class_weight": self.class_weight,
        }
        # Optional hyper-parameters are only forwarded when explicitly set.
        if self.l1_reg is not None:
            options["l1_reg"] = self.l1_reg
        if self.learning_rate is not None:
            options["learn_rate"] = self.learning_rate
        if self.ls_init_learning_rate is not None:
            options["ls_init_learn_rate"] = self.ls_init_learning_rate
        # Even presenting warm_start returns error for NORMAL_EQUATION optimizer
        if self.warm_start:
            options["warm_start"] = self.warm_start

        return options

    # Trains a new BQML model from X / y and stores the handle on the instance.
    # Evaluation data is merged in only when BOTH X_eval and y_eval are given;
    # in that case utils.combine_training_and_evaluation_data() also returns the
    # options to use for training.
    def _fit(
        self,
        X: utils.ArrayType,
        y: utils.ArrayType,
        transforms: Optional[List[str]] = None,
        X_eval: Optional[utils.ArrayType] = None,
        y_eval: Optional[utils.ArrayType] = None,
    ) -> LogisticRegression:
        X, y = utils.batch_convert_to_dataframe(X, y)

        bqml_options = self._bqml_options

        if X_eval is not None and y_eval is not None:
            X_eval, y_eval = utils.batch_convert_to_dataframe(X_eval, y_eval)
            X, y, bqml_options = utils.combine_training_and_evaluation_data(
                X, y, X_eval, y_eval, bqml_options
            )

        self._bqml_model = self._bqml_model_factory.create_model(
            X,
            y,
            transforms=transforms,
            options=bqml_options,
        )
        return self

    # Raises RuntimeError when called before the estimator has a BQML model.
    # X is converted using the fitted model's session before prediction.
    def predict(
        self,
        X: utils.ArrayType,
    ) -> bpd.DataFrame:
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before predict")

        (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session)

        return self._bqml_model.predict(X)

    def predict_explain(
        self,
        X: utils.ArrayType,
        *,
        top_k_features: int = 5,
    ) -> bpd.DataFrame:
        """
        Explain predictions for a logistic regression model.

        .. note::
            Output matches that of the BigQuery ML.EXPLAIN_PREDICT function.
            See: https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-explain-predict

        Args:
            X (bigframes.dataframe.DataFrame or bigframes.series.Series or
                pandas.core.frame.DataFrame or pandas.core.series.Series):
                Series or a DataFrame to explain its predictions.
            top_k_features (int, default 5):
                an INT64 value that specifies how many top feature attribution
                pairs are generated for each row of input data. The features are
                ranked by the absolute values of their attributions.
                By default, top_k_features is set to 5. If its value is greater
                than the number of features in the training data, the
                attributions of all features are returned.

        Returns:
            bigframes.pandas.DataFrame:
                The predicted DataFrames with explanation columns.

        Raises:
            ValueError: If ``top_k_features`` is less than 1.
            RuntimeError: If the model has not been fitted.
        """
        if top_k_features < 1:
            raise ValueError(
                f"top_k_features must be at least 1, but is {top_k_features}."
            )

        if not self._bqml_model:
            # Name the operation that was actually attempted.
            raise RuntimeError("A model must be fitted before predict_explain")

        (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session)

        return self._bqml_model.explain_predict(
            X, options={"top_k_features": top_k_features}
        )

    # Evaluates the fitted model on X joined with y (outer join).
    # Raises RuntimeError when called before the estimator has a BQML model.
    def score(
        self,
        X: utils.ArrayType,
        y: utils.ArrayType,
    ) -> bpd.DataFrame:
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before score")

        X, y = utils.batch_convert_to_dataframe(X, y, session=self._bqml_model.session)

        input_data = X.join(y, how="outer")
        return self._bqml_model.evaluate(input_data)

    def to_gbq(self, model_name: str, replace: bool = False) -> LogisticRegression:
        """Save the model to BigQuery.

        Args:
            model_name (str):
                The name of the model.
            replace (bool, default False):
                Determine whether to replace if the model already exists.
                Default to False.

        Returns:
            LogisticRegression: Saved model.

        Raises:
            RuntimeError: If the model has not been fitted.
            NotImplementedError: If ``class_weight`` is anything other than
                ``None`` or ``"balanced"``.
        """
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before it can be saved")

        # TODO(ashleyxu): support class_weight (struct array as dict in our API)
        if self.class_weight not in (None, "balanced"):
            raise NotImplementedError(
                f"class_weight is not supported yet. {constants.FEEDBACK_LINK}"
            )

        # The returned estimator is re-read from BigQuery under the new name
        # rather than being this instance.
        new_model = self._bqml_model.copy(model_name, replace)
        return new_model.session.read_gbq_model(model_name)