Skip to content

prefect.client.utilities

Utilities for working with clients.

inject_client

Simple helper to provide a context-managed client to an asynchronous function.

The decorated function must take a client kwarg. If a client is passed when the function is called, it will be used instead of creating a new one, but it will not be context managed, as it is assumed that the caller is managing the context.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/client/utilities.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def inject_client(fn):
    """
    Decorator that supplies a `client` keyword argument to an async function.

    The wrapped function _must_ accept a `client` kwarg. The client handed to
    it is chosen in this order:

    1. A non-`None` `client` kwarg supplied by the caller.
    2. The client on the active flow run context, if its `_loop` equals the
       running event loop.
    3. The client on the active task run context, under the same condition.
    4. A new client from `get_client()`, entered as an async context manager
       for the duration of the call.

    Only the client created in case 4 is context managed here; in the other
    cases the client is passed through as-is, on the assumption that whoever
    owns it is managing its context.
    """

    @wraps(fn)
    async def with_injected_client(*args, **kwargs):
        # Imported at call time rather than at module import time.
        from prefect.client.orchestration import get_client
        from prefect.context import FlowRunContext, TaskRunContext

        # Flow run context takes precedence over task run context.
        run_contexts = (FlowRunContext.get(), TaskRunContext.get())

        # Take the caller's client (if any) out of kwargs; it is re-inserted
        # below once the final client has been decided.
        existing = kwargs.pop("client", None)

        if existing is None:
            for run_context in run_contexts:
                # Only reuse a context client whose loop matches the loop we
                # are currently running on.
                if run_context and run_context.client._loop == get_running_loop():
                    existing = run_context.client
                    break

        if existing is not None:
            # Nothing to manage: the client belongs to someone else.
            manager = asyncnullcontext()
        else:
            # No usable client anywhere, so create (and manage) a new one.
            manager = get_client()

        async with manager as created:
            kwargs["client"] = created or existing
            return await fn(*args, **kwargs)

    return with_injected_client