prefect-databricks lets a Prefect flow drive Databricks Jobs: authenticate with a credentials block, trigger an existing job or submit a one-time notebook/JAR run, then wait for the run to finish and collect its output. The most common workflows are:
  • Trigger an existing Databricks job and wait for it to complete (the flagship use case).
  • Submit a one-time notebook (or JAR/Python) run on a new cluster and wait for its output.
  • List and inspect jobs and runs with the lower-level task wrappers.

Getting started

Prerequisites

Install prefect-databricks

The following installs a version of prefect-databricks compatible with your installed version of prefect. If you don’t already have prefect installed, it installs the newest version as well.
pip install "prefect[databricks]"

Create a credentials block

Every workflow below loads a DatabricksCredentials block by name, so create one first. Construct it and call .save() to persist it to your Prefect API:
from prefect_databricks import DatabricksCredentials

# Personal access token (PAT)
DatabricksCredentials(
    databricks_instance="YOUR_INSTANCE.cloud.databricks.com",
    token="YOUR_TOKEN",
).save("databricks", overwrite=True)
To authenticate as a service principal (OAuth) instead of a PAT, provide client_id and client_secret (and tenant_id for Azure Databricks):
from prefect_databricks import DatabricksCredentials

DatabricksCredentials(
    databricks_instance="dbc-abc12345-6789.cloud.databricks.com",
    client_id="my-client-id",
    client_secret="my-client-secret",
).save("databricks", overwrite=True)
You can also register the block type so it appears in the UI:
prefect block register -m prefect_databricks
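The examples above hard-code the instance and token for brevity. A minimal sketch of reading them from the environment instead is shown here. The variable names DATABRICKS_INSTANCE and DATABRICKS_TOKEN and both helper functions are assumptions made for this illustration, not something prefect-databricks reads or provides on its own.

```python
import os
from typing import Mapping


def credentials_kwargs_from_env(env: Mapping[str, str] = os.environ) -> dict:
    """Build keyword arguments for DatabricksCredentials from environment variables.

    DATABRICKS_INSTANCE and DATABRICKS_TOKEN are illustrative names chosen for
    this sketch; the integration does not look them up by itself.
    """
    required = ("DATABRICKS_INSTANCE", "DATABRICKS_TOKEN")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "databricks_instance": env["DATABRICKS_INSTANCE"],
        "token": env["DATABRICKS_TOKEN"],
    }


def save_block_from_env(block_name: str = "databricks") -> None:
    """Hypothetical helper: create and save the block from the environment."""
    # Imported lazily so the helper above works without the integration installed.
    from prefect_databricks import DatabricksCredentials

    DatabricksCredentials(**credentials_kwargs_from_env()).save(
        block_name, overwrite=True
    )
```

Calling save_block_from_env() once, for example from a deployment bootstrap script, keeps the token out of source control while still producing the same "databricks" block that the workflows below load.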

Trigger an existing job and wait for it to complete

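This is the most common workflow: start a job you’ve already defined in Databricks (identified by its job_id) and block until it finishes. The flow checks the run every poll_frequency_seconds and gives up once max_wait_seconds has elapsed. jobs_runs_submit_by_id_and_wait_for_completion is an async flow, so await it inside a coroutine and start that coroutine with asyncio.run.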
import asyncio

from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_by_id_and_wait_for_completion


async def trigger_databricks_job():
    databricks_credentials = await DatabricksCredentials.load("databricks")
    run = await jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        job_id=11223344,
        max_wait_seconds=1800,  # give up after 30 minutes
        poll_frequency_seconds=30,
    )
    return run


if __name__ == "__main__":
    asyncio.run(trigger_databricks_job())
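To make the roles of max_wait_seconds and poll_frequency_seconds concrete, here is a rough, standard-library-only sketch of a poll-until-done loop. It is not the library's implementation: wait_for_terminal_state, fake_get_state, and demo are hypothetical names, and the status lookup is faked. The terminal values are the run life cycle states the Databricks Jobs API reports for finished runs.

```python
import asyncio
from typing import Awaitable, Callable

# Life cycle states after which a Databricks run will not change any further.
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


async def wait_for_terminal_state(
    get_state: Callable[[], Awaitable[str]],
    max_wait_seconds: float = 900,
    poll_frequency_seconds: float = 10,
) -> str:
    """Poll get_state until it reports a terminal state or the time budget runs out."""
    waited = 0.0
    while waited <= max_wait_seconds:
        state = await get_state()
        if state in TERMINAL_STATES:
            return state
        await asyncio.sleep(poll_frequency_seconds)
        waited += poll_frequency_seconds
    raise TimeoutError(f"Run did not finish within {max_wait_seconds} seconds")


async def demo() -> str:
    # Stand-in for a real status lookup: two non-terminal states, then done.
    states = iter(["PENDING", "RUNNING", "TERMINATED"])

    async def fake_get_state() -> str:
        return next(states)

    return await wait_for_terminal_state(
        fake_get_state, max_wait_seconds=1, poll_frequency_seconds=0.01
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A shorter poll interval notices completion sooner at the cost of more API calls, and the wait budget should comfortably exceed the job's typical runtime.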

Submit a one-time notebook run and wait for its output

To run a notebook without first defining a job, submit a one-time run on a new cluster. jobs_runs_submit_and_wait_for_completion waits for completion and returns the notebook outputs keyed by task. Given a notebook at /Users/you@example.com/example that reads a name widget:
name = dbutils.widgets.get("name")
print(f"Welcome to prefect-databricks, {name}!")
The flow that launches a cluster, runs the notebook, and waits for its output:
import asyncio

from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    JobTaskSettings,
    NewCluster,
    NotebookTask,
)


async def run_notebook():
    databricks_credentials = await DatabricksCredentials.load("databricks")

    new_cluster = NewCluster(
        autoscale=AutoScale(min_workers=1, max_workers=2),
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
    )
    notebook_task = NotebookTask(
        notebook_path="/Users/you@example.com/example",
        base_parameters={"name": "Marvin"},
    )
    task = JobTaskSettings(
        task_key="prefect-task",
        new_cluster=new_cluster,
        notebook_task=notebook_task,
    )

    notebook_outputs = await jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[task],
    )
    return notebook_outputs


if __name__ == "__main__":
    asyncio.run(run_notebook())
Instead of the typed models, you can pass equivalent JSON-style dictionaries. For example, AutoScale(min_workers=1, max_workers=2) is equivalent to {"min_workers": 1, "max_workers": 2}.
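As an illustration of that equivalence, the task from the flow above could be written with plain dictionaries, as sketched here. The field names mirror the keyword arguments used with the typed models, and the values are the same placeholders rather than a recommended cluster configuration.

```python
import json

# Same settings as the NewCluster(...) call, written as a dictionary.
new_cluster = {
    "autoscale": {"min_workers": 1, "max_workers": 2},
    "node_type_id": "m4.large",
    "spark_version": "10.4.x-scala2.12",
}

# Same settings as the NotebookTask(...) call.
notebook_task = {
    "notebook_path": "/Users/you@example.com/example",
    "base_parameters": {"name": "Marvin"},
}

# Same settings as the JobTaskSettings(...) call.
task = {
    "task_key": "prefect-task",
    "new_cluster": new_cluster,
    "notebook_task": notebook_task,
}

# Print the payload to check it before handing it to the flow as tasks=[task].
print(json.dumps(task, indent=2))
```

The dictionary form is convenient when the task definition already lives in a JSON or YAML file. The typed models catch misspelled field names earlier.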

List and inspect jobs

For finer-grained control, prefect_databricks.jobs wraps individual Databricks Jobs REST endpoints as async tasks (jobs_list, jobs_get, jobs_runs_get, and more). Call them from within a flow:
import asyncio

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_list


@flow
async def list_databricks_jobs():
    databricks_credentials = await DatabricksCredentials.load("databricks")
    return await jobs_list(databricks_credentials, limit=5)


if __name__ == "__main__":
    asyncio.run(list_databricks_jobs())
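Assuming jobs_list returns the parsed JSON body of the Databricks Jobs list endpoint (a dictionary with a "jobs" list whose entries carry a job_id and a settings object), a small helper can reduce it to the fields you usually care about. summarize_jobs is a hypothetical name, and the sample response is hand-written for illustration rather than real output.

```python
from typing import Any


def summarize_jobs(response: dict[str, Any]) -> list[tuple[int, str]]:
    """Return (job_id, name) pairs from a Jobs list response."""
    return [
        (job["job_id"], job.get("settings", {}).get("name", "<unnamed>"))
        for job in response.get("jobs", [])
    ]


# Hand-written sample shaped like a Jobs list response; not real data.
sample_response = {
    "jobs": [
        {"job_id": 11223344, "settings": {"name": "nightly-etl"}},
        {"job_id": 55667788, "settings": {}},
    ],
    "has_more": False,
}

for job_id, name in summarize_jobs(sample_response):
    print(job_id, name)
```

Inside a flow, the same helper could be applied to the value returned by jobs_list to find the job_id to pass on to the trigger flow.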

Resources

For assistance using Databricks, consult the Databricks documentation. Refer to the prefect-databricks SDK reference for the full list of credentials options, flows, and job tasks.