Spaces:
Sleeping
Sleeping
| """ | |
| scripts/evaluate_with_judge.py - Re-evaluate with LLM Judge (Phase 4A) | |
| ====================================================================== | |
| Loads existing evaluation results from Phase 3 and scores them using | |
| GPT-5 as an expert judge. Stores quality scores in evaluation_scores table. | |
| Usage: | |
| python scripts/evaluate_with_judge.py --run-id <run_id> --limit 5 | |
| python scripts/evaluate_with_judge.py --latest --limit 5 | |
| """ | |
| import sys | |
| from pathlib import Path | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| import sqlite3 | |
| import argparse | |
| from typing import List, Dict, Optional | |
| import json | |
| from tqdm import tqdm | |
| import time | |
| from core.evaluator import LLMJudge, EvaluationScores | |
| from utils.database import EvaluationDatabase | |
def get_latest_run_id(db_path: str) -> Optional[str]:
    """Return the run_id of the most recent evaluation run, or None.

    Args:
        db_path: Path to the SQLite evaluation results database.

    Returns:
        The run_id whose evaluation_results row has the newest timestamp,
        or None when the table is empty.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT run_id
            FROM evaluation_results
            ORDER BY timestamp DESC
            LIMIT 1
            """
        )
        result = cursor.fetchone()
    finally:
        # Always release the connection — the original leaked it when the
        # query raised (e.g. missing table).
        conn.close()
    return result[0] if result else None
def get_unevaluated_results(
    db_path: str,
    run_id: str,
    limit: Optional[int] = None
) -> List[Dict]:
    """
    Get evaluation results that don't have quality scores yet.

    "Unevaluated" means the evaluation_results row has no matching row in
    evaluation_scores (LEFT JOIN with es.id IS NULL).

    Args:
        db_path: Path to database
        run_id: Evaluation run ID
        limit: Max number of QUESTIONS (not total results) to fetch.
            All pipeline results for each selected question are returned.
            NOTE: a falsy limit (None or 0) means "no limit", matching the
            original truthiness check.

    Returns:
        List of dicts with evaluation result data, ordered by
        (question_id, pipeline_id).
    """
    # Shared SELECT; both branches differ only in their WHERE clause.
    select_clause = """
        SELECT
            er.id,
            er.run_id,
            er.pipeline_id,
            er.pipeline_name,
            er.question_id,
            er.query,
            er.ground_truth_answers,
            er.generated_answer,
            er.retrieved_chunks,
            er.answer_found
        FROM evaluation_results er
        LEFT JOIN evaluation_scores es ON er.id = es.evaluation_result_id
    """
    order_clause = " ORDER BY er.question_id, er.pipeline_id"

    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        if limit:
            # First, get N distinct questions
            cursor.execute(
                """
                SELECT DISTINCT question_id
                FROM evaluation_results
                WHERE run_id = ?
                ORDER BY question_id
                LIMIT ?
                """,
                (run_id, limit),
            )
            question_ids = [row[0] for row in cursor.fetchall()]
            if not question_ids:
                return []
            # Then get all pipeline results for those questions (without
            # scores). Only '?' placeholders are interpolated — values are
            # still bound, so this is injection-safe.
            placeholders = ",".join("?" * len(question_ids))
            query = (
                select_clause
                + f" WHERE er.run_id = ? AND er.question_id IN ({placeholders})"
                  " AND es.id IS NULL"
                + order_clause
            )
            cursor.execute(query, [run_id] + question_ids)
        else:
            # Get all unevaluated results for the run
            query = select_clause + " WHERE er.run_id = ? AND es.id IS NULL" + order_clause
            cursor.execute(query, (run_id,))
        return [dict(row) for row in cursor.fetchall()]
    finally:
        # Original leaked the connection on any exception path.
        conn.close()
