Skip to main content

prefect_dbt.cloud.utils

Utilities for common interactions with the dbt Cloud API

Functions

extract_user_message

extract_user_message(ex: HTTPStatusError) -> Optional[str]
Extracts user message from a error response from the dbt Cloud administrative API. Args:
  • ex: An HTTPStatusError raised by httpx
Returns:
  • user_message from dbt Cloud administrative API response or None if a
  • user_message cannot be extracted

extract_developer_message

extract_developer_message(ex: HTTPStatusError) -> Optional[str]
Extracts developer message from a error response from the dbt Cloud administrative API. Args:
  • ex: An HTTPStatusError raised by httpx
Returns:
  • developer_message from dbt Cloud administrative API response or None if a
  • developer_message cannot be extracted

call_dbt_cloud_administrative_api_endpoint

call_dbt_cloud_administrative_api_endpoint(dbt_cloud_credentials: DbtCloudCredentials, path: str, http_method: str, params: Optional[Dict[str, Any]] = None, json: Optional[Dict[str, Any]] = None) -> Any
Task that calls a specified endpoint in the dbt Cloud administrative API. Use this task if a prebuilt one is not yet available. Args:
  • dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
  • path: The partial path for the request (e.g. /projects/). Will be appended onto the base URL as determined by the client configuration.
  • http_method: HTTP method to call on the endpoint.
  • params: Query parameters to include in the request.
  • json: JSON serializable body to send in the request.
Returns:
  • The body of the response. If the body is JSON serializable, then the result of json.loads with the body as the input will be returned. Otherwise, the body will be returned directly.
Examples: List projects for an account:
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint

@flow
def get_projects_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    result = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=credentials,
        path="/projects/",
        http_method="GET",
    )
    return result["data"]

get_projects_flow()
Create a new job:
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint


@flow
def create_job_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    result = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=credentials,
        path="/jobs/",
        http_method="POST",
        json={
            "id": None,
            "account_id": 123456789,
            "project_id": 100,
            "environment_id": 10,
            "name": "Nightly run",
            "dbt_version": None,
            "triggers": {"github_webhook": True, "schedule": True},
            "execute_steps": ["dbt run", "dbt test", "dbt source snapshot-freshness"],
            "settings": {"threads": 4, "target_name": "prod"},
            "state": 1,
            "schedule": {
                "date": {"type": "every_day"},
                "time": {"type": "every_hour", "interval": 1},
            },
        },
    )
    return result["data"]

create_job_flow()

Classes

DbtCloudAdministrativeApiCallFailed

Raised when a call to dbt Cloud administrative API fails.