prefect.orion.schemas.core
¶
Full schemas of Orion API objects.
Agent
pydantic-model
¶
An ORM representation of an agent
Source code in prefect/orion/schemas/core.py
class Agent(ORMBaseModel):
"""An ORM representation of an agent"""
name: str = Field(
default_factory=lambda: generate_slug(2),
description="The name of the agent. If a name is not provided, it will be auto-generated.",
)
work_queue_id: UUID = Field(
..., description="The work queue with which the agent is associated."
)
last_activity_time: Optional[DateTimeTZ] = Field(
None, description="The last time this agent polled for work."
)
last_activity_time
pydantic-field
¶
Type: DateTimeTZ
The last time this agent polled for work.
name
pydantic-field
¶
Type: str
The name of the agent. If a name is not provided, it will be auto-generated.
work_queue_id
pydantic-field
required
¶
Type: UUID
The work queue with which the agent is associated.
BlockDocument
pydantic-model
¶
An ORM representation of a block document.
Source code in prefect/orion/schemas/core.py
class BlockDocument(ORMBaseModel):
"""An ORM representation of a block document."""
name: Optional[str] = Field(
None,
description="The block document's name. Not required for anonymous block documents.",
)
data: dict = Field(default_factory=dict, description="The block document's data")
block_schema_id: UUID = Field(..., description="A block schema ID")
block_schema: Optional[BlockSchema] = Field(
None, description="The associated block schema"
)
block_type_id: UUID = Field(..., description="A block type ID")
block_type: Optional[BlockType] = Field(
None, description="The associated block type"
)
block_document_references: Dict[str, Dict[str, Any]] = Field(
default_factory=dict, description="Record of the block document's references"
)
is_anonymous: bool = Field(
False,
description="Whether the block is anonymous (anonymous blocks are usually created by Prefect automatically)",
)
@validator("name", check_fields=False)
def validate_name_characters(cls, v):
# the BlockDocumentCreate subclass allows name=None
# and will inherit this validator
if v is not None:
raise_on_invalid_name(v)
return v
@root_validator
def validate_name_is_present_if_not_anonymous(cls, values):
# anonymous blocks may have no name prior to actually being
# stored in the database
if not values.get("is_anonymous") and not values.get("name"):
raise ValueError("Names must be provided for block documents.")
return values
@classmethod
async def from_orm_model(
cls,
session,
orm_block_document: "prefect.orion.database.orm_models.ORMBlockDocument",
include_secrets: bool = False,
):
data = await orm_block_document.decrypt_data(session=session)
# if secrets are not included, obfuscate them based on the schema's
# `secret_fields`. Note this walks any nested blocks as well. If the
# nested blocks were recovered from named blocks, they will already
# be obfuscated, but if nested fields were hardcoded into the parent
# blocks data, this is the only opportunity to obfuscate them.
if not include_secrets:
flat_data = dict_to_flatdict(data)
# iterate over the (possibly nested) secret fields
# and obfuscate their data
for field in orm_block_document.block_schema.fields.get(
"secret_fields", []
):
key = tuple(field.split("."))
if flat_data.get(key) is not None:
flat_data[key] = obfuscate_string(flat_data[key])
data = flatdict_to_dict(flat_data)
return cls(
id=orm_block_document.id,
created=orm_block_document.created,
updated=orm_block_document.updated,
name=orm_block_document.name,
data=data,
block_schema_id=orm_block_document.block_schema_id,
block_schema=orm_block_document.block_schema,
block_type_id=orm_block_document.block_type_id,
block_type=orm_block_document.block_type,
is_anonymous=orm_block_document.is_anonymous,
)
block_document_references
pydantic-field
¶
Type: Dict[str, Dict[str, Any]]
Record of the block document's references
block_schema
pydantic-field
¶
Type: BlockSchema
The associated block schema
block_schema_id
pydantic-field
required
¶
Type: UUID
A block schema ID
block_type
pydantic-field
¶
Type: BlockType
The associated block type
block_type_id
pydantic-field
required
¶
Type: UUID
A block type ID
data
pydantic-field
¶
Type: dict
The block document's data
is_anonymous
pydantic-field
¶
Type: bool
Whether the block is anonymous (anonymous blocks are usually created by Prefect automatically)
name
pydantic-field
¶
Type: str
The block document's name. Not required for anonymous block documents.
BlockDocumentReference
pydantic-model
¶
An ORM representation of a block document reference.
Source code in prefect/orion/schemas/core.py
class BlockDocumentReference(ORMBaseModel):
"""An ORM representation of a block document reference."""
parent_block_document_id: UUID = Field(
..., description="ID of block document the reference is nested within"
)
parent_block_document: Optional[BlockDocument] = Field(
None, description="The block document the reference is nested within"
)
reference_block_document_id: UUID = Field(
..., description="ID of the nested block document"
)
reference_block_document: Optional[BlockDocument] = Field(
None, description="The nested block document"
)
name: str = Field(..., description="The name that the reference is nested under")
name
pydantic-field
required
¶
Type: str
The name that the reference is nested under
parent_block_document
pydantic-field
¶
Type: BlockDocument
The block document the reference is nested within
parent_block_document_id
pydantic-field
required
¶
Type: UUID
ID of block document the reference is nested within
reference_block_document
pydantic-field
¶
Type: BlockDocument
The nested block document
reference_block_document_id
pydantic-field
required
¶
Type: UUID
ID of the nested block document
BlockSchema
pydantic-model
¶
An ORM representation of a block schema.
Source code in prefect/orion/schemas/core.py
class BlockSchema(ORMBaseModel):
"""An ORM representation of a block schema."""
checksum: str = Field(..., description="The block schema's unique checksum")
fields: dict = Field(
default_factory=dict, description="The block schema's field schema"
)
block_type_id: Optional[UUID] = Field(..., description="A block type ID")
block_type: Optional[BlockType] = Field(
None, description="The associated block type"
)
capabilities: List[str] = Field(
default_factory=list,
description="A list of Block capabilities",
)
block_type
pydantic-field
¶
Type: BlockType
The associated block type
block_type_id
pydantic-field
required
¶
Type: UUID
A block type ID
capabilities
pydantic-field
¶
Type: List[str]
A list of Block capabilities
checksum
pydantic-field
required
¶
Type: str
The block schema's unique checksum
fields
pydantic-field
¶
Type: dict
The block schema's field schema
BlockSchemaReference
pydantic-model
¶
An ORM representation of a block schema reference.
Source code in prefect/orion/schemas/core.py
class BlockSchemaReference(ORMBaseModel):
"""An ORM representation of a block schema reference."""
parent_block_schema_id: UUID = Field(
..., description="ID of block schema the reference is nested within"
)
parent_block_schema: Optional[BlockSchema] = Field(
None, description="The block schema the reference is nested within"
)
reference_block_schema_id: UUID = Field(
..., description="ID of the nested block schema"
)
reference_block_schema: Optional[BlockSchema] = Field(
None, description="The nested block schema"
)
name: str = Field(..., description="The name that the reference is nested under")
name
pydantic-field
required
¶
Type: str
The name that the reference is nested under
parent_block_schema
pydantic-field
¶
Type: BlockSchema
The block schema the reference is nested within
parent_block_schema_id
pydantic-field
required
¶
Type: UUID
ID of block schema the reference is nested within
reference_block_schema
pydantic-field
¶
Type: BlockSchema
The nested block schema
reference_block_schema_id
pydantic-field
required
¶
Type: UUID
ID of the nested block schema
BlockType
pydantic-model
¶
An ORM representation of a block type
Source code in prefect/orion/schemas/core.py
class BlockType(ORMBaseModel):
"""An ORM representation of a block type"""
name: str = Field(..., description="A block type's name")
slug: str = Field(..., description="A block type's slug")
logo_url: Optional[HttpUrl] = Field(
None, description="Web URL for the block type's logo"
)
documentation_url: Optional[HttpUrl] = Field(
None, description="Web URL for the block type's documentation"
)
description: Optional[str] = Field(
None, description="A short blurb about the corresponding block's intended use"
)
code_example: Optional[str] = Field(
None, description="A code snippet demonstrating use of the corresponding block"
)
is_protected: bool = Field(
False, description="Protected block types cannot be modified via API."
)
@validator("name", check_fields=False)
def validate_name_characters(cls, v):
raise_on_invalid_name(v)
return v
code_example
pydantic-field
¶
Type: str
A code snippet demonstrating use of the corresponding block
description
pydantic-field
¶
Type: str
A short blurb about the corresponding block's intended use
documentation_url
pydantic-field
¶
Type: HttpUrl
Web URL for the block type's documentation
is_protected
pydantic-field
¶
Type: bool
Protected block types cannot be modified via API.
logo_url
pydantic-field
¶
Type: HttpUrl
Web URL for the block type's logo
name
pydantic-field
required
¶
Type: str
A block type's name
slug
pydantic-field
required
¶
Type: str
A block type's slug
ConcurrencyLimit
pydantic-model
¶
An ORM representation of a concurrency limit.
Source code in prefect/orion/schemas/core.py
class ConcurrencyLimit(ORMBaseModel):
"""An ORM representation of a concurrency limit."""
tag: str = Field(..., description="A tag the concurrency limit is applied to.")
concurrency_limit: int = Field(..., description="The concurrency limit.")
active_slots: List[UUID] = Field(
default_factory=list,
description="A list of active run ids using a concurrency slot",
)
active_slots
pydantic-field
¶
Type: List[uuid.UUID]
A list of active run ids using a concurrency slot
concurrency_limit
pydantic-field
required
¶
Type: int
The concurrency limit.
tag
pydantic-field
required
¶
Type: str
A tag the concurrency limit is applied to.
Configuration
pydantic-model
¶
An ORM representation of account info.
Source code in prefect/orion/schemas/core.py
class Configuration(ORMBaseModel):
"""An ORM representation of account info."""
key: str = Field(..., description="Account info key")
value: dict = Field(..., description="Account info")
key
pydantic-field
required
¶
Type: str
Account info key
value
pydantic-field
required
¶
Type: dict
Account info
Deployment
pydantic-model
¶
An ORM representation of deployment data.
Source code in prefect/orion/schemas/core.py
class Deployment(ORMBaseModel):
"""An ORM representation of deployment data."""
name: str = Field(..., description="The name of the deployment.")
version: Optional[str] = Field(
None, description="An optional version for the deployment."
)
description: str = Field(None, description="A description for the deployment.")
flow_id: UUID = Field(
..., description="The flow id associated with the deployment."
)
schedule: schemas.schedules.SCHEDULE_TYPES = Field(
None, description="A schedule for the deployment."
)
is_schedule_active: bool = Field(
True, description="Whether or not the deployment schedule is active."
)
infra_overrides: Dict[str, Any] = Field(
default_factory=dict,
description="Overrides to apply to the base infrastructure block at runtime.",
)
parameters: Dict[str, Any] = Field(
default_factory=dict,
description="Parameters for flow runs scheduled by the deployment.",
)
tags: List[str] = Field(
default_factory=list,
description="A list of tags for the deployment",
example=["tag-1", "tag-2"],
)
parameter_openapi_schema: Dict[str, Any] = Field(
None,
description="The parameter schema of the flow, including defaults.",
)
path: str = Field(
None,
description="The path to the working directory for the workflow, relative to remote storage or an absolute path.",
)
entrypoint: str = Field(
None,
description="The path to the entrypoint for the workflow, relative to the `path`.",
)
manifest_path: str = Field(
None,
description="The path to the flow's manifest file, relative to the chosen storage.",
)
storage_document_id: Optional[UUID] = Field(
None,
description="The block document defining storage used for this flow.",
)
infrastructure_document_id: Optional[UUID] = Field(
None,
description="The block document defining infrastructure to use for flow runs.",
)
@validator("name", check_fields=False)
def validate_name_characters(cls, v):
raise_on_invalid_name(v)
return v
description
pydantic-field
¶
Type: str
A description for the deployment.
entrypoint
pydantic-field
¶
Type: str
The path to the entrypoint for the workflow, relative to the path
.
flow_id
pydantic-field
required
¶
Type: UUID
The flow id associated with the deployment.
infra_overrides
pydantic-field
¶
Type: Dict[str, Any]
Overrides to apply to the base infrastructure block at runtime.
infrastructure_document_id
pydantic-field
¶
Type: UUID
The block document defining infrastructure to use for flow runs.
is_schedule_active
pydantic-field
¶
Type: bool
Whether or not the deployment schedule is active.
manifest_path
pydantic-field
¶
Type: str
The path to the flow's manifest file, relative to the chosen storage.
name
pydantic-field
required
¶
Type: str
The name of the deployment.
parameter_openapi_schema
pydantic-field
¶
Type: Dict[str, Any]
The parameter schema of the flow, including defaults.
parameters
pydantic-field
¶
Type: Dict[str, Any]
Parameters for flow runs scheduled by the deployment.
path
pydantic-field
¶
Type: str
The path to the working directory for the workflow, relative to remote storage or an absolute path.
schedule
pydantic-field
¶
Type: Union[prefect.orion.schemas.schedules.IntervalSchedule, prefect.orion.schemas.schedules.CronSchedule, prefect.orion.schemas.schedules.RRuleSchedule]
A schedule for the deployment.
storage_document_id
pydantic-field
¶
Type: UUID
The block document defining storage used for this flow.
tags
pydantic-field
¶
Type: List[str]
A list of tags for the deployment
version
pydantic-field
¶
Type: str
An optional version for the deployment.
Flow
pydantic-model
¶
An ORM representation of flow data.
Source code in prefect/orion/schemas/core.py
class Flow(ORMBaseModel):
"""An ORM representation of flow data."""
name: str = Field(..., description="The name of the flow", example="my-flow")
tags: List[str] = Field(
default_factory=list,
description="A list of flow tags",
example=["tag-1", "tag-2"],
)
@validator("name", check_fields=False)
def validate_name_characters(cls, v):
raise_on_invalid_name(v)
return v
name
pydantic-field
required
¶
Type: str
The name of the flow
tags
pydantic-field
¶
Type: List[str]
A list of flow tags
FlowRun
pydantic-model
¶
An ORM representation of flow run data.
Source code in prefect/orion/schemas/core.py
class FlowRun(ORMBaseModel):
"""An ORM representation of flow run data."""
name: str = Field(
default_factory=lambda: generate_slug(2),
description="The name of the flow run. Defaults to a random slug if not specified.",
example="my-flow-run",
)
flow_id: UUID = Field(..., description="The id of the flow being run.")
state_id: UUID = Field(None, description="The id of the flow run's current state.")
deployment_id: UUID = Field(
None,
description="The id of the deployment associated with this flow run, if available.",
)
flow_version: str = Field(
None,
description="The version of the flow executed in this flow run.",
example="1.0",
)
parameters: dict = Field(
default_factory=dict, description="Parameters for the flow run."
)
idempotency_key: str = Field(
None,
description="An optional idempotency key for the flow run. Used to ensure the same flow run is not created multiple times.",
)
context: dict = Field(
default_factory=dict,
description="Additional context for the flow run.",
example={"my_var": "my_val"},
)
empirical_policy: FlowRunPolicy = Field(
default_factory=FlowRunPolicy,
)
tags: List[str] = Field(
default_factory=list,
description="A list of tags on the flow run",
example=["tag-1", "tag-2"],
)
parent_task_run_id: UUID = Field(
None,
description="If the flow run is a subflow, the id of the 'dummy' task in the parent flow used to track subflow state.",
)
state_type: schemas.states.StateType = Field(
None, description="The type of the current flow run state."
)
state_name: str = Field(None, description="The name of the current flow run state.")
run_count: int = Field(
0, description="The number of times the flow run was executed."
)
expected_start_time: DateTimeTZ = Field(
None,
description="The flow run's expected start time.",
)
next_scheduled_start_time: DateTimeTZ = Field(
None,
description="The next time the flow run is scheduled to start.",
)
start_time: DateTimeTZ = Field(None, description="The actual start time.")
end_time: DateTimeTZ = Field(None, description="The actual end time.")
total_run_time: datetime.timedelta = Field(
datetime.timedelta(0),
description="Total run time. If the flow run was executed multiple times, the time of each run will be summed.",
)
estimated_run_time: datetime.timedelta = Field(
datetime.timedelta(0), description="A real-time estimate of the total run time."
)
estimated_start_time_delta: datetime.timedelta = Field(
datetime.timedelta(0),
description="The difference between actual and expected start time.",
)
auto_scheduled: bool = Field(
False, description="Whether or not the flow run was automatically scheduled."
)
infrastructure_document_id: Optional[UUID] = Field(
None,
description="The block document defining infrastructure to use this flow run.",
)
# relationships
# flow: Flow = None
# task_runs: List["TaskRun"] = Field(default_factory=list)
state: schemas.states.State = Field(
None, description="The current state of the flow run."
)
# parent_task_run: "TaskRun" = None
@validator("name", pre=True)
def set_name(cls, name):
return name or generate_slug(2)
def __eq__(self, other: Any) -> bool:
"""
Check for "equality" to another flow run schema
Estimates times are rolling and will always change with repeated queries for
a flow run so we ignore them during equality checks.
"""
if isinstance(other, FlowRun):
exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
return self.dict(exclude=exclude_fields) == other.dict(
exclude=exclude_fields
)
return super().__eq__(other)
auto_scheduled
pydantic-field
¶
Type: bool
Whether or not the flow run was automatically scheduled.
context
pydantic-field
¶
Type: dict
Additional context for the flow run.
deployment_id
pydantic-field
¶
Type: UUID
The id of the deployment associated with this flow run, if available.
end_time
pydantic-field
¶
Type: DateTimeTZ
The actual end time.
estimated_run_time
pydantic-field
¶
Type: timedelta
A real-time estimate of the total run time.
estimated_start_time_delta
pydantic-field
¶
Type: timedelta
The difference between actual and expected start time.
expected_start_time
pydantic-field
¶
Type: DateTimeTZ
The flow run's expected start time.
flow_id
pydantic-field
required
¶
Type: UUID
The id of the flow being run.
flow_version
pydantic-field
¶
Type: str
The version of the flow executed in this flow run.
idempotency_key
pydantic-field
¶
Type: str
An optional idempotency key for the flow run. Used to ensure the same flow run is not created multiple times.
infrastructure_document_id
pydantic-field
¶
Type: UUID
The block document defining infrastructure to use this flow run.
name
pydantic-field
¶
Type: str
The name of the flow run. Defaults to a random slug if not specified.
next_scheduled_start_time
pydantic-field
¶
Type: DateTimeTZ
The next time the flow run is scheduled to start.
parameters
pydantic-field
¶
Type: dict
Parameters for the flow run.
parent_task_run_id
pydantic-field
¶
Type: UUID
If the flow run is a subflow, the id of the 'dummy' task in the parent flow used to track subflow state.
run_count
pydantic-field
¶
Type: int
The number of times the flow run was executed.
start_time
pydantic-field
¶
Type: DateTimeTZ
The actual start time.
state
pydantic-field
¶
Type: State
The current state of the flow run.
state_id
pydantic-field
¶
Type: UUID
The id of the flow run's current state.
state_name
pydantic-field
¶
Type: str
The name of the current flow run state.
state_type
pydantic-field
¶
Type: StateType
The type of the current flow run state.
tags
pydantic-field
¶
Type: List[str]
A list of tags on the flow run
total_run_time
pydantic-field
¶
Type: timedelta
Total run time. If the flow run was executed multiple times, the time of each run will be summed.
FlowRunNotificationPolicy
pydantic-model
¶
An ORM representation of a flow run notification.
Source code in prefect/orion/schemas/core.py
class FlowRunNotificationPolicy(ORMBaseModel):
"""An ORM representation of a flow run notification."""
is_active: bool = Field(True, description="Whether the policy is currently active")
state_names: List[str] = Field(
..., description="The flow run states that trigger notifications"
)
tags: List[str] = Field(
...,
description="The flow run tags that trigger notifications (set [] to disable)",
)
block_document_id: UUID = Field(
..., description="The block document ID used for sending notifications"
)
message_template: str = Field(
None,
description=(
"A templatable notification message. Use {braces} to add variables. "
f'Valid variables include: {listrepr(sorted(FLOW_RUN_NOTIFICATION_TEMPLATE_KWARGS), sep=", ")}'
),
example="Flow run {flow_run_name} with id {flow_run_id} entered state {flow_run_state_name}.",
)
@validator("message_template")
def validate_message_template_variables(cls, v):
if v is not None:
try:
v.format(**{k: "test" for k in FLOW_RUN_NOTIFICATION_TEMPLATE_KWARGS})
except KeyError as exc:
raise ValueError(f"Invalid template variable provided: '{exc.args[0]}'")
return v
block_document_id
pydantic-field
required
¶
Type: UUID
The block document ID used for sending notifications
is_active
pydantic-field
¶
Type: bool
Whether the policy is currently active
message_template
pydantic-field
¶
Type: str
A templatable notification message. Use {braces} to add variables. Valid variables include: 'flow_id', 'flow_name', 'flow_run_id', 'flow_run_name', 'flow_run_notification_policy_id', 'flow_run_parameters', 'flow_run_state_message', 'flow_run_state_name', 'flow_run_state_timestamp', 'flow_run_state_type'
state_names
pydantic-field
required
¶
Type: List[str]
The flow run states that trigger notifications
tags
pydantic-field
required
¶
Type: List[str]
The flow run tags that trigger notifications (set [] to disable)
FlowRunPolicy
pydantic-model
¶
Defines of how a flow run should retry.
Source code in prefect/orion/schemas/core.py
class FlowRunPolicy(PrefectBaseModel):
"""Defines of how a flow run should retry."""
# TODO: Determine how to separate between infrastructure and within-process level
# retries
max_retries: int = 0
retry_delay_seconds: float = 0
FlowRunnerSettings
pydantic-model
¶
An API schema for passing details about the flow runner.
This schema is agnostic to the types and configuration provided by clients
Source code in prefect/orion/schemas/core.py
class FlowRunnerSettings(PrefectBaseModel):
"""
An API schema for passing details about the flow runner.
This schema is agnostic to the types and configuration provided by clients
"""
type: str = Field(
None,
description="The type of the flow runner which can be used by the client for dispatching.",
)
config: dict = Field(
None, description="The configuration for the given flow runner type."
)
# The following is required for composite compatibility in the ORM
def __init__(self, type: str = None, config: dict = None, **kwargs) -> None:
# Pydantic does not support positional arguments so they must be converted to
# keyword arguments
super().__init__(type=type, config=config, **kwargs)
def __composite_values__(self):
return self.type, self.config
config
pydantic-field
¶
Type: dict
The configuration for the given flow runner type.
type
pydantic-field
¶
Type: str
The type of the flow runner which can be used by the client for dispatching.
Log
pydantic-model
¶
An ORM representation of log data.
Source code in prefect/orion/schemas/core.py
class Log(ORMBaseModel):
"""An ORM representation of log data."""
name: str = Field(..., description="The logger name.")
level: int = Field(..., description="The log level.")
message: str = Field(..., description="The log message.")
timestamp: DateTimeTZ = Field(..., description="The log timestamp.")
flow_run_id: UUID = Field(
..., description="The flow run ID associated with the log."
)
task_run_id: Optional[UUID] = Field(
None, description="The task run ID associated with the log."
)
flow_run_id
pydantic-field
required
¶
Type: UUID
The flow run ID associated with the log.
level
pydantic-field
required
¶
Type: int
The log level.
message
pydantic-field
required
¶
Type: str
The log message.
name
pydantic-field
required
¶
Type: str
The logger name.
task_run_id
pydantic-field
¶
Type: UUID
The task run ID associated with the log.
timestamp
pydantic-field
required
¶
Type: DateTimeTZ
The log timestamp.
QueueFilter
pydantic-model
¶
Filter criteria definition for a work queue.
Source code in prefect/orion/schemas/core.py
class QueueFilter(PrefectBaseModel):
"""Filter criteria definition for a work queue."""
tags: Optional[List[str]] = Field(
None,
description="Only include flow runs with these tags in the work queue.",
)
deployment_ids: Optional[List[UUID]] = Field(
None,
description="Only include flow runs from these deployments in the work queue.",
)
def get_flow_run_filter(self) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's flow runs.
"""
return schemas.filters.FlowRunFilter(
tags=schemas.filters.FlowRunFilterTags(all_=self.tags),
deployment_id=schemas.filters.FlowRunFilterDeploymentId(
any_=self.deployment_ids,
is_null_=False,
),
)
def get_scheduled_flow_run_filter(
self, scheduled_before: datetime.datetime
) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's SCHEDULED flow runs.
Args:
scheduled_before: Create a FlowRunFilter that excludes runs scheduled before this date.
Returns:
Flow run filter that can be used to query the work queue for scheduled runs.
"""
return self.get_flow_run_filter().copy(
update={
"state": schemas.filters.FlowRunFilterState(
type=schemas.filters.FlowRunFilterStateType(
any_=[
schemas.states.StateType.SCHEDULED,
]
)
),
"next_scheduled_start_time": schemas.filters.FlowRunFilterNextScheduledStartTime(
before_=scheduled_before
),
}
)
def get_executing_flow_run_filter(self) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's PENDING or RUNNING flow runs.
"""
return self.get_flow_run_filter().copy(
update={
"state": schemas.filters.FlowRunFilterState(
type=schemas.filters.FlowRunFilterStateType(
any_=[
schemas.states.StateType.PENDING,
schemas.states.StateType.RUNNING,
]
)
)
}
)
deployment_ids
pydantic-field
¶
Type: List[uuid.UUID]
Only include flow runs from these deployments in the work queue.
tags
pydantic-field
¶
Type: List[str]
Only include flow runs with these tags in the work queue.
QueueFilter.get_executing_flow_run_filter
¶
Construct a flow run filter for the work queue's PENDING or RUNNING flow runs.
Source code in prefect/orion/schemas/core.py
def get_executing_flow_run_filter(self) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's PENDING or RUNNING flow runs.
"""
return self.get_flow_run_filter().copy(
update={
"state": schemas.filters.FlowRunFilterState(
type=schemas.filters.FlowRunFilterStateType(
any_=[
schemas.states.StateType.PENDING,
schemas.states.StateType.RUNNING,
]
)
)
}
)
QueueFilter.get_flow_run_filter
¶
Construct a flow run filter for the work queue's flow runs.
Source code in prefect/orion/schemas/core.py
def get_flow_run_filter(self) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's flow runs.
"""
return schemas.filters.FlowRunFilter(
tags=schemas.filters.FlowRunFilterTags(all_=self.tags),
deployment_id=schemas.filters.FlowRunFilterDeploymentId(
any_=self.deployment_ids,
is_null_=False,
),
)
QueueFilter.get_scheduled_flow_run_filter
¶
Construct a flow run filter for the work queue's SCHEDULED flow runs.
Parameters:
Name | Description | Default |
---|---|---|
scheduled_before |
Create a FlowRunFilter that excludes runs scheduled before this date. datetime |
required |
Returns:
Type | Description |
---|---|
schemas.filters.FlowRunFilter |
Flow run filter that can be used to query the work queue for scheduled runs. |
Source code in prefect/orion/schemas/core.py
def get_scheduled_flow_run_filter(
self, scheduled_before: datetime.datetime
) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's SCHEDULED flow runs.
Args:
scheduled_before: Create a FlowRunFilter that excludes runs scheduled before this date.
Returns:
Flow run filter that can be used to query the work queue for scheduled runs.
"""
return self.get_flow_run_filter().copy(
update={
"state": schemas.filters.FlowRunFilterState(
type=schemas.filters.FlowRunFilterStateType(
any_=[
schemas.states.StateType.SCHEDULED,
]
)
),
"next_scheduled_start_time": schemas.filters.FlowRunFilterNextScheduledStartTime(
before_=scheduled_before
),
}
)
SavedSearch
pydantic-model
¶
An ORM representation of saved search data. Represents a set of filter criteria.
Source code in prefect/orion/schemas/core.py
class SavedSearch(ORMBaseModel):
"""An ORM representation of saved search data. Represents a set of filter criteria."""
name: str = Field(..., description="The name of the saved search.")
filters: List[SavedSearchFilter] = Field(
default_factory=list, description="The filter set for the saved search."
)
filters
pydantic-field
¶
Type: List[prefect.orion.schemas.core.SavedSearchFilter]
The filter set for the saved search.
name
pydantic-field
required
¶
Type: str
The name of the saved search.
SavedSearchFilter
pydantic-model
¶
A filter for a saved search model. Intended for use by the Prefect UI.
Source code in prefect/orion/schemas/core.py
class SavedSearchFilter(PrefectBaseModel):
"""A filter for a saved search model. Intended for use by the Prefect UI."""
object: str = Field(..., description="The object over which to filter.")
property: str = Field(
..., description="The property of the object on which to filter."
)
type: str = Field(..., description="The type of the property.")
operation: str = Field(
..., description="The operator to apply to the object. For example, `equals`."
)
value: Any = Field(..., description="A JSON-compatible value for the filter.")
object
pydantic-field
required
¶
Type: str
The object over which to filter.
operation
pydantic-field
required
¶
Type: str
The operator to apply to the object. For example, equals
.
property
pydantic-field
required
¶
Type: str
The property of the object on which to filter.
type
pydantic-field
required
¶
Type: str
The type of the property.
value
pydantic-field
required
¶
Type: Any
A JSON-compatible value for the filter.
TaskRun
pydantic-model
¶
An ORM representation of task run data.
Source code in prefect/orion/schemas/core.py
class TaskRun(ORMBaseModel):
"""An ORM representation of task run data."""
name: str = Field(default_factory=lambda: generate_slug(2), example="my-task-run")
flow_run_id: UUID = Field(..., description="The flow run id of the task run.")
task_key: str = Field(
..., description="A unique identifier for the task being run."
)
dynamic_key: str = Field(
...,
description="A dynamic key used to differentiate between multiple runs of the same task within the same flow run.",
)
cache_key: str = Field(
None,
description="An optional cache key. If a COMPLETED state associated with this cache key is found, the cached COMPLETED state will be used instead of executing the task run.",
)
cache_expiration: DateTimeTZ = Field(
None, description="Specifies when the cached state should expire."
)
task_version: str = Field(None, description="The version of the task being run.")
empirical_policy: TaskRunPolicy = Field(
default_factory=TaskRunPolicy,
)
tags: List[str] = Field(
default_factory=list,
description="A list of tags for the task run.",
example=["tag-1", "tag-2"],
)
state_id: UUID = Field(None, description="The id of the current task run state.")
task_inputs: Dict[str, List[Union[TaskRunResult, Parameter, Constant]]] = Field(
default_factory=dict,
description="Tracks the source of inputs to a task run. Used for internal bookkeeping.",
)
state_type: schemas.states.StateType = Field(
None, description="The type of the current task run state."
)
state_name: str = Field(None, description="The name of the current task run state.")
run_count: int = Field(
0, description="The number of times the task run has been executed."
)
expected_start_time: DateTimeTZ = Field(
None,
description="The task run's expected start time.",
)
# the next scheduled start time will be populated
# whenever the run is in a scheduled state
next_scheduled_start_time: DateTimeTZ = Field(
None,
description="The next time the task run is scheduled to start.",
)
start_time: DateTimeTZ = Field(None, description="The actual start time.")
end_time: DateTimeTZ = Field(None, description="The actual end time.")
total_run_time: datetime.timedelta = Field(
datetime.timedelta(0),
description="Total run time. If the task run was executed multiple times, the time of each run will be summed.",
)
estimated_run_time: datetime.timedelta = Field(
datetime.timedelta(0), description="A real-time estimate of total run time."
)
estimated_start_time_delta: datetime.timedelta = Field(
datetime.timedelta(0),
description="The difference between actual and expected start time.",
)
# relationships
# flow_run: FlowRun = None
# subflow_runs: List[FlowRun] = Field(default_factory=list)
state: schemas.states.State = Field(None, description="The current task run state.")
@validator("name", pre=True)
def set_name(cls, name):
return name or generate_slug(2)
cache_expiration
pydantic-field
¶
Type: DateTimeTZ
Specifies when the cached state should expire.
cache_key
pydantic-field
¶
Type: str
An optional cache key. If a COMPLETED state associated with this cache key is found, the cached COMPLETED state will be used instead of executing the task run.
dynamic_key
pydantic-field
required
¶
Type: str
A dynamic key used to differentiate between multiple runs of the same task within the same flow run.
end_time
pydantic-field
¶
Type: DateTimeTZ
The actual end time.
estimated_run_time
pydantic-field
¶
Type: timedelta
A real-time estimate of total run time.
estimated_start_time_delta
pydantic-field
¶
Type: timedelta
The difference between actual and expected start time.
expected_start_time
pydantic-field
¶
Type: DateTimeTZ
The task run's expected start time.
flow_run_id
pydantic-field
required
¶
Type: UUID
The flow run id of the task run.
next_scheduled_start_time
pydantic-field
¶
Type: DateTimeTZ
The next time the task run is scheduled to start.
run_count
pydantic-field
¶
Type: int
The number of times the task run has been executed.
start_time
pydantic-field
¶
Type: DateTimeTZ
The actual start time.
state
pydantic-field
¶
Type: State
The current task run state.
state_id
pydantic-field
¶
Type: UUID
The id of the current task run state.
state_name
pydantic-field
¶
Type: str
The name of the current task run state.
state_type
pydantic-field
¶
Type: StateType
The type of the current task run state.
tags
pydantic-field
¶
Type: List[str]
A list of tags for the task run.
task_inputs
pydantic-field
¶
Type: Dict[str, List[Union[prefect.orion.schemas.core.TaskRunResult, prefect.orion.schemas.core.Parameter, prefect.orion.schemas.core.Constant]]]
Tracks the source of inputs to a task run. Used for internal bookkeeping.
task_key
pydantic-field
required
¶
Type: str
A unique identifier for the task being run.
task_version
pydantic-field
¶
Type: str
The version of the task being run.
total_run_time
pydantic-field
¶
Type: timedelta
Total run time. If the task run was executed multiple times, the time of each run will be summed.
WorkQueue
pydantic-model
¶
An ORM representation of a work queue
Source code in prefect/orion/schemas/core.py
class WorkQueue(ORMBaseModel):
"""An ORM representation of a work queue"""
filter: QueueFilter = Field(
default_factory=QueueFilter, description="Filter criteria for the work queue."
)
name: str = Field(..., description="The name of the work queue.")
description: Optional[str] = Field(
"", description="An optional description for the work queue."
)
is_paused: bool = Field(
False, description="Whether or not the work queue is paused."
)
concurrency_limit: Optional[int] = Field(
None, description="An optional concurrency limit for the work queue."
)
@validator("name", check_fields=False)
def validate_name_characters(cls, v):
raise_on_invalid_name(v)
return v
concurrency_limit
pydantic-field
¶
Type: int
An optional concurrency limit for the work queue.
description
pydantic-field
¶
Type: str
An optional description for the work queue.
filter
pydantic-field
¶
Type: QueueFilter
Filter criteria for the work queue.
is_paused
pydantic-field
¶
Type: bool
Whether or not the work queue is paused.
name
pydantic-field
required
¶
Type: str
The name of the work queue.
raise_on_invalid_name
¶
Raise an InvalidNameError if the given name contains any invalid characters.
Source code in prefect/orion/schemas/core.py
def raise_on_invalid_name(name: str) -> None:
"""
Raise an InvalidNameError if the given name contains any invalid
characters.
"""
if any(c in name for c in INVALID_CHARACTERS):
raise InvalidNameError(
f"Name {name!r} contains an invalid character. "
f"Must not contain any of: {INVALID_CHARACTERS}."
)