import os
import sys
import warnings
from typing import List, Dict, Any, Optional

# === CRITICAL: COMPLETE STREAM PROTECTION SETUP ===
# This must happen BEFORE any other imports that might configure logging

# 1. Completely disable warnings to prevent stream conflicts
warnings.filterwarnings("ignore")
os.environ["PYTHONWARNINGS"] = "ignore"
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
os.environ["GRADIO_ALLOW_FLAGGING"] = "never"
os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0"
os.environ["GRADIO_SERVER_PORT"] = "7860"


# 2. Replace stdout/stderr with safe alternatives BEFORE any imports
class SafeStream:
    """Safe stream that never raises I/O errors - MCP compatible.

    Drop-in replacement for sys.stdout/sys.stderr: forwards writes to the
    corresponding original stream (sys.__stdout__ / sys.__stderr__) when it
    exists and silently drops output otherwise.
    """

    def __init__(self, fallback_name):
        # fallback_name is 'stdout' or 'stderr'; used to look up the
        # original sys.__stdout__/__stderr__ stream at write time.
        self.fallback_name = fallback_name
        self.closed = False
        # Add buffer attribute for MCP compatibility
        self.buffer = self

    def write(self, text):
        """Forward text to the original stream; never raise.

        Returns len(text) for str input, 0 otherwise (mimics TextIO.write).
        """
        try:
            if hasattr(sys, f'__{self.fallback_name}__'):
                getattr(sys, f'__{self.fallback_name}__').write(text)
            else:
                # Ultimate fallback - do nothing rather than crash
                pass
        except Exception:  # was a bare except; keep KeyboardInterrupt/SystemExit out
            pass  # Never raise exceptions from write
        return len(text) if isinstance(text, str) else 0

    def flush(self):
        """Flush the underlying original stream, swallowing any error."""
        try:
            if hasattr(sys, f'__{self.fallback_name}__'):
                getattr(sys, f'__{self.fallback_name}__').flush()
        except Exception:  # was a bare except
            pass

    def isatty(self):
        return False  # Always return False to prevent tty-related errors

    def fileno(self):
        raise OSError("fileno not supported")  # Prevent fileno access

    def readable(self):
        return False  # For MCP compatibility

    def writable(self):
        return True  # For MCP compatibility

    def seekable(self):
        return False  # For MCP compatibility


# Install safe streams BEFORE any other imports
sys.stdout = SafeStream('stdout')
sys.stderr = SafeStream('stderr')

# 3. Completely disable the logging module to prevent any stream conflicts
import logging
logging.disable(logging.CRITICAL)
# 4. Patch uvicorn.Config to prevent it from configuring logging
try:
    import uvicorn.config

    # Keep a reference to the original in case it is ever needed again.
    original_configure_logging = uvicorn.config.Config.configure_logging

    def patched_configure_logging(self):
        """Completely disable uvicorn logging configuration."""
        # Do absolutely nothing - prevent uvicorn from touching streams
        pass

    uvicorn.config.Config.configure_logging = patched_configure_logging
except Exception:  # was a bare except; narrowed so Ctrl-C still works
    pass  # If uvicorn not available yet, we'll patch it later

# 5. Now safe to import other modules
import gradio as gr
import torch
import torchaudio
from transformers import AutoProcessor, BarkModel
import numpy as np
import io
import time
from huggingface_hub import login
import spaces  # Required for ZeroGPU


# Dummy GPU function to satisfy ZeroGPU startup requirements
# This ensures @spaces.GPU is detected during startup even in WebSocket-only mode
@spaces.GPU
def _dummy_gpu_function():
    """Dummy function to satisfy ZeroGPU startup detection."""
    return "GPU available"


# Regular CPU function (no decorator needed)
def _dummy_cpu_function():
    """Regular CPU function for system info."""
    return "CPU available"


# Initialize functions at module level for Spaces detection
_gpu_init = _dummy_gpu_function()
_cpu_init = _dummy_cpu_function()

