prefect.runner.runner

Runners are responsible for managing the execution of all deployments. When a deployment is created with either flow.serve or the serve utility, the runner will also poll for its scheduled runs. Example:
import time
from prefect import flow, serve


@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)


@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return


if __name__ == "__main__":
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")

    # serve generates a Runner instance
    serve(slow_deploy, fast_deploy)

Classes

ProcessMapEntry

Runner

Methods:

add_deployment

add_deployment(self, deployment: 'RunnerDeployment') -> UUID
Registers the deployment with the Prefect API; the runner will monitor it for work once started. Args:
  • deployment: A deployment for the runner to register.
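A minimal sketch of registering a deployment directly (the runner name, flow, and interval below are illustrative; add_deployment and start are awaited inside an async entrypoint):

import asyncio

from prefect import flow, Runner


@flow
def report_flow():
    print("reporting")


async def main():
    runner = Runner(name="example-runner")

    # to_deployment builds a RunnerDeployment; add_deployment registers it
    # with the Prefect API and returns the new deployment's UUID.
    deployment = report_flow.to_deployment(name="report", interval=3600)
    deployment_id = await runner.add_deployment(deployment)
    print(deployment_id)

    # The runner only begins polling for this work once started.
    await runner.start()


if __name__ == "__main__":
    asyncio.run(main())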

add_flow

add_flow(self, flow: Flow[Any, Any], name: Optional[str] = None, interval: Optional[Union[Iterable[Union[int, float, datetime.timedelta]], int, float, datetime.timedelta]] = None, cron: Optional[Union[Iterable[str], str]] = None, rrule: Optional[Union[Iterable[str], str]] = None, paused: Optional[bool] = None, schedule: Optional[Schedule] = None, schedules: Optional['FlexibleScheduleList'] = None, concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, parameters: Optional[dict[str, Any]] = None, triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, version: Optional[str] = None, enforce_parameter_schema: bool = True, entrypoint_type: EntrypointType = EntrypointType.FILE_PATH) -> UUID
Provides a flow to the runner to be run based on the provided configuration. Creates a deployment for the provided flow and registers it with the runner. Args:
  • flow: A flow for the runner to run.
  • name: The name to give the created deployment. Will default to the name of the runner.
  • interval: An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.
  • cron: A cron schedule of when to execute runs of this flow.
  • rrule: An rrule schedule of when to execute runs of this flow.
  • paused: Whether or not to set the created deployment as paused.
  • schedule: A schedule object defining when to execute runs of this deployment. Used to provide additional scheduling options like timezone or parameters.
  • schedules: A list of schedule objects defining when to execute runs of this flow. Used to define multiple schedules or additional scheduling options like timezone.
  • concurrency_limit: The maximum number of concurrent runs of this flow to allow.
  • triggers: A list of triggers that should kick off a run of this flow.
  • parameters: A dictionary of default parameter values to pass to runs of this flow.
  • description: A description for the created deployment. Defaults to the flow’s description if not provided.
  • tags: A list of tags to associate with the created deployment for organizational purposes.
  • version: A version for the created deployment. Defaults to the flow’s version.
  • entrypoint_type: Type of entrypoint to use for the deployment. When using a module path entrypoint, ensure that the module will be importable in the execution environment.
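As a sketch of the same pattern without building deployments by hand, flows can be handed to the runner directly; the flow, deployment names, interval, and cron string below are illustrative, and the calls are awaited inside an async entrypoint:

import asyncio
from datetime import timedelta

from prefect import flow, Runner


@flow
def etl_flow():
    print("extract, transform, load")


async def main():
    runner = Runner(name="example-runner")

    # Interval schedules accept a number of seconds or a timedelta.
    await runner.add_flow(etl_flow, name="etl-frequent", interval=timedelta(minutes=10))

    # The same flow can also be registered under a cron schedule.
    await runner.add_flow(etl_flow, name="etl-nightly", cron="0 2 * * *")

    await runner.start()


if __name__ == "__main__":
    asyncio.run(main())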

cancel_all

cancel_all(self) -> None

execute_bundle

execute_bundle(self, bundle: SerializedBundle, cwd: Path | str | None = None, env: dict[str, str | None] | None = None) -> None
Executes a bundle in a subprocess.

execute_flow_run

execute_flow_run(self, flow_run_id: UUID, entrypoint: str | None = None, command: str | None = None, cwd: Path | str | None = None, env: dict[str, str | None] | None = None, task_status: anyio.abc.TaskStatus[int] = anyio.TASK_STATUS_IGNORED, stream_output: bool = True) -> anyio.abc.Process | None
Executes a single flow run with the given ID. Execution will wait to monitor for cancellation requests. Exits once the flow run process has exited. Returns:
  • The flow run process.
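A hedged sketch of executing one existing run directly; the UUID below is a placeholder for a flow run that already exists in the Prefect API (for example, one scheduled for a deployment this runner manages):

import asyncio
from uuid import UUID

from prefect import Runner


async def main():
    runner = Runner(name="example-runner")

    # Placeholder ID; in practice this comes from the Prefect API.
    flow_run_id = UUID("00000000-0000-0000-0000-000000000000")

    # Blocks until the flow run process exits, monitoring for cancellation.
    await runner.execute_flow_run(flow_run_id)


if __name__ == "__main__":
    asyncio.run(main())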

execute_in_background

execute_in_background(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> 'concurrent.futures.Future[Any]'
Executes a function in the background.

handle_sigterm

handle_sigterm(self, *args: Any, **kwargs: Any) -> None
Gracefully shuts down the runner when a SIGTERM is received.

has_slots_available

has_slots_available(self) -> bool
Determine if the flow run limit has been reached. Returns:
  • bool: True if the limit has not been reached, False otherwise.
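A tiny sketch of checking capacity; the limit keyword on the Runner constructor is assumed here to be the runner's concurrent flow run cap:

from prefect import Runner

# limit is assumed to cap concurrent flow runs for this runner.
runner = Runner(name="example-runner", limit=5)

if runner.has_slots_available():
    print("runner can accept another flow run")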

reschedule_current_flow_runs

reschedule_current_flow_runs(self) -> None
Reschedules all flow runs that are currently running. This should only be called when the runner is shutting down, because it kills all child processes and short-circuits the crash detection logic.

start

start(self, run_once: bool = False, webserver: Optional[bool] = None) -> None
Starts a runner. The runner will begin monitoring for and executing any scheduled work for all added flows. Args:
  • run_once: If True, the runner will run through one query loop and then exit.
  • webserver: A boolean for whether to start a webserver for this runner. If provided, overrides the default on the runner.
Examples: Initialize a Runner, add two flows, and serve them by starting the Runner:
import asyncio
from prefect import flow, Runner

@flow
def hello_flow(name):
    print(f"hello {name}")

@flow
def goodbye_flow(name):
    print(f"goodbye {name}")

if __name__ == "__main__":
    runner = Runner(name="my-runner")

    # Will be runnable via the API
    runner.add_flow(hello_flow)

    # Run on a cron schedule
    runner.add_flow(goodbye_flow, cron="0 * * * *")

    asyncio.run(runner.start())

stop

stop(self)
Stops the runner’s polling cycle.