Skip to main content

GCP Pub/Sub triggers

Windmill can connect to Google Cloud Pub/Sub and trigger runnables (scripts, flows) when messages are published on topics.
You can configure Windmill to either pull messages from subscriptions or receive pushed messages via auto-generated endpoints.

Google Cloud Pub/Sub triggers is a self-hosted Enterprise feature.


How to use

Configure GCP connection

  • Select an existing GCP resource (service account credentials) or create a new one.

The service account used must have enough permissions for Windmill to fully manage Pub/Sub resources. Specifically:

  • Pub/Sub Viewer (roles/pubsub.viewer): to check if topics or subscriptions exist, list them.
  • Pub/Sub Subscriber (roles/pubsub.subscriber): to attach to subscriptions and consume messages.
  • Pub/Sub Editor (roles/pubsub.editor): needed to create or update subscriptions, and to optionally delete the subscription in the cloud when deleting the associated trigger if the user chooses to do so.

If you prefer not to assign these three individually, you can simply grant the Pub/Sub Admin role (roles/pubsub.admin).

Additionally, if you want to create authenticated push delivery subscriptions, the service account must also have Service Account User (roles/iam.serviceAccountUser) permission. See Authenticate Push Subscriptions for more details.

Subscription setup

Select topic and subscription

  • Choose a topic from your GCP project. You can refresh the list if needed.
  • Decide how to set up your subscription:
    • Create or update a subscription: Windmill will create a new subscription or update an existing one.
    • Use an existing subscription: Link an existing subscription from your GCP project.
When creating/updating a subscription:
  • Specify a Subscription ID, or leave it empty to auto-generate one.
  • Choose the delivery type:
    • Pull: Windmill sets the subscription as a Pull subscription.
    • Push: Windmill sets the subscription as a Push subscription.
      • For push delivery, Windmill sets the subscription's push endpoint URL to match the path of the trigger.
        The format is:
        {base_endpoint}/api/gcp/w/{workspace_id}/{trigger_path}
      • Example: if the trigger path is u/test/fabulous_trigger, the endpoint will be:
        {base_endpoint}/api/gcp/w/myworkspace/u/test/fabulous_trigger
      • When creating or updating a push subscription, Windmill allows you to configure:
        • Whether authentication is enabled or disabled.

Refer to Google Cloud Pub/Sub - Managing Subscriptions for more details about delivery types.

When using an existing subscription:
  • Select an existing subscription ID among the subscriptions fetched from the selected topic.
  • Windmill will automatically detect the subscription's delivery type based on the cloud configuration.
  • If the subscription is of push delivery type:
    • The subscription's endpoint URL must match the path of the trigger that will be bound to it.
    • The expected format is:
      {base_endpoint}/api/gcp/w/{workspace_id}/{trigger_path}

Note: You must not have multiple subscriptions pointing to the same trigger URL (for example, two subscriptions targeting {base_endpoint}/api/gcp/w/myworkspace/u/test/fabulous_trigger).

Choose the runnable

  • Select the script or flow to trigger when Pub/Sub messages are received.

Implementation examples

Below are examples for handling GCP Pub/Sub messages in Windmill.

Windmill provides the Pub/Sub message as the argument payload (a base64-encoded string) to your runnable.

Basic script

export async function main(payload: string) {
const decoded = new TextDecoder().decode(Uint8Array.from(atob(payload), c => c.charCodeAt(0)));

try {
const jsonData = JSON.parse(decoded);
console.log("Received JSON data:", jsonData);
// Process structured data
} catch (e) {
console.log("Received plain text:", decoded);
// Process raw text
}

return { processed: true };
}

Using a preprocessor

If you configure a preprocessor, you can extract fields before they reach the main function.

Windmill provides the Pub/Sub message as the argument payload (a base64-encoded string) to the preprocessor.

GCP Pub/Sub trigger object

  • subscription: Subscription ID
  • topic: Topic ID
  • message_id: Unique message ID
  • publish_time: Publish timestamp (RFC 3339 format with Z, e.g., "2024-04-07T12:34:56Z")
  • attributes: Key-value metadata
  • delivery_type: "push" or "pull" (the type of delivery)
  • ordering_key: Ordering key (optional, if message ordering is enabled)
  • headers: HTTP headers for push delivery (only present for push)

Example preprocessor:

export async function preprocessor(
wm_trigger: {
kind: 'gcp',
gcp?: {
message_id: string,
subscription: string,
ordering_key?: string,
attributes?: Record<string, string>,
delivery_type: "push" | "pull",
headers?: Record<string, string>,
publish_time?: string,
}
},
payload: string,
) {
if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
const decodedString = new TextDecoder().decode(Uint8Array.from(atob(payload), c => c.charCodeAt(0)));

const attributes = wm_trigger.gcp.attributes || {};
const contentType = attributes['content-type'] || attributes['Content-Type'];
const isJson = contentType === 'application/json';

let parsedMessage: any = decodedString;
if (isJson) {
try {
parsedMessage = JSON.parse(decodedString);
} catch (err) {
throw new Error(`Invalid JSON payload: ${err}`);
}
}

return {
messageAsDecodedString: decodedString,
contentType,
parsedMessage,
attributes
};
}

throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`);
}

Then your main function can simply receive the extracted arguments:

export async function main(
messageAsDecodedString: string,
contentType?: string,
parsedMessage?: any,
attributes?: Record<string, string>,
) {
console.log("Decoded String:", messageAsDecodedString);
console.log("Content-Type:", contentType);
console.log("Parsed Message:", parsedMessage);
console.log("Attributes:", attributes);
}

Troubleshooting

  • Permission issues: Verify the service account has required Pub/Sub permissions. If the correct permissions are set but you still encounter unauthorized or permission denied errors, it might indicate that Google has updated required permissions. Please contact Windmill support so we can investigate and assist.
  • Push delivery failures: If using existing subscription ensure the push endpoint URL matches the required format ({base_endpoint}/api/gcp/w/{workspace_id}/{trigger_path}) and is unique across the workspace.
  • Topic or subscription not found: Refresh the list to fetch the latest available resources.