hierRAG / src /core /eval.py
ikram98ai's picture
refactoring get_vectorstore
0c20b58
"""
Retrieval Evaluation Script for Base RAG vs Hierarchical RAG
Generates synthetic test data, evaluates retrieval performance, and produces reports.
"""
import json
import csv
import time
import uuid
from tqdm import tqdm
from random import shuffle
from pathlib import Path
from typing import List, Dict
from datetime import datetime
from dataclasses import dataclass, asdict
import numpy as np
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from dotenv import load_dotenv, find_dotenv
from .index import MetaData, get_vectorstore
from .retrieval import retrieval, generate
from .ingest import ingest_documents, get_chunks
from .synthetic_data import SYNTHETIC_DOCUMENTS, EVAL_QUERIES, EvalQuery
find_dotenv()
load_dotenv()
# Embedding model for semantic similarity
emb_model = OpenAIEmbeddings(model="text-embedding-3-small", dimensions=1536)
@dataclass
class EvalResult:
"""Evaluation result for a single query"""
query_id: str
collection: str
query: str
rag_type: str # "base" or "hierarchical"
# Retrieval metrics
retrieved_docs: int
hit_at_1: bool
hit_at_3: bool
hit_at_5: bool
mrr: float
avg_similarity_score: float
# Latency
retrieval_latency_ms: float
generation_latency_ms: float
total_latency_ms: float
# Semantic similarity
avg_semantic_similarity: float
# Generated answer
generated_answer: str
# Metadata
filters_used: Dict
timestamp: str
# ============================================================================
# EVALUATION FUNCTIONS
# ============================================================================
def calculate_semantic_similarity(query: str, documents: List[Document]) -> float:
"""Calculate average semantic similarity between query and retrieved documents"""
if not documents:
return 0.0
query_embedding = emb_model.embed_query(query)
doc_embeddings = emb_model.embed_documents([doc.page_content for doc in documents])
similarities = []
for doc_emb in doc_embeddings:
# Cosine similarity
similarity = np.dot(query_embedding, doc_emb) / (
np.linalg.norm(query_embedding) * np.linalg.norm(doc_emb)
)
similarities.append(similarity)
return float(np.mean(similarities))
def calculate_mrr(ground_truth: List[str], retrieved_docs: List[Document]) -> float:
"""Calculate Mean Reciprocal Rank"""
for rank, doc in enumerate(retrieved_docs, start=1):
# Check if any ground truth snippet appears in the document
for truth in ground_truth:
if truth.lower() in doc.page_content.lower():
return 1.0 / rank
return 0.0
def calculate_hit_at_k(ground_truth: List[str], retrieved_docs: List[Document], k: int) -> bool:
"""Check if any ground truth appears in top-k results"""
for doc in retrieved_docs[:k]:
for truth in ground_truth:
if truth.lower() in doc.page_content.lower():
return True
return False
def evaluate_single_query(
eval_query: EvalQuery,
rag_type: str = "base"
) -> EvalResult:
"""Evaluate a single query with either base or hierarchical RAG"""
# Set up filters based on RAG type
if rag_type == "base":
filters = MetaData(language=eval_query.language)
filters_dict = {"language": eval_query.language}
else: # hierarchical
filters = MetaData(
language=eval_query.language,
domain=eval_query.domain,
section=eval_query.section,
topic=eval_query.topic,
doc_type=eval_query.doc_type
)
filters_dict = {
"language": eval_query.language,
"domain": eval_query.domain,
"section": eval_query.section,
"topic": eval_query.topic,
"doc_type": eval_query.doc_type
}
# Retrieval
ret_start = time.time()
vectorstore = get_vectorstore("eval_"+eval_query.collection)
docs = retrieval(eval_query.query, filters, vectorstore)
ret_end = time.time()
ret_latency = (ret_end - ret_start) * 1000 # Convert to ms
# Generation
gen_start = time.time()
answer = generate(eval_query.query, docs) if docs else "No relevant documents found."
gen_end = time.time()
gen_latency = (gen_end - gen_start) * 1000 # Convert to ms
total_latency = ret_latency + gen_latency
# Calculate metrics
hit_1 = calculate_hit_at_k(eval_query.ground_truth_chunks, docs, 1)
hit_3 = calculate_hit_at_k(eval_query.ground_truth_chunks, docs, 3)
hit_5 = calculate_hit_at_k(eval_query.ground_truth_chunks, docs, 5)
mrr = calculate_mrr(eval_query.ground_truth_chunks, docs)
avg_sim_score = np.mean([doc.metadata.get('similarity_score', 0) for doc in docs]) if docs else 0.0
semantic_sim = calculate_semantic_similarity(eval_query.query, docs)
query_id = f"{eval_query.collection}_{rag_type}_{hash(eval_query.query) % 10000}"
return EvalResult(
query_id=query_id,
collection=eval_query.collection,
query=eval_query.query,
rag_type=rag_type,
retrieved_docs=len(docs),
hit_at_1=hit_1,
hit_at_3=hit_3,
hit_at_5=hit_5,
mrr=mrr,
avg_similarity_score=float(avg_sim_score),
retrieval_latency_ms=ret_latency,
generation_latency_ms=gen_latency,
total_latency_ms=total_latency,
avg_semantic_similarity=semantic_sim,
generated_answer=answer,
filters_used=filters_dict,
timestamp=datetime.now().isoformat()
)
def run_full_evaluation(
collections: List[str] = None,
output_dir: str = "reports"
) -> Dict[str, List[EvalResult]]:
"""Run complete evaluation on all queries"""
if collections is None:
collections = ["hospital", "bank", "fluid_simulation"]
Path(output_dir).mkdir(exist_ok=True)
all_results = {
"base": [],
"hierarchical": []
}
# Filter queries by requested collections
queries_to_eval = [q for q in EVAL_QUERIES if q.collection in collections]
shuffle(queries_to_eval)
print(f"\n{'='*70}")
print(f"Starting Evaluation: {len(queries_to_eval)} queries across {len(collections)} collections")
print(f"{'='*70}\n")
for eval_query in tqdm(queries_to_eval, desc="Running evaluation queries"):
# Evaluate with base RAG
base_result = evaluate_single_query(eval_query, "base")
all_results["base"].append(base_result)
# Evaluate with hierarchical RAG
hier_result = evaluate_single_query(eval_query, "hierarchical")
all_results["hierarchical"].append(hier_result)
return all_results
def save_results(results: Dict[str, List[EvalResult]], output_dir: str = "reports"):
"""Save evaluation results to CSV and JSON"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
Path(output_dir).mkdir(exist_ok=True)
# Combine all results
all_results = results["base"] + results["hierarchical"]
# Save as CSV
csv_path = Path(output_dir) / f"eval_results_{timestamp}.csv"
with open(csv_path, 'w', newline='') as f:
if all_results:
fieldnames = list(asdict(all_results[0]).keys())
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for result in all_results:
row = asdict(result)
# Convert complex types to strings
row['filters_used'] = json.dumps(row['filters_used'])
writer.writerow(row)
print(f"βœ“ Saved CSV report: {csv_path}")
# Save as JSON
json_path = Path(output_dir) / f"eval_results_{timestamp}.json"
json_data = {
"metadata": {
"timestamp": timestamp,
"total_queries": len(all_results),
"collections_tested": list(set(r.collection for r in all_results))
},
"results": {
"base": [asdict(r) for r in results["base"]],
"hierarchical": [asdict(r) for r in results["hierarchical"]]
}
}
with open(json_path, 'w') as f:
json.dump(json_data, f, indent=2)
print(f"βœ“ Saved JSON report: {json_path}")
return csv_path, json_path
def generate_summary_report(results: Dict[str, List[EvalResult]], output_dir: str = "reports"):
"""Generate markdown summary report with comparative analysis"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
md_path = Path(output_dir) / f"eval_summary_{timestamp}.md"
base_results = results["base"]
hier_results = results["hierarchical"]
# Calculate aggregate metrics
def calc_metrics(result_list):
return {
"total_queries": len(result_list),
"avg_hit_at_1": np.mean([r.hit_at_1 for r in result_list]) * 100,
"avg_hit_at_3": np.mean([r.hit_at_3 for r in result_list]) * 100,
"avg_hit_at_5": np.mean([r.hit_at_5 for r in result_list]) * 100,
"avg_mrr": np.mean([r.mrr for r in result_list]),
"avg_similarity": np.mean([r.avg_similarity_score for r in result_list]),
"avg_semantic_sim": np.mean([r.avg_semantic_similarity for r in result_list]),
"avg_retrieval_latency": np.mean([r.retrieval_latency_ms for r in result_list]),
"avg_generation_latency": np.mean([r.generation_latency_ms for r in result_list]),
"avg_total_latency": np.mean([r.total_latency_ms for r in result_list]),
}
base_metrics = calc_metrics(base_results)
hier_metrics = calc_metrics(hier_results)
# Calculate per-collection metrics
collections = list(set(r.collection for r in base_results))
collection_metrics = {}
for collection in collections:
collection_metrics[collection] = {
"base": calc_metrics([r for r in base_results if r.collection == collection]),
"hierarchical": calc_metrics([r for r in hier_results if r.collection == collection])
}
# Generate markdown report
with open(md_path, 'w') as f:
f.write("# RAG Retrieval Evaluation Report\n\n")
f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write("---\n\n")
# Executive Summary
f.write("## Executive Summary\n\n")
f.write(f"This report compares **Base RAG** (language-only filtering) against ")
f.write(f"**Hierarchical RAG** (domain/section/topic/doc_type filtering) across ")
f.write(f"{base_metrics['total_queries']} evaluation queries.\n\n")
# Overall Performance Comparison
f.write("## Overall Performance Comparison\n\n")
f.write("| Metric | Base RAG | Hierarchical RAG | Ξ” (Improvement) |\n")
f.write("|--------|----------|------------------|------------------|\n")
metrics_to_show = [
("Hit@1", "avg_hit_at_1", "%", True),
("Hit@3", "avg_hit_at_3", "%", True),
("Hit@5", "avg_hit_at_5", "%", True),
("MRR", "avg_mrr", "", True),
("Avg Similarity Score", "avg_similarity", "", True),
("Semantic Similarity", "avg_semantic_sim", "", True),
("Retrieval Latency", "avg_retrieval_latency", "ms", False),
("Generation Latency", "avg_generation_latency", "ms", False),
("Total Latency", "avg_total_latency", "ms", False),
]
for label, key, unit, higher_better in metrics_to_show:
base_val = base_metrics[key]
hier_val = hier_metrics[key]
if higher_better:
delta = hier_val - base_val
delta_pct = (delta / base_val * 100) if base_val > 0 else 0
delta_str = f"+{delta:.2f}{unit} ({delta_pct:+.1f}%)" if delta >= 0 else f"{delta:.2f}{unit} ({delta_pct:.1f}%)"
else:
delta = base_val - hier_val
delta_pct = (delta / base_val * 100) if base_val > 0 else 0
delta_str = f"-{delta:.2f}{unit} ({delta_pct:.1f}% faster)" if delta >= 0 else f"+{abs(delta):.2f}{unit} ({abs(delta_pct):.1f}% slower)"
f.write(f"| {label} | {base_val:.2f}{unit} | {hier_val:.2f}{unit} | {delta_str} |\n")
f.write("\n")
# Per-Collection Analysis
f.write("## Per-Collection Analysis\n\n")
for collection in sorted(collections):
f.write(f"### {collection.replace('_', ' ').title()}\n\n")
base_coll = collection_metrics[collection]["base"]
hier_coll = collection_metrics[collection]["hierarchical"]
f.write("| Metric | Base RAG | Hierarchical RAG |\n")
f.write("|--------|----------|------------------|\n")
f.write(f"| Hit@1 | {base_coll['avg_hit_at_1']:.1f}% | {hier_coll['avg_hit_at_1']:.1f}% |\n")
f.write(f"| Hit@5 | {base_coll['avg_hit_at_5']:.1f}% | {hier_coll['avg_hit_at_5']:.1f}% |\n")
f.write(f"| MRR | {base_coll['avg_mrr']:.3f} | {hier_coll['avg_mrr']:.3f} |\n")
f.write(f"| Total Latency | {base_coll['avg_total_latency']:.0f}ms | {hier_coll['avg_total_latency']:.0f}ms |\n")
f.write("\n")
# Key Findings
f.write("## Key Findings\n\n")
# Accuracy improvement
hit5_improvement = hier_metrics['avg_hit_at_5'] - base_metrics['avg_hit_at_5']
mrr_improvement = hier_metrics['avg_mrr'] - base_metrics['avg_mrr']
f.write(f"1. **Accuracy:** Hierarchical RAG achieved {hier_metrics['avg_hit_at_5']:.1f}% Hit@5 ")
f.write(f"compared to {base_metrics['avg_hit_at_5']:.1f}% for Base RAG ")
f.write(f"({hit5_improvement:+.1f}% improvement).\n\n")
f.write(f"2. **Ranking Quality:** Mean Reciprocal Rank improved from {base_metrics['avg_mrr']:.3f} ")
f.write(f"to {hier_metrics['avg_mrr']:.3f} ({mrr_improvement:+.3f}).\n\n")
# Latency analysis
latency_change = hier_metrics['avg_total_latency'] - base_metrics['avg_total_latency']
latency_pct = (latency_change / base_metrics['avg_total_latency'] * 100)
f.write(f"3. **Latency:** Hierarchical RAG ")
if latency_change < 0:
f.write(f"was faster by {abs(latency_change):.0f}ms ({abs(latency_pct):.1f}% reduction)")
else:
f.write(f"added {latency_change:.0f}ms ({latency_pct:.1f}% increase)")
f.write(f" compared to Base RAG.\n\n")
# Best performing collection
best_collection = max(collections,
key=lambda c: collection_metrics[c]["hierarchical"]["avg_hit_at_5"])
f.write(f"4. **Best Performance:** The '{best_collection}' collection showed ")
f.write(f"strongest results with {collection_metrics[best_collection]['hierarchical']['avg_hit_at_5']:.1f}% Hit@5.\n\n")
# Recommendations
f.write("## Recommendations\n\n")
if hier_metrics['avg_hit_at_5'] > base_metrics['avg_hit_at_5'] + 5:
f.write("- βœ… **Use Hierarchical RAG** for production deployments where metadata filtering is available.\n")
else:
f.write("- ⚠️ **Limited benefit** from hierarchical filtering detected. Consider reviewing metadata quality.\n")
if hier_metrics['avg_total_latency'] < base_metrics['avg_total_latency'] * 1.2:
f.write("- βœ… Latency impact is acceptable for the accuracy gains.\n")
else:
f.write("- ⚠️ Consider optimizing index structure to reduce latency overhead.\n")
f.write("\n---\n\n")
f.write("## Detailed Query Results\n\n")
# Sample queries with comparison
for i, (base_r, hier_r) in enumerate(zip(base_results[:20], hier_results[:20]), 1):
f.write(f"### Query {i}: {base_r.query}\n\n")
f.write(f"### Base Response {i}:\n{base_r.generated_answer}\n\n")
f.write(f"### Hier Response {i}:\n{hier_r.generated_answer}\n\n")
f.write(f"**Collection:** {base_r.collection}\n\n")
f.write("| Aspect | Base RAG | Hierarchical RAG |\n")
f.write("|--------|----------|------------------|\n")
f.write(f"| Hit@5 | {'βœ“' if base_r.hit_at_5 else 'βœ—'} | {'βœ“' if hier_r.hit_at_5 else 'βœ—'} |\n")
f.write(f"| MRR | {base_r.mrr:.3f} | {hier_r.mrr:.3f} |\n")
f.write(f"| Retrieved Docs | {base_r.retrieved_docs} | {hier_r.retrieved_docs} |\n")
f.write(f"| Total Latency | {base_r.total_latency_ms:.0f}ms | {hier_r.total_latency_ms:.0f}ms |\n")
f.write(f"| Semantic Sim | {base_r.avg_semantic_similarity:.3f} | {hier_r.avg_semantic_similarity:.3f} |\n")
f.write("\n")
print(f"βœ“ Saved summary report: {md_path}")
return md_path
def setup_test_data(collections: List[str] = None):
"""Ingest synthetic test documents into vector stores"""
print("\n" + "="*70)
print("Setting up test data for evaluation")
print("="*70 + "\n")
tot_docs = 0
for collection_name in collections:
if collection_name not in SYNTHETIC_DOCUMENTS:
print(f"⚠️ No synthetic data available for '{collection_name}', skipping...")
continue
docs = SYNTHETIC_DOCUMENTS[collection_name]
print(f"\nπŸ“š Ingesting {len(docs)} documents into '{collection_name}' collection...")
documents = []
for i, doc_data in enumerate(docs, 1):
metadata = doc_data["metadata"]
doc = Document(page_content=doc_data["content"], metadata=metadata)
metadata = MetaData(**metadata)
chunks = get_chunks([doc], metadata)
documents.extend(chunks)
vectorstore = get_vectorstore("eval_"+collection_name, drop_old=True)
ingest_documents(documents, vectorstore)
tot_docs += len(docs)
print(f"βœ“ Completed '{collection_name}' collection")
print("\n" + "="*70)
print("Test data setup complete!")
print("="*70 + "\n")
return tot_docs