The simplest way to cache the results of tasks within in a flow is to set persist_result=True on a task definition.

from prefect import task, flow

@task(persist_result=True)
def add_one(x: int):
    return x + 1

@flow
def my_flow():
    add_one(1) # will not be cached
    add_one(1) # will be cached
    add_one(2) # will not be cached

if __name__ == "__main__":
    my_flow()

This will implicitly use the DEFAULT cache policy, which is a composite cache policy defined as:

DEFAULT = INPUTS + TASK_SOURCE + RUN_ID

This means subsequent calls of a task with identical inputs from within the same parent run will return cached results without executing the body of the function.

The TASK_SOURCE component of the DEFAULT cache policy helps avoid naming collisions between similar tasks that should not share a cache.

Cache based on inputs

To cache the result of a task based only on task inputs, set cache_policy=INPUTS in the task decorator:

from prefect import task
from prefect.cache_policies import INPUTS

import time


@task(cache_policy=INPUTS)
def my_stateful_task(x: int):
    print('sleeping')
    time.sleep(10)
    return x + 1

my_stateful_task(x=1) # sleeps
my_stateful_task(x=1) # does not sleep
my_stateful_task(x=2) # sleeps

The above task will sleep the first time it is called with x=1, but will not sleep for any subsequent calls with the same input.

Prefect ships with several cache policies that can be used to customize caching behavior.

Cache based on a subset of inputs

To cache based on a subset of inputs, you can subtract kwargs from the INPUTS cache policy.

from prefect import task
from prefect.cache_policies import INPUTS

import time


@task(cache_policy=INPUTS - 'debug')
def my_stateful_task(x: int, debug: bool = False):
    print('sleeping')
    time.sleep(10)
    return x + 1

my_stateful_task(x=1) # sleeps
my_stateful_task(x=1, debug=True) # does not sleep
my_stateful_task(x=1, debug=False) # does not sleep

Cache with an expiration

To cache with an expiration, set the cache_expiration parameter on the task decorator.

from prefect import task
from prefect.cache_policies import INPUTS

import time
from datetime import timedelta


@task(cache_policy=INPUTS, cache_expiration=timedelta(seconds=10))
def my_stateful_task(x: int):
    print('sleeping')
    time.sleep(10)
    return x + 1

my_stateful_task(x=1) # sleeps
my_stateful_task(x=1) # does not sleep
# ... 10 seconds pass ...
my_stateful_task(x=1) # sleeps again

Ignore the cache

To ignore the cache regardless of the cache policy, set the refresh_cache parameter on the task decorator.

import random

from prefect import task
from prefect.cache_policies import TASK_SOURCE


@task(cache_policy=TASK_SOURCE, refresh_cache=True)
def never_caching_task():
    return random.random()

To refresh the cache for all tasks, use the PREFECT_TASKS_REFRESH_CACHE setting. Setting PREFECT_TASKS_REFRESH_CACHE=true changes the default behavior of all tasks to refresh.

If you have tasks that should not refresh when this setting is enabled, set refresh_cache to False. These tasks will never write to the cache. If a cache key exists it will be read, not updated. If a cache key does not exist yet, these tasks can still write to the cache.

import random

from prefect import task
from prefect.cache_policies import TASK_SOURCE


@task(cache_policy=TASK_SOURCE, refresh_cache=False)
def caching_task():
    return random.random()

Cache on multiple criteria

Cache policies can be combined using the + operator.

from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE

@task(cache_policy=INPUTS + TASK_SOURCE)
def my_task(x: int):
    return x + 1

The above task will use a cached result as long as the same inputs and task source are used.

Cache in a distributed environment

By default Prefect stores results locally in ~/.prefect/storage/. To share the cache across tasks running on different machines, provide a storage block to the result_storage parameter on the task decorator.

Here’s an example with of a task that uses an S3 bucket to store cache records:

from prefect import task
from prefect.cache_policies import INPUTS

from prefect_aws import AwsCredentials, S3Bucket

s3_bucket = S3Bucket(
    credentials=AwsCredentials(
        aws_access_key_id="my-access-key-id",
        aws_secret_access_key="my-secret-access-key",
    ),
    bucket_name="my-bucket",
)
# save the block to ensure it is available across machines
s3_bucket.save("my-cache-bucket")

@task(cache_policy=INPUTS, result_storage=s3_bucket)
def my_cached_task(x: int):
    return x + 42

When using a storage block from a Prefect integration package, the package the storage block is imported from must be installed in all environments where the task will run.

For example, the prefect_aws package must be installed to use the S3Bucket storage block.

Further reading

For more advanced caching examples, see the advanced caching how-to guide.

For more information on Prefect’s caching system, see the caching concepts page.