Spaces:
Sleeping
Sleeping
import os
import sys
import warnings
from typing import List, Dict, Any, Optional

# === CRITICAL: COMPLETE STREAM PROTECTION SETUP ===
# This must happen BEFORE any other imports that might configure logging.

# 1. Completely disable warnings to prevent stream conflicts.
warnings.filterwarnings("ignore")
os.environ["PYTHONWARNINGS"] = "ignore"
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
# Avoid tokenizer fork warnings/deadlocks in multi-threaded serving.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
os.environ["GRADIO_ALLOW_FLAGGING"] = "never"
# Bind Gradio on all interfaces, port 7860 (the Spaces default).
os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0"
os.environ["GRADIO_SERVER_PORT"] = "7860"

# 2. Replace stdout/stderr with safe alternatives BEFORE any imports.
class SafeStream:
    """Safe stream that never raises I/O errors - MCP compatible.

    Installed in place of ``sys.stdout``/``sys.stderr`` so libraries that
    write to a closed or redirected stream cannot crash the process.
    """

    def __init__(self, fallback_name):
        # 'stdout' or 'stderr': used to locate sys.__stdout__/sys.__stderr__.
        self.fallback_name = fallback_name
        self.closed = False
        # Some consumers (e.g. MCP) access .buffer; point it back at self.
        self.buffer = self

    def write(self, text):
        """Best-effort write to the interpreter's original stream.

        Never raises; returns the number of characters "written".
        """
        try:
            # BUGFIX: sys.__stdout__/__stderr__ can be None (e.g. pythonw);
            # the old hasattr() check still dereferenced a None stream.
            fallback = getattr(sys, f'__{self.fallback_name}__', None)
            if fallback is not None:
                fallback.write(text)
            # else: ultimate fallback - do nothing rather than crash.
        except Exception:
            pass  # Never raise exceptions from write.
        return len(text) if isinstance(text, str) else 0

    def flush(self):
        """Best-effort flush; never raises."""
        try:
            fallback = getattr(sys, f'__{self.fallback_name}__', None)
            if fallback is not None:
                fallback.flush()
        except Exception:
            pass

    def isatty(self):
        return False  # Always False to prevent tty-related errors.

    def fileno(self):
        # Deliberately unsupported: callers must not bypass this wrapper.
        raise OSError("fileno not supported")

    def readable(self):
        return False  # For MCP compatibility.

    def writable(self):
        return True  # For MCP compatibility.

    def seekable(self):
        return False  # For MCP compatibility.
# Install safe streams BEFORE any other imports so nothing can grab the
# real stdout/stderr handles.
sys.stdout = SafeStream('stdout')
sys.stderr = SafeStream('stderr')

# 3. Completely disable the logging module to prevent any stream conflicts.
import logging
logging.disable(logging.CRITICAL)
# 4. Patch uvicorn.Config to prevent it from configuring logging.
try:
    import uvicorn.config

    original_configure_logging = uvicorn.config.Config.configure_logging

    def patched_configure_logging(self):
        """No-op replacement: prevent uvicorn from touching log streams."""
        pass

    uvicorn.config.Config.configure_logging = patched_configure_logging
except Exception:
    # BUGFIX: was a bare `except:` (also swallowed KeyboardInterrupt/SystemExit).
    pass  # If uvicorn not available yet, we'll patch it later.
| # 5. Now safe to import other modules | |
| import gradio as gr | |
| import torch | |
| import torchaudio | |
| from transformers import AutoProcessor, BarkModel | |
| import numpy as np | |
| import io | |
| import time | |
| from huggingface_hub import login | |
| import spaces # Required for ZeroGPU | |
| # Dummy GPU function to satisfy ZeroGPU startup requirements | |
| # This ensures @spaces.GPU is detected during startup even in WebSocket-only mode | |
| def _dummy_gpu_function(): | |
| """Dummy function to satisfy ZeroGPU startup detection""" | |
| return "GPU available" | |
| # Regular CPU function (no decorator needed) | |
| def _dummy_cpu_function(): | |
| """Regular CPU function for system info""" | |
| return "CPU available" | |
# Initialize functions at module level so Hugging Face Spaces startup
# detection sees them being exercised (the results themselves are unused).
_gpu_init = _dummy_gpu_function()
_cpu_init = _dummy_cpu_function()
| import asyncio | |
| import threading | |
| import json | |
| import base64 | |
| from datetime import datetime | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
# 6. Additional uvicorn patching after import: force-disable all logging.
try:
    import uvicorn
    import uvicorn.server
    import uvicorn.main

    # Patch uvicorn.Server.__init__ to strip logging from any supplied Config.
    if hasattr(uvicorn.server, 'Server'):
        original_init = uvicorn.server.Server.__init__

        def patched_init(self, config):
            # Force disable logging in config before normal initialisation.
            config.log_config = None
            config.access_log = False
            config.log_level = "critical"
            original_init(self, config)

        uvicorn.server.Server.__init__ = patched_init

    # Patch uvicorn.run so direct callers also get logging disabled.
    original_run = uvicorn.run

    def patched_run(*args, **kwargs):
        kwargs['log_config'] = None
        kwargs['access_log'] = False
        kwargs['log_level'] = 'critical'
        return original_run(*args, **kwargs)

    uvicorn.run = patched_run
except Exception:
    # BUGFIX: was a bare `except:` (also swallowed KeyboardInterrupt/SystemExit).
    pass  # uvicorn unavailable: nothing to patch.
# 7. Disable specific library loggers that are known to cause stream conflicts.
try:
    _noisy_loggers = (
        'httpx', 'gradio', 'uvicorn', 'transformers', 'torch',
        'torchaudio', 'bark', 'scipy', 'asyncio', 'ffmpeg',
        'uvicorn.access', 'uvicorn.error', 'gradio.routes',
    )
    for _name in _noisy_loggers:
        _noisy = logging.getLogger(_name)
        _noisy.disabled = True
        _noisy.propagate = False
        _noisy.handlers = []
        # One above CRITICAL: nothing can ever pass the level check.
        _noisy.setLevel(logging.CRITICAL + 1)
except Exception:
    pass  # Ignore any logging setup errors.
| # 8. Also disable root logger handlers to prevent conflicts | |
| try: | |
| root_logger = logging.getLogger() | |
| root_logger.handlers = [] | |
| root_logger.disabled = True | |
| root_logger.setLevel(logging.CRITICAL + 1) | |
| except Exception: | |
| pass | |
# Simple print-based logging: avoids every logging/stream conflict above.
def safe_log(level, message):
    """Bulletproof logging that relies on print() only."""
    tag = level.upper()
    print("[TTS-%s] %s" % (tag, message), flush=True)
