These examples are meant to help you quickly get started with a data mapping flow. See the full list of guides for more advanced operations.

Running a flow

We are ingesting data from an API and want to normalize each record to our own internal schema. The data in question contains information about our users. The API gives us the following source data:

data.json
[
    {
        "name": "Barry Devlin",
        "nickname": "Baz",
        "years_old": 42
    },
    {
        "name": "Paul Murphy",
        "nickname": "Polly",
        "years_old": 37
    }
]

Our internal schema can be represented in JSON Schema format as follows:

schema.json
{
    "type": "object"
    "properties": {
        "first_name": {
            "description": "The first name of the user.",
            "type": "string"
        },
        "last_name": {
            "description": "The last name of the user.",
            "type": "string"
        },
        "age": {
            "description": "The age of the user in years.",
            "type": "integer"
        }
    },
    "required": [
        "first_name",
        "last_name",
        "age",
    ]
}

All we need to do is create a flow with our target schema and run it with our source data to retrieve the transformed data:

import { Lume } from '@lume-ai/typescript-sdk';
import data from './data.json'
import schema from './schema.json'

const lume = new Lume('api_key');

// Method 1: Create, run, and get results in one go
const flow = await lume.flowService.createAndRunFlow({
  name: "my_flow",
  description: "Process customer data",
  target_schema: schema,
  tags: ["customers"]
}, {
  source_data: data
}, true);

// Get results from the latest run
const results = await flow.getLatestRunResults();

// Method 2: Process data with an existing flow
const flowId = "existing-flow-id";
const existingFlow = await lume.flowService.getFlow(flowId);
const moreResults = await existingFlow.process(newData);

And just like that, Lume completely automates the mapping process! Our mapped data will look like this:

[
    {
        "first_name": "Barry",
        "last_name": "Devlin",
        "age": 42
    },
    {
        "first_name": "Paul",
        "last_name": "Murphy",
        "age": 37
    }
]

Processing Methods

There are two main approaches to processing data with Lume:

  1. Creating and Running a New Flow (createAndRunFlow + getLatestRunResults):

    • Best for when you’re setting up a new data transformation flow
    • Creates a new flow configuration and immediately processes data
    • Useful for initial setup and testing of your mapping logic
    • The getLatestRunResults() method retrieves the output from the most recent run
    • If you’re planning to review the results in the Lume app, you can skip calling getLatestRunResults() entirely
  2. Using an Existing Flow (process):

    • Best for when you already have a flow set up and want to process new data
    • More efficient as it reuses existing mapping logic
    • Provides a simpler, more direct way to process data
    • Returns results immediately without needing to call getLatestRunResults

Choose Method 1 when setting up a new transformation flow, and Method 2 when you want to process additional data using an existing flow’s mapping logic.

Monitoring flow status

We can also create a flow and monitor its status manually. This approach is particularly useful for longer-running flows, such as those processing large datasets or complex target schemas, where you want direct control over polling intervals rather than waiting for an asynchronous call to complete:

// Create a flow
const flow = await lume.flowService.createFlow({
    name: 'user_normalization',
    description: 'Mapping from API user data to internal schema.',
    target_schema: schema,
    tags: ['users'],
});

// Create and monitor a run
const run = await flow.createRun({ source_data: data });
while (run.status === 'RUNNING' || run.status === 'PENDING') {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    await run.get();
}

if (run.status === 'SUCCEEDED') {
    const output = await run.getSchemaTransformerOutput();
    console.log('Transformation complete:', output?.mapped_data.items);
} else {
    console.log('Run failed:', run.status);
}

Processing more data

Once a flow is created, we can easily process new data using the same mapping logic:

// Get an existing flow by ID or name
const existingFlow = await lume.flowService.getFlow('flow-id');
// Process new data
const results = await existingFlow.process(newData);

// Alternatively, you can create a new run and get its results
await existingFlow.createRun({ source_data: newData }, true);
const latestResults = await existingFlow.getLatestRunResults();

The getLatestRunResults() method is particularly useful when you want to retrieve the output from the most recent run of your flow, regardless of how it was initiated.

Next steps

As our data transformation needs grow, we will begin creating more flows. Start with Managing Flows and read the rest of our guides to learn more!