# sub200 / app.py
# (Hugging Face file-viewer residue from the upload page: "Revrse's picture",
#  "Upload 3 files", commit 65621f7 verified — kept here as a comment because
#  the raw lines are not valid Python.)
"""
sub200 - Ultra Low Latency TTS Hosting Server
Supports multiple open-source TTS engines
Optimized for Hugging Face Spaces with Gradio and zero GPU (H200 dynamic allocation)
"""
import os
import subprocess
import tempfile
from typing import Optional
import concurrent.futures
import asyncio
import gradio as gr
import numpy as np
# Import spaces for GPU decorator (Hugging Face ZeroGPU). Outside Spaces the
# package is absent, so provide a no-op stand-in for local development.
try:
    import spaces
except ImportError:
    class spaces:
        """No-op fallback mimicking the ``spaces.GPU`` decorator locally."""

        @staticmethod
        def GPU(func=None, **kwargs):
            # Support both bare usage (@spaces.GPU) and factory usage
            # (@spaces.GPU(duration=60)): return the function unchanged, or a
            # pass-through decorator when called with only keyword arguments.
            if func is None:
                return lambda f: f
            return func
# Import TTS engines
def check_engine_availability():
    """Probe which TTS engines are usable in this environment.

    Returns:
        dict[str, bool]: availability flag for each supported engine key:
        ``piper``, ``coqui``, ``espeak``, ``gtts``, ``pyttsx3``, ``edge_tts``.
    """
    engines = {
        "piper": False,
        "coqui": False,
        "espeak": False,
        "gtts": False,
        "pyttsx3": False,
        "edge_tts": False,
    }
    # Piper needs both the package and at least one .onnx voice model on disk.
    # Heavy packages can fail during import for reasons other than absence,
    # so catch Exception (not a bare except, which would also swallow
    # KeyboardInterrupt/SystemExit).
    try:
        import piper  # noqa: F401
        models_dir = os.path.join(os.path.dirname(__file__), "models")
        if os.path.isdir(models_dir):
            engines["piper"] = any(
                name.endswith(".onnx") for name in os.listdir(models_dir)
            )
    except Exception:
        pass
    # Coqui TTS: import check only.
    try:
        import TTS  # noqa: F401
        engines["coqui"] = True
    except Exception:
        pass
    # espeak is an external binary; probe it with a short-lived subprocess.
    try:
        result = subprocess.run(
            ["espeak", "--version"],
            capture_output=True,
            timeout=2,
        )
        engines["espeak"] = result.returncode == 0
    except (OSError, subprocess.SubprocessError):
        # Binary missing (FileNotFoundError) or probe timed out.
        pass
    # gTTS: import check only.
    try:
        from gtts import gTTS  # noqa: F401
        engines["gtts"] = True
    except Exception:
        pass
    # pyttsx3: import check only.
    try:
        import pyttsx3  # noqa: F401
        engines["pyttsx3"] = True
    except Exception:
        pass
    # edge-tts: import check only.
    try:
        import edge_tts  # noqa: F401
        engines["edge_tts"] = True
    except Exception:
        pass
    return engines
