# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

BigFrames Multimodal DataFrame#

Colab logo Run in Colab GitHub logo View on GitHub BQ logo Open in BQ Studio

This notebook introduces the following BigFrames Multimodal features:

  1. Create Multimodal DataFrame

  2. Combine unstructured data with structured data

  3. Conduct image transformations

  4. Use LLM models to ask questions and generate embeddings on images

  5. PDF chunking function

  6. Transcribe audio

  7. Extract EXIF metadata from images

Setup#

Install the latest bigframes package if bigframes version < 2.4.0

# !pip install bigframes --upgrade
PROJECT = "bigframes-dev"  # replace with your project.
# Refer to https://cloud.google.com/bigquery/docs/multimodal-data-dataframes-tutorial#required_roles for your required permissions

LOCATION = "us"  # replace with your location.

# Dataset where the UDF will be created.
DATASET_ID = "bigframes_samples"  # replace with your dataset ID.

OUTPUT_BUCKET = "bigframes_blob_test"  # replace with your GCS bucket.
# The connection (or bigframes-default-connection of the project) must have read/write permission to the bucket. 
# Refer to https://cloud.google.com/bigquery/docs/multimodal-data-dataframes-tutorial#grant-permissions for setting up connection service account permissions.
# In this Notebook it uses bigframes-default-connection by default. You can also bring in your own connections in each method.

# Fully qualified connection ID used to authorize GCS access for ObjectRefs.
FULL_CONNECTION_ID = f"{PROJECT}.{LOCATION}.bigframes-default-connection"

import bigframes
import bigframes.bigquery as bbq
import bigframes.pandas as bpd

# Setup project
bigframes.options.bigquery.project = PROJECT
bigframes.options.bigquery.location = LOCATION

# Display options
bigframes.options.display.blob_display_width = 300
bigframes.options.display.progress_bar = None

def get_runtime_json_str(series, mode="R", with_metadata=False):
    """Return the blob runtime (containing a signed GCS access URL) as a
    JSON string, via the ToJSONString transformation.

    Args:
        series: bigframes.series.Series of ObjectRefs to operate on.
        mode: "R" for read access, "RW" for read/write access.
        with_metadata: If True, fetch and include blob metadata first.
    """
    # Optionally enrich the series with blob metadata before resolving URLs.
    if with_metadata:
        series = bbq.obj.fetch_metadata(series)

    # Resolve the access-URL runtime object, then serialize it to JSON text.
    runtime = bbq.obj.get_access_url(series, mode=mode)
    return bbq.to_json_string(runtime)

def get_metadata(series):
    """Return the GCS metadata JSON extracted from a blob series."""
    enriched = bbq.obj.fetch_metadata(series)
    details = enriched.struct.field("details")
    return bbq.json_query(details, "$.gcs_metadata")

def get_content_type(series):
    """Return the content type (MIME) of each blob as a string series."""
    metadata = get_metadata(series)
    return bbq.json_value(metadata, "$.content_type")

def get_size(series):
    """Return each blob's size in bytes as an Int64 series."""
    metadata = get_metadata(series)
    return bbq.json_value(metadata, "$.size").astype("Int64")

def get_updated(series):
    """Return each blob's last-updated time as a UTC datetime series."""
    # The "updated" value is cast to Int64 and interpreted with unit="us",
    # i.e. microseconds since the epoch.
    micros = bbq.json_value(get_metadata(series), "$.updated").astype("Int64")
    return bpd.to_datetime(micros, unit="us", utc=True)

from IPython.display import HTML, display

