Configure task caching
Learn how to use caching to gain efficiency and pipeline idempotency.
Caching refers to the ability of a task run to enter a Completed
state and return a predetermined
value without actually running the code that defines the task.
Caching allows you to efficiently reuse results of tasks that may be expensive to compute
and ensure that your pipelines are idempotent when retrying them due to unexpected failure.
By default Prefect’s caching logic is based on the following attributes of a task invocation:
- the inputs provided to the task
- the code definition of the task
- the prevailing flow run ID, or if executed autonomously, the prevailing task run ID
These values are hashed to compute the task’s cache key. This implies that, by default, calling the same task with the same inputs more than once within a flow will result in cached behavior for all calls after the first. This behavior can be configured - see customizing the cache below.
Caching requires result persistence
Caching requires result persistence, which is off by default.
To turn on result persistence for all of your tasks use the PREFECT_RESULTS_PERSIST_BY_DEFAULT
setting:
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
See managing results for more details on managing your result configuration, and settings for more details on managing Prefect settings.
Cache keys
To determine whether a task run should retrieve a cached state, Prefect uses the concept of a “cache key”.
A cache key is a computed string value that determines where the task’s return value will be persisted within
its configured result storage.
When a task run begins, Prefect first computes its cache key and uses this key to lookup a record in the task’s result
storage.
If an unexpired record is found, this result is returned and the task does not run, but instead, enters a
Cached
state with the corresponding result value.
Cache keys can be shared by the same task across different flows, and even among different tasks, so long as they all share a common result storage location.
By default Prefect stores results locally in ~/.prefect/storage/
.
The filenames in this directory will correspond exactly to computed cache keys from your task runs.
Relationship with result persistence
Task caching and result persistence are intimately related. Because task caching relies on loading a known result, task caching will only work when your task can persist its output to a fixed and known location.
Therefore any configuration which explicitly avoids result persistence will result in your task never
using a cache, for example setting persist_result=False
.
Cache policies
Cache key computation can be configured through the use of cache policies. A cache policy is a recipe for computing cache keys for a given task.
Prefect comes prepackaged with a few common cache policies:
DEFAULT
: this cache policy uses the task’s inputs, its code definition, as well as the prevailing flow run ID to compute the task’s cache key.INPUTS
: this cache policy uses only the task’s inputs to compute the cache key.TASK_SOURCE
: this cache policy uses only the task’s code definition to compute the cache key.FLOW_PARAMETERS
: this cache policy uses only the parameter values provided to the parent flow run to compute the cache key.NONE
: this cache policy always returnsNone
and therefore avoids caching and result persistence altogether.
These policies can be set using the cache_policy
keyword on the task decorator:
from prefect import task
from prefect.cache_policies import TASK_SOURCE
import time
@task(cache_policy=TASK_SOURCE)
def my_stateful_task():
print('sleeping')
time.sleep(10)
return 42
my_stateful_task() # sleeps
my_stateful_task() # does not sleep
No matter how many flows call it, this task will run once and only once until its underlying code is altered:
@task(cache_policy=TASK_SOURCE)
def my_stateful_task():
print('sleeping')
time.sleep(10)
# change the return value, for example
return 43
my_stateful_task() # sleeps again
Customizing the cache
Prefect allows you to configure task caching behavior in numerous ways.
Cache expiration
All cache keys can optionally be given an expiration through the cache_expiration
keyword on
the task decorator.
This keyword accepts a datetime.timedelta
specifying a duration for which the cached value should be
considered valid.
Providing an expiration value results in Prefect persisting an expiration timestamp alongside the result record for the task. This expiration is then applied to all other tasks that may share this cache key.
Cache policies
Cache policies can be composed and altered using basic Python syntax to form more complex policies.
For example, all task policies except for NONE
can be added together to form new policies that combine
the individual policies’ logic into a larger cache key computation.
Combining policies in this way results in caches that are easier to invalidate.
For example:
from prefect import task
from prefect.cache_policies import TASK_SOURCE, INPUTS
@task(cache_policy=TASK_SOURCE + INPUTS)
def my_cached_task(x: int):
return x + 42
This task will rerun anytime you provide new values for x
, or anytime you change the underlying code.
The INPUTS
policy is a special policy that allows you to subtract string values to ignore
certain task inputs:
from prefect import task
from prefect.cache_policies import INPUTS
my_custom_policy = INPUTS - 'debug'
@task(cache_policy=my_custom_policy)
def my_cached_task(x: int, debug: bool = False):
print('running...')
return x + 42
my_cached_task(1)
my_cached_task(1, debug=True) # still uses the cache
Cache key functions
You can configure custom cache policy logic through the use of cache key functions. A cache key function is a function that accepts two positional arguments:
- The first argument corresponds to the
TaskRunContext
, which stores task run metadata. For example, this object has attributestask_run_id
,flow_run_id
, andtask
, all of which can be used in your custom logic. - The second argument corresponds to a dictionary of input values to the task. For example,
if your task has the signature
fn(x, y, z)
then the dictionary will have keys “x”, “y”, and “z” with corresponding values that can be used to compute your cache key.
This function can then be specified using the cache_key_fn
argument on
the task decorator.
For example:
def static_cache_key(context, parameters):
# return a constant
return "static cache key"
@task(cache_key_fn=static_cache_key)
def my_cached_task(x: int):
return x + 1
Multi-task caching
There are many situations in which multiple tasks need to always run together or not at all. This can be achieved in Prefect by configuring these tasks to always write to their caches within a single transaction.
from prefect import task, flow
from prefect.transactions import transaction
@task(cache_key_fn=lambda *args, **kwargs: "static-key-1")
def load_data():
return "some-data"
@task(cache_key_fn=lambda *args, **kwargs: "static-key-2")
def process_data(data, fail):
if fail:
raise RuntimeError("Error! Abort!")
return len(data)
@flow
def multi_task_cache(fail: bool = True):
with transaction():
data = load_data()
process_data(data=data, fail=fail)
When this flow is run with default parameter values it will fail on the process_data
task.
The load_data
task will succeed. However, because caches are only written to when a transaction
is committed, the load_data
task will not write a result to its cache key location until
the process_data
task succeeds as well.
This ensures that anytime you need to rerun this flow both load_data
and process_data
are executed
together.
After a successful execution both tasks will be cached until the cache key is updated.
Read more about transactions.
Caching example
In this example, until the cache_expiration
time is reached, as long as the input to hello_task()
remains
the same when it is called, the cached return value will be returned. The task is not rerun.
However, if the input argument value changes, hello_task()
runs using the new input.
from datetime import timedelta
from prefect import flow, task
from prefect.cache_policies import INPUTS
@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def hello_task(name_input):
# Doing some work
print("Saying hello")
return "hello " + name_input
@flow(log_prints=True)
def hello_flow(name_input):
hello_task(name_input)
hello_task(name_input) # does not rerun
A more realistic example might include the flow run id in the cache key, so only repeated calls in the same flow run are cached:
from prefect.cache_policies import INPUTS, RUN_ID
@task(cache_policy=INPUTS + RUN_ID, cache_expiration=timedelta(days=1))
def hello_task(name_input):
# Doing some work
print("Saying hello")
return "hello " + name_input
@flow(log_prints=True)
def hello_flow(name_input):
# reruns each time the flow is run
hello_task(name_input)
# but the same call within the same flow run is Cached
hello_task(name_input)
Force ignore the cache
A cache “refresh” instructs Prefect to ignore the data associated with a task’s cache key and rerun no matter what.
The refresh_cache
option enables this behavior for a specific task:
import random
def static_cache_key(context, parameters):
# return a constant
return "static cache key"
@task(cache_key_fn=static_cache_key, refresh_cache=True)
def caching_task():
return random.random()
When this task runs, it always updates the cache key instead of using the cached value. This is particularly useful when you have a flow that is responsible for updating the cache.
To refresh the cache for all tasks, use the PREFECT_TASKS_REFRESH_CACHE
setting.
Setting PREFECT_TASKS_REFRESH_CACHE=true
changes the default behavior of all tasks to refresh.
This is particularly useful to rerun a flow without cached results.
See settings for more details on managing Prefect settings.
If you have tasks that should not refresh when this setting is enabled, you may explicitly set refresh_cache
to False
. These tasks will never refresh the cache. If a cache key exists it will be read, not updated.
If a cache key does not exist yet, these tasks can still write to the cache.
@task(cache_key_fn=static_cache_key, refresh_cache=False)
def caching_task():
return random.random()
Was this page helpful?