Documentation Index Fetch the complete documentation index at: https://docs.prefect.io/llms.txt
Use this file to discover all available pages before exploring further.
prefect.server.events.triggers
The triggers consumer watches events streaming in from the event bus and decides whether
to act on them based on the automations that users have set up.
Functions
evaluate
evaluate(session: AsyncSession, trigger: EventTrigger, bucket: 'ORMAutomationBucket' , now: prefect.types._datetime.DateTime, triggering_event: Optional[ReceivedEvent]) -> 'ORMAutomationBucket | None'
Evaluates an Automation, either triggered by a specific event or proactively
on a time interval. Evaluating a Automation updates the associated counters for
each automation, and will fire the associated action if it has met the threshold.
fire
fire(session: AsyncSession, firing: Firing) -> None
evaluate_composite_trigger
evaluate_composite_trigger(session: AsyncSession, firing: Firing) -> None
act
act(firing: Firing) -> None
Given a Automation that has been triggered, the triggering labels and event
(if there was one), publish an action for the actions service to process.
update_events_clock
update_events_clock(event: ReceivedEvent) -> None
get_events_clock
get_events_clock() -> Optional[ float ]
get_events_clock_offset
get_events_clock_offset() -> float
Calculate the current clock offset. This takes into account both the occurred
of the last event, as well as the time we saw the last event. This helps to
ensure that in low volume environments, we don’t end up getting huge offsets.
reset_events_clock
reset_events_clock() -> None
reactive_evaluation
reactive_evaluation(event: ReceivedEvent, depth: int = 0 ) -> None
Evaluate all automations that may apply to this event.
event (ReceivedEvent): The event to evaluate. This object contains all the necessary information
about the event, including its type, associated resources, and metadata.
depth (int, optional): The current recursion depth. This is used to prevent infinite recursion
due to cyclic event dependencies. Defaults to 0 and is incremented with
each recursive call.
get_lost_followers
get_lost_followers() -> List[ReceivedEvent]
Get followers that have been sitting around longer than our lookback
periodic_evaluation
periodic_evaluation(now: prefect.types._datetime.DateTime) -> None
Periodic tasks that should be run regularly, but not as often as every event
evaluate_periodically
evaluate_periodically(periodic_granularity: timedelta) -> None
Runs periodic evaluation on the given interval
find_interested_triggers
find_interested_triggers(event: ReceivedEvent) -> Collection[EventTrigger]
clear_loaded_automations
clear_loaded_automations() -> None
load_automation
load_automation(automation: Optional[Automation]) -> None
Loads the given automation into memory so that it is available for evaluations
forget_automation
forget_automation(automation_id: UUID ) -> None
Unloads the given automation from memory
automation_changed
automation_changed(automation_id: UUID , event: Literal[ 'automation__created' , 'automation__updated' , 'automation__deleted' ]) -> None
load_automations
load_automations(db: PrefectDBInterface, session: AsyncSession)
Loads all automations for the given set of accounts
read_automation_state_snapshot
read_automation_state_snapshot(db: PrefectDBInterface, session: AsyncSession) -> AutomationStateSnapshot
reconcile_automations
reconcile_automations(force: bool = False ) -> bool
remove_buckets_exceeding_threshold
remove_buckets_exceeding_threshold(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger)
Deletes bucket where the count has already exceeded the threshold
read_buckets_for_automation
read_buckets_for_automation(db: PrefectDBInterface, session: AsyncSession, trigger: Trigger, batch_size: int = AUTOMATION_BUCKET_BATCH_SIZE ) -> AsyncGenerator[ 'ORMAutomationBucket' , None ]
Yields buckets for the given automation and trigger in batches.
read_bucket
read_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: Trigger, bucketing_key: Tuple[ str , ... ]) -> Optional[ 'ORMAutomationBucket' ]
Gets the bucket this event would fall into for the given Automation, if there is
one currently
read_bucket_by_trigger_id
read_bucket_by_trigger_id(db: PrefectDBInterface, session: AsyncSession, automation_id: UUID , trigger_id: UUID , bucketing_key: Tuple[ str , ... ]) -> 'ORMAutomationBucket | None'
Gets the bucket this event would fall into for the given Automation, if there is
one currently
increment_bucket
increment_bucket(db: PrefectDBInterface, session: AsyncSession, bucket: 'ORMAutomationBucket' , count: int , last_event: Optional[ReceivedEvent]) -> 'ORMAutomationBucket'
Adds the given count to the bucket, returning the new bucket
start_new_bucket
start_new_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger, bucketing_key: Tuple[ str , ... ], start: prefect.types._datetime.DateTime, end: prefect.types._datetime.DateTime, count: int , triggered_at: Optional[prefect.types._datetime.DateTime] = None , last_event: Optional[ReceivedEvent] = None ) -> 'ORMAutomationBucket'
Ensures that a bucket with the given start and end exists with the given count,
returning the new bucket
ensure_bucket
ensure_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger, bucketing_key: Tuple[ str , ... ], start: prefect.types._datetime.DateTime, end: prefect.types._datetime.DateTime, last_event: Optional[ReceivedEvent], initial_count: int = 0 ) -> 'ORMAutomationBucket'
Ensures that a bucket has been started for the given automation and key,
returning the current bucket. Will not modify the existing bucket.
remove_bucket
remove_bucket(db: PrefectDBInterface, session: AsyncSession, bucket: 'ORMAutomationBucket' )
Removes the given bucket from the database
sweep_closed_buckets
sweep_closed_buckets(db: PrefectDBInterface, session: AsyncSession, older_than: prefect.types._datetime.DateTime) -> None
reset
Resets the in-memory state of the service
listen_for_automation_changes
listen_for_automation_changes() -> None
Listens for any changes to automations via PostgreSQL NOTIFY/LISTEN,
and applies those changes to the set of loaded automations.
consumer
consumer(periodic_granularity: timedelta = timedelta( seconds = 5 )) -> AsyncGenerator[MessageHandler, None ]
The triggers.consumer processes all Events arriving on the event bus to
determine if they meet the automation criteria, queuing up a corresponding
TriggeredAction for the actions service if the automation criteria is met.
proactive_evaluation
proactive_evaluation(trigger: EventTrigger, as_of: prefect.types._datetime.DateTime) -> prefect.types._datetime.DateTime
The core proactive evaluation operation for a single Automation
evaluate_proactive_triggers
evaluate_proactive_triggers() -> None