prefect.orion.orchestration.rules
Orion's flow and task run orchestration machinery.
This module contains all the core concepts necessary to implement Orion's state orchestration engine. States correspond to intuitive descriptions of all the points at which Prefect can observe a flow or task executing user code and intervene, if necessary. A detailed description of states can be found in our concept documentation.
Orion's orchestration engine operates under the assumption that no governed user code will execute without first requesting that Orion validate a change in state and record metadata about the run. With all attempts to run user code being checked against an Orion instance, the Orion database becomes the unambiguous source of truth for managing the execution of complex interacting workflows. Orchestration rules can be implemented as discrete units of logic that operate against each state transition and can be fully observable, extensible, and customizable, all without needing to store or parse a single line of user code.
OrchestrationResult
pydantic-model
A container for the output of state orchestration.
Source code in prefect/orion/orchestration/rules.py
class OrchestrationResult(PrefectBaseModel):
    """
    A container for the output of state orchestration.
    """

    state: Optional[states.State]
    status: SetStateStatus
    details: StateResponseDetails
OrchestrationContext
pydantic-model
A container for a state transition, governed by orchestration rules.
Note
An OrchestrationContext should not be instantiated directly; instead, use the flow- or task-specific subclasses, FlowOrchestrationContext and TaskOrchestrationContext.
When a flow or task run attempts to change state, Orion has an opportunity to decide whether this transition can proceed. All the relevant information associated with the state transition is stored in an OrchestrationContext, which is subsequently governed by nested orchestration rules implemented using the BaseOrchestrationRule ABC.
OrchestrationContext introduces the concept of a state being None in the context of an intended state transition. An initial state can be None if a run is attempting to set a state for the first time. The proposed state might be None if a rule governing the transition determines that no state change should occur at all and nothing is written to the database.
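The two `None` cases can be illustrated with a minimal sketch. It assumes simplified stand-ins: `StateType`, `State`, and `TransitionSketch` below are hypothetical dataclasses written for this example, not the Prefect classes, and only mirror the shape of the `*_state_type` properties.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class StateType(Enum):
    # Hypothetical subset of state types, for illustration only
    PENDING = "PENDING"
    RUNNING = "RUNNING"


@dataclass
class State:
    # Stand-in for a state schema; only the type is modeled
    type: StateType


@dataclass
class TransitionSketch:
    # Stand-in for an orchestration context; holds only the two states
    initial_state: Optional[State]
    proposed_state: Optional[State]

    @property
    def initial_state_type(self) -> Optional[StateType]:
        return self.initial_state.type if self.initial_state else None

    @property
    def proposed_state_type(self) -> Optional[StateType]:
        return self.proposed_state.type if self.proposed_state else None


# A run setting a state for the first time has no initial state
first = TransitionSketch(initial_state=None, proposed_state=State(StateType.PENDING))

# A rule has decided that no state change should occur
nullified = TransitionSketch(initial_state=State(StateType.PENDING), proposed_state=None)
```

Reading the type through the property avoids an attribute error when either side of the transition is `None`.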
Attributes:
Name | Type | Description |
---|---|---|
session | Union[sqlalchemy.orm.session.Session, sqlalchemy.ext.asyncio.session.AsyncSession] | a SQLAlchemy database session |
initial_state | Optional[prefect.orion.schemas.states.State] | the initial state of a run |
proposed_state | Optional[prefect.orion.schemas.states.State] | the proposed state a run is transitioning into |
validated_state | Optional[prefect.orion.schemas.states.State] | a proposed state that has committed to the database |
rule_signature | List[str] | a record of rules that have fired on entry into a managed context, currently only used for debugging purposes |
finalization_signature | List[str] | a record of rules that have fired on exit from a managed context, currently only used for debugging purposes |
response_status | SetStateStatus | a SetStateStatus object used to build the API response |
response_details | Union[prefect.orion.schemas.responses.StateAcceptDetails, prefect.orion.schemas.responses.StateWaitDetails, prefect.orion.schemas.responses.StateRejectDetails, prefect.orion.schemas.responses.StateAbortDetails] | a StateResponseDetails object used to build the API response |
Parameters:
Name | Description | Default |
---|---|---|
session | a SQLAlchemy database session | required |
initial_state | the initial state of a run | required |
proposed_state | the proposed state a run is transitioning into | required |
Source code in prefect/orion/orchestration/rules.py
class OrchestrationContext(PrefectBaseModel):
    """
    A container for a state transition, governed by orchestration rules.

    Note:
        An `OrchestrationContext` should not be instantiated directly, instead
        use the flow- or task- specific subclasses, `FlowOrchestrationContext` and
        `TaskOrchestrationContext`.

    When a flow- or task- run attempts to change state, Orion has an opportunity
    to decide whether this transition can proceed. All the relevant information
    associated with the state transition is stored in an `OrchestrationContext`,
    which is subsequently governed by nested orchestration rules implemented using
    the `BaseOrchestrationRule` ABC.

    `OrchestrationContext` introduces the concept of a state being `None` in the
    context of an intended state transition. An initial state can be `None` if a run
    is attempting to set a state for the first time. The proposed state might be
    `None` if a rule governing the transition determines that no state change
    should occur at all and nothing is written to the database.

    Attributes:
        session: a SQLAlchemy database session
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
        validated_state: a proposed state that has committed to the database
        rule_signature: a record of rules that have fired on entry into a
            managed context, currently only used for debugging purposes
        finalization_signature: a record of rules that have fired on exit from a
            managed context, currently only used for debugging purposes
        response_status: a SetStateStatus object used to build the API response
        response_details: a StateResponseDetails object used to build the API response

    Args:
        session: a SQLAlchemy database session
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
    """

    class Config:
        arbitrary_types_allowed = True

    session: Optional[Union[sa.orm.Session, AsyncSession]] = ...
    initial_state: Optional[states.State] = ...
    proposed_state: Optional[states.State] = ...
    validated_state: Optional[states.State]
    rule_signature: List[str] = Field(default_factory=list)
    finalization_signature: List[str] = Field(default_factory=list)
    response_status: SetStateStatus = Field(default=SetStateStatus.ACCEPT)
    response_details: StateResponseDetails = Field(default_factory=StateAcceptDetails)

    @property
    def initial_state_type(self) -> Optional[states.StateType]:
        """The state type of `self.initial_state` if it exists."""
        return self.initial_state.type if self.initial_state else None

    @property
    def proposed_state_type(self) -> Optional[states.StateType]:
        """The state type of `self.proposed_state` if it exists."""
        return self.proposed_state.type if self.proposed_state else None

    @property
    def validated_state_type(self) -> Optional[states.StateType]:
        """The state type of `self.validated_state` if it exists."""
        return self.validated_state.type if self.validated_state else None

    def safe_copy(self):
        """
        Creates a mostly-mutation-safe copy for use in orchestration rules.

        Orchestration rules govern state transitions using information stored in
        an `OrchestrationContext`. However, mutating objects stored on the context
        directly can have unintended side-effects. To guard against this,
        `self.safe_copy` can be used to pass information to orchestration rules
        without risking mutation.

        Returns:
            A mutation-safe copy of the `OrchestrationContext`
        """
        safe_copy = self.copy()
        safe_copy.initial_state = (
            self.initial_state.copy() if self.initial_state else None
        )
        safe_copy.proposed_state = (
            self.proposed_state.copy() if self.proposed_state else None
        )
        safe_copy.validated_state = (
            self.validated_state.copy() if self.validated_state else None
        )
        return safe_copy

    def entry_context(self):
        """
        A convenience method that generates input parameters for orchestration rules.

        An `OrchestrationContext` defines a state transition that is managed by
        orchestration rules which can fire hooks before a transition has been committed
        to the database. These hooks have a consistent interface which can be generated
        with this method.
        """
        safe_context = self.safe_copy()
        return safe_context.initial_state, safe_context.proposed_state, safe_context

    def exit_context(self):
        """
        A convenience method that generates input parameters for orchestration rules.

        An `OrchestrationContext` defines a state transition that is managed by
        orchestration rules which can fire hooks after a transition has been committed
        to the database. These hooks have a consistent interface which can be generated
        with this method.
        """
        safe_context = self.safe_copy()
        return safe_context.initial_state, safe_context.validated_state, safe_context
initial_state_type
property
readonly
Type: Optional[prefect.orion.schemas.states.StateType]
The state type of self.initial_state if it exists.
proposed_state_type
property
readonly
Type: Optional[prefect.orion.schemas.states.StateType]
The state type of self.proposed_state if it exists.
validated_state_type
property
readonly
Type: Optional[prefect.orion.schemas.states.StateType]
The state type of self.validated_state if it exists.
OrchestrationContext.safe_copy
Creates a mostly-mutation-safe copy for use in orchestration rules.
Orchestration rules govern state transitions using information stored in an OrchestrationContext. However, mutating objects stored on the context directly can have unintended side-effects. To guard against this, self.safe_copy can be used to pass information to orchestration rules without risking mutation.
Returns:
Type | Description |
---|---|
OrchestrationContext | A mutation-safe copy of the OrchestrationContext |
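The copy-then-copy-the-states pattern can be sketched with the standard library alone. This is an illustration under stated assumptions: `State` and `ContextSketch` are hypothetical dataclasses, and `copy.copy` stands in for the model's own `copy()` method. The shared list at the end shows one way such a copy can be only "mostly" safe in this sketch; it is not a claim about how the Prefect model behaves.

```python
import copy
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class State:
    # Stand-in for a state object
    name: str


@dataclass
class ContextSketch:
    # Stand-in for an orchestration context
    initial_state: Optional[State]
    proposed_state: Optional[State]
    rule_signature: List[str] = field(default_factory=list)

    def safe_copy(self) -> "ContextSketch":
        # Shallow-copy the container, then replace each state with its own copy
        duplicate = copy.copy(self)
        duplicate.initial_state = (
            copy.copy(self.initial_state) if self.initial_state else None
        )
        duplicate.proposed_state = (
            copy.copy(self.proposed_state) if self.proposed_state else None
        )
        return duplicate


ctx = ContextSketch(State("Running"), State("Completed"))
view = ctx.safe_copy()

# Mutating a copied state leaves the original context untouched
view.proposed_state.name = "Failed"

# The list was only shallow-copied, so it is still shared with the original
view.rule_signature.append("SomeRule")
```

After these two mutations, `ctx.proposed_state.name` is still `"Completed"`, while `ctx.rule_signature` contains `"SomeRule"`.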
Source code in prefect/orion/orchestration/rules.py
def safe_copy(self):
    """
    Creates a mostly-mutation-safe copy for use in orchestration rules.

    Orchestration rules govern state transitions using information stored in
    an `OrchestrationContext`. However, mutating objects stored on the context
    directly can have unintended side-effects. To guard against this,
    `self.safe_copy` can be used to pass information to orchestration rules
    without risking mutation.

    Returns:
        A mutation-safe copy of the `OrchestrationContext`
    """
    safe_copy = self.copy()
    safe_copy.initial_state = (
        self.initial_state.copy() if self.initial_state else None
    )
    safe_copy.proposed_state = (
        self.proposed_state.copy() if self.proposed_state else None
    )
    safe_copy.validated_state = (
        self.validated_state.copy() if self.validated_state else None
    )
    return safe_copy
OrchestrationContext.entry_context
A convenience method that generates input parameters for orchestration rules.
An OrchestrationContext defines a state transition that is managed by orchestration rules which can fire hooks before a transition has been committed to the database. These hooks have a consistent interface which can be generated with this method.
Source code in prefect/orion/orchestration/rules.py
def entry_context(self):
    """
    A convenience method that generates input parameters for orchestration rules.

    An `OrchestrationContext` defines a state transition that is managed by
    orchestration rules which can fire hooks before a transition has been committed
    to the database. These hooks have a consistent interface which can be generated
    with this method.
    """
    safe_context = self.safe_copy()
    return safe_context.initial_state, safe_context.proposed_state, safe_context
OrchestrationContext.exit_context
A convenience method that generates input parameters for orchestration rules.
An OrchestrationContext defines a state transition that is managed by orchestration rules which can fire hooks after a transition has been committed to the database. These hooks have a consistent interface which can be generated with this method.
Source code in prefect/orion/orchestration/rules.py
def exit_context(self):
    """
    A convenience method that generates input parameters for orchestration rules.

    An `OrchestrationContext` defines a state transition that is managed by
    orchestration rules which can fire hooks after a transition has been committed
    to the database. These hooks have a consistent interface which can be generated
    with this method.
    """
    safe_context = self.safe_copy()
    return safe_context.initial_state, safe_context.validated_state, safe_context
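As a rough sketch of how the two tuples line up with hook signatures, the example below unpacks them into a pre-commit and a post-commit hook. `ContextSketch` and both hook functions are hypothetical stand-ins that use plain strings as states; only the tuple shapes follow the methods listed above.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass
class ContextSketch:
    # Stand-in for an orchestration context, with strings as states
    initial_state: Optional[str]
    proposed_state: Optional[str]
    validated_state: Optional[str] = None

    def safe_copy(self) -> "ContextSketch":
        # dataclasses.replace with no changes returns a new, equal instance
        return replace(self)

    def entry_context(self):
        safe = self.safe_copy()
        return safe.initial_state, safe.proposed_state, safe

    def exit_context(self):
        safe = self.safe_copy()
        return safe.initial_state, safe.validated_state, safe


def before_transition(initial_state, proposed_state, context):
    # Hook fired before the transition is committed
    return f"{initial_state} -> {proposed_state} (proposed)"


def after_transition(initial_state, validated_state, context):
    # Hook fired after the transition is committed
    return f"{initial_state} -> {validated_state} (validated)"


ctx = ContextSketch("RUNNING", "COMPLETED")
entry_message = before_transition(*ctx.entry_context())

# Pretend validation succeeded and the proposed state was committed
ctx.validated_state = ctx.proposed_state
exit_message = after_transition(*ctx.exit_context())
```

Both hooks receive three positional arguments in the same order, so a rule can be driven by unpacking either tuple.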
FlowOrchestrationContext
pydantic-model
A container for a flow run state transition, governed by orchestration rules.
When a flow run attempts to change state, Orion has an opportunity to decide whether this transition can proceed. All the relevant information associated with the state transition is stored in an OrchestrationContext, which is subsequently governed by nested orchestration rules implemented using the BaseOrchestrationRule ABC.
FlowOrchestrationContext introduces the concept of a state being None in the context of an intended state transition. An initial state can be None if a run is attempting to set a state for the first time. The proposed state might be None if a rule governing the transition determines that no state change should occur at all and nothing is written to the database.
Attributes:
Name | Type | Description |
---|---|---|
session | Union[sqlalchemy.orm.session.Session, sqlalchemy.ext.asyncio.session.AsyncSession] | a SQLAlchemy database session |
run | Any | the flow run attempting to change state |
initial_state | Optional[prefect.orion.schemas.states.State] | the initial state of the run |
proposed_state | Optional[prefect.orion.schemas.states.State] | the proposed state the run is transitioning into |
validated_state | Optional[prefect.orion.schemas.states.State] | a proposed state that has committed to the database |
rule_signature | List[str] | a record of rules that have fired on entry into a managed context, currently only used for debugging purposes |
finalization_signature | List[str] | a record of rules that have fired on exit from a managed context, currently only used for debugging purposes |
response_status | SetStateStatus | a SetStateStatus object used to build the API response |
response_details | Union[prefect.orion.schemas.responses.StateAcceptDetails, prefect.orion.schemas.responses.StateWaitDetails, prefect.orion.schemas.responses.StateRejectDetails, prefect.orion.schemas.responses.StateAbortDetails] | a StateResponseDetails object used to build the API response |
Parameters:
Name | Description | Default |
---|---|---|
session | a SQLAlchemy database session | required |
run | the flow run attempting to change state | required |
initial_state | the initial state of a run | required |
proposed_state | the proposed state a run is transitioning into | required |
Source code in prefect/orion/orchestration/rules.py
class FlowOrchestrationContext(OrchestrationContext):
    """
    A container for a flow run state transition, governed by orchestration rules.

    When a flow run attempts to change state, Orion has an opportunity
    to decide whether this transition can proceed. All the relevant information
    associated with the state transition is stored in an `OrchestrationContext`,
    which is subsequently governed by nested orchestration rules implemented using
    the `BaseOrchestrationRule` ABC.

    `FlowOrchestrationContext` introduces the concept of a state being `None` in the
    context of an intended state transition. An initial state can be `None` if a run
    is attempting to set a state for the first time. The proposed state might be
    `None` if a rule governing the transition determines that no state change
    should occur at all and nothing is written to the database.

    Attributes:
        session: a SQLAlchemy database session
        run: the flow run attempting to change state
        initial_state: the initial state of the run
        proposed_state: the proposed state the run is transitioning into
        validated_state: a proposed state that has committed to the database
        rule_signature: a record of rules that have fired on entry into a
            managed context, currently only used for debugging purposes
        finalization_signature: a record of rules that have fired on exit from a
            managed context, currently only used for debugging purposes
        response_status: a SetStateStatus object used to build the API response
        response_details: a StateResponseDetails object used to build the API response

    Args:
        session: a SQLAlchemy database session
        run: the flow run attempting to change state
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
    """

    # run: db.FlowRun = ...
    run: Any = ...

    @inject_db
    async def validate_proposed_state(
        self,
        db: OrionDBInterface,
    ):
        """
        Validates a proposed state by committing it to the database.

        After the `FlowOrchestrationContext` is governed by orchestration rules, the
        proposed state can be validated: the proposed state is added to the current
        SQLAlchemy session and is flushed. `self.validated_state` is set to the flushed
        state. The state on the run is set to the validated state as well. If the
        proposed state is `None` when this method is called, nothing happens.

        Returns:
            None
        """
        for validation_attempt in range(2):
            validation_errors = []
            try:
                await self._validate_proposed_state()
            except Exception as exc:
                # unset the run state in case it's been set
                validation_errors.append(exc)
                if self.initial_state is not None:
                    initial_orm_state = db.FlowRunState(
                        flow_run_id=self.run.id,
                        **self.initial_state.dict(shallow=True),
                    )
                    self.session.add(initial_orm_state)
                    self.run.set_state(initial_orm_state)
                else:
                    self.run.set_state(None)
                continue
            return

        logger.error(
            f"Encountered errors during state validation: {validation_errors!r}"
        )
        self.proposed_state = None
        reason = f"Error validating state: {validation_errors!r}"
        self.response_status = SetStateStatus.ABORT
        self.response_details = StateAbortDetails(reason=reason)

    @inject_db
    async def _validate_proposed_state(
        self,
        db: OrionDBInterface,
    ):
        if self.proposed_state is not None:
            validated_orm_state = db.FlowRunState(
                flow_run_id=self.run.id,
                **self.proposed_state.dict(shallow=True),
            )
            self.session.add(validated_orm_state)
            self.run.set_state(validated_orm_state)
        else:
            validated_orm_state = None
        validated_state = (
            validated_orm_state.as_state() if validated_orm_state else None
        )

        await self.session.flush()
        self.validated_state = validated_state

    def safe_copy(self):
        """
        Creates a mostly-mutation-safe copy for use in orchestration rules.

        Orchestration rules govern state transitions using information stored in
        an `OrchestrationContext`. However, mutating objects stored on the context
        directly can have unintended side-effects. To guard against this,
        `self.safe_copy` can be used to pass information to orchestration rules
        without risking mutation.

        Note:
            `self.run` is an ORM model, and even when copied is unsafe to mutate

        Returns:
            A mutation-safe copy of `FlowOrchestrationContext`
        """
        return super().safe_copy()

    @property
    def run_settings(self) -> Dict:
        """Run-level settings used to orchestrate the state transition."""
        return self.run.empirical_policy

    async def task_run(self):
        return None

    async def flow_run(self):
        return self.run
run_settings
property
readonly
Type: Dict
Run-level settings used to orchestrate the state transition.
FlowOrchestrationContext.validate_proposed_state
async
Validates a proposed state by committing it to the database.
After the FlowOrchestrationContext is governed by orchestration rules, the proposed state can be validated: the proposed state is added to the current SQLAlchemy session and is flushed. self.validated_state is set to the flushed state. The state on the run is set to the validated state as well. If the proposed state is None when this method is called, nothing happens.
Returns:
Type | Description |
---|---|
None |
Source code in prefect/orion/orchestration/rules.py
@inject_db
async def validate_proposed_state(
    self,
    db: OrionDBInterface,
):
    """
    Validates a proposed state by committing it to the database.

    After the `FlowOrchestrationContext` is governed by orchestration rules, the
    proposed state can be validated: the proposed state is added to the current
    SQLAlchemy session and is flushed. `self.validated_state` is set to the flushed
    state. The state on the run is set to the validated state as well. If the
    proposed state is `None` when this method is called, nothing happens.

    Returns:
        None
    """
    for validation_attempt in range(2):
        validation_errors = []
        try:
            await self._validate_proposed_state()
        except Exception as exc:
            # unset the run state in case it's been set
            validation_errors.append(exc)
            if self.initial_state is not None:
                initial_orm_state = db.FlowRunState(
                    flow_run_id=self.run.id,
                    **self.initial_state.dict(shallow=True),
                )
                self.session.add(initial_orm_state)
                self.run.set_state(initial_orm_state)
            else:
                self.run.set_state(None)
            continue
        return

    logger.error(
        f"Encountered errors during state validation: {validation_errors!r}"
    )
    self.proposed_state = None
    reason = f"Error validating state: {validation_errors!r}"
    self.response_status = SetStateStatus.ABORT
    self.response_details = StateAbortDetails(reason=reason)
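The control flow of the listing (try, roll back on failure, retry once, then abort) can be sketched without a database. Everything here is an assumption made for illustration: `validate_with_retry`, the `attempt` and `rollback` callables, and the string statuses are hypothetical, and unlike the listing the sketch accumulates errors across attempts.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple


async def validate_with_retry(
    attempt: Callable[[], Awaitable[None]],
    rollback: Callable[[], None],
    attempts: int = 2,
) -> Tuple[str, Optional[str]]:
    """Try `attempt` up to `attempts` times, rolling back after each failure."""
    errors: List[Exception] = []
    for _ in range(attempts):
        try:
            await attempt()
        except Exception as exc:
            errors.append(exc)
            rollback()  # restore the previous state before retrying
            continue
        return "ACCEPT", None
    # Every attempt failed: report an abort with the collected errors
    return "ABORT", f"Error validating state: {errors!r}"


calls = {"flushes": 0, "rollbacks": 0}


async def flaky_flush() -> None:
    # Fails on the first call only
    calls["flushes"] += 1
    if calls["flushes"] == 1:
        raise RuntimeError("transient failure")


async def always_fails() -> None:
    raise RuntimeError("database unavailable")


def rollback() -> None:
    calls["rollbacks"] += 1


recovered = asyncio.run(validate_with_retry(flaky_flush, rollback))
aborted = asyncio.run(validate_with_retry(always_fails, rollback))
```

In the sketch, a single transient failure is absorbed by the retry, while two consecutive failures produce an abort status with a reason string.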
FlowOrchestrationContext.safe_copy
Creates a mostly-mutation-safe copy for use in orchestration rules.
Orchestration rules govern state transitions using information stored in an OrchestrationContext. However, mutating objects stored on the context directly can have unintended side-effects. To guard against this, self.safe_copy can be used to pass information to orchestration rules without risking mutation.
Note
self.run is an ORM model, and even when copied is unsafe to mutate.
Returns:
Type | Description |
---|---|
FlowOrchestrationContext | A mutation-safe copy of FlowOrchestrationContext |
Source code in prefect/orion/orchestration/rules.py
def safe_copy(self):
    """
    Creates a mostly-mutation-safe copy for use in orchestration rules.

    Orchestration rules govern state transitions using information stored in
    an `OrchestrationContext`. However, mutating objects stored on the context
    directly can have unintended side-effects. To guard against this,
    `self.safe_copy` can be used to pass information to orchestration rules
    without risking mutation.

    Note:
        `self.run` is an ORM model, and even when copied is unsafe to mutate

    Returns:
        A mutation-safe copy of `FlowOrchestrationContext`
    """
    return super().safe_copy()
TaskOrchestrationContext
pydantic-model
A container for a task run state transition, governed by orchestration rules.
When a task run attempts to change state, Orion has an opportunity to decide whether this transition can proceed. All the relevant information associated with the state transition is stored in an OrchestrationContext, which is subsequently governed by nested orchestration rules implemented using the BaseOrchestrationRule ABC.
TaskOrchestrationContext introduces the concept of a state being None in the context of an intended state transition. An initial state can be None if a run is attempting to set a state for the first time. The proposed state might be None if a rule governing the transition determines that no state change should occur at all and nothing is written to the database.
Attributes:
Name | Type | Description |
---|---|---|
session | Union[sqlalchemy.orm.session.Session, sqlalchemy.ext.asyncio.session.AsyncSession] | a SQLAlchemy database session |
run | Any | the task run attempting to change state |
initial_state | Optional[prefect.orion.schemas.states.State] | the initial state of the run |
proposed_state | Optional[prefect.orion.schemas.states.State] | the proposed state the run is transitioning into |
validated_state | Optional[prefect.orion.schemas.states.State] | a proposed state that has committed to the database |
rule_signature | List[str] | a record of rules that have fired on entry into a managed context, currently only used for debugging purposes |
finalization_signature | List[str] | a record of rules that have fired on exit from a managed context, currently only used for debugging purposes |
response_status | SetStateStatus | a SetStateStatus object used to build the API response |
response_details | Union[prefect.orion.schemas.responses.StateAcceptDetails, prefect.orion.schemas.responses.StateWaitDetails, prefect.orion.schemas.responses.StateRejectDetails, prefect.orion.schemas.responses.StateAbortDetails] | a StateResponseDetails object used to build the API response |
Parameters:
Name | Description | Default |
---|---|---|
session | a SQLAlchemy database session | required |
run | the task run attempting to change state | required |
initial_state | the initial state of a run | required |
proposed_state | the proposed state a run is transitioning into | required |
Source code in prefect/orion/orchestration/rules.py
class TaskOrchestrationContext(OrchestrationContext):
    """
    A container for a task run state transition, governed by orchestration rules.

    When a task run attempts to change state, Orion has an opportunity
    to decide whether this transition can proceed. All the relevant information
    associated with the state transition is stored in an `OrchestrationContext`,
    which is subsequently governed by nested orchestration rules implemented using
    the `BaseOrchestrationRule` ABC.

    `TaskOrchestrationContext` introduces the concept of a state being `None` in the
    context of an intended state transition. An initial state can be `None` if a run
    is attempting to set a state for the first time. The proposed state might be
    `None` if a rule governing the transition determines that no state change
    should occur at all and nothing is written to the database.

    Attributes:
        session: a SQLAlchemy database session
        run: the task run attempting to change state
        initial_state: the initial state of the run
        proposed_state: the proposed state the run is transitioning into
        validated_state: a proposed state that has committed to the database
        rule_signature: a record of rules that have fired on entry into a
            managed context, currently only used for debugging purposes
        finalization_signature: a record of rules that have fired on exit from a
            managed context, currently only used for debugging purposes
        response_status: a SetStateStatus object used to build the API response
        response_details: a StateResponseDetails object used to build the API response

    Args:
        session: a SQLAlchemy database session
        run: the task run attempting to change state
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
    """

    # run: db.TaskRun = ...
    run: Any = ...

    @inject_db
    async def validate_proposed_state(
        self,
        db: OrionDBInterface,
    ):
        """
        Validates a proposed state by committing it to the database.

        After the `TaskOrchestrationContext` is governed by orchestration rules, the
        proposed state can be validated: the proposed state is added to the current
        SQLAlchemy session and is flushed. `self.validated_state` is set to the flushed
        state. The state on the run is set to the validated state as well. If the
        proposed state is `None` when this method is called, nothing happens.

        Returns:
            None
        """
        for validation_attempt in range(2):
            validation_errors = []
            try:
                await self._validate_proposed_state()
            except Exception as exc:
                # unset the run state in case it's been set
                validation_errors.append(exc)
                if self.initial_state is not None:
                    initial_orm_state = db.TaskRunState(
                        task_run_id=self.run.id,
                        **self.initial_state.dict(shallow=True),
                    )
                    self.session.add(initial_orm_state)
                    self.run.set_state(initial_orm_state)
                else:
                    self.run.set_state(None)
                continue
            return

        logger.error(
            f"Encountered errors during state validation: {validation_errors!r}"
        )
        self.proposed_state = None
        reason = f"Error validating state: {validation_errors!r}"
        self.response_status = SetStateStatus.ABORT
        self.response_details = StateAbortDetails(reason=reason)

    @inject_db
    async def _validate_proposed_state(
        self,
        db: OrionDBInterface,
    ):
        if self.proposed_state is not None:
            validated_orm_state = db.TaskRunState(
                task_run_id=self.run.id,
                **self.proposed_state.dict(shallow=True),
            )
            self.session.add(validated_orm_state)
            self.run.set_state(validated_orm_state)
        else:
            validated_orm_state = None
        validated_state = (
            validated_orm_state.as_state() if validated_orm_state else None
        )

        await self.session.flush()
        self.validated_state = validated_state

    def safe_copy(self):
        """
        Creates a mostly-mutation-safe copy for use in orchestration rules.

        Orchestration rules govern state transitions using information stored in
        an `OrchestrationContext`. However, mutating objects stored on the context
        directly can have unintended side-effects. To guard against this,
        `self.safe_copy` can be used to pass information to orchestration rules
        without risking mutation.

        Note:
            `self.run` is an ORM model, and even when copied is unsafe to mutate

        Returns:
            A mutation-safe copy of `TaskOrchestrationContext`
        """
        return super().safe_copy()

    @property
    def run_settings(self) -> Dict:
        """Run-level settings used to orchestrate the state transition."""
        return self.run.empirical_policy

    async def task_run(self):
        return self.run

    async def flow_run(self):
        return await flow_runs.read_flow_run(
            session=self.session,
            flow_run_id=self.run.flow_run_id,
        )
run_settings
property
readonly
Type: Dict
Run-level settings used to orchestrate the state transition.
TaskOrchestrationContext.validate_proposed_state
async
Validates a proposed state by committing it to the database.
After the TaskOrchestrationContext is governed by orchestration rules, the proposed state can be validated: the proposed state is added to the current SQLAlchemy session and is flushed. self.validated_state is set to the flushed state. The state on the run is set to the validated state as well. If the proposed state is None when this method is called, nothing happens.
Returns:
Type | Description |
---|---|
None |
Source code in prefect/orion/orchestration/rules.py
@inject_db
async def validate_proposed_state(
    self,
    db: OrionDBInterface,
):
    """
    Validates a proposed state by committing it to the database.

    After the `TaskOrchestrationContext` is governed by orchestration rules, the
    proposed state can be validated: the proposed state is added to the current
    SQLAlchemy session and is flushed. `self.validated_state` is set to the flushed
    state. The state on the run is set to the validated state as well. If the
    proposed state is `None` when this method is called, nothing happens.

    Returns:
        None
    """
    for validation_attempt in range(2):
        validation_errors = []
        try:
            await self._validate_proposed_state()
        except Exception as exc:
            # unset the run state in case it's been set
            validation_errors.append(exc)
            if self.initial_state is not None:
                initial_orm_state = db.TaskRunState(
                    task_run_id=self.run.id,
                    **self.initial_state.dict(shallow=True),
                )
                self.session.add(initial_orm_state)
                self.run.set_state(initial_orm_state)
            else:
                self.run.set_state(None)
            continue
        return

    logger.error(
        f"Encountered errors during state validation: {validation_errors!r}"
    )
    self.proposed_state = None
    reason = f"Error validating state: {validation_errors!r}"
    self.response_status = SetStateStatus.ABORT
    self.response_details = StateAbortDetails(reason=reason)
TaskOrchestrationContext.safe_copy
Creates a mostly-mutation-safe copy for use in orchestration rules.
Orchestration rules govern state transitions using information stored in an OrchestrationContext. However, mutating objects stored on the context directly can have unintended side-effects. To guard against this, self.safe_copy can be used to pass information to orchestration rules without risking mutation.
Note
self.run is an ORM model, and even when copied is unsafe to mutate.
Returns:
Type | Description |
---|---|
TaskOrchestrationContext | A mutation-safe copy of TaskOrchestrationContext |
Source code in prefect/orion/orchestration/rules.py
def safe_copy(self):
    """
    Creates a mostly-mutation-safe copy for use in orchestration rules.

    Orchestration rules govern state transitions using information stored in
    an `OrchestrationContext`. However, mutating objects stored on the context
    directly can have unintended side-effects. To guard against this,
    `self.safe_copy` can be used to pass information to orchestration rules
    without risking mutation.

    Note:
        `self.run` is an ORM model, and even when copied is unsafe to mutate

    Returns:
        A mutation-safe copy of `TaskOrchestrationContext`
    """
    return super().safe_copy()
BaseOrchestrationRule
An abstract base class used to implement a discrete piece of orchestration logic.
An OrchestrationRule is a stateful context manager that directly governs a state transition. Complex orchestration is achieved by nesting multiple rules. Each rule runs against an OrchestrationContext that contains the transition details; this context is then passed to subsequent rules. The context can be modified by hooks that fire before and after a new state is validated and committed to the database. These hooks will fire as long as the state transition is considered "valid" and govern a transition by either modifying the proposed state before it is validated or by producing a side-effect.
A state transition occurs whenever a flow or task run changes state, prompting Orion to decide whether or not this transition can proceed. The current state of the run is referred to as the "initial state", and the state a run is attempting to transition into is the "proposed state". Together, the initial state transitioning into the proposed state is the intended transition that is governed by these orchestration rules. After using rules to enter a runtime context, the OrchestrationContext will contain a proposed state that has been governed by each rule, and at that point can validate the proposed state and commit it to the database. The validated state will be set on the context as context.validated_state, and rules will call the self.after_transition hook upon exiting the managed context.
Examples:
Create a rule
>>> class BasicRule(BaseOrchestrationRule):
>>>     # allowed initial state types
>>>     FROM_STATES = [StateType.RUNNING]
>>>     # allowed proposed state types
>>>     TO_STATES = [StateType.COMPLETED, StateType.FAILED]
>>>
>>>     async def before_transition(self, initial_state, proposed_state, ctx):
>>>         # side effects and proposed state mutation can happen here
>>>         ...
>>>
>>>     async def after_transition(self, initial_state, validated_state, ctx):
>>>         # operations on states that have been validated can happen here
>>>         ...
>>>
>>>     async def cleanup(self, initial_state, validated_state, ctx):
>>>         # reverts side effects generated by `before_transition` if necessary
>>>         ...
Use a rule
>>> intended_transition = (StateType.RUNNING, StateType.COMPLETED)
>>> async with BasicRule(context, *intended_transition):
>>>     # context.proposed_state has been governed by BasicRule
>>>     ...
Use multiple rules
>>> rules = [BasicRule, BasicRule]
>>> intended_transition = (StateType.RUNNING, StateType.COMPLETED)
>>> async with contextlib.AsyncExitStack() as stack:
>>>     for rule in rules:
>>>         await stack.enter_async_context(rule(context, *intended_transition))
>>>
>>>     # context.proposed_state has been governed by all rules
>>>     ...
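Nesting rules with `contextlib.AsyncExitStack` means the rules are entered in list order and exited in reverse order. The following sketch shows that ordering with a hypothetical `ToyRule` that only records when its hooks would fire; it does not use any Prefect classes.

```python
import asyncio
import contextlib

events = []


class ToyRule:
    """Hypothetical stand-in for a rule; records when its hooks would fire."""

    def __init__(self, name: str):
        self.name = name

    async def __aenter__(self):
        events.append(f"before:{self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        events.append(f"after:{self.name}")


async def run_rules() -> None:
    async with contextlib.AsyncExitStack() as stack:
        for name in ("first", "second"):
            # enter_async_context is a coroutine method and must be awaited
            await stack.enter_async_context(ToyRule(name))
        # all rules have been entered; validation would happen here
        events.append("validate")


asyncio.run(run_rules())
```

After the run, `events` lists both "before" entries, then "validate", then the "after" entries with the second rule first. Rules entered later therefore finish (or clean up) before rules entered earlier.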
Attributes:
Name | Type | Description |
---|---|---|
FROM_STATES | Iterable | list of valid initial state types this rule governs |
TO_STATES | Iterable | list of valid proposed state types this rule governs |
context | | the orchestration context |
from_state_type | | the state type a run is currently in |
to_state_type | | the proposed state type a run is transitioning into |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | OrchestrationContext | A `FlowOrchestrationContext` or `TaskOrchestrationContext` that is passed between rules | required |
from_state_type | Optional[prefect.orion.schemas.states.StateType] | The state type of the initial state of a run; if this state type is not contained in `FROM_STATES`, no hooks will fire | required |
to_state_type | Optional[prefect.orion.schemas.states.StateType] | The state type of the proposed state before orchestration; if this state type is not contained in `TO_STATES`, no hooks will fire | required |
Source code in prefect/orion/orchestration/rules.py
class BaseOrchestrationRule(contextlib.AbstractAsyncContextManager):
"""
An abstract base class used to implement a discrete piece of orchestration logic.
An `OrchestrationRule` is a stateful context manager that directly governs a state
transition. Complex orchestration is achieved by nesting multiple rules.
Each rule runs against an `OrchestrationContext` that contains the transition
details; this context is then passed to subsequent rules. The context can be
modified by hooks that fire before and after a new state is validated and committed
to the database. These hooks will fire as long as the state transition is
considered "valid" and govern a transition by either modifying the proposed state
before it is validated or by producing a side-effect.
A state transition occurs whenever a flow- or task- run changes state, prompting
Orion to decide whether or not this transition can proceed. The current state of
the run is referred to as the "initial state", and the state a run is
attempting to transition into is the "proposed state". Together, the initial state
transitioning into the proposed state is the intended transition that is governed
by these orchestration rules. After using rules to enter a runtime context, the
`OrchestrationContext` will contain a proposed state that has been governed by
each rule, and at that point can validate the proposed state and commit it to
the database. The validated state will be set on the context as
`context.validated_state`, and rules will call the `self.after_transition` hook
upon exiting the managed context.
Examples:
Create a rule
>>> class BasicRule(BaseOrchestrationRule):
>>> # allowed initial state types
>>> FROM_STATES = [StateType.RUNNING]
>>> # allowed proposed state types
>>> TO_STATES = [StateType.COMPLETED, StateType.FAILED]
>>>
>>> async def before_transition(self, initial_state, proposed_state, ctx):
>>> # side effects and proposed state mutation can happen here
>>> ...
>>>
>>> async def after_transition(self, initial_state, validated_state, ctx):
>>> # operations on states that have been validated can happen here
>>> ...
>>>
>>> async def cleanup(self, initial_state, validated_state, ctx):
>>> # reverts side effects generated by `before_transition` if necessary
>>> ...
Use a rule
>>> intended_transition = (StateType.RUNNING, StateType.COMPLETED)
>>> async with BasicRule(context, *intended_transition):
>>> # context.proposed_state has been governed by BasicRule
>>> ...
Use multiple rules
>>> rules = [BasicRule, BasicRule]
>>> intended_transition = (StateType.RUNNING, StateType.COMPLETED)
>>> async with contextlib.AsyncExitStack() as stack:
>>> for rule in rules:
>>> await stack.enter_async_context(rule(context, *intended_transition))
>>>
>>> # context.proposed_state has been governed by all rules
>>> ...
Attributes:
FROM_STATES: list of valid initial state types this rule governs
TO_STATES: list of valid proposed state types this rule governs
context: the orchestration context
from_state_type: the state type a run is currently in
to_state_type: the proposed state type a run is transitioning into
Args:
context: A `FlowOrchestrationContext` or `TaskOrchestrationContext` that is
passed between rules
from_state_type: The state type of the initial state of a run, if this
state type is not contained in `FROM_STATES`, no hooks will fire
to_state_type: The state type of the proposed state before orchestration, if
this state type is not contained in `TO_STATES`, no hooks will fire
"""
FROM_STATES: Iterable = []
TO_STATES: Iterable = []
def __init__(
self,
context: OrchestrationContext,
from_state_type: Optional[states.StateType],
to_state_type: Optional[states.StateType],
):
self.context = context
self.from_state_type = from_state_type
self.to_state_type = to_state_type
self._invalid_on_entry = None
async def __aenter__(self) -> OrchestrationContext:
"""
Enter an async runtime context governed by this rule.
The `with` statement will bind a governed `OrchestrationContext` to the target
specified by the `as` clause. If the transition proposed by the
`OrchestrationContext` is considered invalid on entry, entering this context
will do nothing. Otherwise, `self.before_transition` will fire.
"""
if await self.invalid():
pass
else:
entry_context = self.context.entry_context()
await self.before_transition(*entry_context)
self.context.rule_signature.append(str(self.__class__))
return self.context
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""
Exit the async runtime context governed by this rule.
One of three outcomes can happen upon exiting this rule's context depending on
the state of the rule. If the rule was found to be invalid on entry, nothing
happens. If the rule was valid on entry and continues to be valid on exit,
`self.after_transition` will fire. If the rule was valid on entry but invalid
on exit, the rule will "fizzle" and `self.cleanup` will fire in order to revert
any side-effects produced by `self.before_transition`.
"""
exit_context = self.context.exit_context()
if await self.invalid():
pass
elif await self.fizzled():
await self.cleanup(*exit_context)
else:
await self.after_transition(*exit_context)
self.context.finalization_signature.append(str(self.__class__))
async def before_transition(
self,
initial_state: Optional[states.State],
proposed_state: Optional[states.State],
context: OrchestrationContext,
) -> None:
"""
Implements a hook that can fire before a state is committed to the database.
This hook may produce side-effects or mutate the proposed state of a
transition using one of four methods: `self.reject_transition`,
`self.delay_transition`, `self.abort_transition`, and `self.rename_state`.
Note:
As currently implemented, the `before_transition` hook is not
perfectly isolated from mutating the transition. It is a standard instance
method that has access to `self`, and therefore `self.context`. This should
never be modified directly. Furthermore, `context.run` is an ORM model, and
mutating the run can also cause unintended writes to the database.
Args:
initial_state: The initial state of a transition
proposed_state: The proposed state of a transition
context: A safe copy of the `OrchestrationContext`, with the exception of
`context.run`, mutating this context will have no effect on the broader
orchestration environment.
Returns:
None
"""
async def after_transition(
self,
initial_state: Optional[states.State],
validated_state: Optional[states.State],
context: OrchestrationContext,
) -> None:
"""
Implements a hook that can fire after a state is committed to the database.
Args:
initial_state: The initial state of a transition
validated_state: The governed state that has been committed to the database
context: A safe copy of the `OrchestrationContext`, with the exception of
`context.run`, mutating this context will have no effect on the broader
orchestration environment.
Returns:
None
"""
async def cleanup(
self,
initial_state: Optional[states.State],
validated_state: Optional[states.State],
context: OrchestrationContext,
) -> None:
"""
Implements a hook that can fire after a state is committed to the database.
The intended use of this method is to revert side-effects produced by
`self.before_transition` when the transition is found to be invalid on exit.
This allows multiple rules to be gracefully run in sequence, without logic that
keeps track of all other rules that might govern a transition.
Args:
initial_state: The initial state of a transition
validated_state: The governed state that has been committed to the database
context: A safe copy of the `OrchestrationContext`, with the exception of
`context.run`, mutating this context will have no effect on the broader
orchestration environment.
Returns:
None
"""
async def invalid(self) -> bool:
"""
Determines if a rule is invalid.
Invalid rules do nothing and no hooks fire upon entering or exiting a governed
context. Rules are invalid if the transition state types are not contained in
`self.FROM_STATES` and `self.TO_STATES`, or if the context is proposing
a transition that differs from the transition the rule was instantiated with.
Returns:
True if the rule is invalid, False otherwise.
"""
# invalid and fizzled states are mutually exclusive,
# `_invalid_on_entry` holds this statefulness
if self.from_state_type not in self.FROM_STATES:
self._invalid_on_entry = True
if self.to_state_type not in self.TO_STATES:
self._invalid_on_entry = True
if self._invalid_on_entry is None:
self._invalid_on_entry = await self.invalid_transition()
return self._invalid_on_entry
async def fizzled(self) -> bool:
"""
Determines if a rule is fizzled and side-effects need to be reverted.
Rules are fizzled if the transitions were valid on entry (thus firing
`self.before_transition`) but are invalid upon exiting the governed context,
most likely caused by another rule mutating the transition.
Returns:
True if the rule is fizzled, False otherwise.
"""
if self._invalid_on_entry:
return False
return await self.invalid_transition()
async def invalid_transition(self) -> bool:
"""
Determines if the transition proposed by the `OrchestrationContext` is invalid.
If the `OrchestrationContext` is attempting to manage a transition with this
rule that differs from the transition the rule was instantiated with, the
transition is considered to be invalid. Depending on the context, a rule with an
invalid transition is either "invalid" or "fizzled".
Returns:
True if the transition is invalid, False otherwise.
"""
initial_state_type = self.context.initial_state_type
proposed_state_type = self.context.proposed_state_type
return (self.from_state_type != initial_state_type) or (
self.to_state_type != proposed_state_type
)
async def reject_transition(self, state: states.State, reason: str):
"""
Rejects a proposed transition before the transition is validated.
This method will reject a proposed transition, mutating the proposed state to
the provided `state`. A reason for rejecting the transition is also passed on
to the `OrchestrationContext`. Rules that reject the transition will not fizzle,
despite the proposed state type changing.
Args:
state: The new proposed state
reason: The reason for rejecting the transition
"""
# don't run if the transition is already validated
if self.context.validated_state:
raise RuntimeError("The transition is already validated")
# a rule that mutates state should not fizzle itself
self.to_state_type = state.type
self.context.proposed_state = state
self.context.response_status = SetStateStatus.REJECT
self.context.response_details = StateRejectDetails(reason=reason)
async def delay_transition(
self,
delay_seconds: int,
reason: str,
):
"""
Delays a proposed transition before the transition is validated.
This method will delay a proposed transition, setting the proposed state to
`None`, signaling to the `OrchestrationContext` that no state should be
written to the database. The number of seconds a transition should be delayed is
passed to the `OrchestrationContext`. A reason for delaying the transition is
also provided. Rules that delay the transition will not fizzle, despite the
proposed state type changing.
Args:
delay_seconds: The number of seconds the transition should be delayed
reason: The reason for delaying the transition
"""
# don't run if the transition is already validated
if self.context.validated_state:
raise RuntimeError("The transition is already validated")
# a rule that mutates state should not fizzle itself
self.to_state_type = None
self.context.proposed_state = None
self.context.response_status = SetStateStatus.WAIT
self.context.response_details = StateWaitDetails(
delay_seconds=delay_seconds, reason=reason
)
async def abort_transition(self, reason: str):
"""
Aborts a proposed transition before the transition is validated.
This method will abort a proposed transition, expecting no further action to
occur for this run. The proposed state is set to `None`, signaling to the
`OrchestrationContext` that no state should be written to the database. A
reason for aborting the transition is also provided. Rules that abort the
transition will not fizzle, despite the proposed state type changing.
Args:
reason: The reason for aborting the transition
"""
# don't run if the transition is already validated
if self.context.validated_state:
raise RuntimeError("The transition is already validated")
# a rule that mutates state should not fizzle itself
self.to_state_type = None
self.context.proposed_state = None
self.context.response_status = SetStateStatus.ABORT
self.context.response_details = StateAbortDetails(reason=reason)
async def rename_state(self, state_name):
"""
Sets the "name" attribute on a proposed state.
The name of a state is an annotation intended to provide rich, human-readable
context for how a run is progressing. This method only updates the name and not
the canonical state TYPE, and will not fizzle or invalidate any other rules
that might govern this state transition.
"""
self.context.proposed_state.name = state_name
BaseOrchestrationRule.before_transition
async
Implements a hook that can fire before a state is committed to the database.
This hook may produce side-effects or mutate the proposed state of a
transition using one of four methods: `self.reject_transition`,
`self.delay_transition`, `self.abort_transition`, and `self.rename_state`.
Note
As currently implemented, the `before_transition` hook is not
perfectly isolated from mutating the transition. It is a standard instance
method that has access to `self`, and therefore `self.context`. This should
never be modified directly. Furthermore, `context.run` is an ORM model, and
mutating the run can also cause unintended writes to the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
initial_state | Optional[prefect.orion.schemas.states.State] | The initial state of a transition | required |
proposed_state | Optional[prefect.orion.schemas.states.State] | The proposed state of a transition | required |
context | OrchestrationContext | A safe copy of the `OrchestrationContext`; with the exception of `context.run`, mutating this context will have no effect on the broader orchestration environment. | required |
Returns:
Type | Description |
---|---|
None | None |
Source code in prefect/orion/orchestration/rules.py
async def before_transition(
self,
initial_state: Optional[states.State],
proposed_state: Optional[states.State],
context: OrchestrationContext,
) -> None:
"""
Implements a hook that can fire before a state is committed to the database.
This hook may produce side-effects or mutate the proposed state of a
transition using one of four methods: `self.reject_transition`,
`self.delay_transition`, `self.abort_transition`, and `self.rename_state`.
Note:
As currently implemented, the `before_transition` hook is not
perfectly isolated from mutating the transition. It is a standard instance
method that has access to `self`, and therefore `self.context`. This should
never be modified directly. Furthermore, `context.run` is an ORM model, and
mutating the run can also cause unintended writes to the database.
Args:
initial_state: The initial state of a transition
proposed_state: The proposed state of a transition
context: A safe copy of the `OrchestrationContext`, with the exception of
`context.run`, mutating this context will have no effect on the broader
orchestration environment.
Returns:
None
"""
BaseOrchestrationRule.after_transition
async
Implements a hook that can fire after a state is committed to the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
initial_state | Optional[prefect.orion.schemas.states.State] | The initial state of a transition | required |
validated_state | Optional[prefect.orion.schemas.states.State] | The governed state that has been committed to the database | required |
context | OrchestrationContext | A safe copy of the `OrchestrationContext`; with the exception of `context.run`, mutating this context will have no effect on the broader orchestration environment. | required |
Returns:
Type | Description |
---|---|
None | None |
Source code in prefect/orion/orchestration/rules.py
async def after_transition(
self,
initial_state: Optional[states.State],
validated_state: Optional[states.State],
context: OrchestrationContext,
) -> None:
"""
Implements a hook that can fire after a state is committed to the database.
Args:
initial_state: The initial state of a transition
validated_state: The governed state that has been committed to the database
context: A safe copy of the `OrchestrationContext`, with the exception of
`context.run`, mutating this context will have no effect on the broader
orchestration environment.
Returns:
None
"""
BaseOrchestrationRule.cleanup
async
Implements a hook that can fire after a state is committed to the database.
The intended use of this method is to revert side-effects produced by
`self.before_transition` when the transition is found to be invalid on exit.
This allows multiple rules to be gracefully run in sequence, without logic that
keeps track of all other rules that might govern a transition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
initial_state | Optional[prefect.orion.schemas.states.State] | The initial state of a transition | required |
validated_state | Optional[prefect.orion.schemas.states.State] | The governed state that has been committed to the database | required |
context | OrchestrationContext | A safe copy of the `OrchestrationContext`; with the exception of `context.run`, mutating this context will have no effect on the broader orchestration environment. | required |
Returns:
Type | Description |
---|---|
None | None |
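As an illustration of the revert pattern, the sketch below pairs a side-effect made before validation with a `cleanup` that undoes it. `ToySlotRule`, the `slots` set, and the run id are hypothetical names chosen for this example and are not part of Prefect.

```python
from typing import Set


class ToySlotRule:
    """Hypothetical rule that reserves a slot before a transition is validated."""

    def __init__(self, slots: Set[str], run_id: str):
        self.slots = slots
        self.run_id = run_id

    def before_transition(self) -> None:
        # Side-effect: reserve a slot for this run.
        self.slots.add(self.run_id)

    def cleanup(self) -> None:
        # Revert the side-effect because the transition no longer applies.
        self.slots.discard(self.run_id)


slots: Set[str] = set()
rule = ToySlotRule(slots, run_id="run-1")

rule.before_transition()
reserved_after_entry = set(slots)

# Suppose another rule changed the proposed state, so this rule fizzled on exit.
rule.cleanup()
reserved_after_cleanup = set(slots)
```

Because each rule reverts only its own side-effects, no rule needs to know which other rules govern the same transition.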
Source code in prefect/orion/orchestration/rules.py
async def cleanup(
self,
initial_state: Optional[states.State],
validated_state: Optional[states.State],
context: OrchestrationContext,
) -> None:
"""
Implements a hook that can fire after a state is committed to the database.
The intended use of this method is to revert side-effects produced by
`self.before_transition` when the transition is found to be invalid on exit.
This allows multiple rules to be gracefully run in sequence, without logic that
keeps track of all other rules that might govern a transition.
Args:
initial_state: The initial state of a transition
validated_state: The governed state that has been committed to the database
context: A safe copy of the `OrchestrationContext`, with the exception of
`context.run`, mutating this context will have no effect on the broader
orchestration environment.
Returns:
None
"""
BaseOrchestrationRule.invalid
async
Determines if a rule is invalid.
Invalid rules do nothing and no hooks fire upon entering or exiting a governed
context. A rule is invalid if its initial state type is not contained in
`self.FROM_STATES`, if its proposed state type is not contained in `self.TO_STATES`,
or if the context is proposing a transition that differs from the transition the
rule was instantiated with.
Returns:
Type | Description |
---|---|
bool | True if the rule is invalid, False otherwise. |
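The checks described above can be restated as a plain function. This is a simplified sketch: `rule_is_invalid` is a hypothetical helper, state types are represented as strings, and the caching of the entry result that the real method performs is left out.

```python
from typing import Iterable, Optional


def rule_is_invalid(
    from_state_type: Optional[str],
    to_state_type: Optional[str],
    from_states: Iterable[str],
    to_states: Iterable[str],
    context_initial_type: Optional[str],
    context_proposed_type: Optional[str],
) -> bool:
    """Hypothetical, synchronous restatement of the validity checks."""
    # The rule does not govern this initial or proposed state type.
    if from_state_type not in from_states or to_state_type not in to_states:
        return True
    # The context proposes a different transition than the rule was built for.
    return (from_state_type != context_initial_type) or (
        to_state_type != context_proposed_type
    )


governs_from = ["RUNNING"]
governs_to = ["COMPLETED", "FAILED"]

valid_case = rule_is_invalid(
    "RUNNING", "COMPLETED", governs_from, governs_to, "RUNNING", "COMPLETED"
)
ungoverned_case = rule_is_invalid(
    "PENDING", "RUNNING", governs_from, governs_to, "PENDING", "RUNNING"
)
mismatch_case = rule_is_invalid(
    "RUNNING", "COMPLETED", governs_from, governs_to, "RUNNING", "FAILED"
)
```

Only the first case leaves the rule valid; the other two would cause its hooks to be skipped.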
Source code in prefect/orion/orchestration/rules.py
async def invalid(self) -> bool:
"""
Determines if a rule is invalid.
Invalid rules do nothing and no hooks fire upon entering or exiting a governed
context. Rules are invalid if the transition state types are not contained in
`self.FROM_STATES` and `self.TO_STATES`, or if the context is proposing
a transition that differs from the transition the rule was instantiated with.
Returns:
True if the rule is invalid, False otherwise.
"""
# invalid and fizzled states are mutually exclusive,
# `_invalid_on_entry` holds this statefulness
if self.from_state_type not in self.FROM_STATES:
self._invalid_on_entry = True
if self.to_state_type not in self.TO_STATES:
self._invalid_on_entry = True
if self._invalid_on_entry is None:
self._invalid_on_entry = await self.invalid_transition()
return self._invalid_on_entry
BaseOrchestrationRule.fizzled
async
Determines if a rule is fizzled and side-effects need to be reverted.
Rules are fizzled if the transitions were valid on entry (thus firing
`self.before_transition`) but are invalid upon exiting the governed context,
most likely caused by another rule mutating the transition.
Returns:
Type | Description |
---|---|
bool | True if the rule is fizzled, False otherwise. |
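The distinction between "invalid" and "fizzled" depends on remembering what happened on entry. The sketch below models that bookkeeping with a hypothetical `ToyRule` that tracks only the proposed state type as a string; it is a simplification, not the library's implementation.

```python
from typing import Optional


class ToyRule:
    """Hypothetical stand-in that mirrors the entry/exit bookkeeping."""

    def __init__(self, to_state_type: str):
        self.to_state_type = to_state_type
        self.invalid_on_entry: Optional[bool] = None

    def enter(self, proposed_type: Optional[str]) -> None:
        # Record, once, whether the rule applied when the context was entered.
        self.invalid_on_entry = self.to_state_type != proposed_type

    def fizzled(self, proposed_type: Optional[str]) -> bool:
        if self.invalid_on_entry:
            return False  # a rule that never applied cannot fizzle
        return self.to_state_type != proposed_type


applied = ToyRule("COMPLETED")
applied.enter("COMPLETED")
unchanged_on_exit = applied.fizzled("COMPLETED")
changed_on_exit = applied.fizzled("FAILED")

skipped = ToyRule("COMPLETED")
skipped.enter("FAILED")
skipped_on_exit = skipped.fizzled("FAILED")
```

Only `changed_on_exit` is true: the rule applied on entry, but the proposed state type was different by the time the context exited.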
Source code in prefect/orion/orchestration/rules.py
async def fizzled(self) -> bool:
"""
Determines if a rule is fizzled and side-effects need to be reverted.
Rules are fizzled if the transitions were valid on entry (thus firing
`self.before_transition`) but are invalid upon exiting the governed context,
most likely caused by another rule mutating the transition.
Returns:
True if the rule is fizzled, False otherwise.
"""
if self._invalid_on_entry:
return False
return await self.invalid_transition()
BaseOrchestrationRule.invalid_transition
async
Determines if the transition proposed by the `OrchestrationContext` is invalid.
If the `OrchestrationContext` is attempting to manage a transition with this
rule that differs from the transition the rule was instantiated with, the
transition is considered to be invalid. Depending on the context, a rule with an
invalid transition is either "invalid" or "fizzled".
Returns:
Type | Description |
---|---|
bool | True if the transition is invalid, False otherwise. |
Source code in prefect/orion/orchestration/rules.py
async def invalid_transition(self) -> bool:
"""
Determines if the transition proposed by the `OrchestrationContext` is invalid.
If the `OrchestrationContext` is attempting to manage a transition with this
rule that differs from the transition the rule was instantiated with, the
transition is considered to be invalid. Depending on the context, a rule with an
invalid transition is either "invalid" or "fizzled".
Returns:
True if the transition is invalid, False otherwise.
"""
initial_state_type = self.context.initial_state_type
proposed_state_type = self.context.proposed_state_type
return (self.from_state_type != initial_state_type) or (
self.to_state_type != proposed_state_type
)
BaseOrchestrationRule.reject_transition
async
Rejects a proposed transition before the transition is validated.
This method will reject a proposed transition, mutating the proposed state to
the provided `state`. A reason for rejecting the transition is also passed on
to the `OrchestrationContext`. Rules that reject the transition will not fizzle,
despite the proposed state type changing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state | State | The new proposed state | required |
reason | str | The reason for rejecting the transition | required |
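The reason a rejecting rule does not fizzle is that it updates its own expected proposed type together with the context. A minimal sketch of that idea follows, using hypothetical `ToyState`, `ToyContext`, and `ToyRule` classes and plain strings in place of the real status and state-type enums.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyState:
    type: str


@dataclass
class ToyContext:
    proposed_state: Optional[ToyState]
    validated_state: Optional[ToyState] = None
    response_status: str = "ACCEPT"  # placeholder string, not the real enum
    reason: Optional[str] = None


class ToyRule:
    """Hypothetical rule holding the proposed type it was created for."""

    def __init__(self, context: ToyContext, to_state_type: Optional[str]):
        self.context = context
        self.to_state_type = to_state_type

    def reject_transition(self, state: ToyState, reason: str) -> None:
        if self.context.validated_state:
            raise RuntimeError("The transition is already validated")
        # Track the replacement type so this rule does not fizzle itself.
        self.to_state_type = state.type
        self.context.proposed_state = state
        self.context.response_status = "REJECT"
        self.context.reason = reason

    def fizzled(self) -> bool:
        proposed = self.context.proposed_state
        return self.to_state_type != (proposed.type if proposed else None)


ctx = ToyContext(proposed_state=ToyState("RUNNING"))
rule = ToyRule(ctx, to_state_type="RUNNING")
rule.reject_transition(ToyState("COMPLETED"), reason="Run already finished")
```

After the call, the context proposes `COMPLETED` with a `REJECT` status, and `rule.fizzled()` is still false because the rule's own expectation moved with it.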
Source code in prefect/orion/orchestration/rules.py
async def reject_transition(self, state: states.State, reason: str):
"""
Rejects a proposed transition before the transition is validated.
This method will reject a proposed transition, mutating the proposed state to
the provided `state`. A reason for rejecting the transition is also passed on
to the `OrchestrationContext`. Rules that reject the transition will not fizzle,
despite the proposed state type changing.
Args:
state: The new proposed state
reason: The reason for rejecting the transition
"""
# don't run if the transition is already validated
if self.context.validated_state:
raise RuntimeError("The transition is already validated")
# a rule that mutates state should not fizzle itself
self.to_state_type = state.type
self.context.proposed_state = state
self.context.response_status = SetStateStatus.REJECT
self.context.response_details = StateRejectDetails(reason=reason)
BaseOrchestrationRule.delay_transition
async
Delays a proposed transition before the transition is validated.
This method will delay a proposed transition, setting the proposed state to
`None`, signaling to the `OrchestrationContext` that no state should be
written to the database. The number of seconds a transition should be delayed is
passed to the `OrchestrationContext`. A reason for delaying the transition is
also provided. Rules that delay the transition will not fizzle, despite the
proposed state type changing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
delay_seconds | int | The number of seconds the transition should be delayed | required |
reason | str | The reason for delaying the transition | required |
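A typical place to call `delay_transition` is inside a `before_transition` hook. The sketch below imitates that with a hypothetical `ToyConcurrencyRule` that postpones a run when no slot is free. The class, the `free_slots` argument, the 30-second delay, and the string statuses are all assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyContext:
    proposed_state: Optional[str]
    response_status: str = "ACCEPT"  # placeholder string, not the real enum
    delay_seconds: Optional[int] = None
    reason: Optional[str] = None


class ToyConcurrencyRule:
    """Hypothetical rule: postpone entering RUNNING when no slot is free."""

    def __init__(self, context: ToyContext, free_slots: int):
        self.context = context
        self.free_slots = free_slots
        self.to_state_type: Optional[str] = context.proposed_state

    async def delay_transition(self, delay_seconds: int, reason: str) -> None:
        # No state is written: the proposed state is cleared and the caller waits.
        self.to_state_type = None
        self.context.proposed_state = None
        self.context.response_status = "WAIT"
        self.context.delay_seconds = delay_seconds
        self.context.reason = reason

    async def before_transition(self) -> None:
        if self.free_slots <= 0:
            await self.delay_transition(30, "No concurrency slots available")


busy = ToyContext(proposed_state="RUNNING")
asyncio.run(ToyConcurrencyRule(busy, free_slots=0).before_transition())

idle = ToyContext(proposed_state="RUNNING")
asyncio.run(ToyConcurrencyRule(idle, free_slots=1).before_transition())
```

The `busy` context ends with no proposed state and a `WAIT` status carrying the delay, while the `idle` context is left untouched.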
Source code in prefect/orion/orchestration/rules.py
async def delay_transition(
self,
delay_seconds: int,
reason: str,
):
"""
Delays a proposed transition before the transition is validated.
This method will delay a proposed transition, setting the proposed state to
`None`, signaling to the `OrchestrationContext` that no state should be
written to the database. The number of seconds a transition should be delayed is
passed to the `OrchestrationContext`. A reason for delaying the transition is
also provided. Rules that delay the transition will not fizzle, despite the
proposed state type changing.
Args:
delay_seconds: The number of seconds the transition should be delayed
reason: The reason for delaying the transition
"""
# don't run if the transition is already validated
if self.context.validated_state:
raise RuntimeError("The transition is already validated")
# a rule that mutates state should not fizzle itself
self.to_state_type = None
self.context.proposed_state = None
self.context.response_status = SetStateStatus.WAIT
self.context.response_details = StateWaitDetails(
delay_seconds=delay_seconds, reason=reason
)
BaseOrchestrationRule.abort_transition
async
Aborts a proposed transition before the transition is validated.
This method will abort a proposed transition, expecting no further action to
occur for this run. The proposed state is set to `None`, signaling to the
`OrchestrationContext` that no state should be written to the database. A
reason for aborting the transition is also provided. Rules that abort the
transition will not fizzle, despite the proposed state type changing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reason | str | The reason for aborting the transition | required |
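The guard against aborting an already-validated transition can be seen in a small sketch. `ToyContext` and the free function `abort_transition` below are hypothetical simplifications of the method described above, with strings standing in for states and statuses.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyContext:
    proposed_state: Optional[str]
    validated_state: Optional[str] = None
    response_status: str = "ACCEPT"  # placeholder string, not the real enum
    reason: Optional[str] = None


def abort_transition(context: ToyContext, reason: str) -> None:
    """Hypothetical, simplified version of the abort logic."""
    if context.validated_state:
        # Once a state has been validated it is too late to abort.
        raise RuntimeError("The transition is already validated")
    context.proposed_state = None
    context.response_status = "ABORT"
    context.reason = reason


pending = ToyContext(proposed_state="RUNNING")
abort_transition(pending, "Upstream task failed")

committed = ToyContext(proposed_state="RUNNING", validated_state="RUNNING")
try:
    abort_transition(committed, "Too late")
    guard_error = None
except RuntimeError as exc:
    guard_error = str(exc)
```

The first call clears the proposed state and records the reason; the second raises and leaves the committed context unchanged.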
Source code in prefect/orion/orchestration/rules.py
async def abort_transition(self, reason: str):
"""
Aborts a proposed transition before the transition is validated.
This method will abort a proposed transition, expecting no further action to
occur for this run. The proposed state is set to `None`, signaling to the
`OrchestrationContext` that no state should be written to the database. A
reason for aborting the transition is also provided. Rules that abort the
transition will not fizzle, despite the proposed state type changing.
Args:
reason: The reason for aborting the transition
"""
# don't run if the transition is already validated
if self.context.validated_state:
raise RuntimeError("The transition is already validated")
# a rule that mutates state should not fizzle itself
self.to_state_type = None
self.context.proposed_state = None
self.context.response_status = SetStateStatus.ABORT
self.context.response_details = StateAbortDetails(reason=reason)
BaseOrchestrationRule.rename_state
async
Sets the "name" attribute on a proposed state.
The name of a state is an annotation intended to provide rich, human-readable context for how a run is progressing. This method only updates the name and not the canonical state TYPE, and will not fizzle or invalidate any other rules that might govern this state transition.
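Since rules compare state types rather than names, a rename leaves every rule's validity unchanged. The following sketch shows this with a hypothetical `ToyState`; the name `"AwaitingRetry"` is only an example label.

```python
from dataclasses import dataclass


@dataclass
class ToyState:
    """Hypothetical state with a canonical type and a human-readable name."""

    type: str
    name: str


proposed = ToyState(type="SCHEDULED", name="Scheduled")
type_before = proposed.type

# Renaming annotates the state without touching its canonical type.
proposed.name = "AwaitingRetry"
```

A rule that was instantiated for a proposed type of `SCHEDULED` would see the same type before and after the rename.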
Source code in prefect/orion/orchestration/rules.py
async def rename_state(self, state_name):
"""
Sets the "name" attribute on a proposed state.
The name of a state is an annotation intended to provide rich, human-readable
context for how a run is progressing. This method only updates the name and not
the canonical state TYPE, and will not fizzle or invalidate any other rules
that might govern this state transition.
"""
self.context.proposed_state.name = state_name
BaseUniversalTransform
An abstract base class used to implement privileged bookkeeping logic.
Warning
In almost all cases, use the `BaseOrchestrationRule` base class instead.
Unlike the orchestration rules implemented with the `BaseOrchestrationRule` ABC,
universal transforms are not stateful, and fire their before- and after-transition
hooks on every state transition unless the proposed state is `None`, indicating that
no state should be written to the database. Because there are no guardrails in place
to prevent directly mutating state or other parts of the orchestration context,
universal transforms should only be used with care.
Attributes:
Name | Type | Description |
---|---|---|
FROM_STATES | Iterable | for compatibility with `BaseOrchestrationPolicy` |
TO_STATES | Iterable | for compatibility with `BaseOrchestrationPolicy` |
context | | the orchestration context |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | OrchestrationContext | A `FlowOrchestrationContext` or `TaskOrchestrationContext` that is passed between transforms | required |
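The firing behaviour of a universal transform can be sketched with a toy context manager. `ToyTransform` and `ToyContext` are hypothetical, and the sketch assumes that a "nullified" transition simply means the proposed state is `None`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ToyContext:
    proposed_state: Optional[str]
    log: List[str] = field(default_factory=list)


class ToyTransform:
    """Hypothetical transform that fires on every non-null transition."""

    def __init__(self, context: ToyContext):
        self.context = context

    def nullified_transition(self) -> bool:
        # Assumption: a nullified transition has no proposed state.
        return self.context.proposed_state is None

    async def __aenter__(self) -> ToyContext:
        if not self.nullified_transition():
            self.context.log.append("before")
        return self.context

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if not self.nullified_transition():
            self.context.log.append("after")


async def run(context: ToyContext) -> List[str]:
    async with ToyTransform(context):
        pass
    return context.log


normal_log = asyncio.run(run(ToyContext(proposed_state="COMPLETED")))
nullified_log = asyncio.run(run(ToyContext(proposed_state=None)))
```

Both hooks fire for the ordinary transition, and neither fires when the proposed state is `None`. No state-type filtering is involved, which is the main difference from a rule.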
Source code in prefect/orion/orchestration/rules.py
class BaseUniversalTransform(contextlib.AbstractAsyncContextManager):
    """
    An abstract base class used to implement privileged bookkeeping logic.

    Warning:
        In almost all cases, use the `BaseOrchestrationRule` base class instead.

    Unlike the orchestration rules implemented with the `BaseOrchestrationRule` ABC,
    universal transforms are not stateful, and fire their before- and after- transition
    hooks on every state transition unless the proposed state is `None`, indicating that
    no state should be written to the database. Because there are no guardrails in place
    to prevent directly mutating state or other parts of the orchestration context,
    universal transforms should only be used with care.

    Attributes:
        FROM_STATES: for compatibility with `BaseOrchestrationPolicy`
        TO_STATES: for compatibility with `BaseOrchestrationPolicy`
        context: the orchestration context

    Args:
        context: A `FlowOrchestrationContext` or `TaskOrchestrationContext` that is
            passed between transforms
    """

    # `BaseUniversalTransform` will always fire on non-null transitions
    FROM_STATES: Iterable = ALL_ORCHESTRATION_STATES
    TO_STATES: Iterable = ALL_ORCHESTRATION_STATES

    def __init__(
        self,
        context: OrchestrationContext,
    ):
        self.context = context

    async def __aenter__(self):
        """
        Enter an async runtime context governed by this transform.

        The `async with` statement will bind a governed `OrchestrationContext` to the
        target specified by the `as` clause. If the transition proposed by the
        `OrchestrationContext` has been nullified on entry and `context.proposed_state`
        is `None`, entering this context will do nothing. Otherwise
        `self.before_transition` will fire.
        """
        if not self.nullified_transition():
            await self.before_transition(self.context)
            self.context.rule_signature.append(str(self.__class__))
        return self.context

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """
        Exit the async runtime context governed by this transform.

        If the transition has been nullified upon exiting this transform's context,
        nothing happens. Otherwise, `self.after_transition` will fire on every non-null
        proposed state.
        """
        if not self.nullified_transition():
            await self.after_transition(self.context)
            self.context.finalization_signature.append(str(self.__class__))

    async def before_transition(self, context) -> None:
        """
        Implements a hook that fires before a state is committed to the database.

        Args:
            context: the `OrchestrationContext` that contains transition details

        Returns:
            None
        """

    async def after_transition(self, context) -> None:
        """
        Implements a hook that can fire after a state is committed to the database.

        Args:
            context: the `OrchestrationContext` that contains transition details

        Returns:
            None
        """

    def nullified_transition(self) -> bool:
        """
        Determines if the transition has been nullified.

        Transitions are nullified if the proposed state is `None`, indicating that
        nothing should be written to the database.

        Returns:
            True if the transition is nullified, False otherwise.
        """
        return self.context.proposed_state is None
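The enter/exit lifecycle in the listing above can be tried out with a small, self-contained sketch. It does not import Prefect: `SketchContext` and `SketchTransform` are hypothetical stand-ins that copy only the `__aenter__`/`__aexit__` logic shown above, and `AuditTransform` is an illustrative subclass that records when each hook fires. States are plain strings here, which is an assumption made to keep the example short.

```python
import asyncio
import contextlib
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SketchContext:
    """Hypothetical stand-in for an orchestration context."""
    proposed_state: Optional[str]
    rule_signature: List[str] = field(default_factory=list)
    finalization_signature: List[str] = field(default_factory=list)
    events: List[str] = field(default_factory=list)


class SketchTransform(contextlib.AbstractAsyncContextManager):
    """Stand-in that mirrors the enter/exit logic of the listing above."""

    def __init__(self, context: SketchContext):
        self.context = context

    async def __aenter__(self):
        if not self.nullified_transition():
            await self.before_transition(self.context)
            self.context.rule_signature.append(str(self.__class__))
        return self.context

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if not self.nullified_transition():
            await self.after_transition(self.context)
            self.context.finalization_signature.append(str(self.__class__))

    async def before_transition(self, context) -> None:
        """No-op by default; subclasses override this hook."""

    async def after_transition(self, context) -> None:
        """No-op by default; subclasses override this hook."""

    def nullified_transition(self) -> bool:
        return self.context.proposed_state is None


class AuditTransform(SketchTransform):
    """Illustrative subclass that records each hook as it fires."""

    async def before_transition(self, context) -> None:
        context.events.append(f"before:{context.proposed_state}")

    async def after_transition(self, context) -> None:
        context.events.append(f"after:{context.proposed_state}")


async def run_transition(proposed_state: Optional[str]) -> SketchContext:
    context = SketchContext(proposed_state=proposed_state)
    async with AuditTransform(context) as governed:
        # Stands in for the point where the state would be written.
        governed.events.append("commit")
    return context


live = asyncio.run(run_transition("RUNNING"))
print(live.events)  # prints: ['before:RUNNING', 'commit', 'after:RUNNING']
```

After the block exits, `live.rule_signature` and `live.finalization_signature` each hold one entry naming `AuditTransform`, matching the bookkeeping in the listing.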
BaseUniversalTransform.before_transition
async
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context | the OrchestrationContext that contains transition details | required |
Returns:
Type | Description |
---|---|
None | None |
Source code in prefect/orion/orchestration/rules.py
async def before_transition(self, context) -> None:
    """
    Implements a hook that fires before a state is committed to the database.

    Args:
        context: the `OrchestrationContext` that contains transition details

    Returns:
        None
    """
BaseUniversalTransform.after_transition
async
Implements a hook that can fire after a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context | the OrchestrationContext that contains transition details | required |
Returns:
Type | Description |
---|---|
None | None |
Source code in prefect/orion/orchestration/rules.py
async def after_transition(self, context) -> None:
    """
    Implements a hook that can fire after a state is committed to the database.

    Args:
        context: the `OrchestrationContext` that contains transition details

    Returns:
        None
    """
BaseUniversalTransform.nullified_transition
Determines if the transition has been nullified.
Transitions are nullified if the proposed state is None, indicating that nothing should be written to the database.
Returns:
Type | Description |
---|---|
bool | True if the transition is nullified, False otherwise. |
Source code in prefect/orion/orchestration/rules.py
def nullified_transition(self) -> bool:
    """
    Determines if the transition has been nullified.

    Transitions are nullified if the proposed state is `None`, indicating that
    nothing should be written to the database.

    Returns:
        True if the transition is nullified, False otherwise.
    """
    return self.context.proposed_state is None
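Because the `None` check runs separately on entry and on exit, a transition that is nullified partway through would skip only the exit hook. The sketch below illustrates this under stated assumptions: `NullableContext` and `HookRecorder` are hypothetical stand-ins (not Prefect classes) that reuse the same `proposed_state is None` test, and states are plain strings.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class NullableContext:
    """Hypothetical stand-in for an orchestration context."""
    proposed_state: Optional[str]
    fired: List[str] = field(default_factory=list)


class HookRecorder:
    """Hypothetical transform-like context manager using the same None check."""

    def __init__(self, context: NullableContext):
        self.context = context

    def nullified_transition(self) -> bool:
        return self.context.proposed_state is None

    async def __aenter__(self):
        if not self.nullified_transition():
            self.context.fired.append("before")
        return self.context

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if not self.nullified_transition():
            self.context.fired.append("after")


async def fired_hooks(initial_state: Optional[str], nullify_inside: bool) -> List[str]:
    context = NullableContext(proposed_state=initial_state)
    async with HookRecorder(context) as governed:
        if nullify_inside:
            # Simulates something nullifying the transition mid-flight.
            governed.proposed_state = None
    return context.fired


on_entry = asyncio.run(fired_hooks(None, nullify_inside=False))
midway = asyncio.run(fired_hooks("RUNNING", nullify_inside=True))
intact = asyncio.run(fired_hooks("RUNNING", nullify_inside=False))
print(on_entry, midway, intact)  # prints: [] ['before'] ['before', 'after']
```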