Skip to main content
from prefect import task

@task(log_prints=True)
def explain_tasks():
    print("run any python code here!")
    print("but maybe just a little bit")

if __name__ == "__main__":
    explain_tasks()

What is a task?

Tasks are defined as decorated Python functions. Above, explain_tasks is an instance of a task. Tasks are cache-able and retryable units of work that are easy to execute concurrently, in parallel, and/or with transactional semantics. Like flows, tasks are free to call other tasks or flows, there is no required nesting pattern. Generally, tasks behave like normal Python functions, but they have some additional capabilities:
  • Metadata about task runs, such as run time and final state, is automatically tracked
  • Each state the task enters is recorded, enabling observability and state-based logic
  • Futures from upstream tasks are automatically resolved by downstream tasks
  • Retries can be performed on failure, with configurable delay and retry limits
  • Caching enables result reuse across workflow executions
  • Concurrency via .submit() and .map() allow concurrent execution within and across workflows
  • Timeouts can be enforced to prevent unintentional, long-running operations
Tasks are uniquely identified by a task key, which is a hash composed of the task name and the fully qualified name of the function.

Running a task

A task run is a representation of a single invocation of a task.

The life of a task run

Like flow runs, each task run has its own state lifecycle. Task states provide observability into execution progress and enable sophisticated runtime logic based on upstream outcomes. Like flow runs, each task run can be observed in the Prefect UI or CLI. A normal task run lifecycle looks like this:
Background tasks have an additional stateWhen using .delay(), background tasks start in a Scheduled state before transitioning to Pending. This allows them to be queued and distributed to available workers.

Different ways to create a task run

There are three primary ways to invoke a task, each suited to different execution patterns:

Calling a task directly (__call__)

The simplest way to create a task run is to call a @task decorated function directly, just like a normal Python function. This blocks the calling thread until the task completes and returns the result.
from prefect import task

@task
def add_integers(a: int, b: int) -> int:
    return a + b

if __name__ == "__main__":
    result = add_integers(1, 2)  # blocks until complete, returns 3
Use direct calls when you need the result immediately and don’t require concurrent execution.

Submitting a task (.submit())

Tasks may be submitted to a task runner for concurrent execution. Calling .submit() returns a PrefectFuture immediately, allowing the calling code to continue while the task runs in the background.
from prefect import flow, task

@task
def add_integers(a: int, b: int) -> int:
    return a + b

@flow
def my_flow():
    future = add_integers.submit(1, 2)  # returns immediately
    # do other work...
    result = future.result()  # blocks until complete, returns 3
The future should be resolved (via .result() or .wait()) from the same context that submitted it. Use .submit() when you want to run multiple tasks concurrently within a flow or need fine-grained control over when to wait for results. For more details on concurrent execution patterns, see how to run work concurrently.

Delaying a task (.delay())

When the result of a task is not required by the caller, it may be delayed to run on separate infrastructure by an available task worker. Like .submit(), calling .delay() returns a PrefectFuture immediately, but this future does not need to be resolved from the context that submitted it.
from prefect import task

@task
def send_email(to: str, subject: str):
    # send email...
    pass

# In a web endpoint handler:
future = send_email.delay("user@example.com", "Welcome!")  # returns immediately
# The HTTP response can return without waiting for the email to send
Use .delay() for fire-and-forget scenarios like web applications where you need to dispatch work without blocking the response, or when you want to scale task execution independently from the main application. For implementation details, see how to run background tasks.

Comparison summary

MethodBlocks?ReturnsResolve from same context?Use case
task()YesResultN/ASimple, sequential execution
task.submit()NoPrefectFutureYesConcurrent execution within a flow
task.delay()NoPrefectFutureNoBackground execution on separate infrastructure

Task orchestration model

Client-side orchestration

Prefect tasks are orchestrated client-side, which means that task runs are created and updated locally. This allows for efficient handling of large-scale workflows with many tasks and improves reliability when connectivity fails intermittently. Task updates are logged in batch, leading to eventual consistency for task states in the UI and API queries.

State dependencies

Tasks automatically resolve dependencies based on data flow between them. When a task receives the result or future of an upstream task as input, Prefect establishes an implicit state dependency such that a downstream task cannot begin until the upstream task has Completed. Explicit state dependencies can be introduced with the wait_for parameter.

Parameter resolution

When a task receives parameters, Prefect traverses their contents to resolve any PrefectFuture or State objects nested inside. If a field inside a dataclass or named tuple is resolved to a new value, Prefect reconstructs the container object using dataclasses.replace(), which calls __init__ and __post_init__ again on the new copy.
Pydantic models are reconstructed with model_construct(), which skips __init__ and validators. This behavior only affects dataclasses and named tuples.
This reconstruction can produce unexpected results when __post_init__ has side effects or derives values that are overwritten during the second initialization:
from dataclasses import dataclass

