| """ |
| Integrate SQLite Schema - Legacy + New Tables |
| |
| This script integrates the existing legacy AegisLM tables with our new schema |
| to create a complete, unified database structure. |
| """ |
|
|
import asyncio
import logging
import secrets
import sys
from datetime import datetime, timezone
from pathlib import Path

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
|
|
| |
# Put this script's own directory at the front of the import path.  The
# project-local import below is placed after this on purpose (presumably so
# that ``core`` resolves when the script is launched directly -- the order of
# these statements must not change).
backend_path = Path(__file__).parent
sys.path.insert(0, str(backend_path))

from core.config import settings  # noqa: E402  (must follow the sys.path tweak)

# Script-level logging: INFO and above through the root handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
async def integrate_sqlite_schema():
    """Integrate the legacy and new SQLite tables into one schema.

    Opens an async engine on ``settings.SQLITE_DATABASE_PATH`` and, inside a
    single session, runs the integration steps in order: schema verification,
    index creation, legacy-table enhancement, relationship validation and an
    end-to-end smoke test, followed by a final commit.

    The engine is always disposed, including when a step raises.

    Raises:
        Exception: Any error that escapes the steps (or engine/session
            creation) is logged and re-raised unchanged.
    """
    logger.info("🔧 Integrating SQLite Schema - Legacy + New Tables")
    logger.info("🎯 Creating unified AegisLM database structure")

    try:
        sqlite_url = f"sqlite+aiosqlite:///{settings.SQLITE_DATABASE_PATH}"

        engine = create_async_engine(
            sqlite_url,
            echo=False,
            pool_pre_ping=True,
        )

        try:
            AsyncSessionLocal = sessionmaker(
                engine, class_=AsyncSession, expire_on_commit=False
            )

            async with AsyncSessionLocal() as session:
                await verify_complete_schema(session)
                await create_performance_indexes(session)
                await enhance_legacy_tables(session)
                await validate_relationships(session)
                await test_integrated_operations(session)

                await session.commit()
        finally:
            # Release the connection pool even if one of the steps failed;
            # previously a failure skipped dispose() and leaked the engine.
            await engine.dispose()

        logger.info("🎉 SQLite schema integration completed successfully!")

    except Exception as e:
        logger.error(f"❌ Schema integration failed: {e}")
        raise
|
|
async def verify_complete_schema(session: AsyncSession):
    """Log the status of every table in the database and flag missing ones.

    Lists all tables from ``sqlite_master``, logs a row count for each one
    (marking tables that are not in the expected set as unknown), and then
    warns about expected tables that are absent.

    Args:
        session: Open async session bound to the SQLite database.

    Returns:
        None. All findings are reported through the module logger.
    """
    logger.info("📋 Verifying complete schema...")

    result = await session.execute(
        text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;")
    )
    all_tables = [row[0] for row in result.fetchall()]

    # Table name -> human-readable description used in the log output.
    expected_tables = {
        # First group (NOTE(review): presumably the legacy tables -- confirm)
        'api_keys': 'API key management',
        'evaluations': 'AI evaluation results',
        'permissions': 'RBAC permissions',
        'role_permissions': 'Role-permission mapping',
        'roles': 'User roles',
        'user_roles': 'User-role assignments',

        # Second group (NOTE(review): presumably the new-schema tables -- confirm)
        'users': 'User accounts',
        'experiments': 'AI experiments',
        'datasets': 'Dataset management',
        'audit_trails': 'Audit logging',
        'subscriptions': 'User subscriptions',
        'invoices': 'Billing invoices',
        'payment_methods': 'Payment methods',
        'benchmarks': 'Performance benchmarks',
        'analytics': 'Usage analytics',
        'notifications': 'User notifications',
        'system_logs': 'System logging',

        # SQLite bookkeeping table
        'sqlite_sequence': 'SQLite system table',
    }
    # SQLite creates sqlite_sequence only once a table with an AUTOINCREMENT
    # column exists, so it is recognised above but must not be reported as
    # missing when absent.
    optional_tables = {'sqlite_sequence'}

    logger.info("📊 Complete Schema Status:")
    for table_name in sorted(all_tables):
        status = "✅" if table_name in expected_tables else "⚠️"
        description = expected_tables.get(table_name, "Unknown table")

        # Identifiers cannot be bound as parameters; quote the name so tables
        # with spaces, reserved words or embedded quotes are still counted.
        quoted_name = '"' + table_name.replace('"', '""') + '"'
        try:
            count_result = await session.execute(
                text(f"SELECT COUNT(*) FROM {quoted_name}")
            )
            count = count_result.scalar()
        except Exception as e:
            # Best effort: keep going, but say why the count failed.
            logger.warning(
                f" {status} {table_name}: Error counting records ({description}): {e}"
            )
        else:
            logger.info(f" {status} {table_name}: {count} records ({description})")

    missing_tables = set(expected_tables) - optional_tables - set(all_tables)
    if missing_tables:
        # Sorted so the warning is stable from run to run.
        logger.warning(f"⚠️ Missing tables: {sorted(missing_tables)}")
    else:
        logger.info("✅ All expected tables present")
