File size: 3,400 Bytes
c7180df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""Text chunker - splits cleaned text into training-sized chunks."""

import logging
import re

logger = logging.getLogger(__name__)


class TextChunker:
    """Splits text into chunks suitable for character-level GPT training.

    Each chunk becomes one line in the training file. Long lines are cut
    at the last sentence boundary that fits, otherwise at the last word
    boundary, otherwise hard at ``max_chars``. Pieces shorter than
    ``min_chars`` are discarded.
    """

    # Sentence-ending punctuation, an optional closing quote, then at least
    # one whitespace character. A sentence that ends the text (no trailing
    # whitespace) is intentionally not matched: nothing follows it to split.
    SENTENCE_END = re.compile(r"[.!?]['\"]?\s+")

    def __init__(self, config: dict):
        """Read chunking limits from ``config``.

        Args:
            config: Mapping with optional keys ``max_chars`` (default 256),
                ``min_chars`` (default 40) and ``break_on_sentence``
                (default True).

        Raises:
            ValueError: If ``max_chars`` is smaller than 1; such a limit
                can never consume any text and would make splitting loop
                forever.
        """
        self.max_chars = config.get("max_chars", 256)
        self.min_chars = config.get("min_chars", 40)
        self.break_on_sentence = config.get("break_on_sentence", True)

        if self.max_chars < 1:
            raise ValueError(f"max_chars must be >= 1, got {self.max_chars!r}")
        if self.min_chars > self.max_chars:
            # Not fatal, but every chunk will be filtered out.
            logger.warning(
                "min_chars (%d) exceeds max_chars (%d); no chunk can satisfy both",
                self.min_chars,
                self.max_chars,
            )

    def chunk(self, text: str) -> list[str]:
        """Split text into chunks of at most max_chars characters.

        Every line of ``text`` is treated as one paragraph. Paragraphs that
        fit are kept whole; longer ones are split further. Anything shorter
        than ``min_chars`` is dropped.

        Args:
            text: Cleaned text to chunk.

        Returns:
            List of text chunks, each a single line with no newlines.
        """
        if not text.strip():
            return []

        chunks: list[str] = []
        # splitlines() also breaks on "\r" and the other Unicode line
        # boundaries, so no chunk can carry an embedded line break into the
        # one-chunk-per-line training file.
        for line in text.splitlines():
            para = line.strip()
            if not para:
                continue

            if len(para) > self.max_chars:
                chunks.extend(self._split_long_text(para))
            elif len(para) >= self.min_chars:
                chunks.append(para)

        logger.info("Chunked text into %d chunks (max %d chars)", len(chunks), self.max_chars)
        return chunks

    def _split_long_text(self, text: str) -> list[str]:
        """Split text longer than max_chars into sentence-aware chunks.

        Args:
            text: A single paragraph without line breaks.

        Returns:
            The pieces of ``text`` in order, each at most ``max_chars`` long;
            pieces shorter than ``min_chars`` are omitted.
        """
        chunks: list[str] = []
        remaining = text.strip()

        while remaining:
            if len(remaining) <= self.max_chars:
                # Final piece: keep it only if it is long enough.
                if len(remaining) >= self.min_chars:
                    chunks.append(remaining)
                break

            cut = self._find_break_point(remaining)
            piece = remaining[:cut].strip()
            remaining = remaining[cut:].strip()

            if len(piece) >= self.min_chars:
                chunks.append(piece)

        return chunks

    def _find_break_point(self, text: str) -> int:
        """Find the best position to break text at, within max_chars.

        Priority: sentence boundary > word boundary > hard cut. A boundary
        is only accepted when it lies beyond ``min_chars`` so the resulting
        chunk is not immediately discarded as too short.

        Args:
            text: Text with no leading whitespace, longer than ``max_chars``.

        Returns:
            Index in ``1..max_chars`` at which to cut ``text``.
        """
        limit = self.max_chars
        # Floor at 0 so the "not found" sentinel (-1) can never be accepted
        # as a break point when min_chars is configured negative.
        floor = max(self.min_chars, 0)

        if self.break_on_sentence:
            sentence_break = -1
            # Look one character past the window so a sentence that exactly
            # fills it (its trailing whitespace sits at index ``limit``) is
            # still recognised; clamp so the cut never exceeds the window.
            for match in self.SENTENCE_END.finditer(text, 0, limit + 1):
                sentence_break = min(match.end(), limit)

            if sentence_break > floor:
                return sentence_break

        # Fall back to the last word boundary. A space at index ``limit``
        # means the window ends exactly on a word, so cutting there is fine.
        last_space = text.rfind(" ", 0, limit + 1)
        if last_space > floor:
            return last_space

        # Hard cut at max_chars (shouldn't happen often with natural text)
        return limit