MQTT triggers

Windmill can connect to an MQTT broker and trigger runnables (scripts, flows) in response to messages published to specified topics. MQTT triggers are not available on the Cloud.

For more details on MQTT, see the MQTT Protocol Documentation.

How to use

Configure MQTT resource

  • Select an existing MQTT resource or create a new one
  • Provide broker hostname and port
  • Add authentication credentials and certificates as required by your broker

Select runnable

  • Choose the script or flow to execute when messages are published to your subscribed topics

Configure topic subscriptions

  • Specify one or more topics to subscribe to
  • Set appropriate QoS level for each topic

Quality of Service (QoS) levels

| Level | Description | When to use |
|-------|-------------|-------------|
| 0 | At most once – Message delivered once or not at all without confirmation | Choose when it is okay for your script/flow to not be triggered (if the message is lost) or triggered only once. |
| 1 | At least once – Guaranteed delivery but may arrive multiple times | Choose when it is okay for your script/flow to be triggered again by an already received message from the broker. |
| 2 | Exactly once – Guaranteed delivery exactly once | Choose when you need your script/flow to be triggered only once and avoid any duplicates. |

For more information about MQTT QoS, see the MQTT QoS Documentation.
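
With QoS 1, the same message can trigger a runnable more than once, so the handler should tolerate duplicates. The sketch below shows one way to do that, assuming the publisher includes an application-level `id` in each message. The `SensorMessage` shape and the `handleOnce` helper are hypothetical and not part of Windmill or MQTT. The in-memory `Set` is for illustration only; separate runs do not share memory, so a real runnable would need persistent storage.

```typescript
// Hypothetical message shape: `id` is chosen by the publisher,
// it is not an MQTT field.
type SensorMessage = { id: string; value: number };

// Illustration only: state kept in memory. Separate runs do not share
// this Set, so a real deployment would use persistent storage instead.
const seenIds = new Set<string>();

// Returns true if the message was processed, false if it was a duplicate.
function handleOnce(
  msg: SensorMessage,
  process: (m: SensorMessage) => void,
): boolean {
  if (seenIds.has(msg.id)) {
    return false; // duplicate delivery (possible with QoS 1), skip it
  }
  seenIds.add(msg.id);
  process(msg);
  return true;
}
```

The MQTT packet identifier is deliberately not used as the key here, because brokers reuse packet identifiers once a delivery has completed.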

MQTT topic structure

MQTT topics are case-sensitive and follow a hierarchical structure (e.g., home/sensor/temperature).
For best practices on MQTT topics, see the MQTT Topics Documentation.
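
To illustrate the hierarchical structure, the sketch below splits topics into levels and compares them against a subscription filter. The `+` (single-level) and `#` (multi-level) wildcards come from the MQTT specification and are not described above; whether a given Windmill trigger configuration accepts them is an assumption to verify. `topicMatches` is a hypothetical helper, and it ignores the special handling of topics that start with `$`.

```typescript
// Returns true if `topic` matches the subscription `filter`.
// `+` matches exactly one level; `#` matches all remaining levels
// and is only valid as the last level of the filter.
function topicMatches(filter: string, topic: string): boolean {
  const filterLevels = filter.split("/");
  const topicLevels = topic.split("/");

  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i];
    if (level === "#") {
      // Multi-level wildcard: matches the parent and everything below it.
      return i === filterLevels.length - 1;
    }
    if (i >= topicLevels.length) {
      return false; // the filter has more levels than the topic
    }
    if (level !== "+" && level !== topicLevels[i]) {
      return false; // literal mismatch (comparison is case-sensitive)
    }
  }
  // Without a trailing `#`, both must have the same number of levels.
  return filterLevels.length === topicLevels.length;
}
```

For example, `topicMatches("home/+/temperature", "home/sensor/temperature")` is true, while `topicMatches("Home/sensor/temperature", "home/sensor/temperature")` is false because of case sensitivity.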

Advanced MQTT options

By default, Windmill uses MQTT version 5. You can select either MQTT version 3 or MQTT version 5, each with its own associated options.

Implementation examples

Below are code examples demonstrating how to handle MQTT messages in your Windmill scripts. You can either process messages directly in a basic script or use a preprocessor for more advanced message handling and transformation before execution.

Basic script

export async function main(payload: Array<number>) {
  // Convert binary payload to string
  const textPayload = new TextDecoder().decode(new Uint8Array(payload));

  // Parse JSON if applicable
  try {
    const jsonData = JSON.parse(textPayload);
    console.log("Received JSON data:", jsonData);
    // Process JSON data
  } catch (e) {
    // Handle as plain text
    console.log("Received text data:", textPayload);
    // Process text data
  }

  return { processed: true, message: "MQTT message processed successfully" };
}
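
To try the basic script without a broker, it can help to build the byte-array argument by hand. The helpers below are a minimal sketch: `toBytePayload` and `fromBytePayload` are hypothetical names, and they assume the payload is UTF-8 text, as the script above does.

```typescript
// Encode a string or JSON-serializable object as the array of byte
// values that `main(payload: Array<number>)` expects.
function toBytePayload(value: string | Record<string, unknown>): number[] {
  const text = typeof value === "string" ? value : JSON.stringify(value);
  return Array.from(new TextEncoder().encode(text));
}

// Decode the byte array back to text, mirroring the first step of the
// basic script.
function fromBytePayload(payload: number[]): string {
  return new TextDecoder().decode(new Uint8Array(payload));
}
```

Passing `toBytePayload({ temperature: 21.5 })` as the `payload` argument exercises the JSON branch of the script, while `toBytePayload("hello")` exercises the plain-text branch.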

Script with preprocessor

If you use a preprocessor, the preprocessor function receives the message payload as a byte array and an MQTT object with the following fields:

MQTT object

  • topic: The MQTT topic on which the message was received.
  • retain: Boolean indicating if the message is retained.
  • pkid: Packet identifier (if QoS > 0).
  • qos: Quality of Service level.
  • v5: MQTT v5 properties (optional).

MQTT v5 properties

  • payload_format_indicator: Indicates if the payload is UTF-8 encoded or binary.
  • topic_alias: An alias for the topic name.
  • response_topic: A topic for the recipient to send a response to.
  • correlation_data: Correlation data for request/response.
  • user_properties: A list of user-defined properties.
  • subscription_identifiers: Subscription identifiers.
  • content_type: The content type of the payload.

For more information about MQTT v5 properties, see the MQTT v5 Properties Documentation.
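
The v5 fields arrive as plain lists and byte arrays, so a preprocessor may want to convert them before use. The sketch below assumes the field types shown in the preprocessor signature on this page. `userPropertiesToRecord` and `correlationId` are hypothetical helper names, and rendering the correlation data as hexadecimal is an arbitrary choice for illustration.

```typescript
// Subset of the v5 properties used by the helpers below.
type MqttV5 = {
  correlation_data?: Array<number>;
  user_properties?: Array<[string, string]>;
};

// Turn the list of [key, value] pairs into a lookup object.
// MQTT allows the same key to appear more than once, so each key
// maps to an array of values.
function userPropertiesToRecord(v5?: MqttV5): Record<string, string[]> {
  const out: Record<string, string[]> = {};
  for (const [key, value] of v5?.user_properties ?? []) {
    const values = out[key] ?? [];
    values.push(value);
    out[key] = values;
  }
  return out;
}

// Render the correlation data bytes as a hex string, or undefined if
// the publisher did not set any.
function correlationId(v5?: MqttV5): string | undefined {
  if (!v5?.correlation_data) {
    return undefined;
  }
  return v5.correlation_data
    .map((byte) => byte.toString(16).padStart(2, "0"))
    .join("");
}
```

Both helpers accept `undefined`, so they can be called with `wm_trigger.mqtt.v5` directly even when the message carries no v5 properties.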

/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
 * Common tasks:
 * - Convert binary payloads to string/JSON
 * - Extract metadata
 * - Filter messages
 * - Add timestamps/context
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param wm_trigger - Trigger details (e.g., MQTT, HTTP)
 * @param payload - Raw trigger data (format varies by trigger type)
 * @returns Processed data for `main()`
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    mqtt?: {
      topic: string,
      retain: boolean,
      pkid: number,
      qos: number,
      v5?: {
        payload_format_indicator?: number,
        topic_alias?: number,
        response_topic?: string,
        correlation_data?: Array<number>,
        user_properties?: Array<[string, string]>,
        subscription_identifiers?: Array<number>,
        content_type?: string
      }
    }
  },
  payload: Array<number>,
) {
  if (wm_trigger.kind === 'mqtt' && wm_trigger.mqtt) {
    const uint8Payload = new Uint8Array(payload);
    const payloadAsString = new TextDecoder().decode(uint8Payload);

    return {
      contentType: wm_trigger.mqtt.v5?.content_type,
      payload: uint8Payload,
      payloadAsString
    };
  }
  // We assume the script is triggered by an MQTT message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected mqtt trigger kind got: ${wm_trigger.kind}`)
}

/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param payload - Raw binary payload
 * @param payloadAsString - Decoded string payload
 * @param contentType - MQTT v5 content type (if available)
 */
export async function main(payload: Uint8Array, payloadAsString: string, contentType?: string) {

}
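
The `main` function above is left empty. One way its body could start is by choosing how to interpret the decoded string based on the content type. The sketch below assumes publishers set the v5 content type to `application/json` for JSON payloads, which is a convention and not something MQTT enforces. `parseByContentType` is a hypothetical helper.

```typescript
// Interpret the decoded payload according to the MQTT v5 content type.
// Assumption: JSON publishers set content_type to "application/json".
// Anything else, including a missing content type, is treated as text.
function parseByContentType(
  payloadAsString: string,
  contentType?: string,
): unknown {
  if (contentType === "application/json") {
    // JSON.parse throws on malformed input; callers may want to catch
    // that and fall back to the raw string.
    return JSON.parse(payloadAsString);
  }
  return payloadAsString;
}
```

Inside `main`, a call such as `parseByContentType(payloadAsString, contentType)` would then return either a parsed value or the original string.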