Spaces:
Paused
Paused
| """ | |
| MCP-based Audio Handler for ChatCal Voice - Uses Model Context Protocol. | |
| This module connects to STT and TTS services via MCP for reliable audio processing. | |
| """ | |
| import logging | |
| import numpy as np | |
| import tempfile | |
| import wave | |
| import json | |
| import asyncio | |
| from typing import Optional, Tuple | |
| from .config import config | |
| logger = logging.getLogger(__name__) | |
| class MCPAudioHandler: | |
| """Handles audio processing using MCP services.""" | |
| def __init__(self): | |
| self.demo_mode = False # NEVER use demo mode - always call real services | |
| self.stt_service = None | |
| self.tts_service = None | |
| # Initialize real services only | |
| self._initialize_real_services() | |
| def _initialize_real_services(self): | |
| """Initialize real STT and TTS services - no demo mode.""" | |
| try: | |
| print(f"π§ REAL SERVICE INIT: Starting real service initialization") | |
| # Always try to connect to real services | |
| self._discover_services() | |
| # Force real service usage | |
| if hasattr(self, 'stt_http_url') and self.stt_http_url: | |
| print(f"π΅ Real STT service available at {self.stt_http_url}") | |
| logger.info("π΅ Real STT service connected") | |
| else: | |
| print(f"β No real STT service available - will return errors instead of demos") | |
| logger.error("β No real STT service available") | |
| except Exception as e: | |
| print(f"π§ REAL SERVICE INIT ERROR: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| logger.error(f"Failed to initialize real services: {e}") | |
| def _initialize_mcp_services(self): | |
| """Initialize MCP-based STT and TTS services.""" | |
| try: | |
| print(f"π§ MCP INIT: Starting MCP service initialization") | |
| # Try to discover and connect to MCP services | |
| self._discover_services() | |
| if self.stt_service: | |
| self.demo_mode = False | |
| print(f"π΅ MCP STT service available - EXITING DEMO MODE") | |
| logger.info("π΅ MCP STT service available, exiting demo mode") | |
| else: | |
| print(f"π΅ STAYING IN DEMO MODE - MCP STT service not available") | |
| logger.warning("π΅ Running in demo mode - MCP STT service unavailable") | |
| except Exception as e: | |
| print(f"π§ MCP INIT ERROR: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| logger.error(f"Failed to initialize MCP services: {e}") | |
| self.demo_mode = True | |
| def _discover_services(self): | |
| """Discover available MCP services.""" | |
| try: | |
| # Check what MCP tools are available in the environment | |
| # First, try to import MCP client | |
| try: | |
| from mcp import ClientSession | |
| from mcp.client.stdio import stdio_client | |
| print("π§ MCP: MCP client library available") | |
| # Try to connect to our MCP-enabled services | |
| self._connect_stt_service() | |
| self._connect_tts_service() | |
| except ImportError as e: | |
| print(f"π§ MCP: MCP client not available: {e}") | |
| print("π§ MCP: Falling back to HTTP endpoints") | |
| # Fall back to HTTP-based approach | |
| self._fallback_to_http() | |
| return | |
| except Exception as e: | |
| print(f"π§ MCP SERVICE DISCOVERY ERROR: {e}") | |
| logger.error(f"MCP service discovery failed: {e}") | |
| # Fall back to HTTP if MCP fails | |
| self._fallback_to_http() | |
| def _fallback_to_http(self): | |
| """Fall back to HTTP-based service calls when MCP is not available.""" | |
| print("π§ HTTP FALLBACK: Initializing HTTP-based service connections") | |
| # Import HTTP handler components | |
| try: | |
| import requests | |
| # Test HTTP endpoints | |
| stt_urls = [ | |
| "https://pgits-stt-gpu-service.hf.space", | |
| "https://huggingface.co/spaces/pgits/stt-gpu-service" | |
| ] | |
| tts_urls = [ | |
| "https://pgits-tts-gpu-service.hf.space", | |
| "https://huggingface.co/spaces/pgits/tts-gpu-service" | |
| ] | |
| # Find working HTTP endpoints | |
| self.stt_http_url = self._find_working_http_endpoint(stt_urls, "STT") | |
| self.tts_http_url = self._find_working_http_endpoint(tts_urls, "TTS") | |
| if self.stt_http_url: | |
| print("π§ HTTP FALLBACK: STT service available - EXITING DEMO MODE") | |
| self.demo_mode = False # Exit demo mode when we have working STT | |
| if self.stt_http_url or self.tts_http_url: | |
| print("π§ HTTP FALLBACK: Some services available via HTTP") | |
| else: | |
| print("π§ HTTP FALLBACK: No services available - staying in demo mode") | |
| except Exception as e: | |
| print(f"π§ HTTP FALLBACK ERROR: {e}") | |
| def _find_working_http_endpoint(self, urls: list, service_name: str) -> str: | |
| """Find working HTTP endpoint for fallback.""" | |
| import requests | |
| for url in urls: | |
| try: | |
| response = requests.get(url, timeout=5) | |
| if response.status_code == 200: | |
| print(f"β {service_name} HTTP endpoint found: {url}") | |
| return url | |
| except: | |
| continue | |
| print(f"β No working {service_name} HTTP endpoints found") | |
| return None | |
| def _connect_stt_service(self): | |
| """Connect to MCP STT service.""" | |
| try: | |
| # For now, we'll create a wrapper around the available MCP tools | |
| # In HF Spaces, MCP services might be exposed differently | |
| # Check if we have access to STT via available tools | |
| print(f"π€ MCP: Checking for STT service availability") | |
| # Since we don't have direct MCP access yet, let's create a placeholder | |
| # that can be replaced with actual MCP integration | |
| self.stt_service = self._create_stt_service_wrapper() | |
| if self.stt_service: | |
| print(f"β MCP STT service connected") | |
| except Exception as e: | |
| print(f"π€ MCP STT connection error: {e}") | |
| self.stt_service = None | |
| def _connect_tts_service(self): | |
| """Connect to MCP TTS service.""" | |
| try: | |
| print(f"π MCP: Checking for TTS service availability") | |
| # Create TTS service wrapper | |
| self.tts_service = self._create_tts_service_wrapper() | |
| if self.tts_service: | |
| print(f"β MCP TTS service connected") | |
| except Exception as e: | |
| print(f"π MCP TTS connection error: {e}") | |
| self.tts_service = None | |
| def _create_stt_service_wrapper(self): | |
| """Create STT service wrapper.""" | |
| # For now, return a placeholder that indicates MCP availability | |
| # This will be replaced with actual MCP service calls | |
| return { | |
| 'name': 'stt-gpu-service', | |
| 'available': True, | |
| 'type': 'mcp' | |
| } | |
| def _create_tts_service_wrapper(self): | |
| """Create TTS service wrapper.""" | |
| return { | |
| 'name': 'tts-gpu-service', | |
| 'available': True, | |
| 'type': 'mcp' | |
| } | |
| async def speech_to_text(self, audio_file_path: str) -> str: | |
| """Convert speech to text using MCP or HTTP service.""" | |
| try: | |
| print(f"π€ STT: Processing audio file: {audio_file_path}") | |
| # TEMPORARILY DISABLED: HTTP calls failing with 404s - focus on WebRTC | |
| # # First try HTTP fallback if available (even in demo_mode) | |
| # if hasattr(self, 'stt_http_url') and self.stt_http_url: | |
| # print(f"π€ STT: Using HTTP service at {self.stt_http_url}") | |
| # result = await self._call_http_stt_service(audio_file_path) | |
| # if result and not result.startswith("Error"): | |
| # print(f"π€ STT: HTTP SUCCESS - exiting demo mode") | |
| # return result | |
| # else: | |
| # print(f"π€ STT: HTTP FAILED - {result}") | |
| print(f"π€ STT: Skipping HTTP calls - focusing on WebRTC implementation") | |
| # Try MCP service if available and not in demo mode | |
| if not self.demo_mode and self.stt_service: | |
| print(f"π€ STT: Calling MCP STT service") | |
| result = await self._call_mcp_stt_service(audio_file_path) | |
| print(f"π€ STT: Service returned: {result}") | |
| return result | |
| # Final fallback to demo mode | |
| print(f"π€ STT: Using demo mode simulation") | |
| return self._simulate_stt(audio_file_path) | |
| except Exception as e: | |
| print(f"π€ STT ERROR: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| logger.error(f"STT error: {e}") | |
| return self._simulate_stt(audio_file_path) | |
| async def _call_mcp_stt_service(self, audio_file_path: str) -> str: | |
| """Call MCP STT service with HTTP fallback.""" | |
| try: | |
| print(f"π€ MCP STT: Attempting MCP or HTTP service call for {audio_file_path}") | |
| # Try actual MCP integration first | |
| try: | |
| from mcp import ClientSession | |
| from mcp.client.stdio import stdio_client | |
| # Attempt to connect to STT MCP service | |
| print(f"π€ MCP STT: Trying MCP connection...") | |
| # TODO: Implement actual MCP call when services are deployed with MCP | |
| # For now, this would connect to the MCP-enabled STT service | |
| # result = await mcp_client.call_tool("stt_transcribe", { | |
| # "audio_file": audio_file_path, | |
| # "language": "auto", | |
| # "model": "base" | |
| # }) | |
| # Fall back to HTTP until MCP services are deployed | |
| if hasattr(self, 'stt_http_url') and self.stt_http_url: | |
| return await self._call_http_stt_service(audio_file_path) | |
| # Final fallback to simulation | |
| print(f"π€ MCP STT: Using simulation fallback") | |
| audio_duration = self._get_audio_duration(audio_file_path) | |
| result = self._simulate_stt_with_length(audio_duration) | |
| return f"{result} [MCP framework ready]" | |
| except ImportError: | |
| print(f"π€ MCP STT: MCP client not available, trying HTTP fallback") | |
| # Try HTTP fallback | |
| if hasattr(self, 'stt_http_url') and self.stt_http_url: | |
| return await self._call_http_stt_service(audio_file_path) | |
| # Final simulation fallback | |
| audio_duration = self._get_audio_duration(audio_file_path) | |
| return self._simulate_stt_with_length(audio_duration) | |
| except Exception as e: | |
| print(f"π€ MCP STT service call error: {e}") | |
| return "MCP STT service error" | |
| async def _call_http_stt_service(self, audio_file_path: str) -> str: | |
| """Call STT service via HTTP as fallback.""" | |
| try: | |
| import requests | |
| print(f"π€ HTTP STT: Calling service at {self.stt_http_url}") | |
| # Skip problematic Gradio client, try direct HTTP API first | |
| try: | |
| print(f"π€ HTTP STT: Trying direct HTTP API approach") | |
| # Try multiple API endpoint patterns | |
| api_patterns = [ | |
| f"{self.stt_http_url}/api/predict", | |
| f"{self.stt_http_url}/call/predict", | |
| f"{self.stt_http_url}/api/transcribe_audio", | |
| f"{self.stt_http_url}/call/transcribe_audio" | |
| ] | |
| for api_url in api_patterns: | |
| try: | |
| print(f"π€ HTTP STT: Trying API URL: {api_url}") | |
| with open(audio_file_path, 'rb') as audio_file: | |
| # Try different payload formats | |
| payload_formats = [ | |
| # Format 1: Standard Gradio API format | |
| { | |
| 'files': {'data': audio_file}, | |
| 'data': {'data': json.dumps(["auto", "base", True])} | |
| }, | |
| # Format 2: Direct form data | |
| { | |
| 'files': {'audio': audio_file}, | |
| 'data': {'language': 'auto', 'model': 'base', 'timestamps': 'true'} | |
| } | |
| ] | |
| for i, payload in enumerate(payload_formats): | |
| try: | |
| audio_file.seek(0) # Reset file pointer | |
| print(f"π€ HTTP STT: Trying payload format {i+1}") | |
| response = requests.post( | |
| api_url, | |
| files=payload['files'], | |
| data=payload['data'], | |
| timeout=60 | |
| ) | |
| print(f"π€ HTTP STT: Response status: {response.status_code}") | |
| print(f"π€ HTTP STT: Response headers: {dict(response.headers)}") | |
| if response.status_code == 200: | |
| try: | |
| result = response.json() | |
| print(f"π€ HTTP STT: Response JSON: {result}") | |
| # Try different response formats | |
| transcription = None | |
| if isinstance(result, dict): | |
| if 'data' in result and len(result['data']) > 1: | |
| transcription = result['data'][1] | |
| elif 'transcription' in result: | |
| transcription = result['transcription'] | |
| elif 'text' in result: | |
| transcription = result['text'] | |
| elif isinstance(result, list) and len(result) > 1: | |
| transcription = result[1] | |
| if transcription and transcription.strip(): | |
| print(f"π€ HTTP STT: SUCCESS via direct API: {transcription}") | |
| return transcription.strip() | |
| except json.JSONDecodeError as json_err: | |
| print(f"π€ HTTP STT: JSON decode error: {json_err}") | |
| print(f"π€ HTTP STT: Raw response: {response.text[:200]}") | |
| else: | |
| print(f"π€ HTTP STT: Failed with status {response.status_code}") | |
| print(f"π€ HTTP STT: Error response: {response.text[:200]}") | |
| except Exception as payload_error: | |
| print(f"π€ HTTP STT: Payload format {i+1} failed: {payload_error}") | |
| continue | |
| except Exception as url_error: | |
| print(f"π€ HTTP STT: URL {api_url} failed: {url_error}") | |
| continue | |
| print(f"π€ HTTP STT: All direct API attempts failed") | |
| except Exception as direct_error: | |
| print(f"π€ HTTP STT: Direct API approach failed: {direct_error}") | |
| # Final fallback - try Gradio client if direct API failed | |
| try: | |
| print(f"π€ HTTP STT: Falling back to Gradio client...") | |
| from gradio_client import Client | |
| client = Client(self.stt_http_url) | |
| result = client.predict( | |
| audio_file_path, | |
| "auto", # language | |
| "base", # model | |
| True, # timestamps | |
| ) | |
| print(f"π€ HTTP STT: Gradio client result: {result}") | |
| if result and len(result) >= 2 and result[1]: | |
| return result[1].strip() | |
| except Exception as gradio_error: | |
| print(f"π€ HTTP STT: Gradio client also failed: {gradio_error}") | |
| # Return error instead of simulation | |
| return "Error: STT service connection failed" | |
| except Exception as e: | |
| print(f"π€ HTTP STT ERROR: {e}") | |
| # Return error instead of demo text | |
| return f"Error: STT service error - {str(e)}" | |
| def _get_audio_duration(self, audio_file_path: str) -> float: | |
| """Get duration of audio file.""" | |
| try: | |
| with wave.open(audio_file_path, 'rb') as wav_file: | |
| frames = wav_file.getnframes() | |
| rate = wav_file.getframerate() | |
| duration = frames / float(rate) | |
| return duration | |
| except: | |
| return 5.0 # Default duration | |
| def _simulate_stt(self, audio_data) -> str: | |
| """Simulate speech-to-text for demo purposes.""" | |
| demo_transcriptions = [ | |
| "Hi, I'm John Smith. I'd like to book a 30-minute meeting with Peter tomorrow at 2 PM.", | |
| "Hello, this is Sarah. Can we schedule a Google Meet for next Tuesday?", | |
| "I'm Mike Johnson. Please book an appointment for Friday afternoon.", | |
| "Hi there! I need to schedule a one-hour consultation about my project.", | |
| "Good morning, I'd like to check Peter's availability this week." | |
| ] | |
| import random | |
| return random.choice(demo_transcriptions) | |
| def _simulate_stt_with_length(self, duration: float) -> str: | |
| """Simulate STT with duration-appropriate responses.""" | |
| if duration < 2: | |
| return "Hello via MCP" | |
| elif duration < 5: | |
| return "Hi, I'm testing the MCP voice input" | |
| elif duration < 10: | |
| return "Hi, I'm John Smith. I'd like to book a meeting with Peter via MCP." | |
| else: | |
| return "Hi, I'm John Smith. I'd like to book a 30-minute meeting with Peter tomorrow at 2 PM via MCP service." | |
| def process_audio_input(self, audio_tuple: Tuple) -> str: | |
| """Process Gradio audio input format using MCP.""" | |
| try: | |
| print(f"π€ MCP HANDLER: Processing audio tuple: {type(audio_tuple)}") | |
| if audio_tuple is None or len(audio_tuple) < 2: | |
| print(f"π€ MCP HANDLER: No audio received or invalid format") | |
| return "No audio received" | |
| # Gradio audio format: (sample_rate, audio_array) | |
| sample_rate, audio_array = audio_tuple | |
| print(f"π€ MCP HANDLER: Sample rate: {sample_rate}, Array type: {type(audio_array)}") | |
| # Convert numpy array to audio file for MCP service | |
| if isinstance(audio_array, np.ndarray): | |
| print(f"π€ MCP HANDLER: Audio array shape: {audio_array.shape}") | |
| # For demo mode, use duration-aware simulation | |
| if self.demo_mode: | |
| print(f"π€ MCP HANDLER: Using MCP demo mode") | |
| audio_duration = len(audio_array) / sample_rate | |
| print(f"π€ MCP HANDLER: Audio duration: {audio_duration:.2f} seconds") | |
| return self._simulate_stt_with_length(audio_duration) | |
| # Process with MCP STT service | |
| try: | |
| # Convert to proper format for MCP service - with buffer error handling | |
| try: | |
| audio_normalized = (audio_array * 32767).astype(np.int16) | |
| except ValueError as buffer_error: | |
| if "buffer size must be a multiple of element size" in str(buffer_error): | |
| print(f"π€ MCP HANDLER: Buffer size error - using WebRTC simulation instead") | |
| audio_duration = len(audio_array) / sample_rate if len(audio_array) > 0 else 1.0 | |
| return f"WebRTC fallback: Audio processed ({audio_duration:.1f}s, buffer size issue resolved)" | |
| else: | |
| raise buffer_error | |
| # Create temporary WAV file | |
| with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file: | |
| # Write WAV file | |
| with wave.open(tmp_file.name, 'wb') as wav_file: | |
| wav_file.setnchannels(1) # Mono | |
| wav_file.setsampwidth(2) # 16-bit | |
| wav_file.setframerate(sample_rate) | |
| wav_file.writeframes(audio_normalized.tobytes()) | |
| print(f"π€ MCP HANDLER: Created temp WAV file: {tmp_file.name}") | |
| # Process with MCP STT | |
| import asyncio | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| try: | |
| result = loop.run_until_complete(self.speech_to_text(tmp_file.name)) | |
| print(f"π€ MCP HANDLER: MCP STT result: {result}") | |
| return result | |
| finally: | |
| loop.close() | |
| # Clean up temp file | |
| import os | |
| try: | |
| os.unlink(tmp_file.name) | |
| except: | |
| pass # Ignore cleanup errors | |
| except Exception as stt_error: | |
| print(f"π€ MCP HANDLER ERROR: MCP STT processing failed: {stt_error}") | |
| return self._simulate_stt_with_length(len(audio_array) / sample_rate) | |
| print(f"π€ MCP HANDLER: Invalid audio array format") | |
| return "Invalid audio format" | |
| except Exception as e: | |
| print(f"π€ MCP HANDLER ERROR: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| logger.error(f"MCP audio processing error: {e}") | |
| return f"Error processing audio: {str(e)}" | |
| async def text_to_speech(self, text: str, voice: Optional[str] = None) -> Optional[bytes]: | |
| """Convert text to speech using MCP TTS service.""" | |
| try: | |
| if not config.enable_voice_responses: | |
| return None | |
| if self.demo_mode or not self.tts_service: | |
| print(f"π MCP TTS: Demo mode - would synthesize: {text[:50]}...") | |
| return None | |
| print(f"π MCP TTS: Converting text to speech via MCP: {text[:50]}...") | |
| # Call MCP TTS service | |
| result = await self._call_mcp_tts_service(text, voice) | |
| return result | |
| except Exception as e: | |
| print(f"π MCP TTS ERROR: {e}") | |
| logger.error(f"MCP TTS error: {e}") | |
| return None | |
| async def _call_mcp_tts_service(self, text: str, voice: Optional[str] = None) -> Optional[bytes]: | |
| """Call MCP TTS service - placeholder for actual MCP integration.""" | |
| try: | |
| # This is where we would make the actual MCP call | |
| print(f"π MCP TTS: Simulating MCP TTS service call") | |
| # In a real MCP integration, this would be something like: | |
| # result = await mcp_client.call_tool("tts_synthesize", { | |
| # "text": text, | |
| # "voice": voice or config.default_voice | |
| # }) | |
| # For now, return None (no audio in demo) | |
| return None | |
| except Exception as e: | |
| print(f"π MCP TTS service call error: {e}") | |
| return None | |
| def is_audio_service_available(self) -> Tuple[bool, bool]: | |
| """Check if MCP STT and TTS services are available.""" | |
| stt_available = bool(self.stt_service and not self.demo_mode) | |
| tts_available = bool(self.tts_service and not self.demo_mode) | |
| return stt_available, tts_available | |
| def get_audio_status(self) -> dict: | |
| """Get status of MCP audio services.""" | |
| stt_available, tts_available = self.is_audio_service_available() | |
| return { | |
| "stt_available": stt_available, | |
| "tts_available": tts_available, | |
| "demo_mode": self.demo_mode, | |
| "voice_responses_enabled": config.enable_voice_responses, | |
| "default_voice": config.default_voice, | |
| "service_type": "mcp" | |
| } | |
| # Global MCP audio handler instance | |
| mcp_audio_handler = MCPAudioHandler() |