Skip to content

prefect.orion.api.server

Defines the Orion FastAPI app.

SPAStaticFiles

Implementation of StaticFiles for serving single page applications.

Adds get_response handling to ensure that when a resource isn't found the application still returns the index.

Source code in prefect/orion/api/server.py
class SPAStaticFiles(StaticFiles):
    """
    Implementation of `StaticFiles` for serving single page applications.

    Adds `get_response` handling to ensure that when a resource isn't found the
    application still returns the index.
    """

    async def get_response(self, path: str, scope):
        try:
            return await super().get_response(path, scope)
        except HTTPException:
            return await super().get_response("./index.html", scope)

SPAStaticFiles.get_response async

Returns an HTTP response, given the incoming path, method and request headers.

Source code in prefect/orion/api/server.py
async def get_response(self, path: str, scope):
    try:
        return await super().get_response(path, scope)
    except HTTPException:
        return await super().get_response("./index.html", scope)

create_app

Create an FastAPI app that includes the Orion API and UI

Parameters:

Name Description Default
settings

The settings to use to create the app. If not set, settings are pulled from the context.

Settings
None
ignore_cache

If set, a new application will be created even if the settings match. Otherwise, an application is returned from the cache.

bool
False
ephemeral

If set, the application will be treated as ephemeral. The UI and services will be disabled.

bool
False
Source code in prefect/orion/api/server.py
def create_app(
    settings: prefect.settings.Settings = None,
    ephemeral: bool = False,
    ignore_cache: bool = False,
) -> FastAPI:
    """
    Create an FastAPI app that includes the Orion API and UI

    Args:
        settings: The settings to use to create the app. If not set, settings are pulled
            from the context.
        ignore_cache: If set, a new application will be created even if the settings
            match. Otherwise, an application is returned from the cache.
        ephemeral: If set, the application will be treated as ephemeral. The UI
            and services will be disabled.
    """
    settings = settings or prefect.settings.get_current_settings()
    cache_key = (settings, ephemeral)

    if cache_key in APP_CACHE and not ignore_cache:
        return APP_CACHE[cache_key]

    # TODO: Move these startup functions out of this closure into the top-level or
    #       another dedicated location
    async def run_migrations():
        """Ensure the database is created and up to date with the current migrations"""
        if prefect.settings.PREFECT_ORION_DATABASE_MIGRATE_ON_START:
            from prefect.orion.database.dependencies import provide_database_interface

            db = provide_database_interface()
            await db.create_db()

    @_memoize_block_auto_registration
    async def add_block_types():
        """Add all registered blocks to the database"""
        if not prefect.settings.PREFECT_ORION_BLOCKS_REGISTER_ON_START:
            return

        from prefect.orion.database.dependencies import provide_database_interface
        from prefect.orion.models.block_registration import run_block_auto_registration

        db = provide_database_interface()
        session = await db.session()

        try:
            async with session:
                await run_block_auto_registration(session=session)
        except Exception as exc:
            logger.warn(f"Error occurred during block auto-registration: {exc!r}")

    async def start_services():
        """Start additional services when the Orion API starts up."""

        if ephemeral:
            app.state.services = None
            return

        service_instances = []

        if prefect.settings.PREFECT_ORION_SERVICES_SCHEDULER_ENABLED.value():
            service_instances.append(services.scheduler.Scheduler())
            service_instances.append(services.scheduler.RecentDeploymentsScheduler())

        if prefect.settings.PREFECT_ORION_SERVICES_LATE_RUNS_ENABLED.value():
            service_instances.append(services.late_runs.MarkLateRuns())

        if prefect.settings.PREFECT_ORION_ANALYTICS_ENABLED.value():
            service_instances.append(services.telemetry.Telemetry())

        if (
            prefect.settings.PREFECT_ORION_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED.value()
        ):
            service_instances.append(
                services.flow_run_notifications.FlowRunNotifications()
            )

        loop = asyncio.get_running_loop()

        app.state.services = {
            service: loop.create_task(service.start()) for service in service_instances
        }

        for service, task in app.state.services.items():
            logger.info(f"{service.name} service scheduled to start in-app")
            task.add_done_callback(partial(on_service_exit, service))

    async def stop_services():
        """Ensure services are stopped before the Orion API shuts down."""
        if app.state.services:
            await asyncio.gather(*[service.stop() for service in app.state.services])
            try:
                await asyncio.gather(
                    *[task.stop() for task in app.state.services.values()]
                )
            except Exception as exc:
                # `on_service_exit` should handle logging exceptions on exit
                pass

    def on_service_exit(service, task):
        """
        Added as a callback for completion of services to log exit
        """
        try:
            # Retrieving the result will raise the exception
            task.result()
        except Exception:
            logger.error(f"{service.name} service failed!", exc_info=True)
        else:
            logger.info(f"{service.name} service stopped!")

    app = FastAPI(
        title=TITLE,
        version=API_VERSION,
        on_startup=[
            run_migrations,
            add_block_types,
            start_services,
        ],
        on_shutdown=[stop_services],
    )
    api_app = create_orion_api(
        fast_api_app_kwargs={
            "exception_handlers": {
                Exception: custom_internal_exception_handler,
                RequestValidationError: validation_exception_handler,
                sa.exc.IntegrityError: integrity_exception_handler,
                ObjectNotFoundError: prefect_object_not_found_exception_handler,
            }
        }
    )
    ui_app = create_ui_app(ephemeral)

    # middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )

    api_app.mount(
        "/static",
        StaticFiles(
            directory=os.path.join(
                os.path.dirname(os.path.realpath(__file__)), "static"
            )
        ),
        name="static",
    )
    app.mount("/api", app=api_app, name="api")
    app.mount("/", app=ui_app, name="ui")

    def openapi():
        """
        Convenience method for extracting the user facing OpenAPI schema from the API app.

        This method is attached to the global public app for easy access.
        """
        partial_schema = get_openapi(
            title=API_TITLE,
            version=API_VERSION,
            routes=api_app.routes,
        )
        new_schema = partial_schema.copy()
        new_schema["paths"] = {}
        for path, value in partial_schema["paths"].items():
            new_schema["paths"][f"/api{path}"] = value

        new_schema["info"]["x-logo"] = {"url": "static/prefect-logo-mark-gradient.png"}
        return new_schema

    app.openapi = openapi

    APP_CACHE[cache_key] = app
    return app

