Skip to content

prefect.orion.orchestration.global_policy

Bookkeeping logic that fires on every state transition.

For clarity, GlobalFlowpolicy and GlobalTaskPolicy contain all transition logic implemented using BaseUniversalTransform. None of these operations modify state, and regardless of what orchestration Orion might enforce on a transtition, the global policies contain Orion's necessary bookkeeping. Because these transforms record information about the validated state committed to the state database, they should be the most deeply nested contexts in orchestration loop.

GlobalFlowPolicy

Global transforms that run against flow-run-state transitions in priority order.

These transforms are intended to run immediately before and after a state transition is validated.

Source code in prefect/orion/orchestration/global_policy.py
class GlobalFlowPolicy(BaseOrchestrationPolicy):
    """
    Global transforms that run against flow-run-state transitions in priority order.

    These transforms are intended to run immediately before and after a state transition
    is validated.
    """

    def priority():
        return COMMON_GLOBAL_TRANSFORMS() + [
            UpdateSubflowParentTask,
            UpdateSubflowStateDetails,
        ]

GlobalFlowPolicy.priority

A list of orchestration rules in priority order.

Source code in prefect/orion/orchestration/global_policy.py
def priority():
    return COMMON_GLOBAL_TRANSFORMS() + [
        UpdateSubflowParentTask,
        UpdateSubflowStateDetails,
    ]

GlobalTaskPolicy

Global transforms that run against task-run-state transitions in priority order.

These transforms are intended to run immediately before and after a state transition is validated.

Source code in prefect/orion/orchestration/global_policy.py
class GlobalTaskPolicy(BaseOrchestrationPolicy):
    """
    Global transforms that run against task-run-state transitions in priority order.

    These transforms are intended to run immediately before and after a state transition
    is validated.
    """

    def priority():
        return COMMON_GLOBAL_TRANSFORMS()

GlobalTaskPolicy.priority

A list of orchestration rules in priority order.

Source code in prefect/orion/orchestration/global_policy.py
def priority():
    return COMMON_GLOBAL_TRANSFORMS()

SetRunStateType

Updates the state type of a run on a state transition.

Source code in prefect/orion/orchestration/global_policy.py
class SetRunStateType(BaseUniversalTransform):
    """
    Updates the state type of a run on a state transition.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # record the new state's type
        context.run.state_type = context.proposed_state.type

SetRunStateType.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # record the new state's type
    context.run.state_type = context.proposed_state.type

SetRunStateName

Updates the state name of a run on a state transition.

Source code in prefect/orion/orchestration/global_policy.py
class SetRunStateName(BaseUniversalTransform):
    """
    Updates the state name of a run on a state transition.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # record the new state's name
        context.run.state_name = context.proposed_state.name

SetRunStateName.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # record the new state's name
    context.run.state_name = context.proposed_state.name

SetStartTime

Records the time a run enters a running state for the first time.

Source code in prefect/orion/orchestration/global_policy.py
class SetStartTime(BaseUniversalTransform):
    """
    Records the time a run enters a running state for the first time.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # if entering a running state and no start time is set...
        if context.proposed_state.is_running() and context.run.start_time is None:
            # set the start time
            context.run.start_time = context.proposed_state.timestamp

SetStartTime.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # if entering a running state and no start time is set...
    if context.proposed_state.is_running() and context.run.start_time is None:
        # set the start time
        context.run.start_time = context.proposed_state.timestamp

SetRunStateTimestamp

Records the time a run changes states.

Source code in prefect/orion/orchestration/global_policy.py
class SetRunStateTimestamp(BaseUniversalTransform):
    """
    Records the time a run changes states.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # record the new state's timestamp
        context.run.state_timestamp = context.proposed_state.timestamp

SetRunStateTimestamp.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # record the new state's timestamp
    context.run.state_timestamp = context.proposed_state.timestamp

SetEndTime

Records the time a run enters a terminal state.

With normal client usage, a run will not transition out of a terminal state. However, it's possible to force these transitions manually via the API. While leaving a terminal state, the end time will be unset.

