File size: 2,973 Bytes
7f9f4b4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
09f05a1
 
f4a6ef3
60538f4
 
 
f4a6ef3
 
 
 
 
 
60538f4
7f9f4b4
 
 
 
 
60538f4
7f9f4b4
 
 
60538f4
7f9f4b4
 
 
 
60538f4
 
 
7f9f4b4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""
PostgreSQL async connection manager with retry logic.
"""
import asyncio
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlalchemy import text

from app.core.config import settings
from app.core.logging import get_logger

# Module-level logger, named after this module and obtained from the
# project's logging helper (app.core.logging.get_logger).
logger = get_logger(__name__)


async def create_pg_engine(max_retries: int = 3) -> AsyncEngine:
    """
    Create async PostgreSQL engine with connection pooling and retry logic.

    The engine is built from ``settings.DATABASE_URL`` and verified with a
    ``SELECT 1`` round trip before it is returned. If building or verifying
    the engine fails, the half-initialised engine is disposed, and the
    attempt is repeated after an exponentially growing delay (1s, 2s, 4s, ...).

    Args:
        max_retries: Maximum number of connection attempts (default: 3).
            Must be at least 1.

    Returns:
        AsyncEngine: SQLAlchemy async engine instance whose connectivity has
        been verified.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The error raised by the last attempt, re-raised unchanged,
            if the connection fails after all retry attempts.
    """
    if max_retries < 1:
        # Without this guard the loop below would be skipped entirely and the
        # caller would get a misleading "failed to create engine" error even
        # though no connection was ever attempted.
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    pg_url = settings.DATABASE_URL

    # Add SSL mode to connection args based on DB_SSLMODE setting.
    # Any value other than "require" / "prefer" leaves connect_args empty.
    connect_args = {}
    if settings.DB_SSLMODE == "require":
        connect_args["ssl"] = "require"
    elif settings.DB_SSLMODE == "prefer":
        import ssl as _ssl
        # Encrypt the connection but skip certificate and hostname
        # verification.
        # NOTE(review): handing the driver an explicit SSLContext presumably
        # makes TLS mandatory rather than "preferred" - confirm against the
        # driver in use.
        _ssl_ctx = _ssl.create_default_context()
        _ssl_ctx.check_hostname = False
        _ssl_ctx.verify_mode = _ssl.CERT_NONE
        connect_args["ssl"] = _ssl_ctx

    base_delay = 1.0

    for attempt in range(max_retries):
        # Reset per attempt so the error path knows whether an engine exists
        # that needs cleaning up.
        engine = None
        try:
            logger.info(
                f"Connecting to PostgreSQL: {settings.DB_HOST}:{settings.DB_PORT} "
                f"(attempt {attempt + 1}/{max_retries})"
            )

            # Create engine with connection pooling from config
            engine = create_async_engine(
                pg_url,
                echo=False,
                pool_pre_ping=True,  # Enable connection health checks
                pool_size=settings.DB_MIN_POOL_SIZE,
                max_overflow=settings.DB_MAX_POOL_SIZE - settings.DB_MIN_POOL_SIZE,
                connect_args=connect_args,
            )

            # Test connection
            async with engine.begin() as conn:
                await conn.execute(text("SELECT 1"))

        except Exception as e:
            # Release the failed engine's pool before retrying or giving up;
            # otherwise every failed attempt leaves an undisposed engine behind.
            if engine is not None:
                try:
                    await engine.dispose()
                except Exception as dispose_error:
                    # Cleanup is best-effort: report it, but do not let it
                    # mask the original connection error.
                    logger.warning(
                        "Failed to dispose PostgreSQL engine after connection error",
                        extra={"error": str(dispose_error)},
                    )

            if attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)  # Exponential backoff: 1s, 2s, 4s
                logger.warning(
                    f"PostgreSQL connection failed, retrying in {delay}s",
                    extra={
                        "attempt": attempt + 1,
                        "max_retries": max_retries,
                        "error": str(e)
                    }
                )
                await asyncio.sleep(delay)
            else:
                logger.error(
                    f"PostgreSQL connection failed after {max_retries} attempts",
                    extra={"error": str(e)}
                )
                raise
        else:
            logger.info("✅ Connected to PostgreSQL successfully")
            return engine

    # Defensive guard: unreachable because the final attempt either returns
    # or re-raises inside the loop.
    raise Exception("Failed to create PostgreSQL engine")