Documentation Index Fetch the complete documentation index at: https://docs.prefect.io/llms.txt
Use this file to discover all available pages before exploring further.
prefect.server.models.workers
Functions for interacting with worker ORM objects.
Intended for internal use by the Prefect REST API.
Functions
create_work_pool
create_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool: Union[schemas.core.WorkPool, schemas.actions.WorkPoolCreate]) -> orm_models.WorkPool
Creates a work pool.
If a WorkPool with the same name exists, an error will be thrown.
Args:
session: a database session
work_pool: a WorkPool model
Returns:
orm_models.WorkPool: the newly-created WorkPool
read_work_pool
read_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID ) -> Optional[orm_models.WorkPool]
Reads a WorkPool by id.
Args:
session: A database session
work_pool_id: a WorkPool id
Returns:
orm_models.WorkPool: the WorkPool
read_work_pool_by_name
read_work_pool_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_name: str ) -> Optional[orm_models.WorkPool]
Reads a WorkPool by name.
Args:
session: A database session
work_pool_name: a WorkPool name
Returns:
orm_models.WorkPool: the WorkPool
read_work_pools
read_work_pools(db: PrefectDBInterface, session: AsyncSession, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None , offset: Optional[ int ] = None , limit: Optional[ int ] = None ) -> Sequence[orm_models.WorkPool]
Read worker configs.
Args:
session: A database session
offset: Query offset
limit: Query limit
Returns:
List[orm_models.WorkPool]: worker configs
count_work_pools
count_work_pools(db: PrefectDBInterface, session: AsyncSession, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None ) -> int
Read worker configs.
Args:
session: A database session
work_pool_filter: filter criteria to apply to the count
Returns:
int: the count of work pools matching the criteria
count_work_pool_active_slots
count_work_pool_active_slots(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID ) -> int
Count flow runs in slot-occupying states (Pending, Running) for a given
work pool. Does not filter on queue pause status — paused queues may
still have running/pending runs consuming resources. This matches the
behavior of count_work_pool_slot_holders / get_work_pool_slot_holders.
count_work_pool_active_slots_bulk
count_work_pool_active_slots_bulk(db: PrefectDBInterface, session: AsyncSession, work_pool_ids: Sequence[ UUID ]) -> dict[ UUID , int ]
Count active slots for multiple work pools in a single query.
Returns a mapping of work_pool_id -> active slot count.
Does not filter on queue pause status (see count_work_pool_active_slots).
count_work_queue_active_slots
count_work_queue_active_slots(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID ) -> int
Count flow runs in slot-occupying states (Pending, Running) for a given
work queue under a work pool. Counts by work_queue_id FK only.
count_work_queue_active_slots_bulk
count_work_queue_active_slots_bulk(db: PrefectDBInterface, session: AsyncSession, work_queue_ids: Sequence[ UUID ]) -> dict[ UUID , int ]
Count active slots for multiple work queues in a single query.
Returns a mapping of work_queue_id -> active slot count.
update_work_pool
update_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID , work_pool: schemas.actions.WorkPoolUpdate, emit_status_change: Optional[Callable[[ UUID , DateTime, orm_models.WorkPool, orm_models.WorkPool], Awaitable[ None ]]] = None ) -> bool
Update a WorkPool by id.
Args:
session: A database session
work_pool_id: a WorkPool id
worker: the work queue data
emit_status_change: function to call when work pool
status is changed
Returns:
whether or not the worker was updated
delete_work_pool
delete_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID ) -> bool
Delete a WorkPool by id.
Args:
session: A database session
work_pool_id: a work pool id
Returns:
whether or not the WorkPool was deleted
get_scheduled_flow_runs
get_scheduled_flow_runs(db: PrefectDBInterface, session: AsyncSession, work_pool_ids: Optional[List[ UUID ]] = None , work_queue_ids: Optional[List[ UUID ]] = None , scheduled_before: Optional[datetime.datetime] = None , scheduled_after: Optional[datetime.datetime] = None , limit: Optional[ int ] = None , respect_queue_priorities: Optional[ bool ] = None ) -> Sequence[schemas.responses.WorkerFlowRunResponse]
Get runs from queues in a specific work pool.
Args:
session: a database session
work_pool_ids: a list of work pool ids
work_queue_ids: a list of work pool queue ids
scheduled_before: a datetime to filter runs scheduled before
scheduled_after: a datetime to filter runs scheduled after
respect_queue_priorities: whether or not to respect queue priorities
limit: the maximum number of runs to return
db: a database interface
Returns:
List[WorkerFlowRunResponse]: the runs, as well as related work pool details
create_work_queue
create_work_queue(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID , work_queue: schemas.actions.WorkQueueCreate) -> orm_models.WorkQueue
Creates a work pool queue.
Args:
session: a database session
work_pool_id: a work pool id
work_queue: a WorkQueue action model
Returns:
orm_models.WorkQueue: the newly-created WorkQueue
bulk_update_work_queue_priorities
bulk_update_work_queue_priorities(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID , new_priorities: Dict[ UUID , int ]) -> None
This is a brute force update of all work pool queue priorities for a given work
pool.
It loads all queues fully into memory, sorts them, and flushes the update to
the orm_models. The algorithm ensures that priorities are unique integers > 0, and
makes the minimum number of changes required to satisfy the provided
new_priorities. For example, if no queues currently have the provided
new_priorities, then they are assigned without affecting other queues. If
they are held by other queues, then those queues’ priorities are
incremented as necessary.
Updating queue priorities is not a common operation (happens on the same scale as
queue modification, which is significantly less than reading from queues),
so while this implementation is slow, it may suffice and make up for that
with extreme simplicity.
read_work_queues
read_work_queues(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID , work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None , offset: Optional[ int ] = None , limit: Optional[ int ] = None ) -> Sequence[orm_models.WorkQueue]
Read all work pool queues for a work pool. Results are ordered by ascending priority.
Args:
session: a database session
work_pool_id: a work pool id
work_queue_filter: Filter criteria for work pool queues
offset: Query offset
limit: Query limit
Returns:
List[orm_models.WorkQueue]: the WorkQueues
count_work_queues
count_work_queues(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID , work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None ) -> int
Count work pool queues for a work pool.
read_work_queue
read_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: Union[ UUID , PrefectUUID]) -> Optional[orm_models.WorkQueue]
Read a specific work pool queue.
Args:
session: a database session
work_queue_id: a work pool queue id
Returns:
orm_models.WorkQueue: the WorkQueue
read_work_queue_by_name
read_work_queue_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_name: str , work_queue_name: str ) -> Optional[orm_models.WorkQueue]
Reads a WorkQueue by name.
Args:
session: A database session
work_pool_name: a WorkPool name
work_queue_name: a WorkQueue name
Returns:
orm_models.WorkQueue: the WorkQueue
update_work_queue
update_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID , work_queue: schemas.actions.WorkQueueUpdate, emit_status_change: Optional[Callable[[orm_models.WorkQueue], Awaitable[ None ]]] = None , default_status: WorkQueueStatus = WorkQueueStatus. NOT_READY ) -> bool
Update a work pool queue.
Args:
session: a database session
work_queue_id: a work pool queue ID
work_queue: a WorkQueue model
emit_status_change: function to call when work queue
status is changed
Returns:
whether or not the WorkQueue was updated
delete_work_queue
delete_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID ) -> bool
Delete a work pool queue.
Args:
session: a database session
work_queue_id: a work pool queue ID
Returns:
whether or not the WorkQueue was deleted
read_workers
read_workers(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID , worker_filter: Optional[schemas.filters.WorkerFilter] = None , limit: Optional[ int ] = None , offset: Optional[ int ] = None ) -> Sequence[orm_models.Worker]
worker_heartbeat
worker_heartbeat(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID , worker_name: str , heartbeat_interval_seconds: Optional[ int ] = None ) -> bool
Record a worker process heartbeat.
Args:
session: a database session
work_pool_id: a work pool ID
worker_name: a worker name
Returns:
whether or not the worker was updated
delete_worker
delete_worker(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID , worker_name: str ) -> bool
Delete a work pool’s worker.
Args:
session: a database session
work_pool_id: a work pool ID
worker_name: a worker name
Returns:
whether or not the Worker was deleted
count_work_pool_slot_holders
count_work_pool_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID ) -> int
Counts flow runs in slot-occupying states for a work pool.
get_work_pool_slot_holders
get_work_pool_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID , work_queue_ids: Optional[List[ UUID ]] = None , flow_run_limit: Optional[ int ] = None ) -> Sequence[tuple[orm_models.FlowRun, Optional[DateTime]]]
Returns flow runs in slot-occupying states for a work pool.
Each result is a tuple of (FlowRun, slot_acquired_at) where
slot_acquired_at is when the current slot-occupying sequence began.
Args:
work_pool_id: The work pool to query.
work_queue_ids: If provided, only return runs for these queues.
flow_run_limit: If provided, cap results per work_queue_id.
count_work_pool_slot_holders_by_queue
count_work_pool_slot_holders_by_queue(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID ) -> dict[ UUID , int ]
Returns {work_queue_id: count} for slot-holding runs in a pool.
count_work_queue_slot_holders
count_work_queue_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID ) -> int
Counts flow runs in slot-occupying states for a single work queue.
get_work_queue_slot_holders
get_work_queue_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID , offset: Optional[ int ] = None , limit: Optional[ int ] = None ) -> Sequence[tuple[orm_models.FlowRun, Optional[DateTime]]]
Returns flow runs in slot-occupying states for a single work queue.
Each result is a tuple of (FlowRun, slot_acquired_at) where
slot_acquired_at is when the current slot-occupying sequence began.
emit_work_pool_updated_event
emit_work_pool_updated_event(session: AsyncSession, work_pool: orm_models.WorkPool, changed_fields: Dict[ str , Dict[ str , Any]]) -> None
Emit an event when work pool fields are updated.
emit_work_pool_status_event
emit_work_pool_status_event(event_id: UUID , occurred: DateTime, pre_update_work_pool: Optional[orm_models.WorkPool], work_pool: orm_models.WorkPool) -> None