"""Async SQLAlchemy engine, session factory, and session helpers."""

import asyncio
import contextlib
import os
from typing import AsyncIterator, Any, Dict, List, Optional, Tuple, Type, Union

from dotenv import load_dotenv
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import sessionmaker

from models import Base

# Load variables from a .env file into the process environment before reading
# DATABASE_URL below.
load_dotenv()

# Database connection URL, taken from the DATABASE_URL environment variable.
# NOTE(review): the original comments describe this as a MySQL server; the
# actual backend depends on the URL — confirm against deployment config.
# os.getenv returns None when the variable is unset, in which case the
# create_async_engine call below will fail at import time.
DATABASE_URL = os.getenv('DATABASE_URL')

# Async engine shared by the whole module; echo=True turns on SQL logging.
engine = create_async_engine(DATABASE_URL, echo=True, future=True)

# Session factory bound to the engine. expire_on_commit=False keeps loaded
# attributes usable on instances after a commit.
async_session = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
)


async def create_async_db():
    """Create every table registered on ``Base.metadata``.

    Opens a transaction on the module-level engine and runs the synchronous
    ``create_all`` through ``run_sync``.
    """
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


class DatabaseSessionManager:
    """Hold a session factory and its engine, and hand out managed sessions."""

    def __init__(self, sessionmaker, engine):
        """Store the session factory and engine.

        Args:
            sessionmaker: Callable that returns a new session when invoked
                with no arguments.
            engine: Engine disposed by :meth:`close`; may be falsy to skip
                disposal.
        """
        self.sessionmaker = sessionmaker
        self.engine = engine

    async def close(self):
        """Dispose of the engine, if one was provided."""
        if self.engine:
            await self.engine.dispose()

    @contextlib.asynccontextmanager
    async def create_session(self) -> AsyncIterator[AsyncSession]:
        """Yield a new session, rolling back on error and always closing it.

        Any exception raised inside the ``async with`` body triggers a
        rollback and is then re-raised. The session is closed on every path.
        This manager never commits; committing is left to the caller.
        """
        session = self.sessionmaker()
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()


session_manager = DatabaseSessionManager(async_session, engine)


async def get_async_db() -> AsyncIterator[AsyncSession]:
    """Async generator that yields one managed session.

    Rollback-on-error and closing are delegated entirely to
    ``session_manager.create_session()``. No extra ``close()`` is issued
    here: closing in this function would run before the manager's rollback
    and would close the session a second time.
    """
    async with session_manager.create_session() as session:
        yield session