sofzcc committed on
Commit
bf0ef35
·
verified ·
1 Parent(s): 3d379e2

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +89 -110
app.py CHANGED
@@ -1,21 +1,22 @@
1
  import os
2
  import re
3
  import json
4
- import time
5
  from pathlib import Path
6
  from typing import List, Dict, Tuple
7
 
8
  import numpy as np
9
  import faiss
10
-
11
  import gradio as gr
 
12
  from transformers import pipeline, AutoTokenizer, AutoModelForQuestionAnswering
13
  from sentence_transformers import SentenceTransformer
14
 
 
15
  KB_DIR = Path("./kb")
16
  INDEX_DIR = Path("./.index")
17
  INDEX_DIR.mkdir(exist_ok=True, parents=True)
18
 
 
19
  EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
20
  READER_MODEL_NAME = "deepset/roberta-base-squad2"
21
 
@@ -23,39 +24,22 @@ EMBEDDINGS_PATH = INDEX_DIR / "kb_embeddings.npy"
23
  METADATA_PATH = INDEX_DIR / "kb_metadata.json"
24
  FAISS_PATH = INDEX_DIR / "kb_faiss.index"
25
 
26
-
27
- # ---------------------------
28
- # Utilities: Markdown loading
29
- # ---------------------------
30
-
31
  HEADING_RE = re.compile(r"^(#{1,6})\s+(.*)$", re.MULTILINE)
32
 
 
33
  def read_markdown_files(kb_dir: Path) -> List[Dict]:
34
  docs = []
35
  for md_path in sorted(kb_dir.glob("*.md")):
36
  text = md_path.read_text(encoding="utf-8", errors="ignore")
37
  title = md_path.stem.replace("_", " ").title()
38
- # Try first H1 as title if present
39
  m = re.search(r"^#\s+(.*)$", text, flags=re.MULTILINE)
40
  if m:
41
  title = m.group(1).strip()
42
-
43
- docs.append({
44
- "filepath": str(md_path),
45
- "filename": md_path.name,
46
- "title": title,
47
- "text": text
48
- })
49
  return docs
50
 
51
-
52
  def chunk_markdown(doc: Dict, chunk_chars: int = 1200, overlap: int = 150) -> List[Dict]:
53
- """
54
- Simple header-aware chunking: split by H2/H3 when possible and then by char length.
55
- Stores anchor-ish metadata for basic citations.
56
- """
57
  text = doc["text"]
58
- # Split by H2/H3 as sections (fallback to entire text)
59
  sections = re.split(r"(?=^##\s+|\n##\s+|\n###\s+|^###\s+)", text, flags=re.MULTILINE)
60
  if len(sections) == 1:
61
  sections = [text]
@@ -65,12 +49,9 @@ def chunk_markdown(doc: Dict, chunk_chars: int = 1200, overlap: int = 150) -> Li
65
  sec = sec.strip()
66
  if not sec:
67
  continue
68
-
69
- # Derive a section heading for citation
70
  heading_match = HEADING_RE.search(sec)
71
  section_heading = heading_match.group(2).strip() if heading_match else doc["title"]
72
 
73
- # Hard wrap into chunks
74
  start = 0
75
  while start < len(sec):
76
  end = min(start + chunk_chars, len(sec))
@@ -83,19 +64,12 @@ def chunk_markdown(doc: Dict, chunk_chars: int = 1200, overlap: int = 150) -> Li
83
  "section": section_heading,
84
  "content": chunk_text
85
  })
86
- start = end - overlap if end - overlap > 0 else end
87
- if start < 0:
88
- start = 0
89
  if end == len(sec):
90
  break
91
-
92
  return chunks
93
 
94
-
95
- # ---------------------------
96
- # Build / Load Index
97
- # ---------------------------
98
-
99
  class KBIndex:
100
  def __init__(self):
101
  self.embedder = SentenceTransformer(EMBEDDING_MODEL_NAME)
@@ -103,29 +77,25 @@ class KBIndex:
103
  self.reader_model = AutoModelForQuestionAnswering.from_pretrained(READER_MODEL_NAME)
104
  self.reader = pipeline("question-answering", model=self.reader_model, tokenizer=self.reader_tokenizer)
105
 
106
- self.index = None # FAISS index
107
- self.embeddings = None # numpy array
108
- self.metadata = [] # list of dicts
109
 
110
  def build(self, kb_dir: Path):
111
  docs = read_markdown_files(kb_dir)
112
  if not docs:
113
- raise RuntimeError(f"No markdown files found in {kb_dir.resolve()}. Please add *.md files.")
114
 
