joelg committed on
Commit
e17ceac
·
1 Parent(s): a8115f1

ADD better chunking visualisation

Browse files
Files changed (1) hide show
  1. rag_system.py +91 -10
rag_system.py CHANGED
@@ -14,6 +14,7 @@ import spaces
14
  class RAGSystem:
15
  def __init__(self):
16
  self.chunks = []
 
17
  self.embeddings = None
18
  self.index = None
19
  self.embedding_model = None
@@ -66,10 +67,8 @@ class RAGSystem:
66
 
67
  self.ready = True
68
 
69
- # Format chunks for display
70
- chunks_display = "### Processed Chunks\n\n"
71
- for i, chunk in enumerate(self.chunks, 1):
72
- chunks_display += f"**Chunk {i}** ({len(chunk)} chars)\n```\n{chunk[:200]}{'...' if len(chunk) > 200 else ''}\n```\n\n"
73
 
74
  status = f"✅ Success! Processed {len(pdf_files)} documents into {len(self.chunks)} chunks."
75
  return status, chunks_display, corpus_summary
@@ -88,14 +87,17 @@ class RAGSystem:
88
  return text
89
 
90
  def chunk_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
91
- """Split text into overlapping chunks"""
92
  chunks = []
 
93
  start = 0
94
  text_length = len(text)
 
95
 
96
  while start < text_length:
97
  end = start + chunk_size
98
  chunk = text[start:end]
 
99
 
100
  # Try to break at sentence boundary
101
  if end < text_length:
@@ -107,11 +109,33 @@ class RAGSystem:
107
  if break_point > chunk_size * 0.5: # Only break if we're past halfway
108
  chunk = chunk[:break_point + 1]
109
  end = start + break_point + 1
 
 
 
 
 
110
 
111
  chunks.append(chunk.strip())
 
 
 
 
 
 
 
 
112
  start = end - overlap
113
 
114
- return [c for c in chunks if len(c) > 50] # Filter out very small chunks
 
 
 
 
 
 
 
 
 
115
 
116
  @spaces.GPU
117
  def create_embeddings(self, texts: List[str]) -> np.ndarray:
@@ -158,10 +182,8 @@ class RAGSystem:
158
 
159
  self.ready = True
160
 
161
- # Format chunks for display
162
- chunks_display = "### Processed Chunks\n\n"
163
- for i, chunk in enumerate(self.chunks, 1):
164
- chunks_display += f"**Chunk {i}** ({len(chunk)} chars)\n```\n{chunk}\n```\n\n"
165
 
166
  status = f"✅ Success! Processed {len(self.chunks)} chunks from the document."
167
  return status, chunks_display, text[:5000] # Return first 5000 chars of original text
@@ -170,6 +192,65 @@ class RAGSystem:
170
  self.ready = False
171
  return f"Error processing document: {str(e)}", "", ""
172
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
173
  def set_embedding_model(self, model_name: str):
174
  """Set or change the embedding model"""
175
  if self.embedding_model_name != model_name:
 
14
  class RAGSystem:
15
  def __init__(self):
16
  self.chunks = []
17
+ self.chunk_metadata = [] # Store chunk positions for overlap visualization
18
  self.embeddings = None
19
  self.index = None
20
  self.embedding_model = None
 
67
 
68
  self.ready = True
69
 
70
+ # Format chunks for display with overlap highlighting
71
+ chunks_display = self._format_chunks_with_overlap()
 
 
72
 
73
  status = f"✅ Success! Processed {len(pdf_files)} documents into {len(self.chunks)} chunks."
74
  return status, chunks_display, corpus_summary
 
87
  return text
88
 
89
  def chunk_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
90
+ """Split text into overlapping chunks and store metadata"""
91
  chunks = []
92
+ self.chunk_metadata = [] # Reset metadata
93
  start = 0
94
  text_length = len(text)
95
+ previous_end = 0
96
 
97
  while start < text_length:
98
  end = start + chunk_size
99
  chunk = text[start:end]
100
+ original_end = end
101
 
102
  # Try to break at sentence boundary
103
  if end < text_length:
 
109
  if break_point > chunk_size * 0.5: # Only break if we're past halfway
110
  chunk = chunk[:break_point + 1]
111
  end = start + break_point + 1
112
+ original_end = end
113
+
114
+ # Calculate overlap with previous chunk
115
+ overlap_start = max(0, start - previous_end) if previous_end > 0 else 0
116
+ overlap_length = min(overlap, previous_end - start) if start < previous_end else 0
117
 
