Preprocessors
Scripts and flows can include a preprocessor to transform incoming requests before they are passed to the runnable. The preprocessor is only called when the runnable is triggered via an HTTP route, a webhook, an email trigger, a websocket trigger or a kafka trigger. This is useful for handling headers, query/path parameters, or modifying arguments before execution. Using a preprocessor also separates the handling of arguments according to whether they are called by a trigger (http, email or webhook) or from the UI, which can help you keep a simple schema form in the UI for the runnable.
Preprocessors can only be written in TypeScript (Bun/Deno) or Python.
Script preprocessor
In scripts, you need to export an additional preprocessor
function. It takes a special argument called wm_trigger
in addition to the request body arguments and should return the transformed arguments for the main function of the script.
The wm_trigger
contains the kind of the trigger (http, email or webhook) and an http object with the details of the request when the script is triggered via an HTTP route.
Here are examples of a preprocessor function in TypeScript and Python:
- TypeScript
- Python
export async function preprocessor(
/* args from the request body (e.g. webhook/http body args, msg for ws/kafka/nats, raw_email and parsed_email for email) */
wm_trigger: {
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats',
http?: {
route: string // The route path, e.g. "/users/:id"
path: string // The actual path called, e.g. "/users/123"
method: string
params: Record<string, string>
query: Record<string, string>
headers: Record<string, string>
},
websocket?: {
url: string // The websocket URL
},
kafka?: {
brokers: string[]
topic: string
group_id: string
},
nats?: {
servers: string[]
subject: string
headers?: Record<string, string[]>
status?: number
description?: string
length: number
}
}
) {
return {
// return the args to be passed to the main function
}
}
export async function main(/* main args */) {
// your code here
}
from typing import TypedDict, Literal
class Http(TypedDict):
route: str # The route path, e.g. "/users/:id"
path: str # The actual path called, e.g. "/users/123"
method: str
params: dict[str, str]
query: dict[str, str]
headers: dict[str, str]
class Websocket(TypedDict):
url: str # The websocket URL
class Kafka(TypedDict):
topic: str
brokers: list[str]
group_id: str
class Nats(TypedDict):
servers: list[str]
subject: str
headers: dict[str, list[str]] | None
status: int | None
description: str | None
length: int
class WmTrigger(TypedDict):
kind: Literal["http", "email", "webhook", "websocket", "kafka", "nats"]
http: Http | None
websocket: Websocket | None
kakfa: Kafka | None
nats: Nats | None
def preprocessor(
# args from the request body (e.g. webhook/http body args, msg for ws/kafka/nats, raw_email and parsed_email for email)
wm_trigger: WmTrigger,
):
return {
# return the args to be passed to the main function
}
def main(/* main args */):
# your code here
Once a preprocessor is created, you should see a new tab in the right panel of the editor that allows you to test the preprocessor with a sample request.
Flow preprocessor
For flows, the idea is similar but the preprocessor is a standalone step that returns only a preprocessor
function.
To create a preprocessor for a flow, click on the plus button above the Input
step:
The flow preprocessor takes the same arguments as the script preprocessor and should return the transformed arguments for the flow:
- TypeScript
- Python
export async function preprocessor(
/* args from the request body (e.g. webhook/http body args, msg for ws/kafka/nats, raw_email and parsed_email for email) */
wm_trigger: {
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats',
http?: {
route: string // The route path, e.g. "/users/:id"
path: string // The actual path called, e.g. "/users/123"
method: string
params: Record<string, string>
query: Record<string, string>
headers: Record<string, string>
},
websocket?: {
url: string // The websocket URL
},
kafka?: {
brokers: string[]
topic: string
group_id: string
},
nats?: {
servers: string[]
subject: string
}
}
) {
return {
// return the args to be passed to the flow
}
}
from typing import TypedDict, Literal
class Http(TypedDict):
route: str # The route path, e.g. "/users/:id"
path: str # The actual path called, e.g. "/users/123"
method: str
params: dict[str, str]
query: dict[str, str]
headers: dict[str, str]
class Websocket(TypedDict):
url: str # The websocket URL
class Kafka(TypedDict):
brokers: list[str]
topic: str
group_id: str
class Nats(TypedDict):
servers: list[str]
subject: str
headers: dict[str, list[str]] | None
status: int | None
description: str | None
length: int
class WmTrigger(TypedDict):
kind: Literal["http", "email", "webhook", "websocket", "kafka", "nats"]
http: Http | None
websocket: Websocket | None
kafka: Kafka | None
nats: Nats | None
def preprocessor(
# args from the request body (e.g. webhook/http body args, msg for ws/kafka/nats, raw_email and parsed_email for email)
wm_trigger: WmTrigger,
):
return {
# return the args to be passed to the flow
}