Skip to content

prefect.states

AwaitingRetry

Convenience function for creating AwaitingRetry states.

Returns:

Type Description
State

an AwaitingRetry state

Source code in prefect/states.py
def AwaitingRetry(
    cls: Type[State] = State,
    scheduled_time: Optional[datetime.datetime] = None,
    **kwargs,
) -> State:
    """Convenience function for creating `AwaitingRetry` states.

    Args:
        cls: State class forwarded to `schemas.states.AwaitingRetry`.
        scheduled_time: Optional scheduled time forwarded to
            `schemas.states.AwaitingRetry`; `None` when omitted.
        **kwargs: Additional keyword arguments forwarded unchanged.

    Returns:
        State: an AwaitingRetry state
    """
    return schemas.states.AwaitingRetry(
        cls=cls, scheduled_time=scheduled_time, **kwargs
    )

Cancelled

Convenience function for creating Cancelled states.

Returns:

Type Description
State

a Cancelled state

Source code in prefect/states.py
def Cancelled(cls: Type[State] = State, **kwargs) -> State:
    """Build a `Cancelled` state.

    Thin wrapper that forwards `cls` and any extra keyword arguments to
    `schemas.states.Cancelled`.

    Returns:
        State: a Cancelled state
    """
    cancelled_state = schemas.states.Cancelled(cls=cls, **kwargs)
    return cancelled_state

Completed

Convenience function for creating Completed states.

Returns:

Type Description
State

a Completed state

Source code in prefect/states.py
def Completed(cls: Type[State] = State, **kwargs) -> State:
    """Build a `Completed` state.

    Thin wrapper that forwards `cls` and any extra keyword arguments to
    `schemas.states.Completed`.

    Returns:
        State: a Completed state
    """
    completed_state = schemas.states.Completed(cls=cls, **kwargs)
    return completed_state

Crashed

Convenience function for creating Crashed states.

Returns:

Type Description
State

a Crashed state

Source code in prefect/states.py
def Crashed(cls: Type[State] = State, **kwargs) -> State:
    """Build a `Crashed` state.

    Thin wrapper that forwards `cls` and any extra keyword arguments to
    `schemas.states.Crashed`.

    Returns:
        State: a Crashed state
    """
    crashed_state = schemas.states.Crashed(cls=cls, **kwargs)
    return crashed_state

Failed

Convenience function for creating Failed states.

Returns:

Type Description
State

a Failed state

Source code in prefect/states.py
def Failed(cls: Type[State] = State, **kwargs) -> State:
    """Build a `Failed` state.

    Thin wrapper that forwards `cls` and any extra keyword arguments to
    `schemas.states.Failed`.

    Returns:
        State: a Failed state
    """
    failed_state = schemas.states.Failed(cls=cls, **kwargs)
    return failed_state

Late

Convenience function for creating Late states.

Returns:

Type Description
State

a Late state

Source code in prefect/states.py
def Late(
    cls: Type[State] = State,
    scheduled_time: Optional[datetime.datetime] = None,
    **kwargs,
) -> State:
    """Convenience function for creating `Late` states.

    Args:
        cls: State class forwarded to `schemas.states.Late`.
        scheduled_time: Optional scheduled time forwarded to
            `schemas.states.Late`; `None` when omitted.
        **kwargs: Additional keyword arguments forwarded unchanged.

    Returns:
        State: a Late state
    """
    return schemas.states.Late(cls=cls, scheduled_time=scheduled_time, **kwargs)

Paused

Convenience function for creating Paused states.

Returns:

Type Description
State

a Paused state

Source code in prefect/states.py
def Paused(cls: Type[State] = State, **kwargs) -> State:
    """Build a `Paused` state.

    Thin wrapper that forwards `cls` and any extra keyword arguments to
    `schemas.states.Paused`.

    Returns:
        State: a Paused state
    """
    paused_state = schemas.states.Paused(cls=cls, **kwargs)
    return paused_state

Pending

Convenience function for creating Pending states.

Returns:

Type Description
State

a Pending state

Source code in prefect/states.py
def Pending(cls: Type[State] = State, **kwargs) -> State:
    """Build a `Pending` state.

    Thin wrapper that forwards `cls` and any extra keyword arguments to
    `schemas.states.Pending`.

    Returns:
        State: a Pending state
    """
    pending_state = schemas.states.Pending(cls=cls, **kwargs)
    return pending_state

Retrying

Convenience function for creating Retrying states.

Returns:

Type Description
State

a Retrying state

