Deploy a web application powered by background tasks
Learn how to background heavy tasks from a web application to dedicated infrastructure.
This example demonstrates how to use background tasks in the context of a web application using Prefect for task submission, execution, monitoring, and result storage. We’ll build out an application using FastAPI to offer API endpoints to our clients, and task workers to execute the background tasks these endpoints defer.
This pattern is useful when you need to perform operations that are too long for a standard web request-response cycle, such as data processing, sending emails, or interacting with external APIs that might be slow.
You can call any Python code from your task definitions (including other flows and tasks!)
Prefect Results allow you to save/serialize the return value of your task definitions to your result storage (e.g. a local directory, S3, or GCS), enabling caching and idempotency.
The core of the background processing is a Python function decorated with @prefect.task. This marks the function as a unit of work that Prefect can manage (e.g. observe, cache, and retry).
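The task itself lives in src/foo/task.py. Its exact body isn't shown in this section, so the following is a minimal sketch of the shape such a file takes; the marvin.cast call and the precise signature are assumptions for illustration, not the example's verbatim implementation:

```python
"""src/foo/task.py — a minimal sketch; the marvin call is illustrative."""
from typing import Any

import marvin
from prefect import task
from prefect.task_worker import serve


@task
def create_structured_output(payload: str, target: Any, instructions: str) -> Any:
    # Results persist because PREFECT_RESULTS_PERSIST_BY_DEFAULT=true is set
    # in compose.yaml; @task(persist_result=True) would do the same per-task.
    return marvin.cast(payload, target=target, instructions=instructions)


if __name__ == "__main__":
    # Start a task worker that polls the Prefect API for runs of this task.
    # The `task` service in compose.yaml runs exactly this script.
    serve(create_structured_output)
```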
The FastAPI application provides API endpoints to trigger the background task and check its status.
src/foo/api.py
```python
import logging
from uuid import UUID

from fastapi import Depends, FastAPI, Response
from fastapi.responses import JSONResponse

from foo._internal import get_form_data, get_task_result, StructuredOutputRequest
from foo.task import create_structured_output

logger = logging.getLogger(__name__)

app = FastAPI()


@app.post("/tasks", status_code=202)
async def submit_task(
    form_data: StructuredOutputRequest = Depends(get_form_data),
) -> JSONResponse:
    """Submit a task to Prefect for background execution."""
    future = create_structured_output.delay(
        form_data.payload,
        target=form_data.target_type,
        instructions=form_data.instructions,
    )
    logger.info(f"Submitted task run: {future.task_run_id}")
    return {"task_run_id": str(future.task_run_id)}


@app.get("/tasks/{task_run_id}/status")
async def get_task_status_api(task_run_id: UUID) -> Response:
    """Checks the status of a submitted task run."""
    status, data = await get_task_result(task_run_id)

    response_data = {"task_run_id": str(task_run_id), "status": status}
    http_status_code = 200

    if status == "completed":
        response_data["result"] = data
    elif status == "error":
        response_data["message"] = data
        # Optionally set a different HTTP status for errors

    return JSONResponse(response_data, status_code=http_status_code)
```
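The API module imports StructuredOutputRequest and get_form_data from foo._internal, which aren't shown in this section. A hypothetical minimal implementation, assuming a simple multipart-form request model (the field names and the string-typed target are assumptions; the real example may map the target to a Python type):

```python
"""A hypothetical sketch of the form helpers imported by src/foo/api.py."""
from fastapi import Form
from pydantic import BaseModel


class StructuredOutputRequest(BaseModel):
    payload: str
    target_type: str
    instructions: str


def get_form_data(
    payload: str = Form(...),
    target_type: str = Form(...),
    instructions: str = Form(...),
) -> StructuredOutputRequest:
    # Build the request model from form fields (requires python-multipart).
    return StructuredOutputRequest(
        payload=payload,
        target_type=target_type,
        instructions=instructions,
    )
```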
The get_task_result helper function (in src/foo/_internal/_prefect.py) uses the Prefect Python client to interact with the Prefect API:
src/foo/_internal/_prefect.py
```python
from typing import Any, Literal, cast
from uuid import UUID

from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import TaskRun
from prefect.logging import get_logger

logger = get_logger(__name__)

Status = Literal["completed", "pending", "error"]


def _any_task_run_result(task_run: TaskRun) -> Any:
    try:
        return cast(Any, task_run.state.result(_sync=True))  # type: ignore
    except Exception as e:
        logger.warning(f"Could not retrieve result for task run {task_run.id}: {e}")
        return None


async def get_task_result(task_run_id: UUID) -> tuple[Status, Any]:
    """Get task result or status.

    Returns:
        tuple: (status, data)
            status: "completed", "pending", or "error"
            data: the result if completed, error message if error, None if pending
    """
    try:
        async with get_client() as client:
            task_run = await client.read_task_run(task_run_id)
            if not task_run.state:
                return "pending", None

            if task_run.state.is_completed():
                try:
                    result = _any_task_run_result(task_run)
                    return "completed", result
                except Exception as e:
                    logger.warning(
                        f"Could not retrieve result for completed task run {task_run_id}: {e}"
                    )
                    return "completed", "<Could not retrieve result>"

            elif task_run.state.is_failed():
                try:
                    error_result = _any_task_run_result(task_run)
                    error_message = (
                        str(error_result)
                        if error_result
                        else "Task failed without specific error message."
                    )
                    return "error", error_message
                except Exception as e:
                    logger.warning(
                        f"Could not retrieve error result for failed task run {task_run_id}: {e}"
                    )
                    return "error", "<Could not retrieve error message>"

            else:
                return "pending", None

    except Exception as e:
        logger.error(f"Error checking task status for {task_run_id}: {e}")
        return "error", f"Failed to check task status: {str(e)}"
```
This function fetches the TaskRun object from the API and checks its state to determine if it’s Completed, Failed, or still Pending/Running. If completed, it attempts to retrieve the result using task_run.state.result(). If failed, it tries to get the error message.
A multi-stage Dockerfile is used to create optimized images for each service (Prefect server, task worker, and web API). This approach helps keep image sizes small and separates build dependencies from runtime dependencies.
Dockerfile
```dockerfile
# Stage 1: Base image with Python and uv
FROM --platform=linux/amd64 ghcr.io/astral-sh/uv:python3.12-bookworm-slim as base

WORKDIR /app

ENV UV_SYSTEM_PYTHON=1
ENV PATH="/root/.local/bin:$PATH"

COPY pyproject.toml uv.lock* ./

# Note: We install all dependencies needed for all stages here.
# A more optimized approach might separate dependencies per stage.
RUN --mount=type=cache,target=/root/.cache/uv \
    uv pip install --system -r pyproject.toml

COPY src/ /app/src

# --- Prefect Server Stage --- #
FROM base as server

CMD ["prefect", "server", "start"]

# --- Task Worker Stage --- #
FROM base as task

# Command to start the task worker by running the task script
# This script should call `prefect.task_worker.serve(...)`
CMD ["python", "src/foo/task.py"]

# --- API Stage --- #
FROM base as api

# Command to start the FastAPI server using uvicorn
CMD ["uvicorn", "src.foo.api:app", "--host", "0.0.0.0", "--port", "8000"]
```
Base Stage (base): Sets up Python, uv, installs all dependencies from pyproject.toml into a base layer to make use of Docker caching, and copies the source code.
Server Stage (server): Builds upon the base stage. Sets the default command (CMD) to start the Prefect server.
Task Worker Stage (task): Builds upon the base stage. Sets the CMD to run the src/foo/task.py script, which is expected to contain the serve() call for the task(s).
API Stage (api): Builds upon the base stage. Sets the CMD to start the FastAPI application using uvicorn.
The compose.yaml file then uses the target build argument to specify which of these final stages (server, task, api) to use for each service container.
We use compose.yaml to define and run the multi-container application, managing the lifecycles of the FastAPI web server, the Prefect API server (and its database), and the task worker(s).
compose.yaml
```yaml
services:
  prefect-server:
    build:
      context: .
      target: server
    ports:
      - "4200:4200"
    volumes:
      - prefect-data:/root/.prefect  # Persist Prefect DB
    environment:
      # Allow connections from other containers
      PREFECT_SERVER_API_HOST: 0.0.0.0

  # Task Worker
  task:
    build:
      context: .
      target: task
    deploy:
      # task workers are safely horizontally scalable (think redis stream consumer groups)
      replicas: 1
    volumes:
      # Mount storage for results
      - ./task-storage:/task-storage
    depends_on:
      prefect-server:
        condition: service_started
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
      PREFECT_LOCAL_STORAGE_PATH: /task-storage
      PREFECT_LOGGING_LOG_PRINTS: "true"
      PREFECT_RESULTS_PERSIST_BY_DEFAULT: "true"
      MARVIN_ENABLE_DEFAULT_PRINT_HANDLER: "false"
      OPENAI_API_KEY: ${OPENAI_API_KEY}
    develop:
      # Optionally watch for code changes for development
      watch:
        - action: sync
          path: .
          target: /app
          ignore:
            - .venv/
            - task-storage/
        - action: rebuild
          path: uv.lock

  api:
    build:
      context: .
      target: api
    volumes:
      # Mount storage for results
      - ./task-storage:/task-storage
    ports:
      - "8000:8000"
    depends_on:
      task:
        condition: service_started
      prefect-server:
        condition: service_started
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
      PREFECT_LOCAL_STORAGE_PATH: /task-storage
    develop:
      # Optionally watch for code changes for development
      watch:
        - action: sync
          path: .
          target: /app
          ignore:
            - .venv/
            - task-storage/
        - action: rebuild
          path: uv.lock

volumes:
  # Named volumes for data persistence
  prefect-data: {}
  task-storage: {}
```
If you don't need hot reloading of code changes, you can remove the develop section from each service.
prefect-server: Runs the Prefect API server and UI.
build: Uses the server stage of the multi-stage Dockerfile shown above.
ports: Exposes the Prefect API/UI on port 4200.
volumes: Uses a named volume prefect-data to persist the Prefect SQLite database (/root/.prefect/prefect.db) across container restarts.
PREFECT_SERVER_API_HOST=0.0.0.0: Makes the API server listen on all interfaces within the Docker network, allowing the task and api services to connect.
task: Runs the Prefect task worker process (executing python src/foo/task.py which calls serve).
build: Uses the task stage from the Dockerfile.
depends_on: Ensures the prefect-server service is started before this service attempts to connect.
PREFECT_API_URL: Crucial setting that tells the worker where to find the Prefect API to poll for submitted task runs.
PREFECT_LOCAL_STORAGE_PATH=/task-storage: Configures the worker to store task run results in the /task-storage directory inside the container. This path is shared with the host via the bind mount ./task-storage:/task-storage (the named task-storage volume declared at the bottom of the file could be used instead, if you don't need the files accessible on the host).
PREFECT_RESULTS_PERSIST_BY_DEFAULT=true: Tells Prefect tasks to automatically save their results using the configured storage (defined by PREFECT_LOCAL_STORAGE_PATH in this case).
PREFECT_LOGGING_LOG_PRINTS=true: Configures the Prefect logger to capture output from print() statements within tasks.
OPENAI_API_KEY=${OPENAI_API_KEY}: Passes secrets needed by the task code from the host environment (via a .env file loaded by Docker Compose) into the container's environment; see the example .env sketch after this list.
api: Runs the FastAPI web application.
build: Uses the api stage from the Dockerfile.
depends_on: Waits for the prefect-server (required for submitting tasks and checking status) and the task worker (so that submitted task runs are picked up promptly).
PREFECT_API_URL: Tells the FastAPI application where to send .delay() calls and status check requests.
PREFECT_LOCAL_STORAGE_PATH: Lets the API container resolve persisted result files from the shared ./task-storage mount when calling task_run.state.result().
volumes: Defines named volumes (prefect-data, task-storage) to persist data generated by the containers.
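As referenced in the task service notes above, Docker Compose automatically loads a .env file located next to compose.yaml and uses it for ${...} substitution. A minimal sketch (the key value is a placeholder):

```bash
# .env — loaded automatically by docker compose; keep it out of version control
OPENAI_API_KEY=sk-your-key-here
```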
Assuming you have obtained the code (either by cloning the repository or using uv init as described previously) and are in the project directory:
Prerequisites: Ensure Docker Desktop (or equivalent) with docker compose support is running.
Build and Run Services:
This example’s task uses marvin, which (by default) requires an OpenAI API key. Provide it as an environment variable when starting the services:
```bash
OPENAI_API_KEY=<your-openai-api-key> docker compose up --build --watch
```
This command will:
--build: Build the container images if they don’t exist or if the Dockerfile/context has changed.
--watch: Watch for changes in the project source code and automatically sync/rebuild services (useful for development).
Add --detach or -d to run the containers in the background.
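Once the services are up, you can exercise the endpoints directly. A sketch with curl, assuming the form field names from the hypothetical get_form_data helper sketched earlier (adjust them to match your actual request model):

```bash
# Submit a task; returns {"task_run_id": "..."} with HTTP 202
curl -X POST http://localhost:8000/tasks \
  -F payload="Alice is 30 years old" \
  -F target_type="person" \
  -F instructions="Extract the person's name and age"

# Poll for the status/result using the returned ID
curl http://localhost:8000/tasks/<task_run_id>/status
```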