| """ |
| SQLite fallback database migration script for AegisLM SaaS Backend. |
| |
| Handles initialization, migration, and synchronization between |
| PostgreSQL primary and SQLite fallback databases. |
| """ |
|
|
| import asyncio |
| import logging |
| from pathlib import Path |
| from typing import Optional |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from sqlalchemy import text |
|
|
| from core.config import settings |
| from core.database import get_primary_db_only |
| from core.fallback_database import ( |
| init_database_engines, |
| check_primary_health, |
| check_fallback_health, |
| get_fallback_db |
| ) |
| from core.sqlite_compatibility import migrate_postgres_to_sqlite, sync_sqlite_to_postgresql |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class SQLiteFallbackManager: |
| """Manages SQLite fallback database operations.""" |
| |
| def __init__(self): |
| self.sqlite_db_path = Path(settings.SQLITE_DATABASE_PATH) |
| self.initialized = False |
| |
| async def initialize_fallback_database(self) -> bool: |
| """ |
| Initialize SQLite fallback database with schema. |
| |
| Returns: |
| bool: True if initialization successful |
| """ |
| try: |
| logger.info("Initializing SQLite fallback database...") |
| |
| |
| self.sqlite_db_path.parent.mkdir(parents=True, exist_ok=True) |
| |
| |
| await init_database_engines() |
| |
| |
| async for session in get_fallback_db(): |
| from core.database import Base |
| from core.fallback_database import fallback_engine |
| |
| if fallback_engine: |
| async with fallback_engine.begin() as conn: |
| await conn.run_sync(Base.metadata.create_all) |
| |
| |
| await session.execute(text("PRAGMA journal_mode=WAL")) |
| await session.execute(text("PRAGMA synchronous=NORMAL")) |
| await session.execute(text("PRAGMA cache_size=10000")) |
| await session.execute(text("PRAGMA temp_store=memory")) |
| |
| await session.commit() |
| |
| logger.info("SQLite fallback database initialized successfully") |
| self.initialized = True |
| return True |
| break |
| |
| logger.error("Failed to initialize SQLite fallback database") |
| return False |
| |
| except Exception as e: |
| logger.error(f"SQLite fallback initialization failed: {e}") |
| return False |
| |
| async def migrate_from_postgresql(self) -> bool: |
| """ |
| Migrate all data from PostgreSQL to SQLite. |
| |
| Returns: |
| bool: True if migration successful |
| """ |
| try: |
| logger.info("Starting migration from PostgreSQL to SQLite...") |
| |
| |
| if not await check_primary_health(): |
| logger.error("Primary PostgreSQL database is not available") |
| return False |
| |
| if not await check_fallback_health(): |
| logger.error("SQLite fallback database is not available") |
| return False |
| |
| |
| async for pg_session in get_primary_db_only(): |
| async for sqlite_session in get_fallback_db(): |
| success = await migrate_postgres_to_sqlite(pg_session, sqlite_session) |
| |
| if success: |
| logger.info("PostgreSQL to SQLite migration completed successfully") |
| return True |
| else: |
| logger.error("PostgreSQL to SQLite migration failed") |
| return False |
| break |
| |
| except Exception as e: |
| logger.error(f"Migration from PostgreSQL failed: {e}") |
| return False |
| |
| async def sync_to_postgresql(self) -> bool: |
| """ |
| Sync data from SQLite back to PostgreSQL. |
| |
| Returns: |
| bool: True if sync successful |
| """ |
| try: |
| logger.info("Starting sync from SQLite to PostgreSQL...") |
| |
| |
| if not await check_fallback_health(): |
| logger.error("SQLite fallback database is not available") |
| return False |
| |
| if not await check_primary_health(): |
| logger.error("Primary PostgreSQL database is not available") |
| return False |
| |
| |
| async for sqlite_session in get_fallback_db(): |
| async for pg_session in get_primary_db_only(): |
| success = await sync_sqlite_to_postgresql(sqlite_session, pg_session) |
| |
| if success: |
| logger.info("SQLite to PostgreSQL sync completed successfully") |
| return True |
| else: |
| logger.error("SQLite to PostgreSQL sync failed") |
| return False |
| break |
| |
| except Exception as e: |
| logger.error(f"Sync to PostgreSQL failed: {e}") |
| return False |
| |
| async def get_fallback_status(self) -> dict: |
| """ |
| Get status of fallback database. |
| |
| Returns: |
| dict: Fallback database status |
| """ |
| try: |
| primary_healthy = await check_primary_health() |
| fallback_healthy = await check_fallback_health() |
| |
| |
| db_size = 0 |
| db_modified = None |
| if self.sqlite_db_path.exists(): |
| db_size = self.sqlite_db_path.stat().st_size |
| db_modified = self.sqlite_db_path.stat().st_mtime |
| |
| return { |
| "primary_database_healthy": primary_healthy, |
| "fallback_database_healthy": fallback_healthy, |
| "fallback_enabled": settings.ENABLE_SQLITE_FALLBACK, |
| "fallback_database_path": str(self.sqlite_db_path), |
| "fallback_database_exists": self.sqlite_db_path.exists(), |
| "fallback_database_size_bytes": db_size, |
| "fallback_database_modified": db_modified, |
| "initialized": self.initialized |
| } |
| |
| except Exception as e: |
| logger.error(f"Failed to get fallback status: {e}") |
| return { |
| "error": str(e), |
| "primary_database_healthy": False, |
| "fallback_database_healthy": False, |
| "fallback_enabled": settings.ENABLE_SQLITE_FALLBACK |
| } |
| |
| async def cleanup_fallback_database(self) -> bool: |
| """ |
| Clean up fallback database (remove file). |
| |
| Returns: |
| bool: True if cleanup successful |
| """ |
| try: |
| if self.sqlite_db_path.exists(): |
| self.sqlite_db_path.unlink() |
| logger.info("SQLite fallback database file removed") |
| |
| self.initialized = False |
| return True |
| |
| except Exception as e: |
| logger.error(f"Failed to cleanup fallback database: {e}") |
| return False |
| |
| async def backup_fallback_database(self, backup_path: Optional[str] = None) -> bool: |
| """ |
| Create backup of SQLite fallback database. |
| |
| Args: |
| backup_path: Optional backup file path |
| |
| Returns: |
| bool: True if backup successful |
| """ |
| try: |
| if not self.sqlite_db_path.exists(): |
| logger.error("SQLite fallback database does not exist") |
| return False |
| |
| if backup_path is None: |
| import datetime |
| timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
| backup_path = f"{self.sqlite_db_path.stem}_backup_{timestamp}.db" |
| |
| backup_path = Path(backup_path) |
| |
| |
| import shutil |
| shutil.copy2(self.sqlite_db_path, backup_path) |
| |
| logger.info(f"SQLite fallback database backed up to: {backup_path}") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Failed to backup fallback database: {e}") |
| return False |
|
|
|
|
| async def setup_sqlite_fallback() -> bool: |
| """ |
| Setup SQLite fallback database system. |
| |
| Returns: |
| bool: True if setup successful |
| """ |
| try: |
| if not settings.ENABLE_SQLITE_FALLBACK: |
| logger.info("SQLite fallback is disabled in configuration") |
| return True |
| |
| manager = SQLiteFallbackManager() |
| |
| |
| if not await manager.initialize_fallback_database(): |
| logger.error("Failed to initialize SQLite fallback database") |
| return False |
| |
| |
| if await check_primary_health(): |
| logger.info("Primary database is available, checking if migration is needed...") |
| |
| |
| async for session in get_fallback_db(): |
| result = await session.execute(text("SELECT name FROM sqlite_master WHERE type='table'")) |
| tables = result.fetchall() |
| |
| |
| data_tables = [t[0] for t in tables if t[0] not in ('sqlite_sequence',)] |
| if not data_tables: |
| logger.info("SQLite fallback database is empty, migrating from PostgreSQL...") |
| await manager.migrate_from_postgresql() |
| break |
| |
| logger.info("SQLite fallback database setup completed successfully") |
| return True |
| |
| except Exception as e: |
| logger.error(f"SQLite fallback setup failed: {e}") |
| return False |
|
|
|
|
| async def test_fallback_mechanism() -> dict: |
| """ |
| Test the fallback database mechanism. |
| |
| Returns: |
| dict: Test results |
| """ |
| try: |
| logger.info("Testing SQLite fallback mechanism...") |
| |
| results = { |
| "primary_health": await check_primary_health(), |
| "fallback_health": await check_fallback_health(), |
| "fallback_enabled": settings.ENABLE_SQLITE_FALLBACK, |
| "tests": {} |
| } |
| |
| |
| if results["primary_health"]: |
| try: |
| async for session in get_primary_db_only(): |
| await session.execute(text("SELECT 1")) |
| results["tests"]["primary_connection"] = "PASS" |
| break |
| except Exception as e: |
| results["tests"]["primary_connection"] = f"FAIL: {e}" |
| else: |
| results["tests"]["primary_connection"] = "SKIP (Primary unavailable)" |
| |
| |
| if results["fallback_health"]: |
| try: |
| async for session in get_fallback_db(): |
| await session.execute(text("SELECT 1")) |
| results["tests"]["fallback_connection"] = "PASS" |
| break |
| except Exception as e: |
| results["tests"]["fallback_connection"] = f"FAIL: {e}" |
| else: |
| results["tests"]["fallback_connection"] = "SKIP (Fallback unavailable)" |
| |
| |
| try: |
| from core.fallback_database import get_active_session |
| session, db_type = await get_active_session() |
| await session.close() |
| results["tests"]["automatic_fallback"] = f"PASS (Using {db_type})" |
| except Exception as e: |
| results["tests"]["automatic_fallback"] = f"FAIL: {e}" |
| |
| |
| if results["primary_health"] and results["fallback_health"]: |
| try: |
| manager = SQLiteFallbackManager() |
| migration_success = await manager.migrate_from_postgresql() |
| results["tests"]["data_migration"] = "PASS" if migration_success else "FAIL" |
| except Exception as e: |
| results["tests"]["data_migration"] = f"FAIL: {e}" |
| else: |
| results["tests"]["data_migration"] = "SKIP (Both databases not available)" |
| |
| logger.info("SQLite fallback mechanism test completed") |
| return results |
| |
| except Exception as e: |
| logger.error(f"Fallback mechanism test failed: {e}") |
| return {"error": str(e)} |
|
|
|
|
| if __name__ == "__main__": |
| """Run SQLite fallback setup and tests.""" |
| import sys |
| |
| async def main(): |
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| ) |
| |
| if len(sys.argv) > 1: |
| command = sys.argv[1] |
| |
| if command == "setup": |
| success = await setup_sqlite_fallback() |
| print(f"Setup {'completed successfully' if success else 'failed'}") |
| |
| elif command == "test": |
| results = await test_fallback_mechanism() |
| print("Test Results:") |
| for key, value in results.items(): |
| print(f" {key}: {value}") |
| |
| elif command == "status": |
| manager = SQLiteFallbackManager() |
| status = await manager.get_fallback_status() |
| print("Fallback Status:") |
| for key, value in status.items(): |
| print(f" {key}: {value}") |
| |
| else: |
| print("Usage: python sqlite_fallback_manager.py [setup|test|status]") |
| else: |
| print("Usage: python sqlite_fallback_manager.py [setup|test|status]") |
| |
| asyncio.run(main()) |
|
|