"""Evaluation metrics for RAG systems.""" import time from typing import List, Dict, Any, Tuple, Optional import numpy as np from sentence_transformers import SentenceTransformer, util class RAGEvaluator: """Evaluate RAG system performance.""" def __init__(self, embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"): """ Initialize evaluator. Args: embedding_model_name: Model for semantic similarity """ self.embedding_model = SentenceTransformer(embedding_model_name) def hit_at_k( self, retrieved_ids: List[str], relevant_ids: List[str], k: int = 5 ) -> float: """ Calculate Hit@k metric. Args: retrieved_ids: List of retrieved document IDs relevant_ids: List of relevant document IDs k: Number of top results to consider Returns: Hit@k score (1 if any relevant doc in top-k, else 0) """ top_k = retrieved_ids[:k] return 1.0 if any(rid in relevant_ids for rid in top_k) else 0.0 def precision_at_k( self, retrieved_ids: List[str], relevant_ids: List[str], k: int = 5 ) -> float: """ Calculate Precision@k. Args: retrieved_ids: List of retrieved document IDs relevant_ids: List of relevant document IDs k: Number of top results to consider Returns: Precision@k score """ top_k = retrieved_ids[:k] if not top_k: return 0.0 relevant_in_top_k = sum(1 for rid in top_k if rid in relevant_ids) return relevant_in_top_k / len(top_k) def recall_at_k( self, retrieved_ids: List[str], relevant_ids: List[str], k: int = 5 ) -> float: """ Calculate Recall@k. Args: retrieved_ids: List of retrieved document IDs relevant_ids: List of relevant document IDs k: Number of top results to consider Returns: Recall@k score """ if not relevant_ids: return 0.0 top_k = retrieved_ids[:k] relevant_in_top_k = sum(1 for rid in top_k if rid in relevant_ids) return relevant_in_top_k / len(relevant_ids) def mrr( self, retrieved_ids: List[str], relevant_ids: List[str] ) -> float: """ Calculate Mean Reciprocal Rank. Args: retrieved_ids: List of retrieved document IDs relevant_ids: List of relevant document IDs Returns: MRR score """ for i, rid in enumerate(retrieved_ids, 1): if rid in relevant_ids: return 1.0 / i return 0.0 def semantic_similarity( self, answer: str, reference: str ) -> float: """ Calculate semantic similarity between answer and reference. Args: answer: Generated answer reference: Reference answer Returns: Cosine similarity score """ embeddings = self.embedding_model.encode([answer, reference]) similarity = util.cos_sim(embeddings[0], embeddings[1]) return float(similarity[0][0]) def evaluate_retrieval( self, retrieved_results: List[Dict[str, Any]], relevant_ids: List[str], k_values: List[int] = [1, 3, 5, 10] ) -> Dict[str, Any]: """ Comprehensive retrieval evaluation. Args: retrieved_results: List of retrieval results relevant_ids: List of relevant document IDs k_values: List of k values for Hit@k, Precision@k, Recall@k Returns: Dictionary with all metrics """ retrieved_ids = [r["id"] for r in retrieved_results] metrics = { "mrr": self.mrr(retrieved_ids, relevant_ids) } for k in k_values: metrics[f"hit@{k}"] = self.hit_at_k(retrieved_ids, relevant_ids, k) metrics[f"precision@{k}"] = self.precision_at_k(retrieved_ids, relevant_ids, k) metrics[f"recall@{k}"] = self.recall_at_k(retrieved_ids, relevant_ids, k) return metrics def evaluate_generation( self, generated_answer: str, reference_answer: str ) -> Dict[str, float]: """ Evaluate generated answer quality. 
    def evaluate_generation(
        self,
        generated_answer: str,
        reference_answer: str
    ) -> Dict[str, float]:
        """
        Evaluate generated answer quality.

        Args:
            generated_answer: Generated answer
            reference_answer: Reference answer

        Returns:
            Dictionary with generation metrics
        """
        return {
            "semantic_similarity": self.semantic_similarity(generated_answer, reference_answer)
        }

    def evaluate_rag_pipeline(
        self,
        rag_result: Dict[str, Any],
        relevant_ids: List[str],
        reference_answer: Optional[str] = None,
        k_values: Optional[List[int]] = None
    ) -> Dict[str, Any]:
        """
        Evaluate a complete RAG pipeline.

        Args:
            rag_result: Result from a RAG query
            relevant_ids: List of relevant document IDs
            reference_answer: Optional reference answer
            k_values: List of k values for metrics (defaults to [1, 3, 5])

        Returns:
            Dictionary with all evaluation metrics
        """
        if k_values is None:
            k_values = [1, 3, 5]

        metrics = {
            "pipeline": rag_result.get("pipeline", "Unknown"),
            "retrieval_time": rag_result.get("retrieval_time", 0),
            "generation_time": rag_result.get("generation_time", 0),
            "total_time": rag_result.get("total_time", 0)
        }

        # Retrieval metrics
        retrieval_metrics = self.evaluate_retrieval(
            rag_result["contexts"],
            relevant_ids,
            k_values
        )
        metrics.update(retrieval_metrics)

        # Generation metrics (only if a reference answer is provided)
        if reference_answer:
            generation_metrics = self.evaluate_generation(
                rag_result["answer"],
                reference_answer
            )
            metrics.update(generation_metrics)

        return metrics

    def compare_pipelines(
        self,
        base_result: Dict[str, Any],
        hier_result: Dict[str, Any],
        relevant_ids: List[str],
        reference_answer: Optional[str] = None,
        k_values: Optional[List[int]] = None
    ) -> Dict[str, Any]:
        """
        Compare Base-RAG and Hier-RAG results.

        Args:
            base_result: Result from Base-RAG
            hier_result: Result from Hier-RAG
            relevant_ids: List of relevant document IDs
            reference_answer: Optional reference answer
            k_values: List of k values for metrics (defaults to [1, 3, 5])

        Returns:
            Dictionary with comparison metrics
        """
        if k_values is None:
            k_values = [1, 3, 5]

        base_metrics = self.evaluate_rag_pipeline(
            base_result, relevant_ids, reference_answer, k_values
        )
        hier_metrics = self.evaluate_rag_pipeline(
            hier_result, relevant_ids, reference_answer, k_values
        )

        # Calculate improvements
        comparison = {
            "base_rag": base_metrics,
            "hier_rag": hier_metrics,
            "improvements": {}
        }

        # Speed improvements: guard both timings to avoid division by zero
        if base_metrics["total_time"] > 0 and hier_metrics["total_time"] > 0:
            comparison["improvements"]["speedup"] = (
                base_metrics["total_time"] / hier_metrics["total_time"]
            )

        # Accuracy improvements (Hier-RAG minus Base-RAG)
        for k in k_values:
            hit_key = f"hit@{k}"
            if hit_key in base_metrics and hit_key in hier_metrics:
                comparison["improvements"][f"{hit_key}_delta"] = (
                    hier_metrics[hit_key] - base_metrics[hit_key]
                )

        if "mrr" in base_metrics and "mrr" in hier_metrics:
            comparison["improvements"]["mrr_delta"] = hier_metrics["mrr"] - base_metrics["mrr"]

        if "semantic_similarity" in base_metrics and "semantic_similarity" in hier_metrics:
            comparison["improvements"]["similarity_delta"] = (
                hier_metrics["semantic_similarity"] - base_metrics["semantic_similarity"]
            )

        return comparison


class BenchmarkDataset:
    """Generate or load benchmark datasets for evaluation."""

    def __init__(self):
        """Initialize an empty benchmark dataset."""
        self.queries = []
        self.ground_truth = {}

    def add_query(
        self,
        query: str,
        relevant_ids: List[str],
        reference_answer: Optional[str] = None
    ) -> None:
        """
        Add a query to the benchmark.

        Args:
            query: Query text
            relevant_ids: List of relevant document IDs
            reference_answer: Optional reference answer
        """
        self.queries.append(query)
        self.ground_truth[query] = {
            "relevant_ids": relevant_ids,
            "reference_answer": reference_answer
        }
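    # Example of populating a benchmark by hand; the document IDs and the
    # reference answer below are hypothetical placeholders:
    #   ds = BenchmarkDataset()
    #   ds.add_query(
    #       "What are the KYC requirements for new accounts?",
    #       relevant_ids=["doc_kyc_001", "doc_kyc_002"],
    #       reference_answer="New accounts require identity and address verification.",
    #   )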
    def get_sample_hospital_queries(self) -> List[Dict[str, Any]]:
        """
        Get sample queries for the hospital domain.

        Returns:
            List of query dictionaries
        """
        return [
            {
                "query": "What are the patient admission procedures?",
                "domain": "Clinical Care",
                "expected_doc_type": "protocol"
            },
            {
                "query": "What are the infection control policies?",
                "domain": "Quality & Safety",
                "expected_doc_type": "policy"
            },
            {
                "query": "How should medication errors be reported?",
                "domain": "Quality & Safety",
                "expected_doc_type": "policy"
            },
            {
                "query": "What training is required for new nurses?",
                "domain": "Education & Training",
                "expected_doc_type": "manual"
            },
            {
                "query": "What are the emergency response procedures?",
                "domain": "Clinical Care",
                "expected_doc_type": "protocol"
            }
        ]

    def get_sample_bank_queries(self) -> List[Dict[str, Any]]:
        """
        Get sample queries for the banking domain.

        Returns:
            List of query dictionaries
        """
        return [
            {
                "query": "What are the KYC requirements for new accounts?",
                "domain": "Compliance & Legal",
                "expected_doc_type": "policy"
            },
            {
                "query": "How do I process a personal loan application?",
                "domain": "Retail Banking",
                "expected_doc_type": "manual"
            },
            {
                "query": "What is the credit risk assessment procedure?",
                "domain": "Risk Management",
                "expected_doc_type": "guideline"
            },
            {
                "query": "What are the fraud prevention measures?",
                "domain": "Risk Management",
                "expected_doc_type": "policy"
            },
            {
                "query": "How should suspicious transactions be reported?",
                "domain": "Compliance & Legal",
                "expected_doc_type": "policy"
            }
        ]

    def get_sample_fluid_simulation_queries(self) -> List[Dict[str, Any]]:
        """
        Get sample queries for the fluid simulation domain.

        Returns:
            List of query dictionaries
        """
        return [
            {
                "query": "How does the SIMPLE algorithm work?",
                "domain": "Numerical Methods",
                "expected_doc_type": "paper"
            },
            {
                "query": "What turbulence models are available?",
                "domain": "Physical Models",
                "expected_doc_type": "manual"
            },
            {
                "query": "How do I set up a cavity flow benchmark?",
                "domain": "Validation & Verification",
                "expected_doc_type": "tutorial"
            },
            {
                "query": "What mesh generation techniques are recommended?",
                "domain": "Numerical Methods",
                "expected_doc_type": "manual"
            },
            {
                "query": "How do I enable parallel computing for simulations?",
                "domain": "Software & Tools",
                "expected_doc_type": "manual"
            }
        ]

    def load_from_file(self, filepath: str) -> None:
        """
        Load a benchmark dataset from a JSON file.

        Args:
            filepath: Path to the JSON file
        """
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.queries = data.get("queries", [])
        self.ground_truth = data.get("ground_truth", {})

    def save_to_file(self, filepath: str) -> None:
        """
        Save the benchmark dataset to a JSON file.

        Args:
            filepath: Path to the output JSON file
        """
        Path(filepath).parent.mkdir(parents=True, exist_ok=True)
        data = {
            "queries": self.queries,
            "ground_truth": self.ground_truth
        }
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
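
# A minimal end-to-end sketch, assuming a RAG pipeline whose results match the
# shape evaluate_rag_pipeline expects ("contexts" entries carrying an "id",
# plus "answer" and timing fields). Every ID, answer, and timing below is a
# hypothetical placeholder, not output from a real pipeline.
if __name__ == "__main__":
    evaluator = RAGEvaluator()

    fake_result = {
        "pipeline": "Base-RAG",
        "contexts": [{"id": "doc_2"}, {"id": "doc_7"}, {"id": "doc_1"}],
        "answer": "Patients are admitted via physician referral and intake triage.",
        "retrieval_time": 0.12,
        "generation_time": 0.85,
        "total_time": 0.97
    }

    metrics = evaluator.evaluate_rag_pipeline(
        fake_result,
        relevant_ids=["doc_1", "doc_3"],
        reference_answer="Admission requires a referral followed by triage.",
        k_values=[1, 3]
    )
    for name, value in metrics.items():
        print(f"{name}: {value}")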