12. Refactoring 2: Backend - Applying Async Database Management
This refactoring adopts asynchronous database management to avoid blocking caused by synchronous I/O.
Previously, database access was synchronous across all services and endpoints, so each query blocked its caller until the database responded, which limits throughput under concurrent workloads. Because this program needs to handle multiple tasks concurrently, I switched database operations to an async-first approach.
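To make the blocking problem concrete, here is a small stdlib-only simulation. It is not the project's code: `time.sleep` stands in for a synchronous driver call and `asyncio.sleep` for an awaited async driver call, each "query" taking 0.1 s, and the function names are hypothetical.

```python
import asyncio
import time


async def blocking_query() -> None:
    # Simulates a synchronous driver call: time.sleep blocks the whole event loop
    time.sleep(0.1)


async def async_query() -> None:
    # Simulates an async driver call: awaiting hands control back to the loop
    await asyncio.sleep(0.1)


async def run_concurrently(query, n: int = 3) -> float:
    """Run n copies of `query` concurrently and return the elapsed wall time."""
    start = time.perf_counter()
    await asyncio.gather(*(query() for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    # The blocking version runs the three queries back to back (about 0.3 s);
    # the async version overlaps them (about 0.1 s).
    print(f"blocking: {asyncio.run(run_concurrently(blocking_query)):.2f}s")
    print(f"async:    {asyncio.run(run_concurrently(async_query)):.2f}s")
```

Even though both coroutines are started "concurrently" with `asyncio.gather`, only the awaited version lets the waits overlap.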
1. Config
- To enable asynchronous PostgreSQL support, install the asyncpg driver:
  uv pip install asyncpg
- In config.py, I added the following property to build the async SQLAlchemy URL for PostgreSQL:

class Settings(BaseSettings):
    ...

    @property
    def async_db_url(self) -> str:
        return (
            f"postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}"
            f"@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
        )

    ...
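One caveat with interpolating credentials straight into the URL: a password containing characters such as `@`, `:` or `/` corrupts it. A minimal sketch, assuming the same `POSTGRES_*` field names and using a plain dataclass instead of `BaseSettings` so it runs on its own, escapes the user name and password with `urllib.parse.quote_plus`, the approach SQLAlchemy's documentation recommends for special characters in passwords.

```python
from dataclasses import dataclass
from urllib.parse import quote_plus


@dataclass
class Settings:
    # Stand-in for the pydantic BaseSettings class; field names mirror config.py
    POSTGRES_USER: str
    POSTGRES_PASSWORD: str
    POSTGRES_HOST: str
    POSTGRES_PORT: int
    POSTGRES_DB: str

    @property
    def async_db_url(self) -> str:
        # Percent-encode credentials so '@', ':' and '/' cannot break the URL
        user = quote_plus(self.POSTGRES_USER)
        password = quote_plus(self.POSTGRES_PASSWORD)
        return (
            f"postgresql+asyncpg://{user}:{password}"
            f"@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
        )


if __name__ == "__main__":
    s = Settings("app", "p@ss/w:rd", "localhost", 5432, "appdb")
    print(s.async_db_url)
```

SQLAlchemy decodes the percent-encoded password when it parses the URL, so the driver still receives the original value.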
2. Async Database
- This section initializes the async engine, configures a session factory, and exposes a single declarative base for ORM models.
- I modeled this class on the existing synchronous Database class.
import os

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import declarative_base

from .config import settings

DEFAULT_SCHEMA = os.getenv("DB_SCHEMA", "public")

# One MetaData bound to the default schema, shared by every ORM model
metadata = MetaData(schema=DEFAULT_SCHEMA)
Base = declarative_base(metadata=metadata)


class AsyncDatabase:
    """Async database connection and session management."""

    # Created on first instantiation and shared by every AsyncDatabase() instance,
    # so repeated AsyncDatabase() calls do not open new connection pools
    _engine = None
    _SessionLocal = None

    def __init__(self):
        if AsyncDatabase._engine is None:
            AsyncDatabase._engine = create_async_engine(
                settings.async_db_url,
                echo=False,
                pool_pre_ping=True,
                connect_args={"server_settings": {"search_path": DEFAULT_SCHEMA}},
            )
            AsyncDatabase._SessionLocal = async_sessionmaker(
                bind=AsyncDatabase._engine,
                expire_on_commit=False,
                autoflush=False,
                autocommit=False,
            )

    def get_engine(self) -> AsyncEngine:
        """Returns the async SQLAlchemy engine."""
        return self._engine

    def get_session(self) -> AsyncSession:
        """Returns a new async session."""
        return self._SessionLocal()

    @classmethod
    def get_base(cls):
        """Returns the module-level declarative base (the one bound to `metadata`)."""
        return Base
- DEFAULT_SCHEMA / MetaData(schema=DEFAULT_SCHEMA): Defines the default PostgreSQL schema (e.g., public). Attaching this metadata to the declarative base makes DDL and autogenerate target the right schema without qualifying every table name.
- Base = declarative_base(metadata=metadata): Exposes a single declarative base for all ORM models. Import this Base everywhere you define models so they share the same metadata and schema.
- create_async_engine(settings.async_db_url, ...): Builds an async SQLAlchemy engine (driver: asyncpg) for event-loop-friendly I/O.
  - echo=False: Keeps SQL logging off in production (enable it when debugging).
  - pool_pre_ping=True: Tests each connection on checkout to avoid stale-connection errors after network hiccups or database restarts.
  - connect_args={"server_settings": {"search_path": ...}}: Sets search_path on each new connection so queries run against your schema without schema-qualifying identifiers.
- async_sessionmaker(bind=_engine, expire_on_commit=False, autoflush=False, autocommit=False): Creates a factory for AsyncSession.
  - expire_on_commit=False: Objects remain usable after COMMIT (no implicit reload). This matters in async code, where reloading an expired attribute would require implicit I/O, which the async session does not allow.
  - autoflush=False: Prevents surprise flushes; pending changes are sent only when you call await session.flush() or commit.
  - autocommit=False: Nothing is committed implicitly; changes persist only through await session.commit() or an async with session.begin(): ... block.
- get_engine(): Returns the process-wide async engine. It’s safe to share the engine across coroutines.
- get_session() -> AsyncSession: Returns a new async session.
3. Async Database Initializer
- The initialize_async_database() coroutine creates the schema if it does not exist, then creates every table registered on the declarative base.
import os

from loguru import logger
from sqlalchemy import text

from core import AsyncDatabase

DEFAULT_SCHEMA = os.getenv("DB_SCHEMA", "public")


async def initialize_async_database():
    logger.info("Starting async database initialization process...")
    db = AsyncDatabase()
    Base = db.get_base()
    engine = db.get_engine()

    logger.info("Creating async tables...")
    # engine.begin() opens a connection and commits when the block exits without error
    async with engine.begin() as conn:
        await conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{DEFAULT_SCHEMA}"'))
        # create_all is a sync API, so it runs through run_sync on the async connection
        await conn.run_sync(Base.metadata.create_all)
    logger.info("Async DB initialized.")
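The schema name comes from an environment variable and is placed inside hand-written double quotes, so a value that itself contains a double quote would break the statement. A small stdlib-only sketch of a hypothetical `quote_ident` helper applies PostgreSQL's rule for quoted identifiers (wrap the name in double quotes and write any embedded double quote twice) and rejects empty names or names containing a NUL character, which PostgreSQL does not allow in identifiers.

```python
def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier (hypothetical helper, not part of the project)."""
    if not name or "\x00" in name:
        raise ValueError(f"invalid identifier: {name!r}")
    # Inside a quoted identifier, a literal double quote is written as two double quotes
    return '"' + name.replace('"', '""') + '"'


def create_schema_sql(schema: str) -> str:
    # Builds the same statement as the initializer, with the name safely quoted
    return f"CREATE SCHEMA IF NOT EXISTS {quote_ident(schema)}"


if __name__ == "__main__":
    print(create_schema_sql("public"))
    print(create_schema_sql('we"ird'))
```

Inside the initializer, this would read `await conn.execute(text(create_schema_sql(DEFAULT_SCHEMA)))`.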
- To run this at startup, I added it to the FastAPI lifespan context manager, alongside the existing synchronous initializer:
...

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Starting up the application...")
    try:
        await initialize_async_database()
        logger.info("Async Database initialized successfully.")

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, initialize_database)
        logger.info("Sync Database initialized successfully.")
    except Exception as e:
        logger.exception(f"Database initialization failed: {e}")
        raise
    ...
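The synchronous initializer is pushed to a worker thread so it does not stall the event loop during startup. A stdlib-only sketch, with hypothetical `slow_sync_init` and `heartbeat` functions standing in for the real initializer and for other work on the loop, shows the loop keeps running while the blocking call executes. `asyncio.to_thread` (Python 3.9+) is a shorter equivalent of `loop.run_in_executor(None, ...)`.

```python
import asyncio
import time


def slow_sync_init() -> str:
    # Hypothetical stand-in for a blocking initializer such as initialize_database
    time.sleep(0.2)
    return "sync db ready"


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    # Records a timestamp every 20 ms for as long as the event loop is free to run it
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)


async def startup() -> tuple[str, int]:
    ticks: list[float] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    loop = asyncio.get_running_loop()
    # Same pattern as the lifespan: run the blocking call in the default thread pool
    result = await loop.run_in_executor(None, slow_sync_init)
    stop.set()
    await beat
    return result, len(ticks)


if __name__ == "__main__":
    result, n_ticks = asyncio.run(startup())
    print(result, n_ticks)
```

Calling `slow_sync_init()` directly inside `startup()` would let the heartbeat record only a tick or so, because the loop would be frozen for the full 0.2 s.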
- To make the async session easy to use in endpoints, I added a FastAPI dependency that opens a session per request and closes it afterward:
from typing import AsyncGenerator, Generator

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session

from core import AsyncDatabase, Database

...

# Created once at import time so every request reuses the same engine and pool
_async_db = AsyncDatabase()


async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
    db = _async_db.get_session()
    try:
        yield db
    finally:
        await db.close()
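FastAPI runs the code before `yield` to provide the dependency and the `finally` block once the request is finished. The sketch below drives an equivalent async generator by hand, with a hypothetical `FakeSession` in place of `AsyncSession`, to show that `close()` runs both when the handler succeeds and when it raises. It approximates what the framework does and is not FastAPI's internal implementation.

```python
import asyncio
from typing import AsyncGenerator


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def get_fake_db() -> AsyncGenerator[FakeSession, None]:
    # Same shape as get_async_db: open, yield, always close
    db = FakeSession()
    try:
        yield db
    finally:
        await db.close()


async def simulate_request(fail: bool) -> FakeSession:
    gen = get_fake_db()
    db = await gen.__anext__()  # setup: run the generator up to its yield
    if fail:
        try:
            # The handler raised: re-raise the error inside the generator at the yield
            await gen.athrow(RuntimeError("handler failed"))
        except RuntimeError:
            pass  # the error propagates back out after the finally block has run
    else:
        await gen.aclose()  # normal teardown: the finally block runs
    return db


if __name__ == "__main__":
    print(asyncio.run(simulate_request(fail=False)).closed)
    print(asyncio.run(simulate_request(fail=True)).closed)
```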
4. Changes in Services and APIs
4.1 Services
- Before

from sqlalchemy.orm import Session
...

class AuthService:
    def __init__(self, db: Session, request: Request):
        ...

- After

from sqlalchemy.ext.asyncio import AsyncSession
...

class AuthService:
    def __init__(self, db: AsyncSession, request: Request):
        ...
4.2 APIs
- Before

...
@auth_router.post("/login", response_model=LoginResponse, summary="User Login")
def login(request: Request, body: LoginRequest, db: Session = Depends(get_db)):
    ...

- After

@auth_router.post("/login", response_model=LoginResponse, summary="User Login")
async def login(request: Request, body: LoginRequest, db: AsyncSession = Depends(get_async_db)):
    ...