|
|
async def create_performance_indexes(session: AsyncSession):
    """Issue ``CREATE INDEX IF NOT EXISTS`` for every index this script manages.

    Each statement is executed independently: a failure is logged as a
    warning and the remaining statements are still attempted.

    Args:
        session: Open async session bound to the SQLite database.
    """
    logger.info("🔧 Creating performance indexes...")

    statements = (
        # users
        "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);",
        "CREATE INDEX IF NOT EXISTS idx_users_uuid ON users(uuid);",
        "CREATE INDEX IF NOT EXISTS idx_users_active ON users(is_active);",
        "CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);",
        # experiments
        "CREATE INDEX IF NOT EXISTS idx_experiments_run_id ON experiments(run_id);",
        "CREATE INDEX IF NOT EXISTS idx_experiments_user_id ON experiments(created_by);",
        "CREATE INDEX IF NOT EXISTS idx_experiments_status ON experiments(status);",
        "CREATE INDEX IF NOT EXISTS idx_experiments_created_at ON experiments(created_at);",
        # datasets
        "CREATE INDEX IF NOT EXISTS idx_datasets_type_version ON datasets(type, version);",
        "CREATE INDEX IF NOT EXISTS idx_datasets_user_id ON datasets(created_by);",
        "CREATE INDEX IF NOT EXISTS idx_datasets_public ON datasets(is_public);",
        # audit_trails
        "CREATE INDEX IF NOT EXISTS idx_audit_experiments ON audit_trails(experiment_id);",
        "CREATE INDEX IF NOT EXISTS idx_audit_user_id ON audit_trails(user_id);",
        "CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_trails(timestamp);",
        # subscriptions
        "CREATE INDEX IF NOT EXISTS idx_subscriptions_user_id ON subscriptions(user_id);",
        "CREATE INDEX IF NOT EXISTS idx_subscriptions_status ON subscriptions(status);",
        # analytics
        "CREATE INDEX IF NOT EXISTS idx_analytics_user_id ON analytics(user_id);",
        "CREATE INDEX IF NOT EXISTS idx_analytics_event_type ON analytics(event_type);",
        "CREATE INDEX IF NOT EXISTS idx_analytics_timestamp ON analytics(timestamp);",
        # api keys, evaluations and the role/permission tables
        "CREATE INDEX IF NOT EXISTS idx_api_keys_key_hash ON api_keys(key_hash);",
        "CREATE INDEX IF NOT EXISTS idx_api_keys_user_id ON api_keys(user_id);",
        "CREATE INDEX IF NOT EXISTS idx_evaluations_experiment_id ON evaluations(experiment_id);",
        "CREATE INDEX IF NOT EXISTS idx_evaluations_status ON evaluations(status);",
        "CREATE INDEX IF NOT EXISTS idx_permissions_resource ON permissions(resource);",
        "CREATE INDEX IF NOT EXISTS idx_role_permissions_role_id ON role_permissions(role_id);",
        "CREATE INDEX IF NOT EXISTS idx_role_permissions_permission_id ON role_permissions(permission_id);",
        "CREATE INDEX IF NOT EXISTS idx_roles_name ON roles(name);",
        "CREATE INDEX IF NOT EXISTS idx_user_roles_user_id ON user_roles(user_id);",
        "CREATE INDEX IF NOT EXISTS idx_user_roles_role_id ON user_roles(role_id);",
    )

    for statement in statements:
        try:
            await session.execute(text(statement))
        except Exception as exc:
            logger.warning(f"⚠️ Index creation warning: {exc}")
        else:
            # Label is the index name minus its "idx_" prefix, i.e. the text
            # between "idx_" and the following space.
            after_prefix = statement.partition('idx_')[2]
            label = after_prefix.partition(' ')[0]
            logger.info(f"✅ Index created: {label}")
