MyPal / app /database /connection.py
KhaledSalehKL1's picture
Deploy 9XAIPal: Gradio+FastAPI app, backend, React build
1086e43 verified
Raw
History Blame Contribute Delete
2.01 kB
"""PostgreSQL async engine and session factory."""
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy import text
from app.core.config import settings
from app.core.logging import get_logger
# Module-level logger, named after this module.
logger = get_logger(__name__)

# Shared async engine. The URL, SQL echo flag and pool limits all come from
# the application settings object.
engine = create_async_engine(
    settings.database_url,
    pool_size=settings.db_pool_size,
    max_overflow=settings.db_max_overflow,
    echo=settings.debug,
)

# Factory producing AsyncSession objects bound to the engine above.
# expire_on_commit=False leaves already-loaded attributes readable after a
# commit instead of expiring them.
async_session_factory = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
    class_=AsyncSession,
)
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Provide one ``AsyncSession`` created by ``async_session_factory``.

    The session is opened with ``async with`` and handed to the consumer via
    ``yield``; the ``async with`` block is exited once the consumer is done
    or an exception propagates through the generator. Nothing is committed
    here.
    """
    async with async_session_factory() as db_session:
        yield db_session
async def verify_connection() -> None:
    """Run a trivial query and make sure the ``vector`` extension exists.

    Opens a transaction on ``engine``, executes ``SELECT 1`` and requires
    exactly one scalar result, then issues
    ``CREATE EXTENSION IF NOT EXISTS vector``. Any exception raised by these
    steps propagates to the caller.
    """
    ping_stmt = text("SELECT 1")
    pgvector_stmt = text("CREATE EXTENSION IF NOT EXISTS vector")

    async with engine.begin() as connection:
        # scalar_one() raises unless the query produced exactly one row.
        (await connection.execute(ping_stmt)).scalar_one()
        logger.info("PostgreSQL connection verified")

        # The statement creates the extension when it is missing, so this
        # is more than a read-only check.
        await connection.execute(pgvector_stmt)
        logger.info("pgvector extension verified")
# Synchronous Database Engine and Session Factory for Celery Workers
from collections.abc import Generator
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
# Blocking engine built from the synchronous database URL in settings; it
# reuses the same echo flag and pool limits as the async engine.
sync_engine = create_engine(
    settings.database_url_sync,
    pool_size=settings.db_pool_size,
    max_overflow=settings.db_max_overflow,
    echo=settings.debug,
)

# Factory for synchronous Session objects bound to sync_engine.
# expire_on_commit=False leaves already-loaded attributes readable after a
# commit instead of expiring them.
sync_session_factory = sessionmaker(
    expire_on_commit=False,
    bind=sync_engine,
)
@contextmanager
def sync_session() -> Generator[Session, None, None]:
    """Context manager that hands out a synchronous ``Session``.

    A session is created from ``sync_session_factory`` and yielded to the
    ``with`` body. If the body raises an ``Exception`` the session is rolled
    back and the error is re-raised. The session is closed on exit in every
    case. No commit is issued here, so callers that want their changes
    persisted must commit themselves.
    """
    # Using the session as a context manager closes it when the block exits,
    # whether the body succeeded or raised.
    with sync_session_factory() as db:
        try:
            yield db
        except Exception:
            db.rollback()
            raise