philipp-zettl committed on
Commit
cc18cf3
·
verified ·
1 Parent(s): f7b8b86

Add vrom_hub/chunker.py

Browse files
Files changed (1) hide show
  1. vrom_hub/chunker.py +307 -0
vrom_hub/chunker.py ADDED
@@ -0,0 +1,307 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Section-aware document chunker for vROM.
3
+
4
+ Splits markdown documents into ~256-token chunks that:
5
+ - Respect section boundaries (heading-aware)
6
+ - Preserve code blocks intact
7
+ - Create a doubly-linked list (prev_chunk_id / next_chunk_id)
8
+ - Track source file, heading, char offsets, and URL
9
+ """
10
+
11
+ from __future__ import annotations
12
+
13
+ import hashlib
14
+ import re
15
+ from dataclasses import dataclass, field, asdict
16
+ from typing import Optional
17
+
18
+
19
@dataclass
class Chunk:
    """One slice of documentation text together with its provenance metadata."""

    # Identifier of this chunk and the text it carries.
    chunk_id: int
    text: str

    # Origin of the text: file, enclosing heading and character offsets.
    source_file: str
    section_heading: str
    char_start: int
    char_end: int

    # Approximate size of ``text`` in tokens.
    token_estimate: int

    # Ids of the neighbouring chunks, or None when there is no neighbour.
    prev_chunk_id: Optional[int]
    next_chunk_id: Optional[int]

    # Page-level metadata.
    url: str
    doc_title: str

    def to_dict(self) -> dict:
        """Return every field of the chunk as a plain ``dict`` keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
36
+
37
+
38
@dataclass
class DocPage:
    """Input record describing one documentation page that is to be chunked."""

    # Text of the page.
    content: str
    # Path of the page, e.g. "trl/index.md".
    source_file: str
    # Canonical URL of the page.
    url: str
    # Title of the document.
    title: str
