Skip to content

prefect.orion.services.loop_service

The base class for all Orion loop services.

LoopService

Loop services are relatively lightweight maintenance routines that need to run periodically.

This class makes it straightforward to design and integrate them. Users only need to define the run_once coroutine to describe the behavior of the service on each loop.

Source code in prefect/orion/services/loop_service.py
class LoopService:
    """
    Loop services are relatively lightweight maintenance routines that need to run periodically.

    This class makes it straightforward to design and integrate them. Users only need to
    define the `run_once` coroutine to describe the behavior of the service on each loop.
    """

    loop_seconds = 60

    def __init__(self, loop_seconds: float = None, handle_signals: bool = True):
        """
        Args:
            loop_seconds (float): if provided, overrides the loop interval
                otherwise specified as a class variable
            handle_signals (bool): if True (default), SIGINT and SIGTERM are
                gracefully intercepted and shut down the running service.
        """
        if loop_seconds:
            self.loop_seconds = loop_seconds  # seconds between runs
        self.should_stop = False  # flag for whether the service should stop running
        self.name = type(self).__name__
        self.logger = get_logger(f"orion.services.{self.name.lower()}")

        if handle_signals:
            signal.signal(signal.SIGINT, self._stop)
            signal.signal(signal.SIGTERM, self._stop)

    @inject_db
    async def setup(self, db: OrionDBInterface) -> None:
        """
        Called prior to running the service
        """
        self.should_stop = False

    async def shutdown(self) -> None:
        """
        Called after running the service
        """
        # reset `should_stop` to False
        self.should_stop = False

    async def start(self, loops=None) -> None:
        """
        Run the service `loops` time. Pass loops=None to run forever.

        Args:
            loops (int, optional): the number of loops to run before exiting.
        """

        await self.setup()

        i = 0
        while not self.should_stop:
            start_time = pendulum.now("UTC")

            try:
                self.logger.debug(f"About to run {self.name}...")
                await self.run_once()

            except NotImplementedError as exc:
                raise exc from None

            # if an error is raised, log and continue
            except Exception as exc:
                self.logger.error(f"Unexpected error in: {repr(exc)}", exc_info=True)

            end_time = pendulum.now("UTC")

            # if service took longer than its loop interval, log a warning
            # that the interval might be too short
            if (end_time - start_time).total_seconds() > self.loop_seconds:
                self.logger.warning(
                    f"{self.name} took {(end_time-start_time).total_seconds()} seconds to run, "
                    f"which is longer than its loop interval of {self.loop_seconds} seconds."
                )

            # check if early stopping was requested
            i += 1
            if loops is not None and i == loops:
                self.logger.debug(f"{self.name} exiting after {loops} loop(s).")
                break

            # next run is every "loop seconds" after each previous run *started*.
            # note that if the loop took unexpectedly long, the "next_run" time
            # might be in the past, which will result in an instant start
            next_run = max(
                start_time.add(seconds=self.loop_seconds), pendulum.now("UTC")
            )
            self.logger.debug(f"Finished running {self.name}. Next run at {next_run}")

            # check the `should_stop` flag every 1 seconds until the next run time is reached
            while pendulum.now("UTC") < next_run and not self.should_stop:
                await asyncio.sleep(
                    min(1, (next_run - pendulum.now("UTC")).total_seconds())
                )

        await self.shutdown()

    async def stop(self) -> None:
        """
        Gracefully stops a running LoopService and blocks until the service
        stops (indicated by resetting the `should_stop` flag).
        """
        self._stop()

        while self.should_stop:
            await asyncio.sleep(0.1)

    def _stop(self, *_) -> None:
        """
        Private method for setting the `should_stop` flag. Takes arbitrary
        arguments so it can be used as a signal handler.
        """
        self.should_stop = True

    async def run_once(self) -> None:
        """
        Represents one loop of the service.

        Users should override this method.

        To actually run the service once, call `LoopService.start(loops=1)` instead
        of this method, in order to handle setup/shutdown properly.
        """
        raise NotImplementedError("LoopService subclasses must implement this method.")

LoopService.__init__ special

Parameters:

Name Description Default
loop_seconds

if provided, overrides the loop interval otherwise specified as a class variable

float
None
handle_signals

if True (default), SIGINT and SIGTERM are gracefully intercepted and shut down the running service.

