""" database/db.py — Async SQLAlchemy engine and session factory. ═══════════════════════════════════════════════════════════════════ ROOT CAUSE (why previous fixes failed): ═══════════════════════════════════════════════════════════════════ Even with NullPool + statement_cache_size=0 in connect_args, SQLAlchemy's asyncpg *dialect* runs this internally during connection setup: [SQL: select pg_catalog.version()] It executes this as a prepared statement using its OWN internal mechanism — BEFORE our connect_args are applied to the session. PgBouncer sees the second client try to prepare the same statement on the recycled connection and raises: DuplicatePreparedStatementError: prepared statement "__asyncpg_stmt_2__" already exists ═══════════════════════════════════════════════════════════════════ THE DEFINITIVE FIX — async_creator: ═══════════════════════════════════════════════════════════════════ SQLAlchemy 2.x supports an `async_creator` parameter. When provided, SQLAlchemy skips its entire dialect connection bootstrap and uses our raw asyncpg connection directly. We create that connection with statement_cache_size=0, so no prepared statements are EVER created — the conflict is impossible. This is the only approach that fully resolves the issue with Supabase Transaction Pooler (PgBouncer in transaction mode). """ import ssl as ssl_module import asyncpg from sqlalchemy.ext.asyncio import ( AsyncSession, create_async_engine, async_sessionmaker, ) from sqlalchemy.pool import NullPool from config import config from database.models import Base # ── Build a plain DSN for asyncpg (no SQLAlchemy dialect prefix) ────────── # asyncpg.connect() expects "postgresql://..." not "postgresql+asyncpg://..." _PLAIN_DSN: str = config.DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://") # Build an SSL context that requires SSL but doesn't verify the cert # (Supabase's cert chain can vary by region; this keeps it simple). _SSL_CTX = ssl_module.create_default_context() _SSL_CTX.check_hostname = False _SSL_CTX.verify_mode = ssl_module.CERT_NONE async def _asyncpg_creator() -> asyncpg.Connection: """ Raw asyncpg connection factory passed to SQLAlchemy as async_creator. Key parameters: statement_cache_size=0 — disables ALL prepared-statement caching inside asyncpg. This is the single setting that PgBouncer's own error message tells us to use. By using async_creator we guarantee it applies to *every* connection, including the internal dialect init queries. """ return await asyncpg.connect( dsn=_PLAIN_DSN, ssl=_SSL_CTX, statement_cache_size=0, # ← THE critical fix timeout=30, command_timeout=60, ) # ── Engine ──────────────────────────────────────────────────────────────── engine = create_async_engine( # The URL still needs to be a valid SQLAlchemy URL so the dialect is # loaded; async_creator overrides the actual connection creation. config.DATABASE_URL, # async_creator: bypass SQLAlchemy's dialect connection bootstrap. # Our _asyncpg_creator() is called for every new connection instead. async_creator=_asyncpg_creator, # NullPool: don't let SQLAlchemy pool connections. # PgBouncer (Supabase) is the pool — two pools stacked = conflicts. poolclass=NullPool, echo=False, # Set True temporarily to log SQL queries while debugging ) # ── Session factory ─────────────────────────────────────────────────────── AsyncSessionFactory = async_sessionmaker( bind=engine, class_=AsyncSession, expire_on_commit=False, autoflush=True, ) # ── Public helpers ──────────────────────────────────────────────────────── async def init_db() -> None: """Create all tables defined in models.py (idempotent — safe to re-run).""" async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) print("✅ Database tables initialized.") async def get_session() -> AsyncSession: """ Async context-manager that yields a DB session. Usage (in every crud/handler function): async with AsyncSessionFactory() as session: result = await session.execute(...) """ async with AsyncSessionFactory() as session: yield session