import asyncio
import threading
import json
import base64
from datetime import datetime
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
# 6. Additional uvicorn patching after import
try:
    import uvicorn
    import uvicorn.server
    import uvicorn.main

    # Patch uvicorn.Server to disable logging
    if hasattr(uvicorn.server, 'Server'):
        original_init = uvicorn.server.Server.__init__

        def patched_init(self, config):
            # Force disable logging in config
            config.log_config = None
            config.access_log = False
            config.log_level = "critical"
            original_init(self, config)

        uvicorn.server.Server.__init__ = patched_init

    # Patch uvicorn.run to disable logging
    original_run = uvicorn.run

    def patched_run(*args, **kwargs):
        kwargs['log_config'] = None
        kwargs['access_log'] = False
        kwargs['log_level'] = 'critical'
        return original_run(*args, **kwargs)

    uvicorn.run = patched_run
except Exception:  # was a bare except; narrowed
    pass

# 7. Disable specific library loggers that cause conflicts
try:
    for logger_name in [
        'httpx', 'gradio', 'uvicorn', 'transformers', 'torch', 'torchaudio',
        'bark', 'scipy', 'asyncio', 'ffmpeg', 'uvicorn.access',
        'uvicorn.error', 'gradio.routes'
    ]:
        logger = logging.getLogger(logger_name)
        logger.disabled = True
        logger.propagate = False
        logger.handlers = []
        logger.setLevel(logging.CRITICAL + 1)
except Exception:
    pass  # Ignore any logging setup errors

# 8. Also disable root logger handlers to prevent conflicts
try:
    root_logger = logging.getLogger()
    root_logger.handlers = []
    root_logger.disabled = True
    root_logger.setLevel(logging.CRITICAL + 1)
except Exception:
    pass


# Simple print-based logging to avoid all stream conflicts
def safe_log(level, message):
    """Bulletproof logging using only print statements."""
    print(f"[TTS-{level.upper()}] {message}", flush=True)


# MCP Server imports
try:
    from mcp.server import Server
    from mcp.types import Tool, TextContent
    import mcp.server.stdio
    MCP_AVAILABLE = True
except ImportError:
    MCP_AVAILABLE = False
    safe_log("warning", "MCP not available. Install with: pip install mcp>=1.0.0")
# Logging completely disabled to prevent stream conflicts in ZeroGPU

# MCP Server instance
mcp_server = None
if MCP_AVAILABLE:
    mcp_server = Server("tts-gpu-service")

# Global variables for model
processor = None  # AutoProcessor for Bark text preprocessing (lazy loaded)
model = None      # BarkModel instance (lazy loaded)
device = None     # torch.device selected at load time


def load_model():
    """Load the TTS model - optimized for ZeroGPU.

    Populates the module-level ``processor``, ``model`` and ``device``
    globals. Returns True on success, False on failure.
    """
    global processor, model, device

    safe_log("info", "Loading TTS model for ZeroGPU...")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    safe_log("info", f"Using device: {device}")

    try:
        # Use Bark model for high-quality TTS
        processor = AutoProcessor.from_pretrained("suno/bark-small")
        model = BarkModel.from_pretrained(
            "suno/bark-small",
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto" if torch.cuda.is_available() else None
        )
        # Ensure model is on the correct device
        # NOTE(review): calling .to(device) on a model loaded with
        # device_map="auto" can conflict with accelerate's placement —
        # confirm this combination is intended.
        if torch.cuda.is_available():
            model = model.to(device)
        safe_log("info", f"TTS model loaded successfully on {device}!")
        return True
    except Exception as e:
        safe_log("error", f"Error loading model: {e}")
        return False


