A growing number of Python applications are built around asynchronous communication using the asyncio library, which lets a programmer write concurrent code with async/await syntax. This tutorial shows how to connect to a PostgreSQL database in an asynchronous environment using SQLAlchemy and asyncio.
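As a quick reminder of what concurrent code with async/await looks like, here is a minimal sketch that uses only the standard library. The fetch_row coroutine is a hypothetical stand-in for a database round trip and is not part of the tutorial's code.

```python
import asyncio


async def fetch_row(row_id: int) -> dict:
    # Stand-in for a database call; asyncio.sleep hands control back to the event loop
    await asyncio.sleep(0.01)
    return {"id": row_id}


async def main() -> list:
    # gather schedules the three coroutines concurrently and returns results in argument order
    return await asyncio.gather(fetch_row(1), fetch_row(2), fetch_row(3))


rows = asyncio.run(main())
print(rows)
```

While one coroutine is waiting, the event loop is free to run the others, which is the property an async database driver relies on.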
Installing all necessary libraries
To connect to the PostgreSQL database using asynchronous communication, we need several libraries that will enable us to do so.
To get the ball rolling, we need the asyncio PostgreSQL client library for Python that will allow us to connect to the database. Luckily, there is one called asyncpg. To install the package, you should execute the following command:
$ pip install asyncpg
Next, we need SQLAlchemy. The plain SQLAlchemy package is not enough here: since we are focusing on async communication, we need the variant that supports asyncio operations. The asyncio extra installs SQLAlchemy together with the greenlet dependency it relies on. To install it, use the following pip command (the quotes keep shells such as zsh from interpreting the square brackets):
$ pip install "sqlalchemy[asyncio]"
Last but not least, we should install Alembic, which we will use to generate the database migration files:
$ pip install alembic
Connecting to the PostgreSQL database
Now that all the libraries are installed, we can finally connect to the PostgreSQL database. To do that, we need a connection string.
Depending on your database credentials, the connection string should look like this:
postgresql+asyncpg://<db_username>:<db_password>@<db_host>:<db_port>/<db_name>
You should pass the above connection string to the create_async_engine function provided by the SQLAlchemy asyncio extension. For instance, it could look like this:
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    <your_connection_string>,
    echo=True,
    future=True,
)
You can read more about the create_async_engine parameters in the SQLAlchemy documentation.
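If the password contains characters such as @ or /, they have to be percent-encoded before they go into the connection string. Below is a minimal sketch of one way to assemble the string with the standard library. The helper name build_async_url and the sample credentials are assumptions made for illustration. The postgresql+asyncpg:// prefix is the scheme SQLAlchemy uses to select the asyncpg driver.

```python
from urllib.parse import quote


def build_async_url(user: str, password: str, host: str, port: int, database: str) -> str:
    # Hypothetical helper: percent-encode the credentials so that reserved
    # characters such as "@" or "/" cannot be mistaken for URL delimiters
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")
    return f"postgresql+asyncpg://{safe_user}:{safe_password}@{host}:{port}/{database}"


# Made-up credentials, used only to show the encoding
url = build_async_url("app", "p@ss/word", "localhost", 5432, "tutorials")
print(url)  # postgresql+asyncpg://app:p%40ss%2Fword@localhost:5432/tutorials
```

The resulting string can then be passed to create_async_engine in place of <your_connection_string>.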
Generating database migrations
It is usually advisable to generate database migrations for your data models. For that job, SQLAlchemy recommends using the Alembic tool. To start using Alembic in async mode, you should run the following command in your terminal:
$ alembic init -t async alembic
Running the above command will generate the alembic directory. Inside this directory, we have to modify the env.py file so that it uses the connection string to your database and imports your database models. Let’s say we have the following database.py file with one SQLAlchemy ORM model:
from sqlalchemy import (
    Column,
    Integer,
    String,
)
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Tutorial(Base):
    __tablename__ = "tutorials"

    tutorial_id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
With this configuration in place, we need to add the following code to the env.py file inside the alembic directory:
from . import database

# Set target metadata so Alembic knows where the models are
target_metadata = database.Base.metadata

# Set URL to the database
config.set_main_option("sqlalchemy.url", <your_connection_string>)
You can look at the whole file here.
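Rather than hard-coding the connection string in env.py, it can be read from the environment. The sketch below assumes a hypothetical DATABASE_URL variable, and the two helper names are made up as well. It also doubles any % sign, because Alembic keeps these options in a ConfigParser, which treats % as the start of an interpolation. A plain ConfigParser stands in for the Alembic config object so that the snippet runs on its own.

```python
import configparser
import os


def read_database_url(env=os.environ) -> str:
    # DATABASE_URL is an assumed variable name, not something Alembic defines
    url = env.get("DATABASE_URL")
    if not url:
        raise RuntimeError("DATABASE_URL is not set")
    return url


def escape_for_configparser(url: str) -> str:
    # A literal "%" (for example from a percent-encoded password) must be written as "%%"
    return url.replace("%", "%%")


# Stand-in for the Alembic config object; the sample URL is made up
parser = configparser.ConfigParser()
parser.add_section("alembic")
url = read_database_url({"DATABASE_URL": "postgresql+asyncpg://app:p%40ss@localhost:5432/tutorials"})
parser.set("alembic", "sqlalchemy.url", escape_for_configparser(url))

# Reading the option back yields the original, unescaped URL
print(parser.get("alembic", "sqlalchemy.url"))
```

In env.py the equivalent call would be config.set_main_option("sqlalchemy.url", escape_for_configparser(read_database_url())).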
Now we can create the migration file that will generate the database table corresponding to the Tutorial model, and then apply it. To do that, use the following commands:
# Generate new migration file
$ alembic revision --autogenerate -m "Add Tutorial model"
# Apply the migration
$ alembic upgrade head
Managing session with asynccontextmanager
To save, read or delete data from the database, we require a session. To create an AsyncSession instance, we need to add a session generator first. To create one, we should use the sessionmaker function from SQLAlchemy ORM:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker


def async_session_generator():
    return sessionmaker(
        engine, class_=AsyncSession
    )
Now we can create the AsyncSession instance and use it in our code. One of the best options is to create a context manager that will close the session for us when our code stops using it. An important thing to note is that the regular contextmanager decorator won’t work in the async environment, because it only wraps synchronous generator functions. Instead, we should use asynccontextmanager:
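from contextlib import asynccontextmanager


@asynccontextmanager
async def get_session():
    async_session = async_session_generator()
    # "async with" closes the session on exit, so no explicit close() is needed
    async with async_session() as session:
        try:
            yield session
        except Exception:
            # Roll back if the calling code raised, then re-raise the error.
            # The try block sits inside "async with" so that session is always defined here.
            await session.rollback()
            raise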
In order to utilize the asynccontextmanager, we will use Python’s async with statement:
from .database import get_session
from .dto import Tutorial


async def save_tutorial(tutorial: Tutorial) -> Tutorial:
    async with get_session() as session:
        …
        await session.commit()
        …
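To see how such a context manager behaves on success and on failure without a running database, here is a self-contained sketch. FakeSession is a hypothetical stand-in that only records the calls it receives, and get_fake_session is a simplified variant written for this demonstration, not the tutorial's get_session.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records method calls."""

    def __init__(self):
        self.calls = []

    async def commit(self):
        self.calls.append("commit")

    async def rollback(self):
        self.calls.append("rollback")

    async def close(self):
        self.calls.append("close")


@asynccontextmanager
async def get_fake_session(session):
    try:
        yield session
    except Exception:
        # The body of "async with" raised: undo pending work, then re-raise
        await session.rollback()
        raise
    finally:
        # Runs on both success and failure
        await session.close()


async def happy_path():
    session = FakeSession()
    async with get_fake_session(session) as s:
        await s.commit()
    return session.calls


async def failing_path():
    session = FakeSession()
    try:
        async with get_fake_session(session):
            raise ValueError("simulated failure before commit")
    except ValueError:
        pass
    return session.calls


ok_calls = asyncio.run(happy_path())
failed_calls = asyncio.run(failing_path())
print(ok_calls)      # ['commit', 'close']
print(failed_calls)  # ['rollback', 'close']
```

The calling code never has to remember cleanup: the session is closed in both cases, and rolled back first when an error occurs.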
With the increasing popularity of libraries like Starlette or FastAPI, the demand for async/await support has grown considerably. Nowadays, database integration is a fundamental operation that should cause no difficulties even in the async/await world, a world that is slowly replacing synchronous Python applications.
You can find the entire example here on GitHub.