File size: 6,959 Bytes
d1c4aa1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""Text processing utilities for the TTS API."""

import re
from typing import List


class TextChunker:
    """Server-side text chunking for optimal GPU processing.

    Text is split on paragraph boundaries first, then on sentence boundaries,
    and finally force-split (on a space when possible, mid-word otherwise).
    Every chunk produced before overlap is applied is at most
    ``max_chunk_size`` characters long.
    """

    # A paragraph break is a newline, optional whitespace, then another newline.
    _PARAGRAPH_BREAK = re.compile(r'\n\s*\n')
    # A sentence ends at '.', '!' or '?' followed by whitespace.
    _SENTENCE_BREAK = re.compile(r'(?<=[.!?])\s+')

    def __init__(self, max_chunk_size: int = 800, overlap_sentences: int = 0):
        """
        Initialize the text chunker.

        Args:
            max_chunk_size: Maximum number of characters per chunk (must be > 0)
            overlap_sentences: Number of sentences to overlap between chunks for
                continuity; values <= 0 disable overlap

        Raises:
            ValueError: If max_chunk_size is not positive (a non-positive limit
                can never be satisfied by non-empty text).
        """
        if max_chunk_size <= 0:
            raise ValueError(
                f"max_chunk_size must be a positive integer, got {max_chunk_size!r}"
            )
        self.max_chunk_size = max_chunk_size
        self.overlap_sentences = overlap_sentences

    def _fits(self, current: str, addition: str, joiner: str) -> bool:
        """Return True if ``addition`` can join ``current`` within the size limit."""
        if not current:
            return len(addition) <= self.max_chunk_size
        # Count the real separator length so the joined chunk never overshoots.
        return len(current) + len(joiner) + len(addition) <= self.max_chunk_size

    def chunk_text(self, text: str) -> List[str]:
        """
        Break text into smaller chunks based on paragraphs and sentence boundaries.

        Args:
            text: The input text to chunk

        Returns:
            List of text chunks. Empty or whitespace-only input yields an empty
            list. When overlap is enabled, chunks after the first are prefixed
            with trailing sentences of the previous chunk and may therefore
            exceed ``max_chunk_size``.
        """
        if not text or not text.strip():
            return []

        text = text.strip()

        # Short input needs no splitting at all.
        if len(text) <= self.max_chunk_size:
            return [text]

        chunks: List[str] = []
        current_chunk = ""

        def flush() -> None:
            """Store the pending chunk, if it has any content, and reset it."""
            nonlocal current_chunk
            pending = current_chunk.strip()
            if pending:
                chunks.append(pending)
            current_chunk = ""

        for paragraph in self._split_into_paragraphs(text):
            # Keep whole paragraphs together while they fit.
            if self._fits(current_chunk, paragraph, "\n\n"):
                if current_chunk:
                    current_chunk += "\n\n" + paragraph
                else:
                    current_chunk = paragraph
                continue

            flush()

            if len(paragraph) <= self.max_chunk_size:
                current_chunk = paragraph
                continue

            # The paragraph alone is too long: fall back to sentence-level
            # pieces and re-pack them, joined by single spaces.
            for sentence_chunk in self._split_paragraph_into_sentences(paragraph):
                if self._fits(current_chunk, sentence_chunk, " "):
                    if current_chunk:
                        current_chunk += " " + sentence_chunk
                    else:
                        current_chunk = sentence_chunk
                else:
                    flush()
                    current_chunk = sentence_chunk

        flush()

        if self.overlap_sentences > 0 and len(chunks) > 1:
            chunks = self._add_overlap(chunks)

        return chunks

    def _split_into_paragraphs(self, text: str) -> List[str]:
        """Split text into stripped, non-empty paragraphs at blank lines."""
        stripped = (part.strip() for part in self._PARAGRAPH_BREAK.split(text))
        return [part for part in stripped if part]

    def _split_paragraph_into_sentences(self, paragraph: str) -> List[str]:
        """Split a long paragraph into sentence-based chunks.

        Sentences are packed greedily, joined by single spaces, into chunks of
        at most ``max_chunk_size`` characters. A single sentence longer than
        the limit is force-split, preferring the last space inside the window.
        """
        # Clamp so the force-split loop always makes progress, even if the
        # attribute was changed to a non-positive value after construction.
        limit = max(1, self.max_chunk_size)

        chunks: List[str] = []
        current_chunk = ""

        for sentence in self._SENTENCE_BREAK.split(paragraph):
            if len(sentence) > limit:
                # Emit whatever was pending before cutting the long sentence.
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                    current_chunk = ""

                while len(sentence) > limit:
                    cut = sentence[:limit].rfind(' ')
                    if cut <= 0:
                        # No usable space in the window: hard cut at the limit.
                        cut = limit
                    # rstrip drops extra spaces left when several are adjacent.
                    chunks.append(sentence[:cut].rstrip())
                    sentence = sentence[cut:].strip()

                # The tail (now within the limit) may absorb later sentences.
                if sentence:
                    current_chunk = sentence

            elif self._fits(current_chunk, sentence, " "):
                if current_chunk:
                    current_chunk += " " + sentence
                else:
                    current_chunk = sentence
            else:
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                current_chunk = sentence

        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        return chunks

    def _add_overlap(self, chunks: List[str]) -> List[str]:
        """Add sentence overlap between chunks for better continuity.

        Each chunk after the first is prefixed with the last
        ``overlap_sentences`` sentences of the original previous chunk. No
        prefix is added when the previous chunk does not contain more
        sentences than the overlap, which avoids repeating it in full.
        Overlapped chunks may exceed ``max_chunk_size``.
        """
        # A non-positive overlap would slice the whole sentence list ([-0:]),
        # duplicating the entire previous chunk, so treat it as "no overlap".
        if self.overlap_sentences <= 0 or len(chunks) <= 1:
            return chunks

        overlapped_chunks = [chunks[0]]  # First chunk stays the same

        for prev_chunk, current_chunk in zip(chunks, chunks[1:]):
            prev_sentences = self._SENTENCE_BREAK.split(prev_chunk)
            overlap_text = ""
            if len(prev_sentences) > self.overlap_sentences:
                overlap_text = " ".join(prev_sentences[-self.overlap_sentences:])

            if overlap_text:
                overlapped_chunks.append(overlap_text + " " + current_chunk)
            else:
                overlapped_chunks.append(current_chunk)

        return overlapped_chunks

    def get_chunk_info(self, chunks: List[str]) -> dict:
        """Get information about the chunks.

        Returns:
            Dict with the chunk count, total character count, and the average,
            maximum and minimum chunk length. All values are 0 when ``chunks``
            is empty.
        """
        sizes = [len(chunk) for chunk in chunks]
        total_characters = sum(sizes)
        return {
            "total_chunks": len(sizes),
            "total_characters": total_characters,
            "avg_chunk_size": total_characters / len(sizes) if sizes else 0,
            "max_chunk_size": max(sizes) if sizes else 0,
            "min_chunk_size": min(sizes) if sizes else 0,
        }