"""Text chunker - splits cleaned text into training-sized chunks."""
import logging
import re
logger = logging.getLogger(__name__)
class TextChunker:
    """Splits text into chunks suitable for character-level GPT training.

    Each chunk becomes one line in the training file. Chunks are split
    at sentence boundaries when possible, otherwise at word boundaries.
    Chunks shorter than ``min_chars`` are dropped.
    """

    # Sentence-ending punctuation, optionally followed by a closing
    # quote, then at least one whitespace character. match.end() lands
    # just past that whitespace, i.e. at the start of the next sentence.
    SENTENCE_END = re.compile(r"[.!?]['\"]?\s+")

    def __init__(self, config: dict):
        """Initialize chunking limits from a config dict.

        Args:
            config: May contain "max_chars" (default 256), "min_chars"
                (default 40) and "break_on_sentence" (default True).
        """
        # Clamp to >= 1: a non-positive max_chars would make
        # _find_break_point return 0, so _split_long_text would never
        # consume input and loop forever.
        self.max_chars = max(1, config.get("max_chars", 256))
        self.min_chars = config.get("min_chars", 40)
        self.break_on_sentence = config.get("break_on_sentence", True)

    def chunk(self, text: str) -> list[str]:
        """Split text into chunks of at most max_chars characters.

        Args:
            text: Cleaned text to chunk.

        Returns:
            List of text chunks, each a single line with no newlines.
            Chunks shorter than min_chars are dropped.
        """
        if not text.strip():
            return []
        # Each non-empty line is treated as a paragraph. After splitting
        # on "\n" a paragraph can never contain a newline, so no further
        # newline handling is needed per paragraph.
        paragraphs = [p.strip() for p in text.split("\n") if p.strip()]
        chunks: list[str] = []
        for para in paragraphs:
            if len(para) <= self.max_chars:
                if len(para) >= self.min_chars:
                    chunks.append(para)
                continue
            # Paragraph exceeds max_chars: break at sentence/word boundaries.
            chunks.extend(self._split_long_text(para))
        logger.info("Chunked text into %d chunks (max %d chars)", len(chunks), self.max_chars)
        return chunks

    def _split_long_text(self, text: str) -> list[str]:
        """Split text longer than max_chars into sentence-aware chunks.

        Pieces shorter than min_chars (including a too-short tail) are
        dropped, matching the filtering done in chunk().
        """
        chunks: list[str] = []
        remaining = text
        while remaining:
            remaining = remaining.strip()
            if not remaining:
                break
            if len(remaining) <= self.max_chars:
                if len(remaining) >= self.min_chars:
                    chunks.append(remaining)
                break
            # Find the best break point within max_chars and consume it.
            cut = self._find_break_point(remaining)
            piece = remaining[:cut].strip()
            remaining = remaining[cut:].strip()
            if len(piece) >= self.min_chars:
                chunks.append(piece)
        return chunks

    def _find_break_point(self, text: str) -> int:
        """Find the best position to break text at, within max_chars.

        Priority: sentence boundary > word boundary > hard cut.
        """
        window = text[:self.max_chars]
        if self.break_on_sentence:
            # Take the LAST sentence boundary inside the window. Every
            # match position is <= max_chars by construction (the window
            # is a prefix of that length), so no bounds check is needed.
            last_end = -1
            for match in self.SENTENCE_END.finditer(window):
                last_end = match.end()
            if last_end > self.min_chars:
                return last_end
        # Fall back to the last word boundary in the window.
        last_space = window.rfind(" ")
        if last_space > self.min_chars:
            return last_space
        # Hard cut at max_chars (rare with natural text: no usable
        # boundary was found far enough into the window).
        return self.max_chars