File size: 5,394 Bytes
9a1712b
 
c88cd08
60b7790
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a1712b
 
60b7790
 
 
c88cd08
 
 
 
 
60b7790
f7e4b81
9a1712b
 
 
c88cd08
60b7790
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a1712b
60b7790
 
9a1712b
c88cd08
60b7790
 
 
c88cd08
60b7790
 
 
f7e4b81
60b7790
9a1712b
 
f7e4b81
60b7790
9a1712b
 
 
c88cd08
9a1712b
 
 
 
60b7790
 
f7e4b81
60b7790
9a1712b
 
 
 
 
 
f7e4b81
 
 
60b7790
f7e4b81
 
 
9a1712b
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
database/db.py — Async SQLAlchemy engine and session factory.

═══════════════════════════════════════════════════════════════════
ROOT CAUSE (why previous fixes failed):
═══════════════════════════════════════════════════════════════════
Even with NullPool + statement_cache_size=0 in connect_args,
SQLAlchemy's asyncpg *dialect* runs this internally during
connection setup:

    [SQL: select pg_catalog.version()]

It executes this as a prepared statement using its OWN internal
mechanism — BEFORE our connect_args are applied to the session.
PgBouncer sees the second client try to prepare the same statement
on the recycled connection and raises:

    DuplicatePreparedStatementError:
    prepared statement "__asyncpg_stmt_2__" already exists

═══════════════════════════════════════════════════════════════════
THE DEFINITIVE FIX — async_creator:
═══════════════════════════════════════════════════════════════════
SQLAlchemy 2.x supports an `async_creator` parameter.
When provided, SQLAlchemy skips its entire dialect connection
bootstrap and uses our raw asyncpg connection directly.
We create that connection with statement_cache_size=0, so no
prepared statements are EVER created — the conflict is impossible.

NOTE(review): SQLAlchemy documents `async_creator` as replacing only
the driver-level connect call (URL parameters are not applied); the
returned connection is still wrapped by the asyncpg dialect. Confirm
that the dialect's first-connect queries are really skipped.

This is the only approach that fully resolves the issue with
Supabase Transaction Pooler (PgBouncer in transaction mode).
"""

import ssl as ssl_module
from typing import AsyncIterator

import asyncpg
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
    async_sessionmaker,
)
from sqlalchemy.pool import NullPool

from config import config
from database.models import Base


# ── Build a plain DSN for asyncpg (no SQLAlchemy dialect prefix) ──────────
# asyncpg.connect() is given a "postgresql://..." DSN, so the SQLAlchemy
# driver marker ("+asyncpg") is stripped from the configured URL here.
_PLAIN_DSN: str = config.DATABASE_URL.replace(
    "postgresql+asyncpg://",
    "postgresql://",
)

# TLS context used for every asyncpg connection. Hostname checking and
# certificate verification are both switched off, so the link is encrypted
# but the server's identity is NOT authenticated (original rationale:
# Supabase's certificate chain can vary by region).
# check_hostname must be cleared before verify_mode can be set to CERT_NONE.
_SSL_CTX = ssl_module.create_default_context()
_SSL_CTX.check_hostname = False
_SSL_CTX.verify_mode = ssl_module.CERT_NONE


async def _asyncpg_creator() -> asyncpg.Connection:
    """
    Open and return one raw asyncpg connection.

    This coroutine is handed to SQLAlchemy as ``async_creator`` and is
    awaited whenever the engine needs a new driver-level connection.

    Connection options:
      dsn                   — the plain "postgresql://" DSN (_PLAIN_DSN).
      ssl                   — the module-level SSL context (_SSL_CTX).
      statement_cache_size  — 0 turns off asyncpg's prepared-statement
                              cache, the setting recommended when running
                              behind PgBouncer.
      timeout               — seconds allowed for establishing the
                              connection.
      command_timeout       — default per-command timeout, in seconds.
    """
    connect_options = {
        "dsn": _PLAIN_DSN,
        "ssl": _SSL_CTX,
        "statement_cache_size": 0,  # the critical PgBouncer setting
        "timeout": 30,
        "command_timeout": 60,
    }
    connection = await asyncpg.connect(**connect_options)
    return connection


# ── Engine ────────────────────────────────────────────────────────────────
engine = create_async_engine(
    # A valid SQLAlchemy URL is still required so the right dialect is
    # selected; the connection itself is opened by async_creator below.
    config.DATABASE_URL,

    # Leave False in normal operation; switch to True temporarily to log
    # the SQL being emitted while debugging.
    echo=False,

    # NullPool: SQLAlchemy keeps no connections of its own. Pooling is left
    # to the external PgBouncer (Supabase) pooler, since stacking two pools
    # leads to conflicts.
    poolclass=NullPool,

    # Every new connection is obtained by awaiting _asyncpg_creator()
    # instead of SQLAlchemy's default asyncpg connect call.
    async_creator=_asyncpg_creator,
)


# ── Session factory ───────────────────────────────────────────────────────
# Calling AsyncSessionFactory() produces a new AsyncSession bound to `engine`.
AsyncSessionFactory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    # Flush pending changes automatically before queries are issued.
    autoflush=True,
    # Keep loaded attributes usable after commit() instead of expiring them.
    expire_on_commit=False,
)


# ── Public helpers ────────────────────────────────────────────────────────

async def init_db() -> None:
    """
    Create every table registered on ``Base.metadata``.

    Runs ``create_all`` inside a single ``engine.begin()`` transaction and
    prints a confirmation line once that transaction has completed. Intended
    to be safe to call repeatedly (``create_all`` checks for existing tables
    by default).
    """
    create_tables = Base.metadata.create_all
    async with engine.begin() as connection:
        # create_all is a synchronous API, so it is driven through run_sync.
        await connection.run_sync(create_tables)
    print("βœ… Database tables initialized.")


async def get_session() -> AsyncIterator[AsyncSession]:
    """
    Async generator that yields a single DB session, then closes it.

    The session is created from ``AsyncSessionFactory`` and is closed when
    the generator is resumed or closed after the ``yield``.

    Note: this function is an async *generator*, not an async context
    manager (it is not wrapped with ``contextlib.asynccontextmanager``), so
    ``async with get_session()`` does not work. Consume it with
    ``async for`` or through a framework that drives async-generator
    dependencies.

    For direct use in crud/handler functions, open a session from the
    factory instead:
        async with AsyncSessionFactory() as session:
            result = await session.execute(...)

    Yields:
        AsyncSession: a session bound to the module-level engine.
    """
    async with AsyncSessionFactory() as session:
        yield session