prefect-sqlalchemy
Welcome!
prefect-sqlalchemy helps you connect to a database in your Prefect flows.
Getting started
Install prefect-sqlalchemy
The following command installs a version of prefect-sqlalchemy compatible with your installed version of prefect. If you don’t already have prefect installed, it installs the newest version of prefect as well.
pip install "prefect[sqlalchemy]"
Upgrade to the latest versions of prefect and prefect-sqlalchemy:
pip install -U "prefect[sqlalchemy]"
Register newly installed block types
Register the block types in the prefect-sqlalchemy module to make them available for use.
prefect block register -m prefect_sqlalchemy
Examples
Save credentials to a block
To use the load method on a block, you must first have a block saved, either through code or through the UI. To save one through code:
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="USERNAME-PLACEHOLDER",
        password="PASSWORD-PLACEHOLDER",
        host="localhost",
        port=5432,
        database="DATABASE-PLACEHOLDER",
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
Load the saved block that holds your credentials:
from prefect_sqlalchemy import SqlAlchemyConnector
SqlAlchemyConnector.load("BLOCK_NAME-PLACEHOLDER")
The required arguments depend upon the desired driver. For example, SQLite requires only the driver and database arguments:
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.SQLITE_PYSQLITE,
        database="DATABASE-PLACEHOLDER.db",
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
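One way to see why the required arguments vary by driver is to picture how the components combine into a SQLAlchemy-style database URL. The sketch below is illustrative only: build_url is a hypothetical helper written for this page, not part of prefect-sqlalchemy, and the driver strings are assumed to be the values behind the SyncDriver members used above.

```python
from typing import Optional
from urllib.parse import quote


def build_url(
    driver: str,
    database: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
) -> str:
    """Hypothetical helper: join connection components into a database URL."""
    userinfo = ""
    if username is not None:
        # Percent-encode credentials so special characters stay unambiguous.
        userinfo = quote(username, safe="")
        if password is not None:
            userinfo += ":" + quote(password, safe="")
        userinfo += "@"
    netloc = host or ""
    if port is not None:
        netloc += f":{port}"
    return f"{driver}://{userinfo}{netloc}/{database}"


# A client/server database needs credentials, a host, and a port.
postgres_url = build_url(
    "postgresql+psycopg2",
    database="DATABASE-PLACEHOLDER",
    username="USERNAME-PLACEHOLDER",
    password="PASSWORD-PLACEHOLDER",
    host="localhost",
    port=5432,
)

# SQLite reads a local file, so the driver and database are enough.
sqlite_url = build_url("sqlite+pysqlite", database="DATABASE-PLACEHOLDER.db")

print(postgres_url)
print(sqlite_url)  # sqlite+pysqlite:///DATABASE-PLACEHOLDER.db
```

In real code, let ConnectionComponents assemble the URL for you; the helper only shows which parts are empty for a file-based database.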
Work with databases in a flow
To set up a table, use the execute and execute_many methods. Use the fetch_many method to retrieve data in a stream until there’s no more data.

Use the SqlAlchemyConnector as a context manager to ensure that the SQLAlchemy engine and any connected resources are closed properly after you’re done with them.
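The reason a context manager guarantees cleanup can be shown with a small stand-in class. This is a sketch of the general Python mechanism, not of the library's internals: DemoConnector is hypothetical and only records whether its cleanup hook ran.

```python
class DemoConnector:
    """Hypothetical stand-in for a connector; not part of prefect-sqlalchemy."""

    def __init__(self) -> None:
        self.closed = False

    def __enter__(self) -> "DemoConnector":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Called on normal exit and also when the body raises.
        self.closed = True
        return False  # do not swallow the exception


connector = DemoConnector()
try:
    with connector:
        raise RuntimeError("query failed")
except RuntimeError:
    pass

print(connector.closed)  # True
```

Because the cleanup hook runs even when the body raises, resources are released on the error path without an explicit try/finally in every task.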
Async support
SqlAlchemyConnector supports async workflows. Be sure to save, load, and use a block that is configured with an async driver. The example below saves such a block:
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, AsyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=AsyncDriver.SQLITE_AIOSQLITE,
        database="DATABASE-PLACEHOLDER.db",
    )
)

if __name__ == "__main__":
    connector.save("BLOCK_NAME-PLACEHOLDER")
The following flow uses the synchronous API and assumes the block it loads was saved with a sync driver, as in Save credentials to a block:

from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
def setup_table(block_name: str) -> None:
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )


@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            # Each batch is appended as its own list, so all_rows is a list of batches
            all_rows.append(new_rows)
    return all_rows


@flow
def sqlalchemy_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    sqlalchemy_flow("BLOCK_NAME-PLACEHOLDER")
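The batching loop in fetch_data can be tried without Prefect or a saved block. The sketch below is an analogy built on the standard-library sqlite3 module, whose cursor.fetchmany plays the role of fetch_many here; it is not the prefect-sqlalchemy API. It also contrasts collecting batches with append against collecting individual rows with extend.

```python
import sqlite3

# In-memory database so the example leaves nothing behind.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (name varchar, address varchar)")
conn.executemany(
    "INSERT INTO customers (name, address) VALUES (:name, :address)",
    [
        {"name": "Marvin", "address": "Highway 42"},
        {"name": "Ford", "address": "Highway 42"},
        {"name": "Unknown", "address": "Highway 42"},
    ],
)

cursor = conn.execute("SELECT name, address FROM customers ORDER BY rowid")

batches = []    # one entry per batch, like all_rows.append(new_rows)
flat_rows = []  # one entry per row, like all_rows.extend(new_rows)
while True:
    # Each call returns the next batch of at most two rows.
    new_rows = cursor.fetchmany(2)
    if len(new_rows) == 0:
        break
    batches.append(new_rows)
    flat_rows.extend(new_rows)

conn.close()

print([len(batch) for batch in batches])  # [2, 1]
print(len(flat_rows))  # 3
```

If downstream code expects one flat list of rows rather than a list of batches, extend is the better fit; append preserves the batch boundaries.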
Resources
Refer to the prefect-sqlalchemy SDK documentation linked in the sidebar to explore all the capabilities of the prefect-sqlalchemy library.
For assistance using SQLAlchemy, consult the SQLAlchemy documentation.