# tts-gpu-service / app.py
# Author: Peter Michael Gits
# feat: Add unmute.sh streaming text processing methodology (commit 072c9ef)
import os
import sys
import warnings
from typing import List, Dict, Any, Optional

# === CRITICAL: COMPLETE STREAM PROTECTION SETUP ===
# This must happen BEFORE any other imports that might configure logging

# 1. Completely disable warnings to prevent stream conflicts
warnings.filterwarnings("ignore")
os.environ["PYTHONWARNINGS"] = "ignore"          # also silences warnings in child processes
os.environ["TRANSFORMERS_VERBOSITY"] = "error"   # quiet HuggingFace transformers logging
os.environ["TOKENIZERS_PARALLELISM"] = "false"   # avoid tokenizers fork-related warnings
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"  # no telemetry calls from Gradio
os.environ["GRADIO_ALLOW_FLAGGING"] = "never"
os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0"     # bind all interfaces (required on Spaces)
os.environ["GRADIO_SERVER_PORT"] = "7860"        # standard Hugging Face Spaces port
# 2. Replace stdout/stderr with safe alternatives BEFORE any imports
class SafeStream:
    """Safe stream that never raises I/O errors - MCP compatible.

    Drop-in replacement for ``sys.stdout`` / ``sys.stderr``: every write is
    forwarded to the interpreter's original stream (``sys.__stdout__`` /
    ``sys.__stderr__``) when present, and silently discarded otherwise.
    ``write``/``flush`` never raise, so no library logging handler can crash
    the process with a stream I/O error.
    """

    def __init__(self, fallback_name):
        # fallback_name is 'stdout' or 'stderr'; the real stream is looked
        # up as sys.__<name>__ on every call (never cached, so late
        # re-assignment of the dunder streams is honoured).
        self.fallback_name = fallback_name
        self.closed = False
        # Some consumers (MCP) access stream.buffer; point it back at self.
        self.buffer = self

    def write(self, text):
        """Forward *text* to the original stream; never raise.

        Returns the character count for str input (file-protocol contract),
        0 for anything else.
        """
        try:
            if hasattr(sys, f'__{self.fallback_name}__'):
                getattr(sys, f'__{self.fallback_name}__').write(text)
            else:
                # Ultimate fallback - do nothing rather than crash
                pass
        except Exception:
            # Was a bare `except:`; narrowed so SystemExit/KeyboardInterrupt
            # still propagate. Never raise I/O errors from write.
            pass
        return len(text) if isinstance(text, str) else 0

    def flush(self):
        """Best-effort flush of the underlying original stream."""
        try:
            if hasattr(sys, f'__{self.fallback_name}__'):
                getattr(sys, f'__{self.fallback_name}__').flush()
        except Exception:  # narrowed from bare except for the same reason
            pass

    def isatty(self):
        return False  # Always return False to prevent tty-related errors

    def fileno(self):
        raise OSError("fileno not supported")  # Prevent fileno access

    def readable(self):
        return False  # For MCP compatibility

    def writable(self):
        return True  # For MCP compatibility

    def seekable(self):
        return False  # For MCP compatibility
# Install safe streams BEFORE any other imports.
# The real streams remain reachable via sys.__stdout__ / sys.__stderr__,
# which is exactly where SafeStream forwards writes.
sys.stdout = SafeStream('stdout')
sys.stderr = SafeStream('stderr')
# 3. Completely disable the logging module to prevent any stream conflicts
import logging
logging.disable(logging.CRITICAL)  # suppresses every record at CRITICAL and below

# 4. Patch uvicorn.Config to prevent it from configuring logging
try:
    import uvicorn.config

    # Keep a handle to the original in case it is ever needed for debugging.
    original_configure_logging = uvicorn.config.Config.configure_logging

    def patched_configure_logging(self):
        """Completely disable uvicorn logging configuration"""
        # Do absolutely nothing - prevent uvicorn from touching streams
        pass

    uvicorn.config.Config.configure_logging = patched_configure_logging
except:  # noqa: E722 - deliberately broad: nothing here may abort startup
    pass  # If uvicorn not available yet, we'll patch it later
# 5. Now safe to import other modules
import gradio as gr
import torch
import torchaudio
from transformers import AutoProcessor, BarkModel
import numpy as np
import io
import time
from huggingface_hub import login
import spaces # Required for ZeroGPU
# Dummy GPU function to satisfy ZeroGPU startup requirements
# This ensures @spaces.GPU is detected during startup even in WebSocket-only mode
@spaces.GPU
def _dummy_gpu_function():
    """No-op GPU-decorated function so ZeroGPU startup detection succeeds."""
    status = "GPU available"
    return status
# Regular CPU function (no decorator needed)
def _dummy_cpu_function():
"""Regular CPU function for system info"""
return "CPU available"
# Initialize functions at module level for Spaces detection.
# _gpu_init triggers one @spaces.GPU call at import time so the platform
# registers GPU usage even in WebSocket-only mode.
_gpu_init = _dummy_gpu_function()
_cpu_init = _dummy_cpu_function()
import asyncio
import threading
import json
import base64
from datetime import datetime
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
# 6. Additional uvicorn patching after import
try:
    import uvicorn
    import uvicorn.server
    import uvicorn.main

    # Patch uvicorn.Server to disable logging
    if hasattr(uvicorn.server, 'Server'):
        original_init = uvicorn.server.Server.__init__

        def patched_init(self, config):
            # Force disable logging in config before the real __init__ runs
            config.log_config = None
            config.access_log = False
            config.log_level = "critical"
            original_init(self, config)

        uvicorn.server.Server.__init__ = patched_init

    # Patch uvicorn.run to disable logging regardless of caller arguments
    original_run = uvicorn.run

    def patched_run(*args, **kwargs):
        kwargs['log_config'] = None
        kwargs['access_log'] = False
        kwargs['log_level'] = 'critical'
        return original_run(*args, **kwargs)

    uvicorn.run = patched_run
except:  # noqa: E722 - broad on purpose; patching must never break startup
    pass
# 7. Disable specific library loggers that cause conflicts
try:
    for logger_name in [
        'httpx', 'gradio', 'uvicorn', 'transformers', 'torch',
        'torchaudio', 'bark', 'scipy', 'asyncio', 'ffmpeg',
        'uvicorn.access', 'uvicorn.error', 'gradio.routes'
    ]:
        logger = logging.getLogger(logger_name)
        logger.disabled = True
        logger.propagate = False
        logger.handlers = []
        # CRITICAL + 1 guarantees nothing passes the level check even if
        # `disabled` is later flipped back by a library.
        logger.setLevel(logging.CRITICAL + 1)
