""" MCP-based Audio Handler for ChatCal Voice - Uses Model Context Protocol. This module connects to STT and TTS services via MCP for reliable audio processing. """ import logging import numpy as np import tempfile import wave import json import asyncio from typing import Optional, Tuple from .config import config logger = logging.getLogger(__name__) class MCPAudioHandler: """Handles audio processing using MCP services.""" def __init__(self): self.stt_service = None self.tts_service = None # Initialize real services only - NO DEMO MODE self._initialize_real_services() def _initialize_real_services(self): """Initialize real STT and TTS services - no demo mode.""" try: print(f"🔧 REAL SERVICE INIT: Starting real service initialization") # Always try to connect to real services self._discover_services() # Force real service usage if hasattr(self, 'stt_http_url') and self.stt_http_url: print(f"🎵 Real STT service available at {self.stt_http_url}") logger.info("🎵 Real STT service connected") else: print(f"❌ No real STT service available - will return errors instead of demos") logger.error("❌ No real STT service available") except Exception as e: print(f"🔧 REAL SERVICE INIT ERROR: {e}") import traceback traceback.print_exc() logger.error(f"Failed to initialize real services: {e}") def _initialize_mcp_services(self): """Initialize MCP-based STT and TTS services.""" try: print(f"🔧 MCP INIT: Starting MCP service initialization") # Try to discover and connect to MCP services self._discover_services() if self.stt_service: self.demo_mode = False print(f"🎵 MCP STT service available - EXITING DEMO MODE") logger.info("🎵 MCP STT service available, exiting demo mode") else: print(f"🎵 STAYING IN DEMO MODE - MCP STT service not available") logger.warning("🎵 Running in demo mode - MCP STT service unavailable") except Exception as e: print(f"🔧 MCP INIT ERROR: {e}") import traceback traceback.print_exc() logger.error(f"Failed to initialize MCP services: {e}") self.demo_mode = True def _discover_services(self): """Discover available MCP services.""" try: # ALWAYS initialize WebSocket and HTTP fallbacks first # This ensures we have working service connections regardless of MCP status self._initialize_websocket_and_http_services() # Check what MCP tools are available in the environment # First, try to import MCP client try: from mcp import ClientSession from mcp.client.stdio import stdio_client print("🔧 MCP: MCP client library available") # Try to connect to our MCP-enabled services self._connect_stt_service() self._connect_tts_service() except ImportError as e: print(f"🔧 MCP: MCP client not available: {e}") print("🔧 MCP: Using WebSocket/HTTP services (already initialized)") return except Exception as e: print(f"🔧 MCP SERVICE DISCOVERY ERROR: {e}") logger.error(f"MCP service discovery failed: {e}") print("🔧 MCP: Using WebSocket/HTTP services (already initialized)") def _initialize_websocket_and_http_services(self): """Initialize WebSocket and HTTP-based service connections (primary method).""" print("🔧 WEBSOCKET/HTTP INIT: Initializing WebSocket and HTTP-based service connections") # Import HTTP handler components try: import requests # Updated service URLs following unmute.sh methodology stt_websocket_urls = [ "wss://pgits-stt-gpu-service.hf.space/ws/stt", "ws://localhost:7860/ws/stt" # For local development ] stt_http_urls = [ "https://pgits-stt-gpu-service.hf.space", "https://huggingface.co/spaces/pgits/stt-gpu-service" ] tts_urls = [ "https://pgits-tts-gpu-service.hf.space", "https://huggingface.co/spaces/pgits/tts-gpu-service" ] # Store WebSocket URLs (will be used for real-time audio) self.stt_websocket_urls = stt_websocket_urls # Find working HTTP endpoints as fallback self.stt_http_url = self._find_working_http_endpoint(stt_http_urls, "STT") self.tts_http_url = self._find_working_http_endpoint(tts_urls, "TTS") # Preload STT service to warm up the model and avoid cold start delays if self.stt_http_url: self._preload_stt_service() # Check if we have any working STT service (WebSocket preferred, HTTP fallback) if self.stt_websocket_urls or self.stt_http_url: print("🔧 WEBSOCKET/HTTP FALLBACK: STT service available") if self.stt_http_url or self.tts_http_url or self.stt_websocket_urls: print("🔧 WEBSOCKET/HTTP FALLBACK: Services available") else: print("🔧 WEBSOCKET/HTTP FALLBACK: No services available - WILL RETURN ERRORS") except Exception as e: print(f"🔧 WEBSOCKET/HTTP FALLBACK ERROR: {e}") def _preload_stt_service(self): """Preload STT service to warm up the model and avoid cold start delays.""" try: import requests import time preload_url = f"{self.stt_http_url}/preload?model_size=base" print(f"🔧 STT PRELOAD: Warming up STT service at {preload_url}") start_time = time.time() response = requests.get(preload_url, timeout=30) elapsed_time = time.time() - start_time if response.status_code == 200: result = response.json() if result.get("status") == "success": print(f"✅ STT PRELOAD: Model loaded successfully in {elapsed_time:.1f}s - {result.get('message', 'Ready')}") logger.info(f"STT service preloaded successfully: {result.get('message')}") else: print(f"⚠️ STT PRELOAD: {result.get('message', 'Unknown response')}") logger.warning(f"STT preload response: {result}") else: print(f"⚠️ STT PRELOAD: HTTP {response.status_code} - Service may still be starting") logger.warning(f"STT preload failed: HTTP {response.status_code}") except requests.exceptions.Timeout: print(f"⏰ STT PRELOAD: Timeout after 30s - Service may be cold starting, will try again later") logger.warning("STT preload timed out - service may be cold starting") except Exception as e: print(f"⚠️ STT PRELOAD: Error warming up service - {e}") logger.warning(f"STT preload error: {e}") def _find_working_http_endpoint(self, urls: list, service_name: str) -> str: """Find working HTTP endpoint for fallback.""" import requests for url in urls: try: response = requests.get(url, timeout=5) if response.status_code == 200: print(f"✅ {service_name} HTTP endpoint found: {url}") return url except: continue print(f"❌ No working {service_name} HTTP endpoints found") return None def _connect_stt_service(self): """Connect to MCP STT service.""" try: # For now, we'll create a wrapper around the available MCP tools # In HF Spaces, MCP services might be exposed differently # Check if we have access to STT via available tools print(f"🎤 MCP: Checking for STT service availability") # Since we don't have direct MCP access yet, let's create a placeholder # that can be replaced with actual MCP integration self.stt_service = self._create_stt_service_wrapper() if self.stt_service: print(f"✅ MCP STT service connected") except Exception as e: print(f"🎤 MCP STT connection error: {e}") self.stt_service = None def _connect_tts_service(self): """Connect to MCP TTS service.""" try: print(f"🔊 MCP: Checking for TTS service availability") # Create TTS service wrapper self.tts_service = self._create_tts_service_wrapper() if self.tts_service: print(f"✅ MCP TTS service connected") except Exception as e: print(f"🔊 MCP TTS connection error: {e}") self.tts_service = None def _create_stt_service_wrapper(self): """Create STT service wrapper.""" # For now, return a placeholder that indicates MCP availability # This will be replaced with actual MCP service calls return { 'name': 'stt-gpu-service', 'available': True, 'type': 'mcp' } def _create_tts_service_wrapper(self): """Create TTS service wrapper.""" return { 'name': 'tts-gpu-service', 'available': True, 'type': 'mcp' } async def speech_to_text(self, audio_file_path: str) -> str: """Convert speech to text using REAL SERVICES ONLY - no demo mode.""" try: print(f"🎤 STT: Processing audio file: {audio_file_path}") # DEBUG: Check WebSocket URLs availability print(f"🔍 DEBUG: hasattr(self, 'stt_websocket_urls'): {hasattr(self, 'stt_websocket_urls')}") if hasattr(self, 'stt_websocket_urls'): print(f"🔍 DEBUG: self.stt_websocket_urls: {self.stt_websocket_urls}") print(f"🔍 DEBUG: bool(self.stt_websocket_urls): {bool(self.stt_websocket_urls)}") # First try WebSocket connection (unmute.sh methodology) if hasattr(self, 'stt_websocket_urls') and self.stt_websocket_urls: print(f"🎤 STT: Trying WebSocket connection (unmute.sh style)") result = await self._call_websocket_stt_service(audio_file_path) if result and not result.startswith("Error"): print(f"🎤 STT: WebSocket SUCCESS - {result[:50]}...") return result else: print(f"🎤 STT: WebSocket FAILED - {result}") else: print(f"🚨 STT: WebSocket URLs not available - falling back to MCP") # Try MCP service if available if self.stt_service: print(f"🎤 STT: Calling MCP STT service") result = await self._call_mcp_stt_service(audio_file_path) if result and not result.startswith("Error"): print(f"🎤 STT: MCP SUCCESS - {result[:50]}...") return result else: print(f"🎤 STT: MCP FAILED - {result}") # HTTP fallback if WebSocket and MCP failed if hasattr(self, 'stt_http_url') and self.stt_http_url: print(f"🎤 STT: Using HTTP fallback service at {self.stt_http_url}") result = await self._call_http_stt_service(audio_file_path) if result and not result.startswith("Error"): print(f"🎤 STT: HTTP SUCCESS - {result[:50]}...") return result else: print(f"🎤 STT: HTTP FAILED - {result}") # NO DEMO MODE - Return error if all services failed error_msg = "ERROR: All STT services failed - no demo mode available" print(f"🎤 STT: {error_msg}") return error_msg except Exception as e: print(f"🎤 STT ERROR: {e}") import traceback traceback.print_exc() logger.error(f"STT error: {e}") return f"ERROR: STT processing failed - {str(e)}" async def _call_websocket_stt_service(self, audio_file_path: str) -> str: """Call STT service via WebSocket using unmute.sh methodology.""" try: import websockets import base64 import json import os print(f"🎤 WebSocket STT: Processing {audio_file_path}") print(f"🔍 DEBUG: File exists: {os.path.exists(audio_file_path)}") # Read audio file and encode for WebSocket transmission with open(audio_file_path, 'rb') as audio_file: audio_data = audio_file.read() audio_base64 = base64.b64encode(audio_data).decode('utf-8') print(f"🔍 DEBUG: Audio file size: {len(audio_data)} bytes, path: {audio_file_path}") print(f"🔍 DEBUG: First 20 bytes: {audio_data[:20].hex() if len(audio_data) >= 20 else 'N/A'}") print(f"🔍 DEBUG: Base64 length: {len(audio_base64)} chars") # Audio format detection if audio_data.startswith(b'RIFF'): print(f"🔍 DEBUG: Audio format: WAV") elif audio_data.startswith(b'\xff\xfb') or audio_data.startswith(b'ID3'): print(f"🔍 DEBUG: Audio format: MP3") elif audio_data.startswith(b'OggS'): print(f"🔍 DEBUG: Audio format: OGG") elif audio_data.startswith(b'\x1a\x45\xdf\xa3'): print(f"🔍 DEBUG: Audio format: WebM") else: print(f"🔍 DEBUG: Audio format: Unknown - first 4 bytes: {audio_data[:4].hex()}") print(f"🔍 DEBUG: Available WebSocket URLs: {self.stt_websocket_urls}") # Try each WebSocket URL with sophisticated retry logic for i, ws_url in enumerate(self.stt_websocket_urls): result = await self._connect_websocket_with_retry(ws_url, audio_base64) if result and not result.startswith("Error"): return result except ImportError: print("🎤 WebSocket STT: websockets library not available") return "Error: WebSocket support not available" except Exception as e: print(f"🎤 WebSocket STT: Fatal error in _call_websocket_stt_service: {e}") print(f"🔍 DEBUG: Exception type: {type(e).__name__}") import traceback traceback.print_exc() return f"Error: WebSocket STT fatal error - {str(e)}" async def _connect_websocket_with_retry(self, ws_url: str, audio_base64: str) -> str: """Connect to WebSocket with exponential backoff retry logic for GPU cold start calibration.""" import websockets import asyncio import json import ssl import time # 🎯 RETRY CALIBRATION CONFIGURATION # Based on HF Spaces ZeroGPU cold start patterns max_retries = 8 # Up to 8 attempts total base_delay = 3.0 # Start with 3 second delay max_delay = 45.0 # Cap at 45 seconds (HF Spaces usually under 60s) backoff_multiplier = 1.4 # Moderate exponential backoff (not too aggressive) connection_timeout = 30.0 # 30s connection timeout per attempt # 📊 TIMING CALIBRATION THRESHOLDS cold_start_threshold = 30.0 # >30s = likely cold start warm_service_threshold = 8.0 # <8s = warm service gpu_loading_threshold = 60.0 # >60s = GPU loading issues start_time = time.time() for attempt in range(1, max_retries + 1): attempt_start = time.time() elapsed_total = attempt_start - start_time # Calculate delay for this attempt (exponential backoff) if attempt > 1: delay = min(base_delay * (backoff_multiplier ** (attempt - 2)), max_delay) print(f"⏰ RETRY CALIBRATION: Waiting {delay:.1f}s before attempt {attempt}/{max_retries} (elapsed: {elapsed_total:.1f}s)") await asyncio.sleep(delay) try: print(f"🎤 WebSocket STT: Attempt {attempt}/{max_retries} - Connecting to {ws_url}") # Create SSL context for secure connections ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE # Connection parameters optimized for HF Spaces connect_kwargs = { "ping_interval": None, "ping_timeout": 20, "close_timeout": 10, "max_size": 10 * 1024 * 1024, # 10MB max message size "compression": None # Disable compression for audio } # Add SSL context for wss:// URLs if ws_url.startswith("wss://"): connect_kwargs["ssl"] = ssl_context # Attempt connection with timeout and detailed timing connection_start = time.time() async with asyncio.timeout(connection_timeout): async with websockets.connect(ws_url, **connect_kwargs) as websocket: connection_time = time.time() - connection_start total_elapsed = time.time() - start_time # 📊 CONNECTION TIMING CALIBRATION if connection_time < warm_service_threshold: print(f"⚡ TIMING CALIBRATION: Fast connection ({connection_time:.1f}s) - Service was warm") elif connection_time < cold_start_threshold: print(f"🔄 TIMING CALIBRATION: Normal connection ({connection_time:.1f}s) - Service warming up") else: print(f"❄️ TIMING CALIBRATION: Slow connection ({connection_time:.1f}s) - Cold start detected") if total_elapsed > gpu_loading_threshold: print(f"🐌 TIMING CALIBRATION: Long total wait ({total_elapsed:.1f}s) - GPU loading issues") print(f"🔍 RETRY SUCCESS: Connected on attempt {attempt}/{max_retries} after {total_elapsed:.1f}s total") print(f"🔍 DEBUG: WebSocket connected successfully to {ws_url}") # Check WebSocket state print(f"🔍 DEBUG: WebSocket state: {websocket.state}") print(f"🔍 DEBUG: WebSocket ping interval: {websocket.ping_interval}") # IMPORTANT: STT service sends connection confirmation first print(f"🎤 WebSocket STT: Waiting for connection confirmation...") try: confirm_response = await asyncio.wait_for(websocket.recv(), timeout=10.0) confirm_result = json.loads(confirm_response) print(f"🎤 WebSocket STT: Connection confirmation: {confirm_result}") if confirm_result.get("type") != "stt_connection_confirmed": print(f"🎤 WebSocket STT: Unexpected confirmation type: {confirm_result.get('type')}") continue # Try next URL except asyncio.TimeoutError: print(f"🎤 WebSocket STT: Connection confirmation timeout for {ws_url}") continue except json.JSONDecodeError as e: print(f"🎤 WebSocket STT: Invalid confirmation JSON from {ws_url}: {e}") continue # Now send audio chunk with unmute.sh protocol message = { "type": "stt_audio_chunk", "audio_data": audio_base64, "language": "auto", "model_size": "base", "is_final": True # Use flush trick for best performance } print(f"🔍 DEBUG: Preparing to send message with keys: {list(message.keys())}") print(f"🔍 DEBUG: Message type: {message['type']}") print(f"🔍 DEBUG: Audio data length: {len(message['audio_data'])}") print(f"🔍 DEBUG: Model size: {message['model_size']}") print(f"🔍 DEBUG: Is final: {message['is_final']}") message_json = json.dumps(message) print(f"🔍 DEBUG: JSON message size: {len(message_json)} chars") print(f"🎤 WebSocket STT: Sending message to {ws_url}...") await websocket.send(message_json) print(f"🎤 WebSocket STT: Message sent successfully - audio data ({len(audio_data)} bytes)") print(f"🎤 WebSocket STT: Waiting for response (timeout: 30s)...") # Wait for response with timeout to prevent hanging try: response = await asyncio.wait_for(websocket.recv(), timeout=30.0) print(f"🔍 DEBUG: Raw response received: {response[:500]}...") # First 500 chars result = json.loads(response) print(f"🎤 WebSocket STT: Parsed response: {result}") if result.get("type") == "stt_transcription" and result.get("text"): transcription = result["text"].strip() processing_time = result.get("processing_time", 0) rtf = result.get("real_time_factor", 0) print(f"🎤 WebSocket STT: SUCCESS - {transcription[:50]}... (RTF: {rtf:.2f}x)") return transcription elif result.get("type") == "stt_error": error_msg = result.get("message", "Unknown error") print(f"🎤 WebSocket STT: Service error - {error_msg}") print(f"🔍 DEBUG: Full error response: {result}") continue # Try next URL else: print(f"🎤 WebSocket STT: Unexpected response format") print(f"🔍 DEBUG: Response type: {result.get('type', 'NO_TYPE')}") print(f"🔍 DEBUG: Response keys: {list(result.keys()) if isinstance(result, dict) else 'NOT_DICT'}") continue except asyncio.TimeoutError: print(f"🎤 WebSocket STT: Response timeout after 30s for {ws_url}") continue except json.JSONDecodeError as e: print(f"🎤 WebSocket STT: Invalid JSON response from {ws_url}: {e}") print(f"🔍 DEBUG: Raw response that failed to parse: {response}") # Don't return here - let retry logic handle it break # Break from WebSocket connection, will retry # If we get here, something went wrong but no exception print(f"🎤 WebSocket STT: No valid response received from {ws_url}") break # Break from WebSocket connection, will retry except websockets.exceptions.ConnectionClosed as e: attempt_time = time.time() - attempt_start print(f"🎤 RETRY: Connection closed (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}") print(f"🔍 DEBUG: Connection close code: {e.code if hasattr(e, 'code') else 'N/A'}") # Continue to retry logic except websockets.exceptions.InvalidStatusCode as e: attempt_time = time.time() - attempt_start status_code = getattr(e, 'status_code', 'unknown') print(f"🎤 RETRY: HTTP {status_code} (attempt {attempt}/{max_retries}, {attempt_time:.1f}s)") # 📊 CALIBRATION: Different status codes indicate different service states if status_code == 503: print(f"🔄 STATUS CALIBRATION: HTTP 503 - Service temporarily unavailable (cold starting)") elif status_code == 403: print(f"🚫 STATUS CALIBRATION: HTTP 403 - Service forbidden (WebSocket not available)") elif status_code == 502: print(f"⚠️ STATUS CALIBRATION: HTTP 502 - Bad gateway (service deployment issue)") else: print(f"❓ STATUS CALIBRATION: HTTP {status_code} - Unknown service state") # Continue to retry logic except websockets.exceptions.WebSocketException as e: attempt_time = time.time() - attempt_start print(f"🎤 RETRY: WebSocket error (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}") # Continue to retry logic except asyncio.TimeoutError: attempt_time = time.time() - attempt_start print(f"🎤 RETRY: Timeout after {attempt_time:.1f}s (attempt {attempt}/{max_retries})") print(f"⏰ TIMEOUT CALIBRATION: Service taking >{connection_timeout}s indicates severe cold start") # Continue to retry logic except ConnectionRefusedError as e: attempt_time = time.time() - attempt_start print(f"🎤 RETRY: Connection refused (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}") print(f"🚫 CONNECTION CALIBRATION: Immediate refusal indicates service not listening") # Continue to retry logic except Exception as ws_error: attempt_time = time.time() - attempt_start print(f"🎤 RETRY: Unexpected error (attempt {attempt}/{max_retries}, {attempt_time:.1f}s): {ws_error}") print(f"🔍 DEBUG: Exception type: {type(ws_error).__name__}") # Continue to retry logic # All retry attempts failed total_time = time.time() - start_time print(f"🎤 RETRY EXHAUSTED: Failed after {max_retries} attempts over {total_time:.1f}s") # 📊 FINAL CALIBRATION SUMMARY if total_time > gpu_loading_threshold: print(f"🐌 CALIBRATION SUMMARY: Very slow ({total_time:.1f}s) - GPU loading or deployment issues") elif total_time > cold_start_threshold: print(f"❄️ CALIBRATION SUMMARY: Slow ({total_time:.1f}s) - Cold start confirmed") else: print(f"⚡ CALIBRATION SUMMARY: Fast failure ({total_time:.1f}s) - Service configuration issue") return f"Error: WebSocket STT failed after {max_retries} attempts in {total_time:.1f}s" async def _call_mcp_stt_service(self, audio_file_path: str) -> str: """Call MCP STT service with HTTP fallback.""" try: print(f"🎤 MCP STT: Attempting MCP or HTTP service call for {audio_file_path}") # Try actual MCP integration first try: from mcp import ClientSession from mcp.client.stdio import stdio_client # Attempt to connect to STT MCP service print(f"🎤 MCP STT: Trying MCP connection...") # TODO: Implement actual MCP call when services are deployed with MCP # For now, this would connect to the MCP-enabled STT service # result = await mcp_client.call_tool("stt_transcribe", { # "audio_file": audio_file_path, # "language": "auto", # "model": "base" # }) # Fall back to HTTP until MCP services are deployed if hasattr(self, 'stt_http_url') and self.stt_http_url: return await self._call_http_stt_service(audio_file_path) # Return error - no simulation print(f"🎤 MCP STT: No MCP client available") return "ERROR: MCP STT service failed - no demo mode" except ImportError: print(f"🎤 MCP STT: MCP client not available, trying HTTP fallback") # Try HTTP fallback if hasattr(self, 'stt_http_url') and self.stt_http_url: result = await self._call_http_stt_service(audio_file_path) if result and not result.startswith("Error"): return result # Return error - no simulation return "ERROR: No STT services available" except Exception as e: print(f"🎤 MCP STT service call error: {e}") return f"ERROR: MCP STT service error - {str(e)}" async def _call_http_stt_service(self, audio_file_path: str) -> str: """Call STT service via HTTP as fallback.""" try: import requests print(f"🎤 HTTP STT: Calling service at {self.stt_http_url}") # Skip problematic Gradio client, try direct HTTP API first try: print(f"🎤 HTTP STT: Trying direct HTTP API approach") # Try multiple API endpoint patterns api_patterns = [ f"{self.stt_http_url}/api/predict", f"{self.stt_http_url}/call/predict", f"{self.stt_http_url}/api/transcribe_audio", f"{self.stt_http_url}/call/transcribe_audio" ] for api_url in api_patterns: try: print(f"🎤 HTTP STT: Trying API URL: {api_url}") with open(audio_file_path, 'rb') as audio_file: # Try different payload formats payload_formats = [ # Format 1: Standard Gradio API format { 'files': {'data': audio_file}, 'data': {'data': json.dumps(["auto", "base", True])} }, # Format 2: Direct form data { 'files': {'audio': audio_file}, 'data': {'language': 'auto', 'model': 'base', 'timestamps': 'true'} } ] for i, payload in enumerate(payload_formats): try: audio_file.seek(0) # Reset file pointer print(f"🎤 HTTP STT: Trying payload format {i+1}") response = requests.post( api_url, files=payload['files'], data=payload['data'], timeout=60 ) print(f"🎤 HTTP STT: Response status: {response.status_code}") print(f"🎤 HTTP STT: Response headers: {dict(response.headers)}") if response.status_code == 200: try: result = response.json() print(f"🎤 HTTP STT: Response JSON: {result}") # Try different response formats transcription = None if isinstance(result, dict): if 'data' in result and len(result['data']) > 1: transcription = result['data'][1] elif 'transcription' in result: transcription = result['transcription'] elif 'text' in result: transcription = result['text'] elif isinstance(result, list) and len(result) > 1: transcription = result[1] if transcription and transcription.strip(): print(f"🎤 HTTP STT: SUCCESS via direct API: {transcription}") return transcription.strip() except json.JSONDecodeError as json_err: print(f"🎤 HTTP STT: JSON decode error: {json_err}") print(f"🎤 HTTP STT: Raw response: {response.text[:200]}") else: print(f"🎤 HTTP STT: Failed with status {response.status_code}") print(f"🎤 HTTP STT: Error response: {response.text[:200]}") except Exception as payload_error: print(f"🎤 HTTP STT: Payload format {i+1} failed: {payload_error}") continue except Exception as url_error: print(f"🎤 HTTP STT: URL {api_url} failed: {url_error}") continue print(f"🎤 HTTP STT: All direct API attempts failed") except Exception as direct_error: print(f"🎤 HTTP STT: Direct API approach failed: {direct_error}") # Final fallback - try Gradio client if direct API failed try: print(f"🎤 HTTP STT: Falling back to Gradio client...") from gradio_client import Client client = Client(self.stt_http_url) # Try different endpoint names that the STT service might use gradio_endpoints = [ "transcribe_audio", # Main transcribe function name "/transcribe_audio", # With slash 0, # First endpoint by index ] for endpoint in gradio_endpoints: try: print(f"🎤 HTTP STT: Trying Gradio endpoint: {endpoint}") # Create proper FileData object for Gradio client from gradio_client import handle_file file_data = handle_file(audio_file_path) result = client.predict( file_data, # audio file as FileData object "auto", # language "base", # model_size True, # return_timestamps api_name=endpoint if isinstance(endpoint, str) else None, fn_index=endpoint if isinstance(endpoint, int) else None ) print(f"🎤 HTTP STT: Gradio result for {endpoint}: {result}") if result: # Handle different result formats if isinstance(result, (list, tuple)) and len(result) >= 2: # Multi-element result: [status, transcription, timestamps] transcription = result[1] elif isinstance(result, str): # Direct string result transcription = result else: # Other formats transcription = str(result) if transcription and transcription.strip() and not transcription.startswith("❌"): print(f"🎤 HTTP STT: SUCCESS via Gradio endpoint {endpoint}: {transcription[:50]}...") return transcription.strip() except Exception as endpoint_error: print(f"🎤 HTTP STT: Endpoint {endpoint} failed: {endpoint_error}") continue print(f"🎤 HTTP STT: All Gradio endpoints failed") except Exception as gradio_error: print(f"🎤 HTTP STT: Gradio client setup failed: {gradio_error}") # Return error instead of simulation return "Error: STT service connection failed" except Exception as e: print(f"🎤 HTTP STT ERROR: {e}") # Return error instead of demo text return f"Error: STT service error - {str(e)}" def _get_audio_duration(self, audio_file_path: str) -> float: """Get duration of audio file.""" try: with wave.open(audio_file_path, 'rb') as wav_file: frames = wav_file.getnframes() rate = wav_file.getframerate() duration = frames / float(rate) return duration except: return 5.0 # Default duration def process_audio_input(self, audio_tuple: Tuple) -> str: """Process Gradio audio input format using MCP.""" try: print(f"🎤 MCP HANDLER: Processing audio tuple: {type(audio_tuple)}") print(f"🔍 DEBUG: Audio tuple content: {audio_tuple}") if audio_tuple is None or len(audio_tuple) < 2: print(f"🎤 MCP HANDLER: No audio received or invalid format") return "ERROR: No audio received" # Gradio audio format: (sample_rate, audio_array) sample_rate, audio_array = audio_tuple print(f"🎤 MCP HANDLER: Sample rate: {sample_rate}, Array type: {type(audio_array)}") # Convert numpy array to audio file for MCP service if isinstance(audio_array, np.ndarray): print(f"🎤 MCP HANDLER: Audio array shape: {audio_array.shape}") # Process with MCP STT service try: # Convert to proper format for MCP service - with buffer error handling try: audio_normalized = (audio_array * 32767).astype(np.int16) except ValueError as buffer_error: if "buffer size must be a multiple of element size" in str(buffer_error): print(f"🎤 MCP HANDLER: Buffer size error - returning error") audio_duration = len(audio_array) / sample_rate if len(audio_array) > 0 else 1.0 return f"ERROR: Buffer size issue with audio processing ({audio_duration:.1f}s)" else: raise buffer_error # Create temporary WAV file with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file: # Write WAV file with wave.open(tmp_file.name, 'wb') as wav_file: wav_file.setnchannels(1) # Mono wav_file.setsampwidth(2) # 16-bit wav_file.setframerate(sample_rate) wav_file.writeframes(audio_normalized.tobytes()) print(f"🎤 MCP HANDLER: Created temp WAV file: {tmp_file.name}") # Process with MCP STT import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete(self.speech_to_text(tmp_file.name)) print(f"🎤 MCP HANDLER: MCP STT result: {result}") return result finally: loop.close() # Clean up temp file import os try: os.unlink(tmp_file.name) except: pass # Ignore cleanup errors except Exception as stt_error: print(f"🎤 MCP HANDLER ERROR: MCP STT processing failed: {stt_error}") return f"ERROR: STT processing failed - {str(stt_error)}" print(f"🎤 MCP HANDLER: Invalid audio array format") return "Invalid audio format" except Exception as e: print(f"🎤 MCP HANDLER ERROR: {e}") import traceback traceback.print_exc() logger.error(f"MCP audio processing error: {e}") return f"Error processing audio: {str(e)}" async def text_to_speech(self, text: str, voice: Optional[str] = None) -> Optional[bytes]: """Convert text to speech using MCP TTS service.""" try: if not config.enable_voice_responses: return None if not self.tts_service: print(f"🔊 MCP TTS: No TTS service available") return None print(f"🔊 MCP TTS: Converting text to speech via MCP: {text[:50]}...") # Call MCP TTS service result = await self._call_mcp_tts_service(text, voice) return result except Exception as e: print(f"🔊 MCP TTS ERROR: {e}") logger.error(f"MCP TTS error: {e}") return None async def _call_mcp_tts_service(self, text: str, voice: Optional[str] = None) -> Optional[bytes]: """Call MCP TTS service - placeholder for actual MCP integration.""" try: # This is where we would make the actual MCP call print(f"🔊 MCP TTS: Simulating MCP TTS service call") # In a real MCP integration, this would be something like: # result = await mcp_client.call_tool("tts_synthesize", { # "text": text, # "voice": voice or config.default_voice # }) # For now, return None (no audio in demo) return None except Exception as e: print(f"🔊 MCP TTS service call error: {e}") return None def is_audio_service_available(self) -> Tuple[bool, bool]: """Check if MCP STT and TTS services are available.""" stt_available = bool(self.stt_service) tts_available = bool(self.tts_service) return stt_available, tts_available def get_audio_status(self) -> dict: """Get status of MCP audio services.""" stt_available, tts_available = self.is_audio_service_available() return { "stt_available": stt_available, "tts_available": tts_available, "demo_mode": False, # NO DEMO MODE "voice_responses_enabled": config.enable_voice_responses, "default_voice": config.default_voice, "service_type": "mcp" } # Global MCP audio handler instance mcp_audio_handler = MCPAudioHandler()