Skip to content

prefect.server.utilities.database

Utilities for interacting with the Prefect REST API database and ORM layer.

Prefect supports both SQLite and Postgres. Many of these utilities allow the Prefect REST API to seamlessly switch between the two.

GenerateUUID

Bases: FunctionElement

Platform-independent UUID default generator. Note that the actual functionality for this class is specified in the compiles-decorated functions below.

Source code in prefect/server/utilities/database.py
33
34
35
36
37
38
39
40
class GenerateUUID(FunctionElement):
    """
    Platform-independent UUID default generator.

    This class is only a declaration: it defines no behavior of its own
    beyond the `name` attribute. Note the actual functionality for this
    class is specified in the `compiles`-decorated functions below.
    """

    # NOTE(review): presumably the identifier the dialect-specific compilers
    # are associated with -- confirm against the `compiles` functions.
    name = "uuid_default"

JSON

Bases: TypeDecorator

JSON type that returns SQLAlchemy's dialect-specific JSON types, where possible. Uses generic JSON otherwise.

The "base" type is postgresql.JSONB to expose useful methods prior to SQL compilation

Source code in prefect/server/utilities/database.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class JSON(TypeDecorator):
    """
    Dialect-aware JSON column type.

    Resolves to SQLAlchemy's PostgreSQL `JSONB` type or SQLite `JSON` type
    when one of those dialects is in use, and to the generic `sa.JSON` type
    for every other dialect.

    `postgresql.JSONB` is the "base" implementation so that its useful
    methods are exposed prior to SQL compilation.
    """

    impl = postgresql.JSONB
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """
        Pick the JSON implementation for `dialect`.

        Every variant is constructed with `none_as_null=True`.
        """
        dialect_name = dialect.name
        if dialect_name == "postgresql":
            json_type = postgresql.JSONB(none_as_null=True)
        elif dialect_name == "sqlite":
            json_type = sqlite.JSON(none_as_null=True)
        else:
            json_type = sa.JSON(none_as_null=True)
        return dialect.type_descriptor(json_type)

    def process_bind_param(self, value, dialect):
        """
        Prepare `value` for use as a JSON bind parameter.

        Falsy values are handed back untouched. Anything else is round-tripped
        through `json.dumps` / `json.loads` so that the floating point
        constants `NaN`, `Infinity` and `-Infinity` come back as `None`.
        """
        if value:
            # PostgreSQL does not support the floating point extrema values
            # `NaN`, `-Infinity`, or `Infinity`:
            # https://www.postgresql.org/docs/current/datatype-json.html#JSON-TYPE-MAPPING-TABLE
            #
            # SQLite can store and retrieve full JSON values containing them,
            # but any query that makes SQLite parse the value (like
            # `json_extract`) will fail.
            #
            # `parse_constant` is invoked for exactly those three constants, so
            # mapping it to `None` strips them from the re-parsed value. See
            # https://docs.python.org/3/library/json.html#json.load.
            serialized = json.dumps(value)
            return json.loads(serialized, parse_constant=lambda _constant: None)

        return value

Pydantic

Bases: TypeDecorator

A pydantic type that converts inserted parameters to json and converts read values to the pydantic type.

Source code in prefect/server/utilities/database.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class Pydantic(TypeDecorator):
    """
    Column type backed by a pydantic type.

    Values being written are validated against the pydantic type and turned
    into JSON-compatible Python data; values being read are parsed back into
    the pydantic type.

    Args:
        pydantic_type: the type that values are validated against and
            hydrated into
        sa_column_type: optional replacement for the default `JSON`
            implementation type; only applied when it is not `None`
    """

    impl = JSON
    cache_ok = True

    def __init__(self, pydantic_type, sa_column_type=None):
        super().__init__()
        self._pydantic_type = pydantic_type
        # override the implementation type only when the caller supplied one
        if sa_column_type is not None:
            self.impl = sa_column_type

    def process_bind_param(self, value, dialect):
        """Validate `value` and convert it to JSON-compatible Python data."""
        if value is None:
            return None

        # parsing enforces the schema; validation errors propagate to the caller
        validated = pydantic.parse_obj_as(self._pydantic_type, value)

        # sqlalchemy needs a python-native collection of JSON-compatible
        # objects as the bind value, so serialize with the pydantic JSON
        # encoder and immediately parse the resulting string again
        encoded = json.dumps(validated, default=pydantic.json.pydantic_encoder)
        return json.loads(encoded)

    def process_result_value(self, value, dialect):
        """Hydrate a stored JSON value into the pydantic type (`None` stays `None`)."""
        if value is None:
            return None
        # load the json object into a fully hydrated typed object
        return pydantic.parse_obj_as(self._pydantic_type, value)

