|
|
""" |
|
|
sub200 - Ultra Low Latency TTS Hosting Server |
|
|
Supports multiple open-source TTS engines |
|
|
Optimized for Hugging Face Spaces with Gradio and zero GPU (H200 dynamic allocation) |
|
|
""" |
|
|
|
|
|
import os |
|
|
import subprocess |
|
|
import tempfile |
|
|
from typing import Optional |
|
|
import concurrent.futures |
|
|
import asyncio |
|
|
import gradio as gr |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
try:
    import spaces
except ImportError:
    # Fallback stub for environments without Hugging Face's `spaces`
    # package: makes @spaces.GPU a no-op so the file still imports.
    class spaces:
        @staticmethod
        def GPU(func=None, **kwargs):
            """No-op replacement for spaces.GPU.

            Supports both the bare form (@spaces.GPU) used in this file and
            the called form (@spaces.GPU(duration=...)) the real package
            also accepts, so either usage degrades gracefully.
            """
            if func is None:
                # Called form: return a decorator that returns the function.
                return lambda f: f
            return func
|
|
|
|
|
|
|
|
def check_engine_availability():
    """Probe which TTS engines are usable in this environment.

    Returns:
        dict[str, bool]: availability flag for each supported engine key:
        "piper", "coqui", "espeak", "gtts", "pyttsx3", "edge_tts".
    """
    import importlib.util

    def _importable(module_name: str) -> bool:
        # find_spec avoids actually importing (and paying the startup cost
        # of) heavyweight packages just to test for their presence.
        try:
            return importlib.util.find_spec(module_name) is not None
        except Exception:
            return False

    engines = {
        "piper": False,
        "coqui": False,
        "espeak": False,
        "gtts": False,
        "pyttsx3": False,
        "edge_tts": False,
    }

    # Piper needs both the package and at least one .onnx voice model on disk.
    if _importable("piper"):
        models_dir = os.path.join(os.path.dirname(__file__), "models")
        if os.path.exists(models_dir):
            engines["piper"] = any(
                name.endswith(".onnx") for name in os.listdir(models_dir)
            )

    engines["coqui"] = _importable("TTS")
    engines["gtts"] = _importable("gtts")
    engines["pyttsx3"] = _importable("pyttsx3")
    engines["edge_tts"] = _importable("edge_tts")

    # espeak is an external binary, so probe it via subprocess instead.
    try:
        result = subprocess.run(["espeak", "--version"],
                                capture_output=True,
                                timeout=2)
        engines["espeak"] = result.returncode == 0
    except (OSError, subprocess.SubprocessError):
        # Missing binary or timeout: report it as unavailable.
        pass

    return engines
