Spaces:
Sleeping
Sleeping
| from app.generation.answer_quality_enhancer import safe_enhance_answer_for_response | |
| from app.graph.graph_retrieval_fusion import fuse_retrieval_results_with_graph | |
| from app.graph.graph_context_service import build_graph_context_for_query | |
| import re | |
| from typing import Optional, Dict, Any, List | |
| from app.core.config import settings | |
| from app.retrieval.hybrid_search_service import retrieve_chunks | |
| from app.retrieval.reranking_service import rerank_results | |
| from app.retrieval.citation_service import ( | |
| attach_source_ids, | |
| create_citation_objects | |
| ) | |
| from app.generation.context_cleaner import clean_retrieved_results, clean_sentence_text | |
| from app.generation.question_classifier import classify_question | |
| from app.generation.evidence_extractor import ( | |
| extract_evidence_sentences, | |
| build_evidence_context | |
| ) | |
| from app.generation.prompt_builder import build_grounded_prompt | |
| from app.generation.llm_service import generate_with_local_llm, get_llm_status | |
| from app.generation.answer_quality_checker import ( | |
| is_answer_good_enough, | |
| append_missing_citations | |
| ) | |
| def answer_question( | |
| query: str, | |
| document_id: Optional[str] = None, | |
| top_k: int = 5, | |
| retrieval_mode: str = "hybrid", | |
| use_reranker: bool = True, | |
| use_llm: bool = True, | |
| use_graph: bool = True, | |
| graph_entity_limit: int = 8, | |
| use_graph_retrieval: bool = True, | |
| graph_retrieval_top_k: int = 5 | |
| ) -> Dict[str, Any]: | |
| candidate_k = top_k | |
| if use_reranker: | |
| candidate_k = max( | |
| top_k * settings.RERANKER_CANDIDATE_MULTIPLIER, | |
| top_k | |
| ) | |
| retrieval_output = retrieve_chunks( | |
| query=query, | |
| document_id=document_id, | |
| top_k=candidate_k, | |
| retrieval_mode=retrieval_mode | |
| ) | |
| retrieved_results = retrieval_output["results"] | |
| if use_reranker: | |
| retrieved_results = rerank_results( | |
| query=query, | |
| results=retrieved_results, | |
| top_k=top_k | |
| ) | |
| else: | |
| retrieved_results = retrieved_results[:top_k] | |
| cleaned_results = clean_retrieved_results(retrieved_results) | |
| sourced_results = attach_source_ids(cleaned_results) | |
| fusion_result = fuse_retrieval_results_with_graph( | |
| document_id=document_id, | |
| query=query, | |
| retrieval_results=sourced_results, | |
| graph_entity_limit=graph_entity_limit, | |
| graph_top_k=graph_retrieval_top_k, | |
| final_top_k=max(top_k, graph_retrieval_top_k) | |
| ) if use_graph_retrieval else { | |
| "fused_results": sourced_results, | |
| "fusion_used": False, | |
| "reason": "Graph retrieval fusion disabled.", | |
| "graph_retrieval": {}, | |
| "normal_count": len(sourced_results), | |
| "graph_added_count": 0, | |
| "graph_supported_count": 0, | |
| "final_count": len(sourced_results) | |
| } | |
| sourced_results = fusion_result.get("fused_results", sourced_results) | |
| # Re-attach source IDs after fusion because graph-added chunks also need citations. | |
| sourced_results = attach_source_ids(sourced_results) | |
| citations = create_citation_objects(sourced_results) | |
| if not sourced_results: | |
| return { | |
| "query": query, | |
| "answer": "I could not find relevant indexed sources for this question.", | |
| "retrieval_mode": retrieval_mode, | |
| "question_type": classify_question(query), | |
| "used_reranker": use_reranker, | |
| "used_llm": False, | |
| "answer_strategy": "no_sources_found", | |
| "citations": [], | |
| "sources": [] | |
| } | |
| question_type = classify_question(query) | |
| evidence_items = extract_evidence_sentences( | |
| query=query, | |
| results=sourced_results, | |
| max_evidence=8 | |
| ) | |
| if not evidence_items: | |
| answer = build_extractive_answer( | |
| sources=sourced_results | |
| ) | |
| return { | |
| "query": query, | |
| "answer": safe_enhance_answer_for_response(locals()), | |
| "retrieval_mode": retrieval_mode, | |
| "question_type": question_type, | |
| "used_reranker": use_reranker, | |
| "used_llm": False, | |
| "answer_strategy": "fallback_no_evidence_sentences", | |
| "llm_status": get_llm_status(), | |
| "citations": citations, | |
| "evidence": [], | |
| "sources": sourced_results | |
| } | |
| evidence_context = build_evidence_context(evidence_items) | |
| graph_context = build_graph_context_for_query( | |
| document_id=document_id, | |
| query=query, | |
| limit=graph_entity_limit | |
| ) if use_graph else { | |
| "graph_available": False, | |
| "reason": "Graph usage disabled.", | |
| "matched_entities": [], | |
| "matched_relations": [], | |
| "context_text": "" | |
| } | |
| graph_context_text = graph_context.get("context_text", "") | |
| if graph_context_text: | |
| evidence_context = ( | |
| evidence_context | |
| + "\n\nStructured graph context:\n" | |
| + graph_context_text | |
| ) | |
| raw_llm_answer = "" | |
| llm_answer_after_citations = "" | |
| if use_llm: | |
| prompt = build_grounded_prompt( | |
| query=query, | |
| evidence_context=evidence_context, | |
| question_type=question_type | |
| ) | |
| raw_llm_answer = generate_with_local_llm(prompt) | |
| llm_answer_after_citations = append_missing_citations( | |
| answer=raw_llm_answer, | |
| sources=sourced_results | |
| ) | |
| if is_answer_good_enough(llm_answer_after_citations): | |
| answer = clean_final_answer(llm_answer_after_citations) | |
| used_llm = True | |
| answer_strategy = "llm_with_quality_check" | |
| else: | |
| answer = build_evidence_based_answer( | |
| query=query, | |
| question_type=question_type, | |
| evidence_items=evidence_items | |
| ) | |
| used_llm = False | |
| answer_strategy = "fallback_evidence_based_answer" | |
| else: | |
| answer = build_evidence_based_answer( | |
| query=query, | |
| question_type=question_type, | |
| evidence_items=evidence_items | |
| ) | |
| used_llm = False | |
| answer_strategy = "evidence_based_answer_no_llm" | |
| answer = clean_final_answer(answer) | |
| return { | |
| "query": query, | |
| "answer": safe_enhance_answer_for_response(locals()), | |
| "retrieval_mode": retrieval_mode, | |
| "question_type": question_type, | |
| "used_reranker": use_reranker, | |
| "used_llm": used_llm, | |
| "answer_strategy": answer_strategy, | |
| "llm_status": get_llm_status(), | |
| "llm_diagnostics": { | |
| "raw_llm_answer_preview": raw_llm_answer[:300], | |
| "llm_answer_after_citations_preview": llm_answer_after_citations[:300], | |
| "llm_answer_accepted": used_llm | |
| }, | |
| "graph_used": bool(graph_context.get("matched_entities") or graph_context.get("matched_relations")), | |
| "graph_context": graph_context, | |
| "retrieval_fusion": fusion_result if "fusion_result" in locals() else { | |
| "fusion_used": False, | |
| "reason": "Fusion result was not created." | |
| }, | |
| "citations": citations, | |
| "evidence": evidence_items, | |
| "sources": sourced_results | |
| } | |
| def build_evidence_based_answer( | |
| query: str, | |
| question_type: str, | |
| evidence_items: List[Dict[str, Any]] | |
| ) -> str: | |
| if question_type == "definition": | |
| return build_definition_answer(query, evidence_items) | |
| if question_type == "summary": | |
| return build_summary_answer(evidence_items) | |
| if question_type == "comparison": | |
| return build_general_answer(evidence_items) | |
| if question_type == "steps": | |
| return build_step_answer(evidence_items) | |
| return build_general_answer(evidence_items) | |
| def build_definition_answer( | |
| query: str, | |
| evidence_items: List[Dict[str, Any]] | |
| ) -> str: | |
| target = extract_definition_target(query) | |
| if target and target.lower() == "rag": | |
| return build_rag_definition_answer(evidence_items) | |
| selected_items = select_best_unique_items( | |
| evidence_items=evidence_items, | |
| max_items=3 | |
| ) | |
| lines = [] | |
| for item in selected_items: | |
| sentence = clean_sentence_text(item["sentence"]) | |
| citation = source_id_to_bracket(item.get("source_id")) | |
| if citation and citation not in sentence: | |
| sentence = f"{sentence} {citation}" | |
| lines.append(sentence) | |
| return " ".join(lines) | |
| def build_rag_definition_answer(evidence_items: List[Dict[str, Any]]) -> str: | |
| definition_source = find_first_item_containing( | |
| evidence_items, | |
| ["retrieval-augmented generation", "retrieval augmented generation"] | |
| ) | |
| how_source = find_first_item_containing( | |
| evidence_items, | |
| [ | |
| "retrieval step", | |
| "before generation", | |
| "before generating", | |
| "search a document corpus", | |
| "search your document corpus", | |
| "relevant passages as context" | |
| ] | |
| ) | |
| why_source = find_first_item_containing( | |
| evidence_items, | |
| [ | |
| "frozen knowledge", | |
| "hallucination", | |
| "private or recent data", | |
| "grounds the answer", | |
| "real evidence" | |
| ] | |
| ) | |
| citation_ids = collect_source_ids( | |
| [definition_source, how_source, why_source] | |
| ) | |
| citation_text = " ".join( | |
| source_id_to_bracket(source_id) | |
| for source_id in citation_ids | |
| ) | |
| answer = ( | |
| "RAG stands for Retrieval-Augmented Generation. " | |
| "It is a method where the system first retrieves relevant passages from a document corpus " | |
| "and then provides those passages as context before generating an answer. " | |
| "This helps the model answer using real evidence instead of relying only on frozen training knowledge, " | |
| "which reduces hallucination and makes the system useful for private or recent information." | |
| ) | |
| if citation_text: | |
| answer = f"{answer} {citation_text}" | |
| return answer | |
| def build_summary_answer(evidence_items: List[Dict[str, Any]]) -> str: | |
| selected_items = select_best_unique_items( | |
| evidence_items=evidence_items, | |
| max_items=5 | |
| ) | |
| lines = ["Here is the source-grounded summary:"] | |
| for index, item in enumerate(selected_items, start=1): | |
| sentence = clean_sentence_text(item["sentence"]) | |
| citation = source_id_to_bracket(item.get("source_id")) | |
| lines.append(f"{index}. {sentence} {citation}") | |
| return "\n".join(lines) | |
| def build_step_answer(evidence_items: List[Dict[str, Any]]) -> str: | |
| selected_items = select_best_unique_items( | |
| evidence_items=evidence_items, | |
| max_items=5 | |
| ) | |
| lines = ["Based on the retrieved sources, the process is:"] | |
| for index, item in enumerate(selected_items, start=1): | |
| sentence = clean_sentence_text(item["sentence"]) | |
| citation = source_id_to_bracket(item.get("source_id")) | |
| lines.append(f"{index}. {sentence} {citation}") | |
| return "\n".join(lines) | |
| def build_general_answer(evidence_items: List[Dict[str, Any]]) -> str: | |
| selected_items = select_best_unique_items( | |
| evidence_items=evidence_items, | |
| max_items=4 | |
| ) | |
| lines = [] | |
| for item in selected_items: | |
| sentence = clean_sentence_text(item["sentence"]) | |
| citation = source_id_to_bracket(item.get("source_id")) | |
| if citation and citation not in sentence: | |
| sentence = f"{sentence} {citation}" | |
| lines.append(sentence) | |
| return " ".join(lines) | |
| def build_extractive_answer( | |
| sources: List[Dict[str, Any]] | |
| ) -> str: | |
| lines = [ | |
| "I found relevant source-backed passages, but could not extract a cleaner evidence sentence automatically:" | |
| ] | |
| for index, source in enumerate(sources[:3], start=1): | |
| content = source.get("content", "") | |
| source_id = source.get("source_id", f"S{index}") | |
| excerpt = content[:600].replace("\n", " ").strip() | |
| lines.append( | |
| f"{index}. {excerpt} [{source_id}]" | |
| ) | |
| return "\n\n".join(lines) | |
| def extract_definition_target(query: str) -> Optional[str]: | |
| query_lower = query.lower().strip() | |
| patterns = [ | |
| r"what is\s+(.+?)\??$", | |
| r"what are\s+(.+?)\??$", | |
| r"define\s+(.+?)\??$", | |
| r"meaning of\s+(.+?)\??$" | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, query_lower) | |
| if match: | |
| target = match.group(1).strip() | |
| target = target.replace("?", "").strip() | |
| return target | |
| return None | |
| def find_first_item_containing( | |
| evidence_items: List[Dict[str, Any]], | |
| keywords: List[str] | |
| ) -> Optional[Dict[str, Any]]: | |
| for item in evidence_items: | |
| sentence_lower = item.get("sentence", "").lower() | |
| for keyword in keywords: | |
| if keyword.lower() in sentence_lower: | |
| return item | |
| return None | |
| def collect_source_ids(items: List[Optional[Dict[str, Any]]]) -> List[str]: | |
| source_ids = [] | |
| for item in items: | |
| if not item: | |
| continue | |
| source_id = item.get("source_id") | |
| if source_id and source_id not in source_ids: | |
| source_ids.append(source_id) | |
| return source_ids[:3] | |
| def select_best_unique_items( | |
| evidence_items: List[Dict[str, Any]], | |
| max_items: int | |
| ) -> List[Dict[str, Any]]: | |
| selected = [] | |
| seen_meanings = [] | |
| for item in evidence_items: | |
| sentence = clean_sentence_text(item["sentence"]) | |
| if is_repetitive_meaning(sentence, seen_meanings): | |
| continue | |
| selected.append(item) | |
| seen_meanings.append(sentence) | |
| if len(selected) >= max_items: | |
| break | |
| return selected | |
| def is_repetitive_meaning(sentence: str, existing_sentences: List[str]) -> bool: | |
| current_tokens = set(normalize_text(sentence).split()) | |
| if not current_tokens: | |
| return True | |
| for existing in existing_sentences: | |
| existing_tokens = set(normalize_text(existing).split()) | |
| if not existing_tokens: | |
| continue | |
| overlap = len(current_tokens.intersection(existing_tokens)) | |
| union = len(current_tokens.union(existing_tokens)) | |
| if union == 0: | |
| continue | |
| similarity = overlap / union | |
| if similarity >= 0.65: | |
| return True | |
| return False | |
| def normalize_text(text: str) -> str: | |
| text = text.lower() | |
| text = re.sub(r"[^a-z0-9\s]", " ", text) | |
| text = re.sub(r"\b(ideal|answer|question|chapter|page)\b", " ", text) | |
| text = re.sub(r"\s+", " ", text) | |
| return text.strip() | |
| def clean_final_answer(answer: str) -> str: | |
| if not answer: | |
| return "" | |
| cleaned = answer | |
| cleaned = re.sub(r"\bIdeal Answer\b", "", cleaned, flags=re.IGNORECASE) | |
| cleaned = re.sub(r"\bQ\d+\s*:\s*", "", cleaned, flags=re.IGNORECASE) | |
| cleaned = re.sub(r"\s+", " ", cleaned) | |
| cleaned = cleaned.replace(" .", ".") | |
| cleaned = cleaned.replace(" ,", ",") | |
| cleaned = cleaned.strip() | |
| return cleaned | |
| def source_id_to_bracket(source_id: Optional[str]) -> str: | |
| if not source_id: | |
| return "" | |
| if source_id.startswith("[") and source_id.endswith("]"): | |
| return source_id | |
| return f"[{source_id}]" | |