|
|
async def enhance_legacy_tables(session: AsyncSession):
    """Add the extra columns this script expects to the ``users`` table.

    Reads the current column list with ``PRAGMA table_info(users)`` and
    issues one ``ALTER TABLE ... ADD COLUMN`` per column that is not there
    yet, then commits.  A failure to add an individual column is logged as a
    warning and does not stop the remaining columns; any other failure is
    logged as an error and swallowed (best effort).

    Args:
        session: Open async session bound to the SQLite database.
    """
    logger.info("🔧 Enhancing legacy tables...")

    # Column name -> SQLite column definition.  String defaults are written
    # with single quotes: a double-quoted token is an identifier in SQL, and
    # SQLite only treats "{}" as a string through a compatibility quirk that
    # can be switched off in the SQLite build.
    enhancements = {
        'stripe_customer_id': 'TEXT',
        'phone': 'TEXT',
        'avatar_url': 'TEXT',
        'preferences': "TEXT DEFAULT '{}'",
        'metadata': "TEXT DEFAULT '{}'",
        'subscription_id': 'INTEGER',
        'is_premium': 'BOOLEAN DEFAULT 0',
    }

    try:
        result = await session.execute(text("PRAGMA table_info(users);"))
        # table_info rows are (cid, name, type, notnull, dflt_value, pk).
        existing_columns = {row[1] for row in result.fetchall()}

        if not existing_columns:
            # table_info returns no rows for a table that does not exist;
            # report that once instead of one failed ALTER per column.
            logger.warning("⚠️ users table not found; no columns were added")
        else:
            for column_name, column_type in enhancements.items():
                if column_name in existing_columns:
                    logger.info(f"✅ Column already exists: {column_name}")
                    continue
                try:
                    await session.execute(
                        text(f"ALTER TABLE users ADD COLUMN {column_name} {column_type};")
                    )
                    logger.info(f"✅ Added column to users: {column_name}")
                except Exception as e:
                    logger.warning(f"⚠️ Could not add column {column_name}: {e}")

        await session.commit()

    except Exception as e:
        logger.error(f"❌ Failed to enhance users table: {e}")
|
|
async def validate_relationships(session: AsyncSession):
    """Report the foreign keys declared on ``users`` and try to enforce them.

    Lists the constraints returned by ``PRAGMA foreign_key_list(users)``,
    then issues ``PRAGMA foreign_keys = ON`` and reads the setting back so
    the log reflects what the connection actually reports.  Any error is
    logged as a warning and swallowed.

    Args:
        session: Open async session bound to the SQLite database.
    """
    logger.info("🔍 Validating foreign key relationships...")

    try:
        result = await session.execute(text("PRAGMA foreign_key_list(users);"))
        fk_list = result.fetchall()

        if fk_list:
            # Declared constraints say nothing about enforcement, so this
            # message no longer claims foreign keys are "enabled".
            logger.info("✅ Foreign keys declared on users:")
            for fk in fk_list:
                # Row layout: (id, seq, table, from, to, on_update, on_delete,
                # match) -- "from" is the users column, "table"/"to" the target.
                logger.info(f" FK: users.{fk[3]} -> {fk[2]}({fk[4]})")
        else:
            logger.info("ℹ️ No foreign keys found (normal for SQLite)")

        await session.execute(text("PRAGMA foreign_keys = ON;"))

        # The pragma is per-connection and is silently ignored while a
        # transaction is open, so confirm the outcome instead of assuming it.
        state = await session.execute(text("PRAGMA foreign_keys;"))
        if state.scalar() == 1:
            logger.info("✅ Foreign keys enabled")
        else:
            logger.warning(
                "⚠️ Foreign key enforcement is not active on this connection "
                "(PRAGMA foreign_keys is ignored inside an open transaction)"
            )

    except Exception as e:
        logger.warning(f"⚠️ Foreign key validation warning: {e}")
