Accelerate your workflows by running tasks in parallel with Dask
Dask can run your tasks in parallel and distribute them over multiple machines.
The prefect-dask integration makes it easy to accelerate your flow runs with Dask.
The following command will install a version of prefect-dask compatible with your installed version of prefect.
If you don’t already have prefect installed, it will install the newest version of prefect as well.
pip install "prefect[dask]"
Upgrade to the latest versions of prefect and prefect-dask:
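For example, using pip's upgrade flag:

```bash
pip install -U "prefect[dask]"
```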
Say your flow downloads many images to train a machine learning model.
It takes longer than you’d like for the flow to run because it executes sequentially.
To accelerate your flow code, parallelize it with prefect-dask in three steps:
1. Add the import: `from prefect_dask import DaskTaskRunner`
2. Specify the task runner in the flow decorator: `@flow(task_runner=DaskTaskRunner)`
3. Submit tasks to the flow's task runner: `a_task.submit(*args, **kwargs)`
Below is code with and without the DaskTaskRunner:
```python
# Completed in 15.2 seconds
from typing import List
from pathlib import Path

import httpx
from prefect import flow, task

URL_FORMAT = (
    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)

@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # download image from URL
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)

    # save content to directory/YYYYMM.png
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path

@flow
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)

    # download all images
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image(year, month, directory)
        file_paths.append(file_path)
    return file_paths

if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)
```
```python
# Completed in 5.7 seconds
from typing import List
from pathlib import Path

import httpx
from prefect import flow, task
from prefect_dask import DaskTaskRunner

URL_FORMAT = (
    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)

@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # download image from URL
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)

    # save content to directory/YYYYMM.png
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path

@flow(task_runner=DaskTaskRunner(cluster_kwargs={"processes": False}))
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)

    # download all images
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image.submit(year, month, directory)
        file_paths.append(file_path)
    return file_paths

if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)
```
In our tests, the flow run took 15.2 seconds to execute sequentially.
Using the DaskTaskRunner reduced the runtime to 5.7 seconds!
The DaskTaskRunner is a task runner that submits tasks to the dask.distributed scheduler.
By default, when the DaskTaskRunner is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run.
If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the address kwarg.
DaskTaskRunner accepts the following optional parameters:
| Parameter | Description |
| --- | --- |
| `address` | Address of a currently running Dask scheduler. |
| `cluster_class` | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, `"distributed.LocalCluster"`) or the class itself. |
| `cluster_kwargs` | Additional kwargs to pass to the `cluster_class` when creating a temporary Dask cluster. |
| `adapt_kwargs` | Additional kwargs to pass to `cluster.adapt` when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided. |
Because the DaskTaskRunner uses multiprocessing, calls to flows in scripts must be guarded with if __name__ == "__main__": or you will encounter warnings and errors.
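For example, a minimal script layout with the guard in place; the task, flow, and argument names here are illustrative, not part of the integration:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def say_hello(name: str):
    print(f"Hello {name}!")

@flow(task_runner=DaskTaskRunner())
def greetings(names: list[str]):
    for name in names:
        say_hello.submit(name)

# guard the flow call so Dask's worker processes can safely re-import this module
if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
```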
If you don’t provide the address of a Dask scheduler, Prefect creates a temporary local cluster automatically.
The number of workers used is based on the number of cores on your machine.
The default provides a mix of processes and threads that work well for most workloads.
To specify this explicitly, pass values for n_workers or threads_per_worker to cluster_kwargs:
```python
from prefect_dask import DaskTaskRunner

# Use 4 worker processes, each with 2 threads
DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
)
```
The DaskTaskRunner can create a temporary cluster using any of Dask’s cluster-manager options.
This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.
To configure it, provide a cluster_class.
This can be:
- A string specifying the import path to the cluster class (for example, `"dask_cloudprovider.aws.FargateCluster"`)
- The cluster class itself
- A function for creating a custom cluster
You can also configure cluster_kwargs.
This takes a dictionary of keyword arguments to pass to cluster_class when starting the flow run.
For example, to configure a flow to use a temporary dask_cloudprovider.aws.FargateCluster with four workers running with an image named my-prefect-image:
```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)
```
For larger workloads, you can accelerate execution further by distributing task runs over multiple machines.
Multiple Prefect flow runs can use the same existing Dask cluster.
You might manage a single long-running Dask cluster (for example, using the Dask Helm Chart) and configure flows to connect to it during execution.
This has disadvantages compared to using a temporary Dask cluster:
- All workers in the cluster must have dependencies installed for all flows you intend to run.
- Multiple flow runs may compete for resources. Dask tries to do a good job of sharing resources between tasks, but you may still run into issues.
Still, you may prefer managing a single long-running Dask cluster.
To configure a DaskTaskRunner to connect to an existing cluster, pass in the address of the scheduler to the address argument:
```python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address="http://my-dask-cluster"))
def my_flow():
    ...
```
Suppose your flow works with a Dask collection, such as a dask.dataframe.DataFrame, and you want to distribute that work across the workers of a Dask client/cluster.
With prefect-dask, it takes just a few steps, as shown in the sketch after this list:
1. Add imports
2. Add task and flow decorators
3. Use the get_dask_client context manager to distribute work across Dask workers
4. Specify the task runner and the client's address in the flow decorator
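Here is a minimal sketch of those steps, assuming the default temporary local cluster and dask[dataframe] installed; the task and flow names (summarize_timeseries, dask_flow) and the timeseries demo data are illustrative:

```python
import dask.datasets
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def summarize_timeseries():
    # inside a task, get_dask_client connects to the flow run's Dask cluster,
    # so the collection's work is distributed across its workers
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # resolve the future before the context manager exits
        return client.compute(df.describe()).result()

# to reuse an existing cluster, pass its scheduler address instead,
# e.g. DaskTaskRunner(address="http://my-dask-cluster")
@flow(task_runner=DaskTaskRunner())
def dask_flow():
    future = summarize_timeseries.submit()
    return future.result()

if __name__ == "__main__":
    dask_flow()
```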
A key feature of using a DaskTaskRunner is the ability to scale adaptively to the workload.
Instead of specifying n_workers as a fixed number, you can specify a minimum and maximum number of workers to use, and the Dask cluster scales up and down as needed.
To do this, pass adapt_kwargs to DaskTaskRunner.
This takes the following fields:
- maximum (int or None, optional): the maximum number of workers to scale to. Set to None for no maximum.
- minimum (int or None, optional): the minimum number of workers to scale to. Set to None for no minimum.
For example, this configures a flow to run on a FargateCluster scaling up to a maximum of 10 workers:
```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10}
)
```
You can also use Dask annotations to further control how tasks are scheduled, for example by requiring specific resources on a worker:

```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)

# Create a `LocalCluster` with some resource annotations
# Annotations are abstract in dask and not inferred from your system.
# Here, we claim that our system has 1 GPU and 1 process available per worker
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    with dask.annotate(resources={"GPU": 1}):
        future = show.submit(0)  # this task requires 1 GPU resource on a worker

    with dask.annotate(resources={"process": 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially
        future = show.submit(1)
        future = show.submit(2)
        future = show.submit(3)

if __name__ == "__main__":
    my_flow()
```
Note that, by default, dask_collection.compute() returns concrete values, while client.compute(dask_collection) returns Dask futures. Therefore, if you call client.compute, you must resolve all futures before exiting the context manager by either:
setting sync=True
```python
with get_dask_client() as client:
    df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
    summary_df = client.compute(df.describe(), sync=True)
```
calling result()
```python
with get_dask_client() as client:
    df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
    summary_df = client.compute(df.describe()).result()
```
There is also an equivalent context manager for asynchronous tasks and flows: get_async_dask_client. When using the async client, you must await client.compute(dask_collection) before exiting the context manager.
Note that task submission (.submit()) and future resolution (.result()) are always synchronous operations in Prefect, even when working with async tasks and flows.
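A minimal sketch of the async pattern, under the same assumptions as above (temporary local cluster, dask[dataframe] installed); the names compute_task and dask_flow are illustrative:

```python
import asyncio

import dask.datasets
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client

@task
async def compute_task():
    async with get_async_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # await the computation before the context manager exits
        summary_df = await client.compute(df.describe())
    return summary_df

@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    # .submit() and .result() stay synchronous even in an async flow
    prefect_future = compute_task.submit()
    return prefect_future.result()

if __name__ == "__main__":
    asyncio.run(dask_flow())
```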