Skip to content

prefect.orion.utilities.database

Utilities for interacting with Orion database and ORM layer.

Orion supports both SQLite and Postgres. Many of these utilities allow Orion to seamlessly switch between the two.

GenerateUUID

Platform-independent UUID default generator. Note that the actual functionality for this class is specified in the `compiles`-decorated functions below.

Source code in prefect/orion/utilities/database.py
class GenerateUUID(FunctionElement):
    """
    SQL function element used as a platform-independent default
    generator for UUID values.

    This class is only the declaration of the element; what it actually
    does is specified by the `compiles`-decorated functions that follow
    it in the module.
    """

    name = "uuid_default"

JSON

JSON type that returns SQLAlchemy's dialect-specific JSON types, where possible. Uses generic JSON otherwise.

The "base" type is postgresql.JSONB to expose useful methods prior to SQL compilation

Source code in prefect/orion/utilities/database.py
class JSON(TypeDecorator):
    """
    Column type for JSON data that resolves to SQLAlchemy's JSON type for
    the active dialect when one exists, and to the generic JSON type for
    every other dialect.

    `postgresql.JSONB` is declared as the "base" implementation so that
    its useful methods are exposed before SQL compilation.
    """

    impl = postgresql.JSONB
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """Return the JSON type for `dialect`, always built with `none_as_null=True`."""
        dialect_name = dialect.name
        if dialect_name == "postgresql":
            json_type = postgresql.JSONB
        elif dialect_name == "sqlite":
            json_type = sqlite.JSON
        else:
            json_type = sa.JSON
        return dialect.type_descriptor(json_type(none_as_null=True))

    def process_bind_param(self, value, dialect):
        """
        Prepare `value` for use as a JSON field in a parameter binding.

        Falsy values are returned unchanged. Any other value is serialized
        to a JSON string and parsed back, with the constants `NaN`,
        `-Infinity`, and `Infinity` replaced by `None`.
        """
        if not value:
            return value

        # Why the floating point extrema are stripped:
        #
        # - PostgreSQL does not support `NaN`, `-Infinity`, or `Infinity` in JSON
        #   https://www.postgresql.org/docs/current/datatype-json.html#JSON-TYPE-MAPPING-TABLE
        # - SQLite can store and return full JSON values containing them, but
        #   any query that makes SQLite parse the value (like `json_extract`)
        #   will fail.
        #
        # `parse_constant` is invoked for exactly those three constants; see
        # https://docs.python.org/3/library/json.html#json.load.
        serialized = json.dumps(value)
        return json.loads(serialized, parse_constant=lambda _constant: None)

Pydantic

A pydantic type that converts inserted parameters to json and converts read values to the pydantic type.

Source code in prefect/orion/utilities/database.py
class Pydantic(TypeDecorator):
    """
    Column type that writes pydantic-validated values as JSON and turns
    values read from the database back into the pydantic type.
    """

    impl = JSON
    cache_ok = True

    def __init__(self, pydantic_type, sa_column_type=None):
        """
        Args:
            pydantic_type: the type handed to `pydantic.parse_obj_as` when
                values are bound and when results are loaded
            sa_column_type: when given, replaces `impl` on this instance
        """
        super().__init__()
        self._pydantic_type = pydantic_type
        if sa_column_type is not None:
            self.impl = sa_column_type

    def process_bind_param(self, value, dialect):
        """
        Validate `value` against the pydantic type and return it as
        python-native, JSON-compatible data. `None` is passed through.
        """
        if value is None:
            return None
        # Parsing comes first so that a value which does not comply with the
        # schema raises a validation error before anything is bound.
        validated = pydantic.parse_obj_as(self._pydantic_type, value)
        # SQLAlchemy requires the bound value to be a python-native collection
        # of JSON-compatible objects. Dumping with the pydantic JSON encoder
        # and parsing the resulting string back yields exactly that.
        encoded = json.dumps(validated, default=pydantic.json.pydantic_encoder)
        return json.loads(encoded)

    def process_result_value(self, value, dialect):
        """Hydrate a stored JSON object into the pydantic type; `None` stays `None`."""
        if value is None:
            return None
        return pydantic.parse_obj_as(self._pydantic_type, value)

Timestamp

TypeDecorator that ensures that timestamps have a timezone.

For SQLite, all timestamps are converted to UTC (since they are stored as naive timestamps without timezones) and recovered as UTC.