115
- # Produce chunks
116
  all_chunks = []
117
  for d in docs:
118
  all_chunks.extend(chunk_markdown(d))
119
-
120
  texts = [c["content"] for c in all_chunks]
121
  if not texts:
122
  raise RuntimeError("No content chunks generated from KB.")
123
 
124
  embeddings = self.embedder.encode(texts, batch_size=32, convert_to_numpy=True, show_progress_bar=False)
125
- # Normalize for cosine similarity
126
  faiss.normalize_L2(embeddings)
127
 
128
- # Build FAISS index (cosine via inner product on normalized vectors)
129
  dim = embeddings.shape[1]
130
  index = faiss.IndexFlatIP(dim)
131
  index.add(embeddings)
@@ -134,7 +104,6 @@ class KBIndex:
134
  self.embeddings = embeddings
135
  self.metadata = all_chunks
136
 
137
- # Persist to disk
138
  np.save(EMBEDDINGS_PATH, embeddings)
139
  with open(METADATA_PATH, "w", encoding="utf-8") as f:
140
  json.dump(self.metadata, f, ensure_ascii=False, indent=2)
@@ -143,102 +112,54 @@ class KBIndex:
143
  def load(self):
144
  if not (EMBEDDINGS_PATH.exists() and METADATA_PATH.exists() and FAISS_PATH.exists()):
145
  return False
146
-
147
  self.embeddings = np.load(EMBEDDINGS_PATH)
148
  with open(METADATA_PATH, "r", encoding="utf-8") as f:
149
  self.metadata = json.load(f)
150
  self.index = faiss.read_index(str(FAISS_PATH))
151
  return True
152
 
153
- def rebuild_if_kb_changed(self):
154
- """
155
- Very light heuristic: if index older than newest kb file, rebuild.
156
- """
157
- kb_mtime = max([p.stat().st_mtime for p in KB_DIR.glob("*.md")] or [0])
158
- idx_mtime = min([
159
- EMBEDDINGS_PATH.stat().st_mtime if EMBEDDINGS_PATH.exists() else 0,
160
- METADATA_PATH.stat().st_mtime if METADATA_PATH.exists() else 0,
161
- FAISS_PATH.stat().st_mtime if FAISS_PATH.exists() else 0,
162
- ])
163
- if kb_mtime > idx_mtime:
164
- self.build(KB_DIR)
165
-
166
  def retrieve(self, query: str, top_k: int = 4) -> List[Tuple[int, float]]:
167
  q_emb = self.embedder.encode([query], convert_to_numpy=True)
168
  faiss.normalize_L2(q_emb)
169
  D, I = self.index.search(q_emb, top_k)
170
- indices = I[0].tolist()
171
- sims = D[0].tolist()
172
- return list(zip(indices, sims))
173
 
174
  def answer(self, question: str, retrieved: List[Tuple[int, float]]):
175
- """
176
- Use extractive QA across the top retrieved chunks; pick the best span by score.
177
- Return (answer_text, best_score, citations)
178
- """
179
- best = {"text": None, "score": -1e9, "meta": None, "ctx": None, "sim": 0.0}
180
  for idx, sim in retrieved:
181
  meta = self.metadata[idx]
182
- context = meta["content"]
183
  try:
184
- out = self.reader(question=question, context=context)
185
  except Exception:
186
  continue
187
  score = float(out.get("score", 0.0))
188
  if score > best["score"]:
189
- best = {
190
- "text": out.get("answer", "").strip(),
191
- "score": score,
192
- "meta": meta,
193
- "ctx": context,
194
- "sim": float(sim)
195
- }
196
-
197
  if not best["text"]:
198
  return None, 0.0, []
199
-
200
- # Build citations: top 2 sources from retrieved
201
  citations = []
202
  seen = set()
203
- for idx, sim in retrieved[:2]:
204
- meta = self.metadata[idx]
205
- key = (meta["filename"], meta["section"])
206
  if key in seen:
207
  continue
208
  seen.add(key)
209
- citations.append({
210
- "title": meta["doc_title"],
211
- "filename": meta["filename"],
212
- "section": meta["section"]
213
- })
214
  return best["text"], best["score"], citations
215
 
216
-
217
  kb = KBIndex()
218
 
219
  def ensure_index():
 
220
  if not kb.load():
221
  kb.build(KB_DIR)
222
- else:
223
- kb.rebuild_if_kb_changed()
224
-
225
  ensure_index()
226
 
