Skip to content

prefect.orion.orchestration.core_policy

Orchestration logic that fires on state transitions.

CoreFlowPolicy and CoreTaskPolicy contain all default orchestration rules that Orion enforces on a state transition.

CoreFlowPolicy

Orchestration rules that run against flow-run-state transitions in priority order.

Source code in prefect/orion/orchestration/core_policy.py
class CoreFlowPolicy(BaseOrchestrationPolicy):
    """
    Orchestration rules that run against flow-run-state transitions in priority order.
    """

    def priority():
        return [
            PreventTransitionsFromTerminalStates,
            WaitForScheduledTime,
            RetryFailedFlows,
        ]

CoreFlowPolicy.priority

A list of orchestration rules in priority order.

Source code in prefect/orion/orchestration/core_policy.py
def priority():
    return [
        PreventTransitionsFromTerminalStates,
        WaitForScheduledTime,
        RetryFailedFlows,
    ]

CoreTaskPolicy

Orchestration rules that run against task-run-state transitions in priority order.

Source code in prefect/orion/orchestration/core_policy.py
class CoreTaskPolicy(BaseOrchestrationPolicy):
    """
    Orchestration rules that run against task-run-state transitions in priority order.
    """

    def priority():
        return [
            CacheRetrieval,
            SecureTaskConcurrencySlots,  # retrieve cached states even if slots are full
            PreventTransitionsFromTerminalStates,
            WaitForScheduledTime,
            RetryFailedTasks,
            RenameReruns,
            CacheInsertion,
            ReleaseTaskConcurrencySlots,
        ]

CoreTaskPolicy.priority

A list of orchestration rules in priority order.

Source code in prefect/orion/orchestration/core_policy.py
def priority():
    return [
        CacheRetrieval,
        SecureTaskConcurrencySlots,  # retrieve cached states even if slots are full
        PreventTransitionsFromTerminalStates,
        WaitForScheduledTime,
        RetryFailedTasks,
        RenameReruns,
        CacheInsertion,
        ReleaseTaskConcurrencySlots,
    ]

MinimalFlowPolicy

Source code in prefect/orion/orchestration/core_policy.py
class MinimalFlowPolicy(BaseOrchestrationPolicy):
    def priority():
        return []

MinimalFlowPolicy.priority

A list of orchestration rules in priority order.

Source code in prefect/orion/orchestration/core_policy.py
def priority():
    return []

MinimalTaskPolicy

Source code in prefect/orion/orchestration/core_policy.py
class MinimalTaskPolicy(BaseOrchestrationPolicy):
    def priority():
        return [
            ReleaseTaskConcurrencySlots,  # always release concurrency slots
        ]

MinimalTaskPolicy.priority

A list of orchestration rules in priority order.

Source code in prefect/orion/orchestration/core_policy.py
def priority():
    return [
        ReleaseTaskConcurrencySlots,  # always release concurrency slots
    ]

SecureTaskConcurrencySlots

Checks relevant concurrency slots are available before entering a Running state.

This rule checks if concurrency limits have been set on the tags associated with a TaskRun. If so, a concurrency slot will be secured against each concurrency limit before being allowed to transition into a running state. If a concurrency limit has been reached, the client will be instructed to delay the transition for 30 seconds before trying again. If the concurrency limit set on a tag is 0, the transition will be aborted to prevent deadlocks.

