File size: 8,535 Bytes
ae81756
 
d68661a
efac600
ae81756
d68661a
ae81756
d68661a
 
 
ae81756
 
280a1a8
 
 
d68661a
ae81756
280a1a8
 
efac600
ae81756
280a1a8
 
efac600
ae81756
 
efac600
19bee64
ae81756
efac600
 
ae81756
efac600
 
bcf6874
efac600
ae81756
 
 
 
 
 
 
 
 
 
 
 
 
 
efac600
 
ae81756
bcf6874
ae81756
efac600
d68661a
 
ae81756
d68661a
280a1a8
efac600
ae81756
280a1a8
 
 
 
 
 
 
 
 
 
 
 
d68661a
 
 
bcf6874
 
 
 
280a1a8
 
 
efac600
 
 
ae81756
 
 
 
efac600
280a1a8
efac600
ae81756
 
efac600
 
ae81756
 
efac600
 
ae81756
efac600
 
280a1a8
ae81756
 
 
 
 
bcf6874
 
ae81756
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
280a1a8
 
076a07d
ae81756
 
 
 
 
efac600
bcf6874
 
ae81756
076a07d
ae81756
 
 
076a07d
bcf6874
efac600
 
ae81756
 
bcf6874
efac600
ae81756
 
bcf6874
ae81756
 
bcf6874
ae81756
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bcf6874
280a1a8
 
 
 
 
 
 
efac600
280a1a8
 
ae81756
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
from __future__ import annotations

import json
import re
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Dict, List, Optional


