Spaces:
Sleeping
Sleeping
Peter Michael Gits
feat: Add Streamlit-native WebRTC speech-to-text using unmute.sh patterns
21fac9b | """ | |
| MCP-based Audio Handler for ChatCal Voice - Uses Model Context Protocol. | |
| This module connects to STT and TTS services via MCP for reliable audio processing. | |
| """ | |
| import logging | |
| import numpy as np | |
| import tempfile | |
| import wave | |
| import json | |
| import asyncio | |
| from typing import Optional, Tuple | |
| from .config import config | |
# Module-level logger; MCPAudioHandler reports failures here in addition to its print() diagnostics.
logger = logging.getLogger(__name__)
class MCPAudioHandler:
    """Speech-to-text / text-to-speech front end for ChatCal Voice.

    STT is attempted in order:

    1. WebSocket (unmute.sh-style protocol) against each configured URL.
    2. The MCP wrapper. This is currently a placeholder that delegates to HTTP.
    3. Direct HTTP routes, then the Gradio client.

    Failures are reported as strings starting with ``"Error:"`` or ``"ERROR:"``
    instead of exceptions; there is no demo mode. TTS is a placeholder that
    currently returns ``None``.

    NOTE: construction performs blocking network probes (endpoint discovery
    with 5 s timeouts, STT preload with a 30 s timeout), so instantiating this
    class can take tens of seconds when the remote services are cold.
    """

    def __init__(self):
        """Create the handler and immediately probe the real STT/TTS services."""
        self.stt_service = None
        self.tts_service = None
        # Defaults so later lookups never depend on how far discovery got
        # (discovery swallows its own errors and may stop part-way).
        self.stt_websocket_urls = []
        self.stt_http_url = None
        self.tts_http_url = None
        # Initialize real services only - NO DEMO MODE
        self._initialize_real_services()

    @staticmethod
    def _is_error_result(result: Optional[str]) -> bool:
        """Return True when an STT helper's result is a failure rather than a transcription.

        Helpers in this class signal failure with strings prefixed ``"Error:"`` or
        ``"ERROR:"`` (the casing differs between helpers). A missing or empty
        result also counts as a failure.
        """
        if not result or not isinstance(result, str):
            return True
        return result.lower().startswith("error:")

    # ------------------------------------------------------------------ #
    # Service discovery
    # ------------------------------------------------------------------ #

    def _initialize_real_services(self):
        """Discover real STT/TTS services and report whether an HTTP STT endpoint was found.

        Never raises: any error is printed and logged.
        """
        try:
            print(f"π§ REAL SERVICE INIT: Starting real service initialization")
            self._discover_services()
            if getattr(self, 'stt_http_url', None):
                print(f"π΅ Real STT service available at {self.stt_http_url}")
                logger.info("π΅ Real STT service connected")
            else:
                print(f"β No real STT service available - will return errors instead of demos")
                logger.error("β No real STT service available")
        except Exception as e:
            print(f"π§ REAL SERVICE INIT ERROR: {e}")
            import traceback
            traceback.print_exc()
            logger.error(f"Failed to initialize real services: {e}")

    def _initialize_mcp_services(self):
        """Legacy initializer that toggles ``self.demo_mode`` based on MCP STT availability.

        It is not called anywhere in this class. ``__init__`` uses
        ``_initialize_real_services`` instead. It is kept so any external caller
        keeps working. Never raises.
        """
        try:
            print(f"π§ MCP INIT: Starting MCP service initialization")
            self._discover_services()
            if self.stt_service:
                self.demo_mode = False
                print(f"π΅ MCP STT service available - EXITING DEMO MODE")
                logger.info("π΅ MCP STT service available, exiting demo mode")
            else:
                print(f"π΅ STAYING IN DEMO MODE - MCP STT service not available")
                logger.warning("π΅ Running in demo mode - MCP STT service unavailable")
        except Exception as e:
            print(f"π§ MCP INIT ERROR: {e}")
            import traceback
            traceback.print_exc()
            logger.error(f"Failed to initialize MCP services: {e}")
            self.demo_mode = True

    def _discover_services(self):
        """Set up WebSocket/HTTP connections, then attach MCP wrappers if the MCP client library imports.

        Never raises: errors are printed and logged.
        """
        try:
            # WebSocket/HTTP first, so working connections exist regardless of MCP status.
            self._initialize_websocket_and_http_services()
            try:
                # Import probe only: the names are not used yet.
                from mcp import ClientSession  # noqa: F401
                from mcp.client.stdio import stdio_client  # noqa: F401
            except ImportError as e:
                print(f"π§ MCP: MCP client not available: {e}")
                print("π§ MCP: Using WebSocket/HTTP services (already initialized)")
                return
            print("π§ MCP: MCP client library available")
            self._connect_stt_service()
            self._connect_tts_service()
        except Exception as e:
            print(f"π§ MCP SERVICE DISCOVERY ERROR: {e}")
            logger.error(f"MCP service discovery failed: {e}")
            print("π§ MCP: Using WebSocket/HTTP services (already initialized)")

    def _initialize_websocket_and_http_services(self):
        """Configure STT WebSocket URLs, probe HTTP endpoints, and warm the STT model.

        Sets ``stt_websocket_urls``, ``stt_http_url`` and ``tts_http_url``.
        Errors are printed, not raised.
        """
        print("π§ WEBSOCKET/HTTP INIT: Initializing WebSocket and HTTP-based service connections")
        try:
            import requests  # noqa: F401 -- skip all probing if requests is unavailable

            # Service URLs following the unmute.sh methodology
            stt_websocket_urls = [
                "wss://pgits-stt-gpu-service.hf.space/ws/stt",
                "ws://localhost:7860/ws/stt",  # For local development
            ]
            stt_http_urls = [
                "https://pgits-stt-gpu-service.hf.space",
                "https://huggingface.co/spaces/pgits/stt-gpu-service",
            ]
            tts_urls = [
                "https://pgits-tts-gpu-service.hf.space",
                "https://huggingface.co/spaces/pgits/tts-gpu-service",
            ]

            # WebSocket URLs are used for real-time audio; they are not probed here.
            self.stt_websocket_urls = stt_websocket_urls
            # HTTP endpoints are the fallback path.
            self.stt_http_url = self._find_working_http_endpoint(stt_http_urls, "STT")
            self.tts_http_url = self._find_working_http_endpoint(tts_urls, "TTS")

            # Warm the STT model to avoid cold-start delays on the first request.
            if self.stt_http_url:
                self._preload_stt_service()

            if self.stt_websocket_urls or self.stt_http_url:
                print("π§ WEBSOCKET/HTTP FALLBACK: STT service available")
            if self.stt_http_url or self.tts_http_url or self.stt_websocket_urls:
                print("π§ WEBSOCKET/HTTP FALLBACK: Services available")
            else:
                print("π§ WEBSOCKET/HTTP FALLBACK: No services available - WILL RETURN ERRORS")
        except Exception as e:
            print(f"π§ WEBSOCKET/HTTP FALLBACK ERROR: {e}")

    def _preload_stt_service(self):
        """Ask the STT service to load its ``base`` model via ``GET {stt_http_url}/preload``.

        Best effort: timeouts, HTTP errors and malformed replies are printed and
        logged, never raised.
        """
        import time

        # Imported outside the try so the ``except requests...`` clause can always resolve it.
        import requests

        preload_url = f"{self.stt_http_url}/preload?model_size=base"
        try:
            print(f"π§ STT PRELOAD: Warming up STT service at {preload_url}")
            start_time = time.time()
            response = requests.get(preload_url, timeout=30)
            elapsed_time = time.time() - start_time

            if response.status_code != 200:
                print(f"β οΈ STT PRELOAD: HTTP {response.status_code} - Service may still be starting")
                logger.warning(f"STT preload failed: HTTP {response.status_code}")
                return

            result = response.json()
            if result.get("status") == "success":
                print(f"β STT PRELOAD: Model loaded successfully in {elapsed_time:.1f}s - {result.get('message', 'Ready')}")
                logger.info(f"STT service preloaded successfully: {result.get('message')}")
            else:
                print(f"β οΈ STT PRELOAD: {result.get('message', 'Unknown response')}")
                logger.warning(f"STT preload response: {result}")
        except requests.exceptions.Timeout:
            print(f"β° STT PRELOAD: Timeout after 30s - Service may be cold starting, will try again later")
            logger.warning("STT preload timed out - service may be cold starting")
        except Exception as e:
            print(f"β οΈ STT PRELOAD: Error warming up service - {e}")
            logger.warning(f"STT preload error: {e}")

    def _find_working_http_endpoint(self, urls: list, service_name: str) -> Optional[str]:
        """Return the first URL in *urls* that answers ``GET`` with HTTP 200, else ``None``.

        Each probe has a 5 s timeout. Network errors mark that URL as unusable.
        """
        import requests

        for url in urls:
            try:
                response = requests.get(url, timeout=5)
            except requests.RequestException:
                continue
            if response.status_code == 200:
                print(f"β {service_name} HTTP endpoint found: {url}")
                return url
        print(f"β No working {service_name} HTTP endpoints found")
        return None

    def _connect_stt_service(self):
        """Attach the (placeholder) MCP STT service descriptor; sets ``stt_service`` to None on error."""
        try:
            print(f"π€ MCP: Checking for STT service availability")
            # Placeholder until real MCP integration exists.
            self.stt_service = self._create_stt_service_wrapper()
            if self.stt_service:
                print(f"β MCP STT service connected")
        except Exception as e:
            print(f"π€ MCP STT connection error: {e}")
            self.stt_service = None

    def _connect_tts_service(self):
        """Attach the (placeholder) MCP TTS service descriptor; sets ``tts_service`` to None on error."""
        try:
            print(f"π MCP: Checking for TTS service availability")
            self.tts_service = self._create_tts_service_wrapper()
            if self.tts_service:
                print(f"β MCP TTS service connected")
        except Exception as e:
            print(f"π MCP TTS connection error: {e}")
            self.tts_service = None

    def _create_stt_service_wrapper(self):
        """Return a placeholder descriptor marking the MCP STT service as available."""
        return {
            'name': 'stt-gpu-service',
            'available': True,
            'type': 'mcp',
        }

    def _create_tts_service_wrapper(self):
        """Return a placeholder descriptor marking the MCP TTS service as available."""
        return {
            'name': 'tts-gpu-service',
            'available': True,
            'type': 'mcp',
        }

    # ------------------------------------------------------------------ #
    # Speech-to-text
    # ------------------------------------------------------------------ #

    async def speech_to_text(self, audio_file_path: str) -> str:
        """Transcribe *audio_file_path* using real services only (no demo mode).

        Tries WebSocket, then MCP, then HTTP. The MCP placeholder already
        delegates to HTTP whenever an HTTP URL is configured, so in that case
        the HTTP step is not repeated.

        Returns:
            The transcription, or an error string starting with ``"ERROR:"``.
        """
        try:
            print(f"π€ STT: Processing audio file: {audio_file_path}")

            print(f"π DEBUG: hasattr(self, 'stt_websocket_urls'): {hasattr(self, 'stt_websocket_urls')}")
            if hasattr(self, 'stt_websocket_urls'):
                print(f"π DEBUG: self.stt_websocket_urls: {self.stt_websocket_urls}")
                print(f"π DEBUG: bool(self.stt_websocket_urls): {bool(self.stt_websocket_urls)}")

            # 1) WebSocket (unmute.sh methodology)
            if getattr(self, 'stt_websocket_urls', None):
                print(f"π€ STT: Trying WebSocket connection (unmute.sh style)")
                result = await self._call_websocket_stt_service(audio_file_path)
                if not self._is_error_result(result):
                    print(f"π€ STT: WebSocket SUCCESS - {result[:50]}...")
                    return result
                print(f"π€ STT: WebSocket FAILED - {result}")
            else:
                print(f"π¨ STT: WebSocket URLs not available - falling back to MCP")

            http_url = getattr(self, 'stt_http_url', None)
            http_attempted = False

            # 2) MCP (placeholder: delegates to HTTP when an HTTP URL exists)
            if getattr(self, 'stt_service', None):
                print(f"π€ STT: Calling MCP STT service")
                result = await self._call_mcp_stt_service(audio_file_path)
                http_attempted = bool(http_url)
                if not self._is_error_result(result):
                    print(f"π€ STT: MCP SUCCESS - {result[:50]}...")
                    return result
                print(f"π€ STT: MCP FAILED - {result}")

            # 3) HTTP fallback, unless the MCP path just ran the same (slow) HTTP calls
            if http_url and not http_attempted:
                print(f"π€ STT: Using HTTP fallback service at {http_url}")
                result = await self._call_http_stt_service(audio_file_path)
                if not self._is_error_result(result):
                    print(f"π€ STT: HTTP SUCCESS - {result[:50]}...")
                    return result
                print(f"π€ STT: HTTP FAILED - {result}")
            elif http_url:
                print(f"π€ STT: HTTP fallback already attempted via MCP path - skipping duplicate call")

            # NO DEMO MODE - report the failure
            error_msg = "ERROR: All STT services failed - no demo mode available"
            print(f"π€ STT: {error_msg}")
            return error_msg
        except Exception as e:
            print(f"π€ STT ERROR: {e}")
            import traceback
            traceback.print_exc()
            logger.error(f"STT error: {e}")
            return f"ERROR: STT processing failed - {str(e)}"

    async def _call_websocket_stt_service(self, audio_file_path: str) -> str:
        """Base64-encode the file and try each configured WebSocket URL in turn.

        Returns:
            The first successful transcription, or the last error string
            (``"Error: ..."``) if every URL fails.
        """
        try:
            import base64
            import os

            import websockets  # noqa: F401 -- availability probe

            print(f"π€ WebSocket STT: Processing {audio_file_path}")
            print(f"π DEBUG: File exists: {os.path.exists(audio_file_path)}")

            with open(audio_file_path, 'rb') as audio_file:
                audio_data = audio_file.read()
            audio_base64 = base64.b64encode(audio_data).decode('utf-8')

            print(f"π DEBUG: Audio file size: {len(audio_data)} bytes, path: {audio_file_path}")
            print(f"π DEBUG: First 20 bytes: {audio_data[:20].hex() if len(audio_data) >= 20 else 'N/A'}")
            print(f"π DEBUG: Base64 length: {len(audio_base64)} chars")

            # Container detection by magic bytes (diagnostics only)
            if audio_data.startswith(b'RIFF'):
                print(f"π DEBUG: Audio format: WAV")
            elif audio_data.startswith(b'\xff\xfb') or audio_data.startswith(b'ID3'):
                print(f"π DEBUG: Audio format: MP3")
            elif audio_data.startswith(b'OggS'):
                print(f"π DEBUG: Audio format: OGG")
            elif audio_data.startswith(b'\x1a\x45\xdf\xa3'):
                print(f"π DEBUG: Audio format: WebM")
            else:
                print(f"π DEBUG: Audio format: Unknown - first 4 bytes: {audio_data[:4].hex()}")

            print(f"π DEBUG: Available WebSocket URLs: {self.stt_websocket_urls}")

            last_error = "Error: No WebSocket STT URLs configured"
            for ws_url in self.stt_websocket_urls:
                result = await self._connect_websocket_with_retry(ws_url, audio_base64)
                if not self._is_error_result(result):
                    return result
                last_error = result or last_error
            # Previously this fell through and returned None.
            return last_error
        except ImportError:
            print("π€ WebSocket STT: websockets library not available")
            return "Error: WebSocket support not available"
        except Exception as e:
            print(f"π€ WebSocket STT: Fatal error in _call_websocket_stt_service: {e}")
            print(f"π DEBUG: Exception type: {type(e).__name__}")
            import traceback
            traceback.print_exc()
            return f"Error: WebSocket STT fatal error - {str(e)}"

    async def _connect_websocket_with_retry(self, ws_url: str, audio_base64: str) -> str:
        """Transcribe via one WebSocket URL, retrying with exponential backoff.

        Retries are meant to ride out GPU cold starts. Each attempt opens a new
        connection (handshake bounded by ``connection_timeout``) and runs one STT
        exchange via ``_run_stt_session``. All retries target the same *ws_url*.

        Returns:
            The transcription, or ``"Error: WebSocket STT failed after N attempts
            in T s"`` once every attempt has failed.
        """
        import asyncio
        import ssl
        import time

        import websockets

        # RETRY CONFIGURATION
        max_retries = 8  # Up to 8 attempts total
        base_delay = 3.0  # Delay before the 2nd attempt
        max_delay = 45.0  # Cap on any single backoff delay
        backoff_multiplier = 1.4  # Moderate exponential backoff
        connection_timeout = 30.0  # Bounds the opening handshake of each attempt

        # Timing thresholds; used only to label log output
        cold_start_threshold = 30.0  # >30s = likely cold start
        warm_service_threshold = 8.0  # <8s = warm service
        gpu_loading_threshold = 60.0  # >60s = GPU loading issues

        # Legacy websockets clients raise InvalidStatusCode for a rejected
        # handshake; newer ones raise InvalidStatus. Catch whichever exist.
        invalid_status_errors = tuple(
            exc_type
            for exc_type in (
                getattr(websockets.exceptions, "InvalidStatusCode", None),
                getattr(websockets.exceptions, "InvalidStatus", None),
            )
            if exc_type is not None
        )

        connect_kwargs = {
            "ping_interval": None,
            "ping_timeout": 20,
            "close_timeout": 10,
            "max_size": 10 * 1024 * 1024,  # 10MB max message size
            "compression": None,  # Disable compression for audio
            # The library default (10 s) would otherwise cut cold-start handshakes short.
            "open_timeout": connection_timeout,
        }
        if ws_url.startswith("wss://"):
            # Keep certificate and hostname verification ON. The previous code
            # disabled both, which allowed the audio stream to be intercepted.
            connect_kwargs["ssl"] = ssl.create_default_context()

        start_time = time.time()
        for attempt in range(1, max_retries + 1):
            attempt_start = time.time()
            elapsed_total = attempt_start - start_time

            if attempt > 1:
                delay = min(base_delay * (backoff_multiplier ** (attempt - 2)), max_delay)
                print(f"β° RETRY CALIBRATION: Waiting {delay:.1f}s before attempt {attempt}/{max_retries} (elapsed: {elapsed_total:.1f}s)")
                await asyncio.sleep(delay)

            try:
                print(f"π€ WebSocket STT: Attempt {attempt}/{max_retries} - Connecting to {ws_url}")
                connection_start = time.time()
                async with websockets.connect(ws_url, **connect_kwargs) as websocket:
                    connection_time = time.time() - connection_start
                    total_elapsed = time.time() - start_time

                    if connection_time < warm_service_threshold:
                        print(f"β‘ TIMING CALIBRATION: Fast connection ({connection_time:.1f}s) - Service was warm")
                    elif connection_time < cold_start_threshold:
                        print(f"π TIMING CALIBRATION: Normal connection ({connection_time:.1f}s) - Service warming up")
                    else:
                        print(f"βοΈ TIMING CALIBRATION: Slow connection ({connection_time:.1f}s) - Cold start detected")
                    if total_elapsed > gpu_loading_threshold:
                        print(f"π TIMING CALIBRATION: Long total wait ({total_elapsed:.1f}s) - GPU loading issues")

                    print(f"π RETRY SUCCESS: Connected on attempt {attempt}/{max_retries} after {total_elapsed:.1f}s total")
                    print(f"π DEBUG: WebSocket connected successfully to {ws_url}")
                    print(f"π DEBUG: WebSocket state: {getattr(websocket, 'state', 'N/A')}")
                    print(f"π DEBUG: WebSocket ping interval: {getattr(websocket, 'ping_interval', 'N/A')}")

                    transcription = await self._run_stt_session(websocket, ws_url, audio_base64)

                if transcription is not None:
                    return transcription
                # The session ended without a usable reply, so retry the same URL.
                print(f"π€ WebSocket STT: No valid response received from {ws_url}")
            except websockets.exceptions.ConnectionClosed as e:
                attempt_time = time.time() - attempt_start
                print(f"π€ RETRY: Connection closed (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}")
                print(f"π DEBUG: Connection close code: {e.code if hasattr(e, 'code') else 'N/A'}")
            except invalid_status_errors as e:
                attempt_time = time.time() - attempt_start
                status_code = getattr(e, 'status_code', None)
                if status_code is None:
                    status_code = getattr(getattr(e, 'response', None), 'status_code', 'unknown')
                print(f"π€ RETRY: HTTP {status_code} (attempt {attempt}/{max_retries}, {attempt_time:.1f}s)")
                # Different status codes hint at different service states.
                if status_code == 503:
                    print(f"π STATUS CALIBRATION: HTTP 503 - Service temporarily unavailable (cold starting)")
                elif status_code == 403:
                    print(f"π« STATUS CALIBRATION: HTTP 403 - Service forbidden (WebSocket not available)")
                elif status_code == 502:
                    print(f"β οΈ STATUS CALIBRATION: HTTP 502 - Bad gateway (service deployment issue)")
                else:
                    print(f"β STATUS CALIBRATION: HTTP {status_code} - Unknown service state")
            except websockets.exceptions.WebSocketException as e:
                attempt_time = time.time() - attempt_start
                print(f"π€ RETRY: WebSocket error (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}")
            except asyncio.TimeoutError:
                attempt_time = time.time() - attempt_start
                print(f"π€ RETRY: Timeout after {attempt_time:.1f}s (attempt {attempt}/{max_retries})")
                print(f"β° TIMEOUT CALIBRATION: Service taking >{connection_timeout}s indicates severe cold start")
            except ConnectionRefusedError as e:
                attempt_time = time.time() - attempt_start
                print(f"π€ RETRY: Connection refused (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}")
                print(f"π« CONNECTION CALIBRATION: Immediate refusal indicates service not listening")
            except Exception as ws_error:
                attempt_time = time.time() - attempt_start
                print(f"π€ RETRY: Unexpected error (attempt {attempt}/{max_retries}, {attempt_time:.1f}s): {ws_error}")
                print(f"π DEBUG: Exception type: {type(ws_error).__name__}")

        # All retry attempts failed
        total_time = time.time() - start_time
        print(f"π€ RETRY EXHAUSTED: Failed after {max_retries} attempts over {total_time:.1f}s")
        if total_time > gpu_loading_threshold:
            print(f"π CALIBRATION SUMMARY: Very slow ({total_time:.1f}s) - GPU loading or deployment issues")
        elif total_time > cold_start_threshold:
            print(f"βοΈ CALIBRATION SUMMARY: Slow ({total_time:.1f}s) - Cold start confirmed")
        else:
            print(f"β‘ CALIBRATION SUMMARY: Fast failure ({total_time:.1f}s) - Service configuration issue")
        return f"Error: WebSocket STT failed after {max_retries} attempts in {total_time:.1f}s"

    async def _run_stt_session(self, websocket, ws_url: str, audio_base64: str) -> Optional[str]:
        """Run one STT exchange on an open WebSocket.

        Protocol as implemented here:

        1. Wait up to 10 s for a ``stt_connection_confirmed`` message.
        2. Send one ``stt_audio_chunk`` with ``is_final=True``.
        3. Wait up to 30 s for a ``stt_transcription`` reply.

        Returns:
            The stripped transcription text, or ``None`` if the exchange should be
            retried (timeout, invalid JSON, unexpected message, or ``stt_error``).
            Connection errors raised by ``recv``/``send`` propagate to the caller.
        """
        import asyncio

        # The STT service sends a connection confirmation first.
        print(f"π€ WebSocket STT: Waiting for connection confirmation...")
        try:
            confirm_response = await asyncio.wait_for(websocket.recv(), timeout=10.0)
            confirm_result = json.loads(confirm_response)
        except asyncio.TimeoutError:
            print(f"π€ WebSocket STT: Connection confirmation timeout for {ws_url}")
            return None
        except json.JSONDecodeError as e:
            print(f"π€ WebSocket STT: Invalid confirmation JSON from {ws_url}: {e}")
            return None
        print(f"π€ WebSocket STT: Connection confirmation: {confirm_result}")
        confirm_type = confirm_result.get("type") if isinstance(confirm_result, dict) else None
        if confirm_type != "stt_connection_confirmed":
            print(f"π€ WebSocket STT: Unexpected confirmation type: {confirm_type}")
            return None

        message = {
            "type": "stt_audio_chunk",
            "audio_data": audio_base64,
            "language": "auto",
            "model_size": "base",
            "is_final": True,  # Single final chunk ("flush trick")
        }
        print(f"π DEBUG: Preparing to send message with keys: {list(message.keys())}")
        print(f"π DEBUG: Message type: {message['type']}")
        print(f"π DEBUG: Audio data length: {len(message['audio_data'])}")
        print(f"π DEBUG: Model size: {message['model_size']}")
        print(f"π DEBUG: Is final: {message['is_final']}")
        message_json = json.dumps(message)
        print(f"π DEBUG: JSON message size: {len(message_json)} chars")

        print(f"π€ WebSocket STT: Sending message to {ws_url}...")
        await websocket.send(message_json)
        # The old message referenced an undefined `audio_data` here (NameError on every send).
        print(f"π€ WebSocket STT: Message sent successfully - audio data ({len(audio_base64)} base64 chars)")
        print(f"π€ WebSocket STT: Waiting for response (timeout: 30s)...")

        try:
            response = await asyncio.wait_for(websocket.recv(), timeout=30.0)
        except asyncio.TimeoutError:
            print(f"π€ WebSocket STT: Response timeout after 30s for {ws_url}")
            return None
        print(f"π DEBUG: Raw response received: {response[:500]}...")  # First 500 chars
        try:
            result = json.loads(response)
        except json.JSONDecodeError as e:
            print(f"π€ WebSocket STT: Invalid JSON response from {ws_url}: {e}")
            print(f"π DEBUG: Raw response that failed to parse: {response}")
            return None
        print(f"π€ WebSocket STT: Parsed response: {result}")

        result_type = result.get("type") if isinstance(result, dict) else None
        if result_type == "stt_transcription" and result.get("text"):
            transcription = result["text"].strip()
            try:
                rtf = float(result.get("real_time_factor", 0) or 0)
            except (TypeError, ValueError):
                rtf = 0.0
            print(f"π€ WebSocket STT: SUCCESS - {transcription[:50]}... (RTF: {rtf:.2f}x)")
            return transcription
        if result_type == "stt_error":
            error_msg = result.get("message", "Unknown error")
            print(f"π€ WebSocket STT: Service error - {error_msg}")
            print(f"π DEBUG: Full error response: {result}")
            return None
        print(f"π€ WebSocket STT: Unexpected response format")
        print(f"π DEBUG: Response type: {result_type or 'NO_TYPE'}")
        print(f"π DEBUG: Response keys: {list(result.keys()) if isinstance(result, dict) else 'NOT_DICT'}")
        return None

    async def _call_mcp_stt_service(self, audio_file_path: str) -> str:
        """Call the MCP STT service. Until MCP is deployed, this delegates to HTTP.

        Returns:
            The HTTP helper's result when an HTTP URL is configured (and, if the
            MCP client is missing, only when that result is not an error),
            otherwise an ``"ERROR: ..."`` string.
        """
        try:
            print(f"π€ MCP STT: Attempting MCP or HTTP service call for {audio_file_path}")
            http_url = getattr(self, 'stt_http_url', None)
            try:
                from mcp import ClientSession  # noqa: F401
                from mcp.client.stdio import stdio_client  # noqa: F401
            except ImportError:
                print(f"π€ MCP STT: MCP client not available, trying HTTP fallback")
                if http_url:
                    result = await self._call_http_stt_service(audio_file_path)
                    if not self._is_error_result(result):
                        return result
                return "ERROR: No STT services available"

            print(f"π€ MCP STT: Trying MCP connection...")
            # TODO: replace with a real MCP tool call once the STT service exposes one,
            # e.g. call_tool("stt_transcribe", {"audio_file": ..., "language": "auto", "model": "base"}).
            if http_url:
                return await self._call_http_stt_service(audio_file_path)
            print(f"π€ MCP STT: No MCP client available")
            return "ERROR: MCP STT service failed - no demo mode"
        except Exception as e:
            print(f"π€ MCP STT service call error: {e}")
            return f"ERROR: MCP STT service error - {str(e)}"

    async def _call_http_stt_service(self, audio_file_path: str) -> str:
        """Transcribe over HTTP: direct REST routes first, then the Gradio client.

        NOTE: the helpers make blocking network calls (``requests`` / Gradio
        client), so this coroutine blocks its event loop while they run.

        Returns:
            The transcription, or an ``"Error: ..."`` string.
        """
        try:
            import requests  # noqa: F401 -- surface a missing dependency as an STT error

            print(f"π€ HTTP STT: Calling service at {self.stt_http_url}")
            transcription = self._try_direct_http_stt(audio_file_path)
            if transcription:
                return transcription
            transcription = self._try_gradio_client_stt(audio_file_path)
            if transcription:
                return transcription
            return "Error: STT service connection failed"
        except Exception as e:
            print(f"π€ HTTP STT ERROR: {e}")
            return f"Error: STT service error - {str(e)}"

    def _try_direct_http_stt(self, audio_file_path: str) -> Optional[str]:
        """POST the audio to several Gradio-style REST routes with two payload layouts.

        Returns:
            The first non-blank transcription, or ``None`` if every route and
            payload combination fails. Each POST has a 60 s timeout.
        """
        import requests

        try:
            print(f"π€ HTTP STT: Trying direct HTTP API approach")
            api_patterns = [
                f"{self.stt_http_url}/api/predict",
                f"{self.stt_http_url}/call/predict",
                f"{self.stt_http_url}/api/transcribe_audio",
                f"{self.stt_http_url}/call/transcribe_audio",
            ]
            for api_url in api_patterns:
                try:
                    print(f"π€ HTTP STT: Trying API URL: {api_url}")
                    with open(audio_file_path, 'rb') as audio_file:
                        payload_formats = [
                            # Format 1: Standard Gradio API format
                            {
                                'files': {'data': audio_file},
                                'data': {'data': json.dumps(["auto", "base", True])},
                            },
                            # Format 2: Direct form data
                            {
                                'files': {'audio': audio_file},
                                'data': {'language': 'auto', 'model': 'base', 'timestamps': 'true'},
                            },
                        ]
                        for format_number, payload in enumerate(payload_formats, start=1):
                            try:
                                audio_file.seek(0)  # each POST consumes the shared file handle
                                print(f"π€ HTTP STT: Trying payload format {format_number}")
                                response = requests.post(
                                    api_url,
                                    files=payload['files'],
                                    data=payload['data'],
                                    timeout=60,
                                )
                                print(f"π€ HTTP STT: Response status: {response.status_code}")
                                print(f"π€ HTTP STT: Response headers: {dict(response.headers)}")
                                if response.status_code != 200:
                                    print(f"π€ HTTP STT: Failed with status {response.status_code}")
                                    print(f"π€ HTTP STT: Error response: {response.text[:200]}")
                                    continue
                                try:
                                    result = response.json()
                                except ValueError as json_err:  # covers json and simplejson decode errors
                                    print(f"π€ HTTP STT: JSON decode error: {json_err}")
                                    print(f"π€ HTTP STT: Raw response: {response.text[:200]}")
                                    continue
                                print(f"π€ HTTP STT: Response JSON: {result}")
                                transcription = self._extract_http_transcription(result)
                                if transcription:
                                    print(f"π€ HTTP STT: SUCCESS via direct API: {transcription}")
                                    return transcription
                            except Exception as payload_error:
                                print(f"π€ HTTP STT: Payload format {format_number} failed: {payload_error}")
                except Exception as url_error:
                    print(f"π€ HTTP STT: URL {api_url} failed: {url_error}")
            print(f"π€ HTTP STT: All direct API attempts failed")
        except Exception as direct_error:
            print(f"π€ HTTP STT: Direct API approach failed: {direct_error}")
        return None

    @staticmethod
    def _extract_http_transcription(result) -> Optional[str]:
        """Pull a transcription out of a decoded JSON reply.

        Accepted layouts: ``{"data": [status, text, ...]}``,
        ``{"transcription": text}``, ``{"text": text}``, or ``[status, text, ...]``.
        Returns the stripped text, or ``None`` when absent, blank or not a string.
        """
        transcription = None
        if isinstance(result, dict):
            if 'data' in result and len(result['data']) > 1:
                transcription = result['data'][1]
            elif 'transcription' in result:
                transcription = result['transcription']
            elif 'text' in result:
                transcription = result['text']
        elif isinstance(result, list) and len(result) > 1:
            transcription = result[1]
        if isinstance(transcription, str) and transcription.strip():
            return transcription.strip()
        return None

    def _try_gradio_client_stt(self, audio_file_path: str) -> Optional[str]:
        """Transcribe through ``gradio_client`` by trying several endpoint identifiers.

        Returns:
            The stripped transcription, or ``None`` if the client cannot be
            created or every endpoint fails.
        """
        try:
            print(f"π€ HTTP STT: Falling back to Gradio client...")
            from gradio_client import Client, handle_file

            client = Client(self.stt_http_url)
            gradio_endpoints = [
                "transcribe_audio",  # Main transcribe function name
                "/transcribe_audio",  # With slash
                0,  # First endpoint by index
            ]
            for endpoint in gradio_endpoints:
                try:
                    print(f"π€ HTTP STT: Trying Gradio endpoint: {endpoint}")
                    result = client.predict(
                        handle_file(audio_file_path),  # audio file as FileData
                        "auto",  # language
                        "base",  # model_size
                        True,  # return_timestamps
                        api_name=endpoint if isinstance(endpoint, str) else None,
                        fn_index=endpoint if isinstance(endpoint, int) else None,
                    )
                    print(f"π€ HTTP STT: Gradio result for {endpoint}: {result}")
                    if not result:
                        continue
                    if isinstance(result, (list, tuple)) and len(result) >= 2:
                        transcription = result[1]  # [status, transcription, timestamps]
                    elif isinstance(result, str):
                        transcription = result
                    else:
                        transcription = str(result)
                    if (
                        isinstance(transcription, str)
                        and transcription.strip()
                        and not transcription.startswith("β")
                    ):
                        print(f"π€ HTTP STT: SUCCESS via Gradio endpoint {endpoint}: {transcription[:50]}...")
                        return transcription.strip()
                except Exception as endpoint_error:
                    print(f"π€ HTTP STT: Endpoint {endpoint} failed: {endpoint_error}")
            print(f"π€ HTTP STT: All Gradio endpoints failed")
        except Exception as gradio_error:
            print(f"π€ HTTP STT: Gradio client setup failed: {gradio_error}")
        return None

    # ------------------------------------------------------------------ #
    # Audio helpers
    # ------------------------------------------------------------------ #

    def _get_audio_duration(self, audio_file_path: str) -> float:
        """Return the duration in seconds of a WAV file, or 5.0 if it cannot be read."""
        try:
            with wave.open(audio_file_path, 'rb') as wav_file:
                return wav_file.getnframes() / float(wav_file.getframerate())
        except Exception:  # best effort: unreadable/invalid file -> nominal default
            return 5.0

    @staticmethod
    def _to_int16_pcm(audio_array: np.ndarray) -> np.ndarray:
        """Convert an audio array to mono int16 PCM.

        - int16 mono input is returned unchanged. It was previously multiplied by
          32767 and wrapped around.
        - Other integer dtypes are rescaled to their full range.
        - Float input is treated as [-1, 1] and clipped.
        - 2-D input is down-mixed by averaging axis 1, assuming a
          (samples, channels) layout.
        """
        if audio_array.dtype == np.int16 and audio_array.ndim == 1:
            return audio_array
        if np.issubdtype(audio_array.dtype, np.integer):
            info = np.iinfo(audio_array.dtype)
            if info.min < 0:
                samples = audio_array.astype(np.float64) / float(info.max)
            else:
                midpoint = (float(info.max) + 1.0) / 2.0  # unsigned PCM is offset-binary
                samples = (audio_array.astype(np.float64) - midpoint) / midpoint
        else:
            samples = audio_array.astype(np.float64)
        if samples.ndim == 2:
            samples = samples.mean(axis=1)
        return np.rint(np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)

    @staticmethod
    def _write_temp_wav(pcm: np.ndarray, sample_rate) -> str:
        """Write mono 16-bit *pcm* to a new temporary ``.wav`` file and return its path.

        The caller owns the file and must delete it. The temp handle is closed
        before ``wave`` reopens the file by name.
        """
        import os

        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
            tmp_path = tmp_file.name
        try:
            with wave.open(tmp_path, 'wb') as wav_file:
                wav_file.setnchannels(1)  # Mono
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(np.ascontiguousarray(pcm).tobytes())
        except BaseException:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
        return tmp_path

    def process_audio_input(self, audio_tuple: Tuple) -> str:
        """Transcribe a Gradio-style ``(sample_rate, numpy_array)`` audio tuple.

        The array is converted to mono int16 PCM and written to a temporary WAV.
        ``speech_to_text`` then runs on a fresh event loop via ``asyncio.run``.
        That means this method cannot be called from a thread that already has
        a running loop. The temp file is always deleted.

        Returns:
            The transcription or an error string. ``"Invalid audio format"`` is
            returned for a non-ndarray payload.
        """
        import asyncio
        import os

        try:
            print(f"π€ MCP HANDLER: Processing audio tuple: {type(audio_tuple)}")
            print(f"π DEBUG: Audio tuple content: {audio_tuple}")

            if audio_tuple is None or len(audio_tuple) < 2:
                print(f"π€ MCP HANDLER: No audio received or invalid format")
                return "ERROR: No audio received"

            # Gradio audio format: (sample_rate, audio_array)
            sample_rate, audio_array = audio_tuple
            print(f"π€ MCP HANDLER: Sample rate: {sample_rate}, Array type: {type(audio_array)}")

            if not isinstance(audio_array, np.ndarray):
                print(f"π€ MCP HANDLER: Invalid audio array format")
                return "Invalid audio format"

            print(f"π€ MCP HANDLER: Audio array shape: {audio_array.shape}")
            try:
                try:
                    pcm = self._to_int16_pcm(audio_array)
                except ValueError as buffer_error:
                    if "buffer size must be a multiple of element size" not in str(buffer_error):
                        raise
                    print(f"π€ MCP HANDLER: Buffer size error - returning error")
                    audio_duration = len(audio_array) / sample_rate if len(audio_array) > 0 else 1.0
                    return f"ERROR: Buffer size issue with audio processing ({audio_duration:.1f}s)"

                tmp_path = self._write_temp_wav(pcm, sample_rate)
                print(f"π€ MCP HANDLER: Created temp WAV file: {tmp_path}")
                try:
                    result = asyncio.run(self.speech_to_text(tmp_path))
                    print(f"π€ MCP HANDLER: MCP STT result: {result}")
                    return result
                finally:
                    # Previously unreachable (placed after `return`), so every call leaked a file.
                    try:
                        os.unlink(tmp_path)
                    except OSError:
                        pass
            except Exception as stt_error:
                print(f"π€ MCP HANDLER ERROR: MCP STT processing failed: {stt_error}")
                return f"ERROR: STT processing failed - {str(stt_error)}"
        except Exception as e:
            print(f"π€ MCP HANDLER ERROR: {e}")
            import traceback
            traceback.print_exc()
            logger.error(f"MCP audio processing error: {e}")
            return f"Error processing audio: {str(e)}"

    # ------------------------------------------------------------------ #
    # Text-to-speech
    # ------------------------------------------------------------------ #

    async def text_to_speech(self, text: str, voice: Optional[str] = None) -> Optional[bytes]:
        """Convert *text* to speech via the MCP TTS service.

        Returns ``None`` when voice responses are disabled in ``config``, when no
        TTS service is attached, or on error. Otherwise it returns the
        placeholder's result, which is currently always ``None``.
        """
        try:
            if not config.enable_voice_responses:
                return None
            if not self.tts_service:
                print(f"π MCP TTS: No TTS service available")
                return None
            print(f"π MCP TTS: Converting text to speech via MCP: {text[:50]}...")
            return await self._call_mcp_tts_service(text, voice)
        except Exception as e:
            print(f"π MCP TTS ERROR: {e}")
            logger.error(f"MCP TTS error: {e}")
            return None

    async def _call_mcp_tts_service(self, text: str, voice: Optional[str] = None) -> Optional[bytes]:
        """Placeholder for the MCP TTS call; always returns ``None`` (no audio yet)."""
        try:
            print(f"π MCP TTS: Simulating MCP TTS service call")
            # TODO: call_tool("tts_synthesize", {"text": text, "voice": voice or config.default_voice})
            return None
        except Exception as e:
            print(f"π MCP TTS service call error: {e}")
            return None

    def is_audio_service_available(self) -> Tuple[bool, bool]:
        """Return ``(stt_available, tts_available)`` based on the attached MCP service descriptors."""
        return bool(self.stt_service), bool(self.tts_service)

    def get_audio_status(self) -> dict:
        """Summarize service availability and voice configuration (``demo_mode`` is always False)."""
        stt_available, tts_available = self.is_audio_service_available()
        return {
            "stt_available": stt_available,
            "tts_available": tts_available,
            "demo_mode": False,  # NO DEMO MODE
            "voice_responses_enabled": config.enable_voice_responses,
            "default_voice": config.default_voice,
            "service_type": "mcp",
        }
# Global MCP audio handler instance, created once when this module is imported.
# NOTE: construction runs MCPAudioHandler.__init__, which probes the remote
# STT/TTS endpoints over HTTP (blocking calls with 5 s / 30 s timeouts), so
# importing this module can be slow when those services are cold or offline.
mcp_audio_handler = MCPAudioHandler()