Task runners are not required for task execution. Calling a task function directly without a task runner executes the function in the main thread by default, which blocks execution of its flow until the task completes.

To enable concurrent, parallel, or distributed execution of tasks, use the .submit() method to submit a task to a task runner. The default task runner in Prefect is the ThreadPoolTaskRunner, which runs tasks concurrently within a thread pool. For parallel or distributed task execution, you must additionally install a task runner that is available as an integration, such as the DaskTaskRunner from prefect-dask or the RayTaskRunner from prefect-ray.

Concurrency vs. parallelism

  • Concurrency refers to a system that can make progress on more than one thing during the same period, but not necessarily at the exact same instant. Think of concurrent execution as non-blocking: within the restrictions of resources available in the execution environment and data dependencies between tasks, execution of one task does not block execution of other tasks in a flow.
  • Parallelism refers to a system that can do more than one thing at the exact same time. Within the restrictions of resources available, parallel execution can run tasks at the same time, such as for operations mapped across a dataset.
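
To make the distinction concrete, here is a minimal sketch that uses only Python's standard library (concurrent.futures and threading), not Prefect itself. The work function and N_TASKS are hypothetical names chosen for illustration. Each task blocks at a barrier until all three have started, which can only succeed if the pool keeps them in flight together; a serial loop would hit the timeout instead.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

N_TASKS = 3
# The barrier releases only once N_TASKS threads are waiting on it.
barrier = threading.Barrier(N_TASKS)


def work(i: int) -> int:
    # Succeeds only if all tasks are in flight together; a serial run
    # would raise BrokenBarrierError after the timeout.
    barrier.wait(timeout=5)
    return i * i


with ThreadPoolExecutor(max_workers=N_TASKS) as pool:
    # pool.map returns results in submission order.
    results = list(pool.map(work, range(N_TASKS)))

print(results)  # [0, 1, 4]
```

In standard CPython builds with the global interpreter lock, threads like these interleave rather than execute Python bytecode at the same instant, so the sketch illustrates concurrency rather than parallelism.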

Configure a task runner

To configure your flow to use a specific task runner, provide the runner to the task_runner keyword of the flow decorator.

To submit work to the runner, use the task’s .submit() method. This method returns a PrefectFuture, which is a Prefect object that contains:

  • a reference to the payload returned by the task;
  • and a State, which is a Prefect object indicating the state of the task run.

Prefect future objects must be resolved explicitly before returning from a flow. Dependencies between futures will be automatically resolved whenever their dependents are resolved. This means that only terminal futures need to be resolved, either by:

  • returning the terminal futures from your flow
  • calling .wait() or .result() on each terminal future
  • using one of the top level wait or as_completed utilities to resolve terminal futures

Not doing so may leave your tasks in an unfinished state.

For example:

from prefect import flow, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
import time


@task
def stop_at_floor(floor):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")


@flow(task_runner=ThreadPoolTaskRunner(max_workers=3))
def elevator():
    floors = []

    for floor in range(10, 0, -1):
        floors.append(stop_at_floor.submit(floor))

    wait(floors)

Default task runner

If you don’t specify a task runner for a flow and you call a task with .submit() within the flow, Prefect uses the default ThreadPoolTaskRunner.

The max_workers parameter of the ThreadPoolTaskRunner controls the number of threads that the task runner will use to execute tasks concurrently.
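
As a rough illustration of what a worker cap means, the sketch below uses the standard library's ThreadPoolExecutor rather than Prefect, on the assumption that a thread-pool cap behaves similarly in both. The stats dictionary and the short sleep are hypothetical stand-ins. It records how many tasks run at once and shows that the peak never exceeds the configured limit, even with ten tasks queued.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 3
lock = threading.Lock()
# "active" counts tasks running right now; "peak" is the highest value seen.
stats = {"active": 0, "peak": 0}


def stop_at_floor(floor: int) -> int:
    with lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.05)  # stand-in for real work
    with lock:
        stats["active"] -= 1
    return floor


with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    floors = list(pool.map(stop_at_floor, range(10, 0, -1)))

print(f"peak concurrency: {stats['peak']} (limit {MAX_WORKERS})")
```

Raising the limit lets more tasks overlap at the cost of more threads; lowering it to 1 makes the tasks run one after another.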

Access results from submitted tasks

When you use .submit() to submit a task to a task runner, the task runner creates a PrefectFuture for access to the state and result of the task.

A PrefectFuture is an object that provides access to a computation happening in a task runner, even if that computation happens on a remote system.

When you pass a future into a task, Prefect automatically waits for the “upstream” task (the one that the future references) to reach a final state before starting the downstream task.

This means that the downstream task won’t receive the PrefectFuture you passed as an argument. Instead, the downstream task receives the value that the upstream task returned.

For example:

from prefect import flow, task

@task
def say_hello(name):
    return f"Hello {name}!"

@task
def print_result(result):
    print(type(result))
    print(result)

@flow(name="hello-flow")
def hello_world():
    future = say_hello.submit("Marvin")
    print_result.submit(future).wait()

hello_world()

Notice that we only had to wait for the final print_result future as Prefect automatically resolved say_hello to a string.

You can access the result of a future explicitly with the .result() method.

from prefect import flow, task

@task
def my_task():
    return 42

@flow
def my_flow():
    future = my_task.submit()
    result = future.result()
    print(result)

my_flow()

The .result() method waits for the task to complete before returning the result to the caller. If the task run fails, .result() will raise the task run’s exception. Disable this behavior with the raise_on_failure option:

from prefect import flow, task

@task
def my_task():
    return "I'm a task!"


@flow
def my_flow():
    future = my_task.submit()
    result = future.result(raise_on_failure=False)
    if future.state.is_failed():
        # `result` is an exception! handle accordingly
        ...
    else:
        # `result` is the expected return value of our task
        ...
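
A similar raise-or-inspect choice exists in the standard library's concurrent.futures, which can serve as a loose mental model. This sketch does not use Prefect, and failing_task is a hypothetical function: result() re-raises the exception, while exception() returns it as a value for inspection.

```python
from concurrent.futures import ThreadPoolExecutor


def failing_task() -> int:
    raise ValueError("task failed")


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(failing_task)

    # Comparable to the default behavior: result() re-raises the error.
    try:
        future.result()
        raised = False
    except ValueError:
        raised = True

    # Comparable to raise_on_failure=False: get the exception as a value.
    error = future.exception()

print(raised, repr(error))  # True ValueError('task failed')
```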

You can also pass the wait_for=[] parameter when calling a task to specify its upstream task dependencies. This lets you control the execution order of tasks that do not share data dependencies.

from prefect import flow, task

@task
def task_a():
    pass

@task
def task_b():
    pass

@task
def task_c():
    pass
    
@task
def task_d():
    pass

@flow
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    # Wait for task_a and task_b to complete
    c = task_c.submit(wait_for=[a, b])
    # task_d will wait for task_c to complete
    # Note: If waiting for one task it must still be in a list.
    d = task_d(wait_for=[c])
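
The ordering guarantee itself can be sketched with the standard library alone. The example below is an analogy, not Prefect code; run and order are hypothetical names. It forces c to start only after a and b have finished, even though c consumes nothing they produce.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

order = []  # task names, in the order the tasks ran
lock = threading.Lock()


def run(name: str) -> None:
    with lock:
        order.append(name)


with ThreadPoolExecutor(max_workers=3) as pool:
    a = pool.submit(run, "a")
    b = pool.submit(run, "b")
    # Block until a and b finish, even though c takes no data from them.
    wait([a, b])
    c = pool.submit(run, "c")
    wait([c])

print(order)  # "a" and "b" in either order, then "c"
```

The sketch blocks the caller in wait(); it models only the ordering guarantee, not how Prefect schedules the dependency.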

A few notes on .result()

  • .result() is a blocking call. This means that calling .result() will wait until the task run completes before continuing execution.
  • Only use .result() when you need to interact directly with the return value of your submitted task; for example, you should use .result() if passing the return value to a standard Python function (not a Prefect task) but not if you are passing the value to another task.

Mapping over iterables

Prefect also provides a .map() method that automatically submits a new task run for each element of its input data. This can be useful when submitting a lot of work to a task runner simultaneously.

from prefect import flow, task

@task
def print_nums(nums):
    for n in nums:
        print(n)

@task
def square_num(num):
    return num**2

@flow
def map_flow(nums):
    print_nums(nums)
    squared_nums = square_num.map(nums)
    print_nums(squared_nums)

map_flow([1,2,3,5,8,13])

Prefect also supports unmapped arguments, allowing you to pass static values that don’t get mapped over.

from prefect import flow, task

@task
def add_together(x, y):
    return x + y

@flow
def sum_it(numbers, static_value):
    futures = add_together.map(numbers, static_value)
    return futures.result()

resulting_sum = sum_it([1, 2, 3], 5)
assert resulting_sum == [6, 7, 8]

Get the results from a list of mapped futures

As in the examples above, calling .map() returns a list of futures. You can wait for these futures or retrieve their results with the list's .wait() or .result() methods:

futures = some_task.map(some_iterable)
results = futures.result()

which is shorthand for:

futures = some_task.map(some_iterable)
results = [future.result() for future in futures]

If your static argument is an iterable, wrap it with unmapped to tell Prefect to treat it as a static value.

from prefect import flow, task, unmapped

@task
def sum_plus(x, static_iterable):
    return x + sum(static_iterable)

@flow
def sum_it(numbers, static_iterable):
    futures = sum_plus.map(numbers, unmapped(static_iterable))
    return futures.result()

resulting_sum = sum_it([4, 5, 6], [1, 2, 3])
assert resulting_sum == [10, 11, 12]
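
The mapping rules described above can be modeled in a few lines of plain Python. This is a sketch of the semantics only, not Prefect's implementation: Static is a hypothetical stand-in for the unmapped wrapper, and map_calls and pick are illustrative helpers that run calls eagerly instead of submitting task runs. The sketch also assumes mapped arguments are lists or tuples of equal length.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Static:
    """Hypothetical stand-in for a wrapper that marks a value as static."""
    value: Any


def pick(arg: Any, i: int) -> Any:
    if isinstance(arg, Static):
        return arg.value  # wrapped: passed whole to every call
    if isinstance(arg, (list, tuple)):
        return arg[i]  # iterable: one element per call
    return arg  # scalar: repeated for every call


def map_calls(fn: Callable[..., Any], *args: Any) -> list:
    # Number of calls = length of the first mapped (list or tuple) argument.
    n = next(len(a) for a in args if isinstance(a, (list, tuple)))
    return [fn(*(pick(a, i) for a in args)) for i in range(n)]


added = map_calls(lambda x, y: x + y, [1, 2, 3], 5)
summed = map_calls(lambda x, s: x + sum(s), [4, 5, 6], Static([1, 2, 3]))

print(added)   # [6, 7, 8]
print(summed)  # [10, 11, 12]
```

Without the wrapper, the second call would pair each number with a single element of the static list rather than with the whole list, which is the situation the wrapper exists to prevent.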

Use multiple task runners

Each flow can only have one task runner, but sometimes you may want a subset of your tasks to run using a different task runner than the one configured on the flow. In this case, you can create nested flows for tasks that need to use a different task runner.

For example, you can have a flow (called parent_flow in the example below) that calls a nested flow, concurrent_nested_flow, which runs its tasks locally using the default ThreadPoolTaskRunner. If you have some tasks that can run more efficiently in parallel on a Dask cluster, you can create another nested flow (such as dask_nested_flow) to run those tasks using the DaskTaskRunner.

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect_dask.task_runners import DaskTaskRunner
import time


@task
def hello_local(name: str):
    time.sleep(2)
    print(f"Hello {name}!")


@flow # implicitly uses the default task runner, ThreadPoolTaskRunner
def concurrent_nested_flow():
    marvin = hello_local.submit("marvin")
    ford = hello_local.submit("ford")
    marvin.wait()
    ford.wait()


@task
def hello_dask():
    print("Hello from Dask!")


@flow(task_runner=DaskTaskRunner())
def dask_nested_flow():
    hello_dask.submit().wait()


@flow # implicitly uses the default task runner, ThreadPoolTaskRunner
def parent_flow():
    concurrent_nested_flow()
    dask_nested_flow()


if __name__ == "__main__":
    parent_flow()