> ## Documentation Index
> Fetch the complete documentation index at: https://docs.prefect.io/llms.txt
> Use this file to discover all available pages before exploring further.

# flows

# `prefect_databricks.flows`

Module containing flows for interacting with Databricks

## Functions

### `jobs_runs_submit_and_wait_for_completion` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L63" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
jobs_runs_submit_and_wait_for_completion(databricks_credentials: DatabricksCredentials, tasks: Optional[List[RunSubmitTaskSettings]] = None, run_name: Optional[str] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10, git_source: Optional[GitSource] = None, timeout_seconds: Optional[int] = None, idempotency_token: Optional[str] = None, access_control_list: Optional[List[AccessControlRequest]] = None, return_metadata: bool = False, job_submission_handler: Optional[Callable] = None, **jobs_runs_submit_kwargs: Dict[str, Any]) -> Union[NotebookOutput, Tuple[NotebookOutput, JobMetadata], None]
```

Flow that triggers a job run and waits for the triggered run to complete.

**Args:**

* `databricks_credentials`:
  Credentials to use for authentication with Databricks.
* `tasks`: Tasks to run, e.g.

```
[
    {
        "task_key"\: "Sessionize",
        "description"\: "Extracts session data from events",
        "depends_on"\: [],
        "existing_cluster_id"\: "0923-164208-meows279",
        "spark_jar_task"\: {
            "main_class_name"\: "com.databricks.Sessionize",
            "parameters"\: ["--data", "dbfs\:/path/to/data.json"],
        },
        "libraries"\: [{"jar"\: "dbfs\:/mnt/databricks/Sessionize.jar"}],
        "timeout_seconds"\: 86400,
    },
    {
        "task_key"\: "Orders_Ingest",
        "description"\: "Ingests order data",
        "depends_on"\: [],
        "existing_cluster_id"\: "0923-164208-meows279",
        "spark_jar_task"\: {
            "main_class_name"\: "com.databricks.OrdersIngest",
            "parameters"\: ["--data", "dbfs\:/path/to/order-data.json"],
        },
        "libraries"\: [{"jar"\: "dbfs\:/mnt/databricks/OrderIngest.jar"}],
        "timeout_seconds"\: 86400,
    },
    {
        "task_key"\: "Match",
        "description"\: "Matches orders with user sessions",
        "depends_on"\: [
            {"task_key"\: "Orders_Ingest"},
            {"task_key"\: "Sessionize"},
        ],
        "new_cluster"\: {
            "spark_version"\: "7.3.x-scala2.12",
            "node_type_id"\: "i3.xlarge",
            "spark_conf"\: {"spark.speculation"\: True},
            "aws_attributes"\: {
                "availability"\: "SPOT",
                "zone_id"\: "us-west-2a",
            },
            "autoscale"\: {"min_workers"\: 2, "max_workers"\: 16},
        },
        "notebook_task"\: {
            "notebook_path"\: "/Users/user.name@databricks.com/Match",
            "base_parameters"\: {"name"\: "John Doe", "age"\: "35"},
        },
        "timeout_seconds"\: 86400,
    },
]
```

* `run_name`:
  An optional name for the run. The default value is `Untitled`, e.g. `A
  multitask job run`.
* `git_source`:
  This functionality is in Public Preview.  An optional specification for
  a remote repository containing the notebooks used by this
  job's notebook tasks. Key-values:
* git\_url:
  URL of the repository to be cloned by this job. The maximum
  length is 300 characters, e.g.
  `https\://github.com/databricks/databricks-cli`.
* git\_provider:
  Unique identifier of the service used to host the Git
  repository. The value is case insensitive, e.g. `github`.
* git\_branch:
  Name of the branch to be checked out and used by this job.
  This field cannot be specified in conjunction with git\_tag
  or git\_commit. The maximum length is 255 characters, e.g.
  `main`.
* git\_tag:
  Name of the tag to be checked out and used by this job. This
  field cannot be specified in conjunction with git\_branch or
  git\_commit. The maximum length is 255 characters, e.g.
  `release-1.0.0`.
* git\_commit:
  Commit to be checked out and used by this job. This field
  cannot be specified in conjunction with git\_branch or
  git\_tag. The maximum length is 64 characters, e.g.
  `e0056d01`.
