zenith-backend / app /core /database.py
teoat's picture
fix(backend): fix port and health check robustness
d29a5a0 verified
"""
Database connection pooling and optimization module
"""
import logging
import os
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator
import redis
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
logger = logging.getLogger(__name__)
# Database configuration
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:password@database:5432/zenith")
REDIS_URL = os.getenv("REDIS_URL", "redis://cache-service:6379")
# Connection pool configuration
POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "20"))
MAX_OVERFLOW = int(os.getenv("DB_MAX_OVERFLOW", "30"))
POOL_TIMEOUT = int(os.getenv("DB_POOL_TIMEOUT", "30"))
POOL_RECYCLE = int(os.getenv("DB_POOL_RECYCLE", "3600"))
# Create optimized async engine with connection pooling
engine = create_async_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=POOL_SIZE,
max_overflow=MAX_OVERFLOW,
pool_timeout=POOL_TIMEOUT,
pool_recycle=POOL_RECYCLE,
echo=os.getenv("DB_ECHO", "false").lower() == "true",
pool_pre_ping=True,
server_side_cursors=True,
)
# Create session factory
AsyncSessionLocal = sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
autocommit=False,
)
@asynccontextmanager
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
"""Async context manager for database sessions"""
async with AsyncSessionLocal() as session:
try:
yield session
except Exception as e:
logger.error(f"Database session error: {e}")
await session.rollback()
raise
finally:
await session.close()
# Redis connection for caching
redis_client = redis.from_url(REDIS_URL, decode_responses=True)
async def get_redis():
"""Get Redis connection"""
return redis_client
class DatabaseManager:
"""Enhanced database manager with connection pooling"""
def __init__(self):
self.engine = engine
self.redis = redis_client
async def health_check(self) -> dict:
"""Check database and cache health"""
try:
# Database health
async with get_db_session() as session:
await session.execute("SELECT 1")
db_status = "healthy"
except Exception as e:
db_status = f"unhealthy: {e}"
# Redis health
try:
await self.redis.ping()
redis_status = "healthy"
except Exception as e:
redis_status = f"unhealthy: {e}"
return {
"database": db_status,
"redis": redis_status,
"pool_size": POOL_SIZE,
"active_connections": self.engine.pool.size() if hasattr(self.engine.pool, "size") else "unknown",
}
def get_pool_stats(self) -> dict:
"""Get connection pool statistics"""
if hasattr(self.engine.pool, "size"):
return {
"size": self.engine.pool.size(),
"checked_in": self.engine.pool.checkedin(),
"checked_out": self.engine.pool.checkedout(),
"overflow": self.engine.pool.overflow(),
"invalid": self.engine.pool.invalid(),
}
return {"status": "pool stats not available"}
# Global database manager instance
db_manager = DatabaseManager()