| """ |
| Fallback database configuration for AegisLM SaaS Backend. |
| |
| SQLite fallback database implementation with automatic failover |
| when PostgreSQL is unavailable. |
| """ |
|
|
| import asyncio |
| import logging |
| from typing import AsyncGenerator, Optional, Tuple, Any |
| from pathlib import Path |
| from sqlalchemy import create_engine, text, event |
| from sqlalchemy.orm import sessionmaker |
| from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker |
| from sqlalchemy.exc import SQLAlchemyError, IllegalStateChangeError |
| from sqlalchemy.pool import StaticPool |
| import time |
|
|
| from fastapi import HTTPException, status |
|
|
| from core.config import settings |
|
|
logger = logging.getLogger(__name__)

# Module-wide database state. The engines and session factories are assigned
# by init_database_engines() and reset to None by close_databases().
primary_engine: Optional[Any] = None
fallback_engine: Optional[Any] = None
AsyncSessionLocal: Optional[async_sessionmaker] = None
FallbackSessionLocal: Optional[async_sessionmaker] = None

# Backend that get_active_session() tries first; starts out as "primary".
current_db_type: str = "primary"
|
|
|
|
async def init_database_engines():
    """Create the primary and fallback engines plus their session factories.

    The two backends are set up independently. A failure while setting one up
    is logged and leaves that backend's engine and session factory as ``None``
    instead of raising. The SQLite fallback is only set up when
    ``settings.ENABLE_SQLITE_FALLBACK`` is truthy.
    """
    global primary_engine, fallback_engine, AsyncSessionLocal, FallbackSessionLocal

    # --- Primary engine, built from settings.DATABASE_URL ---
    try:
        primary_engine = create_async_engine(
            settings.DATABASE_URL,
            pool_pre_ping=True,
            pool_recycle=3600,
            pool_size=settings.DATABASE_POOL_SIZE,
            max_overflow=settings.DATABASE_MAX_OVERFLOW,
            echo=settings.DEBUG,
        )
        AsyncSessionLocal = async_sessionmaker(
            primary_engine, class_=AsyncSession, expire_on_commit=False
        )
        logger.info("Primary PostgreSQL engine initialized")
    except Exception as exc:
        logger.error(f"Failed to initialize primary PostgreSQL engine: {exc}")
        primary_engine = None
        AsyncSessionLocal = None

    if not settings.ENABLE_SQLITE_FALLBACK:
        return

    # --- Fallback engine, SQLite file at settings.SQLITE_DATABASE_PATH ---
    try:
        # Make sure the directory that will hold the database file exists.
        sqlite_file = Path(settings.SQLITE_DATABASE_PATH)
        sqlite_file.parent.mkdir(parents=True, exist_ok=True)

        sqlite_url = f"sqlite+aiosqlite:///{settings.SQLITE_DATABASE_PATH}"
        sqlite_connect_args = {
            "check_same_thread": False,
            "timeout": settings.SQLITE_FALLBACK_TIMEOUT,
        }

        fallback_engine = create_async_engine(
            sqlite_url,
            poolclass=StaticPool,
            connect_args=sqlite_connect_args,
            echo=settings.DEBUG,
        )
        FallbackSessionLocal = async_sessionmaker(
            fallback_engine, class_=AsyncSession, expire_on_commit=False
        )

        @event.listens_for(fallback_engine.sync_engine, "connect")
        def _enable_foreign_keys(dbapi_connection, connection_record):
            # Runs on every "connect" event: switch foreign keys on for the
            # new DBAPI connection.
            pragma_cursor = dbapi_connection.cursor()
            pragma_cursor.execute("PRAGMA foreign_keys=ON")
            pragma_cursor.close()

        logger.info("Fallback SQLite engine initialized")
    except Exception as exc:
        logger.error(f"Failed to initialize fallback SQLite engine: {exc}")
        fallback_engine = None
        FallbackSessionLocal = None
|
|
|
|
async def _open_verified_session(session_factory) -> AsyncSession:
    """Create a session from *session_factory* and probe it with ``SELECT 1``.

    If the probe fails, the session is closed (best effort) and the probe
    error is re-raised, so a failed attempt does not leave a session open.
    """
    session = session_factory()
    try:
        await session.execute(text("SELECT 1"))
    except Exception:
        try:
            await session.close()
        except Exception as close_err:
            # A close failure must not mask the probe error or stop failover.
            logger.debug(f"Error closing failed session: {close_err}")
        raise
    return session


