# PrefectDbtRunner

> Run dbt Core commands from a Prefect flow with asset lineage, lifecycle hooks, and improved logging

Versions 0.7.0 and later of `prefect-dbt` include the `PrefectDbtRunner` class, which provides an improved interface for running dbt Core commands with better logging, failure handling, and automatic asset lineage.

<Tip>
  The `PrefectDbtRunner` is inspired by the `DbtRunner` from dbt Core, and its `invoke` method accepts the same arguments.
  Refer to the [`DbtRunner` documentation](https://docs.getdbt.com/reference/programmatic-invocations) for more information on how to call `invoke`.
</Tip>

## Basic usage

```python theme={null}
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner().invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```

When you call `.invoke()` in a flow or task, each node in dbt's execution graph is reflected as a task in Prefect's execution graph.
Logs from each node are attached to the corresponding task run, and each task run's state reflects the outcome of that node's execution.

```bash theme={null}
15:54:59.119 | INFO    | Flow run 'imposing-partridge' - Found 8 models, 3 seeds, 18 data tests, 543 macros
15:54:59.134 | INFO    | Flow run 'imposing-partridge' -
15:54:59.148 | INFO    | Flow run 'imposing-partridge' - Concurrency: 1 threads (target='dev')
15:54:59.164 | INFO    | Flow run 'imposing-partridge' -
15:54:59.665 | INFO    | Task run 'model my_first_dbt_model' - 1 of 29 OK created sql table model main.my_first_dbt_model ..................... [OK in 0.18s]
15:54:59.671 | INFO    | Task run 'model my_first_dbt_model' - Finished in state Completed()
...
15:55:02.373 | ERROR   | Task run 'model product_metrics' -   Runtime Error in model product_metrics (models/marts/product/product_metrics.sql)
  Binder Error: Values list "o" does not have a column named "product_id"

  LINE 47:         on p.product_id = o.product_id
15:55:02.857 | ERROR   | Task run 'model product_metrics' - Finished in state Failed('Task run encountered an exception Exception: Node model.demo.product_metrics finished with status error')
```

<Warning>
  The task runs created by calling `.invoke()` run separately from dbt Core, and do not affect dbt's execution behavior.
  These tasks do not persist results and cannot be cached.

  Use [dbt's native retry functionality](https://docs.getdbt.com/reference/commands/retry) in combination with [runtime data from `prefect`](/v3/how-to-guides/workflows/access-runtime-info) to retry failed nodes.

  ```python theme={null}
  from prefect import flow
  from prefect.runtime.flow_run import get_run_count
  from prefect_dbt import PrefectDbtRunner


  @flow(retries=2)
  def run_dbt():
      runner = PrefectDbtRunner()

      if get_run_count() == 1:
          runner.invoke(["build"])
      else:
          runner.invoke(["retry"])


  if __name__ == "__main__":
      run_dbt()
  ```
</Warning>

## Assets

Prefect Cloud maintains a graph of [assets](/v3/concepts/assets), objects produced by your workflows.

Any dbt seed, source or model will appear on your asset graph in Prefect Cloud once it has been executed using the `PrefectDbtRunner`.
The upstream dependencies of an asset materialized by `prefect-dbt` are derived from the `depends_on` field in dbt's `manifest.json`.
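
To check which upstream dependencies dbt records for a resource, you can read `depends_on` from the manifest yourself. The following is a minimal sketch, not how `prefect-dbt` builds lineage internally; the `upstream_nodes` helper and the inline manifest fragment (including the `stg_products` and `stg_orders` node names) are illustrative assumptions.

```python
def upstream_nodes(manifest: dict, unique_id: str) -> list:
    """Return the unique IDs a dbt resource depends on, per manifest.json."""
    node = manifest.get("nodes", {}).get(unique_id, {})
    return list(node.get("depends_on", {}).get("nodes", []))


# Hypothetical fragment shaped like dbt's target/manifest.json.
manifest = {
    "nodes": {
        "model.demo.product_metrics": {
            "depends_on": {
                "macros": [],
                "nodes": ["model.demo.stg_products", "model.demo.stg_orders"],
            }
        }
    }
}

print(upstream_nodes(manifest, "model.demo.product_metrics"))
```

In a real project, load the manifest with `json.load` from `target/manifest.json` instead of using an inline dictionary.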

The asset's `key` will be its corresponding dbt resource's `relation_name`.

The `name` asset property is derived from the dbt resource's `relation_name` with adapter-specific quoting characters removed (for example, `"dev"."main_marts"."product_metrics"` becomes `dev.main_marts.product_metrics`). The `description` property is populated from the dbt resource's description.
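
The quote removal described above can be approximated in a few lines. This is a sketch under the assumption that double quotes, backticks, and square brackets are the quoting characters in play; the library's own rules may differ, and `display_name` is a hypothetical helper.

```python
def display_name(relation_name: str) -> str:
    """Drop common SQL identifier quoting characters from a relation name."""
    # Assumption: these four characters cover the adapters you use.
    return relation_name.translate(str.maketrans("", "", '"`[]'))


print(display_name('"dev"."main_marts"."product_metrics"'))
```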

The `owners` asset property is populated if there is data assigned to the `owner` key under a resource's `meta` config.

```yaml theme={null}
models:
  - name: product_metrics
    description: "Product metrics and categorization"
    config:
      meta:
        owner: "kevin-g"
```

Asset metadata is collected from the result of the node's execution.

```json theme={null}
{
  "node_path": "marts/product/product_metrics.sql",
  "node_name": "product_metrics",
  "unique_id": "model.demo.product_metrics",
  "resource_type": "model",
  "materialized": "table",
  "node_status": "error",
  "node_started_at": "2025-06-26T20:55:05.661126",
  "node_finished_at": "2025-06-26T20:55:05.733257",
  "meta": {
    "owner": "kevin-g"
  },
  "node_relation": {
    "database": "dev",
    "schema": "main_marts",
    "alias": "product_metrics",
    "relation_name": "\"dev\".\"main_marts\".\"product_metrics\""
  }
}
```
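
Because the timestamps in this metadata are ISO 8601 strings, you can derive values such as a node's duration from them. The sketch below copies a subset of the fields shown above into a dictionary; `node_duration_seconds` is a hypothetical helper, not part of `prefect-dbt`.

```python
from datetime import datetime

# Subset of the asset metadata shown above.
metadata = {
    "unique_id": "model.demo.product_metrics",
    "node_status": "error",
    "node_started_at": "2025-06-26T20:55:05.661126",
    "node_finished_at": "2025-06-26T20:55:05.733257",
}


def node_duration_seconds(metadata: dict) -> float:
    """Compute how long a node ran from its start and finish timestamps."""
    started = datetime.fromisoformat(metadata["node_started_at"])
    finished = datetime.fromisoformat(metadata["node_finished_at"])
    return (finished - started).total_seconds()


print(f"{metadata['unique_id']} ran for {node_duration_seconds(metadata):.3f}s")
```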

Optionally, the compiled code of a dbt model can be appended to the asset description.

```python theme={null}
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(include_compiled_code=True).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```

## dbt settings

The `PrefectDbtSettings` class, based on Pydantic's `BaseSettings` class, automatically detects `DBT_`-prefixed environment variables that have a direct effect on the `PrefectDbtRunner` class.
If no environment variables are set, dbt's defaults are used.

Provide a `PrefectDbtSettings` instance to `PrefectDbtRunner` to customize dbt settings or override environment variables.

```python theme={null}
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt"
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```

## Logging

The `PrefectDbtRunner` class maps all dbt log levels to standard Python logging levels, so filtering for log levels like `WARNING` or `ERROR` in the Prefect UI applies to dbt's logs.

By default, the logging level used by dbt is Prefect's logging level, which can be configured using the `PREFECT_LOGGING_LEVEL` Prefect setting.

The dbt logging level can be set independently from Prefect's by using the `DBT_LOG_LEVEL` environment variable, setting `log_level` in `PrefectDbtSettings`, or passing the `--log-level` flag or `log_level` kwarg to `.invoke()`.
Only logging levels of higher severity (more restrictive) than Prefect's logging level will have an effect.

```python theme={null}
from dbt_common.events.base_types import EventLevel
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt",
            log_level=EventLevel.ERROR,  # explicitly choose a higher log level for dbt
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```
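
The precedence rule above (a dbt level only takes effect when it is more severe than Prefect's) can be modeled with standard Python logging levels. This is a sketch of the described behavior, not the runner's implementation; `effective_dbt_level` is a hypothetical helper.

```python
import logging
from typing import Optional

# Standard Python logging levels, which dbt's log levels are mapped onto.
LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
}


def effective_dbt_level(prefect_level: str, dbt_level: Optional[str] = None) -> str:
    """Model which level applies to dbt logs under the rule described above."""
    if dbt_level is None:
        return prefect_level  # dbt inherits Prefect's level by default
    # A dbt level only takes effect when it is more severe than Prefect's.
    return max(prefect_level, dbt_level, key=lambda name: LEVELS[name])


print(effective_dbt_level("INFO", "ERROR"))     # dbt setting is more restrictive
print(effective_dbt_level("WARNING", "DEBUG"))  # dbt setting is less restrictive
```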

## `profiles.yml` templating

The `PrefectDbtRunner` class supports templating in your `profiles.yml` file, allowing you to reference Prefect blocks and variables that will be resolved at runtime.
This enables you to store sensitive credentials securely using Prefect blocks, and configure different targets based on the Prefect workspace.

For example, a Prefect variable called `target` can have a different value in development (`dev`) and production (`prod`) workspaces.
This allows you to use the same `profiles.yml` file to automatically reference a local DuckDB instance in development and a Snowflake instance in production.

```yaml theme={null}
example:
  outputs:
    dev:
      type: duckdb
      path: dev.duckdb
      threads: 1

    prod:
      type: snowflake
      account: "{{ prefect.blocks.snowflake-credentials.warehouse-access.account }}"
      user: "{{ prefect.blocks.snowflake-credentials.warehouse-access.user }}"
      password: "{{ prefect.blocks.snowflake-credentials.warehouse-access.password }}"
      database: "{{ prefect.blocks.snowflake-connector.prod-connector.database }}"
      schema: "{{ prefect.blocks.snowflake-connector.prod-connector.schema }}"
      warehouse: "{{ prefect.blocks.snowflake-connector.prod-connector.warehouse }}"
      threads: 4

  target: "{{ prefect.variables.target }}"
```
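
To build intuition for how one `profiles.yml` can select different targets, the sketch below substitutes `{{ prefect.variables.<name> }}` placeholders from a plain dictionary. It is only a model: the runner resolves real Prefect variables and blocks at runtime, and both `resolve_variables` and the regular expression are assumptions made for illustration.

```python
import re

# Matches placeholders such as {{ prefect.variables.target }}.
PLACEHOLDER = re.compile(r"\{\{\s*prefect\.variables\.([\w-]+)\s*\}\}")


def resolve_variables(text: str, variables: dict) -> str:
    """Replace variable placeholders using a dict that stands in for a workspace."""

    def _lookup(match):
        return variables[match.group(1)]

    return PLACEHOLDER.sub(_lookup, text)


line = 'target: "{{ prefect.variables.target }}"'
print(resolve_variables(line, {"target": "dev"}))   # development workspace
print(resolve_variables(line, {"target": "prod"}))  # production workspace
```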

## Failure handling

By default, if any dbt node fails, `.invoke()` raises an exception whose message contains detailed information about each failure.

```
Failures detected during invocation of dbt command 'build':
Test not_null_my_first_dbt_model_id failed with message: "Got 1 result, configured to fail if != 0"
```

The `PrefectDbtRunner`'s `raise_on_failure` option can be set to `False` to prevent failures in dbt from causing the failure of the flow or task in which `.invoke()` is called.

```python theme={null}
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(
        raise_on_failure=False  # Failed tests will not fail the flow run
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```
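
With `raise_on_failure=False`, you may still want to inspect what failed. The sketch below assumes `.invoke()` returns an object shaped like dbt Core's `dbtRunnerResult` (a `success` flag and an iterable `result` whose items expose `node.unique_id` and `status`); verify this against your installed versions. The `failed_nodes` helper and the stand-in result are hypothetical.

```python
from types import SimpleNamespace


def failed_nodes(result) -> list:
    """List unique IDs of nodes whose status is "error" or "fail"."""
    if result.success or result.result is None:
        return []
    return [
        r.node.unique_id
        for r in result.result
        # Statuses may be enums or plain strings; compare on the string value.
        if str(getattr(r.status, "value", r.status)) in {"error", "fail"}
    ]


# Stand-in for the object returned by invoke(); node IDs are illustrative.
fake_result = SimpleNamespace(
    success=False,
    result=[
        SimpleNamespace(
            node=SimpleNamespace(unique_id="model.demo.my_first_dbt_model"),
            status="success",
        ),
        SimpleNamespace(
            node=SimpleNamespace(unique_id="test.demo.not_null_my_first_dbt_model_id"),
            status="fail",
        ),
    ],
)

print(failed_nodes(fake_result))
```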

## Native dbt configuration

To disable automatic asset lineage detection, set `enable_assets` to `False` under the `prefect` key, either in your dbt project config (for all resources) or in an individual resource's config (for that resource only):

```yaml theme={null}
prefect:
  enable_assets: False
```

## Lifecycle hooks

<Note>
  Lifecycle hooks are available in `prefect-dbt` 0.7.24 and later.
</Note>

`PrefectDbtRunner` supports decorator-based lifecycle hooks that let you react to events during a dbt invocation.
Hooks are registered on a runner instance and receive a `DbtHookContext` with information about the event.

```python theme={null}
from prefect import flow
from prefect_dbt import PrefectDbtRunner, DbtHookContext


runner = PrefectDbtRunner()


@runner.on_run_start
def before_build(ctx: DbtHookContext):
    print(f"Starting dbt command: {ctx.command} with args: {ctx.args}")


@runner.post_model(select="tag:critical")
def on_critical_done(ctx: DbtHookContext):
    if ctx.status == "error":
        print(f"Critical model {ctx.node_id} failed: {ctx.error}")


@runner.on_run_end(select="tag:marts")
def after_marts(ctx: DbtHookContext):
    print(f"Run finished with status: {ctx.status}")
    if ctx.run_results:
        print(f"Matched nodes: {len(ctx.run_results)}")


@flow
def run_dbt():
    runner.invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```

### Available hooks

| Hook                                | Level | Accepts `select=` | When it runs                                           |
| ----------------------------------- | ----- | ----------------- | ------------------------------------------------------ |
| <nobr>`@runner.on_run_start`</nobr> | Run   | No                | Before dbt begins executing any nodes                  |
| <nobr>`@runner.post_model`</nobr>   | Node  | Yes (optional)    | After each model node finishes                         |
| <nobr>`@runner.on_run_end`</nobr>   | Run   | Yes (optional)    | After all nodes have finished and dbt returns a result |

When `select=` is provided, the hook only fires for nodes matching the given [dbt selector](https://docs.getdbt.com/reference/node-selection/syntax).
For `on_run_end`, the selector is matched against the set of node IDs that participated in the run, so the hook fires only when at least one matching node took part.

### `DbtHookContext` fields

Each hook receives a `DbtHookContext` dataclass. The fields available depend on the hook event:

| Field         | Type                   | `on_run_start`                   | `post_model`                           | `on_run_end`                                   |
| ------------- | ---------------------- | -------------------------------- | -------------------------------------- | ---------------------------------------------- |
| `event`       | `str`                  | `"run_start"`                    | `"post_model"`                         | `"run_end"`                                    |
| `command`     | `str`                  | The dbt command (e.g. `"build"`) | Same                                   | Same                                           |
| `owner`       | `PrefectDbtRunner`     | The runner instance              | Same                                   | Same                                           |
| `args`        | `tuple[str, ...]`      | CLI args passed to `invoke`      | Same                                   | Same                                           |
| `node`        | `ManifestNode \| None` | `None`                           | The dbt manifest node                  | `None`                                         |
| `node_id`     | `str \| None`          | `None`                           | The node's `unique_id`                 | `None`                                         |
| `status`      | `str \| None`          | `None`                           | `"success"`, `"error"`, or `"skipped"` | `"success"` or `"error"` (overall run)         |
| `result`      | `dict \| None`         | `None`                           | Event data and message from dbt        | `{"success": bool}`                            |
| `run_results` | `dict \| None`         | `None`                           | `None`                                 | Per-node result artifacts keyed by `unique_id` |
| `error`       | `Any`                  | `None`                           | Error message if the node failed       | Exception if the run failed                    |
| `node_ids`    | `tuple[str, ...]`      | `()`                             | Contains the single node ID            | All node IDs from run results                  |
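
Since the populated fields differ by event, shared logic can branch on `ctx.event`. The following sketch uses `SimpleNamespace` objects as stand-ins for `DbtHookContext` so that it runs without a dbt project; `describe` is a hypothetical helper you could call from inside your own hooks.

```python
from types import SimpleNamespace


def describe(ctx) -> str:
    """Build a one-line summary from the fields each hook event populates."""
    if ctx.event == "run_start":
        return f"{ctx.command}: starting"
    if ctx.event == "post_model":
        return f"{ctx.command}: {ctx.node_id} -> {ctx.status}"
    if ctx.event == "run_end":
        count = len(ctx.run_results or {})  # run_results may be None
        return f"{ctx.command}: finished with {ctx.status} ({count} node results)"
    return f"{ctx.command}: unhandled event {ctx.event!r}"


# Stand-in contexts; real hooks receive DbtHookContext instances.
start = SimpleNamespace(event="run_start", command="build")
model = SimpleNamespace(
    event="post_model",
    command="build",
    node_id="model.demo.product_metrics",
    status="error",
)
end = SimpleNamespace(event="run_end", command="build", status="error", run_results=None)

for ctx in (start, model, end):
    print(describe(ctx))
```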

### Error handling

Hooks are best-effort: if a hook raises an exception, Prefect logs a warning and dbt execution continues normally.
Do not use hooks as failure-control mechanisms for dbt — use dbt's own [`on-run-end`](https://docs.getdbt.com/reference/project-configs/on-run-start-on-run-end) hooks
or [Prefect automations](/v3/how-to-guides/automations/creating-automations) for critical failure responses.

<Warning>
  Hooks run synchronously in the runner's process. Long-running hook logic delays dbt node processing and the `on_run_end` hook.
  Keep hooks lightweight — emit metrics, log information, or enqueue work rather than performing expensive operations inline.
</Warning>
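
One way to keep a hook lightweight is to hand work to a background thread through a queue. The sketch below is one possible pattern, not a feature of `prefect-dbt`; `work_queue`, `worker`, and `enqueue_model_result` are hypothetical names, and a `SimpleNamespace` stands in for the `DbtHookContext` a real hook would receive.

```python
import queue
import threading
from types import SimpleNamespace

work_queue = queue.Queue()
processed = []


def worker() -> None:
    """Drain the queue in the background so hooks can return immediately."""
    while True:
        item = work_queue.get()
        try:
            processed.append(item)  # replace with the expensive operation
        finally:
            work_queue.task_done()


threading.Thread(target=worker, daemon=True).start()


def enqueue_model_result(ctx) -> None:
    """Hook body: copy the few fields needed and hand them off."""
    work_queue.put({"node_id": ctx.node_id, "status": ctx.status})


# Stand-in context for demonstration only.
enqueue_model_result(
    SimpleNamespace(node_id="model.demo.product_metrics", status="error")
)
work_queue.join()  # wait for the worker before the process exits
print(processed)
```

In a flow, a function like `enqueue_model_result` would be registered with `@runner.post_model`, and `work_queue.join()` would be called after `.invoke()` returns.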

### Recipes

The examples below use placeholder functions (`emit_metric`, `notify`, etc.) for external systems — wire them up to your own metrics or notification client.

#### Emit a metric when a critical model fails

Combine `post_model(select=...)` with `ctx.status` to react to failures of important models without filtering inside the hook body. The selector uses [dbt's node selection syntax](https://docs.getdbt.com/reference/node-selection/syntax), so any tag, FQN, or graph operator your project uses will work.

```python theme={null}
from prefect_dbt import DbtHookContext, PrefectDbtRunner


def emit_metric(name: str, tags: dict[str, str]) -> None:
    """Replace with your metrics client (Datadog, statsd, OpenTelemetry, ...)."""
    ...


runner = PrefectDbtRunner()


@runner.post_model(select="tag:critical")
def alert_on_critical_failure(ctx: DbtHookContext) -> None:
    if ctx.status != "error":
        return
    emit_metric(
        "dbt.critical_model.failed",
        tags={
            "node_id": ctx.node_id or "unknown",
            "command": ctx.command,
        },
    )


runner.invoke(["build"])
```

#### Send a run summary after dbt completes

Use `on_run_end` to log or post a summary derived from `ctx.run_results` and `ctx.status`. The hook fires for both successful and failed runs.

```python theme={null}
from collections import Counter

from prefect_dbt import DbtHookContext, PrefectDbtRunner


def notify(channel: str, message: str) -> None:
    """Replace with Slack, email, PagerDuty, etc."""
    ...


runner = PrefectDbtRunner()


@runner.on_run_end
def summarize_run(ctx: DbtHookContext) -> None:
    results = ctx.run_results or {}
    statuses = Counter(node.get("status") for node in results.values())
    notify(
        "#data-platform",
        f"dbt {ctx.command} finished with status={ctx.status}; "
        f"per-node: {dict(statuses)}",
    )


runner.invoke(["build"])
```

#### Inspect `run_results` safely

`ctx.run_results` is `None` when dbt could not produce results (for example, a parse failure during compile), and individual entries may be missing fields depending on the dbt version. Use `.get()` with defaults rather than indexing.

```python theme={null}
from prefect_dbt import DbtHookContext, PrefectDbtRunner

runner = PrefectDbtRunner()


@runner.on_run_end
def collect_long_running_models(ctx: DbtHookContext) -> None:
    if ctx.status == "error" and ctx.error is not None:
        # Compile-time errors leave run_results unset.
        print(f"dbt run did not produce results: {ctx.error}")
        return

    slow_models: list[tuple[str, float]] = []
    for unique_id, result in (ctx.run_results or {}).items():
        if not unique_id.startswith("model."):
            continue
        execution_time = float(result.get("execution_time") or 0.0)
        if execution_time >= 30.0:
            slow_models.append((unique_id, execution_time))

    for unique_id, execution_time in sorted(
        slow_models, key=lambda item: item[1], reverse=True
    ):
        print(f"slow model {unique_id}: {execution_time:.1f}s")


runner.invoke(["build"])
```

#### Use `select=` with `post_model` and `on_run_end`

Both `post_model` and `on_run_end` accept dbt selectors. `on_run_start` does not — it always sees the full set of nodes scheduled for the run, and passing `select=` raises `TypeError`.

```python theme={null}
from prefect_dbt import DbtHookContext, PrefectDbtRunner

runner = PrefectDbtRunner()


@runner.on_run_start
def before_run(ctx: DbtHookContext) -> None:
    # Filter inside the hook if you only care about certain nodes.
    mart_nodes = [nid for nid in ctx.node_ids if nid.startswith("model.marts.")]
    print(f"about to run {len(mart_nodes)} mart models")


@runner.post_model(select="tag:pii")
def audit_pii_models(ctx: DbtHookContext) -> None:
    print(f"finished PII model {ctx.node_id} with status={ctx.status}")


@runner.on_run_end(select="tag:marts")
def after_marts(ctx: DbtHookContext) -> None:
    # Fires only when at least one mart node participated in the run.
    print(f"marts touched in command {ctx.command!r}; status={ctx.status}")


runner.invoke(["build"])
```

<Note>
  The selector is resolved against the project manifest once per invocation. If a selector does not match any node in the resolved manifest (or fails to resolve), the hook never fires and no error is raised. If a hook isn't running, check your selector with `dbt ls --select <selector>`.
</Note>

## See also

* [SDK reference](/integrations/prefect-dbt/api-ref/prefect_dbt-core-runner) — full `PrefectDbtRunner` API.
