# ETL: The Prefect Way
Let's improve the overall structure of our example workflow introduced in the previous tutorial by using Prefect to encapsulate our code.
Follow along in the Terminal
```
cd examples/tutorial
python 02_etl_flow.py
```
# It's E, T, L, not ETL
Prefect's smallest unit of work is a Python function, so our first order of business is to rework our example into functions.
The first question that may come to mind is "how large/small should a function be?". An easy approach would be to split the work up into explicit "extract", "transform", and "load" functions, like so:
```python
def extract(...):
    # fetch all aircraft and reference data
    ...
    return all_data

def transform(all_data):
    # clean the live data
    ...

def load(all_data):
    # save all transformed data and reference data to the database
    ...
```

This would be functional; however, it still does not address some of the problems from the original code base:
- What happens to already-fetched reference data if pulling live data fails?
- What happens to the already-transformed data if the database is not available?
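To make the first question concrete, here is a minimal sketch of a coarse-grained `extract()`. The `fetch_reference_data` and `fetch_live_data` helpers, their return values, and the `run_extract` wrapper are hypothetical stand-ins, not part of the tutorial's code base. Because both fetches live in one function, a failure in the second discards the result of the first:

```python
def fetch_reference_data():
    # Hypothetical stand-in for a slow download of reference data.
    return {"airports": ["IAD", "DCA"]}


def fetch_live_data():
    # Hypothetical stand-in for a live API call that is currently failing.
    raise ConnectionError("live data source unavailable")


def extract():
    # Both fetches share one function, so they succeed or fail together.
    reference_data = fetch_reference_data()
    live_data = fetch_live_data()  # raises before anything is returned
    return {"reference": reference_data, "live": live_data}


def run_extract():
    # Returns whatever extract() hands back, or None if it failed.
    try:
        return extract()
    except ConnectionError:
        # The reference data was fetched successfully, but it only ever
        # existed inside extract() and is no longer reachable here.
        return None


all_data = run_extract()
print(all_data)  # None
```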
These points highlight the fact that `extract()`, `transform()`, and `load()` are still arbitrarily scoped. This brings us to a rule of thumb when deciding how large to make each function: look at the input and output data that the workflow needs at each step. In our case, the reference data and live data come from different sources and are stored separately. Let's refactor a bit more, taking this new insight into consideration:
```python
def extract_reference_data(...):
    # fetch reference data
    ...
    return reference_data

def extract_live_data(...):
    # fetch live data
    ...
    return live_data

def transform(live_data, reference_data):
    # clean the live data
    ...
    return transformed_data

def load_reference_data(reference_data):
    # save reference data to the database
    ...

def load_live_data(transformed_data):
    # save transformed live data to the database
    ...
```
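As a rough, runnable illustration of this split, the sketch below fills in the placeholders with hypothetical in-memory data. The record fields, the cleaning rule, and the extra `database` argument (a plain dict standing in for a real database) are assumptions made for the example, not the tutorial's actual aircraft code.

```python
def extract_reference_data():
    # Hypothetical reference data: airport codes mapped to names.
    return {"IAD": "Dulles International", "DCA": "Reagan National"}


def extract_live_data():
    # Hypothetical live records; the second one has no known airport.
    return [
        {"icao": "a1b2c3", "airport": "IAD"},
        {"icao": "d4e5f6", "airport": None},
    ]


def transform(live_data, reference_data):
    # Keep records whose airport is in the reference data, adding its name.
    return [
        {**record, "airport_name": reference_data[record["airport"]]}
        for record in live_data
        if record["airport"] in reference_data
    ]


def load_reference_data(reference_data, database):
    # Save reference data to the (stand-in) database.
    database["reference"] = reference_data


def load_live_data(transformed_data, database):
    # Save transformed live data to the (stand-in) database.
    database["live"] = transformed_data


database = {}
reference_data = extract_reference_data()
# Reference data is stored on its own, regardless of what happens
# to the live data afterwards.
load_reference_data(reference_data, database)
live_data = extract_live_data()
load_live_data(transform(live_data, reference_data), database)
print(database["live"])
```

Each function now has one input source and one output, so a failure in one step leaves the results of the others intact.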
# Leveraging Prefect
Now that we have appropriately sized functions and an idea of how these functions relate to one another, let's encapsulate our workflow with Prefect.
# First step
Decorate any function that Prefect should run with the `@task` decorator:

```python
from prefect import task, Flow

@task
def extract_reference_data(...):
    # fetch reference data
    ...
    return reference_data

@task
def extract_live_data(...):
    # fetch live data
    ...
    return live_data

@task
def transform(live_data, reference_data):
    # clean the live data
    ...
    return transformed_data

@task
def load_reference_data(reference_data):
    # save reference data to the database
    ...

@task
def load_live_data(transformed_data):
    # save transformed live data to the database
    ...
```
# Second step
Specify data and task dependencies within a `Flow` context:

```python
# ...task definitions above

with Flow("Aircraft-ETL") as flow:
    reference_data = extract_reference_data()
    live_data = extract_live_data()

    transformed_live_data = transform(live_data, reference_data)

    load_reference_data(reference_data)
    load_live_data(transformed_live_data)
```
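Note: none of the tasks are actually executed at this time, as the `with Flow(...):` context manager allows Prefect to reason about dependencies between tasks and build an execution graph that will be executed later. In this case, the execution graph has `extract_reference_data` feeding both `transform` and `load_reference_data`, `extract_live_data` feeding `transform`, and `transform` feeding `load_live_data`.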
A huge improvement over our original implementation!
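One way to picture the graph built from the `with Flow(...)` block is as a mapping from each task to the tasks it depends on. The sketch below is a plain-Python illustration using the standard library's `graphlib` module (Python 3.9+), not Prefect's internal representation; it derives one valid execution order from the dependencies declared in the flow.

```python
from graphlib import TopologicalSorter

# Each key is a task; each value is the set of tasks whose output it needs.
dependencies = {
    "extract_reference_data": set(),
    "extract_live_data": set(),
    "transform": {"extract_live_data", "extract_reference_data"},
    "load_reference_data": {"extract_reference_data"},
    "load_live_data": {"transform"},
}

# static_order() yields every task only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Any order that respects these constraints is valid; the two extract tasks, for instance, do not depend on each other.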
# Third step
Execute the Flow!
```python
# ...flow definition above

flow.run()
```

At this point, the Tasks (our Python functions) are executed in the appropriate order, with data being passed from task to task as specified in the execution graph.
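To illustrate what running tasks in dependency order while passing data between them can look like, here is a toy runner in plain Python. It is a conceptual sketch only, not how `flow.run()` is implemented: the `run_graph` helper, the `graph` structure, and the stub task bodies are all hypothetical.

```python
from graphlib import TopologicalSorter


def run_graph(graph):
    """Run each task after its upstream tasks, passing their results in."""
    # graph maps task name -> (function, list of upstream task names).
    upstream = {name: set(deps) for name, (_, deps) in graph.items()}
    results = {}
    for name in TopologicalSorter(upstream).static_order():
        func, deps = graph[name]
        # Upstream results become positional arguments, in declared order.
        results[name] = func(*(results[dep] for dep in deps))
    return results


# Stub tasks standing in for the real extract/transform/load functions.
saved = []
graph = {
    "extract_reference_data": (lambda: {"IAD": "Dulles"}, []),
    "extract_live_data": (lambda: ["IAD", "XXX"], []),
    "transform": (
        lambda live, ref: [code for code in live if code in ref],
        ["extract_live_data", "extract_reference_data"],
    ),
    "load_reference_data": (
        lambda ref: saved.append(("reference", ref)),
        ["extract_reference_data"],
    ),
    "load_live_data": (
        lambda data: saved.append(("live", data)),
        ["transform"],
    ),
}

results = run_graph(graph)
print(results["transform"])  # ['IAD']
```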
Prefect Task Library
Prefect provides a Task Library that includes common Task implementations and integrations with Kubernetes, GitHub, Slack, Docker, AWS, GCP, and more!
Let's parameterize our Flow to make it more reusable.