class QuestionSupportBank:
    """Load and retrieve authored question support entries with strong matching.

    Entries are plain dicts read from a JSON-lines file (one JSON object per
    line).  Each entry is indexed by its ``question_id``, by its normalized
    question text, and by a "signature" that combines the normalized text
    with the normalized answer choices (once in the given order, once
    sorted).  ``get`` tries those exact indexes first and falls back to a
    weighted fuzzy comparison against every stored entry.
    """

    def __init__(self, data_path: Optional[str] = None) -> None:
        """Remember where the data lives; nothing is read until first use.

        Args:
            data_path: Path to the JSON-lines file.  When omitted, defaults
                to ``data/question_support_bank.jsonl`` next to this module.
        """
        base_dir = Path(__file__).resolve().parent
        self.data_path = Path(data_path) if data_path else base_dir / "data" / "question_support_bank.jsonl"
        self._loaded = False
        # Every stored entry, in insertion order (scanned by the fuzzy fallback).
        self._items: List[Dict[str, Any]] = []
        # Exact-lookup indexes; on key collisions the most recently stored entry wins.
        self._by_id: Dict[str, Dict[str, Any]] = {}
        self._by_text: Dict[str, Dict[str, Any]] = {}
        self._by_signature: Dict[str, Dict[str, Any]] = {}
        self._by_unordered_signature: Dict[str, Dict[str, Any]] = {}

    def _normalize(self, text: Optional[str]) -> str:
        """Return a canonical form of *text* used for all comparisons.

        Lower-cases, maps curly quotes and unicode minus/en-dash to ASCII,
        collapses whitespace runs, and removes whitespace around the
        punctuation ``= + - * / : , ; ( )`` so that ``"2 + 2"`` and ``"2+2"``
        compare equal.  ``None`` is treated as the empty string.
        """
        cleaned = (text or "").strip().lower()
        cleaned = cleaned.replace("’", "'").replace("“", '"').replace("”", '"')
        cleaned = cleaned.replace("−", "-").replace("–", "-")
        cleaned = re.sub(r"\s+", " ", cleaned)
        cleaned = re.sub(r"\s*([=+\-*/:,;()])\s*", r"\1", cleaned)
        return cleaned.strip()

    def _tokenize(self, text: Optional[str]) -> List[str]:
        """Split normalized *text* into runs of ``a-z``, digits, ``%``, ``/`` and ``.``."""
        return re.findall(r"[a-z0-9%/.]+", self._normalize(text))

    def _normalize_choice(self, value: Any) -> str:
        """Normalize a single answer choice; ``None`` becomes the empty string."""
        return self._normalize(str(value) if value is not None else "")

    def _coerce_choices(self, choices: Optional[List[Any]]) -> List[str]:
        """Return the normalized, non-empty choices in their original order."""
        if not choices:
            return []
        normalized = (self._normalize_choice(choice) for choice in choices)
        return [choice for choice in normalized if choice]

    def _choice_signature(self, choices: Optional[List[Any]], *, ordered: bool = True) -> str:
        """Join the normalized choices with ``" || "``.

        Args:
            choices: Raw answer choices (any values; they are stringified).
            ordered: Keep the given order when true, sort the normalized
                choices when false so that permutations compare equal.
        """
        cleaned = self._coerce_choices(choices)
        if not ordered:
            cleaned = sorted(cleaned)
        return " || ".join(cleaned)

    def _question_signature(self, question_text: Optional[str], choices: Optional[List[Any]] = None, *, ordered: bool = True) -> str:
        """Combine normalized text and choice signature into one lookup key.

        Returns ``"<text> ## <choices>"`` when there are usable choices and
        just the normalized text otherwise.
        """
        q = self._normalize(question_text)
        c = self._choice_signature(choices, ordered=ordered)
        return f"{q} ## {c}" if c else q

    def _item_id(self, item: Dict[str, Any]) -> str:
        """Return the stripped string form of ``item["question_id"]`` ("" if absent)."""
        return str(item.get("question_id") or "").strip()

    def load(self) -> None:
        """(Re)build all indexes from ``data_path``.

        Existing in-memory state is discarded first.  A missing file yields
        an empty bank.  Blank lines, lines that are not valid JSON, and JSON
        values that are not objects are skipped (best-effort loading).
        """
        self._items = []
        self._by_id = {}
        self._by_text = {}
        self._by_signature = {}
        self._by_unordered_signature = {}

        if self.data_path.exists():
            with self.data_path.open("r", encoding="utf-8") as handle:
                for raw_line in handle:
                    line = raw_line.strip()
                    if not line:
                        continue
                    try:
                        item = json.loads(line)
                    except json.JSONDecodeError:
                        # Deliberate best-effort: one bad line must not block the rest.
                        continue
                    self._store_item(item)

        self._loaded = True

    def _ensure_loaded(self) -> None:
        """Load the data file on first use."""
        if not self._loaded:
            self.load()

    def _store_item(self, item: Dict[str, Any]) -> None:
        """Index a shallow copy of *item*; non-dict values are ignored.

        The question text is read from ``question_text`` (falling back to
        ``stem``) and the choices from ``options_text`` (falling back to
        ``choices``).  Later entries overwrite earlier ones under the same
        index key.
        """
        if not isinstance(item, dict):
            return
        stored = dict(item)
        stem = stored.get("question_text") or stored.get("stem") or ""
        choices = stored.get("options_text") or stored.get("choices") or []
        qid = self._item_id(stored)
        normalized_text = self._normalize(stem)
        signature = self._question_signature(stem, choices, ordered=True)
        unordered_signature = self._question_signature(stem, choices, ordered=False)

        if qid:
            self._by_id[qid] = stored
        if normalized_text:
            self._by_text[normalized_text] = stored
        if signature:
            self._by_signature[signature] = stored
        if unordered_signature:
            self._by_unordered_signature[unordered_signature] = stored
        self._items.append(stored)

    def _discard_by_id(self, qid: str) -> None:
        """Remove every stored entry whose id is *qid*, plus index keys pointing at them."""
        stale = {id(entry) for entry in self._items if self._item_id(entry) == qid}
        if not stale:
            return
        self._items = [entry for entry in self._items if id(entry) not in stale]
        for index in (self._by_id, self._by_text, self._by_signature, self._by_unordered_signature):
            # Only drop keys that still resolve to a removed entry; keys that a
            # different entry has since claimed stay untouched.
            for key in [key for key, entry in index.items() if id(entry) in stale]:
                del index[key]

    def _candidate_stats(self, *, query_text: str, query_choices: Optional[List[Any]], candidate: Dict[str, Any]) -> Dict[str, float]:
        """Score how well *candidate* matches the query.

        Returns a dict with the individual signals, each in ``[0, 1]``, and
        their weighted sum under ``"score"``:

        * ``text_exact`` (0.30): normalized texts are identical.
        * ``text_ratio`` (0.28): ``SequenceMatcher`` ratio of normalized texts.
        * ``token_overlap`` (0.22): Jaccard overlap of the token sets.
        * ``ordered_choice_match`` (0.12): choices equal in the same order.
        * ``unordered_choice_match`` (0.08): choices equal ignoring order.
        """
        cand_text = candidate.get("question_text") or candidate.get("stem") or ""
        cand_choices = candidate.get("options_text") or candidate.get("choices") or []

        norm_query = self._normalize(query_text)
        norm_cand = self._normalize(cand_text)
        text_exact = 1.0 if norm_query and norm_query == norm_cand else 0.0
        text_ratio = SequenceMatcher(None, norm_query, norm_cand).ratio() if norm_query and norm_cand else 0.0

        q_tokens = set(self._tokenize(query_text))
        c_tokens = set(self._tokenize(cand_text))
        token_overlap = len(q_tokens & c_tokens) / max(len(q_tokens | c_tokens), 1) if q_tokens and c_tokens else 0.0

        q_sig = self._choice_signature(query_choices, ordered=True)
        c_sig = self._choice_signature(cand_choices, ordered=True)
        q_unsig = self._choice_signature(query_choices, ordered=False)
        c_unsig = self._choice_signature(cand_choices, ordered=False)
        # Empty signatures never count as a match: "no choices" on both sides
        # is absence of evidence, not agreement.
        ordered_choice_match = 1.0 if q_sig and c_sig and q_sig == c_sig else 0.0
        unordered_choice_match = 1.0 if q_unsig and c_unsig and q_unsig == c_unsig else 0.0

        score = (
            0.30 * text_exact
            + 0.28 * text_ratio
            + 0.22 * token_overlap
            + 0.12 * ordered_choice_match
            + 0.08 * unordered_choice_match
        )
        return {
            "score": score,
            "text_exact": text_exact,
            "text_ratio": text_ratio,
            "token_overlap": token_overlap,
            "ordered_choice_match": ordered_choice_match,
            "unordered_choice_match": unordered_choice_match,
        }

    def _annotate(self, item: Dict[str, Any], *, mode: str, stats: Optional[Dict[str, float]] = None) -> Dict[str, Any]:
        """Return a shallow copy of *item* with a ``support_match`` entry added.

        ``support_match`` always carries ``mode`` and, when *stats* is given,
        every stat rounded to four decimals.
        """
        out = dict(item)
        out["support_match"] = {"mode": mode}
        if stats:
            out["support_match"].update({k: round(v, 4) for k, v in stats.items()})
        return out

    def get(self, question_id: Optional[str] = None, question_text: Optional[str] = None, options_text: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
        """Find the stored entry that best matches the query.

        Lookups are attempted from most to least specific; the first hit
        wins and its kind is reported in ``result["support_match"]["mode"]``:

        1. ``question_id``         - exact id.
        2. ``signature_exact``     - same text and same choices in order.
        3. ``signature_unordered`` - same text and same choices in any order.
        4. ``text_exact``          - same text, choices ignored.
        5. ``fuzzy``               - best weighted score above a threshold.

        Args:
            question_id: Identifier to look up; stringified and stripped.
            question_text: Question stem to match.
            options_text: Answer choices belonging to the question.

        Returns:
            A shallow copy of the matched entry with ``support_match`` added,
            or ``None`` when nothing matches well enough.
        """
        self._ensure_loaded()
        qid = str(question_id or "").strip()
        if qid and qid in self._by_id:
            return self._annotate(self._by_id[qid], mode="question_id")

        signature = self._question_signature(question_text, options_text, ordered=True)
        if signature and signature in self._by_signature:
            return self._annotate(self._by_signature[signature], mode="signature_exact")

        # Must run before the text-only lookup: any entry reachable through its
        # unordered signature is also reachable through its text, so checking
        # text first would hide this stronger match and, when several entries
        # share a stem, could return the one with the wrong choices.
        unordered_signature = self._question_signature(question_text, options_text, ordered=False)
        if unordered_signature and unordered_signature in self._by_unordered_signature:
            return self._annotate(self._by_unordered_signature[unordered_signature], mode="signature_unordered")

        qtext = self._normalize(question_text)
        if qtext and qtext in self._by_text:
            return self._annotate(self._by_text[qtext], mode="text_exact")

        if not qtext:
            return None

        best_item: Optional[Dict[str, Any]] = None
        best_stats: Optional[Dict[str, float]] = None
        best_score = 0.0
        for item in self._items:
            stats = self._candidate_stats(query_text=question_text or "", query_choices=options_text, candidate=item)
            score = stats["score"]
            # Strict ">" keeps the earliest entry on ties and ignores zero scores.
            if score > best_score:
                best_item = item
                best_stats = stats
                best_score = score

        if not best_item or not best_stats:
            return None

        # Base threshold is lower when the choices agree; it is relaxed further
        # when one of the text signals is already very strong.
        strong_choice = best_stats["ordered_choice_match"] >= 1.0 or best_stats["unordered_choice_match"] >= 1.0
        threshold = 0.70 if strong_choice else 0.82
        if best_stats["text_exact"] >= 1.0:
            threshold = min(threshold, 0.55)
        elif best_stats["text_ratio"] >= 0.94:
            threshold = min(threshold, 0.68)
        elif best_stats["token_overlap"] >= 0.75:
            threshold = min(threshold, 0.74)

        if best_score >= threshold:
            return self._annotate(best_item, mode="fuzzy", stats=best_stats)
        return None

    def upsert(self, item: Dict[str, Any]) -> None:
        """Insert *item*, replacing any stored entries with the same ``question_id``.

        Superseded entries are removed from the bank together with the index
        keys that pointed at them, so neither ``all_items`` nor text/fuzzy
        lookups can surface a stale record.  Items without a ``question_id``
        are simply added.  Non-dict values are ignored.  The change is
        in-memory only; the data file is not written.
        """
        self._ensure_loaded()
        if not isinstance(item, dict):
            return
        qid = self._item_id(item)
        if qid:
            self._discard_by_id(qid)
        self._store_item(item)

    def all_items(self) -> List[Dict[str, Any]]:
        """Return shallow copies of every stored entry in insertion order."""
        self._ensure_loaded()
        return [dict(v) for v in self._items]

# Shared module-level bank using the default data path. Constructing it does
# not read the data file; loading happens lazily on the first get(), upsert()
# or all_items() call.
question_support_bank = QuestionSupportBank()