GraphResearcher / scripts /phase20_graphrag_batch_evaluation.py
yugbirla's picture
Add GraphRAG batch fusion evaluation report
db765e0
Raw
History Blame Contribute Delete
10 kB
from pathlib import Path
# Remove BOM from Python files
for path in Path("app").rglob("*.py"):
text = path.read_text(encoding="utf-8-sig")
text = text.replace("\ufeff", "")
path.write_text(text, encoding="utf-8")
print("BOM cleanup completed.")
# =====================================================
# 1. Create batch evaluator
# =====================================================
Path("app/evaluation/graphrag_batch_evaluator.py").write_text(r'''
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone
from app.evaluation.graph_fusion_evaluator import compare_graph_fusion_retrieval
DEFAULT_GRAPHRAG_TEST_QUERIES = [
"What is RAG?",
"Why does RAG exist?",
"What are the main components of a RAG system?",
"What is vectorless RAG?",
"Why can vector search fail?",
"How does BM25 help in retrieval?",
"How does RAG reduce hallucination?",
"What is the role of citations in RAG?"
]
def parse_custom_queries(custom_queries: Optional[str]) -> List[str]:
if not custom_queries:
return []
# User can pass queries separated by ||
# Example: What is RAG?||Why does RAG exist?
queries = [
item.strip()
for item in custom_queries.split("||")
if item.strip()
]
return queries
def safe_number(value, default=0.0) -> float:
try:
return float(value)
except Exception:
return default
def summarize_batch_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
total = len(results)
if total == 0:
return {
"total_questions": 0,
"fusion_improved_count": 0,
"fusion_same_count": 0,
"fusion_worse_count": 0,
"average_normal_quality": 0.0,
"average_graph_quality": 0.0,
"average_fused_quality": 0.0,
"average_fusion_delta": 0.0,
"total_graph_added_chunks": 0,
"total_graph_supported_chunks": 0,
"final_verdict": "no_questions_evaluated"
}
normal_scores = []
graph_scores = []
fused_scores = []
deltas = []
fusion_improved_count = 0
fusion_same_count = 0
fusion_worse_count = 0
total_graph_added_chunks = 0
total_graph_supported_chunks = 0
for result in results:
comparison = result.get("comparison", {})
fusion_stats = result.get("fusion_stats", {})
normal_score = safe_number(comparison.get("normal_average_quality"))
graph_score = safe_number(comparison.get("graph_average_quality"))
fused_score = safe_number(comparison.get("fused_average_quality"))
delta = safe_number(comparison.get("fusion_quality_delta"))
normal_scores.append(normal_score)
graph_scores.append(graph_score)
fused_scores.append(fused_score)
deltas.append(delta)
if delta > 0:
fusion_improved_count += 1
elif delta == 0:
fusion_same_count += 1
else:
fusion_worse_count += 1
total_graph_added_chunks += int(fusion_stats.get("graph_added_count") or 0)
total_graph_supported_chunks += int(fusion_stats.get("graph_supported_count") or 0)
average_normal = round(sum(normal_scores) / total, 4)
average_graph = round(sum(graph_scores) / total, 4)
average_fused = round(sum(fused_scores) / total, 4)
average_delta = round(sum(deltas) / total, 4)
if fusion_improved_count > fusion_worse_count and average_delta > 0:
final_verdict = "graph_fusion_helped_overall"
elif fusion_worse_count > fusion_improved_count and average_delta < 0:
final_verdict = "graph_fusion_added_noise_overall"
else:
final_verdict = "graph_fusion_mixed_or_neutral"
return {
"total_questions": total,
"fusion_improved_count": fusion_improved_count,
"fusion_same_count": fusion_same_count,
"fusion_worse_count": fusion_worse_count,
"average_normal_quality": average_normal,
"average_graph_quality": average_graph,
"average_fused_quality": average_fused,
"average_fusion_delta": average_delta,
"total_graph_added_chunks": total_graph_added_chunks,
"total_graph_supported_chunks": total_graph_supported_chunks,
"final_verdict": final_verdict
}
def build_compact_question_result(
query: str,
full_result: Dict[str, Any]
) -> Dict[str, Any]:
comparison = full_result.get("comparison", {})
fusion_stats = full_result.get("fusion_stats", {})
normal_results = (
full_result
.get("normal_retrieval", {})
.get("results", [])
)
fused_results = (
full_result
.get("fused_retrieval", {})
.get("results", [])
)
return {
"query": query,
"comparison": comparison,
"fusion_stats": fusion_stats,
"top_normal_chunks": [
{
"rank": item.get("rank"),
"chunk_id": item.get("chunk_id"),
"page_number": item.get("page_number"),
"quality_score": item.get("quality_score"),
"penalties": item.get("penalties"),
"preview": item.get("content_preview")
}
for item in normal_results[:3]
],
"top_fused_chunks": [
{
"rank": item.get("rank"),
"chunk_id": item.get("chunk_id"),
"page_number": item.get("page_number"),
"retrieval_source": item.get("retrieval_source"),
"graph_supported": item.get("graph_supported"),
"quality_score": item.get("quality_score"),
"penalties": item.get("penalties"),
"preview": item.get("content_preview")
}
for item in fused_results[:3]
]
}
def run_graphrag_batch_evaluation(
document_id: str,
custom_queries: Optional[str] = None,
top_k: int = 5,
retrieval_mode: str = "hybrid",
use_reranker: bool = True,
graph_entity_limit: int = 8,
graph_retrieval_top_k: int = 5,
compact: bool = True
) -> Dict[str, Any]:
queries = parse_custom_queries(custom_queries)
if not queries:
queries = DEFAULT_GRAPHRAG_TEST_QUERIES
detailed_results = []
compact_results = []
failed_questions = []
for query in queries:
try:
result = compare_graph_fusion_retrieval(
document_id=document_id,
query=query,
top_k=top_k,
retrieval_mode=retrieval_mode,
use_reranker=use_reranker,
graph_entity_limit=graph_entity_limit,
graph_retrieval_top_k=graph_retrieval_top_k
)
detailed_results.append(result)
compact_results.append(
build_compact_question_result(
query=query,
full_result=result
)
)
except Exception as error:
failed_questions.append(
{
"query": query,
"error": str(error)
}
)
summary = summarize_batch_results(detailed_results)
response = {
"status": "success",
"document_id": document_id,
"created_at": datetime.now(timezone.utc).isoformat(),
"evaluation_type": "graphrag_batch_fusion_evaluation",
"settings": {
"top_k": top_k,
"retrieval_mode": retrieval_mode,
"use_reranker": use_reranker,
"graph_entity_limit": graph_entity_limit,
"graph_retrieval_top_k": graph_retrieval_top_k,
"custom_queries_used": bool(custom_queries)
},
"summary": summary,
"failed_questions": failed_questions,
"questions": compact_results if compact else detailed_results,
"notes": [
"This is a heuristic debug report, not a final academic benchmark.",
"The report helps inspect whether graph fusion improves retrieval quality across multiple questions.",
"For formal metrics, create a labeled benchmark with ground-truth relevant chunks."
]
}
return response
''', encoding="utf-8")
# =====================================================
# 2. Patch main.py
# =====================================================
main_path = Path("app/main.py")
text = main_path.read_text(encoding="utf-8-sig")
text = text.replace("\ufeff", "")
if "from app.evaluation.graphrag_batch_evaluator import run_graphrag_batch_evaluation" not in text:
text = "from app.evaluation.graphrag_batch_evaluator import run_graphrag_batch_evaluation\n" + text
old_phases = [
"Phase 19 - GraphRAG Retrieval Fusion Evaluation",
"Phase 18 - Graph Quality Cleanup",
"Phase 17 - Graph Vector Retrieval Fusion"
]
for old in old_phases:
text = text.replace(old, "Phase 20 - GraphRAG Batch Evaluation Report")
if "# GraphRAG batch evaluation endpoint" not in text:
text += '''
# GraphRAG batch evaluation endpoint
@app.get("/documents/{document_id}/evaluation/graph-fusion/batch")
def evaluate_graph_fusion_batch_for_document(
document_id: str,
custom_queries: Optional[str] = None,
top_k: int = Query(5, ge=1, le=20),
retrieval_mode: str = Query("hybrid"),
use_reranker: bool = True,
graph_entity_limit: int = Query(8, ge=1, le=30),
graph_retrieval_top_k: int = Query(5, ge=1, le=20),
compact: bool = True
):
return run_graphrag_batch_evaluation(
document_id=document_id,
custom_queries=custom_queries,
top_k=top_k,
retrieval_mode=retrieval_mode,
use_reranker=use_reranker,
graph_entity_limit=graph_entity_limit,
graph_retrieval_top_k=graph_retrieval_top_k,
compact=compact
)
'''
main_path.write_text(text, encoding="utf-8")
print("Phase 20 GraphRAG batch evaluation report added.")