# Provenance (Hugging Face upload residue, commented out so the module parses):
#   puji4ml's picture
#   Upload 30 files
#   2b22a59 verified
"""
utils/database.py - Database Schema Management (Phase 4A)
========================================================
Extends the evaluation_results database with quality scoring tables:
- evaluation_scores: Multi-dimensional quality scores from LLM judge
- error_analysis: Categorized failure patterns
"""
import sqlite3
from pathlib import Path
from typing import Optional
import json
from datetime import datetime
class EvaluationDatabase:
    """
    Manages SQLite database schema for RAG evaluation results.

    Phase 3 Tables:
    - evaluation_results: Basic evaluation metrics (accuracy, time, cost)

    Phase 4A Tables (NEW):
    - evaluation_scores: Quality scores from LLM judge
    - error_analysis: Error categorization and patterns
    """

    def __init__(self, db_path: str = "data/evaluation_results.db"):
        """
        Initialize database connection.

        Args:
            db_path: Path to SQLite database file. Parent directories are
                created if missing.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn = sqlite3.connect(str(self.db_path))
        self.conn.row_factory = sqlite3.Row  # Access columns by name
        # BUGFIX: SQLite ignores foreign-key constraints (including the
        # ON DELETE CASCADE clauses declared below) unless this pragma is
        # enabled on each connection. Without it, deleting a row from
        # evaluation_results would silently orphan its scores/errors.
        self.conn.execute("PRAGMA foreign_keys = ON")

    def __enter__(self) -> "EvaluationDatabase":
        """Support `with EvaluationDatabase(...) as db:` usage."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the connection when leaving a `with` block."""
        self.close()

    def create_phase4_tables(self) -> None:
        """
        Create Phase 4A tables for quality evaluation.

        These tables extend evaluation_results with judge scores and error
        analysis. All statements use IF NOT EXISTS, so this is idempotent.
        """
        cursor = self.conn.cursor()

        # ===================================================================
        # Table 1: evaluation_scores
        # ===================================================================
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS evaluation_scores (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                evaluation_result_id INTEGER NOT NULL,
                -- Multi-dimensional scores (0-10)
                correctness_score REAL NOT NULL,
                relevance_score REAL NOT NULL,
                completeness_score REAL NOT NULL,
                clarity_score REAL NOT NULL,
                conciseness_score REAL NOT NULL,
                overall_score REAL NOT NULL,
                -- Judge metadata
                confidence REAL NOT NULL,
                explanation TEXT NOT NULL,
                issues TEXT NOT NULL,  -- JSON array of issue types
                -- Evaluation metadata
                evaluator_model TEXT NOT NULL,
                evaluation_cost_usd REAL NOT NULL,
                evaluation_time_ms REAL NOT NULL,
                timestamp TEXT NOT NULL,
                -- Foreign key to evaluation_results
                FOREIGN KEY (evaluation_result_id) REFERENCES evaluation_results(id)
                    ON DELETE CASCADE
            )
        """)

        # Index for fast lookups by evaluation_result_id
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_scores_result_id
            ON evaluation_scores(evaluation_result_id)
        """)

        # Index for filtering by overall score
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_scores_overall
            ON evaluation_scores(overall_score)
        """)

        # ===================================================================
        # Table 2: error_analysis
        # ===================================================================
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS error_analysis (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                evaluation_result_id INTEGER NOT NULL,
                -- Error classification
                error_type TEXT NOT NULL,  -- 'retrieval_failure', 'generation_error', 'hallucination', etc.
                error_description TEXT NOT NULL,
                severity TEXT NOT NULL,  -- 'low', 'medium', 'high', 'critical'
                -- Diagnostics
                suggested_fix TEXT,
                affected_component TEXT,  -- 'retriever', 'generator', 'embedder', 'reranker'
                -- Metadata
                timestamp TEXT NOT NULL,
                -- Foreign key to evaluation_results
                FOREIGN KEY (evaluation_result_id) REFERENCES evaluation_results(id)
                    ON DELETE CASCADE
            )
        """)

        # Index for error type analysis
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_error_type
            ON error_analysis(error_type)
        """)

        # Index for severity filtering
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_error_severity
            ON error_analysis(severity)
        """)

        self.conn.commit()
        print(f"βœ… Phase 4A tables created in: {self.db_path}")

    def insert_evaluation_score(
        self,
        evaluation_result_id: int,
        correctness_score: float,
        relevance_score: float,
        completeness_score: float,
        clarity_score: float,
        conciseness_score: float,
        overall_score: float,
        confidence: float,
        explanation: str,
        issues: list,
        evaluator_model: str,
        evaluation_cost_usd: float,
        evaluation_time_ms: float
    ) -> int:
        """
        Insert quality scores from LLM judge.

        Args:
            evaluation_result_id: FK to evaluation_results table
            correctness_score: 0-10
            relevance_score: 0-10
            completeness_score: 0-10
            clarity_score: 0-10
            conciseness_score: 0-10
            overall_score: 0-10 weighted average
            confidence: 0-1
            explanation: Judge's reasoning
            issues: List of issue types (stored as a JSON array)
            evaluator_model: Judge model name
            evaluation_cost_usd: Cost of evaluation
            evaluation_time_ms: Evaluation latency

        Returns:
            ID of inserted record
        """
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT INTO evaluation_scores (
                evaluation_result_id,
                correctness_score, relevance_score, completeness_score,
                clarity_score, conciseness_score, overall_score,
                confidence, explanation, issues,
                evaluator_model, evaluation_cost_usd, evaluation_time_ms,
                timestamp
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            evaluation_result_id,
            correctness_score, relevance_score, completeness_score,
            clarity_score, conciseness_score, overall_score,
            confidence, explanation, json.dumps(issues),
            evaluator_model, evaluation_cost_usd, evaluation_time_ms,
            # NOTE(review): local (naive) timestamp, matching existing rows;
            # switching to UTC would desynchronize old and new records.
            datetime.now().isoformat()
        ))
        self.conn.commit()
        return cursor.lastrowid

    def insert_error_analysis(
        self,
        evaluation_result_id: int,
        error_type: str,
        error_description: str,
        severity: str = "medium",
        suggested_fix: Optional[str] = None,
        affected_component: Optional[str] = None
    ) -> int:
        """
        Insert error analysis record.

        Args:
            evaluation_result_id: FK to evaluation_results table
            error_type: 'retrieval_failure', 'generation_error', 'hallucination', etc.
            error_description: Human-readable description
            severity: 'low', 'medium', 'high', 'critical'
            suggested_fix: Recommended solution
            affected_component: 'retriever', 'generator', 'embedder', 'reranker'

        Returns:
            ID of inserted record
        """
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT INTO error_analysis (
                evaluation_result_id,
                error_type, error_description, severity,
                suggested_fix, affected_component,
                timestamp
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            evaluation_result_id,
            error_type, error_description, severity,
            suggested_fix, affected_component,
            datetime.now().isoformat()
        ))
        self.conn.commit()
        return cursor.lastrowid

    def get_evaluation_with_scores(self, evaluation_result_id: int) -> Optional[dict]:
        """
        Get evaluation result with quality scores.

        Args:
            evaluation_result_id: ID from evaluation_results table

        Returns:
            Dict with evaluation data + scores, or None if not found.
            Score columns are NULL when no judge scores exist yet (LEFT JOIN).
        """
        cursor = self.conn.cursor()
        # LEFT JOIN so results that have not been judged yet still come back.
        cursor.execute("""
            SELECT
                er.*,
                es.correctness_score,
                es.relevance_score,
                es.completeness_score,
                es.clarity_score,
                es.conciseness_score,
                es.overall_score,
                es.confidence,
                es.explanation,
                es.issues,
                es.evaluator_model
            FROM evaluation_results er
            LEFT JOIN evaluation_scores es ON er.id = es.evaluation_result_id
            WHERE er.id = ?
        """, (evaluation_result_id,))
        row = cursor.fetchone()
        return dict(row) if row else None

    def get_quality_summary_by_pipeline(self, run_id: str) -> list:
        """
        Get quality score summary for each pipeline in a run.

        Args:
            run_id: Evaluation run ID

        Returns:
            List of dicts with per-pipeline averages (rounded) and total
            evaluation cost, best overall score first.
        """
        cursor = self.conn.cursor()
        # INNER JOIN: only results that actually have judge scores contribute.
        cursor.execute("""
            SELECT
                er.pipeline_name,
                COUNT(es.id) as evaluated_count,
                ROUND(AVG(es.correctness_score), 2) as avg_correctness,
                ROUND(AVG(es.relevance_score), 2) as avg_relevance,
                ROUND(AVG(es.completeness_score), 2) as avg_completeness,
                ROUND(AVG(es.clarity_score), 2) as avg_clarity,
                ROUND(AVG(es.conciseness_score), 2) as avg_conciseness,
                ROUND(AVG(es.overall_score), 2) as avg_overall,
                ROUND(AVG(es.confidence), 3) as avg_confidence,
                SUM(es.evaluation_cost_usd) as total_eval_cost
            FROM evaluation_results er
            INNER JOIN evaluation_scores es ON er.id = es.evaluation_result_id
            WHERE er.run_id = ?
            GROUP BY er.pipeline_name
            ORDER BY avg_overall DESC
        """, (run_id,))
        return [dict(row) for row in cursor.fetchall()]

    def get_error_summary(self, run_id: Optional[str] = None) -> list:
        """
        Get error analysis summary.

        Args:
            run_id: Optional run ID filter; when omitted, summarize all runs.

        Returns:
            List of dicts with (error_type, severity) counts and the distinct
            affected components, most frequent first.
        """
        cursor = self.conn.cursor()
        if run_id:
            cursor.execute("""
                SELECT
                    ea.error_type,
                    ea.severity,
                    COUNT(*) as count,
                    GROUP_CONCAT(DISTINCT ea.affected_component) as components
                FROM error_analysis ea
                INNER JOIN evaluation_results er ON ea.evaluation_result_id = er.id
                WHERE er.run_id = ?
                GROUP BY ea.error_type, ea.severity
                ORDER BY count DESC
            """, (run_id,))
        else:
            cursor.execute("""
                SELECT
                    error_type,
                    severity,
                    COUNT(*) as count,
                    GROUP_CONCAT(DISTINCT affected_component) as components
                FROM error_analysis
                GROUP BY error_type, severity
                ORDER BY count DESC
            """)
        return [dict(row) for row in cursor.fetchall()]

    def close(self) -> None:
        """Close database connection."""
        if self.conn:
            self.conn.close()
