| """ |
| PostgreSQL async connection manager with retry logic. |
| """ |
| import asyncio |
| from typing import Optional |
| from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine |
| from sqlalchemy import text |
|
|
| from app.core.config import settings |
| from app.core.logging import get_logger |
|
|
| logger = get_logger(__name__) |
|
|
|
|
async def create_pg_engine(max_retries: int = 3) -> AsyncEngine:
    """
    Create async PostgreSQL engine with connection pooling and retry logic.

    Each attempt builds a fresh engine and verifies it by running
    ``SELECT 1``. When an attempt fails, the engine created for that attempt
    is disposed so its connection pool is not leaked, and the next attempt
    is made after an exponentially growing delay (1s, 2s, 4s, ...).

    Args:
        max_retries: Maximum number of connection attempts (default: 3)

    Returns:
        AsyncEngine: SQLAlchemy async engine instance

    Raises:
        Exception: The error from the last attempt if every attempt fails,
            or a generic ``Exception`` if ``max_retries`` is less than 1
            (no attempt is made in that case).
    """
    pg_url = settings.DATABASE_URL

    connect_args = {}
    if settings.DB_SSLMODE == "require":
        connect_args["ssl"] = "require"
    elif settings.DB_SSLMODE == "prefer":
        import ssl as _ssl

        # Hostname checking and certificate verification are switched off
        # below, so the link is encrypted but the server is NOT authenticated.
        # NOTE(review): confirm this is the intended meaning of "prefer".
        _ssl_ctx = _ssl.create_default_context()
        _ssl_ctx.check_hostname = False
        _ssl_ctx.verify_mode = _ssl.CERT_NONE
        connect_args["ssl"] = _ssl_ctx

    base_delay = 1.0

    # A negative max_overflow means "no overflow limit" to SQLAlchemy's
    # QueuePool; clamp so inverted MIN/MAX settings cannot remove the cap.
    max_overflow = max(0, settings.DB_MAX_POOL_SIZE - settings.DB_MIN_POOL_SIZE)

    for attempt in range(max_retries):
        # Reset per attempt so the handler knows whether there is an engine
        # to clean up (create_async_engine itself may raise).
        engine = None
        try:
            logger.info(
                f"Connecting to PostgreSQL: {settings.DB_HOST}:{settings.DB_PORT} "
                f"(attempt {attempt + 1}/{max_retries})"
            )

            engine = create_async_engine(
                pg_url,
                echo=False,
                pool_pre_ping=True,
                pool_size=settings.DB_MIN_POOL_SIZE,
                max_overflow=max_overflow,
                connect_args=connect_args,
            )

            # Engine creation is lazy; force a real round trip to validate it.
            async with engine.begin() as conn:
                await conn.execute(text("SELECT 1"))

            logger.info("✅ Connected to PostgreSQL successfully")
            return engine

        except Exception as e:
            # Release the failed attempt's pool before retrying or re-raising;
            # otherwise every failed attempt leaks an engine.
            if engine is not None:
                try:
                    await engine.dispose()
                except Exception as dispose_error:
                    # Cleanup is best-effort: never let it mask the original
                    # connection error.
                    logger.warning(
                        "Failed to dispose PostgreSQL engine after connection error",
                        extra={"error": str(dispose_error)},
                    )

            if attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                logger.warning(
                    f"PostgreSQL connection failed, retrying in {delay}s",
                    extra={
                        "attempt": attempt + 1,
                        "max_retries": max_retries,
                        "error": str(e),
                    },
                )
                await asyncio.sleep(delay)
            else:
                logger.error(
                    f"PostgreSQL connection failed after {max_retries} attempts",
                    extra={"error": str(e)},
                )
                raise

    # Only reachable when the loop body never ran (max_retries < 1).
    raise Exception("Failed to create PostgreSQL engine")
|
|