# hasari-api / services / backend / database.py
# erdoganpeker's picture
# fix(deploy): auto-disable prepared statements on Supabase pooler
# d0ce1e8
"""
backend/database.py
PostgreSQL + SQLAlchemy 2.0 async engine ve session factory.
Diger modullerin (auth, ml_service, worker, main, ws) kullanacagi temel altyapi:
from database import Base, get_db, AsyncSessionLocal, engine
Driver: asyncpg
Pool: production-grade defaults (pool_size=10, max_overflow=20, pool_pre_ping)
URL: DATABASE_URL ortam degiskeninden. Eski (sync) DATABASE_URL "postgresql://"
ile baslarsa otomatik olarak "postgresql+asyncpg://"a normalize edilir.
Not: Bu surum sync (psycopg2) helper'larini Pilot/MVP'den kaldirdi.
Eski endpoint'ler ORM uzerinden async olarak yeniden yazilmalidir.
"""
from __future__ import annotations
import logging
import os
import socket
from contextlib import asynccontextmanager
from typing import AsyncIterator
# ---------------------------------------------------------------------------
# IPv4-only DNS resolution (Render + Supabase compatibility shim)
# ---------------------------------------------------------------------------
# Render free tier has no IPv6 routes. Supabase publishes AAAA records (and
# the new shared pooler hosts publish BOTH A and AAAA). asyncpg's connect
# pipeline calls asyncio.loop.getaddrinfo with family=0 (AF_UNSPEC), which
# returns AAAA first on dual-stack hosts; the IPv6 connect attempt then
# fails with `OSError: [Errno 101] Network is unreachable` and asyncpg
# bubbles up the IPv6 error instead of falling back to A.
#
# Setting FORCE_IPV4=1 (default in Render env) hard-pins getaddrinfo to
# AF_INET so every DNS lookup — asyncpg, psycopg2, requests, boto3 — only
# sees IPv4. This is the smallest patch that makes Supabase work on Render
# without paying for the $4/mo IPv4 add-on.
if os.environ.get("FORCE_IPV4", "1") == "1":
    # Keep a handle on the stock resolver so the wrapper can delegate to it.
    _orig_getaddrinfo = socket.getaddrinfo

    def _ipv4_only_getaddrinfo(host, port, family=0, *args, **kwargs):
        """Resolve ``host``/``port`` with the address family pinned to IPv4.

        The ``family`` parameter exists only to absorb whatever the caller
        passed (positionally or by keyword); its value is discarded and
        ``socket.AF_INET`` is sent to the original resolver instead. All
        remaining arguments are forwarded untouched.
        """
        return _orig_getaddrinfo(host, port, socket.AF_INET, *args, **kwargs)

    # Process-wide patch: every later socket.getaddrinfo call goes through
    # the IPv4-only wrapper.
    socket.getaddrinfo = _ipv4_only_getaddrinfo
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
# Last-resort connection URL used when neither the settings object nor the
# environment provides one.
_FALLBACK_DATABASE_URL = "postgresql+asyncpg://postgres:postgres@db:5432/arac_hasar"

try:
    # Available when the backend is imported as its own package.
    from config import settings  # type: ignore

    # Prefer the async URL attribute, then the plain one, then the fallback.
    _DEFAULT_URL = getattr(
        settings,
        "database_url_async",
        getattr(settings, "database_url", _FALLBACK_DATABASE_URL),
    )
except Exception:  # pragma: no cover - e.g. when imported from the alembic context
    # No usable settings object: read the same preference order from the
    # environment instead.
    _DEFAULT_URL = os.environ.get(
        "DATABASE_URL_ASYNC",
        os.environ.get("DATABASE_URL", _FALLBACK_DATABASE_URL),
    )

logger = logging.getLogger(__name__)
def _normalize_async_url(url: str) -> str:
"""`postgresql://` -> `postgresql+asyncpg://` (defansif normalizasyon)."""
if url.startswith("postgresql://"):
return "postgresql+asyncpg://" + url[len("postgresql://") :]
if url.startswith("postgres://"):
return "postgresql+asyncpg://" + url[len("postgres://") :]
return url
# Effective connection URL: the configured default passed through
# _normalize_async_url so the scheme names the asyncpg driver.
DATABASE_URL: str = _normalize_async_url(_DEFAULT_URL)
# ---------------- Declarative base ----------------
class Base(DeclarativeBase):
    """Declarative base that all ORM models inherit from.

    SQLAlchemy 2.0 style declarative base. Per the original note, the models
    in `db_models.py` import it from this module.
    """
