| """Text chunker - splits cleaned text into training-sized chunks.""" |
|
|
| import logging |
| import re |
|
|
# Module-level logger named after this module; TextChunker.chunk reports
# the number of chunks it produced through it.
logger = logging.getLogger(__name__)
|
|
|
|
class TextChunker:
    """Split text into chunks suitable for character-level GPT training.

    Each chunk becomes one line in the training file. Every non-blank input
    line is treated as a paragraph: paragraphs that fit within ``max_chars``
    are kept whole, longer ones are split at sentence boundaries when
    possible, otherwise at word boundaries, and as a last resort with a hard
    cut. Chunks shorter than ``min_chars`` are discarded.
    """

    # Sentence-ending punctuation, an optional closing quote, then whitespace.
    SENTENCE_END = re.compile(r"[.!?]['\"]?\s+")

    def __init__(self, config: dict):
        """Read the chunking limits from ``config``.

        Args:
            config: Mapping with the optional keys ``max_chars`` (default
                256), ``min_chars`` (default 40) and ``break_on_sentence``
                (default True). If ``min_chars`` exceeds ``max_chars`` no
                chunk can satisfy both limits and every input yields an
                empty list.

        Raises:
            ValueError: If ``max_chars`` is smaller than 1. Such a limit
                would keep the splitting loop from ever consuming text.
        """
        self.max_chars = config.get("max_chars", 256)
        self.min_chars = config.get("min_chars", 40)
        self.break_on_sentence = config.get("break_on_sentence", True)

        if self.max_chars < 1:
            raise ValueError(
                f"max_chars must be at least 1, got {self.max_chars!r}"
            )

    def chunk(self, text: str) -> list[str]:
        """Split text into chunks of at most max_chars characters.

        Args:
            text: Cleaned text to chunk.

        Returns:
            List of text chunks, each a single line with no newline
            characters. Empty or whitespace-only input returns an empty
            list without logging.
        """
        if not text.strip():
            return []

        chunks: list[str] = []
        for line in text.split("\n"):
            para = line.strip()
            if not para:
                continue

            if len(para) > self.max_chars:
                chunks.extend(self._split_long_text(para))
            elif len(para) >= self.min_chars:
                # Fits in one chunk; paragraphs below min_chars are dropped.
                chunks.append(para)

        logger.info("Chunked text into %d chunks (max %d chars)", len(chunks), self.max_chars)
        return chunks

    def _split_long_text(self, text: str) -> list[str]:
        """Split text longer than max_chars into sentence-aware chunks.

        Args:
            text: A single paragraph without newline characters.

        Returns:
            The pieces in their original order. Pieces (including the final
            remainder) shorter than ``min_chars`` are discarded.
        """
        chunks: list[str] = []
        remaining = text.strip()

        while remaining:
            if len(remaining) <= self.max_chars:
                # The tail fits in one chunk; keep it only if long enough.
                if len(remaining) >= self.min_chars:
                    chunks.append(remaining)
                break

            cut = self._find_break_point(remaining)
            piece = remaining[:cut].strip()
            remaining = remaining[cut:].strip()

            if len(piece) >= self.min_chars:
                chunks.append(piece)

        return chunks

    def _find_break_point(self, text: str) -> int:
        """Find the best position to break text at, within max_chars.

        Priority: sentence boundary > word boundary > hard cut. A sentence
        or word boundary is only used when the chunk it produces would be
        kept by the caller (stripped length of at least ``min_chars``).

        Args:
            text: Text to split; expected to be longer than ``max_chars``.

        Returns:
            Index at which to cut ``text``; never greater than
            ``max_chars``.
        """
        window = text[: self.max_chars]

        if self.break_on_sentence:
            # Matches are yielded left to right, so the last one is the
            # latest sentence end that fits inside the window.
            sentence_cut = -1
            for match in self.SENTENCE_END.finditer(window):
                sentence_cut = match.end()
            if self._is_usable_cut(window, sentence_cut):
                return sentence_cut

        space_cut = window.rfind(" ")
        if self._is_usable_cut(window, space_cut):
            return space_cut

        # No acceptable boundary: cut at the size limit.
        return self.max_chars

    def _is_usable_cut(self, window: str, cut: int) -> bool:
        """Return True if cutting ``window`` at ``cut`` yields a kept chunk."""
        if cut <= 0:
            # Covers the "not found" sentinel (-1) and zero-length cuts,
            # which would make no progress.
            return False
        # Measure the chunk the way the caller does (after stripping), so a
        # boundary followed by several whitespace characters is not accepted
        # only for its chunk to be thrown away as too short.
        return len(window[:cut].strip()) >= max(self.min_chars, 1)
|
|