Skip to main content

prefect_gcp.workers.cloud_run

<!— # noqa —> Module containing the Cloud Run worker used for executing flow runs as Cloud Run jobs. Get started by creating a Cloud Run work pool:
prefect work-pool create 'my-cloud-run-pool' --type cloud-run
Then start a Cloud Run worker with the following command:
prefect worker start --pool 'my-cloud-run-pool'

Configuration

Read more about configuring work pools here.

Advanced Configuration

!!! example “Using a custom Cloud Run job template” Below is the default job body template used by the Cloud Run Worker:
{
    "apiVersion": "run.googleapis.com/v1",
    "kind": "Job",
    "metadata":
        {
            "name": "{{ name }}",
            "annotations":
            {
                "run.googleapis.com/launch-stage": "BETA",
            }
        },
        "spec":
        {
            "template":
            {
                "spec":
                {
                    "template":
                    {
                        "spec":
                        {
                            "containers":
                            [
                                {
                                    "image": "{{ image }}",
                                    "args": "{{ args }}",
                                    "resources":
                                    {
                                        "limits":
                                        {
                                            "cpu": "{{ cpu }}",
                                            "memory": "{{ memory }}"
                                        },
                                        "requests":
                                        {
                                            "cpu": "{{ cpu }}",
                                            "memory": "{{ memory }}"
                                        }
                                    }
                                }
                            ],
                            "timeoutSeconds": "{{ timeout }}",
                            "serviceAccountName": "{{ service_account_name }}"
                        }
                    }
                }
                }
            },
            "metadata":
            {
                "annotations":
                {
                    "run.googleapis.com/vpc-access-connector": "{{ vpc_connector_name }}"
                }
            }
        },
    },
    "keep_job": "{{ keep_job }}"
}
Each values enclosed in {{ }} is a placeholder that will be replaced with a value at runtime on a per-deployment basis. The values that can be used a placeholders are defined by the variables schema defined in the base job template. The default job body template and available variables can be customized on a work pool by work pool basis. By editing the default job body template you can:
  • Add additional placeholders to the default job template
  • Remove placeholders from the default job template
  • Pass values to Cloud Run that are not defined in the variables schema

Adding additional placeholders

For example, to allow for extra customization of a new annotation not described in the default job template, you can add the following:
{
    "apiVersion": "run.googleapis.com/v1",
    "kind": "Job",
    "metadata":
    {
        "name": "{{ name }}",
        "annotations":
        {
            "run.googleapis.com/my-custom-annotation": "{{ my_custom_annotation }}",
            "run.googleapis.com/launch-stage": "BETA",
        },
      ...
    },
  ...
}
my_custom_annotation can now be used as a placeholder in the job template and set on a per-deployment basis.
# prefect.yaml
deployments:
...
  - name: my-deployment
  ...
  work_pool: my-cloud-run-pool
    job_variables: {"my_custom_annotation": "my-custom-value"}
Additionally, fields can be set to prevent configuration at the deployment level. For example to configure the vpc_connector_name field, the placeholder can be removed and replaced with an actual value. Now all deployments that point to this work pool will use the same vpc_connector_name value.
{
    "apiVersion": "run.googleapis.com/v1",
    "kind": "Job",
    "spec":
    {
        "template":
        {
            "metadata":
            {
                "annotations":
                {
                    "run.googleapis.com/vpc-access-connector": "my-vpc-connector"
                }
            },
            ...
        },
        ...
    }
}

Classes

CloudRunWorkerJobConfiguration

Configuration class used by the Cloud Run Worker to create a Cloud Run Job. An instance of this class is passed to the Cloud Run worker’s run method for each flow run. It contains all information necessary to execute the flow run as a Cloud Run Job. Attributes:
  • region: The region where the Cloud Run Job resides.
  • credentials: The GCP Credentials used to connect to Cloud Run.
  • job_body: The job body used to create the Cloud Run Job.
  • timeout: Max allowed duration the job may be active before Cloud Run will actively try to mark it failed and kill associated containers (maximum of 3600 seconds, 1 hour).
  • keep_job: Whether to delete the Cloud Run Job after it completes.
  • prefect_api_key_secret: A GCP secret containing a Prefect API Key.
  • prefect_api_auth_string_secret: A GCP secret containing a Prefect API authorization string.
Methods:

job_name

job_name(self) -> str
property for accessing the name from the job metadata.

prepare_for_flow_run

prepare_for_flow_run(self, flow_run: 'FlowRun', deployment: Optional['DeploymentResponse'] = None, flow: Optional['Flow'] = None, work_pool: Optional['WorkPool'] = None, worker_name: Optional[str] = None, worker_id: Optional['UUID'] = None)
Prepares the job configuration for a flow run. Ensures that necessary values are present in the job body and that the job body is valid. Args:
  • flow_run: The flow run to prepare the job configuration for
  • deployment: The deployment associated with the flow run used for preparation.
  • flow: The flow associated with the flow run used for preparation.

project

project(self) -> str
property for accessing the project from the credentials.

CloudRunWorkerVariables

Default variables for the Cloud Run worker. The schema for this class is used to populate the variables section of the default base job template.

CloudRunWorkerResult

Contains information about the final state of a completed process

CloudRunWorker

Prefect worker that executes flow runs within Cloud Run Jobs. Methods:

kill_infrastructure

kill_infrastructure(self, infrastructure_pid: str, configuration: CloudRunWorkerJobConfiguration, grace_seconds: int = 30) -> None
Kill a Cloud Run Job by deleting it. Args:
  • infrastructure_pid: The job name.
  • configuration: The job configuration used to connect to GCP.
  • grace_seconds: Not used for Cloud Run (GCP handles graceful shutdown).
Raises:
  • InfrastructureNotFound: If the job doesn’t exist.

run

run(self, flow_run: 'FlowRun', configuration: CloudRunWorkerJobConfiguration, task_status: Optional[anyio.abc.TaskStatus] = None) -> CloudRunWorkerResult
Executes a flow run within a Cloud Run Job and waits for the flow run to complete. Args:
  • flow_run: The flow run to execute
  • configuration: The configuration to use when executing the flow run.
  • task_status: The task status object for the current flow run. If provided, the task will be marked as started.
Returns:
  • A result object containing information about the final state of the flow run