
UDFs

Vane Data UDFs let you run Python inside a relation pipeline without giving up explicit schemas, batch control, or a path to distributed execution.

Use UDFs when a step needs Python libraries, model state, custom decoding, external clients, or row expansion. Keep relational work in SQL, then use UDFs for the part SQL should not own.

Public APIs

Vane Data exposes three relation-level UDF APIs:

API | Shape | Typical use
map_batches(function, schema=..., ...) | Arrow batch to Arrow batch | Vectorized transforms, model inference, decoding
flat_map(function, schema=..., ...) | Row to zero, one, or many rows | Chunking, splitting, expanding nested records
map(map_function, return_type=..., ...) | Row-wise scalar value | Small scalar enrichments
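The three APIs differ mainly in input and output cardinality. The sketch below is a local, pure-Python model of those shapes, not Vane Data's implementation. It assumes a batch can be modeled as a dict of column lists (standing in for an Arrow table) and a row as a dict. The helpers `run_map_batches`, `run_flat_map`, and `run_map` are hypothetical names.

```python
from typing import Any, Callable, Iterable

Row = dict[str, Any]
Batch = dict[str, list[Any]]  # stand-in for an Arrow table: column name -> values


def rows_to_batch(rows: list[Row]) -> Batch:
    """Pivot row dicts into column lists (column names taken from the first row)."""
    names = list(rows[0]) if rows else []
    return {name: [row[name] for row in rows] for name in names}


def batch_to_rows(batch: Batch) -> list[Row]:
    """Pivot column lists back into row dicts."""
    names = list(batch)
    count = len(batch[names[0]]) if names else 0
    return [{name: batch[name][i] for name in names} for i in range(count)]


def run_map_batches(fn: Callable[[Batch], Batch], rows: list[Row], batch_size: int) -> list[Row]:
    """Batch in, batch out: fn is called once per slice of at most batch_size rows."""
    out: list[Row] = []
    for start in range(0, len(rows), batch_size):
        out.extend(batch_to_rows(fn(rows_to_batch(rows[start:start + batch_size]))))
    return out


def run_flat_map(fn: Callable[[Row], Iterable[Row]], rows: list[Row]) -> list[Row]:
    """Row in, zero or more rows out. An empty result drops the row."""
    return [out_row for row in rows for out_row in fn(row)]


def run_map(fn: Callable[[Any], Any], values: list[Any]) -> list[Any]:
    """One value in, one scalar out, so the row count never changes."""
    return [fn(value) for value in values]


rows = [{"id": 1, "text": "hello"}, {"id": 2, "text": "vane data"}]
lengths = run_map_batches(lambda b: {"length": [len(t) for t in b["text"]]}, rows, batch_size=2)
terms = run_flat_map(lambda r: [{"term": t} for t in r["text"].split()], rows)
upper = run_map(str.upper, [r["text"] for r in rows])
```

In this example, `lengths` has one row per input row, `terms` has one row per word (three in total), and `upper` has one scalar per input value.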

Core parameters:

  • schema: output columns and types for relation-producing UDFs.
  • return_type: scalar output type for map.
  • batch_size: rows per map_batches call.
  • execution_backend: where the callable runs.
  • cpus and gpus: resource requests where supported.
  • concurrency: number of executors or actors where supported.
  • streaming_breaker: advanced map_batches execution-boundary control.

Unsupported keyword arguments are rejected early, before the relation is rendered.
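Early rejection means a typo such as `batchsize=` fails at call time instead of after the pipeline starts. The following is a minimal sketch of that kind of check. The per-API keyword sets are assumptions assembled from the parameter list above, not Vane Data's actual rules, and `validate_udf_kwargs` is a hypothetical name.

```python
# Assumed keyword sets, built from the "Core parameters" list. The real
# per-API sets may differ.
COMMON = {"execution_backend", "cpus", "gpus", "concurrency"}
ALLOWED_KWARGS = {
    "map_batches": COMMON | {"schema", "batch_size", "streaming_breaker"},
    "flat_map": COMMON | {"schema"},
    "map": COMMON | {"return_type"},
}


def validate_udf_kwargs(api: str, **kwargs: object) -> None:
    """Raise TypeError before any relation is built if a keyword is unknown."""
    unknown = sorted(set(kwargs) - ALLOWED_KWARGS[api])
    if unknown:
        raise TypeError(f"{api}() got unsupported keyword argument(s): {', '.join(unknown)}")


# Accepted: both keywords are in the assumed map_batches set.
validate_udf_kwargs("map_batches", schema={"length": "BIGINT"}, batch_size=1024)

# Rejected: batch_size is assumed to apply only to map_batches.
try:
    validate_udf_kwargs("map", return_type="VARCHAR", batch_size=1024)
    rejected = None
except TypeError as err:
    rejected = str(err)
```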

Execution backends

Backend | Callable shape | Best for
subprocess_task | Python function | Local stateless batch transforms
subprocess_actor | Callable class | Local model or client reuse
ray_task | Python function | Distributed stateless transforms
ray_actor | Callable class | Distributed stateful inference, GPU-backed models

Pass a function to task backends. Pass a class, not a pre-created instance, to actor backends so Vane Data can construct and reuse the object inside the executor.
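The sketch below models that rule in-process. Task backends receive a plain function. Actor backends receive a class, which the executor instantiates once and then calls for every batch. `run_batches` and `CountingModel` are hypothetical, and nothing here spawns subprocesses or Ray workers. With concurrency above 1, a real actor pool would presumably build one instance per actor rather than one in total.

```python
import inspect
from typing import Any

TASK_BACKENDS = {"subprocess_task", "ray_task"}
ACTOR_BACKENDS = {"subprocess_actor", "ray_actor"}


def run_batches(udf: Any, backend: str, batches: list[Any]) -> list[Any]:
    """Run udf over batches in-process, following the task/actor callable rules."""
    if backend in TASK_BACKENDS:
        if inspect.isclass(udf) or not callable(udf):
            raise TypeError(f"{backend} expects a function, got {udf!r}")
        fn = udf
    elif backend in ACTOR_BACKENDS:
        if not inspect.isclass(udf):
            # The executor must build the object itself, so an instance is rejected.
            raise TypeError(f"{backend} expects a class, not an instance")
        fn = udf()  # constructed once; expensive setup in __init__ is paid here only
    else:
        raise ValueError(f"unknown execution_backend: {backend}")
    return [fn(batch) for batch in batches]


class CountingModel:
    instances = 0

    def __init__(self) -> None:
        CountingModel.instances += 1  # stands in for loading a model or client

    def __call__(self, batch: list[str]) -> list[int]:
        return [len(text) for text in batch]


results = run_batches(CountingModel, "subprocess_actor", [["a", "bb"], ["ccc"]])
```

In this example, two batches are processed, but `CountingModel.__init__` runs only once.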

Batch UDF contract

map_batches receives Arrow data and returns Arrow data. The returned table must match the declared output schema.

example.py
import pyarrow as pa
import vane


con = vane.connect()
rel = con.sql("""
    select *
    from (
        values
            (1, 'hello'),
            (2, 'vane data')
    ) as t(id, text)
""")


def add_features(batch: pa.Table) -> pa.Table:
    values = batch.column("text").to_pylist()
    return pa.table({
        "length": [len(str(value)) for value in values],
        "upper_text": [str(value).upper() for value in values],
    })


features = rel.map_batches(
    add_features,
    schema={
        "length": "BIGINT",
        "upper_text": "VARCHAR",
    },
    batch_size=1024,
    execution_backend="subprocess_task",
)


features.show()
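Before wiring a function into map_batches, you can check its output locally against the schema dict you plan to declare. The sketch below does this for a dict-of-lists stand-in for the returned Arrow table, using the values add_features produces for the two sample rows. The SQL-to-Python type mapping covers only the types used on this page and is an assumption. `check_output` is a hypothetical helper.

```python
from typing import Any

# Assumed mapping for the SQL types used on this page.
PY_TYPES = {"BIGINT": int, "VARCHAR": str, "FLOAT": float}


def matches(value: Any, sql_type: str) -> bool:
    """Loosely check one Python value against a declared SQL type."""
    if value is None:
        return True  # treat NULL as valid for any type
    if sql_type.endswith("[]"):
        return isinstance(value, list) and all(matches(v, sql_type[:-2]) for v in value)
    py_type = PY_TYPES[sql_type]  # KeyError for types this sketch does not know
    if py_type is int and isinstance(value, bool):
        return False  # bool is a subclass of int, so reject it explicitly
    return isinstance(value, py_type)


def check_output(columns: dict[str, list[Any]], schema: dict[str, str]) -> list[str]:
    """Return a list of problems. An empty list means the batch matches."""
    problems = []
    if set(columns) != set(schema):
        problems.append(f"columns {sorted(columns)} != declared {sorted(schema)}")
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        problems.append(f"ragged columns: lengths {sorted(lengths)}")
    for name, sql_type in schema.items():
        for i, value in enumerate(columns.get(name, [])):
            if not matches(value, sql_type):
                problems.append(f"{name}[{i}]={value!r} is not {sql_type}")
    return problems


schema = {"length": "BIGINT", "upper_text": "VARCHAR"}
good = {"length": [5, 9], "upper_text": ["HELLO", "VANE DATA"]}
bad = {"length": [5, "9"], "upper_text": ["HELLO", "VANE DATA"]}
```

In this example, `check_output(good, schema)` returns an empty list, and `check_output(bad, schema)` reports the string in the `length` column.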

Batch UDFs are the default choice for model and media workloads because they amortize Python overhead and let model libraries process multiple rows at once.
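If batch_size is an upper bound on rows per call, a row-wise UDF makes one Python call per row, while a batch UDF makes ceil(rows / batch_size) calls. The hypothetical `split_into_batches` helper below shows this arithmetic for 10,000 rows. The real engine may cut batches differently, for example at upstream chunk boundaries, so treat the count as a minimum.

```python
def split_into_batches(rows: list, batch_size: int) -> list[list]:
    """Cut rows into consecutive batches of at most batch_size rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


rows = list(range(10_000))
batches = split_into_batches(rows, batch_size=1024)
calls_row_wise = len(rows)     # one Python call per row: 10,000
calls_batched = len(batches)   # ceil(10,000 / 1024) = 10 calls
```

In this example, nine batches hold 1,024 rows each, and the last batch holds the remaining 784.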

Stateful class UDFs

Use actor backends when setup is expensive and should be reused across batches.

example.py
import pyarrow as pa


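class EmbedBatch:
    def __init__(self) -> None:
        # Runs once per actor, so the model is loaded once and reused across batches.
        from sentence_transformers import SentenceTransformer

        self.model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

    def __call__(self, batch: pa.Table) -> pa.Table:
        text = batch.column("text").to_pylist()
        embeddings = self.model.encode(text).tolist()
        # FLOAT[] is a list of 32-bit floats; set that type explicitly instead of
        # relying on inference, which would produce 64-bit doubles.
        return pa.table({"embedding": pa.array(embeddings, type=pa.list_(pa.float32()))})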


embedded = rel.map_batches(
    EmbedBatch,
    schema={"embedding": "FLOAT[]"},
    batch_size=64,
    execution_backend="subprocess_actor",
    concurrency=2,
)

The resulting relation contains the declared embedding column. If later stages need to join embeddings back to input rows, also return a source identifier such as id from the UDF and add it to the declared schema.
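One way to carry the identifier is to return the input id column unchanged next to the embedding and declare both columns, for example with `schema={"id": "BIGINT", "embedding": "FLOAT[]"}`. In EmbedBatch, that would mean adding `batch.column("id")` to the returned table. The sketch below shows the pattern with two assumptions: a dict-of-lists stand-in for the Arrow table, and a fake two-value embedding function in place of the model. The join at the end plays the role of a later SQL stage.

```python
from typing import Any


def fake_embed(texts: list[str]) -> list[list[float]]:
    # Placeholder for self.model.encode(...): a 2-value "embedding" per text.
    return [[float(len(t)), float(t.count(" "))] for t in texts]


def embed_with_ids(batch: dict[str, list[Any]]) -> dict[str, list[Any]]:
    """Return the id column unchanged alongside the new embedding column."""
    return {"id": list(batch["id"]), "embedding": fake_embed(batch["text"])}


source = {"id": [1, 2], "text": ["hello", "vane data"]}
out = embed_with_ids(source)

# Join back on id, as a later relational stage would.
text_by_id = dict(zip(source["id"], source["text"]))
joined = [
    {"id": i, "text": text_by_id[i], "embedding": e}
    for i, e in zip(out["id"], out["embedding"])
]
```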

For distributed GPU inference, use the same callable class shape with ray_actor:

example.py
embedded = rel.map_batches(
    EmbedBatch,
    schema={"embedding": "FLOAT[]"},
    batch_size=64,
    execution_backend="ray_actor",
    gpus=1,
    concurrency=4,
)

Every Ray worker must be able to import the class and access the required model files or credentials.

One-to-many transforms

Use flat_map when one input row can produce zero, one, or many output rows.

example.py
def split_terms(row: dict) -> list[dict]:
    return [{"term": term} for term in str(row["text"]).split()]


terms = rel.flat_map(
    split_terms,
    schema={"term": "VARCHAR"},
    execution_backend="subprocess_task",
)

Typical uses include document chunking, URL expansion, nested record normalization, and filtering by returning an empty list.
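The row function below is a sketch of document chunking. It uses fixed-size word windows with overlap and follows the same row-in, list-of-rows-out shape as split_terms. The window size, overlap, and the `doc_id`/`chunk_index`/`chunk` output columns are illustrative assumptions. Returning an empty list for blank text covers the filtering case.

```python
def chunk_words(row: dict, size: int = 4, overlap: int = 1) -> list[dict]:
    """Split row["text"] into windows of `size` words sharing `overlap` words."""
    if not 0 <= overlap < size:
        raise ValueError("require 0 <= overlap < size")
    words = str(row["text"] or "").split()
    if not words:
        return []  # an empty list drops the row, so flat_map also works as a filter
    step = size - overlap
    # Stop once the remaining words are already covered by the previous window.
    starts = range(0, max(len(words) - overlap, 1), step)
    return [
        {"doc_id": row["id"], "chunk_index": index, "chunk": " ".join(words[start:start + size])}
        for index, start in enumerate(starts)
    ]


rows = [
    {"id": 1, "text": "a b c d e f g"},
    {"id": 2, "text": "   "},
]
chunks = [chunk for row in rows for chunk in chunk_words(row)]
```

Passed to flat_map with `schema={"doc_id": "BIGINT", "chunk_index": "BIGINT", "chunk": "VARCHAR"}`, each document would become one row per chunk, and blank documents would disappear.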

Scalar transforms

Use map for small row-wise scalar enrichments where a batch UDF would be unnecessary.

example.py
def normalize_text(text: str) -> str:
    return text.strip().lower()


normalized = rel.project("text").map(
    normalize_text,
    return_type="VARCHAR",
    execution_backend="subprocess_task",
)

For model calls, media decoding, or anything that benefits from vectorization, prefer map_batches.

Resources and concurrency

Keep these controls separate:

  • batch_size: rows per batch passed to map_batches.
  • gpus: GPU resource request per callable execution where supported.
  • cpus: CPU resource request per callable execution where supported.
  • concurrency: number of executors or actors where supported.

Increase batch_size before increasing concurrency if the bottleneck is model throughput. Increase concurrency when workers are underutilized and memory headroom is available.

Some backends do not accept every resource or concurrency option. Validate settings on a representative sample before scaling.
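Before changing concurrency, you can often use a small local timing loop on a sample to see where increasing batch_size stops paying off. The sketch below times any batch function at several candidate sizes using only the standard library. `rows_per_second` and `toy_batch_fn` are hypothetical. In-process timings do not capture subprocess or Ray overhead, so treat them as a first filter, not a production measurement.

```python
import time
from typing import Any, Callable


def rows_per_second(
    fn: Callable[[list[Any]], Any],
    sample: list[Any],
    batch_sizes: list[int],
    repeats: int = 3,
) -> dict[int, float]:
    """Best-of-`repeats` throughput of fn over `sample` for each batch size."""
    results: dict[int, float] = {}
    for size in batch_sizes:
        best = float("inf")
        for _ in range(repeats):
            start = time.perf_counter()
            for i in range(0, len(sample), size):
                fn(sample[i:i + size])
            best = min(best, time.perf_counter() - start)
        results[size] = len(sample) / best if best > 0 else float("inf")
    return results


def toy_batch_fn(batch: list[str]) -> list[int]:
    # Stand-in for a model call: a fixed per-call cost plus cheap per-row work.
    time.sleep(0.001)
    return [len(text) for text in batch]


sample = ["vane data"] * 2_000
throughput = rows_per_second(toy_batch_fn, sample, [16, 256, 1024], repeats=1)
```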

Streaming behavior

map_batches exposes streaming_breaker for advanced execution-boundary control. Leave it unset unless you are measuring a specific pipeline behavior or debugging a boundary.

Backend selection

Use this progression:

  1. Build the UDF locally with subprocess_task or subprocess_actor.
  2. Verify schema, row counts, and error behavior on a representative sample.
  3. Move stateless functions to ray_task only when distribution helps.
  4. Move stateful model classes to ray_actor only when actor reuse, GPUs, or cluster placement are required.
  5. Tune batch size, resource requests, and concurrency with production-like data.
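Step 2 can be scripted before any backend is involved. The sketch below is a hypothetical `dry_run` helper. It feeds a sample through a batch function in-process and reports the batch count, rows in, rows out, and which batches raised errors. It uses plain lists instead of Arrow tables and does not exercise the backend itself.

```python
from typing import Any, Callable


def dry_run(fn: Callable[[list[Any]], list[Any]], sample: list[Any], batch_size: int) -> dict[str, Any]:
    """Run fn over a sample batch by batch, counting rows and recording failures."""
    report: dict[str, Any] = {"batches": 0, "rows_in": 0, "rows_out": 0, "errors": []}
    for index, start in enumerate(range(0, len(sample), batch_size)):
        batch = sample[start:start + batch_size]
        report["batches"] += 1
        report["rows_in"] += len(batch)
        try:
            report["rows_out"] += len(fn(batch))
        except Exception as exc:  # keep going so every failing batch is listed
            report["errors"].append((index, type(exc).__name__, str(exc)))
    return report


def normalize_batch(batch: list[Any]) -> list[str]:
    # Batch version of normalize_text; like the original, it has no NULL handling.
    return [text.strip().lower() for text in batch]


sample = [" Hello ", "Vane Data", None, "ok"]
report = dry_run(normalize_batch, sample, batch_size=2)
```

In this example, the second batch fails with AttributeError because it contains None. A representative sample should include inputs like this. For flat_map-style functions, rows_out can legitimately differ from rows_in. For a function that should emit exactly one row per input, a mismatch points to a bug.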