def render_images(df):
    """Helper to display a BigFrames DataFrame with rendered image previews.

    ObjectRef columns are auto-detected: blobs whose URI has an image
    extension are rendered as <img> tags, other blobs as links. Frames
    without ObjectRef columns are displayed unchanged.
    """
    import bigframes.pandas as bpd
    import bigframes
    from bigframes import dtypes
    import json

    # Accept a Series by promoting it to a one-column DataFrame.
    if isinstance(df, bpd.Series):
        df = df.to_frame()

    # 1. Auto-detect columns holding ObjectRefs
    object_cols = [
        col for col, dtype in zip(df.columns, df.dtypes)
        if dtype == dtypes.OBJ_REF_DTYPE
    ]

    if not object_cols:
        display(df)
        return

    limit = bigframes.options.display.max_rows or 10
    view_df = df.head(limit)

    # 2. Bulk-fetch access runtime URLs ONLY (disable with_metadata to bypass potential
    # race conditions on new files where BigQuery may error before async writes finalize)
    runtime_cols = {
        col: get_runtime_json_str(view_df[col], mode="R", with_metadata=False)
        for col in object_cols
    }

    pandas_json_df = bpd.DataFrame(runtime_cols).to_pandas()
    final_pd = view_df.to_pandas()

    width = bigframes.options.display.blob_display_width or 300
    IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".gif", ".webp")

    def format_cell_html(raw_json):
        """Render one runtime-JSON cell as an <img>, a link, or an error span."""
        if not raw_json:
            return ""
        try:
            obj_rt = json.loads(raw_json)

            if "access_urls" not in obj_rt:
                err = obj_rt.get("errors", [{"message": "URL Generation Failed"}])[0].get("message")
                return f'<span style="color:red;">Error: {err}</span>'

            uri = obj_rt.get("objectref", {}).get("uri", "")
            url = obj_rt["access_urls"]["read_url"]

            # Safely infer type from extension to guarantee immediate display availability
            if uri and str(uri).lower().endswith(IMAGE_EXTENSIONS):
                return f'<img src="{url}" width="{width}">'

            return f'<a href="{url}" target="_blank">{uri if uri else "view"}</a>'
        # Narrowed from a bare `except:`, which would also swallow
        # KeyboardInterrupt/SystemExit and hide unrelated bugs.
        except (ValueError, KeyError, IndexError, TypeError):
            return "Format Error"

    for col in object_cols:
        final_pd[col] = pandas_json_df[col].map(format_cell_html)

    display(HTML(final_pd.to_html(escape=False)))

To create a Multimodal DataFrame, you can use bigframes.bigquery.obj.make_ref on a series of URIs. You can get the URIs from a BigQuery table or by listing them from Cloud Storage.

In this example, we use gcsfs to list the files from Cloud Storage, and then use read_gbq to load them into a BigQuery DataFrame before creating the object reference.

import gcsfs
import bigframes.bigquery as bbq

# List files using gcsfs (public bucket, so anonymous access works)
fs = gcsfs.GCSFileSystem(anon=True)
uris = fs.glob("gs://cloud-samples-data/bigquery/tutorials/cymbal-pets/images/*")

# Ensure URIs have gs:// prefix (glob results may be bucket-relative paths)
uris = [u if u.startswith("gs://") else f"gs://{u}" for u in uris]

# Read the URIs into a BigQuery DataFrame using UNNEST
# We take the first 5 for this example
df_image = bpd.read_gbq(f"SELECT uri FROM UNNEST({uris[:5]}) as uri")

