prefect.orion.orchestration.global_policy
¶
Bookkeeping logic that fires on every state transition.
For clarity, GlobalFlowPolicy
and GlobalTaskPolicy
contain all transition logic
implemented using BaseUniversalTransform
.
None of these operations modify state, and regardless of what orchestration Orion might
enforce on a transition, the global policies contain Orion's necessary bookkeeping.
Because these transforms record information about the validated state committed to the
state database, they should be the most deeply nested contexts in the orchestration loop.
GlobalFlowPolicy
¶
Global transforms that run against flow-run-state transitions in priority order.
These transforms are intended to run immediately before and after a state transition is validated.
Source code in prefect/orion/orchestration/global_policy.py
class GlobalFlowPolicy(BaseOrchestrationPolicy):
    """
    Universal transforms applied to flow-run-state transitions, in priority order.

    The transforms are meant to run immediately before and after a state
    transition is validated. The list is the shared global transforms followed
    by the two subflow transforms.
    """

    def priority():
        """
        Return the flow-run transforms in priority order.
        """
        # NOTE(review): declared without `self`; it works when invoked on the
        # class itself — confirm how BaseOrchestrationPolicy expects it to be
        # called.
        shared_transforms = COMMON_GLOBAL_TRANSFORMS()
        return shared_transforms + [
            UpdateSubflowParentTask,
            UpdateSubflowStateDetails,
        ]
GlobalFlowPolicy.priority
¶
A list of orchestration rules in priority order.
Source code in prefect/orion/orchestration/global_policy.py
def priority():
    """
    Return the flow-run transforms in priority order.

    The shared global transforms come first, followed by the two
    subflow-specific transforms.
    """
    shared_transforms = COMMON_GLOBAL_TRANSFORMS()
    return shared_transforms + [
        UpdateSubflowParentTask,
        UpdateSubflowStateDetails,
    ]
GlobalTaskPolicy
¶
Global transforms that run against task-run-state transitions in priority order.
These transforms are intended to run immediately before and after a state transition is validated.
Source code in prefect/orion/orchestration/global_policy.py
class GlobalTaskPolicy(BaseOrchestrationPolicy):
    """
    Universal transforms applied to task-run-state transitions, in priority order.

    The transforms are meant to run immediately before and after a state
    transition is validated. Task runs use the shared global transforms only.
    """

    def priority():
        """
        Return the task-run transforms in priority order.
        """
        # NOTE(review): declared without `self`; it works when invoked on the
        # class itself — confirm how BaseOrchestrationPolicy expects it to be
        # called.
        shared_transforms = COMMON_GLOBAL_TRANSFORMS()
        return shared_transforms
GlobalTaskPolicy.priority
¶
A list of orchestration rules in priority order.
Source code in prefect/orion/orchestration/global_policy.py
def priority():
    """
    Return the task-run transforms in priority order.

    Task runs use exactly the shared global transforms, with nothing appended.
    """
    shared_transforms = COMMON_GLOBAL_TRANSFORMS()
    return shared_transforms
SetRunStateType
¶
Updates the state type of a run on a state transition.
Source code in prefect/orion/orchestration/global_policy.py
class SetRunStateType(BaseUniversalTransform):
    """
    Keeps the run's `state_type` in sync with the state being proposed.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        """
        Copy the proposed state's type onto the run.

        Args:
            context: the orchestration context for the transition
        """
        incoming_state = context.proposed_state
        context.run.state_type = incoming_state.type
SetRunStateType.before_transition
async
¶
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    """
    Copy the proposed state's type onto the run.

    Args:
        context: the orchestration context for the transition
    """
    incoming_state = context.proposed_state
    context.run.state_type = incoming_state.type
SetRunStateName
¶
Updates the state name of a run on a state transition.
Source code in prefect/orion/orchestration/global_policy.py
class SetRunStateName(BaseUniversalTransform):
    """
    Keeps the run's `state_name` in sync with the state being proposed.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        """
        Copy the proposed state's name onto the run.

        Args:
            context: the orchestration context for the transition
        """
        incoming_state = context.proposed_state
        context.run.state_name = incoming_state.name
SetRunStateName.before_transition
async
¶
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    """
    Copy the proposed state's name onto the run.

    Args:
        context: the orchestration context for the transition
    """
    incoming_state = context.proposed_state
    context.run.state_name = incoming_state.name
SetStartTime
¶
Records the time a run enters a running state for the first time.
Source code in prefect/orion/orchestration/global_policy.py
class SetStartTime(BaseUniversalTransform):
    """
    Stamps a run with a start time the first time it enters a running state.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        """
        Set `run.start_time` from the proposed state's timestamp.

        Nothing is written unless the proposed state is running and the run has
        no start time yet.

        Args:
            context: the orchestration context for the transition
        """
        incoming_state = context.proposed_state

        # only transitions into a running state are of interest
        if not incoming_state.is_running():
            return

        # an existing start time is never overwritten
        if context.run.start_time is not None:
            return

        context.run.start_time = incoming_state.timestamp