Source code in prefect/orion/orchestration/global_policy.py
class SetEndTime(BaseUniversalTransform):
    """
    Records the time a run enters a terminal state.

    With normal client usage, a run will not transition out of a terminal state.
    However, it's possible to force these transitions manually via the API. While
    leaving a terminal state, the end time will be unset.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # if exiting a final state for a non-final state...
        if (
            context.initial_state
            and context.initial_state.is_final()
            and not context.proposed_state.is_final()
        ):
            # clear the end time
            context.run.end_time = None

        # if entering a final state...
        if context.proposed_state.is_final():
            # if the run has a start time and no end time, give it one
            if context.run.start_time and not context.run.end_time:
                context.run.end_time = context.proposed_state.timestamp

SetEndTime.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # if exiting a final state for a non-final state...
    if (
        context.initial_state
        and context.initial_state.is_final()
        and not context.proposed_state.is_final()
    ):
        # clear the end time
        context.run.end_time = None

    # if entering a final state...
    if context.proposed_state.is_final():
        # if the run has a start time and no end time, give it one
        if context.run.start_time and not context.run.end_time:
            context.run.end_time = context.proposed_state.timestamp

IncrementRunTime

Records the amount of time a run spends in the running state.

Source code in prefect/orion/orchestration/global_policy.py
class IncrementRunTime(BaseUniversalTransform):
    """
    Records the amount of time a run spends in the running state.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # if exiting a running state...
        if context.initial_state and context.initial_state.is_running():
            # increment the run time by the time spent in the previous state
            context.run.total_run_time += (
                context.proposed_state.timestamp - context.initial_state.timestamp
            )

IncrementRunTime.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # if exiting a running state...
    if context.initial_state and context.initial_state.is_running():
        # increment the run time by the time spent in the previous state
        context.run.total_run_time += (
            context.proposed_state.timestamp - context.initial_state.timestamp
        )

IncrementRunCount

Records the number of times a run enters a running state. For use with retries.

Source code in prefect/orion/orchestration/global_policy.py
class IncrementRunCount(BaseUniversalTransform):
    """
    Records the number of times a run enters a running state. For use with retries.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # if entering a running state...
        if context.proposed_state.is_running():
            # increment the run count
            context.run.run_count += 1

IncrementRunCount.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # if entering a running state...
    if context.proposed_state.is_running():
        # increment the run count
        context.run.run_count += 1

SetExpectedStartTime

Estimates the time a state is expected to start running if not set.

For scheduled states, this estimate is simply the scheduled time. For other states, this is set to the time the proposed state was created by Orion.

Source code in prefect/orion/orchestration/global_policy.py
class SetExpectedStartTime(BaseUniversalTransform):
    """
    Estimates the time a state is expected to start running if not set.

    For scheduled states, this estimate is simply the scheduled time. For other states,
    this is set to the time the proposed state was created by Orion.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # set expected start time if this is the first state
        if not context.run.expected_start_time:
            if context.proposed_state.is_scheduled():
                context.run.expected_start_time = (
                    context.proposed_state.state_details.scheduled_time
                )
            else:
                context.run.expected_start_time = context.proposed_state.timestamp

SetExpectedStartTime.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # set expected start time if this is the first state
    if not context.run.expected_start_time:
        if context.proposed_state.is_scheduled():
            context.run.expected_start_time = (
                context.proposed_state.state_details.scheduled_time
            )
        else:
            context.run.expected_start_time = context.proposed_state.timestamp

SetNextScheduledStartTime

Records the scheduled time on a run.

When a run enters a scheduled state, run.next_scheduled_start_time is set to the state's scheduled time. When leaving a scheduled state, run.next_scheduled_start_time is unset.

