#!/usr/bin/env python3
"""
Evaluation script for EyeWiki RAG system.

Evaluates the system on a set of test questions and measures:
- Retrieval recall (relevant sources retrieved)
- Answer relevance (expected topics covered)
- Source citation accuracy

Usage:
    python scripts/evaluate.py
    python scripts/evaluate.py --questions tests/custom_questions.json
    python scripts/evaluate.py --output results/eval_results.json
"""

import argparse
import json
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
from rich.table import Table
from rich.panel import Panel

# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from config.settings import Settings
from src.llm.ollama_client import OllamaClient
from src.rag.query_engine import EyeWikiQueryEngine
from src.rag.reranker import CrossEncoderReranker
from src.rag.retriever import HybridRetriever
from src.vectorstore.qdrant_store import QdrantStoreManager

console = Console()


# ============================================================================
# Evaluation Metrics
# ============================================================================

def calculate_retrieval_recall(
    retrieved_sources: List[str],
    expected_sources: List[str],
) -> float:
    """
    Calculate retrieval recall.

    Recall = (# of expected sources retrieved) / (# of expected sources)

    Args:
        retrieved_sources: List of retrieved source titles
        expected_sources: List of expected source titles

    Returns:
        Recall score (0-1)
    """
    if not expected_sources:
        return 1.0

    # Normalize for case-insensitive matching
    retrieved_lower = {s.lower() for s in retrieved_sources}
    expected_lower = {s.lower() for s in expected_sources}

    # Count matches (allow partial matching)
    matches = 0
    for expected in expected_lower:
        for retrieved in retrieved_lower:
            # Check if expected source name is in retrieved source or vice versa
            if expected in retrieved or retrieved in expected:
                matches += 1
                break

    return matches / len(expected_sources)


def calculate_answer_relevance(
    answer: str,
    expected_topics: List[str],
) -> float:
    """
    Calculate answer relevance based on topic coverage.

    Relevance = (# of expected topics found) / (# of expected topics)

    Args:
        answer: Generated answer text
        expected_topics: List of expected topic keywords

    Returns:
        Relevance score (0-1)
    """
    if not expected_topics:
        return 1.0

    answer_lower = answer.lower()

    # Count how many expected topics appear in answer
    topics_found = sum(1 for topic in expected_topics if topic.lower() in answer_lower)

    return topics_found / len(expected_topics)
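

# Illustrative sanity check for the two metrics above (hypothetical titles and
# topics, not drawn from the real test set): with
#   expected_sources = ["Glaucoma", "Cataract"]
#   retrieved_sources = ["Primary Open-Angle Glaucoma", "Uveitis"]
# the partial substring match only pairs "glaucoma", so
# calculate_retrieval_recall(...) returns 0.5. Likewise,
# calculate_answer_relevance("Elevated IOP damages the optic nerve",
# ["intraocular pressure", "optic nerve"]) returns 0.5, because only
# "optic nerve" appears verbatim in the answer text.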


def calculate_citation_accuracy(
    answer: str,
    cited_sources: List[str],
    expected_sources: List[str],
) -> Dict[str, Any]:
    """
    Calculate citation accuracy metrics.

    Args:
        answer: Generated answer text
        cited_sources: Sources returned by system
        expected_sources: Expected sources

    Returns:
        Dictionary with citation metrics
    """
    # Check if answer contains explicit citations
    has_citations = "[Source:" in answer or "According to" in answer

    # Calculate precision and recall
    if cited_sources and expected_sources:
        cited_set = {s.lower() for s in cited_sources}
        expected_set = {s.lower() for s in expected_sources}

        # Allow partial matching
        true_positives = 0
        for cited in cited_set:
            for expected in expected_set:
                if expected in cited or cited in expected:
                    true_positives += 1
                    break

        precision = true_positives / len(cited_sources)
        recall = true_positives / len(expected_sources)

        # F1 score
        f1 = (
            2 * (precision * recall) / (precision + recall)
            if (precision + recall) > 0
            else 0.0
        )
    else:
        precision = 0.0
        recall = 0.0
        f1 = 0.0

    return {
        "has_explicit_citations": has_citations,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }
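

# Worked example for the citation metrics (hypothetical titles, using the same
# partial-match rule): cited_sources = ["Primary Open-Angle Glaucoma",
# "Ocular Hypertension"] against expected_sources = ["Glaucoma"] gives one
# true positive, so precision = 1/2 = 0.5, recall = 1/1 = 1.0, and
# F1 = 2 * (0.5 * 1.0) / 1.5 ≈ 0.67.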
"category": question_data.get("category", "unknown"), "error": str(e), "query_time": time.time() - start_time, "success": False, } return result # ============================================================================ # Aggregate Analysis # ============================================================================ def calculate_aggregate_metrics(results: List[Dict[str, Any]]) -> Dict[str, Any]: """ Calculate aggregate metrics across all questions. Args: results: List of evaluation results Returns: Aggregate metrics """ successful_results = [r for r in results if r["success"]] if not successful_results: return {"error": "No successful evaluations"} # Average metrics avg_retrieval_recall = sum( r["metrics"]["retrieval_recall"] for r in successful_results ) / len(successful_results) avg_answer_relevance = sum( r["metrics"]["answer_relevance"] for r in successful_results ) / len(successful_results) avg_citation_precision = sum( r["metrics"]["citation_precision"] for r in successful_results ) / len(successful_results) avg_citation_recall = sum( r["metrics"]["citation_recall"] for r in successful_results ) / len(successful_results) avg_citation_f1 = sum( r["metrics"]["citation_f1"] for r in successful_results ) / len(successful_results) avg_confidence = sum(r["confidence"] for r in successful_results) / len( successful_results ) avg_query_time = sum(r["query_time"] for r in successful_results) / len( successful_results ) # Citation statistics citations_present = sum( 1 for r in successful_results if r["details"]["has_explicit_citations"] ) # Category breakdown categories = {} for result in successful_results: category = result["category"] if category not in categories: categories[category] = { "count": 0, "retrieval_recall": 0, "answer_relevance": 0, } categories[category]["count"] += 1 categories[category]["retrieval_recall"] += result["metrics"]["retrieval_recall"] categories[category]["answer_relevance"] += result["metrics"]["answer_relevance"] # Average by category for category, data in categories.items(): count = data["count"] data["retrieval_recall"] /= count data["answer_relevance"] /= count return { "total_questions": len(results), "successful": len(successful_results), "failed": len(results) - len(successful_results), "metrics": { "retrieval_recall": avg_retrieval_recall, "answer_relevance": avg_answer_relevance, "citation_precision": avg_citation_precision, "citation_recall": avg_citation_recall, "citation_f1": avg_citation_f1, "avg_confidence": avg_confidence, "avg_query_time": avg_query_time, "citation_rate": citations_present / len(successful_results), }, "by_category": categories, } # ============================================================================ # Output Functions # ============================================================================ def print_question_result(result: Dict[str, Any]): """Print result for a single question.""" if not result["success"]: console.print( f"\n[red]✗ {result['id']}: {result['question']}[/red]", f"[red]Error: {result['error']}[/red]", ) return # Create metrics table table = Table(show_header=False, box=None, padding=(0, 1)) table.add_column(style="cyan") table.add_column(style="yellow") metrics = result["metrics"] table.add_row("Retrieval Recall", f"{metrics['retrieval_recall']:.2%}") table.add_row("Answer Relevance", f"{metrics['answer_relevance']:.2%}") table.add_row("Citation F1", f"{metrics['citation_f1']:.2%}") table.add_row("Confidence", f"{result['confidence']:.2%}") table.add_row("Query Time", 
f"{result['query_time']:.2f}s") # Determine overall status avg_score = (metrics["retrieval_recall"] + metrics["answer_relevance"]) / 2 if avg_score >= 0.8: status = "[green]✓ PASS[/green]" elif avg_score >= 0.6: status = "[yellow]~ PARTIAL[/yellow]" else: status = "[red]✗ FAIL[/red]" console.print(f"\n{status} [bold]{result['id']}:[/bold] {result['question']}") console.print(table) # Print missing items details = result["details"] if details["topics_missing"]: console.print( f" [dim]Missing topics: {', '.join(details['topics_missing'])}[/dim]" ) if details["sources_missing"]: console.print( f" [dim]Missing sources: {', '.join(details['sources_missing'])}[/dim]" ) def print_aggregate_results(aggregate: Dict[str, Any]): """Print aggregate results.""" console.print("\n") console.print( Panel.fit( "[bold cyan]Evaluation Summary[/bold cyan]", border_style="cyan", ) ) # Overall metrics table table = Table(show_header=True, header_style="bold magenta") table.add_column("Metric", style="cyan") table.add_column("Score", style="yellow", justify="right") table.add_column("Grade", style="green", justify="center") metrics = aggregate["metrics"] def get_grade(score: float) -> str: if score >= 0.9: return "[green]A[/green]" elif score >= 0.8: return "[green]B[/green]" elif score >= 0.7: return "[yellow]C[/yellow]" elif score >= 0.6: return "[yellow]D[/yellow]" else: return "[red]F[/red]" table.add_row( "Retrieval Recall", f"{metrics['retrieval_recall']:.2%}", get_grade(metrics["retrieval_recall"]), ) table.add_row( "Answer Relevance", f"{metrics['answer_relevance']:.2%}", get_grade(metrics["answer_relevance"]), ) table.add_row( "Citation Precision", f"{metrics['citation_precision']:.2%}", get_grade(metrics["citation_precision"]), ) table.add_row( "Citation Recall", f"{metrics['citation_recall']:.2%}", get_grade(metrics["citation_recall"]), ) table.add_row( "Citation F1", f"{metrics['citation_f1']:.2%}", get_grade(metrics["citation_f1"]), ) console.print(table) # Statistics console.print(f"\n[bold]Statistics:[/bold]") console.print( f" Total Questions: {aggregate['total_questions']}", f" Successful: [green]{aggregate['successful']}[/green]", f" Failed: [red]{aggregate['failed']}[/red]", f" Avg Confidence: {metrics['avg_confidence']:.2%}", f" Avg Query Time: {metrics['avg_query_time']:.2f}s", f" Citation Rate: {metrics['citation_rate']:.2%}", ) # Category breakdown if aggregate["by_category"]: console.print(f"\n[bold]Performance by Category:[/bold]") cat_table = Table(show_header=True, header_style="bold magenta") cat_table.add_column("Category", style="cyan") cat_table.add_column("Count", justify="right") cat_table.add_column("Retrieval", justify="right") cat_table.add_column("Relevance", justify="right") for category, data in sorted(aggregate["by_category"].items()): cat_table.add_row( category, str(data["count"]), f"{data['retrieval_recall']:.2%}", f"{data['answer_relevance']:.2%}", ) console.print(cat_table) # ============================================================================ # Main Evaluation # ============================================================================ def load_test_questions(questions_file: Path) -> List[Dict[str, Any]]: """Load test questions from JSON file.""" if not questions_file.exists(): console.print(f"[red]Error: Questions file not found: {questions_file}[/red]") sys.exit(1) with open(questions_file, "r") as f: questions = json.load(f) console.print(f"[green]✓[/green] Loaded {len(questions)} test questions") return questions def initialize_system() -> 


def initialize_system() -> EyeWikiQueryEngine:
    """Initialize the RAG system."""
    console.print("[bold]Initializing RAG system...[/bold]")

    # Load settings
    settings = Settings()

    # Initialize components
    ollama_client = OllamaClient(
        base_url=settings.ollama_base_url,
        llm_model=settings.llm_model,
        embedding_model=settings.embedding_model,
    )

    qdrant_manager = QdrantStoreManager(
        collection_name=settings.qdrant_collection_name,
        qdrant_path=settings.qdrant_path,
        vector_size=settings.embedding_dim,
    )

    retriever = HybridRetriever(
        qdrant_manager=qdrant_manager,
        ollama_client=ollama_client,
    )

    reranker = CrossEncoderReranker(
        model_name=settings.reranker_model,
    )

    # Load prompts
    prompts_dir = project_root / "prompts"
    system_prompt_path = prompts_dir / "system_prompt.txt"
    query_prompt_path = prompts_dir / "query_prompt.txt"
    disclaimer_path = prompts_dir / "medical_disclaimer.txt"

    query_engine = EyeWikiQueryEngine(
        retriever=retriever,
        reranker=reranker,
        llm_client=ollama_client,
        system_prompt_path=system_prompt_path if system_prompt_path.exists() else None,
        query_prompt_path=query_prompt_path if query_prompt_path.exists() else None,
        disclaimer_path=disclaimer_path if disclaimer_path.exists() else None,
        max_context_tokens=settings.max_context_tokens,
        retrieval_k=20,
        rerank_k=5,
    )

    console.print("[green]✓[/green] System initialized\n")
    return query_engine


def run_evaluation(
    questions_file: Path,
    output_file: Optional[Path] = None,
    verbose: bool = False,
):
    """
    Run evaluation on test questions.

    Args:
        questions_file: Path to test questions JSON
        output_file: Optional path to save results
        verbose: Print detailed results
    """
    console.print(
        Panel.fit(
            "[bold blue]EyeWiki RAG Evaluation[/bold blue]",
            border_style="blue",
        )
    )

    # Load questions
    questions = load_test_questions(questions_file)

    # Initialize system
    query_engine = initialize_system()

    # Evaluate questions
    results = []
    console.print("[bold]Evaluating questions...[/bold]\n")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Processing...", total=len(questions))

        for question_data in questions:
            result = evaluate_question(question_data, query_engine)
            results.append(result)

            if verbose:
                print_question_result(result)

            progress.update(task, advance=1)

    # Calculate aggregate metrics
    aggregate = calculate_aggregate_metrics(results)

    # Print results
    if not verbose:
        console.print("\n[bold]Per-Question Results:[/bold]")
        for result in results:
            print_question_result(result)

    print_aggregate_results(aggregate)

    # Save results
    if output_file:
        output_data = {
            "results": results,
            "aggregate": aggregate,
            "timestamp": time.time(),
        }

        output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, "w") as f:
            json.dump(output_data, f, indent=2)

        console.print(f"\n[green]✓[/green] Results saved to {output_file}")


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Evaluate EyeWiki RAG system on test questions"
    )
    parser.add_argument(
        "--questions",
        type=Path,
        default=project_root / "tests" / "test_questions.json",
        help="Path to test questions JSON file",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=None,
        help="Path to save evaluation results (JSON)",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Print detailed results for each question",
    )

    args = parser.parse_args()

    try:
        run_evaluation(
            questions_file=args.questions,
            output_file=args.output,
            verbose=args.verbose,
        )
    except KeyboardInterrupt:
        console.print("\n[yellow]Evaluation interrupted by user[/yellow]")
        sys.exit(1)
    except Exception as e:
        console.print(f"\n[red]Error: {e}[/red]")
        import traceback

        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()