To bulk reschedule flow runs that are late, delete the late flow runs and create new ones in a
Scheduled state with a delay. This is useful if you accidentally scheduled many
flow runs of a deployment to an inactive work pool, for example.
The following example reschedules the last three late flow runs of a deployment named
healthcheck-storage-test to run six hours later than their original expected start time.
It also deletes any remaining late flow runs of that deployment.
First, define the rescheduling function:
async def reschedule_late_flow_runs(
    deployment_name: str,
    delay: timedelta,
    most_recent_n: int,
    delete_remaining: bool = True,
    states: list[str] | None = None,
) -> list[FlowRun]:
    """Replace late flow runs of a deployment with newly scheduled ones.

    Reads flow runs whose state name is in ``states`` and whose expected
    start time is before the current UTC time, restricted to deployments
    whose name matches ``deployment_name`` (``like_`` filter). The first
    ``most_recent_n`` runs returned (sorted with
    ``FlowRunSort.START_TIME_DESC``) are each replaced by a new run in a
    ``Scheduled`` state at ``expected_start_time + delay``; every run that
    was read is deleted.

    Args:
        deployment_name: Value passed to the deployment name ``like_`` filter.
        delay: Offset added to each run's original expected start time.
        most_recent_n: Number of runs, from the top of the sorted result,
            to reschedule.
        delete_remaining: When true, the query is unbounded (``limit=None``)
            so matching runs beyond the first ``most_recent_n`` are deleted
            without a replacement. When false, only ``most_recent_n`` runs
            are read, so nothing else is touched.
        states: State names to match. Defaults to ``["Late"]``.

    Returns:
        The newly created flow runs, in the order they were created.
    """
    states = states or ["Late"]

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=dict(name=dict(any_=states)),
                expected_start_time=dict(before_=datetime.now(timezone.utc)),
            ),
            deployment_filter=DeploymentFilter(name={'like_': deployment_name}),
            sort=FlowRunSort.START_TIME_DESC,
            limit=most_recent_n if not delete_remaining else None,
        )

        rescheduled_flow_runs: list[FlowRun] = []
        for i, run in enumerate(flow_runs):
            if i < most_recent_n:
                # Create the replacement before deleting the late run: if
                # creation fails, the original run still exists instead of
                # being lost with nothing scheduled in its place.
                new_run = await client.create_flow_run_from_deployment(
                    deployment_id=run.deployment_id,
                    state=Scheduled(scheduled_time=run.expected_start_time + delay),
                )
                rescheduled_flow_runs.append(new_run)
            await client.delete_flow_run(flow_run_id=run.id)

        return rescheduled_flow_runs
from __future__ import annotations

import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort


async def get_most_recent_flow_runs(
    n: int, states: list[str] | None = None
) -> list[FlowRun]:
    """Read up to ``n`` flow runs, sorted with ``FlowRunSort.END_TIME_DESC``.

    Args:
        n: Value passed as the query ``limit``.
        states: State types to match. Defaults to ``["COMPLETED"]``.

    Returns:
        The flow runs returned by the client for this query.
    """
    async with get_client() as client:
        wanted_state_types = states or ["COMPLETED"]
        state_type_filter = FlowRunFilter(
            state={'type': {'any_': wanted_state_types}}
        )
        return await client.read_flow_runs(
            flow_run_filter=state_type_filter,
            sort=FlowRunSort.END_TIME_DESC,
            limit=n,
        )


if __name__ == "__main__":
    flow_runs: list[FlowRun] = asyncio.run(get_most_recent_flow_runs(n=3))

    # Demonstration checks: three runs, all completed, newest end time first.
    assert len(flow_runs) == 3
    for run in flow_runs:
        assert run.state.is_completed()
    end_times = [run.end_time for run in flow_runs]
    assert end_times == sorted(end_times, reverse=True)
Instead of the last three from the whole workspace, you can also use the DeploymentFilter
to get the last three completed flow runs of a specific deployment.
Transition all running flows to cancelled through the Client
Use get_client to set multiple runs to a Cancelled state.
This example cancels all flow runs that are in Pending, Running, Scheduled, or Late states when the script is run.
async def list_flow_runs_with_states(states: list[str]) -> list[FlowRun]:
    """Read the flow runs whose state name is any of ``states``.

    Args:
        states: State names passed to the ``any_`` state-name filter.

    Returns:
        The flow runs returned by the client for this filter.
    """
    async with get_client() as client:
        name_filter = FlowRunFilterStateName(any_=states)
        state_filter = FlowRunFilterState(name=name_filter)
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(state=state_filter)
        )


async def cancel_flow_runs(flow_runs: list[FlowRun]):
    """Set each given flow run to a state named "Cancelled".

    For every run, the current state is copied with its name and type
    overridden, and the copy is submitted with ``force=True``.

    Args:
        flow_runs: Flow runs to cancel; each must have a non-``None`` state.
    """
    async with get_client() as client:
        for run in flow_runs:
            cancelled_fields = {"name": "Cancelled", "type": StateType.CANCELLED}
            cancelled_state = run.state.copy(update=cancelled_fields)
            await client.set_flow_run_state(run.id, cancelled_state, force=True)
Cancel all pending, running, scheduled or late flows:
Create, read, or delete artifacts programmatically through the Prefect REST API.
With the Artifacts API, you can automate the creation and management of artifacts as part of your workflow.
For example, to read the five most recently created Markdown, table, and link artifacts, you can run the following: