BigFrames StreamingDataFrame
bigframes.streaming.StreamingDataFrame is a special DataFrame type that supports simple operations and can create streaming jobs that process real-time data and reverse ETL the output to Bigtable and Pub/Sub using BigQuery continuous queries.
In this notebook, we will:
1. Create a StreamingDataFrame from a BigQuery table
2. Apply operations such as select and filter, and preview the content
3. Create and manage streaming jobs to both Bigtable and Pub/Sub
import bigframes
# make sure bigframes version >= 1.12.0
bigframes.__version__
'1.31.0'
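The version requirement in the comment above can also be checked programmatically. The following is a minimal sketch using only the standard library; `version_tuple` and `meets_minimum` are hypothetical helper names, not part of bigframes, and pre-release suffixes such as `rc1` are simply ignored.

```python
import re


def version_tuple(version: str) -> tuple:
    """Return the leading numeric release components, e.g. '1.31.0' -> (1, 31, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.group().split("."))


def meets_minimum(installed: str, minimum: str = "1.12.0") -> bool:
    """Compare two version strings numerically, component by component."""
    a, b = version_tuple(installed), version_tuple(minimum)
    # Pad the shorter tuple with zeros so that '1.12' compares equal to '1.12.0'.
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


print(meets_minimum("1.31.0"))  # True: 31 >= 12 when compared as numbers
print(meets_minimum("1.9.0"))   # False: a plain string comparison would get this wrong
```

In the notebook, `meets_minimum(bigframes.__version__)` would replace the manual inspection of the printed version.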
import bigframes.pandas as bpd
import bigframes.streaming as bst
bpd.options.bigquery.project = "bigframes-load-testing"  # Change to your own project ID
job_id_prefix = "test_streaming_"
# Copy a table from the public dataset for streaming jobs. Any changes to the table can be reflected in the streaming destination.
df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
df.to_gbq("birds.penguins_bigtable_streaming", if_exists="replace")
Create, select, filter and preview
Create the StreamingDataFrame from a BigQuery table, select certain columns, filter rows, and preview the output.
sdf = bst.read_gbq_table("birds.penguins_bigtable_streaming")
/usr/local/google/home/chelsealin/src/bigframes1/bigframes/session/__init__.py:604: PreviewWarning: The bigframes.streaming module is a preview feature, and subject to change.
warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)
/usr/local/google/home/chelsealin/src/bigframes1/bigframes/core/blocks.py:141: NullIndexPreviewWarning: Creating object with Null Index. Null Index is a preview feature.
warnings.warn(msg, category=bfe.NullIndexPreviewWarning)
sdf = sdf[["species", "island", "body_mass_g"]]
sdf = sdf[sdf["body_mass_g"] < 4000]
# Bigtable needs a rowkey column
sdf = sdf.rename(columns={"island": "rowkey"})
print(type(sdf))
sdf
<class 'bigframes.streaming.dataframe.StreamingDataFrame'>
Query job 2894a764-5336-492f-98e1-c865fb161ef9 is DONE. 28.9 kB processed.
Query job f8fb08cb-ba11-4d73-8fff-c36081d98206 is DONE. 10.4 kB processed.
| | species | rowkey | body_mass_g |
|---|---|---|---|
| 0 | Adelie Penguin (Pygoscelis adeliae) | Torgersen | 3875.0 |
| 1 | Adelie Penguin (Pygoscelis adeliae) | Torgersen | 2900.0 |
| 2 | Adelie Penguin (Pygoscelis adeliae) | Biscoe | 3725.0 |
| 3 | Adelie Penguin (Pygoscelis adeliae) | Dream | 2975.0 |
| 4 | Adelie Penguin (Pygoscelis adeliae) | Torgersen | 3050.0 |
| 5 | Chinstrap penguin (Pygoscelis antarctica) | Dream | 2700.0 |
| 6 | Adelie Penguin (Pygoscelis adeliae) | Dream | 3900.0 |
| 7 | Adelie Penguin (Pygoscelis adeliae) | Biscoe | 3825.0 |
| 8 | Chinstrap penguin (Pygoscelis antarctica) | Dream | 3775.0 |
| 9 | Adelie Penguin (Pygoscelis adeliae) | Dream | 3350.0 |
| 10 | Adelie Penguin (Pygoscelis adeliae) | Biscoe | 3900.0 |
| 11 | Adelie Penguin (Pygoscelis adeliae) | Torgersen | 3650.0 |
| 12 | Adelie Penguin (Pygoscelis adeliae) | Biscoe | 3200.0 |
| 13 | Chinstrap penguin (Pygoscelis antarctica) | Dream | 3650.0 |
| 14 | Adelie Penguin (Pygoscelis adeliae) | Dream | 3700.0 |
| 15 | Chinstrap penguin (Pygoscelis antarctica) | Dream | 3800.0 |
| 16 | Chinstrap penguin (Pygoscelis antarctica) | Dream | 3950.0 |
| 17 | Chinstrap penguin (Pygoscelis antarctica) | Dream | 3350.0 |
| 18 | Adelie Penguin (Pygoscelis adeliae) | Dream | 3100.0 |
| 19 | Chinstrap penguin (Pygoscelis antarctica) | Dream | 3750.0 |
| 20 | Adelie Penguin (Pygoscelis adeliae) | Biscoe | 3550.0 |
| 21 | Chinstrap penguin (Pygoscelis antarctica) | Dream | 3400.0 |
| 22 | Adelie Penguin (Pygoscelis adeliae) | Torgersen | 3450.0 |
| 23 | Adelie Penguin (Pygoscelis adeliae) | Torgersen | 3600.0 |
| 24 | Chinstrap penguin (Pygoscelis antarctica) | Dream | 3650.0 |
25 rows × 3 columns
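To make the semantics of the three steps concrete, here is a plain-Python sketch of the same select, filter, and rename applied to in-memory rows. It does not use bigframes. The first two rows come from the preview above; the third row and all `sex` values are made-up placeholders, and `select_filter_rename` is a hypothetical helper name.

```python
def select_filter_rename(rows, columns, max_mass, renames):
    """Keep rows with body_mass_g < max_mass, project `columns`, then rename keys."""
    result = []
    for row in rows:
        mass = row["body_mass_g"]
        # A missing mass never satisfies the comparison, so such rows are dropped.
        if mass is None or not mass < max_mass:
            continue
        result.append({renames.get(col, col): row[col] for col in columns})
    return result


rows = [
    # Taken from the preview above ("island" is the column name before the rename).
    {"species": "Adelie Penguin (Pygoscelis adeliae)", "island": "Torgersen",
     "body_mass_g": 3875.0, "sex": "placeholder"},
    {"species": "Chinstrap penguin (Pygoscelis antarctica)", "island": "Dream",
     "body_mass_g": 2700.0, "sex": "placeholder"},
    # Made-up row that the filter should drop.
    {"species": "Gentoo penguin (Pygoscelis papua)", "island": "Biscoe",
     "body_mass_g": 5000.0, "sex": "placeholder"},
]

kept = select_filter_rename(
    rows,
    columns=["species", "island", "body_mass_g"],
    max_mass=4000,
    renames={"island": "rowkey"},
)
for row in kept:
    print(row["rowkey"], row["body_mass_g"])
# Keeps the two preview rows; the 5000.0 g row is dropped.
```

The StreamingDataFrame version differs in that the operations describe a query over the BigQuery table rather than a loop over local data, but the resulting rows follow the same rules.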
Bigtable
Create a Bigtable streaming job
job = sdf.to_bigtable(
    instance="streaming-testing-instance",  # Change to your own Bigtable instance name
    table="garrettwu-no-col-family",  # Change to your own Bigtable table name
    service_account_email="streaming-testing-admin@bigframes-load-testing.iam.gserviceaccount.com",  # Change to your own service account
    app_profile=None,
    truncate=True,
    overwrite=True,
    auto_create_column_families=True,
    bigtable_options={},
    job_id=None,
    job_id_prefix=job_id_prefix,
)
print(job.running())
print(job.error_result)
True
None
job.cancel()
True
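The check-then-cancel steps above can be wrapped in a small helper so that a streaming job is not left running by accident. This is a sketch that assumes only what the cells above show: the job object exposes `running()`, `error_result`, and `cancel()`. `stop_streaming_job` and `FakeJob` are hypothetical names; `FakeJob` exists only so the example runs without a Google Cloud project.

```python
from typing import Any


def stop_streaming_job(job: Any) -> dict:
    """Record the job's state and request cancellation if it is still running."""
    was_running = bool(job.running())
    error = job.error_result  # None when the job has not reported an error
    cancel_sent = bool(job.cancel()) if was_running else False
    return {"was_running": was_running, "error": error, "cancel_sent": cancel_sent}


class FakeJob:
    """Hypothetical stand-in that mimics the three members used in this notebook."""

    def __init__(self) -> None:
        self._running = True
        self.error_result = None

    def running(self) -> bool:
        return self._running

    def cancel(self) -> bool:
        self._running = False
        return True


print(stop_streaming_job(FakeJob()))
# {'was_running': True, 'error': None, 'cancel_sent': True}
```

With a real job, `stop_streaming_job(job)` would take the place of the separate `print` and `job.cancel()` cells.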
Pub/Sub
Create a Pub/Sub streaming job
# Pub/Sub requires a single column
sdf = sdf[["rowkey"]]
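The two column requirements mentioned in this notebook (a `rowkey` column for Bigtable, exactly one column for Pub/Sub) can be checked before a job is submitted. The sketch below encodes only those two rules; `check_sink_columns` is a hypothetical helper, not a bigframes function, and the real sinks may enforce further constraints.

```python
def check_sink_columns(columns, sink):
    """Validate column names against the sink requirements stated in this notebook."""
    columns = list(columns)
    if sink == "bigtable":
        if "rowkey" not in columns:
            raise ValueError("Bigtable needs a column named 'rowkey'")
    elif sink == "pubsub":
        if len(columns) != 1:
            raise ValueError(f"Pub/Sub requires a single column, got {len(columns)}")
    else:
        raise ValueError(f"unknown sink: {sink!r}")
    return columns


print(check_sink_columns(["species", "rowkey", "body_mass_g"], "bigtable"))
print(check_sink_columns(["rowkey"], "pubsub"))
```

Assuming the StreamingDataFrame exposes a pandas-style `columns` attribute, a call such as `check_sink_columns(sdf.columns, "pubsub")` could run just before `to_pubsub`.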
job = sdf.to_pubsub(
    topic="penguins",  # Change to your own Pub/Sub topic ID
    service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com",  # Change to your own service account
    job_id=None,
    job_id_prefix=job_id_prefix,
)
print(job.running())
print(job.error_result)
True
None
job.cancel()
True