""" PostgreSQL connection pool management. Provides async connection pool for PostgreSQL operations. """ import asyncpg import ssl from typing import Optional, Dict, Any from insightfy_utils.logging import get_logger from app.core.config import settings from collections import defaultdict import time logger = get_logger(__name__) class PostgreSQLConnectionPool: """Singleton class to manage PostgreSQL connection pool""" _pool: Optional[asyncpg.Pool] = None # Connection pool metrics _metrics = { "connections_acquired": 0, "connections_released": 0, "connections_failed": 0, "health_check_failures": 0, "acquisition_times": [], # Track acquisition times for performance monitoring } @classmethod async def initialize(cls) -> None: """ Initialize PostgreSQL connection pool. Called during application startup. Creates a connection pool with configurable min/max connections. Tests connectivity by acquiring and releasing a connection. Raises: Exception: If connection pool initialization fails """ if cls._pool is not None: logger.warning("PostgreSQL connection pool already initialized") return try: logger.info("Initializing PostgreSQL connection pool", extra={ "host": settings.POSTGRES_HOST, "port": settings.POSTGRES_PORT, "database": settings.POSTGRES_DB, "user": settings.POSTGRES_USER, "min_pool_size": settings.POSTGRES_MIN_POOL_SIZE, "max_pool_size": settings.POSTGRES_MAX_POOL_SIZE }) # Optional SSL context ssl_context = None mode = (settings.POSTGRES_SSL_MODE or "disable").lower() if mode != "disable": if mode == "verify-full": ssl_context = ssl.create_default_context(cafile=settings.POSTGRES_SSL_ROOT_CERT) if settings.POSTGRES_SSL_ROOT_CERT else ssl.create_default_context() if settings.POSTGRES_SSL_CERT and settings.POSTGRES_SSL_KEY: try: ssl_context.load_cert_chain(certfile=settings.POSTGRES_SSL_CERT, keyfile=settings.POSTGRES_SSL_KEY) except Exception as e: logger.warning("Failed to load client SSL cert/key for PostgreSQL pool", exc_info=e) ssl_context.check_hostname = True ssl_context.verify_mode = ssl.CERT_REQUIRED else: ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE logger.info("PostgreSQL pool SSL enabled", extra={"ssl_mode": settings.POSTGRES_SSL_MODE}) # Create connection pool cls._pool = await asyncpg.create_pool( host=settings.POSTGRES_HOST, port=settings.POSTGRES_PORT, database=settings.POSTGRES_DB, user=settings.POSTGRES_USER, password=settings.POSTGRES_PASSWORD, min_size=settings.POSTGRES_MIN_POOL_SIZE, max_size=settings.POSTGRES_MAX_POOL_SIZE, command_timeout=30.0, timeout=30.0, ssl=ssl_context, ) # Test connection by acquiring and releasing async with cls._pool.acquire() as conn: await conn.fetchval("SELECT 1") logger.info("PostgreSQL connection pool initialized successfully", extra={ "database": settings.POSTGRES_DB, "pool_size": f"{settings.POSTGRES_MIN_POOL_SIZE}-{settings.POSTGRES_MAX_POOL_SIZE}" }) except Exception as e: logger.error("Failed to initialize PostgreSQL connection pool", exc_info=e, extra={ "host": settings.POSTGRES_HOST, "port": settings.POSTGRES_PORT, "database": settings.POSTGRES_DB }) raise @classmethod async def get_connection(cls) -> asyncpg.Connection: """ Acquire a connection from the pool. Returns: asyncpg.Connection: Database connection from pool Raises: RuntimeError: If connection pool is not initialized Exception: If connection acquisition fails """ if cls._pool is None: raise RuntimeError("PostgreSQL connection pool not initialized. Call initialize() first.") start_time = time.time() try: conn = await cls._pool.acquire() # Health check: verify connection is alive try: await conn.fetchval("SELECT 1") except Exception as health_check_error: # Connection is dead, release it and try to get a new one logger.warning("Connection health check failed, releasing dead connection", exc_info=health_check_error) cls._metrics["health_check_failures"] += 1 await cls._pool.release(conn) # Try to get a fresh connection conn = await cls._pool.acquire() await conn.fetchval("SELECT 1") # Track successful acquisition acquisition_time = (time.time() - start_time) * 1000 # Convert to ms cls._metrics["connections_acquired"] += 1 cls._metrics["acquisition_times"].append(acquisition_time) # Keep only last 1000 acquisition times to prevent memory growth if len(cls._metrics["acquisition_times"]) > 1000: cls._metrics["acquisition_times"] = cls._metrics["acquisition_times"][-1000:] logger.debug( "PostgreSQL connection acquired", extra={ "acquisition_time_ms": acquisition_time, "pool_size": cls._pool.get_size() if cls._pool else 0, "pool_free": cls._pool.get_idle_size() if cls._pool else 0 } ) return conn except Exception as e: cls._metrics["connections_failed"] += 1 logger.error("Failed to acquire PostgreSQL connection", exc_info=e) raise @classmethod async def release_connection(cls, conn: asyncpg.Connection) -> None: """ Release a connection back to the pool. Args: conn: Connection to release Raises: RuntimeError: If connection pool is not initialized """ if cls._pool is None: raise RuntimeError("PostgreSQL connection pool not initialized") try: await cls._pool.release(conn) cls._metrics["connections_released"] += 1 logger.debug( "PostgreSQL connection released", extra={ "pool_size": cls._pool.get_size() if cls._pool else 0, "pool_free": cls._pool.get_idle_size() if cls._pool else 0 } ) except Exception as e: logger.error("Failed to release PostgreSQL connection", exc_info=e) # Don't raise - we don't want to break the application if release fails @classmethod async def close(cls) -> None: """ Close all connections in the pool. Called during application shutdown. Gracefully closes all connections and cleans up resources. """ if cls._pool is None: logger.warning("PostgreSQL connection pool not initialized, nothing to close") return try: logger.info("Closing PostgreSQL connection pool") await cls._pool.close() cls._pool = None logger.info("PostgreSQL connection pool closed successfully") except Exception as e: logger.error("Error closing PostgreSQL connection pool", exc_info=e) # Set pool to None anyway to allow re-initialization cls._pool = None @classmethod def is_initialized(cls) -> bool: """ Check if connection pool is initialized. Returns: bool: True if pool is initialized, False otherwise """ return cls._pool is not None @classmethod def get_pool_metrics(cls) -> Dict[str, Any]: """ Get connection pool metrics. Returns: Dictionary containing pool metrics """ metrics = { "connections_acquired": cls._metrics["connections_acquired"], "connections_released": cls._metrics["connections_released"], "connections_failed": cls._metrics["connections_failed"], "health_check_failures": cls._metrics["health_check_failures"], } # Calculate average acquisition time acquisition_times = cls._metrics["acquisition_times"] if acquisition_times: metrics["avg_acquisition_time_ms"] = sum(acquisition_times) / len(acquisition_times) metrics["max_acquisition_time_ms"] = max(acquisition_times) metrics["min_acquisition_time_ms"] = min(acquisition_times) else: metrics["avg_acquisition_time_ms"] = 0.0 metrics["max_acquisition_time_ms"] = 0.0 metrics["min_acquisition_time_ms"] = 0.0 # Get pool status if cls._pool: metrics["pool_size"] = cls._pool.get_size() metrics["pool_free"] = cls._pool.get_idle_size() metrics["pool_used"] = cls._pool.get_size() - cls._pool.get_idle_size() else: metrics["pool_size"] = 0 metrics["pool_free"] = 0 metrics["pool_used"] = 0 return metrics @classmethod def reset_metrics(cls) -> None: """ Reset connection pool metrics. Useful for testing or periodic metric resets. """ cls._metrics = { "connections_acquired": 0, "connections_released": 0, "connections_failed": 0, "health_check_failures": 0, "acquisition_times": [], } # Public API async def connect_to_postgres() -> None: """ Initialize PostgreSQL connection pool. Should be called during application startup. """ await PostgreSQLConnectionPool.initialize() async def close_postgres_connection() -> None: """ Close PostgreSQL connection pool. Should be called during application shutdown. """ await PostgreSQLConnectionPool.close() async def get_postgres_connection() -> asyncpg.Connection: """ Get a connection from the PostgreSQL pool. Returns: asyncpg.Connection: Database connection Usage: conn = await get_postgres_connection() try: # Use connection result = await conn.fetch("SELECT * FROM table") finally: await release_postgres_connection(conn) """ return await PostgreSQLConnectionPool.get_connection() async def release_postgres_connection(conn: asyncpg.Connection) -> None: """ Release a connection back to the pool. Args: conn: Connection to release """ await PostgreSQLConnectionPool.release_connection(conn) def is_postgres_connected() -> bool: """ Check if PostgreSQL connection pool is initialized. Returns: bool: True if connected, False otherwise """ return PostgreSQLConnectionPool.is_initialized() def get_connection_pool_metrics() -> Dict[str, Any]: """ Get connection pool metrics. Returns: Dictionary containing pool metrics """ return PostgreSQLConnectionPool.get_pool_metrics() def reset_connection_pool_metrics() -> None: """ Reset connection pool metrics. """ PostgreSQLConnectionPool.reset_metrics()