"""ZeroGPU TTS service (Bark) with WebSocket, Gradio, and MCP front-ends.

This module begins with an aggressive "stream protection" preamble: on
Hugging Face ZeroGPU Spaces, libraries that reconfigure logging or write
to closed stdio streams can crash the process, so stdout/stderr are
replaced and logging is disabled BEFORE anything else is imported.
"""
import os
import sys
import warnings
from typing import List, Dict, Any, Optional

# === CRITICAL: COMPLETE STREAM PROTECTION SETUP ===
# This must happen BEFORE any other imports that might configure logging

# 1. Completely disable warnings to prevent stream conflicts.
#    Env vars below silence transformers/tokenizers and pin Gradio's
#    server settings before those libraries are imported.
warnings.filterwarnings("ignore")
os.environ["PYTHONWARNINGS"] = "ignore"
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
os.environ["GRADIO_ALLOW_FLAGGING"] = "never"
os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0"
os.environ["GRADIO_SERVER_PORT"] = "7860"

# 2. Replace stdout/stderr with safe alternatives BEFORE any imports
class SafeStream:
    """Safe stream that never raises I/O errors - MCP compatible.

    Forwards writes/flushes to the interpreter's original ``sys.__stdout__``
    / ``sys.__stderr__`` when available, and silently drops output
    otherwise. Installed as ``sys.stdout`` / ``sys.stderr`` below.
    """

    def __init__(self, fallback_name):
        # fallback_name is 'stdout' or 'stderr'; used to look up
        # sys.__stdout__ / sys.__stderr__ dynamically on each call.
        self.fallback_name = fallback_name
        self.closed = False
        # Add buffer attribute for MCP compatibility
        self.buffer = self

    def write(self, text):
        try:
            if hasattr(sys, f'__{self.fallback_name}__'):
                getattr(sys, f'__{self.fallback_name}__').write(text)
            else:
                # Ultimate fallback - do nothing rather than crash
                pass
        except:
            pass  # Never raise exceptions from write (bare except is deliberate here)
        # File-like contract: report number of characters "written".
        return len(text) if isinstance(text, str) else 0

    def flush(self):
        try:
            if hasattr(sys, f'__{self.fallback_name}__'):
                getattr(sys, f'__{self.fallback_name}__').flush()
        except:
            pass

    def isatty(self):
        return False  # Always return False to prevent tty-related errors

    def fileno(self):
        raise OSError("fileno not supported")  # Prevent fileno access

    def readable(self):
        return False  # For MCP compatibility

    def writable(self):
        return True  # For MCP compatibility

    def seekable(self):
        return False  # For MCP compatibility

# Install safe streams BEFORE any other imports
sys.stdout = SafeStream('stdout')
sys.stderr = SafeStream('stderr')

# 3. Completely disable the logging module to prevent any stream conflicts
import logging
logging.disable(logging.CRITICAL)

# 4. Patch uvicorn.Config to prevent it from configuring logging
# Monkey-patch uvicorn's logging configuration so it never touches the
# (now-replaced) stdio streams. If uvicorn is not importable yet, the
# later post-import patch section handles it.
try:
    import uvicorn.config
    original_configure_logging = uvicorn.config.Config.configure_logging

    def patched_configure_logging(self):
        """Completely disable uvicorn logging configuration"""
        # Do absolutely nothing - prevent uvicorn from touching streams
        pass

    uvicorn.config.Config.configure_logging = patched_configure_logging
except:
    pass  # If uvicorn not available yet, we'll patch it later

# 5. Now safe to import other modules
import gradio as gr
import torch
import torchaudio
from transformers import AutoProcessor, BarkModel
import numpy as np
import io
import time
from huggingface_hub import login
import spaces  # Required for ZeroGPU

# Dummy GPU function to satisfy ZeroGPU startup requirements
# This ensures @spaces.GPU is detected during startup even in WebSocket-only mode
@spaces.GPU
def _dummy_gpu_function():
    """Dummy function to satisfy ZeroGPU startup detection"""
    return "GPU available"

# Regular CPU function (no decorator needed)
def _dummy_cpu_function():
    """Regular CPU function for system info"""
    return "CPU available"

# Initialize functions at module level for Spaces detection
# (called at import time on purpose so Spaces sees a @spaces.GPU call path)
_gpu_init = _dummy_gpu_function()
_cpu_init = _dummy_cpu_function()

import asyncio
import threading
import json
import base64
from datetime import datetime
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

# 6. Additional uvicorn patching after import
# Patch uvicorn again now that it is definitely importable: force every
# Server/run invocation to ship with logging disabled.
try:
    import uvicorn
    import uvicorn.server
    import uvicorn.main

    # Patch uvicorn.Server to disable logging
    if hasattr(uvicorn.server, 'Server'):
        original_init = uvicorn.server.Server.__init__

        def patched_init(self, config):
            # Force disable logging in config
            config.log_config = None
            config.access_log = False
            config.log_level = "critical"
            original_init(self, config)

        uvicorn.server.Server.__init__ = patched_init

    # Patch uvicorn.run to disable logging
    original_run = uvicorn.run

    def patched_run(*args, **kwargs):
        kwargs['log_config'] = None
        kwargs['access_log'] = False
        kwargs['log_level'] = 'critical'
        return original_run(*args, **kwargs)

    uvicorn.run = patched_run
except:
    pass

# 7. Disable specific library loggers that cause conflicts
try:
    for logger_name in [
        'httpx', 'gradio', 'uvicorn', 'transformers', 'torch', 'torchaudio',
        'bark', 'scipy', 'asyncio', 'ffmpeg', 'uvicorn.access',
        'uvicorn.error', 'gradio.routes'
    ]:
        logger = logging.getLogger(logger_name)
        logger.disabled = True
        logger.propagate = False
        logger.handlers = []
        logger.setLevel(logging.CRITICAL + 1)
except Exception:
    pass  # Ignore any logging setup errors

# 8. Also disable root logger handlers to prevent conflicts
try:
    root_logger = logging.getLogger()
    root_logger.handlers = []
    root_logger.disabled = True
    root_logger.setLevel(logging.CRITICAL + 1)
except Exception:
    pass

