Ahmad3g's picture
h
60b7790
"""
database/db.py — Async SQLAlchemy engine and session factory.
═══════════════════════════════════════════════════════════════════
ROOT CAUSE (why previous fixes failed):
═══════════════════════════════════════════════════════════════════
Even with NullPool + statement_cache_size=0 in connect_args,
SQLAlchemy's asyncpg *dialect* runs this internally during
connection setup:
[SQL: select pg_catalog.version()]
It executes this as a prepared statement using its OWN internal
mechanism — BEFORE our connect_args are applied to the session.
PgBouncer sees the second client try to prepare the same statement
on the recycled connection and raises:
DuplicatePreparedStatementError:
prepared statement "__asyncpg_stmt_2__" already exists
═══════════════════════════════════════════════════════════════════
THE DEFINITIVE FIX — async_creator:
═══════════════════════════════════════════════════════════════════
SQLAlchemy 2.x supports an `async_creator` parameter.
When provided, SQLAlchemy skips its entire dialect connection
bootstrap and uses our raw asyncpg connection directly.
We create that connection with statement_cache_size=0, so no
prepared statements are EVER created — the conflict is impossible.
This is the only approach that fully resolves the issue with
Supabase Transaction Pooler (PgBouncer in transaction mode).
"""
import ssl as ssl_module
from typing import AsyncIterator

import asyncpg
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.pool import NullPool

from config import config
from database.models import Base
# ── Build a plain DSN for asyncpg (no SQLAlchemy dialect prefix) ──────────
# asyncpg.connect() expects "postgresql://..." not "postgresql+asyncpg://..."
_PLAIN_DSN: str = config.DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://")
def _make_unverified_ssl_context() -> ssl_module.SSLContext:
    """Return a TLS context that encrypts traffic but skips certificate checks.

    Starts from ``ssl.create_default_context()`` and then turns off both
    hostname checking and certificate verification.
    """
    ctx = ssl_module.create_default_context()
    # Order matters: hostname checking has to be switched off before the
    # verify mode may be lowered to CERT_NONE.
    ctx.check_hostname = False
    ctx.verify_mode = ssl_module.CERT_NONE
    return ctx


# SSL is required, but the server certificate is deliberately NOT verified
# (Supabase's cert chain can vary by region; this keeps it simple).
# NOTE(review): with verification disabled the connection is encrypted but
# the server is not authenticated — confirm this trade-off is acceptable.
_SSL_CTX = _make_unverified_ssl_context()
async def _asyncpg_creator() -> asyncpg.Connection:
    """Open one raw asyncpg connection for SQLAlchemy's ``async_creator`` hook.

    The connection is made to ``_PLAIN_DSN`` using ``_SSL_CTX`` and is opened
    with ``statement_cache_size=0``, which disables asyncpg's own
    prepared-statement cache. Per this module's header notes, that is the
    setting PgBouncer's error message asks for, and routing connection
    creation through this factory is meant to ensure it is applied to every
    connection the engine opens.

    Returns:
        The newly opened ``asyncpg.Connection``.
    """
    connect_options = {
        "dsn": _PLAIN_DSN,
        "ssl": _SSL_CTX,
        # The critical setting: no asyncpg statement cache behind PgBouncer.
        "statement_cache_size": 0,
        # Seconds allowed for establishing the connection.
        "timeout": 30,
        # Default per-command timeout (seconds) on this connection.
        "command_timeout": 60,
    }
    connection = await asyncpg.connect(**connect_options)
    return connection
# ── Engine ────────────────────────────────────────────────────────────────
engine = create_async_engine(
# The URL still needs to be a valid SQLAlchemy URL so the dialect is
# loaded; async_creator overrides the actual connection creation.
config.DATABASE_URL,
# async_creator: bypass SQLAlchemy's dialect connection bootstrap.
# Our _asyncpg_creator() is called for every new connection instead.
async_creator=_asyncpg_creator,
# NullPool: don't let SQLAlchemy pool connections.
# PgBouncer (Supabase) is the pool β€” two pools stacked = conflicts.
poolclass=NullPool,
echo=False, # Set True temporarily to log SQL queries while debugging
)
# ── Session factory ───────────────────────────────────────────────────────
AsyncSessionFactory = async_sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=True,
)
# ── Public helpers ────────────────────────────────────────────────────────
async def init_db() -> None:
"""Create all tables defined in models.py (idempotent β€” safe to re-run)."""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
print("βœ… Database tables initialized.")
async def get_session() -> AsyncIterator[AsyncSession]:
    """Yield a single DB session and close it when the generator finishes.

    This function is an async *generator*, not a context manager: it cannot
    be used directly in ``async with get_session() as session``. Consume it
    with ``async for`` or hand it to a framework hook that accepts
    async-generator dependencies.

    For direct use in crud/handler functions, open a session from the
    factory instead:

        async with AsyncSessionFactory() as session:
            result = await session.execute(...)

    Yields:
        An ``AsyncSession`` created by ``AsyncSessionFactory``. The
        surrounding ``async with`` closes it once the consumer is done.
    """
    async with AsyncSessionFactory() as session:
        yield session