sofzcc committed on
Commit
a7b58e6
·
verified ·
1 Parent(s): 2e97c72

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +194 -675
app.py CHANGED
@@ -1,697 +1,216 @@
1
  import os
2
- import re
3
- import json
4
- import yaml
5
- import argparse
6
- from pathlib import Path
7
- from typing import List, Dict, Tuple, Optional
8
 
9
- import numpy as np
10
- import faiss
11
  import gradio as gr
12
-
13
- from transformers import pipeline, AutoTokenizer, AutoModelForQuestionAnswering
14
  from sentence_transformers import SentenceTransformer
15
- import PyPDF2
16
- import docx
17
-
18
- # ----------- Configuration Loader -----------
19
- class Config:
20
- """Load and manage configuration from YAML file."""
21
-
22
- def __init__(self, config_path: str = "config.yaml"):
23
- with open(config_path, 'r', encoding='utf-8') as f:
24
- self.data = yaml.safe_load(f)
25
-
26
- @property
27
- def client_name(self) -> str:
28
- return self.data.get('client', {}).get('name', 'RAG Assistant')
29
-
30
- @property
31
- def client_description(self) -> str:
32
- return self.data.get('client', {}).get('description', 'AI-powered Q&A with document retrieval and citation')
33
-
34
- @property
35
- def client_logo(self) -> Optional[str]:
36
- return self.data.get('client', {}).get('logo')
37
-
38
- @property
39
- def theme_color(self) -> str:
40
- return self.data.get('client', {}).get('theme_color', 'blue')
41
-
42
- @property
43
- def kb_directory(self) -> Path:
44
- return Path(self.data.get('kb', {}).get('directory', './kb'))
45
-
46
- @property
47
- def index_directory(self) -> Path:
48
- return Path(self.data.get('kb', {}).get('index_directory', './.index'))
49
-
50
- @property
51
- def embedding_model(self) -> str:
52
- return self.data.get('models', {}).get('embedding', 'sentence-transformers/all-MiniLM-L6-v2')
53
-
54
- @property
55
- def qa_model(self) -> str:
56
- return self.data.get('models', {}).get('qa', 'deepset/roberta-base-squad2')
57
-
58
- @property
59
- def confidence_threshold(self) -> float:
60
- return self.data.get('thresholds', {}).get('confidence', 0.25)
61
-
62
- @property
63
- def similarity_threshold(self) -> float:
64
- return self.data.get('thresholds', {}).get('similarity', 0.35)
65
-
66
- @property
67
- def chunk_size(self) -> int:
68
- return self.data.get('chunking', {}).get('chunk_size', 800)
69
-
70
- @property
71
- def chunk_overlap(self) -> int:
72
- return self.data.get('chunking', {}).get('overlap', 200)
73
-
74
- @property
75
- def quick_actions(self) -> List[Tuple[str, str]]:
76
- actions = self.data.get('quick_actions', [])
77
- return [(a['label'], a['query']) for a in actions]
78
-
79
- @property
80
- def welcome_message(self) -> str:
81
- return self.data.get('messages', {}).get('welcome',
82
- '👋 How can I help? Ask me anything or use a quick action button below.')
83
-
84
- @property
85
- def no_answer_message(self) -> str:
86
- return self.data.get('messages', {}).get('no_answer',
87
- "❌ **I don't know the answer to that** but if you have any document with details I can learn about it.")
88
-
89
- @property
90
- def upload_prompt(self) -> str:
91
- return self.data.get('messages', {}).get('upload_prompt',
92
- '📤 Upload a relevant document above, and I\'ll be able to help you find the information you need!')
93
-
94
- # Global config instance
95
- config = None
96
-
97
- # ----------- Document Extraction -----------
98
- def extract_text_from_pdf(file_path: str) -> str:
99
- """Extract text from PDF file."""
100
- text = ""
101
- try:
102
- with open(file_path, 'rb') as file:
103
- pdf_reader = PyPDF2.PdfReader(file)
104
- for page in pdf_reader.pages:
105
- text += page.extract_text() + "\n"
106
- except Exception as e:
107
- raise RuntimeError(f"Error reading PDF: {str(e)}")
108
- return text
109
-
110
- def extract_text_from_docx(file_path: str) -> str:
111
- """Extract text from DOCX file."""
112
- try:
113
- doc = docx.Document(file_path)
114
- text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
115
- return text
116
- except Exception as e:
117
- raise RuntimeError(f"Error reading DOCX: {str(e)}")
118
-
119
- def extract_text_from_txt(file_path: str) -> str:
120
- """Extract text from TXT file."""
121
- try:
122
- with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
123
- return file.read()
124
- except Exception as e:
125
- raise RuntimeError(f"Error reading TXT: {str(e)}")
126
-
127
- def extract_text_from_file(file_path: str) -> Tuple[str, str]:
128
- """Extract text from uploaded file based on extension."""
129
- ext = Path(file_path).suffix.lower()
130
-
131
- if ext == '.pdf':
132
- return extract_text_from_pdf(file_path), 'PDF'
133
- elif ext == '.docx':
134
- return extract_text_from_docx(file_path), 'DOCX'
135
- elif ext in ['.txt', '.md']:
136
- return extract_text_from_txt(file_path), 'Text'
137
- else:
138
- raise ValueError(f"Unsupported file type: {ext}. Supported: .pdf, .docx, .txt, .md")
139
-
140
- # ----------- Document Processing -----------
141
- HEADING_RE = re.compile(r"^(#{1,6})\s+(.*)$", re.MULTILINE)
142
-
143
- def read_markdown_files(kb_dir: Path) -> List[Dict]:
144
- """Read all markdown files from the knowledge base directory."""
145
- docs = []
146
- for md_path in sorted(kb_dir.glob("*.md")):
147
- text = md_path.read_text(encoding="utf-8", errors="ignore")
148
- title = md_path.stem.replace("_", " ").title()
149
- m = re.search(r"^#\s+(.*)$", text, flags=re.MULTILINE)
150
- if m:
151
- title = m.group(1).strip()
152
- docs.append({
153
- "filepath": str(md_path),
154
- "filename": md_path.name,
155
- "title": title,
156
- "text": text
157
- })
158
- return docs
159
-
160
- def chunk_markdown(doc: Dict, chunk_chars: int = None, overlap: int = None) -> List[Dict]:
161
- """Split markdown document into overlapping chunks."""
162
- if chunk_chars is None:
163
- chunk_chars = config.chunk_size
164
- if overlap is None:
165
- overlap = config.chunk_overlap
166
-
167
- text = doc["text"]
168
- sections = re.split(r"(?=^##\s+|\n##\s+|\n###\s+|^###\s+)", text, flags=re.MULTILINE)
169
- if len(sections) == 1:
170
- sections = [text]
171
 
