Use task tags to limit how many tasks can run at the same time. This is useful when tasks interact with shared resources like databases or external APIs.
from prefect import flow, task

@task(tags=["database"])
def fetch_data():
    # This task will respect the "database" tag's concurrency limit
    return "data"

@flow
def my_workflow():
    fetch_data()
For more details on how tag-based concurrency limits work, see the tag-based concurrency limits concept page.
As of Prefect 3.4.19, tag-based concurrency limits are backed by global concurrency limits. When you create a tag-based limit, you’ll see a corresponding global concurrency limit with the name tag:{tag_name}.
Tags without concurrency limits allow unlimited concurrent runs. Once you set a limit, only that many tasks with the tag can run simultaneously.
from prefect import flow, task

@task(tags=["database"])
def query_database(query: str):
    # Simulate database work
    return f"Results for {query}"

@flow
def data_pipeline():
    # These will respect the "database" tag limit
    query_database("SELECT * FROM users")
    query_database("SELECT * FROM orders")
    query_database("SELECT * FROM products")
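To make the effect of a limit concrete, here is a minimal conceptual sketch that models a tag limit with a plain `asyncio.Semaphore`. It does not use Prefect and is not how Prefect implements limits; the names `run_with_limit` and `tagged_task` are hypothetical and exist only for illustration.

```python
import asyncio


async def run_with_limit(n_tasks: int, limit: int) -> int:
    """Run n_tasks coroutines under a shared limit and return the peak number running at once."""
    slots = asyncio.Semaphore(limit)  # stands in for the tag's concurrency limit
    running = 0
    peak = 0

    async def tagged_task() -> None:
        nonlocal running, peak
        async with slots:  # wait here until a slot is free
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulate work on the shared resource
            running -= 1

    await asyncio.gather(*(tagged_task() for _ in range(n_tasks)))
    return peak


peak = asyncio.run(run_with_limit(n_tasks=10, limit=3))
print(f"peak concurrency: {peak}")
```

In this model, ten tasks share three slots, so the peak never exceeds the limit no matter how many tasks are started.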
When a task has multiple tags, it must have available concurrency slots for all tags to run.
from prefect import task

@task(tags=["database", "analytics"])
def complex_query():
    # This task needs available slots in both "database" AND "analytics" limits
    return "complex results"
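The "all tags must have a free slot" rule can be sketched as a simple check. This is a simplified model under stated assumptions, not Prefect's actual logic: `can_start`, `limits`, and `active` are hypothetical names, and slot counts are held in plain dictionaries.

```python
def can_start(task_tags, limits, active):
    """Return True if every limited tag on the task has a free slot."""
    for tag in task_tags:
        limit = limits.get(tag)
        if limit is None:
            continue  # tags without a limit never block a task
        if active.get(tag, 0) >= limit:
            return False  # one exhausted tag is enough to delay the task
    return True


limits = {"database": 2, "analytics": 1}  # configured limits per tag
active = {"database": 1, "analytics": 1}  # slots currently in use per tag

print(can_start(["database"], limits, active))
print(can_start(["database", "analytics"], limits, active))
```

Here a task tagged only "database" could start, while a task tagged with both "database" and "analytics" would be delayed because the "analytics" limit is already exhausted.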
You can configure limits using the CLI, Python client, API, Terraform, or the Prefect UI.
CLI
# Set a limit of 10 for the "database" tag
prefect concurrency-limit create database 10
# View all concurrency limits
prefect concurrency-limit ls
# View details about a specific tag's limit
prefect concurrency-limit inspect database
# Delete a concurrency limit
prefect concurrency-limit delete database
Python client
import asyncio

from prefect import get_client

async def main():
    # "async with" must run inside a coroutine
    async with get_client() as client:
        # Set a concurrency limit of 10 on the "database" tag
        await client.create_concurrency_limit(
            tag="database",
            concurrency_limit=10,
        )
        # Read the current limit for a tag
        limit = await client.read_concurrency_limit_by_tag(tag="database")
        # Delete a concurrency limit
        await client.delete_concurrency_limit_by_tag(tag="database")
        # View all concurrency limits
        limits = await client.read_concurrency_limits()

asyncio.run(main())
API
# Create a concurrency limit
curl -X POST "http://localhost:4200/api/concurrency_limits/" \
-H "Content-Type: application/json" \
-d '{"tag": "database", "concurrency_limit": 10}'
# Get all concurrency limits
curl "http://localhost:4200/api/concurrency_limits/"
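For readers who prefer Python over curl, the create request above could be assembled with the standard library as in the sketch below. The endpoint, payload, and header are taken from the curl example; the helper name `build_create_limit_request` and the `API_URL` constant are hypothetical. The request is only built here, not sent, since sending requires a running Prefect server.

```python
import json
import urllib.request

API_URL = "http://localhost:4200/api"  # same local server address as the curl examples


def build_create_limit_request(tag: str, concurrency_limit: int) -> urllib.request.Request:
    """Build (but do not send) a POST request that creates a tag-based concurrency limit."""
    payload = json.dumps({"tag": tag, "concurrency_limit": concurrency_limit}).encode("utf-8")
    return urllib.request.Request(
        url=f"{API_URL}/concurrency_limits/",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_limit_request("database", 10)
print(request.get_method(), request.full_url)

# To actually send it (requires a running Prefect server):
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```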
Terraform
resource "prefect_concurrency_limit" "database_limit" {
  tag               = "database"
  concurrency_limit = 10
}
When a concurrency limit delays a task, the task waits for a fixed interval before retrying. You can change that interval with the following setting:
prefect config set PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS=60
Note: this setting needs to be set on the Prefect server, not the Prefect client.
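The choice of wait time is a trade-off: a shorter interval fills freed slots sooner but makes delayed tasks retry more often. The sketch below illustrates this with a deliberately simplified model that assumes a delayed task retries at exact multiples of the wait time, starting from the moment it was first delayed. The function `first_retry_after` is hypothetical and is not part of Prefect.

```python
import math


def first_retry_after(slot_free_at: int, wait_seconds: int) -> int:
    """Return the time (in seconds) of the first retry at or after the slot frees.

    Assumes the task was first delayed at t=0 and retries every wait_seconds.
    """
    retries = max(1, math.ceil(slot_free_at / wait_seconds))
    return retries * wait_seconds


# A slot frees 45 seconds after the task was first delayed.
for wait in (10, 30, 60):
    start = first_retry_after(slot_free_at=45, wait_seconds=wait)
    print(f"wait={wait}s -> task starts at t={start}s, slot idle for {start - 45}s")
```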