Versions 0.7.0 and later of prefect-dbt include the PrefectDbtRunner class, which provides an improved interface for running dbt Core commands with better logging, failure handling, and automatic asset lineage.
The PrefectDbtRunner is inspired by the DbtRunner from dbt Core, and its invoke method accepts the same arguments. Refer to the DbtRunner documentation for more information on how to call invoke.

Basic usage

from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner().invoke(["build"])


if __name__ == "__main__":
    run_dbt()
When calling .invoke() in a flow or task, each node in dbt’s execution graph is reflected as a task in Prefect’s execution graph. Logs from each node will belong to the corresponding task, and each task’s state is determined by the state of that node’s execution.
15:54:59.119 | INFO    | Flow run 'imposing-partridge' - Found 8 models, 3 seeds, 18 data tests, 543 macros
15:54:59.134 | INFO    | Flow run 'imposing-partridge' - 
15:54:59.148 | INFO    | Flow run 'imposing-partridge' - Concurrency: 1 threads (target='dev')
15:54:59.164 | INFO    | Flow run 'imposing-partridge' - 
15:54:59.665 | INFO    | Task run 'model my_first_dbt_model' - 1 of 29 OK created sql table model main.my_first_dbt_model ..................... [OK in 0.18s]
15:54:59.671 | INFO    | Task run 'model my_first_dbt_model' - Finished in state Completed()
...
15:55:02.373 | ERROR   | Task run 'model product_metrics' -   Runtime Error in model product_metrics (models/marts/product/product_metrics.sql)
  Binder Error: Values list "o" does not have a column named "product_id"
  
  LINE 47:         on p.product_id = o.product_id
15:55:02.857 | ERROR   | Task run 'model product_metrics' - Finished in state Failed('Task run encountered an exception Exception: Node model.demo.product_metrics finished with status error')
The task runs created by calling .invoke() run separately from dbt Core and do not affect dbt’s execution behavior. These tasks do not persist results and cannot be cached.
Use dbt’s native retry functionality in combination with runtime data from Prefect to retry failed nodes.
from prefect import flow
from prefect.runtime.flow_run import get_run_count
from prefect_dbt import PrefectDbtRunner


@flow(retries=2)
def run_dbt():
    runner = PrefectDbtRunner()

    if get_run_count() == 1:
        runner.invoke(["build"])
    else:
        runner.invoke(["retry"])


if __name__ == "__main__":
    run_dbt()

Assets

Prefect Cloud maintains a graph of assets: the objects produced by your workflows. Any dbt seed, source, or model appears on your asset graph in Prefect Cloud once it has been executed using the PrefectDbtRunner.
The upstream dependencies of an asset materialized by prefect-dbt are derived from the depends_on field in dbt’s manifest.json. The asset’s key is its corresponding dbt resource’s relation_name.
The name asset property is derived from the dbt resource’s relation_name with adapter-specific quoting characters removed (for example, "dev"."main_marts"."product_metrics" becomes dev.main_marts.product_metrics). The description property is populated from the dbt resource’s description. The owners asset property is populated if there is data assigned to the owner key under a resource’s meta config.
models:
  - name: product_metrics
    description: "Product metrics and categorization"
    config:
      meta:
        owner: "kevin-g"
Asset metadata is collected from the result of the node’s execution.
{
  "node_path": "marts/product/product_metrics.sql",
  "node_name": "product_metrics",
  "unique_id": "model.demo.product_metrics",
  "resource_type": "model",
  "materialized": "table",
  "node_status": "error",
  "node_started_at": "2025-06-26T20:55:05.661126",
  "node_finished_at": "2025-06-26T20:55:05.733257",
  "meta": {
    "owner": "kevin-g"
  },
  "node_relation": {
    "database": "dev",
    "schema": "main_marts",
    "alias": "product_metrics",
    "relation_name": "\"dev\".\"main_marts\".\"product_metrics\""
  }
}
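The naming and lineage rules described in this section can be sketched in plain Python. This is an illustration only, not prefect-dbt's implementation: the helper names asset_name and upstream_relations, the QUOTE_CHARS set, and the trimmed manifest are assumptions made for the example. It relies only on the nodes, sources, depends_on.nodes, and relation_name fields found in dbt's manifest.json.

```python
from typing import Any, Dict, List

# Assumed set of adapter quoting characters; real adapters may use others.
QUOTE_CHARS = '"`[]'


def asset_name(relation_name: str) -> str:
    """Drop quoting characters from a dbt relation name."""
    return "".join(ch for ch in relation_name if ch not in QUOTE_CHARS)


def upstream_relations(manifest: Dict[str, Any], unique_id: str) -> List[str]:
    """Return the relation names of the resources that `unique_id` depends on."""
    # Models and seeds live under "nodes"; sources live under "sources".
    resources = {**manifest.get("nodes", {}), **manifest.get("sources", {})}
    parents = resources[unique_id].get("depends_on", {}).get("nodes", [])
    return [
        resources[parent]["relation_name"]
        for parent in parents
        if resources.get(parent, {}).get("relation_name")
    ]


# Hypothetical, heavily trimmed manifest used for illustration only.
manifest = {
    "nodes": {
        "model.demo.product_metrics": {
            "relation_name": '"dev"."main_marts"."product_metrics"',
            "depends_on": {"nodes": ["model.demo.stg_products"]},
        },
        "model.demo.stg_products": {
            "relation_name": '"dev"."main"."stg_products"',
            "depends_on": {"nodes": []},
        },
    },
    "sources": {},
}

key = manifest["nodes"]["model.demo.product_metrics"]["relation_name"]
print(asset_name(key))
print(upstream_relations(manifest, "model.demo.product_metrics"))
```

The asset key keeps the quoted relation_name as-is; only the display name has the quoting removed.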
Optionally, the compiled code of a dbt model can be appended to the asset description.
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(include_compiled_code=True).invoke(["build"])


if __name__ == "__main__":
    run_dbt()

dbt settings

The PrefectDbtSettings class, based on Pydantic’s BaseSettings class, automatically detects DBT_-prefixed environment variables that have a direct effect on the PrefectDbtRunner class. If no environment variables are set, dbt’s defaults are used. Provide a PrefectDbtSettings instance to PrefectDbtRunner to customize dbt settings or override environment variables.
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt"
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()

Logging

The PrefectDbtRunner class maps all dbt log levels to standard Python logging levels, so filtering for log levels like WARNING or ERROR in the Prefect UI applies to dbt’s logs. By default, the logging level used by dbt is Prefect’s logging level, which can be configured using the PREFECT_LOGGING_LEVEL Prefect setting. The dbt logging level can be set independently from Prefect’s by using the DBT_LOG_LEVEL environment variable, setting log_level in PrefectDbtSettings, or passing the --log-level flag or log_level kwarg to .invoke(). Only logging levels of higher severity (more restrictive) than Prefect’s logging level will have an effect.
from dbt_common.events.base_types import EventLevel
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt",
            log_level=EventLevel.ERROR,  # explicitly choose a higher log level for dbt
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
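The rule that only a more restrictive dbt level takes effect can be modeled as taking the higher of the two severities. The sketch below uses Python's standard logging levels to show this; the function effective_dbt_level is a hypothetical helper for illustration and is not part of prefect-dbt.

```python
import logging
from typing import Optional


def effective_dbt_level(prefect_level: str, dbt_level: Optional[str] = None) -> str:
    """Model which level dbt logs are emitted at, given both settings."""
    prefect_no = getattr(logging, prefect_level.upper())
    if dbt_level is None:
        # No dbt-specific level: dbt inherits Prefect's logging level.
        return logging.getLevelName(prefect_no)
    dbt_no = getattr(logging, dbt_level.upper())
    # A dbt level less severe than Prefect's has no effect.
    return logging.getLevelName(max(prefect_no, dbt_no))


print(effective_dbt_level("INFO"))
print(effective_dbt_level("INFO", "ERROR"))
print(effective_dbt_level("WARNING", "DEBUG"))
```

Under this model, asking dbt for DEBUG output while Prefect is set to WARNING still yields WARNING, so Prefect's level would need to be lowered as well to see dbt's debug logs.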

profiles.yml templating

The PrefectDbtRunner class supports templating in your profiles.yml file, allowing you to reference Prefect blocks and variables that will be resolved at runtime. This enables you to store sensitive credentials securely using Prefect blocks, and configure different targets based on the Prefect workspace. For example, a Prefect variable called target can have a different value in development (dev) and production (prod) workspaces. This allows you to use the same profiles.yml file to automatically reference a local DuckDB instance in development and a Snowflake instance in production.
example:
  outputs:
    dev:
      type: duckdb
      path: dev.duckdb
      threads: 1

    prod:
      type: snowflake
      account: "{{ prefect.blocks.snowflake-credentials.warehouse-access.account }}"
      user: "{{ prefect.blocks.snowflake-credentials.warehouse-access.user }}"
      password: "{{ prefect.blocks.snowflake-credentials.warehouse-access.password }}"
      database: "{{ prefect.blocks.snowflake-connector.prod-connector.database }}"
      schema: "{{ prefect.blocks.snowflake-connector.prod-connector.schema }}"
      warehouse: "{{ prefect.blocks.snowflake-connector.prod-connector.warehouse }}"
      threads: 4

  target: "{{ prefect.variables.target }}"
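To make the templating idea concrete, the following sketch resolves {{ ... }} placeholders against a lookup table. It is a simplified stand-in, not how prefect-dbt resolves templates: the resolve function, the PLACEHOLDER pattern, and the values dictionary (including the ANALYTICS database name) are all assumptions, and a real run would fetch block and variable values from Prefect instead of a dictionary.

```python
import re
from typing import Any, Mapping

# Matches placeholders such as "{{ prefect.variables.target }}".
PLACEHOLDER = re.compile(r"\{\{\s*([\w.\-]+)\s*\}\}")


def resolve(template: str, values: Mapping[str, Any]) -> str:
    """Replace each placeholder with its value, failing loudly if one is missing."""

    def _substitute(match: "re.Match[str]") -> str:
        key = match.group(1)
        if key not in values:
            raise KeyError(f"No value available for placeholder {key!r}")
        return str(values[key])

    return PLACEHOLDER.sub(_substitute, template)


# Hypothetical values standing in for a Prefect variable and a block attribute.
values = {
    "prefect.variables.target": "dev",
    "prefect.blocks.snowflake-connector.prod-connector.database": "ANALYTICS",
}

print(resolve('target: "{{ prefect.variables.target }}"', values))
```

Because the target itself is a placeholder, switching the value of the variable per workspace is enough to select a different output without editing profiles.yml.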

Failure handling

By default, any dbt node execution failures cause the entire dbt run to raise an exception with a message containing detailed information about the failure.
Failures detected during invocation of dbt command 'build':
Test not_null_my_first_dbt_model_id failed with message: "Got 1 result, configured to fail if != 0"
The PrefectDbtRunner’s raise_on_failure option can be set to False to prevent failures in dbt from causing the failure of the flow or task in which .invoke() is called.
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(
        raise_on_failure=False  # Failed tests will not fail the flow run
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
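The effect of raise_on_failure can be pictured with a small stand-alone model. The sketch below is not prefect-dbt's code: failure_message and finish are hypothetical helpers, the exception type is an assumption, and the message layout simply mirrors the example output shown earlier in this section.

```python
from typing import List, Tuple


def failure_message(command: str, failures: List[Tuple[str, str]]) -> str:
    """Build a summary in the style of the example failure output."""
    lines = [f"Failures detected during invocation of dbt command '{command}':"]
    lines += [f'{node} failed with message: "{message}"' for node, message in failures]
    return "\n".join(lines)


def finish(command: str, failures: List[Tuple[str, str]], raise_on_failure: bool = True) -> bool:
    """Raise on failures by default; otherwise report success as a boolean."""
    if failures and raise_on_failure:
        raise RuntimeError(failure_message(command, failures))
    return not failures


failures = [
    ("Test not_null_my_first_dbt_model_id", "Got 1 result, configured to fail if != 0"),
]

# With raise_on_failure=False, the caller decides what to do with the outcome.
succeeded = finish("build", failures, raise_on_failure=False)
print(succeeded)
```

When failures are suppressed this way, the surrounding flow should check the outcome itself, since a failed test no longer stops the run.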

Native dbt configuration

You can disable automatic asset lineage detection for all resources in your dbt project config, or for specific resources in their own config:
prefect:
  enable_assets: False

See also