srilakshu012456 committed on
Commit
e293c42
·
verified ·
1 Parent(s): 46344cb

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +70 -76
main.py CHANGED
@@ -105,6 +105,47 @@ DOMAIN_STATUS_TERMS = (
105
  "asn", "grn", "pick", "picking"
106
  )
107
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
108
  def _is_domain_status_context(msg_norm: str) -> bool:
109
  if "status locked" in msg_norm or "locked status" in msg_norm:
110
  return True
@@ -161,80 +202,45 @@ def _ensure_numbering(text: str) -> str:
161
  out.append(f"{marker} {seg}")
162
  return "\n".join(out)
163
 
164
-
165
- def _filter_error_lines_by_query(text: str, query: str, max_lines: int = 4) -> str:
166
  """
167
- Pick the most relevant 'Common Errors & Resolution' bullets for the user's message.
168
- Generic across SOPs via error families + phrase overlap.
169
-
170
- Prioritization:
171
  1) error-family match (NOT_FOUND/MISMATCH/LOCKED/PERMISSION/TIMEOUT/SYNC),
172
- 2) anchored starts (line begins with the error phrase/heading),
173
  3) multi-word overlap (bigrams/trigrams),
174
  4) token overlap,
175
- 5) bullet/heading formatting bonus.
176
 
177
- If no line matches positively, falls back to the first few lines.
178
  """
179
 
180
  import re
181
  from typing import List, Tuple
182
 
183
- # --- Generic error families (SOP-wide) ---
184
- ERROR_FAMILIES = {
185
- "NOT_FOUND": (
186
- "not found", "missing", "does not exist", "doesn't exist",
187
- "unavailable", "not available", "cannot find", "no such", "not present", "absent"
188
- ),
189
- "MISMATCH": (
190
- "mismatch", "doesn't match", "does not match", "variance",
191
- "difference", "discrepancy", "not equal"
192
- ),
193
- "LOCKED": (
194
- "locked", "status locked", "blocked", "read only", "read-only", "frozen", "freeze"
195
- ),
196
- "PERMISSION": (
197
- "permission", "permissions", "access denied", "not authorized",
198
- "not authorised", "insufficient privileges", "no access", "authorization", "authorisation"
199
- ),
200
- "TIMEOUT": (
201
- "timeout", "timed out", "network", "connection", "unable to connect",
202
- "disconnected", "no network"
203
- ),
204
- "SYNC": (
205
- "sync", "synchronization", "synchronisation", "replication",
206
- "refresh", "out of sync", "stale", "delay", "lag"
207
- ),
208
- }
209
-
210
- # Normalizer
211
  def _norm(s: str) -> str:
212
  s = (s or "").lower()
213
  s = re.sub(r"[^\w\s]", " ", s)
214
  s = re.sub(r"\s+", " ", s).strip()
215
  return s
216
 
217
- # Detect error families mentioned in a string
218
- def _families_for(s: str) -> List[str]:
219
- out = []
220
- low = _norm(s)
221
- for fam, syns in ERROR_FAMILIES.items():
222
- if any(k in low for k in syns):
223
- out.append(fam)
224
- return out
225
-
226
- # N-grams
227
  def _ngrams(tokens: List[str], n: int) -> List[str]:
228
  return [" ".join(tokens[i:i+n]) for i in range(len(tokens) - n + 1)]
229
 
230
- # Normalize query
 
 
 
 
 
 
 
231
  q = _norm(query)
232
  q_tokens = [t for t in q.split() if len(t) > 1]
233
  q_bi = _ngrams(q_tokens, 2)
234
  q_tri = _ngrams(q_tokens, 3)
235
- q_families = set(_families_for(query))
236
 
237
- # Candidate lines
238
  lines = _normalize_lines(text)
239
  if not lines:
240
  return (text or "").strip()
@@ -242,53 +248,41 @@ def _filter_error_lines_by_query(text: str, query: str, max_lines: int = 4) -> s
242
  scored: List[Tuple[float, str]] = []
243
  for ln in lines:
244
  ln_norm = _norm(ln)
245
- ln_families = set(_families_for(ln))
246
 
247
- # --- Signals ---
248
- # Family match (strong): any overlap between query families and line families
249
- fam_overlap = len(q_families & ln_families)
250
- fam_score = 1.60 * fam_overlap # strong boost when families line up
251
-
252
- # Exact phrase (medium-strong)
253
- exact_phrase = 1.00 if (q and q in ln_norm) else 0.0
254
-
255
- # Anchored start (strong for bullet headings like "ASN not found: ...")
256
  first2 = " ".join(q_tokens[:2]) if len(q_tokens) >= 2 else ""
257
  first3 = " ".join(q_tokens[:3]) if len(q_tokens) >= 3 else ""
258
- anchored = 1.00 if (first3 and ln_norm.startswith(first3)) or (first2 and ln_norm.startswith(first2)) else 0.0
 
259
 
260
- # Multi-word phrase overlap
261
  bigram_hits = sum(1 for bg in q_bi if bg and bg in ln_norm)
