Beta Feature: This feature is currently in beta. While we encourage you to try it out and provide feedback, please be aware that the API may change in future releases, potentially including breaking changes.
Prefect allows you to submit workflows directly to different infrastructure types without requiring a deployment. This enables you to dynamically choose where your workflows run based on their requirements, such as:
  • Training machine learning models that require GPUs
  • Processing large datasets that need significant memory
  • Running lightweight tasks that can use minimal resources

Benefits

Submitting workflows directly to dynamic infrastructure provides several advantages:
  • Dynamic resource allocation: Choose infrastructure based on workflow requirements at runtime
  • Cost efficiency: Use expensive infrastructure only when needed
  • Consistency: Ensure workflows always run on the appropriate infrastructure type
  • Simplified workflow management: No need to create and maintain deployments for different infrastructure types

Supported infrastructure

Direct submission of workflows is currently supported for the following infrastructures:
Infrastructure | Required Package | Decorator
Docker | prefect-docker | @docker
Kubernetes | prefect-kubernetes | @kubernetes
AWS ECS | prefect-aws | @ecs
Google Cloud Run | prefect-gcp | @cloud_run
Google Vertex AI | prefect-gcp | @vertex_ai
Azure Container Instances | prefect-azure | @azure_container_instance
Each package can be installed using pip, for example:
pip install prefect-docker

Prerequisites

Before submitting workflows to specific infrastructure, you need:
  1. A work pool for each infrastructure type you want to use
  2. Object storage to associate with your work pool(s)

Setting up work pools and storage

Creating a work pool

Create work pools for each infrastructure type using the Prefect CLI:
prefect work-pool create NAME --type WORK_POOL_TYPE
For detailed information on creating and configuring work pools, refer to the work pools documentation.

Configuring work pool storage

To enable Prefect to run workflows in remote infrastructure, work pools need an associated storage location to store serialized versions of submitted workflows and results from workflow runs. Configure storage for your work pools using one of the supported storage types:
prefect work-pool storage configure s3 WORK_POOL_NAME \
    --bucket BUCKET_NAME \
    --aws-credentials-block-name BLOCK_NAME
To allow Prefect to upload and download serialized workflows, you can create a block containing credentials with permission to access your configured storage location. If a credentials block is not provided, Prefect will use the default credentials (for example, a local profile or an IAM role) as determined by the corresponding cloud provider. You can inspect your storage configuration using:
prefect work-pool storage inspect WORK_POOL_NAME
Local storage for @docker
When using the @docker decorator with a local Docker engine, you can use volume mounts to share data between your Docker container and host machine. Here’s an example:
from prefect import flow
from prefect.filesystems import LocalFileSystem
from prefect_docker.experimental import docker


result_storage = LocalFileSystem(basepath="/tmp/results")
result_storage.save("result-storage", overwrite=True)


@docker(
    work_pool="above-ground",
    volumes=["/tmp/results:/tmp/results"],
)
@flow(result_storage=result_storage)
def run_in_docker(name: str):
    return f"Hello, {name}!"


print(run_in_docker("world")) # prints "Hello, world!"
To use local storage, ensure that:
  1. The volume mount path is identical on both the host and container side
  2. The LocalFileSystem block’s basepath matches the path specified in the volume mount
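The two requirements above can be sanity-checked before submitting. Here is a small sketch of such a check; `check_local_storage_config` is a hypothetical helper for illustration, not part of Prefect's API:

```python
def check_local_storage_config(volume: str, basepath: str) -> list[str]:
    """Validate a Docker volume mount ("host:container") against a
    LocalFileSystem basepath, per the two rules above. Hypothetical
    helper for illustration only."""
    problems = []
    host, sep, container = volume.partition(":")
    if not sep:
        problems.append("volume must use 'host:container' form")
    elif host != container:
        problems.append("host and container paths should be identical")
    if basepath not in (host, container):
        problems.append("LocalFileSystem basepath should match the mounted path")
    return problems
```

For the example above, `check_local_storage_config("/tmp/results:/tmp/results", "/tmp/results")` returns an empty list, confirming the configuration is consistent.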

Running infrastructure-bound flows

An infrastructure-bound flow supports three execution modes: direct calling, .submit(), and .submit_to_work_pool(). Each mode targets a different use case depending on whether you need blocking or non-blocking execution and whether the submitting machine has direct access to the target infrastructure.
Method | Blocking | Requires local infrastructure access | Requires a running worker
Direct call | Yes | Yes | No
.submit() | No | Yes | No
.submit_to_work_pool() | No | No | Yes

Direct call (blocking)

Calling an infrastructure-bound flow directly submits it to remote infrastructure and blocks until the run completes. Prefect spins up a temporary local worker to create the infrastructure and monitor the run.
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes


@kubernetes(work_pool="olympic")
@flow
def my_remote_flow(name: str):
    print(f"Hello {name}!")

@flow
def my_flow():
    # Blocks until my_remote_flow completes on Kubernetes
    my_remote_flow("Marvin")

my_flow()
When you run this code on your machine, my_flow executes locally, while my_remote_flow is submitted to run in a Kubernetes job. The call blocks until the Kubernetes job finishes.

Non-blocking submission with .submit()

Use .submit() when you want to submit a flow to remote infrastructure without blocking the caller. Like a direct call, .submit() spins up a temporary local worker to create the infrastructure, but it returns a PrefectFlowRunFuture immediately so you can continue running other work. Use .submit() when:
  • The submitting machine has access to the target infrastructure (for example, it can connect to the Kubernetes cluster or has permissions to create an ECS task)
  • You want to run multiple infrastructure-bound flows concurrently
  • You don’t have a worker already running for the work pool
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes


@kubernetes(work_pool="olympic")
@flow
def train_model(dataset: str):
    print(f"Training on {dataset}")
    return {"accuracy": 0.95}


@flow
def orchestrator():
    # Submit two training jobs without waiting
    future_a = train_model.submit(dataset="dataset-a")
    future_b = train_model.submit(dataset="dataset-b")

    # Retrieve results when needed
    result_a = future_a.result()
    result_b = future_b.result()
    print(f"Results: {result_a}, {result_b}")

orchestrator()
In this example, both training jobs are submitted to Kubernetes concurrently. The orchestrator flow continues executing and only blocks when it calls .result() on each future.

Submitting to a work pool with .submit_to_work_pool()

Use .submit_to_work_pool() when you want to submit a flow to remote infrastructure but the submitting machine does not have direct access to create that infrastructure. Instead of spinning up a local worker, this method creates a flow run and places it in the work pool for an already-running worker to pick up. Use .submit_to_work_pool() when:
  • The submitting machine cannot connect to the target infrastructure (for example, it cannot reach the Kubernetes cluster or lacks permissions to create an ECS task)
  • You already have a worker running that polls the target work pool
  • You want to separate the submission environment from the execution environment
from prefect import flow
from prefect_aws.experimental import ecs


@ecs(work_pool="my-ecs-pool")
@flow
def process_data(source: str):
    print(f"Processing {source}")
    return {"rows": 1000}


# Submit to the work pool for an existing worker to execute
future = process_data.submit_to_work_pool(source="s3://my-bucket/data.csv")

# Retrieve the result once the worker completes the run
result = future.result()
print(result)
Before calling .submit_to_work_pool(), start a worker that polls the target work pool:
prefect worker start --pool my-ecs-pool

Working with PrefectFlowRunFuture

Both .submit() and .submit_to_work_pool() return a PrefectFlowRunFuture. Use this object to check the status of the flow run, wait for it to finish, or retrieve the result.
future = my_flow.submit(name="Marvin")

# Check the current state without blocking
print(future.state)

# Block until the run completes
future.wait()

# Retrieve the result (blocks if the run is still in progress)
result = future.result()
Parameters must be serializable
Parameters passed to infrastructure-bound flows are serialized with cloudpickle to allow them to be transported to the destination infrastructure. Most Python objects can be serialized with cloudpickle, but objects like database connections cannot be serialized. For parameters that cannot be serialized, create the object inside your infrastructure-bound workflow.
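A quick pre-flight check can catch non-serializable parameters before submission. This sketch uses the stdlib pickle module as a conservative stand-in (cloudpickle accepts strictly more objects than pickle, so a pickle success guarantees a cloudpickle success); `is_param_serializable` is a hypothetical helper, not a Prefect API:

```python
import pickle


def is_param_serializable(obj) -> bool:
    """Rough pre-flight check for flow parameters. Prefect uses
    cloudpickle, which accepts a superset of what stdlib pickle
    does, so a pickle success is a conservative green light."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False
```

For example, a connection string like `"postgres://host/db"` passes the check, while a live connection object would not; pass the string and open the connection inside the flow.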

Customizing infrastructure configuration

You can override the default configuration by providing additional kwargs to the infrastructure decorator:
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes


@kubernetes(
    work_pool="my-kubernetes-pool",
    namespace="custom-namespace"
)
@flow
def custom_namespace_flow():
    pass
Any kwargs passed to the infrastructure decorator will override the corresponding default value in the base job template for the specified work pool.
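Conceptually, this override behaves like a shallow merge of the decorator kwargs over the work pool's defaults. The sketch below illustrates the described behavior (it is not Prefect's internal code, and the example values are hypothetical):

```python
def merged_job_variables(base_defaults: dict, decorator_kwargs: dict) -> dict:
    """Decorator kwargs win over the base job template's default
    value for the same key. Illustrative sketch only."""
    return {**base_defaults, **decorator_kwargs}


# Hypothetical defaults for a Kubernetes work pool:
defaults = {"namespace": "default", "image": "prefecthq/prefect:3-latest"}
merged = merged_job_variables(defaults, {"namespace": "custom-namespace"})
# namespace is overridden; image keeps the work pool's default
```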

Including files in the bundle

When a flow runs on remote infrastructure, your code is serialized and sent to the execution environment. However, non-Python files such as configuration files, data files, or model artifacts are not included by default. Use the include_files parameter on any infrastructure decorator to bundle additional files alongside your flow.
from prefect import flow
from prefect_docker.experimental import docker


@docker(
    work_pool="my-pool",
    include_files=["config.yaml", "data/"]
)
@flow
def my_flow():
    import yaml

    with open("config.yaml") as f:
        config = yaml.safe_load(f)
    print(config)
The include_files parameter accepts a list of relative paths and glob patterns. Paths are resolved relative to the directory containing the flow file.

Supported patterns

Pattern | Description
"config.yaml" | A single file
"data/" | All files in a directory (recursive)
"*.yaml" | Glob pattern matching files in the flow directory
"data/**/*.csv" | Recursive glob pattern
"!*.test.py" | Negation pattern to exclude previously matched files
Patterns are processed in order. Negation patterns (prefixed with !) remove files already matched by earlier patterns:
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes


@kubernetes(
    work_pool="my-pool",
    include_files=["*.json", "!fixtures/*.json"]
)
@flow
def process_json():
    ...
This example includes all JSON files except those in the fixtures/ directory.
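The ordered include-then-negate mechanics can be sketched with the stdlib fnmatch module. Note this is an illustration of the ordering rules only, not Prefect's actual implementation: fnmatch is path-unaware, so `*` here also crosses directory separators, unlike the flow-directory-scoped globs in the table above.

```python
from fnmatch import fnmatch


def resolve_patterns(patterns: list[str], candidates: list[str]) -> list[str]:
    """Apply include and '!'-prefixed negation patterns in order.
    Sketch of the ordered matching described above."""
    selected: list[str] = []
    for pattern in patterns:
        if pattern.startswith("!"):
            # Negation removes files already matched by earlier patterns
            selected = [f for f in selected if not fnmatch(f, pattern[1:])]
        else:
            selected += [f for f in candidates
                         if fnmatch(f, pattern) and f not in selected]
    return selected
```

For the example above, `resolve_patterns(["*.json", "!fixtures/*.json"], files)` first collects every JSON file, then drops those under fixtures/.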

Filtering with .prefectignore

If a .prefectignore file exists in the flow file’s directory or at the project root (detected via pyproject.toml), its patterns are applied to filter out matching files. The .prefectignore file uses gitignore-style syntax:
# .prefectignore
*.log
tmp/
__pycache__/
Files that match a .prefectignore pattern are excluded from the bundle even if they match an include_files pattern.

Default exclusions

Certain common directories and file types are always excluded from directory and glob collection, even without a .prefectignore file:
  • __pycache__/, *.pyc, *.pyo
  • .git/, .hg/, .svn/
  • node_modules/, .venv/, venv/
  • .idea/, .vscode/
  • .DS_Store, Thumbs.db
Hidden files and directories (names starting with .) are also excluded when collecting directories.
Files matching sensitive patterns such as .env*, *.pem, *.key, or credentials.* are bundled without special treatment. Add sensitive files to .prefectignore to prevent accidental inclusion.
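A pattern check along these lines can verify whether a given bundle-relative path would be caught by the default exclusions. This is a sketch of the behavior described above (the pattern list is a subset, and the helper is hypothetical, not Prefect's code):

```python
from fnmatch import fnmatch

# Subset of the default exclusions listed above (illustrative)
DEFAULT_EXCLUDE_PATTERNS = [
    "__pycache__/*", "*.pyc", "*.pyo",
    ".git/*", "node_modules/*", ".venv/*",
    ".DS_Store", "Thumbs.db",
]


def is_default_excluded(path: str) -> bool:
    """Check a bundle-relative path against the default exclusion
    patterns. Illustrative sketch only."""
    return any(fnmatch(path, pattern) for pattern in DEFAULT_EXCLUDE_PATTERNS)
```

Note that a file like `credentials.json` passes this check, which is why sensitive files should be listed explicitly in .prefectignore.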

Further reading