# GraphResearcher / app/generation/answer_service.py
# Committed by: yugbirla
# Commit 98a8ef8: Improve answer quality with detailed source-grounded responses
# File size at that commit: 15.3 kB
from app.generation.answer_quality_enhancer import safe_enhance_answer_for_response
from app.graph.graph_retrieval_fusion import fuse_retrieval_results_with_graph
from app.graph.graph_context_service import build_graph_context_for_query
import re
from typing import Optional, Dict, Any, List
from app.core.config import settings
from app.retrieval.hybrid_search_service import retrieve_chunks
from app.retrieval.reranking_service import rerank_results
from app.retrieval.citation_service import (
attach_source_ids,
create_citation_objects
)
from app.generation.context_cleaner import clean_retrieved_results, clean_sentence_text
from app.generation.question_classifier import classify_question
from app.generation.evidence_extractor import (
extract_evidence_sentences,
build_evidence_context
)
from app.generation.prompt_builder import build_grounded_prompt
from app.generation.llm_service import generate_with_local_llm, get_llm_status
from app.generation.answer_quality_checker import (
is_answer_good_enough,
append_missing_citations
)
def answer_question(
    query: str,
    document_id: Optional[str] = None,
    top_k: int = 5,
    retrieval_mode: str = "hybrid",
    use_reranker: bool = True,
    use_llm: bool = True,
    use_graph: bool = True,
    graph_entity_limit: int = 8,
    use_graph_retrieval: bool = True,
    graph_retrieval_top_k: int = 5
) -> Dict[str, Any]:
    """Answer a query from retrieved sources and return a response dict.

    Pipeline, in order: retrieve chunks, optionally rerank, clean, attach
    source IDs, optionally fuse with graph retrieval, build citations,
    extract evidence sentences, optionally append graph context, then either
    ask the local LLM (with a quality check) or build an answer directly
    from the evidence sentences.

    Args:
        query: The user question.
        document_id: Optional document to restrict retrieval / graph lookups to.
        top_k: Number of chunks kept after retrieval (and reranking).
        retrieval_mode: Passed through to ``retrieve_chunks``.
        use_reranker: Rerank an over-fetched candidate pool when True.
        use_llm: Try the local LLM first; fall back to evidence-based text
            when its answer fails the quality check.
        use_graph: Append structured graph context to the evidence context.
        graph_entity_limit: Entity limit passed to the graph helpers.
        use_graph_retrieval: Fuse graph-retrieved chunks into the results.
        graph_retrieval_top_k: ``graph_top_k`` passed to the fusion helper.

    Returns:
        A dict that always contains ``query``, ``answer``, ``retrieval_mode``,
        ``question_type``, ``used_reranker``, ``used_llm``,
        ``answer_strategy``, ``citations`` and ``sources``. The no-evidence
        fallback adds ``llm_status`` and ``evidence``; the full path further
        adds ``llm_diagnostics``, ``graph_used``, ``graph_context`` and
        ``retrieval_fusion``.
    """
    # NOTE: ``locals()`` is handed to safe_enhance_answer_for_response below,
    # so the local variable names in this function are effectively part of
    # that helper's input. Presumably it reads names such as ``answer`` --
    # confirm before renaming or removing any local here.

    # Over-fetch when reranking so the reranker has more candidates than the
    # final top_k to choose from.
    candidate_k = top_k
    if use_reranker:
        candidate_k = max(
            top_k * settings.RERANKER_CANDIDATE_MULTIPLIER,
            top_k
        )

    retrieval_output = retrieve_chunks(
        query=query,
        document_id=document_id,
        top_k=candidate_k,
        retrieval_mode=retrieval_mode
    )
    retrieved_results = retrieval_output["results"]

    if use_reranker:
        retrieved_results = rerank_results(
            query=query,
            results=retrieved_results,
            top_k=top_k
        )
    else:
        retrieved_results = retrieved_results[:top_k]

    cleaned_results = clean_retrieved_results(retrieved_results)
    sourced_results = attach_source_ids(cleaned_results)

    if use_graph_retrieval:
        fusion_result = fuse_retrieval_results_with_graph(
            document_id=document_id,
            query=query,
            retrieval_results=sourced_results,
            graph_entity_limit=graph_entity_limit,
            graph_top_k=graph_retrieval_top_k,
            final_top_k=max(top_k, graph_retrieval_top_k)
        )
    else:
        # Same shape as a real fusion result so the response stays uniform.
        fusion_result = {
            "fused_results": sourced_results,
            "fusion_used": False,
            "reason": "Graph retrieval fusion disabled.",
            "graph_retrieval": {},
            "normal_count": len(sourced_results),
            "graph_added_count": 0,
            "graph_supported_count": 0,
            "final_count": len(sourced_results)
        }

    sourced_results = fusion_result.get("fused_results", sourced_results)
    # Re-attach source IDs after fusion because graph-added chunks also need citations.
    sourced_results = attach_source_ids(sourced_results)
    citations = create_citation_objects(sourced_results)

    if not sourced_results:
        return {
            "query": query,
            "answer": "I could not find relevant indexed sources for this question.",
            "retrieval_mode": retrieval_mode,
            "question_type": classify_question(query),
            "used_reranker": use_reranker,
            "used_llm": False,
            "answer_strategy": "no_sources_found",
            "citations": [],
            "sources": []
        }

    question_type = classify_question(query)
    evidence_items = extract_evidence_sentences(
        query=query,
        results=sourced_results,
        max_evidence=8
    )

    if not evidence_items:
        # Sources exist but no sentence-level evidence: quote raw excerpts.
        answer = build_extractive_answer(
            sources=sourced_results
        )
        return {
            "query": query,
            "answer": safe_enhance_answer_for_response(locals()),
            "retrieval_mode": retrieval_mode,
            "question_type": question_type,
            "used_reranker": use_reranker,
            "used_llm": False,
            "answer_strategy": "fallback_no_evidence_sentences",
            "llm_status": get_llm_status(),
            "citations": citations,
            "evidence": [],
            "sources": sourced_results
        }

    evidence_context = build_evidence_context(evidence_items)

    if use_graph:
        graph_context = build_graph_context_for_query(
            document_id=document_id,
            query=query,
            limit=graph_entity_limit
        )
    else:
        graph_context = {
            "graph_available": False,
            "reason": "Graph usage disabled.",
            "matched_entities": [],
            "matched_relations": [],
            "context_text": ""
        }

    graph_context_text = graph_context.get("context_text", "")
    if graph_context_text:
        evidence_context = (
            evidence_context
            + "\n\nStructured graph context:\n"
            + graph_context_text
        )

    raw_llm_answer = ""
    llm_answer_after_citations = ""

    if use_llm:
        prompt = build_grounded_prompt(
            query=query,
            evidence_context=evidence_context,
            question_type=question_type
        )
        raw_llm_answer = generate_with_local_llm(prompt)
        llm_answer_after_citations = append_missing_citations(
            answer=raw_llm_answer,
            sources=sourced_results
        )
        if is_answer_good_enough(llm_answer_after_citations):
            answer = clean_final_answer(llm_answer_after_citations)
            used_llm = True
            answer_strategy = "llm_with_quality_check"
        else:
            # LLM output rejected by the quality check: use evidence text.
            answer = build_evidence_based_answer(
                query=query,
                question_type=question_type,
                evidence_items=evidence_items
            )
            used_llm = False
            answer_strategy = "fallback_evidence_based_answer"
    else:
        answer = build_evidence_based_answer(
            query=query,
            question_type=question_type,
            evidence_items=evidence_items
        )
        used_llm = False
        answer_strategy = "evidence_based_answer_no_llm"

    # Applied on every path (a second time for accepted LLM answers).
    answer = clean_final_answer(answer)

    return {
        "query": query,
        "answer": safe_enhance_answer_for_response(locals()),
        "retrieval_mode": retrieval_mode,
        "question_type": question_type,
        "used_reranker": use_reranker,
        "used_llm": used_llm,
        "answer_strategy": answer_strategy,
        "llm_status": get_llm_status(),
        "llm_diagnostics": {
            "raw_llm_answer_preview": raw_llm_answer[:300],
            "llm_answer_after_citations_preview": llm_answer_after_citations[:300],
            "llm_answer_accepted": used_llm
        },
        "graph_used": bool(graph_context.get("matched_entities") or graph_context.get("matched_relations")),
        "graph_context": graph_context,
        # fusion_result is assigned unconditionally above, so no existence
        # check is needed here.
        "retrieval_fusion": fusion_result,
        "citations": citations,
        "evidence": evidence_items,
        "sources": sourced_results
    }
