from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.orm import declarative_base from ..core.config import settings from sqlalchemy import text import contextlib # Create async engine async_engine = create_async_engine( settings.DATABASE_URL, echo=True, future=True, pool_pre_ping=True ) # Create async session factory async_session_maker = async_sessionmaker( bind=async_engine, class_=AsyncSession, expire_on_commit=False ) # Declarative base for models Base = declarative_base() # FastAPI dependency for DB access async def get_db(): async with async_session_maker() as session: try: yield session finally: await session.close() # Verify DB connection async def verify_connection(): async with async_session_maker() as session: try: await session.execute(text("SELECT 1")) except Exception as e: raise ConnectionError(f"Database connection verification failed: {str(e)}") # Init DB (called during app startup) async def init_db(): await verify_connection() # Optional: Custom DB access class class Database: def __init__(self): self._session_factory = async_session_maker @contextlib.asynccontextmanager async def session(self): session = self._session_factory() try: yield session await session.commit() except: await session.rollback() raise finally: await session.close() async def get_session(self): return self._session_factory() # Singleton DB instance db = Database()