Skip to content

prefect.server.models.flows

Functions for interacting with flow ORM objects. Intended for internal use by the Prefect REST API.

count_flows async

Count flows.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
flow_filter FlowFilter

only count flows that match these filters

None
flow_run_filter FlowRunFilter

only count flows whose flow runs match these filters

None
task_run_filter TaskRunFilter

only count flows whose task runs match these filters

None
deployment_filter DeploymentFilter

only count flows whose deployments match these filters

None
work_pool_filter WorkPoolFilter

only count flows whose work pools match these filters

None

Returns:

Name Type Description
int int

count of flows

Source code in prefect/server/models/flows.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
@db_injector
async def count_flows(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
    work_pool_filter: schemas.filters.WorkPoolFilter = None,
) -> int:
    """
    Count the flows selected by the supplied filters.

    Builds a `COUNT(*)` statement over the `Flow` ORM model, passes it through
    `_apply_flow_filters` together with every filter argument, and executes
    the resulting statement on `session`.

    Args:
        db: database interface from which the `Flow` ORM model is read
        session: A database session
        flow_filter: only count flows that match these filters
        flow_run_filter: only count flows whose flow runs match these filters
        task_run_filter: only count flows whose task runs match these filters
        deployment_filter: only count flows whose deployments match these filters
        work_pool_filter: only count flows whose work pools match these filters

    Returns:
        int: count of flows
    """
    # Unfiltered statement: COUNT(*) selected from the flow table.
    row_count = sa.func.count(sa.text("*"))
    unfiltered_stmt = select(row_count).select_from(db.Flow)

    # All filters are forwarded unchanged; a filter left as None is passed
    # through as None and handled by the helper.
    filters = {
        "flow_filter": flow_filter,
        "flow_run_filter": flow_run_filter,
        "task_run_filter": task_run_filter,
        "deployment_filter": deployment_filter,
        "work_pool_filter": work_pool_filter,
    }
    count_stmt = await _apply_flow_filters(unfiltered_stmt, **filters)

    outcome = await session.execute(count_stmt)
    return outcome.scalar()

create_flow async

Creates a new flow.

If a flow with the same name already exists, the existing flow is returned.

Parameters:

Name Type Description Default
session AsyncSession

a database session

required
flow Flow

a flow model

required

Returns:

Type Description
ORMFlow

db.Flow: the newly-created or existing flow

Source code in prefect/server/models/flows.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@db_injector
async def create_flow(
    db: PrefectDBInterface, session: AsyncSession, flow: schemas.core.Flow
) -> ORMFlow:
    """
    Insert a flow, or fetch the one that is already stored under its name.

    The insert is issued with "on conflict do nothing" against
    `db.flow_unique_upsert_columns`, so a conflicting row is left untouched.
    Afterwards the flow is read back by name and returned, which yields either
    the freshly inserted row or the pre-existing one.

    Args:
        db: database interface providing the `Flow` model, the dialect
            `insert` construct and the upsert conflict columns
        session: a database session
        flow: a flow model

    Returns:
        db.Flow: the newly-created or existing flow
    """
    # Step 1: conflict-tolerant insert. Only fields explicitly set on the
    # incoming model are written (exclude_unset=True).
    bare_insert = db.insert(db.Flow)
    column_values = flow.dict(shallow=True, exclude_unset=True)
    upsert_stmt = bare_insert.values(**column_values).on_conflict_do_nothing(
        index_elements=db.flow_unique_upsert_columns,
    )
    await session.execute(upsert_stmt)

    # Step 2: read the row back by name. populate_existing=True is set so an
    # ORM object already held by the session is refreshed from this read.
    lookup_stmt = sa.select(db.Flow).where(
        db.Flow.name == flow.name,
    )
    lookup_stmt = lookup_stmt.limit(1).execution_options(populate_existing=True)
    lookup_result = await session.execute(lookup_stmt)
    return lookup_result.scalar()

delete_flow async

Delete a flow by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
flow_id UUID

a flow id

required

Returns:

Name Type Description
bool bool

whether or not the flow was deleted

Source code in prefect/server/models/flows.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
@db_injector
async def delete_flow(
    db: PrefectDBInterface, session: AsyncSession, flow_id: UUID
) -> bool:
    """
    Remove the flow with the given id.

    Args:
        db: database interface from which the `Flow` ORM model is read
        session: A database session
        flow_id: a flow id

    Returns:
        bool: whether or not the flow was deleted
    """
    deletion_stmt = delete(db.Flow).where(db.Flow.id == flow_id)
    outcome = await session.execute(deletion_stmt)

    # The statement's affected-row count tells us whether anything matched.
    rows_removed = outcome.rowcount
    return rows_removed > 0

read_flow async

Reads a flow by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
flow_id UUID

a flow id

required

Returns:

Type Description
Optional[ORMFlow]

db.Flow: the flow, or None if no flow with the given id exists