def run_async_blocking(coro):
    """Run *coro* to completion from synchronous code and return its result.

    Handles both situations a Gradio callback can encounter:
    - no event loop running in this thread: delegate to ``asyncio.run``;
    - a loop is already running (e.g. inside Gradio's async stack):
      ``asyncio.run`` would raise, so execute the coroutine in a fresh loop
      on a dedicated worker thread and block on its result.

    Note: the previous implementation used ``asyncio.get_event_loop()``,
    which is deprecated since Python 3.10 and raises in 3.12 when no loop
    exists outside the main thread.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread -> asyncio.run creates and owns one.
        return asyncio.run(coro)
    # A loop is already running here; hand the coroutine to a worker thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, coro).result()
def generate_audio_piper(text: str, speed: float = 1.0):
    """Generate audio using Piper TTS.

    Args:
        text: Text to synthesize.
        speed: Speech-rate multiplier; accepted for interface parity but not
            applied by this Piper code path.

    Returns:
        tuple[int, np.ndarray]: (sample_rate, mono float32 samples).

    Raises:
        Exception: wrapping any underlying failure, prefixed "Piper TTS failed".
    """
    try:
        import piper

        # Pick the first .onnx voice model found in models/ next to this file.
        models_dir = os.path.join(os.path.dirname(__file__), "models")
        model_path = None
        if os.path.exists(models_dir):
            for file in os.listdir(models_dir):
                if file.endswith('.onnx'):
                    model_path = os.path.join(models_dir, file)
                    break
        if not model_path or not os.path.exists(model_path):
            raise FileNotFoundError("Piper model not found")
        piper_voice = piper.PiperVoice.load(model_path)
        # synthesize() yields AudioChunk objects; collect them all.
        audio_chunks = piper_voice.synthesize(text)
        audio_arrays = []
        sample_rate = piper_voice.config.sample_rate
        for chunk in audio_chunks:
            audio_arrays.append(chunk.audio_float_array)
            # Prefer the chunk's own sample rate when it reports one.
            if hasattr(chunk, 'sample_rate') and chunk.sample_rate:
                sample_rate = chunk.sample_rate
        if not audio_arrays:
            raise Exception("No audio chunks generated")
        audio_data_np = np.concatenate(audio_arrays)
        # Coerce to a 1-D float32 numpy array for Gradio/soundfile.
        if not isinstance(audio_data_np, np.ndarray):
            audio_data_np = np.array(audio_data_np, dtype=np.float32)
        if audio_data_np.ndim > 1:
            audio_data_np = audio_data_np.flatten()
        if audio_data_np.dtype != np.float32:
            audio_data_np = audio_data_np.astype(np.float32)
        return (sample_rate, audio_data_np)
    except Exception as e:
        raise Exception(f"Piper TTS failed: {str(e)}")
@spaces.GPU
def generate_audio_coqui(text: str, speed: float = 1.0):
    """Generate audio using Coqui TTS (runs on GPU when Spaces allocates one).

    Args:
        text: Text to synthesize.
        speed: Accepted for interface parity; not applied by this code path.

    Returns:
        tuple[int, np.ndarray]: (sample_rate, mono float32 samples).

    Raises:
        Exception: wrapping any underlying failure, prefixed "Coqui TTS failed".
    """
    try:
        from TTS.api import TTS

        # Try candidate English models in order until one loads.
        models = [
            "tts_models/en/ljspeech/tacotron2-DDC",
            "tts_models/en/ljspeech/glow-tts",
            "tts_models/en/vctk/vits",
        ]
        tts = None
        for model in models:
            try:
                tts = TTS(model_name=model, progress_bar=False)
                break
            except Exception:
                # Model download/load failed; try the next candidate.
                continue
        if tts is None:
            raise Exception("No Coqui TTS model available")
        wav = tts.tts(text=text)
        # Fallback sample rate if the synthesizer doesn't expose one.
        sample_rate = 22050
        if hasattr(tts, 'synthesizer') and hasattr(tts.synthesizer, 'output_sample_rate'):
            sample_rate = tts.synthesizer.output_sample_rate
        # Coerce tensors/lists to a numpy array.
        if hasattr(wav, 'cpu'):  # PyTorch tensor
            wav = wav.cpu().numpy()
        elif hasattr(wav, 'numpy'):  # TensorFlow tensor
            wav = wav.numpy()
        elif not isinstance(wav, np.ndarray):
            wav = np.array(wav, dtype=np.float32)
        # Ensure 1-D float32.
        if wav.ndim > 1:
            wav = wav.flatten()
        if wav.dtype != np.float32:
            wav = wav.astype(np.float32)
        return (sample_rate, wav)
    except Exception as e:
        raise Exception(f"Coqui TTS failed: {str(e)}")
def generate_audio_espeak(text: str, speed: float = 1.0):
    """Generate audio using the espeak command-line synthesizer.

    Args:
        text: Text to synthesize.
        speed: Speech-rate multiplier, mapped onto espeak's words-per-minute
            flag as int(150 * speed) (150 wpm at speed 1.0).

    Returns:
        tuple[int, np.ndarray]: (sample_rate, mono float32 samples).

    Raises:
        Exception: wrapping any underlying failure, prefixed "eSpeak TTS failed".
    """
    # Create a named temp wav; espeak writes to it, soundfile reads it back.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as audio_file:
        audio_file_path = audio_file.name
    try:
        cmd = ["espeak", "-s", str(int(150 * speed)), "-w", audio_file_path, text]
        subprocess.run(cmd, check=True, capture_output=True)
        import soundfile as sf
        audio_data, sample_rate = sf.read(audio_file_path)
        # Coerce to a 1-D float32 numpy array.
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if audio_data.ndim > 1:
            audio_data = audio_data.flatten()
        if audio_data.dtype != np.float32:
            audio_data = audio_data.astype(np.float32)
        return (sample_rate, audio_data)
    except Exception as e:
        raise Exception(f"eSpeak TTS failed: {str(e)}")
    finally:
        # Best-effort temp-file cleanup; only swallow filesystem errors (the
        # original bare except also hid KeyboardInterrupt/SystemExit).
        try:
            os.unlink(audio_file_path)
        except OSError:
            pass
def generate_audio_gtts(text: str, speed: float = 1.0):
    """Generate audio using Google TTS (gTTS).

    gTTS produces MP3, which is transcoded to WAV in memory via pydub and
    then decoded into a mono float32 numpy array. The ``speed`` argument is
    accepted for interface parity but not applied (gTTS only offers a
    slow/normal toggle, and ``slow=False`` is always used here).

    Returns:
        tuple[int, np.ndarray]: (sample_rate, mono float32 samples).

    Raises:
        Exception: wrapping any underlying failure, prefixed "gTTS failed".
    """
    try:
        from gtts import gTTS
        import io
        from pydub import AudioSegment

        synthesizer = gTTS(text=text, lang='en', slow=False)
        mp3_stream = io.BytesIO()
        synthesizer.write_to_fp(mp3_stream)
        mp3_stream.seek(0)

        # Transcode MP3 -> WAV entirely in memory.
        segment = AudioSegment.from_mp3(mp3_stream)
        wav_stream = io.BytesIO()
        segment.export(wav_stream, format="wav")
        wav_stream.seek(0)

        import soundfile as sf
        samples, sample_rate = sf.read(wav_stream)

        # Coerce to a 1-D float32 numpy array.
        if not isinstance(samples, np.ndarray):
            samples = np.array(samples, dtype=np.float32)
        if samples.ndim > 1:
            samples = samples.flatten()
        if samples.dtype != np.float32:
            samples = samples.astype(np.float32)
        return (sample_rate, samples)
    except Exception as e:
        raise Exception(f"gTTS failed: {str(e)}")
def generate_audio_pyttsx3(text: str, speed: float = 1.0):
    """Generate audio using pyttsx3 (offline, OS-native voices).

    Args:
        text: Text to synthesize.
        speed: Speech-rate multiplier, mapped to pyttsx3's words-per-minute
            'rate' property as int(150 * speed).

    Returns:
        tuple[int, np.ndarray]: (sample_rate, mono float32 samples).

    Raises:
        Exception: wrapping any underlying failure, prefixed "pyttsx3 failed".
    """
    audio_file_path = None
    try:
        import pyttsx3

        engine = pyttsx3.init()
        engine.setProperty('rate', int(150 * speed))
        # pyttsx3 can only render to a file, so use a named temp wav.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as audio_file:
            audio_file_path = audio_file.name
        engine.save_to_file(text, audio_file_path)
        engine.runAndWait()
        import soundfile as sf
        audio_data, sample_rate = sf.read(audio_file_path)
        # Coerce to a 1-D float32 numpy array.
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if audio_data.ndim > 1:
            audio_data = audio_data.flatten()
        if audio_data.dtype != np.float32:
            audio_data = audio_data.astype(np.float32)
        return (sample_rate, audio_data)
    except Exception as e:
        raise Exception(f"pyttsx3 failed: {str(e)}")
    finally:
        # The original deleted the temp wav only on success, leaking the file
        # whenever synthesis or decoding failed; clean up unconditionally.
        if audio_file_path:
            try:
                os.unlink(audio_file_path)
            except OSError:
                pass
def generate_audio_edge_tts(text: str, speed: float = 1.0):
    """Generate audio using Microsoft Edge TTS (online neural voices).

    Args:
        text: Text to synthesize.
        speed: Speech-rate multiplier, converted to edge-tts's signed
            percentage rate string (e.g. 1.5 -> "+50%", 0.5 -> "-50%").

    Returns:
        tuple[int, np.ndarray]: (sample_rate, mono float32 samples).

    Raises:
        Exception: wrapping any underlying failure, prefixed "Edge TTS failed".
    """
    try:
        import edge_tts

        # edge-tts expects a signed percentage like "+20%" or "-50%". The old
        # f"+{...}%" formatting produced invalid strings such as "+-50%"
        # whenever speed < 1.0; "%+d" emits the sign correctly for both.
        rate_pct = int((speed - 1) * 100)
        rate = f"{rate_pct:+d}%"

        async def synthesize():
            # Prefer the first English voice; fall back to a known default.
            voices = await edge_tts.list_voices()
            match = next((v for v in voices if v['Locale'].startswith('en')), None)
            voice = match['ShortName'] if match else "en-US-AriaNeural"
            communicate = edge_tts.Communicate(text, voice, rate=rate)
            mp3_bytes = b""
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    mp3_bytes += chunk["data"]
            return mp3_bytes

        audio_bytes = run_async_blocking(synthesize())

        # Transcode the MP3 stream to WAV in memory and decode to numpy.
        import io
        from pydub import AudioSegment
        audio = AudioSegment.from_mp3(io.BytesIO(audio_bytes))
        wav_buffer = io.BytesIO()
        audio.export(wav_buffer, format="wav")
        wav_buffer.seek(0)
        import soundfile as sf
        audio_array, sample_rate = sf.read(wav_buffer)
        # Coerce to a 1-D float32 numpy array.
        if not isinstance(audio_array, np.ndarray):
            audio_array = np.array(audio_array, dtype=np.float32)
        if audio_array.ndim > 1:
            audio_array = audio_array.flatten()
        if audio_array.dtype != np.float32:
            audio_array = audio_array.astype(np.float32)
        return (sample_rate, audio_array)
    except Exception as e:
        raise Exception(f"Edge TTS failed: {str(e)}")
def generate_speech(text: str, engine: str, speed: float = 1.0):
    """Generate speech for *text* with the requested engine.

    Falls back to the first available engine when the requested one is not
    installed. Returns a (wav_file_path, error_message) pair where exactly
    one element is None: the path feeds Gradio's Audio component and the
    message feeds the status textbox.
    """
    # Reject missing/blank input before touching any engine.
    if not text or not text.strip():
        return None, "Please enter some text"
    engines_status = check_engine_availability()
    if not engines_status.get(engine, False):
        available = [e for e, v in engines_status.items() if v]
        if not available:
            return None, "No TTS engines available"
        engine = available[0]  # Fall back to first available engine
    try:
        # Dispatch to the engine-specific generator; each returns
        # (sample_rate, 1-D float32 numpy array).
        if engine == "piper":
            sample_rate, audio_data = generate_audio_piper(text, speed)
        elif engine == "coqui":
            sample_rate, audio_data = generate_audio_coqui(text, speed)
        elif engine == "gtts":
            sample_rate, audio_data = generate_audio_gtts(text, speed)
        elif engine == "pyttsx3":
            sample_rate, audio_data = generate_audio_pyttsx3(text, speed)
        elif engine == "edge_tts":
            sample_rate, audio_data = generate_audio_edge_tts(text, speed)
        else:  # espeak
            sample_rate, audio_data = generate_audio_espeak(text, speed)
        # Defensive re-coercion in case an engine returned a list/2-D array.
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if audio_data.ndim > 1:
            audio_data = audio_data.flatten()
        # Peak-normalize only when samples exceed the [-1, 1] wav range.
        # (ndarray / scalar stays an ndarray, so no re-check is needed.)
        max_val = np.max(np.abs(audio_data))
        if max_val > 1.0:
            audio_data = audio_data / max_val
        # Write to a temp wav for Gradio's filepath-based Audio component.
        # tempfile is already imported at module level.
        import soundfile as sf
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
            tmp_path = tmp.name
        sf.write(tmp_path, audio_data, int(sample_rate))
        return tmp_path, None
    except Exception as e:
        return None, f"Error: {str(e)}"
# Probe engines once at startup so the UI only offers working options.
engines_status = check_engine_availability()
available_engines = [name for name, ok in engines_status.items() if ok]
if not available_engines:
    # Offer espeak as a last resort so the dropdown is never empty.
    available_engines = ["espeak"]
# Build the Gradio interface; `demo` is launched from the __main__ guard.
with gr.Blocks(title="sub200 - Ultra Low Latency TTS", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🎙️ sub200 - Ultra Low Latency Text-to-Speech
    Host different open source TTS engines with ultra low latency. Supports GPU acceleration for high-quality neural TTS.
    """)
    # Input area: text on the left (wider), engine/speed controls on the right.
    with gr.Row():
        with gr.Column(scale=2):
            input_box = gr.Textbox(
                label="Enter text to convert",
                placeholder="Type or paste your text here...",
                lines=5,
                value=""
            )
        with gr.Column(scale=1):
            engine_dropdown = gr.Dropdown(
                label="TTS Engine",
                choices=available_engines,
                value=available_engines[0] if available_engines else "espeak",
                info="Select the TTS engine to use"
            )
            rate_slider = gr.Slider(
                label="Speed",
                minimum=0.5,
                maximum=2.0,
                value=1.0,
                step=0.1,
                info="Speech speed multiplier"
            )
    synthesize_button = gr.Button("Generate Speech", variant="primary", size="lg")
    audio_player = gr.Audio(label="Generated Audio", type="filepath", autoplay=True)
    status_box = gr.Textbox(label="Status", visible=True)
    # Collapsible report of which engines were detected at startup.
    with gr.Accordion("Engine Status", open=False):
        report_lines = []
        for engine_name in ["piper", "coqui", "espeak", "gtts", "pyttsx3", "edge_tts"]:
            availability = '✓ Available' if engines_status.get(engine_name, False) else '✗ Not Available'
            report_lines.append(f"**{engine_name}**: {availability}")
        gr.Markdown("\n".join(report_lines))
    # Wire the button to the synthesis entry point.
    synthesize_button.click(
        fn=generate_speech,
        inputs=[input_box, engine_dropdown, rate_slider],
        outputs=[audio_player, status_box]
    )
# Best-effort: fetch a Piper voice model at startup if the helper module is
# bundled. A missing module or failed download must not stop the app, but the
# original bare `except:` also swallowed KeyboardInterrupt/SystemExit;
# `except Exception` keeps the best-effort semantics without masking exits.
try:
    import download_models
    download_models.download_piper_model()
except Exception:
    pass
if __name__ == "__main__":
    # Hugging Face Spaces injects PORT (7860); default to 8000 for local runs.
    listen_port = int(os.getenv("PORT", 8000))
    demo.launch(server_name="0.0.0.0", server_port=listen_port, share=False)