| """ |
| MCP-based Audio Handler for ChatCal Voice - Uses Model Context Protocol. |
| |
| This module connects to STT and TTS services via MCP for reliable audio processing. |
| """ |
|
|
| import logging |
| import numpy as np |
| import tempfile |
| import wave |
| import json |
| import asyncio |
| from typing import Optional, Tuple |
|
|
| from .config import config |
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
| class MCPAudioHandler: |
| """Handles audio processing using MCP services.""" |
| |
| def __init__(self): |
| self.stt_service = None |
| self.tts_service = None |
| |
| |
| self._initialize_real_services() |
| |
def _initialize_real_services(self):
    """Run service discovery and report whether an HTTP STT endpoint exists.

    Never raises: any failure is printed (with traceback) and logged.
    """
    try:
        print("π§ REAL SERVICE INIT: Starting real service initialization")
        self._discover_services()

        # A missing attribute and an empty value both mean "no endpoint".
        stt_endpoint = getattr(self, 'stt_http_url', None)
        if not stt_endpoint:
            print("β No real STT service available - will return errors instead of demos")
            logger.error("β No real STT service available")
        else:
            print(f"π΅ Real STT service available at {stt_endpoint}")
            logger.info("π΅ Real STT service connected")
    except Exception as init_error:
        import traceback

        print(f"π§ REAL SERVICE INIT ERROR: {init_error}")
        traceback.print_exc()
        logger.error(f"Failed to initialize real services: {init_error}")
| |
def _initialize_mcp_services(self):
    """Run discovery and leave demo mode if an MCP STT service was found.

    Sets ``self.demo_mode`` to False when ``self.stt_service`` is truthy
    after discovery and to True when discovery raises; it is left untouched
    when discovery completes without finding a service.
    """
    try:
        print("π§ MCP INIT: Starting MCP service initialization")
        self._discover_services()

        if not self.stt_service:
            print("π΅ STAYING IN DEMO MODE - MCP STT service not available")
            logger.warning("π΅ Running in demo mode - MCP STT service unavailable")
        else:
            self.demo_mode = False
            print("π΅ MCP STT service available - EXITING DEMO MODE")
            logger.info("π΅ MCP STT service available, exiting demo mode")
    except Exception as init_error:
        import traceback

        print(f"π§ MCP INIT ERROR: {init_error}")
        traceback.print_exc()
        logger.error(f"Failed to initialize MCP services: {init_error}")
        self.demo_mode = True
| |
| def _discover_services(self): |
| """Discover available MCP services.""" |
| try: |
| |
| |
| self._initialize_websocket_and_http_services() |
| |
| |
| |
| try: |
| from mcp import ClientSession |
| from mcp.client.stdio import stdio_client |
| print("π§ MCP: MCP client library available") |
| |
| |
| self._connect_stt_service() |
| self._connect_tts_service() |
| |
| except ImportError as e: |
| print(f"π§ MCP: MCP client not available: {e}") |
| print("π§ MCP: Using WebSocket/HTTP services (already initialized)") |
| return |
| |
| except Exception as e: |
| print(f"π§ MCP SERVICE DISCOVERY ERROR: {e}") |
| logger.error(f"MCP service discovery failed: {e}") |
| print("π§ MCP: Using WebSocket/HTTP services (already initialized)") |
| |
| def _initialize_websocket_and_http_services(self): |
| """Initialize WebSocket and HTTP-based service connections (primary method).""" |
| print("π§ WEBSOCKET/HTTP INIT: Initializing WebSocket and HTTP-based service connections") |
| |
| |
| try: |
| import requests |
| |
| |
| stt_websocket_urls = [ |
| "wss://pgits-stt-gpu-service.hf.space/ws/stt", |
| "ws://localhost:7860/ws/stt" |
| ] |
| |
| stt_http_urls = [ |
| "https://pgits-stt-gpu-service.hf.space", |
| "https://huggingface.co/spaces/pgits/stt-gpu-service" |
| ] |
| |
| tts_urls = [ |
| "https://pgits-tts-gpu-service.hf.space", |
| "https://huggingface.co/spaces/pgits/tts-gpu-service" |
| ] |
| |
| |
| self.stt_websocket_urls = stt_websocket_urls |
| |
| |
| self.stt_http_url = self._find_working_http_endpoint(stt_http_urls, "STT") |
| self.tts_http_url = self._find_working_http_endpoint(tts_urls, "TTS") |
| |
| |
| if self.stt_http_url: |
| self._preload_stt_service() |
| |
| |
| if self.stt_websocket_urls or self.stt_http_url: |
| print("π§ WEBSOCKET/HTTP FALLBACK: STT service available") |
| |
| if self.stt_http_url or self.tts_http_url or self.stt_websocket_urls: |
| print("π§ WEBSOCKET/HTTP FALLBACK: Services available") |
| else: |
| print("π§ WEBSOCKET/HTTP FALLBACK: No services available - WILL RETURN ERRORS") |
| |
| except Exception as e: |
| print(f"π§ WEBSOCKET/HTTP FALLBACK ERROR: {e}") |
| |
def _preload_stt_service(self):
    """Call the STT service's ``/preload`` endpoint to warm up the model.

    Best-effort: every failure is printed and logged as a warning, nothing
    is raised and nothing is returned.
    """
    import time

    # Import outside the request ``try``: the handlers below name
    # ``requests.exceptions.Timeout``, which would itself raise
    # UnboundLocalError if the import had failed inside that ``try``.
    try:
        import requests
    except ImportError as import_error:
        print(f"β οΈ STT PRELOAD: Error warming up service - {import_error}")
        logger.warning(f"STT preload error: {import_error}")
        return

    try:
        preload_url = f"{self.stt_http_url}/preload?model_size=base"
        print(f"π§ STT PRELOAD: Warming up STT service at {preload_url}")

        start_time = time.time()
        response = requests.get(preload_url, timeout=30)
        elapsed_time = time.time() - start_time

        if response.status_code != 200:
            print(f"β οΈ STT PRELOAD: HTTP {response.status_code} - Service may still be starting")
            logger.warning(f"STT preload failed: HTTP {response.status_code}")
            return

        result = response.json()
        if result.get("status") == "success":
            print(f"β STT PRELOAD: Model loaded successfully in {elapsed_time:.1f}s - {result.get('message', 'Ready')}")
            logger.info(f"STT service preloaded successfully: {result.get('message')}")
        else:
            print(f"β οΈ STT PRELOAD: {result.get('message', 'Unknown response')}")
            logger.warning(f"STT preload response: {result}")

    except requests.exceptions.Timeout:
        print("β° STT PRELOAD: Timeout after 30s - Service may be cold starting, will try again later")
        logger.warning("STT preload timed out - service may be cold starting")
    except Exception as e:
        # Covers connection errors, non-JSON bodies and non-dict payloads.
        print(f"β οΈ STT PRELOAD: Error warming up service - {e}")
        logger.warning(f"STT preload error: {e}")
