Documentation Index
Fetch the complete documentation index at: https://docs.prefect.io/llms.txt
Use this file to discover all available pages before exploring further.
View on GitHub
When a worker runs multiple flow runs concurrently, those flow runs share the
worker machine’s resources—CPU, memory, GPU, local software. Some tasks may
need to limit how many can run at once to avoid overloading these resources.
The problem: Using --limit 1 on the worker forces entire flow runs to be
sequential. But often only specific tasks need limits—other tasks could overlap.
The solution: Use Global Concurrency Limits
with worker-specific names. GCLs are coordinated by the Prefect server, so they
work across the separate subprocesses that each flow run executes in.
Example: Image processing with ML inference
Consider a pipeline that processes images through an ML model:
- Download image — network-bound, can run many in parallel
- Run ML model — uses GPU memory, need to limit concurrent runs
- Save results — disk I/O, can run many in parallel
Without limits, if 5 flow runs hit the ML step simultaneously, they’d all try
to load the model into GPU memory and crash. With per-worker limits, only 1-2
run at a time while others wait.
Setup
import os
import time
from prefect import flow, get_run_logger, task
from prefect.concurrency.sync import concurrency
def get_worker_id() -> str:
"""
Get worker identity from environment.
Set this when starting the worker:
WORKER_ID=gpu-1 prefect worker start --pool ml-pool
"""
return os.getenv("WORKER_ID", "default")
Tasks without limits
These tasks don’t contend for limited resources, so they run freely.
@task
def download_image(image_id: int) -> dict:
"""Download an image from storage. Network-bound, no local resource contention."""
logger = get_run_logger()
logger.info(f"Image {image_id}: downloading...")
time.sleep(1) # simulate download
return {"image_id": image_id, "path": f"/tmp/image_{image_id}.jpg"}
@task
def save_results(data: dict) -> str:
"""Save processed results. Fast operation, no limits needed."""
logger = get_run_logger()
logger.info(f"Image {data['image_id']}: saving results...")
time.sleep(0.5)
return f"processed-{data['image_id']}"
Task with per-worker limit
This task uses a local resource (GPU) that can only handle limited concurrent
usage. The limit is scoped to this worker so each machine has independent limits.
@task
def run_ml_model(data: dict) -> dict:
"""
Run image through ML model.
GPU memory is limited—only 1-2 can run at once per worker machine.
Uses a Global Concurrency Limit scoped to this worker's identity.
"""
logger = get_run_logger()
worker_id = get_worker_id()
image_id = data["image_id"]
# Limit key includes worker ID: each worker has its own limit
with concurrency(f"gpu:{worker_id}", occupy=1):
logger.info(f"Image {image_id}: running ML inference (GPU)...")
time.sleep(3) # simulate model inference
return {**data, "predictions": [0.9, 0.1]}
The flow
@flow(log_prints=True)
def process_image(image_id: int = 1) -> str:
"""
Process an image through the ML pipeline.
When multiple instances run concurrently on the same worker:
- download and save tasks overlap freely
- run_ml_model tasks are limited by the per-worker GPU concurrency limit
"""
logger = get_run_logger()
logger.info(f"Processing image {image_id} on worker '{get_worker_id()}'")
image = download_image(image_id)
predictions = run_ml_model(image)
result = save_results(predictions)
return result
Running the example
1. Create a Global Concurrency Limit for each worker
Each worker machine needs its own limit. The limit value controls how many
ML tasks can run simultaneously on that machine.
# GPU machine 1: allow 2 concurrent ML tasks
prefect gcl create gpu:gpu-1 --limit 2
# GPU machine 2: allow 2 concurrent ML tasks
prefect gcl create gpu:gpu-2 --limit 2
2. Create work pool and deploy
prefect work-pool create ml-pool --type process
prefect deploy --all
3. Start workers with unique IDs
Each worker needs a unique ID that matches its GCL name:
# Machine 1
WORKER_ID=gpu-1 prefect worker start --pool ml-pool --limit 10
# Machine 2
WORKER_ID=gpu-2 prefect worker start --pool ml-pool --limit 10
The --limit 10 allows up to 10 concurrent flow runs, but the GCL ensures
only 2 are in the ML step at any time.
4. Submit jobs
for i in {1..20}; do
prefect deployment run process-image/process-image --param image_id=$i --timeout 0
done
What you’ll see
With 10 concurrent flow runs on a worker:
- Download tasks from all 10 start immediately and overlap
- ML tasks queue up—only 2 run at a time (per the GCL limit)
- Save tasks run as soon as their ML task completes
Flow runs aren’t blocked entirely—just the resource-intensive step is limited.
This maximizes throughput while protecting the GPU from overload.
Why this works
-
GCLs are server-coordinated — The Prefect server tracks who holds what
limit. It doesn’t matter that flow runs are separate processes.
-
Worker-specific names — By including
worker_id in the limit name,
each worker machine has independent limits. GPU-1’s limit doesn’t affect GPU-2.
-
Selective application — Only the tasks that need limits acquire them.
Everything else runs at full concurrency.
Adapting this pattern
The same pattern works for any local resource constraint:
- Software licenses: A tool that only allows N concurrent instances
- Memory-intensive processing: Limit concurrent jobs to avoid OOM
- Disk I/O: Limit concurrent writes to a local SSD
- Local services: A sidecar database with connection limits
Just change the limit name and value to match your constraint.
if __name__ == "__main__":
process_image(image_id=1)