Manage states
Prefect states are rich objects that contain information about the status of a particular task run or flow run.
You can learn many things about a task or flow by examining its current state or the history of its states. For example, a state could tell you that a task:
- is scheduled to make a third run attempt in an hour
- succeeded and what data it produced
- was scheduled to run, but later cancelled
- used the cached result of a previous run instead of re-running
- failed because it timed out
Only runs have states: Flows and tasks are templates that describe what a system does; only when we run the system does it also take on a state.
State types
Prefect states have names and types.
A state’s name is often, but not always, synonymous with its type. For example, a task run that is running for the first time has a state with the name Running and the type RUNNING. However, if the task retries, that same task run will have the name Retrying and the type RUNNING.
The distinction between types and names is subtle: state types are typically used for backing orchestration logic, whereas state names are more for visual display and bookkeeping.
The full list of states and state types includes:
Name | Type | Terminal? | Description |
---|---|---|---|
Scheduled | SCHEDULED | No | The run will begin at a particular time in the future. |
Late | SCHEDULED | No | The run’s scheduled start time has passed, but it has not transitioned to PENDING (a run is marked late after 15 seconds by default). |
AwaitingRetry | SCHEDULED | No | The run did not complete successfully because of a code issue and had remaining retry attempts. |
Pending | PENDING | No | The run has been submitted to execute, but is waiting on necessary preconditions to be satisfied. |
Running | RUNNING | No | The run code is currently executing. |
Retrying | RUNNING | No | The run code is currently executing after previously not completing successfully. |
Paused | PAUSED | No | The run code has stopped executing until it receives manual approval to proceed. |
Cancelling | CANCELLING | No | The infrastructure on which the code was running is being cleaned up. |
Cancelled | CANCELLED | Yes | The run did not complete because a user determined that it should not. |
Completed | COMPLETED | Yes | The run completed successfully. |
Cached | COMPLETED | Yes | The run result was loaded from a previously cached value. |
RolledBack | COMPLETED | Yes | The run completed successfully but the transaction rolled back and executed rollback hooks. |
Failed | FAILED | Yes | The run did not complete because of a code issue and had no remaining retry attempts. |
Crashed | CRASHED | Yes | The run did not complete because of an infrastructure issue. |
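To make the name/type distinction concrete, the table above can be transcribed into a small lookup. This is a plain-Python sketch rather than Prefect's API: `STATE_TYPES`, `TERMINAL_TYPES`, `is_terminal`, and `names_for_type` are hypothetical names introduced here for illustration only.

```python
# Illustrative transcription of the table above; not part of Prefect's API.
STATE_TYPES = {
    "Scheduled": "SCHEDULED",
    "Late": "SCHEDULED",
    "AwaitingRetry": "SCHEDULED",
    "Pending": "PENDING",
    "Running": "RUNNING",
    "Retrying": "RUNNING",
    "Paused": "PAUSED",
    "Cancelling": "CANCELLING",
    "Cancelled": "CANCELLED",
    "Completed": "COMPLETED",
    "Cached": "COMPLETED",
    "RolledBack": "COMPLETED",
    "Failed": "FAILED",
    "Crashed": "CRASHED",
}

# Types marked "Yes" in the "Terminal?" column.
TERMINAL_TYPES = {"CANCELLED", "COMPLETED", "FAILED", "CRASHED"}


def is_terminal(state_name: str) -> bool:
    """Return True if the named state belongs to a terminal type."""
    return STATE_TYPES[state_name] in TERMINAL_TYPES


def names_for_type(state_type: str) -> list[str]:
    """Return every state name that shares the given type, sorted."""
    return sorted(n for n, t in STATE_TYPES.items() if t == state_type)
```

Several names can share one type: in this lookup, both Running and Retrying map to RUNNING, which is why logic that keys on the type treats them alike.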
Final state determination
The final state of a flow or task run depends on a number of factors; generally speaking there are three categories of terminal states:
- COMPLETED: a run in any COMPLETED state did not encounter any errors or exceptions and returned successfully
- FAILED: a run in any FAILED state encountered an error during execution, such as a raised exception
- CRASHED: a run in any CRASHED state was interrupted by an OS signal such as a KeyboardInterrupt or SIGTERM
Task return values
A task will be placed into a Completed state if it returns any Python object, with one exception: if a task explicitly returns a Prefect Failed state, the task will be marked Failed.
from prefect import task, flow
from prefect.states import Completed, Failed


@task
def toggle_task(fail: bool):
    if fail:
        return Failed(message="I was instructed to fail.")
    else:
        return Completed(message="I was instructed to succeed.")


@flow
def example():
    # this run will be set to a `Failed` state
    state_one = toggle_task(fail=True)

    # this run will be set to a `Completed` state
    state_two = toggle_task(fail=False)

    # similarly, the flow run will fail because we return a `Failed` state
    return state_one, state_two
You can also access state objects directly within a flow through the return_state flag:
from prefect import flow, task


@task
def add_one(x):
    return x + 1


@flow
def my_flow():
    # by default, calling a task returns its result
    result = add_one(1)
    assert isinstance(result, int) and result == 2

    # with return_state=True, calling a task returns its state instead
    state = add_one(1, return_state=True)
    assert state.is_completed() is True
    assert state.result() == 2
Returning a State via return_state=True is useful when you want to conditionally respond to the terminal states of a task or flow, for example with if state.is_failed(): ...
Flow return values
The final state of a flow is determined by its return value. The following rules apply:
- If an exception is raised directly in the flow function, the flow run is marked as FAILED.
- If a flow returns a manually created state, it is used as the state of the final flow run. This allows for manual determination of final state.
- If a flow returns an iterable of states, the presence of any FAILED state will cause the run to be marked as FAILED.
In any other situation in which the flow returns without error, it will be marked as COMPLETED.
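The rules above can be read as a small decision procedure. The following is a simplified plain-Python model of those rules only, not Prefect's implementation: `SketchState` and `resolve_flow_state` are hypothetical names, and state types are represented as bare strings.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class SketchState:
    """Hypothetical stand-in for a state object; only the type is modeled."""
    type: str


def resolve_flow_state(flow_fn: Callable[[], Any]) -> str:
    """Return the final state type for one call of flow_fn, per the rules above."""
    try:
        value = flow_fn()
    except Exception:
        # An exception raised directly in the flow function: FAILED.
        return "FAILED"
    if isinstance(value, SketchState):
        # A manually created state is used as the final state.
        return value.type
    if isinstance(value, (list, tuple, set)):
        # An iterable of states: any FAILED state fails the run.
        states = [v for v in value if isinstance(v, SketchState)]
        if any(s.type == "FAILED" for s in states):
            return "FAILED"
    # Any other return without error: COMPLETED.
    return "COMPLETED"
```

In this model, a flow that returns nothing after a task fails still resolves to COMPLETED, because the failed state never reaches the flow's return value.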
If you manipulate states programmatically, you can create situations in which tasks within a flow can fail and not cause flow run failure. For example:
from prefect import flow, task


@task
def add_one(x):
    return x + 1


@flow
def my_flow():
    # avoided raising an exception via `return_state=True`
    state = add_one("1", return_state=True)
    assert state.is_failed()

    # the flow function returns successfully!
    return
If state were returned from the flow function, the run would be marked as FAILED.
Execute code on state changes
State change hooks execute code in response to client-side changes in flow or task run states, enabling you to define actions for specific state transitions in a workflow.
A state hook must have the following signature:
def my_hook(obj: Task | Flow, run: TaskRun | FlowRun, state: State) -> None:
    ...
Both task and flow run hooks can be specified through a keyword argument or through decorator syntax:
from prefect import task, flow

# for type hints only
from prefect import Task
from prefect.context import TaskRun
from prefect.states import State


def first_task_hook(tsk: Task, run: TaskRun, state: State) -> None:
    if state.name != 'Cached':
        print('I run anytime this task executes successfully')
    else:
        print('and can condition my behavior on details of this run')


@task(log_prints=True, on_completion=[first_task_hook])
def nice_task(name: str):
    print(f"Hello {name}!")


# alternatively hooks can be specified via decorator
@nice_task.on_completion
def second_hook(tsk: Task, run: TaskRun, state: State) -> None:
    print('another hook')
State change hooks are versatile, allowing you to specify multiple state change hooks for the same state transition, or to use the same state change hook for different transitions:
from prefect import task

def my_success_hook(task, task_run, state):
    print("Task run succeeded!")

def my_failure_hook(task, task_run, state):
    print("Task run failed!")

def my_succeed_or_fail_hook(task, task_run, state):
    print("If the task run succeeds or fails, this hook runs.")

@task(
    on_completion=[my_success_hook, my_succeed_or_fail_hook],
    on_failure=[my_failure_hook, my_succeed_or_fail_hook]
)
def my_task():
    # hypothetical task body, added so the decorator has a function to wrap
    ...
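As a rough mental model of how several hooks can share one transition and one hook can serve several transitions, consider the toy registry below. It is a plain-Python sketch, not Prefect's internals: `HookRegistry`, `register`, and `fire` are hypothetical names, plain strings stand in for task, run, and state objects, and running hooks in registration order is a choice made by this sketch.

```python
from collections import defaultdict
from typing import Callable

# A hook here takes (obj, run, state_name); strings stand in for Prefect objects.
Hook = Callable[[str, str, str], None]


class HookRegistry:
    """Toy registry mapping a state name to an ordered list of hooks."""

    def __init__(self) -> None:
        self._hooks: dict[str, list[Hook]] = defaultdict(list)

    def register(self, state_name: str, *hooks: Hook) -> None:
        """Attach one or more hooks to a state name."""
        self._hooks[state_name].extend(hooks)

    def fire(self, obj: str, run: str, state_name: str) -> int:
        """Call every hook registered for state_name; return how many ran."""
        hooks = self._hooks.get(state_name, [])
        for hook in hooks:
            hook(obj, run, state_name)
        return len(hooks)


calls = []


def success_hook(obj, run, state_name):
    calls.append(("success", state_name))


def either_hook(obj, run, state_name):
    calls.append(("either", state_name))


registry = HookRegistry()
registry.register("Completed", success_hook, either_hook)  # two hooks, one transition
registry.register("Failed", either_hook)                   # same hook, another transition

registry.fire("my_task", "run-1", "Completed")
registry.fire("my_task", "run-2", "Failed")
```

After both calls, `calls` records `either_hook` once per transition and `success_hook` only for the Completed one.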
Available state change hooks
Type | Flow | Task | Description |
---|---|---|---|
on_completion | ✓ | ✓ | Executes when a flow or task run enters a Completed state. |
on_failure | ✓ | ✓ | Executes when a flow or task run enters a Failed state. |
on_cancellation | ✓ | - | Executes when a flow run enters a Cancelling state. |
on_crashed | ✓ | - | Executes when a flow run enters a Crashed state. |
on_running | ✓ | - | Executes when a flow run enters a Running state. |
Note that the on_rollback hook for tasks is not a proper state change hook but instead is a transaction lifecycle hook. Rollback hooks accept one argument representing the transaction for the task.
Pass kwargs to state change hooks
You can compose the with_options method with functools.partial to effectively pass arbitrary **kwargs to your hooks:
from functools import partial
from prefect import flow, task

data = {}


def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)


@task
def bad_task():
    raise ValueError("meh")


@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, **dict(x=x, y=y))]
    )
    # return a tuple of "bar" and the task run state
    # to avoid raising the task's exception
    return "bar", bad_task_with_a_hook(return_state=True)


_, task_run_state = ok_with_failure_flow()

assert data == {"x": "foo", "y": 42, "state": task_run_state}
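The part of this pattern that carries the extra arguments is `functools.partial`, which can be seen in isolation without Prefect. In the minimal sketch below, the strings passed as positional arguments are placeholders for the task, task run, and state objects a real hook would receive, and `captured` and `bound_hook` are names introduced for illustration.

```python
from functools import partial

captured = {}


def my_hook(task, task_run, state, **kwargs):
    # Record the state plus whatever keyword arguments were pre-bound.
    captured.update(state=state, **kwargs)


# Pre-bind x and y; the result still accepts the three positional hook arguments.
bound_hook = partial(my_hook, x="foo", y=42)

# Placeholder strings stand in for the real task, task run, and state objects.
bound_hook("task-object", "task-run-object", "Failed")
```

Because the bound keyword arguments are fixed when `partial` is called, each call to `with_options` can attach a hook carrying different values.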
Example usage: send a notification when a flow run fails
State change hooks enable you to customize messages sent when tasks transition between states, such as sending notifications containing sensitive information when tasks enter a Failed state.
Here’s an example of running a client-side hook upon a flow run entering a Failed state:
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL


def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load(
        "slack-webhook/my-slack-webhook"
    )

    # NOTE: the link below assumes PREFECT_API_URL holds a bare host;
    # adjust it if your setting already includes a scheme or an /api suffix
    slack_webhook_block.notify(
        (
            f"Your job {flow_run.name} entered {state.name} "
            f"with message:\n\n{state.message}\n\n"
            f"See <https://{PREFECT_API_URL.value()}/flow-runs/"
            f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
            f"Tags: {flow_run.tags}\n\n"
            f"Scheduled start: {flow_run.expected_start_time}"
        )
    )


@flow(on_failure=[notify_slack], retries=1)
def failing_flow():
    raise ValueError("oops!")


if __name__ == "__main__":
    failing_flow()
Note that retries are configured in this example. The on_failure hook therefore does not run until all retries have completed and the flow run enters a Failed state.
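The interaction between retries and the failure hook can be modeled with a short retry loop. This is a plain-Python illustration of the behavior described here, not how Prefect schedules retries: `run_with_retries` is a hypothetical helper, and its hooks receive only the exception rather than flow, flow run, and state objects.

```python
from typing import Callable


def run_with_retries(fn: Callable[[], object], retries: int, on_failure: list) -> object:
    """Call fn up to retries + 1 times; fire on_failure hooks only after the last attempt fails."""
    total_attempts = retries + 1
    for attempt in range(1, total_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt < total_attempts:
                continue  # retries remain, so no failure hook yet
            for hook in on_failure:
                hook(exc)
            raise


attempts = []
hook_calls = []


def failing():
    attempts.append(len(attempts) + 1)
    raise ValueError("oops!")


try:
    run_with_retries(
        failing,
        retries=1,
        on_failure=[lambda exc: hook_calls.append(str(exc))],
    )
except ValueError:
    pass  # the final failure still propagates to the caller
```

With `retries=1` the function is attempted twice, and the hook list is invoked once, after the second attempt fails.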