{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# BigFrames StreamingDataFrame\n", "\n", "`bigframes.streaming.StreamingDataFrame` is a special DataFrame type that supports simple operations and can create streaming jobs that process real-time data and reverse-ETL the output to Bigtable and Pub/Sub using [BigQuery continuous queries](https://cloud.google.com/bigquery/docs/continuous-queries-introduction).\n", "\n", "In this notebook, we will:\n", "* Create a StreamingDataFrame from a BigQuery table\n", "* Perform simple operations such as select and filter, and preview the content\n", "* Create and manage streaming jobs that write to both Bigtable and Pub/Sub" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'1.31.0'" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import bigframes\n", "# make sure bigframes version >= 1.12.0\n", "bigframes.__version__" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import bigframes.pandas as bpd\n", "import bigframes.streaming as bst\n", "bigframes.options._bigquery_options.project = \"bigframes-load-testing\" # Change to your own project ID\n", "job_id_prefix = \"test_streaming_\"" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "Query job c72abbec-0dda-49e8-8617-4d8178659ec2 is DONE. 0 Bytes processed. Open Job" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "Query job d55762e7-d9d4-4a79-84a4-4975e9292158 is DONE. 28.9 kB processed. Open Job" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "'birds.penguins_bigtable_streaming'" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Copy a table from the public dataset for streaming jobs. 
Any changes to the table can be reflected in the streaming destination.\n", "df = bpd.read_gbq(\"bigquery-public-data.ml_datasets.penguins\")\n", "df.to_gbq(\"birds.penguins_bigtable_streaming\", if_exists=\"replace\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create, select, filter and preview\n", "Create the StreamingDataFrame from a BigQuery table, select certain columns, filter rows, and preview the output." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/local/google/home/chelsealin/src/bigframes1/bigframes/session/__init__.py:604: PreviewWarning: The bigframes.streaming module is a preview feature, and subject to change.\n", " warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)\n", "/usr/local/google/home/chelsealin/src/bigframes1/bigframes/core/blocks.py:141: NullIndexPreviewWarning: Creating object with Null Index. Null Index is a preview feature.\n", " warnings.warn(msg, category=bfe.NullIndexPreviewWarning)\n" ] } ], "source": [ "sdf = bst.read_gbq_table(\"birds.penguins_bigtable_streaming\")" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/local/google/home/chelsealin/src/bigframes1/bigframes/core/blocks.py:141: NullIndexPreviewWarning: Creating object with Null Index. Null Index is a preview feature.\n", " warnings.warn(msg, category=bfe.NullIndexPreviewWarning)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] }, { "data": { "text/html": [ "Query job 2894a764-5336-492f-98e1-c865fb161ef9 is DONE. 28.9 kB processed. Open Job" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "Query job f8fb08cb-ba11-4d73-8fff-c36081d98206 is DONE. 10.4 kB processed. Open Job" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
speciesrowkeybody_mass_g
0Adelie Penguin (Pygoscelis adeliae)Torgersen3875.0
1Adelie Penguin (Pygoscelis adeliae)Torgersen2900.0
2Adelie Penguin (Pygoscelis adeliae)Biscoe3725.0
3Adelie Penguin (Pygoscelis adeliae)Dream2975.0
4Adelie Penguin (Pygoscelis adeliae)Torgersen3050.0
5Chinstrap penguin (Pygoscelis antarctica)Dream2700.0
6Adelie Penguin (Pygoscelis adeliae)Dream3900.0
7Adelie Penguin (Pygoscelis adeliae)Biscoe3825.0
8Chinstrap penguin (Pygoscelis antarctica)Dream3775.0
9Adelie Penguin (Pygoscelis adeliae)Dream3350.0
10Adelie Penguin (Pygoscelis adeliae)Biscoe3900.0
11Adelie Penguin (Pygoscelis adeliae)Torgersen3650.0
12Adelie Penguin (Pygoscelis adeliae)Biscoe3200.0
13Chinstrap penguin (Pygoscelis antarctica)Dream3650.0
14Adelie Penguin (Pygoscelis adeliae)Dream3700.0
15Chinstrap penguin (Pygoscelis antarctica)Dream3800.0
16Chinstrap penguin (Pygoscelis antarctica)Dream3950.0
17Chinstrap penguin (Pygoscelis antarctica)Dream3350.0
18Adelie Penguin (Pygoscelis adeliae)Dream3100.0
19Chinstrap penguin (Pygoscelis antarctica)Dream3750.0
20Adelie Penguin (Pygoscelis adeliae)Biscoe3550.0
21Chinstrap penguin (Pygoscelis antarctica)Dream3400.0
22Adelie Penguin (Pygoscelis adeliae)Torgersen3450.0
23Adelie Penguin (Pygoscelis adeliae)Torgersen3600.0
24Chinstrap penguin (Pygoscelis antarctica)Dream3650.0
\n", "

25 rows × 3 columns

\n", "
[165 rows x 3 columns in total]" ], "text/plain": [ " species rowkey body_mass_g\n", " Adelie Penguin (Pygoscelis adeliae) Torgersen 3875.0\n", " Adelie Penguin (Pygoscelis adeliae) Torgersen 2900.0\n", " Adelie Penguin (Pygoscelis adeliae) Biscoe 3725.0\n", " Adelie Penguin (Pygoscelis adeliae) Dream 2975.0\n", " Adelie Penguin (Pygoscelis adeliae) Torgersen 3050.0\n", "Chinstrap penguin (Pygoscelis antarctica) Dream 2700.0\n", " Adelie Penguin (Pygoscelis adeliae) Dream 3900.0\n", " Adelie Penguin (Pygoscelis adeliae) Biscoe 3825.0\n", "Chinstrap penguin (Pygoscelis antarctica) Dream 3775.0\n", " Adelie Penguin (Pygoscelis adeliae) Dream 3350.0\n", " Adelie Penguin (Pygoscelis adeliae) Biscoe 3900.0\n", " Adelie Penguin (Pygoscelis adeliae) Torgersen 3650.0\n", " Adelie Penguin (Pygoscelis adeliae) Biscoe 3200.0\n", "Chinstrap penguin (Pygoscelis antarctica) Dream 3650.0\n", " Adelie Penguin (Pygoscelis adeliae) Dream 3700.0\n", "Chinstrap penguin (Pygoscelis antarctica) Dream 3800.0\n", "Chinstrap penguin (Pygoscelis antarctica) Dream 3950.0\n", "Chinstrap penguin (Pygoscelis antarctica) Dream 3350.0\n", " Adelie Penguin (Pygoscelis adeliae) Dream 3100.0\n", "Chinstrap penguin (Pygoscelis antarctica) Dream 3750.0\n", " Adelie Penguin (Pygoscelis adeliae) Biscoe 3550.0\n", "Chinstrap penguin (Pygoscelis antarctica) Dream 3400.0\n", " Adelie Penguin (Pygoscelis adeliae) Torgersen 3450.0\n", " Adelie Penguin (Pygoscelis adeliae) Torgersen 3600.0\n", "Chinstrap penguin (Pygoscelis antarctica) Dream 3650.0\n", "...\n", "\n", "[165 rows x 3 columns]" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sdf = sdf[[\"species\", \"island\", \"body_mass_g\"]]\n", "sdf = sdf[sdf[\"body_mass_g\"] < 4000]\n", "# BigTable needs a rowkey column\n", "sdf = sdf.rename(columns={\"island\": \"rowkey\"})\n", "print(type(sdf))\n", "sdf" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### BigTable\n", "Create BigTable streaming 
job" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/local/google/home/chelsealin/src/bigframes1/bigframes/streaming/dataframe.py:352: PreviewWarning: The bigframes.streaming module is a preview feature, and subject to change.\n", " warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)\n" ] } ], "source": [ "job = sdf.to_bigtable(instance=\"streaming-testing-instance\", # Change to your own Bigtable instance name\n", " table=\"garrettwu-no-col-family\", # Change to your own Bigtable table name\n", " service_account_email=\"streaming-testing-admin@bigframes-load-testing.iam.gserviceaccount.com\", # Change to your own service account\n", " app_profile=None,\n", " truncate=True,\n", " overwrite=True,\n", " auto_create_column_families=True,\n", " bigtable_options={},\n", " job_id=None,\n", " job_id_prefix=job_id_prefix,)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True\n", "None\n" ] } ], "source": [ "print(job.running())\n", "print(job.error_result)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "True" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "job.cancel()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Pub/Sub\n", "Create Pub/Sub streaming job" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/local/google/home/chelsealin/src/bigframes1/bigframes/core/blocks.py:141: NullIndexPreviewWarning: Creating object with Null Index. 
Null Index is a preview feature.\n", " warnings.warn(msg, category=bfe.NullIndexPreviewWarning)\n" ] } ], "source": [ "# Pub/Sub requires a single column\n", "sdf = sdf[[\"rowkey\"]]" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/local/google/home/chelsealin/src/bigframes1/bigframes/streaming/dataframe.py:464: PreviewWarning: The bigframes.streaming module is a preview feature, and subject to change.\n", " warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)\n" ] } ], "source": [ "job = sdf.to_pubsub(\n", " topic=\"penguins\", # Change to your own Pub/Sub topic ID\n", " service_account_email=\"streaming-testing@bigframes-load-testing.iam.gserviceaccount.com\", # Change to your own service account\n", " job_id=None,\n", " job_id_prefix=job_id_prefix,\n", " )" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True\n", "None\n" ] } ], "source": [ "print(job.running())\n", "print(job.error_result)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "True" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "job.cancel()" ] } ], "metadata": { "kernelspec": { "display_name": "venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.1" } }, "nbformat": 4, "nbformat_minor": 2 }