Interface and implementations of the Dask Task Runner.
Task Runners in Prefect are responsible for managing the execution of Prefect task runs.
Generally speaking, users are not expected to interact with
task runners outside of configuring and initializing them for a flow.

Example:
```python
import time

from prefect import flow, task


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


@flow
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)


if __name__ == "__main__":
    count_to(10)

# outputs
#0
#1
#2
#3
#4
#5
#6
#7
#8
#9
```
Switching to a DaskTaskRunner:
```python
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


# Run this flow's submitted tasks with the DaskTaskRunner
@flow(task_runner=DaskTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)


if __name__ == "__main__":
    count_to(10)

# outputs (tasks run in parallel, so the order varies between runs)
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
```
A parallel task runner that submits tasks to the `dask.distributed` scheduler.

By default, a temporary `distributed.LocalCluster` is created (and
subsequently torn down) within the `start()` context manager. To use a
different cluster class (e.g. `dask_kubernetes.KubeCluster`), you can
specify `cluster_class`/`cluster_kwargs`.

Alternatively, if you already have a Dask cluster running, you can provide
the cluster object via the `cluster` kwarg or the address of the scheduler
via the `address` kwarg.
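The selection rules described above (an existing `cluster`, else a scheduler `address`, else a temporary cluster built from `cluster_class`/`cluster_kwargs`) can be sketched in plain Python. This is a hypothetical illustration, not prefect_dask's implementation: `ClusterPlan` and `plan_cluster` are invented names, and the sketch assumes `cluster` and `address` are mutually exclusive, raising `ValueError` when both are given. Following the `adapt_kwargs` description, it only marks the cluster as adaptive when non-empty `adapt_kwargs` are passed.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ClusterPlan:
    """Hypothetical summary of where a Dask task runner gets its scheduler."""

    source: str  # "existing-cluster", "address", or "temporary-cluster"
    cluster_class: Any = None
    cluster_kwargs: dict = field(default_factory=dict)
    adaptive: bool = False


def plan_cluster(
    cluster: Any = None,
    address: Optional[str] = None,
    cluster_class: Any = "distributed.LocalCluster",
    cluster_kwargs: Optional[dict] = None,
    adapt_kwargs: Optional[dict] = None,
) -> ClusterPlan:
    """Decide how to obtain a cluster (assumption: cluster and address are exclusive)."""
    if cluster is not None and address is not None:
        raise ValueError("pass either `cluster` or `address`, not both")
    if cluster is not None:
        # Reuse the already-running cluster object as-is.
        return ClusterPlan(source="existing-cluster")
    if address is not None:
        # Connect to a scheduler that is already listening at `address`.
        return ClusterPlan(source="address")
    # Otherwise build a temporary cluster in start() and tear it down afterwards.
    return ClusterPlan(
        source="temporary-cluster",
        cluster_class=cluster_class,
        cluster_kwargs=dict(cluster_kwargs or {}),
        # Adaptive scaling is enabled only when adapt_kwargs are supplied.
        adaptive=bool(adapt_kwargs),
    )
```

In practice these options are passed directly to the runner, for example `DaskTaskRunner(cluster_kwargs={"n_workers": 4}, adapt_kwargs={"minimum": 1, "maximum": 4})`, where `n_workers` is a `distributed.LocalCluster` argument and `minimum`/`maximum` are arguments of Dask's `Cluster.adapt`.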
!!! warning "Multiprocessing safety"
    Note that, because the `DaskTaskRunner` uses multiprocessing, calls to flows
    in scripts must be guarded with `if __name__ == "__main__":` or warnings will
    be displayed.

Args:
cluster: Currently running Dask cluster; if one is not provided (or specified via the `address` kwarg), a temporary cluster will be created in `DaskTaskRunner.start()`. Defaults to `None`.
address: Address of a currently running Dask scheduler. Defaults to `None`.
cluster_class: The cluster class to use when creating a temporary Dask cluster. Can be either the full class name (e.g. `"distributed.LocalCluster"`) or the class itself.
cluster_kwargs: Additional kwargs to pass to the `cluster_class` when creating a temporary Dask cluster.
adapt_kwargs: Additional kwargs to pass to `cluster.adapt` when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided.
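The `cluster_class` argument accepts either a class or its full dotted name. One way such a name can be turned into a class is with `importlib`; the sketch below is a hypothetical helper (`resolve_class` is not part of prefect_dask) and is demonstrated on standard-library classes so it runs without Dask installed. With Dask available, calling it on `"distributed.LocalCluster"` would import `distributed` and return its `LocalCluster` class.

```python
import importlib
from typing import Any


def resolve_class(cls_or_name: Any) -> Any:
    """Return the class itself, or import it from a dotted "module.ClassName" path."""
    if not isinstance(cls_or_name, str):
        # Already a class (or other callable): use it unchanged.
        return cls_or_name
    # Split "package.module.ClassName" into the module path and the attribute name.
    module_name, _, attr_name = cls_or_name.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a fully qualified name, got {cls_or_name!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)
```

This simple split assumes the class lives directly on a module; nested names such as `module.Outer.Inner` are not handled.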
Get the Dask client for the task runner.

The client is created on first access. If a remote cluster is not
provided, the client will attempt to create or connect to a local cluster.
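Creating the client "on first access" is the lazy-initialization pattern. The sketch below is hypothetical (`LazyClientHolder` and its `factory` argument are invented names); in a real runner the factory would be whatever builds the Dask client, for example a call to `distributed.Client`, but a plain callable is used here so the pattern runs without a cluster.

```python
from typing import Any, Callable, Optional


class LazyClientHolder:
    """Hypothetical holder that builds its client only when first accessed."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._client: Optional[Any] = None

    @property
    def client(self) -> Any:
        # Create the client on first access, then reuse the same object.
        if self._client is None:
            self._client = self._factory()
        return self._client
```

`functools.cached_property` gives the same create-once behavior with less code; the explicit `None` check is used here to make the first-access step visible.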