""" PostgreSQL connection pool management for Analytics Microservice. Adapted from cuatrolabs-scm-ms/app/postgres.py. """ import ssl import asyncpg from typing import Optional from app.core.logging import get_logger from app.core.config import settings logger = get_logger(__name__) class PostgreSQLConnectionPool: _pool: Optional[asyncpg.Pool] = None @classmethod async def initialize(cls) -> None: if cls._pool is not None: logger.warning("PostgreSQL pool already initialized") return try: ssl_context = None mode = (settings.POSTGRES_SSL_MODE or "disable").lower() if mode != "disable": ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE cls._pool = await asyncpg.create_pool( host=settings.POSTGRES_HOST, port=settings.POSTGRES_PORT, database=settings.POSTGRES_DB, user=settings.POSTGRES_USER, password=settings.POSTGRES_PASSWORD, min_size=settings.POSTGRES_MIN_POOL_SIZE, max_size=settings.POSTGRES_MAX_POOL_SIZE, command_timeout=30.0, timeout=30.0, ssl=ssl_context, ) async with cls._pool.acquire() as conn: await conn.fetchval("SELECT 1") logger.info("PostgreSQL pool initialized", extra={ "event": "postgres_connected", "host": settings.POSTGRES_HOST, "db": settings.POSTGRES_DB, }) except Exception as e: logger.error("Failed to initialize PostgreSQL pool", exc_info=e) raise @classmethod async def close(cls) -> None: if cls._pool: await cls._pool.close() cls._pool = None logger.info("PostgreSQL pool closed", extra={"event": "postgres_disconnected"}) @classmethod def get_pool(cls) -> asyncpg.Pool: if cls._pool is None: raise RuntimeError("PostgreSQL pool not initialized. Call connect_to_postgres() first.") return cls._pool async def connect_to_postgres() -> None: await PostgreSQLConnectionPool.initialize() async def close_postgres_connection() -> None: await PostgreSQLConnectionPool.close() def get_postgres_pool() -> asyncpg.Pool: return PostgreSQLConnectionPool.get_pool()