| """Async PostgreSQL connection management.""" | |
| from sqlalchemy.engine import make_url | |
| from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker | |
| from sqlalchemy.orm import declarative_base | |
| from src.config.settings import settings | |
| # asyncpg doesn't support libpq query params like sslmode/channel_binding. | |
| # Use SQLAlchemy's URL parser to strip all query params cleanly, then pass ssl via connect_args. | |
| _url = make_url(settings.postgres_connstring).set(drivername="postgresql+asyncpg", query={}) | |
| # Separate asyncpg engine for PGVector with prepared_statement_cache_size=0. | |
| # PGVector runs advisory_lock + CREATE EXTENSION as a single multi-statement string. | |
| # asyncpg normally uses prepared statements which reject multi-statement SQL. | |
| # Setting cache_size=0 forces asyncpg to use execute() instead of prepare(), | |
| # which supports multiple statements — no psycopg3 needed, no ProactorEventLoop issue. | |
| _pgvector_engine = create_async_engine( | |
| _url, | |
| pool_pre_ping=True, | |
| connect_args={ | |
| "ssl": "require", | |
| "prepared_statement_cache_size": 0, | |
| }, | |
| ) | |
| engine = create_async_engine( | |
| _url, | |
| echo=False, | |
| pool_pre_ping=True, | |
| pool_size=5, | |
| max_overflow=10, | |
| connect_args={"ssl": "require"}, | |
| ) | |
| AsyncSessionLocal = async_sessionmaker( | |
| engine, | |
| class_=AsyncSession, | |
| expire_on_commit=False, | |
| autocommit=False, | |
| autoflush=False | |
| ) | |
| Base = declarative_base() | |
| async def get_db(): | |
| """Get database session dependency for FastAPI.""" | |
| async with AsyncSessionLocal() as session: | |
| try: | |
| yield session | |
| finally: | |
| await session.close() | |