voiceCalendar / core /mcp_audio_handler.py
Peter Michael Gits
feat: Add Streamlit-native WebRTC speech-to-text using unmute.sh patterns
21fac9b
"""
MCP-based Audio Handler for ChatCal Voice - Uses Model Context Protocol.
This module connects to STT and TTS services via MCP for reliable audio processing.
"""
import logging
import numpy as np
import tempfile
import wave
import json
import asyncio
from typing import Optional, Tuple
from .config import config
logger = logging.getLogger(__name__)
class MCPAudioHandler:
"""Handles audio processing using MCP services."""
def __init__(self):
self.stt_service = None
self.tts_service = None
# Initialize real services only - NO DEMO MODE
self._initialize_real_services()
def _initialize_real_services(self):
"""Initialize real STT and TTS services - no demo mode."""
try:
print(f"πŸ”§ REAL SERVICE INIT: Starting real service initialization")
# Always try to connect to real services
self._discover_services()
# Force real service usage
if hasattr(self, 'stt_http_url') and self.stt_http_url:
print(f"🎡 Real STT service available at {self.stt_http_url}")
logger.info("🎡 Real STT service connected")
else:
print(f"❌ No real STT service available - will return errors instead of demos")
logger.error("❌ No real STT service available")
except Exception as e:
print(f"πŸ”§ REAL SERVICE INIT ERROR: {e}")
import traceback
traceback.print_exc()
logger.error(f"Failed to initialize real services: {e}")
def _initialize_mcp_services(self):
"""Initialize MCP-based STT and TTS services."""
try:
print(f"πŸ”§ MCP INIT: Starting MCP service initialization")
# Try to discover and connect to MCP services
self._discover_services()
if self.stt_service:
self.demo_mode = False
print(f"🎡 MCP STT service available - EXITING DEMO MODE")
logger.info("🎡 MCP STT service available, exiting demo mode")
else:
print(f"🎡 STAYING IN DEMO MODE - MCP STT service not available")
logger.warning("🎡 Running in demo mode - MCP STT service unavailable")
except Exception as e:
print(f"πŸ”§ MCP INIT ERROR: {e}")
import traceback
traceback.print_exc()
logger.error(f"Failed to initialize MCP services: {e}")
self.demo_mode = True
def _discover_services(self):
"""Discover available MCP services."""
try:
# ALWAYS initialize WebSocket and HTTP fallbacks first
# This ensures we have working service connections regardless of MCP status
self._initialize_websocket_and_http_services()
# Check what MCP tools are available in the environment
# First, try to import MCP client
try:
from mcp import ClientSession
from mcp.client.stdio import stdio_client
print("πŸ”§ MCP: MCP client library available")
# Try to connect to our MCP-enabled services
self._connect_stt_service()
self._connect_tts_service()
except ImportError as e:
print(f"πŸ”§ MCP: MCP client not available: {e}")
print("πŸ”§ MCP: Using WebSocket/HTTP services (already initialized)")
return
except Exception as e:
print(f"πŸ”§ MCP SERVICE DISCOVERY ERROR: {e}")
logger.error(f"MCP service discovery failed: {e}")
print("πŸ”§ MCP: Using WebSocket/HTTP services (already initialized)")
def _initialize_websocket_and_http_services(self):
"""Initialize WebSocket and HTTP-based service connections (primary method)."""
print("πŸ”§ WEBSOCKET/HTTP INIT: Initializing WebSocket and HTTP-based service connections")
# Import HTTP handler components
try:
import requests
# Updated service URLs following unmute.sh methodology
stt_websocket_urls = [
"wss://pgits-stt-gpu-service.hf.space/ws/stt",
"ws://localhost:7860/ws/stt" # For local development
]
stt_http_urls = [
"https://pgits-stt-gpu-service.hf.space",
"https://huggingface.co/spaces/pgits/stt-gpu-service"
]
tts_urls = [
"https://pgits-tts-gpu-service.hf.space",
"https://huggingface.co/spaces/pgits/tts-gpu-service"
]
# Store WebSocket URLs (will be used for real-time audio)
self.stt_websocket_urls = stt_websocket_urls
# Find working HTTP endpoints as fallback
self.stt_http_url = self._find_working_http_endpoint(stt_http_urls, "STT")
self.tts_http_url = self._find_working_http_endpoint(tts_urls, "TTS")
# Preload STT service to warm up the model and avoid cold start delays
if self.stt_http_url:
self._preload_stt_service()
# Check if we have any working STT service (WebSocket preferred, HTTP fallback)
if self.stt_websocket_urls or self.stt_http_url:
print("πŸ”§ WEBSOCKET/HTTP FALLBACK: STT service available")
if self.stt_http_url or self.tts_http_url or self.stt_websocket_urls:
print("πŸ”§ WEBSOCKET/HTTP FALLBACK: Services available")
else:
print("πŸ”§ WEBSOCKET/HTTP FALLBACK: No services available - WILL RETURN ERRORS")
except Exception as e:
print(f"πŸ”§ WEBSOCKET/HTTP FALLBACK ERROR: {e}")
def _preload_stt_service(self):
"""Preload STT service to warm up the model and avoid cold start delays."""
try:
import requests
import time
preload_url = f"{self.stt_http_url}/preload?model_size=base"
print(f"πŸ”§ STT PRELOAD: Warming up STT service at {preload_url}")
start_time = time.time()
response = requests.get(preload_url, timeout=30)
elapsed_time = time.time() - start_time
if response.status_code == 200:
result = response.json()
if result.get("status") == "success":
print(f"βœ… STT PRELOAD: Model loaded successfully in {elapsed_time:.1f}s - {result.get('message', 'Ready')}")
logger.info(f"STT service preloaded successfully: {result.get('message')}")
else:
print(f"⚠️ STT PRELOAD: {result.get('message', 'Unknown response')}")
logger.warning(f"STT preload response: {result}")
else:
print(f"⚠️ STT PRELOAD: HTTP {response.status_code} - Service may still be starting")
logger.warning(f"STT preload failed: HTTP {response.status_code}")
except requests.exceptions.Timeout:
print(f"⏰ STT PRELOAD: Timeout after 30s - Service may be cold starting, will try again later")
logger.warning("STT preload timed out - service may be cold starting")
except Exception as e:
print(f"⚠️ STT PRELOAD: Error warming up service - {e}")
logger.warning(f"STT preload error: {e}")
def _find_working_http_endpoint(self, urls: list, service_name: str) -> str:
"""Find working HTTP endpoint for fallback."""
import requests
for url in urls:
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
print(f"βœ… {service_name} HTTP endpoint found: {url}")
return url
except:
continue
print(f"❌ No working {service_name} HTTP endpoints found")
return None
def _connect_stt_service(self):
"""Connect to MCP STT service."""
try:
# For now, we'll create a wrapper around the available MCP tools
# In HF Spaces, MCP services might be exposed differently
# Check if we have access to STT via available tools
print(f"🎀 MCP: Checking for STT service availability")
# Since we don't have direct MCP access yet, let's create a placeholder
# that can be replaced with actual MCP integration
self.stt_service = self._create_stt_service_wrapper()
if self.stt_service:
print(f"βœ… MCP STT service connected")
except Exception as e:
print(f"🎀 MCP STT connection error: {e}")
self.stt_service = None
def _connect_tts_service(self):
"""Connect to MCP TTS service."""
try:
print(f"πŸ”Š MCP: Checking for TTS service availability")
# Create TTS service wrapper
self.tts_service = self._create_tts_service_wrapper()
if self.tts_service:
print(f"βœ… MCP TTS service connected")
except Exception as e:
print(f"πŸ”Š MCP TTS connection error: {e}")
self.tts_service = None
def _create_stt_service_wrapper(self):
"""Create STT service wrapper."""
# For now, return a placeholder that indicates MCP availability
# This will be replaced with actual MCP service calls
return {
'name': 'stt-gpu-service',
'available': True,
'type': 'mcp'
}
def _create_tts_service_wrapper(self):
"""Create TTS service wrapper."""
return {
'name': 'tts-gpu-service',
'available': True,
'type': 'mcp'
}
async def speech_to_text(self, audio_file_path: str) -> str:
"""Convert speech to text using REAL SERVICES ONLY - no demo mode."""
try:
print(f"🎀 STT: Processing audio file: {audio_file_path}")
# DEBUG: Check WebSocket URLs availability
print(f"πŸ” DEBUG: hasattr(self, 'stt_websocket_urls'): {hasattr(self, 'stt_websocket_urls')}")
if hasattr(self, 'stt_websocket_urls'):
print(f"πŸ” DEBUG: self.stt_websocket_urls: {self.stt_websocket_urls}")
print(f"πŸ” DEBUG: bool(self.stt_websocket_urls): {bool(self.stt_websocket_urls)}")
# First try WebSocket connection (unmute.sh methodology)
if hasattr(self, 'stt_websocket_urls') and self.stt_websocket_urls:
print(f"🎀 STT: Trying WebSocket connection (unmute.sh style)")
result = await self._call_websocket_stt_service(audio_file_path)
if result and not result.startswith("Error"):
print(f"🎀 STT: WebSocket SUCCESS - {result[:50]}...")
return result
else:
print(f"🎀 STT: WebSocket FAILED - {result}")
else:
print(f"🚨 STT: WebSocket URLs not available - falling back to MCP")
# Try MCP service if available
if self.stt_service:
print(f"🎀 STT: Calling MCP STT service")
result = await self._call_mcp_stt_service(audio_file_path)
if result and not result.startswith("Error"):
print(f"🎀 STT: MCP SUCCESS - {result[:50]}...")
return result
else:
print(f"🎀 STT: MCP FAILED - {result}")
# HTTP fallback if WebSocket and MCP failed
if hasattr(self, 'stt_http_url') and self.stt_http_url:
print(f"🎀 STT: Using HTTP fallback service at {self.stt_http_url}")
result = await self._call_http_stt_service(audio_file_path)
if result and not result.startswith("Error"):
print(f"🎀 STT: HTTP SUCCESS - {result[:50]}...")
return result
else:
print(f"🎀 STT: HTTP FAILED - {result}")
# NO DEMO MODE - Return error if all services failed
error_msg = "ERROR: All STT services failed - no demo mode available"
print(f"🎀 STT: {error_msg}")
return error_msg
except Exception as e:
print(f"🎀 STT ERROR: {e}")
import traceback
traceback.print_exc()
logger.error(f"STT error: {e}")
return f"ERROR: STT processing failed - {str(e)}"
async def _call_websocket_stt_service(self, audio_file_path: str) -> str:
"""Call STT service via WebSocket using unmute.sh methodology."""
try:
import websockets
import base64
import json
import os
print(f"🎀 WebSocket STT: Processing {audio_file_path}")
print(f"πŸ” DEBUG: File exists: {os.path.exists(audio_file_path)}")
# Read audio file and encode for WebSocket transmission
with open(audio_file_path, 'rb') as audio_file:
audio_data = audio_file.read()
audio_base64 = base64.b64encode(audio_data).decode('utf-8')
print(f"πŸ” DEBUG: Audio file size: {len(audio_data)} bytes, path: {audio_file_path}")
print(f"πŸ” DEBUG: First 20 bytes: {audio_data[:20].hex() if len(audio_data) >= 20 else 'N/A'}")
print(f"πŸ” DEBUG: Base64 length: {len(audio_base64)} chars")
# Audio format detection
if audio_data.startswith(b'RIFF'):
print(f"πŸ” DEBUG: Audio format: WAV")
elif audio_data.startswith(b'\xff\xfb') or audio_data.startswith(b'ID3'):
print(f"πŸ” DEBUG: Audio format: MP3")
elif audio_data.startswith(b'OggS'):
print(f"πŸ” DEBUG: Audio format: OGG")
elif audio_data.startswith(b'\x1a\x45\xdf\xa3'):
print(f"πŸ” DEBUG: Audio format: WebM")
else:
print(f"πŸ” DEBUG: Audio format: Unknown - first 4 bytes: {audio_data[:4].hex()}")
print(f"πŸ” DEBUG: Available WebSocket URLs: {self.stt_websocket_urls}")
# Try each WebSocket URL with sophisticated retry logic
for i, ws_url in enumerate(self.stt_websocket_urls):
result = await self._connect_websocket_with_retry(ws_url, audio_base64)
if result and not result.startswith("Error"):
return result
except ImportError:
print("🎀 WebSocket STT: websockets library not available")
return "Error: WebSocket support not available"
except Exception as e:
print(f"🎀 WebSocket STT: Fatal error in _call_websocket_stt_service: {e}")
print(f"πŸ” DEBUG: Exception type: {type(e).__name__}")
import traceback
traceback.print_exc()
return f"Error: WebSocket STT fatal error - {str(e)}"
async def _connect_websocket_with_retry(self, ws_url: str, audio_base64: str) -> str:
"""Connect to WebSocket with exponential backoff retry logic for GPU cold start calibration."""
import websockets
import asyncio
import json
import ssl
import time
# 🎯 RETRY CALIBRATION CONFIGURATION
# Based on HF Spaces ZeroGPU cold start patterns
max_retries = 8 # Up to 8 attempts total
base_delay = 3.0 # Start with 3 second delay
max_delay = 45.0 # Cap at 45 seconds (HF Spaces usually under 60s)
backoff_multiplier = 1.4 # Moderate exponential backoff (not too aggressive)
connection_timeout = 30.0 # 30s connection timeout per attempt
# πŸ“Š TIMING CALIBRATION THRESHOLDS
cold_start_threshold = 30.0 # >30s = likely cold start
warm_service_threshold = 8.0 # <8s = warm service
gpu_loading_threshold = 60.0 # >60s = GPU loading issues
start_time = time.time()
for attempt in range(1, max_retries + 1):
attempt_start = time.time()
elapsed_total = attempt_start - start_time
# Calculate delay for this attempt (exponential backoff)
if attempt > 1:
delay = min(base_delay * (backoff_multiplier ** (attempt - 2)), max_delay)
print(f"⏰ RETRY CALIBRATION: Waiting {delay:.1f}s before attempt {attempt}/{max_retries} (elapsed: {elapsed_total:.1f}s)")
await asyncio.sleep(delay)
try:
print(f"🎀 WebSocket STT: Attempt {attempt}/{max_retries} - Connecting to {ws_url}")
# Create SSL context for secure connections
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# Connection parameters optimized for HF Spaces
connect_kwargs = {
"ping_interval": None,
"ping_timeout": 20,
"close_timeout": 10,
"max_size": 10 * 1024 * 1024, # 10MB max message size
"compression": None # Disable compression for audio
}
# Add SSL context for wss:// URLs
if ws_url.startswith("wss://"):
connect_kwargs["ssl"] = ssl_context
# Attempt connection with timeout and detailed timing
connection_start = time.time()
async with asyncio.timeout(connection_timeout):
async with websockets.connect(ws_url, **connect_kwargs) as websocket:
connection_time = time.time() - connection_start
total_elapsed = time.time() - start_time
# πŸ“Š CONNECTION TIMING CALIBRATION
if connection_time < warm_service_threshold:
print(f"⚑ TIMING CALIBRATION: Fast connection ({connection_time:.1f}s) - Service was warm")
elif connection_time < cold_start_threshold:
print(f"πŸ”„ TIMING CALIBRATION: Normal connection ({connection_time:.1f}s) - Service warming up")
else:
print(f"❄️ TIMING CALIBRATION: Slow connection ({connection_time:.1f}s) - Cold start detected")
if total_elapsed > gpu_loading_threshold:
print(f"🐌 TIMING CALIBRATION: Long total wait ({total_elapsed:.1f}s) - GPU loading issues")
print(f"πŸ” RETRY SUCCESS: Connected on attempt {attempt}/{max_retries} after {total_elapsed:.1f}s total")
print(f"πŸ” DEBUG: WebSocket connected successfully to {ws_url}")
# Check WebSocket state
print(f"πŸ” DEBUG: WebSocket state: {websocket.state}")
print(f"πŸ” DEBUG: WebSocket ping interval: {websocket.ping_interval}")
# IMPORTANT: STT service sends connection confirmation first
print(f"🎀 WebSocket STT: Waiting for connection confirmation...")
try:
confirm_response = await asyncio.wait_for(websocket.recv(), timeout=10.0)
confirm_result = json.loads(confirm_response)
print(f"🎀 WebSocket STT: Connection confirmation: {confirm_result}")
if confirm_result.get("type") != "stt_connection_confirmed":
print(f"🎀 WebSocket STT: Unexpected confirmation type: {confirm_result.get('type')}")
continue # Try next URL
except asyncio.TimeoutError:
print(f"🎀 WebSocket STT: Connection confirmation timeout for {ws_url}")
continue
except json.JSONDecodeError as e:
print(f"🎀 WebSocket STT: Invalid confirmation JSON from {ws_url}: {e}")
continue
# Now send audio chunk with unmute.sh protocol
message = {
"type": "stt_audio_chunk",
"audio_data": audio_base64,
"language": "auto",
"model_size": "base",
"is_final": True # Use flush trick for best performance
}
print(f"πŸ” DEBUG: Preparing to send message with keys: {list(message.keys())}")
print(f"πŸ” DEBUG: Message type: {message['type']}")
print(f"πŸ” DEBUG: Audio data length: {len(message['audio_data'])}")
print(f"πŸ” DEBUG: Model size: {message['model_size']}")
print(f"πŸ” DEBUG: Is final: {message['is_final']}")
message_json = json.dumps(message)
print(f"πŸ” DEBUG: JSON message size: {len(message_json)} chars")
print(f"🎀 WebSocket STT: Sending message to {ws_url}...")
await websocket.send(message_json)
print(f"🎀 WebSocket STT: Message sent successfully - audio data ({len(audio_data)} bytes)")
print(f"🎀 WebSocket STT: Waiting for response (timeout: 30s)...")
# Wait for response with timeout to prevent hanging
try:
response = await asyncio.wait_for(websocket.recv(), timeout=30.0)
print(f"πŸ” DEBUG: Raw response received: {response[:500]}...") # First 500 chars
result = json.loads(response)
print(f"🎀 WebSocket STT: Parsed response: {result}")
if result.get("type") == "stt_transcription" and result.get("text"):
transcription = result["text"].strip()
processing_time = result.get("processing_time", 0)
rtf = result.get("real_time_factor", 0)
print(f"🎀 WebSocket STT: SUCCESS - {transcription[:50]}... (RTF: {rtf:.2f}x)")
return transcription
elif result.get("type") == "stt_error":
error_msg = result.get("message", "Unknown error")
print(f"🎀 WebSocket STT: Service error - {error_msg}")
print(f"πŸ” DEBUG: Full error response: {result}")
continue # Try next URL
else:
print(f"🎀 WebSocket STT: Unexpected response format")
print(f"πŸ” DEBUG: Response type: {result.get('type', 'NO_TYPE')}")
print(f"πŸ” DEBUG: Response keys: {list(result.keys()) if isinstance(result, dict) else 'NOT_DICT'}")
continue
except asyncio.TimeoutError:
print(f"🎀 WebSocket STT: Response timeout after 30s for {ws_url}")
continue
except json.JSONDecodeError as e:
print(f"🎀 WebSocket STT: Invalid JSON response from {ws_url}: {e}")
print(f"πŸ” DEBUG: Raw response that failed to parse: {response}")
# Don't return here - let retry logic handle it
break # Break from WebSocket connection, will retry
# If we get here, something went wrong but no exception
print(f"🎀 WebSocket STT: No valid response received from {ws_url}")
break # Break from WebSocket connection, will retry
except websockets.exceptions.ConnectionClosed as e:
attempt_time = time.time() - attempt_start
print(f"🎀 RETRY: Connection closed (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}")
print(f"πŸ” DEBUG: Connection close code: {e.code if hasattr(e, 'code') else 'N/A'}")
# Continue to retry logic
except websockets.exceptions.InvalidStatusCode as e:
attempt_time = time.time() - attempt_start
status_code = getattr(e, 'status_code', 'unknown')
print(f"🎀 RETRY: HTTP {status_code} (attempt {attempt}/{max_retries}, {attempt_time:.1f}s)")
# πŸ“Š CALIBRATION: Different status codes indicate different service states
if status_code == 503:
print(f"πŸ”„ STATUS CALIBRATION: HTTP 503 - Service temporarily unavailable (cold starting)")
elif status_code == 403:
print(f"🚫 STATUS CALIBRATION: HTTP 403 - Service forbidden (WebSocket not available)")
elif status_code == 502:
print(f"⚠️ STATUS CALIBRATION: HTTP 502 - Bad gateway (service deployment issue)")
else:
print(f"❓ STATUS CALIBRATION: HTTP {status_code} - Unknown service state")
# Continue to retry logic
except websockets.exceptions.WebSocketException as e:
attempt_time = time.time() - attempt_start
print(f"🎀 RETRY: WebSocket error (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}")
# Continue to retry logic
except asyncio.TimeoutError:
attempt_time = time.time() - attempt_start
print(f"🎀 RETRY: Timeout after {attempt_time:.1f}s (attempt {attempt}/{max_retries})")
print(f"⏰ TIMEOUT CALIBRATION: Service taking >{connection_timeout}s indicates severe cold start")
# Continue to retry logic
except ConnectionRefusedError as e:
attempt_time = time.time() - attempt_start
print(f"🎀 RETRY: Connection refused (attempt {attempt}/{max_retries}, {attempt_time:.1f}s) - {e}")
print(f"🚫 CONNECTION CALIBRATION: Immediate refusal indicates service not listening")
# Continue to retry logic
except Exception as ws_error:
attempt_time = time.time() - attempt_start
print(f"🎀 RETRY: Unexpected error (attempt {attempt}/{max_retries}, {attempt_time:.1f}s): {ws_error}")
print(f"πŸ” DEBUG: Exception type: {type(ws_error).__name__}")
# Continue to retry logic
# All retry attempts failed
total_time = time.time() - start_time
print(f"🎀 RETRY EXHAUSTED: Failed after {max_retries} attempts over {total_time:.1f}s")
# πŸ“Š FINAL CALIBRATION SUMMARY
if total_time > gpu_loading_threshold:
print(f"🐌 CALIBRATION SUMMARY: Very slow ({total_time:.1f}s) - GPU loading or deployment issues")
elif total_time > cold_start_threshold:
print(f"❄️ CALIBRATION SUMMARY: Slow ({total_time:.1f}s) - Cold start confirmed")
else:
print(f"⚑ CALIBRATION SUMMARY: Fast failure ({total_time:.1f}s) - Service configuration issue")
return f"Error: WebSocket STT failed after {max_retries} attempts in {total_time:.1f}s"
async def _call_mcp_stt_service(self, audio_file_path: str) -> str:
"""Call MCP STT service with HTTP fallback."""
try:
print(f"🎀 MCP STT: Attempting MCP or HTTP service call for {audio_file_path}")
# Try actual MCP integration first
try:
from mcp import ClientSession
from mcp.client.stdio import stdio_client
# Attempt to connect to STT MCP service
print(f"🎀 MCP STT: Trying MCP connection...")
# TODO: Implement actual MCP call when services are deployed with MCP
# For now, this would connect to the MCP-enabled STT service
# result = await mcp_client.call_tool("stt_transcribe", {
# "audio_file": audio_file_path,
# "language": "auto",
# "model": "base"
# })
# Fall back to HTTP until MCP services are deployed
if hasattr(self, 'stt_http_url') and self.stt_http_url:
return await self._call_http_stt_service(audio_file_path)
# Return error - no simulation
print(f"🎀 MCP STT: No MCP client available")
return "ERROR: MCP STT service failed - no demo mode"
except ImportError:
print(f"🎀 MCP STT: MCP client not available, trying HTTP fallback")
# Try HTTP fallback
if hasattr(self, 'stt_http_url') and self.stt_http_url:
result = await self._call_http_stt_service(audio_file_path)
if result and not result.startswith("Error"):
return result
# Return error - no simulation
return "ERROR: No STT services available"
except Exception as e:
print(f"🎀 MCP STT service call error: {e}")
return f"ERROR: MCP STT service error - {str(e)}"
async def _call_http_stt_service(self, audio_file_path: str) -> str:
"""Call STT service via HTTP as fallback."""
try:
import requests
print(f"🎀 HTTP STT: Calling service at {self.stt_http_url}")
# Skip problematic Gradio client, try direct HTTP API first
try:
print(f"🎀 HTTP STT: Trying direct HTTP API approach")
# Try multiple API endpoint patterns
api_patterns = [
f"{self.stt_http_url}/api/predict",
f"{self.stt_http_url}/call/predict",
f"{self.stt_http_url}/api/transcribe_audio",
f"{self.stt_http_url}/call/transcribe_audio"
]
for api_url in api_patterns:
try:
print(f"🎀 HTTP STT: Trying API URL: {api_url}")
with open(audio_file_path, 'rb') as audio_file:
# Try different payload formats
payload_formats = [
# Format 1: Standard Gradio API format
{
'files': {'data': audio_file},
'data': {'data': json.dumps(["auto", "base", True])}
},
# Format 2: Direct form data
{
'files': {'audio': audio_file},
'data': {'language': 'auto', 'model': 'base', 'timestamps': 'true'}
}
]
for i, payload in enumerate(payload_formats):
try:
audio_file.seek(0) # Reset file pointer
print(f"🎀 HTTP STT: Trying payload format {i+1}")
response = requests.post(
api_url,
files=payload['files'],
data=payload['data'],
timeout=60
)
print(f"🎀 HTTP STT: Response status: {response.status_code}")
print(f"🎀 HTTP STT: Response headers: {dict(response.headers)}")
if response.status_code == 200:
try:
result = response.json()
print(f"🎀 HTTP STT: Response JSON: {result}")
# Try different response formats
transcription = None
if isinstance(result, dict):
if 'data' in result and len(result['data']) > 1:
transcription = result['data'][1]
elif 'transcription' in result:
transcription = result['transcription']
elif 'text' in result:
transcription = result['text']
elif isinstance(result, list) and len(result) > 1:
transcription = result[1]
if transcription and transcription.strip():
print(f"🎀 HTTP STT: SUCCESS via direct API: {transcription}")
return transcription.strip()
except json.JSONDecodeError as json_err:
print(f"🎀 HTTP STT: JSON decode error: {json_err}")
print(f"🎀 HTTP STT: Raw response: {response.text[:200]}")
else:
print(f"🎀 HTTP STT: Failed with status {response.status_code}")
print(f"🎀 HTTP STT: Error response: {response.text[:200]}")
except Exception as payload_error:
print(f"🎀 HTTP STT: Payload format {i+1} failed: {payload_error}")
continue
except Exception as url_error:
print(f"🎀 HTTP STT: URL {api_url} failed: {url_error}")
continue
print(f"🎀 HTTP STT: All direct API attempts failed")
except Exception as direct_error:
print(f"🎀 HTTP STT: Direct API approach failed: {direct_error}")
# Final fallback - try Gradio client if direct API failed
try:
print(f"🎀 HTTP STT: Falling back to Gradio client...")
from gradio_client import Client
client = Client(self.stt_http_url)
# Try different endpoint names that the STT service might use
gradio_endpoints = [
"transcribe_audio", # Main transcribe function name
"/transcribe_audio", # With slash
0, # First endpoint by index
]
for endpoint in gradio_endpoints:
try:
print(f"🎀 HTTP STT: Trying Gradio endpoint: {endpoint}")
# Create proper FileData object for Gradio client
from gradio_client import handle_file
file_data = handle_file(audio_file_path)
result = client.predict(
file_data, # audio file as FileData object
"auto", # language
"base", # model_size
True, # return_timestamps
api_name=endpoint if isinstance(endpoint, str) else None,
fn_index=endpoint if isinstance(endpoint, int) else None
)
print(f"🎀 HTTP STT: Gradio result for {endpoint}: {result}")
if result:
# Handle different result formats
if isinstance(result, (list, tuple)) and len(result) >= 2:
# Multi-element result: [status, transcription, timestamps]
transcription = result[1]
elif isinstance(result, str):
# Direct string result
transcription = result
else:
# Other formats
transcription = str(result)
if transcription and transcription.strip() and not transcription.startswith("❌"):
print(f"🎀 HTTP STT: SUCCESS via Gradio endpoint {endpoint}: {transcription[:50]}...")
return transcription.strip()
except Exception as endpoint_error:
print(f"🎀 HTTP STT: Endpoint {endpoint} failed: {endpoint_error}")
continue
print(f"🎀 HTTP STT: All Gradio endpoints failed")
except Exception as gradio_error:
print(f"🎀 HTTP STT: Gradio client setup failed: {gradio_error}")
# Return error instead of simulation
return "Error: STT service connection failed"
except Exception as e:
print(f"🎀 HTTP STT ERROR: {e}")
# Return error instead of demo text
return f"Error: STT service error - {str(e)}"
def _get_audio_duration(self, audio_file_path: str) -> float:
"""Get duration of audio file."""
try:
with wave.open(audio_file_path, 'rb') as wav_file:
frames = wav_file.getnframes()
rate = wav_file.getframerate()
duration = frames / float(rate)
return duration
except:
return 5.0 # Default duration
def process_audio_input(self, audio_tuple: Tuple) -> str:
"""Process Gradio audio input format using MCP."""
try:
print(f"🎀 MCP HANDLER: Processing audio tuple: {type(audio_tuple)}")
print(f"πŸ” DEBUG: Audio tuple content: {audio_tuple}")
if audio_tuple is None or len(audio_tuple) < 2:
print(f"🎀 MCP HANDLER: No audio received or invalid format")
return "ERROR: No audio received"
# Gradio audio format: (sample_rate, audio_array)
sample_rate, audio_array = audio_tuple
print(f"🎀 MCP HANDLER: Sample rate: {sample_rate}, Array type: {type(audio_array)}")
# Convert numpy array to audio file for MCP service
if isinstance(audio_array, np.ndarray):
print(f"🎀 MCP HANDLER: Audio array shape: {audio_array.shape}")
# Process with MCP STT service
try:
# Convert to proper format for MCP service - with buffer error handling
try:
audio_normalized = (audio_array * 32767).astype(np.int16)
except ValueError as buffer_error:
if "buffer size must be a multiple of element size" in str(buffer_error):
print(f"🎀 MCP HANDLER: Buffer size error - returning error")
audio_duration = len(audio_array) / sample_rate if len(audio_array) > 0 else 1.0
return f"ERROR: Buffer size issue with audio processing ({audio_duration:.1f}s)"
else:
raise buffer_error
# Create temporary WAV file
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
# Write WAV file
with wave.open(tmp_file.name, 'wb') as wav_file:
wav_file.setnchannels(1) # Mono
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(sample_rate)
wav_file.writeframes(audio_normalized.tobytes())
print(f"🎀 MCP HANDLER: Created temp WAV file: {tmp_file.name}")
# Process with MCP STT
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(self.speech_to_text(tmp_file.name))
print(f"🎀 MCP HANDLER: MCP STT result: {result}")
return result
finally:
loop.close()
# Clean up temp file
import os
try:
os.unlink(tmp_file.name)
except:
pass # Ignore cleanup errors
except Exception as stt_error:
print(f"🎀 MCP HANDLER ERROR: MCP STT processing failed: {stt_error}")
return f"ERROR: STT processing failed - {str(stt_error)}"
print(f"🎀 MCP HANDLER: Invalid audio array format")
return "Invalid audio format"
except Exception as e:
print(f"🎀 MCP HANDLER ERROR: {e}")
import traceback
traceback.print_exc()
logger.error(f"MCP audio processing error: {e}")
return f"Error processing audio: {str(e)}"
async def text_to_speech(self, text: str, voice: Optional[str] = None) -> Optional[bytes]:
"""Convert text to speech using MCP TTS service."""
try:
if not config.enable_voice_responses:
return None
if not self.tts_service:
print(f"πŸ”Š MCP TTS: No TTS service available")
return None
print(f"πŸ”Š MCP TTS: Converting text to speech via MCP: {text[:50]}...")
# Call MCP TTS service
result = await self._call_mcp_tts_service(text, voice)
return result
except Exception as e:
print(f"πŸ”Š MCP TTS ERROR: {e}")
logger.error(f"MCP TTS error: {e}")
return None
async def _call_mcp_tts_service(self, text: str, voice: Optional[str] = None) -> Optional[bytes]:
"""Call MCP TTS service - placeholder for actual MCP integration."""
try:
# This is where we would make the actual MCP call
print(f"πŸ”Š MCP TTS: Simulating MCP TTS service call")
# In a real MCP integration, this would be something like:
# result = await mcp_client.call_tool("tts_synthesize", {
# "text": text,
# "voice": voice or config.default_voice
# })
# For now, return None (no audio in demo)
return None
except Exception as e:
print(f"πŸ”Š MCP TTS service call error: {e}")
return None
def is_audio_service_available(self) -> Tuple[bool, bool]:
"""Check if MCP STT and TTS services are available."""
stt_available = bool(self.stt_service)
tts_available = bool(self.tts_service)
return stt_available, tts_available
def get_audio_status(self) -> dict:
"""Get status of MCP audio services."""
stt_available, tts_available = self.is_audio_service_available()
return {
"stt_available": stt_available,
"tts_available": tts_available,
"demo_mode": False, # NO DEMO MODE
"voice_responses_enabled": config.enable_voice_responses,
"default_voice": config.default_voice,
"service_type": "mcp"
}
# Global MCP audio handler instance
mcp_audio_handler = MCPAudioHandler()