# ============================================================================
# MIGRATION SCRIPT - Run this to update existing database
# ============================================================================
def migrate_database(db_path: str = "data/evaluation_results.db"):
    """
    Migrate existing Phase 3 database to Phase 4A schema.

    Args:
        db_path: Path to evaluation_results.db
    """
    divider = "=" * 80
    print("πŸ”„ Migrating database to Phase 4A schema...")
    print(divider)

    database = EvaluationDatabase(db_path)
    database.create_phase4_tables()

    # Gather a quick status report on the migrated database.
    cur = database.conn.cursor()
    cur.execute("SELECT COUNT(*) FROM evaluation_results")
    total_results = cur.fetchone()[0]
    cur.execute("""
        SELECT COUNT(*) FROM sqlite_master
        WHERE type='table' AND name='evaluation_scores'
    """)
    has_scores_table = cur.fetchone()[0] > 0

    print(f"\nπŸ“Š Database Status:")
    print(f" - Evaluation results: {total_results} records")
    table_status = 'βœ… Created' if has_scores_table else '❌ Missing'
    print(f" - Quality scores table: {table_status}")

    database.close()

    print("\n" + divider)
    print("βœ… Migration complete!")
    print("\nπŸš€ Next: Run scripts/evaluate_with_judge.py to populate scores")


if __name__ == "__main__":
    # Run migration
    migrate_database()