# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import functools
import inspect
import os
import threading
import typing
from typing import (
Any,
Callable,
Dict,
IO,
Iterable,
Literal,
MutableSequence,
Optional,
overload,
Sequence,
Tuple,
Union,
)
import warnings
import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.io.gbq as vendored_pandas_gbq
from google.cloud import bigquery
import numpy
import pandas
from pandas._typing import (
CompressionOptions,
FilePath,
ReadPickleBuffer,
StorageOptions,
)
import pyarrow as pa
import bigframes._config as config
import bigframes.core.global_session as global_session
import bigframes.core.indexes
import bigframes.core.pyformat
import bigframes.dataframe
import bigframes.enums
import bigframes.exceptions
import bigframes.series
import bigframes.session
from bigframes.session import dry_runs
import bigframes.session._io.bigquery
import bigframes.session.clients
import bigframes.session.metrics
# Note: the following methods are duplicated from Session. This duplication
# enables the following:
#
# 1. Static type checking knows the argument and return types, which is
# difficult to do with decorators. Aside: When we require Python 3.10, we
# can use Concatenate for generic typing in decorators. See:
# https://stackoverflow.com/a/68290080/101923
# 2. docstrings get processed by static processing tools, such as VS Code's
# autocomplete.
# 3. Positional arguments function as expected. If we were to pull in the
# methods directly from Session, a Session object would need to be the first
# argument, even if we allow a default value.
# 4. Allows setting BigQuery options for the BigFrames session based on the
# method and its arguments.
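# As a rough, hedged sketch of the Concatenate aside in point 1 (not used by
# this module), a decorator on Python 3.10+ could bind the default Session
# while preserving the wrapped method's signature; the helper name
# `_with_default_session` is hypothetical:
#
#     from typing import Concatenate, ParamSpec, TypeVar
#
#     P = ParamSpec("P")
#     R = TypeVar("R")
#
#     def _with_default_session(
#         meth: Callable[Concatenate[bigframes.session.Session, P], R],
#     ) -> Callable[P, R]:
#         @functools.wraps(meth)
#         def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
#             return global_session.with_default_session(meth, *args, **kwargs)
#         return wrapper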
def read_arrow(pa_table: pa.Table) -> bigframes.dataframe.DataFrame:
"""Load a PyArrow Table to a BigQuery DataFrames DataFrame.
Args:
pa_table (pyarrow.Table):
PyArrow table to load data from.
Returns:
bigframes.dataframe.DataFrame:
A new DataFrame representing the data from the PyArrow table.
"""
session = global_session.get_global_session()
return session.read_arrow(pa_table=pa_table)
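# A minimal usage sketch (kept as a comment so nothing executes at import
# time). It assumes this function is re-exported as bigframes.pandas.read_arrow
# and that BigQuery credentials and a project are already configured:
#
#     import pyarrow as pa
#     import bigframes.pandas as bpd
#
#     pa_table = pa.table({"name": ["alpha", "beta"], "value": [1, 2]})
#     df = bpd.read_arrow(pa_table)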
def read_csv(
filepath_or_buffer: str | IO["bytes"],
*,
sep: Optional[str] = ",",
header: Optional[int] = 0,
names: Optional[
Union[MutableSequence[Any], numpy.ndarray[Any, Any], Tuple[Any, ...], range]
] = None,
index_col: Optional[
Union[
int,
str,
Sequence[Union[str, int]],
bigframes.enums.DefaultIndexKind,
Literal[False],
]
] = None,
usecols: Optional[
Union[
MutableSequence[str],
Tuple[str, ...],
Sequence[int],
pandas.Series,
pandas.Index,
numpy.ndarray[Any, Any],
Callable[[Any], bool],
]
] = None,
dtype: Optional[Dict] = None,
engine: Optional[
Literal["c", "python", "pyarrow", "python-fwf", "bigquery"]
] = None,
encoding: Optional[str] = None,
write_engine: constants.WriteEngineType = "default",
**kwargs,
) -> bigframes.dataframe.DataFrame:
return global_session.with_default_session(
bigframes.session.Session.read_csv,
filepath_or_buffer=filepath_or_buffer,
sep=sep,
header=header,
names=names,
index_col=index_col,
usecols=usecols,
dtype=dtype,
engine=engine,
encoding=encoding,
write_engine=write_engine,
**kwargs,
)
read_csv.__doc__ = inspect.getdoc(bigframes.session.Session.read_csv)
def read_json(
path_or_buf: str | IO["bytes"],
*,
orient: Literal[
"split", "records", "index", "columns", "values", "table"
] = "columns",
dtype: Optional[Dict] = None,
encoding: Optional[str] = None,
lines: bool = False,
engine: Literal["ujson", "pyarrow", "bigquery"] = "ujson",
write_engine: constants.WriteEngineType = "default",
**kwargs,
) -> bigframes.dataframe.DataFrame:
return global_session.with_default_session(
bigframes.session.Session.read_json,
path_or_buf=path_or_buf,
orient=orient,
dtype=dtype,
encoding=encoding,
lines=lines,
engine=engine,
write_engine=write_engine,
**kwargs,
)
read_json.__doc__ = inspect.getdoc(bigframes.session.Session.read_json)
@overload
def read_gbq( # type: ignore[overload-overlap]
query_or_table: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = ...,
columns: Iterable[str] = ...,
configuration: Optional[Dict] = ...,
max_results: Optional[int] = ...,
filters: vendored_pandas_gbq.FiltersType = ...,
use_cache: Optional[bool] = ...,
col_order: Iterable[str] = ...,
dry_run: Literal[False] = ...,
allow_large_results: Optional[bool] = ...,
) -> bigframes.dataframe.DataFrame:
...
@overload
def read_gbq(
query_or_table: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = ...,
columns: Iterable[str] = ...,
configuration: Optional[Dict] = ...,
max_results: Optional[int] = ...,
filters: vendored_pandas_gbq.FiltersType = ...,
use_cache: Optional[bool] = ...,
col_order: Iterable[str] = ...,
dry_run: Literal[True] = ...,
allow_large_results: Optional[bool] = ...,
) -> pandas.Series:
...
def read_gbq(
query_or_table: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
columns: Iterable[str] = (),
configuration: Optional[Dict] = None,
max_results: Optional[int] = None,
filters: vendored_pandas_gbq.FiltersType = (),
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
dry_run: bool = False,
allow_large_results: Optional[bool] = None,
) -> bigframes.dataframe.DataFrame | pandas.Series:
_set_default_session_location_if_possible(query_or_table)
return global_session.with_default_session(
bigframes.session.Session.read_gbq,
query_or_table,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
filters=filters,
use_cache=use_cache,
col_order=col_order,
dry_run=dry_run,
allow_large_results=allow_large_results,
)
read_gbq.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq)
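# A hedged usage sketch for read_gbq (comment only; assumes the usual
# bigframes.pandas re-export and configured credentials):
#
#     import bigframes.pandas as bpd
#
#     df = bpd.read_gbq(
#         "SELECT name, SUM(number) AS total"
#         " FROM `bigquery-public-data.usa_names.usa_1910_2013`"
#         " GROUP BY name",
#         index_col="name",
#     )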
def _run_read_gbq_colab_sessionless_dry_run(
query: str,
*,
pyformat_args: Dict[str, Any],
) -> pandas.Series:
"""Run a dry_run without a session."""
query_formatted = bigframes.core.pyformat.pyformat(
query,
pyformat_args=pyformat_args,
dry_run=True,
)
bqclient = _get_bqclient()
job = _dry_run(query_formatted, bqclient)
return dry_runs.get_query_stats_with_inferred_dtypes(job, (), ())
def _try_read_gbq_colab_sessionless_dry_run(
query: str,
*,
pyformat_args: Dict[str, Any],
) -> Optional[pandas.Series]:
"""Run a dry_run without a session, only if the session hasn't yet started."""
global _default_location_lock
# Avoid creating a session just for dry run. We don't want to bind to a
# location too early. This is especially important if the query only refers
# to local data and not any BigQuery tables.
with _default_location_lock:
if not config.options.bigquery._session_started:
return _run_read_gbq_colab_sessionless_dry_run(
query, pyformat_args=pyformat_args
)
# Explicitly return None to indicate that we didn't run the dry run query.
return None
@overload
def _read_gbq_colab( # type: ignore[overload-overlap]
query_or_table: str,
*,
pyformat_args: Optional[Dict[str, Any]] = ...,
dry_run: Literal[False] = ...,
) -> bigframes.dataframe.DataFrame:
...
@overload
def _read_gbq_colab(
query_or_table: str,
*,
pyformat_args: Optional[Dict[str, Any]] = ...,
dry_run: Literal[True] = ...,
) -> pandas.Series:
...
def _read_gbq_colab(
query_or_table: str,
*,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
) -> bigframes.dataframe.DataFrame | pandas.Series:
"""A Colab-specific version of read_gbq.
Calls `_set_default_session_location_if_possible` and then delegates
to `bigframes.session.Session._read_gbq_colab`.
Args:
query_or_table (str):
SQL query or table ID (table ID not yet supported).
pyformat_args (Optional[Dict[str, Any]]):
Parameters to format into the query string.
dry_run (bool):
If True, estimates the size of the query results without returning data.
The result is then a pandas Series containing query metadata.
Returns:
Union[bigframes.dataframe.DataFrame, pandas.Series]:
A BigQuery DataFrame if `dry_run` is False, otherwise a pandas Series.
"""
if pyformat_args is None:
pyformat_args = {}
# Only try to set the global location if it's not a dry run. We don't want
# to bind to a location too early. This is especially important if the query
# only refers to local data and not any BigQuery tables.
if dry_run:
result = _try_read_gbq_colab_sessionless_dry_run(
query_or_table, pyformat_args=pyformat_args
)
if result is not None:
return result
# If we made it this far, we must have a session that has already
# started. That means we can safely call the "real" _read_gbq_colab,
# which generates slightly nicer SQL.
else:
# Delay formatting the query with the special "session-less" logic. This
# avoids doing unnecessary work if the session already has a location or has
# already started.
create_query = functools.partial(
bigframes.core.pyformat.pyformat,
query_or_table,
pyformat_args=pyformat_args,
dry_run=True,
)
_set_default_session_location_if_possible_deferred_query(create_query)
if not config.options.bigquery._session_started:
with warnings.catch_warnings():
# Don't warn about Polars in SQL cells.
# Related to b/437090788.
warnings.simplefilter("ignore", bigframes.exceptions.PreviewWarning)
config.options.bigquery.enable_polars_execution = True
return global_session.with_default_session(
bigframes.session.Session._read_gbq_colab,
query_or_table,
pyformat_args=pyformat_args,
dry_run=dry_run,
)
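# A hedged usage sketch for the private helper above (comment only). It assumes
# bigframes.core.pyformat accepts str.format-style {name} placeholders and that
# credentials are configured; with dry_run=True the result is a pandas Series
# of query metadata rather than a DataFrame:
#
#     stats = _read_gbq_colab(
#         "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013`"
#         " WHERE name = {target_name}",
#         pyformat_args={"target_name": "Mary"},
#         dry_run=True,
#     )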
def read_gbq_model(model_name: str):
return global_session.with_default_session(
bigframes.session.Session.read_gbq_model,
model_name,
)
read_gbq_model.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_model)
def read_gbq_object_table(
object_table: str, *, name: Optional[str] = None
) -> bigframes.dataframe.DataFrame:
return global_session.with_default_session(
bigframes.session.Session.read_gbq_object_table,
object_table,
name=name,
)
read_gbq_object_table.__doc__ = inspect.getdoc(
bigframes.session.Session.read_gbq_object_table
)
@overload
def read_gbq_query( # type: ignore[overload-overlap]
query: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = ...,
columns: Iterable[str] = ...,
configuration: Optional[Dict] = ...,
max_results: Optional[int] = ...,
use_cache: Optional[bool] = ...,
col_order: Iterable[str] = ...,
filters: vendored_pandas_gbq.FiltersType = ...,
dry_run: Literal[False] = ...,
allow_large_results: Optional[bool] = ...,
) -> bigframes.dataframe.DataFrame:
...
@overload
def read_gbq_query(
query: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = ...,
columns: Iterable[str] = ...,
configuration: Optional[Dict] = ...,
max_results: Optional[int] = ...,
use_cache: Optional[bool] = ...,
col_order: Iterable[str] = ...,
filters: vendored_pandas_gbq.FiltersType = ...,
dry_run: Literal[True] = ...,
allow_large_results: Optional[bool] = ...,
) -> pandas.Series:
...
def read_gbq_query(
query: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
columns: Iterable[str] = (),
configuration: Optional[Dict] = None,
max_results: Optional[int] = None,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
filters: vendored_pandas_gbq.FiltersType = (),
dry_run: bool = False,
allow_large_results: Optional[bool] = None,
) -> bigframes.dataframe.DataFrame | pandas.Series:
_set_default_session_location_if_possible(query)
return global_session.with_default_session(
bigframes.session.Session.read_gbq_query,
query,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
use_cache=use_cache,
col_order=col_order,
filters=filters,
dry_run=dry_run,
allow_large_results=allow_large_results,
)
read_gbq_query.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_query)
@overload
def read_gbq_table( # type: ignore[overload-overlap]
query: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = ...,
columns: Iterable[str] = ...,
max_results: Optional[int] = ...,
filters: vendored_pandas_gbq.FiltersType = ...,
use_cache: bool = ...,
col_order: Iterable[str] = ...,
dry_run: Literal[False] = ...,
) -> bigframes.dataframe.DataFrame:
...
@overload
def read_gbq_table(
query: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = ...,
columns: Iterable[str] = ...,
max_results: Optional[int] = ...,
filters: vendored_pandas_gbq.FiltersType = ...,
use_cache: bool = ...,
col_order: Iterable[str] = ...,
dry_run: Literal[True] = ...,
) -> pandas.Series:
...
def read_gbq_table(
query: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
columns: Iterable[str] = (),
max_results: Optional[int] = None,
filters: vendored_pandas_gbq.FiltersType = (),
use_cache: bool = True,
col_order: Iterable[str] = (),
dry_run: bool = False,
) -> bigframes.dataframe.DataFrame | pandas.Series:
_set_default_session_location_if_possible(query)
return global_session.with_default_session(
bigframes.session.Session.read_gbq_table,
query,
index_col=index_col,
columns=columns,
max_results=max_results,
filters=filters,
use_cache=use_cache,
col_order=col_order,
dry_run=dry_run,
)
read_gbq_table.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_table)
@typing.overload
def read_pandas(
pandas_dataframe: pandas.DataFrame,
*,
write_engine: constants.WriteEngineType = "default",
) -> bigframes.dataframe.DataFrame:
...
@typing.overload
def read_pandas(
pandas_dataframe: pandas.Series,
*,
write_engine: constants.WriteEngineType = "default",
) -> bigframes.series.Series:
...
@typing.overload
def read_pandas(
pandas_dataframe: pandas.Index,
*,
write_engine: constants.WriteEngineType = "default",
) -> bigframes.core.indexes.Index:
...
def read_pandas(
pandas_dataframe: Union[pandas.DataFrame, pandas.Series, pandas.Index],
*,
write_engine: constants.WriteEngineType = "default",
):
return global_session.with_default_session(
bigframes.session.Session.read_pandas,
pandas_dataframe,
write_engine=write_engine,
)
read_pandas.__doc__ = inspect.getdoc(bigframes.session.Session.read_pandas)
def read_pickle(
filepath_or_buffer: FilePath | ReadPickleBuffer,
compression: CompressionOptions = "infer",
storage_options: StorageOptions = None,
*,
write_engine: constants.WriteEngineType = "default",
):
return global_session.with_default_session(
bigframes.session.Session.read_pickle,
filepath_or_buffer=filepath_or_buffer,
compression=compression,
storage_options=storage_options,
write_engine=write_engine,
)
read_pickle.__doc__ = inspect.getdoc(bigframes.session.Session.read_pickle)
def read_parquet(
path: str | IO["bytes"],
*,
engine: str = "auto",
write_engine: constants.WriteEngineType = "default",
) -> bigframes.dataframe.DataFrame:
return global_session.with_default_session(
bigframes.session.Session.read_parquet,
path,
engine=engine,
write_engine=write_engine,
)
read_parquet.__doc__ = inspect.getdoc(bigframes.session.Session.read_parquet)
def read_gbq_function(
function_name: str,
is_row_processor: bool = False,
):
return global_session.with_default_session(
bigframes.session.Session.read_gbq_function,
function_name=function_name,
is_row_processor=is_row_processor,
)
read_gbq_function.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_function)
def from_glob_path(
path: str, *, connection: Optional[str] = None, name: Optional[str] = None
) -> bigframes.dataframe.DataFrame:
return global_session.with_default_session(
bigframes.session.Session.from_glob_path,
path=path,
connection=connection,
name=name,
)
from_glob_path.__doc__ = inspect.getdoc(bigframes.session.Session.from_glob_path)
_default_location_lock = threading.Lock()
def _get_bqclient() -> bigquery.Client:
# Address circular imports in doctest due to bigframes/session/__init__.py
# containing a lot of logic and samples.
from bigframes.session import clients
clients_provider = clients.ClientsProvider(
project=config.options.bigquery.project,
location=config.options.bigquery.location,
use_regional_endpoints=config.options.bigquery.use_regional_endpoints,
credentials=config.options.bigquery.credentials,
application_name=config.options.bigquery.application_name,
bq_kms_key_name=config.options.bigquery.kms_key_name,
client_endpoints_override=config.options.bigquery.client_endpoints_override,
requests_transport_adapters=config.options.bigquery.requests_transport_adapters,
)
return clients_provider.bqclient
def _dry_run(query, bqclient) -> bigquery.QueryJob:
# Address circular imports in doctest due to bigframes/session/__init__.py
# containing a lot of logic and samples.
from bigframes.session import metrics as bf_metrics
job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
# Fix for b/435183833. Log metrics even if a Session isn't available.
if bf_metrics.LOGGING_NAME_ENV_VAR in os.environ:
metrics = bf_metrics.ExecutionMetrics()
metrics.count_job_stats(job)
return job
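# A dry run never executes the query; the returned QueryJob only carries
# metadata. A hedged sketch of inspecting it (comment only):
#
#     bqclient = _get_bqclient()
#     job = _dry_run("SELECT 1", bqclient)
#     job.total_bytes_processed  # estimated bytes the query would scan
#     job.location               # location BigQuery resolved for the query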
def _set_default_session_location_if_possible(query):
_set_default_session_location_if_possible_deferred_query(lambda: query)
def _set_default_session_location_if_possible_deferred_query(create_query):
# Address circular imports in doctest due to bigframes/session/__init__.py
# containing a lot of logic and samples.
from bigframes.session._io import bigquery
# Set the location as per the query if this is the first query the user is
# running and:
# (1) Default session has not started yet, and
# (2) Location is not set yet, and
# (3) Use of regional endpoints is not set.
# If the query is a table name, the location is that of the table.
# If the query is SQL referencing a table, the location is the table's location.
# If the query is SQL with no table, the location is the BigQuery default location.
global _default_location_lock
with _default_location_lock:
if (
config.options.bigquery._session_started
or config.options.bigquery.location
or config.options.bigquery.use_regional_endpoints
):
return
query = create_query()
bqclient = _get_bqclient()
if bigquery.is_query(query):
# Intentionally run outside of the session so that we can detect the
# location before creating the session. Since it's a dry_run, labels
# aren't necessary.
job = _dry_run(query, bqclient)
config.options.bigquery.location = job.location
else:
table = bqclient.get_table(query)
config.options.bigquery.location = table.location
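# The inference above only runs when no location has been set and the default
# session has not started. A hedged usage sketch (comment only): setting the
# location up front via bigframes options skips the extra dry run / get_table
# call:
#
#     import bigframes.pandas as bpd
#
#     bpd.options.bigquery.location = "US"
#     df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")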