The prefect-aws integration makes it easy to leverage the capabilities of AWS in your workflows. For example, you can retrieve secrets using AWS Secrets Manager, read and write objects with AWS S3, and deploy your flows on AWS ECS.

Getting started

Prerequisites

  • An AWS account and the necessary permissions to access desired services.

Installation

Install prefect-aws as a dependency of Prefect. If you don’t already have prefect installed, this command also installs the newest version of prefect.

pip install "prefect[aws]"

Upgrade to the latest versions of prefect and prefect-aws:

pip install -U "prefect[aws]"

Register newly installed block types

Register the block types in prefect-aws to make them available for use.

prefect block register -m prefect_aws

Blocks setup

Credentials

Most AWS services require an authenticated session. Prefect lets you provide credentials through an AwsCredentials block.

Steps:

  1. Refer to the AWS Configuration documentation to retrieve your access key ID and secret access key.
  2. Copy the access key ID and secret access key.
  3. Create an AwsCredentials block in the Prefect UI or use a Python script like the one below.

from prefect_aws import AwsCredentials


AwsCredentials(
    aws_access_key_id="PLACEHOLDER",
    aws_secret_access_key="PLACEHOLDER",
    aws_session_token=None,  # replace this with token if necessary
    region_name="us-east-2"
).save("BLOCK-NAME-PLACEHOLDER")

Prefect uses the Boto3 library under the hood. Prefect creates the Boto3 session from the values stored in the block. Any values not provided to the block are resolved at runtime, following the credential lookup order described in the Boto3 docs.
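
The fallback behavior can be pictured with a small sketch. It is not Prefect's or Boto3's implementation: `resolve_setting` and `ENV_FALLBACKS` are hypothetical names used for illustration, and the sketch models only the first fallback step (environment variables). The real Boto3 chain continues with further sources such as the shared credentials and config files.

```python
import os

# Environment variables that Boto3 reads for each setting.
ENV_FALLBACKS = {
    "aws_access_key_id": "AWS_ACCESS_KEY_ID",
    "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
    "aws_session_token": "AWS_SESSION_TOKEN",
    "region_name": "AWS_DEFAULT_REGION",
}


def resolve_setting(name, block_values, env=None):
    """Hypothetical helper: a value set on the block wins, otherwise
    fall back to the matching environment variable (or None)."""
    env = os.environ if env is None else env
    value = block_values.get(name)
    if value is not None:
        return value
    return env.get(ENV_FALLBACKS[name])


# Example: the block sets only the region, so the key comes from the environment.
example_env = {"AWS_ACCESS_KEY_ID": "key-from-env", "AWS_DEFAULT_REGION": "eu-west-1"}
block = {"aws_access_key_id": None, "region_name": "us-east-2"}
key = resolve_setting("aws_access_key_id", block, example_env)
region = resolve_setting("region_name", block, example_env)
```

In this sketch the block's `region_name` overrides the environment, while the missing access key is picked up from it.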

S3

Create a block for reading and writing files to S3.

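from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

# load the credentials block saved in the previous step
aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

S3Bucket(
    bucket_name="BUCKET-NAME-PLACEHOLDER",
    credentials=aws_credentials
).save("S3-BLOCK-NAME-PLACEHOLDER")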

Lambda

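Create a block to invoke AWS Lambda functions, synchronously or asynchronously.

from prefect_aws.credentials import AwsCredentials
from prefect_aws.lambda_function import LambdaFunction

# load the credentials block saved in the Credentials step
credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

LambdaFunction(
    function_name="test_lambda_function",
    aws_credentials=credentials,
).save("LAMBDA-BLOCK-NAME-PLACEHOLDER")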

Secrets Manager

Create a block to read, write, and delete AWS Secrets Manager secrets.

from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import AwsSecret

# load the credentials block saved in the Credentials step
credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

AwsSecret(
    secret_name="test_secret_name",
    aws_credentials=credentials,
).save("AWS-SECRET-BLOCK-NAME-PLACEHOLDER")

Run flows on AWS ECS

Run flows on AWS Elastic Container Service (ECS) to dynamically scale your infrastructure.

Prefect Cloud offers ECS push work pools. Push work pools submit runs directly to ECS, instead of requiring a worker to actively poll for flow runs to execute.

See the ECS work pool guide for a walkthrough of using ECS in a hybrid work pool.

Examples

Read and write files to AWS S3

Upload a file to an AWS S3 bucket and download the same file under a different filename. The following code assumes that the bucket already exists:

from pathlib import Path
from prefect import flow
from prefect_aws import AwsCredentials, S3Bucket


@flow
def s3_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    s3_bucket = S3Bucket(
        bucket_name="BUCKET-NAME-PLACEHOLDER",
        credentials=aws_credentials
    )

    s3_bucket_path = s3_bucket.upload_from_path(file_path)
    downloaded_file_path = s3_bucket.download_object_to_path(
        s3_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()


if __name__ == "__main__":
    s3_flow()

Access secrets with AWS Secrets Manager

Write a secret to AWS Secrets Manager, read the secret data, delete the secret, and return the secret data.

from prefect import flow
from prefect_aws import AwsCredentials, AwsSecret


@flow
def secrets_manager_flow():
    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    aws_secret = AwsSecret(secret_name="test-example", aws_credentials=aws_credentials)
    aws_secret.write_secret(secret_data=b"Hello, Prefect!")
    secret_data = aws_secret.read_secret()
    aws_secret.delete_secret()
    return secret_data


if __name__ == "__main__":
    secrets_manager_flow()
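
Because the flow above writes the secret as bytes, structured values need to be serialized first. The following is a minimal sketch of one way to do that with JSON. `encode_secret` and `decode_secret` are hypothetical helpers, not part of prefect-aws, and the sketch assumes the stored secret is UTF-8 encoded JSON.

```python
import json


def encode_secret(values):
    """Hypothetical helper: serialize a dict to UTF-8 JSON bytes,
    suitable for passing as secret_data."""
    return json.dumps(values).encode("utf-8")


def decode_secret(secret_data):
    """Hypothetical helper: parse secret data back into a dict.
    Accepts bytes or str, since a secret may be stored either way."""
    if isinstance(secret_data, bytes):
        secret_data = secret_data.decode("utf-8")
    return json.loads(secret_data)


# Round trip without touching AWS.
payload = encode_secret({"user": "prefect", "password": "PLACEHOLDER"})
restored = decode_secret(payload)
```

With helpers like these, the flow could pass `encode_secret(...)` to `write_secret` and apply `decode_secret` to the value returned by `read_secret`.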

Invoke lambdas

from prefect_aws.lambda_function import LambdaFunction
from prefect_aws.credentials import AwsCredentials

credentials = AwsCredentials()
lambda_function = LambdaFunction(
    function_name="test_lambda_function",
    aws_credentials=credentials,
)
response = lambda_function.invoke(
    payload={"foo": "bar"},
    invocation_type="RequestResponse",
)
# the payload is a streaming body; read it to get the function's raw response
print(response["Payload"].read())
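
The payload read in the example above is a byte stream. A minimal sketch of turning it into a Python object follows, assuming the Lambda function returns JSON. `decode_lambda_payload` is a hypothetical helper, and `fake_response` is a stand-in for a real response so that the sketch runs without AWS access.

```python
import io
import json


def decode_lambda_payload(response):
    """Hypothetical helper: read the streamed payload and parse it as JSON.
    Returns None when the payload is empty, as is expected for
    asynchronous ("Event") invocations."""
    raw = response["Payload"].read()
    return json.loads(raw) if raw else None


# Stand-in for the response of a synchronous ("RequestResponse") invocation.
fake_response = {"StatusCode": 200, "Payload": io.BytesIO(b'{"foo": "bar"}')}
result = decode_lambda_payload(fake_response)
```

The stream can be read only once, so keep the decoded result if you need it later.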

Submit AWS Glue jobs

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.glue_job import GlueJobBlock


@flow
def example_run_glue_job():
    aws_credentials = AwsCredentials(
        aws_access_key_id="your_access_key_id",
        aws_secret_access_key="your_secret_access_key"
    )
    glue_job_run = GlueJobBlock(
        job_name="your_glue_job_name",
        arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
        aws_credentials=aws_credentials,  # pass the credentials created above
    ).trigger()

    return glue_job_run.wait_for_completion()


if __name__ == "__main__":
    example_run_glue_job()

Submit AWS Batch jobs

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.batch import batch_submit


@flow
def example_batch_submit_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    job_id = batch_submit(
        "job_name",
        "job_queue",
        "job_definition",
        aws_credentials
    )
    return job_id


if __name__ == "__main__":
    example_batch_submit_flow()

Resources

For assistance using AWS, consult the AWS documentation and, in particular, the Boto3 documentation.

Refer to the prefect-aws SDK documentation linked in the sidebar to explore all the capabilities of the prefect-aws library.

Refer to the secrets documentation to see an example of using the AwsCredentials block with a third-party service like Snowflake. The example does not require you to save your AWS credentials as part of the Snowflake block.