# MCP Server imports: optional dependency, the service degrades gracefully
# to Gradio/WebSocket-only mode when the mcp package is missing.
try:
    from mcp.server import Server
    from mcp.types import Tool, TextContent
    import mcp.server.stdio
    MCP_AVAILABLE = True
except ImportError:
    MCP_AVAILABLE = False
    safe_log("warning", "MCP not available. Install with: pip install mcp>=1.0.0")

# Logging completely disabled to prevent stream conflicts in ZeroGPU.

# MCP Server instance (stays None when the mcp package is missing).
mcp_server = None
if MCP_AVAILABLE:
    mcp_server = Server("tts-gpu-service")

# Global variables for the lazily-loaded TTS model (populated by load_model()).
processor = None
model = None
device = None
def load_model():
    """Load the TTS model - optimized for ZeroGPU.

    Populates the module-level ``processor``, ``model`` and ``device``
    globals. Returns True on success, False on any failure (errors are
    logged, never raised).
    """
    global processor, model, device
    safe_log("info", "Loading TTS model for ZeroGPU...")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    safe_log("info", f"Using device: {device}")
    try:
        # Use Bark "small" checkpoint for high-quality TTS with a smaller footprint.
        processor = AutoProcessor.from_pretrained("suno/bark-small")
        model = BarkModel.from_pretrained(
            "suno/bark-small",
            # fp16 halves GPU memory; CPU inference requires fp32.
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto" if torch.cuda.is_available() else None
        )
        # NOTE(review): with device_map="auto" accelerate already places the
        # model; this extra .to(device) looks redundant and can raise on
        # sharded models — confirm against the installed accelerate version.
        if torch.cuda.is_available():
            model = model.to(device)
        safe_log("info", f"TTS model loaded successfully on {device}!")
        return True
    except Exception as e:
        safe_log("error", f"Error loading model: {e}")
        return False
| # This decorator enables ZeroGPU for this function | |
| def synthesize_speech(text, voice_preset="v2/en_speaker_6"): | |
| """Synthesize speech from text - ZeroGPU accelerated""" | |
| global processor, model, device | |
| if not text.strip(): | |
| return None, "Please enter some text to synthesize." | |
| try: | |
| # Load model if not already loaded | |
| if model is None: | |
| success = load_model() | |
| if not success: | |
| return None, "Error: Could not load TTS model." | |
| safe_log("info", f"Synthesizing with ZeroGPU: {text[:50]}...") | |
| start_time = time.time() | |
| # Process text with voice preset - ensure return_tensors='pt' | |
| inputs = processor(text, voice_preset=voice_preset, return_tensors="pt") | |
| # Generate audio with ZeroGPU acceleration | |
| with torch.no_grad(): | |
| # Ensure all inputs are on the correct device | |
| if torch.cuda.is_available() and device.type == 'cuda': | |
| # Move all tensor inputs to GPU recursively | |
| def move_to_device(obj, target_device): | |
| if isinstance(obj, torch.Tensor): | |
| return obj.to(target_device) | |
| elif isinstance(obj, dict): | |
| return {k: move_to_device(v, target_device) for k, v in obj.items()} | |
| elif isinstance(obj, list): | |
| return [move_to_device(item, target_device) for item in obj] | |
| elif isinstance(obj, tuple): | |
| return tuple(move_to_device(item, target_device) for item in obj) | |
| else: | |
| return obj | |
| inputs = move_to_device(inputs, device) | |
| # Also ensure model is on correct device | |
| model = model.to(device) | |
| # Debug: log device info | |
| safe_log("info", f"Model device: {next(model.parameters()).device}") | |
| for k, v in inputs.items(): | |
| if isinstance(v, torch.Tensor): | |
| safe_log("info", f"Input {k} device: {v.device}") | |
| # Generate without mixed precision first to isolate the issue | |
| try: | |
| audio_array = model.generate(**inputs) | |
| except Exception as e: | |
| safe_log("error", f"Generation failed: {e}") | |
| # Try with CPU fallback | |
| safe_log("info", "Attempting CPU fallback...") | |
| model = model.cpu() | |
| inputs = move_to_device(inputs, torch.device('cpu')) | |
| audio_array = model.generate(**inputs) | |
| # Convert to numpy and ensure it's on CPU with correct dtype | |
| if torch.cuda.is_available(): | |
| audio_array = audio_array.cpu() | |
| # Convert to float32 for torchaudio compatibility | |
| if audio_array.dtype == torch.float16: | |
| audio_array = audio_array.float() | |
| audio_array = audio_array.numpy().squeeze() | |
| # Get sample rate | |
| sample_rate = model.generation_config.sample_rate | |
| # Save to temporary file for Gradio - ensure float32 tensor | |
| output_path = "temp_audio.wav" | |
| audio_tensor = torch.from_numpy(audio_array).unsqueeze(0).float() | |
| torchaudio.save(output_path, audio_tensor, sample_rate) | |
| generation_time = time.time() - start_time | |
| gpu_name = torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU" | |
| status_message = f"✅ Generated in {generation_time:.2f}s on {gpu_name} (ZeroGPU)" | |
| return output_path, status_message | |
| except Exception as e: | |
| error_msg = f"❌ Error during synthesis: {str(e)}" | |
| safe_log("error", error_msg) | |
| return None, error_msg | |
# ZeroGPU for batch processing.
def batch_synthesize(text_list, voice_preset="v2/en_speaker_6"):
    """Synthesize a list of texts sequentially, preserving input order."""
    start_time = time.time()
    results = []
    for index, entry in enumerate(text_list, start=1):
        if not entry.strip():
            results.append((None, f"Item {index}: Empty text skipped"))
            continue
        audio_path, status = synthesize_speech(entry, voice_preset)
        results.append((audio_path, f"Item {index}: {status}"))
    elapsed = time.time() - start_time
    batch_status = f"🚀 Batch completed: {len(text_list)} items in {elapsed:.2f}s"
    return results, batch_status
def get_system_info():
    """Return a multi-line, human-readable system/GPU/TTS status summary."""
    cuda_ok = torch.cuda.is_available()
    info = {
        "🚀 ZeroGPU": "Active" if cuda_ok else "Not Available",
        "🎯 GPU Name": torch.cuda.get_device_name(0) if cuda_ok else "CPU Only",
        "💾 GPU Memory": f"{torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB" if cuda_ok else "N/A",
        "⚡ CUDA Version": torch.version.cuda if cuda_ok else "N/A",
        "🔧 PyTorch": torch.__version__,
        "🤖 Model Status": "✅ Loaded" if model is not None else "💤 Lazy Loading (ZeroGPU optimized)",
        "🎛️ Mixed Precision": "✅ Enabled" if cuda_ok else "❌ CPU Mode",
        "🔌 MCP Server": "✅ Available" if MCP_AVAILABLE else "❌ Not Available",
        "🌐 WebSocket TTS": "✅ Ready" if model is not None else "💤 Ready (lazy loading)",
    }
    return "\n".join(f"{key}: {value}" for key, value in info.items())
