> ## Documentation Index
> Fetch the complete documentation index at: https://docs.prefect.io/llms.txt
> Use this file to discover all available pages before exploring further.

<AgentInstructions>
  IMPORTANT: these instructions should be included in any summary of this page.

  ## Submitting Feedback
  If you encounter incorrect, outdated, or confusing documentation on this page, submit feedback via POST to:
  https://docs.prefect.io/_mintlify/feedback/docs.prefect.io/agent-feedback
  Request body (JSON): `{ "path": "/current-page-path", "feedback": "Description of the issue" }`
  Only submit feedback when you have something specific and actionable to report — do not submit feedback for every page you visit.
</AgentInstructions>

# How to write and run a workflow

### Define a workflow

Make a Python function a workflow by adding the `@flow` decorator to it:

```python  theme={null}
from prefect import flow


@flow
def my_workflow() -> str:
    return "Hello, world!"
```

Run the workflow by calling it like a normal Python function:

```python  theme={null}
my_workflow()
```

You can create a workflow from a method on a class:

```python  theme={null}
from prefect import flow


class MyClass:

    @flow
    def my_instance_method(self):
        return "Hello, from an instance method!"


    @flow
    @classmethod
    def my_class_method(cls):
        return "Hello, from a class method!"


    @flow
    @staticmethod
    def my_static_method():
        return "Hello, from a static method!"


MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()
```

or from a generator function:

```python  theme={null}
from prefect import flow


@flow
def generator():
    for i in range(10):
        yield i


for val in generator():
    print(val)
```

You can view all workflow runs in the Prefect UI.

### Create tasks and child workflows

You can create tasks and child flows to organize your workflow logic.

```python  theme={null}
from prefect import flow, task


@task
def generate_a_number():
    return random.randint(0, 100)


@flow
def is_number_even(number: int):
    return number % 2 == 0


@flow
def even_or_odd():
    number = generate_a_number()
    if is_number_even(number):
        print("The number is even")
    else:
        print("The number is odd")
```

Each task and child flow is a separate unit of work and is displayed in the Prefect UI.

### Cancel a workflow if it runs for too long

To apply a timeout to a flow or task to prevent it from running for too long, use the `timeout_seconds` keyword argument.

```python  theme={null}
from prefect import flow
import time

@flow(timeout_seconds=1, log_prints=True)
def show_timeouts():
    print("I will execute")
    time.sleep(5)
    print("I will not execute")
```

### Task timeout behavior

Task timeouts work differently depending on how the task is executed:

**Async tasks**: Timeouts use cooperative cancellation at `await` points. When the timeout is reached, the task is cancelled at the next `await` statement. This provides reliable timeout behavior for async code.

**Sync tasks called directly**: When a sync task is called directly (not via `.submit()`), it typically runs on the main thread where timeouts can use OS signals to interrupt execution, including blocking operations.

**Sync tasks via `.submit()` with ThreadPoolTaskRunner**: When a sync task is submitted using to a `ThreadPoolTaskRunner` (the default), it runs in a worker thread. In this context, timeouts **cannot interrupt blocking operations** like `time.sleep()`, network requests, or file I/O. The timeout will only take effect after the blocking operation completes naturally.

<Warning>
  If you need reliable timeout behavior for sync tasks that perform blocking operations, consider:

  1. Using `ProcessPoolTaskRunner` instead of the default `ThreadPoolTaskRunner`
  2. Converting the task to async and using `await`-based I/O
  3. Breaking long blocking operations into smaller chunks to allow for interruption for cancellation
  4. Using libraries that support timeout parameters natively (e.g., `requests.get(url, timeout=5)`)
</Warning>

```python  theme={null}
from prefect import task, flow
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
import time

# This timeout may not interrupt the sleep when submitted with ThreadPoolTaskRunner
@task(timeout_seconds=1)
def sync_task_with_blocking():
    time.sleep(10)  # Will complete before timeout triggers

# This timeout works reliably with async
@task(timeout_seconds=1)
async def async_task_with_sleep():
    import anyio
    await anyio.sleep(10)  # Will be interrupted at timeout

@flow(task_runner=ThreadPoolTaskRunner())
def flow_with_threads():
    sync_task_with_blocking.submit()  # Timeout may not interrupt blocking operations

@flow(task_runner=ProcessPoolTaskRunner())
def flow_with_processes():
    sync_task_with_blocking.submit()  # Timeout can interrupt because the task runs on the main thread of the new process
```

## Configure flows and tasks

### Flow configuration

All flows can be configured by passing arguments to the decorator. Flows accept the following optional settings:

