Accelerate your workflows by running tasks in parallel with Dask
Dask can run your tasks in parallel and distribute them over multiple machines. The `prefect-dask` integration makes it easy to accelerate your flow runs with Dask.
Install `prefect-dask`
The following command installs a version of `prefect-dask` compatible with your installed version of `prefect`. If you don’t already have `prefect` installed, it installs the newest version of `prefect` as well.
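For example, with `pip`:

```bash
pip install prefect-dask
```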
Upgrade to the latest versions of `prefect` and `prefect-dask`:
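For example, with `pip`:

```bash
pip install -U prefect prefect-dask
```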
Say your flow downloads many images to train a machine learning model. It takes longer than you’d like for the flow to run because it executes sequentially.
To accelerate your flow code, parallelize it with `prefect-dask` in three steps:

1. Add the import: `from prefect_dask import DaskTaskRunner`
2. Specify the task runner in the flow decorator: `@flow(task_runner=DaskTaskRunner)`
3. Submit tasks to the flow's task runner: `a_task.submit(*args, **kwargs)`
Below is code with and without the `DaskTaskRunner`:
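Here is a minimal sketch of the pattern, using a hypothetical `download_image` task as a stand-in for real download logic:

```python
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def download_image(url: str) -> bytes:
    # Hypothetical stand-in for an HTTP download.
    time.sleep(1)
    return b""


# Without the DaskTaskRunner: tasks called directly run sequentially.
@flow
def download_images_sequentially(urls: list[str]):
    for url in urls:
        download_image(url)


# With the DaskTaskRunner: submitted tasks run in parallel on Dask workers.
@flow(task_runner=DaskTaskRunner())
def download_images_in_parallel(urls: list[str]):
    for url in urls:
        download_image.submit(url)


if __name__ == "__main__":
    urls = [f"https://example.com/image-{i}.png" for i in range(10)]
    download_images_in_parallel(urls)
```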
In our tests, the flow run took 15.2 seconds to execute sequentially. Using the `DaskTaskRunner` reduced the runtime to 5.7 seconds!
The `DaskTaskRunner` is a task runner that submits tasks to the `dask.distributed` scheduler. By default, when the `DaskTaskRunner` is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run. If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the `address` kwarg.

`DaskTaskRunner` accepts the following optional parameters:
| Parameter | Description |
|---|---|
| `address` | Address of a currently running Dask scheduler. |
| `cluster_class` | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, `"distributed.LocalCluster"`), or the class itself. |
| `cluster_kwargs` | Additional kwargs to pass to the `cluster_class` when creating a temporary Dask cluster. |
| `adapt_kwargs` | Additional kwargs to pass to `cluster.adapt` when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided. |
| `client_kwargs` | Additional kwargs to use when creating a `dask.distributed.Client`. |
Multiprocessing safety
Because the `DaskTaskRunner` uses multiprocessing, calls to flows in scripts must be guarded with `if __name__ == "__main__":` or you will encounter warnings and errors.
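A minimal sketch of the guard:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(task_runner=DaskTaskRunner())
def my_flow():
    ...


if __name__ == "__main__":
    # Without this guard, worker processes spawned by Dask may
    # re-import the script and attempt to run the flow again.
    my_flow()
```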
If you don’t provide the `address` of a Dask scheduler, Prefect creates a temporary local cluster automatically. The number of workers used is based on the number of cores on your machine. The default provides a mix of processes and threads that work well for most workloads.
To specify this explicitly, pass values for `n_workers` or `threads_per_worker` to `cluster_kwargs`:
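For example, a sketch with illustrative worker counts:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


# Four worker processes, each running two threads.
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
    )
)
def my_flow():
    ...
```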
The `DaskTaskRunner` can create a temporary cluster using any of Dask’s cluster-manager options. This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling. To configure it, provide a `cluster_class`.
This can be:
"dask_cloudprovider.aws.FargateCluster"
)You can also configure cluster_kwargs
.
This takes a dictionary of keyword arguments to pass to cluster_class
when starting the flow run.
For example, to configure a flow to use a temporary `dask_cloudprovider.aws.FargateCluster` with four workers running with an image named `my-prefect-image`:
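A sketch of that configuration (it assumes the `dask-cloudprovider` package is installed; real deployments need additional AWS settings):

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
    )
)
def my_flow():
    ...
```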
For larger workloads, you can accelerate execution further by distributing task runs over multiple machines.
Multiple Prefect flow runs can use the same existing Dask cluster. You might manage a single long-running Dask cluster (for example, using the Dask Helm Chart) and configure flows to connect to it during execution. This has some disadvantages compared to using a temporary Dask cluster, but you may still prefer managing a single long-running Dask cluster.
To configure a `DaskTaskRunner` to connect to an existing cluster, pass the address of the scheduler to the `address` argument:
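A minimal sketch, with a placeholder scheduler address:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


# Replace the placeholder with your scheduler's actual address.
@flow(task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"))
def my_flow():
    ...
```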
Suppose you have existing Dask code that works with a Dask collection, such as a `dask.dataframe.DataFrame`. With `prefect-dask`, integrating it into a flow takes just a few steps:

1. Add the `task` and `flow` decorators
2. Use the `get_dask_client` context manager to distribute work across Dask workers (see the sketch below)
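A minimal sketch of those steps, using a small illustrative DataFrame:

```python
import dask.dataframe as dd
import pandas as pd
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


@task
def describe_values():
    # Existing Dask-collection code, now wrapped in a Prefect task.
    ddf = dd.from_pandas(pd.DataFrame({"value": range(100)}), npartitions=4)
    # get_dask_client yields a dask.distributed.Client for the flow run's
    # cluster, so the collection's work is distributed across its workers.
    with get_dask_client() as client:
        # client.compute returns a future; resolve it before leaving the
        # context manager (see "Resolving futures in sync client" below).
        return client.compute(ddf.describe()).result()


@flow(task_runner=DaskTaskRunner())
def dask_collection_flow():
    return describe_values.submit().result()


if __name__ == "__main__":
    print(dask_collection_flow())
```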
A key feature of using a `DaskTaskRunner` is the ability to scale adaptively to the workload. Instead of specifying `n_workers` as a fixed number, you can specify a minimum and maximum number of workers to use, and the Dask cluster scales up and down as needed.

To do this, pass `adapt_kwargs` to `DaskTaskRunner`.
This takes the following fields:

- `maximum` (`int` or `None`, optional): the maximum number of workers to scale to. Set to `None` for no maximum.
- `minimum` (`int` or `None`, optional): the minimum number of workers to scale to. Set to `None` for no minimum.

For example, this configures a flow to run on a `FargateCluster` scaling up to a maximum of 10 workers:
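A sketch of that configuration (it assumes the `dask-cloudprovider` package is installed; the image name reuses the hypothetical example above):

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={"image": "my-prefect-image"},
        adapt_kwargs={"maximum": 10},
    )
)
def my_flow():
    ...
```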
Use Dask annotations to further control the behavior of tasks. For example, set the priority of tasks in the Dask scheduler:
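A sketch of a priority annotation (the task here is hypothetical):

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def show(x):
    return x


@flow(task_runner=DaskTaskRunner())
def my_flow():
    with dask.annotate(priority=-10):
        low = show.submit(1)   # scheduled with low priority

    with dask.annotate(priority=10):
        high = show.submit(2)  # scheduled with high priority
```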
Another common use case is resource annotations:
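For example, to label a task as requiring a GPU resource (a sketch; the resource name must match one defined when the Dask workers were started):

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def train(x):
    return x


@flow(task_runner=DaskTaskRunner())
def my_flow():
    with dask.annotate(resources={"GPU": 1}):
        future = train.submit(1)  # runs only on a worker advertising a GPU resource
```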
Refer to the `prefect-dask` SDK documentation to explore all the capabilities of the `prefect-dask` library. For assistance using Dask, consult the Dask documentation.
Resolving futures in sync client
Note that, by default, `dask_collection.compute()` returns concrete values while `client.compute(dask_collection)` returns Dask futures. Therefore, if you call `client.compute`, you must resolve all futures before exiting the context manager by either:

- passing `sync=True`
- calling `result()`

For more information, visit the docs on Waiting on Futures.
There is also an equivalent context manager for asynchronous tasks and flows: `get_async_dask_client`. When using the async client, you must `await client.compute(dask_collection)` before exiting the context manager.

Note that task submission (`.submit()`) and future resolution (`.result()`) are always synchronous operations in Prefect, even when working with async tasks and flows.
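A minimal sketch of the async variant, mirroring the synchronous example above:

```python
import asyncio

import dask.dataframe as dd
import pandas as pd
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client


@task
async def describe_values():
    ddf = dd.from_pandas(pd.DataFrame({"value": range(100)}), npartitions=4)
    async with get_async_dask_client() as client:
        # With the async client, await the computation before the
        # context manager exits.
        return await client.compute(ddf.describe())


@flow(task_runner=DaskTaskRunner())
async def async_dask_collection_flow():
    return await describe_values()


if __name__ == "__main__":
    print(asyncio.run(async_dask_collection_flow()))
```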