prefect.server.api.deployments

Routes for interacting with Deployment objects.

Functions

create_deployment

create_deployment(deployment: schemas.actions.DeploymentCreate, response: Response, worker_lookups: WorkerLookups = Depends(WorkerLookups), created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by), updated_by: Optional[schemas.core.UpdatedBy] = Depends(dependencies.get_updated_by), db: PrefectDBInterface = Depends(provide_database_interface)) -> schemas.responses.DeploymentResponse
Gracefully creates a new deployment from the provided schema. If a deployment with the same name and flow_id already exists, the deployment is updated. If the deployment has an active schedule, flow runs will be scheduled. When upserting, any scheduled runs from the existing deployment will be deleted. For more information, see https://docs.prefect.io/v3/deploy.
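The upsert behavior described above can be pictured with a small in-memory model. This is only a sketch and not the server's implementation: the `DeploymentStore` and `DeploymentRecord` classes, the dictionary storage, and the returned `created` flag are hypothetical. Only the (name, flow_id) key and the removal of the existing deployment's scheduled runs come from the description.

```python
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass
class DeploymentRecord:
    """Hypothetical, heavily reduced stand-in for a stored deployment."""

    id: UUID
    name: str
    flow_id: UUID
    parameters: dict
    scheduled_runs: list = field(default_factory=list)


class DeploymentStore:
    """Hypothetical in-memory stand-in for the deployments table."""

    def __init__(self):
        self._by_key = {}

    def upsert(self, name, flow_id, parameters):
        """Create a deployment, or update the one with the same name and flow_id.

        Returns the record and a flag that is True only when a new record
        was created.
        """
        key = (name, flow_id)
        existing = self._by_key.get(key)
        if existing is None:
            record = DeploymentRecord(uuid4(), name, flow_id, dict(parameters))
            self._by_key[key] = record
            return record, True
        # Upsert path: keep the id, replace the fields, and drop the runs
        # that were scheduled from the previous definition.
        existing.parameters = dict(parameters)
        existing.scheduled_runs.clear()
        return existing, False
```

Calling `upsert` twice with the same name and flow id returns the same record id both times, with the second call reporting that nothing new was created.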

update_deployment

update_deployment(deployment: schemas.actions.DeploymentUpdate, deployment_id: UUID = Path(..., description='The deployment id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None

read_deployment_by_name

read_deployment_by_name(flow_name: str = Path(..., description='The name of the flow'), deployment_name: str = Path(..., description='The name of the deployment'), db: PrefectDBInterface = Depends(provide_database_interface)) -> schemas.responses.DeploymentResponse
Get a deployment using the name of the flow and the deployment.

read_deployment

read_deployment(deployment_id: UUID = Path(..., description='The deployment id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> schemas.responses.DeploymentResponse
Get a deployment by id.

read_deployments

read_deployments(limit: int = dependencies.LimitBody(), offset: int = Body(0, ge=0), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, sort: schemas.sorting.DeploymentSort = Body(schemas.sorting.DeploymentSort.NAME_ASC), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.responses.DeploymentResponse]
Query for deployments.

paginate_deployments

paginate_deployments(limit: int = dependencies.LimitBody(), page: int = Body(1, ge=1), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, sort: schemas.sorting.DeploymentSort = Body(schemas.sorting.DeploymentSort.NAME_ASC), db: PrefectDBInterface = Depends(provide_database_interface)) -> DeploymentPaginationResponse
Pagination query for deployments.
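paginate_deployments takes a 1-indexed `page` where read_deployments takes an `offset`. The relationship between the two can be sketched as below. The helper names `page_to_offset` and `page_count` are hypothetical, and the formulas are the usual pagination arithmetic rather than something stated in this reference.

```python
import math


def page_to_offset(page: int, limit: int) -> int:
    """Translate a 1-indexed page number into a row offset."""
    if page < 1:
        raise ValueError("page must be >= 1")
    if limit < 1:
        raise ValueError("limit must be >= 1")
    return (page - 1) * limit


def page_count(total: int, limit: int) -> int:
    """Number of pages needed to show `total` items, `limit` per page."""
    if limit < 1:
        raise ValueError("limit must be >= 1")
    return math.ceil(total / limit)
```

For example, page 3 with a limit of 25 corresponds to an offset of 50, and 101 matching deployments at 25 per page span 5 pages.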

get_scheduled_flow_runs_for_deployments

get_scheduled_flow_runs_for_deployments(background_tasks: BackgroundTasks, deployment_ids: list[UUID] = Body(default=..., description='The deployment IDs to get scheduled runs for'), scheduled_before: DateTime = Body(None, description='The maximum time to look for scheduled flow runs'), limit: int = dependencies.LimitBody(), db: PrefectDBInterface = Depends(provide_database_interface)) -> list[schemas.responses.FlowRunResponse]
Get scheduled runs for a set of deployments. Used by a runner to poll for work.
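A runner polling this endpoint has to send the three body parameters from the signature. The sketch below only builds that JSON body. It assumes the usual FastAPI convention that multiple `Body` parameters are sent as keys of one JSON object, and the helper name `build_poll_body` is hypothetical. The route path is not given in this reference, so no request is sent.

```python
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID


def build_poll_body(
    deployment_ids: list,
    scheduled_before: Optional[datetime] = None,
    limit: Optional[int] = None,
) -> dict:
    """Build a JSON-serializable body for a scheduled-runs poll.

    Keys mirror the parameter names in the signature above. Optional
    values are omitted so that server-side defaults apply.
    """
    body = {"deployment_ids": [str(d) for d in deployment_ids]}
    if scheduled_before is not None:
        body["scheduled_before"] = scheduled_before.isoformat()
    if limit is not None:
        body["limit"] = limit
    return body


# Example: ask for at most 5 runs scheduled before a fixed cutoff.
example_body = build_poll_body(
    [UUID("12345678-1234-5678-1234-567812345678")],
    scheduled_before=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    limit=5,
)
```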

count_deployments

count_deployments(flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> int
Count deployments.

delete_deployment

delete_deployment(deployment_id: UUID = Path(..., description='The deployment id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Delete a deployment by id.

schedule_deployment

schedule_deployment(deployment_id: UUID = Path(..., description='The deployment id', alias='id'), start_time: datetime.datetime = Body(None, description='The earliest date to schedule'), end_time: datetime.datetime = Body(None, description='The latest date to schedule'), min_time: float = Body(None, description='Runs will be scheduled until at least this long after the `start_time`', json_schema_extra={'format': 'time-delta'}), min_runs: int = Body(None, description='The minimum number of runs to schedule'), max_runs: int = Body(None, description='The maximum number of runs to schedule'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Schedule runs for a deployment. For backfills, provide start and end times in the past. This function generates the minimum number of runs that satisfies the min and max times and the min and max counts. The constraints are applied in the following order:
  • Runs will be generated starting on or after the start_time
  • No more than max_runs runs will be generated
  • No runs will be generated after end_time is reached
  • At least min_runs runs will be generated
  • Runs will be generated until at least start_time + min_time is reached
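The ordering of these constraints can be illustrated with a small generator. This is a sketch under stated assumptions, not the server's scheduling code: it assumes a fixed-interval schedule whose first run falls exactly on `start_time`, requires every argument to be given explicitly, and uses the hypothetical name `generate_run_times`.

```python
from datetime import datetime, timedelta, timezone


def generate_run_times(start_time, end_time, interval, min_time, min_runs, max_runs):
    """Return run times for a fixed-interval schedule (illustrative only).

    Hard limits (end_time, max_runs) are checked before each run is added,
    so they take priority over the minimums (min_runs, min_time).
    """
    times = []
    current = start_time  # runs start on or after start_time
    while current <= end_time and len(times) < max_runs:
        times.append(current)
        # Stop as soon as both minimums are satisfied.
        if len(times) >= min_runs and current >= start_time + min_time:
            break
        current += interval
    return times


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
hourly = timedelta(hours=1)

# Both minimums matter: 2 runs are not enough because min_time is 3 hours.
runs = generate_run_times(
    start, start + timedelta(days=7), hourly,
    min_time=timedelta(hours=3), min_runs=2, max_runs=100,
)
```

In this example the generator stops at the run three hours after `start`, which is the first point where both `min_runs` and `min_time` are satisfied. Lowering `max_runs` to 3, or moving `end_time` to one hour after `start`, cuts the list short even though the minimums are not met.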

resume_deployment

resume_deployment(deployment_id: UUID = Path(..., description='The deployment id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Set a deployment schedule to active. Runs will be scheduled immediately.

pause_deployment

pause_deployment(deployment_id: UUID = Path(..., description='The deployment id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Set a deployment schedule to inactive. Any auto-scheduled runs still in a Scheduled state will be deleted.

create_flow_run_from_deployment

create_flow_run_from_deployment(flow_run: schemas.actions.DeploymentFlowRunCreate, deployment_id: UUID = Path(..., description='The deployment id', alias='id'), created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by), db: PrefectDBInterface = Depends(provide_database_interface), worker_lookups: WorkerLookups = Depends(WorkerLookups), response: Response = None) -> schemas.responses.FlowRunResponse
Create a flow run from a deployment. Any parameters not provided will be inferred from the deployment’s parameters. If tags are not provided, the deployment’s tags will be used. If no state is provided, the flow run will be created in a SCHEDULED state.
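The three fallbacks described above (parameters, tags, state) can be sketched as a pure function. This models the description only. The function name `resolve_flow_run_fields`, its arguments, and the plain-dict result are hypothetical, and the server may differ in detail, for example in how it validates parameters.

```python
def resolve_flow_run_fields(
    deployment_parameters,
    deployment_tags,
    parameters=None,
    tags=None,
    state_type=None,
):
    """Apply the documented fallbacks for a flow run created from a deployment."""
    # Parameters not provided on the request fall back to the deployment's;
    # values provided on the request win on conflict.
    merged = {**deployment_parameters, **(parameters or {})}
    # Tags fall back to the deployment's tags only when none are provided.
    resolved_tags = list(tags) if tags else list(deployment_tags)
    # With no explicit state, the run starts out as SCHEDULED.
    resolved_state = state_type or "SCHEDULED"
    return {
        "parameters": merged,
        "tags": resolved_tags,
        "state_type": resolved_state,
    }


# Example: override one parameter and inherit everything else.
resolved = resolve_flow_run_fields(
    deployment_parameters={"region": "eu", "retries": 3},
    deployment_tags=["prod"],
    parameters={"retries": 5},
)
```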

work_queue_check_for_deployment

work_queue_check_for_deployment(deployment_id: UUID = Path(..., description='The deployment id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.core.WorkQueue]
Get the list of work queues that are able to pick up the specified deployment. This endpoint is intended for the UI, so that it can warn users about deployments that cannot be executed because no work queue will pick up their runs under the existing filter criteria. It may be deprecated in the future because there is no strict relationship between work queues and deployments.

read_deployment_schedules

read_deployment_schedules(deployment_id: UUID = Path(..., description='The deployment id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.core.DeploymentSchedule]

create_deployment_schedules

create_deployment_schedules(deployment_id: UUID = Path(..., description='The deployment id', alias='id'), schedules: List[schemas.actions.DeploymentScheduleCreate] = Body(default=..., description='The schedules to create'), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.core.DeploymentSchedule]

update_deployment_schedule

update_deployment_schedule(deployment_id: UUID = Path(..., description='The deployment id', alias='id'), schedule_id: UUID = Path(..., description='The schedule id', alias='schedule_id'), schedule: schemas.actions.DeploymentScheduleUpdate = Body(default=..., description='The updated schedule'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None

delete_deployment_schedule

delete_deployment_schedule(deployment_id: UUID = Path(..., description='The deployment id', alias='id'), schedule_id: UUID = Path(..., description='The schedule id', alias='schedule_id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None