Skip to content

prefect.orion.models.deployments

Functions for interacting with deployment ORM objects. Intended for internal use by the Orion API.

check_work_queues_for_deployment async

Get work queues that can pick up the specified deployment.

Work queues will pick up a deployment when all of the following are met: - the deployment has ALL tags that the work queue has (i.e. the work queue's tags must be a subset of the deployment's tags.) - the work queue's specified deployment IDs match the deployment's ID, or the work queue does NOT have specified deployment IDs - the work queue's specified flow runners match the deployment's flow runner or the work queue does NOT have a specified flow runner

Notes on the query: - our database currently allows either "null" and empty lists as null values in filters, so we need to catch both cases with "or". - json_contains(A, B) should be interepreted as "True if A contains B".

Returns:

Type Description
List[db.WorkQueue]

WorkQueues

Source code in prefect/orion/models/deployments.py
@inject_db
async def check_work_queues_for_deployment(
    db: OrionDBInterface, session: sa.orm.Session, deployment_id: UUID
) -> List[schemas.core.WorkQueue]:
    """
    Get work queues that can pick up the specified deployment.

    Work queues will pick up a deployment when all of the following are met:
    - the deployment has ALL tags that the work queue has (i.e. the work
    queue's tags must be a subset of the deployment's tags.)
    - the work queue's specified deployment IDs match the deployment's ID,
    or the work queue does NOT have specified deployment IDs
    - the work queue's specified flow runners match the deployment's flow
    runner or the work queue does NOT have a specified flow runner

    Notes on the query:
    - our database currently allows either "null" and empty lists as
    null values in filters, so we need to catch both cases with "or".
    - json_contains(A, B) should be interepreted as "True if A
    contains B".

    Returns:
        List[db.WorkQueue]: WorkQueues
    """
    deployment = await session.get(db.Deployment, deployment_id)
    if not deployment:
        raise ObjectNotFoundError(f"Deployment with id {deployment_id} not found")

    query = (
        select(db.WorkQueue)
        # work queue tags are a subset of deployment tags
        .filter(
            or_(
                json_contains(deployment.tags, db.WorkQueue.filter["tags"]),
                json_contains([], db.WorkQueue.filter["tags"]),
                json_contains(None, db.WorkQueue.filter["tags"]),
            )
        )
        # deployment_ids is null or contains the deployment's ID
        .filter(
            or_(
                json_contains(
                    db.WorkQueue.filter["deployment_ids"],
                    str(deployment.id),
                ),
                json_contains(None, db.WorkQueue.filter["deployment_ids"]),
                json_contains([], db.WorkQueue.filter["deployment_ids"]),
            )
        )
    )

    result = await session.execute(query)
    return result.scalars().unique().all()

count_deployments async

Count deployments.

Parameters:

Name Description Default
session

A database session

Session
required
flow_filter

only count deployments whose flows match these criteria

FlowFilter
None
flow_run_filter

only count deployments whose flow runs match these criteria

FlowRunFilter
None
task_run_filter

only count deployments whose task runs match these criteria

TaskRunFilter
None
deployment_filter

only count deployment that match these filters

DeploymentFilter
None

Returns:

Type Description
int

the number of deployments matching filters

Source code in prefect/orion/models/deployments.py
@inject_db
async def count_deployments(
    session: sa.orm.Session,
    db: OrionDBInterface,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
) -> int:
    """
    Count deployments.

    Args:
        session: A database session
        flow_filter: only count deployments whose flows match these criteria
        flow_run_filter: only count deployments whose flow runs match these criteria
        task_run_filter: only count deployments whose task runs match these criteria
        deployment_filter: only count deployment that match these filters

    Returns:
        int: the number of deployments matching filters
    """

    query = select(sa.func.count(sa.text("*"))).select_from(db.Deployment)

    query = await _apply_deployment_filters(
        query=query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    result = await session.execute(query)
    return result.scalar()

create_deployment async

Upserts a deployment.

Parameters:

Name Description Default
session

a database session

Session
required
deployment

a deployment model

Deployment
required

Returns:

Type Description
db.Deployment

the newly-created or updated deployment

Source code in prefect/orion/models/deployments.py
@inject_db
async def create_deployment(
    session: sa.orm.Session, deployment: schemas.core.Deployment, db: OrionDBInterface
):
    """Upserts a deployment.

    Args:
        session: a database session
        deployment: a deployment model

    Returns:
        db.Deployment: the newly-created or updated deployment

    """

    # set `updated` manually
    # known limitation of `on_conflict_do_update`, will not use `Column.onupdate`
    # https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#the-set-clause
    deployment.updated = pendulum.now("UTC")

    insert_values = deployment.dict(shallow=True, exclude_unset=True)

    insert_stmt = (
        (await db.insert(db.Deployment))
        .values(**insert_values)
        .on_conflict_do_update(
            index_elements=db.deployment_unique_upsert_columns,
            set_={
                **deployment.dict(
                    shallow=True,
                    include={
                        "schedule",
                        "is_schedule_active",
                        "description",
                        "tags",
                        "parameters",
                        "updated",
                        "storage_document_id",
                        "infrastructure_document_id",
                        "manifest_path",
                        "path",
                        "entrypoint",
                        "infra_overrides",
                    },
                ),
            },
        )
    )

    await session.execute(insert_stmt)

    query = (
        sa.select(db.Deployment)
        .where(
            sa.and_(
                db.Deployment.flow_id == deployment.flow_id,
                db.Deployment.name == deployment.name,
            )
        )
        .execution_options(populate_existing=True)
    )
    result = await session.execute(query)
    model = result.scalar()

    # because this could upsert a different schedule, delete any runs from the old
    # deployment
    await _delete_auto_scheduled_runs(session=session, deployment_id=model.id, db=db)

    return model

delete_deployment async

Delete a deployment by id.

Parameters:

Name Description Default
session

A database session

Session
required
deployment_id

a deployment id

UUID
required

Returns:

Type Description
bool

whether or not the deployment was deleted

Source code in prefect/orion/models/deployments.py
@inject_db
async def delete_deployment(
    session: sa.orm.Session, deployment_id: UUID, db: OrionDBInterface
) -> bool:
    """
    Delete a deployment by id.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        bool: whether or not the deployment was deleted
    """

    # delete scheduled runs, both auto- and user- created.
    delete_query = sa.delete(db.FlowRun).where(
        db.FlowRun.deployment_id == deployment_id,
        db.FlowRun.state_type == schemas.states.StateType.SCHEDULED.value,
    )
    await session.execute(delete_query)

    result = await session.execute(
        delete(db.Deployment).where(db.Deployment.id == deployment_id)
    )
    return result.rowcount > 0

read_deployment async

Reads a deployment by id.

Parameters:

Name Description Default
session

A database session

Session
required
deployment_id

a deployment id

UUID
required

Returns:

Type Description
db.Deployment

the deployment

Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployment(
    session: sa.orm.Session, deployment_id: UUID, db: OrionDBInterface
):
    """Reads a deployment by id.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        db.Deployment: the deployment
    """

    return await session.get(db.Deployment, deployment_id)

read_deployment_by_name async

Reads a deployment by name.

Parameters:

Name Description Default
session

A database session

Session
required
name

a deployment name

str
required
flow_name

the name of the flow the deployment belongs to

str
required

Returns:

Type Description
db.Deployment

the deployment

Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployment_by_name(
    session: sa.orm.Session, name: str, flow_name: str, db: OrionDBInterface
):
    """Reads a deployment by name.

    Args:
        session: A database session
        name: a deployment name
        flow_name: the name of the flow the deployment belongs to

    Returns:
        db.Deployment: the deployment
    """

    result = await session.execute(
        select(db.Deployment)
        .join(db.Flow, db.Deployment.flow_id == db.Flow.id)
        .where(
            sa.and_(
                db.Flow.name == flow_name,
                db.Deployment.name == name,
            )
        )
        .limit(1)
    )
    return result.scalar()

read_deployments async

Read deployments.

Parameters:

Name Description Default
session

A database session

Session
required
offset

Query offset

int
None
limit

Query limit

int
None
flow_filter

only select deployments whose flows match these criteria

FlowFilter
None
flow_run_filter

only select deployments whose flow runs match these criteria

FlowRunFilter
None
task_run_filter

only select deployments whose task runs match these criteria

TaskRunFilter
None
deployment_filter

only select deployment that match these filters

DeploymentFilter
None

Returns:

Type Description
List[db.Deployment]

deployments

Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployments(
    session: sa.orm.Session,
    db: OrionDBInterface,
    offset: int = None,
    limit: int = None,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
):
    """
    Read deployments.

    Args:
        session: A database session
        offset: Query offset
        limit: Query limit
        flow_filter: only select deployments whose flows match these criteria
        flow_run_filter: only select deployments whose flow runs match these criteria
        task_run_filter: only select deployments whose task runs match these criteria
        deployment_filter: only select deployment that match these filters


    Returns:
        List[db.Deployment]: deployments
    """

    query = select(db.Deployment).order_by(db.Deployment.name)

    query = await _apply_deployment_filters(
        query=query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    if offset is not None:
        query = query.offset(offset)
    if limit is not None:
        query = query.limit(limit)

    result = await session.execute(query)
    return result.scalars().unique().all()

schedule_runs async

Schedule flow runs for a deployment

Parameters:

Name Description Default
session

a database session

Session
required
deployment_id

the id of the deployment to schedule

UUID
required
start_time

the time from which to start scheduling runs

datetime
None
end_time

a limit on how far in the future runs will be scheduled

datetime
None
max_runs

a maximum amount of runs to schedule

int
None

Returns:

Type Description
List[uuid.UUID]

a list of flow run ids scheduled for the deployment

Source code in prefect/orion/models/deployments.py
async def schedule_runs(
    session: sa.orm.Session,
    deployment_id: UUID,
    start_time: datetime.datetime = None,
    end_time: datetime.datetime = None,
    max_runs: int = None,
) -> List[UUID]:
    """
    Schedule flow runs for a deployment

    Args:
        session: a database session
        deployment_id: the id of the deployment to schedule
        start_time: the time from which to start scheduling runs
        end_time: a limit on how far in the future runs will be scheduled
        max_runs: a maximum amount of runs to schedule

    Returns:
        a list of flow run ids scheduled for the deployment
    """
    if max_runs is None:
        max_runs = PREFECT_ORION_SERVICES_SCHEDULER_MAX_RUNS.value()
    if start_time is None:
        start_time = pendulum.now("UTC")
    start_time = pendulum.instance(start_time)
    if end_time is None:
        end_time = start_time + (
            PREFECT_ORION_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value()
        )
    end_time = pendulum.instance(end_time)

    runs = await _generate_scheduled_flow_runs(
        session=session,
        deployment_id=deployment_id,
        start_time=start_time,
        end_time=end_time,
        max_runs=max_runs,
    )
    return await _insert_scheduled_flow_runs(session=session, runs=runs)

update_deployment async

Updates a deployment.

Parameters:

Name Description Default
session

a database session

Session
required
deployment_id

the ID of the deployment to modify

UUID
required
deployment

changes to a deployment model

DeploymentUpdate
required

Returns:

Type Description
bool

whether the deployment was updated

Source code in prefect/orion/models/deployments.py
@inject_db
async def update_deployment(
    session: sa.orm.Session,
    deployment_id: UUID,
    deployment: schemas.actions.DeploymentUpdate,
    db: OrionDBInterface,
) -> bool:
    """Updates a deployment.

    Args:
        session: a database session
        deployment_id: the ID of the deployment to modify
        deployment: changes to a deployment model

    Returns:
        bool: whether the deployment was updated

    """

    # exclude_unset=True allows us to only update values provided by
    # the user, ignoring any defaults on the model
    update_data = deployment.dict(shallow=True, exclude_unset=True)

    update_stmt = (
        sa.update(db.Deployment)
        .where(db.Deployment.id == deployment_id)
        .values(**update_data)
    )
    result = await session.execute(update_stmt)

    # delete any auto scheduled runs that would have reflected the old deployment config
    await _delete_auto_scheduled_runs(
        session=session, deployment_id=deployment_id, db=db
    )

    return result.rowcount > 0