Source code in prefect/orion/orchestration/core_policy.py
class SecureTaskConcurrencySlots(BaseOrchestrationRule):
    """
    Checks relevant concurrency slots are available before entering a Running state.

    This rule checks if concurrency limits have been set on the tags associated with a
    TaskRun. If so, a concurrency slot will be secured against each concurrency limit
    before being allowed to transition into a running state. If a concurrency limit has
    been reached, the client will be instructed to delay the transition for 30 seconds
    before trying again. If the concurrency limit set on a tag is 0, the transition will
    be aborted to prevent deadlocks.
    """

    FROM_STATES = ALL_ORCHESTRATION_STATES
    TO_STATES = [states.StateType.RUNNING]

    async def before_transition(
        self,
        initial_state: Optional[states.State],
        validated_state: Optional[states.State],
        context: TaskOrchestrationContext,
    ) -> None:

        self._applied_limits = []
        filtered_limits = (
            await concurrency_limits.filter_concurrency_limits_for_orchestration(
                context.session, tags=context.run.tags
            )
        )
        run_limits = {limit.tag: limit for limit in filtered_limits}
        for tag, cl in run_limits.items():

            limit = cl.concurrency_limit
            if limit == 0:
                # limits of 0 will deadlock, and the transition needs to abort
                for stale_tag in self._applied_limits:
                    stale_limit = run_limits.get(stale_tag, None)
                    active_slots = set(stale_limit.active_slots)
                    active_slots.discard(str(context.run.id))
                    stale_limit.active_slots = list(active_slots)

                await self.abort_transition(
                    reason=f'The concurrency limit on tag "{tag}" is 0 and will deadlock if the task tries to run again.',
                )
            elif len(cl.active_slots) >= limit:
                # if the limit has already been reached, delay the transition
                for stale_tag in self._applied_limits:
                    stale_limit = run_limits.get(stale_tag, None)
                    active_slots = set(stale_limit.active_slots)
                    active_slots.discard(str(context.run.id))
                    stale_limit.active_slots = list(active_slots)

                await self.delay_transition(
                    30,
                    f"Concurrency limit for the {tag} tag has been reached",
                )
            else:
                # log the TaskRun ID to active_slots
                self._applied_limits.append(tag)
                active_slots = set(cl.active_slots)
                active_slots.add(str(context.run.id))
                cl.active_slots = list(active_slots)

    async def cleanup(
        self,
        initial_state: Optional[states.State],
        validated_state: Optional[states.State],
        context: OrchestrationContext,
    ) -> None:
        for tag in self._applied_limits:
            cl = await concurrency_limits.read_concurrency_limit_by_tag(
                context.session, tag
            )
            active_slots = set(cl.active_slots)
            active_slots.discard(str(context.run.id))
            cl.active_slots = list(active_slots)

SecureTaskConcurrencySlots.before_transition async

Implements a hook that can fire before a state is committed to the database.

This hook may produce side-effects or mutate the proposed state of a transition using one of four methods: self.reject_transition, self.delay_transition, self.abort_transition, and self.rename_state.

Note

As currently implemented, the before_transition hook is not perfectly isolated from mutating the transition. It is a standard instance method that has access to self, and therefore self.context. This should never be modified directly. Furthermore, context.run is an ORM model, and mutating the run can also cause unintended writes to the database.

Parameters:

Name Description Default
initial_state

The initial state of a transtion

Optional[prefect.orion.schemas.states.State]
required
proposed_state

The proposed state of a transition

required
context

A safe copy of the OrchestrationContext, with the exception of context.run, mutating this context will have no effect on the broader orchestration environment.

TaskOrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/core_policy.py
async def before_transition(
    self,
    initial_state: Optional[states.State],
    validated_state: Optional[states.State],
    context: TaskOrchestrationContext,
) -> None:

    self._applied_limits = []
    filtered_limits = (
        await concurrency_limits.filter_concurrency_limits_for_orchestration(
            context.session, tags=context.run.tags
        )
    )
    run_limits = {limit.tag: limit for limit in filtered_limits}
    for tag, cl in run_limits.items():

        limit = cl.concurrency_limit
        if limit == 0:
            # limits of 0 will deadlock, and the transition needs to abort
            for stale_tag in self._applied_limits:
                stale_limit = run_limits.get(stale_tag, None)
                active_slots = set(stale_limit.active_slots)
                active_slots.discard(str(context.run.id))
                stale_limit.active_slots = list(active_slots)

            await self.abort_transition(
                reason=f'The concurrency limit on tag "{tag}" is 0 and will deadlock if the task tries to run again.',
            )
        elif len(cl.active_slots) >= limit:
            # if the limit has already been reached, delay the transition
            for stale_tag in self._applied_limits:
                stale_limit = run_limits.get(stale_tag, None)
                active_slots = set(stale_limit.active_slots)
                active_slots.discard(str(context.run.id))
                stale_limit.active_slots = list(active_slots)

            await self.delay_transition(
                30,
                f"Concurrency limit for the {tag} tag has been reached",
            )
        else:
            # log the TaskRun ID to active_slots
            self._applied_limits.append(tag)
            active_slots = set(cl.active_slots)
            active_slots.add(str(context.run.id))
            cl.active_slots = list(active_slots)