async def get_active_session() -> Tuple[AsyncSession, str]:
    """
    Get an active database session with automatic fallback.

    Order of attempts:
      1. The primary, if ``current_db_type`` is "primary" and it is initialized.
         On failure ``current_db_type`` is switched to "fallback".
      2. The SQLite fallback, if enabled and initialized.
      3. The primary again, when the fallback is not configured, or when the
         fallback failed and the primary had not been tried in this call.
         On success ``current_db_type`` is switched back to "primary".

    While ``current_db_type`` is "fallback" and the fallback works, the
    primary is not retried here; see ``switch_to_primary()``.

    Returns:
        Tuple[AsyncSession, str]: Database session and database type
        ("primary" or "fallback"). The caller owns the session and must
        close it.

    Raises:
        Exception: If no backend yields a working session.
    """
    global current_db_type

    primary_tried = False
    fallback_error = None

    if current_db_type == "primary" and primary_engine and AsyncSessionLocal:
        primary_tried = True
        try:
            session = await _open_verified_session(AsyncSessionLocal)
        except Exception as e:
            logger.warning(f"Primary database connection failed: {e}")
            current_db_type = "fallback"
        else:
            return session, "primary"

    if settings.ENABLE_SQLITE_FALLBACK and fallback_engine and FallbackSessionLocal:
        try:
            session = await _open_verified_session(FallbackSessionLocal)
        except Exception as e:
            logger.error(f"Fallback database connection failed: {e}")
            if primary_tried:
                # Both backends were tried in this call and both failed.
                raise Exception(
                    "Both primary and fallback databases are unavailable"
                ) from e
            # The primary has not been tried yet in this call; give it a
            # chance below before reporting that everything is down.
            fallback_error = e
        else:
            logger.info("Using SQLite fallback database")
            return session, "fallback"

    if AsyncSessionLocal:
        try:
            session = await _open_verified_session(AsyncSessionLocal)
        except Exception as e:
            logger.error(f"Primary database reconnection failed: {e}")
        else:
            current_db_type = "primary"
            return session, "primary"

    if fallback_error is not None:
        raise Exception(
            "Both primary and fallback databases are unavailable"
        ) from fallback_error
    raise Exception("No database connection available")
|
|
|
|
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Dependency that yields a session obtained from ``get_active_session()``.

    Any exception, whether raised while acquiring the session or thrown in at
    the ``yield``, triggers a rollback attempt (only when the session reports
    ``is_active``), is logged, and is re-raised. The session is always closed
    on the way out; rollback and close failures are logged at debug level
    and otherwise ignored.

    Yields:
        AsyncSession: Database session
    """
    db_session = None
    try:
        db_session, backend = await get_active_session()
        logger.debug(f"Using {backend} database session")
        yield db_session
    except Exception as exc:
        if db_session:
            try:
                if db_session.is_active:
                    await db_session.rollback()
            except Exception as rollback_exc:
                logger.debug(f"Could not rollback session: {rollback_exc}")
        logger.error(f"Database session error: {exc}")
        raise
    finally:
        if db_session:
            try:
                await db_session.close()
            except Exception as close_exc:
                logger.debug(f"Error closing session: {close_exc}")
|
|
|
|
async def get_primary_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Dependency that yields a session on the primary database only.

    No failover is attempted here.

    Yields:
        AsyncSession: Primary database session

    Raises:
        HTTPException: 500 when the primary session factory is not set.
    """
    session_factory = AsyncSessionLocal
    if not session_factory:
        logger.error("Primary database engine not initialized")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Primary database connection unavailable",
        )

    async with session_factory() as db_session:
        try:
            yield db_session
        except Exception as exc:
            try:
                await db_session.rollback()
            except IllegalStateChangeError:
                # Ignored on purpose so the original error is what propagates.
                # NOTE(review): presumably raised when the session is already
                # changing state - confirm.
                pass
            logger.error(f"Primary database session error: {exc}")
            raise
