SQL Multimodal Pipeline
This guide shows a SQL-first pattern for multimodal data preparation. SQL owns metadata filtering and validation; Python UDFs handle the media-specific step; the result returns to SQL for inspection and output.
Use this style when most of the pipeline should remain readable to SQL users, but one stage needs Python libraries for decoding, feature extraction, or model inference.
Pipeline shape
The pattern has five stages:
- Select candidate rows with SQL.
- Load bytes or keep stable media references.
- Run a Python UDF for decoding or inference.
- Return to SQL for validation and summaries.
- Write a curated tabular output.
Vane Data does not add image, audio, or video syntax to SQL. Multimodal work is represented as table columns and explicit Python UDF stages.
1. Start with metadata in SQL
Keep the first step relational. Filter rows, normalize metadata, and reduce the input before Python sees it.
import vane

con = vane.connect()

items = con.sql("""
    select id, path, media_type
    from read_parquet('data/media_index/*.parquet')
    where media_type in ('image/png', 'image/jpeg')
      and path is not null
""")
2. Add bytes or references
If media bytes are already in Parquet, keep them in the relation and skip this step. If the table stores paths, use a UDF to load bytes and return the columns needed by later stages.
from pathlib import Path

import pyarrow as pa


def load_image_bytes(batch: pa.Table) -> pa.Table:
    ids = batch.column("id").to_pylist()
    paths = batch.column("path").to_pylist()
    media_types = batch.column("media_type").to_pylist()

    return pa.table({
        "id": ids,
        "path": paths,
        "media_type": media_types,
        "image_bytes": [Path(path).read_bytes() for path in paths],
    })


with_bytes = items.map_batches(
    load_image_bytes,
    schema={
        "id": "BIGINT",
        "path": "VARCHAR",
        "media_type": "VARCHAR",
        "image_bytes": "BLOB",
    },
    batch_size=32,
    execution_backend="subprocess_task",
)
map_batches returns only the columns that the UDF produces. Include input columns such as id, path, and media_type in the UDF output when downstream stages still need them.
3. Run the model step
Use an actor backend when a model, tokenizer, image processor, or client should be initialized once and reused across batches.
class ImageClassifier:
    def __init__(self) -> None:
        # Initialize model state here in a real pipeline.
        self.labels = ("invoice", "claim", "other")

    def __call__(self, batch: pa.Table) -> pa.Table:
        ids = batch.column("id").to_pylist()
        paths = batch.column("path").to_pylist()
        media_types = batch.column("media_type").to_pylist()
        image_bytes = batch.column("image_bytes").to_pylist()

        labels = []
        scores = []
        for value in image_bytes:
            labels.append(self.labels[0] if value else self.labels[2])
            scores.append(1.0 if value else 0.0)

        return pa.table({
            "id": ids,
            "path": paths,
            "media_type": media_types,
            "label": labels,
            "score": scores,
        })


classified = with_bytes.map_batches(
    ImageClassifier,
    schema={
        "id": "BIGINT",
        "path": "VARCHAR",
        "media_type": "VARCHAR",
        "label": "VARCHAR",
        "score": "DOUBLE",
    },
    batch_size=16,
    execution_backend="subprocess_actor",
)
The example classifier is deliberately simple so the relation contract is visible. In production, put decoding, preprocessing, and inference inside the actor while keeping the returned schema stable.
4. Return to SQL
Create a table from the relation when you want to use normal SQL for validation, summaries, or downstream joins.
classified.to_table("classified_media")

summary = con.sql("""
    select label, count(*) as rows, avg(score) as avg_score
    from classified_media
    group by label
    order by rows desc
""")
summary.show()
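Validation can follow the same shape as the summary: each check is a query that counts offending rows. The sketch below uses Python's built-in sqlite3 module purely as a stand-in engine so that it runs on its own; it is not the Vane API. The check names, the sample rows, and the assumption that scores fall in the range 0 to 1 are illustrative.

```python
import sqlite3

# Each hypothetical check counts offending rows; zero means the check passes.
VALIDATION_CHECKS = {
    "null_label": """
        select count(*) from classified_media where label is null
    """,
    "score_out_of_range": """
        select count(*) from classified_media where score < 0 or score > 1
    """,
    "duplicate_id": """
        select count(*) from (
            select id from classified_media group by id having count(*) > 1
        ) as dupes
    """,
}


def run_checks(conn: sqlite3.Connection) -> dict:
    """Run every check and return the offending-row count per check name."""
    return {
        name: conn.execute(sql).fetchone()[0]
        for name, sql in VALIDATION_CHECKS.items()
    }


# Stand-in table with illustrative rows, including three deliberate problems.
conn = sqlite3.connect(":memory:")
conn.execute("create table classified_media (id integer, label text, score real)")
conn.executemany(
    "insert into classified_media values (?, ?, ?)",
    [(1, "invoice", 1.0), (2, "other", 0.0), (2, None, 1.5)],
)

failures = {name: count for name, count in run_checks(conn).items() if count}
print(failures)
```

The query text is plain SQL, so the same checks would presumably carry over to the pipeline's own connection, though that has not been verified here.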
5. Write output
Write only the columns that downstream systems need. Large media bytes are often useful during inference but unnecessary in curated outputs.
curated = con.sql("""
    select id, path, media_type, label, score
    from classified_media
""")
curated.write_parquet("output/classified_media/")
Move to Ray
The SQL text and UDF contract do not need to change when the expensive stage moves to Ray. Configure the runner before creating and executing the pipeline:
import vane

vane.configure(runner="ray")
Then select a Ray UDF backend for the expensive stage:
classified = with_bytes.map_batches(
    ImageClassifier,
    schema={
        "id": "BIGINT",
        "path": "VARCHAR",
        "media_type": "VARCHAR",
        "label": "VARCHAR",
        "score": "DOUBLE",
    },
    batch_size=16,
    execution_backend="ray_actor",
    gpus=1,
    concurrency=4,
)
Every Ray worker must be able to import the UDF class and access the same storage, credentials, model files, and Python dependencies.
Practical checks
Before promoting a multimodal pipeline:
- Verify the UDF output schema with a small local run.
- Keep filters and column projections before byte loading and inference.
- Preserve stable identifiers so model outputs can be joined back to source records.
- Decide whether media bytes belong in the final output or only in intermediate stages.
- Test path, storage, and credential access from every Ray worker before scaling.
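The last check above can be sketched as a small preflight function that uses only the standard library. The function name preflight and its parameters are hypothetical; it assumes that access can be approximated by checking that paths are readable, modules are importable, and credential environment variables are set. How the function is dispatched to each worker is left out here.

```python
import importlib.util
import os
from pathlib import Path


def preflight(paths=(), modules=(), env_vars=()) -> list:
    """Return a list of problems found; an empty list means every check passed."""
    problems = []

    for path in paths:
        p = Path(path)
        if not p.exists():
            problems.append(f"missing path: {path}")
        elif not os.access(p, os.R_OK):
            problems.append(f"unreadable path: {path}")

    for name in modules:
        try:
            spec = importlib.util.find_spec(name)
        except (ImportError, ValueError):
            # find_spec raises when a parent package is missing or the name is invalid.
            spec = None
        if spec is None:
            problems.append(f"module not importable: {name}")

    for name in env_vars:
        if not os.environ.get(name):
            problems.append(f"unset environment variable: {name}")

    return problems
```

Returning a list rather than raising on the first failure lets one run report every missing dependency on a worker at once.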