Source code in prefect/orion/orchestration/global_policy.py
class SetNextScheduledStartTime(BaseUniversalTransform):
    """
    Records the scheduled time on a run.

    When a run enters a scheduled state, `run.next_scheduled_start_time` is set to
    the state's scheduled time. When leaving a scheduled state,
    `run.next_scheduled_start_time` is unset.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # remove the next scheduled start time if exiting a scheduled state
        if context.initial_state and context.initial_state.is_scheduled():
            context.run.next_scheduled_start_time = None

        # set next scheduled start time if entering a scheduled state
        if context.proposed_state.is_scheduled():
            context.run.next_scheduled_start_time = (
                context.proposed_state.state_details.scheduled_time
            )

SetNextScheduledStartTime.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # remove the next scheduled start time if exiting a scheduled state
    if context.initial_state and context.initial_state.is_scheduled():
        context.run.next_scheduled_start_time = None

    # set next scheduled start time if entering a scheduled state
    if context.proposed_state.is_scheduled():
        context.run.next_scheduled_start_time = (
            context.proposed_state.state_details.scheduled_time
        )

UpdateSubflowParentTask

Whenever a subflow changes state, it must update its parent task run's state.

Source code in prefect/orion/orchestration/global_policy.py
class UpdateSubflowParentTask(BaseUniversalTransform):
    """
    Whenever a subflow changes state, it must update its parent task run's state.
    """

    async def after_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # only applies to flow runs with a parent task run id
        if context.run.parent_task_run_id is not None:

            # avoid mutation of the flow run state
            subflow_parent_task_state = context.validated_state.copy(
                reset_fields=True,
                include={
                    "type",
                    "timestamp",
                    "name",
                    "message",
                    "state_details",
                    "data",
                },
            )

            # set the task's "child flow run id" to be the subflow run id
            subflow_parent_task_state.state_details.child_flow_run_id = context.run.id

            await models.task_runs.set_task_run_state(
                session=context.session,
                task_run_id=context.run.parent_task_run_id,
                state=subflow_parent_task_state,
                force=True,
            )

UpdateSubflowParentTask.after_transition async

Implements a hook that can fire after a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def after_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # only applies to flow runs with a parent task run id
    if context.run.parent_task_run_id is not None:

        # avoid mutation of the flow run state
        subflow_parent_task_state = context.validated_state.copy(
            reset_fields=True,
            include={
                "type",
                "timestamp",
                "name",
                "message",
                "state_details",
                "data",
            },
        )

        # set the task's "child flow run id" to be the subflow run id
        subflow_parent_task_state.state_details.child_flow_run_id = context.run.id

        await models.task_runs.set_task_run_state(
            session=context.session,
            task_run_id=context.run.parent_task_run_id,
            state=subflow_parent_task_state,
            force=True,
        )

UpdateSubflowStateDetails

Update a child subflow state's references to a corresponding tracking task run id in the parent flow run

Source code in prefect/orion/orchestration/global_policy.py
class UpdateSubflowStateDetails(BaseUniversalTransform):
    """
    Update a child subflow state's references to a corresponding tracking task run id
    in the parent flow run
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # only applies to flow runs with a parent task run id
        if context.run.parent_task_run_id is not None:
            context.proposed_state.state_details.task_run_id = (
                context.run.parent_task_run_id
            )

UpdateSubflowStateDetails.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    if self.nullified_transition():
        return

    # only applies to flow runs with a parent task run id
    if context.run.parent_task_run_id is not None:
        context.proposed_state.state_details.task_run_id = (
            context.run.parent_task_run_id
        )

UpdateStateDetails

Update a state's references to a corresponding flow- or task- run.

Source code in prefect/orion/orchestration/global_policy.py
class UpdateStateDetails(BaseUniversalTransform):
    """
    Update a state's references to a corresponding flow- or task- run.
    """

    async def before_transition(
        self,
        context: OrchestrationContext,
    ) -> None:
        if self.nullified_transition():
            return

        if isinstance(context, FlowOrchestrationContext):
            flow_run = await context.flow_run()
            context.proposed_state.state_details.flow_run_id = flow_run.id

        elif isinstance(context, TaskOrchestrationContext):
            task_run = await context.task_run()
            context.proposed_state.state_details.flow_run_id = task_run.flow_run_id
            context.proposed_state.state_details.task_run_id = task_run.id

UpdateStateDetails.before_transition async

Implements a hook that fires before a state is committed to the database.

Parameters:

Name Description Default
context

the OrchestrationContext that contains transition details

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(
    self,
    context: OrchestrationContext,
) -> None:
    if self.nullified_transition():
        return

    if isinstance(context, FlowOrchestrationContext):
        flow_run = await context.flow_run()
        context.proposed_state.state_details.flow_run_id = flow_run.id

    elif isinstance(context, TaskOrchestrationContext):
        task_run = await context.task_run()
        context.proposed_state.state_details.flow_run_id = task_run.flow_run_id
        context.proposed_state.state_details.task_run_id = task_run.id