Source code in prefect/orion/utilities/database.py
class Timestamp(TypeDecorator):
    """Timestamp column type that requires values to carry a timezone.

    SQLite stores timestamps as naive values without timezones, so for
    that dialect every timestamp is converted to UTC before it is stored
    and is recovered as UTC.
    """

    impl = sa.TIMESTAMP(timezone=True)
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """Return the concrete timestamp type to use for `dialect`."""
        if dialect.name == "postgresql":
            return dialect.type_descriptor(postgresql.TIMESTAMP(timezone=True))

        if dialect.name == "sqlite":
            # SQLite compares datetimes as plain alphanumeric strings, with no
            # regard for timestamp semantics or timezones, so every stored
            # datetime needs one uniform, sortable representation. The default
            # used here is an ISO8601-compatible string with NO time zone and
            # a space (" ") delimiter between the date and the time.
            #
            # A "T" delimiter can be used instead by passing the arguments
            # below, but then all other sqlite datetimes must be set the same
            # way, including the custom default value for datetime columns
            # and any handwritten SQL formed with `strftime()`.
            #
            # store with "T" separator for time
            # storage_format=(
            #     "%(year)04d-%(month)02d-%(day)02d"
            #     "T%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
            # ),
            # handle ISO 8601 with "T" or " " as the time separator
            # regexp=r"(\d+)-(\d+)-(\d+)[T ](\d+):(\d+):(\d+).(\d+)",
            return dialect.type_descriptor(sqlite.DATETIME())

        return dialect.type_descriptor(sa.TIMESTAMP(timezone=True))

    def process_bind_param(self, value, dialect):
        """
        Check that `value` has a timezone before it is bound.

        Returns `None` for `None`, the value converted to UTC for SQLite,
        and the value unchanged for any other dialect.

        Raises:
            ValueError: if `value.tzinfo` is `None`
        """
        if value is None:
            return None
        if value.tzinfo is None:
            raise ValueError("Timestamps must have a timezone.")
        if dialect.name == "sqlite":
            return pendulum.instance(value).in_timezone("UTC")
        return value

    def process_result_value(self, value, dialect):
        """Return a loaded timestamp converted to UTC; `None` stays `None`."""
        if value is None:
            return None
        # every non-null result is handed to pendulum and expressed in UTC,
        # whichever dialect produced it
        return pendulum.instance(value).in_timezone("utc")

UUID

Platform-independent UUID type.

Uses PostgreSQL's UUID type, otherwise uses CHAR(36), storing as stringified hex values with hyphens.

Source code in prefect/orion/utilities/database.py
class UUID(TypeDecorator):
    """
    UUID column type that works on every platform.

    PostgreSQL gets its own UUID type. Every other dialect gets
    CHAR(36), holding the stringified hex value with hyphens.
    """

    impl = TypeEngine
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """Return `postgresql.UUID()` on PostgreSQL and `CHAR(36)` elsewhere."""
        if dialect.name == "postgresql":
            column_type = postgresql.UUID()
        else:
            column_type = CHAR(36)
        return dialect.type_descriptor(column_type)

    def process_bind_param(self, value, dialect):
        """
        Convert `value` to the string that is bound; `None` is passed through.

        On PostgreSQL, and for `uuid.UUID` instances on any dialect, the
        value is stringified as-is. Anything else is first parsed with
        `uuid.UUID` and then stringified.
        """
        if value is None:
            return None
        if dialect.name == "postgresql" or isinstance(value, uuid.UUID):
            return str(value)
        return str(uuid.UUID(value))

    def process_result_value(self, value, dialect):
        """Return the loaded value as a `uuid.UUID`; `None` stays `None`."""
        if value is None or isinstance(value, uuid.UUID):
            return value
        return uuid.UUID(value)

date_add

Platform-independent way to add a date and an interval.

Source code in prefect/orion/utilities/database.py
class date_add(FunctionElement):
    """
    SQL function element that adds an interval to a date in a
    platform-independent way.
    """

    type = Timestamp()
    name = "date_add"
    # caching is explicitly not inherited from the superclass; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, dt, interval):
        """Keep the date (`dt`) and `interval` operands on the element."""
        self.dt, self.interval = dt, interval
        super().__init__()

inherit_cache

Indicate if this :class:`.HasCacheKey` instance should make use of the cache key generation scheme used by its immediate superclass.

The attribute defaults to None, which indicates that a construct has not yet taken into account whether or not it's appropriate for it to participate in caching; this is functionally equivalent to setting the value to False, except that a warning is also emitted.

