Prefect's execution model is built around two classes, `FlowRunner` and `TaskRunner`, which produce and operate on State objects. The actual execution is handled by `Executor` classes, which can interface with external environments.
# Flow runners
The flow runner takes a flow and attempts to run all of its tasks. It collects the resulting states and, if possible, returns a final state for the flow.
Flow runners loop over all of the tasks one time. If tasks remain unfinished after that pass -- for example, if one of them needs to be retried -- then a second loop will be required to attempt to finish them. There is no limit to the number of attempts it may take to move all tasks (and therefore the flow itself) into a finished state.
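The repeated-pass behavior can be illustrated with a small, library-free sketch. This is not Prefect's implementation: `run_flow` and the string state labels are hypothetical names chosen for illustration, and the sketch treats only success as finished, whereas a real flow runner also counts terminal failures as finished.

```python
from typing import Callable, Dict, Tuple

# Hypothetical state labels for this sketch; these are not Prefect's State classes.
PENDING, RETRYING, SUCCESS = "Pending", "Retrying", "Success"


def run_flow(tasks: Dict[str, Callable[[], None]]) -> Tuple[Dict[str, str], int]:
    """Loop over all tasks, repeating passes until every task has succeeded."""
    states = {name: PENDING for name in tasks}
    passes = 0
    # No cap on the number of passes, mirroring the "no limit" behavior described above.
    while any(state != SUCCESS for state in states.values()):
        passes += 1
        for name, fn in tasks.items():
            if states[name] == SUCCESS:
                continue  # already finished in an earlier pass
            try:
                fn()
                states[name] = SUCCESS
            except Exception:
                states[name] = RETRYING  # leave it for the next pass
    return states, passes


attempts = {"flaky": 0}


def steady() -> None:
    pass


def flaky() -> None:
    # Fails on the first call only, to force a second pass.
    attempts["flaky"] += 1
    if attempts["flaky"] < 2:
        raise RuntimeError("transient failure")


final_states, passes = run_flow({"steady": steady, "flaky": flaky})
print(final_states, passes)
```

Here the first pass finishes `steady` and leaves `flaky` waiting for a retry, so a second pass is needed before the flow as a whole is finished.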
Flows that have parameters may require parameter values (if those parameters have no defaults). Parameter values must be passed to the flow runner when it runs.
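As a rough illustration of that rule, the sketch below resolves parameter values against declared defaults and rejects a run when a required value is missing. The helper `resolve_parameters` and the `REQUIRED` sentinel are hypothetical and are not part of Prefect's API.

```python
from typing import Any, Dict, Optional

REQUIRED = object()  # hypothetical sentinel marking a parameter with no default


def resolve_parameters(
    declared: Dict[str, Any], supplied: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Merge supplied values over defaults; fail if a required value is missing."""
    supplied = supplied or {}
    resolved = {}
    for name, default in declared.items():
        if name in supplied:
            resolved[name] = supplied[name]
        elif default is REQUIRED:
            raise ValueError(f"Missing required parameter: {name}")
        else:
            resolved[name] = default
    return resolved


declared = {"x": REQUIRED, "y": 10}
print(resolve_parameters(declared, {"x": 1}))
```

Calling `resolve_parameters(declared)` with no values raises `ValueError`, because `x` has no default to fall back on.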
# Task runners
The task runner is responsible for executing a single task. It receives the task's initial state as well as any upstream states, and uses these to evaluate an execution pipeline. For example:
- the task must be in a `Pending` state
- the upstream tasks must be `Finished`
- the task's trigger function must pass
If these conditions (and a few others) are met, the task can move into a `Running` state.
Then, depending on the task, it may either be run by calling `run()`, or it may be mapped, which involves creating dynamic child task runners.
Finally, the task moves through a post-process pipeline that checks to see if it should be retried or cached.
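The pipeline described in this section can be sketched in plain Python. This is a simplified illustration, not Prefect's `TaskRunner`. The function `run_task`, the `all_successful` trigger helper, and the string state labels are assumptions made for the example, and mapping and caching are left out.

```python
from typing import Callable, List

# Illustrative labels only; a real runner works with State objects.
FINISHED = {"Success", "Failed", "TriggerFailed"}


def all_successful(upstream_states: List[str]) -> bool:
    """Trigger that passes only if every upstream task succeeded."""
    return all(state == "Success" for state in upstream_states)


def run_task(
    fn: Callable[[], None],
    state: str,
    upstream_states: List[str],
    trigger: Callable[[List[str]], bool] = all_successful,
    retries_left: int = 0,
) -> str:
    # Pre-run checks: each one can end the pipeline early.
    if state != "Pending":
        return state  # only pending tasks are eligible to run
    if any(s not in FINISHED for s in upstream_states):
        return state  # upstream work is not finished; stay pending
    if not trigger(upstream_states):
        return "TriggerFailed"
    # The task is now considered running.
    try:
        fn()
    except Exception:
        # Post-process step: decide between a retry and a failure.
        return "Retrying" if retries_left > 0 else "Failed"
    return "Success"


def boom() -> None:
    raise RuntimeError("task failed")


print(run_task(lambda: None, "Pending", ["Success"]))
print(run_task(lambda: None, "Pending", ["Running"]))
print(run_task(lambda: None, "Pending", ["Failed"]))
print(run_task(boom, "Pending", [], retries_left=1))
```

The four calls exercise, in order, a normal run, an unfinished upstream task, a failed trigger, and a failure that is eligible for retry.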
# Executors
The executor classes are responsible for actually running tasks. For example, the flow runner will submit each task runner to its executor, and wait for the result. We recommend Dask distributed as the preferred execution engine.
Executors have a relatively simple API: users can `submit` functions and `wait` for their results.
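To make the submit-and-wait shape concrete, here is a minimal sketch built on Python's standard `concurrent.futures` module. `SketchExecutor` is a hypothetical class written for this example; it is not one of Prefect's executors and its method signatures are assumptions.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List


class SketchExecutor:
    """Hypothetical executor exposing only submit() and wait()."""

    def __init__(self, max_workers: int = 2) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Future:
        # Schedule the call and return a future immediately.
        return self._pool.submit(fn, *args, **kwargs)

    def wait(self, futures: List[Future]) -> List[Any]:
        # Block until every future resolves, preserving submission order.
        return [future.result() for future in futures]

    def shutdown(self) -> None:
        self._pool.shutdown()


executor = SketchExecutor()
futures = [executor.submit(pow, 2, n) for n in range(4)]
results = executor.wait(futures)
executor.shutdown()
print(results)  # [1, 2, 4, 8]
```

Keeping the interface this small is what lets the same calling code run against a synchronous, threaded, or distributed backend.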
For testing and development, the `LocalExecutor` is preferred. It runs every function synchronously in the local process and is the default executor for flows unless otherwise specified.
`LocalDaskExecutor` is slightly more complex. It still runs functions locally, but uses Dask to parallelize across threads or processes.
`DaskExecutor` is a completely asynchronous engine that can run functions in a distributed Dask cluster. This is the recommended engine for production.
# Using a Dask Executor
An executor can be provided to a flow at runtime:
```python
from prefect import task, Flow
from prefect.executors import DaskExecutor


@task
def say_hello():
    print("Hello, world!")


with Flow("Run Me") as flow:
    h = say_hello()

# Connect to an existing Dask scheduler and run the flow on its workers
executor = DaskExecutor(address="tcp://localhost:8786")
flow.run(executor=executor)
```
The `DaskExecutor` will connect to a Dask scheduler at the address `tcp://localhost:8786` and begin submitting work to be executed on Dask workers.
If no scheduler `address` is specified for the `DaskExecutor`, then an in-process scheduler will be created and torn down upon completion. See the DaskExecutor API Documentation for more information.
# LocalDaskExecutor vs DaskExecutor
The key difference between the `LocalDaskExecutor` and the `DaskExecutor` is the choice of scheduler. The `LocalDaskExecutor` is configurable to use any number of schedulers while the `DaskExecutor` uses the distributed scheduler. This means that the `LocalDaskExecutor` can help achieve some multithreading / multiprocessing; however, it does not provide as many distributed features as the