# Simple print-based logging to avoid all stream conflicts
def safe_log(level, message):
    """Bulletproof logging using only print statements"""
    # print() goes through the SafeStream installed above, so this can
    # never raise even if the real stdio is closed.
    print(f"[TTS-{level.upper()}] {message}", flush=True)

# MCP Server imports (optional dependency; service degrades gracefully)
try:
    from mcp.server import Server
    from mcp.types import Tool, TextContent
    import mcp.server.stdio
    MCP_AVAILABLE = True
except ImportError:
    MCP_AVAILABLE = False
    safe_log("warning", "MCP not available. Install with: pip install mcp>=1.0.0")

# Logging completely disabled to prevent stream conflicts in ZeroGPU

# MCP Server instance
mcp_server = None
if MCP_AVAILABLE:
    mcp_server = Server("tts-gpu-service")

# Global variables for model (lazily initialized by load_model())
processor = None
model = None
device = None

def load_model():
    """Load the TTS model - optimized for ZeroGPU.

    Populates the module-level ``processor``, ``model`` and ``device``
    globals. Returns True on success, False on any load failure.
    """
    global processor, model, device
    safe_log("info", "Loading TTS model for ZeroGPU...")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    safe_log("info", f"Using device: {device}")
    try:
        # Use Bark model for high-quality TTS
        processor = AutoProcessor.from_pretrained("suno/bark-small")
        model = BarkModel.from_pretrained(
            "suno/bark-small",
            # fp16 on GPU saves memory; CPU stays in fp32.
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto" if torch.cuda.is_available() else None
        )
        # Ensure model is on the correct device
        if torch.cuda.is_available():
            model = model.to(device)
        safe_log("info", f"TTS model loaded successfully on {device}!")
        return True
    except Exception as e:
        safe_log("error", f"Error loading model: {e}")
        return False

@spaces.GPU  # This decorator enables ZeroGPU for this function
def synthesize_speech(text, voice_preset="v2/en_speaker_6"):
    """Synthesize speech from text - ZeroGPU accelerated.

    Returns a ``(audio_path, status_message)`` tuple; ``audio_path`` is
    None on failure. The model is loaded lazily on first call.
    """
    global processor, model, device
    if not text.strip():
        return None, "Please enter some text to synthesize."
    try:
        # Load model if not already loaded
        if model is None:
            success = load_model()
            if not success:
                return None, "Error: Could not load TTS model."
        safe_log("info", f"Synthesizing with ZeroGPU: {text[:50]}...")
        start_time = time.time()
        # Process text with voice preset - ensure return_tensors='pt'
        inputs = processor(text, voice_preset=voice_preset, return_tensors="pt")
        # Generate audio with ZeroGPU acceleration
        with torch.no_grad():
            # Ensure all inputs are on the correct device
            if torch.cuda.is_available() and device.type == 'cuda':
                # Move all tensor inputs to GPU recursively
                def move_to_device(obj, target_device):
                    if isinstance(obj, torch.Tensor):
                        return obj.to(target_device)
                    elif isinstance(obj, dict):
                        return {k: move_to_device(v, target_device) for k, v in obj.items()}
                    elif isinstance(obj, list):
                        return [move_to_device(item, target_device) for item in obj]
                    elif isinstance(obj, tuple):
                        return tuple(move_to_device(item, target_device) for item in obj)
                    else:
                        return obj
                inputs = move_to_device(inputs, device)
                # Also ensure model is on correct device
                model = model.to(device)
                # Debug: log device info
                safe_log("info", f"Model device: {next(model.parameters()).device}")
                for k, v in inputs.items():
                    if isinstance(v, torch.Tensor):
                        safe_log("info", f"Input {k} device: {v.device}")
            # Generate without mixed precision first to isolate the issue
            try:
                audio_array = model.generate(**inputs)
            except Exception as e:
                safe_log("error", f"Generation failed: {e}")
                # Try with CPU fallback
                # NOTE(review): move_to_device is only defined inside the
                # CUDA branch above; if generation fails on a CPU-only run
                # this fallback raises NameError — confirm intended.
                safe_log("info", "Attempting CPU fallback...")
                model = model.cpu()
                inputs = move_to_device(inputs, torch.device('cpu'))
                audio_array = model.generate(**inputs)
        # Convert to numpy and ensure it's on CPU with correct dtype
        if torch.cuda.is_available():
            audio_array = audio_array.cpu()
        # Convert to float32 for torchaudio compatibility
        if audio_array.dtype == torch.float16:
            audio_array = audio_array.float()
        audio_array = audio_array.numpy().squeeze()
        # Get sample rate
        sample_rate = model.generation_config.sample_rate
        # Save to temporary file for Gradio - ensure float32 tensor
        # NOTE(review): fixed filename means concurrent requests overwrite
        # each other's output — consider tempfile.mkstemp.
        output_path = "temp_audio.wav"
        audio_tensor = torch.from_numpy(audio_array).unsqueeze(0).float()
        torchaudio.save(output_path, audio_tensor, sample_rate)
        generation_time = time.time() - start_time
        gpu_name = torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU"
        status_message = f"✅ Generated in {generation_time:.2f}s on {gpu_name} (ZeroGPU)"
        return output_path, status_message
    except Exception as e:
        error_msg = f"❌ Error during synthesis: {str(e)}"
        safe_log("error", error_msg)
        return None, error_msg

@spaces.GPU  # ZeroGPU for batch processing
def batch_synthesize(text_list, voice_preset="v2/en_speaker_6"):
    """Batch synthesis with ZeroGPU optimization.

    Returns ``(results, batch_status)`` where results is a list of
    ``(audio_path, per_item_status)`` tuples, one per input text.
    """
    results = []
    start_time = time.time()
    for i, text in enumerate(text_list):
        if text.strip():
            audio_path, status = synthesize_speech(text, voice_preset)
            results.append((audio_path, f"Item {i+1}: {status}"))
        else:
            results.append((None, f"Item {i+1}: Empty text skipped"))
    total_time = time.time() - start_time
    batch_status = f"🚀 Batch completed: {len(text_list)} items in {total_time:.2f}s"
    return results, batch_status

