Spaces:
Configuration error
Configuration error
| """ | |
| PostgreSQL connection pool management for Workforce Microservice. | |
| """ | |
| import asyncpg | |
| import ssl | |
| from typing import Optional, Dict, Any | |
| from app.core.logging import get_logger | |
| from app.core.config import settings | |
| import time | |
| logger = get_logger(__name__) | |
class PostgreSQLConnectionPool:
    """Class-level holder for one shared ``asyncpg`` connection pool.

    All state is stored on the class itself and every public method is a
    classmethod, so the class is used directly and never instantiated.
    """

    # The shared pool; ``None`` until initialize() succeeds and after close().
    _pool: "Optional[asyncpg.Pool]" = None

    # In-process counters. ``acquisition_times`` keeps the most recent
    # acquisition latencies in milliseconds (see _MAX_ACQUISITION_SAMPLES).
    _metrics: Dict[str, Any] = {
        "connections_acquired": 0,
        "connections_released": 0,
        "connections_failed": 0,
        "acquisition_times": [],
    }

    # Upper bound on the number of latency samples retained in _metrics.
    _MAX_ACQUISITION_SAMPLES = 1000

    @staticmethod
    def _build_ssl_context(
        mode: Optional[str], root_cert: Optional[str] = None
    ) -> Optional[ssl.SSLContext]:
        """Translate an SSL mode string into an ``ssl.SSLContext``.

        Args:
            mode: SSL mode name (case-insensitive). ``None`` or an empty
                string is treated as ``"disable"``.
            root_cert: Optional path to a CA bundle used by the verifying
                modes; when falsy the default trust store is used.

        Returns:
            ``None`` for ``"disable"``; a verifying context for
            ``"verify-ca"`` (certificate chain only) and ``"verify-full"``
            (certificate chain plus hostname); for any other mode an
            encrypting context that performs no certificate verification.
        """
        normalized = (mode or "disable").lower()
        if normalized == "disable":
            return None

        if normalized in ("verify-ca", "verify-full"):
            if root_cert:
                context = ssl.create_default_context(cafile=root_cert)
            else:
                context = ssl.create_default_context()
            # Only "verify-full" also matches the certificate to the host name.
            context.check_hostname = normalized == "verify-full"
            context.verify_mode = ssl.CERT_REQUIRED
            return context

        # Remaining modes: encrypt the connection without verifying the peer.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # check_hostname must be switched off before CERT_NONE can be set.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        return context

    @classmethod
    async def initialize(cls) -> None:
        """Create and validate the shared pool; no-op if it already exists.

        The pool is published on the class only after a ``SELECT 1`` round
        trip succeeds, so a failed start-up never leaves a half-initialized
        pool behind.

        Raises:
            Exception: Whatever pool creation or the validation query raised;
                the error is logged and re-raised unchanged.
        """
        if cls._pool is not None:
            return

        pool = None
        try:
            logger.info("Initializing PostgreSQL connection pool", extra={
                "host": settings.POSTGRES_HOST,
                "port": settings.POSTGRES_PORT,
                "database": settings.POSTGRES_DB,
            })
            ssl_context = cls._build_ssl_context(
                settings.POSTGRES_SSL_MODE,
                # The root certificate setting is optional.
                getattr(settings, "POSTGRES_SSL_ROOT_CERT", None),
            )
            pool = await asyncpg.create_pool(
                host=settings.POSTGRES_HOST,
                port=settings.POSTGRES_PORT,
                database=settings.POSTGRES_DB,
                user=settings.POSTGRES_USER,
                password=settings.POSTGRES_PASSWORD,
                min_size=settings.POSTGRES_MIN_POOL_SIZE,
                max_size=settings.POSTGRES_MAX_POOL_SIZE,
                command_timeout=30.0,
                timeout=30.0,
                ssl=ssl_context,
            )
            # Prove the pool can actually serve a query before exposing it.
            async with pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
        except Exception as e:
            logger.error("Failed to initialize PostgreSQL connection pool", exc_info=e)
            if pool is not None:
                # Do not leak the connections of a pool that failed validation.
                try:
                    await pool.close()
                except Exception as close_error:
                    logger.error(
                        "Error closing PostgreSQL connection pool",
                        exc_info=close_error,
                    )
            raise

        cls._pool = pool
        logger.info("PostgreSQL connection pool initialized successfully")

    @classmethod
    async def get_connection(cls) -> "asyncpg.Connection":
        """Check out a connection and verify it with ``SELECT 1``.

        The caller owns the returned connection and must hand it back with
        release_connection().

        Returns:
            A connection that has just answered a health-check query.

        Raises:
            RuntimeError: If the pool has not been initialized.
            Exception: Whatever acquisition or the health check raised; the
                failure is counted, logged and re-raised.
        """
        if cls._pool is None:
            raise RuntimeError("PostgreSQL connection pool not initialized.")

        pool = cls._pool
        # Monotonic clock: wall-clock adjustments must not skew the latency.
        started = time.perf_counter()
        conn = None
        try:
            conn = await pool.acquire()
            await conn.fetchval("SELECT 1")
        except Exception as e:
            cls._metrics["connections_failed"] += 1
            logger.error("Failed to acquire PostgreSQL connection", exc_info=e)
            if conn is not None:
                # The health check failed after checkout: return the
                # connection so its pool slot is not lost.
                try:
                    await pool.release(conn)
                except Exception as release_error:
                    logger.error(
                        "Failed to release PostgreSQL connection",
                        exc_info=release_error,
                    )
            raise

        cls._metrics["connections_acquired"] += 1
        samples = cls._metrics["acquisition_times"]
        samples.append((time.perf_counter() - started) * 1000)
        if len(samples) > cls._MAX_ACQUISITION_SAMPLES:
            # Keep only the newest samples.
            del samples[:-cls._MAX_ACQUISITION_SAMPLES]
        return conn

    @classmethod
    async def release_connection(cls, conn: "asyncpg.Connection") -> None:
        """Return ``conn`` to the pool (best effort).

        Does nothing when no pool exists. Errors raised by the pool are
        logged and suppressed rather than propagated.

        Args:
            conn: A connection previously obtained from get_connection().
        """
        if cls._pool is None:
            return
        try:
            await cls._pool.release(conn)
            cls._metrics["connections_released"] += 1
        except Exception as e:
            logger.error("Failed to release PostgreSQL connection", exc_info=e)

    @classmethod
    async def close(cls) -> None:
        """Close the pool and forget it; no-op when no pool exists.

        Errors raised while closing are logged and suppressed; the pool
        reference is cleared either way so initialize() can run again.
        """
        if cls._pool is None:
            return
        try:
            await cls._pool.close()
            logger.info("PostgreSQL connection pool closed")
        except Exception as e:
            logger.error("Error closing PostgreSQL connection pool", exc_info=e)
        cls._pool = None

    @classmethod
    def is_initialized(cls) -> bool:
        """Return True while a pool object is installed on the class."""
        return cls._pool is not None
async def connect_to_postgres() -> None:
    """Start up PostgreSQL access by delegating to PostgreSQLConnectionPool.initialize()."""
    startup = PostgreSQLConnectionPool.initialize()
    await startup
async def close_postgres_connection() -> None:
    """Shut down PostgreSQL access by delegating to PostgreSQLConnectionPool.close()."""
    shutdown = PostgreSQLConnectionPool.close()
    await shutdown
async def get_postgres_connection() -> asyncpg.Connection:
    """Return a connection obtained from PostgreSQLConnectionPool.get_connection().

    Any exception raised by that call propagates to the caller.
    """
    connection = await PostgreSQLConnectionPool.get_connection()
    return connection
async def release_postgres_connection(conn: asyncpg.Connection) -> None:
    """Hand ``conn`` to PostgreSQLConnectionPool.release_connection().

    Args:
        conn: The connection to give back.
    """
    handback = PostgreSQLConnectionPool.release_connection(conn)
    await handback
def is_postgres_connected() -> bool:
    """Report the result of PostgreSQLConnectionPool.is_initialized()."""
    pool_ready = PostgreSQLConnectionPool.is_initialized()
    return pool_ready