Source code in prefect/states.py
def Retrying(cls: Type[State] = State, **kwargs) -> State:
    """Build a `Retrying` state.

    Thin wrapper that forwards `cls` and any extra keyword arguments to
    `schemas.states.Retrying`.

    Returns:
        State: a Retrying state
    """
    retrying_state = schemas.states.Retrying(cls=cls, **kwargs)
    return retrying_state

Running

Convenience function for creating Running states.

Returns:

Type Description
State

a Running state

Source code in prefect/states.py
def Running(cls: Type[State] = State, **kwargs) -> State:
    """Build a `Running` state.

    Thin wrapper that forwards `cls` and any extra keyword arguments to
    `schemas.states.Running`.

    Returns:
        State: a Running state
    """
    running_state = schemas.states.Running(cls=cls, **kwargs)
    return running_state

Scheduled

Convenience function for creating Scheduled states.

Returns:

Type Description
State

a Scheduled state

Source code in prefect/states.py
def Scheduled(
    cls: Type[State] = State,
    scheduled_time: Optional[datetime.datetime] = None,
    **kwargs,
) -> State:
    """Convenience function for creating `Scheduled` states.

    Args:
        cls: State class forwarded to `schemas.states.Scheduled`.
        scheduled_time: Optional scheduled time forwarded to
            `schemas.states.Scheduled`; `None` when omitted.
        **kwargs: Additional keyword arguments forwarded unchanged.

    Returns:
        State: a Scheduled state
    """
    return schemas.states.Scheduled(cls=cls, scheduled_time=scheduled_time, **kwargs)

exception_to_crashed_state async

Takes an exception that occurs outside of user code and converts it to a 'Crash' exception with a 'Crashed' state.

Source code in prefect/states.py
async def exception_to_crashed_state(
    exc: BaseException,
    result_factory: Optional[ResultFactory] = None,
) -> State:
    """
    Build a 'Crashed' state for an exception raised _outside_ of user code.

    The state message is selected from the exception's type (runtime
    cancellation, keyboard interrupt, system exit, HTTP timeout/connection
    failure, or anything else). The state data is the result created by
    `result_factory` when one is given, otherwise the exception object itself.
    """
    cancelled_exc_type = anyio.get_cancelled_exc_class()

    # Branch order is significant: the first matching category wins
    if isinstance(exc, cancelled_exc_type):
        crash_message = "Execution was cancelled by the runtime environment."
    elif isinstance(exc, KeyboardInterrupt):
        crash_message = "Execution was aborted by an interrupt signal."
    elif isinstance(exc, SystemExit):
        crash_message = "Execution was aborted by Python system exit call."
    elif isinstance(exc, (httpx.TimeoutException, httpx.ConnectError)):
        try:
            failed_request: httpx.Request = exc.request
        except RuntimeError:
            # The request property is not set
            crash_message = f"Request failed while attempting to contact the server: {format_exception(exc)}"
        else:
            # TODO: We can check if this is actually our API url
            crash_message = (
                f"Request to {failed_request.url} failed: {format_exception(exc)}."
            )
    else:
        crash_message = f"Execution was interrupted by an unexpected exception: {format_exception(exc)}"

    # Without a factory the exception is attached for local usage only; it will
    # not be available when the state is retrieved from the API
    crash_data = (await result_factory.create_result(exc)) if result_factory else exc

    return Crashed(message=crash_message, data=crash_data)

exception_to_failed_state async

Convenience function for creating Failed states from exceptions

Source code in prefect/states.py
async def exception_to_failed_state(
    exc: Optional[BaseException] = None,
    result_factory: Optional[ResultFactory] = None,
    **kwargs,
) -> State:
    """
    Convenience function for creating `Failed` states from exceptions

    Args:
        exc: The exception to convert. When `None`, the exception currently
            being handled (as reported by `sys.exc_info()`) is used instead.
        result_factory: When given, used to create the state data from the
            exception; otherwise the exception object itself is attached.
        **kwargs: Forwarded to `Failed`. A `message` entry, if present, is
            removed and used as a prefix for the formatted exception.

    Returns:
        State: the state produced by `Failed`

    Raises:
        ValueError: If `exc` is `None` and no exception is being handled.
    """
    # Compare against `None` explicitly so an exception object that happens to
    # be falsy (e.g. one defining `__len__` or `__bool__`) is not discarded
    if exc is None:
        exc = sys.exc_info()[1]
        if exc is None:
            raise ValueError(
                "Exception was not passed and no active exception could be found."
            )

    if result_factory:
        data = await result_factory.create_result(exc)
    else:
        # Attach the exception for local usage, will not be available when retrieved
        # from the API
        data = exc

    # Treat an explicit `message=None` the same as a missing message so the
    # concatenation below cannot fail
    existing_message = kwargs.pop("message", None) or ""
    if existing_message and not existing_message.endswith(" "):
        existing_message += " "

    # TODO: Consider if we want to include traceback information, it is intentionally
    #       excluded from messages for now
    message = existing_message + format_exception(exc)

    return Failed(data=data, message=message, **kwargs)

