Caching refers to the ability of a task run to enter a Completed state and return a predetermined value without actually running the code that defines the task. Caching allows you to efficiently reuse results of tasks that may be expensive to compute and ensure that your pipelines are idempotent when retrying them due to unexpected failure.

By default Prefect’s caching logic is based on the following attributes of a task invocation:

  • the inputs provided to the task
  • the code definition of the task
  • the prevailing flow run ID, or if executed autonomously, the prevailing task run ID

These values are hashed to compute the task’s cache key. This implies that, by default, calling the same task with the same inputs more than once within a flow will result in cached behavior for all calls after the first. This behavior can be configured - see customizing the cache below.

Caching requires result persistence

Caching requires result persistence, which is off by default. To turn on result persistence for all of your tasks use the PREFECT_RESULTS_PERSIST_BY_DEFAULT setting:

prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true

See managing results for more details on managing your result configuration, and settings for more details on managing Prefect settings.

Cache keys

To determine whether a task run should retrieve a cached state, Prefect uses the concept of a “cache key”. A cache key is a computed string value that determines where the task’s return value will be persisted within its configured result storage. When a task run begins, Prefect first computes its cache key and uses this key to lookup a record in the task’s result storage. If an unexpired record is found, this result is returned and the task does not run, but instead, enters a Cached state with the corresponding result value.

Cache keys can be shared by the same task across different flows, and even among different tasks, so long as they all share a common result storage location.

By default Prefect stores results locally in ~/.prefect/storage/. The filenames in this directory will correspond exactly to computed cache keys from your task runs.

Relationship with result persistence

Task caching and result persistence are intimately related. Because task caching relies on loading a known result, task caching will only work when your task can persist its output to a fixed and known location.

Therefore any configuration which explicitly avoids result persistence will result in your task never using a cache, for example setting persist_result=False.

Cache policies

Cache key computation can be configured through the use of cache policies. A cache policy is a recipe for computing cache keys for a given task.

Prefect comes prepackaged with a few common cache policies:

  • DEFAULT: this cache policy uses the task’s inputs, its code definition, as well as the prevailing flow run ID to compute the task’s cache key.
  • INPUTS: this cache policy uses only the task’s inputs to compute the cache key.
  • TASK_SOURCE: this cache policy uses only the task’s code definition to compute the cache key.
  • FLOW_PARAMETERS: this cache policy uses only the parameter values provided to the parent flow run to compute the cache key.
  • NONE: this cache policy always returns None and therefore avoids caching and result persistence altogether.

These policies can be set using the cache_policy keyword on the task decorator:

from prefect import task
from prefect.cache_policies import TASK_SOURCE

import time


@task(cache_policy=TASK_SOURCE)
def my_stateful_task():
    print('sleeping')
    time.sleep(10)
    return 42

my_stateful_task() # sleeps
my_stateful_task() # does not sleep

No matter how many flows call it, this task will run once and only once until its underlying code is altered:

@task(cache_policy=TASK_SOURCE)
def my_stateful_task():
    print('sleeping')
    time.sleep(10)

    # change the return value, for example
    return 43 

my_stateful_task() # sleeps again

Customizing the cache

Prefect allows you to configure task caching behavior in numerous ways.

Cache expiration

All cache keys can optionally be given an expiration through the cache_expiration keyword on the task decorator. This keyword accepts a datetime.timedelta specifying a duration for which the cached value should be considered valid.

Providing an expiration value results in Prefect persisting an expiration timestamp alongside the result record for the task. This expiration is then applied to all other tasks that may share this cache key.

Cache policies

Cache policies can be composed and altered using basic Python syntax to form more complex policies. For example, all task policies except for NONE can be added together to form new policies that combine the individual policies’ logic into a larger cache key computation. Combining policies in this way results in caches that are easier to invalidate.

For example:

from prefect import task
from prefect.cache_policies import TASK_SOURCE, INPUTS
@task(cache_policy=TASK_SOURCE + INPUTS)
def my_cached_task(x: int):
    return x + 42

This task will rerun anytime you provide new values for x, or anytime you change the underlying code.

