prefect.server.events.services.event_persister

The event persister moves event messages from the event bus to storage as fast as it can. Never gets tired.

Functions

batch_delete

batch_delete(session: AsyncSession, model: type[T], condition: Any, batch_size: int = 10000) -> int
Delete database records in batches, using a subquery with LIMIT to select each batch. Works with both PostgreSQL and SQLite. Compared to a plain delete(…).where(…), batch deletion is more robust against timeouts on large tables, which matters most when old entries are purged for the first time from a long-lived table. Returns:
  • Total number of deleted records
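
The batching technique described above can be sketched with the standard-library sqlite3 module. This is an illustration only, not Prefect's implementation: the real function takes an AsyncSession and a SQLAlchemy model, whereas the helper name batch_delete_sketch, the events table, and the age_days column are all hypothetical.

```python
import sqlite3


def batch_delete_sketch(
    conn: sqlite3.Connection, table: str, condition: str, batch_size: int = 10_000
) -> int:
    """Delete matching rows in batches; return the total number deleted.

    Hypothetical helper: `table` and `condition` are interpolated into the
    SQL text, so they must come from trusted code, never from user input.
    """
    total = 0
    while True:
        # The subquery with LIMIT picks at most `batch_size` ids per pass,
        # so each DELETE statement stays small and short-lived.
        cursor = conn.execute(
            f"DELETE FROM {table} WHERE id IN "
            f"(SELECT id FROM {table} WHERE {condition} LIMIT ?)",
            (batch_size,),
        )
        conn.commit()
        if cursor.rowcount == 0:
            break
        total += cursor.rowcount
    return total


# Demo on an in-memory database with 25 rows, 15 of which match.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, age_days INTEGER)")
conn.executemany(
    "INSERT INTO events (age_days) VALUES (?)", [(d,) for d in range(25)]
)
conn.commit()

deleted = batch_delete_sketch(conn, "events", "age_days >= 10", batch_size=4)
remaining = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

Committing after every batch keeps each transaction short, which is the property that makes this approach less prone to timeouts than a single large delete.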

create_handler

create_handler(batch_size: int = 20, flush_every: timedelta = timedelta(seconds=5), trim_every: timedelta = timedelta(minutes=15)) -> AsyncGenerator[MessageHandler, None]
Set up a message handler that accumulates events and writes them to the database every batch_size messages, or every flush_every interval to flush any remaining messages.
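
The accumulate-and-flush behavior can be sketched with plain asyncio. This is a minimal sketch under stated assumptions, not the actual create_handler: the name batching_handler, the in-memory sink list standing in for the database, and string messages are all hypothetical, and trimming (trim_every) is left out.

```python
import asyncio
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import AsyncGenerator, Awaitable, Callable

Handler = Callable[[str], Awaitable[None]]


@asynccontextmanager
async def batching_handler(
    sink: list,
    batch_size: int = 20,
    flush_every: timedelta = timedelta(seconds=5),
) -> AsyncGenerator[Handler, None]:
    """Yield a handler that buffers messages and flushes them in batches.

    Hypothetical stand-in: `sink` collects flushed batches in place of a
    database write.
    """
    pending: list = []

    async def flush() -> None:
        if pending:
            sink.append(pending.copy())
            pending.clear()

    async def flush_periodically() -> None:
        # Time-based flush, so a partial batch never waits indefinitely.
        while True:
            await asyncio.sleep(flush_every.total_seconds())
            await flush()

    async def handler(message: str) -> None:
        pending.append(message)
        # Size-based flush once the buffer reaches batch_size.
        if len(pending) >= batch_size:
            await flush()

    task = asyncio.create_task(flush_periodically())
    try:
        yield handler
    finally:
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)
        await flush()  # write whatever is still buffered on shutdown


async def demo() -> list:
    sink: list = []
    async with batching_handler(sink, batch_size=3) as handle:
        for i in range(7):
            await handle(f"event-{i}")
    return sink


batches = asyncio.run(demo())
```

With batch_size=3 and seven messages, the sink receives two full batches from the size trigger and one final partial batch from the flush on exit.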

Classes

EventPersister

A service that persists events to the database as they arrive. Methods:

all_services

all_services(cls) -> Sequence[type[Self]]
Get list of all service classes

enabled

enabled(cls) -> bool
Whether the service is enabled

enabled_services

enabled_services(cls) -> list[type[Self]]
Get list of enabled service classes

environment_variable_name

environment_variable_name(cls) -> str

run_services

run_services(cls) -> NoReturn
Run enabled services until cancelled.

running

running(cls) -> AsyncGenerator[None, None]
A context manager that runs enabled services on entry and stops them on exit.

service_settings

service_settings(cls) -> ServicesBaseSetting
The Prefect setting that controls whether the service is enabled

start

start(self) -> NoReturn
Start running the service, which may run indefinitely

started_event

started_event(self) -> asyncio.Event

started_event

started_event(self, value: asyncio.Event) -> None

stop

stop(self) -> None
Stop the service
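
The lifecycle described by start, stop, started_event, and running can be illustrated with a toy service. Everything here is a hypothetical stand-in rather than Prefect's code: ToyService and the free function running are invented for the sketch, and it assumes start and stop are coroutines and that started_event is set once the service is up.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List


class ToyService:
    """Hypothetical service that idles until it is asked to stop."""

    def __init__(self) -> None:
        self._started = asyncio.Event()
        self._stopping = asyncio.Event()
        self.finished = False

    @property
    def started_event(self) -> asyncio.Event:
        return self._started

    async def start(self) -> None:
        self._started.set()          # signal that the service is running
        await self._stopping.wait()  # may run indefinitely until stop()
        self.finished = True

    async def stop(self) -> None:
        self._stopping.set()


@asynccontextmanager
async def running(services: List[ToyService]) -> AsyncGenerator[None, None]:
    """Start the given services on entry and stop them on exit."""
    tasks = [asyncio.create_task(service.start()) for service in services]
    # Wait until every service has reported that it started.
    await asyncio.gather(*(s.started_event.wait() for s in services))
    try:
        yield
    finally:
        for service in services:
            await service.stop()
        await asyncio.gather(*tasks)


async def demo() -> List[bool]:
    # Services are created inside the running event loop.
    services = [ToyService(), ToyService()]
    async with running(services):
        states = [s.started_event.is_set() for s in services]
    return states + [s.finished for s in services]


result = asyncio.run(demo())
```

Waiting on started_event before yielding means code inside the async with block can rely on every service already being up.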