Spaces:
Runtime error
Runtime error
File size: 5,394 Bytes
9a1712b c88cd08 60b7790 9a1712b 60b7790 c88cd08 60b7790 f7e4b81 9a1712b c88cd08 60b7790 9a1712b 60b7790 9a1712b c88cd08 60b7790 c88cd08 60b7790 f7e4b81 60b7790 9a1712b f7e4b81 60b7790 9a1712b c88cd08 9a1712b 60b7790 f7e4b81 60b7790 9a1712b f7e4b81 60b7790 f7e4b81 9a1712b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | """
database/db.py β Async SQLAlchemy engine and session factory.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
ROOT CAUSE (why previous fixes failed):
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Even with NullPool + statement_cache_size=0 in connect_args,
SQLAlchemy's asyncpg *dialect* runs this internally during
connection setup:
[SQL: select pg_catalog.version()]
It executes this as a prepared statement using its OWN internal
mechanism β BEFORE our connect_args are applied to the session.
PgBouncer sees the second client try to prepare the same statement
on the recycled connection and raises:
DuplicatePreparedStatementError:
prepared statement "__asyncpg_stmt_2__" already exists
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
THE DEFINITIVE FIX β async_creator:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
SQLAlchemy 2.x supports an `async_creator` parameter.
When provided, SQLAlchemy skips its entire dialect connection
bootstrap and uses our raw asyncpg connection directly.
We create that connection with statement_cache_size=0, so no
prepared statements are EVER created β the conflict is impossible.
This is the only approach that fully resolves the issue with
Supabase Transaction Pooler (PgBouncer in transaction mode).
"""
import ssl as ssl_module
import asyncpg
from sqlalchemy.ext.asyncio import (
AsyncSession,
create_async_engine,
async_sessionmaker,
)
from sqlalchemy.pool import NullPool
from config import config
from database.models import Base
# ββ Build a plain DSN for asyncpg (no SQLAlchemy dialect prefix) ββββββββββ
# asyncpg.connect() expects "postgresql://..." not "postgresql+asyncpg://..."
_PLAIN_DSN: str = config.DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://")
# Build an SSL context that requires SSL but doesn't verify the cert
# (Supabase's cert chain can vary by region; this keeps it simple).
_SSL_CTX = ssl_module.create_default_context()
_SSL_CTX.check_hostname = False
_SSL_CTX.verify_mode = ssl_module.CERT_NONE
async def _asyncpg_creator() -> asyncpg.Connection:
"""
Raw asyncpg connection factory passed to SQLAlchemy as async_creator.
Key parameters:
statement_cache_size=0 β disables ALL prepared-statement caching
inside asyncpg. This is the single setting
that PgBouncer's own error message tells us
to use. By using async_creator we guarantee
it applies to *every* connection, including
the internal dialect init queries.
"""
return await asyncpg.connect(
dsn=_PLAIN_DSN,
ssl=_SSL_CTX,
statement_cache_size=0, # β THE critical fix
timeout=30,
command_timeout=60,
)
# ββ Engine ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
engine = create_async_engine(
# The URL still needs to be a valid SQLAlchemy URL so the dialect is
# loaded; async_creator overrides the actual connection creation.
config.DATABASE_URL,
# async_creator: bypass SQLAlchemy's dialect connection bootstrap.
# Our _asyncpg_creator() is called for every new connection instead.
async_creator=_asyncpg_creator,
# NullPool: don't let SQLAlchemy pool connections.
# PgBouncer (Supabase) is the pool β two pools stacked = conflicts.
poolclass=NullPool,
echo=False, # Set True temporarily to log SQL queries while debugging
)
# ββ Session factory βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
AsyncSessionFactory = async_sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=True,
)
# ββ Public helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
async def init_db() -> None:
"""Create all tables defined in models.py (idempotent β safe to re-run)."""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
print("β
Database tables initialized.")
async def get_session() -> AsyncSession:
"""
Async context-manager that yields a DB session.
Usage (in every crud/handler function):
async with AsyncSessionFactory() as session:
result = await session.execute(...)
"""
async with AsyncSessionFactory() as session:
yield session |