SetStartTime.before_transition
async
¶
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    """
    Set `run.start_time` from the proposed state's timestamp.

    Nothing is written unless the proposed state is running and the run has
    no start time yet.

    Args:
        context: the orchestration context for the transition
    """
    incoming_state = context.proposed_state

    # only transitions into a running state are of interest
    if not incoming_state.is_running():
        return

    # an existing start time is never overwritten
    if context.run.start_time is not None:
        return

    context.run.start_time = incoming_state.timestamp
SetEndTime
¶
Records the time a run enters a terminal state.
With normal client usage, a run will not transition out of a terminal state. However, it's possible to force these transitions manually via the API. While leaving a terminal state, the end time will be unset.
Source code in prefect/orion/orchestration/global_policy.py
class SetEndTime(BaseUniversalTransform):
    """
    Maintains a run's end time as it enters and leaves final states.

    Normal client usage never moves a run out of a terminal state, but such a
    transition can be forced manually through the API. When that happens the
    end time is cleared again.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        """
        Clear or set `run.end_time` according to the transition.

        Args:
            context: the orchestration context for the transition
        """
        previous_state = context.initial_state
        incoming_state = context.proposed_state
        run = context.run

        # final -> non-final: the run is no longer finished
        leaving_final = bool(previous_state) and previous_state.is_final()
        if leaving_final and not incoming_state.is_final():
            run.end_time = None

        # the rest only concerns transitions into a final state
        if not incoming_state.is_final():
            return

        # a run that never started, or already has an end time, is left alone
        if run.start_time and not run.end_time:
            run.end_time = incoming_state.timestamp
SetEndTime.before_transition
async
¶
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    """
    Clear or set `run.end_time` according to the transition.

    Leaving a final state for a non-final one clears the end time. Entering a
    final state sets it to the proposed state's timestamp, but only when the
    run has a start time and no end time.

    Args:
        context: the orchestration context for the transition
    """
    previous_state = context.initial_state
    incoming_state = context.proposed_state
    run = context.run

    # final -> non-final: the run is no longer finished
    leaving_final = bool(previous_state) and previous_state.is_final()
    if leaving_final and not incoming_state.is_final():
        run.end_time = None

    # the rest only concerns transitions into a final state
    if not incoming_state.is_final():
        return

    # a run that never started, or already has an end time, is left alone
    if run.start_time and not run.end_time:
        run.end_time = incoming_state.timestamp
IncrementRunTime
¶
Records the amount of time a run spends in the running state.
Source code in prefect/orion/orchestration/global_policy.py
class IncrementRunTime(BaseUniversalTransform):
    """
    Accumulates the time a run has spent in running states.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        """
        Add the duration of the running state being exited to the run's total.

        The duration is the proposed state's timestamp minus the initial
        state's timestamp. Nothing happens when there is no initial state or
        it is not a running state.

        Args:
            context: the orchestration context for the transition
        """
        previous_state = context.initial_state

        # only a transition out of a running state adds run time
        if not previous_state or not previous_state.is_running():
            return

        context.run.total_run_time += (
            context.proposed_state.timestamp - previous_state.timestamp
        )
IncrementRunTime.before_transition
async
¶
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    """
    Add the duration of the running state being exited to the run's total.

    The duration is the proposed state's timestamp minus the initial state's
    timestamp. Nothing happens when there is no initial state or it is not a
    running state.

    Args:
        context: the orchestration context for the transition
    """
    previous_state = context.initial_state

    # only a transition out of a running state adds run time
    if not previous_state or not previous_state.is_running():
        return

    context.run.total_run_time += (
        context.proposed_state.timestamp - previous_state.timestamp
    )