SecureTaskConcurrencySlots.cleanup async

Implements a hook that can fire after a state is committed to the database.

The intended use of this method is to revert side-effects produced by self.before_transition when the transition is found to be invalid on exit. This allows multiple rules to be gracefully run in sequence, without logic that keeps track of all other rules that might govern a transition.

Parameters:

Name Description Default
initial_state

The initial state of a transtion

Optional[prefect.orion.schemas.states.State]
required
validated_state

The governed state that has been committed to the database

Optional[prefect.orion.schemas.states.State]
required
context

A safe copy of the OrchestrationContext, with the exception of context.run, mutating this context will have no effect on the broader orchestration environment.

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/core_policy.py
async def cleanup(
    self,
    initial_state: Optional[states.State],
    validated_state: Optional[states.State],
    context: OrchestrationContext,
) -> None:
    for tag in self._applied_limits:
        cl = await concurrency_limits.read_concurrency_limit_by_tag(
            context.session, tag
        )
        active_slots = set(cl.active_slots)
        active_slots.discard(str(context.run.id))
        cl.active_slots = list(active_slots)

ReleaseTaskConcurrencySlots

Releases any concurrency slots held by a run upon exiting a Running state.

Source code in prefect/orion/orchestration/core_policy.py
class ReleaseTaskConcurrencySlots(BaseOrchestrationRule):
    """
    Releases any concurrency slots held by a run upon exiting a Running state.
    """

    FROM_STATES = [states.StateType.RUNNING]
    TO_STATES = ALL_ORCHESTRATION_STATES

    async def after_transition(
        self,
        initial_state: Optional[states.State],
        validated_state: Optional[states.State],
        context: TaskOrchestrationContext,
    ) -> None:

        filtered_limits = (
            await concurrency_limits.filter_concurrency_limits_for_orchestration(
                context.session, tags=context.run.tags
            )
        )
        run_limits = {limit.tag: limit for limit in filtered_limits}
        for tag, cl in run_limits.items():
            active_slots = set(cl.active_slots)
            active_slots.discard(str(context.run.id))
            cl.active_slots = list(active_slots)

ReleaseTaskConcurrencySlots.after_transition async

Implements a hook that can fire after a state is committed to the database.

Parameters:

Name Description Default
initial_state

The initial state of a transtion

Optional[prefect.orion.schemas.states.State]
required
validated_state

The governed state that has been committed to the database

Optional[prefect.orion.schemas.states.State]
required
context

A safe copy of the OrchestrationContext, with the exception of context.run, mutating this context will have no effect on the broader orchestration environment.

TaskOrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/core_policy.py
async def after_transition(
    self,
    initial_state: Optional[states.State],
    validated_state: Optional[states.State],
    context: TaskOrchestrationContext,
) -> None:

    filtered_limits = (
        await concurrency_limits.filter_concurrency_limits_for_orchestration(
            context.session, tags=context.run.tags
        )
    )
    run_limits = {limit.tag: limit for limit in filtered_limits}
    for tag, cl in run_limits.items():
        active_slots = set(cl.active_slots)
        active_slots.discard(str(context.run.id))
        cl.active_slots = list(active_slots)

CacheInsertion

Caches completed states with cache keys after they are validated.

Source code in prefect/orion/orchestration/core_policy.py
class CacheInsertion(BaseOrchestrationRule):
    """
    Caches completed states with cache keys after they are validated.
    """

    FROM_STATES = ALL_ORCHESTRATION_STATES
    TO_STATES = [states.StateType.COMPLETED]

    @inject_db
    async def after_transition(
        self,
        initial_state: Optional[states.State],
        validated_state: Optional[states.State],
        context: TaskOrchestrationContext,
        db: OrionDBInterface,
    ) -> None:
        cache_key = validated_state.state_details.cache_key
        if cache_key:
            new_cache_item = db.TaskRunStateCache(
                cache_key=cache_key,
                cache_expiration=validated_state.state_details.cache_expiration,
                task_run_state_id=validated_state.id,
            )
            context.session.add(new_cache_item)
            await context.session.flush()

