Trigger actions on events
Automations provide a flexible and powerful framework for automatically taking action in response to events.
Automations enable you to configure actions that execute automatically based on trigger conditions.
Potential triggers include the occurrence of events from changes in a flow run’s state—or the absence of such events. You can define your own custom trigger to fire based on a custom event defined in Python code. With Prefect Cloud you can even create webhooks that can receive data for use in actions.
Actions include starting flow runs, pausing schedules, and sending custom notifications.
Create an automation
On the Automations page, select the + icon to create a new automation. You’ll be prompted to configure:
- A trigger condition that causes the automation to execute.
- One or more actions carried out by the automation.
- Details about the automation, such as a name and description.
Manage automations
The Automations page provides an overview of all configured automations for your workspace.
Select the toggle next to an automation to pause execution of the automation.
The button next to the toggle provides commands to copy the automation ID, edit the automation, or delete the automation.
Select the name of an automation to view Details about it and relevant Events.
Triggers
Triggers specify the conditions under which your action should be performed. The Prefect UI includes templates for many common conditions, such as:
- Flow run state change (Flow Run Tags are only evaluated with OR criteria)
- Work pool status
- Work queue status
- Deployment status
- Metric thresholds, such as average duration, lateness, or completion percentage
- Incident declarations (available on Pro and Enterprise plans)
- Custom event triggers
Automations API
The automations API enables further programmatic customization of trigger and action policies based on arbitrary events.
Importantly, you can configure the triggers not only in reaction to events, but also proactively: in the absence of an expected event.
For example, in the case of flow run state change triggers, you might expect production flows to finish in no longer than thirty minutes. But transient infrastructure or network issues could cause your flow to get “stuck” in a running state. A trigger could kick off an action if the flow stays in a running state for more than 30 minutes.
This action could be taken on the flow itself, such as cancelling or restarting it. Or the action could take the form of a notification for someone to take manual remediation steps. Or you could set both actions to take place when the trigger occurs.
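To make the proactive case concrete, here is a minimal sketch of the stuck-flow check described above. It is not Prefect's implementation (Prefect evaluates triggers for you); the function name is_stuck and the (timestamp, event name) tuples are hypothetical and only illustrate what firing on the absence of an expected event means.

```python
from datetime import datetime, timedelta


# Hypothetical illustration only: Prefect evaluates triggers itself.
def is_stuck(events, now, within=timedelta(minutes=30)):
    """Return True if a run entered Running and no later state event
    arrived inside the allowed window.

    `events` is a list of (timestamp, event_name) tuples for one flow run.
    """
    running_at = None
    for timestamp, name in sorted(events):
        if name == "prefect.flow-run.Running":
            running_at = timestamp
        elif running_at is not None and timestamp > running_at:
            # Any later state change means the run left the Running state.
            running_at = None
    return running_at is not None and now - running_at > within


start = datetime(2024, 1, 1, 12, 0)
stuck_run = [(start, "prefect.flow-run.Running")]
healthy_run = [
    (start, "prefect.flow-run.Running"),
    (start + timedelta(minutes=5), "prefect.flow-run.Completed"),
]
now = start + timedelta(minutes=45)

print(is_stuck(stuck_run, now))
print(is_stuck(healthy_run, now))
```

The first run never leaves the running state, so the check fires; the second run completes after five minutes, so it does not.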
Actions
Actions specify what your automation does when its trigger criteria are met. Current action types include:
- Cancel a flow run
- Pause or resume a schedule
- Run a deployment
- Pause or resume a deployment schedule
- Pause or resume a work pool
- Pause or resume a work queue
- Pause or resume an automation
- Send a notification
- Call a webhook
- Suspend a flow run
- Declare an incident (available on Pro and Enterprise plans)
- Change the state of a flow run
Create automations in Python code
You can create and access any automation with the Python SDK’s Automation class and its methods.
from prefect.automations import Automation
from prefect.events.schemas.automations import EventTrigger
from prefect.server.events.actions import CancelFlowRun

# creating an automation
automation = Automation(
    name="woodchonk",
    trigger=EventTrigger(
        expect={"animal.walked"},
        match={
            "genus": "Marmota",
            "species": "monax",
        },
        posture="Reactive",
        threshold=3,
    ),
    actions=[CancelFlowRun()],
).create()
print(automation)
# name='woodchonk' description='' enabled=True trigger=EventTrigger(type='event', match=ResourceSpecification(__root__={'genus': 'Marmota', 'species': 'monax'}), match_related=ResourceSpecification(__root__={}), after=set(), expect={'animal.walked'}, for_each=set(), posture=Posture.Reactive, threshold=3, within=datetime.timedelta(seconds=10)) actions=[CancelFlowRun(type='cancel-flow-run')] actions_on_trigger=[] actions_on_resolve=[] owner_resource=None id=UUID('d641c552-775c-4dc6-a31e-541cb11137a6')
# reading the automation by ID
automation = Automation.read(id="d641c552-775c-4dc6-a31e-541cb11137a6")
# or by name
automation = Automation.read(name="woodchonk")
print(automation)
# name='woodchonk' description='' enabled=True trigger=EventTrigger(type='event', match=ResourceSpecification(__root__={'genus': 'Marmota', 'species': 'monax'}), match_related=ResourceSpecification(__root__={}), after=set(), expect={'animal.walked'}, for_each=set(), posture=Posture.Reactive, threshold=3, within=datetime.timedelta(seconds=10)) actions=[CancelFlowRun(type='cancel-flow-run')] actions_on_trigger=[] actions_on_resolve=[] owner_resource=None id=UUID('d641c552-775c-4dc6-a31e-541cb11137a6')
Selected and inferred action targets
Some actions require you to either select the target of the action or specify that the target should be inferred. Selected targets are simple and useful when you know exactly which object your action should act on, for example a cleanup flow you want to run or a specific notification you want to send.
Inferred targets are deduced from the trigger itself.
For example, if a trigger fires on a flow run that is stuck in a running state and the action is to cancel an inferred flow run, the flow run that gets cancelled is the one that caused the trigger to fire.
Similarly, if a trigger fires on a work queue event and the corresponding action is to pause an inferred work queue, the inferred work queue is the one that emitted the event.
Prefect infers the relevant event whenever possible, but sometimes one does not exist.
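As a rough illustration of the difference, the sketch below builds two action payloads shaped like the run-deployment action in the REST API example later in this guide. The helper name run_deployment_action is hypothetical, and the "inferred" source value is an assumption to confirm against the REST API reference.

```python
import json


# Hypothetical helper; the "inferred" source value is an assumption.
def run_deployment_action(deployment_id=None):
    """Build a run-deployment action with a selected or inferred target."""
    if deployment_id is None:
        # No explicit target: the target is left to be inferred from the trigger.
        return {"type": "run-deployment", "source": "inferred"}
    return {
        "type": "run-deployment",
        "source": "selected",
        "deployment_id": deployment_id,
    }


selected = run_deployment_action("YOUR-DEPLOYMENT-ID")
inferred = run_deployment_action()

print(json.dumps(selected, indent=2))
print(json.dumps(inferred, indent=2))
```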
Specify a name and, optionally, a description for the automation.
Create an automation with deployment triggers
To enable the simple configuration of event-driven deployments, Prefect provides deployment triggers—a shorthand for creating automations that are linked to specific deployments to run them based on the presence or absence of events.
Trigger definitions for deployments are supported in prefect.yaml, .serve, and .deploy. At deployment time, specified trigger definitions create linked automations triggered by events matching your chosen grammar. Each trigger definition may include a Jinja template to render the triggering event as the parameters of your deployment’s flow run.
Define triggers in prefect.yaml
You can include a list of triggers on any deployment in a prefect.yaml file:
deployments:
  - name: my-deployment
    entrypoint: path/to/flow.py:decorated_fn
    work_pool:
      name: my-work-pool
    triggers:
      - type: event
        enabled: true
        match:
          prefect.resource.id: my.external.resource
        expect:
          - external.resource.pinged
        parameters:
          param_1: "{{ event }}"
This deployment creates a flow run when an external.resource.pinged event and an external.resource.replied event have been seen from my.external.resource:
deployments:
  - name: my-deployment
    entrypoint: path/to/flow.py:decorated_fn
    work_pool:
      name: my-work-pool
    triggers:
      - type: compound
        require: all
        parameters:
          param_1: "{{ event }}"
        triggers:
          - type: event
            match:
              prefect.resource.id: my.external.resource
            expect:
              - external.resource.pinged
          - type: event
            match:
              prefect.resource.id: my.external.resource
            expect:
              - external.resource.replied
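The require field decides how many of the nested triggers must fire before the compound trigger fires. The following sketch models that rule in plain Python; it is an illustration and not Prefect's code. The function name compound_fired is hypothetical, and the assumption that require may also be "any" or an integer count should be checked against the API reference.

```python
# Hypothetical model of a compound trigger's `require` rule.
def compound_fired(require, children_fired):
    """Decide whether a compound trigger fires.

    `require` is "all", "any", or a minimum count (assumed semantics);
    `children_fired` holds one boolean per nested trigger.
    """
    fired = sum(1 for child in children_fired if child)
    if require == "all":
        return fired == len(children_fired)
    if require == "any":
        return fired >= 1
    return fired >= int(require)


# Only the "pinged" trigger has fired so far.
print(compound_fired("all", [True, False]))
# Both "pinged" and "replied" have fired.
print(compound_fired("all", [True, True]))
```

With require set to all, as in the example above, the deployment runs only once both the pinged and the replied events have been seen.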
Define triggers in .serve and .deploy
To create deployments with triggers in Python, the trigger types DeploymentEventTrigger, DeploymentMetricTrigger, DeploymentCompoundTrigger, and DeploymentSequenceTrigger can be imported from prefect.events:
from prefect import flow
from prefect.events import DeploymentEventTrigger


@flow(log_prints=True)
def decorated_fn(param_1: str):
    print(param_1)


if __name__=="__main__":
    decorated_fn.serve(
        name="my-deployment",
        triggers=[
            DeploymentEventTrigger(
                enabled=True,
                match={"prefect.resource.id": "my.external.resource"},
                expect=["external.resource.pinged"],
                parameters={
                    "param_1": "{{ event }}",
                },
            )
        ],
    )
As with prior examples, you must supply composite triggers with a list of underlying triggers:
from prefect import flow
from prefect.events import DeploymentCompoundTrigger


@flow(log_prints=True)
def decorated_fn(param_1: str):
    print(param_1)


if __name__=="__main__":
    decorated_fn.deploy(
        name="my-deployment",
        image="my-image-registry/my-image:my-tag",
        triggers=[
            DeploymentCompoundTrigger(
                enabled=True,
                name="my-compound-trigger",
                require="all",
                triggers=[
                    {
                        "type": "event",
                        "match": {"prefect.resource.id": "my.external.resource"},
                        "expect": ["external.resource.pinged"],
                    },
                    {
                        "type": "event",
                        "match": {"prefect.resource.id": "my.external.resource"},
                        "expect": ["external.resource.replied"],
                    },
                ],
                parameters={
                    "param_1": "{{ event }}",
                },
            )
        ],
        work_pool_name="my-work-pool",
    )
Pass triggers to prefect deploy
You can pass one or more --trigger arguments to prefect deploy as either a JSON string or a path to a .yaml or .json file.
# Pass a trigger as a JSON string
prefect deploy -n test-deployment \
  --trigger '{
    "enabled": true,
    "match": {
      "prefect.resource.id": "prefect.flow-run.*"
    },
    "expect": ["prefect.flow-run.Completed"]
  }'

# Pass a trigger using a JSON/YAML file
prefect deploy -n test-deployment --trigger triggers.yaml
prefect deploy -n test-deployment --trigger my_stuff/triggers.json
For example, a triggers.yaml file could have many triggers defined:
triggers:
  - enabled: true
    match:
      prefect.resource.id: my.external.resource
    expect:
      - external.resource.pinged
    parameters:
      param_1: "{{ event }}"
  - enabled: true
    match:
      prefect.resource.id: my.other.external.resource
    expect:
      - some.other.event
    parameters:
      param_1: "{{ event }}"
Both of the above triggers would be attached to test-deployment after running prefect deploy.
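If you prefer the .json form, one way to produce such a file is to write the same structure from Python. This is a sketch under the assumption that the JSON file mirrors the YAML layout shown above, with a top-level triggers key; the temporary directory is used only to keep the example self-contained.

```python
import json
import tempfile
from pathlib import Path

# Assumption: the JSON file uses the same layout as triggers.yaml above.
triggers = {
    "triggers": [
        {
            "enabled": True,
            "match": {"prefect.resource.id": "my.external.resource"},
            "expect": ["external.resource.pinged"],
            "parameters": {"param_1": "{{ event }}"},
        },
        {
            "enabled": True,
            "match": {"prefect.resource.id": "my.other.external.resource"},
            "expect": ["some.other.event"],
            "parameters": {"param_1": "{{ event }}"},
        },
    ]
}

# Write the file, then read it back to confirm it is valid JSON.
path = Path(tempfile.mkdtemp()) / "triggers.json"
path.write_text(json.dumps(triggers, indent=2))
loaded = json.loads(path.read_text())

print(f"Wrote {len(loaded['triggers'])} triggers to {path}")
```

The resulting file path can then be passed with --trigger in the same way as the my_stuff/triggers.json example.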
Triggers passed to prefect deploy will override any triggers defined in prefect.yaml
While you can define triggers in prefect.yaml for a given deployment, triggers passed to prefect deploy take precedence over those defined in prefect.yaml.
Note that deployment triggers contribute to the total number of automations in your workspace.
Sending notifications with automations
Automations support sending notifications through any predefined block that is capable of and configured to send a message, including:
- Slack message to a channel
- Microsoft Teams message to a channel
- Email to an email address
Templating with Jinja
You can access templated variables with automation actions through Jinja syntax. Templated variables enable you to dynamically include details from an automation trigger, such as a flow or pool name.
Jinja templated variable syntax wraps the variable name in double curly brackets, like this: {{ variable }}.
You can access properties of the underlying objects, such as flow_run, flow, deployment, and work_pool.
In addition to its native properties, each object includes an id along with created and updated timestamps.
The flow_run|ui_url token returns the URL to view the flow run in the UI.
Here’s an example relevant to a flow run state-based notification:
Flow run {{ flow_run.name }} entered state {{ flow_run.state.name }}.
Timestamp: {{ flow_run.state.timestamp }}
Flow ID: {{ flow_run.flow_id }}
Flow Run ID: {{ flow_run.id }}
State message: {{ flow_run.state.message }}
You could include flow and deployment properties:
Flow run {{ flow_run.name }} for flow {{ flow.name }}
entered state {{ flow_run.state.name }}
with message {{ flow_run.state.message }}
Flow tags: {{ flow_run.tags }}
Deployment name: {{ deployment.name }}
Deployment version: {{ deployment.version }}
Deployment parameters: {{ deployment.parameters }}
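To picture how such a template is filled in, the sketch below substitutes dotted variable names from a nested dictionary. It is a deliberately simplified stand-in written with the standard library, not Jinja itself and not how Prefect renders notifications; the render function and all sample values are hypothetical.

```python
import re


# Simplified stand-in for Jinja variable substitution (illustration only).
def render(template, context):
    """Replace each {{ dotted.path }} with the matching value in `context`."""

    def lookup(match):
        value = context
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)

    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", lookup, template)


template = (
    "Flow run {{ flow_run.name }} for flow {{ flow.name }}\n"
    "entered state {{ flow_run.state.name }}\n"
    "Deployment name: {{ deployment.name }}"
)

# Made-up sample values standing in for the objects Prefect provides.
context = {
    "flow_run": {"name": "hypothetical-run", "state": {"name": "Completed"}},
    "flow": {"name": "build-names"},
    "deployment": {"name": "deploy-build-names"},
}

message = render(template, context)
print(message)
```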
An automation that reports on work pool status might include notifications using work_pool properties:
Work pool status alert!
Name: {{ work_pool.name }}
Last polled: {{ work_pool.last_polled }}
In addition to those shortcuts for flows, deployments, and work pools, you have access to the automation and the event that triggered the automation. See the Automations API for additional details.
Automation: {{ automation.name }}
Description: {{ automation.description }}
Event: {{ event.id }}
Resource:
{% for label, value in event.resource %}
{{ label }}: {{ value }}
{% endfor %}
Related Resources:
{% for related in event.related %}
Role: {{ related.role }}
{% for label, value in related %}
{{ label }}: {{ value }}
{% endfor %}
{% endfor %}
Note that this example also illustrates the ability to use Jinja features such as iterator and for loop control structures when templating notifications.
API example
This example grabs data from an API and sends a notification based on the end state.
Create the example script
Start by pulling hypothetical user data from an endpoint and then performing data cleaning and transformations.
First create a simple extract method that pulls the data from a random user data generator endpoint:
from prefect import flow, task, get_run_logger
import requests
import json


@task
def fetch(url: str):
    logger = get_run_logger()
    response = requests.get(url)
    raw_data = response.json()
    logger.info(f"Raw response: {raw_data}")
    return raw_data


@task
def clean(raw_data: dict):
    print(raw_data.get('results')[0])
    results = raw_data.get('results')[0]
    logger = get_run_logger()
    logger.info(f"Cleaned results: {results}")
    return results['name']


@flow
def build_names(num: int = 10):
    df = []
    url = "https://randomuser.me/api/"
    logger = get_run_logger()
    copy = num
    while num != 0:
        raw_data = fetch(url)
        df.append(clean(raw_data))
        num -= 1
    logger.info(f"Built {copy} names: {df}")
    return df


if __name__ == "__main__":
    list_of_names = build_names()
The data cleaning workflow has visibility into each step, and sends a list of names to the next step of the pipeline.
Create a notification block in the UI
Next, send a notification based off a completed state outcome. Configure a notification that shows when to look into your workflow logic.
- Prior to creating the automation, confirm the notification location. Create a notification block to help define where the notification is sent.
- Navigate to the blocks page on the UI, and click into creating an email notification block.
- Go to the automations page to create your first automation.
- Next, find the trigger type. In this case, use a flow completion.
- Create the actions for when the trigger is hit. In this case, create a notification to showcase the completion.
- Now the automation is ready to be triggered from a flow run completion. Run the file locally and see that the notification is sent to your inbox after the completion. It may take a few minutes for the notification to arrive.
No deployment created
You do not need to create a deployment to trigger your automation. In the case above, the flow run state trigger fired in response to a flow run that was executed locally.
Now that you’ve seen how to create an email notification from a flow run completion, see how to kick off a deployment run in response to an event.
Event-based deployment automation
Create an automation to kick off a deployment instead of a notification. Explore how to programmatically create this automation with Prefect’s REST API.
See the REST API documentation as a reference for interacting with the automation endpoints.
Create a deployment to kick off some work based on how long a flow is running. For example, if the build_names flow takes too long to execute, you can kick off a deployment with the same build_names flow, but replace the count value (the num parameter) with a lower number to speed up completion.
Create a deployment with a prefect.yaml file or a Python file that uses flow.deploy.
Create a prefect.yaml file like this one for our flow build_names:
# Welcome to your prefect.yaml file! You can use this file for storing and managing
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: automations-guide
prefect-version: 3.0.0

# build section allows you to manage and build docker images
build: null

# push section allows you to manage if and how this project is uploaded to remote locations
push: null

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /Users/src/prefect/Playground/automations-guide

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: deploy-build-names
  version: null
  tags: []
  description: null
  entrypoint: test-automations.py:build_names
  parameters: {}
  work_pool:
    name: tutorial-process-pool
    work_queue_name: null
    job_variables: {}
  schedule: null
Grab your deployment_id from this deployment with the CLI and embed it in your automation.
Find deployment_id from the CLI
Run prefect deployment ls in an authenticated command prompt.
prefect deployment ls
Deployments
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Name ┃ ID ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Extract islands/island-schedule │ d9d7289c-7a41-436d-8313-80a044e61532 │
│ build-names/deploy-build-names │ 8b10a65e-89ef-4c19-9065-eec5c50242f4 │
│ ride-duration-prediction-backfill/backfill-deployment │ 76dc6581-1773-45c5-a291-7f864d064c57 │
└───────────────────────────────────────────────────────┴──────────────────────────────────────┘
Create an automation with a POST call to programmatically create the automation. Ensure you have your api_key, account_id, and workspace_id.
import requests

# Replace these placeholders with your own values
PREFECT_API_KEY = "YOUR-API-KEY"
account_id = "YOUR-ACCOUNT-ID"
workspace_id = "YOUR-WORKSPACE-ID"


def create_event_driven_automation():
    api_url = f"https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/automations/"
    data = {
        "name": "Event Driven Redeploy",
        "description": "Programmatically created an automation to redeploy a flow based on an event",
        "enabled": True,
        "trigger": {
            "after": [
                "string"  # schema placeholder: replace with a real event name or remove
            ],
            "expect": [
                "prefect.flow-run.Running"
            ],
            "for_each": [
                "prefect.resource.id"
            ],
            "posture": "Proactive",
            "threshold": 30,
            "within": 0
        },
        "actions": [
            {
                "type": "run-deployment",
                "source": "selected",
                "deployment_id": "YOUR-DEPLOYMENT-ID",
                # flow parameters are passed as a mapping of name to value
                "parameters": {"num": 10}
            }
        ],
        "owner_resource": "string"  # schema placeholder: replace or remove
    }

    headers = {"Authorization": f"Bearer {PREFECT_API_KEY}"}
    response = requests.post(api_url, headers=headers, json=data)

    print(response.json())
    return response.json()
After running this function, you will see the changes that came from the POST request within the UI. Keep in mind that the context is “custom” in the UI.
Run the underlying flow and see the deployment kick off after 30 seconds. This results in a new flow run of build_names.
You can see this new deployment get initiated with the custom parameters outlined above.
In a few quick changes, you can programmatically create an automation that deploys workflows with custom parameters.
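Rather than hard-coding credentials, you may prefer to read the API key from the environment. The sketch below shows one way to assemble the endpoint URL and authorization header used in the request above; the helper names automation_endpoint and auth_headers are hypothetical, and reading the key from a PREFECT_API_KEY environment variable is an assumption about your setup.

```python
import os


def automation_endpoint(account_id, workspace_id):
    """Build the Prefect Cloud automations URL for one workspace."""
    return (
        "https://api.prefect.cloud/api"
        f"/accounts/{account_id}/workspaces/{workspace_id}/automations/"
    )


def auth_headers(api_key=None):
    """Return the bearer-token header, falling back to the environment."""
    api_key = api_key or os.environ.get("PREFECT_API_KEY", "")
    if not api_key:
        raise ValueError("No API key provided and PREFECT_API_KEY is not set")
    return {"Authorization": f"Bearer {api_key}"}


# Placeholder values for illustration.
url = automation_endpoint("YOUR-ACCOUNT-ID", "YOUR-WORKSPACE-ID")
headers = auth_headers("YOUR-API-KEY")

print(url)
print(headers)
```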
Use an underlying .yaml file
You can take this a step further by using your own .yaml version of the automation, and registering that file with the UI. This simplifies the requirements of the automation by declaring it in its own .yaml file, and then registering that .yaml with the API.
First start with creating the .yaml file to house the automation requirements:
name: Cancel long running flows
description: Cancel any flow run after an hour of execution
trigger:
  match:
    "prefect.resource.id": "prefect.flow-run.*"
  match_related: {}
  after:
    - "prefect.flow-run.Failed"
  expect:
    - "prefect.flow-run.*"
  for_each:
    - "prefect.resource.id"
  posture: "Proactive"
  threshold: 1
  within: 30
actions:
  - type: "cancel-flow-run"
Make a helper function that applies this YAML file with the REST API function.
import yaml

from utils import post, put


def create_or_update_automation(path: str = "automation.yaml"):
    """Create or update an automation from a local YAML file"""
    # Load the definition
    with open(path, "r") as fh:
        payload = yaml.safe_load(fh)

    # Find existing automations by name
    automations = post("/automations/filter")
    existing_automation = [a["id"] for a in automations if a["name"] == payload["name"]]
    automation_exists = len(existing_automation) > 0

    # Create or update the automation
    if automation_exists:
        print(f"Automation '{payload['name']}' already exists and will be updated")
        put(f"/automations/{existing_automation[0]}", payload=payload)
    else:
        print(f"Creating automation '{payload['name']}'")
        post("/automations/", payload=payload)


if __name__ == "__main__":
    create_or_update_automation()
Find a complete repo with these API examples in this GitHub repository.
In this example, you created the automation by registering the .yaml file with a helper function.
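The utils module imported by the helper is not shown in this guide. As a rough idea of what its post and put functions might look like, here is a hypothetical stand-in built on the standard library. It assumes the API base URL and key come from PREFECT_API_URL and PREFECT_API_KEY environment variables; the actual helpers in the referenced repository may differ.

```python
import json
import os
import urllib.request

# Assumed configuration; the fallback is a local server's default API URL.
API_URL = os.environ.get("PREFECT_API_URL", "http://127.0.0.1:4200/api")
API_KEY = os.environ.get("PREFECT_API_KEY", "")


def build_request(method, path, payload=None):
    """Build (but do not send) a JSON request against the API."""
    body = json.dumps(payload or {}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if API_KEY:
        headers["Authorization"] = f"Bearer {API_KEY}"
    return urllib.request.Request(
        API_URL.rstrip("/") + path, data=body, headers=headers, method=method
    )


def _send(request):
    """Send a prepared request and decode a JSON response body, if any."""
    with urllib.request.urlopen(request) as response:
        raw = response.read()
    return json.loads(raw) if raw else None


def post(path, payload=None):
    return _send(build_request("POST", path, payload))


def put(path, payload=None):
    return _send(build_request("PUT", path, payload))


# Inspect a request without making a network call.
request = build_request("POST", "/automations/filter")
print(request.get_method(), request.full_url)
```

Keeping request construction separate from sending makes it possible to check the URL and body before anything goes over the network.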
Kick off an automation with a custom webhook
Use webhooks to expose the events API. This allows you to extend the functionality of deployments and respond to changes in your workflow.
By exposing a webhook endpoint, you can kick off workflows that trigger deployments, all from an event created from an HTTP request.
Create this webhook in the UI to create these dynamic events.
{
    "event": "model-update",
    "resource": {
        "prefect.resource.id": "product.models.{{ body.model_id }}",
        "prefect.resource.name": "{{ body.friendly_name }}",
        "run_count": "{{ body.run_count }}"
    }
}
From this input, you can create an exposed webhook endpoint.
Each webhook corresponds to a custom event created where you can react to it downstream with a separate deployment or automation.
For example, you can create a curl request that sends the endpoint information such as a run count for your deployment:
curl -X POST https://api.prefect.cloud/hooks/34iV2SFke3mVa6y5Y-YUoA -d "model_id=adhoc" -d "run_count=10" -d "friendly_name=test-user-input"
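The same request can be prepared from Python. This sketch uses only the standard library and form-encodes the fields as curl -d does; the function name build_webhook_request is hypothetical, and the URL is the example endpoint from the curl command, which you would replace with your own webhook URL.

```python
import urllib.parse
import urllib.request

# Example endpoint from the curl command above; replace with your own.
WEBHOOK_URL = "https://api.prefect.cloud/hooks/34iV2SFke3mVa6y5Y-YUoA"


def build_webhook_request(model_id, run_count, friendly_name):
    """Prepare a form-encoded POST carrying the fields the template reads."""
    form = urllib.parse.urlencode(
        {
            "model_id": model_id,
            "run_count": run_count,
            "friendly_name": friendly_name,
        }
    ).encode("ascii")
    return urllib.request.Request(WEBHOOK_URL, data=form, method="POST")


request = build_webhook_request("adhoc", 10, "test-user-input")
print(request.data.decode())

# To actually send the event, uncomment the next line:
# urllib.request.urlopen(request)
```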
From here, you can make a webhook that is connected to pulling in parameters on the curl command. It kicks off a deployment that uses these pulled parameters.
Go into the event feed to automate straight from this event.
This allows you to create automations that respond to these webhook events. From a few clicks in the UI, you can associate an external process with the Prefect events API that can trigger downstream deployments.
Examples
Trigger a downstream deployment with an event
This example shows how to use a trigger to schedule a downstream deployment when an upstream deployment completes.
from prefect import flow, serve
from prefect.events import DeploymentEventTrigger


@flow(log_prints=True)
def upstream_flow():
    print("upstream flow")


@flow(log_prints=True)
def downstream_flow():
    print("downstream flow")


if __name__ == "__main__":
    upstream_deployment = upstream_flow.to_deployment(name="upstream_deployment")
    downstream_deployment = downstream_flow.to_deployment(
        name="downstream_deployment",
        triggers=[
            DeploymentEventTrigger(
                expect={"prefect.flow-run.Completed"},
                match_related={"prefect.resource.name": "upstream_deployment"},
            )
        ],
    )

    serve(upstream_deployment, downstream_deployment)
First, start the serve process to listen for scheduled deployment runs:
python event_driven_deployments.py
Now, run the upstream deployment and see the downstream deployment kick off after it completes:
prefect deployment run upstream-flow/upstream_deployment
Check the event feed
You can inspect raw events in the event feed in the UI to see what related resources are available to match against.
For example, the following prefect.flow-run.Completed event’s related resources include:
{
  "related": [
    {
      "prefect.resource.id": "prefect.flow.10e099ec-8358-4146-b188-be68027ee58f",
      "prefect.resource.role": "flow",
      "prefect.resource.name": "upstream-flow"
    },
    {
      "prefect.resource.id": "prefect.deployment.be777bbd-4b15-49f3-bc1f-4d109374cee2",
      "prefect.resource.role": "deployment",
      "prefect.resource.name": "upstream_deployment"
    },
    {
      "prefect.resource.id": "prefect-cloud.user.80546602-9f31-4396-ab4b-e873a5377feb",
      "prefect.resource.role": "creator",
      "prefect.resource.name": "stoat"
    }
  ]
}
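To see why the match_related filter in the example above selects this event, the following sketch compares a label specification against each related resource. It is a simplified model, not Prefect's matching code; the function names are hypothetical, and treating a trailing * as a prefix wildcard is an assumption based on patterns such as prefect.flow-run.* used elsewhere in this guide.

```python
# Simplified, hypothetical model of matching labels on related resources.
def label_matches(expected, actual):
    """Compare one label value, treating a trailing * as a prefix wildcard."""
    if expected.endswith("*"):
        return actual.startswith(expected[:-1])
    return expected == actual


def matches_related(spec, related):
    """Return True if any related resource satisfies every label in `spec`."""
    return any(
        all(
            label in resource and label_matches(value, resource[label])
            for label, value in spec.items()
        )
        for resource in related
    )


related = [
    {
        "prefect.resource.id": "prefect.flow.10e099ec-8358-4146-b188-be68027ee58f",
        "prefect.resource.role": "flow",
        "prefect.resource.name": "upstream-flow",
    },
    {
        "prefect.resource.id": "prefect.deployment.be777bbd-4b15-49f3-bc1f-4d109374cee2",
        "prefect.resource.role": "deployment",
        "prefect.resource.name": "upstream_deployment",
    },
]

print(matches_related({"prefect.resource.name": "upstream_deployment"}, related))
print(matches_related({"prefect.resource.name": "some_other_deployment"}, related))
```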
Trigger a deployment when a customer completes an order
Imagine you are running an e-commerce platform and you want to trigger a deployment when a customer completes an order.
There might be a number of events that occur during an order on your platform, for example:
- order.created
- order.item.added
- order.payment-method.confirmed
- order.shipping-method.added
- order.complete
Event grammar
The above choices of event names are arbitrary. With Prefect events, you’re free to select any event grammar that best represents your use case.
In this case, we want to trigger a deployment when a user completes an order, so our trigger should:
- expect an order.complete event
- after an order.created event
- evaluate these conditions for_each user id
Finally, it should pass the user_id as a parameter to the deployment. Here’s how this looks in code:
from prefect import flow
from prefect.events.schemas.deployment_triggers import DeploymentEventTrigger

order_complete = DeploymentEventTrigger(
    expect={"order.complete"},
    after={"order.created"},
    for_each={"prefect.resource.id"},
    parameters={"user_id": "{{ event.resource.id }}"},
)


@flow(log_prints=True)
def post_order_complete(user_id: str):
    print(f"User {user_id} has completed an order -- doing stuff now")


if __name__ == "__main__":
    post_order_complete.serve(triggers=[order_complete])
Specify multiple events or resources
The expect and after fields accept a set of event names, so you can specify multiple events for each condition. Similarly, the for_each field accepts a set of resource ids.
To simulate users causing order status events, run the following in a Python shell or script:
import time
from prefect.events import emit_event

user_id_1, user_id_2 = "123", "456"
for event_name, user_id in [
    ("order.created", user_id_1),
    ("order.created", user_id_2),  # other user
    ("order.complete", user_id_1),
]:
    event = emit_event(
        event=event_name,
        resource={"prefect.resource.id": user_id},
    )
    time.sleep(1)
    print(f"{user_id} emitted {event_name}")
In the above example:
- user_id_1 creates and then completes an order, triggering a run of our deployment.
- user_id_2 creates an order, but no completed event is emitted so no deployment is triggered.
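The outcome above can be reproduced with a small offline model of the expect, after, and for_each conditions. This is a sketch for building intuition and not how Prefect evaluates triggers internally; the function name triggered_resources is hypothetical.

```python
# Hypothetical offline model of expect/after evaluated per resource id.
def triggered_resources(events, expect, after):
    """Return the resource ids that emitted an `expect` event after an
    `after` event, tracking each resource id separately (for_each)."""
    seen_after = set()
    fired = []
    for name, resource_id in events:
        if name in after:
            seen_after.add(resource_id)
        elif name in expect and resource_id in seen_after:
            fired.append(resource_id)
            # Require a new `after` event before this resource fires again.
            seen_after.discard(resource_id)
    return fired


events = [
    ("order.created", "123"),
    ("order.created", "456"),  # other user
    ("order.complete", "123"),
]

fired = triggered_resources(events, expect={"order.complete"}, after={"order.created"})
print(fired)
```

Only user 123 appears in the result, because user 456 never emits an order.complete event.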
See also
- To learn more about Prefect events, which can trigger automations, see the events docs.
- See the webhooks guide to learn how to create webhooks and receive external events.