# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Options for BigQuery DataFrames."""
from __future__ import annotations
from typing import Literal, Optional, Sequence, Tuple
import warnings
import google.auth.credentials
import requests.adapters
import bigframes._importing
import bigframes.enums
import bigframes.exceptions as bfe
# Error raised when a user mutates a session-locked option after the session
# has started; '{attribute}' is filled with the option name by the setters.
SESSION_STARTED_MESSAGE = (
"Cannot change '{attribute}' once a session has started. "
"Call bigframes.pandas.close_session() first, if you are using the bigframes.pandas API."
)
# Warning emitted by _get_validated_location for an unrecognized location;
# '{possibility}' is the closest known location by edit distance.
UNKNOWN_LOCATION_MESSAGE = "The location '{location}' is set to an unknown value. Did you mean '{possibility}'?"
def _get_validated_location(value: Optional[str]) -> Optional[str]:
    """Validate a BigQuery location, normalizing letter case when possible.

    Args:
        value:
            User-supplied location string, or ``None`` for the default.

    Returns:
        ``value`` unchanged if it is ``None`` or already a known location;
        the case-normalized location if it matches a known region
        (lowercase) or multi-region (uppercase); otherwise ``value``
        unchanged, after emitting an ``UnknownLocationWarning`` that
        suggests the closest known location.
    """
    # Local imports to avoid import cycles at module load time. Note:
    # bigframes.constants is also only imported here — it is not imported at
    # module level, so accessing it below would otherwise rely on another
    # module having imported it first.
    import bigframes._tools.strings
    import bigframes.constants

    if value is None or value in bigframes.constants.ALL_BIGQUERY_LOCATIONS:
        return value

    location = str(value)
    location_lowercase = location.lower()
    if location_lowercase in bigframes.constants.BIGQUERY_REGIONS:
        # Regions (e.g. "us-central1") are canonically lowercase.
        return location_lowercase

    location_uppercase = location.upper()
    if location_uppercase in bigframes.constants.BIGQUERY_MULTIREGIONS:
        # Multi-regions (e.g. "US", "EU") are canonically uppercase.
        return location_uppercase

    # Unknown location: suggest the closest known one by edit distance.
    possibility = min(
        bigframes.constants.ALL_BIGQUERY_LOCATIONS,
        key=lambda item: bigframes._tools.strings.levenshtein_distance(location, item),
    )
    # There are many layers before we get to (possibly) the user's code:
    # -> bpd.options.bigquery.location = "us-central-1"
    # -> location.setter
    # -> _get_validated_location
    msg = bfe.format_message(
        UNKNOWN_LOCATION_MESSAGE.format(location=location, possibility=possibility)
    )
    warnings.warn(msg, stacklevel=3, category=bfe.UnknownLocationWarning)

    return value
def _validate_ordering_mode(value: str) -> bigframes.enums.OrderingMode:
    """Convert a user-supplied string to an OrderingMode enum member.

    Matching is case-insensitive. Raises ValueError for any string that is
    not one of the supported modes.
    """
    folded = value.casefold()
    for mode in (
        bigframes.enums.OrderingMode.STRICT,
        bigframes.enums.OrderingMode.PARTIAL,
    ):
        if folded == mode.value.casefold():
            return mode
    raise ValueError("Ordering mode must be one of 'strict' or 'partial'.")
