This example demonstrates how to run background tasks from a web application, using Prefect for task submission, execution, monitoring, and result storage. We'll build an application that uses FastAPI to offer API endpoints to our clients, and task workers to execute the background tasks those endpoints defer.
Refer to the examples repository for the complete example’s source code.
This pattern is useful when you need to perform operations that take too long for a standard web request-response cycle, such as data processing, sending emails, or interacting with slow external APIs.

Overview

This example will build out:
  • @prefect.task definitions representing the work you want to run in the background
  • A FastAPI application providing API endpoints to:
    • Receive task parameters via POST request and submit the task to Prefect with .delay()
    • Allow polling for the task’s status via a GET request using its task_run_id
  • A Dockerfile with multiple build stages, producing images for the web app, Prefect server, and task worker(s)
  • A compose.yaml to manage the lifecycles of the web app, Prefect server, and task worker(s)
├── Dockerfile
├── README.md
├── compose.yaml
├── pyproject.toml
└── src
    └── foo
        ├── __init__.py
        ├── _internal/*.py
        ├── api.py
        └── task.py
You can follow along by cloning the examples repository, or instead use uv to bootstrap your own new project:
uv init --lib foo
uv add prefect marvin
This example application is structured as a library with a src/foo directory for portability and organization.
This example does not require:
  • Prefect Cloud
  • creating a Prefect Deployment
  • creating a work pool

Useful things to remember

  • You can call any Python code from your task definitions (including other flows and tasks!)
  • Prefect Results allow you to save/serialize the return values of your task definitions to result storage (e.g. a local directory, S3, GCS, etc.), enabling caching and idempotency.
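For example, persisting a result is a single flag on the task. A minimal sketch (the add task is illustrative; the storage location comes from your Prefect settings):
from prefect import task


# Persist this task's return value to the configured result storage
# (e.g. the directory set by PREFECT_LOCAL_STORAGE_PATH).
@task(persist_result=True)
def add(x: int, y: int) -> int:
    return x + y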

Defining the background task

The core of the background processing is a Python function decorated with @prefect.task. This marks the function as a unit of work that Prefect can manage (e.g. observe, cache, retry, etc.).
src/foo/task.py
from typing import Any, TypeVar

import marvin

from prefect import task, Task
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.states import State
from prefect.task_worker import serve
from prefect.client.schemas.objects import TaskRun

T = TypeVar("T")


def _print_output(task: Task, task_run: TaskRun, state: State[T]):
    # on_completion hook: receives the task, its run, and the final state
    result = state.result()
    print(f"result type: {type(result)}")
    print(f"result: {result!r}")


@task(cache_policy=INPUTS + TASK_SOURCE, on_completion=[_print_output])
async def create_structured_output(data: Any, target: type[T], instructions: str) -> T:
    return await marvin.cast_async(
        data,
        target=target,
        instructions=instructions,
    )


def main():
    serve(create_structured_output)


if __name__ == "__main__":
    main()
Key details:
  • @task: Decorator marking the function as a task we want to run in the background.
  • cache_policy: Caches results based on the task's INPUTS and TASK_SOURCE, so repeated calls with identical arguments reuse the persisted result instead of re-running.
  • on_completion=[_print_output]: A hook that runs after the task completes; here it prints the result.
  • serve(create_structured_output): Starts a task worker subscribed to newly .delay()ed runs of this task.
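With a worker serving the task, any Python process pointed at the same Prefect API can submit runs. A minimal sketch (the example arguments are illustrative):
from foo.task import create_structured_output

# Submit a run for a task worker to pick up; returns a future immediately.
future = create_structured_output.delay(
    "It was 72F and sunny in Chicago.",
    target=str,
    instructions="Summarize the weather in one sentence",
)
print(future.task_run_id)  # use this UUID to poll for status later
print(future.result())     # blocks until the worker finishes the run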

Building the FastAPI application

The FastAPI application provides API endpoints to trigger the background task and check its status.
src/foo/api.py
import logging
from uuid import UUID

from fastapi import Depends, FastAPI, Response
from fastapi.responses import JSONResponse

from foo._internal import get_form_data, get_task_result, StructuredOutputRequest
from foo.task import create_structured_output

logger = logging.getLogger(__name__)

app = FastAPI()


@app.post("/tasks", status_code=202)
async def submit_task(
    form_data: StructuredOutputRequest = Depends(get_form_data),
) -> JSONResponse:
    """Submit a task to Prefect for background execution."""
    future = create_structured_output.delay(
        form_data.payload,
        target=form_data.target_type,
        instructions=form_data.instructions,
    )
    logger.info(f"Submitted task run: {future.task_run_id}")
    return {"task_run_id": str(future.task_run_id)}


@app.get("/tasks/{task_run_id}/status")
async def get_task_status_api(task_run_id: UUID) -> Response:
    """Checks the status of a submitted task run."""
    status, data = await get_task_result(task_run_id)

    response_data = {"task_run_id": str(task_run_id), "status": status}
    http_status_code = 200

    if status == "completed":
        response_data["result"] = data
    elif status == "error":
        response_data["message"] = data
        # Optionally set a different HTTP status for errors
    return JSONResponse(response_data, status_code=http_status_code)
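The StructuredOutputRequest model and get_form_data dependency live in src/foo/_internal and are not reproduced in full here. A minimal sketch of what they might look like, with field names inferred from their usage above (see the example repo for the real implementation):
from typing import Any

from fastapi import Form
from pydantic import BaseModel


class StructuredOutputRequest(BaseModel):
    payload: Any
    target_type: type
    instructions: str


def get_form_data(
    payload: str = Form(...),
    target: str = Form("str"),
    instructions: str = Form(...),
) -> StructuredOutputRequest:
    # Map a form-supplied type name onto a Python type for marvin.cast_async
    types = {"str": str, "int": int, "float": float, "bool": bool}
    return StructuredOutputRequest(
        payload=payload,
        target_type=types.get(target, str),
        instructions=instructions,
    )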
The get_task_result helper function (in src/foo/_internal/_prefect.py) uses the Prefect Python client to interact with the Prefect API:
src/foo/_internal/_prefect.py
from typing import Any, Literal, cast
from uuid import UUID

from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import TaskRun
from prefect.logging import get_logger

logger = get_logger(__name__)

Status = Literal["completed", "pending", "error"]


def _any_task_run_result(task_run: TaskRun) -> Any:
    try:
        return cast(Any, task_run.state.result(_sync=True))  # type: ignore
    except Exception as e:
        logger.warning(f"Could not retrieve result for task run {task_run.id}: {e}")
        return None


async def get_task_result(task_run_id: UUID) -> tuple[Status, Any]:
    """Get task result or status.

    Returns:
        tuple: (status, data)
            status: "completed", "pending", or "error"
            data: the result if completed, error message if error, None if pending
    """
    try:
        async with get_client() as client:
            task_run = await client.read_task_run(task_run_id)
            if not task_run.state:
                return "pending", None

            if task_run.state.is_completed():
                try:
                    result = _any_task_run_result(task_run)
                    return "completed", result
                except Exception as e:
                    logger.warning(
                        f"Could not retrieve result for completed task run {task_run_id}: {e}"
                    )
                    return "completed", "<Could not retrieve result>"

            elif task_run.state.is_failed():
                try:
                    error_result = _any_task_run_result(task_run)
                    error_message = (
                        str(error_result)
                        if error_result
                        else "Task failed without specific error message."
                    )
                    return "error", error_message
                except Exception as e:
                    logger.warning(
                        f"Could not retrieve error result for failed task run {task_run_id}: {e}"
                    )
                    return "error", "<Could not retrieve error message>"

            else:
                return "pending", None

    except Exception as e:
        logger.error(f"Error checking task status for {task_run_id}: {e}")
        return "error", f"Failed to check task status: {str(e)}"
This function fetches the TaskRun object from the API and checks its state to determine if it’s Completed, Failed, or still Pending/Running. If completed, it attempts to retrieve the result using task_run.state.result(). If failed, it tries to get the error message.
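Once the stack is up (see below), you can exercise these endpoints from the command line. The form field names here follow the sketch above and may differ from the example repo:
# Submit a task run; the response contains the task_run_id
curl -X POST http://localhost:8000/tasks \
  -F "payload=It was 72F and sunny in Chicago." \
  -F "target=str" \
  -F "instructions=Summarize the weather"

# Poll for status using the returned task_run_id
curl http://localhost:8000/tasks/<task_run_id>/status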

Building the Docker Image

A multi-stage Dockerfile is used to create optimized images for each service (Prefect server, task worker, and web API). This approach helps keep image sizes small and separates build dependencies from runtime dependencies.
Dockerfile
# Stage 1: Base image with Python and uv
FROM --platform=linux/amd64 ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS base

WORKDIR /app

ENV UV_SYSTEM_PYTHON=1
ENV PATH="/root/.local/bin:$PATH"

COPY pyproject.toml uv.lock* ./

# Note: We install all dependencies needed for all stages here.
# A more optimized approach might separate dependencies per stage.
RUN --mount=type=cache,target=/root/.cache/uv \
    uv pip install --system -r pyproject.toml

COPY src/ /app/src

# --- Server Stage --- #
FROM base AS server

CMD ["prefect", "server", "start"]

# --- Task Worker Stage --- #
FROM base AS task

# Command to start the task worker by running the task script
# This script should call `prefect.task_worker.serve(...)`
CMD ["python", "src/foo/task.py"]

# --- API Stage --- #
FROM base AS api

# Command to start the FastAPI server using uvicorn
CMD ["uvicorn", "src.foo.api:app", "--host", "0.0.0.0", "--port", "8000"]

  • Base Stage (base): Sets up Python, uv, installs all dependencies from pyproject.toml into a base layer to make use of Docker caching, and copies the source code.
  • Server Stage (server): Builds upon the base stage. Sets the default command (CMD) to start the Prefect server.
  • Task Worker Stage (task): Builds upon the base stage. Sets the CMD to run the src/foo/task.py script, which is expected to contain the serve() call for the task(s).
  • API Stage (api): Builds upon the base stage. Sets the CMD to start the FastAPI application using uvicorn.
The compose.yaml file then uses the target build argument to specify which of these final stages (server, task, api) to use for each service container.
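You can also build any single stage directly; for example (the image tag is arbitrary):
docker build --target task -t foo-task .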

Declaring the application services

We use compose.yaml to define and run the multi-container application, managing the lifecycles of the FastAPI web server, the Prefect API server (and its database), and the task worker(s).
compose.yaml
services:

  prefect-server:
    build:
      context: .
      target: server
    ports:
      - "4200:4200"
    volumes:
      - prefect-data:/root/.prefect # Persist Prefect DB
    environment:
      # Allow connections from other containers
      PREFECT_SERVER_API_HOST: 0.0.0.0
  # Task Worker
  task:
    build:
      context: .
      target: task
    deploy:
      replicas: 1 # task workers are safely horizontally scalable (think redis stream consumer groups)
    volumes:
      # Mount storage for results
      - ./task-storage:/task-storage
    depends_on:
      prefect-server:
        condition: service_started
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
      PREFECT_LOCAL_STORAGE_PATH: /task-storage
      PREFECT_LOGGING_LOG_PRINTS: "true"
      PREFECT_RESULTS_PERSIST_BY_DEFAULT: "true"
      MARVIN_ENABLE_DEFAULT_PRINT_HANDLER: "false"
      OPENAI_API_KEY: ${OPENAI_API_KEY}

    develop:
      # Optionally watch for code changes for development
      watch:
        - action: sync
          path: .
          target: /app
          ignore:
            - .venv/
            - task-storage/
        - action: rebuild
          path: uv.lock

  api:
    build:
      context: .
      target: api
    volumes:
      # Mount storage for results
      - ./task-storage:/task-storage
    ports:
      - "8000:8000"
    depends_on:
      task:
        condition: service_started
      prefect-server:
        condition: service_started
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
      PREFECT_LOCAL_STORAGE_PATH: /task-storage
    develop:
      # Optionally watch for code changes for development
      watch:
        - action: sync
          path: .
          target: /app
          ignore:
            - .venv/
            - task-storage/
        - action: rebuild
          path: uv.lock

volumes:
  # Named volumes for data persistence
  prefect-data: {}
  task-storage: {}
Breaking down each service:
  • prefect-server: Runs the Prefect API server and UI.
    • build: Targets the server stage of the multi-stage Dockerfile shown above.
    • ports: Exposes the Prefect API/UI on port 4200.
    • volumes: Uses a named volume prefect-data to persist the Prefect SQLite database (/root/.prefect/prefect.db) across container restarts.
    • PREFECT_SERVER_API_HOST=0.0.0.0: Makes the API server listen on all interfaces within the Docker network, allowing the task and api services to connect.
  • task: Runs the Prefect task worker process (executing python src/foo/task.py which calls serve).
    • build: Uses the task stage from the Dockerfile.
    • depends_on: Ensures the prefect-server service is started before this service attempts to connect.
    • PREFECT_API_URL: Crucial setting that tells the worker where to find the Prefect API to poll for submitted task runs.
    • PREFECT_LOCAL_STORAGE_PATH=/task-storage: Configures the worker to store task run results in the /task-storage directory inside the container. This path is bind-mounted from the host via volumes: - ./task-storage:/task-storage (you could instead use the task-storage named volume without a host path binding).
    • PREFECT_RESULTS_PERSIST_BY_DEFAULT=true: Tells Prefect tasks to automatically save their results using the configured storage (defined by PREFECT_LOCAL_STORAGE_PATH in this case).
    • PREFECT_LOGGING_LOG_PRINTS=true: Configures the Prefect logger to capture output from print() statements within tasks.
    • OPENAI_API_KEY=${OPENAI_API_KEY}: Passes secrets needed by the task code from the host environment (via a .env file loaded by Docker Compose) into the container’s environment.
  • api: Runs the FastAPI web application.
    • build: Uses the api stage from the Dockerfile.
    • depends_on: Waits for the prefect-server (required for submitting tasks and checking status) and the task worker to start.
    • PREFECT_API_URL: Tells the FastAPI application where to send .delay() calls and status check requests.
    • PREFECT_LOCAL_STORAGE_PATH: May be needed if the API itself needs to directly read result files (though typically fetching results via task_run.state.result() is preferred).
  • volumes: Defines named volumes (prefect-data, task-storage) to persist data generated by the containers.
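Since Docker Compose automatically loads a .env file from the project directory, you can keep the key out of your shell history by creating one (the value shown is a placeholder):
# .env
OPENAI_API_KEY=sk-...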

Running this example

Assuming you have obtained the code (either by cloning the repository or using uv init as described previously) and are in the project directory:
  1. Prerequisites: Ensure Docker Desktop (or equivalent) with docker compose support is running.
  2. Build and Run Services: This example’s task uses marvin, which (by default) requires an OpenAI API key. Provide it as an environment variable when starting the services:
    OPENAI_API_KEY=<your-openai-api-key> docker compose up --build --watch
    
    This command will:
    • --build: Build the container images if they don’t exist or if the Dockerfile/context has changed.
    • --watch: Watch for changes in the project source code and automatically sync/rebuild services (useful for development).
    • Add --detach or -d to run the containers in the background.
  3. Access Services:
    • FastAPI application: http://localhost:8000 (interactive docs at http://localhost:8000/docs)
    • Prefect UI: http://localhost:4200

Cleaning up

# stop and remove the containers
docker compose down

# also remove the named volumes
docker compose down -v

Next Steps

This example provides a repeatable pattern for integrating Prefect-managed background tasks with any Python web application. You can:
  • Explore the background tasks examples repository for more examples.
  • Adapt src/**/*.py to define and submit your specific web app and background tasks.
  • Further configure Prefect settings (environment variables in compose.yaml), for example to use different result storage or logging levels.
  • Deploy these services to cloud infrastructure using managed container services.