except Exception:
    pass  # Ignore any logging setup errors

# 8. Also disable root logger handlers to prevent conflicts
try:
    root_logger = logging.getLogger()
    root_logger.handlers = []
    root_logger.disabled = True
    root_logger.setLevel(logging.CRITICAL + 1)
except Exception:
    pass
# Simple print-based logging to avoid all stream conflicts
def safe_log(level, message):
    """Bulletproof logging using only print statements"""
    tag = level.upper()
    print(f"[TTS-{tag}] {message}", flush=True)
# MCP Server imports - optional dependency; the service degrades gracefully
# to Gradio/WebSocket-only operation when `mcp` is not installed.
try:
    from mcp.server import Server
    from mcp.types import Tool, TextContent
    import mcp.server.stdio
    MCP_AVAILABLE = True
except ImportError:
    MCP_AVAILABLE = False
    safe_log("warning", "MCP not available. Install with: pip install mcp>=1.0.0")

# Logging completely disabled to prevent stream conflicts in ZeroGPU
# MCP Server instance (None when the mcp package is missing)
mcp_server = None
if MCP_AVAILABLE:
    mcp_server = Server("tts-gpu-service")
# Global variables for model (populated lazily by load_model())
processor = None  # transformers AutoProcessor for "suno/bark-small"
model = None      # BarkModel instance
device = None     # torch.device selected at load time (cuda or cpu)
def load_model():
    """Load the TTS model - optimized for ZeroGPU.

    Populates the module globals `processor`, `model` and `device`.
    Returns True on success, False on any load failure (error is logged,
    never raised, so callers can degrade gracefully).
    """
    global processor, model, device
    safe_log("info", "Loading TTS model for ZeroGPU...")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    safe_log("info", f"Using device: {device}")
    try:
        # Use Bark model for high-quality TTS
        processor = AutoProcessor.from_pretrained("suno/bark-small")
        model = BarkModel.from_pretrained(
            "suno/bark-small",
            # fp16 only makes sense on GPU; CPU inference stays fp32
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto" if torch.cuda.is_available() else None
        )
        # Ensure model is on the correct device
        # NOTE(review): calling .to(device) on a model loaded with
        # device_map="auto" can conflict with accelerate's dispatch hooks on
        # recent versions — confirm against the pinned accelerate release.
        if torch.cuda.is_available():
            model = model.to(device)
        safe_log("info", f"TTS model loaded successfully on {device}!")
        return True
    except Exception as e:
        safe_log("error", f"Error loading model: {e}")
        return False
@spaces.GPU  # This decorator enables ZeroGPU for this function
def synthesize_speech(text, voice_preset="v2/en_speaker_6"):
    """Synthesize speech from text - ZeroGPU accelerated.

    Lazily loads the Bark model on first call, generates audio, writes it
    to "temp_audio.wav" and returns (output_path, status_message). On any
    failure returns (None, error_message) — this function never raises.
    """
    global processor, model, device
    if not text.strip():
        return None, "Please enter some text to synthesize."
    try:
        # Load model if not already loaded
        if model is None:
            success = load_model()
            if not success:
                return None, "Error: Could not load TTS model."
        safe_log("info", f"Synthesizing with ZeroGPU: {text[:50]}...")
        start_time = time.time()
        # Process text with voice preset - ensure return_tensors='pt'
        inputs = processor(text, voice_preset=voice_preset, return_tensors="pt")
        # Generate audio with ZeroGPU acceleration
        with torch.no_grad():
            # Ensure all inputs are on the correct device
            if torch.cuda.is_available() and device.type == 'cuda':
                # Move all tensor inputs to GPU recursively (processor output
                # may nest tensors inside dicts/lists/tuples)
                def move_to_device(obj, target_device):
                    if isinstance(obj, torch.Tensor):
                        return obj.to(target_device)
                    elif isinstance(obj, dict):
                        return {k: move_to_device(v, target_device) for k, v in obj.items()}
                    elif isinstance(obj, list):
                        return [move_to_device(item, target_device) for item in obj]
                    elif isinstance(obj, tuple):
                        return tuple(move_to_device(item, target_device) for item in obj)
                    else:
                        return obj
                inputs = move_to_device(inputs, device)
                # Also ensure model is on correct device
                model = model.to(device)
                # Debug: log device info
                safe_log("info", f"Model device: {next(model.parameters()).device}")
                for k, v in inputs.items():
                    if isinstance(v, torch.Tensor):
                        safe_log("info", f"Input {k} device: {v.device}")
            # Generate without mixed precision first to isolate the issue
            try:
                audio_array = model.generate(**inputs)
            except Exception as e:
                safe_log("error", f"Generation failed: {e}")
                # Try with CPU fallback
                # NOTE(review): move_to_device is only defined inside the CUDA
                # branch above; if generation fails on a CPU-only run this
                # fallback raises NameError (caught by the outer handler).
                safe_log("info", "Attempting CPU fallback...")
                model = model.cpu()
                inputs = move_to_device(inputs, torch.device('cpu'))
                audio_array = model.generate(**inputs)
        # Convert to numpy and ensure it's on CPU with correct dtype
        if torch.cuda.is_available():
            audio_array = audio_array.cpu()
        # Convert to float32 for torchaudio compatibility
        if audio_array.dtype == torch.float16:
            audio_array = audio_array.float()
        audio_array = audio_array.numpy().squeeze()
        # Get sample rate
        sample_rate = model.generation_config.sample_rate
        # Save to temporary file for Gradio - ensure float32 tensor
        # (fixed path: concurrent calls overwrite each other's output)
        output_path = "temp_audio.wav"
        audio_tensor = torch.from_numpy(audio_array).unsqueeze(0).float()
        torchaudio.save(output_path, audio_tensor, sample_rate)
        generation_time = time.time() - start_time
        gpu_name = torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU"
        status_message = f"✅ Generated in {generation_time:.2f}s on {gpu_name} (ZeroGPU)"
        return output_path, status_message
    except Exception as e:
        error_msg = f"❌ Error during synthesis: {str(e)}"
        safe_log("error", error_msg)
        return None, error_msg
