# app.py
import lume
import os
import hmac
import hashlib
import json
import boto3
from flask import Flask, request, abort, jsonify
# --- Application Setup ---
app = Flask(__name__)
# S3 client; used below by the __main__ demo to upload the sample input file.
s3 = boto3.client("s3")

# Credentials are read from the environment. For production, source them
# from a secure secrets manager instead.
lume.init(api_key=os.getenv("LUME_API_KEY"))

# Refuse to start without a signing secret: every incoming webhook is
# authenticated against it in verify_signature().
LUME_WEBHOOK_SECRET = os.getenv("LUME_WEBHOOK_SECRET")
if not LUME_WEBHOOK_SECRET:
    raise ValueError("LUME_WEBHOOK_SECRET environment variable not set.")
# --- 1. The Triggering Logic ---
def trigger_customer_pipeline(s3_path: str):
    """
    Start a ``customer_cleaner:v1`` Lume run over a single S3 object.

    Args:
        s3_path: ``s3://`` URI of the file to process.

    Returns:
        The id of the started run, or None if anything in the attempt raised
        (the error is printed, not propagated).
    """
    print(f"Triggering Lume pipeline for: {s3_path}")
    # Custom metadata attached to the run; it is delivered back to us in the
    # run's webhook notification.
    metadata = {"triggered_by": "webhook_app", "s3_path": s3_path}
    try:
        # The flow_version must be configured with S3 connectors.
        started = lume.run(
            flow_version="customer_cleaner:v1",
            source_path=s3_path,
            run_metadata=metadata,
        )
        print(f"Successfully started run {started.id} for path {s3_path}")
        return started.id
    except Exception as exc:
        print(f"An error occurred during trigger: {exc}")
        return None
# --- 2. The Webhook Listener ---
@app.route("/lume-webhook", methods=["POST"])
def handle_lume_webhook():
    """
    Receive and process run-status notifications from Lume.

    The raw request body is authenticated with the ``X-Lume-Signature-256``
    HMAC header before it is parsed.

    Returns:
        403 if the signature header is missing or does not match, 400 if the
        verified body is not a JSON object, otherwise 200 with
        ``{"status": "received"}``.
    """
    # a. Securely verify the webhook signature over the exact bytes received.
    # get_data() is used instead of request.data because Werkzeug leaves
    # request.data empty for form-encoded content types.
    payload = request.get_data()
    signature = request.headers.get("X-Lume-Signature-256")
    if not signature or not verify_signature(payload, signature):
        print("Invalid webhook signature. Aborting.")
        abort(403)

    # b. Parse the same bytes that were verified. request.get_json() returns
    # None (or raises 415) when the Content-Type is not JSON, which made the
    # .get() calls below crash with a 500.
    try:
        event = json.loads(payload)
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass it
        abort(400)
    if not isinstance(event, dict):
        abort(400)

    run_id = event.get("run_id")
    status = event.get("status")
    print(f"Received valid webhook for Run ID: {run_id} | Status: {status}")

    # c. Take action based on the run status
    if status == "SUCCEEDED":
        print(f"Run {run_id} succeeded. Adding to downstream processor.")
        # In a real app, you would add this job to a queue (e.g., RabbitMQ, SQS)
        # instead of running it inline, since it calls back into the Lume API
        # while this webhook request is still open. The job would contain the
        # run_id and any necessary metadata from the event (e.g. "run_metadata").
        process_successful_run(run_id)
    elif status in ("FAILED", "CRASHED"):
        # "error" may be absent, null, or not an object; never let that turn
        # the acknowledgement into a 500.
        error_info = event.get("error")
        error_code = error_info.get("code") if isinstance(error_info, dict) else None
        print(f"Run {run_id} failed. Notifying data team. Error: {error_code}")
        # Add alerting logic here (e.g., PagerDuty, Slack)
    return jsonify({"status": "received"}), 200
def verify_signature(payload, header_signature, secret=None):
    """
    Verify a Lume webhook signature.

    Computes the hex-encoded HMAC-SHA256 of *payload* and compares it with
    *header_signature* in constant time.

    Args:
        payload: Raw request body bytes, exactly as received.
        header_signature: Value of the ``X-Lume-Signature-256`` header.
        secret: Signing secret as str or bytes. Defaults to the module-level
            ``LUME_WEBHOOK_SECRET``.

    Returns:
        True if the signature matches, False otherwise (including when the
        header contains characters a hex digest never has).
    """
    if secret is None:
        secret = LUME_WEBHOOK_SECRET
    key = secret.encode("utf-8") if isinstance(secret, str) else secret
    computed_signature = hmac.new(
        key=key,
        msg=payload,
        digestmod=hashlib.sha256,
    ).hexdigest()
    # Compare as bytes: compare_digest raises TypeError for str arguments that
    # contain non-ASCII characters, and the header value is attacker-controlled.
    return hmac.compare_digest(
        computed_signature.encode("ascii"),
        header_signature.encode("utf-8"),
    )
# --- 3. The Downstream Action (Simulated) ---
def process_successful_run(run_id: str):
    """
    Simulate a background worker that handles a successful Lume run.

    Fetches the run with ``lume.run_status`` and reports the output location
    found at ``run.metadata["target"]["mapped_path"]``. Any exception raised
    along the way is printed rather than propagated, so the caller is not
    interrupted by a worker failure.

    Args:
        run_id: Id of the Lume run that finished successfully.
    """
    print(f"[Worker] Processing successful run {run_id}...")
    try:
        run = lume.run_status(run_id)
        # The target location is available in the run's metadata. Treat missing
        # *or null* metadata / "target" as "no location" instead of letting
        # None.get(...) raise and be reported as an unexpected error.
        metadata = run.metadata or {}
        target = metadata.get("target") or {}
        target_location = target.get("mapped_path")
        if target_location:
            print(f"[Worker] Results are available at: {target_location}")
            # Add logic here to download and process the results from S3
        else:
            print(f"[Worker] Could not determine target location for run {run_id}")
    except Exception as e:
        print(f"[Worker] Error processing results for run {run_id}: {e}")
        # Add to a retry queue
# --- Main execution block ---
if __name__ == '__main__':
    # This part simulates an event that triggers the pipeline.
    # In a real app, this might be a file watcher or a message queue consumer.
    print("--- Simulating new file arrival and triggering pipeline ---")

    # Create and upload a dummy file to S3. The s3:// URI is derived from the
    # bucket and key so the upload target and the pipeline input cannot drift.
    source_bucket = "lume-demo-source-bucket"
    source_key = "raw_customers/new_customers.csv"
    local_file = "new_customers.csv"
    s3_path = f"s3://{source_bucket}/{source_key}"

    # Row 2 has an upper-case name and a space before the email, presumably
    # as sample dirt for the cleaning flow to fix.
    with open(local_file, "w", encoding="utf-8", newline="") as f:
        f.write("id,name,email\n")
        f.write("1,John Doe,john.doe@example.com\n")
        f.write("2,JANE SMITH, jane.smith@example.com\n")
    s3.upload_file(local_file, source_bucket, source_key)

    run_id = trigger_customer_pipeline(s3_path)
    if run_id is None:
        # The server still starts, but no webhook will arrive for this file.
        print("Pipeline trigger failed; no webhook is expected for this run.")

    # This part runs the web server to listen for the webhook.
    print("\n--- Starting web server to listen for Lume webhooks ---")
    print("Run `ngrok http 5000` and configure the webhook URL in the Lume UI.")
    app.run(port=5000)