IncrementRunCount
¶
Records the number of times a run enters a running state. For use with retries.
Source code in prefect/orion/orchestration/global_policy.py
class IncrementRunCount(BaseUniversalTransform):
    """
    Counts how many times a run has entered a running state. For use with
    retries.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        """
        Bump `run.run_count` by one when the proposed state is running.

        Args:
            context: the orchestration context for the transition
        """
        # transitions into any non-running state leave the count untouched
        if not context.proposed_state.is_running():
            return

        context.run.run_count += 1
IncrementRunCount.before_transition
async
¶
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    """
    Bump `run.run_count` by one when the proposed state is running.

    Args:
        context: the orchestration context for the transition
    """
    # transitions into any non-running state leave the count untouched
    if not context.proposed_state.is_running():
        return

    context.run.run_count += 1
SetExpectedStartTime
¶
Estimates the time a state is expected to start running if not set.
For scheduled states, this estimate is simply the scheduled time. For other states, this is set to the time the proposed state was created by Orion.
Source code in prefect/orion/orchestration/global_policy.py
class SetExpectedStartTime(BaseUniversalTransform):
    """
    Fills in an estimate of when a run is expected to start, if none is set.

    A scheduled state contributes its scheduled time. Any other state
    contributes its own timestamp.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        """
        Set `run.expected_start_time` when the run does not have one yet.

        Args:
            context: the orchestration context for the transition
        """
        run = context.run

        # an estimate that is already present is never replaced
        if run.expected_start_time:
            return

        incoming_state = context.proposed_state
        if incoming_state.is_scheduled():
            estimate = incoming_state.state_details.scheduled_time
        else:
            estimate = incoming_state.timestamp

        run.expected_start_time = estimate
SetExpectedStartTime.before_transition
async
¶
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    """
    Set `run.expected_start_time` when the run does not have one yet.

    A scheduled proposed state contributes its scheduled time. Any other
    proposed state contributes its own timestamp.

    Args:
        context: the orchestration context for the transition
    """
    run = context.run

    # an estimate that is already present is never replaced
    if run.expected_start_time:
        return

    incoming_state = context.proposed_state
    if incoming_state.is_scheduled():
        estimate = incoming_state.state_details.scheduled_time
    else:
        estimate = incoming_state.timestamp

    run.expected_start_time = estimate
SetNextScheduledStartTime
¶
Records the scheduled time on a run.
When a run enters a scheduled state, run.next_scheduled_start_time
is set to
the state's scheduled time. When leaving a scheduled state,
run.next_scheduled_start_time
is unset.
Source code in prefect/orion/orchestration/global_policy.py
class SetNextScheduledStartTime(BaseUniversalTransform):
    """
    Mirrors the scheduled time of a run's state onto the run itself.

    Entering a scheduled state copies that state's scheduled time into
    `run.next_scheduled_start_time`. Leaving a scheduled state clears
    `run.next_scheduled_start_time`.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        """
        Clear and/or set `run.next_scheduled_start_time` for this transition.

        Args:
            context: the orchestration context for the transition
        """
        run = context.run

        # leaving a scheduled state: drop the stale value first
        previous_state = context.initial_state
        if previous_state and previous_state.is_scheduled():
            run.next_scheduled_start_time = None

        # entering a scheduled state: record its scheduled time
        # (a scheduled -> scheduled transition clears and then re-sets)
        incoming_state = context.proposed_state
        if incoming_state.is_scheduled():
            run.next_scheduled_start_time = (
                incoming_state.state_details.scheduled_time
            )
SetNextScheduledStartTime.before_transition
async
¶
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    """
    Clear and/or set `run.next_scheduled_start_time` for this transition.

    Leaving a scheduled state clears the value. Entering a scheduled state
    sets it to that state's scheduled time.

    Args:
        context: the orchestration context for the transition
    """
    run = context.run

    # leaving a scheduled state: drop the stale value first
    previous_state = context.initial_state
    if previous_state and previous_state.is_scheduled():
        run.next_scheduled_start_time = None

    # entering a scheduled state: record its scheduled time
    # (a scheduled -> scheduled transition clears and then re-sets)
    incoming_state = context.proposed_state
    if incoming_state.is_scheduled():
        run.next_scheduled_start_time = (
            incoming_state.state_details.scheduled_time
        )
UpdateSubflowParentTask
¶
Whenever a subflow changes state, it must update its parent task run's state.
Source code in prefect/orion/orchestration/global_policy.py
class UpdateSubflowParentTask(BaseUniversalTransform):
    """
    Propagates a subflow's state changes to the task run that represents it in
    the parent flow.
    """

    async def after_transition(self, context: OrchestrationContext) -> None:
        """
        Set the parent task run's state from the subflow's validated state.

        Flow runs without a `parent_task_run_id` are ignored.

        Args:
            context: the orchestration context for the transition
        """
        parent_task_run_id = context.run.parent_task_run_id

        # only subflow runs have a parent task run to update
        if parent_task_run_id is None:
            return

        # build the task run's state from a copy so the flow run's own state
        # object is not the one handed to the task run
        # NOTE(review): whether `state_details` is shared between the copy and
        # the validated state depends on the semantics of `copy` — confirm.
        copied_fields = {
            "type",
            "timestamp",
            "name",
            "message",
            "state_details",
            "data",
        }
        parent_task_state = context.validated_state.copy(
            reset_fields=True,
            include=copied_fields,
        )

        # point the task's state at the subflow run it tracks
        parent_task_state.state_details.child_flow_run_id = context.run.id

        await models.task_runs.set_task_run_state(
            session=context.session,
            task_run_id=parent_task_run_id,
            state=parent_task_state,
            force=True,
        )
UpdateSubflowParentTask.after_transition
async
¶
Implements a hook that can fire after a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def after_transition(self, context: OrchestrationContext) -> None:
    """
    Set the parent task run's state from the subflow's validated state.

    Flow runs without a `parent_task_run_id` are ignored.

    Args:
        context: the orchestration context for the transition
    """
    parent_task_run_id = context.run.parent_task_run_id

    # only subflow runs have a parent task run to update
    if parent_task_run_id is None:
        return

    # build the task run's state from a copy so the flow run's own state
    # object is not the one handed to the task run
    # NOTE(review): whether `state_details` is shared between the copy and
    # the validated state depends on the semantics of `copy` — confirm.
    copied_fields = {
        "type",
        "timestamp",
        "name",
        "message",
        "state_details",
        "data",
    }
    parent_task_state = context.validated_state.copy(
        reset_fields=True,
        include=copied_fields,
    )

    # point the task's state at the subflow run it tracks
    parent_task_state.state_details.child_flow_run_id = context.run.id

    await models.task_runs.set_task_run_state(
        session=context.session,
        task_run_id=parent_task_run_id,
        state=parent_task_state,
        force=True,
    )
UpdateSubflowStateDetails
¶
Update a child subflow state's references to a corresponding tracking task run id in the parent flow run.
Source code in prefect/orion/orchestration/global_policy.py
class UpdateSubflowStateDetails(BaseUniversalTransform):
    """
    Points a subflow's proposed state at the task run that tracks it in the
    parent flow run.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        """
        Write the run's `parent_task_run_id` into the proposed state's details.

        Flow runs without a `parent_task_run_id` are ignored.

        Args:
            context: the orchestration context for the transition
        """
        parent_task_run_id = context.run.parent_task_run_id

        # only subflow runs have a tracking task run to reference
        if parent_task_run_id is None:
            return

        context.proposed_state.state_details.task_run_id = parent_task_run_id
