prefect_dbt.cli.commands

Module containing tasks and flows for interacting with dbt CLI

Functions

atrigger_dbt_cli_command

atrigger_dbt_cli_command(command: str, profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: Optional[str] = 'dbt-cli-command-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True) -> Optional[dbtRunnerResult]
Async task for running dbt commands. See trigger_dbt_cli_command for full docs.

trigger_dbt_cli_command

trigger_dbt_cli_command(command: str, profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: Optional[str] = 'dbt-cli-command-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True) -> Optional[dbtRunnerResult]
Task for running dbt commands. If no profiles.yml file is found, or if the overwrite_profiles flag is set to True, this first generates a profiles.yml file in the profiles_dir directory and then runs the dbt CLI command. Args:
  • command: The dbt command to be executed.
  • profiles_dir: The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR environment variable, but if that’s also not set, will use the default directory $HOME/.dbt/.
  • project_dir: The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.
  • overwrite_profiles: Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.
  • dbt_cli_profile: Profiles class containing the profile written to profiles.yml. Optional; passing it raises an error if profiles.yml already exists under profiles_dir and overwrite_profiles is set to False.
  • create_summary_artifact: If True, creates a Prefect artifact on the task run with the dbt results using the specified artifact key. Defaults to False.
  • summary_artifact_key: The key under which to store the dbt results artifact in Prefect. Defaults to ‘dbt-cli-command-summary’.
  • extra_command_args: Additional command arguments to pass to the dbt command. These arguments are appended to the command that is passed to the dbtRunner client. Example: extra_command_args=["--model", "foo_model"]
  • stream_output: If True, the output from the dbt command will be logged in Prefect as it happens. Defaults to True.
Returns:
  • The result from the dbt CLI invocation.
Examples: Execute dbt debug with a pre-populated profiles.yml.
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def trigger_dbt_cli_command_flow():
    result = trigger_dbt_cli_command("dbt debug")
    return result

trigger_dbt_cli_command_flow()
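
The description of extra_command_args above says the extra arguments are appended to the command before it reaches the dbtRunner client. A minimal sketch of that idea follows. The helper name build_cli_args and the use of shlex for splitting are assumptions made for illustration, not the library's actual implementation.

```python
import shlex
from typing import List, Optional


def build_cli_args(
    command: str, extra_command_args: Optional[List[str]] = None
) -> List[str]:
    """Hypothetical helper: split a command string and append extra arguments."""
    # Split the command the way a shell would, respecting quotes.
    args = shlex.split(command)
    # Extra arguments go after everything already present in the command.
    if extra_command_args:
        args.extend(extra_command_args)
    return args


cli_args = build_cli_args("dbt run", ["--model", "foo_model"])
print(cli_args)  # ['dbt', 'run', '--model', 'foo_model']
```

Passing the extras as a list avoids quoting problems that can arise when they are concatenated into the command string by hand.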

arun_dbt_build

arun_dbt_build(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-build-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Async version of run_dbt_build. See run_dbt_build for full documentation.

run_dbt_build

run_dbt_build(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-build-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Executes the ‘dbt build’ command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt build results. Args:
  • profiles_dir: The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that’s also not set, will use the default directory $HOME/.dbt/.
  • project_dir: The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.
  • overwrite_profiles: Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.
  • dbt_cli_profile: Profiles class containing the profile written to profiles.yml. Optional; passing it raises an error if profiles.yml already exists under profiles_dir and overwrite_profiles is set to False.
  • create_summary_artifact: If True, creates a Prefect artifact on the task run with the dbt build results using the specified artifact key. Defaults to False.
  • summary_artifact_key: The key under which to store the dbt build results artifact in Prefect. Defaults to ‘dbt-build-task-summary’.
  • extra_command_args: Additional command arguments to pass to the dbt build command.
  • stream_output: If True, the output from the dbt command will be logged in Prefect as it happens. Defaults to True.
Raises:
  • ValueError: If required dbt_cli_profile is not provided when needed for profile writing.
  • RuntimeError: If the dbt build fails for any reason, it will be indicated by the exception raised.
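
The lookup order described for profiles_dir (explicit argument, then the DBT_PROFILES_DIR environment variable, then $HOME/.dbt/) can be sketched as below. The function name resolve_profiles_dir and the example paths are hypothetical, and the library's own resolution code may differ in detail.

```python
import os
from pathlib import Path
from typing import Optional, Union


def resolve_profiles_dir(profiles_dir: Optional[Union[Path, str]] = None) -> Path:
    """Hypothetical helper mirroring the documented lookup order."""
    # 1. An explicitly passed directory wins.
    if profiles_dir is not None:
        return Path(profiles_dir).expanduser()
    # 2. Otherwise fall back to the DBT_PROFILES_DIR environment variable.
    env_dir = os.environ.get("DBT_PROFILES_DIR")
    if env_dir:
        return Path(env_dir).expanduser()
    # 3. Finally use the default directory under the user's home.
    return Path.home() / ".dbt"


# Illustrative paths only; nothing is read from or written to disk.
os.environ["DBT_PROFILES_DIR"] = "/tmp/env_profiles"
explicit = resolve_profiles_dir("/tmp/explicit_profiles")
from_env = resolve_profiles_dir()
del os.environ["DBT_PROFILES_DIR"]
fallback = resolve_profiles_dir()
```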

arun_dbt_model

arun_dbt_model(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-run-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Async version of run_dbt_model. See run_dbt_model for full documentation.

run_dbt_model

run_dbt_model(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-run-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Executes the ‘dbt run’ command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt model results. Args:
  • profiles_dir: The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that’s also not set, will use the default directory $HOME/.dbt/.
  • project_dir: The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.
  • overwrite_profiles: Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.
  • dbt_cli_profile: Profiles class containing the profile written to profiles.yml. Optional; passing it raises an error if profiles.yml already exists under profiles_dir and overwrite_profiles is set to False.
  • create_summary_artifact: If True, creates a Prefect artifact on the task run with the dbt model run results using the specified artifact key. Defaults to False.
  • summary_artifact_key: The key under which to store the dbt model run results artifact in Prefect. Defaults to ‘dbt-run-task-summary’.
  • extra_command_args: Additional command arguments to pass to the dbt run command.
  • stream_output: If True, the output from the dbt command will be logged in Prefect as it happens. Defaults to True.
Raises:
  • ValueError: If required dbt_cli_profile is not provided when needed for profile writing.
  • RuntimeError: If the dbt run fails for any reason, it will be indicated by the exception raised.
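
The interaction between overwrite_profiles, dbt_cli_profile, and an existing profiles.yml can be summarized as a small decision function. This is a sketch under stated assumptions: plan_profile_write is a hypothetical helper, the boolean has_dbt_cli_profile stands in for a real DbtCliProfile, and the use of ValueError for the conflict case is assumed (the text above only says an error is raised).

```python
import tempfile
from pathlib import Path


def plan_profile_write(profiles_dir, overwrite_profiles, has_dbt_cli_profile):
    """Return "write" or "use_existing", or raise ValueError on a conflict.

    Hypothetical helper, not part of prefect-dbt.
    """
    profiles_path = Path(profiles_dir) / "profiles.yml"
    if overwrite_profiles or not profiles_path.exists():
        # A new profiles.yml must be generated, so a profile is required.
        if not has_dbt_cli_profile:
            raise ValueError("dbt_cli_profile is required to write profiles.yml")
        return "write"
    # The file exists and must not be overwritten: a passed profile conflicts.
    if has_dbt_cli_profile:
        raise ValueError("profiles.yml exists and overwrite_profiles is False")
    return "use_existing"


with tempfile.TemporaryDirectory() as tmp:
    no_file = plan_profile_write(tmp, False, True)
    (Path(tmp) / "profiles.yml").write_text("placeholder: {}\n")
    existing = plan_profile_write(tmp, False, False)
    overwrite = plan_profile_write(tmp, True, True)
    try:
        plan_profile_write(tmp, False, True)
        conflict = None
    except ValueError as exc:
        conflict = str(exc)

print(no_file, existing, overwrite)  # write use_existing write
```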

arun_dbt_test

arun_dbt_test(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-test-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Async version of run_dbt_test. See run_dbt_test for full documentation.

run_dbt_test

run_dbt_test(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-test-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Executes the ‘dbt test’ command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt test results. Args:
  • profiles_dir: The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that’s also not set, will use the default directory $HOME/.dbt/.
  • project_dir: The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.
  • overwrite_profiles: Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.
  • dbt_cli_profile: Profiles class containing the profile written to profiles.yml. Optional; passing it raises an error if profiles.yml already exists under profiles_dir and overwrite_profiles is set to False.
  • create_summary_artifact: If True, creates a Prefect artifact on the task run with the dbt test results using the specified artifact key. Defaults to False.
  • summary_artifact_key: The key under which to store the dbt test results artifact in Prefect. Defaults to ‘dbt-test-task-summary’.
  • extra_command_args: Additional command arguments to pass to the dbt test command.
  • stream_output: If True, the output from the dbt command will be logged in Prefect as it happens. Defaults to True.
Raises:
  • ValueError: If required dbt_cli_profile is not provided when needed for profile writing.
  • RuntimeError: If the dbt test fails for any reason, it will be indicated by the exception raised.

arun_dbt_snapshot

arun_dbt_snapshot(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-snapshot-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Async version of run_dbt_snapshot. See run_dbt_snapshot for full documentation.

run_dbt_snapshot

run_dbt_snapshot(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-snapshot-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Executes the ‘dbt snapshot’ command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt snapshot results. Args:
  • profiles_dir: The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that’s also not set, will use the default directory $HOME/.dbt/.
  • project_dir: The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.
  • overwrite_profiles: Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.
  • dbt_cli_profile: Profiles class containing the profile written to profiles.yml. Optional; passing it raises an error if profiles.yml already exists under profiles_dir and overwrite_profiles is set to False.
  • create_summary_artifact: If True, creates a Prefect artifact on the task run with the dbt snapshot results using the specified artifact key. Defaults to False.
  • summary_artifact_key: The key under which to store the dbt snapshot results artifact in Prefect. Defaults to ‘dbt-snapshot-task-summary’.
  • extra_command_args: Additional command arguments to pass to the dbt snapshot command.
  • stream_output: If True, the output from the dbt command will be logged in Prefect as it happens. Defaults to True.
Raises:
  • ValueError: If required dbt_cli_profile is not provided when needed for profile writing.
  • RuntimeError: If the dbt snapshot fails for any reason, it will be indicated by the exception raised.

arun_dbt_seed

arun_dbt_seed(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-seed-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Async version of run_dbt_seed. See run_dbt_seed for full documentation.

run_dbt_seed

run_dbt_seed(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-seed-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Executes the ‘dbt seed’ command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt seed results. Args:
  • profiles_dir: The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that’s also not set, will use the default directory $HOME/.dbt/.
  • project_dir: The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.
  • overwrite_profiles: Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.
  • dbt_cli_profile: Profiles class containing the profile written to profiles.yml. Optional; passing it raises an error if profiles.yml already exists under profiles_dir and overwrite_profiles is set to False.
  • create_summary_artifact: If True, creates a Prefect artifact on the task run with the dbt seed results using the specified artifact key. Defaults to False.
  • summary_artifact_key: The key under which to store the dbt seed results artifact in Prefect. Defaults to ‘dbt-seed-task-summary’.
  • extra_command_args: Additional command arguments to pass to the dbt seed command.
  • stream_output: If True, the output from the dbt command will be logged in Prefect as it happens. Defaults to True.
Raises:
  • ValueError: If required dbt_cli_profile is not provided when needed for profile writing.
  • RuntimeError: If the dbt seed fails for any reason, it will be indicated by the exception raised.

arun_dbt_source_freshness

arun_dbt_source_freshness(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-source-freshness-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Async version of run_dbt_source_freshness. See run_dbt_source_freshness for full documentation.

run_dbt_source_freshness

run_dbt_source_freshness(profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, overwrite_profiles: bool = False, dbt_cli_profile: Optional[DbtCliProfile] = None, create_summary_artifact: bool = False, summary_artifact_key: str = 'dbt-source-freshness-task-summary', extra_command_args: Optional[List[str]] = None, stream_output: bool = True)
Executes the ‘dbt source freshness’ command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt source freshness results. Args:
  • profiles_dir: The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that’s also not set, will use the default directory $HOME/.dbt/.
  • project_dir: The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.
  • overwrite_profiles: Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.
  • dbt_cli_profile: Profiles class containing the profile written to profiles.yml. Optional; passing it raises an error if profiles.yml already exists under profiles_dir and overwrite_profiles is set to False.
  • create_summary_artifact: If True, creates a Prefect artifact on the task run with the dbt source freshness results using the specified artifact key. Defaults to False.
  • summary_artifact_key: The key under which to store the dbt source freshness results artifact in Prefect. Defaults to ‘dbt-source-freshness-task-summary’.
  • extra_command_args: Additional command arguments to pass to the dbt source freshness command.
  • stream_output: If True, the output from the dbt command will be logged in Prefect as it happens. Defaults to True.
Raises:
  • ValueError: If required dbt_cli_profile is not provided when needed for profile writing.
  • RuntimeError: If the dbt source freshness command fails for any reason, it will be indicated by the exception raised.

create_summary_markdown

create_summary_markdown(run_results: dict[str, Any], command: str) -> str
Creates a Prefect task artifact summarizing the results of the above predefined prefect-dbt task.

consolidate_run_results

consolidate_run_results(results: dbtRunnerResult) -> dict

Classes

DbtCoreOperation

A block representing a dbt operation, containing multiple dbt and shell commands. For long-lasting operations, use the trigger method and use the block as a context manager so that processes are closed automatically when the context exits; otherwise, call the close method manually to close processes. For short-lasting operations, use the run method, which manages the context automatically. Attributes:
  • commands: A list of commands to execute sequentially.
  • stream_output: Whether to stream output.
  • env: A dictionary of environment variables to set for the shell operation.
  • working_dir: The working directory context the commands will be executed within.
  • shell: The shell to use to execute the commands.
  • extension: The extension to use for the temporary file. If unset, defaults to .ps1 on Windows and .sh on other platforms.
  • profiles_dir: The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the dbt commands provided. If this is not set, will try using the DBT_PROFILES_DIR environment variable, but if that’s also not set, will use the default directory $HOME/.dbt/.
  • project_dir: The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.
  • overwrite_profiles: Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.
  • dbt_cli_profile: Profiles class containing the profile written to profiles.yml. Optional; passing it raises an error if profiles.yml already exists under profiles_dir and overwrite_profiles is set to False.
Examples: Load a configured block.
from prefect_dbt import DbtCoreOperation

dbt_op = DbtCoreOperation.load("BLOCK_NAME")
Execute short-lasting dbt debug and list with a custom DbtCliProfile.
from prefect_dbt import DbtCoreOperation, DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake import SnowflakeConnector

# Called without await so the example runs in a synchronous script.
snowflake_connector = SnowflakeConnector.load("snowflake-connector")
target_configs = SnowflakeTargetConfigs(connector=snowflake_connector)
dbt_cli_profile = DbtCliProfile(
    name="jaffle_shop",
    target="dev",
    target_configs=target_configs,
)
dbt_init = DbtCoreOperation(
    commands=["dbt debug", "dbt list"],
    dbt_cli_profile=dbt_cli_profile,
    overwrite_profiles=True
)
dbt_init.run()
Execute a longer-lasting dbt run as a context manager.
from prefect_dbt import DbtCoreOperation

with DbtCoreOperation(commands=["dbt run"]) as dbt_run:
    dbt_process = dbt_run.trigger()
    # do other things
    dbt_process.wait_for_completion()
    dbt_output = dbt_process.fetch_result()
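
The trigger-then-close pattern described for long-lasting operations can be illustrated without dbt installed. The sketch below uses a hypothetical stand-in class, SketchOperation, built on Python's subprocess module. It is not DbtCoreOperation and shares none of its internals; it only shows why the context manager form guarantees that started processes are cleaned up.

```python
import subprocess
import sys


class SketchOperation:
    """Hypothetical stand-in for a block that launches a command as a process."""

    def __init__(self, command):
        self.command = command  # argv list, e.g. ["python", "-c", "..."]
        self._processes = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Leaving the with-block always closes whatever was triggered.
        self.close()

    def trigger(self):
        # Start the command without blocking and remember it for cleanup.
        proc = subprocess.Popen(self.command, stdout=subprocess.PIPE, text=True)
        self._processes.append(proc)
        return proc

    def close(self):
        for proc in self._processes:
            if proc.poll() is None:
                proc.terminate()  # still running: ask it to stop
            proc.wait()
            if proc.stdout is not None:
                proc.stdout.close()
        self._processes.clear()


with SketchOperation([sys.executable, "-c", "print('done')"]) as op:
    process = op.trigger()
    # Other work could happen here while the process runs.
    output, _ = process.communicate()

lines = output.splitlines()
print(lines)  # ['done']
```

Without the with-block, the caller would need to invoke close() explicitly, which mirrors the manual close call mentioned in the class description.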