⚠️ Private Access Required - This guide assumes you have private SDK access. Contact your Lume representative for installation credentials and setup assistance.

Transform your first CSV file in under 5 minutes with this step-by-step guide.

Prerequisites

  • ✅ SDK access granted
  • ✅ SDK installed
  • ✅ CSV file accessible via S3
  • ✅ Flow created via the Lume app

Step 1: Choose a Flow Version

First, you need a Flow Version: a versioned snapshot of a pre-built mapping template that defines how to transform your CSV data.

import lume

# Use a published flow version
flow_version = "invoice_cleaner:v4"

What is a Flow Version?

  • A Flow is a mapping template created in the Lume UI
  • A Version is an immutable snapshot of that flow (e.g., invoice_cleaner:v4)
  • You can only run Versions, not create Flows via the API
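
Flow Version identifiers pack both pieces into one string. If you ever need the parts separately (the `name:tag` split is inferred from the example above, not a documented API), Python's `partition` does the job:

```python
# The version string pairs a flow name with an immutable version tag.
flow_version = "invoice_cleaner:v4"
name, _, version = flow_version.partition(":")
# name == "invoice_cleaner", version == "v4"
```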

Step 2: Transform Your CSV File

# Upload and transform your CSV file
run = lume.run(
    flow_version=flow_version,
    input_files=["s3://raw-bucket/invoices_2025-06.csv"]
).wait()  # Wait for completion

This single call:

  1. Uploads your CSV file
  2. Applies the mapping transformation
  3. Validates the output
  4. Returns results

Step 3: Check Results

# Check the status
print(run.status)  # "SUCCEEDED" | "PARTIAL_FAILED" | "FAILED" | "CRASHED"

# Download all output files (default: CSV format)
run.download_all("./output")
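
The four statuses can drive simple control flow downstream. A minimal sketch; the mapping and helper below are our own convention, not part of the SDK:

```python
# Hypothetical helper: translate a run status into a next step.
ACTIONS = {
    "SUCCEEDED": "proceed",
    "PARTIAL_FAILED": "review rejects/ before proceeding",
    "FAILED": "inspect validation_results.json and rerun",
    "CRASHED": "retry or contact support",
}

def next_action(status: str) -> str:
    return ACTIONS.get(status, "unknown status")
```

For example, calling `next_action(run.status)` after `.wait()` returns tells you whether the output is safe to use as-is.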

Step 4: Analyze Output

Your transformed data is now in the ./output directory:

./output/
├── mapped/part-0000.csv              # Successfully transformed data
├── rejects/part-0000.csv             # Rows that failed validation
├── metrics.json                      # Summary statistics
└── validation_results.json           # Detailed validation results
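
The layout above is what a successful download should leave behind. A quick sanity check before parsing anything (the helper is illustrative, not part of the SDK):

```python
import pathlib

def outputs_present(output_dir: str) -> bool:
    """Check that the summary files from a downloaded run exist."""
    base = pathlib.Path(output_dir)
    expected = ("metrics.json", "validation_results.json")
    return all((base / name).exists() for name in expected)
```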

Complete Example

import lume
import json
import pathlib

# Transform the CSV file
run = lume.run(
    flow_version="invoice_cleaner:v4",
    input_files=["s3://raw-bucket/invoices_2025-06.csv"]
).wait()

# Download results as CSV (default)
run.download_all(pathlib.Path("./output"))

# Check quality metrics
with open("./output/metrics.json") as f:
    metrics = json.load(f)
print(f"Error rate: {metrics['error_rate']:.2%}")
print(f"Rows processed: {metrics['row_counts']['input']}")
print(f"Rows mapped: {metrics['row_counts']['mapped']}")
print(f"Rows rejected: {metrics['row_counts']['rejects']}")

# Act on results
if metrics["error_rate"] < 0.05:
    print("✅ Data quality is good - proceed with pipeline")
else:
    print("❌ Too many errors - investigate rejects")
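
The threshold check above can be factored into a reusable quality gate. The field names follow the metrics.json structure shown in this example; the function names and the 5% default are our own:

```python
def error_rate_ok(metrics: dict, threshold: float = 0.05) -> bool:
    """True when the run's error rate is below the acceptable threshold."""
    return metrics["error_rate"] < threshold

def reject_ratio(metrics: dict) -> float:
    """Fraction of input rows that landed in rejects/."""
    counts = metrics["row_counts"]
    return counts["rejects"] / counts["input"] if counts["input"] else 0.0
```

A pipeline step can then call `error_rate_ok(metrics)` and halt or alert instead of printing.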

Using Seed Files

Seed files provide reference data that can be used during transformation (e.g., lookup tables, configuration data).

import lume
import pathlib

# Transform with seed files
run = lume.run(
    flow_version="invoice_cleaner:v4",
    input_files=["s3://raw-bucket/invoices.csv"],
    seed_files=["s3://reference/customer_lookup.csv", "s3://reference/product_catalog.csv"]
).wait()

# Download results
run.download_all(pathlib.Path("./output"))
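
Conceptually, a seed file like customer_lookup.csv acts as a lookup table joined against input rows during mapping. A pure-Python illustration of the idea; the column names and values are invented for the sketch:

```python
# Rows as they might come from the seed CSV (invented columns)
customer_lookup = {"C001": "Acme Corp", "C002": "Globex"}

# Rows as they might come from the input CSV
invoices = [
    {"customer_id": "C001", "amount": 120.0},
    {"customer_id": "C999", "amount": 8.5},
]

# Enrich each invoice with the customer name, flagging misses
enriched = [
    {**row, "customer_name": customer_lookup.get(row["customer_id"], "UNKNOWN")}
    for row in invoices
]
```

Rows whose `customer_id` has no match surface as "UNKNOWN" rather than being dropped silently; the actual behavior in a real flow depends on how the mapping is configured in the Lume UI.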

Output Format Options

You can choose between CSV and JSON output formats:

# Download as CSV (default)
run.download_all("./output", output_format="csv")

# Download as JSON
run.download_all("./output", output_format="json")

Async Version

For non-blocking operations (web apps, event-driven systems):

import asyncio
import pathlib
import lume

async def transform_data():
    # Start the run without blocking the event loop
    run = await lume.arun(
        flow_version="invoice_cleaner:v4",
        input_files=["s3://raw-bucket/invoices.csv"]
    )

    # Wait for completion before downloading results
    await run.wait_async()

    await run.download_all_async(pathlib.Path("./output"))
    print(f"Error rate: {run.metrics.error_rate}")

# Run the async function
asyncio.run(transform_data())
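
Because `arun` does not block, several files can be transformed concurrently with `asyncio.gather`. The sketch below substitutes a stub coroutine for the real `lume.arun` call so the pattern stands alone:

```python
import asyncio

async def transform(path: str) -> str:
    """Stand-in for lume.arun(...) plus wait and download."""
    await asyncio.sleep(0)  # placeholder for the real network round-trip
    return f"done: {path}"

async def transform_many(paths: list[str]) -> list[str]:
    # gather() schedules all runs at once and returns results in input order
    return await asyncio.gather(*(transform(p) for p in paths))

results = asyncio.run(
    transform_many(["s3://raw-bucket/a.csv", "s3://raw-bucket/b.csv"])
)
```

Swapping the body of `transform` for the real `lume.arun` call turns this into a concurrent batch pipeline.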