# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Metrics functions for evaluating models. This module is styled after
scikit-learn's metrics module: https://scikit-learn.org/stable/modules/metrics.html."""

from __future__ import annotations

import inspect
import typing
from typing import Literal, overload, Tuple, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.sklearn.metrics._classification as vendored_metrics_classification
import bigframes_vendored.sklearn.metrics._ranking as vendored_metrics_ranking
import bigframes_vendored.sklearn.metrics._regression as vendored_metrics_regression
import numpy as np
import pandas as pd

from bigframes.ml import utils
import bigframes.pandas as bpd


def r2_score(
    y_true: Union[bpd.DataFrame, bpd.Series],
    y_pred: Union[bpd.DataFrame, bpd.Series],
    *,
    force_finite=True,
) -> float:
    y_true_series, y_pred_series = utils.batch_convert_to_series(y_true, y_pred)

    # total sum of squares
    # (dataframe, scalar) binops
    # TODO(tbergeron): These stats are eagerly evaluated. Move to lazy
    # representation once scalar subqueries are supported.
    delta_from_mean = y_true_series - y_true_series.mean()
    ss_total = (delta_from_mean * delta_from_mean).sum()

    # residual sum of squares
    # (scalar, scalar) binops
    delta_from_pred = y_true_series - y_pred_series
    ss_res = (delta_from_pred * delta_from_pred).sum()

    if force_finite and ss_total == 0:
        return 0.0 if ss_res > 0 else 1.0

    return 1 - (ss_res / ss_total)

r2_score.__doc__ = inspect.getdoc(vendored_metrics_regression.r2_score)
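
# A minimal usage sketch (illustrative only; the literal values below are
# hypothetical and assume a configured BigQuery session):
#
#   import bigframes.pandas as bpd
#
#   y_true = bpd.Series([3.0, -0.5, 2.0, 7.0])
#   y_pred = bpd.Series([2.5, 0.0, 2.0, 8.0])
#   r2_score(y_true, y_pred)  # ~0.9486, i.e. 1 - ss_res / ss_total
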
def accuracy_score(
    y_true: Union[bpd.DataFrame, bpd.Series],
    y_pred: Union[bpd.DataFrame, bpd.Series],
    *,
    normalize=True,
) -> float:
    # TODO(ashleyxu): support sample_weight as the parameter
    y_true_series, y_pred_series = utils.batch_convert_to_series(y_true, y_pred)

    # Compute accuracy for each possible representation
    # TODO(ashleyxu): add multilabel classification support where y_type
    # starts with "multilabel"
    score = (y_true_series == y_pred_series).astype(pd.Int64Dtype())

    if normalize:
        return score.mean()
    else:
        return score.sum()

accuracy_score.__doc__ = inspect.getdoc(vendored_metrics_classification.accuracy_score)
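
# Illustrative sketch (hypothetical labels): with normalize=True the result is
# the fraction of exact matches; with normalize=False it is the raw count.
#
#   y_true = bpd.Series([0, 1, 2, 3])
#   y_pred = bpd.Series([0, 2, 1, 3])
#   accuracy_score(y_true, y_pred)                   # 0.5
#   accuracy_score(y_true, y_pred, normalize=False)  # 2
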
def roc_curve(
    y_true: Union[bpd.DataFrame, bpd.Series],
    y_score: Union[bpd.DataFrame, bpd.Series],
    *,
    drop_intermediate: bool = True,
) -> Tuple[bpd.Series, bpd.Series, bpd.Series]:
    # TODO(bmil): Add multi-class support
    # TODO(bmil): Add multi-label support
    # TODO(bmil): Implement drop_intermediate
    if drop_intermediate:
        raise NotImplementedError(
            f"drop_intermediate is not yet implemented. {constants.FEEDBACK_LINK}"
        )

    y_true_series, y_score_series = utils.batch_convert_to_series(y_true, y_score)

    session = y_true_series._block.expr.session

    # We operate on rows, so remove the index if there is one
    # TODO(bmil): check that the indexes are equivalent before removing
    y_true_series = typing.cast(bpd.Series, y_true_series.reset_index(drop=True))
    y_score_series = typing.cast(bpd.Series, y_score_series.reset_index(drop=True))

    df = bpd.DataFrame(
        {
            "y_true": y_true_series,
            "y_score": y_score_series,
        }
    )

    total_positives = y_true_series.sum()
    total_negatives = y_true_series.count() - total_positives

    df = df.sort_values(by="y_score", ascending=False)
    df["cum_tp"] = df["y_true"].cumsum()
    # Have to astype("Int64") as boolean cumsum is not supported yet.
    df["cum_fp"] = (
        (~typing.cast(bpd.Series, df["y_true"].astype("boolean")))
        .astype("Int64")
        .cumsum()
    )

    # produce just one data point per y_score
    df = df.drop_duplicates(subset="y_score", keep="last")
    df = df.sort_values(by="y_score", ascending=False)

    df["tpr"] = typing.cast(bpd.Series, df["cum_tp"]) / total_positives
    df["fpr"] = typing.cast(bpd.Series, df["cum_fp"]) / total_negatives
    df["thresholds"] = typing.cast(bpd.Series, df["y_score"].astype("Float64"))

    # sklearn includes an extra datapoint for the origin with threshold np.inf;
    # concatenating it inline is problematic, so read it through the session.
    df_origin = session.read_pandas(
        pd.DataFrame({"tpr": [0.0], "fpr": [0.0], "thresholds": np.inf})
    )
    df = typing.cast(bpd.DataFrame, bpd.concat([df_origin, df], ignore_index=True))
    df = df.reset_index(drop=True)

    return (
        typing.cast(bpd.Series, df["fpr"]),
        typing.cast(bpd.Series, df["tpr"]),
        typing.cast(bpd.Series, df["thresholds"]),
    )

roc_curve.__doc__ = inspect.getdoc(vendored_metrics_ranking.roc_curve)
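
# Illustrative sketch (hypothetical scores; drop_intermediate must be False
# since it is not yet implemented). The returned fpr/tpr/thresholds are
# parallel bigframes Series, with an extra origin point at threshold np.inf:
#
#   y_true = bpd.Series([0, 0, 1, 1])
#   y_score = bpd.Series([0.1, 0.4, 0.35, 0.8])
#   fpr, tpr, thresholds = roc_curve(y_true, y_score, drop_intermediate=False)
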
def roc_auc_score(
    y_true: Union[bpd.DataFrame, bpd.Series],
    y_score: Union[bpd.DataFrame, bpd.Series],
) -> float:
    # TODO(bmil): Add multi-class support
    # TODO(bmil): Add multi-label support
    y_true_series, y_score_series = utils.batch_convert_to_series(y_true, y_score)

    fpr, tpr, _ = roc_curve(y_true_series, y_score_series, drop_intermediate=False)

    # Use the trapezoid rule to compute the area under the ROC curve
    width_diff = fpr.diff().iloc[1:].reset_index(drop=True)
    height_avg = (tpr.iloc[:-1] + tpr.iloc[1:].reset_index(drop=True)) / 2
    return typing.cast(float, (width_diff * height_avg).sum())

roc_auc_score.__doc__ = inspect.getdoc(vendored_metrics_ranking.roc_auc_score)
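
# Illustrative sketch (hypothetical scores). Because roc_curve orders points
# by descending score, the trapezoid sum above yields the usual AUC:
#
#   y_true = bpd.Series([0, 0, 1, 1])
#   y_score = bpd.Series([0.1, 0.4, 0.35, 0.8])
#   roc_auc_score(y_true, y_score)  # 0.75
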
def auc(
    x: Union[bpd.DataFrame, bpd.Series],
    y: Union[bpd.DataFrame, bpd.Series],
) -> float:
    x_series, y_series = utils.batch_convert_to_series(x, y)

    x_pandas = x_series.to_pandas()
    y_pandas = y_series.to_pandas()

    return vendored_metrics_ranking.auc(x_pandas, y_pandas)

auc.__doc__ = inspect.getdoc(vendored_metrics_ranking.auc)
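
# Illustrative sketch (hypothetical values): auc is a general trapezoidal
# integral over monotonic x, so it can also consume roc_curve outputs:
#
#   x = bpd.Series([0.0, 0.5, 1.0])
#   y = bpd.Series([0.0, 0.75, 1.0])
#   auc(x, y)  # 0.625
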
def confusion_matrix(
    y_true: Union[bpd.DataFrame, bpd.Series],
    y_pred: Union[bpd.DataFrame, bpd.Series],
) -> pd.DataFrame:
    # TODO(ashleyxu): support labels and sample_weight parameters
    y_true_series, y_pred_series = utils.batch_convert_to_series(y_true, y_pred)

    y_true_series = y_true_series.rename("y_true")
    confusion_df = y_true_series.to_frame().assign(y_pred=y_pred_series)
    confusion_df = confusion_df.assign(dummy=0)
    groupby_count = (
        confusion_df.groupby(by=["y_true", "y_pred"], as_index=False)
        .count()
        .to_pandas()
    )

    unique_values = sorted(
        set(groupby_count["y_true"]).union(set(groupby_count["y_pred"]))
    )

    confusion_matrix = pd.DataFrame(
        0, index=pd.Index(unique_values), columns=pd.Index(unique_values), dtype=int
    )

    # Fill in the counts row by row; use .loc to avoid chained-assignment
    # pitfalls when writing individual cells.
    for _, row in groupby_count.iterrows():
        y_true = row["y_true"]
        y_pred = row["y_pred"]
        count = row["dummy"]
        confusion_matrix.loc[y_true, y_pred] = count

    return confusion_matrix

confusion_matrix.__doc__ = inspect.getdoc(
    vendored_metrics_classification.confusion_matrix
)
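
# Illustrative sketch (hypothetical labels). The result is a plain pandas
# DataFrame whose rows are true labels and whose columns are predictions:
#
#   y_true = bpd.Series([2, 0, 2, 2, 0, 1])
#   y_pred = bpd.Series([0, 0, 2, 2, 0, 2])
#   confusion_matrix(y_true, y_pred)
#   #    0  1  2
#   # 0  2  0  0
#   # 1  0  0  1
#   # 2  1  0  2
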
def recall_score(
    y_true: Union[bpd.DataFrame, bpd.Series],
    y_pred: Union[bpd.DataFrame, bpd.Series],
    *,
    average: typing.Optional[str] = "binary",
) -> pd.Series:
    # TODO(ashleyxu): support more average types, default to "binary"
    if average is not None:
        raise NotImplementedError(
            f"Only average=None is supported. {constants.FEEDBACK_LINK}"
        )

    y_true_series, y_pred_series = utils.batch_convert_to_series(y_true, y_pred)

    is_accurate = y_true_series == y_pred_series
    unique_labels = (
        bpd.concat([y_true_series, y_pred_series], join="outer")
        .drop_duplicates()
        .sort_values(inplace=False)
    )
    index = unique_labels.to_list()

    recall = (
        is_accurate.groupby(y_true_series).sum()
        / is_accurate.groupby(y_true_series).count()
    ).to_pandas()

    # Labels that never occur in y_true keep the default recall of 0.
    recall_score = pd.Series(0, index=index)
    for i in recall.index:
        recall_score.loc[i] = recall.loc[i]

    return recall_score

recall_score.__doc__ = inspect.getdoc(vendored_metrics_classification.recall_score)


@overload
def precision_score(
    y_true: bpd.DataFrame | bpd.Series,
    y_pred: bpd.DataFrame | bpd.Series,
    *,
    pos_label: int | float | bool | str = ...,
    average: Literal["binary"] = ...,
) -> float:
    ...


@overload
def precision_score(
    y_true: bpd.DataFrame | bpd.Series,
    y_pred: bpd.DataFrame | bpd.Series,
    *,
    pos_label: int | float | bool | str = ...,
    average: None = ...,
) -> pd.Series:
    ...

def precision_score(
    y_true: bpd.DataFrame | bpd.Series,
    y_pred: bpd.DataFrame | bpd.Series,
    *,
    pos_label: int | float | bool | str = 1,
    average: Literal["binary"] | None = "binary",
) -> pd.Series | float:
    y_true_series, y_pred_series = utils.batch_convert_to_series(y_true, y_pred)

    if average is None:
        return _precision_score_per_label(y_true_series, y_pred_series)
    if average == "binary":
        return _precision_score_binary_pos_only(
            y_true_series, y_pred_series, pos_label
        )

    raise NotImplementedError(
        f"Unsupported 'average' param value: {average}. {constants.FEEDBACK_LINK}"
    )

precision_score.__doc__ = inspect.getdoc(
    vendored_metrics_classification.precision_score
)


def _precision_score_per_label(y_true: bpd.Series, y_pred: bpd.Series) -> pd.Series:
    is_accurate = y_true == y_pred
    unique_labels = (
        bpd.concat([y_true, y_pred], join="outer")
        .drop_duplicates()
        .sort_values(inplace=False)
    )
    index = unique_labels.to_list()

    precision = (
        is_accurate.groupby(y_pred).sum() / is_accurate.groupby(y_pred).count()
    ).to_pandas()

    # Labels that never occur in y_pred keep the default precision of 0.
    precision_score = pd.Series(0, index=index)
    for i in precision.index:
        precision_score.loc[i] = precision.loc[i]

    return precision_score


def _precision_score_binary_pos_only(
    y_true: bpd.Series, y_pred: bpd.Series, pos_label: int | float | bool | str
) -> float:
    unique_labels = bpd.concat([y_true, y_pred]).unique(keep_order=False)
    if unique_labels.count() != 2:
        raise ValueError(
            "Target is multiclass but average='binary'. Please choose another "
            "average setting."
        )
    if not (unique_labels == pos_label).any():
        raise ValueError(
            f"pos_label={pos_label} is not a valid label. It should be one of "
            f"{unique_labels.to_list()}"
        )

    target_elem_idx = y_pred == pos_label
    is_accurate = y_pred[target_elem_idx] == y_true[target_elem_idx]
    return is_accurate.sum() / is_accurate.count()

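
# Illustrative sketch (hypothetical labels). With average=None the result is a
# pandas Series of per-label precision; with the default average="binary" the
# labels must be binary and a single float is returned for pos_label:
#
#   y_true = bpd.Series([0, 1, 1, 0, 1])
#   y_pred = bpd.Series([0, 1, 0, 1, 1])
#   precision_score(y_true, y_pred, average=None)  # per-label Series
#   precision_score(y_true, y_pred, pos_label=1)   # 2/3
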
def f1_score(
    y_true: Union[bpd.DataFrame, bpd.Series],
    y_pred: Union[bpd.DataFrame, bpd.Series],
    *,
    average: typing.Optional[str] = "binary",
) -> pd.Series:
    # TODO(ashleyxu): support more average type, default to "binary"
    y_true_series, y_pred_series = utils.batch_convert_to_series(y_true, y_pred)

    if average is not None:
        raise NotImplementedError(
            f"Only average=None is supported. {constants.FEEDBACK_LINK}"
        )

    recall = recall_score(y_true_series, y_pred_series, average=None)
    precision = precision_score(y_true_series, y_pred_series, average=None)

    f1_score = pd.Series(0, index=recall.index)
    for index in recall.index:
        if precision[index] + recall[index] != 0:
            f1_score[index] = (
                2
                * (precision[index] * recall[index])
                / (precision[index] + recall[index])
            )
        else:
            f1_score[index] = 0

    return f1_score

f1_score.__doc__ = inspect.getdoc(vendored_metrics_classification.f1_score)
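
# Illustrative sketch (hypothetical labels; only average=None is supported).
# Each entry is the harmonic mean of the per-label precision and recall:
#
#   y_true = bpd.Series([0, 1, 1, 0, 1])
#   y_pred = bpd.Series([0, 1, 0, 1, 1])
#   f1_score(y_true, y_pred, average=None)  # e.g. 0.5 for label 0
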
def mean_squared_error(
    y_true: Union[bpd.DataFrame, bpd.Series],
    y_pred: Union[bpd.DataFrame, bpd.Series],
) -> float:
    y_true_series, y_pred_series = utils.batch_convert_to_series(y_true, y_pred)

    return (y_pred_series - y_true_series).pow(2).sum() / len(y_true_series)

mean_squared_error.__doc__ = inspect.getdoc(
    vendored_metrics_regression.mean_squared_error
)
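
# Illustrative sketch (hypothetical values):
#
#   y_true = bpd.Series([3.0, -0.5, 2.0, 7.0])
#   y_pred = bpd.Series([2.5, 0.0, 2.0, 8.0])
#   mean_squared_error(y_true, y_pred)  # 1.5 / 4 = 0.375
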
def mean_absolute_error(
    y_true: Union[bpd.DataFrame, bpd.Series],
    y_pred: Union[bpd.DataFrame, bpd.Series],
) -> float:
    y_true_series, y_pred_series = utils.batch_convert_to_series(y_true, y_pred)

    return (y_pred_series - y_true_series).abs().sum() / len(y_true_series)

mean_absolute_error.__doc__ = inspect.getdoc(
    vendored_metrics_regression.mean_absolute_error
)
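
# Illustrative sketch (hypothetical values):
#
#   y_true = bpd.Series([3.0, -0.5, 2.0, 7.0])
#   y_pred = bpd.Series([2.5, 0.0, 2.0, 8.0])
#   mean_absolute_error(y_true, y_pred)  # (0.5 + 0.5 + 0 + 1) / 4 = 0.5
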