# WebSocket TTS Handler for Real-time Text-to-Speech
class WebSocketTTSHandler:
    """WebSocket handler for real-time TTS integration with ChatCal WebRTC.

    Tracks active client connections and dispatches inbound JSON messages to
    single-shot or streaming (unmute.sh-style) synthesis handlers.
    """

    def __init__(self):
        # Map of client_id -> WebSocket for every currently connected client.
        self.active_connections = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        """Accept WebSocket connection for TTS service."""
        await websocket.accept()
        self.active_connections[client_id] = websocket
        safe_log("info", f"🔌 TTS WebSocket client {client_id} connected")
        # Send connection confirmation with service info so the client can
        # adapt to current model/GPU readiness.
        await self.send_message(client_id, {
            "type": "tts_connection_confirmed",
            "client_id": client_id,
            "timestamp": datetime.now().isoformat(),
            "service": "tts-gpu-service",
            "model_status": "✅ Loaded" if model is not None else "⏳ Loading",
            "zerogpu_status": "✅ Active" if torch.cuda.is_available() else "❌ Not Available",
            "available_voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"]
        })

    async def disconnect(self, client_id: str):
        """Clean up connection state for a departed client."""
        # NOTE: async only for interface symmetry; performs no awaits itself.
        if client_id in self.active_connections:
            del self.active_connections[client_id]
            safe_log("info", f"🔌 TTS WebSocket client {client_id} disconnected")

    async def send_message(self, client_id: str, message: dict):
        """Send a JSON message to one client; drop the client on send failure."""
        if client_id in self.active_connections:
            websocket = self.active_connections[client_id]
            try:
                await websocket.send_text(json.dumps(message))
            except Exception as e:
                # A failed send means the socket is unusable: log and forget.
                safe_log("error", f"Failed to send message to TTS client {client_id}: {e}")
                await self.disconnect(client_id)

    async def handle_streaming_text_synthesis(self, client_id: str, text_chunks: list, voice_preset: str = "v2/en_speaker_6", is_final: bool = True):
        """Process streaming text synthesis following unmute.sh methodology.

        Non-final calls only acknowledge buffering progress; the final call
        ("flush trick") joins all chunks and synthesizes them in one pass.
        """
        try:
            # UNMUTE.SH METHODOLOGY: process text chunks in streaming fashion.
            safe_log("info", f"🔊 TTS STREAMING: Processing {len(text_chunks)} chunks from {client_id} (final={is_final})")
            if is_final:
                # FLUSH TRICK: synthesize all accumulated text at once for
                # best audio quality.
                complete_text = " ".join(text_chunks).strip()
                if complete_text:
                    safe_log("info", f"🔊 TTS FLUSH: Final synthesis for {client_id}: {complete_text[:50]}...")
                    # Delegate to the existing ZeroGPU synthesis entry point.
                    audio_path, status = synthesize_speech(complete_text, voice_preset)
                    # Success is signalled by a path plus a "✅"-prefixed status.
                    if audio_path and "✅" in status:
                        # Read the generated audio file from disk.
                        with open(audio_path, 'rb') as audio_file:
                            audio_data = audio_file.read()
                        # Base64-encode the wav bytes for JSON/WebSocket transport.
                        audio_b64 = base64.b64encode(audio_data).decode('utf-8')
                        # Send successful synthesis with streaming metadata.
                        await self.send_message(client_id, {
                            "type": "tts_streaming_response",
                            "audio_data": audio_b64,
                            "audio_format": "wav",
                            "text": complete_text,
                            "text_chunks": text_chunks,
                            "voice_preset": voice_preset,
                            "timestamp": datetime.now().isoformat(),
                            "audio_size": len(audio_data),
                            "status": status,
                            "is_final": is_final,
                            "streaming_method": "unmute.sh_flush_trick"
                        })
                        safe_log("info", f"🔊 TTS STREAMING: Final audio sent to {client_id} ({len(audio_data)} bytes)")
                        # Clean up the temporary wav file (best effort).
                        # NOTE: this import is redundant (os is imported at
                        # module top); kept byte-identical here.
                        import os
                        try:
                            os.unlink(audio_path)
                        except:
                            pass
                    else:
                        # Synthesis failed: forward the status to the client.
                        await self.send_message(client_id, {
                            "type": "tts_streaming_error",
                            "message": f"TTS streaming synthesis failed: {status}",
                            "text": complete_text,
                            "is_final": is_final,
                            "timestamp": datetime.now().isoformat()
                        })
                else:
                    # Empty final flush: nothing to synthesize.
                    safe_log("info", f"🔊 TTS FLUSH: Empty final text for {client_id}")
            else:
                # STREAMING: send a partial progress update (no audio yet).
                await self.send_message(client_id, {
                    "type": "tts_streaming_progress",
                    "message": f"Buffering text chunks: {len(text_chunks)}",
                    "text_chunks": text_chunks[-3:],  # last 3 chunks as a preview
                    "is_final": is_final,
                    "timestamp": datetime.now().isoformat()
                })
                safe_log("info", f"🔊 TTS STREAMING: Progress update sent to {client_id} ({len(text_chunks)} chunks)")
        except Exception as e:
            safe_log("error", f"TTS streaming error for {client_id}: {e}")
            await self.send_message(client_id, {
                "type": "tts_streaming_error",
                "message": f"TTS streaming error: {str(e)}",
                "is_final": is_final,
                "timestamp": datetime.now().isoformat()
            })

    async def handle_text_synthesis(self, client_id: str, text: str, voice_preset: str = "v2/en_speaker_6"):
        """Process single-shot text synthesis (legacy method).

        Implemented as a one-chunk final streaming call for consistency.
        """
        try:
            safe_log("info", f"🔊 TTS: Processing text from {client_id}: {text[:50]}...")
            # Use streaming method with single chunk for consistency.
            await self.handle_streaming_text_synthesis(client_id, [text], voice_preset, is_final=True)
        except Exception as e:
            safe_log("error", f"TTS WebSocket error for {client_id}: {e}")
            await self.send_message(client_id, {
                "type": "tts_error",
                "message": f"TTS processing error: {str(e)}",
                "timestamp": datetime.now().isoformat()
            })

    async def handle_message(self, client_id: str, message_data: dict):
        """Dispatch an inbound WebSocket message by its "type" field."""
        message_type = message_data.get("type")
        if message_type == "tts_synthesize":
            # Text-to-speech synthesis request (legacy single-shot).
            text = message_data.get("text", "")
            voice_preset = message_data.get("voice_preset", "v2/en_speaker_6")
            if text.strip():
                await self.handle_text_synthesis(client_id, text, voice_preset)
            else:
                await self.send_message(client_id, {
                    "type": "tts_error",
                    "message": "Empty text provided for synthesis",
                    "timestamp": datetime.now().isoformat()
                })
        elif message_type == "tts_streaming_synthesize":
            # Streaming synthesis request (unmute.sh methodology).
            text_chunks = message_data.get("text_chunks", [])
            voice_preset = message_data.get("voice_preset", "v2/en_speaker_6")
            is_final = message_data.get("is_final", True)
            if text_chunks:
                await self.handle_streaming_text_synthesis(client_id, text_chunks, voice_preset, is_final)
            else:
                await self.send_message(client_id, {
                    "type": "tts_streaming_error",
                    "message": "Empty text chunks provided for streaming synthesis",
                    "is_final": is_final,
                    "timestamp": datetime.now().isoformat()
                })
        elif message_type == "tts_get_voices":
            # Request for the list of available voice presets.
            await self.send_message(client_id, {
                "type": "tts_voices_list",
                "voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"],
                "timestamp": datetime.now().isoformat()
            })
        elif message_type == "tts_get_streaming_info":
            # Request for streaming-capability metadata.
            await self.send_message(client_id, {
                "type": "tts_streaming_info",
                "streaming_supported": True,
                "methodology": "unmute.sh with flush trick",
                "message_types": {
                    "tts_streaming_synthesize": "Send text chunks for streaming processing",
                    "tts_streaming_response": "Receive final audio with streaming metadata",
                    "tts_streaming_progress": "Receive progress updates during buffering",
                    "tts_streaming_error": "Receive streaming-specific error messages"
                },
                "flush_trick": "Set is_final=true to trigger synthesis of all buffered chunks",
                "timestamp": datetime.now().isoformat()
            })
        else:
            # Unknown message types are logged, not answered.
            safe_log("warning", f"Unknown TTS message type from {client_id}: {message_type}")
