# Source: ALM-2 / backend/tests/integrate_sqlite_schema.py
# Uploaded by ACA050 ("Upload 520 files"), commit 2ed8996 (verified)
"""
Integrate SQLite Schema - Legacy + New Tables
This script integrates the existing legacy AegisLM tables with our new schema
to create a complete, unified database structure.
"""
import asyncio
import sys
from pathlib import Path
import logging
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text
# Make the backend package root importable so `core.*` resolves regardless of
# the working directory. This file lives under backend/tests/, so the folder
# that contains `core/` is an ancestor of this file, not its immediate parent.
# Search upward for it, falling back to the script's own folder if not found.
_script_dir = Path(__file__).resolve().parent
backend_path = next(
    (
        candidate
        for candidate in (_script_dir, *_script_dir.parents)
        if (candidate / "core").is_dir()
    ),
    _script_dir,
)
sys.path.insert(0, str(backend_path))
from core.config import settings
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def integrate_sqlite_schema() -> None:
    """Integrate legacy and new SQLite tables.

    Runs, in order: verify all tables exist, create performance indexes,
    add missing columns, validate foreign key relationships, and test
    integrated operations. Outstanding work is committed at the end.

    Raises:
        Exception: any failure during setup or a step is logged and re-raised
            so the caller can exit non-zero.
    """
    logger.info("🔧 Integrating SQLite Schema - Legacy + New Tables")
    logger.info("🎯 Creating unified AegisLM database structure")
    engine = None
    try:
        # Create SQLite engine
        sqlite_url = f"sqlite+aiosqlite:///{settings.SQLITE_DATABASE_PATH}"
        engine = create_async_engine(
            sqlite_url,
            echo=False,
            pool_pre_ping=True,
        )
        AsyncSessionLocal = sessionmaker(
            engine, class_=AsyncSession, expire_on_commit=False
        )
        async with AsyncSessionLocal() as session:
            # Step 1: Verify all tables exist
            await verify_complete_schema(session)
            # Step 2: Create missing indexes for performance
            await create_performance_indexes(session)
            # Step 3: Add missing columns if needed
            await enhance_legacy_tables(session)
            # Step 4: Validate foreign key relationships
            await validate_relationships(session)
            # Step 5: Test integrated operations
            await test_integrated_operations(session)
            await session.commit()
        logger.info("🎉 SQLite schema integration completed successfully!")
    except Exception as e:
        logger.error(f"❌ Schema integration failed: {e}")
        raise
    finally:
        # Release pooled connections on every path. Previously dispose() ran
        # only after success, leaking the engine whenever a step raised.
        if engine is not None:
            await engine.dispose()