def get_system_info():
    """Get system information including ZeroGPU details.

    Returns a human-readable multi-line "key: value" string shown in the
    Gradio status panel.
    """
    info = {
        "🚀 ZeroGPU": "Active" if torch.cuda.is_available() else "Not Available",
        "🎯 GPU Name": torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU Only",
        "💾 GPU Memory": f"{torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB" if torch.cuda.is_available() else "N/A",
        "⚡ CUDA Version": torch.version.cuda if torch.cuda.is_available() else "N/A",
        "🔧 PyTorch": torch.__version__,
        "🤖 Model Status": "✅ Loaded" if model is not None else "💤 Lazy Loading (ZeroGPU optimized)",
        "🎛️ Mixed Precision": "✅ Enabled" if torch.cuda.is_available() else "❌ CPU Mode",
        "🔌 MCP Server": "✅ Available" if MCP_AVAILABLE else "❌ Not Available",
        "🌐 WebSocket TTS": "✅ Ready" if model is not None else "💤 Ready (lazy loading)"
    }
    return "\n".join([f"{k}: {v}" for k, v in info.items()])

# WebSocket TTS Handler for Real-time Text-to-Speech
class WebSocketTTSHandler:
    """WebSocket handler for real-time TTS integration with ChatCal WebRTC"""

    def __init__(self):
        # client_id -> WebSocket mapping of currently connected clients
        self.active_connections = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        """Accept WebSocket connection for TTS service"""
        await websocket.accept()
        self.active_connections[client_id] = websocket
        safe_log("info", f"🔌 TTS WebSocket client {client_id} connected")
        # Send connection confirmation with service info
        await self.send_message(client_id, {
            "type": "tts_connection_confirmed",
            "client_id": client_id,
            "timestamp": datetime.now().isoformat(),
            "service": "tts-gpu-service",
            "model_status": "✅ Loaded" if model is not None else "⏳ Loading",
            "zerogpu_status": "✅ Active" if torch.cuda.is_available() else "❌ Not Available",
            "available_voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"]
        })

    async def disconnect(self, client_id: str):
        """Clean up connection"""
        if client_id in self.active_connections:
            del self.active_connections[client_id]
            safe_log("info", f"🔌 TTS WebSocket client {client_id} disconnected")

    async def send_message(self, client_id: str, message: dict):
        """Send JSON message to client.

        Drops the client from active_connections if the send fails.
        """
        if client_id in self.active_connections:
            websocket = self.active_connections[client_id]
            try:
                await websocket.send_text(json.dumps(message))
            except Exception as e:
                safe_log("error", f"Failed to send message to TTS client {client_id}: {e}")
                await self.disconnect(client_id)

    async def handle_streaming_text_synthesis(self, client_id: str, text_chunks: list, voice_preset: str = "v2/en_speaker_6", is_final: bool = True):
        """Process streaming text synthesis following unmute.sh methodology.

        While ``is_final`` is False only progress messages are sent; when
        True all buffered chunks are joined and synthesized in one shot
        (the "flush trick") and the audio is returned base64-encoded.
        """
        try:
            # UNMUTE.SH METHODOLOGY: Process text chunks in streaming fashion
            safe_log("info", f"🔊 TTS STREAMING: Processing {len(text_chunks)} chunks from {client_id} (final={is_final})")
            if is_final:
                # FLUSH TRICK: Process all accumulated text at once for best quality
                complete_text = " ".join(text_chunks).strip()
                if complete_text:
                    safe_log("info", f"🔊 TTS FLUSH: Final synthesis for {client_id}: {complete_text[:50]}...")
                    # Use the existing ZeroGPU synthesize_speech function
                    audio_path, status = synthesize_speech(complete_text, voice_preset)
                    if audio_path and "✅" in status:
                        # Read the generated audio file
                        with open(audio_path, 'rb') as audio_file:
                            audio_data = audio_file.read()
                        # Encode audio as base64 for WebSocket transmission
                        audio_b64 = base64.b64encode(audio_data).decode('utf-8')
                        # Send successful synthesis with streaming metadata
                        await self.send_message(client_id, {
                            "type": "tts_streaming_response",
                            "audio_data": audio_b64,
                            "audio_format": "wav",
                            "text": complete_text,
                            "text_chunks": text_chunks,
                            "voice_preset": voice_preset,
                            "timestamp": datetime.now().isoformat(),
                            "audio_size": len(audio_data),
                            "status": status,
                            "is_final": is_final,
                            "streaming_method": "unmute.sh_flush_trick"
                        })
                        safe_log("info", f"🔊 TTS STREAMING: Final audio sent to {client_id} ({len(audio_data)} bytes)")
                        # Clean up temporary file (best effort)
                        import os
                        try:
                            os.unlink(audio_path)
                        except:
                            pass
                    else:
                        # Send error message
                        await self.send_message(client_id, {
                            "type": "tts_streaming_error",
                            "message": f"TTS streaming synthesis failed: {status}",
                            "text": complete_text,
                            "is_final": is_final,
                            "timestamp": datetime.now().isoformat()
                        })
                else:
                    # Empty final flush
                    safe_log("info", f"🔊 TTS FLUSH: Empty final text for {client_id}")
            else:
                # STREAMING: Send partial progress update (no audio yet)
                await self.send_message(client_id, {
                    "type": "tts_streaming_progress",
                    "message": f"Buffering text chunks: {len(text_chunks)}",
                    "text_chunks": text_chunks[-3:],  # Show last 3 chunks for progress
                    "is_final": is_final,
                    "timestamp": datetime.now().isoformat()
                })
                safe_log("info", f"🔊 TTS STREAMING: Progress update sent to {client_id} ({len(text_chunks)} chunks)")
        except Exception as e:
            safe_log("error", f"TTS streaming error for {client_id}: {e}")
            await self.send_message(client_id, {
                "type": "tts_streaming_error",
                "message": f"TTS streaming error: {str(e)}",
                "is_final": is_final,
                "timestamp": datetime.now().isoformat()
            })

    async def handle_text_synthesis(self, client_id: str, text: str, voice_preset: str = "v2/en_speaker_6"):
        """Process text synthesis with real TTS service (legacy single-shot method)"""
        try:
            safe_log("info", f"🔊 TTS: Processing text from {client_id}: {text[:50]}...")
            # Use streaming method with single chunk for consistency
            await self.handle_streaming_text_synthesis(client_id, [text], voice_preset, is_final=True)
        except Exception as e:
            safe_log("error", f"TTS WebSocket error for {client_id}: {e}")
            await self.send_message(client_id, {
                "type": "tts_error",
                "message": f"TTS processing error: {str(e)}",
                "timestamp": datetime.now().isoformat()
            })

    async def handle_message(self, client_id: str, message_data: dict):
        """Handle different types of WebSocket messages.

        Dispatches on the ``type`` field: tts_synthesize,
        tts_streaming_synthesize, tts_get_voices, tts_get_streaming_info.
        """
        message_type = message_data.get("type")
        if message_type == "tts_synthesize":
            # Text-to-speech synthesis request (legacy single-shot)
            text = message_data.get("text", "")
            voice_preset = message_data.get("voice_preset", "v2/en_speaker_6")
            if text.strip():
                await self.handle_text_synthesis(client_id, text, voice_preset)
            else:
                await self.send_message(client_id, {
                    "type": "tts_error",
                    "message": "Empty text provided for synthesis",
                    "timestamp": datetime.now().isoformat()
                })
        elif message_type == "tts_streaming_synthesize":
            # Streaming text-to-speech synthesis request (unmute.sh methodology)
            text_chunks = message_data.get("text_chunks", [])
            voice_preset = message_data.get("voice_preset", "v2/en_speaker_6")
            is_final = message_data.get("is_final", True)
            if text_chunks:
                await self.handle_streaming_text_synthesis(client_id, text_chunks, voice_preset, is_final)
            else:
                await self.send_message(client_id, {
                    "type": "tts_streaming_error",
                    "message": "Empty text chunks provided for streaming synthesis",
                    "is_final": is_final,
                    "timestamp": datetime.now().isoformat()
                })
        elif message_type == "tts_get_voices":
            # Request available voice presets
            await self.send_message(client_id, {
                "type": "tts_voices_list",
                "voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"],
                "timestamp": datetime.now().isoformat()
            })
        elif message_type == "tts_get_streaming_info":
            # Request streaming capabilities info
            await self.send_message(client_id, {
                "type": "tts_streaming_info",
                "streaming_supported": True,
                "methodology": "unmute.sh with flush trick",
                "message_types": {
                    "tts_streaming_synthesize": "Send text chunks for streaming processing",
                    "tts_streaming_response": "Receive final audio with streaming metadata",
                    "tts_streaming_progress": "Receive progress updates during buffering",
                    "tts_streaming_error": "Receive streaming-specific error messages"
                },
                "flush_trick": "Set is_final=true to trigger synthesis of all buffered chunks",
                "timestamp": datetime.now().isoformat()
            })
        else:
            safe_log("warning", f"Unknown TTS message type from {client_id}: {message_type}")