@spaces.GPU  # ZeroGPU for batch processing
def batch_synthesize(text_list, voice_preset="v2/en_speaker_6"):
    """Batch synthesis with ZeroGPU optimization.

    Calls synthesize_speech once per non-empty entry; blank entries are
    recorded as skipped. Returns (results, batch_status) where results is
    a list of (audio_path, message) tuples in input order.
    """
    start_time = time.time()
    results = []
    for index, entry in enumerate(text_list, start=1):
        if not entry.strip():
            results.append((None, f"Item {index}: Empty text skipped"))
            continue
        audio_path, status = synthesize_speech(entry, voice_preset)
        results.append((audio_path, f"Item {index}: {status}"))
    elapsed = time.time() - start_time
    batch_status = f"🚀 Batch completed: {len(text_list)} items in {elapsed:.2f}s"
    return results, batch_status
def get_system_info():
    """Get system information including ZeroGPU details"""
    cuda = torch.cuda.is_available()
    info = {
        "🚀 ZeroGPU": "Active" if cuda else "Not Available",
        "🎯 GPU Name": torch.cuda.get_device_name(0) if cuda else "CPU Only",
        "💾 GPU Memory": f"{torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB" if cuda else "N/A",
        "⚡ CUDA Version": torch.version.cuda if cuda else "N/A",
        "🔧 PyTorch": torch.__version__,
        "🤖 Model Status": "✅ Loaded" if model is not None else "💤 Lazy Loading (ZeroGPU optimized)",
        "🎛️ Mixed Precision": "✅ Enabled" if cuda else "❌ CPU Mode",
        "🔌 MCP Server": "✅ Available" if MCP_AVAILABLE else "❌ Not Available",
        "🌐 WebSocket TTS": "✅ Ready" if model is not None else "💤 Ready (lazy loading)"
    }
    # Render as one "key: value" line per entry for the Gradio status box.
    return "\n".join(f"{key}: {value}" for key, value in info.items())
# WebSocket TTS Handler for Real-time Text-to-Speech
class WebSocketTTSHandler:
    """WebSocket handler for real-time TTS integration with ChatCal WebRTC.

    Tracks active client connections and implements the message protocol:
    tts_synthesize (single-shot), tts_streaming_synthesize (chunked, with an
    unmute.sh-style final flush), tts_get_voices and tts_get_streaming_info.
    """

    def __init__(self):
        # client_id -> WebSocket for every currently connected client
        self.active_connections = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        """Accept WebSocket connection for TTS service"""
        await websocket.accept()
        self.active_connections[client_id] = websocket
        safe_log("info", f"🔌 TTS WebSocket client {client_id} connected")
        # Send connection confirmation with service info
        await self.send_message(client_id, {
            "type": "tts_connection_confirmed",
            "client_id": client_id,
            "timestamp": datetime.now().isoformat(),
            "service": "tts-gpu-service",
            "model_status": "✅ Loaded" if model is not None else "⏳ Loading",
            "zerogpu_status": "✅ Active" if torch.cuda.is_available() else "❌ Not Available",
            "available_voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"]
        })

    async def disconnect(self, client_id: str):
        """Clean up connection (idempotent - safe to call twice)."""
        if client_id in self.active_connections:
            del self.active_connections[client_id]
            safe_log("info", f"🔌 TTS WebSocket client {client_id} disconnected")

    async def send_message(self, client_id: str, message: dict):
        """Send JSON message to client; drops the connection on send failure."""
        if client_id in self.active_connections:
            websocket = self.active_connections[client_id]
            try:
                await websocket.send_text(json.dumps(message))
            except Exception as e:
                safe_log("error", f"Failed to send message to TTS client {client_id}: {e}")
                await self.disconnect(client_id)

    async def handle_streaming_text_synthesis(self, client_id: str, text_chunks: list, voice_preset: str = "v2/en_speaker_6", is_final: bool = True):
        """Process streaming text synthesis following unmute.sh methodology.

        Non-final calls only acknowledge buffering progress; the final call
        ("flush trick") joins all chunks and synthesizes them in one pass.
        """
        try:
            # UNMUTE.SH METHODOLOGY: Process text chunks in streaming fashion
            safe_log("info", f"🔊 TTS STREAMING: Processing {len(text_chunks)} chunks from {client_id} (final={is_final})")
            if is_final:
                # FLUSH TRICK: Process all accumulated text at once for best quality
                complete_text = " ".join(text_chunks).strip()
                if complete_text:
                    safe_log("info", f"🔊 TTS FLUSH: Final synthesis for {client_id}: {complete_text[:50]}...")
                    # Use the existing ZeroGPU synthesize_speech function
                    audio_path, status = synthesize_speech(complete_text, voice_preset)
                    # success is signalled by the checkmark in the status string
                    if audio_path and "✅" in status:
                        # Read the generated audio file
                        with open(audio_path, 'rb') as audio_file:
                            audio_data = audio_file.read()
                        # Encode audio as base64 for WebSocket transmission
                        audio_b64 = base64.b64encode(audio_data).decode('utf-8')
                        # Send successful synthesis with streaming metadata
                        await self.send_message(client_id, {
                            "type": "tts_streaming_response",
                            "audio_data": audio_b64,
                            "audio_format": "wav",
                            "text": complete_text,
                            "text_chunks": text_chunks,
                            "voice_preset": voice_preset,
                            "timestamp": datetime.now().isoformat(),
                            "audio_size": len(audio_data),
                            "status": status,
                            "is_final": is_final,
                            "streaming_method": "unmute.sh_flush_trick"
                        })
                        safe_log("info", f"🔊 TTS STREAMING: Final audio sent to {client_id} ({len(audio_data)} bytes)")
                        # Clean up temporary file (best-effort)
                        import os
                        try:
                            os.unlink(audio_path)
                        except:  # noqa: E722 - cleanup must never fail the request
                            pass
                    else:
                        # Send error message
                        await self.send_message(client_id, {
                            "type": "tts_streaming_error",
                            "message": f"TTS streaming synthesis failed: {status}",
                            "text": complete_text,
                            "is_final": is_final,
                            "timestamp": datetime.now().isoformat()
                        })
                else:
                    # Empty final flush - nothing to synthesize
                    safe_log("info", f"🔊 TTS FLUSH: Empty final text for {client_id}")
            else:
                # STREAMING: Send partial progress update (no audio yet)
                await self.send_message(client_id, {
                    "type": "tts_streaming_progress",
                    "message": f"Buffering text chunks: {len(text_chunks)}",
                    "text_chunks": text_chunks[-3:],  # Show last 3 chunks for progress
                    "is_final": is_final,
                    "timestamp": datetime.now().isoformat()
                })
                safe_log("info", f"🔊 TTS STREAMING: Progress update sent to {client_id} ({len(text_chunks)} chunks)")
        except Exception as e:
            safe_log("error", f"TTS streaming error for {client_id}: {e}")
            await self.send_message(client_id, {
                "type": "tts_streaming_error",
                "message": f"TTS streaming error: {str(e)}",
                "is_final": is_final,
                "timestamp": datetime.now().isoformat()
            })

    async def handle_text_synthesis(self, client_id: str, text: str, voice_preset: str = "v2/en_speaker_6"):
        """Process text synthesis with real TTS service (legacy single-shot method)"""
        try:
            safe_log("info", f"🔊 TTS: Processing text from {client_id}: {text[:50]}...")
            # Use streaming method with single chunk for consistency
            await self.handle_streaming_text_synthesis(client_id, [text], voice_preset, is_final=True)
        except Exception as e:
            safe_log("error", f"TTS WebSocket error for {client_id}: {e}")
            await self.send_message(client_id, {
                "type": "tts_error",
                "message": f"TTS processing error: {str(e)}",
                "timestamp": datetime.now().isoformat()
            })

    async def handle_message(self, client_id: str, message_data: dict):
        """Handle different types of WebSocket messages (protocol dispatcher)."""
        message_type = message_data.get("type")
        if message_type == "tts_synthesize":
            # Text-to-speech synthesis request (legacy single-shot)
            text = message_data.get("text", "")
            voice_preset = message_data.get("voice_preset", "v2/en_speaker_6")
            if text.strip():
                await self.handle_text_synthesis(client_id, text, voice_preset)
            else:
                await self.send_message(client_id, {
                    "type": "tts_error",
                    "message": "Empty text provided for synthesis",
                    "timestamp": datetime.now().isoformat()
                })
        elif message_type == "tts_streaming_synthesize":
            # Streaming text-to-speech synthesis request (unmute.sh methodology)
            text_chunks = message_data.get("text_chunks", [])
            voice_preset = message_data.get("voice_preset", "v2/en_speaker_6")
            is_final = message_data.get("is_final", True)
            if text_chunks:
                await self.handle_streaming_text_synthesis(client_id, text_chunks, voice_preset, is_final)
            else:
                await self.send_message(client_id, {
                    "type": "tts_streaming_error",
                    "message": "Empty text chunks provided for streaming synthesis",
                    "is_final": is_final,
                    "timestamp": datetime.now().isoformat()
                })
        elif message_type == "tts_get_voices":
            # Request available voice presets
            await self.send_message(client_id, {
                "type": "tts_voices_list",
                "voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"],
                "timestamp": datetime.now().isoformat()
            })
        elif message_type == "tts_get_streaming_info":
            # Request streaming capabilities info
            await self.send_message(client_id, {
                "type": "tts_streaming_info",
                "streaming_supported": True,
                "methodology": "unmute.sh with flush trick",
                "message_types": {
                    "tts_streaming_synthesize": "Send text chunks for streaming processing",
                    "tts_streaming_response": "Receive final audio with streaming metadata",
                    "tts_streaming_progress": "Receive progress updates during buffering",
                    "tts_streaming_error": "Receive streaming-specific error messages"
                },
                "flush_trick": "Set is_final=true to trigger synthesis of all buffered chunks",
                "timestamp": datetime.now().isoformat()
            })
        else:
            # Unknown message types are logged but not answered
            safe_log("warning", f"Unknown TTS message type from {client_id}: {message_type}")
