File size: 18,454 Bytes
4cfbb41 9f772a8 4cfbb41 0c20b58 4cfbb41 0f1a143 4cfbb41 0c20b58 4cfbb41 cd57d73 4cfbb41 9f772a8 4cfbb41 9f772a8 4cfbb41 9f772a8 4cfbb41 9f772a8 4cfbb41 9f772a8 4cfbb41 0f1a143 0c20b58 0f1a143 4cfbb41 cd57d73 9f772a8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 |
"""
Retrieval Evaluation Script for Base RAG vs Hierarchical RAG
Generates synthetic test data, evaluates retrieval performance, and produces reports.
"""
import json
import csv
import time
import uuid
from tqdm import tqdm
from random import shuffle
from pathlib import Path
from typing import List, Dict
from datetime import datetime
from dataclasses import dataclass, asdict
import numpy as np
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from dotenv import load_dotenv, find_dotenv
from .index import MetaData, get_vectorstore
from .retrieval import retrieval, generate
from .ingest import ingest_documents, get_chunks
from .synthetic_data import SYNTHETIC_DOCUMENTS, EVAL_QUERIES, EvalQuery
# Locate the nearest .env file and load it into the process environment so
# that environment-based settings are in place before the embedding client
# below is constructed. The located path is passed on explicitly instead of
# being computed and thrown away.
load_dotenv(find_dotenv())

# Embedding model for semantic similarity
emb_model = OpenAIEmbeddings(model="text-embedding-3-small", dimensions=1536)
@dataclass
class EvalResult:
    """Evaluation result for a single query.

    One instance is produced by ``evaluate_single_query`` for each
    (query, RAG variant) pair. Field order matters: it defines the
    positional constructor and the column order of the CSV written by
    ``save_results`` (which derives its header from ``asdict``).
    """
    # Identifier built from the collection, the RAG type and a hash of the query text
    query_id: str
    # Name of the collection the query targets
    collection: str
    # The natural-language query text
    query: str
    rag_type: str  # "base" or "hierarchical"
    # Retrieval metrics
    # Number of documents returned by retrieval
    retrieved_docs: int
    # Whether any ground-truth snippet appears in the top 1 / 3 / 5 documents
    hit_at_1: bool
    hit_at_3: bool
    hit_at_5: bool
    # Reciprocal rank of the first matching document (0.0 when none matches)
    mrr: float
    # Mean of the ``similarity_score`` metadata value over retrieved documents
    avg_similarity_score: float
    # Latency
    retrieval_latency_ms: float
    generation_latency_ms: float
    # Sum of the retrieval and generation latencies
    total_latency_ms: float
    # Semantic similarity
    # Mean cosine similarity between the query embedding and document embeddings
    avg_semantic_similarity: float
    # Generated answer
    generated_answer: str
    # Metadata
    # Metadata filters applied during retrieval (field name -> value)
    filters_used: Dict
    # ISO-8601 timestamp taken when the result was assembled
    timestamp: str
# ============================================================================
# EVALUATION FUNCTIONS
# ============================================================================
def calculate_semantic_similarity(query: str, documents: List[Document]) -> float:
    """Calculate average semantic similarity between query and retrieved documents.

    The query and each document's ``page_content`` are embedded with the
    module-level ``emb_model`` and compared by cosine similarity.

    Args:
        query: The query text.
        documents: Retrieved documents; only ``page_content`` is read.

    Returns:
        The mean cosine similarity as a plain ``float``, or ``0.0`` when
        ``documents`` is empty. A pair whose query or document embedding has
        zero norm contributes ``0.0`` instead of turning the mean into NaN.
    """
    if not documents:
        return 0.0

    query_vec = np.asarray(emb_model.embed_query(query), dtype=float)
    doc_matrix = np.asarray(
        emb_model.embed_documents([doc.page_content for doc in documents]),
        dtype=float,
    )

    # One matrix-vector product gives every dot product at once.
    dots = doc_matrix @ query_vec
    norms = np.linalg.norm(doc_matrix, axis=1) * np.linalg.norm(query_vec)

    # Divide only where the denominator is non-zero; the rest stay at 0.0.
    similarities = np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)
    return float(similarities.mean())
