j-js committed on
Commit
bcf6874
·
verified ·
1 Parent(s): 010a947

Update question_support_loader.py

Browse files
Files changed (1) hide show
  1. question_support_loader.py +53 -188
question_support_loader.py CHANGED
@@ -2,7 +2,6 @@ from __future__ import annotations
2
 
3
  import json
4
  import re
5
- from difflib import SequenceMatcher
6
  from pathlib import Path
7
  from typing import Any, Dict, List, Optional, Tuple
8
 
@@ -14,78 +13,34 @@ class QuestionSupportBank:
14
  self._loaded = False
15
  self._by_id: Dict[str, Dict[str, Any]] = {}
16
  self._by_text: Dict[str, Dict[str, Any]] = {}
17
- self._by_canonical_text: Dict[str, Dict[str, Any]] = {}
18
  self._by_signature: Dict[str, Dict[str, Any]] = {}
19
- self._by_signature_nolabels: Dict[str, Dict[str, Any]] = {}
20
  self._items: List[Dict[str, Any]] = []
21
 
22
  def _normalize(self, text: Optional[str]) -> str:
23
  cleaned = (text or "").strip().lower()
24
  cleaned = cleaned.replace("’", "'")
25
- cleaned = cleaned.replace("‘", "'")
26
- cleaned = cleaned.replace("“", '"').replace("”", '"')
27
- cleaned = cleaned.replace("–", "-").replace("—", "-")
28
- cleaned = cleaned.replace("×", "x")
29
  cleaned = re.sub(r"\s+", " ", cleaned)
30
  return cleaned
31
 
32
- def _canonicalize_text(self, text: Optional[str]) -> str:
33
- cleaned = self._normalize(text)
34
- if not cleaned:
35
- return ""
36
- cleaned = re.sub(r"\b([a-e])\s*[\)\.]\s*", " ", cleaned)
37
- cleaned = re.sub(r"\boption\s+[a-e]\b", " ", cleaned)
38
- cleaned = re.sub(r"\bchoices?\s*:\s*", " ", cleaned)
39
- cleaned = re.sub(r"\s*\?\s*$", "", cleaned)
40
- cleaned = re.sub(r"\s*[:;,]\s*", " ", cleaned)
41
- cleaned = re.sub(r"\s*([=+\-/*()])\s*", r" \1 ", cleaned)
42
- cleaned = re.sub(r"[^a-z0-9%/=+\-/*(). ]+", " ", cleaned)
43
- cleaned = re.sub(r"\s+", " ", cleaned).strip()
44
- return cleaned
45
-
46
  def _tokenize(self, text: Optional[str]) -> List[str]:
47
- canon = self._canonicalize_text(text)
48
- return re.findall(r"[a-z0-9%/\.]+", canon)
49
 
50
  def _normalize_choice(self, value: Any) -> str:
51
- text = self._canonicalize_text(str(value) if value is not None else "")
52
- text = re.sub(r"^([a-e])\s*[\)\.]\s*", "", text).strip()
53
- return text
54
 
55
  def _choice_signature(self, choices: Optional[List[Any]]) -> str:
56
  cleaned = [self._normalize_choice(choice) for choice in (choices or []) if self._normalize_choice(choice)]
57
  return " || ".join(cleaned)
58
 
59
- def _choice_signature_nolabels(self, choices: Optional[List[Any]]) -> str:
60
- cleaned = sorted([self._normalize_choice(choice) for choice in (choices or []) if self._normalize_choice(choice)])
61
- return " || ".join(cleaned)
62
-
63
  def _question_signature(self, question_text: Optional[str], choices: Optional[List[Any]] = None) -> str:
64
- q = self._canonicalize_text(question_text)
65
  c = self._choice_signature(choices)
66
  return f"{q} ## {c}" if c else q
67
 
