"""Phase 20 patch script: add the GraphRAG batch evaluation report.

Run this from the project root (the directory that contains ``app/``).
The script performs three steps, in order:

1. Strip UTF-8 byte-order marks from every ``*.py`` file under ``app/``.
2. Write ``app/evaluation/graphrag_batch_evaluator.py``.
3. Patch ``app/main.py``: import the batch evaluator, rename the older phase
   labels to the Phase 20 label, and append the batch evaluation endpoint.

Step 3 is idempotent: running the script twice does not duplicate the import
or the endpoint.
"""

import ast
import codecs
from pathlib import Path
from typing import List, Optional

APP_DIR = Path("app")
EVALUATOR_PATH = APP_DIR / "evaluation" / "graphrag_batch_evaluator.py"
MAIN_PATH = APP_DIR / "main.py"

# =====================================================
# 1. Batch evaluator module source
# =====================================================

EVALUATOR_SOURCE = r'''
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone

from app.evaluation.graph_fusion_evaluator import compare_graph_fusion_retrieval


DEFAULT_GRAPHRAG_TEST_QUERIES = [
    "What is RAG?",
    "Why does RAG exist?",
    "What are the main components of a RAG system?",
    "What is vectorless RAG?",
    "Why can vector search fail?",
    "How does BM25 help in retrieval?",
    "How does RAG reduce hallucination?",
    "What is the role of citations in RAG?"
]


def parse_custom_queries(custom_queries: Optional[str]) -> List[str]:
    if not custom_queries:
        return []

    # User can pass queries separated by ||
    # Example: What is RAG?||Why does RAG exist?
    queries = [
        item.strip()
        for item in custom_queries.split("||")
        if item.strip()
    ]

    return queries


def safe_number(value, default=0.0) -> float:
    try:
        return float(value)
    except Exception:
        return default


def summarize_batch_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    total = len(results)

    if total == 0:
        return {
            "total_questions": 0,
            "fusion_improved_count": 0,
            "fusion_same_count": 0,
            "fusion_worse_count": 0,
            "average_normal_quality": 0.0,
            "average_graph_quality": 0.0,
            "average_fused_quality": 0.0,
            "average_fusion_delta": 0.0,
            "total_graph_added_chunks": 0,
            "total_graph_supported_chunks": 0,
            "final_verdict": "no_questions_evaluated"
        }

    normal_scores = []
    graph_scores = []
    fused_scores = []
    deltas = []

    fusion_improved_count = 0
    fusion_same_count = 0
    fusion_worse_count = 0

    total_graph_added_chunks = 0
    total_graph_supported_chunks = 0

    for result in results:
        comparison = result.get("comparison", {})
        fusion_stats = result.get("fusion_stats", {})

        normal_score = safe_number(comparison.get("normal_average_quality"))
        graph_score = safe_number(comparison.get("graph_average_quality"))
        fused_score = safe_number(comparison.get("fused_average_quality"))
        delta = safe_number(comparison.get("fusion_quality_delta"))

        normal_scores.append(normal_score)
        graph_scores.append(graph_score)
        fused_scores.append(fused_score)
        deltas.append(delta)

        if delta > 0:
            fusion_improved_count += 1
        elif delta == 0:
            fusion_same_count += 1
        else:
            fusion_worse_count += 1

        total_graph_added_chunks += int(fusion_stats.get("graph_added_count") or 0)
        total_graph_supported_chunks += int(fusion_stats.get("graph_supported_count") or 0)

    average_normal = round(sum(normal_scores) / total, 4)
    average_graph = round(sum(graph_scores) / total, 4)
    average_fused = round(sum(fused_scores) / total, 4)
    average_delta = round(sum(deltas) / total, 4)

    if fusion_improved_count > fusion_worse_count and average_delta > 0:
        final_verdict = "graph_fusion_helped_overall"
    elif fusion_worse_count > fusion_improved_count and average_delta < 0:
        final_verdict = "graph_fusion_added_noise_overall"
    else:
        final_verdict = "graph_fusion_mixed_or_neutral"

    return {
        "total_questions": total,
        "fusion_improved_count": fusion_improved_count,
        "fusion_same_count": fusion_same_count,
        "fusion_worse_count": fusion_worse_count,
        "average_normal_quality": average_normal,
        "average_graph_quality": average_graph,
        "average_fused_quality": average_fused,
        "average_fusion_delta": average_delta,
        "total_graph_added_chunks": total_graph_added_chunks,
        "total_graph_supported_chunks": total_graph_supported_chunks,
        "final_verdict": final_verdict
    }


def build_compact_question_result(
    query: str,
    full_result: Dict[str, Any]
) -> Dict[str, Any]:
    comparison = full_result.get("comparison", {})
    fusion_stats = full_result.get("fusion_stats", {})

    normal_results = (
        full_result
        .get("normal_retrieval", {})
        .get("results", [])
    )

    fused_results = (
        full_result
        .get("fused_retrieval", {})
        .get("results", [])
    )

    return {
        "query": query,
        "comparison": comparison,
        "fusion_stats": fusion_stats,
        "top_normal_chunks": [
            {
                "rank": item.get("rank"),
                "chunk_id": item.get("chunk_id"),
                "page_number": item.get("page_number"),
                "quality_score": item.get("quality_score"),
                "penalties": item.get("penalties"),
                "preview": item.get("content_preview")
            }
            for item in normal_results[:3]
        ],
        "top_fused_chunks": [
            {
                "rank": item.get("rank"),
                "chunk_id": item.get("chunk_id"),
                "page_number": item.get("page_number"),
                "retrieval_source": item.get("retrieval_source"),
                "graph_supported": item.get("graph_supported"),
                "quality_score": item.get("quality_score"),
                "penalties": item.get("penalties"),
                "preview": item.get("content_preview")
            }
            for item in fused_results[:3]
        ]
    }


def run_graphrag_batch_evaluation(
    document_id: str,
    custom_queries: Optional[str] = None,
    top_k: int = 5,
    retrieval_mode: str = "hybrid",
    use_reranker: bool = True,
    graph_entity_limit: int = 8,
    graph_retrieval_top_k: int = 5,
    compact: bool = True
) -> Dict[str, Any]:
    custom_query_list = parse_custom_queries(custom_queries)

    # The defaults are used whenever parsing yields nothing (for example "||"
    # or blank input), so report custom usage from the parsed list rather
    # than from the raw string.
    custom_queries_used = bool(custom_query_list)
    queries = custom_query_list or DEFAULT_GRAPHRAG_TEST_QUERIES

    detailed_results = []
    compact_results = []
    failed_questions = []

    for query in queries:
        try:
            result = compare_graph_fusion_retrieval(
                document_id=document_id,
                query=query,
                top_k=top_k,
                retrieval_mode=retrieval_mode,
                use_reranker=use_reranker,
                graph_entity_limit=graph_entity_limit,
                graph_retrieval_top_k=graph_retrieval_top_k
            )

            detailed_results.append(result)
            compact_results.append(
                build_compact_question_result(
                    query=query,
                    full_result=result
                )
            )

        except Exception as error:
            failed_questions.append(
                {
                    "query": query,
                    "error": str(error)
                }
            )

    summary = summarize_batch_results(detailed_results)

    response = {
        "status": "success",
        "document_id": document_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "evaluation_type": "graphrag_batch_fusion_evaluation",
        "settings": {
            "top_k": top_k,
            "retrieval_mode": retrieval_mode,
            "use_reranker": use_reranker,
            "graph_entity_limit": graph_entity_limit,
            "graph_retrieval_top_k": graph_retrieval_top_k,
            "custom_queries_used": custom_queries_used
        },
        "summary": summary,
        "failed_questions": failed_questions,
        "questions": compact_results if compact else detailed_results,
        "notes": [
            "This is a heuristic debug report, not a final academic benchmark.",
            "The report helps inspect whether graph fusion improves retrieval quality across multiple questions.",
            "For formal metrics, create a labeled benchmark with ground-truth relevant chunks."
        ]
    }

    return response
'''

