Motivating custom events

Imagine you are running an e-commerce platform and you want to trigger a deployment when a customer completes an order.

There might be a number of events that occur during an order on your platform, for example:

  • order.created
  • order.item.added
  • order.payment-method.confirmed
  • order.shipping-method.added
  • order.complete

Event grammar

The above choices of event names are arbitrary. With Prefect events, you’re free to select any event grammar that best represents your use case.

In this case, we want to trigger a deployment when a user completes an order, so our trigger should:

  • expect an order.complete event
  • after an order.created event
  • evaluate these conditions for_each user id

Finally, it should pass the user_id as a parameter to the deployment.

Define the trigger

Here’s how this looks in code:

post_order_deployment.py
from prefect import flow
from prefect.events.schemas.deployment_triggers import DeploymentEventTrigger

order_complete = DeploymentEventTrigger(
    expect={"order.complete"},
    after={"order.created"},
    for_each={"prefect.resource.id"},
    parameters={"user_id": "{{ event.resource.id }}"},
)


@flow(log_prints=True)
def post_order_complete(user_id: str):
    print(f"User {user_id} has completed an order -- doing stuff now")


if __name__ == "__main__":
    post_order_complete.serve(triggers=[order_complete])

Specify multiple events or resources

The expect and after fields accept a set of event names, so you can specify multiple events for each condition.

Similarly, the for_each field accepts a set of resource ids.

Simulate events

To simulate users causing order status events, run the following in a Python shell or script:

simulate_events.py
import time
from prefect.events import emit_event

user_id_1, user_id_2 = "123", "456"
for event_name, user_id in [
    ("order.created", user_id_1),
    ("order.created", user_id_2), # other user
    ("order.complete", user_id_1),
]:
    event = emit_event(
        event=event_name,
        resource={"prefect.resource.id": user_id},
    )
    time.sleep(1)
    print(f"{user_id} emitted {event_name}")

In the above example:

  • user_id_1 creates and then completes an order, triggering a run of our deployment.
  • user_id_2 creates an order, but no completed event is emitted so no deployment is triggered.