prefect.server.api.flow_runs

Routes for interacting with flow run objects.

Functions

create_flow_run

create_flow_run(flow_run: schemas.actions.FlowRunCreate, db: PrefectDBInterface = Depends(provide_database_interface), response: Response = None, created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), worker_lookups: WorkerLookups = Depends(WorkerLookups)) -> schemas.responses.FlowRunResponse
Create a flow run. If a flow run with the same flow_id and idempotency key already exists, the existing flow run will be returned. If no state is provided, the flow run will be created in a PENDING state. For more information, see https://docs.prefect.io/v3/develop/write-flows.

update_flow_run

update_flow_run(flow_run: schemas.actions.FlowRunUpdate, flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Update a flow run.

count_flow_runs

count_flow_runs(flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> int
Count the flow runs that match the given filters.

average_flow_run_lateness

average_flow_run_lateness(flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> Optional[float]
Query for average flow-run lateness in seconds.

flow_run_history

flow_run_history(history_start: DateTime = Body(..., description="The history's start time."), history_end: DateTime = Body(..., description="The history's end time."), history_interval: float = Body(..., description='The size of each history interval, in seconds. Must be at least 1 second.', json_schema_extra={'format': 'time-delta'}, alias='history_interval_seconds'), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.responses.HistoryResponse]
Query for flow run history data across a given range and interval.

read_flow_run

read_flow_run(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> schemas.responses.FlowRunResponse
Get a flow run by id.

read_flow_run_graph_v1

read_flow_run_graph_v1(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[DependencyResult]
Get a task run dependency map for a given flow run.

read_flow_run_graph_v2

read_flow_run_graph_v2(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), since: datetime.datetime = Query(default=jsonable_encoder(earliest_possible_datetime()), description='Only include runs that start or end after this time.'), db: PrefectDBInterface = Depends(provide_database_interface)) -> Graph
Get a graph of the tasks and subflow runs for the given flow run.

resume_flow_run

resume_flow_run(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface), run_input: Optional[dict[str, Any]] = Body(default=None, embed=True), response: Response = None, flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), task_policy: type[TaskRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_task_policy), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version)) -> OrchestrationResult
Resume a paused flow run.

read_flow_runs

read_flow_runs(sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort.ID_DESC), limit: int = dependencies.LimitBody(), offset: int = Body(0, ge=0), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.responses.FlowRunResponse]
Query for flow runs.

delete_flow_run

delete_flow_run(background_tasks: BackgroundTasks, flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Delete a flow run by id.

delete_flow_run_logs

delete_flow_run_logs(db: PrefectDBInterface, flow_run_id: UUID) -> None

set_flow_run_state

set_flow_run_state(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), state: schemas.actions.StateCreate = Body(..., description='The intended state.'), force: bool = Body(False, description='If false, orchestration rules will be applied that may alter or prevent the state transition. If True, orchestration rules are not applied.'), db: PrefectDBInterface = Depends(provide_database_interface), flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), response: Response = None, api_version: str = Depends(dependencies.provide_request_api_version)) -> OrchestrationResult
Set a flow run state, invoking any orchestration rules.

create_flow_run_input

create_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), key: str = Body(..., description='The input key'), value: bytes = Body(..., description='The value of the input'), sender: Optional[str] = Body(None, description='The sender of the input'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Create a key/value input for a flow run.

filter_flow_run_input

filter_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), prefix: str = Body(..., description='The input key prefix', embed=True), limit: int = Body(1, description='The maximum number of results to return', embed=True), exclude_keys: List[str] = Body([], description='Exclude inputs with these keys', embed=True), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.core.FlowRunInput]
Filter flow run inputs by key prefix.

read_flow_run_input

read_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), key: str = Path(..., description='The input key', alias='key'), db: PrefectDBInterface = Depends(provide_database_interface)) -> PlainTextResponse
Read the value of a flow run input.

delete_flow_run_input

delete_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), key: str = Path(..., description='The input key', alias='key'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Delete a flow run input.

paginate_flow_runs

paginate_flow_runs(sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort.ID_DESC), limit: int = dependencies.LimitBody(), page: int = Body(1, ge=1), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> FlowRunPaginationResponse
Pagination query for flow runs.

download_logs

download_logs(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> StreamingResponse
Download all flow run logs as a CSV file, collecting all logs until there are no more logs to retrieve.

update_flow_run_labels

update_flow_run_labels(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), labels: Dict[str, Any] = Body(..., description='The labels to update'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Update the labels of a flow run.