def calculate_mrr(ground_truth: List[str], retrieved_docs: List[Document]) -> float:
    """Return the reciprocal rank of the first relevant retrieved document.

    A document counts as relevant when any ground-truth snippet occurs in its
    ``page_content`` (case-insensitive substring match). Ranks start at 1.
    Returns ``0.0`` when no document matches.
    """
    reciprocal_ranks = (
        1.0 / position
        for position, candidate in enumerate(retrieved_docs, start=1)
        if any(
            snippet.lower() in candidate.page_content.lower()
            for snippet in ground_truth
        )
    )
    # The generator is lazy, so scanning stops at the first matching document.
    return next(reciprocal_ranks, 0.0)
def calculate_hit_at_k(ground_truth: List[str], retrieved_docs: List[Document], k: int) -> bool:
    """Report whether any of the first ``k`` documents contains a ground-truth snippet.

    Matching is a case-insensitive substring test of each snippet against the
    document's ``page_content``.
    """
    return any(
        snippet.lower() in candidate.page_content.lower()
        for candidate in retrieved_docs[:k]
        for snippet in ground_truth
    )
def evaluate_single_query(
    eval_query: EvalQuery,
    rag_type: str = "base"
) -> EvalResult:
    """Evaluate a single query with either base or hierarchical RAG.

    Args:
        eval_query: The query to run, together with its target collection,
            metadata fields and ground-truth snippets.
        rag_type: ``"base"`` filters on language only. Any other value is
            treated as hierarchical and filters on language, domain, section,
            topic and doc_type.

    Returns:
        An ``EvalResult`` holding hit@k, MRR, similarity scores, latencies
        (in milliseconds), the generated answer and the filters used.
    """
    # Local import keeps this block self-contained.
    import hashlib

    # Build the filter mapping once and derive the MetaData object from it, so
    # the reported filters cannot drift from the ones actually applied.
    filters_dict = {"language": eval_query.language}
    if rag_type != "base":  # hierarchical
        filters_dict.update(
            domain=eval_query.domain,
            section=eval_query.section,
            topic=eval_query.topic,
            doc_type=eval_query.doc_type,
        )
    filters = MetaData(**filters_dict)

    # Obtain the vector store before starting the clock: its setup cost is not
    # part of retrieval and would otherwise be charged to whichever query
    # happens to run first.
    vectorstore = get_vectorstore("eval_"+eval_query.collection)

    # Retrieval (perf_counter is monotonic and high resolution, unlike time.time)
    ret_start = time.perf_counter()
    docs = retrieval(eval_query.query, filters, vectorstore)
    ret_latency = (time.perf_counter() - ret_start) * 1000  # Convert to ms

    # Generation
    gen_start = time.perf_counter()
    answer = generate(eval_query.query, docs) if docs else "No relevant documents found."
    gen_latency = (time.perf_counter() - gen_start) * 1000  # Convert to ms

    total_latency = ret_latency + gen_latency

    # Calculate metrics
    hit_1 = calculate_hit_at_k(eval_query.ground_truth_chunks, docs, 1)
    hit_3 = calculate_hit_at_k(eval_query.ground_truth_chunks, docs, 3)
    hit_5 = calculate_hit_at_k(eval_query.ground_truth_chunks, docs, 5)
    mrr = calculate_mrr(eval_query.ground_truth_chunks, docs)
    avg_sim_score = np.mean([doc.metadata.get('similarity_score', 0) for doc in docs]) if docs else 0.0
    semantic_sim = calculate_semantic_similarity(eval_query.query, docs)

    # The builtin hash() of a str is salted per process, so it would yield a
    # different id on every run. A SHA-256 digest keeps the same 0-9999 suffix
    # format while making the id reproducible across runs.
    query_digest = hashlib.sha256(eval_query.query.encode("utf-8")).hexdigest()
    query_id = f"{eval_query.collection}_{rag_type}_{int(query_digest, 16) % 10000}"

    return EvalResult(
        query_id=query_id,
        collection=eval_query.collection,
        query=eval_query.query,
        rag_type=rag_type,
        retrieved_docs=len(docs),
        hit_at_1=hit_1,
        hit_at_3=hit_3,
        hit_at_5=hit_5,
        mrr=mrr,
        avg_similarity_score=float(avg_sim_score),
        retrieval_latency_ms=ret_latency,
        generation_latency_ms=gen_latency,
        total_latency_ms=total_latency,
        avg_semantic_similarity=semantic_sim,
        generated_answer=answer,
        filters_used=filters_dict,
        timestamp=datetime.now().isoformat()
    )