CacheRetrieval

Rejects running states if a completed state has been cached.

This rule rejects transitions into a running state with a cache key if the key has already been associated with a completed state in the cache table. The client will be instructed to transition into the cached completed state instead.

Source code in prefect/orion/orchestration/core_policy.py
class CacheRetrieval(BaseOrchestrationRule):
    """
    Rejects running states if a completed state has been cached.

    This rule rejects transitions into a running state with a cache key if the key
    has already been associated with a completed state in the cache table. The client
    will be instructed to transition into the cached completed state instead.
    """

    FROM_STATES = ALL_ORCHESTRATION_STATES
    TO_STATES = [states.StateType.RUNNING]

    @inject_db
    async def before_transition(
        self,
        initial_state: Optional[states.State],
        proposed_state: Optional[states.State],
        context: TaskOrchestrationContext,
        db: OrionDBInterface,
    ) -> None:
        cache_key = proposed_state.state_details.cache_key
        if cache_key:
            # Check for cached states matching the cache key
            cached_state_id = (
                select(db.TaskRunStateCache.task_run_state_id)
                .where(
                    sa.and_(
                        db.TaskRunStateCache.cache_key == cache_key,
                        sa.or_(
                            db.TaskRunStateCache.cache_expiration.is_(None),
                            db.TaskRunStateCache.cache_expiration > pendulum.now("utc"),
                        ),
                    ),
                )
                .order_by(db.TaskRunStateCache.created.desc())
                .limit(1)
            ).scalar_subquery()
            query = select(db.TaskRunState).where(db.TaskRunState.id == cached_state_id)
            cached_state = (await context.session.execute(query)).scalar()
            if cached_state:
                new_state = cached_state.as_state().copy(reset_fields=True)
                new_state.name = "Cached"
                await self.reject_transition(
                    state=new_state, reason="Retrieved state from cache"
                )

RetryFailedFlows

Rejects failed states and schedules a retry if the retry limit has not been reached.

This rule rejects transitions into a failed state if max_retries has been set and the run count has not reached the specified limit. The client will be instructed to transition into a scheduled state to retry flow execution.

Source code in prefect/orion/orchestration/core_policy.py
class RetryFailedFlows(BaseOrchestrationRule):
    """
    Rejects failed states and schedules a retry if the retry limit has not been reached.

    This rule rejects transitions into a failed state if `max_retries` has been
    set and the run count has not reached the specified limit. The client will be
    instructed to transition into a scheduled state to retry flow execution.
    """

    FROM_STATES = [states.StateType.RUNNING]
    TO_STATES = [states.StateType.FAILED]

    async def before_transition(
        self,
        initial_state: Optional[states.State],
        proposed_state: Optional[states.State],
        context: FlowOrchestrationContext,
    ) -> None:
        from prefect.orion.models import task_runs

        run_settings = context.run_settings
        run_count = context.run.run_count
        if run_count > run_settings.max_retries:
            return  # Retry count exceeded, allow transition to failed

        scheduled_start_time = pendulum.now("UTC").add(
            seconds=run_settings.retry_delay_seconds
        )

        failed_task_runs = await task_runs.read_task_runs(
            context.session,
            flow_run_filter=filters.FlowRunFilter(id={"any_": [context.run.id]}),
            task_run_filter=filters.TaskRunFilter(state={"type": {"any_": ["FAILED"]}}),
        )
        for run in failed_task_runs:
            await task_runs.set_task_run_state(
                context.session,
                run.id,
                state=states.AwaitingRetry(scheduled_time=scheduled_start_time),
                force=True,
            )
            # Reset the run count so that the task run retries still work correctly
            run.run_count = 0

        # Generate a new state for the flow
        retry_state = states.AwaitingRetry(
            scheduled_time=scheduled_start_time,
            message=proposed_state.message,
            data=proposed_state.data,
        )
        await self.reject_transition(state=retry_state, reason="Retrying")