This flag can be set to True on a particular class, if the SQL that corresponds to the object does not change based on attributes which are local to this class, and not its superclass.

.. seealso::

:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.

date_diff

Platform-independent difference of dates. Computes d1 - d2.

Source code in prefect/orion/utilities/database.py
class date_diff(FunctionElement):
    """
    SQL function element for the platform-independent difference of two
    dates, computed as d1 - d2.
    """

    type = sa.Interval()
    name = "date_diff"
    # caching is explicitly not inherited from the superclass; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, d1, d2):
        """Keep both operands on the element; the difference is d1 - d2."""
        self.d1, self.d2 = d1, d2
        super().__init__()

inherit_cache

Indicate if this :class:`.HasCacheKey` instance should make use of the cache key generation scheme used by its immediate superclass.

The attribute defaults to None, which indicates that a construct has not yet taken into account whether or not it's appropriate for it to participate in caching; this is functionally equivalent to setting the value to False, except that a warning is also emitted.

This flag can be set to True on a particular class, if the SQL that corresponds to the object does not change based on attributes which are local to this class, and not its superclass.

.. seealso::

:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.

interval_add

Platform-independent way to add two intervals.

Source code in prefect/orion/utilities/database.py
class interval_add(FunctionElement):
    """
    SQL function element that adds two intervals in a
    platform-independent way.
    """

    type = sa.Interval()
    name = "interval_add"
    # caching is explicitly not inherited from the superclass; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, i1, i2):
        """Keep the two interval operands on the element."""
        self.i1, self.i2 = i1, i2
        super().__init__()

inherit_cache

Indicate if this :class:`.HasCacheKey` instance should make use of the cache key generation scheme used by its immediate superclass.

The attribute defaults to None, which indicates that a construct has not yet taken into account whether or not it's appropriate for it to participate in caching; this is functionally equivalent to setting the value to False, except that a warning is also emitted.

This flag can be set to True on a particular class, if the SQL that corresponds to the object does not change based on attributes which are local to this class, and not its superclass.

.. seealso::

:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.

json_contains

Platform independent json_contains operator, tests if the left expression contains the right expression.

On postgres this is equivalent to the @> containment operator. https://www.postgresql.org/docs/current/functions-json.html

Source code in prefect/orion/utilities/database.py
class json_contains(FunctionElement):
    """
    SQL function element for a platform-independent JSON containment
    test: does the `left` expression contain the `right` expression?

    The postgres equivalent is the @> containment operator.
    https://www.postgresql.org/docs/current/functions-json.html
    """

    type = BOOLEAN
    name = "json_contains"
    # caching is explicitly not inherited from the superclass; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, left, right):
        """Keep the containing (`left`) and contained (`right`) expressions."""
        self.left, self.right = left, right
        super().__init__()

inherit_cache

Indicate if this :class:`.HasCacheKey` instance should make use of the cache key generation scheme used by its immediate superclass.

The attribute defaults to None, which indicates that a construct has not yet taken into account whether or not it's appropriate for it to participate in caching; this is functionally equivalent to setting the value to False, except that a warning is also emitted.

This flag can be set to True on a particular class, if the SQL that corresponds to the object does not change based on attributes which are local to this class, and not its superclass.

.. seealso::

:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.

json_has_all_keys

Platform independent json_has_all_keys operator.

On postgres this is equivalent to the ?& existence operator. https://www.postgresql.org/docs/current/functions-json.html

Source code in prefect/orion/utilities/database.py
class json_has_all_keys(FunctionElement):
    """Platform independent json_has_all_keys operator.

    On postgres this is equivalent to the ?& existence operator.
    https://www.postgresql.org/docs/current/functions-json.html
    """

    type = BOOLEAN
    name = "json_has_all_keys"
    # see https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, json_expr, values: List):
        """
        Args:
            json_expr: the JSON expression whose keys are tested
            values: the keys to look for. When this is a literal python
                list, every item must be a string; a value that is not a
                list is stored without inspection.

        Raises:
            ValueError: if `values` is a list containing a non-string item
        """
        self.json_expr = json_expr
        # Only literal lists are validated here; any other kind of value is
        # accepted as-is.
        if isinstance(values, list) and not all(isinstance(v, str) for v in values):
            raise ValueError(
                "json_has_all_keys values must be strings if provided as a literal list"
            )
        self.values = values
        super().__init__()

inherit_cache

