This example demonstrates how to use background tasks in the context of a web application using Prefect for task submission, execution, monitoring, and result storage. We’ll build out an application using FastAPI to offer API endpoints to our clients, and task workers to execute the background tasks these endpoints defer.

Refer to the examples repository for the complete example’s source code.

This pattern is useful when you need to perform operations that are too long for a standard web request-response cycle, such as data processing, sending emails, or interacting with external APIs that might be slow.

Overview

This example will build out:

  • @prefect.task definitions representing the work you want to run in the background
  • A fastapi application providing API endpoints to:
    • Receive task parameters via POST request and submit the task to Prefect with .delay()
    • Allow polling for the task’s status via a GET request using its task_run_id
  • A Dockerfile to build a multi-stage image for the web app, Prefect server and task worker(s)
  • A compose.yaml to manage lifecycles of the web app, Prefect server and task worker(s)
├── Dockerfile
├── README.md
├── compose.yaml
├── pyproject.toml
├── src
│   └── foo
│       ├── __init__.py
│       ├── _internal/*.py
│       ├── api.py
│       └── task.py

You can follow along by cloning the examples repository or instead use uv to bootstrap a your own new project:

uv init --lib foo
uv add prefect marvin

This example application is structured as a library with a src/foo directory for portability and organization.

This example does not require:

  • Prefect Cloud
  • creating a Prefect Deployment
  • creating a work pool

Useful things to remember

  • You can call any Python code from your task definitions (including other flows and tasks!)
  • Prefect Results allow you to save/serialize the return value of your task definitions to your result storage (e.g. a local directory, S3, GCS, etc), enabling caching and idempotency.

Defining the background task

The core of the background processing is a Python function decorated with @prefect.task. This marks the function as a unit of work that Prefect can manage (e.g. observe, cache, retry, etc.)

src/foo/task.py
from typing import Any, TypeVar

import marvin

from prefect import task, Task
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.states import State
from prefect.task_worker import serve
from prefect.client.schemas.objects import TaskRun

T = TypeVar("T")


def _print_output(task: Task, task_run: TaskRun, state: State[T]):
    result = state.result()
    print(f"result type: {type(result)}")
    print(f"result: {result!r}")


@task(cache_policy=INPUTS + TASK_SOURCE, on_completion=[_print_output])
async def create_structured_output(data: Any, target: type[T], instructions: str) -> T:
    return await marvin.cast_async(
        data,
        target=target,
        instructions=instructions,
    )


def main():
    serve(create_structured_output)


if __name__ == "__main__":
    main()

Key details:

  • @task: Decorator to define our task we want to run in the background.
  • cache_policy: Caching based on INPUTS and TASK_SOURCE.
  • serve(create_structured_output): This function starts a task worker subscribed to newly delay()ed task runs.

Building the FastAPI application

The FastAPI application provides API endpoints to trigger the background task and check its status.

src/foo/api.py
import logging
from uuid import UUID

from fastapi import Depends, FastAPI, Response
from fastapi.responses import JSONResponse

from foo._internal import get_form_data, get_task_result, StructuredOutputRequest
from foo.task import create_structured_output

logger = logging.getLogger(__name__)

app = FastAPI()


@app.post("/tasks", status_code=202)
async def submit_task(
    form_data: StructuredOutputRequest = Depends(get_form_data),
) -> JSONResponse:
    """Submit a task to Prefect for background execution."""
    future = create_structured_output.delay(
        form_data.payload,
        target=form_data.target_type,
        instructions=form_data.instructions,
    )
    logger.info(f"Submitted task run: {future.task_run_id}")
    return {"task_run_id": str(future.task_run_id)}


@app.get("/tasks/{task_run_id}/status")
async def get_task_status_api(task_run_id: UUID) -> Response:
    """Checks the status of a submitted task run."""
    status, data = await get_task_result(task_run_id)

    response_data = {"task_run_id": str(task_run_id), "status": status}
    http_status_code = 200

    if status == "completed":
        response_data["result"] = data
    elif status == "error":
        response_data["message"] = data
        # Optionally set a different HTTP status for errors
    return JSONResponse(response_data, status_code=http_status_code)

Building the Docker Image

A multi-stage Dockerfile is used to create optimized images for each service (Prefect server, task worker, and web API). This approach helps keep image sizes small and separates build dependencies from runtime dependencies.

Dockerfile
# Stage 1: Base image with Python and uv
FROM --platform=linux/amd64 ghcr.io/astral-sh/uv:python3.12-bookworm-slim as base

WORKDIR /app

ENV UV_SYSTEM_PYTHON=1
ENV PATH="/root/.local/bin:$PATH"

COPY pyproject.toml uv.lock* ./

# Note: We install all dependencies needed for all stages here.
# A more optimized approach might separate dependencies per stage.
RUN --mount=type=cache,target=/root/.cache/uv \
    uv pip install --system -r pyproject.toml

COPY src/ /app/src

FROM base as server

CMD ["prefect", "server", "start"]

# --- Task Worker Stage --- #
FROM base as task

# Command to start the task worker by running the task script
# This script should call `prefect.task_worker.serve(...)`
CMD ["python", "src/foo/task.py"]

# --- API Stage --- #
FROM base as api

# Command to start the FastAPI server using uvicorn
CMD ["uvicorn", "src.foo.api:app", "--host", "0.0.0.0", "--port", "8000"]

Declaring the application services

We use compose.yaml to define and run the multi-container application, managing the lifecycles of the FastAPI web server, the Prefect API server, database and task worker(s).

compose.yaml
services:

  prefect-server:
    build:
      context: .
      target: server
    ports:
      - "4200:4200"
    volumes:
      - prefect-data:/root/.prefect # Persist Prefect DB
    environment:
      # Allow connections from other containers
      PREFECT_SERVER_API_HOST: 0.0.0.0
  # Task Worker
  task:
    build:
      context: .
      target:
    deploy:
      replicas: 1 # task workers are safely horizontally scalable (think redis stream consumer groups)
    volumes:
      # Mount storage for results
      - ./task-storage:/task-storage
    depends_on:
      prefect-server:
        condition: service_started
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
      PREFECT_LOCAL_STORAGE_PATH: /task-storage
      PREFECT_LOGGING_LOG_PRINTS: "true"
      PREFECT_RESULTS_PERSIST_BY_DEFAULT: "true"
      MARVIN_ENABLE_DEFAULT_PRINT_HANDLER: "false"
      OPENAI_API_KEY: ${OPENAI_API_KEY}

    develop:
      # Optionally watch for code changes for development
      watch:
        - action: sync
          path: .
          target: /app
          ignore:
            - .venv/
            - task-storage/
        - action: rebuild
          path: uv.lock

  api:
    build:
      context: .
      target: api
    volumes:
      # Mount storage for results
      - ./task-storage:/task-storage
    ports:
      - "8000:8000"
    depends_on:
      task:
        condition: service_started
      prefect-server:
        condition: service_started
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
      PREFECT_LOCAL_STORAGE_PATH: /task-storage
    develop:
      # Optionally watch for code changes for development
      watch:
        - action: sync
          path: .
          target: /app
          ignore:
            - .venv/
            - task-storage/
        - action: rebuild
          path: uv.lock

volumes:
  # Named volumes for data persistence
  prefect-data: {}
  task-storage: {}

In a production use-case, you’d likely want to:

Running this example

Assuming you have obtained the code (either by cloning the repository or using uv init as described previously) and are in the project directory:

  1. Prerequisites: Ensure Docker Desktop (or equivalent) with docker compose support is running.

  2. Build and Run Services: This example’s task uses marvin, which (by default) requires an OpenAI API key. Provide it as an environment variable when starting the services:

    OPENAI_API_KEY=<your-openai-api-key> docker compose up --build --watch
    

    This command will:

    • --build: Build the container images if they don’t exist or if the Dockerfile/context has changed.
    • --watch: Watch for changes in the project source code and automatically sync/rebuild services (useful for development).
    • Add --detach or -d to run the containers in the background.
  3. Access Services:

Cleaning up

docker compose down

# also remove the named volumes
docker compose down -v

Next Steps

This example provides a repeatable pattern for integrating Prefect-managed background tasks with any python web application. You can:

  • Explore the background tasks examples repository for more examples.
  • Adapt src/**/*.py to define and submit your specific web app and background tasks.
  • Configure Prefect settings (environment variables in compose.yaml) further, for example, using different result storage or logging levels.
  • Deploy these services to cloud infrastructure using managed container services.