This example shows the most common use case: processing a batch of CSV files from an S3 bucket and writing the transformed data back to another S3 location.

Scenario: Daily sales data arrives under a specific S3 prefix, and you want to trigger a run that cleans, transforms, and validates it. The `sales_transformer:v3` Flow Version has already been configured in the Lume UI with S3 Connectors for the source and target.
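Because the data lands under a dated prefix, the source path changes every day. The helper below is a minimal sketch of how such a path could be built; `build_source_path` is a hypothetical name and not part of the Lume SDK, and the `<prefix>/<YYYY-MM-DD>/<filename>` layout is assumed from the path used in this example.

```python
from datetime import date


def build_source_path(bucket: str, prefix: str, day: date, filename: str = "sales.csv") -> str:
    """Return an s3:// URI of the form s3://<bucket>/<prefix>/<YYYY-MM-DD>/<filename>."""
    # Strip stray slashes so "daily_sales" and "/daily_sales/" give the same result.
    return f"s3://{bucket}/{prefix.strip('/')}/{day.isoformat()}/{filename}"


print(build_source_path("lume-demo-source-bucket", "daily_sales", date(2024, 7, 30)))
# s3://lume-demo-source-bucket/daily_sales/2024-07-30/sales.csv
```

In a scheduled job you would typically pass `date.today()` (or the business date of the delivery) instead of a fixed date.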

import lume
import boto3

# --- 1. Prepare Your Data ---
# Create a dummy local CSV file for demonstration.
with open("local_sales_data.csv", "w") as f:
    f.write("order_id,product_id,quantity,price\n")
    f.write("ord_001,prod_a,2,10.50\n")
    f.write("ord_002,prod_b,1,25.00\n")
    f.write("ord_003,prod_a,5,9.99\n")

# In a real workflow, another process would deliver this data to S3.
# Here, we upload the sample file ourselves.
s3 = boto3.client("s3")
source_bucket = "lume-demo-source-bucket"
source_key = "daily_sales/2024-07-30/sales.csv"
source_path = f"s3://{source_bucket}/{source_key}"

# The S3 Connector in the Flow Version must have read access to this location.
s3.upload_file("local_sales_data.csv", source_bucket, source_key)
print(f"Uploaded sample data to {source_path}")

# --- 2. Trigger the Run ---
# The `source_path` directly tells Lume which object to process.
try:
    run = lume.run(
        flow_version="sales_transformer:v3",
        source_path=source_path
    )
    print(f"Successfully started run {run.id} for source: {source_path}")

except Exception as e:
    print(f"Failed to start run: {e}")
    # Add your error handling logic here
    raise SystemExit(1)  # Exit with a non-zero code so callers can detect the failure

# --- 3. Monitor the Run ---
# `run.wait()` blocks execution until the pipeline finishes.
print("Waiting for run to complete...")
run.wait()
print(f"Run {run.id} finished with status: {run.status}")

<Tip>
For production workflows, consider using [Webhooks](/pages/libraries/python-ent/production#asynchronous-processing-with-webhooks) instead of `run.wait()` for more efficient, event-driven monitoring.
</Tip>

# --- 4. Handle the Outcome ---
if run.status == "SUCCEEDED":
    print("Run completed successfully!")
    # You can now access metadata, including output locations and metrics.
    pipeline_metrics = run.metadata.get('pipeline', {})
    print(f"Pipeline duration: {pipeline_metrics.get('duration_seconds')}s")

elif run.status == "PARTIAL_FAILED":
    print("Run completed with some errors.")
    # You can now use the metadata to find and inspect the rejected rows.
    results = run.metadata.get('results', {})
    print(f"Rejected rows count: {results.get('rejected_rows')}")
    print(f"Rejects file location: {results.get('target_locations', {}).get('rejects')}")

else: # FAILED or CRASHED
    print(f"Run failed with status: {run.status}")
    # Fall back to an empty dict when no error details are recorded.
    error_info = (run.metadata.get('errors') or [{}])[0]
    print(f"Error Stage: {error_info.get('stage')}")
    print(f"Error Message: {error_info.get('message')}")
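If you need the same outcome handling in several places (for example, in logs or alerts), the branching above can be folded into one function. This is a sketch only: `summarize_run` is a hypothetical helper, not part of the Lume SDK, and it assumes the metadata keys used in this example (`pipeline`, `results`, `errors`). The metadata values in the usage lines are made up for illustration.

```python
def summarize_run(status: str, metadata: dict) -> str:
    """Return a one-line, human-readable summary for a finished run."""
    if status == "SUCCEEDED":
        duration = metadata.get("pipeline", {}).get("duration_seconds")
        return f"SUCCEEDED in {duration}s"
    if status == "PARTIAL_FAILED":
        results = metadata.get("results", {})
        rejects = results.get("target_locations", {}).get("rejects")
        return f"PARTIAL_FAILED: {results.get('rejected_rows')} rejected rows, rejects at {rejects}"
    # FAILED or CRASHED: report the first recorded error, if there is one.
    errors = metadata.get("errors") or [{}]
    first = errors[0]
    return f"{status} at stage {first.get('stage')}: {first.get('message')}"


# Illustrative metadata only; real values come from `run.metadata`.
print(summarize_run("SUCCEEDED", {"pipeline": {"duration_seconds": 12}}))
print(summarize_run("FAILED", {"errors": []}))
```

With a real run you would call `summarize_run(run.status, run.metadata)` after `run.wait()` returns.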

Sample Input

(local_sales_data.csv)

| order_id | product_id | quantity | price |
| --- | --- | --- | --- |
| ord_001 | prod_a | 2 | 10.50 |
| ord_002 | prod_b | 1 | 25.00 |
| ord_003 | prod_a | 5 | 9.99 |

Sample Output

(Resulting mapped_sales_data.csv in the target S3 bucket)

| order_id | product | item_count | total_price |
| --- | --- | --- | --- |
| ord_001 | prod_a | 2 | 21.00 |
| ord_002 | prod_b | 1 | 25.00 |
| ord_003 | prod_a | 5 | 49.95 |

Notice how the Flow Version renamed columns (`product_id` → `product`, `quantity` → `item_count`) and calculated a new field (`total_price`).
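To sanity-check the output locally, you can reproduce the mapping on the sample rows. The sketch below is not how Lume performs the transformation (that logic lives in the Flow Version configured in the Lume UI). It assumes, based on the sample values, that `total_price` is `quantity` multiplied by `price`; `transform` is a hypothetical local helper.

```python
import csv
import io
from decimal import Decimal

SAMPLE_INPUT = """order_id,product_id,quantity,price
ord_001,prod_a,2,10.50
ord_002,prod_b,1,25.00
ord_003,prod_a,5,9.99
"""


def transform(csv_text: str) -> list[dict]:
    """Rename columns and derive total_price, mirroring the sample output."""
    rows = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        quantity = int(row["quantity"])
        # Decimal avoids binary floating-point rounding on currency values.
        total = Decimal(row["price"]) * quantity
        rows.append({
            "order_id": row["order_id"],
            "product": row["product_id"],   # product_id -> product
            "item_count": quantity,         # quantity -> item_count
            "total_price": f"{total:.2f}",  # assumed: quantity * price
        })
    return rows


for row in transform(SAMPLE_INPUT):
    print(row)
```

Comparing these rows with the file in the target bucket is a quick way to confirm that the Flow Version behaves as expected on known input.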