Skip to main content

Welcome!

prefect-sqlalchemy helps you connect to a database in your Prefect flows.

Getting started

Install prefect-sqlalchemy

The following command will install a version of prefect-sqlalchemy compatible with your installed version of prefect. If you don’t already have prefect installed, it will install the newest version of prefect as well.
pip install "prefect[sqlalchemy]"
Upgrade to the latest versions of prefect and prefect-sqlalchemy:
pip install -U "prefect[sqlalchemy]"

Register newly installed block types

Register the block types in the prefect-sqlalchemy module to make them available for use.
prefect block register -m prefect_sqlalchemy

Examples

Save credentials to a block

To use the load method on Blocks, you must have a block saved through code or saved through the UI.
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="USERNAME-PLACEHOLDER",
        password="PASSWORD-PLACEHOLDER",
        host="localhost",
        port=5432,
        database="DATABASE-PLACEHOLDER",
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
Load the saved block that holds your credentials:
from prefect_sqlalchemy import SqlAlchemyConnector

SqlAlchemyConnector.load("BLOCK_NAME-PLACEHOLDER")
The required arguments depend upon the desired driver. For example, SQLite requires only the driver and database arguments:
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.SQLITE_PYSQLITE,
        database="DATABASE-PLACEHOLDER.db"
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")

Work with databases in a flow

To set up a table, use the execute and execute_many methods. Use the fetch_many method to retrieve data in a stream until there’s no more data. Use the SqlAlchemyConnector as a context manager, to ensure that the SQLAlchemy engine and any connected resources are closed properly after you’re done with them.
Async supportFor async workflows with async database drivers (like AsyncDriver.SQLITE_AIOSQLITE or AsyncDriver.POSTGRESQL_ASYNCPG), use AsyncSqlAlchemyConnector instead of SqlAlchemyConnector. See the Async tab below for a complete example.
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
def setup_table(block_name: str) -> None:
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )

@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows

@flow
def sqlalchemy_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    sqlalchemy_flow("BLOCK-NAME-PLACEHOLDER")

Resources

Refer to the prefect-sqlalchemy SDK documentation to explore all the capabilities of the prefect-sqlalchemy library. For assistance using SQLAlchemy, consult the SQLAlchemy documentation.