abakerdp committed on
Commit
219895c
·
verified ·
1 Parent(s): d988be3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +173 -53
app.py CHANGED
@@ -1,42 +1,143 @@
1
  import gradio as gr
2
  from sentence_transformers import SentenceTransformer
3
- import json
4
- from pathlib import Path
5
  import numpy as np
6
  from typing import List, Dict
 
 
 
 
 
 
 
 
7
 
8
- class SimpleRAG:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  def __init__(self):
10
  self.model = SentenceTransformer('all-MiniLM-L6-v2')
11
  self.documents = []
12
  self.embeddings = []
13
  self.metadata = []
 
14
 
15
- def load_documents(self, filepath: str):
16
- with open(filepath) as f:
17
- data = json.load(f)
18
-
19
- for doc in data["documents"]:
20
- self.documents.append(doc["content"])
21
- self.metadata.append({
22
- "title": doc["title"],
23
- "source": doc.get("source", "Unknown"),
24
- "section": doc.get("section", "General")
25
- })
26
 
27
- # Create embeddings for all documents
28
- self.embeddings = self.model.encode(self.documents)
 
29
 
30
  def search(self, query: str, top_k: int = 5) -> List[Dict]:
31
- # Get query embedding
32
  query_embedding = self.model.encode(query)
33
 
34
- # Calculate similarities
35
  similarities = np.dot(self.embeddings, query_embedding) / (
36
  np.linalg.norm(self.embeddings, axis=1) * np.linalg.norm(query_embedding)
37
  )
38
 
39
- # Get top results
40
  top_indices = np.argsort(similarities)[-top_k:][::-1]
41
 
42
  results = []
@@ -50,42 +151,51 @@ class SimpleRAG:
50
  return results
51
 
52
  # Initialize the RAG system
53
- rag = SimpleRAG()
54
- try:
55
- rag.load_documents("documents.json")
56
- except Exception as e:
57
- print(f"Error loading documents: {e}")
58
- # Load a sample document if the file doesn't exist
59
- sample_data = {
60
- "documents": [
61
- {
62
- "title": "Sample Document",
63
- "content": "This is a sample document. Please add your own documents.json file to see real content.",
64
- "source": "Sample",
65
- "section": "Test"
66
- }
67
- ]
68
- }
69
- with open("documents.json", "w") as f:
70
- json.dump(sample_data, f)
71
- rag.load_documents("documents.json")
 
72
 
73
- def search_documents(query, top_k=5):
74
  if not query.strip():
75
- return "Please enter a query"
76
 
77
  results = rag.search(query, top_k)
78
 
79
- # Format output
80
  output = ""
81
- for result in results:
 
 
82
  metadata = result["metadata"]
83
  score_percentage = round(result["score"] * 100)
84
- output += f"\n\nπŸ“š {metadata['title']}\n"
85
- output += f"πŸ“ {metadata['source']} β€’ {metadata['section']} β€’ Relevance: {score_percentage}%\n"
 
 
 
86
  output += f"───────────────────\n{result['content']}\n"
 
 
 
 
87
 
88
- return output
89
 
90
  # Create Gradio interface
91
  interface = gr.Interface(
@@ -102,19 +212,29 @@ interface = gr.Interface(
102
  value=5,
103
  step=1,
104
  label="Number of results"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
105
  )
106
  ],
107
- outputs=gr.Textbox(
108
- label="Search Results",
109
- lines=20
110
- ),
111
- title="Knowledge Base Search",
112
- description="Ask questions about your documents and get relevant answers.",
113
  theme="default",
114
  allow_flagging="never",
115
  examples=[
116
- ["What is machine learning?"],
117
- ["How does this work?"],
118
  ]
119
  )
120
 
 
1
  import gradio as gr
2
  from sentence_transformers import SentenceTransformer
 
 
3
  import numpy as np
4
  from typing import List, Dict
5
+ import PyPDF2
6
+ import docx
7
+ import os
8
+ from pathlib import Path
9
+ import json
10
+ import fitz # PyMuPDF for better PDF handling
11
+ import re
12
+ from tqdm import tqdm
13
 
