The PrefectClient
offers methods to simplify common operations against Prefect’s REST API that may not be abstracted away by the SDK. For example, to reschedule flow runs, one might use methods like:
read_flow_runs with a FlowRunFilter to read certain flow runs
create_flow_run_from_deployment to schedule new flow runs
Client methods that accept limit and offset parameters — such as read_flow_runs, read_deployments,
and read_task_runs — are subject to a server-side maximum.
When limit is None (the default), the server applies PREFECT_API_DEFAULT_LIMIT, which defaults
to 200. To retrieve all matching records, paginate with offset:
from prefect import get_client


async def read_all_deployments(page_size: int = 200):
    """Collect every deployment by paging past the server-side result limit.

    Args:
        page_size: Number of records to request per API call.

    Returns:
        A list containing all deployments visible to the client.
    """
    collected = []
    cursor = 0
    async with get_client() as client:
        while True:
            batch = await client.read_deployments(
                limit=page_size,
                offset=cursor,
            )
            if not batch:
                break
            collected.extend(batch)
            # A short page means the final page was reached; skip the extra round trip.
            if len(batch) < page_size:
                break
            cursor += page_size
    return collected
You can configure custom HTTP headers to be sent with every API request by setting the PREFECT_CLIENT_CUSTOM_HEADERS setting. This is useful for adding authentication headers, API keys, or other custom headers required by proxies, CDNs, or security systems.
To bulk reschedule flow runs that are late, delete the late flow runs and create new ones in a
Scheduled state with a delay. This is useful if you accidentally scheduled many
flow runs of a deployment to an inactive work pool, for example. The following example reschedules the last three late flow runs of a deployment named
healthcheck-storage-test to run six hours later than their original expected start time.
It also deletes any remaining late flow runs of that deployment. First, define the rescheduling function:
async def reschedule_late_flow_runs(
    deployment_name: str,
    delay: timedelta,
    most_recent_n: int,
    delete_remaining: bool = True,
    states: list[str] | None = None
) -> list[FlowRun]:
    """Reschedule a deployment's late flow runs to run ``delay`` later.

    Args:
        deployment_name: Substring match (``like_``) against deployment names.
        delay: How far past the original expected start time to reschedule.
        most_recent_n: Number of most recent matching runs to recreate.
        delete_remaining: If True, also fetch and delete matching runs beyond
            the first ``most_recent_n`` without rescheduling them.
        states: State names to match; defaults to ["Late"].

    Returns:
        The newly created (rescheduled) flow runs.
    """
    states = states or ["Late"]
    async with get_client() as client:
        # Only runs whose expected start time is already in the past qualify.
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=dict(name=dict(any_=states)),
                expected_start_time=dict(before_=datetime.now(timezone.utc)),
            ),
            deployment_filter=DeploymentFilter(name={'like_': deployment_name}),
            sort=FlowRunSort.START_TIME_DESC,
            # No limit when deleting the remainder, so every match is fetched.
            limit=most_recent_n if not delete_remaining else None
        )
        rescheduled_flow_runs: list[FlowRun] = []
        for i, run in enumerate(flow_runs):
            # Every fetched run is deleted; only the newest most_recent_n
            # (list is sorted newest-first) are recreated with the delay.
            await client.delete_flow_run(flow_run_id=run.id)
            if i < most_recent_n:
                new_run = await client.create_flow_run_from_deployment(
                    deployment_id=run.deployment_id,
                    state=Scheduled(scheduled_time=run.expected_start_time + delay),
                )
                rescheduled_flow_runs.append(new_run)
        return rescheduled_flow_runs
Get the last N completed flow runs from your workspace
To get the last N completed flow runs from your workspace, use read_flow_runs and prefect.client.schemas. This example gets the last three completed flow runs from your workspace:
async def get_most_recent_flow_runs(
    n: int, states: list[str] | None = None
) -> list[FlowRun]:
    """Return the ``n`` flow runs that ended most recently.

    Args:
        n: Maximum number of runs to return.
        states: State *types* to match; defaults to ["COMPLETED"].
    """
    wanted_types = states or ["COMPLETED"]
    run_filter = FlowRunFilter(state={'type': {'any_': wanted_types}})
    async with get_client() as client:
        # Newest end time first, capped at n results.
        return await client.read_flow_runs(
            flow_run_filter=run_filter,
            sort=FlowRunSort.END_TIME_DESC,
            limit=n,
        )