| |
|
|
|
|
async def get_fallback_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Dependency that yields a session on the SQLite fallback database only.

    Yields:
        AsyncSession: Fallback database session

    Raises:
        HTTPException: 500 when the fallback is disabled in settings or its
            engine / session factory is not set. This mirrors how
            ``get_primary_db`` reports an unavailable backend; it is still
            an ``Exception`` subclass.
    """
    if not settings.ENABLE_SQLITE_FALLBACK or not fallback_engine or not FallbackSessionLocal:
        logger.error("Fallback database not available")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Fallback database not available",
        )

    async with FallbackSessionLocal() as session:
        try:
            yield session
        except Exception as e:
            try:
                await session.rollback()
            except IllegalStateChangeError:
                # Ignored on purpose so the original error is what propagates.
                pass
            logger.error(f"Fallback database session error: {e}")
            raise
| |
|
|
|
|
|
|
async def init_databases():
    """Initialize both engines, probe the primary, and create fallback tables.

    Steps:
      1. Build the engines via ``init_database_engines()``.
      2. If a primary engine exists, open a connection and run ``SELECT 1``
         so the "connected successfully" log reflects a real round trip.
         No tables are created on the primary here.
      3. If the fallback is enabled and its engine exists, create all tables
         from ``Base.metadata`` on it.

    Connection and table-creation failures are logged, not raised.
    """
    await init_database_engines()

    # Imported inside the function rather than at module top.
    # NOTE(review): presumably to avoid a circular import with core.database
    # - confirm.
    from core.database import Base

    if primary_engine:
        try:
            async with primary_engine.connect() as conn:
                await conn.execute(text("SELECT 1"))
            logger.info("Primary database connected successfully")
        except Exception as e:
            logger.error(f"Failed to connect to primary database: {e}")

    if settings.ENABLE_SQLITE_FALLBACK and fallback_engine:
        try:
            async with fallback_engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)
            logger.info("Fallback database tables initialized")
        except Exception as e:
            logger.error(f"Failed to initialize fallback database tables: {e}")
|
|
|
|
async def close_databases():
    """Dispose both engines and reset the module-level database state.

    The fallback engine is disposed and all four globals are reset to
    ``None`` even if disposing the primary engine raises. A disposal error
    still propagates to the caller after the cleanup has run.
    """
    global primary_engine, fallback_engine, AsyncSessionLocal, FallbackSessionLocal

    try:
        if primary_engine:
            await primary_engine.dispose()
            logger.info("Primary database connection closed")
    finally:
        primary_engine = None
        try:
            if fallback_engine:
                await fallback_engine.dispose()
                logger.info("Fallback database connection closed")
        finally:
            fallback_engine = None
            AsyncSessionLocal = None
            FallbackSessionLocal = None
|
|
|
|
async def check_primary_health() -> bool:
    """
    Check primary database health by running ``SELECT 1`` in a new session.

    Returns:
        bool: True if the probe succeeds. False if the primary engine or
        session factory is not set, or if the probe raises (the error is
        logged at debug level).
    """
    # Bind the factory once so the check and the call use the same object.
    session_factory = AsyncSessionLocal
    if not primary_engine or not session_factory:
        return False

    try:
        async with session_factory() as session:
            await session.execute(text("SELECT 1"))
        return True
    except Exception as e:
        logger.debug(f"Primary database health check failed: {e}")
        return False
|
|
|
|
async def check_fallback_health() -> bool:
    """
    Check fallback database health by running ``SELECT 1`` in a new session.

    Returns:
        bool: True if the probe succeeds. False if the fallback is disabled
        in settings, its engine or session factory is not set, or the probe
        raises (the error is logged at debug level).
    """
    # Bind the factory once so the check and the call use the same object.
    session_factory = FallbackSessionLocal
    if not settings.ENABLE_SQLITE_FALLBACK or not fallback_engine or not session_factory:
        return False

    try:
        async with session_factory() as session:
            await session.execute(text("SELECT 1"))
        return True
    except Exception as e:
        logger.debug(f"Fallback database health check failed: {e}")
        return False
|
|
|
|
async def check_database_health() -> Tuple[bool, str]:
    """
    Probe both backends and report which one is active.

    The currently selected backend is reported if it is healthy. Otherwise
    the first healthy backend, primary before fallback, becomes the new
    ``current_db_type``. If neither is healthy, ``current_db_type`` is left
    unchanged.

    Returns:
        Tuple[bool, str]: ``(True, "primary")`` or ``(True, "fallback")``
        when a backend is healthy, ``(False, "none")`` otherwise.
    """
    global current_db_type

    # Both probes always run, primary first.
    primary_ok = await check_primary_health()
    fallback_ok = await check_fallback_health()
    health_by_backend = {"primary": primary_ok, "fallback": fallback_ok}

    # Stick with the current selection while it is healthy.
    if health_by_backend.get(current_db_type, False):
        return True, current_db_type

    # Otherwise move to the first healthy backend, preferring the primary.
    for backend in ("primary", "fallback"):
        if health_by_backend[backend]:
            current_db_type = backend
            return True, backend

    return False, "none"
|
|
|
|
async def switch_to_primary():
    """Select the primary database if its health check passes.

    Returns:
        True when ``current_db_type`` was set to "primary", False when the
        primary health check failed (nothing is changed in that case).
    """
    global current_db_type

    primary_ok = await check_primary_health()
    if not primary_ok:
        return False

    current_db_type = "primary"
    logger.info("Switched back to primary database")
    return True
|
|
|
|
async def switch_to_fallback():
    """Select the fallback database if it is enabled and healthy.

    Returns:
        True when ``current_db_type`` was set to "fallback". False when the
        fallback is disabled in settings or its health check failed (nothing
        is changed in that case).
    """
    global current_db_type

    # The health check is skipped entirely when the fallback is disabled.
    if not settings.ENABLE_SQLITE_FALLBACK:
        return False
    if not await check_fallback_health():
        return False

    current_db_type = "fallback"
    logger.info("Switched to fallback database")
    return True
|
|
|
|
def get_current_database_type() -> str:
    """
    Return the module's currently selected database type.

    Returns:
        str: The value of ``current_db_type``. The assignments in this
        module only ever set it to "primary" or "fallback".
    """
    return current_db_type
|
|
|
|
| |
# NOTE(review): module-level placeholder. init_databases() imports the real
# ``Base`` from core.database inside the function, so this name is not what
# it uses - confirm whether anything else still reads it.
Base = None
|
|