Spaces:
Sleeping
Sleeping
| """ | |
| Compose a grounded answer from retrieved chunks with verbatim quotes + citations. | |
| This module defines a *deterministic, reproducible* pipeline that never invents facts. | |
| """ | |
| from typing import List, Dict | |
| import re | |
# Common English function words excluded from lexical matching so that
# overlap scores reflect content-bearing terms only.
STOP_WORDS = {
    "the", "and", "a", "an", "of", "to", "in", "for", "on", "at", "with",
    "about", "by", "from", "is", "it", "this", "that", "these", "those",
    "be", "was", "were", "am", "are", "as", "or", "if", "but", "what",
    "which", "who", "whom", "when", "where", "why", "how", "does", "do",
    "did", "can", "could", "would", "should", "may", "might", "so"
}
def _tokenize(text: str) -> List[str]:
    """Break *text* into lowercase word tokens, dropping stop words."""
    lowered = text.lower()
    return [word for word in re.findall(r"\b\w+\b", lowered) if word not in STOP_WORDS]
def segment_sentences(text: str) -> List[str]:
    """
    Split *text* into sentences on punctuation boundaries (. ! ?).

    Keeps the terminal punctuation attached to each sentence and drops
    fragments shorter than 20 characters (noise such as abbreviations).
    A trailing fragment with no terminal punctuation is kept as well, so
    the last sentence of a chunk is never silently lost.  Falls back to
    the whole text when no sentence survives the length filter.
    """
    min_len = 20  # sentences shorter than this are treated as noise
    # With a capturing group, re.split alternates text/punctuation pieces
    # and always ends with a (possibly empty) trailing text remainder.
    parts = re.split(r'([.!?]+)', text)
    result = []
    for i in range(0, len(parts) - 1, 2):
        # Recombine each text piece with its terminal punctuation.
        sentence = (parts[i] + parts[i + 1]).strip()
        if len(sentence) >= min_len:
            result.append(sentence)
    # Bug fix: the original loop skipped the final fragment when the text
    # did not end with sentence punctuation, dropping real content.
    tail = parts[-1].strip() if len(parts) % 2 == 1 else ""
    if len(tail) >= min_len:
        result.append(tail)
    return result if result else [text]  # fallback: never return empty
def score_sentence(query: str, sentence: str, sent_vec=None) -> float:  # sent_vec reserved for future embedding-based scoring
    """
    Rate how strongly *sentence* supports *query* on a 0-1 scale.

    Blends token coverage and precision with small bonuses for exact-phrase
    and proper-noun matches, then applies multiplicative penalties when the
    match is names-only or misses the query's focus terms.  Purely lexical,
    so scoring stays deterministic and fully offline.
    """
    q_tokens = set(_tokenize(query))
    s_tokens = set(_tokenize(sentence))
    if not q_tokens or not s_tokens:
        return 0.0

    shared = q_tokens & s_tokens
    coverage = len(shared) / len(q_tokens)
    precision = len(shared) / len(s_tokens)

    sent_lower = sentence.lower()

    # Exact-phrase bonus: the whole (punctuation-stripped) question appears verbatim.
    stripped_query = re.sub(r"[^a-z0-9\s]", " ", query.lower()).strip()
    phrase_bonus = 0.2 if stripped_query and stripped_query in sent_lower else 0.0

    # Proper-noun bonus: capitalized query words (names, places) found in the sentence.
    proper_names = {w.lower() for w in re.findall(r"\b[A-Z][a-z]+\b", query)}
    name_hits = sum(1 for name in proper_names if name in sent_lower)
    proper_bonus = min(0.2, name_hits * 0.05) if name_hits else 0.0

    # Penalty when the only overlap is names and no topical word matched.
    topical = q_tokens - proper_names
    names_only = bool(shared) and bool(topical) and not (shared & topical)
    name_only_penalty = 0.4 if names_only else 1.0

    # Focus terms: longer (>= 4 chars) non-name words; missing all of them
    # is penalized rather than rejected outright.
    focus_pool = topical if topical else q_tokens
    focus_terms = {tok for tok in focus_pool if len(tok) >= 4}
    focus_penalty = 0.3 if focus_terms and not (focus_terms & s_tokens) else 1.0

    # Mild preference for medium-length sentences (roughly tweet-sized).
    length_bonus = 0.1 if 60 <= len(sentence) <= 280 else 0.0

    raw = coverage * 0.6 + precision * 0.3 + phrase_bonus + proper_bonus + length_bonus
    return max(0.0, min(1.0, raw * name_only_penalty * focus_penalty))
