Preprocessors
Scripts and flows can include a preprocessor to transform incoming requests before they are passed to the runnable. The preprocessor is only called when the runnable is triggered via a webhook, an HTTP route, an email trigger, a WebSocket trigger, a Kafka trigger, a NATS trigger, a Postgres trigger, an SQS trigger or an MQTT trigger.
This approach is useful for preprocessing arguments differently depending on the trigger before the execution of the runnable. It also separates the handling of arguments according to whether they are called by a trigger or from the UI, which can help you keep a simple schema form in the UI for the runnable.
The preprocessor receives trigger metadata (wm_trigger
) along with the main trigger arguments.
The structure of wm_trigger
and the main trigger arguments are specific to each trigger type:
- Webhook/HTTP:
(wm_trigger: { kind: 'http' | 'webhook', http?: { ... } }, body_key_1: any, body_key_2: any, ...)
- Postgres:
(wm_trigger: { kind: 'postgres' }, transaction_type: string, schema_name: string, table_name: string, row: any)
- WebSocket/Kafka/NATS/SQS/MQTT:
(wm_trigger: { kind: 'websocket' | 'kafka' | 'nats' | 'sqs' | 'mqtt', [kind]: { ... } }, msg: string)
- Email:
(wm_trigger: { kind: 'email' }, raw_email: string, parsed_email: { ... })
You can find more details about the arguments format and the structure of wm_trigger
for each trigger in their respective documentation pages.
Preprocessors can only be written in TypeScript (Bun/Deno) or Python.
Script preprocessor
In scripts, you need to export an additional preprocessor
function.
The returned object defines the parameter values passed to main()
.
For instance, returning { b: 1, a: 2 }
in the preprocessor will call main(2, 1)
, assuming main
is defined as main(a: number, b: number)
.
Ensure that the parameter names in main
match the keys in the returned object.
Here are some templates for scripts with preprocessors in TypeScript and Python:
- TypeScript
- Python
export async function preprocessor(
/*
* Replace this comment with the parameters received from the trigger.
* Examples: `body_key_1`, `body_key_2` for Webhook/HTTP, `msg` for WebSocket, etc.
*/
// The trigger metadata
wm_trigger: {
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
http?: {
route: string // The route path, e.g. "/users/:id"
path: string // The actual path called, e.g. "/users/123"
method: string
params: Record<string, string> // path parameters
query: Record<string, string> // query parameters
headers: Record<string, string>
},
websocket?: {
url: string // The websocket url
},
kafka?: {
brokers: string[]
topic: string
group_id: string
},
nats?: {
servers: string[]
subject: string
headers?: Record<string, string[]>
status?: number
description?: string
length: number
},
sqs?: {
queue_url: string,
message_id?: string,
receipt_handle?: string,
attributes: Record<string, string>,
message_attributes?: Record<string, {
string_value?: string,
data_type: string
}>
},
mqtt?: {
topic: string,
retain: boolean,
pkid: number,
qos: number,
v5?: {
payload_format_indicator?: number,
topic_alias?: number,
response_topic?: string,
correlation_data?: Array<number>,
user_properties?: Array<[string, string]>,
subscription_identifiers?: Array<number>,
content_type?: string
}
}
}
) {
return {
// return the args to be passed to the runnable
}
}
export async function main(/* main args */) {
// your code here
}
from typing import TypedDict, Literal
class Http(TypedDict):
route: str # The route path, e.g. "/users/:id"
path: str # The actual path called, e.g. "/users/123"
method: str
params: dict[str, str]
query: dict[str, str]
headers: dict[str, str]
class Websocket(TypedDict):
url: str # The websocket url
class Kafka(TypedDict):
topic: str
brokers: list[str]
group_id: str
class Nats(TypedDict):
servers: list[str]
subject: str
headers: dict[str, list[str]] | None
status: int | None
description: str | None
length: int
class MessageAttribute(TypedDict):
string_value: str | None
data_type: str
class Sqs(TypedDict):
queue_url: str
message_id: str | None
receipt_handle: str | None
attributes: dict[str, str]
message_attributes: dict[str, MessageAttribute] | None
class MqttV5Properties:
payload_format_indicator: int | None
topic_alias: int | None
response_topic: str | None
correlation_data: list[int] | None
user_properties: list[tuple[str, str]] | None
subscription_identifiers: list[int] | None
content_type: str | None
class Mqtt(TypedDict):
topic: str
retain: bool
pkid: int
qos: int
v5: MqttV5Properties | None
class WmTrigger(TypedDict):
kind: Literal["http", "email", "webhook", "websocket", "kafka", "nats", "postgres", "sqs", "mqtt"]
http: Http | None
websocket: Websocket | None
kafka: Kafka | None
nats: Nats | None
sqs: Sqs | None
mqtt: Mqtt | None
def preprocessor(
# Replace this comment with the parameters received from the trigger.
# Examples: `bodyKey1`, `bodyKey2` for Webhook/HTTP, `msg` for WebSocket, etc.
# Trigger metadata
wm_trigger: WmTrigger,
):
return {
# return the args to be passed to the runnable
}
def main(): # add the parameters you expect from the preprocessor
# your code here
Once a preprocessor is created, you should see a new tab in the right panel of the editor that allows you to test the preprocessor with a sample request.
Flow preprocessor
For flows, the idea is similar but the preprocessor is a standalone step that returns only a preprocessor
function.
To create a preprocessor for a flow, click on the plus button above the Input
step:
The returned object determines the parameter values passed to the flow.
For instance, returning { b: 1, a: 2 }
in the preprocessor will call the flow with a = 2
and b = 1
, assuming the flow has two inputs called a
and b
.
Ensure that the input names of the flow match the keys in the returned object.
Below you'll find preprocessor step templates for flows in TypeScript and Python:
- TypeScript
- Python
export async function preprocessor(
/*
* Replace this comment with the parameters received from the trigger.
* Examples: `bodyKey1`, `bodyKey2` for Webhook/HTTP, `msg` for WebSocket, etc.
*/
// The trigger metadata
wm_trigger: {
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
http?: {
route: string // The route path, e.g. "/users/:id"
path: string // The actual path called, e.g. "/users/123"
method: string
params: Record<string, string> // path parameters
query: Record<string, string> // query parameters
headers: Record<string, string>
},
websocket?: {
url: string // The websocket url
},
kafka?: {
brokers: string[]
topic: string
group_id: string
},
nats?: {
servers: string[]
subject: string
headers?: Record<string, string[]>
status?: number
description?: string
length: number
},
sqs?: {
queue_url: string,
message_id?: string,
receipt_handle?: string,
attributes: Record<string, string>,
message_attributes?: Record<string, {
string_value?: string,
data_type: string
}>
},
mqtt?: {
topic: string,
retain: boolean,
pkid: number,
qos: number,
v5?: {
payload_format_indicator?: number,
topic_alias?: number,
response_topic?: string,
correlation_data?: Array<number>,
user_properties?: Array<[string, string]>,
subscription_identifiers?: Array<number>,
content_type?: string
}
}
}
) {
return {
// return the args to be passed to the runnable
}
}
from typing import TypedDict, Literal
class Http(TypedDict):
route: str # The route path, e.g. "/users/:id"
path: str # The actual path called, e.g. "/users/123"
method: str
params: dict[str, str]
query: dict[str, str]
headers: dict[str, str]
class Websocket(TypedDict):
url: str # The websocket url
class Kafka(TypedDict):
topic: str
brokers: list[str]
group_id: str
class Nats(TypedDict):
servers: list[str]
subject: str
headers: dict[str, list[str]] | None
status: int | None
description: str | None
length: int
class MessageAttribute(TypedDict):
string_value: str | None
data_type: str
class Sqs(TypedDict):
queue_url: str
message_id: str | None
receipt_handle: str | None
attributes: dict[str, str]
message_attributes: dict[str, MessageAttribute] | None
class MqttV5Properties:
payload_format_indicator: int | None
topic_alias: int | None
response_topic: str | None
correlation_data: list[int] | None
user_properties: list[tuple[str, str]] | None
subscription_identifiers: list[int] | None
content_type: str | None
class Mqtt(TypedDict):
topic: str
retain: bool
pkid: int
qos: int
v5: MqttV5Properties | None
class WmTrigger(TypedDict):
kind: Literal["http", "email", "webhook", "websocket", "kafka", "nats", "postgres", "sqs", "mqtt"]
http: Http | None
websocket: Websocket | None
kafka: Kafka | None
nats: Nats | None
sqs: Sqs | None
mqtt: Mqtt | None
def preprocessor(
# Replace this comment with the parameters received from the trigger.
# Examples: `bodyKey1`, `bodyKey2` for Webhook/HTTP, `msg` for WebSocket, etc.
# Trigger metadata
wm_trigger: WmTrigger,
):
return {
# return the args to be passed to the runnable
}