# MukeshKapoor25 - Initial commit (b143975)
"""
PostgreSQL connection pool management for Analytics Microservice.
Adapted from cuatrolabs-scm-ms/app/postgres.py.
"""
import ssl
import asyncpg
from typing import Optional
from app.core.logging import get_logger
from app.core.config import settings
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
class PostgreSQLConnectionPool:
    """Holder for one shared ``asyncpg`` connection pool.

    The pool lives on the class itself, so all callers share the same
    instance. Call :meth:`initialize` once at startup, :meth:`get_pool`
    to obtain the pool, and :meth:`close` at shutdown.
    """

    # Shared pool; ``None`` until ``initialize`` succeeds and again after ``close``.
    _pool: Optional[asyncpg.Pool] = None

    @classmethod
    async def initialize(cls) -> None:
        """Create the shared pool and verify it with a ``SELECT 1`` probe.

        When a pool is already present, only a warning is logged. The new
        pool is published on the class only after the probe query succeeds;
        if the probe fails, the pool is terminated so that a later call can
        retry from a clean state instead of finding a broken pool.

        Raises:
            Exception: Whatever pool creation or the probe query raised,
                re-raised unchanged after being logged.
        """
        if cls._pool is not None:
            logger.warning("PostgreSQL pool already initialized")
            return

        try:
            ssl_context = None
            mode = (settings.POSTGRES_SSL_MODE or "disable").lower()
            if mode != "disable":
                # NOTE(review): every mode other than "disable" gets an
                # encrypted connection with hostname checking off and
                # CERT_NONE, i.e. the server certificate is never verified.
                # Confirm this is intended for any "verify-*" style modes.
                ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_NONE

            pool = await asyncpg.create_pool(
                host=settings.POSTGRES_HOST,
                port=settings.POSTGRES_PORT,
                database=settings.POSTGRES_DB,
                user=settings.POSTGRES_USER,
                password=settings.POSTGRES_PASSWORD,
                min_size=settings.POSTGRES_MIN_POOL_SIZE,
                max_size=settings.POSTGRES_MAX_POOL_SIZE,
                command_timeout=30.0,
                timeout=30.0,
                ssl=ssl_context,
            )

            try:
                # Round-trip a trivial query to prove the pool is usable.
                async with pool.acquire() as conn:
                    await conn.fetchval("SELECT 1")
            except BaseException:
                # Do not leak the connections of a pool that failed its
                # probe (or whose probe was cancelled).
                pool.terminate()
                raise

            # Publish only a pool that passed the probe.
            cls._pool = pool
            logger.info("PostgreSQL pool initialized", extra={
                "event": "postgres_connected",
                "host": settings.POSTGRES_HOST,
                "db": settings.POSTGRES_DB,
            })
        except Exception as e:
            logger.error("Failed to initialize PostgreSQL pool", exc_info=e)
            raise

    @classmethod
    async def close(cls) -> None:
        """Close the shared pool if one exists; otherwise do nothing.

        The class-level reference is cleared before the pool is closed, so
        a failing or cancelled ``close()`` cannot leave a closing pool
        published for ``get_pool`` callers or block re-initialization.
        """
        pool = cls._pool
        if pool is not None:
            cls._pool = None
            await pool.close()
            logger.info("PostgreSQL pool closed", extra={"event": "postgres_disconnected"})

    @classmethod
    def get_pool(cls) -> asyncpg.Pool:
        """Return the shared pool.

        Raises:
            RuntimeError: If no pool is currently initialized.
        """
        if cls._pool is None:
            raise RuntimeError("PostgreSQL pool not initialized. Call connect_to_postgres() first.")
        return cls._pool
async def connect_to_postgres() -> None:
    """Initialize the shared pool by awaiting ``PostgreSQLConnectionPool.initialize()``."""
    await PostgreSQLConnectionPool.initialize()
async def close_postgres_connection() -> None:
    """Close the shared pool by awaiting ``PostgreSQLConnectionPool.close()``."""
    await PostgreSQLConnectionPool.close()
def get_postgres_pool() -> asyncpg.Pool:
    """Return the result of ``PostgreSQLConnectionPool.get_pool()``."""
    return PostgreSQLConnectionPool.get_pool()