227
-
228
- # ---------------------------
229
- # Clarify / Guardrails logic
230
- # ---------------------------
231
-
232
- def format_citations(citations: List[Dict]) -> str:
233
- if not citations:
234
- return ""
235
- lines = []
236
- for c in citations:
237
- lines.append(f"• **{c['title']}** — _{c['section']}_ (`{c['filename']}`)")
238
- return "\n".join(lines)
239
-
240
- LOW_CONF_THRESHOLD = 0.20 # reader score heuristic (0–1)
241
- LOW_SIM_THRESHOLD = 0.30 # retriever sim heuristic (cosine/IP on normalized vectors)
242
 
243
  HELPFUL_SUGGESTIONS = [
244
  ("Connect WhatsApp", "How do I connect my WhatsApp number?"),
@@ -248,24 +169,82 @@ HELPFUL_SUGGESTIONS = [
248
  ("Fix Instagram Connect", "Why can't I connect Instagram?")
249
  ]
250
 
 
 
 
 
251
 
252
  def respond(user_msg, history):
253
  user_msg = (user_msg or "").strip()
254
  if not user_msg:
255
  return "How can I help? Try: **Connect WhatsApp** or **Reset password**."
256
 
257
- # Retrieve
258
  retrieved = kb.retrieve(user_msg, top_k=4)
259
  if not retrieved:
260
- return "I couldn't find anything yet. Try rephrasing or pick a quick action below."
261
 
262
- # Answer
263
  span, score, citations = kb.answer(user_msg, retrieved)
264
-
265
- # If no span, surface top articles as fallback
266
  if not span:
267
  suggestions = "\n".join([f"- {c['title']} — _{c['section']}_" for c in citations]) or "- Try a different query."
268
  return f"I’m not fully sure. Here are the closest matches:\n\n{suggestions}"
269
 
270
- # Confidence heuristics
271
- best_sim = max(_
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
  import re
3
  import json
 
4
  from pathlib import Path
5
  from typing import List, Dict, Tuple
6
 
7
  import numpy as np
8
  import faiss
 
9
  import gradio as gr
10
+
11
  from transformers import pipeline, AutoTokenizer, AutoModelForQuestionAnswering
12
  from sentence_transformers import SentenceTransformer
13
 
14
+ # ----------- Paths -----------
15
  KB_DIR = Path("./kb")
16
  INDEX_DIR = Path("./.index")
17
  INDEX_DIR.mkdir(exist_ok=True, parents=True)
18
 
19
+ # ----------- Models (free) -----------
20
  EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
21
  READER_MODEL_NAME = "deepset/roberta-base-squad2"
22
 
 
24
  METADATA_PATH = INDEX_DIR / "kb_metadata.json"
25
  FAISS_PATH = INDEX_DIR / "kb_faiss.index"
26
 
 
 
 
 
 
27
  HEADING_RE = re.compile(r"^(#{1,6})\s+(.*)$", re.MULTILINE)
28
 
29
+ # ----------- Load Markdown -----------
30
def read_markdown_files(kb_dir: Path) -> List[Dict]:
    """Collect every ``*.md`` file under *kb_dir* as a doc record.

    Each record carries ``filepath``, ``filename``, a display ``title``
    (the first H1 heading if present, otherwise the title-cased file stem)
    and the raw markdown ``text``. Files are processed in sorted order.
    """
    h1_pattern = re.compile(r"^#\s+(.*)$", re.MULTILINE)
    docs = []
    for path in sorted(kb_dir.glob("*.md")):
        raw = path.read_text(encoding="utf-8", errors="ignore")
        heading = h1_pattern.search(raw)
        if heading:
            display_title = heading.group(1).strip()
        else:
            display_title = path.stem.replace("_", " ").title()
        docs.append(
            {
                "filepath": str(path),
                "filename": path.name,
                "title": display_title,
                "text": raw,
            }
        )
    return docs
40
 
 
41
  def chunk_markdown(doc: Dict, chunk_chars: int = 1200, overlap: int = 150) -> List[Dict]:
 
 
 
 
42
  text = doc["text"]
 
43
  sections = re.split(r"(?=^##\s+|\n##\s+|\n###\s+|^###\s+)", text, flags=re.MULTILINE)
44
  if len(sections) == 1:
45
  sections = [text]
 
49
  sec = sec.strip()
50
  if not sec:
51
  continue
 
 
52
  heading_match = HEADING_RE.search(sec)
53
  section_heading = heading_match.group(2).strip() if heading_match else doc["title"]
54
 
 
55
  start = 0
56
  while start < len(sec):
57
  end = min(start + chunk_chars, len(sec))
 
64
  "section": section_heading,
65
  "content": chunk_text
66
  })
 
 
 
