Manage results
Results represent the data returned by a flow or a task and enable features such as caching.
Results are the bedrock of many Prefect features - most notably transactions and caching - and are foundational to the resilient execution paradigm that Prefect enables. Any return value from a task or a flow is a result. By default these results are not persisted and no reference to them is maintained in the API.
Enabling result persistence allows you to fully benefit from Prefect’s orchestration features.
Turn on persistence globally by default
The simplest way to turn on result persistence globally is through the PREFECT_RESULTS_PERSIST_BY_DEFAULT
setting:
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
See settings for more information on how settings are managed.
Configuring result persistence
There are four categories of configuration for result persistence:
- whether to persist results at all: this is configured through
various keyword arguments and the
PREFECT_RESULTS_PERSIST_BY_DEFAULT
setting. - what filesystem to persist results to: this is configured through the
result_storage
keyword and thePREFECT_DEFAULT_RESULT_STORAGE_BLOCK
setting. - how to serialize and deserialize results: this is configured through
the
result_serializer
keyword and thePREFECT_RESULTS_DEFAULT_SERIALIZER
setting. - what filename to use: this is configured through one of
result_storage_key
,cache_policy
, orcache_key_fn
.
Default persistence configuration
Once result persistence is enabled - whether through the PREFECT_RESULTS_PERSIST_BY_DEFAULT
setting or
through any of the mechanisms described below - Prefect’s default
result storage configuration is activated.
If you enable result persistence and don’t specify a filesystem block, your results will be stored locally.
By default, results are persisted to ~/.prefect/storage/
.
You can configure the location of these results through the PREFECT_LOCAL_STORAGE_PATH
setting.
prefect config set PREFECT_LOCAL_STORAGE_PATH='~/.my-results/'
Enabling result persistence
In addition to the PREFECT_RESULTS_PERSIST_BY_DEFAULT
setting, result persistence can also be
enabled or disabled on both individual flows and individual tasks.
Specifying a non-null value for any of the following keywords on the task decorator will enable result
persistence for that task:
persist_result
: a boolean that allows you to explicitly enable or disable result persistence.result_storage
: accepts either a string reference to a storage block or a storage block class that specifies where results should be stored.result_storage_key
: a string that specifies the filename of the result within the task’s result storage.result_serializer
: a string or serializer that configures how the data should be serialized and deserialized.cache_policy
: a cache policy specifying the behavior of the task’s cache.cache_key_fn
: a function that configures a custom cache policy.
Similarly, setting persist_result=True
, result_storage
, or result_serializer
on a flow will enable
persistence for that flow.
Enabling persistence on a flow enables persistence by default for its tasks
Enabling result persistence on a flow through any of the above keywords will also enable it for all tasks called within that flow by default.
Any settings explicitly set on a task take precedence over the flow settings.
Result storage
You can configure the system of record for your results through the result_storage
keyword argument.
This keyword accepts an instantiated filesystem block, or a block slug. Find your blocks’ slugs with prefect block ls
.
Note that if you want your tasks to share a common cache, your result storage should be accessible by
the infrastructure in which those tasks run. Integrations have cloud-specific storage blocks.
For example, a common distributed filesystem for result storage is AWS S3.
from prefect import flow, task
from prefect_aws.s3 import S3Bucket
test_block = S3Bucket(bucket='test-bucket')
test_block.save('test-block')
# define three tasks
# with different result persistence configuration
@task
def my_task():
return 42
unpersisted_task = my_task.with_options(persist_result=False)
other_storage_task = my_task.with_options(result_storage=test_block)
@flow(result_storage='s3-bucket/my-dev-block')
def my_flow():
# this task will use the flow's result storage
my_task()
# this task will not persist results at all
unpersisted_task()
# this task will persist results to its own bucket using a different S3 block
other_storage_task()
Specifying a default filesystem
Alternatively, you can specify a different filesystem through the PREFECT_DEFAULT_RESULT_STORAGE_BLOCK
setting.
Specifying a block document slug here will enable result persistence using that filesystem as the default.
For example:
prefect config set PREFECT_DEFAULT_RESULT_STORAGE_BLOCK='s3-bucket/my-prod-block'
Note that any explicit configuration of result_storage
on either a flow or task will override this default.
Result filenames
By default, the filename of a task’s result is computed based on the task’s cache policy, which is typically a hash of various pieces of data and metadata. For flows, the filename is a random UUID.
You can configure the filename of the result file within result storage using either:
result_storage_key
: a templated string that can use any of the fields withinprefect.runtime
and the task’s individual parameter values. These templated values will be populated at runtime.cache_key_fn
: a function that accepts the task run context and its runtime parameters and returns a string. See task caching documentation for more information.
If both result_storage_key
and cache_key_fn
are provided, only the result_storage_key
will be used.
The following example writes three different result files based on the name
parameter passed to the task:
from prefect import flow, task
@task(result_storage_key="hello-{parameters[name]}.pickle")
def hello_world(name: str = "world"):
return f"hello {name}"
@flow
def my_flow():
hello_world()
hello_world(name="foo")
hello_world(name="bar")
If a result exists at a given storage key in the storage location, the task will load it without running. To learn more about caching mechanics in Prefect, see the caching documentation.
Result serialization
You can configure how results are serialized to storage using result serializers.
These can be set using the result_serializer
keyword on both tasks and flows.
A default value can be set using the PREFECT_RESULTS_DEFAULT_SERIALIZER
setting, which defaults to pickle
.
Current built-in options include "pickle"
, "json"
, "compressed/pickle"
and "compressed/json"
.
The result_serializer
accepts both a string identifier or an instance of a ResultSerializer
class, allowing
you to customize serialization behavior.
Advanced: Caching results in memory
When running workflows, Prefect keeps the results of all tasks and flows in memory so they can be passed downstream. In some cases, it is desirable to override this behavior. For example, if you are returning a large amount of data from a task, it can be costly to keep it in memory for the entire duration of the flow run.
Flows and tasks both include an option to drop the result from memory once the
result has been committed with cache_result_in_memory
:
@flow(cache_result_in_memory=False)
def foo():
return "pretend this is large data"
@task(cache_result_in_memory=False)
def bar():
return "pretend this is biiiig data"
Was this page helpful?