Source code in prefect/server/models/flows.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@db_injector
async def read_flow(
    db: PrefectDBInterface, session: AsyncSession, flow_id: UUID
) -> Optional[ORMFlow]:
    """
    Look up a single flow by its id.

    Args:
        db: database interface from which the `Flow` ORM model is read
        session: A database session
        flow_id: a flow id

    Returns:
        db.Flow: the flow; the return annotation is Optional, so callers
            should be prepared for None when nothing matches
    """
    # Primary-key style lookup delegated entirely to the session.
    flow_or_none = await session.get(db.Flow, flow_id)
    return flow_or_none

read_flow_by_name async

Reads a flow by name.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
name str

a flow name

required

Returns:

Type Description
Optional[ORMFlow]

db.Flow: the flow, or None if no flow with the given name exists

Source code in prefect/server/models/flows.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@db_injector
async def read_flow_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str
) -> Optional[ORMFlow]:
    """
    Look up a single flow by its name.

    Args:
        db: database interface from which the `Flow` ORM model is read
        session: A database session
        name: a flow name

    Returns:
        db.Flow: the flow; the return annotation is Optional, so callers
            should be prepared for None when nothing matches
    """
    by_name_stmt = select(db.Flow).filter_by(name=name)
    rows = await session.execute(by_name_stmt)
    # Only the first result (if any) is handed back.
    return rows.scalar()

read_flows async

Read multiple flows.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
flow_filter FlowFilter

only select flows that match these filters

None
flow_run_filter FlowRunFilter

only select flows whose flow runs match these filters

None
task_run_filter TaskRunFilter

only select flows whose task runs match these filters

None
deployment_filter DeploymentFilter

only select flows whose deployments match these filters

None
work_pool_filter WorkPoolFilter

only select flows whose work pools match these filters

None
sort FlowSort

Sort order applied to the returned flows

NAME_ASC
offset int

Query offset

None
limit int

Query limit

None

Returns:

Type Description
Sequence[ORMFlow]

List[db.Flow]: flows

Source code in prefect/server/models/flows.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
@db_injector
async def read_flows(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
    work_pool_filter: schemas.filters.WorkPoolFilter = None,
    sort: schemas.sorting.FlowSort = schemas.sorting.FlowSort.NAME_ASC,
    offset: int = None,
    limit: int = None,
) -> Sequence[ORMFlow]:
    """
    Fetch the flows selected by the supplied filters, sorted and paginated.

    Args:
        db: database interface from which the `Flow` ORM model is read
        session: A database session
        flow_filter: only select flows that match these filters
        flow_run_filter: only select flows whose flow runs match these filters
        task_run_filter: only select flows whose task runs match these filters
        deployment_filter: only select flows whose deployments match these filters
        work_pool_filter: only select flows whose work pools match these filters
        sort: ordering for the query, converted with `sort.as_sql_sort`;
            defaults to `FlowSort.NAME_ASC`
        offset: Query offset; not applied when None
        limit: Query limit; not applied when None

    Returns:
        List[db.Flow]: flows
    """
    # Sorted, unfiltered statement over the flow table.
    ordering = sort.as_sql_sort(db=db)
    stmt = select(db.Flow).order_by(ordering)

    # All filters are forwarded unchanged to the shared helper.
    filters = {
        "flow_filter": flow_filter,
        "flow_run_filter": flow_run_filter,
        "task_run_filter": task_run_filter,
        "deployment_filter": deployment_filter,
        "work_pool_filter": work_pool_filter,
    }
    stmt = await _apply_flow_filters(stmt, **filters)

    # Pagination is opt-in: an explicit 0 is still applied, only None is skipped.
    if offset is not None:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    rows = await session.execute(stmt)
    # De-duplicate the ORM objects before materialising the result.
    distinct_flows = rows.scalars().unique()
    return distinct_flows.all()

update_flow async

Updates a flow.

Parameters:

Name Type Description Default
session AsyncSession

a database session

required
flow_id UUID

the flow id to update

required
flow FlowUpdate

a flow update model

required

Returns:

Name Type Description
bool bool

whether or not matching rows were found to update

Source code in prefect/server/models/flows.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@db_injector
async def update_flow(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_id: UUID,
    flow: schemas.actions.FlowUpdate,
) -> bool:
    """
    Apply an update model to the flow with the given id.

    Args:
        db: database interface from which the `Flow` ORM model is read
        session: a database session
        flow_id: the flow id to update
        flow: a flow update model

    Returns:
        bool: whether or not matching rows were found to update
    """
    target_rows = sa.update(db.Flow).where(db.Flow.id == flow_id)

    # Only fields the caller explicitly provided are written; model defaults
    # that were never set are left out thanks to exclude_unset=True.
    provided_values = flow.dict(shallow=True, exclude_unset=True)
    statement = target_rows.values(**provided_values)

    outcome = await session.execute(statement)
    rows_touched = outcome.rowcount
    return rows_touched > 0