Connect to PostgreSQL with SQLAlchemy and asyncio
Recently, more and more Python applications have been built around asynchronous communication using the asyncio library, which lets a programmer write concurrent code with async/await syntax. This tutorial shows how to connect to a PostgreSQL database in an asynchronous environment using SQLAlchemy and asyncio.
Installing all necessary libraries
To connect to the PostgreSQL database using asynchronous communication, we need several libraries that will enable us to do so.
To get the ball rolling, we need the asyncio PostgreSQL client library for Python that will allow us to connect to the database. Luckily, there is one called asyncpg. To install the package, you should execute the following command:
$ pip install asyncpg
Second, we need to install SQLAlchemy. Installing the plain SQLAlchemy package is not enough here: since we focus on async communication, we need the asyncio extra, which makes sure the dependencies required for asyncio operations are installed as well. To install it, use the following pip command (the quotes keep shells such as zsh from interpreting the square brackets):
$ pip install "sqlalchemy[asyncio]"
Last but not least, we should install Alembic, which we will use to generate the database migration files:
$ pip install alembic
Connecting to the PostgreSQL database
Now that all libraries have been installed, we can finally connect to the PostgreSQL database. To do that, we need a connection string.
Depending on your database credentials, the connection string should look like this:
postgresql+asyncpg://<db_username>:<db_secret>@<db_host>:<db_port>/<db_name>
You should pass the above connection string to the create_async_engine function provided by the asyncio SQLAlchemy extension. For instance, it could look like this:
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "<your_connection_string>",
    echo=True,  # log every emitted SQL statement
    future=True,  # use the SQLAlchemy 2.0-style API
)
You can read more about the create_async_engine parameters in the SQLAlchemy documentation.
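Credentials often contain characters such as @, : or / that would break the connection string if inserted verbatim. The following is a minimal sketch of assembling the URL with percent-encoded credentials, using only the standard library. The helper name build_database_url and the sample credentials are hypothetical and not part of the original example:

```python
from urllib.parse import quote


def build_database_url(username: str, password: str, host: str, port: int, name: str) -> str:
    """Assemble an asyncpg connection string with percent-encoded credentials."""
    # safe="" forces characters such as "@", ":" and "/" to be escaped too.
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"postgresql+asyncpg://{user}:{secret}@{host}:{port}/{name}"


# Hypothetical credentials, for illustration only.
url = build_database_url("app", "p@ss:word/1", "localhost", 5432, "tutorials")
print(url)
```

The resulting string can then be passed to create_async_engine in place of the placeholder.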
Generating database migrations
It is usually advisable to generate database migrations for your data models. For that job, SQLAlchemy recommends using the Alembic tool. To start using Alembic in async mode, you should run the following command in your terminal:
$ alembic init -t async alembic
Running the above command will generate the alembic directory. Inside this directory, we have to modify the env.py file so that it sets the connection string to your database and imports your database models. Let's say we have the following database.py file with one SQLAlchemy ORM model:
from sqlalchemy import (
    Column,
    Integer,
    String,
)
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Tutorial(Base):
    __tablename__ = "tutorials"

    tutorial_id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
With such a configuration, we need to add the following code to the env.py file inside the alembic directory:
from . import database

# Set target metadata so Alembic knows where the models are
target_metadata = database.Base.metadata

# Set URL to the database
config.set_main_option("sqlalchemy.url", "<your_connection_string>")
You can look at the whole file here.
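One detail worth knowing about config.set_main_option: Alembic passes the value to a ConfigParser, which treats % as an interpolation marker, so a URL containing percent-encoded characters needs its percent signs doubled. Here is a small sketch that uses only the standard library's configparser to show the effect. The helper name escape_for_config, the section name and the sample URL are hypothetical:

```python
import configparser


def escape_for_config(url: str) -> str:
    """Double every percent sign so ConfigParser does not treat it as interpolation."""
    return url.replace("%", "%%")


# Hypothetical URL with a percent-encoded "@" in the password.
url = "postgresql+asyncpg://app:p%40ss@localhost:5432/tutorials"

parser = configparser.ConfigParser()
parser.add_section("alembic")
parser.set("alembic", "sqlalchemy.url", escape_for_config(url))

# Reading the option back resolves "%%" to a single "%" again.
restored = parser.get("alembic", "sqlalchemy.url")
print(restored == url)
```

In env.py, the same idea would amount to wrapping the connection string with such a helper before handing it to config.set_main_option.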
Now we can create the migration file that will generate the database table corresponding to the Tutorial model. To do that, use the following commands:
# Generate new migration file
$ alembic revision --autogenerate -m "Add Tutorial model"

# Apply the migration
$ alembic upgrade head
Managing session with contextmanager
To save, read or delete data from the database, we require a session. To create an AsyncSession instance, we first need to add a session generator. To create one, we should use sessionmaker from the SQLAlchemy ORM:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker


def async_session_generator():
    return sessionmaker(
        engine, class_=AsyncSession
    )
Now we can create the AsyncSession instance and use it in our code. One of the best options is to create a context manager that will close the session for us when our code stops using it. An important thing to note is that contextmanager won't work in the async environment. Instead, we should use asynccontextmanager:
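from contextlib import asynccontextmanager


@asynccontextmanager
async def get_session():
    # Create the session before entering try, so that "session" is always
    # defined when the except and finally clauses run.
    async_session = async_session_generator()
    async with async_session() as session:
        try:
            yield session
        except Exception:
            # Undo any pending changes, then let the caller see the error
            await session.rollback()
            raise
        finally:
            await session.close()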
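To see what this pattern does without a running database, here is a minimal, self-contained sketch. FakeSession and get_fake_session are hypothetical stand-ins introduced only for illustration (they are not SQLAlchemy APIs); the stand-in simply records which methods were called:

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records the calls made on it."""

    def __init__(self):
        self.calls = []

    async def rollback(self):
        self.calls.append("rollback")

    async def close(self):
        self.calls.append("close")


@asynccontextmanager
async def get_fake_session(session):
    try:
        yield session
    except Exception:
        # The body of the "async with" block raised: undo pending work.
        await session.rollback()
        raise
    finally:
        # Runs on success and on failure alike.
        await session.close()


async def main():
    ok, failed = FakeSession(), FakeSession()

    async with get_fake_session(ok):
        pass  # no error: only close() is expected

    try:
        async with get_fake_session(failed):
            raise ValueError("simulated failure")
    except ValueError:
        pass  # the context manager re-raised after rolling back

    return ok.calls, failed.calls


ok_calls, failed_calls = asyncio.run(main())
print(ok_calls)      # ['close']
print(failed_calls)  # ['rollback', 'close']
```

The session is closed in both cases, and the rollback only happens when the code inside the async with block raises.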
Using the asynccontextmanager
To use the asynccontextmanager, we rely on Python's async with statement:
from .database import get_session
from .dto import Tutorial


async def save_tutorial(tutorial: Tutorial) -> Tutorial:
    async with get_session() as session:
        ...
        await session.commit()
        ...
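Because every call to a function like save_tutorial opens its own session, several calls can run concurrently without sharing one; SQLAlchemy's documentation advises using a separate AsyncSession per task. The following sketch illustrates that with asyncio.gather. FakeSession, the simplified get_session and the opened list are hypothetical stand-ins so the example runs without a database:

```python
import asyncio
from contextlib import asynccontextmanager

opened = []  # every stand-in session created during the run


class FakeSession:
    """Hypothetical stand-in for AsyncSession; it only stores what was added."""

    def __init__(self):
        self.saved = []

    def add(self, item):
        self.saved.append(item)

    async def commit(self):
        await asyncio.sleep(0)  # yield control, as real database I/O would


@asynccontextmanager
async def get_session():
    session = FakeSession()
    opened.append(session)
    yield session


async def save_tutorial(name):
    # Each call opens its own session instead of sharing one across tasks.
    async with get_session() as session:
        session.add(name)
        await session.commit()
    return name


async def main():
    return await asyncio.gather(
        save_tutorial("asyncio basics"),
        save_tutorial("alembic migrations"),
    )


results = asyncio.run(main())
print(results)
print(len(opened))
```

Two sessions are created here, one per concurrent call, and each of them holds only the item saved by its own task.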
Conclusion
With the increasing popularity of frameworks like Starlette and FastAPI, the demand for async/await support has grown considerably. Database integration is a fundamental operation that should cause no difficulties even in the async/await world, a world that is slowly replacing synchronous Python applications.
You can find the entire example here on GitHub.