Overview

The PrefectClient offers methods to simplify common operations against Prefect’s REST API that may not be abstracted away by the SDK.

For example, to reschedule flow runs, one might use methods like:

  • read_flow_runs with a FlowRunFilter to read certain flow runs
  • create_flow_run_from_deployment to schedule new flow runs
  • delete_flow_run to delete a very Late flow run

Getting a client

By default, get_client() returns an asynchronous client to be used as a context manager, but you may also use a synchronous client.
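
A minimal sketch of both styles follows. It assumes a reachable Prefect API and a Prefect version in which get_client accepts sync_client=True. The names hello_async and hello_sync are illustrative, and hello() is used here only as a lightweight connectivity check.

```python
async def hello_async() -> str:
    # Prefect is imported inside the function so that defining this helper
    # does not itself require a configured Prefect environment.
    from prefect import get_client

    # Default: an asynchronous client, used as an async context manager.
    async with get_client() as client:
        response = await client.hello()
        return response.text


def hello_sync() -> str:
    from prefect import get_client

    # Assumption: sync_client=True is supported by the installed version.
    with get_client(sync_client=True) as client:
        response = client.hello()
        return response.text
```

With an API available, these could be called as asyncio.run(hello_async()) from synchronous code, or simply hello_sync().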

Examples

These examples are meant to illustrate how one might develop their own utilities for interacting with the API.

If you believe a client method is missing, or you’d like to see a specific pattern better represented in the SDK generally, please open an issue.

Reschedule late flow runs

To bulk reschedule flow runs that are late, delete the late flow runs and create new ones in a Scheduled state with a delay. This is useful if you accidentally scheduled many flow runs of a deployment to an inactive work pool, for example.

The following example reschedules the last three late flow runs of a deployment named healthcheck-storage-test to run six hours later than their original expected start time. It also deletes any remaining late flow runs of that deployment.

First, define the rescheduling function:

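from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.client.schemas.filters import DeploymentFilter, FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort
from prefect.states import Scheduled


async def reschedule_late_flow_runs(
    deployment_name: str,
    delay: timedelta,
    most_recent_n: int,
    delete_remaining: bool = True,
    states: list[str] | None = None,
) -> list[FlowRun]:
    states = states or ["Late"]

    async with get_client() as client:
        # Find matching runs of the deployment whose expected start time has passed.
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=dict(name=dict(any_=states)),
                expected_start_time=dict(before_=datetime.now(timezone.utc)),
            ),
            deployment_filter=DeploymentFilter(name={'like_': deployment_name}),
            sort=FlowRunSort.START_TIME_DESC,
            # Fetch every match only when the leftovers should be deleted too.
            limit=most_recent_n if not delete_remaining else None,
        )

        rescheduled_flow_runs: list[FlowRun] = []
        for i, run in enumerate(flow_runs):
            # Every fetched run is deleted; only the first most_recent_n are recreated.
            await client.delete_flow_run(flow_run_id=run.id)
            if i < most_recent_n:
                new_run = await client.create_flow_run_from_deployment(
                    deployment_id=run.deployment_id,
                    state=Scheduled(scheduled_time=run.expected_start_time + delay),
                )
                rescheduled_flow_runs.append(new_run)

        return rescheduled_flow_runs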

Then use it to reschedule flow runs:

rescheduled_flow_runs = asyncio.run(
    reschedule_late_flow_runs(
        deployment_name="healthcheck-storage-test",
        delay=timedelta(hours=6),
        most_recent_n=3,
    )
)
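
Because reschedule_late_flow_runs deletes runs as it goes, it can help to preview the outcome on plain datetimes first. The sketch below is not part of Prefect: plan_reschedule is a hypothetical helper that mirrors the loop's arithmetic, assuming delete_remaining=True, a non-negative most_recent_n, and start times listed newest first. The sample timestamps are made up.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def plan_reschedule(
    expected_start_times: list[datetime],
    delay: timedelta,
    most_recent_n: int,
) -> tuple[list[datetime], int]:
    """Return the new start times and the count of runs deleted without replacement.

    Mirrors the loop in reschedule_late_flow_runs: every run is deleted, and
    only the first most_recent_n (in the order given) get a replacement.
    """
    new_times = [t + delay for t in expected_start_times[:most_recent_n]]
    deleted_only = max(len(expected_start_times) - most_recent_n, 0)
    return new_times, deleted_only


# Five hypothetical late runs, newest first, one hour apart.
newest = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
late = [newest - timedelta(hours=i) for i in range(5)]

new_times, deleted_only = plan_reschedule(late, timedelta(hours=6), most_recent_n=3)
print([t.isoformat() for t in new_times])  # replacement start times, newest first
print(deleted_only)  # runs that would be deleted and not recreated
```

With delete_remaining=False the query itself is limited to most_recent_n runs, so no run would be deleted without a replacement.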

Get the last N completed flow runs from your workspace

To get the last N completed flow runs from your workspace, call read_flow_runs with the filter and sort types from prefect.client.schemas.

This example gets the last three completed flow runs from your workspace:

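from __future__ import annotations

import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort


async def get_most_recent_flow_runs(
    n: int,
    states: list[str] | None = None,
) -> list[FlowRun]: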
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state={'type': {'any_': states or ["COMPLETED"]}}
            ),
            sort=FlowRunSort.END_TIME_DESC,
            limit=n,
        )

Use it to get the last three completed flow runs:

flow_runs: list[FlowRun] = asyncio.run(
    get_most_recent_flow_runs(n=3)
)

Instead of the last three from the whole workspace, you can also use the DeploymentFilter to get the last three completed flow runs of a specific deployment.
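
One way this could look is sketched below, reusing the same read_flow_runs call with a deployment_filter added. The function name get_most_recent_deployment_flow_runs is illustrative. The like_ operator is assumed to behave as a partial name match, as in the rescheduling example, so a short name may match more than one deployment.

```python
from __future__ import annotations


async def get_most_recent_deployment_flow_runs(
    deployment_name: str,
    n: int,
    states: list[str] | None = None,
):
    # Prefect is imported inside the function so that defining this helper
    # does not itself require a configured Prefect environment.
    from prefect import get_client
    from prefect.client.schemas.filters import DeploymentFilter, FlowRunFilter
    from prefect.client.schemas.sorting import FlowRunSort

    async with get_client() as client:
        return await client.read_flow_runs(
            # Same state filter as the workspace-wide example.
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": states or ["COMPLETED"]}}
            ),
            # Restrict the results to runs of the named deployment.
            deployment_filter=DeploymentFilter(name={"like_": deployment_name}),
            sort=FlowRunSort.END_TIME_DESC,
            limit=n,
        )
```

It would be called the same way as the workspace-wide version, for example asyncio.run(get_most_recent_deployment_flow_runs("healthcheck-storage-test", n=3)).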

Transition all running flows to cancelled through the Client

Use get_client to set multiple flow runs to a Cancelled state. This example cancels all flow runs that are in a Pending, Running, Scheduled, or Late state when the script is run.

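from __future__ import annotations

import asyncio

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.client.schemas.objects import FlowRun, StateType


async def list_flow_runs_with_states(states: list[str]) -> list[FlowRun]: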
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=states)
                )
            )
        )

async def cancel_flow_runs(flow_runs: list[FlowRun]):
    async with get_client() as client:
        for flow_run in flow_runs:
            state = flow_run.state.copy(
                update={"name": "Cancelled", "type": StateType.CANCELLED}
            )
            await client.set_flow_run_state(flow_run.id, state, force=True)

Cancel all pending, running, scheduled, or late flow runs:

async def bulk_cancel_flow_runs():
    states = ["Pending", "Running", "Scheduled", "Late"]
    flow_runs = await list_flow_runs_with_states(states)

    while flow_runs:
        print(f"Cancelling {len(flow_runs)} flow runs")
        await cancel_flow_runs(flow_runs)
        flow_runs = await list_flow_runs_with_states(states)

asyncio.run(bulk_cancel_flow_runs())
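
The loop above repeats until no matching flow runs remain, so it may keep going if active schedules continue to create new runs. One possible guard is to cap the number of passes. The sketch below uses hypothetical names (bulk_cancel_bounded, max_passes) and takes the listing and cancelling coroutines as parameters, so the control flow can be exercised with stand-ins instead of a live API.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def bulk_cancel_bounded(
    list_runs: Callable[[list[str]], Awaitable[list[Any]]],
    cancel_runs: Callable[[list[Any]], Awaitable[None]],
    states: list[str],
    max_passes: int = 5,
) -> int:
    """Cancel matching runs in at most max_passes passes; return how many were handled."""
    cancelled = 0
    for _ in range(max_passes):
        flow_runs = await list_runs(states)
        if not flow_runs:
            break  # nothing left to cancel
        await cancel_runs(flow_runs)
        cancelled += len(flow_runs)
    return cancelled


# Stand-ins for list_flow_runs_with_states and cancel_flow_runs.
pending = ["run-a", "run-b", "run-c"]


async def fake_list(states: list[str]) -> list[str]:
    return list(pending)


async def fake_cancel(runs: list[str]) -> None:
    for run in runs:
        pending.remove(run)


total = asyncio.run(bulk_cancel_bounded(fake_list, fake_cancel, ["Pending"]))
print(total)  # 3
```

With the functions defined earlier, the call would be asyncio.run(bulk_cancel_bounded(list_flow_runs_with_states, cancel_flow_runs, states)).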