
prefect_aws.workers.ecs_worker

Prefect worker for executing flow runs as ECS tasks. Get started by creating a work pool:
$ prefect work-pool create --type ecs my-ecs-pool
Then, you can start a worker for the pool:
$ prefect worker start --pool my-ecs-pool
It’s common to deploy the worker as an ECS task as well. However, you can run the worker locally to get started.

The worker may work without any additional configuration, but this depends on your specific AWS setup; we recommend opening the work pool editor in the UI to see the available options.

By default, the worker will register a task definition for each flow run and run a task in your default ECS cluster using AWS Fargate. Fargate requires tasks to configure subnets, which we will infer from your default VPC. If you do not have a default VPC, you must provide a VPC ID or manually set up the network configuration for your tasks.

Note that the worker caches task definitions for each deployment to avoid excessive registration. The worker will check that the cached task definition is compatible with your configuration before using it.

The launch type option can be used to run your tasks in different modes. For example, FARGATE_SPOT runs your Fargate tasks on Fargate Spot capacity, and EC2 runs your tasks on a cluster backed by EC2 instances.

It is generally useful to enable CloudWatch logging for your ECS tasks, since the logs help you debug task failures. To enable CloudWatch logging, you must provide an execution role ARN with permissions to create and write to log streams. See the configure_cloudwatch_logs field documentation for details.

The worker can be configured to use an existing task definition by setting the task definition ARN variable or by providing a “taskDefinition” in the task run request. When a task definition is provided, the worker will never create a new task definition, which may result in variables that are templated into the task definition payload being ignored.
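The per-deployment caching described above can be pictured as a small in-memory cache. This is a minimal sketch, not the worker's implementation: the `TaskDefinitionCache` class and the `register` callback are hypothetical, and "compatible" is simplified here to "equal to the requested definition".

```python
import copy
from typing import Callable, Dict, Tuple

# Hypothetical callback that registers a task definition (for example via
# boto3's ecs.register_task_definition) and returns the new ARN.
RegisterFn = Callable[[dict], str]


class TaskDefinitionCache:
    """Illustrative per-deployment cache of registered task definitions."""

    def __init__(self, register: RegisterFn) -> None:
        self._register = register
        # deployment ID -> (task definition ARN, definition that was registered)
        self._entries: Dict[str, Tuple[str, dict]] = {}

    def get_or_register(self, deployment_id: str, requested: dict) -> str:
        cached = self._entries.get(deployment_id)
        # Reuse the cached ARN only when it is still compatible with the
        # requested definition; this sketch treats "compatible" as "equal".
        if cached is not None and cached[1] == requested:
            return cached[0]
        arn = self._register(requested)
        # Store a copy so later mutation of `requested` cannot fake a match.
        self._entries[deployment_id] = (arn, copy.deepcopy(requested))
        return arn
```

With a stub `register` that counts calls, two identical requests for the same deployment register once, while a changed `cpu` value triggers a second registration.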

Functions

parse_identifier

parse_identifier(identifier: str) -> ECSIdentifier
Splits an identifier into its cluster and task components. For example, the input “cluster_name::task_arn” yields (“cluster_name”, “task_arn”).
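A minimal sketch of this split, assuming the identifier is a (cluster, task ARN) pair. The names `ECSIdentifierSketch` and `parse_identifier_sketch`, and the field names `cluster` and `task_arn`, are illustrative stand-ins rather than the library's definitions. Splitting on the first `::` only is safe because ECS task ARNs contain single colons, never a double one.

```python
from typing import NamedTuple


class ECSIdentifierSketch(NamedTuple):
    """Illustrative stand-in for ECSIdentifier: a (cluster, task ARN) pair."""

    cluster: str
    task_arn: str


def parse_identifier_sketch(identifier: str) -> ECSIdentifierSketch:
    # Split on the first "::" only; task ARNs such as
    # arn:aws:ecs:region:account:task/... keep their single colons intact.
    cluster, sep, task_arn = identifier.partition("::")
    if not sep:
        raise ValueError(f"Expected 'cluster::task_arn', got {identifier!r}")
    return ECSIdentifierSketch(cluster, task_arn)
```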

mask_sensitive_env_values

mask_sensitive_env_values(task_run_request: dict, values: List[str], keep_length = 3, replace_with = '***')

mask_api_key

mask_api_key(task_run_request)
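Neither masking helper is documented above. Judging only from their names and signatures, they appear to hide environment variable values such as an API key, keeping a short prefix. The sketch below illustrates that idea under stated assumptions and is not the library's code: it assumes the request follows the ECS RunTask shape, where per-container environment overrides live under `overrides.containerOverrides[].environment` as `name`/`value` pairs, and both helper names are hypothetical.

```python
import copy
from typing import List


def mask_env_values_sketch(
    task_run_request: dict,
    names: List[str],
    keep_length: int = 3,
    replace_with: str = "***",
) -> dict:
    """Return a copy of the request with the named env var values masked."""
    # Work on a deep copy so the request actually sent to AWS is untouched.
    masked = copy.deepcopy(task_run_request)
    overrides = masked.get("overrides", {})
    for container in overrides.get("containerOverrides", []):
        for env_var in container.get("environment", []):
            value = env_var.get("value")
            if env_var.get("name") in names and isinstance(value, str):
                # Keep a short prefix so the value stays recognizable in logs.
                env_var["value"] = value[:keep_length] + replace_with
    return masked


def mask_api_key_sketch(task_run_request: dict) -> dict:
    # PREFECT_API_KEY is the environment variable Prefect reads its API key from.
    return mask_env_values_sketch(task_run_request, ["PREFECT_API_KEY"])
```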

Classes

ECSIdentifier

The identifier for a running ECS task.

CapacityProvider

The capacity provider strategy to use when running the task.
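In the ECS RunTask API, `launchType` accepts `EC2`, `FARGATE`, or `EXTERNAL`; Fargate Spot is requested through a capacity provider strategy instead, and a request that specifies a strategy must omit `launchType`. The sketch below shows one way a launch type option could be mapped onto a RunTask request. The function name and the weight of 1 are illustrative assumptions, not the worker's code.

```python
def apply_launch_type_sketch(task_run_request: dict, launch_type: str) -> dict:
    """Return a copy of a RunTask request with launch settings applied."""
    request = dict(task_run_request)
    if launch_type == "FARGATE_SPOT":
        # FARGATE_SPOT is a capacity provider, not a valid RunTask launchType,
        # so express it as a capacity provider strategy and drop launchType.
        request.pop("launchType", None)
        request["capacityProviderStrategy"] = [
            {"capacityProvider": "FARGATE_SPOT", "weight": 1}
        ]
    else:
        # EC2, FARGATE and EXTERNAL are passed straight through as launchType.
        request.pop("capacityProviderStrategy", None)
        request["launchType"] = launch_type
    return request
```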

ECSJobConfiguration

Job configuration for an ECS worker.

Methods:

at_least_one_container_is_essential

at_least_one_container_is_essential(self) -> Self
Ensures that at least one container will be marked as essential in the task definition.
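ECS requires every task definition to have at least one essential container, and a container definition that omits the `essential` key is treated as essential. A minimal sketch of such a check, with hypothetical function names; it is not the validator's actual code.

```python
from typing import List


def has_essential_container_sketch(container_definitions: List[dict]) -> bool:
    """True if ECS would treat at least one container as essential."""
    # A container definition without an "essential" key is essential by default.
    return any(c.get("essential", True) for c in container_definitions)


def check_essential_sketch(task_definition: dict) -> None:
    containers = task_definition.get("containerDefinitions", [])
    # Only non-empty container lists are checked in this sketch.
    if containers and not has_essential_container_sketch(containers):
        raise ValueError("At least one container must be marked as essential")
```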

cloudwatch_logs_options_requires_configure_cloudwatch_logs

cloudwatch_logs_options_requires_configure_cloudwatch_logs(self) -> Self
Enforces that configure_cloudwatch_logs is enabled when cloudwatch_logs_options are provided.

configure_cloudwatch_logs_requires_execution_role_arn

configure_cloudwatch_logs_requires_execution_role_arn(self) -> Self
Enforces that an execution role arn is provided (or could be provided by a runtime task definition) when configuring logging.
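Both CloudWatch rules can be expressed as cross-field checks that run when the configuration is built. In this sketch a plain dataclass stands in for the real configuration model; `LoggingConfigSketch` is hypothetical, while the field names mirror the ones referenced above and `executionRoleArn` is the ECS task definition key that can supply the role at runtime.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class LoggingConfigSketch:
    """Hypothetical subset of the job configuration, validated on creation."""

    configure_cloudwatch_logs: bool = False
    cloudwatch_logs_options: Dict[str, str] = field(default_factory=dict)
    execution_role_arn: Optional[str] = None
    # A task definition supplied at runtime may carry its own executionRoleArn.
    task_definition: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Logging options are meaningless unless logging is switched on.
        if self.cloudwatch_logs_options and not self.configure_cloudwatch_logs:
            raise ValueError(
                "configure_cloudwatch_logs must be enabled to use cloudwatch_logs_options"
            )
        # Writing to CloudWatch needs an execution role from somewhere.
        if self.configure_cloudwatch_logs and not (
            self.execution_role_arn or self.task_definition.get("executionRoleArn")
        ):
            raise ValueError("An execution role ARN is required for CloudWatch logging")
```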

container_name_default_from_task_definition

container_name_default_from_task_definition(self) -> Self
Infers the container name from the task definition if not provided.
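A minimal sketch of this inference, assuming (as an illustration only) that the first container definition is the one to use when no name is configured. The function name is hypothetical and the real method may choose differently.

```python
from typing import Optional


def infer_container_name_sketch(
    container_name: Optional[str], task_definition: Optional[dict]
) -> Optional[str]:
    """Return the configured name, else a name taken from the task definition."""
    if container_name:
        return container_name
    containers = (task_definition or {}).get("containerDefinitions", [])
    # Assumption: the first container definition is the flow run container.
    if containers and containers[0].get("name"):
        return containers[0]["name"]
    return None
```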

json_template

json_template(cls) -> dict[str, Any]
Returns a dict with job configuration fields as keys and the corresponding templates as values. By default, the job configuration parameter name is used as the template variable name. For example:
{
    "key1": "{{ key1 }}",      # default variable template
    "key2": "{{ template2 }}", # `template2` specifically provided as the template
}
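The defaulting rule in the example above can be reproduced with standard-library dataclasses. In this sketch, dataclass field metadata stands in for however the real class marks a custom template; `ExampleJobConfiguration` and `json_template_sketch` are hypothetical names.

```python
from dataclasses import dataclass, field, fields
from typing import Dict


@dataclass
class ExampleJobConfiguration:
    """Hypothetical job configuration with two fields."""

    key1: str = ""
    # Field metadata names a different template variable for this field.
    key2: str = field(default="", metadata={"template": "{{ template2 }}"})


def json_template_sketch(cls: type) -> Dict[str, str]:
    # Default each field to a template variable with the same name.
    return {
        f.name: f.metadata.get("template", "{{ " + f.name + " }}")
        for f in fields(cls)
    }


print(json_template_sketch(ExampleJobConfiguration))
# {'key1': '{{ key1 }}', 'key2': '{{ template2 }}'}
```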

network_configuration_requires_vpc_id

network_configuration_requires_vpc_id(self) -> Self
Enforces that a vpc_id is provided when the custom network configuration mode is enabled for network settings.
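The sketch below pairs this rule with the subnet lookup it supports: it builds an ECS `awsvpcConfiguration` from a response shaped like EC2 `describe_subnets` output. The function name, the `custom_network_configuration` flag, and `assignPublicIp="ENABLED"` are illustrative assumptions. Without a `vpc_id`, the sketch uses every subnet in the response, so the caller is assumed to have fetched only default-VPC subnets.

```python
from typing import List, Optional


def build_network_configuration_sketch(
    subnets_response: dict,
    vpc_id: Optional[str],
    custom_network_configuration: bool,
    security_groups: Optional[List[str]] = None,
) -> dict:
    """Build an ECS networkConfiguration from describe_subnets-style output."""
    # The rule described above: custom mode needs an explicit VPC.
    if custom_network_configuration and not vpc_id:
        raise ValueError("vpc_id is required when custom network configuration is enabled")
    subnet_ids = [
        s["SubnetId"]
        for s in subnets_response.get("Subnets", [])
        if vpc_id is None or s.get("VpcId") == vpc_id
    ]
    if not subnet_ids:
        raise ValueError(f"No subnets found for VPC {vpc_id!r}")
    # Assumption: tasks run in public subnets and need a public IP.
    config = {"subnets": subnet_ids, "assignPublicIp": "ENABLED"}
    if security_groups:
        config["securityGroups"] = security_groups
    return {"awsvpcConfiguration": config}
```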

prepare_for_flow_run

prepare_for_flow_run(self, flow_run: 'FlowRun', deployment: 'DeploymentResponse | None' = None, flow: 'APIFlow | None' = None, work_pool: 'WorkPool | None' = None, worker_name: str | None = None, worker_id: 'UUID | None' = None) -> None

task_run_request_requires_arn_if_no_task_definition_given

task_run_request_requires_arn_if_no_task_definition_given(self) -> Self
If no task definition is provided, a task definition ARN must be present on the task run request.
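A sketch of how the two task definition sources could be resolved, following the module description: an ARN already present as `taskDefinition` on the request is used as-is, a templated definition is otherwise registered, and having neither is the case this validator rejects. The function name and the `register` callback are hypothetical.

```python
from typing import Callable, Optional


def resolve_task_definition_arn_sketch(
    task_definition: Optional[dict],
    task_run_request: dict,
    register: Callable[[dict], str],
) -> str:
    """Pick the task definition ARN that a RunTask call should use."""
    existing = task_run_request.get("taskDefinition")
    if existing:
        # An existing ARN is used as-is; nothing new is registered.
        return existing
    if task_definition:
        # Otherwise register the templated definition and run that.
        return register(task_definition)
    # Neither source is available: the validator rejects this configuration.
    raise ValueError(
        "A task definition or a 'taskDefinition' ARN on the task run request is required"
    )
```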

ECSVariables

Variables for templating an ECS job.

ECSWorkerResult

The result of an ECS job.

ECSWorker

A Prefect worker to run flow runs as ECS tasks.

Methods:

kill_infrastructure

kill_infrastructure(self, infrastructure_pid: str, configuration: ECSJobConfiguration, grace_seconds: int = 30) -> None
Stop an ECS task.
Args:
  • infrastructure_pid: The infrastructure identifier in format “cluster::task_arn”.
  • configuration: The job configuration used to connect to AWS.
  • grace_seconds: Not used for ECS (ECS handles graceful shutdown internally).
Raises:
  • InfrastructureNotFound: If the task doesn’t exist.
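A minimal sketch of stopping a task from its `cluster::task_arn` identifier, assuming a boto3-style ECS client (for example `boto3.client("ecs")`) is passed in. `describe_tasks` reports unknown tasks in its `failures` list rather than raising, so the sketch checks for an empty `tasks` list. `TaskNotFoundSketch` is a hypothetical stand-in for `InfrastructureNotFound`, and the stop reason string is illustrative. ECS's own StopTask sends SIGTERM, waits for the container stop timeout, then sends SIGKILL, which is why a worker-side grace period adds nothing.

```python
from typing import Any


class TaskNotFoundSketch(Exception):
    """Hypothetical stand-in for InfrastructureNotFound."""


def stop_ecs_task_sketch(ecs_client: Any, infrastructure_pid: str) -> None:
    """Stop the task named by a 'cluster::task_arn' identifier."""
    cluster, sep, task_arn = infrastructure_pid.partition("::")
    if not sep:
        raise ValueError(f"Expected 'cluster::task_arn', got {infrastructure_pid!r}")
    # Unknown tasks come back under "failures", leaving "tasks" empty.
    response = ecs_client.describe_tasks(cluster=cluster, tasks=[task_arn])
    if not response.get("tasks"):
        raise TaskNotFoundSketch(f"Task {task_arn} not found in cluster {cluster}")
    # ECS handles SIGTERM, the stop timeout, and SIGKILL on its side.
    ecs_client.stop_task(cluster=cluster, task=task_arn, reason="Stopped by Prefect worker")
```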

run

run(self, flow_run: 'FlowRun', configuration: ECSJobConfiguration, task_status: Optional[anyio.abc.TaskStatus] = None) -> ECSWorkerResult
Runs a given flow run on the current worker.
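The summary above does not describe how `run` works. As a rough picture of the ECS calls such a method would plausibly involve, here is a minimal sketch assuming a boto3-style ECS client with `run_task` and `describe_tasks`. The function name, the polling loop, and returning the named container's exit code are illustrative choices, not the worker's actual behavior; boto3's `tasks_stopped` waiter could replace the manual loop.

```python
import time
from typing import Any, Optional


def run_and_wait_sketch(
    ecs_client: Any,
    task_run_request: dict,
    container_name: str,
    poll_interval: float = 5.0,
) -> Optional[int]:
    """Start a task, poll until it stops, and return a container's exit code."""
    response = ecs_client.run_task(**task_run_request)
    if response.get("failures") or not response.get("tasks"):
        raise RuntimeError(f"Failed to start task: {response.get('failures')}")
    task_arn = response["tasks"][0]["taskArn"]
    # RunTask falls back to the "default" cluster when none is given.
    cluster = task_run_request.get("cluster", "default")

    while True:
        task = ecs_client.describe_tasks(cluster=cluster, tasks=[task_arn])["tasks"][0]
        if task["lastStatus"] == "STOPPED":
            break
        time.sleep(poll_interval)

    # exitCode may be missing, for example if the container never started.
    for container in task.get("containers", []):
        if container.get("name") == container_name:
            return container.get("exitCode")
    return None
```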