| |
def _find_working_http_endpoint(self, urls: list, service_name: str) -> Optional[str]:
    """Return the first URL in *urls* that answers a GET with HTTP 200.

    Args:
        urls: Candidate base URLs, tried in order with a 5 second timeout.
        service_name: Label used in the printed messages (e.g. "STT").

    Returns:
        The first responding URL, or ``None`` when none answered with 200.
    """
    import requests

    for url in urls:
        try:
            response = requests.get(url, timeout=5)
        except requests.RequestException:
            # Unreachable or invalid candidate: try the next one. Narrower
            # than a bare ``except`` so Ctrl-C / SystemExit still propagate.
            continue
        if response.status_code == 200:
            print(f"β {service_name} HTTP endpoint found: {url}")
            return url

    print(f"β No working {service_name} HTTP endpoints found")
    return None
| |
| def _connect_stt_service(self): |
| """Connect to MCP STT service.""" |
| try: |
| |
| |
| |
| |
| print(f"π€ MCP: Checking for STT service availability") |
| |
| |
| |
| self.stt_service = self._create_stt_service_wrapper() |
| |
| if self.stt_service: |
| print(f"β
MCP STT service connected") |
| |
| except Exception as e: |
| print(f"π€ MCP STT connection error: {e}") |
| self.stt_service = None |
| |
| def _connect_tts_service(self): |
| """Connect to MCP TTS service.""" |
| try: |
| print(f"π MCP: Checking for TTS service availability") |
| |
| |
| self.tts_service = self._create_tts_service_wrapper() |
| |
| if self.tts_service: |
| print(f"β
MCP TTS service connected") |
| |
| except Exception as e: |
| print(f"π MCP TTS connection error: {e}") |
| self.tts_service = None |
| |
| def _create_stt_service_wrapper(self): |
| """Create STT service wrapper.""" |
| |
| |
| return { |
| 'name': 'stt-gpu-service', |
| 'available': True, |
| 'type': 'mcp' |
| } |
| |
| def _create_tts_service_wrapper(self): |
| """Create TTS service wrapper.""" |
| return { |
| 'name': 'tts-gpu-service', |
| 'available': True, |
| 'type': 'mcp' |
| } |
| |
async def speech_to_text(self, audio_file_path: str) -> str:
    """Transcribe an audio file, trying WebSocket, then MCP, then HTTP.

    Args:
        audio_file_path: Path of the audio file to transcribe.

    Returns:
        The transcription from the first backend that succeeds, otherwise a
        string starting with ``"ERROR:"``. Never raises.
    """

    def _failed(outcome) -> bool:
        # Backends disagree on the prefix ("Error: ..." vs "ERROR: ..."), so
        # the check is case-insensitive; an empty/None result is a failure.
        return not outcome or str(outcome).lower().startswith("error")

    try:
        print(f"π€ STT: Processing audio file: {audio_file_path}")

        ws_urls = getattr(self, 'stt_websocket_urls', None)
        print(f"π DEBUG: hasattr(self, 'stt_websocket_urls'): {hasattr(self, 'stt_websocket_urls')}")
        if hasattr(self, 'stt_websocket_urls'):
            print(f"π DEBUG: self.stt_websocket_urls: {ws_urls}")
            print(f"π DEBUG: bool(self.stt_websocket_urls): {bool(ws_urls)}")

        # Tier 1: WebSocket.
        if ws_urls:
            print("π€ STT: Trying WebSocket connection (unmute.sh style)")
            result = await self._call_websocket_stt_service(audio_file_path)
            if not _failed(result):
                print(f"π€ STT: WebSocket SUCCESS - {result[:50]}...")
                return result
            print(f"π€ STT: WebSocket FAILED - {result}")
        else:
            print("π¨ STT: WebSocket URLs not available - falling back to MCP")

        # Tier 2: MCP wrapper.
        if self.stt_service:
            print("π€ STT: Calling MCP STT service")
            result = await self._call_mcp_stt_service(audio_file_path)
            if not _failed(result):
                print(f"π€ STT: MCP SUCCESS - {result[:50]}...")
                return result
            print(f"π€ STT: MCP FAILED - {result}")

        # Tier 3: plain HTTP.
        http_url = getattr(self, 'stt_http_url', None)
        if http_url:
            print(f"π€ STT: Using HTTP fallback service at {http_url}")
            result = await self._call_http_stt_service(audio_file_path)
            if not _failed(result):
                print(f"π€ STT: HTTP SUCCESS - {result[:50]}...")
                return result
            print(f"π€ STT: HTTP FAILED - {result}")

        error_msg = "ERROR: All STT services failed - no demo mode available"
        print(f"π€ STT: {error_msg}")
        return error_msg

    except Exception as e:
        import traceback

        print(f"π€ STT ERROR: {e}")
        traceback.print_exc()
        logger.error(f"STT error: {e}")
        return f"ERROR: STT processing failed - {str(e)}"
| |
| async def _call_websocket_stt_service(self, audio_file_path: str) -> str: |
| """Call STT service via WebSocket using unmute.sh methodology.""" |
| try: |
| import websockets |
| import base64 |
| import json |
| import os |
| |
| print(f"π€ WebSocket STT: Processing {audio_file_path}") |
| print(f"π DEBUG: File exists: {os.path.exists(audio_file_path)}") |
| |
| |
| with open(audio_file_path, 'rb') as audio_file: |
| audio_data = audio_file.read() |
| audio_base64 = base64.b64encode(audio_data).decode('utf-8') |
| print(f"π DEBUG: Audio file size: {len(audio_data)} bytes, path: {audio_file_path}") |
| print(f"π DEBUG: First 20 bytes: {audio_data[:20].hex() if len(audio_data) >= 20 else 'N/A'}") |
| print(f"π DEBUG: Base64 length: {len(audio_base64)} chars") |
| |
| |
| if audio_data.startswith(b'RIFF'): |
| print(f"π DEBUG: Audio format: WAV") |
| elif audio_data.startswith(b'\xff\xfb') or audio_data.startswith(b'ID3'): |
| print(f"π DEBUG: Audio format: MP3") |
| elif audio_data.startswith(b'OggS'): |
| print(f"π DEBUG: Audio format: OGG") |
| elif audio_data.startswith(b'\x1a\x45\xdf\xa3'): |
| print(f"π DEBUG: Audio format: WebM") |
| else: |
| print(f"π DEBUG: Audio format: Unknown - first 4 bytes: {audio_data[:4].hex()}") |
| |
| print(f"π DEBUG: Available WebSocket URLs: {self.stt_websocket_urls}") |
| |
| |
| for i, ws_url in enumerate(self.stt_websocket_urls): |
| result = await self._connect_websocket_with_retry(ws_url, audio_base64) |
| if result and not result.startswith("Error"): |
| return result |
| |
| except ImportError: |
| print("π€ WebSocket STT: websockets library not available") |
| return "Error: WebSocket support not available" |
| except Exception as e: |
| print(f"π€ WebSocket STT: Fatal error in _call_websocket_stt_service: {e}") |
| print(f"π DEBUG: Exception type: {type(e).__name__}") |
| import traceback |
| traceback.print_exc() |
| return f"Error: WebSocket STT fatal error - {str(e)}" |
| |
async def _connect_websocket_with_retry(self, ws_url: str, audio_base64: str) -> str:
    """Send one audio payload to *ws_url*, retrying with exponential backoff.

    Each attempt opens a fresh connection, waits for a
    ``stt_connection_confirmed`` message, sends a single final
    ``stt_audio_chunk`` and waits for a ``stt_transcription`` reply.

    Args:
        ws_url: ``ws://`` or ``wss://`` endpoint of the STT service.
        audio_base64: Base64-encoded audio file contents.

    Returns:
        The stripped transcription text on success, otherwise a string
        starting with ``"Error:"`` once retries are exhausted or an
        unparseable response ends the loop early.
    """
    import websockets
    import asyncio
    import json
    import ssl
    import time

    # Retry schedule: delay = base_delay * multiplier ** (attempt - 2), capped.
    max_retries = 8
    base_delay = 3.0
    max_delay = 45.0
    backoff_multiplier = 1.4
    connection_timeout = 30.0  # applies to the connection handshake only

    # Thresholds (seconds) used only to label timings in the log output.
    cold_start_threshold = 30.0
    warm_service_threshold = 8.0
    gpu_loading_threshold = 60.0

    # Decoded payload size, recovered from the base64 text for logging. The
    # raw bytes are not available in this method.
    audio_byte_count = (len(audio_base64) * 3) // 4 - audio_base64.count("=", -2)

    status_notes = {
        503: "π STATUS CALIBRATION: HTTP 503 - Service temporarily unavailable (cold starting)",
        403: "π« STATUS CALIBRATION: HTTP 403 - Service forbidden (WebSocket not available)",
        502: "β οΈ STATUS CALIBRATION: HTTP 502 - Bad gateway (service deployment issue)",
    }

    start_time = time.time()

    for attempt in range(1, max_retries + 1):
        attempt_start = time.time()
        elapsed_total = attempt_start - start_time

        if attempt > 1:
            delay = min(base_delay * (backoff_multiplier ** (attempt - 2)), max_delay)
            print(f"β° RETRY CALIBRATION: Waiting {delay:.1f}s before attempt {attempt}/{max_retries} (elapsed: {elapsed_total:.1f}s)")
            await asyncio.sleep(delay)

        websocket = None
        try:
            print(f"π€ WebSocket STT: Attempt {attempt}/{max_retries} - Connecting to {ws_url}")

            connect_kwargs = {
                "ping_interval": None,
                "ping_timeout": 20,
                "close_timeout": 10,
                "max_size": 10 * 1024 * 1024,
                "compression": None,
            }

            if ws_url.startswith("wss://"):
                # NOTE(security): certificate and hostname verification are
                # disabled here, exactly as before. This allows a
                # man-in-the-middle on the audio stream; prefer the default
                # context unless the endpoint really uses an untrusted cert.
                ssl_context = ssl.create_default_context()
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_NONE
                connect_kwargs["ssl"] = ssl_context

            # Bound only the handshake. The confirmation and response waits
            # below carry their own timeouts and must not be cut short by it.
            connection_start = time.time()
            websocket = await asyncio.wait_for(
                websockets.connect(ws_url, **connect_kwargs),
                timeout=connection_timeout,
            )
            connection_time = time.time() - connection_start
            total_elapsed = time.time() - start_time

            if connection_time < warm_service_threshold:
                print(f"β‘ TIMING CALIBRATION: Fast connection ({connection_time:.1f}s) - Service was warm")
            elif connection_time < cold_start_threshold:
                print(f"π TIMING CALIBRATION: Normal connection ({connection_time:.1f}s) - Service warming up")
            else:
                print(f"βοΈ TIMING CALIBRATION: Slow connection ({connection_time:.1f}s) - Cold start detected")

            if total_elapsed > gpu_loading_threshold:
                print(f"π TIMING CALIBRATION: Long total wait ({total_elapsed:.1f}s) - GPU loading issues")

            print(f"π RETRY SUCCESS: Connected on attempt {attempt}/{max_retries} after {total_elapsed:.1f}s total")
            print(f"π DEBUG: WebSocket connected successfully to {ws_url}")

            # These attributes are not present on every websockets version.
            print(f"π DEBUG: WebSocket state: {getattr(websocket, 'state', 'N/A')}")
            print(f"π DEBUG: WebSocket ping interval: {getattr(websocket, 'ping_interval', 'N/A')}")

            print("π€ WebSocket STT: Waiting for connection confirmation...")
            try:
                confirm_response = await asyncio.wait_for(websocket.recv(), timeout=10.0)
                confirm_result = json.loads(confirm_response)
                print(f"π€ WebSocket STT: Connection confirmation: {confirm_result}")

                if confirm_result.get("type") != "stt_connection_confirmed":
                    print(f"π€ WebSocket STT: Unexpected confirmation type: {confirm_result.get('type')}")
                    continue
            except asyncio.TimeoutError:
                print(f"π€ WebSocket STT: Connection confirmation timeout for {ws_url}")
                continue
            except json.JSONDecodeError as e:
                print(f"π€ WebSocket STT: Invalid confirmation JSON from {ws_url}: {e}")
                continue

            message = {
                "type": "stt_audio_chunk",
                "audio_data": audio_base64,
                "language": "auto",
                "model_size": "base",
                "is_final": True,
            }

            print(f"π DEBUG: Preparing to send message with keys: {list(message.keys())}")
            print(f"π DEBUG: Message type: {message['type']}")
            print(f"π DEBUG: Audio data length: {len(message['audio_data'])}")
            print(f"π DEBUG: Model size: {message['model_size']}")
            print(f"π DEBUG: Is final: {message['is_final']}")

            message_json = json.dumps(message)
            print(f"π DEBUG: JSON message size: {len(message_json)} chars")

            print(f"π€ WebSocket STT: Sending message to {ws_url}...")
            await websocket.send(message_json)
            print(f"π€ WebSocket STT: Message sent successfully - audio data ({audio_byte_count} bytes)")

            print("π€ WebSocket STT: Waiting for response (timeout: 30s)...")

            try:
                response = await asyncio.wait_for(websocket.recv(), timeout=30.0)
            except asyncio.TimeoutError:
                print(f"π€ WebSocket STT: Response timeout after 30s for {ws_url}")
                continue
            print(f"π DEBUG: Raw response received: {response[:500]}...")

            try:
                result = json.loads(response)
            except json.JSONDecodeError as e:
                print(f"π€ WebSocket STT: Invalid JSON response from {ws_url}: {e}")
                print(f"π DEBUG: Raw response that failed to parse: {response}")
                # A malformed reply will not improve on retry: stop here.
                break
            print(f"π€ WebSocket STT: Parsed response: {result}")

            if result.get("type") == "stt_transcription" and result.get("text"):
                transcription = result["text"].strip()
                rtf = result.get("real_time_factor", 0)
                print(f"π€ WebSocket STT: SUCCESS - {transcription[:50]}... (RTF: {rtf:.2f}x)")
                return transcription

            if result.get("type") == "stt_error":
                error_msg = result.get("message", "Unknown error")
                print(f"π€ WebSocket STT: Service error - {error_msg}")
                print(f"π DEBUG: Full error response: {result}")
            else:
                print("π€ WebSocket STT: Unexpected response format")
                print(f"π DEBUG: Response type: {result.get('type', 'NO_TYPE')}")
                print(f"π DEBUG: Response keys: {list(result.keys()) if isinstance(result, dict) else 'NOT_DICT'}")
            continue

        except websockets.exceptions.ConnectionClosed as e:
            attempt_time = time.time() - attempt_start
            print(f"π€ RETRY: Connection closed (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}")
            print(f"π DEBUG: Connection close code: {e.code if hasattr(e, 'code') else 'N/A'}")

        except websockets.exceptions.InvalidStatusCode as e:
            attempt_time = time.time() - attempt_start
            status_code = getattr(e, 'status_code', 'unknown')
            print(f"π€ RETRY: HTTP {status_code} (attempt {attempt}/{max_retries}, {attempt_time:.1f}s)")
            print(status_notes.get(
                status_code,
                f"β STATUS CALIBRATION: HTTP {status_code} - Unknown service state",
            ))

        except websockets.exceptions.WebSocketException as e:
            attempt_time = time.time() - attempt_start
            print(f"π€ RETRY: WebSocket error (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}")

        except asyncio.TimeoutError:
            attempt_time = time.time() - attempt_start
            print(f"π€ RETRY: Timeout after {attempt_time:.1f}s (attempt {attempt}/{max_retries})")
            print(f"β° TIMEOUT CALIBRATION: Service taking >{connection_timeout}s indicates severe cold start")

        except ConnectionRefusedError as e:
            attempt_time = time.time() - attempt_start
            print(f"π€ RETRY: Connection refused (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}")
            print("π« CONNECTION CALIBRATION: Immediate refusal indicates service not listening")

        except Exception as ws_error:
            attempt_time = time.time() - attempt_start
            print(f"π€ RETRY: Unexpected error (attempt {attempt}/{max_retries}, {attempt_time:.1f}s): {ws_error}")
            print(f"π DEBUG: Exception type: {type(ws_error).__name__}")

        finally:
            # Always release the socket, whichever way the attempt ended.
            if websocket is not None:
                try:
                    await websocket.close()
                except Exception as close_error:
                    print(f"π DEBUG: Exception type: {type(close_error).__name__}")

    total_time = time.time() - start_time
    print(f"π€ RETRY EXHAUSTED: Failed after {max_retries} attempts over {total_time:.1f}s")

    if total_time > gpu_loading_threshold:
        print(f"π CALIBRATION SUMMARY: Very slow ({total_time:.1f}s) - GPU loading or deployment issues")
    elif total_time > cold_start_threshold:
        print(f"βοΈ CALIBRATION SUMMARY: Slow ({total_time:.1f}s) - Cold start confirmed")
    else:
        print(f"β‘ CALIBRATION SUMMARY: Fast failure ({total_time:.1f}s) - Service configuration issue")

    return f"Error: WebSocket STT failed after {max_retries} attempts in {total_time:.1f}s"
| |
| async def _call_mcp_stt_service(self, audio_file_path: str) -> str: |
| """Call MCP STT service with HTTP fallback.""" |
| try: |
| print(f"π€ MCP STT: Attempting MCP or HTTP service call for {audio_file_path}") |
| |
| |
| try: |
| from mcp import ClientSession |
| from mcp.client.stdio import stdio_client |
| |
| |
| print(f"π€ MCP STT: Trying MCP connection...") |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| if hasattr(self, 'stt_http_url') and self.stt_http_url: |
| return await self._call_http_stt_service(audio_file_path) |
| |
| |
| print(f"π€ MCP STT: No MCP client available") |
| return "ERROR: MCP STT service failed - no demo mode" |
| |
| except ImportError: |
| print(f"π€ MCP STT: MCP client not available, trying HTTP fallback") |
| |
| |
| if hasattr(self, 'stt_http_url') and self.stt_http_url: |
| result = await self._call_http_stt_service(audio_file_path) |
| if result and not result.startswith("Error"): |
| return result |
| |
| |
| return "ERROR: No STT services available" |
| |
| except Exception as e: |
| print(f"π€ MCP STT service call error: {e}") |
| return f"ERROR: MCP STT service error - {str(e)}" |
| |
async def _call_http_stt_service(self, audio_file_path: str) -> str:
    """Transcribe *audio_file_path* over HTTP, then via the Gradio client.

    First POSTs the file to several candidate API paths under
    ``self.stt_http_url`` using two payload layouts; if none yields a
    transcription, falls back to ``gradio_client``. All blocking network
    calls run in a worker thread so the event loop is not stalled.

    Returns:
        The stripped transcription, or a string starting with ``"Error:"``.
        Never raises.
    """

    def _pick_text(result):
        """Return the transcription found in a JSON response, else None."""
        candidate = None
        if isinstance(result, dict):
            if 'data' in result and len(result['data']) > 1:
                candidate = result['data'][1]
            elif 'transcription' in result:
                candidate = result['transcription']
            elif 'text' in result:
                candidate = result['text']
        elif isinstance(result, list) and len(result) > 1:
            candidate = result[1]
        # Only a non-blank string counts as a transcription.
        if isinstance(candidate, str) and candidate.strip():
            return candidate
        return None

    try:
        import requests

        print(f"π€ HTTP STT: Calling service at {self.stt_http_url}")

        # --- Attempt 1: direct HTTP API ---------------------------------
        try:
            print("π€ HTTP STT: Trying direct HTTP API approach")

            api_patterns = [
                f"{self.stt_http_url}/api/predict",
                f"{self.stt_http_url}/call/predict",
                f"{self.stt_http_url}/api/transcribe_audio",
                f"{self.stt_http_url}/call/transcribe_audio",
            ]

            for api_url in api_patterns:
                try:
                    print(f"π€ HTTP STT: Trying API URL: {api_url}")

                    with open(audio_file_path, 'rb') as audio_file:
                        payload_formats = [
                            {
                                'files': {'data': audio_file},
                                'data': {'data': json.dumps(["auto", "base", True])},
                            },
                            {
                                'files': {'audio': audio_file},
                                'data': {'language': 'auto', 'model': 'base', 'timestamps': 'true'},
                            },
                        ]

                        for i, payload in enumerate(payload_formats):
                            try:
                                # Both layouts share one handle: rewind it.
                                audio_file.seek(0)
                                print(f"π€ HTTP STT: Trying payload format {i+1}")

                                response = await asyncio.to_thread(
                                    requests.post,
                                    api_url,
                                    files=payload['files'],
                                    data=payload['data'],
                                    timeout=60,
                                )

                                print(f"π€ HTTP STT: Response status: {response.status_code}")
                                print(f"π€ HTTP STT: Response headers: {dict(response.headers)}")

                                if response.status_code != 200:
                                    print(f"π€ HTTP STT: Failed with status {response.status_code}")
                                    print(f"π€ HTTP STT: Error response: {response.text[:200]}")
                                    continue

                                try:
                                    result = response.json()
                                except ValueError as json_err:
                                    # json.JSONDecodeError is a ValueError;
                                    # this also covers older requests.
                                    print(f"π€ HTTP STT: JSON decode error: {json_err}")
                                    print(f"π€ HTTP STT: Raw response: {response.text[:200]}")
                                    continue

                                print(f"π€ HTTP STT: Response JSON: {result}")
                                transcription = _pick_text(result)
                                if transcription:
                                    print(f"π€ HTTP STT: SUCCESS via direct API: {transcription}")
                                    return transcription.strip()

                            except Exception as payload_error:
                                print(f"π€ HTTP STT: Payload format {i+1} failed: {payload_error}")
                                continue

                except Exception as url_error:
                    print(f"π€ HTTP STT: URL {api_url} failed: {url_error}")
                    continue

            print("π€ HTTP STT: All direct API attempts failed")

        except Exception as direct_error:
            print(f"π€ HTTP STT: Direct API approach failed: {direct_error}")

        # --- Attempt 2: Gradio client -----------------------------------
        try:
            print("π€ HTTP STT: Falling back to Gradio client...")
            from gradio_client import Client, handle_file

            client = await asyncio.to_thread(Client, self.stt_http_url)

            # Endpoint by name, by path, then by function index.
            gradio_endpoints = [
                "transcribe_audio",
                "/transcribe_audio",
                0,
            ]

            for endpoint in gradio_endpoints:
                try:
                    print(f"π€ HTTP STT: Trying Gradio endpoint: {endpoint}")

                    file_data = handle_file(audio_file_path)
                    result = await asyncio.to_thread(
                        client.predict,
                        file_data,
                        "auto",
                        "base",
                        True,
                        api_name=endpoint if isinstance(endpoint, str) else None,
                        fn_index=endpoint if isinstance(endpoint, int) else None,
                    )

                    print(f"π€ HTTP STT: Gradio result for {endpoint}: {result}")

                    if result:
                        if isinstance(result, (list, tuple)) and len(result) >= 2:
                            transcription = result[1]
                        elif isinstance(result, str):
                            transcription = result
                        else:
                            transcription = str(result)

                        # NOTE(review): the rejected prefix looks like a
                        # mis-decoded error marker; confirm the exact text
                        # the service prepends to failures.
                        if (
                            isinstance(transcription, str)
                            and transcription.strip()
                            and not transcription.startswith("β")
                        ):
                            print(f"π€ HTTP STT: SUCCESS via Gradio endpoint {endpoint}: {transcription[:50]}...")
                            return transcription.strip()

                except Exception as endpoint_error:
                    print(f"π€ HTTP STT: Endpoint {endpoint} failed: {endpoint_error}")
                    continue

            print("π€ HTTP STT: All Gradio endpoints failed")

        except Exception as gradio_error:
            print(f"π€ HTTP STT: Gradio client setup failed: {gradio_error}")

        return "Error: STT service connection failed"

    except Exception as e:
        print(f"π€ HTTP STT ERROR: {e}")
        return f"Error: STT service error - {str(e)}"
| |
| def _get_audio_duration(self, audio_file_path: str) -> float: |
| """Get duration of audio file.""" |
| try: |
| with wave.open(audio_file_path, 'rb') as wav_file: |
| frames = wav_file.getnframes() |
| rate = wav_file.getframerate() |
| duration = frames / float(rate) |
| return duration |
| except: |
| return 5.0 |
| |
| |
def process_audio_input(self, audio_tuple: Tuple) -> str:
    """Transcribe a Gradio-style ``(sample_rate, ndarray)`` audio value.

    The samples are converted to 16-bit mono PCM, written to a temporary
    WAV file, passed to :meth:`speech_to_text`, and the file is removed
    again whether or not transcription succeeds.

    Args:
        audio_tuple: ``(sample_rate, audio_array)``; ``None`` or a value
            with fewer than two items is reported as "no audio".

    Returns:
        The transcription, or an error/diagnostic string. Never raises.
    """
    import os

    try:
        print(f"π€ MCP HANDLER: Processing audio tuple: {type(audio_tuple)}")
        print(f"π DEBUG: Audio tuple content: {audio_tuple}")
        if audio_tuple is None or len(audio_tuple) < 2:
            print("π€ MCP HANDLER: No audio received or invalid format")
            return "ERROR: No audio received"

        sample_rate, audio_array = audio_tuple
        print(f"π€ MCP HANDLER: Sample rate: {sample_rate}, Array type: {type(audio_array)}")

        if not isinstance(audio_array, np.ndarray):
            print("π€ MCP HANDLER: Invalid audio array format")
            return "Invalid audio format"

        print(f"π€ MCP HANDLER: Audio array shape: {audio_array.shape}")

        tmp_path = None
        try:
            # Bring the samples into int16 range. Only floats are scaled:
            # multiplying int16 samples by 32767 would overflow.
            if np.issubdtype(audio_array.dtype, np.floating):
                samples = np.clip(audio_array, -1.0, 1.0) * 32767
            elif audio_array.dtype == np.int16:
                samples = audio_array
            elif np.issubdtype(audio_array.dtype, np.integer):
                full_scale = np.iinfo(audio_array.dtype).max
                samples = audio_array.astype(np.float64) * (32767 / full_scale)
            else:
                raise ValueError(f"unsupported audio dtype {audio_array.dtype}")

            if samples.ndim > 1:
                # Assumes a (samples, channels) layout - TODO confirm
                # against the caller. The WAV below is declared mono, so
                # the channels are averaged.
                samples = samples.mean(axis=1)
            audio_normalized = np.asarray(samples).astype(np.int16)

            # Reserve a path and close the handle before wave reopens it.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
                tmp_path = tmp_file.name

            with wave.open(tmp_path, 'wb') as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)
                wav_file.setframerate(int(sample_rate))
                wav_file.writeframes(audio_normalized.tobytes())

            print(f"π€ MCP HANDLER: Created temp WAV file: {tmp_path}")

            # Runs the coroutine on a fresh event loop and closes it after.
            result = asyncio.run(self.speech_to_text(tmp_path))
            print(f"π€ MCP HANDLER: MCP STT result: {result}")
            return result
        except Exception as stt_error:
            print(f"π€ MCP HANDLER ERROR: MCP STT processing failed: {stt_error}")
            return f"ERROR: STT processing failed - {str(stt_error)}"
        finally:
            # Cleanup runs on every exit path, so the temp file is not leaked.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    # Already gone or not removable: nothing more to do.
                    pass

    except Exception as e:
        import traceback

        print(f"π€ MCP HANDLER ERROR: {e}")
        traceback.print_exc()
        logger.error(f"MCP audio processing error: {e}")
        return f"Error processing audio: {str(e)}"
| |
async def text_to_speech(self, text: str, voice: Optional[str] = None) -> Optional[bytes]:
    """Synthesize *text* through the MCP TTS tier.

    Returns ``None`` when voice responses are disabled in ``config``, when
    no TTS service is registered, or when any error occurs; otherwise
    whatever ``_call_mcp_tts_service`` returns.
    """
    try:
        if not config.enable_voice_responses:
            return None

        if self.tts_service:
            print(f"π MCP TTS: Converting text to speech via MCP: {text[:50]}...")
            audio_bytes = await self._call_mcp_tts_service(text, voice)
            return audio_bytes

        print("π MCP TTS: No TTS service available")
        return None
    except Exception as tts_error:
        print(f"π MCP TTS ERROR: {tts_error}")
        logger.error(f"MCP TTS error: {tts_error}")
        return None
| |
| async def _call_mcp_tts_service(self, text: str, voice: Optional[str] = None) -> Optional[bytes]: |
| """Call MCP TTS service - placeholder for actual MCP integration.""" |
| try: |
| |
| print(f"π MCP TTS: Simulating MCP TTS service call") |
| |
| |
| |
| |
| |
| |
| |
| |
| return None |
| |
| except Exception as e: |
| print(f"π MCP TTS service call error: {e}") |
| return None |
| |
def is_audio_service_available(self) -> Tuple[bool, bool]:
    """Return ``(stt_available, tts_available)`` from the service handles.

    Each flag is the truthiness of ``self.stt_service`` /
    ``self.tts_service`` respectively.
    """
    return bool(self.stt_service), bool(self.tts_service)
| |
def get_audio_status(self) -> dict:
    """Return a status summary of the audio services.

    Combines the availability flags from ``is_audio_service_available``
    with the voice settings read from ``config``; ``demo_mode`` is always
    reported as False and ``service_type`` as ``"mcp"``.
    """
    has_stt, has_tts = self.is_audio_service_available()

    status = {}
    status["stt_available"] = has_stt
    status["tts_available"] = has_tts
    status["demo_mode"] = False
    status["voice_responses_enabled"] = config.enable_voice_responses
    status["default_voice"] = config.default_voice
    status["service_type"] = "mcp"
    return status
|
|
|
|
| |
# Shared module-level handler instance. Constructing it runs
# MCPAudioHandler.__init__, which starts service discovery (HTTP probes of
# the STT/TTS endpoints), so this happens as soon as the module is imported.
mcp_audio_handler = MCPAudioHandler()