RetryFailedFlows.before_transition async

Implements a hook that can fire before a state is committed to the database.

This hook may produce side-effects or mutate the proposed state of a transition using one of four methods: self.reject_transition, self.delay_transition, self.abort_transition, and self.rename_state.

Note

As currently implemented, the before_transition hook is not perfectly isolated from mutating the transition. It is a standard instance method that has access to self, and therefore self.context. This should never be modified directly. Furthermore, context.run is an ORM model, and mutating the run can also cause unintended writes to the database.

Parameters:

Name Description Default
initial_state

The initial state of a transtion

Optional[prefect.orion.schemas.states.State]
required
proposed_state

The proposed state of a transition

Optional[prefect.orion.schemas.states.State]
required
context

A safe copy of the OrchestrationContext, with the exception of context.run, mutating this context will have no effect on the broader orchestration environment.

FlowOrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/core_policy.py
async def before_transition(
    self,
    initial_state: Optional[states.State],
    proposed_state: Optional[states.State],
    context: FlowOrchestrationContext,
) -> None:
    from prefect.orion.models import task_runs

    run_settings = context.run_settings
    run_count = context.run.run_count
    if run_count > run_settings.max_retries:
        return  # Retry count exceeded, allow transition to failed

    scheduled_start_time = pendulum.now("UTC").add(
        seconds=run_settings.retry_delay_seconds
    )

    failed_task_runs = await task_runs.read_task_runs(
        context.session,
        flow_run_filter=filters.FlowRunFilter(id={"any_": [context.run.id]}),
        task_run_filter=filters.TaskRunFilter(state={"type": {"any_": ["FAILED"]}}),
    )
    for run in failed_task_runs:
        await task_runs.set_task_run_state(
            context.session,
            run.id,
            state=states.AwaitingRetry(scheduled_time=scheduled_start_time),
            force=True,
        )
        # Reset the run count so that the task run retries still work correctly
        run.run_count = 0

    # Generate a new state for the flow
    retry_state = states.AwaitingRetry(
        scheduled_time=scheduled_start_time,
        message=proposed_state.message,
        data=proposed_state.data,
    )
    await self.reject_transition(state=retry_state, reason="Retrying")

RetryFailedTasks

Rejects failed states and schedules a retry if the retry limit has not been reached.

This rule rejects transitions into a failed state if max_retries has been set and the run count has not reached the specified limit. The client will be instructed to transition into a scheduled state to retry task execution.

Source code in prefect/orion/orchestration/core_policy.py
class RetryFailedTasks(BaseOrchestrationRule):
    """
    Rejects failed states and schedules a retry if the retry limit has not been reached.

    This rule rejects transitions into a failed state if `max_retries` has been
    set and the run count has not reached the specified limit. The client will be
    instructed to transition into a scheduled state to retry task execution.
    """

    FROM_STATES = [states.StateType.RUNNING]
    TO_STATES = [states.StateType.FAILED]

    async def before_transition(
        self,
        initial_state: Optional[states.State],
        proposed_state: Optional[states.State],
        context: TaskOrchestrationContext,
    ) -> None:
        run_settings = context.run_settings
        run_count = context.run.run_count
        if run_count <= run_settings.max_retries:
            retry_state = states.AwaitingRetry(
                scheduled_time=pendulum.now("UTC").add(
                    seconds=run_settings.retry_delay_seconds
                ),
                message=proposed_state.message,
                data=proposed_state.data,
            )
            await self.reject_transition(state=retry_state, reason="Retrying")

RetryFailedTasks.before_transition async

Implements a hook that can fire before a state is committed to the database.

This hook may produce side-effects or mutate the proposed state of a transition using one of four methods: self.reject_transition, self.delay_transition, self.abort_transition, and self.rename_state.

Note

As currently implemented, the before_transition hook is not perfectly isolated from mutating the transition. It is a standard instance method that has access to self, and therefore self.context. This should never be modified directly. Furthermore, context.run is an ORM model, and mutating the run can also cause unintended writes to the database.

