# Next Steps
In the introduction, we took a look at creating tasks and combining them into flows. Prefect's functional and imperative APIs make that easy, but the result was a fairly vanilla data pipeline. In this section, we're going to explore how Prefect enables advanced mechanisms for overlaying complex business logic on your workflow.
You can also check out our full tutorial on building real-world data applications.
So far, we've dealt exclusively with tasks that obey "normal" data pipeline rules: whenever a task succeeds, its downstream dependencies start running.
It is frequently desirable to build flows in which tasks respond to each other in more complicated ways. A simple example is a "clean up task" that must run, even (or only!) if a prior task failed. Suppose our first task creates a Spark cluster, the second task submits a job to that cluster, and a third task tears down the cluster. In a "normal" pipeline world, if the second task fails the third task won't run -- and that could result in our Spark cluster running forever!
Prefect introduces a concept called
triggers to solve this situation. Before a task runs, its trigger function is called on a set of upstream task states. If the trigger function doesn't pass, the task won't run. Prefect has a variety of built-in triggers, including
all_successful (the default),
any_failed, and even a particularly interesting one called
Let's set up our Spark cluster flow, using obvious pseudocode where appropriate:
from prefect import task, Flow @task def create_cluster(): cluster = create_spark_cluster() return cluster @task def run_spark_job(cluster): submit_job(cluster) @task def tear_down_cluster(cluster): tear_down(cluster) with Flow("Spark") as flow: # define data dependencies cluster = create_cluster() submitted = run_spark_job(cluster) tear_down_cluster(cluster) # wait for the job to finish before tearing down the cluster tear_down_cluster.set_upstream(submitted)
When we run this flow, everything will work perfectly as long as the Spark job runs successfully. However, if the job fails, then the
submitted task will enter a
Failed state. Because the default trigger on the
tear_down_cluster task is
all_successful, it will enter a
TriggerFailed state -- a special variant of
Failed that indicates that the trigger, not a runtime error, was the cause.
To fix this, we simply adjust the trigger when defining the task. In this case we can use the
always_run trigger, which is an alias for
@task(trigger=prefect.triggers.always_run) # one new line def tear_down_cluster(cluster): tear_down(cluster)
All other code remains the same. Now the
tear_down_cluster task will always run, even -- or especially -- when the
submitted task fails.
Tasks never run before upstream tasks finish
Prefect will never run a task before its upstream tasks finish. This is why the
always_run triggers are synonymous.
# Reference Tasks
Previously, we used a trigger to ensure that our
tear_down_cluster task always runs. Let's choose to live in a world in which it always runs successfully. This actually presents a problem, because workflow management systems generally set the state of the entire workflow from the state of the terminal tasks. In this case, our terminal task,
tear_down_cluster, is going to run and succeed even if the job we wanted to run fails!
This illustrates an important issue: workflow management systems, and workflows in general, have semantics that are not necessarily aligned with business logic. In this case, we definitely want to consider the flow to have failed if the job didn't run, but we also want the workflow itself -- including the clean up task -- to succeed!
Prefect introduces a mechanism called "reference tasks" to solve this conundrum. Flows remain in a
Running state until all of their tasks enter
Finished states. At that time, the Flow essentially decides its own final state by applying an
all_successful trigger to its reference tasks. By default, the reference tasks are the terminal tasks, but users can set them to be any tasks they want.
In our case, we make a simple tweak at the end of our script to tell this flow to use the
run_spark_job task as its reference task:
Now the flow's state will be determined by what happens to this job. If it fails, the flow state will fail even after the clean up task succeeds.
Prefect's State system allows users to set up advanced behaviors through triggers and reference tasks. Each task enters a final state based on what happens in its
run() function: if the function finishes normally, the task enters a
Success state; and if the function encounters an error, the task enters a
Failed state. However, Prefect also gives users fine-grained control over a task's state via a mechanism called "signals."
Signals are ways of telling the Prefect engine that a task should be moved immediately into a specific state.
For example, at any time, a task could raise a
SUCCESS signal to succeed before its
run() function would normally finish. Conversely, it could raise a
FAILED signal if the user wanted more careful control about the resulting
There are other, more interesting signals available. For example, raising a
RETRY signal will cause a task to immediately retry itself, and raising a
PAUSE signal will enter a
Paused state until the task is explicitly resumed.
Another useful signal is the
SKIP signal. When a task is skipped, downstream tasks are also skipped unless they specifically set
skip_on_upstream_skip=False. This means that users can set up entire workflow branches that can be skipped if a condition isn't met.
SKIP is treated like SUCCESS
When a task is skipped, it's usually treated as if it ran successfully. This is because tasks only skip if users specifically introduce skipping logic, so the result is compliant with the user's design.
To raise a signal, simply use it inside your task's run() function:
from prefect.engine import signals @task def signal_task(message): if message == 'go!': raise signals.SUCCESS(message='going!') elif message == 'stop!': raise signals.FAIL(message='stopping!') elif message == 'skip!': raise signals.SKIP(message='skipping!')
If the message passed to this task is
"stop!", then the task will fail.