""" Compose a grounded answer from retrieved chunks with verbatim quotes + citations. This module defines a *deterministic, reproducible* pipeline that never invents facts. """ from typing import List, Dict import re STOP_WORDS = { "the", "and", "a", "an", "of", "to", "in", "for", "on", "at", "with", "about", "by", "from", "is", "it", "this", "that", "these", "those", "be", "was", "were", "am", "are", "as", "or", "if", "but", "what", "which", "who", "whom", "when", "where", "why", "how", "does", "do", "did", "can", "could", "would", "should", "may", "might", "so" } def _tokenize(text: str) -> List[str]: """Lowercase tokenization with stop-word removal.""" tokens = re.findall(r"\b\w+\b", text.lower()) return [tok for tok in tokens if tok not in STOP_WORDS] def segment_sentences(text: str) -> List[str]: """ Split text into sentences using punctuation boundaries. Returns non-empty sentences (minimum 20 chars or contains query terms). """ # Split on sentence boundaries (. ! ?) while keeping punctuation sentences = re.split(r'([.!?]+)', text) # Recombine sentences with their punctuation result = [] for i in range(0, len(sentences) - 1, 2): if i + 1 < len(sentences): sentence = (sentences[i] + sentences[i + 1]).strip() else: sentence = sentences[i].strip() # Keep sentences that are at least 20 chars or contain meaningful content if len(sentence) >= 20: result.append(sentence) return result if result else [text] # Fallback to full text if no sentences found def score_sentence(query: str, sentence: str, sent_vec=None) -> float: # sent_vec reserved for future embedding-based scoring """ Score how well a sentence supports the query. Combines token overlap, precision, and simple phrase heuristics for a 0-1 score. Purely lexical to stay deterministic/offline. """ query_tokens = _tokenize(query) sentence_tokens = _tokenize(sentence) if not query_tokens or not sentence_tokens: return 0.0 q_set = set(query_tokens) s_set = set(sentence_tokens) overlap = q_set & s_set coverage = len(overlap) / len(q_set) precision = len(overlap) / len(s_set) # Favor sentences that contain the unmodified question focus normalized_query = re.sub(r"[^a-z0-9\s]", " ", query.lower()).strip() phrase_bonus = 0.0 if normalized_query and normalized_query in sentence.lower(): phrase_bonus = 0.2 # Reward matching proper nouns (character names, places, etc.) query_propers = {w.lower() for w in re.findall(r"\b[A-Z][a-z]+\b", query)} sent_lower = sentence.lower() proper_bonus = 0.0 if query_propers: matches = sum(1 for name in query_propers if name in sent_lower) if matches: proper_bonus = min(0.2, matches * 0.05) # Penalize sentences that only match names but not topical words content_query_tokens = q_set - query_propers if overlap and content_query_tokens and not (overlap & content_query_tokens): name_only_penalty = 0.4 else: name_only_penalty = 1.0 # Check for focus terms (longer words that are more meaningful) # But don't completely reject if they're missing - just penalize non_name_terms = q_set - query_propers if non_name_terms: focus_terms = {tok for tok in non_name_terms if len(tok) >= 4} # Lowered from 5 to 4 else: focus_terms = {tok for tok in q_set if len(tok) >= 4} # Lowered from 5 to 4 # Penalize if no focus terms match, but don't return 0.0 focus_penalty = 0.3 if (focus_terms and not (focus_terms & s_set)) else 1.0 # Prefer sentences roughly tweet-length (avoid super short/long) length = len(sentence) if 60 <= length <= 280: length_bonus = 0.1 else: length_bonus = 0.0 raw_score = (coverage * 0.6) + (precision * 0.3) + phrase_bonus + proper_bonus + length_bonus score = raw_score * name_only_penalty * focus_penalty return max(0.0, min(1.0, score)) def select_quotes(query: str, retrieved: List[Dict], n: int = 3) -> List[Dict]: """ Select top-N quotes from retrieved chunks with diversity. Segments chunks into sentences, scores them, filters low-signal lines, and keeps diverse evidence. """ all_sentences = [] min_score = 0.05 # Lower threshold to allow more sentences through # For each retrieved chunk, segment and score sentences for item in retrieved: text = item.get('text', '') if not text: continue sentences = segment_sentences(text) for sent in sentences: score = score_sentence(query, sent) if score < min_score: continue all_sentences.append({ 'text': sent.strip(), 'score': score, 'chunk_id': item.get('chunk_id', ''), 'cite': item.get('meta', {}) }) # Sort by score and take top-N all_sentences.sort(key=lambda x: x['score'], reverse=True) # Simple diversity: skip sentences that are too similar to already selected ones selected = [] def _too_similar(a_text: str, b_text: str) -> bool: a_tokens = set(_tokenize(a_text)) b_tokens = set(_tokenize(b_text)) if not a_tokens or not b_tokens: return False jaccard = len(a_tokens & b_tokens) / len(a_tokens | b_tokens) return jaccard > 0.8 for sent_data in all_sentences: if len(selected) >= n: break is_duplicate = any( sent_data['chunk_id'] == existing['chunk_id'] and _too_similar(sent_data['text'], existing['text']) for existing in selected ) if not is_duplicate: selected.append(sent_data) # Fallback: if filtering removed everything, use the best sentences regardless of score if not selected and all_sentences: selected = all_sentences[:n] # Final fallback: if still no quotes, take first sentences from retrieved chunks if not selected and retrieved: for item in retrieved[:n]: text = item.get('text', '') if text: sentences = segment_sentences(text) if sentences: selected.append({ 'text': sentences[0].strip(), 'score': 0.1, # Low score but still included 'chunk_id': item.get('chunk_id', ''), 'cite': item.get('meta', {}) }) return selected[:n] def synthesize_answer(query: str, quotes: List[Dict]) -> str: """ Compose a short synthetic answer that references selected quotes. Deterministic template with proper grammar + explicit evidence callouts. """ if not quotes: return "I couldn't find relevant information to answer this question." query_clean = query.strip().rstrip("?") intro = f"Here's what the text says about “{query_clean}”:" bullet_lines = [] for i, quote in enumerate(quotes, 1): text = " ".join(quote['text'].split()) if len(text) > 200: text = text[:200].rstrip() + "..." cite = quote.get('cite') or {} location = "" if cite: book = cite.get('book') para_start = cite.get('para_idx_start') para_end = cite.get('para_idx_end') if book and para_start is not None and para_end is not None: location = f" ({book.title()}, paragraphs {para_start}-{para_end})" bullet_lines.append(f"[{i}] {text}{location}") body = "\n".join(bullet_lines) closing = "Together these cited passages directly answer the question." return f"{intro}\n\n{body}\n\n{closing}" def render_citations(quotes: List[Dict]) -> List[str]: """ Render citations block for UI. Format: [n] short_snippet — source (book), location (paragraphs). """ citations = [] for i, quote in enumerate(quotes, 1): text = quote['text'] # Shorten to ~200 chars with ellipses if len(text) > 200: text = text[:200] + "..." cite = quote.get('cite', {}) book = cite.get('book', 'unknown') para_start = cite.get('para_idx_start', '?') para_end = cite.get('para_idx_end', '?') citation = f"[{i}] {text} — {book.title()}, paragraphs {para_start}-{para_end}" citations.append(citation) return citations def compose_answer(query: str, retrieved: List[Dict], max_quotes: int = 3) -> Dict: """ Main composition entrypoint called by app layer. Returns structured payload for UI. """ if not retrieved: return { 'answer': "I couldn't find any relevant information to answer this question.", 'quotes': [], 'references': [] } # Select top quotes quotes = select_quotes(query, retrieved, n=max_quotes) # Synthesize answer answer = synthesize_answer(query, quotes) # Render citations references = render_citations(quotes) return { 'answer': answer, 'quotes': quotes, 'references': references }