"""
PostgreSQL connection pool management for Workforce Microservice.
"""

import ssl
import time
from typing import Optional, Dict, Any

import asyncpg

from app.core.logging import get_logger
from app.core.config import settings

logger = get_logger(__name__)


class PostgreSQLConnectionPool:
    """Process-wide asyncpg connection pool held in class-level state.

    All methods are classmethods operating on the single shared ``_pool``.
    Call ``initialize()`` before acquiring connections and ``close()`` on
    shutdown.
    """

    # The shared pool; ``None`` until ``initialize()`` succeeds and again
    # after ``close()``.
    _pool: Optional[asyncpg.Pool] = None

    # Upper bound on retained acquisition-latency samples.
    _MAX_ACQUISITION_SAMPLES = 1000

    # Simple in-process counters; "acquisition_times" holds the most recent
    # acquisition latencies in milliseconds.
    _metrics: Dict[str, Any] = {
        "connections_acquired": 0,
        "connections_released": 0,
        "connections_failed": 0,
        "acquisition_times": [],
    }

    @staticmethod
    def _build_ssl_context() -> Optional[ssl.SSLContext]:
        """Translate ``settings.POSTGRES_SSL_MODE`` into an SSL context.

        Returns:
            ``None`` when the mode is unset or "disable"; a fully verifying
            context for "verify-full"; otherwise a context that encrypts
            the connection without verifying the server certificate.
        """
        mode = (settings.POSTGRES_SSL_MODE or "disable").lower()
        if mode == "disable":
            return None

        if mode == "verify-full":
            # An empty/unset root cert falls back to the system trust store.
            context = ssl.create_default_context(
                cafile=settings.POSTGRES_SSL_ROOT_CERT or None
            )
            context.check_hostname = True
            context.verify_mode = ssl.CERT_REQUIRED
            return context

        # NOTE(review): every other mode, including "verify-ca", lands here
        # and gets NO certificate verification. Confirm whether "verify-ca"
        # should validate the certificate chain.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # check_hostname must be disabled before verify_mode can be set to
        # CERT_NONE; the ssl module rejects the reverse order.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        return context

    @staticmethod
    async def _release_quietly(
        pool: asyncpg.Pool, conn: asyncpg.Connection
    ) -> None:
        """Return ``conn`` to ``pool``, logging instead of raising on failure.

        Used on error paths so a cleanup problem never masks the exception
        that is already propagating.
        """
        try:
            await pool.release(conn)
        except Exception as release_error:
            logger.error(
                "Failed to release PostgreSQL connection",
                exc_info=release_error,
            )

    @classmethod
    async def initialize(cls) -> None:
        """Create and verify the shared pool; no-op if already initialized.

        The pool is only published on ``cls._pool`` after a ``SELECT 1``
        round trip succeeds. If verification fails the new pool is
        terminated, so a later ``initialize()`` call can retry instead of
        returning early with a broken pool.

        Raises:
            Exception: whatever pool creation or the verification query
                raised; it is logged and re-raised unchanged.
        """
        if cls._pool is not None:
            return

        try:
            logger.info("Initializing PostgreSQL connection pool", extra={
                "host": settings.POSTGRES_HOST,
                "port": settings.POSTGRES_PORT,
                "database": settings.POSTGRES_DB,
            })

            pool = await asyncpg.create_pool(
                host=settings.POSTGRES_HOST,
                port=settings.POSTGRES_PORT,
                database=settings.POSTGRES_DB,
                user=settings.POSTGRES_USER,
                password=settings.POSTGRES_PASSWORD,
                min_size=settings.POSTGRES_MIN_POOL_SIZE,
                max_size=settings.POSTGRES_MAX_POOL_SIZE,
                command_timeout=30.0,
                timeout=30.0,
                ssl=cls._build_ssl_context(),
            )

            try:
                async with pool.acquire() as conn:
                    await conn.fetchval("SELECT 1")
            except BaseException:
                # Verification failed (or was cancelled): drop the pool's
                # connections so they are not leaked, then re-raise.
                try:
                    pool.terminate()
                except Exception as cleanup_error:
                    logger.error(
                        "Failed to terminate unverified PostgreSQL "
                        "connection pool",
                        exc_info=cleanup_error,
                    )
                raise

            cls._pool = pool
            logger.info("PostgreSQL connection pool initialized successfully")
        except Exception as e:
            logger.error(
                "Failed to initialize PostgreSQL connection pool", exc_info=e
            )
            raise

    @classmethod
    async def get_connection(cls) -> asyncpg.Connection:
        """Acquire a health-checked connection from the pool.

        The caller owns the returned connection and must hand it back via
        ``release_connection``.

        Returns:
            A connection that has just answered ``SELECT 1``.

        Raises:
            RuntimeError: if the pool has not been initialized.
            Exception: whatever acquisition or the health check raised; the
                failure is counted, logged, and re-raised.
        """
        if cls._pool is None:
            raise RuntimeError("PostgreSQL connection pool not initialized.")

        pool = cls._pool
        # perf_counter is monotonic, so latency samples are immune to
        # wall-clock adjustments.
        started = time.perf_counter()
        try:
            conn = await pool.acquire()
            try:
                await conn.fetchval("SELECT 1")
            except BaseException:
                # The connection was acquired but failed its health check
                # (or the task was cancelled): give it back so it does not
                # leak out of the pool.
                await cls._release_quietly(pool, conn)
                raise
        except Exception as e:
            cls._metrics["connections_failed"] += 1
            logger.error("Failed to acquire PostgreSQL connection", exc_info=e)
            raise

        elapsed_ms = (time.perf_counter() - started) * 1000
        cls._metrics["connections_acquired"] += 1
        samples = cls._metrics["acquisition_times"]
        samples.append(elapsed_ms)
        if len(samples) > cls._MAX_ACQUISITION_SAMPLES:
            # Keep only the most recent samples to bound memory use.
            cls._metrics["acquisition_times"] = samples[
                -cls._MAX_ACQUISITION_SAMPLES:
            ]
        return conn

    @classmethod
    async def release_connection(cls, conn: asyncpg.Connection) -> None:
        """Return ``conn`` to the pool (best effort).

        Does nothing when the pool is not initialized. Release errors are
        logged and swallowed.
        """
        if cls._pool is None:
            return

        try:
            await cls._pool.release(conn)
            cls._metrics["connections_released"] += 1
        except Exception as e:
            logger.error("Failed to release PostgreSQL connection", exc_info=e)

    @classmethod
    async def close(cls) -> None:
        """Close the pool and reset the class to the uninitialized state.

        Errors while closing are logged and swallowed. The pool reference is
        kept until ``close()`` finishes so that in-flight
        ``release_connection`` calls can still return their connections.
        """
        if cls._pool is None:
            return

        try:
            await cls._pool.close()
        except Exception as e:
            logger.error("Error closing PostgreSQL connection pool", exc_info=e)
        else:
            logger.info("PostgreSQL connection pool closed")
        finally:
            # Always drop the reference, even on cancellation, so a closed
            # pool is never reused.
            cls._pool = None

    @classmethod
    def is_initialized(cls) -> bool:
        """Return True when a pool is currently set on the class."""
        return cls._pool is not None


async def connect_to_postgres() -> None:
    """Initialize the shared PostgreSQL connection pool."""
    await PostgreSQLConnectionPool.initialize()


async def close_postgres_connection() -> None:
    """Close the shared PostgreSQL connection pool."""
    await PostgreSQLConnectionPool.close()


async def get_postgres_connection() -> asyncpg.Connection:
    """Acquire a connection from the shared pool."""
    return await PostgreSQLConnectionPool.get_connection()


async def release_postgres_connection(conn: asyncpg.Connection) -> None:
    """Return ``conn`` to the shared pool."""
    await PostgreSQLConnectionPool.release_connection(conn)


def is_postgres_connected() -> bool:
    """Return True when the shared pool has been initialized."""
    return PostgreSQLConnectionPool.is_initialized()