prefect-kubernetes contains Prefect tasks, flows, and blocks enabling orchestration, observation and management of Kubernetes resources.

This library is most commonly used for installation with a Kubernetes worker. See the Prefect docs on deploying with Kubernetes to learn how to create and run deployments in Kubernetes.

Prefect provides a Helm chart for deploying a worker, a self-hosted Prefect server instance, and other resources to a Kubernetes cluster. See the Prefect Helm chart for more information.

Getting started

Prerequisites

Install prefect-kubernetes

The following command will install a version of prefect-kubernetes compatible with your installed version of prefect. If you don’t already have prefect installed, it will install the newest version of prefect as well.

pip install "prefect[kubernetes]"

Upgrade to the latest versions of prefect and prefect-kubernetes:

pip install -U "prefect[kubernetes]"

Register newly installed block types

Register the block types in the prefect-kubernetes module to make them available for use.

prefect block register -m prefect_kubernetes

Examples

Use with_options to customize options on an existing task or flow

from prefect_kubernetes.flows import run_namespaced_job

customized_run_namespaced_job = run_namespaced_job.with_options(
    name="My flow running a Kubernetes Job",
    retries=2,
    retry_delay_seconds=10,
) # this is now a new flow object that can be called

Specify and run a Kubernetes Job from a YAML file

from prefect import flow, get_run_logger
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.flows import run_namespaced_job # this is a flow
from prefect_kubernetes.jobs import KubernetesJob

k8s_creds = KubernetesCredentials.load("k8s-creds")

job = KubernetesJob.from_yaml_file( # or create in the UI with a dict manifest
    credentials=k8s_creds,
    manifest_path="path/to/job.yaml",
)

job.save("my-k8s-job", overwrite=True)


@flow
def kubernetes_orchestrator():
    # run the flow and send logs to the parent flow run's logger
    logger = get_run_logger()
    run_namespaced_job(job, print_func=logger.info)


if __name__ == "__main__":
    kubernetes_orchestrator()

As with all Prefect flows and tasks, you can call the underlying function directly if you don’t need Prefect features:

run_namespaced_job.fn(job, print_func=print)

Generate a resource-specific client from KubernetesClusterConfig

# with minikube / docker desktop & a valid ~/.kube/config this should ~just work~
from prefect_kubernetes.credentials import KubernetesCredentials, KubernetesClusterConfig

k8s_config = KubernetesClusterConfig.from_file('~/.kube/config')

k8s_credentials = KubernetesCredentials(cluster_config=k8s_config)

with k8s_credentials.get_client("core") as v1_core_client:
    for namespace in v1_core_client.list_namespace().items:
        print(namespace.metadata.name)

List jobs in a namespace

from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.jobs import list_namespaced_job


@flow
def kubernetes_orchestrator():
    v1_job_list = list_namespaced_job(
        kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
        namespace="my-namespace",
    )

For assistance using Kubernetes, consult the Kubernetes documentation.

Refer to the prefect-kubernetes SDK documentation linked in the sidebar to explore all the capabilities of the prefect-kubernetes library.