Skip to content

prefect.workers.process

Module containing the Process worker used for executing flow runs as subprocesses.

To start a Process worker, run the following command:

prefect worker start --pool 'my-work-pool' --type process

Replace my-work-pool with the name of the work pool you want the worker to poll for flow runs.

For more information about work pools and workers, checkout out the Prefect docs.

ProcessJobConfiguration

Bases: BaseJobConfiguration

Source code in prefect/workers/process.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class ProcessJobConfiguration(BaseJobConfiguration):
    stream_output: bool = Field(default=True)
    working_dir: Optional[Path] = Field(default=None)

    @validator("working_dir")
    def validate_command(cls, v):
        return validate_command(v)

    def prepare_for_flow_run(
        self,
        flow_run: "FlowRun",
        deployment: Optional["DeploymentResponse"] = None,
        flow: Optional["Flow"] = None,
    ):
        super().prepare_for_flow_run(flow_run, deployment, flow)

        self.env = {**os.environ, **self.env}
        self.command = (
            f"{get_sys_executable()} -m prefect.engine"
            if self.command == self._base_flow_run_command()
            else self.command
        )

    def _base_flow_run_command(self) -> str:
        """
        Override the base flow run command because enhanced cancellation doesn't
        work with the process worker.
        """
        return "python -m prefect.engine"

ProcessWorkerResult

Bases: BaseWorkerResult

Contains information about the final state of a completed process

Source code in prefect/workers/process.py
132
133
class ProcessWorkerResult(BaseWorkerResult):
    """Contains information about the final state of a completed process"""