prefect.engine

Client-side execution and orchestration of flows and tasks.

Engine process overview

  • The flow or task is called by the user. See Flow.__call__, Task.__call__

  • A synchronous engine function acts as an entrypoint to the async engine. See enter_flow_run_engine, enter_task_run_engine

  • The async engine creates a run via the API and prepares for execution of user-code. See begin_flow_run, begin_task_run

  • The run is orchestrated through states, calling the user's function as necessary. See orchestrate_flow_run, orchestrate_task_run
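
A minimal sketch of this path from the user's side (the flow below is illustrative, not Prefect source):

from prefect import flow

@flow
def greet(name: str) -> str:
    return f"Hello, {name}!"

# Calling the flow invokes Flow.__call__, which hands off to
# enter_flow_run_engine_from_flow_call. That entrypoint creates a flow run
# via the API, orchestrates it through states, and returns once the run
# reaches a final state.
greet("Marvin")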

begin_flow_run async

Begins execution of a flow run; blocks until completion of the flow run

  • Starts a task runner
  • Determines the result storage block to use
  • Orchestrates the flow run (runs the user-function and generates tasks)
  • Waits for tasks to complete / shuts down the task runner
  • Sets a terminal state for the flow run

Note that the flow_run contains a parameters attribute which holds the serialized parameters sent to the backend, while the parameters argument here should be the deserialized and validated dictionary of Python objects.

Returns:

  State: The final state of the run

Source code in prefect/engine.py
async def begin_flow_run(
    flow: Flow,
    flow_run: FlowRun,
    parameters: Dict[str, Any],
    client: OrionClient,
) -> State:
    """
    Begins execution of a flow run; blocks until completion of the flow run

    - Starts a task runner
    - Determines the result storage block to use
    - Orchestrates the flow run (runs the user-function and generates tasks)
    - Waits for tasks to complete / shuts down the task runner
    - Sets a terminal state for the flow run

    Note that the `flow_run` contains a `parameters` attribute which holds the
    serialized parameters sent to the backend, while the `parameters` argument here
    should be the deserialized and validated dictionary of Python objects.

    Returns:
        The final state of the run
    """
    logger = flow_run_logger(flow_run, flow)

    flow_run_context = PartialModel(FlowRunContext)

    async with AsyncExitStack() as stack:

        await stack.enter_async_context(
            report_flow_run_crashes(flow_run=flow_run, client=client)
        )

        # If the flow is async, we need to provide a portal so sync tasks can run
        flow_run_context.sync_portal = (
            stack.enter_context(start_blocking_portal()) if flow.isasync else None
        )

        logger.debug(
            f"Starting {type(flow.task_runner).__name__!r}; submitted tasks "
            f"will be run {CONCURRENCY_MESSAGES[flow.task_runner.concurrency_type]}..."
        )
        flow_run_context.task_runner = await stack.enter_async_context(
            flow.task_runner.start()
        )

        result_filesystem = get_default_result_filesystem()
        await result_filesystem._save(is_anonymous=True)
        flow_run_context.result_filesystem = result_filesystem

        terminal_state = await orchestrate_flow_run(
            flow,
            flow_run=flow_run,
            parameters=parameters,
            client=client,
            partial_flow_run_context=flow_run_context,
            # Orchestration needs to be interruptible if it has a timeout
            interruptible=flow.timeout_seconds is not None,
        )

    # If debugging, use the more complete `repr` than the usual `str` description
    display_state = repr(terminal_state) if PREFECT_DEBUG_MODE else str(terminal_state)

    logger.log(
        level=logging.INFO if terminal_state.is_completed() else logging.ERROR,
        msg=f"Finished in state {display_state}",
        extra={"send_to_orion": False},
    )

    # When a "root" flow run finishes, flush logs so we do not have to rely on handling
    # during interpreter shutdown
    OrionHandler.flush(block=True)

    return terminal_state
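
begin_flow_run is normally reached through create_then_begin_flow_run or retrieve_flow_then_begin_flow_run rather than called directly. A hedged sketch of a direct call, assuming a flow and an already-created flow run are in hand:

from prefect.client import get_client  # the same client factory the engine uses

async def run_existing(flow, flow_run):
    async with get_client() as client:
        # `parameters` must be the deserialized, validated Python objects,
        # not the serialized `flow_run.parameters` sent to the backend.
        return await begin_flow_run(
            flow=flow, flow_run=flow_run, parameters={}, client=client
        )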

begin_task_map async

Async entrypoint for task mapping

Source code in prefect/engine.py
async def begin_task_map(
    task: Task,
    flow_run_context: FlowRunContext,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    return_type: EngineReturnType,
    task_runner: Optional[BaseTaskRunner],
) -> List[Union[PrefectFuture, Awaitable[PrefectFuture]]]:
    """Async entrypoint for task mapping"""

    # Resolve any futures / states that are in the parameters as we need to
    # validate the lengths of those values before proceeding.
    parameters.update(await resolve_inputs(parameters))
    parameter_lengths = {
        key: len(val)
        for key, val in parameters.items()
        if not isinstance(val, unmapped)
    }

    lengths = set(parameter_lengths.values())
    if len(lengths) > 1:
        raise MappingLengthMismatch(
            "Received parameters with different lengths. Parameters for map "
            f"must all be the same length. Got lengths: {parameter_lengths}"
        )

    map_length = list(lengths)[0] if lengths else 1

    task_runs = []
    for i in range(map_length):
        call_parameters = {key: value[i] for key, value in parameters.items()}
        task_runs.append(
            partial(
                create_task_run_then_submit,
                task=task,
                flow_run_context=flow_run_context,
                parameters=call_parameters,
                wait_for=wait_for,
                return_type=return_type,
                task_runner=task_runner,
            )
        )

    return await gather(*task_runs)
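
In user code this entrypoint is reached via task mapping; a short illustration of the length rule enforced above, assuming Task.map and unmapped are exposed as in the public API:

from prefect import flow, task, unmapped

@task
def add(x, y):
    return x + y

@flow
def my_flow():
    # All mapped iterables must share one length; `unmapped` values are
    # skipped by the length check and passed whole to every task run.
    return add.map([1, 2, 3], unmapped(10))  # three runs: 11, 12, 13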

begin_task_run async

Entrypoint for task run execution.

This function is intended for submission to the task runner.

This method may be called from a worker, so we ensure the settings context has been entered. For example, with a runner that is executing tasks in the same event loop, we will likely not enter the context again because the current context already matches:

main thread:
--> Flow called with settings A
--> begin_task_run executes same event loop
--> Profile A matches and is not entered again

However, with execution on a remote environment, we are going to need to ensure the settings for the task run are respected by entering the context:

main thread:
--> Flow called with settings A
--> begin_task_run is scheduled on a remote worker, settings A is serialized

remote worker:
--> Remote worker imports Prefect (may not occur)
--> Global settings is loaded with default settings
--> begin_task_run executes on a different event loop than the flow
--> Current settings is not set or does not match, settings A is entered

Source code in prefect/engine.py
async def begin_task_run(
    task: Task,
    task_run: TaskRun,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    result_filesystem: WritableFileSystem,
    settings: prefect.context.SettingsContext,
):
    """
    Entrypoint for task run execution.

    This function is intended for submission to the task runner.

    This method may be called from a worker so we ensure the settings context has been
    entered. For example, with a runner that is executing tasks in the same event loop,
    we will likely not enter the context again because the current context already
    matches:

    main thread:
    --> Flow called with settings A
    --> `begin_task_run` executes same event loop
    --> Profile A matches and is not entered again

    However, with execution on a remote environment, we are going to need to ensure the
    settings for the task run are respected by entering the context:

    main thread:
    --> Flow called with settings A
    --> `begin_task_run` is scheduled on a remote worker, settings A is serialized
    remote worker:
    --> Remote worker imports Prefect (may not occur)
    --> Global settings is loaded with default settings
    --> `begin_task_run` executes on a different event loop than the flow
    --> Current settings is not set or does not match, settings A is entered
    """
    flow_run_context = prefect.context.FlowRunContext.get()

    async with AsyncExitStack() as stack:

        # The settings context may be null on a remote worker so we use the safe `.get`
        # method and compare it to the settings required for this task run
        if prefect.context.SettingsContext.get() != settings:
            stack.enter_context(settings)
            setup_logging()

        if flow_run_context:
            # Accessible if on a worker that is running in the same thread as the flow
            client = flow_run_context.client
            # Only run the task in an interruptible thread if it in the same thread as
            # the flow _and_ the flow run has a timeout attached. If the task is on a
            # worker, the flow run timeout will not be raised in the worker process.
            interruptible = flow_run_context.timeout_scope is not None
        else:
            # Otherwise, retrieve a new client
            client = await stack.enter_async_context(get_client())
            interruptible = False

        connect_error = await client.api_healthcheck()
        if connect_error:
            raise RuntimeError(
                f"Cannot orchestrate task run '{task_run.id}'. "
                f"Failed to connect to API at {client.api_url}."
            ) from connect_error

        try:
            return await orchestrate_task_run(
                task=task,
                task_run=task_run,
                parameters=parameters,
                wait_for=wait_for,
                result_filesystem=result_filesystem,
                interruptible=interruptible,
                client=client,
            )
        except Abort:
            # Task run already completed, just fetch its state
            task_run = await client.read_task_run(task_run.id)
            get_run_logger(flow_run_context).debug(
                f"Task run '{task_run.id}' already finished. "
                f"Retrieving result for state {task_run.state!r}..."
            )
            # Hydrate the state data
            task_run.state.data._cache_data(await _retrieve_result(task_run.state))
            return task_run.state

collect_task_run_inputs async

This function recurses through an expression and returns a set of any discernible task run inputs it finds in the data structure.

Examples:

>>> task_inputs = {
>>>    k: await collect_task_run_inputs(v) for k, v in parameters.items()
>>> }
Source code in prefect/engine.py
async def collect_task_run_inputs(
    expr: Any,
) -> Set[Union[core.TaskRunResult, core.Parameter, core.Constant]]:
    """
    This function recurses through an expression to generate a set of any discernible
    task run inputs it finds in the data structure. It produces a set of all inputs
    found.

    Example:
        >>> task_inputs = {
        >>>    k: await collect_task_run_inputs(v) for k, v in parameters.items()
        >>> }
    """
    # TODO: This function needs to be updated to detect parameters and constants

    inputs = set()

    async def add_futures_and_states_to_inputs(obj):
        if isinstance(obj, PrefectFuture):
            inputs.add(core.TaskRunResult(id=obj.task_run.id))
        elif isinstance(obj, State):
            if obj.state_details.task_run_id:
                inputs.add(core.TaskRunResult(id=obj.state_details.task_run_id))
        else:
            state = get_state_for_result(obj)
            if state and state.state_details.task_run_id:
                inputs.add(core.TaskRunResult(id=state.state_details.task_run_id))

    await visit_collection(
        expr, visit_fn=add_futures_and_states_to_inputs, return_data=False
    )

    return inputs

create_and_begin_subflow_run async

Async entrypoint for flow calls within a flow run

Subflows differ from parent flows in that they:

  • Resolve futures in passed parameters into values
  • Create a dummy task for representation in the parent flow
  • Retrieve default result storage from the parent flow rather than the server

Returns:

  Any: The final state of the run

Source code in prefect/engine.py
@inject_client
async def create_and_begin_subflow_run(
    flow: Flow,
    parameters: Dict[str, Any],
    return_type: EngineReturnType,
    client: OrionClient,
) -> Any:
    """
    Async entrypoint for flow calls within a flow run

    Subflows differ from parent flows in that they
    - Resolve futures in passed parameters into values
    - Create a dummy task for representation in the parent flow
    - Retrieve default result storage from the parent flow rather than the server

    Returns:
        The final state of the run
    """
    parent_flow_run_context = FlowRunContext.get()
    parent_logger = get_run_logger(parent_flow_run_context)

    parent_logger.debug(f"Resolving inputs to {flow.name!r}")
    task_inputs = {k: await collect_task_run_inputs(v) for k, v in parameters.items()}

    # Generate a task in the parent flow run to represent the result of the subflow run
    dummy_task = Task(name=flow.name, fn=flow.fn, version=flow.version)
    parent_task_run = await client.create_task_run(
        task=dummy_task,
        flow_run_id=parent_flow_run_context.flow_run.id,
        dynamic_key=_dynamic_key_for_task_run(parent_flow_run_context, dummy_task),
        task_inputs=task_inputs,
        state=Pending(),
    )

    # Resolve any task futures in the input
    parameters = await resolve_futures_to_data(parameters)

    if parent_task_run.state.is_final():

        # Retrieve the most recent flow run from the database
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                parent_task_run_id={"any_": [parent_task_run.id]}
            ),
            sort=FlowRunSort.EXPECTED_START_TIME_ASC,
        )
        flow_run = flow_runs[-1]

        # Hydrate the retrieved state
        flow_run.state.data._cache_data(await _retrieve_result(flow_run.state))

        # Set up variables required downstream
        terminal_state = flow_run.state
        logger = flow_run_logger(flow_run, flow)

    else:
        flow_run = await client.create_flow_run(
            flow,
            parameters=flow.serialize_parameters(parameters),
            parent_task_run_id=parent_task_run.id,
            state=parent_task_run.state,
            tags=TagsContext.get().current_tags,
        )

        parent_logger.info(
            f"Created subflow run {flow_run.name!r} for flow {flow.name!r}"
        )
        logger = flow_run_logger(flow_run, flow)

        if flow.should_validate_parameters:
            try:
                parameters = flow.validate_parameters(parameters)
            except Exception as exc:
                state = Failed(
                    message="Flow run received invalid parameters.",
                    data=DataDocument.encode("cloudpickle", exc),
                )
                await client.propose_state(
                    state=state,
                    flow_run_id=flow_run.id,
                )
                logger.error("Received invalid parameters", exc_info=True)
                return state

        async with AsyncExitStack() as stack:
            await stack.enter_async_context(
                report_flow_run_crashes(flow_run=flow_run, client=client)
            )
            task_runner = await stack.enter_async_context(flow.task_runner.start())

            terminal_state = await orchestrate_flow_run(
                flow,
                flow_run=flow_run,
                parameters=parameters,
                # If the parent flow run has a timeout, then this one needs to be
                # interruptible as well
                interruptible=parent_flow_run_context.timeout_scope is not None,
                client=client,
                partial_flow_run_context=PartialModel(
                    FlowRunContext,
                    sync_portal=parent_flow_run_context.sync_portal,
                    result_filesystem=parent_flow_run_context.result_filesystem,
                    task_runner=task_runner,
                ),
            )

    # Display the full state (including the result) if debugging
    display_state = repr(terminal_state) if PREFECT_DEBUG_MODE else str(terminal_state)
    logger.log(
        level=logging.INFO if terminal_state.is_completed() else logging.ERROR,
        msg=f"Finished in state {display_state}",
        extra={"send_to_orion": False},
    )

    # Track the subflow state so the parent flow can use it to determine its final state
    parent_flow_run_context.flow_run_states.append(terminal_state)

    if return_type == "state":
        return terminal_state
    elif return_type == "result":
        return terminal_state.result()
    else:
        raise ValueError(f"Invalid return type for flow engine {return_type!r}.")
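
For reference, the subflow path above is taken simply by calling one flow inside another (the flows below are illustrative):

from prefect import flow

@flow
def child(x: int) -> int:
    return x * 2

@flow
def parent() -> int:
    # `child` runs through create_and_begin_subflow_run: a dummy task
    # represents it in the parent run, and the parent's sync portal and
    # result storage are reused as shown in the source above.
    return child(21)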

create_then_begin_flow_run async

Async entrypoint for flow calls

Creates the flow run in the backend, then enters the main flow run engine.

Source code in prefect/engine.py
@inject_client
async def create_then_begin_flow_run(
    flow: Flow,
    parameters: Dict[str, Any],
    return_type: EngineReturnType,
    client: OrionClient,
) -> Any:
    """
    Async entrypoint for flow calls

    Creates the flow run in the backend, then enters the main flow run engine.
    """
    # TODO: Returns a `State` depending on `return_type` and we can add an overload to
    #       the function signature to clarify this eventually.

    connect_error = await client.api_healthcheck()
    if connect_error:
        raise RuntimeError(
            f"Cannot create flow run. Failed to reach API at {client.api_url}."
        ) from connect_error

    state = Pending()
    if flow.should_validate_parameters:
        try:
            parameters = flow.validate_parameters(parameters)
        except Exception as exc:
            state = Failed(
                message="Flow run received invalid parameters.",
                data=DataDocument.encode("cloudpickle", exc),
            )

    flow_run = await client.create_flow_run(
        flow,
        # Send serialized parameters to the backend
        parameters=flow.serialize_parameters(parameters),
        state=state,
        tags=TagsContext.get().current_tags,
    )

    engine_logger.info(f"Created flow run {flow_run.name!r} for flow {flow.name!r}")

    if state.is_failed():
        engine_logger.info(
            f"Flow run {flow_run.name!r} received invalid parameters and is marked as failed."
        )
    else:
        state = await begin_flow_run(
            flow=flow, flow_run=flow_run, parameters=parameters, client=client
        )

    if return_type == "state":
        return state
    elif return_type == "result":
        return state.result()
    else:
        raise ValueError(f"Invalid return type for flow engine {return_type!r}.")
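
A sketch of the validation behavior above, with an illustrative flow: invalid parameters do not raise at call time; the run is still created, but in a Failed state carrying the validation error, and the user function is never executed.

from prefect import flow

@flow
def typed(x: int) -> int:
    return x

# Produces a flow run whose state is Failed with the message
# "Flow run received invalid parameters."; whether the call returns that
# state or raises depends on the requested return type.
typed("not-an-int")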

enter_flow_run_engine_from_flow_call

Sync entrypoint for flow calls.

This function does the heavy lifting of ensuring we can get into an async context for flow run execution with minimal overhead.

Source code in prefect/engine.py
def enter_flow_run_engine_from_flow_call(
    flow: Flow, parameters: Dict[str, Any], return_type: EngineReturnType
) -> Union[State, Awaitable[State]]:
    """
    Sync entrypoint for flow calls.

    This function does the heavy lifting of ensuring we can get into an async context
    for flow run execution with minimal overhead.
    """
    setup_logging()

    registry = PrefectObjectRegistry.get()
    if registry and registry.block_code_execution:
        engine_logger.warning(
            f"Script loading is in progress, flow {flow.name!r} will not be executed. "
            "Consider updating the script to only call the flow if executed directly:\n\n"
            f'\tif __name__ == "__main__":\n\t\t{flow.fn.__name__}()'
        )
        return None

    if TaskRunContext.get():
        raise RuntimeError(
            "Flows cannot be run from within tasks. Did you mean to call this "
            "flow in a flow?"
        )

    parent_flow_run_context = FlowRunContext.get()
    is_subflow_run = parent_flow_run_context is not None

    begin_run = partial(
        create_and_begin_subflow_run if is_subflow_run else create_then_begin_flow_run,
        flow=flow,
        parameters=parameters,
        return_type=return_type,
    )

    # Async flow run
    if flow.isasync:
        return begin_run()  # Return a coroutine for the user to await

    # Sync flow run
    if not is_subflow_run:
        if in_async_main_thread():
            # An event loop is already running and we must create a blocking portal to
            # run async code from this synchronous context
            with start_blocking_portal() as portal:
                return portal.call(begin_run)
        else:
            # An event loop is not running so we will create one
            return anyio.run(begin_run)

    # Sync subflow run
    if not parent_flow_run_context.flow.isasync:
        return run_async_from_worker_thread(begin_run)
    else:
        return parent_flow_run_context.sync_portal.call(begin_run)
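
The dispatch rules above in brief, with illustrative flows:

import asyncio
from prefect import flow

@flow
def sync_flow():
    return 1

@flow
async def async_flow():
    return 2

sync_flow()                # engine runs on a fresh event loop, or through a
                           # blocking portal if a loop is already running
asyncio.run(async_flow())  # async flows return a coroutine for the caller to await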

enter_flow_run_engine_from_subprocess

Sync entrypoint for flow runs that have been submitted for execution by an agent

Differs from enter_flow_run_engine_from_flow_call in that we have a flow run id but not a flow object. The flow must be retrieved before execution can begin. Additionally, this assumes that the caller is always in a context without an event loop as this should be called from a fresh process.

Source code in prefect/engine.py
def enter_flow_run_engine_from_subprocess(flow_run_id: UUID) -> State:
    """
    Sync entrypoint for flow runs that have been submitted for execution by an agent

    Differs from `enter_flow_run_engine_from_flow_call` in that we have a flow run id
    but not a flow object. The flow must be retrieved before execution can begin.
    Additionally, this assumes that the caller is always in a context without an event
    loop as this should be called from a fresh process.
    """
    setup_logging()

    return anyio.run(retrieve_flow_then_begin_flow_run, flow_run_id)

enter_task_run_engine

Sync entrypoint for task calls

Source code in prefect/engine.py
def enter_task_run_engine(
    task: Task,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    return_type: EngineReturnType,
    task_runner: Optional[BaseTaskRunner],
    mapped: bool,
) -> Union[PrefectFuture, Awaitable[PrefectFuture]]:
    """
    Sync entrypoint for task calls
    """

    flow_run_context = FlowRunContext.get()
    if not flow_run_context:
        raise RuntimeError(
            "Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use `task.fn()`."
        )

    if TaskRunContext.get():
        raise RuntimeError(
            "Tasks cannot be run from within tasks. Did you mean to call this "
            "task in a flow?"
        )

    if flow_run_context.timeout_scope and flow_run_context.timeout_scope.cancel_called:
        raise TimeoutError("Flow run timed out")

    begin_run = partial(
        begin_task_map if mapped else create_task_run_then_submit,
        task=task,
        flow_run_context=flow_run_context,
        parameters=parameters,
        wait_for=wait_for,
        return_type=return_type,
        task_runner=task_runner,
    )

    # Async task run in async flow run
    if task.isasync and flow_run_context.flow.isasync:
        return begin_run()  # Return a coroutine for the user to await

    # Async or sync task run in sync flow run
    elif not flow_run_context.flow.isasync:
        return run_async_from_worker_thread(begin_run)

    # Sync task run in async flow run
    else:
        # Call out to the sync portal since we are not in a worker thread
        return flow_run_context.sync_portal.call(begin_run)
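
The first guard above in practice (the task is illustrative):

from prefect import task

@task
def double(x: int) -> int:
    return x * 2

double(2)     # outside a flow: raises RuntimeError
double.fn(2)  # bypasses the engine and calls the underlying function directly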

get_default_result_filesystem

Generate a default file system for result storage.

Source code in prefect/engine.py
def get_default_result_filesystem() -> LocalFileSystem:
    """
    Generate a default file system for result storage.
    """
    return LocalFileSystem(basepath=PREFECT_LOCAL_STORAGE_PATH.value())

get_state_for_result

Get the state related to a result object.

link_state_to_result must have been called first.

Source code in prefect/engine.py
def get_state_for_result(obj: Any) -> Optional[State]:
    """
    Get the state related to a result object.

    `link_state_to_result` must have been called first.
    """
    flow_run_context = FlowRunContext.get()
    if flow_run_context:
        return flow_run_context.task_run_results.get(id(obj))

link_state_to_result

Caches a link between a state and a result, using the id of the result to map to the state. The cache is persisted to the current flow run context since task relationships are limited to within a flow run.

This allows dependency tracking to occur when results are passed around.

We do not hash the result because:

  • If changes are made to the object in the flow between task calls, we can still track that they are related.
  • Hashing can be expensive.
  • Not all objects are hashable.

We do not set an attribute, e.g. __prefect_state__, on the result because:

  • Mutating user's objects is dangerous.
  • Unrelated equality comparisons can break unexpectedly.
  • The field can be preserved on copy.
  • We cannot set this attribute on Python built-ins.
Source code in prefect/engine.py
def link_state_to_result(state: State, result: Any) -> None:
    """
    Caches a link between a state and a result using the `id` of the result to map to
    the state. The cache is persisted to the current flow run context since task
    relationships are limited to within a flow run.

    This allows dependency tracking to occur when results are passed around.

    We do not hash the result because:

    - If changes are made to the object in the flow between task calls, we can still
      track that they are related.
    - Hashing can be expensive.
    - Not all objects are hashable.

    We do not set an attribute, e.g. `__prefect_state__`, on the result because:

    - Mutating user's objects is dangerous.
    - Unrelated equality comparisons can break unexpectedly.
    - The field can be preserved on copy.
    - We cannot set this attribute on Python built-ins.
    """
    # We cannot track some Python built-ins since they are singletons and could create
    # confusing relationships, e.g. `None`
    if type(result) in UNTRACKABLE_TYPES:
        return

    flow_run_context = FlowRunContext.get()
    if flow_run_context:
        flow_run_context.task_run_results[id(result)] = state
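
A sketch of the id-based cache, calling the pair of functions directly; the state import path is assumed for this version:

from prefect.orion.schemas.states import Completed  # assumed import path

data = [1, 2, 3]
link_state_to_result(Completed(), data)  # inside a flow run: caches id(data) -> state
data.append(4)                           # mutation is fine; the link is by id, not hash
state = get_state_for_result(data)       # the linked state, or None outside a flow run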

orchestrate_flow_run async

Executes a flow run.

Note on flow timeouts: Since async flows are run directly in the main event loop, timeout behavior will match that described by anyio. If the flow is awaiting something, it will immediately return; otherwise, the next time it awaits it will exit. Sync flows are run in a worker thread, which cannot be interrupted; the worker thread will exit at the next task call. The worker thread also has access to the status of the cancellation scope at FlowRunContext.timeout_scope.cancel_called, which allows it to raise a TimeoutError to respect the timeout.

Returns:

  State: The final state of the run

Source code in prefect/engine.py
async def orchestrate_flow_run(
    flow: Flow,
    flow_run: FlowRun,
    parameters: Dict[str, Any],
    interruptible: bool,
    client: OrionClient,
    partial_flow_run_context: PartialModel[FlowRunContext],
) -> State:
    """
    Executes a flow run.

    Note on flow timeouts:
        Since async flows are run directly in the main event loop, timeout behavior will
        match that described by anyio. If the flow is awaiting something, it will
        immediately return; otherwise, the next time it awaits it will exit. Sync flows
        are run in a worker thread, which cannot be interrupted. The worker
        thread will exit at the next task call. The worker thread also has access to the
        status of the cancellation scope at `FlowRunContext.timeout_scope.cancel_called`
        which allows it to raise a `TimeoutError` to respect the timeout.

    Returns:
        The final state of the run
    """
    logger = flow_run_logger(flow_run, flow)

    timeout_context = (
        anyio.fail_after(flow.timeout_seconds)
        if flow.timeout_seconds
        else nullcontext()
    )
    flow_run_context = None

    state = await client.propose_state(Running(), flow_run_id=flow_run.id)

    while state.is_running():
        waited_for_task_runs = False

        # Update the flow run to the latest data
        flow_run = await client.read_flow_run(flow_run.id)

        try:
            with timeout_context as timeout_scope:
                with partial_flow_run_context.finalize(
                    flow=flow,
                    flow_run=flow_run,
                    client=client,
                    timeout_scope=timeout_scope,
                ) as flow_run_context:
                    args, kwargs = parameters_to_args_kwargs(flow.fn, parameters)
                    logger.debug(
                        f"Executing flow {flow.name!r} for flow run {flow_run.name!r}..."
                    )

                    if PREFECT_DEBUG_MODE:
                        logger.debug(f"Executing {call_repr(flow.fn, *args, **kwargs)}")
                    else:
                        logger.debug(
                            f"Beginning execution...", extra={"state_message": True}
                        )

                    flow_call = partial(flow.fn, *args, **kwargs)

                    if flow.isasync:
                        result = await flow_call()
                    else:
                        run_sync = (
                            run_sync_in_interruptible_worker_thread
                            if interruptible or timeout_scope
                            else run_sync_in_worker_thread
                        )
                        result = await run_sync(flow_call)

                waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
                    flow_run_context.task_run_futures, client=client
                )

        except TimeoutError as exc:
            # TODO: Cancel task runs if feasible
            terminal_state = Failed(
                name="TimedOut",
                message=f"Flow run exceeded timeout of {flow.timeout_seconds} seconds",
                data=DataDocument.encode("cloudpickle", exc),
            )
        except Exception as exc:
            logger.error(
                f"Encountered exception during execution:",
                exc_info=True,
            )
            terminal_state = Failed(
                message="Flow run encountered an exception.",
                data=DataDocument.encode("cloudpickle", exc),
            )
        else:
            if result is None:
                # All tasks and subflows are reference tasks if there is no return value
                # If there are no tasks, use `None` instead of an empty iterable
                result = (
                    flow_run_context.task_run_futures
                    + flow_run_context.task_run_states
                    + flow_run_context.flow_run_states
                ) or None

            terminal_state = await return_value_to_state(
                result, serializer="cloudpickle"
            )

        if not waited_for_task_runs:
            # An exception occurred that prevented us from waiting for task runs to
            # complete. Ensure that we wait for them before proposing a final state
            # for the flow run.
            await wait_for_task_runs_and_report_crashes(
                flow_run_context.task_run_futures, client=client
            )

        # Before setting the flow run state, store state.data using
        # block storage and send the resulting data document to the Orion API instead.
        # This prevents the pickled return value of flow runs
        # from being sent to the Orion API and stored in the Orion database.
        # state.data is left as is, otherwise we would have to load
        # the data from block storage again after storing.
        state = await client.propose_state(
            state=terminal_state,
            flow_run_id=flow_run.id,
            backend_state_data=(
                await _persist_serialized_result(
                    terminal_state.data.json().encode(),
                    filesystem=flow_run_context.result_filesystem,
                )
                if terminal_state.data is not None and flow_run_context
                # if None is passed, state.data will be sent
                # to the Orion API and stored in the database
                else None
            ),
        )

        if state.type != terminal_state.type and PREFECT_DEBUG_MODE:
            logger.debug(
                f"Received new state {state} when proposing final state {terminal_state}",
                extra={"send_to_orion": False},
            )

        if not state.is_final():
            logger.info(
                f"Received non-final state {state.name!r} when proposing final state {terminal_state.name!r} and will attempt to run again...",
                extra={"send_to_orion": False},
            )
            # Attempt to enter a running state again
            state = await client.propose_state(Running(), flow_run_id=flow_run.id)

    if state.data is not None and state.data.encoding == "result":
        state.data = DataDocument.parse_raw(
            await _retrieve_serialized_result(state.data)
        )

    return state
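
The timeout note above for a sync flow, illustrated:

import time
from prefect import flow

@flow(timeout_seconds=1)
def slow():
    # Runs in a worker thread that cannot be interrupted directly; the
    # cancelled scope is observed at the next task call, raising TimeoutError.
    time.sleep(5)

slow()  # finishes in a Failed state named "TimedOut"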

orchestrate_task_run async

Execute a task run

This function should be submitted to a task runner. We must construct the context here instead of receiving it already populated since we may be in a new environment.

Proposes a RUNNING state, then:

  • if accepted, the task user function will be run
  • if rejected, the received state will be returned

When the user function is run, the result will be used to determine a final state:

  • if an exception is encountered, it is trapped and stored in a FAILED state
  • otherwise, return_value_to_state is used to determine the state

If the final state is COMPLETED, we generate a cache key as specified by the task.

The final state is then proposed:

  • if accepted, this is the final state and will be returned
  • if rejected and a new final state is provided, it will be returned
  • if rejected and a non-final state is provided, we will attempt to enter a RUNNING state again

Returns:

  State: The final state of the run

Source code in prefect/engine.py
async def orchestrate_task_run(
    task: Task,
    task_run: TaskRun,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    result_filesystem: WritableFileSystem,
    interruptible: bool,
    client: OrionClient,
) -> State:
    """
    Execute a task run

    This function should be submitted to a task runner. We must construct the context
    here instead of receiving it already populated since we may be in a new environment.

    Proposes a RUNNING state, then
    - if accepted, the task user function will be run
    - if rejected, the received state will be returned

    When the user function is run, the result will be used to determine a final state
    - if an exception is encountered, it is trapped and stored in a FAILED state
    - otherwise, `return_value_to_state` is used to determine the state

    If the final state is COMPLETED, we generate a cache key as specified by the task

    The final state is then proposed
    - if accepted, this is the final state and will be returned
    - if rejected and a new final state is provided, it will be returned
    - if rejected and a non-final state is provided, we will attempt to enter a RUNNING
        state again

    Returns:
        The final state of the run
    """
    logger = task_run_logger(task_run, task=task)
    task_run_context = TaskRunContext(
        task_run=task_run,
        task=task,
        client=client,
        result_filesystem=result_filesystem,
    )

    try:
        # Resolve futures in parameters into data
        resolved_parameters = await resolve_inputs(parameters)
        # Resolve futures in any non-data dependencies to ensure they are ready
        await resolve_inputs(wait_for, return_data=False)
    except UpstreamTaskError as upstream_exc:
        return await client.propose_state(
            Pending(name="NotReady", message=str(upstream_exc)),
            task_run_id=task_run.id,
        )

    # Generate the cache key to attach to proposed states
    cache_key = (
        task.cache_key_fn(task_run_context, resolved_parameters)
        if task.cache_key_fn
        else None
    )

    # Transition from `PENDING` -> `RUNNING`
    state = await client.propose_state(
        Running(state_details=StateDetails(cache_key=cache_key)),
        task_run_id=task_run.id,
    )

    # Only run the task if we enter a `RUNNING` state
    while state.is_running():
        try:
            args, kwargs = parameters_to_args_kwargs(task.fn, resolved_parameters)

            if PREFECT_DEBUG_MODE.value():
                logger.debug(f"Executing {call_repr(task.fn, *args, **kwargs)}")
            else:
                logger.debug(f"Beginning execution...", extra={"state_message": True})

            with task_run_context:
                if task.isasync:
                    result = await task.fn(*args, **kwargs)
                else:
                    run_sync = (
                        run_sync_in_interruptible_worker_thread
                        if interruptible
                        else run_sync_in_worker_thread
                    )
                    result = await run_sync(task.fn, *args, **kwargs)

        except Exception as exc:
            logger.error(
                f"Encountered exception during execution:",
                exc_info=True,
            )
            terminal_state = Failed(
                message="Task run encountered an exception.",
                data=DataDocument.encode("cloudpickle", exc),
            )
        else:
            terminal_state = await return_value_to_state(
                result, serializer="cloudpickle"
            )

            # for COMPLETED tasks, add the cache key and expiration
            if terminal_state.is_completed():
                terminal_state.state_details.cache_expiration = (
                    (pendulum.now("utc") + task.cache_expiration)
                    if task.cache_expiration
                    else None
                )
                terminal_state.state_details.cache_key = cache_key

        # Before setting the terminal task run state, store state.data using
        # block storage and send the resulting data document to the Orion API instead.
        # This prevents the pickled return value of flow runs
        # from being sent to the Orion API and stored in the Orion database.
        # terminal_state.data is left as is, otherwise we would have to load
        # the data from block storage again after storing.
        state = await client.propose_state(
            terminal_state,
            task_run_id=task_run.id,
            backend_state_data=(
                await _persist_serialized_result(
                    terminal_state.data.json().encode(), filesystem=result_filesystem
                )
                if terminal_state.data is not None
                # if None is passed, terminal_state.data will be sent
                # to the Orion API and stored in the database
                else None
            ),
        )

        if state.type != terminal_state.type and PREFECT_DEBUG_MODE:
            logger.debug(
                f"Received new state {state} when proposing final state {terminal_state}",
                extra={"send_to_orion": False},
            )

        if not state.is_final():
            logger.info(
                f"Received non-final state {state.name!r} when proposing final state {terminal_state.name!r} and will attempt to run again...",
                extra={"send_to_orion": False},
            )
            # Attempt to enter a running state again
            state = await client.propose_state(Running(), task_run_id=task_run.id)

    # If debugging, use the more complete `repr` than the usual `str` description
    display_state = repr(state) if PREFECT_DEBUG_MODE else str(state)

    logger.log(
        level=logging.INFO if state.is_completed() else logging.ERROR,
        msg=f"Finished in state {display_state}",
        extra={"send_to_orion": False},
    )

    if state.data is not None and state.data.encoding == "result":
        state.data = DataDocument.parse_raw(
            await _retrieve_serialized_result(state.data)
        )

    return state
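
The cache key and expiration handling above is driven by task options; a sketch, assuming task_input_hash is available at this import path:

from datetime import timedelta
from prefect import task
from prefect.tasks import task_input_hash  # assumed import path

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def expensive(x: int) -> int:
    # A COMPLETED state from this task carries the cache key; later runs with
    # the same key reuse the cached state until the expiration passes.
    return x ** 2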

report_flow_run_crashes

Detect flow run crashes during this context and update the run to a proper final state.

This context must reraise the exception to properly exit the run.

Source code in prefect/engine.py
@asynccontextmanager
async def report_flow_run_crashes(flow_run: FlowRun, client: OrionClient):
    """
    Detect flow run crashes during this context and update the run to a proper final
    state.

    This context _must_ reraise the exception to properly exit the run.
    """
    try:
        yield
    except BaseException as exc:
        state = exception_to_crashed_state(exc)
        logger = flow_run_logger(flow_run)
        with anyio.CancelScope(shield=True):
            logger.error(f"Crash detected! {state.message}")
            logger.debug("Crash details:", exc_info=exc)
            await client.set_flow_run_state(
                state=state,
                flow_run_id=flow_run.id,
                force=True,
            )
            engine_logger.debug(
                f"Reported crashed flow run {flow_run.name!r} successfully!"
            )

        # Reraise the exception
        raise exc from None

resolve_inputs async

Resolve any Quote, PrefectFuture, or State types nested in parameters into data.

Returns:

  Dict[str, Any]: A copy of the parameters with resolved data

Exceptions:

  UpstreamTaskError: If any of the upstream states are not COMPLETED

Source code in prefect/engine.py
async def resolve_inputs(
    parameters: Dict[str, Any], return_data: bool = True
) -> Dict[str, Any]:
    """
    Resolve any `Quote`, `PrefectFuture`, or `State` types nested in parameters into
    data.

    Returns:
        A copy of the parameters with resolved data

    Raises:
        UpstreamTaskError: If any of the upstream states are not `COMPLETED`
    """

    async def visit_fn(expr):
        state = None

        if isinstance(expr, Quote):
            return expr.unquote()
        elif isinstance(expr, PrefectFuture):
            state = await expr._wait()
        elif isinstance(expr, State):
            state = expr
        else:
            return expr

        if not state.is_completed():
            raise UpstreamTaskError(
                f"Upstream task run '{state.state_details.task_run_id}' did not reach a 'COMPLETED' state."
            )

        # Only retrieve the result if requested as it may be expensive
        return state.result() if return_data else None

    return await visit_collection(
        parameters,
        visit_fn=visit_fn,
        return_data=return_data,
    )
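
The practical effect of resolve_inputs: futures or states nested anywhere in a parameter value are replaced with their underlying data before the task body runs. Illustrative tasks:

from prefect import flow, task

@task
def upstream() -> int:
    return 42

@task
def downstream(payload: dict) -> int:
    # By the time this body runs, the nested future has been resolved to 42.
    return payload["value"]

@flow
def my_flow():
    fut = upstream()                   # returns a PrefectFuture inside a flow
    return downstream({"value": fut})  # resolve_inputs unwraps it to data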

retrieve_flow_then_begin_flow_run async

Async entrypoint for flow runs that have been submitted for execution by an agent

  • Retrieves the deployment information
  • Loads the flow object using deployment information
  • Updates the flow run version
Source code in prefect/engine.py
@inject_client
async def retrieve_flow_then_begin_flow_run(
    flow_run_id: UUID, client: OrionClient
) -> State:
    """
    Async entrypoint for flow runs that have been submitted for execution by an agent

    - Retrieves the deployment information
    - Loads the flow object using deployment information
    - Updates the flow run version
    """
    flow_run = await client.read_flow_run(flow_run_id)

    try:
        flow = await load_flow_from_flow_run(flow_run, client=client)
    except Exception as exc:
        message = "Flow could not be retrieved from deployment."
        flow_run_logger(flow_run).exception(message)
        state = Failed(message=message, data=safe_encode_exception(exc))
        await client.set_flow_run_state(
            state=state, flow_run_id=flow_run_id, force=True
        )
        return state

    await client.update_flow_run(
        flow_run_id=flow_run_id,
        flow_version=flow.version,
    )

    if flow.should_validate_parameters:
        try:
            parameters = flow.validate_parameters(flow_run.parameters)
        except Exception as exc:
            flow_run_logger(flow_run).exception("Flow run received invalid parameters.")
            state = Failed(
                message="Flow run received invalid parameters.",
                data=DataDocument.encode("cloudpickle", exc),
            )
            await client.propose_state(
                state=state,
                flow_run_id=flow_run_id,
            )
            return state
    else:
        parameters = flow_run.parameters

    return await begin_flow_run(
        flow=flow,
        flow_run=flow_run,
        parameters=parameters,
        client=client,
    )

submit_task_run async

Async entrypoint for task calls.

Tasks must be called within a flow. When tasks are called, they create a task run and submit orchestration of the run to the flow run's task runner. The task runner returns a future that is returned immediately.

Source code in prefect/engine.py
async def submit_task_run(
    task: Task,
    flow_run_context: FlowRunContext,
    parameters: Dict[str, Any],
    task_run: TaskRun,
    wait_for: Optional[Iterable[PrefectFuture]],
    task_runner: BaseTaskRunner,
) -> PrefectFuture:
    """
    Async entrypoint for task calls.

    Tasks must be called within a flow. When tasks are called, they create a task run
    and submit orchestration of the run to the flow run's task runner. The task runner
    returns a future that is returned immediately.
    """
    logger = get_run_logger(flow_run_context)

    if task_runner.concurrency_type == TaskConcurrencyType.SEQUENTIAL:
        logger.info(f"Executing {task_run.name!r} immediately...")

    future = await task_runner.submit(
        task_run=task_run,
        run_key=f"{task_run.name}-{task_run.id.hex}-{flow_run_context.flow_run.run_count}",
        run_fn=begin_task_run,
        run_kwargs=dict(
            task=task,
            task_run=task_run,
            parameters=parameters,
            wait_for=wait_for,
            result_filesystem=flow_run_context.result_filesystem,
            settings=prefect.context.SettingsContext.get().copy(),
        ),
        asynchronous=task.isasync and flow_run_context.flow.isasync,
    )

    if task_runner.concurrency_type != TaskConcurrencyType.SEQUENTIAL:
        logger.info(f"Submitted task run {task_run.name!r} for execution.")

    # Track the task run future in the flow run context
    flow_run_context.task_run_futures.append(future)

    return future