# Global TTS WebSocket handler
tts_websocket_handler = WebSocketTTSHandler()

# FastAPI WebSocket Integration for TTS Service
def create_tts_fastapi_app():
    """Create FastAPI app with TTS WebSocket endpoint.

    Registers two WebSocket endpoints (auto-generated and caller-chosen
    client id), plus status/health/preload HTTP endpoints. Returns the
    configured FastAPI application.
    """
    import uuid
    app = FastAPI(
        title="TTS GPU Service WebSocket API",
        description="Real-time Text-to-Speech with ZeroGPU acceleration",
        version="1.0.0"
    )

    @app.websocket("/ws/tts")
    async def tts_websocket_endpoint(websocket: WebSocket):
        """WebSocket endpoint for real-time TTS"""
        client_id = str(uuid.uuid4())
        try:
            await tts_websocket_handler.connect(websocket, client_id)
            while True:
                # Receive JSON message from client
                data = await websocket.receive_text()
                try:
                    message = json.loads(data)
                    await tts_websocket_handler.handle_message(client_id, message)
                except json.JSONDecodeError as e:
                    await tts_websocket_handler.send_message(client_id, {
                        "type": "tts_error",
                        "message": f"Invalid JSON format: {str(e)}",
                        "timestamp": datetime.now().isoformat()
                    })
        except WebSocketDisconnect:
            await tts_websocket_handler.disconnect(client_id)
        except Exception as e:
            safe_log("error", f"TTS WebSocket endpoint error: {e}")
            await tts_websocket_handler.disconnect(client_id)

    @app.websocket("/ws/tts/{client_id}")
    async def tts_websocket_with_id(websocket: WebSocket, client_id: str):
        """WebSocket endpoint with specific client ID"""
        try:
            await tts_websocket_handler.connect(websocket, client_id)
            while True:
                data = await websocket.receive_text()
                try:
                    message = json.loads(data)
                    await tts_websocket_handler.handle_message(client_id, message)
                except json.JSONDecodeError as e:
                    await tts_websocket_handler.send_message(client_id, {
                        "type": "tts_error",
                        "message": f"Invalid JSON format: {str(e)}",
                        "timestamp": datetime.now().isoformat()
                    })
        except WebSocketDisconnect:
            await tts_websocket_handler.disconnect(client_id)
        except Exception as e:
            safe_log("error", f"TTS WebSocket endpoint error: {e}")
            await tts_websocket_handler.disconnect(client_id)

    @app.get("/")
    async def tts_root():
        """TTS service status endpoint"""
        return {
            "service": "tts-gpu-service",
            "status": "✅ Ready" if model is not None else "⏳ Loading",
            "zerogpu": "✅ Active" if torch.cuda.is_available() else "❌ Not Available",
            "websocket_endpoints": ["/ws/tts", "/ws/tts/{client_id}"],
            "available_voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"],
            "model": "suno/bark"
        }

    @app.get("/health")
    async def tts_health():
        """Health check endpoint with detailed status"""
        return {
            "status": "healthy",
            "model_loaded": model is not None,
            "gpu_available": torch.cuda.is_available(),
            "loading_strategy": "lazy (ZeroGPU optimized)",
            "note": "Model loads on first synthesis request to optimize GPU usage"
        }

    async def preload_model_logic():
        """Shared logic for model preloading"""
        global model
        if model is not None:
            return {"status": "success", "message": "Model already loaded", "model_loaded": True}
        try:
            success = load_model()
            if success:
                return {"status": "success", "message": "Model preloaded successfully", "model_loaded": True}
            else:
                return {"status": "error", "message": "Failed to preload model", "model_loaded": False}
        except Exception as e:
            return {"status": "error", "message": f"Preload error: {str(e)}", "model_loaded": False}

    @app.get("/preload")
    async def preload_model_get():
        """Preload the TTS model via GET (browser-friendly)"""
        return await preload_model_logic()

    @app.post("/preload")
    async def preload_model_post():
        """Preload the TTS model via POST (API-friendly)"""
        return await preload_model_logic()

    return app