# ---------------- Engine + session factory ----------------
# Engine options used when the engine is built:
#   pool_pre_ping=True      - checks a connection before handing it out, so
#                             connections dropped by the Supabase pooler or a
#                             container restart are detected.
#   pool_recycle            - connections older than DB_POOL_RECYCLE seconds
#                             (default 1800 = 30 minutes) are recycled.
#   pool_size/max_overflow  - defaults chosen for pilot traffic; watch
#                             observability data and raise them via ENV if
#                             needed.
#
# Supabase / PgBouncer (transaction mode) note:
#   - Behind a transaction pooler the asyncpg statement caches must be
#     disabled (prepared statement problem); _USE_PGBOUNCER controls that.
#   - PgBouncer manages its own pool, so a small pool is advisable there.
#     This module does not shrink the pool automatically; set DB_POOL_SIZE /
#     DB_MAX_OVERFLOW explicitly.
_POOL_SIZE = int(os.environ.get("DB_POOL_SIZE", "10"))
_MAX_OVERFLOW = int(os.environ.get("DB_MAX_OVERFLOW", "20"))
_POOL_TIMEOUT = int(os.environ.get("DB_POOL_TIMEOUT", "30"))
_POOL_RECYCLE = int(os.environ.get("DB_POOL_RECYCLE", "1800"))
_SQL_ECHO = os.environ.get("DB_ECHO", "false").lower() == "true"

# PgBouncer mode is on when DB_USE_PGBOUNCER=true is set explicitly (for
# non-Supabase PgBouncer setups), or when the URL looks like a Supabase pooler
# URL: it contains port 6543 followed by a path, or "pooler.supabase.com".
# Either marker implies transaction mode, where prepared statements MUST be
# disabled.
_USE_PGBOUNCER = os.environ.get("DB_USE_PGBOUNCER", "false").lower() == "true" or any(
    marker in DATABASE_URL for marker in (":6543/", "pooler.supabase.com")
)
def _create_engine(url: str = DATABASE_URL) -> AsyncEngine:
    """Build the async SQLAlchemy engine for ``url``.

    Pool sizing, timeouts, recycling and SQL echo come from the module-level
    settings. When ``_USE_PGBOUNCER`` is set, both the asyncpg statement
    cache and the SQLAlchemy prepared statement cache are sized to zero.

    Args:
        url: Connection URL; defaults to the module's ``DATABASE_URL``.

    Returns:
        A new ``AsyncEngine`` configured with the options above.
    """
    engine_options: dict = {
        "echo": _SQL_ECHO,
        "pool_size": _POOL_SIZE,
        "max_overflow": _MAX_OVERFLOW,
        "pool_timeout": _POOL_TIMEOUT,
        "pool_recycle": _POOL_RECYCLE,
        "pool_pre_ping": True,
        "future": True,
    }

    if _USE_PGBOUNCER:
        # asyncpg: the prepared statement caches must be off in PgBouncer
        # transaction mode.
        # NOTE(review): the SQLAlchemy asyncpg docs also describe
        # `prepared_statement_name_func` for PgBouncer setups; confirm against
        # the installed SQLAlchemy version whether it is needed here.
        engine_options["connect_args"] = dict(
            statement_cache_size=0,
            prepared_statement_cache_size=0,
        )

    return create_async_engine(url, **engine_options)
# Module-level engine, created once at import time from DATABASE_URL.
engine: AsyncEngine = _create_engine()

# Session factory bound to the module-level engine. Sessions do not autoflush
# and do not expire loaded objects on commit.
AsyncSessionLocal: async_sessionmaker[AsyncSession] = async_sessionmaker(
    engine,
    class_=AsyncSession,
    autoflush=False,
    autocommit=False,
    expire_on_commit=False,
)
# ---------------- FastAPI dependency ----------------
async def get_db() -> AsyncIterator[AsyncSession]:
    """Yield a database session for use as a FastAPI dependency.

    Use it in an endpoint signature::

        from database import get_db

        @router.get("/...")
        async def handler(db: AsyncSession = Depends(get_db)):
            ...

    If an exception is raised at the yield point the session is rolled back
    and the exception is re-raised. Nothing is committed here: the caller
    must commit explicitly.
    """
    async with AsyncSessionLocal() as db:
        try:
            yield db
        except Exception:
            # Undo any pending work before letting the error propagate.
            await db.rollback()
            raise
        finally:
            await db.close()
@asynccontextmanager
async def session_scope() -> AsyncIterator[AsyncSession]:
    """Context manager for workers and scripts with automatic commit/rollback.

    Example::

        async with session_scope() as db:
            db.add(obj)
            # committed automatically when the block exits without an error

    The session is committed after the ``with`` body finishes normally. If
    the body or the commit raises, the session is rolled back and the
    exception is re-raised. The session is closed in every case.
    """
    async with AsyncSessionLocal() as db:
        try:
            yield db
            # Reached only when the caller's block completed without raising.
            await db.commit()
        except Exception:
            await db.rollback()
            raise
        finally:
            await db.close()
# ---------------- Healthcheck ----------------
async def ping() -> bool:
    """Run ``SELECT 1`` against the engine; intended for readiness probes.

    Returns:
        True when the query returns the scalar 1. False when the scalar
        differs or when any exception occurs while connecting or querying
        (the failure is logged as a warning).
    """
    from sqlalchemy import text

    try:
        async with engine.connect() as connection:
            probe = await connection.execute(text("SELECT 1"))
            is_alive = probe.scalar() == 1
    except Exception as exc:  # pragma: no cover
        logger.warning("DB ping failed: %s", exc)
        return False
    return is_alive
# Names exported by `from database import *`; private helpers are left out.
__all__ = [
    "Base",
    "DATABASE_URL",
    "engine",
    "AsyncSessionLocal",
    "get_db",
    "session_scope",
    "ping",
]