Connect to PostgreSQL with SQLAlchemy and asyncio

This article was first published on Python – Makimo – Consultancy & Software Development Services , and kindly contributed to python-bloggers. (You can report issue about the content on this page here)
Want to share your content on python-bloggers? click here.

Recently, more and more Python applications have been built based on async communication using the asyncio library that allows a programmer to write concurrent code with async/await syntax. This tutorial shows how to connect to the PostgreSQL database within an asynchronous environment using SQLAlchemy and asyncio.

Installing all necessary libraries

To connect to the PostgreSQL database using asynchronous communication, we need several libraries that will enable us to do so.

To get the ball rolling, we need the asyncio PostgreSQL client library for Python that will allow us to connect to the database. Luckily, there is one called asyncpg. To install the package, you should execute the following command:

$ pip install asyncpg

Second of all, we need to install SQLAlchemy. Sadly, we cannot just install the casual SQLAlchemy dependency. With a focus on async communication, we will use one that supports asyncio operations. In order to install the correct version of SQLAlchemy, we will use the following PIP command:

$ pip install sqlalchemy[asyncio]

Last but not least, we should install the Alembic dependency:

$ pip install alembic

to generate the database migration files.

Connecting to the PostgreSQL database

Now that all libraries have been installed, we can eventually connect to the PostgreSQL database. To achieve that, we need the connection string. 

Depending on your database credentials, the connection string should look like this:

postgresql+asyncpg://<db_username>:<db_secret>@<db_host>:<db_port>/<db_name>

You should pass the above connection string to the create_async_engine function provided by the asyncio SQLAlchemy extension. For instance, it could look like this:

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    <your_connection_string>,
    echo=True,
    future=True,
)

You can read more on create_async_engine parameters here in the SQLAlchemy documentation.

Generating database migrations

It is usually advisable to generate database migrations for your data models. For that job, SQLAlchemy recommends using the Alembic tool. To start using Alembic in async mode, you should run the following command in your terminal:

$ alembic init -t async alembic

Running the above command will generate the alembic directory. Inside this directory, we have to modify the env.py file with the connection string to your database and import your database models. Let’s say we have the following database.py file with one SQLAlchemy ORM:

from sqlalchemy import (
    Column,
    Integer,
    String,
)
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Tutorial(Base):
    __tablename__ = "tutorials"

    tutorial_id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)

With such configuration, we need to add the following code to the env.py file inside alembic :

from . import database

# Set target metadata so Alembic knows where the models are
target_metadata = database.Base.metadata

# Set URL to the database
config.set_main_option("sqlalchemy.url",  <your_connection_string>)

You can look at the whole file here.

Now, we can create the migration file that will generate the database table corresponding to the Tutorial model. In order to do that, you should you use the following command:

# Generate new migration file
$ alembic revision --autogenerate -m "Add Tutorial model"

# Apply the migration
$ alembic upgrade head

Managing session with contextmanager

To save, read or delete data from the database, we require a session. To create an AsyncSession instance, we need to add a session generator first. To create one, we should use the sessionmaker function from SQLAlchemy ORM:

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker

def async_session_generator():
    return sessionmaker(
        engine, class_=AsyncSession
    )

Now we can create the AsyncSession instance and use it in our code. One of the best options is to create a contextmanager that will close the session for us when our code stops using it. An important thing to note is that contextmanager won’t work in the async environment. Instead, we should use the asynccontextmanager:

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_session():
    try:
        async_session = async_session_generator()

        async with async_session() as session:
            yield session
    except:
        await session.rollback()
        raise
    finally:
        await session.close()

Using the asynccontextmanager

In order to utilize the asynccontextmanager, we will use Python’s async with statement:

from .database import get_session
from .dto import Tutorial

async save_tutorial(tutorial: Tutorial) -> Tutorial:
    async with get_session() as session:
        …
        await session.commit()
        …

Conclusion

With the increasing popularity of libraries like Starlette or FastAPI, the demand for async/await support has grown a lot. Nowadays, database integration is a fundamental operation that should cause no difficulties even in the async/await world. A world that is slowly replacing synchronous Python applications.

You can find the entire example here on GitHub.

If you need a dedicated development team keeping up with the Python ecosystem…

Let’s talk!

To leave a comment for the author, please follow the link and comment on their blog: Python – Makimo – Consultancy & Software Development Services .

Want to share your content on python-bloggers? click here.