How to submit flows directly to dynamic infrastructure
Submit flows directly to different infrastructure types without a deployment
Beta Feature: This feature is currently in beta. While we encourage you to try it out and provide feedback, be aware that the API may change in future releases, potentially including breaking changes.
Prefect allows you to submit workflows directly to different infrastructure types without requiring a deployment. This enables you to dynamically choose where your workflows run based on their requirements, such as:
Training machine learning models that require GPUs
Processing large datasets that need significant memory
Running lightweight tasks that can use minimal resources
To enable Prefect to run workflows on remote infrastructure, work pools need an associated storage location where Prefect stores serialized versions of submitted workflows and the results of workflow runs. Configure storage for your work pool using one of the supported storage types.
To allow Prefect to upload and download serialized workflows, create a block containing credentials with permission to access your configured storage location. If a credentials block is not provided, Prefect uses the default credentials (for example, a local profile or an IAM role) as determined by the corresponding cloud provider. You can inspect your storage configuration with:
```bash
prefect work-pool storage inspect WORK_POOL_NAME
```
Local storage for @docker: When using the @docker decorator with a local Docker engine, you can use volume mounts to share data between your Docker container and your host machine.
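As a minimal sketch, the example below assumes the decorator passes a `volumes` job variable through to the Docker worker in the familiar `host:container` format; the exact parameter name and supported options depend on your work pool's base job template, so treat this as illustrative rather than definitive:

```python
from prefect import flow
from prefect_docker.experimental import docker


# `volumes` is an assumed pass-through job variable; check your work
# pool's base job template for the exact name it expects.
@docker(
    work_pool="my-docker-pool",
    volumes=["/path/on/host:/path/in/container"],
)
@flow
def shared_data_flow():
    # Files written under /path/in/container are visible on the
    # host machine at /path/on/host after the container exits.
    with open("/path/in/container/output.txt", "w") as f:
        f.write("hello from the container")
```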
An infrastructure-bound flow supports three execution modes: direct calling, .submit(), and .submit_to_work_pool(). Each mode targets a different use case depending on whether you need blocking or non-blocking execution and whether the submitting machine has direct access to the target infrastructure.
Calling an infrastructure-bound flow directly submits it to remote infrastructure and blocks until the run completes. Prefect spins up a temporary local worker to create the infrastructure and monitor the run.
```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes


@kubernetes(work_pool="olympic")
@flow
def my_remote_flow(name: str):
    print(f"Hello {name}!")


@flow
def my_flow():
    # Blocks until my_remote_flow completes on Kubernetes
    my_remote_flow("Marvin")


my_flow()
```
When you run this code on your machine, my_flow executes locally, while my_remote_flow is submitted to run in a Kubernetes job. The call blocks until the Kubernetes job finishes.
Use .submit() when you want to submit a flow to remote infrastructure without blocking the caller. Like a direct call, .submit() spins up a temporary local worker to create the infrastructure, but it returns a PrefectFlowRunFuture immediately so you can continue running other work. Use .submit() when:
The submitting machine has access to the target infrastructure (for example, it can connect to the Kubernetes cluster or has permissions to create an ECS task)
You want to run multiple infrastructure-bound flows concurrently
You don’t have a worker already running for the work pool
```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes


@kubernetes(work_pool="olympic")
@flow
def train_model(dataset: str):
    print(f"Training on {dataset}")
    return {"accuracy": 0.95}


@flow
def orchestrator():
    # Submit two training jobs without waiting
    future_a = train_model.submit(dataset="dataset-a")
    future_b = train_model.submit(dataset="dataset-b")

    # Retrieve results when needed
    result_a = future_a.result()
    result_b = future_b.result()
    print(f"Results: {result_a}, {result_b}")


orchestrator()
```
In this example, both training jobs are submitted to Kubernetes concurrently. The orchestrator flow continues executing and only blocks when it calls .result() on each future.
Submitting to a work pool with .submit_to_work_pool()
Use .submit_to_work_pool() when you want to submit a flow to remote infrastructure but the submitting machine does not have direct access to create that infrastructure. Instead of spinning up a local worker, this method creates a flow run and places it in the work pool for an already-running worker to pick up. Use .submit_to_work_pool() when:
The submitting machine cannot connect to the target infrastructure (for example, it cannot reach the Kubernetes cluster or lacks permissions to create an ECS task)
You already have a worker running that polls the target work pool
You want to separate the submission environment from the execution environment
```python
from prefect import flow
from prefect_aws.experimental import ecs


@ecs(work_pool="my-ecs-pool")
@flow
def process_data(source: str):
    print(f"Processing {source}")
    return {"rows": 1000}


# Submit to the work pool for an existing worker to execute
future = process_data.submit_to_work_pool(source="s3://my-bucket/data.csv")

# Retrieve the result once the worker completes the run
result = future.result()
print(result)
```
Before calling .submit_to_work_pool(), start a worker that polls the target work pool:
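For example, using the Prefect CLI (the pool name here matches the example above):

```bash
prefect worker start --pool "my-ecs-pool"
```

The worker polls the work pool for scheduled runs and creates the target infrastructure for each one it picks up.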
Both .submit() and .submit_to_work_pool() return a PrefectFlowRunFuture. Use this object to check the status of the flow run, wait for it to finish, or retrieve the result.
```python
future = my_flow.submit(name="Marvin")

# Check the current state without blocking
print(future.state)

# Block until the run completes
future.wait()

# Retrieve the result (blocks if the run is still in progress)
result = future.result()
```
Parameters must be serializable: Parameters passed to infrastructure-bound flows are serialized with cloudpickle so they can be transported to the destination infrastructure. Most Python objects can be serialized with cloudpickle, but objects such as database connections cannot. For parameters that cannot be serialized, create the object inside your infrastructure-bound workflow.
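The constraint can be demonstrated with the standard library's pickle module, which cloudpickle builds on: plain data structures round-trip cleanly, while live OS resources such as sockets (and, by extension, database connections) cannot be serialized. This is an illustrative sketch, not Prefect code:

```python
import pickle
import socket

# Plain data structures survive a serialization round trip.
params = {"dataset": "s3://my-bucket/data.csv", "epochs": 5}
assert pickle.loads(pickle.dumps(params)) == params

# Live OS resources such as sockets cannot be serialized;
# attempting to do so raises TypeError.
sock = socket.socket()
try:
    pickle.dumps(sock)
    serializable = True
except TypeError:
    serializable = False
finally:
    sock.close()

print(serializable)  # False: create such objects inside the remote flow instead
```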
When a flow runs on remote infrastructure, your code is serialized and sent to the execution environment. However, non-Python files such as configuration files, data files, or model artifacts are not included by default. Use the include_files parameter on any infrastructure decorator to bundle additional files alongside your flow.
```python
from prefect import flow
from prefect_docker.experimental import docker


@docker(
    work_pool="my-pool",
    include_files=["config.yaml", "data/"],
)
@flow
def my_flow():
    import yaml

    with open("config.yaml") as f:
        config = yaml.safe_load(f)
    print(config)
```
The include_files parameter accepts a list of relative paths and glob patterns. Paths are resolved relative to the directory containing the flow file.
If a .prefectignore file exists in the flow file’s directory or at the project root (detected via pyproject.toml), its patterns are applied to filter out matching files. The .prefectignore file uses gitignore-style syntax:
```
# .prefectignore
*.log
tmp/
__pycache__/
```
Files that match a .prefectignore pattern are excluded from the bundle even if they match an include_files pattern.
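The precedence rule can be sketched with the standard library's fnmatch; `would_bundle` is a hypothetical helper for illustration, not Prefect's actual implementation:

```python
from fnmatch import fnmatch


def would_bundle(path, include_patterns, ignore_patterns):
    """A file is bundled only if it matches an include pattern
    and does not match any ignore pattern."""
    included = any(fnmatch(path, pat) for pat in include_patterns)
    ignored = any(fnmatch(path, pat) for pat in ignore_patterns)
    return included and not ignored


include = ["*.yaml", "data/*"]
ignore = ["*.log", "data/tmp*"]

print(would_bundle("config.yaml", include, ignore))   # True
print(would_bundle("data/tmp.csv", include, ignore))  # False: the ignore pattern wins
```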
Certain common directories and file types are always excluded from directory and glob collection, even without a .prefectignore file:
__pycache__/, *.pyc, *.pyo
.git/, .hg/, .svn/
node_modules/, .venv/, venv/
.idea/, .vscode/
.DS_Store, Thumbs.db
Hidden files and directories (names starting with .) are also excluded when collecting directories.
Files matching sensitive patterns such as .env*, *.pem, *.key, or credentials.* are bundled without special treatment. Add sensitive files to .prefectignore to prevent accidental inclusion.