67
  if end == len(sec):
68
  break
69
+ start = max(0, end - overlap)
70
  return chunks
71
 
72
+ # ----------- KB Index -----------
 
 
 
 
73
  class KBIndex:
74
  def __init__(self):
75
  self.embedder = SentenceTransformer(EMBEDDING_MODEL_NAME)
 
77
  self.reader_model = AutoModelForQuestionAnswering.from_pretrained(READER_MODEL_NAME)
78
  self.reader = pipeline("question-answering", model=self.reader_model, tokenizer=self.reader_tokenizer)
79
 
80
+ self.index = None
81
+ self.embeddings = None
82
+ self.metadata = []
83
 
84
  def build(self, kb_dir: Path):
85
  docs = read_markdown_files(kb_dir)
86
  if not docs:
87
+ raise RuntimeError(f"No markdown files found in {kb_dir.resolve()}")
88
 
 
89
  all_chunks = []
90
  for d in docs:
91
  all_chunks.extend(chunk_markdown(d))
 
92
  texts = [c["content"] for c in all_chunks]
93
  if not texts:
94
  raise RuntimeError("No content chunks generated from KB.")
95
 
96
  embeddings = self.embedder.encode(texts, batch_size=32, convert_to_numpy=True, show_progress_bar=False)
 
97
  faiss.normalize_L2(embeddings)
98
 
 
99
  dim = embeddings.shape[1]
100
  index = faiss.IndexFlatIP(dim)
101
  index.add(embeddings)
 
104
  self.embeddings = embeddings
105
  self.metadata = all_chunks
106
 
 
107
  np.save(EMBEDDINGS_PATH, embeddings)
108
  with open(METADATA_PATH, "w", encoding="utf-8") as f:
109
  json.dump(self.metadata, f, ensure_ascii=False, indent=2)
 
112
  def load(self):
113
  if not (EMBEDDINGS_PATH.exists() and METADATA_PATH.exists() and FAISS_PATH.exists()):
114
  return False
 
115
  self.embeddings = np.load(EMBEDDINGS_PATH)
116
  with open(METADATA_PATH, "r", encoding="utf-8") as f:
117
  self.metadata = json.load(f)
118
  self.index = faiss.read_index(str(FAISS_PATH))
119
  return True
120
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
  def retrieve(self, query: str, top_k: int = 4) -> List[Tuple[int, float]]:
122
  q_emb = self.embedder.encode([query], convert_to_numpy=True)
123
  faiss.normalize_L2(q_emb)
124
  D, I = self.index.search(q_emb, top_k)
125
+ return list(zip(I[0].tolist(), D[0].tolist()))
 
 
126
 
127
  def answer(self, question: str, retrieved: List[Tuple[int, float]]):
128
+ best = {"text": None, "score": -1e9, "meta": None, "sim": 0.0}
 
 
 
 
129
  for idx, sim in retrieved:
130
  meta = self.metadata[idx]
131
+ ctx = meta["content"]
132
  try:
133
+ out = self.reader(question=question, context=ctx)
134
  except Exception:
135
  continue
136
  score = float(out.get("score", 0.0))
137
  if score > best["score"]:
138
+ best = {"text": out.get("answer", "").strip(), "score": score, "meta": meta, "sim": float(sim)}
 
 
 
 
 
 
 
139
  if not best["text"]:
140
  return None, 0.0, []
 
 
141
  citations = []
142
  seen = set()
143
+ for idx, _ in retrieved[:2]:
144
+ m = self.metadata[idx]
145
+ key = (m["filename"], m["section"])
146
  if key in seen:
147
  continue
148
  seen.add(key)
149
+ citations.append({"title": m["doc_title"], "filename": m["filename"], "section": m["section"]})
 
 
 
 
150
  return best["text"], best["score"], citations
151
 
 
152
  kb = KBIndex()
153
 
154
def ensure_index():
    """Load the cached index if present; otherwise build it from the KB.

    Called once at import time so the Space is ready before the UI starts.
    """
    loaded = kb.load()
    if not loaded:
        kb.build(KB_DIR)
 
 
 
158
  ensure_index()
159
 
160
+ # ----------- Guardrails -----------
161
+ LOW_CONF_THRESHOLD = 0.20
162
+ LOW_SIM_THRESHOLD = 0.30
 
 
 
 
 
 
 
 
 
 
 
 
163
 
164
  HELPFUL_SUGGESTIONS = [
165
  ("Connect WhatsApp", "How do I connect my WhatsApp number?"),
 
169
  ("Fix Instagram Connect", "Why can't I connect Instagram?")
170
  ]
