prefect.workers.process

Module containing the Process worker used for executing flow runs as subprocesses.

To start a Process worker, run the following command:

prefect worker start --pool 'my-work-pool' --type process

Replace my-work-pool with the name of the work pool you want the worker to poll for flow runs.

For more information about work pools and workers, check out the Prefect docs.

ProcessJobConfiguration

Bases: BaseJobConfiguration

Source code in prefect/workers/process.py
class ProcessJobConfiguration(BaseJobConfiguration):
    stream_output: bool = Field(default=True)
    working_dir: Optional[Path] = Field(default=None)

    @validator("working_dir")
    def validate_command(cls, v):
        return validate_command(v)

    def prepare_for_flow_run(
        self,
        flow_run: "FlowRun",
        deployment: Optional["DeploymentResponse"] = None,
        flow: Optional["Flow"] = None,
    ):
        super().prepare_for_flow_run(flow_run, deployment, flow)

        self.env = {**os.environ, **self.env}
        self.command = (
            f"{get_sys_executable()} -m prefect.engine"
            if self.command == self._base_flow_run_command()
            else self.command
        )

    def _base_flow_run_command(self) -> str:
        """
        Override the base flow run command because enhanced cancellation doesn't
        work with the process worker.
        """
        return "python -m prefect.engine"
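
The two assignments in `prepare_for_flow_run` can be illustrated outside Prefect. The sketch below is a minimal stand-in, not the library's code: `merge_env` and `resolve_command` are hypothetical helper names, and `sys.executable` only approximates what `get_sys_executable()` is assumed to return. It shows that values set on the job configuration override the worker's inherited environment, and that only the default command is swapped for the worker's interpreter.

```python
import os
import sys
from typing import Dict, Mapping, Optional

# Same string that _base_flow_run_command() returns in the listing above.
BASE_COMMAND = "python -m prefect.engine"


def merge_env(
    job_env: Mapping[str, str], host_env: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Hypothetical helper mirroring `{**os.environ, **self.env}`."""
    host = os.environ if host_env is None else host_env
    # Later keys win, so job-level values override inherited ones.
    return {**host, **job_env}


def resolve_command(command: str) -> str:
    """Hypothetical helper mirroring the command substitution."""
    if command == BASE_COMMAND:
        # Assumption: sys.executable stands in for get_sys_executable().
        return f"{sys.executable} -m prefect.engine"
    # A customized command is left untouched.
    return command


if __name__ == "__main__":
    env = merge_env(
        {"MY_SETTING": "from-job"},
        {"MY_SETTING": "from-host", "HOME": "/tmp"},
    )
    print(env["MY_SETTING"])
    print(resolve_command(BASE_COMMAND))
    print(resolve_command("python my_script.py"))
```

Substituting the worker's own interpreter for the default command presumably keeps the flow run in the same Python environment as the worker, rather than whichever `python` happens to be first on the `PATH`.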

ProcessWorkerResult

Bases: BaseWorkerResult

Contains information about the final state of a completed process.
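
As a rough illustration of how such a result might be consumed, the sketch below uses a standalone dataclass rather than Prefect's class. The `identifier` and `status_code` fields and the truthiness rule are assumptions about what `BaseWorkerResult` provides, and `SketchWorkerResult` and `describe` are hypothetical names; check them against the installed Prefect version.

```python
from dataclasses import dataclass


@dataclass
class SketchWorkerResult:
    """Hypothetical stand-in for a worker result; not Prefect's class."""

    identifier: str  # assumed: an ID for the process, e.g. its PID as a string
    status_code: int  # assumed: the exit code of the subprocess

    def __bool__(self) -> bool:
        # Assumed convention: exit code 0 means the process succeeded.
        return self.status_code == 0


def describe(result: SketchWorkerResult) -> str:
    """Summarize the final state of a completed process."""
    outcome = "succeeded" if result else "failed"
    return f"process {result.identifier} {outcome} (exit code {result.status_code})"


if __name__ == "__main__":
    print(describe(SketchWorkerResult(identifier="12345", status_code=0)))
    print(describe(SketchWorkerResult(identifier="12346", status_code=1)))
```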

Source code in prefect/workers/process.py
class ProcessWorkerResult(BaseWorkerResult):
    """Contains information about the final state of a completed process"""