prefect-gcp lets you use Google Cloud Platform (GCP) services in your workflows. For example, you can run flows on Vertex AI or Cloud Run, read and write data in BigQuery and Cloud Storage, and retrieve secrets from Secret Manager.

Getting started

Prerequisites

  • A GCP account and the necessary permissions to access desired services.

Install prefect-gcp

Install prefect-gcp as an extra of prefect. If you don’t already have prefect installed, it will install the newest version of prefect as well.

pip install "prefect[gcp]"

If using BigQuery, Cloud Storage, Secret Manager, or Vertex AI, see the additional installation options under "Install extras".

Install extras

To install prefect-gcp with all additional capabilities, run the install command above and then run the following command:

pip install "prefect-gcp[all_extras]"

Or, install extras individually:

pip install "prefect-gcp[cloud_storage]"
pip install "prefect-gcp[bigquery]"
pip install "prefect-gcp[secret_manager]"
pip install "prefect-gcp[aiplatform]"

Register newly installed block types

Register the block types in the module to make them available for use.

prefect block register -m prefect_gcp

Blocks setup

Credentials

Authenticate with a service account to use prefect-gcp services.

  1. Refer to the GCP service account documentation to create and download a service account key file.
  2. Copy the JSON contents.
  3. Use the Python code below, replacing the placeholders with your information.
from prefect_gcp import GcpCredentials

# replace this PLACEHOLDER dict with your own service account info
service_account_info = {
  "type": "service_account",
  "project_id": "PROJECT_ID",
  "private_key_id": "KEY_ID",
  "private_key": "-----BEGIN PRIVATE KEY-----\nPRIVATE_KEY\n-----END PRIVATE KEY-----\n",
  "client_email": "SERVICE_ACCOUNT_EMAIL",
  "client_id": "CLIENT_ID",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://accounts.google.com/o/oauth2/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/SERVICE_ACCOUNT_EMAIL"
}

GcpCredentials(
    service_account_info=service_account_info
).save("CREDENTIALS-BLOCK-NAME")

This credential block can be used to create other prefect_gcp blocks.
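
Steps 1 and 2 above can also be done programmatically instead of by copy and paste. The following is a minimal sketch, not part of prefect-gcp: `load_service_account_info` and `REQUIRED_FIELDS` are hypothetical names, and the list of required fields is an assumption based on the placeholder dict above. It reads a downloaded key file and checks that it looks like a service account key.

```python
import json
from pathlib import Path

# Assumption for this sketch: the minimum set of fields a key file should
# carry, taken from the placeholder dict shown earlier.
REQUIRED_FIELDS = ("type", "project_id", "private_key", "client_email")


def load_service_account_info(key_file: Path) -> dict:
    """Read a service account key file and return its contents as a dict."""
    info = json.loads(Path(key_file).read_text())
    missing = [field for field in REQUIRED_FIELDS if field not in info]
    if missing:
        raise ValueError(f"key file is missing fields: {', '.join(missing)}")
    if info["type"] != "service_account":
        raise ValueError(f"expected type 'service_account', got {info['type']!r}")
    return info
```

The returned dict can be passed as service_account_info to GcpCredentials in place of the hand-written placeholder dict.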

service_account_info vs service_account_file

The advantage of using service_account_info, instead of service_account_file, is that it is accessible across containers.

If service_account_file is used, the provided path must be available in the container executing the flow.

BigQuery

Read data from and write to Google BigQuery within your Prefect flows.

Be sure to install prefect-gcp with the BigQuery extra.

from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import BigQueryWarehouse

gcp_credentials = GcpCredentials.load("CREDENTIALS-BLOCK-NAME")

bigquery_block = BigQueryWarehouse(
    gcp_credentials=gcp_credentials,
    fetch_size=1,  # Optional: default number of rows to fetch when calling fetch_many
)
bigquery_block.save("BIGQUERY-BLOCK-NAME")

Secret Manager

Manage secrets in Google Cloud Platform’s Secret Manager.

from prefect_gcp import GcpCredentials, GcpSecret

gcp_credentials = GcpCredentials.load("CREDENTIALS-BLOCK-NAME")

gcp_secret = GcpSecret(
    secret_name="your-secret-name",
    secret_version="latest",
    gcp_credentials=gcp_credentials,
)

gcp_secret.save("SECRET-BLOCK-NAME")

Cloud Storage

Create a block to interact with a GCS bucket.

from prefect_gcp import GcpCredentials, GcsBucket

gcs_bucket = GcsBucket(
    bucket="BUCKET-NAME",
    gcp_credentials=GcpCredentials.load("CREDENTIALS-BLOCK-NAME")
)
gcs_bucket.save("GCS-BLOCK-NAME")

Run flows on Google Cloud Run or Vertex AI

Run flows on Google Cloud Run or Vertex AI to dynamically scale your infrastructure.

Prefect Cloud offers Google Cloud Run push work pools. Push work pools submit runs directly to Google Cloud Run, instead of requiring a worker to actively poll for flow runs to execute.

See the Google Cloud Run Worker Guide for a walkthrough of using Google Cloud Run in a hybrid work pool.

Examples

Interact with BigQuery

This code creates a new dataset in BigQuery, defines a table, inserts rows, and fetches data from the table:

from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import BigQueryWarehouse

@flow
def bigquery_flow():
    all_rows = []
    gcp_credentials = GcpCredentials.load("CREDENTIALS-BLOCK-NAME")

    client = gcp_credentials.get_bigquery_client()
    client.create_dataset("test_example", exists_ok=True)

    with BigQueryWarehouse(gcp_credentials=gcp_credentials) as warehouse:
        warehouse.execute(
            "CREATE TABLE IF NOT EXISTS test_example.customers (name STRING, address STRING);"
        )
        warehouse.execute_many(
            "INSERT INTO test_example.customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Marvin", "address": "Highway 42"},
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = warehouse.fetch_many("SELECT * FROM test_example.customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)
    return all_rows


if __name__ == "__main__":
    bigquery_flow()
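
The fetch loop in the flow above follows a common database-cursor pattern: execute a query once, then request fixed-size batches until an empty batch comes back. The sketch below illustrates that pattern with Python's built-in sqlite3 module as a local stand-in, so it runs without a GCP project. It is not BigQuery code: `fetch_in_batches` and `demo` are hypothetical names, and sqlite3 uses `:name` placeholders where the BigQuery example uses `%(name)s`.

```python
import sqlite3


def fetch_in_batches(cursor: sqlite3.Cursor, batch_size: int) -> list:
    """Drain an already-executed cursor, batch_size rows at a time."""
    all_rows = []
    while True:
        new_rows = cursor.fetchmany(batch_size)
        if not new_rows:  # an empty batch means the result set is exhausted
            break
        all_rows.extend(new_rows)
    return all_rows


def demo() -> list:
    connection = sqlite3.connect(":memory:")
    try:
        cursor = connection.cursor()
        cursor.execute("CREATE TABLE customers (name TEXT, address TEXT)")
        # One statement, many parameter sets, like execute_many above
        cursor.executemany(
            "INSERT INTO customers (name, address) VALUES (:name, :address)",
            [
                {"name": "Marvin", "address": "Highway 42"},
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )
        # Execute the query once, then pull the rows two at a time
        cursor.execute("SELECT name, address FROM customers ORDER BY name")
        return fetch_in_batches(cursor, batch_size=2)
    finally:
        connection.close()
```

Calling demo() returns all three rows even though each fetch asks for only two, because the loop keeps requesting batches until one comes back empty.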

Use Prefect with Google Cloud Storage

Interact with Google Cloud Storage.

The code below uses prefect_gcp to upload a file to a Google Cloud Storage bucket and download the same file under a different filename.

from pathlib import Path
from prefect import flow
from prefect_gcp import GcpCredentials, GcsBucket


@flow
def cloud_storage_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    gcp_credentials = GcpCredentials.load("CREDENTIALS-BLOCK-NAME")
    gcs_bucket = GcsBucket(
        bucket="BUCKET-NAME",
        gcp_credentials=gcp_credentials
    )

    gcs_bucket_path = gcs_bucket.upload_from_path(file_path)
    downloaded_file_path = gcs_bucket.download_object_to_path(
        gcs_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()


if __name__ == "__main__":
    cloud_storage_flow()

Upload and download directories

GcsBucket supports uploading and downloading entire directories.
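
A minimal sketch of a directory round trip follows, assuming the `upload_from_folder` and `download_folder_to_path` methods of GcsBucket; check the SDK documentation for their full signatures. The helper name `round_trip_folder` and its parameters are hypothetical. The bucket is passed in as an argument so the helper does not depend on how the block was created or loaded.

```python
from pathlib import Path


def round_trip_folder(bucket, local_dir: Path, remote_folder: str, download_dir: Path) -> Path:
    """Upload local_dir to remote_folder in the bucket, then download it again.

    `bucket` is expected to be a GcsBucket block, for example the result of
    GcsBucket.load("GCS-BLOCK-NAME").
    """
    # Upload the files found under local_dir into remote_folder
    bucket.upload_from_folder(from_folder=local_dir, to_folder=remote_folder)
    # Download the objects stored under remote_folder into download_dir
    bucket.download_folder_to_path(from_folder=remote_folder, to_folder=download_dir)
    return Path(download_dir)
```

Inside a flow, this could be called as round_trip_folder(gcs_bucket, Path("data"), "backup", Path("restored")), where the folder names are placeholders.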

Save secrets with Google Secret Manager

Read and write secrets with Google Secret Manager.

Be sure to install prefect-gcp with the Secret Manager extra.

The code below writes a secret to Secret Manager, reads the secret data back, and deletes the secret.

from prefect import flow
from prefect_gcp import GcpCredentials, GcpSecret


@flow
def secret_manager_flow():
    gcp_credentials = GcpCredentials.load("CREDENTIALS-BLOCK-NAME")
    gcp_secret = GcpSecret(secret_name="test-example", gcp_credentials=gcp_credentials)
    gcp_secret.write_secret(secret_data=b"Hello, Prefect!")
    secret_data = gcp_secret.read_secret()
    gcp_secret.delete_secret()
    return secret_data


if __name__ == "__main__":
    secret_manager_flow()
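
The flow above passes bytes to write_secret and returns the bytes that read_secret gives back. If your secret is text, a small wrapper can handle the conversion. This is a sketch under that assumption: `store_and_read_text` is a hypothetical helper, not part of prefect-gcp, and it takes the secret block as an argument.

```python
def store_and_read_text(secret, value: str, encoding: str = "utf-8") -> str:
    """Write a text value to the secret and read it back as text.

    `secret` is expected to be a GcpSecret block. The flow above passes
    bytes to write_secret and gets bytes from read_secret, so this helper
    encodes on the way in and decodes on the way out.
    """
    secret.write_secret(secret_data=value.encode(encoding))
    return secret.read_secret().decode(encoding)
```

For example, store_and_read_text(gcp_secret, "Hello, Prefect!") would return the string rather than a bytes object.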

Resources

For assistance using GCP, consult the Google Cloud documentation.

You can also authenticate with GCP without storing credentials in a block. See Access third-party secrets for an example of this approach that uses AWS Secrets Manager and Snowflake.

Refer to the prefect-gcp SDK documentation to explore all of the capabilities of the prefect-gcp library.