""" utils/database.py - Database Schema Management (Phase 4A) ======================================================== Extends the evaluation_results database with quality scoring tables: - evaluation_scores: Multi-dimensional quality scores from LLM judge - error_analysis: Categorized failure patterns """ import sqlite3 from pathlib import Path from typing import Optional import json from datetime import datetime class EvaluationDatabase: """ Manages SQLite database schema for RAG evaluation results Phase 3 Tables: - evaluation_results: Basic evaluation metrics (accuracy, time, cost) Phase 4A Tables (NEW): - evaluation_scores: Quality scores from LLM judge - error_analysis: Error categorization and patterns """ def __init__(self, db_path: str = "data/evaluation_results.db"): """ Initialize database connection Args: db_path: Path to SQLite database file """ self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) self.conn = sqlite3.connect(str(self.db_path)) self.conn.row_factory = sqlite3.Row # Access columns by name def create_phase4_tables(self): """ Create Phase 4A tables for quality evaluation These tables extend evaluation_results with judge scores and error analysis. """ cursor = self.conn.cursor() # =================================================================== # Table 1: evaluation_scores # =================================================================== cursor.execute(""" CREATE TABLE IF NOT EXISTS evaluation_scores ( id INTEGER PRIMARY KEY AUTOINCREMENT, evaluation_result_id INTEGER NOT NULL, -- Multi-dimensional scores (0-10) correctness_score REAL NOT NULL, relevance_score REAL NOT NULL, completeness_score REAL NOT NULL, clarity_score REAL NOT NULL, conciseness_score REAL NOT NULL, overall_score REAL NOT NULL, -- Judge metadata confidence REAL NOT NULL, explanation TEXT NOT NULL, issues TEXT NOT NULL, -- JSON array of issue types -- Evaluation metadata evaluator_model TEXT NOT NULL, evaluation_cost_usd REAL NOT NULL, evaluation_time_ms REAL NOT NULL, timestamp TEXT NOT NULL, -- Foreign key to evaluation_results FOREIGN KEY (evaluation_result_id) REFERENCES evaluation_results(id) ON DELETE CASCADE ) """) # Index for fast lookups by evaluation_result_id cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_scores_result_id ON evaluation_scores(evaluation_result_id) """) # Index for filtering by overall score cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_scores_overall ON evaluation_scores(overall_score) """) # =================================================================== # Table 2: error_analysis # =================================================================== cursor.execute(""" CREATE TABLE IF NOT EXISTS error_analysis ( id INTEGER PRIMARY KEY AUTOINCREMENT, evaluation_result_id INTEGER NOT NULL, -- Error classification error_type TEXT NOT NULL, -- 'retrieval_failure', 'generation_error', 'hallucination', etc. error_description TEXT NOT NULL, severity TEXT NOT NULL, -- 'low', 'medium', 'high', 'critical' -- Diagnostics suggested_fix TEXT, affected_component TEXT, -- 'retriever', 'generator', 'embedder', 'reranker' -- Metadata timestamp TEXT NOT NULL, -- Foreign key to evaluation_results FOREIGN KEY (evaluation_result_id) REFERENCES evaluation_results(id) ON DELETE CASCADE ) """) # Index for error type analysis cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_error_type ON error_analysis(error_type) """) # Index for severity filtering cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_error_severity ON error_analysis(severity) """) self.conn.commit() print(f"āœ… Phase 4A tables created in: {self.db_path}") def insert_evaluation_score( self, evaluation_result_id: int, correctness_score: float, relevance_score: float, completeness_score: float, clarity_score: float, conciseness_score: float, overall_score: float, confidence: float, explanation: str, issues: list, evaluator_model: str, evaluation_cost_usd: float, evaluation_time_ms: float ) -> int: """ Insert quality scores from LLM judge Args: evaluation_result_id: FK to evaluation_results table correctness_score: 0-10 relevance_score: 0-10 completeness_score: 0-10 clarity_score: 0-10 conciseness_score: 0-10 overall_score: 0-10 weighted average confidence: 0-1 explanation: Judge's reasoning issues: List of issue types evaluator_model: Judge model name evaluation_cost_usd: Cost of evaluation evaluation_time_ms: Evaluation latency Returns: ID of inserted record """ cursor = self.conn.cursor() cursor.execute(""" INSERT INTO evaluation_scores ( evaluation_result_id, correctness_score, relevance_score, completeness_score, clarity_score, conciseness_score, overall_score, confidence, explanation, issues, evaluator_model, evaluation_cost_usd, evaluation_time_ms, timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( evaluation_result_id, correctness_score, relevance_score, completeness_score, clarity_score, conciseness_score, overall_score, confidence, explanation, json.dumps(issues), evaluator_model, evaluation_cost_usd, evaluation_time_ms, datetime.now().isoformat() )) self.conn.commit() return cursor.lastrowid def insert_error_analysis( self, evaluation_result_id: int, error_type: str, error_description: str, severity: str = "medium", suggested_fix: Optional[str] = None, affected_component: Optional[str] = None ) -> int: """ Insert error analysis record Args: evaluation_result_id: FK to evaluation_results table error_type: 'retrieval_failure', 'generation_error', 'hallucination', etc. error_description: Human-readable description severity: 'low', 'medium', 'high', 'critical' suggested_fix: Recommended solution affected_component: 'retriever', 'generator', 'embedder', 'reranker' Returns: ID of inserted record """ cursor = self.conn.cursor() cursor.execute(""" INSERT INTO error_analysis ( evaluation_result_id, error_type, error_description, severity, suggested_fix, affected_component, timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( evaluation_result_id, error_type, error_description, severity, suggested_fix, affected_component, datetime.now().isoformat() )) self.conn.commit() return cursor.lastrowid def get_evaluation_with_scores(self, evaluation_result_id: int) -> Optional[dict]: """ Get evaluation result with quality scores Args: evaluation_result_id: ID from evaluation_results table Returns: Dict with evaluation data + scores, or None if not found """ cursor = self.conn.cursor() # Join evaluation_results with evaluation_scores cursor.execute(""" SELECT er.*, es.correctness_score, es.relevance_score, es.completeness_score, es.clarity_score, es.conciseness_score, es.overall_score, es.confidence, es.explanation, es.issues, es.evaluator_model FROM evaluation_results er LEFT JOIN evaluation_scores es ON er.id = es.evaluation_result_id WHERE er.id = ? """, (evaluation_result_id,)) row = cursor.fetchone() if row: return dict(row) return None def get_quality_summary_by_pipeline(self, run_id: str) -> list: """ Get quality score summary for each pipeline in a run Args: run_id: Evaluation run ID Returns: List of dicts with pipeline quality metrics """ cursor = self.conn.cursor() cursor.execute(""" SELECT er.pipeline_name, COUNT(es.id) as evaluated_count, ROUND(AVG(es.correctness_score), 2) as avg_correctness, ROUND(AVG(es.relevance_score), 2) as avg_relevance, ROUND(AVG(es.completeness_score), 2) as avg_completeness, ROUND(AVG(es.clarity_score), 2) as avg_clarity, ROUND(AVG(es.conciseness_score), 2) as avg_conciseness, ROUND(AVG(es.overall_score), 2) as avg_overall, ROUND(AVG(es.confidence), 3) as avg_confidence, SUM(es.evaluation_cost_usd) as total_eval_cost FROM evaluation_results er INNER JOIN evaluation_scores es ON er.id = es.evaluation_result_id WHERE er.run_id = ? GROUP BY er.pipeline_name ORDER BY avg_overall DESC """, (run_id,)) return [dict(row) for row in cursor.fetchall()] def get_error_summary(self, run_id: Optional[str] = None) -> list: """ Get error analysis summary Args: run_id: Optional run ID filter Returns: List of error type counts and severity distribution """ cursor = self.conn.cursor() if run_id: cursor.execute(""" SELECT ea.error_type, ea.severity, COUNT(*) as count, GROUP_CONCAT(DISTINCT ea.affected_component) as components FROM error_analysis ea INNER JOIN evaluation_results er ON ea.evaluation_result_id = er.id WHERE er.run_id = ? GROUP BY ea.error_type, ea.severity ORDER BY count DESC """, (run_id,)) else: cursor.execute(""" SELECT error_type, severity, COUNT(*) as count, GROUP_CONCAT(DISTINCT affected_component) as components FROM error_analysis GROUP BY error_type, severity ORDER BY count DESC """) return [dict(row) for row in cursor.fetchall()] def close(self): """Close database connection""" if self.conn: self.conn.close() # ============================================================================ # MIGRATION SCRIPT - Run this to update existing database # ============================================================================ def migrate_database(db_path: str = "data/evaluation_results.db"): """ Migrate existing Phase 3 database to Phase 4A schema Args: db_path: Path to evaluation_results.db """ print("šŸ”„ Migrating database to Phase 4A schema...") print("=" * 80) db = EvaluationDatabase(db_path) # Create new tables db.create_phase4_tables() # Check existing data cursor = db.conn.cursor() cursor.execute("SELECT COUNT(*) FROM evaluation_results") result_count = cursor.fetchone()[0] cursor.execute(""" SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='evaluation_scores' """) scores_table_exists = cursor.fetchone()[0] > 0 print(f"\nšŸ“Š Database Status:") print(f" - Evaluation results: {result_count} records") print(f" - Quality scores table: {'āœ… Created' if scores_table_exists else 'āŒ Missing'}") db.close() print("\n" + "=" * 80) print("āœ… Migration complete!") print("\nšŸš€ Next: Run scripts/evaluate_with_judge.py to populate scores") if __name__ == "__main__": # Run migration migrate_database()