Documentation Index: Fetch the complete documentation index at https://docs.prefect.io/llms.txt.
Use this file to discover all available pages before exploring further.
prefect.server.api.flow_runs
Routes for interacting with flow run objects.
Functions
create_flow_run
create_flow_run(flow_run: schemas.actions.FlowRunCreate, db: PrefectDBInterface = Depends(provide_database_interface), response: Response = None , created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by), orchestration_parameters: Dict[ str , Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), worker_lookups: WorkerLookups = Depends(WorkerLookups)) -> schemas.responses.FlowRunResponse
Create a flow run. If a flow run with the same flow_id and
idempotency key already exists, the existing flow run will be returned.
If no state is provided, the flow run will be created in a PENDING state.
For more information, see https://docs.prefect.io/v3/concepts/flows.
update_flow_run
update_flow_run(flow_run: schemas.actions.FlowRunUpdate, flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Updates a flow run.
count_flow_runs
count_flow_runs(flows: Optional[schemas.filters.FlowFilter] = None , flow_runs: Optional[schemas.filters.FlowRunFilter] = None , task_runs: Optional[schemas.filters.TaskRunFilter] = None , deployments: Optional[schemas.filters.DeploymentFilter] = None , work_pools: Optional[schemas.filters.WorkPoolFilter] = None , work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None , db: PrefectDBInterface = Depends(provide_database_interface)) -> int
Count the flow runs that match the given filters.
average_flow_run_lateness
average_flow_run_lateness(flows: Optional[schemas.filters.FlowFilter] = None , flow_runs: Optional[schemas.filters.FlowRunFilter] = None , task_runs: Optional[schemas.filters.TaskRunFilter] = None , deployments: Optional[schemas.filters.DeploymentFilter] = None , work_pools: Optional[schemas.filters.WorkPoolFilter] = None , work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None , db: PrefectDBInterface = Depends(provide_database_interface)) -> Optional[ float ]
Query for average flow-run lateness in seconds.
flow_run_history
flow_run_history(history_start: DateTime = Body( ... , description = "The history's start time." ), history_end: DateTime = Body( ... , description = "The history's end time." ), history_interval_seconds: float = Body( ... , description = 'The size of each history interval, in seconds. Must be at least 1 second.' , json_schema_extra = { 'format' : 'time-delta' }), flows: Optional[schemas.filters.FlowFilter] = None , flow_runs: Optional[schemas.filters.FlowRunFilter] = None , task_runs: Optional[schemas.filters.TaskRunFilter] = None , deployments: Optional[schemas.filters.DeploymentFilter] = None , work_pools: Optional[schemas.filters.WorkPoolFilter] = None , work_queues: Optional[schemas.filters.WorkQueueFilter] = None , db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.responses.HistoryResponse]
Query for flow run history data across a given range and interval.
read_flow_run
read_flow_run(flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> schemas.responses.FlowRunResponse
Get a flow run by id.
read_flow_run_graph_v1
read_flow_run_graph_v1(flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[DependencyResult]
Get a task run dependency map for a given flow run.
read_flow_run_graph_v2
read_flow_run_graph_v2(flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), since: datetime.datetime = Query( default = jsonable_encoder(earliest_possible_datetime()), description = 'Only include runs that start or end after this time.' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> Graph
Get a graph of the tasks and subflow runs for the given flow run.
resume_flow_run
resume_flow_run(response: Response, flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), db: PrefectDBInterface = Depends(provide_database_interface), run_input: Optional[dict[ str , Any]] = Body( default = None , embed = True ), flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), task_policy: type[TaskRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_task_policy), orchestration_parameters: Dict[ str , Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), client_version: Optional[ str ] = Depends(dependencies.get_prefect_client_version)) -> OrchestrationResult
Resume a paused flow run.
read_flow_runs
read_flow_runs(sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort. ID_DESC ), limit: int = dependencies.LimitBody(), offset: int = Body( 0 , ge = 0 ), flows: Optional[schemas.filters.FlowFilter] = None , flow_runs: Optional[schemas.filters.FlowRunFilter] = None , task_runs: Optional[schemas.filters.TaskRunFilter] = None , deployments: Optional[schemas.filters.DeploymentFilter] = None , work_pools: Optional[schemas.filters.WorkPoolFilter] = None , work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None , db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.responses.FlowRunResponse]
Query for flow runs.
delete_flow_run
delete_flow_run(docket: dependencies.Docket, flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Delete a flow run by id.
delete_flow_run_logs
delete_flow_run_logs() -> None
bulk_delete_flow_runs
bulk_delete_flow_runs(docket: dependencies.Docket, flow_runs: Optional[schemas.filters.FlowRunFilter] = Body( None , description = 'Filter criteria for flow runs to delete' ), limit: int = Body( BULK_OPERATION_LIMIT , ge = 1 , le = BULK_OPERATION_LIMIT , description = f'Maximum number of flow runs to delete. Defaults to {BULK_OPERATION_LIMIT}.' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> FlowRunBulkDeleteResponse
Bulk delete flow runs matching the specified filter criteria.
Returns the IDs of flow runs that were deleted.
bulk_set_flow_run_state
bulk_set_flow_run_state(flow_runs: Optional[schemas.filters.FlowRunFilter] = Body(None, description='Filter criteria for flow runs to update'), state: schemas.actions.StateCreate = Body(..., description='The state to set'), force: bool = Body(False, description='If false, orchestration rules will be applied that may alter or prevent the state transition. If True, orchestration rules are not applied.'), limit: int = Body(BULK_OPERATION_LIMIT, ge=1, le=BULK_OPERATION_LIMIT, description=f'Maximum number of flow runs to update. Defaults to {BULK_OPERATION_LIMIT}.'), db: PrefectDBInterface = Depends(provide_database_interface), flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), client_version: Optional[str] = Depends(dependencies.get_prefect_client_version)) -> FlowRunBulkSetStateResponse
Bulk set state for flow runs matching the specified filter criteria.
Returns the orchestration results for each flow run.
set_flow_run_state
set_flow_run_state(response: Response, flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), state: schemas.actions.StateCreate = Body( ... , description = 'The intended state.' ), force: bool = Body( False , description = 'If false, orchestration rules will be applied that may alter or prevent the state transition. If True, orchestration rules are not applied.' ), db: PrefectDBInterface = Depends(provide_database_interface), flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), orchestration_parameters: Dict[ str , Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), client_version: Optional[ str ] = Depends(dependencies.get_prefect_client_version)) -> OrchestrationResult
Set a flow run state, invoking any orchestration rules.
create_flow_run_input(flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), key: str = Body( ... , description = 'The input key' ), value: bytes = Body( ... , description = 'The value of the input' ), sender: Optional[ str ] = Body( None , description = 'The sender of the input' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Create a key/value input for a flow run.
filter_flow_run_input(flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), prefix: str = Body( ... , description = 'The input key prefix' , embed = True ), limit: int = Body( 1 , description = 'The maximum number of results to return' , embed = True ), exclude_keys: List[ str ] = Body([], description = 'Exclude inputs with these keys' , embed = True ), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.core.FlowRunInput]
Filter flow run inputs by key prefix.
read_flow_run_input(flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), key: str = Path( ... , description = 'The input key' , alias = 'key' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> PlainTextResponse
Read the value of a flow run input.
delete_flow_run_input(flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), key: str = Path( ... , description = 'The input key' , alias = 'key' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Delete a flow run input.
paginate_flow_runs
paginate_flow_runs(sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort. ID_DESC ), limit: int = dependencies.LimitBody(), page: int = Body( 1 , ge = 1 ), flows: Optional[schemas.filters.FlowFilter] = None , flow_runs: Optional[schemas.filters.FlowRunFilter] = None , task_runs: Optional[schemas.filters.TaskRunFilter] = None , deployments: Optional[schemas.filters.DeploymentFilter] = None , work_pools: Optional[schemas.filters.WorkPoolFilter] = None , work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None , db: PrefectDBInterface = Depends(provide_database_interface)) -> FlowRunPaginationResponse
Pagination query for flow runs.
download_logs
download_logs(flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> StreamingResponse
Download all flow run logs as a CSV file, collecting all logs until there are no more logs to retrieve.
update_flow_run_labels
update_flow_run_labels(flow_run_id: UUID = Path( ... , description = 'The flow run id' , alias = 'id' ), labels: Dict[ str , Any] = Body( ... , description = 'The labels to update' ), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Update the labels of a flow run.