prefect.orion.models.deployments
¶
Functions for interacting with deployment ORM objects. Intended for internal use by the Orion API.
check_work_queues_for_deployment
async
¶
Get work queues that can pick up the specified deployment.
Work queues will pick up a deployment when all of the following are met: - the deployment has ALL tags that the work queue has (i.e. the work queue's tags must be a subset of the deployment's tags.) - the work queue's specified deployment IDs match the deployment's ID, or the work queue does NOT have specified deployment IDs - the work queue's specified flow runners match the deployment's flow runner or the work queue does NOT have a specified flow runner
Notes on the query: - our database currently allows either "null" and empty lists as null values in filters, so we need to catch both cases with "or". - json_contains(A, B) should be interepreted as "True if A contains B".
Returns:
Type | Description |
---|---|
List[db.WorkQueue] |
WorkQueues |
Source code in prefect/orion/models/deployments.py
@inject_db
async def check_work_queues_for_deployment(
db: OrionDBInterface, session: sa.orm.Session, deployment_id: UUID
) -> List[schemas.core.WorkQueue]:
"""
Get work queues that can pick up the specified deployment.
Work queues will pick up a deployment when all of the following are met:
- the deployment has ALL tags that the work queue has (i.e. the work
queue's tags must be a subset of the deployment's tags.)
- the work queue's specified deployment IDs match the deployment's ID,
or the work queue does NOT have specified deployment IDs
- the work queue's specified flow runners match the deployment's flow
runner or the work queue does NOT have a specified flow runner
Notes on the query:
- our database currently allows either "null" and empty lists as
null values in filters, so we need to catch both cases with "or".
- json_contains(A, B) should be interepreted as "True if A
contains B".
Returns:
List[db.WorkQueue]: WorkQueues
"""
deployment = await session.get(db.Deployment, deployment_id)
if not deployment:
raise ObjectNotFoundError(f"Deployment with id {deployment_id} not found")
query = (
select(db.WorkQueue)
# work queue tags are a subset of deployment tags
.filter(
or_(
json_contains(deployment.tags, db.WorkQueue.filter["tags"]),
json_contains([], db.WorkQueue.filter["tags"]),
json_contains(None, db.WorkQueue.filter["tags"]),
)
)
# deployment_ids is null or contains the deployment's ID
.filter(
or_(
json_contains(
db.WorkQueue.filter["deployment_ids"],
str(deployment.id),
),
json_contains(None, db.WorkQueue.filter["deployment_ids"]),
json_contains([], db.WorkQueue.filter["deployment_ids"]),
)
)
)
result = await session.execute(query)
return result.scalars().unique().all()
count_deployments
async
¶
Count deployments.
Parameters:
Name | Description | Default |
---|---|---|
session |
A database session Session |
required |
flow_filter |
only count deployments whose flows match these criteria FlowFilter |
None |
flow_run_filter |
only count deployments whose flow runs match these criteria FlowRunFilter |
None |
task_run_filter |
only count deployments whose task runs match these criteria TaskRunFilter |
None |
deployment_filter |
only count deployment that match these filters DeploymentFilter |
None |
Returns:
Type | Description |
---|---|
int |
the number of deployments matching filters |
Source code in prefect/orion/models/deployments.py
@inject_db
async def count_deployments(
session: sa.orm.Session,
db: OrionDBInterface,
flow_filter: schemas.filters.FlowFilter = None,
flow_run_filter: schemas.filters.FlowRunFilter = None,
task_run_filter: schemas.filters.TaskRunFilter = None,
deployment_filter: schemas.filters.DeploymentFilter = None,
) -> int:
"""
Count deployments.
Args:
session: A database session
flow_filter: only count deployments whose flows match these criteria
flow_run_filter: only count deployments whose flow runs match these criteria
task_run_filter: only count deployments whose task runs match these criteria
deployment_filter: only count deployment that match these filters
Returns:
int: the number of deployments matching filters
"""
query = select(sa.func.count(sa.text("*"))).select_from(db.Deployment)
query = await _apply_deployment_filters(
query=query,
flow_filter=flow_filter,
flow_run_filter=flow_run_filter,
task_run_filter=task_run_filter,
deployment_filter=deployment_filter,
db=db,
)
result = await session.execute(query)
return result.scalar()
create_deployment
async
¶
Upserts a deployment.
Parameters:
Name | Description | Default |
---|---|---|
session |
a database session Session |
required |
deployment |
a deployment model Deployment |
required |
Returns:
Type | Description |
---|---|
db.Deployment |
the newly-created or updated deployment |
Source code in prefect/orion/models/deployments.py
@inject_db
async def create_deployment(
session: sa.orm.Session, deployment: schemas.core.Deployment, db: OrionDBInterface
):
"""Upserts a deployment.
Args:
session: a database session
deployment: a deployment model
Returns:
db.Deployment: the newly-created or updated deployment
"""
# set `updated` manually
# known limitation of `on_conflict_do_update`, will not use `Column.onupdate`
# https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#the-set-clause
deployment.updated = pendulum.now("UTC")
insert_values = deployment.dict(shallow=True, exclude_unset=True)
insert_stmt = (
(await db.insert(db.Deployment))
.values(**insert_values)
.on_conflict_do_update(
index_elements=db.deployment_unique_upsert_columns,
set_={
**deployment.dict(
shallow=True,
include={
"schedule",
"is_schedule_active",
"description",
"tags",
"parameters",
"updated",
"storage_document_id",
"infrastructure_document_id",
"manifest_path",
"path",
"entrypoint",
"infra_overrides",
},
),
},
)
)
await session.execute(insert_stmt)
query = (
sa.select(db.Deployment)
.where(
sa.and_(
db.Deployment.flow_id == deployment.flow_id,
db.Deployment.name == deployment.name,
)
)
.execution_options(populate_existing=True)
)
result = await session.execute(query)
model = result.scalar()
# because this could upsert a different schedule, delete any runs from the old
# deployment
await _delete_auto_scheduled_runs(session=session, deployment_id=model.id, db=db)
return model
delete_deployment
async
¶
Delete a deployment by id.
Parameters:
Name | Description | Default |
---|---|---|
session |
A database session Session |
required |
deployment_id |
a deployment id UUID |
required |
Returns:
Type | Description |
---|---|
bool |
whether or not the deployment was deleted |
Source code in prefect/orion/models/deployments.py
@inject_db
async def delete_deployment(
session: sa.orm.Session, deployment_id: UUID, db: OrionDBInterface
) -> bool:
"""
Delete a deployment by id.
Args:
session: A database session
deployment_id: a deployment id
Returns:
bool: whether or not the deployment was deleted
"""
# delete scheduled runs, both auto- and user- created.
delete_query = sa.delete(db.FlowRun).where(
db.FlowRun.deployment_id == deployment_id,
db.FlowRun.state_type == schemas.states.StateType.SCHEDULED.value,
)
await session.execute(delete_query)
result = await session.execute(
delete(db.Deployment).where(db.Deployment.id == deployment_id)
)
return result.rowcount > 0
read_deployment
async
¶
Reads a deployment by id.
Parameters:
Name | Description | Default |
---|---|---|
session |
A database session Session |
required |
deployment_id |
a deployment id UUID |
required |
Returns:
Type | Description |
---|---|
db.Deployment |
the deployment |
Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployment(
session: sa.orm.Session, deployment_id: UUID, db: OrionDBInterface
):
"""Reads a deployment by id.
Args:
session: A database session
deployment_id: a deployment id
Returns:
db.Deployment: the deployment
"""
return await session.get(db.Deployment, deployment_id)
read_deployment_by_name
async
¶
Reads a deployment by name.
Parameters:
Name | Description | Default |
---|---|---|
session |
A database session Session |
required |
name |
a deployment name str |
required |
flow_name |
the name of the flow the deployment belongs to str |
required |
Returns:
Type | Description |
---|---|
db.Deployment |
the deployment |
Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployment_by_name(
session: sa.orm.Session, name: str, flow_name: str, db: OrionDBInterface
):
"""Reads a deployment by name.
Args:
session: A database session
name: a deployment name
flow_name: the name of the flow the deployment belongs to
Returns:
db.Deployment: the deployment
"""
result = await session.execute(
select(db.Deployment)
.join(db.Flow, db.Deployment.flow_id == db.Flow.id)
.where(
sa.and_(
db.Flow.name == flow_name,
db.Deployment.name == name,
)
)
.limit(1)
)
return result.scalar()
read_deployments
async
¶
Read deployments.
Parameters:
Name | Description | Default |
---|---|---|
session |
A database session Session |
required |
offset |
Query offset int |
None |
limit |
Query limit int |
None |
flow_filter |
only select deployments whose flows match these criteria FlowFilter |
None |
flow_run_filter |
only select deployments whose flow runs match these criteria FlowRunFilter |
None |
task_run_filter |
only select deployments whose task runs match these criteria TaskRunFilter |
None |
deployment_filter |
only select deployment that match these filters DeploymentFilter |
None |
Returns:
Type | Description |
---|---|
List[db.Deployment] |
deployments |
Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployments(
session: sa.orm.Session,
db: OrionDBInterface,
offset: int = None,
limit: int = None,
flow_filter: schemas.filters.FlowFilter = None,
flow_run_filter: schemas.filters.FlowRunFilter = None,
task_run_filter: schemas.filters.TaskRunFilter = None,
deployment_filter: schemas.filters.DeploymentFilter = None,
):
"""
Read deployments.
Args:
session: A database session
offset: Query offset
limit: Query limit
flow_filter: only select deployments whose flows match these criteria
flow_run_filter: only select deployments whose flow runs match these criteria
task_run_filter: only select deployments whose task runs match these criteria
deployment_filter: only select deployment that match these filters
Returns:
List[db.Deployment]: deployments
"""
query = select(db.Deployment).order_by(db.Deployment.name)
query = await _apply_deployment_filters(
query=query,
flow_filter=flow_filter,
flow_run_filter=flow_run_filter,
task_run_filter=task_run_filter,
deployment_filter=deployment_filter,
db=db,
)
if offset is not None:
query = query.offset(offset)
if limit is not None:
query = query.limit(limit)
result = await session.execute(query)
return result.scalars().unique().all()
schedule_runs
async
¶
Schedule flow runs for a deployment
Parameters:
Name | Description | Default |
---|---|---|
session |
a database session Session |
required |
deployment_id |
the id of the deployment to schedule UUID |
required |
start_time |
the time from which to start scheduling runs datetime |
None |
end_time |
a limit on how far in the future runs will be scheduled datetime |
None |
max_runs |
a maximum amount of runs to schedule int |
None |
Returns:
Type | Description |
---|---|
List[uuid.UUID] |
a list of flow run ids scheduled for the deployment |
Source code in prefect/orion/models/deployments.py
async def schedule_runs(
session: sa.orm.Session,
deployment_id: UUID,
start_time: datetime.datetime = None,
end_time: datetime.datetime = None,
max_runs: int = None,
) -> List[UUID]:
"""
Schedule flow runs for a deployment
Args:
session: a database session
deployment_id: the id of the deployment to schedule
start_time: the time from which to start scheduling runs
end_time: a limit on how far in the future runs will be scheduled
max_runs: a maximum amount of runs to schedule
Returns:
a list of flow run ids scheduled for the deployment
"""
if max_runs is None:
max_runs = PREFECT_ORION_SERVICES_SCHEDULER_MAX_RUNS.value()
if start_time is None:
start_time = pendulum.now("UTC")
start_time = pendulum.instance(start_time)
if end_time is None:
end_time = start_time + (
PREFECT_ORION_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value()
)
end_time = pendulum.instance(end_time)
runs = await _generate_scheduled_flow_runs(
session=session,
deployment_id=deployment_id,
start_time=start_time,
end_time=end_time,
max_runs=max_runs,
)
return await _insert_scheduled_flow_runs(session=session, runs=runs)
update_deployment
async
¶
Updates a deployment.
Parameters:
Name | Description | Default |
---|---|---|
session |
a database session Session |
required |
deployment_id |
the ID of the deployment to modify UUID |
required |
deployment |
changes to a deployment model DeploymentUpdate |
required |
Returns:
Type | Description |
---|---|
bool |
whether the deployment was updated |
Source code in prefect/orion/models/deployments.py
@inject_db
async def update_deployment(
session: sa.orm.Session,
deployment_id: UUID,
deployment: schemas.actions.DeploymentUpdate,
db: OrionDBInterface,
) -> bool:
"""Updates a deployment.
Args:
session: a database session
deployment_id: the ID of the deployment to modify
deployment: changes to a deployment model
Returns:
bool: whether the deployment was updated
"""
# exclude_unset=True allows us to only update values provided by
# the user, ignoring any defaults on the model
update_data = deployment.dict(shallow=True, exclude_unset=True)
update_stmt = (
sa.update(db.Deployment)
.where(db.Deployment.id == deployment_id)
.values(**update_data)
)
result = await session.execute(update_stmt)
# delete any auto scheduled runs that would have reflected the old deployment config
await _delete_auto_scheduled_runs(
session=session, deployment_id=deployment_id, db=db
)
return result.rowcount > 0