srilakshu012456 committed on
Commit
08c1dac
·
verified ·
1 Parent(s): 5dd6a63

Update services/kb_creation.py

Browse files
Files changed (1) hide show
  1. services/kb_creation.py +75 -93
services/kb_creation.py CHANGED
@@ -2,6 +2,7 @@
2
  import os
3
  import re
4
  import pickle
 
5
  from typing import List, Dict, Any, Tuple, Optional
6
  from docx import Document
7
  from sentence_transformers import SentenceTransformer
@@ -46,6 +47,39 @@ def _normalize_query(q: str) -> str:
46
  def _tokenize_meta_value(val: Optional[str]) -> List[str]:
47
  return _tokenize(val or "")
48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  # --------------------------- DOCX parsing & chunking ---------------------------
50
  def _split_by_sections(doc: Document) -> List[Tuple[str, List[str]]]:
51
  sections: List[Tuple[str, List[str]]] = []
@@ -71,6 +105,7 @@ def _split_by_sections(doc: Document) -> List[Tuple[str, List[str]]]:
71
  return sections
72
 
73
  def _chunk_text_with_context(doc_title: str, section_title: str, paragraphs: List[str], max_words: int = 900) -> List[str]:
 
74
  body = "\n".join(paragraphs).strip()
75
  if not body:
76
  return []
@@ -79,24 +114,11 @@ def _chunk_text_with_context(doc_title: str, section_title: str, paragraphs: Lis
79
  for i in range(0, len(words), max_words):
80
  chunk_body = ' '.join(words[i:i + max_words]).strip()
81
  if chunk_body:
82
- chunks.append(chunk_body) # no doc/section headers inside text
83
  if not chunks:
84
  chunks = [body]
85
  return chunks
86
 
87
- # --------------------------- Intent tagging (auto) ---------------------------
88
- def _infer_intent_tag(section_title: str) -> str:
89
- st = (section_title or "").lower()
90
- if any(k in st for k in ["process steps", "procedure", "how to", "workflow", "instructions"]):
91
- return "steps"
92
- if any(k in st for k in ["common errors", "resolution", "troubleshooting"]):
93
- return "errors"
94
- if any(k in st for k in ["pre-requisites", "prerequisites"]):
95
- return "prereqs"
96
- if any(k in st for k in ["purpose", "overview", "introduction"]):
97
- return "purpose"
98
- return "neutral"
99
-
100
  # --------------------------- Ingestion ---------------------------
101
  def ingest_documents(folder_path: str) -> None:
102
  print(f"📂 Checking folder: {folder_path}")
@@ -120,7 +142,16 @@ def ingest_documents(folder_path: str) -> None:
120
  for s_idx, (section_title, paras) in enumerate(sections):
121
  chunks = _chunk_text_with_context(doc_title, section_title, paras, max_words=900)
122
  total_chunks += len(chunks)
123
- intent_tag = _infer_intent_tag(section_title)
 
 
 
 
 
 
 
 
 
124
  for c_idx, chunk in enumerate(chunks):
125
  embedding = model.encode(chunk).tolist()
126
  doc_id = f"{file}:{s_idx}:{c_idx}"
@@ -130,7 +161,8 @@ def ingest_documents(folder_path: str) -> None:
130
  "chunk_index": c_idx,
131
  "title": doc_title,
132
  "collection": "SOP",
133
- "intent_tag": intent_tag, # NEW
 
134
  }
135
  try:
136
  collection.add(ids=[doc_id], embeddings=[embedding], documents=[chunk], metadatas=[meta])
@@ -141,6 +173,7 @@ def ingest_documents(folder_path: str) -> None:
141
  except Exception as e2:
142
  print(f"❌ Upsert failed for {doc_id}: {e2}")
143
 
 
144
  tokens = _tokenize(chunk)
145
  tf: Dict[str, int] = {}
146
  for t in tokens:
@@ -212,7 +245,6 @@ def _bm25_score_for_doc(query_terms: List[str], doc_idx: int) -> float:
212
  N = len(bm25_docs)
213
  idf_ratio = ((N - df + 0.5) / (df + 0.5))
214
  try:
215
- import math
216
  idf = math.log(idf_ratio + 1.0)
217
  except Exception:
218
  idf = 1.0
@@ -241,7 +273,7 @@ def bm25_search(query: str, top_k: int = 50) -> List[Tuple[int, float]]:
241
  scored.sort(key=lambda x: x[1], reverse=True)
242
  return scored[:top_k]
243
 