def select_quotes(query: str, retrieved: List[Dict], n: int = 3) -> List[Dict]:
    """
    Select up to *n* supporting quotes from retrieved chunks.

    Each chunk is segmented into sentences, each sentence is scored
    against the query, low-signal sentences are filtered out, and the
    top scorers are kept while skipping near-duplicates from the same
    chunk.  If scoring filters everything out, falls back to the first
    sentence of each retrieved chunk so the caller still gets evidence
    whenever any text was retrieved.
    """
    min_score = 0.05  # permissive floor: drop only clearly irrelevant sentences

    # Score every candidate sentence across all chunks.
    candidates: List[Dict] = []
    for item in retrieved:
        text = item.get('text', '')
        if not text:
            continue
        for sent in segment_sentences(text):
            score = score_sentence(query, sent)
            if score >= min_score:
                candidates.append({
                    'text': sent.strip(),
                    'score': score,
                    'chunk_id': item.get('chunk_id', ''),
                    'cite': item.get('meta', {}),
                })

    candidates.sort(key=lambda c: c['score'], reverse=True)

    def _too_similar(a_text: str, b_text: str) -> bool:
        # Jaccard similarity over content tokens; > 0.8 means near-duplicate.
        a_tokens = set(_tokenize(a_text))
        b_tokens = set(_tokenize(b_text))
        if not a_tokens or not b_tokens:
            return False
        return len(a_tokens & b_tokens) / len(a_tokens | b_tokens) > 0.8

    # Greedy top-N selection, skipping near-duplicates within the same chunk.
    selected: List[Dict] = []
    for cand in candidates:
        if len(selected) >= n:
            break
        duplicate = any(
            cand['chunk_id'] == kept['chunk_id'] and _too_similar(cand['text'], kept['text'])
            for kept in selected
        )
        if not duplicate:
            selected.append(cand)
    # NOTE: the previous intermediate fallback ("selected = all_sentences[:n]")
    # was dead code -- with any scored candidates the greedy loop always keeps
    # at least one -- so it has been removed without changing behavior.

    # Fallback: no sentence cleared the score floor, so take the first
    # sentence of each chunk so the answer can still cite real text.
    if not selected and retrieved:
        for item in retrieved[:n]:
            text = item.get('text', '')
            if not text:
                continue
            sentences = segment_sentences(text)
            if sentences:
                selected.append({
                    'text': sentences[0].strip(),
                    'score': 0.1,  # nominal score: included via fallback, not match quality
                    'chunk_id': item.get('chunk_id', ''),
                    'cite': item.get('meta', {}),
                })

    return selected[:n]
def synthesize_answer(query: str, quotes: List[Dict]) -> str:
    """
    Compose a short, deterministic answer that cites the selected quotes.

    Renders an intro echoing the question, one numbered bullet per quote
    (whitespace-collapsed, trimmed to ~200 chars, with book/paragraph
    location when the citation metadata is complete), and a fixed closing
    line.  Purely template-based: the answer can never contain facts that
    are absent from the quotes.
    """
    if not quotes:
        return "I couldn't find relevant information to answer this question."

    query_clean = query.strip().rstrip("?")
    # Curly quotes around the echoed question (repairs mojibake characters
    # that had corrupted this template).
    intro = f"Here's what the text says about \u201c{query_clean}\u201d:"

    bullet_lines = []
    for i, quote in enumerate(quotes, 1):
        # Collapse internal whitespace so multi-line quotes render on one line.
        text = " ".join(quote['text'].split())
        if len(text) > 200:
            text = text[:200].rstrip() + "..."
        cite = quote.get('cite') or {}  # tolerate cite=None
        location = ""
        if cite:
            book = cite.get('book')
            para_start = cite.get('para_idx_start')
            para_end = cite.get('para_idx_end')
            # Only render a location when every field is present.
            if book and para_start is not None and para_end is not None:
                location = f" ({book.title()}, paragraphs {para_start}-{para_end})"
        bullet_lines.append(f"[{i}] {text}{location}")

    body = "\n".join(bullet_lines)
    closing = "Together these cited passages directly answer the question."
    return f"{intro}\n\n{body}\n\n{closing}"
def render_citations(quotes: List[Dict]) -> List[str]:
    """
    Render a numbered citations block for the UI.

    Format: ``[n] snippet \u2014 Book, paragraphs start-end``.  Snippets are
    truncated to 200 characters; missing metadata degrades to
    "Unknown" / "?" placeholders instead of raising.
    """
    citations = []
    for i, quote in enumerate(quotes, 1):
        text = quote['text']
        # Shorten long quotes to ~200 chars with an ellipsis.
        if len(text) > 200:
            text = text[:200] + "..."
        # `or {}` / `or 'unknown'` guard against cite=None and falsy book
        # values (matches the defensive handling in synthesize_answer).
        cite = quote.get('cite') or {}
        book = cite.get('book') or 'unknown'
        para_start = cite.get('para_idx_start', '?')
        para_end = cite.get('para_idx_end', '?')
        citations.append(f"[{i}] {text} \u2014 {book.title()}, paragraphs {para_start}-{para_end}")
    return citations
def compose_answer(query: str, retrieved: List[Dict], max_quotes: int = 3) -> Dict:
    """
    Top-level composition entrypoint used by the app layer.

    Selects quotes from *retrieved*, synthesizes a grounded answer, and
    renders a citations block, returning everything as one structured
    payload for the UI.
    """
    if not retrieved:
        # Nothing retrieved: return an honest empty payload.
        return {
            'answer': "I couldn't find any relevant information to answer this question.",
            'quotes': [],
            'references': []
        }

    quotes = select_quotes(query, retrieved, n=max_quotes)
    return {
        'answer': synthesize_answer(query, quotes),
        'quotes': quotes,
        'references': render_citations(quotes)
    }