Timestamp

Bases: TypeDecorator

TypeDecorator that ensures that timestamps have a timezone.

For SQLite, all timestamps are converted to UTC (since they are stored as naive timestamps without timezones) and recovered as UTC.

Source code in prefect/server/utilities/database.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class Timestamp(TypeDecorator):
    """TypeDecorator that ensures that timestamps have a timezone.

    Binding a value without `tzinfo` raises a `ValueError`. For SQLite, all
    timestamps are converted to UTC before storage (since they are stored as
    naive timestamps without timezones). Values read back are returned in UTC
    for every dialect.
    """

    impl = sa.TIMESTAMP(timezone=True)
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """Pick the timestamp implementation type for `dialect`."""
        if dialect.name == "postgresql":
            timestamp_type = postgresql.TIMESTAMP(timezone=True)
        elif dialect.name == "sqlite":
            timestamp_type = sqlite.DATETIME(
                # SQLite is very particular about datetimes: every comparison
                # is an alphanumeric comparison that ignores actual timestamp
                # semantics and timezones, so the stored representation has to
                # be uniform and sortable. The default is an ISO8601-compatible
                # string with NO time zone and a space (" ") delimiter between
                # the date and the time. The settings below would add a "T"
                # delimiter instead, but then all other sqlite datetimes must
                # be set the same way, including the custom default value for
                # datetime columns and any handwritten SQL formed with
                # `strftime()`.
                #
                # store with "T" separator for time
                # storage_format=(
                #     "%(year)04d-%(month)02d-%(day)02d"
                #     "T%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
                # ),
                # handle ISO 8601 with "T" or " " as the time separator
                # regexp=r"(\d+)-(\d+)-(\d+)[T ](\d+):(\d+):(\d+).(\d+)",
            )
        else:
            timestamp_type = sa.TIMESTAMP(timezone=True)
        return dialect.type_descriptor(timestamp_type)

    def process_bind_param(self, value, dialect):
        """
        Check that `value` is timezone-aware before it is bound.

        Raises:
            ValueError: if `value` has no `tzinfo`
        """
        if value is None:
            return None
        if value.tzinfo is None:
            raise ValueError("Timestamps must have a timezone.")
        if dialect.name == "sqlite":
            # normalize to UTC for SQLite; other dialects get the value as given
            return pendulum.instance(value).in_timezone("UTC")
        return value

    def process_result_value(self, value, dialect):
        """Return a retrieved timestamp converted to UTC (`None` stays `None`)."""
        if value is None:
            return None
        return pendulum.instance(value).in_timezone("utc")

UUID

Bases: TypeDecorator

Platform-independent UUID type.

Uses PostgreSQL's UUID type, otherwise uses CHAR(36), storing as stringified hex values with hyphens.

