UDFs
Vane Data UDFs let you run Python inside a relation pipeline without giving up explicit schemas, batch control, or a path to distributed execution.
Use UDFs when a step needs Python libraries, model state, custom decoding, external clients, or row expansion. Keep relational work in SQL, then use UDFs for the part SQL should not own.
Public APIs
Vane Data exposes three relation-level UDF APIs:
| API | Shape | Typical use |
|---|---|---|
| map_batches(function, schema=..., ...) | Arrow batch to Arrow batch | Vectorized transforms, model inference, decoding |
| flat_map(function, schema=..., ...) | Row to zero, one, or many rows | Chunking, splitting, expanding nested records |
| map(map_function, return_type=..., ...) | Row-wise scalar value | Small scalar enrichments |
Core parameters:
- schema: output columns and types for relation-producing UDFs.
- return_type: scalar output type for map.
- batch_size: rows per map_batches call.
- execution_backend: where the callable runs.
- cpus and gpus: resource requests where supported.
- concurrency: number of executors or actors where supported.
- streaming_breaker: advanced map_batches execution-boundary control.
Unsupported keyword arguments are rejected early, before the relation is rendered.
Execution backends
| Backend | Callable shape | Best for |
|---|---|---|
| subprocess_task | Python function | Local stateless batch transforms |
| subprocess_actor | Callable class | Local model or client reuse |
| ray_task | Python function | Distributed stateless transforms |
| ray_actor | Callable class | Distributed stateful inference, GPU-backed models |
Pass a function to task backends. Pass a class, not a pre-created instance, to actor backends so Vane Data can construct and reuse the object inside the executor.
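The class-versus-instance rule is easier to see in a small stand-in for an executor. The sketch below is not Vane Data's implementation: `run_as_actor` is a hypothetical helper that only mimics the pattern of constructing the class once inside the worker and reusing that instance for every batch, and plain lists stand in for Arrow batches.

```python
from typing import Any, Callable, Iterable


class UppercaseBatch:
    """Callable class with setup that should run once per executor."""

    constructions = 0  # counts how many times __init__ has run

    def __init__(self) -> None:
        # Stand-in for expensive setup such as loading a model or opening a client.
        UppercaseBatch.constructions += 1
        self.suffix = "!"

    def __call__(self, batch: list[str]) -> list[str]:
        return [value.upper() + self.suffix for value in batch]


def run_as_actor(udf_class: Callable[[], Any], batches: Iterable[list[str]]) -> list[list[str]]:
    """Hypothetical executor loop: build the object once, then reuse it."""
    instance = udf_class()  # constructed inside the "executor", not by the caller
    return [instance(batch) for batch in batches]


results = run_as_actor(UppercaseBatch, [["a", "b"], ["c"]])
```

Because the caller hands over the class rather than an instance, state such as a loaded model or an open connection is created in the process that uses it and typically never has to be serialized across a process boundary.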
Batch UDF contract
map_batches receives Arrow data and returns Arrow data. The returned table must match the declared output schema.
```python
import pyarrow as pa
import vane

con = vane.connect()

rel = con.sql("""
    select *
    from (
        values
            (1, 'hello'),
            (2, 'vane data')
    ) as t(id, text)
""")


def add_features(batch: pa.Table) -> pa.Table:
    values = batch.column("text").to_pylist()
    return pa.table({
        "length": [len(str(value)) for value in values],
        "upper_text": [str(value).upper() for value in values],
    })


features = rel.map_batches(
    add_features,
    schema={
        "length": "BIGINT",
        "upper_text": "VARCHAR",
    },
    batch_size=1024,
    execution_backend="subprocess_task",
)

features.show()
```
Batch UDFs are the default choice for model and media workloads because they amortize Python overhead and let model libraries process multiple rows at once.
Stateful class UDFs
Use actor backends when setup is expensive and should be reused across batches.
```python
import pyarrow as pa


class EmbedBatch:
    def __init__(self) -> None:
        from sentence_transformers import SentenceTransformer

        self.model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

    def __call__(self, batch: pa.Table) -> pa.Table:
        text = batch.column("text").to_pylist()
        embeddings = self.model.encode(text).tolist()
        return pa.table({"embedding": embeddings})


embedded = rel.map_batches(
    EmbedBatch,
    schema={"embedding": "FLOAT[]"},
    batch_size=64,
    execution_backend="subprocess_actor",
    concurrency=2,
)
```
The resulting relation contains only the declared embedding column. When later stages need to join embeddings back to input rows, also return source identifiers (such as id) from the UDF and declare them in schema.
For distributed GPU inference, use the same callable class shape with ray_actor:
```python
embedded = rel.map_batches(
    EmbedBatch,
    schema={"embedding": "FLOAT[]"},
    batch_size=64,
    execution_backend="ray_actor",
    gpus=1,
    concurrency=4,
)
```
Every Ray worker must be able to import the class and access the required model files or credentials.
One-to-many transforms
Use flat_map when one input row can produce zero, one, or many output rows.
```python
def split_terms(row: dict) -> list[dict]:
    return [{"term": term} for term in str(row["text"]).split()]


terms = rel.flat_map(
    split_terms,
    schema={"term": "VARCHAR"},
    execution_backend="subprocess_task",
)
```
Typical uses include document chunking, URL expansion, nested record normalization, and filtering by returning an empty list.
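Document chunking and filtering can share one function. The sketch below is plain Python with no Vane Data calls. `chunk_text`, `CHUNK_WORDS`, and the `chunk_index` and `chunk` columns are illustrative names, and it assumes each row arrives as a dict with `id` and `text` keys.

```python
CHUNK_WORDS = 3  # illustrative chunk size, in words


def chunk_text(row: dict) -> list[dict]:
    """Split a row's text into fixed-size word chunks; drop empty rows."""
    words = str(row.get("text") or "").split()
    if not words:
        return []  # returning an empty list filters the row out
    return [
        {
            "id": row["id"],
            "chunk_index": start // CHUNK_WORDS,
            "chunk": " ".join(words[start:start + CHUNK_WORDS]),
        }
        for start in range(0, len(words), CHUNK_WORDS)
    ]


rows = [
    {"id": 1, "text": "one two three four"},
    {"id": 2, "text": "   "},
]
# Local stand-in for row expansion: every input row contributes zero or more rows.
chunks = [out for row in rows for out in chunk_text(row)]
```

Passed to flat_map, a function like this would need a schema that declares all three output columns, for instance id and chunk_index as BIGINT and chunk as VARCHAR. Carrying id through keeps each chunk traceable to its source row.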
Scalar transforms
Use map for small row-wise scalar enrichments where a batch UDF would be unnecessary.
```python
def normalize_text(text: str) -> str:
    return text.strip().lower()


normalized = rel.project("text").map(
    normalize_text,
    return_type="VARCHAR",
    execution_backend="subprocess_task",
)
```
For model calls, media decoding, or anything that benefits from vectorization, prefer map_batches.
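The normalize_text example assumes every value is a string. Whether map hands SQL NULL to the callable as None is not stated here, so the variant below treats that as an assumption and guards against it. `normalize_text_safe` is an illustrative name.

```python
from typing import Optional


def normalize_text_safe(text: Optional[str]) -> Optional[str]:
    """Trim and lowercase text, passing missing values through unchanged."""
    if text is None:
        # Assumption: a NULL input may reach the function as None.
        return None
    return text.strip().lower()


samples = ["  Hello ", "VANE Data", None]
normalized_samples = [normalize_text_safe(value) for value in samples]
```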
Resources and concurrency
Keep these controls separate:
- batch_size: rows per batch passed to map_batches.
- gpus: GPU resource request per callable execution where supported.
- cpus: CPU resource request per callable execution where supported.
- concurrency: number of executors or actors where supported.
Increase batch_size before increasing concurrency if the bottleneck is model throughput. Increase concurrency when workers are underutilized and memory headroom is available.
Some backends do not accept every resource or concurrency option. Validate settings on a representative sample before scaling.
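One way to validate batch_size on a sample before touching concurrency is to time the callable locally at a few sizes. The sketch below is a plain-Python harness, not a Vane Data feature. `measure_batch_sizes` and `fake_inference` are hypothetical names, and lists stand in for Arrow batches.

```python
import time
from typing import Callable, Sequence


def fake_inference(batch: Sequence[str]) -> list[int]:
    """Stand-in for a model call: returns one value per input row."""
    return [len(value) for value in batch]


def measure_batch_sizes(
    udf: Callable[[Sequence[str]], list[int]],
    rows: Sequence[str],
    batch_sizes: Sequence[int],
) -> dict[int, dict[str, float]]:
    """Run udf over rows at each batch size and record calls and elapsed time."""
    report: dict[int, dict[str, float]] = {}
    for size in batch_sizes:
        calls = 0
        produced = 0
        started = time.perf_counter()
        for start in range(0, len(rows), size):
            produced += len(udf(rows[start:start + size]))
            calls += 1
        elapsed = time.perf_counter() - started
        report[size] = {"calls": calls, "rows": produced, "seconds": elapsed}
    return report


sample = [f"row {i}" for i in range(1000)]
report = measure_batch_sizes(fake_inference, sample, batch_sizes=[64, 256, 1024])
```

With a real model in place of `fake_inference`, a `seconds` value that stops improving as the batch grows suggests the callable is saturated, at which point adding executors is the next thing to try, memory permitting.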
Streaming behavior
map_batches exposes streaming_breaker for advanced execution-boundary control. Leave it unset unless you are measuring a specific pipeline behavior or debugging a boundary.
Backend selection
Use this progression:
- Build the UDF locally with subprocess_task or subprocess_actor.
- Verify schema, row counts, and error behavior on a representative sample.
- Move stateless functions to ray_task only when distribution helps.
- Move stateful model classes to ray_actor only when actor reuse, GPUs, or cluster placement are required.
- Tune batch size, resource requests, and concurrency with production-like data.
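The verification step in this progression can start as a small local check that runs before any backend is involved. The sketch below covers row-producing functions of the flat_map shape only. `check_rows` is a hypothetical helper, and although it takes the same name-to-type schema mapping used in the examples above, it checks only the column names.

```python
from typing import Callable


def check_rows(
    udf: Callable[[dict], list[dict]],
    sample: list[dict],
    schema: dict[str, str],
) -> dict[str, int]:
    """Call udf on each sample row and compare output columns to the schema."""
    expected = set(schema)
    summary = {"input_rows": 0, "output_rows": 0, "errors": 0}
    for row in sample:
        summary["input_rows"] += 1
        try:
            outputs = udf(row)
        except Exception:
            # Count failures instead of stopping, so one bad row does not hide others.
            summary["errors"] += 1
            continue
        for out in outputs:
            if set(out) != expected:
                raise ValueError(
                    f"columns {sorted(out)} do not match schema {sorted(expected)}"
                )
            summary["output_rows"] += 1
    return summary


def split_terms(row: dict) -> list[dict]:
    return [{"term": term} for term in str(row["text"]).split()]


summary = check_rows(
    split_terms,
    sample=[{"text": "hello"}, {"text": "vane data"}, {}],
    schema={"term": "VARCHAR"},
)
```

The summary gives the row counts and error count to compare against expectations before the same function is moved to a Ray backend.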