# DBT Tasks

These tasks have been tested and verified by Prefect.

This module contains a task for interacting with dbt via the shell.

# DbtShellTask

class

prefect.tasks.dbt.dbt.DbtShellTask

(command=None, profile_name=None, env=None, environment=None, overwrite_profiles=False, profiles_dir=None, set_profiles_envar=True, dbt_kwargs=None, helper_script=None, shell="bash", return_all=False, log_stderr=False, **kwargs)[source]

Task for running dbt commands. Before running the command, it creates a profiles.yml file if none exists or if overwrite_profiles is True.

This task inherits all configuration options from the ShellTask.

Args:

  • command (string, optional): dbt command to be executed; can also be provided post-initialization by calling this task instance
  • dbt_kwargs (dict, optional): keyword arguments used to populate the profiles.yml file (e.g. {'type': 'snowflake', 'threads': 4, 'account': '...'}); can also be provided at runtime
  • env (dict, optional): dictionary of environment variables to use for the subprocess; can also be provided at runtime
  • environment (string, optional): The default target your dbt project will use
  • overwrite_profiles (boolean, optional): flag to indicate whether existing profiles.yml file should be overwritten; defaults to False
  • profile_name (string, optional): Profile name used for populating the profile name of profiles.yml
  • profiles_dir (string, optional): path to the directory that will contain the profiles.yml file
  • set_profiles_envar (boolean, optional): flag to indicate whether DBT_PROFILES_DIR should be set to the provided profiles_dir; defaults to True
  • helper_script (str, optional): a string representing a shell script, which will be executed prior to the command in the same process. Can be used to change directories, define helper functions, etc. when re-using this Task for different commands in a Flow; can also be provided at runtime
  • shell (string, optional): shell to run the command with; defaults to "bash"
  • return_all (bool, optional): boolean specifying whether this task should return all lines of stdout as a list, or just the last line as a string; defaults to False
  • log_stderr (bool, optional): boolean specifying whether this task should log the output from stderr in the case of a non-zero exit code; defaults to False
  • **kwargs: additional keyword arguments to pass on to the Task constructor

Example:

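    from prefect import Flow
    from prefect.tasks.dbt import DbtShellTask

    # Directory where profiles.yml is written; placeholder value, adjust as needed.
    test_path = "/path/to/profiles_dir"

    with Flow(name="dbt_flow") as f:
        # Configure the task, then call it with the dbt command to run.
        task = DbtShellTask(
            profile_name='default',
            environment='test',
            dbt_kwargs={
                'type': 'snowflake',
                'threads': 1,
                'account': 'account.us-east-1'
            },
            overwrite_profiles=True,
            profiles_dir=test_path
        )(command='dbt run')

    out = f.run()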

methods:

prefect.tasks.dbt.dbt.DbtShellTask.run

(command=None, env=None, helper_script=None, dbt_kwargs=None)[source]

If no profiles.yml file is found, or if the overwrite_profiles flag is set to True, this first generates a profiles.yml file in the profiles_dir directory. It then runs the dbt CLI shell command.
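The decision described above can be sketched in plain Python. This is a minimal illustration, not the task's actual implementation: the helper names `needs_profiles_file` and `build_profile` are hypothetical, and the nested layout assumes the usual dbt profiles.yml structure (profile name at the top, then `target` and `outputs`).

```python
import os
import tempfile


def needs_profiles_file(profiles_dir: str, overwrite_profiles: bool) -> bool:
    """Return True when a profiles.yml should be (re)generated."""
    profiles_path = os.path.join(profiles_dir, "profiles.yml")
    return overwrite_profiles or not os.path.exists(profiles_path)


def build_profile(profile_name: str, environment: str, dbt_kwargs: dict) -> dict:
    """Assumed mapping of the task arguments onto dbt's profile layout."""
    return {
        profile_name: {
            "target": environment,
            "outputs": {environment: dict(dbt_kwargs)},
        }
    }


with tempfile.TemporaryDirectory() as profiles_dir:
    # Empty directory: there is no profiles.yml yet, so one would be generated.
    missing = needs_profiles_file(profiles_dir, overwrite_profiles=False)

    # Create a placeholder file, then check both values of the flag.
    open(os.path.join(profiles_dir, "profiles.yml"), "w").close()
    kept = needs_profiles_file(profiles_dir, overwrite_profiles=False)
    forced = needs_profiles_file(profiles_dir, overwrite_profiles=True)

profile = build_profile("default", "test", {"type": "snowflake", "threads": 1})
print(missing, kept, forced)
print(profile)
```

In this sketch an existing file is left alone unless the flag forces regeneration, which mirrors the default of overwrite_profiles=False.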

Args:

  • command (string): shell command to be executed; can also be provided at task initialization. Any variables / functions defined in self.helper_script will be available in the same process this command runs in
  • env (dict, optional): dictionary of environment variables to use for the subprocess
  • helper_script (str, optional): a string representing a shell script, which will be executed prior to the command in the same process. Can be used to change directories, define helper functions, etc. when re-using this Task for different commands in a Flow
  • dbt_kwargs (dict, optional): keyword arguments used to populate the profiles.yml file

Returns:

  • stdout (string): if return_all is False (the default), only the last line of stdout is returned; otherwise all lines are returned, which is useful for passing the result of the shell command to downstream tasks. If there is no output, None is returned.

Raises:

  • prefect.engine.signals.FAIL: if the command has an exit code other than 0
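The return-value rule can be illustrated with a small stand-alone function. This is a sketch that follows the description above, not the task's code: `select_output` is a hypothetical name, and returning None for empty output when return_all is True is an assumption read from the text.

```python
from typing import List, Optional, Union


def select_output(
    lines: List[str], return_all: bool = False
) -> Optional[Union[str, List[str]]]:
    """Pick what a shell-style task would hand to downstream tasks."""
    if not lines:
        # No output at all: nothing to return.
        return None
    # Either every captured line, or only the final one.
    return lines if return_all else lines[-1]


captured = ["Running with dbt", "Completed successfully", "Done."]

last_line = select_output(captured)
all_lines = select_output(captured, return_all=True)
no_output = select_output([])
print(last_line)
print(all_lines)
print(no_output)
```

Returning only the last line keeps the result small when a downstream task needs just the final status message; return_all=True is the better fit when the full log has to be parsed.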



# DbtCloudRunJob

class

prefect.tasks.dbt.dbt.DbtCloudRunJob

(cause=None, account_id=None, job_id=None, token=None, additional_args=None, account_id_env_var_name="DBT_CLOUD_ACCOUNT_ID", job_id_env_var_name="DBT_CLOUD_JOB_ID", token_env_var_name="DBT_CLOUD_TOKEN", wait_for_job_run_completion=False, max_wait_time=None, domain="cloud.getdbt.com", **kwargs)[source]

Task for running a dbt Cloud job using the dbt Cloud API v2. For information about the dbt Cloud APIs, refer to https://docs.getdbt.com/dbt-cloud/api-v2. Note that this task fails if any call to the dbt Cloud APIs fails.

Running this task generates a markdown artifact viewable in the Prefect UI. The artifact contains links to the dbt artifacts generated as a result of the job run.
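To make the API interaction more concrete, the following sketch builds (but does not send) a trigger request with the standard library. The endpoint path and the `Token` authorization scheme are assumptions based on the dbt Cloud API v2 documentation linked above, not something this page confirms; `build_trigger_request` and the IDs and token shown are hypothetical.

```python
import json
from typing import Optional
from urllib.request import Request


def build_trigger_request(
    account_id: int,
    job_id: int,
    token: str,
    cause: str,
    additional_args: Optional[dict] = None,
    domain: str = "cloud.getdbt.com",
) -> Request:
    """Prepare a POST request for the (assumed) Trigger Job endpoint."""
    url = f"https://{domain}/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
    # The cause is sent together with any extra Trigger Job options.
    payload = {"cause": cause, **(additional_args or {})}
    return Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Token {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder values; no network call is made here.
request = build_trigger_request(
    account_id=1234,
    job_id=5678,
    token="example-token",
    cause="Triggered from a flow run",
)
print(request.get_method(), request.full_url)
print(json.loads(request.data))
```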

Args:

  • cause (string): A string describing the reason for triggering the job run
  • account_id (int, optional): dbt Cloud account ID. Can also be passed as an env var.
  • job_id (int, optional): dbt Cloud job ID
  • token (string, optional): dbt Cloud token. Please note that this token must have access at least to the dbt Trigger Job API.
  • additional_args (dict, optional): additional information to pass to the Trigger Job API. For a list of the possible information, have a look at: https://docs.getdbt.com/dbt-cloud/api-v2#operation/triggerRun
  • account_id_env_var_name (string, optional): the name of the env var that contains the dbt Cloud account ID. Defaults to DBT_CLOUD_ACCOUNT_ID. Used only if account_id is None.
  • job_id_env_var_name (string, optional): the name of the env var that contains the dbt Cloud job ID. Defaults to DBT_CLOUD_JOB_ID. Used only if job_id is None.
  • token_env_var_name (string, optional): the name of the env var that contains the dbt Cloud token. Defaults to DBT_CLOUD_TOKEN. Used only if token is None.
  • wait_for_job_run_completion (boolean, optional): Whether the task should wait for the job run to complete. Defaults to False.
  • max_wait_time (int, optional): The number of seconds to wait for the dbt Cloud job to finish. Used only if wait_for_job_run_completion = True.
  • domain (str, optional): Custom domain for API call. Defaults to cloud.getdbt.com.
  • **kwargs: additional keyword arguments to pass to the Task constructor

Returns:

  • if wait_for_job_run_completion = True, the get job result, which is the dict under the "data" key of the response. See the Response section at: https://docs.getdbt.com/dbt-cloud/api-v2#operation/getRunById

Raises:

  • prefect.engine.signals.FAIL: if a call returns an HTTP status code != 200, or if the job run result has a status != 10 and "finished_at" is not None. See the status codes at: https://docs.getdbt.com/dbt-cloud/api-v2#operation/getRunById
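The fallback from an explicit argument to an environment variable, as described for account_id, job_id, and token, can be sketched as follows. `resolve_setting` is a hypothetical helper, and raising ValueError when neither source provides a value is an assumption of this sketch rather than documented behavior.

```python
from typing import Mapping, Optional, Union


def resolve_setting(
    value: Optional[Union[int, str]],
    env_var_name: str,
    environ: Mapping[str, str],
) -> Union[int, str]:
    """Prefer the explicit value; otherwise read the named environment variable."""
    if value is not None:
        return value
    if env_var_name in environ:
        return environ[env_var_name]
    # Assumption: a missing value is treated as an error in this sketch.
    raise ValueError(f"Provide a value or set the {env_var_name} environment variable")


# A plain dict stands in for os.environ so the example is deterministic.
fake_environ = {"DBT_CLOUD_ACCOUNT_ID": "1234", "DBT_CLOUD_TOKEN": "example-token"}

explicit = resolve_setting(42, "DBT_CLOUD_ACCOUNT_ID", fake_environ)
from_env = resolve_setting(None, "DBT_CLOUD_ACCOUNT_ID", fake_environ)
print(explicit, from_env)
```

Note that a value read from the environment is a string, so code that needs an integer ID would have to convert it.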

methods:

prefect.tasks.dbt.dbt.DbtCloudRunJob.run

(cause=None, account_id=None, job_id=None, token=None, additional_args=None, account_id_env_var_name="ACCOUNT_ID", job_id_env_var_name="JOB_ID", token_env_var_name="DBT_CLOUD_TOKEN", wait_for_job_run_completion=False, max_wait_time=None, domain=None)[source]

All params available to the run method can also be passed during initialization.

Args:

  • cause (string): A string describing the reason for triggering the job run
  • account_id (int, optional): dbt Cloud account ID. Can also be passed as an env var.
  • job_id (int, optional): dbt Cloud job ID
  • token (string, optional): dbt Cloud token. Please note that this token must have access at least to the dbt Trigger Job API.
  • additional_args (dict, optional): additional information to pass to the Trigger Job API. For a list of the possible information, have a look at: https://docs.getdbt.com/dbt-cloud/api-v2#operation/triggerRun
  • account_id_env_var_name (string, optional): the name of the env var that contains the dbt Cloud account ID. Defaults to DBT_CLOUD_ACCOUNT_ID. Used only if account_id is None.
  • job_id_env_var_name (string, optional): the name of the env var that contains the dbt Cloud job ID. Defaults to DBT_CLOUD_JOB_ID. Used only if job_id is None.
  • token_env_var_name (string, optional): the name of the env var that contains the dbt Cloud token. Defaults to DBT_CLOUD_TOKEN. Used only if token is None.
  • wait_for_job_run_completion (boolean, optional): Whether the task should wait for the job run to complete. Defaults to False.
  • max_wait_time (int, optional): The number of seconds to wait for the dbt Cloud job to finish. Used only if wait_for_job_run_completion = True.
  • domain (str, optional): Custom domain for API call. Defaults to cloud.getdbt.com.

Returns:

  • if wait_for_job_run_completion = True, the get job result, which is the dict under the "data" key of the response. Links to the dbt artifacts are also included under the artifact_urls key. See the Response section at: https://docs.getdbt.com/dbt-cloud/api-v2#operation/getRunById

Raises:

  • prefect.engine.signals.FAIL: if a call returns an HTTP status code != 200, or if the job run result has a status != 10 and "finished_at" is not None. See the status codes at: https://docs.getdbt.com/dbt-cloud/api-v2#operation/getRunById
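The failure condition and the role of max_wait_time can be sketched as a polling loop. This is an illustration under stated assumptions, not the task's implementation: `run_failed` and `wait_for_run` are hypothetical names, the `fetch_run` callable stands in for a call to the Get Run endpoint, status 3 is assumed to mean "running", and the sketch raises RuntimeError and TimeoutError where the task itself raises prefect.engine.signals.FAIL.

```python
import time
from typing import Callable, Optional

SUCCESS_STATUS = 10  # status value the text above treats as success


def run_failed(run_data: dict) -> bool:
    """A run has failed once it is finished with a non-success status."""
    return (
        run_data.get("finished_at") is not None
        and run_data.get("status") != SUCCESS_STATUS
    )


def wait_for_run(
    fetch_run: Callable[[], dict],
    max_wait_time: Optional[float] = None,
    poll_interval: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the run finishes, fails, or max_wait_time seconds elapse."""
    start = time.monotonic()
    while True:
        run_data = fetch_run()
        if run_data.get("finished_at") is not None:
            if run_failed(run_data):
                raise RuntimeError(f"Run ended with status {run_data.get('status')}")
            return run_data
        if max_wait_time is not None and time.monotonic() - start >= max_wait_time:
            raise TimeoutError("Run did not finish within max_wait_time")
        sleep(poll_interval)


# Canned responses replace real API calls: first still running, then finished.
responses = iter(
    [
        {"status": 3, "finished_at": None},
        {"status": 10, "finished_at": "2023-02-01 18:44:00"},
    ]
)
result = wait_for_run(lambda: next(responses), max_wait_time=60, sleep=lambda _: None)
print(result)
```

Passing `sleep` as a parameter keeps the example fast and testable; a real caller would leave the default in place.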



This documentation was auto-generated from commit ffa9a6c on February 1, 2023 at 18:44 UTC