# MCP Tool Definitions and Handlers
if MCP_AVAILABLE:
    @mcp_server.list_tools()
    async def handle_list_tools() -> List[Tool]:
        """List available MCP tools for TTS service"""
        return [
            Tool(
                name="tts_synthesize",
                description="Synthesize speech from text using Bark TTS model with ZeroGPU acceleration",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "Text to convert to speech"
                        },
                        "voice_preset": {
                            "type": "string",
                            "description": "Voice preset identifier (e.g., 'v2/en_speaker_6', 'v2/en_speaker_1')",
                            "default": "v2/en_speaker_6"
                        }
                    },
                    "required": ["text"]
                }
            ),
            Tool(
                name="tts_batch_synthesize",
                description="Synthesize speech from multiple texts in batch with ZeroGPU optimization",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text_list": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "List of texts to convert to speech"
                        },
                        "voice_preset": {
                            "type": "string",
                            "description": "Voice preset for all texts",
                            "default": "v2/en_speaker_6"
                        }
                    },
                    "required": ["text_list"]
                }
            ),
            Tool(
                name="tts_get_info",
                description="Get system information including ZeroGPU status and TTS service capabilities",
                inputSchema={
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            )
        ]

    @mcp_server.call_tool()
    async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle MCP tool calls for TTS operations.

        Dispatches to the module-level synthesis helpers and wraps results
        as JSON TextContent; errors are reported in-band, never raised.
        """
        try:
            if name == "tts_synthesize":
                text = arguments.get("text", "")
                voice_preset = arguments.get("voice_preset", "v2/en_speaker_6")
                if not text.strip():
                    return [TextContent(
                        type="text",
                        text=json.dumps({
                            "error": "No text provided for synthesis",
                            "status": "❌ Empty text",
                            "audio_file": None
                        })
                    )]
                # Use the existing synthesize_speech function
                audio_path, status = synthesize_speech(text, voice_preset)
                result = {
                    "status": status,
                    "audio_file": audio_path,
                    "text": text,
                    "voice_preset": voice_preset,
                    "success": audio_path is not None
                }
                return [TextContent(
                    type="text",
                    text=json.dumps(result, indent=2)
                )]
            elif name == "tts_batch_synthesize":
                text_list = arguments.get("text_list", [])
                voice_preset = arguments.get("voice_preset", "v2/en_speaker_6")
                if not text_list:
                    return [TextContent(
                        type="text",
                        text=json.dumps({
                            "error": "No texts provided for batch synthesis",
                            "status": "❌ Empty list",
                            "results": []
                        })
                    )]
                # Use the existing batch_synthesize function
                results, batch_status = batch_synthesize(text_list, voice_preset)
                # Format results for MCP response
                formatted_results = []
                for i, (audio_path, status) in enumerate(results):
                    formatted_results.append({
                        "index": i,
                        "text": text_list[i] if i < len(text_list) else "",
                        "audio_file": audio_path,
                        "status": status,
                        "success": audio_path is not None
                    })
                result = {
                    "batch_status": batch_status,
                    "results": formatted_results,
                    "total_items": len(text_list),
                    "voice_preset": voice_preset
                }
                return [TextContent(
                    type="text",
                    text=json.dumps(result, indent=2)
                )]
            elif name == "tts_get_info":
                # Use the existing get_system_info function
                system_info = get_system_info()
                # Also include MCP-specific information
                # (VOICE_PRESETS is defined later at module level; it is
                # available by the time this handler can be called)
                info_dict = {
                    "system_info": system_info,
                    "mcp_status": "✅ MCP Server Active",
                    "available_tools": ["tts_synthesize", "tts_batch_synthesize", "tts_get_info"],
                    "voice_presets": [
                        {"code": code, "description": desc}
                        for code, desc in VOICE_PRESETS
                    ],
                    "service_endpoints": {
                        "gradio_interface": "http://localhost:7860",
                        "mcp_protocol": "stdio"
                    },
                    "model_info": {
                        "name": "suno/bark-small",
                        "type": "Text-to-Speech",
                        "accelerated": "ZeroGPU"
                    }
                }
                return [TextContent(
                    type="text",
                    text=json.dumps(info_dict, indent=2)
                )]
            else:
                return [TextContent(
                    type="text",
                    text=json.dumps({
                        "error": f"Unknown tool: {name}",
                        "available_tools": ["tts_synthesize", "tts_batch_synthesize", "tts_get_info"]
                    })
                )]
        except Exception as e:
            safe_log("error", f"Error in MCP tool '{name}': {str(e)}")
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": f"Tool execution failed: {str(e)}",
                    "tool": name,
                    "arguments": arguments
                })
            )]

async def run_mcp_server():
    """Run the MCP server in stdio mode with temporary stream restoration.

    MCP's stdio transport needs the REAL stdin/stdout, so the SafeStream
    wrappers are swapped out for sys.__std*__ while the server runs and
    swapped back in the finally block.
    """
    safe_log("info", "🔌 Starting MCP Server for TTS service...")
    try:
        # Temporarily restore original streams for MCP
        # (these hold the SafeStream wrappers so finally can reinstall them)
        original_stdin = sys.stdin
        original_stdout = sys.stdout
        original_stderr = sys.stderr
        # Restore original streams for MCP operation
        if hasattr(sys, '__stdin__'):
            sys.stdin = sys.__stdin__
        if hasattr(sys, '__stdout__'):
            sys.stdout = sys.__stdout__
        if hasattr(sys, '__stderr__'):
            sys.stderr = sys.__stderr__
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await mcp_server.run(
                read_stream,
                write_stream,
                mcp_server.create_initialization_options()
            )
    except Exception as e:
        safe_log("error", f"MCP Server failed to start: {e}")
        # Don't crash the whole service if MCP fails
        return
    finally:
        # Always restore safe streams after MCP operation
        try:
            sys.stdin = original_stdin
            sys.stdout = original_stdout
            sys.stderr = original_stderr
        except:
            pass

def start_mcp_server_thread():
    """Start MCP server in a separate (daemon) thread"""
    if MCP_AVAILABLE:
        def run_mcp():
            try:
                asyncio.run(run_mcp_server())
            except Exception as e:
                safe_log("error", f"MCP Server error: {e}")
        mcp_thread = threading.Thread(target=run_mcp, daemon=True)
        mcp_thread.start()
        safe_log("info", "🔌 MCP Server thread started successfully")
    else:
        safe_log("warning", "⚠️ MCP not available - only Gradio interface will be active")

# Voice preset options with better descriptions
# (code, human-readable label) pairs; referenced by the Gradio dropdowns
# and the MCP tts_get_info tool.
VOICE_PRESETS = [
    ("v2/en_speaker_0", "🗣️ Speaker 0 - Professional Male"),
    ("v2/en_speaker_1", "👩 Speaker 1 - Young Female"),
    ("v2/en_speaker_2", "👨 Speaker 2 - Mature Male"),
    ("v2/en_speaker_3", "🎭 Speaker 3 - Expressive Female"),
    ("v2/en_speaker_4", "📻 Speaker 4 - Radio Voice Male"),
    ("v2/en_speaker_5", "🎪 Speaker 5 - Animated Female"),
    ("v2/en_speaker_6", "🎯 Speaker 6 - Clear Male (Default)"),
    ("v2/en_speaker_7", "🌟 Speaker 7 - Warm Female"),
    ("v2/en_speaker_8", "🎬 Speaker 8 - Narrator Male"),
    ("v2/en_speaker_9", "✨ Speaker 9 - Elegant Female")
]

# Create enhanced Gradio interface for ZeroGPU
# (built at import time so `iface` exists for safe_main's gradio branch)
with gr.Blocks(
    title="🚀 ZeroGPU TTS Service",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    }
    .main-header {
        text-align: center;
        background: rgba(255,255,255,0.1);
        padding: 20px;
        border-radius: 10px;
        margin-bottom: 20px;
    }
    """
) as iface:
    with gr.Row():
        gr.Markdown("""
        # 🚀 ZeroGPU TTS Service
        ## Powered by Hugging Face Pro + Nvidia H200
        Ultra-fast text-to-speech with dynamic GPU scaling
        """)
    with gr.Tabs():
        # Single synthesis tab
        with gr.TabItem("🎤 Single Synthesis"):
            with gr.Row():
                with gr.Column(scale=2):
                    text_input = gr.Textbox(
                        label="📝 Text to Synthesize",
                        placeholder="Enter the text you want to convert to speech...",
                        lines=6,
                        value="Hello! This is a test of the ZeroGPU-accelerated text-to-speech service running on Hugging Face Spaces with Nvidia H200 dynamic resources."
                    )
                    voice_dropdown = gr.Dropdown(
                        choices=[(desc, code) for code, desc in VOICE_PRESETS],
                        value="v2/en_speaker_6",
                        label="🎭 Voice Preset",
                        info="Choose different voice characteristics"
                    )
                    with gr.Row():
                        synthesize_btn = gr.Button("🎵 Generate Speech", variant="primary", size="lg")
                        clear_btn = gr.Button("🗑️ Clear", variant="secondary")
                with gr.Column(scale=1):
                    system_info = gr.Textbox(
                        label="⚙️ ZeroGPU Status",
                        value=get_system_info(),
                        interactive=False,
                        lines=8
                    )
            with gr.Row():
                audio_output = gr.Audio(
                    label="🔊 Generated Speech",
                    type="filepath",
                    autoplay=False
                )
                status_output = gr.Textbox(
                    label="📊 Generation Status",
                    interactive=False,
                    lines=2
                )
        # Batch synthesis tab
        with gr.TabItem("📦 Batch Synthesis"):
            with gr.Row():
                batch_input = gr.Textbox(
                    label="📝 Batch Text (one per line)",
                    placeholder="Enter multiple texts, one per line:\nHello world!\nThis is the second sentence.\nAnd this is the third.",
                    lines=8
                )
                batch_voice = gr.Dropdown(
                    choices=[(desc, code) for code, desc in VOICE_PRESETS],
                    value="v2/en_speaker_6",
                    label="🎭 Voice for All"
                )
            batch_btn = gr.Button("🚀 Generate Batch", variant="primary", size="lg")
            batch_status = gr.Textbox(label="📊 Batch Status", interactive=False)
            batch_results = gr.File(label="📁 Download All Audio Files", file_count="multiple")
        # API Documentation tab
        with gr.TabItem("🔧 API Usage"):
            gr.Markdown("""
            ## 🔌 API Access
            ### Gradio Client API
            Use this service programmatically with the Gradio client:
            ```python
            from gradio_client import Client
            # Connect to your ZeroGPU TTS service
            client = Client("YOUR_USERNAME/tts-gpu-service")
            # Generate speech
            result = client.predict(
                "Hello from the API!",  # text
                "v2/en_speaker_6",  # voice_preset
                api_name="/predict"
            )
            audio_file, status = result
            print(f"Generated: {audio_file}")
            print(f"Status: {status}")
            ```
            ### 🔌 MCP Protocol API
            This service also supports Model Context Protocol (MCP) for integration with AI assistants:
            ```python
            # MCP Client example (Claude Code, etc.)
            import asyncio
            from mcp import ClientSession, StdioServerParameters
            from mcp.client.stdio import stdio_client
            # Connect to TTS service via MCP
            async def use_tts_mcp():
                server_params = StdioServerParameters(
                    command="python",
                    args=["app.py", "--mcp-only"]
                )
                async with stdio_client(server_params) as (read, write):
                    async with ClientSession(read, write) as session:
                        # Initialize connection
                        await session.initialize()
                        # List available tools
                        tools = await session.list_tools()
                        print("Available TTS tools:", [tool.name for tool in tools.tools])
                        # Synthesize speech
                        result = await session.call_tool("tts_synthesize", {
                            "text": "Hello from MCP!",
                            "voice_preset": "v2/en_speaker_6"
                        })
                        print("TTS Result:", result.content[0].text)
            # Run MCP client
            asyncio.run(use_tts_mcp())
            ```
            ### Available MCP Tools:
            - **`tts_synthesize`**: Convert single text to speech
            - **`tts_batch_synthesize`**: Convert multiple texts to speech
            - **`tts_get_info`**: Get system status and capabilities
            ### 🚀 ZeroGPU Benefits:
            - **Dynamic Scaling**: Resources allocated only when needed
            - **H200 Performance**: Latest GPU architecture
            - **Cost Efficient**: No idle costs with Pro subscription
            - **High Throughput**: Optimized for batch processing
            - **Dual Protocols**: Both Gradio API and MCP support
            ### 📊 Performance Metrics:
            - **Single synthesis**: ~0.5-2s depending on text length
            - **Batch processing**: Parallel execution on H200
            - **Memory efficient**: Automatic cleanup after processing
            - **MCP Integration**: Real-time protocol for AI assistants
            """)
    # Examples with ZeroGPU showcase
    gr.Examples(
        examples=[
            ["Welcome to our ZeroGPU-powered text-to-speech service running on Nvidia H200!", "v2/en_speaker_6"],
            ["The quick brown fox jumps over the lazy dog. This sentence tests various phonemes.", "v2/en_speaker_3"],
            ["Artificial intelligence is transforming how we interact with technology using advanced neural networks.", "v2/en_speaker_1"],
            ["This ultra-fast voice synthesis is running on Hugging Face Spaces with dynamic H200 GPU allocation.", "v2/en_speaker_8"],
            ["ZeroGPU technology allows for instant scaling and cost-effective AI model deployment.", "v2/en_speaker_9"]
        ],
        inputs=[text_input, voice_dropdown],
        outputs=[audio_output, status_output],
        fn=synthesize_speech,
        cache_examples=False,
        label="🎯 ZeroGPU Examples"
    )
    # Event handlers with API names
    synthesize_btn.click(
        fn=synthesize_speech,
        inputs=[text_input, voice_dropdown],
        outputs=[audio_output, status_output],
        api_name="predict"
    )
    clear_btn.click(
        fn=lambda: ("", None, ""),
        outputs=[text_input, audio_output, status_output]
    )

    def process_batch(batch_text, voice):
        """Process batch input: one text per line -> (status, audio file list)"""
        texts = [t.strip() for t in batch_text.split('\n') if t.strip()]
        if not texts:
            return "❌ No valid texts found", []
        results, status = batch_synthesize(texts, voice)
        audio_files = [r[0] for r in results if r[0]]
        return status, audio_files

    batch_btn.click(
        fn=process_batch,
        inputs=[batch_input, batch_voice],
        outputs=[batch_status, batch_results]
    )
    # Auto-refresh system info on load
    iface.load(
        fn=get_system_info,
        outputs=[system_info]
    )

