Use task tags to limit how many tasks can run at the same time. This is useful when tasks interact with shared resources like databases or external APIs.
from prefect import flow, task

@task(tags=["database"])
def fetch_data():
    # This task will respect the "database" tag's concurrency limit
    return "data"

@flow
def my_workflow():
    fetch_data()
For more details on how tag-based concurrency limits work, see the tag-based concurrency limits concept page.
As of Prefect 3.4.19, tag-based concurrency limits are backed by global concurrency limits. When you create a tag-based limit, you’ll see a corresponding global concurrency limit with the name tag:{tag_name}.

Set concurrency limits on tags

Tags without concurrency limits allow unlimited concurrent runs. Once you set a limit, only that many tasks with the tag can run simultaneously.
from prefect import flow, task

@task(tags=["database"])
def query_database(query: str):
    # Simulate database work
    return f"Results for {query}"

@flow
def data_pipeline():
    # These will respect the "database" tag limit
    query_database("SELECT * FROM users")
    query_database("SELECT * FROM orders")
    query_database("SELECT * FROM products")

Apply multiple tags to a task

When a task has multiple tags, it must have available concurrency slots for all tags to run.
from prefect import task

@task(tags=["database", "analytics"])
def complex_query():
    # This task needs available slots in both "database" AND "analytics" limits
    return "complex results"

Configure concurrency limits

You can configure limits using the CLI, Python client, API, Terraform, or the Prefect UI.

CLI

# Set a limit of 10 for the "database" tag
prefect concurrency-limit create database 10

# View all concurrency limits
prefect concurrency-limit ls

# View details about a specific tag's limit
prefect concurrency-limit inspect database

# Delete a concurrency limit
prefect concurrency-limit delete database

Python client

from prefect import get_client

async with get_client() as client:
    # Set a concurrency limit of 10 on the "database" tag
    await client.create_concurrency_limit(
        tag="database",
        concurrency_limit=10
    )

    # Read current limit for a tag
    limit = await client.read_concurrency_limit_by_tag(tag="database")

    # Delete a concurrency limit
    await client.delete_concurrency_limit_by_tag(tag="database")

    # View all concurrency limits
    limits = await client.read_concurrency_limits()

API

# Create a concurrency limit
curl -X POST "http://localhost:4200/api/concurrency_limits/" \
  -H "Content-Type: application/json" \
  -d '{"tag": "database", "concurrency_limit": 10}'

# Get all concurrency limits
curl "http://localhost:4200/api/concurrency_limits/"

Terraform

resource "prefect_concurrency_limit" "database_limit" {
  tag               = "database"
  concurrency_limit = 10
}

Configure globally

When a task is delayed due to a concurrency limit, it will wait for a set wait time before retrying. You can set the wait time for when tasks are delayed due to concurrency limits:
prefect config set PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS=60
Note: this setting needs to be set on the Prefect server, not the Prefect client.