68
- def _question_signature_nolabels(self, question_text: Optional[str], choices: Optional[List[Any]] = None) -> str:
69
- q = self._canonicalize_text(question_text)
70
- c = self._choice_signature_nolabels(choices)
71
- return f"{q} ## {c}" if c else q
72
-
73
- def _shingles(self, text: Optional[str], size: int = 3) -> set[str]:
74
- tokens = self._tokenize(text)
75
- if len(tokens) < size:
76
- return {" ".join(tokens)} if tokens else set()
77
- return {" ".join(tokens[i : i + size]) for i in range(len(tokens) - size + 1)}
78
-
79
- def _ensure_loaded(self) -> None:
80
- if not self._loaded:
81
- self.load()
82
-
83
  def load(self) -> None:
84
  self._by_id = {}
85
  self._by_text = {}
86
- self._by_canonical_text = {}
87
  self._by_signature = {}
88
- self._by_signature_nolabels = {}
89
  self._items = []
90
 
91
  if self.data_path.exists():
@@ -102,6 +57,10 @@ class QuestionSupportBank:
102
 
103
  self._loaded = True
104
 
 
 
 
 
105
  def _store_item(self, item: Dict[str, Any]) -> None:
106
  if not isinstance(item, dict):
107
  return
@@ -111,102 +70,45 @@ class QuestionSupportBank:
111
  stem = stored.get("question_text") or stored.get("stem") or ""
112
  choices = stored.get("options_text") or stored.get("choices") or []
113
 
114
- raw_text = self._normalize(stem)
115
- canonical_text = self._canonicalize_text(stem)
116
  signature = self._question_signature(stem, choices)
117
- signature_nolabels = self._question_signature_nolabels(stem, choices)
118
 
119
  if qid:
120
  self._by_id[qid] = stored
121
- if raw_text:
122
- self._by_text[raw_text] = stored
123
- if canonical_text:
124
- self._by_canonical_text[canonical_text] = stored
125
  if signature:
126
  self._by_signature[signature] = stored
127
- if signature_nolabels:
128
- self._by_signature_nolabels[signature_nolabels] = stored
129
 
130
  self._items.append(stored)
131
 
132
- def _clone_with_match(self, item: Dict[str, Any], match: Dict[str, Any]) -> Dict[str, Any]:
133
- out = dict(item)
134
- out["support_match"] = match
135
- return out
136
-
137
  def _score_candidate(
138
  self,
139
  *,
140
  query_text: str,
141
  query_choices: Optional[List[Any]],
142
  candidate: Dict[str, Any],
143
- ) -> Dict[str, float]:
144
  cand_text = candidate.get("question_text") or candidate.get("stem") or ""
145
  cand_choices = candidate.get("options_text") or candidate.get("choices") or []
146
 
147
- query_norm = self._canonicalize_text(query_text)
148
- cand_norm = self._canonicalize_text(cand_text)
149
-
150
- q_tokens = set(self._tokenize(query_norm))
151
- c_tokens = set(self._tokenize(cand_norm))
152
- token_overlap = len(q_tokens & c_tokens) / max(len(q_tokens | c_tokens), 1) if q_tokens and c_tokens else 0.0
153
-
154
- q_shingles = self._shingles(query_norm)
155
- c_shingles = self._shingles(cand_norm)
156
- shingle_overlap = len(q_shingles & c_shingles) / max(len(q_shingles | c_shingles), 1) if q_shingles and c_shingles else 0.0
157
-
158
- seq = SequenceMatcher(None, query_norm, cand_norm).ratio() if query_norm and cand_norm else 0.0
159
-
160
- q_nums = set(re.findall(r"\d+(?:\.\d+)?%?", query_norm))
161
- c_nums = set(re.findall(r"\d+(?:\.\d+)?%?", cand_norm))
162
- number_overlap = len(q_nums & c_nums) / max(len(q_nums | c_nums), 1) if q_nums and c_nums else (1.0 if not q_nums and not c_nums else 0.0)
163
 
164
  q_choice_sig = self._choice_signature(query_choices)
