# classics-rag-qa / src/compose.py
# Uploaded via huggingface_hub (revision 46fa8d2, verified)
"""
Compose a grounded answer from retrieved chunks with verbatim quotes + citations.
This module defines a *deterministic, reproducible* pipeline that never invents facts.
"""
from typing import List, Dict
import re
STOP_WORDS = {
"the", "and", "a", "an", "of", "to", "in", "for", "on", "at", "with",
"about", "by", "from", "is", "it", "this", "that", "these", "those",
"be", "was", "were", "am", "are", "as", "or", "if", "but", "what",
"which", "who", "whom", "when", "where", "why", "how", "does", "do",
"did", "can", "could", "would", "should", "may", "might", "so"
}
def _tokenize(text: str) -> List[str]:
"""Lowercase tokenization with stop-word removal."""
tokens = re.findall(r"\b\w+\b", text.lower())
return [tok for tok in tokens if tok not in STOP_WORDS]
def segment_sentences(text: str) -> List[str]:
    """
    Split *text* into sentences on terminal punctuation (. ! ?).

    Only sentences of at least 20 characters are kept, filtering out
    low-signal fragments. If nothing survives the filter, the full text
    is returned as a single-element fallback.
    """
    # Splitting on a capturing group keeps the punctuation runs, so the
    # result alternates [sentence, punct, sentence, punct, ..., tail].
    parts = re.split(r'([.!?]+)', text)
    result = []
    for i in range(0, len(parts) - 1, 2):
        # Re-attach each sentence to its trailing punctuation run.
        sentence = (parts[i] + parts[i + 1]).strip()
        if len(sentence) >= 20:
            result.append(sentence)
    # Bug fix: a trailing fragment with no terminal punctuation (odd-length
    # split result) was previously discarded; keep it under the same filter.
    if len(parts) % 2 == 1:
        tail = parts[-1].strip()
        if len(tail) >= 20:
            result.append(tail)
    return result if result else [text]  # Fallback to full text if no sentences found
def score_sentence(query: str, sentence: str, sent_vec=None) -> float:  # sent_vec reserved for future embedding-based scoring
    """
    Score how well *sentence* supports *query*, returning a value in [0, 1].

    Combines token overlap (query coverage and sentence precision), an
    exact-phrase bonus, a proper-noun bonus, and penalties for sentences
    that match only names or miss every focus term. Purely lexical to stay
    deterministic/offline.
    """
    query_tokens = _tokenize(query)
    sentence_tokens = _tokenize(sentence)
    if not query_tokens or not sentence_tokens:
        return 0.0
    q_set = set(query_tokens)
    s_set = set(sentence_tokens)
    overlap = q_set & s_set
    coverage = len(overlap) / len(q_set)    # fraction of the query covered
    precision = len(overlap) / len(s_set)   # fraction of the sentence on-topic
    # Favor sentences that contain the unmodified question focus
    normalized_query = re.sub(r"[^a-z0-9\s]", " ", query.lower()).strip()
    phrase_bonus = 0.0
    if normalized_query and normalized_query in sentence.lower():
        phrase_bonus = 0.2
    # Reward matching proper nouns (character names, places, etc.)
    query_propers = {w.lower() for w in re.findall(r"\b[A-Z][a-z]+\b", query)}
    sent_lower = sentence.lower()
    proper_bonus = 0.0
    if query_propers:
        matches = sum(1 for name in query_propers if name in sent_lower)
        if matches:
            proper_bonus = min(0.2, matches * 0.05)
    # Non-name query terms; computed once and reused below (the original
    # computed this identical set twice).
    content_terms = q_set - query_propers
    # Penalize sentences that only match names but not topical words
    if overlap and content_terms and not (overlap & content_terms):
        name_only_penalty = 0.4
    else:
        name_only_penalty = 1.0
    # Focus terms: longer (>= 4 chars) non-name words that carry the topic;
    # fall back to the whole query vocabulary when every token is a name.
    focus_pool = content_terms if content_terms else q_set
    focus_terms = {tok for tok in focus_pool if len(tok) >= 4}
    # Penalize (but do not zero out) sentences missing every focus term.
    focus_penalty = 0.3 if (focus_terms and not (focus_terms & s_set)) else 1.0
    # Prefer sentences roughly tweet-length (avoid super short/long)
    length_bonus = 0.1 if 60 <= len(sentence) <= 280 else 0.0
    raw_score = (coverage * 0.6) + (precision * 0.3) + phrase_bonus + proper_bonus + length_bonus
    score = raw_score * name_only_penalty * focus_penalty
    return max(0.0, min(1.0, score))
