""" PostgreSQL async connection manager with retry logic. """ import asyncio from typing import Optional from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from sqlalchemy import text from app.core.config import settings from app.core.logging import get_logger logger = get_logger(__name__) async def create_pg_engine(max_retries: int = 3) -> AsyncEngine: """ Create async PostgreSQL engine with connection pooling and retry logic. Args: max_retries: Maximum number of connection attempts (default: 3) Returns: AsyncEngine: SQLAlchemy async engine instance Raises: Exception: If connection fails after all retry attempts """ pg_url = settings.DATABASE_URL # Add SSL mode to connection args based on DB_SSLMODE setting connect_args = {} if settings.DB_SSLMODE == "require": connect_args["ssl"] = "require" elif settings.DB_SSLMODE == "prefer": import ssl as _ssl _ssl_ctx = _ssl.create_default_context() _ssl_ctx.check_hostname = False _ssl_ctx.verify_mode = _ssl.CERT_NONE connect_args["ssl"] = _ssl_ctx base_delay = 1.0 for attempt in range(max_retries): try: logger.info( f"Connecting to PostgreSQL: {settings.DB_HOST}:{settings.DB_PORT} " f"(attempt {attempt + 1}/{max_retries})" ) # Create engine with connection pooling from config engine = create_async_engine( pg_url, echo=False, pool_pre_ping=True, # Enable connection health checks pool_size=settings.DB_MIN_POOL_SIZE, max_overflow=settings.DB_MAX_POOL_SIZE - settings.DB_MIN_POOL_SIZE, connect_args=connect_args ) # Test connection async with engine.begin() as conn: await conn.execute(text("SELECT 1")) logger.info("✅ Connected to PostgreSQL successfully") return engine except Exception as e: if attempt < max_retries - 1: delay = base_delay * (2 ** attempt) # Exponential backoff: 1s, 2s, 4s logger.warning( f"PostgreSQL connection failed, retrying in {delay}s", extra={ "attempt": attempt + 1, "max_retries": max_retries, "error": str(e) } ) await asyncio.sleep(delay) else: logger.error( f"PostgreSQL connection failed after {max_retries} attempts", extra={"error": str(e)} ) raise # This should never be reached due to raise in the loop raise Exception("Failed to create PostgreSQL engine")