prefect-kubernetes
prefect-kubernetes
contains Prefect tasks, flows, and blocks enabling orchestration, observation and management of Kubernetes resources.
This library is most commonly used for installation with a Kubernetes worker. See the Prefect docs on deploying with Kubernetes to learn how to create and run deployments in Kubernetes.
Prefect provides a Helm chart for deploying a worker, a self-hosted Prefect server instance, and other resources to a Kubernetes cluster. See the Prefect Helm chart for more information.
Getting started
Prerequisites
Install prefect-kubernetes
The following command will install a version of prefect-kubernetes
compatible with your installed version of prefect
.
If you don’t already have prefect
installed, it will install the newest version of prefect
as well.
pip install "prefect[kubernetes]"
Upgrade to the latest versions of prefect
and prefect-kubernetes
:
pip install -U "prefect[kubernetes]"
Register newly installed block types
Register the block types in the prefect-kubernetes
module to make them available for use.
prefect block register -m prefect_kubernetes
Examples
Use with_options
to customize options on an existing task or flow
from prefect_kubernetes.flows import run_namespaced_job
customized_run_namespaced_job = run_namespaced_job.with_options(
name="My flow running a Kubernetes Job",
retries=2,
retry_delay_seconds=10,
) # this is now a new flow object that can be called
Specify and run a Kubernetes Job from a YAML file
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.flows import run_namespaced_job # this is a flow
from prefect_kubernetes.jobs import KubernetesJob
k8s_creds = KubernetesCredentials.load("k8s-creds")
job = KubernetesJob.from_yaml_file( # or create in the UI with a dict manifest
credentials=k8s_creds,
manifest_path="path/to/job.yaml",
)
job.save("my-k8s-job", overwrite=True)
if __name__ == "__main__":
# run the flow
run_namespaced_job(job)
Generate a resource-specific client from KubernetesClusterConfig
# with minikube / docker desktop & a valid ~/.kube/config this should ~just work~
from prefect_kubernetes.credentials import KubernetesCredentials, KubernetesClusterConfig
k8s_config = KubernetesClusterConfig.from_file('~/.kube/config')
k8s_credentials = KubernetesCredentials(cluster_config=k8s_config)
with k8s_credentials.get_client("core") as v1_core_client:
for namespace in v1_core_client.list_namespace().items:
print(namespace.metadata.name)
List jobs in a namespace
from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.jobs import list_namespaced_job
@flow
def kubernetes_orchestrator():
v1_job_list = list_namespaced_job(
kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
namespace="my-namespace",
)
Patch an existing Kubernetes deployment
from kubernetes.client.models import V1Deployment
from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import patch_namespaced_deployment
from prefect_kubernetes.utilities import convert_manifest_to_model
@flow
def kubernetes_orchestrator():
v1_deployment_updates = convert_manifest_to_model(
manifest="path/to/manifest.yaml",
v1_model_name="V1Deployment",
)
v1_deployment = patch_namespaced_deployment(
kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
deployment_name="my-deployment",
deployment_updates=v1_deployment_updates,
namespace="my-namespace"
)
For assistance using Kubernetes, consult the Kubernetes documentation.
Refer to the prefect-kubernetes
SDK documentation linked in the sidebar to explore all the capabilities of the prefect-kubernetes
library.