import json
import os
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

from .retrieval import RAGManager, RetrievalResult


class RAGEvaluator:
    """Evaluation framework for RAG systems."""

    def __init__(self, rag_manager: RAGManager):
        self.rag_manager = rag_manager

    def evaluate_single_query(self,
                              query: str,
                              ground_truth: List[str],
                              k_values: Optional[List[int]] = None,
                              level1: Optional[str] = None,
                              level2: Optional[str] = None,
                              level3: Optional[str] = None,
                              doc_type: Optional[str] = None) -> Dict[str, Any]:
        """Evaluate retrieval for a single query."""
        # Avoid a mutable default argument; fall back to the standard k values
        k_values = k_values if k_values is not None else [1, 3, 5, 10]
        base_results: Dict[int, RetrievalResult] = {}
        hier_results: Dict[int, RetrievalResult] = {}

        for k in k_values:
            # Get results from both pipelines
            base_result, hier_result = self.rag_manager.compare_retrieval(
                query, k, level1, level2, level3, doc_type
            )
            base_results[k] = base_result
            hier_results[k] = hier_result

        # Calculate metrics
        metrics = {
            "query": query,
            "ground_truth": ground_truth,
            "base_rag": self._calculate_metrics(base_results, ground_truth),
            "hier_rag": self._calculate_metrics(hier_results, ground_truth),
            "filters": {
                "level1": level1,
                "level2": level2,
                "level3": level3,
                "doc_type": doc_type,
            },
        }
        return metrics

    def _calculate_metrics(self, results_dict: Dict[int, RetrievalResult],
                           ground_truth: List[str]) -> Dict[str, Any]:
        """Calculate evaluation metrics for each k."""
        metrics = {}
        for k, result in results_dict.items():
            retrieved_docs = [source['content'] for source in result.sources]

            # Hit@k
            hit_at_k = self._calculate_hit_at_k(retrieved_docs, ground_truth, k)

            # MRR
            mrr = self._calculate_mrr(retrieved_docs, ground_truth)

            # Semantic similarity
            semantic_sim = self._calculate_semantic_similarity(retrieved_docs, ground_truth)

            metrics[k] = {
                "hit_at_k": hit_at_k,
                "mrr": mrr,
                "semantic_similarity": semantic_sim,
                "latency": result.latency,
                "retrieved_count": len(retrieved_docs),
            }
        return metrics

    def _calculate_hit_at_k(self, retrieved: List[str], ground_truth: List[str],
                            k: int) -> float:
        """Calculate Hit@k: 1.0 if any of the top-k retrieved documents matches the ground truth."""
        if not ground_truth:
            return 0.0
        # Matching is delegated to _documents_match (semantic, threshold-based)
        for doc in retrieved[:k]:
            for gt_doc in ground_truth:
                if self._documents_match(doc, gt_doc):
                    return 1.0
        return 0.0

    def _calculate_mrr(self, retrieved: List[str], ground_truth: List[str]) -> float:
        """Calculate the reciprocal rank of the first matching document."""
        if not ground_truth:
            return 0.0
        for rank, doc in enumerate(retrieved, 1):
            for gt_doc in ground_truth:
                if self._documents_match(doc, gt_doc):
                    return 1.0 / rank
        return 0.0

    def _calculate_semantic_similarity(self, retrieved: List[str],
                                       ground_truth: List[str]) -> float:
        """Calculate the average best-match semantic similarity."""
        if not retrieved or not ground_truth:
            return 0.0

        # Use the same embedding model as the vector store
        embeddings_retrieved = [self.rag_manager.vector_store.embed_text(doc) for doc in retrieved]
        embeddings_gt = [self.rag_manager.vector_store.embed_text(doc) for doc in ground_truth]

        # Cosine similarity between every retrieved/ground-truth pair
        similarity_matrix = cosine_similarity(embeddings_retrieved, embeddings_gt)

        # Take the max similarity for each retrieved document, then average
        max_similarities = np.max(similarity_matrix, axis=1)
        return float(np.mean(max_similarities))

    def _documents_match(self, doc1: str, doc2: str, threshold: float = 0.8) -> bool:
        """Check whether two documents match (cosine similarity above threshold)."""
        # Simple implementation - can be enhanced (e.g., exact-match fast path)
        embedding1 = self.rag_manager.vector_store.embed_text(doc1)
        embedding2 = self.rag_manager.vector_store.embed_text(doc2)
        similarity = cosine_similarity([embedding1], [embedding2])[0][0]
        return similarity > threshold

    def batch_evaluate(self, queries: List[Dict[str, Any]],
                       output_file: Optional[str] = None
                       ) -> Tuple[pd.DataFrame, List[Dict[str, Any]]]:
        """Batch evaluation on multiple queries; returns a DataFrame and the raw results."""
        results = []
        for i, query_data in enumerate(queries):
            print(f"Evaluating query {i+1}/{len(queries)}: {query_data['query'][:50]}...")
            metrics = self.evaluate_single_query(
                query=query_data['query'],
                ground_truth=query_data.get('ground_truth', []),
                k_values=query_data.get('k_values', [1, 3, 5, 10]),
                level1=query_data.get('level1'),
                level2=query_data.get('level2'),
                level3=query_data.get('level3'),
                doc_type=query_data.get('doc_type'),
            )
            results.append(metrics)

        # Convert to DataFrame for analysis
        df = self._results_to_dataframe(results)

        # Save results if an output file is specified
        if output_file:
            # Ensure the reports directory exists
            reports_dir = os.path.join(os.getcwd(), "reports")
            os.makedirs(reports_dir, exist_ok=True)

            # Save CSV and JSON side by side in the reports directory
            csv_path = os.path.join(reports_dir, output_file)
            json_path = os.path.join(reports_dir, output_file.replace('.csv', '.json'))

            df.to_csv(csv_path, index=False)
            with open(json_path, 'w') as f:
                json.dump(results, f, indent=2)

        return df, results

    def _results_to_dataframe(self, results: List[Dict[str, Any]]) -> pd.DataFrame:
        """Flatten evaluation results into one row per (query, k, pipeline)."""
        rows = []
        for result in results:
            query = result['query']
            for k in result['base_rag'].keys():
                for pipeline in ('base_rag', 'hier_rag'):
                    m = result[pipeline][k]
                    rows.append({
                        'query': query,
                        'k': k,
                        'pipeline': pipeline,
                        'hit_at_k': m['hit_at_k'],
                        'mrr': m['mrr'],
                        'semantic_similarity': m['semantic_similarity'],
                        'latency': m['latency'],
                        'retrieved_count': m['retrieved_count'],
                    })
        return pd.DataFrame(rows)
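

# Usage sketch (illustrative, not part of the module API): build an evaluator
# and run a small batch evaluation. The RAGManager constructor arguments and
# the sample query fields below are assumptions; adapt them to the actual
# retrieval module.
if __name__ == "__main__":
    # Hypothetical setup; RAGManager() may require configuration in practice.
    manager = RAGManager()
    evaluator = RAGEvaluator(manager)

    # Each query dict needs at least 'query'; 'ground_truth', 'k_values', and
    # the filter fields (level1/level2/level3/doc_type) are optional.
    sample_queries = [
        {
            "query": "How do I reset my password?",
            "ground_truth": ["To reset your password, open Settings..."],
            "k_values": [1, 3, 5],
            "doc_type": "faq",
        },
    ]

    # Writes reports/eval_results.csv and reports/eval_results.json
    df, raw_results = evaluator.batch_evaluate(sample_queries, output_file="eval_results.csv")

    # Compare the two pipelines on the aggregated metrics
    print(df.groupby(["pipeline", "k"])[["hit_at_k", "mrr", "semantic_similarity", "latency"]].mean())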