* git\_snapshot:
  Read-only state of the remote repository at the time the job was run.
  This field is only included on job runs.
* `timeout_seconds`:
  An optional timeout applied to each run of this job. The default
  behavior is to have no timeout, e.g. `86400`.
* `idempotency_token`:
  An optional token that can be used to guarantee the idempotency of job
  run requests. If a run with the provided token already
  exists, the request does not create a new run but returns
  the ID of the existing run instead. If a run with the
  provided token is deleted, an error is returned.  If you
  specify the idempotency token, upon failure you can retry
  until the request succeeds. Databricks guarantees that
  exactly one run is launched with that idempotency token.
  This token must have at most 64 characters.  For more
  information, see [How to ensure idempotency for
  jobs](https://kb.databricks.com/jobs/jobs-idempotency.html),
  e.g. `8f018174-4792-40d5-bcbc-3e6a527352c8`.
* `access_control_list`:
  List of permissions to set on the job.
* `max_wait_seconds`: Maximum number of seconds to wait for the entire flow to complete.
* `poll_frequency_seconds`: Number of seconds to wait in between checks for
  run completion.
* `return_metadata`: When True, method will return a tuple of notebook output as well as
  job run metadata; by default though, the method only returns notebook output
* `job_submission_handler`: An optional callable to intercept job submission.
* `**jobs_runs_submit_kwargs`: Additional keyword arguments to pass to `jobs_runs_submit`.

**Returns:**

* Either a dict or a tuple (depends on `return_metadata`) comprised of
* * task\_notebook\_outputs: dictionary of task keys to its corresponding notebook output;
    this is the only object returned by default from this method
* * jobs\_runs\_metadata: dictionary containing IDs of the jobs runs tasks; this is only
    returned if `return_metadata=True`.

**Examples:**

Submit jobs runs and wait.

```python theme={null}
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)

@flow
def jobs_runs_submit_and_wait_for_completion_flow(notebook_path, **base_parameters):
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task"
    )

    multi_task_runs = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings]
    )

    return multi_task_runs
```

### `jobs_runs_wait_for_completion` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L409" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
jobs_runs_wait_for_completion(multi_task_jobs_runs_id: int, databricks_credentials: DatabricksCredentials, run_name: Optional[str] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10)
```

Flow that triggers a job run and waits for the triggered run to complete.

**Args:**

* `run_name`: The name of the jobs runs task.
* `multi_task_jobs_run_id`: The ID of the jobs runs task to watch.
* `databricks_credentials`:
  Credentials to use for authentication with Databricks.
* `max_wait_seconds`:
  Maximum number of seconds to wait for the entire flow to complete.
* `poll_frequency_seconds`: Number of seconds to wait in between checks for
  run completion.

**Returns:**

* A dict containing the jobs runs life cycle state and message.
* A dict containing IDs of the jobs runs tasks.

### `jobs_runs_submit_by_id_and_wait_for_completion` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L498" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
jobs_runs_submit_by_id_and_wait_for_completion(databricks_credentials: DatabricksCredentials, job_id: int, idempotency_token: Optional[str] = None, jar_params: Optional[List[str]] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10, notebook_params: Optional[Dict] = None, python_params: Optional[List[str]] = None, spark_submit_params: Optional[List[str]] = None, python_named_params: Optional[Dict] = None, pipeline_params: Optional[str] = None, sql_params: Optional[Dict] = None, dbt_commands: Optional[List] = None, job_submission_handler: Optional[Callable] = None, **jobs_runs_submit_kwargs: Dict[str, Any]) -> Dict
```

flow that triggers an existing job and waits for its completion

**Args:**

* `databricks_credentials`: Credentials to use for authentication with Databricks.
* `job_id`: Id of the databricks job.
* `idempotency_token`:
  An optional token that can be used to guarantee the idempotency of job
  run requests. If a run with the provided token already
  exists, the request does not create a new run but returns
  the ID of the existing run instead. If a run with the
  provided token is deleted, an error is returned.  If you
  specify the idempotency token, upon failure you can retry
  until the request succeeds. Databricks guarantees that
  exactly one run is launched with that idempotency token.
  This token must have at most 64 characters.  For more
  information, see [How to ensure idempotency for
  jobs](https://kb.databricks.com/jobs/jobs-idempotency.html),
  e.g. `8f018174-4792-40d5-bcbc-3e6a527352c8`.
* `jar_params`:
  A list of parameters for jobs with Spark JAR tasks, for example "jar\_params"
  : \["john doe", "35"]. The parameters are used to invoke the main function of
  the main class specified in the Spark JAR task. If not specified upon run-
  now, it defaults to an empty list. jar\_params cannot be specified in
  conjunction with notebook\_params. The JSON representation of this field (for
  example `{"jar_params"\: ["john doe","35"]}`) cannot exceed 10,000 bytes.
* `max_wait_seconds`:
  Maximum number of seconds to wait for the entire flow to complete.
* `poll_frequency_seconds`: Number of seconds to wait in between checks for
  run completion.
* `notebook_params`:
  A map from keys to values for jobs with notebook task, for example
  "notebook\_params": `{"name"\: "john doe", "age"\: "35"}`. The map is
  passed to the notebook and is accessible through the dbutils.widgets.get
  function. If not specified upon run-now, the triggered run uses the job’s
  base parameters. notebook\_params cannot be specified in conjunction with
  jar\_params. Use Task parameter variables to set parameters containing
  information about job runs. The JSON representation of this field
  (for example `{"notebook_params"\:{"name"\:"john doe","age"\:"35"}}`) cannot
  exceed 10,000 bytes.
* `python_params`:
  A list of parameters for jobs with Python tasks, for example "python\_params"
  :\["john doe", "35"]. The parameters are passed to Python file as command-
  line parameters. If specified upon run-now, it would overwrite the
  parameters specified in job setting. The JSON representation of this field
  (for example `{"python_params"\:["john doe","35"]}`) cannot exceed 10,000 bytes
  Use Task parameter variables to set parameters containing information
  about job runs. These parameters accept only Latin characters (ASCII
  character set). Using non-ASCII characters returns an error. Examples of
  invalid, non-ASCII characters are Chinese, Japanese kanjis, and emojis.
* `spark_submit_params`:
  A list of parameters for jobs with spark submit task, for example
  "spark\_submit\_params": \["--class", "org.apache.spark.examples.SparkPi"].
  The parameters are passed to spark-submit script as command-line parameters.
  If specified upon run-now, it would overwrite the parameters specified in
  job setting. The JSON representation of this field (for example
  `{"python_params"\:["john doe","35"]}`) cannot exceed 10,000 bytes.
  Use Task parameter variables to set parameters containing information about
  job runs. These parameters accept only Latin characters (ASCII character
  set). Using non-ASCII characters returns an error. Examples of invalid,
  non-ASCII characters are Chinese, Japanese kanjis, and emojis.
* `python_named_params`:
  A map from keys to values for jobs with Python wheel task, for example
  "python\_named\_params": `{"name"\: "task", "data"\: "dbfs\:/path/to/data.json"}`.
* `pipeline_params`:
  If `full_refresh` is set to true, trigger a full refresh on the
  delta live table e.g.

```
    "pipeline_params"\: {"full_refresh"\: true}
```

* `sql_params`:
  A map from keys to values for SQL tasks, for example "sql\_params":
  `{"name"\: "john doe", "age"\: "35"}`. The SQL alert task does not support
  custom parameters.
* `dbt_commands`:
  An array of commands to execute for jobs with the dbt task,
  for example "dbt\_commands": \["dbt deps", "dbt seed", "dbt run"]
* `job_submission_handler`: An optional callable to intercept job submission

**Raises:**

* `DatabricksJobTerminated`:
  Raised when the Databricks job run is terminated with a non-successful
  result state.
* `DatabricksJobSkipped`: Raised when the Databricks job run is skipped.
* `DatabricksJobInternalError`:
  Raised when the Databricks job run encounters an internal error.

**Returns:**

* A dictionary containing information about the completed job run.

## Classes

### `DatabricksJobTerminated` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L31" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when Databricks jobs runs submit terminates

### `DatabricksJobSkipped` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L35" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when Databricks jobs runs submit skips

### `DatabricksJobInternalError` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L39" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when Databricks jobs runs submit encounters internal error

### `DatabricksJobRunTimedOut` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L43" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when Databricks jobs runs does not complete in the configured max
wait seconds