Source code in prefect/server/utilities/database.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
class UUID(TypeDecorator):
    """
    Platform-independent UUID type.

    On PostgreSQL the native UUID type is used; every other dialect falls
    back to CHAR(36), holding the hyphenated hex string form of the UUID.
    """

    impl = TypeEngine
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """Pick the column implementation type for `dialect`."""
        column_type = postgresql.UUID() if dialect.name == "postgresql" else CHAR(36)
        return dialect.type_descriptor(column_type)

    def process_bind_param(self, value, dialect):
        """
        Convert `value` to its string form for binding.

        On PostgreSQL, and for `uuid.UUID` instances on any dialect, the value
        is simply stringified. Anything else is first parsed with `uuid.UUID`
        and then stringified.
        """
        if value is None:
            return None
        if dialect.name == "postgresql" or isinstance(value, uuid.UUID):
            return str(value)
        return str(uuid.UUID(value))

    def process_result_value(self, value, dialect):
        """Return the stored value as a `uuid.UUID` (`None` stays `None`)."""
        if value is None or isinstance(value, uuid.UUID):
            return value
        return uuid.UUID(value)

date_add

Bases: FunctionElement

Platform-independent way to add a date and an interval.

Source code in prefect/server/utilities/database.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
class date_add(FunctionElement):
    """
    Adds an interval to a date in a platform-independent way.

    Args:
        dt: the date operand, stored as `self.dt`
        interval: the interval operand, stored as `self.interval`
    """

    name = "date_add"
    type = Timestamp()
    # caching is not inherited for this construct, see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, dt, interval):
        self.dt, self.interval = dt, interval
        super().__init__()

date_diff

Bases: FunctionElement

Platform-independent difference of dates. Computes d1 - d2.

Source code in prefect/server/utilities/database.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
class date_diff(FunctionElement):
    """
    Platform-independent difference of two dates, i.e. `d1 - d2`.

    Args:
        d1: the date being subtracted from, stored as `self.d1`
        d2: the date being subtracted, stored as `self.d2`
    """

    name = "date_diff"
    type = sa.Interval()
    # caching is not inherited for this construct, see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, d1, d2):
        self.d1, self.d2 = d1, d2
        super().__init__()

interval_add

Bases: FunctionElement

Platform-independent way to add two intervals.

Source code in prefect/server/utilities/database.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
class interval_add(FunctionElement):
    """
    Adds two intervals in a platform-independent way.

    Args:
        i1: the first interval, stored as `self.i1`
        i2: the second interval, stored as `self.i2`
    """

    name = "interval_add"
    type = sa.Interval()
    # caching is not inherited for this construct, see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, i1, i2):
        self.i1, self.i2 = i1, i2
        super().__init__()

json_contains

Bases: FunctionElement

Platform independent json_contains operator, tests if the left expression contains the right expression.

On postgres this is equivalent to the @> containment operator. https://www.postgresql.org/docs/current/functions-json.html

Source code in prefect/server/utilities/database.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
class json_contains(FunctionElement):
    """
    Platform independent json_contains operator: tests whether the `left`
    expression contains the `right` expression.

    On postgres this is equivalent to the @> containment operator.
    https://www.postgresql.org/docs/current/functions-json.html

    Args:
        left: the containing expression, stored as `self.left`
        right: the expression looked for, stored as `self.right`
    """

    name = "json_contains"
    type = BOOLEAN
    # caching is not inherited for this construct, see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, left, right):
        self.left, self.right = left, right
        super().__init__()

json_extract

Bases: FunctionElement

Platform independent json_extract operator, extracts a value from a JSON field via key.

On postgres this is equivalent to the ->> operator. https://www.postgresql.org/docs/current/functions-json.html

Source code in prefect/server/utilities/database.py
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
class json_extract(FunctionElement):
    """
    Platform independent json_extract operator: pulls a value out of a JSON
    field by key.

    On postgres this is equivalent to the ->> operator.
    https://www.postgresql.org/docs/current/functions-json.html

    Args:
        column: the JSON column to extract from
        path: the key to extract
        wrap_quotes: flag stored on the element as-is; nothing in this class
            interprets it
    """

    name = "json_extract"
    type = sa.Text()
    # caching is not inherited for this construct, see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, column: sa.Column, path: str, wrap_quotes: bool = False):
        self.column, self.path = column, path
        self.wrap_quotes = wrap_quotes
        super().__init__()

json_has_all_keys

Bases: FunctionElement

