from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint
@flow
def create_job_flow():
    """Create a dbt Cloud job through the dbt Cloud administrative API.

    Sends a ``POST`` request to the ``/jobs/`` path with a job definition
    named "Nightly run" and returns the ``"data"`` entry of the result.

    Returns:
        The value stored under ``"data"`` in the endpoint call's result
        (presumably the created job's description -- confirm against the
        dbt Cloud API documentation).
    """
    # The same account id is used for the credentials and for the job
    # payload; keeping it in one place prevents the two from drifting apart.
    account_id = 123456789

    # NOTE(review): "my_api_key" is a placeholder. Replace the hard-coded
    # key with a securely stored secret before running this for real.
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=account_id)

    job_definition = {
        "id": None,
        "account_id": account_id,
        "project_id": 100,
        "environment_id": 10,
        "name": "Nightly run",
        "dbt_version": None,
        "triggers": {"github_webhook": True, "schedule": True},
        "execute_steps": ["dbt run", "dbt test", "dbt source snapshot-freshness"],
        "settings": {"threads": 4, "target_name": "prod"},
        "state": 1,
        "schedule": {
            "date": {"type": "every_day"},
            "time": {"type": "every_hour", "interval": 1},
        },
    }

    result = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=credentials,
        path="/jobs/",
        http_method="POST",
        json=job_definition,
    )
    return result["data"]
# Run the flow only when this file is executed as a script, so that merely
# importing the module does not send the job-creation request.
if __name__ == "__main__":
    create_job_flow()