File size: 16,507 Bytes
7875858
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65621f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7875858
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65621f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7875858
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65621f7
 
 
 
 
 
 
 
 
 
 
 
7875858
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65621f7
 
 
 
 
 
 
 
 
 
 
 
7875858
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65621f7
 
 
 
 
 
 
 
 
 
 
 
7875858
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65621f7
 
 
 
 
 
 
 
 
 
 
 
7875858
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65621f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7875858
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65621f7
7875858
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65621f7
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
"""
sub200 - Ultra Low Latency TTS Hosting Server
Supports multiple open-source TTS engines
Optimized for Hugging Face Spaces with Gradio and zero GPU (H200 dynamic allocation)
"""

import os
import subprocess
import tempfile
from typing import Optional
import concurrent.futures
import asyncio
import gradio as gr
import numpy as np

# Import spaces for GPU decorator
try:
    import spaces
except ImportError:
    # Fallback if spaces not available (local development).
    # Supports both usage forms of the real API:
    #   @spaces.GPU              -> returns the function unchanged
    #   @spaces.GPU(duration=60) -> returns a pass-through decorator
    class spaces:
        @staticmethod
        def GPU(func=None, **kwargs):
            if func is None:
                # Called with arguments: act as a decorator factory.
                return lambda f: f
            return func

# Probe which TTS engines can actually be used on this machine
def check_engine_availability():
    """Probe each supported TTS backend and report availability.

    Returns a dict mapping engine name to bool:
      * piper    -- ``piper`` imports AND a ``.onnx`` voice exists in ./models
      * coqui    -- the ``TTS`` package imports
      * espeak   -- the ``espeak`` binary answers ``--version``
      * gtts     -- the ``gtts`` package imports
      * pyttsx3  -- the ``pyttsx3`` package imports
      * edge_tts -- the ``edge_tts`` package imports

    All probes are best-effort: any failure simply leaves the engine
    marked unavailable (only ``Exception`` is swallowed, so Ctrl-C and
    SystemExit still propagate -- the original bare ``except:`` did not).
    """
    engines = dict.fromkeys(
        ("piper", "coqui", "espeak", "gtts", "pyttsx3", "edge_tts"), False
    )

    # Piper needs both the Python package and a local ONNX voice model.
    try:
        import piper  # noqa: F401
        models_dir = os.path.join(os.path.dirname(__file__), "models")
        if os.path.isdir(models_dir):
            engines["piper"] = any(
                name.endswith(".onnx") for name in os.listdir(models_dir)
            )
    except Exception:
        pass

    # Pure import probes: the package being importable is good enough.
    for key, module in (("coqui", "TTS"), ("gtts", "gtts"),
                        ("pyttsx3", "pyttsx3"), ("edge_tts", "edge_tts")):
        try:
            __import__(module)
            engines[key] = True
        except Exception:
            pass

    # espeak is an external binary; ask it for its version.
    try:
        result = subprocess.run(["espeak", "--version"],
                                capture_output=True,
                                timeout=2)
        engines["espeak"] = result.returncode == 0
    except Exception:
        pass

    return engines