def run_full_evaluation(
    collections: List[str] = None,
    output_dir: str = "reports"
) -> Dict[str, List[EvalResult]]:
    """Run complete evaluation on all queries.

    Every query in ``EVAL_QUERIES`` whose collection is requested is evaluated
    twice, once with base RAG and once with hierarchical RAG. Queries are
    processed in random order.

    Args:
        collections: Collection names to evaluate. Defaults to
            ``["hospital", "bank", "fluid_simulation"]``.
        output_dir: Directory created up front for the reports. Nothing is
            written to it by this function.

    Returns:
        A dict with keys ``"base"`` and ``"hierarchical"``, each mapping to
        the list of results in evaluation order, so the two lists line up
        index by index.
    """
    if collections is None:
        collections = ["hospital", "bank", "fluid_simulation"]

    # parents=True so a nested output path such as "out/reports" works too.
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    all_results = {
        "base": [],
        "hierarchical": []
    }

    # Filter queries by requested collections
    queries_to_eval = [q for q in EVAL_QUERIES if q.collection in collections]
    shuffle(queries_to_eval)

    print(f"\n{'='*70}")
    print(f"Starting Evaluation: {len(queries_to_eval)} queries across {len(collections)} collections")
    print(f"{'='*70}\n")

    for eval_query in tqdm(queries_to_eval, desc="Running evaluation queries"):
        # Evaluate with base RAG
        all_results["base"].append(evaluate_single_query(eval_query, "base"))
        # Evaluate with hierarchical RAG
        all_results["hierarchical"].append(evaluate_single_query(eval_query, "hierarchical"))

    return all_results