Platform independent json_has_all_keys operator.

On postgres this is equivalent to the ?& existence operator. https://www.postgresql.org/docs/current/functions-json.html

Source code in prefect/server/utilities/database.py
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
class json_has_all_keys(FunctionElement):
    """Platform independent json_has_all_keys operator.

    On postgres this is equivalent to the ?& existence operator.
    https://www.postgresql.org/docs/current/functions-json.html

    Args:
        json_expr: the JSON expression to inspect
        values: the keys to look for; when given as a literal `list`, every
            item must be a `str`. Non-list values are stored unchecked.

    Raises:
        ValueError: if `values` is a list containing a non-string item
    """

    name = "json_has_all_keys"
    type = BOOLEAN
    # caching is not inherited for this construct, see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, json_expr, values: List):
        self.json_expr = json_expr
        # only literal lists are validated; anything else passes through as-is
        if isinstance(values, list):
            if any(not isinstance(key, str) for key in values):
                raise ValueError(
                    "json_has_all_key values must be strings if provided as a literal list"
                )
        self.values = values
        super().__init__()

json_has_any_key

Bases: FunctionElement

Platform independent json_has_any_key operator.

On postgres this is equivalent to the ?| existence operator. https://www.postgresql.org/docs/current/functions-json.html

Source code in prefect/server/utilities/database.py
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
class json_has_any_key(FunctionElement):
    """
    Platform independent json_has_any_key operator.

    On postgres this is equivalent to the ?| existence operator.
    https://www.postgresql.org/docs/current/functions-json.html

    Args:
        json_expr: the JSON expression to inspect
        values: the keys to look for; every item must be a `str`

    Raises:
        ValueError: if any item of `values` is not a string
    """

    name = "json_has_any_key"
    type = BOOLEAN
    # caching is not inherited for this construct, see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, json_expr, values: List):
        self.json_expr = json_expr
        if any(not isinstance(key, str) for key in values):
            raise ValueError("json_has_any_key values must be strings")
        self.values = values
        super().__init__()

now

Bases: FunctionElement

Platform-independent "now" generator.

Source code in prefect/server/utilities/database.py
242
243
244
245
246
247
248
249
250
class now(FunctionElement):
    """
    Platform-independent "now" generator.

    The class defines no constructor or methods of its own; it only declares
    the attributes below.
    """

    # result type of the element: an instance of this module's `Timestamp`
    type = Timestamp()
    name = "now"
    # see https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = True

get_dialect

Get the dialect of a session, engine, or connection url.

Primary use case is figuring out whether the Prefect REST API is communicating with SQLite or Postgres.

Example
from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL
from prefect.server.utilities.database import get_dialect

dialect = get_dialect(PREFECT_API_DATABASE_CONNECTION_URL.value())
if dialect.name == "sqlite":
    print("Using SQLite!")
else:
    print("Using Postgres!")
Source code in prefect/server/utilities/database.py
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
def get_dialect(
    obj: Union[str, sa.orm.Session, sa.engine.Engine],
) -> sa.engine.Dialect:
    """
    Resolve the dialect behind a session, an engine, or a connection url.

    Mostly used to find out whether the Prefect REST API is talking to SQLite
    or to Postgres.

    Args:
        obj: a session (its `bind.url` is used), an engine (its `url` is
            used), or anything else, which is handed to
            `sa.engine.url.make_url`

    Returns:
        The result of calling `get_dialect()` on the resolved URL.

    Example:
        ```python
        from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL
        from prefect.server.utilities.database import get_dialect

        dialect = get_dialect(PREFECT_API_DATABASE_CONNECTION_URL.value())
        if dialect.name == "sqlite":
            print("Using SQLite!")
        else:
            print("Using Postgres!")
        ```
    """
    if isinstance(obj, sa.orm.Session):
        return obj.bind.url.get_dialect()
    if isinstance(obj, sa.engine.Engine):
        return obj.url.get_dialect()
    return sa.engine.url.make_url(obj).get_dialect()