262
  trigram_hits = sum(1 for tg in q_tri if tg and tg in ln_norm)
263
-
264
- # Token overlap (fallback)
265
  token_overlap = sum(1 for t in q_tokens if t and t in ln_norm)
 
266
 
267
- # --- Score composition (tuned for generic SOPs) ---
268
  score = (
269
- fam_score +
270
- 0.90 * anchored +
271
  0.80 * trigram_hits +
272
  0.55 * bigram_hits +
273
- 0.45 * exact_phrase +
274
  0.30 * token_overlap
275
  )
276
 
277
- # Small bonuses for bullets/heading-like lines
278
- if re.match(r"^\s*[\-\*\u2022]\s*", ln): # bullet dot
279
  score += 0.10
280
- # Heading before ':' matches some part of the query
281
  heading = ln_norm.split(":")[0].strip()
282
  if heading and (heading in q or (first2 and first2 in heading)):
283
  score += 0.15
284
 
285
  scored.append((score, ln))
286
 
287
- # Sort by score desc and take top max_lines
288
  scored.sort(key=lambda x: x[0], reverse=True)
289
  top = [ln for s, ln in scored[:max_lines] if s > 0.0]
290
 
291
- # Fallback if everything scored zero
292
  if not top:
293
  top = lines[:max_lines]
294
 
@@ -863,8 +857,8 @@ async def chat_with_ai(input_data: ChatInput):
863
 
864
  # Bypass gate when strong steps signals are present for Receiving module
865
  strong_steps_bypass = looks_like_steps_query and looks_like_receiving
866
-
867
- if (weak_domain_only or (low_context_hit and not combined_ok)) and not strong_steps_bypass:
868
  return {
869
  "bot_response": _build_clarifying_message(),
870
  "status": "NO_KB_MATCH",
@@ -909,7 +903,7 @@ async def chat_with_ai(input_data: ChatInput):
909
  if is_perm_query:
910
  context = _filter_permission_lines(ctx_err, max_lines=6)
911
  else:
912
- context = _filter_error_lines_by_query(ctx_err, input_data.user_message, max_lines=6)
913
  escalation_line = _extract_escalation_line(full_errors)
914
 
915
  elif detected_intent == "prereqs":
 
105
  "asn", "grn", "pick", "picking"
106
  )
107
 
import re  # imported here so this section works even if the file header does not import it

# --- Generic error families (SOP-wide, reusable in gating and line selection) ---
# Maps a family name to the phrases that signal it in a user message or SOP line.
ERROR_FAMILY_SYNS = {
    "NOT_FOUND": (
        "not found", "missing", "does not exist", "doesn't exist",
        "unavailable", "not available", "cannot find", "no such",
        "not present", "absent"
    ),
    "MISMATCH": (
        "mismatch", "doesn't match", "does not match", "variance",
        "difference", "discrepancy", "not equal"
    ),
    "LOCKED": (
        "locked", "status locked", "blocked", "read only", "read-only", "frozen", "freeze"
    ),
    "PERMISSION": (
        "permission", "permissions", "access denied", "not authorized",
        "not authorised", "insufficient privileges", "no access",
        "authorization", "authorisation"
    ),
    "TIMEOUT": (
        "timeout", "timed out", "network", "connection",
        "unable to connect", "disconnected", "no network"
    ),
    "SYNC": (
        "sync", "synchronization", "synchronisation", "replication",
        "refresh", "out of sync", "stale", "delay", "lag"
    ),
}


def _normalize_family_text(text: str) -> str:
    """Lowercase *text*, replace punctuation with spaces and collapse whitespace."""
    low = (text or "").lower()
    low = re.sub(r"[^\w\s]", " ", low)
    return re.sub(r"\s+", " ", low).strip()


# One compiled alternation per family, built once at import time.
# Synonyms go through the same normalisation as the message; otherwise entries
# containing punctuation ("doesn't match", "read-only") could never match text
# whose punctuation has already been replaced by spaces.
# The leading (?<!\w) requires a match to start at a word boundary, so "lag"
# no longer fires inside "flag" or "locked" inside "unlocked", while inflected
# forms such as "delayed" or "permissions" still match.
_ERROR_FAMILY_PATTERNS = {
    fam: re.compile(
        r"(?<!\w)(?:"
        + "|".join(re.escape(_normalize_family_text(syn)) for syn in syns)
        + r")"
    )
    for fam, syns in ERROR_FAMILY_SYNS.items()
}


def _detect_error_families(msg: str) -> list:
    """Return matching error family names found in the message (generic across SOPs).

    Args:
        msg: Raw user message; ``None`` or an empty string yields ``[]``.

    Returns:
        Family names (keys of ``ERROR_FAMILY_SYNS``) in declaration order, one
        entry per family that has at least one synonym starting at a word
        boundary in the normalised message.
    """
    low_norm = _normalize_family_text(msg)
    return [fam for fam, pattern in _ERROR_FAMILY_PATTERNS.items() if pattern.search(low_norm)]