172
  chunks = []
173
- for sec in sections:
174
- sec = sec.strip()
175
- if not sec or len(sec) < 50:
176
- continue
177
-
178
- heading_match = HEADING_RE.search(sec)
179
- section_heading = heading_match.group(2).strip() if heading_match else doc["title"]
180
-
181
- start = 0
182
- while start < len(sec):
183
- end = min(start + chunk_chars, len(sec))
184
- chunk_text = sec[start:end].strip()
185
-
186
- if len(chunk_text) > 50:
187
- chunks.append({
188
- "doc_title": doc["title"],
189
- "filename": doc["filename"],
190
- "filepath": doc["filepath"],
191
- "section": section_heading,
192
- "content": chunk_text
193
- })
194
-
195
- if end == len(sec):
196
- break
197
- start = max(0, end - overlap)
198
-
199
  return chunks
200
 
201
- # ----------- KB Index -----------
202
- class KBIndex:
203
- def __init__(self):
204
- self.embedder = SentenceTransformer(config.embedding_model)
205
- self.reader_tokenizer = AutoTokenizer.from_pretrained(config.qa_model)
206
- self.reader_model = AutoModelForQuestionAnswering.from_pretrained(config.qa_model)
207
- self.reader = pipeline(
208
- "question-answering",
209
- model=self.reader_model,
210
- tokenizer=self.reader_tokenizer,
211
- max_answer_len=200,
212
- handle_impossible_answer=True
213
- )
214
 
