# Next Steps
In this tutorial we showed how Prefect can be used to improve the behavior of your workflow. However, there is a lot more to Prefect that we didn't cover!
# Logging within your Tasks
Within our tutorial
Tasks are displaying information via Python's built in
@task def load_reference_data(ref_data): print("saving reference data...") db = aclib.Database() db.update_reference_data(ref_data)
However, there is already a
Logger object made available to you within
@task def load_reference_data(ref_data): logger = prefect.context.get("logger") logger.info("saving reference data...") db = aclib.Database() db.update_reference_data(ref_data)
# Rich Handlers for Storing Task Results
In this particular Aircraft ETL example we did not have an explicit need to keep intermediate results at each step along the way. However, Prefect provides a
ResultHandler abstraction that enables users to persist
Results returned from each
Task to a storage of choice:
from prefect.engine.result_handlers import LocalResultHandler handler = LocalResultHandler(dir="./my-results") with Flow("Aircraft-ETL", result_handler=handler) as flow: ... flow.run()
Now any results generated by individual
Tasks are written out to the
./my-results directory, relative to where the Flow was executed from. Prefect provides many supported Result Handlers to persist results to common storage options such as S3, Google Cloud Storage, Azure Storage, and more.
Read More About Result Handlers
# Advanced Control Structures
In this tutorial we could have gone one step further and asked for fetching all aircraft data within X radius of multiple Y airports. In this case
Mapping would be the easiest way to tackle this, calling our
extract_live_data Task for each desired airport.
# Ready-made Task Library
Within our tutorial
Tasks were always user-provided functions, however, Prefect provides a library of ready-made
Tasks. Here's an example
Flow that uses
ShellTasks to run arbitrary commands:
from prefect import Flow from prefect.tasks.shell import ShellTask shell = ShellTask() with Flow("Simple Pipeline") as flow: flow.chain( shell(command='pip install -r requirements.txt'), shell(command='black --check .'), shell(command='pytest .'), ) flow.run()
The Task library includes integrations with Kubernetes, GitHub, Slack, Docker, AWS, GCP, and more!