With prefect-dbt, you can trigger and observe dbt Cloud jobs, execute dbt Core CLI commands, and incorporate other tools, such as Snowflake, into your dbt runs. Prefect provides a global view of the state of your workflows and allows you to take action based on state changes.

Prefect integrations may provide pre-built blocks, flows, or tasks for interacting with external systems. Block types in this library allow you to do things such as run a dbt Cloud job or execute a dbt Core command.

Getting started

Prerequisites

Install prefect-dbt

The following command will install a version of prefect-dbt compatible with your installed version of prefect. If you don’t already have prefect installed, it will install the newest version of prefect as well.

pip install "prefect[dbt]"

Upgrade to the latest versions of prefect and prefect-dbt:

pip install -U "prefect[dbt]"

If necessary, see additional installation options for dbt Core with BigQuery, Snowflake, and Postgres.

Register newly installed block types

Register the block types in the prefect-dbt module to make them available for use.

prefect block register -m prefect_dbt

dbt Cloud

If you have an existing dbt Cloud job, use the pre-built flow run_dbt_cloud_job to trigger a job run and wait until it finishes. If some nodes fail, run_dbt_cloud_job can retry only the unsuccessful nodes. Before running this flow, save your dbt Cloud credentials to a DbtCloudCredentials block and create a DbtCloudJob block.

Save dbt Cloud credentials to a block

Blocks can be created through code or through the UI.

To create a dbt Cloud Credentials block:

  1. Log into your dbt Cloud account.
  2. Click API Tokens on the sidebar.
  3. Copy a Service Token.
  4. Copy the account ID from the URL: https://cloud.getdbt.com/settings/accounts/<ACCOUNT_ID>.
  5. Create and run the following script, replacing the placeholders:
from prefect_dbt.cloud import DbtCloudCredentials


DbtCloudCredentials(
    api_key="API-KEY-PLACEHOLDER",
    account_id="ACCOUNT-ID-PLACEHOLDER"
).save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

Create a dbt Cloud job block

  1. In dbt Cloud, click on Deploy -> Jobs.
  2. Select a job.
  3. Copy the job ID from the URL: https://cloud.getdbt.com/deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>
  4. Create and run the following script, replacing the placeholders.
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob


dbt_cloud_credentials = DbtCloudCredentials.load("CREDENTIALS-BLOCK-PLACEHOLDER")
dbt_cloud_job = DbtCloudJob(
    dbt_cloud_credentials=dbt_cloud_credentials,
    job_id="JOB-ID-PLACEHOLDER"
).save("JOB-BLOCK-NAME-PLACEHOLDER")
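The account and job IDs used in the two scripts above can also be read out of the job URL programmatically. The following is a minimal sketch using only the standard library. `parse_dbt_cloud_job_url` and `DbtCloudJobIds` are hypothetical names, not part of prefect-dbt, and the sketch assumes the IDs are numeric and that the URL follows the pattern shown in step 3.

```python
import re
from typing import NamedTuple


class DbtCloudJobIds(NamedTuple):
    """Hypothetical container for the IDs embedded in a dbt Cloud job URL."""

    account_id: int
    project_id: int
    job_id: int


# Pattern assumed from the URL shape shown above; other hosts or layouts may differ.
_JOB_URL = re.compile(
    r"/deploy/(?P<account_id>\d+)/projects/(?P<project_id>\d+)/jobs/(?P<job_id>\d+)"
)


def parse_dbt_cloud_job_url(url: str) -> DbtCloudJobIds:
    """Extract the numeric account, project, and job IDs from a job URL."""
    match = _JOB_URL.search(url)
    if match is None:
        raise ValueError(f"Not a dbt Cloud job URL: {url!r}")
    return DbtCloudJobIds(**{key: int(value) for key, value in match.groupdict().items()})
```

The returned `account_id` and `job_id` could then be passed to the block constructors in place of the placeholders.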

Run a dbt Cloud job and wait for completion

from prefect import flow
from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job
import asyncio

@flow
async def run_dbt_job_flow():
    # run_dbt_cloud_job is awaited once; its return value is the final result
    result = await run_dbt_cloud_job(
        dbt_cloud_job=await DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER"),
        targeted_retries=0,
    )
    return result

if __name__ == "__main__":
    asyncio.run(run_dbt_job_flow())

dbt Core

prefect-dbt 0.7.0 and later

During the prerelease phase of prefect-dbt==0.7.0, use the --pre flag to install the latest release candidate.

pip install -U --pre prefect-dbt

Versions 0.7.0 and later of prefect-dbt include the PrefectDbtRunner class, which provides an improved interface for running dbt Core commands with better logging, failure handling, and additional features.

The PrefectDbtRunner is inspired by the DbtRunner from dbt Core, and its invoke method accepts the same arguments. Refer to the DbtRunner documentation for more information on how to call invoke.

Basic usage:

from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner().invoke(["build"])


if __name__ == "__main__":
    run_dbt()

dbt settings

The PrefectDbtSettings class, based on Pydantic’s BaseSettings class, automatically detects common DBT_-prefixed environment variables, like DBT_PROFILES_DIR and DBT_PROJECT_DIR. If no environment variables are set, dbt’s defaults are used.

Provide a PrefectDbtSettings instance to PrefectDbtRunner to customize dbt settings or override environment variables.

from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt"
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()

profiles.yml templating

The PrefectDbtRunner class supports templating in your profiles.yml file, allowing you to reference Prefect blocks and variables that will be resolved at runtime. This enables you to store sensitive credentials securely using Prefect blocks, and configure different targets based on the Prefect workspace.

For example, a Prefect variable called target can have a different value in development (dev) and production (prod) workspaces. This allows you to use the same profiles.yml file to automatically reference a local DuckDB instance in development and a Snowflake instance in production.

example:
  outputs:
    dev:
      type: duckdb
      path: dev.duckdb
      threads: 1

    prod:
      type: snowflake
      account: "{{ prefect.blocks.snowflake-credentials.warehouse-access.account }}"
      user: "{{ prefect.blocks.snowflake-credentials.warehouse-access.user }}"
      password: "{{ prefect.blocks.snowflake-credentials.warehouse-access.password }}"
      database: "{{ prefect.blocks.snowflake-connector.prod-connector.database }}"
      schema: "{{ prefect.blocks.snowflake-connector.prod-connector.schema }}"
      warehouse: "{{ prefect.blocks.snowflake-connector.prod-connector.warehouse }}"
      threads: 4

  target: "{{ prefect.variables.target }}"
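To make the substitution step concrete, here is a minimal sketch of how placeholders of this shape could be resolved. It is an illustration only, not how PrefectDbtRunner is implemented: `resolve_placeholders` is a hypothetical function, and plain dictionaries stand in for Prefect variables and blocks (keyed here as "block-type/block-name", which is an assumption of the sketch).

```python
import re
from typing import Any, Mapping

# Matches "{{ prefect.variables.NAME }}" and "{{ prefect.blocks.TYPE.NAME.ATTR }}".
_PLACEHOLDER = re.compile(r"\{\{\s*prefect\.(variables|blocks)\.([\w.\-]+)\s*\}\}")


def resolve_placeholders(
    text: str,
    variables: Mapping[str, Any],
    blocks: Mapping[str, Mapping[str, Any]],
) -> str:
    """Replace Prefect-style placeholders with values from in-memory stand-ins."""

    def substitute(match: "re.Match[str]") -> str:
        kind, path = match.group(1), match.group(2)
        if kind == "variables":
            return str(variables[path])
        # Block paths look like "<block-type>.<block-name>.<attribute>".
        block_type, block_name, attribute = path.split(".", 2)
        return str(blocks[f"{block_type}/{block_name}"][attribute])

    return _PLACEHOLDER.sub(substitute, text)
```

With `variables={"target": "dev"}`, the last line of the profile above would resolve to `target: "dev"`, which selects the DuckDB output.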

Logging

The PrefectDbtRunner class maps all dbt log levels to standard Python logging levels, so filtering for log levels like WARNING or ERROR in the Prefect UI applies to dbt’s logs. It also selects the appropriate logging target based on the invocation context: logging to a Prefect run logger when executed within a flow or task, and logging directly to the terminal when run outside of a flow or task.
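The level mapping can be pictured with a short sketch. It assumes dbt's event level names are debug, test, info, warn, and error. The mapping table and `log_dbt_message` are hypothetical, written for illustration rather than taken from prefect-dbt, and folding dbt's test level into DEBUG is a choice made for this sketch.

```python
import logging

# Assumed dbt level names mapped onto standard Python logging levels.
DBT_TO_PYTHON_LEVEL = {
    "debug": logging.DEBUG,
    "test": logging.DEBUG,  # assumption: treat dbt's "test" level as DEBUG
    "info": logging.INFO,
    "warn": logging.WARNING,
    "error": logging.ERROR,
}


def log_dbt_message(logger: logging.Logger, dbt_level: str, message: str) -> int:
    """Forward a dbt message to a Python logger and return the level used."""
    # Unknown level names fall back to INFO in this sketch.
    level = DBT_TO_PYTHON_LEVEL.get(dbt_level.lower(), logging.INFO)
    logger.log(level, message)
    return level
```

Because the messages arrive at standard levels, a filter such as WARNING in the UI treats dbt warnings the same way as warnings from any other Python logger.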

Failure handling

The PrefectDbtRunner class raises exceptions with standardized messages containing detailed information about the failure.

Failures detected during invocation of dbt command 'build':
Test not_null_my_first_dbt_model_id failed with message: "Got 1 result, configured to fail if != 0"

It also offers a raise_on_failure option to control whether non-exception dbt failures, like failed tests, are raised as exceptions.

from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(
        raise_on_failure=False  # Failed tests will not fail the flow run
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()

Events

The PrefectDbtRunner automatically emits Prefect events for completed dbt nodes with status information.

For example, an event for a skipped model may look like this:

{
  "event": "my_second_dbt_model skipped",
  "occurred": "2025-02-06T17:05:02.614Z",
  "payload": {
    "node_info": {
      "node_path": "test/my_second_dbt_model.sql",
      "node_name": "my_second_dbt_model",
      "unique_id": "model.test.my_second_dbt_model",
      "resource_type": "model",
      "materialized": "view",
      "node_status": "skipped",
      "node_started_at": "2025-02-06T17:05:02.614034",
      "meta": {},
      "node_relation": {
        "database": "dev",
        "schema": "main",
        "alias": "my_second_dbt_model",
        "relation_name": "\"dev\".\"main\".\"my_second_dbt_model\""
      }
    },
    "run_result": {
      "status": "skipped",
      "thread": "Thread-2 (worker)",
      "adapter_response": {}
    }
  },
  "received": "2025-02-06T17:05:02.843Z",
  "resource": {
    "prefect.resource.id": "dbt.model.test.my_second_dbt_model",
    "prefect.resource.name": "my_second_dbt_model",
    "dbt.node.status": "skipped"
  },
  "workspace": "87a4870c-8152-46cb-93aa-06482f9aa1df"
}

These events contain both the node_info and run_result for processed nodes in your dbt project, as well as the final status of the node’s execution in the primary resource. This enables you to set up alerting for specific node states through Prefect’s automations system. To be notified when this model is skipped, the following automation trigger could be set up:

{
  "type": "event",
  "match": {
    "prefect.resource.id": [
      "dbt.model.test.my_second_dbt_model"
    ],
    "dbt.node.status": "skipped"
  },
  "match_related": {},
  "after": [],
  "expect": [
    "my_second_dbt_model skipped"
  ],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 0
}
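The way such a trigger lines up with the event shown earlier can be sketched in a few lines. This is a simplified illustration, not Prefect's matching logic: `event_fires_trigger` and `label_matches` are hypothetical helpers that only compare the event name against expect and the resource labels against match, and they ignore wildcards, match_related, posture, threshold, and within.

```python
from typing import Any, Mapping


def label_matches(expected: Any, actual: Any) -> bool:
    """A label matches if it equals the expected value or appears in the expected list."""
    if isinstance(expected, (list, tuple)):
        return actual in expected
    return actual == expected


def event_fires_trigger(event: Mapping[str, Any], trigger: Mapping[str, Any]) -> bool:
    """Simplified check: the event name is expected and every resource label matches."""
    expect = trigger.get("expect", [])
    if expect and event["event"] not in expect:
        return False
    resource = event.get("resource", {})
    return all(
        label_matches(expected, resource.get(label))
        for label, expected in trigger.get("match", {}).items()
    )
```

Applied to the skipped-model event above, both conditions hold, so the sketch reports a match. An event for the same model with a different dbt.node.status would not match.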

Resources and lineage

Resources in Prefect Cloud are currently experimental. Enable them by setting the PREFECT_EXPERIMENTS_LINEAGE_EVENTS_ENABLED environment variable to true wherever flow runs are executed.

The PrefectDbtRunner class automatically emits lineage events describing the relationships between the resources in your dbt project. When running in Prefect Cloud, the resources for executed nodes are visible on the flow run page. Click “View graph” to display a lineage graph with the nodes directly upstream and downstream of the selected resource.

To describe resources that exist outside of your dbt project and are upstream of a given node, supply the resource in your dbt config. For example, to describe a BigQuery table as an upstream resource of a dbt model, add the following to your model’s config:

prefect:
  upstream_resources:
    - name: "my_bigquery_table"
      role: "table"
      id: "bigquery://my-project.my_dataset.my_table"

Native dbt configuration

You can disable these features for all resources in your dbt project config, or for specific resources in their own config:

prefect:
  emit_events: False  # Disable all events
  emit_node_events: False  # Disable node events
  emit_lineage_events: False  # Disable lineage events

prefect-dbt 0.6.6 and earlier

prefect-dbt supports two ways to run dbt Core commands. A DbtCoreOperation block runs the commands as shell commands, while other tasks use dbt’s Programmatic Invocation.

Optionally, specify the project_dir. If profiles_dir is not set, the DBT_PROFILES_DIR environment variable is used. If DBT_PROFILES_DIR is not set either, the default directory, $HOME/.dbt/, is used.
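The lookup order for the profiles directory can be written out as a small sketch. `resolve_profiles_dir` is a hypothetical helper that mirrors the order described above (explicit argument, then environment variable, then default). It is not the code prefect-dbt runs.

```python
import os
from pathlib import Path
from typing import Optional


def resolve_profiles_dir(profiles_dir: Optional[str] = None) -> Path:
    """Return the directory that would be searched for profiles.yml."""
    # 1. An explicitly supplied profiles_dir wins.
    if profiles_dir is not None:
        return Path(profiles_dir)
    # 2. Otherwise fall back to the DBT_PROFILES_DIR environment variable.
    env_value = os.environ.get("DBT_PROFILES_DIR")
    if env_value:
        return Path(env_value)
    # 3. Finally, use the default directory under the user's home.
    return Path.home() / ".dbt"
```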

Use an existing profile

If you have an existing dbt profiles.yml file, specify the profiles_dir where the file is located:

from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation


@flow
def trigger_dbt_flow() -> str:
    result = DbtCoreOperation(
        commands=["pwd", "dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER"
    ).run()
    return result


if __name__ == "__main__":
    trigger_dbt_flow()

If you are already using Prefect blocks such as the Snowflake Connector block, you can use those blocks to create a new profiles.yml with a DbtCliProfile block.

Use environment variables with Prefect secret blocks

If you use environment variables in profiles.yml, set a Prefect Secret block as an environment variable:

import os
from prefect.blocks.system import Secret


secret_block = Secret.load("DBT_PASSWORD_PLACEHOLDER")

# Access the stored secret
DBT_PASSWORD = secret_block.get()
os.environ["DBT_PASSWORD"] = DBT_PASSWORD

This example profiles.yml file could then access that variable.

profile:
  target: prod
  outputs:
    prod:
      type: postgres
      host: 127.0.0.1
      user: dbt_user
      # IMPORTANT: Make sure to quote the entire Jinja string here
      password: "{{ env_var('DBT_PASSWORD') }}"

Create a new profiles.yml file with blocks

If you don’t have a profiles.yml file, you can use a DbtCliProfile block to create profiles.yml. Then, specify profiles_dir where profiles.yml will be written. Here’s example code with placeholders:

from prefect import flow
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation


@flow
def trigger_dbt_flow():
    dbt_cli_profile = DbtCliProfile.load("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
    with DbtCoreOperation(
        commands=["dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER",
        dbt_cli_profile=dbt_cli_profile,
    ) as dbt_operation:
        dbt_process = dbt_operation.trigger()
        # do other things before waiting for completion
        dbt_process.wait_for_completion()
        result = dbt_process.fetch_result()
    return result


if __name__ == "__main__":
    trigger_dbt_flow()

Supplying the dbt_cli_profile argument will overwrite existing profiles.yml files

If you already have a profiles.yml file in the specified profiles_dir, the file will be overwritten. If you do not specify a profiles directory, the profiles.yml file at ~/.dbt/ will be overwritten.

Visit the SDK reference in the side navigation to see other built-in TargetConfigs blocks.

If the desired service profile is not available, you can build one from the generic TargetConfigs class.

Programmatic Invocation

prefect-dbt has some pre-built tasks that use dbt’s programmatic invocation.

For example:

from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.tasks import dbt_build_task


@flow
def dbt_build_flow():
    trigger_dbt_cli_command(
        command="dbt deps",
        project_dir="/Users/test/my_dbt_project_dir",
    )
    dbt_build_task(
        project_dir="/Users/test/my_dbt_project_dir",
        create_summary_artifact=True,
        summary_artifact_key="dbt-build-task-summary",
        extra_command_args=["--select", "foo_model"],
    )


if __name__ == "__main__":
    dbt_build_flow()

See the SDK docs for other pre-built tasks.

Create a summary artifact

These pre-built tasks can also create artifacts. These artifacts have extra information about dbt Core runs, such as messages and compiled code for nodes that fail or have errors.

BigQuery CLI profile block example

To create dbt Core target config and profile blocks for BigQuery:

  1. Save and load a GcpCredentials block.
  2. Determine the schema / dataset you want to use in BigQuery.
  3. Create a short script, replacing the placeholders.
from prefect_gcp.credentials import GcpCredentials
from prefect_dbt.cli import BigQueryTargetConfigs, DbtCliProfile


credentials = GcpCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
target_configs = BigQueryTargetConfigs(
    schema="SCHEMA-NAME-PLACEHOLDER",  # also known as dataset
    credentials=credentials,
)
target_configs.save("TARGET-CONFIGS-BLOCK-NAME-PLACEHOLDER")

dbt_cli_profile = DbtCliProfile(
    name="PROFILE-NAME-PLACEHOLDER",
    target="TARGET-NAME-PLACEHOLDER",
    target_configs=target_configs,
)
dbt_cli_profile.save("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")

To create a dbt Core operation block:

  1. Determine the dbt commands you want to run.
  2. Create a short script, replacing the placeholders.
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation


dbt_cli_profile = DbtCliProfile.load("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
dbt_core_operation = DbtCoreOperation(
    commands=["DBT-CLI-COMMANDS-PLACEHOLDER"],
    dbt_cli_profile=dbt_cli_profile,
    overwrite_profiles=True,
)
dbt_core_operation.save("DBT-CORE-OPERATION-BLOCK-NAME-PLACEHOLDER")

Load the saved dbt Core operation block:

from prefect_dbt.cli import DbtCoreOperation


DbtCoreOperation.load("DBT-CORE-OPERATION-BLOCK-NAME-PLACEHOLDER")

Resources

For assistance using dbt, consult the dbt documentation.

Refer to the prefect-dbt SDK documentation to explore all the capabilities of the prefect-dbt library.

Additional installation options

Additional installation options for dbt Core with BigQuery, Snowflake, and Postgres are shown below.

Additional capabilities for dbt Core and Snowflake profiles

First install the main library compatible with your Prefect version:

pip install "prefect[dbt]"

Then install the additional capabilities you need.

pip install "prefect-dbt[snowflake]"

Additional capabilities for dbt Core and BigQuery profiles

pip install "prefect-dbt[bigquery]"

Additional capabilities for dbt Core and Postgres profiles

pip install "prefect-dbt[postgres]"

Or, install all of the extras.

pip install -U "prefect-dbt[all_extras]"