def build_evidence_based_answer(
    query: str,
    question_type: str,
    evidence_items: List[Dict[str, Any]]
) -> str:
    """Build an answer directly from evidence sentences, without the LLM.

    Args:
        query: The user question; only used for ``"definition"`` questions.
        question_type: Label selecting which answer builder to use.
        evidence_items: Evidence dicts handed to the selected builder.

    Returns:
        The answer text produced by the selected builder.
    """
    if question_type == "definition":
        return build_definition_answer(query, evidence_items)
    if question_type == "summary":
        return build_summary_answer(evidence_items)
    if question_type == "steps":
        return build_step_answer(evidence_items)
    # "comparison" has no dedicated builder; it and every other type use the
    # general sentence-joining answer.
    return build_general_answer(evidence_items)
def build_definition_answer(
    query: str,
    evidence_items: List[Dict[str, Any]]
) -> str:
    """Build a short definition-style answer from evidence sentences.

    When the definition target extracted from ``query`` is exactly "rag",
    the dedicated RAG definition builder is used. Otherwise up to three
    non-repetitive evidence sentences are joined with single spaces, each
    followed by its bracketed source ID unless the sentence already
    contains it.

    Args:
        query: The user question the definition target is extracted from.
        evidence_items: Evidence dicts with a ``"sentence"`` key and an
            optional ``"source_id"`` key.

    Returns:
        The assembled answer; an empty string when nothing was selected.
    """
    target = extract_definition_target(query)
    if target and target.lower() == "rag":
        return build_rag_definition_answer(evidence_items)

    selected_items = select_best_unique_items(
        evidence_items=evidence_items,
        max_items=3
    )

    lines = []
    for item in selected_items:
        sentence = clean_sentence_text(item["sentence"])
        citation = source_id_to_bracket(item.get("source_id"))
        # Avoid repeating a citation the sentence already carries.
        if citation and citation not in sentence:
            sentence = f"{sentence} {citation}"
        lines.append(sentence)

    return " ".join(lines)
