Skip to content

prefect.orion.schemas.filters

Schemas that define Orion filtering operations.

Each filter schema includes logic for transforming itself into a SQL where clause.

BlockDocumentFilter pydantic-model

Filter BlockDocuments. Only BlockDocuments matching all criteria will be returned

Source code in prefect/orion/schemas/filters.py
class BlockDocumentFilter(PrefectOperatorFilterBaseModel):
    """Filter BlockDocuments. Only BlockDocuments matching all criteria will be returned"""

    is_anonymous: Optional[BlockDocumentFilterIsAnonymous] = Field(
        # default is to exclude anonymous blocks
        BlockDocumentFilterIsAnonymous(eq_=False),
        description=(
            "Filter criteria for `BlockDocument.is_anonymous`. "
            "Defaults to excluding anonymous blocks."
        ),
    )
    block_type_id: Optional[BlockDocumentFilterBlockTypeId] = Field(
        None, description="Filter criteria for `BlockDocument.block_type_id`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.is_anonymous is not None:
            filters.append(self.is_anonymous.as_sql_filter(db))
        if self.block_type_id is not None:
            filters.append(self.block_type_id.as_sql_filter(db))

        return filters

block_type_id pydantic-field

Type: BlockDocumentFilterBlockTypeId

Filter criteria for BlockDocument.block_type_id

is_anonymous pydantic-field

Type: BlockDocumentFilterIsAnonymous

Filter criteria for BlockDocument.is_anonymous. Defaults to excluding anonymous blocks.

BlockDocumentFilterBlockTypeId pydantic-model

Filter by BlockDocument.block_type_id.

Source code in prefect/orion/schemas/filters.py
class BlockDocumentFilterBlockTypeId(PrefectFilterBaseModel):
    """Filter by `BlockDocument.block_type_id`."""

    any_: List[UUID] = Field(None, description="A list of block type ids to include")

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.BlockDocument.block_type_id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of block type ids to include

BlockDocumentFilterIsAnonymous pydantic-model

Filter by BlockDocument.is_anonymous.

Source code in prefect/orion/schemas/filters.py
class BlockDocumentFilterIsAnonymous(PrefectFilterBaseModel):
    """Filter by `BlockDocument.is_anonymous`."""

    eq_: bool = Field(
        None,
        description="Filter block documents for only those that are or are not anonymous.",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.eq_ is not None:
            filters.append(db.BlockDocument.is_anonymous.is_(self.eq_))
        return filters

eq_ pydantic-field

Type: bool

Filter block documents for only those that are or are not anonymous.

BlockSchemaFilter pydantic-model

Filter BlockSchemas

Source code in prefect/orion/schemas/filters.py
class BlockSchemaFilter(PrefectOperatorFilterBaseModel):
    """Filter BlockSchemas"""

    block_type_id: Optional[BlockSchemaFilterBlockTypeId] = Field(
        None, description="Filter criteria for `BlockSchema.block_type_id`"
    )
    block_capabilities: Optional[BlockSchemaFilterCapabilities] = Field(
        None, description="Filter criteria for `BlockSchema.capabilities`"
    )
    id: Optional[BlockSchemaFilterId] = Field(
        None, description="Filter criteria for `BlockSchema.id`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []

        if self.block_type_id is not None:
            filters.append(self.block_type_id.as_sql_filter(db))
        if self.block_capabilities is not None:
            filters.append(self.block_capabilities.as_sql_filter(db))
        if self.id is not None:
            filters.append(self.id.as_sql_filter(db))

        return filters

block_capabilities pydantic-field

Type: BlockSchemaFilterCapabilities

Filter criteria for BlockSchema.capabilities

block_type_id pydantic-field

Type: BlockSchemaFilterBlockTypeId

Filter criteria for BlockSchema.block_type_id

id pydantic-field

Type: BlockSchemaFilterId

Filter criteria for BlockSchema.id

BlockSchemaFilterBlockTypeId pydantic-model

Filter by BlockSchema.block_type_id.

Source code in prefect/orion/schemas/filters.py
class BlockSchemaFilterBlockTypeId(PrefectFilterBaseModel):
    """Filter by `BlockSchema.block_type_id`."""

    any_: List[UUID] = Field(None, description="A list of block type ids to include")

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.BlockSchema.block_type_id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of block type ids to include

BlockSchemaFilterCapabilities pydantic-model

Filter by BlockSchema.capabilities

Source code in prefect/orion/schemas/filters.py
class BlockSchemaFilterCapabilities(PrefectFilterBaseModel):
    """Filter by `BlockSchema.capabilities`"""

    all_: List[str] = Field(
        None,
        example=["write-storage", "read-storage"],
        description="A list of block capabilities. Block entities will be returned "
        "only if an associated block schema has a superset of the defined capabilities.",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        from prefect.orion.utilities.database import json_has_all_keys

        filters = []
        if self.all_ is not None:
            filters.append(json_has_all_keys(db.BlockSchema.capabilities, self.all_))
        return filters

all_ pydantic-field

Type: List[str]

A list of block capabilities. Block entities will be returned only if an associated block schema has a superset of the defined capabilities.

BlockSchemaFilterId pydantic-model

Filter by BlockSchema.id

Source code in prefect/orion/schemas/filters.py
class BlockSchemaFilterId(PrefectFilterBaseModel):
    """Filter by BlockSchema.id"""

    any_: List[UUID] = Field(None, description="A list of IDs to include")

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.BlockSchema.id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of IDs to include

BlockTypeFilter pydantic-model

Filter BlockTypes

Source code in prefect/orion/schemas/filters.py
class BlockTypeFilter(PrefectFilterBaseModel):
    """Filter BlockTypes"""

    name: Optional[BlockTypeFilterName] = Field(
        None, description="Filter criteria for `BlockType.name`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []

        if self.name is not None:
            filters.append(self.name.as_sql_filter(db))

        return filters

name pydantic-field

Type: BlockTypeFilterName

Filter criteria for BlockType.name

BlockTypeFilterName pydantic-model

Filter by BlockType.name

Source code in prefect/orion/schemas/filters.py
class BlockTypeFilterName(PrefectFilterBaseModel):
    """Filter by `BlockType.name`"""

    like_: str = Field(
        None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        example="marvin",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.like_ is not None:
            filters.append(db.BlockType.name.ilike(f"%{self.like_}%"))
        return filters

like_ pydantic-field

Type: str

A case-insensitive partial match. For example, passing 'marvin' will match 'marvin', 'sad-Marvin', and 'marvin-robot'.

DeploymentFilter pydantic-model

Filter for deployments. Only deployments matching all criteria will be returned.

Source code in prefect/orion/schemas/filters.py
class DeploymentFilter(PrefectOperatorFilterBaseModel):
    """Filter for deployments. Only deployments matching all criteria will be returned."""

    id: Optional[DeploymentFilterId] = Field(
        None, description="Filter criteria for `Deployment.id`"
    )
    name: Optional[DeploymentFilterName] = Field(
        None, description="Filter criteria for `Deployment.name`"
    )
    is_schedule_active: Optional[DeploymentFilterIsScheduleActive] = Field(
        None, description="Filter criteria for `Deployment.is_schedule_active`"
    )
    tags: Optional[DeploymentFilterTags] = Field(
        None, description="Filter criteria for `Deployment.tags`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []

        if self.id is not None:
            filters.append(self.id.as_sql_filter(db))
        if self.name is not None:
            filters.append(self.name.as_sql_filter(db))
        if self.is_schedule_active is not None:
            filters.append(self.is_schedule_active.as_sql_filter(db))
        if self.tags is not None:
            filters.append(self.tags.as_sql_filter(db))

        return filters

id pydantic-field

Type: DeploymentFilterId

Filter criteria for Deployment.id

is_schedule_active pydantic-field

Type: DeploymentFilterIsScheduleActive

Filter criteria for Deployment.is_schedule_active

name pydantic-field

Type: DeploymentFilterName

Filter criteria for Deployment.name

tags pydantic-field

Type: DeploymentFilterTags

Filter criteria for Deployment.tags

DeploymentFilterId pydantic-model

Filter by Deployment.id.

Source code in prefect/orion/schemas/filters.py
class DeploymentFilterId(PrefectFilterBaseModel):
    """Filter by `Deployment.id`."""

    any_: List[UUID] = Field(None, description="A list of deployment ids to include")

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.Deployment.id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of deployment ids to include

DeploymentFilterIsScheduleActive pydantic-model

Filter by Deployment.is_schedule_active.

Source code in prefect/orion/schemas/filters.py
class DeploymentFilterIsScheduleActive(PrefectFilterBaseModel):
    """Filter by `Deployment.is_schedule_active`."""

    eq_: bool = Field(
        None,
        description="Only returns where deployment schedule is/is not active",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.eq_ is not None:
            filters.append(db.Deployment.is_schedule_active.is_(self.eq_))
        return filters

eq_ pydantic-field

Type: bool

Only returns where deployment schedule is/is not active

DeploymentFilterName pydantic-model

Filter by Deployment.name.

Source code in prefect/orion/schemas/filters.py
class DeploymentFilterName(PrefectFilterBaseModel):
    """Filter by `Deployment.name`."""

    any_: List[str] = Field(
        None,
        description="A list of deployment names to include",
        example=["my-deployment-1", "my-deployment-2"],
    )

    like_: str = Field(
        None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        example="marvin",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.Deployment.name.in_(self.any_))
        if self.like_ is not None:
            filters.append(db.Deployment.name.ilike(f"%{self.like_}%"))
        return filters

any_ pydantic-field

Type: List[str]

A list of deployment names to include

like_ pydantic-field

Type: str

A case-insensitive partial match. For example, passing 'marvin' will match 'marvin', 'sad-Marvin', and 'marvin-robot'.

DeploymentFilterTags pydantic-model

Filter by Deployment.tags.

Source code in prefect/orion/schemas/filters.py
class DeploymentFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `Deployment.tags`."""

    all_: List[str] = Field(
        None,
        example=["tag-1", "tag-2"],
        description="A list of tags. Deployments will be returned only if their tags are a superset of the list",
    )
    is_null_: bool = Field(
        None, description="If true, only include deployments without tags"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        from prefect.orion.utilities.database import json_has_all_keys

        filters = []
        if self.all_ is not None:
            filters.append(json_has_all_keys(db.Deployment.tags, self.all_))
        if self.is_null_ is not None:
            filters.append(
                db.Deployment.tags == [] if self.is_null_ else db.Deployment.tags != []
            )
        return filters

all_ pydantic-field

Type: List[str]

A list of tags. Deployments will be returned only if their tags are a superset of the list

is_null_ pydantic-field

Type: bool

If true, only include deployments without tags

FilterSet pydantic-model

A collection of filters for common objects

Source code in prefect/orion/schemas/filters.py
class FilterSet(PrefectBaseModel):
    """A collection of filters for common objects"""

    flows: FlowFilter = Field(
        default_factory=FlowFilter, description="Filters that apply to flows"
    )
    flow_runs: FlowRunFilter = Field(
        default_factory=FlowRunFilter, description="Filters that apply to flow runs"
    )
    task_runs: TaskRunFilter = Field(
        default_factory=TaskRunFilter, description="Filters that apply to task runs"
    )
    deployments: DeploymentFilter = Field(
        default_factory=DeploymentFilter,
        description="Filters that apply to deployments",
    )

deployments pydantic-field

Type: DeploymentFilter

Filters that apply to deployments

flow_runs pydantic-field

Type: FlowRunFilter

Filters that apply to flow runs

flows pydantic-field

Type: FlowFilter

Filters that apply to flows

task_runs pydantic-field

Type: TaskRunFilter

Filters that apply to task runs

FlowFilter pydantic-model

Filter for flows. Only flows matching all criteria will be returned.

Source code in prefect/orion/schemas/filters.py
class FlowFilter(PrefectOperatorFilterBaseModel):
    """Filter for flows. Only flows matching all criteria will be returned."""

    id: Optional[FlowFilterId] = Field(
        None, description="Filter criteria for `Flow.id`"
    )
    name: Optional[FlowFilterName] = Field(
        None, description="Filter criteria for `Flow.name`"
    )
    tags: Optional[FlowFilterTags] = Field(
        None, description="Filter criteria for `Flow.tags`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []

        if self.id is not None:
            filters.append(self.id.as_sql_filter(db))
        if self.name is not None:
            filters.append(self.name.as_sql_filter(db))
        if self.tags is not None:
            filters.append(self.tags.as_sql_filter(db))

        return filters

id pydantic-field

Type: FlowFilterId

Filter criteria for Flow.id

name pydantic-field

Type: FlowFilterName

Filter criteria for Flow.name

tags pydantic-field

Type: FlowFilterTags

Filter criteria for Flow.tags

FlowFilterId pydantic-model

Filter by Flow.id.

Source code in prefect/orion/schemas/filters.py
class FlowFilterId(PrefectFilterBaseModel):
    """Filter by `Flow.id`."""

    any_: List[UUID] = Field(None, description="A list of flow ids to include")

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.Flow.id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of flow ids to include

FlowFilterName pydantic-model

Filter by Flow.name.

Source code in prefect/orion/schemas/filters.py
class FlowFilterName(PrefectFilterBaseModel):
    """Filter by `Flow.name`."""

    any_: List[str] = Field(
        None,
        description="A list of flow names to include",
        example=["my-flow-1", "my-flow-2"],
    )

    like_: str = Field(
        None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        example="marvin",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.Flow.name.in_(self.any_))
        if self.like_ is not None:
            filters.append(db.Flow.name.ilike(f"%{self.like_}%"))
        return filters

any_ pydantic-field

Type: List[str]

A list of flow names to include

like_ pydantic-field

Type: str

A case-insensitive partial match. For example, passing 'marvin' will match 'marvin', 'sad-Marvin', and 'marvin-robot'.

FlowFilterTags pydantic-model

Filter by Flow.tags.

Source code in prefect/orion/schemas/filters.py
class FlowFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `Flow.tags`."""

    all_: List[str] = Field(
        None,
        example=["tag-1", "tag-2"],
        description="A list of tags. Flows will be returned only if their tags are a superset of the list",
    )
    is_null_: bool = Field(None, description="If true, only include flows without tags")

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        from prefect.orion.utilities.database import json_has_all_keys

        filters = []
        if self.all_ is not None:
            filters.append(json_has_all_keys(db.Flow.tags, self.all_))
        if self.is_null_ is not None:
            filters.append(db.Flow.tags == [] if self.is_null_ else db.Flow.tags != [])
        return filters

all_ pydantic-field

Type: List[str]

A list of tags. Flows will be returned only if their tags are a superset of the list

is_null_ pydantic-field

Type: bool

If true, only include flows without tags

FlowRunFilter pydantic-model

Filter flow runs. Only flow runs matching all criteria will be returned

Source code in prefect/orion/schemas/filters.py
class FlowRunFilter(PrefectOperatorFilterBaseModel):
    """Filter flow runs. Only flow runs matching all criteria will be returned"""

    id: Optional[FlowRunFilterId] = Field(
        None, description="Filter criteria for `FlowRun.id`"
    )
    name: Optional[FlowRunFilterName] = Field(
        None, description="Filter criteria for `FlowRun.name`"
    )
    tags: Optional[FlowRunFilterTags] = Field(
        None, description="Filter criteria for `FlowRun.tags`"
    )
    deployment_id: Optional[FlowRunFilterDeploymentId] = Field(
        None, description="Filter criteria for `FlowRun.deployment_id`"
    )
    state: Optional[FlowRunFilterState] = Field(
        None, description="Filter criteria for `FlowRun.state`"
    )
    flow_version: Optional[FlowRunFilterFlowVersion] = Field(
        None, description="Filter criteria for `FlowRun.flow_version`"
    )
    start_time: Optional[FlowRunFilterStartTime] = Field(
        None, description="Filter criteria for `FlowRun.start_time`"
    )
    expected_start_time: Optional[FlowRunFilterExpectedStartTime] = Field(
        None, description="Filter criteria for `FlowRun.expected_start_time`"
    )
    next_scheduled_start_time: Optional[FlowRunFilterNextScheduledStartTime] = Field(
        None, description="Filter criteria for `FlowRun.next_scheduled_start_time`"
    )
    parent_task_run_id: Optional[FlowRunFilterParentTaskRunId] = Field(
        None, description="Filter criteria for `FlowRun.parent_task_run_id`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []

        if self.id is not None:
            filters.append(self.id.as_sql_filter(db))
        if self.name is not None:
            filters.append(self.name.as_sql_filter(db))
        if self.tags is not None:
            filters.append(self.tags.as_sql_filter(db))
        if self.deployment_id is not None:
            filters.append(self.deployment_id.as_sql_filter(db))
        if self.flow_version is not None:
            filters.append(self.flow_version.as_sql_filter(db))
        if self.state is not None:
            filters.append(self.state.as_sql_filter(db))
        if self.start_time is not None:
            filters.append(self.start_time.as_sql_filter(db))
        if self.expected_start_time is not None:
            filters.append(self.expected_start_time.as_sql_filter(db))
        if self.next_scheduled_start_time is not None:
            filters.append(self.next_scheduled_start_time.as_sql_filter(db))
        if self.parent_task_run_id is not None:
            filters.append(self.parent_task_run_id.as_sql_filter(db))

        return filters

deployment_id pydantic-field

Type: FlowRunFilterDeploymentId

Filter criteria for FlowRun.deployment_id

expected_start_time pydantic-field

Type: FlowRunFilterExpectedStartTime

Filter criteria for FlowRun.expected_start_time

flow_version pydantic-field

Type: FlowRunFilterFlowVersion

Filter criteria for FlowRun.flow_version

id pydantic-field

Type: FlowRunFilterId

Filter criteria for FlowRun.id

name pydantic-field

Type: FlowRunFilterName

Filter criteria for FlowRun.name

next_scheduled_start_time pydantic-field

Type: FlowRunFilterNextScheduledStartTime

Filter criteria for FlowRun.next_scheduled_start_time

parent_task_run_id pydantic-field

Type: FlowRunFilterParentTaskRunId

Filter criteria for FlowRun.parent_task_run_id

start_time pydantic-field

Type: FlowRunFilterStartTime

Filter criteria for FlowRun.start_time

state pydantic-field

Type: FlowRunFilterState

Filter criteria for FlowRun.state

tags pydantic-field

Type: FlowRunFilterTags

Filter criteria for FlowRun.tags

FlowRunFilterDeploymentId pydantic-model

Filter by FlowRun.deployment_id.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterDeploymentId(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.deployment_id`."""

    any_: List[UUID] = Field(
        None, description="A list of flow run deployment ids to include"
    )
    is_null_: bool = Field(
        None, description="If true, only include flow runs without deployment ids"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.deployment_id.in_(self.any_))
        if self.is_null_ is not None:
            filters.append(
                db.FlowRun.deployment_id == None
                if self.is_null_
                else db.FlowRun.deployment_id != None
            )
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of flow run deployment ids to include

is_null_ pydantic-field

Type: bool

If true, only include flow runs without deployment ids

FlowRunFilterExpectedStartTime pydantic-model

Filter by FlowRun.expected_start_time.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterExpectedStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.expected_start_time`."""

    before_: DateTimeTZ = Field(
        None,
        description="Only include flow runs scheduled to start at or before this time",
    )
    after_: DateTimeTZ = Field(
        None,
        description="Only include flow runs scheduled to start at or after this time",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.before_ is not None:
            filters.append(db.FlowRun.expected_start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.FlowRun.expected_start_time >= self.after_)
        return filters

after_ pydantic-field

Type: DateTimeTZ

Only include flow runs scheduled to start at or after this time

before_ pydantic-field

Type: DateTimeTZ

Only include flow runs scheduled to start at or before this time

FlowRunFilterFlowVersion pydantic-model

Filter by FlowRun.flow_version.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterFlowVersion(PrefectFilterBaseModel):
    """Filter by `FlowRun.flow_version`."""

    any_: List[str] = Field(
        None, description="A list of flow run flow_versions to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.flow_version.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of flow run flow_versions to include

FlowRunFilterId pydantic-model

Filter by FlowRun.id.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterId(PrefectFilterBaseModel):
    """Filter by FlowRun.id."""

    any_: List[UUID] = Field(None, description="A list of flow run ids to include")
    not_any_: List[UUID] = Field(None, description="A list of flow run ids to exclude")

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.id.in_(self.any_))
        if self.not_any_ is not None:
            filters.append(db.FlowRun.id.not_in(self.not_any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of flow run ids to include

not_any_ pydantic-field

Type: List[uuid.UUID]

A list of flow run ids to exclude

FlowRunFilterName pydantic-model

Filter by FlowRun.name.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterName(PrefectFilterBaseModel):
    """Filter by `FlowRun.name`."""

    any_: List[str] = Field(
        None,
        description="A list of flow run names to include",
        example=["my-flow-run-1", "my-flow-run-2"],
    )

    like_: str = Field(
        None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        example="marvin",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.name.in_(self.any_))
        if self.like_ is not None:
            filters.append(db.FlowRun.name.ilike(f"%{self.like_}%"))
        return filters

any_ pydantic-field

Type: List[str]

A list of flow run names to include

like_ pydantic-field

Type: str

A case-insensitive partial match. For example, passing 'marvin' will match 'marvin', 'sad-Marvin', and 'marvin-robot'.

FlowRunFilterNextScheduledStartTime pydantic-model

Filter by FlowRun.next_scheduled_start_time.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterNextScheduledStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.next_scheduled_start_time`."""

    before_: DateTimeTZ = Field(
        None,
        description="Only include flow runs with a next_scheduled_start_time or before this time",
    )
    after_: DateTimeTZ = Field(
        None,
        description="Only include flow runs with a next_scheduled_start_time at or after this time",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.before_ is not None:
            filters.append(db.FlowRun.next_scheduled_start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.FlowRun.next_scheduled_start_time >= self.after_)
        return filters

after_ pydantic-field

Type: DateTimeTZ

Only include flow runs with a next_scheduled_start_time at or after this time

before_ pydantic-field

Type: DateTimeTZ

Only include flow runs with a next_scheduled_start_time or before this time

FlowRunFilterParentTaskRunId pydantic-model

Filter by FlowRun.parent_task_run_id.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterParentTaskRunId(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.parent_task_run_id`."""

    any_: List[UUID] = Field(
        None, description="A list of flow run parent_task_run_ids to include"
    )
    is_null_: bool = Field(
        None, description="If true, only include flow runs without parent_task_run_id"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.parent_task_run_id.in_(self.any_))
        if self.is_null_ is not None:
            filters.append(
                db.FlowRun.parent_task_run_id == None
                if self.is_null_
                else db.FlowRun.parent_task_run_id != None
            )
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of flow run parent_task_run_ids to include

is_null_ pydantic-field

Type: bool

If true, only include flow runs without parent_task_run_id

FlowRunFilterStartTime pydantic-model

Filter by FlowRun.start_time.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.start_time`."""

    before_: DateTimeTZ = Field(
        None, description="Only include flow runs starting at or before this time"
    )
    after_: DateTimeTZ = Field(
        None, description="Only include flow runs starting at or after this time"
    )
    is_null_: bool = Field(
        None, description="If true, only return flow runs without a start time"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.before_ is not None:
            filters.append(db.FlowRun.start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.FlowRun.start_time >= self.after_)
        if self.is_null_ is not None:
            filters.append(
                db.FlowRun.start_time == None
                if self.is_null_
                else db.FlowRun.start_time != None
            )
        return filters

after_ pydantic-field

Type: DateTimeTZ

Only include flow runs starting at or after this time

before_ pydantic-field

Type: DateTimeTZ

Only include flow runs starting at or before this time

is_null_ pydantic-field

Type: bool

If true, only return flow runs without a start time

FlowRunFilterStateName pydantic-model

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterStateName(PrefectFilterBaseModel):
    any_: List[str] = Field(
        None, description="A list of flow run state names to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.state_name.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of flow run state names to include

FlowRunFilterStateType pydantic-model

Filter by FlowRun.state_type.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterStateType(PrefectFilterBaseModel):
    """Filter by `FlowRun.state_type`."""

    any_: List[schemas.states.StateType] = Field(
        None, description="A list of flow run state types to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.state_type.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[prefect.orion.schemas.states.StateType]

A list of flow run state types to include

FlowRunFilterTags pydantic-model

Filter by FlowRun.tags.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.tags`."""

    all_: List[str] = Field(
        None,
        example=["tag-1", "tag-2"],
        description="A list of tags. Flow runs will be returned only if their tags are a superset of the list",
    )
    is_null_: bool = Field(
        None, description="If true, only include flow runs without tags"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        from prefect.orion.utilities.database import json_has_all_keys

        filters = []
        if self.all_ is not None:
            filters.append(json_has_all_keys(db.FlowRun.tags, self.all_))
        if self.is_null_ is not None:
            filters.append(
                db.FlowRun.tags == [] if self.is_null_ else db.FlowRun.tags != []
            )
        return filters

all_ pydantic-field

Type: List[str]

A list of tags. Flow runs will be returned only if their tags are a superset of the list

is_null_ pydantic-field

Type: bool

If true, only include flow runs without tags

FlowRunNotificationPolicyFilter pydantic-model

Filter FlowRunNotificationPolicies.

Source code in prefect/orion/schemas/filters.py
class FlowRunNotificationPolicyFilter(PrefectFilterBaseModel):
    """Filter FlowRunNotificationPolicies."""

    is_active: Optional[FlowRunNotificationPolicyFilterIsActive] = Field(
        FlowRunNotificationPolicyFilterIsActive(eq_=False),
        description=("Filter criteria for `FlowRunNotificationPolicy.is_active`. "),
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.is_active is not None:
            filters.append(self.is_active.as_sql_filter(db))

        return filters

is_active pydantic-field

Type: FlowRunNotificationPolicyFilterIsActive

Filter criteria for FlowRunNotificationPolicy.is_active.

FlowRunNotificationPolicyFilterIsActive pydantic-model

Filter by FlowRunNotificationPolicy.is_active.

Source code in prefect/orion/schemas/filters.py
class FlowRunNotificationPolicyFilterIsActive(PrefectFilterBaseModel):
    """Filter by `FlowRunNotificationPolicy.is_active`."""

    eq_: bool = Field(
        None,
        description="Filter notification policies for only those that are or are not active.",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.eq_ is not None:
            filters.append(db.FlowRunNotificationPolicy.is_active.is_(self.eq_))
        return filters

eq_ pydantic-field

Type: bool

Filter notification policies for only those that are or are not active.

LogFilter pydantic-model

Filter logs. Only logs matching all criteria will be returned

Source code in prefect/orion/schemas/filters.py
class LogFilter(PrefectOperatorFilterBaseModel):
    """Filter logs. Only logs matching all criteria will be returned"""

    level: Optional[LogFilterLevel] = Field(
        None, description="Filter criteria for `Log.level`"
    )
    timestamp: Optional[LogFilterTimestamp] = Field(
        None, description="Filter criteria for `Log.timestamp`"
    )
    flow_run_id: Optional[LogFilterFlowRunId] = Field(
        None, description="Filter criteria for `Log.flow_run_id`"
    )
    task_run_id: Optional[LogFilterTaskRunId] = Field(
        None, description="Filter criteria for `Log.task_run_id`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []

        if self.level is not None:
            filters.append(self.level.as_sql_filter(db))
        if self.timestamp is not None:
            filters.append(self.timestamp.as_sql_filter(db))
        if self.flow_run_id is not None:
            filters.append(self.flow_run_id.as_sql_filter(db))
        if self.task_run_id is not None:
            filters.append(self.task_run_id.as_sql_filter(db))

        return filters

flow_run_id pydantic-field

Type: LogFilterFlowRunId

Filter criteria for Log.flow_run_id

level pydantic-field

Type: LogFilterLevel

Filter criteria for Log.level

task_run_id pydantic-field

Type: LogFilterTaskRunId

Filter criteria for Log.task_run_id

timestamp pydantic-field

Type: LogFilterTimestamp

Filter criteria for Log.timestamp

LogFilterFlowRunId pydantic-model

Filter by Log.flow_run_id.

Source code in prefect/orion/schemas/filters.py
class LogFilterFlowRunId(PrefectFilterBaseModel):
    """Filter by `Log.flow_run_id`."""

    any_: List[UUID] = Field(None, description="A list of flow run IDs to include")

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.Log.flow_run_id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of flow run IDs to include

LogFilterLevel pydantic-model

Filter by Log.level.

Source code in prefect/orion/schemas/filters.py
class LogFilterLevel(PrefectFilterBaseModel):
    """Filter by `Log.level`."""

    ge_: int = Field(
        None,
        description="Include logs with a level greater than or equal to this level",
        example=20,
    )

    le_: int = Field(
        None,
        description="Include logs with a level less than or equal to this level",
        example=50,
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.ge_ is not None:
            filters.append(db.Log.level >= self.ge_)
        if self.le_ is not None:
            filters.append(db.Log.level <= self.le_)
        return filters

ge_ pydantic-field

Type: int

Include logs with a level greater than or equal to this level

le_ pydantic-field

Type: int

Include logs with a level less than or equal to this level

LogFilterName pydantic-model

Filter by Log.name.

Source code in prefect/orion/schemas/filters.py
class LogFilterName(PrefectFilterBaseModel):
    """Filter by `Log.name`."""

    any_: List[str] = Field(
        None,
        description="A list of log names to include",
        example=["prefect.logger.flow_runs", "prefect.logger.task_runs"],
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.Log.name.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of log names to include

LogFilterTaskRunId pydantic-model

Filter by Log.task_run_id.

Source code in prefect/orion/schemas/filters.py
class LogFilterTaskRunId(PrefectFilterBaseModel):
    """Filter by `Log.task_run_id`."""

    any_: List[UUID] = Field(None, description="A list of task run IDs to include")

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.Log.task_run_id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of task run IDs to include

LogFilterTimestamp pydantic-model

Filter by Log.timestamp.

Source code in prefect/orion/schemas/filters.py
class LogFilterTimestamp(PrefectFilterBaseModel):
    """Filter by `Log.timestamp`."""

    before_: DateTimeTZ = Field(
        None, description="Only include logs with a timestamp at or before this time"
    )
    after_: DateTimeTZ = Field(
        None, description="Only include logs with a timestamp at or after this time"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.before_ is not None:
            filters.append(db.Log.timestamp <= self.before_)
        if self.after_ is not None:
            filters.append(db.Log.timestamp >= self.after_)
        return filters

after_ pydantic-field

Type: DateTimeTZ

Only include logs with a timestamp at or after this time

before_ pydantic-field

Type: DateTimeTZ

Only include logs with a timestamp at or before this time

Operator

Operators for combining filter criteria.

Source code in prefect/orion/schemas/filters.py
class Operator(AutoEnum):
    """Operators for combining filter criteria."""

    and_ = AutoEnum.auto()
    or_ = AutoEnum.auto()

PrefectFilterBaseModel pydantic-model

Base model for Prefect filters

Source code in prefect/orion/schemas/filters.py
class PrefectFilterBaseModel(PrefectBaseModel):
    """Base model for Prefect filters"""

    class Config:
        extra = "forbid"

    def as_sql_filter(self, db: "OrionDBInterface") -> BooleanClauseList:
        """Generate SQL filter from provided filter parameters. If no filters parameters are available, return a TRUE filter."""
        filters = self._get_filter_list(db)
        if not filters:
            return True
        return sa.and_(*filters)

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return a list of boolean filter statements based on filter parameters"""
        raise NotImplementedError("_get_filter_list must be implemented")

PrefectOperatorFilterBaseModel pydantic-model

Base model for Prefect filters that combines criteria with a user-provided operator

Source code in prefect/orion/schemas/filters.py
class PrefectOperatorFilterBaseModel(PrefectFilterBaseModel):
    """Base model for Prefect filters that combines criteria with a user-provided operator"""

    operator: Operator = Field(
        default=Operator.and_,
        description="Operator for combining filter criteria. Defaults to 'and_'.",
    )

    def as_sql_filter(self, db: "OrionDBInterface") -> BooleanClauseList:
        filters = self._get_filter_list(db)
        if not filters:
            return True
        return sa.and_(*filters) if self.operator == Operator.and_ else sa.or_(*filters)

operator pydantic-field

Type: Operator

Operator for combining filter criteria. Defaults to 'and_'.

TaskRunFilter pydantic-model

Filter task runs. Only task runs matching all criteria will be returned

Source code in prefect/orion/schemas/filters.py
class TaskRunFilter(PrefectOperatorFilterBaseModel):
    """Filter task runs. Only task runs matching all criteria will be returned"""

    id: Optional[TaskRunFilterId] = Field(
        None, description="Filter criteria for `TaskRun.id`"
    )
    name: Optional[TaskRunFilterName] = Field(
        None, description="Filter criteria for `TaskRun.name`"
    )
    tags: Optional[TaskRunFilterTags] = Field(
        None, description="Filter criteria for `TaskRun.tags`"
    )
    state: Optional[TaskRunFilterState] = Field(
        None, description="Filter criteria for `TaskRun.state`"
    )
    start_time: Optional[TaskRunFilterStartTime] = Field(
        None, description="Filter criteria for `TaskRun.start_time`"
    )
    subflow_runs: Optional[TaskRunFilterSubFlowRuns] = Field(
        None, description="Filter criteria for `TaskRun.subflow_run`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []

        if self.id is not None:
            filters.append(self.id.as_sql_filter(db))
        if self.name is not None:
            filters.append(self.name.as_sql_filter(db))
        if self.tags is not None:
            filters.append(self.tags.as_sql_filter(db))
        if self.state is not None:
            filters.append(self.state.as_sql_filter(db))
        if self.start_time is not None:
            filters.append(self.start_time.as_sql_filter(db))
        if self.subflow_runs is not None:
            filters.append(self.subflow_runs.as_sql_filter(db))

        return filters

id pydantic-field

Type: TaskRunFilterId

Filter criteria for TaskRun.id

name pydantic-field

Type: TaskRunFilterName

Filter criteria for TaskRun.name

start_time pydantic-field

Type: TaskRunFilterStartTime

Filter criteria for TaskRun.start_time

state pydantic-field

Type: TaskRunFilterState

Filter criteria for TaskRun.state

subflow_runs pydantic-field

Type: TaskRunFilterSubFlowRuns

Filter criteria for TaskRun.subflow_run

tags pydantic-field

Type: TaskRunFilterTags

Filter criteria for TaskRun.tags

TaskRunFilterId pydantic-model

Filter by TaskRun.id.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterId(PrefectFilterBaseModel):
    """Filter by `TaskRun.id`."""

    any_: List[UUID] = Field(None, description="A list of task run ids to include")

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.TaskRun.id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of task run ids to include

TaskRunFilterName pydantic-model

Filter by TaskRun.name.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterName(PrefectFilterBaseModel):
    """Filter by `TaskRun.name`."""

    any_: List[str] = Field(
        None,
        description="A list of task run names to include",
        example=["my-task-run-1", "my-task-run-2"],
    )

    like_: str = Field(
        None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        example="marvin",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.TaskRun.name.in_(self.any_))
        if self.like_ is not None:
            filters.append(db.TaskRun.name.ilike(f"%{self.like_}%"))
        return filters

any_ pydantic-field

Type: List[str]

A list of task run names to include

like_ pydantic-field

Type: str

A case-insensitive partial match. For example, passing 'marvin' will match 'marvin', 'sad-Marvin', and 'marvin-robot'.

TaskRunFilterStartTime pydantic-model

Filter by TaskRun.start_time.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterStartTime(PrefectFilterBaseModel):
    """Filter by `TaskRun.start_time`."""

    before_: DateTimeTZ = Field(
        None, description="Only include task runs starting at or before this time"
    )
    after_: DateTimeTZ = Field(
        None, description="Only include task runs starting at or after this time"
    )
    is_null_: bool = Field(
        None, description="If true, only return task runs without a start time"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.before_ is not None:
            filters.append(db.TaskRun.start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.TaskRun.start_time >= self.after_)
        if self.is_null_ is not None:
            filters.append(
                db.TaskRun.start_time == None
                if self.is_null_
                else db.TaskRun.start_time != None
            )
        return filters

after_ pydantic-field

Type: DateTimeTZ

Only include task runs starting at or after this time

before_ pydantic-field

Type: DateTimeTZ

Only include task runs starting at or before this time

is_null_ pydantic-field

Type: bool

If true, only return task runs without a start time

TaskRunFilterStateName pydantic-model

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterStateName(PrefectFilterBaseModel):
    any_: List[str] = Field(
        None, description="A list of task run state names to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.TaskRun.state_name.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of task run state names to include

TaskRunFilterStateType pydantic-model

Filter by TaskRun.state_type.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterStateType(PrefectFilterBaseModel):
    """Filter by `TaskRun.state_type`."""

    any_: List[schemas.states.StateType] = Field(
        None, description="A list of task run state types to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.any_ is not None:
            filters.append(db.TaskRun.state_type.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[prefect.orion.schemas.states.StateType]

A list of task run state types to include

TaskRunFilterSubFlowRuns pydantic-model

Filter by TaskRun.subflow_run.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterSubFlowRuns(PrefectFilterBaseModel):
    """Filter by `TaskRun.subflow_run`."""

    exists_: bool = Field(
        None,
        description="If true, only include task runs that are subflow run parents; if false, exclude parent task runs",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        filters = []
        if self.exists_ is True:
            filters.append(db.TaskRun.subflow_run.has())
        elif self.exists_ is False:
            filters.append(sa.not_(db.TaskRun.subflow_run.has()))
        return filters

exists_ pydantic-field

Type: bool

If true, only include task runs that are subflow run parents; if false, exclude parent task runs

TaskRunFilterTags pydantic-model

Filter by TaskRun.tags.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `TaskRun.tags`."""

    all_: List[str] = Field(
        None,
        example=["tag-1", "tag-2"],
        description="A list of tags. Task runs will be returned only if their tags are a superset of the list",
    )
    is_null_: bool = Field(
        None, description="If true, only include task runs without tags"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        from prefect.orion.utilities.database import json_has_all_keys

        filters = []
        if self.all_ is not None:
            filters.append(json_has_all_keys(db.TaskRun.tags, self.all_))
        if self.is_null_ is not None:
            filters.append(
                db.TaskRun.tags == [] if self.is_null_ else db.TaskRun.tags != []
            )
        return filters

all_ pydantic-field

Type: List[str]

A list of tags. Task runs will be returned only if their tags are a superset of the list

is_null_ pydantic-field

Type: bool

If true, only include task runs without tags