# =====================================================
# 2. main.py patch data
# =====================================================

EVALUATOR_IMPORT = (
    "from app.evaluation.graphrag_batch_evaluator "
    "import run_graphrag_batch_evaluation"
)

# Names the appended endpoint references, with the statement that binds each.
ENDPOINT_REQUIRED_IMPORTS = (
    ("Optional", "from typing import Optional"),
    ("Query", "from fastapi import Query"),
)

OLD_PHASE_LABELS = (
    "Phase 19 - GraphRAG Retrieval Fusion Evaluation",
    "Phase 18 - Graph Quality Cleanup",
    "Phase 17 - Graph Vector Retrieval Fusion",
)
NEW_PHASE_LABEL = "Phase 20 - GraphRAG Batch Evaluation Report"

# Presence of this marker means the endpoint was already appended.
ENDPOINT_MARKER = "# GraphRAG batch evaluation endpoint"

ENDPOINT_SOURCE = '''

# GraphRAG batch evaluation endpoint
@app.get("/documents/{document_id}/evaluation/graph-fusion/batch")
def evaluate_graph_fusion_batch_for_document(
    document_id: str,
    custom_queries: Optional[str] = None,
    top_k: int = Query(5, ge=1, le=20),
    retrieval_mode: str = Query("hybrid"),
    use_reranker: bool = True,
    graph_entity_limit: int = Query(8, ge=1, le=30),
    graph_retrieval_top_k: int = Query(5, ge=1, le=20),
    compact: bool = True
):
    return run_graphrag_batch_evaluation(
        document_id=document_id,
        custom_queries=custom_queries,
        top_k=top_k,
        retrieval_mode=retrieval_mode,
        use_reranker=use_reranker,
        graph_entity_limit=graph_entity_limit,
        graph_retrieval_top_k=graph_retrieval_top_k,
        compact=compact
    )
'''


