Training Data Pipeline
This example page shows a common pattern for preparing text training or retrieval data with Vane Data.
The repository scripts that cover this area are:
- examples/common_crawl.py
- examples/minhash_dedupe.py
- examples/llms_red_pajamas.py
Goal
Prepare text data by filtering raw records, chunking documents, generating embeddings, and removing exact duplicates and near-duplicates before writing a curated dataset.
Run local samples
The scripts provide sample modes so you can inspect the workflow without Common Crawl, Red Pajama, or network access:
    python examples/common_crawl.py --source sample --limit 5
    python examples/minhash_dedupe.py --source sample --limit 10
    python examples/llms_red_pajamas.py --source sample --limit 6
Pipeline shape
- Load raw text records.
- Filter by language, source type, and minimum text length.
- Chunk text into model-sized records.
- Generate embeddings with rel.embed_text(...) or a custom UDF.
- Generate duplicate candidates with MinHash or another similarity method.
- Keep representative rows.
- Write clean chunks, metadata, and optional embeddings.
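To make the duplicate-candidate step above more concrete, here is a minimal, standard-library sketch of MinHash over character shingles. It is an illustration of the general technique only, not the implementation in examples/minhash_dedupe.py. The function names, the shingle size of 5, and the 64 hash seeds are all assumptions chosen for the example.

```python
import hashlib


def shingles(text: str, k: int = 5) -> set[str]:
    """Return the set of overlapping k-character shingles of the text."""
    # Lowercase and collapse whitespace so trivial variants compare equal.
    normalized = " ".join(text.lower().split())
    if len(normalized) <= k:
        return {normalized}
    return {normalized[i : i + k] for i in range(len(normalized) - k + 1)}


def minhash_signature(text: str, num_perm: int = 64, k: int = 5) -> list[int]:
    """For each seed, keep the smallest seeded hash over all shingles."""
    grams = shingles(text, k)
    signature = []
    for seed in range(num_perm):
        salt = seed.to_bytes(8, "little")  # one salted hash function per seed
        signature.append(
            min(
                int.from_bytes(
                    hashlib.blake2b(
                        g.encode("utf-8"), digest_size=8, salt=salt
                    ).digest(),
                    "little",
                )
                for g in grams
            )
        )
    return signature


def estimated_jaccard(sig_a: list[int], sig_b: list[int]) -> float:
    """Fraction of matching positions, which estimates Jaccard similarity."""
    if len(sig_a) != len(sig_b):
        raise ValueError("signatures must have the same length")
    return sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)
```

Pairs whose estimated similarity exceeds a chosen threshold become duplicate candidates. Comparing every pair is quadratic, so larger datasets usually add a banding (LSH) step to limit comparisons; that step is omitted here.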
Minimal code shape
The code below shows the relation contract. The chunking UDF returns all columns needed by later stages.
    import pyarrow as pa
    import vane

    con = vane.connect()

    # Load raw documents, dropping null and very short texts.
    docs = con.sql("""
        select id, text, source_uri
        from read_parquet('data/raw_text/*.parquet')
        where text is not null and length(text) > 80
    """)


    def chunk_rows(batch: pa.Table) -> pa.Table:
        """Split each document into 512-character chunks, one output row per chunk."""
        ids = batch.column("id").to_pylist()
        texts = batch.column("text").to_pylist()
        source_uris = batch.column("source_uri").to_pylist()

        out_id = []
        out_chunk_id = []
        out_text = []
        out_source_uri = []

        for row_id, text, source_uri in zip(ids, texts, source_uris, strict=False):
            chunks = [text[i : i + 512] for i in range(0, len(text), 512)]
            for chunk_id, chunk in enumerate(chunks):
                # Carry the document id and source URI onto every chunk row.
                out_id.append(row_id)
                out_chunk_id.append(chunk_id)
                out_text.append(chunk)
                out_source_uri.append(source_uri)

        return pa.table({
            "id": out_id,
            "chunk_id": out_chunk_id,
            "chunk_text": out_text,
            "source_uri": out_source_uri,
        })


    # The declared schema must match the columns returned by chunk_rows.
    chunks = docs.map_batches(
        chunk_rows,
        schema={
            "id": "BIGINT",
            "chunk_id": "BIGINT",
            "chunk_text": "VARCHAR",
            "source_uri": "VARCHAR",
        },
        batch_size=512,
        execution_backend="subprocess_task",
    )
Embeddings
rel.embed_text(...) returns a relation containing the embedding output column. If the final dataset needs the original chunk columns as well, explicitly combine the embedding column with the chunk relation after validating row counts, or use a custom UDF that returns the complete output schema.
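    embedding_only = chunks.embed_text(
        "chunk_text",
        provider="transformers",
        model="sentence-transformers/all-MiniLM-L6-v2",
        output_column="embedding",
        execution_backend="subprocess_actor",
    )

    chunk_table = chunks.to_arrow_table()
    embedding_table = embedding_only.to_arrow_table()

    # Validate row counts before combining. append_column pairs rows by
    # position, so this also assumes embed_text preserves input row order.
    if chunk_table.num_rows != embedding_table.num_rows:
        raise ValueError(
            f"row count mismatch: {chunk_table.num_rows} chunks, "
            f"{embedding_table.num_rows} embeddings"
        )

    embedded = con.from_arrow(
        chunk_table.append_column("embedding", embedding_table["embedding"])
    )
    embedded.write_parquet("output/training_chunks/")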
For large datasets, prefer the implementation pattern used by examples/common_crawl.py, where the combine step is explicit and validated, or implement embedding as a custom batch UDF that returns all required columns.
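As a rough illustration of the second option, the sketch below shows the shape of a batch function that returns every input column plus the embedding, so no separate combine step is needed. It uses plain dictionaries of lists instead of Arrow tables to stay dependency-free, and it is not the Vane UDF signature. Both `embed_batch` and `toy_embed` are hypothetical names, and `toy_embed` is a placeholder that produces deterministic numbers, not a real model.

```python
from typing import Callable, Sequence

Batch = dict[str, list]


def toy_embed(texts: Sequence[str]) -> list[list[float]]:
    """Placeholder embedder (NOT a real model): two deterministic numbers per text."""
    return [[float(len(t)), float(sum(map(ord, t)) % 97)] for t in texts]


def embed_batch(
    batch: Batch,
    embed_fn: Callable[[Sequence[str]], list[list[float]]] = toy_embed,
) -> Batch:
    """Return all input columns plus an 'embedding' column, one vector per row."""
    texts = batch["chunk_text"]
    vectors = embed_fn(texts)

    # Fail loudly if the embedder dropped or added rows.
    if len(vectors) != len(texts):
        raise ValueError(
            f"expected {len(texts)} embeddings, got {len(vectors)}"
        )

    # Copy the input columns so the caller's batch is not mutated.
    out = {name: list(values) for name, values in batch.items()}
    out["embedding"] = vectors
    return out
```

Because the embedding is attached inside the same function that reads the text, each vector stays with its own row and the output does not depend on two relations sharing the same row order.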
Scale-out
Once the pipeline runs correctly on the local path, switch the runner and move the expensive UDF or AI helper stage to a Ray execution backend:
    import vane

    vane.configure(runner="ray")

    embedding_only = chunks.embed_text(
        "chunk_text",
        provider="transformers",
        execution_backend="ray_actor",
    )
Every Ray worker must be able to import the UDF code and access the same storage, model files, and Python dependencies.
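One way to catch environment drift early is a small preflight check that each worker runs before the expensive stage starts. The sketch below uses only the standard library. The function name `preflight` and the idea of passing explicit module and path lists are assumptions for illustration, and how the check is dispatched to workers is left to your setup.

```python
import importlib.util
import os


def preflight(modules: list[str], paths: list[str]) -> dict[str, list[str]]:
    """Report modules that cannot be imported and paths that do not exist."""
    missing_modules = []
    for name in modules:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # Raised when a parent package of a dotted name is missing,
            # or when a module has no usable spec.
            found = False
        if not found:
            missing_modules.append(name)

    missing_paths = [p for p in paths if not os.path.exists(p)]
    return {"missing_modules": missing_modules, "missing_paths": missing_paths}
```

A worker whose result has two empty lists can see the listed modules and storage locations. The check does not confirm that package versions match across workers.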
Recommended output columns
- id
- chunk_id
- chunk_text
- source_uri
- embedding
- duplicate_component_id
- is_representative
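The last two columns can be derived from duplicate candidate pairs by grouping connected rows into components. The sketch below does this with a small union-find in plain Python. The function name `label_duplicates` is hypothetical, and choosing the smallest row id in each component as the representative is an assumption made here for determinism; your pipeline may prefer another rule, such as the longest text.

```python
def label_duplicates(row_ids, candidate_pairs):
    """Assign duplicate_component_id and is_representative to each row id."""
    parent = {r: r for r in row_ids}

    def find(x):
        # Follow parent links to the root, shortening the path on the way.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for a, b in candidate_pairs:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            # Keep the smaller id as the root, so every root is the
            # smallest id in its component.
            if root_b < root_a:
                root_a, root_b = root_b, root_a
            parent[root_b] = root_a

    return [
        {
            "id": r,
            "duplicate_component_id": find(r),
            "is_representative": find(r) == r,
        }
        for r in row_ids
    ]
```

For example, with rows 1 to 5 and candidate pairs (2, 3) and (3, 5), rows 2, 3, and 5 share component 2, and only row 2 in that group is marked as representative. Rows 1 and 4 have no duplicates, so each is its own representative. Filtering on is_representative then yields the deduplicated set.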
Scope notes
Vane Data does not provide a complete training-data quality framework. It provides the SQL, UDF, AI helper, and execution tools used to build that framework in your own pipeline.