| from __future__ import annotations |
|
|
| import json |
| import re |
| from difflib import SequenceMatcher |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
|
|
class QuestionSupportBank:
    """Load and retrieve authored question support entries with strong matching.

    Entries are plain dicts, read lazily from a JSON-lines file (one JSON
    object per line) and/or added at runtime through :meth:`upsert`.  The
    question stem is taken from ``question_text`` (falling back to ``stem``)
    and the answer choices from ``options_text`` (falling back to
    ``choices``).

    Every entry is indexed by question id, by normalized stem, and by
    "signature" (normalized stem plus normalized choices, in both the given
    and the sorted order).  When several entries share an index key, the most
    recently stored entry owns that key.
    """

    # Typographic punctuation folded to ASCII before comparison.
    _PUNCT_TABLE = str.maketrans(
        {
            "\u2018": "'",  # left single quotation mark
            "\u2019": "'",  # right single quotation mark
            "\u201c": '"',  # left double quotation mark
            "\u201d": '"',  # right double quotation mark
            "\u2212": "-",  # minus sign
            "\u2013": "-",  # en dash
            "\u2014": "-",  # em dash
        }
    )
    _WHITESPACE_RE = re.compile(r"\s+")
    _OPERATOR_SPACING_RE = re.compile(r"\s*([=+\-*/:,;()])\s*")
    _TOKEN_RE = re.compile(r"[a-z0-9%/.]+")

    def __init__(self, data_path: Optional[str] = None) -> None:
        """Create an empty, not-yet-loaded bank.

        Args:
            data_path: Path of the JSON-lines data file.  When omitted,
                ``data/question_support_bank.jsonl`` next to this module is
                used.  The file is not read until the first call that needs
                the data.
        """
        base_dir = Path(__file__).resolve().parent
        self.data_path = Path(data_path) if data_path else base_dir / "data" / "question_support_bank.jsonl"
        self._loaded = False
        self._items: List[Dict[str, Any]] = []
        self._by_id: Dict[str, Dict[str, Any]] = {}
        self._by_text: Dict[str, Dict[str, Any]] = {}
        self._by_signature: Dict[str, Dict[str, Any]] = {}
        self._by_unordered_signature: Dict[str, Dict[str, Any]] = {}

    # ------------------------------------------------------------------
    # Normalization helpers
    # ------------------------------------------------------------------

    def _normalize(self, text: Optional[str]) -> str:
        """Return *text* in canonical form for comparison.

        The text is stripped and lower-cased, curly quotes and dash variants
        are folded to ASCII, whitespace runs collapse to one space, and any
        whitespace around ``= + - * / : , ; ( )`` is removed.  ``None``
        yields ``""``; other non-string values are converted with ``str``.
        """
        if not isinstance(text, str):
            # Data files may carry numeric stems; never crash on them.
            text = "" if text is None else str(text)
        cleaned = text.strip().lower().translate(self._PUNCT_TABLE)
        cleaned = self._WHITESPACE_RE.sub(" ", cleaned)
        cleaned = self._OPERATOR_SPACING_RE.sub(r"\1", cleaned)
        return cleaned.strip()

    def _tokenize(self, text: Optional[str]) -> List[str]:
        """Split the normalized *text* into runs of ``a-z 0-9 % / .``."""
        return self._TOKEN_RE.findall(self._normalize(text))

    def _normalize_choice(self, value: Any) -> str:
        """Normalize one answer choice of any type (``None`` becomes ``""``)."""
        return self._normalize(str(value) if value is not None else "")

    def _coerce_choices(self, choices: Optional[List[Any]]) -> List[str]:
        """Return the normalized, non-empty choices in their original order."""
        if not choices:
            return []
        if isinstance(choices, str):
            # A bare string is one choice, not a sequence of characters.
            choices = [choices]
        normalized = (self._normalize_choice(choice) for choice in choices)
        return [choice for choice in normalized if choice]

    def _choice_signature(self, choices: Optional[List[Any]], *, ordered: bool = True) -> str:
        """Join the normalized choices with ``" || "``.

        With ``ordered=False`` the choices are sorted first, so two choice
        lists that differ only in order produce the same signature.
        """
        cleaned = self._coerce_choices(choices)
        if not ordered:
            cleaned = sorted(cleaned)
        return " || ".join(cleaned)

    def _question_signature(
        self,
        question_text: Optional[str],
        choices: Optional[List[Any]] = None,
        *,
        ordered: bool = True,
    ) -> str:
        """Return ``"<stem> ## <choices>"``, or just the stem without choices."""
        stem = self._normalize(question_text)
        choice_part = self._choice_signature(choices, ordered=ordered)
        return f"{stem} ## {choice_part}" if choice_part else stem

    # ------------------------------------------------------------------
    # Loading and storage
    # ------------------------------------------------------------------

    def load(self) -> None:
        """(Re)load all entries from ``self.data_path``, discarding current ones.

        Blank lines are skipped.  Lines that are not valid JSON, and JSON
        values that are not objects, are ignored on purpose so that one bad
        line does not make the whole bank unavailable.  A missing path (or a
        path that is not a regular file) results in an empty bank.
        """
        self._items = []
        self._rebuild_indexes()

        # is_file() rather than exists(): a directory must not reach open().
        if self.data_path.is_file():
            with self.data_path.open("r", encoding="utf-8") as handle:
                for raw_line in handle:
                    line = raw_line.strip()
                    if not line:
                        continue
                    try:
                        item = json.loads(line)
                    except json.JSONDecodeError:
                        continue  # best-effort: skip malformed lines
                    self._store_item(item)

        self._loaded = True

    def _ensure_loaded(self) -> None:
        """Load the data file on first use."""
        if not self._loaded:
            self.load()

    def _index_item(self, stored: Dict[str, Any]) -> None:
        """Register *stored* under every non-empty lookup key it produces."""
        stem = stored.get("question_text") or stored.get("stem") or ""
        choices = stored.get("options_text") or stored.get("choices") or []
        qid = str(stored.get("question_id") or "").strip()

        keyed_indexes = (
            (self._by_id, qid),
            (self._by_text, self._normalize(stem)),
            (self._by_signature, self._question_signature(stem, choices, ordered=True)),
            (self._by_unordered_signature, self._question_signature(stem, choices, ordered=False)),
        )
        for index, key in keyed_indexes:
            if key:
                index[key] = stored

    def _rebuild_indexes(self) -> None:
        """Recreate all lookup indexes from ``self._items`` (later entries win)."""
        self._by_id = {}
        self._by_text = {}
        self._by_signature = {}
        self._by_unordered_signature = {}
        for entry in self._items:
            self._index_item(entry)

    def _store_item(self, item: Dict[str, Any]) -> None:
        """Store a shallow copy of *item*, replacing any entry with the same id.

        Non-dict values are ignored.  When an entry with the same non-empty
        ``question_id`` already exists it is removed first, and the indexes
        are rebuilt so that no text/signature key keeps pointing at the
        superseded entry.
        """
        if not isinstance(item, dict):
            return
        stored = dict(item)
        qid = str(stored.get("question_id") or "").strip()
        previous = self._by_id.get(qid) if qid else None

        if previous is None:
            self._items.append(stored)
            self._index_item(stored)
            return

        # Replace: drop the old entry by identity, then re-derive every index
        # so keys the old entry owned fall back to older entries (or vanish).
        self._items = [kept for kept in self._items if kept is not previous]
        self._items.append(stored)
        self._rebuild_indexes()

    # ------------------------------------------------------------------
    # Matching
    # ------------------------------------------------------------------

    def _candidate_stats(
        self,
        *,
        query_text: str,
        query_choices: Optional[List[Any]],
        candidate: Dict[str, Any],
    ) -> Dict[str, float]:
        """Score how well *candidate* matches the query.

        Returns a dict with the individual signals (each in ``[0, 1]``) and
        their weighted sum under ``"score"``:

        * ``text_exact`` (0.30) - normalized stems are identical
        * ``text_ratio`` (0.28) - ``SequenceMatcher`` ratio of the stems
        * ``token_overlap`` (0.22) - Jaccard overlap of the stem token sets
        * ``ordered_choice_match`` (0.12) - choices equal, same order
        * ``unordered_choice_match`` (0.08) - choices equal, any order
        """
        cand_text = candidate.get("question_text") or candidate.get("stem") or ""
        cand_choices = candidate.get("options_text") or candidate.get("choices") or []

        norm_query = self._normalize(query_text)
        norm_cand = self._normalize(cand_text)
        both_present = bool(norm_query and norm_cand)
        text_exact = 1.0 if both_present and norm_query == norm_cand else 0.0
        text_ratio = SequenceMatcher(None, norm_query, norm_cand).ratio() if both_present else 0.0

        # Tokenize the already-normalized stems instead of normalizing again.
        q_tokens = set(self._TOKEN_RE.findall(norm_query))
        c_tokens = set(self._TOKEN_RE.findall(norm_cand))
        if q_tokens and c_tokens:
            token_overlap = len(q_tokens & c_tokens) / max(len(q_tokens | c_tokens), 1)
        else:
            token_overlap = 0.0

        q_sig = self._choice_signature(query_choices, ordered=True)
        c_sig = self._choice_signature(cand_choices, ordered=True)
        q_unsig = self._choice_signature(query_choices, ordered=False)
        c_unsig = self._choice_signature(cand_choices, ordered=False)
        ordered_choice_match = 1.0 if q_sig and c_sig and q_sig == c_sig else 0.0
        unordered_choice_match = 1.0 if q_unsig and c_unsig and q_unsig == c_unsig else 0.0

        score = (
            0.30 * text_exact
            + 0.28 * text_ratio
            + 0.22 * token_overlap
            + 0.12 * ordered_choice_match
            + 0.08 * unordered_choice_match
        )
        return {
            "score": score,
            "text_exact": text_exact,
            "text_ratio": text_ratio,
            "token_overlap": token_overlap,
            "ordered_choice_match": ordered_choice_match,
            "unordered_choice_match": unordered_choice_match,
        }

    @staticmethod
    def _fuzzy_threshold(stats: Dict[str, float]) -> float:
        """Return the minimum score at which a fuzzy candidate is accepted.

        The base threshold is 0.70 when the choices match (in any order) and
        0.82 otherwise; it is lowered further by the strongest text signal.
        """
        strong_choice = stats["ordered_choice_match"] >= 1.0 or stats["unordered_choice_match"] >= 1.0
        threshold = 0.70 if strong_choice else 0.82
        if stats["text_exact"] >= 1.0:
            threshold = min(threshold, 0.55)
        elif stats["text_ratio"] >= 0.94:
            threshold = min(threshold, 0.68)
        elif stats["token_overlap"] >= 0.75:
            threshold = min(threshold, 0.74)
        return threshold

    def _annotate(
        self,
        item: Dict[str, Any],
        *,
        mode: str,
        stats: Optional[Dict[str, float]] = None,
    ) -> Dict[str, Any]:
        """Return a shallow copy of *item* with a ``support_match`` record.

        ``support_match`` holds the match ``mode`` and, when *stats* is
        given, every stat rounded to four decimals.
        """
        out = dict(item)
        out["support_match"] = {"mode": mode}
        if stats:
            out["support_match"].update({k: round(v, 4) for k, v in stats.items()})
        return out

    def get(
        self,
        question_id: Optional[str] = None,
        question_text: Optional[str] = None,
        options_text: Optional[List[Any]] = None,
    ) -> Optional[Dict[str, Any]]:
        """Find the stored entry that best matches the given question.

        Lookups are tried from most to least specific; the first hit wins and
        its kind is reported in ``result["support_match"]["mode"]``:

        1. ``question_id`` - exact id
        2. ``signature_exact`` - same stem and same choices in the same order
        3. ``signature_unordered`` - same stem and same choices in any order
        4. ``text_exact`` - same stem, choices ignored
        5. ``fuzzy`` - best-scoring entry, if it clears its threshold

        The unordered-signature lookup deliberately runs before the text-only
        lookup: several entries may share a stem, and the one whose choices
        also match is the better answer.

        NOTE(review): with the current weights a candidate whose stem is not
        an exact match scores at most 0.58 unless its choices match in order,
        while the lowest threshold for such a candidate is 0.68 - so the
        fuzzy step effectively requires an ordered choice match.  Confirm
        this is intended before tuning weights or thresholds.

        Returns:
            A shallow copy of the matching entry with ``support_match``
            added, or ``None`` when nothing matches.
        """
        self._ensure_loaded()

        qid = str(question_id or "").strip()
        if qid and qid in self._by_id:
            return self._annotate(self._by_id[qid], mode="question_id")

        signature = self._question_signature(question_text, options_text, ordered=True)
        if signature and signature in self._by_signature:
            return self._annotate(self._by_signature[signature], mode="signature_exact")

        unordered_signature = self._question_signature(question_text, options_text, ordered=False)
        if unordered_signature and unordered_signature in self._by_unordered_signature:
            return self._annotate(self._by_unordered_signature[unordered_signature], mode="signature_unordered")

        qtext = self._normalize(question_text)
        if not qtext:
            return None
        if qtext in self._by_text:
            return self._annotate(self._by_text[qtext], mode="text_exact")

        # Fuzzy fallback: keep the first entry with the strictly highest score.
        best_item: Optional[Dict[str, Any]] = None
        best_stats: Optional[Dict[str, float]] = None
        best_score = 0.0
        for item in self._items:
            stats = self._candidate_stats(
                query_text=question_text or "",
                query_choices=options_text,
                candidate=item,
            )
            if stats["score"] > best_score:
                best_item, best_stats, best_score = item, stats, stats["score"]

        if best_item is None or best_stats is None:
            return None
        if best_score >= self._fuzzy_threshold(best_stats):
            return self._annotate(best_item, mode="fuzzy", stats=best_stats)
        return None

    # ------------------------------------------------------------------
    # Public mutation / inspection
    # ------------------------------------------------------------------

    def upsert(self, item: Dict[str, Any]) -> None:
        """Insert *item*, or replace the stored entry with the same ``question_id``.

        The change is in-memory only; nothing is written to the data file.
        """
        self._ensure_loaded()
        self._store_item(item)

    def all_items(self) -> List[Dict[str, Any]]:
        """Return shallow copies of all stored entries in storage order."""
        self._ensure_loaded()
        return [dict(entry) for entry in self._items]
|
|
|
|
# Shared module-level bank using the default data path. The data file is not
# read here; it is loaded on the first get() / upsert() / all_items() call.
question_support_bank = QuestionSupportBank()
|
|