Skip to main content

prefect_redis.ordering

Manages the partial causal ordering of events for a particular consumer. This module maintains a buffer of events to be processed, aiming to process them in the order they occurred causally.

Classes

EventProcessingCompletion

Holds the result of completing event processing, including any followers.

CausalOrdering

Methods:

complete_event_and_get_followers

complete_event_and_get_followers(self, event: ReceivedEvent) -> list[ReceivedEvent]
Atomically marks the event as seen, retrieves any waiting followers, and releases the processing lock. This operation is atomic to prevent a race condition where a follower could park itself between the lock release and the followers check.

event_has_been_seen

event_has_been_seen(self, event: Union[UUID, Event]) -> bool

event_has_started_processing

event_has_started_processing(self, event: Union[UUID, Event]) -> bool

event_is_processing

event_is_processing(self, event: ReceivedEvent) -> AsyncGenerator[EventProcessingCompletion, None]
Mark an event as being processed for the duration of its lifespan through the ordering system. Yields an EventProcessingCompletion object that will be populated with any followers after successful processing.

followers_by_id

followers_by_id(self, follower_ids: list[UUID]) -> list[ReceivedEvent]
Returns the events with the given IDs, in the order they occurred

forget_event_is_processing

forget_event_is_processing(self, event: ReceivedEvent) -> None

forget_follower

forget_follower(self, follower: ReceivedEvent)
Forget that this event is waiting on another event to arrive

get_followers

get_followers(self, leader: ReceivedEvent) -> list[ReceivedEvent]
Returns events that were waiting on this leader event to arrive

get_lost_followers

get_lost_followers(self) -> list[ReceivedEvent]
Returns events that were waiting on a leader event that never arrived

preceding_event_confirmed

preceding_event_confirmed(self, handler: event_handler, event: ReceivedEvent, depth: int = 0) -> AsyncGenerator[None, None]
Events may optionally declare that they logically follow another event, so that we can preserve important event orderings in the face of unreliable delivery and ordering of messages from the queues. This function keeps track of the ID of each event that this shard has successfully processed going back to the PRECEDING_EVENT_LOOKBACK period. If an event arrives that must follow another one, confirm that we have recently seen and processed that event before proceeding. is ready to be processed event (ReceivedEvent): The event to be processed. This object should include metadata indicating if and what event it follows. depth (int, optional): The current recursion depth, used to prevent infinite recursion due to cyclic dependencies between events. Defaults to 0. Raises EventArrivedEarly if the current event shouldn’t be processed yet.

record_event_as_processing

record_event_as_processing(self, event: ReceivedEvent) -> bool
Record that an event is being processed, returning False if the event is already being processed.

record_event_as_seen

record_event_as_seen(self, event: ReceivedEvent) -> None

record_follower

record_follower(self, event: ReceivedEvent)
Remember that this event is waiting on another event to arrive

wait_for_leader

wait_for_leader(self, event: ReceivedEvent)
Given an event, wait for its leader to be processed before proceeding, or raise EventArrivedEarly if we would wait too long in this attempt.