# Notifications and State Handlers

Alerts, notifications, and dynamically responding to task state are important features of any workflow tool. Using Prefect primitives, users can create Tasks that send notifications after certain tasks run or fail using Prefect's trigger logic. This will work, but does not cover more subtle uses of notification logic (e.g., receiving a notification if a task retries). For this reason, Prefect introduces a flexible concept called "state handlers", which can be attached to individual tasks or flows. At a high level, a state handler is a function that is called on every change of state for the underlying object; these can be used for sending alerts upon failure, emails upon success, or more nuanced handling based on the information contained in both the old and new states.

In addition to working with the state_handler API directly, Prefect provides higher level wrappers for implementing common use cases such as failure callbacks.

Prefect States

State handlers are intimately connected with Prefect's concept of "State". We recommend reviewing the concept doc on States before reading further.

# State Handlers

Let's start with the definition of a state handler:

def state_handler(obj: Union[Task, Flow], old_state: State, new_state: State) -> Optional[State]:
    Any function with this signature can serve as a state handler.

        - obj (Union[Task, Flow]): the underlying object to which this state handler
            is attached
        - old_state (State): the previous state of this object
        - new_state (State): the proposed new state of this object

        - Optional[State]: the new state of this object (typically this is just `new_state`)

As you can see, the state handler API is a simple way for executing arbitrary Python code with every state change.

A simple example will clarify its use:

from prefect import Task, Flow

def my_state_handler(obj, old_state, new_state):
    msg = "\nCalling my custom state handler on {0}:\n{1} to {2}\n"
    print(msg.format(obj, old_state, new_state))
    return new_state

my_flow = Flow(name="state-handler-demo",

Ignoring logs, this should output:

Calling my custom state handler on <Flow: name=state-handler-demo>:
Scheduled() to Running("Running flow.")

Calling my custom state handler on <Flow: name=state-handler-demo>:
Running("Running flow.") to Success("All reference tasks succeeded.")

In exactly the same way, we can attach this state handler to an individual task instead of the flow:

t = Task(state_handlers=[my_state_handler])
flow = Flow(name="state-handler-demo", tasks=[t])

Once again ignoring logs, this should output:

Calling my custom state handler on <Task: Task>:
Pending() to Running("Starting task run.")

Calling my custom state handler on <Task: Task>:
Running("Starting task run.") to Success("Task run succeeded.")

At the end of the day, that's all there is to it! However, the simplicity of the API belies the many possible usage patterns for this feature, which is what we will look at next.


For the sake of simplicity, for the rest of this document we will focus on task state handlers, but everything we discuss applies equally to flows.

# Sending a simple notification

As our above example demonstrated, it is very easy to intercept task states and respond to them. In fact, our previous example can be thought of as a simple notification system in which we print notifications to stdout. Let's take this one step further and write a notifier that posts an update to Slack whenever our task has finished its run. In order for this example to work, you'll need to have a Slack app setup with an incoming webhook. If you don't use Slack or don't have an app, no worries - just swap out the Slack URL with any other webserver you might have access to.

import requests
from prefect import Task, Flow

def post_to_slack(task, old_state, new_state):
    if new_state.is_finished():
        msg = "Task {0} finished in state {1}".format(task, new_state)

        # replace URL with your Slack webhook URL
        requests.post("https://XXXXX", json={"text": msg})

    return new_state

t = Task(state_handlers=[post_to_slack])
flow = Flow(name="state-handler-demo", tasks=[t])

Here we are responding to state by only sending a notification when the task's state is considered "finished"; this includes Success states as well as Failed states, but does not include states such as Retrying or Scheduled.

Notification failure causes Task failure

We could have raised an error if the POST request returned a non-200 status code. This is fine, but be warned: Prefect considers state handlers an integral part of task execution, and consequently if an error is raised when calling a task's state handlers, the task run will be aborted and the task will be marked "Failed".

Handlers can use Prefect Secrets

Most notification systems will require some form of authentication. Don't despair - state handlers can retrieve Prefect Secrets just like Tasks.

# Responding to State

Most of our examples so far inform the user if and when a task enters a certain state. Prefect State objects contain rich information, and allow us to get more creative than that! Let's revisit our post_to_slack notifier and have it alert us if the task enters a Retrying state, and how long we have to wait for the retry to occur:

import requests
from prefect import Task, Flow

def post_to_slack(task, old_state, new_state):
    if new_state.is_retrying():
        msg = "Task {0} failed and is retrying at {1}".format(task, new_state.start_time)

        # replace URL with your Slack webhook URL
        requests.post("https://XXXXX", json={"text": msg})

    return new_state

t = Task(state_handlers=[post_to_slack])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run() # the notifier is never run

This uses the start_time attribute of Retrying states to alert the user with more useful information.

Think outside the box

Because Prefect allows tasks to return data, we can actually have our state handler respond based on the outputs of the task. Even more interesting, any Prefect State can carry data - this includes Failed states.

Let's implement a task that has a special mode of failure; if this failure mode occurs, we want to be alerted immediately.

from prefect import task, Flow
from prefect.engine import signals

def alert_on_special_failure(task, old_state, new_state):
    if new_state.is_failed():
        if getattr(new_state.result, "flag", False) is True:
            print("Special failure mode!  Send all the alerts!")
            print("a == b == {}".format(new_state.result.value))

    return new_state

def mission_critical_task(a, b):
    if a == b:
        fail_signal = signals.FAIL("a equaled b!")
        fail_signal.flag = True
        fail_signal.value = a
        raise fail_signal
        return 1 / (b - a)

with Flow(name="state-inspection-handler") as flow:
    result = mission_critical_task(1, 1)

# Special failure mode!  Send all the alerts!
# a == b == 1

Note that we can reuse this pattern of attaching information to our FAIL signal across many tasks (and consequently reuse this state handler in other situations).

# Using Multiple handlers

You might have noticed that the state_handlers argument is plural and accepts a list. This is because Prefect allows you to attach as many state handlers to a task as you wish! This pattern is useful for composing state handlers with different use cases (e.g., a special handler for failure and another for retries). It is also useful if there is are certain critical circumstances you want to be alerted for -- you can implement many different and various state handlers to make sure you are alerted ASAP.

State handlers are called in order

If you choose to provide multiple state handlers to a task, note that they will be called in the order in which they are provided.

# Higher level API

Prefect provides tools for creating state handlers from smaller, more modular pieces. In particular, the callback_factory helper utility located in prefect.utilities.notifications allows you to create state handlers from two simpler functions - one that implements an action, and another that performs a check (or filter) that determines when that action should occur. Let's re-implement our post_to_slack retry handler using this utility:

from prefect.utilities.notifications import callback_factory

def send_post(task, state):
    msg = "Task {0} failed and is retrying at {1}".format(task, state.start_time)
    requests.post("https://XXXXX", json={"text": msg})

post_to_slack = callback_factory(send_post, lambda s: s.is_retrying())

You can check that this state handler has identical behavior to the first one we implemented. This factory method allows users to mix and match pieces of logic using a simple API.

An incredibly common check is whether or not the state is Failed. For this reason, Prefect provides an even higher level API for constructing on failure callbacks. In particular, given a function with signature

def f(obj: Union[Task, Flow], state: State) -> None

one can use the on_failure keyword argument to both Tasks and Flows for automatically creating the appropriate state handler.