

Beta. The PrefectDbtOrchestrator, ExecutionMode, TestStrategy, CacheConfig, and BuildPlan symbols currently live in a private module (prefect_dbt.core._orchestrator) and will move to public exports in a future release. The API is stable enough for production use, but import paths may change — watch the release notes for the public-export announcement.

Overview

The PrefectDbtOrchestrator gives Prefect full control over dbt DAG execution. Instead of running dbt build as a single batch operation, the orchestrator parses your manifest, computes execution waves, and runs each node or wave as a Prefect task — enabling per-node retries, Prefect-native concurrency control, automatic downstream skipping on failure, and fine-grained observability. If you instead want dbt to drive execution with Prefect observing from the outside, see PrefectDbtRunner.

Install

PrefectDbtOrchestrator first shipped in prefect-dbt 0.7.17, but this guide exercises APIs that landed later. Pin prefect-dbt>=0.7.23 to match every example on this page:
pip install "prefect[dbt]" "prefect-dbt>=0.7.23"
Feature floors, for reference:
| Feature | Minimum version |
| --- | --- |
| PrefectDbtOrchestrator class, ExecutionMode, TestStrategy, run_build() | 0.7.17 |
| DbtCloudExecutor | 0.7.19 |
| CacheConfig, plan() / BuildPlan | 0.7.20 |
| raise_on_failure constructor argument | 0.7.23 |
See the integration overview for adapter-specific extras.

Quick start

from pathlib import Path
from prefect import flow
from prefect_dbt import PrefectDbtSettings
from prefect_dbt.core._orchestrator import PrefectDbtOrchestrator, ExecutionMode


@flow
def run_dbt_build():
    settings = PrefectDbtSettings(
        project_dir=Path("./my_dbt_project"),
        profiles_dir=Path.home() / ".dbt",
    )

    orchestrator = PrefectDbtOrchestrator(
        settings=settings,
        execution_mode=ExecutionMode.PER_NODE,
        concurrency=4,           # max 4 nodes running in parallel
        threads=2,               # dbt threads within each node
        retries=3,               # each node retries up to 3 times
        retry_delay_seconds=60,
    )

    # manifest.json is auto-generated if not found
    results = orchestrator.run_build(select="tag:daily")

    by_status = {}
    for r in results.values():
        by_status[r["status"]] = by_status.get(r["status"], 0) + 1
    print(f"Results: {by_status}")
run_build() automatically runs dbt parse to generate manifest.json if none is found in the project’s target directory. Pass manifest_path directly to the orchestrator constructor to use a pre-built manifest (e.g. from a CI artifact).
When the default DbtCoreExecutor has to generate a manifest itself (no manifest.json at the target path and no manifest_path passed), it first runs dbt deps to install packages declared in packages.yml, then dbt parse. If you pass a pre-built manifest_path or supply a custom executor, this does not happen — make sure your image already has the required dbt packages installed. To keep the default executor but skip the automatic dbt deps:
from prefect_dbt.core._executor import DbtCoreExecutor

orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    executor=DbtCoreExecutor(settings, run_deps=False),
)
Always pass settings= alongside a custom executor= — the orchestrator itself still uses settings.project_dir and settings.profiles_dir for selector resolution, source freshness, and artifact paths. Omitting it makes the orchestrator fall back to a default PrefectDbtSettings(), which only works when the current working directory already contains a valid dbt project and profile.
By default, run_build() raises DbtBuildFailed if any node finishes with "error" status, so a failing dbt build fails the enclosing flow run. Pass raise_on_failure=False to the orchestrator to inspect the results dict on partial failures instead.

Execution modes

PER_WAVE (default)

Each wave of independent nodes runs as a single dbt build invocation, which has the lowest overhead. Because dbt emits per-node artifacts even for a partially failed invocation, every node in the wave still appears in the results dict with its real status (only the nodes that actually failed are marked "error"); once any node in a wave fails, however, all downstream waves are skipped as a group.
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_WAVE,
)

PER_NODE

Each node becomes a separate Prefect task in its own subprocess. Failed nodes retry independently; other nodes in the same wave continue; downstream dependents of failed nodes are skipped.
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    retries=2,
    concurrency=4,
)
| Mode | Overhead | Best for |
| --- | --- | --- |
| PER_WAVE | Lower (one dbt invocation per wave) | CI/CD, dev iterations, stable DAGs |
| PER_NODE | Higher (one invocation per node) | Production, flaky nodes, fine-grained control |
retries and cache are only supported in PER_NODE mode. Passing either with PER_WAVE raises ValueError.

Result format

run_build() returns a dict mapping unique_id to a result. Each result has a "status" of "success", "cached", "error", or "skipped":
{
    "model.analytics.stg_users": {
        "status": "success",
        "timing": {"started_at": "...", "completed_at": "...", "duration_seconds": 5.2},
        "invocation": {"command": "build", "args": ["--select", "..."]},
    },
    "model.analytics.stg_orders": {
        "status": "error",
        "error": {"message": "relation does not exist", "type": "RuntimeError"},
    },
    "model.analytics.dim_products": {
        "status": "cached",
    },
    "model.analytics.order_summary": {
        "status": "skipped",
        "reason": "upstream failure",
        "failed_upstream": ["model.analytics.stg_orders"],
    },
}
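The result shape above lends itself to straightforward post-processing. A minimal sketch in plain Python over the documented dict (the sample results here are hypothetical, mirroring the example above):

```python
from collections import Counter

# Hypothetical results in the documented shape.
results = {
    "model.analytics.stg_users": {"status": "success"},
    "model.analytics.stg_orders": {
        "status": "error",
        "error": {"message": "relation does not exist", "type": "RuntimeError"},
    },
    "model.analytics.dim_products": {"status": "cached"},
    "model.analytics.order_summary": {
        "status": "skipped",
        "reason": "upstream failure",
        "failed_upstream": ["model.analytics.stg_orders"],
    },
}

# Tally statuses and collect error messages keyed by unique_id.
counts = Counter(r["status"] for r in results.values())
failures = {
    uid: r["error"]["message"]
    for uid, r in results.items()
    if r["status"] == "error"
}

print(counts)    # e.g. Counter({'success': 1, 'error': 1, 'cached': 1, 'skipped': 1})
print(failures)  # {'model.analytics.stg_orders': 'relation does not exist'}
```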

Test strategies

The test_strategy parameter controls when dbt tests run relative to models. The default is IMMEDIATE, matching dbt build semantics.
from prefect_dbt.core._orchestrator import PrefectDbtOrchestrator, TestStrategy

orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    test_strategy=TestStrategy.IMMEDIATE,  # default
)
| Strategy | Behavior |
| --- | --- |
| IMMEDIATE (default) | Tests run interleaved with models — each test runs in the wave after all its parent models complete. Matches dbt build semantics. |
| DEFERRED | All model waves execute first, then all tests run together in a final wave. |
| SKIP | Tests are excluded from execution entirely. |
Use DEFERRED when you want to see all model results before deciding whether to run tests, or to parallelize model execution more aggressively. Use SKIP during fast iteration to avoid test overhead entirely.

Caching

Enable cross-run caching to skip nodes whose SQL, config, and upstream dependencies haven’t changed since the last successful run. Only available in PER_NODE mode. Pass a CacheConfig instance to consolidate all caching options:
from datetime import timedelta
from prefect_dbt.core._orchestrator import (
    PrefectDbtOrchestrator,
    CacheConfig,
    ExecutionMode,
)

orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        expiration=timedelta(hours=24),  # invalidate after 24h
    ),
)
The cache key for each node is derived from:
  • the hash of the node’s source SQL (or CSV for seeds) — not the compiled SQL, so config changes that only affect compilation via vars or environment will not invalidate the key on their own
  • the node’s dbt config
  • the node’s relation name
  • the full_refresh flag
  • the macros the node depends on (each macro’s content is hashed and folded in)
  • upstream nodes’ cache keys (changes cascade downstream)
When a cached result is valid, the node’s task returns immediately without invoking dbt. Cached results appear in the result dict with "status": "cached" (see Result format above).
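The actual key derivation is internal to prefect-dbt, but the ingredients listed above can be pictured as being folded into a single digest. An illustrative sketch only — the function name, signature, and hash scheme here are hypothetical, not the library's implementation:

```python
import hashlib
import json

def cache_key(source_sql, node_config, relation_name, full_refresh,
              macro_contents, upstream_keys):
    """Illustrative only: combine the documented ingredients into one digest."""
    h = hashlib.sha256()
    h.update(source_sql.encode())                               # raw source SQL, not compiled
    h.update(json.dumps(node_config, sort_keys=True).encode())  # node's dbt config
    h.update(relation_name.encode())                            # relation name
    h.update(str(full_refresh).encode())                        # full_refresh flag
    for macro in sorted(macro_contents):                        # macros the node depends on
        h.update(macro.encode())
    for key in sorted(upstream_keys):                           # upstream keys cascade changes
        h.update(key.encode())
    return h.hexdigest()

k1 = cache_key("select 1", {"materialized": "view"}, "db.schema.t", False, [], [])
k2 = cache_key("select 2", {"materialized": "view"}, "db.schema.t", False, [], [])
assert k1 != k2  # changing the source SQL changes the key
```

Because upstream keys are part of each node's digest, a change anywhere in the graph invalidates every node downstream of it, which is the cascading behavior described above.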

CacheConfig fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| expiration | timedelta or None | None | Global TTL for cached results. None means no expiration. |
| result_storage | Path, str, or None | None | Storage for cached results. A Path for local storage, a block slug (e.g. "s3-bucket/my-results") for remote, or None for default. |
| key_storage | Path, str, or None | None | Storage for cache keys. Same format as result_storage. |
| use_source_freshness_expiration | bool | False | Derive each node's TTL from its upstream source freshness thresholds, overriding expiration per-node. |
| exclude_materializations | frozenset[str] | {"incremental"} | Materializations excluded from caching. |
| exclude_resource_types | frozenset[NodeType] | {Test, Snapshot, Unit} | Resource types excluded from caching. |
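The two exclusion sets combine as a simple membership test. A sketch of the effective "is this node cacheable?" decision, using plain strings in place of the library's NodeType values (the function and its arguments are hypothetical):

```python
# Defaults from the table above, expressed as plain strings for illustration.
EXCLUDE_MATERIALIZATIONS = frozenset({"incremental"})
EXCLUDE_RESOURCE_TYPES = frozenset({"test", "snapshot", "unit"})

def is_cacheable(materialization: str, resource_type: str) -> bool:
    # A node participates in caching only if neither exclusion set matches it.
    return (materialization not in EXCLUDE_MATERIALIZATIONS
            and resource_type not in EXCLUDE_RESOURCE_TYPES)

assert is_cacheable("view", "model")            # plain model: cached
assert not is_cacheable("incremental", "model") # incremental models excluded by default
assert not is_cacheable("table", "test")        # tests excluded by default
```

Incremental models are excluded by default because skipping an incremental run would silently drop the rows that run was supposed to append.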

Custom cache storage

Block slugs follow the format "block-type/block-name" (for example "s3-bucket/my-results"). The block must be saved in Prefect before use. See How to persist workflow results for a walkthrough on creating and saving storage blocks.
from pathlib import Path

# Option 1: local path
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        result_storage=Path("/tmp/dbt-results"),
        key_storage=Path("/tmp/dbt-cache-keys"),
    ),
)

# Option 2: block slug (block must be saved in Prefect)
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        result_storage="s3-bucket/my-results-block",
        key_storage="s3-bucket/my-cache-keys-block",
    ),
)

Cache expiration from source freshness

When use_source_freshness_expiration=True, each node’s cache TTL is derived from the freshness thresholds of its upstream sources — a node backed by an hourly source won’t be cached longer than the source’s warn threshold.
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        use_source_freshness_expiration=True,  # overrides expiration per node
    ),
)
use_source_freshness_expiration overrides the global expiration on a per-node basis.
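Conceptually, the per-node TTL under use_source_freshness_expiration is bounded by the tightest upstream freshness threshold. A sketch under that assumption — the real logic is internal to prefect-dbt, and the helper below is hypothetical:

```python
from datetime import timedelta

def effective_ttl(global_expiration, upstream_warn_thresholds):
    """Illustrative: take the tightest upstream warn threshold; fall back to
    the global expiration when the node has no freshness-checked sources."""
    if not upstream_warn_thresholds:
        return global_expiration
    tightest = min(upstream_warn_thresholds)
    if global_expiration is None:
        return tightest
    return min(tightest, global_expiration)

# A node fed by an hourly source is never cached longer than an hour,
# even with a 24h global expiration.
ttl = effective_ttl(timedelta(hours=24), [timedelta(hours=1), timedelta(hours=6)])
assert ttl == timedelta(hours=1)
```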

Dry-run with plan()

Preview what run_build() would execute without actually running dbt models. The plan() method performs manifest parsing, selector resolution, and wave computation, but only invokes dbt ls (for selector resolution) and dbt source freshness (when enabled) — no models are built.
plan() is not free on the first call. With the default DbtCoreExecutor, if no manifest.json exists at the target path and no manifest_path was passed, the executor will run dbt deps followed by dbt parse before wave computation. To make plan() cheap, point manifest_path at a pre-built manifest (e.g. a CI artifact).
from prefect_dbt.core._orchestrator import (
    PrefectDbtOrchestrator,
    ExecutionMode,
    CacheConfig,
)

orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(),
)

plan = orchestrator.plan(select="tag:daily")
print(plan)
# BuildPlan: 12 node(s) in 4 wave(s)  |  max parallelism = 5
#
#   Wave 0 (3 node(s)):
#     - model.analytics.stg_users [model, view] (cache: hit)
#     - model.analytics.stg_orders [model, view] (cache: miss)
#     ...
#
#   Cache: 7 hit(s), 3 miss(es), 2 excluded
The returned BuildPlan includes:
| Field | Description |
| --- | --- |
| waves | Execution waves in topological order, each containing the nodes that can run in parallel. |
| node_count | Total nodes across all waves. |
| cache_predictions | Per-node "hit" / "miss" / "excluded" predictions (only when caching is configured). |
| skipped_nodes | Nodes dropped by source-freshness checks, with reason. Nodes that don't match the select / exclude filters are pruned from the graph and are not surfaced here. |
| estimated_parallelism | Width of the largest wave. Accurate under strict wave-barrier execution; in PER_NODE mode, eager scheduling can let nodes from wave N+1 start before all of wave N has finished, so peak concurrency may briefly exceed this. |
plan() accepts the same parameters as run_build(): select, exclude, full_refresh, only_fresh_sources, target, and extra_cli_args.

Artifacts and asset tracking

Summary artifact

When invoked from inside a flow run, run_build() creates a Prefect markdown artifact summarizing results — nodes executed, statuses, and timings. You can optionally write a dbt-compatible run_results.json:
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    create_summary_artifact=True,   # default — set False to disable
    write_run_results=True,         # write run_results.json to target_path
)
include_compiled_code=True does not affect the summary artifact — the artifact is rendered from the results dict alone. The flag currently only adds compiled SQL to the asset descriptions attached to each MaterializingTask in PER_NODE mode.
Output locations:
  • The summary artifact is created with key dbt-orchestrator-summary. Creation requires an active flow run context — if you call run_build() from a plain Python script (supported in PER_WAVE mode), the summary step is skipped even with create_summary_artifact=True.
  • run_results.json is written inside settings.target_path (defaults to <project_dir>/target/run_results.json, but moves with target_path or a custom manifest_path).
To access the artifact programmatically, filter by key via ArtifactFilter. Note that read_artifacts ignores unknown kwargs, so a bare key=... argument silently returns every artifact in the workspace:
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import ArtifactFilter, ArtifactFilterKey

async with get_client() as client:
    artifacts = await client.read_artifacts(
        artifact_filter=ArtifactFilter(key=ArtifactFilterKey(any_=["dbt-orchestrator-summary"])),
    )
The artifact key dbt-orchestrator-summary is fixed and not currently configurable.

Prefect asset lineage

In PER_NODE mode, each model, seed, and snapshot creates a Prefect asset and tracks upstream lineage automatically — integrating dbt’s data lineage into Prefect’s asset catalog. Assets are named using the relation name (for example snowflake://my_db/analytics/stg_users). Upstream assets, including dbt sources, are tracked as asset dependencies. To disable asset creation:
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    disable_assets=True,
)

run_build() parameters

target

Override the dbt target at run time without changing profiles.yml:
results = orchestrator.run_build(
    select="tag:daily",
    target="prod",
)

extra_cli_args

Pass additional CLI flags directly to dbt invocations. Useful for flags not exposed as first-class orchestrator parameters:
results = orchestrator.run_build(
    select="tag:daily",
    extra_cli_args=["--vars", '{"run_date": "2024-01-15"}', "--no-partial-parse"],
)
Some flags are blocked because they conflict with the orchestrator's internal logic; passing any of them raises ValueError: --select, --models, --exclude, --selector, --indirect-selection, --project-dir, --target-path, --profiles-dir, --log-level. Others are exposed as first-class parameters; use those instead:
| CLI flag | Use instead |
| --- | --- |
| --full-refresh | run_build(full_refresh=True) |
| --target | run_build(target="...") |
| --threads | PrefectDbtOrchestrator(threads=N) |
| --defer | PrefectDbtOrchestrator(defer=True) |
| --state | PrefectDbtOrchestrator(state_path=Path(...)) |
| --defer-state | PrefectDbtOrchestrator(defer_state_path=...) |
| --favor-state | PrefectDbtOrchestrator(favor_state=True) |
Caveat flags (--resource-type, --exclude-resource-type, --fail-fast) are accepted but log a warning because they can silently interact with orchestrator-managed test scheduling.

State-based execution (CI/CD)

Use dbt’s state flags to run only modified models and defer to production for everything else:
@flow
def run_ci_incremental():
    orchestrator = PrefectDbtOrchestrator(
        settings=PrefectDbtSettings(project_dir=Path("./analytics")),
        state_path=Path("./prod_artifacts"),
        defer=True,
        execution_mode=ExecutionMode.PER_WAVE,
    )
    return orchestrator.run_build(select="state:modified+")
Also supports defer_state_path and favor_state for more advanced deferral scenarios.

dbt Cloud execution

The DbtCloudExecutor lets you run dbt nodes via dbt Cloud ephemeral jobs instead of local dbt-core. The orchestrator creates a temporary job in dbt Cloud for each node (or wave), polls until completion, fetches results, and deletes the job — giving you all orchestrator features (retries, caching, test strategies, assets) backed by dbt Cloud’s managed infrastructure.
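The create/poll/cleanup cycle behind each ephemeral job can be sketched as a simple bounded polling loop. This is an illustration of how timeout_seconds and poll_frequency_seconds interact, not the actual client code; the function, its arguments, and the status strings are all hypothetical placeholders:

```python
import time

def poll_until_complete(get_status, timeout_seconds=900, poll_frequency_seconds=10,
                        clock=time.monotonic, sleep=time.sleep):
    """Poll get_status() until a terminal state or the timeout elapses."""
    deadline = clock() + timeout_seconds
    while True:
        status = get_status()
        if status in ("success", "error", "cancelled"):  # terminal states (placeholders)
            return status
        if clock() >= deadline:
            raise TimeoutError(f"dbt Cloud run still {status!r} after {timeout_seconds}s")
        sleep(poll_frequency_seconds)

# Simulated run that succeeds on the third poll.
statuses = iter(["queued", "running", "success"])
result = poll_until_complete(lambda: next(statuses),
                             timeout_seconds=60, poll_frequency_seconds=0)
assert result == "success"
```

The documented defaults (timeout_seconds=900, poll_frequency_seconds=10) mean a run is polled roughly 90 times before the executor gives up and raises TimeoutError.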

Credentials setup

Create or load a DbtCloudCredentials block with your dbt Cloud API key and account ID:
from prefect_dbt.cloud import DbtCloudCredentials

# Option 1: create inline
credentials = DbtCloudCredentials(
    api_key="dbc_your_api_key_here",
    account_id=12345,
)

# Option 2: load a saved block
credentials = DbtCloudCredentials.load("my-dbt-cloud-creds")

Quick start

from pathlib import Path
from prefect import flow
from prefect_dbt import PrefectDbtSettings
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudExecutor
from prefect_dbt.core._orchestrator import PrefectDbtOrchestrator, ExecutionMode


@flow
def run_dbt_cloud_build():
    credentials = DbtCloudCredentials.load("my-dbt-cloud-creds")

    executor = DbtCloudExecutor(
        credentials=credentials,
        project_id=123456,
        environment_id=654321,
        defer_to_job_id=789012,  # fetch manifest from this job's last run
    )

    orchestrator = PrefectDbtOrchestrator(
        executor=executor,
        settings=PrefectDbtSettings(
            project_dir=Path("./my_dbt_project"),
            profiles_dir=Path.home() / ".dbt",
        ),
        execution_mode=ExecutionMode.PER_NODE,
        concurrency=4,
        retries=2,
        retry_delay_seconds=30,
    )

    return orchestrator.run_build(select="tag:daily")
Even with a DbtCloudExecutor, the orchestrator still reads settings.project_dir and settings.profiles_dir for dbt build --select resolution, source-freshness handling, and artifact paths. Pass an explicit settings=PrefectDbtSettings(...) pointing at a local checkout of the same project — omitting it only works when the flow already runs inside a valid local dbt project with a usable default profile. The orchestrator’s state_path, defer, defer_state_path, and favor_state parameters, however, are specific to DbtCoreExecutor and have no effect when a custom executor is used.

Finding your dbt Cloud IDs

  • account_id — Settings → Account Settings → Account ID (also visible in your dbt Cloud URL: cloud.getdbt.com/deploy/{account_id}/...).
  • project_id — Deploy → Jobs → select any job → the URL contains /projects/{project_id}/.
  • environment_id — Deploy → Environments → select the target environment → the URL contains /environments/{environment_id}.
  • defer_to_job_id — Deploy → Jobs → select your production job → the URL ends with /jobs/{job_id}.

Parameter reference

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| credentials | DbtCloudCredentials | required | Block providing api_key and account_id for dbt Cloud API authentication. |
| project_id | int | required | Numeric dbt Cloud project ID. |
| environment_id | int | required | Numeric dbt Cloud environment ID where ephemeral jobs run. |
| job_name_prefix | str | "prefect-orchestrator" | Prefix for ephemeral job names shown in the dbt Cloud UI. |
| timeout_seconds | int | 900 | Max seconds to poll a dbt Cloud run before raising TimeoutError. |
| poll_frequency_seconds | int | 10 | Seconds between status polls. |
| threads | int or None | None | If set, appends --threads N to all dbt commands. |
| defer_to_job_id | int or None | None | Fetch manifest from this job's last successful run instead of compiling fresh. |

Manifest resolution

The executor needs a manifest.json to parse the dbt DAG. How it gets one depends on defer_to_job_id:
  • defer_to_job_id is set — fetches manifest.json from the most recent successful run of that job via GET /jobs/{job_id}/artifacts/manifest.json. This is the recommended approach for production — point it at your production job to reuse its compiled manifest.
  • defer_to_job_id is None — creates an ephemeral dbt compile job, runs it, fetches the resulting manifest, and deletes the job. Useful for CI or when you need a fresh manifest, but adds compilation time.
For the fastest builds, set defer_to_job_id to your production job. This avoids the ephemeral compile step and reuses the manifest your production environment already generates.

Performance notes

  • Connection pooling. In PER_NODE mode, dbt adapter connections are automatically pooled and reused across sequential node invocations within each worker process, reducing connection overhead.
  • Eager DAG scheduling. In PER_NODE mode, nodes are submitted to the task runner as soon as all their individual dependencies complete, rather than waiting for wave boundaries. This maximizes concurrency without any configuration changes.
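The difference between wave barriers and eager scheduling comes down to when a node becomes eligible to start: as soon as its own parents finish, rather than when its whole wave does. A small dependency-driven sketch (a hypothetical simulation, not the orchestrator's actual scheduler):

```python
def ready_nodes(deps, done):
    """Nodes whose parents have all completed: eligible to start immediately,
    regardless of whether their wave-mates are still running."""
    return {n for n, parents in deps.items()
            if n not in done and parents <= done}

# stg_users and stg_orders form wave 0; dim_users depends only on stg_users.
deps = {
    "stg_users": set(),
    "stg_orders": set(),
    "dim_users": {"stg_users"},
    "fct_orders": {"stg_users", "stg_orders"},
}

# Under strict wave barriers, dim_users would wait for all of wave 0.
# Eagerly, it is ready as soon as stg_users alone completes:
assert ready_nodes(deps, {"stg_users"}) == {"stg_orders", "dim_users"}
```

This is also why estimated_parallelism in a BuildPlan is only an upper bound under wave barriers: with eager scheduling, nodes from adjacent waves can briefly overlap.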

Comparison with PrefectDbtRunner

| Feature | PrefectDbtRunner | PrefectDbtOrchestrator |
| --- | --- | --- |
| Execution model | Reactive — dbt controls execution | Proactive — Prefect controls execution |
| Per-node retries | No | Yes (PER_NODE mode) |
| Concurrency control | dbt's internal threading | Prefect-native (process pool or named limits) |
| Failure isolation | Entire build fails | Per-node or per-wave with downstream skipping |
| Test strategies | No | Yes (IMMEDIATE / DEFERRED / SKIP) |
| Cross-run caching | No | Yes (PER_NODE mode) |
| Asset lineage | Yes | Yes (PER_NODE mode) |
| Source freshness | No | Yes |
| Dry-run planning | No | Yes (plan() method) |
| dbt Cloud execution | No | Yes (DbtCloudExecutor) |
| Best for | Simple builds, maximum speed | Production reliability, CI/CD, complex DAGs |
Both coexist in the same project — PrefectDbtRunner is unchanged. See PrefectDbtRunner for its dedicated guide.