import time
import os
import json
import numpy as np
import pandas as pd
from typing import List, Dict, Any, Tuple, Optional
from sklearn.metrics.pairwise import cosine_similarity

from .retrieval import RAGManager, RetrievalResult


class RAGEvaluator:
    """Evaluation framework for RAG systems"""

    def __init__(self, rag_manager: RAGManager):
        self.rag_manager = rag_manager

    def evaluate_single_query(self, query: str, ground_truth: List[str],
                              k_values: List[int] = [1, 3, 5, 10],
                              level1: Optional[str] = None,
                              level2: Optional[str] = None,
                              level3: Optional[str] = None,
                              doc_type: Optional[str] = None) -> Dict[str, Any]:
        """Evaluate retrieval for a single query"""
        base_results = {}
        hier_results = {}

        for k in k_values:
            # Run both pipelines for this cutoff and keep the results keyed by k
            base_result, hier_result = self.rag_manager.compare_retrieval(
                query, k, level1, level2, level3, doc_type
            )
            base_results[k] = base_result
            hier_results[k] = hier_result

        # Calculate metrics for both pipelines
        metrics = {
            "query": query,
            "ground_truth": ground_truth,
            "base_rag": self._calculate_metrics(base_results, ground_truth),
            "hier_rag": self._calculate_metrics(hier_results, ground_truth),
            "filters": {
                "level1": level1,
                "level2": level2,
                "level3": level3,
                "doc_type": doc_type
            }
        }
        return metrics

    def _calculate_metrics(self, results_dict: Dict[int, RetrievalResult],
                           ground_truth: List[str]) -> Dict[str, Any]:
        """Calculate evaluation metrics (Hit@k, MRR, semantic similarity) for each k"""
        metrics = {}
        for k, result in results_dict.items():
            retrieved_docs = [source['content'] for source in result.sources]

            # Hit@k: did any of the top-k retrieved documents match the ground truth?
            hit_at_k = self._calculate_hit_at_k(retrieved_docs, ground_truth, k)
            # MRR: reciprocal rank of the first matching document
            mrr = self._calculate_mrr(retrieved_docs, ground_truth)
            # Average semantic similarity between retrieved and ground-truth documents
            semantic_sim = self._calculate_semantic_similarity(retrieved_docs, ground_truth)

            metrics[k] = {
                "hit_at_k": hit_at_k,
                "mrr": mrr,
                "semantic_similarity": semantic_sim,
                "latency": result.latency,
                "retrieved_count": len(retrieved_docs)
            }
        return metrics

    def _calculate_hit_at_k(self, retrieved: List[str], ground_truth: List[str], k: int) -> float:
        """Calculate Hit@k: 1.0 if any of the top-k retrieved documents matches the ground truth"""
        if not ground_truth:
            return 0.0
        # Matching is done via embedding similarity (see _documents_match)
        for doc in retrieved[:k]:
            for gt_doc in ground_truth:
                if self._documents_match(doc, gt_doc):
                    return 1.0
        return 0.0

    def _calculate_mrr(self, retrieved: List[str], ground_truth: List[str]) -> float:
        """Calculate the reciprocal rank of the first matching document (averaged over queries, this is MRR)"""
        if not ground_truth:
            return 0.0
        for rank, doc in enumerate(retrieved, 1):
            for gt_doc in ground_truth:
                if self._documents_match(doc, gt_doc):
                    return 1.0 / rank
        return 0.0

    def _calculate_semantic_similarity(self, retrieved: List[str], ground_truth: List[str]) -> float:
        """Calculate average semantic similarity"""
        if not retrieved or not ground_truth:
            return 0.0
        # Use the same embedding model as the vector store
        embeddings_retrieved = [self.rag_manager.vector_store.embed_text(doc) for doc in retrieved]
        embeddings_gt = [self.rag_manager.vector_store.embed_text(doc) for doc in ground_truth]

        # Cosine similarity between every retrieved/ground-truth pair
        similarity_matrix = cosine_similarity(embeddings_retrieved, embeddings_gt)

        # For each retrieved document, take its best ground-truth match, then average
        max_similarities = np.max(similarity_matrix, axis=1)
        return float(np.mean(max_similarities))

    def _documents_match(self, doc1: str, doc2: str, threshold: float = 0.8) -> bool:
        """Check whether two documents match, using cosine similarity of their embeddings"""
        # Simple implementation - can be enhanced
        embedding1 = self.rag_manager.vector_store.embed_text(doc1)
        embedding2 = self.rag_manager.vector_store.embed_text(doc2)
        similarity = cosine_similarity([embedding1], [embedding2])[0][0]
        return similarity > threshold

    def batch_evaluate(self, queries: List[Dict[str, Any]],
                       output_file: Optional[str] = None) -> Tuple[pd.DataFrame, List[Dict[str, Any]]]:
        """Batch evaluation on multiple queries.

        Each entry in `queries` is a dict with a required 'query' key and optional
        'ground_truth', 'k_values', 'level1', 'level2', 'level3', and 'doc_type' keys.
        Returns the flattened DataFrame together with the raw per-query metrics.
        """
        results = []
        for i, query_data in enumerate(queries):
            print(f"Evaluating query {i+1}/{len(queries)}: {query_data['query'][:50]}...")
            metrics = self.evaluate_single_query(
                query=query_data['query'],
                ground_truth=query_data.get('ground_truth', []),
                k_values=query_data.get('k_values', [1, 3, 5, 10]),
                level1=query_data.get('level1'),
                level2=query_data.get('level2'),
                level3=query_data.get('level3'),
                doc_type=query_data.get('doc_type')
            )
            results.append(metrics)

        # Convert to DataFrame for analysis
        df = self._results_to_dataframe(results)

        # Save results if an output file is specified
        if output_file:
            # Ensure the reports directory exists
            reports_dir = os.path.join(os.getcwd(), "reports")
            os.makedirs(reports_dir, exist_ok=True)

            # Save CSV and JSON copies to the reports directory
            csv_path = os.path.join(reports_dir, output_file)
            json_path = os.path.join(reports_dir, output_file.replace('.csv', '.json'))
            df.to_csv(csv_path, index=False)
            with open(json_path, 'w') as f:
                json.dump(results, f, indent=2)

        return df, results

    def _results_to_dataframe(self, results: List[Dict[str, Any]]) -> pd.DataFrame:
        """Convert evaluation results to DataFrame"""
        rows = []
        for result in results:
            query = result['query']
            for k in result['base_rag'].keys():
                base_metrics = result['base_rag'][k]
                hier_metrics = result['hier_rag'][k]

                rows.append({
                    'query': query,
                    'k': k,
                    'pipeline': 'base_rag',
                    'hit_at_k': base_metrics['hit_at_k'],
                    'mrr': base_metrics['mrr'],
                    'semantic_similarity': base_metrics['semantic_similarity'],
                    'latency': base_metrics['latency'],
                    'retrieved_count': base_metrics['retrieved_count']
                })
                rows.append({
                    'query': query,
                    'k': k,
                    'pipeline': 'hier_rag',
                    'hit_at_k': hier_metrics['hit_at_k'],
                    'mrr': hier_metrics['mrr'],
                    'semantic_similarity': hier_metrics['semantic_similarity'],
                    'latency': hier_metrics['latency'],
                    'retrieved_count': hier_metrics['retrieved_count']
                })
        return pd.DataFrame(rows)
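

# --- Example usage (illustrative sketch, not part of the original module) ---
# This sketch shows how the evaluator is typically wired up. It assumes a RAGManager
# that can be constructed without arguments and uses a hand-written sample query;
# both are assumptions, so adjust them to your actual configuration. Run as a module
# (python -m <package>.evaluation) so the relative import above resolves.
if __name__ == "__main__":
    manager = RAGManager()  # assumption: constructor arguments depend on your setup
    evaluator = RAGEvaluator(manager)

    sample_queries = [
        {
            "query": "What is the refund policy for enterprise customers?",
            "ground_truth": ["Enterprise customers may request a full refund within 30 days."],
            "k_values": [1, 3, 5],
            "doc_type": "policy",
        }
    ]

    # batch_evaluate returns the flattened DataFrame plus the raw per-query metrics,
    # and writes CSV/JSON copies under ./reports when output_file is given.
    df, raw_results = evaluator.batch_evaluate(sample_queries, output_file="rag_eval.csv")

    # Compare the two pipelines at each cutoff k.
    print(df.groupby(["pipeline", "k"])[["hit_at_k", "mrr", "semantic_similarity"]].mean())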