srilakshu012456 committed on
Commit
84e70a3
·
verified ·
1 Parent(s): 16cbfa8

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +54 -88
main.py CHANGED
@@ -264,7 +264,7 @@ def _pick_default_action_section_with_preference(best_doc: str, prefer_action: O
264
  return t
265
  return sections[0] if sections else None
266
 
267
- # ------------------------------ Action -> section selector (optional fallback by title) ------------------------------
268
  ACTION_SECTION_KEYS = {
269
  "create": ("create", "creation", "appointment creation", "new appointment", "book", "schedule"),
270
  "update": ("update", "updation", "reschedule", "change", "modify", "edit"),
@@ -318,7 +318,7 @@ def _find_save_lines_in_section(section_text: str, max_lines: int = 2) -> str:
318
  break
319
  return "\n".join(lines)
320
 
321
- # ------------------------------ Generic boundary cutter (metadata-driven) ------------------------------
322
  def _build_doc_section_index(best_doc: str) -> Dict[str, Optional[str]]:
323
  """
324
  Build a dictionary for the given doc:
@@ -345,19 +345,9 @@ def _cut_at_next_boundary_generic(section_text: str, best_doc: str, current_acti
345
  if not (section_text or "").strip():
346
  return section_text
347
 
348
- # Build metadata-based index: {lower(section_title): lower(action_tag or None)}
349
- index: Dict[str, Optional[str]] = {}
350
- for d in bm25_docs:
351
- m = d.get("meta", {}) or {}
352
- if m.get("filename") == best_doc and m.get("intent_tag") == "steps":
353
- sec = (m.get("section") or "").strip()
354
- tag = (m.get("action_tag") or "").strip().lower() or None
355
- if sec:
356
- index[sec.lower()] = tag
357
-
358
  known_headings = set(index.keys())
359
 
360
- # Generic action families (no SOP-specific words)
361
  ACTION_FAMILIES = {
362
  "create": ("create", "creation", "new"),
363
  "update": ("update", "updation", "reschedule", "edit", "modify", "change"),
@@ -378,7 +368,6 @@ def _cut_at_next_boundary_generic(section_text: str, best_doc: str, current_acti
378
  return True
379
  if any(h in line_low for h in known_headings):
380
  return True
381
- # Simple title-style heuristic
382
  if len(raw_line.strip()) <= 140:
383
  words = re.findall(r"[A-Za-z][A-Za-z]+", raw_line)
384
  cap_ratio = sum(1 for w in words if (w[0].isupper() or w.isupper())) / (len(words) or 1)
@@ -392,7 +381,7 @@ def _cut_at_next_boundary_generic(section_text: str, best_doc: str, current_acti
392
  for ln in lines:
393
  low = ln.lower().strip()
394
 
395
- # 1) Metadata heading boundary (best case)
396
  matched_heading = None
397
  for h in known_headings:
398
  if h in low:
@@ -407,7 +396,6 @@ def _cut_at_next_boundary_generic(section_text: str, best_doc: str, current_acti
407
  # 2) Generic action boundary (works even if visible text != metadata title)
408
  fam = detect_action_family_in_line(low)
409
  if current_action and fam and fam != current_action:
410
- # treat heading-like OR numbered lines as boundaries
411
  if is_heading_like(ln, low) or STEP_PREFIX_RX.match(ln):
412
  break
413
 
@@ -559,78 +547,56 @@ def _format_steps_as_numbered(steps: list) -> str:
559
  out.append(f"{circled.get(i, str(i))} {s}")
560
  return "\n".join(out)
561
 
562
- # ------------------------------ Error lines helpers ------------------------------
563
def _filter_error_lines_by_query(text: str, query: str, max_lines: int = 1) -> str:
    """
    Return the line(s) of ``text`` that best match ``query``.

    Each line produced by ``_normalize_lines(text)`` is scored with a weighted
    sum of: shared error families (looked up in ``ERROR_FAMILY_SYNS``), whether
    the line starts with the first two/three query tokens, query bigram and
    trigram hits, an exact-phrase hit and single-token hits. Bulleted lines and
    lines whose heading (text before the first colon) relates to the query get
    a small bonus.

    Args:
        text: Raw section text to search.
        query: User query used for scoring.
        max_lines: Maximum number of lines to return.

    Returns:
        The top-scoring lines joined with newlines. If no line scores above
        zero, the first ``max_lines`` lines are returned instead. If
        ``_normalize_lines`` yields nothing, the stripped input is returned.

    Note:
        ``ERROR_FAMILY_SYNS`` and ``_normalize_lines`` are defined outside this
        block; this code only relies on the former supporting ``.items()`` with
        iterable synonym values and the latter returning a sliceable sequence
        of strings.
    """
    def _norm(s: str) -> str:
        # Lowercase, turn punctuation into spaces, collapse whitespace.
        s = (s or "").lower()
        s = re.sub(r"[^\w\s]", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def _ngrams(tokens: List[str], n: int) -> List[str]:
        return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

    def _families_for(s: str) -> set:
        low = _norm(s)
        return {
            fam
            for fam, syns in ERROR_FAMILY_SYNS.items()
            if any(k in low for k in syns)
        }

    q = _norm(query)
    q_tokens = [t for t in q.split() if len(t) > 1]
    q_bi = _ngrams(q_tokens, 2)
    q_tri = _ngrams(q_tokens, 3)
    q_fams = _families_for(query)
    # Query prefixes used for the "anchored" and heading checks; they do not
    # depend on the line, so compute them once.
    first2 = " ".join(q_tokens[:2]) if len(q_tokens) >= 2 else ""
    first3 = " ".join(q_tokens[:3]) if len(q_tokens) >= 3 else ""

    lines = _normalize_lines(text)
    if not lines:
        return (text or "").strip()

    scored: List[Tuple[float, str]] = []
    for ln in lines:
        ln_norm = _norm(ln)
        fam_overlap = len(q_fams & _families_for(ln))
        anchored = 0.0
        if (first3 and ln_norm.startswith(first3)) or (first2 and ln_norm.startswith(first2)):
            anchored = 1.0
        bigram_hits = sum(1 for bg in q_bi if bg and bg in ln_norm)
        trigram_hits = sum(1 for tg in q_tri if tg and tg in ln_norm)
        token_overlap = sum(1 for t in q_tokens if t and t in ln_norm)
        exact_phrase = 1.0 if (q and q in ln_norm) else 0.0

        score = (
            1.70 * fam_overlap +
            1.00 * anchored +
            0.80 * trigram_hits +
            0.55 * bigram_hits +
            0.40 * exact_phrase +
            0.30 * token_overlap
        )
        if re.match(r"^\s*[-*\u2022]\s*", ln):
            score += 0.10
        # Take the heading from the RAW line: _norm() replaces ':' with a
        # space, so splitting the normalized text on ':' can never find one.
        # A line without a colon is treated as being all heading.
        heading = _norm(ln.partition(":")[0])
        if heading and (heading in q or (first2 and first2 in heading)):
            score += 0.15
        scored.append((score, ln))

    # Stable sort: ties keep their original document order.
    scored.sort(key=lambda x: x[0], reverse=True)
    top = [ln for s, ln in scored[:max_lines] if s > 0.0]
    if not top:
        top = lines[:max_lines]
    return "\n".join(top).strip()
623
-
624
def _friendly_permission_reply(raw: str) -> str:
    """
    Turn a raw permission/access line into a user-friendly reply.

    Args:
        raw: A single line of text, optionally prefixed with a bullet
            marker (``-``, ``*`` or ``\u2022``). ``None`` and empty strings
            are accepted.

    Returns:
        * A generic "verify your role" message when the line is empty or
          contains "verify role access".
        * An "access issue" sentence quoting the line when it mentions
          permission, access or authorization.
        * Otherwise the line itself with the bullet marker removed.
    """
    generic = (
        "It looks like you may not have access for this action. "
        "Please verify your WMS role/permission with your supervisor or IT."
    )

    line = (raw or "").strip()
    line = re.sub(r"^\s*[-*\u2022]\s*", "", line)
    if not line:
        return generic

    low = line.lower()
    if "verify role access" in low:
        return generic
    if any(word in low for word in ("permission", "access", "authorization")):
        # The template adds its own full stop, so drop trailing punctuation
        # from the quoted line to avoid output such as "Access denied..".
        detail = line.rstrip(".!?;:, ")
        return f"It seems to be an access issue: {detail}. Please check your role mapping or request access."
    return line
634
 
635
  # ------------------------------ Language hint ------------------------------
636
  def _detect_language_hint(msg: str) -> Optional[str]:
 
264
  return t
265
  return sections[0] if sections else None
266
 
267
+ # ------------------------------ Optional title-based fallback ------------------------------
268
  ACTION_SECTION_KEYS = {
269
  "create": ("create", "creation", "appointment creation", "new appointment", "book", "schedule"),
270
  "update": ("update", "updation", "reschedule", "change", "modify", "edit"),
 
318
  break
319
  return "\n".join(lines)
320
 
321
+ # ------------------------------ Generic boundary cutter (metadata + action-family) ------------------------------
322
  def _build_doc_section_index(best_doc: str) -> Dict[str, Optional[str]]:
323
  """
324
  Build a dictionary for the given doc:
 
345
  if not (section_text or "").strip():
346
  return section_text
347
 
348
+ index = _build_doc_section_index(best_doc) # {lower(section_title): action_tag}
 
 
 
 
 
 
 
 
 
349
  known_headings = set(index.keys())
350
 
 
351
  ACTION_FAMILIES = {
352
  "create": ("create", "creation", "new"),
353
  "update": ("update", "updation", "reschedule", "edit", "modify", "change"),
 
368
  return True
369
  if any(h in line_low for h in known_headings):
370
  return True
 
371
  if len(raw_line.strip()) <= 140:
372
  words = re.findall(r"[A-Za-z][A-Za-z]+", raw_line)
373
  cap_ratio = sum(1 for w in words if (w[0].isupper() or w.isupper())) / (len(words) or 1)
 
381
  for ln in lines:
382
  low = ln.lower().strip()
383
 
384
+ # 1) Metadata heading boundary
385
  matched_heading = None
386
  for h in known_headings:
387
  if h in low:
 
396
  # 2) Generic action boundary (works even if visible text != metadata title)
397
  fam = detect_action_family_in_line(low)
398
  if current_action and fam and fam != current_action:
 
399
  if is_heading_like(ln, low) or STEP_PREFIX_RX.match(ln):
400
  break
401
 
 
547
  out.append(f"{circled.get(i, str(i))} {s}")
548
  return "\n".join(out)
549
 
550
+ # ------------------------------ Context filter (ensure defined before /chat) ------------------------------
551
def _filter_context_for_query(context: str, query: str) -> Tuple[str, Dict[str, Any]]:
    """
    Keep only the sentences of ``context`` that are most relevant to ``query``.

    The context is split into sentences/bullet items. Each one is scored by
    how many query terms (longer than two characters) occur in it as
    substrings of its normalized text.

    Args:
        context: Knowledge-base text to filter.
        query: User query.

    Returns:
        ``(filtered_text, info)`` where ``info`` has the keys:

        * ``mode``: ``'exact'`` when at least one sentence reaches the strict
          overlap threshold, otherwise ``'concise'``.
        * ``matched_count``: number of matching sentences kept (0 when the
          result is a fallback).
        * ``all_sentences``: number of sentences found in the context (0 on
          the early-return paths).

        With an empty context, empty query, or no usable query terms, the
        stripped context is returned unchanged.
    """
    STRICT_OVERLAP = 3
    MAX_SENTENCES_STRICT = 4
    MAX_SENTENCES_CONCISE = 3

    def _norm(text: str) -> str:
        # Lowercase, turn punctuation into spaces, collapse whitespace.
        t = (text or "").lower()
        t = re.sub(r"[^\w\s]", " ", t)
        return re.sub(r"\s+", " ", t).strip()

    def _split_sentences(ctx: str) -> List[str]:
        # Split after sentence punctuation, on newlines, and on '-'/'*' used
        # as bullet or separator characters. A '-' or '*' with a word
        # character on BOTH sides ("re-schedule", "2024-01-05") is part of a
        # token and must not split it.
        raw_sents = re.split(
            r"(?<=[.!?])\s+|\n+|(?<!\w)[-*]\s*|[-*](?!\w)\s*",
            ctx or "",
        )
        # Fragments of one or two characters are discarded as noise.
        return [s.strip() for s in raw_sents if s and len(s.strip()) > 2]

    ctx = (context or "").strip()
    no_match_info = {'mode': 'concise', 'matched_count': 0, 'all_sentences': 0}
    if not ctx or not query:
        return ctx, no_match_info

    q_norm = _norm(query)
    q_terms = [t for t in q_norm.split() if len(t) > 2]
    if not q_terms:
        return ctx, no_match_info

    sentences = _split_sentences(ctx)
    matched_exact, matched_any = [], []
    for s in sentences:
        s_norm = _norm(s)
        # Substring containment, so "appointment" also hits "appointments".
        # Bullet markers are consumed by the splitter and carry no weight.
        overlap = sum(1 for t in q_terms if t in s_norm)
        if overlap >= STRICT_OVERLAP:
            matched_exact.append(s)
        elif overlap > 0:
            matched_any.append(s)

    if matched_exact:
        kept = matched_exact[:MAX_SENTENCES_STRICT]
        return "\n".join(kept).strip(), {'mode': 'exact', 'matched_count': len(kept), 'all_sentences': len(sentences)}

    if matched_any:
        kept = matched_any[:MAX_SENTENCES_CONCISE]
        return "\n".join(kept).strip(), {'mode': 'concise', 'matched_count': len(kept), 'all_sentences': len(sentences)}

    # Nothing matched: fall back to the leading sentences of the context.
    kept = sentences[:MAX_SENTENCES_CONCISE]
    return "\n".join(kept).strip(), {'mode': 'concise', 'matched_count': 0, 'all_sentences': len(sentences)}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
600
 
601
  # ------------------------------ Language hint ------------------------------
602
  def _detect_language_hint(msg: str) -> Optional[str]: