prefect.server.database.query_components

Classes

FlowRunGraphV2Node

BaseQueryComponents

Abstract base class used to inject dialect-specific SQL operations into Prefect.

Methods:

build_json_object

build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]
Builds a JSON object from sequential key-value pairs.
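The "sequential key-value pairs" calling convention can be illustrated in plain Python. This is only a sketch of how a flat argument list groups into pairs; `pair_up` is a hypothetical helper, not part of Prefect, and the real method returns a SQL expression rather than a dict.

```python
from typing import Any


def pair_up(*args: Any) -> dict[Any, Any]:
    """Group a flat (key1, value1, key2, value2, ...) sequence into a mapping."""
    if len(args) % 2 != 0:
        raise ValueError("expected an even number of arguments (key, value, ...)")
    # Even positions hold keys; each odd position holds the value for the key before it.
    return dict(zip(args[0::2], args[1::2]))


example = pair_up("id", 1, "name", "my-flow")
```

A dialect-specific implementation would presumably pass the same alternating arguments to the database's own JSON constructor function.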

cast_to_json

cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]
Casts to a JSON object if necessary.

clear_configuration_value_cache_for_key

clear_configuration_value_cache_for_key(self, key: str) -> None
Removes a configuration key from the cache.

flow_run_graph_v2

flow_run_graph_v2(self, db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: DateTime, max_nodes: int, max_artifacts: int) -> Graph
Returns the query that selects all of the nodes and edges for a flow run graph (version 2).

get_scheduled_flow_runs_from_work_pool

get_scheduled_flow_runs_from_work_pool(self, db: PrefectDBInterface, session: AsyncSession, limit: Optional[int] = None, worker_limit: Optional[int] = None, queue_limit: Optional[int] = None, work_pool_ids: Optional[list[UUID]] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None, scheduled_after: Optional[DateTime] = None, respect_queue_priorities: bool = False) -> list[schemas.responses.WorkerFlowRunResponse]

get_scheduled_flow_runs_from_work_queues

get_scheduled_flow_runs_from_work_queues(self, db: PrefectDBInterface, limit_per_queue: Optional[int] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None) -> sa.Select[tuple[orm_models.FlowRun, UUID]]
Returns a query for all scheduled runs in work queues, subject to the provided parameters. Each result row is an (orm_models.FlowRun, orm_models.WorkQueue.id) pair: calling result.all() returns both elements of each pair, while calling result.scalars().unique().all() returns only the flow runs, because scalars() takes the first element of each row.
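The difference between the two ways of consuming the result can be mimicked with plain tuples. This is an analogy only, assuming nothing about Prefect internals: `FakeFlowRun` and the sample rows are hypothetical stand-ins for ORM objects and database rows, and one row is repeated on purpose to show deduplication.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass(frozen=True)
class FakeFlowRun:
    """Hypothetical stand-in for orm_models.FlowRun."""

    id: UUID
    name: str


queue_a, queue_b = uuid4(), uuid4()
run_1 = FakeFlowRun(uuid4(), "run-1")
run_2 = FakeFlowRun(uuid4(), "run-2")

# Each row is a (flow run, work queue id) pair.
rows = [(run_1, queue_a), (run_2, queue_b), (run_1, queue_a)]

# Analogous to result.all(): every row, both elements kept.
pairs = list(rows)

# Analogous to result.scalars().unique().all(): keep only the first element
# of each row, then drop repeats while preserving order.
flow_runs = list(dict.fromkeys(row[0] for row in rows))
```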

insert

insert(self, obj: type[orm_models.Base]) -> Union[postgresql.Insert, sqlite.Insert]
Dialect-specific insert statement.

json_arr_agg

json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]
Aggregates a JSON array.

make_timestamp_intervals

make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]
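The source gives no description for this method, so the following is only a guess at the shape of its result based on the signature: a series of (interval start, interval end) pairs. The plain-Python helper `timestamp_intervals` is hypothetical, and the boundary handling (half-open steps that stop once the start reaches `end_time`) is an assumption, not documented behavior.

```python
import datetime


def timestamp_intervals(
    start_time: datetime.datetime,
    end_time: datetime.datetime,
    interval: datetime.timedelta,
) -> list[tuple[datetime.datetime, datetime.datetime]]:
    """Return consecutive (start, end) pairs covering start_time..end_time."""
    if interval <= datetime.timedelta(0):
        raise ValueError("interval must be positive")
    intervals = []
    current = start_time
    # Assumption: stop as soon as an interval would start at or after end_time.
    while current < end_time:
        intervals.append((current, current + interval))
        current += interval
    return intervals


start = datetime.datetime(2024, 1, 1, 0, 0)
end = datetime.datetime(2024, 1, 1, 3, 0)
hourly = timestamp_intervals(start, end, datetime.timedelta(hours=1))
```

The real method returns a `sa.Select`, so the rows would be produced by the database rather than by a Python loop.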

read_configuration_value

read_configuration_value(self, db: PrefectDBInterface, session: AsyncSession, key: str) -> Optional[dict[str, Any]]
Reads a configuration value by key. Configuration values should not change at run time, so retrieved values are cached in memory. Configurations are mainly used for encrypting blocks, so caching speeds up nested block document queries.
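The caching behavior described here, together with `clear_configuration_value_cache_for_key`, resembles a read-through cache. The sketch below is a minimal illustration under that assumption; `ConfigurationCache`, `fake_loader`, and the key name are hypothetical, and the choice not to cache missing values is a guess rather than documented behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Loader = Callable[[str], Awaitable[Optional[dict[str, Any]]]]


class ConfigurationCache:
    """Hypothetical read-through cache; not Prefect's actual implementation."""

    def __init__(self, loader: Loader) -> None:
        self._loader = loader
        self._cache: dict[str, dict[str, Any]] = {}

    async def read(self, key: str) -> Optional[dict[str, Any]]:
        # Serve from memory when possible; otherwise load and remember the value.
        if key in self._cache:
            return self._cache[key]
        value = await self._loader(key)
        if value is not None:
            self._cache[key] = value
        return value

    def clear(self, key: str) -> None:
        # Forget a single key so the next read goes back to the loader.
        self._cache.pop(key, None)


load_count = 0


async def fake_loader(key: str) -> Optional[dict[str, Any]]:
    """Stand-in for a database query; counts how often it is called."""
    global load_count
    load_count += 1
    return {"value": f"secret-for-{key}"}


async def demo() -> tuple[int, int]:
    cache = ConfigurationCache(fake_loader)
    await cache.read("ENCRYPTION_KEY")
    await cache.read("ENCRYPTION_KEY")  # served from memory
    loads_before_clear = load_count
    cache.clear("ENCRYPTION_KEY")
    await cache.read("ENCRYPTION_KEY")  # loaded again after the clear
    return loads_before_clear, load_count


loads_before_clear, loads_after_clear = asyncio.run(demo())
```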

set_state_id_on_inserted_flow_runs_statement

set_state_id_on_inserted_flow_runs_statement(self, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update

unique_key

unique_key(self) -> tuple[Hashable, ...]
Returns a key used to determine whether to instantiate a new DB interface.

uses_json_strings

uses_json_strings(self) -> bool
Specifies whether the configured dialect returns JSON as strings.
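A caller might consult this flag to decide whether a JSON column value still needs decoding. The helper below is a hypothetical sketch of that decision, not code from Prefect.

```python
import json
from typing import Any


def load_json_value(value: Any, uses_json_strings: bool) -> Any:
    """Decode a JSON column value only when the dialect hands back raw strings."""
    if uses_json_strings and isinstance(value, str):
        return json.loads(value)
    # The driver already produced Python objects (or the value is NULL/None).
    return value


decoded = load_json_value('{"a": [1, 2]}', uses_json_strings=True)
untouched = load_json_value({"a": [1, 2]}, uses_json_strings=False)
```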

AsyncPostgresQueryComponents

Methods:

build_json_object

build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]
Builds a JSON object from sequential key-value pairs.

cast_to_json

cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]
Casts to a JSON object if necessary.

clear_configuration_value_cache_for_key

clear_configuration_value_cache_for_key(self, key: str) -> None
Removes a configuration key from the cache.

flow_run_graph_v2

flow_run_graph_v2(self, db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: DateTime, max_nodes: int, max_artifacts: int) -> Graph
Returns the query that selects all of the nodes and edges for a flow run graph (version 2).

get_scheduled_flow_runs_from_work_pool

get_scheduled_flow_runs_from_work_pool(self, db: PrefectDBInterface, session: AsyncSession, limit: Optional[int] = None, worker_limit: Optional[int] = None, queue_limit: Optional[int] = None, work_pool_ids: Optional[list[UUID]] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None, scheduled_after: Optional[DateTime] = None, respect_queue_priorities: bool = False) -> list[schemas.responses.WorkerFlowRunResponse]

get_scheduled_flow_runs_from_work_queues

get_scheduled_flow_runs_from_work_queues(self, db: PrefectDBInterface, limit_per_queue: Optional[int] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None) -> sa.Select[tuple[orm_models.FlowRun, UUID]]
Returns a query for all scheduled runs in work queues, subject to the provided parameters. Each result row is an (orm_models.FlowRun, orm_models.WorkQueue.id) pair: calling result.all() returns both elements of each pair, while calling result.scalars().unique().all() returns only the flow runs, because scalars() takes the first element of each row.

insert

insert(self, obj: type[orm_models.Base]) -> postgresql.Insert
Dialect-specific insert statement.

json_arr_agg

json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]
Aggregates a JSON array.

make_timestamp_intervals

make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]

read_configuration_value

read_configuration_value(self, db: PrefectDBInterface, session: AsyncSession, key: str) -> Optional[dict[str, Any]]
Reads a configuration value by key. Configuration values should not change at run time, so retrieved values are cached in memory. Configurations are mainly used for encrypting blocks, so caching speeds up nested block document queries.

set_state_id_on_inserted_flow_runs_statement

set_state_id_on_inserted_flow_runs_statement(self, db: PrefectDBInterface, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update
Given a list of flow run IDs and associated states, sets the state_id to the appropriate state for all flow runs.

unique_key

unique_key(self) -> tuple[Hashable, ...]
Returns a key used to determine whether to instantiate a new DB interface.

uses_json_strings

uses_json_strings(self) -> bool
Specifies whether the configured dialect returns JSON as strings.

UUIDList

Maps a JSON list of strings back to a list of UUIDs at the result loading stage.

Methods:

process_result_value

process_result_value(self, value: Optional[list[Union[str, UUID]]], dialect: sa.Dialect) -> Optional[list[UUID]]
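The conversion described for this type can be sketched as a standalone function with the same input and output types. `to_uuid_list` is a hypothetical name, and the real method also receives the SQLAlchemy dialect, which this sketch ignores.

```python
from typing import Optional, Union
from uuid import UUID


def to_uuid_list(value: Optional[list[Union[str, UUID]]]) -> Optional[list[UUID]]:
    """Convert a loaded JSON list of strings into UUID objects."""
    if value is None:
        # A NULL column stays None rather than becoming an empty list.
        return None
    # Items that are already UUIDs pass through unchanged.
    return [item if isinstance(item, UUID) else UUID(item) for item in value]


converted = to_uuid_list(
    ["12345678-1234-5678-1234-567812345678", UUID(int=1)]
)
```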

AioSqliteQueryComponents

Methods:

build_json_object

build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]
Builds a JSON object from sequential key-value pairs.

cast_to_json

cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]
Casts to a JSON object if necessary.

clear_configuration_value_cache_for_key

clear_configuration_value_cache_for_key(self, key: str) -> None
Removes a configuration key from the cache.

flow_run_graph_v2

flow_run_graph_v2(self, db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: DateTime, max_nodes: int, max_artifacts: int) -> Graph
Returns the query that selects all of the nodes and edges for a flow run graph (version 2).

get_scheduled_flow_runs_from_work_pool

get_scheduled_flow_runs_from_work_pool(self, db: PrefectDBInterface, session: AsyncSession, limit: Optional[int] = None, worker_limit: Optional[int] = None, queue_limit: Optional[int] = None, work_pool_ids: Optional[list[UUID]] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None, scheduled_after: Optional[DateTime] = None, respect_queue_priorities: bool = False) -> list[schemas.responses.WorkerFlowRunResponse]

get_scheduled_flow_runs_from_work_queues

get_scheduled_flow_runs_from_work_queues(self, db: PrefectDBInterface, limit_per_queue: Optional[int] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None) -> sa.Select[tuple[orm_models.FlowRun, UUID]]
Returns a query for all scheduled runs in work queues, subject to the provided parameters. Each result row is an (orm_models.FlowRun, orm_models.WorkQueue.id) pair: calling result.all() returns both elements of each pair, while calling result.scalars().unique().all() returns only the flow runs, because scalars() takes the first element of each row.

insert

insert(self, obj: type[orm_models.Base]) -> sqlite.Insert
Dialect-specific insert statement.

json_arr_agg

json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]
Aggregates a JSON array.

make_timestamp_intervals

make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]

read_configuration_value

read_configuration_value(self, db: PrefectDBInterface, session: AsyncSession, key: str) -> Optional[dict[str, Any]]
Reads a configuration value by key. Configuration values should not change at run time, so retrieved values are cached in memory. Configurations are mainly used for encrypting blocks, so caching speeds up nested block document queries.

set_state_id_on_inserted_flow_runs_statement

set_state_id_on_inserted_flow_runs_statement(self, db: PrefectDBInterface, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update
Given a list of flow run IDs and associated states, sets the state_id to the appropriate state for all flow runs.

unique_key

unique_key(self) -> tuple[Hashable, ...]
Returns a key used to determine whether to instantiate a new DB interface.

uses_json_strings

uses_json_strings(self) -> bool
Specifies whether the configured dialect returns JSON as strings.