prefect.server.models.deployments

Functions for interacting with deployment ORM objects. Intended for internal use by the Prefect REST API.

Functions

create_deployment

create_deployment(db: PrefectDBInterface, session: AsyncSession, deployment: schemas.core.Deployment | schemas.actions.DeploymentCreate) -> Optional[orm_models.Deployment]
Upserts a deployment.
Args:
  • session: a database session
  • deployment: a deployment model
Returns:
  • orm_models.Deployment: the newly created or updated deployment

update_deployment

update_deployment(db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID, deployment: schemas.actions.DeploymentUpdate) -> bool
Updates a deployment.
Args:
  • session: a database session
  • deployment_id: the ID of the deployment to modify
  • deployment: changes to a deployment model
Returns:
  • whether the deployment was updated

read_deployment

read_deployment(db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID) -> Optional[orm_models.Deployment]
Reads a deployment by id.
Args:
  • session: A database session
  • deployment_id: a deployment id
Returns:
  • orm_models.Deployment: the deployment

read_deployment_by_name

read_deployment_by_name(db: PrefectDBInterface, session: AsyncSession, name: str, flow_name: str) -> Optional[orm_models.Deployment]
Reads a deployment by name.
Args:
  • session: A database session
  • name: a deployment name
  • flow_name: the name of the flow the deployment belongs to
Returns:
  • orm_models.Deployment: the deployment

read_deployments

read_deployments(db: PrefectDBInterface, session: AsyncSession, offset: Optional[int] = None, limit: Optional[int] = None, flow_filter: Optional[schemas.filters.FlowFilter] = None, flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, deployment_filter: Optional[schemas.filters.DeploymentFilter] = None, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None, sort: schemas.sorting.DeploymentSort = schemas.sorting.DeploymentSort.NAME_ASC) -> Sequence[orm_models.Deployment]
Read deployments.
Args:
  • session: A database session
  • offset: Query offset
  • limit: Query limit
  • flow_filter: only select deployments whose flows match these criteria
  • flow_run_filter: only select deployments whose flow runs match these criteria
  • task_run_filter: only select deployments whose task runs match these criteria
  • deployment_filter: only select deployments that match these filters
  • work_pool_filter: only select deployments whose work pools match these criteria
  • work_queue_filter: only select deployments whose work pool queues match these criteria
  • sort: the sort criteria for selected deployments. Defaults to name ASC.
Returns:
  • list[orm_models.Deployment]: deployments
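
The offset and limit arguments lend themselves to page-by-page reads. The sketch below shows one way to walk every page; it does not call Prefect. The names read_all_pages, fetch_page, fake_fetch and NAMES are hypothetical and exist only for illustration: fetch_page stands in for a wrapper around read_deployments that already has a session bound, and the in-memory list stands in for the database.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence

# Hypothetical page fetcher: takes (offset, limit) and returns one page.
# In real use this would wrap read_deployments with a bound session.
FetchPage = Callable[[int, int], Awaitable[Sequence[str]]]


async def read_all_pages(fetch_page: FetchPage, page_size: int = 2) -> List[str]:
    """Collect every item by advancing the offset until a short page appears."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    items: List[str] = []
    offset = 0
    while True:
        page = await fetch_page(offset, page_size)
        items.extend(page)
        # A page shorter than the limit means there is nothing left to read.
        if len(page) < page_size:
            return items
        offset += page_size


# Stand-in data source, already sorted by name to mirror the NAME_ASC default.
NAMES = ["alpha", "bravo", "charlie", "delta", "echo"]


async def fake_fetch(offset: int, limit: int) -> Sequence[str]:
    return NAMES[offset : offset + limit]


if __name__ == "__main__":
    print(asyncio.run(read_all_pages(fake_fetch)))
```

Stopping on a short page avoids a separate count query, at the cost of one extra empty read when the total is an exact multiple of the page size.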

count_deployments

count_deployments(db: PrefectDBInterface, session: AsyncSession, flow_filter: Optional[schemas.filters.FlowFilter] = None, flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, deployment_filter: Optional[schemas.filters.DeploymentFilter] = None, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None) -> int
Count deployments.
Args:
  • session: A database session
  • flow_filter: only count deployments whose flows match these criteria
  • flow_run_filter: only count deployments whose flow runs match these criteria
  • task_run_filter: only count deployments whose task runs match these criteria
  • deployment_filter: only count deployments that match these filters
  • work_pool_filter: only count deployments that match these work pool filters
  • work_queue_filter: only count deployments that match these work pool queue filters
Returns:
  • the number of deployments matching filters

delete_deployment

delete_deployment(db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID) -> bool
Delete a deployment by id.
Args:
  • session: A database session
  • deployment_id: a deployment id
Returns:
  • whether or not the deployment was deleted

schedule_runs

schedule_runs(db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, min_time: Optional[datetime.timedelta] = None, min_runs: Optional[int] = None, max_runs: Optional[int] = None, auto_scheduled: bool = True) -> Sequence[UUID]
Schedule flow runs for a deployment.
Args:
  • session: a database session
  • deployment_id: the id of the deployment to schedule
  • start_time: the time from which to start scheduling runs
  • end_time: runs will be scheduled until at most this time
  • min_time: runs will be scheduled until at least this far in the future
  • min_runs: the minimum number of runs to schedule
  • max_runs: the maximum number of runs to schedule
This function will generate the minimum number of runs that satisfy the min and max times, and the min and max counts. Specifically, the following rules are applied in this order:
  • Runs will be generated starting on or after the start_time
  • No more than max_runs runs will be generated
  • No runs will be generated after end_time is reached
  • At least min_runs runs will be generated
  • Runs will be generated until at least start_time + min_time is reached
Returns:
  • a list of flow run ids scheduled for the deployment
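
The ordering above can be sketched in plain Python. This is a reading of the documented rules, not Prefect's implementation: plan_run_times and its candidates argument are hypothetical, the candidate times are assumed to arrive in ascending order, and every bound is assumed to be set (the real function accepts None for each).

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List


def plan_run_times(
    candidates: Iterable[datetime],
    start_time: datetime,
    end_time: datetime,
    min_time: timedelta,
    min_runs: int,
    max_runs: int,
) -> List[datetime]:
    """Pick run times from ascending candidates, applying the rules in order."""
    planned: List[datetime] = []
    for when in candidates:
        # Rule 1: runs start on or after start_time.
        if when < start_time:
            continue
        # Rule 2: never plan more than max_runs runs.
        if len(planned) >= max_runs:
            break
        # Rule 3: never plan a run after end_time.
        if when > end_time:
            break
        # Rules 4 and 5: stop only once min_runs runs are planned AND the
        # next candidate falls beyond start_time + min_time.
        if len(planned) >= min_runs and when > start_time + min_time:
            break
        planned.append(when)
    return planned


if __name__ == "__main__":
    start = datetime(2024, 1, 1, tzinfo=timezone.utc)
    hourly = [start + timedelta(hours=h) for h in range(48)]
    runs = plan_run_times(
        hourly,
        start_time=start,
        end_time=start + timedelta(days=100),
        min_time=timedelta(hours=3),
        min_runs=2,
        max_runs=10,
    )
    for run in runs:
        print(run.isoformat())
```

Because the upper bounds are checked first, max_runs and end_time win over min_runs and min_time whenever the two pairs conflict.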

check_work_queues_for_deployment

check_work_queues_for_deployment(db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID) -> Sequence[orm_models.WorkQueue]
Get work queues that can pick up the specified deployment. A work queue will pick up a deployment when all of the following conditions are met:
  • The deployment has ALL tags that the work queue has (i.e. the work queue’s tags must be a subset of the deployment’s tags).
  • The work queue’s specified deployment IDs match the deployment’s ID, or the work queue does NOT have specified deployment IDs.
  • The work queue’s specified flow runners match the deployment’s flow runner, or the work queue does NOT have a specified flow runner.
Notes on the query:
  • Our database currently allows either “null” or empty lists as null values in filters, so we need to catch both cases with “or”.
  • A.contains(B) should be interpreted as “True if A contains B”.
Returns:
  • List[orm_models.WorkQueue]: WorkQueues
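
The matching rules above amount to three independent checks, each of which passes when the queue has no restriction of that kind. The following is an in-memory sketch, not the SQL query the function runs: QueueCriteria, queue_can_pick_up and their field names are hypothetical stand-ins for the work queue filter. Both None and an empty list are treated as "no restriction", mirroring the note about null values.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence
from uuid import UUID, uuid4


@dataclass
class QueueCriteria:
    """Hypothetical stand-in for a work queue's filter settings."""

    tags: Optional[List[str]] = None
    deployment_ids: Optional[List[UUID]] = None
    flow_runner_types: Optional[List[str]] = None


def queue_can_pick_up(
    criteria: QueueCriteria,
    deployment_id: UUID,
    deployment_tags: Sequence[str],
    flow_runner_type: Optional[str],
) -> bool:
    """Return True when every restriction the queue sets is satisfied."""
    # `not value` is True for both None and [], so both mean "unrestricted".
    tags_ok = not criteria.tags or set(criteria.tags) <= set(deployment_tags)
    ids_ok = not criteria.deployment_ids or deployment_id in criteria.deployment_ids
    runner_ok = (
        not criteria.flow_runner_types
        or flow_runner_type in criteria.flow_runner_types
    )
    return tags_ok and ids_ok and runner_ok


if __name__ == "__main__":
    deployment_id = uuid4()
    queue = QueueCriteria(tags=["prod"], deployment_ids=[])
    print(queue_can_pick_up(queue, deployment_id, ["prod", "etl"], None))
```

The subset test on tags is the in-memory counterpart of the contains check mentioned in the notes: the deployment's tags must contain all of the queue's tags.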

create_deployment_schedules

create_deployment_schedules(db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID, schedules: list[schemas.actions.DeploymentScheduleCreate]) -> list[schemas.core.DeploymentSchedule]
Creates a deployment’s schedules.
Args:
  • session: A database session
  • deployment_id: a deployment id
  • schedules: a list of deployment schedule create actions

read_deployment_schedules

read_deployment_schedules(db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID, deployment_schedule_filter: Optional[schemas.filters.DeploymentScheduleFilter] = None) -> list[schemas.core.DeploymentSchedule]
Reads a deployment’s schedules.
Args:
  • session: A database session
  • deployment_id: a deployment id
Returns:
  • list[schemas.core.DeploymentSchedule]: the deployment’s schedules

update_deployment_schedule

update_deployment_schedule(db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID, schedule: schemas.actions.DeploymentScheduleUpdate, deployment_schedule_id: UUID | None = None, deployment_schedule_slug: str | None = None) -> bool
Updates a deployment schedule.
Args:
  • session: A database session
  • deployment_schedule_id: a deployment schedule id
  • schedule: a deployment schedule update action

delete_schedules_for_deployment

delete_schedules_for_deployment(db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID) -> bool
Deletes a deployment’s schedules.
Args:
  • session: A database session
  • deployment_id: a deployment id

delete_deployment_schedule

delete_deployment_schedule(db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID, deployment_schedule_id: UUID) -> bool
Deletes a deployment schedule.
Args:
  • session: A database session
  • deployment_schedule_id: a deployment schedule id

mark_deployments_ready

mark_deployments_ready(db: PrefectDBInterface, deployment_ids: Optional[Iterable[UUID]] = None, work_queue_ids: Optional[Iterable[UUID]] = None) -> None

mark_deployments_not_ready

mark_deployments_not_ready(db: PrefectDBInterface, deployment_ids: Optional[Iterable[UUID]] = None, work_queue_ids: Optional[Iterable[UUID]] = None) -> None

with_system_labels_for_deployment

with_system_labels_for_deployment(session: AsyncSession, deployment: schemas.core.Deployment) -> schemas.core.KeyValueLabels
Augment user supplied labels with system default labels for a deployment.

with_system_labels_for_deployment_flow_run

with_system_labels_for_deployment_flow_run(session: AsyncSession, deployment: orm_models.Deployment, user_supplied_labels: Optional[schemas.core.KeyValueLabels] = None) -> schemas.core.KeyValueLabels
Generate system labels for a flow run created from a deployment.
Args:
  • session: Database session
  • deployment: The deployment the flow run is created from
  • user_supplied_labels: Optional user-supplied labels to include
Returns:
  • Complete set of labels for the flow run