async def verify_complete_schema(session: AsyncSession) -> None:
    """Log every table with its row count and report expected tables that are missing.

    Tables found in the expected inventory are marked ✅, anything else ⚠️.
    This step only reads; it creates and modifies nothing.

    Args:
        session: open session bound to the SQLite database being inspected.
    """
    logger.info("📋 Verifying complete schema...")
    # Get all tables
    result = await session.execute(text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;"))
    all_tables = [row[0] for row in result.fetchall()]
    # Expected complete schema: table name -> human-readable description
    expected_tables = {
        # Legacy tables (keep these)
        'api_keys': 'API key management',
        'evaluations': 'AI evaluation results',
        'permissions': 'RBAC permissions',
        'role_permissions': 'Role-permission mapping',
        'roles': 'User roles',
        'user_roles': 'User-role assignments',
        # New AegisLM tables (already created)
        'users': 'User accounts',
        'experiments': 'AI experiments',
        'datasets': 'Dataset management',
        'audit_trails': 'Audit logging',
        'subscriptions': 'User subscriptions',
        'invoices': 'Billing invoices',
        'payment_methods': 'Payment methods',
        'benchmarks': 'Performance benchmarks',
        'analytics': 'Usage analytics',
        'notifications': 'User notifications',
        'system_logs': 'System logging',
        # System tables
        'sqlite_sequence': 'SQLite system table',
    }
    logger.info("📊 Complete Schema Status:")
    for table_name in sorted(all_tables):
        status = "✅" if table_name in expected_tables else "⚠️"
        description = expected_tables.get(table_name, "Unknown table")
        # Identifiers cannot be bound as parameters, so quote the name
        # (doubling embedded quotes) rather than splicing it raw into SQL.
        quoted_name = '"' + table_name.replace('"', '""') + '"'
        try:
            result = await session.execute(text(f"SELECT COUNT(*) FROM {quoted_name}"))
            count = result.scalar()
        except Exception as e:
            # Surface the cause instead of silently discarding it.
            logger.warning(f" {status} {table_name}: Error counting records ({description}): {e}")
            continue
        logger.info(f" {status} {table_name}: {count} records ({description})")
    # Check for missing tables
    missing_tables = set(expected_tables) - set(all_tables)
    if missing_tables:
        logger.warning(f"⚠️ Missing tables: {missing_tables}")
    else:
        logger.info("✅ All expected tables present")
async def create_performance_indexes(session: AsyncSession):
    """Issue ``CREATE INDEX IF NOT EXISTS`` for each index in the table below.

    A statement that raises (for example when its table or column is absent)
    is logged as a warning, and the remaining indexes are still attempted.
    """
    logger.info("🔧 Creating performance indexes...")
    # Each entry is (index name, table, comma-separated column list).
    index_specs = (
        # Users table indexes
        ("idx_users_email", "users", "email"),
        ("idx_users_uuid", "users", "uuid"),
        ("idx_users_active", "users", "is_active"),
        ("idx_users_created_at", "users", "created_at"),
        # Experiments table indexes
        ("idx_experiments_run_id", "experiments", "run_id"),
        ("idx_experiments_user_id", "experiments", "created_by"),
        ("idx_experiments_status", "experiments", "status"),
        ("idx_experiments_created_at", "experiments", "created_at"),
        # Datasets table indexes
        ("idx_datasets_type_version", "datasets", "type, version"),
        ("idx_datasets_user_id", "datasets", "created_by"),
        ("idx_datasets_public", "datasets", "is_public"),
        # Audit trails table indexes
        ("idx_audit_experiments", "audit_trails", "experiment_id"),
        ("idx_audit_user_id", "audit_trails", "user_id"),
        ("idx_audit_timestamp", "audit_trails", "timestamp"),
        # Subscriptions table indexes
        ("idx_subscriptions_user_id", "subscriptions", "user_id"),
        ("idx_subscriptions_status", "subscriptions", "status"),
        # Analytics table indexes
        ("idx_analytics_user_id", "analytics", "user_id"),
        ("idx_analytics_event_type", "analytics", "event_type"),
        ("idx_analytics_timestamp", "analytics", "timestamp"),
        # Legacy tables indexes
        ("idx_api_keys_key_hash", "api_keys", "key_hash"),
        ("idx_api_keys_user_id", "api_keys", "user_id"),
        ("idx_evaluations_experiment_id", "evaluations", "experiment_id"),
        ("idx_evaluations_status", "evaluations", "status"),
        ("idx_permissions_resource", "permissions", "resource"),
        ("idx_role_permissions_role_id", "role_permissions", "role_id"),
        ("idx_role_permissions_permission_id", "role_permissions", "permission_id"),
        ("idx_roles_name", "roles", "name"),
        ("idx_user_roles_user_id", "user_roles", "user_id"),
        ("idx_user_roles_role_id", "user_roles", "role_id"),
    )
    prefix = "idx_"
    for index_name, table_name, column_list in index_specs:
        statement = f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name}({column_list});"
        try:
            await session.execute(text(statement))
        except Exception as err:
            logger.warning(f"⚠️ Index creation warning: {err}")
        else:
            # Log the name with its "idx_" prefix dropped.
            logger.info(f"✅ Index created: {index_name[len(prefix):]}")
