"""
Database connection pooling and optimization module.

Exposes a pooled async SQLAlchemy engine, an async session context manager,
a shared Redis client, and a ``DatabaseManager`` with health/pool reporting.
"""

import asyncio
import inspect
import logging
import os
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import redis
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
# QueuePool is no longer used here but stays imported for any external importers.
from sqlalchemy.pool import AsyncAdaptedQueuePool, QueuePool  # noqa: F401

logger = logging.getLogger(__name__)

# Database configuration
# NOTE(review): the fallback URL embeds a default password; make sure
# DATABASE_URL is always set explicitly outside local development.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql+asyncpg://postgres:password@database:5432/zenith",
)
REDIS_URL = os.getenv("REDIS_URL", "redis://cache-service:6379")

# Connection pool configuration
POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "20"))
MAX_OVERFLOW = int(os.getenv("DB_MAX_OVERFLOW", "30"))
POOL_TIMEOUT = int(os.getenv("DB_POOL_TIMEOUT", "30"))
POOL_RECYCLE = int(os.getenv("DB_POOL_RECYCLE", "3600"))

# Create optimized async engine with connection pooling.
# Async engines cannot use the blocking QueuePool; AsyncAdaptedQueuePool is the
# asyncio-compatible queue pool and accepts the same sizing options.
engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_timeout=POOL_TIMEOUT,
    pool_recycle=POOL_RECYCLE,
    echo=os.getenv("DB_ECHO", "false").lower() == "true",
    pool_pre_ping=True,
    # NOTE(review): engine-level server_side_cursors appears to be deprecated
    # since SQLAlchemy 1.4 in favour of the per-statement ``stream_results``
    # execution option -- confirm it is still wanted for every query.
    server_side_cursors=True,
)

# Create session factory
AsyncSessionLocal = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
    autocommit=False,
)


@asynccontextmanager
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    """Async context manager for database sessions.

    Yields:
        An ``AsyncSession`` created by ``AsyncSessionLocal``.

    Raises:
        Exception: any exception raised inside the ``async with`` body is
            logged, the session is rolled back, and the exception re-raised.
    """
    # ``async with`` closes the session on exit, so no explicit close is needed.
    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception as e:
            logger.error("Database session error: %s", e)
            await session.rollback()
            raise


# Redis connection for caching
redis_client = redis.from_url(REDIS_URL, decode_responses=True)


async def get_redis():
    """Get Redis connection (the module-level shared client)."""
    return redis_client


class DatabaseManager:
    """Enhanced database manager with connection pooling."""

    def __init__(self):
        self.engine = engine
        self.redis = redis_client

    async def _ping_redis(self) -> None:
        """Ping Redis without blocking the event loop.

        The module-level client is the synchronous ``redis`` client, whose
        ``ping()`` result is not awaitable, so the call runs in a worker
        thread. If ``self.redis`` has been swapped for an asyncio client, the
        call returns an awaitable instead, which is then awaited.
        """
        pong = await asyncio.to_thread(self.redis.ping)
        if inspect.isawaitable(pong):
            await pong

    async def health_check(self) -> dict:
        """Check database and cache health.

        Returns:
            A dict with keys ``database`` and ``redis`` (``"healthy"`` or
            ``"unhealthy: <error>"``), ``pool_size`` (configured size) and
            ``active_connections`` (connections currently checked out, or
            ``"unknown"`` if the pool does not expose that).
        """
        try:
            # Database health; 2.0-style sessions require text() for raw SQL.
            async with get_db_session() as session:
                await session.execute(text("SELECT 1"))
            db_status = "healthy"
        except Exception as e:
            db_status = f"unhealthy: {e}"

        # Redis health
        try:
            await self._ping_redis()
            redis_status = "healthy"
        except Exception as e:
            redis_status = f"unhealthy: {e}"

        pool = self.engine.pool
        return {
            "database": db_status,
            "redis": redis_status,
            "pool_size": POOL_SIZE,
            # checkedout() is the in-use count; size() is only the configured size.
            "active_connections": (
                pool.checkedout() if hasattr(pool, "checkedout") else "unknown"
            ),
        }

    def get_pool_stats(self) -> dict:
        """Get connection pool statistics.

        Returns:
            A dict with ``size``, ``checked_in``, ``checked_out``, ``overflow``
            and ``invalid`` when the pool exposes ``size``; otherwise
            ``{"status": "pool stats not available"}``. ``invalid`` is ``None``
            when the pool has no ``invalid()`` method.
        """
        pool = self.engine.pool
        if not hasattr(pool, "size"):
            return {"status": "pool stats not available"}

        # Queue pools do not define invalid(); guard instead of raising.
        invalid = getattr(pool, "invalid", None)
        return {
            "size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkedout(),
            "overflow": pool.overflow(),
            "invalid": invalid() if callable(invalid) else None,
        }


# Global database manager instance
db_manager = DatabaseManager()