prefect.server.orchestration.rules

Prefect’s flow and task-run orchestration machinery. This module contains all the core concepts necessary to implement Prefect’s state orchestration engine. States correspond to intuitive descriptions of all the points at which a Prefect flow or task can observe user code executing and intervene, if necessary. A detailed description of states can be found in our concept documentation. Prefect’s orchestration engine operates under the assumption that no governed user code will execute without first requesting that the Prefect REST API validate a change in state and record metadata about the run. With all attempts to run user code being checked against a Prefect instance, the Prefect REST API database becomes the unambiguous source of truth for managing the execution of complex interacting workflows. Orchestration rules can be implemented as discrete units of logic that operate against each state transition and can be fully observable, extensible, and customizable, all without needing to store or parse a single line of user code.

Classes

OrchestrationContext

A container for a state transition, governed by orchestration rules. When a flow or task run attempts to change state, the Prefect REST API has an opportunity to decide whether this transition can proceed. All the relevant information associated with the state transition is stored in an OrchestrationContext, which is subsequently governed by nested orchestration rules implemented using the BaseOrchestrationRule ABC. OrchestrationContext introduces the concept of a state being None in the context of an intended state transition. An initial state can be None if a run is attempting to set a state for the first time. The proposed state might be None if a rule governing the transition determines that no state change should occur at all and nothing is written to the database. Args:
  • session: a SQLAlchemy database session
  • initial_state: the initial state of a run
  • proposed_state: the proposed state a run is transitioning into
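The two None cases described above can be illustrated with a small stand-in. This is only a sketch: SketchContext and describe_transition are hypothetical names, not part of Prefect, and states are reduced to plain strings.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchContext:
    """Hypothetical stand-in for OrchestrationContext; states are plain strings."""

    initial_state: Optional[str]
    proposed_state: Optional[str]


def describe_transition(ctx: SketchContext) -> str:
    """Classify a transition by which of its states is None."""
    if ctx.proposed_state is None:
        # A rule decided that nothing should be written to the database.
        return "no state change"
    if ctx.initial_state is None:
        # The run is setting a state for the first time.
        return f"first state: {ctx.proposed_state}"
    return f"{ctx.initial_state} -> {ctx.proposed_state}"
```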
Methods:

entry_context

entry_context(self) -> tuple[Optional[states.State], Optional[states.State], Self]
A convenience method that generates input parameters for orchestration rules. An OrchestrationContext defines a state transition that is managed by orchestration rules which can fire hooks before a transition has been committed to the database. These hooks have a consistent interface which can be generated with this method.

exit_context

exit_context(self) -> tuple[Optional[states.State], Optional[states.State], Self]
A convenience method that generates input parameters for orchestration rules. An OrchestrationContext defines a state transition that is managed by orchestration rules which can fire hooks after a transition has been committed to the database. These hooks have a consistent interface which can be generated with this method.
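A minimal sketch of how the two tuples might feed rule hooks, assuming the tuple order mirrors the hook signatures documented for BaseOrchestrationRule (initial state, then proposed or validated state, then a copy of the context). SketchContext, before_hook, and after_hook are hypothetical stand-ins, and states are plain strings.

```python
import copy
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class SketchContext:
    """Hypothetical stand-in for OrchestrationContext."""

    initial_state: Optional[str]
    proposed_state: Optional[str]
    validated_state: Optional[str] = None

    def entry_context(self) -> Tuple[Optional[str], Optional[str], "SketchContext"]:
        # Arguments for hooks that fire before the transition is committed.
        safe = copy.copy(self)
        return safe.initial_state, safe.proposed_state, safe

    def exit_context(self) -> Tuple[Optional[str], Optional[str], "SketchContext"]:
        # Arguments for hooks that fire after the transition is committed.
        safe = copy.copy(self)
        return safe.initial_state, safe.validated_state, safe


def before_hook(initial_state, proposed_state, context) -> str:
    # Hypothetical hook body: report the transition it was asked to govern.
    return f"before: {initial_state} -> {proposed_state}"


def after_hook(initial_state, validated_state, context) -> str:
    # Hypothetical hook body: report the transition that was committed.
    return f"after: {initial_state} -> {validated_state}"
```

Because both methods return the same shape, a caller can unpack either one directly into a hook, for example before_hook(*ctx.entry_context()).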

flow_run

flow_run(self) -> orm_models.FlowRun | None

initial_state_type

initial_state_type(self) -> Optional[states.StateType]
The state type of self.initial_state if it exists.

model_validate_list

model_validate_list(cls, obj: Any) -> list[Self]

proposed_state_type

proposed_state_type(self) -> Optional[states.StateType]
The state type of self.proposed_state if it exists.

reset_fields

reset_fields(self: Self) -> Self
Reset the fields of the model that are in the _reset_fields set. Returns:
  • A new instance of the model with the reset fields.

run_settings

run_settings(self) -> RP
Run-level settings used to orchestrate the state transition.

safe_copy

safe_copy(self) -> Self
Creates a mostly-mutation-safe copy for use in orchestration rules. Orchestration rules govern state transitions using information stored in an OrchestrationContext. However, mutating objects stored on the context directly can have unintended side-effects. To guard against this, self.safe_copy can be used to pass information to orchestration rules without risking mutation. Returns:
  • A mutation-safe copy of the OrchestrationContext
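One way to picture a "mostly" mutation-safe copy is sketched below. It assumes the states and parameters are copied while the run object stays shared, which matches the exception for context.run noted in the hook documentation. SketchState and SketchContext are hypothetical stand-ins, not Prefect classes.

```python
import copy
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SketchState:
    type: str
    name: str


@dataclass
class SketchContext:
    run: Dict[str, Any]
    initial_state: Optional[SketchState]
    proposed_state: Optional[SketchState]
    parameters: Dict[str, Any] = field(default_factory=dict)

    def safe_copy(self) -> "SketchContext":
        # Shallow-copy the container, then copy the states and parameters so
        # that edits made through the copy do not reach the original. The run
        # object is deliberately left shared, which is why the copy is only
        # "mostly" mutation-safe.
        safe = copy.copy(self)
        safe.initial_state = copy.deepcopy(self.initial_state)
        safe.proposed_state = copy.deepcopy(self.proposed_state)
        safe.parameters = dict(self.parameters)
        return safe
```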

validated_state_type

validated_state_type(self) -> Optional[states.StateType]
The state type of self.validated_state if it exists.

FlowOrchestrationContext

A container for a flow run state transition, governed by orchestration rules. When a flow run attempts to change state, the Prefect REST API has an opportunity to decide whether this transition can proceed. All the relevant information associated with the state transition is stored in an OrchestrationContext, which is subsequently governed by nested orchestration rules implemented using the BaseOrchestrationRule ABC. FlowOrchestrationContext introduces the concept of a state being None in the context of an intended state transition. An initial state can be None if a run is attempting to set a state for the first time. The proposed state might be None if a rule governing the transition determines that no state change should occur at all and nothing is written to the database. Args:
  • session: a SQLAlchemy database session
  • run: the flow run attempting to change state
  • initial_state: the initial state of a run
  • proposed_state: the proposed state a run is transitioning into
Methods:

flow_run

flow_run(self) -> orm_models.FlowRun

run_settings

run_settings(self) -> core.FlowRunPolicy
Run-level settings used to orchestrate the state transition.

safe_copy

safe_copy(self) -> Self
Creates a mostly-mutation-safe copy for use in orchestration rules. Orchestration rules govern state transitions using information stored in an OrchestrationContext. However, mutating objects stored on the context directly can have unintended side-effects. To guard against this, self.safe_copy can be used to pass information to orchestration rules without risking mutation. Returns:
  • A mutation-safe copy of FlowOrchestrationContext

task_run

task_run(self) -> None

validate_proposed_state

validate_proposed_state(self, db: PrefectDBInterface)
Validates a proposed state by committing it to the database. After the FlowOrchestrationContext is governed by orchestration rules, the proposed state can be validated: the proposed state is added to the current SQLAlchemy session and flushed. self.validated_state is set to the flushed state, and the state on the run is set to the validated state as well. If the proposed state is None when this method is called, no state will be written and self.validated_state will be set to the run’s current state. Returns:
  • None
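The two branches described above can be sketched with an in-memory stand-in for the database session. Everything here is hypothetical (SketchSession, SketchRun, SketchContext, and the add_and_flush helper); the real method works against a SQLAlchemy session and ORM models.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SketchRun:
    state: Optional[str] = None


@dataclass
class SketchSession:
    """Hypothetical in-memory stand-in for a SQLAlchemy session."""

    flushed: List[str] = field(default_factory=list)

    def add_and_flush(self, state: str) -> str:
        # Record the state as written and hand it back as the flushed state.
        self.flushed.append(state)
        return state


@dataclass
class SketchContext:
    session: SketchSession
    run: SketchRun
    proposed_state: Optional[str]
    validated_state: Optional[str] = None

    def validate_proposed_state(self) -> None:
        if self.proposed_state is None:
            # Nothing is written; fall back to the run's current state.
            self.validated_state = self.run.state
            return
        # Write the proposed state, then record it on the context and the run.
        self.validated_state = self.session.add_and_flush(self.proposed_state)
        self.run.state = self.validated_state
```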

TaskOrchestrationContext

A container for a task run state transition, governed by orchestration rules. When a task run attempts to change state, the Prefect REST API has an opportunity to decide whether this transition can proceed. All the relevant information associated with the state transition is stored in an OrchestrationContext, which is subsequently governed by nested orchestration rules implemented using the BaseOrchestrationRule ABC. TaskOrchestrationContext introduces the concept of a state being None in the context of an intended state transition. An initial state can be None if a run is attempting to set a state for the first time. The proposed state might be None if a rule governing the transition determines that no state change should occur at all and nothing is written to the database. Args:
  • session: a SQLAlchemy database session
  • run: the task run attempting to change state
  • initial_state: the initial state of a run
  • proposed_state: the proposed state a run is transitioning into
Methods:

flow_run

flow_run(self) -> orm_models.FlowRun | None

run_settings

run_settings(self) -> core.TaskRunPolicy
Run-level settings used to orchestrate the state transition.

safe_copy

safe_copy(self) -> Self
Creates a mostly-mutation-safe copy for use in orchestration rules. Orchestration rules govern state transitions using information stored in an OrchestrationContext. However, mutating objects stored on the context directly can have unintended side-effects. To guard against this, self.safe_copy can be used to pass information to orchestration rules without risking mutation. Returns:
  • A mutation-safe copy of TaskOrchestrationContext

task_run

task_run(self) -> orm_models.TaskRun

validate_proposed_state

validate_proposed_state(self, db: PrefectDBInterface)
Validates a proposed state by committing it to the database. After the TaskOrchestrationContext is governed by orchestration rules, the proposed state can be validated: the proposed state is added to the current SQLAlchemy session and flushed. self.validated_state is set to the flushed state, and the state on the run is set to the validated state as well. If the proposed state is None when this method is called, no state will be written and self.validated_state will be set to the run’s current state. Returns:
  • None

BaseOrchestrationRule

An abstract base class used to implement a discrete piece of orchestration logic. An OrchestrationRule is a stateful context manager that directly governs a state transition. Complex orchestration is achieved by nesting multiple rules. Each rule runs against an OrchestrationContext that contains the transition details; this context is then passed to subsequent rules. The context can be modified by hooks that fire before and after a new state is validated and committed to the database. These hooks will fire as long as the state transition is considered “valid” and govern a transition by either modifying the proposed state before it is validated or by producing a side-effect. A state transition occurs whenever a flow or task run changes state, prompting the Prefect REST API to decide whether or not this transition can proceed. The current state of the run is referred to as the “initial state”, and the state a run is attempting to transition into is the “proposed state”. Together, the initial state transitioning into the proposed state is the intended transition that is governed by these orchestration rules. After using rules to enter a runtime context, the OrchestrationContext will contain a proposed state that has been governed by each rule, and at that point can validate the proposed state and commit it to the database. The validated state will be set on the context as context.validated_state, and rules will call the self.after_transition hook upon exiting the managed context.
Examples:
Create a rule:
from prefect.server.orchestration.rules import BaseOrchestrationRule
from prefect.server.schemas.states import StateType

class BasicRule(BaseOrchestrationRule):
    # allowed initial state types
    FROM_STATES = [StateType.RUNNING]
    # allowed proposed state types
    TO_STATES = [StateType.COMPLETED, StateType.FAILED]

    async def before_transition(self, initial_state, proposed_state, ctx):
        # side effects and proposed state mutation can happen here
        ...

    async def after_transition(self, initial_state, validated_state, ctx):
        # operations on states that have been validated can happen here
        ...

    async def cleanup(self, initial_state, validated_state, ctx):
        # reverts side effects generated by `before_transition` if necessary
        ...
Use a rule:
intended_transition = (StateType.RUNNING, StateType.COMPLETED)
async with BasicRule(context, *intended_transition):
    # context.proposed_state has been governed by BasicRule
    ...
Use multiple rules:
import contextlib

rules = [BasicRule, BasicRule]
intended_transition = (StateType.RUNNING, StateType.COMPLETED)
async with contextlib.AsyncExitStack() as stack:
    for rule in rules:
        # enter_async_context is a coroutine and must be awaited
        await stack.enter_async_context(rule(context, *intended_transition))

    # context.proposed_state has been governed by all rules
    ...
Args:
  • context: A FlowOrchestrationContext or TaskOrchestrationContext that is passed between rules
  • from_state_type: The state type of the initial state of a run. If this state type is not contained in FROM_STATES, no hooks will fire.
  • to_state_type: The state type of the proposed state before orchestration. If this state type is not contained in TO_STATES, no hooks will fire.
Methods:

abort_transition

abort_transition(self, reason: str) -> None
Aborts a proposed transition before the transition is validated. This method will abort a proposed transition, expecting no further action to occur for this run. The proposed state is set to None, signaling to the OrchestrationContext that no state should be written to the database. A reason for aborting the transition is also provided. Rules that abort the transition will not fizzle, despite the proposed state type changing. Args:
  • reason: The reason for aborting the transition

after_transition

after_transition(self, initial_state: Optional[states.State], validated_state: Optional[states.State], context: OrchestrationContext[T, RP]) -> None
Implements a hook that can fire after a state is committed to the database. Args:
  • initial_state: The initial state of a transition
  • validated_state: The governed state that has been committed to the database
  • context: A safe copy of the OrchestrationContext. With the exception of context.run, mutating this context will have no effect on the broader orchestration environment.
Returns:
  • None

before_transition

before_transition(self, initial_state: Optional[states.State], proposed_state: Optional[states.State], context: OrchestrationContext[T, RP]) -> None
Implements a hook that can fire before a state is committed to the database. This hook may produce side-effects or mutate the proposed state of a transition using one of four methods: self.reject_transition, self.delay_transition, self.abort_transition, and self.rename_state. Args:
  • initial_state: The initial state of a transition
  • proposed_state: The proposed state of a transition
  • context: A safe copy of the OrchestrationContext. With the exception of context.run, mutating this context will have no effect on the broader orchestration environment.
Returns:
  • None

cleanup

cleanup(self, initial_state: Optional[states.State], validated_state: Optional[states.State], context: OrchestrationContext[T, RP]) -> None
Implements a hook that can fire after a state is committed to the database. The intended use of this method is to revert side-effects produced by self.before_transition when the transition is found to be invalid on exit. This allows multiple rules to be gracefully run in sequence, without logic that keeps track of all other rules that might govern a transition. Args:
  • initial_state: The initial state of a transition
  • validated_state: The governed state that has been committed to the database
  • context: A safe copy of the OrchestrationContext. With the exception of context.run, mutating this context will have no effect on the broader orchestration environment.
Returns:
  • None

delay_transition

delay_transition(self, delay_seconds: int, reason: str) -> None
Delays a proposed transition before the transition is validated. This method will delay a proposed transition, setting the proposed state to None, signaling to the OrchestrationContext that no state should be written to the database. The number of seconds a transition should be delayed is passed to the OrchestrationContext. A reason for delaying the transition is also provided. Rules that delay the transition will not fizzle, despite the proposed state type changing. Args:
  • delay_seconds: The number of seconds the transition should be delayed
  • reason: The reason for delaying the transition

fizzled

fizzled(self) -> bool
Determines if a rule is fizzled and side-effects need to be reverted. A rule is fizzled if the transition was valid on entry (thus firing self.before_transition) but is invalid upon exiting the governed context, most likely because another rule mutated the transition. Returns:
  • True if the rule is fizzled, False otherwise.

invalid

invalid(self) -> bool
Determines if a rule is invalid. Invalid rules do nothing and no hooks fire upon entering or exiting a governed context. Rules are invalid if the transition state types are not contained in self.FROM_STATES and self.TO_STATES, or if the context is proposing a transition that differs from the transition the rule was instantiated with. Returns:
  • True if the rule is invalid, False otherwise.

invalid_transition

invalid_transition(self) -> bool
Determines if the transition proposed by the OrchestrationContext is invalid. If the OrchestrationContext is attempting to manage a transition with this rule that differs from the transition the rule was instantiated with, the transition is considered to be invalid. Depending on the context, a rule with an invalid transition is either “invalid” or “fizzled”. Returns:
  • True if the transition is invalid, False otherwise.
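The relationship between invalid, fizzled, and invalid_transition can be sketched as a small async context manager. This is a simplified illustration under stated assumptions, not the library's implementation: SketchContext, SketchRule, and run_rule are hypothetical, state types are plain strings, and the hooks are reduced to log entries.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SketchContext:
    initial_state_type: Optional[str]
    proposed_state_type: Optional[str]
    log: List[str] = field(default_factory=list)


class SketchRule:
    FROM_STATES = ["RUNNING"]
    TO_STATES = ["COMPLETED", "FAILED"]

    def __init__(self, context, from_state_type, to_state_type):
        self.context = context
        self.from_state_type = from_state_type
        self.to_state_type = to_state_type
        self._invalid_on_entry = False

    def invalid_transition(self) -> bool:
        # The context proposes something other than what the rule was built for.
        return (
            self.context.initial_state_type != self.from_state_type
            or self.context.proposed_state_type != self.to_state_type
        )

    def invalid(self) -> bool:
        return self._invalid_on_entry

    def fizzled(self) -> bool:
        # Valid on entry, but the transition changed while the rule was active.
        return not self._invalid_on_entry and self.invalid_transition()

    async def __aenter__(self):
        outside_allowed = (
            self.from_state_type not in self.FROM_STATES
            or self.to_state_type not in self.TO_STATES
        )
        self._invalid_on_entry = outside_allowed or self.invalid_transition()
        if not self._invalid_on_entry:
            self.context.log.append("before_transition")
        return self.context

    async def __aexit__(self, exc_type, exc, tb):
        if self.invalid():
            return None  # invalid rules fire no hooks at all
        self.context.log.append("cleanup" if self.fizzled() else "after_transition")
        return None


async def run_rule(initial, proposed, mutate_to=None):
    ctx = SketchContext(initial, proposed)
    async with SketchRule(ctx, initial, proposed):
        if mutate_to is not None:
            # Simulate another rule changing the proposed state type.
            ctx.proposed_state_type = mutate_to
    return ctx.log


if __name__ == "__main__":
    print(asyncio.run(run_rule("RUNNING", "COMPLETED")))
```

In this sketch a rule that was valid on entry runs cleanup instead of after_transition when the proposed state type changes underneath it, which is the fizzled case.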

reject_transition

reject_transition(self, state: Optional[states.State], reason: str) -> None
Rejects a proposed transition before the transition is validated. This method will reject a proposed transition, mutating the proposed state to the provided state. A reason for rejecting the transition is also passed on to the OrchestrationContext. Rules that reject the transition will not fizzle, despite the proposed state type changing. Args:
  • state: The new proposed state. If None, the current run state will be returned in the result instead.
  • reason: The reason for rejecting the transition
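A sketch of why rules that reject, delay, or abort do not fizzle, assuming the rule updates its own target state type together with the context. The class names, the response_status strings, and the response_reason and delay_seconds fields are hypothetical stand-ins chosen for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchContext:
    proposed_state_type: Optional[str]
    response_status: str = "ACCEPT"  # hypothetical status field
    response_reason: Optional[str] = None
    delay_seconds: Optional[int] = None


class SketchRule:
    def __init__(self, context: SketchContext, to_state_type: Optional[str]):
        self.context = context
        self.to_state_type = to_state_type

    def _replace_proposed(self, state_type, status, reason) -> None:
        # Updating the rule's own target alongside the context keeps the two
        # in agreement, so the rule does not count as fizzled on exit.
        self.to_state_type = state_type
        self.context.proposed_state_type = state_type
        self.context.response_status = status
        self.context.response_reason = reason

    def reject_transition(self, state_type: Optional[str], reason: str) -> None:
        # Swap the proposed state for the one supplied by the rule.
        self._replace_proposed(state_type, "REJECT", reason)

    def delay_transition(self, delay_seconds: int, reason: str) -> None:
        # No state is written; the caller is told how long to wait.
        self._replace_proposed(None, "WAIT", reason)
        self.context.delay_seconds = delay_seconds

    def abort_transition(self, reason: str) -> None:
        # No state is written and no further action is expected.
        self._replace_proposed(None, "ABORT", reason)

    def fizzled(self) -> bool:
        return self.context.proposed_state_type != self.to_state_type
```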

rename_state

rename_state(self, state_name: str) -> None
Sets the “name” attribute on a proposed state. The name of a state is an annotation intended to provide rich, human-readable context for how a run is progressing. This method only updates the name and not the canonical state TYPE, and will not fizzle or invalidate any other rules that might govern this state transition.
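The distinction between a state's name and its type can be shown with a tiny stand-in. SketchState and this free-standing rename_state function are hypothetical; the real method operates on the proposed state held by the context.

```python
from dataclasses import dataclass


@dataclass
class SketchState:
    type: str  # canonical state type, which rules match against
    name: str  # human-readable annotation


def rename_state(state: SketchState, state_name: str) -> None:
    # Only the annotation changes; the type that rules match on is untouched.
    state.name = state_name
```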

update_context_parameters

update_context_parameters(self, key: str, value: Any) -> None
Updates the “parameters” dictionary attribute with the specified key-value pair. This mechanism streamlines the process of passing messages and information between orchestration rules if necessary and is simpler and more ephemeral than message-passing via the database or some other side-effect. This mechanism can be used to break up large rules for ease of testing or comprehension, but note that any rules coupled this way (or any other way) are no longer independent and the order in which they appear in the orchestration policy priority will matter.
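The coupling that this mechanism introduces can be sketched with two hypothetical rules that share a value through the context. The class names, the "retries_left" key, and the simplified before_transition signatures are all illustrative assumptions.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SketchContext:
    parameters: Dict[str, Any] = field(default_factory=dict)


class SketchRule:
    def __init__(self, context: SketchContext):
        self.context = context

    def update_context_parameters(self, key: str, value: Any) -> None:
        # Store a value on the shared context for later rules to read.
        self.context.parameters[key] = value


class CountRetries(SketchRule):
    def before_transition(self) -> None:
        self.update_context_parameters("retries_left", 2)


class DecideOnRetries(SketchRule):
    def before_transition(self) -> str:
        # Depends on CountRetries having run first, so ordering now matters.
        left = self.context.parameters.get("retries_left")
        return "retry" if left else "fail"
```

If DecideOnRetries runs against a context that CountRetries has not touched, it falls through to "fail", which is the ordering dependency the description warns about.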

FlowRunOrchestrationRule

TaskRunOrchestrationRule

GenericOrchestrationRule

BaseUniversalTransform

An abstract base class used to implement privileged bookkeeping logic. Unlike the orchestration rules implemented with the BaseOrchestrationRule ABC, universal transforms are not stateful, and fire their before- and after-transition hooks on every state transition unless the proposed state is None, indicating that no state should be written to the database. Because there are no guardrails in place to prevent directly mutating state or other parts of the orchestration context, universal transforms should only be used with care. Args:
  • context: A FlowOrchestrationContext or TaskOrchestrationContext that is passed between transforms
Methods:

after_transition

after_transition(self, context: OrchestrationContext[T, RP]) -> None
Implements a hook that can fire after a state is committed to the database. Args:
  • context: the OrchestrationContext that contains transition details
Returns:
  • None

before_transition

before_transition(self, context: OrchestrationContext[T, RP]) -> None
Implements a hook that fires before a state is committed to the database. Args:
  • context: the OrchestrationContext that contains transition details
Returns:
  • None

exception_in_transition

exception_in_transition(self) -> bool
Determines if the transition has encountered an exception. Returns:
  • True if the transition has encountered an exception, False otherwise.

nullified_transition

nullified_transition(self) -> bool
Determines if the transition has been nullified. Transitions are nullified if the proposed state is None, indicating that nothing should be written to the database. Returns:
  • True if the transition is nullified, False otherwise.
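A minimal sketch of the firing behavior described for universal transforms: hooks run on every transition unless the proposed state is None. SketchContext, SketchTransform, and run_transform are hypothetical, the hooks are reduced to log entries, and exception handling is left out for brevity.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SketchContext:
    proposed_state: Optional[str]
    log: List[str] = field(default_factory=list)


class SketchTransform:
    def __init__(self, context: SketchContext):
        self.context = context

    def nullified_transition(self) -> bool:
        # A proposed state of None means nothing should be written.
        return self.context.proposed_state is None

    async def __aenter__(self):
        if not self.nullified_transition():
            self.context.log.append("before_transition")
        return self.context

    async def __aexit__(self, exc_type, exc, tb):
        if not self.nullified_transition():
            self.context.log.append("after_transition")
        return None


async def run_transform(proposed: Optional[str], nullify: bool = False) -> List[str]:
    ctx = SketchContext(proposed)
    async with SketchTransform(ctx):
        if nullify:
            # Simulate a rule aborting or delaying the transition.
            ctx.proposed_state = None
    return ctx.log


if __name__ == "__main__":
    print(asyncio.run(run_transform("COMPLETED")))
```

Note that there is no FROM_STATES or TO_STATES filter here; the only check is whether the transition has been nullified.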

TaskRunUniversalTransform

FlowRunUniversalTransform

GenericUniversalTransform