# PrefectDbtOrchestrator

> Per-node and per-wave dbt orchestration with retries, caching, and asset lineage

<Warning>
  **Beta.** The `PrefectDbtOrchestrator`, `ExecutionMode`, `TestStrategy`, `CacheConfig`, and `BuildPlan` symbols currently live in a private module (`prefect_dbt.core._orchestrator`) and will move to public exports in a future release. The API is stable enough for production use, but import paths may change — watch the [release notes](/v3/release-notes/integrations/prefect-dbt) for the public-export announcement.
</Warning>

## Overview

The `PrefectDbtOrchestrator` gives Prefect full control over dbt DAG execution. Instead of running `dbt build` as a single batch operation, the orchestrator parses your manifest, computes execution waves, and runs each node or wave as a Prefect task — enabling per-node retries, Prefect-native concurrency control, automatic downstream skipping on failure, and fine-grained observability.
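Conceptually, a wave is a set of nodes whose dependencies all sit in earlier waves. The sketch below illustrates that idea with a level-by-level topological sort (Kahn's algorithm) over a plain dependency dict. It is not the orchestrator's implementation: the `compute_waves` name and the input shape are hypothetical, and it assumes every referenced dependency is also a key in the dict.

```python
from collections import defaultdict


def compute_waves(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group nodes into waves so every node's dependencies live in earlier waves.

    `deps` maps each node to the set of nodes it depends on (hypothetical shape).
    """
    # Count unresolved dependencies per node and index each node's children.
    remaining = {node: len(parents) for node, parents in deps.items()}
    children: dict[str, list[str]] = defaultdict(list)
    for node, parents in deps.items():
        for parent in parents:
            children[parent].append(node)

    waves: list[list[str]] = []
    ready = sorted(node for node, count in remaining.items() if count == 0)
    while ready:
        waves.append(ready)
        next_ready = []
        for node in ready:
            for child in children[node]:
                remaining[child] -= 1
                if remaining[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Any node never released into a wave is part of a cycle.
    if sum(len(wave) for wave in waves) != len(deps):
        raise ValueError("dependency graph contains a cycle")
    return waves


deps = {
    "stg_users": set(),
    "stg_orders": set(),
    "order_summary": {"stg_users", "stg_orders"},
    "daily_report": {"order_summary"},
}
print(compute_waves(deps))
# [['stg_orders', 'stg_users'], ['order_summary'], ['daily_report']]
```

Nodes in the same inner list have no dependencies on each other, which is what lets them run in parallel.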

If you instead want dbt to drive execution with Prefect observing from the outside, see [`PrefectDbtRunner`](/integrations/prefect-dbt/runner).

## Install

`PrefectDbtOrchestrator` first shipped in `prefect-dbt` 0.7.17, but this guide exercises APIs that landed later. Pin `prefect-dbt>=0.7.23` to match every example on this page:

```bash theme={null}
pip install "prefect[dbt]" "prefect-dbt>=0.7.23"
```

Feature floors, for reference:

| Feature                                                                        | Minimum version |
| ------------------------------------------------------------------------------ | --------------- |
| `PrefectDbtOrchestrator` class, `ExecutionMode`, `TestStrategy`, `run_build()` | 0.7.17          |
| `DbtCloudExecutor`                                                             | 0.7.19          |
| `CacheConfig`, `plan()` / `BuildPlan`                                          | 0.7.20          |
| `raise_on_failure` constructor argument                                        | 0.7.23          |

See the [integration overview](/integrations/prefect-dbt) for adapter-specific extras.

## Quick start

```python theme={null}
from pathlib import Path
from prefect import flow
from prefect_dbt import PrefectDbtSettings
from prefect_dbt.core._orchestrator import PrefectDbtOrchestrator, ExecutionMode


@flow
def run_dbt_build():
    settings = PrefectDbtSettings(
        project_dir=Path("./my_dbt_project"),
        profiles_dir=Path.home() / ".dbt",
    )

    orchestrator = PrefectDbtOrchestrator(
        settings=settings,
        execution_mode=ExecutionMode.PER_NODE,
        concurrency=4,           # max 4 nodes running in parallel
        threads=2,               # dbt threads within each node
        retries=3,               # each node retries up to 3 times
        retry_delay_seconds=60,
    )

    # manifest.json is auto-generated if not found
    results = orchestrator.run_build(select="tag:daily")

    by_status = {}
    for r in results.values():
        by_status[r["status"]] = by_status.get(r["status"], 0) + 1
    print(f"Results: {by_status}")
```

<Info>
  `run_build()` automatically runs `dbt parse` to generate `manifest.json` if none is found in the project's target directory. Pass `manifest_path` directly to the orchestrator constructor to use a pre-built manifest (e.g. from a CI artifact).
</Info>

<Info>
  When the default `DbtCoreExecutor` has to generate a manifest itself (no `manifest.json` at the target path and no `manifest_path` passed), it first runs `dbt deps` to install packages declared in `packages.yml`, then `dbt parse`. If you pass a pre-built `manifest_path` or supply a custom executor, this does not happen — make sure your image already has the required dbt packages installed. To keep the default executor but skip the automatic `dbt deps`:

  ```python theme={null}
  from prefect_dbt.core._executor import DbtCoreExecutor

  orchestrator = PrefectDbtOrchestrator(
      settings=settings,
      executor=DbtCoreExecutor(settings, run_deps=False),
  )
  ```

  Always pass `settings=` alongside a custom `executor=` — the orchestrator itself still uses `settings.project_dir` and `settings.profiles_dir` for selector resolution, source freshness, and artifact paths. Omitting it makes the orchestrator fall back to a default `PrefectDbtSettings()`, which only works when the current working directory already contains a valid dbt project and profile.
</Info>

By default, `run_build()` raises `DbtBuildFailed` if any node finishes with `"error"` status, so a failing dbt build fails the enclosing flow run. Pass `raise_on_failure=False` to the orchestrator to inspect the results dict on partial failures instead.

## Execution modes

### `PER_WAVE` (default)

Each wave of independent nodes runs as a single `dbt build` invocation, which keeps overhead lowest. When dbt reports per-node results for a wave, every node in that wave appears in the results dict with its real status; only the nodes that failed are marked `"error"`. However, once any node in a wave fails, all downstream waves are skipped as a group.

```python theme={null}
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_WAVE,
)
```

### `PER_NODE`

Each node becomes a separate Prefect task in its own subprocess. Failed nodes retry independently; other nodes in the same wave continue; downstream dependents of failed nodes are skipped.

```python theme={null}
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    retries=2,
    concurrency=4,
)
```
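The downstream-skipping rule can be pictured as a graph walk: every transitive dependent of a failed node is skipped. Below is a minimal sketch over a plain dependency dict that produces entries shaped like the `"skipped"` results in the Result format section. The `propagate_skips` helper is hypothetical, and the orchestrator's own bookkeeping (for example, exactly which ancestors it lists in `failed_upstream`) may differ.

```python
from collections import defaultdict, deque


def propagate_skips(deps: dict[str, set[str]], failed: set[str]) -> dict[str, dict]:
    """Return a "skipped" entry for every transitive dependent of a failed node."""
    # Invert the dependency map so we can walk from parents to children.
    children: dict[str, set[str]] = defaultdict(set)
    for node, parents in deps.items():
        for parent in parents:
            children[parent].add(node)

    # Breadth-first walk from each failed node, recording which failure reached whom.
    culprits: dict[str, set[str]] = defaultdict(set)
    for root in failed:
        queue = deque([root])
        seen = {root}
        while queue:
            current = queue.popleft()
            for child in children[current]:
                if child not in seen:
                    seen.add(child)
                    culprits[child].add(root)
                    queue.append(child)

    return {
        node: {
            "status": "skipped",
            "reason": "upstream failure",
            "failed_upstream": sorted(roots),
        }
        for node, roots in culprits.items()
        if node not in failed
    }


deps = {
    "stg_users": set(),
    "stg_orders": set(),
    "order_summary": {"stg_users", "stg_orders"},
}
print(propagate_skips(deps, {"stg_orders"}))
# {'order_summary': {'status': 'skipped', 'reason': 'upstream failure', 'failed_upstream': ['stg_orders']}}
```

Note that `stg_users` is untouched: a failure only skips nodes that actually depend on it.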

| Mode       | Overhead                            | Best for                                      |
| ---------- | ----------------------------------- | --------------------------------------------- |
| `PER_WAVE` | Lower (one dbt invocation per wave) | CI/CD, dev iterations, stable DAGs            |
| `PER_NODE` | Higher (one invocation per node)    | Production, flaky nodes, fine-grained control |

<Warning>
  `retries` and `cache` are only supported in `PER_NODE` mode. Passing either with `PER_WAVE` raises `ValueError`.
</Warning>

## Result format

`run_build()` returns a dict mapping `unique_id` to a result. Each result has a `"status"` of `"success"`, `"cached"`, `"error"`, or `"skipped"`:

```python theme={null}
{
    "model.analytics.stg_users": {
        "status": "success",
        "timing": {"started_at": "...", "completed_at": "...", "duration_seconds": 5.2},
        "invocation": {"command": "build", "args": ["--select", "..."]},
    },
    "model.analytics.stg_orders": {
        "status": "error",
        "error": {"message": "relation does not exist", "type": "RuntimeError"},
    },
    "model.analytics.dim_products": {
        "status": "cached",
    },
    "model.analytics.order_summary": {
        "status": "skipped",
        "reason": "upstream failure",
        "failed_upstream": ["model.analytics.stg_orders"],
    },
}
```
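When the orchestrator is constructed with `raise_on_failure=False`, this dict is what you inspect after a partial failure. A minimal sketch of a summary helper follows; `summarize_results` is a hypothetical name, and it relies only on the `"status"`, `"error"`, and `"failed_upstream"` keys shown in the example above.

```python
from collections import Counter


def summarize_results(results: dict[str, dict]) -> dict:
    """Count statuses and collect failure details from a run_build() results dict."""
    counts = Counter(result["status"] for result in results.values())
    # Error messages for nodes that failed outright.
    errors = {
        unique_id: (result.get("error") or {}).get("message", "")
        for unique_id, result in results.items()
        if result["status"] == "error"
    }
    # Which failures caused each skip.
    skipped = {
        unique_id: result.get("failed_upstream", [])
        for unique_id, result in results.items()
        if result["status"] == "skipped"
    }
    return {"counts": dict(counts), "errors": errors, "skipped": skipped}
```

Inside a flow you might call `summarize_results(orchestrator.run_build(select="tag:daily"))` and then decide for yourself whether the collected errors should fail the run.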

## Test strategies

The `test_strategy` parameter controls when dbt tests run relative to models. The default is `IMMEDIATE`, matching `dbt build` semantics.

```python theme={null}
from prefect_dbt.core._orchestrator import PrefectDbtOrchestrator, TestStrategy

orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    test_strategy=TestStrategy.IMMEDIATE,  # default
)
```

| Strategy              | Behavior                                                                                                                            |
| --------------------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| `IMMEDIATE` (default) | Tests run interleaved with models — each test runs in the wave after all its parent models complete. Matches `dbt build` semantics. |
| `DEFERRED`            | All model waves execute first, then all tests run together in a final wave.                                                         |
| `SKIP`                | Tests are excluded from execution entirely.                                                                                         |
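To make the table concrete, the sketch below assigns tests to waves under each strategy, given the wave index of every model. It illustrates the rules as described above rather than the orchestrator's code: the `place_tests` helper and its inputs are hypothetical, the local `Strategy` enum only mirrors the member names of `TestStrategy`, and it places tests without re-planning any models that might need to wait behind them.

```python
from enum import Enum


class Strategy(Enum):
    # Local stand-in mirroring TestStrategy's member names (hypothetical).
    IMMEDIATE = "immediate"
    DEFERRED = "deferred"
    SKIP = "skip"


def place_tests(
    model_waves: dict[str, int],
    test_parents: dict[str, set[str]],
    strategy: Strategy,
) -> dict[str, int]:
    """Map each test to the wave it would run in; SKIP drops every test."""
    if strategy is Strategy.SKIP:
        return {}
    if strategy is Strategy.DEFERRED:
        # One extra wave after the last model wave holds every test.
        final_wave = max(model_waves.values(), default=-1) + 1
        return {test: final_wave for test in test_parents}
    # IMMEDIATE: the wave right after the latest-finishing parent model.
    return {
        test: max(model_waves[parent] for parent in parents) + 1
        for test, parents in test_parents.items()
    }


model_waves = {"stg_users": 0, "order_summary": 1}
test_parents = {
    "not_null_stg_users_id": {"stg_users"},
    "unique_order_summary_id": {"order_summary"},
}
print(place_tests(model_waves, test_parents, Strategy.IMMEDIATE))
# {'not_null_stg_users_id': 1, 'unique_order_summary_id': 2}
```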

<Tip>
  Use `DEFERRED` when you want every model built before any test runs, or to parallelize model execution more aggressively, since no test waves sit between model waves. Use `SKIP` for fast iteration when test overhead isn't worth paying.
</Tip>

## Caching

Enable cross-run caching to skip nodes whose SQL, config, and upstream dependencies haven't changed since the last successful run. Only available in `PER_NODE` mode.

Pass a `CacheConfig` instance to consolidate all caching options:

```python theme={null}
from datetime import timedelta
from prefect_dbt.core._orchestrator import (
    PrefectDbtOrchestrator,
    CacheConfig,
    ExecutionMode,
)

orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        expiration=timedelta(hours=24),  # invalidate after 24h
    ),
)
```

The cache key for each node is derived from:

* the hash of the node's source SQL (or CSV for seeds), **not** the compiled SQL, so changes to `--vars` or environment variables that only alter the compiled output will not invalidate the key on their own
* the node's dbt config
* the node's relation name
* the `full_refresh` flag
* the macros the node depends on (each macro's content is hashed and folded in)
* upstream nodes' cache keys (changes cascade downstream)
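The cascade is the key property: because each key folds in its parents' keys, editing one model changes the key of everything downstream. The sketch below shows that composition with SHA-256 over canonical JSON. The field names, hash function, and serialization are assumptions for illustration, not the orchestrator's actual key format, and `config` must be JSON-serializable.

```python
import hashlib
import json


def node_cache_key(
    source: str,
    config: dict,
    relation_name: str,
    full_refresh: bool,
    macro_sources: dict[str, str],
    upstream_keys: list[str],
) -> str:
    """Fold the inputs listed above into one hex digest (illustrative only)."""
    payload = {
        # Raw SQL (or seed CSV), not compiled SQL.
        "source_hash": hashlib.sha256(source.encode()).hexdigest(),
        "config": config,
        "relation_name": relation_name,
        "full_refresh": full_refresh,
        # Hash each macro's content, keyed by macro name.
        "macros": {
            name: hashlib.sha256(body.encode()).hexdigest()
            for name, body in macro_sources.items()
        },
        # Sorting makes the key independent of parent ordering.
        "upstream": sorted(upstream_keys),
    }
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()


# Editing the parent's SQL changes the parent key, which changes the child key.
parent = node_cache_key("select * from raw.users", {"materialized": "view"}, "analytics.stg_users", False, {}, [])
child = node_cache_key("select * from {{ ref('stg_users') }}", {"materialized": "table"}, "analytics.users", False, {}, [parent])
```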

When a cached result is valid, the node's task returns immediately without invoking dbt. Cached results appear in the result dict with `"status": "cached"` (see [Result format](#result-format) above).

### `CacheConfig` fields

| Field                             | Type                  | Default                  | Description                                                                                                                             |
| --------------------------------- | --------------------- | ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------- |
| `expiration`                      | `timedelta \| None`   | `None`                   | Global TTL for cached results. `None` means no expiration.                                                                              |
| `result_storage`                  | `Path \| str \| None` | `None`                   | Storage for cached results. A `Path` for local storage, a block slug (e.g. `"s3-bucket/my-results"`) for remote, or `None` for default. |
| `key_storage`                     | `Path \| str \| None` | `None`                   | Storage for cache keys. Same format as `result_storage`.                                                                                |
| `use_source_freshness_expiration` | `bool`                | `False`                  | Derive each node's TTL from its upstream source freshness thresholds, overriding `expiration` per-node.                                 |
| `exclude_materializations`        | `frozenset[str]`      | `{"incremental"}`        | Materializations excluded from caching.                                                                                                 |
| `exclude_resource_types`          | `frozenset[NodeType]` | `{Test, Snapshot, Unit}` | Resource types excluded from caching.                                                                                                   |

### Custom cache storage

Block slugs follow the format `"block-type/block-name"` (for example `"s3-bucket/my-results"`). The block must be saved in Prefect before use. See [How to persist workflow results](/v3/develop/results) for a walkthrough on creating and saving storage blocks.

```python theme={null}
from pathlib import Path

from prefect_dbt.core._orchestrator import (
    CacheConfig,
    ExecutionMode,
    PrefectDbtOrchestrator,
)

# Option 1: local path
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        result_storage=Path("/tmp/dbt-results"),
        key_storage=Path("/tmp/dbt-cache-keys"),
    ),
)

# Option 2: block slug (block must be saved in Prefect)
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        result_storage="s3-bucket/my-results-block",
        key_storage="s3-bucket/my-cache-keys-block",
    ),
)
```

### Cache expiration from source freshness

When `use_source_freshness_expiration=True`, each node's cache TTL is derived from the freshness thresholds of its upstream sources — a node backed by an hourly source won't be cached longer than the source's warn threshold.

```python theme={null}
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        use_source_freshness_expiration=True,  # overrides expiration per node
    ),
)
```

<Info>
  `use_source_freshness_expiration` overrides the global `expiration` on a per-node basis.
</Info>
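One way to picture the per-node TTL: convert each upstream source's `warn_after` threshold (dbt expresses these as a `count` plus a `period` of `minute`, `hour`, or `day`) into a duration and take the shortest. This is an assumption-labeled sketch, not the orchestrator's code; in particular, falling back to the global `expiration` when no upstream source declares a threshold is a guess, not documented behavior.

```python
from datetime import timedelta
from typing import Optional

# dbt freshness periods mapped onto timedelta keyword arguments.
_PERIODS = {"minute": "minutes", "hour": "hours", "day": "days"}


def freshness_ttl(
    upstream_warn_after: list[dict],
    fallback: Optional[timedelta],
) -> Optional[timedelta]:
    """Shortest upstream warn_after threshold, or `fallback` if none are set (assumed)."""
    ttls = [
        timedelta(**{_PERIODS[rule["period"]]: rule["count"]})
        for rule in upstream_warn_after
        if rule and rule.get("count") is not None and rule.get("period")
    ]
    return min(ttls) if ttls else fallback


# A node reading an hourly source and a 30-minute source gets a 30-minute TTL.
ttl = freshness_ttl(
    [{"count": 1, "period": "hour"}, {"count": 30, "period": "minute"}],
    fallback=timedelta(hours=24),
)
```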

## Dry-run with `plan()`

Preview what `run_build()` would execute without actually running dbt models. The `plan()` method performs manifest parsing, selector resolution, and wave computation, but only invokes `dbt ls` (for selector resolution) and `dbt source freshness` (when enabled) — no models are built.

<Info>
  `plan()` is not free on the first call. With the default `DbtCoreExecutor`, if no `manifest.json` exists at the target path and no `manifest_path` was passed, the executor will run `dbt deps` followed by `dbt parse` before wave computation. To make `plan()` cheap, point `manifest_path` at a pre-built manifest (e.g. a CI artifact).
</Info>

```python theme={null}
from prefect_dbt.core._orchestrator import (
    PrefectDbtOrchestrator,
    ExecutionMode,
    CacheConfig,
)

orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(),
)

plan = orchestrator.plan(select="tag:daily")
print(plan)
# BuildPlan: 12 node(s) in 4 wave(s)  |  max parallelism = 5
#
#   Wave 0 (3 node(s)):
#     - model.analytics.stg_users [model, view] (cache: hit)
#     - model.analytics.stg_orders [model, view] (cache: miss)
#     ...
#
#   Cache: 7 hit(s), 3 miss(es), 2 excluded
```

The returned `BuildPlan` includes:

| Field                   | Description                                                                                                                                                                                                                     |
| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `waves`                 | Execution waves in topological order, each containing the nodes that can run in parallel.                                                                                                                                       |
| `node_count`            | Total nodes across all waves.                                                                                                                                                                                                   |
| `cache_predictions`     | Per-node `"hit"` / `"miss"` / `"excluded"` predictions (only when caching is configured).                                                                                                                                       |
| `skipped_nodes`         | Nodes dropped by source-freshness checks, with reason. Nodes that don't match the `select` / `exclude` filters are pruned from the graph and are **not** surfaced here.                                                         |
| `estimated_parallelism` | Width of the largest wave. Accurate under strict wave-barrier execution; in `PER_NODE` mode, eager scheduling can let nodes from wave N+1 start before all of wave N has finished, so peak concurrency may briefly exceed this. |

`plan()` accepts the same parameters as `run_build()`: `select`, `exclude`, `full_refresh`, `only_fresh_sources`, `target`, and `extra_cli_args`.
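Because `plan()` builds no models, it works as a cheap pre-flight check in CI. The hypothetical helper below tallies predictions and fails when too many nodes would miss the cache. It assumes `cache_predictions` behaves like a mapping from `unique_id` to `"hit"`, `"miss"`, or `"excluded"`, which matches the table above but is not a documented type, and it only applies when caching is configured.

```python
from collections import Counter
from collections.abc import Mapping


def check_cache_predictions(
    predictions: Mapping[str, str],
    max_misses: int,
) -> Counter:
    """Tally hit/miss/excluded predictions and raise if misses exceed the budget."""
    tally = Counter(predictions.values())
    if tally["miss"] > max_misses:
        raise RuntimeError(
            f"{tally['miss']} node(s) would rebuild; budget is {max_misses}"
        )
    return tally
```

In a flow, this might look like `check_cache_predictions(orchestrator.plan(select="tag:daily").cache_predictions, max_misses=10)`.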

## Artifacts and asset tracking

### Summary artifact

When invoked from inside a flow run, `run_build()` creates a Prefect markdown artifact summarizing results — nodes executed, statuses, and timings. You can optionally write a dbt-compatible `run_results.json`:

```python theme={null}
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    create_summary_artifact=True,   # default — set False to disable
    write_run_results=True,         # write run_results.json to target_path
)
```

<Info>
  `include_compiled_code=True` does **not** affect the summary artifact — the artifact is rendered from the results dict alone. The flag currently only adds compiled SQL to the asset descriptions attached to each `MaterializingTask` in `PER_NODE` mode.
</Info>

Output locations:

* The summary artifact is created with key `dbt-orchestrator-summary`. Creation requires an active flow run context — if you call `run_build()` from a plain Python script (supported in `PER_WAVE` mode), the summary step is skipped even with `create_summary_artifact=True`.
* `run_results.json` is written inside `settings.target_path` (defaults to `<project_dir>/target/run_results.json`, but moves with `target_path` or a custom `manifest_path`).
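Because the file is meant to be dbt-compatible, existing `run_results.json` tooling should be able to read it. A minimal reader, assuming the file follows dbt's schema with a top-level `results` list whose entries carry `unique_id` and `status`; `load_run_statuses` is a hypothetical name.

```python
import json
from pathlib import Path


def load_run_statuses(target_path: Path) -> dict[str, str]:
    """Map unique_id -> status from run_results.json under a dbt target directory."""
    run_results = json.loads((target_path / "run_results.json").read_text())
    return {entry["unique_id"]: entry["status"] for entry in run_results["results"]}
```

Pass the same directory the orchestrator writes to, for example `load_run_statuses(Path("./my_dbt_project/target"))` when `target_path` is left at its default.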

To access the artifact programmatically, filter by key via `ArtifactFilter` — `read_artifacts` ignores unknown kwargs, so a bare `key=...` argument silently returns every artifact in the workspace:

```python theme={null}
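import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import ArtifactFilter, ArtifactFilterKey


async def read_summary_artifacts():
    # Filter server-side on the fixed summary artifact key.
    async with get_client() as client:
        return await client.read_artifacts(
            artifact_filter=ArtifactFilter(
                key=ArtifactFilterKey(any_=["dbt-orchestrator-summary"])
            ),
        )


artifacts = asyncio.run(read_summary_artifacts())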
```

<Info>
  The artifact key `dbt-orchestrator-summary` is fixed and not currently configurable.
</Info>

### Prefect asset lineage

In `PER_NODE` mode, each model, seed, and snapshot creates a Prefect [asset](/v3/concepts/assets) and tracks upstream lineage automatically — integrating dbt's data lineage into Prefect's asset catalog.

Assets are named using the relation name (for example `snowflake://my_db/analytics/stg_users`). Upstream assets, including dbt sources, are tracked as asset dependencies.

To disable asset creation:

```python theme={null}
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    disable_assets=True,
)
```

## `run_build()` parameters

### `target`

Override the dbt target at run time without changing `profiles.yml`:

```python theme={null}
results = orchestrator.run_build(
    select="tag:daily",
    target="prod",
)
```

### `extra_cli_args`

Pass additional CLI flags directly to dbt invocations. Useful for flags not exposed as first-class orchestrator parameters:

```python theme={null}
results = orchestrator.run_build(
    select="tag:daily",
    extra_cli_args=["--vars", '{"run_date": "2024-01-15"}', "--no-partial-parse"],
)
```
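Hand-writing JSON inside a Python string is easy to get wrong. dbt parses `--vars` as a YAML dictionary, and JSON is valid YAML, so one option is to build the value with `json.dumps`. The `vars_args` helper below is a hypothetical convenience, not part of `prefect-dbt`.

```python
import json
from datetime import date


def vars_args(**dbt_vars) -> list[str]:
    """Build a ["--vars", "<json>"] pair for extra_cli_args."""
    # default=str turns dates (and other non-JSON values) into strings.
    return ["--vars", json.dumps(dbt_vars, default=str, sort_keys=True)]


args = vars_args(run_date=date(2024, 1, 15)) + ["--no-partial-parse"]
# ['--vars', '{"run_date": "2024-01-15"}', '--no-partial-parse']
```

The result can be passed straight through as `orchestrator.run_build(select="tag:daily", extra_cli_args=args)`.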

<Warning>
  Some flags are **blocked** because they conflict with the orchestrator's internal logic; passing any of them raises `ValueError`:

  `--select`, `--models`, `--exclude`, `--selector`, `--indirect-selection`, `--project-dir`, `--target-path`, `--profiles-dir`, `--log-level`.

  Others map to **first-class parameters**; set the corresponding orchestrator option instead:

  | CLI flag         | Use instead                                    |
  | ---------------- | ---------------------------------------------- |
  | `--full-refresh` | `run_build(full_refresh=True)`                 |
  | `--target`       | `run_build(target="...")`                      |
  | `--threads`      | `PrefectDbtOrchestrator(threads=N)`            |
  | `--defer`        | `PrefectDbtOrchestrator(defer=True)`           |
  | `--state`        | `PrefectDbtOrchestrator(state_path=Path(...))` |
  | `--defer-state`  | `PrefectDbtOrchestrator(defer_state_path=...)` |
  | `--favor-state`  | `PrefectDbtOrchestrator(favor_state=True)`     |

  Caveat flags (`--resource-type`, `--exclude-resource-type`, `--fail-fast`) are accepted but log a warning because they can silently interact with orchestrator-managed test scheduling.
</Warning>

## State-based execution (CI/CD)

Use dbt's state flags to run only modified models and defer to production for everything else:

```python theme={null}
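from pathlib import Path
from prefect import flow
from prefect_dbt import PrefectDbtSettings
from prefect_dbt.core._orchestrator import PrefectDbtOrchestrator, ExecutionMode


@flow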
def run_ci_incremental():
    orchestrator = PrefectDbtOrchestrator(
        settings=PrefectDbtSettings(project_dir=Path("./analytics")),
        state_path=Path("./prod_artifacts"),
        defer=True,
        execution_mode=ExecutionMode.PER_WAVE,
    )
    return orchestrator.run_build(select="state:modified+")
```

The orchestrator also accepts `defer_state_path` and `favor_state`, which correspond to dbt's `--defer-state` and `--favor-state` flags, for more advanced deferral scenarios.
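dbt's state comparison reads a `manifest.json` from the state directory, so if the production artifacts were never downloaded, the failure only appears once dbt resolves the `state:` selector. A small pre-flight check such as the hypothetical `require_state_manifest` below fails fast instead; it only checks that the file exists and does not validate its contents.

```python
from pathlib import Path


def require_state_manifest(state_dir: Path) -> Path:
    """Return state_dir if it holds manifest.json; raise FileNotFoundError otherwise."""
    manifest = state_dir / "manifest.json"
    if not manifest.is_file():
        raise FileNotFoundError(f"expected a production manifest at {manifest}")
    return state_dir
```

In the flow above, this would read `state_path=require_state_manifest(Path("./prod_artifacts"))`.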

## dbt Cloud execution

The `DbtCloudExecutor` lets you run dbt nodes via **dbt Cloud ephemeral jobs** instead of local dbt-core. The orchestrator creates a temporary job in dbt Cloud for each node (or wave), polls until completion, fetches results, and deletes the job — giving you all orchestrator features (retries, caching, test strategies, assets) backed by dbt Cloud's managed infrastructure.

### Credentials setup

Create or load a `DbtCloudCredentials` block with your dbt Cloud API key and account ID:

```python theme={null}
from prefect_dbt.cloud import DbtCloudCredentials

# Option 1: create inline
credentials = DbtCloudCredentials(
    api_key="dbc_your_api_key_here",
    account_id=12345,
)

# Option 2: load a saved block
credentials = DbtCloudCredentials.load("my-dbt-cloud-creds")
```

### Quick start

```python theme={null}
from pathlib import Path
from prefect import flow
from prefect_dbt import PrefectDbtSettings
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudExecutor
from prefect_dbt.core._orchestrator import PrefectDbtOrchestrator, ExecutionMode


@flow
def run_dbt_cloud_build():
    credentials = DbtCloudCredentials.load("my-dbt-cloud-creds")

    executor = DbtCloudExecutor(
        credentials=credentials,
        project_id=123456,
        environment_id=654321,
        defer_to_job_id=789012,  # fetch manifest from this job's last run
    )

    orchestrator = PrefectDbtOrchestrator(
        executor=executor,
        settings=PrefectDbtSettings(
            project_dir=Path("./my_dbt_project"),
            profiles_dir=Path.home() / ".dbt",
        ),
        execution_mode=ExecutionMode.PER_NODE,
        concurrency=4,
        retries=2,
        retry_delay_seconds=30,
    )

    return orchestrator.run_build(select="tag:daily")
```

<Info>
  Even with a `DbtCloudExecutor`, the orchestrator still reads `settings.project_dir` and `settings.profiles_dir` for selector resolution, source-freshness handling, and artifact paths. Pass an explicit `settings=PrefectDbtSettings(...)` pointing at a local checkout of the same project; omitting it only works when the flow already runs inside a valid local dbt project with a usable default profile. The orchestrator's `state_path`, `defer`, `defer_state_path`, and `favor_state` parameters, however, are specific to `DbtCoreExecutor` and have no effect when a custom executor is used.
</Info>

### Finding your dbt Cloud IDs

* **`account_id`** — Settings → Account Settings → Account ID (also visible in your dbt Cloud URL: `cloud.getdbt.com/deploy/{account_id}/...`).
* **`project_id`** — Deploy → Jobs → select any job → the URL contains `/projects/{project_id}/`.
* **`environment_id`** — Deploy → Environments → select the target environment → the URL contains `/environments/{environment_id}`.
* **`defer_to_job_id`** — Deploy → Jobs → select your production job → the URL ends with `/jobs/{job_id}`.
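If you copy IDs out of the browser often, the URL patterns above can be parsed mechanically. This sketch assumes URLs contain the `/deploy/{account_id}`, `/projects/{project_id}`, `/environments/{environment_id}`, and `/jobs/{job_id}` segments described above; real dbt Cloud URLs may vary by region or UI version, and `parse_dbt_cloud_ids` is a hypothetical helper.

```python
import re

# URL path segments described above, each followed by a numeric ID.
_PATTERNS = {
    "account_id": r"/deploy/(\d+)",
    "project_id": r"/projects/(\d+)",
    "environment_id": r"/environments/(\d+)",
    "job_id": r"/jobs/(\d+)",
}


def parse_dbt_cloud_ids(url: str) -> dict[str, int]:
    """Extract whichever of the known numeric IDs appear in a dbt Cloud URL."""
    ids = {}
    for name, pattern in _PATTERNS.items():
        match = re.search(pattern, url)
        if match:
            ids[name] = int(match.group(1))
    return ids
```

The `job_id` parsed from your production job's URL is the value to pass as `defer_to_job_id`.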

### Parameter reference

| Parameter                | Type                  | Default                  | Description                                                                    |
| ------------------------ | --------------------- | ------------------------ | ------------------------------------------------------------------------------ |
| `credentials`            | `DbtCloudCredentials` | *required*               | Block providing `api_key` and `account_id` for dbt Cloud API authentication.   |
| `project_id`             | `int`                 | *required*               | Numeric dbt Cloud project ID.                                                  |
| `environment_id`         | `int`                 | *required*               | Numeric dbt Cloud environment ID where ephemeral jobs run.                     |
| `job_name_prefix`        | `str`                 | `"prefect-orchestrator"` | Prefix for ephemeral job names shown in the dbt Cloud UI.                      |
| `timeout_seconds`        | `int`                 | `900`                    | Max seconds to poll a dbt Cloud run before raising `TimeoutError`.             |
| `poll_frequency_seconds` | `int`                 | `10`                     | Seconds between status polls.                                                  |
| `threads`                | `int \| None`         | `None`                   | If set, appends `--threads N` to all dbt commands.                             |
| `defer_to_job_id`        | `int \| None`         | `None`                   | Fetch manifest from this job's last successful run instead of compiling fresh. |

### Manifest resolution

The executor needs a `manifest.json` to parse the dbt DAG. How it gets one depends on `defer_to_job_id`:

* **`defer_to_job_id` is set** — fetches `manifest.json` from the most recent successful run of that job via `GET /jobs/{job_id}/artifacts/manifest.json`. This is the recommended approach for production — point it at your production job to reuse its compiled manifest.
* **`defer_to_job_id` is `None`** — creates an ephemeral `dbt compile` job, runs it, fetches the resulting manifest, and deletes the job. Useful for CI or when you need a fresh manifest, but adds compilation time.

<Tip>
  For the fastest builds, set `defer_to_job_id` to your production job. This avoids the ephemeral compile step and reuses the manifest your production environment already generates.
</Tip>

## Performance notes

* **Connection pooling.** In `PER_NODE` mode, dbt adapter connections are automatically pooled and reused across sequential node invocations within each worker process, reducing connection overhead.
* **Eager DAG scheduling.** In `PER_NODE` mode, nodes are submitted to the task runner as soon as all their individual dependencies complete, rather than waiting for wave boundaries. This maximizes concurrency without any configuration changes.
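Why eager scheduling helps is easiest to see with made-up durations. The sketch compares two idealized schedules with unlimited concurrency: a wave barrier, where each wave lasts as long as its slowest node, and eager scheduling, where a node starts the moment its own dependencies finish (the longest weighted path). The `waves`, `deps`, and `durations` inputs are hypothetical, and real runs also pay subprocess, connection, and concurrency-limit costs.

```python
def wave_barrier_makespan(waves: list[list[str]], durations: dict[str, float]) -> float:
    """Each wave waits for its slowest node before the next wave starts."""
    return sum(max(durations[node] for node in wave) for wave in waves)


def eager_makespan(deps: dict[str, set[str]], durations: dict[str, float]) -> float:
    """Each node starts as soon as all of its own dependencies have finished."""
    finish: dict[str, float] = {}

    def finish_time(node: str) -> float:
        # Memoized earliest finish: latest parent finish plus this node's duration.
        if node not in finish:
            start = max((finish_time(parent) for parent in deps[node]), default=0.0)
            finish[node] = start + durations[node]
        return finish[node]

    return max(finish_time(node) for node in deps)


deps = {"a": set(), "b": set(), "c": {"a"}, "d": {"b"}}
durations = {"a": 10.0, "b": 1.0, "c": 1.0, "d": 10.0}
waves = [["a", "b"], ["c", "d"]]
print(wave_barrier_makespan(waves, durations))  # 20.0
print(eager_makespan(deps, durations))          # 11.0
```

Under the barrier, `d` cannot start until the slow `a` finishes even though it only depends on the fast `b`; eager scheduling removes that wait.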

## Comparison with `PrefectDbtRunner`

| Feature                 | `PrefectDbtRunner`                | `PrefectDbtOrchestrator`                      |
| ----------------------- | --------------------------------- | --------------------------------------------- |
| **Execution model**     | Reactive — dbt controls execution | Proactive — Prefect controls execution        |
| **Per-node retries**    | No                                | Yes (`PER_NODE` mode)                         |
| **Concurrency control** | dbt's internal threading          | Prefect-native (process pool or named limits) |
| **Failure isolation**   | Entire build fails                | Per-node or per-wave with downstream skipping |
| **Test strategies**     | No                                | Yes (`IMMEDIATE` / `DEFERRED` / `SKIP`)       |
| **Cross-run caching**   | No                                | Yes (`PER_NODE` mode)                         |
| **Asset lineage**       | Yes                               | Yes (`PER_NODE` mode)                         |
| **Source freshness**    | No                                | Yes                                           |
| **Dry-run planning**    | No                                | Yes (`plan()` method)                         |
| **dbt Cloud execution** | No                                | Yes (`DbtCloudExecutor`)                      |
| **Best for**            | Simple builds, maximum speed      | Production reliability, CI/CD, complex DAGs   |

Both coexist in the same project — `PrefectDbtRunner` is unchanged. See [`PrefectDbtRunner`](/integrations/prefect-dbt/runner) for its dedicated guide.