create_orion_api

Create a FastAPI app that includes the Orion API

Parameters:

Name Description Default
router_prefix

a prefix to apply to all included routers

Optional[str]
''
dependencies

a list of global dependencies to add to each Orion router

Optional[List[Depends]]
None
health_check_path

the health check route path

str
'/health'
fast_api_app_kwargs

kwargs to pass to the FastAPI constructor

dict
None
router_overrides

a mapping of route prefixes (i.e. "/admin") to new routers allowing the caller to override the default routers. If None is provided as a value, the default router will be dropped from the application.

Mapping[str, Optional[fastapi.routing.APIRouter]]
None

Returns:

Type Description
FastAPI

a FastAPI app that serves the Orion API

Source code in prefect/orion/api/server.py
def create_orion_api(
    router_prefix: Optional[str] = "",
    dependencies: Optional[List[Depends]] = None,
    health_check_path: str = "/health",
    fast_api_app_kwargs: dict = None,
    router_overrides: Mapping[str, Optional[APIRouter]] = None,
) -> FastAPI:
    """
    Create a FastAPI app that includes the Orion API

    Args:
        router_prefix: a prefix to apply to all included routers
        dependencies: a list of global dependencies to add to each Orion router
        health_check_path: the health check route path
        fast_api_app_kwargs: kwargs to pass to the FastAPI constructor
        router_overrides: a mapping of route prefixes (i.e. "/admin") to new routers
            allowing the caller to override the default routers. If `None` is provided
            as a value, the default router will be dropped from the application.

    Returns:
        a FastAPI app that serves the Orion API
    """
    fast_api_app_kwargs = fast_api_app_kwargs or {}
    api_app = FastAPI(title=API_TITLE, **fast_api_app_kwargs)

    @api_app.get(health_check_path)
    async def health_check():
        return True

    # always include version checking
    if dependencies is None:
        dependencies = [Depends(enforce_minimum_version)]
    else:
        dependencies.append(Depends(enforce_minimum_version))

    routers = {router.prefix: router for router in API_ROUTERS}

    if router_overrides:
        for prefix, router in router_overrides.items():

            # We may want to allow this behavior in the future to inject new routes, but
            # for now this will be treated an as an exception
            if prefix not in routers:
                raise KeyError(
                    f"Router override provided for prefix that does not exist: {prefix!r}"
                )

            # Drop the existing router
            existing_router = routers.pop(prefix)

            # Replace it with a new router if provided
            if router is not None:

                if prefix != router.prefix:
                    # We may want to allow this behavior in the future, but it will
                    # break expectations without additional routing and is banned for
                    # now
                    raise ValueError(
                        f"Router override for {prefix!r} defines a different prefix "
                        f"{router.prefix!r}."
                    )

                existing_paths = method_paths_from_routes(existing_router.routes)
                new_paths = method_paths_from_routes(router.routes)
                if not existing_paths.issubset(new_paths):
                    raise ValueError(
                        f"Router override for {prefix!r} is missing paths defined by "
                        f"the original router: {existing_paths.difference(new_paths)}"
                    )

                routers[prefix] = router

    for router in routers.values():
        api_app.include_router(router, prefix=router_prefix, dependencies=dependencies)

    return api_app

custom_internal_exception_handler async

Log a detailed exception for internal server errors before returning.

Source code in prefect/orion/api/server.py
async def custom_internal_exception_handler(request: Request, exc: Exception):
    """Log a detailed exception for internal server errors before returning."""
    logger.error(f"Encountered exception in request:", exc_info=True)
    return JSONResponse(
        content={"exception_message": "Internal Server Error"},
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
    )

integrity_exception_handler async

Capture database integrity errors.

Source code in prefect/orion/api/server.py
async def integrity_exception_handler(request: Request, exc: Exception):
    """Capture database integrity errors."""
    logger.error(f"Encountered exception in request:", exc_info=True)
    return JSONResponse(
        content={
            "detail": (
                "Data integrity conflict. This usually means a "
                "unique or foreign key constraint was violated. "
                "See server logs for details."
            )
        },
        status_code=status.HTTP_409_CONFLICT,
    )

prefect_object_not_found_exception_handler async

Return 404 status code on object not found exceptions.

Source code in prefect/orion/api/server.py
async def prefect_object_not_found_exception_handler(
    request: Request, exc: ObjectNotFoundError
):
    """Return 404 status code on object not found exceptions."""
    return JSONResponse(
        content={"exception_message": str(exc)}, status_code=status.HTTP_404_NOT_FOUND
    )

validation_exception_handler async

Provide a detailed message for request validation errors.

Source code in prefect/orion/api/server.py
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Provide a detailed message for request validation errors."""
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content=jsonable_encoder(
            {
                "exception_message": "Invalid request received.",
                "exception_detail": exc.errors(),
                "request_body": exc.body,
            }
        ),
    )