| """ | |
| utils/database.py - Database Schema Management (Phase 4A) | |
| ======================================================== | |
| Extends the evaluation_results database with quality scoring tables: | |
| - evaluation_scores: Multi-dimensional quality scores from LLM judge | |
| - error_analysis: Categorized failure patterns | |
| """ | |
| import sqlite3 | |
| from pathlib import Path | |
| from typing import Optional | |
| import json | |
| from datetime import datetime | |
| class EvaluationDatabase: | |
| """ | |
| Manages SQLite database schema for RAG evaluation results | |
| Phase 3 Tables: | |
| - evaluation_results: Basic evaluation metrics (accuracy, time, cost) | |
| Phase 4A Tables (NEW): | |
| - evaluation_scores: Quality scores from LLM judge | |
| - error_analysis: Error categorization and patterns | |
| """ | |

    def __init__(self, db_path: str = "data/evaluation_results.db"):
        """
        Initialize database connection

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn = sqlite3.connect(str(self.db_path))
        self.conn.row_factory = sqlite3.Row  # Access columns by name
        # SQLite leaves foreign key enforcement off by default; enable it so the
        # ON DELETE CASCADE constraints declared in create_phase4_tables() apply
        self.conn.execute("PRAGMA foreign_keys = ON")

    def create_phase4_tables(self):
        """
        Create Phase 4A tables for quality evaluation

        These tables extend evaluation_results with judge scores and error analysis.
        """
        cursor = self.conn.cursor()

        # ===================================================================
        # Table 1: evaluation_scores
        # ===================================================================
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS evaluation_scores (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                evaluation_result_id INTEGER NOT NULL,

                -- Multi-dimensional scores (0-10)
                correctness_score REAL NOT NULL,
                relevance_score REAL NOT NULL,
                completeness_score REAL NOT NULL,
                clarity_score REAL NOT NULL,
                conciseness_score REAL NOT NULL,
                overall_score REAL NOT NULL,

                -- Judge metadata
                confidence REAL NOT NULL,
                explanation TEXT NOT NULL,
                issues TEXT NOT NULL,  -- JSON array of issue types

                -- Evaluation metadata
                evaluator_model TEXT NOT NULL,
                evaluation_cost_usd REAL NOT NULL,
                evaluation_time_ms REAL NOT NULL,
                timestamp TEXT NOT NULL,

                -- Foreign key to evaluation_results
                FOREIGN KEY (evaluation_result_id) REFERENCES evaluation_results(id)
                    ON DELETE CASCADE
            )
        """)

        # Index for fast lookups by evaluation_result_id
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_scores_result_id
            ON evaluation_scores(evaluation_result_id)
        """)

        # Index for filtering by overall score
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_scores_overall
            ON evaluation_scores(overall_score)
        """)

        # ===================================================================
        # Table 2: error_analysis
        # ===================================================================
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS error_analysis (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                evaluation_result_id INTEGER NOT NULL,

                -- Error classification
                error_type TEXT NOT NULL,  -- 'retrieval_failure', 'generation_error', 'hallucination', etc.
                error_description TEXT NOT NULL,
                severity TEXT NOT NULL,  -- 'low', 'medium', 'high', 'critical'

                -- Diagnostics
                suggested_fix TEXT,
                affected_component TEXT,  -- 'retriever', 'generator', 'embedder', 'reranker'

                -- Metadata
                timestamp TEXT NOT NULL,

                -- Foreign key to evaluation_results
                FOREIGN KEY (evaluation_result_id) REFERENCES evaluation_results(id)
                    ON DELETE CASCADE
            )
        """)

        # Index for error type analysis
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_error_type
            ON error_analysis(error_type)
        """)

        # Index for severity filtering
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_error_severity
            ON error_analysis(severity)
        """)

        self.conn.commit()
        print(f"✅ Phase 4A tables created in: {self.db_path}")

    def insert_evaluation_score(
        self,
        evaluation_result_id: int,
        correctness_score: float,
        relevance_score: float,
        completeness_score: float,
        clarity_score: float,
        conciseness_score: float,
        overall_score: float,
        confidence: float,
        explanation: str,
        issues: list,
        evaluator_model: str,
        evaluation_cost_usd: float,
        evaluation_time_ms: float
    ) -> int:
        """
        Insert quality scores from LLM judge

        Args:
            evaluation_result_id: FK to evaluation_results table
            correctness_score: 0-10
            relevance_score: 0-10
            completeness_score: 0-10
            clarity_score: 0-10
            conciseness_score: 0-10
            overall_score: 0-10 weighted average
            confidence: 0-1
            explanation: Judge's reasoning
            issues: List of issue types
            evaluator_model: Judge model name
            evaluation_cost_usd: Cost of evaluation
            evaluation_time_ms: Evaluation latency

        Returns:
            ID of inserted record
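
        Example (illustrative sketch; the id, scores, and model name below are
        hypothetical and assume the referenced evaluation_results row exists):

            db.insert_evaluation_score(
                evaluation_result_id=1,
                correctness_score=8.5, relevance_score=9.0, completeness_score=7.5,
                clarity_score=8.0, conciseness_score=7.0, overall_score=8.2,
                confidence=0.9, explanation="Accurate but slightly verbose answer",
                issues=["verbosity"], evaluator_model="gpt-4o-mini",
                evaluation_cost_usd=0.0004, evaluation_time_ms=1200.0
            )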
| """ | |
| cursor = self.conn.cursor() | |
| cursor.execute(""" | |
| INSERT INTO evaluation_scores ( | |
| evaluation_result_id, | |
| correctness_score, relevance_score, completeness_score, | |
| clarity_score, conciseness_score, overall_score, | |
| confidence, explanation, issues, | |
| evaluator_model, evaluation_cost_usd, evaluation_time_ms, | |
| timestamp | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| evaluation_result_id, | |
| correctness_score, relevance_score, completeness_score, | |
| clarity_score, conciseness_score, overall_score, | |
| confidence, explanation, json.dumps(issues), | |
| evaluator_model, evaluation_cost_usd, evaluation_time_ms, | |
| datetime.now().isoformat() | |
| )) | |
| self.conn.commit() | |
| return cursor.lastrowid | |

    def insert_error_analysis(
        self,
        evaluation_result_id: int,
        error_type: str,
        error_description: str,
        severity: str = "medium",
        suggested_fix: Optional[str] = None,
        affected_component: Optional[str] = None
    ) -> int:
        """
        Insert error analysis record

        Args:
            evaluation_result_id: FK to evaluation_results table
            error_type: 'retrieval_failure', 'generation_error', 'hallucination', etc.
            error_description: Human-readable description
            severity: 'low', 'medium', 'high', 'critical'
            suggested_fix: Recommended solution
            affected_component: 'retriever', 'generator', 'embedder', 'reranker'

        Returns:
            ID of inserted record
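
        Example (illustrative sketch; the id and wording are hypothetical):

            db.insert_error_analysis(
                evaluation_result_id=1,
                error_type="retrieval_failure",
                error_description="Top-k chunks did not contain the answer",
                severity="high",
                suggested_fix="Increase k or add a reranker",
                affected_component="retriever"
            )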
| """ | |
| cursor = self.conn.cursor() | |
| cursor.execute(""" | |
| INSERT INTO error_analysis ( | |
| evaluation_result_id, | |
| error_type, error_description, severity, | |
| suggested_fix, affected_component, | |
| timestamp | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| evaluation_result_id, | |
| error_type, error_description, severity, | |
| suggested_fix, affected_component, | |
| datetime.now().isoformat() | |
| )) | |
| self.conn.commit() | |
| return cursor.lastrowid | |

    def get_evaluation_with_scores(self, evaluation_result_id: int) -> Optional[dict]:
        """
        Get evaluation result with quality scores

        Args:
            evaluation_result_id: ID from evaluation_results table

        Returns:
            Dict with evaluation data + scores, or None if not found
        """
        cursor = self.conn.cursor()

        # Join evaluation_results with evaluation_scores
        cursor.execute("""
            SELECT
                er.*,
                es.correctness_score,
                es.relevance_score,
                es.completeness_score,
                es.clarity_score,
                es.conciseness_score,
                es.overall_score,
                es.confidence,
                es.explanation,
                es.issues,
                es.evaluator_model
            FROM evaluation_results er
            LEFT JOIN evaluation_scores es ON er.id = es.evaluation_result_id
            WHERE er.id = ?
        """, (evaluation_result_id,))

        row = cursor.fetchone()
        if row:
            return dict(row)
        return None

    def get_quality_summary_by_pipeline(self, run_id: str) -> list:
        """
        Get quality score summary for each pipeline in a run

        Args:
            run_id: Evaluation run ID

        Returns:
            List of dicts with pipeline quality metrics
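
            Each dict is keyed by the SQL aliases used below; an illustrative
            (not real) row might look like:

                {"pipeline_name": "hybrid_rerank", "evaluated_count": 50,
                 "avg_correctness": 8.1, ..., "avg_overall": 7.9,
                 "avg_confidence": 0.88, "total_eval_cost": 0.021}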
| """ | |
| cursor = self.conn.cursor() | |
| cursor.execute(""" | |
| SELECT | |
| er.pipeline_name, | |
| COUNT(es.id) as evaluated_count, | |
| ROUND(AVG(es.correctness_score), 2) as avg_correctness, | |
| ROUND(AVG(es.relevance_score), 2) as avg_relevance, | |
| ROUND(AVG(es.completeness_score), 2) as avg_completeness, | |
| ROUND(AVG(es.clarity_score), 2) as avg_clarity, | |
| ROUND(AVG(es.conciseness_score), 2) as avg_conciseness, | |
| ROUND(AVG(es.overall_score), 2) as avg_overall, | |
| ROUND(AVG(es.confidence), 3) as avg_confidence, | |
| SUM(es.evaluation_cost_usd) as total_eval_cost | |
| FROM evaluation_results er | |
| INNER JOIN evaluation_scores es ON er.id = es.evaluation_result_id | |
| WHERE er.run_id = ? | |
| GROUP BY er.pipeline_name | |
| ORDER BY avg_overall DESC | |
| """, (run_id,)) | |
| return [dict(row) for row in cursor.fetchall()] | |

    def get_error_summary(self, run_id: Optional[str] = None) -> list:
        """
        Get error analysis summary

        Args:
            run_id: Optional run ID filter

        Returns:
            List of error type counts and severity distribution
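
            Illustrative (not real) row; components is a comma-separated string
            produced by GROUP_CONCAT:

                {"error_type": "hallucination", "severity": "high",
                 "count": 3, "components": "generator"}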
| """ | |
| cursor = self.conn.cursor() | |
| if run_id: | |
| cursor.execute(""" | |
| SELECT | |
| ea.error_type, | |
| ea.severity, | |
| COUNT(*) as count, | |
| GROUP_CONCAT(DISTINCT ea.affected_component) as components | |
| FROM error_analysis ea | |
| INNER JOIN evaluation_results er ON ea.evaluation_result_id = er.id | |
| WHERE er.run_id = ? | |
| GROUP BY ea.error_type, ea.severity | |
| ORDER BY count DESC | |
| """, (run_id,)) | |
| else: | |
| cursor.execute(""" | |
| SELECT | |
| error_type, | |
| severity, | |
| COUNT(*) as count, | |
| GROUP_CONCAT(DISTINCT affected_component) as components | |
| FROM error_analysis | |
| GROUP BY error_type, severity | |
| ORDER BY count DESC | |
| """) | |
| return [dict(row) for row in cursor.fetchall()] | |
| def close(self): | |
| """Close database connection""" | |
| if self.conn: | |
| self.conn.close() | |
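

# ============================================================================
# EXAMPLE USAGE - Illustrative sketch only; not called by the application.
# The database path, run_id, and evaluation_result_id below are hypothetical
# and assume Phase 3 results already exist in evaluation_results.
# ============================================================================

def _example_queries():
    db = EvaluationDatabase("data/evaluation_results.db")
    db.create_phase4_tables()

    # One result joined with its judge scores (None if the id is unknown)
    detail = db.get_evaluation_with_scores(evaluation_result_id=1)

    # Per-pipeline quality and error roll-ups for a single run
    quality = db.get_quality_summary_by_pipeline(run_id="run_2024_01_01")
    errors = db.get_error_summary(run_id="run_2024_01_01")

    db.close()
    return detail, quality, errors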


# ============================================================================
# MIGRATION SCRIPT - Run this to update existing database
# ============================================================================

def migrate_database(db_path: str = "data/evaluation_results.db"):
    """
    Migrate existing Phase 3 database to Phase 4A schema

    Args:
        db_path: Path to evaluation_results.db
    """
    print("🔄 Migrating database to Phase 4A schema...")
    print("=" * 80)

    db = EvaluationDatabase(db_path)

    # Create new tables
    db.create_phase4_tables()

    # Check existing data
    cursor = db.conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM evaluation_results")
    result_count = cursor.fetchone()[0]

    cursor.execute("""
        SELECT COUNT(*) FROM sqlite_master
        WHERE type='table' AND name='evaluation_scores'
    """)
    scores_table_exists = cursor.fetchone()[0] > 0

    print("\n📊 Database Status:")
    print(f"   - Evaluation results: {result_count} records")
    print(f"   - Quality scores table: {'✅ Created' if scores_table_exists else '❌ Missing'}")

    db.close()

    print("\n" + "=" * 80)
    print("✅ Migration complete!")
    print("\n📝 Next: Run scripts/evaluate_with_judge.py to populate scores")


if __name__ == "__main__":
    # Run migration
    migrate_database()