async def enhance_legacy_tables(session: AsyncSession) -> None:
    """Add optional profile/billing columns to ``users`` when they are missing.

    Columns that already exist are left untouched, so the step can be re-run.
    Each ``ALTER TABLE`` is attempted independently; a failure is logged as a
    warning and the remaining columns are still tried. Pending changes are
    committed at the end. Errors are logged rather than raised.

    Args:
        session: open session bound to the SQLite database being modified.
    """
    logger.info("🔧 Enhancing legacy tables...")
    # Column name -> SQLite column definition. JSON defaults use single
    # quotes: in SQLite "..." denotes an identifier, and reading it as a
    # string literal is a legacy fallback that builds with SQLITE_DQS=0 reject.
    enhancements = {
        'stripe_customer_id': 'TEXT',
        'phone': 'TEXT',
        'avatar_url': 'TEXT',
        'preferences': "TEXT DEFAULT '{}'",
        'metadata': "TEXT DEFAULT '{}'",
        'subscription_id': 'INTEGER',
        'is_premium': 'BOOLEAN DEFAULT 0',
    }
    try:
        result = await session.execute(text("PRAGMA table_info(users);"))
        # table_info rows are (cid, name, type, notnull, dflt_value, pk);
        # only the names are needed here.
        existing_columns = {row[1] for row in result.fetchall()}
        if not existing_columns:
            # table_info yields no rows when the table does not exist; every
            # ALTER below would fail, so stop with one clear message instead.
            logger.warning("⚠️ users table not found; skipping column enhancements")
            return
        for column_name, column_type in enhancements.items():
            if column_name in existing_columns:
                logger.info(f"✅ Column already exists: {column_name}")
                continue
            try:
                await session.execute(text(f"ALTER TABLE users ADD COLUMN {column_name} {column_type};"))
                logger.info(f"✅ Added column to users: {column_name}")
            except Exception as e:
                logger.warning(f"⚠️ Could not add column {column_name}: {e}")
        await session.commit()
    except Exception as e:
        logger.error(f"❌ Failed to enhance users table: {e}")
async def validate_relationships(session: AsyncSession) -> None:
    """Report FKs declared on ``users``, enable enforcement, and check integrity.

    Three independent checks:
      1. list the foreign-key constraints declared on ``users``;
      2. turn on FK enforcement for this connection and read it back;
      3. run ``PRAGMA foreign_key_check`` to find rows whose parent is missing.

    Problems are logged as warnings; nothing is raised.

    Args:
        session: open session bound to the SQLite database being inspected.
    """
    logger.info("🔍 Validating foreign key relationships...")
    try:
        # foreign_key_list lists constraints *declared* on a table; it says
        # nothing about whether enforcement is on. Row layout:
        # (id, seq, parent_table, from_col, to_col, on_update, on_delete, match)
        result = await session.execute(text("PRAGMA foreign_key_list(users);"))
        fk_list = result.fetchall()
        if fk_list:
            logger.info(f"✅ users declares {len(fk_list)} foreign key column(s)")
            for fk in fk_list:
                logger.info(f" FK: users.{fk[3]} -> {fk[2]}({fk[4]})")
        else:
            logger.info("ℹ️ No foreign keys declared on users")

        # Enforcement is per-connection and off by default in SQLite, and the
        # pragma is a silent no-op inside an open transaction, so read the
        # setting back instead of assuming the assignment took effect.
        await session.execute(text("PRAGMA foreign_keys = ON;"))
        result = await session.execute(text("PRAGMA foreign_keys;"))
        if result.scalar():
            logger.info("✅ Foreign keys enabled")
        else:
            logger.warning("⚠️ PRAGMA foreign_keys = ON did not take effect (it is ignored inside an open transaction)")

        # foreign_key_check works whether or not enforcement is on. Each row is
        # (child_table, rowid, parent_table, fk_id) for one dangling reference.
        result = await session.execute(text("PRAGMA foreign_key_check;"))
        violations = result.fetchall()
        if violations:
            logger.warning(f"⚠️ {len(violations)} foreign key violation(s) found")
            for child_table, rowid, parent_table, _fk_id in violations:
                logger.warning(f" {child_table} rowid={rowid} references a missing row in {parent_table}")
        else:
            logger.info("✅ No foreign key violations")
    except Exception as e:
        logger.warning(f"⚠️ Foreign key validation warning: {e}")
