import lume
import uuid
import psycopg2 # Example using the PostgreSQL driver
# --- 1. Prepare and Insert the Batch Data ---
# In a real workflow, this data would come from an upstream source.
# The `db_batch_id` is just a string that uniquely identifies the records for this run.
db_batch_id = str(uuid.uuid4())
new_invoices = [
    (db_batch_id, "inv_123", "cust_abc", 100.50),
    (db_batch_id, "inv_124", "cust_xyz", 75.00),
]
# Connect to your database and insert the new records.
# This part of the code runs on your infrastructure.
conn = psycopg2.connect("dbname=yourdb user=youruser password=yourpass host=yourhost")
with conn.cursor() as cur:
    for record in new_invoices:
        cur.execute(
            "INSERT INTO raw_invoices (batch_id, invoice_id, customer_id, amount) VALUES (%s, %s, %s, %s)",
            record,
        )
conn.commit()
conn.close()
print(f"Inserted {len(new_invoices)} records with batch_id: {db_batch_id}")
# --- 2. Trigger the Run ---
# Lume uses the pre-configured Connector and injects the `source_path`
# into the filtering logic defined in the Flow Version.
run = lume.run(
    flow_version="invoice_processor:v1",
    source_path=db_batch_id,
)
print(f"Started run {run.id} for database batch_id: {db_batch_id}")
# --- 3. Monitor and Finalize ---
# `run.wait()` blocks execution until the pipeline is complete.
print("Waiting for run to complete...")
run.wait()
print(f"Run {run.id} finished with status: {run.status}")
if run.status == 'SUCCEEDED':
    metrics = run.metadata.get('metrics', {})
    results = run.metadata.get('results', {})
    print(f" - Input Rows: {results.get('input_rows')}")
    print(f" - Mapped Rows: {results.get('mapped_rows')}")
    print(f" - Duration: {metrics.get('duration_seconds', 0):.2f}s")
    # The transformed data is now available in the `clean_invoices` table in Snowflake.
else:
    error_info = run.metadata.get('error', {})
    print(f" - Error Code: {error_info.get('code')}")
    print(f" - Error Message: {error_info.get('message')}")