Beta. The PrefectDbtOrchestrator, ExecutionMode, TestStrategy, CacheConfig, and BuildPlan symbols currently live in a private module (prefect_dbt.core._orchestrator) and will move to public exports in a future release. The API is stable enough for production use, but import paths may change — watch the release notes for the public-export announcement.
Overview
The PrefectDbtOrchestrator gives Prefect full control over dbt DAG execution. Instead of running dbt build as a single batch operation, the orchestrator parses your manifest, computes execution waves, and runs each node or wave as a Prefect task — enabling per-node retries, Prefect-native concurrency control, automatic downstream skipping on failure, and fine-grained observability.
If you instead want dbt to drive execution with Prefect observing from the outside, see PrefectDbtRunner.
Install
PrefectDbtOrchestrator first shipped in prefect-dbt 0.7.17, but this guide exercises APIs that landed later. Pin prefect-dbt>=0.7.23 to match every example on this page:
pip install "prefect[dbt]" "prefect-dbt>=0.7.23"
Feature floors, for reference:
| Feature | Minimum version |
|---|---|
| PrefectDbtOrchestrator class, ExecutionMode, TestStrategy, run_build() | 0.7.17 |
| DbtCloudExecutor | 0.7.19 |
| CacheConfig, plan() / BuildPlan | 0.7.20 |
| raise_on_failure constructor argument | 0.7.23 |
See the integration overview for adapter-specific extras.
Quick start
from pathlib import Path
from prefect import flow
from prefect_dbt import PrefectDbtSettings
from prefect_dbt.core._orchestrator import PrefectDbtOrchestrator, ExecutionMode
@flow
def run_dbt_build():
    settings = PrefectDbtSettings(
        project_dir=Path("./my_dbt_project"),
        profiles_dir=Path.home() / ".dbt",
    )
    orchestrator = PrefectDbtOrchestrator(
        settings=settings,
        execution_mode=ExecutionMode.PER_NODE,
        concurrency=4,  # max 4 nodes running in parallel
        threads=2,  # dbt threads within each node
        retries=3,  # each node retries up to 3 times
        retry_delay_seconds=60,
    )
    # manifest.json is auto-generated if not found
    results = orchestrator.run_build(select="tag:daily")
    by_status = {}
    for r in results.values():
        by_status[r["status"]] = by_status.get(r["status"], 0) + 1
    print(f"Results: {by_status}")
run_build() automatically runs dbt parse to generate manifest.json if none is found in the project’s target directory. Pass manifest_path directly to the orchestrator constructor to use a pre-built manifest (e.g. from a CI artifact).
When the default DbtCoreExecutor has to generate a manifest itself (no manifest.json at the target path and no manifest_path passed), it first runs dbt deps to install packages declared in packages.yml, then dbt parse. If you pass a pre-built manifest_path or supply a custom executor, this does not happen — make sure your image already has the required dbt packages installed. To keep the default executor but skip the automatic dbt deps:
from prefect_dbt.core._executor import DbtCoreExecutor
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    executor=DbtCoreExecutor(settings, run_deps=False),
)
Always pass settings= alongside a custom executor= — the orchestrator itself still uses settings.project_dir and settings.profiles_dir for selector resolution, source freshness, and artifact paths. Omitting it makes the orchestrator fall back to a default PrefectDbtSettings(), which only works when the current working directory already contains a valid dbt project and profile.
By default, run_build() raises DbtBuildFailed if any node finishes with "error" status, so a failing dbt build fails the enclosing flow run. Pass raise_on_failure=False to the orchestrator to inspect the results dict on partial failures instead.
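With raise_on_failure=False, triaging the returned dict is plain Python. A minimal sketch: the summarize_results helper below is illustrative, not part of prefect-dbt, and the dict shape follows the result format documented later on this page.

```python
# Illustrative helper for inspecting run_build() results on partial failure.
from collections import Counter

def summarize_results(results: dict) -> dict:
    """Group node results by status and collect error messages."""
    statuses = Counter(r["status"] for r in results.values())
    errors = {
        uid: r["error"]["message"]
        for uid, r in results.items()
        if r["status"] == "error"
    }
    return {"counts": dict(statuses), "errors": errors}

results = {
    "model.analytics.stg_users": {"status": "success"},
    "model.analytics.stg_orders": {
        "status": "error",
        "error": {"message": "relation does not exist", "type": "RuntimeError"},
    },
    "model.analytics.order_summary": {"status": "skipped"},
}
print(summarize_results(results))
```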
Execution modes
PER_WAVE (default)
Each wave of independent nodes runs as a single dbt build invocation. Lowest overhead. Nodes inside a wave that finish with per-node artifacts still appear in the results dict with their real status — only the nodes that actually failed are marked "error" — but once any node in a wave fails, all downstream waves are skipped as a group.
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_WAVE,
)
PER_NODE
Each node becomes a separate Prefect task in its own subprocess. Failed nodes retry independently; other nodes in the same wave continue; downstream dependents of failed nodes are skipped.
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    retries=2,
    concurrency=4,
)
| Mode | Overhead | Best for |
|---|---|---|
| PER_WAVE | Lower (one dbt invocation per wave) | CI/CD, dev iterations, stable DAGs |
| PER_NODE | Higher (one invocation per node) | Production, flaky nodes, fine-grained control |
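The wave concept both modes share can be sketched as a longest-path layering of the dbt DAG: a node's wave index is one more than the deepest wave among its parents. A minimal illustration, assuming nothing about prefect-dbt's actual implementation:

```python
# Sketch of wave computation via longest-path layering over a DAG.
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

def compute_waves(deps):
    """deps maps node -> set of upstream nodes. Returns nodes grouped by wave."""
    order = list(TopologicalSorter(deps).static_order())
    wave_of = {}
    for node in order:
        parents = deps.get(node, set())
        # A node runs one wave after its deepest parent; roots land in wave 0.
        wave_of[node] = 1 + max((wave_of[p] for p in parents), default=-1)
    waves = [[] for _ in range(max(wave_of.values()) + 1)]
    for node in order:
        waves[wave_of[node]].append(node)
    return waves

deps = {
    "stg_users": set(),
    "stg_orders": set(),
    "dim_users": {"stg_users"},
    "order_summary": {"stg_orders", "dim_users"},
}
print(compute_waves(deps))
```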
retries and cache are only supported in PER_NODE mode. Passing either with PER_WAVE raises ValueError.
run_build() returns a dict mapping unique_id to a result. Each result has a "status" of "success", "cached", "error", or "skipped":
{
    "model.analytics.stg_users": {
        "status": "success",
        "timing": {"started_at": "...", "completed_at": "...", "duration_seconds": 5.2},
        "invocation": {"command": "build", "args": ["--select", "..."]},
    },
    "model.analytics.stg_orders": {
        "status": "error",
        "error": {"message": "relation does not exist", "type": "RuntimeError"},
    },
    "model.analytics.dim_products": {
        "status": "cached",
    },
    "model.analytics.order_summary": {
        "status": "skipped",
        "reason": "upstream failure",
        "failed_upstream": ["model.analytics.stg_orders"],
    },
}
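The skipped entries above follow a simple transitive rule: any node with a failed ancestor is skipped, and the failed upstream nodes are recorded. A hedged sketch of that propagation (skip_downstream is an illustrative helper, not a prefect-dbt API):

```python
# Illustrative propagation of "skipped" status to downstream dependents.
def skip_downstream(deps, failed):
    """deps maps node -> set of upstream nodes. Returns skip entries for dependents."""
    skipped = {}

    def failed_ancestors(node):
        found = set()
        for parent in deps.get(node, set()):
            if parent in failed:
                found.add(parent)
            found |= failed_ancestors(parent)  # walk transitively upstream
        return found

    for node in deps:
        if node in failed:
            continue  # the failed node itself keeps its "error" entry
        ancestors = failed_ancestors(node)
        if ancestors:
            skipped[node] = {
                "status": "skipped",
                "reason": "upstream failure",
                "failed_upstream": sorted(ancestors),
            }
    return skipped

deps = {
    "stg_orders": set(),
    "order_summary": {"stg_orders"},
    "report": {"order_summary"},
}
print(skip_downstream(deps, failed={"stg_orders"}))
```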
Test strategies
The test_strategy parameter controls when dbt tests run relative to models. The default is IMMEDIATE, matching dbt build semantics.
from prefect_dbt.core._orchestrator import PrefectDbtOrchestrator, TestStrategy
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    test_strategy=TestStrategy.IMMEDIATE,  # default
)
| Strategy | Behavior |
|---|---|
| IMMEDIATE (default) | Tests run interleaved with models — each test runs in the wave after all its parent models complete. Matches dbt build semantics. |
| DEFERRED | All model waves execute first, then all tests run together in a final wave. |
| SKIP | Tests are excluded from execution entirely. |
Use DEFERRED when you want to see all model results before deciding whether to run tests, or to parallelize model execution more aggressively. Use SKIP during fast iteration to avoid test overhead entirely.
Caching
Enable cross-run caching to skip nodes whose SQL, config, and upstream dependencies haven’t changed since the last successful run. Only available in PER_NODE mode.
Pass a CacheConfig instance to consolidate all caching options:
from datetime import timedelta
from prefect_dbt.core._orchestrator import (
    PrefectDbtOrchestrator,
    CacheConfig,
    ExecutionMode,
)
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        expiration=timedelta(hours=24),  # invalidate after 24h
    ),
)
The cache key for each node is derived from:
- the hash of the node’s source SQL (or CSV for seeds) — not the compiled SQL, so config changes that only affect compilation via vars or environment will not invalidate the key on their own
- the node’s dbt config
- the node’s relation name
- the full_refresh flag
- the macros the node depends on (each macro’s content is hashed and folded in)
- upstream nodes’ cache keys (changes cascade downstream)
When a cached result is valid, the node’s task returns immediately without invoking dbt. Cached results appear in the result dict with "status": "cached" (see Result format above).
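A cascading key like the one described can be sketched as a content hash over those inputs plus the upstream nodes' keys, so any upstream change ripples downstream. The field names below are illustrative, not prefect-dbt internals:

```python
# Sketch of a cascading cache key: hash node inputs plus upstream keys.
import hashlib
import json

def cache_key(node, upstream_keys):
    payload = json.dumps(
        {
            "raw_sql": node["raw_sql"],          # source SQL, not compiled SQL
            "config": node["config"],
            "relation": node["relation"],
            "full_refresh": node["full_refresh"],
            "macros": node["macro_hashes"],      # hashed macro contents
            "upstream": sorted(upstream_keys),   # changes cascade downstream
        },
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()

base = {
    "raw_sql": "select * from {{ ref('stg_users') }}",
    "config": {"materialized": "view"},
    "relation": "analytics.dim_users",
    "full_refresh": False,
    "macro_hashes": [],
}
k1 = cache_key(base, upstream_keys=["abc123"])
k2 = cache_key(base, upstream_keys=["def456"])  # upstream key changed
print(k1 != k2)
```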
CacheConfig fields
| Field | Type | Default | Description |
|---|---|---|---|
| expiration | timedelta \| None | None | Global TTL for cached results. None means no expiration. |
| result_storage | Path \| str \| None | None | Storage for cached results. A Path for local storage, a block slug (e.g. "s3-bucket/my-results") for remote, or None for the default. |
| key_storage | Path \| str \| None | None | Storage for cache keys. Same format as result_storage. |
| use_source_freshness_expiration | bool | False | Derive each node’s TTL from its upstream source freshness thresholds, overriding expiration per node. |
| exclude_materializations | frozenset[str] | {"incremental"} | Materializations excluded from caching. |
| exclude_resource_types | frozenset[NodeType] | {Test, Snapshot, Unit} | Resource types excluded from caching. |
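The two exclusion fields translate into a simple per-node eligibility check. A sketch mirroring the defaults in the table above (is_cacheable is an illustrative helper):

```python
# Illustrative cacheability check based on the exclusion fields.
DEFAULT_EXCLUDE_MATERIALIZATIONS = frozenset({"incremental"})
DEFAULT_EXCLUDE_RESOURCE_TYPES = frozenset({"test", "snapshot", "unit"})

def is_cacheable(
    resource_type,
    materialization,
    exclude_materializations=DEFAULT_EXCLUDE_MATERIALIZATIONS,
    exclude_resource_types=DEFAULT_EXCLUDE_RESOURCE_TYPES,
):
    if resource_type in exclude_resource_types:
        return False  # e.g. tests are never cached
    if materialization in exclude_materializations:
        return False  # e.g. incremental models depend on prior state
    return True

print(is_cacheable("model", "view"))
print(is_cacheable("model", "incremental"))
print(is_cacheable("test", None))
```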
Custom cache storage
Block slugs follow the format "block-type/block-name" (for example "s3-bucket/my-results"). The block must be saved in Prefect before use. See How to persist workflow results for a walkthrough on creating and saving storage blocks.
from pathlib import Path
# Option 1: local path
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        result_storage=Path("/tmp/dbt-results"),
        key_storage=Path("/tmp/dbt-cache-keys"),
    ),
)
# Option 2: block slug (block must be saved in Prefect)
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        result_storage="s3-bucket/my-results-block",
        key_storage="s3-bucket/my-cache-keys-block",
    ),
)
Cache expiration from source freshness
When use_source_freshness_expiration=True, each node’s cache TTL is derived from the freshness thresholds of its upstream sources — a node backed by an hourly source won’t be cached longer than the source’s warn threshold.
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(
        use_source_freshness_expiration=True,  # overrides expiration per node
    ),
)
use_source_freshness_expiration overrides the global expiration on a per-node basis.
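The freshness-derived TTL amounts to taking the tightest upstream warn threshold. A sketch of that rule, assuming the warn_after values have already been extracted from the sources' freshness config (node_ttl is an illustrative helper):

```python
# Illustrative per-node TTL derivation from upstream source freshness.
from datetime import timedelta

def node_ttl(upstream_warn_after, global_expiration):
    """Smallest upstream warn_after wins; fall back to the global expiration."""
    if upstream_warn_after:
        return min(upstream_warn_after)
    return global_expiration

ttl = node_ttl(
    [timedelta(hours=1), timedelta(hours=6)],  # hourly + 6-hourly sources
    global_expiration=timedelta(hours=24),
)
print(ttl)
```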
Dry-run with plan()
Preview what run_build() would execute without actually running dbt models. The plan() method performs manifest parsing, selector resolution, and wave computation, but only invokes dbt ls (for selector resolution) and dbt source freshness (when enabled) — no models are built.
plan() is not free on the first call. With the default DbtCoreExecutor, if no manifest.json exists at the target path and no manifest_path was passed, the executor will run dbt deps followed by dbt parse before wave computation. To make plan() cheap, point manifest_path at a pre-built manifest (e.g. a CI artifact).
from prefect_dbt.core._orchestrator import (
    PrefectDbtOrchestrator,
    ExecutionMode,
    CacheConfig,
)
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    cache=CacheConfig(),
)
plan = orchestrator.plan(select="tag:daily")
print(plan)
# BuildPlan: 12 node(s) in 4 wave(s) | max parallelism = 5
#
# Wave 0 (3 node(s)):
# - model.analytics.stg_users [model, view] (cache: hit)
# - model.analytics.stg_orders [model, view] (cache: miss)
# ...
#
# Cache: 7 hit(s), 3 miss(es), 2 excluded
The returned BuildPlan includes:
| Field | Description |
|---|---|
| waves | Execution waves in topological order, each containing the nodes that can run in parallel. |
| node_count | Total nodes across all waves. |
| cache_predictions | Per-node "hit" / "miss" / "excluded" predictions (only when caching is configured). |
| skipped_nodes | Nodes dropped by source-freshness checks, with reason. Nodes that don’t match the select / exclude filters are pruned from the graph and are not surfaced here. |
| estimated_parallelism | Width of the largest wave. Accurate under strict wave-barrier execution; in PER_NODE mode, eager scheduling can let nodes from wave N+1 start before all of wave N has finished, so peak concurrency may briefly exceed this. |
plan() accepts the same parameters as run_build(): select, exclude, full_refresh, only_fresh_sources, target, and extra_cli_args.
Artifacts and asset tracking
Summary artifact
When invoked from inside a flow run, run_build() creates a Prefect markdown artifact summarizing results — nodes executed, statuses, and timings. You can optionally write a dbt-compatible run_results.json:
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    create_summary_artifact=True,  # default — set False to disable
    write_run_results=True,  # write run_results.json to target_path
)
include_compiled_code=True does not affect the summary artifact — the artifact is rendered from the results dict alone. The flag currently only adds compiled SQL to the asset descriptions attached to each MaterializingTask in PER_NODE mode.
Output locations:
- The summary artifact is created with key dbt-orchestrator-summary. Creation requires an active flow run context — if you call run_build() from a plain Python script (supported in PER_WAVE mode), the summary step is skipped even with create_summary_artifact=True.
- run_results.json is written inside settings.target_path (defaults to <project_dir>/target/run_results.json, but moves with target_path or a custom manifest_path).
To access the artifact programmatically, filter by key via ArtifactFilter — read_artifacts ignores unknown kwargs, so a bare key=... argument silently returns every artifact in the workspace:
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import ArtifactFilter, ArtifactFilterKey
async with get_client() as client:
    artifacts = await client.read_artifacts(
        artifact_filter=ArtifactFilter(key=ArtifactFilterKey(any_=["dbt-orchestrator-summary"])),
    )
The artifact key dbt-orchestrator-summary is fixed and not currently configurable.
Prefect asset lineage
In PER_NODE mode, each model, seed, and snapshot creates a Prefect asset and tracks upstream lineage automatically — integrating dbt’s data lineage into Prefect’s asset catalog.
Assets are named using the relation name (for example snowflake://my_db/analytics/stg_users). Upstream assets, including dbt sources, are tracked as asset dependencies.
To disable asset creation:
orchestrator = PrefectDbtOrchestrator(
    settings=settings,
    execution_mode=ExecutionMode.PER_NODE,
    disable_assets=True,
)
run_build() parameters
target
Override the dbt target at run time without changing profiles.yml:
results = orchestrator.run_build(
    select="tag:daily",
    target="prod",
)
extra_cli_args
Pass additional CLI flags directly to dbt invocations. Useful for flags not exposed as first-class orchestrator parameters:
results = orchestrator.run_build(
    select="tag:daily",
    extra_cli_args=["--vars", '{"run_date": "2024-01-15"}', "--no-partial-parse"],
)
Some flags are blocked because they conflict with the orchestrator’s internal logic; passing any of them raises ValueError: --select, --models, --exclude, --selector, --indirect-selection, --project-dir, --target-path, --profiles-dir, --log-level.
Others are first-class parameters — use the orchestrator API directly instead:
| CLI flag | Use instead |
|---|---|
| --full-refresh | run_build(full_refresh=True) |
| --target | run_build(target="...") |
| --threads | PrefectDbtOrchestrator(threads=N) |
| --defer | PrefectDbtOrchestrator(defer=True) |
| --state | PrefectDbtOrchestrator(state_path=Path(...)) |
| --defer-state | PrefectDbtOrchestrator(defer_state_path=...) |
| --favor-state | PrefectDbtOrchestrator(favor_state=True) |
Caveat flags (--resource-type, --exclude-resource-type, --fail-fast) are accepted but log a warning because they can silently interact with orchestrator-managed test scheduling.
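The three tiers of flag handling (blocked, first-class, caveat) can be sketched as a validation pass over extra_cli_args. The flag lists below come from this page; the helper itself is illustrative, not the actual prefect-dbt implementation:

```python
# Illustrative extra_cli_args validation: blocked flags raise, caveat flags warn.
import warnings

BLOCKED = {
    "--select", "--models", "--exclude", "--selector", "--indirect-selection",
    "--project-dir", "--target-path", "--profiles-dir", "--log-level",
}
CAVEAT = {"--resource-type", "--exclude-resource-type", "--fail-fast"}

def validate_extra_cli_args(args):
    for arg in args:
        flag = arg.split("=", 1)[0]  # handle "--flag=value" style too
        if flag in BLOCKED:
            raise ValueError(f"{flag} conflicts with orchestrator-managed selection")
        if flag in CAVEAT:
            warnings.warn(f"{flag} may interact with orchestrator test scheduling")
    return args

print(validate_extra_cli_args(["--vars", '{"run_date": "2024-01-15"}']))
```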
State-based execution (CI/CD)
Use dbt’s state flags to run only modified models and defer to production for everything else:
@flow
def run_ci_incremental():
    orchestrator = PrefectDbtOrchestrator(
        settings=PrefectDbtSettings(project_dir=Path("./analytics")),
        state_path=Path("./prod_artifacts"),
        defer=True,
        execution_mode=ExecutionMode.PER_WAVE,
    )
    return orchestrator.run_build(select="state:modified+")
Also supports defer_state_path and favor_state for more advanced deferral scenarios.
dbt Cloud execution
The DbtCloudExecutor lets you run dbt nodes via dbt Cloud ephemeral jobs instead of local dbt-core. The orchestrator creates a temporary job in dbt Cloud for each node (or wave), polls until completion, fetches results, and deletes the job — giving you all orchestrator features (retries, caching, test strategies, assets) backed by dbt Cloud’s managed infrastructure.
Credentials setup
Create or load a DbtCloudCredentials block with your dbt Cloud API key and account ID:
from prefect_dbt.cloud import DbtCloudCredentials
# Option 1: create inline
credentials = DbtCloudCredentials(
    api_key="dbc_your_api_key_here",
    account_id=12345,
)
# Option 2: load a saved block
credentials = DbtCloudCredentials.load("my-dbt-cloud-creds")
Quick start
from pathlib import Path
from prefect import flow
from prefect_dbt import PrefectDbtSettings
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudExecutor
from prefect_dbt.core._orchestrator import PrefectDbtOrchestrator, ExecutionMode
@flow
def run_dbt_cloud_build():
    credentials = DbtCloudCredentials.load("my-dbt-cloud-creds")
    executor = DbtCloudExecutor(
        credentials=credentials,
        project_id=123456,
        environment_id=654321,
        defer_to_job_id=789012,  # fetch manifest from this job's last run
    )
    orchestrator = PrefectDbtOrchestrator(
        executor=executor,
        settings=PrefectDbtSettings(
            project_dir=Path("./my_dbt_project"),
            profiles_dir=Path.home() / ".dbt",
        ),
        execution_mode=ExecutionMode.PER_NODE,
        concurrency=4,
        retries=2,
        retry_delay_seconds=30,
    )
    return orchestrator.run_build(select="tag:daily")
Even with a DbtCloudExecutor, the orchestrator still reads settings.project_dir and settings.profiles_dir for dbt build --select resolution, source-freshness handling, and artifact paths. Pass an explicit settings=PrefectDbtSettings(...) pointing at a local checkout of the same project — omitting it only works when the flow already runs inside a valid local dbt project with a usable default profile. The orchestrator’s state_path, defer, defer_state_path, and favor_state parameters, however, are specific to DbtCoreExecutor and have no effect when a custom executor is used.
Finding your dbt Cloud IDs
- account_id — Settings → Account Settings → Account ID (also visible in your dbt Cloud URL: cloud.getdbt.com/deploy/{account_id}/...).
- project_id — Deploy → Jobs → select any job → the URL contains /projects/{project_id}/.
- environment_id — Deploy → Environments → select the target environment → the URL contains /environments/{environment_id}.
- defer_to_job_id — Deploy → Jobs → select your production job → the URL ends with /jobs/{job_id}.
Parameter reference
| Parameter | Type | Default | Description |
|---|---|---|---|
| credentials | DbtCloudCredentials | required | Block providing api_key and account_id for dbt Cloud API authentication. |
| project_id | int | required | Numeric dbt Cloud project ID. |
| environment_id | int | required | Numeric dbt Cloud environment ID where ephemeral jobs run. |
| job_name_prefix | str | "prefect-orchestrator" | Prefix for ephemeral job names shown in the dbt Cloud UI. |
| timeout_seconds | int | 900 | Max seconds to poll a dbt Cloud run before raising TimeoutError. |
| poll_frequency_seconds | int | 10 | Seconds between status polls. |
| threads | int \| None | None | If set, appends --threads N to all dbt commands. |
| defer_to_job_id | int \| None | None | Fetch manifest from this job’s last successful run instead of compiling fresh. |
Manifest resolution
The executor needs a manifest.json to parse the dbt DAG. How it gets one depends on defer_to_job_id:
- defer_to_job_id is set — fetches manifest.json from the most recent successful run of that job via GET /jobs/{job_id}/artifacts/manifest.json. This is the recommended approach for production — point it at your production job to reuse its compiled manifest.
- defer_to_job_id is None — creates an ephemeral dbt compile job, runs it, fetches the resulting manifest, and deletes the job. Useful for CI or when you need a fresh manifest, but adds compilation time.
For the fastest builds, set defer_to_job_id to your production job. This avoids the ephemeral compile step and reuses the manifest your production environment already generates.
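The ephemeral-job lifecycle described above (create, trigger, poll until terminal or timeout, delete) reduces to a polling loop. A sketch against a stand-in client protocol, not the real dbt Cloud API client:

```python
# Illustrative ephemeral-job lifecycle with timeout-bounded polling.
import time

def run_ephemeral_job(client, commands, timeout_seconds=900,
                      poll_frequency_seconds=10, sleep=time.sleep):
    job_id = client.create_job(commands)
    try:
        run_id = client.trigger_run(job_id)
        waited = 0
        while waited <= timeout_seconds:
            status = client.run_status(run_id)
            if status in {"success", "error", "cancelled"}:
                return status
            sleep(poll_frequency_seconds)
            waited += poll_frequency_seconds
        raise TimeoutError(f"run {run_id} did not finish in {timeout_seconds}s")
    finally:
        client.delete_job(job_id)  # clean up even on timeout or error

class FakeClient:
    """Stand-in client that succeeds on the third poll."""
    def __init__(self):
        self.statuses = iter(["running", "running", "success"])
        self.deleted = False
    def create_job(self, commands):
        return 1
    def trigger_run(self, job_id):
        return 42
    def run_status(self, run_id):
        return next(self.statuses)
    def delete_job(self, job_id):
        self.deleted = True

fake = FakeClient()
print(run_ephemeral_job(fake, ["dbt compile"], sleep=lambda s: None))
```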
- Connection pooling. In PER_NODE mode, dbt adapter connections are automatically pooled and reused across sequential node invocations within each worker process, reducing connection overhead.
- Eager DAG scheduling. In PER_NODE mode, nodes are submitted to the task runner as soon as all their individual dependencies complete, rather than waiting for wave boundaries. This maximizes concurrency without any configuration changes.
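The payoff of eager scheduling is easiest to see with per-node durations: a node starts the moment its own parents finish, not when every node in the previous wave has. A small simulation, ignoring concurrency limits (start_times is an illustrative helper):

```python
# Illustrative eager-scheduling simulation: earliest start per node.
def start_times(deps, duration):
    """Earliest start time of each node when a node runs as soon as its parents finish."""
    finish = {}

    def finish_time(node):
        if node not in finish:
            ready_at = max((finish_time(p) for p in deps[node]), default=0.0)
            finish[node] = ready_at + duration[node]
        return finish[node]

    for node in deps:
        finish_time(node)
    return {node: finish[node] - duration[node] for node in deps}

deps = {
    "stg_users": set(),
    "stg_orders": set(),          # same wave as stg_users, but much slower
    "dim_users": {"stg_users"},
}
duration = {"stg_users": 1.0, "stg_orders": 10.0, "dim_users": 1.0}
starts = start_times(deps, duration)
# Under a strict wave barrier, dim_users would wait until stg_orders
# finishes at t=10; eagerly it starts at t=1, right after its only parent.
print(starts)
```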
Comparison with PrefectDbtRunner
| Feature | PrefectDbtRunner | PrefectDbtOrchestrator |
|---|---|---|
| Execution model | Reactive — dbt controls execution | Proactive — Prefect controls execution |
| Per-node retries | No | Yes (PER_NODE mode) |
| Concurrency control | dbt’s internal threading | Prefect-native (process pool or named limits) |
| Failure isolation | Entire build fails | Per-node or per-wave with downstream skipping |
| Test strategies | No | Yes (IMMEDIATE / DEFERRED / SKIP) |
| Cross-run caching | No | Yes (PER_NODE mode) |
| Asset lineage | Yes | Yes (PER_NODE mode) |
| Source freshness | No | Yes |
| Dry-run planning | No | Yes (plan() method) |
| dbt Cloud execution | No | Yes (DbtCloudExecutor) |
| Best for | Simple builds, maximum speed | Production reliability, CI/CD, complex DAGs |
Both coexist in the same project — PrefectDbtRunner is unchanged. See PrefectDbtRunner for its dedicated guide.