UpdateSubflowStateDetails.before_transition
async
¶
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    """
    Write the run's `parent_task_run_id` into the proposed state's details.

    Flow runs without a `parent_task_run_id` are ignored.

    Args:
        context: the orchestration context for the transition
    """
    parent_task_run_id = context.run.parent_task_run_id

    # only subflow runs have a tracking task run to reference
    if parent_task_run_id is None:
        return

    context.proposed_state.state_details.task_run_id = parent_task_run_id
UpdateStateDetails
¶
Update a state's references to a corresponding flow- or task-run.
Source code in prefect/orion/orchestration/global_policy.py
class UpdateStateDetails(BaseUniversalTransform):
    """
    Stamps the proposed state's details with the ids of the run it belongs to.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        """
        Fill in `flow_run_id` (and `task_run_id` for task runs) on the
        proposed state's details.

        Contexts that are neither flow nor task orchestration contexts are left
        untouched.

        Args:
            context: the orchestration context for the transition
        """
        if isinstance(context, FlowOrchestrationContext):
            flow_run = await context.flow_run()
            context.proposed_state.state_details.flow_run_id = flow_run.id
            return

        if isinstance(context, TaskOrchestrationContext):
            task_run = await context.task_run()
            details = context.proposed_state.state_details
            # a task run's state references both its flow run and itself
            details.flow_run_id = task_run.flow_run_id
            details.task_run_id = task_run.id
UpdateStateDetails.before_transition
async
¶
Implements a hook that fires before a state is committed to the database.
Parameters:
Name | Description | Default |
---|---|---|
context |
the OrchestrationContext |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in prefect/orion/orchestration/global_policy.py
async def before_transition(self, context: OrchestrationContext) -> None:
    """
    Fill in `flow_run_id` (and `task_run_id` for task runs) on the proposed
    state's details.

    Contexts that are neither flow nor task orchestration contexts are left
    untouched.

    Args:
        context: the orchestration context for the transition
    """
    if isinstance(context, FlowOrchestrationContext):
        flow_run = await context.flow_run()
        context.proposed_state.state_details.flow_run_id = flow_run.id
        return

    if isinstance(context, TaskOrchestrationContext):
        task_run = await context.task_run()
        details = context.proposed_state.state_details
        # a task run's state references both its flow run and itself
        details.flow_run_id = task_run.flow_run_id
        details.task_run_id = task_run.id