"""
Integrate SQLite Schema - Legacy + New Tables

This script integrates the existing legacy AegisLM tables with our new schema
to create a complete, unified database structure.

Running the module performs two phases:

1. ``integrate_sqlite_schema`` - verify tables, create indexes, add missing
   ``users`` columns, inspect foreign keys and run a write smoke test.
2. ``generate_schema_report`` - write ``SQLITE_SCHEMA_REPORT.md`` to the
   current working directory.
"""

import asyncio
import sys
from datetime import datetime, timezone
from pathlib import Path
import logging

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text

# Add backend to path so that the project-local ``core`` package resolves
# when this file is executed directly as a script.
backend_path = Path(__file__).parent
sys.path.insert(0, str(backend_path))

from core.config import settings

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Internal SQLite bookkeeping table; only present once some table uses
# AUTOINCREMENT, so it must never be assumed to exist.
_SQLITE_SEQUENCE = 'sqlite_sequence'

# Table name -> human readable description used in the schema status log.
EXPECTED_TABLES = {
    # Legacy tables (keep these)
    'api_keys': 'API key management',
    'evaluations': 'AI evaluation results',
    'permissions': 'RBAC permissions',
    'role_permissions': 'Role-permission mapping',
    'roles': 'User roles',
    'user_roles': 'User-role assignments',
    # New AegisLM tables (already created)
    'users': 'User accounts',
    'experiments': 'AI experiments',
    'datasets': 'Dataset management',
    'audit_trails': 'Audit logging',
    'subscriptions': 'User subscriptions',
    'invoices': 'Billing invoices',
    'payment_methods': 'Payment methods',
    'benchmarks': 'Performance benchmarks',
    'analytics': 'Usage analytics',
    'notifications': 'User notifications',
    'system_logs': 'System logging',
    # System tables
    'sqlite_sequence': 'SQLite system table',
}

# Idempotent index DDL (every statement uses IF NOT EXISTS).
PERFORMANCE_INDEXES = (
    # Users table indexes
    "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);",
    "CREATE INDEX IF NOT EXISTS idx_users_uuid ON users(uuid);",
    "CREATE INDEX IF NOT EXISTS idx_users_active ON users(is_active);",
    "CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);",
    # Experiments table indexes
    "CREATE INDEX IF NOT EXISTS idx_experiments_run_id ON experiments(run_id);",
    "CREATE INDEX IF NOT EXISTS idx_experiments_user_id ON experiments(created_by);",
    "CREATE INDEX IF NOT EXISTS idx_experiments_status ON experiments(status);",
    "CREATE INDEX IF NOT EXISTS idx_experiments_created_at ON experiments(created_at);",
    # Datasets table indexes
    "CREATE INDEX IF NOT EXISTS idx_datasets_type_version ON datasets(type, version);",
    "CREATE INDEX IF NOT EXISTS idx_datasets_user_id ON datasets(created_by);",
    "CREATE INDEX IF NOT EXISTS idx_datasets_public ON datasets(is_public);",
    # Audit trails table indexes
    "CREATE INDEX IF NOT EXISTS idx_audit_experiments ON audit_trails(experiment_id);",
    "CREATE INDEX IF NOT EXISTS idx_audit_user_id ON audit_trails(user_id);",
    "CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_trails(timestamp);",
    # Subscriptions table indexes
    "CREATE INDEX IF NOT EXISTS idx_subscriptions_user_id ON subscriptions(user_id);",
    "CREATE INDEX IF NOT EXISTS idx_subscriptions_status ON subscriptions(status);",
    # Analytics table indexes
    "CREATE INDEX IF NOT EXISTS idx_analytics_user_id ON analytics(user_id);",
    "CREATE INDEX IF NOT EXISTS idx_analytics_event_type ON analytics(event_type);",
    "CREATE INDEX IF NOT EXISTS idx_analytics_timestamp ON analytics(timestamp);",
    # Legacy tables indexes
    "CREATE INDEX IF NOT EXISTS idx_api_keys_key_hash ON api_keys(key_hash);",
    "CREATE INDEX IF NOT EXISTS idx_api_keys_user_id ON api_keys(user_id);",
    "CREATE INDEX IF NOT EXISTS idx_evaluations_experiment_id ON evaluations(experiment_id);",
    "CREATE INDEX IF NOT EXISTS idx_evaluations_status ON evaluations(status);",
    "CREATE INDEX IF NOT EXISTS idx_permissions_resource ON permissions(resource);",
    "CREATE INDEX IF NOT EXISTS idx_role_permissions_role_id ON role_permissions(role_id);",
    "CREATE INDEX IF NOT EXISTS idx_role_permissions_permission_id ON role_permissions(permission_id);",
    "CREATE INDEX IF NOT EXISTS idx_roles_name ON roles(name);",
    "CREATE INDEX IF NOT EXISTS idx_user_roles_user_id ON user_roles(user_id);",
    "CREATE INDEX IF NOT EXISTS idx_user_roles_role_id ON user_roles(role_id);",
)