|
|
async def test_integrated_operations(session: AsyncSession):
    """Smoke-test the unified schema by writing rows across several tables.

    Inserts (or leaves in place) a test user, assigns it the ``user`` role if
    that role exists, inserts a test experiment and a test evaluation.  Every
    insert is idempotent, so repeated runs do not pile up rows.  Role and
    evaluation failures are logged as warnings; any other failure is logged
    as an error and the session is rolled back.

    Args:
        session: Open async session bound to the SQLite database.
    """
    logger.info("🧪 Testing integrated operations...")

    test_email = "integrated_test@example.com"
    test_run_id = "integrated-test-run-456"

    try:
        from core.security import get_password_hash

        test_user_data = {
            "email": test_email,
            # The row is active + verified and is never removed, so it must
            # not carry a known password: hash a random throwaway secret
            # instead of a hard-coded literal.
            "password_hash": get_password_hash(secrets.token_urlsafe(32)),
            "full_name": "Integrated Test User",
            "company": "Test Company",
            "is_premium": 1,
            "metadata": '{"test": "integration"}',
        }

        await session.execute(text("""
            INSERT INTO users (email, password_hash, full_name, company, is_active, is_verified, is_premium, metadata)
            VALUES (:email, :password_hash, :full_name, :company, 1, 1, :is_premium, :metadata)
            ON CONFLICT (email) DO NOTHING
        """), test_user_data)

        await session.commit()

        result = await session.execute(
            text("SELECT id FROM users WHERE email = :email"),
            {"email": test_email},
        )
        user_row = result.fetchone()

        if user_row:
            user_id = user_row[0]
            logger.info(f"✅ Integrated user created: {user_id}")

            try:
                result = await session.execute(
                    text("SELECT id FROM roles WHERE name = 'user' LIMIT 1")
                )
                role_row = result.fetchone()

                if role_row:
                    role_id = role_row[0]

                    await session.execute(text("""
                        INSERT OR IGNORE INTO user_roles (user_id, role_id)
                        VALUES (:user_id, :role_id)
                    """), {"user_id": user_id, "role_id": role_id})

                    await session.commit()
                    logger.info(f"✅ Role assigned to user: role_id={role_id}")
                else:
                    logger.info("ℹ️ No 'user' role found in roles table")

            except Exception as e:
                logger.warning(f"⚠️ Role assignment test failed: {e}")

        # Falls back to user id 1 when the test user row could not be read
        # back (behaviour kept from the original script).
        created_by = user_row[0] if user_row else 1

        await session.execute(text("""
            INSERT INTO experiments (run_id, experiment_name, description, config_snapshot, created_by)
            VALUES (:run_id, 'Integrated Test Experiment', 'Integration test', '{"integrated": true}', :user_id)
            ON CONFLICT (run_id) DO NOTHING
        """), {"run_id": test_run_id, "user_id": created_by})

        await session.commit()
        logger.info("✅ Integrated experiment created")

        try:
            # Guarded with NOT EXISTS so re-running the script does not add a
            # duplicate evaluation row each time.
            eval_result = await session.execute(text("""
                INSERT INTO evaluations (experiment_id, model_name, dataset_name, metrics, status)
                SELECT :experiment_id, 'test-model', 'test-dataset', '{"accuracy": 0.95}', 'completed'
                WHERE NOT EXISTS (
                    SELECT 1 FROM evaluations
                    WHERE experiment_id = :experiment_id
                      AND model_name = 'test-model'
                      AND dataset_name = 'test-dataset'
                )
            """), {"experiment_id": test_run_id})

            await session.commit()
            if eval_result.rowcount == 0:
                logger.info("ℹ️ Integrated evaluation already exists")
            else:
                logger.info("✅ Integrated evaluation created")

        except Exception as e:
            logger.warning(f"⚠️ Evaluation creation test failed: {e}")

        logger.info("✅ Integrated operations test completed")

    except Exception as e:
        logger.error(f"❌ Integrated operations test failed: {e}")
        await session.rollback()
