Skip to main content
If you have an existing dbt Cloud job, use the pre-built flow run_dbt_cloud_job to trigger a job run and wait until the job run is finished. If some nodes fail, run_dbt_cloud_job can efficiently retry the unsuccessful nodes. Prior to running this flow, save your dbt Cloud credentials to a DbtCloudCredentials block and create a DbtCloudJob block.
Before creating blocks for the first time, register prefect-dbt’s block types:
prefect block register -m prefect_dbt

Save dbt Cloud credentials to a block

Blocks can be created through code or through the UI. To create a dbt Cloud Credentials block:
  1. Log into your dbt Cloud account.
  2. Click API Tokens on the sidebar.
  3. Copy a Service Token.
  4. Copy the account ID from the URL: https://cloud.getdbt.com/settings/accounts/<ACCOUNT_ID>.
  5. Create and run the following script, replacing the placeholders:
from prefect_dbt.cloud import DbtCloudCredentials


DbtCloudCredentials(
    api_key="API-KEY-PLACEHOLDER",
    account_id="ACCOUNT-ID-PLACEHOLDER"
).save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

Create a dbt Cloud job block

  1. In dbt Cloud, click on Deploy -> Jobs.
  2. Select a job.
  3. Copy the job ID from the URL: https://cloud.getdbt.com/deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>
  4. Create and run the following script, replacing the placeholders.
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob


dbt_cloud_credentials = DbtCloudCredentials.load("CREDENTIALS-BLOCK-PLACEHOLDER")
dbt_cloud_job = DbtCloudJob(
    dbt_cloud_credentials=dbt_cloud_credentials,
    job_id="JOB-ID-PLACEHOLDER",
)
dbt_cloud_job.save("JOB-BLOCK-NAME-PLACEHOLDER")

Run a dbt Cloud job and wait for completion

from prefect import flow
from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job
import asyncio

@flow
async def run_dbt_job_flow():
    dbt_cloud_job = await DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER")
    return await run_dbt_cloud_job(
        dbt_cloud_job=dbt_cloud_job,
        targeted_retries=0,
    )


if __name__ == "__main__":
    asyncio.run(run_dbt_job_flow())

See also