prefect.workers.process

Module containing the Process worker used for executing flow runs as subprocesses. To start a Process worker, run the following command:
prefect worker start --pool 'my-work-pool' --type process
Replace my-work-pool with the name of the work pool you want the worker to poll for flow runs. For more information about work pools and workers, check out the Prefect docs.

Classes

ProcessJobConfiguration

Methods:

from_template_and_values

from_template_and_values(cls, base_job_template: dict[str, Any], values: dict[str, Any], client: 'PrefectClient | None' = None)
Creates a valid worker configuration object from the provided base configuration and overrides. Important: this method expects that the base_job_template was already validated server-side.
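The merge described above can be illustrated with a minimal, self-contained sketch. It assumes the base job template has a "job_configuration" section containing '{{ name }}' placeholders and a "variables" section whose "properties" may carry defaults. The helper names (apply_values_sketch, from_template_and_values_sketch) are hypothetical and this is not Prefect's implementation; it only shows the general idea of layering overrides on top of defaults and filling the template.

```python
import re
from typing import Any

# Matches placeholders such as '{{ command }}'.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def apply_values_sketch(template: Any, values: dict[str, Any]) -> Any:
    """Recursively replace '{{ name }}' placeholders with entries from values."""
    if isinstance(template, dict):
        return {k: apply_values_sketch(v, values) for k, v in template.items()}
    if isinstance(template, list):
        return [apply_values_sketch(v, values) for v in template]
    if isinstance(template, str):
        whole = PLACEHOLDER.fullmatch(template.strip())
        if whole:
            # A string that is only a placeholder takes the raw value,
            # so booleans and None keep their type.
            return values.get(whole.group(1))
        return PLACEHOLDER.sub(lambda m: str(values.get(m.group(1), "")), template)
    return template


def from_template_and_values_sketch(
    base_job_template: dict[str, Any], values: dict[str, Any]
) -> dict[str, Any]:
    """Hypothetical stand-in: defaults from the template, then overrides."""
    properties = base_job_template.get("variables", {}).get("properties", {})
    defaults = {
        name: spec["default"] for name, spec in properties.items() if "default" in spec
    }
    merged = {**defaults, **values}  # overrides win over defaults
    return apply_values_sketch(base_job_template["job_configuration"], merged)


template = {
    "job_configuration": {
        "command": "{{ command }}",
        "stream_output": "{{ stream_output }}",
        "working_dir": "{{ working_dir }}",
    },
    "variables": {
        "properties": {
            "command": {},
            "stream_output": {"default": True},
            "working_dir": {},
        }
    },
}

config = from_template_and_values_sketch(template, {"command": "python flow.py"})
```

In this sketch, a variable with neither a default nor an override resolves to None, which is a simplification chosen for illustration.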

is_using_a_runner

is_using_a_runner(self) -> bool

json_template

json_template(cls) -> dict[str, Any]
Returns a dict with job configuration parameter names as keys and the corresponding templates as values. Defaults to using the job configuration parameter name as the template variable name. For example:
{
    key1: '{{ key1 }}',     # default variable template
    key2: '{{ template2 }}', # `template2` specifically provided as template
}
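The defaulting rule can be sketched in a few lines of plain Python. The function json_template_sketch and its input shape (parameter name mapped to an optional explicit template variable) are assumptions made for illustration, not the library's API.

```python
from typing import Optional


def json_template_sketch(fields: dict[str, Optional[str]]) -> dict[str, str]:
    """Map each parameter name to a '{{ variable }}' template string.

    `fields` maps a parameter name to an explicit template variable name,
    or to None when the parameter name itself should be used.
    """
    return {name: "{{ " + (variable or name) + " }}" for name, variable in fields.items()}


result = json_template_sketch({"key1": None, "key2": "template2"})
```

Here key1 falls back to its own name, while key2 uses the explicitly supplied template2.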

prepare_for_flow_run

prepare_for_flow_run(self, flow_run: 'FlowRun', deployment: 'DeploymentResponse | None' = None, flow: 'APIFlow | None' = None, work_pool: 'WorkPool | None' = None, worker_name: str | None = None) -> None
Prepare the job configuration for a flow run. This method is called by the worker before starting a flow run. It should be used to set any configuration values that are dependent on the flow run.
Args:
  • flow_run: The flow run to be executed.
  • deployment: The deployment that the flow run is associated with.
  • flow: The flow that the flow run is associated with.
  • work_pool: The work pool that the flow run is running in.
  • worker_name: The name of the worker that is submitting the flow run.

validate_working_dir

validate_working_dir(cls, v: Path | str | None) -> Path | None
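The signature suggests that string inputs are normalized to Path objects while None passes through. A minimal sketch of that behavior follows, under the assumption that no further checks are performed; the real validator may do more, such as platform-specific path handling. The function name validate_working_dir_sketch is hypothetical.

```python
from pathlib import Path
from typing import Optional, Union


def validate_working_dir_sketch(v: Union[Path, str, None]) -> Optional[Path]:
    """Coerce a string to a Path; leave Path and None values unchanged."""
    if isinstance(v, str):
        return Path(v)
    return v


as_path = validate_working_dir_sketch("flows/project")
unset = validate_working_dir_sketch(None)
```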

ProcessVariables

Methods:

model_json_schema

model_json_schema(cls, by_alias: bool = True, ref_template: str = '#/definitions/{model}', schema_generator: Type[GenerateJsonSchema] = GenerateJsonSchema, mode: Literal['validation', 'serialization'] = 'validation') -> dict[str, Any]
TODO: stop overriding this method - use GenerateSchema in ConfigDict instead?

ProcessWorkerResult

Contains information about the final state of a completed process.

ProcessWorker

Methods:

run

run(self, flow_run: 'FlowRun', configuration: ProcessJobConfiguration, task_status: Optional[anyio.abc.TaskStatus[int]] = None) -> ProcessWorkerResult

start

start(self, run_once: bool = False, with_healthcheck: bool = False, printer: Callable[..., None] = print) -> None
Starts the worker and runs the main worker loops. By default, the worker will run loops to poll for scheduled/cancelled flow runs and sync with the Prefect API server. If run_once is set, the worker will only run each loop once and then return. If with_healthcheck is set, the worker will start a healthcheck server which can be used to determine if the worker is still polling for flow runs and restart the worker if necessary.
Args:
  • run_once: If set, the worker will only run each loop once then return.
  • with_healthcheck: If set, the worker will start a healthcheck server.
  • printer: A print-like function where logs will be reported.