def build_rag_definition_answer(evidence_items: List[Dict[str, Any]]) -> str:
    """Return a fixed RAG definition, cited with whichever sources support it.

    The answer text itself is hard-coded. The evidence is only searched for
    up to three supporting sentences (what RAG is, how it works, why it
    helps) so that their source IDs can be appended as citations.

    Args:
        evidence_items: Evidence dicts with ``"sentence"`` and ``"source_id"``.

    Returns:
        The canned definition, followed by space-separated bracketed source
        IDs when any supporting sentence was found.
    """
    definition_source = find_first_item_containing(
        evidence_items,
        ["retrieval-augmented generation", "retrieval augmented generation"]
    )
    how_source = find_first_item_containing(
        evidence_items,
        [
            "retrieval step",
            "before generation",
            "before generating",
            "search a document corpus",
            "search your document corpus",
            "relevant passages as context"
        ]
    )
    why_source = find_first_item_containing(
        evidence_items,
        [
            "frozen knowledge",
            "hallucination",
            "private or recent data",
            "grounds the answer",
            "real evidence"
        ]
    )

    # Missing matches are None and are skipped by collect_source_ids.
    citation_ids = collect_source_ids(
        [definition_source, how_source, why_source]
    )
    citation_text = " ".join(
        source_id_to_bracket(source_id)
        for source_id in citation_ids
    )

    answer = (
        "RAG stands for Retrieval-Augmented Generation. "
        "It is a method where the system first retrieves relevant passages from a document corpus "
        "and then provides those passages as context before generating an answer. "
        "This helps the model answer using real evidence instead of relying only on frozen training knowledge, "
        "which reduces hallucination and makes the system useful for private or recent information."
    )
    if citation_text:
        answer = f"{answer} {citation_text}"
    return answer
