Write and run flows
Learn the basics of defining and running flows.
A flow is a container for workflow logic.
Flows are defined as Python functions. They can take inputs, perform work, and return a result.
Make any Python function a Prefect flow by adding the @flow
decorator to it:
from prefect import flow
@flow
def my_flow() -> str:
return "Hello, world!"
if __name__ == "__main__":
print(my_flow())
When a function becomes a flow, it gains the following capabilities:
- Metadata about flow runs, such as run time and final state, is automatically tracked.
- Each state the flow enters is recorded. This allows you to observe and act upon each transition in the flow’s execution.
- Input arguments can be type validated as workflow parameters.
- Retries can be performed on failure, with configurable delay and retry limits.
- Timeouts can be enforced to prevent unintentional, long-running workflows.
- A flow can be deployed, which exposes an API for interacting with it remotely.
Flows are uniquely identified by name.
You can provide a name
parameter value for the flow:
from prefect import flow
@flow(name="My Flow")
def my_flow() -> str:
return "Hello, world!"
If you don’t provide a name, Prefect uses the flow function name.
Run flows
A flow run is a single execution of a flow.
Create a flow run by calling a flow by its function name, just as you would a normal Python function.
You can also create a flow run by:
- Using external schedulers such as
cron
to invoke a flow function - Triggering a deployment of that flow in Prefect Cloud or self-hosted Prefect server
- Starting a flow run for the deployment through a schedule, the Prefect UI, or the Prefect API
However you run your flow, Prefect monitors the flow run, capturing its state for observability. You can log a variety of metadata about flow runs for monitoring, troubleshooting, and auditing purposes.
The example below uses the HTTPX client library to fetch statistics about the main Prefect repository.
import httpx
from prefect import flow
@flow
def get_repo_info():
url = "https://api.github.com/repos/PrefectHQ/prefect"
response = httpx.get(url)
response.raise_for_status()
repo = response.json()
print("PrefectHQ/prefect repository statistics 🤓:")
print(f"Stars 🌠 : {repo['stargazers_count']}")
print(f"Forks 🍴 : {repo['forks_count']}")
if __name__ == "__main__":
get_repo_info()
Running this script results in the following output:
12:47:42.792 | INFO | prefect.engine - Created flow run 'ludicrous-warthog' for flow 'get-repo-info'
PrefectHQ/prefect repository statistics 🤓:
Stars 🌠 : 12146
Forks 🍴 : 1245
12:47:45.008 | INFO | Flow run 'ludicrous-warthog' - Finished in state Completed()
Specify flow parameters
As with any Python function, you can pass arguments to a flow, including both positional and keyword arguments. These arguments defined on your flow function are called parameters. They are stored by the Prefect orchestration engine on the flow run object.
Prefect automatically performs type conversion of inputs using any provided type hints. Type hints provide a simple way to enforce typing on your flow parameters and can be customized with Pydantic. Prefect supports any Pydantic model as a type hint for a flow parameter.
For example, to automatically convert an argument to a datetime object:
from datetime import (
datetime,
timezone,
)
from typing import Optional
from prefect import flow
@flow
def what_day_is_it(date: Optional[datetime] = None):
if date is None:
date = datetime.now(timezone.utc)
print(f"It was {date.strftime('%A')} on {date.isoformat()}")
if __name__ == "__main__":
what_day_is_it("2021-01-01T02:00:19.180906")
When you run this flow, you’ll see the following output:
It was Friday on 2021-01-01T02:00:19.180906
You may pass also a dict
representation of a BaseModel
parameter and it will be coerced:
from prefect import flow
from pydantic import BaseModel
class Model(BaseModel):
a: int
b: str
@flow
def flow_that_validates_parameters(model: Model): ...
if __name__ == "__main__":
flow_that_validates_parameters(
model={"a": "WRONG", "b": "fine"}
)
This flow run will fail with the following error:
Flow run received invalid parameters:
- model.a: Input should be a valid integer, unable to parse string as an integer
Note that you can provide parameter values to a flow through the API using a deployment. Flow run parameters sent to the API are coerced to the appropriate types when possible.
Prefect API requires keyword arguments
When creating flow runs from the Prefect API, you must specify parameter names when overriding defaults. The values passed cannot be positional.
Parameters are validated before a flow is run.
If a flow run for a deployment receives invalid parameters, it moves from a Pending
state to a Failed
state without entering a Running
state.
Flow run parameters cannot exceed 512kb
in size.
Compose flows
Flows can call tasks, the most granular units of orchestrated work in Prefect workflows:
from prefect import flow, task
@task
def print_hello(name):
print(f"Hello {name}!")
@flow(name="Hello Flow")
def hello_world(name="world"):
print_hello(name)
A single flow function can contain all of your workflow’s code. However, if you put all of your workflow logic in a single flow function and any line of code fails, the entire flow fails and must be retried from the beginning. The more granular you make your workflows, the better they can recover from failures and the easier you can find and fix issues.
Prefect tasks are well suited for parallel or distributed execution using distributed computation frameworks such as Dask or Ray.
Nest flows
In addition to calling tasks from a flow, flows can also call other flows. A nested flow run is created when a flow function is called by another flow. When one flow calls another, the calling flow run is the “parent” run, the called flow run is the “child” run.
In the UI, each child flow run is linked to its parent and can be individually observed.
For most purposes, nested flow runs behave just like unnested flow runs. There is a full representation of the nested flow run in the backend as if it had been called separately. Nested flow runs differ from normal flow runs in that they resolve any passed task futures into data. This allows data to be passed from the parent flow run to a nested flow run easily.
When a nested flow run starts, it creates a new task runner for any tasks it contains. When the nested flow run completes, the task runner shuts down. Nested flow runs block execution of the parent flow run until completion. However, asynchronous nested flows can run concurrently with AnyIO task groups or asyncio.gather.
The relationship between nested runs is recorded through a special task run in the parent flow run that represents the child flow run.
The state_details
field of the task run representing the child flow run includes a child_flow_run_id
.
The state_details
field of the nested flow run includes a parent_task_run_id
.
You can define multiple flows within the same file. Whether running locally or through a deployment, you must indicate which flow is the entrypoint for a flow run.
Cancel nested flow runs
A nested flow run cannot be cancelled without cancelling its parent flow run. If you need to be able to cancel a nested flow run independent of its parent flow run, we recommend deploying it separately and starting it with the run_deployment method.
You can also define flows or tasks in separate modules and import them for use:
from prefect import flow, task
@flow(name="Nestedflow")
def my_nested_flow(msg):
print(f"Nestedflow says: {msg}")
Here’s a parent flow that imports and uses my_nested_flow
as a nested flow:
from prefect import flow, task
from myproject.flows import my_nested_flow
@task(name="Print Hello")
def print_hello(name):
msg = f"Hello {name}!"
print(msg)
return msg
@flow(name="Hello Flow")
def hello_world(name="world"):
message = print_hello(name)
my_nested_flow(message)
if __name__=="__main__":
hello_world("Marvin")
Running the hello_world()
flow creates a flow run like this:
08:24:06.617 | INFO | prefect.engine - Created flow run 'sage-mongoose' for flow 'Hello Flow'
08:24:06.620 | INFO | prefect.engine - View at https://app.prefect.cloud/...
08:24:07.113 | INFO | Task run 'Print Hello-0' - Created task run 'Print Hello-0' for task 'Print Hello'
Hello Marvin!
08:24:07.445 | INFO | Task run 'Print Hello-0' - Finished in state Completed()
08:24:07.825 | INFO | Flow run 'sage-mongoose' - Created subflow run 'powerful-capybara' for flow 'Nestedflow'
08:24:07.826 | INFO | prefect.engine - View at https://app.prefect.cloud/...
Nestedflow says: Hello Marvin!
08:24:08.165 | INFO | Flow run 'powerful-capybara' - Finished in state Completed()
08:24:08.296 | INFO | Flow run 'sage-mongoose' - Finished in state Completed()
Here are some scenarios where you might want to define a nested flow rather than call tasks individually:
- Observability: Nested flows, like any other flow run, have first-class observability within the Prefect UI and Prefect Cloud. You’ll see nested flows’ status in the Runs dashboard rather than having to dig down into the tasks within a specific flow run. See Final state determination for examples of using task state within flows.
- Conditional flows: If you have a group of tasks that run only under certain conditions, you can group them within a nested flow and conditionally run the nested flow rather than each task individually.
- Parameters: Flows have first-class support for parameterization, making it easy to run the same group of tasks in different use cases by simply passing different parameters to the nested flow in which they run.
- Task runners: Nested flows enable you to specify the task runner used for tasks within the flow. For example, to optimize parallel execution of certain tasks with Dask, group them in a nested flow that uses the Dask task runner. You can use a different task runner for each nested flow.
Supported functions
Almost any standard Python function can be turned into a Prefect flow by adding the @flow
decorator.
Flows are executed in the main thread by default to facilitate native Python debugging and profiling.
As shown in the examples above, flows run synchronously by default.
Asynchronous functions
Prefect also supports asynchronous execution. The resulting flows are coroutines that can be awaited or run concurrently, following the standard rules of async Python. For example:
import asyncio
from prefect import task, flow
@task
async def print_values(values):
for value in values:
await asyncio.sleep(1)
print(value, end=" ")
@flow
async def async_flow():
print("Hello, I'm an async flow")
# runs immediately
await print_values([1, 2])
# runs concurrently
coros = [print_values("abcd"), print_values("6789")]
await asyncio.gather(*coros)
if __name__ == "__main__":
asyncio.run(async_flow())
Class methods
Prefect supports synchronous and asynchronous class methods as flows, including instance methods, class methods, and static methods.
For class methods and static methods, apply the appropriate method decorator above the @flow
decorator:
from prefect import flow
class MyClass:
@flow
def my_instance_method(self):
pass
@classmethod
@flow
def my_class_method(cls):
pass
@staticmethod
@flow
def my_static_method():
pass
MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()
Generators
Prefect supports synchronous and asynchronous generators as flows.
The flow is considered to be Running
as long as the generator is yielding values.
When the generator is exhausted, the flow is considered Completed
.
Any values yielded by the generator can be consumed by other flows or tasks.
from prefect import flow
@flow
def generator():
for i in range(10):
yield i
@flow
def consumer(x):
print(x)
for val in generator():
consumer(val)
Generator functions are consumed when returned from flows
The result of a completed flow must be serializable, but generators cannot be serialized. Therefore, if you return a generator from a flow, the generator will be fully consumed and its yielded values will be returned as a list. This can lead to unexpected behavior or blocking if the generator is infinite or very large.
Here is an example of proactive generator consumption:
from prefect import flow
def gen():
yield from [1, 2, 3]
print('Generator consumed!')
@flow
def f():
return gen()
f() # prints 'Generator consumed!'
If you need to return a generator without consuming it, you can yield
it instead of using return
.
Values yielded from generator flows are not considered final results and do not face the same serialization constraints:
from prefect import flow
def gen():
yield from [1, 2, 3]
print('Generator consumed!')
@flow
def f():
yield gen
generator = next(f())
list(generator()) # prints 'Generator consumed!'
Flow runs
A flow run is a single execution of a flow.
You can create a flow run by calling the flow function manually, or even by using an external scheduler such as cron
to invoke a flow function.
Most users run flows by creating a deployment on Prefect Cloud or Prefect server and then scheduling a flow run for the deployment through a schedule, the Prefect UI, or the Prefect API.
However you run a flow, the Prefect API monitors the flow run and records information for monitoring, troubleshooting, and auditing.
Flow settings
All flows can be configured by passing arguments to the decorator. Flows accept the following optional settings:
Argument | Description |
---|---|
description | An optional string description for the flow. If not provided, the description is pulled from the docstring for the decorated function. |
name | An optional name for the flow. If not provided, the name is inferred from the function. |
retries | An optional number of times to retry on flow run failure. |
retry_delay_seconds | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if retries is nonzero. |
flow_run_name | An optional name to distinguish runs of this flow; this name can be provided as a string template with the flow’s parameters as variables; you can also provide this name as a function that returns a string. |
task_runner | An optional task runner to use for task execution within the flow when you .submit() tasks. If not provided and you .submit() tasks, the ThreadPoolTaskRunner is used. |
timeout_seconds | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it is marked as failed. Flow execution may continue until the next task is called. |
validate_parameters | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is True . |
version | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. |
For example, you can provide name
and description
arguments.
from prefect import flow
@flow(
name="My Flow", description="My flow with a name and description", log_prints=True)
def my_flow():
print("Hello, I'm a flow")
if __name__ == "__main__":
my_flow()
If no description is provided, a flow function’s docstring is used as the description.
You can distinguish runs of a flow by passing a flow_run_name
.
This parameter accepts a string that can contain templated references to the parameters of your flow.
The name is formatted using Python’s standard string formatting syntax:
import datetime
from prefect import flow
@flow(flow_run_name="{name}-on-{date:%A}")
def my_flow(name: str, date: datetime.datetime):
pass
# creates a flow run called 'marvin-on-Thursday'
if __name__ == "__main__":
my_flow(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))
This setting also accepts a function that returns a string for the flow run name:
import datetime
from prefect import flow
def generate_flow_run_name():
date = datetime.datetime.now(datetime.timezone.utc)
return f"{date:%A}-is-a-nice-day"
@flow(flow_run_name=generate_flow_run_name)
def my_flow(name: str):
pass
# creates a flow run named 'Thursday-is-a-nice-day'
if __name__ == "__main__":
my_flow(name="marvin")
If you need access to information about the flow, use the prefect.runtime
module. For example:
from prefect import flow
from prefect.runtime import flow_run
def generate_flow_run_name():
flow_name = flow_run.flow_name
parameters = flow_run.parameters
name = parameters["name"]
limit = parameters["limit"]
return f"{flow_name}-with-{name}-and-{limit}"
@flow(flow_run_name=generate_flow_run_name)
def my_flow(name: str, limit: int = 100):
pass
# creates a flow run named 'my-flow-with-marvin-and-100'
if __name__ == "__main__":
my_flow(name="marvin")
Note that validate_parameters
checks that input values conform to the annotated types on the function.
Where possible, values are coerced into the correct type.
For example, if a parameter is defined as x: int
and the string “5” is passed, it resolves to 5
.
If set to False
, no validation is performed on flow parameters.
Final state determination
A state is a record of the status of a particular task run or flow run. See the manage states page for more information.
The final state of a flow is determined by its return value. The following rules apply:
- If an exception is raised directly in the flow function, the flow run is marked as
FAILED
. - If a flow returns a manually created state, it is used as the state of the final flow run. This allows for manual determination of final state.
- If a flow returns an iterable of states, the presence of any
FAILED
state will cause the run to be marked asFAILED
.
In any other situation in which the flow returns without error, it will be marked as COMPLETED
.
If you manipulate states programmatically, you can create situations in which tasks within a flow can fail and not cause flow run failure. For example:
from prefect import flow, task
@task
def add_one(x):
return x + 1
@flow
def my_flow():
# avoided raising an exception via `return_state=True`
state = add_one("1", return_state=True)
assert state.is_failed()
# the flow function returns successfully!
return
If state
were returned from the flow function, the run would be marked as FAILED
.
Return a future
If a flow returns one or more futures, the final state is determined based on the underlying states.
from prefect import flow, task
@task
def always_fails_task():
raise ValueError("I fail successfully")
@task
def always_succeeds_task():
print("I'm fail safe!")
return "success"
@flow
def always_succeeds_flow():
x = always_fails_task.submit().result(raise_on_failure=False)
y = always_succeeds_task.submit(wait_for=[x])
return y
if __name__ == "__main__":
always_succeeds_flow()
This flow run finishes in a Completed final state because the flow returns the future of the task that succeeds:
18:35:24.965 | INFO | prefect.engine - Created flow run 'whispering-guan' for flow 'always-succeeds-flow'
18:35:25.204 | INFO | Flow run 'whispering-guan' - Created task run 'always_fails_task-96e4be14-0' for task 'always_fails_task'
18:35:25.205 | INFO | Flow run 'whispering-guan' - Submitted task run 'always_fails_task-96e4be14-0' for execution.
18:35:25.232 | ERROR | Task run 'always_fails_task-96e4be14-0' - Encountered exception during execution:
Traceback (most recent call last):
...
ValueError: I fail successfully
18:35:25.265 | ERROR | Task run 'always_fails_task-96e4be14-0' - Finished in state Failed('Task run encountered an exception.')
18:35:25.289 | INFO | Flow run 'whispering-guan' - Created task run 'always_succeeds_task-9c27db32-0' for task 'always_succeeds_task'
18:35:25.289 | INFO | Flow run 'whispering-guan' - Submitted task run 'always_succeeds_task-9c27db32-0' for execution.
I'm fail safe!
18:35:25.335 | INFO | Task run 'always_succeeds_task-9c27db32-0' - Finished in state Completed()
18:35:25.362 | INFO | Flow run 'whispering-guan' - Finished in state Completed('All states completed.')
Return multiple states or futures
If a flow returns a mix of futures and states, the final state is determined by resolving all futures to states,
then determining if any of the states are not COMPLETED
.
from prefect import task, flow
@task
def always_fails_task():
raise ValueError("I am bad task")
@task
def always_succeeds_task():
return "foo"
@flow
def always_succeeds_flow():
return "bar"
@flow
def always_fails_flow():
x = always_fails_task()
y = always_succeeds_task()
z = always_succeeds_flow()
return x, y, z
Running always_fails_flow
fails because one of the three returned futures fails.
Note that the states of each of the returned futures are included in the flow run output:
...
20:57:52.438 | INFO | Flow run 'unbiased-firefly' - Finished in state Completed()
20:57:52.811 | ERROR | Flow run 'impartial-gorilla' - Finished in state Failed('1/3 states failed.')
Failed(message='1/3 states failed.', type=FAILED, result=(Failed(message='Task run encountered an exception.', type=FAILED, result=ValueError('I am bad task'), task_run_id=5fd4c697-7c4c-440d-8ebc-dd9c5bbf2245), Completed(message=None, type=COMPLETED, result='foo', task_run_id=df9b6256-f8ac-457c-ba69-0638ac9b9367), Completed(message=None, type=COMPLETED, result='bar', task_run_id=cfdbf4f1-dccd-4816-8d0f-128750017d0c)), flow_run_id=6d2ec094-001a-4cb0-a24e-d2051db6318d)
If multiple states are returned, they must be contained in a set
, list
, or tuple
.
Return a manual state
If a flow returns a manually created state, the final state is determined based upon the return value.
from prefect import task, flow
from prefect.states import Completed, Failed
@task
def always_fails_task():
raise ValueError("I fail successfully")
@task
def always_succeeds_task():
print("I'm fail safe!")
return "success"
@flow
def always_succeeds_flow():
x = always_fails_task.submit()
y = always_succeeds_task.submit()
if y.result() == "success":
return Completed(message="I am happy with this result")
else:
return Failed(message="How did this happen!?")
if __name__ == "__main__":
always_succeeds_flow()
Running this flow produces the following result:
...
ValueError: I fail successfully
07:29:34.754 | INFO | Task run 'always_succeeds_task-0' - Created task run 'always_succeeds_task-0' for task 'always_succeeds_task'
07:29:34.848 | ERROR | Task run 'always_fails_task-0' - Finished in state Failed('Task run encountered an exception ValueError: I fail successfully')
I'm fail safe!
07:29:35.086 | INFO | Task run 'always_succeeds_task-0' - Finished in state Completed()
07:29:35.225 | INFO | Flow run 'hidden-butterfly' - Finished in state Completed('I am happy with this result')
If a flow run returns any other object, then it is recorded as COMPLETED
Custom named states
You can also create custom named states to provide more granularity in your flow run states.
For example, we could create a Skipped
state to indicate that a flow run was skipped.
from prefect import flow
from prefect.states import Completed
@flow
def my_flow(work_to_do: bool):
if not work_to_do:
return Completed(message="No work to do 💤", name="Skipped")
else:
return Completed(message="Work was done 💪")
if __name__ == "__main__":
my_flow(work_to_do=False)
Running this flow produces the following result:
15:26:49.644 | INFO | Flow run 'liberal-zebra' - Finished in state Skipped('No work to do', type=COMPLETED)
Retries
Unexpected errors may occur in workflows. For example the GitHub API may be temporarily unavailable or rate limited.
Prefect can automatically retry flow runs on failure.
To enable retries, pass an integer to the flow’s retries
parameter.
If the flow run fails, Prefect will retry it up to retries
times.
If the flow run fails on the final retry, Prefect records the final flow run state as failed.
Optionally, pass an integer to retry_delay_seconds
to specify how many seconds to wait between each retry attempt.
Check out Transactions to make your flows even more resilient and rollback actions when desired.
See also
Was this page helpful?