Skip to content

prefect.automations

Automation

Bases: AutomationCore

Source code in prefect/automations.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class Automation(AutomationCore):
    id: Optional[UUID] = Field(default=None, description="The ID of this automation")

    @sync_compatible
    async def create(self: Self) -> Self:
        """
        Create a new automation.

        auto_to_create = Automation(
            name="woodchonk",
            trigger=EventTrigger(
                expect={"animal.walked"},
                match={
                    "genus": "Marmota",
                    "species": "monax",
                },
                posture="Reactive",
                threshold=3,
                within=timedelta(seconds=10),
            ),
            actions=[CancelFlowRun()]
        )
        created_automation = auto_to_create.create()
        """
        client, _ = get_or_create_client()
        automation = AutomationCore(**self.dict(exclude={"id"}))
        self.id = await client.create_automation(automation=automation)
        return self

    @sync_compatible
    async def update(self: Self):
        """
        Updates an existing automation.
        auto = Automation.read(id=123)
        auto.name = "new name"
        auto.update()
        """

        client, _ = get_or_create_client()
        automation = AutomationCore(**self.dict(exclude={"id", "owner_resource"}))
        await client.update_automation(automation_id=self.id, automation=automation)

    @classmethod
    @sync_compatible
    async def read(
        cls: Self, id: Optional[UUID] = None, name: Optional[str] = None
    ) -> Self:
        """
        Read an automation by ID or name.
        automation = Automation.read(name="woodchonk")

        or

        automation = Automation.read(id=UUID("b3514963-02b1-47a5-93d1-6eeb131041cb"))
        """
        if id and name:
            raise ValueError("Only one of id or name can be provided")
        if not id and not name:
            raise ValueError("One of id or name must be provided")
        client, _ = get_or_create_client()
        if id:
            try:
                automation = await client.read_automation(automation_id=id)
            except PrefectHTTPStatusError as exc:
                if exc.response.status_code == 404:
                    raise ValueError(f"Automation with ID {id!r} not found")
            return Automation(**automation.dict())
        else:
            automation = await client.read_automations_by_name(name=name)
            if len(automation) > 0:
                return Automation(**automation[0].dict()) if automation else None
            else:
                raise ValueError(f"Automation with name {name!r} not found")

    @sync_compatible
    async def delete(self: Self) -> bool:
        """
        auto = Automation.read(id = 123)
        auto.delete()
        """
        try:
            client, _ = get_or_create_client()
            await client.delete_automation(self.id)
            return True
        except PrefectHTTPStatusError as exc:
            if exc.response.status_code == 404:
                return False
            raise

    @sync_compatible
    async def disable(self: Self) -> bool:
        """
        Disable an automation.
        auto = Automation.read(id = 123)
        auto.disable()
        """
        try:
            client, _ = get_or_create_client()
            await client.pause_automation(self.id)
            return True
        except PrefectHTTPStatusError as exc:
            if exc.response.status_code == 404:
                return False
            raise

    @sync_compatible
    async def enable(self: Self) -> bool:
        """
        Enable an automation.
        auto = Automation.read(id = 123)
        auto.enable()
        """
        try:
            client, _ = get_or_create_client()
            await client.resume_automation("asd")
            return True
        except PrefectHTTPStatusError as exc:
            if exc.response.status_code == 404:
                return False
            raise

create async

Create a new automation.

auto_to_create = Automation( name="woodchonk", trigger=EventTrigger( expect={"animal.walked"}, match={ "genus": "Marmota", "species": "monax", }, posture="Reactive", threshold=3, within=timedelta(seconds=10), ), actions=[CancelFlowRun()] ) created_automation = auto_to_create.create()

Source code in prefect/automations.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@sync_compatible
async def create(self: Self) -> Self:
    """
    Create a new automation.

    auto_to_create = Automation(
        name="woodchonk",
        trigger=EventTrigger(
            expect={"animal.walked"},
            match={
                "genus": "Marmota",
                "species": "monax",
            },
            posture="Reactive",
            threshold=3,
            within=timedelta(seconds=10),
        ),
        actions=[CancelFlowRun()]
    )
    created_automation = auto_to_create.create()
    """
    client, _ = get_or_create_client()
    automation = AutomationCore(**self.dict(exclude={"id"}))
    self.id = await client.create_automation(automation=automation)
    return self

delete async

auto = Automation.read(id = 123) auto.delete()

Source code in prefect/automations.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@sync_compatible
async def delete(self: Self) -> bool:
    """
    auto = Automation.read(id = 123)
    auto.delete()
    """
    try:
        client, _ = get_or_create_client()
        await client.delete_automation(self.id)
        return True
    except PrefectHTTPStatusError as exc:
        if exc.response.status_code == 404:
            return False
        raise

