> ## Documentation Index
> Fetch the complete documentation index at: https://docs.prefect.io/llms.txt
> Use this file to discover all available pages before exploring further.

# flow_engine

# `prefect.flow_engine`

## Functions

### `load_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L222" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
load_flow_run(flow_run_id: UUID) -> FlowRun
```

### `load_flow` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L235" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
load_flow(flow_run: FlowRun) -> Flow[..., Any]
```

### `load_flow_and_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L247" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
load_flow_and_flow_run(flow_run_id: UUID) -> tuple[FlowRun, Flow[..., Any]]
```

### `send_heartbeats_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L379" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
send_heartbeats_sync(engine: 'FlowRunEngine[Any, Any]') -> Generator[None, None, None]
```

### `send_heartbeats_async` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L391" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
send_heartbeats_async(engine: 'AsyncFlowRunEngine[Any, Any]') -> AsyncGenerator[None, None]
```

### `run_flow_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1920" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
run_flow_sync(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Union[R, State, None]
```

### `run_flow_async` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1944" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
run_flow_async(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Union[R, State, None]
```

### `run_generator_flow_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1968" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
run_generator_flow_sync(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Generator[R, None, None]
```

### `run_generator_flow_async` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L2009" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
run_generator_flow_async(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[R]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> AsyncGenerator[R, None]
```

### `run_flow` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L2052" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
run_flow(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[R]]] = None, return_type: Literal['state', 'result'] = 'result', error_logger: Optional[logging.Logger] = None, context: Optional[dict[str, Any]] = None) -> R | State | None | Coroutine[Any, Any, R | State | None] | Generator[R, None, None] | AsyncGenerator[R, None]
```

### `run_flow_in_subprocess` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L2141" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
run_flow_in_subprocess(flow: 'Flow[..., Any]', flow_run: 'FlowRun | None' = None, parameters: dict[str, Any] | None = None, wait_for: Iterable[PrefectFuture[Any]] | None = None, context: dict[str, Any] | None = None, env: dict[str, str | None] | None = None) -> multiprocessing.context.SpawnProcess
```

Run a flow in a subprocess.

Note the result of the flow will only be accessible if the flow is configured to
persist its result.

**Args:**

* `flow`: The flow to run.
* `flow_run`: The flow run object containing run metadata.
* `parameters`: The parameters to use when invoking the flow.
* `wait_for`: The futures to wait for before starting the flow.
* `context`: A serialized context to hydrate before running the flow. If not provided,
  the current context will be used. A serialized context should be provided if
  this function is called in a separate memory space from the parent run (e.g.
  in a subprocess or on another machine).
* `env`: Additional environment variables to set in the subprocess.

**Returns:**

* A multiprocessing.context.SpawnProcess representing the process that is running the flow.

## Classes

### `FlowRunTimeoutError` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L218" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a flow run exceeds its defined timeout.

### `BaseFlowRunEngine` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L399" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

**Methods:**

#### `cancel_all_tasks` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L446" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
cancel_all_tasks(self) -> None
```

#### `heartbeat_seconds` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L439" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
heartbeat_seconds(self) -> Optional[int]
```

Get the heartbeat interval from settings.

#### `is_pending` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L433" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
is_pending(self) -> bool
```

#### `is_running` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L428" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
is_running(self) -> bool
```

#### `state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L425" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
state(self) -> State
```

### `FlowRunEngine` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L576" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

**Methods:**

#### `begin_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L626" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
begin_run(self) -> State
```

#### `call_flow_fn` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1225" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
call_flow_fn(self) -> Union[R, Coroutine[Any, Any, R]]
```

Convenience method to call the flow function. Returns a coroutine if the
flow is async.

#### `call_hooks` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L935" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
call_hooks(self, state: Optional[State] = None) -> None
```

#### `client` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L582" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
client(self) -> SyncPrefectClient
```

#### `create_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L900" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
create_flow_run(self, client: SyncPrefectClient) -> FlowRun
```

#### `handle_cancellation` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L830" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
handle_cancellation(self, exc: BaseException) -> None
```

Force this run through Cancelling -> Cancelled.