def select_quotes(query: str, retrieved: List[Dict], n: int = 3) -> List[Dict]:
    """
    Pick up to *n* supporting quotes from the retrieved chunks.

    Every chunk is segmented into sentences and each sentence scored
    against the query; low-signal lines are dropped, and near-duplicate
    sentences from the same chunk are skipped to keep evidence diverse.
    """
    min_score = 0.05  # permissive cutoff so borderline evidence survives
    candidates = []
    for chunk in retrieved:
        body = chunk.get('text', '')
        if not body:
            continue
        for sent in segment_sentences(body):
            sent_score = score_sentence(query, sent)
            if sent_score >= min_score:
                candidates.append({
                    'text': sent.strip(),
                    'score': sent_score,
                    'chunk_id': chunk.get('chunk_id', ''),
                    'cite': chunk.get('meta', {}),
                })
    # Best-scoring sentences first.
    candidates.sort(key=lambda c: c['score'], reverse=True)

    def _near_duplicate(left: str, right: str) -> bool:
        # Jaccard similarity over stop-word-free tokens; > 0.8 counts as dup.
        left_tokens = set(_tokenize(left))
        right_tokens = set(_tokenize(right))
        if not left_tokens or not right_tokens:
            return False
        return len(left_tokens & right_tokens) / len(left_tokens | right_tokens) > 0.8

    selected = []
    for cand in candidates:
        if len(selected) >= n:
            break
        duplicate = any(
            cand['chunk_id'] == kept['chunk_id']
            and _near_duplicate(cand['text'], kept['text'])
            for kept in selected
        )
        if not duplicate:
            selected.append(cand)
    # Fallback: if the diversity pass removed everything, keep the best
    # sentences regardless.
    if not selected and candidates:
        selected = candidates[:n]
    # Final fallback: lead sentence of each retrieved chunk.
    if not selected and retrieved:
        for chunk in retrieved[:n]:
            body = chunk.get('text', '')
            if not body:
                continue
            sentences = segment_sentences(body)
            if sentences:
                selected.append({
                    'text': sentences[0].strip(),
                    'score': 0.1,  # nominal score, still surfaced to the UI
                    'chunk_id': chunk.get('chunk_id', ''),
                    'cite': chunk.get('meta', {}),
                })
    return selected[:n]
def synthesize_answer(query: str, quotes: List[Dict]) -> str:
    """
    Build a short, deterministic answer string from the selected quotes.

    Output follows a fixed template: an intro echoing the question, one
    numbered bullet per quote (with an optional book/paragraph location),
    and a closing line that ties the evidence together.
    """
    if not quotes:
        return "I couldn't find relevant information to answer this question."
    topic = query.strip().rstrip("?")
    lines = []
    for idx, quote in enumerate(quotes, 1):
        # Collapse internal whitespace, then truncate long snippets.
        snippet = " ".join(quote['text'].split())
        if len(snippet) > 200:
            snippet = snippet[:200].rstrip() + "..."
        meta = quote.get('cite') or {}
        location = ""
        if meta:
            book = meta.get('book')
            start = meta.get('para_idx_start')
            end = meta.get('para_idx_end')
            # Only show a location when all three fields are present.
            if book and start is not None and end is not None:
                location = f" ({book.title()}, paragraphs {start}-{end})"
        lines.append(f"[{idx}] {snippet}{location}")
    intro = f"Here's what the text says about “{topic}”:"
    body = "\n".join(lines)
    closing = "Together these cited passages directly answer the question."
    return f"{intro}\n\n{body}\n\n{closing}"
def render_citations(quotes: List[Dict]) -> List[str]:
    """
    Build the human-readable citations list for the UI.

    Each entry has the shape "[n] snippet — Book, paragraphs a-b"; snippets
    over 200 characters are truncated with an ellipsis. Missing metadata
    falls back to "unknown" / "?" placeholders.
    """
    rendered = []
    for idx, quote in enumerate(quotes, 1):
        snippet = quote['text']
        if len(snippet) > 200:
            snippet = snippet[:200] + "..."
        meta = quote.get('cite', {})
        book = meta.get('book', 'unknown')
        start = meta.get('para_idx_start', '?')
        end = meta.get('para_idx_end', '?')
        rendered.append(
            f"[{idx}] {snippet} — {book.title()}, paragraphs {start}-{end}"
        )
    return rendered
def compose_answer(query: str, retrieved: List[Dict], max_quotes: int = 3) -> Dict:
    """
    Top-level composition entrypoint used by the app layer.

    Selects quotes from the retrieved chunks, synthesizes the answer text,
    and renders the citations block; returns the structured payload the UI
    expects ('answer', 'quotes', 'references').
    """
    if not retrieved:
        # Nothing retrieved at all — return an empty-but-well-formed payload.
        return {
            'answer': "I couldn't find any relevant information to answer this question.",
            'quotes': [],
            'references': [],
        }
    quotes = select_quotes(query, retrieved, n=max_quotes)
    return {
        'answer': synthesize_answer(query, quotes),
        'quotes': quotes,
        'references': render_citations(quotes),
    }