get_state_exception async

If not given a FAILED, CRASHED, or CANCELLED state, this raises a ValueError.

If the state result is a state, its exception will be returned.

If the state result is an iterable of states, the exception of the first failure will be returned.

If the state result is a string, a wrapper exception will be returned with the string as the message.

If the state result is null, a wrapper exception will be returned with the state message attached.

If the state result is not of a known type, a TypeError will be raised.

When a wrapper exception is returned, the type will be:

  • FailedRun if the state type is FAILED.
  • CrashedRun if the state type is CRASHED.
  • CancelledRun if the state type is CANCELLED.

Source code in prefect/states.py
@sync_compatible
async def get_state_exception(state: State) -> BaseException:
    """
    Resolve the exception represented by a FAILED, CRASHED, or CANCELLED state.

    A `ValueError` is raised if the state is none of those types.

    The state's data is inspected (after calling `get()` on it when it is a
    `BaseResult`) and handled as follows:

    - `None`: a wrapper exception carrying the state message, or a default
        message when the state has none, is returned.
    - An exception: it is returned as-is.
    - A string: a wrapper exception with the string as its message is returned.
    - A state: the exception of that inner state is returned.
    - An iterable of states: the exception of the first failed, crashed, or
        cancelled state is returned; a `ValueError` is raised if there is none.
    - Anything else: a `TypeError` is raised.

    When a wrapper exception is returned, the type will be:
        - `FailedRun` if the state type is FAILED.
        - `CrashedRun` if the state type is CRASHED.
        - `CancelledRun` if the state type is CANCELLED.
    """
    if state.is_failed():
        wrapper_cls, fallback_message = FailedRun, "Run failed."
    elif state.is_crashed():
        wrapper_cls, fallback_message = CrashedRun, "Run crashed."
    elif state.is_cancelled():
        wrapper_cls, fallback_message = CancelledRun, "Run cancelled."
    else:
        raise ValueError(f"Expected failed or crashed state got {state!r}.")

    payload = state.data
    if isinstance(payload, BaseResult):
        payload = await payload.get()

    if payload is None:
        return wrapper_cls(state.message or fallback_message)

    # `Exception` is a subclass of `BaseException`, so one check covers both
    if isinstance(payload, BaseException):
        return payload

    if isinstance(payload, str):
        return wrapper_cls(payload)

    if isinstance(payload, State):
        # Return the exception from the inner state
        return await get_state_exception(payload)

    if is_state_iterable(payload):
        # Return the first failure
        for inner_state in payload:
            if (
                inner_state.is_failed()
                or inner_state.is_crashed()
                or inner_state.is_cancelled()
            ):
                return await get_state_exception(inner_state)

        raise ValueError(
            "Failed state result was an iterable of states but none were failed."
        )

    raise TypeError(
        f"Unexpected result for failed state: {payload!r} —— "
        f"{type(payload).__name__} cannot be resolved into an exception"
    )

get_state_result

Get the result from a state.

See State.result()

Source code in prefect/states.py
def get_state_result(
    state: State[R], raise_on_failure: bool = True, fetch: Optional[bool] = None
) -> R:
    """
    Get the result from a state.

    When `fetch` is truthy the result is resolved through `_get_state_result`.
    Otherwise the raw `state.data` is returned, except for `DataDocument` data,
    which is resolved through `result_from_state_with_data_document`.

    See `State.result()`
    """
    # An unset `fetch` becomes `True` for sync users or async users who have
    # opted in
    if fetch is None and (
        PREFECT_ASYNC_FETCH_STATE_RESULT or not in_async_main_thread()
    ):
        fetch = True

    if fetch:
        return _get_state_result(state, raise_on_failure=raise_on_failure)

    # `fetch` is still unset here only when it was not resolved to `True` above
    if fetch is None and in_async_main_thread():
        warnings.warn(
            "State.result() was called from an async context but not awaited. "
            "This method will be updated to return a coroutine by default in "
            "the future. Pass `fetch=True` and `await` the call to get rid of "
            "this warning.",
            DeprecationWarning,
            stacklevel=2,
        )

    # Backwards compatibility
    if isinstance(state.data, DataDocument):
        return result_from_state_with_data_document(
            state, raise_on_failure=raise_on_failure
        )
    return state.data

is_state

Check if the given object is a state instance

