"""
PostgreSQL connection pool management for Workforce Microservice.
"""
import asyncpg
import ssl
from typing import Optional, Dict, Any
from app.core.logging import get_logger
from app.core.config import settings
import time
logger = get_logger(__name__)
class PostgreSQLConnectionPool:
    """Class-level holder for one shared ``asyncpg`` connection pool.

    All state (``_pool`` and ``_metrics``) is stored on the class and every
    public method is a classmethod, so no instance is required to use it.
    """

    # The shared pool; ``None`` until ``initialize()`` succeeds and after ``close()``.
    _pool: Optional["asyncpg.Pool"] = None

    # Maximum number of acquisition-time samples kept in ``_metrics``.
    _MAX_ACQUISITION_SAMPLES = 1000

    # Counters plus a bounded list of recent acquisition durations (milliseconds).
    _metrics: Dict[str, Any] = {
        "connections_acquired": 0,
        "connections_released": 0,
        "connections_failed": 0,
        "acquisition_times": [],
    }

    @staticmethod
    def _build_ssl_context(
        mode: Optional[str], root_cert: Optional[str]
    ) -> Optional[ssl.SSLContext]:
        """Translate an SSL mode string into an ``ssl.SSLContext``.

        Args:
            mode: SSL mode name, compared case-insensitively. ``None`` or an
                empty string is treated as ``"disable"``.
            root_cert: Optional path to a CA bundle, used only by the
                verifying modes; when falsy the default trust store is used.

        Returns:
            ``None`` for ``"disable"``; a context that verifies the
            certificate chain and the hostname for ``"verify-full"``; a
            context that verifies the chain but not the hostname for
            ``"verify-ca"``; for any other mode, a TLS client context with
            certificate and hostname verification turned off.
        """
        normalized = (mode or "disable").lower()
        if normalized == "disable":
            return None

        if normalized in ("verify-full", "verify-ca"):
            context = ssl.create_default_context(cafile=root_cert or None)
            # "verify-ca" validates the chain only; "verify-full" additionally
            # checks that the certificate matches the host name.
            context.check_hostname = normalized == "verify-full"
            context.verify_mode = ssl.CERT_REQUIRED
            return context

        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # check_hostname must be switched off before verify_mode can be
        # lowered to CERT_NONE; the ssl module rejects the reverse order.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        return context

    @classmethod
    async def initialize(cls) -> None:
        """Create the shared pool and verify it with a ``SELECT 1`` query.

        Does nothing when a pool is already stored. The pool is published to
        ``cls._pool`` only after the verification query succeeds; if any step
        fails, a pool that was already created is terminated so it does not
        linger, and ``is_initialized()`` keeps returning ``False``.

        Raises:
            Exception: Whatever pool creation or the verification query
                raised, re-raised after being logged.
        """
        if cls._pool is not None:
            return

        pool = None
        try:
            logger.info("Initializing PostgreSQL connection pool", extra={
                "host": settings.POSTGRES_HOST,
                "port": settings.POSTGRES_PORT,
                "database": settings.POSTGRES_DB,
            })
            ssl_context = cls._build_ssl_context(
                settings.POSTGRES_SSL_MODE,
                getattr(settings, "POSTGRES_SSL_ROOT_CERT", None),
            )
            pool = await asyncpg.create_pool(
                host=settings.POSTGRES_HOST,
                port=settings.POSTGRES_PORT,
                database=settings.POSTGRES_DB,
                user=settings.POSTGRES_USER,
                password=settings.POSTGRES_PASSWORD,
                min_size=settings.POSTGRES_MIN_POOL_SIZE,
                max_size=settings.POSTGRES_MAX_POOL_SIZE,
                command_timeout=30.0,
                timeout=30.0,
                ssl=ssl_context,
            )
            async with pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
        except Exception as e:
            logger.error("Failed to initialize PostgreSQL connection pool", exc_info=e)
            if pool is not None:
                # Drop the half-initialized pool; terminate() does not wait,
                # so cleanup cannot hang on a broken server.
                try:
                    pool.terminate()
                except Exception as cleanup_error:
                    logger.error(
                        "Failed to terminate PostgreSQL connection pool after "
                        "initialization failure",
                        exc_info=cleanup_error,
                    )
            raise

        cls._pool = pool
        logger.info("PostgreSQL connection pool initialized successfully")

    @classmethod
    async def get_connection(cls) -> "asyncpg.Connection":
        """Acquire a connection from the pool and health-check it.

        The connection is probed with ``SELECT 1`` before being returned. On
        success the acquisition counter and the elapsed time (milliseconds)
        are recorded. On failure the failure counter is incremented and a
        connection that was already acquired is handed back to the pool so it
        is not leaked.

        Returns:
            The acquired connection; the caller is responsible for releasing
            it (see ``release_connection``).

        Raises:
            RuntimeError: If the pool has not been initialized.
            Exception: Whatever acquisition or the health check raised,
                re-raised after being logged.
        """
        pool = cls._pool
        if pool is None:
            raise RuntimeError("PostgreSQL connection pool not initialized.")

        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        started = time.perf_counter()
        conn = None
        try:
            conn = await pool.acquire()
            await conn.fetchval("SELECT 1")
        except Exception as e:
            cls._metrics["connections_failed"] += 1
            logger.error("Failed to acquire PostgreSQL connection", exc_info=e)
            if conn is not None:
                # The health check failed after a successful acquire: give the
                # connection back, otherwise the pool slot is lost for good.
                try:
                    await pool.release(conn)
                except Exception as release_error:
                    logger.error(
                        "Failed to release PostgreSQL connection",
                        exc_info=release_error,
                    )
            raise

        cls._metrics["connections_acquired"] += 1
        samples = cls._metrics["acquisition_times"]
        samples.append((time.perf_counter() - started) * 1000)
        if len(samples) > cls._MAX_ACQUISITION_SAMPLES:
            # Keep only the most recent samples.
            cls._metrics["acquisition_times"] = samples[-cls._MAX_ACQUISITION_SAMPLES:]
        return conn

    @classmethod
    async def release_connection(cls, conn: "asyncpg.Connection") -> None:
        """Return ``conn`` to the pool on a best-effort basis.

        Does nothing when no pool is stored. Errors raised by the pool are
        logged and not propagated; the released counter is incremented only
        when the release succeeds.
        """
        pool = cls._pool
        if pool is None:
            return
        try:
            await pool.release(conn)
        except Exception as e:
            logger.error("Failed to release PostgreSQL connection", exc_info=e)
        else:
            cls._metrics["connections_released"] += 1

    @classmethod
    async def close(cls) -> None:
        """Close the pool, if one is stored, and forget it.

        Errors raised while closing are logged and not propagated. The stored
        pool reference is cleared whether or not closing succeeded.
        """
        pool = cls._pool
        if pool is None:
            return
        try:
            await pool.close()
            logger.info("PostgreSQL connection pool closed")
        except Exception as e:
            logger.error("Error closing PostgreSQL connection pool", exc_info=e)
        finally:
            cls._pool = None

    @classmethod
    def is_initialized(cls) -> bool:
        """Return ``True`` when a pool is currently stored on the class."""
        return cls._pool is not None
async def connect_to_postgres() -> None:
    """Initialize the shared PostgreSQL pool via ``PostgreSQLConnectionPool``."""
    pool_manager = PostgreSQLConnectionPool
    await pool_manager.initialize()
async def close_postgres_connection() -> None:
    """Close the shared PostgreSQL pool via ``PostgreSQLConnectionPool``."""
    pool_manager = PostgreSQLConnectionPool
    await pool_manager.close()
async def get_postgres_connection() -> asyncpg.Connection:
    """Acquire and return a connection from the shared PostgreSQL pool.

    Delegates to ``PostgreSQLConnectionPool.get_connection``; any exception it
    raises propagates to the caller.
    """
    connection = await PostgreSQLConnectionPool.get_connection()
    return connection
async def release_postgres_connection(conn: asyncpg.Connection) -> None:
    """Hand ``conn`` back to the shared pool via ``PostgreSQLConnectionPool``."""
    pool_manager = PostgreSQLConnectionPool
    await pool_manager.release_connection(conn)
def is_postgres_connected() -> bool:
    """Report whether ``PostgreSQLConnectionPool`` currently holds a pool."""
    initialized = PostgreSQLConnectionPool.is_initialized()
    return initialized
|