215
- self.index = None
216
- self.embeddings = None
217
- self.metadata = []
218
- self.uploaded_file_active = False
219
-
220
- # Paths based on config
221
- self.embeddings_path = config.index_directory / "kb_embeddings.npy"
222
- self.metadata_path = config.index_directory / "kb_metadata.json"
223
- self.faiss_path = config.index_directory / "kb_faiss.index"
224
-
225
- def build(self, kb_dir: Path):
226
- """Build the FAISS index from markdown files."""
227
- docs = read_markdown_files(kb_dir)
228
- if not docs:
229
- raise RuntimeError(f"No markdown files found in {kb_dir.resolve()}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
230
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
231
  all_chunks = []
232
- for d in docs:
233
- all_chunks.extend(chunk_markdown(d))
234
-
235
- if not all_chunks:
236
- raise RuntimeError("No content chunks generated from KB.")
237
-
238
- texts = [c["content"] for c in all_chunks]
239
- embeddings = self.embedder.encode(
240
- texts,
241
- batch_size=32,
242
- convert_to_numpy=True,
243
- show_progress_bar=True
244
- )
245
- faiss.normalize_L2(embeddings)
246
 
247
- dim = embeddings.shape[1]
248
- index = faiss.IndexFlatIP(dim)
249
- index.add(embeddings)
 
250
 
251
- self.index = index
252
- self.embeddings = embeddings
253
- self.metadata = all_chunks
254
- self.uploaded_file_active = False
255
-
256
- # Ensure index directory exists
257
- config.index_directory.mkdir(exist_ok=True, parents=True)
258
-
259
- np.save(self.embeddings_path, embeddings)
260
- with open(self.metadata_path, "w", encoding="utf-8") as f:
261
- json.dump(self.metadata, f, ensure_ascii=False, indent=2)
262
- faiss.write_index(index, str(self.faiss_path))
263
-
264
- def build_from_uploaded_file(self, file_path: str, filename: str):
265
- """Build temporary index from an uploaded file."""
266
- text_content, file_type = extract_text_from_file(file_path)
267
-
268
- if not text_content or len(text_content.strip()) < 100:
269
- raise RuntimeError("File appears to be empty or too short.")
270
-
271
- doc = {
272
- "filepath": file_path,
273
- "filename": filename,
274
- "title": Path(filename).stem.replace("_", " ").title(),
275
- "text": text_content
276
- }
277
-
278
- all_chunks = chunk_markdown(doc)
279
-
280
  if not all_chunks:
281
- raise RuntimeError("Could not extract meaningful content from file.")
282
-
283
- texts = [c["content"] for c in all_chunks]
284
- embeddings = self.embedder.encode(
285
- texts,
286
- batch_size=32,
287
- convert_to_numpy=True,
288
- show_progress_bar=False
289
- )
290
- faiss.normalize_L2(embeddings)
291
-
292
- dim = embeddings.shape[1]
293
- index = faiss.IndexFlatIP(dim)
294
- index.add(embeddings)
295
-
296
- self.index = index
297
- self.embeddings = embeddings
298
- self.metadata = all_chunks
299
- self.uploaded_file_active = True
300
-
301
- return len(all_chunks), file_type
302
-
303
- def load(self) -> bool:
304
- """Load pre-built index from disk."""
305
- if not (self.embeddings_path.exists() and self.metadata_path.exists() and self.faiss_path.exists()):
306
- return False
307
- self.embeddings = np.load(self.embeddings_path)
308
- with open(self.metadata_path, "r", encoding="utf-8") as f:
309
- self.metadata = json.load(f)
310
- self.index = faiss.read_index(str(self.faiss_path))
311
- self.uploaded_file_active = False
312
- return True
313
-
314
- def retrieve(self, query: str, top_k: int = 6) -> List[Tuple[int, float]]:
315
- """Retrieve top-k most similar chunks for a query."""
316
- q_emb = self.embedder.encode([query], convert_to_numpy=True)
317
- faiss.normalize_L2(q_emb)
318
- D, I = self.index.search(q_emb, top_k)
319
- return list(zip(I[0].tolist(), D[0].tolist()))
320
-
321
- def answer(self, question: str, retrieved: List[Tuple[int, float]]) -> Tuple[Optional[str], float, List[Dict], float]:
322
- """Extract answer from retrieved chunks using QA model."""
323
- candidates = []
324
-
325
- for idx, sim in retrieved:
326
- meta = self.metadata[idx]
327
- ctx = meta["content"]
328
-
329
- try:
330
- out = self.reader(question=question, context=ctx)
331
- score = float(out.get("score", 0.0))
332
- answer_text = out.get("answer", "").strip()
333
-
334
- if answer_text and len(answer_text) > 3:
335
- expanded_answer = self._expand_answer(answer_text, ctx)
336
-
337
- candidates.append({
338
- "text": expanded_answer,
339
- "original": answer_text,
340
- "score": score,
341
- "meta": meta,
342
- "sim": float(sim),
343
- "context": ctx
344
- })
345
- except Exception as e:
346
- continue
347
-
348
- if not candidates:
349
- return None, 0.0, [], max([s for _, s in retrieved]) if retrieved else 0.0
350
-
351
- candidates.sort(key=lambda x: x["score"] * 0.7 + x["sim"] * 0.3, reverse=True)
352
- best = candidates[0]
353
-
354
- citations = []
355
- seen = set()
356
- for idx, _ in retrieved[:3]:
357
- m = self.metadata[idx]
358
- key = (m["filename"], m["section"])
359
- if key in seen:
360
- continue
361
- seen.add(key)
362
- citations.append({
363
- "title": m["doc_title"],
364
- "filename": m["filename"],
365
- "section": m["section"]
366
- })
367
-
368
- best_sim = max([s for _, s in retrieved]) if retrieved else 0.0
369
- return best["text"], best["score"], citations, best_sim
370
-
371
- def _expand_answer(self, answer: str, context: str, max_chars: int = 300) -> str:
372
- """Expand the extracted answer with surrounding context."""
373
- answer_pos = context.lower().find(answer.lower())
374
-
375
- if answer_pos == -1:
376
- return answer
377
-
378
- start = answer_pos
379
- end = answer_pos + len(answer)
380
-
381
- while start > 0 and context[start - 1] not in '.!?\n':
382
- start -= 1
383
- if answer_pos - start > max_chars // 2:
384
- break
385
-
386
- while end < len(context) and context[end] not in '.!?\n':
387
- end += 1
388
- if end - answer_pos > max_chars // 2:
389
- break
390
-
391
- if end < len(context) and context[end] in '.!?':
392
- end += 1
393
-
394
- expanded = context[start:end].strip()
395
-
396
- if len(expanded) < 50:
397
- sentences = context.split('.')
398
- for i, sent in enumerate(sentences):
399
- if answer.lower() in sent.lower():
400
- result = sent.strip()
401
- if i + 1 < len(sentences) and len(result) < 100:
402
- result += ". " + sentences[i + 1].strip()
403
- return result + ("." if not result.endswith(".") else "")
404
-
405
- return expanded
406
-
407
- # Initialize KB (will be done after config is loaded)
408
- kb = None
409
-
410
- def ensure_index():
411
- """Build index on first run or load from cache."""
412
- try:
413
- # Try to load existing index first
414
- if kb.load():
415
- print(f"✅ Loaded existing index from {config.index_directory}")
416
  return
417
- except Exception as e:
418
- print(f"⚠️ Could not load existing index: {e}")
419
-
420
- # Try to build new index if KB directory exists and has files
421
- if config.kb_directory.exists():
422
- md_files = list(config.kb_directory.glob("*.md"))
423
- if md_files:
424
- try:
425
- print(f"🔨 Building index from {len(md_files)} markdown files...")
426
- kb.build(config.kb_directory)
427
- print(f"✅ Index built successfully!")
428
- except Exception as e:
429
- print(f"⚠️ Could not build index: {e}")
430
- print(f"ℹ️ You can upload documents via the UI or add .md files to {config.kb_directory}")
431
- else:
432
- print(f"ℹ️ No markdown files found in {config.kb_directory}")
433
- print(f"ℹ️ Upload documents via the UI or add .md files to start using the knowledge base")
434
- else:
435
- print(f"ℹ️ KB directory {config.kb_directory} not found. Creating it...")
436
- config.kb_directory.mkdir(exist_ok=True, parents=True)
437
- print(f"ℹ️ Add .md files to {config.kb_directory} or upload documents via the UI")
438
-
439
- # ----------- Response Generation -----------
440
- def format_citations(citations: List[Dict]) -> str:
441
- """Format citations as markdown list."""
442
- if not citations:
443
- return ""
444
- lines = []
445
- for c in citations:
446
- lines.append(f"• **{c['title']}** — _{c['section']}_")
447
- return "\n".join(lines)
448
-
449
- def respond(user_msg: str, history: List, uploaded_file_info: str = None) -> str:
450
- """Generate response to user query using RAG pipeline."""
451
- user_msg = (user_msg or "").strip()
452
-
453
- if not user_msg:
454
- return config.welcome_message
455
-
456
- if kb.index is None or len(kb.metadata) == 0:
457
- return f"{config.no_answer_message}\n\n{config.upload_prompt}"
458
-
459
- source_info = f" in the uploaded file" if kb.uploaded_file_active and uploaded_file_info else " in the knowledge base"
460
-
461
- retrieved = kb.retrieve(user_msg, top_k=6)
462
-
463
- if not retrieved or (retrieved and max([s for _, s in retrieved]) < 0.20):
464
- return f"{config.no_answer_message}\n\n{config.upload_prompt}"
465
-
466
- answer, qa_score, citations, best_sim = kb.answer(user_msg, retrieved)
467
-
468
- if not answer or qa_score < 0.15 or best_sim < 0.25:
469
- return (
470
- f"{config.no_answer_message}\n\n"
471
- f"The question seems outside the scope of what I currently know{source_info}. "
472
- f"Try uploading a relevant document, or rephrase your question if you think the information might be here."
473
- )
474
 
475
- answer = answer.strip()
476
- if answer and answer[-1] not in '.!?':
477
- answer += "."
 
 
 
478
 
479
- low_confidence = (qa_score < config.confidence_threshold) or (best_sim < config.similarity_threshold)
480
- citations_md = format_citations(citations)
481
-
482
- if low_confidence:
483
- return (
484
- f"⚠️ **Answer (Low Confidence):**\n\n{answer}\n\n"
485
- f"---\n"
486
- f"📚 **Related Sources:**\n{citations_md}\n\n"
487
- f"💬 *I'm not entirely certain about this answer. If you have a more detailed document about this topic, please upload it for better accuracy.*"
488
- )
489
- else:
490
- return (
491
- f"✅ **Answer:**\n\n{answer}\n\n"
492
- f"---\n"
493
- f"📚 **Sources:**\n{citations_md}\n\n"
494
- f"💡 *Say \"show more details\" to see the full context.*"
495
- )
496
 
497
- # ----------- UI Handlers -----------
498
- def process_message(user_input: str, history: List, uploaded_file_info: str) -> Tuple[List, Dict]:
499
- """Process user message and return updated chat history."""
500
- user_input = (user_input or "").strip()
501
- if not user_input:
502
- return history, gr.update(value="")
503
-
504
- reply = respond(user_input, history or [], uploaded_file_info)
505
- new_history = (history or []) + [
506
- {"role": "user", "content": user_input},
507
- {"role": "assistant", "content": reply}
508
- ]
509
- return new_history, gr.update(value="")
510
-
511
- def process_quick(label: str, history: List, uploaded_file_info: str) -> Tuple[List, Dict]:
512
- """Process quick action button click."""
513
- for btn_label, query in config.quick_actions:
514
- if label == btn_label:
515
- return process_message(query, history, uploaded_file_info)
516
- return history, gr.update(value="")
517
-
518
- def handle_file_upload(file):
519
- """Process uploaded file and build index."""
520
- if file is None:
521
- return "ℹ️ No file uploaded.", ""
522
-
523
- try:
524
- filename = Path(file.name).name
525
- num_chunks, file_type = kb.build_from_uploaded_file(file.name, filename)
526
-
527
  return (
528
- f" **File processed successfully!**\n\n"
529
- f"📄 **File:** {filename}\n"
530
- f"📋 **Type:** {file_type}\n"
531
- f"🔢 **Chunks:** {num_chunks}\n\n"
532
- f"You can now ask questions about this document!"
533
- ), filename
534
- except Exception as e:
535
- return f"❌ **Error processing file:** {str(e)}\n\nPlease ensure the file is a valid PDF, DOCX, TXT, or MD file.", ""
536
-
537
- def clear_uploaded_file():
538
- """Clear uploaded file and reload KB index."""
539
- try:
540
- if kb.load():
541
- return "✅ Switched back to knowledge base.", "", None
542
- else:
543
- kb.index = None
544
- kb.embeddings = None
545
- kb.metadata = []
546
- kb.uploaded_file_active = False
547
- return "ℹ️ No knowledge base found. Please upload a file or build the KB index.", "", None
548
- except Exception as e:
549
- return f"⚠️ Error: {str(e)}", "", None
550
-
551
- def rebuild_index_handler():
552
- """Rebuild the search index from KB directory."""
553
- try:
554
- kb.build(config.kb_directory)
555
- return "✅ Index rebuilt successfully! Ready to answer questions."
556
- except Exception as e:
557
- return f"❌ Error rebuilding index: {str(e)}"
558
-
559
- # ----------- Gradio UI -----------
560
- def create_interface():
561
- """Create Gradio interface with configuration."""
562
-
563
- with gr.Blocks(
564
- title=config.client_name,
565
- theme=gr.themes.Soft(primary_hue=config.theme_color),
566
- css="""
567
- .contain { max-width: 1200px; margin: auto; }
568
- .quick-btn { min-width: 180px !important; }
569
- """
570
- ) as demo:
571
-
572
- uploaded_file_state = gr.State("")
573
-
574
- # Header
575
- header_text = f"# 🤖 {config.client_name}\n### {config.client_description}"
576
- if config.client_logo:
577
- header_text += f"\n![Logo]({config.client_logo})"
578
-
579
- gr.Markdown(header_text)
580
-
581
- # File upload section
582
- with gr.Row():
583
- with gr.Column(scale=1):
584
- gr.Markdown("### 📤 Upload Document")
585
- file_upload = gr.File(
586
- label="Upload PDF, DOCX, TXT, or MD file",
587
- file_types=[".pdf", ".docx", ".txt", ".md"],
588
- type="filepath"
589
- )
590
- upload_status = gr.Markdown("ℹ️ Upload a file to ask questions about it.")
591
- with gr.Row():
592
- clear_btn = gr.Button("🔄 Clear & Use KB", variant="secondary", size="sm")
593
-
594
- # Main chat interface
595
- with gr.Row():
596
- with gr.Column(scale=1):
597
- chat = gr.Chatbot(
598
- height=500,
599
- show_copy_button=True,
600
- type="messages",
601
- avatar_images=(None, "https://em-content.zobj.net/source/twitter/376/robot_1f916.png")
602
- )
603
-
604
- with gr.Row():
605
- txt = gr.Textbox(
606
- placeholder="💬 Ask a question about the document or knowledge base...",
607
- scale=9,
608
- show_label=False,
609
- container=False
610
- )
611
- send = gr.Button("Send", variant="primary", scale=1)
612
-
613
- # Quick action buttons (if configured)
614
- if config.quick_actions:
615
- with gr.Accordion("⚡ Quick Actions", open=False):
616
- with gr.Row():
617
- quick_buttons = []
618
- for label, _ in config.quick_actions:
619
- btn = gr.Button(label, elem_classes="quick-btn", size="sm")
620
- quick_buttons.append((btn, label))
621
-
622
- # Admin section
623
- with gr.Accordion("🔧 Admin Panel", open=False):
624
- gr.Markdown(
625
- """
626
- **Rebuild Index:** Use this after adding or modifying files in the `{config.kb_directory}` directory.
627
- The system will re-scan all markdown files and update the search index.
628
- """
629
- )
630
- with gr.Row():
631
- rebuild_btn = gr.Button("🔄 Rebuild KB Index", variant="secondary")
632
- status_msg = gr.Markdown("")
633
-
634
- # Event handlers
635
- file_upload.change(
636
- handle_file_upload,
637
- inputs=[file_upload],
638
- outputs=[upload_status, uploaded_file_state]
639
  )
640
-
641
- clear_btn.click(
642
- clear_uploaded_file,
643
- outputs=[upload_status, uploaded_file_state, file_upload]
644
- )
645
-
646
- send.click(
647
- process_message,
648
- inputs=[txt, chat, uploaded_file_state],
649
- outputs=[chat, txt]
650
- )
651
- txt.submit(
652
- process_message,
653
- inputs=[txt, chat, uploaded_file_state],
654
- outputs=[chat, txt]
655
- )
656
-
657
- if config.quick_actions:
658
- for btn, label in quick_buttons:
659
- btn.click(
660
- process_quick,
661
- inputs=[gr.State(label), chat, uploaded_file_state],
662
- outputs=[chat, txt]
663
- )
664
-
665
- rebuild_btn.click(rebuild_index_handler, outputs=status_msg)
666
-
667
- # Footer
668
- gr.Markdown(
669
- """
670
- ---
671
- 💡 **Tips:**
672
- - Upload a document to ask questions specifically about that file
673
- - Use "Clear & Use KB" to switch back to the knowledge base
674
- - Be specific in your questions for better results
675
- - Check the cited sources for full context
676
- """
677
- )
678
-
679
- return demo
680
 
681
- # ----------- Main Entry Point -----------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
682
  if __name__ == "__main__":
683
- parser = argparse.ArgumentParser(description='Configurable RAG Assistant')
684
- parser.add_argument('--config', type=str, default='config.yaml',
685
- help='Path to configuration YAML file (default: config.yaml)')
686
- args = parser.parse_args()
687
-
688
- # Load configuration
689
- config = Config(args.config)
690
-
691
- # Initialize KB with config
692
- kb = KBIndex()
693
- ensure_index()
694
-
695
- # Create and launch interface
696
- demo = create_interface()
697
- demo.launch()
 
1
  import os
2
+ import glob
3
+ import math
4
+ from typing import List, Tuple
 
 
 
5
 
 
 
6
  import gradio as gr
7
+ import numpy as np
 
8
  from sentence_transformers import SentenceTransformer
9
+
10
+
11
# -----------------------------
# CONFIG
# -----------------------------
KB_DIR = "./kb"  # optional: folder with .txt or .md files
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
TOP_K = 3  # how many chunks to show per answer
CHUNK_SIZE = 500  # characters
CHUNK_OVERLAP = 100  # characters


# -----------------------------
# UTILITIES
# -----------------------------

def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    """Split long text into overlapping chunks so retrieval is more precise.

    Args:
        text: The text to split. An empty string yields no chunks.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared between consecutive chunks;
            must satisfy ``0 <= overlap < chunk_size``.

    Returns:
        The chunks in document order, each stripped of surrounding
        whitespace. Windows that are whitespace-only are dropped.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is
            outside ``[0, chunk_size)``.
    """
    if not text:
        return []
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        # A step of zero or less would keep `start` from ever advancing.
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap
    length = len(text)
    chunks = []
    start = 0

    while start < length:
        end = min(start + chunk_size, length)
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end == length:
            # The window already reached the end of the text; another pass
            # would only re-emit the overlap as a redundant tail chunk.
            break
        start += step

    return chunks
42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
 
44
def load_kb_texts(kb_dir: str = KB_DIR) -> List[Tuple[str, str]]:
    """
    Load all .txt and .md files from the KB directory.

    Files that cannot be read or decoded as UTF-8 are reported on stdout and
    skipped, as are files whose content is only whitespace. When nothing
    usable is found (or the directory does not exist), a single built-in
    demo document is returned instead so the index is never empty.

    Returns a list of (source_name, content), where source_name is the
    file's base name.
    """
    texts = []

    if os.path.isdir(kb_dir):
        # glob returns matches in filesystem-dependent order; sort each group
        # so the resulting chunk order is reproducible across runs/machines.
        paths = sorted(glob.glob(os.path.join(kb_dir, "*.txt"))) + sorted(
            glob.glob(os.path.join(kb_dir, "*.md"))
        )
        for path in paths:
            try:
                with open(path, "r", encoding="utf-8") as f:
                    content = f.read()
            except (OSError, UnicodeDecodeError) as e:
                # Best effort: one unreadable file must not block the rest.
                print(f"Could not read {path}: {e}")
                continue
            if content.strip():
                texts.append((os.path.basename(path), content))

    # If no files found, fall back to some built-in demo content
    if not texts:
        print("No KB files found. Using built-in demo content.")
        # NOTE(review): the diff view this was recovered from does not show
        # leading whitespace; confirm the literal's indentation against app.py.
        demo_text = """
Welcome to the Self-Service KB Assistant.

This assistant is meant to help you find information inside a knowledge base.
In a real setup, it would be connected to your own articles, procedures,
troubleshooting guides and FAQs.

Good knowledge base content is:
- Clear and structured with headings, steps and expected outcomes.
- Written in a customer-friendly tone.
- Easy to scan, with short paragraphs and bullet points.
- Maintained regularly to reflect product and process changes.

Example use cases for a KB assistant:
- Agents quickly searching for internal procedures.
- Customers asking “how do I…” style questions.
- Managers analyzing gaps in documentation based on repeated queries.
"""
        texts.append(("demo_content.txt", demo_text))

    return texts
86
+
87
+
88
+ # -----------------------------
89
+ # KB INDEX
90
+ # -----------------------------
91
+
92
class KBIndex:
    """In-memory embedding index over the knowledge-base chunks.

    Constructing an instance loads the sentence-embedding model and
    immediately builds the index from ``KB_DIR``.
    """

    def __init__(self, model_name: str = EMBEDDING_MODEL_NAME):
        print("Loading embedding model...")
        self.model = SentenceTransformer(model_name)
        print("Model loaded.")
        # Parallel lists: chunk_sources[i] is the file name chunks[i] came from.
        self.chunks: List[str] = []
        self.chunk_sources: List[str] = []
        # One embedding row per chunk; None until an index has been built.
        self.embeddings: np.ndarray | None = None
        self.build_index()

    def build_index(self):
        """Read the KB, chunk every document, and embed all chunks."""
        pairs = [
            (piece, source_name)
            for source_name, content in load_kb_texts(KB_DIR)
            for piece in chunk_text(content)
        ]

        if not pairs:
            print("⚠️ No chunks found for KB index.")
            self.chunks, self.chunk_sources, self.embeddings = [], [], None
            return

        pieces = [piece for piece, _ in pairs]
        origins = [origin for _, origin in pairs]

        print(f"Creating embeddings for {len(pieces)} chunks...")
        vectors = self.model.encode(pieces, show_progress_bar=False, convert_to_numpy=True)
        # State is only swapped in once encoding has succeeded.
        self.chunks = pieces
        self.chunk_sources = origins
        self.embeddings = vectors
        print("KB index ready.")

    def search(self, query: str, top_k: int = TOP_K) -> List[Tuple[str, str, float]]:
        """Return the best-matching (chunk, source_name, score) triples.

        Results are ordered by descending cosine similarity and limited to
        ``top_k`` entries. A blank query or an empty index yields ``[]``.
        """
        if not query.strip() or self.embeddings is None or not len(self.chunks):
            return []

        query_vec = self.model.encode([query], show_progress_bar=False, convert_to_numpy=True)[0]

        # Cosine similarity; the epsilons guard against division by zero.
        doc_norms = np.linalg.norm(self.embeddings, axis=1)
        query_norm = np.linalg.norm(query_vec) + 1e-10
        scores = np.dot(self.embeddings, query_vec) / (doc_norms * query_norm + 1e-10)

        ranked = np.argsort(scores)[::-1][:top_k]
        return [
            (self.chunks[pos], self.chunk_sources[pos], float(scores[pos]))
            for pos in ranked
        ]


kb_index = KBIndex()
152
+
153
+
154
+ # -----------------------------
155
+ # CHAT LOGIC
156
+ # -----------------------------
157
+
158
def build_answer(query: str) -> str:
    """Turn the top KB search hits for ``query`` into a readable reply.

    Returns a fixed "nothing found" message when the search yields no hits;
    otherwise a numbered list of the matching chunks, each labelled with its
    source file, followed by a short follow-up hint.
    """
    hits = kb_index.search(query, top_k=TOP_K)

    if not hits:
        return (
            "I couldn't find anything relevant in the knowledge base for this query yet.\n\n"
            "If this were connected to your real KB, this would be a good moment to:\n"
            "- Create a new article, or\n"
            "- Improve the existing documentation for this topic."
        )

    # The similarity score is available per hit but is not shown to the user.
    numbered = [
        f"{rank}. From **{source}**:\n{chunk.strip()}\n"
        for rank, (chunk, source, _score) in enumerate(hits, start=1)
    ]

    return (
        "Here’s what I found in the knowledge base:\n"
        + "\n".join(numbered)
        + "\nYou can ask follow-up questions, or try a more specific query if this doesn't fully answer your question."
    )
179
+
180
+
181
def chat_respond(message: str, history: List[Tuple[str, str]]) -> str:
    """Gradio ChatInterface-compatible respond function.

    Args:
        message: The user's latest message.
        history: The conversation so far, supplied by ChatInterface. It is
            accepted to satisfy the callback signature but is not used,
            because each answer is built from the current message alone.

    Returns:
        The assistant's reply text only. ChatInterface appends the turn to
        the history itself; returning an ``(answer, history)`` tuple is only
        meaningful when ``additional_outputs`` is configured, which this
        interface does not do.
    """
    return build_answer(message)
186
+
187
+
188
+ # -----------------------------
189
+ # GRADIO UI
190
+ # -----------------------------
191
+
192
# Intro text shown under the title of the chat UI.
description = (
    "\n"
    "Ask questions as if you were talking to a knowledge base assistant.\n"
    "\n"
    "In a real scenario, this assistant would be connected to your own\n"
    "help center or internal documentation. Here, it's using a small demo\n"
    "knowledge base to show how retrieval-based self-service can work.\n"
)

# The chat UI: every user message is answered by chat_respond.
chat = gr.ChatInterface(
    title="Self-Service KB Assistant",
    description=description,
    fn=chat_respond,
    examples=[
        "What makes a good knowledge base article?",
        "How could a KB assistant help agents?",
        "Why is self-service important for customer support?",
    ],
    chatbot=gr.Chatbot(height=420, show_copy_button=True),
)


if __name__ == "__main__":
    # Hugging Face Spaces does not require an explicit server_name/port,
    # so the defaults are used here.
    chat.launch()