Prefect integrations for interacting with Databricks.
Getting started
Prerequisites
Install prefect-databricks
The following command installs a version of prefect-databricks compatible with your installed version of prefect. If you don't already have prefect installed, it installs the newest version of prefect as well.
pip install "prefect[databricks]"
Upgrade to the latest versions of prefect and prefect-databricks:
pip install -U "prefect[databricks]"
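To confirm which versions ended up installed, a small check with the standard library can help. This is a minimal sketch: `installed_version` is a hypothetical helper name, and the only assumption is that the distributions are published under the names `prefect` and `prefect-databricks`.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(distribution):
    """Return the installed version string, or None if the distribution is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


# Report the state of both packages in the current environment.
for name in ("prefect", "prefect-databricks"):
    print(f"{name}: {installed_version(name) or 'not installed'}")
```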
List jobs on the Databricks instance
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_list


@flow
def example_execute_endpoint_flow():
    databricks_credentials = DatabricksCredentials.load("my-block")
    jobs = jobs_list(
        databricks_credentials,
        limit=5
    )
    return jobs


if __name__ == "__main__":
    example_execute_endpoint_flow()
Use with_options to customize options on any existing task or flow:
custom_example_execute_endpoint_flow = example_execute_endpoint_flow.with_options(
    name="My custom flow name",
    retries=2,
    retry_delay_seconds=10,
)
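The flow above loads a DatabricksCredentials block named "my-block", so that block has to exist before the flow runs. One way to create it from Python is sketched below. The environment variable names DATABRICKS_INSTANCE and DATABRICKS_TOKEN and both helper functions are assumptions made for illustration; adapt them to however you manage secrets. Saving the block also requires a reachable Prefect API.

```python
import os


def load_settings_from_env(environ=os.environ):
    """Collect the Databricks settings, failing early if any are missing."""
    # Hypothetical variable names; they are not defined by prefect-databricks.
    required = ("DATABRICKS_INSTANCE", "DATABRICKS_TOKEN")
    missing = [key for key in required if not environ.get(key)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "databricks_instance": environ["DATABRICKS_INSTANCE"],
        "token": environ["DATABRICKS_TOKEN"],
    }


def save_credentials_block(block_name="my-block", environ=os.environ):
    """Create or overwrite the credentials block that the flows load."""
    settings = load_settings_from_env(environ)
    # Imported here so the settings check works without the package installed.
    from prefect_databricks import DatabricksCredentials

    DatabricksCredentials(**settings).save(block_name, overwrite=True)
```

Calling save_credentials_block() once, with both variables set, should make DatabricksCredentials.load("my-block") succeed in later flow runs.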
Launch a new cluster and run a Databricks notebook
Notebook named example.ipynb on Databricks, which accepts a name parameter:
name = dbutils.widgets.get("name")
message = f"Don't worry {name}, I got your request! Welcome to prefect-databricks!"
print(message)
Prefect flow that launches a new cluster to run example.ipynb:
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_runs_submit
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)


@flow
def jobs_runs_submit_flow(notebook_path, **base_parameters):
    databricks_credentials = DatabricksCredentials.load("my-block")

    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task"
    )

    run = jobs_runs_submit(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings]
    )
    return run


if __name__ == "__main__":
    jobs_runs_submit_flow("/Users/username@gmail.com/example.ipynb", name="Marvin")
Note: instead of using the built-in models, you may also input valid JSON. For example, AutoScale(min_workers=1, max_workers=2) is equivalent to {"min_workers": 1, "max_workers": 2}.
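To make the JSON form concrete, the model-based task definition from the flow above could be written as a plain dictionary. This is a minimal sketch: build_notebook_task is a hypothetical helper, and it assumes the dictionary keys mirror the model field names at every nesting level, as the AutoScale example suggests.

```python
import json


def build_notebook_task(notebook_path, task_key="prefect-task", **base_parameters):
    """Return a plain-dict task definition mirroring the model-based example."""
    return {
        "task_key": task_key,
        "new_cluster": {
            "aws_attributes": {
                "availability": "SPOT",
                "zone_id": "us-west-2a",
                "ebs_volume_type": "GENERAL_PURPOSE_SSD",
                "ebs_volume_count": 3,
                "ebs_volume_size": 100,
            },
            "autoscale": {"min_workers": 1, "max_workers": 2},
            "node_type_id": "m4.large",
            "spark_version": "10.4.x-scala2.12",
            "spark_conf": {"spark.speculation": True},
        },
        "notebook_task": {
            "notebook_path": notebook_path,
            "base_parameters": base_parameters,
        },
    }


task = build_notebook_task("/Users/username@gmail.com/example.ipynb", name="Marvin")
# Serializing confirms the structure is valid JSON before it is submitted.
print(json.dumps(task, indent=2))
```

Under that assumption, the resulting dictionary would be passed as tasks=[task] to jobs_runs_submit in place of the JobTaskSettings instance.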
Resources
For assistance using Databricks, consult the Databricks documentation.
Refer to the prefect-databricks SDK documentation linked in the sidebar to explore all the capabilities of the prefect-databricks library.