"""Evaluation utilities for comparing RAG retrieval pipelines against ground-truth documents."""

import os
import json
from typing import List, Dict, Any, Tuple, Optional

import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

from .retrieval import RAGManager, RetrievalResult


class RAGEvaluator:
    """Evaluation framework for RAG systems."""

    def __init__(self, rag_manager: RAGManager):
        self.rag_manager = rag_manager

    def evaluate_single_query(self, query: str, ground_truth: List[str],
                              k_values: List[int] = [1, 3, 5, 10],
                              level1: Optional[str] = None,
                              level2: Optional[str] = None,
                              level3: Optional[str] = None,
                              doc_type: Optional[str] = None) -> Dict[str, Any]:
        """Evaluate retrieval for a single query."""
        base_results = {}
        hier_results = {}

        for k in k_values:
            # Get results from both pipelines
            base_result, hier_result = self.rag_manager.compare_retrieval(
                query, k, level1, level2, level3, doc_type
            )
            base_results[k] = base_result
            hier_results[k] = hier_result

        # Calculate metrics
        metrics = {
            "query": query,
            "ground_truth": ground_truth,
            "base_rag": self._calculate_metrics(base_results, ground_truth),
            "hier_rag": self._calculate_metrics(hier_results, ground_truth),
            "filters": {
                "level1": level1,
                "level2": level2,
                "level3": level3,
                "doc_type": doc_type
            }
        }
        return metrics
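
    # Illustrative shape of the dict returned by evaluate_single_query (an
    # assumption sketched from the code above, not captured from a real run):
    #   {
    #       "query": "...",
    #       "ground_truth": ["..."],
    #       "base_rag": {1: {"hit_at_k": 1.0, "mrr": 1.0, ...}, 3: {...}, ...},
    #       "hier_rag": {1: {...}, 3: {...}, ...},
    #       "filters": {"level1": None, "level2": None, "level3": None, "doc_type": None}
    #   }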

    def _calculate_metrics(self, results_dict: Dict[int, RetrievalResult],
                           ground_truth: List[str]) -> Dict[str, Any]:
        """Calculate evaluation metrics for each k cutoff."""
        metrics = {}
        for k, result in results_dict.items():
            retrieved_docs = [source['content'] for source in result.sources]

            # Hit@k: did any of the top-k results match a ground-truth document?
            hit_at_k = self._calculate_hit_at_k(retrieved_docs, ground_truth, k)
            # Reciprocal rank of the first matching document
            mrr = self._calculate_mrr(retrieved_docs, ground_truth)
            # Average semantic similarity between retrieved and ground-truth documents
            semantic_sim = self._calculate_semantic_similarity(retrieved_docs, ground_truth)

            metrics[k] = {
                "hit_at_k": hit_at_k,
                "mrr": mrr,
                "semantic_similarity": semantic_sim,
                "latency": result.latency,
                "retrieved_count": len(retrieved_docs)
            }
        return metrics

    def _calculate_hit_at_k(self, retrieved: List[str], ground_truth: List[str], k: int) -> float:
        """Calculate Hit@k: 1.0 if any of the top-k retrieved documents matches the ground truth."""
        if not ground_truth:
            return 0.0
        # Match via embedding similarity (see _documents_match)
        for doc in retrieved[:k]:
            for gt_doc in ground_truth:
                if self._documents_match(doc, gt_doc):
                    return 1.0
        return 0.0

    def _calculate_mrr(self, retrieved: List[str], ground_truth: List[str]) -> float:
        """Calculate the reciprocal rank of the first match (averaged across queries, this gives MRR)."""
        if not ground_truth:
            return 0.0
        for rank, doc in enumerate(retrieved, 1):
            for gt_doc in ground_truth:
                if self._documents_match(doc, gt_doc):
                    return 1.0 / rank
        return 0.0

    def _calculate_semantic_similarity(self, retrieved: List[str], ground_truth: List[str]) -> float:
        """Calculate average semantic similarity between retrieved and ground-truth documents."""
        if not retrieved or not ground_truth:
            return 0.0
        # Use the same embedding model as the vector store
        embeddings_retrieved = [self.rag_manager.vector_store.embed_text(doc) for doc in retrieved]
        embeddings_gt = [self.rag_manager.vector_store.embed_text(doc) for doc in ground_truth]

        # Calculate cosine similarity between every retrieved/ground-truth pair
        similarity_matrix = cosine_similarity(embeddings_retrieved, embeddings_gt)

        # Return max similarity for each retrieved document, then average
        max_similarities = np.max(similarity_matrix, axis=1)
        return float(np.mean(max_similarities))

    def _documents_match(self, doc1: str, doc2: str, threshold: float = 0.8) -> bool:
        """Check whether two documents match, using embedding cosine similarity against a threshold."""
        embedding1 = self.rag_manager.vector_store.embed_text(doc1)
        embedding2 = self.rag_manager.vector_store.embed_text(doc2)
        similarity = cosine_similarity([embedding1], [embedding2])[0][0]
        return similarity > threshold

    def batch_evaluate(self, queries: List[Dict[str, Any]],
                       output_file: Optional[str] = None) -> Tuple[pd.DataFrame, List[Dict[str, Any]]]:
        """Run the evaluation on multiple queries and return (DataFrame, raw per-query results)."""
        results = []
        for i, query_data in enumerate(queries):
            print(f"Evaluating query {i+1}/{len(queries)}: {query_data['query'][:50]}...")
            metrics = self.evaluate_single_query(
                query=query_data['query'],
                ground_truth=query_data.get('ground_truth', []),
                k_values=query_data.get('k_values', [1, 3, 5, 10]),
                level1=query_data.get('level1'),
                level2=query_data.get('level2'),
                level3=query_data.get('level3'),
                doc_type=query_data.get('doc_type')
            )
            results.append(metrics)

        # Convert to DataFrame for analysis
        df = self._results_to_dataframe(results)

        # Save results if an output file was specified
        if output_file:
            # Ensure the reports directory exists
            reports_dir = os.path.join(os.getcwd(), "reports")
            os.makedirs(reports_dir, exist_ok=True)

            # Save CSV and JSON copies to the reports directory
            csv_path = os.path.join(reports_dir, output_file)
            json_path = os.path.join(reports_dir, output_file.replace('.csv', '.json'))
            df.to_csv(csv_path, index=False)
            with open(json_path, 'w') as f:
                json.dump(results, f, indent=2)

        return df, results
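
    # Illustrative entry in `queries` for batch_evaluate. The keys mirror the
    # lookups above ('query' is required, the rest are optional); the values
    # shown here are made up for the example:
    #   {
    #       "query": "How do I reset my password?",
    #       "ground_truth": ["To reset your password, ..."],
    #       "k_values": [1, 3, 5],
    #       "level1": "support",
    #       "doc_type": "faq",
    #   }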

    def _results_to_dataframe(self, results: List[Dict[str, Any]]) -> pd.DataFrame:
        """Convert evaluation results to a long-format DataFrame (one row per query, k, and pipeline)."""
        rows = []
        for result in results:
            query = result['query']
            for k in result['base_rag'].keys():
                for pipeline in ('base_rag', 'hier_rag'):
                    pipeline_metrics = result[pipeline][k]
                    rows.append({
                        'query': query,
                        'k': k,
                        'pipeline': pipeline,
                        'hit_at_k': pipeline_metrics['hit_at_k'],
                        'mrr': pipeline_metrics['mrr'],
                        'semantic_similarity': pipeline_metrics['semantic_similarity'],
                        'latency': pipeline_metrics['latency'],
                        'retrieved_count': pipeline_metrics['retrieved_count']
                    })
        return pd.DataFrame(rows)
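

# Minimal usage sketch (an illustration, not part of the module's API). It assumes
# a RAGManager can be constructed with no arguments and that the corpus has already
# been indexed elsewhere; run it as `python -m <package>.<module>` so the relative
# import at the top resolves.
if __name__ == "__main__":
    rag_manager = RAGManager()  # assumption: default construction works in your setup
    evaluator = RAGEvaluator(rag_manager)

    sample_queries = [
        {
            "query": "What is the refund policy?",  # made-up example query
            "ground_truth": ["Refunds are issued within 30 days of purchase."],
            "k_values": [1, 3, 5],
        },
    ]

    df, raw_results = evaluator.batch_evaluate(sample_queries, output_file="rag_eval.csv")
    # Aggregate the per-query rows into mean metrics per pipeline and k
    print(df.groupby(["pipeline", "k"])[["hit_at_k", "mrr", "semantic_similarity"]].mean())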