File size: 2,973 Bytes
7f9f4b4 09f05a1 f4a6ef3 60538f4 f4a6ef3 60538f4 7f9f4b4 60538f4 7f9f4b4 60538f4 7f9f4b4 60538f4 7f9f4b4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 | """
PostgreSQL async connection manager with retry logic.
"""
import asyncio
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlalchemy import text
from app.core.config import settings
from app.core.logging import get_logger
logger = get_logger(__name__)
async def create_pg_engine(max_retries: int = 3) -> AsyncEngine:
"""
Create async PostgreSQL engine with connection pooling and retry logic.
Args:
max_retries: Maximum number of connection attempts (default: 3)
Returns:
AsyncEngine: SQLAlchemy async engine instance
Raises:
Exception: If connection fails after all retry attempts
"""
pg_url = settings.DATABASE_URL
# Add SSL mode to connection args based on DB_SSLMODE setting
connect_args = {}
if settings.DB_SSLMODE == "require":
connect_args["ssl"] = "require"
elif settings.DB_SSLMODE == "prefer":
import ssl as _ssl
_ssl_ctx = _ssl.create_default_context()
_ssl_ctx.check_hostname = False
_ssl_ctx.verify_mode = _ssl.CERT_NONE
connect_args["ssl"] = _ssl_ctx
base_delay = 1.0
for attempt in range(max_retries):
try:
logger.info(
f"Connecting to PostgreSQL: {settings.DB_HOST}:{settings.DB_PORT} "
f"(attempt {attempt + 1}/{max_retries})"
)
# Create engine with connection pooling from config
engine = create_async_engine(
pg_url,
echo=False,
pool_pre_ping=True, # Enable connection health checks
pool_size=settings.DB_MIN_POOL_SIZE,
max_overflow=settings.DB_MAX_POOL_SIZE - settings.DB_MIN_POOL_SIZE,
connect_args=connect_args
)
# Test connection
async with engine.begin() as conn:
await conn.execute(text("SELECT 1"))
logger.info("✅ Connected to PostgreSQL successfully")
return engine
except Exception as e:
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt) # Exponential backoff: 1s, 2s, 4s
logger.warning(
f"PostgreSQL connection failed, retrying in {delay}s",
extra={
"attempt": attempt + 1,
"max_retries": max_retries,
"error": str(e)
}
)
await asyncio.sleep(delay)
else:
logger.error(
f"PostgreSQL connection failed after {max_retries} attempts",
extra={"error": str(e)}
)
raise
# This should never be reached due to raise in the loop
raise Exception("Failed to create PostgreSQL engine")
|