.submit() or .map() methods to submit tasks to a task runner.
Available task runners
The default task runner in Prefect is the ThreadPoolTaskRunner, which runs tasks concurrently in independent threads.
For CPU-intensive tasks that benefit from process isolation, use the ProcessPoolTaskRunner,
which runs tasks in separate processes using Python’s multiprocessing module.
For distributed task execution across multiple machines, you can use one of the following task runners, which are available as extras of the prefect library:
- DaskTaskRunner can run tasks using dask.distributed (install prefect[dask])
- RayTaskRunner can run tasks using Ray (install prefect[ray])
Concurrency vs. parallelism
- Concurrency refers to a system that can make progress on more than one thing during overlapping time periods, though not necessarily at the exact same instant. Think of concurrent execution as non-blocking: within the limits of the resources available in the execution environment and the data dependencies between tasks, execution of one task does not block execution of other tasks in a flow.
- Parallelism refers to a system that can do more than one thing at the exact same time. Within the limits of the available resources, parallel execution runs tasks simultaneously, for example, operations mapped across a dataset.
Configure a task runner
To configure a specific task runner, provide a task_runner keyword argument to the parent flow.
The max_workers parameter of the ThreadPoolTaskRunner and ProcessPoolTaskRunner controls the number of threads or processes, respectively, that the task runner uses to execute tasks.
Submit tasks to a task runner
When you use .submit() to submit a task to a task runner, the task runner creates a PrefectFuture for access to the state and result of the task.
A PrefectFuture is an object that provides:
- a reference to the result returned by the task
- a State indicating the current state of the task run
PrefectFuture objects must be resolved explicitly before returning from a flow or task.
Dependencies between futures will be automatically resolved whenever their dependents are resolved.
This means that only terminal futures need to be resolved, by one of the following:
- returning the terminal futures from your flow or task
- calling .wait() or .result() on each terminal future
- using one of the top-level wait or as_completed utilities to resolve terminal futures
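When you pass a future to a downstream task, Prefect waits for the upstream task to finish first, so the downstream task does not receive the PrefectFuture you passed as an argument.
Instead, the downstream task receives the value that the upstream task returned.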
For example:
You only need to wait for the print_result future, because Prefect automatically resolved the say_hello future to a string.
Access results from submitted tasks
You can access the result of a future explicitly with the .result() method, which waits for the task to complete before returning the result to the caller.
If the task run fails, .result() raises the task run’s exception. Disable this behavior
with the raise_on_failure option.
A few notes on .result():
- .result() is a blocking call: it blocks the caller until the task run completes.
- Only use .result() when you need to interact directly with the return value of your submitted task. For example, use .result() when passing the return value to a standard Python function (not a Prefect task), but don't use it when passing the value to another task, because Prefect resolves those futures automatically.
Design considerations
When choosing how and when to achieve concurrency using task runners, consider the following:
- Task granularity: The “proper” size for tasks depends on the nature and complexity of the work you’re doing; for example, too many small tasks can create overhead. See Writing tasks for more.
- Resource constraints: Be aware of environment limitations. Using .map can create many task instances very quickly, which might exceed your available resources.
- Data transfer: Passing large data between tasks can hurt performance. Consider passing references to external storage when dealing with large datasets.
- Parallelism: For true parallelism (rather than just concurrency), consider the ProcessPoolTaskRunner on a single machine, or a specialized task runner like the DaskTaskRunner or RayTaskRunner (or propose a new task runner type).
- Beware of unsafe global state: When you use concurrency or parallelism features like .submit and .map, respect the underlying execution primitives to avoid unexpected behavior. For example, the default ThreadPoolTaskRunner runs each task in a separate thread, which means that any global state must be thread-safe. Similarly, DaskTaskRunner and RayTaskRunner are multi-process runners that require global state to be picklable.