When you emit custom events with payloads, you can pass that payload data to flows triggered by automations. This is useful for event-driven workflows where external systems or APIs send structured data that your flows need to process.

Define a flow that accepts payload data

Create a flow that accepts a dictionary payload:
from typing import Any

from prefect import flow


@flow(log_prints=True)
def process_webhook(payload: dict[str, Any]):
    """Process data from a custom event"""
    print(f"User: {payload.get('user_id')}")
    print(f"Action: {payload.get('action')}")
    print(f"Full payload: {payload}")


if __name__ == "__main__":
    process_webhook.serve(
        name="webhook-processor",
        tags=["webhook", "event-driven"],
    )

Emit a custom event with payload

Emit custom events with structured payloads using the Prefect CLI:
prefect event emit api.webhook.received \
  --resource-id custom.webhook.handler \
  --payload '{
    "user_id": "123",
    "action": "created",
    "resource_type": "order",
    "timestamp": "2024-01-15T10:30:00Z"
  }'
Or using the Python SDK:
from prefect.events import emit_event

emit_event(
    event="api.webhook.received",
    resource={
        "prefect.resource.id": "custom.webhook.handler",
        "prefect.resource.role": "api"
    },
    payload={
        "user_id": "123",
        "action": "created",
        "resource_type": "order",
        "timestamp": "2024-01-15T10:30:00Z"
    }
)

Configure the automation trigger

Create a new automation from the Automations page. In the trigger configuration, set it to listen for your custom event. The trigger definition should look like this in the JSON tab:
{
  "type": "event",
  "match": {
    "prefect.resource.id": [
      "custom.webhook.handler"
    ]
  },
  "match_related": {},
  "after": [],
  "expect": [
    "api.webhook.received"
  ],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 0
}
Finding your custom events: You can view all events, including your custom events, in the Event Feed in the Prefect UI. This helps you verify the exact event names and payload structure to use in your automation configuration.
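To make the `match` and `expect` fields of the trigger concrete, here is a simplified model of how an incoming event could be checked against them. This is a sketch, not Prefect's implementation: `trigger_matches` is a hypothetical helper, it does exact string comparison only, and it assumes an empty `expect` list means "any event name".

```python
from typing import Any

# Only the trigger fields this sketch uses, copied from the JSON definition.
trigger = {
    "match": {"prefect.resource.id": ["custom.webhook.handler"]},
    "expect": ["api.webhook.received"],
}


def trigger_matches(trigger: dict[str, Any], event: dict[str, Any]) -> bool:
    """Hypothetical, simplified check using exact string matching."""
    # Assumption: an empty "expect" list accepts any event name.
    expected = trigger.get("expect", [])
    if expected and event["event"] not in expected:
        return False
    # Every label under "match" must appear on the event's resource
    # with one of the allowed values.
    for label, allowed in trigger.get("match", {}).items():
        if event["resource"].get(label) not in allowed:
            return False
    return True


event = {
    "event": "api.webhook.received",
    "resource": {
        "prefect.resource.id": "custom.webhook.handler",
        "prefect.resource.role": "api",
    },
    "payload": {"user_id": "123", "action": "created"},
}
# Same resource, different event name: should not match.
other = {**event, "event": "api.webhook.failed"}

print(trigger_matches(trigger, event))
print(trigger_matches(trigger, other))
```

With a Reactive posture and a threshold of 1, each event that passes a check like this fires the automation's action once.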

Configure the action to pass the payload

Add a “Run a deployment” action and configure it to pass the event payload as a dictionary to your flow. In the UI, you’ll need to use “custom JSON” mode for the payload parameter and enter:
{
  "__prefect_kind": "json",
  "value": {
    "__prefect_kind": "jinja",
    "template": "{{ event.payload | tojson }}"
  }
}
This nested structure is evaluated from the inside out:
  1. The inner jinja block renders the template {{ event.payload | tojson }} to a JSON string
  2. The outer json block parses that JSON string back into a dictionary
As a result, your flow receives the payload as a dict rather than a string.
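The two steps can be imitated locally with the standard library to see why both are needed. This is only an illustration: `json.dumps` stands in for the `tojson` filter and `json.loads` stands in for the json handler; it does not call Prefect or Jinja.

```python
import json

event_payload = {"user_id": "123", "action": "created", "resource_type": "order"}

# Stand-in for step 1: rendering the template produces a JSON *string*.
rendered = json.dumps(event_payload)

# Stand-in for step 2: the json handler parses the string back into a dict.
parameter_value = json.loads(rendered)

print(type(rendered).__name__)         # the intermediate value is text
print(type(parameter_value).__name__)  # the flow parameter is a dictionary
print(parameter_value == event_payload)
```

If only the Jinja step were used, the flow would receive the intermediate string, and calls such as `payload.get('user_id')` would fail.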

Define everything in Python

You can define the flow and trigger entirely in Python using DeploymentEventTrigger:
event_payload_example.py
from typing import Any

from prefect import flow, serve
from prefect.events import DeploymentEventTrigger


@flow(log_prints=True)
def process_webhook(payload: dict[str, Any]):
    """Process data from a custom event"""
    print(f"User: {payload.get('user_id')}")
    print(f"Action: {payload.get('action')}")
    print(f"Full payload: {payload}")


if __name__ == "__main__":
    deployment = process_webhook.to_deployment(
        name="webhook-processor",
        triggers=[
            DeploymentEventTrigger(
                expect={"api.webhook.received"},
                parameters={
                    "payload": {
                        "__prefect_kind": "json",
                        "value": {
                            "__prefect_kind": "jinja",
                            "template": "{{ event.payload | tojson }}",
                        }
                    }
                },
            )
        ],
    )

    serve(deployment)
Start the serve process:
python event_payload_example.py
Get the deployment ID from the output, then emit an event targeting that deployment:
prefect event emit api.webhook.received \
  --resource-id prefect.deployment.<deployment-id> \
  --payload '{"user_id": "123", "action": "created", "resource_type": "order"}'
Targeting the deployment: When using DeploymentEventTrigger, events must target the deployment’s resource ID (prefect.deployment.<deployment-id>). You can find the deployment ID in the serve output or with prefect deployment ls.
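The same deployment-targeted event can be emitted from Python with `emit_event`, mirroring the CLI command. In this sketch, `build_deployment_event` and `emit_for_deployment` are hypothetical helper names introduced for illustration; the deployment ID is something you supply.

```python
from typing import Any


def build_deployment_event(deployment_id: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Build keyword arguments for emit_event that target one deployment."""
    return {
        "event": "api.webhook.received",
        # The resource ID must follow the prefect.deployment.<deployment-id> form.
        "resource": {"prefect.resource.id": f"prefect.deployment.{deployment_id}"},
        "payload": payload,
    }


def emit_for_deployment(deployment_id: str, payload: dict[str, Any]) -> None:
    """Emit the event; requires Prefect and a configured API connection."""
    # Imported here so the builder above can be inspected without Prefect.
    from prefect.events import emit_event

    emit_event(**build_deployment_event(deployment_id, payload))
```

Call `emit_for_deployment("<deployment-id>", {"user_id": "123", "action": "created", "resource_type": "order"})` with the ID printed by the serve process.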

Alternative: Pass individual payload fields

If you only need specific fields from the payload, you can access them individually using Jinja templates for each parameter:
from prefect import flow, serve
from prefect.events import DeploymentEventTrigger


@flow(log_prints=True)
def process_webhook(user_id: str, action: str, resource_type: str):
    """Process individual fields from a custom event"""
    print(f"Processing {action} for user {user_id}")
    print(f"Resource type: {resource_type}")


if __name__ == "__main__":
    deployment = process_webhook.to_deployment(
        name="webhook-processor-fields",
        triggers=[
            DeploymentEventTrigger(
                expect={"api.webhook.received"},
                parameters={
                    "user_id": {
                        "__prefect_kind": "jinja",
                        "template": "{{ event.payload.user_id }}",
                    },
                    "action": {
                        "__prefect_kind": "jinja",
                        "template": "{{ event.payload.action }}",
                    },
                    "resource_type": {
                        "__prefect_kind": "jinja",
                        "template": "{{ event.payload.resource_type }}",
                    },
                },
            )
        ],
    )

    serve(deployment)
This approach is useful when your flow expects specific typed parameters (like strings or integers) rather than a generic dictionary. Each field from the event payload is extracted and passed as a separate parameter to your flow.