mysql / db.py
ariansyahdedy's picture
Single docker
aa26805
import asyncio, contextlib, os
from dotenv import load_dotenv
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine
)
from sqlalchemy.orm import sessionmaker
from models import Base
from typing import AsyncIterator, Any, Dict, List, Optional, Tuple, Type, Union
load_dotenv()
# Define the MySQL connection details
DATABASE_URL = os.getenv('DATABASE_URL')
# Create an engine to connect to the MySQL server
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
async_session = async_sessionmaker(
bind=engine,
expire_on_commit=False
)
async def create_async_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
class DatabaseSessionManager:
def __init__(self, sessionmaker, engine):
self.sessionmaker = sessionmaker
self.engine = engine
async def close(self):
if self.engine:
await self.engine.dispose()
@contextlib.asynccontextmanager
async def create_session(self) -> AsyncIterator[AsyncSession]:
session = self.sessionmaker()
try:
yield session
except Exception:
await session.rollback()
raise
finally:
await session.close()
session_manager = DatabaseSessionManager(async_session, engine)
async def get_async_db():
async with session_manager.create_session() as session:
try:
yield session
finally:
await session.close()