# How to use and configure the API client

## Overview

The [`PrefectClient`](https://reference.prefect.io/prefect/client/)
offers methods to simplify common operations against Prefect's REST API that may not be abstracted away by the SDK.

For example, to [reschedule flow runs](/v3/develop/interact-with-api/#reschedule-late-flow-runs), one might use methods like:

* `read_flow_runs` with a `FlowRunFilter` to read certain flow runs
* `create_flow_run_from_deployment` to schedule new flow runs
* `delete_flow_run` to delete a very `Late` flow run

### Getting a client

By default, `get_client()` returns an asynchronous client to be used as a context manager, but you may also use a synchronous client.

<CodeGroup>
  ```python async theme={null}
  import asyncio
  from prefect import get_client

  async def main():
      async with get_client() as client:
          response = await client.hello()
          print(response.json()) # 👋

  asyncio.run(main())
  ```

  You can also use a synchronous client:

  ```python sync theme={null}
  from prefect import get_client

  with get_client(sync_client=True) as client:
      response = client.hello()
      print(response.json()) # 👋
  ```
</CodeGroup>

## Pagination and default query limits

Client methods that accept `limit` and `offset` parameters, such as `read_flow_runs`, `read_deployments`,
and `read_task_runs`, are subject to a server-side maximum.
When `limit` is `None` (the default), the server applies `PREFECT_API_DEFAULT_LIMIT`, which defaults
to `200`. To retrieve all matching records, paginate with `offset`:

```python  theme={null}
from prefect import get_client


async def read_all_deployments(page_size: int = 200):
    all_deployments = []
    offset = 0

    async with get_client() as client:
        while True:
            page = await client.read_deployments(
                limit=page_size, offset=offset
            )
            if not page:
                break
            all_deployments.extend(page)
            if len(page) < page_size:
                break
            offset += page_size

    return all_deployments
```
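The same loop can be factored into a reusable helper. The following is a minimal sketch, not part of the Prefect SDK: `read_all` and `fake_read` are hypothetical names, and the helper only assumes that the method it is given accepts `limit` and `offset` keyword arguments and returns a list.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def read_all(
    fetch_page: Callable[..., Awaitable[list[Any]]],
    page_size: int = 200,
    **filters: Any,
) -> list[Any]:
    """Collect every record from a limit/offset-style read method."""
    records: list[Any] = []
    offset = 0
    while True:
        page = await fetch_page(limit=page_size, offset=offset, **filters)
        records.extend(page)
        # A short (or empty) page means there is nothing left to fetch.
        if len(page) < page_size:
            return records
        offset += page_size


# Stand-in for a client method such as `client.read_flow_runs`,
# used here so the sketch runs without a Prefect server.
async def fake_read(limit: int, offset: int) -> list[int]:
    data = list(range(450))
    return data[offset : offset + limit]


all_records = asyncio.run(read_all(fake_read, page_size=200))
print(len(all_records))  # 450
```

With a real client, the call would presumably look like `await read_all(client.read_flow_runs, flow_run_filter=...)` inside an `async with get_client() as client:` block.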

## Configure custom headers

You can configure custom HTTP headers to be sent with every API request by setting the `PREFECT_CLIENT_CUSTOM_HEADERS` setting. This is useful for adding authentication headers, API keys, or other custom headers required by proxies, CDNs, or security systems.

### Setting custom headers

Custom headers can be configured via environment variables or settings. The headers are specified as key-value pairs in JSON format.

<CodeGroup>
  ```bash Environment variable theme={null}
  export PREFECT_CLIENT_CUSTOM_HEADERS='{"CF-Access-Client-Id": "your-client-id", "CF-Access-Client-Secret": "your-secret"}'
  ```

  ```bash CLI theme={null}
  prefect config set PREFECT_CLIENT_CUSTOM_HEADERS='{"CF-Access-Client-Id": "your-client-id", "CF-Access-Client-Secret": "your-secret"}'
  ```

  ```toml prefect.toml theme={null}
  [client]
  custom_headers = '''{
      "CF-Access-Client-Id": "your-client-id",
      "CF-Access-Client-Secret": "your-secret",
      "X-API-Key": "your-api-key"
  }'''
  ```
</CodeGroup>

<Warning>
  **Protected headers**

  Certain headers are protected and cannot be overridden by custom headers for security reasons:

  * `User-Agent` - Managed by Prefect to identify client version
  * `Prefect-Csrf-Token` - Used for CSRF protection
  * `Prefect-Csrf-Client` - Used for CSRF protection

  If you attempt to override these headers, Prefect will log a warning and ignore the custom header value.
</Warning>
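If you generate the setting value from code, it can help to build the JSON with `json.dumps` and drop protected headers up front. This is a minimal sketch under stated assumptions: `encode_custom_headers` is a hypothetical helper, not a Prefect function, and the case-insensitive comparison is an assumption rather than documented behavior.

```python
import json
import shlex

# Headers listed as protected above (compared case-insensitively; an assumption).
PROTECTED_HEADERS = {"user-agent", "prefect-csrf-token", "prefect-csrf-client"}


def encode_custom_headers(headers: dict[str, str]) -> str:
    """Return a JSON string suitable for PREFECT_CLIENT_CUSTOM_HEADERS."""
    allowed = {
        name: value
        for name, value in headers.items()
        if name.lower() not in PROTECTED_HEADERS
    }
    return json.dumps(allowed)


encoded = encode_custom_headers(
    {
        "CF-Access-Client-Id": "your-client-id",
        "CF-Access-Client-Secret": "your-secret",
        "User-Agent": "my-agent",  # protected, so it is dropped
    }
)

# Print a shell-safe export line that can be pasted into a terminal.
print(f"export PREFECT_CLIENT_CUSTOM_HEADERS={shlex.quote(encoded)}")
```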

## Examples

These examples are meant to illustrate how one might develop their own utilities for interacting with the API.

<Note>
  If you believe a client method is missing, or you'd like to see a specific pattern better represented in the SDK generally, please [open an issue](https://github.com/PrefectHQ/prefect/issues/new/choose).
</Note>

### Reschedule late flow runs

To bulk reschedule flow runs that are late, delete the late flow runs and create new ones in a
`Scheduled` state with a delay. This is useful if you accidentally scheduled many
flow runs of a deployment to an inactive work pool, for example.

The following example reschedules the last three late flow runs of a deployment named
`healthcheck-storage-test` to run six hours later than their original expected start time.
It also deletes any remaining late flow runs of that deployment.

First, define the rescheduling function:

```python  theme={null}
async def reschedule_late_flow_runs(
    deployment_name: str,
    delay: timedelta,
    most_recent_n: int,
    delete_remaining: bool = True,
    states: list[str] | None = None
) -> list[FlowRun]:
    states = states or ["Late"]

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=dict(name=dict(any_=states)),
                expected_start_time=dict(before_=datetime.now(timezone.utc)),
            ),
            deployment_filter=DeploymentFilter(name={'like_': deployment_name}),
            sort=FlowRunSort.START_TIME_DESC,
            limit=most_recent_n if not delete_remaining else None
        )

        rescheduled_flow_runs: list[FlowRun] = []
        for i, run in enumerate(flow_runs):
            await client.delete_flow_run(flow_run_id=run.id)
            if i < most_recent_n:
                new_run = await client.create_flow_run_from_deployment(
                    deployment_id=run.deployment_id,
                    state=Scheduled(scheduled_time=run.expected_start_time + delay),
                )
                rescheduled_flow_runs.append(new_run)
            
        return rescheduled_flow_runs
```

Then use it to reschedule flow runs:

```python  theme={null}
rescheduled_flow_runs = asyncio.run(
    reschedule_late_flow_runs(
        deployment_name="healthcheck-storage-test",
        delay=timedelta(hours=6),
        most_recent_n=3,
    )
)
```

<Accordion title="View the complete example">
  ```python reschedule_late_flows.py theme={null}
  from __future__ import annotations

  import asyncio
  from datetime import datetime, timedelta, timezone

  from prefect import get_client
  from prefect.client.schemas.filters import DeploymentFilter, FlowRunFilter
  from prefect.client.schemas.objects import FlowRun
  from prefect.client.schemas.sorting import FlowRunSort
  from prefect.states import Scheduled

  async def reschedule_late_flow_runs(
      deployment_name: str,
      delay: timedelta,
      most_recent_n: int,
      delete_remaining: bool = True,
      states: list[str] | None = None
  ) -> list[FlowRun]:
      states = states or ["Late"]

      async with get_client() as client:
          flow_runs = await client.read_flow_runs(
              flow_run_filter=FlowRunFilter(
                  state=dict(name=dict(any_=states)),
                  expected_start_time=dict(before_=datetime.now(timezone.utc)),
              ),
              deployment_filter=DeploymentFilter(name={'like_': deployment_name}),
              sort=FlowRunSort.START_TIME_DESC,
              limit=most_recent_n if not delete_remaining else None
          )

          if not flow_runs:
              print(f"No flow runs found in states: {states!r}")
              return []
          
          rescheduled_flow_runs: list[FlowRun] = []
          for i, run in enumerate(flow_runs):
              await client.delete_flow_run(flow_run_id=run.id)
              if i < most_recent_n:
                  new_run = await client.create_flow_run_from_deployment(
                      deployment_id=run.deployment_id,
                      state=Scheduled(scheduled_time=run.expected_start_time + delay),
                  )
                  rescheduled_flow_runs.append(new_run)
              
          return rescheduled_flow_runs


  if __name__ == "__main__":
      rescheduled_flow_runs = asyncio.run(
          reschedule_late_flow_runs(
              deployment_name="healthcheck-storage-test",
              delay=timedelta(hours=6),
              most_recent_n=3,
          )
      )
      
      print(f"Rescheduled {len(rescheduled_flow_runs)} flow runs")
      
      assert all(run.state.is_scheduled() for run in rescheduled_flow_runs)
      assert all(
          run.expected_start_time > datetime.now(timezone.utc)
          for run in rescheduled_flow_runs
      )
  ```
</Accordion>
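The scheduling arithmetic in this example can be checked without touching the API. The sketch below isolates it in a pure function; `plan_reschedule` is a hypothetical helper, and it assumes the input is sorted most recent first, as the descending sort in the example intends.

```python
from datetime import datetime, timedelta, timezone


def plan_reschedule(
    expected_start_times: list[datetime],
    delay: timedelta,
    most_recent_n: int,
) -> tuple[list[datetime], int]:
    """Return the new scheduled times and the number of runs only deleted.

    `expected_start_times` is assumed to be sorted most recent first.
    """
    new_times = [t + delay for t in expected_start_times[:most_recent_n]]
    deleted_only = max(len(expected_start_times) - most_recent_n, 0)
    return new_times, deleted_only


base = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
# Five late runs, one per hour, most recent first.
late = [base - timedelta(hours=h) for h in range(5)]

new_times, deleted_only = plan_reschedule(late, timedelta(hours=6), most_recent_n=3)
print(new_times[0].isoformat())  # 2024-01-01T18:00:00+00:00
print(deleted_only)  # 2
```

Running a dry run like this before calling `delete_flow_run` is one way to confirm how many runs would be recreated and how many would only be deleted.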

### Get the last `N` completed flow runs from your workspace

To get the last `N` completed flow runs from your workspace, use `read_flow_runs` and `prefect.client.schemas`.

This example gets the last three completed flow runs from your workspace:

```python  theme={null}
async def get_most_recent_flow_runs(
    n: int,
    states: list[str] | None = None
) -> list[FlowRun]:    
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state={'type': {'any_': states or ["COMPLETED"]}}
            ),
            sort=FlowRunSort.END_TIME_DESC,
            limit=n,
        )
```

Use it to get the last 3 completed runs:

```python  theme={null}
flow_runs: list[FlowRun] = asyncio.run(
    get_most_recent_flow_runs(n=3)
)
```

<Accordion title="View the complete example">
  ```python get_recent_flows.py theme={null}
  from __future__ import annotations

  import asyncio

  from prefect import get_client
  from prefect.client.schemas.filters import FlowRunFilter
  from prefect.client.schemas.objects import FlowRun
  from prefect.client.schemas.sorting import FlowRunSort

  async def get_most_recent_flow_runs(
      n: int,
      states: list[str] | None = None
  ) -> list[FlowRun]:    
      async with get_client() as client:
          return await client.read_flow_runs(
              flow_run_filter=FlowRunFilter(
                  state={'type': {'any_': states or ["COMPLETED"]}}
              ),
              sort=FlowRunSort.END_TIME_DESC,
              limit=n,
          )


  if __name__ == "__main__":
      flow_runs: list[FlowRun] = asyncio.run(
          get_most_recent_flow_runs(n=3)
      )
      # a workspace may have fewer than three completed runs
      assert len(flow_runs) <= 3
      
      assert all(
          run.state.is_completed() for run in flow_runs
      )
      assert (
          end_times := [run.end_time for run in flow_runs]
      ) == sorted(end_times, reverse=True)
  ```
</Accordion>

Instead of the last three from the whole workspace, you can also use the `DeploymentFilter`
to get the last three completed flow runs of a specific deployment.

### Transition all running flows to cancelled through the Client

Use `get_client` to set multiple runs to a `Cancelled` state.
This example cancels all flow runs that are in `Pending`, `Running`, `Scheduled`, or `Late` states when the script is run.

```python  theme={null}
async def list_flow_runs_with_states(states: list[str]) -> list[FlowRun]:
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=states)
                )
            )
        )

async def cancel_flow_runs(flow_runs: list[FlowRun]):
    async with get_client() as client:
        for flow_run in flow_runs:
            state = flow_run.state.copy(
                update={"name": "Cancelled", "type": StateType.CANCELLED}
            )
            await client.set_flow_run_state(flow_run.id, state, force=True)
```

Cancel all pending, running, scheduled, or late flow runs:

```python  theme={null}
async def bulk_cancel_flow_runs():
    states = ["Pending", "Running", "Scheduled", "Late"]
    flow_runs = await list_flow_runs_with_states(states)

    while flow_runs:
        print(f"Cancelling {len(flow_runs)} flow runs")
        await cancel_flow_runs(flow_runs)
        flow_runs = await list_flow_runs_with_states(states)

asyncio.run(bulk_cancel_flow_runs())
```

<Accordion title="View the complete example">
  ```python cancel_flows.py theme={null}
  import asyncio

  from prefect import get_client
  from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
  from prefect.client.schemas.objects import FlowRun, StateType

  async def list_flow_runs_with_states(states: list[str]) -> list[FlowRun]:
      async with get_client() as client:
          return await client.read_flow_runs(
              flow_run_filter=FlowRunFilter(
                  state=FlowRunFilterState(
                      name=FlowRunFilterStateName(any_=states)
                  )
              )
          )


  async def cancel_flow_runs(flow_runs: list[FlowRun]):
      async with get_client() as client:
          for idx, flow_run in enumerate(flow_runs):
              print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
              state = flow_run.state.copy(
                  update={"name": "Cancelled", "type": StateType.CANCELLED}
              )
              await client.set_flow_run_state(flow_run.id, state, force=True)


  async def bulk_cancel_flow_runs():
      states = ["Pending", "Running", "Scheduled", "Late"]
      flow_runs = await list_flow_runs_with_states(states)

      while len(flow_runs) > 0:
          print(f"Cancelling {len(flow_runs)} flow runs\n")
          await cancel_flow_runs(flow_runs)
          flow_runs = await list_flow_runs_with_states(states)
      print("Done!")


  if __name__ == "__main__":
      asyncio.run(bulk_cancel_flow_runs())
  ```
</Accordion>

### Query events with pagination

Query historical events from the Prefect API with support for filtering and pagination. This is useful for analyzing past activity, debugging issues, or building custom monitoring tools.

The following example queries events from the last hour and demonstrates how to paginate through results:

```python  theme={null}
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.events.filters import EventFilter, EventOccurredFilter


async def query_recent_events():
    async with get_client() as client:
        # query events from the last hour
        now = datetime.now(timezone.utc)
        event_filter = EventFilter(
            occurred=EventOccurredFilter(
                since=now - timedelta(hours=1),
                until=now,
            )
        )

        # get first page
        page = await client.read_events(filter=event_filter, limit=10)
        print(f"Total events: {page.total}")

        # iterate through all pages
        while page:
            for event in page.events:
                print(f"{event.occurred} - {event.event}")
            page = await page.get_next_page(client)
```

<Accordion title="View the complete example">
  ```python query_events.py theme={null}
  import asyncio
  from datetime import datetime, timedelta, timezone

  from prefect import get_client
  from prefect.events.filters import EventFilter, EventOccurredFilter


  async def query_recent_events():
      async with get_client() as client:
          # query events from the last hour
          now = datetime.now(timezone.utc)
          event_filter = EventFilter(
              occurred=EventOccurredFilter(
                  since=now - timedelta(hours=1),
                  until=now,
              )
          )

          # get first page with small limit to demonstrate pagination
          print("=== first page ===")
          event_page = await client.read_events(filter=event_filter, limit=5)
          print(f"total events: {event_page.total}")
          print(f"events on this page: {len(event_page.events)}")
          for event in event_page.events:
              print(f"  {event.occurred} - {event.event}")
          print()

          # if there are more pages, fetch the next one
          second_page = await event_page.get_next_page(client)
          if second_page:
              print("=== second page ===")
              print(f"events on this page: {len(second_page.events)}")
              for event in second_page.events:
                  print(f"  {event.occurred} - {event.event}")
              print()

          # demonstrate iterating through all pages
          print("=== collecting all events ===")
          all_events = []
          page = await client.read_events(filter=event_filter, limit=5)

          page_count = 0
          while page:
              all_events.extend(page.events)
              page_count += 1
              page = await page.get_next_page(client)

          print(f"collected {len(all_events)} events across {page_count} pages")


  if __name__ == "__main__":
      asyncio.run(query_recent_events())
  ```
</Accordion>
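The page-following loop can also be wrapped in an async generator so that callers iterate over events rather than pages. This is a minimal sketch: `iter_events` and `FakePage` are hypothetical names, and the only things assumed about a page are the `events` attribute and the `get_next_page(client)` method used in the example above.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


async def iter_events(first_page: Any, client: Any) -> AsyncIterator[Any]:
    """Yield events one at a time, following next pages as needed."""
    page = first_page
    while page:
        for event in page.events:
            yield event
        page = await page.get_next_page(client)


class FakePage:
    """Stand-in for the page object returned by `client.read_events`."""

    def __init__(self, events: list[str], next_page: Optional["FakePage"] = None):
        self.events = events
        self._next_page = next_page

    async def get_next_page(self, client: Any) -> Optional["FakePage"]:
        return self._next_page


async def collect() -> list[str]:
    pages = FakePage(["a", "b"], FakePage(["c"]))
    return [event async for event in iter_events(pages, client=None)]


collected = asyncio.run(collect())
print(collected)  # ['a', 'b', 'c']
```

Against a real server, the first page would come from `await client.read_events(filter=event_filter, limit=10)` and the same `client` would be passed to `iter_events`.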

### Create, read, or delete artifacts

Create, read, or delete artifacts programmatically through the [Prefect REST API](/v3/api-ref/rest-api/).
With the Artifacts API, you can automate the creation and management of artifacts as part of your workflow.

For example, to read the five most recently created Markdown, table, and link artifacts, you can run the following:

```python fixture:mock_post_200 theme={null}
import requests


PREFECT_API_URL = "https://api.prefect.cloud/api/accounts/abc/workspaces/xyz"
PREFECT_API_KEY = "pnu_ghijk"
data = {
    "sort": "CREATED_DESC",
    "limit": 5,
    "artifacts": {
        "key": {
            "exists_": True
        }
    }
}

headers = {"Authorization": f"Bearer {PREFECT_API_KEY}"}
endpoint = f"{PREFECT_API_URL}/artifacts/filter"

response = requests.post(endpoint, headers=headers, json=data)
assert response.status_code == 200
for artifact in response.json():
    print(artifact)
```

If you don't filter on a key, or don't require that a key exists, the response also includes results, which are a type of key-less artifact.
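To make the effect of the key filter concrete, the request body can be built by a small function that includes or omits the `exists_` condition. This is a sketch only: `artifact_filter_body` is a hypothetical helper, and the body fields are the ones used in the request above.

```python
from typing import Any


def artifact_filter_body(limit: int = 5, require_key: bool = True) -> dict[str, Any]:
    """Build a JSON body for POST {PREFECT_API_URL}/artifacts/filter."""
    body: dict[str, Any] = {"sort": "CREATED_DESC", "limit": limit}
    if require_key:
        # Only artifacts that have a key; this leaves out key-less results.
        body["artifacts"] = {"key": {"exists_": True}}
    return body


with_key = artifact_filter_body()
without_key = artifact_filter_body(require_key=False)
print(with_key)
print(without_key)
```

Passing `json=artifact_filter_body(require_key=False)` to `requests.post` would then return key-less artifacts as well.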

See the [Prefect REST API documentation](/v3/api-ref/rest-api/) on artifacts for more information.