Source code in prefect/states.py
def is_state(obj: Any) -> TypeGuard[schemas.states.State]:
    """
    Report whether `obj` is an instance of `schemas.states.State`
    """
    # We may want to narrow this to client-side state types but for now this provides
    # backwards compatibility
    state_base_cls = schemas.states.State
    return isinstance(obj, state_base_cls)

is_state_iterable

Check if the given object is an iterable of state types

Supported iterables are:

  • set
  • list
  • tuple

Other iterables will return False even if they contain states.

Source code in prefect/states.py
def is_state_iterable(obj: Any) -> TypeGuard[Iterable[State]]:
    """
    Check if the given object is an iterable of state types

    Supported iterables are:
    - set
    - list
    - tuple

    Other iterables will return `False` even if they contain states. An empty
    set, list, or tuple also returns `False`.

    Returns:
        `True` only when `obj` is a non-empty set, list, or tuple whose
        elements all satisfy `is_state`.
    """
    # We do not check for arbitrary iterables because this is not intended to be used
    # for things like dictionaries, dataframes, or pydantic models
    if not isinstance(obj, (list, set, tuple)) or not obj:
        return False

    # A generator lets `all` stop at the first non-state element instead of
    # building a full list first
    return all(is_state(item) for item in obj)

raise_state_exception async

Given a FAILED, CRASHED, or CANCELLED state, raise the contained exception. For any other state, nothing is raised.

Source code in prefect/states.py
@sync_compatible
async def raise_state_exception(state: State) -> None:
    """
    Given a FAILED, CRASHED, or CANCELLED state, raise the contained exception.

    For any other state nothing is raised and `None` is returned.
    """
    if state.is_failed() or state.is_crashed() or state.is_cancelled():
        raise await get_state_exception(state)

    return None

return_value_to_state async

Given a return value from a user's function, create a State the run should be placed in.

  • If data is returned, we create a 'COMPLETED' state with the data
  • If a single, manually created state is returned, we use that state as given (manual creation is determined by the lack of ids)
  • If an upstream state or iterable of upstream states is returned, we apply the aggregate rule

The aggregate rule says that given multiple states we will determine the final state such that:

  • If any states are not COMPLETED the final state is FAILED
  • If all of the states are COMPLETED the final state is COMPLETED
  • The states will be placed in the final state data attribute

Callers should resolve all futures into states before passing return values to this function.

Source code in prefect/states.py
async def return_value_to_state(retval: R, result_factory: ResultFactory) -> State[R]:
    """
    Convert the return value of a user's function into the `State` the run
    should be placed in.

    - A single, manually created state (one whose state details carry neither a
        flow run id nor a task run id) is used as given; its data is converted
        with the result factory unless it is already a `BaseResult`
    - Any other state, or an iterable of states, is summarized with the
        aggregate rule
    - Anything else is treated as data and wrapped in a 'COMPLETED' state

    The aggregate rule determines the final state such that:

    - If all of the states are COMPLETED the final state is COMPLETED
    - Otherwise, if any state is cancelled the final state is CANCELLED
    - Otherwise the final state is FAILED
    - The return value is placed in the final state `data` attribute via the
        result factory

    Callers should resolve all futures into states before passing return values to this
    function.
    """
    if is_state(retval):
        details = retval.state_details
        # Check for manual creation
        if not details.flow_run_id and not details.task_run_id:
            manual_state = retval

            # Unless the user has already constructed a result explicitly, use the
            # factory to update the data to the correct type
            if not isinstance(manual_state.data, BaseResult):
                manual_state.data = await result_factory.create_result(
                    manual_state.data
                )

            return manual_state

    # Determine a new state from the aggregate of contained states
    if is_state(retval) or is_state_iterable(retval):
        group = StateGroup(ensure_iterable(retval))

        # Pick the new state type together with a nice message for the aggregate
        if group.all_completed():
            aggregate_type = StateType.COMPLETED
            summary = "All states completed."
        elif group.any_cancelled():
            aggregate_type = StateType.CANCELLED
            summary = f"{group.cancelled_count}/{group.total_count} states cancelled."
        else:
            aggregate_type = StateType.FAILED
            if group.any_failed():
                summary = f"{group.fail_count}/{group.total_count} states failed."
            elif not group.all_final():
                summary = (
                    f"{group.not_final_count}/{group.total_count} states are not final."
                )
            else:
                summary = "Given states: " + group.counts_message()

        # TODO: We may actually want to set the data to a `StateGroup` object and just
        #       allow it to be unpacked into a tuple and such so users can interact with
        #       it
        return State(
            type=aggregate_type,
            message=summary,
            data=await result_factory.create_result(retval),
        )

    # Otherwise, they just gave data and this is a completed retval
    completed_data = await result_factory.create_result(retval)
    return Completed(data=completed_data)