from __future__ import annotations

import json
import logging
import re
from collections.abc import Iterable
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# Patterns are compiled once; they are used on every normalization call.
_WHITESPACE_RE = re.compile(r"\s+")
_PUNCT_SPACING_RE = re.compile(r"\s*([=+\-*/:,;()])\s*")
_TOKEN_RE = re.compile(r"[a-z0-9%/.]+")

# Typographic characters folded to their ASCII equivalents before matching.
_CHAR_FOLD = str.maketrans({"’": "'", "“": '"', "”": '"', "−": "-", "–": "-"})


class QuestionSupportBank:
    """Load and retrieve authored question support entries with strong matching.

    Entries are dicts read from a JSONL file (one JSON object per line) or
    added through :meth:`upsert`.  The question stem is read from the
    ``question_text`` key (falling back to ``stem``) and the answer choices
    from ``options_text`` (falling back to ``choices``).

    :meth:`get` tries, in order: question id, stem + ordered choices,
    stem + unordered choices, stem only, and finally a fuzzy score over all
    stored entries.
    """

    def __init__(self, data_path: Optional[str] = None) -> None:
        """Create a bank; the data file is read lazily on first access.

        Args:
            data_path: Path to the JSONL file.  Defaults to
                ``data/question_support_bank.jsonl`` next to this module.
        """
        base_dir = Path(__file__).resolve().parent
        self.data_path = (
            Path(data_path)
            if data_path
            else base_dir / "data" / "question_support_bank.jsonl"
        )
        self._loaded = False
        self._items: List[Dict[str, Any]] = []
        self._by_id: Dict[str, Dict[str, Any]] = {}
        self._by_text: Dict[str, Dict[str, Any]] = {}
        self._by_signature: Dict[str, Dict[str, Any]] = {}
        self._by_unordered_signature: Dict[str, Dict[str, Any]] = {}

    # ------------------------------------------------------------------
    # Normalization helpers
    # ------------------------------------------------------------------
    def _normalize(self, text: Optional[str]) -> str:
        """Return *text* lower-cased, ASCII-folded and whitespace-collapsed.

        Whitespace around ``= + - * / : , ; ( )`` is removed so that
        ``"2 + 2"`` and ``"2+2"`` compare equal.  Falsy input yields ``""``.
        """
        raw = text or ""
        if not isinstance(raw, str):
            # Data files may carry numeric stems; avoid AttributeError.
            raw = str(raw)
        cleaned = raw.strip().lower().translate(_CHAR_FOLD)
        cleaned = _WHITESPACE_RE.sub(" ", cleaned)
        cleaned = _PUNCT_SPACING_RE.sub(r"\1", cleaned)
        return cleaned.strip()

    def _tokenize(self, text: Optional[str]) -> List[str]:
        """Split normalized *text* into runs of ``[a-z0-9%/.]``."""
        return _TOKEN_RE.findall(self._normalize(text))

    def _normalize_choice(self, value: Any) -> str:
        """Normalize a single answer choice; ``None`` becomes ``""``."""
        return self._normalize(str(value) if value is not None else "")

    def _coerce_choices(self, choices: Optional[List[Any]]) -> List[str]:
        """Return the non-empty normalized choices, preserving order."""
        if not choices:
            return []
        if isinstance(choices, str) or not isinstance(choices, Iterable):
            # A lone scalar is one choice; iterating a str would split it
            # into characters and a non-iterable would raise TypeError.
            choices = [choices]
        normalized = (self._normalize_choice(choice) for choice in choices)
        return [choice for choice in normalized if choice]

    def _choice_signature(
        self, choices: Optional[List[Any]], *, ordered: bool = True
    ) -> str:
        """Join normalized choices with ``" || "``; sort first if not *ordered*."""
        cleaned = self._coerce_choices(choices)
        if not ordered:
            cleaned = sorted(cleaned)
        return " || ".join(cleaned)

    def _question_signature(
        self,
        question_text: Optional[str],
        choices: Optional[List[Any]] = None,
        *,
        ordered: bool = True,
    ) -> str:
        """Return ``"<stem> ## <choices>"``, or just the stem without choices."""
        q = self._normalize(question_text)
        c = self._choice_signature(choices, ordered=ordered)
        return f"{q} ## {c}" if c else q

    @staticmethod
    def _stem_and_choices(item: Dict[str, Any]) -> Tuple[Any, Any]:
        """Extract the stem and choices from an entry, honoring fallback keys."""
        stem = item.get("question_text") or item.get("stem") or ""
        choices = item.get("options_text") or item.get("choices") or []
        return stem, choices

    # ------------------------------------------------------------------
    # Loading and storage
    # ------------------------------------------------------------------
    def load(self) -> None:
        """(Re)build all indexes from ``self.data_path``.

        Existing in-memory entries are discarded first.  A missing file
        yields an empty bank.  Blank lines are ignored; lines that are not
        valid JSON are skipped with a logged warning.
        """
        self._items = []
        self._by_id = {}
        self._by_text = {}
        self._by_signature = {}
        self._by_unordered_signature = {}
        if self.data_path.exists():
            with self.data_path.open("r", encoding="utf-8") as handle:
                for line_no, raw_line in enumerate(handle, start=1):
                    line = raw_line.strip()
                    if not line:
                        continue
                    try:
                        item = json.loads(line)
                    except json.JSONDecodeError as exc:
                        # Best-effort load: keep going, but leave a trace.
                        logger.warning(
                            "Skipping malformed line %d in %s: %s",
                            line_no,
                            self.data_path,
                            exc,
                        )
                        continue
                    self._store_item(item)
        self._loaded = True

    def _ensure_loaded(self) -> None:
        """Load the data file on first use."""
        if not self._loaded:
            self.load()

    def _index_keys(
        self, item: Dict[str, Any]
    ) -> List[Tuple[Dict[str, Dict[str, Any]], str]]:
        """Return ``(index, key)`` pairs under which *item* is filed."""
        stem, choices = self._stem_and_choices(item)
        return [
            (self._by_text, self._normalize(stem)),
            (
                self._by_signature,
                self._question_signature(stem, choices, ordered=True),
            ),
            (
                self._by_unordered_signature,
                self._question_signature(stem, choices, ordered=False),
            ),
        ]

    def _forget(self, stale: Dict[str, Any]) -> None:
        """Remove *stale* from the item list and any index still pointing at it."""
        for index, key in self._index_keys(stale):
            # Identity check: another entry may legitimately own this key now.
            if index.get(key) is stale:
                del index[key]
        self._items = [entry for entry in self._items if entry is not stale]

    def _store_item(self, item: Dict[str, Any]) -> None:
        """File a shallow copy of *item* under every applicable index.

        Non-dict input is ignored.  If an entry with the same non-empty
        ``question_id`` already exists it is replaced rather than duplicated.
        For entries sharing a stem or signature, the most recently stored
        one wins the index slot.
        """
        if not isinstance(item, dict):
            return
        stored = dict(item)
        qid = str(stored.get("question_id") or "").strip()
        if qid:
            previous = self._by_id.get(qid)
            if previous is not None:
                self._forget(previous)
            self._by_id[qid] = stored
        for index, key in self._index_keys(stored):
            if key:
                index[key] = stored
        self._items.append(stored)

    # ------------------------------------------------------------------
    # Matching
    # ------------------------------------------------------------------
    def _candidate_stats(
        self,
        *,
        query_text: str,
        query_choices: Optional[List[Any]],
        candidate: Dict[str, Any],
    ) -> Dict[str, float]:
        """Score *candidate* against the query.

        Returns a dict with the weighted ``score`` and its components:
        ``text_exact``, ``text_ratio`` (``SequenceMatcher`` ratio),
        ``token_overlap`` (Jaccard over token sets), ``ordered_choice_match``
        and ``unordered_choice_match``.
        """
        cand_text, cand_choices = self._stem_and_choices(candidate)
        norm_query = self._normalize(query_text)
        norm_cand = self._normalize(cand_text)
        both_present = bool(norm_query and norm_cand)

        text_exact = 1.0 if both_present and norm_query == norm_cand else 0.0
        text_ratio = (
            SequenceMatcher(None, norm_query, norm_cand).ratio()
            if both_present
            else 0.0
        )

        q_tokens = set(self._tokenize(query_text))
        c_tokens = set(self._tokenize(cand_text))
        if q_tokens and c_tokens:
            token_overlap = len(q_tokens & c_tokens) / max(
                len(q_tokens | c_tokens), 1
            )
        else:
            token_overlap = 0.0

        q_sig = self._choice_signature(query_choices, ordered=True)
        c_sig = self._choice_signature(cand_choices, ordered=True)
        q_unsig = self._choice_signature(query_choices, ordered=False)
        c_unsig = self._choice_signature(cand_choices, ordered=False)
        # Empty signatures never count as a match.
        ordered_choice_match = 1.0 if q_sig and q_sig == c_sig else 0.0
        unordered_choice_match = 1.0 if q_unsig and q_unsig == c_unsig else 0.0

        score = (
            0.30 * text_exact
            + 0.28 * text_ratio
            + 0.22 * token_overlap
            + 0.12 * ordered_choice_match
            + 0.08 * unordered_choice_match
        )
        return {
            "score": score,
            "text_exact": text_exact,
            "text_ratio": text_ratio,
            "token_overlap": token_overlap,
            "ordered_choice_match": ordered_choice_match,
            "unordered_choice_match": unordered_choice_match,
        }

    def _annotate(
        self,
        item: Dict[str, Any],
        *,
        mode: str,
        stats: Optional[Dict[str, float]] = None,
    ) -> Dict[str, Any]:
        """Return a copy of *item* with a ``support_match`` dict attached.

        ``support_match`` always carries ``mode``; when *stats* is given its
        values are added, rounded to four decimals.
        """
        out = dict(item)
        out["support_match"] = {"mode": mode}
        if stats:
            out["support_match"].update(
                {name: round(value, 4) for name, value in stats.items()}
            )
        return out

    def _fuzzy_lookup(
        self, question_text: str, options_text: Optional[List[Any]]
    ) -> Optional[Dict[str, Any]]:
        """Return the best-scoring entry if it clears its acceptance threshold."""
        best_item: Optional[Dict[str, Any]] = None
        best_stats: Optional[Dict[str, float]] = None
        best_score = 0.0
        for item in self._items:
            stats = self._candidate_stats(
                query_text=question_text,
                query_choices=options_text,
                candidate=item,
            )
            # Strict ">" keeps the earliest entry on ties.
            if stats["score"] > best_score:
                best_item, best_stats, best_score = item, stats, stats["score"]
        if best_item is None or best_stats is None:
            return None

        strong_choice = (
            best_stats["ordered_choice_match"] >= 1.0
            or best_stats["unordered_choice_match"] >= 1.0
        )
        threshold = 0.70 if strong_choice else 0.82
        # Strong textual evidence relaxes the bar; only the first rule applies.
        if best_stats["text_exact"] >= 1.0:
            threshold = min(threshold, 0.55)
        elif best_stats["text_ratio"] >= 0.94:
            threshold = min(threshold, 0.68)
        elif best_stats["token_overlap"] >= 0.75:
            threshold = min(threshold, 0.74)

        if best_score >= threshold:
            return self._annotate(best_item, mode="fuzzy", stats=best_stats)
        return None

    def get(
        self,
        question_id: Optional[str] = None,
        question_text: Optional[str] = None,
        options_text: Optional[List[Any]] = None,
    ) -> Optional[Dict[str, Any]]:
        """Find the support entry for a question.

        Args:
            question_id: Identifier to look up first, if given.
            question_text: Question stem used for exact and fuzzy matching.
            options_text: Answer choices used to disambiguate equal stems.

        Returns:
            A copy of the matched entry with a ``support_match`` dict whose
            ``mode`` is one of ``question_id``, ``signature_exact``,
            ``signature_unordered``, ``text_exact`` or ``fuzzy``; ``None``
            when nothing matches.
        """
        self._ensure_loaded()

        qid = str(question_id or "").strip()
        if qid and qid in self._by_id:
            return self._annotate(self._by_id[qid], mode="question_id")

        signature = self._question_signature(
            question_text, options_text, ordered=True
        )
        if signature and signature in self._by_signature:
            return self._annotate(
                self._by_signature[signature], mode="signature_exact"
            )

        # Checked before the text-only index: when several entries share a
        # stem, matching choices (in any order) identify the right one,
        # whereas the text index only knows the most recently stored entry.
        unordered_signature = self._question_signature(
            question_text, options_text, ordered=False
        )
        if (
            unordered_signature
            and unordered_signature in self._by_unordered_signature
        ):
            return self._annotate(
                self._by_unordered_signature[unordered_signature],
                mode="signature_unordered",
            )

        qtext = self._normalize(question_text)
        if not qtext:
            return None
        if qtext in self._by_text:
            return self._annotate(self._by_text[qtext], mode="text_exact")

        return self._fuzzy_lookup(question_text or "", options_text)

    def upsert(self, item: Dict[str, Any]) -> None:
        """Insert *item*, replacing any entry with the same ``question_id``.

        The change is in-memory only; nothing is written to the data file.
        """
        self._ensure_loaded()
        self._store_item(item)

    def all_items(self) -> List[Dict[str, Any]]:
        """Return shallow copies of every stored entry."""
        self._ensure_loaded()
        return [dict(entry) for entry in self._items]


question_support_bank = QuestionSupportBank()