Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| # Remove BOM from Python files | |
| for path in Path("app").rglob("*.py"): | |
| text = path.read_text(encoding="utf-8-sig") | |
| text = text.replace("\ufeff", "") | |
| path.write_text(text, encoding="utf-8") | |
| print("BOM cleanup completed.") | |
| # ===================================================== | |
| # 1. Shared graph quality filters | |
| # ===================================================== | |
| Path("app/graph/graph_quality.py").write_text(r''' | |
| import re | |
| from typing import Any | |
| BAD_ENTITY_NAMES = { | |
| "what", "why", "when", "where", "who", "how", | |
| "is", "are", "was", "were", "be", "been", "being", | |
| "this", "that", "these", "those", "it", "they", "them", | |
| "page", "chapter", "section", "paragraph", "figure", "table", | |
| "contents", "overview", "summary", "introduction", "conclusion", | |
| "question", "answer", "example", "note", "notes", | |
| "part", "step", "case", "item", "level", "scope" | |
| } | |
| BAD_SINGLE_WORDS = BAD_ENTITY_NAMES | { | |
| "one", "two", "three", "first", "second", "third", | |
| "good", "bad", "new", "old", "main", "basic", "advanced" | |
| } | |
| def get_value(obj: Any, key: str, default=None): | |
| if isinstance(obj, dict): | |
| return obj.get(key, default) | |
| return getattr(obj, key, default) | |
| def normalize_name(name: str) -> str: | |
| return re.sub(r"\s+", " ", str(name or "")).strip() | |
| def tokenize_name(name: str): | |
| return re.findall(r"[a-zA-Z0-9_]+", str(name or "").lower()) | |
| def is_noisy_entity_name(name: str) -> bool: | |
| name = normalize_name(name) | |
| if not name: | |
| return True | |
| name_lower = name.lower() | |
| tokens = tokenize_name(name) | |
| if name_lower in BAD_ENTITY_NAMES: | |
| return True | |
| if len(tokens) == 1 and tokens[0] in BAD_SINGLE_WORDS: | |
| return True | |
| if len(name) <= 1: | |
| return True | |
| # Very short uppercase words like IS, OR, TO are usually not entities. | |
| # Keep useful acronyms like RAG, LLM, API, OCR, SQL, NLP, BM25. | |
| useful_acronyms = {"rag", "llm", "api", "ocr", "sql", "nlp", "bm25", "gpt", "pdf", "mvp"} | |
| if name.isupper() and len(name) <= 3 and name_lower not in useful_acronyms: | |
| return True | |
| if name_lower.startswith("chapter ") and len(tokens) <= 4: | |
| return True | |
| if name_lower.startswith("page ") and len(tokens) <= 4: | |
| return True | |
| return False | |
| def is_noisy_relation(relation: Any) -> bool: | |
| source = get_value(relation, "source_name") or get_value(relation, "source") | |
| target = get_value(relation, "target_name") or get_value(relation, "target") | |
| relation_type = str(get_value(relation, "relation_type", "")).upper() | |
| if is_noisy_entity_name(source): | |
| return True | |
| if is_noisy_entity_name(target): | |
| return True | |
| # IS_A from rule-based extraction is noisy unless both sides look meaningful. | |
| if relation_type == "IS_A": | |
| target_tokens = tokenize_name(target) | |
| if len(target_tokens) == 1 and target_tokens[0] in BAD_SINGLE_WORDS: | |
| return True | |
| return False | |
| def is_low_quality_chunk_text(text: str) -> bool: | |
| text = str(text or "").strip() | |
| if not text: | |
| return True | |
| lower = text.lower() | |
| dot_leaders = len(re.findall(r"\.{5,}", text)) | |
| words = re.findall(r"[a-zA-Z]{3,}", text) | |
| # Table-of-content pages often contain many dot leaders. | |
| if dot_leaders >= 3: | |
| return True | |
| if "table of contents" in lower and dot_leaders >= 1: | |
| return True | |
| # Mostly heading/index text, not answer evidence. | |
| heading_markers = [ | |
| "chapter ", | |
| "page ", | |
| "................................................................" | |
| ] | |
| marker_count = sum(1 for marker in heading_markers if marker in lower) | |
| if marker_count >= 2 and len(words) < 90: | |
| return True | |
| return False | |
| ''', encoding="utf-8") | |
| # ===================================================== | |
| # 2. Improve entity extractor | |
| # ===================================================== | |
| Path("app/graph/entity_extractor.py").write_text(r''' | |
| import re | |
| from typing import List, Dict, Any | |
| from app.graph.graph_quality import is_noisy_entity_name | |
| STOP_ENTITIES = { | |
| "The", "This", "That", "These", "Those", "It", "They", "We", "You", | |
| "Page", "Chapter", "Figure", "Table", "Example", "Answer", "Question", | |
| "Introduction", "Conclusion", "Summary", "Overview", "Paragraph", | |
| "What", "Why", "When", "Where", "Who", "How", "Is", "Are", "IS" | |
| } | |
| def normalize_entity_name(name: str) -> str: | |
| name = re.sub(r"\s+", " ", name or "").strip() | |
| name = name.strip(".,;:()[]{}") | |
| return name | |
| def make_entity_id(name: str) -> str: | |
| cleaned = name.lower() | |
| cleaned = re.sub(r"[^a-z0-9]+", "_", cleaned) | |
| cleaned = cleaned.strip("_") | |
| return cleaned[:80] or "unknown_entity" | |
| def classify_entity(name: str) -> str: | |
| if re.fullmatch(r"[A-Z][A-Z0-9]{1,9}", name): | |
| return "ACRONYM" | |
| org_markers = [ | |
| "University", "Institute", "Corporation", "Corp", "Inc", "Ltd", | |
| "Company", "OpenAI", "Microsoft", "Google", "Amazon" | |
| ] | |
| if any(marker.lower() in name.lower() for marker in org_markers): | |
| return "ORGANIZATION" | |
| if any(char.isdigit() for char in name): | |
| return "TECHNICAL_TERM" | |
| if "-" in name or "/" in name: | |
| return "TECHNICAL_TERM" | |
| return "CONCEPT" | |
| def is_valid_entity(name: str) -> bool: | |
| if not name: | |
| return False | |
| if name in STOP_ENTITIES: | |
| return False | |
| if is_noisy_entity_name(name): | |
| return False | |
| if len(name) < 2: | |
| return False | |
| if len(name) > 90: | |
| return False | |
| return True | |
| def extract_entities_from_text(text: str) -> List[Dict[str, Any]]: | |
| if not text: | |
| return [] | |
| candidates = [] | |
| # Acronyms like RAG, LLM, API, OCR, BM25 | |
| for match in re.finditer(r"\b[A-Z][A-Z0-9]{1,9}\b", text): | |
| candidates.append(match.group(0)) | |
| # Capitalized technical phrases like Retrieval-Augmented Generation | |
| capitalized_phrase_pattern = ( | |
| r"\b[A-Z][a-zA-Z0-9]*(?:[-/][A-Z]?[a-zA-Z0-9]+)?" | |
| r"(?:\s+[A-Z][a-zA-Z0-9]*(?:[-/][A-Z]?[a-zA-Z0-9]+)?){0,5}\b" | |
| ) | |
| for match in re.finditer(capitalized_phrase_pattern, text): | |
| candidates.append(match.group(0)) | |
| cleaned_entities = [] | |
| seen = set() | |
| for candidate in candidates: | |
| name = normalize_entity_name(candidate) | |
| if not is_valid_entity(name): | |
| continue | |
| entity_id = make_entity_id(name) | |
| if entity_id in seen: | |
| continue | |
| seen.add(entity_id) | |
| cleaned_entities.append( | |
| { | |
| "entity_id": entity_id, | |
| "name": name, | |
| "entity_type": classify_entity(name) | |
| } | |
| ) | |
| return cleaned_entities | |
| def split_sentences(text: str) -> List[str]: | |
| if not text: | |
| return [] | |
| parts = re.split(r"(?<=[.!?])\s+", text) | |
| return [part.strip() for part in parts if len(part.strip()) > 20] | |
| ''', encoding="utf-8") | |
| # ===================================================== | |
| # 3. Improve relation extractor | |
| # ===================================================== | |
| Path("app/graph/relation_extractor.py").write_text(r''' | |
| import itertools | |
| import re | |
| from typing import List, Dict, Any | |
| from app.graph.entity_extractor import split_sentences | |
| from app.graph.graph_quality import is_noisy_entity_name | |
| VERB_RELATION_MAP = { | |
| "stands for": "STANDS_FOR", | |
| "refers to": "REFERS_TO", | |
| "uses": "USES", | |
| "use": "USES", | |
| "retrieves": "RETRIEVES", | |
| "retrieve": "RETRIEVES", | |
| "generates": "GENERATES", | |
| "generate": "GENERATES", | |
| "provides": "PROVIDES", | |
| "provide": "PROVIDES", | |
| "reduces": "REDUCES", | |
| "reduce": "REDUCES", | |
| "improves": "IMPROVES", | |
| "improve": "IMPROVES", | |
| "contains": "CONTAINS", | |
| "include": "INCLUDES", | |
| "includes": "INCLUDES" | |
| } | |
| def relation_id(source_id: str, relation_type: str, target_id: str) -> str: | |
| return f"{source_id}__{relation_type.lower()}__{target_id}"[:160] | |
| def entity_appears_in_sentence(entity_name: str, sentence: str) -> bool: | |
| pattern = r"\b" + re.escape(entity_name) + r"\b" | |
| return re.search(pattern, sentence, flags=re.IGNORECASE) is not None | |
| def extract_relations_from_text( | |
| text: str, | |
| entities: List[Dict[str, Any]] | |
| ) -> List[Dict[str, Any]]: | |
| if not text or len(entities) < 2: | |
| return [] | |
| relations = [] | |
| sentences = split_sentences(text) | |
| clean_entities = [ | |
| entity for entity in entities | |
| if not is_noisy_entity_name(entity.get("name", "")) | |
| ] | |
| if len(clean_entities) < 2: | |
| return [] | |
| for sentence in sentences: | |
| present_entities = [ | |
| entity for entity in clean_entities | |
| if entity_appears_in_sentence(entity["name"], sentence) | |
| ] | |
| # Avoid relation explosion | |
| present_entities = present_entities[:5] | |
| if len(present_entities) < 2: | |
| continue | |
| relation_type = detect_relation_type(sentence) | |
| for source, target in itertools.combinations(present_entities, 2): | |
| if source["entity_id"] == target["entity_id"]: | |
| continue | |
| if is_noisy_entity_name(source["name"]) or is_noisy_entity_name(target["name"]): | |
| continue | |
| relations.append( | |
| { | |
| "relation_id": relation_id( | |
| source["entity_id"], | |
| relation_type, | |
| target["entity_id"] | |
| ), | |
| "source_entity_id": source["entity_id"], | |
| "target_entity_id": target["entity_id"], | |
| "source_name": source["name"], | |
| "target_name": target["name"], | |
| "relation_type": relation_type, | |
| "evidence_sentence": sentence | |
| } | |
| ) | |
| return relations | |
| def detect_relation_type(sentence: str) -> str: | |
| sentence_lower = sentence.lower() | |
| for phrase, relation_type in VERB_RELATION_MAP.items(): | |
| if phrase in sentence_lower: | |
| return relation_type | |
| return "RELATED_TO" | |
| ''', encoding="utf-8") | |
| # ===================================================== | |
| # 4. Improve graph context filtering | |
| # ===================================================== | |
| Path("app/graph/graph_context_service.py").write_text(r''' | |
| import re | |
| from typing import Dict, Any, List, Optional | |
| from app.graph.graph_storage import read_document_graph | |
| from app.graph.graph_quality import is_noisy_entity_name, is_noisy_relation | |
| STOPWORDS = { | |
| "what", "is", "are", "the", "a", "an", "of", "to", "and", "or", | |
| "in", "on", "for", "with", "from", "by", "how", "why", "explain", | |
| "define", "meaning", "does", "do", "it", "this", "that" | |
| } | |
| def tokenize_query(query: str) -> List[str]: | |
| words = re.findall(r"[a-zA-Z0-9_]+", (query or "").lower()) | |
| return [ | |
| word for word in words | |
| if word not in STOPWORDS and len(word) > 1 | |
| ] | |
| def tokenize_entity_name(name: str) -> List[str]: | |
| return re.findall(r"[a-zA-Z0-9_]+", (name or "").lower()) | |
| def entity_relevance_score(entity, query_terms: List[str]) -> float: | |
| if not query_terms: | |
| return 0.0 | |
| if is_noisy_entity_name(entity.name): | |
| return 0.0 | |
| name_lower = entity.name.lower() | |
| entity_id_lower = entity.entity_id.lower() | |
| name_tokens = tokenize_entity_name(entity.name) | |
| entity_id_tokens = tokenize_entity_name(entity.entity_id.replace("_", " ")) | |
| score = 0.0 | |
| for term in query_terms: | |
| if term == name_lower or term == entity_id_lower: | |
| score += 10.0 | |
| continue | |
| if term in name_tokens: | |
| score += 6.0 | |
| continue | |
| if term in entity_id_tokens: | |
| score += 5.0 | |
| continue | |
| # Avoid rag matching paragraph. Substring only for longer terms. | |
| if len(term) >= 4 and term in name_lower: | |
| score += 2.0 | |
| if score > 0: | |
| score += min(entity.mention_count, 10) * 0.15 | |
| return score | |
| def build_graph_context_for_query( | |
| document_id: Optional[str], | |
| query: str, | |
| limit: int = 8 | |
| ) -> Dict[str, Any]: | |
| if not document_id: | |
| return { | |
| "graph_available": False, | |
| "reason": "No document_id provided.", | |
| "matched_entities": [], | |
| "matched_relations": [], | |
| "context_text": "" | |
| } | |
| graph = read_document_graph(document_id) | |
| if graph is None: | |
| return { | |
| "graph_available": False, | |
| "reason": "Graph not built for this document.", | |
| "matched_entities": [], | |
| "matched_relations": [], | |
| "context_text": "" | |
| } | |
| query_terms = tokenize_query(query) | |
| scored_entities = [] | |
| for entity in graph.entities: | |
| score = entity_relevance_score(entity, query_terms) | |
| if score > 0: | |
| scored_entities.append((score, entity)) | |
| scored_entities.sort(key=lambda item: item[0], reverse=True) | |
| matched_entities = [ | |
| entity for score, entity in scored_entities[:limit] | |
| ] | |
| matched_entity_ids = { | |
| entity.entity_id for entity in matched_entities | |
| } | |
| matched_relations = [] | |
| for relation in graph.relations: | |
| if is_noisy_relation(relation): | |
| continue | |
| if ( | |
| relation.source_entity_id in matched_entity_ids | |
| or relation.target_entity_id in matched_entity_ids | |
| ): | |
| matched_relations.append(relation) | |
| matched_relations = sorted( | |
| matched_relations, | |
| key=lambda relation: relation.weight, | |
| reverse=True | |
| )[:limit] | |
| context_text = build_graph_context_text( | |
| matched_entities=matched_entities, | |
| matched_relations=matched_relations | |
| ) | |
| return { | |
| "graph_available": True, | |
| "document_id": document_id, | |
| "source_file_name": graph.source_file_name, | |
| "query_terms": query_terms, | |
| "matched_entities": [ | |
| { | |
| "entity_id": entity.entity_id, | |
| "name": entity.name, | |
| "entity_type": entity.entity_type, | |
| "mention_count": entity.mention_count, | |
| "pages": entity.pages[:10], | |
| "chunk_ids": entity.chunk_ids[:10] | |
| } | |
| for entity in matched_entities | |
| ], | |
| "matched_relations": [ | |
| { | |
| "relation_id": relation.relation_id, | |
| "source": relation.source_name, | |
| "relation_type": relation.relation_type, | |
| "target": relation.target_name, | |
| "weight": relation.weight, | |
| "pages": relation.pages[:10], | |
| "chunk_ids": relation.chunk_ids[:10] | |
| } | |
| for relation in matched_relations | |
| ], | |
| "context_text": context_text | |
| } | |
| def build_graph_context_text( | |
| matched_entities, | |
| matched_relations | |
| ) -> str: | |
| lines = [] | |
| if matched_entities: | |
| lines.append("Relevant graph entities:") | |
| for entity in matched_entities: | |
| pages = ", ".join(str(page) for page in entity.pages[:5]) | |
| lines.append( | |
| f"- {entity.name} ({entity.entity_type}), mentions={entity.mention_count}, pages={pages}" | |
| ) | |
| if matched_relations: | |
| lines.append("") | |
| lines.append("Relevant graph relations:") | |
| for relation in matched_relations: | |
| lines.append( | |
| f"- {relation.source_name} --{relation.relation_type}--> {relation.target_name} " | |
| f"(weight={relation.weight})" | |
| ) | |
| return "\n".join(lines).strip() | |
| ''', encoding="utf-8") | |
| # ===================================================== | |
| # 5. Improve graph-guided retrieval | |
| # ===================================================== | |
| retriever_path = Path("app/graph/graph_guided_retriever.py") | |
| text = retriever_path.read_text(encoding="utf-8-sig") | |
| text = text.replace("\ufeff", "") | |
| if "from app.graph.graph_quality import is_low_quality_chunk_text" not in text: | |
| text = text.replace( | |
| "from app.storage.processed_storage import read_processed_chunks", | |
| "from app.storage.processed_storage import read_processed_chunks\nfrom app.graph.graph_quality import is_low_quality_chunk_text" | |
| ) | |
| old = ''' results.append( | |
| { | |
| "rank": rank, | |
| "chunk_id": chunk_id, | |
| "graph_score": round(info["score"], 4), | |
| "page_number": get_value(chunk, "page_number"), | |
| "source_file_name": ( | |
| get_value(chunk, "source_file_name") | |
| or get_value(chunk, "file_name") | |
| or get_value(chunk, "filename") | |
| ), | |
| "matched_entities": sorted(set(info["matched_entities"])), | |
| "matched_relations": sorted(set(info["matched_relations"])), | |
| "text_preview": extract_text_preview(chunk) | |
| } | |
| ) | |
| ''' | |
| new = ''' text_preview = extract_text_preview(chunk) | |
| if is_low_quality_chunk_text(text_preview): | |
| continue | |
| results.append( | |
| { | |
| "rank": len(results) + 1, | |
| "chunk_id": chunk_id, | |
| "graph_score": round(info["score"], 4), | |
| "page_number": get_value(chunk, "page_number"), | |
| "source_file_name": ( | |
| get_value(chunk, "source_file_name") | |
| or get_value(chunk, "file_name") | |
| or get_value(chunk, "filename") | |
| ), | |
| "matched_entities": sorted(set(info["matched_entities"])), | |
| "matched_relations": sorted(set(info["matched_relations"])), | |
| "text_preview": text_preview | |
| } | |
| ) | |
| ''' | |
| if old in text: | |
| text = text.replace(old, new) | |
| else: | |
| print("Graph retriever append block not found. It may already be patched.") | |
| retriever_path.write_text(text, encoding="utf-8") | |
| print("Phase 18 graph quality cleanup applied.") | |