Skip to content

prefect.client.cloud

CloudUnauthorizedError

Bases: PrefectException

Raised when the CloudClient receives a 401 or 403 from the Cloud API.

Source code in prefect/client/cloud.py
55
56
57
58
class CloudUnauthorizedError(PrefectException):
    """
    Signals that the Cloud API answered a `CloudClient` request with an
    HTTP 401 (unauthorized) or 403 (forbidden) status.
    """

CloudClient

Source code in prefect/client/cloud.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class CloudClient:
    """
    Asynchronous client for talking to the Prefect Cloud API.

    Wraps a `PrefectHttpxAsyncClient` and must be used as an async context
    manager (`async with CloudClient(...)`); entering it with a plain `with`
    raises a `RuntimeError`.
    """

    def __init__(
        self,
        host: str,
        api_key: str,
        httpx_settings: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Configure the underlying HTTP client.

        Args:
            host: Used as the client's `base_url` unless `httpx_settings`
                already provides one.
            api_key: Sent as a `Bearer` token in the `Authorization` header
                unless `httpx_settings` already provides that header.
            httpx_settings: Optional keyword arguments forwarded to
                `PrefectHttpxAsyncClient`. The mapping passed in is not
                modified.
        """
        # Work on copies: the defaults filled in below (including the
        # Authorization header carrying the API key) must not leak back into
        # the caller's dict or its nested headers mapping.
        httpx_settings = dict(httpx_settings) if httpx_settings else dict()
        headers = httpx_settings.get("headers")
        headers = dict() if headers is None else headers.copy()
        headers.setdefault("Authorization", f"Bearer {api_key}")
        httpx_settings["headers"] = headers

        httpx_settings.setdefault("base_url", host)
        # Redirect following is only defaulted on outside of unit test mode.
        if not PREFECT_UNIT_TEST_MODE.value():
            httpx_settings.setdefault("follow_redirects", True)
        self._client = PrefectHttpxAsyncClient(
            **httpx_settings, enable_csrf_support=False
        )

    async def api_healthcheck(self):
        """
        Attempts to connect to the Cloud API and raises the encountered exception if not
        successful.

        The attempt is a `read_workspaces` call wrapped in a 10 second
        `anyio.fail_after` scope.

        If successful, returns `None`.
        """
        with anyio.fail_after(10):
            await self.read_workspaces()

    async def read_workspaces(self) -> List[Workspace]:
        """
        Fetch `/me/workspaces` and parse the response into `Workspace` objects.
        """
        workspaces = pydantic.parse_obj_as(
            List[Workspace], await self.get("/me/workspaces")
        )
        return workspaces

    async def read_worker_metadata(self) -> Dict[str, Any]:
        """
        Fetch the `collections/work_pool_types` document for the workspace
        identified by the configured `PREFECT_API_URL`.

        The account and workspace ids are taken from the first match of
        `PARSE_API_URL_REGEX` against the configured URL.
        """
        configured_url = prefect.settings.PREFECT_API_URL.value()
        # NOTE(review): a URL with no match makes the `[0]` lookup raise
        # IndexError; presumably callers only use this with a Cloud
        # workspace URL configured -- confirm.
        account_id, workspace_id = re.findall(PARSE_API_URL_REGEX, configured_url)[0]
        return await self.get(
            f"accounts/{account_id}/workspaces/{workspace_id}/collections/work_pool_types"
        )

    async def __aenter__(self):
        """Enter the underlying HTTP client's async context and return `self`."""
        await self._client.__aenter__()
        return self

    async def __aexit__(self, *exc_info):
        """Exit the underlying HTTP client's async context."""
        return await self._client.__aexit__(*exc_info)

    def __enter__(self):
        """Always raises: the client only supports `async with`."""
        raise RuntimeError(
            "The `CloudClient` must be entered with an async context. Use 'async "
            "with CloudClient(...)' not 'with CloudClient(...)'"
        )

    def __exit__(self, *_):
        """Defined only so that `__enter__` can exist; never expected to run."""
        assert False, "This should never be called but must be defined for __enter__"

    async def get(self, route, **kwargs):
        """Send a GET request to `route`; see `request` for the return value."""
        return await self.request("GET", route, **kwargs)

    async def request(self, method, route, **kwargs):
        """
        Send a request through the underlying client and decode the response.

        Args:
            method: HTTP method name, e.g. `"GET"`.
            route: Path passed to the underlying client's `request`.
            **kwargs: Forwarded unchanged to the underlying client's `request`.

        Returns:
            The decoded JSON body, or `None` for a 204 response.

        Raises:
            CloudUnauthorizedError: If the response status is 401 or 403.
            httpx.HTTPStatusError: For any other error status.
        """
        try:
            res = await self._client.request(method, route, **kwargs)
            res.raise_for_status()
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code in (
                status.HTTP_401_UNAUTHORIZED,
                status.HTTP_403_FORBIDDEN,
            ):
                # Keep the HTTP error's message and chain it as the cause so
                # the failing request is still visible in tracebacks.
                raise CloudUnauthorizedError(str(exc)) from exc
            # Bare `raise` re-raises the original error unchanged.
            raise

        if res.status_code == status.HTTP_204_NO_CONTENT:
            return

        return res.json()

api_healthcheck async

Attempts to connect to the Cloud API and raises the encountered exception if not successful.

If successful, returns None.

Source code in prefect/client/cloud.py
79
80
81
82
83
84
85
86
87
async def api_healthcheck(self):
    """
    Check that the Cloud API is reachable by listing workspaces.

    The `read_workspaces` call runs inside a 10 second `anyio.fail_after`
    scope, and any exception encountered is raised to the caller. The
    workspaces themselves are discarded.

    If successful, returns `None`.
    """
    timeout_seconds = 10
    with anyio.fail_after(timeout_seconds):
        await self.read_workspaces()

get_cloud_client

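Creates a CloudClient, falling back to Prefect settings for a host or API key that is not passed explicitly. When infer_cloud_url is set, the host is derived from PREFECT_API_URL instead.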

Source code in prefect/client/cloud.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def get_cloud_client(
    host: Optional[str] = None,
    api_key: Optional[str] = None,
    httpx_settings: Optional[dict] = None,
    infer_cloud_url: bool = False,
) -> "CloudClient":
    """
    Build a `CloudClient`, filling in unspecified values from Prefect settings.

    Args:
        host: Base URL for the client. Only used when `infer_cloud_url` is
            `False`; falls back to `PREFECT_CLOUD_API_URL` when omitted or
            empty.
        api_key: API key for the client; falls back to `PREFECT_API_KEY` when
            omitted or empty.
        httpx_settings: Optional settings passed on to `CloudClient`. A
            shallow copy is passed on rather than the caller's dict itself.
        infer_cloud_url: When anything other than `False`, `host` is ignored
            and the base URL is derived from `PREFECT_API_URL` by removing the
            part matched by `PARSE_API_URL_REGEX`.

    Returns:
        A new, un-entered `CloudClient`.

    Raises:
        ValueError: If `infer_cloud_url` is set but `PREFECT_API_URL` has no
            value to infer from.
    """
    if httpx_settings is not None:
        httpx_settings = httpx_settings.copy()

    if infer_cloud_url is False:
        host = host or PREFECT_CLOUD_API_URL.value()
    else:
        configured_url = prefect.settings.PREFECT_API_URL.value()
        if configured_url is None:
            # `re.sub` would otherwise fail with an opaque TypeError on None.
            raise ValueError(
                "`PREFECT_API_URL` must be set to infer the Cloud API URL."
            )
        host = re.sub(PARSE_API_URL_REGEX, "", configured_url)

    return CloudClient(
        host=host,
        api_key=api_key or PREFECT_API_KEY.value(),
        httpx_settings=httpx_settings,
    )