⚠️ Private Access Required - This guide assumes you have private SDK access. Contact your Lume representative for installation credentials and setup assistance.
Transform your first CSV file in under 5 minutes with this step-by-step guide.
Prerequisites
- ✅ SDK access granted
- ✅ SDK installed
- ✅ CSV file accessible via S3
- ✅ Flow created via the Lume app
Step 1: Choose a Flow Version
First, you need a Flow Version - a pre-built mapping template that defines how to transform your CSV data.
```python
import lume

# Use a published flow version
flow_version = "invoice_cleaner:v4"
```
What is a Flow Version?
- A Flow is a mapping template created in the Lume UI
- A Version is an immutable snapshot of that flow (e.g., `invoice_cleaner:v4`)
- You can only run Versions, not create Flows via the API
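The `name:vN` convention shown above is easy to validate before submitting a run. A minimal sketch — `parse_flow_version` is a hypothetical helper of ours, not part of the SDK:

```python
# Hypothetical helper (not part of the SDK): split an identifier like
# "invoice_cleaner:v4" into its flow name and version number.
def parse_flow_version(identifier: str) -> tuple[str, int]:
    name, _, version = identifier.rpartition(":")
    if not name or not version.startswith("v") or not version[1:].isdigit():
        raise ValueError(f"expected '<flow_name>:v<N>', got {identifier!r}")
    return name, int(version[1:])

print(parse_flow_version("invoice_cleaner:v4"))  # ('invoice_cleaner', 4)
```

Catching a malformed identifier locally gives a clearer error than a failed run.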
Step 2: Run the Transformation

```python
# Upload and transform your CSV file
run = lume.run(
    flow_version=flow_version,
    input_files=["s3://raw-bucket/invoices_2025-06.csv"],
).wait()  # Wait for completion
```
This single call:
- Uploads your CSV file
- Applies the mapping transformation
- Validates the output
- Returns results
Step 3: Check Results
```python
# Check the status
print(run.status)  # "SUCCEEDED" | "PARTIAL_FAILED" | "FAILED" | "CRASHED"

# Download all output files (default: CSV format)
run.download_all("./output")
```
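The four terminal statuses above can drive simple branching. The policy below is ours, not the SDK's — it assumes, as the `rejects/` output in Step 4 suggests, that a partially failed run still produces downloadable files:

```python
# Illustrative handling of the run statuses listed above.
def should_download(status: str) -> bool:
    if status == "SUCCEEDED":
        return True
    if status == "PARTIAL_FAILED":
        # Some rows mapped, some rejected: outputs still exist.
        return True
    if status in ("FAILED", "CRASHED"):
        return False
    raise ValueError(f"unknown run status: {status!r}")
```

Adjust the `PARTIAL_FAILED` branch to match your own tolerance for rejected rows.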
Step 4: Analyze Output
Your transformed data is now in the `./output` directory:
```
./output/
├── mapped/part-0000.csv        # Successfully transformed data
├── rejects/part-0000.csv       # Rows that failed validation
├── metrics.json                # Summary statistics
└── validation_results.json     # Detailed validation results
```
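Once downloaded, `metrics.json` is easy to summarize with the standard library. A sketch — the keys (`error_rate`, `row_counts`) match the ones used in the Complete Example below:

```python
import json
import pathlib

# Sketch: one-line summary of a downloaded run directory.
def summarize_output(output_dir: str) -> str:
    metrics = json.loads((pathlib.Path(output_dir) / "metrics.json").read_text())
    counts = metrics["row_counts"]
    return (f"{counts['mapped']}/{counts['input']} rows mapped, "
            f"{counts['rejects']} rejected "
            f"(error rate {metrics['error_rate']:.2%})")
```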
Complete Example
```python
import json
import pathlib

import lume

# Transform the CSV file
run = lume.run(
    flow_version="invoice_cleaner:v4",
    input_files=["s3://raw-bucket/invoices_2025-06.csv"],
).wait()

# Download results as CSV (default)
run.download_all(pathlib.Path("./output"))

# Check quality metrics
with open("./output/metrics.json") as f:
    metrics = json.load(f)

print(f"Error rate: {metrics['error_rate']:.2%}")
print(f"Rows processed: {metrics['row_counts']['input']}")
print(f"Rows mapped: {metrics['row_counts']['mapped']}")
print(f"Rows rejected: {metrics['row_counts']['rejects']}")

# Act on results
if metrics["error_rate"] < 0.05:
    print("✅ Data quality is good - proceed with pipeline")
else:
    print("❌ Too many errors - investigate rejects")
```
Using Seed Files
Seed files provide reference data that can be used during transformation (e.g., lookup tables, configuration data).
```python
import pathlib

import lume

# Transform with seed files
run = lume.run(
    flow_version="invoice_cleaner:v4",
    input_files=["s3://raw-bucket/invoices.csv"],
    seed_files=[
        "s3://reference/customer_lookup.csv",
        "s3://reference/product_catalog.csv",
    ],
).wait()

# Download results
run.download_all(pathlib.Path("./output"))
```
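The join against a lookup seed happens inside the flow, not in your client code, but conceptually a seed like `customer_lookup.csv` behaves like this local sketch (the column names here are made up for illustration):

```python
import csv
import io

# Illustration only: what a lookup-table seed conceptually provides.
customer_lookup = csv.DictReader(io.StringIO(
    "customer_id,customer_name\nC-1,Acme Corp\nC-2,Globex\n"))
names = {row["customer_id"]: row["customer_name"] for row in customer_lookup}

# Each input row is enriched by looking up its key in the seed.
invoice = {"invoice_id": "INV-7", "customer_id": "C-2"}
invoice["customer_name"] = names.get(invoice["customer_id"], "UNKNOWN")
print(invoice["customer_name"])  # Globex
```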
Output Formats
You can choose between CSV and JSON output formats:
```python
# Download as CSV (default)
run.download_all("./output", output_format="csv")

# Download as JSON
run.download_all("./output", output_format="json")
```
Async Version
For non-blocking operations (web apps, event-driven systems):
```python
import asyncio
import pathlib

import lume

async def transform_data():
    run = await lume.arun(
        flow_version="invoice_cleaner:v4",
        input_files=["s3://raw-bucket/invoices.csv"],
    )

    # Optional: wait for completion
    # await run.wait_async()

    await run.download_all_async(pathlib.Path("./output"))
    print(f"Error rate: {run.metrics.error_rate}")

# Run the async function
asyncio.run(transform_data())
```
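A common reason to use the async API is launching several transformations concurrently with `asyncio.gather`. The sketch below uses a stand-in coroutine (`fake_run`) since a real `lume.arun` call needs credentials; substitute the actual call in your code:

```python
import asyncio

# `fake_run` stands in for an awaited lume.arun(...) call.
async def fake_run(name: str) -> str:
    await asyncio.sleep(0)  # placeholder for the real network round-trip
    return f"{name}: SUCCEEDED"

async def main() -> list[str]:
    files = ["invoices_jan.csv", "invoices_feb.csv"]
    # Launch all runs concurrently and collect results in order.
    return await asyncio.gather(*(fake_run(f) for f in files))

results = asyncio.run(main())
print(results)
```

`gather` preserves input order, so results line up with the file list even if runs finish out of order.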