Parameters:

Name Description Default
initial_state

The initial state of a transtion

Optional[prefect.orion.schemas.states.State]
required
proposed_state

The proposed state of a transition

Optional[prefect.orion.schemas.states.State]
required
context

A safe copy of the OrchestrationContext, with the exception of context.run, mutating this context will have no effect on the broader orchestration environment.

TaskOrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/core_policy.py
async def before_transition(
    self,
    initial_state: Optional[states.State],
    proposed_state: Optional[states.State],
    context: TaskOrchestrationContext,
) -> None:
    run_settings = context.run_settings
    run_count = context.run.run_count
    if run_count <= run_settings.max_retries:
        retry_state = states.AwaitingRetry(
            scheduled_time=pendulum.now("UTC").add(
                seconds=run_settings.retry_delay_seconds
            ),
            message=proposed_state.message,
            data=proposed_state.data,
        )
        await self.reject_transition(state=retry_state, reason="Retrying")

RenameReruns

Name the states if they have run more than once.

In the special case where the initial state is an "AwaitingRetry" scheduled state, the proposed state will be renamed to "Retrying" instead.

Source code in prefect/orion/orchestration/core_policy.py
class RenameReruns(BaseOrchestrationRule):
    """
    Name the states if they have run more than once.

    In the special case where the initial state is an "AwaitingRetry" scheduled state,
    the proposed state will be renamed to "Retrying" instead.
    """

    FROM_STATES = ALL_ORCHESTRATION_STATES
    TO_STATES = [states.StateType.RUNNING]

    async def before_transition(
        self,
        initial_state: Optional[states.State],
        proposed_state: Optional[states.State],
        context: TaskOrchestrationContext,
    ) -> None:
        run_count = context.run.run_count
        if run_count > 0:
            if initial_state.name == "AwaitingRetry":
                await self.rename_state("Retrying")
            else:
                await self.rename_state("Rerunning")

RenameReruns.before_transition async

Implements a hook that can fire before a state is committed to the database.

This hook may produce side-effects or mutate the proposed state of a transition using one of four methods: self.reject_transition, self.delay_transition, self.abort_transition, and self.rename_state.

Note

As currently implemented, the before_transition hook is not perfectly isolated from mutating the transition. It is a standard instance method that has access to self, and therefore self.context. This should never be modified directly. Furthermore, context.run is an ORM model, and mutating the run can also cause unintended writes to the database.

Parameters:

Name Description Default
initial_state

The initial state of a transtion

Optional[prefect.orion.schemas.states.State]
required
proposed_state

The proposed state of a transition

Optional[prefect.orion.schemas.states.State]
required
context

A safe copy of the OrchestrationContext, with the exception of context.run, mutating this context will have no effect on the broader orchestration environment.

TaskOrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/core_policy.py
async def before_transition(
    self,
    initial_state: Optional[states.State],
    proposed_state: Optional[states.State],
    context: TaskOrchestrationContext,
) -> None:
    run_count = context.run.run_count
    if run_count > 0:
        if initial_state.name == "AwaitingRetry":
            await self.rename_state("Retrying")
        else:
            await self.rename_state("Rerunning")

WaitForScheduledTime

Prevents transitions from scheduled states that happen too early.

This rule enforces that all scheduled states will only start with the machine clock used by the Orion instance. This rule will identify transitions from scheduled states that are too early and nullify them. Instead, no state will be written to the database and the client will be sent an instruction to wait for delay_seconds before attempting the transition again.

Source code in prefect/orion/orchestration/core_policy.py
class WaitForScheduledTime(BaseOrchestrationRule):
    """
    Prevents transitions from scheduled states that happen too early.

    This rule enforces that all scheduled states will only start with the machine clock
    used by the Orion instance. This rule will identify transitions from scheduled
    states that are too early and nullify them. Instead, no state will be written to the
    database and the client will be sent an instruction to wait for `delay_seconds`
    before attempting the transition again.
    """

    FROM_STATES = [states.StateType.SCHEDULED]
    TO_STATES = ALL_ORCHESTRATION_STATES

    async def before_transition(
        self,
        initial_state: Optional[states.State],
        proposed_state: Optional[states.State],
        context: OrchestrationContext,
    ) -> None:
        scheduled_time = initial_state.state_details.scheduled_time
        if not scheduled_time:
            raise ValueError("Received state without a scheduled time")

        delay_seconds = (scheduled_time - pendulum.now()).in_seconds()
        if delay_seconds > 0:
            await self.delay_transition(
                delay_seconds, reason="Scheduled time is in the future"
            )