def save_results(results: Dict[str, List[EvalResult]], output_dir: str = "reports"):
    """Save evaluation results to CSV and JSON.

    Args:
        results: Dict with keys ``"base"`` and ``"hierarchical"``, each a list
            of dataclass results.
        output_dir: Directory for the report files; created (including
            parents) when missing.

    Returns:
        A ``(csv_path, json_path)`` tuple of ``Path`` objects. The CSV has one
        row per result (base rows first) with ``filters_used`` JSON-encoded.
        When there are no results the CSV file is created empty.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_dir = Path(output_dir)
    report_dir.mkdir(parents=True, exist_ok=True)

    # Combine all results
    all_results = results["base"] + results["hierarchical"]

    # Save as CSV. The encoding is explicit so non-ASCII text in queries or
    # generated answers cannot fail on platforms with a non-UTF-8 default.
    csv_path = report_dir / f"eval_results_{timestamp}.csv"
    with open(csv_path, 'w', newline='', encoding='utf-8') as f:
        if all_results:
            rows = [asdict(result) for result in all_results]
            writer = csv.DictWriter(f, fieldnames=list(rows[0]))
            writer.writeheader()
            for row in rows:
                # Convert complex types to strings
                row['filters_used'] = json.dumps(row['filters_used'])
                writer.writerow(row)
    print(f"✓ Saved CSV report: {csv_path}")

    # Save as JSON
    json_path = report_dir / f"eval_results_{timestamp}.json"
    json_data = {
        "metadata": {
            "timestamp": timestamp,
            "total_queries": len(all_results),
            # Sorted so the report is reproducible rather than in set order.
            "collections_tested": sorted({r.collection for r in all_results})
        },
        "results": {
            "base": [asdict(r) for r in results["base"]],
            "hierarchical": [asdict(r) for r in results["hierarchical"]]
        }
    }
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, indent=2)
    print(f"✓ Saved JSON report: {json_path}")

    return csv_path, json_path
def _aggregate_metrics(result_list):
    """Average the per-query metrics of *result_list* (all zeros when it is empty)."""

    def mean_of(attr):
        # np.mean([]) would yield NaN plus a RuntimeWarning.
        if not result_list:
            return 0.0
        return float(np.mean([getattr(r, attr) for r in result_list]))

    return {
        "total_queries": len(result_list),
        "avg_hit_at_1": mean_of("hit_at_1") * 100,
        "avg_hit_at_3": mean_of("hit_at_3") * 100,
        "avg_hit_at_5": mean_of("hit_at_5") * 100,
        "avg_mrr": mean_of("mrr"),
        "avg_similarity": mean_of("avg_similarity_score"),
        "avg_semantic_sim": mean_of("avg_semantic_similarity"),
        "avg_retrieval_latency": mean_of("retrieval_latency_ms"),
        "avg_generation_latency": mean_of("generation_latency_ms"),
        "avg_total_latency": mean_of("total_latency_ms"),
    }


def _format_delta(base_val, hier_val, unit, higher_better):
    """Format the hierarchical-vs-base difference for one row of the overview table."""
    if higher_better:
        delta = hier_val - base_val
        delta_pct = (delta / base_val * 100) if base_val > 0 else 0
        if delta >= 0:
            return f"+{delta:.2f}{unit} ({delta_pct:+.1f}%)"
        return f"{delta:.2f}{unit} ({delta_pct:.1f}%)"

    # Lower is better (latency): a positive delta means hierarchical was faster.
    delta = base_val - hier_val
    delta_pct = (delta / base_val * 100) if base_val > 0 else 0
    if delta >= 0:
        return f"-{delta:.2f}{unit} ({delta_pct:.1f}% faster)"
    return f"+{abs(delta):.2f}{unit} ({abs(delta_pct):.1f}% slower)"


def generate_summary_report(results: Dict[str, List[EvalResult]], output_dir: str = "reports"):
    """Generate markdown summary report with comparative analysis.

    Args:
        results: Dict with keys ``"base"`` and ``"hierarchical"``. The two
            lists are paired index by index in the detailed section, so they
            are expected to be in the same query order.
        output_dir: Directory for the report; created (including parents)
            when missing.

    Returns:
        ``Path`` of the written ``eval_summary_<timestamp>.md`` file. Empty
        result lists produce a report with zeroed metrics instead of NaN
        values or an exception.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_dir = Path(output_dir)
    report_dir.mkdir(parents=True, exist_ok=True)
    md_path = report_dir / f"eval_summary_{timestamp}.md"

    base_results = results["base"]
    hier_results = results["hierarchical"]

    # Calculate aggregate metrics
    base_metrics = _aggregate_metrics(base_results)
    hier_metrics = _aggregate_metrics(hier_results)

    # Calculate per-collection metrics (sorted for a deterministic report)
    collections = sorted({r.collection for r in base_results})
    collection_metrics = {
        collection: {
            "base": _aggregate_metrics([r for r in base_results if r.collection == collection]),
            "hierarchical": _aggregate_metrics([r for r in hier_results if r.collection == collection]),
        }
        for collection in collections
    }

    # Generate markdown report. UTF-8 is explicit because the report contains
    # non-ASCII symbols that a platform default encoding may reject.
    with open(md_path, 'w', encoding='utf-8') as f:
        f.write("# RAG Retrieval Evaluation Report\n\n")
        f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write("---\n\n")

        # Executive Summary
        f.write("## Executive Summary\n\n")
        f.write("This report compares **Base RAG** (language-only filtering) against ")
        f.write("**Hierarchical RAG** (domain/section/topic/doc_type filtering) across ")
        f.write(f"{base_metrics['total_queries']} evaluation queries.\n\n")

        # Overall Performance Comparison
        f.write("## Overall Performance Comparison\n\n")
        f.write("| Metric | Base RAG | Hierarchical RAG | Δ (Improvement) |\n")
        f.write("|--------|----------|------------------|------------------|\n")
        metrics_to_show = [
            ("Hit@1", "avg_hit_at_1", "%", True),
            ("Hit@3", "avg_hit_at_3", "%", True),
            ("Hit@5", "avg_hit_at_5", "%", True),
            ("MRR", "avg_mrr", "", True),
            ("Avg Similarity Score", "avg_similarity", "", True),
            ("Semantic Similarity", "avg_semantic_sim", "", True),
            ("Retrieval Latency", "avg_retrieval_latency", "ms", False),
            ("Generation Latency", "avg_generation_latency", "ms", False),
            ("Total Latency", "avg_total_latency", "ms", False),
        ]
        for label, key, unit, higher_better in metrics_to_show:
            base_val = base_metrics[key]
            hier_val = hier_metrics[key]
            delta_str = _format_delta(base_val, hier_val, unit, higher_better)
            f.write(f"| {label} | {base_val:.2f}{unit} | {hier_val:.2f}{unit} | {delta_str} |\n")
        f.write("\n")

        # Per-Collection Analysis
        f.write("## Per-Collection Analysis\n\n")
        for collection in collections:
            f.write(f"### {collection.replace('_', ' ').title()}\n\n")
            base_coll = collection_metrics[collection]["base"]
            hier_coll = collection_metrics[collection]["hierarchical"]
            f.write("| Metric | Base RAG | Hierarchical RAG |\n")
            f.write("|--------|----------|------------------|\n")
            f.write(f"| Hit@1 | {base_coll['avg_hit_at_1']:.1f}% | {hier_coll['avg_hit_at_1']:.1f}% |\n")
            f.write(f"| Hit@5 | {base_coll['avg_hit_at_5']:.1f}% | {hier_coll['avg_hit_at_5']:.1f}% |\n")
            f.write(f"| MRR | {base_coll['avg_mrr']:.3f} | {hier_coll['avg_mrr']:.3f} |\n")
            f.write(f"| Total Latency | {base_coll['avg_total_latency']:.0f}ms | {hier_coll['avg_total_latency']:.0f}ms |\n")
            f.write("\n")

        # Key Findings
        f.write("## Key Findings\n\n")

        # Accuracy improvement
        hit5_improvement = hier_metrics['avg_hit_at_5'] - base_metrics['avg_hit_at_5']
        mrr_improvement = hier_metrics['avg_mrr'] - base_metrics['avg_mrr']
        f.write(f"1. **Accuracy:** Hierarchical RAG achieved {hier_metrics['avg_hit_at_5']:.1f}% Hit@5 ")
        f.write(f"compared to {base_metrics['avg_hit_at_5']:.1f}% for Base RAG ")
        f.write(f"({hit5_improvement:+.1f}% improvement).\n\n")
        f.write(f"2. **Ranking Quality:** Mean Reciprocal Rank improved from {base_metrics['avg_mrr']:.3f} ")
        f.write(f"to {hier_metrics['avg_mrr']:.3f} ({mrr_improvement:+.3f}).\n\n")

        # Latency analysis (guarded against a zero base latency)
        base_latency = base_metrics['avg_total_latency']
        latency_change = hier_metrics['avg_total_latency'] - base_latency
        latency_pct = (latency_change / base_latency * 100) if base_latency > 0 else 0.0
        f.write("3. **Latency:** Hierarchical RAG ")
        if latency_change < 0:
            f.write(f"was faster by {abs(latency_change):.0f}ms ({abs(latency_pct):.1f}% reduction)")
        else:
            f.write(f"added {latency_change:.0f}ms ({latency_pct:.1f}% increase)")
        f.write(" compared to Base RAG.\n\n")

        # Best performing collection (skipped when there is nothing to rank)
        if collections:
            best_collection = max(
                collections,
                key=lambda c: collection_metrics[c]["hierarchical"]["avg_hit_at_5"],
            )
            f.write(f"4. **Best Performance:** The '{best_collection}' collection showed ")
            f.write(f"strongest results with {collection_metrics[best_collection]['hierarchical']['avg_hit_at_5']:.1f}% Hit@5.\n\n")

        # Recommendations
        f.write("## Recommendations\n\n")
        if hier_metrics['avg_hit_at_5'] > base_metrics['avg_hit_at_5'] + 5:
            f.write("- ✅ **Use Hierarchical RAG** for production deployments where metadata filtering is available.\n")
        else:
            f.write("- ⚠️ **Limited benefit** from hierarchical filtering detected. Consider reviewing metadata quality.\n")
        if hier_metrics['avg_total_latency'] < base_metrics['avg_total_latency'] * 1.2:
            f.write("- ✅ Latency impact is acceptable for the accuracy gains.\n")
        else:
            f.write("- ⚠️ Consider optimizing index structure to reduce latency overhead.\n")

        f.write("\n---\n\n")
        f.write("## Detailed Query Results\n\n")

        # Sample queries with comparison (first 20 pairs only)
        for i, (base_r, hier_r) in enumerate(zip(base_results[:20], hier_results[:20]), 1):
            f.write(f"### Query {i}: {base_r.query}\n\n")
            f.write(f"### Base Response {i}:\n{base_r.generated_answer}\n\n")
            f.write(f"### Hier Response {i}:\n{hier_r.generated_answer}\n\n")
            f.write(f"**Collection:** {base_r.collection}\n\n")
            f.write("| Aspect | Base RAG | Hierarchical RAG |\n")
            f.write("|--------|----------|------------------|\n")
            # Distinct symbols for hit and miss.
            f.write(f"| Hit@5 | {'✓' if base_r.hit_at_5 else '✗'} | {'✓' if hier_r.hit_at_5 else '✗'} |\n")
            f.write(f"| MRR | {base_r.mrr:.3f} | {hier_r.mrr:.3f} |\n")
            f.write(f"| Retrieved Docs | {base_r.retrieved_docs} | {hier_r.retrieved_docs} |\n")
            f.write(f"| Total Latency | {base_r.total_latency_ms:.0f}ms | {hier_r.total_latency_ms:.0f}ms |\n")
            f.write(f"| Semantic Sim | {base_r.avg_semantic_similarity:.3f} | {hier_r.avg_semantic_similarity:.3f} |\n")
            f.write("\n")

    print(f"✓ Saved summary report: {md_path}")
    return md_path
