prefect-gcp
prefect-gcp
helps you leverage the capabilities of Google Cloud Platform (GCP) in your workflows.
For example, you can run flow on Vertex AI or Cloud Run, read and write data to BigQuery and Cloud Storage, retrieve secrets with Secret Manager.
Getting started
Prerequisites
- A GCP account and the necessary permissions to access desired services.
Install prefect-gcp
The following command will install a version of prefect-gcp
compatible with your installed version of prefect
.
If you don’t already have prefect
installed, it will install the newest version of prefect
as well.
pip install "prefect[gcp]"
Upgrade to the latest versions of prefect
and prefect-gcp
:
pip install -U "prefect[gcp]"
If using BigQuery, Cloud Storage, Secret Manager, or Vertex AI, see additional installation options.
To install prefect-gcp
with all additional capabilities, run the install command above and then run the following command:
pip install "prefect-gcp[all_extras]"
Register newly installed block types
Register the block types in the module to make them available for use.
prefect block register -m prefect_gcp
Authenticate using a GCP Credentials block
Authenticate with a service account to use prefect-gcp
services.
- Refer to the GCP service account documentation to create and download a service account key file.
- Copy the JSON contents.
- Use the Python code below, replace the placeholders with your information.
from prefect_gcp import GcpCredentials
# replace this PLACEHOLDER dict with your own service account info
service_account_info = {
"type": "service_account",
"project_id": "PROJECT_ID",
"private_key_id": "KEY_ID",
"private_key": "-----BEGIN PRIVATE KEY-----\nPRIVATE_KEY\n-----END PRIVATE KEY-----\n",
"client_email": "SERVICE_ACCOUNT_EMAIL",
"client_id": "CLIENT_ID",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/SERVICE_ACCOUNT_EMAIL"
}
GcpCredentials(
service_account_info=service_account_info
).save("BLOCK-NAME-PLACEHOLDER")
service_account_info
vs service_account_file
The advantage of using service_account_info
, instead of service_account_file
, is that it is accessible across containers.
If service_account_file
is used, the provided file path must be available in the container executing the flow.
Alternatively, GCP can authenticate without storing credentials in a block. See the Third-pary Secrets docs for an analogous example that uses AWS Secrets Manager and Snowflake.
Run flows on Google Cloud Run or Vertex AI
Run flows on Google Cloud Run or Vertex AI to dynamically scale your infrastructure.
See the Google Cloud Run Worker Guide for a walkthrough of using Google Cloud Run to run workflows with a hybrid work pool.
If you’re using Prefect Cloud, Google Cloud Run push work pools provide all the benefits of Google Cloud Run along with a quick setup and no worker needed.
Use Prefect with Google BigQuery
Read data from and write to Google BigQuery within your Prefect flows.
Be sure to install prefect-gcp
with the BigQuery extra.
This code creates a new dataset in BigQuery, define a table, insert rows, and fetch data from the table:
from prefect import flow
from prefect_gcp.bigquery import GcpCredentials, BigQueryWarehouse
@flow
def bigquery_flow():
all_rows = []
gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
client = gcp_credentials.get_bigquery_client()
client.create_dataset("test_example", exists_ok=True)
with BigQueryWarehouse(gcp_credentials=gcp_credentials) as warehouse:
warehouse.execute(
"CREATE TABLE IF NOT EXISTS test_example.customers (name STRING, address STRING);"
)
warehouse.execute_many(
"INSERT INTO test_example.customers (name, address) VALUES (%(name)s, %(address)s);",
seq_of_parameters=[
{"name": "Marvin", "address": "Highway 42"},
{"name": "Ford", "address": "Highway 42"},
{"name": "Unknown", "address": "Highway 42"},
],
)
while True:
# Repeated fetch* calls using the same operation will
# skip re-executing and instead return the next set of results
new_rows = warehouse.fetch_many("SELECT * FROM test_example.customers", size=2)
if len(new_rows) == 0:
break
all_rows.extend(new_rows)
return all_rows
if __name__ == "__main__":
bigquery_flow()
Use Prefect with Google Cloud Storage
Interact with Google Cloud Storage.
Be sure to install prefect-gcp
with the Cloud Storage extra.
The code below uses prefect_gcp
to upload a file to a Google Cloud Storage bucket and download the same file under a different file name.
from pathlib import Path
from prefect import flow
from prefect_gcp import GcpCredentials, GcsBucket
@flow
def cloud_storage_flow():
# create a dummy file to upload
file_path = Path("test-example.txt")
file_path.write_text("Hello, Prefect!")
gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
gcs_bucket = GcsBucket(
bucket="BUCKET-NAME-PLACEHOLDER",
gcp_credentials=gcp_credentials
)
gcs_bucket_path = gcs_bucket.upload_from_path(file_path)
downloaded_file_path = gcs_bucket.download_object_to_path(
gcs_bucket_path, "downloaded-test-example.txt"
)
return downloaded_file_path.read_text()
if __name__ == "__main__":
cloud_storage_flow()
Upload and download directories
GcsBucket
supports uploading and downloading entire directories.
Save secrets with Google Secret Manager
Read and write secrets with Google Secret Manager.
Be sure to install prefect-gcp
with the Secret Manager extra.
The code below writes a secret to the Secret Manager, reads the secret data, and deletes the secret.
from prefect import flow
from prefect_gcp import GcpCredentials, GcpSecret
@flow
def secret_manager_flow():
gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
gcp_secret = GcpSecret(secret_name="test-example", gcp_credentials=gcp_credentials)
gcp_secret.write_secret(secret_data=b"Hello, Prefect!")
secret_data = gcp_secret.read_secret()
gcp_secret.delete_secret()
return secret_data
if __name__ == "__main__":
secret_manager_flow()
Access Google credentials or clients from GcpCredentials
You can instantiate a Google Cloud client, such as bigquery.Client
.
Note that a GcpCredentials
object is NOT a valid input to the underlying BigQuery client - use the get_credentials_from_service_account
method to access and pass a google.auth.Credentials
object.
import google.cloud.bigquery
from prefect import flow
from prefect_gcp import GcpCredentials
@flow
def create_bigquery_client():
gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
google_auth_credentials = gcp_credentials.get_credentials_from_service_account()
bigquery_client = bigquery.Client(credentials=google_auth_credentials)
To access the underlying client, use the get_client
method from GcpCredentials
.
from prefect import flow
from prefect_gcp import GcpCredentials
@flow
def create_bigquery_client():
gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
bigquery_client = gcp_credentials.get_client("bigquery")
Resources
For assistance using GCP, consult the Google Cloud documentation.
Refer to the prefect-gcp
SDK documentation linked in the sidebar to explore all the capabilities of the prefect-gcp
library.
Additional installation options
First install the main library compatible with your prefect
version:
pip install "prefect[gcp]"
Then install the additional capabilities you need.
To use Cloud Storage
pip install "prefect-gcp[cloud_storage]"
To use BigQuery
pip install "prefect-gcp[bigquery]"
To use Secret Manager
pip install "prefect-gcp[secret_manager]"
To use Vertex AI
pip install "prefect-gcp[aiplatform]"
Was this page helpful?