class BigQueryOptions:
    """Encapsulates configuration for working with a session."""

    def __init__(
        self,
        credentials: Optional[google.auth.credentials.Credentials] = None,
        project: Optional[str] = None,
        location: Optional[str] = None,
        bq_connection: Optional[str] = None,
        use_regional_endpoints: bool = False,
        application_name: Optional[str] = None,
        kms_key_name: Optional[str] = None,
        skip_bq_connection_check: bool = False,
        *,
        allow_large_results: bool = False,
        ordering_mode: Literal["strict", "partial"] = "strict",
        client_endpoints_override: Optional[dict] = None,
        requests_transport_adapters: Sequence[
            Tuple[str, requests.adapters.BaseAdapter]
        ] = (),
        enable_polars_execution: bool = False,
    ):
        self._credentials = credentials
        self._project = project
        self._location = _get_validated_location(location)
        self._bq_connection = bq_connection
        self._use_regional_endpoints = use_regional_endpoints
        self._application_name = application_name
        self._kms_key_name = kms_key_name
        self._skip_bq_connection_check = skip_bq_connection_check
        self._allow_large_results = allow_large_results
        self._requests_transport_adapters = requests_transport_adapters
        # Once a session starts, the setters below refuse to change any value
        # (see SESSION_STARTED_MESSAGE).
        self._session_started = False
        # Determines the ordering strictness for the session.
        self._ordering_mode = _validate_ordering_mode(ordering_mode)

        if client_endpoints_override is None:
            client_endpoints_override = {}
        self._client_endpoints_override = client_endpoints_override

        if enable_polars_execution:
            # Fail fast at construction time if polars isn't installed.
            bigframes._importing.import_polars()
        self._enable_polars_execution = enable_polars_execution

    @property
    def application_name(self) -> Optional[str]:
        """The application name to amend to the user-agent sent to Google APIs.

        The recommended format is ``"application-name/major.minor.patch_version"``
        or ``"(gpn:PartnerName;)"`` for official Google partners.

        Returns:
            None or str:
                Application name as a string if exists; otherwise None.
        """
        return self._application_name

    @application_name.setter
    def application_name(self, value: Optional[str]):
        if self._session_started and self._application_name != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="application_name")
            )
        self._application_name = value

    @property
    def credentials(self) -> Optional[google.auth.credentials.Credentials]:
        """The OAuth2 credentials to use for this client.

        Returns:
            None or google.auth.credentials.Credentials:
                google.auth.credentials.Credentials if exists; otherwise None.
        """
        return self._credentials

    @credentials.setter
    def credentials(self, value: Optional[google.auth.credentials.Credentials]):
        # Identity (not equality) check: credentials objects generally don't
        # implement __eq__, so only the exact same object is allowed through
        # once the session has started.
        if self._session_started and self._credentials is not value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="credentials"))
        self._credentials = value

    @property
    def location(self) -> Optional[str]:
        """Default location for job, datasets, and tables.

        For more information, see https://cloud.google.com/bigquery/docs/locations BigQuery locations.

        Returns:
            None or str:
                Default location as a string; otherwise None.
        """
        return self._location

    @location.setter
    def location(self, value: Optional[str]):
        if self._session_started and self._location != _get_validated_location(value):
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="location"))
        self._location = _get_validated_location(value)

    @property
    def project(self) -> Optional[str]:
        """Google Cloud project ID to use for billing and as the default project.

        Returns:
            None or str:
                Google Cloud project ID as a string; otherwise None.
        """
        return self._project

    @project.setter
    def project(self, value: Optional[str]):
        if self._session_started and self._project != value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="project"))
        self._project = value

    @property
    def bq_connection(self) -> Optional[str]:
        """Name of the BigQuery connection to use in the form
        <PROJECT_NUMBER/PROJECT_ID>.<LOCATION>.<CONNECTION_ID>.

        You either need to create the connection in a location of your choice, or
        you need the Project Admin IAM role to enable the service to create the
        connection for you.

        If this option isn't provided, or the project or location aren't provided,
        then the session uses its default project/location/connection_id as the
        default connection.

        Returns:
            None or str:
                Name of the BigQuery connection as a string; otherwise None.
        """
        return self._bq_connection

    @bq_connection.setter
    def bq_connection(self, value: Optional[str]):
        if self._session_started and self._bq_connection != value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="bq_connection"))
        self._bq_connection = value

    @property
    def skip_bq_connection_check(self) -> bool:
        """Forcibly use the BigQuery connection.

        Setting this flag to True would avoid creating the BigQuery connection
        and checking or setting IAM permissions on it. So if the BigQuery
        connection (default or user-provided) does not exist, or it does not have
        necessary permissions set up to support BigQuery DataFrames operations,
        then a runtime error will be reported.

        Returns:
            bool:
                A boolean value, where True indicates a BigQuery connection is
                not created or the connection does not have necessary
                permissions set up; otherwise False.
        """
        return self._skip_bq_connection_check

    @skip_bq_connection_check.setter
    def skip_bq_connection_check(self, value: bool):
        if self._session_started and self._skip_bq_connection_check != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="skip_bq_connection_check")
            )
        self._skip_bq_connection_check = value

    @property
    def allow_large_results(self) -> bool:
        """
        DEPRECATED: Checks the legacy global setting for allowing large results.
        Use ``bpd.options.compute.allow_large_results`` instead.

        Warning: Accessing ``bpd.options.bigquery.allow_large_results`` is deprecated
        and this property will be removed in a future version. The configuration for
        handling large results has moved.

        Returns:
            bool: The value of the deprecated setting.
        """
        return self._allow_large_results

    @allow_large_results.setter
    def allow_large_results(self, value: bool):
        # Warn unconditionally (even for a no-op assignment) so users migrate
        # to the compute option.
        warnings.warn(
            "Setting `bpd.options.bigquery.allow_large_results` is deprecated, "
            "and will be removed in the future. "
            "Please use `bpd.options.compute.allow_large_results = <value>` instead. "
            "The `bpd.options.bigquery.allow_large_results` option is ignored if "
            "`bpd.options.compute.allow_large_results` is set.",
            FutureWarning,
            stacklevel=2,
        )
        if self._session_started and self._allow_large_results != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="allow_large_results")
            )
        self._allow_large_results = value

    @property
    def use_regional_endpoints(self) -> bool:
        """Flag to connect to regional API endpoints for BigQuery API and
        BigQuery Storage API.

        .. note::
            Use of regional endpoints is a feature in Preview and available only
            in regions "europe-west3", "europe-west8", "europe-west9",
            "me-central2", "us-central1", "us-central2", "us-east1", "us-east4",
            "us-east5", "us-east7", "us-south1", "us-west1", "us-west2", "us-west3"
            and "us-west4".

        Requires that ``location`` is set. For [supported regions](https://cloud.google.com/bigquery/docs/regional-endpoints),
        for example ``europe-west3``, you need to specify
        ``location='europe-west3'`` and ``use_regional_endpoints=True``, and
        then BigQuery DataFrames would connect to the BigQuery endpoint
        ``bigquery.europe-west3.rep.googleapis.com``. For not supported regions,
        for example ``asia-northeast1``, when you specify
        ``location='asia-northeast1'`` and ``use_regional_endpoints=True``,
        the global endpoint ``bigquery.googleapis.com`` would be used, which
        does not promise any guarantee on the request remaining within the
        location during transit.

        Returns:
            bool:
                A boolean value, where True indicates that regional endpoints
                would be used for BigQuery and BigQuery storage APIs; otherwise
                global endpoints would be used.
        """
        return self._use_regional_endpoints

    @use_regional_endpoints.setter
    def use_regional_endpoints(self, value: bool):
        if self._session_started and self._use_regional_endpoints != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="use_regional_endpoints")
            )
        if value:
            msg = bfe.format_message(
                "Use of regional endpoints is a feature in preview and "
                "available only in selected regions and projects. "
            )
            warnings.warn(msg, category=bfe.PreviewWarning, stacklevel=2)
        self._use_regional_endpoints = value

    @property
    def kms_key_name(self) -> Optional[str]:
        """
        Customer managed encryption key used to control encryption of the
        data-at-rest in BigQuery. This is of the format
        projects/PROJECT_ID/locations/LOCATION/keyRings/KEYRING/cryptoKeys/KEY.

        For more information, see https://cloud.google.com/bigquery/docs/customer-managed-encryption
        Customer-managed Cloud KMS keys

        Make sure the project used for Bigquery DataFrames has the
        Cloud KMS CryptoKey Encrypter/Decrypter IAM role in the key's project.
        For more information, see https://cloud.google.com/bigquery/docs/customer-managed-encryption#assign_role
        Assign the Encrypter/Decrypter.

        Returns:
            None or str:
                Name of the customer managed encryption key as a string; otherwise None.
        """
        return self._kms_key_name

    @kms_key_name.setter
    def kms_key_name(self, value: Optional[str]):
        if self._session_started and self._kms_key_name != value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="kms_key_name"))
        self._kms_key_name = value

    @property
    def ordering_mode(self) -> Literal["strict", "partial"]:
        """Controls whether total row order is always maintained for DataFrame/Series.

        Returns:
            Literal:
                A literal string value of either strict or partial ordering mode.
        """
        return self._ordering_mode.value

    @ordering_mode.setter
    def ordering_mode(self, value: Literal["strict", "partial"]) -> None:
        ordering_mode = _validate_ordering_mode(value)
        if self._session_started and self._ordering_mode != ordering_mode:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="ordering_mode"))
        self._ordering_mode = ordering_mode

    @property
    def client_endpoints_override(self) -> dict:
        """Option that sets the BQ client endpoints addresses directly as a dict. Possible keys are "bqclient", "bqconnectionclient", "bqstoragereadclient"."""
        return self._client_endpoints_override

    @client_endpoints_override.setter
    def client_endpoints_override(self, value: dict):
        msg = bfe.format_message(
            "This is an advanced configuration option for directly setting endpoints. "
            "Incorrect use may lead to unexpected behavior or system instability. "
            "Proceed only if you fully understand its implications."
        )
        # stacklevel=2 attributes the warning to the assigning code, matching
        # the other setters in this class.
        warnings.warn(msg, stacklevel=2)
        if self._session_started and self._client_endpoints_override != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="client_endpoints_override")
            )
        self._client_endpoints_override = value

    @property
    def requests_transport_adapters(
        self,
    ) -> Sequence[Tuple[str, requests.adapters.BaseAdapter]]:
        """Transport adapters for requests-based REST clients such as the
        google-cloud-bigquery package.

        For more details, see the explanation in `requests guide to transport
        adapters
        <https://requests.readthedocs.io/en/latest/user/advanced/#transport-adapters>`_.

        **Examples:**

        Increase the connection pool size using the requests `HTTPAdapter
        <https://requests.readthedocs.io/en/latest/api/#requests.adapters.HTTPAdapter>`_.

        >>> import bigframes.pandas as bpd
        >>> bpd.options.bigquery.requests_transport_adapters = (
        ...     ("http://", requests.adapters.HTTPAdapter(pool_maxsize=100)),
        ...     ("https://", requests.adapters.HTTPAdapter(pool_maxsize=100)),
        ... )  # doctest: +SKIP

        Returns:
            Sequence[Tuple[str, requests.adapters.BaseAdapter]]:
                Prefixes and corresponding transport adapters to `mount
                <https://requests.readthedocs.io/en/latest/api/#requests.Session.mount>`_
                in requests-based REST clients.
        """
        return self._requests_transport_adapters

    @requests_transport_adapters.setter
    def requests_transport_adapters(
        self, value: Sequence[Tuple[str, requests.adapters.BaseAdapter]]
    ) -> None:
        if self._session_started and self._requests_transport_adapters != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="requests_transport_adapters")
            )
        self._requests_transport_adapters = value

    @property
    def enable_polars_execution(self) -> bool:
        """If True, will use polars to execute some simple query plans locally."""
        return self._enable_polars_execution

    @enable_polars_execution.setter
    def enable_polars_execution(self, value: bool):
        if self._session_started and self._enable_polars_execution != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="enable_polars_execution")
            )
        if value is True:
            msg = bfe.format_message(
                "Polars execution is an experimental feature, and may not be stable. Must have polars installed."
            )
            warnings.warn(msg, category=bfe.PreviewWarning, stacklevel=2)
            # Fail fast if polars is not installed.
            bigframes._importing.import_polars()
        self._enable_polars_execution = value