j-js committed on
Commit
076a07d
·
verified ·
1 Parent(s): efac600

Update question_support_loader.py

Browse files
Files changed (1) hide show
  1. question_support_loader.py +188 -53
question_support_loader.py CHANGED
@@ -2,6 +2,7 @@ from __future__ import annotations
2
 
3
  import json
4
  import re
 
5
  from pathlib import Path
6
  from typing import Any, Dict, List, Optional, Tuple
7
 
@@ -13,34 +14,78 @@ class QuestionSupportBank:
13
  self._loaded = False
14
  self._by_id: Dict[str, Dict[str, Any]] = {}
15
  self._by_text: Dict[str, Dict[str, Any]] = {}
 
16
  self._by_signature: Dict[str, Dict[str, Any]] = {}
 
17
  self._items: List[Dict[str, Any]] = []
18
 
19
  def _normalize(self, text: Optional[str]) -> str:
20
  cleaned = (text or "").strip().lower()
21
  cleaned = cleaned.replace("’", "'")
 
 
 
 
22
  cleaned = re.sub(r"\s+", " ", cleaned)
23
  return cleaned
24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  def _tokenize(self, text: Optional[str]) -> List[str]:
26
- return re.findall(r"[a-z0-9%/]+", self._normalize(text))
 
27
 
28
  def _normalize_choice(self, value: Any) -> str:
29
- return self._normalize(str(value) if value is not None else "")
 
 
30
 
31
  def _choice_signature(self, choices: Optional[List[Any]]) -> str:
32
  cleaned = [self._normalize_choice(choice) for choice in (choices or []) if self._normalize_choice(choice)]
33
  return " || ".join(cleaned)
34
 
 
 
 
 
35
  def _question_signature(self, question_text: Optional[str], choices: Optional[List[Any]] = None) -> str:
36
- q = self._normalize(question_text)
37
  c = self._choice_signature(choices)
38
  return f"{q} ## {c}" if c else q
39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
  def load(self) -> None:
41
  self._by_id = {}
42
  self._by_text = {}
 
43
  self._by_signature = {}
 
44
  self._items = []
45
 
46
  if self.data_path.exists():
@@ -57,10 +102,6 @@ class QuestionSupportBank:
57
 
58
  self._loaded = True
59
 
60
- def _ensure_loaded(self) -> None:
61
- if not self._loaded:
62
- self.load()
63
-
64
  def _store_item(self, item: Dict[str, Any]) -> None:
65
  if not isinstance(item, dict):
66
  return
@@ -70,45 +111,102 @@ class QuestionSupportBank:
70
  stem = stored.get("question_text") or stored.get("stem") or ""
71
  choices = stored.get("options_text") or stored.get("choices") or []
72
 
73
- qtext = self._normalize(stem)
 
74
  signature = self._question_signature(stem, choices)
 
75
 
76
  if qid:
77
  self._by_id[qid] = stored
78
- if qtext:
79
- self._by_text[qtext] = stored
 
 
80
  if signature:
81
  self._by_signature[signature] = stored
 
 
82
 
83
  self._items.append(stored)
84
 
 
 
 
 
 
85
  def _score_candidate(
86
  self,
87
  *,
88
  query_text: str,
89
  query_choices: Optional[List[Any]],
90
  candidate: Dict[str, Any],
91
- ) -> Tuple[float, float, float]:
92
  cand_text = candidate.get("question_text") or candidate.get("stem") or ""
93
  cand_choices = candidate.get("options_text") or candidate.get("choices") or []
94
 
95
- q_tokens = set(self._tokenize(query_text))
96
- c_tokens = set(self._tokenize(cand_text))
97
- if not q_tokens or not c_tokens:
98
- token_overlap = 0.0
99
- else:
100
- token_overlap = len(q_tokens & c_tokens) / max(len(q_tokens | c_tokens), 1)
 
 
 
 
 
 
 
 
 
 
101
 
102
  q_choice_sig = self._choice_signature(query_choices)
