Skip to content

prefect.runner.server

build_server async

Build a FastAPI server for a runner.

Parameters:

Name Type Description Default
runner Runner

the runner this server interacts with and monitors

required
log_level str

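the log level to use for the server (listed in the docstring, but `build_server` itself takes only `runner`; pass `log_level` to `start_webserver` instead)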

required
Source code in prefect/runner/server.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
@sync_compatible
async def build_server(runner: "Runner") -> FastAPI:
    """
    Assemble the FastAPI application that exposes a runner over HTTP.

    Three routes are always registered: ``GET /health``, ``GET /run_count``
    and ``POST /shutdown``. When the
    ``PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS`` setting is enabled,
    the deployment router and a generic ``POST /flow/run`` endpoint are added
    as well, and the app's OpenAPI generator is replaced so the deployment
    schemas are injected into the generated document.

    Args:
        runner (Runner): the runner this server interacts with and monitors

    Returns:
        FastAPI: the configured application (not yet being served)
    """
    app = FastAPI()
    core_router = APIRouter()

    # (path, endpoint factory, HTTP method); each factory is called with the
    # runner to produce the handler at registration time.
    core_routes = (
        ("/health", perform_health_check, "GET"),
        ("/run_count", run_count, "GET"),
        ("/shutdown", shutdown, "POST"),
    )
    for path, make_endpoint, http_method in core_routes:
        core_router.add_api_route(
            path, make_endpoint(runner=runner), methods=[http_method]
        )
    app.include_router(core_router)

    # Everything below is opt-in behind the experimental setting.
    if not PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS.value():
        return app

    deployments_router, deployment_schemas = await get_deployment_router(runner)
    app.include_router(deployments_router)

    flow_schemas = await get_subflow_schemas(runner)
    flow_endpoint = _build_generic_endpoint_for_flows(
        runner=runner, schemas=flow_schemas
    )
    app.add_api_route(
        "/flow/run",
        flow_endpoint,
        methods=["POST"],
        name="Run flow in background",
        description="Trigger any flow run as a background task on the runner.",
        summary="Run flow",
    )

    def customize_openapi():
        # Build the schema once; later calls reuse the copy stored on the app.
        if not app.openapi_schema:
            app.openapi_schema = inject_schemas_into_openapi(app, deployment_schemas)
        return app.openapi_schema

    app.openapi = customize_openapi

    return app

get_subflow_schemas async

Load available subflow schemas by filtering for only those subflows in the deployment entrypoint's import space.

Source code in prefect/runner/server.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
async def get_subflow_schemas(runner: "Runner") -> Dict[str, Dict]:
    """
    Load available subflow schemas by filtering for only those subflows in the
    deployment entrypoint's import space.

    Each deployment in ``runner._deployment_ids`` is read through the client;
    deployments without an entrypoint are skipped. The script part of the
    entrypoint is passed to ``load_flows_from_script`` and every flow found
    contributes one entry to the result.

    Args:
        runner (Runner): the runner whose deployments are inspected

    Returns:
        Dict[str, Dict]: mapping of flow name to ``flow.parameters.dict()``.
            If several flows share a name, the one processed last wins.
    """
    schemas: Dict[str, Dict] = {}
    async with get_client() as client:
        for deployment_id in runner._deployment_ids:
            deployment = await client.read_deployment(deployment_id)
            if deployment.entrypoint is None:
                continue

            # Split on the LAST colon only: the script path may itself contain
            # colons (e.g. a Windows drive letter, "C:\\flows\\etl.py:my_flow"),
            # and cutting at the first one would truncate the path to "C".
            script = deployment.entrypoint.rsplit(":", 1)[0]
            subflows = load_flows_from_script(script)
            for flow in subflows:
                schemas[flow.name] = flow.parameters.dict()

    return schemas

start_webserver

Run a FastAPI server for a runner.

Parameters:

Name Type Description Default
runner Runner

the runner this server interacts with and monitors

required
log_level Optional[str]

the log level to use for the server; when omitted or empty, the value of the PREFECT_RUNNER_SERVER_LOG_LEVEL setting is used

None
Source code in prefect/runner/server.py
291
292
293
294
295
296
297
298
299
300
301
302
303
def start_webserver(runner: "Runner", log_level: Optional[str] = None) -> None:
    """
    Build the runner's FastAPI server and serve it with uvicorn.

    The bind host and port are read from the ``PREFECT_RUNNER_SERVER_HOST``
    and ``PREFECT_RUNNER_SERVER_PORT`` settings.

    Args:
        runner (Runner): the runner this server interacts with and monitors
        log_level (Optional[str]): the log level to use for the server; when
            omitted or empty, the ``PREFECT_RUNNER_SERVER_LOG_LEVEL`` setting
            is used instead
    """
    # Resolve all server options before the app is built.
    server_options = {
        "host": PREFECT_RUNNER_SERVER_HOST.value(),
        "port": PREFECT_RUNNER_SERVER_PORT.value(),
        "log_level": log_level or PREFECT_RUNNER_SERVER_LOG_LEVEL.value(),
    }
    app = build_server(runner)
    uvicorn.run(app, **server_options)