# Column name -> column definition appended to ``users`` when absent.
# String defaults use single quotes: double-quoted string literals are a
# SQLite compatibility quirk that can be disabled at compile time.
USERS_COLUMN_ENHANCEMENTS = {
    'stripe_customer_id': 'TEXT',
    'phone': 'TEXT',
    'avatar_url': 'TEXT',
    'preferences': "TEXT DEFAULT '{}'",
    'metadata': "TEXT DEFAULT '{}'",
    'subscription_id': 'INTEGER',
    'is_premium': 'BOOLEAN DEFAULT 0',
}


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier (embedded quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def _create_engine_and_session_factory():
    """Build the async SQLite engine and a session factory bound to it.

    Returns:
        A ``(engine, session_factory)`` tuple. The caller owns the engine and
        must ``await engine.dispose()`` when finished.
    """
    sqlite_url = f"sqlite+aiosqlite:///{settings.SQLITE_DATABASE_PATH}"
    engine = create_async_engine(
        sqlite_url,
        echo=False,
        pool_pre_ping=True,
    )
    session_factory = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    return engine, session_factory


async def integrate_sqlite_schema():
    """Integrate legacy and new SQLite tables.

    Runs the five integration steps in order inside a single session and
    commits at the end. The engine is always disposed, even when a step fails.

    Raises:
        Exception: any error raised while creating the engine or running a
            step is logged and re-raised unchanged.
    """
    logger.info("🔧 Integrating SQLite Schema - Legacy + New Tables")
    logger.info("🎯 Creating unified AegisLM database structure")

    try:
        engine, session_factory = _create_engine_and_session_factory()
        try:
            async with session_factory() as session:
                # Step 1: Verify all tables exist
                await verify_complete_schema(session)

                # Step 2: Create missing indexes for performance
                await create_performance_indexes(session)

                # Step 3: Add missing columns if needed
                await enhance_legacy_tables(session)

                # Step 4: Validate foreign key relationships
                await validate_relationships(session)

                # Step 5: Test integrated operations
                await test_integrated_operations(session)

                await session.commit()
        finally:
            # Release pooled connections whether or not the steps succeeded.
            await engine.dispose()

        logger.info("🎉 SQLite schema integration completed successfully!")

    except Exception as e:
        logger.error(f"❌ Schema integration failed: {e}")
        raise


async def verify_complete_schema(session: AsyncSession):
    """Log the status and row count of every table, then report missing ones.

    Tables listed in ``EXPECTED_TABLES`` are marked as known; any other table
    found in ``sqlite_master`` is logged as unknown. Nothing is created or
    modified here.

    Args:
        session: open async session used for the read-only queries.
    """
    logger.info("📋 Verifying complete schema...")

    # Get all tables
    result = await session.execute(
        text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;")
    )
    all_tables = [row[0] for row in result.fetchall()]

    logger.info("📊 Complete Schema Status:")
    for table_name in sorted(all_tables):
        if table_name in EXPECTED_TABLES:
            status = "✅"
            description = EXPECTED_TABLES[table_name]
        else:
            status = "⚠️"
            description = "Unknown table"

        # Get record count. The name comes from sqlite_master, so it is quoted
        # as an identifier to survive spaces, keywords or embedded quotes.
        quoted_name = _quote_identifier(table_name)
        try:
            result = await session.execute(
                text(f"SELECT COUNT(*) FROM {quoted_name}")
            )
            count = result.scalar()
            logger.info(f" {status} {table_name}: {count} records ({description})")
        except Exception as e:
            logger.warning(
                f" {status} {table_name}: Error counting records ({description}): {e}"
            )

    # Check for missing tables
    missing_tables = set(EXPECTED_TABLES) - set(all_tables)
    if missing_tables:
        logger.warning(f"⚠️ Missing tables: {missing_tables}")
    else:
        logger.info("✅ All expected tables present")


async def create_performance_indexes(session: AsyncSession):
    """Create performance indexes for all tables.

    Each statement in ``PERFORMANCE_INDEXES`` is executed independently; a
    failure (for example a missing table or column) is logged as a warning and
    does not stop the remaining statements.

    Args:
        session: open async session used to execute the DDL.
    """
    logger.info("🔧 Creating performance indexes...")

    for index_sql in PERFORMANCE_INDEXES:
        try:
            await session.execute(text(index_sql))
            # Logged name is the text between "idx_" and the next space.
            logger.info(f"✅ Index created: {index_sql.split('idx_')[1].split(' ')[0]}")
        except Exception as e:
            logger.warning(f"⚠️ Index creation warning: {e}")


async def enhance_legacy_tables(session: AsyncSession):
    """Enhance legacy tables with missing columns.

    Adds every column from ``USERS_COLUMN_ENHANCEMENTS`` that ``users`` does
    not have yet, then commits. Errors are logged, never raised.

    Args:
        session: open async session used for the PRAGMA and ALTER statements.
    """
    logger.info("🔧 Enhancing legacy tables...")

    # Check if users table needs additional columns
    try:
        result = await session.execute(text("PRAGMA table_info(users);"))
        # Row layout: (cid, name, type, notnull, dflt_value, pk)
        existing_columns = {row[1] for row in result.fetchall()}

        if not existing_columns:
            # table_info yields no rows for a table that does not exist;
            # every ALTER TABLE below would fail, so stop early.
            logger.warning("⚠️ users table not found; skipping column enhancements")
            return

        # Add missing columns to users table
        for column_name, column_type in USERS_COLUMN_ENHANCEMENTS.items():
            if column_name in existing_columns:
                logger.info(f"✅ Column already exists: {column_name}")
                continue
            try:
                await session.execute(
                    text(f"ALTER TABLE users ADD COLUMN {column_name} {column_type};")
                )
                logger.info(f"✅ Added column to users: {column_name}")
            except Exception as e:
                logger.warning(f"⚠️ Could not add column {column_name}: {e}")

        await session.commit()

    except Exception as e:
        logger.error(f"❌ Failed to enhance users table: {e}")


async def validate_relationships(session: AsyncSession):
    """Validate foreign key relationships.

    Lists the foreign keys declared on ``users`` and then tries to switch on
    foreign key enforcement, reading the pragma back to confirm the result.
    Errors are logged as warnings, never raised.

    Args:
        session: open async session used for the PRAGMA statements.
    """
    logger.info("🔍 Validating foreign key relationships...")

    try:
        # foreign_key_list reports *declared* constraints only; whether they
        # are enforced is a separate setting checked further down.
        result = await session.execute(text("PRAGMA foreign_key_list(users);"))
        fk_list = result.fetchall()

        if fk_list:
            logger.info("✅ Foreign key constraints declared on users:")
            for fk in fk_list:
                # Row layout: (id, seq, table, from, to, on_update, on_delete, match)
                logger.info(f" FK: {fk[2]} -> {fk[3]}({fk[4]})")
        else:
            logger.info("ℹ️ No foreign keys found (normal for SQLite)")

        # Enable foreign keys if not enabled.
        # NOTE(review): this setting is per connection and SQLite ignores the
        # statement inside an open transaction, so it is read back below
        # instead of being assumed. Confirm whether the application engine
        # sets it on connect.
        await session.execute(text("PRAGMA foreign_keys = ON;"))
        result = await session.execute(text("PRAGMA foreign_keys;"))
        if result.scalar():
            logger.info("✅ Foreign keys enabled")
        else:
            logger.warning(
                "⚠️ Foreign key enforcement is still off "
                "(PRAGMA foreign_keys is a no-op inside an open transaction)"
            )

    except Exception as e:
        logger.warning(f"⚠️ Foreign key validation warning: {e}")


async def test_integrated_operations(session: AsyncSession):
    """Test integrated database operations.

    Smoke-tests writes across legacy and new tables: upserts a fixed test
    user, optionally assigns it the ``user`` role, upserts a test experiment
    and inserts a test evaluation. All rows are committed to the configured
    database. On an unexpected error the session is rolled back and the error
    is logged, not raised.

    Args:
        session: open async session used for all statements.
    """
    logger.info("🧪 Testing integrated operations...")

    try:
        # Test user creation with role assignment
        from core.security import get_password_hash

        # Create test user
        # NOTE(review): this commits an active, verified, premium account with
        # a hard-coded password into whatever database is configured - confirm
        # this script is never pointed at a production database.
        test_user_data = {
            "email": "integrated_test@example.com",
            "password_hash": get_password_hash("test123456"),
            "full_name": "Integrated Test User",
            "company": "Test Company",
            "is_premium": 1,
            "metadata": '{"test": "integration"}',
        }

        await session.execute(
            text(
                """
                INSERT INTO users (email, password_hash, full_name, company,
                                   is_active, is_verified, is_premium, metadata)
                VALUES (:email, :password_hash, :full_name, :company,
                        1, 1, :is_premium, :metadata)
                ON CONFLICT (email) DO NOTHING
                """
            ),
            test_user_data,
        )
        await session.commit()

        # Get user ID
        result = await session.execute(
            text("SELECT id FROM users WHERE email = :email"),
            {"email": "integrated_test@example.com"},
        )
        user_row = result.fetchone()

        if user_row:
            user_id = user_row[0]
            logger.info(f"✅ Integrated user created: {user_id}")

            # Test role assignment (if roles table exists and has data)
            try:
                result = await session.execute(
                    text("SELECT id FROM roles WHERE name = 'user' LIMIT 1")
                )
                role_row = result.fetchone()

                if role_row:
                    role_id = role_row[0]

                    # Assign role to user
                    await session.execute(
                        text(
                            """
                            INSERT OR IGNORE INTO user_roles (user_id, role_id)
                            VALUES (:user_id, :role_id)
                            """
                        ),
                        {"user_id": user_id, "role_id": role_id},
                    )
                    await session.commit()
                    logger.info(f"✅ Role assigned to user: role_id={role_id}")
                else:
                    logger.info("ℹ️ No 'user' role found in roles table")

            except Exception as e:
                logger.warning(f"⚠️ Role assignment test failed: {e}")

        # Test experiment creation. Falls back to user id 1 when the test user
        # row could not be read back.
        owner_id = user_row[0] if user_row else 1
        await session.execute(
            text(
                """
                INSERT INTO experiments (run_id, experiment_name, description,
                                         config_snapshot, created_by)
                VALUES ('integrated-test-run-456', 'Integrated Test Experiment',
                        'Integration test', '{"integrated": true}', :user_id)
                ON CONFLICT (run_id) DO NOTHING
                """
            ),
            {"user_id": owner_id},
        )
        await session.commit()
        logger.info("✅ Integrated experiment created")

        # Test evaluation creation (if evaluations table exists)
        # NOTE(review): unlike the inserts above this one has no conflict
        # clause, so each run may add another evaluation row - confirm intended.
        try:
            await session.execute(
                text(
                    """
                    INSERT INTO evaluations (experiment_id, model_name,
                                             dataset_name, metrics, status)
                    VALUES (:experiment_id, 'test-model', 'test-dataset',
                            '{"accuracy": 0.95}', 'completed')
                    """
                ),
                {"experiment_id": "integrated-test-run-456"},
            )
            await session.commit()
            logger.info("✅ Integrated evaluation created")
        except Exception as e:
            logger.warning(f"⚠️ Evaluation creation test failed: {e}")

        logger.info("✅ Integrated operations test completed")

    except Exception as e:
        logger.error(f"❌ Integrated operations test failed: {e}")
        await session.rollback()


async def generate_schema_report():
    """Generate comprehensive schema report.

    Collects the row count and column list of every table except
    ``sqlite_sequence`` and writes a Markdown summary to
    ``SQLITE_SCHEMA_REPORT.md`` in the current working directory. A table that
    cannot be inspected gets an error entry instead of aborting the report.
    The engine is always disposed.
    """
    logger.info("📊 Generating comprehensive schema report...")

    engine, session_factory = _create_engine_and_session_factory()
    try:
        async with session_factory() as session:
            # Get all tables
            result = await session.execute(
                text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;")
            )
            tables = [row[0] for row in result.fetchall()]

            # sqlite_sequence only exists when AUTOINCREMENT is in use, so it
            # is filtered out explicitly rather than subtracted from the count.
            user_tables = [name for name in tables if name != _SQLITE_SEQUENCE]

            report = []
            report.append("# 🗄️ AEGISLM SQLITE DATABASE SCHEMA REPORT")
            # Wall-clock UTC time (the event loop clock is monotonic and is
            # not a calendar timestamp).
            generated_at = datetime.now(timezone.utc).isoformat(timespec="seconds")
            report.append(f"Generated: {generated_at}")
            report.append("")
            report.append("## 📊 TABLE SUMMARY")
            report.append("")

            total_records = 0

            for table_name in sorted(user_tables):
                quoted_name = _quote_identifier(table_name)
                report.append(f"### 📋 {table_name}")
                try:
                    result = await session.execute(
                        text(f"SELECT COUNT(*) FROM {quoted_name}")
                    )
                    count = result.scalar()
                    total_records += count

                    # Get table schema
                    result = await session.execute(
                        text(f"PRAGMA table_info({quoted_name});")
                    )
                    columns = result.fetchall()

                    report.append(f"- **Records**: {count}")
                    report.append(f"- **Columns**: {len(columns)}")
                    report.append("- **Schema**:")

                    for col in columns:
                        # Row layout: (cid, name, type, notnull, dflt_value, pk)
                        nullable = "NULL" if col[3] == 0 else "NOT NULL"
                        # pk is the 1-based position inside the primary key,
                        # so any positive value marks a key column.
                        pk = "PRIMARY KEY" if col[5] > 0 else ""
                        # rstrip only: the leading indent keeps the entry
                        # nested under the "Schema" bullet.
                        report.append(
                            f"  - `{col[1]}` {col[2]} {nullable} {pk}".rstrip()
                        )

                except Exception as e:
                    report.append(f"- **Error**: {e}")

                report.append("")

            report.append("## 📈 SUMMARY")
            report.append(f"- **Total Tables**: {len(user_tables)}")
            report.append(f"- **Total Records**: {total_records}")
            report.append("- **Database Engine**: SQLite")
            report.append("- **Status**: ✅ Integrated and Functional")
            report.append("")

        report_content = "\n".join(report)

        # Save report
        with open("SQLITE_SCHEMA_REPORT.md", "w", encoding="utf-8") as f:
            f.write(report_content)

        logger.info("✅ Schema report saved to SQLITE_SCHEMA_REPORT.md")
    finally:
        await engine.dispose()


async def main():
    """Main integration function.

    Runs the schema integration followed by the report generation. Any
    failure is logged and the process exits with status 1.
    """
    try:
        await integrate_sqlite_schema()
        await generate_schema_report()

        logger.info("🎉 SQLite schema integration completed successfully!")
        logger.info("📋 Legacy and new tables are now unified")
        logger.info("📊 Comprehensive report saved to SQLITE_SCHEMA_REPORT.md")

    except Exception as e:
        logger.error(f"❌ Schema integration failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())