# RAG-Pipeline-Optimizer / scripts/evaluate_with_judge.py
# Uploaded by puji4ml ("Upload 30 files", commit 2b22a59, verified)
"""
scripts/evaluate_with_judge.py - Re-evaluate with LLM Judge (Phase 4A)
======================================================================
Loads existing evaluation results from Phase 3 and scores them using
GPT-5 as an expert judge. Stores quality scores in evaluation_scores table.
Usage:
python scripts/evaluate_with_judge.py --run-id <run_id> --limit 5
python scripts/evaluate_with_judge.py --latest --limit 5
"""
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))
import sqlite3
import argparse
from typing import List, Dict, Optional
import json
from tqdm import tqdm
import time
from core.evaluator import LLMJudge, EvaluationScores
from utils.database import EvaluationDatabase
def get_latest_run_id(db_path: str) -> Optional[str]:
    """
    Get the most recent run_id from the database.

    Args:
        db_path: Path to the SQLite evaluation database.

    Returns:
        The run_id of the newest row by timestamp, or None when the
        evaluation_results table is empty.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT run_id
            FROM evaluation_results
            ORDER BY timestamp DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
    finally:
        # Close even if the query raises (e.g. table missing) so the
        # connection/file handle is never leaked.
        conn.close()
    return result[0] if result else None
def get_unevaluated_results(
    db_path: str,
    run_id: str,
    limit: Optional[int] = None
) -> List[Dict]:
    """
    Get evaluation results that don't have quality scores yet.

    A result counts as "unevaluated" when it has no matching row in
    evaluation_scores (LEFT JOIN ... IS NULL).

    Args:
        db_path: Path to database
        run_id: Evaluation run ID
        limit: Max number of QUESTIONS (not total results) to fetch;
            all pipeline results for each selected question are returned.

    Returns:
        List of dicts with evaluation result data, ordered by
        (question_id, pipeline_id).
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        if limit:
            # First, pick N distinct questions so the limit applies per
            # question rather than per (question, pipeline) row.
            cursor.execute("""
                SELECT DISTINCT question_id
                FROM evaluation_results
                WHERE run_id = ?
                ORDER BY question_id
                LIMIT ?
            """, (run_id, limit))
            question_ids = [row[0] for row in cursor.fetchall()]
            if not question_ids:
                # try/finally below still closes the connection here.
                return []
            # Then fetch every still-unscored pipeline result for those
            # questions. Placeholders are generated (not interpolated
            # values), so this stays safe from SQL injection.
            placeholders = ','.join(['?' for _ in question_ids])
            query = f"""
                SELECT
                    er.id,
                    er.run_id,
                    er.pipeline_id,
                    er.pipeline_name,
                    er.question_id,
                    er.query,
                    er.ground_truth_answers,
                    er.generated_answer,
                    er.retrieved_chunks,
                    er.answer_found
                FROM evaluation_results er
                LEFT JOIN evaluation_scores es ON er.id = es.evaluation_result_id
                WHERE er.run_id = ? AND er.question_id IN ({placeholders}) AND es.id IS NULL
                ORDER BY er.question_id, er.pipeline_id
            """
            cursor.execute(query, [run_id] + question_ids)
        else:
            # No limit: every unevaluated result for the run.
            query = """
                SELECT
                    er.id,
                    er.run_id,
                    er.pipeline_id,
                    er.pipeline_name,
                    er.question_id,
                    er.query,
                    er.ground_truth_answers,
                    er.generated_answer,
                    er.retrieved_chunks,
                    er.answer_found
                FROM evaluation_results er
                LEFT JOIN evaluation_scores es ON er.id = es.evaluation_result_id
                WHERE er.run_id = ? AND es.id IS NULL
                ORDER BY er.question_id, er.pipeline_id
            """
            cursor.execute(query, (run_id,))
        results = [dict(row) for row in cursor.fetchall()]
    finally:
        # Close on every path, including query errors and the early return.
        conn.close()
    return results
