Why use prefect-aws?

prefect-aws offers significant advantages over direct boto3 integration:
  • Production-ready integrations: Pre-built, tested components that handle common AWS patterns and edge cases
  • Unified credential management: Secure, centralized authentication that works consistently across all AWS services
  • Built-in observability: Automatic logging, monitoring, and state tracking for all AWS operations
  • Infrastructure as code: Deploy and scale workflows on AWS ECS with minimal configuration

Getting started

Prerequisites

  • An AWS account and the necessary permissions to access desired services.

Install prefect-aws

The following command will install a version of prefect-aws compatible with your installed version of prefect. If you don’t already have prefect installed, it will install the newest version of prefect as well.
pip install "prefect[aws]"
Upgrade to the latest versions of prefect and prefect-aws:
pip install -U "prefect[aws]"

Blocks setup

Credentials

Most AWS services require an authenticated session. Prefect makes it simple to provide credentials via AWS Credentials blocks. Steps:
  1. Refer to the AWS Configuration documentation to retrieve your access key ID and secret access key.
  2. Copy the access key ID and secret access key.
  3. Create an AwsCredentials block in the Prefect UI or use a Python script like the one below.
from prefect_aws import AwsCredentials


AwsCredentials(
    aws_access_key_id="PLACEHOLDER",
    aws_secret_access_key="PLACEHOLDER",
    aws_session_token=None,  # replace this with token if necessary
    region_name="us-east-2"
).save("BLOCK-NAME-PLACEHOLDER")
Prefect uses the Boto3 library under the hood. Prefect creates the session object from the values stored in the block. Any values not provided to the block are sourced at runtime, following the credential lookup order described in the Boto3 docs.
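As a rough illustration of this fallback, the sketch below builds a block that sets only some fields and leaves the rest for Boto3 to resolve. The helper names `fields_left_to_boto3` and `session_with_fallback` are hypothetical and not part of prefect-aws. The sketch assumes the block's `get_boto3_session` method returns the session.

```python
from typing import Any, Dict, List, Optional

# Fields of the AwsCredentials block used in the example above.
BLOCK_FIELDS = (
    "aws_access_key_id",
    "aws_secret_access_key",
    "aws_session_token",
    "region_name",
)


def fields_left_to_boto3(block_kwargs: Dict[str, Optional[Any]]) -> List[str]:
    """Return the fields the block does not set; Boto3 resolves these at runtime."""
    return [name for name in BLOCK_FIELDS if block_kwargs.get(name) is None]


def session_with_fallback(**block_kwargs: Any):
    """Create a Boto3 session from a partially filled AwsCredentials block."""
    # Imported lazily so the helper above also works without prefect-aws installed.
    from prefect_aws import AwsCredentials

    return AwsCredentials(**block_kwargs).get_boto3_session()
```

For example, `fields_left_to_boto3({"region_name": "us-east-2"})` lists the three key and token fields, and `session_with_fallback(region_name="us-east-2")` would return a session whose keys come from wherever Boto3 finds them first, such as environment variables or an attached IAM role.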

S3

Create a block for reading and writing files to S3.
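from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

# Load the credentials block saved in the previous step
aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

S3Bucket(
    bucket_name="BUCKET-NAME-PLACEHOLDER",
    credentials=aws_credentials
).save("S3-BLOCK-NAME-PLACEHOLDER")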

Lambda

Invoke AWS Lambdas, synchronously or asynchronously.
from prefect_aws.lambda_function import LambdaFunction
from prefect_aws.credentials import AwsCredentials

# Load the credentials block saved earlier
credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

LambdaFunction(
    function_name="test_lambda_function",
    aws_credentials=credentials,
).save("LAMBDA-BLOCK-NAME-PLACEHOLDER")

Secrets Manager

Create a block to read, write, and delete AWS Secrets Manager secrets.
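from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import AwsSecret

# Load the credentials block saved earlier
credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

AwsSecret(
    secret_name="test_secret_name",
    aws_credentials=credentials,
).save("AWS-SECRET-BLOCK-NAME-PLACEHOLDER")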

Supported AWS services

prefect-aws provides comprehensive integrations for key AWS services:
Service          Integration Type          Use Cases
S3               S3Bucket block            File storage, data lake operations, deployment storage
Secrets Manager  AwsSecret block           Secure credential storage, API key management
Lambda           LambdaFunction block      Serverless function execution, event-driven processing
Glue             GlueJobBlock block        ETL operations, data transformation pipelines
ECS              ECSWorker infrastructure  Container orchestration, scalable compute workloads
Batch            batch_submit task         High-throughput computing, batch job processing

Integration types:
  • Blocks: Reusable configuration objects that can be saved and shared across flows
  • Tasks: Functions decorated with @task for direct use in flows
  • Workers: Infrastructure components for running flows on AWS compute services

Scale workflows with AWS infrastructure

ECS (Elastic Container Service)

Deploy and scale your Prefect workflows on AWS ECS for production workloads. prefect-aws provides:
  • ECS worker: Long-running worker for hybrid deployments with full control over execution environment
  • Auto-scaling: Dynamic resource allocation based on workflow demands
  • Cost optimization: Pay only for compute resources when workflows are running
See the ECS worker deployment guide for a step-by-step walkthrough of deploying production-ready workers to your ECS cluster.

Docker Images

Pre-built Docker images with prefect-aws are available for simplified deployment:
docker pull prefecthq/prefect-aws:latest

Available Tags

Image tags have the following format:
  • prefecthq/prefect-aws:latest - Latest stable release with Python 3.12
  • prefecthq/prefect-aws:latest-python3.11 - Latest stable with Python 3.11
  • prefecthq/prefect-aws:0.5.9-python3.12 - Specific prefect-aws version with Python 3.12
  • prefecthq/prefect-aws:0.5.9-python3.12-prefect3.4.9 - Full version specification
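The tag pattern above can be sketched as a small helper that assembles an image reference from its parts. `image_reference` is a hypothetical function written for illustration. The composition rule is inferred from the four tags listed here, so other combinations are not guaranteed to exist in the registry.

```python
from typing import Optional

IMAGE = "prefecthq/prefect-aws"


def image_reference(
    prefect_aws_version: Optional[str] = None,
    python_version: Optional[str] = None,
    prefect_version: Optional[str] = None,
) -> str:
    """Build an image reference following the tag pattern listed above."""
    # The full form is only listed with all three versions present.
    if prefect_version and not (prefect_aws_version and python_version):
        raise ValueError(
            "prefect_version requires prefect_aws_version and python_version"
        )

    parts = [prefect_aws_version or "latest"]
    if python_version:
        parts.append(f"python{python_version}")
    if prefect_version:
        parts.append(f"prefect{prefect_version}")
    return f"{IMAGE}:{'-'.join(parts)}"
```

Pinning all three versions, as in `image_reference("0.5.9", "3.12", "3.4.9")`, gives the most reproducible deployments, while `image_reference()` tracks the latest stable release.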

Usage Examples

Running an ECS worker:
docker run -d \
  --name prefect-ecs-worker \
  -e PREFECT_API_URL=https://api.prefect.cloud/api/accounts/your-account/workspaces/your-workspace \
  -e PREFECT_API_KEY=your-api-key \
  prefecthq/prefect-aws:latest \
  prefect worker start --pool ecs-pool
Local development:
docker run -it --rm \
  -v $(pwd):/opt/prefect \
  prefecthq/prefect-aws:latest \
  python your_flow.py

Examples

Read and write files to AWS S3

Upload a file to an AWS S3 bucket and download the same file under a different filename. The following code assumes that the bucket already exists:
from pathlib import Path
from prefect import flow
from prefect_aws import AwsCredentials, S3Bucket


@flow
def s3_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    s3_bucket = S3Bucket(
        bucket_name="BUCKET-NAME-PLACEHOLDER",
        credentials=aws_credentials
    )

    s3_bucket_path = s3_bucket.upload_from_path(file_path)
    downloaded_file_path = s3_bucket.download_object_to_path(
        s3_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()


if __name__ == "__main__":
    s3_flow()

Access secrets with AWS Secrets Manager

Write a secret to AWS Secrets Manager, read the secret data, delete the secret, and return the secret data.
from prefect import flow
from prefect_aws import AwsCredentials, AwsSecret


@flow
def secrets_manager_flow():
    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    aws_secret = AwsSecret(secret_name="test-example", aws_credentials=aws_credentials)
    aws_secret.write_secret(secret_data=b"Hello, Prefect!")
    secret_data = aws_secret.read_secret()
    aws_secret.delete_secret()
    return secret_data


if __name__ == "__main__":
    secrets_manager_flow()
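Secrets often hold several values at once, such as a username and password. One possible approach, sketched below, is to serialize a dictionary to JSON before writing it and parse it again after reading. `encode_secret` and `decode_secret` are hypothetical helpers, not part of prefect-aws. The sketch accepts both `str` and `bytes` on the assumption that the stored secret may come back in either form.

```python
import json
from typing import Any, Dict, Union


def encode_secret(values: Dict[str, Any]) -> bytes:
    """Serialize a dictionary to UTF-8 JSON bytes suitable for write_secret."""
    return json.dumps(values).encode("utf-8")


def decode_secret(secret_data: Union[str, bytes]) -> Dict[str, Any]:
    """Parse secret data read back from the block into a dictionary."""
    if isinstance(secret_data, bytes):
        secret_data = secret_data.decode("utf-8")
    return json.loads(secret_data)
```

Inside the flow above, this would look like `aws_secret.write_secret(secret_data=encode_secret({"user": "svc", "password": "PLACEHOLDER"}))` followed by `decode_secret(aws_secret.read_secret())`.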

Invoke lambdas

from prefect_aws.lambda_function import LambdaFunction
from prefect_aws.credentials import AwsCredentials

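# With no arguments, the credentials are resolved by Boto3 at runtime
credentials = AwsCredentials()
lambda_function = LambdaFunction(
    function_name="test_lambda_function",
    aws_credentials=credentials,
)
# "RequestResponse" invokes the function synchronously and waits for the result
response = lambda_function.invoke(
    payload={"foo": "bar"},
    invocation_type="RequestResponse",
)
# The payload is a streaming body; read() returns the raw bytes
response["Payload"].read()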
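The raw payload is bytes, so a caller usually wants to decode it and check whether the function itself failed. The sketch below is one way to do that. `decode_lambda_response` is a hypothetical helper that assumes the function returns JSON, and it relies on the `FunctionError` key that Boto3 includes in the invoke response when the function raises. The in-memory response at the end is a stand-in so the helper can be tried without AWS access.

```python
import io
import json
from typing import Any, Dict


def decode_lambda_response(response: Dict[str, Any]) -> Any:
    """Decode the JSON payload of a Lambda invoke response."""
    raw = response["Payload"].read()
    # Asynchronous invocations return an empty payload.
    body = json.loads(raw) if raw else None
    if "FunctionError" in response:
        raise RuntimeError(f"Lambda reported an error: {body}")
    return body


# Stand-in for a real response, used only for demonstration.
fake_response = {"StatusCode": 200, "Payload": io.BytesIO(b'{"foo": "bar"}')}
print(decode_lambda_response(fake_response))  # prints {'foo': 'bar'}
```

With a real invocation, the call would be `decode_lambda_response(response)` in place of the final `read()` above.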

Submit AWS Glue jobs

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.glue_job import GlueJobBlock


@flow
def example_run_glue_job():
    aws_credentials = AwsCredentials(
        aws_access_key_id="your_access_key_id",
        aws_secret_access_key="your_secret_access_key"
    )
    glue_job_run = GlueJobBlock(
        job_name="your_glue_job_name",
        arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
        aws_credentials=aws_credentials,
    ).trigger()

    return glue_job_run.wait_for_completion()


if __name__ == "__main__":
    example_run_glue_job()

Submit AWS Batch jobs

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.batch import batch_submit


@flow
def example_batch_submit_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    job_id = batch_submit(
        "job_name",
        "job_queue",
        "job_definition",
        aws_credentials
    )
    return job_id


if __name__ == "__main__":
    example_batch_submit_flow()
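The flow above returns as soon as the job is submitted. To wait for the outcome, one option is to poll the job status with a Boto3 Batch client, as in the minimal sketch below. `wait_for_batch_job` is a hypothetical helper, not part of prefect-aws, and the polling interval and timeout defaults are arbitrary assumptions.

```python
import time
from typing import Any

# Batch jobs stop changing state once they reach one of these statuses.
TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


def wait_for_batch_job(
    batch_client: Any,
    job_id: str,
    poll_seconds: float = 10.0,
    timeout_seconds: float = 3600.0,
) -> str:
    """Poll describe_jobs until the job succeeds or fails, then return its status."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        jobs = batch_client.describe_jobs(jobs=[job_id])["jobs"]
        if not jobs:
            raise LookupError(f"Batch job {job_id} was not found")
        status = jobs[0]["status"]
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Batch job {job_id} is still {status}")
        time.sleep(poll_seconds)
```

A client for this helper could be obtained with `aws_credentials.get_boto3_session().client("batch")`, assuming the credentials block exposes `get_boto3_session`.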
