Skip to content

prefect.concurrency.asyncio

ConcurrencySlotAcquisitionError

Bases: Exception

Raised when an unhandlable error occurs while acquiring concurrency slots.

Source code in prefect/concurrency/asyncio.py
25
26
class ConcurrencySlotAcquisitionError(Exception):
    """Signal a failure to acquire concurrency slots.

    Raised when an unhandlable error occurs while acquiring concurrency slots.
    """

concurrency async

A context manager that acquires and releases concurrency slots from the given concurrency limits.

Parameters:

Name Type Description Default
names Union[str, List[str]]

The names of the concurrency limits to acquire slots from.

required
occupy int

The number of slots to acquire and hold from each limit.

1
timeout_seconds Optional[float]

The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely.

None

Raises:

Type Description
TimeoutError

If the slots are not acquired within the given timeout.

A simple example of using the async concurrency context manager:

from prefect.concurrency.asyncio import concurrency

async def resource_heavy():
    async with concurrency("test", occupy=1):
        print("Resource heavy task")

async def main():
    await resource_heavy()

Source code in prefect/concurrency/asyncio.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@asynccontextmanager
async def concurrency(
    names: Union[str, List[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
):
    """Hold slots on the given concurrency limits for the duration of a block.

    Slots are acquired when the context is entered and released when it is
    exited, whether or not the managed block raises.

    Args:
        names: The names of the concurrency limits to acquire slots from. A
            single name may be passed instead of a list.
        occupy: The number of slots to acquire and hold from each limit.
        timeout_seconds: The number of seconds to wait for the slots to be
            acquired before raising a `TimeoutError`. A timeout of `None`
            will wait indefinitely.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.

    Example:
    A simple example of using the async `concurrency` context manager:
    ```python
    from prefect.concurrency.asyncio import concurrency

    async def resource_heavy():
        async with concurrency("test", occupy=1):
            print("Resource heavy task")

    async def main():
        await resource_heavy()
    ```
    """
    # Anything that is not already a list is wrapped in a one-element list.
    if not isinstance(names, list):
        names = [names]

    # Only the acquisition call sits inside the timeout scope.
    with timeout_async(seconds=timeout_seconds):
        acquired = await _acquire_concurrency_slots(names, occupy)

    acquired_at = pendulum.now("UTC")
    acquire_events = _emit_concurrency_acquisition_events(acquired, occupy)

    try:
        yield
    finally:
        # Report how long the slots were held when handing them back.
        held_for = cast(Interval, pendulum.now("UTC") - acquired_at)
        held_seconds = held_for.total_seconds()
        await _release_concurrency_slots(names, occupy, held_seconds)
        _emit_concurrency_release_events(acquired, occupy, acquire_events)

rate_limit async

Block execution until an occupy number of slots of the concurrency limits given in names are acquired. Requires that all given concurrency limits have a slot decay.

Parameters:

Name Type Description Default
names Union[str, List[str]]

The names of the concurrency limits to acquire slots from.

required
occupy int

The number of slots to acquire and hold from each limit.

1
Source code in prefect/concurrency/asyncio.py
76
77
78
79
80
81
82
83
84
85
86
87
async def rate_limit(names: Union[str, List[str]], occupy: int = 1):
    """Wait until `occupy` slots are acquired from each limit in `names`.

    Slots are requested in `"rate_limit"` mode and are not explicitly
    released by this function. Requires that all given concurrency limits
    have a slot decay.

    Args:
        names: The names of the concurrency limits to acquire slots from. A
            single name may be passed instead of a list.
        occupy: The number of slots to acquire and hold from each limit.
    """
    # Anything that is not already a list is wrapped in a one-element list.
    if not isinstance(names, list):
        names = [names]

    acquired = await _acquire_concurrency_slots(names, occupy, mode="rate_limit")
    _emit_concurrency_acquisition_events(acquired, occupy)