"""
Retrieval Evaluation Script for Base RAG vs Hierarchical RAG

Generates synthetic test data, evaluates retrieval performance, and produces reports.
"""

import json
import csv
import time

from tqdm import tqdm
from random import shuffle
from pathlib import Path
from typing import List, Dict
from datetime import datetime
from dataclasses import dataclass, asdict

import numpy as np
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from dotenv import load_dotenv, find_dotenv

from .index import MetaData, get_vectorstore
from .retrieval import retrieval, generate
from .ingest import ingest_documents, get_chunks
from .synthetic_data import SYNTHETIC_DOCUMENTS, EVAL_QUERIES, EvalQuery

load_dotenv(find_dotenv())
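
# Embedding model used to score semantic similarity between queries and retrieved chunks.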
emb_model = OpenAIEmbeddings(model="text-embedding-3-small", dimensions=1536)


@dataclass
class EvalResult:
    """Evaluation result for a single query."""
    query_id: str
    collection: str
    query: str
    rag_type: str

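    # Retrieval quality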
    retrieved_docs: int
    hit_at_1: bool
    hit_at_3: bool
    hit_at_5: bool
    mrr: float
    avg_similarity_score: float

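    # Latency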
    retrieval_latency_ms: float
    generation_latency_ms: float
    total_latency_ms: float

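    # Average query-to-chunk semantic similarity (cosine over embeddings)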
    avg_semantic_similarity: float

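    # Generated answer text, kept for qualitative inspection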
    generated_answer: str

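    # Run metadata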
    filters_used: Dict
    timestamp: str


def calculate_semantic_similarity(query: str, documents: List[Document]) -> float:
    """Calculate average semantic similarity between query and retrieved documents."""
    if not documents:
        return 0.0

    query_embedding = emb_model.embed_query(query)
    doc_embeddings = emb_model.embed_documents([doc.page_content for doc in documents])

    similarities = []
    for doc_emb in doc_embeddings:
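        # Cosine similarity: dot(query, doc) / (||query|| * ||doc||)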
        similarity = np.dot(query_embedding, doc_emb) / (
            np.linalg.norm(query_embedding) * np.linalg.norm(doc_emb)
        )
        similarities.append(similarity)

    return float(np.mean(similarities))


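# MRR here is the reciprocal rank of the first retrieved chunk that contains any
# ground-truth snippet (case-insensitive substring match); 0.0 if nothing relevant is retrieved.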
def calculate_mrr(ground_truth: List[str], retrieved_docs: List[Document]) -> float:
    """Calculate Mean Reciprocal Rank."""
    for rank, doc in enumerate(retrieved_docs, start=1):
        for truth in ground_truth:
            if truth.lower() in doc.page_content.lower():
                return 1.0 / rank
    return 0.0


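# Hit@k: True when any ground-truth snippet appears in the top-k retrieved chunks.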
def calculate_hit_at_k(ground_truth: List[str], retrieved_docs: List[Document], k: int) -> bool:
    """Check if any ground truth appears in top-k results."""
    for doc in retrieved_docs[:k]:
        for truth in ground_truth:
            if truth.lower() in doc.page_content.lower():
                return True
    return False


def evaluate_single_query(
    eval_query: EvalQuery,
    rag_type: str = "base"
) -> EvalResult:
    """Evaluate a single query with either base or hierarchical RAG."""

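    # Base RAG filters only on language; hierarchical RAG also filters on
    # domain, section, topic, and doc_type.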
    if rag_type == "base":
        filters = MetaData(language=eval_query.language)
        filters_dict = {"language": eval_query.language}
    else:
        filters = MetaData(
            language=eval_query.language,
            domain=eval_query.domain,
            section=eval_query.section,
            topic=eval_query.topic,
            doc_type=eval_query.doc_type
        )
        filters_dict = {
            "language": eval_query.language,
            "domain": eval_query.domain,
            "section": eval_query.section,
            "topic": eval_query.topic,
            "doc_type": eval_query.doc_type
        }

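    # Time the retrieval step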
    ret_start = time.time()
    vectorstore = get_vectorstore("eval_" + eval_query.collection)
    docs = retrieval(eval_query.query, filters, vectorstore)
    ret_end = time.time()
    ret_latency = (ret_end - ret_start) * 1000

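    # Time the answer-generation step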
    gen_start = time.time()
    answer = generate(eval_query.query, docs) if docs else "No relevant documents found."
    gen_end = time.time()
    gen_latency = (gen_end - gen_start) * 1000

    total_latency = ret_latency + gen_latency

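    # Retrieval-quality metrics against the ground-truth chunks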
    hit_1 = calculate_hit_at_k(eval_query.ground_truth_chunks, docs, 1)
    hit_3 = calculate_hit_at_k(eval_query.ground_truth_chunks, docs, 3)
    hit_5 = calculate_hit_at_k(eval_query.ground_truth_chunks, docs, 5)
    mrr = calculate_mrr(eval_query.ground_truth_chunks, docs)

    avg_sim_score = np.mean([doc.metadata.get('similarity_score', 0) for doc in docs]) if docs else 0.0
    semantic_sim = calculate_semantic_similarity(eval_query.query, docs)

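    # Note: hash() of a str is salted per interpreter run, so these ids are only stable within one run.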
    query_id = f"{eval_query.collection}_{rag_type}_{hash(eval_query.query) % 10000}"

    return EvalResult(
        query_id=query_id,
        collection=eval_query.collection,
        query=eval_query.query,
        rag_type=rag_type,
        retrieved_docs=len(docs),
        hit_at_1=hit_1,
        hit_at_3=hit_3,
        hit_at_5=hit_5,
        mrr=mrr,
        avg_similarity_score=float(avg_sim_score),
        retrieval_latency_ms=ret_latency,
        generation_latency_ms=gen_latency,
        total_latency_ms=total_latency,
        avg_semantic_similarity=semantic_sim,
        generated_answer=answer,
        filters_used=filters_dict,
        timestamp=datetime.now().isoformat()
    )


def run_full_evaluation(
    collections: List[str] = None,
    output_dir: str = "reports"
) -> Dict[str, List[EvalResult]]:
    """Run complete evaluation on all queries."""

    if collections is None:
        collections = ["hospital", "bank", "fluid_simulation"]

    Path(output_dir).mkdir(exist_ok=True)

    all_results = {
        "base": [],
        "hierarchical": []
    }

    queries_to_eval = [q for q in EVAL_QUERIES if q.collection in collections]
    shuffle(queries_to_eval)
    print(f"\n{'='*70}")
    print(f"Starting Evaluation: {len(queries_to_eval)} queries across {len(collections)} collections")
    print(f"{'='*70}\n")

    for eval_query in tqdm(queries_to_eval, desc="Running evaluation queries"):
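        # Evaluate the same query with both pipelines so the comparison is paired.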
        base_result = evaluate_single_query(eval_query, "base")
        all_results["base"].append(base_result)

        hier_result = evaluate_single_query(eval_query, "hierarchical")
        all_results["hierarchical"].append(hier_result)

    return all_results


def save_results(results: Dict[str, List[EvalResult]], output_dir: str = "reports"):
    """Save evaluation results to CSV and JSON."""

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    Path(output_dir).mkdir(exist_ok=True)

    all_results = results["base"] + results["hierarchical"]

    csv_path = Path(output_dir) / f"eval_results_{timestamp}.csv"
    with open(csv_path, 'w', newline='') as f:
        if all_results:
            fieldnames = list(asdict(all_results[0]).keys())
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for result in all_results:
                row = asdict(result)
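                # Serialize the nested filters dict so it fits in a single CSV column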
                row['filters_used'] = json.dumps(row['filters_used'])
                writer.writerow(row)

    print(f"✅ Saved CSV report: {csv_path}")

    json_path = Path(output_dir) / f"eval_results_{timestamp}.json"
    json_data = {
        "metadata": {
            "timestamp": timestamp,
            "total_queries": len(all_results),
            "collections_tested": list(set(r.collection for r in all_results))
        },
        "results": {
            "base": [asdict(r) for r in results["base"]],
            "hierarchical": [asdict(r) for r in results["hierarchical"]]
        }
    }
    with open(json_path, 'w') as f:
        json.dump(json_data, f, indent=2)

    print(f"✅ Saved JSON report: {json_path}")

    return csv_path, json_path


def generate_summary_report(results: Dict[str, List[EvalResult]], output_dir: str = "reports"):
    """Generate markdown summary report with comparative analysis."""

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    md_path = Path(output_dir) / f"eval_summary_{timestamp}.md"

    base_results = results["base"]
    hier_results = results["hierarchical"]

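    # Aggregate per-query results into averages (hit rates expressed as percentages).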
    def calc_metrics(result_list):
        return {
            "total_queries": len(result_list),
            "avg_hit_at_1": np.mean([r.hit_at_1 for r in result_list]) * 100,
            "avg_hit_at_3": np.mean([r.hit_at_3 for r in result_list]) * 100,
            "avg_hit_at_5": np.mean([r.hit_at_5 for r in result_list]) * 100,
            "avg_mrr": np.mean([r.mrr for r in result_list]),
            "avg_similarity": np.mean([r.avg_similarity_score for r in result_list]),
            "avg_semantic_sim": np.mean([r.avg_semantic_similarity for r in result_list]),
            "avg_retrieval_latency": np.mean([r.retrieval_latency_ms for r in result_list]),
            "avg_generation_latency": np.mean([r.generation_latency_ms for r in result_list]),
            "avg_total_latency": np.mean([r.total_latency_ms for r in result_list]),
        }

    base_metrics = calc_metrics(base_results)
    hier_metrics = calc_metrics(hier_results)

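    # Per-collection breakdown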
    collections = list(set(r.collection for r in base_results))
    collection_metrics = {}

    for collection in collections:
        collection_metrics[collection] = {
            "base": calc_metrics([r for r in base_results if r.collection == collection]),
            "hierarchical": calc_metrics([r for r in hier_results if r.collection == collection])
        }

    with open(md_path, 'w') as f:
        f.write("# RAG Retrieval Evaluation Report\n\n")
        f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write("---\n\n")

        f.write("## Executive Summary\n\n")
        f.write("This report compares **Base RAG** (language-only filtering) against ")
        f.write("**Hierarchical RAG** (domain/section/topic/doc_type filtering) across ")
        f.write(f"{base_metrics['total_queries']} evaluation queries.\n\n")

        f.write("## Overall Performance Comparison\n\n")
        f.write("| Metric | Base RAG | Hierarchical RAG | Δ (Improvement) |\n")
        f.write("|--------|----------|------------------|------------------|\n")

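        # (label, key into the metrics dicts, unit suffix, True if higher is better)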
        metrics_to_show = [
            ("Hit@1", "avg_hit_at_1", "%", True),
            ("Hit@3", "avg_hit_at_3", "%", True),
            ("Hit@5", "avg_hit_at_5", "%", True),
            ("MRR", "avg_mrr", "", True),
            ("Avg Similarity Score", "avg_similarity", "", True),
            ("Semantic Similarity", "avg_semantic_sim", "", True),
            ("Retrieval Latency", "avg_retrieval_latency", "ms", False),
            ("Generation Latency", "avg_generation_latency", "ms", False),
            ("Total Latency", "avg_total_latency", "ms", False),
        ]

        for label, key, unit, higher_better in metrics_to_show:
            base_val = base_metrics[key]
            hier_val = hier_metrics[key]

            if higher_better:
                delta = hier_val - base_val
                delta_pct = (delta / base_val * 100) if base_val > 0 else 0
                delta_str = f"+{delta:.2f}{unit} ({delta_pct:+.1f}%)" if delta >= 0 else f"{delta:.2f}{unit} ({delta_pct:.1f}%)"
            else:
                delta = base_val - hier_val
                delta_pct = (delta / base_val * 100) if base_val > 0 else 0
                delta_str = f"-{delta:.2f}{unit} ({delta_pct:.1f}% faster)" if delta >= 0 else f"+{abs(delta):.2f}{unit} ({abs(delta_pct):.1f}% slower)"

            f.write(f"| {label} | {base_val:.2f}{unit} | {hier_val:.2f}{unit} | {delta_str} |\n")

        f.write("\n")

        f.write("## Per-Collection Analysis\n\n")

        for collection in sorted(collections):
            f.write(f"### {collection.replace('_', ' ').title()}\n\n")

            base_coll = collection_metrics[collection]["base"]
            hier_coll = collection_metrics[collection]["hierarchical"]

            f.write("| Metric | Base RAG | Hierarchical RAG |\n")
            f.write("|--------|----------|------------------|\n")
            f.write(f"| Hit@1 | {base_coll['avg_hit_at_1']:.1f}% | {hier_coll['avg_hit_at_1']:.1f}% |\n")
            f.write(f"| Hit@5 | {base_coll['avg_hit_at_5']:.1f}% | {hier_coll['avg_hit_at_5']:.1f}% |\n")
            f.write(f"| MRR | {base_coll['avg_mrr']:.3f} | {hier_coll['avg_mrr']:.3f} |\n")
            f.write(f"| Total Latency | {base_coll['avg_total_latency']:.0f}ms | {hier_coll['avg_total_latency']:.0f}ms |\n")
            f.write("\n")

        f.write("## Key Findings\n\n")

        hit5_improvement = hier_metrics['avg_hit_at_5'] - base_metrics['avg_hit_at_5']
        mrr_improvement = hier_metrics['avg_mrr'] - base_metrics['avg_mrr']

        f.write(f"1. **Accuracy:** Hierarchical RAG achieved {hier_metrics['avg_hit_at_5']:.1f}% Hit@5 ")
        f.write(f"compared to {base_metrics['avg_hit_at_5']:.1f}% for Base RAG ")
        f.write(f"(a {hit5_improvement:+.1f} percentage-point change).\n\n")

        f.write(f"2. **Ranking Quality:** Mean Reciprocal Rank changed from {base_metrics['avg_mrr']:.3f} ")
        f.write(f"to {hier_metrics['avg_mrr']:.3f} ({mrr_improvement:+.3f}).\n\n")

        latency_change = hier_metrics['avg_total_latency'] - base_metrics['avg_total_latency']
        latency_pct = (latency_change / base_metrics['avg_total_latency'] * 100)

        f.write("3. **Latency:** Hierarchical RAG ")
        if latency_change < 0:
            f.write(f"was faster by {abs(latency_change):.0f}ms ({abs(latency_pct):.1f}% reduction)")
        else:
            f.write(f"added {latency_change:.0f}ms ({latency_pct:.1f}% increase)")
        f.write(" compared to Base RAG.\n\n")

        best_collection = max(collections,
                              key=lambda c: collection_metrics[c]["hierarchical"]["avg_hit_at_5"])
        f.write(f"4. **Best Performance:** The '{best_collection}' collection showed ")
        f.write(f"strongest results with {collection_metrics[best_collection]['hierarchical']['avg_hit_at_5']:.1f}% Hit@5.\n\n")

        f.write("## Recommendations\n\n")

        if hier_metrics['avg_hit_at_5'] > base_metrics['avg_hit_at_5'] + 5:
            f.write("- ✅ **Use Hierarchical RAG** for production deployments where metadata filtering is available.\n")
        else:
            f.write("- ⚠️ **Limited benefit** from hierarchical filtering detected. Consider reviewing metadata quality.\n")

        if hier_metrics['avg_total_latency'] < base_metrics['avg_total_latency'] * 1.2:
            f.write("- ✅ Latency impact is acceptable for the accuracy gains.\n")
        else:
            f.write("- ⚠️ Consider optimizing index structure to reduce latency overhead.\n")

        f.write("\n---\n\n")
        f.write("## Detailed Query Results\n\n")

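        # Show at most the first 20 paired query results to keep the report readable.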
        for i, (base_r, hier_r) in enumerate(zip(base_results[:20], hier_results[:20]), 1):
            f.write(f"### Query {i}: {base_r.query}\n\n")
            f.write(f"#### Base Response {i}:\n{base_r.generated_answer}\n\n")
            f.write(f"#### Hier Response {i}:\n{hier_r.generated_answer}\n\n")
            f.write(f"**Collection:** {base_r.collection}\n\n")

            f.write("| Aspect | Base RAG | Hierarchical RAG |\n")
            f.write("|--------|----------|------------------|\n")
            f.write(f"| Hit@5 | {'✅' if base_r.hit_at_5 else '❌'} | {'✅' if hier_r.hit_at_5 else '❌'} |\n")
            f.write(f"| MRR | {base_r.mrr:.3f} | {hier_r.mrr:.3f} |\n")
            f.write(f"| Retrieved Docs | {base_r.retrieved_docs} | {hier_r.retrieved_docs} |\n")
            f.write(f"| Total Latency | {base_r.total_latency_ms:.0f}ms | {hier_r.total_latency_ms:.0f}ms |\n")
            f.write(f"| Semantic Sim | {base_r.avg_semantic_similarity:.3f} | {hier_r.avg_semantic_similarity:.3f} |\n")
            f.write("\n")

    print(f"✅ Saved summary report: {md_path}")
    return md_path


def setup_test_data(collections: List[str] = None):
    """Ingest synthetic test documents into vector stores."""

    if collections is None:
        collections = list(SYNTHETIC_DOCUMENTS.keys())

    print("\n" + "="*70)
    print("Setting up test data for evaluation")
    print("="*70 + "\n")
    tot_docs = 0

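    # Chunk and ingest each collection's synthetic documents into its own "eval_<name>" store.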
    for collection_name in collections:
        if collection_name not in SYNTHETIC_DOCUMENTS:
            print(f"⚠️ No synthetic data available for '{collection_name}', skipping...")
            continue

        docs = SYNTHETIC_DOCUMENTS[collection_name]
        print(f"\nIngesting {len(docs)} documents into '{collection_name}' collection...")
        documents = []
        for doc_data in docs:
            metadata = doc_data["metadata"]
            doc = Document(page_content=doc_data["content"], metadata=metadata)
            meta = MetaData(**metadata)
            chunks = get_chunks([doc], meta)
            documents.extend(chunks)

        vectorstore = get_vectorstore("eval_" + collection_name, drop_old=True)
        ingest_documents(documents, vectorstore)
        tot_docs += len(docs)
        print(f"✅ Completed '{collection_name}' collection")

    print("\n" + "="*70)
    print("Test data setup complete!")
    print("="*70 + "\n")

    return tot_docs
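

# Minimal usage sketch (assumption: this module is executed within its package, e.g. via
# `python -m <package>.<module>`, since it relies on relative imports; the collection names
# below mirror the defaults used in run_full_evaluation).
if __name__ == "__main__":
    eval_collections = ["hospital", "bank", "fluid_simulation"]
    setup_test_data(eval_collections)
    eval_results = run_full_evaluation(collections=eval_collections)
    save_results(eval_results)
    generate_summary_report(eval_results)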