> ## Documentation Index
> Fetch the complete documentation index at: https://docs.prefect.io/llms.txt
> Use this file to discover all available pages before exploring further.

# workers

# `prefect.server.models.workers`

Functions for interacting with worker ORM objects.
Intended for internal use by the Prefect REST API.

## Functions

### `create_work_pool` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L53" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
create_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool: Union[schemas.core.WorkPool, schemas.actions.WorkPoolCreate]) -> orm_models.WorkPool
```

Creates a work pool.

If a WorkPool with the same name exists, an error will be thrown.

**Args:**

* `session`: a database session
* `work_pool`: a WorkPool model

**Returns:**

* orm\_models.WorkPool: the newly-created WorkPool

### `read_work_pool` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L100" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
read_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> Optional[orm_models.WorkPool]
```

Reads a WorkPool by id.

**Args:**

* `session`: A database session
* `work_pool_id`: a WorkPool id

**Returns:**

* orm\_models.WorkPool: the WorkPool

### `read_work_pool_by_name` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L119" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
read_work_pool_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_name: str) -> Optional[orm_models.WorkPool]
```

Reads a WorkPool by name.

**Args:**

* `session`: A database session
* `work_pool_name`: a WorkPool name

**Returns:**

* orm\_models.WorkPool: the WorkPool

### `read_work_pools` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L138" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
read_work_pools(db: PrefectDBInterface, session: AsyncSession, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[orm_models.WorkPool]
```

Read worker configs.

**Args:**

* `session`: A database session
* `offset`: Query offset
* `limit`: Query limit

Returns:
List\[orm\_models.WorkPool]: worker configs

### `count_work_pools` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L170" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
count_work_pools(db: PrefectDBInterface, session: AsyncSession, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None) -> int
```

Read worker configs.

**Args:**

* `session`: A database session
* `work_pool_filter`: filter criteria to apply to the count

Returns:
int: the count of work pools matching the criteria

### `count_work_pool_active_slots` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L203" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
count_work_pool_active_slots(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> int
```

Count flow runs in slot-occupying states (Pending, Running) for a given
work pool. Does not filter on queue pause status — paused queues may
still have running/pending runs consuming resources. This matches the
behavior of count\_work\_pool\_slot\_holders / get\_work\_pool\_slot\_holders.

### `count_work_pool_active_slots_bulk` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L228" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
count_work_pool_active_slots_bulk(db: PrefectDBInterface, session: AsyncSession, work_pool_ids: Sequence[UUID]) -> dict[UUID, int]
```

Count active slots for multiple work pools in a single query.
Returns a mapping of work\_pool\_id -> active slot count.
Does not filter on queue pause status (see count\_work\_pool\_active\_slots).

### `count_work_queue_active_slots` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L259" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
count_work_queue_active_slots(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> int
```

Count flow runs in slot-occupying states (Pending, Running) for a given
work queue under a work pool. Counts by work\_queue\_id FK only.

### `count_work_queue_active_slots_bulk` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L281" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
count_work_queue_active_slots_bulk(db: PrefectDBInterface, session: AsyncSession, work_queue_ids: Sequence[UUID]) -> dict[UUID, int]
```

Count active slots for multiple work queues in a single query.
Returns a mapping of work\_queue\_id -> active slot count.

### `update_work_pool` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L309" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
update_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_pool: schemas.actions.WorkPoolUpdate, emit_status_change: Optional[Callable[[UUID, DateTime, orm_models.WorkPool, orm_models.WorkPool], Awaitable[None]]] = None, emit_update_event: bool = True) -> bool
```

Update a WorkPool by id.

**Args:**

* `session`: A database session
* `work_pool_id`: a WorkPool id
* `worker`: the work queue data
* `emit_status_change`: function to call when work pool
  status is changed
* `emit_update_event`: whether to emit an event for updated non-status fields

**Returns:**

* whether or not the worker was updated

