# Executors


Prefect Executors are responsible for running tasks in a flow. During execution of a flow run, a flow's executor will be initialized, used to execute all tasks in the flow, then shutdown.

Currently, the available executor options are:

  • LocalExecutor: the default, no frills executor. All tasks are executed in a single thread, parallelism is not supported.
  • LocalDaskExecutor: an executor that runs on dask primitives with a using either threads or processes.
  • DaskExecutor: the most feature-rich of the executors, this executor runs on dask.distributed and has support for distributed execution.

Which executor you choose depends on the performance requirements and characteristics of your Flow. See the executors docs for more information.

# Executor

class

prefect.executors.base.Executor

()[source]

Base Executor class that all other executors inherit from.



# LocalExecutor

class

prefect.executors.local.LocalExecutor

()[source]

An executor that runs all functions synchronously and immediately in the main thread. To be used mainly for debugging purposes.



# LocalDaskExecutor

class

prefect.executors.dask.LocalDaskExecutor

(scheduler="threads", **kwargs)[source]

An executor that runs all functions locally using dask and a configurable dask scheduler.

Args:

  • scheduler (str): The local dask scheduler to use; common options are "threads", "processes", and "synchronous". Defaults to "threads".
  • **kwargs (Any): Additional keyword arguments to pass to dask config



# DaskExecutor

class

prefect.executors.dask.DaskExecutor

(address=None, cluster_class=None, cluster_kwargs=None, adapt_kwargs=None, client_kwargs=None, debug=None)[source]

An executor that runs all functions using the dask.distributed scheduler.

By default a temporary distributed.LocalCluster is created (and subsequently torn down) within the start() contextmanager. To use a different cluster class (e.g. dask_kubernetes.KubeCluster), you can specify cluster_class/cluster_kwargs.

Alternatively, if you already have a dask cluster running, you can provide the address of the scheduler via the address kwarg.

Note that if you have tasks with tags of the form "dask-resource:KEY=NUM" they will be parsed and passed as Worker Resources of the form {"KEY": float(NUM)} to the Dask Scheduler.

Args:

  • address (string, optional): address of a currently running dask scheduler; if one is not provided, a temporary cluster will be created in executor.start(). Defaults to None.
  • cluster_class (string or callable, optional): the cluster class to use when creating a temporary dask cluster. Can be either the full class name (e.g. "distributed.LocalCluster"), or the class itself.
  • cluster_kwargs (dict, optional): addtional kwargs to pass to the cluster_class when creating a temporary dask cluster.
  • adapt_kwargs (dict, optional): additional kwargs to pass to cluster.adapt when creating a temporary dask cluster. Note that adaptive scaling is only enabled if adapt_kwargs are provided.
  • client_kwargs (dict, optional): additional kwargs to use when creating a dask.distributed.Client.
  • debug (bool, optional): When running with a local cluster, setting debug=True will increase dask's logging level, providing potentially useful debug info. Defaults to the debug value in your Prefect configuration.
Examples:

Using a temporary local dask cluster:

executor = DaskExecutor()

Using a temporary cluster running elsewhere. Any Dask cluster class should work, here we use dask-cloudprovider:

executor = DaskExecutor(
    cluster_class="dask_cloudprovider.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 5,
        ...
    },
)

Connecting to an existing dask cluster

executor = DaskExecutor(address="192.0.2.255:8786")



This documentation was auto-generated from commit 59ae220
on April 9, 2021 at 15:46 UTC