from prefect import flow, task


init_count = 0


@dataclass
class Pipeline:
    base_rate: float
    adjusted_rate: float = None

    def __post_init__(self):
        global init_count
        init_count += 1
        if self.adjusted_rate is None:
            self.adjusted_rate = self.base_rate * 1.5


@task
def get_rate() -> float:
    return 100.0


@task(log_prints=True)
def run_pipeline(pipeline: Pipeline):
    print(f"init_count inside task = {init_count}")
    print(f"adjusted_rate = {pipeline.adjusted_rate}")


@flow(log_prints=True)
def my_flow():
    rate_future = get_rate.submit()
    pipeline = Pipeline(base_rate=42.0, adjusted_rate=rate_future)
    print(f"init_count before task = {init_count}")
    run_pipeline(pipeline)


my_flow()
In this example, the adjusted_rate field holds a PrefectFuture. When pipeline is passed to run_pipeline, Prefect resolves that future to 100.0, which changes the field’s identity and triggers dataclasses.replace(). The replacement calls __post_init__ again—incrementing init_count a second time and, if the guard condition matched, potentially overwriting adjusted_rate. Use the quote and opaque annotations from prefect.utilities.annotations to control this behavior:
AnnotationResolves futuresTraverses contentsUse case
(none)YesYesDefault behavior; works for simple types
quoteNoNoPass a fully resolved object unchanged
opaqueYesNoResolve a future without reconstructing its contents

Skip resolution with quote

Wrap a parameter with quote to prevent Prefect from traversing or reconstructing the object. The task receives the original object unchanged:
from dataclasses import dataclass

from prefect import flow, task
from prefect.utilities.annotations import quote


@dataclass
class Pipeline:
    base_rate: float
    adjusted_rate: float = 0.0

    def __post_init__(self):
        self.adjusted_rate = self.base_rate * 1.5


@task(log_prints=True)
def run_pipeline(pipeline: Pipeline):
    print(f"adjusted_rate = {pipeline.adjusted_rate}")


@flow(log_prints=True)
def my_flow():
    pipeline = Pipeline(base_rate=100.0)
    run_pipeline(quote(pipeline))


my_flow()
Using quote disables automatic resolution of any PrefectFuture or State objects nested inside the wrapped value, and disables task dependency tracking for that parameter. Only use quote when passing a fully resolved object.

Resolve without traversal using opaque

If you need Prefect to resolve a PrefectFuture at the top level but want to skip the recursive traversal into the resolved object’s contents, use opaque. This is useful when chaining .submit() calls and the resolved result is a complex object that should not be reconstructed:
from dataclasses import dataclass

from prefect import flow, task
from prefect.utilities.annotations import opaque


@dataclass
class Pipeline:
    base_rate: float
    adjusted_rate: float = 0.0

    def __post_init__(self):
        self.adjusted_rate = self.base_rate * 1.5


@task
def build_pipeline(base_rate: float) -> Pipeline:
    return Pipeline(base_rate=base_rate)


@task(log_prints=True)
def run_pipeline(pipeline: Pipeline):
    print(f"adjusted_rate = {pipeline.adjusted_rate}")


@flow
def my_flow():
    future = build_pipeline.submit(100.0)
    run_pipeline.submit(opaque(future))


my_flow()

Task composition within flows

Tasks are typically organized into flows to create comprehensive workflows. Each task offers isolated observability within the Prefect UI. Task-level metrics, logs, and state information help identify bottlenecks and troubleshoot issues at a granular level. Tasks can also be reused across multiple flows, promoting consistency and modularity across an organization’s data ecosystem.
How big should a task be?Prefect encourages “small tasks.” As a rule of thumb, each task should represent a logical step or significant “side effect” in your workflow. This allows task-level observability and orchestration to narrate your workflow out-of-the-box.Narrative encapsulation
For detailed configuration options and implementation guidance, see how to write and run workflows.

Background tasks

Background tasks are an alternate task execution model where tasks are submitted in a non-blocking manner by one process and executed by a pool of processes. This execution model is particularly valuable for web applications and workflows that need to dispatch heavy or long-running work without waiting for completion to dedicated, horizontally scaled infrastructure. When a task is executed with .delay(), it pushes the resulting task run onto a server-side topic, which is distributed to an available task worker for execution.
Prefect background tasks can be used in place of tools like Celery and RabbitMQ for task queue functionality.
Background tasks are useful for scenarios such as:
  • Web applications that need to trigger long-running processes without blocking HTTP responses
  • Workflows that dispatch work to specialized infrastructure or resource pools
  • Systems that need to scale task execution independently from the main application
For implementation details, see how to run background tasks.