Spaces:
Paused
Paused
| """ | |
| Database connection pooling and optimization module | |
| """ | |
| import logging | |
| import os | |
| from contextlib import asynccontextmanager | |
| from collections.abc import AsyncGenerator | |
| import redis | |
| from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine | |
| from sqlalchemy.orm import sessionmaker | |
| from sqlalchemy.pool import QueuePool | |
logger = logging.getLogger(__name__)


def _env_int(name: str, default: str) -> int:
    """Return environment variable *name* parsed as an integer.

    Args:
        name: Environment variable to read.
        default: Numeric string used when the variable is unset.

    Raises:
        ValueError: If the resulting string is not a valid integer.
    """
    return int(os.environ.get(name, default))


# Connection targets; each can be overridden through the environment.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://postgres:password@database:5432/zenith",
)
REDIS_URL = os.environ.get("REDIS_URL", "redis://cache-service:6379")

# Connection pool settings, read once when the module is imported.
POOL_SIZE = _env_int("DB_POOL_SIZE", "20")
MAX_OVERFLOW = _env_int("DB_MAX_OVERFLOW", "30")
POOL_TIMEOUT = _env_int("DB_POOL_TIMEOUT", "30")
POOL_RECYCLE = _env_int("DB_POOL_RECYCLE", "3600")
# Create the shared async engine with connection pooling.
#
# No ``poolclass`` is passed on purpose: the plain ``QueuePool`` is not
# asyncio-compatible and ``create_async_engine`` rejects it. The default pool
# for async engines (``AsyncAdaptedQueuePool``) accepts the same sizing
# options used below.
engine = create_async_engine(
    DATABASE_URL,
    pool_size=POOL_SIZE,  # connections kept open in the pool
    max_overflow=MAX_OVERFLOW,  # extra connections allowed beyond pool_size
    pool_timeout=POOL_TIMEOUT,  # seconds to wait for a free connection
    pool_recycle=POOL_RECYCLE,  # seconds after which a connection is replaced
    pool_pre_ping=True,  # test connections on checkout
    # SQL statement logging is enabled only when DB_ECHO is "true"
    # (case-insensitive).
    echo=os.getenv("DB_ECHO", "false").lower() == "true",
    # NOTE(review): the engine-level ``server_side_cursors`` flag is
    # deprecated in SQLAlchemy 1.4+ in favour of the ``stream_results``
    # execution option — confirm it is still wanted here.
    server_side_cursors=True,
)
# Factory for AsyncSession objects bound to the shared engine. Objects are
# not expired on commit, and neither autoflush nor autocommit is enabled.
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,
)
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session and clean it up afterwards.

    This is a plain async generator: it is not wrapped with
    ``asynccontextmanager``, so it cannot be used directly in ``async with``
    without being wrapped by the caller.

    Yields:
        A new ``AsyncSession`` created by ``AsyncSessionLocal``.

    Raises:
        Exception: Any error raised while the caller holds the session is
            logged, the session is rolled back, and the error is re-raised.
    """
    db_session = AsyncSessionLocal()
    async with db_session:
        try:
            yield db_session
        except Exception as exc:
            logger.error(f"Database session error: {exc}")
            await db_session.rollback()
            raise
        finally:
            # Explicit close, in addition to the one done on leaving
            # the ``async with`` block.
            await db_session.close()
# Module-level Redis client used for caching, built from REDIS_URL.
# ``decode_responses=True`` makes replies come back as ``str`` rather than
# ``bytes``.
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
async def get_redis():
    """Return the module-level Redis client.

    The same shared ``redis_client`` object is returned on every call; no
    new client is created here.
    """
    client = redis_client
    return client
class DatabaseManager:
    """Health checks and pool statistics for the shared engine and Redis client."""

    def __init__(self):
        """Bind the manager to the module-level ``engine`` and ``redis_client``."""
        self.engine = engine
        self.redis = redis_client

    async def health_check(self) -> dict:
        """Probe the database and Redis and report their status.

        Failures are never raised; they are reported in the returned mapping.

        Returns:
            A dict with keys ``"database"`` and ``"redis"`` (each either
            ``"healthy"`` or ``"unhealthy: <error>"``), ``"pool_size"`` (the
            configured pool size) and ``"active_connections"`` (connections
            currently checked out of the pool, or ``"unknown"`` when the pool
            does not expose that figure).
        """
        import inspect

        from sqlalchemy import text

        # Database health. The session factory is used directly because
        # ``get_db_session`` is an async generator and is not guaranteed to
        # support ``async with``.
        try:
            async with AsyncSessionLocal() as session:
                # Textual SQL must be wrapped in ``text()``; SQLAlchemy 2.0
                # rejects raw strings.
                await session.execute(text("SELECT 1"))
            db_status = "healthy"
        except Exception as e:
            db_status = f"unhealthy: {e}"

        # Redis health. The client may be synchronous (``ping()`` returns a
        # plain value) or asynchronous (``ping()`` returns an awaitable), so
        # only await when there is something to await.
        try:
            pong = self.redis.ping()
            if inspect.isawaitable(pong):
                await pong
            redis_status = "healthy"
        except Exception as e:
            redis_status = f"unhealthy: {e}"

        pool = self.engine.pool
        return {
            "database": db_status,
            "redis": redis_status,
            "pool_size": POOL_SIZE,
            # ``checkedout()`` counts connections in use; ``size()`` is only
            # the configured pool size and is already reported above.
            "active_connections": pool.checkedout() if hasattr(pool, "checkedout") else "unknown",
        }

    def get_pool_stats(self) -> dict:
        """Return connection pool statistics.

        Returns:
            A dict with ``"size"``, ``"checked_in"``, ``"checked_out"``,
            ``"overflow"`` and ``"invalid"`` when the pool exposes ``size()``;
            otherwise ``{"status": "pool stats not available"}``.
            ``"invalid"`` is ``None`` when the pool has no ``invalid()``
            method.
        """
        pool = self.engine.pool
        if not hasattr(pool, "size"):
            return {"status": "pool stats not available"}

        # Queue-based pools do not define ``invalid()``; calling it
        # unconditionally raised AttributeError, so it is looked up defensively.
        invalid = getattr(pool, "invalid", None)
        return {
            "size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkedout(),
            "overflow": pool.overflow(),
            "invalid": invalid() if callable(invalid) else None,
        }
# Global DatabaseManager instance, created once when this module is imported.
db_manager = DatabaseManager()