Process big data
Guidance for using Prefect with big data.
This page explores methods to reduce the processing time or memory utilization of Prefect workflows that process large volumes of data, with little or no change to your data-processing logic. Several options help you optimize for speed, memory, compute, and storage, including:
- Removing task introspection with quote to save time running your code.
- Writing task results to cloud storage such as S3 using a block to save memory.
- Saving data to disk within a flow rather than using results.
- Caching task results to save time and compute.
- Compressing results written to disk to save space.
- Using a task runner for parallelizable operations to save time.
Remove task introspection
By default, Prefect introspects each argument when a task is called from a flow.
To speed up your flow runs, disable this behavior for a task by wrapping the argument in quote.
Here’s a basic example that extracts and transforms some New York taxi data:
from prefect import task, flow
from prefect.utilities.annotations import quote
import pandas as pd


@task
def extract(url: str):
    """Extract data"""
    df_raw = pd.read_parquet(url)
    print(df_raw.info())
    return df_raw


@task
def transform(df: pd.DataFrame):
    """Basic transformation"""
    df["tip_fraction"] = df["tip_amount"] / df["total_amount"]
    print(df.info())
    return df


@flow(log_prints=True)
def et(url: str):
    """ET pipeline"""
    df_raw = extract(url)
    # quote() tells Prefect not to introspect the DataFrame argument
    df = transform(quote(df_raw))


if __name__ == "__main__":
    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-09.parquet"
    et(url)
Introspection can take significant time when the object being passed is a large collection, such as a dictionary or DataFrame, where each element needs to be visited.
Using quote reduces execution time at the expense of disabling task dependency tracking for the wrapped object.
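To give a feel for why visiting each element is costly, the following plain-Python sketch counts how many objects a naive recursive visitor touches, with and without an opt-out wrapper. It is an illustration only, not Prefect's implementation: the Opaque class and count_visited function are hypothetical names standing in for quote and for the introspection step.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Opaque:
    """Hypothetical marker telling the visitor not to look inside `value`."""
    value: Any


def count_visited(obj: Any) -> int:
    """Count how many objects a naive recursive visitor touches."""
    if isinstance(obj, Opaque):
        return 1  # the wrapper itself is seen; its contents are skipped
    if isinstance(obj, dict):
        return 1 + sum(count_visited(v) for v in obj.values())
    if isinstance(obj, (list, tuple, set)):
        return 1 + sum(count_visited(item) for item in obj)
    return 1  # leaf value


# 1,000 small records: the visitor touches the list, each dict, and each value
rows = [{"tip_amount": i, "total_amount": i + 10} for i in range(1000)]
visited_plain = count_visited(rows)
visited_wrapped = count_visited(Opaque(rows))
print(visited_plain, visited_wrapped)  # prints "3001 1"
```

The work done by the visitor grows with the number of elements, while the wrapped object costs a single check regardless of size. The trade-off is the same one described above: nothing inside the wrapper is inspected.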
Write task results to cloud storage
By default, the results of task runs are stored in memory in your execution environment. This behavior makes flow runs fast for small data, but can be problematic for large data. Save memory by writing results to disk.
In production, it’s recommended to write results to cloud storage such as AWS S3. Prefect lets you use a storage block from a Prefect integration library such as prefect-aws to save your configuration information. Learn more about blocks.
Install the relevant library, register the block type with the server, and create your block. Then reference the block in your flow:
...
from prefect_aws.s3 import S3Bucket
my_s3_block = S3Bucket.load("MY_BLOCK_NAME")
...
@task(result_storage=my_s3_block)
The task's result is written to S3 rather than stored in memory.
Save data to disk within a flow
To save memory and time with big data, you don’t need to pass results between tasks. Instead, you can write and read data to disk directly in your flow code. Prefect has integration libraries for each of the major cloud providers.
Each library contains blocks with methods that make it convenient to read and write data to and from cloud object storage.
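As a rough sketch of this pattern, the example below passes only file paths between steps, so the data itself never travels as a return value. It uses the standard library and a local temporary directory as a stand-in for cloud object storage. The function names mirror the earlier example, the sample rows are made up, and in a Prefect flow extract and transform would carry @task while et would carry @flow.

```python
import csv
import tempfile
from pathlib import Path


def extract(out_dir: Path) -> Path:
    """Write raw rows to disk and return only the file path."""
    raw_path = out_dir / "raw.csv"
    with raw_path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["tip_amount", "total_amount"])
        # Made-up sample rows for illustration
        writer.writerows([[1.0, 10.0], [3.0, 12.0], [0.0, 8.0]])
    return raw_path


def transform(raw_path: Path) -> Path:
    """Stream rows from disk, add tip_fraction, and write a new file."""
    out_path = raw_path.with_name("transformed.csv")
    with raw_path.open(newline="") as src, out_path.open("w", newline="") as dst:
        reader = csv.DictReader(src)
        writer = csv.DictWriter(dst, fieldnames=[*reader.fieldnames, "tip_fraction"])
        writer.writeheader()
        for row in reader:  # one row in memory at a time
            row["tip_fraction"] = float(row["tip_amount"]) / float(row["total_amount"])
            writer.writerow(row)
    return out_path


def et(out_dir: Path) -> Path:
    """Only short path objects move between the two steps."""
    raw_path = extract(out_dir)
    return transform(raw_path)


with tempfile.TemporaryDirectory() as tmp:
    result_path = et(Path(tmp))
    with result_path.open(newline="") as f:
        result_rows = list(csv.DictReader(f))

print(len(result_rows), "rows written")
```

With a cloud provider, the local paths would be replaced by object keys, and the reads and writes by the corresponding block methods.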
Cache task results
Caching saves you time and compute by allowing you to avoid re-running tasks unnecessarily. Note that caching requires task result persistence. Learn more about caching.
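The link between caching and result persistence can be seen in a small plain-Python sketch: a cached call can only be skipped if the earlier result was stored somewhere it can be read back. This is a conceptual illustration, not Prefect's caching API. The names cached_call and expensive_sum are hypothetical, and hashing the JSON-encoded inputs is just one assumed way to build a cache key.

```python
import hashlib
import json
import tempfile
from pathlib import Path

call_count = 0  # tracks how often the expensive function really runs


def expensive_sum(values: list) -> int:
    """Stand-in for a slow computation."""
    global call_count
    call_count += 1
    return sum(values)


def cached_call(cache_dir: Path, values: list) -> int:
    """Return a persisted result when the same inputs were seen before."""
    key = hashlib.sha256(json.dumps(values).encode()).hexdigest()
    result_file = cache_dir / f"{key}.json"
    if result_file.exists():
        # Cache hit: read the stored result instead of recomputing
        return json.loads(result_file.read_text())
    result = expensive_sum(values)
    result_file.write_text(json.dumps(result))  # persist for later calls
    return result


with tempfile.TemporaryDirectory() as tmp:
    cache_dir = Path(tmp)
    first = cached_call(cache_dir, [1, 2, 3])
    second = cached_call(cache_dir, [1, 2, 3])  # same inputs, served from disk
    third = cached_call(cache_dir, [4, 5])      # new inputs, recomputed

print(first, second, third, call_count)
```

The second call returns without running expensive_sum, which is the time and compute saving that caching provides.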
Compress results written to disk
If you’re using Prefect’s task result persistence, save disk space by compressing the results.
Specify the result serializer with a compressed/ prefix:
@task(result_serializer="compressed/json")
Note that compressing and decompressing the data takes time.
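To gauge the trade-off on your own data, you can measure it directly. The sketch below serializes some made-up records to JSON, compresses them, and times both directions. It uses the standard library's lzma module purely for illustration; the codec Prefect applies, and the ratios you see on real data, may differ.

```python
import json
import lzma
import time

# Made-up, repetitive records; real data may compress better or worse
records = [{"trip_id": i, "tip_amount": 2.5, "total_amount": 20.0} for i in range(5000)]

raw = json.dumps(records).encode("utf-8")

start = time.perf_counter()
compressed = lzma.compress(raw)
compress_seconds = time.perf_counter() - start

start = time.perf_counter()
restored = json.loads(lzma.decompress(compressed))
decompress_seconds = time.perf_counter() - start

print(f"raw bytes:        {len(raw)}")
print(f"compressed bytes: {len(compressed)}")
print(f"compress time:    {compress_seconds:.4f}s")
print(f"decompress time:  {decompress_seconds:.4f}s")
```

If the space saved is small relative to the added time, an uncompressed serializer may be the better choice for that task.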
Use a task runner for parallelizable operations
Prefect’s task runners allow you to use the Dask and Ray Python libraries to run tasks in parallel, distributed across multiple machines. This can save you time and compute when operating on large data structures. See the guide to working with Dask and Ray Task Runners for details.
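An operation is a good candidate for a task runner when it can be split into independent chunks whose results are combined at the end. The sketch below shows that split, map, and combine shape with the standard library's thread pool as a local stand-in. It does not use Dask or Ray, and the chunked and total_tips helpers and the sample values are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(values, size):
    """Yield consecutive slices of `values` with at most `size` items."""
    for start in range(0, len(values), size):
        yield values[start:start + size]


def total_tips(chunk):
    """Work on one chunk; it needs nothing from the other chunks."""
    return sum(chunk)


# Made-up tip amounts for illustration
tips = [float(i % 7) for i in range(10_000)]

# Map: each chunk is processed independently by a pool worker
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_totals = list(pool.map(total_tips, chunked(tips, 1_000)))

# Combine: merge the per-chunk results into the final answer
grand_total = sum(partial_totals)
print(len(partial_totals), grand_total)
```

Operations where each step depends on the previous step's output do not fit this shape and gain little from running in parallel.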