File size: 7,239 Bytes
e327f0d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f4bb922
e327f0d
 
 
f4bb922
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e327f0d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d0ce1e8
 
 
 
 
 
 
 
e327f0d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
"""
backend/database.py
PostgreSQL + SQLAlchemy 2.0 async engine ve session factory.

Diger modullerin (auth, ml_service, worker, main, ws) kullanacagi temel altyapi:
    from database import Base, get_db, AsyncSessionLocal, engine

Driver: asyncpg
Pool: production-grade defaults (pool_size=10, max_overflow=20, pool_pre_ping)
URL: DATABASE_URL ortam degiskeninden. Eski (sync) DATABASE_URL "postgresql://"
ile baslarsa otomatik olarak "postgresql+asyncpg://"a normalize edilir.

Not: Bu surum sync (psycopg2) helper'larini Pilot/MVP'den kaldirdi.
Eski endpoint'ler ORM uzerinden async olarak yeniden yazilmalidir.
"""
from __future__ import annotations

import logging
import os
import socket
from contextlib import asynccontextmanager
from typing import AsyncIterator

# ---------------------------------------------------------------------------
# IPv4-only DNS resolution (Render + Supabase compatibility shim)
# ---------------------------------------------------------------------------
# Render free tier has no IPv6 routes. Supabase publishes AAAA records (and
# the new shared pooler hosts publish BOTH A and AAAA). asyncpg's connect
# pipeline calls asyncio.loop.getaddrinfo with family=0 (AF_UNSPEC), which
# returns AAAA first on dual-stack hosts; the IPv6 connect attempt then
# fails with `OSError: [Errno 101] Network is unreachable` and asyncpg
# bubbles up the IPv6 error instead of falling back to A.
#
# Setting FORCE_IPV4=1 (default in Render env) hard-pins getaddrinfo to
# AF_INET so every DNS lookup — asyncpg, psycopg2, requests, boto3 — only
# sees IPv4. This is the smallest patch that makes Supabase work on Render
# without paying for the $4/mo IPv4 add-on.
if os.getenv("FORCE_IPV4", "1") == "1":
    _orig_getaddrinfo = socket.getaddrinfo

    def _ipv4_only_getaddrinfo(host, port, family=0, *args, **kwargs):
        return _orig_getaddrinfo(host, port, socket.AF_INET, *args, **kwargs)

    socket.getaddrinfo = _ipv4_only_getaddrinfo

from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase

try:
    # Backend kendi paketi olarak import edilirken
    from config import settings  # type: ignore
    _DEFAULT_URL = getattr(
        settings,
        "database_url_async",
        getattr(settings, "database_url", "postgresql+asyncpg://postgres:postgres@db:5432/arac_hasar"),
    )
except Exception:  # pragma: no cover - alembic context'inden import edildiginde
    _DEFAULT_URL = os.getenv(
        "DATABASE_URL_ASYNC",
        os.getenv(
            "DATABASE_URL",
            "postgresql+asyncpg://postgres:postgres@db:5432/arac_hasar",
        ),
    )


logger = logging.getLogger(__name__)


def _normalize_async_url(url: str) -> str:
    """`postgresql://` -> `postgresql+asyncpg://` (defansif normalizasyon)."""
    if url.startswith("postgresql://"):
        return "postgresql+asyncpg://" + url[len("postgresql://") :]
    if url.startswith("postgres://"):
        return "postgresql+asyncpg://" + url[len("postgres://") :]
    return url


DATABASE_URL: str = _normalize_async_url(_DEFAULT_URL)


# ---------------- Declarative base ----------------

class Base(DeclarativeBase):
    """Tum ORM modelleri bu base'i miras alir.

    SQLAlchemy 2.0 style declarative base.
    `db_models.py` icindeki modeller burayi import eder.
    """


# ---------------- Engine + session factory ----------------

# pool_pre_ping=True: kopan baglantilari (Supabase pooler, kontainer restart)
#   yakalar; ilk SELECT 1 testi yapar.
# pool_recycle=1800: 30 dk uzerindeki connection'lari geri donusturur.
# pool_size + max_overflow: pilot trafik icin makul; observability'den izleyip
#   gerekirse ENV ile artirilir.
#
# Supabase / PgBouncer (transaction mode) notu:
#   - Transaction pooler arkasinda calisirken `DB_USE_PGBOUNCER=true` set edilmeli.
#   - Bu durumda asyncpg statement cache devre disi (prepared statement sorunu)
#     ve pool_size kucuk tutulur (PgBouncer kendi pool'unu yonetir).
_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "10"))
_MAX_OVERFLOW = int(os.getenv("DB_MAX_OVERFLOW", "20"))
_POOL_TIMEOUT = int(os.getenv("DB_POOL_TIMEOUT", "30"))
_POOL_RECYCLE = int(os.getenv("DB_POOL_RECYCLE", "1800"))
_SQL_ECHO = os.getenv("DB_ECHO", "false").lower() == "true"
# Auto-detect: any Supabase pooler URL (port 6543 or "pooler.supabase.com" in host)
# implies pgBouncer transaction mode → prepared statements MUST be disabled.
# Explicit DB_USE_PGBOUNCER=true still wins for non-Supabase pgBouncer setups.
_USE_PGBOUNCER = (
    os.getenv("DB_USE_PGBOUNCER", "false").lower() == "true"
    or ":6543/" in DATABASE_URL
    or "pooler.supabase.com" in DATABASE_URL
)


def _create_engine(url: str = DATABASE_URL) -> AsyncEngine:
    kwargs: dict = dict(
        echo=_SQL_ECHO,
        pool_size=_POOL_SIZE,
        max_overflow=_MAX_OVERFLOW,
        pool_timeout=_POOL_TIMEOUT,
        pool_recycle=_POOL_RECYCLE,
        pool_pre_ping=True,
        future=True,
    )
    if _USE_PGBOUNCER:
        # asyncpg: PgBouncer transaction mode'da prepared statement cache kapali olmali.
        kwargs["connect_args"] = {
            "statement_cache_size": 0,
            "prepared_statement_cache_size": 0,
        }
    return create_async_engine(url, **kwargs)


engine: AsyncEngine = _create_engine()

AsyncSessionLocal: async_sessionmaker[AsyncSession] = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
    autoflush=False,
    autocommit=False,
    class_=AsyncSession,
)


# ---------------- FastAPI dependency ----------------

async def get_db() -> AsyncIterator[AsyncSession]:
    """FastAPI dependency. Endpoint imzasinda kullan:

        from database import get_db

        @router.get("/...")
        async def handler(db: AsyncSession = Depends(get_db)):
            ...

    Rollback on exception, commit'i caller yapmali (explicit).
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()


@asynccontextmanager
async def session_scope() -> AsyncIterator[AsyncSession]:
    """Worker/script icin context manager. Otomatik commit + rollback.

        async with session_scope() as db:
            db.add(obj)
            # commit otomatik (exception yoksa)
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()


# ---------------- Healthcheck ----------------

async def ping() -> bool:
    """SELECT 1 — readiness probe icin."""
    from sqlalchemy import text

    try:
        async with engine.connect() as conn:
            result = await conn.execute(text("SELECT 1"))
            return result.scalar() == 1
    except Exception as exc:  # pragma: no cover
        logger.warning("DB ping failed: %s", exc)
        return False


__all__ = [
    "Base",
    "DATABASE_URL",
    "engine",
    "AsyncSessionLocal",
    "get_db",
    "session_scope",
    "ping",
]