| """ |
| SQLite compatibility utilities for AegisLM SaaS Backend. |
| |
| Provides compatibility layer for database models to work seamlessly |
| with both PostgreSQL and SQLite databases. |
| """ |
|
|
| import logging |
| from typing import Any, Dict |
| from sqlalchemy import text |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from sqlalchemy.sql import func |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class SQLiteCompatibilityMixin: |
| """ |
| Mixin class to provide SQLite compatibility for database models. |
| |
| Handles differences between PostgreSQL and SQLite such as: |
| - JSON field types |
| - DateTime timezone handling |
| - Boolean field handling |
| - Auto-incrementing IDs |
| """ |
| |
| @classmethod |
| def get_sqlite_compatible_table_args(cls): |
| """Get table arguments compatible with SQLite.""" |
| from sqlalchemy import Index |
| |
| |
| table_args = [] |
| |
| |
| if hasattr(cls, '__table__'): |
| for column in cls.__table__.columns: |
| if column.index and not column.primary_key: |
| table_args.append(Index(f"idx_{cls.__tablename__}_{column.name}", column.name)) |
| |
| return tuple(table_args) if table_args else None |
|
|
|
|
| def get_database_type(session: AsyncSession) -> str: |
| """ |
| Determine the current database type. |
| |
| Args: |
| session: Database session |
| |
| Returns: |
| str: "postgresql" or "sqlite" |
| """ |
| try: |
| result = session.execute(text("SELECT sqlite_version()")) |
| return "sqlite" |
| except Exception: |
| try: |
| result = session.execute(text("SELECT version()")) |
| return "postgresql" |
| except Exception: |
| return "unknown" |
|
|
|
|
| def is_sqlite_session(session: AsyncSession) -> bool: |
| """ |
| Check if the current session is using SQLite. |
| |
| Args: |
| session: Database session |
| |
| Returns: |
| bool: True if using SQLite |
| """ |
| return get_database_type(session) == "sqlite" |
|
|
|
|
| def is_postgresql_session(session: AsyncSession) -> bool: |
| """ |
| Check if the current session is using PostgreSQL. |
| |
| Args: |
| session: Database session |
| |
| Returns: |
| bool: True if using PostgreSQL |
| """ |
| return get_database_type(session) == "postgresql" |
|
|
|
|
| def get_compatible_now(session: AsyncSession): |
| """ |
| Get database-compatible current timestamp function. |
| |
| Args: |
| session: Database session |
| |
| Returns: |
| SQL function for current timestamp |
| """ |
| if is_sqlite_session(session): |
| return func.datetime('now', 'localtime') |
| else: |
| return func.now() |
|
|
|
|
| def get_json_field_compatible_value(value: Any, session: AsyncSession) -> Any: |
| """ |
| Convert JSON field value to be compatible with current database. |
| |
| Args: |
| value: JSON value to convert |
| session: Database session |
| |
| Returns: |
| Any: Database-compatible JSON value |
| """ |
| if is_sqlite_session(session): |
| |
| import json |
| return json.dumps(value) if value is not None else None |
| else: |
| |
| return value |
|
|
|
|
| def parse_json_field_value(value: Any, session: AsyncSession) -> Any: |
| """ |
| Parse JSON field value from database. |
| |
| Args: |
| value: JSON value from database |
| session: Database session |
| |
| Returns: |
| Any: Parsed JSON value |
| """ |
| if is_sqlite_session(session): |
| |
| import json |
| if isinstance(value, str): |
| try: |
| return json.loads(value) |
| except json.JSONDecodeError: |
| return value |
| return value |
| else: |
| |
| return value |
|
|
|
|
| async def migrate_postgres_to_sqlite(pg_session: AsyncSession, sqlite_session: AsyncSession) -> bool: |
| """ |
| Migrate data from PostgreSQL to SQLite. |
| |
| Args: |
| pg_session: PostgreSQL session |
| sqlite_session: SQLite session |
| |
| Returns: |
| bool: True if migration successful |
| """ |
| try: |
| logger.info("Starting PostgreSQL to SQLite migration...") |
| |
| |
| from core.database import Base |
| tables = Base.metadata.tables.keys() |
| |
| for table_name in tables: |
| if table_name in ['alembic_version']: |
| continue |
| |
| logger.info(f"Migrating table: {table_name}") |
| |
| |
| pg_result = await pg_session.execute(text(f"SELECT * FROM {table_name}")) |
| rows = pg_result.fetchall() |
| |
| if not rows: |
| continue |
| |
| |
| columns = [desc[0] for desc in pg_result.cursor.description] |
| |
| |
| for row in rows: |
| |
| row_dict = dict(zip(columns, row)) |
| |
| |
| for key, value in row_dict.items(): |
| if isinstance(value, dict) or isinstance(value, list): |
| row_dict[key] = get_json_field_compatible_value(value, sqlite_session) |
| |
| |
| placeholders = ', '.join([':' + col for col in columns]) |
| insert_query = text(f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})") |
| |
| await sqlite_session.execute(insert_query, row_dict) |
| |
| await sqlite_session.commit() |
| |
| logger.info("PostgreSQL to SQLite migration completed successfully") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Migration failed: {e}") |
| await sqlite_session.rollback() |
| return False |
|
|
|
|
| async def sync_sqlite_to_postgresql(sqlite_session: AsyncSession, pg_session: AsyncSession) -> bool: |
| """ |
| Sync data from SQLite back to PostgreSQL. |
| |
| Args: |
| sqlite_session: SQLite session |
| pg_session: PostgreSQL session |
| |
| Returns: |
| bool: True if sync successful |
| """ |
| try: |
| logger.info("Starting SQLite to PostgreSQL sync...") |
| |
| |
| from core.database import Base |
| tables = Base.metadata.tables.keys() |
| |
| for table_name in tables: |
| if table_name in ['alembic_version']: |
| continue |
| |
| logger.info(f"Syncing table: {table_name}") |
| |
| |
| sqlite_result = await sqlite_session.execute(text(f"SELECT * FROM {table_name}")) |
| rows = sqlite_result.fetchall() |
| |
| if not rows: |
| continue |
| |
| |
| columns = [desc[0] for desc in sqlite_result.cursor.description] |
| |
| |
| for row in rows: |
| |
| row_dict = dict(zip(columns, row)) |
| |
| |
| for key, value in row_dict.items(): |
| if isinstance(value, str): |
| try: |
| import json |
| parsed = json.loads(value) |
| row_dict[key] = parsed |
| except json.JSONDecodeError: |
| pass |
| |
| |
| placeholders = ', '.join([':' + col for col in columns]) |
| insert_query = text(f""" |
| INSERT INTO {table_name} ({', '.join(columns)}) |
| VALUES ({placeholders}) |
| ON CONFLICT (id) DO UPDATE SET |
| {', '.join([f"{col} = EXCLUDED.{col}" for col in columns if col != 'id'])} |
| """) |
| |
| await pg_session.execute(insert_query, row_dict) |
| |
| await pg_session.commit() |
| |
| logger.info("SQLite to PostgreSQL sync completed successfully") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Sync failed: {e}") |
| await pg_session.rollback() |
| return False |
|
|
|
|
| class DatabaseTypeDetector: |
| """Utility class for detecting and managing database types.""" |
| |
| def __init__(self, session: AsyncSession): |
| self.session = session |
| self._db_type = None |
| |
| @property |
| def is_sqlite(self) -> bool: |
| """Check if using SQLite.""" |
| if self._db_type is None: |
| self._db_type = get_database_type(self.session) |
| return self._db_type == "sqlite" |
| |
| @property |
| def is_postgresql(self) -> bool: |
| """Check if using PostgreSQL.""" |
| if self._db_type is None: |
| self._db_type = get_database_type(self.session) |
| return self._db_type == "postgresql" |
| |
| @property |
| def db_type(self) -> str: |
| """Get database type.""" |
| if self._db_type is None: |
| self._db_type = get_database_type(self.session) |
| return self._db_type |
| |
| def get_now_function(self): |
| """Get appropriate now function for database.""" |
| return get_compatible_now(self.session) |
| |
| def process_json_value(self, value: Any) -> Any: |
| """Process JSON value for database compatibility.""" |
| if self.is_sqlite: |
| return get_json_field_compatible_value(value, self.session) |
| return value |
| |
| def parse_json_value(self, value: Any) -> Any: |
| """Parse JSON value from database.""" |
| if self.is_sqlite: |
| return parse_json_field_value(value, self.session) |
| return value |
|
|