# Global TTS WebSocket handler (shared by both FastAPI WebSocket endpoints)
tts_websocket_handler = WebSocketTTSHandler()
# FastAPI WebSocket Integration for TTS Service
def create_tts_fastapi_app():
    """Create FastAPI app with TTS WebSocket endpoint.

    Exposes two WebSocket routes (auto-generated client id and explicit
    client id), plus HTTP routes for status, health and model preloading.
    """
    import uuid
    app = FastAPI(
        title="TTS GPU Service WebSocket API",
        description="Real-time Text-to-Speech with ZeroGPU acceleration",
        version="1.0.0"
    )

    @app.websocket("/ws/tts")
    async def tts_websocket_endpoint(websocket: WebSocket):
        """WebSocket endpoint for real-time TTS (server assigns the client id)."""
        client_id = str(uuid.uuid4())
        try:
            await tts_websocket_handler.connect(websocket, client_id)
            while True:
                # Receive JSON message from client
                data = await websocket.receive_text()
                try:
                    message = json.loads(data)
                    await tts_websocket_handler.handle_message(client_id, message)
                except json.JSONDecodeError as e:
                    await tts_websocket_handler.send_message(client_id, {
                        "type": "tts_error",
                        "message": f"Invalid JSON format: {str(e)}",
                        "timestamp": datetime.now().isoformat()
                    })
        except WebSocketDisconnect:
            await tts_websocket_handler.disconnect(client_id)
        except Exception as e:
            safe_log("error", f"TTS WebSocket endpoint error: {e}")
            await tts_websocket_handler.disconnect(client_id)

    @app.websocket("/ws/tts/{client_id}")
    async def tts_websocket_with_id(websocket: WebSocket, client_id: str):
        """WebSocket endpoint with specific client ID (caller-chosen)."""
        try:
            await tts_websocket_handler.connect(websocket, client_id)
            while True:
                data = await websocket.receive_text()
                try:
                    message = json.loads(data)
                    await tts_websocket_handler.handle_message(client_id, message)
                except json.JSONDecodeError as e:
                    await tts_websocket_handler.send_message(client_id, {
                        "type": "tts_error",
                        "message": f"Invalid JSON format: {str(e)}",
                        "timestamp": datetime.now().isoformat()
                    })
        except WebSocketDisconnect:
            await tts_websocket_handler.disconnect(client_id)
        except Exception as e:
            safe_log("error", f"TTS WebSocket endpoint error: {e}")
            await tts_websocket_handler.disconnect(client_id)

    @app.get("/")
    async def tts_root():
        """TTS service status endpoint"""
        return {
            "service": "tts-gpu-service",
            "status": "✅ Ready" if model is not None else "⏳ Loading",
            "zerogpu": "✅ Active" if torch.cuda.is_available() else "❌ Not Available",
            "websocket_endpoints": ["/ws/tts", "/ws/tts/{client_id}"],
            "available_voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"],
            "model": "suno/bark"
        }

    @app.get("/health")
    async def tts_health():
        """Health check endpoint with detailed status"""
        return {
            "status": "healthy",
            "model_loaded": model is not None,
            "gpu_available": torch.cuda.is_available(),
            "loading_strategy": "lazy (ZeroGPU optimized)",
            "note": "Model loads on first synthesis request to optimize GPU usage"
        }

    async def preload_model_logic():
        """Shared logic for model preloading (used by GET and POST routes)."""
        global model
        if model is not None:
            return {"status": "success", "message": "Model already loaded", "model_loaded": True}
        try:
            success = load_model()
            if success:
                return {"status": "success", "message": "Model preloaded successfully", "model_loaded": True}
            else:
                return {"status": "error", "message": "Failed to preload model", "model_loaded": False}
        except Exception as e:
            return {"status": "error", "message": f"Preload error: {str(e)}", "model_loaded": False}

    @app.get("/preload")
    async def preload_model_get():
        """Preload the TTS model via GET (browser-friendly)"""
        return await preload_model_logic()

    @app.post("/preload")
    async def preload_model_post():
        """Preload the TTS model via POST (API-friendly)"""
        return await preload_model_logic()

    return app
