This example demonstrates how to trigger a run that processes data already loaded into a SQL database or warehouse such as PostgreSQL or Snowflake.

Scenario: An external process loads invoice data into a raw_invoices table in your PostgreSQL database. All records for a given batch are tagged with the same batch_id.

The Flow Version invoice_processor:v1 is configured in the Lume UI with:

  • Source Connector: Your PostgreSQL database.
  • Source Tables: raw_invoices.
  • Filtering Logic: A WHERE clause template like batch_id = '{source_path}' (see the resolution sketch after this list).
  • Target Connector: Your Snowflake data warehouse.
  • Target Table: clean_invoices.
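
At run time, Lume substitutes the source_path argument from lume.run() into the configured filter template. Here is a minimal sketch of how that resolution works, assuming plain string substitution (the actual injection is handled by Lume internally):

# Illustrative only: how the configured WHERE-clause template resolves.
# `{source_path}` is replaced with the value passed to lume.run(source_path=...).
filter_template = "batch_id = '{source_path}'"
effective_filter = filter_template.format(source_path="3c9e6d2a-...")  # a batch UUID
print(effective_filter)  # -> batch_id = '3c9e6d2a-...'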
The complete script below inserts a sample batch, triggers the run, and monitors it to completion:

import lume
import uuid
import psycopg2 # Example using the PostgreSQL driver

# --- 1. Prepare and Insert the Batch Data ---
# In a real workflow, this data would come from an upstream source.
# The `db_batch_id` is just a string that uniquely identifies the records for this run.
db_batch_id = str(uuid.uuid4())
new_invoices = [
    (db_batch_id, "inv_123", "cust_abc", 100.50),
    (db_batch_id, "inv_124", "cust_xyz", 75.00)
]

# Connect to your database and insert the new records.
# This part of the code runs on your infrastructure.
conn = psycopg2.connect("dbname=yourdb user=youruser password=yourpass host=yourhost")
try:
    # The connection context manager commits on success and rolls back on error.
    with conn, conn.cursor() as cur:
        cur.executemany(
            "INSERT INTO raw_invoices (batch_id, invoice_id, customer_id, amount) VALUES (%s, %s, %s, %s)",
            new_invoices,
        )
finally:
    conn.close()
print(f"Inserted {len(new_invoices)} records with batch_id: {db_batch_id}")


# --- 2. Trigger the Run ---
# Lume uses the pre-configured Connector and injects the `source_path`
# into the filtering logic defined in the Flow Version.
run = lume.run(
    flow_version="invoice_processor:v1",
    source_path=db_batch_id
)
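# Note: for database sources, `source_path` is not a file path; it is the opaque
# batch key that Lume injects into the Flow Version's filter template.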
print(f"Started run {run.id} for database batch_id: {db_batch_id}")

# --- 3. Monitor and Finalize ---
# `run.wait()` blocks execution until the pipeline is complete.
print("Waiting for run to complete...")
run.wait()

print(f"Run {run.id} finished with status: {run.status}")

if run.status == 'SUCCEEDED':
    metrics = run.metadata.get('metrics', {})
    results = run.metadata.get('results', {})
    print(f"  - Input Rows: {results.get('input_rows')}")
    print(f"  - Mapped Rows: {results.get('mapped_rows')}")
    print(f"  - Duration: {metrics.get('duration_seconds', 0):.2f}s")
    # The transformed data is now available in the `clean_invoices` table in Snowflake.
else:
    error_info = run.metadata.get('error', {})
    print(f"  - Error Code: {error_info.get('code')}")
    print(f"  - Error Message: {error_info.get('message')}")

Sample Input

(Records in the PostgreSQL raw_invoices table where batch_id matches the source_path)

batch_id         invoice_id   customer_id   amount
<db_batch_id>    inv_123      cust_abc      100.50
<db_batch_id>    inv_124      cust_xyz      75.00

Sample Output

(New records created in the Snowflake clean_invoices table by the Lume pipeline)

invoiceId   customerId   amountCents   processedAt
INV-123     CUST-ABC     10050         2024-07-30T10:00:00Z
INV-124     CUST-XYZ     7500          2024-07-30T10:00:00Z

Notice how the Flow Version transformed the data: it standardized IDs, converted the amount to cents, and added a processing timestamp.
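
The mapping itself lives in the Flow Version, not in client code, but conceptually the per-row transformation is equivalent to the hypothetical function below. The field names match the sample output above; the logic is an illustration, not Lume's implementation:

from datetime import datetime, timezone

def transform(row):
    # row matches the raw_invoices schema: (batch_id, invoice_id, customer_id, amount)
    batch_id, invoice_id, customer_id, amount = row
    return {
        "invoiceId": invoice_id.upper().replace("_", "-"),    # inv_123  -> INV-123
        "customerId": customer_id.upper().replace("_", "-"),  # cust_abc -> CUST-ABC
        "amountCents": round(amount * 100),                   # 100.50   -> 10050
        "processedAt": datetime.now(timezone.utc).isoformat(),
    }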

For production workflows, consider using Webhooks instead of run.wait() for more efficient, event-driven monitoring.
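
A minimal receiver might look like the sketch below, which assumes a JSON payload carrying a run identifier and terminal status. The endpoint path and field names here are assumptions; consult the Lume Webhooks documentation for the actual event schema.

# Hypothetical webhook receiver -- payload fields are assumed, not Lume's documented schema.
from flask import Flask, request

app = Flask(__name__)

@app.route("/lume/webhook", methods=["POST"])
def handle_lume_event():
    event = request.get_json()
    run_id = event.get("run_id")    # assumed field name
    status = event.get("status")    # assumed field name
    if status == "SUCCEEDED":
        print(f"Run {run_id} succeeded; kick off downstream jobs here.")
    else:
        print(f"Run {run_id} ended with status {status}; investigate before retrying.")
    return "", 204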
