This example demonstrates a complete, realistic, and robust workflow for a production application. The application triggers a Lume pipeline and then uses a webhook to asynchronously receive and process the results when the run is complete.
This architectural pattern is highly recommended for event-driven systems as it is more scalable than polling for status.
Scenario
A Python application needs to process new customer registration CSVs as they arrive. The process is:
- A new CSV file of raw customer data is uploaded to a specific S3 location.
- The application triggers a Lume pipeline, pointing to the S3 object’s path.
- The application does not block or wait. It relies on Lume to call it back when the job is done.
- When Lume sends a webhook notification, the application fetches the run status and, on success, adds the event to a downstream processing queue.
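For orientation before reading the code: the webhook handler below assumes Lume POSTs a JSON payload containing at least the fields the handler reads (run_id, status, run_metadata, and an error object on failure). The sketch below illustrates that shape as a Python dict; the run_id value is made up, and the authoritative schema is whatever Lume documents for its webhooks.

# Illustrative webhook payload, expressed as a Python dict.
# Field names mirror what handle_lume_webhook() reads; the values are hypothetical.
example_event = {
    "run_id": "run_123",
    "status": "SUCCEEDED",   # or "FAILED" / "CRASHED"
    "run_metadata": {        # echoed back from the triggering call
        "triggered_by": "webhook_app",
        "s3_path": "s3://lume-demo-source-bucket/raw_customers/new_customers.csv",
    },
    "error": None,           # on failure, an object with at least a "code" field
}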
The Application Code
This example uses the Flask web framework (`pip install Flask`).
# app.py
import lume
import os
import hmac
import hashlib
import json
import boto3
from flask import Flask, request, abort, jsonify

# --- Application Setup ---
app = Flask(__name__)
s3 = boto3.client('s3')

# Lume API Key and Webhook Secret must be configured.
# For production, use a secure secrets manager.
lume.init(api_key=os.environ.get("LUME_API_KEY"))
LUME_WEBHOOK_SECRET = os.environ.get("LUME_WEBHOOK_SECRET")
if not LUME_WEBHOOK_SECRET:
    raise ValueError("LUME_WEBHOOK_SECRET environment variable not set.")

# --- 1. The Triggering Logic ---
def trigger_customer_pipeline(s3_path: str):
    """
    Triggers a Lume run for a specific file in S3.
    """
    print(f"Triggering Lume pipeline for: {s3_path}")
    # The flow_version must be configured with S3 connectors.
    try:
        run = lume.run(
            flow_version="customer_cleaner:v1",
            source_path=s3_path,
            # We can pass custom metadata that we'll receive in the webhook
            run_metadata={"triggered_by": "webhook_app", "s3_path": s3_path}
        )
        print(f"Successfully started run {run.id} for path {s3_path}")
        return run.id
    except Exception as e:
        print(f"An error occurred during trigger: {e}")
        return None

# --- 2. The Webhook Listener ---
@app.route("/lume-webhook", methods=["POST"])
def handle_lume_webhook():
    """
    Receives and processes notifications from Lume.
    """
    # a. Securely verify the webhook signature
    signature = request.headers.get("X-Lume-Signature-256")
    if not signature or not verify_signature(request.data, signature):
        print("Invalid webhook signature. Aborting.")
        abort(403)

    # b. Process the event payload
    event = request.get_json()
    run_id = event.get("run_id")
    status = event.get("status")
    custom_meta = event.get("run_metadata", {})
    print(f"Received valid webhook for Run ID: {run_id} | Status: {status}")

    # c. Take action based on the run status
    if status == "SUCCEEDED":
        print(f"Run {run_id} succeeded. Adding to downstream processor.")
        # In a real app, you would add this job to a queue (e.g., RabbitMQ, SQS).
        # The job would contain the run_id and any necessary metadata from the event.
        process_successful_run(run_id)
    elif status in ["FAILED", "CRASHED"]:
        error_info = event.get('error', {})
        print(f"Run {run_id} failed. Notifying data team. Error: {error_info.get('code')}")
        # Add alerting logic here (e.g., PagerDuty, Slack)

    return jsonify({"status": "received"}), 200

def verify_signature(payload, header_signature):
    """Verifies the Lume webhook signature."""
    computed_signature = hmac.new(
        key=LUME_WEBHOOK_SECRET.encode("utf-8"),
        msg=payload,
        digestmod=hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(computed_signature, header_signature)

# --- 3. The Downstream Action (Simulated) ---
def process_successful_run(run_id: str):
    """
    This function simulates a background worker that processes a successful run.
    It would fetch the final run metadata to get the output location and process the data.
    """
    print(f"[Worker] Processing successful run {run_id}...")
    try:
        run = lume.run_status(run_id)
        # The target location is available in the run's metadata
        target_location = run.metadata.get('target', {}).get('mapped_path')
        if target_location:
            print(f"[Worker] Results are available at: {target_location}")
            # Add logic here to download and process the results from S3
        else:
            print(f"[Worker] Could not determine target location for run {run_id}")
    except Exception as e:
        print(f"[Worker] Error processing results for run {run_id}: {e}")
        # Add to a retry queue

# --- Main execution block ---
if __name__ == '__main__':
    # This part simulates an event that triggers the pipeline.
    # In a real app, this might be a file watcher or a message queue consumer.
    print("--- Simulating new file arrival and triggering pipeline ---")

    # Create and upload a dummy file to S3
    source_bucket = "lume-demo-source-bucket"
    source_key = "raw_customers/new_customers.csv"
    s3_path = f"s3://{source_bucket}/{source_key}"
    with open("new_customers.csv", "w") as f:
        f.write("id,name,email\n")
        f.write("1,John Doe,john.doe@example.com\n")
        f.write("2,JANE SMITH, jane.smith@example.com\n")
    s3.upload_file("new_customers.csv", source_bucket, source_key)

    trigger_customer_pipeline(s3_path)

    # This part runs the web server to listen for the webhook.
    print("\n--- Starting web server to listen for Lume webhooks ---")
    print("Run `ngrok http 5000` and configure the webhook URL in the Lume UI.")
    app.run(port=5000)
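As the comment in the SUCCEEDED branch notes, a real deployment would usually hand successful runs to a queue rather than processing them inside the webhook request. The sketch below shows one way this might look with Amazon SQS via boto3; the queue URL is a placeholder and the message shape is only illustrative.

import json

import boto3

sqs = boto3.client("sqs")

# Placeholder queue URL; replace with your own queue.
RESULTS_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/lume-run-results"

def enqueue_successful_run(run_id: str, run_metadata: dict) -> None:
    """Hand the run off to a background worker via SQS instead of processing it inline."""
    sqs.send_message(
        QueueUrl=RESULTS_QUEUE_URL,
        MessageBody=json.dumps({"run_id": run_id, "run_metadata": run_metadata}),
    )

With this in place, the SUCCEEDED branch in handle_lume_webhook() could call enqueue_successful_run(run_id, custom_meta) and return immediately, keeping the webhook response fast while a separate worker performs the work of process_successful_run().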
How to Run This Example
- Install Dependencies: `pip install Flask lume boto3`
- Set Environment Variables:
  export LUME_API_KEY="your_api_key_here"
  export LUME_WEBHOOK_SECRET="your_webhook_secret_from_lume_ui"
  # Also ensure your environment is configured with AWS credentials for boto3
- Expose Your Local Server: Lume’s cloud platform needs to be able to reach your local machine. Use a tool like ngrok to create a secure public URL for your local server, e.g. `ngrok http 5000`. This will give you a public URL like `https://<unique_id>.ngrok.io`.
- Configure Webhook in Lume: In the Lume UI for your `customer_cleaner` Flow, add a new Webhook Notification pointing to your ngrok URL (e.g., `https://<unique_id>.ngrok.io/lume-webhook`).
- Run the Application: `python app.py`. The script will first trigger the run and then start the web server. When Lume finishes the run, you will see the webhook request printed in your terminal, followed by the simulated worker processing.
Sample Data
(Contents of `new_customers.csv` uploaded to S3)
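id,name,email
1,John Doe,john.doe@example.com
2,JANE SMITH, jane.smith@example.com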
Sample Output
(Contents of the mapped CSV file in the target S3 location, available to the background worker)
Notice how the Flow Version standardized the name and email (trimming whitespace), prefixed the ID, and added a default `is_premium` flag.
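The simulated worker above stops at printing the target location. If, as in this example, the mapped_path is an s3:// URI, downloading it for further processing might look roughly like the sketch below; the URI parsing and local filename are assumptions to adapt to your own setup.

from urllib.parse import urlparse

import boto3

s3 = boto3.client("s3")

def download_mapped_output(target_location: str, local_path: str = "mapped_customers.csv") -> str:
    """Download the mapped output file referenced by an s3:// URI in the run metadata."""
    parsed = urlparse(target_location)   # e.g. s3://bucket/path/to/file.csv
    bucket = parsed.netloc
    key = parsed.path.lstrip("/")
    s3.download_file(bucket, key, local_path)
    return local_path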