### `delete_work_pool` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L432" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
delete_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> bool
```

Delete a WorkPool by id.

**Args:**

* `session`: A database session
* `work_pool_id`: a work pool id

**Returns:**

* whether or not the WorkPool was deleted

### `get_scheduled_flow_runs` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L466" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
get_scheduled_flow_runs(db: PrefectDBInterface, session: AsyncSession, work_pool_ids: Optional[List[UUID]] = None, work_queue_ids: Optional[List[UUID]] = None, scheduled_before: Optional[datetime.datetime] = None, scheduled_after: Optional[datetime.datetime] = None, limit: Optional[int] = None, respect_queue_priorities: Optional[bool] = None) -> Sequence[schemas.responses.WorkerFlowRunResponse]
```

Get runs from queues in a specific work pool.

**Args:**

* `session`: a database session
* `work_pool_ids`: a list of work pool ids
* `work_queue_ids`: a list of work pool queue ids
* `scheduled_before`: a datetime to filter runs scheduled before
* `scheduled_after`: a datetime to filter runs scheduled after
* `respect_queue_priorities`: whether or not to respect queue priorities
* `limit`: the maximum number of runs to return
* `db`: a database interface

**Returns:**

* List\[WorkerFlowRunResponse]: the runs, as well as related work pool details

### `create_work_queue` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L518" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
create_work_queue(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue: schemas.actions.WorkQueueCreate) -> orm_models.WorkQueue
```

Creates a work pool queue.

**Args:**

* `session`: a database session
* `work_pool_id`: a work pool id
* `work_queue`: a WorkQueue action model

**Returns:**

* orm\_models.WorkQueue: the newly-created WorkQueue

### `bulk_update_work_queue_priorities` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L583" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
bulk_update_work_queue_priorities(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, new_priorities: Dict[UUID, int]) -> None
```

This is a brute force update of all work pool queue priorities for a given work
pool.

It loads all queues fully into memory, sorts them, and flushes the update to
the orm\_models. The algorithm ensures that priorities are unique integers > 0, and
makes the minimum number of changes required to satisfy the provided
`new_priorities`. For example, if no queues currently have the provided
`new_priorities`, then they are assigned without affecting other queues. If
they are held by other queues, then those queues' priorities are
incremented as necessary.

Updating queue priorities is not a common operation (happens on the same scale as
queue modification, which is significantly less than reading from queues),
so while this implementation is slow, it may suffice and make up for that
with extreme simplicity.

### `read_work_queues` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L649" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
read_work_queues(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[orm_models.WorkQueue]
```

Read all work pool queues for a work pool. Results are ordered by ascending priority.

**Args:**

* `session`: a database session
* `work_pool_id`: a work pool id
* `work_queue_filter`: Filter criteria for work pool queues
* `offset`: Query offset
* `limit`: Query limit

**Returns:**

* List\[orm\_models.WorkQueue]: the WorkQueues

### `count_work_queues` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L690" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
count_work_queues(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None) -> int
```

Count work pool queues for a work pool.

### `read_work_queue` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L709" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
read_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: Union[UUID, PrefectUUID]) -> Optional[orm_models.WorkQueue]
```

Read a specific work pool queue.

**Args:**

* `session`: a database session
* `work_queue_id`: a work pool queue id

**Returns:**

* orm\_models.WorkQueue: the WorkQueue

### `read_work_queue_by_name` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L729" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
read_work_queue_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_name: str, work_queue_name: str) -> Optional[orm_models.WorkQueue]
```

Reads a WorkQueue by name.

**Args:**

* `session`: A database session
* `work_pool_name`: a WorkPool name
* `work_queue_name`: a WorkQueue name

**Returns:**

* orm\_models.WorkQueue: the WorkQueue

### `update_work_queue` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L763" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
update_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, work_queue: schemas.actions.WorkQueueUpdate, emit_status_change: Optional[Callable[[orm_models.WorkQueue], Awaitable[None]]] = None, default_status: WorkQueueStatus = WorkQueueStatus.NOT_READY) -> bool
```

Update a work pool queue.

**Args:**

* `session`: a database session
* `work_queue_id`: a work pool queue ID
* `work_queue`: a WorkQueue model
* `emit_status_change`: function to call when work queue
  status is changed

**Returns:**

* whether or not the WorkQueue was updated

### `delete_work_queue` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L887" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
delete_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> bool
```

