prefect_kubernetes.worker

Module containing the Kubernetes worker used for executing flow runs as Kubernetes jobs. To start a Kubernetes worker, run the following command:
prefect worker start --pool 'my-work-pool' --type kubernetes
Replace my-work-pool with the name of the work pool you want the worker to poll for flow runs.

Securing your Prefect Cloud API key

If you are using Prefect Cloud and would like to pass your Prefect Cloud API key to created jobs via a Kubernetes secret, set the PREFECT_INTEGRATIONS_KUBERNETES_WORKER_CREATE_SECRET_FOR_API_KEY environment variable before starting your worker:
export PREFECT_INTEGRATIONS_KUBERNETES_WORKER_CREATE_SECRET_FOR_API_KEY="true"
prefect worker start --pool 'my-work-pool' --type kubernetes
Note that your worker will need permission to create secrets in the same namespace(s) where Kubernetes jobs are created to execute flow runs.
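One way to grant that permission is a namespaced RBAC Role bound to the worker's service account. The sketch below builds such a Role manifest as a Python dictionary. The helper name, the role name, the namespace, and the exact verb list are assumptions made for illustration, so verify them against what your worker needs in your cluster.

```python
import json


def build_secret_role(namespace: str, name: str = "prefect-worker-secrets") -> dict:
    """Build a Kubernetes RBAC Role manifest that allows managing secrets.

    The default role name and the verb list are illustrative assumptions.
    """
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": name, "namespace": namespace},
        "rules": [
            {
                "apiGroups": [""],  # "" is the core API group, where secrets live
                "resources": ["secrets"],
                "verbs": ["create", "get", "update"],
            }
        ],
    }


# "prefect-jobs" is a hypothetical namespace; use the one your jobs run in.
role = build_secret_role("prefect-jobs")
print(json.dumps(role, indent=2))
```

The printed manifest can be saved to a file and applied with your usual tooling. A RoleBinding that ties the Role to the worker's service account is still required and is not shown here.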

Using a custom Kubernetes job manifest template

The default template used for Kubernetes job manifests looks like this:
---
apiVersion: batch/v1
kind: Job
metadata:
  annotations: "{{ annotations }}"
  labels: "{{ labels }}"
  namespace: "{{ namespace }}"
  generateName: "{{ name }}-"
spec:
  ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
  template:
    spec:
      parallelism: 1
      completions: 1
      restartPolicy: Never
      serviceAccountName: "{{ service_account_name }}"
      containers:
      - name: prefect-job
        env: "{{ env }}"
        image: "{{ image }}"
        imagePullPolicy: "{{ image_pull_policy }}"
        args: "{{ command }}"
Each value enclosed in {{ }} is a placeholder that will be replaced with a value at runtime. The values that can be used as placeholders are defined by the variables schema in the base job template. The default job manifest and available variables can be customized for each work pool. These customizations can be made via the Prefect UI when creating or editing a work pool. For example, if you wanted to allow custom memory requests for a Kubernetes work pool, you could update the job manifest template to look like this:
---
apiVersion: batch/v1
kind: Job
metadata:
  annotations: "{{ annotations }}"
  labels: "{{ labels }}"
  namespace: "{{ namespace }}"
  generateName: "{{ name }}-"
spec:
  ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
  template:
    spec:
      parallelism: 1
      completions: 1
      restartPolicy: Never
      serviceAccountName: "{{ service_account_name }}"
      containers:
      - name: prefect-job
        env: "{{ env }}"
        image: "{{ image }}"
        imagePullPolicy: "{{ image_pull_policy }}"
        args: "{{ command }}"
        resources:
          requests:
            memory: "{{ memory }}Mi"
          limits:
            memory: 128Mi
In this new template, the memory placeholder allows customization of the memory requested by Kubernetes jobs created by workers in this work pool, but the limit is hard-coded and cannot be changed by deployments. For more information about work pools and workers, check out the Prefect docs.
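To make the placeholder behavior concrete, here is a minimal sketch of how {{ }} substitution could work on a manifest fragment. It is not Prefect's templating code: the render function and the PLACEHOLDER pattern are hypothetical names, and the handling of missing values (a KeyError) is an assumption of this sketch only.

```python
import re
from typing import Any

# Matches "{{ name }}" with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: Any, values: dict[str, Any]) -> Any:
    """Recursively replace {{ name }} placeholders in a nested template."""
    if isinstance(template, dict):
        return {key: render(item, values) for key, item in template.items()}
    if isinstance(template, list):
        return [render(item, values) for item in template]
    if isinstance(template, str):
        whole = PLACEHOLDER.fullmatch(template)
        if whole:
            # A string that is only a placeholder takes the value as-is,
            # so dicts, lists, and numbers keep their type.
            return values[whole.group(1)]
        # Otherwise the value is interpolated into the surrounding text.
        return PLACEHOLDER.sub(lambda m: str(values[m.group(1)]), template)
    return template


resources_template = {
    "requests": {"memory": "{{ memory }}Mi"},
    "limits": {"memory": "128Mi"},
}
rendered = render(resources_template, {"memory": 64})
print(rendered)
# {'requests': {'memory': '64Mi'}, 'limits': {'memory': '128Mi'}}
```

Only the request changes with the supplied value. The limit has no placeholder, so it stays at 128Mi regardless of what a deployment provides.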

Classes

KubernetesImagePullPolicy

Enum representing the image pull policy options for a Kubernetes job.
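As an illustration, Kubernetes documents three imagePullPolicy values: IfNotPresent, Always, and Never. The stand-in enum below models them. The class name and member names are assumptions and may not match those of KubernetesImagePullPolicy.

```python
from enum import Enum


class ImagePullPolicy(str, Enum):
    """Stand-in enum; member names here are hypothetical."""

    IF_NOT_PRESENT = "IfNotPresent"
    ALWAYS = "Always"
    NEVER = "Never"


# Look up a member from the string used in a job manifest.
policy = ImagePullPolicy("Always")
print(policy.value)
```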

KubernetesWorkerJobConfiguration

Configuration class used by the Kubernetes worker. An instance of this class is passed to the Kubernetes worker’s run method for each flow run. It contains all of the information necessary to execute the flow run as a Kubernetes job. Attributes:
  • name: The name to give to the created Kubernetes job.
  • command: The command executed in created Kubernetes jobs to kick off flow run execution.
  • env: The environment variables to set in created Kubernetes jobs.
  • labels: The labels to set on created Kubernetes jobs.
  • namespace: The Kubernetes namespace to create Kubernetes jobs in.
  • job_manifest: The Kubernetes job manifest to use to create Kubernetes jobs.
  • cluster_config: The Kubernetes cluster configuration to use for authentication to a Kubernetes cluster.
  • job_watch_timeout_seconds: The number of seconds to wait for the job to complete before timing out. If None, the worker will wait indefinitely.
  • pod_watch_timeout_seconds: The number of seconds to wait for the pod to complete before timing out.
  • stream_output: Whether or not to stream the job’s output.
Methods:

get_environment_variable_value

get_environment_variable_value(self, name: str) -> str | None
Returns the value of an environment variable from the job manifest.
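The lookup can be pictured with the standalone sketch below, which operates on a plain manifest dictionary rather than on the configuration class. It assumes the variables live in the first container's env list as name/value entries, the usual Kubernetes layout. The function name get_env_value and the sample manifest are hypothetical.

```python
from typing import Optional


def get_env_value(job_manifest: dict, name: str) -> Optional[str]:
    """Return the value of env var `name` from the first container, or None."""
    containers = job_manifest["spec"]["template"]["spec"]["containers"]
    for entry in containers[0].get("env", []):
        if entry.get("name") == name:
            return entry.get("value")
    return None


# Sample manifest with made-up values, trimmed to the fields the lookup reads.
manifest = {
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "prefect-job",
                        "env": [{"name": "EXAMPLE_VAR", "value": "hello"}],
                    }
                ]
            }
        }
    }
}

print(get_env_value(manifest, "EXAMPLE_VAR"))
print(get_env_value(manifest, "MISSING_VAR"))
```

The first call prints the stored value and the second prints None, which matches the str | None return type in the signature above.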

prepare_for_flow_run

prepare_for_flow_run(self, flow_run: 'FlowRun', deployment: 'DeploymentResponse | None' = None, flow: 'APIFlow | None' = None, work_pool: 'WorkPool | None' = None, worker_name: str | None = None, worker_id: 'UUID | None' = None)
Prepares the job configuration for a flow run. Ensures that necessary values are present in the job manifest and that the job manifest is valid. Args:
  • flow_run: The flow run to prepare the job configuration for.
  • deployment: The deployment associated with the flow run used for preparation.
  • flow: The flow associated with the flow run used for preparation.
  • work_pool: The work pool associated with the flow run used for preparation.
  • worker_name: The name of the worker used for preparation.

KubernetesWorkerVariables

Default variables for the Kubernetes worker. The schema for this class is used to populate the variables section of the default base job template.

KubernetesWorkerResult

Contains information about the final state of a completed process.

KubernetesWorker

Prefect worker that executes flow runs within Kubernetes Jobs. Methods:

kill_infrastructure

kill_infrastructure(self, infrastructure_pid: str, configuration: KubernetesWorkerJobConfiguration, grace_seconds: int = 30) -> None
Kill a Kubernetes job by deleting it. Args:
  • infrastructure_pid: The infrastructure identifier in format “namespace:job_name”.
  • configuration: The job configuration used to connect to the cluster.
  • grace_seconds: Time to allow for graceful shutdown before force killing.
Raises:
  • InfrastructureNotFound: If the job doesn’t exist.
  • InfrastructureNotAvailable: If unable to connect to the cluster.
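Given the “namespace:job_name” format described above, the identifier can be split as in the sketch below. The helper name parse_infrastructure_pid and its error handling are assumptions for illustration, not the worker's own parsing code.

```python
def parse_infrastructure_pid(infrastructure_pid: str) -> tuple[str, str]:
    """Split a "namespace:job_name" identifier into its two parts."""
    namespace, separator, job_name = infrastructure_pid.partition(":")
    if not separator or not namespace or not job_name:
        raise ValueError(
            f"Expected 'namespace:job_name', got {infrastructure_pid!r}"
        )
    return namespace, job_name


# "default:my-flow-abc12" is a made-up identifier.
namespace, job_name = parse_infrastructure_pid("default:my-flow-abc12")
print(namespace, job_name)
```

Kubernetes namespace and job names cannot contain a colon, so splitting on the first one is unambiguous for identifiers in this format.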

run

run(self, flow_run: 'FlowRun', configuration: KubernetesWorkerJobConfiguration, task_status: anyio.abc.TaskStatus[int] | None = None) -> KubernetesWorkerResult
Executes a flow run within a Kubernetes Job and waits for the flow run to complete. Args:
  • flow_run: The flow run to execute.
  • configuration: The configuration to use when executing the flow run.
  • task_status: The task status object for the current flow run. If provided, the task will be marked as started.
Returns:
  • A result object containing information about the final state of the flow run.

teardown

teardown(self, *exc_info: Any)