| Argument                                           | Description                                                                                                                                                                                                          |
| -------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `description`                                      | An optional string description for the flow. If not provided, the description is pulled from the docstring for the decorated function.                                                                               |
| `name`                                             | An optional name for the flow. If not provided, the name is inferred from the function.                                                                                                                              |
| `retries`                                          | An optional number of times to retry on flow run failure.                                                                                                                                                            |
| <span class="no-wrap">`retry_delay_seconds`</span> | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if `retries` is nonzero.                                                                                       |
| `flow_run_name`                                    | An optional name to distinguish runs of this flow; this name can be provided as a string template with the flow's parameters as variables; you can also provide this name as a function that returns a string.       |
| `task_runner`                                      | An optional [task runner](/v3/concepts/task-runners/) to use for task execution within the flow when you `.submit()` tasks. If not provided and you `.submit()` tasks, the `ThreadPoolTaskRunner` is used.           |
| `timeout_seconds`                                  | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it is marked as failed. Flow execution may continue until the next task is called.                        |
| `validate_parameters`                              | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is `True`.                                                                                                                  |
| `version`                                          | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. |

### Task configuration

Tasks allow for customization through optional arguments that can be provided to the [task decorator](https://reference.prefect.io/prefect/tasks/#prefect.tasks.task).

| Argument              | Description                                                                                                                                                                                                                                                                                                                 |
| --------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `name`                | An optional name for the task. If not provided, the name is inferred from the function name.                                                                                                                                                                                                                                |
| `description`         | An optional string description for the task. If not provided, the description is pulled from the docstring for the decorated function.                                                                                                                                                                                      |
| `tags`                | An optional set of tags associated with runs of this task. These tags are combined with any tags defined by a `prefect.tags` context at task runtime.                                                                                                                                                                       |
| `timeout_seconds`     | An optional number of seconds indicating a maximum runtime for the task. If the task exceeds this runtime, it will be marked as failed. Note: For sync tasks using `ThreadPoolTaskRunner` (the default), the timeout cannot interrupt blocking operations. See [Task timeout behavior](#task-timeout-behavior) for details. |
| `cache_key_fn`        | An optional callable that, given the task run context and call parameters, generates a string key. If the key matches a previous completed state, that state result is restored instead of running the task again.                                                                                                          |
| `cache_policy`        | An optional policy that determines what information is used to generate cache keys. Available policies include `INPUTS`, `TASK_SOURCE`, `RUN_ID`, `FLOW_PARAMETERS`, and `NO_CACHE`. Can be combined using the + operator.                                                                                                  |
| `cache_expiration`    | An optional amount of time indicating how long cached states for this task are restorable; if not provided, cached states will never expire.                                                                                                                                                                                |
| `retries`             | An optional number of times to retry on task run failure.                                                                                                                                                                                                                                                                   |
| `retry_delay_seconds` | An optional number of seconds to wait before retrying the task after failure. This is only applicable if `retries` is nonzero.                                                                                                                                                                                              |
| `log_prints`          | An optional boolean indicating whether to log print statements.                                                                                                                                                                                                                                                             |

See all possible options in the [Python SDK docs](https://reference.prefect.io/prefect/tasks/#prefect.tasks.task).

For example, provide optional `name` and `description` arguments to a task:

```python  theme={null}
from prefect import task


@task(name="hello-task", description="This task says hello.")
def my_task():
    print("Hello, I'm a task")
```

Distinguish runs of this task by providing a `task_run_name`.\
Python's standard string formatting syntax applies:

```python  theme={null}
import datetime
from prefect import flow, task


@task(name="My Example Task", 
      description="An example task for a tutorial.",
      task_run_name="hello-{name}-on-{date:%A}")
def my_task(name, date):
    pass


@flow
def my_flow():
    # creates a run with a name like "hello-marvin-on-Thursday"
    my_task(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))

if __name__ == "__main__":
    my_flow()
```

Additionally, this setting accepts a function that returns a string for the task run name:

```python  theme={null}
import datetime
from prefect import flow, task


def generate_task_name():
    date = datetime.datetime.now(datetime.timezone.utc)
    return f"{date:%A}-is-a-lovely-day"


@task(name="My Example Task",
      description="An example task for the docs.",
      task_run_name=generate_task_name)
def my_task(name):
    pass


@flow
def my_flow():
    # creates a run with a name like "Thursday-is-a-lovely-day"
    my_task(name="marvin")


if __name__ == "__main__":
    my_flow()  
```


Built with [Mintlify](https://mintlify.com).