disable async

Disable an automation. auto = Automation.read(id = 123) auto.disable()

Source code in prefect/automations.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@sync_compatible
async def disable(self: Self) -> bool:
    """
    Disable an automation.
    auto = Automation.read(id = 123)
    auto.disable()
    """
    try:
        client, _ = get_or_create_client()
        await client.pause_automation(self.id)
        return True
    except PrefectHTTPStatusError as exc:
        if exc.response.status_code == 404:
            return False
        raise

enable async

Enable an automation. auto = Automation.read(id = 123) auto.enable()

Source code in prefect/automations.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@sync_compatible
async def enable(self: Self) -> bool:
    """
    Enable an automation.
    auto = Automation.read(id = 123)
    auto.enable()
    """
    try:
        client, _ = get_or_create_client()
        await client.resume_automation("asd")
        return True
    except PrefectHTTPStatusError as exc:
        if exc.response.status_code == 404:
            return False
        raise

read async classmethod

Read an automation by ID or name. automation = Automation.read(name="woodchonk")

or

automation = Automation.read(id=UUID("b3514963-02b1-47a5-93d1-6eeb131041cb"))

Source code in prefect/automations.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@classmethod
@sync_compatible
async def read(
    cls: Self, id: Optional[UUID] = None, name: Optional[str] = None
) -> Self:
    """
    Read an automation by ID or name.
    automation = Automation.read(name="woodchonk")

    or

    automation = Automation.read(id=UUID("b3514963-02b1-47a5-93d1-6eeb131041cb"))
    """
    if id and name:
        raise ValueError("Only one of id or name can be provided")
    if not id and not name:
        raise ValueError("One of id or name must be provided")
    client, _ = get_or_create_client()
    if id:
        try:
            automation = await client.read_automation(automation_id=id)
        except PrefectHTTPStatusError as exc:
            if exc.response.status_code == 404:
                raise ValueError(f"Automation with ID {id!r} not found")
        return Automation(**automation.dict())
    else:
        automation = await client.read_automations_by_name(name=name)
        if len(automation) > 0:
            return Automation(**automation[0].dict()) if automation else None
        else:
            raise ValueError(f"Automation with name {name!r} not found")

update async

Updates an existing automation. auto = Automation.read(id=123) auto.name = "new name" auto.update()

Source code in prefect/automations.py
72
73
74
75
76
77
78
79
80
81
82
83
@sync_compatible
async def update(self: Self):
    """
    Updates an existing automation.
    auto = Automation.read(id=123)
    auto.name = "new name"
    auto.update()
    """

    client, _ = get_or_create_client()
    automation = AutomationCore(**self.dict(exclude={"id", "owner_resource"}))
    await client.update_automation(automation_id=self.id, automation=automation)

AutomationCore

Bases: PrefectBaseModel

Defines an action a user wants to take when a certain number of events do or don't happen to the matching resources

Source code in prefect/events/schemas/automations.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
class AutomationCore(PrefectBaseModel, extra="ignore"):  # type: ignore[call-arg]
    """Defines an action a user wants to take when a certain number of events
    do or don't happen to the matching resources"""

    name: str = Field(..., description="The name of this automation")
    description: str = Field("", description="A longer description of this automation")

    enabled: bool = Field(True, description="Whether this automation will be evaluated")

    trigger: TriggerTypes = Field(
        ...,
        description=(
            "The criteria for which events this Automation covers and how it will "
            "respond to the presence or absence of those events"
        ),
    )

    actions: List[ActionTypes] = Field(
        ...,
        description="The actions to perform when this Automation triggers",
    )

    actions_on_trigger: List[ActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a triggered state",
    )

    actions_on_resolve: List[ActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a resolving state",
    )

    owner_resource: Optional[str] = Field(
        default=None, description="The owning resource of this automation"
    )

CompositeTrigger

Bases: Trigger, ABC

Requires some number of triggers to have fired within the given time period.

Source code in prefect/events/schemas/automations.py
312
313
314
315
316
317
318
319
class CompositeTrigger(Trigger, abc.ABC):
    """
    Requires some number of triggers to have fired within the given time period.
    """

    type: Literal["compound", "sequence"]
    triggers: List["TriggerTypes"]
    within: Optional[timedelta]

CompoundTrigger

Bases: CompositeTrigger

A composite trigger that requires some number of triggers to have fired within the given time period