def evaluate_results_with_judge(
    db_path: str,
    run_id: str,
    limit: Optional[int] = None,
    verbose: bool = True
):
    """
    Evaluate existing results with LLM judge.

    For every unevaluated result in the run, asks the judge model for
    quality scores, stores them via EvaluationDatabase, and records an
    error-analysis row for any answer with overall_score < 5.0. Individual
    failures are counted and skipped so one bad result cannot abort the run.

    Args:
        db_path: Path to evaluation_results.db
        run_id: Run ID to evaluate
        limit: Max number of QUESTIONS to evaluate (for testing)
        verbose: Print progress, a tqdm bar, and end-of-run summaries
    """
    if verbose:
        print("\n" + "=" * 80)
        print("π§ββοΈ LLM JUDGE RE-EVALUATION - Phase 4A")
        print("=" * 80)

    # Initialize judge
    if verbose:
        print("\n[Step 1] Initializing LLM Judge...")
    judge = LLMJudge(
        judge_model="gpt-5-chat",
        temperature=0.0,
        verbose=False  # Don't print each evaluation
    )
    if verbose:
        print(f" β Judge ready: {judge.judge_model} (deployment: {judge.deployment})")

    # Load unevaluated results
    if verbose:
        print(f"\n[Step 2] Loading evaluation results...")
        print(f" Run ID: {run_id}")
        if limit:
            print(f" Limit: {limit} questions Γ all pipelines (test mode)")
    results = get_unevaluated_results(db_path, run_id, limit)
    if len(results) == 0:
        print("\nβ No unevaluated results found!")
        print(" Either all results have been scored, or run_id doesn't exist.")
        return

    # Count unique questions and pipelines
    unique_questions = len(set(r['question_id'] for r in results))
    unique_pipelines = len(set(r['pipeline_name'] for r in results))
    if verbose:
        print(f" β Loaded {len(results)} results ({unique_questions} questions Γ {unique_pipelines} pipelines)")

    # Initialize database
    db = EvaluationDatabase(db_path)

    # Evaluate each result
    if verbose:
        print(f"\n[Step 3] Evaluating with LLM Judge...")
        print("=" * 80)
    total_cost = 0.0
    success_count = 0
    error_count = 0

    # Wrap in tqdm for progress tracking only in verbose mode
    progress_bar = tqdm(results, desc="Judging answers", unit="answer") if verbose else results
    for result in progress_bar:
        try:
            # Parse JSON fields stored as text in the results table
            ground_truth = json.loads(result['ground_truth_answers'])
            context = json.loads(result['retrieved_chunks'])
            # Evaluate with judge
            scores = judge.evaluate(
                query=result['query'],
                ground_truth_answers=ground_truth,
                generated_answer=result['generated_answer'],
                retrieved_context=context
            )
            # Insert scores into database
            db.insert_evaluation_score(
                evaluation_result_id=result['id'],
                correctness_score=scores.correctness_score,
                relevance_score=scores.relevance_score,
                completeness_score=scores.completeness_score,
                clarity_score=scores.clarity_score,
                conciseness_score=scores.conciseness_score,
                overall_score=scores.overall_score,
                confidence=scores.confidence,
                explanation=scores.explanation,
                issues=scores.issues,
                evaluator_model=scores.evaluator_model,
                evaluation_cost_usd=scores.evaluation_cost_usd,
                evaluation_time_ms=scores.evaluation_time_ms
            )
            # Analyze errors if score is low
            if scores.overall_score < 5.0:
                # Determine error type from issues — first match wins, so the
                # chain ordering below is the classification priority.
                error_type = "unknown_error"
                severity = "medium"
                if "retrieval_failure" in scores.issues:
                    error_type = "retrieval_failure"
                    severity = "high"
                elif "hallucination" in scores.issues:
                    error_type = "hallucination"
                    severity = "critical"
                elif "generation_error" in scores.issues:
                    error_type = "generation_error"
                    severity = "high"
                elif "factual_error" in scores.issues:
                    error_type = "factual_error"
                    severity = "high"
                elif "incomplete" in scores.issues:
                    error_type = "incomplete"
                    severity = "medium"
                # Insert error analysis
                db.insert_error_analysis(
                    evaluation_result_id=result['id'],
                    error_type=error_type,
                    error_description=scores.explanation,
                    severity=severity,
                    suggested_fix=f"Overall score: {scores.overall_score:.1f}/10. Issues: {', '.join(scores.issues)}",
                    affected_component="pipeline"  # Could be more specific
                )
            total_cost += scores.evaluation_cost_usd
            success_count += 1
        except Exception as e:
            # Best-effort: count the failure and keep judging the rest.
            error_count += 1
            if verbose:
                print(f"\nβ Error evaluating result {result['id']}: {e}")
            continue

    # Close database
    db.close()

    # Print summary
    if verbose:
        print("\n" + "=" * 80)
        print("π EVALUATION SUMMARY")
        print("=" * 80)
        print(f" Total evaluated: {success_count}/{len(results)}")
        print(f" Errors: {error_count}")
        print(f" Total cost: ${total_cost:.4f}")
        # FIX: original divided unconditionally and raised ZeroDivisionError
        # when every judge call failed (success_count == 0).
        if success_count:
            print(f" Avg cost per answer: ${total_cost/success_count:.6f}")
        print("=" * 80)

        # Show quality summary by pipeline
        print("\n[Step 4] Quality Summary by Pipeline...")
        db2 = EvaluationDatabase(db_path)
        summary = db2.get_quality_summary_by_pipeline(run_id)
        if summary:
            print("\n" + "-" * 120)
            print(f"{'Pipeline':<40} {'Count':<8} {'Correct':<8} {'Relevant':<8} {'Complete':<8} {'Clear':<8} {'Concise':<8} {'Overall':<8}")
            print("-" * 120)
            for row in summary:
                print(
                    f"{row['pipeline_name']:<40} "
                    f"{row['evaluated_count']:<8} "
                    f"{row['avg_correctness']:<8.1f} "
                    f"{row['avg_relevance']:<8.1f} "
                    f"{row['avg_completeness']:<8.1f} "
                    f"{row['avg_clarity']:<8.1f} "
                    f"{row['avg_conciseness']:<8.1f} "
                    f"{row['avg_overall']:<8.1f}"
                )
            print("-" * 120)

        # Show error summary
        error_summary = db2.get_error_summary(run_id)
        if error_summary:
            print("\n[Step 5] Error Analysis Summary...")
            print("\n" + "-" * 80)
            print(f"{'Error Type':<30} {'Severity':<12} {'Count':<8}")
            print("-" * 80)
            for row in error_summary:
                print(
                    f"{row['error_type']:<30} "
                    f"{row['severity']:<12} "
                    f"{row['count']:<8}"
                )
            print("-" * 80)
        db2.close()

        print("\nβ Re-evaluation complete!")
        print("\nπ Next Steps:")
        print(" 1. Analyze quality scores in database")
        print(" 2. Compare pipeline quality metrics")
        print(" 3. Proceed to Phase 4B: Build Streamlit dashboard")
def main():
    """CLI entry point: resolve the target run, then judge it."""
    parser = argparse.ArgumentParser(
        description="Re-evaluate existing results with LLM judge"
    )
    parser.add_argument("--run-id", type=str, help="Specific run ID to evaluate")
    parser.add_argument("--latest", action="store_true", help="Evaluate latest run")
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit number of results to evaluate (for testing)",
    )
    parser.add_argument(
        "--db-path",
        type=str,
        default="data/evaluation_results.db",
        help="Path to database",
    )
    args = parser.parse_args()

    # Determine run_id: --latest takes precedence over an explicit --run-id.
    if args.latest:
        run_id = get_latest_run_id(args.db_path)
        if not run_id:
            print("β No evaluation runs found in database!")
            return
    elif args.run_id:
        run_id = args.run_id
    else:
        print("β Must specify either --run-id or --latest")
        parser.print_help()
        return

    # Run evaluation
    evaluate_results_with_judge(
        db_path=args.db_path,
        run_id=run_id,
        limit=args.limit,
        verbose=True,
    )
| if __name__ == "__main__": | |
| main() | |