""" sub200 - Ultra Low Latency TTS Hosting Server Supports multiple open-source TTS engines Optimized for Hugging Face Spaces with Gradio and zero GPU (H200 dynamic allocation) """ import os import subprocess import tempfile from typing import Optional import concurrent.futures import asyncio import gradio as gr import numpy as np # Import spaces for GPU decorator try: import spaces except ImportError: # Fallback if spaces not available (local development) class spaces: @staticmethod def GPU(func): return func # Import TTS engines def check_engine_availability(): """Check which TTS engines are available""" engines = { "piper": False, "coqui": False, "espeak": False, "gtts": False, "pyttsx3": False, "edge_tts": False } # Check piper try: import piper models_dir = os.path.join(os.path.dirname(__file__), "models") if os.path.exists(models_dir): for file in os.listdir(models_dir): if file.endswith('.onnx'): engines["piper"] = True break except: pass # Check coqui try: import TTS engines["coqui"] = True except: pass # Check espeak try: result = subprocess.run(["espeak", "--version"], capture_output=True, timeout=2) engines["espeak"] = result.returncode == 0 except: pass # Check gTTS try: from gtts import gTTS engines["gtts"] = True except: pass # Check pyttsx3 try: import pyttsx3 engines["pyttsx3"] = True except: pass # Check edge_tts try: import edge_tts engines["edge_tts"] = True except: pass return engines def run_async_blocking(coro): """Run async coroutine from sync context""" try: loop = asyncio.get_event_loop() if loop.is_running(): # Run in thread with new event loop with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(asyncio.run, coro) return future.result() else: return loop.run_until_complete(coro) except RuntimeError: return asyncio.run(coro) def generate_audio_piper(text: str, speed: float = 1.0): """Generate audio using Piper TTS""" try: import piper import soundfile as sf models_dir = os.path.join(os.path.dirname(__file__), "models") model_path = None if os.path.exists(models_dir): for file in os.listdir(models_dir): if file.endswith('.onnx'): model_path = os.path.join(models_dir, file) break if not model_path or not os.path.exists(model_path): raise FileNotFoundError("Piper model not found") piper_voice = piper.PiperVoice.load(model_path) # synthesize() returns an iterable of AudioChunk objects audio_chunks = piper_voice.synthesize(text) # Collect all audio chunks and concatenate them audio_arrays = [] sample_rate = piper_voice.config.sample_rate for chunk in audio_chunks: # Each chunk has an audio_float_array property audio_arrays.append(chunk.audio_float_array) # Use sample_rate from first chunk if available if hasattr(chunk, 'sample_rate') and chunk.sample_rate: sample_rate = chunk.sample_rate # Concatenate all chunks into a single array if audio_arrays: audio_data_np = np.concatenate(audio_arrays) else: raise Exception("No audio chunks generated") # Ensure it's a numpy array and float32 if not isinstance(audio_data_np, np.ndarray): audio_data_np = np.array(audio_data_np, dtype=np.float32) # Ensure audio is 1D (mono) if len(audio_data_np.shape) > 1: audio_data_np = audio_data_np.flatten() # Convert to float32 if needed if audio_data_np.dtype != np.float32: audio_data_np = audio_data_np.astype(np.float32) return (sample_rate, audio_data_np) except Exception as e: raise Exception(f"Piper TTS failed: {str(e)}") @spaces.GPU def generate_audio_coqui(text: str, speed: float = 1.0): """Generate audio using Coqui TTS (GPU accelerated)""" try: from TTS.api import TTS import soundfile as sf models = [ "tts_models/en/ljspeech/tacotron2-DDC", "tts_models/en/ljspeech/glow-tts", "tts_models/en/vctk/vits", ] tts = None for model in models: try: tts = TTS(model_name=model, progress_bar=False) break except: continue if tts is None: raise Exception("No Coqui TTS model available") wav = tts.tts(text=text) sample_rate = 22050 if hasattr(tts, 'synthesizer') and hasattr(tts.synthesizer, 'output_sample_rate'): sample_rate = tts.synthesizer.output_sample_rate # Convert to numpy array if it's a tensor or list if hasattr(wav, 'cpu'): # PyTorch tensor wav = wav.cpu().numpy() elif hasattr(wav, 'numpy'): # TensorFlow tensor wav = wav.numpy() elif not isinstance(wav, np.ndarray): wav = np.array(wav, dtype=np.float32) # Ensure audio is 1D (mono) and float32 if len(wav.shape) > 1: wav = wav.flatten() # Convert to float32 if needed if wav.dtype != np.float32: wav = wav.astype(np.float32) return (sample_rate, wav) except Exception as e: raise Exception(f"Coqui TTS failed: {str(e)}") def generate_audio_espeak(text: str, speed: float = 1.0): """Generate audio using espeak""" with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as audio_file: audio_file_path = audio_file.name try: cmd = ["espeak", "-s", str(int(150 * speed)), "-w", audio_file_path, text] subprocess.run(cmd, check=True, capture_output=True) import soundfile as sf audio_data, sample_rate = sf.read(audio_file_path) # Ensure it's a numpy array and float32 if not isinstance(audio_data, np.ndarray): audio_data = np.array(audio_data, dtype=np.float32) # Ensure audio is 1D (mono) if len(audio_data.shape) > 1: audio_data = audio_data.flatten() # Convert to float32 if needed if audio_data.dtype != np.float32: audio_data = audio_data.astype(np.float32) return (sample_rate, audio_data) except Exception as e: raise Exception(f"eSpeak TTS failed: {str(e)}") finally: try: os.unlink(audio_file_path) except: pass def generate_audio_gtts(text: str, speed: float = 1.0): """Generate audio using Google TTS""" try: from gtts import gTTS import io from pydub import AudioSegment tts = gTTS(text=text, lang='en', slow=False) audio_buffer = io.BytesIO() tts.write_to_fp(audio_buffer) audio_buffer.seek(0) # Convert MP3 to WAV audio = AudioSegment.from_mp3(audio_buffer) wav_buffer = io.BytesIO() audio.export(wav_buffer, format="wav") wav_buffer.seek(0) import soundfile as sf audio_data, sample_rate = sf.read(wav_buffer) # Ensure it's a numpy array and float32 if not isinstance(audio_data, np.ndarray): audio_data = np.array(audio_data, dtype=np.float32) # Ensure audio is 1D (mono) if len(audio_data.shape) > 1: audio_data = audio_data.flatten() # Convert to float32 if needed if audio_data.dtype != np.float32: audio_data = audio_data.astype(np.float32) return (sample_rate, audio_data) except Exception as e: raise Exception(f"gTTS failed: {str(e)}") def generate_audio_pyttsx3(text: str, speed: float = 1.0): """Generate audio using pyttsx3""" try: import pyttsx3 engine = pyttsx3.init() engine.setProperty('rate', int(150 * speed)) with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as audio_file: audio_file_path = audio_file.name engine.save_to_file(text, audio_file_path) engine.runAndWait() import soundfile as sf audio_data, sample_rate = sf.read(audio_file_path) # Ensure it's a numpy array and float32 if not isinstance(audio_data, np.ndarray): audio_data = np.array(audio_data, dtype=np.float32) # Ensure audio is 1D (mono) if len(audio_data.shape) > 1: audio_data = audio_data.flatten() # Convert to float32 if needed if audio_data.dtype != np.float32: audio_data = audio_data.astype(np.float32) os.unlink(audio_file_path) return (sample_rate, audio_data) except Exception as e: raise Exception(f"pyttsx3 failed: {str(e)}") def generate_audio_edge_tts(text: str, speed: float = 1.0): """Generate audio using Edge TTS""" try: import edge_tts async def generate(): voices = await edge_tts.list_voices() voice_obj = next((v for v in voices if v['Locale'].startswith('en')), None) if voice_obj: voice = voice_obj['ShortName'] else: voice = "en-US-AriaNeural" communicate = edge_tts.Communicate(text, voice, rate=f"+{int((speed - 1) * 100)}%") audio_data = b"" async for chunk in communicate.stream(): if chunk["type"] == "audio": audio_data += chunk["data"] return audio_data audio_data = run_async_blocking(generate()) # Convert MP3 bytes to numpy array import io from pydub import AudioSegment audio = AudioSegment.from_mp3(io.BytesIO(audio_data)) wav_buffer = io.BytesIO() audio.export(wav_buffer, format="wav") wav_buffer.seek(0) import soundfile as sf audio_array, sample_rate = sf.read(wav_buffer) # Ensure it's a numpy array and float32 if not isinstance(audio_array, np.ndarray): audio_array = np.array(audio_array, dtype=np.float32) # Ensure audio is 1D (mono) if len(audio_array.shape) > 1: audio_array = audio_array.flatten() # Convert to float32 if needed if audio_array.dtype != np.float32: audio_array = audio_array.astype(np.float32) return (sample_rate, audio_array) except Exception as e: raise Exception(f"Edge TTS failed: {str(e)}") def generate_speech(text: str, engine: str, speed: float = 1.0): """Main function to generate speech from text""" if not text or not text.strip(): return None, "Please enter some text" engines_status = check_engine_availability() if not engines_status.get(engine, False): available = [e for e, v in engines_status.items() if v] if not available: return None, "No TTS engines available" engine = available[0] # Fallback to first available try: if engine == "piper": sample_rate, audio_data = generate_audio_piper(text, speed) elif engine == "coqui": sample_rate, audio_data = generate_audio_coqui(text, speed) elif engine == "gtts": sample_rate, audio_data = generate_audio_gtts(text, speed) elif engine == "pyttsx3": sample_rate, audio_data = generate_audio_pyttsx3(text, speed) elif engine == "edge_tts": sample_rate, audio_data = generate_audio_edge_tts(text, speed) else: # espeak sample_rate, audio_data = generate_audio_espeak(text, speed) # Ensure audio_data is a numpy array (not a list) if not isinstance(audio_data, np.ndarray): audio_data = np.array(audio_data, dtype=np.float32) # Ensure audio is 1D (mono) if len(audio_data.shape) > 1: audio_data = audio_data.flatten() # Normalize audio to [-1, 1] range if needed max_val = np.max(np.abs(audio_data)) if max_val > 1.0: audio_data = audio_data / max_val # Ensure it's still a numpy array after normalization if not isinstance(audio_data, np.ndarray): audio_data = np.array(audio_data, dtype=np.float32) # Save to temporary file for Gradio Audio component import tempfile import soundfile as sf with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp: tmp_path = tmp.name sf.write(tmp_path, audio_data, int(sample_rate)) # Return file path for Gradio Audio component return tmp_path, None except Exception as e: return None, f"Error: {str(e)}" # Create Gradio interface engines_status = check_engine_availability() available_engines = [e for e, v in engines_status.items() if v] if not available_engines: available_engines = ["espeak"] # Fallback # Create Gradio interface with gr.Blocks(title="sub200 - Ultra Low Latency TTS", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🎙️ sub200 - Ultra Low Latency Text-to-Speech Host different open source TTS engines with ultra low latency. Supports GPU acceleration for high-quality neural TTS. """) with gr.Row(): with gr.Column(scale=2): text_input = gr.Textbox( label="Enter text to convert", placeholder="Type or paste your text here...", lines=5, value="" ) with gr.Column(scale=1): engine_select = gr.Dropdown( label="TTS Engine", choices=available_engines, value=available_engines[0] if available_engines else "espeak", info="Select the TTS engine to use" ) speed_slider = gr.Slider( label="Speed", minimum=0.5, maximum=2.0, value=1.0, step=0.1, info="Speech speed multiplier" ) generate_btn = gr.Button("Generate Speech", variant="primary", size="lg") audio_output = gr.Audio(label="Generated Audio", type="filepath", autoplay=True) error_output = gr.Textbox(label="Status", visible=True) # Engine status with gr.Accordion("Engine Status", open=False): status_text = "\n".join([ f"**{engine}**: {'✓ Available' if engines_status.get(engine, False) else '✗ Not Available'}" for engine in ["piper", "coqui", "espeak", "gtts", "pyttsx3", "edge_tts"] ]) gr.Markdown(status_text) # Connect the function generate_btn.click( fn=generate_speech, inputs=[text_input, engine_select, speed_slider], outputs=[audio_output, error_output] ) # Auto-generate on text change (optional) # text_input.submit( # fn=generate_speech, # inputs=[text_input, engine_select, speed_slider], # outputs=[audio_output, error_output] # ) # Try to download Piper models if not present try: import download_models download_models.download_piper_model() except: pass if __name__ == "__main__": # Get port from environment (Hugging Face Spaces uses 7860, local uses 8000) port = int(os.getenv("PORT", 8000)) demo.launch(server_name="0.0.0.0", server_port=port, share=False)