BigFrames StreamingDataFrame

bigframes.streaming.StreamingDataFrame is a special DataFrame type that supports simple operations and can create streaming jobs. These jobs use BigQuery continuous queries to process real-time data and write the results (reverse ETL) to Bigtable and Pub/Sub.

In this notebook, we will:

  • Create a StreamingDataFrame from a BigQuery table

  • Select columns, filter rows, and preview the content

  • Create and manage streaming jobs that write to Bigtable and Pub/Sub

import bigframes
# make sure bigframes version >= 1.12.0
bigframes.__version__
'1.31.0'
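Checking the printed version by eye works, but the comparison can also be done in code. The sketch below uses a hypothetical helper, `meets_minimum_version`, which is not part of bigframes. It compares numeric components, because a plain string comparison would wrongly rank "1.9.0" above "1.12.0".

```python
def meets_minimum_version(installed, minimum):
    """Return True if `installed` >= `minimum`, comparing numeric parts.

    Hypothetical helper: assumes plain release versions such as "1.31.0";
    pre-release or local suffixes (e.g. "1.31.0rc1") are not handled.
    """
    def as_tuple(version):
        # "1.31.0" -> (1, 31, 0), so components compare as integers.
        return tuple(int(part) for part in version.split("."))

    return as_tuple(installed) >= as_tuple(minimum)


print(meets_minimum_version("1.31.0", "1.12.0"))  # True
print(meets_minimum_version("1.9.0", "1.12.0"))   # False
```

In the notebook you could pass `bigframes.__version__` as the first argument. For versions with pre-release suffixes, `packaging.version.Version` from the `packaging` library is the more robust choice.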
import bigframes.pandas as bpd
import bigframes.streaming as bst
bigframes.options.bigquery.project = "bigframes-load-testing"  # Change to your own project ID
job_id_prefix = "test_streaming_"
# Copy a public table into your own dataset to use as the streaming source.
# While a streaming job is running, new rows appended to this copy are sent to the streaming destination.
df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
df.to_gbq("birds.penguins_bigtable_streaming", if_exists="replace")
Query job c72abbec-0dda-49e8-8617-4d8178659ec2 is DONE. 0 Bytes processed.
Query job d55762e7-d9d4-4a79-84a4-4975e9292158 is DONE. 28.9 kB processed.
'birds.penguins_bigtable_streaming'
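The source table above is fully qualified as `project.dataset.table`, while the destination `birds.penguins_bigtable_streaming` omits the project, so BigQuery resolves it against the configured default project. The sketch below spells out that resolution rule; `qualify_table_id` is a hypothetical helper for illustration, not a bigframes function.

```python
def qualify_table_id(table_id, default_project):
    """Expand "dataset.table" to "project.dataset.table".

    Hypothetical helper. It splits on dots, so it does not handle legacy
    domain-scoped project IDs such as "example.com:project".
    """
    parts = table_id.split(".")
    if len(parts) == 3:
        return table_id  # already fully qualified
    if len(parts) == 2:
        # "dataset.table": prepend the default project.
        return f"{default_project}.{table_id}"
    raise ValueError(
        f"expected 'dataset.table' or 'project.dataset.table', got {table_id!r}"
    )


print(qualify_table_id("birds.penguins_bigtable_streaming", "bigframes-load-testing"))
# bigframes-load-testing.birds.penguins_bigtable_streaming
print(qualify_table_id("bigquery-public-data.ml_datasets.penguins", "bigframes-load-testing"))
# bigquery-public-data.ml_datasets.penguins
```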

Create, select, filter and preview

Create the StreamingDataFrame from a BigQuery table, select certain columns, filter rows, and preview the output.

sdf = bst.read_gbq_table("birds.penguins_bigtable_streaming")
PreviewWarning: The bigframes.streaming module is a preview feature, and subject to change.
NullIndexPreviewWarning: Creating object with Null Index. Null Index is a preview feature.
sdf = sdf[["species", "island", "body_mass_g"]]
sdf = sdf[sdf["body_mass_g"] < 4000]
# Bigtable requires a column named "rowkey" to use as the row key
sdf = sdf.rename(columns={"island": "rowkey"})
print(type(sdf))
sdf
NullIndexPreviewWarning: Creating object with Null Index. Null Index is a preview feature.
<class 'bigframes.streaming.dataframe.StreamingDataFrame'>
Query job 2894a764-5336-492f-98e1-c865fb161ef9 is DONE. 28.9 kB processed.
Query job f8fb08cb-ba11-4d73-8fff-c36081d98206 is DONE. 10.4 kB processed.
    species                                    rowkey     body_mass_g
0   Adelie Penguin (Pygoscelis adeliae)        Torgersen  3875.0
1   Adelie Penguin (Pygoscelis adeliae)        Torgersen  2900.0
2   Adelie Penguin (Pygoscelis adeliae)        Biscoe     3725.0
3   Adelie Penguin (Pygoscelis adeliae)        Dream      2975.0
4   Adelie Penguin (Pygoscelis adeliae)        Torgersen  3050.0
5   Chinstrap penguin (Pygoscelis antarctica)  Dream      2700.0
6   Adelie Penguin (Pygoscelis adeliae)        Dream      3900.0
7   Adelie Penguin (Pygoscelis adeliae)        Biscoe     3825.0
8   Chinstrap penguin (Pygoscelis antarctica)  Dream      3775.0
9   Adelie Penguin (Pygoscelis adeliae)        Dream      3350.0
10  Adelie Penguin (Pygoscelis adeliae)        Biscoe     3900.0
11  Adelie Penguin (Pygoscelis adeliae)        Torgersen  3650.0
12  Adelie Penguin (Pygoscelis adeliae)        Biscoe     3200.0
13  Chinstrap penguin (Pygoscelis antarctica)  Dream      3650.0
14  Adelie Penguin (Pygoscelis adeliae)        Dream      3700.0
15  Chinstrap penguin (Pygoscelis antarctica)  Dream      3800.0
16  Chinstrap penguin (Pygoscelis antarctica)  Dream      3950.0
17  Chinstrap penguin (Pygoscelis antarctica)  Dream      3350.0
18  Adelie Penguin (Pygoscelis adeliae)        Dream      3100.0
19  Chinstrap penguin (Pygoscelis antarctica)  Dream      3750.0
20  Adelie Penguin (Pygoscelis adeliae)        Biscoe     3550.0
21  Chinstrap penguin (Pygoscelis antarctica)  Dream      3400.0
22  Adelie Penguin (Pygoscelis adeliae)        Torgersen  3450.0
23  Adelie Penguin (Pygoscelis adeliae)        Torgersen  3600.0
24  Chinstrap penguin (Pygoscelis antarctica)  Dream      3650.0

25 rows × 3 columns

[165 rows x 3 columns in total]
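As a rough mental model, the select, filter, and rename steps above behave like the plain-Python function below applied to each incoming row. This is only an illustration: the StreamingDataFrame turns these steps into a BigQuery query rather than running Python. `select_filter_rename` is a hypothetical name, and the sample rows (including the `sex` field, the 4500.0 g Gentoo row, and the row with a missing mass) are made up for the example.

```python
def select_filter_rename(rows):
    """Keep rows with body_mass_g < 4000, select three columns, rename island."""
    out = []
    for row in rows:
        mass = row.get("body_mass_g")
        # Filter: drop missing (None) masses too, matching SQL, where
        # NULL < 4000 is not true and the row is excluded.
        if mass is None or mass >= 4000:
            continue
        # Select the three columns and rename "island" to "rowkey" in one step.
        out.append({"species": row["species"], "rowkey": row["island"], "body_mass_g": mass})
    return out


# Made-up input rows for illustration only.
rows = [
    {"species": "Adelie Penguin (Pygoscelis adeliae)", "island": "Torgersen", "sex": "MALE", "body_mass_g": 3875.0},
    {"species": "Gentoo penguin (Pygoscelis papua)", "island": "Biscoe", "sex": "FEMALE", "body_mass_g": 4500.0},
    {"species": "Chinstrap penguin (Pygoscelis antarctica)", "island": "Dream", "sex": "FEMALE", "body_mass_g": 2700.0},
    {"species": "Adelie Penguin (Pygoscelis adeliae)", "island": "Dream", "sex": None, "body_mass_g": None},
]

result = select_filter_rename(rows)
print([r["rowkey"] for r in result])  # ['Torgersen', 'Dream']
```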

Bigtable
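The earlier rename to `rowkey` exists because the Bigtable export uses that column as the row key. A small pre-flight check can catch a missing or duplicated column name before a job is launched. `bigtable_value_columns` below is a hypothetical plain-Python helper, not part of the bigframes API, and it only checks names, not types or values.

```python
def bigtable_value_columns(columns):
    """Validate column names for a Bigtable export; return the non-key columns."""
    cols = list(columns)
    if len(set(cols)) != len(cols):
        raise ValueError(f"duplicate column names: {cols}")
    if "rowkey" not in cols:
        raise ValueError(f"Bigtable export needs a 'rowkey' column, got {cols}")
    return [name for name in cols if name != "rowkey"]


# Column names taken from the preview above.
print(bigtable_value_columns(["species", "rowkey", "body_mass_g"]))
# ['species', 'body_mass_g']
```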

Create a Bigtable streaming job

job = sdf.to_bigtable(
    instance="streaming-testing-instance",  # Change to your own Bigtable instance name
    table="garrettwu-no-col-family",  # Change to your own Bigtable table name
    service_account_email="streaming-testing-admin@bigframes-load-testing.iam.gserviceaccount.com",  # Change to your own service account
    app_profile=None,
    truncate=True,
    overwrite=True,
    auto_create_column_families=True,
    bigtable_options={},
    job_id=None,
    job_id_prefix=job_id_prefix,
)
PreviewWarning: The bigframes.streaming module is a preview feature, and subject to change.
print(job.running())
print(job.error_result)
True
None
job.cancel()
True
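`running()` and `error_result` give a point-in-time view of the job. Below is a sketch of a small polling loop, written against any object with `running()`, `error_result`, and optionally `reload()`; the `google.cloud.bigquery.QueryJob` returned by `to_bigtable` has all three. `poll_streaming_job` and the `FakeJob` stub are hypothetical, included only so the sketch runs without a live job.

```python
import time


def poll_streaming_job(job, checks=3, interval_s=1.0):
    """Check a job up to `checks` times and return (status, error_result).

    status is "error" if error_result is set, "stopped" if the job stopped
    without an error, and "running" if it was running on every check.
    """
    for attempt in range(checks):
        # For a real QueryJob, reload() fetches the latest state from the
        # API; the stub below has no reload(), so the call is skipped.
        if hasattr(job, "reload"):
            job.reload()
        is_running = job.running()
        if job.error_result is not None:
            return "error", job.error_result
        if not is_running:
            return "stopped", None
        if attempt < checks - 1:
            time.sleep(interval_s)
    return "running", None


class FakeJob:
    """Hypothetical stand-in for a BigQuery job so the sketch runs offline."""

    def __init__(self, fail_on_poll=None):
        self._polls = 0
        self._fail_on_poll = fail_on_poll
        self._cancelled = False

    @property
    def error_result(self):
        if self._fail_on_poll is not None and self._polls >= self._fail_on_poll:
            return {"reason": "simulated", "message": "simulated failure"}
        return None

    def running(self):
        self._polls += 1
        return not self._cancelled and self.error_result is None

    def cancel(self):
        self._cancelled = True
        return True


fake_job = FakeJob()
print(poll_streaming_job(fake_job, interval_s=0))  # ('running', None)
fake_job.cancel()
print(poll_streaming_job(fake_job, interval_s=0))  # ('stopped', None)
print(poll_streaming_job(FakeJob(fail_on_poll=2), interval_s=0)[0])  # error
```

Returning a status instead of cancelling inside the loop leaves the decision to call `job.cancel()` with the caller, as the notebook does.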

Pub/Sub

Create a Pub/Sub streaming job

# Pub/Sub requires a single column
sdf = sdf[["rowkey"]]
NullIndexPreviewWarning: Creating object with Null Index. Null Index is a preview feature.
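Keeping only `rowkey` means each Pub/Sub message carries just the island name. If each message should carry several fields, one common approach is to serialize them into a single JSON string column, which is what BigQuery's `TO_JSON_STRING` function does in SQL. The plain-Python sketch below only shows the shape of that transformation; `to_single_message_column` and its default column name `message` are assumptions for illustration, not a bigframes API.

```python
import json


def to_single_message_column(row, fields, column="message"):
    """Pack selected fields of a row into one JSON string column.

    Hypothetical helper; the default column name "message" is an assumption.
    """
    payload = {name: row[name] for name in fields}
    # sort_keys makes the output deterministic regardless of dict order.
    return {column: json.dumps(payload, sort_keys=True)}


# First row of the preview above.
row = {"species": "Adelie Penguin (Pygoscelis adeliae)", "rowkey": "Torgersen", "body_mass_g": 3875.0}
print(to_single_message_column(row, ["rowkey", "body_mass_g"]))
# {'message': '{"body_mass_g": 3875.0, "rowkey": "Torgersen"}'}
```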
job = sdf.to_pubsub(
    topic="penguins",  # Change to your own Pub/Sub topic ID
    service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com",  # Change to your own service account
    job_id=None,
    job_id_prefix=job_id_prefix,
)
PreviewWarning: The bigframes.streaming module is a preview feature, and subject to change.
print(job.running())
print(job.error_result)
True
None
job.cancel()
True