Accelerate your workflows by running tasks in parallel with Dask

The `prefect-dask` integration makes it easy to accelerate your flow runs with Dask.
Install `prefect-dask`. The following command installs a version of `prefect-dask` compatible with your installed version of `prefect`. If you don't already have `prefect` installed, it will install the newest version of `prefect` as well:
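```bash
pip install prefect-dask
```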
To upgrade to the latest versions of `prefect` and `prefect-dask`, run:
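```bash
pip install -U prefect prefect-dask
```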
Run tasks in parallel with `prefect-dask` in three steps:

1. Add the import: `from prefect_dask import DaskTaskRunner`
2. Specify the task runner in the flow decorator: `@flow(task_runner=DaskTaskRunner)`
3. Submit tasks with `a_task.submit(*args, **kwargs)` instead of calling them directly
Using the `DaskTaskRunner` reduced the runtime to 5.7 seconds!
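As a minimal sketch of the three steps together (the task and flow names below are illustrative, not from the original benchmark):

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def say_hello(name: str) -> None:
    print(f"hello {name}")

@flow(task_runner=DaskTaskRunner())
def greetings(names: list[str]) -> None:
    for name in names:
        # .submit() sends each task to the Dask cluster
        # instead of running it inline
        say_hello.submit(name)

if __name__ == "__main__":
    # The guard is required because the DaskTaskRunner uses multiprocessing
    greetings(["arthur", "trillian", "ford", "marvin"])
```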
The `DaskTaskRunner` is a task runner that submits tasks to the `dask.distributed` scheduler. By default, when the `DaskTaskRunner` is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run. If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the `address` kwarg.
`DaskTaskRunner` accepts the following optional parameters:

| Parameter | Description |
|---|---|
| `address` | Address of a currently running Dask scheduler. |
| `cluster_class` | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, `"distributed.LocalCluster"`), or the class itself. |
| `cluster_kwargs` | Additional kwargs to pass to the `cluster_class` when creating a temporary Dask cluster. |
| `adapt_kwargs` | Additional kwargs to pass to `cluster.adapt` when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided. |
| `client_kwargs` | Additional kwargs to use when creating a `dask.distributed.Client`. |
Because the `DaskTaskRunner` uses multiprocessing, calls to flows in scripts must be guarded with `if __name__ == "__main__":` or you will encounter warnings and errors.

If you don't provide the `address` of a Dask scheduler, Prefect creates a temporary local cluster automatically.
The number of workers used is based on the number of cores on your machine.
The default provides a mix of processes and threads that work well for most workloads.
To specify this explicitly, pass values for `n_workers` or `threads_per_worker` to `cluster_kwargs`:
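For instance, a sketch of a flow pinned to four single-threaded workers (the flow name is illustrative):

```python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(
    task_runner=DaskTaskRunner(
        # Temporary local cluster: four workers, one thread each
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}
    )
)
def my_flow():
    ...
```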
The `DaskTaskRunner` can create a temporary cluster using any of Dask's cluster-manager options. This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.

To configure it, provide a `cluster_class`. This can be either the full import path of the cluster class (for example, `"dask_cloudprovider.aws.FargateCluster"`) or the class itself.

You can also configure `cluster_kwargs`. This takes a dictionary of keyword arguments to pass to `cluster_class` when starting the flow run.
For example, to configure a flow to use a temporary `dask_cloudprovider.aws.FargateCluster` with four workers running with an image named `my-prefect-image`:
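A sketch of that configuration (it assumes the `dask-cloudprovider` package is installed):

```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)
```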
To configure a `DaskTaskRunner` to connect to an existing cluster, pass in the address of the scheduler to the `address` argument:
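For example (the scheduler address here is a placeholder):

```python
from prefect_dask import DaskTaskRunner

# Replace with the address of your running Dask scheduler
DaskTaskRunner(address="tcp://my-dask-scheduler:8786")
```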
You can also work with Dask collections, such as a `dask.dataframe.DataFrame`. With `prefect-dask`, it takes just a few steps:

- Add the `task` and `flow` decorators
- Use the `get_dask_client` context manager to distribute work across Dask workers
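Here is a minimal sketch of those steps; the sample dataset and summary computation are illustrative:

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def compute_task():
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # client.compute returns a Dask Future; resolve it before
        # leaving the context manager
        summary_df = client.compute(df.describe()).result()
    return summary_df

@flow(task_runner=DaskTaskRunner())
def dask_flow():
    return compute_task.submit().result()

if __name__ == "__main__":
    dask_flow()
```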
A key feature of using a `DaskTaskRunner` is the ability to scale adaptively to the workload. Instead of specifying `n_workers` as a fixed number, you can specify a minimum and maximum number of workers to use, and the Dask cluster scales up and down as needed.
To do this, pass `adapt_kwargs` to `DaskTaskRunner`. This takes the following fields:
- `maximum` (`int` or `None`, optional): the maximum number of workers to scale to. Set to `None` for no maximum.
- `minimum` (`int` or `None`, optional): the minimum number of workers to scale to. Set to `None` for no minimum.

For example, here we configure a flow to run on a `FargateCluster` scaling up to a maximum of 10 workers:
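A sketch of that configuration:

```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10},
)
```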
Refer to the `prefect-dask` SDK documentation to explore all the capabilities of the `prefect-dask` library. For assistance using Dask, consult the Dask documentation.
Note that `dask_collection.compute()` returns concrete values while `client.compute(dask_collection)` returns Dask Futures. Therefore, if you call `client.compute`, you must resolve all futures before exiting out of the context manager by either:

- setting `sync=True`
- calling `result()`

There is also an equivalent context manager for asynchronous tasks and flows: `get_async_dask_client`. When using the async client, you must `await client.compute(dask_collection)` before exiting the context manager.
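A minimal sketch of the async variant, again with an illustrative sample dataset:

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client

@task
async def compute_task():
    async with get_async_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # With the async client, await the computation before
        # exiting the context manager
        summary_df = await client.compute(df.describe())
    return summary_df

@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    # Submission and future resolution stay synchronous, even in async flows
    prefect_future = compute_task.submit()
    return prefect_future.result()
```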
Note that task submission (`.submit()`) and future resolution (`.result()`) are always synchronous operations in Prefect, even when working with async tasks and flows.