Advanced Usage
Advanced features and optimization techniques
Master advanced features and optimization techniques to build robust, scalable data transformation pipelines.
Batch Processing
Process Multiple Files Efficiently
import lume
import asyncio
from typing import List

def process_batch_sync(files: List[str], flow_version: str, batch_size: int = 5):
    """Process files in batches synchronously."""
    results = []
    for i in range(0, len(files), batch_size):
        batch = files[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {len(batch)} files")
        run = lume.run(
            flow_version=flow_version,
            input_files=batch,
            retry_policy=lume.Retry(times=3, backoff=2.0)
        ).wait()
        results.append(run)
    return results

async def process_batch_async(files: List[str], flow_version: str, batch_size: int = 5):
    """Process files in batches asynchronously."""
    results = []
    for i in range(0, len(files), batch_size):
        batch = files[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {len(batch)} files")
        run = await lume.arun(
            flow_version=flow_version,
            input_files=batch
        )
        await run.wait_async()
        results.append(run)
    return results

# Usage
files = [
    "s3://bucket/file1.csv",
    "s3://bucket/file2.csv",
    "s3://bucket/file3.csv",
    # ... more files
]

# Sync processing
runs = process_batch_sync(files, "invoice_cleaner:v4", batch_size=3)

# Async processing
runs = asyncio.run(process_batch_async(files, "invoice_cleaner:v4", batch_size=3))
Concurrent Processing
import asyncio
import lume
from typing import List

async def process_concurrent(files: List[str], flow_version: str, max_concurrent: int = 3):
    """Process multiple files concurrently with rate limiting."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_single(file: str):
        async with semaphore:
            run = await lume.arun(
                flow_version=flow_version,
                input_files=[file]
            )
            await run.wait_async()
            return run

    tasks = [process_single(file) for file in files]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter out exceptions
    successful_runs = [r for r in results if not isinstance(r, Exception)]
    failed_runs = [r for r in results if isinstance(r, Exception)]
    return successful_runs, failed_runs

# Usage
files = ["s3://bucket/file1.csv", "s3://bucket/file2.csv", "s3://bucket/file3.csv"]
successful, failed = asyncio.run(process_concurrent(files, "invoice_cleaner:v4"))
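Note that process_concurrent returns failures as bare exception objects, so you lose track of which file produced which error. Because asyncio.gather preserves input order, a small variant can map each failure back to its source file. A minimal sketch reusing the same lume calls as above (process_concurrent_mapped is an illustrative helper, not part of the SDK):

import asyncio
import lume
from typing import Dict, List, Tuple

async def process_concurrent_mapped(
    files: List[str], flow_version: str, max_concurrent: int = 3
) -> Tuple[List[lume.Run], Dict[str, Exception]]:
    """Like process_concurrent, but reports which file each failure came from."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_single(file: str) -> lume.Run:
        async with semaphore:
            run = await lume.arun(flow_version=flow_version, input_files=[file])
            await run.wait_async()
            return run

    results = await asyncio.gather(
        *(process_single(f) for f in files), return_exceptions=True
    )
    # gather preserves input order, so zip pairs each result with its file
    failures = {f: r for f, r in zip(files, results) if isinstance(r, Exception)}
    successes = [r for r in results if not isinstance(r, Exception)]
    return successes, failures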
Error Handling & Resilience
Comprehensive Error Handling
import time
import logging
from typing import List, Optional

import lume
from lume.exceptions import (
    AuthenticationError, ValidationError, NotFoundError,
    RateLimitError, TimeoutError, ServerError
)

logger = logging.getLogger(__name__)

class LumeTransformer:
    def __init__(self, flow_version: str, max_retries: int = 3):
        self.flow_version = flow_version
        self.max_retries = max_retries

    def transform_with_fallback(self, input_files: List[str]) -> Optional[lume.Run]:
        """Transform files with comprehensive error handling and fallback strategies."""
        for attempt in range(self.max_retries):
            try:
                logger.info(f"Attempt {attempt + 1} for files: {input_files}")
                run = lume.run(
                    flow_version=self.flow_version,
                    input_files=input_files,
                    retry_policy=lume.Retry(times=2, backoff=2.0)
                ).wait()
                logger.info(f"Success: Run {run.id} completed with status {run.status}")
                return run
            except AuthenticationError as e:
                logger.error(f"Authentication failed: {e}")
                raise  # Don't retry auth errors
            except ValidationError as e:
                logger.error(f"Validation error: {e}")
                raise  # Don't retry validation errors
            except NotFoundError as e:
                logger.error(f"Resource not found: {e}")
                raise  # Don't retry not found errors
            except RateLimitError as e:
                logger.warning(f"Rate limited (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                raise
            except TimeoutError as e:
                logger.warning(f"Timeout (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    continue
                raise
            except ServerError as e:
                logger.error(f"Server error (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)
                    continue
                raise
            except Exception as e:
                logger.error(f"Unexpected error (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    continue
                raise
        return None

# Usage
transformer = LumeTransformer("invoice_cleaner:v4")
run = transformer.transform_with_fallback(["s3://bucket/file.csv"])
Circuit Breaker Pattern
import lume
import time
from enum import Enum
from typing import Callable, Tuple, Type, Union

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Circuit is open, requests fail fast
    HALF_OPEN = "half_open"  # Testing if service is back

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: Union[Type[Exception], Tuple[Type[Exception], ...]] = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func: Callable, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage
breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=60.0,
    expected_exception=(lume.ServerError, lume.TimeoutError)
)

try:
    run = breaker.call(
        lambda: lume.run(
            flow_version="invoice_cleaner:v4",
            input_files=["s3://bucket/file.csv"]
        ).wait()
    )
except Exception as e:
    print(f"Circuit breaker prevented call: {e}")
Performance Optimization
Streaming and Chunking
import lume
import pandas as pd
from typing import Iterator
import tempfile
import os

def chunk_large_file(file_path: str, chunk_size: int = 10000) -> Iterator[str]:
    """Split a large CSV file into smaller chunks."""
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # Write each chunk to a temporary file, closing it before handing back the path
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
            chunk.to_csv(f, index=False)
        yield f.name

def process_large_file_streaming(file_path: str, flow_version: str):
    """Process a large file by streaming it in chunks."""
    results = []
    for chunk_file in chunk_large_file(file_path, chunk_size=5000):
        try:
            run = lume.run(
                flow_version=flow_version,
                input_files=[chunk_file]
            ).wait()
            results.append(run)
        finally:
            # Clean up temporary file
            os.unlink(chunk_file)
    return results

# Usage
results = process_large_file_streaming("large_file.csv", "invoice_cleaner:v4")
Memory-Efficient Processing
import gc
import psutil
from typing import List

import lume

class MemoryAwareProcessor:
    def __init__(self, memory_threshold: float = 0.8):
        self.memory_threshold = memory_threshold

    def get_memory_usage(self) -> float:
        """Get current memory usage as a fraction (0.0-1.0)."""
        return psutil.virtual_memory().percent / 100

    def should_gc(self) -> bool:
        """Check if garbage collection is needed."""
        return self.get_memory_usage() > self.memory_threshold

    def process_with_memory_management(self, files: List[str], flow_version: str):
        """Process files with automatic memory management."""
        results = []
        for i, file in enumerate(files):
            print(f"Processing file {i+1}/{len(files)}: {file}")

            # Check memory usage
            if self.should_gc():
                print("High memory usage detected, running garbage collection...")
                gc.collect()

            # Process file
            run = lume.run(
                flow_version=flow_version,
                input_files=[file]
            ).wait()
            results.append(run)

            # Force garbage collection after each file
            gc.collect()
        return results

# Usage
processor = MemoryAwareProcessor(memory_threshold=0.75)
results = processor.process_with_memory_management(files, "invoice_cleaner:v4")
Monitoring & Observability
Custom Metrics Collection
import json
import psutil
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict, List

import lume

@dataclass
class TransformMetrics:
    run_id: str
    flow_version: str
    input_files: List[str]
    start_time: datetime
    end_time: datetime
    duration_seconds: float
    status: str
    error_rate: float
    input_rows: int
    output_rows: int
    reject_rows: int
    memory_usage_mb: float
    cpu_usage_percent: float

class MetricsCollector:
    def __init__(self):
        self.metrics_history = []

    def collect_metrics(self, run: lume.Run, start_time: datetime) -> TransformMetrics:
        """Collect comprehensive metrics for a run."""
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        # Get system metrics
        memory_mb = psutil.virtual_memory().used / (1024 * 1024)
        cpu_percent = psutil.cpu_percent()

        metrics = TransformMetrics(
            run_id=run.id,
            flow_version=run.flow_version,
            input_files=run.input_files,
            start_time=start_time,
            end_time=end_time,
            duration_seconds=duration,
            status=run.status,
            error_rate=run.metrics.error_rate,
            input_rows=run.metrics.row_counts.input,
            output_rows=run.metrics.row_counts.mapped,
            reject_rows=run.metrics.row_counts.rejects,
            memory_usage_mb=memory_mb,
            cpu_usage_percent=cpu_percent
        )
        self.metrics_history.append(metrics)
        return metrics

    def export_metrics(self, filepath: str):
        """Export metrics to a JSON file."""
        with open(filepath, 'w') as f:
            json.dump([asdict(m) for m in self.metrics_history], f, indent=2, default=str)

    def get_summary_stats(self) -> Dict[str, Any]:
        """Get summary statistics from collected metrics."""
        if not self.metrics_history:
            return {}
        durations = [m.duration_seconds for m in self.metrics_history]
        error_rates = [m.error_rate for m in self.metrics_history]
        return {
            "total_runs": len(self.metrics_history),
            "avg_duration_seconds": sum(durations) / len(durations),
            "avg_error_rate": sum(error_rates) / len(error_rates),
            "successful_runs": len([m for m in self.metrics_history if m.status == "SUCCEEDED"]),
            "failed_runs": len([m for m in self.metrics_history if m.status in ["FAILED", "CRASHED"]])
        }

# Usage
collector = MetricsCollector()
start_time = datetime.now()

run = lume.run(flow_version="invoice_cleaner:v4", input_files=files).wait()
metrics = collector.collect_metrics(run, start_time)

print(f"Run completed in {metrics.duration_seconds:.2f} seconds")
print(f"Error rate: {metrics.error_rate:.2%}")

# Export metrics
collector.export_metrics("transform_metrics.json")
print("Summary:", collector.get_summary_stats())
Real-time Monitoring
import asyncio
import logging
from typing import Callable, Optional

import lume

class RealTimeMonitor:
    def __init__(self, on_status_change: Optional[Callable] = None):
        self.on_status_change = on_status_change
        self.logger = logging.getLogger(__name__)

    async def monitor_run(self, run: lume.Run, poll_interval: float = 5.0):
        """Monitor a run in real-time with status updates."""
        last_status = run.status
        self.logger.info(f"Starting monitoring for run {run.id}")

        while run.status in ["QUEUED", "RUNNING"]:
            await asyncio.sleep(poll_interval)
            await run.refresh()
            if run.status != last_status:
                self.logger.info(f"Status changed: {last_status} → {run.status}")
                last_status = run.status
                if self.on_status_change:
                    self.on_status_change(run)

        # Final status
        self.logger.info(f"Run {run.id} completed with status: {run.status}")
        if run.status == "SUCCEEDED":
            self.logger.info(f"Success! Processed {run.metrics.row_counts.input} rows")
        elif run.status == "PARTIAL_FAILED":
            self.logger.warning(f"Partial failure. Error rate: {run.metrics.error_rate:.2%}")
        else:
            self.logger.error(f"Run failed with status: {run.status}")
        return run

# Usage
def status_callback(run: lume.Run):
    print(f"Status update: {run.status}")

monitor = RealTimeMonitor(on_status_change=status_callback)

async def main():
    run = await lume.arun(
        flow_version="invoice_cleaner:v4",
        input_files=["s3://bucket/file.csv"]
    )
    await monitor.monitor_run(run, poll_interval=10.0)

asyncio.run(main())
Advanced Configuration
Custom Configuration Management
import os
from dataclasses import dataclass
from typing import List

import lume

@dataclass
class LumeConfig:
    token: str
    api_url: str = "https://api.lume.com"
    timeout: int = 300
    max_retries: int = 3
    retry_backoff: float = 2.0
    max_concurrent: int = 5
    memory_threshold: float = 0.8

    @classmethod
    def from_env(cls) -> 'LumeConfig':
        """Create config from environment variables."""
        return cls(
            token=os.environ["LUME_TOKEN"],
            api_url=os.environ.get("LUME_API_URL", "https://api.lume.com"),
            timeout=int(os.environ.get("LUME_TIMEOUT", "300")),
            max_retries=int(os.environ.get("LUME_MAX_RETRIES", "3")),
            retry_backoff=float(os.environ.get("LUME_RETRY_BACKOFF", "2.0")),
            max_concurrent=int(os.environ.get("LUME_MAX_CONCURRENT", "5")),
            memory_threshold=float(os.environ.get("LUME_MEMORY_THRESHOLD", "0.8"))
        )

    def to_env(self):
        """Set environment variables from config."""
        os.environ["LUME_TOKEN"] = self.token
        os.environ["LUME_API_URL"] = self.api_url
        os.environ["LUME_TIMEOUT"] = str(self.timeout)
        os.environ["LUME_MAX_RETRIES"] = str(self.max_retries)
        os.environ["LUME_RETRY_BACKOFF"] = str(self.retry_backoff)
        os.environ["LUME_MAX_CONCURRENT"] = str(self.max_concurrent)
        os.environ["LUME_MEMORY_THRESHOLD"] = str(self.memory_threshold)

class AdvancedLumeClient:
    def __init__(self, config: LumeConfig):
        self.config = config
        self.config.to_env()
        # Configure retry policy
        self.retry_policy = lume.Retry(
            times=config.max_retries,
            backoff=config.retry_backoff
        )

    def run_with_config(self, flow_version: str, input_files: List[str]) -> lume.Run:
        """Run transformation with custom configuration."""
        return lume.run(
            flow_version=flow_version,
            input_files=input_files,
            retry_policy=self.retry_policy
        ).wait()

# Usage
config = LumeConfig.from_env()
client = AdvancedLumeClient(config)
run = client.run_with_config(
    flow_version="invoice_cleaner:v4",
    input_files=["s3://bucket/file.csv"]
)
Best Practices Summary
Performance
- Use batch processing for multiple files
- Implement concurrent processing with rate limiting
- Monitor memory usage and implement garbage collection
- Use streaming for large files
Reliability
- Implement comprehensive error handling
- Use circuit breaker patterns for fault tolerance
- Implement exponential backoff for retries
- Monitor and log all operations
Monitoring
- Collect detailed metrics for all runs
- Implement real-time monitoring
- Export metrics for analysis
- Set up alerts for failures (see the sketch below)
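The examples above do not include alerting, but the on_status_change hook on RealTimeMonitor is a natural place to attach one. A minimal sketch that posts failed statuses to a webhook (the requests dependency and the ALERT_WEBHOOK_URL endpoint are illustrative assumptions, not part of the lume SDK):

import lume
import requests

# Hypothetical alerting endpoint; substitute your own (Slack, PagerDuty, etc.)
ALERT_WEBHOOK_URL = "https://hooks.example.com/lume-alerts"

def alert_on_failure(run: lume.Run):
    """RealTimeMonitor callback: raise an alert when a run enters a failed state."""
    if run.status in ["FAILED", "CRASHED", "PARTIAL_FAILED"]:
        requests.post(ALERT_WEBHOOK_URL, json={
            "run_id": run.id,
            "flow_version": run.flow_version,
            "status": run.status,
        }, timeout=10)

monitor = RealTimeMonitor(on_status_change=alert_on_failure)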
Configuration
- Use environment variables for configuration
- Implement configuration validation (see the sketch after this list)
- Support different environments (dev, staging, prod)
- Use type hints for better code quality
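For the validation and multi-environment points above, a minimal sketch building on the LumeConfig dataclass from the previous section (the LUME_ENV variable and the per-environment URLs are illustrative assumptions):

import os

def validate_config(config: LumeConfig) -> None:
    """Fail fast on obviously invalid configuration values."""
    if not config.token:
        raise ValueError("LUME_TOKEN must not be empty")
    if config.timeout <= 0 or config.max_retries < 0:
        raise ValueError("timeout must be positive and max_retries non-negative")
    if not 0 < config.memory_threshold <= 1:
        raise ValueError("memory_threshold must be in (0, 1]")

# Hypothetical per-environment API URLs; adjust to your deployment
API_URLS = {
    "dev": "https://api.dev.lume.com",
    "staging": "https://api.staging.lume.com",
    "prod": "https://api.lume.com",
}

def config_for_env(env: str) -> LumeConfig:
    """Build and validate a config for the given environment."""
    config = LumeConfig.from_env()
    config.api_url = API_URLS[env]
    validate_config(config)
    return config

config = config_for_env(os.environ.get("LUME_ENV", "dev"))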