Delete a work pool queue.

**Args:**

* `session`: a database session
* `work_queue_id`: a work pool queue ID

**Returns:**

* whether or not the WorkQueue was deleted

### `read_workers` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L942" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
read_workers(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_filter: Optional[schemas.filters.WorkerFilter] = None, limit: Optional[int] = None, offset: Optional[int] = None) -> Sequence[orm_models.Worker]
```

### `read_worker_by_name` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L971" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
read_worker_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_name: str) -> Optional[orm_models.Worker]
```

### `worker_heartbeat` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L990" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
worker_heartbeat(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_name: str, heartbeat_interval_seconds: Optional[int] = None) -> bool
```

Record a worker process heartbeat.

**Args:**

* `session`: a database session
* `work_pool_id`: a work pool ID
* `worker_name`: a worker name

**Returns:**

* whether or not the worker was updated

### `record_worker_heartbeat` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1039" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
record_worker_heartbeat(session: AsyncSession, work_pool: orm_models.WorkPool, worker_name: str, heartbeat_interval_seconds: Optional[int] = None, emit_status_change: Optional[Callable[[UUID, DateTime, orm_models.WorkPool, orm_models.WorkPool], Awaitable[None]]] = None, return_worker: bool = False) -> Optional[orm_models.Worker]
```

### `delete_worker` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1082" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
delete_worker(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_name: str) -> bool
```

Delete a work pool's worker.

**Args:**

* `session`: a database session
* `work_pool_id`: a work pool ID
* `worker_name`: a worker name

**Returns:**

* whether or not the Worker was deleted

### `count_work_pool_slot_holders` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1187" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
count_work_pool_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> int
```

Counts flow runs in slot-occupying states for a work pool.

### `get_work_pool_slot_holders` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1204" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
get_work_pool_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue_ids: Optional[List[UUID]] = None, flow_run_limit: Optional[int] = None) -> Sequence[tuple[orm_models.FlowRun, Optional[DateTime]]]
```

Returns flow runs in slot-occupying states for a work pool.

Each result is a tuple of (FlowRun, slot\_acquired\_at) where
slot\_acquired\_at is when the current slot-occupying sequence began.

**Args:**

* `work_pool_id`: The work pool to query.
* `work_queue_ids`: If provided, only return runs for these queues.
* `flow_run_limit`: If provided, cap results per work\_queue\_id.

### `count_work_pool_slot_holders_by_queue` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1257" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
count_work_pool_slot_holders_by_queue(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> dict[UUID, int]
```

Returns `{work_queue_id: count}` for slot-holding runs in a pool.

### `count_work_queue_slot_holders` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1290" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
count_work_queue_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> int
```

Counts flow runs in slot-occupying states for a single work queue.

### `get_work_queue_slot_holders` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1311" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
get_work_queue_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[tuple[orm_models.FlowRun, Optional[DateTime]]]
```

Returns flow runs in slot-occupying states for a single work queue.

Each result is a tuple of (FlowRun, slot\_acquired\_at) where
slot\_acquired\_at is when the current slot-occupying sequence began.

### `emit_work_pool_updated_event` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1345" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
emit_work_pool_updated_event(session: AsyncSession, work_pool: orm_models.WorkPool, changed_fields: Dict[str, Dict[str, Any]]) -> None
```

Emit an event when work pool fields are updated.

### `emit_work_pool_created_event` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1365" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
emit_work_pool_created_event(work_pool: orm_models.WorkPool) -> None
```

Emit an event when a work pool is created.

### `emit_work_pool_deleted_event` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1373" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
emit_work_pool_deleted_event(work_pool: orm_models.WorkPool) -> None
```

Emit an event when a work pool is deleted.

### `emit_work_pool_status_event` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1381" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
emit_work_pool_status_event(event_id: UUID, occurred: DateTime, pre_update_work_pool: Optional[orm_models.WorkPool], work_pool: orm_models.WorkPool) -> None
```