def setup_test_data(collections: List[str] = None):
    """Ingest synthetic test documents into vector stores.

    For each requested collection that has an entry in ``SYNTHETIC_DOCUMENTS``,
    the documents are chunked and ingested into a vector store named
    ``"eval_" + collection_name``, obtained with ``drop_old=True``.
    Collections without synthetic data are reported and skipped.

    Args:
        collections: Collection names to set up. Defaults to
            ``["hospital", "bank", "fluid_simulation"]``, the same default as
            ``run_full_evaluation``.

    Returns:
        The total number of source documents (not chunks) that were ingested.
    """
    if collections is None:
        # Iterating None would raise TypeError.
        collections = ["hospital", "bank", "fluid_simulation"]

    print("\n" + "="*70)
    print("Setting up test data for evaluation")
    print("="*70 + "\n")

    tot_docs = 0
    for collection_name in collections:
        if collection_name not in SYNTHETIC_DOCUMENTS:
            print(f"⚠️ No synthetic data available for '{collection_name}', skipping...")
            continue

        docs = SYNTHETIC_DOCUMENTS[collection_name]
        print(f"\n📄 Ingesting {len(docs)} documents into '{collection_name}' collection...")

        chunks = []
        for doc_data in docs:
            raw_metadata = doc_data["metadata"]
            doc = Document(page_content=doc_data["content"], metadata=raw_metadata)
            # The same metadata is passed to get_chunks as a MetaData object.
            chunks.extend(get_chunks([doc], MetaData(**raw_metadata)))

        vectorstore = get_vectorstore("eval_"+collection_name, drop_old=True)
        ingest_documents(chunks, vectorstore)
        tot_docs += len(docs)
        print(f"✓ Completed '{collection_name}' collection")

    print("\n" + "="*70)
    print("Test data setup complete!")
    print("="*70 + "\n")
    return tot_docs