Spaces:
Sleeping
Sleeping
File size: 7,239 Bytes
e327f0d f4bb922 e327f0d f4bb922 e327f0d d0ce1e8 e327f0d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 | """
backend/database.py
PostgreSQL + SQLAlchemy 2.0 async engine ve session factory.
Diger modullerin (auth, ml_service, worker, main, ws) kullanacagi temel altyapi:
from database import Base, get_db, AsyncSessionLocal, engine
Driver: asyncpg
Pool: production-grade defaults (pool_size=10, max_overflow=20, pool_pre_ping)
URL: DATABASE_URL ortam degiskeninden. Eski (sync) DATABASE_URL "postgresql://"
ile baslarsa otomatik olarak "postgresql+asyncpg://"a normalize edilir.
Not: Bu surum sync (psycopg2) helper'larini Pilot/MVP'den kaldirdi.
Eski endpoint'ler ORM uzerinden async olarak yeniden yazilmalidir.
"""
from __future__ import annotations
import logging
import os
import socket
from contextlib import asynccontextmanager
from typing import AsyncIterator
# ---------------------------------------------------------------------------
# IPv4-only DNS resolution (Render + Supabase compatibility shim)
# ---------------------------------------------------------------------------
# Render free tier has no IPv6 routes. Supabase publishes AAAA records (and
# the new shared pooler hosts publish BOTH A and AAAA). asyncpg's connect
# pipeline calls asyncio.loop.getaddrinfo with family=0 (AF_UNSPEC), which
# returns AAAA first on dual-stack hosts; the IPv6 connect attempt then
# fails with `OSError: [Errno 101] Network is unreachable` and asyncpg
# bubbles up the IPv6 error instead of falling back to A.
#
# Setting FORCE_IPV4=1 (default in Render env) hard-pins getaddrinfo to
# AF_INET so every DNS lookup — asyncpg, psycopg2, requests, boto3 — only
# sees IPv4. This is the smallest patch that makes Supabase work on Render
# without paying for the $4/mo IPv4 add-on.
if os.getenv("FORCE_IPV4", "1") == "1":
_orig_getaddrinfo = socket.getaddrinfo
def _ipv4_only_getaddrinfo(host, port, family=0, *args, **kwargs):
return _orig_getaddrinfo(host, port, socket.AF_INET, *args, **kwargs)
socket.getaddrinfo = _ipv4_only_getaddrinfo
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
try:
# Backend kendi paketi olarak import edilirken
from config import settings # type: ignore
_DEFAULT_URL = getattr(
settings,
"database_url_async",
getattr(settings, "database_url", "postgresql+asyncpg://postgres:postgres@db:5432/arac_hasar"),
)
except Exception: # pragma: no cover - alembic context'inden import edildiginde
_DEFAULT_URL = os.getenv(
"DATABASE_URL_ASYNC",
os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@db:5432/arac_hasar",
),
)
logger = logging.getLogger(__name__)
def _normalize_async_url(url: str) -> str:
"""`postgresql://` -> `postgresql+asyncpg://` (defansif normalizasyon)."""
if url.startswith("postgresql://"):
return "postgresql+asyncpg://" + url[len("postgresql://") :]
if url.startswith("postgres://"):
return "postgresql+asyncpg://" + url[len("postgres://") :]
return url
DATABASE_URL: str = _normalize_async_url(_DEFAULT_URL)
# ---------------- Declarative base ----------------
class Base(DeclarativeBase):
"""Tum ORM modelleri bu base'i miras alir.
SQLAlchemy 2.0 style declarative base.
`db_models.py` icindeki modeller burayi import eder.
"""
# ---------------- Engine + session factory ----------------
# pool_pre_ping=True: kopan baglantilari (Supabase pooler, kontainer restart)
# yakalar; ilk SELECT 1 testi yapar.
# pool_recycle=1800: 30 dk uzerindeki connection'lari geri donusturur.
# pool_size + max_overflow: pilot trafik icin makul; observability'den izleyip
# gerekirse ENV ile artirilir.
#
# Supabase / PgBouncer (transaction mode) notu:
# - Transaction pooler arkasinda calisirken `DB_USE_PGBOUNCER=true` set edilmeli.
# - Bu durumda asyncpg statement cache devre disi (prepared statement sorunu)
# ve pool_size kucuk tutulur (PgBouncer kendi pool'unu yonetir).
_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "10"))
_MAX_OVERFLOW = int(os.getenv("DB_MAX_OVERFLOW", "20"))
_POOL_TIMEOUT = int(os.getenv("DB_POOL_TIMEOUT", "30"))
_POOL_RECYCLE = int(os.getenv("DB_POOL_RECYCLE", "1800"))
_SQL_ECHO = os.getenv("DB_ECHO", "false").lower() == "true"
# Auto-detect: any Supabase pooler URL (port 6543 or "pooler.supabase.com" in host)
# implies pgBouncer transaction mode → prepared statements MUST be disabled.
# Explicit DB_USE_PGBOUNCER=true still wins for non-Supabase pgBouncer setups.
_USE_PGBOUNCER = (
os.getenv("DB_USE_PGBOUNCER", "false").lower() == "true"
or ":6543/" in DATABASE_URL
or "pooler.supabase.com" in DATABASE_URL
)
def _create_engine(url: str = DATABASE_URL) -> AsyncEngine:
kwargs: dict = dict(
echo=_SQL_ECHO,
pool_size=_POOL_SIZE,
max_overflow=_MAX_OVERFLOW,
pool_timeout=_POOL_TIMEOUT,
pool_recycle=_POOL_RECYCLE,
pool_pre_ping=True,
future=True,
)
if _USE_PGBOUNCER:
# asyncpg: PgBouncer transaction mode'da prepared statement cache kapali olmali.
kwargs["connect_args"] = {
"statement_cache_size": 0,
"prepared_statement_cache_size": 0,
}
return create_async_engine(url, **kwargs)
engine: AsyncEngine = _create_engine()
AsyncSessionLocal: async_sessionmaker[AsyncSession] = async_sessionmaker(
bind=engine,
expire_on_commit=False,
autoflush=False,
autocommit=False,
class_=AsyncSession,
)
# ---------------- FastAPI dependency ----------------
async def get_db() -> AsyncIterator[AsyncSession]:
"""FastAPI dependency. Endpoint imzasinda kullan:
from database import get_db
@router.get("/...")
async def handler(db: AsyncSession = Depends(get_db)):
...
Rollback on exception, commit'i caller yapmali (explicit).
"""
async with AsyncSessionLocal() as session:
try:
yield session
except Exception:
await session.rollback()
raise
finally:
await session.close()
@asynccontextmanager
async def session_scope() -> AsyncIterator[AsyncSession]:
"""Worker/script icin context manager. Otomatik commit + rollback.
async with session_scope() as db:
db.add(obj)
# commit otomatik (exception yoksa)
"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
# ---------------- Healthcheck ----------------
async def ping() -> bool:
"""SELECT 1 — readiness probe icin."""
from sqlalchemy import text
try:
async with engine.connect() as conn:
result = await conn.execute(text("SELECT 1"))
return result.scalar() == 1
except Exception as exc: # pragma: no cover
logger.warning("DB ping failed: %s", exc)
return False
__all__ = [
"Base",
"DATABASE_URL",
"engine",
"AsyncSessionLocal",
"get_db",
"session_scope",
"ping",
]
|