File size: 2,537 Bytes
b143975
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
PostgreSQL connection pool management for Analytics Microservice.
Adapted from cuatrolabs-scm-ms/app/postgres.py.
"""
import ssl
import asyncpg
from typing import Optional
from app.core.logging import get_logger
from app.core.config import settings

logger = get_logger(__name__)


class PostgreSQLConnectionPool:
    """Class-level holder for one shared ``asyncpg`` connection pool.

    The pool lives in the class attribute ``_pool`` and is managed entirely
    through classmethods, so every caller in the process sees the same pool.
    """

    # Shared pool: ``None`` until ``initialize()`` succeeds, and ``None``
    # again after ``close()``.
    _pool: Optional[asyncpg.Pool] = None

    @classmethod
    async def initialize(cls) -> None:
        """Create the shared pool from ``settings`` and probe it with ``SELECT 1``.

        If a pool is already set, a warning is logged and nothing else happens.
        The new pool is published on the class only after the probe query
        succeeds. If creation or the probe fails, the partially set-up pool is
        terminated so its connections are not leaked and a later call can
        retry from a clean state.

        Raises:
            Exception: Whatever pool creation or the probe query raised; it is
                logged and then re-raised unchanged.
        """
        if cls._pool is not None:
            logger.warning("PostgreSQL pool already initialized")
            return

        pool: Optional[asyncpg.Pool] = None
        try:
            ssl_context = None
            mode = (settings.POSTGRES_SSL_MODE or "disable").lower()
            if mode != "disable":
                # NOTE(review): every mode other than "disable" gets an
                # encrypted connection WITHOUT certificate or hostname
                # verification. Confirm this is intended before relying on a
                # stricter POSTGRES_SSL_MODE value to authenticate the server.
                ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_NONE

            pool = await asyncpg.create_pool(
                host=settings.POSTGRES_HOST,
                port=settings.POSTGRES_PORT,
                database=settings.POSTGRES_DB,
                user=settings.POSTGRES_USER,
                password=settings.POSTGRES_PASSWORD,
                min_size=settings.POSTGRES_MIN_POOL_SIZE,
                max_size=settings.POSTGRES_MAX_POOL_SIZE,
                command_timeout=30.0,
                timeout=30.0,
                ssl=ssl_context,
            )
            # Round-trip a trivial query so a broken connection is detected
            # here rather than on the first real request.
            async with pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
        except Exception as e:
            logger.error("Failed to initialize PostgreSQL pool", exc_info=e)
            if pool is not None:
                # The pool was created but the probe failed: drop it instead
                # of leaving a half-initialized pool published on the class.
                pool.terminate()
            raise

        cls._pool = pool
        logger.info("PostgreSQL pool initialized", extra={
            "event": "postgres_connected",
            "host": settings.POSTGRES_HOST,
            "db": settings.POSTGRES_DB,
        })

    @classmethod
    async def close(cls) -> None:
        """Close the shared pool, if one exists, and clear the class reference.

        Does nothing when no pool is set. The reference is cleared even if
        closing raises, so a failed close never leaves a stale pool behind.
        """
        pool = cls._pool
        if pool is None:
            return
        try:
            await pool.close()
        finally:
            cls._pool = None
        logger.info("PostgreSQL pool closed", extra={"event": "postgres_disconnected"})

    @classmethod
    def get_pool(cls) -> asyncpg.Pool:
        """Return the shared pool.

        Raises:
            RuntimeError: If no pool is currently set on the class.
        """
        if cls._pool is None:
            raise RuntimeError("PostgreSQL pool not initialized. Call connect_to_postgres() first.")
        return cls._pool


async def connect_to_postgres() -> None:
    """Initialize the shared pool by awaiting ``PostgreSQLConnectionPool.initialize()``."""
    initialize_pool = PostgreSQLConnectionPool.initialize
    await initialize_pool()


async def close_postgres_connection() -> None:
    """Shut down the shared pool by awaiting ``PostgreSQLConnectionPool.close()``."""
    close_pool = PostgreSQLConnectionPool.close
    await close_pool()


def get_postgres_pool() -> asyncpg.Pool:
    """Return the pool handed back by ``PostgreSQLConnectionPool.get_pool()``."""
    shared_pool = PostgreSQLConnectionPool.get_pool()
    return shared_pool