Spaces:
Sleeping
Sleeping
| """ | |
| backend/database.py | |
| PostgreSQL + SQLAlchemy 2.0 async engine ve session factory. | |
| Diger modullerin (auth, ml_service, worker, main, ws) kullanacagi temel altyapi: | |
| from database import Base, get_db, AsyncSessionLocal, engine | |
| Driver: asyncpg | |
| Pool: production-grade defaults (pool_size=10, max_overflow=20, pool_pre_ping) | |
| URL: DATABASE_URL ortam degiskeninden. Eski (sync) DATABASE_URL "postgresql://" | |
| ile baslarsa otomatik olarak "postgresql+asyncpg://"a normalize edilir. | |
| Not: Bu surum sync (psycopg2) helper'larini Pilot/MVP'den kaldirdi. | |
| Eski endpoint'ler ORM uzerinden async olarak yeniden yazilmalidir. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import socket | |
| from contextlib import asynccontextmanager | |
| from typing import AsyncIterator | |
| # --------------------------------------------------------------------------- | |
| # IPv4-only DNS resolution (Render + Supabase compatibility shim) | |
| # --------------------------------------------------------------------------- | |
| # Render free tier has no IPv6 routes. Supabase publishes AAAA records (and | |
| # the new shared pooler hosts publish BOTH A and AAAA). asyncpg's connect | |
| # pipeline calls asyncio.loop.getaddrinfo with family=0 (AF_UNSPEC), which | |
| # returns AAAA first on dual-stack hosts; the IPv6 connect attempt then | |
| # fails with `OSError: [Errno 101] Network is unreachable` and asyncpg | |
| # bubbles up the IPv6 error instead of falling back to A. | |
| # | |
| # Setting FORCE_IPV4=1 (default in Render env) hard-pins getaddrinfo to | |
| # AF_INET so every DNS lookup — asyncpg, psycopg2, requests, boto3 — only | |
| # sees IPv4. This is the smallest patch that makes Supabase work on Render | |
| # without paying for the $4/mo IPv4 add-on. | |
if os.getenv("FORCE_IPV4", "1") == "1":
    # If this module is executed more than once (importlib.reload, or imported
    # under two different module names), socket.getaddrinfo may already be our
    # shim. Unwrap it so we always wrap the pristine resolver and never stack
    # wrappers on top of each other.
    _orig_getaddrinfo = getattr(
        socket.getaddrinfo, "_ipv4_shim_wrapped", socket.getaddrinfo
    )

    def _ipv4_only_getaddrinfo(host, port, family=0, *args, **kwargs):
        """Resolve ``host`` like ``socket.getaddrinfo`` but for IPv4 only.

        The caller's ``family`` argument is accepted for signature
        compatibility and deliberately ignored: every lookup is forced to
        ``socket.AF_INET``. All remaining positional/keyword arguments are
        forwarded unchanged to the original resolver.
        """
        return _orig_getaddrinfo(host, port, socket.AF_INET, *args, **kwargs)

    # Remember what we wrapped so a later re-execution can unwrap it.
    _ipv4_only_getaddrinfo._ipv4_shim_wrapped = _orig_getaddrinfo
    socket.getaddrinfo = _ipv4_only_getaddrinfo
| from sqlalchemy.ext.asyncio import ( | |
| AsyncEngine, | |
| AsyncSession, | |
| async_sessionmaker, | |
| create_async_engine, | |
| ) | |
| from sqlalchemy.orm import DeclarativeBase | |
# Last-resort connection URL used when neither the settings object nor the
# environment provides a non-empty value.
_FALLBACK_URL = "postgresql+asyncpg://postgres:postgres@db:5432/arac_hasar"

try:
    # Normal path: the backend's own settings object is importable.
    from config import settings  # type: ignore

    # Chain with `or` rather than nested getattr defaults so that an
    # attribute which exists but is None / "" falls through to the next
    # candidate instead of being used as the URL.
    _DEFAULT_URL = (
        getattr(settings, "database_url_async", None)
        or getattr(settings, "database_url", None)
        or _FALLBACK_URL
    )
except Exception:  # pragma: no cover - e.g. when imported from the alembic context
    # Settings unavailable: read the environment directly. An env var that is
    # set but empty is treated the same as an unset one.
    _DEFAULT_URL = (
        os.getenv("DATABASE_URL_ASYNC")
        or os.getenv("DATABASE_URL")
        or _FALLBACK_URL
    )

logger = logging.getLogger(__name__)
def _normalize_async_url(url: str) -> str:
    """Rewrite a legacy sync PostgreSQL URL to use the asyncpg driver.

    Handles ``postgresql://``, ``postgres://`` and the explicit sync driver
    form ``postgresql+psycopg2://``; each is replaced by
    ``postgresql+asyncpg://`` while the rest of the URL is kept verbatim.
    Surrounding whitespace (e.g. a trailing newline from an env/secret file)
    is stripped first. Any other URL is returned unchanged apart from that
    stripping.

    Args:
        url: Database URL as configured.

    Returns:
        The URL with an async-capable scheme where a known sync scheme was
        detected.
    """
    async_prefix = "postgresql+asyncpg://"
    legacy_prefixes = (
        "postgresql://",
        "postgres://",
        "postgresql+psycopg2://",
    )
    url = url.strip()
    for prefix in legacy_prefixes:
        if url.startswith(prefix):
            return async_prefix + url[len(prefix):]
    return url
# Effective connection URL for this module: the configured value after
# scheme normalization.
DATABASE_URL: str = _normalize_async_url(_DEFAULT_URL)


# ---------------- Declarative base ----------------
class Base(DeclarativeBase):
    """Common declarative base that every ORM model inherits from.

    Uses the SQLAlchemy 2.0 declarative style. The models in
    `db_models.py` import this class.
    """
| # ---------------- Engine + session factory ---------------- | |
| # pool_pre_ping=True: kopan baglantilari (Supabase pooler, kontainer restart) | |
| # yakalar; ilk SELECT 1 testi yapar. | |
| # pool_recycle=1800: 30 dk uzerindeki connection'lari geri donusturur. | |
| # pool_size + max_overflow: pilot trafik icin makul; observability'den izleyip | |
| # gerekirse ENV ile artirilir. | |
| # | |
| # Supabase / PgBouncer (transaction mode) notu: | |
| # - Transaction pooler arkasinda calisirken `DB_USE_PGBOUNCER=true` set edilmeli. | |
| # - Bu durumda asyncpg statement cache devre disi (prepared statement sorunu) | |
| # ve pool_size kucuk tutulur (PgBouncer kendi pool'unu yonetir). | |
# Pool sizing and timing knobs, each overridable through the environment.
_POOL_SIZE = int(os.environ.get("DB_POOL_SIZE", "10"))
_MAX_OVERFLOW = int(os.environ.get("DB_MAX_OVERFLOW", "20"))
_POOL_TIMEOUT = int(os.environ.get("DB_POOL_TIMEOUT", "30"))
_POOL_RECYCLE = int(os.environ.get("DB_POOL_RECYCLE", "1800"))

# SQL statement echoing is on only when DB_ECHO is literally "true"
# (case-insensitive).
_SQL_ECHO = os.environ.get("DB_ECHO", "false").lower() == "true"

# PgBouncer mode is enabled either explicitly (DB_USE_PGBOUNCER=true) or
# implicitly when the URL looks like a Supabase pooler endpoint: port 6543
# or a "pooler.supabase.com" host.
_PGBOUNCER_REQUESTED = os.environ.get("DB_USE_PGBOUNCER", "false").lower() == "true"
_POOLER_URL_DETECTED = any(
    marker in DATABASE_URL for marker in (":6543/", "pooler.supabase.com")
)
_USE_PGBOUNCER = _PGBOUNCER_REQUESTED or _POOLER_URL_DETECTED
def _create_engine(url: str = DATABASE_URL) -> AsyncEngine:
    """Build the async engine for ``url`` from the module-level pool settings.

    When ``_USE_PGBOUNCER`` is set, both ``statement_cache_size`` and
    ``prepared_statement_cache_size`` are passed as 0 through
    ``connect_args`` so no prepared statements are cached behind a
    transaction-mode pooler.

    Args:
        url: Connection URL; defaults to the module's ``DATABASE_URL``.

    Returns:
        The engine returned by ``create_async_engine``.
    """
    engine_options: dict = {
        "echo": _SQL_ECHO,
        "pool_size": _POOL_SIZE,
        "max_overflow": _MAX_OVERFLOW,
        "pool_timeout": _POOL_TIMEOUT,
        "pool_recycle": _POOL_RECYCLE,
        "pool_pre_ping": True,
        "future": True,
    }

    # Only add connect_args in PgBouncer mode; otherwise the key is omitted.
    driver_args = (
        {"statement_cache_size": 0, "prepared_statement_cache_size": 0}
        if _USE_PGBOUNCER
        else None
    )
    if driver_args is not None:
        engine_options["connect_args"] = driver_args

    return create_async_engine(url, **engine_options)
# Module-wide engine, created once at import time with the default URL.
engine: AsyncEngine = _create_engine()

# Session factory bound to the engine above. Sessions neither autoflush nor
# expire loaded attributes on commit.
AsyncSessionLocal: async_sessionmaker[AsyncSession] = async_sessionmaker(
    engine,
    class_=AsyncSession,
    autoflush=False,
    autocommit=False,
    expire_on_commit=False,
)
| # ---------------- FastAPI dependency ---------------- | |
async def get_db() -> AsyncIterator[AsyncSession]:
    """Yield a database session for use as a FastAPI dependency.

    Usage in an endpoint signature:

        from database import get_db

        @router.get("/...")
        async def handler(db: AsyncSession = Depends(get_db)):
            ...

    The session is rolled back if an exception propagates out of the
    consumer. Committing is NOT automatic; the caller must commit
    explicitly. The session is always closed afterwards.
    """
    async with AsyncSessionLocal() as db_session:
        try:
            yield db_session
        except Exception:
            # Undo pending work, then let the error continue to the caller.
            await db_session.rollback()
            raise
        finally:
            await db_session.close()
@asynccontextmanager
async def session_scope() -> AsyncIterator[AsyncSession]:
    """Async context manager for workers/scripts with automatic commit/rollback.

    Usage:

        async with session_scope() as db:
            db.add(obj)
            # committed automatically when the block exits without an exception

    The ``@asynccontextmanager`` decorator is what makes the ``async with``
    usage above work; a bare async generator does not implement the async
    context manager protocol.

    Yields:
        A session created by ``AsyncSessionLocal``.

    Raises:
        Any exception raised inside the ``async with`` body, re-raised after
        the session has been rolled back.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            # Reached only when the caller's block finished without raising.
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
| # ---------------- Healthcheck ---------------- | |
async def ping() -> bool:
    """Run ``SELECT 1`` against the engine; intended for readiness probes.

    Returns:
        True when the query result's scalar equals 1. False when any
        exception occurs; the failure is logged as a warning.
    """
    from sqlalchemy import text

    try:
        async with engine.connect() as connection:
            probe_result = await connection.execute(text("SELECT 1"))
            is_alive = probe_result.scalar() == 1
            return is_alive
    except Exception as exc:  # pragma: no cover
        logger.warning("DB ping failed: %s", exc)
        return False
# Public API of this module (what `from database import *` exposes).
__all__ = [
    "Base",
    "DATABASE_URL",
    "engine",
    "AsyncSessionLocal",
    "get_db",
    "session_scope",
    "ping",
]