This example shows how to use a perpetual script to implement a service in Windmill leveraging Apache Kafka. Services are processes that listen for certain events and trigger actions based on the events they receive, and this is now easily achievable in Windmill.
This blog post was written before Windmill's native Kafka triggers feature was released. While the pattern described here still works, we now recommend using Kafka triggers for a more streamlined experience. Kafka triggers are available as part of Windmill Enterprise and allow you to directly connect Kafka topics to Windmill runnables without needing to write a perpetual script listener.
First, we need a messaging service to listen to. Here we will use Kafka, but this can easily be adapted to others. In Windmill, we are going to implement a perpetual script that listens to events coming from a Kafka topic. On every event received, the perpetual script will spin off a Windmill job, passing the content of the event as an argument to the job.
For this blog post, the consumer of the event will only print the event content, but you can make it do whatever you want with it (ping a Slack channel, update a database table, etc.).
Setup
First, we're going to set up a stack with the following:
- Kafka + Zookeeper to have a working Kafka instance to play with.
- A Windmill cluster composed of one server and two workers. We need two workers here to be able to run multiple jobs in parallel (the listener and the producer). If you are fine sending messages to Kafka using the CLI, then one worker is enough.
We wrote a docker-compose.yml to easily build this stack:
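The full file is not reproduced here, but as a rough, hedged sketch, such a stack could look like the following (image tags, ports, and credentials below are illustrative assumptions, not the exact file we used):

version: '3.7'
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:latest
    depends_on:
      - zookeeper
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
  db:
    image: postgres:14
    environment:
      - POSTGRES_PASSWORD=changeme
      - POSTGRES_DB=windmill
  windmill_server:
    image: ghcr.io/windmill-labs/windmill:main
    depends_on:
      - db
    environment:
      - DATABASE_URL=postgres://postgres:changeme@db/windmill?sslmode=disable
      - MODE=server
    ports:
      - 8000:8000
  windmill_worker:
    image: ghcr.io/windmill-labs/windmill:main
    depends_on:
      - db
    deploy:
      replicas: 2
    environment:
      - DATABASE_URL=postgres://postgres:changeme@db/windmill?sslmode=disable
      - MODE=worker

With the file in place, start the stack: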
docker compose up -d
Create a Kafka topic
The easiest way is to do it via Windmill, but you can also do it with the Kafka CLI. Go to your local Windmill instance and create a Python script with the following content. It simply creates the topic in Kafka and returns.
from confluent_kafka.admin import AdminClient, NewTopic

def main(topic_name: str = "windmill-events"):
    # Connect to the Kafka broker from the docker compose stack
    admin_client = AdminClient({'bootstrap.servers': 'kafka:9092'})

    # Create the topic and wait for the result of the operation
    new_topic = NewTopic(topic_name)
    topic_created = admin_client.create_topics([new_topic])
    for topic, response in topic_created.items():
        try:
            response.result()
            print("Topic {} created".format(topic))
        except Exception as e:
            raise Exception("Failed to create topic {}: {}".format(topic, e))
You can then run this script with the topic name of your choice. For the rest of this post, we will use the topic windmill-events.
Want to do it from the terminal?
Run the following command to create the topic from within the Kafka container:
docker exec -it $KAFKA_CONTAINER_ID kafka-topics.sh --create --topic windmill-events --bootstrap-server localhost:9092
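Either way, you can sanity-check that the topic now exists. Here is a minimal sketch (not part of the original setup) that lists the topics known to the broker, using the same admin client:

from confluent_kafka.admin import AdminClient

def main():
    admin_client = AdminClient({'bootstrap.servers': 'kafka:9092'})
    # list_topics returns cluster metadata, including all known topics
    metadata = admin_client.list_topics(timeout=10)
    return list(metadata.topics.keys())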
Create a topic listener in Windmill
As said in the intro, the purpose of this perpetual script is to listen to the windmill-events topic and trigger a new Windmill job whenever a message is received. The content is quite simple:
from confluent_kafka import Consumer

import wmill

MSG_CONSUMING_JOB_PATH = "u/admin/consume_message"

def main(kafka_topic: str = "windmill-events"):
    client = Consumer({
        'bootstrap.servers': 'kafka:9092',
        'group.id': 'windmill',
        'auto.offset.reset': 'earliest'
    })
    client.subscribe([kafka_topic])

    # The counter i is here to force the perpetual script to exit (and be auto-restarted by
    # Windmill) after some time, no matter how many messages it has processed. It's a good
    # practice to time-bound jobs in general, and in this particular case it will avoid hitting
    # the maximum logs size
    i = 0
    while i < 10000:
        i += 1
        msg = client.poll(timeout=30)  # timeout of 30 seconds
        if msg is None:
            # print("No message after timeout. Looping")
            continue
        if msg.error():
            raise Exception("Consumer error: {}".format(msg.error()))
        payload = msg.value().decode('utf-8')
        print('Message received ({}). Scheduling consuming job'.format(payload))
        # Spin off a Windmill job running the consumer script with the message payload
        wmill.run_script_async(hash_or_path=MSG_CONSUMING_JOB_PATH, args={"msg": payload})
    client.close()
    return
Before deploying the script, don't forget to enable the "Perpetual Script" toggle in the script settings. When a script is flagged as perpetual, Windmill will make sure to start a new job every time the previous one finishes.
Lastly, we need to create the u/admin/consume_message script. As said previously, it only prints the message content:
def main(msg: str):
    print("A message has been received: {}".format(msg))
The listener script can now be started. It will run perpetually.
Publish messages to the Kafka topic
Finally, to prove that the above works, we need to publish messages to the Kafka topic. It can be done with the Kafka CLI, but why not do it in Windmill? Here is a script that will publish 10 messages with random sleeps in between:
from confluent_kafka import Producer

import random
import time

NUMBER_OF_MSGS = 10
MAX_SLEEP_SECS = 10

def main(kafka_topic: str = "windmill-events", msg: str = "Hello World!"):
    client = Producer({
        'bootstrap.servers': 'kafka:9092',
    })
    for i in range(NUMBER_OF_MSGS):
        # Wait a random amount of time between two messages
        sleep_secs = random.randint(0, MAX_SLEEP_SECS)
        print("Sleeping for {}s".format(sleep_secs))
        time.sleep(sleep_secs)

        client.poll(0)
        client.produce(kafka_topic, msg.encode('utf-8'), callback=delivery_callback)
        client.flush()
    return

def delivery_callback(err, msg):
    if err is not None:
        raise Exception('Publishing message failed: {}'.format(err))
    else:
        print('Message delivered')
Want to do it from the terminal?
Log into the Kafka container and run the kafka-console-producer.sh helper:
docker exec -it $KAFKA_CONTAINER_ID kafka-console-producer.sh --topic windmill-events --bootstrap-server localhost:9092
Each line you type is sent as one message to the topic.
On every message, the listener will trigger the consuming script with the message payload, and whenever the listener job exits, Windmill will restart it immediately!
Learn more
To learn more about Perpetual Scripts, you can visit our dedicated docs page. You can self-host Windmill using a docker compose up, or go with the cloud app.