How to write and run a workflow
Define a workflow
Make a Python function a workflow by adding the @flow
decorator to it:
Run the workflow by calling it like a normal Python function:
You can create a workflow from a method on a class:
or from a generator function:
You can view all workflow runs in the Prefect UI.
Create tasks and child workflows
You can create tasks and child flows to organize your workflow logic.
Each task and child flow is a separate unit of work and is displayed in the Prefect UI.
Cancel a workflow if it runs for too long
To apply a timeout to a flow or task to prevent it from running for too long, use the timeout_seconds
keyword argument.
Configure flows and tasks
Flow configuration
All flows can be configured by passing arguments to the decorator. Flows accept the following optional settings:
Argument | Description |
---|---|
description | An optional string description for the flow. If not provided, the description is pulled from the docstring for the decorated function. |
name | An optional name for the flow. If not provided, the name is inferred from the function. |
retries | An optional number of times to retry on flow run failure. |
retry_delay_seconds | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if retries is nonzero. |
flow_run_name | An optional name to distinguish runs of this flow; this name can be provided as a string template with the flow’s parameters as variables; you can also provide this name as a function that returns a string. |
task_runner | An optional task runner to use for task execution within the flow when you .submit() tasks. If not provided and you .submit() tasks, the ThreadPoolTaskRunner is used. |
timeout_seconds | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it is marked as failed. Flow execution may continue until the next task is called. |
validate_parameters | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is True . |
version | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. |
Task configuration
Tasks allow for customization through optional arguments that can be provided to the task decorator.
Argument | Description |
---|---|
name | An optional name for the task. If not provided, the name is inferred from the function name. |
description | An optional string description for the task. If not provided, the description is pulled from the docstring for the decorated function. |
tags | An optional set of tags associated with runs of this task. These tags are combined with any tags defined by a prefect.tags context at task runtime. |
timeout_seconds | An optional number of seconds indicating a maximum runtime for the task. If the task exceeds this runtime, it will be marked as failed. |
cache_key_fn | An optional callable that, given the task run context and call parameters, generates a string key. If the key matches a previous completed state, that state result is restored instead of running the task again. |
cache_policy | An optional policy that determines what information is used to generate cache keys. Available policies include INPUTS , TASK_SOURCE , RUN_ID , FLOW_PARAMETERS , and NO_CACHE . Can be combined using the + operator. |
cache_expiration | An optional amount of time indicating how long cached states for this task are restorable; if not provided, cached states will never expire. |
retries | An optional number of times to retry on task run failure. |
retry_delay_seconds | An optional number of seconds to wait before retrying the task after failure. This is only applicable if retries is nonzero. |
log_prints | An optional boolean indicating whether to log print statements. |
See all possible options in the Python SDK docs.
For example, provide optional name
and description
arguments to a task:
Distinguish runs of this task by providing a task_run_name
.
Python’s standard string formatting syntax applies:
Additionally, this setting accepts a function that returns a string for the task run name: