cuatrolabs-spa-ms / app /postgres.py
MukeshKapoor25's picture
first commit
ed74f09
"""
PostgreSQL connection pool management.
Provides async connection pool for PostgreSQL operations.
"""
import asyncpg
import ssl
from typing import Optional, Dict, Any
from insightfy_utils.logging import get_logger
from app.core.config import settings
import time
logger = get_logger(__name__)
class PostgreSQLConnectionPool:
    """Process-wide holder for a single asyncpg connection pool.

    All state lives on the class itself (``_pool`` and ``_metrics``), so the
    class is used exclusively through its classmethods.
    """

    # The shared pool; ``None`` until ``initialize()`` succeeds and again
    # after ``close()``.
    _pool: "Optional[asyncpg.Pool]" = None

    # Upper bound on the number of acquisition-time samples kept in memory.
    _MAX_ACQUISITION_SAMPLES = 1000

    # Connection pool metrics
    _metrics: Dict[str, Any] = {
        "connections_acquired": 0,
        "connections_released": 0,
        "connections_failed": 0,
        "health_check_failures": 0,
        "acquisition_times": [],  # milliseconds, most recent samples only
    }

    @staticmethod
    def _build_ssl_context(
        mode: Optional[str],
        root_cert: Optional[str] = None,
        client_cert: Optional[str] = None,
        client_key: Optional[str] = None,
    ) -> Optional[ssl.SSLContext]:
        """Translate an SSL mode string into an ``ssl.SSLContext``.

        Args:
            mode: SSL mode name (case-insensitive). ``None``/empty is treated
                as ``"disable"``.
            root_cert: Optional CA bundle path used by the verifying modes.
            client_cert: Optional client certificate path.
            client_key: Optional client private-key path.

        Returns:
            ``None`` for ``"disable"``; a fully verifying context for
            ``"verify-full"``; a context that validates the certificate chain
            but not the hostname for ``"verify-ca"``; for any other mode an
            encrypting context that performs no certificate verification.
        """
        normalized = (mode or "disable").lower()
        if normalized == "disable":
            return None

        if normalized in ("verify-full", "verify-ca"):
            if root_cert:
                context = ssl.create_default_context(cafile=root_cert)
            else:
                context = ssl.create_default_context()
            if client_cert and client_key:
                try:
                    context.load_cert_chain(certfile=client_cert, keyfile=client_key)
                except Exception as e:
                    # Best effort: continue without a client certificate.
                    logger.warning(
                        "Failed to load client SSL cert/key for PostgreSQL pool",
                        exc_info=e,
                    )
            # verify-ca validates the chain only; verify-full also checks
            # that the certificate matches the server hostname.
            context.check_hostname = normalized == "verify-full"
            context.verify_mode = ssl.CERT_REQUIRED
            return context

        # Remaining modes: encrypt the link without verifying the server.
        # check_hostname must be cleared before verify_mode can be CERT_NONE.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        return context

    @classmethod
    async def initialize(cls) -> None:
        """Create the shared pool and verify it with a ``SELECT 1``.

        Does nothing (apart from a warning) when a pool already exists. The
        pool is published on the class only after the test query succeeds;
        if any step fails, a partially created pool is terminated and the
        exception is re-raised.

        Raises:
            Exception: whatever pool creation or the test query raised.
        """
        if cls._pool is not None:
            logger.warning("PostgreSQL connection pool already initialized")
            return

        pool = None
        try:
            logger.info("Initializing PostgreSQL connection pool", extra={
                "host": settings.POSTGRES_HOST,
                "port": settings.POSTGRES_PORT,
                "database": settings.POSTGRES_DB,
                "user": settings.POSTGRES_USER,
                "min_pool_size": settings.POSTGRES_MIN_POOL_SIZE,
                "max_pool_size": settings.POSTGRES_MAX_POOL_SIZE
            })

            # Optional SSL context. The certificate settings are looked up
            # with a default so that they are not required to exist when TLS
            # is disabled.
            ssl_context = cls._build_ssl_context(
                settings.POSTGRES_SSL_MODE,
                root_cert=getattr(settings, "POSTGRES_SSL_ROOT_CERT", None),
                client_cert=getattr(settings, "POSTGRES_SSL_CERT", None),
                client_key=getattr(settings, "POSTGRES_SSL_KEY", None),
            )
            if ssl_context is not None:
                logger.info("PostgreSQL pool SSL enabled", extra={"ssl_mode": settings.POSTGRES_SSL_MODE})

            # Create connection pool
            pool = await asyncpg.create_pool(
                host=settings.POSTGRES_HOST,
                port=settings.POSTGRES_PORT,
                database=settings.POSTGRES_DB,
                user=settings.POSTGRES_USER,
                password=settings.POSTGRES_PASSWORD,
                min_size=settings.POSTGRES_MIN_POOL_SIZE,
                max_size=settings.POSTGRES_MAX_POOL_SIZE,
                command_timeout=30.0,
                timeout=30.0,
                ssl=ssl_context,
            )

            # Test connection
            async with pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
        except Exception as e:
            logger.error("Failed to initialize PostgreSQL connection pool", exc_info=e)
            if pool is not None:
                # Do not leave a half-initialized pool (and its sockets)
                # behind; cls._pool was never set, so a retry is possible.
                try:
                    pool.terminate()
                except Exception as terminate_error:
                    logger.warning(
                        "Failed to terminate PostgreSQL connection pool after failed initialization",
                        exc_info=terminate_error,
                    )
            raise

        cls._pool = pool
        logger.info("PostgreSQL connection pool initialized successfully", extra={
            "database": settings.POSTGRES_DB,
            "pool_size": f"{settings.POSTGRES_MIN_POOL_SIZE}-{settings.POSTGRES_MAX_POOL_SIZE}"
        })

    @classmethod
    async def _release_unhealthy(cls, pool: "asyncpg.Pool", conn: "asyncpg.Connection") -> None:
        """Hand a connection that failed its health check back to ``pool``.

        A failure while releasing is logged and swallowed so that the caller
        can carry on with its own error handling or retry.
        """
        try:
            await pool.release(conn)
        except Exception as release_error:
            logger.warning(
                "Failed to release unhealthy PostgreSQL connection",
                exc_info=release_error,
            )

    @classmethod
    async def get_connection(cls) -> "asyncpg.Connection":
        """Acquire a connection from the pool and health-check it.

        The connection is probed with ``SELECT 1``. If the probe fails, the
        connection is released and one replacement is acquired and probed;
        a replacement that also fails is released before the error is
        propagated, so no connection stays checked out on failure.

        Returns:
            A connection that answered the health check. The caller must
            give it back via ``release_connection``.

        Raises:
            RuntimeError: if the pool has not been initialized.
            Exception: whatever acquisition or the final health check raised.
        """
        if cls._pool is None:
            raise RuntimeError("PostgreSQL connection pool not initialized. Call initialize() first.")

        pool = cls._pool
        # perf_counter is monotonic, so the measured duration is not
        # distorted by wall-clock adjustments.
        start_time = time.perf_counter()
        try:
            conn = await pool.acquire()
            # Health check
            try:
                await conn.fetchval("SELECT 1")
            except Exception as health_check_error:
                logger.warning("Connection health check failed, releasing dead connection",
                               exc_info=health_check_error)
                cls._metrics["health_check_failures"] += 1
                await cls._release_unhealthy(pool, conn)

                # Single retry with a fresh acquisition.
                conn = await pool.acquire()
                try:
                    await conn.fetchval("SELECT 1")
                except Exception:
                    await cls._release_unhealthy(pool, conn)
                    raise

            acquisition_time = (time.perf_counter() - start_time) * 1000
            cls._metrics["connections_acquired"] += 1
            samples = cls._metrics["acquisition_times"]
            samples.append(acquisition_time)
            if len(samples) > cls._MAX_ACQUISITION_SAMPLES:
                # Keep only the most recent samples to bound memory use.
                cls._metrics["acquisition_times"] = samples[-cls._MAX_ACQUISITION_SAMPLES:]
            return conn
        except Exception as e:
            cls._metrics["connections_failed"] += 1
            logger.error("Failed to acquire PostgreSQL connection", exc_info=e)
            raise

    @classmethod
    async def release_connection(cls, conn: "asyncpg.Connection") -> None:
        """Release a connection back to the pool.

        Errors raised by the pool while releasing are logged, not propagated.

        Raises:
            RuntimeError: if the pool has not been initialized.
        """
        if cls._pool is None:
            raise RuntimeError("PostgreSQL connection pool not initialized")
        try:
            await cls._pool.release(conn)
            cls._metrics["connections_released"] += 1
        except Exception as e:
            logger.error("Failed to release PostgreSQL connection", exc_info=e)

    @classmethod
    async def close(cls) -> None:
        """Close the pool and forget it.

        The pool reference is cleared whether or not closing raised an
        ``Exception``; such an error is logged rather than propagated.
        """
        if cls._pool is None:
            logger.warning("PostgreSQL connection pool not initialized, nothing to close")
            return

        # cls._pool stays set while close() is awaited so that in-flight
        # callers can still hand their connections back.
        try:
            logger.info("Closing PostgreSQL connection pool")
            await cls._pool.close()
        except Exception as e:
            logger.error("Error closing PostgreSQL connection pool", exc_info=e)
        else:
            logger.info("PostgreSQL connection pool closed successfully")
        cls._pool = None

    @classmethod
    def is_initialized(cls) -> bool:
        """Return True when a pool is currently held by the class."""
        return cls._pool is not None