171
 
172
def format_citations(citations: List[Dict]) -> str:
    """Render citations as markdown bullet lines ('' when there are none)."""
    bullets = [
        f"• **{c['title']}** — _{c['section']}_ (`{c['filename']}`)"
        for c in citations
    ]
    return "\n".join(bullets)
176
 
177
def respond(user_msg, history):
    """Chat handler: retrieve KB chunks, extract a span, attach citations.

    *history* is unused but required by the gr.ChatInterface signature.
    Returns a markdown string in every branch.
    """
    user_msg = (user_msg or "").strip()
    if not user_msg:
        return "How can I help? Try: **Connect WhatsApp** or **Reset password**."

    retrieved = kb.retrieve(user_msg, top_k=4)
    if not retrieved:
        # Bug fix: user-facing typo "couldnt" -> "couldn't".
        return "I couldn't find anything yet. Try rephrasing or pick a quick action below."

    span, score, citations = kb.answer(user_msg, retrieved)
    if not span:
        # No extractable span: surface the closest articles instead.
        suggestions = "\n".join([f"- {c['title']} — _{c['section']}_" for c in citations]) or "- Try a different query."
        return f"I’m not fully sure. Here are the closest matches:\n\n{suggestions}"

    # Low confidence = weak reader span OR weak retrieval similarity.
    best_sim = max([s for _, s in retrieved]) if retrieved else 0.0
    low_conf = (score < LOW_CONF_THRESHOLD) or (best_sim < LOW_SIM_THRESHOLD)
    citations_md = format_citations(citations)
    # Very short spans are rarely useful on their own; point at the article.
    base_answer = span if len(span) > 3 else "I found a relevant section. Opening the steps in the cited article."

    if low_conf:
        return (
            f"{base_answer}\n\n---\n**I may be uncertain.** Here are relevant articles:\n{citations_md}\n\n"
            f"If this doesn’t solve it, ask me to *escalate to human support*."
        )

    return f"{base_answer}\n\n---\n**Sources:**\n{citations_md}\n_Tip: Say **show full steps** for more detail._"
203
+
204
def quick_intent(label):
    """Map a quick-action button label to its canned question ('' if unknown)."""
    matches = (question for name, question in HELPFUL_SUGGESTIONS if name == label)
    return next(matches, "")
209
+
210
def rebuild_index():
    """Admin action: re-embed and re-index everything under ./kb."""
    kb.build(KB_DIR)
    status_message = "✅ Index rebuilt from KB."
    return gr.update(value=status_message)
213
+
214
+ # ----------- Gradio UI -----------
215
with gr.Blocks(title="Self-Service KB Assistant", fill_height=True) as demo:
    gr.Markdown(
        """
        # 🧩 Self-Service Knowledge Assistant
        Ask about setup, automations, billing, or troubleshooting.
        The assistant answers **only from the Knowledge Base** and cites the articles it used.

        **Quick actions:** Try one of the buttons below.
        """
    )

    # Create the buttons first (layout order), wire them after the chat
    # exists so they have a real component to target.
    quick_buttons = []
    with gr.Row():
        for label, _ in HELPFUL_SUGGESTIONS:
            quick_buttons.append(gr.Button(label))

    chat = gr.ChatInterface(
        fn=respond,
        chatbot=gr.Chatbot(height=420, show_copy_button=True),
        textbox=gr.Textbox(placeholder="e.g., How do I connect WhatsApp?"),
        # NOTE(review): retry_btn/undo_btn/clear_btn were removed in Gradio 5;
        # drop these kwargs if the Space runs a newer Gradio — confirm version.
        retry_btn="Retry",
        undo_btn="Undo",
        clear_btn="Clear",
    )

    # Bug fix: the old wiring chained .then(respond, ...) into a brand-new
    # gr.Chatbot() that was never placed in the layout, so button clicks
    # produced no visible output (and quick_intent(label) was evaluated at
    # build time). Instead, drop the canned question into the chat textbox;
    # the default-arg lambda pins each button's label (late-binding fix).
    for btn, (label, _) in zip(quick_buttons, HELPFUL_SUGGESTIONS):
        btn.click(fn=lambda L=label: quick_intent(L), outputs=chat.textbox)

    with gr.Accordion("Admin", open=False):
        gr.Markdown("Rebuild the search index after changing files in `/kb`.")
        rebuild = gr.Button("Rebuild Index")
        status = gr.Markdown("")
        rebuild.click(fn=rebuild_index, outputs=status)

if __name__ == "__main__":
    demo.launch()