Skip to main content

prefect_sqlalchemy.database

Tasks for querying a database with SQLAlchemy

Functions

check_make_url

check_make_url(url: str) -> str

Classes

SqlAlchemyConnector

Block used to manage authentication with a database using synchronous drivers. Upon instantiating, an engine is created and maintained for the life of the object until the close method is called. It is recommended to use this block as a context manager, which will automatically close the engine and its connections when the context is exited. It is also recommended that this block is loaded and consumed within a single task or flow because if the block is passed across separate tasks and flows, the state of the block’s connection and cursor could be lost. Attributes:
  • connection_info: SQLAlchemy URL to create the engine; either create from components or create from a string.
  • connect_args: The options which will be passed directly to the DBAPI’s connect() method as additional keyword arguments.
  • fetch_size: The number of rows to fetch at a time.
Methods:

block_initialization

block_initialization(self) -> None
Initializes the engine.

close

close(self) -> None
Closes sync connections and its cursors.

execute

execute(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> CursorResult
Executes an operation on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE. Unlike the fetch methods, this method will always execute the operation upon calling. Args:
  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.
Examples: Create a table and insert one row into it.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        parameters={"name": "Marvin", "address": "Highway 42"},
    )

execute_many

execute_many(self, operation: str, seq_of_parameters: List[Dict[str, Any]], **execution_options: Dict[str, Any]) -> CursorResult
Executes many operations on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE. Unlike the fetch methods, this method will always execute the operation upon calling. Args:
  • operation: The SQL query or other operation to be executed.
  • seq_of_parameters: The sequence of parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.
Examples: Create a table and insert two rows into it.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )

fetch_all

fetch_all(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]
Fetch all results from the database. Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called. Args:
  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.
Returns:
  • A list of tuples containing the data returned by the database, where each row is a tuple and each column is a value in the tuple.
Examples: Create a table, insert three rows into it, and fetch all where name is ‘Me’.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = database.fetch_all(
        "SELECT * FROM customers WHERE name = :name",
        parameters={"name": "Me"}
    )

fetch_many

fetch_many(self, operation: str, parameters: Optional[Dict[str, Any]] = None, size: Optional[int] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]
Fetch a limited number of results from the database. Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called. Args:
  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • size: The number of results to return; if None or 0, uses the value of fetch_size configured on the block.
  • **execution_options: Options to pass to Connection.execution_options.
Returns:
  • A list of tuples containing the data returned by the database, where each row is a tuple and each column is a value in the tuple.
Examples: Create a table, insert three rows into it, and fetch two rows repeatedly.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = database.fetch_many("SELECT * FROM customers", size=2)
    print(results)
    results = database.fetch_many("SELECT * FROM customers", size=2)
    print(results)

fetch_one

