
Infrastructure

Workers are recommended

Infrastructure blocks are part of the agent-based deployment model. Work pools and workers simplify the specification of each flow's infrastructure and runtime environment. If you have existing agents, you can upgrade from agents to workers to significantly improve the experience of deploying flows.

Users may specify an infrastructure block when creating a deployment. This block will be used to specify infrastructure for flow runs created by the deployment at runtime.

Infrastructure can only be used with a deployment. When you run a flow directly by calling the flow yourself, you are responsible for the environment in which the flow executes.

Infrastructure overview

Prefect uses infrastructure to create the environment for a user's flow to execute.

Infrastructure is attached to a deployment and is propagated to flow runs created for that deployment. Infrastructure is deserialized by the agent and it has two jobs:

  • Create execution environment infrastructure for the flow run.
  • Run a Python command to start the prefect.engine in the infrastructure, which retrieves the flow from storage and executes the flow.

The engine acquires and calls the flow. Infrastructure doesn't know anything about how the flow is stored; it just passes a flow run ID to the engine.
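The division of labor described above can be pictured with a small standard-library sketch. It does not use Prefect's own classes: `build_engine_invocation` is a hypothetical helper, and the `PREFECT__FLOW_RUN_ID` variable name is an assumption made for illustration. The only point is that infrastructure hands the engine a command and a flow run ID, and nothing about storage.

```python
import uuid


def build_engine_invocation(flow_run_id):
    """Return the (command, env) pair an infrastructure might hand to its runtime.

    Hypothetical helper for illustration; the environment variable name
    below is an assumption, not something this page specifies.
    """
    # The command matches the default shown in the deployment YAML.
    command = ["python", "-m", "prefect.engine"]
    # Only the flow run ID is passed along; storage details are left to the engine.
    env = {"PREFECT__FLOW_RUN_ID": str(flow_run_id)}
    return command, env


flow_run_id = uuid.uuid4()
command, env = build_engine_invocation(flow_run_id)
print(" ".join(command))  # python -m prefect.engine
```

Note that nothing in the returned values refers to a bucket, path, or storage block, which mirrors the separation the text describes.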

Infrastructure is specific to the environments in which flows will run. The infrastructure types that Prefect currently provides are listed under "Using infrastructure".

What about tasks?

Flows and tasks can both use configuration objects to manage the environment in which code runs.

Flows use infrastructure.

Tasks use task runners. For more on how task runners work, see Task Runners.

Using infrastructure

You may create customized infrastructure blocks through the Blocks page of the Prefect UI or Prefect Cloud, or create them in code and save them to the API using the block's .save() method.

Once created, there are two distinct ways to use infrastructure in a deployment:

  • Start with Prefect defaults: pass the -i or --infra flag with an infrastructure type when building deployment files.
  • Pre-configure infrastructure settings as blocks and base your deployment infrastructure on those settings: pass -ib or --infra-block with a block slug when building deployment files.

For example, when creating your deployment files, the supported Prefect infrastructure types are:

  • process
  • docker-container
  • kubernetes-job
  • ecs-task
  • cloud-run-job
  • container-instance-job

$ prefect deployment build ./my_flow.py:my_flow -n my-flow-deployment -t test -i docker-container -sb s3/my-bucket --override env.EXTRA_PIP_PACKAGES=s3fs
Found flow 'my-flow'
Successfully uploaded 2 files to s3://bucket-full-of-sunshine
Deployment YAML created at '/Users/terry/test/flows/infra/deployment.yaml'.

In this example we specify the DockerContainer infrastructure in addition to a preconfigured AWS S3 bucket storage block.

The generated deployment YAML file may be edited as needed to add an infrastructure type or infrastructure settings.

###
### A complete description of a Prefect Deployment for flow 'my-flow'
###
name: my-flow-deployment
description: null
version: e29de5d01b06d61b4e321d40f34a480c
# The work queue that will handle this deployment's runs
work_queue_name: default
work_pool_name: default-agent-pool
tags:
- test
parameters: {}
schedules: []
paused: true
infra_overrides:
  env.EXTRA_PIP_PACKAGES: s3fs
infrastructure:
  type: docker-container
  env: {}
  labels: {}
  name: null
  command:
  - python
  - -m
  - prefect.engine
  image: prefecthq/prefect:2-latest
  image_pull_policy: null
  networks: []
  network_mode: null
  auto_remove: false
  volumes: []
  stream_output: true
  memswap_limit: null
  mem_limit: null
  privileged: false
  block_type_slug: docker-container
  _block_type_slug: docker-container

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: my-flow
manifest_path: my_flow-manifest.json
storage:
  bucket_path: bucket-full-of-sunshine
  aws_access_key_id: '**********'
  aws_secret_access_key: '**********'
  _is_anonymous: true
  _block_document_name: anonymous-xxxxxxxx-f1ff-4265-b55c-6353a6d65333
  _block_document_id: xxxxxxxx-06c2-4c3c-a505-4a8db0147011
  block_type_slug: s3
  _block_type_slug: s3
path: ''
entrypoint: my_flow.py:my_flow
parameter_openapi_schema:
  title: Parameters
  type: object
  properties: {}
  required: null
  definitions: null
timestamp: '2023-02-08T23:00:14.974642+00:00'

Editing deployment YAML

Note the DO NOT EDIT comment in the deployment YAML. In practice, anything above that comment can be freely edited before you run prefect deployment apply to create the deployment on the API.

Once the deployment exists, any flow runs that this deployment starts will use DockerContainer infrastructure.
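The infra_overrides entry in the YAML uses a dotted key (env.EXTRA_PIP_PACKAGES) to target a field nested inside the infrastructure settings. The sketch below shows one way such a key could be resolved against the infrastructure mapping. `apply_dotted_overrides` is a hypothetical helper written for illustration, not Prefect's implementation.

```python
import copy


def apply_dotted_overrides(settings, overrides):
    """Return a copy of `settings` with dotted-key overrides applied.

    Hypothetical helper: "env.EXTRA_PIP_PACKAGES" is read as
    settings["env"]["EXTRA_PIP_PACKAGES"].
    """
    result = copy.deepcopy(settings)
    for dotted_key, value in overrides.items():
        *parents, leaf = dotted_key.split(".")
        target = result
        for part in parents:
            # Create intermediate mappings when they do not exist yet.
            target = target.setdefault(part, {})
        target[leaf] = value
    return result


# A trimmed-down version of the infrastructure section from the YAML.
infrastructure = {
    "type": "docker-container",
    "env": {},
    "image": "prefecthq/prefect:2-latest",
}
infra_overrides = {"env.EXTRA_PIP_PACKAGES": "s3fs"}

effective = apply_dotted_overrides(infrastructure, infra_overrides)
print(effective["env"])  # {'EXTRA_PIP_PACKAGES': 's3fs'}
```

The original mapping is left untouched, so the same base settings can be combined with different overrides.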

You can also create custom infrastructure blocks, either in the Prefect UI or in code via the API, and use the settings in the block to configure your infrastructure. For example, here we specify settings for Kubernetes infrastructure in a block named k8sdev.

from prefect.infrastructure import KubernetesJob, KubernetesImagePullPolicy

k8s_job = KubernetesJob(
    namespace="dev",
    image="prefecthq/prefect:2.0.0-python3.9",
    image_pull_policy=KubernetesImagePullPolicy.IF_NOT_PRESENT,
)
k8s_job.save("k8sdev")

Now we can apply the infrastructure type and settings in the block by passing the block slug kubernetes-job/k8sdev with the -ib flag when building a deployment:

prefect deployment build flows/k8s_example.py:k8s_flow --name k8sdev --tag k8s -sb s3/dev -ib kubernetes-job/k8sdev

See Deployments for more information about deployment build options.

Configuring infrastructure

Every infrastructure type has type-specific options.

Process

Process infrastructure runs a command in a new process.

Current environment variables and Prefect settings will be included in the created process. Configured environment variables will override any current environment variables.
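The precedence rule can be demonstrated with the standard library alone. This is a minimal sketch of the behavior described above, not the Process block itself. `run_in_new_process` and the DEMO_* variable names are made up for the example.

```python
import os
import subprocess
import sys


def run_in_new_process(command, configured_env):
    """Run `command` in a child process and return its trimmed stdout.

    The child inherits the current environment; configured values
    override inherited ones with the same name.
    """
    merged_env = {**os.environ, **configured_env}
    completed = subprocess.run(
        command, env=merged_env, capture_output=True, text=True, check=True
    )
    return completed.stdout.strip()


# Two variables set in the parent; only one is overridden by configuration.
os.environ["DEMO_GREETING"] = "from-parent"
os.environ["DEMO_INHERITED"] = "kept"

child_code = "import os; print(os.environ['DEMO_GREETING'], os.environ['DEMO_INHERITED'])"
output = run_in_new_process(
    [sys.executable, "-c", child_code],
    {"DEMO_GREETING": "from-config"},
)
print(output)  # from-config kept
```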

Process supports the following settings:

Attribute | Description
command | A list of strings specifying the command to start the flow run. In most cases you should not override this.
env | Environment variables to set for the new process.
labels | Labels for the process. Labels are for metadata purposes only and cannot be attached to the process itself.
name | A name for the process. For display purposes only.

DockerContainer

DockerContainer infrastructure executes flow runs in a container.

Requirements for DockerContainer:

  • Docker Engine must be available.
  • You must configure remote Storage. Local storage is not supported for Docker.
  • The API must be available from within the flow run container. To facilitate connections to locally hosted APIs, localhost and 127.0.0.1 will be replaced with host.docker.internal.
  • The ephemeral Prefect API won't work with Docker and Kubernetes. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
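The localhost substitution mentioned in the requirements can be sketched with `urllib.parse`. This is an illustration of the rule as stated, under the assumption that only the host part of the URL changes. `rewrite_api_url_for_container` is a hypothetical name, and the sketch ignores any credentials embedded in the URL.

```python
from urllib.parse import urlsplit, urlunsplit

LOCAL_HOSTS = {"localhost", "127.0.0.1"}


def rewrite_api_url_for_container(api_url):
    """Point a locally hosted API URL at the Docker host instead."""
    parts = urlsplit(api_url)
    if parts.hostname not in LOCAL_HOSTS:
        # Remote endpoints are already reachable from inside the container.
        return api_url
    netloc = "host.docker.internal"
    if parts.port is not None:
        netloc = f"{netloc}:{parts.port}"
    return urlunsplit(parts._replace(netloc=netloc))


local_url = rewrite_api_url_for_container("http://127.0.0.1:4200/api")
remote_url = rewrite_api_url_for_container("https://api.prefect.cloud/api")
print(local_url)   # http://host.docker.internal:4200/api
print(remote_url)  # https://api.prefect.cloud/api
```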

DockerContainer supports the following settings:

Attribute | Description
auto_remove | Bool indicating whether the container will be removed on completion. If False, the container will remain after exit for inspection.
command | A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this.
env | Environment variables to set for the container.
image | An optional string specifying the name of a Docker image to use. Defaults to the Prefect image. If the image is stored anywhere other than a public Docker Hub registry, use a corresponding registry block, e.g. DockerRegistry, or otherwise ensure that your execution layer is authenticated to pull the image from the image registry.
image_pull_policy | Specifies if the image should be pulled. One of 'ALWAYS', 'NEVER', 'IF_NOT_PRESENT'.
image_registry | A DockerRegistry block containing credentials to use if the image is stored in a private image registry.
labels | An optional dictionary of labels, mapping name to value.
name | An optional name for the container.
networks | An optional list of strings specifying Docker networks to connect the container to.
network_mode | Set the network mode for the created container. Defaults to 'host' if a local API URL is detected, otherwise the Docker default of 'bridge' is used. If 'networks' is set, this cannot be set.
stream_output | Bool indicating whether to stream output from the subprocess to local standard output.
volumes | An optional list of volume mount strings in the format of "local_path:container_path".
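The network_mode rules are easier to follow as a decision function. The sketch below encodes only what the settings description states. Two things are assumptions of this example: treating an explicit network_mode as taking precedence, and returning None when networks is set. `resolve_network_mode` is a hypothetical helper, not part of Prefect.

```python
from urllib.parse import urlsplit

LOCAL_API_HOSTS = {"localhost", "127.0.0.1"}


def resolve_network_mode(api_url, networks=None, network_mode=None):
    """Pick a network mode following the rules stated for network_mode."""
    if networks and network_mode is not None:
        raise ValueError("'network_mode' cannot be set when 'networks' is set")
    if network_mode is not None:
        return network_mode  # assumption: an explicit setting wins
    if networks:
        return None  # assumption: the container joins the listed networks instead
    if urlsplit(api_url).hostname in LOCAL_API_HOSTS:
        return "host"  # a local API URL was detected
    return "bridge"  # Docker's own default


print(resolve_network_mode("http://127.0.0.1:4200/api"))      # host
print(resolve_network_mode("https://api.prefect.cloud/api"))  # bridge
```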

Prefect automatically sets a Docker image matching the Python and Prefect version you're using at deployment time. You can see all available images at Docker Hub.

KubernetesJob

KubernetesJob infrastructure executes flow runs in a Kubernetes Job.

Requirements for KubernetesJob:

  • kubectl must be available.
  • You must configure remote Storage. Local storage is not supported for Kubernetes.
  • The ephemeral Prefect API won't work with Docker and Kubernetes. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.

The Prefect CLI command prefect kubernetes manifest server automatically generates a Kubernetes manifest with default settings for Prefect deployments. By default, it simply prints out the YAML configuration for a manifest. You can pipe this output to a file of your choice and edit as necessary.

KubernetesJob supports the following settings:

Attribute | Description
cluster_config | An optional Kubernetes cluster config to use for this job.
command | A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this.
customizations | A list of JSON Patch (RFC 6902) operations to apply to the base Job manifest. Alternatively, a valid JSON string is allowed (handy for the deployments CLI).
env | Environment variables to set for the container.
finished_job_ttl | The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be manually removed.
image | String specifying the tag of a Docker image to use for the Job.
image_pull_policy | The Kubernetes image pull policy to use for job containers.
job | The base manifest for the Kubernetes Job.
job_watch_timeout_seconds | Number of seconds to watch for job creation before timing out (defaults to None).
labels | Dictionary of labels to add to the Job.
name | An optional name for the job.
namespace | String signifying the Kubernetes namespace to use.
pod_watch_timeout_seconds | Number of seconds to watch for pod creation before timing out (default 60).
service_account_name | An optional string specifying which Kubernetes service account to use.
stream_output | Bool indicating whether to stream output from the subprocess to local standard output.

KubernetesJob overrides and customizations

When creating deployments using KubernetesJob infrastructure, the infra_overrides parameter expects a dictionary. For a KubernetesJob, the customizations parameter expects a list.

In the Job manifest, containers is a list of objects, even if there is only one container. For any patch that applies to a container, the path must therefore include the container's index in that list, for example: /spec/template/spec/containers/0/resources
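To see why the index matters, here is a minimal sketch of how an "add" patch with such a path walks a manifest. It handles only the "add" operation and skips JSON Pointer escaping, so it is not a full JSON Patch implementation. `apply_add_patch`, the trimmed manifest, and the container name are hypothetical.

```python
import copy


def apply_add_patch(manifest, patch):
    """Apply a single JSON Patch "add" operation and return the patched copy."""
    if patch["op"] != "add":
        raise ValueError("this sketch only handles the 'add' operation")
    patched = copy.deepcopy(manifest)
    *parents, leaf = [part for part in patch["path"].split("/") if part]
    target = patched
    for part in parents:
        # List segments such as the "0" in containers/0 are indexes, not keys.
        target = target[int(part)] if isinstance(target, list) else target[part]
    if isinstance(target, list):
        target.insert(int(leaf), patch["value"])
    else:
        target[leaf] = patch["value"]
    return patched


# A trimmed, hypothetical Job manifest with a single container.
job = {"spec": {"template": {"spec": {"containers": [{"name": "example"}]}}}}
patch = {
    "op": "add",
    "path": "/spec/template/spec/containers/0/resources",
    "value": {"requests": {"cpu": "2000m", "memory": "4Gi"}},
}

patched = apply_add_patch(job, patch)
container = patched["spec"]["template"]["spec"]["containers"][0]
print(container["resources"])  # {'requests': {'cpu': '2000m', 'memory': '4Gi'}}
```

Without the 0 segment, the walk would try to look up "resources" on the containers list itself, which is why the path has to name a container by position.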

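A KubernetesJob infrastructure block defined in Python:

from prefect.infrastructure import KubernetesJob, KubernetesImagePullPolicy

# Example values; replace them with your own namespace and image.
namespace = "dev"
image_name = "prefecthq/prefect:2-python3.9"

# Patch that adds resource requests and limits to the first container.
# Kubernetes quantity suffixes are case-sensitive: use "Gi", not "gi".
customizations = [
    {
        "op": "add",
        "path": "/spec/template/spec/containers/0/resources",
        "value": {
            "requests": {
                "cpu": "2000m",
                "memory": "4Gi",
            },
            "limits": {
                "cpu": "4000m",
                "memory": "8Gi",
                "nvidia.com/gpu": "1",
            },
        },
    }
]

k8s_job = KubernetesJob(
    namespace=namespace,
    image=image_name,
    image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
    finished_job_ttl=300,
    job_watch_timeout_seconds=600,
    pod_watch_timeout_seconds=600,
    service_account_name="prefect-server",
    customizations=customizations,
)
k8s_job.save("devk8s")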

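A deployment with infra_overrides defined in Python:

from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob

# Kubernetes quantity suffixes are case-sensitive: use "Gi", not "gi".
infra_overrides = {
    "customizations": [
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/resources",
            "value": {
                "requests": {
                    "cpu": "2000m",
                    "memory": "4Gi",
                },
                "limits": {
                    "cpu": "4000m",
                    "memory": "8Gi",
                    "nvidia.com/gpu": "1",
                },
            },
        }
    ]
}

# Load an already created K8s block
k8sjob = KubernetesJob.load("devk8s")

# my_flow (a flow function) and storage (a remote storage block)
# are assumed to be defined or loaded elsewhere.
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="s3-example",
    version=2,
    work_queue_name="aws",
    infrastructure=k8sjob,
    storage=storage,
    infra_overrides=infra_overrides,
)

deployment.apply()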

ECSTask

ECSTask infrastructure runs your flow in an ECS Task.

Requirements for ECSTask:

  • The ephemeral Prefect API won't work with ECS directly. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
  • The prefect-aws collection must be installed within the agent environment: pip install prefect-aws
  • The ECSTask and AwsCredentials blocks must be registered within the agent environment: prefect block register -m prefect_aws.ecs
  • You must configure remote Storage. Local storage is not supported for ECS tasks. The most commonly used type of storage with ECSTask is S3. If you use that type of block, make sure that s3fs is installed within your agent and flow run environment.

The easiest way to satisfy the installation requirements above is to include the following commands in your Dockerfile:

# Example base image
FROM prefecthq/prefect:2-python3.9
RUN pip install s3fs prefect-aws

Make sure to allocate enough CPU and memory to your agent, and consider adding retries

When you start a Prefect agent on AWS ECS Fargate, allocate as much CPU and memory as needed for your workloads. Your agent needs enough resources to appropriately provision infrastructure for your flow runs and to monitor their execution. Otherwise, your flow runs may get stuck in a Pending state. Alternatively, set a work-queue concurrency limit to ensure that the agent will not try to process all runs at the same time.

Some API calls to provision infrastructure may fail due to unexpected issues on the client side (for example, transient errors such as ConnectionError, HTTPClientError, or RequestTimeout), or due to server-side rate limiting from the AWS service. To mitigate those issues, we recommend setting the environment variables AWS_MAX_ATTEMPTS (an integer value such as 10) and AWS_RETRY_MODE (a string value such as standard or adaptive). Those environment variables must be set within the agent environment, e.g. on your ECS service running the agent, rather than on the ECSTask infrastructure block.
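As a rough sketch, the two variables can be added to the environment that the agent process is started with. `with_aws_retry_settings` is a hypothetical helper, and it accepts only the two retry modes named above. How you pass the resulting environment to your agent depends on how you launch it.

```python
import os

# Only the modes mentioned in the text are accepted in this sketch.
SUPPORTED_RETRY_MODES = {"standard", "adaptive"}


def with_aws_retry_settings(env, max_attempts=10, retry_mode="adaptive"):
    """Return a copy of `env` with the AWS retry variables added."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be a positive integer")
    if retry_mode not in SUPPORTED_RETRY_MODES:
        raise ValueError(f"unsupported retry mode: {retry_mode!r}")
    updated = dict(env)
    # Environment variable values are always strings.
    updated["AWS_MAX_ATTEMPTS"] = str(max_attempts)
    updated["AWS_RETRY_MODE"] = retry_mode
    return updated


agent_env = with_aws_retry_settings(os.environ, max_attempts=10, retry_mode="standard")
print(agent_env["AWS_MAX_ATTEMPTS"], agent_env["AWS_RETRY_MODE"])  # 10 standard
```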

Docker images

Learn about options for Prefect-maintained Docker images in the Docker guide.