|
|
|
|
|
def run_async_blocking(coro):
    """Run *coro* to completion from synchronous code and return its result.

    Works both from a plain sync context and from a thread that already has
    a running event loop (e.g. inside a web framework callback): in the
    latter case the coroutine is executed on a fresh loop in a worker
    thread, because a running loop cannot be re-entered.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: asyncio.run creates and tears down
        # a fresh one — the modern replacement for the deprecated
        # get_event_loop()/run_until_complete dance.
        return asyncio.run(coro)

    # A loop is already running here; run the coroutine on its own loop in a
    # separate thread and block until it finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, coro).result()
|
|
|
|
|
def generate_audio_piper(text: str, speed: float = 1.0):
    """Generate audio with Piper TTS.

    Args:
        text: Text to synthesize.
        speed: Speech-rate multiplier; currently unused by this engine.
            NOTE(review): wiring it to Piper's length-scale setting needs
            API confirmation before doing so.

    Returns:
        (sample_rate, audio) where audio is a flat float32 ndarray.

    Raises:
        Exception: wrapping any failure (missing package, missing model,
            empty synthesis result).
    """
    try:
        import piper

        # Use the first .onnx voice model found next to this file.
        models_dir = os.path.join(os.path.dirname(__file__), "models")
        model_path = None
        if os.path.exists(models_dir):
            for file in os.listdir(models_dir):
                if file.endswith('.onnx'):
                    model_path = os.path.join(models_dir, file)
                    break

        if not model_path or not os.path.exists(model_path):
            raise FileNotFoundError("Piper model not found")

        piper_voice = piper.PiperVoice.load(model_path)

        # synthesize() yields chunk objects carrying float audio data.
        audio_chunks = piper_voice.synthesize(text)

        audio_arrays = []
        sample_rate = piper_voice.config.sample_rate
        for chunk in audio_chunks:
            audio_arrays.append(chunk.audio_float_array)
            # Prefer a chunk's own sample rate when it reports one.
            if hasattr(chunk, 'sample_rate') and chunk.sample_rate:
                sample_rate = chunk.sample_rate

        if not audio_arrays:
            raise Exception("No audio chunks generated")
        audio_data_np = np.concatenate(audio_arrays)

        # Coerce to a flat float32 ndarray for downstream consumers.
        if not isinstance(audio_data_np, np.ndarray):
            audio_data_np = np.array(audio_data_np, dtype=np.float32)
        if len(audio_data_np.shape) > 1:
            audio_data_np = audio_data_np.flatten()
        if audio_data_np.dtype != np.float32:
            audio_data_np = audio_data_np.astype(np.float32)

        return (sample_rate, audio_data_np)

    except Exception as e:
        raise Exception(f"Piper TTS failed: {str(e)}")
|
|
|
|
|
@spaces.GPU
def generate_audio_coqui(text: str, speed: float = 1.0):
    """Generate audio using Coqui TTS (GPU accelerated when available).

    Tries a list of English models in order of preference and uses the
    first one that loads successfully.

    Args:
        text: Text to synthesize.
        speed: Speech-rate multiplier; not applied by this engine.
            NOTE(review): Coqui speed control is model-dependent — confirm
            before wiring it up.

    Returns:
        (sample_rate, audio) where audio is a flat float32 ndarray.

    Raises:
        Exception: wrapping any model-loading or synthesis failure.
    """
    try:
        from TTS.api import TTS

        # Candidate models, in order of preference.
        models = [
            "tts_models/en/ljspeech/tacotron2-DDC",
            "tts_models/en/ljspeech/glow-tts",
            "tts_models/en/vctk/vits",
        ]

        tts = None
        for model in models:
            try:
                tts = TTS(model_name=model, progress_bar=False)
                break
            except Exception:
                # Model unavailable (download/compat failure): try the next.
                continue

        if tts is None:
            raise Exception("No Coqui TTS model available")

        wav = tts.tts(text=text)

        # 22050 Hz is a common default; prefer the synthesizer's reported
        # output rate when it is exposed.
        sample_rate = 22050
        if hasattr(tts, 'synthesizer') and hasattr(tts.synthesizer, 'output_sample_rate'):
            sample_rate = tts.synthesizer.output_sample_rate

        # Output may be a torch tensor, a list, or an ndarray — coerce.
        if hasattr(wav, 'cpu'):
            wav = wav.cpu().numpy()
        elif hasattr(wav, 'numpy'):
            wav = wav.numpy()
        elif not isinstance(wav, np.ndarray):
            wav = np.array(wav, dtype=np.float32)

        if len(wav.shape) > 1:
            wav = wav.flatten()
        if wav.dtype != np.float32:
            wav = wav.astype(np.float32)

        return (sample_rate, wav)

    except Exception as e:
        raise Exception(f"Coqui TTS failed: {str(e)}")
|
|
|
|
|
def generate_audio_espeak(text: str, speed: float = 1.0):
    """Generate audio using the espeak command-line synthesizer.

    Args:
        text: Text to synthesize.
        speed: Multiplier applied to espeak's base rate of 150 wpm.

    Returns:
        (sample_rate, audio) where audio is a flat float32 ndarray.

    Raises:
        Exception: wrapping any subprocess or file-read failure.
    """
    # espeak can only write to a file, so synthesize into a temp WAV.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as audio_file:
        audio_file_path = audio_file.name

    try:
        # -s sets words-per-minute; 150 is treated as the 1.0x base rate.
        cmd = ["espeak", "-s", str(int(150 * speed)), "-w", audio_file_path, text]
        subprocess.run(cmd, check=True, capture_output=True)

        import soundfile as sf
        audio_data, sample_rate = sf.read(audio_file_path)

        # Coerce to a flat float32 ndarray for downstream consumers.
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if len(audio_data.shape) > 1:
            audio_data = audio_data.flatten()
        if audio_data.dtype != np.float32:
            audio_data = audio_data.astype(np.float32)

        return (sample_rate, audio_data)
    except Exception as e:
        raise Exception(f"eSpeak TTS failed: {str(e)}")
    finally:
        # Best-effort cleanup: swallow only filesystem errors, not everything.
        try:
            os.unlink(audio_file_path)
        except OSError:
            pass
|
|
|
|
|
def generate_audio_gtts(text: str, speed: float = 1.0):
    """Synthesize *text* with Google TTS.

    The MP3 produced by gTTS is transcoded to WAV via pydub, then decoded
    into a flat float32 numpy array.

    Returns:
        (sample_rate, samples) tuple.

    Raises:
        Exception: wrapping any network or decoding failure.
    """
    try:
        import io

        from gtts import gTTS
        from pydub import AudioSegment

        # Fetch the spoken MP3 into an in-memory stream.
        mp3_stream = io.BytesIO()
        gTTS(text=text, lang='en', slow=False).write_to_fp(mp3_stream)
        mp3_stream.seek(0)

        # Transcode MP3 -> WAV so soundfile can decode it.
        wav_stream = io.BytesIO()
        AudioSegment.from_mp3(mp3_stream).export(wav_stream, format="wav")
        wav_stream.seek(0)

        import soundfile as sf
        samples, rate = sf.read(wav_stream)

        # Coerce to a flat float32 ndarray.
        if not isinstance(samples, np.ndarray):
            samples = np.array(samples, dtype=np.float32)
        if samples.ndim > 1:
            samples = samples.flatten()
        if samples.dtype != np.float32:
            samples = samples.astype(np.float32)

        return (rate, samples)
    except Exception as e:
        raise Exception(f"gTTS failed: {str(e)}")
|
|
|
|
|
def generate_audio_pyttsx3(text: str, speed: float = 1.0):
    """Generate audio using pyttsx3 (offline, system voices).

    Args:
        text: Text to synthesize.
        speed: Multiplier on pyttsx3's base rate of 150 wpm.

    Returns:
        (sample_rate, audio) where audio is a flat float32 ndarray.

    Raises:
        Exception: wrapping any engine or file failure.
    """
    audio_file_path = None
    try:
        import pyttsx3

        engine = pyttsx3.init()
        engine.setProperty('rate', int(150 * speed))

        # pyttsx3 only renders to a file, so use a temp WAV.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as audio_file:
            audio_file_path = audio_file.name

        engine.save_to_file(text, audio_file_path)
        engine.runAndWait()

        import soundfile as sf
        audio_data, sample_rate = sf.read(audio_file_path)

        # Coerce to a flat float32 ndarray for downstream consumers.
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if len(audio_data.shape) > 1:
            audio_data = audio_data.flatten()
        if audio_data.dtype != np.float32:
            audio_data = audio_data.astype(np.float32)

        return (sample_rate, audio_data)
    except Exception as e:
        raise Exception(f"pyttsx3 failed: {str(e)}")
    finally:
        # Remove the temp file even when synthesis/decoding fails, so
        # failures no longer leak a WAV file per call.
        if audio_file_path:
            try:
                os.unlink(audio_file_path)
            except OSError:
                pass
|
|
|
|
|
def generate_audio_edge_tts(text: str, speed: float = 1.0):
    """Generate audio using Microsoft Edge TTS (online neural voices).

    Args:
        text: Text to synthesize.
        speed: Speech-rate multiplier, mapped to Edge's signed percentage
            rate string (1.0 -> "+0%", 1.5 -> "+50%", 0.5 -> "-50%").

    Returns:
        (sample_rate, audio) where audio is a flat float32 ndarray.

    Raises:
        Exception: wrapping any network/decoding failure.
    """
    try:
        import edge_tts

        # Edge expects a signed percentage such as "+50%" or "-50%".
        # A plain f"+{...}%" would emit the invalid "+-50%" for speeds
        # below 1.0, so format with an explicit sign instead.
        rate_str = f"{int((speed - 1) * 100):+d}%"

        async def generate():
            # Prefer any English voice; fall back to a known default.
            voices = await edge_tts.list_voices()
            voice_obj = next((v for v in voices if v['Locale'].startswith('en')), None)
            if voice_obj:
                voice = voice_obj['ShortName']
            else:
                voice = "en-US-AriaNeural"

            communicate = edge_tts.Communicate(text, voice, rate=rate_str)
            audio_data = b""
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    audio_data += chunk["data"]
            return audio_data

        audio_data = run_async_blocking(generate())

        import io
        from pydub import AudioSegment

        # Edge streams MP3; transcode to WAV so soundfile can decode it.
        audio = AudioSegment.from_mp3(io.BytesIO(audio_data))
        wav_buffer = io.BytesIO()
        audio.export(wav_buffer, format="wav")
        wav_buffer.seek(0)

        import soundfile as sf
        audio_array, sample_rate = sf.read(wav_buffer)

        # Coerce to a flat float32 ndarray for downstream consumers.
        if not isinstance(audio_array, np.ndarray):
            audio_array = np.array(audio_array, dtype=np.float32)
        if len(audio_array.shape) > 1:
            audio_array = audio_array.flatten()
        if audio_array.dtype != np.float32:
            audio_array = audio_array.astype(np.float32)

        return (sample_rate, audio_array)

    except Exception as e:
        raise Exception(f"Edge TTS failed: {str(e)}")
|
|
|
|
|
def generate_speech(text: str, engine: str, speed: float = 1.0):
    """Synthesize *text* with the requested engine and return a WAV path.

    Falls back to the first available engine when the requested one is not
    installed.

    Args:
        text: Text to speak; blank input is rejected.
        engine: One of "piper", "coqui", "espeak", "gtts", "pyttsx3",
            "edge_tts". Unknown names fall through to espeak.
        speed: Speech-rate multiplier forwarded to the engine.

    Returns:
        (wav_path, None) on success, or (None, error_message) on failure —
        the (audio, status) shape the Gradio outputs expect.
    """
    if not text or not text.strip():
        return None, "Please enter some text"

    engines_status = check_engine_availability()

    # Fall back to any available engine when the requested one is missing.
    if not engines_status.get(engine, False):
        available = [e for e, v in engines_status.items() if v]
        if not available:
            return None, "No TTS engines available"
        engine = available[0]

    # Dispatch table instead of an if/elif chain; espeak is the default
    # for any unrecognized engine name, matching the previous behavior.
    generators = {
        "piper": generate_audio_piper,
        "coqui": generate_audio_coqui,
        "gtts": generate_audio_gtts,
        "pyttsx3": generate_audio_pyttsx3,
        "edge_tts": generate_audio_edge_tts,
    }

    try:
        generator = generators.get(engine, generate_audio_espeak)
        sample_rate, audio_data = generator(text, speed)

        # Coerce to a flat float32-compatible ndarray.
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if len(audio_data.shape) > 1:
            audio_data = audio_data.flatten()

        # Peak-normalize only when samples exceed [-1, 1]; the size guard
        # avoids np.max raising on an empty array.
        if audio_data.size:
            max_val = np.max(np.abs(audio_data))
            if max_val > 1.0:
                audio_data = audio_data / max_val

        import soundfile as sf

        # Write to a temp WAV file; gr.Audio(type="filepath") takes a path.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
            tmp_path = tmp.name
        sf.write(tmp_path, audio_data, int(sample_rate))

        return tmp_path, None

    except Exception as e:
        return None, f"Error: {str(e)}"
|
|
|
|
|
|
|
|
# Probe engines once at startup so the UI lists only usable ones.
engines_status = check_engine_availability()
available_engines = [e for e, v in engines_status.items() if v]

# Keep the dropdown non-empty even when nothing was detected; espeak is
# the engine most likely to exist on a bare Linux host.
if not available_engines:
    available_engines = ["espeak"]

# ---- Gradio UI: inputs, controls, output player, status panel -----------
with gr.Blocks(title="sub200 - Ultra Low Latency TTS", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🎙️ sub200 - Ultra Low Latency Text-to-Speech

    Host different open source TTS engines with ultra low latency. Supports GPU acceleration for high-quality neural TTS.
    """)

    with gr.Row():
        with gr.Column(scale=2):
            # Main text input.
            text_input = gr.Textbox(
                label="Enter text to convert",
                placeholder="Type or paste your text here...",
                lines=5,
                value=""
            )
        with gr.Column(scale=1):
            # Engine picker, defaulting to the first detected engine.
            engine_select = gr.Dropdown(
                label="TTS Engine",
                choices=available_engines,
                value=available_engines[0] if available_engines else "espeak",
                info="Select the TTS engine to use"
            )
            # Speech-rate multiplier forwarded to generate_speech().
            speed_slider = gr.Slider(
                label="Speed",
                minimum=0.5,
                maximum=2.0,
                value=1.0,
                step=0.1,
                info="Speech speed multiplier"
            )

    generate_btn = gr.Button("Generate Speech", variant="primary", size="lg")

    # Audio result (served as a file path) plus a status/error line.
    audio_output = gr.Audio(label="Generated Audio", type="filepath", autoplay=True)
    error_output = gr.Textbox(label="Status", visible=True)

    # Static snapshot of engine availability, rendered once at startup.
    with gr.Accordion("Engine Status", open=False):
        status_text = "\n".join([
            f"**{engine}**: {'✓ Available' if engines_status.get(engine, False) else '✗ Not Available'}"
            for engine in ["piper", "coqui", "espeak", "gtts", "pyttsx3", "edge_tts"]
        ])
        gr.Markdown(status_text)

    # Wire the button to the synthesis entry point; outputs map to the
    # audio player and the status textbox respectively.
    generate_btn.click(
        fn=generate_speech,
        inputs=[text_input, engine_select, speed_slider],
        outputs=[audio_output, error_output]
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Best-effort: pre-download a Piper voice model when the helper module is
# present; any failure is non-fatal because other engines can still serve.
try:
    import download_models

    download_models.download_piper_model()
except Exception:
    # ImportError (helper absent) or a download failure — continue without
    # it rather than blocking startup.  Narrowed from a bare `except:` so
    # KeyboardInterrupt/SystemExit still propagate.
    pass
|
|
|
|
|
if __name__ == "__main__":

    # Port is configurable via the PORT env var (defaults to 8000);
    # binding to 0.0.0.0 makes the server reachable from outside the
    # container on Spaces/Docker deployments.
    port = int(os.getenv("PORT", 8000))
    demo.launch(server_name="0.0.0.0", server_port=port, share=False)
|
|
|