""" Text Processing Module for PodXplainClone. Handles text cleanup, chunking, and chunk-count estimation for Kokoro TTS. """ import re from typing import List MAX_CHUNK_CHARS = 420 SENTENCE_SPLIT = re.compile(r"(?<=[.!?])\s+(?=[\"'A-Z0-9])") CLAUSE_SPLIT = re.compile(r"(?<=[,;:])\s+|\s+(?=(?:and|but|or|so|because)\b)", re.IGNORECASE) SPACE_RE = re.compile(r"[ \t]+") LINE_RE = re.compile(r"\n{3,}") def normalize_text(text: str) -> str: """Normalize whitespace while preserving paragraph boundaries.""" text = text.replace("\r\n", "\n").replace("\r", "\n") text = SPACE_RE.sub(" ", text) text = LINE_RE.sub("\n\n", text) return text.strip() def chunk_text(text: str, max_chars: int = MAX_CHUNK_CHARS) -> List[str]: """Split text into TTS-sized chunks, preferring sentence and clause boundaries.""" text = normalize_text(text) if not text: return [] if len(text) <= max_chars: return [text] chunks = [] sentences = SENTENCE_SPLIT.split(text) current_chunk = "" for sentence in sentences: sentence = sentence.strip() if not sentence: continue if len(current_chunk) + len(sentence) + 1 <= max_chars: current_chunk = f"{current_chunk} {sentence}".strip() continue if current_chunk: chunks.append(current_chunk) current_chunk = "" if len(sentence) <= max_chars: current_chunk = sentence else: clause_chunks = _split_at_clauses(sentence, max_chars) chunks.extend(clause_chunks[:-1]) current_chunk = clause_chunks[-1] if clause_chunks else "" if current_chunk: chunks.append(current_chunk) return chunks def _split_at_clauses(text: str, max_chars: int) -> List[str]: """Split a long sentence at clause boundaries, then word boundaries.""" clauses = CLAUSE_SPLIT.split(text) chunks = [] current = "" for clause in clauses: clause = clause.strip() if not clause: continue if len(current) + len(clause) + 1 <= max_chars: current = f"{current} {clause}".strip() continue if current: chunks.append(current) if len(clause) <= max_chars: current = clause else: hard_chunks = _hard_split(clause, max_chars) chunks.extend(hard_chunks[:-1]) current = hard_chunks[-1] if hard_chunks else "" if current: chunks.append(current) return chunks def _hard_split(text: str, max_chars: int) -> List[str]: """Last resort: split text at word boundaries.""" words = text.split() chunks = [] current = "" for word in words: if len(current) + len(word) + 1 <= max_chars: current = f"{current} {word}".strip() continue if current: chunks.append(current) if len(word) > max_chars: chunks.extend(word[i:i + max_chars] for i in range(0, len(word), max_chars)) current = "" else: current = word if current: chunks.append(current) return chunks def estimate_total_chunks(segments: list, max_chars: int = MAX_CHUNK_CHARS) -> int: """Estimate how many TTS chunks will be generated.""" return sum(len(chunk_text(text, max_chars)) for _speaker_id, text in segments)