prefect.orion.utilities.server
¶
Utilities for the Orion API server.
OrionAPIRoute
¶
A FastAPI APIRoute class which attaches an async stack to requests that exits before a response is returned.
Requests already have request.scope['fastapi_astack']
which is an async stack for
the full scope of the request. This stack is used for managing contexts of FastAPI
dependencies. If we want to close a dependency before the request is complete
(i.e. before returning a response to the user), we need a stack with a different
scope. This extension adds this stack at request.state.response_scoped_stack
.
Source code in prefect/orion/utilities/server.py
class OrionAPIRoute(APIRoute):
"""
A FastAPI APIRoute class which attaches an async stack to requests that exits before
a response is returned.
Requests already have `request.scope['fastapi_astack']` which is an async stack for
the full scope of the request. This stack is used for managing contexts of FastAPI
dependencies. If we want to close a dependency before the request is complete
(i.e. before returning a response to the user), we need a stack with a different
scope. This extension adds this stack at `request.state.response_scoped_stack`.
"""
def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:
default_handler = super().get_route_handler()
async def handle_response_scoped_depends(request: Request) -> Response:
# Create a new stack scoped to exit before the response is returned
async with AsyncExitStack() as stack:
request.state.response_scoped_stack = stack
response = await default_handler(request)
return response
return handle_response_scoped_depends
OrionRouter
¶
A base class for Orion API routers.
Source code in prefect/orion/utilities/server.py
class OrionRouter(APIRouter):
"""
A base class for Orion API routers.
"""
def __init__(self, **kwargs: Any) -> None:
kwargs.setdefault("route_class", OrionAPIRoute)
super().__init__(**kwargs)
def add_api_route(
self, path: str, endpoint: Callable[..., Any], **kwargs: Any
) -> None:
"""
Add an API route.
For routes that return content and have not specified a `response_model`,
use return type annotation to infer the response model.
For routes that return No-Content status codes, explicitly set
a `response_class` to ensure nothing is returned in the response body.
"""
if kwargs.get("status_code") == status.HTTP_204_NO_CONTENT:
# any routes that return No-Content status codes must
# explicilty set a response_class that will handle status codes
# and not return anything in the body
kwargs["response_class"] = Response
if kwargs.get("response_model") is None:
kwargs["response_model"] = get_type_hints(endpoint).get("return")
return super().add_api_route(path, endpoint, **kwargs)
OrionRouter.add_api_route
¶
Add an API route.
For routes that return content and have not specified a response_model
,
use return type annotation to infer the response model.
For routes that return No-Content status codes, explicitly set
a response_class
to ensure nothing is returned in the response body.
Source code in prefect/orion/utilities/server.py
def add_api_route(
self, path: str, endpoint: Callable[..., Any], **kwargs: Any
) -> None:
"""
Add an API route.
For routes that return content and have not specified a `response_model`,
use return type annotation to infer the response model.
For routes that return No-Content status codes, explicitly set
a `response_class` to ensure nothing is returned in the response body.
"""
if kwargs.get("status_code") == status.HTTP_204_NO_CONTENT:
# any routes that return No-Content status codes must
# explicilty set a response_class that will handle status codes
# and not return anything in the body
kwargs["response_class"] = Response
if kwargs.get("response_model") is None:
kwargs["response_model"] = get_type_hints(endpoint).get("return")
return super().add_api_route(path, endpoint, **kwargs)
method_paths_from_routes
¶
Generate a set of strings describing the given routes in the format:
For example, "GET /logs/"
Source code in prefect/orion/utilities/server.py
def method_paths_from_routes(routes: Iterable[APIRoute]) -> Set[str]:
"""
Generate a set of strings describing the given routes in the format: <method> <path>
For example, "GET /logs/"
"""
method_paths = set()
for route in routes:
for method in route.methods:
method_paths.add(f"{method} {route.path}")
return method_paths
response_scoped_dependency
¶
Ensure that this dependency closes before the response is returned to the client. By default, FastAPI closes dependencies after sending the response.
Uses an async stack that is exited before the response is returned. This is particularly useful for database sesssions which must be committed before the client can do more work.
Do not use a response-scoped dependency within a FastAPI background task.
Background tasks run after FastAPI sends the response, so a response-scoped dependency will already be closed. Use a normal FastAPI dependency instead.
Parameters:
Name | Description | Default |
---|---|---|
dependency |
An async callable. FastAPI dependencies may still be used. Callable |
required |
Returns:
Type | Description |
---|---|
A wrapped |
Source code in prefect/orion/utilities/server.py
def response_scoped_dependency(dependency: Callable):
"""
Ensure that this dependency closes before the response is returned to the client. By
default, FastAPI closes dependencies after sending the response.
Uses an async stack that is exited before the response is returned. This is
particularly useful for database sesssions which must be committed before the client
can do more work.
NOTE: Do not use a response-scoped dependency within a FastAPI background task.
Background tasks run after FastAPI sends the response, so a response-scoped
dependency will already be closed. Use a normal FastAPI dependency instead.
Args:
dependency: An async callable. FastAPI dependencies may still be used.
Returns:
A wrapped `dependency` which will push the `dependency` context manager onto
a stack when called.
"""
signature = inspect.signature(dependency)
async def wrapper(*args, request: Request, **kwargs):
# Replicate FastAPI behavior of auto-creating a context manager
if inspect.isasyncgenfunction(dependency):
context_manager = asynccontextmanager(dependency)
else:
context_manager = dependency
# Ensure request is provided if requested
if "request" in signature.parameters:
kwargs["request"] = request
# Enter the route handler provided stack that is closed before responding,
# return the value yielded by the wrapped dependency
return await request.state.response_scoped_stack.enter_async_context(
context_manager(*args, **kwargs)
)
# Ensure that the signature includes `request: Request` to ensure that FastAPI will
# inject the request as a dependency; maintain the old signature so those depends
# work
request_parameter = inspect.signature(wrapper).parameters["request"]
functools.update_wrapper(wrapper, dependency)
if "request" not in signature.parameters:
new_parameters = signature.parameters.copy()
new_parameters["request"] = request_parameter
wrapper.__signature__ = signature.replace(
parameters=tuple(new_parameters.values())
)
return wrapper