prefect_dask.task_runners

Interface and implementations of the Dask Task Runner. Task Runners in Prefect are responsible for managing the execution of Prefect task runs. Generally speaking, users are not expected to interact with task runners outside of configuring and initializing them for a flow. Example:
import time

from prefect import flow, task

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)

# outputs
#0
#1
#2
#3
#4
#5
#6
#7
#8
#9
Switching to a DaskTaskRunner:
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=DaskTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
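The shuffled output comes from running task runs concurrently. Each `submit` call returns a future immediately, and the runs finish in whatever order the workers complete them. The sketch below reproduces that effect with the standard library's `concurrent.futures` as a stand-in. It is not Prefect or Dask code. Here `shout` returns its label instead of printing it, and a random sleep is an assumption used to simulate uneven work.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def shout(number: int) -> str:
    # A random delay simulates uneven work, so runs finish out of order.
    time.sleep(random.uniform(0, 0.1))
    return f"#{number}"


def count_to(highest_number: int) -> tuple[list[str], list[str]]:
    with ThreadPoolExecutor(max_workers=max(1, highest_number)) as pool:
        # Like shout.submit(number): each call returns a future immediately.
        futures = [pool.submit(shout, number) for number in range(highest_number)]
        # Completion order depends on timing, not on submission order.
        completion_order = [f.result() for f in as_completed(futures)]
        # Reading the futures in submission order recovers 0..n-1.
        submission_order = [f.result() for f in futures]
    return completion_order, submission_order


if __name__ == "__main__":
    finished, submitted = count_to(10)
    print(finished)   # order varies from run to run
    print(submitted)  # ['#0', '#1', '#2', '#3', '#4', '#5', '#6', '#7', '#8', '#9']
```

To get results in a stable order, read the futures in the order you submitted them. Do not rely on the order in which they complete.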

Classes

PrefectDaskFuture

A Prefect future that wraps a distributed.Future. This future is used when the task run is submitted to a DaskTaskRunner. Methods:

result

result(self, timeout: Optional[float] = None, raise_on_failure: bool = True) -> R

wait

wait(self, timeout: Optional[float] = None) -> None
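`PrefectDaskFuture` adapts a `distributed.Future` to Prefect's future interface through `result` and `wait`. The sketch below shows the same wrapping idea over a standard-library `concurrent.futures.Future`. The class name `WrappedFuture` is hypothetical. The behavior of `raise_on_failure=False`, which returns the exception instead of raising it, is an assumption made for illustration and is not taken from this page.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import wait as wait_for_futures
from typing import Generic, Optional, TypeVar, Union

R = TypeVar("R")


class WrappedFuture(Generic[R]):
    """Hypothetical Prefect-style wrapper around a stdlib Future."""

    def __init__(self, inner: "Future[R]") -> None:
        self._inner = inner

    def wait(self, timeout: Optional[float] = None) -> None:
        # Block until the run finishes or the timeout elapses; never raises
        # the task's own exception.
        wait_for_futures([self._inner], timeout=timeout)

    def result(
        self, timeout: Optional[float] = None, raise_on_failure: bool = True
    ) -> Union[R, BaseException]:
        # exception() waits up to `timeout` and raises TimeoutError if the run
        # is still going; it returns None when the run succeeded.
        exc = self._inner.exception(timeout=timeout)
        if exc is None:
            return self._inner.result()
        if raise_on_failure:
            raise exc
        # Assumption for this sketch: failures are returned, not raised.
        return exc


if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        ok = WrappedFuture(pool.submit(lambda: 6 / 3))
        bad = WrappedFuture(pool.submit(lambda: 1 / 0))
        ok.wait(timeout=5)
        print(ok.result())                               # 2.0
        print(repr(bad.result(raise_on_failure=False)))  # ZeroDivisionError('division by zero')
```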

DaskTaskRunner

A parallel task runner that submits tasks to the dask.distributed scheduler. By default, a temporary distributed.LocalCluster is created (and subsequently torn down) within the start() context manager. To use a different cluster class (e.g. dask_kubernetes.KubeCluster), specify cluster_class and cluster_kwargs. Alternatively, if you already have a Dask cluster running, you can provide the cluster object via the cluster kwarg or the address of its scheduler via the address kwarg.

Warning (multiprocessing safety): because the DaskTaskRunner uses multiprocessing, calls to flows in scripts must be guarded with if __name__ == "__main__":, or warnings will be displayed.

Args:
  • cluster: Currently running Dask cluster. If one is not provided (or specified via the address kwarg), a temporary cluster will be created in DaskTaskRunner.start(). Defaults to None.
  • address: Address of a currently running dask scheduler. Defaults to None.
  • cluster_class: The cluster class to use when creating a temporary dask cluster. Can be either the full class name (e.g. "distributed.LocalCluster"), or the class itself.
  • cluster_kwargs: Additional kwargs to pass to the cluster_class when creating a temporary dask cluster.
  • adapt_kwargs: Additional kwargs to pass to cluster.adapt when creating a temporary dask cluster. Note that adaptive scaling is only enabled if adapt_kwargs are provided.
  • client_kwargs: Additional kwargs to use when creating a dask.distributed.Client.
  • performance_report_path: Path where the Dask performance report will be saved. If not provided, no performance report will be generated.
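The cluster_class, cluster_kwargs, and adapt_kwargs arguments work together. A class given by its dotted name must first be imported. The cluster is then built from cluster_kwargs, and adapt is called only when adapt_kwargs is non-empty. The sketch below models that logic with a stand-in cluster class. The names `resolve_class`, `create_temporary_cluster`, and `FakeCluster` are hypothetical and are not `prefect_dask` internals.

```python
import importlib
from typing import Any, Optional, Union


def resolve_class(cluster_class: Union[str, type]) -> type:
    """Accept a class object or a dotted name like "distributed.LocalCluster"."""
    if isinstance(cluster_class, str):
        module_name, _, attr = cluster_class.rpartition(".")
        return getattr(importlib.import_module(module_name), attr)
    return cluster_class


def create_temporary_cluster(
    cluster_class: Union[str, type],
    cluster_kwargs: Optional[dict[str, Any]] = None,
    adapt_kwargs: Optional[dict[str, Any]] = None,
) -> Any:
    cls = resolve_class(cluster_class)
    cluster = cls(**(cluster_kwargs or {}))
    # Adaptive scaling is opt-in: adapt() is called only when kwargs are supplied.
    if adapt_kwargs:
        cluster.adapt(**adapt_kwargs)
    return cluster


class FakeCluster:
    """Stand-in for a Dask cluster class; records how it was configured."""

    def __init__(self, n_workers: int = 1) -> None:
        self.n_workers = n_workers
        self.adapt_calls: list[dict[str, Any]] = []

    def adapt(self, **kwargs: Any) -> None:
        self.adapt_calls.append(kwargs)
```

If Dask is installed, passing "distributed.LocalCluster" with cluster_kwargs such as {"n_workers": 2} and adapt_kwargs such as {"minimum": 1, "maximum": 4} takes the same path against a real cluster.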
Examples:
Using a temporary local Dask cluster:
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(task_runner=DaskTaskRunner)
def my_flow():
    ...
Using a temporary cluster running elsewhere. Any Dask cluster class should work; here we use dask-cloudprovider:
DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 5,
    },
)
Connecting to an existing dask cluster:
DaskTaskRunner(address="192.0.2.255:8786")
Methods:

client

client(self) -> PrefectDaskClient
Get the Dask client for the task runner. The client is created on first access. If a remote cluster is not provided, the client will attempt to create or connect to a local cluster.
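Creating the client on first access is a lazy-initialization pattern. Constructing the task runner stays cheap, and the connection cost is paid only when a client is actually needed. A minimal sketch of that pattern follows. `LazyClientRunner` and its `client_factory` argument are hypothetical and stand in for the real Dask client construction.

```python
from typing import Callable, Generic, Optional, TypeVar

C = TypeVar("C")


class LazyClientRunner(Generic[C]):
    """Hypothetical holder that builds its client the first time it is read."""

    def __init__(self, client_factory: Callable[[], C]) -> None:
        self._client_factory = client_factory
        self._client: Optional[C] = None

    @property
    def client(self) -> C:
        # Create the client on first access and reuse it afterwards.
        if self._client is None:
            self._client = self._client_factory()
        return self._client
```

This version is not thread-safe. Two threads that read `client` at the same moment for the first time could each build a client, so a real implementation might guard creation with a lock.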

duplicate

duplicate(self)
Create a new instance of the task runner with the same settings.
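One reasonable reading of "the same settings" is that the new instance copies the configuration but none of the runtime state, such as an open client. The sketch below assumes the runner keeps its constructor arguments. `ConfiguredRunner` is a hypothetical class, and this page does not say whether `DaskTaskRunner.duplicate` works this way.

```python
import copy
from typing import Any, Optional


class ConfiguredRunner:
    """Hypothetical runner that remembers its settings so it can be duplicated."""

    def __init__(
        self,
        address: Optional[str] = None,
        cluster_kwargs: Optional[dict[str, Any]] = None,
    ) -> None:
        self.address = address
        self.cluster_kwargs = dict(cluster_kwargs or {})
        self.client: Optional[object] = None  # runtime state, set later

    def duplicate(self) -> "ConfiguredRunner":
        # Same settings (deep-copied so instances do not share mutable dicts),
        # but the new instance starts without a client.
        return type(self)(
            address=self.address,
            cluster_kwargs=copy.deepcopy(self.cluster_kwargs),
        )
```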

map

map is overloaded. The first two signatures are the typing overloads for async and sync tasks, and the third is the implementation signature.

map(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDaskFuture[R]]

map(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDaskFuture[R]]

map(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None)
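map takes a single parameters dict and returns a list of futures, one per mapped task run. The sketch below shows only the expansion step, under three assumptions: list and tuple values are mapped element-wise, other values are passed unchanged to every call, and mapped values must have equal lengths. `expand_parameters` is a hypothetical helper and is not part of `prefect_dask`. It also raises plain ValueError, which is a simplification.

```python
from typing import Any


def expand_parameters(parameters: dict[str, Any]) -> list[dict[str, Any]]:
    """Turn one parameters dict into one dict per mapped call."""
    # Lists and tuples are mapped; everything else is broadcast as-is.
    mapped = {k: v for k, v in parameters.items() if isinstance(v, (list, tuple))}
    static = {k: v for k, v in parameters.items() if k not in mapped}
    if not mapped:
        raise ValueError("map needs at least one iterable parameter")
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError(f"Mapped parameters have different lengths: {sorted(lengths)}")
    (length,) = lengths
    return [
        {**static, **{k: v[i] for k, v in mapped.items()}}
        for i in range(length)
    ]
```

For example, `expand_parameters({"x": [1, 2, 3], "y": 10})` yields three dicts in which `y` is always 10 and `x` takes each list element in turn. Each of these dicts would then be handed to submit.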

submit

submit is overloaded. The first two signatures are the typing overloads for async and sync tasks, and the third is the implementation signature.

submit(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectDaskFuture[R]] | None = None, dependencies: dict[str, Set[RunInput]] | None = None) -> PrefectDaskFuture[R]

submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectDaskFuture[R]] | None = None, dependencies: dict[str, Set[RunInput]] | None = None) -> PrefectDaskFuture[R]

submit(self, task: 'Union[Task[P, R], Task[P, Coroutine[Any, Any, R]]]', parameters: dict[str, Any], wait_for: Iterable[PrefectDaskFuture[R]] | None = None, dependencies: dict[str, Set[RunInput]] | None = None) -> PrefectDaskFuture[R]
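The wait_for parameter in the submit signatures lets a task run wait for upstream futures that it does not receive as parameters. A standard-library sketch of that ordering guarantee follows, with `ThreadPoolExecutor` standing in for the Dask client. `submit_after` and `record` are hypothetical helpers. The sketch waits inside the worker thread, which is safe only because the upstream futures were submitted earlier to the same first-in, first-out pool.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Iterable, Optional


def submit_after(
    pool: ThreadPoolExecutor,
    fn: Callable[..., Any],
    parameters: dict[str, Any],
    wait_for: Optional[Iterable["Future[Any]"]] = None,
) -> "Future[Any]":
    """Hypothetical submit: run fn(**parameters) once every wait_for future is done."""
    upstream = list(wait_for or [])

    def run() -> Any:
        # Upstream futures were queued earlier in the same FIFO pool, so they
        # are already running or finished when this job starts; no deadlock.
        wait(upstream)
        return fn(**parameters)

    return pool.submit(run)


def record(log: list[str], label: str, delay: float = 0.0) -> str:
    time.sleep(delay)
    log.append(label)
    return label


if __name__ == "__main__":
    log: list[str] = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        slow = submit_after(pool, record, {"log": log, "label": "upstream", "delay": 0.2})
        fast = submit_after(pool, record, {"log": log, "label": "downstream"}, wait_for=[slow])
        fast.result()
    print(log)  # ['upstream', 'downstream']
```

The downstream job starts immediately on the second worker, but it blocks in wait until the slower upstream job has appended its label.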