cuatrolabs-ecomm-ms / app /postgres.py
MukeshKapoor25's picture
feat(cuatrolabs-ecomm-ms): Initialize e-commerce microservice project
7da1497
"""
PostgreSQL connection pool management.
Provides async connection pool for PostgreSQL operations.
"""
import asyncpg
import ssl
from typing import Optional, Dict, Any
from insightfy_utils.logging import get_logger
from app.core.config import settings
from collections import defaultdict
import time
logger = get_logger(__name__)
class PostgreSQLConnectionPool:
"""Singleton class to manage PostgreSQL connection pool"""
_pool: Optional[asyncpg.Pool] = None
# Connection pool metrics
_metrics = {
"connections_acquired": 0,
"connections_released": 0,
"connections_failed": 0,
"health_check_failures": 0,
"acquisition_times": [], # Track acquisition times for performance monitoring
}
@classmethod
async def initialize(cls) -> None:
"""
Initialize PostgreSQL connection pool.
Called during application startup.
Creates a connection pool with configurable min/max connections.
Tests connectivity by acquiring and releasing a connection.
Raises:
Exception: If connection pool initialization fails
"""
if cls._pool is not None:
logger.warning("PostgreSQL connection pool already initialized")
return
try:
logger.info("Initializing PostgreSQL connection pool", extra={
"host": settings.POSTGRES_HOST,
"port": settings.POSTGRES_PORT,
"database": settings.POSTGRES_DB,
"user": settings.POSTGRES_USER,
"min_pool_size": settings.POSTGRES_MIN_POOL_SIZE,
"max_pool_size": settings.POSTGRES_MAX_POOL_SIZE
})
# Optional SSL context
ssl_context = None
mode = (settings.POSTGRES_SSL_MODE or "disable").lower()
if mode != "disable":
if mode == "verify-full":
ssl_context = ssl.create_default_context(cafile=settings.POSTGRES_SSL_ROOT_CERT) if settings.POSTGRES_SSL_ROOT_CERT else ssl.create_default_context()
if settings.POSTGRES_SSL_CERT and settings.POSTGRES_SSL_KEY:
try:
ssl_context.load_cert_chain(certfile=settings.POSTGRES_SSL_CERT, keyfile=settings.POSTGRES_SSL_KEY)
except Exception as e:
logger.warning("Failed to load client SSL cert/key for PostgreSQL pool", exc_info=e)
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED
else:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
logger.info("PostgreSQL pool SSL enabled", extra={"ssl_mode": settings.POSTGRES_SSL_MODE})
# Create connection pool
cls._pool = await asyncpg.create_pool(
host=settings.POSTGRES_HOST,
port=settings.POSTGRES_PORT,
database=settings.POSTGRES_DB,
user=settings.POSTGRES_USER,
password=settings.POSTGRES_PASSWORD,
min_size=settings.POSTGRES_MIN_POOL_SIZE,
max_size=settings.POSTGRES_MAX_POOL_SIZE,
command_timeout=30.0,
timeout=30.0,
ssl=ssl_context,
)
# Test connection by acquiring and releasing
async with cls._pool.acquire() as conn:
await conn.fetchval("SELECT 1")
logger.info("PostgreSQL connection pool initialized successfully", extra={
"database": settings.POSTGRES_DB,
"pool_size": f"{settings.POSTGRES_MIN_POOL_SIZE}-{settings.POSTGRES_MAX_POOL_SIZE}"
})
except Exception as e:
logger.error("Failed to initialize PostgreSQL connection pool", exc_info=e, extra={
"host": settings.POSTGRES_HOST,
"port": settings.POSTGRES_PORT,
"database": settings.POSTGRES_DB
})
raise
@classmethod
async def get_connection(cls) -> asyncpg.Connection:
"""
Acquire a connection from the pool.
Returns:
asyncpg.Connection: Database connection from pool
Raises:
RuntimeError: If connection pool is not initialized
Exception: If connection acquisition fails
"""
if cls._pool is None:
raise RuntimeError("PostgreSQL connection pool not initialized. Call initialize() first.")
start_time = time.time()
try:
conn = await cls._pool.acquire()
# Health check: verify connection is alive
try:
await conn.fetchval("SELECT 1")
except Exception as health_check_error:
# Connection is dead, release it and try to get a new one
logger.warning("Connection health check failed, releasing dead connection",
exc_info=health_check_error)
cls._metrics["health_check_failures"] += 1
await cls._pool.release(conn)
# Try to get a fresh connection
conn = await cls._pool.acquire()
await conn.fetchval("SELECT 1")
# Track successful acquisition
acquisition_time = (time.time() - start_time) * 1000 # Convert to ms
cls._metrics["connections_acquired"] += 1
cls._metrics["acquisition_times"].append(acquisition_time)
# Keep only last 1000 acquisition times to prevent memory growth
if len(cls._metrics["acquisition_times"]) > 1000:
cls._metrics["acquisition_times"] = cls._metrics["acquisition_times"][-1000:]
logger.debug(
"PostgreSQL connection acquired",
extra={
"acquisition_time_ms": acquisition_time,
"pool_size": cls._pool.get_size() if cls._pool else 0,
"pool_free": cls._pool.get_idle_size() if cls._pool else 0
}
)
return conn
except Exception as e:
cls._metrics["connections_failed"] += 1
logger.error("Failed to acquire PostgreSQL connection", exc_info=e)
raise
@classmethod
async def release_connection(cls, conn: asyncpg.Connection) -> None:
"""
Release a connection back to the pool.
Args:
conn: Connection to release
Raises:
RuntimeError: If connection pool is not initialized
"""
if cls._pool is None:
raise RuntimeError("PostgreSQL connection pool not initialized")
try:
await cls._pool.release(conn)
cls._metrics["connections_released"] += 1
logger.debug(
"PostgreSQL connection released",
extra={
"pool_size": cls._pool.get_size() if cls._pool else 0,
"pool_free": cls._pool.get_idle_size() if cls._pool else 0
}
)
except Exception as e:
logger.error("Failed to release PostgreSQL connection", exc_info=e)
# Don't raise - we don't want to break the application if release fails
@classmethod
async def close(cls) -> None:
"""
Close all connections in the pool.
Called during application shutdown.
Gracefully closes all connections and cleans up resources.
"""
if cls._pool is None:
logger.warning("PostgreSQL connection pool not initialized, nothing to close")
return
try:
logger.info("Closing PostgreSQL connection pool")
await cls._pool.close()
cls._pool = None
logger.info("PostgreSQL connection pool closed successfully")
except Exception as e:
logger.error("Error closing PostgreSQL connection pool", exc_info=e)
# Set pool to None anyway to allow re-initialization
cls._pool = None
@classmethod
def is_initialized(cls) -> bool:
"""
Check if connection pool is initialized.
Returns:
bool: True if pool is initialized, False otherwise
"""
return cls._pool is not None
@classmethod
def get_pool_metrics(cls) -> Dict[str, Any]:
"""
Get connection pool metrics.
Returns:
Dictionary containing pool metrics
"""
metrics = {
"connections_acquired": cls._metrics["connections_acquired"],
"connections_released": cls._metrics["connections_released"],
"connections_failed": cls._metrics["connections_failed"],
"health_check_failures": cls._metrics["health_check_failures"],
}
# Calculate average acquisition time
acquisition_times = cls._metrics["acquisition_times"]
if acquisition_times:
metrics["avg_acquisition_time_ms"] = sum(acquisition_times) / len(acquisition_times)
metrics["max_acquisition_time_ms"] = max(acquisition_times)
metrics["min_acquisition_time_ms"] = min(acquisition_times)
else:
metrics["avg_acquisition_time_ms"] = 0.0
metrics["max_acquisition_time_ms"] = 0.0
metrics["min_acquisition_time_ms"] = 0.0
# Get pool status
if cls._pool:
metrics["pool_size"] = cls._pool.get_size()
metrics["pool_free"] = cls._pool.get_idle_size()
metrics["pool_used"] = cls._pool.get_size() - cls._pool.get_idle_size()
else:
metrics["pool_size"] = 0
metrics["pool_free"] = 0
metrics["pool_used"] = 0
return metrics
@classmethod
def reset_metrics(cls) -> None:
"""
Reset connection pool metrics.
Useful for testing or periodic metric resets.
"""
cls._metrics = {
"connections_acquired": 0,
"connections_released": 0,
"connections_failed": 0,
"health_check_failures": 0,
"acquisition_times": [],
}
# Public API
async def connect_to_postgres() -> None:
"""
Initialize PostgreSQL connection pool.
Should be called during application startup.
"""
await PostgreSQLConnectionPool.initialize()
async def close_postgres_connection() -> None:
"""
Close PostgreSQL connection pool.
Should be called during application shutdown.
"""
await PostgreSQLConnectionPool.close()
async def get_postgres_connection() -> asyncpg.Connection:
"""
Get a connection from the PostgreSQL pool.
Returns:
asyncpg.Connection: Database connection
Usage:
conn = await get_postgres_connection()
try:
# Use connection
result = await conn.fetch("SELECT * FROM table")
finally:
await release_postgres_connection(conn)
"""
return await PostgreSQLConnectionPool.get_connection()
async def release_postgres_connection(conn: asyncpg.Connection) -> None:
"""
Release a connection back to the pool.
Args:
conn: Connection to release
"""
await PostgreSQLConnectionPool.release_connection(conn)
def is_postgres_connected() -> bool:
"""
Check if PostgreSQL connection pool is initialized.
Returns:
bool: True if connected, False otherwise
"""
return PostgreSQLConnectionPool.is_initialized()
def get_connection_pool_metrics() -> Dict[str, Any]:
"""
Get connection pool metrics.
Returns:
Dictionary containing pool metrics
"""
return PostgreSQLConnectionPool.get_pool_metrics()
def reset_connection_pool_metrics() -> None:
"""
Reset connection pool metrics.
"""
PostgreSQLConnectionPool.reset_metrics()