165
  c_choice_sig = self._choice_signature(cand_choices)
166
- q_choice_sig_nl = self._choice_signature_nolabels(query_choices)
167
- c_choice_sig_nl = self._choice_signature_nolabels(cand_choices)
168
-
169
- choice_match = 1.0 if q_choice_sig and c_choice_sig and q_choice_sig == c_choice_sig else 0.0
170
- choice_set_match = 1.0 if q_choice_sig_nl and c_choice_sig_nl and q_choice_sig_nl == c_choice_sig_nl else 0.0
171
-
172
- exact_text = 1.0 if query_norm and query_norm == cand_norm else 0.0
173
- exact_signature = 1.0 if self._question_signature(query_text, query_choices) == self._question_signature(cand_text, cand_choices) else 0.0
174
- exact_signature_nolabels = 1.0 if self._question_signature_nolabels(query_text, query_choices) == self._question_signature_nolabels(cand_text, cand_choices) else 0.0
175
-
176
- score = (
177
- 0.28 * exact_text
178
- + 0.18 * exact_signature
179
- + 0.08 * exact_signature_nolabels
180
- + 0.16 * choice_match
181
- + 0.08 * choice_set_match
182
- + 0.12 * token_overlap
183
- + 0.06 * shingle_overlap
184
- + 0.02 * number_overlap
185
- + 0.02 * seq
186
- )
187
-
188
- return {
189
- "score": round(score, 6),
190
- "token_overlap": round(token_overlap, 6),
191
- "shingle_overlap": round(shingle_overlap, 6),
192
- "sequence_ratio": round(seq, 6),
193
- "number_overlap": round(number_overlap, 6),
194
- "choice_match": round(choice_match, 6),
195
- "choice_set_match": round(choice_set_match, 6),
196
- "exact_text": round(exact_text, 6),
197
- "exact_signature": round(exact_signature, 6),
198
- "exact_signature_nolabels": round(exact_signature_nolabels, 6),
199
- }
200
-
201
- def _confidence_label(self, metrics: Dict[str, float]) -> str:
202
- score = metrics.get("score", 0.0)
203
- if metrics.get("exact_signature", 0.0) >= 1.0 or metrics.get("exact_text", 0.0) >= 1.0:
204
- return "exact"
205
- if score >= 0.82:
206
- return "high"
207
- if score >= 0.70:
208
- return "medium"
209
- return "low"
210
 
211
  def get(
212
  self,
@@ -216,86 +118,49 @@ class QuestionSupportBank:
216
  ) -> Optional[Dict[str, Any]]:
217
  self._ensure_loaded()
218
  qid = str(question_id or "").strip()
219
- raw_text = self._normalize(question_text)
220
- canonical_text = self._canonicalize_text(question_text)
221
- signature = self._question_signature(question_text, options_text)
222
- signature_nolabels = self._question_signature_nolabels(question_text, options_text)
223
-
224
  if qid and qid in self._by_id:
225
- return self._clone_with_match(
226
- self._by_id[qid],
227
- {"mode": "question_id", "confidence": "exact", "score": 1.0},
228
- )
229
 
230
- if signature and signature in self._by_signature:
231
- return self._clone_with_match(
232
- self._by_signature[signature],
233
- {"mode": "signature", "confidence": "exact", "score": 0.995},
234
- )
235
 
236
- if signature_nolabels and signature_nolabels in self._by_signature_nolabels:
237
- return self._clone_with_match(
238
- self._by_signature_nolabels[signature_nolabels],
239
- {"mode": "signature_nolabels", "confidence": "exact", "score": 0.99},
240
- )
241
-
242
- if raw_text and raw_text in self._by_text:
243
- return self._clone_with_match(
244
- self._by_text[raw_text],
245
- {"mode": "question_text", "confidence": "exact", "score": 0.985},
246
- )
247
-
248
- if canonical_text and canonical_text in self._by_canonical_text:
249
- return self._clone_with_match(
250
- self._by_canonical_text[canonical_text],
251
- {"mode": "canonical_text", "confidence": "exact", "score": 0.98},
252
- )
253
 