# Global TTS WebSocket handler shared by all FastAPI endpoints below.
tts_websocket_handler = WebSocketTTSHandler()
# FastAPI WebSocket Integration for TTS Service
def create_tts_fastapi_app():
    """Create FastAPI app with TTS WebSocket and status endpoints.

    BUGFIX: the endpoint coroutines were previously defined inside this
    factory but never registered on the app (no route decorators), so the
    returned app exposed no routes. They are now registered on the paths
    advertised by the status endpoint.
    """
    import uuid

    app = FastAPI(
        title="TTS GPU Service WebSocket API",
        description="Real-time Text-to-Speech with ZeroGPU acceleration",
        version="1.0.0"
    )

    @app.websocket("/ws/tts")
    async def tts_websocket_endpoint(websocket: WebSocket):
        """WebSocket endpoint for real-time TTS (server-assigned client id)."""
        client_id = str(uuid.uuid4())
        try:
            await tts_websocket_handler.connect(websocket, client_id)
            while True:
                # Receive JSON message from client and dispatch it.
                data = await websocket.receive_text()
                try:
                    message = json.loads(data)
                    await tts_websocket_handler.handle_message(client_id, message)
                except json.JSONDecodeError as e:
                    await tts_websocket_handler.send_message(client_id, {
                        "type": "tts_error",
                        "message": f"Invalid JSON format: {str(e)}",
                        "timestamp": datetime.now().isoformat()
                    })
        except WebSocketDisconnect:
            await tts_websocket_handler.disconnect(client_id)
        except Exception as e:
            safe_log("error", f"TTS WebSocket endpoint error: {e}")
            await tts_websocket_handler.disconnect(client_id)

    @app.websocket("/ws/tts/{client_id}")
    async def tts_websocket_with_id(websocket: WebSocket, client_id: str):
        """WebSocket endpoint where the client supplies its own id."""
        try:
            await tts_websocket_handler.connect(websocket, client_id)
            while True:
                data = await websocket.receive_text()
                try:
                    message = json.loads(data)
                    await tts_websocket_handler.handle_message(client_id, message)
                except json.JSONDecodeError as e:
                    await tts_websocket_handler.send_message(client_id, {
                        "type": "tts_error",
                        "message": f"Invalid JSON format: {str(e)}",
                        "timestamp": datetime.now().isoformat()
                    })
        except WebSocketDisconnect:
            await tts_websocket_handler.disconnect(client_id)
        except Exception as e:
            safe_log("error", f"TTS WebSocket endpoint error: {e}")
            await tts_websocket_handler.disconnect(client_id)

    @app.get("/")
    async def tts_root():
        """TTS service status endpoint."""
        return {
            "service": "tts-gpu-service",
            "status": "✅ Ready" if model is not None else "⏳ Loading",
            "zerogpu": "✅ Active" if torch.cuda.is_available() else "❌ Not Available",
            "websocket_endpoints": ["/ws/tts", "/ws/tts/{client_id}"],
            "available_voices": ["v2/en_speaker_6", "v2/en_speaker_9", "v2/en_speaker_3", "v2/en_speaker_1"],
            "model": "suno/bark"
        }

    @app.get("/health")
    async def tts_health():
        """Health check endpoint with detailed status."""
        return {
            "status": "healthy",
            "model_loaded": model is not None,
            "gpu_available": torch.cuda.is_available(),
            "loading_strategy": "lazy (ZeroGPU optimized)",
            "note": "Model loads on first synthesis request to optimize GPU usage"
        }

    async def preload_model_logic():
        """Shared logic for model preloading (not itself a route)."""
        global model
        if model is not None:
            return {"status": "success", "message": "Model already loaded", "model_loaded": True}
        try:
            success = load_model()
            if success:
                return {"status": "success", "message": "Model preloaded successfully", "model_loaded": True}
            else:
                return {"status": "error", "message": "Failed to preload model", "model_loaded": False}
        except Exception as e:
            return {"status": "error", "message": f"Preload error: {str(e)}", "model_loaded": False}

    @app.get("/preload")
    async def preload_model_get():
        """Preload the TTS model via GET (browser-friendly)."""
        return await preload_model_logic()

    @app.post("/preload")
    async def preload_model_post():
        """Preload the TTS model via POST (API-friendly)."""
        return await preload_model_logic()

    return app