@spaces.GPU  # This decorator enables ZeroGPU for this function
def synthesize_speech(text, voice_preset="v2/en_speaker_6"):
    """Synthesize speech from text - ZeroGPU accelerated.

    Returns a tuple ``(audio_path, status_message)``; ``audio_path`` is a
    path to a generated WAV file, or None when synthesis fails.
    """
    global processor, model, device

    if not text.strip():
        return None, "Please enter some text to synthesize."

    try:
        # Load model if not already loaded (lazy loading for ZeroGPU)
        if model is None:
            success = load_model()
            if not success:
                return None, "Error: Could not load TTS model."

        safe_log("info", f"Synthesizing with ZeroGPU: {text[:50]}...")
        start_time = time.time()

        # Process text with voice preset - ensure return_tensors='pt'
        inputs = processor(text, voice_preset=voice_preset, return_tensors="pt")

        # Move all tensor inputs to a device recursively (handles dicts,
        # lists and tuples). Defined unconditionally so the CPU fallback
        # below can use it even when the CUDA branch was never taken.
        def move_to_device(obj, target_device):
            if isinstance(obj, torch.Tensor):
                return obj.to(target_device)
            elif isinstance(obj, dict):
                return {k: move_to_device(v, target_device) for k, v in obj.items()}
            elif isinstance(obj, list):
                return [move_to_device(item, target_device) for item in obj]
            elif isinstance(obj, tuple):
                return tuple(move_to_device(item, target_device) for item in obj)
            else:
                return obj

        # Generate audio with ZeroGPU acceleration
        with torch.no_grad():
            if torch.cuda.is_available() and device.type == 'cuda':
                inputs = move_to_device(inputs, device)
                # Also ensure model is on correct device
                model = model.to(device)

                # Debug: log device info
                safe_log("info", f"Model device: {next(model.parameters()).device}")
                for k, v in inputs.items():
                    if isinstance(v, torch.Tensor):
                        safe_log("info", f"Input {k} device: {v.device}")

            # Generate without mixed precision first to isolate the issue
            try:
                audio_array = model.generate(**inputs)
            except Exception as e:
                safe_log("error", f"Generation failed: {e}")
                # Try with CPU fallback
                safe_log("info", "Attempting CPU fallback...")
                model = model.cpu()
                inputs = move_to_device(inputs, torch.device('cpu'))
                audio_array = model.generate(**inputs)

        # Convert to numpy and ensure it's on CPU with correct dtype
        if torch.cuda.is_available():
            audio_array = audio_array.cpu()

        # Convert to float32 for torchaudio compatibility
        if audio_array.dtype == torch.float16:
            audio_array = audio_array.float()
        audio_array = audio_array.numpy().squeeze()

        # Get sample rate
        sample_rate = model.generation_config.sample_rate

        # Save to a unique temporary file for Gradio. A fixed filename
        # ("temp_audio.wav") would be clobbered by concurrent requests.
        import tempfile
        fd, output_path = tempfile.mkstemp(suffix=".wav", prefix="tts_")
        os.close(fd)  # torchaudio.save reopens by path
        audio_tensor = torch.from_numpy(audio_array).unsqueeze(0).float()
        torchaudio.save(output_path, audio_tensor, sample_rate)

        generation_time = time.time() - start_time
        gpu_name = torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU"
        status_message = f"✅ Generated in {generation_time:.2f}s on {gpu_name} (ZeroGPU)"
        return output_path, status_message

    except Exception as e:
        error_msg = f"❌ Error during synthesis: {str(e)}"
        safe_log("error", error_msg)
        return None, error_msg


@spaces.GPU  # ZeroGPU for batch processing
def batch_synthesize(text_list, voice_preset="v2/en_speaker_6"):
    """Batch synthesis with ZeroGPU optimization.

    Returns ``(results, batch_status)`` where results is a list of
    (audio_path, status) tuples, one per input text.
    """
    results = []
    start_time = time.time()
    for i, text in enumerate(text_list):
        if text.strip():
            audio_path, status = synthesize_speech(text, voice_preset)
            results.append((audio_path, f"Item {i+1}: {status}"))
        else:
            results.append((None, f"Item {i+1}: Empty text skipped"))
    total_time = time.time() - start_time
    batch_status = f"🚀 Batch completed: {len(text_list)} items in {total_time:.2f}s"
    return results, batch_status


def get_system_info():
    """Get system information including ZeroGPU details as a display string."""
    info = {
        "🚀 ZeroGPU": "Active" if torch.cuda.is_available() else "Not Available",
        "🎯 GPU Name": torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU Only",
        "💾 GPU Memory": f"{torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB" if torch.cuda.is_available() else "N/A",
        "⚡ CUDA Version": torch.version.cuda if torch.cuda.is_available() else "N/A",
        "🔧 PyTorch": torch.__version__,
        "🤖 Model Status": "✅ Loaded" if model is not None else "💤 Lazy Loading (ZeroGPU optimized)",
        "🎛️ Mixed Precision": "✅ Enabled" if torch.cuda.is_available() else "❌ CPU Mode",
        "🔌 MCP Server": "✅ Available" if MCP_AVAILABLE else "❌ Not Available",
        "🌐 WebSocket TTS": "✅ Ready" if model is not None else "💤 Ready (lazy loading)"
    }
    return "\n".join([f"{k}: {v}" for k, v in info.items()])
