Spaces:
Sleeping
Sleeping
| """ | |
| TTS Engine Router — routes synthesis to local models or YourVoic API. | |
| """ | |
| import os | |
| import io | |
| import time | |
| import tempfile | |
| import requests | |
| import numpy as np | |
| import soundfile as sf | |
| import logging | |
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# YourVoic endpoint and credentials. The key is read from the environment
# once, at import time, and falls back to an empty string when it is absent.
YOURVOIC_STREAM_URL = "https://yourvoic.com/api/v1/tts/stream"
YOURVOIC_API_KEY = os.getenv("YOURVOIC_API_KEY", "")
def synthesize_yourvoic(text, language_code, voice="Peter", speed=1.0):
    """
    Synthesize text using the YourVoic streaming TTS API.

    Args:
        text: Text to synthesize (sent as the "text" field).
        language_code: Sent as the "language" field of the request.
        voice: Sent as the "voice" field.
        speed: Sent as the "speed" field.

    Returns:
        (audio_array, sample_rate) as decoded by soundfile with
        dtype="float32".

    Raises:
        RuntimeError: If YOURVOIC_API_KEY is empty, the API answers with a
            non-200 status, or the response body is empty.
        Errors raised by requests.post (connection failures, timeouts) and by
        soundfile while decoding propagate unchanged.
    """
    if not YOURVOIC_API_KEY:
        raise RuntimeError(
            "YOURVOIC_API_KEY not set. Add it as a Space secret."
        )

    headers = {
        "X-API-Key": YOURVOIC_API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "text": text,
        "voice": voice,
        "language": language_code,
        "model": "aura-prime",
        "speed": speed,
    }

    t0 = time.time()
    audio_bytes = io.BytesIO()
    # With stream=True the connection stays checked out until the body has
    # been fully read or the response is closed. The context manager closes
    # it on every exit path, including an exception raised mid-download.
    with requests.post(
        YOURVOIC_STREAM_URL,
        headers=headers,
        json=payload,
        stream=True,
        timeout=60,
    ) as response:
        if response.status_code != 200:
            raise RuntimeError(
                f"YourVoic API error {response.status_code}: {response.text[:200]}"
            )
        # Collect streamed audio bytes
        for chunk in response.iter_content(chunk_size=8192):
            audio_bytes.write(chunk)

    # Fail with a clear message instead of an opaque decoder error.
    if audio_bytes.tell() == 0:
        raise RuntimeError("YourVoic API returned an empty audio stream")
    audio_bytes.seek(0)

    elapsed = time.time() - t0
    logger.info("YourVoic TTS: %d chars, %.2fs", len(text), elapsed)

    # Decode the downloaded bytes (presumably WAV -- confirm against the API).
    audio_array, sample_rate = sf.read(audio_bytes, dtype="float32")
    return audio_array, sample_rate
def synthesize_yourvoic_to_file(text, language_code, output_path, voice="Peter", speed=1.0):
    """
    Render text through YourVoic and write the audio to output_path.

    Returns (output_path, sample_rate). Any error raised by
    synthesize_yourvoic or by soundfile's writer propagates to the caller.
    """
    samples, sample_rate = synthesize_yourvoic(
        text, language_code, voice=voice, speed=speed
    )
    sf.write(output_path, samples, sample_rate)
    return output_path, sample_rate
def synthesize_local(text, tts_pipe):
    """
    Run a local TTS pipeline on text (MMS-TTS via HuggingFace, per the
    module's usage).

    tts_pipe is called with the text and must return a mapping with the keys
    "audio" and "sampling_rate". The audio is converted to a NumPy array with
    singleton dimensions removed.

    Returns (audio_array, sample_rate).
    """
    started = time.time()
    output = tts_pipe(text)

    # Drop singleton axes from the pipeline output.
    audio = np.squeeze(np.array(output["audio"]))
    sr = output["sampling_rate"]

    elapsed = time.time() - started
    logger.info(f"Local TTS: {len(text)} chars, {elapsed:.2f}s, {len(audio)/sr:.1f}s audio")
    return audio, sr
def synthesize_chunked(text, language_config, tts_pipe=None, sentences_per_chunk=2):
    """
    Synthesize long text by chunking it into sentence groups.

    The text is split after '.', '!' or '?' followed by whitespace. Groups of
    sentences are synthesized one at a time and joined, with 0.15 s of
    silence appended after every successful chunk (including the last one).
    A chunk that fails is logged and skipped, so the result may be partial.

    Args:
        text: Full text to synthesize.
        language_config: Dict from LANGUAGES. "tts_engine" is always read;
            "yourvoic_voices" and "yourvoic_lang" are read when the engine is
            "yourvoic". Any other engine value uses the local pipeline.
        tts_pipe: Local HuggingFace TTS pipeline (needed for local engine).
        sentences_per_chunk: How many sentences to synthesize per call.
            Values below 1 are treated as 1.

    Returns:
        (audio_array, sample_rate). When nothing could be synthesized the
        array is empty and the sample rate is the first rate reported by a
        chunk, or 16000 if none was.
    """
    import re

    sentences = [
        part.strip()
        for part in re.split(r'(?<=[.!?])\s+', text)
        if part.strip()
    ]
    if not sentences:
        return np.array([], dtype=np.float32), 16000

    use_yourvoic = language_config["tts_engine"] == "yourvoic"

    # The pipeline check does not depend on the chunk: report it once instead
    # of raising and catching the same error for every chunk.
    if not use_yourvoic and tts_pipe is None:
        logger.error("TTS chunk failed: Local TTS pipeline not loaded")
        return np.array([], dtype=np.float32), 16000

    # range() rejects a zero step and yields nothing for a negative one.
    step = max(1, int(sentences_per_chunk))

    audio_segments = []
    output_sr = None
    for start in range(0, len(sentences), step):
        chunk_text = ' '.join(sentences[start:start + step])
        try:
            if use_yourvoic:
                voices = language_config["yourvoic_voices"]
                voice = voices[0] if voices else "Peter"
                lang_code = language_config["yourvoic_lang"]
                audio_seg, seg_sr = synthesize_yourvoic(chunk_text, lang_code, voice)
            else:
                audio_seg, seg_sr = synthesize_local(chunk_text, tts_pipe)

            if output_sr is None:
                output_sr = seg_sr

            if len(audio_seg) > 0:
                audio_segments.append(audio_seg)
                # The pause must carry the same trailing (channel) dimensions
                # as the segment, otherwise concatenating multi-channel audio
                # with a 1-D pause fails.
                pause_shape = (int(0.15 * seg_sr),) + np.shape(audio_seg)[1:]
                audio_segments.append(np.zeros(pause_shape, dtype=np.float32))
        except Exception as e:
            # Best effort: keep whatever the other chunks produce.
            logger.error(f"TTS chunk failed: {e}")
            continue

    if not audio_segments:
        return np.array([], dtype=np.float32), output_sr or 16000

    return np.concatenate(audio_segments), output_sr