Define a workflow

Make a Python function a workflow by adding the @flow decorator to it:

from prefect import flow


@flow
def my_workflow() -> str:
    return "Hello, world!"

Run the workflow by calling it like a normal Python function:

my_workflow()

You can create a workflow from a method on a class:

from prefect import flow


class MyClass:

    @flow
    def my_instance_method(self):
        return "Hello, from an instance method!"


    @flow
    @classmethod
    def my_class_method(cls):
        return "Hello, from a class method!"


    @flow
    @staticmethod
    def my_static_method():
        return "Hello, from a static method!"


MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()

You can also create a workflow from a generator function:

from prefect import flow


@flow
def generator():
    for i in range(10):
        yield i


for val in generator():
    print(val)

You can view all workflow runs in the Prefect UI.

Create tasks and child workflows

You can create tasks and child flows to organize your workflow logic.

import random

from prefect import flow, task


@task
def generate_a_number():
    return random.randint(0, 100)


@flow
def is_number_even(number: int):
    return number % 2 == 0


@flow
def even_or_odd():
    number = generate_a_number()
    if is_number_even(number):
        print("The number is even")
    else:
        print("The number is odd")

Each task and child flow is a separate unit of work and is displayed in the Prefect UI.

Cancel a workflow if it runs for too long

To prevent a flow or task from running for too long, pass the timeout_seconds keyword argument to its decorator.

from prefect import flow
import time

@flow(timeout_seconds=1, log_prints=True)
def show_timeouts():
    print("I will execute")
    time.sleep(5)
    print("I will not execute")

Configure flows and tasks

Flow configuration

All flows can be configured by passing arguments to the decorator. Flows accept the following optional settings:

| Argument | Description |
| --- | --- |
| description | An optional string description for the flow. If not provided, the description is pulled from the docstring for the decorated function. |
| name | An optional name for the flow. If not provided, the name is inferred from the function. |
| retries | An optional number of times to retry on flow run failure. |
| retry_delay_seconds | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if retries is nonzero. |
| flow_run_name | An optional name to distinguish runs of this flow; this name can be provided as a string template with the flow’s parameters as variables; you can also provide this name as a function that returns a string. |
| task_runner | An optional task runner to use for task execution within the flow when you .submit() tasks. If not provided and you .submit() tasks, the ThreadPoolTaskRunner is used. |
| timeout_seconds | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it is marked as failed. Flow execution may continue until the next task is called. |
| validate_parameters | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is True. |
| version | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. |

Task configuration

Tasks allow for customization through optional arguments that can be provided to the task decorator.

| Argument | Description |
| --- | --- |
| name | An optional name for the task. If not provided, the name is inferred from the function name. |
| description | An optional string description for the task. If not provided, the description is pulled from the docstring for the decorated function. |
| tags | An optional set of tags associated with runs of this task. These tags are combined with any tags defined by a prefect.tags context at task runtime. |
| timeout_seconds | An optional number of seconds indicating a maximum runtime for the task. If the task exceeds this runtime, it will be marked as failed. |
| cache_key_fn | An optional callable that, given the task run context and call parameters, generates a string key. If the key matches a previous completed state, that state result is restored instead of running the task again. |
| cache_policy | An optional policy that determines what information is used to generate cache keys. Available policies include INPUTS, TASK_SOURCE, RUN_ID, FLOW_PARAMETERS, and NO_CACHE. Can be combined using the + operator. |
| cache_expiration | An optional amount of time indicating how long cached states for this task are restorable; if not provided, cached states will never expire. |
| retries | An optional number of times to retry on task run failure. |
| retry_delay_seconds | An optional number of seconds to wait before retrying the task after failure. This is only applicable if retries is nonzero. |
| log_prints | An optional boolean indicating whether to log print statements. |

See all possible options in the Python SDK docs.

For example, provide optional name and description arguments to a task:

from prefect import task


@task(name="hello-task", description="This task says hello.")
def my_task():
    print("Hello, I'm a task")

Distinguish runs of this task by providing a task_run_name.
The name can be a template that references the task's parameters, using Python’s standard string formatting syntax:

import datetime
from prefect import flow, task


@task(name="My Example Task",
      description="An example task for a tutorial.",
      task_run_name="hello-{name}-on-{date:%A}")
def my_task(name, date):
    pass


@flow
def my_flow():
    # creates a run with a name like "hello-marvin-on-Thursday"
    my_task(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))

if __name__ == "__main__":
    my_flow()
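
To see how such a template resolves, the following pure-Python sketch applies str.format to the same template with a fixed date. It illustrates the formatting syntax only and makes no claim about how Prefect expands the name internally.

```python
import datetime

# Same template as the task_run_name in the example above.
template = "hello-{name}-on-{date:%A}"

# A fixed date keeps the result reproducible; 4 January 2024 was a Thursday.
params = {
    "name": "marvin",
    "date": datetime.datetime(2024, 1, 4, tzinfo=datetime.timezone.utc),
}

# {date:%A} passes "%A" to the datetime's format spec, yielding the weekday name.
run_name = template.format(**params)
print(run_name)  # hello-marvin-on-Thursday
```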

The task_run_name argument also accepts a function that returns a string:

import datetime
from prefect import flow, task


def generate_task_name():
    date = datetime.datetime.now(datetime.timezone.utc)
    return f"{date:%A}-is-a-lovely-day"


@task(name="My Example Task",
      description="An example task for the docs.",
      task_run_name=generate_task_name)
def my_task(name):
    pass


@flow
def my_flow():
    # creates a run with a name like "Thursday-is-a-lovely-day"
    my_task(name="marvin")


if __name__ == "__main__":
    my_flow()