prefect.server.events.triggers

The triggers consumer watches events streaming in from the event bus and decides whether to act on them based on the automations that users have set up.

Functions

evaluate

evaluate(session: AsyncSession, trigger: EventTrigger, bucket: 'ORMAutomationBucket', now: prefect.types._datetime.DateTime, triggering_event: Optional[ReceivedEvent]) -> 'ORMAutomationBucket | None'
Evaluates an Automation, either triggered by a specific event or proactively on a time interval. Evaluating an Automation updates its associated counters and fires the associated action if the threshold has been met.
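The counter-and-threshold behavior described above can be pictured with a toy model. This is only a minimal sketch, not Prefect's implementation: `ToyBucket` and `toy_evaluate` are hypothetical names, and the real function works on `ORMAutomationBucket` rows through an `AsyncSession`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyBucket:
    """Hypothetical stand-in for a bucket: a counter plus a threshold."""

    count: int = 0
    threshold: int = 3


def toy_evaluate(bucket: ToyBucket, triggering_event: Optional[str]) -> bool:
    """Count the event (if any) and report whether the threshold is met."""
    if triggering_event is not None:
        bucket.count += 1
    return bucket.count >= bucket.threshold


bucket = ToyBucket()
# Three events arrive; only the third one reaches the threshold.
fired = [toy_evaluate(bucket, f"event-{i}") for i in range(3)]
```

Passing `None` as the event models the proactive case, where the bucket is checked on a timer without a new event being counted.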

fire

fire(session: AsyncSession, firing: Firing) -> None

evaluate_composite_trigger

evaluate_composite_trigger(session: AsyncSession, firing: Firing) -> None

act

act(firing: Firing) -> None
Given an Automation that has been triggered, along with the triggering labels and event (if there was one), publishes an action for the actions service to process.

update_events_clock

update_events_clock(event: ReceivedEvent) -> None

get_events_clock

get_events_clock() -> Optional[float]

get_events_clock_offset

get_events_clock_offset() -> float
Calculates the current clock offset. This takes into account both the occurred time of the last event and the time at which we saw that event. This helps ensure that low-volume environments do not end up with huge offsets.
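One way to read the description above is that the offset is anchored to the moment the last event was seen, rather than to the current wall-clock time. The sketch below assumes exactly that. `ToyEventsClock` and its methods are hypothetical names for illustration, and the module's actual arithmetic may differ.

```python
import time
from typing import Optional


class ToyEventsClock:
    """Hypothetical clock tracking the last event's occurred and seen times."""

    def __init__(self) -> None:
        self.last_occurred: Optional[float] = None
        self.last_seen: Optional[float] = None

    def update(self, occurred: float, seen: Optional[float] = None) -> None:
        # Record when the event happened and when we observed it.
        self.last_occurred = occurred
        self.last_seen = time.time() if seen is None else seen

    def offset(self) -> float:
        # Assumption: with no events yet, there is no offset to report.
        if self.last_occurred is None or self.last_seen is None:
            return 0.0
        # Comparing against the seen time (not "now") keeps the offset from
        # growing while no new events arrive.
        return self.last_occurred - self.last_seen


clock = ToyEventsClock()
clock.update(occurred=100.0, seen=102.5)
lag = clock.offset()
```

Under this assumption, an event that occurred 2.5 seconds before it was seen yields an offset of -2.5 no matter how long the stream stays quiet afterwards.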

reset_events_clock

reset_events_clock() -> None

reactive_evaluation

reactive_evaluation(event: ReceivedEvent, depth: int = 0) -> None
Evaluate all automations that may apply to this event.

Args:
event (ReceivedEvent): The event to evaluate. This object contains all the necessary information about the event, including its type, associated resources, and metadata.
depth (int, optional): The current recursion depth. This is used to prevent infinite recursion due to cyclic event dependencies. Defaults to 0 and is incremented with each recursive call.
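The role of `depth` can be illustrated with a small recursive function that gives up once a limit is exceeded. This is a sketch under assumptions: `MAX_DEPTH`, `toy_reactive_evaluation`, and the `followers` mapping are hypothetical, and the source does not state what limit or failure mode the real function uses.

```python
from typing import Dict, List

MAX_DEPTH = 20  # hypothetical limit, chosen for illustration only


def toy_reactive_evaluation(
    event: str,
    followers: Dict[str, List[str]],
    seen: List[str],
    depth: int = 0,
) -> None:
    """Process an event, then any events that depend on it."""
    if depth > MAX_DEPTH:
        raise RecursionError(f"gave up on {event!r} at depth {depth}")
    seen.append(event)
    for follower in followers.get(event, []):
        # Each recursive call carries an incremented depth.
        toy_reactive_evaluation(follower, followers, seen, depth + 1)


seen: List[str] = []
toy_reactive_evaluation("a", {"a": ["b"], "b": ["c"]}, seen)
```

With a cyclic mapping such as `{"a": ["b"], "b": ["a"]}`, the same call raises `RecursionError` once the depth limit is passed instead of looping forever.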

get_lost_followers

get_lost_followers() -> List[ReceivedEvent]
Get followers that have been sitting around longer than our lookback

periodic_evaluation

periodic_evaluation(now: prefect.types._datetime.DateTime) -> None
Periodic tasks that should be run regularly, but not as often as every event

evaluate_periodically

evaluate_periodically(periodic_granularity: timedelta) -> None
Runs periodic evaluation on the given interval
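Running a task on a fixed interval is commonly written as an asyncio loop that awaits the task and then sleeps. The sketch below assumes that pattern. `toy_evaluate_periodically` is a hypothetical name, and it takes an `iterations` argument so the example terminates, which the real function's signature does not have.

```python
import asyncio
from datetime import timedelta
from typing import Awaitable, Callable, List


async def toy_evaluate_periodically(
    task: Callable[[], Awaitable[None]],
    periodic_granularity: timedelta,
    iterations: int,
) -> None:
    """Run `task`, sleep for the granularity, and repeat a fixed number of times."""
    for _ in range(iterations):
        await task()
        await asyncio.sleep(periodic_granularity.total_seconds())


ticks: List[int] = []


async def record_tick() -> None:
    # Stand-in for the periodic evaluation work.
    ticks.append(len(ticks))


asyncio.run(toy_evaluate_periodically(record_tick, timedelta(milliseconds=10), 3))
```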

find_interested_triggers

find_interested_triggers(event: ReceivedEvent) -> Collection[EventTrigger]

load_automation

load_automation(automation: Optional[Automation]) -> None
Loads the given automation into memory so that it is available for evaluations

forget_automation

forget_automation(automation_id: UUID) -> None
Unloads the given automation from memory
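Loading and forgetting automations amounts to maintaining an in-memory registry keyed by automation ID. The following is a minimal sketch of that idea. The `loaded_automations` dictionary, the `toy_` functions, and the dict-shaped automation are hypothetical, and treating `None` as a no-op is an assumption based only on the `Optional[Automation]` parameter type.

```python
from typing import Any, Dict, Optional
from uuid import UUID, uuid4

# Hypothetical in-memory registry of loaded automations.
loaded_automations: Dict[UUID, Dict[str, Any]] = {}


def toy_load_automation(automation: Optional[Dict[str, Any]]) -> None:
    """Make the automation available for evaluation; ignore None (assumed)."""
    if automation is None:
        return
    loaded_automations[automation["id"]] = automation


def toy_forget_automation(automation_id: UUID) -> None:
    """Drop the automation from memory if it is present."""
    loaded_automations.pop(automation_id, None)


automation = {"id": uuid4(), "name": "notify-on-failure"}
toy_load_automation(automation)
toy_load_automation(None)
is_loaded = automation["id"] in loaded_automations
toy_forget_automation(automation["id"])
is_forgotten = automation["id"] not in loaded_automations
```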

automation_changed

automation_changed(automation_id: UUID, event: Literal['automation__created', 'automation__updated', 'automation__deleted']) -> None

load_automations

load_automations(db: PrefectDBInterface, session: AsyncSession)
Loads all automations

remove_buckets_exceeding_threshold

remove_buckets_exceeding_threshold(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger)
Deletes buckets whose count has already exceeded the threshold

read_buckets_for_automation

read_buckets_for_automation(db: PrefectDBInterface, session: AsyncSession, trigger: Trigger, batch_size: int = AUTOMATION_BUCKET_BATCH_SIZE) -> AsyncGenerator['ORMAutomationBucket', None]
Yields buckets for the given automation and trigger in batches.
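Yielding rows one at a time while fetching them in batches is a common async-generator pattern. The sketch below shows the general shape using a plain list in place of a database query. `toy_read_in_batches` is a hypothetical name, and the offset-based paging is an assumption, not a description of the real query.

```python
import asyncio
from typing import AsyncGenerator, List


async def toy_read_in_batches(
    rows: List[int], batch_size: int = 2
) -> AsyncGenerator[int, None]:
    """Fetch `batch_size` rows at a time, yielding them individually."""
    offset = 0
    while True:
        # Each slice stands in for one database round trip.
        batch = rows[offset : offset + batch_size]
        if not batch:
            break
        for row in batch:
            yield row
        offset += batch_size


async def collect() -> List[int]:
    return [row async for row in toy_read_in_batches([1, 2, 3, 4, 5])]


result = asyncio.run(collect())
```

The caller iterates with `async for` and never sees the batch boundaries, while memory use stays bounded by the batch size.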

read_bucket

read_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: Trigger, bucketing_key: Tuple[str, ...]) -> Optional['ORMAutomationBucket']
Gets the bucket this event would fall into for the given Automation, if there is one currently

read_bucket_by_trigger_id

read_bucket_by_trigger_id(db: PrefectDBInterface, session: AsyncSession, automation_id: UUID, trigger_id: UUID, bucketing_key: Tuple[str, ...]) -> 'ORMAutomationBucket | None'
Gets the bucket this event would fall into for the given Automation, if there is one currently

increment_bucket

increment_bucket(db: PrefectDBInterface, session: AsyncSession, bucket: 'ORMAutomationBucket', count: int, last_event: Optional[ReceivedEvent]) -> 'ORMAutomationBucket'
Adds the given count to the bucket, returning the new bucket

start_new_bucket

start_new_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger, bucketing_key: Tuple[str, ...], start: prefect.types._datetime.DateTime, end: prefect.types._datetime.DateTime, count: int, triggered_at: Optional[prefect.types._datetime.DateTime] = None) -> 'ORMAutomationBucket'
Ensures that a bucket with the given start and end exists with the given count, returning the new bucket

ensure_bucket

ensure_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger, bucketing_key: Tuple[str, ...], start: prefect.types._datetime.DateTime, end: prefect.types._datetime.DateTime, last_event: Optional[ReceivedEvent], initial_count: int = 0) -> 'ORMAutomationBucket'
Ensures that a bucket has been started for the given automation and key, returning the current bucket. Will not modify the existing bucket.
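The "will not modify the existing bucket" guarantee is the usual get-or-create contract. A minimal sketch with a dictionary standing in for the database follows. `toy_ensure_bucket` and the dict-shaped bucket are hypothetical.

```python
from typing import Any, Dict, Tuple

Key = Tuple[str, ...]


def toy_ensure_bucket(
    buckets: Dict[Key, Dict[str, Any]],
    bucketing_key: Key,
    start: int,
    end: int,
    initial_count: int = 0,
) -> Dict[str, Any]:
    """Return the bucket for the key, creating it only if it is missing."""
    if bucketing_key not in buckets:
        buckets[bucketing_key] = {"start": start, "end": end, "count": initial_count}
    # An existing bucket is returned untouched, whatever arguments were passed.
    return buckets[bucketing_key]


buckets: Dict[Key, Dict[str, Any]] = {}
first = toy_ensure_bucket(buckets, ("flow-run", "abc"), start=0, end=60)
first["count"] += 5
second = toy_ensure_bucket(
    buckets, ("flow-run", "abc"), start=60, end=120, initial_count=9
)
```

The second call returns the bucket created by the first, with its count and window unchanged by the new arguments.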

remove_bucket

remove_bucket(db: PrefectDBInterface, session: AsyncSession, bucket: 'ORMAutomationBucket')
Removes the given bucket from the database

sweep_closed_buckets

sweep_closed_buckets(db: PrefectDBInterface, session: AsyncSession, older_than: prefect.types._datetime.DateTime) -> None

reset

reset() -> None
Resets the in-memory state of the service

causal_ordering

causal_ordering() -> CausalOrdering

listen_for_automation_changes

listen_for_automation_changes() -> None
Listens for any changes to automations via PostgreSQL NOTIFY/LISTEN, and applies those changes to the set of loaded automations.
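Applying a change notification to the loaded set can be sketched as a small dispatch on the event name, using the three event names from the `automation_changed` signature. Everything else here is hypothetical: `toy_automation_changed`, the `database` dictionary that stands in for a fresh read, and the choice to reload on both create and update.

```python
from typing import Dict, Literal
from uuid import UUID, uuid4

ChangeEvent = Literal[
    "automation__created", "automation__updated", "automation__deleted"
]


def toy_automation_changed(
    loaded: Dict[UUID, str],
    database: Dict[UUID, str],
    automation_id: UUID,
    event: ChangeEvent,
) -> None:
    """Keep the in-memory set in step with a change notification."""
    if event == "automation__deleted":
        loaded.pop(automation_id, None)
    elif automation_id in database:
        # Created or updated: re-read the current definition (assumed behavior).
        loaded[automation_id] = database[automation_id]


automation_id = uuid4()
database = {automation_id: "notify-on-failure"}
loaded: Dict[UUID, str] = {}
toy_automation_changed(loaded, database, automation_id, "automation__created")
was_loaded = automation_id in loaded
toy_automation_changed(loaded, database, automation_id, "automation__deleted")
```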

consumer

consumer(periodic_granularity: timedelta = timedelta(seconds=5)) -> AsyncGenerator[MessageHandler, None]
The triggers.consumer processes all Events arriving on the event bus to determine whether they meet the automation criteria, queuing up a corresponding TriggeredAction for the actions service if the criteria are met.
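A return type of `AsyncGenerator[MessageHandler, None]` is typical of a function wrapped in `contextlib.asynccontextmanager`: setup runs before the `yield`, the caller receives a handler, and cleanup runs afterwards. The sketch below assumes that shape. `toy_consumer`, the string messages, and the `state` dictionary are hypothetical, and the real consumer's setup and teardown are not shown in the source.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Awaitable, Callable, Dict, List

Handler = Callable[[str], Awaitable[None]]

processed: List[str] = []
state: Dict[str, bool] = {"running": False}


@asynccontextmanager
async def toy_consumer() -> AsyncGenerator[Handler, None]:
    """Start the service, hand out a message handler, then shut down."""
    state["running"] = True  # stand-in for loading state and starting tasks

    async def handle(message: str) -> None:
        processed.append(message)

    try:
        yield handle
    finally:
        state["running"] = False  # stand-in for cancelling background tasks


async def main() -> None:
    async with toy_consumer() as handler:
        await handler("event-1")
        await handler("event-2")


asyncio.run(main())
```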

proactive_evaluation

proactive_evaluation(trigger: EventTrigger, as_of: prefect.types._datetime.DateTime) -> prefect.types._datetime.DateTime
The core proactive evaluation operation for a single Automation

evaluate_proactive_triggers

evaluate_proactive_triggers() -> None