
States

Overview

States are rich objects that contain information about the status of a particular task run or flow run. While you don't need to know the details of states to use Prefect, you can give your workflows superpowers by taking advantage of them.

At any moment, you can learn anything you need to know about a task or flow by examining its current state or the history of its states. For example, a state could tell you that a task:

  • is scheduled to make a third run attempt in an hour

  • succeeded and what data it produced

  • was scheduled to run, but later cancelled

  • used the cached result of a previous run instead of re-running

  • failed because it timed out

By working with a relatively small number of well-defined state types, Prefect flows can manage the complexity that emerges in real-world workflows.

Only runs have states

Though we often refer to the "state" of a flow or a task, what we really mean is the state of a flow run or a task run. Flows and tasks are templates that describe what a system does; only when we run the system does it also take on a state. So while we might refer to a task as "running" or being "successful", we really mean that a specific instance of the task is in that state.

State Types

States have names and types. State types are canonical, with specific orchestration rules that apply to transitions into and out of each state type. A state's name is often, but not always, synonymous with its type. For example, a task run that is running for the first time has a state with the name Running and the type RUNNING. However, if the task retries, that same task run will have the name Retrying and the type RUNNING. Each time the task run transitions into the RUNNING state type, the same orchestration rules are applied, regardless of the state's name.
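To make the name/type distinction concrete, here is a toy model. It is not Prefect's API: the `StateType` enum, `ToyState` class, and the rule names in `RULES_BY_TYPE` are hypothetical placeholders. The point it shows is that rules are looked up by a state's type, so a Running state and a Retrying state receive identical treatment.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List


class StateType(str, Enum):
    # Hypothetical stand-in for a subset of Prefect's state types
    SCHEDULED = "SCHEDULED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"


@dataclass(frozen=True)
class ToyState:
    name: str         # human-readable label, e.g. "Retrying"
    type: StateType   # canonical type that the toy orchestrator keys on


# Placeholder rule names; they only illustrate that lookup uses the *type*.
RULES_BY_TYPE: Dict[StateType, List[str]] = {
    StateType.RUNNING: ["running_rule_1", "running_rule_2"],
    StateType.COMPLETED: ["completed_rule_1"],
}


def rules_for(state: ToyState) -> List[str]:
    """Return the (illustrative) rules applied when entering `state`."""
    return RULES_BY_TYPE.get(state.type, [])


first_attempt = ToyState("Running", StateType.RUNNING)
retry_attempt = ToyState("Retrying", StateType.RUNNING)
```

Because `rules_for` never inspects `name`, `rules_for(first_attempt)` and `rules_for(retry_attempt)` return the same list.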

The following state types are terminal: once a run enters one of them, there are no orchestrated transitions to any other state type.

  • COMPLETED
  • CANCELLED
  • FAILED
  • CRASHED

The full complement of states and state types includes:

| Name | Type | Terminal? | Description |
| --- | --- | --- | --- |
| Scheduled | SCHEDULED | No | The run will begin at a particular time in the future. |
| Late | SCHEDULED | No | The run's scheduled start time has passed, but it has not yet transitioned to PENDING (by default, a run is marked Late 15 seconds after its scheduled start time). |
| AwaitingRetry | SCHEDULED | No | The run did not complete successfully because of a code issue and has remaining retry attempts. |
| Pending | PENDING | No | The run has been submitted to run, but is waiting on necessary preconditions to be satisfied. |
| Running | RUNNING | No | The run code is currently executing. |
| Retrying | RUNNING | No | The run code is currently executing after a previous attempt did not complete successfully. |
| Paused | PAUSED | No | The run code has stopped executing until it receives manual approval to proceed. |
| Cancelling | CANCELLING | No | The infrastructure on which the code was running is being cleaned up. |
| Cancelled | CANCELLED | Yes | The run did not complete because a user determined that it should not. |
| Completed | COMPLETED | Yes | The run completed successfully. |
| Failed | FAILED | Yes | The run did not complete because of a code issue and had no remaining retry attempts. |
| Crashed | CRASHED | Yes | The run did not complete because of an infrastructure issue. |
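The state table above can be encoded as a simple lookup, with a guard that refuses orchestrated transitions out of a terminal type. This is a minimal sketch for illustration only: `STATE_TYPES`, `is_terminal`, and `can_transition` are hypothetical names, and Prefect's real orchestration rules do much more than this single check.

```python
from typing import Dict

# Hypothetical encoding of the table: state name -> state type
STATE_TYPES: Dict[str, str] = {
    "Scheduled": "SCHEDULED",
    "Late": "SCHEDULED",
    "AwaitingRetry": "SCHEDULED",
    "Pending": "PENDING",
    "Running": "RUNNING",
    "Retrying": "RUNNING",
    "Paused": "PAUSED",
    "Cancelling": "CANCELLING",
    "Cancelled": "CANCELLED",
    "Completed": "COMPLETED",
    "Failed": "FAILED",
    "Crashed": "CRASHED",
}

TERMINAL_TYPES = frozenset({"COMPLETED", "CANCELLED", "FAILED", "CRASHED"})


def is_terminal(state_name: str) -> bool:
    """True if the named state's type is one of the terminal types."""
    return STATE_TYPES[state_name] in TERMINAL_TYPES


def can_transition(current: str, proposed: str) -> bool:
    """Toy guard: an orchestrated transition may not leave a terminal type."""
    if current not in STATE_TYPES or proposed not in STATE_TYPES:
        raise KeyError(f"unknown state name: {current!r} or {proposed!r}")
    return not is_terminal(current)
```

Keying the terminal check on the type rather than the name means that every state sharing a terminal type is covered by the same rule.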

Returned values

When calling a task or a flow, there are three types of returned values:

  • Data: A Python object (such as int, str, dict, list, and so on).
  • State: A Prefect object indicating the state of a flow or task run.
  • PrefectFuture: A Prefect object that contains both data and State.

Returning data is the default behavior any time you call your_task().

Returning a Prefect State occurs any time you call your task or flow with the argument return_state=True.

Returning a PrefectFuture is achieved by calling your_task.submit().

Return Data

By default, running a task will return data:

from prefect import flow, task 

@task 
def add_one(x):
    return x + 1

@flow 
def my_flow():
    result = add_one(1) # return int

The same rule applies for a subflow:

@flow 
def subflow():
    return 42 

@flow 
def my_flow():
    result = subflow() # return data

Return Prefect State

To return a State instead, pass return_state=True as a keyword argument to your task call.

@flow 
def my_flow():
    state = add_one(1, return_state=True) # return State

To get data from a State, call .result().

@flow 
def my_flow():
    state = add_one(1, return_state=True) # return State
    result = state.result() # return int

The same rule applies for a subflow:

@flow 
def subflow():
    return 42 

@flow 
def my_flow():
    state = subflow(return_state=True) # return State
    result = state.result() # return int

Return a PrefectFuture

To get a PrefectFuture, call your task with .submit() instead of calling it directly.

@flow 
def my_flow():
    future = add_one.submit(1) # return PrefectFuture

To get data from a PrefectFuture, call .result().

@flow 
def my_flow():
    future = add_one.submit(1) # return PrefectFuture
    result = future.result() # return data

To get a State from a PrefectFuture, call .wait().

@flow 
def my_flow():
    future = add_one.submit(1) # return PrefectFuture
    state = future.wait() # return State

Final state determination

The final state of a flow run is determined by its return value and by whether the flow function raises an exception. The following rules apply:

  • If an exception is raised directly in the flow function, the flow run is marked as FAILED.
  • If the flow does not return a value (or returns None), its state is determined by the states of all of the tasks and subflows within it:
      ◦ If any task run or subflow run failed and none were cancelled, the final flow run state is marked as FAILED.
      ◦ If any task run or subflow run was cancelled, the final flow run state is marked as CANCELLED.
  • If the flow returns a manually created state, that state is used as the final state of the flow run. This allows you to determine the final state manually.
  • If the flow returns any other object, the flow run is marked as COMPLETED.
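The rules above can be read as an ordered decision procedure. The sketch below is a simplified model, not Prefect's implementation: states are plain type strings, `ReturnedState` is a hypothetical stand-in for a manually created state, and treating a None-returning flow with no failed or cancelled children as COMPLETED is an assumption, since the rules only state the failure and cancellation cases.

```python
from typing import Any, Iterable, Optional


class ReturnedState:
    """Hypothetical stand-in for a state the flow creates and returns."""

    def __init__(self, type_: str):
        self.type = type_


def final_flow_state(
    raised: Optional[BaseException],
    return_value: Any,
    child_state_types: Iterable[str],
) -> str:
    """Apply the final-state rules in order and return a state type."""
    if raised is not None:
        return "FAILED"               # exception raised in the flow function
    if isinstance(return_value, ReturnedState):
        return return_value.type      # manually created state is used as-is
    if return_value is None:
        children = list(child_state_types)
        if "CANCELLED" in children:
            return "CANCELLED"        # any cancelled task or subflow run
        if "FAILED" in children:
            return "FAILED"           # failures, and none were cancelled
        return "COMPLETED"            # assumption: nothing failed or was cancelled
    return "COMPLETED"                # any other returned object
```

Checking for cancellation before failure reproduces both child-state rules: a mix of failed and cancelled runs yields CANCELLED, while failures alone yield FAILED.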

See the Final state determination section of the Flows documentation for further details and examples.

State Change Hooks

State change hooks execute code in response to changes in flow or task run states, enabling you to define actions for specific state transitions in a workflow.

A simple example

from prefect import flow

def my_success_hook(flow, flow_run, state):
    print("Flow run succeeded!")

@flow(on_completion=[my_success_hook])
def my_flow():
    return 42

my_flow()

Create and use hooks

Available state change hooks

| Hook | Flow | Task | Description |
| --- | --- | --- | --- |
| on_completion | ✓ | ✓ | Executes when a flow or task run enters a Completed state. |
| on_failure | ✓ | ✓ | Executes when a flow or task run enters a Failed state. |
| on_cancellation | ✓ | - | Executes when a flow run enters a Cancelling state. |
| on_crashed | ✓ | - | Executes when a flow run enters a Crashed state. |
| on_running | ✓ | - | Executes when a flow run enters a Running state. |
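The hook table can be summarized as data, which makes it easy to ask "which hook keywords fire for this kind of run entering this state type?". This is a hypothetical helper for illustration; `HOOKS` and `hooks_for` are not part of Prefect.

```python
from typing import Dict, List, Tuple

# Hypothetical summary: hook keyword -> (state type, allowed on flows?, allowed on tasks?)
HOOKS: Dict[str, Tuple[str, bool, bool]] = {
    "on_completion": ("COMPLETED", True, True),
    "on_failure": ("FAILED", True, True),
    "on_cancellation": ("CANCELLING", True, False),
    "on_crashed": ("CRASHED", True, False),
    "on_running": ("RUNNING", True, False),
}


def hooks_for(kind: str, state_type: str) -> List[str]:
    """Hook keywords that fire when a `kind` run ("flow" or "task") enters `state_type`."""
    if kind not in ("flow", "task"):
        raise ValueError(f"unknown run kind: {kind!r}")
    return [
        name
        for name, (target, on_flow, on_task) in HOOKS.items()
        if target == state_type and (on_flow if kind == "flow" else on_task)
    ]
```

For example, `hooks_for("task", "CANCELLING")` is empty because on_cancellation applies only to flows.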

Create flow run state change hooks

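from prefect import Flow, flow
from prefect.client.schemas.objects import FlowRun
from prefect.states import State

def my_flow_hook(flow: Flow, flow_run: FlowRun, state: State):
    """This is the required signature for a flow run state
    change hook. This hook can only be passed into flows.
    """
    print(f"Flow run {flow_run.name} entered state {state.name}")

# pass hook as a list of callables
@flow(on_completion=[my_flow_hook])
def my_flow():
    return 42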

Create task run state change hooks

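from prefect import Task, task
from prefect.client.schemas.objects import TaskRun
from prefect.states import State

def my_task_hook(task: Task, task_run: TaskRun, state: State):
    """This is the required signature for a task run state change
    hook. This hook can only be passed into tasks.
    """
    print(f"Task run {task_run.name} entered state {state.name}")

# pass hook as a list of callables
@task(on_failure=[my_task_hook])
def my_task():
    raise ValueError("this failure triggers my_task_hook")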

Use multiple state change hooks

State change hooks are versatile, allowing you to specify multiple state change hooks for the same state transition, or to use the same state change hook for different transitions:

from prefect import task

def my_success_hook(task, task_run, state):
    print("Task run succeeded!")

def my_failure_hook(task, task_run, state):
    print("Task run failed!")

def my_succeed_or_fail_hook(task, task_run, state):
    print("If the task run succeeds or fails, this hook runs.")

# the same hook can appear in more than one list
@task(
    on_completion=[my_success_hook, my_succeed_or_fail_hook],
    on_failure=[my_failure_hook, my_succeed_or_fail_hook]
)
def my_task():
    return 42
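To see why registering one hook under two keywords is safe, consider a toy dispatcher that mirrors the decorator arguments above as a plain dictionary. This is a sketch, not Prefect's engine: `hooks`, `fire`, and the `calls` log are hypothetical, the `None` arguments stand in for the task, task run, and state objects the engine would pass, and calling hooks in list order is an assumption of the sketch. Each transition triggers only the list registered for that state type, so the shared hook runs once per transition.

```python
from typing import Callable, Dict, List

Hook = Callable[[object, object, object], None]
calls: List[str] = []  # records which hooks ran, in order


def my_success_hook(task, task_run, state):
    calls.append("success")


def my_failure_hook(task, task_run, state):
    calls.append("failure")


def my_succeed_or_fail_hook(task, task_run, state):
    calls.append("either")


# Hypothetical mirror of on_completion=[...] and on_failure=[...]
hooks: Dict[str, List[Hook]] = {
    "COMPLETED": [my_success_hook, my_succeed_or_fail_hook],
    "FAILED": [my_failure_hook, my_succeed_or_fail_hook],
}


def fire(state_type: str, task=None, task_run=None, state=None) -> None:
    """Toy dispatcher: call each hook registered for `state_type`, in list order."""
    for hook in hooks.get(state_type, []):
        hook(task, task_run, state)
```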

Pass kwargs to your hooks

The Prefect engine calls your hooks for you upon the state change, passing in the flow (or task), the flow run (or task run), and the state objects.

You can also define your hook to accept additional arguments with default values:

from prefect import flow

data = {}

def my_hook(flow, flow_run, state, my_arg="custom_value"):
    data.update(my_arg=my_arg, state=state)

@flow(on_completion=[my_hook])
def lazy_flow():
    pass

state = lazy_flow(return_state=True)

assert data == {"my_arg": "custom_value", "state": state}
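Extra hook parameters work only if the hook can still be called with exactly three arguments. Assuming the engine passes the three objects positionally, as the signatures above suggest, you can check a hook ahead of time with the standard library's `inspect.signature(...).bind`. The helper name `accepts_hook_call` is hypothetical.

```python
import inspect


def accepts_hook_call(fn) -> bool:
    """Return True if `fn` can be called as fn(flow_or_task, run, state).

    Hypothetical helper: it only checks that three positional arguments
    bind, so any extra parameters need defaults (or must be **kwargs).
    """
    try:
        inspect.signature(fn).bind(None, None, None)
    except TypeError:
        return False
    return True


def my_hook(flow, flow_run, state, my_arg="custom_value"):
    pass  # extra argument has a default, so a 3-argument call binds


def needs_extra(flow, flow_run, state, my_arg):
    pass  # no default for my_arg, so a 3-argument call cannot bind
```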

... or define your hook to accept arbitrary keyword arguments:

from functools import partial
from prefect import flow, task

data = {}

def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

@task
def bad_task():
    raise ValueError("meh")

@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, **dict(x=x, y=y))]
    )
    # return a tuple of "bar" and the task run state
    # to avoid raising the task's exception
    return "bar", bad_task_with_a_hook(return_state=True)

_, task_run_state = ok_with_failure_flow()

assert data == {"x": "foo", "y": 42, "state": task_run_state}
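Outside of Prefect, you can confirm what the `partial` call above does: `partial(my_hook, **dict(x=x, y=y))` is equivalent to `partial(my_hook, x=x, y=y)`, producing a callable that takes the usual three arguments and forwards the bound keywords. A closure factory is an alternative way to bind the same values. The `make_hook` function below is a hypothetical illustration, not something the example above uses, and the string states are dummy placeholders for real State objects.

```python
from functools import partial

data = {}


def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)


# Option 1: functools.partial, as in the example above
hook_a = partial(my_hook, x="foo", y=42)


# Option 2 (alternative): a closure that captures the extra keywords
def make_hook(**extra):
    def hook(task, task_run, state):
        my_hook(task, task_run, state, **extra)
    return hook


hook_b = make_hook(x="foo", y=42)
```

Both behave the same when called with three arguments. One practical difference: the closure is a regular function with a `__name__`, whereas `functools.partial` objects do not have one, which can make tracebacks slightly easier to read.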

More examples of state change hooks