254
- if not canonical_text:
255
  return None
256
 
257
  best: Optional[Dict[str, Any]] = None
258
- best_metrics: Optional[Dict[str, float]] = None
 
 
259
 
260
  for item in self._items:
261
- metrics = self._score_candidate(
262
  query_text=question_text or "",
263
  query_choices=options_text,
264
  candidate=item,
265
  )
266
- if best_metrics is None or metrics["score"] > best_metrics["score"]:
267
  best = item
268
- best_metrics = metrics
269
-
270
- if best is None or best_metrics is None:
271
- return None
272
-
273
- confidence = self._confidence_label(best_metrics)
274
- score = best_metrics["score"]
275
-
276
- accept = False
277
- if confidence == "exact":
278
- accept = True
279
- elif score >= 0.82:
280
- accept = True
281
- elif best_metrics.get("choice_set_match", 0.0) >= 1.0 and best_metrics.get("token_overlap", 0.0) >= 0.55:
282
- accept = True
283
- elif best_metrics.get("shingle_overlap", 0.0) >= 0.72 and best_metrics.get("sequence_ratio", 0.0) >= 0.84:
284
- accept = True
285
- elif best_metrics.get("number_overlap", 0.0) >= 1.0 and best_metrics.get("token_overlap", 0.0) >= 0.68:
286
- accept = True
287
-
288
- if not accept:
289
- return None
290
-
291
- return self._clone_with_match(
292
- best,
293
- {
294
  "mode": "fuzzy",
295
- "confidence": confidence,
296
- **best_metrics,
297
- },
298
- )
 
 
299
 
300
  def upsert(self, item: Dict[str, Any]) -> None:
301
  self._ensure_loaded()
 
2
 
3
  import json
4
  import re
 
5
  from pathlib import Path
6
  from typing import Any, Dict, List, Optional, Tuple
7
 
 
13
  self._loaded = False
14
  self._by_id: Dict[str, Dict[str, Any]] = {}
15
  self._by_text: Dict[str, Dict[str, Any]] = {}
 
16
  self._by_signature: Dict[str, Dict[str, Any]] = {}
 
17
  self._items: List[Dict[str, Any]] = []
18
 
19
  def _normalize(self, text: Optional[str]) -> str:
20
  cleaned = (text or "").strip().lower()
21
  cleaned = cleaned.replace("’", "'")
 
 
 
 
22
  cleaned = re.sub(r"\s+", " ", cleaned)
23
  return cleaned
24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  def _tokenize(self, text: Optional[str]) -> List[str]:
26
+ return re.findall(r"[a-z0-9%/]+", self._normalize(text))
 
27
 
28
  def _normalize_choice(self, value: Any) -> str:
29
+ return self._normalize(str(value) if value is not None else "")
 
 
30
 
31
  def _choice_signature(self, choices: Optional[List[Any]]) -> str:
32
  cleaned = [self._normalize_choice(choice) for choice in (choices or []) if self._normalize_choice(choice)]
33
  return " || ".join(cleaned)
34
 
 
 
 
 
35
  def _question_signature(self, question_text: Optional[str], choices: Optional[List[Any]] = None) -> str:
36
+ q = self._normalize(question_text)
37
  c = self._choice_signature(choices)
38
  return f"{q} ## {c}" if c else q
39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
  def load(self) -> None:
41
  self._by_id = {}
42
  self._by_text = {}
 
43
  self._by_signature = {}
 
44
  self._items = []
45
 
46
  if self.data_path.exists():
 
57
 
58
  self._loaded = True
59
 
60
+ def _ensure_loaded(self) -> None:
61
+ if not self._loaded:
62
+ self.load()
63
+
64
  def _store_item(self, item: Dict[str, Any]) -> None:
