Asynchronous SQLAlchemy and multiple databases

This article was first published on Python – Makimo – Consultancy & Software Development Services, and kindly contributed to python-bloggers.

Before we start

Please note that the text below is a continuation of Kamil Kucharski’s article, which describes how to use SQLAlchemy asynchronously.

I encourage you to read or revisit it before continuing, to make the best use of the information presented below.

Connect to PostgreSQL with SQLAlchemy and asyncio

This tutorial shows how to connect to the PostgreSQL database within an asynchronous environment using SQLAlchemy and asyncio.

Introduction

In today’s rapidly evolving technological landscape, the need for handling multiple databases is more pressing than ever. Whether it’s about enhancing redundancy, optimizing performance, testing the validity of a new solution, or meeting various data storage requirements, using multiple databases can provide significant benefits.

Using Python’s asyncio library and SQLAlchemy, this article explores how to connect to and manage multiple databases efficiently. With this approach, developers can build a scalable and resilient architecture that meets the demands of modern applications, and can refactor legacy systems safely without the risk of degrading production data.

A prime example of this can be seen in Amazon’s Aurora database system, which may have distinct endpoints for read and write operations. Aurora’s architecture allows developers to direct write traffic to the primary instance using the writer endpoint, while read queries can be balanced across available Aurora Replicas through reader endpoints. Utilizing Python’s asyncio and SQLAlchemy, developers can dynamically manage these different endpoints, enhancing performance and adding flexibility.

Another compelling use case is found in a multi-tenant platform, such as a SaaS provider, which needs to manage separate databases for various businesses or tenants. With potentially hundreds or even thousands of distinct databases, efficient connection management becomes crucial. Leveraging asyncio and SQLAlchemy enables the platform to handle connections to various tenant databases concurrently and securely. This ensures data isolation and scalability as the company grows.

These two examples highlight the multifaceted applications and substantial benefits of handling multiple databases in today’s complex and demanding technological environment.

The problem to solve

The task at hand is to add support for multiple databases to existing and well-functioning database-related code.

Duplicating the logic described in the article mentioned in the Before we start section for each database would work, but that is not how a good developer (i.e. you, the reader) would do it.

Solution 1 with routings

Let’s say that all business requirements defining which data must be stored or manipulated in a particular database are already defined. The rules are quite simple; moreover, the data stored in the first database are not related to those from the second database.

Under the above assumptions, the solution where you manually define the conditions for operations to be performed on each database will work perfectly well. To be precise, such database selection behavior is called routing.

To support multiple databases, a connection to each of them is required.

from enum import Enum

from sqlalchemy.ext.asyncio import create_async_engine


class Engines(Enum):
   PRIMARY = create_async_engine(
       url="<your_connection_string>",
       echo=True,
   )
   SECONDARY = create_async_engine(
       url="<your_connection_string>",
       echo=True,
   )

Routing rules must be defined in the get_bind method, since this is the one returning the expected database engine for the session instance.

In this example, all insert, update, and delete operations are performed on the secondary database.

In any other situation, the primary database is used.

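from sqlalchemy import Insert, Update, Delete
from sqlalchemy.orm import Session


class RoutingSession(Session):
   def get_bind(self, mapper=None, clause=None, **kw):
       # ORM flushes (e.g. after session.add()) reach get_bind with a mapper
       # rather than a clause, so self._flushing is checked as well, as in
       # the routing example from the official documentation.
       if self._flushing or isinstance(clause, (Insert, Update, Delete)):
           return Engines.SECONDARY.value.sync_engine

       # Everything else (e.g. SELECT statements) goes to the primary database.
       return Engines.PRIMARY.value.sync_engine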

This is just one example of routing rules. More of them are described in the official documentation.

Last but not least, the asynchronous session instance must be created with the defined routing rules.

from sqlalchemy.ext.asyncio import async_sessionmaker


def async_session_generator():
   return async_sessionmaker(
       sync_session_class=RoutingSession
   )

Now the async_session_generator function can be used in the same way as described in the article from the Before we start section, with the difference that individual queries and operations are routed to the corresponding database.

The entire file can be found here on GitHub.

Solution 2 with explicit engine selection

As it happens in life, the requirements for operations on two databases are not always so clearly defined, or worse, the data in one table depends on the other. In such a situation, the routing rules can be complicated and thus intricate to write and prone to errors.

Under such circumstances, explicitly defining the expected engine during session initialization may be a good approach since there are almost no limits for defining the engine for each use case.

In this solution, the session generator must accept the engine instance as a parameter:

from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker


def async_session_generator(engine: AsyncEngine):
   return async_sessionmaker(
       bind=engine,
   )

and the context manager must accept the Engines member that identifies the engine:

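from contextlib import asynccontextmanager


@asynccontextmanager
async def get_session(engine: Engines = Engines.PRIMARY):
   async_session = async_session_generator(engine=engine.value)

   # "async with" closes the session on exit, so no explicit close() is needed.
   async with async_session() as session:
      try:
         yield session
      except Exception:
         # Undo uncommitted work, then let the caller see the error.
         await session.rollback()
         raise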
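
The rollback-then-close behavior of such a context manager can be checked without a database. The sketch below is only an illustration: FakeSession is a hypothetical stand-in for AsyncSession that records the calls made on it, and this variant of get_session takes a session factory instead of an Engines member. The session is created before the try block, so the except branch never refers to a name that was not bound.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records calls."""

    def __init__(self):
        self.calls = []

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # A real AsyncSession closes itself here.
        self.calls.append("close")
        return False  # do not swallow the exception

    async def rollback(self):
        self.calls.append("rollback")


@asynccontextmanager
async def get_session(session_factory):
    async with session_factory() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise


async def main():
    session = FakeSession()
    try:
        async with get_session(lambda: session):
            raise ValueError("simulated failure inside the unit of work")
    except ValueError:
        pass  # the error still reaches the caller after the rollback
    return session.calls


calls = asyncio.run(main())
```

Here the failure triggers a rollback first, and the session is closed afterwards when the async with block exits.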

Although this solution lets you state explicitly which engine a particular function uses, it also has drawbacks. For example, drawing the line between the databases incorrectly can leave their data inconsistent, so the developer must think carefully about which engine each function should use.

Entire files can be found here on GitHub.

What not to try

The session instance has a bind attribute that holds the engine assigned to that database session. It may be tempting to modify this attribute directly on an active SQLAlchemy session, but doing so is risky. Such a change may lead to inconsistencies and unexpected behavior, since the session’s state is not cleanly disconnected from the original engine and reconnected to the new one.

Examples of problems resulting from such a change are connections not returned to the pool or the transactional state becoming inconsistent, which in turn leads to data integrity issues.
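
A safer alternative to rebinding, sketched below under simplifying assumptions, is to open a separate short-lived session for each engine. The example uses synchronous sessions and in-memory SQLite engines so that it runs without extra drivers; engine_a, engine_b, the notes table, and both helper functions are hypothetical names introduced for illustration.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Two independent in-memory databases (assumption for the sketch).
engine_a = create_engine("sqlite://")
engine_b = create_engine("sqlite://")

for engine in (engine_a, engine_b):
    with engine.begin() as conn:
        conn.execute(text("CREATE TABLE notes (body TEXT)"))


def add_note(engine, body):
    # One session per engine; session.bind is never reassigned.
    with Session(engine) as session:
        session.execute(
            text("INSERT INTO notes (body) VALUES (:body)"),
            {"body": body},
        )
        session.commit()


def read_notes(engine):
    with Session(engine) as session:
        result = session.execute(text("SELECT body FROM notes"))
        return [row.body for row in result]


add_note(engine_a, "stored in A")
add_note(engine_b, "stored in B")

notes_a = read_notes(engine_a)
notes_b = read_notes(engine_b)
```

Each session begins and ends its transaction against a single engine, so connections are returned to the pool they came from.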

Conclusion

Operations on multiple databases are now becoming more and more common, so it’s important for developers to learn how to work with them properly.

It’s crucial to review the requirements and then decide which solution to choose. The above tips should help you make the right decisions and apply targeted code adjustments.

And if you need a development team that can scale up your application to multiple databases…

Drop us a line!