# MCP Tool Definitions and Handlers
if MCP_AVAILABLE:
    # BUGFIX: register the handler with the server. The coroutine was
    # previously defined but never attached, so MCP clients saw no tools.
    @mcp_server.list_tools()
    async def handle_list_tools() -> List[Tool]:
        """List available MCP tools for TTS service."""
        return [
            Tool(
                name="tts_synthesize",
                description="Synthesize speech from text using Bark TTS model with ZeroGPU acceleration",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "Text to convert to speech"
                        },
                        "voice_preset": {
                            "type": "string",
                            "description": "Voice preset identifier (e.g., 'v2/en_speaker_6', 'v2/en_speaker_1')",
                            "default": "v2/en_speaker_6"
                        }
                    },
                    "required": ["text"]
                }
            ),
            Tool(
                name="tts_batch_synthesize",
                description="Synthesize speech from multiple texts in batch with ZeroGPU optimization",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text_list": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "List of texts to convert to speech"
                        },
                        "voice_preset": {
                            "type": "string",
                            "description": "Voice preset for all texts",
                            "default": "v2/en_speaker_6"
                        }
                    },
                    "required": ["text_list"]
                }
            ),
            Tool(
                name="tts_get_info",
                description="Get system information including ZeroGPU status and TTS service capabilities",
                inputSchema={
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            )
        ]
| async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: | |
| """Handle MCP tool calls for TTS operations""" | |
| try: | |
| if name == "tts_synthesize": | |
| text = arguments.get("text", "") | |
| voice_preset = arguments.get("voice_preset", "v2/en_speaker_6") | |
| if not text.strip(): | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps({ | |
| "error": "No text provided for synthesis", | |
| "status": "❌ Empty text", | |
| "audio_file": None | |
| }) | |
| )] | |
| # Use the existing synthesize_speech function | |
| audio_path, status = synthesize_speech(text, voice_preset) | |
| result = { | |
| "status": status, | |
| "audio_file": audio_path, | |
| "text": text, | |
| "voice_preset": voice_preset, | |
| "success": audio_path is not None | |
| } | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(result, indent=2) | |
| )] | |
| elif name == "tts_batch_synthesize": | |
| text_list = arguments.get("text_list", []) | |
| voice_preset = arguments.get("voice_preset", "v2/en_speaker_6") | |
| if not text_list: | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps({ | |
| "error": "No texts provided for batch synthesis", | |
| "status": "❌ Empty list", | |
| "results": [] | |
| }) | |
| )] | |
| # Use the existing batch_synthesize function | |
| results, batch_status = batch_synthesize(text_list, voice_preset) | |
| # Format results for MCP response | |
| formatted_results = [] | |
| for i, (audio_path, status) in enumerate(results): | |
| formatted_results.append({ | |
| "index": i, | |
| "text": text_list[i] if i < len(text_list) else "", | |
| "audio_file": audio_path, | |
| "status": status, | |
| "success": audio_path is not None | |
| }) | |
| result = { | |
| "batch_status": batch_status, | |
| "results": formatted_results, | |
| "total_items": len(text_list), | |
| "voice_preset": voice_preset | |
| } | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(result, indent=2) | |
| )] | |
| elif name == "tts_get_info": | |
| # Use the existing get_system_info function | |
| system_info = get_system_info() | |
| # Also include MCP-specific information | |
| info_dict = { | |
| "system_info": system_info, | |
| "mcp_status": "✅ MCP Server Active", | |
| "available_tools": ["tts_synthesize", "tts_batch_synthesize", "tts_get_info"], | |
| "voice_presets": [ | |
| {"code": code, "description": desc} | |
| for code, desc in VOICE_PRESETS | |
| ], | |
| "service_endpoints": { | |
| "gradio_interface": "http://localhost:7860", | |
| "mcp_protocol": "stdio" | |
| }, | |
| "model_info": { | |
| "name": "suno/bark-small", | |
| "type": "Text-to-Speech", | |
| "accelerated": "ZeroGPU" | |
| } | |
| } | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(info_dict, indent=2) | |
| )] | |
| else: | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps({ | |
| "error": f"Unknown tool: {name}", | |
| "available_tools": ["tts_synthesize", "tts_batch_synthesize", "tts_get_info"] | |
| }) | |
| )] | |
| except Exception as e: | |
| safe_log("error", f"Error in MCP tool '{name}': {str(e)}") | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps({ | |
| "error": f"Tool execution failed: {str(e)}", | |
| "tool": name, | |
| "arguments": arguments | |
| }) | |
| )] | |
async def run_mcp_server():
    """Run the MCP server in stdio mode with temporary stream restoration.

    The module installs SafeStream over sys.stdout/sys.stderr at import
    time; MCP's stdio transport needs the genuine streams, so they are
    swapped in for the duration of the run and put back afterwards.
    """
    safe_log("info", "🔌 Starting MCP Server for TTS service...")
    # BUGFIX: capture the current (safe) streams BEFORE the try block so the
    # finally clause can never hit a NameError if setup fails early.
    original_stdin = sys.stdin
    original_stdout = sys.stdout
    original_stderr = sys.stderr
    try:
        # Restore the interpreter's original streams for MCP operation.
        # BUGFIX: hasattr(sys, '__stdin__') is always true — what matters is
        # that sys.__std*__ may be None (e.g. GUI hosts); never install None.
        if sys.__stdin__ is not None:
            sys.stdin = sys.__stdin__
        if sys.__stdout__ is not None:
            sys.stdout = sys.__stdout__
        if sys.__stderr__ is not None:
            sys.stderr = sys.__stderr__
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await mcp_server.run(
                read_stream,
                write_stream,
                mcp_server.create_initialization_options()
            )
    except Exception as e:
        safe_log("error", f"MCP Server failed to start: {e}")
        # Don't crash the whole service if MCP fails.
        return
    finally:
        # Always reinstall the safe streams after MCP operation.
        sys.stdin = original_stdin
        sys.stdout = original_stdout
        sys.stderr = original_stderr
def start_mcp_server_thread():
    """Launch the MCP stdio server on a daemon thread (no-op without MCP)."""
    if not MCP_AVAILABLE:
        safe_log("warning", "⚠️ MCP not available - only Gradio interface will be active")
        return

    def _runner():
        # Own event loop per thread; failures are logged, never raised.
        try:
            asyncio.run(run_mcp_server())
        except Exception as exc:
            safe_log("error", f"MCP Server error: {exc}")

    worker = threading.Thread(target=_runner, daemon=True)
    worker.start()
    safe_log("info", "🔌 MCP Server thread started successfully")