14
class DocumentProcessor:
    """Turn the PDF and DOCX files of one directory into overlapping text chunks.

    Every chunk is a dict with two keys: ``"content"`` (the chunk text) and
    ``"metadata"`` (``"source"`` file name, ``"type"`` of ``"pdf"`` or
    ``"docx"`` and, for PDFs only, a ``"preview"`` dict holding the 1-based
    ``"page"`` number and the ``"total_pages"`` count).
    """

    def __init__(self, docs_dir="documents"):
        """Remember the directory that ``process_all_documents`` will scan."""
        self.docs_dir = docs_dir

    def extract_text_from_pdf(self, file_path) -> List[Dict]:
        """Return the chunks of every page of the PDF at ``file_path``.

        Pages are chunked independently, so a chunk never spans two pages and
        always carries the page it came from. Any failure is reported on
        stdout and yields an empty list so one bad file does not abort a run.
        """
        try:
            text_chunks = []
            source = os.path.basename(file_path)

            # The context manager closes the document even if a page fails.
            with fitz.open(file_path) as doc:
                total_pages = len(doc)
                for page_num, page in enumerate(doc, start=1):
                    # Page position used later to render a preview image.
                    preview = {
                        "page": page_num,
                        "total_pages": total_pages,
                    }
                    for chunk in self.split_into_chunks(page.get_text()):
                        text_chunks.append({
                            "content": chunk,
                            "metadata": {
                                "source": source,
                                "type": "pdf",
                                "preview": preview,
                            },
                        })
            return text_chunks
        except Exception as e:
            print(f"Error processing PDF {file_path}: {e}")
            return []

    def extract_text_from_docx(self, file_path) -> List[Dict]:
        """Return the chunks of the DOCX file at ``file_path``.

        Paragraph texts are concatenated, one per line, and chunked as a
        single string. Any failure is reported on stdout and yields an empty
        list.
        """
        try:
            doc = docx.Document(file_path)
            full_text = "".join(para.text + "\n" for para in doc.paragraphs)
            source = os.path.basename(file_path)

            return [
                {
                    "content": chunk,
                    "metadata": {
                        "source": source,
                        "type": "docx",
                    },
                }
                for chunk in self.split_into_chunks(full_text)
            ]
        except Exception as e:
            print(f"Error processing DOCX {file_path}: {e}")
            return []

    def split_into_chunks(self, text, chunk_size=500, overlap=50) -> List[str]:
        """Split ``text`` into stripped, non-empty chunks of about ``chunk_size`` characters.

        Consecutive chunks share roughly ``overlap`` characters. When a
        sentence ending (``.``, ``!`` or ``?`` followed by whitespace) lies
        within 50 characters of the nominal chunk end, the chunk is cut right
        after it instead.

        Raises:
            ValueError: if ``chunk_size`` is not positive or ``overlap`` is
                negative or not smaller than ``chunk_size`` (the scan could
                not advance).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must be non-negative and smaller than chunk_size")

        chunks = []
        start = 0
        text_length = len(text)

        while start < text_length:
            end = start + chunk_size

            if end < text_length:
                # Never look behind the chunk start: a negative slice index
                # would silently wrap around to the end of the text.
                window_start = max(start, end - 50)
                match = re.search(r'[.!?]\s+', text[window_start:end + 50])
                if match:
                    end = window_start + match.end()

            chunk = text[start:end].strip()
            if chunk:  # Only add non-empty chunks
                chunks.append(chunk)

            # The text is exhausted; stepping back by ``overlap`` here would
            # emit one more chunk that merely repeats the tail.
            if end >= text_length:
                break

            # Step back for the overlap but always move forward overall.
            start = max(end - overlap, start + 1)

        return chunks

    def process_all_documents(self) -> List[Dict]:
        """Chunk every ``.pdf`` and ``.docx`` file found directly in ``docs_dir``.

        The directory is created (and an empty list returned) when it does
        not exist yet. Files are visited in sorted name order so the chunk
        order is the same on every run; other file types are ignored.
        """
        all_chunks = []

        if not os.path.exists(self.docs_dir):
            os.makedirs(self.docs_dir, exist_ok=True)
            print(f"Created documents directory at {self.docs_dir}")
            return all_chunks

        for file_name in tqdm(sorted(os.listdir(self.docs_dir))):
            file_path = os.path.join(self.docs_dir, file_name)
            lowered = file_name.lower()

            if lowered.endswith('.pdf'):
                all_chunks.extend(self.extract_text_from_pdf(file_path))
            elif lowered.endswith('.docx'):
                all_chunks.extend(self.extract_text_from_docx(file_path))

        return all_chunks
114
+
115
+ class DocumentRAG:
116
  def __init__(self):
117
  self.model = SentenceTransformer('all-MiniLM-L6-v2')
118
  self.documents = []
119
  self.embeddings = []
120
  self.metadata = []
121
+ self.processor = DocumentProcessor()
122
 
123
+ def load_documents(self):
124
+ print("Processing documents...")
125
+ chunks = self.processor.process_all_documents()
126
+
127
+ self.documents = [chunk["content"] for chunk in chunks]
128
+ self.metadata = [chunk["metadata"] for chunk in chunks]
 
 
 
 
 
129
 
130
+ print("Creating embeddings...")
131
+ self.embeddings = self.model.encode(self.documents, show_progress_bar=True)
132
+ print(f"Loaded {len(self.documents)} chunks from documents")
133
 
134
  def search(self, query: str, top_k: int = 5) -> List[Dict]:
 
135
  query_embedding = self.model.encode(query)
136
 
 
137
  similarities = np.dot(self.embeddings, query_embedding) / (
138
  np.linalg.norm(self.embeddings, axis=1) * np.linalg.norm(query_embedding)
139
  )
140
 
 
141
  top_indices = np.argsort(similarities)[-top_k:][::-1]
142
 
143
  results = []
 
151
  return results
152
 
153
  # Initialize the RAG system
154
+ rag = DocumentRAG()
155
+ rag.load_documents()
156
+
157
def preview_document(source, page=1, docs_dir="documents"):
    """Render one page of a PDF to a PNG file and return the image path.

    Args:
        source: File name of the document inside ``docs_dir``.
        page: 1-based page number to render.
        docs_dir: Directory that holds the source documents.

    Returns:
        The path of the written PNG on success. In every other case a
        human-readable message is returned instead: the source is not a PDF,
        the page is out of range, or opening/rendering failed.
    """
    if not source.lower().endswith('.pdf'):
        return "Preview only available for PDF documents"

    try:
        # UI widgets may deliver the page as a float; indexing needs an int.
        page = int(page)

        # The context manager closes the document on every exit path.
        with fitz.open(os.path.join(docs_dir, source)) as doc:
            if not 1 <= page <= len(doc):
                return "Invalid page number"

            # 2x zoom for better quality
            pix = doc[page - 1].get_pixmap(matrix=fitz.Matrix(2, 2))
            # Use only the base name so a source containing a directory part
            # cannot point the image at a folder that does not exist.
            img_path = f"temp_{os.path.basename(source)}_{page}.png"
            pix.save(img_path)
        return img_path
    except Exception as e:
        return f"Error previewing document: {e}"
174
 
175
def search_documents(query, top_k=5, include_preview=True):
    """Run a search and format the hits for the Gradio interface.

    Args:
        query: Free-text question typed by the user.
        top_k: Maximum number of hits to list.
        include_preview: When true, try to render a page image for the
            top-ranked hit if that hit comes from a PDF.

    Returns:
        A ``(text, preview_path)`` tuple. ``text`` is the formatted result
        list or a short notice; ``preview_path`` is the path of a rendered
        page image, or ``None`` when no image is available.
    """
    if not query or not query.strip():
        return "Please enter a query", None

    # Searching an empty index cannot produce hits, so answer up front.
    if not rag.documents:
        return "No documents have been indexed yet. Add PDF or DOCX files to the documents folder.", None

    # Sliders may deliver floats; the search slices with this value.
    results = rag.search(query, int(top_k))

    parts = []
    preview_path = None

    for i, result in enumerate(results, 1):
        metadata = result["metadata"]
        score_percentage = round(result["score"] * 100)

        parts.append(f"\n\n📄 Document: {metadata['source']}\n")
        if metadata['type'] == 'pdf':
            # NOTE(review): the pin emoji is reconstructed from a garbled
            # byte sequence - confirm against the deployed app.
            parts.append(f"📍 Page {metadata['preview']['page']}/{metadata['preview']['total_pages']}")
        parts.append(f" • Relevance: {score_percentage}%\n")
        parts.append(f"───────────────────\n{result['content']}\n")

        # Preview only the top-ranked hit, and only when it is a PDF.
        if i == 1 and include_preview and metadata['type'] == 'pdf':
            candidate = preview_document(metadata['source'], metadata['preview']['page'])
            # preview_document reports failures as plain messages; only an
            # existing file may be handed to the image output.
            if os.path.isfile(candidate):
                preview_path = candidate

    return "".join(parts), preview_path
199
 
200
  # Create Gradio interface
201
  interface = gr.Interface(
 
212
  value=5,
213
  step=1,
214
  label="Number of results"
215
+ ),
216
+ gr.Checkbox(
217
+ label="Show document preview",
218
+ value=True
219
+ )
220
+ ],
221
+ outputs=[
222
+ gr.Textbox(
223
+ label="Search Results",
224
+ lines=20
225
+ ),
226
+ gr.Image(
227
+ label="Document Preview",
228
+ type="filepath"
229
  )
230
  ],
231
+ title="Document Search",
232
+ description="Search through PDFs and Word documents. Enter your question to find relevant content.",
 
 
 
 
233
  theme="default",
234
  allow_flagging="never",
235
  examples=[
236
+ ["What is the main topic discussed in the documents?"],
237
+ ["Can you find specific examples of...?"],
238
  ]
239
  )
240