103
  c_choice_sig = self._choice_signature(cand_choices)
104
- if q_choice_sig and c_choice_sig:
105
- choice_match = 1.0 if q_choice_sig == c_choice_sig else 0.0
106
- else:
107
- choice_match = 0.0
108
-
109
- exact_text = 1.0 if self._normalize(query_text) == self._normalize(cand_text) else 0.0
110
- score = (0.55 * token_overlap) + (0.35 * choice_match) + (0.10 * exact_text)
111
- return score, token_overlap, choice_match
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
112
 
113
  def get(
114
  self,
@@ -118,49 +216,86 @@ class QuestionSupportBank:
118
  ) -> Optional[Dict[str, Any]]:
119
  self._ensure_loaded()
120
  qid = str(question_id or "").strip()
121
- if qid and qid in self._by_id:
122
- return dict(self._by_id[qid])
 
 
123
 
124
- qtext = self._normalize(question_text)
125
- if qtext and qtext in self._by_text:
126
- return dict(self._by_text[qtext])
 
 
127
 
128
- signature = self._question_signature(question_text, options_text)
129
  if signature and signature in self._by_signature:
130
- return dict(self._by_signature[signature])
 
 
 
 
 
 
 
 
 
131
 
132
- if not qtext:
 
 
 
 
 
 
 
 
 
 
 
 
133
  return None
134
 
135
  best: Optional[Dict[str, Any]] = None
136
- best_score = 0.0
137
- best_overlap = 0.0
138
- best_choice = 0.0
139
 
140
  for item in self._items:
141
- score, token_overlap, choice_match = self._score_candidate(
142
  query_text=question_text or "",
143
  query_choices=options_text,
144
  candidate=item,
145
  )
146
- if score > best_score:
147
  best = item
148
- best_score = score
149
- best_overlap = token_overlap
150
- best_choice = choice_match
151
-
152
- threshold = 0.84 if options_text else 0.92
153
- if best is not None and (best_score >= threshold or (best_choice >= 1.0 and best_overlap >= 0.55)):
154
- out = dict(best)
155
- out.setdefault("support_match", {})
156
- out["support_match"] = {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
157
  "mode": "fuzzy",
158
- "score": round(best_score, 4),
159
- "token_overlap": round(best_overlap, 4),
160
- "choice_match": round(best_choice, 4),
161
- }
162
- return out
163
- return None
164
 
165
  def upsert(self, item: Dict[str, Any]) -> None:
166
  self._ensure_loaded()
 
2
 
3
  import json
4
  import re
5
+ from difflib import SequenceMatcher
6
  from pathlib import Path
7
  from typing import Any, Dict, List, Optional, Tuple
8
 
 
14
  self._loaded = False
15
  self._by_id: Dict[str, Dict[str, Any]] = {}
16
  self._by_text: Dict[str, Dict[str, Any]] = {}
17
+ self._by_canonical_text: Dict[str, Dict[str, Any]] = {}
18
  self._by_signature: Dict[str, Dict[str, Any]] = {}
19
+ self._by_signature_nolabels: Dict[str, Dict[str, Any]] = {}
20
  self._items: List[Dict[str, Any]] = []
21
 
22
  def _normalize(self, text: Optional[str]) -> str:
23
  cleaned = (text or "").strip().lower()
24
  cleaned = cleaned.replace("’", "'")
25
+ cleaned = cleaned.replace("‘", "'")
26
+ cleaned = cleaned.replace("“", '"').replace("”", '"')
27
+ cleaned = cleaned.replace("–", "-").replace("—", "-")
28
+ cleaned = cleaned.replace("×", "x")
29
  cleaned = re.sub(r"\s+", " ", cleaned)
30
  return cleaned
31
 