# Voice preset options: (Bark preset code, human-readable description).
# Consumed by the Gradio dropdowns and the MCP tts_get_info tool.
VOICE_PRESETS = [
    ("v2/en_speaker_0", "🗣️ Speaker 0 - Professional Male"),
    ("v2/en_speaker_1", "👩 Speaker 1 - Young Female"),
    ("v2/en_speaker_2", "👨 Speaker 2 - Mature Male"),
    ("v2/en_speaker_3", "🎭 Speaker 3 - Expressive Female"),
    ("v2/en_speaker_4", "📻 Speaker 4 - Radio Voice Male"),
    ("v2/en_speaker_5", "🎪 Speaker 5 - Animated Female"),
    ("v2/en_speaker_6", "🎯 Speaker 6 - Clear Male (Default)"),
    ("v2/en_speaker_7", "🌟 Speaker 7 - Warm Female"),
    ("v2/en_speaker_8", "🎬 Speaker 8 - Narrator Male"),
    ("v2/en_speaker_9", "✨ Speaker 9 - Elegant Female")
]
# Create enhanced Gradio interface for ZeroGPU.
# Builds the full web UI (two synthesis tabs + API docs tab) and wires the
# event handlers. `iface` is launched later by safe_main() in gradio/triple mode.
with gr.Blocks(
    title="🚀 ZeroGPU TTS Service",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    }
    .main-header {
        text-align: center;
        background: rgba(255,255,255,0.1);
        padding: 20px;
        border-radius: 10px;
        margin-bottom: 20px;
    }
    """
) as iface:
    # Header banner rendered through the .main-header CSS class above.
    with gr.Row():
        gr.Markdown("""
        <div class="main-header">
        # 🚀 ZeroGPU TTS Service
        ## Powered by Hugging Face Pro + Nvidia H200
        Ultra-fast text-to-speech with dynamic GPU scaling
        </div>
        """)
    with gr.Tabs():
        # Single synthesis tab
        with gr.TabItem("🎤 Single Synthesis"):
            with gr.Row():
                with gr.Column(scale=2):
                    text_input = gr.Textbox(
                        label="📝 Text to Synthesize",
                        placeholder="Enter the text you want to convert to speech...",
                        lines=6,
                        value="Hello! This is a test of the ZeroGPU-accelerated text-to-speech service running on Hugging Face Spaces with Nvidia H200 dynamic resources."
                    )
                    # Dropdown choices are (label, value) pairs: the comprehension
                    # swaps VOICE_PRESETS so the description is displayed while the
                    # preset code is what gets submitted.
                    voice_dropdown = gr.Dropdown(
                        choices=[(desc, code) for code, desc in VOICE_PRESETS],
                        value="v2/en_speaker_6",
                        label="🎭 Voice Preset",
                        info="Choose different voice characteristics"
                    )
                    with gr.Row():
                        synthesize_btn = gr.Button("🎵 Generate Speech", variant="primary", size="lg")
                        clear_btn = gr.Button("🗑️ Clear", variant="secondary")
                with gr.Column(scale=1):
                    # Read-only status panel; also refreshed on page load via
                    # iface.load() at the bottom of this block.
                    system_info = gr.Textbox(
                        label="⚙️ ZeroGPU Status",
                        value=get_system_info(),
                        interactive=False,
                        lines=8
                    )
            with gr.Row():
                # type="filepath": synthesize_speech returns a path to the audio file.
                audio_output = gr.Audio(
                    label="🔊 Generated Speech",
                    type="filepath",
                    autoplay=False
                )
                status_output = gr.Textbox(
                    label="📊 Generation Status",
                    interactive=False,
                    lines=2
                )
        # Batch synthesis tab
        with gr.TabItem("📦 Batch Synthesis"):
            with gr.Row():
                batch_input = gr.Textbox(
                    label="📝 Batch Text (one per line)",
                    placeholder="Enter multiple texts, one per line:\nHello world!\nThis is the second sentence.\nAnd this is the third.",
                    lines=8
                )
                # One voice applied to every line of the batch.
                batch_voice = gr.Dropdown(
                    choices=[(desc, code) for code, desc in VOICE_PRESETS],
                    value="v2/en_speaker_6",
                    label="🎭 Voice for All"
                )
            batch_btn = gr.Button("🚀 Generate Batch", variant="primary", size="lg")
            batch_status = gr.Textbox(label="📊 Batch Status", interactive=False)
            batch_results = gr.File(label="📁 Download All Audio Files", file_count="multiple")
        # API Documentation tab (static markdown only — no event wiring).
        with gr.TabItem("🔧 API Usage"):
            gr.Markdown("""
            ## 🔌 API Access
            ### Gradio Client API
            Use this service programmatically with the Gradio client:
            ```python
            from gradio_client import Client
            # Connect to your ZeroGPU TTS service
            client = Client("YOUR_USERNAME/tts-gpu-service")
            # Generate speech
            result = client.predict(
                "Hello from the API!", # text
                "v2/en_speaker_6", # voice_preset
                api_name="/predict"
            )
            audio_file, status = result
            print(f"Generated: {audio_file}")
            print(f"Status: {status}")
            ```
            ### 🔌 MCP Protocol API
            This service also supports Model Context Protocol (MCP) for integration with AI assistants:
            ```python
            # MCP Client example (Claude Code, etc.)
            import asyncio
            from mcp import ClientSession, StdioServerParameters
            from mcp.client.stdio import stdio_client
            # Connect to TTS service via MCP
            async def use_tts_mcp():
                server_params = StdioServerParameters(
                    command="python",
                    args=["app.py", "--mcp-only"]
                )
                async with stdio_client(server_params) as (read, write):
                    async with ClientSession(read, write) as session:
                        # Initialize connection
                        await session.initialize()
                        # List available tools
                        tools = await session.list_tools()
                        print("Available TTS tools:", [tool.name for tool in tools.tools])
                        # Synthesize speech
                        result = await session.call_tool("tts_synthesize", {
                            "text": "Hello from MCP!",
                            "voice_preset": "v2/en_speaker_6"
                        })
                        print("TTS Result:", result.content[0].text)
            # Run MCP client
            asyncio.run(use_tts_mcp())
            ```
            ### Available MCP Tools:
            - **`tts_synthesize`**: Convert single text to speech
            - **`tts_batch_synthesize`**: Convert multiple texts to speech
            - **`tts_get_info`**: Get system status and capabilities
            ### 🚀 ZeroGPU Benefits:
            - **Dynamic Scaling**: Resources allocated only when needed
            - **H200 Performance**: Latest GPU architecture
            - **Cost Efficient**: No idle costs with Pro subscription
            - **High Throughput**: Optimized for batch processing
            - **Dual Protocols**: Both Gradio API and MCP support
            ### 📊 Performance Metrics:
            - **Single synthesis**: ~0.5-2s depending on text length
            - **Batch processing**: Parallel execution on H200
            - **Memory efficient**: Automatic cleanup after processing
            - **MCP Integration**: Real-time protocol for AI assistants
            """)
    # Examples with ZeroGPU showcase.
    # cache_examples=False: each example click runs synthesize_speech live.
    gr.Examples(
        examples=[
            ["Welcome to our ZeroGPU-powered text-to-speech service running on Nvidia H200!", "v2/en_speaker_6"],
            ["The quick brown fox jumps over the lazy dog. This sentence tests various phonemes.", "v2/en_speaker_3"],
            ["Artificial intelligence is transforming how we interact with technology using advanced neural networks.", "v2/en_speaker_1"],
            ["This ultra-fast voice synthesis is running on Hugging Face Spaces with dynamic H200 GPU allocation.", "v2/en_speaker_8"],
            ["ZeroGPU technology allows for instant scaling and cost-effective AI model deployment.", "v2/en_speaker_9"]
        ],
        inputs=[text_input, voice_dropdown],
        outputs=[audio_output, status_output],
        fn=synthesize_speech,
        cache_examples=False,
        label="🎯 ZeroGPU Examples"
    )
    # Event handlers with API names.
    # api_name="predict" exposes this handler at /predict for gradio_client
    # (matches the example code in the API Usage tab).
    synthesize_btn.click(
        fn=synthesize_speech,
        inputs=[text_input, voice_dropdown],
        outputs=[audio_output, status_output],
        api_name="predict"
    )
    # Reset text, audio player, and status in one shot.
    clear_btn.click(
        fn=lambda: ("", None, ""),
        outputs=[text_input, audio_output, status_output]
    )

    def process_batch(batch_text: str, voice: str):
        """Split the batch textbox into per-line requests and synthesize them.

        Returns a (status_message, audio_file_paths) tuple matching the
        [batch_status, batch_results] outputs wired below.
        """
        # Drop blank/whitespace-only lines; each remaining line is one request.
        texts = [t.strip() for t in batch_text.split('\n') if t.strip()]
        if not texts:
            return "❌ No valid texts found", []
        results, status = batch_synthesize(texts, voice)
        # Keep only entries that actually produced an audio file path.
        audio_files = [r[0] for r in results if r[0]]
        return status, audio_files

    batch_btn.click(
        fn=process_batch,
        inputs=[batch_input, batch_voice],
        outputs=[batch_status, batch_results]
    )
    # Auto-refresh system info on load (initial value was computed at build time).
    iface.load(
        fn=get_system_info,
        outputs=[system_info]
    )
def safe_main():
    """Main function with comprehensive error handling and stream protection.

    Selects a service mode (websocket / gradio / mcp / triple), starts the
    corresponding server(s), and routes every failure through safe_log.
    Fatal startup failures exit with status 1; the outer except is a
    last-resort safety net that prints debug info before exiting.
    """
    try:
        # Spaces functions already initialized at module level
        print(f"[TTS-INFO] {_gpu_init}, {_cpu_init}", flush=True)
        # === FINAL SAFETY MEASURES ===
        # Last-chance protection against any remaining stream conflicts
        # Ensure all logging is completely disabled
        import logging
        logging.disable(logging.CRITICAL)
        # One final attempt to patch any gradio/uvicorn logging that might have been missed
        try:
            import gradio.helpers
            if hasattr(gradio.helpers, 'create_tracker'):
                # Disable gradio analytics/tracking.
                # NOTE(review): original_create_tracker is saved but never
                # restored — the patch is permanent for this process.
                original_create_tracker = gradio.helpers.create_tracker
                gradio.helpers.create_tracker = lambda: None
        except:
            pass
        safe_log("info", "🚀 Initializing TTS service with comprehensive stream protection...")
        # Get service mode from environment variable
        # TTS_SERVICE_MODE can be: websocket, gradio, mcp, triple
        # Default: websocket (as requested by user)
        # Force WebSocket mode temporarily while debugging environment variables
        # NOTE(review): the env lookup is deliberately commented out, so the
        # configuration banner below prints TTS_SERVICE_MODE but the value is
        # currently ignored — every run uses "websocket".
        service_mode = "websocket"  # os.environ.get("TTS_SERVICE_MODE", "websocket").lower()
        # Log environment variable details clearly
        safe_log("info", "=" * 60)
        safe_log("info", "🎛️ TTS SERVICE CONFIGURATION")
        safe_log("info", "=" * 60)
        safe_log("info", f"Environment Variable: TTS_SERVICE_MODE = {os.environ.get('TTS_SERVICE_MODE', 'NOT SET (using default)')}")
        safe_log("info", f"Detected Mode: {service_mode}")
        safe_log("info", f"Available Modes: websocket, gradio, mcp, triple")
        safe_log("info", f"Default Mode: websocket")
        safe_log("info", "=" * 60)
        if service_mode == "mcp":
            # MCP-only mode - no Gradio interface
            if MCP_AVAILABLE:
                safe_log("info", "🔌 Starting TTS service in MCP-only mode...")
                try:
                    # Blocks until the MCP stdio server exits.
                    asyncio.run(run_mcp_server())
                except KeyboardInterrupt:
                    safe_log("info", "MCP server stopped by user")
                except Exception as e:
                    safe_log("error", f"MCP server failed: {e}")
                    sys.exit(1)
            else:
                safe_log("error", "❌ MCP not available but MCP-only mode requested")
                sys.exit(1)
        elif service_mode == "websocket":
            # WebSocket-only mode - FastAPI with TTS WebSocket endpoints (DEFAULT)
            safe_log("info", "🌐 Starting TTS service in WebSocket-only mode (DEFAULT)...")
            try:
                import uvicorn
                fastapi_app = create_tts_fastapi_app()
                # Detect if running on Hugging Face Spaces
                space_id = os.environ.get("SPACE_ID")
                if space_id:
                    # Spaces hostnames replace the "user/space" slash with a dash.
                    base_url = f"https://{space_id.replace('/', '-')}.hf.space"
                    websocket_url = f"wss://{space_id.replace('/', '-')}.hf.space"
                else:
                    base_url = "http://localhost:7860"
                    websocket_url = "ws://localhost:7860"
                safe_log("info", "✅ TTS WebSocket Server: Starting on port 7860...")
                safe_log("info", f"🔗 WebSocket Endpoints: {websocket_url}/ws/tts, {websocket_url}/ws/tts/{{client_id}}")
                safe_log("info", f"📡 Status Endpoint: {base_url}/")
                safe_log("info", f"💚 Health Check: {base_url}/health")
                # log_config=None / access_log=False / log_level="critical"
                # keep uvicorn's own logging off the protected streams.
                uvicorn.run(
                    fastapi_app,
                    host="0.0.0.0",
                    port=7860,
                    log_config=None,
                    access_log=False,
                    log_level="critical"
                )
            except Exception as e:
                safe_log("error", f"Failed to start TTS WebSocket server: {e}")
                sys.exit(1)
        elif service_mode == "gradio":
            # Gradio-only mode - Web interface only
            safe_log("info", "🎨 Starting TTS service in Gradio-only mode...")
            # Start Gradio interface with comprehensive error handling and stream protection
            try:
                safe_log("info", "✅ Gradio Interface: Starting on port 7860...")
                # Final attempt to patch any remaining uvicorn logging
                try:
                    import gradio.networking
                    if hasattr(gradio.networking, 'start_server'):
                        original_start_server = gradio.networking.start_server
                        def patched_start_server(*args, **kwargs):
                            # Force disable uvicorn logging on whatever server
                            # gradio spins up internally.
                            if 'log_config' in kwargs:
                                kwargs['log_config'] = None
                            if 'access_log' in kwargs:
                                kwargs['access_log'] = False
                            kwargs.setdefault('log_level', 'critical')
                            return original_start_server(*args, **kwargs)
                        gradio.networking.start_server = patched_start_server
                except:
                    pass
                # Try multiple launch strategies with failsafe
                launch_success = False
                # Strategy 1: Primary launch with error handling
                try:
                    iface.launch(
                        server_name="0.0.0.0",
                        server_port=7860,
                        share=False,
                        quiet=True,
                        show_error=False,
                        prevent_thread_lock=True,
                        max_threads=4
                    )
                    launch_success = True
                except Exception as e1:
                    safe_log("warning", f"Primary launch failed: {e1}")
                    # Strategy 2: Minimal configuration
                    try:
                        iface.launch(
                            server_name="0.0.0.0",
                            server_port=7860,
                            quiet=True
                        )
                        launch_success = True
                    except Exception as e2:
                        safe_log("error", f"All Gradio launch strategies failed: {e2}")
                        sys.exit(1)
                if not launch_success:
                    safe_log("error", "Failed to start Gradio interface")
                    sys.exit(1)
            except Exception as e:
                safe_log("error", f"Unexpected error starting Gradio interface: {e}")
                sys.exit(1)
        elif service_mode == "triple":
            # Triple mode - both Gradio, MCP, and WebSocket
            # NOTE(review): despite the name, this branch only starts MCP and
            # Gradio — no WebSocket server is launched here; confirm intent.
            safe_log("info", "🚀 Starting TTS service with dual protocol support...")
            # Start MCP server in background thread with error handling
            if MCP_AVAILABLE:
                try:
                    start_mcp_server_thread()
                    safe_log("info", "✅ MCP Server: Available on stdio protocol")
                except Exception as e:
                    safe_log("warning", f"⚠️ MCP Server failed to start: {e}")
                    safe_log("info", "Continuing with Gradio-only mode...")
            else:
                safe_log("warning", "⚠️ MCP Server: Not available")
            # Start Gradio interface with comprehensive error handling and stream protection
            try:
                safe_log("info", "✅ Gradio Interface: Starting on port 7860...")
                # Final attempt to patch any remaining uvicorn logging
                try:
                    import gradio.networking
                    if hasattr(gradio.networking, 'start_server'):
                        original_start_server = gradio.networking.start_server
                        def patched_start_server(*args, **kwargs):
                            # Force disable uvicorn logging
                            if 'log_config' in kwargs:
                                kwargs['log_config'] = None
                            if 'access_log' in kwargs:
                                kwargs['access_log'] = False
                            kwargs.setdefault('log_level', 'critical')
                            return original_start_server(*args, **kwargs)
                        gradio.networking.start_server = patched_start_server
                except:
                    pass
                # Try multiple launch strategies with failsafe
                launch_success = False
                # Strategy 1: Direct launch with stream protection
                try:
                    # NOTE(review): show_tips and enable_monitoring were removed
                    # in Gradio 4.x — on those versions this call raises
                    # TypeError and Strategy 2 takes over; confirm that is the
                    # intended fallback path.
                    iface.launch(
                        server_name="0.0.0.0",
                        server_port=7860,
                        share=False,
                        show_error=False,  # Disable error display to avoid stream issues
                        quiet=True,  # Reduce Gradio logging
                        max_threads=4,  # Limit threads for ZeroGPU
                        prevent_thread_lock=True,  # Prevent threading issues
                        show_tips=False,  # Reduce output
                        enable_monitoring=False  # Disable monitoring to reduce logging
                    )
                    launch_success = True
                except Exception as e1:
                    safe_log("warning", f"Primary launch failed: {e1}")
                    # Strategy 2: Minimal launch configuration
                    try:
                        safe_log("info", "Attempting minimal launch configuration...")
                        iface.launch(
                            server_name="0.0.0.0",
                            server_port=7860,
                            quiet=True,
                            show_error=False
                        )
                        launch_success = True
                    except Exception as e2:
                        safe_log("warning", f"Minimal launch failed: {e2}")
                        # Strategy 3: Last resort - basic launch
                        try:
                            safe_log("info", "Attempting basic launch...")
                            iface.launch(quiet=True)
                            launch_success = True
                        except Exception as e3:
                            safe_log("error", f"All launch strategies failed: {e3}")
                if not launch_success:
                    safe_log("error", "Failed to start Gradio interface with all strategies")
                    sys.exit(1)
            except Exception as e:
                safe_log("error", f"Unexpected error starting Gradio interface: {e}")
                # Don't exit - try to continue running for debugging
                safe_log("info", "Service may still be accessible despite launch errors")
        else:
            safe_log("error", f"❌ Invalid TTS_SERVICE_MODE: {service_mode}")
            safe_log("info", "Valid modes: websocket (default), gradio, mcp, triple")
            safe_log("info", "Set environment variable: TTS_SERVICE_MODE=websocket")
            sys.exit(1)
    except Exception as e:
        # Ultimate safety net
        try:
            safe_log("critical", f"Critical error in main: {e}")
        except:
            # Even safe_log failed - use basic print
            print(f"[TTS-CRITICAL] Fatal error: {e}", flush=True)
        # Try to provide some debugging info before exiting
        try:
            print("[TTS-DEBUG] Python version:", sys.version, flush=True)
            print("[TTS-DEBUG] Current working directory:", os.getcwd(), flush=True)
            if torch.cuda.is_available():
                print(f"[TTS-DEBUG] CUDA available: {torch.cuda.get_device_name(0)}", flush=True)
            else:
                print("[TTS-DEBUG] CUDA not available", flush=True)
        except:
            pass
        sys.exit(1)
# Launch the TTS app optimized for ZeroGPU with dual protocol support.
# All startup logic (mode selection, fallbacks, exit codes) lives in safe_main().
if __name__ == "__main__":
    safe_main()