244
- # --------------------------- Semantic-only ---------------------------
245
  def search_knowledge_base(query: str, top_k: int = 10) -> dict:
246
  query_embedding = model.encode(query).tolist()
247
  res = collection.query(
@@ -277,45 +309,7 @@ def search_knowledge_base(query: str, top_k: int = 10) -> dict:
277
  "ids": ids,
278
  }
279
 
280
- # --------------------------- Hybrid (BM25 + Embeddings + Intent + Action) ---------------------------
281
- ACTION_SYNONYMS = {
282
- "create": ["create", "creation", "add", "new", "generate"],
283
- "update": ["update", "modify", "change", "edit"],
284
- "delete": ["delete", "remove"],
285
- "navigate": ["navigate", "go to", "open"],
286
- # NOTE: 'perform' REMOVED to avoid wrong boosts like Appointment "performed..."
287
- }
288
-
289
- def _detect_user_intent(query: str) -> str:
290
- q = (query or "").lower()
291
- if any(k in q for k in ["steps", "procedure", "how to", "navigate", "perform", "do", "process"]):
292
- return "steps"
293
- if any(k in q for k in ["error", "issue", "fail", "not working", "resolution", "fix"]):
294
- return "errors"
295
- if any(k in q for k in ["pre-requisite", "prerequisites", "requirement", "requirements"]):
296
- return "prereqs"
297
- if any(k in q for k in ["purpose", "overview", "introduction"]):
298
- return "purpose"
299
- return "neutral"
300
-
301
- def _extract_actions(query: str) -> List[str]:
302
- q = (query or "").lower()
303
- found = []
304
- for act, syns in ACTION_SYNONYMS.items():
305
- if any(s in q for s in syns):
306
- found.append(act)
307
- return found or []
308
-
309
- def _intent_weight(meta: dict, user_intent: str) -> float:
310
- tag = (meta or {}).get("intent_tag", "neutral")
311
- if user_intent == "neutral":
312
- return 0.0
313
- if tag == user_intent:
314
- return 1.0
315
- if tag in ["purpose", "prereqs"] and user_intent in ["steps", "errors"]:
316
- return -0.6
317
- return -0.2
318
-
319
  def _meta_overlap(meta: Dict[str, Any], q_terms: List[str]) -> float:
320
  fn_tokens = _tokenize_meta_value(meta.get("filename"))
321
  title_tokens = _tokenize_meta_value(meta.get("title"))
@@ -327,28 +321,10 @@ def _meta_overlap(meta: Dict[str, Any], q_terms: List[str]) -> float:
327
  inter = len(meta_tokens & qset)
328
  return inter / max(1, len(qset))
329
 
330
- def _action_weight(text: str, actions: List[str]) -> float:
331
- if not actions:
332
- return 0.0
333
- t = (text or "").lower()
334
- score = 0.0
335
- for act in actions:
336
- for syn in ACTION_SYNONYMS.get(act, [act]):
337
- if syn in t:
338
- score += 1.0
339
- conflicts = {"create": ["delete"], "delete": ["create"], "update": ["delete"], "navigate": []}
340
- for act in actions:
341
- for bad in conflicts.get(act, []):
342
- for syn in ACTION_SYNONYMS.get(bad, [bad]):
343
- if syn in t:
344
- score -= 0.8
345
- return score
346
-
347
  def hybrid_search_knowledge_base(query: str, top_k: int = 10, alpha: float = 0.6, beta: float = 0.4) -> dict:
348
  norm_query = _normalize_query(query)
349
  q_terms = _tokenize(norm_query)
350
- user_intent = _detect_user_intent(query)
351
- actions = _extract_actions(query)
352
 
353
  sem_res = search_knowledge_base(norm_query, top_k=max(top_k, 30))
354
  sem_docs = sem_res.get("documents", [])
