prefect.server.events.actions

The actions consumer watches for actions that have been triggered by Automations and carries them out. This module also includes the various concrete subtypes of Action.

Functions

record_action_happening

record_action_happening(id: UUID) -> None
Record that an action has happened, with an expiration of an hour.

action_has_already_happened

action_has_already_happened(id: UUID) -> bool
Check if the action has already happened
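Taken together, the two functions above read like an idempotency guard: one marks an action ID as handled, the other checks for it before acting again. The sketch below illustrates that pattern with an in-memory dictionary and an injectable clock. The `SeenActions` class, its method names, and the storage choice are assumptions made for illustration, not the module's actual implementation.

```python
import time
from typing import Callable, Dict
from uuid import UUID, uuid4

ONE_HOUR = 3600.0  # seconds; mirrors the one-hour expiration described above


class SeenActions:
    """Hypothetical in-memory record of action IDs with a time-to-live."""

    def __init__(
        self, ttl: float = ONE_HOUR, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._ttl = ttl
        self._clock = clock
        self._expires_at: Dict[UUID, float] = {}

    def record(self, id: UUID) -> None:
        # Remember the ID until now + ttl.
        self._expires_at[id] = self._clock() + self._ttl

    def has_happened(self, id: UUID) -> bool:
        expires_at = self._expires_at.get(id)
        if expires_at is None:
            return False
        if self._clock() >= expires_at:
            # The record has expired, so forget it.
            del self._expires_at[id]
            return False
        return True


# A fake clock lets the expiry be observed without waiting an hour.
now = [0.0]
seen = SeenActions(clock=lambda: now[0])
action_id = uuid4()

first_check = seen.has_happened(action_id)   # not recorded yet
seen.record(action_id)
second_check = seen.has_happened(action_id)  # recorded and still fresh
now[0] += ONE_HOUR + 1
third_check = seen.has_happened(action_id)   # past the expiration
```

A shared store would be needed if several consumers run at once; the dictionary here only keeps the example self-contained.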

consumer

consumer() -> AsyncGenerator[MessageHandler, None]

Classes

ActionFailed

Action

An Action that may be performed when an Automation is triggered

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None
Perform the requested Action

fail

fail(self, triggered_action: 'TriggeredAction', reason: str) -> None

logging_context

logging_context(self, triggered_action: 'TriggeredAction') -> Dict[str, Any]
Common logging context for all actions

model_validate_list

model_validate_list(cls, obj: Any) -> list[Self]

reset_fields

reset_fields(self: Self) -> Self
Reset the fields of the model that are in the _reset_fields set.

Returns:
  • A new instance of the model with the reset fields.
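As an illustration of the copy-and-reset behavior described above, the sketch below uses a standard-library dataclass rather than the module's model base class. The `Example` class, its field names, and the choice to reset fields to their declared defaults are assumptions; the description states only that the fields named in `_reset_fields` are reset and that a new instance is returned.

```python
from dataclasses import MISSING, dataclass, fields, replace
from typing import ClassVar, FrozenSet, Optional


@dataclass(frozen=True)
class Example:
    """Hypothetical model; only `name` survives a reset."""

    name: str
    id: Optional[int] = None
    created: Optional[str] = None

    # Names of the fields to reset (assumed to fall back to their defaults).
    _reset_fields: ClassVar[FrozenSet[str]] = frozenset({"id", "created"})

    def reset_fields(self) -> "Example":
        defaults = {
            f.name: f.default
            for f in fields(self)
            if f.name in self._reset_fields and f.default is not MISSING
        }
        # replace() builds a new instance; the original is left untouched.
        return replace(self, **defaults)


original = Example(name="nightly", id=7, created="2024-01-01")
copy = original.reset_fields()
```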

succeed

succeed(self, triggered_action: 'TriggeredAction') -> None

DoNothing

Do nothing when an Automation is triggered

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

EmitEventAction

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

create_event

create_event(self, triggered_action: 'TriggeredAction') -> 'Event'
Create an event from the TriggeredAction

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

ExternalDataAction

Base class for Actions that require data from an external source such as the Orchestration API

Methods:

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

JinjaTemplateAction

Base class for Actions that use Jinja templates supplied by the user. The templates are rendered with a context containing data from the triggered action and the orchestration API.

Methods:

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

instantiate_object

instantiate_object(self, model: Type[PrefectBaseModel], data: Dict[str, Any], triggered_action: 'TriggeredAction', resource: Optional['Resource'] = None) -> PrefectBaseModel

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

templates_in_dictionary

templates_in_dictionary(cls, dict_: dict[Any, Any | dict[Any, Any]]) -> list[tuple[dict[Any, Any], dict[Any, str]]]

validate_template

validate_template(cls, template: str, field_name: str) -> str
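The return type of `templates_in_dictionary` suggests a walk over a nested dictionary that pairs each dictionary with the template strings found directly inside it. The sketch below shows one way such a walk could look. The function names `looks_like_template` and `find_templates`, and the delimiter-based heuristic for spotting a template, are assumptions for illustration and are not taken from the module.

```python
from typing import Any, Dict, List, Tuple


def looks_like_template(value: str) -> bool:
    # Assumed heuristic: Jinja expressions and statements use these delimiters.
    return "{{" in value or "{%" in value


def find_templates(
    dict_: Dict[Any, Any],
) -> List[Tuple[Dict[Any, Any], Dict[Any, str]]]:
    """Pair each (possibly nested) dict with the template strings it holds."""
    templates_here = {
        key: value
        for key, value in dict_.items()
        if isinstance(value, str) and looks_like_template(value)
    }
    found: List[Tuple[Dict[Any, Any], Dict[Any, str]]] = []
    if templates_here:
        found.append((dict_, templates_here))
    # Recurse into nested dictionaries after handling the current layer.
    for value in dict_.values():
        if isinstance(value, dict):
            found.extend(find_templates(value))
    return found


params = {
    "name": "{{ event.id }}",
    "count": 3,
    "nested": {"msg": "hello {{ x }}", "plain": "no template here"},
}
found = find_templates(params)
```

Returning the containing dictionary alongside its templates would let a caller write rendered values back in place, which is one plausible reason for the tuple shape.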

DeploymentAction

Base class for Actions that operate on Deployments and need to infer them from events

Methods:

deployment_id_to_use

deployment_id_to_use(self, triggered_action: 'TriggeredAction') -> UUID

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

selected_deployment_requires_id

selected_deployment_requires_id(self) -> Self
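The method names above hint at two ways of choosing a deployment: an explicitly selected ID, or one inferred from the triggering event. The sketch below illustrates that choice with plain values. The `pick_deployment_id` function, the `"selected"` and `"inferred"` labels, and the `prefect.deployment.` resource ID prefix are all assumptions made for this example and should be checked against the module before relying on them.

```python
from typing import List, Optional
from uuid import UUID

DEPLOYMENT_PREFIX = "prefect.deployment."  # assumed resource ID prefix


def pick_deployment_id(
    source: str, selected_id: Optional[UUID], resource_ids: List[str]
) -> UUID:
    """Hypothetical helper: use the selected ID or infer one from resource IDs."""
    if source == "selected":
        if selected_id is None:
            # Mirrors the idea that a selected deployment requires an ID.
            raise ValueError("deployment_id is required when source is 'selected'")
        return selected_id

    # Inferred: take the first resource that looks like a deployment.
    for resource_id in resource_ids:
        if resource_id.startswith(DEPLOYMENT_PREFIX):
            return UUID(resource_id[len(DEPLOYMENT_PREFIX):])
    raise ValueError("No deployment could be inferred from the event")


known = UUID("11111111-1111-1111-1111-111111111111")
from_selection = pick_deployment_id("selected", known, [])
from_event = pick_deployment_id(
    "inferred", None, ["prefect.flow-run.abc", f"prefect.deployment.{known}"]
)
```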

DeploymentCommandAction

Executes a command against a matching deployment

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', deployment_id: UUID, triggered_action: 'TriggeredAction') -> Response
Execute the deployment command

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

selected_deployment_requires_id

selected_deployment_requires_id(self)
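The split between `act` and `command` reads like a template-method arrangement: the base class owns the shared steps and each subclass supplies only the command. The sketch below shows that shape with stand-in types. `FakeResponse`, `CommandFailed`, `CommandAction`, `Pause`, `AlwaysRejected`, and the synchronous calls are all hypothetical simplifications and do not reproduce the module's real clients or signatures.

```python
from dataclasses import dataclass
from typing import List
from uuid import UUID, uuid4


@dataclass
class FakeResponse:
    """Hypothetical stand-in for an HTTP response."""

    status_code: int


class CommandFailed(Exception):
    """Hypothetical error raised when a command is rejected."""


class CommandAction:
    """Hypothetical base class: act() is shared, command() is overridden."""

    def __init__(self) -> None:
        self.succeeded: List[UUID] = []

    def command(self, deployment_id: UUID) -> FakeResponse:
        raise NotImplementedError

    def act(self, deployment_id: UUID) -> None:
        # Shared flow: run the subclass's command, then check the outcome.
        response = self.command(deployment_id)
        if response.status_code >= 300:
            raise CommandFailed(f"Unexpected status {response.status_code}")
        self.succeeded.append(deployment_id)


class Pause(CommandAction):
    def command(self, deployment_id: UUID) -> FakeResponse:
        # A real subclass would call an API here; this one pretends it worked.
        return FakeResponse(status_code=204)


class AlwaysRejected(CommandAction):
    def command(self, deployment_id: UUID) -> FakeResponse:
        return FakeResponse(status_code=404)


deployment_id = uuid4()
pause = Pause()
pause.act(deployment_id)

rejected = AlwaysRejected()
try:
    rejected.act(deployment_id)
    failure = None
except CommandFailed as exc:
    failure = str(exc)
```

With this shape, adding a new command means writing one small subclass, which matches how the pause and resume classes in this listing each expose only `act` and `command`.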

RunDeployment

Runs the given deployment with the given parameters

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', deployment_id: UUID, triggered_action: 'TriggeredAction') -> Response
Execute the deployment command

instantiate_object

instantiate_object(self, model: Type[PrefectBaseModel], data: Dict[str, Any], triggered_action: 'TriggeredAction', resource: Optional['Resource'] = None) -> PrefectBaseModel

render_parameters

render_parameters(self, triggered_action: 'TriggeredAction') -> Dict[str, Any]

templates_in_dictionary

templates_in_dictionary(cls, dict_: dict[Any, Any | dict[Any, Any]]) -> list[tuple[dict[Any, Any], dict[Any, str]]]

validate_parameters

validate_parameters(cls, value: dict[str, Any] | None) -> dict[str, Any] | None

validate_template

validate_template(cls, template: str, field_name: str) -> str

PauseDeployment

Pauses the given Deployment

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', deployment_id: UUID, triggered_action: 'TriggeredAction') -> Response
Execute the deployment command

ResumeDeployment

Resumes the given Deployment

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', deployment_id: UUID, triggered_action: 'TriggeredAction') -> Response
Execute the deployment command

FlowRunAction

An action that operates on a flow run

Methods:

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

flow_run

flow_run(self, triggered_action: 'TriggeredAction') -> UUID

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

FlowRunStateChangeAction

Changes the state of a flow run associated with the trigger

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

flow_run

flow_run(self, triggered_action: 'TriggeredAction') -> UUID

new_state

new_state(self, triggered_action: 'TriggeredAction') -> StateCreate
Return the new state for the flow run

ChangeFlowRunState

Changes the state of a flow run associated with the trigger

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

new_state

new_state(self, triggered_action: 'TriggeredAction') -> StateCreate
Return the new state for the flow run

CancelFlowRun

Cancels a flow run associated with the trigger

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

new_state

new_state(self, triggered_action: 'TriggeredAction') -> StateCreate
Return the new state for the flow run

SuspendFlowRun

Suspends a flow run associated with the trigger

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

new_state

new_state(self, triggered_action: 'TriggeredAction') -> StateCreate
Return the new state for the flow run

ResumeFlowRun

Resumes a paused or suspended flow run associated with the trigger

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

flow_run

flow_run(self, triggered_action: 'TriggeredAction') -> UUID

CallWebhook

Call a webhook when an Automation is triggered.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

ensure_payload_is_a_string

ensure_payload_is_a_string(cls, value: Union[str, Dict[str, Any], None]) -> Optional[str]
Temporary measure while we migrate payloads from being a dictionary to a string template. This covers both reading from the database where values may currently be a dictionary, as well as the API, where older versions of the frontend may be sending a JSON object with the single "message" key.
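The migration described above amounts to accepting either shape and normalizing to a string. The sketch below shows one way to do that with the standard library. The function name `payload_as_string` and the use of `json.dumps` with two-space indentation are assumptions for illustration; the module's validator may serialize differently.

```python
import json
from typing import Any, Dict, Optional, Union


def payload_as_string(value: Union[str, Dict[str, Any], None]) -> Optional[str]:
    """Hypothetical normalizer: pass strings and None through, serialize dicts."""
    if value is None or isinstance(value, str):
        return value
    # Older payloads arrive as dictionaries; turn them into a JSON string.
    return json.dumps(value, indent=2)


missing = payload_as_string(None)
already_text = payload_as_string("Flow run {{ flow_run.name }} failed")
legacy = payload_as_string({"message": "Flow run failed"})
```

Serializing the legacy dictionary keeps its content intact, so parsing the resulting string as JSON yields the original object again.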

instantiate_object

instantiate_object(self, model: Type[PrefectBaseModel], data: Dict[str, Any], triggered_action: 'TriggeredAction', resource: Optional['Resource'] = None) -> PrefectBaseModel

templates_in_dictionary

templates_in_dictionary(cls, dict_: dict[Any, Any | dict[Any, Any]]) -> list[tuple[dict[Any, Any], dict[Any, str]]]

validate_payload_templates

validate_payload_templates(cls, value: Optional[str]) -> Optional[str]
Validate user-provided payload template.

validate_template

validate_template(cls, template: str, field_name: str) -> str

SendNotification

Send a notification when an Automation is triggered

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

instantiate_object

instantiate_object(self, model: Type[PrefectBaseModel], data: Dict[str, Any], triggered_action: 'TriggeredAction', resource: Optional['Resource'] = None) -> PrefectBaseModel

is_valid_template

is_valid_template(cls, value: str, info: ValidationInfo) -> str

render

render(self, triggered_action: 'TriggeredAction') -> List[str]

templates_in_dictionary

templates_in_dictionary(cls, dict_: dict[Any, Any | dict[Any, Any]]) -> list[tuple[dict[Any, Any], dict[Any, str]]]

validate_template

validate_template(cls, template: str, field_name: str) -> str

WorkPoolAction

Base class for Actions that operate on Work Pools and need to infer them from events

Methods:

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

selected_work_pool_requires_id

selected_work_pool_requires_id(self) -> Self

work_pool_id_to_use

work_pool_id_to_use(self, triggered_action: 'TriggeredAction') -> UUID

WorkPoolCommandAction

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_pool: WorkPool, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Pool

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

target_work_pool

target_work_pool(self, triggered_action: 'TriggeredAction') -> WorkPool

PauseWorkPool

Pauses a Work Pool

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_pool: WorkPool, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Pool

target_work_pool

target_work_pool(self, triggered_action: 'TriggeredAction') -> WorkPool

ResumeWorkPool

Resumes a Work Pool

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_pool: WorkPool, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Pool

target_work_pool

target_work_pool(self, triggered_action: 'TriggeredAction') -> WorkPool

WorkQueueAction

Base class for Actions that operate on Work Queues and need to infer them from events

Methods:

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

selected_work_queue_requires_id

selected_work_queue_requires_id(self) -> Self

work_queue_id_to_use

work_queue_id_to_use(self, triggered_action: 'TriggeredAction') -> UUID

WorkQueueCommandAction

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_queue_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Queue

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

selected_work_queue_requires_id

selected_work_queue_requires_id(self) -> Self

PauseWorkQueue

Pauses a Work Queue

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_queue_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Queue

ResumeWorkQueue

Resumes a Work Queue

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_queue_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Queue

AutomationAction

Base class for Actions that operate on Automations and need to infer them from events

Methods:

automation_id_to_use

automation_id_to_use(self, triggered_action: 'TriggeredAction') -> UUID

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

selected_automation_requires_id

selected_automation_requires_id(self) -> Self

AutomationCommandAction

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, events: PrefectServerEventsAPIClient, automation_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Automation

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

selected_automation_requires_id

selected_automation_requires_id(self) -> Self

PauseAutomation

Pauses an Automation

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, events: PrefectServerEventsAPIClient, automation_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Automation

ResumeAutomation

Resumes an Automation

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, events: PrefectServerEventsAPIClient, automation_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Automation