Spaces:
Running
Running
| """Split long text into overlapping chunks for embedding. | |
| Approximation: 1500 chars ≈ 375 tokens for English/Latin scripts. That fits | |
| the SmolLM2 2048-token context window comfortably when retrieving 3 chunks | |
| plus a few chat turns. For CJK or other dense scripts the chunks will be | |
| more tokens per char (so the 375-token estimate is too low there), but the embedder doesn't care. | |
| """ | |
| from __future__ import annotations | |
| import re | |
def chunk_text(
    text: str,
    chunk_chars: int = 1500,
    overlap_chars: int = 200,
) -> list[str]:
    """Split *text* into paragraph-aware chunks with overlap.

    Line endings are normalised to ``\\n`` and runs of three or more newlines
    are collapsed to one blank line. Paragraphs (separated by blank lines) are
    then packed greedily into a buffer. For each paragraph:
      • If it fits in the current buffer: append it.
      • Else, flush the buffer as a chunk and seed the next buffer with the
        last ``overlap_chars`` characters of the flushed chunk.
      • If it does not fit next to that overlap tail but fits in a chunk by
        itself: shrink the tail to the room left and keep the paragraph whole.
      • If the paragraph itself is longer than ``chunk_chars``: hard-split it.

    Args:
        text: Input text; ``None`` or empty yields an empty list.
        chunk_chars: Maximum length of a chunk, in characters.
        overlap_chars: Number of trailing characters of a flushed chunk that
            are repeated at the start of the next one. Values <= 0 disable
            the carry-over between paragraph chunks.

    Returns:
        The chunks in document order, each stripped of surrounding whitespace;
        whitespace-only chunks are dropped.
    """
    text = re.sub(r"\r\n?", "\n", text or "")
    text = re.sub(r"\n{3,}", "\n\n", text).strip()
    if not text:
        return []
    if len(text) <= chunk_chars:
        return [text]

    chunks: list[str] = []
    buffer = ""
    for para in text.split("\n\n"):
        para = para.strip()
        if not para:
            continue

        if _can_fit(buffer, para, chunk_chars):
            buffer = _append(buffer, para)
            continue

        # Doesn't fit — flush the buffer (if any) and keep only its tail.
        if buffer:
            chunks.append(buffer)
            buffer = buffer[-overlap_chars:] if overlap_chars > 0 else ""
            # Try again against the smaller (overlap-only) buffer.
            if _can_fit(buffer, para, chunk_chars):
                buffer = _append(buffer, para)
                continue

        if len(para) <= chunk_chars:
            # The paragraph fits in a chunk on its own; only the overlap tail
            # pushed it over the limit. Keep the paragraph intact and carry
            # as much of the tail as still fits (2 chars go to the separator).
            room = chunk_chars - len(para) - 2
            tail = buffer[-room:] if room > 0 else ""
            buffer = _append(tail, para)
            continue

        # Paragraph alone exceeds chunk_chars — hard-split it. The pieces
        # overlap among themselves, so the pending tail is not carried over.
        chunks.extend(_hard_split(para, chunk_chars, overlap_chars))
        buffer = ""

    if buffer:
        chunks.append(buffer)
    return [c.strip() for c in chunks if c.strip()]
| def _can_fit(buffer: str, para: str, chunk_chars: int) -> bool: | |
| sep = 2 if buffer else 0 | |
| return len(buffer) + sep + len(para) <= chunk_chars | |
| def _append(buffer: str, para: str) -> str: | |
| return f"{buffer}\n\n{para}" if buffer else para | |
| def _hard_split(text: str, chunk_chars: int, overlap_chars: int) -> list[str]: | |
| step = max(1, chunk_chars - overlap_chars) | |
| return [text[i : i + chunk_chars] for i in range(0, len(text), step)] | |