# Source code for bigframes.ml.model_selection

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Functions for test/train split and model tuning. This module is styled after
scikit-learn's model_selection module:
https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection."""


import inspect
from itertools import chain
import time
from typing import cast, Generator, List, Optional, Union

import bigframes_vendored.sklearn.model_selection._split as vendored_model_selection_split
import bigframes_vendored.sklearn.model_selection._validation as vendored_model_selection_validation
import pandas as pd

from bigframes.core import log_adapter
from bigframes.ml import utils
import bigframes.pandas as bpd


def train_test_split(
    *arrays: utils.ArrayType,
    test_size: Union[float, None] = None,
    train_size: Union[float, None] = None,
    random_state: Union[int, None] = None,
    stratify: Union[bpd.Series, None] = None,
    shuffle: bool = True,
) -> List[Union[bpd.DataFrame, bpd.Series]]:
    # NOTE: the public docstring is attached below via
    # `train_test_split.__doc__ = inspect.getdoc(...)` to mirror the vendored
    # scikit-learn API.

    # Default the split fractions: test_size defaults to 0.25, and whichever
    # of the two fractions is missing is derived as the complement of the other.
    if test_size is None:
        if train_size is None:
            test_size = 0.25
        else:
            test_size = 1.0 - train_size
    if train_size is None:
        train_size = 1.0 - test_size

    # Both fractions must be strictly inside (0, 1) and must not overlap.
    if train_size <= 0.0 or train_size >= 1.0:
        raise ValueError(f"train_size must be within (0.0, 1.0). But is {train_size}.")
    if test_size <= 0.0 or test_size >= 1.0:
        raise ValueError(f"test_size must be within (0.0, 1.0). But is {test_size}.")
    if train_size + test_size > 1.0:
        raise ValueError(
            f"The sum of train_size and test_size exceeds 1.0. train_size: {train_size}. test_size: {test_size}"
        )

    if not shuffle:
        # Order-preserving split: the first train_size fraction of rows forms
        # the train set, the remaining rows the test set.
        if stratify is not None:
            raise ValueError(
                "Stratified train/test split is not implemented for shuffle=False"
            )
        bf_arrays = list(utils.batch_convert_to_bf_equivalent(*arrays))
        total_rows = len(bf_arrays[0])
        train_rows = int(total_rows * train_size)
        test_rows = total_rows - train_rows
        # Flatten into [train_0, test_0, train_1, test_1, ...], one pair per
        # input array.
        return list(
            chain.from_iterable(
                [
                    [bf_array.head(train_rows), bf_array.tail(test_rows)]
                    for bf_array in bf_arrays
                ]
            )
        )

    dfs = list(utils.batch_convert_to_dataframe(*arrays))

    def _stratify_split(df: bpd.DataFrame, stratify: bpd.Series) -> List[bpd.DataFrame]:
        """Split a single DF according to the stratify Series."""
        stratify = stratify.rename("bigframes_stratify_col")  # avoid name conflicts
        merged_df = df.join(stratify.to_frame(), how="outer")

        train_dfs, test_dfs = [], []
        uniq = stratify.value_counts().index
        # Split each stratum independently via a recursive, unstratified call
        # so every class keeps the requested train/test proportions.
        for value in uniq:
            cur = merged_df[merged_df["bigframes_stratify_col"] == value]
            train, test = train_test_split(
                cur,
                test_size=test_size,
                train_size=train_size,
                random_state=random_state,
            )
            train_dfs.append(train)
            test_dfs.append(test)

        train_df = cast(
            bpd.DataFrame, bpd.concat(train_dfs).drop(columns="bigframes_stratify_col")
        )
        test_df = cast(
            bpd.DataFrame, bpd.concat(test_dfs).drop(columns="bigframes_stratify_col")
        )
        return [train_df, test_df]

    # Join all inputs into one frame so every array is split on the same rows.
    joined_df = dfs[0]
    for df in dfs[1:]:
        joined_df = joined_df.join(df, how="outer")
    if stratify is None:
        joined_df_train, joined_df_test = joined_df._split(
            fracs=(train_size, test_size), random_state=random_state
        )
    else:
        joined_df_train, joined_df_test = _stratify_split(joined_df, stratify)

    # Project each original array's columns back out of the joined split,
    # yielding [train_0, test_0, train_1, test_1, ...].
    results = []
    for array in arrays:
        columns = array.name if isinstance(array, bpd.Series) else array.columns
        results.append(joined_df_train[columns])
        results.append(joined_df_test[columns])

    return results
# Reuse the docstring from the vendored scikit-learn implementation so the
# public API documentation matches upstream sklearn's train_test_split.
train_test_split.__doc__ = inspect.getdoc(
    vendored_model_selection_split.train_test_split
)
@log_adapter.class_logger
class KFold(vendored_model_selection_split.KFold):
    __doc__ = inspect.getdoc(vendored_model_selection_split.KFold)

    def __init__(self, n_splits: int = 5, *, random_state: Union[int, None] = None):
        if n_splits < 2:
            raise ValueError(f"n_splits must be at least 2. Got {n_splits}")
        self._n_splits = n_splits
        self._random_state = random_state

    def get_n_splits(self) -> int:
        """Return the number of folds this splitter produces."""
        return self._n_splits

    def split(
        self,
        X: utils.ArrayType,
        y: Union[utils.ArrayType, None] = None,
    ) -> Generator[tuple[Union[bpd.DataFrame, bpd.Series, None], ...], None, None]:
        """Yield ``(X_train, X_test, y_train, y_test)`` once per fold.

        X and y (when given) are joined into one frame so both are partitioned
        on the same rows; the frame is then split into ``n_splits`` equal
        fractions, and each fraction in turn serves as the test fold while the
        remaining fractions are concatenated into the train fold.
        """
        feature_df = next(utils.batch_convert_to_dataframe(X))
        target_df = None
        if y is not None:
            target_df = next(utils.batch_convert_to_dataframe(y))

        if target_df is None:
            combined = feature_df
        else:
            combined = feature_df.join(target_df, how="outer")

        # n equal fractions -> n folds.
        folds = combined._split(
            fracs=(1 / self._n_splits,) * self._n_splits,
            random_state=self._random_state,
        )

        for idx, holdout in enumerate(folds):
            remainder = bpd.concat(folds[:idx] + folds[idx + 1 :])

            X_train = remainder[feature_df.columns]
            X_test = holdout[feature_df.columns]
            y_train = None if target_df is None else remainder[target_df.columns]
            y_test = None if target_df is None else holdout[target_df.columns]

            # Convert each piece back to the kind (DataFrame/Series) of the
            # corresponding original input.
            yield (
                KFold._convert_to_bf_type(X_train, X),
                KFold._convert_to_bf_type(X_test, X),
                KFold._convert_to_bf_type(y_train, y),
                KFold._convert_to_bf_type(y_test, y),
            )

    @staticmethod
    def _convert_to_bf_type(
        input,
        type_instance: Union[bpd.DataFrame, bpd.Series, pd.DataFrame, pd.Series, None],
    ) -> Union[bpd.DataFrame, bpd.Series, None]:
        """Coerce ``input`` to the BigFrames kind matching ``type_instance``."""
        if isinstance(type_instance, (pd.Series, bpd.Series)):
            return next(utils.batch_convert_to_series(input))
        if isinstance(type_instance, (pd.DataFrame, bpd.DataFrame)):
            return next(utils.batch_convert_to_dataframe(input))
        return None
def cross_validate(
    estimator,
    X: utils.ArrayType,
    y: Union[utils.ArrayType, None] = None,
    *,
    cv: Optional[Union[int, KFold]] = None,
) -> dict[str, list]:
    # The public docstring is attached below from the vendored scikit-learn
    # implementation.

    # Normalize cv: default is 5-fold; an int selects that many folds.
    if cv is None:
        cv = KFold(n_splits=5)
    elif isinstance(cv, int):
        cv = KFold(n_splits=cv)

    scores: dict[str, list] = {"test_score": [], "fit_time": [], "score_time": []}
    for train_X, test_X, train_y, test_y in cv.split(X, y):  # type: ignore
        # Time the fit and score phases separately, per fold.
        started = time.perf_counter()
        estimator.fit(train_X, train_y)
        elapsed_fit = time.perf_counter() - started

        started = time.perf_counter()
        fold_score = estimator.score(test_X, test_y)
        elapsed_score = time.perf_counter() - started

        scores["test_score"].append(fold_score)
        scores["fit_time"].append(elapsed_fit)
        scores["score_time"].append(elapsed_score)

    return scores
# Reuse the docstring from the vendored scikit-learn implementation so the
# public API documentation matches upstream sklearn's cross_validate.
cross_validate.__doc__ = inspect.getdoc(
    vendored_model_selection_validation.cross_validate
)