def safe_main():
    """Main function with comprehensive error handling and stream protection.

    Selects one of the service modes (websocket / gradio / mcp / triple)
    and starts the corresponding server. NOTE(review): the mode is
    currently hard-coded to "websocket"; the TTS_SERVICE_MODE env var is
    read only for logging.
    """
    try:
        # Spaces functions already initialized at module level
        print(f"[TTS-INFO] {_gpu_init}, {_cpu_init}", flush=True)
        # === FINAL SAFETY MEASURES ===
        # Last-chance protection against any remaining stream conflicts
        # Ensure all logging is completely disabled
        import logging
        logging.disable(logging.CRITICAL)
        # One final attempt to patch any gradio/uvicorn logging that might have been missed
        try:
            import gradio.helpers
            if hasattr(gradio.helpers, 'create_tracker'):
                # Disable gradio analytics/tracking
                original_create_tracker = gradio.helpers.create_tracker
                gradio.helpers.create_tracker = lambda: None
        except:
            pass
        safe_log("info", "🚀 Initializing TTS service with comprehensive stream protection...")
        # Get service mode from environment variable
        # TTS_SERVICE_MODE can be: websocket, gradio, mcp, triple
        # Default: websocket (as requested by user)
        # Force WebSocket mode temporarily while debugging environment variables
        service_mode = "websocket"  # os.environ.get("TTS_SERVICE_MODE", "websocket").lower()
        # Log environment variable details clearly
        safe_log("info", "=" * 60)
        safe_log("info", "🎛️ TTS SERVICE CONFIGURATION")
        safe_log("info", "=" * 60)
        safe_log("info", f"Environment Variable: TTS_SERVICE_MODE = {os.environ.get('TTS_SERVICE_MODE', 'NOT SET (using default)')}")
        safe_log("info", f"Detected Mode: {service_mode}")
        safe_log("info", f"Available Modes: websocket, gradio, mcp, triple")
        safe_log("info", f"Default Mode: websocket")
        safe_log("info", "=" * 60)
        if service_mode == "mcp":
            # MCP-only mode - no Gradio interface
            if MCP_AVAILABLE:
                safe_log("info", "🔌 Starting TTS service in MCP-only mode...")
                try:
                    asyncio.run(run_mcp_server())
                except KeyboardInterrupt:
                    safe_log("info", "MCP server stopped by user")
                except Exception as e:
                    safe_log("error", f"MCP server failed: {e}")
                    sys.exit(1)
            else:
                safe_log("error", "❌ MCP not available but MCP-only mode requested")
                sys.exit(1)
        elif service_mode == "websocket":
            # WebSocket-only mode - FastAPI with TTS WebSocket endpoints (DEFAULT)
            safe_log("info", "🌐 Starting TTS service in WebSocket-only mode (DEFAULT)...")
            try:
                import uvicorn
                fastapi_app = create_tts_fastapi_app()
                # Detect if running on Hugging Face Spaces
                space_id = os.environ.get("SPACE_ID")
                if space_id:
                    base_url = f"https://{space_id.replace('/', '-')}.hf.space"
                    websocket_url = f"wss://{space_id.replace('/', '-')}.hf.space"
                else:
                    base_url = "http://localhost:7860"
                    websocket_url = "ws://localhost:7860"
                safe_log("info", "✅ TTS WebSocket Server: Starting on port 7860...")
                safe_log("info", f"🔗 WebSocket Endpoints: {websocket_url}/ws/tts, {websocket_url}/ws/tts/{{client_id}}")
                safe_log("info", f"📡 Status Endpoint: {base_url}/")
                safe_log("info", f"💚 Health Check: {base_url}/health")
                uvicorn.run(
                    fastapi_app,
                    host="0.0.0.0",
                    port=7860,
                    log_config=None,
                    access_log=False,
                    log_level="critical"
                )
            except Exception as e:
                safe_log("error", f"Failed to start TTS WebSocket server: {e}")
                sys.exit(1)
        elif service_mode == "gradio":
            # Gradio-only mode - Web interface only
            safe_log("info", "🎨 Starting TTS service in Gradio-only mode...")
            # Start Gradio interface with comprehensive error handling and stream protection
            try:
                safe_log("info", "✅ Gradio Interface: Starting on port 7860...")
                # Final attempt to patch any remaining uvicorn logging
                try:
                    import gradio.networking
                    if hasattr(gradio.networking, 'start_server'):
                        original_start_server = gradio.networking.start_server

                        def patched_start_server(*args, **kwargs):
                            # Force disable uvicorn logging
                            if 'log_config' in kwargs:
                                kwargs['log_config'] = None
                            if 'access_log' in kwargs:
                                kwargs['access_log'] = False
                            kwargs.setdefault('log_level', 'critical')
                            return original_start_server(*args, **kwargs)

                        gradio.networking.start_server = patched_start_server
                except:
                    pass
                # Try multiple launch strategies with failsafe
                launch_success = False
                # Strategy 1: Primary launch with error handling
                try:
                    iface.launch(
                        server_name="0.0.0.0",
                        server_port=7860,
                        share=False,
                        quiet=True,
                        show_error=False,
                        prevent_thread_lock=True,
                        max_threads=4
                    )
                    launch_success = True
                except Exception as e1:
                    safe_log("warning", f"Primary launch failed: {e1}")
                    # Strategy 2: Minimal configuration
                    try:
                        iface.launch(
                            server_name="0.0.0.0",
                            server_port=7860,
                            quiet=True
                        )
                        launch_success = True
                    except Exception as e2:
                        safe_log("error", f"All Gradio launch strategies failed: {e2}")
                        sys.exit(1)
                if not launch_success:
                    safe_log("error", "Failed to start Gradio interface")
                    sys.exit(1)
            except Exception as e:
                safe_log("error",
f"Unexpected error starting Gradio interface: {e}") sys.exit(1) elif service_mode == "triple": # Triple mode - both Gradio, MCP, and WebSocket safe_log("info", "🚀 Starting TTS service with dual protocol support...") # Start MCP server in background thread with error handling if MCP_AVAILABLE: try: start_mcp_server_thread() safe_log("info", "✅ MCP Server: Available on stdio protocol") except Exception as e: safe_log("warning", f"⚠️ MCP Server failed to start: {e}") safe_log("info", "Continuing with Gradio-only mode...") else: safe_log("warning", "⚠️ MCP Server: Not available") # Start Gradio interface with comprehensive error handling and stream protection try: safe_log("info", "✅ Gradio Interface: Starting on port 7860...") # Final attempt to patch any remaining uvicorn logging try: import gradio.networking if hasattr(gradio.networking, 'start_server'): original_start_server = gradio.networking.start_server def patched_start_server(*args, **kwargs): # Force disable uvicorn logging if 'log_config' in kwargs: kwargs['log_config'] = None if 'access_log' in kwargs: kwargs['access_log'] = False kwargs.setdefault('log_level', 'critical') return original_start_server(*args, **kwargs) gradio.networking.start_server = patched_start_server except: pass # Try multiple launch strategies with failsafe launch_success = False # Strategy 1: Direct launch with stream protection try: iface.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=False, # Disable error display to avoid stream issues quiet=True, # Reduce Gradio logging max_threads=4, # Limit threads for ZeroGPU prevent_thread_lock=True, # Prevent threading issues show_tips=False, # Reduce output enable_monitoring=False # Disable monitoring to reduce logging ) launch_success = True except Exception as e1: safe_log("warning", f"Primary launch failed: {e1}") # Strategy 2: Minimal launch configuration try: safe_log("info", "Attempting minimal launch configuration...") iface.launch( server_name="0.0.0.0", 
server_port=7860, quiet=True, show_error=False ) launch_success = True except Exception as e2: safe_log("warning", f"Minimal launch failed: {e2}") # Strategy 3: Last resort - basic launch try: safe_log("info", "Attempting basic launch...") iface.launch(quiet=True) launch_success = True except Exception as e3: safe_log("error", f"All launch strategies failed: {e3}") if not launch_success: safe_log("error", "Failed to start Gradio interface with all strategies") sys.exit(1) except Exception as e: safe_log("error", f"Unexpected error starting Gradio interface: {e}") # Don't exit - try to continue running for debugging safe_log("info", "Service may still be accessible despite launch errors") else: safe_log("error", f"❌ Invalid TTS_SERVICE_MODE: {service_mode}") safe_log("info", "Valid modes: websocket (default), gradio, mcp, triple") safe_log("info", "Set environment variable: TTS_SERVICE_MODE=websocket") sys.exit(1) except Exception as e: # Ultimate safety net try: safe_log("critical", f"Critical error in main: {e}") except: # Even safe_log failed - use basic print print(f"[TTS-CRITICAL] Fatal error: {e}", flush=True) # Try to provide some debugging info before exiting try: print("[TTS-DEBUG] Python version:", sys.version, flush=True) print("[TTS-DEBUG] Current working directory:", os.getcwd(), flush=True) if torch.cuda.is_available(): print(f"[TTS-DEBUG] CUDA available: {torch.cuda.get_device_name(0)}", flush=True) else: print("[TTS-DEBUG] CUDA not available", flush=True) except: pass sys.exit(1) # Launch the TTS app optimized for ZeroGPU with dual protocol support if __name__ == "__main__": safe_main()