Used when `capture_sigterm` has determined (via the cancellation
listener) that the SIGTERM was the runner asking for cancellation,
not an unrelated termination. Transitioning through Cancelling first
The same ownership rule also governs `on_cancellation` / `on_crashed`
hooks: if the engine owns cancellation/crash handling, it must
drive the `Cancelling -> Cancelled` transitions locally; if an
external runner owns them, the child should avoid duplicating
state history.

#### `handle_crash` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L821" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
handle_crash(self, exc: BaseException) -> None
```

#### `handle_exception` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L763" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
handle_exception(self, exc: Exception, msg: Optional[str] = None, result_store: Optional[ResultStore] = None) -> State
```

#### `handle_success` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L735" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
handle_success(self, result: R) -> R
```

#### `handle_timeout` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L796" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
handle_timeout(self, exc: TimeoutError) -> None
```

#### `initialize_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1086" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
initialize_run(self)
```

Enters a client context and creates a flow run if needed.

#### `load_subflow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L851" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
load_subflow_run(self, parent_task_run: TaskRun, client: SyncPrefectClient, context: FlowRunContext) -> Union[FlowRun, None]
```

This method attempts to load an existing flow run for a subflow task
run, if appropriate.

If the parent task run is in a final but not COMPLETED state, and not
being rerun, then we attempt to load an existing flow run instead of
creating a new one. This will prevent the engine from running the
subflow again.

If no existing flow run is found, or if the subflow should be rerun,
then no flow run is returned.

#### `result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L709" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'
```

#### `run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1205" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
run_context(self)
```

#### `set_state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L691" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
set_state(self, state: State, force: bool = False) -> State
```

#### `setup_run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L989" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
setup_run_context(self, client: Optional[SyncPrefectClient] = None)
```

#### `start` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1193" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
start(self) -> Generator[None, None, None]
```

### `AsyncFlowRunEngine` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1243" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Async version of the flow run engine.

NOTE: This has not been fully asyncified yet which may lead to async flows
not being fully asyncified.

**Methods:**

#### `begin_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1300" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
begin_run(self) -> State
```

#### `call_flow_fn` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1908" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
call_flow_fn(self) -> Coroutine[Any, Any, R]
```

Convenience method to call the flow function. Returns a coroutine if the
flow is async.

#### `call_hooks` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1602" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
call_hooks(self, state: Optional[State] = None) -> None
```

#### `client` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1256" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
client(self) -> PrefectClient
```

#### `create_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1569" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
create_flow_run(self, client: PrefectClient) -> FlowRun
```

#### `handle_cancellation` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1503" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
handle_cancellation(self, exc: BaseException) -> None
```

Force this run through Cancelling -> Cancelled.

Async counterpart to `FlowRunEngine.handle_cancellation`. Shielded
from asyncio cancellation so engine-owned transitions always reach
the server.

#### `handle_crash` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1490" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
handle_crash(self, exc: BaseException) -> None
```

#### `handle_exception` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1433" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
handle_exception(self, exc: Exception, msg: Optional[str] = None, result_store: Optional[ResultStore] = None) -> State
```

#### `handle_success` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1408" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
handle_success(self, result: R) -> R
```

#### `handle_timeout` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1464" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
handle_timeout(self, exc: TimeoutError) -> None
```

#### `initialize_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1751" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
initialize_run(self)
```

Enters a client context and creates a flow run if needed.

#### `load_subflow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1520" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
load_subflow_run(self, parent_task_run: TaskRun, client: PrefectClient, context: FlowRunContext) -> Union[FlowRun, None]
```

This method attempts to load an existing flow run for a subflow task
run, if appropriate.

If the parent task run is in a final but not COMPLETED state, and not
being rerun, then we attempt to load an existing flow run instead of
creating a new one. This will prevent the engine from running the
subflow again.

If no existing flow run is found, or if the subflow should be rerun,
then no flow run is returned.

#### `result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1383" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'
```

#### `run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1888" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
run_context(self)
```

#### `set_state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1365" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
set_state(self, state: State, force: bool = False) -> State
```

#### `setup_run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1656" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
setup_run_context(self, client: Optional[PrefectClient] = None)
```

#### `start` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1876" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
start(self) -> AsyncGenerator[None, None]
```