# Public API
async def connect_to_postgres() -> None:
    """Set up the shared PostgreSQL pool.

    Delegates to ``PostgreSQLConnectionPool.initialize()``.
    """
    pool_manager = PostgreSQLConnectionPool
    await pool_manager.initialize()
async def close_postgres_connection() -> None:
    """Shut down the shared PostgreSQL pool.

    Delegates to ``PostgreSQLConnectionPool.close()``.
    """
    pool_manager = PostgreSQLConnectionPool
    await pool_manager.close()
async def get_postgres_connection() -> asyncpg.Connection:
    """Borrow a connection from the shared PostgreSQL pool.

    Delegates to ``PostgreSQLConnectionPool.get_connection()`` and returns
    its result unchanged.
    """
    connection = await PostgreSQLConnectionPool.get_connection()
    return connection
async def release_postgres_connection(conn: asyncpg.Connection) -> None:
    """Give ``conn`` back to the shared PostgreSQL pool.

    Delegates to ``PostgreSQLConnectionPool.release_connection()``.
    """
    pool_manager = PostgreSQLConnectionPool
    await pool_manager.release_connection(conn)
def is_postgres_connected() -> bool:
    """Report whether the shared PostgreSQL pool currently exists.

    Returns the value of ``PostgreSQLConnectionPool.is_initialized()``.
    """
    pool_ready = PostgreSQLConnectionPool.is_initialized()
    return pool_ready