32
+ def _canonicalize_text(self, text: Optional[str]) -> str:
33
+ cleaned = self._normalize(text)
34
+ if not cleaned:
35
+ return ""
36
+ cleaned = re.sub(r"\b([a-e])\s*[\)\.]\s*", " ", cleaned)
37
+ cleaned = re.sub(r"\boption\s+[a-e]\b", " ", cleaned)
38
+ cleaned = re.sub(r"\bchoices?\s*:\s*", " ", cleaned)
39
+ cleaned = re.sub(r"\s*\?\s*$", "", cleaned)
40
+ cleaned = re.sub(r"\s*[:;,]\s*", " ", cleaned)
41
+ cleaned = re.sub(r"\s*([=+\-/*()])\s*", r" \1 ", cleaned)
42
+ cleaned = re.sub(r"[^a-z0-9%/=+\-/*(). ]+", " ", cleaned)
43
+ cleaned = re.sub(r"\s+", " ", cleaned).strip()
44
+ return cleaned
45
+
46
  def _tokenize(self, text: Optional[str]) -> List[str]:
47
+ canon = self._canonicalize_text(text)
48
+ return re.findall(r"[a-z0-9%/\.]+", canon)
49
 
50
  def _normalize_choice(self, value: Any) -> str:
51
+ text = self._canonicalize_text(str(value) if value is not None else "")
52
+ text = re.sub(r"^([a-e])\s*[\)\.]\s*", "", text).strip()
53
+ return text
54
 
55
  def _choice_signature(self, choices: Optional[List[Any]]) -> str:
56
  cleaned = [self._normalize_choice(choice) for choice in (choices or []) if self._normalize_choice(choice)]
57
  return " || ".join(cleaned)
58
 
59
+ def _choice_signature_nolabels(self, choices: Optional[List[Any]]) -> str:
60
+ cleaned = sorted([self._normalize_choice(choice) for choice in (choices or []) if self._normalize_choice(choice)])
61
+ return " || ".join(cleaned)
62
+
63
  def _question_signature(self, question_text: Optional[str], choices: Optional[List[Any]] = None) -> str:
64
+ q = self._canonicalize_text(question_text)
65
  c = self._choice_signature(choices)
66
  return f"{q} ## {c}" if c else q
67
 
68
+ def _question_signature_nolabels(self, question_text: Optional[str], choices: Optional[List[Any]] = None) -> str:
69
+ q = self._canonicalize_text(question_text)
70
+ c = self._choice_signature_nolabels(choices)
71
+ return f"{q} ## {c}" if c else q
72
+
73
+ def _shingles(self, text: Optional[str], size: int = 3) -> set[str]:
74
+ tokens = self._tokenize(text)
75
+ if len(tokens) < size:
76
+ return {" ".join(tokens)} if tokens else set()
77
+ return {" ".join(tokens[i : i + size]) for i in range(len(tokens) - size + 1)}
78
+
79
+ def _ensure_loaded(self) -> None:
80
+ if not self._loaded:
81
+ self.load()
82
+
83
  def load(self) -> None:
84
  self._by_id = {}
85
  self._by_text = {}
86
+ self._by_canonical_text = {}
87
  self._by_signature = {}
88
+ self._by_signature_nolabels = {}
89
  self._items = []
90
 
91
  if self.data_path.exists():
 
102
 
103
  self._loaded = True
104
 
 
 
 
 
105
  def _store_item(self, item: Dict[str, Any]) -> None:
106
  if not isinstance(item, dict):
107
  return
 
111
  stem = stored.get("question_text") or stored.get("stem") or ""
112
  choices = stored.get("options_text") or stored.get("choices") or []
113
 
114
+ raw_text = self._normalize(stem)
115
+ canonical_text = self._canonicalize_text(stem)
116
  signature = self._question_signature(stem, choices)
117
+ signature_nolabels = self._question_signature_nolabels(stem, choices)
118
 
119
  if qid:
120
  self._by_id[qid] = stored
121
+ if raw_text:
122
+ self._by_text[raw_text] = stored
123
+ if canonical_text:
124
+ self._by_canonical_text[canonical_text] = stored
125
  if signature:
126
  self._by_signature[signature] = stored
127
+ if signature_nolabels:
128
+ self._by_signature_nolabels[signature_nolabels] = stored
129
 