fetch_one(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> Tuple[Any]
Fetch a single result from the database. Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called. Args:
  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.
Returns:
  • A tuple containing the data returned by the database, where each column is a value in the tuple.
Examples: Create a table, insert three rows into it, and fetch a row repeatedly.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = True
    while results:
        results = database.fetch_one("SELECT * FROM customers")
        print(results)

get_client

get_client(self, client_type: Literal['engine', 'connection'], **get_client_kwargs: Dict[str, Any]) -> Union[Engine, Connection]
Returns either an engine or connection that can be used to query from databases. Args:
  • client_type: Select from either ‘engine’ or ‘connection’.
  • **get_client_kwargs: Additional keyword arguments to pass to either get_engine or get_connection.
Returns:
  • The authenticated SQLAlchemy engine or connection.
Examples: Create an engine.
from prefect_sqlalchemy import SqlAlchemyConnector

sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
engine = sqlalchemy_connector.get_client(client_type="engine")
Create a context managed connection.
from prefect_sqlalchemy import SqlAlchemyConnector

sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
with sqlalchemy_connector.get_client(client_type="connection") as conn:
    ...

get_connection

get_connection(self, begin: bool = True, **connect_kwargs: Dict[str, Any]) -> Connection
Returns a connection that can be used to query from databases. Args:
  • begin: Whether to begin a transaction on the connection; if True, if any operations fail, the entire transaction will be rolled back.
  • **connect_kwargs: Additional keyword arguments to pass to either engine.begin or engine.connect`.
Returns:
  • The SQLAlchemy Connection.
Examples: Create a synchronous connection as a context-managed transaction.
from prefect_sqlalchemy import SqlAlchemyConnector

sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
with sqlalchemy_connector.get_connection(begin=False) as connection:
    connection.execute("SELECT * FROM table LIMIT 1;")

get_engine

get_engine(self, **create_engine_kwargs: Dict[str, Any]) -> Engine
Returns an authenticated engine that can be used to query from databases. If an existing engine exists, return that one. Returns:
  • The authenticated SQLAlchemy Engine.
Examples: Create a synchronous engine to PostgreSQL using URL params.
from prefect import flow
from prefect_sqlalchemy import (
    SqlAlchemyConnector, ConnectionComponents, SyncDriver
)

@flow
def sqlalchemy_credentials_flow():
    sqlalchemy_credentials = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
            driver=SyncDriver.POSTGRESQL_PSYCOPG2,
            username="prefect",
            password="prefect_password",
            database="postgres"
        )
    )
    print(sqlalchemy_credentials.get_engine())

sqlalchemy_credentials_flow()

reset_connections

reset_connections(self) -> None
Tries to close all opened connections and their results. Examples: Resets connections so fetch_* methods return new results.
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("MY_BLOCK") as database:
    results = database.fetch_one("SELECT * FROM customers")
    database.reset_connections()
    results = database.fetch_one("SELECT * FROM customers")

AsyncSqlAlchemyConnector

Block used to manage authentication with a database using asynchronous drivers. Upon instantiating, an engine is created and maintained for the life of the object until the close method is called. It is recommended to use this block as an async context manager, which will automatically close the engine and its connections when the context is exited. It is also recommended that this block is loaded and consumed within a single task or flow because if the block is passed across separate tasks and flows, the state of the block’s connection and cursor could be lost. Attributes:
  • connection_info: SQLAlchemy URL to create the engine; either create from components or create from a string.
  • connect_args: The options which will be passed directly to the DBAPI’s connect() method as additional keyword arguments.
  • fetch_size: The number of rows to fetch at a time.
Methods:

block_initialization

block_initialization(self) -> None
Initializes the engine.

close

close(self) -> None
Closes async connections and its cursors.

execute

execute(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> CursorResult
Executes an operation on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE. Unlike the fetch methods, this method will always execute the operation upon calling. Args:
  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.
Examples: Create a table and insert one row into it.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )

asyncio.run(example_run())

execute_many

execute_many(self, operation: str, seq_of_parameters: List[Dict[str, Any]], **execution_options: Dict[str, Any]) -> CursorResult
Executes many operations on the database. This method is intended to be used for operations that do not return data, such as INSERT, UPDATE, or DELETE. Unlike the fetch methods, this method will always execute the operation upon calling. Args:
  • operation: The SQL query or other operation to be executed.
  • seq_of_parameters: The sequence of parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.
Examples: Create a table and insert two rows into it.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

asyncio.run(example_run())

fetch_all

fetch_all(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]
Fetch all results from the database. Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called. Args:
  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.
Returns:
  • A list of tuples containing the data returned by the database, where each row is a tuple and each column is a value in the tuple.
Examples: Create a table, insert three rows into it, and fetch all where name is ‘Me’.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )
        results = await database.fetch_all(
            "SELECT * FROM customers WHERE name = :name",
            parameters={"name": "Me"}
        )

asyncio.run(example_run())

fetch_many

fetch_many(self, operation: str, parameters: Optional[Dict[str, Any]] = None, size: Optional[int] = None, **execution_options: Dict[str, Any]) -> List[Tuple[Any]]
Fetch a limited number of results from the database. Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called. Args:
  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • size: The number of results to return; if None or 0, uses the value of fetch_size configured on the block.
  • **execution_options: Options to pass to Connection.execution_options.
Returns:
  • A list of tuples containing the data returned by the database, where each row is a tuple and each column is a value in the tuple.
Examples: Create a table, insert three rows into it, and fetch two rows repeatedly.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )
        results = await database.fetch_many("SELECT * FROM customers", size=2)
        print(results)
        results = await database.fetch_many("SELECT * FROM customers", size=2)
        print(results)

asyncio.run(example_run())

fetch_one

fetch_one(self, operation: str, parameters: Optional[Dict[str, Any]] = None, **execution_options: Dict[str, Any]) -> Tuple[Any]
Fetch a single result from the database. Repeated calls using the same inputs to any of the fetch methods of this block will skip executing the operation again, and instead, return the next set of results from the previous execution, until the reset_connections method is called. Args:
  • operation: The SQL query or other operation to be executed.
  • parameters: The parameters for the operation.
  • **execution_options: Options to pass to Connection.execution_options.
Returns:
  • A tuple containing the data returned by the database, where each column is a value in the tuple.
Examples: Create a table, insert three rows into it, and fetch a row repeatedly.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await database.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )
        results = True
        while results:
            results = await database.fetch_one("SELECT * FROM customers")
            print(results)

asyncio.run(example_run())

get_client

get_client(self, client_type: Literal['engine', 'connection'], **get_client_kwargs: Dict[str, Any]) -> Union[AsyncEngine, AsyncConnection]
Returns either an engine or connection that can be used to query from databases. Args:
  • client_type: Select from either ‘engine’ or ‘connection’.
  • **get_client_kwargs: Additional keyword arguments to pass to either get_engine or get_connection.
Returns:
  • The authenticated SQLAlchemy engine or connection.
Examples: Create an engine.
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

sqlalchemy_connector = AsyncSqlAlchemyConnector.load("BLOCK_NAME")
engine = sqlalchemy_connector.get_client(client_type="engine")
Create a context managed connection.
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

sqlalchemy_connector = AsyncSqlAlchemyConnector.load("BLOCK_NAME")
async with sqlalchemy_connector.get_client(client_type="connection") as conn:
    ...

get_connection

get_connection(self, begin: bool = True, **connect_kwargs: Dict[str, Any]) -> AsyncConnection
Returns a connection that can be used to query from databases. Args:
  • begin: Whether to begin a transaction on the connection; if True, if any operations fail, the entire transaction will be rolled back.
  • **connect_kwargs: Additional keyword arguments to pass to either engine.begin or engine.connect`.
Returns:
  • The SQLAlchemy AsyncConnection.
Examples: Create an asynchronous connection as a context-managed transaction.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def main():
    sqlalchemy_connector = await AsyncSqlAlchemyConnector.load("BLOCK_NAME")
    async with sqlalchemy_connector.get_connection(begin=False) as connection:
        await connection.execute("SELECT * FROM table LIMIT 1;")

asyncio.run(main())

get_engine

get_engine(self, **create_engine_kwargs: Dict[str, Any]) -> AsyncEngine
Returns an authenticated engine that can be used to query from databases. If an existing engine exists, return that one. Returns:
  • The authenticated SQLAlchemy AsyncEngine.
Examples: Create an asynchronous engine to PostgreSQL using URL params.
from prefect import flow
from prefect_sqlalchemy import (
    AsyncSqlAlchemyConnector, ConnectionComponents, AsyncDriver
)

@flow
async def sqlalchemy_credentials_flow():
    sqlalchemy_credentials = AsyncSqlAlchemyConnector(
    connection_info=ConnectionComponents(
            driver=AsyncDriver.POSTGRESQL_ASYNCPG,
            username="prefect",
            password="prefect_password",
            database="postgres"
        )
    )
    print(sqlalchemy_credentials.get_engine())

asyncio.run(sqlalchemy_credentials_flow())

reset_connections

reset_connections(self) -> None
Tries to close all opened connections and their results. Examples: Resets connections so fetch_* methods return new results.
import asyncio
from prefect_sqlalchemy import AsyncSqlAlchemyConnector

async def example_run():
    async with AsyncSqlAlchemyConnector.load("MY_BLOCK") as database:
        results = await database.fetch_one("SELECT * FROM customers")
        await database.reset_connections()
        results = await database.fetch_one("SELECT * FROM customers")

asyncio.run(example_run())