bool
True
Source code in prefect/orion/services/loop_service.py
def __init__(self, loop_seconds: float = None, handle_signals: bool = True):
    """
    Args:
        loop_seconds (float): if provided, overrides the loop interval
            otherwise specified as a class variable
        handle_signals (bool): if True (default), SIGINT and SIGTERM are
            gracefully intercepted and shut down the running service.
    """
    if loop_seconds:
        self.loop_seconds = loop_seconds  # seconds between runs
    self.should_stop = False  # flag for whether the service should stop running
    self.name = type(self).__name__
    self.logger = get_logger(f"orion.services.{self.name.lower()}")

    if handle_signals:
        signal.signal(signal.SIGINT, self._stop)
        signal.signal(signal.SIGTERM, self._stop)

LoopService.run_once async

Represents one loop of the service.

Users should override this method.

To actually run the service once, call LoopService.start(loops=1) instead of this method, in order to handle setup/shutdown properly.

Source code in prefect/orion/services/loop_service.py
async def run_once(self) -> None:
    """
    Represents one loop of the service.

    Users should override this method.

    To actually run the service once, call `LoopService.start(loops=1)` instead
    of this method, in order to handle setup/shutdown properly.
    """
    raise NotImplementedError("LoopService subclasses must implement this method.")

LoopService.setup async

Called prior to running the service

Source code in prefect/orion/services/loop_service.py
@inject_db
async def setup(self, db: OrionDBInterface) -> None:
    """
    Called prior to running the service
    """
    self.should_stop = False

LoopService.shutdown async

Called after running the service

Source code in prefect/orion/services/loop_service.py
async def shutdown(self) -> None:
    """
    Called after running the service
    """
    # reset `should_stop` to False
    self.should_stop = False

LoopService.start async

Run the service loops time. Pass loops=None to run forever.

Parameters:

Name Description Default
loops

the number of loops to run before exiting.

int
None
Source code in prefect/orion/services/loop_service.py
async def start(self, loops=None) -> None:
    """
    Run the service `loops` time. Pass loops=None to run forever.

    Args:
        loops (int, optional): the number of loops to run before exiting.
    """

    await self.setup()

    i = 0
    while not self.should_stop:
        start_time = pendulum.now("UTC")

        try:
            self.logger.debug(f"About to run {self.name}...")
            await self.run_once()

        except NotImplementedError as exc:
            raise exc from None

        # if an error is raised, log and continue
        except Exception as exc:
            self.logger.error(f"Unexpected error in: {repr(exc)}", exc_info=True)

        end_time = pendulum.now("UTC")

        # if service took longer than its loop interval, log a warning
        # that the interval might be too short
        if (end_time - start_time).total_seconds() > self.loop_seconds:
            self.logger.warning(
                f"{self.name} took {(end_time-start_time).total_seconds()} seconds to run, "
                f"which is longer than its loop interval of {self.loop_seconds} seconds."
            )

        # check if early stopping was requested
        i += 1
        if loops is not None and i == loops:
            self.logger.debug(f"{self.name} exiting after {loops} loop(s).")
            break

        # next run is every "loop seconds" after each previous run *started*.
        # note that if the loop took unexpectedly long, the "next_run" time
        # might be in the past, which will result in an instant start
        next_run = max(
            start_time.add(seconds=self.loop_seconds), pendulum.now("UTC")
        )
        self.logger.debug(f"Finished running {self.name}. Next run at {next_run}")

        # check the `should_stop` flag every 1 seconds until the next run time is reached
        while pendulum.now("UTC") < next_run and not self.should_stop:
            await asyncio.sleep(
                min(1, (next_run - pendulum.now("UTC")).total_seconds())
            )

    await self.shutdown()

LoopService.stop async

Gracefully stops a running LoopService and blocks until the service stops (indicated by resetting the should_stop flag).

Source code in prefect/orion/services/loop_service.py
async def stop(self) -> None:
    """
    Gracefully stops a running LoopService and blocks until the service
    stops (indicated by resetting the `should_stop` flag).
    """
    self._stop()

    while self.should_stop:
        await asyncio.sleep(0.1)

run_multiple_services async

Only one signal handler can be active at a time, so this function takes a list of loop services and runs all of them with a global signal handler.

Source code in prefect/orion/services/loop_service.py
async def run_multiple_services(loop_services: List[LoopService]):
    """
    Only one signal handler can be active at a time, so this function takes a list
    of loop services and runs all of them with a global signal handler.
    """

    def stop_all_services(self, *_):
        for service in loop_services:
            service._stop()

    signal.signal(signal.SIGINT, stop_all_services)
    signal.signal(signal.SIGTERM, stop_all_services)
    await asyncio.gather(*[service.start() for service in loop_services])