With prefect-dbt, you can trigger and observe dbt Cloud jobs, execute dbt Core CLI commands, and incorporate other tools, such as Snowflake, into your dbt runs.
Prefect provides a global view of the state of your workflows and allows you to take action based on state changes.
Prefect integrations may provide pre-built blocks, flows, or tasks for interacting with external systems.
Block types in this library allow you to do things such as run a dbt Cloud job or execute a dbt Core command.
Getting started
Prerequisites
- A dbt Cloud account if using dbt Cloud.
Install prefect-dbt
The following command installs a version of prefect-dbt compatible with your installed version of prefect: pip install prefect-dbt
If you don’t already have prefect installed, it will install the newest version of prefect as well.
To upgrade to the latest versions of both prefect and prefect-dbt, run: pip install -U prefect prefect-dbt
Register newly installed block types
Register the block types in the prefect-dbt module to make them available for use by running: prefect block register -m prefect_dbt
dbt Cloud
If you have an existing dbt Cloud job, use the pre-built flow run_dbt_cloud_job to trigger a job run and wait until the job run is finished. If some nodes fail, run_dbt_cloud_job can efficiently retry the unsuccessful nodes. Prior to running this flow, save your dbt Cloud credentials to a DbtCloudCredentials block and create a dbt Cloud Job block:
Save dbt Cloud credentials to a block
Blocks can be created through code or through the UI. To create a dbt Cloud Credentials block:
- Log into your dbt Cloud account.
- Click API Tokens on the sidebar.
- Copy a Service Token.
- Copy the account ID from the URL: https://cloud.getdbt.com/settings/accounts/<ACCOUNT_ID>
- Create and run the following script, replacing the placeholders:
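A minimal sketch of that script (the API key, account ID, and block name are placeholders):

```python
from prefect_dbt.cloud import DbtCloudCredentials

# Replace the placeholders with your dbt Cloud service token and account ID
DbtCloudCredentials(
    api_key="API-KEY-PLACEHOLDER",
    account_id="ACCOUNT-ID-PLACEHOLDER",
).save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
```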
Create a dbt Cloud job block
- In dbt Cloud, click on Deploy -> Jobs.
- Select a job.
- Copy the job ID from the URL: https://cloud.getdbt.com/deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>
- Create and run the following script, replacing the placeholders:
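A sketch of that script, assuming the credentials block saved above and a placeholder job ID:

```python
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import DbtCloudJob

# Load the previously saved credentials block
dbt_cloud_credentials = DbtCloudCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

# Save a job block that points at an existing dbt Cloud job
DbtCloudJob(
    dbt_cloud_credentials=dbt_cloud_credentials,
    job_id="JOB-ID-PLACEHOLDER",
).save("JOB-BLOCK-NAME-PLACEHOLDER")
```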
Run a dbt Cloud job and wait for completion
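A sketch of a flow that loads the job block, triggers the run, and waits for it to finish (the block name is a placeholder):

```python
import asyncio

from prefect import flow
from prefect_dbt.cloud.jobs import DbtCloudJob, run_dbt_cloud_job

@flow
async def run_dbt_job_flow():
    # Load the saved job block, trigger the job run, and wait for completion
    dbt_cloud_job = await DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER")
    return await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job)

if __name__ == "__main__":
    asyncio.run(run_dbt_job_flow())
```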
dbt Core
prefect-dbt 0.7.0 and later
Versions 0.7.0 and later of prefect-dbt include the PrefectDbtRunner class, which provides an improved interface for running dbt Core commands with better logging, failure handling, and automatic asset lineage.
The PrefectDbtRunner is inspired by the DbtRunner from dbt Core, and its invoke method accepts the same arguments.
Refer to the DbtRunner documentation for more information on how to call invoke.
When you call .invoke() in a flow or task, each node in dbt’s execution graph is reflected as a task in Prefect’s execution graph.
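For example, a minimal flow that invokes dbt build through the runner (this sketch assumes your dbt project and profiles are discoverable from the working directory or environment variables):

```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner

@flow
def run_dbt():
    # invoke accepts the same arguments as dbt Core's DbtRunner.invoke
    PrefectDbtRunner().invoke(["build"])

if __name__ == "__main__":
    run_dbt()
```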
Logs from each node will belong to the corresponding task, and each task’s state is determined by the state of that node’s execution.
The task runs created by calling .invoke() run separately from dbt Core, and do not affect dbt’s execution behavior.
These tasks do not persist results and cannot be cached.
Use dbt’s native retry functionality in combination with runtime data from Prefect to retry failed nodes.
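One possible pattern, sketched here with Prefect flow retries, Prefect’s flow_run runtime data, and dbt’s retry command; this combination is an illustration under those assumptions, not the only approach:

```python
from prefect import flow
from prefect.runtime import flow_run
from prefect_dbt import PrefectDbtRunner

@flow(retries=2)
def run_dbt():
    runner = PrefectDbtRunner()
    if flow_run.run_count > 1:
        # On a flow retry, let dbt's own retry command re-run only the
        # nodes that failed in the previous invocation.
        runner.invoke(["retry"])
    else:
        runner.invoke(["build"])
```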
Assets
Prefect Cloud maintains a graph of assets: objects produced by your workflows. Any dbt seed, source, or model will appear on your asset graph in Prefect Cloud once it has been executed using the PrefectDbtRunner.
The upstream dependencies of an asset materialized by prefect-dbt are derived from the depends_on field in dbt’s manifest.json.
The asset’s key will be its corresponding dbt resource’s relation_name.
The name and description asset properties are populated from a dbt resource’s name and description.
The owners asset property is populated if data is assigned to the owner key under a resource’s meta config.
dbt settings
The PrefectDbtSettings class, based on Pydantic’s BaseSettings class, automatically detects DBT_-prefixed environment variables that have a direct effect on the PrefectDbtRunner class.
If no environment variables are set, dbt’s defaults are used.
Provide a PrefectDbtSettings instance to PrefectDbtRunner to customize dbt settings or override environment variables.
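A sketch, assuming a settings keyword argument and placeholder directories:

```python
from pathlib import Path

from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings

@flow
def run_dbt():
    settings = PrefectDbtSettings(
        project_dir=Path("my_dbt_project"),  # assumed project location
        profiles_dir=Path.home() / ".dbt",   # dbt's default profiles directory
    )
    PrefectDbtRunner(settings=settings).invoke(["build"])
```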
Logging
The PrefectDbtRunner class maps all dbt log levels to standard Python logging levels, so filtering for log levels like WARNING or ERROR in the Prefect UI applies to dbt’s logs.
By default, the logging level used by dbt is Prefect’s logging level, which can be configured using the PREFECT_LOGGING_LEVEL Prefect setting.
The dbt logging level can be set independently from Prefect’s by using the DBT_LOG_LEVEL environment variable, setting log_level in PrefectDbtSettings, or passing the --log-level flag or log_level kwarg to .invoke().
Only logging levels of higher severity (more restrictive) than Prefect’s logging level will have an effect.
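For example, a sketch that passes dbt’s own --log-level flag through invoke (warn is just an example level):

```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner

@flow
def run_dbt():
    # dbt's --log-level flag is forwarded as-is; for this invocation only
    # warning-and-above dbt events are emitted.
    PrefectDbtRunner().invoke(["build", "--log-level", "warn"])
```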
profiles.yml templating
The PrefectDbtRunner class supports templating in your profiles.yml file, allowing you to reference Prefect blocks and variables that will be resolved at runtime.
This enables you to store sensitive credentials securely using Prefect blocks, and configure different targets based on the Prefect workspace.
For example, a Prefect variable called target can have a different value in development (dev) and production (prod) workspaces.
This allows you to use the same profiles.yml file to automatically reference a local DuckDB instance in development and a Snowflake instance in production.
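As a sketch of the supporting pieces, the Prefect resources such a template refers to could be created like this; the block name, variable name, and template keys shown in the comments are assumptions based on Prefect’s block and variable templating syntax:

```python
from prefect.blocks.system import Secret
from prefect.variables import Variable

# Store a credential securely in a block. In profiles.yml it could be referenced
# through Prefect's block templating, e.g. "{{ prefect.blocks.secret.snowflake-password }}"
# (the template keys here are assumptions).
Secret(value="SNOWFLAKE-PASSWORD-PLACEHOLDER").save("snowflake-password", overwrite=True)

# A workspace-specific variable, e.g. referenced as "{{ prefect.variables.target }}".
Variable.set("target", "dev", overwrite=True)
```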
Failure handling
By default, any dbt node execution failures cause the entire dbt run to raise an exception with a message containing detailed information about the failure.
PrefectDbtRunner’s raise_on_failure option can be set to False to prevent failures in dbt from causing the failure of the flow or task in which .invoke() is called.
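For example:

```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner

@flow
def run_dbt():
    # dbt node failures are recorded on the corresponding task runs,
    # but they do not fail this flow.
    PrefectDbtRunner(raise_on_failure=False).invoke(["build"])
```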
Native dbt configuration
You can disable automatic asset lineage detection for all resources in your dbt project config, or for specific resources in their own config.
prefect-dbt 0.6.6 and earlier
prefect-dbt supports a couple of ways to run dbt Core commands.
A DbtCoreOperation block will run the commands as shell commands, while other tasks use dbt’s Programmatic Invocation.
Optionally, specify the project_dir.
If profiles_dir is not set, the DBT_PROFILES_DIR environment variable will be used.
If DBT_PROFILES_DIR is not set, the default directory $HOME/.dbt/ will be used.
Use an existing profile
If you have an existing dbt profiles.yml file, specify the profiles_dir where the file is located:
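A sketch with placeholder directories:

```python
from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def trigger_dbt_flow() -> str:
    # Run the dbt commands as shell commands against an existing profiles.yml
    result = DbtCoreOperation(
        commands=["pwd", "dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER",
    ).run()
    return result

if __name__ == "__main__":
    trigger_dbt_flow()
```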
Alternatively, you can supply a profiles.yml with a DbtCliProfile block, as described below.
Use environment variables with Prefect secret blocks
If you use environment variables in profiles.yml, set a Prefect Secret block as an environment variable:
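For example, assuming a Secret block named DBT-PASSWORD-PLACEHOLDER and an environment variable that your profiles.yml reads:

```python
import os

from prefect.blocks.system import Secret

# Load the secret value from the block and expose it to dbt as an
# environment variable (profiles.yml can read it with dbt's env_var function).
secret_block = Secret.load("DBT-PASSWORD-PLACEHOLDER")
os.environ["DBT_PASSWORD"] = secret_block.get()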
Your profiles.yml file can then access that variable.
Create a new profiles.yml file with blocks
If you don’t have a profiles.yml file, you can use a DbtCliProfile block to create profiles.yml.
Then, specify profiles_dir where profiles.yml will be written.
Here’s example code with placeholders:
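A sketch using Snowflake target configs; this assumes prefect-snowflake is installed, and all values are placeholders:

```python
from prefect import flow
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector

@flow
def trigger_dbt_flow():
    credentials = SnowflakeCredentials(
        user="USER-PLACEHOLDER",
        password="PASSWORD-PLACEHOLDER",
        account="ACCOUNT-PLACEHOLDER",
        role="ROLE-PLACEHOLDER",
    )
    connector = SnowflakeConnector(
        schema="SCHEMA-PLACEHOLDER",
        database="DATABASE-PLACEHOLDER",
        warehouse="WAREHOUSE-PLACEHOLDER",
        credentials=credentials,
    )
    target_configs = SnowflakeTargetConfigs(connector=connector)
    dbt_cli_profile = DbtCliProfile(
        name="PROFILE-NAME-PLACEHOLDER",
        target="TARGET-NAME-PLACEHOLDER",
        target_configs=target_configs,
    )
    # profiles.yml is written to profiles_dir from the DbtCliProfile block
    result = DbtCoreOperation(
        commands=["dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER",
        dbt_cli_profile=dbt_cli_profile,
    ).run()
    return result

if __name__ == "__main__":
    trigger_dbt_flow()
```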
Supplying the dbt_cli_profile argument will overwrite existing profiles.yml files. If you already have a profiles.yml file in the specified profiles_dir, the file will be overwritten. If you do not specify a profiles directory, the profiles.yml at ~/.dbt/ would be overwritten.
prefect-dbt provides service-specific TargetConfigs blocks, such as SnowflakeTargetConfigs and BigQueryTargetConfigs.
If the desired service profile is not available, you can build one from the generic TargetConfigs class.
Programmatic Invocation
prefect-dbt has some pre-built tasks that use dbt’s programmatic invocation.
For example:
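A sketch using the trigger_dbt_cli_command task (the project directory is a placeholder):

```python
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def dbt_test_flow():
    # Run a single dbt CLI command through dbt's programmatic invocation
    return trigger_dbt_cli_command(
        command="dbt test",
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
    )

if __name__ == "__main__":
    dbt_test_flow()
```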
Create a summary artifact
These pre-built tasks can also create artifacts. These artifacts have extra information about dbt Core runs, such as messages and compiled code for nodes that fail or have errors.
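A sketch that enables a summary artifact; the run_dbt_build task and its artifact-related parameters are assumptions here, and all names are placeholders:

```python
from prefect import flow
from prefect_dbt.cli.commands import run_dbt_build  # assumed pre-built task

@flow
def dbt_build_flow():
    run_dbt_build(
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        create_summary_artifact=True,                    # assumed parameter
        summary_artifact_key="dbt-build-task-summary",   # assumed parameter
    )

if __name__ == "__main__":
    dbt_build_flow()
```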
BigQuery CLI profile block example
To create dbt Core target config and profile blocks for BigQuery:
- Save and load a GcpCredentials block.
- Determine the schema / dataset you want to use in BigQuery.
- Create a short script, replacing the placeholders (see the first sketch after this list).
- Determine the dbt commands you want to run.
- Create a short script, replacing the placeholders (see the second sketch after this list).
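A sketch of the first script, which creates and saves the target configs and profile blocks; all names are placeholders, and it assumes prefect-gcp is installed with a GcpCredentials block already saved:

```python
from prefect_dbt.cli import BigQueryTargetConfigs, DbtCliProfile
from prefect_gcp.credentials import GcpCredentials

credentials = GcpCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

target_configs = BigQueryTargetConfigs(
    schema="SCHEMA-NAME-PLACEHOLDER",  # also known as dataset
    credentials=credentials,
)
target_configs.save("TARGET-CONFIGS-BLOCK-NAME-PLACEHOLDER")

dbt_cli_profile = DbtCliProfile(
    name="PROFILE-NAME-PLACEHOLDER",
    target="TARGET-NAME-PLACEHOLDER",
    target_configs=target_configs,
)
dbt_cli_profile.save("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
```

And a sketch of the second script, which loads the saved profile block and runs your dbt commands through a DbtCoreOperation:

```python
from prefect import flow
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation

@flow
def trigger_dbt_flow():
    dbt_cli_profile = DbtCliProfile.load("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
    return DbtCoreOperation(
        commands=["dbt debug", "dbt run"],  # replace with your dbt commands
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER",
        dbt_cli_profile=dbt_cli_profile,
        overwrite_profiles=True,
    ).run()

if __name__ == "__main__":
    trigger_dbt_flow()
```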
Resources
For assistance using dbt, consult the dbt documentation. Refer to the prefect-dbt SDK documentation to explore all the capabilities of the prefect-dbt library.