# Create the object reference column
df_image['image'] = bbq.obj.make_ref(df_image['uri'], authorizer=FULL_CONNECTION_ID)
df_image = df_image[['image']]
# Take only the 5 images to deal with. Preview the content of the Multimodal DataFrame
df_image = df_image.head(5)
render_images(df_image)
/usr/local/google/home/shuowei/src/google-cloud-python/google-cloud-python/packages/bigframes/bigframes/dtypes.py:1044: JSONDtypeWarning: JSON columns will be represented as pandas.ArrowDtype(pyarrow.json_())
instead of using `db_dtypes` in the future when available in pandas
(https://github.com/pandas-dev/pandas/issues/60958) and pyarrow.
  warnings.warn(msg, bigframes.exceptions.JSONDtypeWarning)
image
0
1
2
3
4

2. Combine unstructured data with structured data#

Now you can add more information to the table to describe the files, such as author info supplied as input or other metadata taken from the GCS object itself.

# Combine unstructured data with structured data
df_image = df_image.head(5)
# Attach a plain structured column (one author label per row).
df_image["author"] = ["alice", "bob", "bob", "alice", "bob"]  # type: ignore
# Derive structured columns from the GCS object metadata itself.
df_image["content_type"] = get_content_type(df_image["image"])
df_image["size"] = get_size(df_image["image"])
df_image["updated"] = get_updated(df_image["image"])
render_images(df_image)
/usr/local/google/home/shuowei/src/google-cloud-python/google-cloud-python/packages/bigframes/bigframes/dtypes.py:1044: JSONDtypeWarning: JSON columns will be represented as pandas.ArrowDtype(pyarrow.json_())
instead of using `db_dtypes` in the future when available in pandas
(https://github.com/pandas-dev/pandas/issues/60958) and pyarrow.
  warnings.warn(msg, bigframes.exceptions.JSONDtypeWarning)
image author content_type size updated
0 alice image/png 715766 2025-03-20 17:44:38+00:00
1 bob image/png 1167406 2025-03-20 17:44:38+00:00
2 bob image/png 1150892 2025-03-20 17:44:39+00:00
3 alice image/png 1736533 2025-03-20 17:44:39+00:00
4 bob image/png 439740 2025-03-20 17:44:39+00:00

3. Conduct image transformations#

This section demonstrates how to perform image transformations like blur, resize, and normalize using custom BigQuery Python UDFs and the opencv-python library.

# Construct the canonical connection ID
FULL_CONNECTION_ID = f"{PROJECT}.{LOCATION}.bigframes-default-connection"

@bpd.udf(
    input_types=[str, str, int, int],
    output_type=str,
    dataset=DATASET_ID,
    name="image_blur_v2",
    bigquery_connection=FULL_CONNECTION_ID,
    packages=["opencv-python-headless", "numpy", "requests"],
)
def image_blur(src_rt: str, dst_rt: str, kx: int, ky: int) -> str:
    """Blur an image blob with OpenCV.

    src_rt / dst_rt are ObjectRef runtime JSON strings containing signed
    URLs; (kx, ky) is the blur kernel size. When dst_rt is provided the
    blurred JPEG is uploaded to GCS and the destination URI is returned;
    otherwise the JPEG bytes are returned inline as base64 text.
    """
    import json
    import cv2 as cv
    import numpy as np
    import requests
    import base64

    src_obj = json.loads(src_rt)
    if "access_urls" not in src_obj:
        raise ValueError(f"Missing 'access_urls' in source object. Response: {src_obj}")

    # Download the source image through its signed read URL.
    download = requests.get(src_obj["access_urls"]["read_url"], timeout=30)
    download.raise_for_status()

    decoded = cv.imdecode(np.frombuffer(download.content, np.uint8), cv.IMREAD_UNCHANGED)
    if decoded is None:
        raise ValueError("cv.imdecode failed")

    blurred = cv.blur(decoded, ksize=(int(kx), int(ky)))

    ok, jpeg = cv.imencode(".jpeg", blurred)
    if not ok:
        raise ValueError("cv.imencode failed")
    jpeg_bytes = jpeg.tobytes()

    # Handle the two output modes.
    if not dst_rt:
        # BigQuery bytes output mode: return the image inline as base64 text.
        return base64.b64encode(jpeg_bytes).decode()

    # GCS/Series output mode: upload via the destination's signed write URL.
    dst_obj = json.loads(dst_rt)
    if "access_urls" not in dst_obj:
        raise ValueError(f"Missing 'access_urls' in destination object. Verify authorizer permissions. Response: {dst_obj}")

    upload = requests.put(
        dst_obj["access_urls"]["write_url"],
        data=jpeg_bytes,
        headers={"Content-Type": "image/jpeg"},
        timeout=30,
    )
    upload.raise_for_status()

    return dst_obj["objectref"]["uri"]

def apply_transformation(series, dst_folder, udf, *args, verbose=False):
    """Apply a blob-transforming UDF row-wise and reference the results.

    Args:
        series: bigframes Series of ObjectRefs (the source blobs).
        dst_folder: GCS folder ("gs://...") that receives transformed files.
        udf: row-wise UDF accepting (src_rt, dst_rt, *args) runtime JSON strings.
        *args: extra positional arguments forwarded to the UDF.
        verbose: if True, return the UDF's raw output instead of ObjectRefs.
    """
    import os
    # Ensure the folder path ends with a separator so filenames append cleanly.
    dst_folder = os.path.join(dst_folder, "")
    # Fetch metadata to get the URI
    metadata = bbq.obj.fetch_metadata(series)
    current_uri = metadata.struct.field("uri")
    # Keep only the basename of each source URI and graft it onto dst_folder.
    dst_uri = current_uri.str.replace(r"^.*\/(.*)$", rf"{dst_folder}\1", regex=True)
    
    # To avoid synchronous 404 validation checks on files that don't exist yet, 
    # bypass the validator by explicitly constructing an objectref JSON.
    dst_blob_df = bpd.DataFrame({"uri": dst_uri})
    dst_blob_df["authorizer"] = FULL_CONNECTION_ID
    dst_blob = bbq.obj.make_ref(bbq.to_json(bbq.struct(dst_blob_df)))

    # Each row supplies a source runtime (read) and destination runtime (read/write).
    df_transform = bpd.DataFrame({
        "src_rt": get_runtime_json_str(series, mode="R"),
        "dst_rt": get_runtime_json_str(dst_blob, mode="RW"),
    })
    res = df_transform[["src_rt", "dst_rt"]].apply(
        udf, axis=1, args=args
    )
    
    if verbose:
        return res
    
    # Final return MUST also use JSON bypass to eliminate temporary 404 validation 
    # errors from embedded ObjectRefs during fused query execution pipelines.
    res_df = bpd.DataFrame({"uri": res})
    res_df["authorizer"] = FULL_CONNECTION_ID
    return bbq.obj.make_ref(bbq.to_json(bbq.struct(res_df)))

# Apply transformations
# Blur each image with a 20x20 kernel; results land in the output bucket
# and come back as a new ObjectRef column.
df_image["blurred"] = apply_transformation(
    df_image["image"], f"gs://{OUTPUT_BUCKET}/image_blur_transformed/",
    image_blur, 20, 20
)
render_images(df_image[["image", "blurred"]])
/usr/local/google/home/shuowei/src/google-cloud-python/google-cloud-python/packages/bigframes/bigframes/pandas/__init__.py:211: PreviewWarning: udf is in preview.
  return global_session.with_default_session(
/usr/local/google/home/shuowei/src/google-cloud-python/google-cloud-python/packages/bigframes/bigframes/dataframe.py:4695: FunctionAxisOnePreviewWarning: DataFrame.apply with parameter axis=1 scenario is in preview.
  warnings.warn(msg, category=bfe.FunctionAxisOnePreviewWarning)
/usr/local/google/home/shuowei/src/google-cloud-python/google-cloud-python/packages/bigframes/bigframes/dtypes.py:1044: JSONDtypeWarning: JSON columns will be represented as pandas.ArrowDtype(pyarrow.json_())
instead of using `db_dtypes` in the future when available in pandas
(https://github.com/pandas-dev/pandas/issues/60958) and pyarrow.
  warnings.warn(msg, bigframes.exceptions.JSONDtypeWarning)
image blurred
0
1
2
3
4

4. Use LLM models to ask questions and generate embeddings on images#

from bigframes.ml import llm
gemini = llm.GeminiTextGenerator()
# Ask the same question on the images
answer = gemini.predict(df_image, prompt=["what item is it?", "what color is the picture?"])
render_images(answer[["ml_generate_text_llm_result", "image"]])
# Ask different questions
# One question per row; each is paired with the image in the same row below.
df_image["question"] = [
    "what item is it?",
    "what color is the picture?",
    "what is the product name?",
    "is it for pets?",
    "what is the weight of the product?",
]
answer_alt = gemini.predict(df_image, prompt=[df_image["question"], df_image["image"]])
render_images(answer_alt[["ml_generate_text_llm_result", "image"]])
# Generate embeddings.
embed_model = llm.MultimodalEmbeddingGenerator()
embeddings = embed_model.predict(df_image["image"])
embeddings

5. PDF extraction and chunking function#

This section demonstrates how to extract text and chunk text from PDF files using custom BigQuery Python UDFs and the pypdf library.

# Construct the canonical connection ID
FULL_CONNECTION_ID = f"{PROJECT}.{LOCATION}.bigframes-default-connection"

@bpd.udf(
    input_types=[str],
    output_type=str,
    dataset=DATASET_ID,
    name="pdf_extract",
    bigquery_connection=FULL_CONNECTION_ID,
    packages=["pypdf", "requests", "cryptography"],
)
def pdf_extract(src_obj_ref_rt: str) -> str:
    """Download a PDF blob via its signed read URL and return all page text."""
    import io
    import json
    from pypdf import PdfReader
    import requests

    runtime = json.loads(src_obj_ref_rt)
    read_url = runtime["access_urls"]["read_url"]

    response = requests.get(read_url, timeout=30, stream=True)
    response.raise_for_status()

    # strict=False tolerates minor spec violations in real-world PDFs.
    reader = PdfReader(io.BytesIO(response.content), strict=False)
    # Concatenate the text of every page, skipping pages with no text.
    pages_text = [page.extract_text() or "" for page in reader.pages]
    return "".join(pages_text)

@bpd.udf(
    input_types=[str, int, int],
    output_type=list[str],
    dataset=DATASET_ID,
    name="pdf_chunk",
    bigquery_connection=FULL_CONNECTION_ID,
    packages=["pypdf", "requests", "cryptography"],
)
def pdf_chunk(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> list[str]:
    """Split a PDF blob's text into overlapping chunks.

    Args:
        src_obj_ref_rt: ObjectRef runtime JSON string with a signed read URL.
        chunk_size: target maximum chunk length, in characters.
        overlap_size: number of trailing characters of each chunk repeated at
            the start of the next chunk. Must be smaller than chunk_size so
            the loop makes progress.

    Returns:
        The text chunks in document order.
    """
    import io
    import json
    from pypdf import PdfReader
    import requests
    src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
    src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
    response = requests.get(src_url, timeout=30, stream=True)
    response.raise_for_status()
    pdf_bytes = response.content
    pdf_file = io.BytesIO(pdf_bytes)
    reader = PdfReader(pdf_file, strict=False)
    all_text_chunks = []
    curr_chunk = ""
    for page in reader.pages:
        page_text = page.extract_text()
        if page_text:
            curr_chunk += page_text
            while len(curr_chunk) >= chunk_size:
                # Prefer breaking on a space so words stay intact.
                split_idx = curr_chunk.rfind(" ", 0, chunk_size)
                if split_idx == -1:
                    # No space found: hard-split, and keep the boundary
                    # character (the old code dropped it).
                    split_idx = chunk_size
                    resume_idx = chunk_size
                else:
                    # Split at the space and skip it in the continuation.
                    resume_idx = split_idx + 1
                actual_chunk = curr_chunk[:split_idx]
                all_text_chunks.append(actual_chunk)
                # BUG FIX: take the overlap from the END of the emitted chunk
                # so consecutive chunks actually share overlap_size characters.
                # The previous code sliced from AFTER the split point, which
                # merely reconstructed the remainder and produced zero overlap.
                overlap = actual_chunk[-overlap_size:] if overlap_size > 0 else ""
                curr_chunk = overlap + curr_chunk[resume_idx:]
    if curr_chunk:
        all_text_chunks.append(curr_chunk)
    return all_text_chunks
import gcsfs
import bigframes.bigquery as bbq

# List files using gcsfs
fs = gcsfs.GCSFileSystem(anon=True)
uris = fs.glob("gs://cloud-samples-data/bigquery/tutorials/cymbal-pets/documents/*")

# Ensure URIs have gs:// prefix
uris = [u if u.startswith("gs://") else f"gs://{u}" for u in uris]

# Read the URIs into a BigQuery DataFrame
df_pdf = bpd.read_gbq(f"SELECT uri FROM UNNEST({uris[:5]}) as uri")

# Create the object reference column
df_pdf['pdf'] = bbq.obj.make_ref(df_pdf['uri'], authorizer=FULL_CONNECTION_ID)
df_pdf = df_pdf[['pdf']]

# Generate a JSON string containing the runtime information (including signed read URLs)
access_urls = get_runtime_json_str(df_pdf["pdf"], mode="R")

# Apply PDF extraction
df_pdf["extracted_text"] = access_urls.apply(pdf_extract)

# Apply PDF chunking: 2000-character chunks with 200 characters of overlap
df_pdf["chunked"] = access_urls.apply(pdf_chunk, args=(2000, 200))

df_pdf[["extracted_text", "chunked"]]
# Explode the chunks to see each chunk as a separate row
chunked = df_pdf["chunked"].explode()
chunked

6. Audio transcribe#

import gcsfs
import bigframes.bigquery as bbq

audio_gcs_path = "gs://bigframes_blob_test/audio/*"

# List files using gcsfs (default credentials; this bucket may not be public)
fs = gcsfs.GCSFileSystem()
uris = fs.glob(audio_gcs_path)

# Ensure URIs have gs:// prefix
uris = [u if u.startswith("gs://") else f"gs://{u}" for u in uris]

# Read the URIs into a BigQuery DataFrame
# If the bucket is empty or doesn't exist, this will result in an empty DataFrame
if not uris:
    # Fallback to a dummy list or just let it be empty
    uris = ["gs://bigframes_blob_test/audio/dummy.mp3"]

df = bpd.read_gbq(f"SELECT uri FROM UNNEST({uris[:5]}) as uri")

# Create the object reference column
df['audio'] = bbq.obj.make_ref(df['uri'], authorizer=FULL_CONNECTION_ID)
df = df[['audio']]
# The audio_transcribe function is a convenience wrapper around bigframes.bigquery.ai.generate.
# Here's how to perform the same operation directly:

audio_series = df["audio"]
# Prompt instructing the model to emit only the verbatim transcription.
prompt_text = (
    "**Task:** Transcribe the provided audio. **Instructions:** - Your response "
    "must contain only the verbatim transcription of the audio. - Do not include "
    "any introductory text, summaries, or conversational filler in your response. "
    "The output should begin directly with the first word of the audio."
)

# Convert the audio series to the runtime representation required by the model.
# This involves fetching metadata and getting a signed access URL.
audio_metadata = bbq.obj.fetch_metadata(audio_series)
audio_runtime = bbq.obj.get_access_url(audio_metadata, mode="R")

# temperature 0.0 keeps the transcription as deterministic as possible.
transcribed_results = bbq.ai.generate(
    prompt=(prompt_text, audio_runtime),
    endpoint="gemini-2.5-flash",
    model_params={"generationConfig": {"temperature": 0.0}},
)

# The model returns a struct; "result" holds the transcription text.
transcribed_series = transcribed_results.struct.field("result").rename("transcribed_content")
transcribed_series
# To get verbose results (including status), we can extract both fields from the result struct.
transcribed_content_series = transcribed_results.struct.field("result")
transcribed_status_series = transcribed_results.struct.field("status")

transcribed_series_verbose = bpd.DataFrame(
    {
        "status": transcribed_status_series,
        "content": transcribed_content_series,
    }
)
# Package as a struct for consistent display
transcribed_series_verbose = bbq.struct(transcribed_series_verbose).rename("transcription_results")
transcribed_series_verbose

7. Extract EXIF metadata from images#

This section demonstrates how to extract EXIF metadata from images using a custom BigQuery Python UDF and the Pillow library.

# Construct the canonical connection ID
FULL_CONNECTION_ID = f"{PROJECT}.{LOCATION}.bigframes-default-connection"

@bpd.udf(
    input_types=[str],
    output_type=str,
    dataset=DATASET_ID,
    name="extract_exif",
    bigquery_connection=FULL_CONNECTION_ID,
    packages=["pillow", "requests"],
    max_batching_rows=8192,
    container_cpu=0.33,
    container_memory="512Mi"
)
def extract_exif(src_obj_ref_rt: str) -> str:
    """Download an image blob via its signed URL and return its EXIF tags
    as a JSON object string (tag name -> value)."""
    import io
    import json
    from PIL import ExifTags, Image
    import requests
    src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
    src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
    response = requests.get(src_url, timeout=30)
    # Fail fast on HTTP errors instead of handing error-page bytes to PIL
    # (consistent with the other UDFs in this notebook).
    response.raise_for_status()
    bts = response.content
    image = Image.open(io.BytesIO(bts))
    exif_data = image.getexif()
    exif_dict = {}
    if exif_data:
        for tag, value in exif_data.items():
            # Map numeric tag IDs to readable names where known.
            tag_name = ExifTags.TAGS.get(tag, tag)
            exif_dict[tag_name] = value
    # default=str: EXIF values are often bytes or rational types that json
    # cannot serialize natively; stringify them instead of raising TypeError.
    return json.dumps(exif_dict, default=str)
import gcsfs
import bigframes.bigquery as bbq

# Create a Multimodal DataFrame from the sample image URIs
fs = gcsfs.GCSFileSystem()
uris = fs.glob("gs://bigframes_blob_test/images_exif/*")

# Ensure URIs have gs:// prefix
uris = [u if u.startswith("gs://") else f"gs://{u}" for u in uris]

# Fall back to a placeholder URI if the bucket listing came back empty.
if not uris:
    uris = ["gs://bigframes_blob_test/images_exif/dummy.jpg"]

exif_image_df = bpd.read_gbq(f"SELECT uri FROM UNNEST({uris[:5]}) as uri")
exif_image_df['blob_col'] = bbq.obj.make_ref(exif_image_df['uri'], authorizer=FULL_CONNECTION_ID)
exif_image_df = exif_image_df[['blob_col']]

# Generate a JSON string containing the runtime information (including signed read URLs)
# This allows the UDF to download the images from Google Cloud Storage
access_urls = get_runtime_json_str(exif_image_df["blob_col"], mode="R")

# Apply the BigQuery Python UDF to the runtime JSON strings
# We cast to string to ensure the input matches the UDF's signature
exif_json = access_urls.astype(str).apply(extract_exif)

# Parse the resulting JSON strings back into a structured JSON type for easier access
exif_data = bbq.parse_json(exif_json)

exif_data