Source code in prefect/events/schemas/automations.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
class CompoundTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have
    fired within the given time period"""

    type: Literal["compound"] = "compound"
    require: Union[int, Literal["any", "all"]]

    @root_validator
    def validate_require(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        require = values.get("require")

        if isinstance(require, int):
            if require < 1:
                raise ValueError("required must be at least 1")
            if require > len(values["triggers"]):
                raise ValueError(
                    "required must be less than or equal to the number of triggers"
                )

        return values

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        return textwrap.indent(
            "\n".join(
                [
                    f"{str(self.require).capitalize()} of:",
                    "\n".join(
                        [
                            trigger.describe_for_cli(indent=indent + 1)
                            for trigger in self.triggers
                        ]
                    ),
                ]
            ),
            prefix="  " * indent,
        )

describe_for_cli

Return a human-readable description of this trigger for the CLI

Source code in prefect/events/schemas/automations.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    return textwrap.indent(
        "\n".join(
            [
                f"{str(self.require).capitalize()} of:",
                "\n".join(
                    [
                        trigger.describe_for_cli(indent=indent + 1)
                        for trigger in self.triggers
                    ]
                ),
            ]
        ),
        prefix="  " * indent,
    )

EventTrigger

Bases: ResourceTrigger

A trigger that fires based on the presence or absence of events within a given period of time.

Source code in prefect/events/schemas/automations.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
class EventTrigger(ResourceTrigger):
    """
    A trigger that fires based on the presence or absence of events within a given
    period of time.
    """

    type: Literal["event"] = "event"

    after: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) which must first been seen to fire this trigger.  If "
            "empty, then fire this trigger immediately.  Events may include "
            "trailing wildcards, like `prefect.flow-run.*`"
        ),
    )
    expect: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) this trigger is expecting to see.  If empty, this "
            "trigger will match any event.  Events may include trailing wildcards, "
            "like `prefect.flow-run.*`"
        ),
    )

    for_each: Set[str] = Field(
        default_factory=set,
        description=(
            "Evaluate the trigger separately for each distinct value of these labels "
            "on the resource.  By default, labels refer to the primary resource of the "
            "triggering event.  You may also refer to labels from related "
            "resources by specifying `related:<role>:<label>`.  This will use the "
            "value of that label for the first related resource in that role.  For "
            'example, `"for_each": ["related:flow:prefect.resource.id"]` would '
            "evaluate the trigger for each flow."
        ),
    )
    posture: Literal[Posture.Reactive, Posture.Proactive] = Field(  # type: ignore[valid-type]
        Posture.Reactive,
        description=(
            "The posture of this trigger, either Reactive or Proactive.  Reactive "
            "triggers respond to the _presence_ of the expected events, while "
            "Proactive triggers respond to the _absence_ of those expected events."
        ),
    )
    threshold: int = Field(
        1,
        description=(
            "The number of events required for this trigger to fire (for "
            "Reactive triggers), or the number of events expected (for Proactive "
            "triggers)"
        ),
    )
    within: timedelta = Field(
        timedelta(0),
        minimum=0.0,
        exclusiveMinimum=False,
        description=(
            "The time period over which the events must occur.  For Reactive triggers, "
            "this may be as low as 0 seconds, but must be at least 10 seconds for "
            "Proactive triggers"
        ),
    )

    @validator("within")
    def enforce_minimum_within(
        cls, value: timedelta, values, config, field: ModelField
    ):
        return validate_trigger_within(value, field)

    @root_validator(skip_on_failure=True)
    def enforce_minimum_within_for_proactive_triggers(cls, values: Dict[str, Any]):
        posture: Optional[Posture] = values.get("posture")
        within: Optional[timedelta] = values.get("within")

        if posture == Posture.Proactive:
            if not within or within == timedelta(0):
                values["within"] = timedelta(seconds=10.0)
            elif within < timedelta(seconds=10.0):
                raise ValueError(
                    "The minimum within for Proactive triggers is 10 seconds"
                )

        return values

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        if self.posture == Posture.Reactive:
            return textwrap.indent(
                "\n".join(
                    [
                        f"Reactive: expecting {self.threshold} of {self.expect}",
                    ],
                ),
                prefix="  " * indent,
            )
        else:
            return textwrap.indent(
                "\n".join(
                    [
                        f"Proactive: expecting {self.threshold} {self.expect} event "
                        f"within {self.within}",
                    ],
                ),
                prefix="  " * indent,
            )

describe_for_cli

Return a human-readable description of this trigger for the CLI

Source code in prefect/events/schemas/automations.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    if self.posture == Posture.Reactive:
        return textwrap.indent(
            "\n".join(
                [
                    f"Reactive: expecting {self.threshold} of {self.expect}",
                ],
            ),
            prefix="  " * indent,
        )
    else:
        return textwrap.indent(
            "\n".join(
                [
                    f"Proactive: expecting {self.threshold} {self.expect} event "
                    f"within {self.within}",
                ],
            ),
            prefix="  " * indent,
        )

MetricTrigger

Bases: ResourceTrigger

A trigger that fires based on the results of a metric query.

Source code in prefect/events/schemas/automations.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
class MetricTrigger(ResourceTrigger):
    """
    A trigger that fires based on the results of a metric query.
    """

    type: Literal["metric"] = "metric"

    posture: Literal[Posture.Metric] = Field(  # type: ignore[valid-type]
        Posture.Metric,
        description="Periodically evaluate the configured metric query.",
    )

    metric: MetricTriggerQuery = Field(
        ...,
        description="The metric query to evaluate for this trigger. ",
    )

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        m = self.metric
        return textwrap.indent(
            "\n".join(
                [
                    f"Metric: {m.name.value} {m.operator.value} {m.threshold} for {m.range}",
                ]
            ),
            prefix="  " * indent,
        )

describe_for_cli

Return a human-readable description of this trigger for the CLI

Source code in prefect/events/schemas/automations.py
299
300
301
302
303
304
305
306
307
308
309
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    m = self.metric
    return textwrap.indent(
        "\n".join(
            [
                f"Metric: {m.name.value} {m.operator.value} {m.threshold} for {m.range}",
            ]
        ),
        prefix="  " * indent,
    )

MetricTriggerQuery

Bases: PrefectBaseModel

Defines a subset of the Trigger subclass, which is specific to Metric automations, that specify the query configurations and breaching conditions for the Automation

Source code in prefect/events/schemas/automations.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
class MetricTriggerQuery(PrefectBaseModel):
    """Defines a subset of the Trigger subclass, which is specific
    to Metric automations, that specify the query configurations
    and breaching conditions for the Automation"""

    name: PrefectMetric = Field(
        ...,
        description="The name of the metric to query.",
    )
    threshold: float = Field(
        ...,
        description=(
            "The threshold value against which we'll compare " "the query result."
        ),
    )
    operator: MetricTriggerOperator = Field(
        ...,
        description=(
            "The comparative operator (LT / LTE / GT / GTE) used to compare "
            "the query result against the threshold value."
        ),
    )
    range: timedelta = Field(
        timedelta(seconds=300),  # defaults to 5 minutes
        minimum=300.0,
        exclusiveMinimum=False,
        description=(
            "The lookback duration (seconds) for a metric query. This duration is "
            "used to determine the time range over which the query will be executed. "
            "The minimum value is 300 seconds (5 minutes)."
        ),
    )
    firing_for: timedelta = Field(
        timedelta(seconds=300),  # defaults to 5 minutes
        minimum=300.0,
        exclusiveMinimum=False,
        description=(
            "The duration (seconds) for which the metric query must breach "
            "or resolve continuously before the state is updated and the "
            "automation is triggered. "
            "The minimum value is 300 seconds (5 minutes)."
        ),
    )

ResourceSpecification

Bases: PrefectBaseModel

A specification that may match zero, one, or many resources, used to target or select a set of resources in a query or automation. A resource must match at least one value of all of the provided labels

Source code in prefect/events/schemas/events.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
class ResourceSpecification(PrefectBaseModel):
    """A specification that may match zero, one, or many resources, used to target or
    select a set of resources in a query or automation.  A resource must match at least
    one value of all of the provided labels"""

    __root__: Dict[str, Union[str, List[str]]]

    def matches_every_resource(self) -> bool:
        return len(self) == 0

    def matches_every_resource_of_kind(self, prefix: str) -> bool:
        if self.matches_every_resource():
            return True

        if len(self.__root__) == 1:
            if resource_id := self.__root__.get("prefect.resource.id"):
                values = [resource_id] if isinstance(resource_id, str) else resource_id
                return any(value == f"{prefix}.*" for value in values)

        return False

    def includes(self, candidates: Iterable[Resource]) -> bool:
        if self.matches_every_resource():
            return True

        for candidate in candidates:
            if self.matches(candidate):
                return True

        return False

    def matches(self, resource: Resource) -> bool:
        for label, expected in self.items():
            value = resource.get(label)
            if not any(matches(candidate, value) for candidate in expected):
                return False
        return True

    def items(self) -> Iterable[Tuple[str, List[str]]]:
        return [
            (label, [value] if isinstance(value, str) else value)
            for label, value in self.__root__.items()
        ]

    def __contains__(self, key: str) -> bool:
        return self.__root__.__contains__(key)

    def __getitem__(self, key: str) -> List[str]:
        value = self.__root__[key]
        if not value:
            return []
        if not isinstance(value, list):
            value = [value]
        return value

    def pop(
        self, key: str, default: Optional[Union[str, List[str]]] = None
    ) -> Optional[List[str]]:
        value = self.__root__.pop(key, default)
        if not value:
            return []
        if not isinstance(value, list):
            value = [value]
        return value

    def get(
        self, key: str, default: Optional[Union[str, List[str]]] = None
    ) -> Optional[List[str]]:
        value = self.__root__.get(key, default)
        if not value:
            return []
        if not isinstance(value, list):
            value = [value]
        return value

    def __len__(self) -> int:
        return len(self.__root__)

    def deepcopy(self) -> "ResourceSpecification":
        return ResourceSpecification.parse_obj(copy.deepcopy(self.__root__))

ResourceTrigger

Bases: Trigger, ABC

Base class for triggers that may filter by the labels of resources.

Source code in prefect/events/schemas/automations.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class ResourceTrigger(Trigger, abc.ABC):
    """
    Base class for triggers that may filter by the labels of resources.
    """

    type: str

    match: ResourceSpecification = Field(
        default_factory=lambda: ResourceSpecification.parse_obj({}),
        description="Labels for resources which this trigger will match.",
    )
    match_related: ResourceSpecification = Field(
        default_factory=lambda: ResourceSpecification.parse_obj({}),
        description="Labels for related resources which this trigger will match.",
    )

SequenceTrigger

Bases: CompositeTrigger

A composite trigger that requires some number of triggers to have fired within the given time period in a specific order

Source code in prefect/events/schemas/automations.py
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
class SequenceTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have fired
    within the given time period in a specific order"""

    type: Literal["sequence"] = "sequence"

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        return textwrap.indent(
            "\n".join(
                [
                    "In this order:",
                    "\n".join(
                        [
                            trigger.describe_for_cli(indent=indent + 1)
                            for trigger in self.triggers
                        ]
                    ),
                ]
            ),
            prefix="  " * indent,
        )