The INPUTS policy is a special policy that allows you to subtract string values to ignore certain task inputs:

from prefect import task
from prefect.cache_policies import INPUTS


my_custom_policy = INPUTS - 'debug'

@task(cache_policy=my_custom_policy)
def my_cached_task(x: int, debug: bool = False):
    print('running...')
    return x + 42


my_cached_task(1)
my_cached_task(1, debug=True) # still uses the cache

Cache key functions

You can configure custom cache policy logic through the use of cache key functions. A cache key function is a function that accepts two positional arguments:

  • The first argument corresponds to the TaskRunContext, which stores task run metadata. For example, this object has attributes task_run_id, flow_run_id, and task, all of which can be used in your custom logic.
  • The second argument corresponds to a dictionary of input values to the task. For example, if your task has the signature fn(x, y, z) then the dictionary will have keys “x”, “y”, and “z” with corresponding values that can be used to compute your cache key.

This function can then be specified using the cache_key_fn argument on the task decorator.

For example:

def static_cache_key(context, parameters):
    # return a constant
    return "static cache key"


@task(cache_key_fn=static_cache_key)
def my_cached_task(x: int):
    return x + 1

Cache storage

By default, cache records are collocated with task results and files containing task results will include metadata used for caching. Configuring a cache policy with a key_storage argument allows cache records to be stored separately from task results.

When cache key storage is configured, persisted task results will only include the return value of your task and cache records can be deleted or modified without effecting your task results.

You can configure where cache records are stored by using the .configure method with a key_storage argument on a cache policy. The key_storage argument accepts either a path to a local directory or a storage block.

For example:

from prefect import task
from prefect.cache_policies import TASK_SOURCE, INPUTS

cache_policy = (TASK_SOURCE + INPUTS).configure(key_storage="/path/to/cache/storage")

@task(cache_policy=cache_policy)
def my_cached_task(x: int):
    return x + 42

This task will store cache records in the specified directory.

To store cache records in a remote object store such as S3, pass a storage block instead:

from prefect import task
from prefect.cache_policies import TASK_SOURCE, INPUTS

from prefect_aws import S3Bucket

cache_policy = (TASK_SOURCE + INPUTS).configure(key_storage=S3Bucket.load("my-bucket"))

@task(cache_policy=cache_policy)
def my_cached_task(x: int):
    return x + 42

Cache isolation

Cache isolation controls how concurrent task runs interact with cache records. Prefect supports two isolation levels: READ_COMMITTED and SERIALIZABLE.

By default, cache records operate with a READ_COMMITTED isolation level. This guarantees that reading a cache record will see the latest committed cache value, but allows multiple executions of the same task to occur simultaneously.

Consider the following example:

from prefect import task
from prefect.cache_policies import INPUTS
import threading


cache_policy = INPUTS

@task(cache_policy=cache_policy)
def my_task_version_1(x: int):
    print("my_task_version_1 running")
    return x + 42

@task(cache_policy=cache_policy)
def my_task_version_2(x: int):
    print("my_task_version_2 running")
    return x + 43

if __name__ == "__main__":
    thread_1 = threading.Thread(target=my_task_version_1, args=(1,))
    thread_2 = threading.Thread(target=my_task_version_2, args=(1,))

    thread_1.start()
    thread_2.start()

    thread_1.join()
    thread_2.join()

When running this script, both tasks will execute in parallel and perform work despite both tasks using the same cache key.

This is evidenced by seeing both my_task_version_1 running and my_task_version_2 running in the output:

11:27:21.031 | INFO    | Task run 'my_task_version_2' - Created task run 'my_task_version_2' for task 'my_task_version_2'
11:27:21.032 | INFO    | Task run 'my_task_version_1' - Created task run 'my_task_version_1' for task 'my_task_version_1'
my_task_version_2 running
my_task_version_1 running
11:27:21.050 | INFO    | Task run 'my_task_version_2' - Finished in state Completed()
11:27:21.051 | INFO    | Task run 'my_task_version_1' - Finished in state Completed()

For stricter isolation, you can use the SERIALIZABLE isolation level. This ensures that only one execution of a task occurs at a time for a given cache record via a locking mechanism.

