Spaces:
Running
Running
| """ | |
| PostgreSQL connection pool management. | |
| Provides async connection pool for PostgreSQL operations. | |
| """ | |
| import asyncpg | |
| import ssl | |
| from typing import Optional, Dict, Any | |
| from app.core.logging import get_logger | |
| from app.core.config import settings | |
| from collections import defaultdict | |
| import time | |
| logger = get_logger(__name__) | |
| class PostgreSQLConnectionPool: | |
| """Singleton class to manage PostgreSQL connection pool""" | |
| _pool: Optional[asyncpg.Pool] = None | |
| # Connection pool metrics | |
| _metrics = { | |
| "connections_acquired": 0, | |
| "connections_released": 0, | |
| "connections_failed": 0, | |
| "health_check_failures": 0, | |
| "acquisition_times": [], # Track acquisition times for performance monitoring | |
| } | |
| async def initialize(cls) -> None: | |
| """ | |
| Initialize PostgreSQL connection pool. | |
| Called during application startup. | |
| Creates a connection pool with configurable min/max connections. | |
| Tests connectivity by acquiring and releasing a connection. | |
| Raises: | |
| Exception: If connection pool initialization fails | |
| """ | |
| if cls._pool is not None: | |
| logger.warning("PostgreSQL connection pool already initialized") | |
| return | |
| try: | |
| logger.info("Initializing PostgreSQL connection pool", extra={ | |
| "host": settings.POSTGRES_HOST, | |
| "port": settings.POSTGRES_PORT, | |
| "database": settings.POSTGRES_DB, | |
| "user": settings.POSTGRES_USER, | |
| "min_pool_size": settings.POSTGRES_MIN_POOL_SIZE, | |
| "max_pool_size": settings.POSTGRES_MAX_POOL_SIZE | |
| }) | |
| # Optional SSL context | |
| ssl_context = None | |
| mode = (settings.POSTGRES_SSL_MODE or "disable").lower() | |
| if mode != "disable": | |
| if mode == "verify-full": | |
| ssl_context = ssl.create_default_context(cafile=settings.POSTGRES_SSL_ROOT_CERT) if settings.POSTGRES_SSL_ROOT_CERT else ssl.create_default_context() | |
| if settings.POSTGRES_SSL_CERT and settings.POSTGRES_SSL_KEY: | |
| try: | |
| ssl_context.load_cert_chain(certfile=settings.POSTGRES_SSL_CERT, keyfile=settings.POSTGRES_SSL_KEY) | |
| except Exception as e: | |
| logger.warning("Failed to load client SSL cert/key for PostgreSQL pool", exc_info=e) | |
| ssl_context.check_hostname = True | |
| ssl_context.verify_mode = ssl.CERT_REQUIRED | |
| else: | |
| ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) | |
| ssl_context.check_hostname = False | |
| ssl_context.verify_mode = ssl.CERT_NONE | |
| logger.info("PostgreSQL pool SSL enabled", extra={"ssl_mode": settings.POSTGRES_SSL_MODE}) | |
| # Create connection pool | |
| cls._pool = await asyncpg.create_pool( | |
| host=settings.POSTGRES_HOST, | |
| port=settings.POSTGRES_PORT, | |
| database=settings.POSTGRES_DB, | |
| user=settings.POSTGRES_USER, | |
| password=settings.POSTGRES_PASSWORD, | |
| min_size=settings.POSTGRES_MIN_POOL_SIZE, | |
| max_size=settings.POSTGRES_MAX_POOL_SIZE, | |
| command_timeout=30.0, | |
| timeout=30.0, | |
| ssl=ssl_context, | |
| ) | |
| # Test connection by acquiring and releasing | |
| async with cls._pool.acquire() as conn: | |
| await conn.fetchval("SELECT 1") | |
| logger.info("PostgreSQL connection pool initialized successfully", extra={ | |
| "database": settings.POSTGRES_DB, | |
| "pool_size": f"{settings.POSTGRES_MIN_POOL_SIZE}-{settings.POSTGRES_MAX_POOL_SIZE}" | |
| }) | |
| except Exception as e: | |
| logger.error("Failed to initialize PostgreSQL connection pool", exc_info=e, extra={ | |
| "host": settings.POSTGRES_HOST, | |
| "port": settings.POSTGRES_PORT, | |
| "database": settings.POSTGRES_DB | |
| }) | |
| raise | |
| async def get_connection(cls) -> asyncpg.Connection: | |
| """ | |
| Acquire a connection from the pool. | |
| Returns: | |
| asyncpg.Connection: Database connection from pool | |
| Raises: | |
| RuntimeError: If connection pool is not initialized | |
| Exception: If connection acquisition fails | |
| """ | |
| if cls._pool is None: | |
| raise RuntimeError("PostgreSQL connection pool not initialized. Call initialize() first.") | |
| start_time = time.time() | |
| try: | |
| conn = await cls._pool.acquire() | |
| # Health check: verify connection is alive | |
| try: | |
| await conn.fetchval("SELECT 1") | |
| except Exception as health_check_error: | |
| # Connection is dead, release it and try to get a new one | |
| logger.warning("Connection health check failed, releasing dead connection", | |
| exc_info=health_check_error) | |
| cls._metrics["health_check_failures"] += 1 | |
| await cls._pool.release(conn) | |
| # Try to get a fresh connection | |
| conn = await cls._pool.acquire() | |
| await conn.fetchval("SELECT 1") | |
| # Track successful acquisition | |
| acquisition_time = (time.time() - start_time) * 1000 # Convert to ms | |
| cls._metrics["connections_acquired"] += 1 | |
| cls._metrics["acquisition_times"].append(acquisition_time) | |
| # Keep only last 1000 acquisition times to prevent memory growth | |
| if len(cls._metrics["acquisition_times"]) > 1000: | |
| cls._metrics["acquisition_times"] = cls._metrics["acquisition_times"][-1000:] | |
| logger.debug( | |
| "PostgreSQL connection acquired", | |
| extra={ | |
| "acquisition_time_ms": acquisition_time, | |
| "pool_size": cls._pool.get_size() if cls._pool else 0, | |
| "pool_free": cls._pool.get_idle_size() if cls._pool else 0 | |
| } | |
| ) | |
| return conn | |
| except Exception as e: | |
| cls._metrics["connections_failed"] += 1 | |
| logger.error("Failed to acquire PostgreSQL connection", exc_info=e) | |
| raise | |
| async def release_connection(cls, conn: asyncpg.Connection) -> None: | |
| """ | |
| Release a connection back to the pool. | |
| Args: | |
| conn: Connection to release | |
| Raises: | |
| RuntimeError: If connection pool is not initialized | |
| """ | |
| if cls._pool is None: | |
| raise RuntimeError("PostgreSQL connection pool not initialized") | |
| try: | |
| await cls._pool.release(conn) | |
| cls._metrics["connections_released"] += 1 | |
| logger.debug( | |
| "PostgreSQL connection released", | |
| extra={ | |
| "pool_size": cls._pool.get_size() if cls._pool else 0, | |
| "pool_free": cls._pool.get_idle_size() if cls._pool else 0 | |
| } | |
| ) | |
| except Exception as e: | |
| logger.error("Failed to release PostgreSQL connection", exc_info=e) | |
| # Don't raise - we don't want to break the application if release fails | |
| async def close(cls) -> None: | |
| """ | |
| Close all connections in the pool. | |
| Called during application shutdown. | |
| Gracefully closes all connections and cleans up resources. | |
| """ | |
| if cls._pool is None: | |
| logger.warning("PostgreSQL connection pool not initialized, nothing to close") | |
| return | |
| try: | |
| logger.info("Closing PostgreSQL connection pool") | |
| await cls._pool.close() | |
| cls._pool = None | |
| logger.info("PostgreSQL connection pool closed successfully") | |
| except Exception as e: | |
| logger.error("Error closing PostgreSQL connection pool", exc_info=e) | |
| # Set pool to None anyway to allow re-initialization | |
| cls._pool = None | |
| def is_initialized(cls) -> bool: | |
| """ | |
| Check if connection pool is initialized. | |
| Returns: | |
| bool: True if pool is initialized, False otherwise | |
| """ | |
| return cls._pool is not None | |
| def get_pool_metrics(cls) -> Dict[str, Any]: | |
| """ | |
| Get connection pool metrics. | |
| Returns: | |
| Dictionary containing pool metrics: | |
| - connections_acquired: Total connections acquired | |
| - connections_released: Total connections released | |
| - connections_failed: Total failed connection attempts | |
| - health_check_failures: Total health check failures | |
| - avg_acquisition_time_ms: Average connection acquisition time | |
| - pool_size: Current pool size | |
| - pool_free: Number of idle connections | |
| - pool_used: Number of connections in use | |
| """ | |
| metrics = { | |
| "connections_acquired": cls._metrics["connections_acquired"], | |
| "connections_released": cls._metrics["connections_released"], | |
| "connections_failed": cls._metrics["connections_failed"], | |
| "health_check_failures": cls._metrics["health_check_failures"], | |
| } | |
| # Calculate average acquisition time | |
| acquisition_times = cls._metrics["acquisition_times"] | |
| if acquisition_times: | |
| metrics["avg_acquisition_time_ms"] = sum(acquisition_times) / len(acquisition_times) | |
| metrics["max_acquisition_time_ms"] = max(acquisition_times) | |
| metrics["min_acquisition_time_ms"] = min(acquisition_times) | |
| else: | |
| metrics["avg_acquisition_time_ms"] = 0.0 | |
| metrics["max_acquisition_time_ms"] = 0.0 | |
| metrics["min_acquisition_time_ms"] = 0.0 | |
| # Get pool status | |
| if cls._pool: | |
| metrics["pool_size"] = cls._pool.get_size() | |
| metrics["pool_free"] = cls._pool.get_idle_size() | |
| metrics["pool_used"] = cls._pool.get_size() - cls._pool.get_idle_size() | |
| else: | |
| metrics["pool_size"] = 0 | |
| metrics["pool_free"] = 0 | |
| metrics["pool_used"] = 0 | |
| return metrics | |
| def reset_metrics(cls) -> None: | |
| """ | |
| Reset connection pool metrics. | |
| Useful for testing or periodic metric resets. | |
| """ | |
| cls._metrics = { | |
| "connections_acquired": 0, | |
| "connections_released": 0, | |
| "connections_failed": 0, | |
| "health_check_failures": 0, | |
| "acquisition_times": [], | |
| } | |
| # Public API | |
| async def connect_to_postgres() -> None: | |
| """ | |
| Initialize PostgreSQL connection pool. | |
| Should be called during application startup. | |
| """ | |
| await PostgreSQLConnectionPool.initialize() | |
| async def close_postgres_connection() -> None: | |
| """ | |
| Close PostgreSQL connection pool. | |
| Should be called during application shutdown. | |
| """ | |
| await PostgreSQLConnectionPool.close() | |
| async def get_postgres_connection() -> asyncpg.Connection: | |
| """ | |
| Get a connection from the PostgreSQL pool. | |
| Returns: | |
| asyncpg.Connection: Database connection | |
| Usage: | |
| conn = await get_postgres_connection() | |
| try: | |
| # Use connection | |
| result = await conn.fetch("SELECT * FROM table") | |
| finally: | |
| await release_postgres_connection(conn) | |
| """ | |
| return await PostgreSQLConnectionPool.get_connection() | |
| async def release_postgres_connection(conn: asyncpg.Connection) -> None: | |
| """ | |
| Release a connection back to the pool. | |
| Args: | |
| conn: Connection to release | |
| """ | |
| await PostgreSQLConnectionPool.release_connection(conn) | |
| def is_postgres_connected() -> bool: | |
| """ | |
| Check if PostgreSQL connection pool is initialized. | |
| Returns: | |
| bool: True if connected, False otherwise | |
| """ | |
| return PostgreSQLConnectionPool.is_initialized() | |
| def get_connection_pool_metrics() -> Dict[str, Any]: | |
| """ | |
| Get connection pool metrics. | |
| Returns: | |
| Dictionary containing pool metrics | |
| """ | |
| return PostgreSQLConnectionPool.get_pool_metrics() | |
| def reset_connection_pool_metrics() -> None: | |
| """ | |
| Reset connection pool metrics. | |
| """ | |
| PostgreSQLConnectionPool.reset_metrics() | |