# MCP Tool Definitions and Handlers (registered only when mcp is installed)
if MCP_AVAILABLE:
    @mcp_server.list_tools()
    async def handle_list_tools() -> List[Tool]:
        """List available MCP tools for TTS service"""
        return [
            Tool(
                name="tts_synthesize",
                description="Synthesize speech from text using Bark TTS model with ZeroGPU acceleration",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "Text to convert to speech"
                        },
                        "voice_preset": {
                            "type": "string",
                            "description": "Voice preset identifier (e.g., 'v2/en_speaker_6', 'v2/en_speaker_1')",
                            "default": "v2/en_speaker_6"
                        }
                    },
                    "required": ["text"]
                }
            ),
            Tool(
                name="tts_batch_synthesize",
                description="Synthesize speech from multiple texts in batch with ZeroGPU optimization",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text_list": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "List of texts to convert to speech"
                        },
                        "voice_preset": {
                            "type": "string",
                            "description": "Voice preset for all texts",
                            "default": "v2/en_speaker_6"
                        }
                    },
                    "required": ["text_list"]
                }
            ),
            Tool(
                name="tts_get_info",
                description="Get system information including ZeroGPU status and TTS service capabilities",
                inputSchema={
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            )
        ]

    @mcp_server.call_tool()
    async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle MCP tool calls for TTS operations.

        Always returns a single TextContent whose text is a JSON payload;
        errors are encoded in the payload rather than raised.
        """
        try:
            if name == "tts_synthesize":
                text = arguments.get("text", "")
                voice_preset = arguments.get("voice_preset", "v2/en_speaker_6")
                if not text.strip():
                    return [TextContent(
                        type="text",
                        text=json.dumps({
                            "error": "No text provided for synthesis",
                            "status": "❌ Empty text",
                            "audio_file": None
                        })
                    )]
                # Use the existing synthesize_speech function
                audio_path, status = synthesize_speech(text, voice_preset)
                result = {
                    "status": status,
                    "audio_file": audio_path,
                    "text": text,
                    "voice_preset": voice_preset,
                    "success": audio_path is not None
                }
                return [TextContent(
                    type="text",
                    text=json.dumps(result, indent=2)
                )]
            elif name == "tts_batch_synthesize":
                text_list = arguments.get("text_list", [])
                voice_preset = arguments.get("voice_preset", "v2/en_speaker_6")
                if not text_list:
                    return [TextContent(
                        type="text",
                        text=json.dumps({
                            "error": "No texts provided for batch synthesis",
                            "status": "❌ Empty list",
                            "results": []
                        })
                    )]
                # Use the existing batch_synthesize function
                results, batch_status = batch_synthesize(text_list, voice_preset)
                # Format results for MCP response
                formatted_results = []
                for i, (audio_path, status) in enumerate(results):
                    formatted_results.append({
                        "index": i,
                        "text": text_list[i] if i < len(text_list) else "",
                        "audio_file": audio_path,
                        "status": status,
                        "success": audio_path is not None
                    })
                result = {
                    "batch_status": batch_status,
                    "results": formatted_results,
                    "total_items": len(text_list),
                    "voice_preset": voice_preset
                }
                return [TextContent(
                    type="text",
                    text=json.dumps(result, indent=2)
                )]
            elif name == "tts_get_info":
                # Use the existing get_system_info function
                system_info = get_system_info()
                # Also include MCP-specific information.
                # VOICE_PRESETS is defined later in the module; by the time
                # a tool call arrives the module is fully loaded.
                info_dict = {
                    "system_info": system_info,
                    "mcp_status": "✅ MCP Server Active",
                    "available_tools": ["tts_synthesize", "tts_batch_synthesize", "tts_get_info"],
                    "voice_presets": [
                        {"code": code, "description": desc}
                        for code, desc in VOICE_PRESETS
                    ],
                    "service_endpoints": {
                        "gradio_interface": "http://localhost:7860",
                        "mcp_protocol": "stdio"
                    },
                    "model_info": {
                        "name": "suno/bark-small",
                        "type": "Text-to-Speech",
                        "accelerated": "ZeroGPU"
                    }
                }
                return [TextContent(
                    type="text",
                    text=json.dumps(info_dict, indent=2)
                )]
            else:
                return [TextContent(
                    type="text",
                    text=json.dumps({
                        "error": f"Unknown tool: {name}",
                        "available_tools": ["tts_synthesize", "tts_batch_synthesize", "tts_get_info"]
                    })
                )]
        except Exception as e:
            safe_log("error", f"Error in MCP tool '{name}': {str(e)}")
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": f"Tool execution failed: {str(e)}",
                    "tool": name,
                    "arguments": arguments
                })
            )]
async def run_mcp_server():
    """Run the MCP server in stdio mode with temporary stream restoration.

    MCP stdio transport needs the real stdin/stdout/stderr, so the
    SafeStream wrappers installed at import time are swapped out for the
    interpreter originals while the server runs, then restored in finally.
    """
    safe_log("info", "🔌 Starting MCP Server for TTS service...")
    try:
        # Temporarily restore original streams for MCP.
        # These capture the SafeStream wrappers so they can be put back.
        original_stdin = sys.stdin
        original_stdout = sys.stdout
        original_stderr = sys.stderr
        # Restore original streams for MCP operation
        if hasattr(sys, '__stdin__'):
            sys.stdin = sys.__stdin__
        if hasattr(sys, '__stdout__'):
            sys.stdout = sys.__stdout__
        if hasattr(sys, '__stderr__'):
            sys.stderr = sys.__stderr__
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await mcp_server.run(
                read_stream,
                write_stream,
                mcp_server.create_initialization_options()
            )
    except Exception as e:
        safe_log("error", f"MCP Server failed to start: {e}")
        # Don't crash the whole service if MCP fails
        return
    finally:
        # Always restore safe streams after MCP operation
        try:
            sys.stdin = original_stdin
            sys.stdout = original_stdout
            sys.stderr = original_stderr
        except:  # noqa: E722 - also guards NameError if try failed very early
            pass
def start_mcp_server_thread():
    """Start MCP server in a separate thread"""
    if not MCP_AVAILABLE:
        safe_log("warning", "⚠️ MCP not available - only Gradio interface will be active")
        return

    def run_mcp():
        # Own event loop for the daemon thread; errors are logged, never raised.
        try:
            asyncio.run(run_mcp_server())
        except Exception as e:
            safe_log("error", f"MCP Server error: {e}")

    mcp_thread = threading.Thread(target=run_mcp, daemon=True)
    mcp_thread.start()
    safe_log("info", "🔌 MCP Server thread started successfully")
# Voice preset options with better descriptions.
# Pairs of (Bark preset code, human-readable label) used by both the Gradio
# dropdowns and the MCP tts_get_info tool.
VOICE_PRESETS = [
    ("v2/en_speaker_0", "🗣️ Speaker 0 - Professional Male"),
    ("v2/en_speaker_1", "👩 Speaker 1 - Young Female"),
    ("v2/en_speaker_2", "👨 Speaker 2 - Mature Male"),
    ("v2/en_speaker_3", "🎭 Speaker 3 - Expressive Female"),
    ("v2/en_speaker_4", "📻 Speaker 4 - Radio Voice Male"),
    ("v2/en_speaker_5", "🎪 Speaker 5 - Animated Female"),
    ("v2/en_speaker_6", "🎯 Speaker 6 - Clear Male (Default)"),
    ("v2/en_speaker_7", "🌟 Speaker 7 - Warm Female"),
    ("v2/en_speaker_8", "🎬 Speaker 8 - Narrator Male"),
    ("v2/en_speaker_9", "✨ Speaker 9 - Elegant Female")
]
# Create enhanced Gradio interface for ZeroGPU
with gr.Blocks(
title="🚀 ZeroGPU TTS Service",
theme=gr.themes.Soft(),
css="""
.gradio-container {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
}
.main-header {
text-align: center;
background: rgba(255,255,255,0.1);
padding: 20px;
border-radius: 10px;
margin-bottom: 20px;
}
"""
) as iface:
with gr.Row():
gr.Markdown("""
<div class="main-header">
# 🚀 ZeroGPU TTS Service
## Powered by Hugging Face Pro + Nvidia H200
Ultra-fast text-to-speech with dynamic GPU scaling
</div>
""")
with gr.Tabs():
# Single synthesis tab
with gr.TabItem("🎤 Single Synthesis"):
with gr.Row():
with gr.Column(scale=2):
text_input = gr.Textbox(
label="📝 Text to Synthesize",
placeholder="Enter the text you want to convert to speech...",
lines=6,
value="Hello! This is a test of the ZeroGPU-accelerated text-to-speech service running on Hugging Face Spaces with Nvidia H200 dynamic resources."
)
voice_dropdown = gr.Dropdown(
choices=[(desc, code) for code, desc in VOICE_PRESETS],
value="v2/en_speaker_6",
label="🎭 Voice Preset",
info="Choose different voice characteristics"
)
with gr.Row():
synthesize_btn = gr.Button("🎵 Generate Speech", variant="primary", size="lg")
clear_btn = gr.Button("🗑️ Clear", variant="secondary")
with gr.Column(scale=1):
system_info = gr.Textbox(
label="⚙️ ZeroGPU Status",
value=get_system_info(),
interactive=False,
lines=8
)
with gr.Row():
audio_output = gr.Audio(
label="🔊 Generated Speech",
type="filepath",
autoplay=False
)
status_output = gr.Textbox(
label="📊 Generation Status",
interactive=False,
lines=2
)
# Batch synthesis tab
with gr.TabItem("📦 Batch Synthesis"):
with gr.Row():
batch_input = gr.Textbox(
label="📝 Batch Text (one per line)",
placeholder="Enter multiple texts, one per line:\nHello world!\nThis is the second sentence.\nAnd this is the third.",
lines=8
)
batch_voice = gr.Dropdown(
choices=[(desc, code) for code, desc in VOICE_PRESETS],
value="v2/en_speaker_6",
label="🎭 Voice for All"
)
batch_btn = gr.Button("🚀 Generate Batch", variant="primary", size="lg")
batch_status = gr.Textbox(label="📊 Batch Status", interactive=False)
batch_results = gr.File(label="📁 Download All Audio Files", file_count="multiple")
# API Documentation tab
with gr.TabItem("🔧 API Usage"):
gr.Markdown("""
## 🔌 API Access
### Gradio Client API
Use this service programmatically with the Gradio client:
```python
from gradio_client import Client
# Connect to your ZeroGPU TTS service
client = Client("YOUR_USERNAME/tts-gpu-service")
# Generate speech
result = client.predict(
"Hello from the API!", # text
"v2/en_speaker_6", # voice_preset
api_name="/predict"
)
audio_file, status = result
print(f"Generated: {audio_file}")
print(f"Status: {status}")
```
### 🔌 MCP Protocol API
This service also supports Model Context Protocol (MCP) for integration with AI assistants:
```python
# MCP Client example (Claude Code, etc.)
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Connect to TTS service via MCP
async def use_tts_mcp():
server_params = StdioServerParameters(
command="python",
args=["app.py", "--mcp-only"]
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# Initialize connection
await session.initialize()
# List available tools
tools = await session.list_tools()
print("Available TTS tools:", [tool.name for tool in tools.tools])
# Synthesize speech
result = await session.call_tool("tts_synthesize", {
"text": "Hello from MCP!",
"voice_preset": "v2/en_speaker_6"
})
print("TTS Result:", result.content[0].text)
# Run MCP client
asyncio.run(use_tts_mcp())
```
### Available MCP Tools:
- **`tts_synthesize`**: Convert single text to speech
- **`tts_batch_synthesize`**: Convert multiple texts to speech
- **`tts_get_info`**: Get system status and capabilities
### 🚀 ZeroGPU Benefits:
- **Dynamic Scaling**: Resources allocated only when needed
- **H200 Performance**: Latest GPU architecture
- **Cost Efficient**: No idle costs with Pro subscription
- **High Throughput**: Optimized for batch processing
- **Dual Protocols**: Both Gradio API and MCP support
### 📊 Performance Metrics:
- **Single synthesis**: ~0.5-2s depending on text length
- **Batch processing**: Parallel execution on H200
- **Memory efficient**: Automatic cleanup after processing
- **MCP Integration**: Real-time protocol for AI assistants
""")
# Examples with ZeroGPU showcase
gr.Examples(
examples=[
["Welcome to our ZeroGPU-powered text-to-speech service running on Nvidia H200!", "v2/en_speaker_6"],
["The quick brown fox jumps over the lazy dog. This sentence tests various phonemes.", "v2/en_speaker_3"],
["Artificial intelligence is transforming how we interact with technology using advanced neural networks.", "v2/en_speaker_1"],
["This ultra-fast voice synthesis is running on Hugging Face Spaces with dynamic H200 GPU allocation.", "v2/en_speaker_8"],
["ZeroGPU technology allows for instant scaling and cost-effective AI model deployment.", "v2/en_speaker_9"]
],
inputs=[text_input, voice_dropdown],
outputs=[audio_output, status_output],
fn=synthesize_speech,
cache_examples=False,
label="🎯 ZeroGPU Examples"
)
# Event handlers with API names
synthesize_btn.click(
fn=synthesize_speech,
inputs=[text_input, voice_dropdown],
outputs=[audio_output, status_output],
api_name="predict"
)
clear_btn.click(
fn=lambda: ("", None, ""),
outputs=[text_input, audio_output, status_output]
)
def process_batch(batch_text, voice):
    """Synthesize one audio clip per non-empty line of *batch_text*.

    Returns a (status_message, audio_file_list) pair for the batch widgets;
    on empty input the list is empty and the status reports the problem.
    """
    stripped = (line.strip() for line in batch_text.split('\n'))
    texts = [line for line in stripped if line]
    if not texts:
        return "❌ No valid texts found", []
    results, status = batch_synthesize(texts, voice)
    # Keep only entries that actually produced an audio file.
    audio_files = [entry[0] for entry in results if entry[0]]
    return status, audio_files
# Batch button: fan the multi-line input out through process_batch.
batch_btn.click(
    process_batch,
    inputs=[batch_input, batch_voice],
    outputs=[batch_status, batch_results],
)
# Populate the system-info panel as soon as the page loads.
iface.load(get_system_info, outputs=[system_info])
def _patch_gradio_logging():
    """Best-effort patch of gradio.networking.start_server so the uvicorn it
    spawns runs with logging fully disabled (prevents stream conflicts).

    Opportunistic: any failure is swallowed so startup is never blocked.
    """
    try:
        import gradio.networking
        if hasattr(gradio.networking, 'start_server'):
            original_start_server = gradio.networking.start_server

            def patched_start_server(*args, **kwargs):
                # Force-disable uvicorn logging regardless of caller settings.
                if 'log_config' in kwargs:
                    kwargs['log_config'] = None
                if 'access_log' in kwargs:
                    kwargs['access_log'] = False
                kwargs.setdefault('log_level', 'critical')
                return original_start_server(*args, **kwargs)

            gradio.networking.start_server = patched_start_server
    except Exception:
        pass


def _run_mcp_mode():
    """MCP-only mode: serve the stdio MCP protocol with no Gradio interface.

    Exits with status 1 if MCP support is unavailable or the server fails.
    """
    if not MCP_AVAILABLE:
        safe_log("error", "❌ MCP not available but MCP-only mode requested")
        sys.exit(1)
    safe_log("info", "🔌 Starting TTS service in MCP-only mode...")
    try:
        asyncio.run(run_mcp_server())
    except KeyboardInterrupt:
        safe_log("info", "MCP server stopped by user")
    except Exception as e:
        safe_log("error", f"MCP server failed: {e}")
        sys.exit(1)


def _run_websocket_mode():
    """WebSocket-only mode (default): FastAPI app with TTS WebSocket endpoints."""
    safe_log("info", "🌐 Starting TTS service in WebSocket-only mode (DEFAULT)...")
    try:
        import uvicorn
        fastapi_app = create_tts_fastapi_app()
        # Hugging Face Spaces exposes SPACE_ID; derive the public URLs from it,
        # otherwise fall back to localhost for local runs.
        space_id = os.environ.get("SPACE_ID")
        if space_id:
            host_slug = space_id.replace('/', '-')
            base_url = f"https://{host_slug}.hf.space"
            websocket_url = f"wss://{host_slug}.hf.space"
        else:
            base_url = "http://localhost:7860"
            websocket_url = "ws://localhost:7860"
        safe_log("info", "✅ TTS WebSocket Server: Starting on port 7860...")
        safe_log("info", f"🔗 WebSocket Endpoints: {websocket_url}/ws/tts, {websocket_url}/ws/tts/{{client_id}}")
        safe_log("info", f"📡 Status Endpoint: {base_url}/")
        safe_log("info", f"💚 Health Check: {base_url}/health")
        uvicorn.run(
            fastapi_app,
            host="0.0.0.0",
            port=7860,
            log_config=None,   # do not install uvicorn's default logging config
            access_log=False,
            log_level="critical",
        )
    except Exception as e:
        safe_log("error", f"Failed to start TTS WebSocket server: {e}")
        sys.exit(1)


def _run_gradio_mode():
    """Gradio-only mode: web UI with a two-step launch fallback.

    Strategy 1 uses the full configuration; strategy 2 retries with a minimal
    one. If both fail the process exits with status 1.
    """
    safe_log("info", "🎨 Starting TTS service in Gradio-only mode...")
    try:
        safe_log("info", "✅ Gradio Interface: Starting on port 7860...")
        _patch_gradio_logging()
        # Strategy 1: full configuration with stream protection.
        try:
            iface.launch(
                server_name="0.0.0.0",
                server_port=7860,
                share=False,
                quiet=True,
                show_error=False,
                prevent_thread_lock=True,
                max_threads=4,
            )
        except Exception as primary_err:
            safe_log("warning", f"Primary launch failed: {primary_err}")
            # Strategy 2: minimal configuration.
            try:
                iface.launch(
                    server_name="0.0.0.0",
                    server_port=7860,
                    quiet=True,
                )
            except Exception as fallback_err:
                safe_log("error", f"All Gradio launch strategies failed: {fallback_err}")
                sys.exit(1)
    except Exception as e:
        safe_log("error", f"Unexpected error starting Gradio interface: {e}")
        sys.exit(1)


def _run_triple_mode():
    """Triple mode: MCP server in a background thread plus the Gradio UI.

    Uses three launch strategies of decreasing configuration; unlike the
    Gradio-only mode, a final launch exception does NOT exit, to keep the
    process alive for debugging.
    """
    safe_log("info", "🚀 Starting TTS service with dual protocol support...")
    # Start MCP server in a background thread; MCP failure degrades to
    # Gradio-only rather than aborting startup.
    if MCP_AVAILABLE:
        try:
            start_mcp_server_thread()
            safe_log("info", "✅ MCP Server: Available on stdio protocol")
        except Exception as e:
            safe_log("warning", f"⚠️ MCP Server failed to start: {e}")
            safe_log("info", "Continuing with Gradio-only mode...")
    else:
        safe_log("warning", "⚠️ MCP Server: Not available")
    try:
        safe_log("info", "✅ Gradio Interface: Starting on port 7860...")
        _patch_gradio_logging()
        launch_success = False
        # Strategy 1: full configuration tuned for ZeroGPU.
        try:
            iface.launch(
                server_name="0.0.0.0",
                server_port=7860,
                share=False,
                show_error=False,       # disable error display to avoid stream issues
                quiet=True,             # reduce Gradio logging
                max_threads=4,          # limit threads for ZeroGPU
                prevent_thread_lock=True,
                show_tips=False,
                enable_monitoring=False,
            )
            launch_success = True
        except Exception as e1:
            safe_log("warning", f"Primary launch failed: {e1}")
            # Strategy 2: minimal launch configuration.
            try:
                safe_log("info", "Attempting minimal launch configuration...")
                iface.launch(
                    server_name="0.0.0.0",
                    server_port=7860,
                    quiet=True,
                    show_error=False,
                )
                launch_success = True
            except Exception as e2:
                safe_log("warning", f"Minimal launch failed: {e2}")
                # Strategy 3: last resort - bare launch.
                try:
                    safe_log("info", "Attempting basic launch...")
                    iface.launch(quiet=True)
                    launch_success = True
                except Exception as e3:
                    safe_log("error", f"All launch strategies failed: {e3}")
        if not launch_success:
            safe_log("error", "Failed to start Gradio interface with all strategies")
            sys.exit(1)
    except Exception as e:
        safe_log("error", f"Unexpected error starting Gradio interface: {e}")
        # Don't exit - try to continue running for debugging.
        safe_log("info", "Service may still be accessible despite launch errors")


def safe_main():
    """Entry point: select a service mode and start it.

    Wraps everything in a last-resort exception handler so that stream or
    logging conflicts can never crash startup without a diagnostic trace.
    """
    try:
        # Spaces functions already initialized at module level.
        print(f"[TTS-INFO] {_gpu_init}, {_cpu_init}", flush=True)

        # === FINAL SAFETY MEASURES ===
        # Re-disable logging in case an import re-enabled it.
        import logging
        logging.disable(logging.CRITICAL)

        # Disable gradio analytics/tracking if the hook exists. The stub
        # accepts any signature so gradio-internal callers that pass
        # arguments don't hit a TypeError.
        try:
            import gradio.helpers
            if hasattr(gradio.helpers, 'create_tracker'):
                gradio.helpers.create_tracker = lambda *args, **kwargs: None
        except Exception:
            pass

        safe_log("info", "🚀 Initializing TTS service with comprehensive stream protection...")

        # TTS_SERVICE_MODE can be: websocket, gradio, mcp, triple.
        # NOTE(review): forced to "websocket" while debugging environment
        # variables; restore
        #   os.environ.get("TTS_SERVICE_MODE", "websocket").lower()
        # once that is resolved.
        service_mode = "websocket"

        # Log the configuration clearly so the active mode is obvious in logs.
        safe_log("info", "=" * 60)
        safe_log("info", "🎛️ TTS SERVICE CONFIGURATION")
        safe_log("info", "=" * 60)
        safe_log("info", f"Environment Variable: TTS_SERVICE_MODE = {os.environ.get('TTS_SERVICE_MODE', 'NOT SET (using default)')}")
        safe_log("info", f"Detected Mode: {service_mode}")
        safe_log("info", "Available Modes: websocket, gradio, mcp, triple")
        safe_log("info", "Default Mode: websocket")
        safe_log("info", "=" * 60)

        if service_mode == "mcp":
            _run_mcp_mode()
        elif service_mode == "websocket":
            _run_websocket_mode()
        elif service_mode == "gradio":
            _run_gradio_mode()
        elif service_mode == "triple":
            _run_triple_mode()
        else:
            safe_log("error", f"❌ Invalid TTS_SERVICE_MODE: {service_mode}")
            safe_log("info", "Valid modes: websocket (default), gradio, mcp, triple")
            safe_log("info", "Set environment variable: TTS_SERVICE_MODE=websocket")
            sys.exit(1)
    except Exception as e:
        # Ultimate safety net: even safe_log may be broken at this point.
        try:
            safe_log("critical", f"Critical error in main: {e}")
        except Exception:
            print(f"[TTS-CRITICAL] Fatal error: {e}", flush=True)
        # Best-effort debug info before exiting.
        try:
            print("[TTS-DEBUG] Python version:", sys.version, flush=True)
            print("[TTS-DEBUG] Current working directory:", os.getcwd(), flush=True)
            if torch.cuda.is_available():
                print(f"[TTS-DEBUG] CUDA available: {torch.cuda.get_device_name(0)}", flush=True)
            else:
                print("[TTS-DEBUG] CUDA not available", flush=True)
        except Exception:
            pass
        sys.exit(1)
# Script entry point: launch the ZeroGPU-optimized TTS app with dual
# protocol support. Guarded so importing this module never starts a server.
if __name__ == "__main__":
    safe_main()