def build_summary_answer(evidence_items: List[Dict[str, Any]]) -> str:
    """Build a numbered summary from up to five non-repetitive sentences.

    Args:
        evidence_items: Evidence dicts with a ``"sentence"`` key and an
            optional ``"source_id"`` key.

    Returns:
        A heading line followed by one numbered line per selected sentence,
        joined with newlines.
    """
    selected_items = select_best_unique_items(
        evidence_items=evidence_items,
        max_items=5
    )

    lines = ["Here is the source-grounded summary:"]
    for index, item in enumerate(selected_items, start=1):
        sentence = clean_sentence_text(item["sentence"])
        citation = source_id_to_bracket(item.get("source_id"))
        # Same rule as build_general_answer: append the citation only when
        # there is one and the sentence does not already contain it. This
        # avoids duplicated citations and a trailing blank.
        if citation and citation not in sentence:
            sentence = f"{sentence} {citation}"
        lines.append(f"{index}. {sentence}")

    return "\n".join(lines)
def build_step_answer(evidence_items: List[Dict[str, Any]]) -> str:
    """Build a numbered process description from up to five sentences.

    Args:
        evidence_items: Evidence dicts with a ``"sentence"`` key and an
            optional ``"source_id"`` key.

    Returns:
        A heading line followed by one numbered line per selected sentence,
        joined with newlines.
    """
    selected_items = select_best_unique_items(
        evidence_items=evidence_items,
        max_items=5
    )

    lines = ["Based on the retrieved sources, the process is:"]
    for index, item in enumerate(selected_items, start=1):
        sentence = clean_sentence_text(item["sentence"])
        citation = source_id_to_bracket(item.get("source_id"))
        # Same rule as build_general_answer: append the citation only when
        # there is one and the sentence does not already contain it. This
        # avoids duplicated citations and a trailing blank.
        if citation and citation not in sentence:
            sentence = f"{sentence} {citation}"
        lines.append(f"{index}. {sentence}")

    return "\n".join(lines)
def build_general_answer(evidence_items: List[Dict[str, Any]]) -> str:
    """Join up to four non-repetitive evidence sentences into one answer.

    Each sentence is followed by its bracketed source ID unless the
    sentence already contains that citation.

    Args:
        evidence_items: Evidence dicts with a ``"sentence"`` key and an
            optional ``"source_id"`` key.

    Returns:
        The sentences joined with single spaces; an empty string when
        nothing was selected.
    """
    selected_items = select_best_unique_items(
        evidence_items=evidence_items,
        max_items=4
    )

    lines = []
    for item in selected_items:
        sentence = clean_sentence_text(item["sentence"])
        citation = source_id_to_bracket(item.get("source_id"))
        if citation and citation not in sentence:
            sentence = f"{sentence} {citation}"
        lines.append(sentence)

    return " ".join(lines)
