Welcome!
prefect-sqlalchemy helps you connect to a database in your Prefect flows.
Getting started
Install prefect-sqlalchemy
The following command will install a version of prefect-sqlalchemy compatible with your installed version of prefect.
If you don’t already have prefect installed, it will install the newest version of prefect as well.
pip install "prefect[sqlalchemy]"
Upgrade to the latest versions of prefect and prefect-sqlalchemy:
pip install -U "prefect[sqlalchemy]"
Register newly installed block types
Register the block types in the prefect-sqlalchemy module to make them available for use.
prefect block register -m prefect_sqlalchemy
Examples
Save credentials to a block
To use the load method on a block, you must first save the block, either through code or through the UI.
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="USERNAME-PLACEHOLDER",
        password="PASSWORD-PLACEHOLDER",
        host="localhost",
        port=5432,
        database="DATABASE-PLACEHOLDER",
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
Load the saved block that holds your credentials:
from prefect_sqlalchemy import SqlAlchemyConnector
SqlAlchemyConnector.load("BLOCK_NAME-PLACEHOLDER")
The required arguments depend upon the desired driver. For example, SQLite requires only the driver and database arguments:
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.SQLITE_PYSQLITE,
        database="DATABASE-PLACEHOLDER.db"
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
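To see why the required arguments differ by driver, it can help to look at the general shape of a SQLAlchemy database URL, `dialect+driver://username:password@host:port/database`. The following is a minimal sketch using only the standard library. The `build_url` helper is hypothetical and is not part of prefect-sqlalchemy, which delegates URL construction to SQLAlchemy itself; the sketch also assumes the driver strings `postgresql+psycopg2` and `sqlite+pysqlite` and uses simple percent-encoding for credentials.

```python
from typing import Optional
from urllib.parse import quote


def build_url(
    driver: str,
    database: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
) -> str:
    """Hypothetical helper: assemble a SQLAlchemy-style URL from its parts."""
    url = f"{driver}://"
    if username is not None:
        # Percent-encode credentials so characters like "@" or ":" stay unambiguous
        url += quote(username, safe="")
        if password is not None:
            url += ":" + quote(password, safe="")
        url += "@"
    if host is not None:
        url += host
    if port is not None:
        url += f":{port}"
    # The database name (or file path, for SQLite) follows a single slash
    return url + "/" + database


# A server-based database needs credentials, a host, and a port.
postgres_url = build_url(
    "postgresql+psycopg2", "mydb",
    username="user", password="secret", host="localhost", port=5432,
)

# A file-based database such as SQLite needs only the driver and the file name.
sqlite_url = build_url("sqlite+pysqlite", "example.db")

print(postgres_url)  # postgresql+psycopg2://user:secret@localhost:5432/mydb
print(sqlite_url)    # sqlite+pysqlite:///example.db
```

The SQLite URL has no credentials, host, or port section, which is why only the driver and database arguments are needed for that driver.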
Work with databases in a flow
To set up a table, use the execute and execute_many methods.
Use the fetch_many method to retrieve data in a stream until there’s no more data.
Use the SqlAlchemyConnector as a context manager to ensure that the SQLAlchemy engine and any connected resources are closed properly when you’re done with them.
Async support
For async workflows with async database drivers (such as AsyncDriver.SQLITE_AIOSQLITE or AsyncDriver.POSTGRESQL_ASYNCPG), use AsyncSqlAlchemyConnector instead of SqlAlchemyConnector. The sync example comes first below, followed by a complete async example.
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
def setup_table(block_name: str) -> None:
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )


@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows


@flow
def sqlalchemy_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    sqlalchemy_flow("BLOCK-NAME-PLACEHOLDER")
from prefect import flow, task
from prefect_sqlalchemy import AsyncSqlAlchemyConnector
import asyncio


@task
async def setup_table(block_name: str) -> None:
    async with await AsyncSqlAlchemyConnector.load(block_name) as connector:
        await connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        await connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )


@task
async def fetch_data(block_name: str) -> list:
    all_rows = []
    async with await AsyncSqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows


@flow
async def sqlalchemy_flow(block_name: str) -> list:
    await setup_table(block_name)
    all_rows = await fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    asyncio.run(sqlalchemy_flow("BLOCK-NAME-PLACEHOLDER"))
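The batched loop in fetch_data can be tried without a Prefect block. The sketch below is an analogy rather than the prefect-sqlalchemy implementation: it uses the standard library's sqlite3 module, whose cursor.fetchmany plays the role of fetch_many, and contextlib.closing to stand in for the connector's context manager. The `fetch_in_batches` helper is a name chosen for illustration.

```python
import sqlite3
from contextlib import closing


def fetch_in_batches(conn: sqlite3.Connection, query: str, size: int = 2) -> list:
    """Collect all rows of `query`, reading at most `size` rows per call."""
    all_rows = []
    # closing() releases the cursor even if a fetch raises
    with closing(conn.cursor()) as cursor:
        cursor.execute(query)  # executed once; each fetchmany only advances the cursor
        while True:
            batch = cursor.fetchmany(size)
            if not batch:  # an empty batch means the result set is exhausted
                break
            all_rows.extend(batch)  # extend() keeps the result a flat list of rows
    return all_rows


# The connection is closed on exit from the with block, mirroring the connector usage.
with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute("CREATE TABLE customers (name varchar, address varchar)")
    conn.executemany(
        "INSERT INTO customers (name, address) VALUES (:name, :address)",
        [
            {"name": "Marvin", "address": "Highway 42"},
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Highway 42"},
        ],
    )
    rows = fetch_in_batches(conn, "SELECT name, address FROM customers")

print(len(rows))  # 3
```

Note that the flows above call all_rows.append(new_rows), so they return a list of batches. This sketch uses extend instead, which yields a flat list of rows; pick whichever shape suits the downstream code.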
Resources
Refer to the prefect-sqlalchemy SDK documentation to explore all the capabilities of the prefect-sqlalchemy library.
For assistance using SQLAlchemy, consult the SQLAlchemy documentation.