This example demonstrates how to trigger a run that processes data already loaded into a relational database like PostgreSQL or Snowflake.
Scenario: An external process loads invoice data into a `raw_invoices` table in your PostgreSQL database. All records for a given batch are tagged with the same `batch_id`.
The Flow Version `invoice_processor:v1` is configured in the Lume UI with:
- Source Connector: Your PostgreSQL database.
- Source Tables: `raw_invoices`.
- Filtering Logic: A `WHERE` clause like `batch_id = '{source_path}'` (see the sketch below for how the placeholder resolves).
- Target Connector: Your Snowflake data warehouse.
- Target Table: `clean_invoices`.
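To make the filtering behavior concrete, here is a minimal sketch of how the `'{source_path}'` placeholder resolves for a single run. It is an illustration only: the substitution happens inside Lume against the pre-configured Connector, and the variable names and example batch id below are hypothetical.

```python
# Illustration only: Lume performs this substitution server-side; none of this
# runs in your code. The names and example value below are hypothetical.
filter_template = "batch_id = '{source_path}'"   # as configured in the Flow Version

example_batch_id = "your-batch-id"               # the value you pass as source_path
resolved_filter = filter_template.format(source_path=example_batch_id)

print(resolved_filter)
# -> batch_id = 'your-batch-id'
# Conceptually, the run then reads only the matching rows, e.g.:
#   SELECT * FROM raw_invoices WHERE batch_id = 'your-batch-id'
```

Because the filter is keyed on `batch_id`, each run picks up only the rows inserted for that batch, so the pattern below can be repeated safely for every new batch.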
```python
import lume
import uuid
import psycopg2  # Example using the PostgreSQL driver

# --- 1. Prepare and Insert the Batch Data ---
# In a real workflow, this data would come from an upstream source.
# The `db_batch_id` is just a string that uniquely identifies the records for this run.
db_batch_id = str(uuid.uuid4())
new_invoices = [
    (db_batch_id, "inv_123", "cust_abc", 100.50),
    (db_batch_id, "inv_124", "cust_xyz", 75.00),
]

# Connect to your database and insert the new records.
# This part of the code runs on your infrastructure.
conn = psycopg2.connect("dbname=yourdb user=youruser password=yourpass host=yourhost")
with conn.cursor() as cur:
    for record in new_invoices:
        cur.execute(
            "INSERT INTO raw_invoices (batch_id, invoice_id, customer_id, amount) VALUES (%s, %s, %s, %s)",
            record,
        )
conn.commit()

print(f"Inserted {len(new_invoices)} records with batch_id: {db_batch_id}")

# --- 2. Trigger the Run ---
# Lume uses the pre-configured Connector and injects the `source_path`
# into the filtering logic defined in the Flow Version.
run = lume.run(
    flow_version="invoice_processor:v1",
    source_path=db_batch_id,
)
print(f"Started run {run.id} for database batch_id: {db_batch_id}")

# --- 3. Monitor and Finalize ---
# `run.wait()` blocks execution until the pipeline is complete.
print("Waiting for run to complete...")
run.wait()
print(f"Run {run.id} finished with status: {run.status}")

if run.status == 'SUCCEEDED':
    metrics = run.metadata.get('metrics', {})
    results = run.metadata.get('results', {})
    print(f"  - Input Rows: {results.get('input_rows')}")
    print(f"  - Mapped Rows: {results.get('mapped_rows')}")
    print(f"  - Duration: {metrics.get('duration_seconds', 0):.2f}s")
    # The transformed data is now available in the `clean_invoices` table in Snowflake.
else:
    error_info = run.metadata.get('error', {})
    print(f"  - Error Code: {error_info.get('code')}")
    print(f"  - Error Message: {error_info.get('message')}")
```
Final Result

Sample Input

(Records in the PostgreSQL `raw_invoices` table where `batch_id` matches the `source_path`)

| batch_id | invoice_id | customer_id | amount |
| --- | --- | --- | --- |
| … | inv_123 | cust_abc | 100.50 |
| … | inv_124 | cust_xyz | 75.00 |
Sample Output

(New records created in the Snowflake `clean_invoices` table by the Lume pipeline)

| invoiceId | customerId | amountCents | processedAt |
| --- | --- | --- | --- |
| INV-123 | CUST-ABC | 10050 | 2024-07-30T10:00:00Z |
| INV-124 | CUST-XYZ | 7500 | 2024-07-30T10:00:00Z |
Notice how the Flow Version transformed the data: it standardized IDs, converted the amount to cents, and added a processing timestamp.
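The transformation itself is defined in the Flow Version and executed inside Lume, but as a rough mental model, the mapping visible in the tables above is equivalent to something like the sketch below. The `transform` function and its field-handling rules are illustrative assumptions, not Lume's actual implementation.

```python
# Illustrative only: mirrors the mapping shown in the sample tables above.
# The real transformation is configured in the Flow Version and run by Lume.
from datetime import datetime, timezone

def transform(row: dict) -> dict:
    """Hypothetical per-row mapping from raw_invoices to clean_invoices."""
    return {
        "invoiceId": row["invoice_id"].upper().replace("_", "-"),    # inv_123  -> INV-123
        "customerId": row["customer_id"].upper().replace("_", "-"),  # cust_abc -> CUST-ABC
        "amountCents": int(round(row["amount"] * 100)),              # 100.50   -> 10050
        "processedAt": datetime.now(timezone.utc).isoformat(),       # processing timestamp
    }

print(transform({"invoice_id": "inv_123", "customer_id": "cust_abc", "amount": 100.50}))
```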
For production workflows, consider using Webhooks instead of `run.wait()` for more efficient, event-driven monitoring.
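As a minimal sketch of that event-driven pattern, the receiver below assumes Lume can be configured to POST a JSON payload containing the run id and final status to an endpoint you host. The endpoint path and the payload field names (`run_id`, `status`) are assumptions for illustration, not a documented schema.

```python
# Hypothetical webhook receiver. Assumes Lume POSTs a JSON payload with the run
# id and final status; the URL path and field names are illustrative assumptions.
from flask import Flask, request

app = Flask(__name__)

@app.route("/lume/webhook", methods=["POST"])
def handle_run_event():
    event = request.get_json(force=True)
    run_id = event.get("run_id")   # assumed field name
    status = event.get("status")   # assumed field name
    if status == "SUCCEEDED":
        print(f"Run {run_id} succeeded; kick off downstream jobs here.")
    else:
        print(f"Run {run_id} ended with status {status}; alert or retry here.")
    return "", 204

if __name__ == "__main__":
    app.run(port=8080)
```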