import json
import os
import time
from typing import List, Dict, Any, Tuple, Optional

import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

from .retrieval import RAGManager, RetrievalResult


class RAGEvaluator:
    """Evaluation framework for RAG systems"""

    def __init__(self, rag_manager: RAGManager):
        self.rag_manager = rag_manager

    def evaluate_single_query(self, query: str, ground_truth: List[str],
                              k_values: List[int] = [1, 3, 5, 10],
                              level1: Optional[str] = None,
                              level2: Optional[str] = None,
                              level3: Optional[str] = None,
                              doc_type: Optional[str] = None) -> Dict[str, Any]:
        """Evaluate retrieval for a single query"""
        base_results = {}
        hier_results = {}

        for k in k_values:
            # Get results from both pipelines
            base_result, hier_result = self.rag_manager.compare_retrieval(
                query, k, level1, level2, level3, doc_type
            )
            base_results[k] = base_result
            hier_results[k] = hier_result

        # Calculate metrics
        metrics = {
            "query": query,
            "ground_truth": ground_truth,
            "base_rag": self._calculate_metrics(base_results, ground_truth),
            "hier_rag": self._calculate_metrics(hier_results, ground_truth),
            "filters": {
                "level1": level1,
                "level2": level2,
                "level3": level3,
                "doc_type": doc_type
            }
        }
        return metrics

    def _calculate_metrics(self, results_dict: Dict[int, RetrievalResult],
                           ground_truth: List[str]) -> Dict[str, Any]:
        """Calculate evaluation metrics for each value of k"""
        metrics = {}
        for k, result in results_dict.items():
            retrieved_docs = [source['content'] for source in result.sources]

            # Hit@k: 1.0 if any ground-truth document appears in the top-k results
            hit_at_k = self._calculate_hit_at_k(retrieved_docs, ground_truth, k)

            # Reciprocal rank of the first matching document
            mrr = self._calculate_mrr(retrieved_docs, ground_truth)

            # Average embedding similarity between retrieved and ground-truth documents
            semantic_sim = self._calculate_semantic_similarity(retrieved_docs, ground_truth)

            metrics[k] = {
                "hit_at_k": hit_at_k,
                "mrr": mrr,
                "semantic_similarity": semantic_sim,
                "latency": result.latency,
                "retrieved_count": len(retrieved_docs)
            }
        return metrics

    def _calculate_hit_at_k(self, retrieved: List[str], ground_truth: List[str], k: int) -> float:
        """Calculate Hit@k: 1.0 if any ground-truth document is matched within the top k"""
        if not ground_truth:
            return 0.0
        # Matching is delegated to _documents_match, which compares embeddings rather
        # than exact strings, so paraphrased ground-truth documents still count as hits.
        for doc in retrieved[:k]:
            for gt_doc in ground_truth:
                if self._documents_match(doc, gt_doc):
                    return 1.0
        return 0.0

    def _calculate_mrr(self, retrieved: List[str], ground_truth: List[str]) -> float:
        """Calculate the reciprocal rank of the first matching document (its mean over queries is MRR)"""
        if not ground_truth:
            return 0.0
        for rank, doc in enumerate(retrieved, 1):
            for gt_doc in ground_truth:
                if self._documents_match(doc, gt_doc):
                    return 1.0 / rank
        return 0.0

    def _calculate_semantic_similarity(self, retrieved: List[str], ground_truth: List[str]) -> float:
        """Calculate average semantic similarity"""
        if not retrieved or not ground_truth:
            return 0.0
        # Use the same embedding model as the vector store
        embeddings_retrieved = [self.rag_manager.vector_store.embed_text(doc) for doc in retrieved]
        embeddings_gt = [self.rag_manager.vector_store.embed_text(doc) for doc in ground_truth]

        # Calculate cosine similarity matrix
        similarity_matrix = cosine_similarity(embeddings_retrieved, embeddings_gt)

        # Return max similarity for each retrieved document, then average
        max_similarities = np.max(similarity_matrix, axis=1)
        return float(np.mean(max_similarities))

    def _documents_match(self, doc1: str, doc2: str, threshold: float = 0.8) -> bool:
        """Check whether two documents match semantically (cosine similarity above threshold)"""
        # Embedding-based comparison; exact string equality is not required
        embedding1 = self.rag_manager.vector_store.embed_text(doc1)
        embedding2 = self.rag_manager.vector_store.embed_text(doc2)
        similarity = cosine_similarity([embedding1], [embedding2])[0][0]
        return similarity > threshold

    def batch_evaluate(self, queries: List[Dict[str, Any]],
                       output_file: Optional[str] = None) -> Tuple[pd.DataFrame, List[Dict[str, Any]]]:
        """Evaluate multiple queries and return (summary DataFrame, raw per-query results)"""
        results = []
        for i, query_data in enumerate(queries):
            print(f"Evaluating query {i+1}/{len(queries)}: {query_data['query'][:50]}...")
            metrics = self.evaluate_single_query(
                query=query_data['query'],
                ground_truth=query_data.get('ground_truth', []),
                k_values=query_data.get('k_values', [1, 3, 5, 10]),
                level1=query_data.get('level1'),
                level2=query_data.get('level2'),
                level3=query_data.get('level3'),
                doc_type=query_data.get('doc_type')
            )
            results.append(metrics)

        # Convert to DataFrame for analysis
        df = self._results_to_dataframe(results)

        # Save results if an output file is specified
        if output_file:
            # Ensure the reports directory exists
            reports_dir = os.path.join(os.getcwd(), "reports")
            os.makedirs(reports_dir, exist_ok=True)

            # Write the flat table as CSV and the raw nested results as JSON
            csv_path = os.path.join(reports_dir, output_file)
            json_path = os.path.join(reports_dir, output_file.replace('.csv', '.json'))
            df.to_csv(csv_path, index=False)
            with open(json_path, 'w') as f:
                json.dump(results, f, indent=2)

        return df, results

    def _results_to_dataframe(self, results: List[Dict[str, Any]]) -> pd.DataFrame:
        """Convert evaluation results to DataFrame"""
        rows = []
        for result in results:
            query = result['query']
            for k in result['base_rag'].keys():
                # One row per (query, k, pipeline) combination
                for pipeline in ('base_rag', 'hier_rag'):
                    pipeline_metrics = result[pipeline][k]
                    rows.append({
                        'query': query,
                        'k': k,
                        'pipeline': pipeline,
                        'hit_at_k': pipeline_metrics['hit_at_k'],
                        'mrr': pipeline_metrics['mrr'],
                        'semantic_similarity': pipeline_metrics['semantic_similarity'],
                        'latency': pipeline_metrics['latency'],
                        'retrieved_count': pipeline_metrics['retrieved_count']
                    })
        return pd.DataFrame(rows)
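

# Hypothetical usage sketch (not part of the module): it assumes RAGManager can be
# constructed without arguments and that this file lives in a package, so it should be
# executed as a module (e.g. `python -m <package>.<this_module>`) for the relative
# import above to resolve. Adapt the names and query contents to the actual project.
if __name__ == "__main__":
    manager = RAGManager()  # assumption: replace with however RAGManager is configured here
    evaluator = RAGEvaluator(manager)

    # Each entry mirrors the keys read by batch_evaluate: 'query' is required,
    # the remaining keys are optional and fall back to their defaults.
    sample_queries = [
        {
            "query": "What is the refund policy for enterprise customers?",
            "ground_truth": ["Enterprise customers may request a refund within 30 days."],
            "k_values": [1, 3, 5],
        }
    ]

    df, raw_results = evaluator.batch_evaluate(sample_queries, output_file="rag_eval.csv")

    # Average each metric per pipeline and cutoff to compare base vs. hierarchical retrieval
    summary = df.groupby(["pipeline", "k"])[["hit_at_k", "mrr", "semantic_similarity", "latency"]].mean()
    print(summary)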