from __future__ import annotationsimport asynciofrom prefect import get_clientfrom prefect.client.schemas.filters import FlowRunFilterfrom prefect.client.schemas.objects import FlowRunfrom prefect.client.schemas.sorting import FlowRunSortasync def get_most_recent_flow_runs( n: int, states: list[str] | None = None) -> list[FlowRun]: async with get_client() as client: return await client.read_flow_runs( flow_run_filter=FlowRunFilter( state={'type': {'any_': states or ["COMPLETED"]}} ), sort=FlowRunSort.END_TIME_DESC, limit=n, )if __name__ == "__main__": flow_runs: list[FlowRun] = asyncio.run( get_most_recent_flow_runs(n=3) ) assert len(flow_runs) == 3 assert all( run.state.is_completed() for run in flow_runs ) assert ( end_times := [run.end_time for run in flow_runs] ) == sorted(end_times, reverse=True)
Instead of the last three from the whole workspace, you can also use the DeploymentFilter
to get the last three completed flow runs of a specific deployment.
Transition all running flows to cancelled through the Client
Use get_client to set multiple runs to a Cancelled state.
This example cancels all flow runs that are in Pending, Running, Scheduled, or Late states when the script is run.
async def list_flow_runs_with_states(states: list[str]) -> list[FlowRun]:
    """Return all flow runs whose state *name* matches one of ``states``.

    Args:
        states: State names to match, e.g. ["Pending", "Running"].
    """
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=states)
                )
            )
        )


async def cancel_flow_runs(flow_runs: list[FlowRun]):
    """Transition each given flow run to a Cancelled state.

    Args:
        flow_runs: The runs to cancel, e.g. from list_flow_runs_with_states.
    """
    async with get_client() as client:
        for flow_run in flow_runs:
            # Derive the new state from the run's current state, overriding
            # only the name and type.
            state = flow_run.state.copy(
                update={"name": "Cancelled", "type": StateType.CANCELLED}
            )
            # force=True — presumably bypasses orchestration rules that could
            # reject the transition; confirm against the client docs.
            await client.set_flow_run_state(flow_run.id, state, force=True)
Cancel all pending, running, scheduled or late flows:
Query historical events from the Prefect API with support for filtering and pagination. This is useful for analyzing past activity, debugging issues, or building custom monitoring tools. The following example queries events from the last hour and demonstrates how to paginate through results:
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.events.filters import EventFilter, EventOccurredFilter


async def query_recent_events():
    """Print every event from the last hour, walking all result pages."""
    async with get_client() as client:
        right_now = datetime.now(timezone.utc)
        # Restrict the query window to the most recent hour.
        last_hour = EventOccurredFilter(
            since=right_now - timedelta(hours=1),
            until=right_now,
        )
        current_page = await client.read_events(
            filter=EventFilter(occurred=last_hour), limit=10
        )
        print(f"Total events: {current_page.total}")
        # Keep fetching until get_next_page returns a falsy page.
        while current_page:
            for event in current_page.events:
                print(f"{event.occurred} - {event.event}")
            current_page = await current_page.get_next_page(client)
View the complete example
query_events.py
import asyncio
from datetime import datetime, timedelta, timezone
from prefect import get_client
from prefect.events.filters import EventFilter, EventOccurredFilter


async def query_recent_events():
    """Demonstrate querying and paginating Prefect events from the last hour."""
    async with get_client() as client:
        # query events from the last hour
        now = datetime.now(timezone.utc)
        event_filter = EventFilter(
            occurred=EventOccurredFilter(
                since=now - timedelta(hours=1),
                until=now,
            )
        )

        # get first page with small limit to demonstrate pagination
        print("=== first page ===")
        event_page = await client.read_events(filter=event_filter, limit=5)
        print(f"total events: {event_page.total}")
        print(f"events on this page: {len(event_page.events)}")
        for event in event_page.events:
            print(f" {event.occurred} - {event.event}")
        print()

        # if there are more pages, fetch the next one
        second_page = await event_page.get_next_page(client)
        if second_page:
            print("=== second page ===")
            print(f"events on this page: {len(second_page.events)}")
            for event in second_page.events:
                print(f" {event.occurred} - {event.event}")
            print()

        # demonstrate iterating through all pages by re-running the query
        # from the start and accumulating every event
        print("=== collecting all events ===")
        all_events = []
        page = await client.read_events(filter=event_filter, limit=5)
        page_count = 0
        while page:
            all_events.extend(page.events)
            page_count += 1
            page = await page.get_next_page(client)
        print(f"collected {len(all_events)} events across {page_count} pages")


if __name__ == "__main__":
    asyncio.run(query_recent_events())
Create, read, or delete artifacts programmatically through the Prefect REST API.
With the Artifacts API, you can automate the creation and management of artifacts as part of your workflow. For example, to read the five most recently created Markdown, table, and link artifacts, you can run the following:
If you don’t specify a key (or require that a key exists), the query will also return results, which are a type of key-less artifact. See the Prefect REST API documentation on artifacts for more information.