"""WebSocket handler for real-time TTS integration with ChatCal WebRTC""" def __init__(self): self.active_connections = {} async def connect(self, websocket: WebSocket, client_id: str): """Accept WebSocket connection for TTS service""" await websocket.accept() self.active_connections[client_id] = websocket safe_log("info", f"🔌 TTS WebSocket client {client_id} connected") # Send connection confirmation with service info await self.send_message(client_id, { "type": "tts_connection_confirmed", "client_id": client_id, "timestamp": datetime.now().isoformat(), "service": "tts-gpu-service", "model_status": "✅ Loaded" if model is not None else "⏳ Loading", "zerogpu_status": "✅ Active" if torch.cuda.is_available() else "❌ Not Available", "available_voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"] }) async def disconnect(self, client_id: str): """Clean up connection""" if client_id in self.active_connections: del self.active_connections[client_id] safe_log("info", f"🔌 TTS WebSocket client {client_id} disconnected") async def send_message(self, client_id: str, message: dict): """Send JSON message to client""" if client_id in self.active_connections: websocket = self.active_connections[client_id] try: await websocket.send_text(json.dumps(message)) except Exception as e: safe_log("error", f"Failed to send message to TTS client {client_id}: {e}") await self.disconnect(client_id) async def handle_streaming_text_synthesis(self, client_id: str, text_chunks: list, voice_preset: str = "v2/en_speaker_6", is_final: bool = True): """Process streaming text synthesis following unmute.sh methodology""" try: # UNMUTE.SH METHODOLOGY: Process text chunks in streaming fashion safe_log("info", f"🔊 TTS STREAMING: Processing {len(text_chunks)} chunks from {client_id} (final={is_final})") if is_final: # FLUSH TRICK: Process all accumulated text at once for best quality complete_text = " ".join(text_chunks).strip() if complete_text: safe_log("info", f"🔊 TTS 
FLUSH: Final synthesis for {client_id}: {complete_text[:50]}...") # Use the existing ZeroGPU synthesize_speech function audio_path, status = synthesize_speech(complete_text, voice_preset) if audio_path and "✅" in status: # Read the generated audio file with open(audio_path, 'rb') as audio_file: audio_data = audio_file.read() # Encode audio as base64 for WebSocket transmission audio_b64 = base64.b64encode(audio_data).decode('utf-8') # Send successful synthesis with streaming metadata await self.send_message(client_id, { "type": "tts_streaming_response", "audio_data": audio_b64, "audio_format": "wav", "text": complete_text, "text_chunks": text_chunks, "voice_preset": voice_preset, "timestamp": datetime.now().isoformat(), "audio_size": len(audio_data), "status": status, "is_final": is_final, "streaming_method": "unmute.sh_flush_trick" }) safe_log("info", f"🔊 TTS STREAMING: Final audio sent to {client_id} ({len(audio_data)} bytes)") # Clean up temporary file import os try: os.unlink(audio_path) except: pass else: # Send error message await self.send_message(client_id, { "type": "tts_streaming_error", "message": f"TTS streaming synthesis failed: {status}", "text": complete_text, "is_final": is_final, "timestamp": datetime.now().isoformat() }) else: # Empty final flush safe_log("info", f"🔊 TTS FLUSH: Empty final text for {client_id}") else: # STREAMING: Send partial progress update (no audio yet) await self.send_message(client_id, { "type": "tts_streaming_progress", "message": f"Buffering text chunks: {len(text_chunks)}", "text_chunks": text_chunks[-3:], # Show last 3 chunks for progress "is_final": is_final, "timestamp": datetime.now().isoformat() }) safe_log("info", f"🔊 TTS STREAMING: Progress update sent to {client_id} ({len(text_chunks)} chunks)") except Exception as e: safe_log("error", f"TTS streaming error for {client_id}: {e}") await self.send_message(client_id, { "type": "tts_streaming_error", "message": f"TTS streaming error: {str(e)}", "is_final": is_final, 
"timestamp": datetime.now().isoformat() }) async def handle_text_synthesis(self, client_id: str, text: str, voice_preset: str = "v2/en_speaker_6"): """Process text synthesis with real TTS service (legacy single-shot method)""" try: safe_log("info", f"🔊 TTS: Processing text from {client_id}: {text[:50]}...") # Use streaming method with single chunk for consistency await self.handle_streaming_text_synthesis(client_id, [text], voice_preset, is_final=True) except Exception as e: safe_log("error", f"TTS WebSocket error for {client_id}: {e}") await self.send_message(client_id, { "type": "tts_error", "message": f"TTS processing error: {str(e)}", "timestamp": datetime.now().isoformat() }) async def handle_message(self, client_id: str, message_data: dict): """Handle different types of WebSocket messages""" message_type = message_data.get("type") if message_type == "tts_synthesize": # Text-to-speech synthesis request (legacy single-shot) text = message_data.get("text", "") voice_preset = message_data.get("voice_preset", "v2/en_speaker_6") if text.strip(): await self.handle_text_synthesis(client_id, text, voice_preset) else: await self.send_message(client_id, { "type": "tts_error", "message": "Empty text provided for synthesis", "timestamp": datetime.now().isoformat() }) elif message_type == "tts_streaming_synthesize": # Streaming text-to-speech synthesis request (unmute.sh methodology) text_chunks = message_data.get("text_chunks", []) voice_preset = message_data.get("voice_preset", "v2/en_speaker_6") is_final = message_data.get("is_final", True) if text_chunks: await self.handle_streaming_text_synthesis(client_id, text_chunks, voice_preset, is_final) else: await self.send_message(client_id, { "type": "tts_streaming_error", "message": "Empty text chunks provided for streaming synthesis", "is_final": is_final, "timestamp": datetime.now().isoformat() }) elif message_type == "tts_get_voices": # Request available voice presets await self.send_message(client_id, { "type": 
"tts_voices_list", "voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"], "timestamp": datetime.now().isoformat() }) elif message_type == "tts_get_streaming_info": # Request streaming capabilities info await self.send_message(client_id, { "type": "tts_streaming_info", "streaming_supported": True, "methodology": "unmute.sh with flush trick", "message_types": { "tts_streaming_synthesize": "Send text chunks for streaming processing", "tts_streaming_response": "Receive final audio with streaming metadata", "tts_streaming_progress": "Receive progress updates during buffering", "tts_streaming_error": "Receive streaming-specific error messages" }, "flush_trick": "Set is_final=true to trigger synthesis of all buffered chunks", "timestamp": datetime.now().isoformat() }) else: safe_log("warning", f"Unknown TTS message type from {client_id}: {message_type}") # Global TTS WebSocket handler tts_websocket_handler = WebSocketTTSHandler() # FastAPI WebSocket Integration for TTS Service def create_tts_fastapi_app(): """Create FastAPI app with TTS WebSocket endpoint""" import uuid app = FastAPI( title="TTS GPU Service WebSocket API", description="Real-time Text-to-Speech with ZeroGPU acceleration", version="1.0.0" ) @app.websocket("/ws/tts") async def tts_websocket_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time TTS""" client_id = str(uuid.uuid4()) try: await tts_websocket_handler.connect(websocket, client_id) while True: # Receive JSON message from client data = await websocket.receive_text() try: message = json.loads(data) await tts_websocket_handler.handle_message(client_id, message) except json.JSONDecodeError as e: await tts_websocket_handler.send_message(client_id, { "type": "tts_error", "message": f"Invalid JSON format: {str(e)}", "timestamp": datetime.now().isoformat() }) except WebSocketDisconnect: await tts_websocket_handler.disconnect(client_id) except Exception as e: safe_log("error", f"TTS WebSocket endpoint error: 
{e}") await tts_websocket_handler.disconnect(client_id) @app.websocket("/ws/tts/{client_id}") async def tts_websocket_with_id(websocket: WebSocket, client_id: str): """WebSocket endpoint with specific client ID""" try: await tts_websocket_handler.connect(websocket, client_id) while True: data = await websocket.receive_text() try: message = json.loads(data) await tts_websocket_handler.handle_message(client_id, message) except json.JSONDecodeError as e: await tts_websocket_handler.send_message(client_id, { "type": "tts_error", "message": f"Invalid JSON format: {str(e)}", "timestamp": datetime.now().isoformat() }) except WebSocketDisconnect: await tts_websocket_handler.disconnect(client_id) except Exception as e: safe_log("error", f"TTS WebSocket endpoint error: {e}") await tts_websocket_handler.disconnect(client_id) @app.get("/") async def tts_root(): """TTS service status endpoint""" return { "service": "tts-gpu-service", "status": "✅ Ready" if model is not None else "⏳ Loading", "zerogpu": "✅ Active" if torch.cuda.is_available() else "❌ Not Available", "websocket_endpoints": ["/ws/tts", "/ws/tts/{client_id}"], "available_voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"], "model": "suno/bark" } @app.get("/health") async def tts_health(): """Health check endpoint with detailed status""" return { "status": "healthy", "model_loaded": model is not None, "gpu_available": torch.cuda.is_available(), "loading_strategy": "lazy (ZeroGPU optimized)", "note": "Model loads on first synthesis request to optimize GPU usage" } async def preload_model_logic(): """Shared logic for model preloading""" global model if model is not None: return {"status": "success", "message": "Model already loaded", "model_loaded": True} try: success = load_model() if success: return {"status": "success", "message": "Model preloaded successfully", "model_loaded": True} else: return {"status": "error", "message": "Failed to preload model", "model_loaded": False} 
except Exception as e: return {"status": "error", "message": f"Preload error: {str(e)}", "model_loaded": False} @app.get("/preload") async def preload_model_get(): """Preload the TTS model via GET (browser-friendly)""" return await preload_model_logic() @app.post("/preload") async def preload_model_post(): """Preload the TTS model via POST (API-friendly)""" return await preload_model_logic() return app # MCP Tool Definitions and Handlers if MCP_AVAILABLE: @mcp_server.list_tools() async def handle_list_tools() -> List[Tool]: """List available MCP tools for TTS service""" return [ Tool( name="tts_synthesize", description="Synthesize speech from text using Bark TTS model with ZeroGPU acceleration", inputSchema={ "type": "object", "properties": { "text": { "type": "string", "description": "Text to convert to speech" }, "voice_preset": { "type": "string", "description": "Voice preset identifier (e.g., 'v2/en_speaker_6', 'v2/en_speaker_1')", "default": "v2/en_speaker_6" } }, "required": ["text"] } ), Tool( name="tts_batch_synthesize", description="Synthesize speech from multiple texts in batch with ZeroGPU optimization", inputSchema={ "type": "object", "properties": { "text_list": { "type": "array", "items": {"type": "string"}, "description": "List of texts to convert to speech" }, "voice_preset": { "type": "string", "description": "Voice preset for all texts", "default": "v2/en_speaker_6" } }, "required": ["text_list"] } ), Tool( name="tts_get_info", description="Get system information including ZeroGPU status and TTS service capabilities", inputSchema={ "type": "object", "properties": {}, "required": [] } ) ] @mcp_server.call_tool() async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Handle MCP tool calls for TTS operations""" try: if name == "tts_synthesize": text = arguments.get("text", "") voice_preset = arguments.get("voice_preset", "v2/en_speaker_6") if not text.strip(): return [TextContent( type="text", text=json.dumps({ 
"error": "No text provided for synthesis", "status": "❌ Empty text", "audio_file": None }) )] # Use the existing synthesize_speech function audio_path, status = synthesize_speech(text, voice_preset) result = { "status": status, "audio_file": audio_path, "text": text, "voice_preset": voice_preset, "success": audio_path is not None } return [TextContent( type="text", text=json.dumps(result, indent=2) )] elif name == "tts_batch_synthesize": text_list = arguments.get("text_list", []) voice_preset = arguments.get("voice_preset", "v2/en_speaker_6") if not text_list: return [TextContent( type="text", text=json.dumps({ "error": "No texts provided for batch synthesis", "status": "❌ Empty list", "results": [] }) )] # Use the existing batch_synthesize function results, batch_status = batch_synthesize(text_list, voice_preset) # Format results for MCP response formatted_results = [] for i, (audio_path, status) in enumerate(results): formatted_results.append({ "index": i, "text": text_list[i] if i < len(text_list) else "", "audio_file": audio_path, "status": status, "success": audio_path is not None }) result = { "batch_status": batch_status, "results": formatted_results, "total_items": len(text_list), "voice_preset": voice_preset } return [TextContent( type="text", text=json.dumps(result, indent=2) )] elif name == "tts_get_info": # Use the existing get_system_info function system_info = get_system_info() # Also include MCP-specific information info_dict = { "system_info": system_info, "mcp_status": "✅ MCP Server Active", "available_tools": ["tts_synthesize", "tts_batch_synthesize", "tts_get_info"], "voice_presets": [ {"code": code, "description": desc} for code, desc in VOICE_PRESETS ], "service_endpoints": { "gradio_interface": "http://localhost:7860", "mcp_protocol": "stdio" }, "model_info": { "name": "suno/bark-small", "type": "Text-to-Speech", "accelerated": "ZeroGPU" } } return [TextContent( type="text", text=json.dumps(info_dict, indent=2) )] else: return [TextContent( 
type="text", text=json.dumps({ "error": f"Unknown tool: {name}", "available_tools": ["tts_synthesize", "tts_batch_synthesize", "tts_get_info"] }) )] except Exception as e: safe_log("error", f"Error in MCP tool '{name}': {str(e)}") return [TextContent( type="text", text=json.dumps({ "error": f"Tool execution failed: {str(e)}", "tool": name, "arguments": arguments }) )] async def run_mcp_server(): """Run the MCP server in stdio mode with temporary stream restoration""" safe_log("info", "🔌 Starting MCP Server for TTS service...") try: # Temporarily restore original streams for MCP original_stdin = sys.stdin original_stdout = sys.stdout original_stderr = sys.stderr # Restore original streams for MCP operation if hasattr(sys, '__stdin__'): sys.stdin = sys.__stdin__ if hasattr(sys, '__stdout__'): sys.stdout = sys.__stdout__ if hasattr(sys, '__stderr__'): sys.stderr = sys.__stderr__ async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await mcp_server.run( read_stream, write_stream, mcp_server.create_initialization_options() ) except Exception as e: safe_log("error", f"MCP Server failed to start: {e}") # Don't crash the whole service if MCP fails return finally: # Always restore safe streams after MCP operation try: sys.stdin = original_stdin sys.stdout = original_stdout sys.stderr = original_stderr except: pass def start_mcp_server_thread(): """Start MCP server in a separate thread""" if MCP_AVAILABLE: def run_mcp(): try: asyncio.run(run_mcp_server()) except Exception as e: safe_log("error", f"MCP Server error: {e}") mcp_thread = threading.Thread(target=run_mcp, daemon=True) mcp_thread.start() safe_log("info", "🔌 MCP Server thread started successfully") else: safe_log("warning", "⚠️ MCP not available - only Gradio interface will be active") # Voice preset options with better descriptions VOICE_PRESETS = [ ("v2/en_speaker_0", "🗣️ Speaker 0 - Professional Male"), ("v2/en_speaker_1", "👩 Speaker 1 - Young Female"), ("v2/en_speaker_2", "👨 Speaker 2 - 
Mature Male"), ("v2/en_speaker_3", "🎭 Speaker 3 - Expressive Female"), ("v2/en_speaker_4", "📻 Speaker 4 - Radio Voice Male"), ("v2/en_speaker_5", "🎪 Speaker 5 - Animated Female"), ("v2/en_speaker_6", "🎯 Speaker 6 - Clear Male (Default)"), ("v2/en_speaker_7", "🌟 Speaker 7 - Warm Female"), ("v2/en_speaker_8", "🎬 Speaker 8 - Narrator Male"), ("v2/en_speaker_9", "✨ Speaker 9 - Elegant Female") ] # Create enhanced Gradio interface for ZeroGPU with gr.Blocks( title="🚀 ZeroGPU TTS Service", theme=gr.themes.Soft(), css=""" .gradio-container { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); } .main-header { text-align: center; background: rgba(255,255,255,0.1); padding: 20px; border-radius: 10px; margin-bottom: 20px; } """ ) as iface: with gr.Row(): gr.Markdown("""