If you have an existing dbt Cloud job, use the pre-built flow run_dbt_cloud_job to trigger a job run and wait until it finishes. If some nodes fail, run_dbt_cloud_job can retry just the unsuccessful nodes; the targeted_retries argument sets how many times it does so. Before running this flow, save your dbt Cloud credentials to a DbtCloudCredentials block and create a DbtCloudJob block.
Before creating blocks for the first time, register prefect-dbt’s block types:
prefect block register -m prefect_dbt

Save dbt Cloud credentials to a block

Blocks can be created through code or through the UI. To create a dbt Cloud Credentials block:
  1. Log into your dbt Cloud account.
  2. Click API Tokens on the sidebar.
  3. Copy a Service Token.
  4. Copy the account ID from the URL: https://cloud.getdbt.com/settings/accounts/<ACCOUNT_ID>.
  5. Create and run the following script, replacing the placeholders:
from prefect_dbt.cloud import DbtCloudCredentials


DbtCloudCredentials(
    api_key="API-KEY-PLACEHOLDER",
    account_id="ACCOUNT-ID-PLACEHOLDER"
).save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
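Hard-coding a service token in a script makes it easy to leak. As a minimal sketch, the values could instead be read from environment variables before the block is saved. The variable names DBT_CLOUD_API_KEY and DBT_CLOUD_ACCOUNT_ID and both helper functions are hypothetical and chosen only for illustration; DbtCloudCredentials and save come from prefect-dbt and Prefect. The sketch also assumes the account ID is numeric, as it appears in the dbt Cloud URL.

```python
import os
from typing import Mapping


def read_dbt_cloud_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Collect dbt Cloud settings from environment variables.

    The variable names are assumptions made for this sketch.
    """
    required = ("DBT_CLOUD_API_KEY", "DBT_CLOUD_ACCOUNT_ID")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "api_key": env["DBT_CLOUD_API_KEY"],
        # Assumed numeric, matching the ID shown in the dbt Cloud URL.
        "account_id": int(env["DBT_CLOUD_ACCOUNT_ID"]),
    }


def save_credentials_block(block_name: str) -> None:
    """Save a DbtCloudCredentials block built from the environment."""
    # Imported here so the settings helper works without prefect-dbt installed.
    from prefect_dbt.cloud import DbtCloudCredentials

    settings = read_dbt_cloud_settings()
    # overwrite=True replaces an existing block that has the same name.
    DbtCloudCredentials(**settings).save(block_name, overwrite=True)
```

Calling save_credentials_block("CREDENTIALS-BLOCK-NAME-PLACEHOLDER") after exporting both variables would then create or update the block without the token appearing in source control.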

Create a dbt Cloud job block

  1. In dbt Cloud, click on Deploy -> Jobs.
  2. Select a job.
  3. Copy the job ID from the URL: https://cloud.getdbt.com/deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>
  4. Create and run the following script, replacing the placeholders:
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob


dbt_cloud_credentials = DbtCloudCredentials.load("CREDENTIALS-BLOCK-PLACEHOLDER")
dbt_cloud_job = DbtCloudJob(
    dbt_cloud_credentials=dbt_cloud_credentials,
    job_id="JOB-ID-PLACEHOLDER",
)
dbt_cloud_job.save("JOB-BLOCK-NAME-PLACEHOLDER")
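Copying IDs out of a URL by hand is error-prone, so the job URL from step 3 can also be parsed programmatically. The helper below is a hypothetical illustration, not part of prefect-dbt; it assumes the URL follows the shape shown in step 3 and that all three IDs are numeric.

```python
import re

# URL shape taken from step 3:
# https://cloud.getdbt.com/deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>
JOB_URL_PATTERN = re.compile(
    r"/deploy/(?P<account_id>\d+)/projects/(?P<project_id>\d+)/jobs/(?P<job_id>\d+)"
)


def parse_job_url(url: str) -> dict:
    """Return the account, project, and job IDs found in a dbt Cloud job URL."""
    match = JOB_URL_PATTERN.search(url)
    if match is None:
        raise ValueError(f"Not a recognized dbt Cloud job URL: {url}")
    # Convert each captured group from text to an integer.
    return {name: int(value) for name, value in match.groupdict().items()}
```

The job_id entry of the returned dictionary is the value to pass as job_id when constructing the DbtCloudJob block.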

Run a dbt Cloud job and wait for completion

import asyncio

from prefect import flow
from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job

@flow
async def run_dbt_job_flow():
    dbt_cloud_job = await DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER")
    return await run_dbt_cloud_job(
        dbt_cloud_job=dbt_cloud_job,
        # Number of times to retry the nodes that failed; 0 disables retries.
        targeted_retries=0,
    )


if __name__ == "__main__":
    asyncio.run(run_dbt_job_flow())

Create Prefect assets from a dbt Cloud job

Set create_assets=True to emit Prefect asset materializations for the dbt models, seeds, and snapshots that completed successfully in the dbt Cloud run. Assets are created from the run’s dbt artifacts, so the dbt Cloud job must make manifest.json and run_results.json available.
import asyncio

from prefect import flow
from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job

@flow
async def run_dbt_job_flow():
    dbt_cloud_job = await DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER")
    return await run_dbt_cloud_job(
        dbt_cloud_job=dbt_cloud_job,
        targeted_retries=0,
        # Requires manifest.json and run_results.json from the dbt Cloud run.
        create_assets=True,
    )


if __name__ == "__main__":
    asyncio.run(run_dbt_job_flow())
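To preview which nodes in a run would qualify as assets, a downloaded run_results.json can be inspected directly. The sketch below is illustrative only and is not how prefect-dbt builds assets internally; the function names are hypothetical. It relies on the dbt artifact layout, where each entry in results carries a unique_id prefixed with its resource type and a status of "success" for completed models, seeds, and snapshots.

```python
import json
from pathlib import Path

# Resource types named in the text above as eligible for asset materialization.
ASSET_RESOURCE_TYPES = {"model", "seed", "snapshot"}


def successful_asset_nodes(run_results: dict) -> list:
    """List unique_ids of models, seeds, and snapshots that succeeded."""
    nodes = []
    for result in run_results.get("results", []):
        unique_id = result.get("unique_id", "")
        # dbt unique_ids start with the resource type, e.g. "model.my_project.orders".
        resource_type = unique_id.split(".", 1)[0]
        if resource_type in ASSET_RESOURCE_TYPES and result.get("status") == "success":
            nodes.append(unique_id)
    return nodes


def load_run_results(path: str) -> dict:
    """Read a run_results.json file from disk."""
    return json.loads(Path(path).read_text())
```

Tests and failed or skipped nodes are filtered out, which mirrors the description above of assets being emitted only for nodes that completed successfully.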

See also