Indicate if this :class:`.HasCacheKey` instance should make use of the cache key generation scheme used by its immediate superclass.

The attribute defaults to None, which indicates that a construct has not yet taken into account whether or not it's appropriate for it to participate in caching; this is functionally equivalent to setting the value to False, except that a warning is also emitted.

This flag can be set to True on a particular class, if the SQL that corresponds to the object does not change based on attributes which are local to this class, and not its superclass.

.. seealso::

:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.

json_has_any_key

Platform independent json_has_any_key operator.

On postgres this is equivalent to the ?| existence operator. https://www.postgresql.org/docs/current/functions-json.html

Source code in prefect/orion/utilities/database.py
class json_has_any_key(FunctionElement):
    """
    SQL function element for a platform-independent "has any key" test
    on a JSON expression.

    The postgres equivalent is the ?| existence operator.
    https://www.postgresql.org/docs/current/functions-json.html
    """

    type = BOOLEAN
    name = "json_has_any_key"
    # caching is explicitly not inherited from the superclass; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, json_expr, values: List):
        """
        Keep the JSON expression and the keys to look for.

        Raises:
            ValueError: if any item of `values` is not a string
        """
        self.json_expr = json_expr
        if any(not isinstance(key, str) for key in values):
            raise ValueError("json_has_any_key values must be strings")
        self.values = values
        super().__init__()

inherit_cache

Indicate if this :class:`.HasCacheKey` instance should make use of the cache key generation scheme used by its immediate superclass.

The attribute defaults to None, which indicates that a construct has not yet taken into account whether or not it's appropriate for it to participate in caching; this is functionally equivalent to setting the value to False, except that a warning is also emitted.

This flag can be set to True on a particular class, if the SQL that corresponds to the object does not change based on attributes which are local to this class, and not its superclass.

.. seealso::

:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.

now

Platform-independent "now" generator.

Source code in prefect/orion/utilities/database.py
class now(FunctionElement):
    """
    SQL function element that generates the current time ("now") in a
    platform-independent way.
    """

    type = Timestamp()
    name = "now"
    # this construct opts in to the superclass cache key scheme; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = True

inherit_cache

Indicate if this :class:`.HasCacheKey` instance should make use of the cache key generation scheme used by its immediate superclass.

The attribute defaults to None, which indicates that a construct has not yet taken into account whether or not it's appropriate for it to participate in caching; this is functionally equivalent to setting the value to False, except that a warning is also emitted.

This flag can be set to True on a particular class, if the SQL that corresponds to the object does not change based on attributes which are local to this class, and not its superclass.

.. seealso::

:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.

get_dialect

Get the dialect of a session, engine, or connection url.

Primary use case is figuring out whether the Orion API is communicating with SQLite or Postgres.

Examples:

import prefect.settings
from prefect.orion.utilities.database import get_dialect

dialect = get_dialect(prefect.settings.PREFECT_ORION_DATABASE_CONNECTION_URL.value())
if dialect.name == "sqlite":
    print("Using SQLite!")
else:
    print("Using Postgres!")
Source code in prefect/orion/utilities/database.py
def get_dialect(
    obj: Union[str, sa.orm.Session, sa.engine.Engine],
) -> sa.engine.Dialect:
    """
    Get the dialect of a session, engine, or connection url.

    Primary use case is figuring out whether the Orion API is communicating with
    SQLite or Postgres.

    Args:
        obj: a session (the url of its `bind` is used), an engine (its `url`
            is used), or any other value, which is handed to
            `sa.engine.url.make_url` (typically a connection url string)

    Returns:
        Whatever `get_dialect()` returns for the resolved url. To find out
        which database is in use, compare its `name` attribute (for example
        `"sqlite"` or `"postgresql"`) rather than the returned object itself.

    Example:
        ```python
        import prefect.settings
        from prefect.orion.utilities.database import get_dialect

        dialect = get_dialect(
            prefect.settings.PREFECT_ORION_DATABASE_CONNECTION_URL.value()
        )
        if dialect.name == "sqlite":
            print("Using SQLite!")
        else:
            print("Using Postgres!")
        ```
    """
    if isinstance(obj, sa.orm.Session):
        url = obj.bind.url
    elif isinstance(obj, sa.engine.Engine):
        url = obj.url
    else:
        # anything that is neither a session nor an engine is treated as a
        # connection url and parsed
        url = sa.engine.url.make_url(obj)

    return url.get_dialect()