@@ -379,11 +355,9 @@ def hybrid_search_knowledge_base(query: str, top_k: int = 10, alpha: float = 0.6
379
 
380
  union_ids = set(sem_ids) | set(bm25_id_to_norm.keys())
381
 
382
- gamma = 0.25 # meta overlap
383
- delta = 0.35 # intent boost
384
- epsilon = 0.30 # action weight
385
 
386
- combined_records_ext: List[Tuple[str, float, float, str, Dict[str, Any], float, float, float]] = []
387
  for cid in union_ids:
388
  if cid in sem_ids:
389
  pos = sem_ids.index(cid)
@@ -402,29 +376,37 @@ def hybrid_search_knowledge_base(query: str, top_k: int = 10, alpha: float = 0.6
402
  meta = sem_meta if sem_meta else bm25_meta
403
 
404
  m_overlap = _meta_overlap(meta, q_terms)
405
- intent_boost = _intent_weight(meta, user_intent)
406
- act_wt = _action_weight(text, actions)
 
 
 
 
 
 
 
 
407
 
408
- final_score = alpha * sem_sim + beta * bm25_sim + gamma * m_overlap + delta * intent_boost + epsilon * act_wt
409
 
410
  combined_records_ext.append(
411
- (cid, final_score, (sem_dist if sem_dist is not None else 999.0), text, meta, m_overlap, intent_boost, act_wt)
412
  )
413
 
 
414
  from collections import defaultdict
415
- doc_groups: Dict[str, List[Tuple[str, float, float, str, Dict[str, Any], float, float, float]]] = defaultdict(list)
416
  for rec in combined_records_ext:
417
  meta = rec[4] or {}
418
  fn = meta.get("filename", "unknown")
419
  doc_groups[fn].append(rec)
420
 
421
- def doc_prior(recs: List[Tuple[str, float, float, str, Dict[str, Any], float, float, float]]) -> float:
422
  total_score = sum(r[1] for r in recs)
423
  total_overlap = sum(r[5] for r in recs)
424
- total_intent = sum(max(0.0, r[6]) for r in recs)
425
- total_action = sum(max(0.0, r[7]) for r in recs)
426
- total_penalty = sum(min(0.0, r[6]) for r in recs) + sum(min(0.0, r[7]) for r in recs)
427
- return total_score + 0.4 * total_overlap + 0.6 * total_intent + 0.5 * total_action + 0.3 * total_penalty
428
 
429
  best_doc, best_doc_prior = None, -1.0
430
  for fn, recs in doc_groups.items():
@@ -458,10 +440,10 @@ def hybrid_search_knowledge_base(query: str, top_k: int = 10, alpha: float = 0.6
458
  "best_doc": best_doc,
459
  "best_doc_prior": best_doc_prior,
460
  "user_intent": user_intent,
461
- "actions": actions,
462
  }
463
 
464
- # --------------------------- Section fetch helpers (for full output) ---------------------------
465
  def get_section_text(filename: str, section: str) -> str:
466
  """Concatenate all chunk texts for a given filename+section."""
467
  texts: List[str] = []
@@ -484,7 +466,7 @@ def get_best_steps_section_text(filename: str) -> str:
484
  texts.append(t)
485
  return "\n\n".join(texts).strip()
486
 
487
- # --- Admin helpers (optional; unchanged) ---
488
  def get_kb_runtime_info() -> Dict[str, Any]:
489
  return {
490
  "chroma_path": CHROMA_PATH,
 
2
  import os
3
  import re
4
  import pickle
5
+ import math
6
  from typing import List, Dict, Any, Tuple, Optional
7
  from docx import Document
8
  from sentence_transformers import SentenceTransformer
 
47
def _tokenize_meta_value(val: Optional[str]) -> List[str]:
    """Tokenize a metadata field value, treating None as the empty string."""
    return _tokenize(val or "")
49
 
50
# --------------------------- Semantic intent prototypes ---------------------------
# One natural-language prototype sentence per intent label.  Queries and SOP
# sections are tagged by nearest-prototype cosine similarity, so supporting a
# new intent only requires adding an entry here (no keyword lists to maintain).
INTENT_PROTOTYPES: Dict[str, str] = {
    "steps": "Step-by-step procedure with actions the user must perform",
    "navigation": "Menu paths and locations in WMS, for example Navigate to Inbound > Receiving",
    "errors": "Common errors and resolution tips or troubleshooting guidance",
    "prereqs": "Pre-requisites, authorization, requirements before executing steps",
    "purpose": "Purpose, overview, introduction that explains why something is done",
    "escalation": "Escalation path or who to contact if the issue cannot be resolved",
    "permission": "User lacks authorization or access denied and needs role access check",
}

# Precompute prototype embeddings once at import time so per-query intent
# detection only pays for embedding the query itself.
PROTO_EMBS: Dict[str, List[float]] = {label: model.encode(text).tolist() for label, text in INTENT_PROTOTYPES.items()}
63
+
64
def _embed(txt: str) -> List[float]:
    """Encode text with the shared SentenceTransformer model; None is treated as ''."""
    return model.encode((txt or "").strip()).tolist()
66
+
67
+ def _cos_sim(a: List[float], b: List[float]) -> float:
68
+ # pure-python cosine similarity
69
+ dot = sum(x * y for x, y in zip(a, b))
70
+ na = math.sqrt(sum(x * x for x in a)) + 1e-9
71
+ nb = math.sqrt(sum(y * y for y in b)) + 1e-9
72
+ return float(dot / (na * nb))
73
+
74
def detect_user_intent(query: str, min_confidence: float = 0.35) -> Tuple[str, float]:
    """Classify the query's intent by nearest prototype embedding.

    Embeds the query and compares it against every precomputed prototype in
    PROTO_EMBS via cosine similarity.

    Bug fix: the previous version could only return "neutral" when every
    similarity was <= 0.0, which real sentence embeddings essentially never
    produce -- so the "neutral" branch that hybrid search relies on was
    effectively dead.  A ``min_confidence`` threshold (default 0.35, a
    reasonable floor for unrelated sentence-transformer pairs -- tune against
    real queries) now maps low-confidence matches back to "neutral".

    Returns:
        (intent_label, confidence) where confidence is the best cosine
        similarity found (approximately 0..1), reported even when the label
        falls back to "neutral".
    """
    q_vec = _embed(query or "")
    best_label, best_score = "neutral", 0.0
    for label, proto_vec in PROTO_EMBS.items():
        score = _cos_sim(q_vec, proto_vec)
        if score > best_score:
            best_label, best_score = label, score
    if best_score < min_confidence:
        return "neutral", best_score
    return best_label, best_score
82
+
83
  # --------------------------- DOCX parsing & chunking ---------------------------
84
  def _split_by_sections(doc: Document) -> List[Tuple[str, List[str]]]:
85
  sections: List[Tuple[str, List[str]]] = []
 
105
  return sections
106
 
107
  def _chunk_text_with_context(doc_title: str, section_title: str, paragraphs: List[str], max_words: int = 900) -> List[str]:
108
+ # Store only body text (no titles/headers in chunk) so users never see SOP headers
109
  body = "\n".join(paragraphs).strip()
110
  if not body:
111
  return []
 
114
  for i in range(0, len(words), max_words):
115
  chunk_body = ' '.join(words[i:i + max_words]).strip()
116
  if chunk_body:
117
+ chunks.append(chunk_body)
118
  if not chunks:
119
  chunks = [body]
120
  return chunks
121
 
 
 
 
 
 
 
 
 
 
 
 
 
 
122
  # --------------------------- Ingestion ---------------------------
123
  def ingest_documents(folder_path: str) -> None:
124
  print(f"📂 Checking folder: {folder_path}")
 
142
  for s_idx, (section_title, paras) in enumerate(sections):
143
  chunks = _chunk_text_with_context(doc_title, section_title, paras, max_words=900)
144
  total_chunks += len(chunks)
145
+
146
+ # --- Semantic section intent tagging (no keywords to maintain) ---
147
+ section_text_for_tag = (section_title or "") + "\n" + ("\n".join(paras[:6]) if paras else "")
148
+ sec_vec = _embed(section_text_for_tag)
149
+ best_intent, best_score = "neutral", 0.0
150
+ for label, proto_vec in PROTO_EMBS.items():
151
+ s = _cos_sim(sec_vec, proto_vec)
152
+ if s > best_score:
153
+ best_intent, best_score = label, s
154
+
155
  for c_idx, chunk in enumerate(chunks):
156
  embedding = model.encode(chunk).tolist()
157
  doc_id = f"{file}:{s_idx}:{c_idx}"
 
161
  "chunk_index": c_idx,
162
  "title": doc_title,
163
  "collection": "SOP",
164
+ "intent_tag": best_intent,
165
+ "intent_score": best_score,
166
  }
167
  try:
168
  collection.add(ids=[doc_id], embeddings=[embedding], documents=[chunk], metadatas=[meta])
 
173
  except Exception as e2:
174
  print(f"❌ Upsert failed for {doc_id}: {e2}")
175
 
176
+ # BM25 indexing
177
  tokens = _tokenize(chunk)
178
  tf: Dict[str, int] = {}
179
  for t in tokens:
 
245
  N = len(bm25_docs)
246
  idf_ratio = ((N - df + 0.5) / (df + 0.5))
247
  try:
 
248
  idf = math.log(idf_ratio + 1.0)
249
  except Exception:
250
  idf = 1.0
 
273
  scored.sort(key=lambda x: x[1], reverse=True)
274
  return scored[:top_k]
275
 
276
+ # --------------------------- Semantic-only (Chroma) ---------------------------
277
  def search_knowledge_base(query: str, top_k: int = 10) -> dict:
278
  query_embedding = model.encode(query).tolist()
279
  res = collection.query(
 
309
  "ids": ids,
310
  }
311
 
312
+ # --------------------------- Hybrid (BM25 + Embeddings + Semantic Intent) ---------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
313
  def _meta_overlap(meta: Dict[str, Any], q_terms: List[str]) -> float:
314
  fn_tokens = _tokenize_meta_value(meta.get("filename"))
315
  title_tokens = _tokenize_meta_value(meta.get("title"))
 
321
  inter = len(meta_tokens & qset)
322
  return inter / max(1, len(qset))
323
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
324
  def hybrid_search_knowledge_base(query: str, top_k: int = 10, alpha: float = 0.6, beta: float = 0.4) -> dict:
325
  norm_query = _normalize_query(query)
326
  q_terms = _tokenize(norm_query)
327
+ user_intent, intent_conf = detect_user_intent(query) # semantic
 
328
 
329
  sem_res = search_knowledge_base(norm_query, top_k=max(top_k, 30))
330
  sem_docs = sem_res.get("documents", [])
 
355
 
356
  union_ids = set(sem_ids) | set(bm25_id_to_norm.keys())
357
 
358
+ gamma = 0.25 # metadata overlap weight
359
+ combined_records_ext: List[Tuple[str, float, float, str, Dict[str, Any], float, float]] = [] # id, score, dist, text, meta, overlap, intentBoost
 
360
 
 
361
  for cid in union_ids:
362
  if cid in sem_ids:
363
  pos = sem_ids.index(cid)
 
376
  meta = sem_meta if sem_meta else bm25_meta
377
 
378
  m_overlap = _meta_overlap(meta, q_terms)
379
+ tag = (meta or {}).get("intent_tag", "neutral")
380
+ tag_conf = float((meta or {}).get("intent_score", 0.0))
381
+
382
+ # Semantic intent boost (no keyword list)
383
+ intent_boost = 0.0
384
+ if user_intent != "neutral":
385
+ if tag == user_intent:
386
+ intent_boost = 0.7 * (0.5 + 0.5 * tag_conf) # stronger if section is confidently tagged
387
+ elif tag_conf > 0.4:
388
+ intent_boost = -0.3 * tag_conf # soft penalty if clearly different and confident
389
 
390
+ final_score = alpha * sem_sim + beta * bm25_sim + gamma * m_overlap + intent_boost
391
 
392
  combined_records_ext.append(
393
+ (cid, final_score, (sem_dist if sem_dist is not None else 999.0), text, meta, m_overlap, intent_boost)
394
  )
395
 
396
+ # ---------------- Document-level voting prior ----------------
397
  from collections import defaultdict
398
+ doc_groups: Dict[str, List[Tuple[str, float, float, str, Dict[str, Any], float, float]]] = defaultdict(list)
399
  for rec in combined_records_ext:
400
  meta = rec[4] or {}
401
  fn = meta.get("filename", "unknown")
402
  doc_groups[fn].append(rec)
403
 
404
def doc_prior(recs: List[Tuple[str, float, float, str, Dict[str, Any], float, float]]) -> float:
    """Aggregate a document's chunk records into a single voting prior.

    Record layout: (id, score, dist, text, meta, overlap, intent_boost).
    Positive intent boosts and negative penalties are weighted separately.
    """
    total_score = 0.0
    total_overlap = 0.0
    pos_intent = 0.0
    neg_intent = 0.0
    for rec in recs:
        total_score += rec[1]
        total_overlap += rec[5]
        boost = rec[6]
        if boost > 0.0:
            pos_intent += boost
        else:
            neg_intent += boost
    return total_score + 0.4 * total_overlap + 0.6 * pos_intent + 0.3 * neg_intent
 
410
 
411
  best_doc, best_doc_prior = None, -1.0
412
  for fn, recs in doc_groups.items():
 
440
  "best_doc": best_doc,
441
  "best_doc_prior": best_doc_prior,
442
  "user_intent": user_intent,
443
+ "user_intent_conf": intent_conf,
444
  }
445
 
446
+ # --------------------------- Section fetch helpers ---------------------------
447
  def get_section_text(filename: str, section: str) -> str:
448
  """Concatenate all chunk texts for a given filename+section."""
449
  texts: List[str] = []
 
466
  texts.append(t)
467
  return "\n\n".join(texts).strip()
468
 
469
+ # --------------------------- Admin helpers ---------------------------
470
  def get_kb_runtime_info() -> Dict[str, Any]:
471
  return {
472
  "chroma_path": CHROMA_PATH,