Preprocessors

Scripts and flows can include a preprocessor to transform incoming requests before they are passed to the runnable. The preprocessor is only called when the runnable is triggered via a webhook, an HTTP route, an email trigger, a WebSocket trigger, a Kafka trigger, a NATS trigger, a Postgres trigger, an SQS trigger or an MQTT trigger.

This is useful when arguments need to be prepared differently depending on the trigger before the runnable executes. It also keeps trigger-specific argument handling separate from runs started in the UI, so the runnable's schema form in the UI can stay simple.

The preprocessor receives trigger metadata (wm_trigger) along with the main trigger arguments. The structure of wm_trigger and of the main trigger arguments is specific to each trigger type:

  • Webhook/HTTP: (wm_trigger: { kind: 'http' | 'webhook', http?: { ... } }, body_key_1: any, body_key_2: any, ...)
  • Postgres: (wm_trigger: { kind: 'postgres' }, transaction_type: string, schema_name: string, table_name: string, row: any)
  • WebSocket/Kafka/NATS/SQS/MQTT: (wm_trigger: { kind: 'websocket' | 'kafka' | 'nats' | 'sqs' | 'mqtt', [kind]: { ... } }, msg: string)
  • Email: (wm_trigger: { kind: 'email' }, raw_email: string, parsed_email: { ... })

You can find more details about the argument format and the structure of wm_trigger for each trigger in their respective documentation pages.
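As a rough illustration of the per-trigger signatures listed above, a single preprocessor can branch on wm_trigger.kind. This is only a sketch: the `name` body key, the `:name` path parameter, and the `source` argument are hypothetical, and the local `TriggerMeta` type is trimmed to the fields the example reads.

```typescript
// Local, trimmed-down types: only the wm_trigger fields this sketch reads.
type HttpMeta = {
  route: string;
  path: string;
  method: string;
  params: Record<string, string>;
  query: Record<string, string>;
  headers: Record<string, string>;
};

type TriggerMeta = {
  kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt';
  http?: HttpMeta;
};

export async function preprocessor(
  wm_trigger: TriggerMeta,
  msg?: string,  // set by WebSocket/Kafka/NATS/SQS/MQTT triggers
  name?: string, // hypothetical body key sent by a webhook or HTTP route caller
) {
  switch (wm_trigger.kind) {
    case 'http':
    case 'webhook':
      // Prefer the body key; otherwise fall back to a hypothetical :name path parameter.
      return {
        name: name ?? wm_trigger.http?.params.name ?? 'unknown',
        source: wm_trigger.kind,
      };
    case 'websocket':
    case 'kafka':
    case 'nats':
    case 'sqs':
    case 'mqtt':
      // Message-based triggers deliver one string; this sketch uses it as the name.
      return { name: msg ?? 'unknown', source: wm_trigger.kind };
    default:
      // Email and Postgres triggers are not handled in this sketch.
      throw new Error(`Unsupported trigger kind: ${wm_trigger.kind}`);
  }
}

export async function main(name: string, source: string) {
  return `Hello ${name} (via ${source})`;
}
```

Whatever the trigger, main only ever sees `name` and `source`, so its schema form stays the same for UI runs.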

Preprocessors can only be written in TypeScript (Bun/Deno) or Python.

Script preprocessor

In scripts, you need to export an additional preprocessor function.

The returned object defines the parameter values passed to main(). For instance, returning { b: 1, a: 2 } in the preprocessor will call main(2, 1), assuming main is defined as main(a: number, b: number). Ensure that the parameter names in main match the keys in the returned object.
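The mapping from returned keys to main's parameters could look like the following minimal sketch. It assumes a webhook whose JSON body has two hypothetical keys, `first` and `second`, sent as strings; only the names `a` and `b` come from the example above.

```typescript
export async function preprocessor(
  wm_trigger: { kind: 'http' | 'webhook' },
  first: string,  // hypothetical body key, e.g. "2"
  second: string, // hypothetical body key, e.g. "1"
) {
  // Key order in the returned object is irrelevant;
  // what matters is that the keys match main's parameter names.
  return { b: Number(second), a: Number(first) };
}

export async function main(a: number, b: number) {
  return a - b;
}
```

With the body `{ "first": "2", "second": "1" }`, the preprocessor returns `{ b: 1, a: 2 }`, so main is called as main(2, 1).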

Here is a TypeScript template for a script with a preprocessor:

export async function preprocessor(
  /*
   * Replace this comment with the parameters received from the trigger.
   * Examples: `body_key_1`, `body_key_2` for Webhook/HTTP, `msg` for WebSocket, etc.
   */

  // The trigger metadata
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    http?: {
      route: string // The route path, e.g. "/users/:id"
      path: string // The actual path called, e.g. "/users/123"
      method: string
      params: Record<string, string> // path parameters
      query: Record<string, string> // query parameters
      headers: Record<string, string>
    },
    websocket?: {
      url: string // The websocket url
    },
    kafka?: {
      brokers: string[]
      topic: string
      group_id: string
    },
    nats?: {
      servers: string[]
      subject: string
      headers?: Record<string, string[]>
      status?: number
      description?: string
      length: number
    },
    sqs?: {
      queue_url: string,
      message_id?: string,
      receipt_handle?: string,
      attributes: Record<string, string>,
      message_attributes?: Record<string, {
        string_value?: string,
        data_type: string
      }>
    },
    mqtt?: {
      topic: string,
      retain: boolean,
      pkid: number,
      qos: number,
      v5?: {
        payload_format_indicator?: number,
        topic_alias?: number,
        response_topic?: string,
        correlation_data?: Array<number>,
        user_properties?: Array<[string, string]>,
        subscription_identifiers?: Array<number>,
        content_type?: string
      }
    }
  }
) {
  return {
    // return the args to be passed to the runnable
  }
}

export async function main(/* main args */) {
  // your code here
}

Once a preprocessor is created, you should see a new tab in the right panel of the editor that allows you to test the preprocessor with a sample request.

[Image: Test script preprocessor]

Flow preprocessor

For flows, the idea is similar, but the preprocessor is a standalone step whose script contains only a preprocessor function. To create a preprocessor for a flow, click on the plus button above the Input step:

[Image: Add flow preprocessor]

The returned object determines the parameter values passed to the flow. For instance, returning { b: 1, a: 2 } in the preprocessor will call the flow with a = 2 and b = 1, assuming the flow has two inputs called a and b. Ensure that the input names of the flow match the keys in the returned object.
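A flow preprocessor step for the a/b example might be sketched as follows. It assumes a Kafka trigger whose message is a JSON string such as `{"a": 2, "b": 1}`; that payload shape and the validation rules are illustrative assumptions, not something the trigger guarantees.

```typescript
// Flow preprocessor step: it contains only a preprocessor function, no main.
export async function preprocessor(
  wm_trigger: {
    kind: 'kafka';
    kafka?: { brokers: string[]; topic: string; group_id: string };
  },
  msg: string,
) {
  // Assumption: the message payload is a JSON object with numeric fields a and b.
  const payload: unknown = JSON.parse(msg);
  if (typeof payload !== 'object' || payload === null) {
    throw new Error('Expected a JSON object in the message');
  }

  const { a, b } = payload as { a?: unknown; b?: unknown };
  if (typeof a !== 'number' || typeof b !== 'number') {
    throw new Error('Expected numeric fields "a" and "b" in the message');
  }

  // The keys match the flow inputs a and b.
  return { a, b };
}
```

Rejecting malformed messages in the preprocessor keeps the flow's own steps free of trigger-specific parsing.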

Below is a TypeScript template for a flow preprocessor step:

export async function preprocessor(
  /*
   * Replace this comment with the parameters received from the trigger.
   * Examples: `bodyKey1`, `bodyKey2` for Webhook/HTTP, `msg` for WebSocket, etc.
   */

  // The trigger metadata
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    http?: {
      route: string // The route path, e.g. "/users/:id"
      path: string // The actual path called, e.g. "/users/123"
      method: string
      params: Record<string, string> // path parameters
      query: Record<string, string> // query parameters
      headers: Record<string, string>
    },
    websocket?: {
      url: string // The websocket url
    },
    kafka?: {
      brokers: string[]
      topic: string
      group_id: string
    },
    nats?: {
      servers: string[]
      subject: string
      headers?: Record<string, string[]>
      status?: number
      description?: string
      length: number
    },
    sqs?: {
      queue_url: string,
      message_id?: string,
      receipt_handle?: string,
      attributes: Record<string, string>,
      message_attributes?: Record<string, {
        string_value?: string,
        data_type: string
      }>
    },
    mqtt?: {
      topic: string,
      retain: boolean,
      pkid: number,
      qos: number,
      v5?: {
        payload_format_indicator?: number,
        topic_alias?: number,
        response_topic?: string,
        correlation_data?: Array<number>,
        user_properties?: Array<[string, string]>,
        subscription_identifiers?: Array<number>,
        content_type?: string
      }
    }
  }
) {
  return {
    // return the args to be passed to the runnable
  }
}