from prefect import flow
from datetime import datetime
from prefect_shell import ShellOperation
@flow
def download_data():
today = datetime.today().strftime("%Y%m%d")
# for short running operations, you can use the `run` method
# which automatically manages the context
ShellOperation(
commands=[
"mkdir -p data",
"mkdir -p data/${today}"
],
env={"today": today}
).run()
# for long running operations, you can use a context manager
with ShellOperation(
commands=[
"curl -O https://masie_web.apps.nsidc.org/pub/DATASETS/NOAA/G02135/north/daily/data/N_seaice_extent_daily_v3.0.csv",
],
working_dir=f"data/{today}",
) as download_csv_operation:
# trigger runs the process in the background
download_csv_process = download_csv_operation.trigger()
# then do other things here in the meantime, like download another file
...
# when you're ready, wait for the process to finish
download_csv_process.wait_for_completion()
# if you'd like to get the output lines, you can use the `fetch_result` method
output_lines = download_csv_process.fetch_result()
if __name__ == "__main__":
download_data()