describe_for_cli

Return a human-readable description of this trigger for the CLI

Source code in prefect/events/schemas/automations.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    return textwrap.indent(
        "\n".join(
            [
                "In this order:",
                "\n".join(
                    [
                        trigger.describe_for_cli(indent=indent + 1)
                        for trigger in self.triggers
                    ]
                ),
            ]
        ),
        prefix="  " * indent,
    )

Trigger

Bases: PrefectBaseModel, ABC

Base class describing a set of criteria that must be satisfied in order to trigger an automation.

Source code in prefect/events/schemas/automations.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class Trigger(PrefectBaseModel, abc.ABC, extra="ignore"):  # type: ignore[call-arg]
    """
    Base class describing a set of criteria that must be satisfied in order to trigger
    an automation.
    """

    type: str

    @abc.abstractmethod
    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""

    # The following allows the regular Trigger class to be used when serving or
    # deploying flows, analogous to how the Deployment*Trigger classes work

    _deployment_id: Optional[UUID] = PrivateAttr(default=None)

    def set_deployment_id(self, deployment_id: UUID):
        self._deployment_id = deployment_id

    def owner_resource(self) -> Optional[str]:
        return f"prefect.deployment.{self._deployment_id}"

    def actions(self) -> List[ActionTypes]:
        assert self._deployment_id
        return [
            RunDeployment(
                source="selected",
                deployment_id=self._deployment_id,
                parameters=getattr(self, "parameters", None),
                job_variables=getattr(self, "job_variables", None),
            )
        ]

    def as_automation(self) -> "AutomationCore":
        assert self._deployment_id

        trigger: TriggerTypes = cast(TriggerTypes, self)

        # This is one of the Deployment*Trigger classes, so translate it over to a
        # plain Trigger
        if hasattr(self, "trigger_type"):
            trigger = self.trigger_type(**self.dict())

        return AutomationCore(
            name=(
                getattr(self, "name", None)
                or f"Automation for deployment {self._deployment_id}"
            ),
            description="",
            enabled=getattr(self, "enabled", True),
            trigger=trigger,
            actions=self.actions(),
            owner_resource=self.owner_resource(),
        )

describe_for_cli abstractmethod

Return a human-readable description of this trigger for the CLI

Source code in prefect/events/schemas/automations.py
50
51
52
@abc.abstractmethod
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""