This example shows the most common use case: processing a batch of CSV files from an S3 bucket and writing the transformed data back to another S3 location.
Scenario: You receive daily sales data in a specific S3 prefix. You want to trigger a run to clean, transform, and validate this data. The `sales_transformer:v3` Flow Version has already been configured in the Lume UI with S3 Connectors for the source and target.
```python
import lume
import boto3

# --- 1. Prepare Your Data ---
# Create a sample local CSV file for demonstration.
with open("local_sales_data.csv", "w") as f:
    f.write("order_id,product_id,quantity,price\n")
    f.write("ord_001,prod_a,2,10.50\n")
    f.write("ord_002,prod_b,1,25.00\n")
    f.write("ord_003,prod_a,5,9.99\n")

# In a real workflow, this data would be delivered to S3 by another process.
# Here, we upload the sample file to a specific S3 path.
# The S3 Connector in the Flow Version must have read access to this location.
s3 = boto3.client('s3')
source_bucket = "lume-demo-source-bucket"
source_key = "daily_sales/2024-07-30/sales.csv"
source_path = f"s3://{source_bucket}/{source_key}"

s3.upload_file("local_sales_data.csv", source_bucket, source_key)
print(f"Uploaded sample data to {source_path}")

# --- 2. Trigger the Run ---
# The `source_path` tells Lume exactly which object to process.
try:
    run = lume.run(
        flow_version="sales_transformer:v3",
        source_path=source_path
    )
    print(f"Successfully started run {run.id} for source: {source_path}")
except Exception as e:
    print(f"Failed to start run: {e}")
    # Add your error handling logic here
    raise SystemExit(1)

# --- 3. Monitor the Run ---
# `run.wait()` blocks execution until the pipeline finishes.
print("Waiting for run to complete...")
run.wait()
print(f"Run {run.id} finished with status: {run.status}")
```
<Tip>
For production workflows, consider using [Webhooks](/pages/libraries/python-ent/production#asynchronous-processing-with-webhooks) instead of `run.wait()` for more efficient, event-driven monitoring.
</Tip>
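As a rough illustration of the webhook approach, the sketch below shows a minimal HTTP receiver. The endpoint path and the payload field names (`run_id`, `status`) are assumptions for illustration only; see the Webhooks documentation for the actual payload schema your account delivers.

```python
# A minimal sketch of event-driven monitoring with Flask.
# Assumption: the webhook POSTs a JSON payload containing the run ID and
# final status. Field names below are illustrative, not the official schema.
from flask import Flask, request

app = Flask(__name__)

@app.route("/lume/webhook", methods=["POST"])
def handle_lume_webhook():
    event = request.get_json(force=True)
    run_id = event.get("run_id")   # assumed field name
    status = event.get("status")   # assumed field name
    if status == "SUCCEEDED":
        print(f"Run {run_id} succeeded; start downstream processing here.")
    else:
        print(f"Run {run_id} ended with status {status}; alert or investigate.")
    return "", 204
```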
```python
# --- 4. Handle the Outcome ---
if run.status == "SUCCEEDED":
    print("Run completed successfully!")
    # You can now access metadata, including output locations and metrics.
    pipeline_metrics = run.metadata.get('pipeline', {})
    print(f"Pipeline duration: {pipeline_metrics.get('duration_seconds')}s")
elif run.status == "PARTIAL_FAILED":
    print("Run completed with some errors.")
    # Use the metadata to find and inspect the rejected rows.
    results = run.metadata.get('results', {})
    print(f"Rejected rows count: {results.get('rejected_rows')}")
    print(f"Rejects file location: {results.get('target_locations', {}).get('rejects')}")
else:  # FAILED or CRASHED
    print(f"Run failed with status: {run.status}")
    error_info = run.metadata.get('errors', [{}])[0]
    print(f"Error Stage: {error_info.get('stage')}")
    print(f"Error Message: {error_info.get('message')}")
```
Sample Input (`local_sales_data.csv`)

| order_id | product_id | quantity | price |
|---|---|---|---|
| ord_001 | prod_a | 2 | 10.50 |
| ord_002 | prod_b | 1 | 25.00 |
| ord_003 | prod_a | 5 | 9.99 |
Sample Output (resulting `mapped_sales_data.csv` in the target S3 bucket)

| order_id | product | item_count | total_price |
|---|---|---|---|
| ord_001 | prod_a | 2 | 21.00 |
| ord_002 | prod_b | 1 | 25.00 |
| ord_003 | prod_a | 5 | 49.95 |
Notice how the Flow Version renamed columns (`product_id` → `product`, `quantity` → `item_count`) and calculated a new field (`total_price`).
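For intuition only, here is a rough pandas equivalent of that mapping applied to the sample data above. The real transformation logic lives in the `sales_transformer:v3` Flow Version configured in the Lume UI, not in your code; this sketch simply mirrors the observable input-to-output change.

```python
# Illustrative pandas equivalent of the Flow Version's mapping:
# rename two columns and derive total_price = item_count * price.
import pandas as pd

df = pd.read_csv("local_sales_data.csv")
mapped = df.rename(columns={"product_id": "product", "quantity": "item_count"})
mapped["total_price"] = (mapped["item_count"] * df["price"]).round(2)
mapped = mapped[["order_id", "product", "item_count", "total_price"]]
print(mapped)
```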