prefect-dask
Accelerate your workflows by running tasks in parallel with Dask
Dask can run your tasks in parallel and distribute them over multiple machines.
The prefect-dask integration makes it easy to accelerate your flow runs with Dask.
Getting started
Install prefect-dask
The following command installs a version of prefect-dask compatible with your installed version of prefect.
If you don’t already have prefect installed, it installs the newest version of prefect as well.
pip install "prefect[dask]"
Upgrade to the latest versions of prefect and prefect-dask:
pip install -U "prefect[dask]"
Why use Dask?
Say your flow downloads many images to train a machine learning model. It takes longer than you’d like for the flow to run because it executes sequentially.
To accelerate your flow code, parallelize it with prefect-dask in three steps:
- Add the import:
from prefect_dask import DaskTaskRunner
- Specify the task runner in the flow decorator:
@flow(task_runner=DaskTaskRunner)
- Submit tasks to the flow’s task runner:
a_task.submit(*args, **kwargs)
The following flow downloads the images sequentially, without the DaskTaskRunner:
# Completed in 15.2 seconds
from typing import List
from pathlib import Path

import httpx
from prefect import flow, task

URL_FORMAT = (
    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)

@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # download image from URL
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)
    # save content to directory/YYYYMM.png
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path

@flow
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)
    # download all images
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image(year, month, directory)
        file_paths.append(file_path)
    return file_paths

if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)
In our tests, the flow run took 15.2 seconds to execute sequentially.
Using the DaskTaskRunner reduced the runtime to 5.7 seconds!
Run tasks on Dask
The DaskTaskRunner is a task runner that submits tasks to the dask.distributed scheduler.
By default, when the DaskTaskRunner is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run.
If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the address kwarg.
DaskTaskRunner accepts the following optional parameters:
| Parameter | Description |
|---|---|
| address | Address of a currently running Dask scheduler. |
| cluster_class | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, "distributed.LocalCluster"), or the class itself. |
| cluster_kwargs | Additional kwargs to pass to the cluster_class when creating a temporary Dask cluster. |
| adapt_kwargs | Additional kwargs to pass to cluster.adapt when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if adapt_kwargs are provided. |
| client_kwargs | Additional kwargs to use when creating a dask.distributed.Client. |
Multiprocessing safety
Because the DaskTaskRunner uses multiprocessing, calls to flows in scripts must be guarded with if __name__ == "__main__": or you will encounter warnings and errors.
If you don’t provide the address of a Dask scheduler, Prefect creates a temporary local cluster automatically.
The number of workers used is based on the number of cores on your machine.
The default provides a mix of processes and threads that work well for most workloads.
To specify this explicitly, pass values for n_workers or threads_per_worker to cluster_kwargs:
from prefect_dask import DaskTaskRunner

# Use 4 worker processes, each with 2 threads
DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
)
Use a temporary cluster
The DaskTaskRunner can create a temporary cluster using any of Dask’s cluster-manager options.
This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.
To configure it, provide a cluster_class.
This can be:
- A string specifying the import path to the cluster class (for example, "dask_cloudprovider.aws.FargateCluster")
- The cluster class itself
- A function for creating a custom cluster
You can also configure cluster_kwargs.
This takes a dictionary of keyword arguments to pass to cluster_class when starting the flow run.
For example, to configure a flow to use a temporary dask_cloudprovider.aws.FargateCluster with four workers running with an image named my-prefect-image:
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)
For larger workloads, you can accelerate execution further by distributing task runs over multiple machines.
Connect to an existing cluster
Multiple Prefect flow runs can use the same existing Dask cluster. You might manage a single long-running Dask cluster (for example, using the Dask Helm Chart) and configure flows to connect to it during execution. This has disadvantages compared to using a temporary Dask cluster:
- All workers in the cluster must have dependencies installed for all flows you intend to run.
- Multiple flow runs may compete for resources. Dask tries to do a good job sharing resources between tasks, but you may still run into issues.
Still, you may prefer managing a single long-running Dask cluster.
To configure a DaskTaskRunner to connect to an existing cluster, pass in the address of the scheduler to the address argument:
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address="http://my-dask-cluster"))
def my_flow():
    ...
Suppose you have existing Dask code, such as a pipeline built on dask.dataframe.DataFrame, that already runs against a Dask client/cluster.
With prefect-dask, integrating it into a Prefect flow takes just a few steps:
- Add imports
- Add task and flow decorators
- Use the get_dask_client context manager to distribute work across Dask workers
- Specify the task runner and the client’s address in the flow decorator
- Submit the tasks to the flow’s task runner
The starting point is a plain Dask pipeline:
import dask.dataframe
import dask.distributed

client = dask.distributed.Client()

def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:
    df_yearly_avg = df.groupby(df.index.year).mean()
    return df_yearly_avg.compute()

def dask_pipeline():
    df = read_data("1988", "2022")
    df_yearly_average = process_data(df)
    return df_yearly_average

if __name__ == "__main__":
    dask_pipeline()
Configure adaptive scaling
A key feature of using a DaskTaskRunner is the ability to scale adaptively to the workload.
Instead of specifying n_workers as a fixed number, you can specify a minimum and maximum number of workers to use, and the Dask cluster scales up and down as needed.
To do this, pass adapt_kwargs to DaskTaskRunner.
This takes the following fields:
- maximum (int or None, optional): the maximum number of workers to scale to. Set to None for no maximum.
- minimum (int or None, optional): the minimum number of workers to scale to. Set to None for no minimum.
For example, this configures a flow to run on a FargateCluster scaling up to a maximum of 10 workers:
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10}
)
Use Dask annotations
Use Dask annotations to further control the behavior of tasks. For example, set the priority of tasks in the Dask scheduler:
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)

@flow(task_runner=DaskTaskRunner())
def my_flow():
    with dask.annotate(priority=-10):
        future = show.submit(1)  # low priority task
    with dask.annotate(priority=10):
        future = show.submit(2)  # high priority task
Another common use case is resource annotations:
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)

# Create a `LocalCluster` with some resource annotations
# Annotations are abstract in dask and not inferred from your system.
# Here, we claim that our system has 1 GPU and 1 process available per worker
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    # Tasks must be submitted to the task runner for the annotations to apply
    with dask.annotate(resources={'GPU': 1}):
        future = show.submit(0)  # this task requires 1 GPU resource on a worker
    with dask.annotate(resources={'process': 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially
        future = show.submit(1)
        future = show.submit(2)
        future = show.submit(3)

if __name__ == "__main__":
    my_flow()
Additional Resources
Refer to the prefect-dask SDK documentation linked in the sidebar to explore all the capabilities of the prefect-dask library.
For assistance using Dask, consult the Dask documentation.