Ahmad3g commited on
Commit
60b7790
Β·
1 Parent(s): f7e4b81
Files changed (1) hide show
  1. database/db.py +79 -37
database/db.py CHANGED
@@ -1,61 +1,101 @@
1
  """
2
  database/db.py β€” Async SQLAlchemy engine and session factory.
3
 
4
- ROOT CAUSE OF THE BUG:
5
- Supabase uses PgBouncer in **Transaction Pooler** mode.
6
- PgBouncer reuses the same underlying DB connection for many different
7
- clients. asyncpg caches "prepared statements" per-connection. When
8
- PgBouncer hands the same connection to another client, asyncpg tries
9
- to create the same prepared statement again and PostgreSQL raises:
10
-
11
- DuplicatePreparedStatementError:
12
- prepared statement "__asyncpg_stmt_2__" already exists
13
-
14
- THE FIX β€” use NullPool:
15
- SQLAlchemy's NullPool opens a brand-new raw connection for every
16
- session and closes it immediately when the session ends.
17
- Because every connection is fresh, there is never a leftover prepared
18
- statement to conflict with. PgBouncer handles the actual pooling on
19
- its side, so SQLAlchemy doesn't need its own pool at all.
20
-
21
- This is the officially recommended approach for Supabase + asyncpg.
 
 
 
 
 
 
 
 
 
 
22
  """
23
 
 
 
 
24
  from sqlalchemy.ext.asyncio import (
25
  AsyncSession,
26
  create_async_engine,
27
  async_sessionmaker,
28
  )
29
- from sqlalchemy.pool import NullPool # ← THE KEY FIX
30
 
31
  from config import config
32
  from database.models import Base
33
 
34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  engine = create_async_engine(
 
 
36
  config.DATABASE_URL,
37
 
38
- # ── NullPool: no SQLAlchemy-level connection pooling ─────
39
- # PgBouncer (Supabase) is the pool. SQLAlchemy should not
40
- # add another caching layer on top of it.
41
- poolclass=NullPool,
42
-
43
- # ── asyncpg connection arguments ─────────────────────────
44
- connect_args={
45
- "ssl" : "require", # Mandatory for Supabase
46
- "timeout" : 30, # Connect timeout (seconds)
47
- "command_timeout": 60, # Query timeout (seconds)
48
 
49
- # Belt-and-suspenders: also disable the statement cache
50
- # inside asyncpg itself (the NullPool already prevents
51
- # the conflict, but this makes doubly sure).
52
- "statement_cache_size": 0,
53
- },
54
 
55
- echo=False, # Set True temporarily to log all SQL for debugging
56
  )
57
 
