prefect-dbt
With prefect-dbt, you can trigger and observe dbt Cloud jobs, execute dbt Core CLI commands, and incorporate other tools, such as Snowflake, into your dbt runs.
Prefect provides a global view of the state of your workflows and allows you to take action based on state changes.
Getting started
Prerequisites
- A dbt Cloud account if using dbt Cloud.
Install prefect-dbt
The following command will install a version of prefect-dbt compatible with your installed version of prefect. If you don’t already have prefect installed, it will install the newest version of prefect as well.
pip install "prefect[dbt]"
Upgrade to the latest versions of prefect and prefect-dbt:
pip install -U "prefect[dbt]"
If necessary, see additional installation options for dbt Core with BigQuery, Snowflake, and Postgres.
To install with all additional capabilities, use the following command:
pip install -U "prefect-dbt[all_extras]"
Register newly installed block types
Register the block types in the prefect-dbt module to make them available for use.
prefect block register -m prefect_dbt
dbt Cloud
If you have an existing dbt Cloud job, use the pre-built flow run_dbt_cloud_job to trigger a job run and wait until the job run is finished. If some nodes fail, run_dbt_cloud_job efficiently retries the unsuccessful nodes.
Prior to running this flow, save your dbt Cloud credentials to a DbtCloudCredentials block:
from prefect import flow
from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job
@flow
def run_dbt_job_flow():
    result = run_dbt_cloud_job(
        dbt_cloud_job=DbtCloudJob.load("my-block-name"),
        targeted_retries=5,
    )
    return result

run_dbt_job_flow()
Save dbt Cloud credentials to a block
Blocks can be created through code or through the UI.
To create a dbt Cloud Credentials block:
- Go to your dbt Cloud profile.
- Log in to your dbt Cloud account.
- Scroll to API or click API Access on the sidebar.
- Copy the API Key.
- Click Projects on the sidebar.
- Copy the account ID from the URL: https://cloud.getdbt.com/settings/accounts/<ACCOUNT_ID>.
- Create and run the following script, replacing the placeholders:
from prefect_dbt.cloud import DbtCloudCredentials
DbtCloudCredentials(
    api_key="API-KEY-PLACEHOLDER",
    account_id="ACCOUNT-ID-PLACEHOLDER"
).save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
Then, create a dbt Cloud job block:
- Navigate to your dbt home page.
- On the top nav bar, click on Deploy -> Jobs.
- Select a job.
- Copy the job ID from the URL:
https://cloud.getdbt.com/deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>
- Create and run the following script, replacing the placeholders.
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
dbt_cloud_credentials = DbtCloudCredentials.load("CREDENTIALS-BLOCK-PLACEHOLDER")
dbt_cloud_job = DbtCloudJob(
    dbt_cloud_credentials=dbt_cloud_credentials,
    job_id="JOB-ID-PLACEHOLDER"
).save("JOB-BLOCK-NAME-PLACEHOLDER")
Load the saved block, which can access your credentials:
from prefect_dbt.cloud import DbtCloudJob
DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER")
dbt Core
Prefect-dbt supports execution of dbt Core CLI commands.
If you don’t have a DbtCoreOperation block saved, create one and set the commands that you want to run. Optionally, specify the project_dir. If profiles_dir is not set, the DBT_PROFILES_DIR environment variable will be used. If DBT_PROFILES_DIR is not set, the default directory $HOME/.dbt/ will be used.
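For example, here is a minimal sketch (with a hypothetical flow name; directories are placeholders) that relies on the DBT_PROFILES_DIR environment variable instead of passing profiles_dir explicitly:
import os

from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def dbt_debug_flow() -> str:
    # Point dbt at an existing profiles.yml through the environment variable
    # rather than the profiles_dir argument.
    os.environ["DBT_PROFILES_DIR"] = "PROFILES-DIRECTORY-PLACEHOLDER"
    result = DbtCoreOperation(
        commands=["dbt debug"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
    ).run()
    return result

if __name__ == "__main__":
    dbt_debug_flow()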
Use an existing profile
If you have an existing dbt profiles.yml file, specify the profiles_dir where the file is located:
from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation
@flow
def trigger_dbt_flow() -> str:
    result = DbtCoreOperation(
        commands=["pwd", "dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER"
    ).run()
    return result

if __name__ == "__main__":
    trigger_dbt_flow()
If you are already using Prefect blocks such as the Snowflake Connector block, you can use those blocks to create a new profiles.yml with a DbtCliProfile block.
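For example, here is a minimal sketch of that pattern, assuming prefect-dbt[snowflake] and prefect-snowflake are installed and a Snowflake Connector block has already been saved (all block, profile, and target names below are placeholders):
from prefect_dbt.cli import DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake.database import SnowflakeConnector

# Reuse an existing Snowflake Connector block
connector = SnowflakeConnector.load("SNOWFLAKE-CONNECTOR-BLOCK-PLACEHOLDER")

# Build dbt target configs from the connector
target_configs = SnowflakeTargetConfigs(
    connector=connector,
    extras={"retry_on_database_errors": True},
)

# Save a dbt CLI profile that can render a new profiles.yml
dbt_cli_profile = DbtCliProfile(
    name="PROFILE-NAME-PLACEHOLDER",
    target="TARGET-NAME-PLACEHOLDER",
    target_configs=target_configs,
)
dbt_cli_profile.save("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
The saved profile can then be passed to DbtCoreOperation through the dbt_cli_profile argument, as shown in "Create a new profile with blocks" below.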
Use environment variables with Prefect secret blocks
If you use environment variables in profiles.yml, set a Prefect Secret block as an environment variable:
import os
from prefect.blocks.system import Secret
secret_block = Secret.load("DBT_PASSWORD_PLACEHOLDER")
# Access the stored secret
DBT_PASSWORD = secret_block.get()
os.environ["DBT_PASSWORD"] = DBT_PASSWORD
This example profiles.yml file could then access that variable:
profile:
  target: prod
  outputs:
    prod:
      type: postgres
      host: 127.0.0.1
      # IMPORTANT: Make sure to quote the entire Jinja string here
      user: dbt_user
      password: "{{ env_var('DBT_PASSWORD') }}"
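As a minimal sketch (the flow name is hypothetical; directories are placeholders), a flow can load the Secret block, export the environment variable, and then run dbt against this profile:
import os

from prefect import flow
from prefect.blocks.system import Secret
from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def dbt_run_with_secret_flow() -> str:
    # Expose the stored password so dbt's env_var() lookup can resolve it
    os.environ["DBT_PASSWORD"] = Secret.load("DBT_PASSWORD_PLACEHOLDER").get()
    result = DbtCoreOperation(
        commands=["dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER",  # directory containing the profiles.yml above
    ).run()
    return result

if __name__ == "__main__":
    dbt_run_with_secret_flow()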
Programmatic Invocation
prefect-dbt has some pre-built tasks that use dbt’s programmatic invocation.
For example:
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command, dbt_build_task

@flow
def dbt_build_flow():
    trigger_dbt_cli_command(
        command="dbt deps", project_dir="/Users/test/my_dbt_project_dir",
    )
    dbt_build_task(
        project_dir="/Users/test/my_dbt_project_dir",
        create_summary_artifact=True,
        summary_artifact_key="dbt-build-task-summary",
        extra_command_args=["--select", "foo_model"]
    )

if __name__ == "__main__":
    dbt_build_flow()
See the SDK docs for other pre-built tasks.
Create a summary artifact
These pre-built tasks can also create artifacts. These artifacts have extra information about dbt Core runs, such as messages and compiled code for nodes that fail or have errors.
Create a new profile with blocks
Use a DbtCliProfile block to create profiles.yml. Then, specify the profiles_dir where profiles.yml will be written. Here’s example code with placeholders:
from prefect import flow
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation
@flow
def trigger_dbt_flow():
    dbt_cli_profile = DbtCliProfile.load("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
    with DbtCoreOperation(
        commands=["dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER",
        dbt_cli_profile=dbt_cli_profile,
    ) as dbt_operation:
        dbt_process = dbt_operation.trigger()
        # do other things before waiting for completion
        dbt_process.wait_for_completion()
        result = dbt_process.fetch_result()
    return result

if __name__ == "__main__":
    trigger_dbt_flow()
Supplying the dbt_cli_profile argument will overwrite existing profiles.yml files
If you already have a profiles.yml file in the specified profiles_dir, the file will be overwritten. If you do not specify a profiles directory, the profiles.yml at ~/.dbt/ will be overwritten.
Visit the SDK reference in the side navigation to see other built-in TargetConfigs blocks.
If the desired service profile is not available, you can build one from the generic TargetConfigs class.
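For example, here is a minimal sketch of a generic TargetConfigs block for a Postgres-style target (all names and values are placeholders; adapter-specific fields go in extras):
from prefect_dbt.cli import DbtCliProfile
from prefect_dbt.cli.configs import TargetConfigs

# Generic target configs for a warehouse without a dedicated block type
target_configs = TargetConfigs(
    type="postgres",
    schema="SCHEMA-NAME-PLACEHOLDER",
    threads=4,
    extras={
        "host": "HOST-PLACEHOLDER",
        "user": "USER-PLACEHOLDER",
        "password": "PASSWORD-PLACEHOLDER",
        "port": 5432,
        "dbname": "DATABASE-NAME-PLACEHOLDER",
    },
)
target_configs.save("TARGET-CONFIGS-BLOCK-NAME-PLACEHOLDER")

dbt_cli_profile = DbtCliProfile(
    name="PROFILE-NAME-PLACEHOLDER",
    target="TARGET-NAME-PLACEHOLDER",
    target_configs=target_configs,
)
dbt_cli_profile.save("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
If you installed prefect-dbt[postgres], a dedicated PostgresTargetConfigs block is also available; the generic form above is shown only to illustrate the TargetConfigs class.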
BigQuery profile example
To create dbt Core target config and profile blocks for BigQuery:
- Save and load a GcpCredentials block.
- Determine the schema / dataset you want to use in BigQuery.
- Create a short script, replacing the placeholders.
from prefect_gcp.credentials import GcpCredentials
from prefect_dbt.cli import BigQueryTargetConfigs, DbtCliProfile
credentials = GcpCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
target_configs = BigQueryTargetConfigs(
    schema="SCHEMA-NAME-PLACEHOLDER",  # also known as dataset
    credentials=credentials,
)
target_configs.save("TARGET-CONFIGS-BLOCK-NAME-PLACEHOLDER")

dbt_cli_profile = DbtCliProfile(
    name="PROFILE-NAME-PLACEHOLDER",
    target="TARGET-NAME-PLACEHOLDER",
    target_configs=target_configs,
)
dbt_cli_profile.save("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
To create a dbt Core operation block:
- Determine the dbt commands you want to run.
- Create a short script, replacing the placeholders.
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation
dbt_cli_profile = DbtCliProfile.load("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
dbt_core_operation = DbtCoreOperation(
    commands=["DBT-CLI-COMMANDS-PLACEHOLDER"],
    dbt_cli_profile=dbt_cli_profile,
    overwrite_profiles=True,
)
dbt_core_operation.save("DBT-CORE-OPERATION-BLOCK-NAME-PLACEHOLDER")
Load the saved block that holds your credentials:
from prefect_dbt.cli import DbtCoreOperation
DbtCoreOperation.load("DBT-CORE-OPERATION-BLOCK-NAME-PLACEHOLDER")
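As a usage sketch (the flow name is hypothetical), the loaded block can be run inside a flow to execute its configured commands:
from prefect import flow
from prefect_dbt.cli import DbtCoreOperation

@flow
def run_saved_dbt_commands():
    # Executes the commands configured on the saved block
    dbt_core_operation = DbtCoreOperation.load("DBT-CORE-OPERATION-BLOCK-NAME-PLACEHOLDER")
    return dbt_core_operation.run()

if __name__ == "__main__":
    run_saved_dbt_commands()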
Resources
For assistance using dbt, consult the dbt documentation.
Refer to the prefect-dbt SDK documentation linked in the sidebar to explore all the capabilities of the prefect-dbt library.
Additional installation options
Additional installation options for dbt Core with BigQuery, Snowflake, and Postgres are shown below.
Additional capabilities for dbt Core and Snowflake profiles
First install the main library compatible with your Prefect version:
pip install "prefect[dbt]"
Then install the additional capabilities you need.
pip install "prefect-dbt[snowflake]"
Additional capabilities for dbt Core and BigQuery profiles
pip install "prefect-dbt[bigquery]"
Additional capabilities for dbt Core and Postgres profiles
pip install "prefect-dbt[postgres]"