async def test_integrated_operations(session: AsyncSession) -> None:
    """Smoke-test inserts that span legacy and new tables.

    Steps:
      1. Insert a fixed test user (skipped on email conflict).
      2. Give it the ``user`` role if that role exists.
      3. Insert a test experiment and a test evaluation tied to that run.

    Every insert is guarded so re-running the script does not add duplicate
    rows. Failures are logged and the session is rolled back; nothing is raised.

    Args:
        session: open session bound to the SQLite database being exercised.
    """
    logger.info("🧪 Testing integrated operations...")
    test_email = "integrated_test@example.com"
    test_run_id = "integrated-test-run-456"
    try:
        from core.security import get_password_hash

        # Create test user
        test_user_data = {
            "email": test_email,
            "password_hash": get_password_hash("test123456"),
            "full_name": "Integrated Test User",
            "company": "Test Company",
            "is_premium": 1,
            "metadata": '{"test": "integration"}',
        }
        await session.execute(text("""
            INSERT INTO users (email, password_hash, full_name, company, is_active, is_verified, is_premium, metadata)
            VALUES (:email, :password_hash, :full_name, :company, 1, 1, :is_premium, :metadata)
            ON CONFLICT (email) DO NOTHING
        """), test_user_data)
        await session.commit()

        result = await session.execute(
            text("SELECT id FROM users WHERE email = :email"), {"email": test_email}
        )
        user_row = result.fetchone()
        if user_row is None:
            # The old code fell back to created_by=1 here, attributing test
            # rows to an arbitrary (possibly non-existent) user.
            logger.warning("⚠️ Integrated test user not found; skipping dependent inserts")
            return
        user_id = user_row[0]
        logger.info(f"✅ Integrated user created: {user_id}")

        # Test role assignment (if roles table exists and has data)
        try:
            result = await session.execute(text("SELECT id FROM roles WHERE name = 'user' LIMIT 1"))
            role_row = result.fetchone()
            if role_row:
                role_id = role_row[0]
                await session.execute(text("""
                    INSERT OR IGNORE INTO user_roles (user_id, role_id)
                    VALUES (:user_id, :role_id)
                """), {"user_id": user_id, "role_id": role_id})
                await session.commit()
                logger.info(f"✅ Role assigned to user: role_id={role_id}")
            else:
                logger.info("ℹ️ No 'user' role found in roles table")
        except Exception as e:
            # Discard the failed statement so it is not swept into a later commit.
            await session.rollback()
            logger.warning(f"⚠️ Role assignment test failed: {e}")

        # Test experiment creation
        await session.execute(text("""
            INSERT INTO experiments (run_id, experiment_name, description, config_snapshot, created_by)
            VALUES (:run_id, 'Integrated Test Experiment', 'Integration test', '{"integrated": true}', :user_id)
            ON CONFLICT (run_id) DO NOTHING
        """), {"run_id": test_run_id, "user_id": user_id})
        await session.commit()
        logger.info("✅ Integrated experiment created")

        # Test evaluation creation (if evaluations table exists). No unique
        # constraint is known for evaluations, so use NOT EXISTS to keep re-runs
        # from piling up duplicate rows.
        # NOTE(review): this binds the run_id string as experiment_id, as the
        # original did; confirm that is the intended key for this column.
        try:
            await session.execute(text("""
                INSERT INTO evaluations (experiment_id, model_name, dataset_name, metrics, status)
                SELECT :experiment_id, 'test-model', 'test-dataset', '{"accuracy": 0.95}', 'completed'
                WHERE NOT EXISTS (
                    SELECT 1 FROM evaluations
                    WHERE experiment_id = :experiment_id AND model_name = 'test-model'
                )
            """), {"experiment_id": test_run_id})
            await session.commit()
            logger.info("✅ Integrated evaluation created")
        except Exception as e:
            await session.rollback()
            logger.warning(f"⚠️ Evaluation creation test failed: {e}")

        logger.info("✅ Integrated operations test completed")
    except Exception as e:
        logger.error(f"❌ Integrated operations test failed: {e}")
        await session.rollback()
