Learn about task runners for concurrent, parallel or distributed execution of tasks.
Task runners are a simple and consistent interface to concurrency primitives; they are not required for task execution. Calling a task function directly executes the function in the main thread by default, blocking execution of its caller until the task completes.
To enable concurrent, parallel, or distributed execution of tasks, use the .submit() or .map() methods to submit tasks to a task runner.
The default task runner in Prefect is the ThreadPoolTaskRunner, which runs tasks concurrently in independent threads.
For truly parallel or distributed task execution, you must use one of the following task runners, which are available as extras of the prefect library:
DaskTaskRunner can run tasks using dask.distributed (install prefect[dask])
RayTaskRunner can run tasks using Ray (install prefect[ray])
To configure a specific task runner, provide a task_runner keyword argument to the parent flow.
The max_workers parameter of the ThreadPoolTaskRunner controls the number of threads that the task runner will use to execute tasks concurrently.
When you use .submit() to submit a task to a task runner, the task runner creates a PrefectFuture for access to the state and result of the task.
A PrefectFuture is an object that provides:
Access to the result returned by the task
A State indicating the current state of the task run
PrefectFuture objects must be resolved explicitly before returning from a flow or task. Dependencies between futures will be automatically resolved whenever their dependents are resolved. This means that only terminal futures need to be resolved, either by:
Calling .wait() or .result() on each terminal future
Using the wait or as_completed utilities to resolve terminal futures
Not doing so may leave your tasks in an unfinished state.
When you pass a future into a task, Prefect automatically waits for the “upstream” task (the one that the future references) to reach a final state before starting the downstream task. This means that the downstream task won’t receive the PrefectFuture you passed as an argument. Instead, the downstream task receives the value that the upstream task returned.
For example:
If we run this, we see that we only had to wait for the final print_result future, as Prefect automatically resolved say_hello to a string.
You can access the result of a future explicitly with the .result() method. The .result() method waits for the task to complete before returning the result to the caller. If the task run fails, .result() will raise the task run’s exception. Disable this behavior with the raise_on_failure option.
A few notes on .result():
.result() is a blocking call. This means that calling .result() will block the caller until the task run completes.
Call .result() when you need to interact directly with the return value of your submitted task. For example, you should use .result() if passing the return value to a standard Python function (not a Prefect task), but you do not need to use .result() if you are passing the value to another task (as these futures will be automatically resolved).
When choosing how and when to achieve concurrency using task runners, consider the following:
Task granularity: The “proper” size for tasks depends on the nature and complexity of the work you’re doing. For example, too many small tasks might create overhead. See Writing tasks for more.
Resource constraints: Be aware of environment limitations. Using .map can create many task instances very quickly, which might exceed your resource availability.
Data transfer: Large data passed between tasks can impact performance. Consider passing references to external storage when dealing with large datasets.
Parallelism: For true parallelism (rather than just concurrency), consider using a specialized task runner like the DaskTaskRunner or RayTaskRunner (or propose a new task runner type).
Beware of unsafe global state: Use of concurrency or parallelism features like .submit and .map must respect the underlying primitives to avoid unexpected behavior. For example, the default ThreadPoolTaskRunner runs each task in a separate thread, which means that any global state must be threadsafe. Similarly, DaskTaskRunner and RayTaskRunner are multi-process runners that require global state to be picklable.