130
  self._items.append(stored)
131
 
132
+ def _clone_with_match(self, item: Dict[str, Any], match: Dict[str, Any]) -> Dict[str, Any]:
133
+ out = dict(item)
134
+ out["support_match"] = match
135
+ return out
136
+
137
  def _score_candidate(
138
  self,
139
  *,
140
  query_text: str,
141
  query_choices: Optional[List[Any]],
142
  candidate: Dict[str, Any],
143
+ ) -> Dict[str, float]:
144
  cand_text = candidate.get("question_text") or candidate.get("stem") or ""
145
  cand_choices = candidate.get("options_text") or candidate.get("choices") or []
146
 
147
+ query_norm = self._canonicalize_text(query_text)
148
+ cand_norm = self._canonicalize_text(cand_text)
149
+
150
+ q_tokens = set(self._tokenize(query_norm))
151
+ c_tokens = set(self._tokenize(cand_norm))
152
+ token_overlap = len(q_tokens & c_tokens) / max(len(q_tokens | c_tokens), 1) if q_tokens and c_tokens else 0.0
153
+
154
+ q_shingles = self._shingles(query_norm)
155
+ c_shingles = self._shingles(cand_norm)
156
+ shingle_overlap = len(q_shingles & c_shingles) / max(len(q_shingles | c_shingles), 1) if q_shingles and c_shingles else 0.0
157
+
158
+ seq = SequenceMatcher(None, query_norm, cand_norm).ratio() if query_norm and cand_norm else 0.0
159
+
160
+ q_nums = set(re.findall(r"\d+(?:\.\d+)?%?", query_norm))
161
+ c_nums = set(re.findall(r"\d+(?:\.\d+)?%?", cand_norm))
162
+ number_overlap = len(q_nums & c_nums) / max(len(q_nums | c_nums), 1) if q_nums and c_nums else (1.0 if not q_nums and not c_nums else 0.0)
163
 
164
  q_choice_sig = self._choice_signature(query_choices)
165
  c_choice_sig = self._choice_signature(cand_choices)
166
+ q_choice_sig_nl = self._choice_signature_nolabels(query_choices)
167
+ c_choice_sig_nl = self._choice_signature_nolabels(cand_choices)
168
+
169
+ choice_match = 1.0 if q_choice_sig and c_choice_sig and q_choice_sig == c_choice_sig else 0.0
170
+ choice_set_match = 1.0 if q_choice_sig_nl and c_choice_sig_nl and q_choice_sig_nl == c_choice_sig_nl else 0.0
171
+
172
+ exact_text = 1.0 if query_norm and query_norm == cand_norm else 0.0
173
+ exact_signature = 1.0 if self._question_signature(query_text, query_choices) == self._question_signature(cand_text, cand_choices) else 0.0
174
+ exact_signature_nolabels = 1.0 if self._question_signature_nolabels(query_text, query_choices) == self._question_signature_nolabels(cand_text, cand_choices) else 0.0
175
+
176
+ score = (
177
+ 0.28 * exact_text
178
+ + 0.18 * exact_signature
179
+ + 0.08 * exact_signature_nolabels
180
+ + 0.16 * choice_match
181
+ + 0.08 * choice_set_match
182
+ + 0.12 * token_overlap
183
+ + 0.06 * shingle_overlap
184
+ + 0.02 * number_overlap
185
+ + 0.02 * seq
186
+ )
187
+
188
+ return {
189
+ "score": round(score, 6),
190
+ "token_overlap": round(token_overlap, 6),
191
+ "shingle_overlap": round(shingle_overlap, 6),
192
+ "sequence_ratio": round(seq, 6),
193
+ "number_overlap": round(number_overlap, 6),
194
+ "choice_match": round(choice_match, 6),
195
+ "choice_set_match": round(choice_set_match, 6),
196
+ "exact_text": round(exact_text, 6),
197
+ "exact_signature": round(exact_signature, 6),
198
+ "exact_signature_nolabels": round(exact_signature_nolabels, 6),
199
+ }
200
+
201
+ def _confidence_label(self, metrics: Dict[str, float]) -> str:
202
+ score = metrics.get("score", 0.0)
203
+ if metrics.get("exact_signature", 0.0) >= 1.0 or metrics.get("exact_text", 0.0) >= 1.0:
204
+ return "exact"
205
+ if score >= 0.82:
206
+ return "high"
207
+ if score >= 0.70:
208
+ return "medium"
209
+ return "low"
210
 
