"""Text chunker - splits cleaned text into training-sized chunks.""" import logging import re logger = logging.getLogger(__name__) class TextChunker: """Splits text into chunks suitable for character-level GPT training. Each chunk becomes one line in the training file. Chunks are split at sentence boundaries when possible, otherwise at word boundaries. """ # Sentence-ending punctuation followed by space or end-of-string SENTENCE_END = re.compile(r"[.!?]['\"]?\s+") def __init__(self, config: dict): self.max_chars = config.get("max_chars", 256) self.min_chars = config.get("min_chars", 40) self.break_on_sentence = config.get("break_on_sentence", True) def chunk(self, text: str) -> list[str]: """Split text into chunks of at most max_chars characters. Args: text: Cleaned text to chunk. Returns: List of text chunks, each a single line with no newlines. """ if not text.strip(): return [] # First, split into paragraphs paragraphs = [p.strip() for p in text.split("\n") if p.strip()] chunks = [] for para in paragraphs: # Replace remaining newlines within paragraph with spaces para = para.replace("\n", " ").strip() if not para: continue if len(para) <= self.max_chars: if len(para) >= self.min_chars: chunks.append(para) continue # Split long paragraphs chunks.extend(self._split_long_text(para)) logger.info("Chunked text into %d chunks (max %d chars)", len(chunks), self.max_chars) return chunks def _split_long_text(self, text: str) -> list[str]: """Split text longer than max_chars into sentence-aware chunks.""" chunks = [] remaining = text while remaining: remaining = remaining.strip() if not remaining: break if len(remaining) <= self.max_chars: if len(remaining) >= self.min_chars: chunks.append(remaining) break # Find the best break point within max_chars cut = self._find_break_point(remaining) chunk = remaining[:cut].strip() remaining = remaining[cut:].strip() if len(chunk) >= self.min_chars: chunks.append(chunk) return chunks def _find_break_point(self, text: str) -> int: """Find the best position to break text at, within max_chars. Priority: sentence boundary > word boundary > hard cut. """ window = text[:self.max_chars] # Try to find sentence boundary if self.break_on_sentence: best_sentence_break = -1 for match in self.SENTENCE_END.finditer(window): pos = match.end() if pos <= self.max_chars: best_sentence_break = pos if best_sentence_break > self.min_chars: return best_sentence_break # Fall back to word boundary last_space = window.rfind(" ") if last_space > self.min_chars: return last_space # Hard cut at max_chars (shouldn't happen often with natural text) return self.max_chars