def strip_bom_from_tree(root: Path) -> int:
    """Remove UTF-8 byte-order marks from every ``*.py`` file under *root*.

    Files are handled as bytes and rewritten only when they contain a BOM,
    so files without one keep their exact content (including line endings)
    and their modification time.

    Args:
        root: Directory to search recursively.

    Returns:
        The number of files that were rewritten.
    """
    cleaned_count = 0
    for path in Path(root).rglob("*.py"):
        if not path.is_file():
            continue
        raw = path.read_bytes()
        if codecs.BOM_UTF8 not in raw:
            continue
        path.write_bytes(raw.replace(codecs.BOM_UTF8, b""))
        cleaned_count += 1
    return cleaned_count


def _parse_module(source: str) -> Optional[ast.Module]:
    """Return the parsed module for *source*, or None if it cannot be parsed."""
    try:
        return ast.parse(source)
    except (SyntaxError, ValueError):
        return None


def _missing_import_statements(text: str, tree: Optional[ast.Module]) -> List[str]:
    """List the import statements that still have to be added to *text*."""
    statements = []
    if EVALUATOR_IMPORT not in text:
        statements.append(EVALUATOR_IMPORT)

    if tree is None:
        # Source could not be analysed: fall back to an exact textual check so
        # repeated runs still do not pile up duplicate lines.
        statements.extend(
            statement
            for _, statement in ENDPOINT_REQUIRED_IMPORTS
            if statement not in text
        )
        return statements

    bound_names = {
        alias.asname or alias.name.split(".")[0]
        for node in tree.body
        if isinstance(node, (ast.Import, ast.ImportFrom))
        for alias in node.names
    }
    # A name bound in a way this scan does not see (star import, conditional
    # import) only results in a redundant, harmless import line.
    statements.extend(
        statement
        for name, statement in ENDPOINT_REQUIRED_IMPORTS
        if name not in bound_names
    )
    return statements


def _preamble_line_count(tree: Optional[ast.Module]) -> int:
    """Count leading lines taken by the module docstring and __future__ imports.

    New imports must go after these: a ``from __future__`` import is only
    valid at the start of a module, and a docstring only counts as one when
    it is the first statement.
    """
    if tree is None:
        return 0

    last_line = 0
    for index, node in enumerate(tree.body):
        is_docstring = (
            index == 0
            and isinstance(node, ast.Expr)
            and isinstance(node.value, ast.Constant)
            and isinstance(node.value.value, str)
        )
        is_future_import = (
            isinstance(node, ast.ImportFrom) and node.module == "__future__"
        )
        if not (is_docstring or is_future_import):
            break
        last_line = node.end_lineno
    return last_line


def patch_main_source(source: str) -> str:
    """Return the text of ``main.py`` with the Phase 20 changes applied.

    The function adds the evaluator import plus any missing import for the
    names used by the endpoint, replaces the old phase labels with the
    Phase 20 label, and appends the batch evaluation endpoint unless its
    marker comment is already present. Applying it to its own output returns
    that output unchanged.

    Args:
        source: Current contents of ``main.py``.

    Returns:
        The patched contents.
    """
    text = source.replace("\ufeff", "")

    tree = _parse_module(text)
    new_imports = _missing_import_statements(text, tree)
    if new_imports:
        lines = text.split("\n")
        insert_at = _preamble_line_count(tree)
        text = "\n".join(lines[:insert_at] + new_imports + lines[insert_at:])

    for old_label in OLD_PHASE_LABELS:
        text = text.replace(old_label, NEW_PHASE_LABEL)

    if ENDPOINT_MARKER not in text:
        text += ENDPOINT_SOURCE

    return text


def main() -> None:
    """Run the three patch steps against the ``app/`` tree in the current directory."""
    strip_bom_from_tree(APP_DIR)
    print("BOM cleanup completed.")

    EVALUATOR_PATH.write_text(EVALUATOR_SOURCE, encoding="utf-8")

    main_text = MAIN_PATH.read_text(encoding="utf-8-sig")
    MAIN_PATH.write_text(patch_main_source(main_text), encoding="utf-8")

    print("Phase 20 GraphRAG batch evaluation report added.")


if __name__ == "__main__":
    main()