def evaluate_results_with_judge(
    db_path: str,
    run_id: str,
    limit: Optional[int] = None,
    verbose: bool = True
):
    """
    Evaluate existing results with LLM judge.

    Loads all still-unscored results for *run_id*, scores each answer with
    the LLM judge, persists the scores (plus an error analysis for answers
    scoring below 5.0), then prints per-pipeline quality and error summaries.

    Args:
        db_path: Path to evaluation_results.db
        run_id: Run ID to evaluate
        limit: Max number of questions to evaluate (for testing)
        verbose: Print progress
    """
    if verbose:
        print("\n" + "=" * 80)
        print("🧑‍⚖️ LLM JUDGE RE-EVALUATION - Phase 4A")
        print("=" * 80)

    # Initialize judge
    if verbose:
        print("\n[Step 1] Initializing LLM Judge...")
    judge = LLMJudge(
        judge_model="gpt-5-chat",
        temperature=0.0,  # deterministic judging
        verbose=False  # Don't print each evaluation
    )
    if verbose:
        print(f" ✅ Judge ready: {judge.judge_model} (deployment: {judge.deployment})")

    # Load unevaluated results
    if verbose:
        print(f"\n[Step 2] Loading evaluation results...")
        print(f" Run ID: {run_id}")
        if limit:
            print(f" Limit: {limit} questions × all pipelines (test mode)")
    results = get_unevaluated_results(db_path, run_id, limit)
    if not results:
        print("\n❌ No unevaluated results found!")
        print(" Either all results have been scored, or run_id doesn't exist.")
        return

    # Count unique questions and pipelines for the progress report
    unique_questions = len(set(r['question_id'] for r in results))
    unique_pipelines = len(set(r['pipeline_name'] for r in results))
    if verbose:
        print(f" ✅ Loaded {len(results)} results ({unique_questions} questions × {unique_pipelines} pipelines)")

    # Initialize database
    db = EvaluationDatabase(db_path)

    # Evaluate each result
    if verbose:
        print(f"\n[Step 3] Evaluating with LLM Judge...")
        print("=" * 80)
    total_cost = 0.0
    success_count = 0
    error_count = 0
    # tqdm only when verbose; otherwise iterate the plain list
    progress_bar = tqdm(results, desc="Judging answers", unit="answer") if verbose else results
    try:
        for result in progress_bar:
            try:
                # Parse JSON fields stored as TEXT in the database
                ground_truth = json.loads(result['ground_truth_answers'])
                context = json.loads(result['retrieved_chunks'])
                # Evaluate with judge
                scores = judge.evaluate(
                    query=result['query'],
                    ground_truth_answers=ground_truth,
                    generated_answer=result['generated_answer'],
                    retrieved_context=context
                )
                # Insert scores into database
                db.insert_evaluation_score(
                    evaluation_result_id=result['id'],
                    correctness_score=scores.correctness_score,
                    relevance_score=scores.relevance_score,
                    completeness_score=scores.completeness_score,
                    clarity_score=scores.clarity_score,
                    conciseness_score=scores.conciseness_score,
                    overall_score=scores.overall_score,
                    confidence=scores.confidence,
                    explanation=scores.explanation,
                    issues=scores.issues,
                    evaluator_model=scores.evaluator_model,
                    evaluation_cost_usd=scores.evaluation_cost_usd,
                    evaluation_time_ms=scores.evaluation_time_ms
                )
                # Record an error analysis for low-quality answers
                if scores.overall_score < 5.0:
                    # First matching issue tag (in priority order) decides
                    # the recorded error type and severity.
                    error_type, severity = "unknown_error", "medium"
                    for tag, sev in (
                        ("retrieval_failure", "high"),
                        ("hallucination", "critical"),
                        ("generation_error", "high"),
                        ("factual_error", "high"),
                        ("incomplete", "medium"),
                    ):
                        if tag in scores.issues:
                            error_type, severity = tag, sev
                            break
                    # Insert error analysis
                    db.insert_error_analysis(
                        evaluation_result_id=result['id'],
                        error_type=error_type,
                        error_description=scores.explanation,
                        severity=severity,
                        suggested_fix=f"Overall score: {scores.overall_score:.1f}/10. Issues: {', '.join(scores.issues)}",
                        affected_component="pipeline"  # Could be more specific
                    )
                total_cost += scores.evaluation_cost_usd
                success_count += 1
            except Exception as e:
                # Best-effort: keep judging the remaining answers after a
                # per-item failure (bad JSON, judge/API error, DB error).
                error_count += 1
                if verbose:
                    print(f"\n❌ Error evaluating result {result['id']}: {e}")
                continue
    finally:
        # Always release the DB handle, even on KeyboardInterrupt.
        db.close()

    # Print summary
    if verbose:
        print("\n" + "=" * 80)
        print("📊 EVALUATION SUMMARY")
        print("=" * 80)
        print(f" Total evaluated: {success_count}/{len(results)}")
        print(f" Errors: {error_count}")
        print(f" Total cost: ${total_cost:.4f}")
        if success_count:
            # Guard against ZeroDivisionError when every evaluation failed
            print(f" Avg cost per answer: ${total_cost/success_count:.6f}")
        print("=" * 80)

        # Show quality summary by pipeline
        print("\n[Step 4] Quality Summary by Pipeline...")
        db2 = EvaluationDatabase(db_path)
        try:
            summary = db2.get_quality_summary_by_pipeline(run_id)
            if summary:
                print("\n" + "-" * 120)
                print(f"{'Pipeline':<40} {'Count':<8} {'Correct':<8} {'Relevant':<8} {'Complete':<8} {'Clear':<8} {'Concise':<8} {'Overall':<8}")
                print("-" * 120)
                for row in summary:
                    print(
                        f"{row['pipeline_name']:<40} "
                        f"{row['evaluated_count']:<8} "
                        f"{row['avg_correctness']:<8.1f} "
                        f"{row['avg_relevance']:<8.1f} "
                        f"{row['avg_completeness']:<8.1f} "
                        f"{row['avg_clarity']:<8.1f} "
                        f"{row['avg_conciseness']:<8.1f} "
                        f"{row['avg_overall']:<8.1f}"
                    )
                print("-" * 120)

            # Show error summary
            error_summary = db2.get_error_summary(run_id)
            if error_summary:
                print("\n[Step 5] Error Analysis Summary...")
                print("\n" + "-" * 80)
                print(f"{'Error Type':<30} {'Severity':<12} {'Count':<8}")
                print("-" * 80)
                for row in error_summary:
                    print(
                        f"{row['error_type']:<30} "
                        f"{row['severity']:<12} "
                        f"{row['count']:<8}"
                    )
                print("-" * 80)
        finally:
            db2.close()

        print("\n✅ Re-evaluation complete!")
        print("\n🚀 Next Steps:")
        print(" 1. Analyze quality scores in database")
        print(" 2. Compare pipeline quality metrics")
        print(" 3. Proceed to Phase 4B: Build Streamlit dashboard")
def main():
    """CLI entry point: parse arguments, resolve the run_id, and launch judging."""
    parser = argparse.ArgumentParser(
        description="Re-evaluate existing results with LLM judge"
    )
    parser.add_argument("--run-id", type=str, help="Specific run ID to evaluate")
    parser.add_argument("--latest", action="store_true", help="Evaluate latest run")
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit number of results to evaluate (for testing)",
    )
    parser.add_argument(
        "--db-path",
        type=str,
        default="data/evaluation_results.db",
        help="Path to database",
    )
    args = parser.parse_args()

    # Resolve which run to score: --latest takes priority over --run-id.
    if args.latest:
        run_id = get_latest_run_id(args.db_path)
        if not run_id:
            print("❌ No evaluation runs found in database!")
            return
    elif args.run_id:
        run_id = args.run_id
    else:
        # Neither flag given: explain and show usage.
        print("❌ Must specify either --run-id or --latest")
        parser.print_help()
        return

    # Run evaluation
    evaluate_results_with_judge(
        db_path=args.db_path,
        run_id=run_id,
        limit=args.limit,
        verbose=True,
    )


if __name__ == "__main__":
    main()