async def generate_schema_report() -> None:
    """Write a Markdown schema report to ``SQLITE_SCHEMA_REPORT.md``.

    For each table except ``sqlite_sequence``, the report records the row
    count and the column layout from ``PRAGMA table_info``. Tables that cannot
    be inspected get an error entry, and the summary status reflects those
    errors. The file is written to the current working directory,
    overwriting any previous report.
    """
    from datetime import datetime, timezone

    logger.info("📊 Generating comprehensive schema report...")
    sqlite_url = f"sqlite+aiosqlite:///{settings.SQLITE_DATABASE_PATH}"
    engine = create_async_engine(
        sqlite_url,
        echo=False,
        pool_pre_ping=True,
    )
    try:
        AsyncSessionLocal = sessionmaker(
            engine, class_=AsyncSession, expire_on_commit=False
        )
        async with AsyncSessionLocal() as session:
            result = await session.execute(text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;"))
            # Drop the sqlite_sequence system table up front. It only exists when
            # a table uses AUTOINCREMENT, so "len - 1" is not a safe way to exclude it.
            tables = [row[0] for row in result.fetchall() if row[0] != 'sqlite_sequence']

            report = [
                "# 🗄️ AEGISLM SQLITE DATABASE SCHEMA REPORT",
                # Wall-clock UTC time. The event-loop clock used previously is a
                # monotonic counter with an arbitrary origin, not a timestamp.
                f"Generated: {datetime.now(timezone.utc).isoformat()}",
                "",
                "## 📊 TABLE SUMMARY",
                "",
            ]
            total_records = 0
            failed_tables = 0
            for table_name in sorted(tables):
                # Quote the identifier (doubling embedded quotes); names cannot be bound.
                quoted_name = '"' + table_name.replace('"', '""') + '"'
                try:
                    result = await session.execute(text(f"SELECT COUNT(*) FROM {quoted_name}"))
                    count = result.scalar()
                    result = await session.execute(text(f"PRAGMA table_info({quoted_name});"))
                    columns = result.fetchall()
                except Exception as e:
                    failed_tables += 1
                    report += [f"### 📋 {table_name}", f"- **Error**: {e}", ""]
                    continue
                total_records += count
                report += [
                    f"### 📋 {table_name}",
                    f"- **Records**: {count}",
                    f"- **Columns**: {len(columns)}",
                    "- **Schema**:",
                ]
                for col in columns:
                    # table_info row: (cid, name, type, notnull, dflt_value, pk).
                    # pk is the column's 1-based position in the primary key
                    # (0 = not a key column), so any non-zero value marks a PK
                    # column, including every member of a composite key.
                    nullable = "NOT NULL" if col[3] else "NULL"
                    pk = "PRIMARY KEY" if col[5] else ""
                    # Indent as a nested bullet; rstrip drops the gap left by an empty pk.
                    report.append(f"  - `{col[1]}` {col[2]} {nullable} {pk}".rstrip())
                report.append("")

            status = (
                "✅ Integrated and Functional"
                if failed_tables == 0
                else f"⚠️ {failed_tables} table(s) could not be inspected"
            )
            report += [
                "## 📈 SUMMARY",
                f"- **Total Tables**: {len(tables)}",
                f"- **Total Records**: {total_records}",
                "- **Database Engine**: SQLite",
                f"- **Status**: {status}",
                "",
            ]
    finally:
        # Release the engine even if a query above raised.
        await engine.dispose()

    report_content = "\n".join(report)
    with open("SQLITE_SCHEMA_REPORT.md", "w", encoding="utf-8") as f:
        f.write(report_content)
    logger.info("✅ Schema report saved to SQLITE_SCHEMA_REPORT.md")
async def main():
    """Run the schema integration, then write the schema report.

    Any ``Exception`` raised by either step is logged and the process exits
    with status 1.
    """
    completion_messages = (
        "🎉 SQLite schema integration completed successfully!",
        "📋 Legacy and new tables are now unified",
        "📊 Comprehensive report saved to SQLITE_SCHEMA_REPORT.md",
    )
    try:
        await integrate_sqlite_schema()
        await generate_schema_report()
        for message in completion_messages:
            logger.info(message)
    except Exception as exc:
        logger.error(f"❌ Schema integration failed: {exc}")
        sys.exit(1)


if __name__ == "__main__":
    # Drive the async entry point on a fresh event loop.
    asyncio.run(main())