65
  if not isinstance(item, dict):
66
  return
 
70
  stem = stored.get("question_text") or stored.get("stem") or ""
71
  choices = stored.get("options_text") or stored.get("choices") or []
72
 
73
+ qtext = self._normalize(stem)
 
74
  signature = self._question_signature(stem, choices)
 
75
 
76
  if qid:
77
  self._by_id[qid] = stored
78
+ if qtext:
79
+ self._by_text[qtext] = stored
 
 
80
  if signature:
81
  self._by_signature[signature] = stored
 
 
82
 
83
  self._items.append(stored)
84
 
 
 
 
 
 
85
  def _score_candidate(
86
  self,
87
  *,
88
  query_text: str,
89
  query_choices: Optional[List[Any]],
90
  candidate: Dict[str, Any],
91
+ ) -> Tuple[float, float, float]:
92
  cand_text = candidate.get("question_text") or candidate.get("stem") or ""
93
  cand_choices = candidate.get("options_text") or candidate.get("choices") or []
94
 
95
+ q_tokens = set(self._tokenize(query_text))
96
+ c_tokens = set(self._tokenize(cand_text))
97
+ if not q_tokens or not c_tokens:
98
+ token_overlap = 0.0
99
+ else:
100
+ token_overlap = len(q_tokens & c_tokens) / max(len(q_tokens | c_tokens), 1)
 
 
 
 
 
 
 
 
 
 
101
 
102
  q_choice_sig = self._choice_signature(query_choices)
103
  c_choice_sig = self._choice_signature(cand_choices)
104
+ if q_choice_sig and c_choice_sig:
105
+ choice_match = 1.0 if q_choice_sig == c_choice_sig else 0.0
106
+ else:
107
+ choice_match = 0.0
108
+
109
+ exact_text = 1.0 if self._normalize(query_text) == self._normalize(cand_text) else 0.0
110
+ score = (0.55 * token_overlap) + (0.35 * choice_match) + (0.10 * exact_text)
111
+ return score, token_overlap, choice_match
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
112
 
113
  def get(
114
  self,
 
118
  ) -> Optional[Dict[str, Any]]:
119
  self._ensure_loaded()
120
  qid = str(question_id or "").strip()
 
 
 
 
 
121
  if qid and qid in self._by_id:
122
+ return dict(self._by_id[qid])
 
 
 
123
 
124
+ qtext = self._normalize(question_text)
125
+ if qtext and qtext in self._by_text:
126
+ return dict(self._by_text[qtext])
 
 
127
 
128
+ signature = self._question_signature(question_text, options_text)
129
+ if signature and signature in self._by_signature:
130
+ return dict(self._by_signature[signature])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
131
 
132
+ if not qtext:
133
  return None
134
 
135
  best: Optional[Dict[str, Any]] = None
136
+ best_score = 0.0
137
+ best_overlap = 0.0
138
+ best_choice = 0.0
139
 
140
  for item in self._items:
141
+ score, token_overlap, choice_match = self._score_candidate(
142
  query_text=question_text or "",
143
  query_choices=options_text,
144
  candidate=item,
145
  )
146
+ if score > best_score:
147
  best = item
148
+ best_score = score
149
+ best_overlap = token_overlap
150
+ best_choice = choice_match
151
+
152
+ threshold = 0.84 if options_text else 0.92
153
+ if best is not None and (best_score >= threshold or (best_choice >= 1.0 and best_overlap >= 0.55)):
154
+ out = dict(best)
155
+ out.setdefault("support_match", {})
156
+ out["support_match"] = {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
157
  "mode": "fuzzy",
158
+ "score": round(best_score, 4),
159
+ "token_overlap": round(best_overlap, 4),
160
+ "choice_match": round(best_choice, 4),
161
+ }
162
+ return out
163
+ return None
164
 
165
  def upsert(self, item: Dict[str, Any]) -> None:
166
  self._ensure_loaded()