
SQL Multimodal Pipeline

This guide shows a SQL-first pattern for multimodal data preparation. SQL owns metadata filtering and validation; Python UDFs handle the media-specific step; the result returns to SQL for inspection and output.

Use this style when most of the pipeline should remain readable to SQL users, but one stage needs Python libraries for decoding, feature extraction, or model inference.

Pipeline shape

The pattern has five stages:

  1. Select candidate rows with SQL.
  2. Load bytes or keep stable media references.
  3. Run a Python UDF for decoding or inference.
  4. Return to SQL for validation and summaries.
  5. Write a curated tabular output.

Vane Data does not add image, audio, or video syntax to SQL. Multimodal work is represented as table columns and explicit Python UDF stages.
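Any SQL engine can show what "media as columns" means in practice. The sketch below uses Python's standard sqlite3 module as a stand-in engine (an assumption for illustration, not Vane's engine) and made-up byte payloads. SQL can filter on the bytes or measure them, but it never interprets them as images. Decoding stays in a Python stage.

```python
import sqlite3

# Stand-in SQL engine for illustration only; the rows and payloads are invented.
con = sqlite3.connect(":memory:")
con.execute(
    "create table media (id integer, path text, media_type text, image_bytes blob)"
)
con.executemany(
    "insert into media values (?, ?, ?, ?)",
    [
        (1, "a.png", "image/png", b"\x89PNG fake"),
        (2, "b.jpg", "image/jpeg", b"\xff\xd8 fake"),
        (3, "c.png", "image/png", None),
    ],
)

# SQL treats the media as an opaque BLOB column: it can measure or filter it,
# but interpreting the bytes would be a Python UDF step.
rows = con.execute(
    "select id, media_type, length(image_bytes) as n_bytes from media order by id"
).fetchall()
print(rows)  # [(1, 'image/png', 9), (2, 'image/jpeg', 7), (3, 'image/png', None)]
```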

1. Start with metadata in SQL

Keep the first step relational. Filter rows, normalize metadata, and reduce the input before Python sees it.

example.py
import vane


con = vane.connect()


items = con.sql("""
    select
        id,
        path,
        media_type
    from read_parquet('data/media_index/*.parquet')
    where media_type in ('image/png', 'image/jpeg')
      and path is not null
""")

2. Add bytes or references

If media bytes are already in Parquet, keep them in the relation and skip this step. If the table stores paths, use a UDF to load bytes and return the columns needed by later stages.

example.py
from pathlib import Path

import pyarrow as pa


def load_image_bytes(batch: pa.Table) -> pa.Table:
    # Columns that later stages still need are read so they can be returned.
    ids = batch.column("id").to_pylist()
    paths = batch.column("path").to_pylist()
    media_types = batch.column("media_type").to_pylist()

    # Each file is read in full; a missing or unreadable path raises an OSError.
    return pa.table({
        "id": ids,
        "path": paths,
        "media_type": media_types,
        "image_bytes": [Path(path).read_bytes() for path in paths],
    })


with_bytes = items.map_batches(
    load_image_bytes,
    schema={
        "id": "BIGINT",
        "path": "VARCHAR",
        "media_type": "VARCHAR",
        "image_bytes": "BLOB",
    },
    batch_size=32,
    execution_backend="subprocess_task",
)

map_batches returns only the columns the UDF produces. Input columns are not passed through automatically. If downstream stages still need input columns such as id, path, and media_type, return them from the UDF and declare them in schema.
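Pass-through columns are easy to drop by accident as a UDF grows. One way to make them explicit is a helper that copies the columns you name and appends the computed ones, checking that row counts match. In this sketch, batches are plain dicts of lists rather than pyarrow.Table, and with_passthrough is a hypothetical helper, not part of Vane.

```python
import tempfile
from pathlib import Path


def with_passthrough(
    batch: dict[str, list], keep: list[str], computed: dict[str, list]
) -> dict[str, list]:
    """Return the `keep` input columns followed by the computed columns.

    `batch` is a dict of equal-length lists standing in for a pyarrow.Table.
    """
    n_rows = len(next(iter(batch.values())))
    for name, values in computed.items():
        if len(values) != n_rows:
            raise ValueError(f"{name!r} has {len(values)} rows, expected {n_rows}")
    out = {name: batch[name] for name in keep}  # KeyError if a kept column is absent
    out.update(computed)
    return out


with tempfile.TemporaryDirectory() as tmp:
    first, second = Path(tmp, "a.png"), Path(tmp, "b.jpg")
    first.write_bytes(b"abc")
    second.write_bytes(b"defg")
    batch = {
        "id": [1, 2],
        "path": [str(first), str(second)],
        "media_type": ["image/png", "image/jpeg"],
    }
    result = with_passthrough(
        batch,
        keep=["id", "path", "media_type"],
        computed={"image_bytes": [Path(p).read_bytes() for p in batch["path"]]},
    )

print(list(result))  # ['id', 'path', 'media_type', 'image_bytes']
```

The row-count check catches the most common bug in hand-written batch UDFs: a computed column that skips or duplicates rows.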

3. Run the model step

Use an actor backend when a model, tokenizer, image processor, or client should be initialized once and reused across batches.

example.py
class ImageClassifier:
    def __init__(self) -> None:
        # Runs once per actor: load the model, processor, or client here.
        self.labels = ("invoice", "claim", "other")

    def __call__(self, batch: pa.Table) -> pa.Table:
        ids = batch.column("id").to_pylist()
        paths = batch.column("path").to_pylist()
        media_types = batch.column("media_type").to_pylist()
        image_bytes = batch.column("image_bytes").to_pylist()

        # Placeholder "model": non-empty bytes become "invoice", anything else "other".
        labels = []
        scores = []
        for value in image_bytes:
            labels.append(self.labels[0] if value else self.labels[2])
            scores.append(1.0 if value else 0.0)

        # image_bytes is not returned, so it is not carried into later stages.
        return pa.table({
            "id": ids,
            "path": paths,
            "media_type": media_types,
            "label": labels,
            "score": scores,
        })


classified = with_bytes.map_batches(
    ImageClassifier,
    schema={
        "id": "BIGINT",
        "path": "VARCHAR",
        "media_type": "VARCHAR",
        "label": "VARCHAR",
        "score": "DOUBLE",
    },
    batch_size=16,
    execution_backend="subprocess_actor",
)

The example classifier is deliberately simple so the relation contract is visible. In production, put decoding, preprocessing, and inference inside the actor while keeping the returned schema stable.
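The main difference between task-style and actor-style execution is where the setup cost falls. The sketch below simulates this in plain Python with a hypothetical CountingClassifier. Building a new instance for every batch mimics re-initializing per batch, while reusing one instance mimics an actor. It models the idea only and does not show how Vane schedules subprocess_task or subprocess_actor workers.

```python
class CountingClassifier:
    """Hypothetical stand-in for ImageClassifier that counts setup calls."""

    init_calls = 0  # class-level counter shared by all instances

    def __init__(self) -> None:
        CountingClassifier.init_calls += 1
        self.labels = ("invoice", "claim", "other")  # pretend this is a costly model load

    def __call__(self, image_bytes: list) -> list[str]:
        return [self.labels[0] if value else self.labels[2] for value in image_bytes]


batches = [[b"x", b""], [b"y"], [None, b"z"]]

# Task-like: a fresh object per batch, so setup runs for every batch.
CountingClassifier.init_calls = 0
task_labels = [CountingClassifier()(batch) for batch in batches]
task_inits = CountingClassifier.init_calls

# Actor-like: construct once, then reuse the same object for every batch.
CountingClassifier.init_calls = 0
actor = CountingClassifier()
actor_labels = [actor(batch) for batch in batches]
actor_inits = CountingClassifier.init_calls

print(task_inits, actor_inits)  # 3 1
```

Both paths produce the same labels. Only the number of initializations differs, and with real model weights that difference dominates runtime.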

4. Return to SQL

Create a table from the relation when you want to use normal SQL for validation, summaries, or downstream joins.

example.py
classified.to_table("classified_media")


summary = con.sql("""
    select
        label,
        count(*) as row_count,
        avg(score) as avg_score
    from classified_media
    group by label
    order by row_count desc
""")


summary.show()
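Validation queries follow the same pattern as the summary. The sketch below runs a few sanity checks against a table shaped like classified_media. It uses sqlite3 as a stand-in engine with invented rows, including deliberately bad ones. The specific checks (unique ids, non-null labels, scores in [0, 1]) are assumptions about what this pipeline should guarantee, so adapt them to your own output contract.

```python
import sqlite3

# Stand-in engine; rows 3 (duplicated) and 4 are deliberately bad sample data.
con = sqlite3.connect(":memory:")
con.execute(
    "create table classified_media "
    "(id integer, path text, media_type text, label text, score real)"
)
con.executemany(
    "insert into classified_media values (?, ?, ?, ?, ?)",
    [
        (1, "a.png", "image/png", "invoice", 1.0),
        (2, "b.jpg", "image/jpeg", "invoice", 1.0),
        (3, "c.png", "image/png", "other", 0.0),
        (3, "c.png", "image/png", "other", 0.0),  # duplicate id
        (4, "d.png", "image/png", None, 1.5),     # missing label, score out of range
    ],
)

# One row of counters; all zeros would mean the checks passed.
checks = con.execute(
    """
    select
        count(*) - count(distinct id) as duplicate_ids,
        sum(case when label is null then 1 else 0 end) as missing_labels,
        sum(case when score < 0 or score > 1 then 1 else 0 end) as out_of_range_scores
    from classified_media
    """
).fetchone()
print(checks)  # (1, 1, 1)
```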

5. Write output

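Write only the columns that downstream systems need. Large media bytes are often needed during inference but rarely belong in curated outputs. In this pipeline the classifier stage has already dropped image_bytes. The explicit column list therefore mainly documents the output contract, and it keeps that contract stable if earlier stages later return more columns.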

example.py
curated = con.sql("""
    select id, path, media_type, label, score
    from classified_media
""")


curated.write_parquet("output/classified_media/")
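If an intermediate table still carries bytes (for example, one materialized from the with_bytes relation), the projection can be derived from the declared schema instead of typed by hand. curated_select below is a hypothetical helper, not a Vane function. It takes a schema dictionary of the same shape passed to map_batches and drops BLOB columns. Treating BLOB as the only large-media type is an assumption of this sketch.

```python
def curated_select(
    table: str,
    schema: dict[str, str],
    drop_types: frozenset[str] = frozenset({"BLOB"}),
) -> str:
    """Build a SELECT that keeps every declared column except large binary ones.

    Names are interpolated as-is, so pass only trusted identifiers.
    """
    keep = [name for name, sql_type in schema.items() if sql_type.upper() not in drop_types]
    if not keep:
        raise ValueError("projection would drop every column")
    return f"select {', '.join(keep)} from {table}"


# Same shape as the schema passed to the byte-loading map_batches call.
bytes_stage_schema = {
    "id": "BIGINT",
    "path": "VARCHAR",
    "media_type": "VARCHAR",
    "image_bytes": "BLOB",
}
print(curated_select("media_with_bytes", bytes_stage_schema))
# select id, path, media_type from media_with_bytes
```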

Move to Ray

The SQL text and UDF contract do not need to change when the expensive stage moves to Ray. Configure the runner before creating and executing the pipeline:

example.py
import vane


vane.configure(runner="ray")

Then select a Ray UDF backend for the expensive stage:

example.py
classified = with_bytes.map_batches(
    ImageClassifier,
    schema={
        "id": "BIGINT",
        "path": "VARCHAR",
        "media_type": "VARCHAR",
        "label": "VARCHAR",
        "score": "DOUBLE",
    },
    batch_size=16,
    execution_backend="ray_actor",
    gpus=1,
    concurrency=4,
)

Every Ray worker must be able to import the UDF class and access the same storage, credentials, model files, and Python dependencies.
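A small preflight check can catch many of these problems before a long run. The sketch below is hypothetical and uses only the standard library. It verifies that a module and class can be imported, and that a few sample paths exist and are readable on the machine where it runs. It does not check credentials or dependency versions. Running it on each Ray worker is left to your cluster setup.

```python
import importlib
import os
import tempfile
from pathlib import Path


def preflight(udf_module: str, udf_name: str, sample_paths: list[str]) -> list[str]:
    """Return the problems found on this machine; an empty list means OK."""
    problems = []
    try:
        module = importlib.import_module(udf_module)
        getattr(module, udf_name)  # raises AttributeError if the class is missing
    except (ImportError, AttributeError) as exc:
        problems.append(f"cannot import {udf_module}.{udf_name}: {exc}")
    for path in sample_paths:
        if not Path(path).is_file():
            problems.append(f"missing file: {path}")
        elif not os.access(path, os.R_OK):
            problems.append(f"unreadable file: {path}")
    return problems


# Example with stdlib names standing in for a real UDF module and media files.
with tempfile.NamedTemporaryFile(suffix=".png") as handle:
    ok = preflight("collections", "OrderedDict", [handle.name])
bad = preflight("collections", "NoSuchClass", ["/definitely/missing.png"])
print(ok)        # []
print(len(bad))  # 2
```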

Practical checks

Before promoting a multimodal pipeline:

  • Verify the UDF output schema with a small local run.
  • Keep filters and column projections before byte loading and inference.
  • Preserve stable identifiers so model outputs can be joined back to source records.
  • Decide whether media bytes belong in the final output or only in intermediate stages.
  • Test path, storage, and credential access from every Ray worker before scaling.
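You can automate the first check, verifying the output schema with a small local run. This sketch assumes you can capture one UDF output batch as a dict of lists (for a pyarrow.Table, Table.to_pydict() produces that shape). check_output and the SQL-to-Python type table are hypothetical, and they cover only the four types used in this guide.

```python
# Hypothetical mapping from the SQL type names used in `schema=` to the Python
# types expected in a batch; it covers only the types used in this guide.
PY_TYPES = {"BIGINT": int, "VARCHAR": str, "DOUBLE": float, "BLOB": bytes}


def check_output(batch: dict[str, list], schema: dict[str, str]) -> list[str]:
    """Compare one UDF output batch (dict of lists) with the declared schema."""
    problems = []
    missing = set(schema) - set(batch)
    extra = set(batch) - set(schema)
    if missing:
        problems.append(f"missing columns: {sorted(missing)}")
    if extra:
        problems.append(f"undeclared columns: {sorted(extra)}")
    lengths = {name: len(values) for name, values in batch.items()}
    if len(set(lengths.values())) > 1:
        problems.append(f"ragged columns: {lengths}")
    for name, sql_type in schema.items():
        expected = PY_TYPES.get(sql_type.upper())
        if expected is None:
            problems.append(f"{name}: no rule for type {sql_type}")
            continue
        # Report only the first mismatching value per column; None counts as NULL.
        for value in batch.get(name, []):
            if value is not None and not isinstance(value, expected):
                problems.append(f"{name}: {value!r} is not {sql_type}")
                break
    return problems


schema = {"id": "BIGINT", "path": "VARCHAR", "media_type": "VARCHAR",
          "label": "VARCHAR", "score": "DOUBLE"}
good = {"id": [1], "path": ["a.png"], "media_type": ["image/png"],
        "label": ["invoice"], "score": [1.0]}
bad = {"id": [1], "path": ["a.png"], "label": ["invoice"], "score": [1]}  # no media_type, int score
print(check_output(good, schema))  # []
print(check_output(bad, schema))
```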