45
+
46
+
47
+ def _estimate_tokens(text: str) -> int:
48
+ """Rough token count: ~4 chars per token for English text."""
49
+ return max(1, len(text) // 4)
50
+
51
+
52
+ def _split_preserving_code_blocks(text: str) -> list[dict]:
53
+ """
54
+ Split text into segments, marking which are code blocks.
55
+ Returns list of {"text": str, "is_code": bool}.
56
+ """
57
+ segments = []
58
+ pattern = re.compile(r'(```[\s\S]*?```)', re.MULTILINE)
59
+ last_end = 0
60
+
61
+ for match in pattern.finditer(text):
62
+ # Text before code block
63
+ before = text[last_end:match.start()]
64
+ if before.strip():
65
+ segments.append({"text": before, "is_code": False})
66
+ # The code block itself
67
+ segments.append({"text": match.group(0), "is_code": True})
68
+ last_end = match.end()
69
+
70
+ # Remaining text after last code block
71
+ remaining = text[last_end:]
72
+ if remaining.strip():
73
+ segments.append({"text": remaining, "is_code": False})
74
+
75
+ return segments
76
+
77
+
78
+ def _split_into_sections(content: str) -> list[dict]:
79
+ """
80
+ Split markdown content by headings.
81
+ Returns list of {"heading": str, "text": str, "char_start": int, "char_end": int}.
82
+ """
83
+ # Match markdown headings (# ## ### etc.)
84
+ heading_pattern = re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE)
85
+
86
+ sections = []
87
+ matches = list(heading_pattern.finditer(content))
88
+
89
+ if not matches:
90
+ # No headings found — entire content is one section
91
+ return [{
92
+ "heading": "",
93
+ "text": content,
94
+ "char_start": 0,
95
+ "char_end": len(content),
96
+ }]
97
+
98
+ # Text before first heading
99
+ if matches[0].start() > 0:
100
+ pre_text = content[:matches[0].start()]
101
+ if pre_text.strip():
102
+ sections.append({
103
+ "heading": "",
104
+ "text": pre_text,
105
+ "char_start": 0,
106
+ "char_end": matches[0].start(),
107
+ })
108
+
109
+ for i, match in enumerate(matches):
110
+ heading_text = match.group(2).strip()
111
+ start = match.start()
112
+ end = matches[i + 1].start() if i + 1 < len(matches) else len(content)
113
+ section_text = content[start:end]
114
+
115
+ sections.append({
116
+ "heading": heading_text,
117
+ "text": section_text,
118
+ "char_start": start,
119
+ "char_end": end,
120
+ })
121
+
122
+ return sections
123
+
124
+
125
class SectionAwareChunker:
    """
    Chunk documentation pages into pieces of roughly ``max_tokens`` tokens.

    Strategy:
    1. Split the page by markdown headings into sections.
    2. Split each section into chunks of at most ``max_tokens`` estimated
       tokens, breaking between paragraphs and, for paragraphs that are too
       large on their own, between sentences. A single sentence that is
       larger than the budget is kept whole.
    3. Never split inside a fenced code block: every fenced block becomes
       its own chunk, even when it exceeds ``max_tokens``.
    4. Link the chunks of one page into a doubly-linked list through
       ``prev_chunk_id`` / ``next_chunk_id``.
    """

    # Run of blank lines that separates two paragraphs.
    _PARAGRAPH_BREAK = re.compile(r'\n\n+')
    # Whitespace that follows sentence-ending punctuation.
    _SENTENCE_BREAK = re.compile(r'(?<=[.!?])\s+')

    def __init__(self, max_tokens: int = 256):
        """Store the per-chunk token budget."""
        self.max_tokens = max_tokens

    def chunk_page(self, page: DocPage, start_chunk_id: int = 0) -> list[Chunk]:
        """
        Chunk a single documentation page.

        Args:
            page: The document page to chunk.
            start_chunk_id: The id given to the first chunk (for multi-page
                builds).

        Returns:
            List of Chunk objects with sequential ids starting at
            ``start_chunk_id``. The first chunk has ``prev_chunk_id=None``
            and the last has ``next_chunk_id=None``.
        """
        raw_chunks: list[dict] = []
        for section in _split_into_sections(page.content):
            raw_chunks.extend(
                self._chunk_section(
                    text=section["text"],
                    heading=section["heading"],
                    char_offset=section["char_start"],
                    source_file=page.source_file,
                    url=page.url,
                    doc_title=page.title,
                )
            )

        # Assign ids and build the linked list.
        last_index = len(raw_chunks) - 1
        chunks = []
        for i, raw in enumerate(raw_chunks):
            cid = start_chunk_id + i
            chunks.append(
                Chunk(
                    chunk_id=cid,
                    text=raw["text"],
                    source_file=raw["source_file"],
                    section_heading=raw["heading"],
                    char_start=raw["char_start"],
                    char_end=raw["char_end"],
                    token_estimate=_estimate_tokens(raw["text"]),
                    prev_chunk_id=cid - 1 if i > 0 else None,
                    next_chunk_id=cid + 1 if i < last_index else None,
                    url=raw["url"],
                    doc_title=raw["doc_title"],
                )
            )
        return chunks

    def chunk_pages(self, pages: list[DocPage]) -> list[Chunk]:
        """
        Chunk multiple pages with globally sequential chunk ids.

        Links never cross a page boundary: ``chunk_page`` already leaves the
        first chunk of a page without a predecessor and the last one without
        a successor, so no re-linking is needed here.
        """
        all_chunks: list[Chunk] = []
        for page in pages:
            # Ids continue where the previous page stopped.
            all_chunks.extend(
                self.chunk_page(page, start_chunk_id=len(all_chunks))
            )
        return all_chunks

    @staticmethod
    def _split_with_offsets(separator: re.Pattern, text: str):
        """Yield ``(piece, start)`` for every piece ``separator.split(text)`` returns.

        ``start`` is the index of the piece inside ``text``.
        """
        position = 0
        for gap in separator.finditer(text):
            yield text[position:gap.start()], position
            position = gap.end()
        yield text[position:], position

    def _chunk_section(
        self,
        text: str,
        heading: str,
        char_offset: int,
        source_file: str,
        url: str,
        doc_title: str,
    ) -> list[dict]:
        """
        Split a section into token-bounded chunks, preserving code blocks.

        Args:
            text: Section text.
            heading: Heading recorded on every chunk of the section.
            char_offset: Position of ``text`` inside the page; added to all
                reported offsets.
            source_file: Copied onto every chunk.
            url: Copied onto every chunk.
            doc_title: Copied onto every chunk.

        Returns:
            List of dicts with keys ``text``, ``heading``, ``char_start``,
            ``char_end``, ``source_file``, ``url`` and ``doc_title``.
            ``char_start`` / ``char_end`` delimit the span of the page the
            chunk was taken from (first to last non-whitespace character).
            The chunk text itself joins paragraphs with a blank line and
            sentences with a single space, so it can differ from that span
            in whitespace only.
        """
        chunks: list[dict] = []
        buffer = ""                       # pending chunk text, already joined
        span_start: Optional[int] = None  # section-relative start of the buffered text
        span_end = 0                      # section-relative end of the buffered text

        def emit(body: str, start: int, end: int) -> None:
            chunks.append({
                "text": body.strip(),
                "heading": heading,
                "char_start": char_offset + start,
                "char_end": char_offset + end,
                "source_file": source_file,
                "url": url,
                "doc_title": doc_title,
            })

        def flush() -> None:
            # Only called while the buffer holds real text, so span_start is set.
            nonlocal buffer, span_start
            emit(buffer, span_start, span_end)
            buffer = ""
            span_start = None

        def overflows(piece: str) -> bool:
            # An empty buffer never overflows: an oversized piece still has
            # to go somewhere.
            return bool(buffer.strip()) and (
                _estimate_tokens(buffer) + _estimate_tokens(piece) > self.max_tokens
            )

        def append(piece: str, start: int, joiner: str) -> None:
            nonlocal buffer, span_start, span_end
            buffer += piece + joiner
            core = piece.strip()
            if not core:
                # Blank pieces do not move the source span.
                return
            core_start = start + len(piece) - len(piece.lstrip())
            if span_start is None:
                span_start = core_start
            span_end = core_start + len(core)

        cursor = 0
        for seg in _split_preserving_code_blocks(text):
            seg_text = seg["text"]
            # Segments are verbatim slices of ``text`` in order, but
            # whitespace-only gaps between them are dropped, so locate each
            # one instead of summing lengths.
            found = text.find(seg_text, cursor)
            seg_start = found if found >= 0 else cursor
            cursor = seg_start + len(seg_text)

            if seg["is_code"]:
                # Code blocks are kept intact even if they exceed max_tokens;
                # pending prose is written out first to keep document order.
                if buffer.strip():
                    flush()
                emit(seg_text, seg_start, cursor)
                continue

            # Regular text: pack paragraphs, falling back to sentences for a
            # paragraph that is too large on its own.
            for para, para_start in self._split_with_offsets(self._PARAGRAPH_BREAK, seg_text):
                if overflows(para):
                    flush()

                if _estimate_tokens(para) > self.max_tokens:
                    for sent, sent_start in self._split_with_offsets(self._SENTENCE_BREAK, para):
                        if overflows(sent):
                            flush()
                        append(sent, seg_start + para_start + sent_start, " ")
                else:
                    append(para, seg_start + para_start, "\n\n")

        # Flush whatever is left.
        if buffer.strip():
            flush()

        return chunks