Skip to content

prefect.orion.models.deployments

Functions for interacting with deployment ORM objects. Intended for internal use by the Orion API.

check_work_queues_for_deployment async

Get work queues that can pick up the specified deployment.

Work queues will pick up a deployment when all of the following are met: - the deployment has ALL tags that the work queue has (i.e. the work queue's tags must be a subset of the deployment's tags.) - the work queue's specified deployment IDs match the deployment's ID, or the work queue does NOT have specified deployment IDs - the work queue's specified flow runners match the deployment's flow runner or the work queue does NOT have a specified flow runner

Notes on the query: - our database currently allows either "null" and empty lists as null values in filters, so we need to catch both cases with "or". - json_contains(A, B) should be interepreted as "True if A contains B".

Returns:

Type Description
List[db.WorkQueue]

WorkQueues

Source code in prefect/orion/models/deployments.py
@inject_db
async def check_work_queues_for_deployment(
    db: OrionDBInterface, session: sa.orm.Session, deployment_id: UUID
) -> List[schemas.core.WorkQueue]:
    """
    Get work queues that can pick up the specified deployment.

    Work queues will pick up a deployment when all of the following are met:
    - the deployment has ALL tags that the work queue has (i.e. the work
    queue's tags must be a subset of the deployment's tags.)
    - the work queue's specified deployment IDs match the deployment's ID,
    or the work queue does NOT have specified deployment IDs
    - the work queue's specified flow runners match the deployment's flow
    runner or the work queue does NOT have a specified flow runner

    Notes on the query:
    - our database currently allows either "null" and empty lists as
    null values in filters, so we need to catch both cases with "or".
    - json_contains(A, B) should be interepreted as "True if A
    contains B".

    Returns:
        List[db.WorkQueue]: WorkQueues
    """
    deployment = await session.get(db.Deployment, deployment_id)
    if not deployment:
        raise ObjectNotFoundError(f"Deployment with id {deployment_id} not found")

    query = (
        select(db.WorkQueue)
        # work queue tags are a subset of deployment tags
        .filter(
            or_(
                json_contains(deployment.tags, db.WorkQueue.filter["tags"]),
                json_contains([], db.WorkQueue.filter["tags"]),
                json_contains(None, db.WorkQueue.filter["tags"]),
            )
        )
        # deployment_ids is null or contains the deployment's ID
        .filter(
            or_(
                json_contains(
                    db.WorkQueue.filter["deployment_ids"],
                    str(deployment.id),
                ),
                json_contains(None, db.WorkQueue.filter["deployment_ids"]),
                json_contains([], db.WorkQueue.filter["deployment_ids"]),
            )
        )
    )

    result = await session.execute(query)
    return result.scalars().unique().all()

count_deployments async

Count deployments.

Parameters:

Name Description Default
session

A database session

Session
required
flow_filter

only count deployments whose flows match these criteria

FlowFilter
None
flow_run_filter

only count deployments whose flow runs match these criteria

FlowRunFilter
None
task_run_filter

only count deployments whose task runs match these criteria

TaskRunFilter
None
deployment_filter

only count deployment that match these filters

DeploymentFilter
None

Returns:

Type Description
int

the number of deployments matching filters