211
  def get(
212
  self,
 
216
  ) -> Optional[Dict[str, Any]]:
217
  self._ensure_loaded()
218
  qid = str(question_id or "").strip()
219
+ raw_text = self._normalize(question_text)
220
+ canonical_text = self._canonicalize_text(question_text)
221
+ signature = self._question_signature(question_text, options_text)
222
+ signature_nolabels = self._question_signature_nolabels(question_text, options_text)
223
 
224
+ if qid and qid in self._by_id:
225
+ return self._clone_with_match(
226
+ self._by_id[qid],
227
+ {"mode": "question_id", "confidence": "exact", "score": 1.0},
228
+ )
229
 
 
230
  if signature and signature in self._by_signature:
231
+ return self._clone_with_match(
232
+ self._by_signature[signature],
233
+ {"mode": "signature", "confidence": "exact", "score": 0.995},
234
+ )
235
+
236
+ if signature_nolabels and signature_nolabels in self._by_signature_nolabels:
237
+ return self._clone_with_match(
238
+ self._by_signature_nolabels[signature_nolabels],
239
+ {"mode": "signature_nolabels", "confidence": "exact", "score": 0.99},
240
+ )
241
 
242
+ if raw_text and raw_text in self._by_text:
243
+ return self._clone_with_match(
244
+ self._by_text[raw_text],
245
+ {"mode": "question_text", "confidence": "exact", "score": 0.985},
246
+ )
247
+
248
+ if canonical_text and canonical_text in self._by_canonical_text:
249
+ return self._clone_with_match(
250
+ self._by_canonical_text[canonical_text],
251
+ {"mode": "canonical_text", "confidence": "exact", "score": 0.98},
252
+ )
253
+
254
+ if not canonical_text:
255
  return None
256
 
257
  best: Optional[Dict[str, Any]] = None
258
+ best_metrics: Optional[Dict[str, float]] = None
 
 
259
 
260
  for item in self._items:
261
+ metrics = self._score_candidate(
262
  query_text=question_text or "",
263
  query_choices=options_text,
264
  candidate=item,
265
  )
266
+ if best_metrics is None or metrics["score"] > best_metrics["score"]:
267
  best = item
268
+ best_metrics = metrics
269
+
270
+ if best is None or best_metrics is None:
271
+ return None
272
+
273
+ confidence = self._confidence_label(best_metrics)
274
+ score = best_metrics["score"]
275
+
276
+ accept = False
277
+ if confidence == "exact":
278
+ accept = True
279
+ elif score >= 0.82:
280
+ accept = True
281
+ elif best_metrics.get("choice_set_match", 0.0) >= 1.0 and best_metrics.get("token_overlap", 0.0) >= 0.55:
282
+ accept = True
283
+ elif best_metrics.get("shingle_overlap", 0.0) >= 0.72 and best_metrics.get("sequence_ratio", 0.0) >= 0.84:
284
+ accept = True
285
+ elif best_metrics.get("number_overlap", 0.0) >= 1.0 and best_metrics.get("token_overlap", 0.0) >= 0.68:
286
+ accept = True
287
+
288
+ if not accept:
289
+ return None
290
+
291
+ return self._clone_with_match(
292
+ best,
293
+ {
294
  "mode": "fuzzy",
295
+ "confidence": confidence,
296
+ **best_metrics,
297
+ },
298
+ )
 
 
299
 
300
  def upsert(self, item: Dict[str, Any]) -> None:
301
  self._ensure_loaded()