File size: 3,429 Bytes
aa1f259
 
 
4193bcd
aa1f259
 
 
 
 
 
4193bcd
 
 
 
 
aa1f259
 
4193bcd
 
 
 
 
 
aa1f259
 
 
4193bcd
 
aa1f259
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4193bcd
 
 
 
 
 
 
 
aa1f259
4193bcd
 
 
 
aa1f259
 
 
 
 
 
 
4193bcd
aa1f259
 
 
 
 
 
 
 
 
 
 
4193bcd
 
 
 
 
 
 
aa1f259
4193bcd
 
 
aa1f259
 
 
 
 
 
 
 
4193bcd
aa1f259
 
 
 
 
 
 
4193bcd
 
 
 
 
 
 
 
aa1f259
4193bcd
aa1f259
 
 
 
 
 
 
 
4193bcd
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
Text Processing Module for PodXplainClone.

Handles text cleanup, chunking, and chunk-count estimation for Kokoro TTS.
"""

import re
from typing import List


# Default upper bound on chunk length, in characters; used as the default
# ``max_chars`` of chunk_text() and estimate_total_chunks().
# NOTE(review): presumably tuned for the Kokoro TTS input limit - confirm.
MAX_CHUNK_CHARS = 420
# Sentence boundary: whitespace that follows ".", "!" or "?" and is followed by
# a quote character, an ASCII uppercase letter, or a digit.
SENTENCE_SPLIT = re.compile(r"(?<=[.!?])\s+(?=[\"'A-Z0-9])")
# Clause boundary: whitespace after ",", ";" or ":", or whitespace directly
# before one of the conjunctions and/but/or/so/because (case-insensitive).
CLAUSE_SPLIT = re.compile(r"(?<=[,;:])\s+|\s+(?=(?:and|but|or|so|because)\b)", re.IGNORECASE)
# One or more consecutive spaces and/or tabs (newlines are not matched).
SPACE_RE = re.compile(r"[ \t]+")
# Three or more consecutive newlines, i.e. more than one blank line in a row.
LINE_RE = re.compile(r"\n{3,}")


def normalize_text(text: str) -> str:
    """Normalize whitespace while preserving paragraph boundaries.

    Steps, in order:
      1. Convert ``\\r\\n`` and lone ``\\r`` line endings to ``\\n``.
      2. Collapse every run of spaces/tabs into a single space.
      3. Trim the spaces left at the start and end of each line.
      4. Collapse runs of three or more newlines into exactly two.
      5. Strip leading and trailing whitespace from the whole text.

    Args:
        text: Raw input text.

    Returns:
        The cleaned text; an empty string if the input held only whitespace.
    """
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    text = SPACE_RE.sub(" ", text)
    # After the collapse above, a line can still carry a single leading or
    # trailing space, and a "blank" line can consist of just one space. Such
    # lines break up the newline run and would keep LINE_RE from matching, so
    # trim them before collapsing blank lines.
    text = "\n".join(line.strip(" ") for line in text.split("\n"))
    text = LINE_RE.sub("\n\n", text)
    return text.strip()


def chunk_text(text: str, max_chars: int = MAX_CHUNK_CHARS) -> List[str]:
    """Break text into chunks of bounded length for TTS.

    The text is normalized first. Sentences are then packed greedily, joined
    by a single space, for as long as the running chunk stays within
    ``max_chars``. A sentence that is itself longer than ``max_chars`` is
    handed to ``_split_at_clauses``; the last piece it returns stays open so
    that following sentences can still be appended to it.

    Args:
        text: Raw input text.
        max_chars: Maximum length of a chunk, in characters.

    Returns:
        The chunks in reading order. Empty list when the normalized text is
        empty; a single-item list when the normalized text already fits.
    """
    cleaned = normalize_text(text)
    if not cleaned:
        return []
    if len(cleaned) <= max_chars:
        return [cleaned]

    output: List[str] = []
    pending = ""
    stripped = (part.strip() for part in SENTENCE_SPLIT.split(cleaned))

    # filter(None, ...) drops the empty strings the split can leave behind.
    for sentence in filter(None, stripped):
        fits = len(pending) + len(sentence) + 1 <= max_chars
        if fits:
            pending = (pending + " " + sentence).strip()
            continue

        # The sentence does not fit: close the running chunk, if any.
        if pending:
            output.append(pending)

        if len(sentence) > max_chars:
            # Everything but the last piece is final; the last one stays open.
            *complete, pending = _split_at_clauses(sentence, max_chars) or [""]
            output.extend(complete)
        else:
            pending = sentence

    if pending:
        output.append(pending)
    return output


def _split_at_clauses(text: str, max_chars: int) -> List[str]:
    """Cut an over-long sentence at clause boundaries.

    Clauses (as delimited by ``CLAUSE_SPLIT``) are packed greedily, joined by
    a single space, while the running piece stays within ``max_chars``. A
    clause that is longer than ``max_chars`` on its own is passed on to
    ``_hard_split``, and the last piece from that call stays open for the
    clauses that follow.

    Args:
        text: The sentence to split.
        max_chars: Maximum length of a piece, in characters.

    Returns:
        The pieces in their original order.
    """
    pieces: List[str] = []
    buffer = ""

    for raw in CLAUSE_SPLIT.split(text):
        part = raw.strip()
        if part == "":
            continue

        if len(buffer) + len(part) + 1 > max_chars:
            # No room left: close the running piece before placing this clause.
            if buffer:
                pieces.append(buffer)
            if len(part) > max_chars:
                *done, buffer = _hard_split(part, max_chars) or [""]
                pieces += done
            else:
                buffer = part
        else:
            buffer = (buffer + " " + part).strip()

    if buffer:
        pieces.append(buffer)
    return pieces


def _hard_split(text: str, max_chars: int) -> List[str]:
    """Last resort: split text at word boundaries."""
    words = text.split()
    chunks = []
    current = ""

    for word in words:
        if len(current) + len(word) + 1 <= max_chars:
            current = f"{current} {word}".strip()
            continue

        if current:
            chunks.append(current)

        if len(word) > max_chars:
            chunks.extend(word[i:i + max_chars] for i in range(0, len(word), max_chars))
            current = ""
        else:
            current = word

    if current:
        chunks.append(current)

    return chunks


def estimate_total_chunks(segments: list, max_chars: int = MAX_CHUNK_CHARS) -> int:
    """Count the TTS chunks that the given segments will produce.

    Args:
        segments: Two-item entries; the first item is ignored and the second
            is the text that gets chunked.
        max_chars: Maximum chunk length, forwarded to ``chunk_text``.

    Returns:
        The sum of the chunk counts of all segment texts (0 for no segments).
    """
    total = 0
    for _ignored, segment_text in segments:
        total += len(chunk_text(segment_text, max_chars))
    return total