⚠️ Private SDK Production Guide - Some of the features covered here require private access and custom configuration.

Learn how to integrate the Lume Python SDK into production data pipelines, ETL workflows, and other automated systems.

Overview

The SDK is designed for production use, with attention to reliability, monitoring, and scalability. This guide covers common patterns for integrating Lume with orchestration and transformation tools such as Airflow and dbt.
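
Every integration in this guide follows the same core pattern: configure the client from environment variables, start a run, wait for it to complete, check the quality metrics, and download the results. A condensed sketch of that pattern (the flow version and file paths are placeholders taken from the examples below):

import os
import lume

# Credentials and endpoint come from the environment, never hard-coded
lume.configure(
    token=os.getenv("LUME_TOKEN"),
    api_url=os.getenv("LUME_API_URL")
)

# Start a transformation and block until it completes
run = lume.run(
    flow_version="invoice_cleaner:v4",
    input_files=["s3://raw-data/invoices.csv"],
    seed_files=["s3://reference/customer_lookup.csv"],
    timeout=1800
).wait()

# Gate downstream steps on quality before using the output
if run.metrics.error_rate > 0.05:
    raise RuntimeError(f"Error rate too high: {run.metrics.error_rate:.2%}")

run.download_all("output", output_format="csv")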

Airflow Integration

Basic Airflow DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import lume
import os

def transform_csv_data(**context):
    """Transform CSV data using the Lume SDK"""
    try:
        # Configure inside the task so credentials are read at run time,
        # not every time the scheduler parses the DAG file
        lume.configure(
            token=os.getenv("LUME_TOKEN"),
            api_url="<lume_url>"
        )

        # Get file paths and flow version from the trigger configuration
        input_files = context['dag_run'].conf.get('input_files', [])
        flow_version = context['dag_run'].conf.get('flow_version', 'invoice_cleaner:v4')
        
        # Run transformation
        run = lume.run(
            flow_version=flow_version,
            input_files=input_files,
            seed_files=["s3://reference/customer_lookup.csv"],
            timeout=1800  # 30 minutes for large files
        ).wait()
        
        # Log metrics
        context['task_instance'].xcom_push(
            key='lume_metrics',
            value={
                'error_rate': run.metrics.error_rate,
                'runtime_seconds': run.metrics.runtime_seconds
            }
        )
        
        # Download results
        output_dir = f"/tmp/lume_output_{context['run_id']}"
        run.download_all(output_dir, output_format="csv")
        
        # Check quality thresholds
        if run.metrics.error_rate > 0.05:
            raise ValueError(f"Error rate too high: {run.metrics.error_rate:.2%}")
        
        return output_dir
        
    except Exception as e:
        # Log error and notify support
        context['task_instance'].log.error(f"Lume transformation failed: {e}")
        raise

# Define DAG
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'lume_csv_transformation',
    default_args=default_args,
    description='Transform CSV data using Lume SDK',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    catchup=False,
    tags=['lume', 'enterprise', 'data-transformation']
)

transform_task = PythonOperator(
    task_id='transform_csv_data',
    python_callable=transform_csv_data,
    dag=dag,
)
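
The task reads input_files and flow_version from dag_run.conf, so each run can target different files. A minimal sketch of passing that configuration from an upstream DAG with TriggerDagRunOperator (the triggering task name and file path are illustrative, not part of the SDK):

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Kicks off the transformation DAG above with run-specific configuration
trigger_lume = TriggerDagRunOperator(
    task_id="trigger_lume_csv_transformation",
    trigger_dag_id="lume_csv_transformation",
    conf={
        "input_files": ["s3://raw-data/invoices_2025_01.csv"],  # example path
        "flow_version": "invoice_cleaner:v4",
    },
)

The same configuration can also be passed from the command line, for example: airflow dags trigger lume_csv_transformation --conf '{"input_files": ["s3://raw-data/invoices_2025_01.csv"]}'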

dbt Integration

dbt Model with Lume Preprocessing

-- models/staging/stg_invoices.sql
-- This model depends on Lume-transformed data
-- The transformation happens in a separate process

{{ config(materialized='table') }}

SELECT 
    invoice_id,
    customer_id,
    invoice_date,
    amount,
    currency,
    status
FROM {{ ref('lume_transformed_invoices') }}
WHERE error_rate < 0.05  -- Only high-quality data
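
For this model to find data, the output of the Lume run has to land in the warehouse as lume_transformed_invoices. One minimal way to load the downloaded CSVs, assuming a pandas + SQLAlchemy-compatible warehouse (the connection URL and file name are assumptions, not SDK features):

import os

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical warehouse URL and the directory produced by run.download_all(...)
engine = create_engine(os.environ["WAREHOUSE_URL"])
df = pd.read_csv("/tmp/lume_output/invoices.csv")

# Replace the table that stg_invoices.sql selects from
df.to_sql("lume_transformed_invoices", engine, if_exists="replace", index=False)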

Running the Lume Transformation Before dbt

dbt macros and run_query execute SQL through the adapter, so a macro cannot run the Python SDK directly. Instead, run the Lume transformation as a separate step that finishes before dbt run: have your orchestrator (for example, the Airflow DAG above) execute the transformation script in the next section, load its output into the warehouse as lume_transformed_invoices, and then start the dbt build. If dbt needs to know where the files were downloaded, pass the directory as a dbt var (for example --vars 'lume_output_dir: /tmp/lume_output'), as shown in the sketch below.
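
A minimal sketch of that wiring in Airflow, assuming dbt is installed on the worker and the project lives at /opt/dbt (both paths are assumptions, adjust to your deployment):

from airflow.operators.bash import BashOperator

# Run the dbt build only after the Lume transformation task succeeds
dbt_run = BashOperator(
    task_id="dbt_run_staging",
    bash_command=(
        "dbt run --select staging "
        "--project-dir /opt/dbt --profiles-dir /opt/dbt "
        "--vars 'lume_output_dir: /tmp/lume_output'"
    ),
    dag=dag,
)

# transform_task is the PythonOperator defined in the Airflow DAG above
transform_task >> dbt_run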

Transformation Script

# scripts/transform_data.py
import lume
import os
import json
from pathlib import Path

def main():
    # Configure
    lume.configure(
        token=os.getenv("LUME_TOKEN"),
        api_url=os.getenv("LUME_API_URL")
    )
    
    # Run transformation
    run = lume.run(
        flow_version="invoice_cleaner:v4",
        input_files=["s3://raw-data/invoices.csv"],
        seed_files=["s3://reference/customer_lookup.csv"],
        timeout=1800
    ).wait()
    
    # Download results
    output_dir = Path("output")
    output_dir.mkdir(exist_ok=True)
    
    run.download_all(output_dir, output_format="csv")
    
    # Save metrics for quality check
    metrics = {
        'error_rate': run.metrics.error_rate,
        'runtime_seconds': run.metrics.runtime_seconds
    }
    
    with open(output_dir / "metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)
    
    # Fail if quality is poor
    if run.metrics.error_rate > 0.05:
        raise RuntimeError(f"Error rate too high: {run.metrics.error_rate:.2%}")

if __name__ == "__main__":
    main()
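
Because the script writes metrics.json alongside the output, downstream jobs can re-check quality without calling Lume again. A small gate that could run before the warehouse load (the threshold mirrors the script above; the path is the script's output directory):

import json
from pathlib import Path

# Re-read the metrics written by scripts/transform_data.py and stop the
# pipeline if the run did not meet the quality bar
metrics = json.loads(Path("output/metrics.json").read_text())
if metrics["error_rate"] > 0.05:
    raise SystemExit(f"Blocking downstream load: error rate {metrics['error_rate']:.2%}")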

Best Practices

Error Handling

import logging

import lume

logger = logging.getLogger("lume_pipeline")

def safe_quality_run(flow_version, input_files, **kwargs):
    """Safe wrapper for runs with proper error handling"""
    try:
        run = lume.run(
            flow_version=flow_version,
            input_files=input_files,
            **kwargs
        ).wait()

        # Check quality standards before handing data downstream
        if run.metrics.error_rate > 0.05:
            raise ValueError(f"Error rate exceeds 5%: {run.metrics.error_rate:.2%}")

        return run

    except Exception as e:
        # Log for monitoring, then re-raise so the caller or orchestrator can alert
        logger.exception("Lume run failed: %s", e)
        raise
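
Transient failures (network issues, timeouts) are often worth retrying before alerting anyone. A simple wrapper around safe_quality_run with exponential backoff (the attempt count and delays are illustrative, not SDK defaults):

import time

def run_with_retries(flow_version, input_files, attempts=3, base_delay=30, **kwargs):
    """Retry transient failures with exponential backoff before giving up."""
    for attempt in range(1, attempts + 1):
        try:
            return safe_quality_run(flow_version, input_files, **kwargs)
        except ValueError:
            # Quality-gate failures are not transient; surface them immediately
            raise
        except Exception as e:
            if attempt == attempts:
                logger.error("Run failed after %d attempts: %s", attempts, e)
                raise
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning("Attempt %d failed (%s); retrying in %ds", attempt, e, delay)
            time.sleep(delay)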