Task run concurrency limits help prevent too many tasks from running simultaneously. For example, if many tasks across multiple flows are designed to interact with a database that only allows 10 connections.

Task run concurrency limits use task tags. You can specify an optional concurrency limit as the maximum number of concurrent task runs in a Running state for tasks with a given tag.

Tag-based task concurrency is different from Global concurrency limits, though they can be used to achieve similar outcomes. Global concurrency limits are a more general way to control concurrency for any Python-based operation, whereas tag-based concurrency limits are specific to Prefect tasks.

If a task has multiple tags, it will run only if all tags have available concurrency.

Tags without specified concurrency limits are treated as unlimited. Setting a tag’s concurrency limit to 0 causes immediate abortion of any task runs with that tag, rather than delaying them.

Execution behavior

Task tag limits are checked whenever a task run attempts to enter a Running state.

If there are no concurrency slots available for any one of your task’s tags, it delays the transition to a Running state and instructs the client to try entering a Running state again in 30 seconds (or the value specified by the PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS setting).

Configure task run concurrency limits

Flow run concurrency limits are set at a work pool, work queue, or deployment level

While task run concurrency limits are configured through tags (as shown below), flow run concurrency limits are configured through work pools and/or work queues.

You can set concurrency limits on as few or as many tags as you wish. You can set limits through:

CLI

You can manage task run concurrency with the Prefect CLI.

prefect concurrency-limit [command] [arguments]
CommandDescription
createCreate a concurrency limit by specifying a tag and limit.
deleteDelete the concurrency limit set on the specified tag.
inspectView details about a concurrency limit set on the specified tag.
lsView all defined concurrency limits.

For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:

prefect concurrency-limit create small_instance 10

To delete the concurrency limit on the ‘small_instance’ tag:

prefect concurrency-limit delete small_instance

To view details about the concurrency limit on the ‘small_instance’ tag:

prefect concurrency-limit inspect small_instance

API

You can manage task run concurrency with the Prefect API.

Terraform

You can manage task run concurrency with the Terraform provider for Prefect.

Python client

To update your tag concurrency limits programmatically, use PrefectClient.orchestration.create_concurrency_limit.

create_concurrency_limit takes two arguments:

  • tag specifies the task tag on which you’re setting a limit.
  • concurrency_limit specifies the maximum number of concurrent task runs for that tag.

For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:

from prefect import get_client

async with get_client() as client:
    # set a concurrency limit of 10 on the 'small_instance' tag
    limit_id = await client.create_concurrency_limit(
        tag="small_instance", 
        concurrency_limit=10
        )

To remove all concurrency limits on a tag, use PrefectClient.delete_concurrency_limit_by_tag, passing the tag:

async with get_client() as client:
    # remove a concurrency limit on the 'small_instance' tag
    await client.delete_concurrency_limit_by_tag(tag="small_instance")

If you wish to query for the current limit on a tag, use PrefectClient.read_concurrency_limit_by_tag, passing the tag:

To see all of your limits across all of your tags, use PrefectClient.read_concurrency_limits.

async with get_client() as client:
    # query the concurrency limit on the 'small_instance' tag
    limit = await client.read_concurrency_limit_by_tag(tag="small_instance")