148
+
149
  def _is_domain_status_context(msg_norm: str) -> bool:
150
  if "status locked" in msg_norm or "locked status" in msg_norm:
151
  return True
 
202
  out.append(f"{marker} {seg}")
203
  return "\n".join(out)
204
 
205
+ def _filter_error_lines_by_query(text: str, query: str, max_lines: int = 1) -> str:
 
206
  """
207
+ Pick the most relevant 'Common Errors & Resolution' bullet(s) for the user's message.
208
+ Generic (SOP-agnostic) scoring:
 
 
209
  1) error-family match (NOT_FOUND/MISMATCH/LOCKED/PERMISSION/TIMEOUT/SYNC),
210
+ 2) anchored starts (line begins with error heading),
211
  3) multi-word overlap (bigrams/trigrams),
212
  4) token overlap,
213
+ 5) formatting bonus for bullets/headings.
214
 
215
+ Returns up to `max_lines` best-scoring lines (defaults to 1); falls back to the first `max_lines` lines when no line scores above zero.
216
  """
217
 
218
  import re
219
  from typing import List, Tuple
220
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
221
  def _norm(s: str) -> str:
222
  s = (s or "").lower()
223
  s = re.sub(r"[^\w\s]", " ", s)
224
  s = re.sub(r"\s+", " ", s).strip()
225
  return s
226
 
 
 
 
 
 
 
 
 
 
 
227
  def _ngrams(tokens: List[str], n: int) -> List[str]:
228
  return [" ".join(tokens[i:i+n]) for i in range(len(tokens) - n + 1)]
229
 
230
+ def _families_for(s: str) -> set:
231
+ low = _norm(s)
232
+ fams = set()
233
+ for fam, syns in ERROR_FAMILY_SYNS.items():
234
+ if any(k in low for k in syns):
235
+ fams.add(fam)
236
+ return fams
237
+
238
  q = _norm(query)
239
  q_tokens = [t for t in q.split() if len(t) > 1]
240
  q_bi = _ngrams(q_tokens, 2)
241
  q_tri = _ngrams(q_tokens, 3)
242
+ q_fams = _families_for(query)
243
 
 
244
  lines = _normalize_lines(text)
245
  if not lines:
246
  return (text or "").strip()
 
248
  scored: List[Tuple[float, str]] = []
249
  for ln in lines:
250
  ln_norm = _norm(ln)
251
+ ln_fams = _families_for(ln)
252
 
253
+ fam_overlap = len(q_fams & ln_fams) # strong signal
254
+ anchored = 0.0
 
 
 
 
 
 
 
255
  first2 = " ".join(q_tokens[:2]) if len(q_tokens) >= 2 else ""
256
  first3 = " ".join(q_tokens[:3]) if len(q_tokens) >= 3 else ""
257
+ if (first3 and ln_norm.startswith(first3)) or (first2 and ln_norm.startswith(first2)):
258
+ anchored = 1.0
259
 
 
260
  bigram_hits = sum(1 for bg in q_bi if bg and bg in ln_norm)
261
  trigram_hits = sum(1 for tg in q_tri if tg and tg in ln_norm)
 
 
262
  token_overlap = sum(1 for t in q_tokens if t and t in ln_norm)
263
+ exact_phrase = 1.0 if (q and q in ln_norm) else 0.0
264
 
265
+ # Composite score (tuned generically)
266
  score = (
267
+ 1.70 * fam_overlap +
268
+ 1.00 * anchored +
269
  0.80 * trigram_hits +
270
  0.55 * bigram_hits +
271
+ 0.40 * exact_phrase +
272
  0.30 * token_overlap
273
  )
274
 
275
+ if re.match(r"^\s*[\-\*\u2022]\s*", ln): # bullet
 
276
  score += 0.10
 
277
  heading = ln_norm.split(":")[0].strip()
278
  if heading and (heading in q or (first2 and first2 in heading)):
279
  score += 0.15
280
 
281
  scored.append((score, ln))
282
 
 
283
  scored.sort(key=lambda x: x[0], reverse=True)
284
  top = [ln for s, ln in scored[:max_lines] if s > 0.0]
285
 
 
286
  if not top:
287
  top = lines[:max_lines]
288
 
 
857
 
858
  # Bypass gate when strong steps signals are present for Receiving module
859
  strong_steps_bypass = looks_like_steps_query and looks_like_receiving
860
+ strong_error_signal = len(_detect_error_families(msg_low)) > 0
861
+ if (weak_domain_only or (low_context_hit and not combined_ok)) and not strong_steps_bypass and not strong_error_signal:
862
  return {
863
  "bot_response": _build_clarifying_message(),
864
  "status": "NO_KB_MATCH",
 
903
  if is_perm_query:
904
  context = _filter_permission_lines(ctx_err, max_lines=6)
905
  else:
906
+ context = _filter_error_lines_by_query(ctx_err, input_data.user_message, max_lines=1)
907
  escalation_line = _extract_escalation_line(full_errors)
908
 
909
  elif detected_intent == "prereqs":