def run_async_blocking(coro):
    """Run *coro* to completion from synchronous code and return its result.

    Two situations are handled:
      * No event loop running in this thread: just ``asyncio.run``.
      * A loop is already running (e.g. inside Gradio/Jupyter): blocking
        here would deadlock, so execute the coroutine with a fresh loop
        on a worker thread and wait for its result.

    Uses ``asyncio.get_running_loop()`` instead of the original
    ``get_event_loop()``, which is deprecated when called from sync code.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread -- safe to create one.
        return asyncio.run(coro)
    # A loop is running here; delegate to a worker thread with its own loop.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, coro).result()

def generate_audio_piper(text: str, speed: float = 1.0):
    """Synthesize *text* with a local Piper ONNX voice.

    Returns ``(sample_rate, audio)`` where ``audio`` is a 1-D float32
    numpy array. ``speed`` is accepted for interface parity with the
    other backends but is currently ignored by Piper here.
    Raises ``Exception("Piper TTS failed: ...")`` on any error, chained
    to the original cause.
    """
    try:
        import piper

        # Use the first .onnx voice model found next to this file.
        models_dir = os.path.join(os.path.dirname(__file__), "models")
        model_path = None
        if os.path.isdir(models_dir):
            for name in os.listdir(models_dir):
                if name.endswith('.onnx'):
                    model_path = os.path.join(models_dir, name)
                    break

        if not model_path:
            raise FileNotFoundError("Piper model not found")

        piper_voice = piper.PiperVoice.load(model_path)

        # synthesize() yields AudioChunk objects; collect and concatenate.
        sample_rate = piper_voice.config.sample_rate
        pieces = []
        for chunk in piper_voice.synthesize(text):
            pieces.append(chunk.audio_float_array)
            # Prefer the chunk's own sample rate when it reports one.
            if getattr(chunk, 'sample_rate', None):
                sample_rate = chunk.sample_rate

        if not pieces:
            raise Exception("No audio chunks generated")

        audio = np.concatenate(pieces)

        # Normalize to a 1-D float32 numpy array.
        if not isinstance(audio, np.ndarray):
            audio = np.array(audio, dtype=np.float32)
        if audio.ndim > 1:
            audio = audio.flatten()
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)

        return (sample_rate, audio)

    except Exception as e:
        # Chain the cause so the original traceback is preserved
        # (the original swallowed it via str(e) only).
        raise Exception(f"Piper TTS failed: {str(e)}") from e

@spaces.GPU
def generate_audio_coqui(text: str, speed: float = 1.0):
    """Generate audio using Coqui TTS (GPU accelerated when available).

    Tries a fixed list of English models in preference order and uses the
    first one that loads. Returns ``(sample_rate, audio)`` where ``audio``
    is a 1-D float32 numpy array. ``speed`` is currently ignored by this
    backend. Raises ``Exception("Coqui TTS failed: ...")`` on any error.
    """
    try:
        from TTS.api import TTS

        # Candidate models, preferred first; fall through on load failure.
        models = [
            "tts_models/en/ljspeech/tacotron2-DDC",
            "tts_models/en/ljspeech/glow-tts",
            "tts_models/en/vctk/vits",
        ]

        tts = None
        for model in models:
            try:
                tts = TTS(model_name=model, progress_bar=False)
                break
            except Exception:
                # Model may be missing or fail to download; try the next one.
                continue

        if tts is None:
            raise Exception("No Coqui TTS model available")

        wav = tts.tts(text=text)

        # Default LJSpeech rate; prefer the synthesizer's reported rate.
        sample_rate = 22050
        if hasattr(tts, 'synthesizer') and hasattr(tts.synthesizer, 'output_sample_rate'):
            sample_rate = tts.synthesizer.output_sample_rate

        # Normalize to numpy regardless of the backend's return type.
        if hasattr(wav, 'cpu'):  # PyTorch tensor
            wav = wav.cpu().numpy()
        elif hasattr(wav, 'numpy'):  # TensorFlow tensor
            wav = wav.numpy()
        elif not isinstance(wav, np.ndarray):
            wav = np.array(wav, dtype=np.float32)

        # Ensure 1-D (mono) float32.
        if wav.ndim > 1:
            wav = wav.flatten()
        if wav.dtype != np.float32:
            wav = wav.astype(np.float32)

        return (sample_rate, wav)

    except Exception as e:
        raise Exception(f"Coqui TTS failed: {str(e)}") from e

def generate_audio_espeak(text: str, speed: float = 1.0):
    """Synthesize *text* with the espeak command-line tool.

    ``speed`` scales espeak's words-per-minute rate (150 wpm baseline).
    Returns ``(sample_rate, audio)`` with a 1-D float32 numpy array.
    Raises ``Exception("eSpeak TTS failed: ...")`` on any error; the
    temporary WAV file is always removed.
    """
    # Reserve a temp file path for espeak's -w output.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as audio_file:
        audio_file_path = audio_file.name

    try:
        # -s is words per minute; 150 is espeak's normal pace.
        cmd = ["espeak", "-s", str(int(150 * speed)), "-w", audio_file_path, text]
        subprocess.run(cmd, check=True, capture_output=True)

        import soundfile as sf
        audio_data, sample_rate = sf.read(audio_file_path)

        # Normalize to a 1-D float32 numpy array.
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if audio_data.ndim > 1:
            audio_data = audio_data.flatten()
        if audio_data.dtype != np.float32:
            audio_data = audio_data.astype(np.float32)

        return (sample_rate, audio_data)
    except Exception as e:
        raise Exception(f"eSpeak TTS failed: {str(e)}") from e
    finally:
        # Best-effort cleanup; only OS errors are ignored (the original
        # bare except also swallowed KeyboardInterrupt here).
        try:
            os.unlink(audio_file_path)
        except OSError:
            pass

def generate_audio_gtts(text: str, speed: float = 1.0):
    """Synthesize English speech for *text* via the Google TTS web API.

    Returns ``(sample_rate, audio)`` where ``audio`` is a 1-D float32
    numpy array. ``speed`` is accepted for interface parity; gTTS itself
    exposes no speed control (``slow=False`` is always used).
    Raises ``Exception("gTTS failed: ...")`` on any error.
    """
    try:
        from gtts import gTTS
        import io
        from pydub import AudioSegment
        import soundfile as sf

        # Fetch the synthesized speech as MP3 into an in-memory buffer.
        mp3_buffer = io.BytesIO()
        gTTS(text=text, lang='en', slow=False).write_to_fp(mp3_buffer)
        mp3_buffer.seek(0)

        # gTTS only produces MP3; transcode to WAV in memory for soundfile.
        wav_buffer = io.BytesIO()
        AudioSegment.from_mp3(mp3_buffer).export(wav_buffer, format="wav")
        wav_buffer.seek(0)

        audio_data, sample_rate = sf.read(wav_buffer)

        # Coerce to a 1-D float32 numpy array.
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if audio_data.ndim > 1:
            audio_data = audio_data.flatten()
        if audio_data.dtype != np.float32:
            audio_data = audio_data.astype(np.float32)

        return (sample_rate, audio_data)
    except Exception as e:
        raise Exception(f"gTTS failed: {str(e)}")

def generate_audio_pyttsx3(text: str, speed: float = 1.0):
    """Synthesize *text* with the offline pyttsx3 engine.

    ``speed`` scales the engine's words-per-minute rate (150 baseline).
    Returns ``(sample_rate, audio)`` with a 1-D float32 numpy array.
    Raises ``Exception("pyttsx3 failed: ...")`` on any error.

    Fix over the original: the temporary WAV file was only removed on the
    success path and leaked on every error; cleanup now runs in finally.
    """
    audio_file_path = None
    try:
        import pyttsx3

        engine = pyttsx3.init()
        # 'rate' is words per minute; scale the ~150 wpm baseline.
        engine.setProperty('rate', int(150 * speed))

        # Reserve a temp path for save_to_file's output.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as audio_file:
            audio_file_path = audio_file.name

        engine.save_to_file(text, audio_file_path)
        engine.runAndWait()

        import soundfile as sf
        audio_data, sample_rate = sf.read(audio_file_path)

        # Normalize to a 1-D float32 numpy array.
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if audio_data.ndim > 1:
            audio_data = audio_data.flatten()
        if audio_data.dtype != np.float32:
            audio_data = audio_data.astype(np.float32)

        return (sample_rate, audio_data)
    except Exception as e:
        raise Exception(f"pyttsx3 failed: {str(e)}") from e
    finally:
        # Always remove the temp file, success or failure.
        if audio_file_path:
            try:
                os.unlink(audio_file_path)
            except OSError:
                pass

def generate_audio_edge_tts(text: str, speed: float = 1.0):
    """Generate audio using Microsoft Edge TTS (network service).

    Picks the first English voice reported by the service, falling back
    to ``en-US-AriaNeural``, and maps ``speed`` onto Edge's signed
    percentage rate string. Returns ``(sample_rate, audio)`` with a 1-D
    float32 numpy array. Raises ``Exception("Edge TTS failed: ...")``.
    """
    try:
        import edge_tts

        # Edge expects a signed percentage like "+50%" or "-25%".
        # Bug fix: the original f"+{...}%" produced invalid strings such
        # as "+-50%" whenever speed < 1; ":+d" emits the sign correctly.
        rate = f"{int((speed - 1) * 100):+d}%"

        async def generate():
            voices = await edge_tts.list_voices()
            voice_obj = next((v for v in voices if v['Locale'].startswith('en')), None)
            voice = voice_obj['ShortName'] if voice_obj else "en-US-AriaNeural"

            communicate = edge_tts.Communicate(text, voice, rate=rate)
            audio_data = b""
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    audio_data += chunk["data"]
            return audio_data

        # Bridge the async API into this synchronous call.
        audio_data = run_async_blocking(generate())

        # Edge streams MP3; transcode to WAV in memory for soundfile.
        import io
        from pydub import AudioSegment

        audio = AudioSegment.from_mp3(io.BytesIO(audio_data))
        wav_buffer = io.BytesIO()
        audio.export(wav_buffer, format="wav")
        wav_buffer.seek(0)

        import soundfile as sf
        audio_array, sample_rate = sf.read(wav_buffer)

        # Normalize to a 1-D float32 numpy array.
        if not isinstance(audio_array, np.ndarray):
            audio_array = np.array(audio_array, dtype=np.float32)
        if audio_array.ndim > 1:
            audio_array = audio_array.flatten()
        if audio_array.dtype != np.float32:
            audio_array = audio_array.astype(np.float32)

        return (sample_rate, audio_array)

    except Exception as e:
        raise Exception(f"Edge TTS failed: {str(e)}") from e

def generate_speech(text: str, engine: str, speed: float = 1.0):
    """Generate speech and return ``(wav_file_path, error_message)``.

    Exactly one element of the pair is non-None: the path to a temporary
    WAV file on success (for Gradio's ``Audio(type="filepath")``), or a
    human-readable error string on failure. If the requested *engine* is
    unavailable, falls back to the first available one; unknown engine
    names fall back to espeak (matching the original else-branch).
    """
    if not text or not text.strip():
        return None, "Please enter some text"

    engines_status = check_engine_availability()

    if not engines_status.get(engine, False):
        available = [name for name, ok in engines_status.items() if ok]
        if not available:
            return None, "No TTS engines available"
        engine = available[0]  # Fallback to first available

    # Dispatch table replaces the original if/elif chain.
    generators = {
        "piper": generate_audio_piper,
        "coqui": generate_audio_coqui,
        "gtts": generate_audio_gtts,
        "pyttsx3": generate_audio_pyttsx3,
        "edge_tts": generate_audio_edge_tts,
    }

    try:
        generator = generators.get(engine, generate_audio_espeak)
        sample_rate, audio_data = generator(text, speed)

        # Defensive normalization: 1-D numpy array (each backend already
        # does this, but keep the safety net for future backends).
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if audio_data.ndim > 1:
            audio_data = audio_data.flatten()

        # Clamp into [-1, 1] to avoid clipping on write.
        max_val = np.max(np.abs(audio_data))
        if max_val > 1.0:
            audio_data = audio_data / max_val

        import soundfile as sf

        # Write to a temp WAV; Gradio serves the file by path.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
            tmp_path = tmp.name
        sf.write(tmp_path, audio_data, int(sample_rate))

        return tmp_path, None

    except Exception as e:
        return None, f"Error: {str(e)}"

# Probe the TTS engines once at import time; the UI below is built from
# this snapshot, so engines installed later won't appear until restart.
engines_status = check_engine_availability()
available_engines = [e for e, v in engines_status.items() if v]

# Keep the dropdown non-empty even if nothing was detected.
if not available_engines:
    available_engines = ["espeak"]  # Fallback

# Create Gradio interface
with gr.Blocks(title="sub200 - Ultra Low Latency TTS", theme=gr.themes.Soft()) as demo:
    # Page header.
    gr.Markdown("""
    # 🎙️ sub200 - Ultra Low Latency Text-to-Speech
    
    Host different open source TTS engines with ultra low latency. Supports GPU acceleration for high-quality neural TTS.
    """)
    
    # Input row: wide text box on the left, engine/speed controls on the right.
    with gr.Row():
        with gr.Column(scale=2):
            text_input = gr.Textbox(
                label="Enter text to convert",
                placeholder="Type or paste your text here...",
                lines=5,
                value=""
            )
        with gr.Column(scale=1):
            # Only engines detected at import time are offered as choices.
            engine_select = gr.Dropdown(
                label="TTS Engine",
                choices=available_engines,
                value=available_engines[0] if available_engines else "espeak",
                info="Select the TTS engine to use"
            )
            speed_slider = gr.Slider(
                label="Speed",
                minimum=0.5,
                maximum=2.0,
                value=1.0,
                step=0.1,
                info="Speech speed multiplier"
            )
    
    generate_btn = gr.Button("Generate Speech", variant="primary", size="lg")
    
    # type="filepath" matches generate_speech returning a temp WAV path.
    audio_output = gr.Audio(label="Generated Audio", type="filepath", autoplay=True)
    error_output = gr.Textbox(label="Status", visible=True)
    
    # Engine status panel, rendered from the import-time availability snapshot.
    with gr.Accordion("Engine Status", open=False):
        status_text = "\n".join([
            f"**{engine}**: {'✓ Available' if engines_status.get(engine, False) else '✗ Not Available'}"
            for engine in ["piper", "coqui", "espeak", "gtts", "pyttsx3", "edge_tts"]
        ])
        gr.Markdown(status_text)
    
    # Wire the button to the synthesis function.
    generate_btn.click(
        fn=generate_speech,
        inputs=[text_input, engine_select, speed_slider],
        outputs=[audio_output, error_output]
    )
    
    # Auto-generate on text change (optional)
    # text_input.submit(
    #     fn=generate_speech,
    #     inputs=[text_input, engine_select, speed_slider],
    #     outputs=[audio_output, error_output]
    # )

# Try to download Piper models if not present. Best effort: the helper
# module is optional and network access may be unavailable.
try:
    import download_models
    download_models.download_piper_model()
except Exception:
    # Missing module or failed download -- Piper simply stays unavailable.
    # (Narrowed from a bare except so Ctrl-C/SystemExit still propagate.)
    pass

if __name__ == "__main__":
    # Hugging Face Spaces supplies PORT (7860); default to 8000 locally.
    port = int(os.getenv("PORT", 8000))
    demo.launch(server_name="0.0.0.0", server_port=port, share=False)