|
|
async def generate_schema_report():
    """Write a Markdown report of the database schema to disk.

    For every user table (SQLite's ``sqlite_sequence`` bookkeeping table is
    skipped) the report lists the row count and each column's type,
    nullability and primary-key membership, followed by overall totals.  The
    result is written to ``SQLITE_SCHEMA_REPORT.md`` in the current working
    directory.  A table that cannot be inspected gets an error entry instead
    of aborting the report.
    """
    logger.info("📊 Generating comprehensive schema report...")

    sqlite_url = f"sqlite+aiosqlite:///{settings.SQLITE_DATABASE_PATH}"

    engine = create_async_engine(
        sqlite_url,
        echo=False,
        pool_pre_ping=True,
    )

    AsyncSessionLocal = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )

    try:
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;")
            )
            # Filter the bookkeeping table here so the table total below is
            # right whether or not sqlite_sequence exists.
            tables = sorted(
                row[0] for row in result.fetchall() if row[0] != 'sqlite_sequence'
            )

            report = [
                "# 🗄️ AEGISLM SQLITE DATABASE SCHEMA REPORT",
                # Wall-clock UTC timestamp (the event loop clock used before
                # is monotonic and not a date).
                f"Generated: {datetime.now(timezone.utc).isoformat(timespec='seconds')}",
                "",
                "## 📊 TABLE SUMMARY",
                "",
            ]

            total_records = 0

            for table_name in tables:
                # Identifiers cannot be bound as parameters; quote the name.
                quoted_name = '"' + table_name.replace('"', '""') + '"'
                try:
                    result = await session.execute(
                        text(f"SELECT COUNT(*) FROM {quoted_name}")
                    )
                    count = result.scalar()
                    total_records += count

                    result = await session.execute(
                        text(f"PRAGMA table_info({quoted_name});")
                    )
                    # Rows are (cid, name, type, notnull, dflt_value, pk).
                    columns = result.fetchall()

                    report.append(f"### 📋 {table_name}")
                    report.append(f"- **Records**: {count}")
                    report.append(f"- **Columns**: {len(columns)}")
                    report.append("- **Schema**:")

                    for col in columns:
                        nullable = "NULL" if col[3] == 0 else "NOT NULL"
                        # pk is the 1-based position inside the primary key,
                        # so any non-zero value marks a key column.
                        pk_suffix = " PRIMARY KEY" if col[5] else ""
                        # Two-space indent keeps the columns nested under
                        # the "Schema" bullet.
                        report.append(f"  - `{col[1]}` {col[2]} {nullable}{pk_suffix}")

                    report.append("")

                except Exception as e:
                    report.append(f"### 📋 {table_name}")
                    report.append(f"- **Error**: {e}")
                    report.append("")

            report.append("## 📈 SUMMARY")
            report.append(f"- **Total Tables**: {len(tables)}")
            report.append(f"- **Total Records**: {total_records}")
            report.append("- **Database Engine**: SQLite")
            report.append("- **Status**: ✅ Integrated and Functional")
            report.append("")

            Path("SQLITE_SCHEMA_REPORT.md").write_text(
                "\n".join(report), encoding="utf-8"
            )

            logger.info("✅ Schema report saved to SQLITE_SCHEMA_REPORT.md")
    finally:
        # Dispose even when report generation fails part-way.
        await engine.dispose()
|
|
async def main():
    """Run the schema integration, then generate the schema report.

    If either step raises, the error is logged and the process exits with
    status 1.
    """
    try:
        for step in (integrate_sqlite_schema, generate_schema_report):
            await step()

        for message in (
            "🎉 SQLite schema integration completed successfully!",
            "📋 Legacy and new tables are now unified",
            "📊 Comprehensive report saved to SQLITE_SCHEMA_REPORT.md",
        ):
            logger.info(message)

    except Exception as exc:
        logger.error(f"❌ Schema integration failed: {exc}")
        sys.exit(1)


# Script entry point.
if __name__ == "__main__":
    asyncio.run(main())
|
|