"""
Abstract base class for TTS backends.

All TTS backends must implement this interface to be compatible with the engine.
"""
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional

import numpy as np
def _greedy_pack(pieces: list[str], max_chars: int) -> list[str]:
    """Greedily merge pieces into space-joined chunks of at most max_chars.

    A single piece longer than max_chars becomes its own chunk unchanged.
    """
    chunks: list[str] = []
    current = ""
    for piece in pieces:
        if len(current) + len(piece) + 1 <= max_chars:
            current = f"{current} {piece}".strip()
        else:
            if current:
                chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return chunks


def split_into_sentences(text: str, max_chars: int = 250) -> list[str]:
    """
    Split text into sentences for better TTS quality on long texts.

    Sentences are split on sentence-ending punctuation; over-long sentences
    are further split on clause punctuation, and as a last resort on
    whitespace, so every chunk fits within max_chars (except a single
    unbreakable run of non-space characters).

    Args:
        text: Input text to split
        max_chars: Maximum characters per chunk (default: 250)

    Returns:
        List of text chunks, each suitable for TTS generation
    """
    if len(text) <= max_chars:
        return [text]

    # Sentence-ending punctuation: . ! ? and CJK / Arabic equivalents.
    sentences = re.split(r"(?<=[.!?。?!،؟])\s+", text)

    # Flatten into pieces that each fit within max_chars where possible.
    pieces: list[str] = []
    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue
        if len(sentence) <= max_chars:
            pieces.append(sentence)
            continue
        # Over-long sentence: split on clause breaks (comma, semicolon, ...).
        for part in re.split(r"(?<=[,;:،–—])\s+", sentence):
            part = part.strip()
            if not part:
                continue
            if len(part) <= max_chars:
                pieces.append(part)
            else:
                # Fix: previously an unbreakable over-long part was emitted
                # as an oversized chunk; hard-split on whitespace instead.
                pieces.extend(_greedy_pack(part.split(), max_chars))

    chunks = _greedy_pack(pieces, max_chars)
    return chunks if chunks else [text]
@dataclass
class TTSResult:
    """Result from TTS generation."""

    audio: np.ndarray  # Audio waveform as numpy array
    sample_rate: int  # Sample rate in Hz

    def to_int16(self) -> np.ndarray:
        """Convert audio to 16-bit integer PCM.

        Float audio is clipped to [-1.0, 1.0] and scaled to the int16
        range; audio that is not float32/float64 is returned unchanged.
        """
        # NOTE: missing @dataclass was a bug — TTSResult(audio=..., sample_rate=...)
        # (as used by generate_long) raised TypeError without a generated __init__.
        audio = self.audio
        if audio.dtype in (np.float32, np.float64):
            audio = np.clip(audio, -1.0, 1.0)
            audio = (audio * 32767).astype(np.int16)
        return audio
@dataclass
class BackendConfig:
    """Configuration for TTS backends."""

    device: str = "auto"  # "auto", "cuda", "mps", "cpu"

    def resolve_device(self) -> str:
        """Resolve 'auto' to the best available device.

        An explicitly configured device is returned as-is; "auto" prefers
        CUDA, then Apple MPS, then CPU. torch is imported lazily so the
        config can be constructed without torch loaded.
        """
        # NOTE: missing @dataclass was a bug — BackendConfig(device="cuda")
        # raised TypeError without a generated __init__.
        if self.device != "auto":
            return self.device
        import torch

        if torch.cuda.is_available():
            return "cuda"
        if torch.backends.mps.is_available():
            return "mps"
        return "cpu"
class TTSBackend(ABC):
    """
    Abstract base class for TTS backends.

    To create a new backend:
    1. Inherit from this class
    2. Implement all abstract methods
    3. Register the backend in the engine
    """

    def __init__(self, config: Optional[BackendConfig] = None):
        self.config = config or BackendConfig()
        self._is_loaded = False

    # Fix: the stubs below lacked @abstractmethod, so incomplete subclasses
    # could be instantiated and silently returned None from every stub.
    # `name` is an abstract property because __repr__ reads it as an
    # attribute (self.name, not self.name()).
    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable name of the backend."""

    @abstractmethod
    def supports_voice_cloning(self) -> bool:
        """Whether this backend supports voice cloning from audio."""

    @abstractmethod
    def supported_languages(self) -> dict[str, str]:
        """
        Dictionary of supported language codes to language names.

        Example: {"en": "English", "de": "German"}
        """

    def is_loaded(self) -> bool:
        """Whether the backend model is loaded and ready."""
        return self._is_loaded

    @abstractmethod
    def load(self) -> None:
        """
        Load the model and prepare for inference.

        Should set self._is_loaded = True when complete.
        """

    @abstractmethod
    def unload(self) -> None:
        """
        Unload the model to free memory.

        Should set self._is_loaded = False when complete.
        """

    @abstractmethod
    def generate(
        self,
        text: str,
        language: str = "de",
        voice_audio_path: Optional[str] = None,
        **kwargs,
    ) -> TTSResult:
        """
        Generate speech from text.

        Args:
            text: The text to synthesize
            language: Language code (e.g., "de", "en")
            voice_audio_path: Optional path to reference audio for voice cloning
            **kwargs: Backend-specific parameters

        Returns:
            TTSResult containing audio waveform and sample rate
        """

    def generate_long(
        self,
        text: str,
        language: str = "de",
        voice_audio_path: Optional[str] = None,
        max_chars_per_chunk: int = 250,
        silence_between_ms: int = 300,
        **kwargs,
    ) -> "TTSResult":
        """
        Generate speech from long text by splitting into sentences.

        Args:
            text: The text to synthesize (can be long)
            language: Language code (e.g., "de", "en")
            voice_audio_path: Optional path to reference audio for voice cloning
            max_chars_per_chunk: Maximum characters per chunk (default: 250)
            silence_between_ms: Silence between chunks in milliseconds (default: 300)
            **kwargs: Backend-specific parameters

        Returns:
            TTSResult containing concatenated audio waveform and sample rate
        """
        # Imported lazily to keep the base module importable without loguru.
        from loguru import logger

        chunks = split_into_sentences(text, max_chars_per_chunk)
        if len(chunks) == 1:
            # Short text: no splitting overhead, delegate directly.
            return self.generate(text, language, voice_audio_path, **kwargs)

        logger.info(f"Splitting text into {len(chunks)} chunks for generation")
        audio_segments = []
        sample_rate = None
        for i, chunk in enumerate(chunks):
            logger.debug(f"Generating chunk {i+1}/{len(chunks)}: '{chunk[:50]}...'")
            result = self.generate(chunk, language, voice_audio_path, **kwargs)
            audio_segments.append(result.audio)
            if sample_rate is None:
                # First chunk fixes the sample rate for the whole result.
                sample_rate = result.sample_rate
            # Add silence between chunks (except after last)
            if i < len(chunks) - 1 and silence_between_ms > 0:
                silence_samples = int(sample_rate * silence_between_ms / 1000)
                # Match the audio dtype so concatenation does not upcast.
                silence = np.zeros(silence_samples, dtype=result.audio.dtype)
                audio_segments.append(silence)

        # Concatenate all segments
        combined_audio = np.concatenate(audio_segments)
        return TTSResult(audio=combined_audio, sample_rate=sample_rate)

    def __repr__(self) -> str:
        status = "loaded" if self._is_loaded else "not loaded"
        return f"{self.__class__.__name__}(name='{self.name}', status={status})"