# Mapping

Prefect introduces a flexible map/reduce model for dynamically executing tasks across an iterable input. This, in turn, gives you the ability to execute mapped tasks in a distributed or parallel manner using an executor like the DaskExecutor.

Classic "map/reduce" is a powerful two-stage programming model that can be used to distribute and parallelize work (the "map" phase) before collecting and processing all the results (the "reduce" phase).

A typical map/reduce setup requires three things:

  • An iterable input
  • A "map" function that operates on a single item at a time
  • A "reduce" function that operates on a group of items at once

For example, we could use map/reduce to take a list of numbers, increment them all by one, and sum the result:

numbers = [1, 2, 3]
map_fn = lambda x: x + 1
reduce_fn = lambda x: sum(x)

mapped_result = [map_fn(n) for n in numbers]
reduced_result = reduce_fn(mapped_result)
assert reduced_result == 9

# Prefect approach

Prefect's version of map/reduce is far more flexible than the classic implementation.

When a task is mapped, Prefect automatically creates a copy of the task for each element of its input data. The copy -- referred to as a "child" task -- is applied only to that element. This means that a mapped task actually represents the computations of many individual child tasks.

If a "normal" (non-mapped) task depends on a mapped task, Prefect automatically applies a reduce operation to gather the mapped results and pass them to the downstream task.

However, if a mapped task relies on another mapped task, Prefect does not reduce the upstream result. Instead, it connects the nth upstream child to the nth downstream child, creating independent parallel pipelines.

Here's how the previous example would look as a Prefect flow:

from prefect import Flow, task

numbers = [1, 2, 3]
map_fn = task(lambda x: x + 1)
reduce_fn = task(lambda x: sum(x))

with Flow('Map Reduce') as flow:
    mapped_result = map_fn.map(numbers)
    reduced_result = reduce_fn(mapped_result)

state = flow.run()
assert state.result[reduced_result].result == 9

Dynamically-generated child tasks are first-class tasks

Even though the user didn't create them explicitly, the child tasks of a mapped task are first-class Prefect tasks. They can do anything a "normal" task can do, including succeed, fail, retry, pause, or skip.
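
To make this concrete, here is a minimal sketch using a hypothetical reciprocal task in which one child fails while its siblings succeed. It assumes the parent task finishes in a Mapped state whose map_states attribute holds the children's individual states:

from prefect import Flow, task

@task
def reciprocal(x):
    return 1 / x  # the child that receives 0 raises ZeroDivisionError; its siblings succeed

with Flow('first-class children') as flow:
    results = reciprocal.map([0, 1, 2])

state = flow.run()
# Each child reports its own state; the parent task ends in a Mapped state.
child_states = state.result[results].map_states
print([type(s).__name__ for s in child_states])  # e.g. ['Failed', 'Success', 'Success']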

# Simple mapping

The simplest Prefect map takes a task and applies it to each element of its inputs.

For example, if we define a task for adding 10 to a number, we can trivially apply that task to each element of a list:

from prefect import Flow, task

@task
def add_ten(x):
    return x + 10

with Flow('simple map') as flow:
    mapped_result = add_ten.map([1, 2, 3])

The result of the mapped_result task will be [11, 12, 13] when the flow is run.
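
Following the same pattern as the earlier map/reduce flow, the result could be checked like this (a minimal sketch):

state = flow.run()
assert state.result[mapped_result].result == [11, 12, 13]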

Child task execution

The actual execution of the child tasks, which are applied to each element of the list, can be concurrent or parallel, depending on the executor the flow is configured to run with.
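
For instance, the same flow could be run with a Dask-based executor so the children execute in parallel. This sketch assumes a recent Prefect 1.x release where executors are importable from prefect.executors (older releases used prefect.engine.executors):

from prefect.executors import LocalDaskExecutor

# Children of add_ten.map(...) are submitted to the Dask scheduler and may run in parallel.
flow.run(executor=LocalDaskExecutor())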

# Iterated mapping

Since mapped_result is nothing more than a task with an iterable result, we can immediately use it as the input for another round of mapping:

from prefect import Flow, task

@task
def add_ten(x):
    return x + 10

with Flow('iterated map') as flow:
    mapped_result = add_ten.map([1, 2, 3])
    mapped_result_2 = add_ten.map(mapped_result)

When this flow runs, the result of the mapped_result_2 task will be [21, 22, 23], which is the result of applying the mapped function twice.

No reduce required

Even though we observed that the result of mapped_result was a list, Prefect won't apply a reduce step to gather that list unless the user requires it. In this example, we never needed the entire list (we only needed each of its elements), so no reduce took place. The two mapped tasks generated three completely independent pipelines, each one containing two tasks.

# Flat-mapping

In general, each layer of an iterated map has the same number of children: if you map over a list of N items, you produce N results. Sometimes, it's useful to produce a sequence of results for each mapped input. For example, you might map over a list of directories to load all the files in each directory, then want to map over each file. Prefect provides a flatten annotation to make this possible. When an input to a map is wrapped with flatten, that input is assumed to be a list-of-lists and is "un-nested" into a single flat list before the map is applied.

Using flatten() is more efficient than adding an explicit reduce step to an iterated map, because Prefect computes the flatmap without gathering all of the data onto a single worker.

from prefect import Flow, task, flatten

@task
def A():
    return [1, 2, 3]

@task
def B(x):
    return list(range(x))

@task
def C(y):
    return y + 100

with Flow('flat map') as f:
    a = A() # [1, 2, 3]
    b = B.map(x=a) # [[0], [0, 1], [0, 1, 2]]
    c = C.map(y=flatten(b)) # [100, 100, 101, 100, 101, 102]

TIP

flatten() can be used on any task input, even if it isn't being mapped over.
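
For example, building on the flow above, a flattened list could be passed to an ordinary (non-mapped) task such as this hypothetical sum_all:

@task
def sum_all(values):
    return sum(values)

with Flow('flatten without map') as f2:
    a = A()                      # [1, 2, 3]
    b = B.map(x=a)               # [[0], [0, 1], [0, 1, 2]]
    total = sum_all(flatten(b))  # receives the single flattened list; returns 4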

# Reduce

Prefect automatically gathers mapped results into a list if they are needed by a non-mapped task. Therefore, all users need to do to "reduce" a mapped result is supply it to a task!

from prefect import Flow, task

@task
def add_ten(x):
    return x + 10

@task
def sum_numbers(y):
    return sum(y)

with Flow('reduce') as flow:
    mapped_result = add_ten.map([1, 2, 3])
    mapped_result_2 = add_ten.map(mapped_result)
    reduced_result = sum_numbers(mapped_result_2)

In this example, sum_numbers receives the automatically-reduced list of results from mapped_result_2 and appropriately computes the sum: 66.
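
As before, the reduced value could be confirmed by running the flow and inspecting the task's state (a minimal sketch):

state = flow.run()
assert state.result[reduced_result].result == 66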

# Filter map output

If the output of one mapped task is used as input to another mapped task, any failed or skipped child will cause the corresponding downstream child to fail or skip by default. However, sometimes we want to exclude the outputs of skipped or failed tasks, or outputs that are None. Prefect provides this functionality with FilterTask().

from prefect import task, Flow, context
from prefect.tasks.control_flow.filter import FilterTask
from prefect.engine.signals import SKIP

filter_results = FilterTask(
    filter_func=lambda x: not isinstance(x, (BaseException, SKIP, type(None)))
)

@task
def unstable_task(arg):
    if arg == 1:
        raise RuntimeError("Fail this task execution")
    if arg == 2:
        raise SKIP("Skip this task execution")
    if arg == 3:
        return None
 
    return arg

@task
def add_one(arg):
    return arg + 1

@task
def log_args(args):
    logger = context.get("logger")
    logger.info(args)

with Flow('filter') as flow:
    raw_out = unstable_task.map([0, 1, 2, 3, 4])
    # raw_out is [0, RuntimeError, SKIP, None, 4] at this point

    filtered = filter_results(raw_out)
    inc_out = add_one.map(filtered)

    log_args(inc_out)
    # [1, 5]

In the example above, raw_out will contain [0, RuntimeError, SKIP, None, 4]. Without filtering, the RuntimeError would mark downstream tasks as TriggerFailed, the SKIP would cause downstream tasks to finish in a Skipped state, and the None could make downstream tasks fail due to invalid input. Our filter_results task filters out everything for which filter_func returns False, so the flow logs the list [1, 5] at the end.
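
Because filter_func is just a predicate, a FilterTask can encode any filtering rule. For instance, a hypothetical filter that keeps only even integer results might look like:

# Could be used in place of filter_results above to keep only even numbers.
keep_even = FilterTask(filter_func=lambda x: isinstance(x, int) and x % 2 == 0)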

# Unmapped inputs

When a task is mapped over its inputs, it retains the same call signature and arguments, but iterates over the inputs to generate its children tasks. Sometimes, we don't want to iterate over one of the inputs -- perhaps it's a constant value, or a list that's required in its entirety. Prefect supplies a convenient unmapped() annotation for this case.

from prefect import Flow, task, unmapped

@task
def add(x, y):
    return x + y

with Flow('unmapped inputs') as flow:
    result = add.map(x=[1, 2, 3], y=unmapped(10))

This map will iterate over the x inputs but not over the y input. The result will be [11, 12, 13].

The unmapped annotation can be applied to any number of input arguments. This means that a mapped task can depend on both mapped and reduced upstream tasks seamlessly.
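
For example, a mapped task might combine one mapped upstream with a reduced upstream wrapped in unmapped; the task names here (total, add) are illustrative:

from prefect import Flow, task, unmapped

@task
def add_ten(x):
    return x + 10

@task
def total(values):
    return sum(values)  # reduces the mapped results to a single number

@task
def add(x, y):
    return x + y

with Flow('mapped and reduced upstreams') as flow:
    nums = add_ten.map([1, 2, 3])                       # [11, 12, 13]
    grand_total = total(nums)                           # automatic reduce: 36
    # each child receives one element of nums plus the single reduced value
    result = add.map(x=nums, y=unmapped(grand_total))   # [47, 48, 49]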

TIP

Prefect also provides a mapped() annotation that can be used to indicate that an input should be mapped over when binding inputs without calling .map().
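
A minimal sketch of this pattern, assuming mapped is importable from the top-level prefect package in 1.x:

from prefect import Flow, task, mapped

@task
def add(x, y):
    return x + y

with Flow('mapped annotation') as flow:
    # equivalent to add.map(x=[1, 2, 3], y=unmapped(10)), written as an ordinary call
    result = add(x=mapped([1, 2, 3]), y=10)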

# Complex mapped pipelines

Sometimes you want to encode a more complex structure in your mapped pipelines -- for example, adding conditional tasks using prefect.case. This can be done using prefect.apply_map. This takes a function that adds multiple scalar tasks to a flow, and converts those tasks to run as parallel mapped pipelines.

For example, here we create a function that encodes in Prefect tasks the following logic:

  • If x is even, increment it
  • If x is odd, negate it

Note that inc_or_negate is not a task itself - it's a function that creates several tasks. Just as we can map single tasks like inc using inc.map, we can map functions that create multiple tasks using apply_map.

from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge

@task
def inc(x):
    return x + 1

@task
def negate(x):
    return -x

@task
def is_even(x):
    return x % 2 == 0

def inc_or_negate(x):
    cond = is_even(x)
    # If x is even, increment it
    with case(cond, True):
        res1 = inc(x)
    # If x is odd, negate it
    with case(cond, False):
        res2 = negate(x)
    return merge(res1, res2)

with Flow("apply-map example") as flow:
    result = apply_map(inc_or_negate, range(4))

Running the above flow, we get four parallel, conditional mapped pipelines. The computed value of result is [1, -1, 3, -3].

Just as with task.map, arguments to apply_map can be wrapped with unmapped, allowing certain arguments to avoid being mapped. While not always necessary, apply_map can be quite useful when you want to create complex mapped pipelines, especially when using conditional logic within them.
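
As a sketch, a keyword argument that should be shared by every pipeline can be wrapped in unmapped; the helper add_then_double below is illustrative:

from prefect import Flow, task, apply_map, unmapped

@task
def add(x, y):
    return x + y

@task
def double(x):
    return 2 * x

def add_then_double(x, y):
    # a plain function that wires several tasks together, like inc_or_negate above
    return double(add(x, y))

with Flow('apply-map with unmapped') as flow:
    # y is the same constant for every pipeline, so it is wrapped in unmapped
    result = apply_map(add_then_double, range(3), y=unmapped(10))
    # computed value: [20, 22, 24]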

# State behavior with mapped tasks

Whenever a mapped task is reduced by a downstream task, Prefect treats its children as the inputs to that task. This means, among other things, that trigger functions will be applied to all of the mapped children, not the mapped parent.

If a reducing task has an all_successful trigger, but one of the mapped children failed, then the reducing task's trigger will fail. This is the same behavior as if the mapped children had been created manually and passed to the reducing task. Similar behavior will take place for skips.
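
A minimal sketch of this behavior, using a hypothetical flaky task so that exactly one child fails:

from prefect import Flow, task

@task
def maybe_fail(x):
    if x == 2:
        raise ValueError("boom")  # one child fails, its siblings succeed
    return x

@task
def collect(values):
    return values

with Flow('triggers and mapped children') as flow:
    children = maybe_fail.map([1, 2, 3])
    reduced = collect(children)

state = flow.run()
# collect's default all_successful trigger sees the failed child,
# so the reducing task is expected to end in a TriggerFailed state.
print(state.result[reduced])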