118
  chunks.append(chunk.strip())
119
+ self.chunk_metadata.append({
120
+ 'start': start,
121
+ 'end': original_end,
122
+ 'overlap_with_previous': overlap_length,
123
+ 'text': chunk
124
+ })
125
+
126
+ previous_end = original_end
127
  start = end - overlap
128
 
129
+ # Filter out very small chunks and update metadata accordingly
130
+ filtered_chunks = []
131
+ filtered_metadata = []
132
+ for i, c in enumerate(chunks):
133
+ if len(c) > 50:
134
+ filtered_chunks.append(c)
135
+ filtered_metadata.append(self.chunk_metadata[i])
136
+
137
+ self.chunk_metadata = filtered_metadata
138
+ return filtered_chunks
139
 
140
  @spaces.GPU
141
  def create_embeddings(self, texts: List[str]) -> np.ndarray:
 
182
 
183
  self.ready = True
184
 
185
+ # Format chunks for display with overlap highlighting
186
+ chunks_display = self._format_chunks_with_overlap()
 
 
187
 
188
  status = f"✅ Success! Processed {len(self.chunks)} chunks from the document."
189
  return status, chunks_display, text[:5000] # Return first 5000 chars of original text
 
192
  self.ready = False
193
  return f"Error processing document: {str(e)}", "", ""
194
 
195
+ def _format_chunks_with_overlap(self) -> str:
196
+ """Format chunks with overlap highlighting for pedagogical display"""
197
+ if not self.chunks or not self.chunk_metadata:
198
+ return "No chunks available"
199
+
200
+ display = "### 📑 Processed Chunks\n\n"
201
+ display += "*Overlapping parts are shown separately with a yellow marker (⚠️)*\n\n"
202
+ display += "---\n\n"
203
+
204
+ for i, (chunk, metadata) in enumerate(zip(self.chunks, self.chunk_metadata), 1):
205
+ # Calculate which part is overlapping with previous chunk
206
+ if i == 1:
207
+ # First chunk has no overlap
208
+ display += f"#### 📄 Chunk {i}\n"
209
+ display += f"**{len(chunk)} characters** | 🆕 No overlap (first chunk)\n\n"
210
+ display += f"```text\n{chunk}\n```\n\n"
211
+ display += "---\n\n"
212
+ else:
213
+ # Find overlap with previous chunk
214
+ prev_chunk = self.chunks[i-2]
215
+
216
+ # Find common substring at the beginning of current chunk
217
+ overlap_length = 0
218
+ for j in range(1, min(len(chunk), len(prev_chunk)) + 1):
219
+ if prev_chunk[-j:] == chunk[:j]:
220
+ overlap_length = j
221
+
222
+ if overlap_length > 0:
223
+ overlap_text = chunk[:overlap_length]
224
+ remaining_text = chunk[overlap_length:]
225
+
226
+ display += f"#### 📄 Chunk {i}\n"
227
+ display += f"**{len(chunk)} characters** | ⚠️ **{overlap_length} characters overlap** with previous chunk\n\n"
228
+
229
+ # Show overlap
230
+ display += f"> **⚠️ OVERLAP ({overlap_length} chars) - Repeated from Chunk {i-1}:**\n"
231
+ display += f"> ```text\n"
232
+ for line in overlap_text.split('\n'):
233
+ display += f"> {line}\n"
234
+ display += f"> ```\n\n"
235
+
236
+ # Show the new content
237
+ display += f"**🆕 NEW CONTENT ({len(remaining_text)} chars):**\n"
238
+ display += f"```text\n{remaining_text}\n```\n\n"
239
+
240
+ # Show full chunk for reference
241
+ display += f"<details>\n<summary>📋 Click to view complete chunk (overlap + new)</summary>\n\n"
242
+ display += f"```text\n{chunk}\n```\n\n"
243
+ display += f"</details>\n\n"
244
+ else:
245
+ # No overlap found (shouldn't happen normally)
246
+ display += f"#### 📄 Chunk {i}\n"
247
+ display += f"**{len(chunk)} characters** | No overlap detected\n\n"
248
+ display += f"```text\n{chunk}\n```\n\n"
249
+
250
+ display += "---\n\n"
251
+
252
+ return display
253
+
254
  def set_embedding_model(self, model_name: str):
255
  """Set or change the embedding model"""
256
  if self.embedding_model_name != model_name: