prefect.client.schemas.actions

StateCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a new state.

Source code in prefect/client/schemas/actions.py
class StateCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a new state."""

    type: StateType
    name: Optional[str] = Field(default=None)
    message: Optional[str] = Field(default=None, example="Run started")
    state_details: StateDetails = Field(default_factory=StateDetails)
    data: Union["BaseResult[R]", "DataDocument[R]", Any] = Field(
        default=None,
    )
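
A minimal construction sketch (assuming StateType is importable from prefect.client.schemas.objects, as the annotation above suggests):

from prefect.client.schemas.actions import StateCreate
from prefect.client.schemas.objects import StateType

# Payload for a new RUNNING state; unset fields keep their defaults.
state = StateCreate(type=StateType.RUNNING, message="Run started")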

FlowCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a flow.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class FlowCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow."""

    name: str = FieldFrom(objects.Flow)
    tags: List[str] = FieldFrom(objects.Flow)
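
For example, registering a flow only requires its name; tags are optional (a minimal sketch):

from prefect.client.schemas.actions import FlowCreate

flow_create = FlowCreate(name="my-flow", tags=["demo"])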

FlowUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a flow.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class FlowUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a flow."""

    tags: List[str] = FieldFrom(objects.Flow)

DeploymentCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a deployment.

Source code in prefect/client/schemas/actions.py
@experimental_field(
    "work_pool_name",
    group="work_pools",
    when=lambda x: x is not None,
)
@copy_model_fields
class DeploymentCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a deployment."""

    @root_validator(pre=True)
    def remove_old_fields(cls, values):
        # 2.7.7 removed worker_pool_queue_id in favor of worker_pool_name and
        # worker_pool_queue_name. Those fields were later renamed to work_pool_name
        # and work_queue_name. This validator removes old fields provided
        # by older clients to avoid 422 errors.
        values_copy = copy(values)
        worker_pool_queue_id = values_copy.pop("worker_pool_queue_id", None)
        worker_pool_name = values_copy.pop("worker_pool_name", None)
        worker_pool_queue_name = values_copy.pop("worker_pool_queue_name", None)
        work_pool_queue_name = values_copy.pop("work_pool_queue_name", None)
        if worker_pool_queue_id:
            warnings.warn(
                (
                    "`worker_pool_queue_id` is no longer supported for creating "
                    "deployments. Please use `work_pool_name` and "
                    "`work_queue_name` instead."
                ),
                UserWarning,
            )
        if worker_pool_name or worker_pool_queue_name or work_pool_queue_name:
            warnings.warn(
                (
                    "`worker_pool_name`, `worker_pool_queue_name`, and "
                    "`work_pool_queue_name` are "
                    "no longer supported for creating "
                    "deployments. Please use `work_pool_name` and "
                    "`work_queue_name` instead."
                ),
                UserWarning,
            )
        return values_copy

    name: str = FieldFrom(objects.Deployment)
    flow_id: UUID = FieldFrom(objects.Deployment)
    is_schedule_active: Optional[bool] = FieldFrom(objects.Deployment)
    enforce_parameter_schema: Optional[bool] = Field(
        default=None,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )
    parameter_openapi_schema: Optional[Dict[str, Any]] = FieldFrom(objects.Deployment)
    parameters: Dict[str, Any] = FieldFrom(objects.Deployment)
    tags: List[str] = FieldFrom(objects.Deployment)
    pull_steps: Optional[List[dict]] = FieldFrom(objects.Deployment)

    manifest_path: Optional[str] = FieldFrom(objects.Deployment)
    work_queue_name: Optional[str] = FieldFrom(objects.Deployment)
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the deployment's work pool.",
        example="my-work-pool",
    )
    storage_document_id: Optional[UUID] = FieldFrom(objects.Deployment)
    infrastructure_document_id: Optional[UUID] = FieldFrom(objects.Deployment)
    schedule: Optional[SCHEDULE_TYPES] = FieldFrom(objects.Deployment)
    description: Optional[str] = FieldFrom(objects.Deployment)
    path: Optional[str] = FieldFrom(objects.Deployment)
    version: Optional[str] = FieldFrom(objects.Deployment)
    entrypoint: Optional[str] = FieldFrom(objects.Deployment)
    infra_overrides: Optional[Dict[str, Any]] = FieldFrom(objects.Deployment)

    def check_valid_configuration(self, base_job_template: dict):
        """Check that the combination of base_job_template defaults
        and infra_overrides conforms to the specified schema.
        """
        variables_schema = deepcopy(base_job_template.get("variables"))

        if variables_schema is not None:
            # jsonschema considers required fields, even if that field has a default,
            # to still be required. To get around this we remove the fields from
            # required if there is a default present.
            required = variables_schema.get("required")
            properties = variables_schema.get("properties")
            if required is not None and properties is not None:
                for k, v in properties.items():
                    if "default" in v and k in required:
                        required.remove(k)

            jsonschema.validate(self.infra_overrides, variables_schema)

check_valid_configuration

Check that the combination of base_job_template defaults and infra_overrides conforms to the specified schema.

Source code in prefect/client/schemas/actions.py
def check_valid_configuration(self, base_job_template: dict):
    """Check that the combination of base_job_template defaults
    and infra_overrides conforms to the specified schema.
    """
    variables_schema = deepcopy(base_job_template.get("variables"))

    if variables_schema is not None:
        # jsonschema considers required fields, even if that field has a default,
        # to still be required. To get around this we remove the fields from
        # required if there is a default present.
        required = variables_schema.get("required")
        properties = variables_schema.get("properties")
        if required is not None and properties is not None:
            for k, v in properties.items():
                if "default" in v and k in required:
                    required.remove(k)

        jsonschema.validate(self.infra_overrides, variables_schema)
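
A sketch of the default-vs-required adjustment described above: image is required by the variables schema but carries a default, so it is dropped from required before jsonschema.validate runs against infra_overrides (all names and values here are illustrative):

from uuid import uuid4
from prefect.client.schemas.actions import DeploymentCreate

base_job_template = {
    "variables": {
        "type": "object",
        "required": ["image"],
        "properties": {
            "image": {"type": "string", "default": "prefecthq/prefect:2-latest"},
            "cpu": {"type": "integer"},
        },
    }
}

deployment = DeploymentCreate(
    name="my-deployment",
    flow_id=uuid4(),  # the UUID of an existing flow
    infra_overrides={"cpu": 2},  # omitting "image" is fine because it has a default
)

# Raises jsonschema.exceptions.ValidationError if infra_overrides violates the schema.
deployment.check_valid_configuration(base_job_template)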

DeploymentUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a deployment.

Source code in prefect/client/schemas/actions.py
@experimental_field(
    "work_pool_name",
    group="work_pools",
    when=lambda x: x is not None,
)
@copy_model_fields
class DeploymentUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a deployment."""

    @root_validator(pre=True)
    def remove_old_fields(cls, values):
        # 2.7.7 removed worker_pool_queue_id in favor of worker_pool_name and
        # worker_pool_queue_name. Those fields were later renamed to work_pool_name
        # and work_queue_name. This validator removes old fields provided
        # by older clients to avoid 422 errors.
        values_copy = copy(values)
        worker_pool_queue_id = values_copy.pop("worker_pool_queue_id", None)
        worker_pool_name = values_copy.pop("worker_pool_name", None)
        worker_pool_queue_name = values_copy.pop("worker_pool_queue_name", None)
        work_pool_queue_name = values_copy.pop("work_pool_queue_name", None)
        if worker_pool_queue_id:
            warnings.warn(
                (
                    "`worker_pool_queue_id` is no longer supported for updating "
                    "deployments. Please use `work_pool_name` and "
                    "`work_queue_name` instead."
                ),
                UserWarning,
            )
        if worker_pool_name or worker_pool_queue_name or work_pool_queue_name:
            warnings.warn(
                (
                    "`worker_pool_name`, `worker_pool_queue_name`, and "
                    "`work_pool_queue_name` are "
                    "no longer supported for updating "
                    "deployments. Please use `work_pool_name` and "
                    "`work_queue_name` instead."
                ),
                UserWarning,
            )
        return values_copy

    version: Optional[str] = FieldFrom(objects.Deployment)
    schedule: Optional[SCHEDULE_TYPES] = FieldFrom(objects.Deployment)
    description: Optional[str] = FieldFrom(objects.Deployment)
    is_schedule_active: bool = FieldFrom(objects.Deployment)
    parameters: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Parameters for flow runs scheduled by the deployment.",
    )
    tags: List[str] = FieldFrom(objects.Deployment)
    work_queue_name: Optional[str] = FieldFrom(objects.Deployment)
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the deployment's work pool.",
        example="my-work-pool",
    )
    path: Optional[str] = FieldFrom(objects.Deployment)
    infra_overrides: Optional[Dict[str, Any]] = FieldFrom(objects.Deployment)
    entrypoint: Optional[str] = FieldFrom(objects.Deployment)
    manifest_path: Optional[str] = FieldFrom(objects.Deployment)
    storage_document_id: Optional[UUID] = FieldFrom(objects.Deployment)
    infrastructure_document_id: Optional[UUID] = FieldFrom(objects.Deployment)
    enforce_parameter_schema: Optional[bool] = Field(
        default=None,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )

    def check_valid_configuration(self, base_job_template: dict):
        """Check that the combination of base_job_template defaults
        and infra_overrides conforms to the specified schema.
        """
        variables_schema = deepcopy(base_job_template.get("variables"))

        if variables_schema is not None:
            # jsonschema considers required fields, even if that field has a default,
            # to still be required. To get around this we remove the fields from
            # required if there is a default present.
            required = variables_schema.get("required")
            properties = variables_schema.get("properties")
            if required is not None and properties is not None:
                for k, v in properties.items():
                    if "default" in v and k in required:
                        required.remove(k)

        if variables_schema is not None:
            jsonschema.validate(self.infra_overrides, variables_schema)

check_valid_configuration

Check that the combination of base_job_template defaults and infra_overrides conforms to the specified schema.

Source code in prefect/client/schemas/actions.py
def check_valid_configuration(self, base_job_template: dict):
    """Check that the combination of base_job_template defaults
    and infra_overrides conforms to the specified schema.
    """
    variables_schema = deepcopy(base_job_template.get("variables"))

    if variables_schema is not None:
        # jsonschema considers required fields, even if that field has a default,
        # to still be required. To get around this we remove the fields from
        # required if there is a default present.
        required = variables_schema.get("required")
        properties = variables_schema.get("properties")
        if required is not None and properties is not None:
            for k, v in properties.items():
                if "default" in v and k in required:
                    required.remove(k)

    if variables_schema is not None:
        jsonschema.validate(self.infra_overrides, variables_schema)

FlowRunUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a flow run.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class FlowRunUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a flow run."""

    name: Optional[str] = FieldFrom(objects.FlowRun)
    flow_version: Optional[str] = FieldFrom(objects.FlowRun)
    parameters: dict = FieldFrom(objects.FlowRun)
    empirical_policy: objects.FlowRunPolicy = FieldFrom(objects.FlowRun)
    tags: List[str] = FieldFrom(objects.FlowRun)
    infrastructure_pid: Optional[str] = FieldFrom(objects.FlowRun)

TaskRunCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a task run

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class TaskRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a task run"""

    # TaskRunCreate states must be provided as StateCreate objects
    state: Optional[StateCreate] = Field(
        default=None, description="The state of the task run to create"
    )

    name: str = FieldFrom(objects.TaskRun)
    flow_run_id: Optional[UUID] = FieldFrom(objects.TaskRun)
    task_key: str = FieldFrom(objects.TaskRun)
    dynamic_key: str = FieldFrom(objects.TaskRun)
    cache_key: Optional[str] = FieldFrom(objects.TaskRun)
    cache_expiration: Optional[objects.DateTimeTZ] = FieldFrom(objects.TaskRun)
    task_version: Optional[str] = FieldFrom(objects.TaskRun)
    empirical_policy: objects.TaskRunPolicy = FieldFrom(objects.TaskRun)
    tags: List[str] = FieldFrom(objects.TaskRun)
    task_inputs: Dict[
        str,
        List[
            Union[
                objects.TaskRunResult,
                objects.Parameter,
                objects.Constant,
            ]
        ],
    ] = FieldFrom(objects.TaskRun)
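
A minimal sketch of a task run payload; dynamic_key distinguishes repeated runs of the same task within one flow run, and the initial state is given as a StateCreate:

from uuid import uuid4
from prefect.client.schemas.actions import StateCreate, TaskRunCreate
from prefect.client.schemas.objects import StateType

task_run_create = TaskRunCreate(
    name="transform-0",
    flow_run_id=uuid4(),             # the UUID of the parent flow run
    task_key="my_module.transform",  # identifies the task definition
    dynamic_key="0",                 # disambiguates repeated calls of the task
    state=StateCreate(type=StateType.PENDING),
)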

TaskRunUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a task run

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class TaskRunUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a task run"""

    name: str = FieldFrom(objects.TaskRun)

FlowRunCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a flow run.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class FlowRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow run."""

    # FlowRunCreate states must be provided as StateCreate objects
    state: Optional[StateCreate] = Field(
        default=None, description="The state of the flow run to create"
    )

    name: str = FieldFrom(objects.FlowRun)
    flow_id: UUID = FieldFrom(objects.FlowRun)
    deployment_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    flow_version: Optional[str] = FieldFrom(objects.FlowRun)
    parameters: dict = FieldFrom(objects.FlowRun)
    context: dict = FieldFrom(objects.FlowRun)
    parent_task_run_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    infrastructure_document_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    empirical_policy: objects.FlowRunPolicy = FieldFrom(objects.FlowRun)
    tags: List[str] = FieldFrom(objects.FlowRun)
    idempotency_key: Optional[str] = FieldFrom(objects.FlowRun)

    class Config(ActionBaseModel.Config):
        json_dumps = orjson_dumps_extra_compatible
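
As the comment above notes, an initial state must be supplied as a StateCreate rather than a full State; a minimal sketch:

from uuid import uuid4
from prefect.client.schemas.actions import FlowRunCreate, StateCreate
from prefect.client.schemas.objects import StateType

flow_run_create = FlowRunCreate(
    name="ad-hoc-run",
    flow_id=uuid4(),  # the UUID of an existing flow
    parameters={"x": 1},
    tags=["manual"],
    state=StateCreate(type=StateType.SCHEDULED),
)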

DeploymentFlowRunCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a flow run from a deployment.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class DeploymentFlowRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow run from a deployment."""

    # DeploymentFlowRunCreate states must be provided as StateCreate objects
    state: Optional[StateCreate] = Field(
        default=None, description="The state of the flow run to create"
    )

    name: Optional[str] = FieldFrom(objects.FlowRun)
    parameters: dict = FieldFrom(objects.FlowRun)
    context: dict = FieldFrom(objects.FlowRun)
    infrastructure_document_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    empirical_policy: objects.FlowRunPolicy = FieldFrom(objects.FlowRun)
    tags: List[str] = FieldFrom(objects.FlowRun)
    idempotency_key: Optional[str] = FieldFrom(objects.FlowRun)
    parent_task_run_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    work_queue_name: Optional[str] = FieldFrom(objects.FlowRun)
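
Because the flow is implied by the deployment, only per-run overrides such as parameters and tags need to be supplied (sketch):

from prefect.client.schemas.actions import DeploymentFlowRunCreate, StateCreate
from prefect.client.schemas.objects import StateType

run_create = DeploymentFlowRunCreate(
    parameters={"batch_size": 100},
    tags=["backfill"],
    state=StateCreate(type=StateType.SCHEDULED),
)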

SavedSearchCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a saved search.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class SavedSearchCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a saved search."""

    name: str = FieldFrom(objects.SavedSearch)
    filters: List[objects.SavedSearchFilter] = FieldFrom(objects.SavedSearch)

ConcurrencyLimitCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a concurrency limit.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class ConcurrencyLimitCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a concurrency limit."""

    tag: str = FieldFrom(objects.ConcurrencyLimit)
    concurrency_limit: int = FieldFrom(objects.ConcurrencyLimit)

BlockTypeCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a block type.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class BlockTypeCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block type."""

    name: str = FieldFrom(objects.BlockType)
    slug: str = FieldFrom(objects.BlockType)
    logo_url: Optional[objects.HttpUrl] = FieldFrom(objects.BlockType)
    documentation_url: Optional[objects.HttpUrl] = FieldFrom(objects.BlockType)
    description: Optional[str] = FieldFrom(objects.BlockType)
    code_example: Optional[str] = FieldFrom(objects.BlockType)

    # validators
    _validate_slug_format = validator("slug", allow_reuse=True)(
        validate_block_type_slug
    )

BlockTypeUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a block type.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class BlockTypeUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a block type."""

    logo_url: Optional[objects.HttpUrl] = FieldFrom(objects.BlockType)
    documentation_url: Optional[objects.HttpUrl] = FieldFrom(objects.BlockType)
    description: Optional[str] = FieldFrom(objects.BlockType)
    code_example: Optional[str] = FieldFrom(objects.BlockType)

    @classmethod
    def updatable_fields(cls) -> set:
        return get_class_fields_only(cls)

BlockSchemaCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a block schema.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class BlockSchemaCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block schema."""

    fields: dict = FieldFrom(objects.BlockSchema)
    block_type_id: Optional[UUID] = FieldFrom(objects.BlockSchema)
    capabilities: List[str] = FieldFrom(objects.BlockSchema)
    version: str = FieldFrom(objects.BlockSchema)

BlockDocumentCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a block document.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class BlockDocumentCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block document."""

    name: Optional[str] = FieldFrom(objects.BlockDocument)
    data: dict = FieldFrom(objects.BlockDocument)
    block_schema_id: UUID = FieldFrom(objects.BlockDocument)
    block_type_id: UUID = FieldFrom(objects.BlockDocument)
    is_anonymous: bool = FieldFrom(objects.BlockDocument)

    _validate_name_format = validator("name", allow_reuse=True)(
        validate_block_document_name
    )

    @root_validator
    def validate_name_is_present_if_not_anonymous(cls, values):
        # TODO: We should find an elegant way to reuse this logic from the origin model
        if not values.get("is_anonymous") and not values.get("name"):
            raise ValueError("Names must be provided for block documents.")
        return values
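
The root validator above means a named document and an anonymous document are both accepted, while an unnamed, non-anonymous one is rejected; a sketch with placeholder IDs:

from uuid import uuid4
from prefect.client.schemas.actions import BlockDocumentCreate

schema_id, type_id = uuid4(), uuid4()

# Named block document: accepted.
BlockDocumentCreate(
    name="my-creds", data={"token": "..."},
    block_schema_id=schema_id, block_type_id=type_id,
)

# Anonymous block document: no name required.
BlockDocumentCreate(
    data={"token": "..."}, is_anonymous=True,
    block_schema_id=schema_id, block_type_id=type_id,
)

# Neither named nor anonymous: rejected by the validator.
try:
    BlockDocumentCreate(
        data={"token": "..."},
        block_schema_id=schema_id, block_type_id=type_id,
    )
except Exception as exc:
    print(exc)  # mentions "Names must be provided for block documents."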

BlockDocumentUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a block document.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class BlockDocumentUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a block document."""

    block_schema_id: Optional[UUID] = Field(
        default=None, description="A block schema ID"
    )
    data: dict = FieldFrom(objects.BlockDocument)
    merge_existing_data: bool = True

BlockDocumentReferenceCreate

Bases: ActionBaseModel

Data used to create block document reference.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class BlockDocumentReferenceCreate(ActionBaseModel):
    """Data used to create block document reference."""

    id: UUID = FieldFrom(objects.BlockDocumentReference)
    parent_block_document_id: UUID = FieldFrom(objects.BlockDocumentReference)
    reference_block_document_id: UUID = FieldFrom(objects.BlockDocumentReference)
    name: str = FieldFrom(objects.BlockDocumentReference)

LogCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a log.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class LogCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a log."""

    name: str = FieldFrom(objects.Log)
    level: int = FieldFrom(objects.Log)
    message: str = FieldFrom(objects.Log)
    timestamp: objects.DateTimeTZ = FieldFrom(objects.Log)
    flow_run_id: Optional[UUID] = FieldFrom(objects.Log)
    task_run_id: Optional[UUID] = FieldFrom(objects.Log)

WorkPoolCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a work pool.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class WorkPoolCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a work pool."""

    name: str = FieldFrom(objects.WorkPool)
    description: Optional[str] = FieldFrom(objects.WorkPool)
    type: str = Field(description="The work pool type.", default="prefect-agent")
    base_job_template: Dict[str, Any] = FieldFrom(objects.WorkPool)
    is_paused: bool = FieldFrom(objects.WorkPool)
    concurrency_limit: Optional[int] = FieldFrom(objects.WorkPool)
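
A sketch of creating a work pool; type defaults to "prefect-agent" when omitted:

from prefect.client.schemas.actions import WorkPoolCreate

pool_create = WorkPoolCreate(
    name="my-process-pool",
    type="process",
    description="Pool for local process workers",
    concurrency_limit=5,
)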

WorkPoolUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a work pool.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class WorkPoolUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a work pool."""

    description: Optional[str] = FieldFrom(objects.WorkPool)
    is_paused: Optional[bool] = FieldFrom(objects.WorkPool)
    base_job_template: Optional[Dict[str, Any]] = FieldFrom(objects.WorkPool)
    concurrency_limit: Optional[int] = FieldFrom(objects.WorkPool)

WorkQueueCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a work queue.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class WorkQueueCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a work queue."""

    name: str = FieldFrom(objects.WorkQueue)
    description: Optional[str] = FieldFrom(objects.WorkQueue)
    is_paused: bool = FieldFrom(objects.WorkQueue)
    concurrency_limit: Optional[int] = FieldFrom(objects.WorkQueue)
    priority: Optional[int] = Field(
        default=None,
        description=(
            "The queue's priority. Lower values are higher priority (1 is the highest)."
        ),
    )

    # DEPRECATED

    filter: Optional[objects.QueueFilter] = Field(
        None,
        description="DEPRECATED: Filter criteria for the work queue.",
        deprecated=True,
    )

WorkQueueUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a work queue.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class WorkQueueUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a work queue."""

    name: str = FieldFrom(objects.WorkQueue)
    description: Optional[str] = FieldFrom(objects.WorkQueue)
    is_paused: bool = FieldFrom(objects.WorkQueue)
    concurrency_limit: Optional[int] = FieldFrom(objects.WorkQueue)
    priority: Optional[int] = FieldFrom(objects.WorkQueue)
    last_polled: Optional[DateTimeTZ] = FieldFrom(objects.WorkQueue)

    # DEPRECATED

    filter: Optional[objects.QueueFilter] = Field(
        None,
        description="DEPRECATED: Filter criteria for the work queue.",
        deprecated=True,
    )

FlowRunNotificationPolicyCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a flow run notification policy.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class FlowRunNotificationPolicyCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow run notification policy."""

    is_active: bool = FieldFrom(objects.FlowRunNotificationPolicy)
    state_names: List[str] = FieldFrom(objects.FlowRunNotificationPolicy)
    tags: List[str] = FieldFrom(objects.FlowRunNotificationPolicy)
    block_document_id: UUID = FieldFrom(objects.FlowRunNotificationPolicy)
    message_template: Optional[str] = FieldFrom(objects.FlowRunNotificationPolicy)

FlowRunNotificationPolicyUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a flow run notification policy.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class FlowRunNotificationPolicyUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a flow run notification policy."""

    is_active: Optional[bool] = FieldFrom(objects.FlowRunNotificationPolicy)
    state_names: Optional[List[str]] = FieldFrom(objects.FlowRunNotificationPolicy)
    tags: Optional[List[str]] = FieldFrom(objects.FlowRunNotificationPolicy)
    block_document_id: Optional[UUID] = FieldFrom(objects.FlowRunNotificationPolicy)
    message_template: Optional[str] = FieldFrom(objects.FlowRunNotificationPolicy)

ArtifactCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create an artifact.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class ArtifactCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create an artifact."""

    key: Optional[str] = FieldFrom(objects.Artifact)
    type: Optional[str] = FieldFrom(objects.Artifact)
    description: Optional[str] = FieldFrom(objects.Artifact)
    data: Optional[Union[Dict[str, Any], Any]] = FieldFrom(objects.Artifact)
    metadata_: Optional[Dict[str, str]] = FieldFrom(objects.Artifact)
    flow_run_id: Optional[UUID] = FieldFrom(objects.Artifact)
    task_run_id: Optional[UUID] = FieldFrom(objects.Artifact)

    _validate_artifact_format = validator("key", allow_reuse=True)(
        validate_artifact_key
    )

ArtifactUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update an artifact.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class ArtifactUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update an artifact."""

    data: Optional[Union[Dict[str, Any], Any]] = FieldFrom(objects.Artifact)
    description: Optional[str] = FieldFrom(objects.Artifact)
    metadata_: Optional[Dict[str, str]] = FieldFrom(objects.Artifact)

VariableCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a Variable.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class VariableCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a Variable."""

    name: str = FieldFrom(objects.Variable)
    value: str = FieldFrom(objects.Variable)
    tags: Optional[List[str]] = FieldFrom(objects.Variable)

    # validators
    _validate_name_format = validator("name", allow_reuse=True)(validate_variable_name)

VariableUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a Variable.

Source code in prefect/client/schemas/actions.py
@copy_model_fields
class VariableUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a Variable."""

    name: Optional[str] = Field(
        default=None,
        description="The name of the variable",
        example="my_variable",
        max_length=objects.MAX_VARIABLE_NAME_LENGTH,
    )
    value: Optional[str] = Field(
        default=None,
        description="The value of the variable",
        example="my-value",
        max_length=objects.MAX_VARIABLE_NAME_LENGTH,
    )
    tags: Optional[List[str]] = FieldFrom(objects.Variable)

    # validators
    _validate_name_format = validator("name", allow_reuse=True)(validate_variable_name)
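
A sketch of creating and then updating a variable; the _validate_name_format validator rejects names that do not match Prefect's variable-name format:

from prefect.client.schemas.actions import VariableCreate, VariableUpdate

var_create = VariableCreate(
    name="db_connection_string",
    value="postgres://localhost/prefect",
    tags=["infra"],
)
var_update = VariableUpdate(value="postgres://db.internal/prefect")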

prefect.client.schemas.filters

Schemas that define Prefect REST API filtering operations.

Operator

Bases: AutoEnum

Operators for combining filter criteria.

Source code in prefect/client/schemas/filters.py
class Operator(AutoEnum):
    """Operators for combining filter criteria."""

    and_ = AutoEnum.auto()
    or_ = AutoEnum.auto()

OperatorMixin

Base model for Prefect filters that combines criteria with a user-provided operator

Source code in prefect/client/schemas/filters.py
class OperatorMixin:
    """Base model for Prefect filters that combines criteria with a user-provided operator"""

    operator: Operator = Field(
        default=Operator.and_,
        description="Operator for combining filter criteria. Defaults to 'and_'.",
    )

FlowFilterId

Bases: PrefectBaseModel

Filter by Flow.id.

Source code in prefect/client/schemas/filters.py
class FlowFilterId(PrefectBaseModel):
    """Filter by `Flow.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow ids to include"
    )

FlowFilterName

Bases: PrefectBaseModel

Filter by Flow.name.

Source code in prefect/client/schemas/filters.py
class FlowFilterName(PrefectBaseModel):
    """Filter by `Flow.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of flow names to include",
        example=["my-flow-1", "my-flow-2"],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        example="marvin",
    )

FlowFilterTags

Bases: PrefectBaseModel, OperatorMixin

Filter by Flow.tags.

Source code in prefect/client/schemas/filters.py
class FlowFilterTags(PrefectBaseModel, OperatorMixin):
    """Filter by `Flow.tags`."""

    all_: Optional[List[str]] = Field(
        default=None,
        example=["tag-1", "tag-2"],
        description=(
            "A list of tags. Flows will be returned only if their tags are a superset"
            " of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include flows without tags"
    )

FlowFilter

Bases: PrefectBaseModel, OperatorMixin

Filter for flows. Only flows matching all criteria will be returned.

Source code in prefect/client/schemas/filters.py
class FlowFilter(PrefectBaseModel, OperatorMixin):
    """Filter for flows. Only flows matching all criteria will be returned."""

    id: Optional[FlowFilterId] = Field(
        default=None, description="Filter criteria for `Flow.id`"
    )
    name: Optional[FlowFilterName] = Field(
        default=None, description="Filter criteria for `Flow.name`"
    )
    tags: Optional[FlowFilterTags] = Field(
        default=None, description="Filter criteria for `Flow.tags`"
    )
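
Filters compose by nesting the per-field criteria models; for example, to match flows whose name contains "etl" and whose tags include both "prod" and "daily" (sketch):

from prefect.client.schemas.filters import FlowFilter, FlowFilterName, FlowFilterTags

flow_filter = FlowFilter(
    name=FlowFilterName(like_="etl"),
    tags=FlowFilterTags(all_=["prod", "daily"]),
)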

FlowRunFilterId

Bases: PrefectBaseModel

Filter by FlowRun.id.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterId(PrefectBaseModel):
    """Filter by `FlowRun.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run ids to include"
    )
    not_any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run ids to exclude"
    )

FlowRunFilterName

Bases: PrefectBaseModel

Filter by FlowRun.name.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterName(PrefectBaseModel):
    """Filter by `FlowRun.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of flow run names to include",
        example=["my-flow-run-1", "my-flow-run-2"],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        example="marvin",
    )

FlowRunFilterTags

Bases: PrefectBaseModel, OperatorMixin

Filter by FlowRun.tags.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterTags(PrefectBaseModel, OperatorMixin):
    """Filter by `FlowRun.tags`."""

    all_: Optional[List[str]] = Field(
        default=None,
        example=["tag-1", "tag-2"],
        description=(
            "A list of tags. Flow runs will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include flow runs without tags"
    )

FlowRunFilterDeploymentId

Bases: PrefectBaseModel, OperatorMixin

Filter by FlowRun.deployment_id.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterDeploymentId(PrefectBaseModel, OperatorMixin):
    """Filter by `FlowRun.deployment_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run deployment ids to include"
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without deployment ids",
    )

FlowRunFilterWorkQueueName

Bases: PrefectBaseModel, OperatorMixin

Filter by FlowRun.work_queue_name.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterWorkQueueName(PrefectBaseModel, OperatorMixin):
    """Filter by `FlowRun.work_queue_name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        example=["work_queue_1", "work_queue_2"],
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without work queue names",
    )

FlowRunFilterStateType

Bases: PrefectBaseModel

Filter by FlowRun.state_type.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterStateType(PrefectBaseModel):
    """Filter by `FlowRun.state_type`."""

    any_: Optional[List[StateType]] = Field(
        default=None, description="A list of flow run state types to include"
    )

FlowRunFilterFlowVersion

Bases: PrefectBaseModel

Filter by FlowRun.flow_version.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterFlowVersion(PrefectBaseModel):
    """Filter by `FlowRun.flow_version`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of flow run flow_versions to include"
    )

FlowRunFilterStartTime

Bases: PrefectBaseModel

Filter by FlowRun.start_time.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterStartTime(PrefectBaseModel):
    """Filter by `FlowRun.start_time`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include flow runs starting at or before this time",
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include flow runs starting at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return flow runs without a start time"
    )

FlowRunFilterExpectedStartTime

Bases: PrefectBaseModel

Filter by FlowRun.expected_start_time.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterExpectedStartTime(PrefectBaseModel):
    """Filter by `FlowRun.expected_start_time`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include flow runs scheduled to start at or before this time",
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include flow runs scheduled to start at or after this time",
    )

FlowRunFilterNextScheduledStartTime

Bases: PrefectBaseModel

Filter by FlowRun.next_scheduled_start_time.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterNextScheduledStartTime(PrefectBaseModel):
    """Filter by `FlowRun.next_scheduled_start_time`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description=(
            "Only include flow runs with a next_scheduled_start_time at or before"
            " this time"
        ),
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description=(
            "Only include flow runs with a next_scheduled_start_time at or after this"
            " time"
        ),
    )

FlowRunFilterParentFlowRunId

Bases: PrefectBaseModel, OperatorMixin

Filter for subflows of the given flow runs

Source code in prefect/client/schemas/filters.py
class FlowRunFilterParentFlowRunId(PrefectBaseModel, OperatorMixin):
    """Filter for subflows of the given flow runs"""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run parents to include"
    )

FlowRunFilterParentTaskRunId

Bases: PrefectBaseModel, OperatorMixin

Filter by FlowRun.parent_task_run_id.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterParentTaskRunId(PrefectBaseModel, OperatorMixin):
    """Filter by `FlowRun.parent_task_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run parent_task_run_ids to include"
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without parent_task_run_id",
    )

FlowRunFilterIdempotencyKey

Bases: PrefectBaseModel

Filter by FlowRun.idempotency_key.

Source code in prefect/client/schemas/filters.py
class FlowRunFilterIdempotencyKey(PrefectBaseModel):
    """Filter by `FlowRun.idempotency_key`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of flow run idempotency keys to include"
    )
    not_any_: Optional[List[str]] = Field(
        default=None, description="A list of flow run idempotency keys to exclude"
    )

FlowRunFilter

Bases: PrefectBaseModel, OperatorMixin

Filter flow runs. Only flow runs matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
class FlowRunFilter(PrefectBaseModel, OperatorMixin):
    """Filter flow runs. Only flow runs matching all criteria will be returned"""

    id: Optional[FlowRunFilterId] = Field(
        default=None, description="Filter criteria for `FlowRun.id`"
    )
    name: Optional[FlowRunFilterName] = Field(
        default=None, description="Filter criteria for `FlowRun.name`"
    )
    tags: Optional[FlowRunFilterTags] = Field(
        default=None, description="Filter criteria for `FlowRun.tags`"
    )
    deployment_id: Optional[FlowRunFilterDeploymentId] = Field(
        default=None, description="Filter criteria for `FlowRun.deployment_id`"
    )
    work_queue_name: Optional[FlowRunFilterWorkQueueName] = Field(
        default=None, description="Filter criteria for `FlowRun.work_queue_name`"
    )
    state: Optional[FlowRunFilterState] = Field(
        default=None, description="Filter criteria for `FlowRun.state`"
    )
    flow_version: Optional[FlowRunFilterFlowVersion] = Field(
        default=None, description="Filter criteria for `FlowRun.flow_version`"
    )
    start_time: Optional[FlowRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.start_time`"
    )
    expected_start_time: Optional[FlowRunFilterExpectedStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.expected_start_time`"
    )
    next_scheduled_start_time: Optional[FlowRunFilterNextScheduledStartTime] = Field(
        default=None,
        description="Filter criteria for `FlowRun.next_scheduled_start_time`",
    )
    parent_task_run_id: Optional[FlowRunFilterParentTaskRunId] = Field(
        default=None, description="Filter criteria for `FlowRun.parent_task_run_id`"
    )
    idempotency_key: Optional[FlowRunFilterIdempotencyKey] = Field(
        default=None, description="Filter criteria for `FlowRun.idempotency_key`"
    )
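
A sketch combining several of the criteria above: top-level runs of one deployment that started within the last day (the deployment ID is a placeholder, and a timezone-aware datetime is assumed to be accepted for DateTimeTZ fields):

from datetime import datetime, timedelta, timezone
from uuid import uuid4
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterParentTaskRunId,
    FlowRunFilterStartTime,
)

flow_run_filter = FlowRunFilter(
    deployment_id=FlowRunFilterDeploymentId(any_=[uuid4()]),
    start_time=FlowRunFilterStartTime(
        after_=datetime.now(timezone.utc) - timedelta(days=1)
    ),
    parent_task_run_id=FlowRunFilterParentTaskRunId(is_null_=True),  # top-level runs only
)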

TaskRunFilterId

Bases: PrefectBaseModel

Filter by TaskRun.id.

Source code in prefect/client/schemas/filters.py
class TaskRunFilterId(PrefectBaseModel):
    """Filter by `TaskRun.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of task run ids to include"
    )

TaskRunFilterName

Bases: PrefectBaseModel

Filter by TaskRun.name.

Source code in prefect/client/schemas/filters.py
class TaskRunFilterName(PrefectBaseModel):
    """Filter by `TaskRun.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of task run names to include",
        example=["my-task-run-1", "my-task-run-2"],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        example="marvin",
    )

TaskRunFilterTags

Bases: PrefectBaseModel, OperatorMixin

Filter by TaskRun.tags.

Source code in prefect/client/schemas/filters.py
class TaskRunFilterTags(PrefectBaseModel, OperatorMixin):
    """Filter by `TaskRun.tags`."""

    all_: Optional[List[str]] = Field(
        default=None,
        example=["tag-1", "tag-2"],
        description=(
            "A list of tags. Task runs will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include task runs without tags"
    )

TaskRunFilterStateType

Bases: PrefectBaseModel

Filter by TaskRun.state_type.

Source code in prefect/client/schemas/filters.py
class TaskRunFilterStateType(PrefectBaseModel):
    """Filter by `TaskRun.state_type`."""

    any_: Optional[List[StateType]] = Field(
        default=None, description="A list of task run state types to include"
    )

TaskRunFilterSubFlowRuns

Bases: PrefectBaseModel

Filter by TaskRun.subflow_run.

Source code in prefect/client/schemas/filters.py
class TaskRunFilterSubFlowRuns(PrefectBaseModel):
    """Filter by `TaskRun.subflow_run`."""

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If true, only include task runs that are subflow run parents; if false,"
            " exclude parent task runs"
        ),
    )

TaskRunFilterStartTime

Bases: PrefectBaseModel

Filter by TaskRun.start_time.

Source code in prefect/client/schemas/filters.py
class TaskRunFilterStartTime(PrefectBaseModel):
    """Filter by `TaskRun.start_time`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include task runs starting at or before this time",
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include task runs starting at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return task runs without a start time"
    )

TaskRunFilter

Bases: PrefectBaseModel, OperatorMixin

Filter task runs. Only task runs matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
class TaskRunFilter(PrefectBaseModel, OperatorMixin):
    """Filter task runs. Only task runs matching all criteria will be returned"""

    id: Optional[TaskRunFilterId] = Field(
        default=None, description="Filter criteria for `TaskRun.id`"
    )
    name: Optional[TaskRunFilterName] = Field(
        default=None, description="Filter criteria for `TaskRun.name`"
    )
    tags: Optional[TaskRunFilterTags] = Field(
        default=None, description="Filter criteria for `TaskRun.tags`"
    )
    state: Optional[TaskRunFilterState] = Field(
        default=None, description="Filter criteria for `TaskRun.state`"
    )
    start_time: Optional[TaskRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `TaskRun.start_time`"
    )
    subflow_runs: Optional[TaskRunFilterSubFlowRuns] = Field(
        default=None, description="Filter criteria for `TaskRun.subflow_run`"
    )

DeploymentFilterId

Bases: PrefectBaseModel

Filter by Deployment.id.

Source code in prefect/client/schemas/filters.py
class DeploymentFilterId(PrefectBaseModel):
    """Filter by `Deployment.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of deployment ids to include"
    )

DeploymentFilterName

Bases: PrefectBaseModel

Filter by Deployment.name.

Source code in prefect/client/schemas/filters.py
class DeploymentFilterName(PrefectBaseModel):
    """Filter by `Deployment.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of deployment names to include",
        example=["my-deployment-1", "my-deployment-2"],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        example="marvin",
    )

DeploymentFilterWorkQueueName

Bases: PrefectBaseModel

Filter by Deployment.work_queue_name.

Source code in prefect/client/schemas/filters.py
class DeploymentFilterWorkQueueName(PrefectBaseModel):
    """Filter by `Deployment.work_queue_name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        example=["work_queue_1", "work_queue_2"],
    )

DeploymentFilterIsScheduleActive

Bases: PrefectBaseModel

Filter by Deployment.is_schedule_active.

Source code in prefect/client/schemas/filters.py
class DeploymentFilterIsScheduleActive(PrefectBaseModel):
    """Filter by `Deployment.is_schedule_active`."""

    eq_: Optional[bool] = Field(
        default=None,
        description="Only include deployments whose schedule is active (true) or inactive (false)",
    )

DeploymentFilterTags

Bases: PrefectBaseModel, OperatorMixin

Filter by Deployment.tags.

Source code in prefect/client/schemas/filters.py
class DeploymentFilterTags(PrefectBaseModel, OperatorMixin):
    """Filter by `Deployment.tags`."""

    all_: Optional[List[str]] = Field(
        default=None,
        example=["tag-1", "tag-2"],
        description=(
            "A list of tags. Deployments will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include deployments without tags"
    )

DeploymentFilter

Bases: PrefectBaseModel, OperatorMixin

Filter for deployments. Only deployments matching all criteria will be returned.

Source code in prefect/client/schemas/filters.py
class DeploymentFilter(PrefectBaseModel, OperatorMixin):
    """Filter for deployments. Only deployments matching all criteria will be returned."""

    id: Optional[DeploymentFilterId] = Field(
        default=None, description="Filter criteria for `Deployment.id`"
    )
    name: Optional[DeploymentFilterName] = Field(
        default=None, description="Filter criteria for `Deployment.name`"
    )
    is_schedule_active: Optional[DeploymentFilterIsScheduleActive] = Field(
        default=None, description="Filter criteria for `Deployment.is_schedule_active`"
    )
    tags: Optional[DeploymentFilterTags] = Field(
        default=None, description="Filter criteria for `Deployment.tags`"
    )
    work_queue_name: Optional[DeploymentFilterWorkQueueName] = Field(
        default=None, description="Filter criteria for `Deployment.work_queue_name`"
    )

LogFilterName

Bases: PrefectBaseModel

Filter by Log.name.

Source code in prefect/client/schemas/filters.py
class LogFilterName(PrefectBaseModel):
    """Filter by `Log.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of log names to include",
        example=["prefect.logger.flow_runs", "prefect.logger.task_runs"],
    )

LogFilterLevel

Bases: PrefectBaseModel

Filter by Log.level.

Source code in prefect/client/schemas/filters.py
class LogFilterLevel(PrefectBaseModel):
    """Filter by `Log.level`."""

    ge_: Optional[int] = Field(
        default=None,
        description="Include logs with a level greater than or equal to this level",
        example=20,
    )

    le_: Optional[int] = Field(
        default=None,
        description="Include logs with a level less than or equal to this level",
        example=50,
    )

LogFilterTimestamp

Bases: PrefectBaseModel

Filter by Log.timestamp.

Source code in prefect/client/schemas/filters.py
class LogFilterTimestamp(PrefectBaseModel):
    """Filter by `Log.timestamp`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include logs with a timestamp at or before this time",
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include logs with a timestamp at or after this time",
    )

LogFilterFlowRunId

Bases: PrefectBaseModel

Filter by Log.flow_run_id.

Source code in prefect/client/schemas/filters.py
class LogFilterFlowRunId(PrefectBaseModel):
    """Filter by `Log.flow_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

LogFilterTaskRunId

Bases: PrefectBaseModel

Filter by Log.task_run_id.

Source code in prefect/client/schemas/filters.py
class LogFilterTaskRunId(PrefectBaseModel):
    """Filter by `Log.task_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

LogFilter

Bases: PrefectBaseModel, OperatorMixin

Filter logs. Only logs matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
class LogFilter(PrefectBaseModel, OperatorMixin):
    """Filter logs. Only logs matching all criteria will be returned"""

    level: Optional[LogFilterLevel] = Field(
        default=None, description="Filter criteria for `Log.level`"
    )
    timestamp: Optional[LogFilterTimestamp] = Field(
        default=None, description="Filter criteria for `Log.timestamp`"
    )
    flow_run_id: Optional[LogFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Log.flow_run_id`"
    )
    task_run_id: Optional[LogFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Log.task_run_id`"
    )
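
For example, warning-and-above logs for a single flow run (the run ID is a placeholder; 30 is the standard logging.WARNING level):

from uuid import uuid4
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId, LogFilterLevel

log_filter = LogFilter(
    level=LogFilterLevel(ge_=30),
    flow_run_id=LogFilterFlowRunId(any_=[uuid4()]),
)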

FilterSet

Bases: PrefectBaseModel

A collection of filters for common objects

Source code in prefect/client/schemas/filters.py
class FilterSet(PrefectBaseModel):
    """A collection of filters for common objects"""

    flows: FlowFilter = Field(
        default_factory=FlowFilter, description="Filters that apply to flows"
    )
    flow_runs: FlowRunFilter = Field(
        default_factory=FlowRunFilter, description="Filters that apply to flow runs"
    )
    task_runs: TaskRunFilter = Field(
        default_factory=TaskRunFilter, description="Filters that apply to task runs"
    )
    deployments: DeploymentFilter = Field(
        default_factory=DeploymentFilter,
        description="Filters that apply to deployments",
    )
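
A FilterSet simply bundles the per-object filters; members left unset default to empty filters that match everything (sketch):

from datetime import datetime, timezone
from prefect.client.schemas.filters import (
    FilterSet,
    FlowFilter,
    FlowFilterTags,
    FlowRunFilter,
    FlowRunFilterStartTime,
)

filter_set = FilterSet(
    flows=FlowFilter(tags=FlowFilterTags(all_=["prod"])),
    flow_runs=FlowRunFilter(
        start_time=FlowRunFilterStartTime(
            after_=datetime(2024, 1, 1, tzinfo=timezone.utc)
        )
    ),
)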

BlockTypeFilterName

Bases: PrefectBaseModel

Filter by BlockType.name

Source code in prefect/client/schemas/filters.py
class BlockTypeFilterName(PrefectBaseModel):
    """Filter by `BlockType.name`"""

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        example="marvin",
    )

BlockTypeFilterSlug

Bases: PrefectBaseModel

Filter by BlockType.slug

Source code in prefect/client/schemas/filters.py
class BlockTypeFilterSlug(PrefectBaseModel):
    """Filter by `BlockType.slug`"""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of slugs to match"
    )

BlockTypeFilter

Bases: PrefectBaseModel

Filter BlockTypes

Source code in prefect/client/schemas/filters.py
class BlockTypeFilter(PrefectBaseModel):
    """Filter BlockTypes"""

    name: Optional[BlockTypeFilterName] = Field(
        default=None, description="Filter criteria for `BlockType.name`"
    )

    slug: Optional[BlockTypeFilterSlug] = Field(
        default=None, description="Filter criteria for `BlockType.slug`"
    )

BlockSchemaFilterBlockTypeId

Bases: PrefectBaseModel

Filter by BlockSchema.block_type_id.

Source code in prefect/client/schemas/filters.py
637
638
639
640
641
642
class BlockSchemaFilterBlockTypeId(PrefectBaseModel):
    """Filter by `BlockSchema.block_type_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of block type ids to include"
    )

BlockSchemaFilterId

Bases: PrefectBaseModel

Filter by BlockSchema.id

Source code in prefect/client/schemas/filters.py
645
646
647
648
649
650
class BlockSchemaFilterId(PrefectBaseModel):
    """Filter by BlockSchema.id"""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of IDs to include"
    )

BlockSchemaFilterCapabilities

Bases: PrefectBaseModel

Filter by BlockSchema.capabilities

Source code in prefect/client/schemas/filters.py
653
654
655
656
657
658
659
660
661
662
663
class BlockSchemaFilterCapabilities(PrefectBaseModel):
    """Filter by `BlockSchema.capabilities`"""

    all_: Optional[List[str]] = Field(
        default=None,
        example=["write-storage", "read-storage"],
        description=(
            "A list of block capabilities. Block entities will be returned only if an"
            " associated block schema has a superset of the defined capabilities."
        ),
    )

BlockSchemaFilterVersion

Bases: PrefectBaseModel

Filter by BlockSchema.version

Source code in prefect/client/schemas/filters.py
666
667
668
669
670
671
672
673
class BlockSchemaFilterVersion(PrefectBaseModel):
    """Filter by `BlockSchema.capabilities`"""

    any_: Optional[List[str]] = Field(
        default=None,
        example=["2.0.0", "2.1.0"],
        description="A list of block schema versions.",
    )

BlockSchemaFilter

Bases: PrefectBaseModel, OperatorMixin

Filter BlockSchemas

Source code in prefect/client/schemas/filters.py
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
class BlockSchemaFilter(PrefectBaseModel, OperatorMixin):
    """Filter BlockSchemas"""

    block_type_id: Optional[BlockSchemaFilterBlockTypeId] = Field(
        default=None, description="Filter criteria for `BlockSchema.block_type_id`"
    )
    block_capabilities: Optional[BlockSchemaFilterCapabilities] = Field(
        default=None, description="Filter criteria for `BlockSchema.capabilities`"
    )
    id: Optional[BlockSchemaFilterId] = Field(
        default=None, description="Filter criteria for `BlockSchema.id`"
    )
    version: Optional[BlockSchemaFilterVersion] = Field(
        default=None, description="Filter criteria for `BlockSchema.version`"
    )
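
A hedged sketch combining the criteria above: only block schemas that advertise both storage capabilities and match one of the listed versions would be returned. The capability and version strings echo the field examples in the source.

from prefect.client.schemas.filters import (
    BlockSchemaFilter,
    BlockSchemaFilterCapabilities,
    BlockSchemaFilterVersion,
)

schema_filter = BlockSchemaFilter(
    # the schema must have *all* listed capabilities (superset match)
    block_capabilities=BlockSchemaFilterCapabilities(
        all_=["write-storage", "read-storage"]
    ),
    # the schema version must be *any* of the listed versions
    version=BlockSchemaFilterVersion(any_=["2.0.0", "2.1.0"]),
)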

BlockDocumentFilterIsAnonymous

Bases: PrefectBaseModel

Filter by BlockDocument.is_anonymous.

Source code in prefect/client/schemas/filters.py
693
694
695
696
697
698
699
700
701
class BlockDocumentFilterIsAnonymous(PrefectBaseModel):
    """Filter by `BlockDocument.is_anonymous`."""

    eq_: Optional[bool] = Field(
        default=None,
        description=(
            "Filter block documents for only those that are or are not anonymous."
        ),
    )

BlockDocumentFilterBlockTypeId

Bases: PrefectBaseModel

Filter by BlockDocument.block_type_id.

Source code in prefect/client/schemas/filters.py
704
705
706
707
708
709
class BlockDocumentFilterBlockTypeId(PrefectBaseModel):
    """Filter by `BlockDocument.block_type_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of block type ids to include"
    )

BlockDocumentFilterId

Bases: PrefectBaseModel

Filter by BlockDocument.id.

Source code in prefect/client/schemas/filters.py
712
713
714
715
716
717
class BlockDocumentFilterId(PrefectBaseModel):
    """Filter by `BlockDocument.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of block ids to include"
    )

BlockDocumentFilterName

Bases: PrefectBaseModel

Filter by BlockDocument.name.

Source code in prefect/client/schemas/filters.py
720
721
722
723
724
725
class BlockDocumentFilterName(PrefectBaseModel):
    """Filter by `BlockDocument.name`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of block names to include"
    )

BlockDocumentFilter

Bases: PrefectBaseModel, OperatorMixin

Filter BlockDocuments. Only BlockDocuments matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
class BlockDocumentFilter(PrefectBaseModel, OperatorMixin):
    """Filter BlockDocuments. Only BlockDocuments matching all criteria will be returned"""

    id: Optional[BlockDocumentFilterId] = Field(
        default=None, description="Filter criteria for `BlockDocument.id`"
    )
    is_anonymous: Optional[BlockDocumentFilterIsAnonymous] = Field(
        # default is to exclude anonymous blocks
        BlockDocumentFilterIsAnonymous(eq_=False),
        description=(
            "Filter criteria for `BlockDocument.is_anonymous`. "
            "Defaults to excluding anonymous blocks."
        ),
    )
    block_type_id: Optional[BlockDocumentFilterBlockTypeId] = Field(
        default=None, description="Filter criteria for `BlockDocument.block_type_id`"
    )
    name: Optional[BlockDocumentFilterName] = Field(
        default=None, description="Filter criteria for `BlockDocument.name`"
    )
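
Because `is_anonymous` defaults to `eq_=False`, a bare `BlockDocumentFilter` excludes anonymous block documents. The sketch below shows the default and how a caller can clear that criterion to include anonymous documents as well; the block name is a placeholder.

from prefect.client.schemas.filters import (
    BlockDocumentFilter,
    BlockDocumentFilterName,
)

# Default: anonymous block documents are excluded
named_only = BlockDocumentFilter(
    name=BlockDocumentFilterName(any_=["my-storage-block"])
)
assert named_only.is_anonymous.eq_ is False

# Explicitly clear the criterion to include anonymous documents as well
include_anonymous = BlockDocumentFilter(is_anonymous=None)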

FlowRunNotificationPolicyFilterIsActive

Bases: PrefectBaseModel

Filter by FlowRunNotificationPolicy.is_active.

Source code in prefect/client/schemas/filters.py
750
751
752
753
754
755
756
757
758
class FlowRunNotificationPolicyFilterIsActive(PrefectBaseModel):
    """Filter by `FlowRunNotificationPolicy.is_active`."""

    eq_: Optional[bool] = Field(
        default=None,
        description=(
            "Filter notification policies for only those that are or are not active."
        ),
    )

FlowRunNotificationPolicyFilter

Bases: PrefectBaseModel

Filter FlowRunNotificationPolicies.

Source code in prefect/client/schemas/filters.py
761
762
763
764
765
766
767
class FlowRunNotificationPolicyFilter(PrefectBaseModel):
    """Filter FlowRunNotificationPolicies."""

    is_active: Optional[FlowRunNotificationPolicyFilterIsActive] = Field(
        default=FlowRunNotificationPolicyFilterIsActive(eq_=False),
        description="Filter criteria for `FlowRunNotificationPolicy.is_active`. ",
    )

WorkQueueFilterId

Bases: PrefectBaseModel

Filter by WorkQueue.id.

Source code in prefect/client/schemas/filters.py
770
771
772
773
774
775
776
class WorkQueueFilterId(PrefectBaseModel):
    """Filter by `WorkQueue.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None,
        description="A list of work queue ids to include",
    )

WorkQueueFilterName

Bases: PrefectBaseModel

Filter by WorkQueue.name.

Source code in prefect/client/schemas/filters.py
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
class WorkQueueFilterName(PrefectBaseModel):
    """Filter by `WorkQueue.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        example=["wq-1", "wq-2"],
    )

    startswith_: Optional[List[str]] = Field(
        default=None,
        description=(
            "A list of case-insensitive starts-with matches. For example, "
            " passing 'marvin' will match "
            "'marvin', and 'Marvin-robot', but not 'sad-marvin'."
        ),
        example=["marvin", "Marvin-robot"],
    )

WorkQueueFilter

Bases: PrefectBaseModel, OperatorMixin

Filter work queues. Only work queues matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
799
800
801
802
803
804
805
806
807
808
809
class WorkQueueFilter(PrefectBaseModel, OperatorMixin):
    """Filter work queues. Only work queues matching all criteria will be
    returned"""

    id: Optional[WorkQueueFilterId] = Field(
        default=None, description="Filter criteria for `WorkQueue.id`"
    )

    name: Optional[WorkQueueFilterName] = Field(
        default=None, description="Filter criteria for `WorkQueue.name`"
    )
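
A brief sketch of the name criterion above: `startswith_` is a case-insensitive prefix match, so this filter would return queues named "marvin", "Marvin-robot", or "wq-1", but not "sad-marvin". The prefixes are illustrative.

from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName

# Match work queues whose names start with either prefix (case-insensitive)
queue_filter = WorkQueueFilter(
    name=WorkQueueFilterName(startswith_=["marvin", "wq-"])
)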

WorkPoolFilterId

Bases: PrefectBaseModel

Filter by WorkPool.id.

Source code in prefect/client/schemas/filters.py
812
813
814
815
816
817
class WorkPoolFilterId(PrefectBaseModel):
    """Filter by `WorkPool.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of work pool ids to include"
    )

WorkPoolFilterName

Bases: PrefectBaseModel

Filter by WorkPool.name.

Source code in prefect/client/schemas/filters.py
820
821
822
823
824
825
class WorkPoolFilterName(PrefectBaseModel):
    """Filter by `WorkPool.name`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of work pool names to include"
    )

WorkPoolFilterType

Bases: PrefectBaseModel

Filter by WorkPool.type.

Source code in prefect/client/schemas/filters.py
828
829
830
831
832
833
class WorkPoolFilterType(PrefectBaseModel):
    """Filter by `WorkPool.type`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of work pool types to include"
    )

WorkerFilterWorkPoolId

Bases: PrefectBaseModel

Filter by Worker.worker_config_id.

Source code in prefect/client/schemas/filters.py
848
849
850
851
852
853
class WorkerFilterWorkPoolId(PrefectBaseModel):
    """Filter by `Worker.worker_config_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of work pool ids to include"
    )

WorkerFilterLastHeartbeatTime

Bases: PrefectBaseModel

Filter by Worker.last_heartbeat_time.

Source code in prefect/client/schemas/filters.py
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
class WorkerFilterLastHeartbeatTime(PrefectBaseModel):
    """Filter by `Worker.last_heartbeat_time`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description=(
            "Only include processes whose last heartbeat was at or before this time"
        ),
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description=(
            "Only include processes whose last heartbeat was at or after this time"
        ),
    )

ArtifactFilterId

Bases: PrefectBaseModel

Filter by Artifact.id.

Source code in prefect/client/schemas/filters.py
884
885
886
887
888
889
class ArtifactFilterId(PrefectBaseModel):
    """Filter by `Artifact.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of artifact ids to include"
    )

ArtifactFilterKey

Bases: PrefectBaseModel

Filter by Artifact.key.

Source code in prefect/client/schemas/filters.py
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
class ArtifactFilterKey(PrefectBaseModel):
    """Filter by `Artifact.key`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact keys to include"
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match artifact keys against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        example="my-artifact-%",
    )

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If `true`, only include artifacts with a non-null key. If `false`, "
            "only include artifacts with a null key."
        ),
    )

ArtifactFilterFlowRunId

Bases: PrefectBaseModel

Filter by Artifact.flow_run_id.

Source code in prefect/client/schemas/filters.py
917
918
919
920
921
922
class ArtifactFilterFlowRunId(PrefectBaseModel):
    """Filter by `Artifact.flow_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

ArtifactFilterTaskRunId

Bases: PrefectBaseModel

Filter by Artifact.task_run_id.

Source code in prefect/client/schemas/filters.py
925
926
927
928
929
930
class ArtifactFilterTaskRunId(PrefectBaseModel):
    """Filter by `Artifact.task_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

ArtifactFilterType

Bases: PrefectBaseModel

Filter by Artifact.type.

Source code in prefect/client/schemas/filters.py
933
934
935
936
937
938
939
940
941
class ArtifactFilterType(PrefectBaseModel):
    """Filter by `Artifact.type`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact types to include"
    )
    not_any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact types to exclude"
    )

ArtifactFilter

Bases: PrefectBaseModel, OperatorMixin

Filter artifacts. Only artifacts matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
class ArtifactFilter(PrefectBaseModel, OperatorMixin):
    """Filter artifacts. Only artifacts matching all criteria will be returned"""

    id: Optional[ArtifactFilterId] = Field(
        default=None, description="Filter criteria for `Artifact.id`"
    )
    key: Optional[ArtifactFilterKey] = Field(
        default=None, description="Filter criteria for `Artifact.key`"
    )
    flow_run_id: Optional[ArtifactFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Artifact.flow_run_id`"
    )
    task_run_id: Optional[ArtifactFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Artifact.task_run_id`"
    )
    type: Optional[ArtifactFilterType] = Field(
        default=None, description="Filter criteria for `Artifact.type`"
    )
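
A hedged sketch combining two of the criteria above: a SQL-wildcard match on the key plus an exclusion by type. The "result" type and the key prefix are purely illustrative values.

from prefect.client.schemas.filters import (
    ArtifactFilter,
    ArtifactFilterKey,
    ArtifactFilterType,
)

artifact_filter = ArtifactFilter(
    # `%` is a SQL wildcard, so this matches keys prefixed with "my-artifact-"
    key=ArtifactFilterKey(like_="my-artifact-%"),
    # exclude artifacts of the listed types, keep everything else
    type=ArtifactFilterType(not_any_=["result"]),
)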

ArtifactCollectionFilterLatestId

Bases: PrefectBaseModel

Filter by ArtifactCollection.latest_id.

Source code in prefect/client/schemas/filters.py
964
965
966
967
968
969
class ArtifactCollectionFilterLatestId(PrefectBaseModel):
    """Filter by `ArtifactCollection.latest_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of artifact ids to include"
    )

ArtifactCollectionFilterKey

Bases: PrefectBaseModel

Filter by ArtifactCollection.key.

Source code in prefect/client/schemas/filters.py
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
class ArtifactCollectionFilterKey(PrefectBaseModel):
    """Filter by `ArtifactCollection.key`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact keys to include"
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match artifact keys against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        example="my-artifact-%",
    )

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If `true`, only include artifacts with a non-null key. If `false`, "
            "only include artifacts with a null key. Should return all rows in "
            "the ArtifactCollection table if specified."
        ),
    )

ArtifactCollectionFilterFlowRunId

Bases: PrefectBaseModel

Filter by ArtifactCollection.flow_run_id.

Source code in prefect/client/schemas/filters.py
 998
 999
1000
1001
1002
1003
class ArtifactCollectionFilterFlowRunId(PrefectBaseModel):
    """Filter by `ArtifactCollection.flow_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

ArtifactCollectionFilterTaskRunId

Bases: PrefectBaseModel

Filter by ArtifactCollection.task_run_id.

Source code in prefect/client/schemas/filters.py
1006
1007
1008
1009
1010
1011
class ArtifactCollectionFilterTaskRunId(PrefectBaseModel):
    """Filter by `ArtifactCollection.task_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

ArtifactCollectionFilterType

Bases: PrefectBaseModel

Filter by ArtifactCollection.type.

Source code in prefect/client/schemas/filters.py
1014
1015
1016
1017
1018
1019
1020
1021
1022
class ArtifactCollectionFilterType(PrefectBaseModel):
    """Filter by `ArtifactCollection.type`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact types to include"
    )
    not_any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact types to exclude"
    )

ArtifactCollectionFilter

Bases: PrefectBaseModel, OperatorMixin

Filter artifact collections. Only artifact collections matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
class ArtifactCollectionFilter(PrefectBaseModel, OperatorMixin):
    """Filter artifact collections. Only artifact collections matching all criteria will be returned"""

    latest_id: Optional[ArtifactCollectionFilterLatestId] = Field(
        default=None, description="Filter criteria for `Artifact.id`"
    )
    key: Optional[ArtifactCollectionFilterKey] = Field(
        default=None, description="Filter criteria for `Artifact.key`"
    )
    flow_run_id: Optional[ArtifactCollectionFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Artifact.flow_run_id`"
    )
    task_run_id: Optional[ArtifactCollectionFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Artifact.task_run_id`"
    )
    type: Optional[ArtifactCollectionFilterType] = Field(
        default=None, description="Filter criteria for `Artifact.type`"
    )

VariableFilterId

Bases: PrefectBaseModel

Filter by Variable.id.

Source code in prefect/client/schemas/filters.py
1045
1046
1047
1048
1049
1050
class VariableFilterId(PrefectBaseModel):
    """Filter by `Variable.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of variable ids to include"
    )

VariableFilterName

Bases: PrefectBaseModel

Filter by Variable.name.

Source code in prefect/client/schemas/filters.py
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
class VariableFilterName(PrefectBaseModel):
    """Filter by `Variable.name`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of variables names to include"
    )
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match variable names against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        example="my_variable_%",
    )

VariableFilterValue

Bases: PrefectBaseModel

Filter by Variable.value.

Source code in prefect/client/schemas/filters.py
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
class VariableFilterValue(PrefectBaseModel):
    """Filter by `Variable.value`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of variables value to include"
    )
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match variable value against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        example="my-value-%",
    )

VariableFilterTags

Bases: PrefectBaseModel, OperatorMixin

Filter by Variable.tags.

Source code in prefect/client/schemas/filters.py
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
class VariableFilterTags(PrefectBaseModel, OperatorMixin):
    """Filter by `Variable.tags`."""

    all_: Optional[List[str]] = Field(
        default=None,
        example=["tag-1", "tag-2"],
        description=(
            "A list of tags. Variables will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include Variables without tags"
    )

VariableFilter

Bases: PrefectBaseModel, OperatorMixin

Filter variables. Only variables matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
class VariableFilter(PrefectBaseModel, OperatorMixin):
    """Filter variables. Only variables matching all criteria will be returned"""

    id: Optional[VariableFilterId] = Field(
        default=None, description="Filter criteria for `Variable.id`"
    )
    name: Optional[VariableFilterName] = Field(
        default=None, description="Filter criteria for `Variable.name`"
    )
    value: Optional[VariableFilterValue] = Field(
        default=None, description="Filter criteria for `Variable.value`"
    )
    tags: Optional[VariableFilterTags] = Field(
        default=None, description="Filter criteria for `Variable.tags`"
    )
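
A short sketch composing the variable criteria above: a SQL-wildcard match on the name plus a tag superset requirement. The name pattern and tag are placeholders.

from prefect.client.schemas.filters import (
    VariableFilter,
    VariableFilterName,
    VariableFilterTags,
)

variable_filter = VariableFilter(
    # `%` and `_` are SQL wildcards, so this matches names like "my_variable_1"
    name=VariableFilterName(like_="my_variable_%"),
    # a variable's tags must be a superset of the listed tags
    tags=VariableFilterTags(all_=["tag-1"]),
)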

prefect.client.schemas.objects

StateType

Bases: AutoEnum

Enumeration of state types.

Source code in prefect/client/schemas/objects.py
60
61
62
63
64
65
66
67
68
69
70
71
class StateType(AutoEnum):
    """Enumeration of state types."""

    SCHEDULED = AutoEnum.auto()
    PENDING = AutoEnum.auto()
    RUNNING = AutoEnum.auto()
    COMPLETED = AutoEnum.auto()
    FAILED = AutoEnum.auto()
    CANCELLED = AutoEnum.auto()
    CRASHED = AutoEnum.auto()
    PAUSED = AutoEnum.auto()
    CANCELLING = AutoEnum.auto()
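
A short sketch of how the enum is typically consumed; it assumes `AutoEnum` assigns each member its own name as its string value, which is how state types appear in API payloads.

from prefect.client.schemas.objects import StateType

# AutoEnum members serialize to their names
assert StateType.COMPLETED.value == "COMPLETED"

# Terminal state types, mirroring State.is_final() further below
TERMINAL_TYPES = {
    StateType.COMPLETED,
    StateType.FAILED,
    StateType.CANCELLED,
    StateType.CRASHED,
}
assert StateType.RUNNING not in TERMINAL_TYPES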

WorkPoolStatus

Bases: AutoEnum

Enumeration of work pool statuses.

Source code in prefect/client/schemas/objects.py
74
75
76
77
78
79
class WorkPoolStatus(AutoEnum):
    """Enumeration of work pool statuses."""

    READY = AutoEnum.auto()
    NOT_READY = AutoEnum.auto()
    PAUSED = AutoEnum.auto()

WorkerStatus

Bases: AutoEnum

Enumeration of worker statuses.

Source code in prefect/client/schemas/objects.py
82
83
84
85
86
class WorkerStatus(AutoEnum):
    """Enumeration of worker statuses."""

    ONLINE = AutoEnum.auto()
    OFFLINE = AutoEnum.auto()

DeploymentStatus

Bases: AutoEnum

Enumeration of deployment statuses.

Source code in prefect/client/schemas/objects.py
89
90
91
92
93
class DeploymentStatus(AutoEnum):
    """Enumeration of deployment statuses."""

    READY = AutoEnum.auto()
    NOT_READY = AutoEnum.auto()

State

Bases: ObjectBaseModel, Generic[R]

The state of a run.

Source code in prefect/client/schemas/objects.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
class State(ObjectBaseModel, Generic[R]):
    """
    The state of a run.
    """

    type: StateType
    name: Optional[str] = Field(default=None)
    timestamp: DateTimeTZ = Field(default_factory=lambda: pendulum.now("UTC"))
    message: Optional[str] = Field(default=None, example="Run started")
    state_details: StateDetails = Field(default_factory=StateDetails)
    data: Union["BaseResult[R]", "DataDocument[R]", Any] = Field(
        default=None,
    )

    @overload
    def result(self: "State[R]", raise_on_failure: bool = True) -> R:
        ...

    @overload
    def result(self: "State[R]", raise_on_failure: bool = False) -> Union[R, Exception]:
        ...

    def result(self, raise_on_failure: bool = True, fetch: Optional[bool] = None):
        """
        Retrieve the result attached to this state.

        Args:
            raise_on_failure: a boolean specifying whether to raise an exception
                if the state is of type `FAILED` and the underlying data is an exception
            fetch: a boolean specifying whether to resolve references to persisted
                results into data. For synchronous users, this defaults to `True`.
                For asynchronous users, this defaults to `False` for backwards
                compatibility.

        Raises:
            TypeError: If the state is failed but the result is not an exception.

        Returns:
            The result of the run

        Examples:
            >>> from prefect import flow, task
            >>> @task
            >>> def my_task(x):
            >>>     return x

            Get the result from a task future in a flow

            >>> @flow
            >>> def my_flow():
            >>>     future = my_task("hello")
            >>>     state = future.wait()
            >>>     result = state.result()
            >>>     print(result)
            >>> my_flow()
            hello

            Get the result from a flow state

            >>> @flow
            >>> def my_flow():
            >>>     return "hello"
            >>> my_flow(return_state=True).result()
            hello

            Get the result from a failed state

            >>> @flow
            >>> def my_flow():
            >>>     raise ValueError("oh no!")
            >>> state = my_flow(return_state=True)  # Error is wrapped in FAILED state
            >>> state.result()  # Raises `ValueError`

            Get the result from a failed state without erroring

            >>> @flow
            >>> def my_flow():
            >>>     raise ValueError("oh no!")
            >>> state = my_flow(return_state=True)
            >>> result = state.result(raise_on_failure=False)
            >>> print(result)
            ValueError("oh no!")


            Get the result from a flow state in an async context

            >>> @flow
            >>> async def my_flow():
            >>>     return "hello"
            >>> state = await my_flow(return_state=True)
            >>> await state.result()
            hello
        """
        from prefect.states import get_state_result

        return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)

    def to_state_create(self):
        """
        Convert this state to a `StateCreate` type which can be used to set the state of
        a run in the API.

        This method will drop this state's `data` if it is not a result type. Only
        results should be sent to the API. Other data is only available locally.
        """
        from prefect.client.schemas.actions import StateCreate
        from prefect.results import BaseResult

        return StateCreate(
            type=self.type,
            name=self.name,
            message=self.message,
            data=self.data if isinstance(self.data, BaseResult) else None,
            state_details=self.state_details,
        )

    @validator("name", always=True)
    def default_name_from_type(cls, v, *, values, **kwargs):
        """If a name is not provided, use the type"""

        # if `type` is not in `values` it means the `type` didn't pass its own
        # validation check and an error will be raised after this function is called
        if v is None and values.get("type"):
            v = " ".join([v.capitalize() for v in values.get("type").value.split("_")])
        return v

    @root_validator
    def default_scheduled_start_time(cls, values):
        """
        TODO: This should throw an error instead of setting a default but is out of
              scope for https://github.com/PrefectHQ/orion/pull/174/ and can be rolled
              into work refactoring state initialization
        """
        if values.get("type") == StateType.SCHEDULED:
            state_details = values.setdefault(
                "state_details", cls.__fields__["state_details"].get_default()
            )
            if not state_details.scheduled_time:
                state_details.scheduled_time = pendulum.now("utc")
        return values

    def is_scheduled(self) -> bool:
        return self.type == StateType.SCHEDULED

    def is_pending(self) -> bool:
        return self.type == StateType.PENDING

    def is_running(self) -> bool:
        return self.type == StateType.RUNNING

    def is_completed(self) -> bool:
        return self.type == StateType.COMPLETED

    def is_failed(self) -> bool:
        return self.type == StateType.FAILED

    def is_crashed(self) -> bool:
        return self.type == StateType.CRASHED

    def is_cancelled(self) -> bool:
        return self.type == StateType.CANCELLED

    def is_cancelling(self) -> bool:
        return self.type == StateType.CANCELLING

    def is_final(self) -> bool:
        return self.type in {
            StateType.CANCELLED,
            StateType.FAILED,
            StateType.COMPLETED,
            StateType.CRASHED,
        }

    def is_paused(self) -> bool:
        return self.type == StateType.PAUSED

    def copy(self, *, update: dict = None, reset_fields: bool = False, **kwargs):
        """
        Copying API models should return an object that could be inserted into the
        database again. The 'timestamp' is reset using the default factory.
        """
        update = update or {}
        update.setdefault("timestamp", self.__fields__["timestamp"].get_default())
        return super().copy(reset_fields=reset_fields, update=update, **kwargs)

    def __repr__(self) -> str:
        """
        Generates a complete state representation appropriate for introspection
        and debugging, including the result:

        `MyCompletedState(message="my message", type=COMPLETED, result=...)`
        """
        from prefect.deprecated.data_documents import DataDocument

        if isinstance(self.data, DataDocument):
            result = self.data.decode()
        else:
            result = self.data

        display = dict(
            message=repr(self.message),
            type=str(self.type.value),
            result=repr(result),
        )

        return f"{self.name}({', '.join(f'{k}={v}' for k, v in display.items())})"

    def __str__(self) -> str:
        """
        Generates a simple state representation appropriate for logging:

        `MyCompletedState("my message", type=COMPLETED)`
        """

        display = []

        if self.message:
            display.append(repr(self.message))

        if self.type.value.lower() != self.name.lower():
            display.append(f"type={self.type.value}")

        return f"{self.name}({', '.join(display)})"

    def __hash__(self) -> int:
        return hash(
            (
                getattr(self.state_details, "flow_run_id", None),
                getattr(self.state_details, "task_run_id", None),
                self.timestamp,
                self.type,
            )
        )

result

Retrieve the result attached to this state.

Parameters:

    raise_on_failure (bool, default True): a boolean specifying whether to raise an exception if the state is of type FAILED and the underlying data is an exception

    fetch (Optional[bool], default None): a boolean specifying whether to resolve references to persisted results into data. For synchronous users, this defaults to True. For asynchronous users, this defaults to False for backwards compatibility.

Raises:

    TypeError: If the state is failed but the result is not an exception.

Returns:

    The result of the run

Examples:

>>> from prefect import flow, task
>>> @task
>>> def my_task(x):
>>>     return x

Get the result from a task future in a flow

>>> @flow
>>> def my_flow():
>>>     future = my_task("hello")
>>>     state = future.wait()
>>>     result = state.result()
>>>     print(result)
>>> my_flow()
hello

Get the result from a flow state

>>> @flow
>>> def my_flow():
>>>     return "hello"
>>> my_flow(return_state=True).result()
hello

Get the result from a failed state

>>> @flow
>>> def my_flow():
>>>     raise ValueError("oh no!")
>>> state = my_flow(return_state=True)  # Error is wrapped in FAILED state
>>> state.result()  # Raises `ValueError`

Get the result from a failed state without erroring

>>> @flow
>>> def my_flow():
>>>     raise ValueError("oh no!")
>>> state = my_flow(return_state=True)
>>> result = state.result(raise_on_failure=False)
>>> print(result)
ValueError("oh no!")

Get the result from a flow state in an async context

>>> @flow
>>> async def my_flow():
>>>     return "hello"
>>> state = await my_flow(return_state=True)
>>> await state.result()
hello
Source code in prefect/client/schemas/objects.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def result(self, raise_on_failure: bool = True, fetch: Optional[bool] = None):
    """
    Retrieve the result attached to this state.

    Args:
        raise_on_failure: a boolean specifying whether to raise an exception
            if the state is of type `FAILED` and the underlying data is an exception
        fetch: a boolean specifying whether to resolve references to persisted
            results into data. For synchronous users, this defaults to `True`.
            For asynchronous users, this defaults to `False` for backwards
            compatibility.

    Raises:
        TypeError: If the state is failed but the result is not an exception.

    Returns:
        The result of the run

    Examples:
        >>> from prefect import flow, task
        >>> @task
        >>> def my_task(x):
        >>>     return x

        Get the result from a task future in a flow

        >>> @flow
        >>> def my_flow():
        >>>     future = my_task("hello")
        >>>     state = future.wait()
        >>>     result = state.result()
        >>>     print(result)
        >>> my_flow()
        hello

        Get the result from a flow state

        >>> @flow
        >>> def my_flow():
        >>>     return "hello"
        >>> my_flow(return_state=True).result()
        hello

        Get the result from a failed state

        >>> @flow
        >>> def my_flow():
        >>>     raise ValueError("oh no!")
        >>> state = my_flow(return_state=True)  # Error is wrapped in FAILED state
        >>> state.result()  # Raises `ValueError`

        Get the result from a failed state without erroring

        >>> @flow
        >>> def my_flow():
        >>>     raise ValueError("oh no!")
        >>> state = my_flow(return_state=True)
        >>> result = state.result(raise_on_failure=False)
        >>> print(result)
        ValueError("oh no!")


        Get the result from a flow state in an async context

        >>> @flow
        >>> async def my_flow():
        >>>     return "hello"
        >>> state = await my_flow(return_state=True)
        >>> await state.result()
        hello
    """
    from prefect.states import get_state_result

    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)

to_state_create

Convert this state to a StateCreate type which can be used to set the state of a run in the API.

This method will drop this state's data if it is not a result type. Only results should be sent to the API. Other data is only available locally.

Source code in prefect/client/schemas/objects.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def to_state_create(self):
    """
    Convert this state to a `StateCreate` type which can be used to set the state of
    a run in the API.

    This method will drop this state's `data` if it is not a result type. Only
    results should be sent to the API. Other data is only available locally.
    """
    from prefect.client.schemas.actions import StateCreate
    from prefect.results import BaseResult

    return StateCreate(
        type=self.type,
        name=self.name,
        message=self.message,
        data=self.data if isinstance(self.data, BaseResult) else None,
        state_details=self.state_details,
    )

default_name_from_type

If a name is not provided, use the type

Source code in prefect/client/schemas/objects.py
227
228
229
230
231
232
233
234
235
@validator("name", always=True)
def default_name_from_type(cls, v, *, values, **kwargs):
    """If a name is not provided, use the type"""

    # if `type` is not in `values` it means the `type` didn't pass its own
    # validation check and an error will be raised after this function is called
    if v is None and values.get("type"):
        v = " ".join([v.capitalize() for v in values.get("type").value.split("_")])
    return v

default_scheduled_start_time

TODO: This should throw an error instead of setting a default but is out of scope for https://github.com/PrefectHQ/orion/pull/174/ and can be rolled into work refactoring state initialization

Source code in prefect/client/schemas/objects.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
@root_validator
def default_scheduled_start_time(cls, values):
    """
    TODO: This should throw an error instead of setting a default but is out of
          scope for https://github.com/PrefectHQ/orion/pull/174/ and can be rolled
          into work refactoring state initialization
    """
    if values.get("type") == StateType.SCHEDULED:
        state_details = values.setdefault(
            "state_details", cls.__fields__["state_details"].get_default()
        )
        if not state_details.scheduled_time:
            state_details.scheduled_time = pendulum.now("utc")
    return values
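
A brief illustration (not from the source) of how the two validators above behave: a missing name is derived from the state type, and a SCHEDULED state without an explicit scheduled time receives one.

from prefect.client.schemas.objects import State, StateType

# `name` falls back to a title-cased version of the type
running = State(type=StateType.RUNNING)
assert running.name == "Running"

# SCHEDULED states get a default scheduled_time if none is provided
scheduled = State(type=StateType.SCHEDULED)
assert scheduled.state_details.scheduled_time is not None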

FlowRunPolicy

Bases: PrefectBaseModel

Defines how a flow run should be orchestrated.

Source code in prefect/client/schemas/objects.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
class FlowRunPolicy(PrefectBaseModel):
    """Defines of how a flow run should be orchestrated."""

    max_retries: int = Field(
        default=0,
        description=(
            "The maximum number of retries. Field is not used. Please use `retries`"
            " instead."
        ),
        deprecated=True,
    )
    retry_delay_seconds: float = Field(
        default=0,
        description=(
            "The delay between retries. Field is not used. Please use `retry_delay`"
            " instead."
        ),
        deprecated=True,
    )
    retries: Optional[int] = Field(default=None, description="The number of retries.")
    retry_delay: Optional[int] = Field(
        default=None, description="The delay time between retries, in seconds."
    )
    pause_keys: Optional[set] = Field(
        default_factory=set, description="Tracks pauses this run has observed."
    )
    resuming: Optional[bool] = Field(
        default=False, description="Indicates if this run is resuming from a pause."
    )

    @root_validator
    def populate_deprecated_fields(cls, values):
        """
        If deprecated fields are provided, populate the corresponding new fields
        to preserve orchestration behavior.
        """
        if not values.get("retries", None) and values.get("max_retries", 0) != 0:
            values["retries"] = values["max_retries"]
        if (
            not values.get("retry_delay", None)
            and values.get("retry_delay_seconds", 0) != 0
        ):
            values["retry_delay"] = values["retry_delay_seconds"]
        return values

populate_deprecated_fields

If deprecated fields are provided, populate the corresponding new fields to preserve orchestration behavior.

Source code in prefect/client/schemas/objects.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
@root_validator
def populate_deprecated_fields(cls, values):
    """
    If deprecated fields are provided, populate the corresponding new fields
    to preserve orchestration behavior.
    """
    if not values.get("retries", None) and values.get("max_retries", 0) != 0:
        values["retries"] = values["max_retries"]
    if (
        not values.get("retry_delay", None)
        and values.get("retry_delay_seconds", 0) != 0
    ):
        values["retry_delay"] = values["retry_delay_seconds"]
    return values
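
A sketch of the backfill behavior above: when only the deprecated fields are supplied, the root validator copies them into retries and retry_delay so orchestration behavior is preserved.

from prefect.client.schemas.objects import FlowRunPolicy

# Only deprecated fields supplied; the validator backfills the new ones
policy = FlowRunPolicy(max_retries=3, retry_delay_seconds=10)
assert policy.retries == 3
assert policy.retry_delay == 10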

FlowRun

Bases: ObjectBaseModel

Source code in prefect/client/schemas/objects.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
class FlowRun(ObjectBaseModel):
    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the flow run. Defaults to a random slug if not specified."
        ),
        example="my-flow-run",
    )
    flow_id: UUID = Field(default=..., description="The id of the flow being run.")
    state_id: Optional[UUID] = Field(
        default=None, description="The id of the flow run's current state."
    )
    deployment_id: Optional[UUID] = Field(
        default=None,
        description=(
            "The id of the deployment associated with this flow run, if available."
        ),
    )
    work_queue_name: Optional[str] = Field(
        default=None, description="The work queue that handled this flow run."
    )
    flow_version: Optional[str] = Field(
        default=None,
        description="The version of the flow executed in this flow run.",
        example="1.0",
    )
    parameters: dict = Field(
        default_factory=dict, description="Parameters for the flow run."
    )
    idempotency_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional idempotency key for the flow run. Used to ensure the same flow"
            " run is not created multiple times."
        ),
    )
    context: dict = Field(
        default_factory=dict,
        description="Additional context for the flow run.",
        example={"my_var": "my_val"},
    )
    empirical_policy: FlowRunPolicy = Field(
        default_factory=FlowRunPolicy,
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags on the flow run",
        example=["tag-1", "tag-2"],
    )
    parent_task_run_id: Optional[UUID] = Field(
        default=None,
        description=(
            "If the flow run is a subflow, the id of the 'dummy' task in the parent"
            " flow used to track subflow state."
        ),
    )
    run_count: int = Field(
        default=0, description="The number of times the flow run was executed."
    )
    expected_start_time: Optional[DateTimeTZ] = Field(
        default=None,
        description="The flow run's expected start time.",
    )
    next_scheduled_start_time: Optional[DateTimeTZ] = Field(
        default=None,
        description="The next time the flow run is scheduled to start.",
    )
    start_time: Optional[DateTimeTZ] = Field(
        default=None, description="The actual start time."
    )
    end_time: Optional[DateTimeTZ] = Field(
        default=None, description="The actual end time."
    )
    total_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description=(
            "Total run time. If the flow run was executed multiple times, the time of"
            " each run will be summed."
        ),
    )
    estimated_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="A real-time estimate of the total run time.",
    )
    estimated_start_time_delta: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="The difference between actual and expected start time.",
    )
    auto_scheduled: bool = Field(
        default=False,
        description="Whether or not the flow run was automatically scheduled.",
    )
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use this flow run.",
    )
    infrastructure_pid: Optional[str] = Field(
        default=None,
        description="The id of the flow run as returned by an infrastructure block.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this flow run.",
    )
    work_queue_id: Optional[UUID] = Field(
        default=None, description="The id of the run's work pool queue."
    )

    work_pool_id: Optional[UUID] = Field(
        description="The work pool with which the queue is associated."
    )
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the flow run's work pool.",
        example="my-work-pool",
    )
    state: Optional[State] = Field(
        default=None,
        description="The state of the flow run.",
        example=State(type=StateType.COMPLETED),
    )

    def __eq__(self, other: Any) -> bool:
        """
        Check for "equality" to another flow run schema

        Estimates times are rolling and will always change with repeated queries for
        a flow run so we ignore them during equality checks.
        """
        if isinstance(other, FlowRun):
            exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
            return self.dict(exclude=exclude_fields) == other.dict(
                exclude=exclude_fields
            )
        return super().__eq__(other)

    @validator("name", pre=True)
    def set_default_name(cls, name):
        return name or generate_slug(2)

    # These are server-side optimizations and should not be present on client models
    # TODO: Deprecate these fields

    state_type: Optional[StateType] = Field(
        default=None, description="The type of the current flow run state."
    )
    state_name: Optional[str] = Field(
        default=None, description="The name of the current flow run state."
    )

TaskRunPolicy

Bases: PrefectBaseModel

Defines how a task run should retry.

Source code in prefect/client/schemas/objects.py
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
class TaskRunPolicy(PrefectBaseModel):
    """Defines of how a task run should retry."""

    max_retries: int = Field(
        default=0,
        description=(
            "The maximum number of retries. Field is not used. Please use `retries`"
            " instead."
        ),
        deprecated=True,
    )
    retry_delay_seconds: float = Field(
        default=0,
        description=(
            "The delay between retries. Field is not used. Please use `retry_delay`"
            " instead."
        ),
        deprecated=True,
    )
    retries: Optional[int] = Field(default=None, description="The number of retries.")
    retry_delay: Union[None, int, List[int]] = Field(
        default=None,
        description="A delay time or list of delay times between retries, in seconds.",
    )
    retry_jitter_factor: Optional[float] = Field(
        default=None, description="Determines the amount a retry should jitter"
    )

    @root_validator
    def populate_deprecated_fields(cls, values):
        """
        If deprecated fields are provided, populate the corresponding new fields
        to preserve orchestration behavior.
        """
        if not values.get("retries", None) and values.get("max_retries", 0) != 0:
            values["retries"] = values["max_retries"]

        if (
            not values.get("retry_delay", None)
            and values.get("retry_delay_seconds", 0) != 0
        ):
            values["retry_delay"] = values["retry_delay_seconds"]

        return values

    @validator("retry_delay")
    def validate_configured_retry_delays(cls, v):
        if isinstance(v, list) and (len(v) > 50):
            raise ValueError("Can not configure more than 50 retry delays per task.")
        return v

    @validator("retry_jitter_factor")
    def validate_jitter_factor(cls, v):
        if v is not None and v < 0:
            raise ValueError("`retry_jitter_factor` must be >= 0.")
        return v
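
A sketch of the retry configuration this model accepts: a per-attempt delay list and a jitter factor. The validators above cap the list at 50 entries and reject negative jitter, surfacing a validation error; the specific delay values are illustrative.

from prefect.client.schemas.objects import TaskRunPolicy

# Increasing back-off: 1s, then 10s, then 60s between attempts, with 20% jitter
policy = TaskRunPolicy(retries=3, retry_delay=[1, 10, 60], retry_jitter_factor=0.2)

# A negative jitter factor, or a retry_delay list longer than 50 entries,
# would be rejected by the validators above with a validation error.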

populate_deprecated_fields

If deprecated fields are provided, populate the corresponding new fields to preserve orchestration behavior.

Source code in prefect/client/schemas/objects.py
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
@root_validator
def populate_deprecated_fields(cls, values):
    """
    If deprecated fields are provided, populate the corresponding new fields
    to preserve orchestration behavior.
    """
    if not values.get("retries", None) and values.get("max_retries", 0) != 0:
        values["retries"] = values["max_retries"]

    if (
        not values.get("retry_delay", None)
        and values.get("retry_delay_seconds", 0) != 0
    ):
        values["retry_delay"] = values["retry_delay_seconds"]

    return values

TaskRunInput

Bases: PrefectBaseModel

Base class for classes that represent inputs to task runs, which could include constants, parameters, or other task runs.

Source code in prefect/client/schemas/objects.py
601
602
603
604
605
606
607
608
609
610
611
class TaskRunInput(PrefectBaseModel):
    """
    Base class for classes that represent inputs to task runs, which
    could include constants, parameters, or other task runs.
    """

    # freeze TaskRunInputs to allow them to be placed in sets
    class Config:
        frozen = True

    input_type: str

TaskRunResult

Bases: TaskRunInput

Represents a task run result input to another task run.

Source code in prefect/client/schemas/objects.py
614
615
616
617
618
class TaskRunResult(TaskRunInput):
    """Represents a task run result input to another task run."""

    input_type: Literal["task_run"] = "task_run"
    id: UUID

Parameter

Bases: TaskRunInput

Represents a parameter input to a task run.

Source code in prefect/client/schemas/objects.py
621
622
623
624
625
class Parameter(TaskRunInput):
    """Represents a parameter input to a task run."""

    input_type: Literal["parameter"] = "parameter"
    name: str

Constant

Bases: TaskRunInput

Represents a constant input value to a task run.

Source code in prefect/client/schemas/objects.py
628
629
630
631
632
class Constant(TaskRunInput):
    """Represents constant input value to a task run."""

    input_type: Literal["constant"] = "constant"
    type: str

Workspace

Bases: PrefectBaseModel

A Prefect Cloud workspace.

Expected payload for each workspace returned by the me/workspaces route.

Source code in prefect/client/schemas/objects.py
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
class Workspace(PrefectBaseModel):
    """
    A Prefect Cloud workspace.

    Expected payload for each workspace returned by the `me/workspaces` route.
    """

    account_id: UUID = Field(..., description="The account id of the workspace.")
    account_name: str = Field(..., description="The account name.")
    account_handle: str = Field(..., description="The account's unique handle.")
    workspace_id: UUID = Field(..., description="The workspace id.")
    workspace_name: str = Field(..., description="The workspace name.")
    workspace_description: str = Field(..., description="Description of the workspace.")
    workspace_handle: str = Field(..., description="The workspace's unique handle.")

    class Config:
        extra = "ignore"

    @property
    def handle(self) -> str:
        """
        The full handle of the workspace as `account_handle` / `workspace_handle`
        """
        return self.account_handle + "/" + self.workspace_handle

    def api_url(self) -> str:
        """
        Generate the API URL for accessing this workspace
        """
        return (
            f"{PREFECT_CLOUD_API_URL.value()}"
            f"/accounts/{self.account_id}"
            f"/workspaces/{self.workspace_id}"
        )

    def __hash__(self):
        return hash(self.handle)

handle: str property

The full handle of the workspace as account_handle / workspace_handle

api_url

Generate the API URL for accessing this workspace

Source code in prefect/client/schemas/objects.py
766
767
768
769
770
771
772
773
774
def api_url(self) -> str:
    """
    Generate the API URL for accessing this workspace
    """
    return (
        f"{PREFECT_CLOUD_API_URL.value()}"
        f"/accounts/{self.account_id}"
        f"/workspaces/{self.workspace_id}"
    )
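
A hedged construction of a Workspace with placeholder identifiers, showing the handle property and the Cloud URL assembled by api_url (the host portion comes from the PREFECT_CLOUD_API_URL setting).

from uuid import uuid4
from prefect.client.schemas.objects import Workspace

workspace = Workspace(
    account_id=uuid4(),                          # placeholder values throughout
    account_name="Example Account",
    account_handle="example-account",
    workspace_id=uuid4(),
    workspace_name="Example Workspace",
    workspace_description="Illustrative workspace",
    workspace_handle="example-workspace",
)

assert workspace.handle == "example-account/example-workspace"
url = workspace.api_url()  # .../accounts/<account_id>/workspaces/<workspace_id>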

BlockType

Bases: ObjectBaseModel

An ORM representation of a block type

Source code in prefect/client/schemas/objects.py
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
class BlockType(ObjectBaseModel):
    """An ORM representation of a block type"""

    name: str = Field(default=..., description="A block type's name")
    slug: str = Field(default=..., description="A block type's slug")
    logo_url: Optional[HttpUrl] = Field(
        default=None, description="Web URL for the block type's logo"
    )
    documentation_url: Optional[HttpUrl] = Field(
        default=None, description="Web URL for the block type's documentation"
    )
    description: Optional[str] = Field(
        default=None,
        description="A short blurb about the corresponding block's intended use",
    )
    code_example: Optional[str] = Field(
        default=None,
        description="A code snippet demonstrating use of the corresponding block",
    )
    is_protected: bool = Field(
        default=False, description="Protected block types cannot be modified via API."
    )

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        raise_on_name_with_banned_characters(v)
        return v

BlockDocument

Bases: ObjectBaseModel

An ORM representation of a block document.

Source code in prefect/client/schemas/objects.py
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
class BlockDocument(ObjectBaseModel):
    """An ORM representation of a block document."""

    name: Optional[str] = Field(
        default=None,
        description=(
            "The block document's name. Not required for anonymous block documents."
        ),
    )
    data: dict = Field(default_factory=dict, description="The block document's data")
    block_schema_id: UUID = Field(default=..., description="A block schema ID")
    block_schema: Optional[BlockSchema] = Field(
        default=None, description="The associated block schema"
    )
    block_type_id: UUID = Field(default=..., description="A block type ID")
    block_type_name: Optional[str] = Field(None, description="A block type name")
    block_type: Optional[BlockType] = Field(
        default=None, description="The associated block type"
    )
    block_document_references: Dict[str, Dict[str, Any]] = Field(
        default_factory=dict, description="Record of the block document's references"
    )
    is_anonymous: bool = Field(
        default=False,
        description=(
            "Whether the block is anonymous (anonymous blocks are usually created by"
            " Prefect automatically)"
        ),
    )

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        # the BlockDocumentCreate subclass allows name=None
        # and will inherit this validator
        if v is not None:
            raise_on_name_with_banned_characters(v)
        return v

    @root_validator
    def validate_name_is_present_if_not_anonymous(cls, values):
        # anonymous blocks may have no name prior to actually being
        # stored in the database
        if not values.get("is_anonymous") and not values.get("name"):
            raise ValueError("Names must be provided for block documents.")
        return values
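
A minimal, illustrative sketch (not from the Prefect docs) of how the validators above behave; the UUIDs are placeholders generated on the fly:

from uuid import uuid4

from prefect.client.schemas.objects import BlockDocument

# A named block document passes validation (placeholder UUIDs for the IDs).
doc = BlockDocument(
    name="my-credentials",
    block_schema_id=uuid4(),
    block_type_id=uuid4(),
)

# Omitting the name is only allowed for anonymous block documents; otherwise
# the root validator raises "Names must be provided for block documents."
anonymous_doc = BlockDocument(
    is_anonymous=True,
    block_schema_id=uuid4(),
    block_type_id=uuid4(),
)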

Flow

Bases: ObjectBaseModel

An ORM representation of flow data.

Source code in prefect/client/schemas/objects.py, lines 877-892
class Flow(ObjectBaseModel):
    """An ORM representation of flow data."""

    name: str = Field(
        default=..., description="The name of the flow", example="my-flow"
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of flow tags",
        example=["tag-1", "tag-2"],
    )

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        raise_on_name_with_banned_characters(v)
        return v

FlowRunnerSettings

Bases: PrefectBaseModel

An API schema for passing details about the flow runner.

This schema is agnostic to the types and configuration provided by clients

Source code in prefect/client/schemas/objects.py, lines 895-921
class FlowRunnerSettings(PrefectBaseModel):
    """
    An API schema for passing details about the flow runner.

    This schema is agnostic to the types and configuration provided by clients
    """

    type: Optional[str] = Field(
        default=None,
        description=(
            "The type of the flow runner which can be used by the client for"
            " dispatching."
        ),
    )
    config: Optional[dict] = Field(
        default=None, description="The configuration for the given flow runner type."
    )

    # The following is required for composite compatibility in the ORM

    def __init__(self, type: str = None, config: dict = None, **kwargs) -> None:
        # Pydantic does not support positional arguments so they must be converted to
        # keyword arguments
        super().__init__(type=type, config=config, **kwargs)

    def __composite_values__(self):
        return self.type, self.config

Deployment

Bases: ObjectBaseModel

An ORM representation of deployment data.

Source code in prefect/client/schemas/objects.py, lines 924-1026
class Deployment(ObjectBaseModel):
    """An ORM representation of deployment data."""

    name: str = Field(default=..., description="The name of the deployment.")
    version: Optional[str] = Field(
        default=None, description="An optional version for the deployment."
    )
    description: Optional[str] = Field(
        default=None, description="A description for the deployment."
    )
    flow_id: UUID = Field(
        default=..., description="The flow id associated with the deployment."
    )
    schedule: Optional[SCHEDULE_TYPES] = Field(
        default=None, description="A schedule for the deployment."
    )
    is_schedule_active: bool = Field(
        default=True, description="Whether or not the deployment schedule is active."
    )
    infra_overrides: Dict[str, Any] = Field(
        default_factory=dict,
        description="Overrides to apply to the base infrastructure block at runtime.",
    )
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters for flow runs scheduled by the deployment.",
    )
    pull_steps: Optional[List[dict]] = Field(
        default=None,
        description="Pull steps for cloning and running this deployment.",
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags for the deployment",
        example=["tag-1", "tag-2"],
    )
    work_queue_name: Optional[str] = Field(
        default=None,
        description=(
            "The work queue for the deployment. If no work queue is set, work will not"
            " be scheduled."
        ),
    )
    last_polled: Optional[DateTimeTZ] = Field(
        default=None,
        description="The last time the deployment was polled for status updates.",
    )
    parameter_openapi_schema: Optional[Dict[str, Any]] = Field(
        default=None,
        description="The parameter schema of the flow, including defaults.",
    )
    path: Optional[str] = Field(
        default=None,
        description=(
            "The path to the working directory for the workflow, relative to remote"
            " storage or an absolute path."
        ),
    )
    entrypoint: Optional[str] = Field(
        default=None,
        description=(
            "The path to the entrypoint for the workflow, relative to the `path`."
        ),
    )
    manifest_path: Optional[str] = Field(
        default=None,
        description=(
            "The path to the flow's manifest file, relative to the chosen storage."
        ),
    )
    storage_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining storage used for this flow.",
    )
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use for flow runs.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this deployment.",
    )
    updated_by: Optional[UpdatedBy] = Field(
        default=None,
        description="Optional information about the updater of this deployment.",
    )
    work_queue_id: UUID = Field(
        default=None,
        description=(
            "The id of the work pool queue to which this deployment is assigned."
        ),
    )
    enforce_parameter_schema: bool = Field(
        default=False,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        raise_on_name_with_banned_characters(v)
        return v

ConcurrencyLimit

Bases: ObjectBaseModel

An ORM representation of a concurrency limit.

Source code in prefect/client/schemas/objects.py, lines 1029-1039
class ConcurrencyLimit(ObjectBaseModel):
    """An ORM representation of a concurrency limit."""

    tag: str = Field(
        default=..., description="A tag the concurrency limit is applied to."
    )
    concurrency_limit: int = Field(default=..., description="The concurrency limit.")
    active_slots: List[UUID] = Field(
        default_factory=list,
        description="A list of active run ids using a concurrency slot",
    )

BlockSchema

Bases: ObjectBaseModel

An ORM representation of a block schema.

Source code in prefect/client/schemas/objects.py, lines 1042-1060
class BlockSchema(ObjectBaseModel):
    """An ORM representation of a block schema."""

    checksum: str = Field(default=..., description="The block schema's unique checksum")
    fields: dict = Field(
        default_factory=dict, description="The block schema's field schema"
    )
    block_type_id: Optional[UUID] = Field(default=..., description="A block type ID")
    block_type: Optional[BlockType] = Field(
        default=None, description="The associated block type"
    )
    capabilities: List[str] = Field(
        default_factory=list,
        description="A list of Block capabilities",
    )
    version: str = Field(
        default=DEFAULT_BLOCK_SCHEMA_VERSION,
        description="Human readable identifier for the block schema",
    )

BlockSchemaReference

Bases: ObjectBaseModel

An ORM representation of a block schema reference.

Source code in prefect/client/schemas/objects.py, lines 1063-1080
class BlockSchemaReference(ObjectBaseModel):
    """An ORM representation of a block schema reference."""

    parent_block_schema_id: UUID = Field(
        default=..., description="ID of block schema the reference is nested within"
    )
    parent_block_schema: Optional[BlockSchema] = Field(
        default=None, description="The block schema the reference is nested within"
    )
    reference_block_schema_id: UUID = Field(
        default=..., description="ID of the nested block schema"
    )
    reference_block_schema: Optional[BlockSchema] = Field(
        default=None, description="The nested block schema"
    )
    name: str = Field(
        default=..., description="The name that the reference is nested under"
    )

BlockDocumentReference

Bases: ObjectBaseModel

An ORM representation of a block document reference.

Source code in prefect/client/schemas/objects.py, lines 1083-1111
class BlockDocumentReference(ObjectBaseModel):
    """An ORM representation of a block document reference."""

    parent_block_document_id: UUID = Field(
        default=..., description="ID of block document the reference is nested within"
    )
    parent_block_document: Optional[BlockDocument] = Field(
        default=None, description="The block document the reference is nested within"
    )
    reference_block_document_id: UUID = Field(
        default=..., description="ID of the nested block document"
    )
    reference_block_document: Optional[BlockDocument] = Field(
        default=None, description="The nested block document"
    )
    name: str = Field(
        default=..., description="The name that the reference is nested under"
    )

    @root_validator
    def validate_parent_and_ref_are_different(cls, values):
        parent_id = values.get("parent_block_document_id")
        ref_id = values.get("reference_block_document_id")
        if parent_id and ref_id and parent_id == ref_id:
            raise ValueError(
                "`parent_block_document_id` and `reference_block_document_id` cannot be"
                " the same"
            )
        return values

SavedSearchFilter

Bases: PrefectBaseModel

A filter for a saved search model. Intended for use by the Prefect UI.

Source code in prefect/client/schemas/objects.py, lines 1121-1135
class SavedSearchFilter(PrefectBaseModel):
    """A filter for a saved search model. Intended for use by the Prefect UI."""

    object: str = Field(default=..., description="The object over which to filter.")
    property: str = Field(
        default=..., description="The property of the object on which to filter."
    )
    type: str = Field(default=..., description="The type of the property.")
    operation: str = Field(
        default=...,
        description="The operator to apply to the object. For example, `equals`.",
    )
    value: Any = Field(
        default=..., description="A JSON-compatible value for the filter."
    )

SavedSearch

Bases: ObjectBaseModel

An ORM representation of saved search data. Represents a set of filter criteria.

Source code in prefect/client/schemas/objects.py, lines 1138-1144
class SavedSearch(ObjectBaseModel):
    """An ORM representation of saved search data. Represents a set of filter criteria."""

    name: str = Field(default=..., description="The name of the saved search.")
    filters: List[SavedSearchFilter] = Field(
        default_factory=list, description="The filter set for the saved search."
    )

Log

Bases: ObjectBaseModel

An ORM representation of log data.

Source code in prefect/client/schemas/objects.py, lines 1147-1159
class Log(ObjectBaseModel):
    """An ORM representation of log data."""

    name: str = Field(default=..., description="The logger name.")
    level: int = Field(default=..., description="The log level.")
    message: str = Field(default=..., description="The log message.")
    timestamp: DateTimeTZ = Field(default=..., description="The log timestamp.")
    flow_run_id: Optional[UUID] = Field(
        default=None, description="The flow run ID associated with the log."
    )
    task_run_id: Optional[UUID] = Field(
        default=None, description="The task run ID associated with the log."
    )

QueueFilter

Bases: PrefectBaseModel

Filter criteria definition for a work queue.

Source code in prefect/client/schemas/objects.py, lines 1162-1172
class QueueFilter(PrefectBaseModel):
    """Filter criteria definition for a work queue."""

    tags: Optional[List[str]] = Field(
        default=None,
        description="Only include flow runs with these tags in the work queue.",
    )
    deployment_ids: Optional[List[UUID]] = Field(
        default=None,
        description="Only include flow runs from these deployments in the work queue.",
    )

WorkQueue

Bases: ObjectBaseModel

An ORM representation of a work queue

Source code in prefect/client/schemas/objects.py, lines 1175-1211
class WorkQueue(ObjectBaseModel):
    """An ORM representation of a work queue"""

    name: str = Field(default=..., description="The name of the work queue.")
    description: Optional[str] = Field(
        default="", description="An optional description for the work queue."
    )
    is_paused: bool = Field(
        default=False, description="Whether or not the work queue is paused."
    )
    concurrency_limit: Optional[conint(ge=0)] = Field(
        default=None, description="An optional concurrency limit for the work queue."
    )
    priority: conint(ge=1) = Field(
        default=1,
        description=(
            "The queue's priority. Lower values are higher priority (1 is the highest)."
        ),
    )
    work_pool_name: Optional[str] = Field(default=None)
    # Will be required after a future migration
    work_pool_id: Optional[UUID] = Field(
        description="The work pool with which the queue is associated."
    )
    filter: Optional[QueueFilter] = Field(
        default=None,
        description="DEPRECATED: Filter criteria for the work queue.",
        deprecated=True,
    )
    last_polled: Optional[DateTimeTZ] = Field(
        default=None, description="The last time an agent polled this queue for work."
    )

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        raise_on_name_with_banned_characters(v)
        return v
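
A brief, hypothetical usage sketch of the model above (the values are examples, not documented defaults):

from prefect.client.schemas.objects import WorkQueue

# Priority 1 is the highest priority; the deprecated `filter` field is omitted.
queue = WorkQueue(
    name="high-priority",
    priority=1,
    concurrency_limit=5,
)
print(queue.is_paused)  # False by default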

WorkQueueHealthPolicy

Bases: PrefectBaseModel

Source code in prefect/client/schemas/objects.py, lines 1214-1258
class WorkQueueHealthPolicy(PrefectBaseModel):
    maximum_late_runs: Optional[int] = Field(
        default=0,
        description=(
            "The maximum number of late runs in the work queue before it is deemed"
            " unhealthy. Defaults to `0`."
        ),
    )
    maximum_seconds_since_last_polled: Optional[int] = Field(
        default=60,
        description=(
            "The maximum number of time in seconds elapsed since work queue has been"
            " polled before it is deemed unhealthy. Defaults to `60`."
        ),
    )

    def evaluate_health_status(
        self, late_runs_count: int, last_polled: Optional[DateTimeTZ] = None
    ) -> bool:
        """
        Given empirical information about the state of the work queue, evaluate its health status.

        Args:
            late_runs: the count of late runs for the work queue.
            last_polled: the last time the work queue was polled, if available.

        Returns:
            bool: whether or not the work queue is healthy.
        """
        healthy = True
        if (
            self.maximum_late_runs is not None
            and late_runs_count > self.maximum_late_runs
        ):
            healthy = False

        if self.maximum_seconds_since_last_polled is not None:
            if (
                last_polled is None
                or pendulum.now("UTC").diff(last_polled).in_seconds()
                > self.maximum_seconds_since_last_polled
            ):
                healthy = False

        return healthy

evaluate_health_status

Given empirical information about the state of the work queue, evaluate its health status.

Parameters:

    late_runs: the count of late runs for the work queue. Required.
    last_polled (Optional[DateTimeTZ]): the last time the work queue was polled, if available. Defaults to None.

Returns:

    bool: whether or not the work queue is healthy.

Source code in prefect/client/schemas/objects.py, lines 1230-1258
def evaluate_health_status(
    self, late_runs_count: int, last_polled: Optional[DateTimeTZ] = None
) -> bool:
    """
    Given empirical information about the state of the work queue, evaluate its health status.

    Args:
        late_runs: the count of late runs for the work queue.
        last_polled: the last time the work queue was polled, if available.

    Returns:
        bool: whether or not the work queue is healthy.
    """
    healthy = True
    if (
        self.maximum_late_runs is not None
        and late_runs_count > self.maximum_late_runs
    ):
        healthy = False

    if self.maximum_seconds_since_last_polled is not None:
        if (
            last_polled is None
            or pendulum.now("UTC").diff(last_polled).in_seconds()
            > self.maximum_seconds_since_last_polled
        ):
            healthy = False

    return healthy
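
A hedged usage sketch of evaluate_health_status; the thresholds and timestamps below are illustrative, not recommended values:

import pendulum

from prefect.client.schemas.objects import WorkQueueHealthPolicy

# Allow up to 3 late runs and a 5-minute polling gap before reporting unhealthy.
policy = WorkQueueHealthPolicy(
    maximum_late_runs=3,
    maximum_seconds_since_last_polled=300,
)

healthy = policy.evaluate_health_status(
    late_runs_count=1,
    last_polled=pendulum.now("UTC").subtract(minutes=2),
)
print(healthy)  # True: both thresholds are satisfied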

FlowRunNotificationPolicy

Bases: ObjectBaseModel

An ORM representation of a flow run notification.

Source code in prefect/client/schemas/objects.py, lines 1277-1313
class FlowRunNotificationPolicy(ObjectBaseModel):
    """An ORM representation of a flow run notification."""

    is_active: bool = Field(
        default=True, description="Whether the policy is currently active"
    )
    state_names: List[str] = Field(
        default=..., description="The flow run states that trigger notifications"
    )
    tags: List[str] = Field(
        default=...,
        description="The flow run tags that trigger notifications (set [] to disable)",
    )
    block_document_id: UUID = Field(
        default=..., description="The block document ID used for sending notifications"
    )
    message_template: Optional[str] = Field(
        default=None,
        description=(
            "A templatable notification message. Use {braces} to add variables."
            " Valid variables include:"
            f" {listrepr(sorted(FLOW_RUN_NOTIFICATION_TEMPLATE_KWARGS), sep=', ')}"
        ),
        example=(
            "Flow run {flow_run_name} with id {flow_run_id} entered state"
            " {flow_run_state_name}."
        ),
    )

    @validator("message_template")
    def validate_message_template_variables(cls, v):
        if v is not None:
            try:
                v.format(**{k: "test" for k in FLOW_RUN_NOTIFICATION_TEMPLATE_KWARGS})
            except KeyError as exc:
                raise ValueError(f"Invalid template variable provided: '{exc.args[0]}'")
        return v
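
An illustrative sketch of a notification policy; the template variables are the ones used in the field's example above, and the block document ID is a placeholder:

from uuid import uuid4

from prefect.client.schemas.objects import FlowRunNotificationPolicy

policy = FlowRunNotificationPolicy(
    state_names=["Failed", "Crashed"],
    tags=[],  # an empty list disables tag filtering, per the field description
    block_document_id=uuid4(),  # placeholder; normally an existing block document
    message_template=(
        "Flow run {flow_run_name} with id {flow_run_id} entered state"
        " {flow_run_state_name}."
    ),
)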

Agent

Bases: ObjectBaseModel

An ORM representation of an agent

Source code in prefect/client/schemas/objects.py, lines 1316-1331
class Agent(ObjectBaseModel):
    """An ORM representation of an agent"""

    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the agent. If a name is not provided, it will be"
            " auto-generated."
        ),
    )
    work_queue_id: UUID = Field(
        default=..., description="The work queue with which the agent is associated."
    )
    last_activity_time: Optional[DateTimeTZ] = Field(
        default=None, description="The last time this agent polled for work."
    )

WorkPool

Bases: ObjectBaseModel

An ORM representation of a work pool

Source code in prefect/client/schemas/objects.py, lines 1334-1391
class WorkPool(ObjectBaseModel):
    """An ORM representation of a work pool"""

    name: str = Field(
        description="The name of the work pool.",
    )
    description: Optional[str] = Field(
        default=None, description="A description of the work pool."
    )
    type: str = Field(description="The work pool type.")
    base_job_template: Dict[str, Any] = Field(
        default_factory=dict, description="The work pool's base job template."
    )
    is_paused: bool = Field(
        default=False,
        description="Pausing the work pool stops the delivery of all work.",
    )
    concurrency_limit: Optional[conint(ge=0)] = Field(
        default=None, description="A concurrency limit for the work pool."
    )
    status: Optional[WorkPoolStatus] = Field(
        default=None, description="The current status of the work pool."
    )

    # this required field has a default of None so that the custom validator
    # below will be called and produce a more helpful error message
    default_queue_id: UUID = Field(
        None, description="The id of the pool's default queue."
    )

    @property
    def is_push_pool(self) -> bool:
        return self.type.endswith(":push")

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        raise_on_name_with_banned_characters(v)
        return v

    @validator("default_queue_id", always=True)
    def helpful_error_for_missing_default_queue_id(cls, v):
        """
        Default queue ID is required because all pools must have a default queue
        ID, but it represents a circular foreign key relationship to a
        WorkQueue (which can't be created until the work pool exists).
        Therefore, while this field can *technically* be null, it shouldn't be.
        This should only be an issue when creating new pools, as reading
        existing ones will always have this field populated. This custom error
        message will help users understand that they should use the
        `actions.WorkPoolCreate` model in that case.
        """
        if v is None:
            raise ValueError(
                "`default_queue_id` is a required field. If you are "
                "creating a new WorkPool and don't have a queue "
                "ID yet, use the `actions.WorkPoolCreate` model instead."
            )
        return v

helpful_error_for_missing_default_queue_id

Default queue ID is required because all pools must have a default queue ID, but it represents a circular foreign key relationship to a WorkQueue (which can't be created until the work pool exists). Therefore, while this field can technically be null, it shouldn't be. This should only be an issue when creating new pools, as reading existing ones will always have this field populated. This custom error message will help users understand that they should use the actions.WorkPoolCreate model in that case.

Source code in prefect/client/schemas/objects.py, lines 1373-1391
@validator("default_queue_id", always=True)
def helpful_error_for_missing_default_queue_id(cls, v):
    """
    Default queue ID is required because all pools must have a default queue
    ID, but it represents a circular foreign key relationship to a
    WorkQueue (which can't be created until the work pool exists).
    Therefore, while this field can *technically* be null, it shouldn't be.
    This should only be an issue when creating new pools, as reading
    existing ones will always have this field populated. This custom error
    message will help users understand that they should use the
    `actions.WorkPoolCreate` model in that case.
    """
    if v is None:
        raise ValueError(
            "`default_queue_id` is a required field. If you are "
            "creating a new WorkPool and don't have a queue "
            "ID yet, use the `actions.WorkPoolCreate` model instead."
        )
    return v
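
A sketch of the behavior described above, assuming the WorkPoolCreate model is importable from prefect.client.schemas.actions as the docstring suggests:

import pydantic

from prefect.client.schemas.actions import WorkPoolCreate
from prefect.client.schemas.objects import WorkPool

# Constructing a WorkPool without a default_queue_id triggers the helpful error.
try:
    WorkPool(name="my-pool", type="process")
except pydantic.ValidationError as exc:
    print(exc)  # suggests using the actions.WorkPoolCreate model instead

# The create model does not require a queue ID.
pool_create = WorkPoolCreate(name="my-pool", type="process")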

Worker

Bases: ObjectBaseModel

An ORM representation of a worker

Source code in prefect/client/schemas/objects.py, lines 1394-1413
class Worker(ObjectBaseModel):
    """An ORM representation of a worker"""

    name: str = Field(description="The name of the worker.")
    work_pool_id: UUID = Field(
        description="The work pool with which the queue is associated."
    )
    last_heartbeat_time: datetime.datetime = Field(
        None, description="The last time the worker process sent a heartbeat."
    )
    heartbeat_interval_seconds: Optional[int] = Field(
        default=None,
        description=(
            "The number of seconds to expect between heartbeats sent by the worker."
        ),
    )
    status: WorkerStatus = Field(
        WorkerStatus.OFFLINE,
        description="Current status of the worker.",
    )

prefect.client.schemas.responses

SetStateStatus

Bases: AutoEnum

Enumerates return statuses for setting run states.

Source code in prefect/client/schemas/responses.py, lines 24-30
class SetStateStatus(AutoEnum):
    """Enumerates return statuses for setting run states."""

    ACCEPT = AutoEnum.auto()
    REJECT = AutoEnum.auto()
    ABORT = AutoEnum.auto()
    WAIT = AutoEnum.auto()

StateAcceptDetails

Bases: PrefectBaseModel

Details associated with an ACCEPT state transition.

Source code in prefect/client/schemas/responses.py, lines 33-42
class StateAcceptDetails(PrefectBaseModel):
    """Details associated with an ACCEPT state transition."""

    type: Literal["accept_details"] = Field(
        default="accept_details",
        description=(
            "The type of state transition detail. Used to ensure pydantic does not"
            " coerce into a different type."
        ),
    )

StateRejectDetails

Bases: PrefectBaseModel

Details associated with a REJECT state transition.

Source code in prefect/client/schemas/responses.py, lines 45-57
class StateRejectDetails(PrefectBaseModel):
    """Details associated with a REJECT state transition."""

    type: Literal["reject_details"] = Field(
        default="reject_details",
        description=(
            "The type of state transition detail. Used to ensure pydantic does not"
            " coerce into a different type."
        ),
    )
    reason: Optional[str] = Field(
        default=None, description="The reason why the state transition was rejected."
    )

StateAbortDetails

Bases: PrefectBaseModel

Details associated with an ABORT state transition.

Source code in prefect/client/schemas/responses.py, lines 60-72
class StateAbortDetails(PrefectBaseModel):
    """Details associated with an ABORT state transition."""

    type: Literal["abort_details"] = Field(
        default="abort_details",
        description=(
            "The type of state transition detail. Used to ensure pydantic does not"
            " coerce into a different type."
        ),
    )
    reason: Optional[str] = Field(
        default=None, description="The reason why the state transition was aborted."
    )

StateWaitDetails

Bases: PrefectBaseModel

Details associated with a WAIT state transition.

Source code in prefect/client/schemas/responses.py, lines 75-94
class StateWaitDetails(PrefectBaseModel):
    """Details associated with a WAIT state transition."""

    type: Literal["wait_details"] = Field(
        default="wait_details",
        description=(
            "The type of state transition detail. Used to ensure pydantic does not"
            " coerce into a different type."
        ),
    )
    delay_seconds: int = Field(
        default=...,
        description=(
            "The length of time in seconds the client should wait before transitioning"
            " states."
        ),
    )
    reason: Optional[str] = Field(
        default=None, description="The reason why the state transition should wait."
    )

HistoryResponseState

Bases: PrefectBaseModel

Represents a single state's history over an interval.

Source code in prefect/client/schemas/responses.py, lines 97-116
class HistoryResponseState(PrefectBaseModel):
    """Represents a single state's history over an interval."""

    state_type: objects.StateType = Field(default=..., description="The state type.")
    state_name: str = Field(default=..., description="The state name.")
    count_runs: int = Field(
        default=...,
        description="The number of runs in the specified state during the interval.",
    )
    sum_estimated_run_time: datetime.timedelta = Field(
        default=...,
        description="The total estimated run time of all runs during the interval.",
    )
    sum_estimated_lateness: datetime.timedelta = Field(
        default=...,
        description=(
            "The sum of differences between actual and expected start time during the"
            " interval."
        ),
    )

HistoryResponse

Bases: PrefectBaseModel

Represents a history of aggregation states over an interval

Source code in prefect/client/schemas/responses.py, lines 119-130
class HistoryResponse(PrefectBaseModel):
    """Represents a history of aggregation states over an interval"""

    interval_start: DateTimeTZ = Field(
        default=..., description="The start date of the interval."
    )
    interval_end: DateTimeTZ = Field(
        default=..., description="The end date of the interval."
    )
    states: List[HistoryResponseState] = Field(
        default=..., description="A list of state histories during the interval."
    )

OrchestrationResult

Bases: PrefectBaseModel

A container for the output of state orchestration.

Source code in prefect/client/schemas/responses.py, lines 138-145
class OrchestrationResult(PrefectBaseModel):
    """
    A container for the output of state orchestration.
    """

    state: Optional[objects.State]
    status: SetStateStatus
    details: StateResponseDetails
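
A hedged sketch of how a client might branch on an OrchestrationResult; the result object is assumed to come from a set-state call such as PrefectClient.set_flow_run_state:

from prefect.client.schemas.responses import OrchestrationResult, SetStateStatus

def handle_orchestration_result(result: OrchestrationResult) -> None:
    if result.status == SetStateStatus.ACCEPT:
        print(f"Transitioned to {result.state.name!r}")
    elif result.status == SetStateStatus.WAIT:
        print(f"Retry after {result.details.delay_seconds} seconds")
    elif result.status == SetStateStatus.REJECT:
        print(f"Rejected: {result.details.reason}")
    else:  # SetStateStatus.ABORT
        print(f"Aborted: {result.details.reason}")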

FlowRunResponse

Bases: ObjectBaseModel

Source code in prefect/client/schemas/responses.py, lines 157-206
@copy_model_fields
class FlowRunResponse(ObjectBaseModel):
    name: str = FieldFrom(objects.FlowRun)
    flow_id: UUID = FieldFrom(objects.FlowRun)
    state_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    deployment_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    work_queue_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    work_queue_name: Optional[str] = FieldFrom(objects.FlowRun)
    flow_version: Optional[str] = FieldFrom(objects.FlowRun)
    parameters: dict = FieldFrom(objects.FlowRun)
    idempotency_key: Optional[str] = FieldFrom(objects.FlowRun)
    context: dict = FieldFrom(objects.FlowRun)
    empirical_policy: objects.FlowRunPolicy = FieldFrom(objects.FlowRun)
    tags: List[str] = FieldFrom(objects.FlowRun)
    parent_task_run_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    state_type: Optional[objects.StateType] = FieldFrom(objects.FlowRun)
    state_name: Optional[str] = FieldFrom(objects.FlowRun)
    run_count: int = FieldFrom(objects.FlowRun)
    expected_start_time: Optional[DateTimeTZ] = FieldFrom(objects.FlowRun)
    next_scheduled_start_time: Optional[DateTimeTZ] = FieldFrom(objects.FlowRun)
    start_time: Optional[DateTimeTZ] = FieldFrom(objects.FlowRun)
    end_time: Optional[DateTimeTZ] = FieldFrom(objects.FlowRun)
    total_run_time: datetime.timedelta = FieldFrom(objects.FlowRun)
    estimated_run_time: datetime.timedelta = FieldFrom(objects.FlowRun)
    estimated_start_time_delta: datetime.timedelta = FieldFrom(objects.FlowRun)
    auto_scheduled: bool = FieldFrom(objects.FlowRun)
    infrastructure_document_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    infrastructure_pid: Optional[str] = FieldFrom(objects.FlowRun)
    created_by: Optional[CreatedBy] = FieldFrom(objects.FlowRun)
    work_pool_id: Optional[UUID] = FieldFrom(objects.FlowRun)
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the flow run's work pool.",
        example="my-work-pool",
    )
    state: Optional[objects.State] = FieldFrom(objects.FlowRun)

    def __eq__(self, other: Any) -> bool:
        """
        Check for "equality" to another flow run schema

        Estimates times are rolling and will always change with repeated queries for
        a flow run so we ignore them during equality checks.
        """
        if isinstance(other, FlowRunResponse):
            exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
            return self.dict(exclude=exclude_fields) == other.dict(
                exclude=exclude_fields
            )
        return super().__eq__(other)

prefect.client.schemas.schedules

Schedule schemas

IntervalSchedule

Bases: PrefectBaseModel

A schedule formed by adding interval increments to an anchor_date. If no anchor_date is supplied, the current UTC time is used. If a timezone-naive datetime is provided for anchor_date, it is assumed to be in the schedule's timezone (or UTC). Even if supplied with an IANA timezone, anchor dates are always stored as UTC offsets, so a timezone can be provided to determine localization behaviors like DST boundary handling. If none is provided it will be inferred from the anchor date.

NOTE: If the IntervalSchedule anchor_date or timezone is provided in a DST-observing timezone, then the schedule will adjust itself appropriately. Intervals greater than 24 hours will follow DST conventions, while intervals of less than 24 hours will follow UTC intervals. For example, an hourly schedule will fire every UTC hour, even across DST boundaries. When clocks are set back, this will result in two runs that appear to both be scheduled for 1am local time, even though they are an hour apart in UTC time. For longer intervals, like a daily schedule, the interval schedule will adjust for DST boundaries so that the clock-hour remains constant. This means that a daily schedule that always fires at 9am will observe DST and continue to fire at 9am in the local time zone.

Parameters:

    interval (timedelta): an interval to schedule on. Required.
    anchor_date (DateTimeTZ, optional): an anchor date to schedule increments against; if not provided, the current timestamp will be used.
    timezone (str, optional): a valid timezone string.
Source code in prefect/client/schemas/schedules.py, lines 36-102
class IntervalSchedule(PrefectBaseModel):
    """
    A schedule formed by adding `interval` increments to an `anchor_date`. If no
    `anchor_date` is supplied, the current UTC time is used.  If a
    timezone-naive datetime is provided for `anchor_date`, it is assumed to be
    in the schedule's timezone (or UTC). Even if supplied with an IANA timezone,
    anchor dates are always stored as UTC offsets, so a `timezone` can be
    provided to determine localization behaviors like DST boundary handling. If
    none is provided it will be inferred from the anchor date.

    NOTE: If the `IntervalSchedule` `anchor_date` or `timezone` is provided in a
    DST-observing timezone, then the schedule will adjust itself appropriately.
    Intervals greater than 24 hours will follow DST conventions, while intervals
    of less than 24 hours will follow UTC intervals. For example, an hourly
    schedule will fire every UTC hour, even across DST boundaries. When clocks
    are set back, this will result in two runs that *appear* to both be
    scheduled for 1am local time, even though they are an hour apart in UTC
    time. For longer intervals, like a daily schedule, the interval schedule
    will adjust for DST boundaries so that the clock-hour remains constant. This
    means that a daily schedule that always fires at 9am will observe DST and
    continue to fire at 9am in the local time zone.

    Args:
        interval (datetime.timedelta): an interval to schedule on
        anchor_date (DateTimeTZ, optional): an anchor date to schedule increments against;
            if not provided, the current timestamp will be used
        timezone (str, optional): a valid timezone string
    """

    class Config:
        extra = "forbid"
        exclude_none = True

    interval: datetime.timedelta
    anchor_date: DateTimeTZ = None
    timezone: Optional[str] = Field(default=None, example="America/New_York")

    @validator("interval")
    def interval_must_be_positive(cls, v):
        if v.total_seconds() <= 0:
            raise ValueError("The interval must be positive")
        return v

    @validator("anchor_date", always=True)
    def default_anchor_date(cls, v):
        if v is None:
            return pendulum.now("UTC")
        return pendulum.instance(v)

    @validator("timezone", always=True)
    def default_timezone(cls, v, *, values, **kwargs):
        # if a timezone was provided, make sure it's a valid IANA string
        if v and v not in pendulum.tz.timezones:
            raise ValueError(f'Invalid timezone: "{v}"')

        # otherwise infer the timezone from the anchor date
        elif v is None and values.get("anchor_date"):
            tz = values["anchor_date"].tz.name
            if tz in pendulum.tz.timezones:
                return tz
            # sometimes anchor dates have "timezones" that are UTC offsets
            # like "-04:00". This happens when parsing ISO8601 strings.
            # In this case, the correct inferred localization is "UTC".
            else:
                return "UTC"

        return v
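
A brief usage sketch of the schedule above (the interval and timezone are example values):

import datetime

from prefect.client.schemas.schedules import IntervalSchedule

# Runs every hour; with no anchor_date, the current UTC time is used.
schedule = IntervalSchedule(
    interval=datetime.timedelta(hours=1),
    timezone="America/New_York",
)
print(schedule.anchor_date)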

CronSchedule

Bases: PrefectBaseModel

Cron schedule

NOTE: If the timezone is a DST-observing one, then the schedule will adjust itself appropriately. Cron's rules for DST are based on schedule times, not intervals. This means that an hourly cron schedule will fire on every new schedule hour, not every elapsed hour; for example, when clocks are set back this will result in a two-hour pause as the schedule will fire the first time 1am is reached and the first time 2am is reached, 120 minutes later. Longer schedules, such as one that fires at 9am every morning, will automatically adjust for DST.

Parameters:

    cron (str): a valid cron string. Required.
    timezone (str): a valid timezone string in IANA tzdata format (for example, America/New_York).
    day_or (bool, optional): Control how croniter handles day and day_of_week entries. Defaults to True, matching cron, which connects those values using OR. If the switch is set to False, the values are connected using AND. This behaves like fcron and enables you to, for example, define a job that executes on each second Friday of a month by setting the days of the month and the weekday.
Source code in prefect/client/schemas/schedules.py, lines 105-161
class CronSchedule(PrefectBaseModel):
    """
    Cron schedule

    NOTE: If the timezone is a DST-observing one, then the schedule will adjust
    itself appropriately. Cron's rules for DST are based on schedule times, not
    intervals. This means that an hourly cron schedule will fire on every new
    schedule hour, not every elapsed hour; for example, when clocks are set back
    this will result in a two-hour pause as the schedule will fire *the first
    time* 1am is reached and *the first time* 2am is reached, 120 minutes later.
    Longer schedules, such as one that fires at 9am every morning, will
    automatically adjust for DST.

    Args:
        cron (str): a valid cron string
        timezone (str): a valid timezone string in IANA tzdata format (for example,
            America/New_York).
        day_or (bool, optional): Control how croniter handles `day` and `day_of_week`
            entries. Defaults to True, matching cron which connects those values using
            OR. If the switch is set to False, the values are connected using AND. This
            behaves like fcron and enables you to e.g. define a job that executes each
            2nd friday of a month by setting the days of month and the weekday.

    """

    class Config:
        extra = "forbid"

    cron: str = Field(default=..., example="0 0 * * *")
    timezone: Optional[str] = Field(default=None, example="America/New_York")
    day_or: bool = Field(
        default=True,
        description=(
            "Control croniter behavior for handling day and day_of_week entries."
        ),
    )

    @validator("timezone")
    def valid_timezone(cls, v):
        if v and v not in pendulum.tz.timezones:
            raise ValueError(
                f'Invalid timezone: "{v}" (specify in IANA tzdata format, for example,'
                " America/New_York)"
            )
        return v

    @validator("cron")
    def valid_cron_string(cls, v):
        # croniter allows "random" and "hashed" expressions
        # which we do not support https://github.com/kiorky/croniter
        if not croniter.is_valid(v):
            raise ValueError(f'Invalid cron string: "{v}"')
        elif any(c for c in v.split() if c.casefold() in ["R", "H", "r", "h"]):
            raise ValueError(
                f'Random and Hashed expressions are unsupported, received: "{v}"'
            )
        return v
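
A brief usage sketch (example values only):

from prefect.client.schemas.schedules import CronSchedule

# 9am every weekday in New York; per the note above, the schedule observes DST.
schedule = CronSchedule(cron="0 9 * * 1-5", timezone="America/New_York")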

RRuleSchedule

Bases: PrefectBaseModel

RRule schedule, based on the iCalendar standard (RFC 5545) as implemented in dateutil.rrule.

RRules are appropriate for any kind of calendar-date manipulation, including irregular intervals, repetition, exclusions, week day or day-of-month adjustments, and more.

Note that as a calendar-oriented standard, RRuleSchedules are sensitive to the initial timezone provided. A 9am daily schedule with a daylight saving time-aware start date will maintain a local 9am time through DST boundaries; a 9am daily schedule with a UTC start date will maintain a 9am UTC time.

Parameters:

    rrule (str): a valid RRule string. Required.
    timezone (str, optional): a valid timezone string.
Source code in prefect/client/schemas/schedules.py, lines 167-319
class RRuleSchedule(PrefectBaseModel):
    """
    RRule schedule, based on the iCalendar standard
    ([RFC 5545](https://datatracker.ietf.org/doc/html/rfc5545)) as
    implemented in `dateutils.rrule`.

    RRules are appropriate for any kind of calendar-date manipulation, including
    irregular intervals, repetition, exclusions, week day or day-of-month
    adjustments, and more.

    Note that as a calendar-oriented standard, `RRuleSchedules` are sensitive
    to the initial timezone provided. A 9am daily schedule with a daylight saving
    time-aware start date will maintain a local 9am time through DST boundaries;
    a 9am daily schedule with a UTC start date will maintain a 9am UTC time.

    Args:
        rrule (str): a valid RRule string
        timezone (str, optional): a valid timezone string
    """

    class Config:
        extra = "forbid"

    rrule: str
    timezone: Optional[str] = Field(default=None, example="America/New_York")

    @validator("rrule")
    def validate_rrule_str(cls, v):
        # attempt to parse the rrule string as an rrule object
        # this will error if the string is invalid
        try:
            dateutil.rrule.rrulestr(v, cache=True)
        except ValueError as exc:
            # rrules errors are a mix of cryptic and informative
            # so reraise to be clear that the string was invalid
            raise ValueError(f'Invalid RRule string "{v}": {exc}')
        if len(v) > MAX_RRULE_LENGTH:
            raise ValueError(
                f'Invalid RRule string "{v[:40]}..."\n'
                f"Max length is {MAX_RRULE_LENGTH}, got {len(v)}"
            )
        return v

    @classmethod
    def from_rrule(cls, rrule: dateutil.rrule.rrule):
        if isinstance(rrule, dateutil.rrule.rrule):
            if rrule._dtstart.tzinfo is not None:
                timezone = rrule._dtstart.tzinfo.name
            else:
                timezone = "UTC"
            return RRuleSchedule(rrule=str(rrule), timezone=timezone)
        elif isinstance(rrule, dateutil.rrule.rruleset):
            dtstarts = [rr._dtstart for rr in rrule._rrule if rr._dtstart is not None]
            unique_dstarts = set(pendulum.instance(d).in_tz("UTC") for d in dtstarts)
            unique_timezones = set(d.tzinfo for d in dtstarts if d.tzinfo is not None)

            if len(unique_timezones) > 1:
                raise ValueError(
                    f"rruleset has too many dtstart timezones: {unique_timezones}"
                )

            if len(unique_dstarts) > 1:
                raise ValueError(f"rruleset has too many dtstarts: {unique_dstarts}")

            if unique_dstarts and unique_timezones:
                timezone = dtstarts[0].tzinfo.name
            else:
                timezone = "UTC"

            rruleset_string = ""
            if rrule._rrule:
                rruleset_string += "\n".join(str(r) for r in rrule._rrule)
            if rrule._exrule:
                rruleset_string += "\n" if rruleset_string else ""
                rruleset_string += "\n".join(str(r) for r in rrule._exrule).replace(
                    "RRULE", "EXRULE"
                )
            if rrule._rdate:
                rruleset_string += "\n" if rruleset_string else ""
                rruleset_string += "RDATE:" + ",".join(
                    rd.strftime("%Y%m%dT%H%M%SZ") for rd in rrule._rdate
                )
            if rrule._exdate:
                rruleset_string += "\n" if rruleset_string else ""
                rruleset_string += "EXDATE:" + ",".join(
                    exd.strftime("%Y%m%dT%H%M%SZ") for exd in rrule._exdate
                )
            return RRuleSchedule(rrule=rruleset_string, timezone=timezone)
        else:
            raise ValueError(f"Invalid RRule object: {rrule}")

    def to_rrule(self) -> dateutil.rrule.rrule:
        """
        Since rrule doesn't properly serialize/deserialize timezones, we localize dates
        here
        """
        rrule = dateutil.rrule.rrulestr(
            self.rrule,
            dtstart=DEFAULT_ANCHOR_DATE,
            cache=True,
        )
        timezone = dateutil.tz.gettz(self.timezone)
        if isinstance(rrule, dateutil.rrule.rrule):
            kwargs = dict(dtstart=rrule._dtstart.replace(tzinfo=timezone))
            if rrule._until:
                kwargs.update(
                    until=rrule._until.replace(tzinfo=timezone),
                )
            return rrule.replace(**kwargs)
        elif isinstance(rrule, dateutil.rrule.rruleset):
            # update rrules
            localized_rrules = []
            for rr in rrule._rrule:
                kwargs = dict(dtstart=rr._dtstart.replace(tzinfo=timezone))
                if rr._until:
                    kwargs.update(
                        until=rr._until.replace(tzinfo=timezone),
                    )
                localized_rrules.append(rr.replace(**kwargs))
            rrule._rrule = localized_rrules

            # update exrules
            localized_exrules = []
            for exr in rrule._exrule:
                kwargs = dict(dtstart=exr._dtstart.replace(tzinfo=timezone))
                if exr._until:
                    kwargs.update(
                        until=exr._until.replace(tzinfo=timezone),
                    )
                localized_exrules.append(exr.replace(**kwargs))
            rrule._exrule = localized_exrules

            # update rdates
            localized_rdates = []
            for rd in rrule._rdate:
                localized_rdates.append(rd.replace(tzinfo=timezone))
            rrule._rdate = localized_rdates

            # update exdates
            localized_exdates = []
            for exd in rrule._exdate:
                localized_exdates.append(exd.replace(tzinfo=timezone))
            rrule._exdate = localized_exdates

            return rrule

    @validator("timezone", always=True)
    def valid_timezone(cls, v):
        if v and v not in pytz.all_timezones_set:
            raise ValueError(f'Invalid timezone: "{v}"')
        elif v is None:
            return "UTC"
        return v

to_rrule

Since rrule doesn't properly serialize/deserialize timezones, we localize dates here

Source code in prefect/client/schemas/schedules.py, lines 258-311
def to_rrule(self) -> dateutil.rrule.rrule:
    """
    Since rrule doesn't properly serialize/deserialize timezones, we localize dates
    here
    """
    rrule = dateutil.rrule.rrulestr(
        self.rrule,
        dtstart=DEFAULT_ANCHOR_DATE,
        cache=True,
    )
    timezone = dateutil.tz.gettz(self.timezone)
    if isinstance(rrule, dateutil.rrule.rrule):
        kwargs = dict(dtstart=rrule._dtstart.replace(tzinfo=timezone))
        if rrule._until:
            kwargs.update(
                until=rrule._until.replace(tzinfo=timezone),
            )
        return rrule.replace(**kwargs)
    elif isinstance(rrule, dateutil.rrule.rruleset):
        # update rrules
        localized_rrules = []
        for rr in rrule._rrule:
            kwargs = dict(dtstart=rr._dtstart.replace(tzinfo=timezone))
            if rr._until:
                kwargs.update(
                    until=rr._until.replace(tzinfo=timezone),
                )
            localized_rrules.append(rr.replace(**kwargs))
        rrule._rrule = localized_rrules

        # update exrules
        localized_exrules = []
        for exr in rrule._exrule:
            kwargs = dict(dtstart=exr._dtstart.replace(tzinfo=timezone))
            if exr._until:
                kwargs.update(
                    until=exr._until.replace(tzinfo=timezone),
                )
            localized_exrules.append(exr.replace(**kwargs))
        rrule._exrule = localized_exrules

        # update rdates
        localized_rdates = []
        for rd in rrule._rdate:
            localized_rdates.append(rd.replace(tzinfo=timezone))
        rrule._rdate = localized_rdates

        # update exdates
        localized_exdates = []
        for exd in rrule._exdate:
            localized_exdates.append(exd.replace(tzinfo=timezone))
        rrule._exdate = localized_exdates

        return rrule
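
A brief, illustrative sketch of defining an RRule schedule and localizing it with to_rrule (the rule and timezone are example values):

from prefect.client.schemas.schedules import RRuleSchedule

# 9am local time on Monday, Wednesday, and Friday.
schedule = RRuleSchedule(
    rrule="FREQ=WEEKLY;BYDAY=MO,WE,FR;BYHOUR=9;BYMINUTE=0",
    timezone="America/New_York",
)

# to_rrule() returns a dateutil rrule whose dates are localized to the timezone.
rule = schedule.to_rrule()
print(rule)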

construct_schedule

Construct a schedule from the provided arguments.

Parameters:

    interval (Optional[Union[int, float, timedelta]]): An interval on which to schedule runs. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds. Defaults to None.
    anchor_date (Optional[Union[datetime, str]]): The start date for an interval schedule. Defaults to None.
    cron (Optional[str]): A cron schedule for runs. Defaults to None.
    rrule (Optional[str]): An rrule schedule of when to execute runs of this flow. Defaults to None.
    timezone (Optional[str]): A timezone to use for the schedule. Defaults to UTC.
Source code in prefect/client/schemas/schedules.py, lines 325-372
def construct_schedule(
    interval: Optional[Union[int, float, datetime.timedelta]] = None,
    anchor_date: Optional[Union[datetime.datetime, str]] = None,
    cron: Optional[str] = None,
    rrule: Optional[str] = None,
    timezone: Optional[str] = None,
) -> SCHEDULE_TYPES:
    """
    Construct a schedule from the provided arguments.

    Args:
        interval: An interval on which to schedule runs. Accepts either a number
            or a timedelta object. If a number is given, it will be interpreted as seconds.
        anchor_date: The start date for an interval schedule.
        cron: A cron schedule for runs.
        rrule: An rrule schedule of when to execute runs of this flow.
        timezone: A timezone to use for the schedule. Defaults to UTC.
    """
    num_schedules = sum(1 for entry in (interval, cron, rrule) if entry is not None)
    if num_schedules > 1:
        raise ValueError("Only one of interval, cron, or rrule can be provided.")

    if anchor_date and not interval:
        raise ValueError(
            "An anchor date can only be provided with an interval schedule"
        )

    if timezone and not (interval or cron or rrule):
        raise ValueError(
            "A timezone can only be provided with interval, cron, or rrule"
        )

    schedule = None
    if interval:
        if isinstance(interval, (int, float)):
            interval = datetime.timedelta(seconds=interval)
        schedule = IntervalSchedule(
            interval=interval, anchor_date=anchor_date, timezone=timezone
        )
    elif cron:
        schedule = CronSchedule(cron=cron, timezone=timezone)
    elif rrule:
        schedule = RRuleSchedule(rrule=rrule, timezone=timezone)

    if schedule is None:
        raise ValueError("Either interval, cron, or rrule must be provided")

    return schedule
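
A short usage sketch of construct_schedule (example values; only one of interval, cron, or rrule may be provided per call):

import datetime

from prefect.client.schemas.schedules import construct_schedule

# A numeric interval is interpreted as seconds...
every_hour = construct_schedule(interval=3600)

# ...and is equivalent to passing a timedelta explicitly.
also_every_hour = construct_schedule(interval=datetime.timedelta(hours=1))

# A cron schedule with an explicit timezone.
nightly = construct_schedule(cron="0 0 * * *", timezone="UTC")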

prefect.client.schemas.sorting

FlowRunSort

Bases: AutoEnum

Defines flow run sorting options.

Source code in prefect/client/schemas/sorting.py, lines 4-15
class FlowRunSort(AutoEnum):
    """Defines flow run sorting options."""

    ID_DESC = AutoEnum.auto()
    START_TIME_ASC = AutoEnum.auto()
    START_TIME_DESC = AutoEnum.auto()
    EXPECTED_START_TIME_ASC = AutoEnum.auto()
    EXPECTED_START_TIME_DESC = AutoEnum.auto()
    NAME_ASC = AutoEnum.auto()
    NAME_DESC = AutoEnum.auto()
    NEXT_SCHEDULED_START_TIME_ASC = AutoEnum.auto()
    END_TIME_DESC = AutoEnum.auto()

TaskRunSort

Bases: AutoEnum

Defines task run sorting options.

Source code in prefect/client/schemas/sorting.py, lines 18-27
class TaskRunSort(AutoEnum):
    """Defines task run sorting options."""

    ID_DESC = AutoEnum.auto()
    EXPECTED_START_TIME_ASC = AutoEnum.auto()
    EXPECTED_START_TIME_DESC = AutoEnum.auto()
    NAME_ASC = AutoEnum.auto()
    NAME_DESC = AutoEnum.auto()
    NEXT_SCHEDULED_START_TIME_ASC = AutoEnum.auto()
    END_TIME_DESC = AutoEnum.auto()

LogSort

Bases: AutoEnum

Defines log sorting options.

Source code in prefect/client/schemas/sorting.py, lines 30-34
class LogSort(AutoEnum):
    """Defines log sorting options."""

    TIMESTAMP_ASC = AutoEnum.auto()
    TIMESTAMP_DESC = AutoEnum.auto()

FlowSort

Bases: AutoEnum

Defines flow sorting options.

Source code in prefect/client/schemas/sorting.py, lines 37-43
class FlowSort(AutoEnum):
    """Defines flow sorting options."""

    CREATED_DESC = AutoEnum.auto()
    UPDATED_DESC = AutoEnum.auto()
    NAME_ASC = AutoEnum.auto()
    NAME_DESC = AutoEnum.auto()

DeploymentSort

Bases: AutoEnum

Defines deployment sorting options.

Source code in prefect/client/schemas/sorting.py, lines 46-52
class DeploymentSort(AutoEnum):
    """Defines deployment sorting options."""

    CREATED_DESC = AutoEnum.auto()
    UPDATED_DESC = AutoEnum.auto()
    NAME_ASC = AutoEnum.auto()
    NAME_DESC = AutoEnum.auto()

ArtifactSort

Bases: AutoEnum

Defines artifact sorting options.

Source code in prefect/client/schemas/sorting.py, lines 55-62
class ArtifactSort(AutoEnum):
    """Defines artifact sorting options."""

    CREATED_DESC = AutoEnum.auto()
    UPDATED_DESC = AutoEnum.auto()
    ID_DESC = AutoEnum.auto()
    KEY_DESC = AutoEnum.auto()
    KEY_ASC = AutoEnum.auto()

ArtifactCollectionSort

Bases: AutoEnum

Defines artifact collection sorting options.

Source code in prefect/client/schemas/sorting.py, lines 65-72
class ArtifactCollectionSort(AutoEnum):
    """Defines artifact collection sorting options."""

    CREATED_DESC = AutoEnum.auto()
    UPDATED_DESC = AutoEnum.auto()
    ID_DESC = AutoEnum.auto()
    KEY_DESC = AutoEnum.auto()
    KEY_ASC = AutoEnum.auto()

VariableSort

Bases: AutoEnum

Defines variables sorting options.

Source code in prefect/client/schemas/sorting.py, lines 75-81
class VariableSort(AutoEnum):
    """Defines variables sorting options."""

    CREATED_DESC = "CREATED_DESC"
    UPDATED_DESC = "UPDATED_DESC"
    NAME_DESC = "NAME_DESC"
    NAME_ASC = "NAME_ASC"

BlockDocumentSort

Bases: AutoEnum

Defines block document sorting options.

Source code in prefect/client/schemas/sorting.py, lines 84-89
class BlockDocumentSort(AutoEnum):
    """Defines block document sorting options."""

    NAME_DESC = AutoEnum.auto()
    NAME_ASC = AutoEnum.auto()
    BLOCK_TYPE_AND_NAME_ASC = AutoEnum.auto()