WaitForScheduledTime.before_transition async

Implements a hook that can fire before a state is committed to the database.

This hook may produce side-effects or mutate the proposed state of a transition using one of four methods: self.reject_transition, self.delay_transition, self.abort_transition, and self.rename_state.

Note

As currently implemented, the before_transition hook is not perfectly isolated from mutating the transition. It is a standard instance method that has access to self, and therefore self.context. This should never be modified directly. Furthermore, context.run is an ORM model, and mutating the run can also cause unintended writes to the database.

Parameters:

Name Description Default
initial_state

The initial state of a transtion

Optional[prefect.orion.schemas.states.State]
required
proposed_state

The proposed state of a transition

Optional[prefect.orion.schemas.states.State]
required
context

A safe copy of the OrchestrationContext, with the exception of context.run, mutating this context will have no effect on the broader orchestration environment.

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/core_policy.py
async def before_transition(
    self,
    initial_state: Optional[states.State],
    proposed_state: Optional[states.State],
    context: OrchestrationContext,
) -> None:
    scheduled_time = initial_state.state_details.scheduled_time
    if not scheduled_time:
        raise ValueError("Received state without a scheduled time")

    delay_seconds = (scheduled_time - pendulum.now()).in_seconds()
    if delay_seconds > 0:
        await self.delay_transition(
            delay_seconds, reason="Scheduled time is in the future"
        )

PreventTransitionsFromTerminalStates

Prevents transitions from terminal states.

Orchestration logic in Orion assumes that once runs enter a terminal state, no further action will be taken on them. This rule prevents unintended transitions out of terminal states and sents an instruction to the client to abort any execution.

Source code in prefect/orion/orchestration/core_policy.py
class PreventTransitionsFromTerminalStates(BaseOrchestrationRule):
    """
    Prevents transitions from terminal states.

    Orchestration logic in Orion assumes that once runs enter a terminal state, no
    further action will be taken on them. This rule prevents unintended transitions out
    of terminal states and sents an instruction to the client to abort any execution.
    """

    FROM_STATES = TERMINAL_STATES
    TO_STATES = ALL_ORCHESTRATION_STATES

    async def before_transition(
        self,
        initial_state: Optional[states.State],
        proposed_state: Optional[states.State],
        context: OrchestrationContext,
    ) -> None:
        await self.abort_transition(reason="This run has already terminated.")

PreventTransitionsFromTerminalStates.before_transition async

Implements a hook that can fire before a state is committed to the database.

This hook may produce side-effects or mutate the proposed state of a transition using one of four methods: self.reject_transition, self.delay_transition, self.abort_transition, and self.rename_state.

Note

As currently implemented, the before_transition hook is not perfectly isolated from mutating the transition. It is a standard instance method that has access to self, and therefore self.context. This should never be modified directly. Furthermore, context.run is an ORM model, and mutating the run can also cause unintended writes to the database.

Parameters:

Name Description Default
initial_state

The initial state of a transtion

Optional[prefect.orion.schemas.states.State]
required
proposed_state

The proposed state of a transition

Optional[prefect.orion.schemas.states.State]
required
context

A safe copy of the OrchestrationContext, with the exception of context.run, mutating this context will have no effect on the broader orchestration environment.

OrchestrationContext
required

Returns:

Type Description
None

None

Source code in prefect/orion/orchestration/core_policy.py
async def before_transition(
    self,
    initial_state: Optional[states.State],
    proposed_state: Optional[states.State],
    context: OrchestrationContext,
) -> None:
    await self.abort_transition(reason="This run has already terminated.")