Spaces:
Sleeping
Sleeping
| """ | |
| Text Processing Module for PodXplainClone. | |
| Handles text cleanup, chunking, and chunk-count estimation for Kokoro TTS. | |
| """ | |
| import re | |
| from typing import List | |
# Upper bound on characters per TTS chunk. Why 420 specifically is not
# documented here — presumably tuned for Kokoro's input limits; confirm
# before changing.
MAX_CHUNK_CHARS = 420
# Split after sentence-ending punctuation (. ! ?) followed by whitespace,
# but only when the next token looks like a sentence start: a quote,
# capital letter, or digit. Avoids splitting after abbreviations
# followed by lowercase text.
SENTENCE_SPLIT = re.compile(r"(?<=[.!?])\s+(?=[\"'A-Z0-9])")
# Split after comma/semicolon/colon, or before a coordinating
# conjunction (and/but/or/so/because), case-insensitively. Used to
# subdivide sentences that exceed the chunk limit.
CLAUSE_SPLIT = re.compile(r"(?<=[,;:])\s+|\s+(?=(?:and|but|or|so|because)\b)", re.IGNORECASE)
# Runs of spaces/tabs (newlines excluded) to collapse to one space.
SPACE_RE = re.compile(r"[ \t]+")
# Three or more consecutive newlines to collapse to a paragraph break.
LINE_RE = re.compile(r"\n{3,}")
def normalize_text(text: str) -> str:
    """Collapse whitespace while keeping paragraph boundaries intact.

    Line endings are unified to LF, runs of spaces/tabs become a single
    space, and three or more consecutive newlines are reduced to exactly
    two (one blank line). Leading/trailing whitespace is stripped.
    """
    unified = text.replace("\r\n", "\n").replace("\r", "\n")
    unified = re.sub(r"[ \t]+", " ", unified)
    unified = re.sub(r"\n{3,}", "\n\n", unified)
    return unified.strip()
def chunk_text(text: str, max_chars: int = MAX_CHUNK_CHARS) -> List[str]:
    """Break normalized text into chunks sized for the TTS engine.

    Sentence boundaries are preferred. A sentence that cannot fit in a
    single chunk is subdivided at clause boundaries (and ultimately word
    boundaries) via _split_at_clauses; its final fragment stays open so
    following sentences may be packed into it.
    """
    text = normalize_text(text)
    if not text:
        return []
    if len(text) <= max_chars:
        return [text]

    pieces: List[str] = []
    buffer = ""
    for raw in SENTENCE_SPLIT.split(text):
        sentence = raw.strip()
        if not sentence:
            continue
        # +1 accounts for the joining space when appending to the buffer.
        if len(buffer) + len(sentence) + 1 <= max_chars:
            buffer = sentence if not buffer else f"{buffer} {sentence}"
            continue
        # Sentence doesn't fit: flush whatever has accumulated so far.
        if buffer:
            pieces.append(buffer)
            buffer = ""
        if len(sentence) <= max_chars:
            buffer = sentence
        else:
            sub_chunks = _split_at_clauses(sentence, max_chars)
            pieces.extend(sub_chunks[:-1])
            buffer = sub_chunks[-1] if sub_chunks else ""
    if buffer:
        pieces.append(buffer)
    return pieces
def _split_at_clauses(text: str, max_chars: int) -> List[str]:
    """Divide an over-long sentence at clause boundaries.

    Clauses that still exceed max_chars are broken at word boundaries by
    _hard_split. The last list element is the still-open fragment, which
    the caller may continue packing into.
    """
    parts: List[str] = []
    acc = ""
    for piece in CLAUSE_SPLIT.split(text):
        piece = piece.strip()
        if not piece:
            continue
        # +1 covers the space inserted when joining onto the accumulator.
        if len(acc) + len(piece) + 1 <= max_chars:
            acc = piece if not acc else f"{acc} {piece}"
        elif len(piece) <= max_chars:
            if acc:
                parts.append(acc)
            acc = piece
        else:
            if acc:
                parts.append(acc)
            word_chunks = _hard_split(piece, max_chars)
            parts.extend(word_chunks[:-1])
            acc = word_chunks[-1] if word_chunks else ""
    if acc:
        parts.append(acc)
    return parts
| def _hard_split(text: str, max_chars: int) -> List[str]: | |
| """Last resort: split text at word boundaries.""" | |
| words = text.split() | |
| chunks = [] | |
| current = "" | |
| for word in words: | |
| if len(current) + len(word) + 1 <= max_chars: | |
| current = f"{current} {word}".strip() | |
| continue | |
| if current: | |
| chunks.append(current) | |
| if len(word) > max_chars: | |
| chunks.extend(word[i:i + max_chars] for i in range(0, len(word), max_chars)) | |
| current = "" | |
| else: | |
| current = word | |
| if current: | |
| chunks.append(current) | |
| return chunks | |
def estimate_total_chunks(segments: list, max_chars: int = MAX_CHUNK_CHARS) -> int:
    """Estimate how many TTS chunks the (speaker_id, text) segments yield."""
    total = 0
    for _speaker_id, segment_text in segments:
        total += len(chunk_text(segment_text, max_chars))
    return total