prefect.server.models.workers

Functions for interacting with worker ORM objects. Intended for internal use by the Prefect REST API.

Functions

create_work_pool

create_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool: Union[schemas.core.WorkPool, schemas.actions.WorkPoolCreate]) -> orm_models.WorkPool
Creates a work pool. If a WorkPool with the same name exists, an error will be thrown. Args:
  • session: a database session
  • work_pool: a WorkPool model
Returns:
  • orm_models.WorkPool: the newly-created WorkPool

read_work_pool

read_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> Optional[orm_models.WorkPool]
Reads a WorkPool by id. Args:
  • session: A database session
  • work_pool_id: a WorkPool id
Returns:
  • orm_models.WorkPool: the WorkPool

read_work_pool_by_name

read_work_pool_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_name: str) -> Optional[orm_models.WorkPool]
Reads a WorkPool by name. Args:
  • session: A database session
  • work_pool_name: a WorkPool name
Returns:
  • orm_models.WorkPool: the WorkPool

read_work_pools

read_work_pools(db: PrefectDBInterface, session: AsyncSession, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[orm_models.WorkPool]
Read worker configs. Args:
  • session: A database session
  • offset: Query offset
  • limit: Query limit
Returns: List[orm_models.WorkPool]: worker configs

count_work_pools

count_work_pools(db: PrefectDBInterface, session: AsyncSession, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None) -> int
Read worker configs. Args:
  • session: A database session
  • work_pool_filter: filter criteria to apply to the count
Returns: int: the count of work pools matching the criteria

update_work_pool

update_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_pool: schemas.actions.WorkPoolUpdate, emit_status_change: Optional[Callable[[UUID, DateTime, orm_models.WorkPool, orm_models.WorkPool], Awaitable[None]]] = None) -> bool
Update a WorkPool by id. Args:
  • session: A database session
  • work_pool_id: a WorkPool id
  • worker: the work queue data
  • emit_status_change: function to call when work pool status is changed
Returns:
  • whether or not the worker was updated

delete_work_pool

delete_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> bool
Delete a WorkPool by id. Args:
  • session: A database session
  • work_pool_id: a work pool id
Returns:
  • whether or not the WorkPool was deleted

get_scheduled_flow_runs

get_scheduled_flow_runs(db: PrefectDBInterface, session: AsyncSession, work_pool_ids: Optional[List[UUID]] = None, work_queue_ids: Optional[List[UUID]] = None, scheduled_before: Optional[datetime.datetime] = None, scheduled_after: Optional[datetime.datetime] = None, limit: Optional[int] = None, respect_queue_priorities: Optional[bool] = None) -> Sequence[schemas.responses.WorkerFlowRunResponse]
Get runs from queues in a specific work pool. Args:
  • session: a database session
  • work_pool_ids: a list of work pool ids
  • work_queue_ids: a list of work pool queue ids
  • scheduled_before: a datetime to filter runs scheduled before
  • scheduled_after: a datetime to filter runs scheduled after
  • respect_queue_priorities: whether or not to respect queue priorities
  • limit: the maximum number of runs to return
  • db: a database interface
Returns:
  • List[WorkerFlowRunResponse]: the runs, as well as related work pool details

create_work_queue

create_work_queue(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue: schemas.actions.WorkQueueCreate) -> orm_models.WorkQueue
Creates a work pool queue. Args:
  • session: a database session
  • work_pool_id: a work pool id
  • work_queue: a WorkQueue action model
Returns:
  • orm_models.WorkQueue: the newly-created WorkQueue

bulk_update_work_queue_priorities

bulk_update_work_queue_priorities(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, new_priorities: Dict[UUID, int]) -> None
This is a brute force update of all work pool queue priorities for a given work pool. It loads all queues fully into memory, sorts them, and flushes the update to the orm_models. The algorithm ensures that priorities are unique integers > 0, and makes the minimum number of changes required to satisfy the provided new_priorities. For example, if no queues currently have the provided new_priorities, then they are assigned without affecting other queues. If they are held by other queues, then those queues’ priorities are incremented as necessary. Updating queue priorities is not a common operation (happens on the same scale as queue modification, which is significantly less than reading from queues), so while this implementation is slow, it may suffice and make up for that with extreme simplicity.

read_work_queues

read_work_queues(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[orm_models.WorkQueue]
Read all work pool queues for a work pool. Results are ordered by ascending priority. Args:
  • session: a database session
  • work_pool_id: a work pool id
  • work_queue_filter: Filter criteria for work pool queues
  • offset: Query offset
  • limit: Query limit
Returns:
  • List[orm_models.WorkQueue]: the WorkQueues

read_work_queue

read_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: Union[UUID, PrefectUUID]) -> Optional[orm_models.WorkQueue]
Read a specific work pool queue. Args:
  • session: a database session
  • work_queue_id: a work pool queue id
Returns:
  • orm_models.WorkQueue: the WorkQueue

read_work_queue_by_name

read_work_queue_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_name: str, work_queue_name: str) -> Optional[orm_models.WorkQueue]
Reads a WorkQueue by name. Args:
  • session: A database session
  • work_pool_name: a WorkPool name
  • work_queue_name: a WorkQueue name
Returns:
  • orm_models.WorkQueue: the WorkQueue

update_work_queue

update_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, work_queue: schemas.actions.WorkQueueUpdate, emit_status_change: Optional[Callable[[orm_models.WorkQueue], Awaitable[None]]] = None, default_status: WorkQueueStatus = WorkQueueStatus.NOT_READY) -> bool
Update a work pool queue. Args:
  • session: a database session
  • work_queue_id: a work pool queue ID
  • work_queue: a WorkQueue model
  • emit_status_change: function to call when work queue status is changed
Returns:
  • whether or not the WorkQueue was updated

delete_work_queue

delete_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> bool
Delete a work pool queue. Args:
  • session: a database session
  • work_queue_id: a work pool queue ID
Returns:
  • whether or not the WorkQueue was deleted

read_workers

read_workers(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_filter: Optional[schemas.filters.WorkerFilter] = None, limit: Optional[int] = None, offset: Optional[int] = None) -> Sequence[orm_models.Worker]

worker_heartbeat

worker_heartbeat(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_name: str, heartbeat_interval_seconds: Optional[int] = None) -> bool
Record a worker process heartbeat. Args:
  • session: a database session
  • work_pool_id: a work pool ID
  • worker_name: a worker name
Returns:
  • whether or not the worker was updated

delete_worker

delete_worker(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_name: str) -> bool
Delete a work pool’s worker. Args:
  • session: a database session
  • work_pool_id: a work pool ID
  • worker_name: a worker name
Returns:
  • whether or not the Worker was deleted

emit_work_pool_status_event

emit_work_pool_status_event(event_id: UUID, occurred: DateTime, pre_update_work_pool: Optional[orm_models.WorkPool], work_pool: orm_models.WorkPool) -> None