def build_extractive_answer(
    sources: List[Dict[str, Any]]
) -> str:
    """Quote raw excerpts from the first three sources as a last-resort answer.

    Args:
        sources: Source dicts; ``"content"`` and ``"source_id"`` are read
            when present.

    Returns:
        An explanatory heading followed by up to three numbered excerpts
        (at most 600 characters each, newlines flattened to spaces), each
        ending in a bracketed source ID. Blocks are separated by blank lines.
    """
    lines = [
        "I found relevant source-backed passages, but could not extract a cleaner evidence sentence automatically:"
    ]

    for index, source in enumerate(sources[:3], start=1):
        # ``or`` also covers keys that are present but None/empty, which
        # would otherwise crash on slicing or render as "[None]".
        content = str(source.get("content") or "")
        source_id = str(source.get("source_id") or f"S{index}")
        excerpt = content[:600].replace("\n", " ").strip()

        # Do not double-wrap an ID that is already bracketed.
        if source_id.startswith("[") and source_id.endswith("]"):
            citation = source_id
        else:
            citation = f"[{source_id}]"

        lines.append(
            f"{index}. {excerpt} {citation}"
        )

    return "\n\n".join(lines)
def extract_definition_target(query: str) -> Optional[str]:
    """Extract the term being asked about from a definition-style question.

    Recognises "what is ...", "what are ...", "define ..." and
    "meaning of ..." (case-insensitive). The phrases must start on a word
    boundary, so "somewhat is ..." does not count as "what is ...".

    Args:
        query: The user question.

    Returns:
        The lower-cased target with question marks and trailing punctuation
        removed, or ``None`` when no pattern matches or nothing is left.
    """
    if not query:
        return None

    query_lower = query.lower().strip()
    patterns = (
        r"\bwhat is\s+(.+?)\??$",
        r"\bwhat are\s+(.+?)\??$",
        r"\bdefine\s+(.+?)\??$",
        r"\bmeaning of\s+(.+?)\??$",
    )

    for pattern in patterns:
        match = re.search(pattern, query_lower)
        if match:
            target = match.group(1).replace("?", "").strip()
            # Only trailing punctuation is trimmed so that targets such as
            # ".net" keep their leading dot.
            target = target.rstrip(" .!,;:")
            return target or None

    return None
def find_first_item_containing(
    evidence_items: List[Dict[str, Any]],
    keywords: List[str]
) -> Optional[Dict[str, Any]]:
    """Return the first evidence item whose sentence contains any keyword.

    Matching is a case-insensitive substring test.

    Args:
        evidence_items: Evidence dicts; the ``"sentence"`` value is searched.
        keywords: Phrases to look for.

    Returns:
        The first matching item (the same object, not a copy), or ``None``.
    """
    # Lower-case the keywords once, not once per item.
    lowered_keywords = [keyword.lower() for keyword in keywords]

    for item in evidence_items:
        # ``or ""`` also covers a "sentence" key that is present but None.
        sentence_lower = (item.get("sentence") or "").lower()
        if any(keyword in sentence_lower for keyword in lowered_keywords):
            return item

    return None
def collect_source_ids(
    items: List[Optional[Dict[str, Any]]],
    max_ids: int = 3
) -> List[str]:
    """Collect distinct, non-empty source IDs in first-seen order.

    Args:
        items: Evidence dicts or ``None`` placeholders; falsy entries are
            skipped.
        max_ids: Maximum number of IDs to return (default 3, the previous
            fixed cap). Values below zero are treated as zero.

    Returns:
        Up to ``max_ids`` unique ``"source_id"`` values.
    """
    source_ids: List[str] = []

    for item in items:
        if not item:
            continue
        source_id = item.get("source_id")
        if source_id and source_id not in source_ids:
            source_ids.append(source_id)

    return source_ids[:max(max_ids, 0)]
def select_best_unique_items(
    evidence_items: List[Dict[str, Any]],
    max_items: int
) -> List[Dict[str, Any]]:
    """Pick the first ``max_items`` evidence items that are not near-duplicates.

    Items are taken in input order. An item is skipped when its cleaned
    sentence is judged repetitive against the sentences already selected.

    Args:
        evidence_items: Evidence dicts with a ``"sentence"`` key.
        max_items: Maximum number of items to return; zero or less yields
            an empty list.

    Returns:
        The selected items (the original dict objects), in input order.
    """
    # Without this guard the loop below would still append one item before
    # checking the limit.
    if max_items <= 0:
        return []

    selected = []
    seen_meanings = []

    for item in evidence_items:
        sentence = clean_sentence_text(item["sentence"])
        if is_repetitive_meaning(sentence, seen_meanings):
            continue
        selected.append(item)
        seen_meanings.append(sentence)
        if len(selected) >= max_items:
            break

    return selected