To configure the isolation level, use the .configure method with an isolation_level argument on a cache policy. When using SERIALIZABLE, you must also provide a lock_manager that implements locking logic for your system.

Here’s an updated version of the previous example that uses SERIALIZABLE isolation:

import threading

from prefect import task
from prefect.cache_policies import INPUTS
from prefect.locking.memory import MemoryLockManager
from prefect.transactions import IsolationLevel

cache_policy = INPUTS.configure(
    isolation_level=IsolationLevel.SERIALIZABLE,
    lock_manager=MemoryLockManager(),
)


@task(cache_policy=cache_policy)
def my_task_version_1(x: int):
    print("my_task_version_1 running")
    return x + 42


@task(cache_policy=cache_policy)
def my_task_version_2(x: int):
    print("my_task_version_2 running")
    return x + 43


if __name__ == "__main__":
    thread_1 = threading.Thread(target=my_task_version_1, args=(2,))
    thread_2 = threading.Thread(target=my_task_version_2, args=(2,))

    thread_1.start()
    thread_2.start()

    thread_1.join()
    thread_2.join()

In the updated script, only one of the tasks will run and the other will use the cached value.

This is evidenced by seeing only one of my_task_version_1 running or my_task_version_2 running in the output:

11:34:00.383 | INFO    | Task run 'my_task_version_1' - Created task run 'my_task_version_1' for task 'my_task_version_1'
11:34:00.383 | INFO    | Task run 'my_task_version_2' - Created task run 'my_task_version_2' for task 'my_task_version_2'
my_task_version_1 running
11:34:00.402 | INFO    | Task run 'my_task_version_1' - Finished in state Completed()
11:34:00.405 | INFO    | Task run 'my_task_version_2' - Finished in state Cached(type=COMPLETED)

Locking in a distributed setting

To manage locks in a distributed setting, you will need to use a storage system for locks that is accessible by all of your execution infrastructure.

We recommend using the RedisLockManager provided by prefect-redis in conjunction with a shared Redis instance:

from prefect import task
from prefect.cache_policies import TASK_SOURCE, INPUTS
from prefect.transactions import IsolationLevel

from prefect_redis import RedisLockManager

cache_policy = (INPUTS + TASK_SOURCE).configure(
    isolation_level=IsolationLevel.SERIALIZABLE,
    lock_manager=RedisLockManager(host="my-redis-host"),
)

@task(cache_policy=cache_policy)
def my_cached_task(x: int):
    return x + 42

Handling Non-Serializable Objects

You may have task inputs that can’t (or shouldn’t) be serialized as part of the cache key. There are two direct approaches to handle this, both of which based on the same idea.

You can adjust the serialization logic to only serialize certain properties of an input:

  1. Using a custom cache key function:
from prefect import flow, task
from prefect.cache_policies import CacheKeyFnPolicy, RUN_ID
from prefect.context import TaskRunContext
from pydantic import BaseModel, ConfigDict

class NotSerializable:
    def __getstate__(self):
        raise TypeError("NooOoOOo! I will not be serialized!")

