Kafka triggers
Windmill can connect to Kafka brokers and trigger runnables (scripts, flows) when a message is received. Listening is done from the servers, so it doesn't take up any workers. Kafka triggers are a self-hosted Enterprise feature.
How to use
Create a new trigger on the Kafka triggers page. Add a Kafka resource with the broker hostnames (hostname:port) and the security settings. Specify the topics the trigger should listen to. The group id is automatically filled in from the current workspace and the trigger path. You can change it if necessary. It indicates the consumer group to which the trigger belongs. Because Kafka tracks consumed offsets per consumer group, this guarantees that even if the trigger stops listening for a while, it will receive the messages it missed when it starts listening again.
Once the Kafka resource and settings are set, select the runnable that should be triggered by this trigger.
The received message will be passed to the runnable as a string argument called msg.
Here's an example script:
export async function main(msg: string) {
  // do something with the message
}
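Since msg arrives as a raw string, a script that expects structured data usually has to parse and validate it first. The following is a minimal sketch of that step, not part of Windmill's API: the OrderEvent shape and the parseOrderEvent helper are hypothetical names chosen for illustration, and the assumption is that producers send JSON.

```typescript
// Hypothetical payload shape (an assumption); adjust to what your producers send.
type OrderEvent = { id: string; amount: number };

// Parse the raw Kafka message. Returns null instead of throwing when the
// message is not valid JSON or does not match the expected shape.
export function parseOrderEvent(msg: string): OrderEvent | null {
  try {
    const data: unknown = JSON.parse(msg);
    if (typeof data !== "object" || data === null) {
      return null;
    }
    const rec = data as Record<string, unknown>;
    if (typeof rec.id === "string" && typeof rec.amount === "number") {
      return { id: rec.id, amount: rec.amount };
    }
    return null;
  } catch {
    // JSON.parse throws a SyntaxError on malformed input
    return null;
  }
}

export async function main(msg: string) {
  const event = parseOrderEvent(msg);
  if (event === null) {
    return { ok: false, reason: "message is not a valid OrderEvent" };
  }
  return { ok: true, id: event.id, amount: event.amount };
}
```

Returning a result object rather than throwing is one possible design choice; throwing instead would make the run fail visibly for malformed messages.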
And if you use a preprocessor, the script could look like this:
export async function preprocessor(
  msg: string,
  wm_trigger: {
    kind: "kafka";
    kafka: {
      brokers: string[];
      topic: string; // the specific topic the message was received from
      group_id: string;
    };
  },
) {
  // assuming the message is a JSON object
  // (parsed into a new name, since redeclaring the msg parameter is an error)
  const parsed = JSON.parse(msg);
  // define args for the main function
  // let's assume we want to use the message content and the topic
  return {
    message_content: parsed.content,
    topic: wm_trigger.kafka.topic,
  };
}
export async function main(message_content: string, topic: string) {
  // do something with the message content and topic
}
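When one trigger listens to several topics, the topic argument can be used to decide how each message is handled. The sketch below shows one way to do this, under stated assumptions: the topic names ("orders.created", "orders.cancelled") and the route helper are hypothetical and only serve as an illustration.

```typescript
// Hypothetical topic names and handlers, used only for illustration.
const handlers = new Map<string, (content: string) => string>([
  ["orders.created", (content) => `new order: ${content}`],
  ["orders.cancelled", (content) => `cancelled order: ${content}`],
]);

// Pick the handler registered for the topic; fall back to a message
// describing the unhandled topic rather than failing.
export function route(topic: string, content: string): string {
  const handler = handlers.get(topic);
  if (handler === undefined) {
    return `ignored message from unhandled topic ${topic}`;
  }
  return handler(content);
}

export async function main(message_content: string, topic: string) {
  return route(topic, message_content);
}
```

A Map keeps the lookup limited to the topics registered explicitly, so an unexpected topic name always reaches the fallback branch.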