def is_repetitive_meaning(
    sentence: str,
    existing_sentences: List[str],
    threshold: float = 0.65
) -> bool:
    """Tell whether ``sentence`` largely repeats an already-seen sentence.

    Similarity is the Jaccard index (shared tokens / all tokens) of the
    normalised token sets.

    Args:
        sentence: Candidate sentence.
        existing_sentences: Sentences already accepted.
        threshold: Similarity at or above which the candidate counts as a
            repeat (default 0.65, the previous fixed value).

    Returns:
        ``True`` when the candidate has no tokens after normalisation (so
        content-free sentences are skipped by callers), or when it reaches
        ``threshold`` against any existing sentence; otherwise ``False``.
    """
    current_tokens = set(normalize_text(sentence).split())
    if not current_tokens:
        return True

    for existing in existing_sentences:
        existing_tokens = set(normalize_text(existing).split())
        if not existing_tokens:
            continue
        # Both sets are non-empty here, so the union is never empty and the
        # division is safe.
        overlap = len(current_tokens & existing_tokens)
        union = len(current_tokens | existing_tokens)
        if overlap / union >= threshold:
            return True

    return False
def normalize_text(text: str) -> str:
    """Normalise text for token-overlap comparison.

    Lower-cases the text, replaces punctuation (including underscores) with
    spaces, drops the boilerplate words "ideal", "answer", "question",
    "chapter" and "page", and collapses whitespace.

    Letters and digits from any script are kept. An ASCII-only filter would
    reduce non-English sentences to an empty string.

    Args:
        text: Raw sentence text.

    Returns:
        The normalised, single-spaced string (possibly empty).
    """
    text = text.lower()
    # \w is Unicode-aware for str patterns; underscores are part of \w, so
    # they are removed explicitly.
    text = re.sub(r"[^\w\s]|_", " ", text)
    text = re.sub(r"\b(ideal|answer|question|chapter|page)\b", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()
def clean_final_answer(answer: str) -> str:
    """Strip dataset boilerplate and tidy whitespace in a final answer.

    Removes "Ideal Answer" labels (with an optional following colon) and
    "Q<number>:" prefixes, collapses every whitespace run to a single space,
    and removes the space before full stops and commas.

    NOTE(review): collapsing whitespace also flattens newlines, so numbered
    multi-line answers become a single line. Kept as-is; confirm this is
    intended.

    Args:
        answer: Answer text; ``None`` or empty yields ``""``.

    Returns:
        The cleaned, stripped answer.
    """
    if not answer:
        return ""

    # The optional colon is consumed too, so "Ideal Answer: ..." does not
    # leave a dangling ":" at the start of the answer.
    cleaned = re.sub(r"\bIdeal Answer\b\s*:?", "", answer, flags=re.IGNORECASE)
    cleaned = re.sub(r"\bQ\d+\s*:\s*", "", cleaned, flags=re.IGNORECASE)
    cleaned = re.sub(r"\s+", " ", cleaned)
    cleaned = cleaned.replace(" .", ".")
    cleaned = cleaned.replace(" ,", ",")
    return cleaned.strip()
def source_id_to_bracket(source_id: Optional[str]) -> str:
    """Format a source ID as a bracketed citation such as ``[S1]``.

    Args:
        source_id: The source ID. Non-string values are converted with
            ``str`` and surrounding whitespace is trimmed.

    Returns:
        ``""`` for a missing or blank ID; the ID unchanged when it is
        already wrapped in square brackets; otherwise the ID in brackets.
    """
    if not source_id:
        return ""

    # str() keeps integer IDs from failing on .startswith().
    text = str(source_id).strip()
    if not text:
        return ""
    if text.startswith("[") and text.endswith("]"):
        return text
    return f"[{text}]"