Production Patterns
Real-world integration examples for production environments
⚠️ Private SDK Production Guide - Some of the features covered here require private access and custom configuration.
Learn how to integrate the Lume Python SDK into production data pipelines, ETL workflows, and other production systems.
Overview
The SDK is designed for production use, with support for reliability, monitoring, and scalability. This guide covers the integration patterns most commonly used with Lume: orchestrating runs from Airflow, feeding dbt models, and wrapping runs in quality gates and error handling.
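Every pattern below follows the same core loop: configure the SDK from the environment, start a run, wait for it to finish, check its metrics, and download the output. A minimal sketch, using the same flow version and file paths as the examples that follow:

import os
import lume

# Credentials and endpoint come from the environment, never from code
lume.configure(
    token=os.getenv("LUME_TOKEN"),
    api_url=os.getenv("LUME_API_URL")
)

# Run, wait, inspect, download: the loop every integration below builds on
run = lume.run(
    flow_version="invoice_cleaner:v4",
    input_files=["s3://raw-data/invoices.csv"]
).wait()

print(f"Error rate: {run.metrics.error_rate:.2%}")
run.download_all("output", output_format="csv")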
Airflow Integration
Basic Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import lume
import os

# Configure environment
lume.configure(
    token=os.getenv("LUME_TOKEN"),
    api_url="<lume_url>"
)

def transform_csv_data(**context):
    """Transform CSV data using Lume SDK"""
    try:
        # Get file paths from Airflow context
        input_files = context['dag_run'].conf.get('input_files', [])
        flow_version = context['dag_run'].conf.get('flow_version', 'invoice_cleaner:v4')

        # Run transformation
        run = lume.run(
            flow_version=flow_version,
            input_files=input_files,
            seed_files=["s3://reference/customer_lookup.csv"],
            timeout=1800  # 30 minutes for large files
        ).wait()

        # Log metrics
        context['task_instance'].xcom_push(
            key='lume_metrics',
            value={
                'error_rate': run.metrics.error_rate,
                'runtime_seconds': run.metrics.runtime_seconds
            }
        )

        # Download results
        output_dir = f"/tmp/lume_output_{context['run_id']}"
        run.download_all(output_dir, output_format="csv")

        # Check quality thresholds
        if run.metrics.error_rate > 0.05:
            raise ValueError(f"Error rate too high: {run.metrics.error_rate:.2%}")

        return output_dir

    except Exception as e:
        # Log error and notify support
        context['task_instance'].log.error(f"Lume transformation failed: {e}")
        raise

# Define DAG
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'lume_csv_transformation',
    default_args=default_args,
    description='Transform CSV data using Lume SDK',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    catchup=False,
    tags=['lume', 'enterprise', 'data-transformation']
)

transform_task = PythonOperator(
    task_id='transform_csv_data',
    python_callable=transform_csv_data,
    dag=dag,
)
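Because the task reads input_files and flow_version from dag_run.conf, both can be supplied at trigger time. For example, with the Airflow CLI:

airflow dags trigger lume_csv_transformation \
  --conf '{"input_files": ["s3://raw-data/invoices.csv"], "flow_version": "invoice_cleaner:v4"}'

The file path here is the same placeholder used elsewhere in this guide; omitting flow_version falls back to the invoice_cleaner:v4 default in the task.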
dbt Integration
dbt Model with Lume Preprocessing
-- models/staging/stg_invoices.sql
-- This model depends on Lume-transformed data
-- The transformation happens in a separate process

{{ config(materialized='table') }}

SELECT
    invoice_id,
    customer_id,
    invoice_date,
    amount,
    currency,
    status
FROM {{ ref('lume_transformed_invoices') }}
WHERE error_rate < 0.05  -- Only high-quality data
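The ref above assumes the Lume output has already been loaded into the warehouse as lume_transformed_invoices, and that the transformed rows carry an error_rate column. How that load happens depends on your stack; a hypothetical loader using pandas and SQLAlchemy (the connection string and file path are placeholders) might look like:

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical warehouse connection; substitute your own DSN
engine = create_engine("postgresql://user:password@warehouse:5432/analytics")

# Load the CSV Lume downloaded into the table the dbt model refs
df = pd.read_csv("/tmp/lume_output/invoices.csv")
df.to_sql("lume_transformed_invoices", engine, if_exists="replace", index=False)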
dbt Hook for Lume Transformation
-- macros/run_lume_transformation.sql
{#
  Note: dbt macros can only execute SQL (via run_query), not Python. This
  macro renders the Python snippet and logs it for an external executor,
  such as an orchestrator task or the standalone script below.
#}
{% macro run_lume_transformation(input_files, flow_version) %}
  {% set python_code %}
import lume
import os

# Configure
lume.configure(
    token=os.getenv("LUME_TOKEN"),
    api_url="<lume_api_url>"
)

# Run transformation
run = lume.run(
    flow_version="{{ flow_version }}",
    input_files={{ input_files }},
    seed_files=["s3://reference/customer_lookup.csv"],
    timeout=1800
).wait()

# Download to dbt staging area
output_dir = "{{ var('lume_output_dir', '/tmp/lume_output') }}"
run.download_all(output_dir, output_format="csv")

print(f"Transformation completed: {run.metrics.error_rate:.2%} error rate")
  {% endset %}

  {# Hand the rendered snippet to an external runner; run_query cannot execute Python #}
  {% do log(python_code, info=True) %}
{% endmacro %}
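To render and log the macro from the command line (for example, as a CI step before dbt run), dbt's run-operation command passes macro arguments as JSON:

dbt run-operation run_lume_transformation \
  --args '{"input_files": ["s3://raw-data/invoices.csv"], "flow_version": "invoice_cleaner:v4"}'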
Transformation Script
# scripts/transform_data.py
import lume
import os
import json
from pathlib import Path

def main():
    # Configure
    lume.configure(
        token=os.getenv("LUME_TOKEN"),
        api_url=os.getenv("LUME_API_URL")
    )

    # Run transformation
    run = lume.run(
        flow_version="invoice_cleaner:v4",
        input_files=["s3://raw-data/invoices.csv"],
        seed_files=["s3://reference/customer_lookup.csv"],
        timeout=1800
    ).wait()

    # Download results
    output_dir = Path("output")
    output_dir.mkdir(exist_ok=True)
    run.download_all(output_dir, output_format="csv")

    # Save metrics for quality check
    metrics = {
        'error_rate': run.metrics.error_rate,
        'runtime_seconds': run.metrics.runtime_seconds
    }
    with open(output_dir / "metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)

    # Fail if quality is poor
    if run.metrics.error_rate > 0.05:
        raise RuntimeError(f"Error rate too high: {run.metrics.error_rate:.2%}")

if __name__ == "__main__":
    main()
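In a pipeline this script usually acts as a quality gate in front of dbt: because it raises when the error rate is too high, a thin wrapper can stop downstream models from building on bad data. A minimal sketch using the standard library (the dbt selector is illustrative):

import subprocess
import sys

# Run the Lume transformation first; it exits nonzero if the quality gate trips
result = subprocess.run([sys.executable, "scripts/transform_data.py"])
if result.returncode != 0:
    sys.exit("Lume transformation failed or quality gate tripped; skipping dbt")

# Build the staging model only after the gate passes
subprocess.run(["dbt", "run", "--select", "stg_invoices"], check=True)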
Best Practices
Error Handling
import logging

logger = logging.getLogger(__name__)

def safe_quality_run(flow_version, input_files, **kwargs):
    """Safe wrapper for runs with proper error handling"""
    try:
        run = lume.run(
            flow_version=flow_version,
            input_files=input_files,
            **kwargs
        ).wait()

        # Check quality standards
        if run.metrics.error_rate > 0.05:
            raise ValueError(f"Error rate exceeds 5%: {run.metrics.error_rate:.2%}")

        return run

    except Exception as e:
        # Log for monitoring, then re-raise so the caller can notify support
        logger.error(f"Run failed: {e}")
        raise
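Transient failures such as network blips or temporary capacity limits are usually worth retrying with backoff before alerting anyone. A minimal sketch building on safe_quality_run and the logger above (attempt counts and delays are illustrative):

import time

def run_with_retries(flow_version, input_files, attempts=3, base_delay=30, **kwargs):
    """Retry safe_quality_run with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return safe_quality_run(flow_version, input_files, **kwargs)
        except Exception as e:
            if attempt == attempts:
                raise  # out of retries; let the orchestrator handle it
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning(f"Attempt {attempt} failed ({e}); retrying in {delay}s")
            time.sleep(delay)

Note that quality failures (the ValueError from the error-rate check) are retried here as well; if re-running a flow on the same input cannot improve quality, catch that case separately and fail fast.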