Source code in prefect/orion/models/deployments.py
@inject_db
async def count_deployments(
    session: sa.orm.Session,
    db: OrionDBInterface,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
) -> int:
    """
    Count deployments.

    Args:
        session: A database session
        flow_filter: only count deployments whose flows match these criteria
        flow_run_filter: only count deployments whose flow runs match these criteria
        task_run_filter: only count deployments whose task runs match these criteria
        deployment_filter: only count deployment that match these filters

    Returns:
        int: the number of deployments matching filters
    """

    query = select(sa.func.count(sa.text("*"))).select_from(db.Deployment)

    query = await _apply_deployment_filters(
        query=query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    result = await session.execute(query)
    return result.scalar()

create_deployment async

Upserts a deployment.

Parameters:

Name Description Default
session

a database session

Session
required
deployment

a deployment model

Deployment
required

Returns:

Type Description
db.Deployment

the newly-created or updated deployment

Source code in prefect/orion/models/deployments.py
@inject_db
async def create_deployment(
    session: sa.orm.Session, deployment: schemas.core.Deployment, db: OrionDBInterface
):
    """Upserts a deployment.

    Args:
        session: a database session
        deployment: a deployment model

    Returns:
        db.Deployment: the newly-created or updated deployment

    """

    # set `updated` manually
    # known limitation of `on_conflict_do_update`, will not use `Column.onupdate`
    # https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#the-set-clause
    deployment.updated = pendulum.now("UTC")

    insert_values = deployment.dict(shallow=True, exclude_unset=True)

    insert_stmt = (
        (await db.insert(db.Deployment))
        .values(**insert_values)
        .on_conflict_do_update(
            index_elements=db.deployment_unique_upsert_columns,
            set_={
                **deployment.dict(
                    shallow=True,
                    exclude_unset=True,
                    exclude={"id", "created", "created_by"},
                ),
            },
        )
    )

    await session.execute(insert_stmt)

    query = (
        sa.select(db.Deployment)
        .where(
            sa.and_(
                db.Deployment.flow_id == deployment.flow_id,
                db.Deployment.name == deployment.name,
            )
        )
        .execution_options(populate_existing=True)
    )
    result = await session.execute(query)
    model = result.scalar()

    if model.work_queue_name:
        await models.work_queues._ensure_work_queue_exists(
            session=session, name=model.work_queue_name, db=db
        )

    # because this could upsert a different schedule, delete any runs from the old
    # deployment
    await _delete_scheduled_runs(
        session=session, deployment_id=model.id, db=db, auto_scheduled_only=True
    )

    return model

delete_deployment async

Delete a deployment by id.

Parameters:

Name Description Default
session

A database session

Session
required
deployment_id

a deployment id

UUID
required

Returns:

Type Description
bool

whether or not the deployment was deleted

Source code in prefect/orion/models/deployments.py
@inject_db
async def delete_deployment(
    session: sa.orm.Session, deployment_id: UUID, db: OrionDBInterface
) -> bool:
    """
    Delete a deployment by id.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        bool: whether or not the deployment was deleted
    """

    # delete scheduled runs, both auto- and user- created.
    await _delete_scheduled_runs(
        session=session, deployment_id=deployment_id, auto_scheduled_only=False
    )

    result = await session.execute(
        delete(db.Deployment).where(db.Deployment.id == deployment_id)
    )
    return result.rowcount > 0

read_deployment async

Reads a deployment by id.

Parameters:

Name Description Default
session

A database session

Session
required
deployment_id

a deployment id

UUID
required

Returns:

Type Description
db.Deployment

the deployment

Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployment(
    session: sa.orm.Session, deployment_id: UUID, db: OrionDBInterface
):
    """Reads a deployment by id.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        db.Deployment: the deployment
    """

    return await session.get(db.Deployment, deployment_id)

read_deployment_by_name async

Reads a deployment by name.

Parameters:

Name Description Default
session

A database session

Session
required
name

a deployment name

str
required
flow_name

the name of the flow the deployment belongs to

str
required

Returns:

Type Description
db.Deployment

the deployment

Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployment_by_name(
    session: sa.orm.Session, name: str, flow_name: str, db: OrionDBInterface
):
    """Reads a deployment by name.

    Args:
        session: A database session
        name: a deployment name
        flow_name: the name of the flow the deployment belongs to

    Returns:
        db.Deployment: the deployment
    """

    result = await session.execute(
        select(db.Deployment)
        .join(db.Flow, db.Deployment.flow_id == db.Flow.id)
        .where(
            sa.and_(
                db.Flow.name == flow_name,
                db.Deployment.name == name,
            )
        )
        .limit(1)
    )
    return result.scalar()

read_deployments async

Read deployments.

Parameters:

Name Description Default
session

A database session

Session
required
offset

Query offset

int
None
limit

Query limit

int
None
flow_filter

only select deployments whose flows match these criteria

FlowFilter
None
flow_run_filter

only select deployments whose flow runs match these criteria

FlowRunFilter
None
task_run_filter

only select deployments whose task runs match these criteria

TaskRunFilter
None
deployment_filter

only select deployment that match these filters

DeploymentFilter
None
sort

the sort criteria for selected deployments. Defaults to name ASC.

DeploymentSort
DeploymentSort.NAME_ASC

Returns:

Type Description
List[db.Deployment]

deployments

Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployments(
    session: sa.orm.Session,
    db: OrionDBInterface,
    offset: int = None,
    limit: int = None,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
    sort: schemas.sorting.DeploymentSort = schemas.sorting.DeploymentSort.NAME_ASC,
):
    """
    Read deployments.

    Args:
        session: A database session
        offset: Query offset
        limit: Query limit
        flow_filter: only select deployments whose flows match these criteria
        flow_run_filter: only select deployments whose flow runs match these criteria
        task_run_filter: only select deployments whose task runs match these criteria
        deployment_filter: only select deployment that match these filters
        sort: the sort criteria for selected deployments. Defaults to `name` ASC.

    Returns:
        List[db.Deployment]: deployments
    """

    query = select(db.Deployment).order_by(sort.as_sql_sort(db=db))

    query = await _apply_deployment_filters(
        query=query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    if offset is not None:
        query = query.offset(offset)
    if limit is not None:
        query = query.limit(limit)

    result = await session.execute(query)
    return result.scalars().unique().all()

schedule_runs async

Schedule flow runs for a deployment

Parameters:

Name Description Default
session

a database session

Session
required
deployment_id

the id of the deployment to schedule

UUID
required
start_time

the time from which to start scheduling runs

datetime
None
end_time

runs will be scheduled until at most this time

datetime
None
min_time

runs will be scheduled until at least this far in the future

timedelta
None
min_runs

a minimum amount of runs to schedule

int
None
max_runs

a maximum amount of runs to schedule

int
None

This function will generate the minimum number of runs that satisfy the min and max times, and the min and max counts. Specifically, the following order will be respected:

- Runs will be generated starting on or after the `start_time`
- No more than `max_runs` runs will be generated
- No runs will be generated after `end_time` is reached
- At least `min_runs` runs will be generated
- Runs will be generated until at least `start_time` + `min_time` is reached

Returns:

Type Description
List[uuid.UUID]

a list of flow run ids scheduled for the deployment

Source code in prefect/orion/models/deployments.py
async def schedule_runs(
    session: sa.orm.Session,
    deployment_id: UUID,
    start_time: datetime.datetime = None,
    end_time: datetime.datetime = None,
    min_time: datetime.timedelta = None,
    min_runs: int = None,
    max_runs: int = None,
    auto_scheduled: bool = True,
) -> List[UUID]:
    """
    Schedule flow runs for a deployment

    Args:
        session: a database session
        deployment_id: the id of the deployment to schedule
        start_time: the time from which to start scheduling runs
        end_time: runs will be scheduled until at most this time
        min_time: runs will be scheduled until at least this far in the future
        min_runs: a minimum amount of runs to schedule
        max_runs: a maximum amount of runs to schedule

    This function will generate the minimum number of runs that satisfy the min
    and max times, and the min and max counts. Specifically, the following order
    will be respected:

        - Runs will be generated starting on or after the `start_time`
        - No more than `max_runs` runs will be generated
        - No runs will be generated after `end_time` is reached
        - At least `min_runs` runs will be generated
        - Runs will be generated until at least `start_time` + `min_time` is reached

    Returns:
        a list of flow run ids scheduled for the deployment
    """
    if min_runs is None:
        min_runs = PREFECT_ORION_SERVICES_SCHEDULER_MIN_RUNS.value()
    if max_runs is None:
        max_runs = PREFECT_ORION_SERVICES_SCHEDULER_MAX_RUNS.value()
    if start_time is None:
        start_time = pendulum.now("UTC")
    if end_time is None:
        end_time = start_time + (
            PREFECT_ORION_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value()
        )
    if min_time is None:
        min_time = PREFECT_ORION_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME.value()

    start_time = pendulum.instance(start_time)
    end_time = pendulum.instance(end_time)

    runs = await _generate_scheduled_flow_runs(
        session=session,
        deployment_id=deployment_id,
        start_time=start_time,
        end_time=end_time,
        min_time=min_time,
        min_runs=min_runs,
        max_runs=max_runs,
        auto_scheduled=auto_scheduled,
    )
    return await _insert_scheduled_flow_runs(session=session, runs=runs)

update_deployment async

Updates a deployment.

Parameters:

Name Description Default
session

a database session

Session
required
deployment_id

the ID of the deployment to modify

UUID
required
deployment

changes to a deployment model

DeploymentUpdate
required

Returns:

Type Description
bool

whether the deployment was updated

Source code in prefect/orion/models/deployments.py
@inject_db
async def update_deployment(
    session: sa.orm.Session,
    deployment_id: UUID,
    deployment: schemas.actions.DeploymentUpdate,
    db: OrionDBInterface,
) -> bool:
    """Updates a deployment.

    Args:
        session: a database session
        deployment_id: the ID of the deployment to modify
        deployment: changes to a deployment model

    Returns:
        bool: whether the deployment was updated

    """

    # exclude_unset=True allows us to only update values provided by
    # the user, ignoring any defaults on the model
    update_data = deployment.dict(shallow=True, exclude_unset=True)

    update_stmt = (
        sa.update(db.Deployment)
        .where(db.Deployment.id == deployment_id)
        .values(**update_data)
    )
    result = await session.execute(update_stmt)

    # delete any auto scheduled runs that would have reflected the old deployment config
    await _delete_scheduled_runs(
        session=session, deployment_id=deployment_id, db=db, auto_scheduled_only=True
    )

    # create work queue if it doesn't exist
    if update_data.get("work_queue_name"):
        await models.work_queues._ensure_work_queue_exists(
            session=session, name=update_data["work_queue_name"], db=db
        )

    return result.rowcount > 0