class ContainsNonSerializableObject(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str
    bad_object: NotSerializable

def custom_cache_key_fn(context: TaskRunContext, parameters: dict) -> str:
    return parameters["some_object"].name

@task(cache_policy=CacheKeyFnPolicy(cache_key_fn=custom_cache_key_fn) + RUN_ID)
def use_object(some_object: ContainsNonSerializableObject) -> str:
    return f"Used {some_object.name}"


@flow
def demo_flow():
    obj = ContainsNonSerializableObject(name="test", bad_object=NotSerializable())
    state = use_object(obj, return_state=True) # Not cached!
    assert state.name == "Completed"
    other_state = use_object(obj, return_state=True) # Cached!
    assert other_state.name == "Cached"
    assert state.result() == other_state.result()
  1. Using Pydantic’s custom serialization on your input types:
from pydantic import BaseModel, ConfigDict, model_serializer
from prefect import flow, task
from prefect.cache_policies import INPUTS, RUN_ID

class NotSerializable:
    def __getstate__(self):
        raise TypeError("NooOoOOo! I will not be serialized!")

class ContainsNonSerializableObject(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str
    bad_object: NotSerializable

    @model_serializer
    def ser_model(self) -> dict:
        """Only serialize the name, not the problematic object"""
        return {"name": self.name}

@task(cache_policy=INPUTS + RUN_ID)
def use_object(some_object: ContainsNonSerializableObject) -> str:
    return f"Used {some_object.name}"

@flow
def demo_flow():
    some_object = ContainsNonSerializableObject(
        name="test",
        bad_object=NotSerializable()
    )
    state = use_object(some_object, return_state=True) # Not cached!
    assert state.name == "Completed"
    other_state = use_object(some_object, return_state=True) # Cached!
    assert other_state.name == "Cached"
    assert state.result() == other_state.result()

Choose the approach that best fits your needs:

  • Use Pydantic models when you want consistent serialization across your application
  • Use custom cache key functions when you need different caching logic for different tasks

Multi-task caching

There are many situations in which multiple tasks need to always run together or not at all. This can be achieved in Prefect by configuring these tasks to always write to their caches within a single transaction.

from prefect import task, flow
from prefect.transactions import transaction


@task(cache_key_fn=lambda *args, **kwargs: "static-key-1")
def load_data():
    return "some-data"


@task(cache_key_fn=lambda *args, **kwargs: "static-key-2")
def process_data(data, fail):
    if fail:
        raise RuntimeError("Error! Abort!")

    return len(data)


@flow
def multi_task_cache(fail: bool = True):
    with transaction():
        data = load_data()
        process_data(data=data, fail=fail)

When this flow is run with default parameter values it will fail on the process_data task. The load_data task will succeed. However, because caches are only written to when a transaction is committed, the load_data task will not write a result to its cache key location until the process_data task succeeds as well.

This ensures that anytime you need to rerun this flow both load_data and process_data are executed together. After a successful execution both tasks will be cached until the cache key is updated. Read more about transactions.

Caching example

In this example, until the cache_expiration time is reached, as long as the input to hello_task() remains the same when it is called, the cached return value will be returned. The task is not rerun. However, if the input argument value changes, hello_task() runs using the new input.

from datetime import timedelta
from prefect import flow, task
from prefect.cache_policies import INPUTS
@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def hello_task(name_input):
    # Doing some work
    print("Saying hello")
    return "hello " + name_input

@flow(log_prints=True)
def hello_flow(name_input):
    hello_task(name_input)
    hello_task(name_input) # does not rerun

A more realistic example might include the flow run id in the cache key, so only repeated calls in the same flow run are cached:

from prefect.cache_policies import INPUTS, RUN_ID


@task(cache_policy=INPUTS + RUN_ID, cache_expiration=timedelta(days=1))
def hello_task(name_input):
    # Doing some work
    print("Saying hello")
    return "hello " + name_input


@flow(log_prints=True)
def hello_flow(name_input):
    # reruns each time the flow is run
    hello_task(name_input) 

    # but the same call within the same flow run is Cached
    hello_task(name_input) 

Force ignore the cache

A cache “refresh” instructs Prefect to ignore the data associated with a task’s cache key and rerun no matter what.

The refresh_cache option enables this behavior for a specific task:

import random


def static_cache_key(context, parameters):
    # return a constant
    return "static cache key"


@task(cache_key_fn=static_cache_key, refresh_cache=True)
def caching_task():
    return random.random()

When this task runs, it always updates the cache key instead of using the cached value. This is particularly useful when you have a flow that is responsible for updating the cache.

To refresh the cache for all tasks, use the PREFECT_TASKS_REFRESH_CACHE setting. Setting PREFECT_TASKS_REFRESH_CACHE=true changes the default behavior of all tasks to refresh. This is particularly useful to rerun a flow without cached results. See settings for more details on managing Prefect settings.

If you have tasks that should not refresh when this setting is enabled, you may explicitly set refresh_cache to False. These tasks will never refresh the cache. If a cache key exists it will be read, not updated. If a cache key does not exist yet, these tasks can still write to the cache.

@task(cache_key_fn=static_cache_key, refresh_cache=False)
def caching_task():
    return random.random()