Master advanced features and optimization techniques to build robust, scalable data transformation pipelines.

Batch Processing

Process Multiple Files Efficiently

import lume
import asyncio
from typing import List
from pathlib import Path

def process_batch_sync(files: List[str], flow_version: str, batch_size: int = 5):
    """Process files in batches synchronously."""
    results = []
    
    for i in range(0, len(files), batch_size):
        batch = files[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {len(batch)} files")
        
        run = lume.run(
            flow_version=flow_version,
            input_files=batch,
            retry_policy=lume.Retry(times=3, backoff=2.0)
        ).wait()
        
        results.append(run)
    
    return results

async def process_batch_async(files: List[str], flow_version: str, batch_size: int = 5):
    """Process files in batches asynchronously."""
    results = []
    
    for i in range(0, len(files), batch_size):
        batch = files[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {len(batch)} files")
        
        run = await lume.arun(
            flow_version=flow_version,
            input_files=batch
        )
        await run.wait_async()
        
        results.append(run)
    
    return results

# Usage
files = [
    "s3://bucket/file1.csv",
    "s3://bucket/file2.csv",
    "s3://bucket/file3.csv",
    # ... more files
]

# Sync processing
runs = process_batch_sync(files, "invoice_cleaner:v4", batch_size=3)

# Async processing
runs = asyncio.run(process_batch_async(files, "invoice_cleaner:v4", batch_size=3))

Concurrent Processing

import asyncio
import lume
from typing import List

async def process_concurrent(files: List[str], flow_version: str, max_concurrent: int = 3):
    """Process multiple files concurrently with rate limiting."""
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_single(file: str):
        async with semaphore:
            run = await lume.arun(
                flow_version=flow_version,
                input_files=[file]
            )
            await run.wait_async()
            return run
    
    tasks = [process_single(file) for file in files]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Separate successful runs from raised exceptions
    successful_runs = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    
    return successful_runs, failures

# Usage
files = ["s3://bucket/file1.csv", "s3://bucket/file2.csv", "s3://bucket/file3.csv"]
successful, failed = asyncio.run(process_concurrent(files, "invoice_cleaner:v4"))

Error Handling & Resilience

Comprehensive Error Handling

import lume
import time
from lume.exceptions import (
    AuthenticationError, ValidationError, NotFoundError,
    RateLimitError, TimeoutError, ServerError
)
import logging
from typing import List, Optional

logger = logging.getLogger(__name__)

class LumeTransformer:
    def __init__(self, flow_version: str, max_retries: int = 3):
        self.flow_version = flow_version
        self.max_retries = max_retries
    
    def transform_with_retries(self, input_files: List[str]) -> Optional[lume.Run]:
        """Transform files with comprehensive error handling and retry logic."""
        
        for attempt in range(self.max_retries):
            try:
                logger.info(f"Attempt {attempt + 1} for files: {input_files}")
                
                run = lume.run(
                    flow_version=self.flow_version,
                    input_files=input_files,
                    retry_policy=lume.Retry(times=2, backoff=2.0)
                ).wait()
                
                logger.info(f"Success: Run {run.id} completed with status {run.status}")
                return run
                
            except AuthenticationError as e:
                logger.error(f"Authentication failed: {e}")
                raise  # Don't retry auth errors
                
            except ValidationError as e:
                logger.error(f"Validation error: {e}")
                raise  # Don't retry validation errors
                
            except NotFoundError as e:
                logger.error(f"Resource not found: {e}")
                raise  # Don't retry not found errors
                
            except RateLimitError as e:
                logger.warning(f"Rate limited (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                raise
                
            except TimeoutError as e:
                logger.warning(f"Timeout (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    continue
                raise
                
            except ServerError as e:
                logger.error(f"Server error (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)
                    continue
                raise
                
            except Exception as e:
                logger.error(f"Unexpected error (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    continue
                raise
        
        return None

# Usage
transformer = LumeTransformer("invoice_cleaner:v4")
run = transformer.transform_with_retries(["s3://bucket/file.csv"])

Circuit Breaker Pattern

import lume
import time
from enum import Enum
from typing import Callable

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Circuit is open, requests fail fast
    HALF_OPEN = "half_open"  # Testing if service is back

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: type | tuple = Exception  # a class or tuple of classes
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    def call(self, func: Callable, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
            
        except self.expected_exception:
            self._on_failure()
            raise
    
    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage
from lume.exceptions import ServerError, TimeoutError

breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=60.0,
    expected_exception=(ServerError, TimeoutError)
)

try:
    run = breaker.call(
        lambda: lume.run(
            flow_version="invoice_cleaner:v4",
            input_files=["s3://bucket/file.csv"]
        ).wait()
    )
except Exception as e:
    print(f"Circuit breaker prevented call: {e}")

Performance Optimization

Streaming and Chunking

import lume
import pandas as pd
from typing import Iterator
import tempfile
import os

def chunk_large_file(file_path: str, chunk_size: int = 10000) -> Iterator[str]:
    """Split a large CSV file into smaller chunk files, yielding each path."""
    
    df_iterator = pd.read_csv(file_path, chunksize=chunk_size)
    
    for chunk in df_iterator:
        # Write the chunk to its own temporary file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
            chunk.to_csv(f, index=False)
        # Yield after the handle is closed so the file is fully flushed
        yield f.name

def process_large_file_streaming(file_path: str, flow_version: str):
    """Process a large file by streaming it in chunks."""
    
    results = []
    
    for chunk_file in chunk_large_file(file_path, chunk_size=5000):
        try:
            run = lume.run(
                flow_version=flow_version,
                input_files=[chunk_file]
            ).wait()
            
            results.append(run)
            
        finally:
            # Clean up temporary file
            os.unlink(chunk_file)
    
    return results

# Usage
results = process_large_file_streaming("large_file.csv", "invoice_cleaner:v4")

Memory-Efficient Processing

import lume
import gc
import psutil
from typing import List

class MemoryAwareProcessor:
    def __init__(self, memory_threshold: float = 0.8):
        self.memory_threshold = memory_threshold
    
    def get_memory_usage(self) -> float:
        """Get current memory usage as a percentage."""
        return psutil.virtual_memory().percent / 100
    
    def should_gc(self) -> bool:
        """Check if garbage collection is needed."""
        return self.get_memory_usage() > self.memory_threshold
    
    def process_with_memory_management(self, files: List[str], flow_version: str):
        """Process files with automatic memory management."""
        
        results = []
        
        for i, file in enumerate(files):
            print(f"Processing file {i+1}/{len(files)}: {file}")
            
            # Check memory usage
            if self.should_gc():
                print("High memory usage detected, running garbage collection...")
                gc.collect()
            
            # Process file
            run = lume.run(
                flow_version=flow_version,
                input_files=[file]
            ).wait()
            
            results.append(run)
            
            # Force garbage collection after each file
            gc.collect()
        
        return results

# Usage
processor = MemoryAwareProcessor(memory_threshold=0.75)
results = processor.process_with_memory_management(files, "invoice_cleaner:v4")

Monitoring & Observability

Custom Metrics Collection

import lume
import json
import psutil
from dataclasses import dataclass, asdict
from typing import Dict, Any, List
from datetime import datetime

@dataclass
class TransformMetrics:
    run_id: str
    flow_version: str
    input_files: List[str]
    start_time: datetime
    end_time: datetime
    duration_seconds: float
    status: str
    error_rate: float
    input_rows: int
    output_rows: int
    reject_rows: int
    memory_usage_mb: float
    cpu_usage_percent: float

class MetricsCollector:
    def __init__(self):
        self.metrics_history = []
    
    def collect_metrics(self, run: lume.Run, start_time: datetime) -> TransformMetrics:
        """Collect comprehensive metrics for a run."""
        
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        
        # Get system metrics
        memory_mb = psutil.virtual_memory().used / (1024 * 1024)
        cpu_percent = psutil.cpu_percent()
        
        metrics = TransformMetrics(
            run_id=run.id,
            flow_version=run.flow_version,
            input_files=run.input_files,
            start_time=start_time,
            end_time=end_time,
            duration_seconds=duration,
            status=run.status,
            error_rate=run.metrics.error_rate,
            input_rows=run.metrics.row_counts.input,
            output_rows=run.metrics.row_counts.mapped,
            reject_rows=run.metrics.row_counts.rejects,
            memory_usage_mb=memory_mb,
            cpu_usage_percent=cpu_percent
        )
        
        self.metrics_history.append(metrics)
        return metrics
    
    def export_metrics(self, filepath: str):
        """Export metrics to JSON file."""
        with open(filepath, 'w') as f:
            json.dump([asdict(m) for m in self.metrics_history], f, indent=2, default=str)
    
    def get_summary_stats(self) -> Dict[str, Any]:
        """Get summary statistics from collected metrics."""
        if not self.metrics_history:
            return {}
        
        durations = [m.duration_seconds for m in self.metrics_history]
        error_rates = [m.error_rate for m in self.metrics_history]
        
        return {
            "total_runs": len(self.metrics_history),
            "avg_duration_seconds": sum(durations) / len(durations),
            "avg_error_rate": sum(error_rates) / len(error_rates),
            "successful_runs": len([m for m in self.metrics_history if m.status == "SUCCEEDED"]),
            "failed_runs": len([m for m in self.metrics_history if m.status in ["FAILED", "CRASHED"]])
        }

# Usage
collector = MetricsCollector()

start_time = datetime.now()
run = lume.run(flow_version="invoice_cleaner:v4", input_files=files).wait()
metrics = collector.collect_metrics(run, start_time)

print(f"Run completed in {metrics.duration_seconds:.2f} seconds")
print(f"Error rate: {metrics.error_rate:.2%}")

# Export metrics
collector.export_metrics("transform_metrics.json")
print("Summary:", collector.get_summary_stats())

Real-time Monitoring

import lume
import asyncio
import logging
from typing import Callable, Optional

class RealTimeMonitor:
    def __init__(self, on_status_change: Optional[Callable] = None):
        self.on_status_change = on_status_change
        self.logger = logging.getLogger(__name__)
    
    async def monitor_run(self, run: lume.Run, poll_interval: float = 5.0):
        """Monitor a run in real-time with status updates."""
        
        last_status = run.status
        self.logger.info(f"Starting monitoring for run {run.id}")
        
        while run.status in ["QUEUED", "RUNNING"]:
            await asyncio.sleep(poll_interval)
            await run.refresh()
            
            if run.status != last_status:
                self.logger.info(f"Status changed: {last_status}{run.status}")
                last_status = run.status
                
                if self.on_status_change:
                    self.on_status_change(run)
        
        # Final status
        self.logger.info(f"Run {run.id} completed with status: {run.status}")
        
        if run.status == "SUCCEEDED":
            self.logger.info(f"Success! Processed {run.metrics.row_counts.input} rows")
        elif run.status == "PARTIAL_FAILED":
            self.logger.warning(f"Partial failure. Error rate: {run.metrics.error_rate:.2%}")
        else:
            self.logger.error(f"Run failed with status: {run.status}")
        
        return run

# Usage
def status_callback(run: lume.Run):
    print(f"Status update: {run.status}")

monitor = RealTimeMonitor(on_status_change=status_callback)

async def main():
    run = await lume.arun(
        flow_version="invoice_cleaner:v4",
        input_files=["s3://bucket/file.csv"]
    )
    
    await monitor.monitor_run(run, poll_interval=10.0)

asyncio.run(main())

Advanced Configuration

Custom Configuration Management

import lume
import os
from typing import List
from dataclasses import dataclass

@dataclass
class LumeConfig:
    token: str
    api_url: str = "https://api.lume.com"
    timeout: int = 300
    max_retries: int = 3
    retry_backoff: float = 2.0
    max_concurrent: int = 5
    memory_threshold: float = 0.8
    
    @classmethod
    def from_env(cls) -> 'LumeConfig':
        """Create config from environment variables."""
        return cls(
            token=os.environ["LUME_TOKEN"],
            api_url=os.environ.get("LUME_API_URL", "https://api.lume.com"),
            timeout=int(os.environ.get("LUME_TIMEOUT", "300")),
            max_retries=int(os.environ.get("LUME_MAX_RETRIES", "3")),
            retry_backoff=float(os.environ.get("LUME_RETRY_BACKOFF", "2.0")),
            max_concurrent=int(os.environ.get("LUME_MAX_CONCURRENT", "5")),
            memory_threshold=float(os.environ.get("LUME_MEMORY_THRESHOLD", "0.8"))
        )
    
    def to_env(self):
        """Set environment variables from config."""
        os.environ["LUME_TOKEN"] = self.token
        os.environ["LUME_API_URL"] = self.api_url
        os.environ["LUME_TIMEOUT"] = str(self.timeout)
        os.environ["LUME_MAX_RETRIES"] = str(self.max_retries)
        os.environ["LUME_RETRY_BACKOFF"] = str(self.retry_backoff)
        os.environ["LUME_MAX_CONCURRENT"] = str(self.max_concurrent)
        os.environ["LUME_MEMORY_THRESHOLD"] = str(self.memory_threshold)

class AdvancedLumeClient:
    def __init__(self, config: LumeConfig):
        self.config = config
        self.config.to_env()
        
        # Configure retry policy
        self.retry_policy = lume.Retry(
            times=config.max_retries,
            backoff=config.retry_backoff
        )
    
    def run_with_config(self, flow_version: str, input_files: List[str]) -> lume.Run:
        """Run transformation with custom configuration."""
        return lume.run(
            flow_version=flow_version,
            input_files=input_files,
            retry_policy=self.retry_policy
        ).wait()

# Usage
config = LumeConfig.from_env()
client = AdvancedLumeClient(config)

run = client.run_with_config(
    flow_version="invoice_cleaner:v4",
    input_files=["s3://bucket/file.csv"]
)

Best Practices Summary

Performance

  • Use batch processing for multiple files
  • Implement concurrent processing with rate limiting
  • Monitor memory usage and implement garbage collection
  • Use streaming for large files

Reliability

  • Implement comprehensive error handling
  • Use circuit breaker patterns for fault tolerance
  • Implement exponential backoff for retries (see the sketch after this list)
  • Monitor and log all operations
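
As a reference for the backoff bullet above, here is a minimal retry decorator sketch. It assumes, as in the error-handling examples earlier, that RateLimitError and ServerError are the transient errors worth retrying; adjust the tuple to your deployment.

import time
import functools
import lume
from lume.exceptions import RateLimitError, ServerError

def with_backoff(times: int = 3, base_delay: float = 1.0):
    """Retry a callable with exponential backoff on transient errors."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(times):
                try:
                    return func(*args, **kwargs)
                except (RateLimitError, ServerError):
                    if attempt == times - 1:
                        raise  # Out of attempts, propagate the error
                    time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
        return wrapper
    return decorator

@with_backoff(times=3)
def transform(files):
    return lume.run(flow_version="invoice_cleaner:v4", input_files=files).wait()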

Monitoring

  • Collect detailed metrics for all runs
  • Implement real-time monitoring
  • Export metrics for analysis
  • Set up alerts for failures (a webhook sketch follows this list)
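
The alerting bullet can be wired into the RealTimeMonitor callback shown earlier. A minimal sketch, assuming a generic incoming-webhook endpoint (the URL here is hypothetical):

import json
import urllib.request
import lume

ALERT_WEBHOOK_URL = "https://hooks.example.com/alerts"  # hypothetical endpoint

def alert_on_failure(run: lume.Run):
    """Post an alert whenever a monitored run enters a failed state."""
    if run.status in ["FAILED", "CRASHED", "PARTIAL_FAILED"]:
        payload = json.dumps({"run_id": run.id, "status": run.status}).encode()
        request = urllib.request.Request(
            ALERT_WEBHOOK_URL,
            data=payload,
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(request)

# Plug the hook into the monitor's status-change callback
monitor = RealTimeMonitor(on_status_change=alert_on_failure)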

Configuration

  • Use environment variables for configuration
  • Implement configuration validation (see the sketch after this list)
  • Support different environments (dev, staging, prod)
  • Use type hints for better code quality
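
To illustrate the validation and multi-environment bullets, here is a minimal sketch building on the LumeConfig dataclass above. The LUME_ENV variable and the per-environment URLs are assumptions; substitute your actual endpoints.

import os

ENV_API_URLS = {  # assumed per-environment endpoints
    "dev": "https://api.dev.lume.com",
    "staging": "https://api.staging.lume.com",
    "prod": "https://api.lume.com",
}

def load_validated_config() -> LumeConfig:
    """Build a LumeConfig for the current environment and fail fast on bad values."""
    env = os.environ.get("LUME_ENV", "dev")
    if env not in ENV_API_URLS:
        raise ValueError(f"Unknown LUME_ENV: {env!r}")
    
    config = LumeConfig.from_env()
    config.api_url = ENV_API_URLS[env]
    
    if not config.token:
        raise ValueError("LUME_TOKEN must be set")
    if config.max_retries < 0:
        raise ValueError("max_retries must be non-negative")
    if not 0 < config.memory_threshold <= 1:
        raise ValueError("memory_threshold must be in (0, 1]")
    return config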