prefect.flow_runs

Functions

wait_for_flow_run

wait_for_flow_run(flow_run_id: UUID, timeout: int | None = 10800, poll_interval: int | None = None, client: 'PrefectClient | None' = None, log_states: bool = False) -> FlowRun
Waits for the prefect flow run to finish and returns the FlowRun Args:
  • flow_run_id: The flow run ID for the flow run to wait for.
  • timeout: The wait timeout in seconds. Defaults to 10800 (3 hours).
  • poll_interval: Deprecated; polling is no longer used to wait for flow runs.
  • client: Optional Prefect client. If not provided, one will be injected.
  • log_states: If True, log state changes. Defaults to False.
Returns:
  • The finished flow run.
Raises:
  • prefect.exceptions.FlowWaitTimeout: If flow run goes over the timeout.
Examples: Create a flow run for a deployment and wait for it to finish:
import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run

async def main():
    async with get_client() as client:
        flow_run = await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
        flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
        print(flow_run.state)

if __name__ == "__main__":
    asyncio.run(main())

Trigger multiple flow runs and wait for them to finish:
import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run

async def main(num_runs: int):
    async with get_client() as client:
        flow_runs = [
            await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
            for _
            in range(num_runs)
        ]
        coros = [wait_for_flow_run(flow_run_id=flow_run.id) for flow_run in flow_runs]
        finished_flow_runs = await asyncio.gather(*coros)
        print([flow_run.state for flow_run in finished_flow_runs])

if __name__ == "__main__":
    asyncio.run(main(num_runs=10))

pause_flow_run

pause_flow_run(wait_for_input: Type[T] | None = None, timeout: int = 3600, poll_interval: int = 10, key: str | None = None) -> T | None
Pauses the current flow run by blocking execution until resumed. When called within a flow run, execution will block and no downstream tasks will run until the flow is resumed. Task runs that have already started will continue running. A timeout parameter can be passed that will fail the flow run if it has not been resumed within the specified time. Args:
  • timeout: the number of seconds to wait for the flow to be resumed before failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds any configured flow-level timeout, the flow might fail even after resuming.
  • poll_interval: The number of seconds between checking whether the flow has been resumed. Defaults to 10 seconds.
  • key: An optional key to prevent calling pauses more than once. This defaults to the number of pauses observed by the flow so far, and prevents pauses that use the “reschedule” option from running the same pause twice. A custom key can be supplied for custom pausing behavior.
  • wait_for_input: a subclass of RunInput or any type supported by Pydantic. If provided when the flow pauses, the flow will wait for the input to be provided before resuming. If the flow is resumed without providing the input, the flow will fail. If the flow is resumed with the input, the flow will resume and the input will be loaded and returned from this function.
Example:
@task
def task_one():
    for i in range(3):
        sleep(1)

@flow
def my_flow():
    terminal_state = task_one.submit(return_state=True)
    if terminal_state.type == StateType.COMPLETED:
        print("Task one succeeded! Pausing flow run..")
        pause_flow_run(timeout=2)
    else:
        print("Task one failed. Skipping pause flow run..")

suspend_flow_run

suspend_flow_run(wait_for_input: Type[T] | None = None, flow_run_id: UUID | None = None, timeout: int | None = 3600, key: str | None = None, client: 'PrefectClient | None' = None) -> T | None
Suspends a flow run by stopping code execution until resumed. When suspended, the flow run will continue execution until the NEXT task is orchestrated, at which point the flow will exit. Any tasks that have already started will run until completion. When resumed, the flow run will be rescheduled to finish execution. In order suspend a flow run in this way, the flow needs to have an associated deployment and results need to be configured with the persist_result option. Args:
  • flow_run_id: a flow run id. If supplied, this function will attempt to suspend the specified flow run. If not supplied will attempt to suspend the current flow run.
  • timeout: the number of seconds to wait for the flow to be resumed before failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds any configured flow-level timeout, the flow might fail even after resuming.
  • key: An optional key to prevent calling suspend more than once. This defaults to a random string and prevents suspends from running the same suspend twice. A custom key can be supplied for custom suspending behavior.
  • wait_for_input: a subclass of RunInput or any type supported by Pydantic. If provided when the flow suspends, the flow will remain suspended until receiving the input before resuming. If the flow is resumed without providing the input, the flow will fail. If the flow is resumed with the input, the flow will resume and the input will be loaded and returned from this function.

resume_flow_run

resume_flow_run(flow_run_id: UUID, run_input: dict[str, Any] | None = None) -> None
Resumes a paused flow. Args:
  • flow_run_id: the flow_run_id to resume
  • run_input: a dictionary of inputs to provide to the flow run.