Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| # Remove BOM from Python files | |
| for path in Path("app").rglob("*.py"): | |
| text = path.read_text(encoding="utf-8-sig") | |
| text = text.replace("\ufeff", "") | |
| path.write_text(text, encoding="utf-8") | |
| print("BOM cleanup completed.") | |
| # ===================================================== | |
| # 1. Create GraphRAG fusion evaluator | |
| # ===================================================== | |
| Path("app/evaluation/graph_fusion_evaluator.py").write_text(r''' | |
| import re | |
| from typing import Dict, Any, List, Optional | |
| from app.retrieval.hybrid_search_service import retrieve_chunks | |
| from app.retrieval.reranking_service import rerank_results | |
| from app.retrieval.citation_service import attach_source_ids | |
| from app.generation.context_cleaner import clean_retrieved_results | |
| from app.graph.graph_guided_retriever import graph_guided_retrieve | |
| from app.graph.graph_retrieval_fusion import fuse_retrieval_results_with_graph | |
| try: | |
| from app.graph.graph_quality import ( | |
| is_low_quality_chunk_text, | |
| is_meta_showcase_chunk_text, | |
| is_cover_or_marketing_chunk_text | |
| ) | |
| except Exception: | |
| def is_low_quality_chunk_text(text: str) -> bool: | |
| return False | |
| def is_meta_showcase_chunk_text(text: str) -> bool: | |
| return False | |
| def is_cover_or_marketing_chunk_text(text: str) -> bool: | |
| return False | |
| def get_value(obj, key: str, default=None): | |
| if isinstance(obj, dict): | |
| return obj.get(key, default) | |
| return getattr(obj, key, default) | |
| def tokenize(text: str) -> List[str]: | |
| return re.findall(r"[a-zA-Z0-9_]+", str(text or "").lower()) | |
| def get_content(result: Any) -> str: | |
| return ( | |
| get_value(result, "content") | |
| or get_value(result, "text") | |
| or get_value(result, "raw_content") | |
| or "" | |
| ) | |
| def get_chunk_id(result: Any) -> str: | |
| return str( | |
| get_value(result, "chunk_id") | |
| or get_value(result, "id") | |
| or "" | |
| ) | |
| def preview(text: str, max_chars: int = 350) -> str: | |
| text = str(text or "").replace("\n", " ").strip() | |
| if len(text) > max_chars: | |
| return text[:max_chars] + "..." | |
| return text | |
| def query_terms(query: str) -> List[str]: | |
| stopwords = { | |
| "what", "is", "are", "the", "a", "an", "of", "to", "and", | |
| "or", "why", "how", "does", "do", "explain", "define" | |
| } | |
| return [ | |
| term for term in tokenize(query) | |
| if term not in stopwords and len(term) > 1 | |
| ] | |
| def quality_score_for_result(query: str, result: Any) -> Dict[str, Any]: | |
| content = get_content(result) | |
| lower = content.lower() | |
| tokens = set(tokenize(content)) | |
| terms = query_terms(query) | |
| score = 0.0 | |
| reasons = [] | |
| for term in terms: | |
| if term in tokens: | |
| score += 2.0 | |
| reasons.append(f"contains query term: {term}") | |
| elif len(term) >= 4 and term in lower: | |
| score += 1.0 | |
| reasons.append(f"contains query substring: {term}") | |
| if "rag" in terms: | |
| definition_markers = [ | |
| "rag is", | |
| "rag stands for", | |
| "retrieval-augmented generation", | |
| "retrieval augmented generation", | |
| "adds a retrieval step", | |
| "before generation", | |
| "document corpus", | |
| "reduces hallucination" | |
| ] | |
| for marker in definition_markers: | |
| if marker in lower: | |
| score += 3.0 | |
| reasons.append(f"definition marker: {marker}") | |
| if get_value(result, "graph_supported", False): | |
| score += 1.5 | |
| reasons.append("supported by graph and retrieval") | |
| retrieval_source = get_value(result, "retrieval_source") | |
| if retrieval_source == "graph": | |
| score += 0.5 | |
| reasons.append("selected by graph retrieval") | |
| penalties = [] | |
| if is_low_quality_chunk_text(content): | |
| score -= 5.0 | |
| penalties.append("low quality / TOC-like chunk") | |
| if is_meta_showcase_chunk_text(content): | |
| score -= 5.0 | |
| penalties.append("meta / LinkedIn / resume-style chunk") | |
| if is_cover_or_marketing_chunk_text(content): | |
| score -= 5.0 | |
| penalties.append("cover / marketing chunk") | |
| score = round(score, 4) | |
| return { | |
| "quality_score": score, | |
| "positive_reasons": reasons, | |
| "penalties": penalties | |
| } | |
| def summarize_results(query: str, results: List[Any], label: str) -> Dict[str, Any]: | |
| rows = [] | |
| for rank, result in enumerate(results, start=1): | |
| quality = quality_score_for_result(query, result) | |
| rows.append( | |
| { | |
| "rank": rank, | |
| "chunk_id": get_chunk_id(result), | |
| "page_number": get_value(result, "page_number"), | |
| "source_file_name": get_value(result, "source_file_name"), | |
| "retrieval_source": get_value(result, "retrieval_source"), | |
| "graph_supported": get_value(result, "graph_supported", False), | |
| "score": get_value(result, "score"), | |
| "graph_score": get_value(result, "graph_score"), | |
| "quality_score": quality["quality_score"], | |
| "positive_reasons": quality["positive_reasons"], | |
| "penalties": quality["penalties"], | |
| "content_preview": preview(get_content(result)) | |
| } | |
| ) | |
| avg_quality = 0.0 | |
| if rows: | |
| avg_quality = round( | |
| sum(row["quality_score"] for row in rows) / len(rows), | |
| 4 | |
| ) | |
| noisy_count = sum(1 for row in rows if row["penalties"]) | |
| return { | |
| "label": label, | |
| "count": len(rows), | |
| "average_quality_score": avg_quality, | |
| "noisy_chunk_count": noisy_count, | |
| "results": rows | |
| } | |
| def compare_graph_fusion_retrieval( | |
| document_id: str, | |
| query: str, | |
| top_k: int = 5, | |
| retrieval_mode: str = "hybrid", | |
| use_reranker: bool = True, | |
| graph_entity_limit: int = 8, | |
| graph_retrieval_top_k: int = 5 | |
| ) -> Dict[str, Any]: | |
| retrieval_output = retrieve_chunks( | |
| query=query, | |
| document_id=document_id, | |
| top_k=top_k, | |
| retrieval_mode=retrieval_mode | |
| ) | |
| normal_results = retrieval_output.get("results", []) | |
| if use_reranker: | |
| normal_results = rerank_results( | |
| query=query, | |
| results=normal_results, | |
| top_k=top_k | |
| ) | |
| else: | |
| normal_results = normal_results[:top_k] | |
| cleaned_normal_results = clean_retrieved_results(normal_results) | |
| sourced_normal_results = attach_source_ids(cleaned_normal_results) | |
| graph_result = graph_guided_retrieve( | |
| document_id=document_id, | |
| query=query, | |
| graph_entity_limit=graph_entity_limit, | |
| top_k=graph_retrieval_top_k | |
| ) | |
| fusion_result = fuse_retrieval_results_with_graph( | |
| document_id=document_id, | |
| query=query, | |
| retrieval_results=sourced_normal_results, | |
| graph_entity_limit=graph_entity_limit, | |
| graph_top_k=graph_retrieval_top_k, | |
| final_top_k=max(top_k, graph_retrieval_top_k) | |
| ) | |
| fused_results = fusion_result.get("fused_results", []) | |
| normal_summary = summarize_results( | |
| query=query, | |
| results=sourced_normal_results, | |
| label="normal_retrieval" | |
| ) | |
| graph_summary = summarize_results( | |
| query=query, | |
| results=graph_result.get("results", []), | |
| label="graph_guided_retrieval" | |
| ) | |
| fused_summary = summarize_results( | |
| query=query, | |
| results=fused_results, | |
| label="fused_retrieval" | |
| ) | |
| improvement = round( | |
| fused_summary["average_quality_score"] | |
| - normal_summary["average_quality_score"], | |
| 4 | |
| ) | |
| if improvement > 0: | |
| verdict = "fusion_improved_retrieval_quality" | |
| elif improvement == 0: | |
| verdict = "fusion_quality_same_as_normal_retrieval" | |
| else: | |
| verdict = "fusion_may_be_adding_noise" | |
| return { | |
| "status": "success", | |
| "document_id": document_id, | |
| "query": query, | |
| "retrieval_mode": retrieval_mode, | |
| "use_reranker": use_reranker, | |
| "comparison": { | |
| "normal_average_quality": normal_summary["average_quality_score"], | |
| "graph_average_quality": graph_summary["average_quality_score"], | |
| "fused_average_quality": fused_summary["average_quality_score"], | |
| "fusion_quality_delta": improvement, | |
| "verdict": verdict | |
| }, | |
| "fusion_stats": { | |
| "fusion_used": fusion_result.get("fusion_used", False), | |
| "normal_count": fusion_result.get("normal_count"), | |
| "graph_added_count": fusion_result.get("graph_added_count"), | |
| "graph_supported_count": fusion_result.get("graph_supported_count"), | |
| "final_count": fusion_result.get("final_count"), | |
| "reason": fusion_result.get("reason") | |
| }, | |
| "normal_retrieval": normal_summary, | |
| "graph_guided_retrieval": graph_summary, | |
| "fused_retrieval": fused_summary, | |
| "notes": [ | |
| "This is a heuristic debug evaluator, not a benchmark metric.", | |
| "Use it to inspect whether graph retrieval is adding useful evidence or noisy chunks.", | |
| "For formal evaluation, use labeled questions and relevance judgments." | |
| ] | |
| } | |
| ''', encoding="utf-8") | |
| # ===================================================== | |
| # 2. Patch main.py | |
| # ===================================================== | |
| main_path = Path("app/main.py") | |
| text = main_path.read_text(encoding="utf-8-sig") | |
| text = text.replace("\ufeff", "") | |
| if "from app.evaluation.graph_fusion_evaluator import compare_graph_fusion_retrieval" not in text: | |
| text = "from app.evaluation.graph_fusion_evaluator import compare_graph_fusion_retrieval\n" + text | |
| old_phases = [ | |
| "Phase 18 - Graph Quality Cleanup", | |
| "Phase 17 - Graph Vector Retrieval Fusion", | |
| "Phase 16 - Graph-Guided Retrieval Debug Layer" | |
| ] | |
| for old in old_phases: | |
| text = text.replace(old, "Phase 19 - GraphRAG Retrieval Fusion Evaluation") | |
| if "# GraphRAG fusion evaluation endpoint" not in text: | |
| text += ''' | |
| # GraphRAG fusion evaluation endpoint | |
| @app.get("/documents/{document_id}/evaluation/graph-fusion") | |
| def evaluate_graph_fusion_for_document( | |
| document_id: str, | |
| query: str = Query(..., min_length=1), | |
| top_k: int = Query(5, ge=1, le=20), | |
| retrieval_mode: str = Query("hybrid"), | |
| use_reranker: bool = True, | |
| graph_entity_limit: int = Query(8, ge=1, le=30), | |
| graph_retrieval_top_k: int = Query(5, ge=1, le=20) | |
| ): | |
| return compare_graph_fusion_retrieval( | |
| document_id=document_id, | |
| query=query, | |
| top_k=top_k, | |
| retrieval_mode=retrieval_mode, | |
| use_reranker=use_reranker, | |
| graph_entity_limit=graph_entity_limit, | |
| graph_retrieval_top_k=graph_retrieval_top_k | |
| ) | |
| ''' | |
| main_path.write_text(text, encoding="utf-8") | |
| print("Phase 19 GraphRAG retrieval fusion evaluation added.") | |