58
 
 
59
  AsyncSessionFactory = async_sessionmaker(
60
  bind=engine,
61
  class_=AsyncSession,
@@ -64,8 +104,10 @@ AsyncSessionFactory = async_sessionmaker(
64
  )
65
 
66
 
 
 
67
  async def init_db() -> None:
68
- """Create all tables defined in models.py if they don't exist yet."""
69
  async with engine.begin() as conn:
70
  await conn.run_sync(Base.metadata.create_all)
71
  print("βœ… Database tables initialized.")
@@ -75,7 +117,7 @@ async def get_session() -> AsyncSession:
75
  """
76
  Async context-manager that yields a DB session.
77
 
78
- Usage:
79
  async with AsyncSessionFactory() as session:
80
  result = await session.execute(...)
81
  """
 
1
  """
2
  database/db.py β€” Async SQLAlchemy engine and session factory.
3
 
4
+ ═══════════════════════════════════════════════════════════════════
5
+ ROOT CAUSE (why previous fixes failed):
6
+ ═══════════════════════════════════════════════════════════════════
7
+ Even with NullPool + statement_cache_size=0 in connect_args,
8
+ SQLAlchemy's asyncpg *dialect* runs this internally during
9
+ connection setup:
10
+
11
+ [SQL: select pg_catalog.version()]
12
+
13
+ It executes this as a prepared statement using its OWN internal
14
+ mechanism β€” BEFORE our connect_args are applied to the session.
15
+ PgBouncer sees the second client try to prepare the same statement
16
+ on the recycled connection and raises:
17
+
18
+ DuplicatePreparedStatementError:
19
+ prepared statement "__asyncpg_stmt_2__" already exists
20
+
21
+ ═══════════════════════════════════════════════════════════════════
22
+ THE DEFINITIVE FIX β€” async_creator:
23
+ ═══════════════════════════════════════════════════════════════════
24
+ SQLAlchemy 2.x supports an `async_creator` parameter.
25
+ When provided, SQLAlchemy skips its entire dialect connection
26
+ bootstrap and uses our raw asyncpg connection directly.
27
+ We create that connection with statement_cache_size=0, so no
28
+ prepared statements are EVER created β€” the conflict is impossible.
29
+
30
+ This is the only approach that fully resolves the issue with
31
+ Supabase Transaction Pooler (PgBouncer in transaction mode).
32
  """
33
 
34
+ import ssl as ssl_module
35
+ import asyncpg
36
+
37
  from sqlalchemy.ext.asyncio import (
38
  AsyncSession,
39
  create_async_engine,
40
  async_sessionmaker,
41
  )
42
+ from sqlalchemy.pool import NullPool
43
 
44
  from config import config
45
  from database.models import Base
46
 
47
 
48
+ # ── Build a plain DSN for asyncpg (no SQLAlchemy dialect prefix) ──────────
49
+ # asyncpg.connect() expects "postgresql://..." not "postgresql+asyncpg://..."
50
+ _PLAIN_DSN: str = config.DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://")
51
+
52
+ # Build an SSL context that requires SSL but doesn't verify the cert
53
+ # (Supabase's cert chain can vary by region; this keeps it simple).
54
+ _SSL_CTX = ssl_module.create_default_context()
55
+ _SSL_CTX.check_hostname = False
56
+ _SSL_CTX.verify_mode = ssl_module.CERT_NONE
57
+
58
+
59
+ async def _asyncpg_creator() -> asyncpg.Connection:
60
+ """
61
+ Raw asyncpg connection factory passed to SQLAlchemy as async_creator.
62
+
63
+ Key parameters:
64
+ statement_cache_size=0 β€” disables ALL prepared-statement caching
65
+ inside asyncpg. This is the single setting
66
+ that PgBouncer's own error message tells us
67
+ to use. By using async_creator we guarantee
68
+ it applies to *every* connection, including
69
+ the internal dialect init queries.
70
+ """
71
+ return await asyncpg.connect(
72
+ dsn=_PLAIN_DSN,
73
+ ssl=_SSL_CTX,
74
+ statement_cache_size=0, # ← THE critical fix
75
+ timeout=30,
76
+ command_timeout=60,
77
+ )
78
+
79
+
80
+ # ── Engine ────────────────────────────────────────────────────────────────
81
  engine = create_async_engine(
82
+ # The URL still needs to be a valid SQLAlchemy URL so the dialect is
83
+ # loaded; async_creator overrides the actual connection creation.
84
  config.DATABASE_URL,
85
 
86
+ # async_creator: bypass SQLAlchemy's dialect connection bootstrap.
87
+ # Our _asyncpg_creator() is called for every new connection instead.
88
+ async_creator=_asyncpg_creator,
 
 
 
 
 
 
 
89
 
90
+ # NullPool: don't let SQLAlchemy pool connections.
91
+ # PgBouncer (Supabase) is the pool β€” two pools stacked = conflicts.
92
+ poolclass=NullPool,
 
 
93
 
94
+ echo=False, # Set True temporarily to log SQL queries while debugging
95
  )
96
 
97
 
98
+ # ── Session factory ───────────────────────────────────────────────────────
99
  AsyncSessionFactory = async_sessionmaker(
100
  bind=engine,
101
  class_=AsyncSession,
 
104
  )
105
 
106
 
107
+ # ── Public helpers ────────────────────────────────────────────────────────
108
+
109
  async def init_db() -> None:
110
+ """Create all tables defined in models.py (idempotent β€” safe to re-run)."""
111
  async with engine.begin() as conn:
112
  await conn.run_sync(Base.metadata.create_all)
113
  print("βœ… Database tables initialized.")
 
117
  """
118
  Async context-manager that yields a DB session.
119
 
120
+ Usage (in every crud/handler function):
121
  async with AsyncSessionFactory() as session:
122
  result = await session.execute(...)
123
  """