Spaces:
Sleeping
Sleeping
Peter Michael Gits
feat: Add Streamlit-native WebRTC speech-to-text using unmute.sh patterns
21fac9b | """ | |
| Audio Handler for ChatCal Voice - Handles STT and TTS integration. | |
| This module connects to the external Hugging Face STT and TTS services | |
| to provide voice interaction capabilities. | |
| """ | |
| import logging | |
| import numpy as np | |
| import requests | |
| import tempfile | |
| import wave | |
| import json | |
| from typing import Optional, Tuple | |
| from .config import config | |
# Module-scoped logger, named after this module so its records can be
# filtered or routed independently of the rest of the application.
logger = logging.getLogger(__name__)
class AudioHandler:
    """Handles audio processing for voice interactions.

    On construction the handler derives candidate HTTP endpoints for the
    configured STT and TTS services and probes them. While no STT endpoint
    responds, the handler stays in *demo mode* and returns canned
    transcriptions instead of calling a service.
    """

    # Canned transcriptions returned by _simulate_stt().
    _DEMO_TRANSCRIPTIONS = (
        "Hi, I'm John Smith. I'd like to book a 30-minute meeting with Peter tomorrow at 2 PM.",
        "Hello, this is Sarah. Can we schedule a Google Meet for next Tuesday?",
        "I'm Mike Johnson. Please book an appointment for Friday afternoon.",
        "Hi there! I need to schedule a one-hour consultation about my project.",
        "Good morning, I'd like to check Peter's availability this week.",
    )

    def __init__(self):
        """Build the endpoint candidates and probe the services."""
        self.demo_mode = True  # Start in demo mode

        # text_to_speech() only calls a real service when a client object is
        # present; nothing in this class creates one, so default to None
        # instead of leaving the attribute undefined.
        self.tts_client = None

        # Convert HF URLs to API endpoints (lists of URLs to try)
        self.stt_api_urls = self._get_api_url(config.stt_service_url)
        self.tts_api_urls = self._get_api_url(config.tts_service_url)

        # Will be set to the working URL after testing
        self.stt_api_url = None
        self.tts_api_url = None

        # Initialize services
        self._initialize_services()

    def _get_api_url(self, space_url: str) -> list:
        """Convert a service URL into a list of candidate API endpoints.

        Args:
            space_url: Either a ``huggingface.co/spaces/<user>/<space>`` page
                URL or the base URL of a service.

        Returns:
            Candidate ``.../api/predict`` URLs in the order they should be
            tried, without duplicates.
        """
        marker = "huggingface.co/spaces/"
        # A trailing slash would otherwise produce "//api/predict" and an
        # empty path component.
        space_url = space_url.rstrip("/")

        if marker in space_url:
            # Take everything after the marker so the scheme/host prefix
            # (https://, http://, www.) cannot leak into the parsed parts.
            space_path = space_url.split(marker, 1)[1]
            parts = space_path.split("/")
            if len(parts) >= 2:
                username, space_name = parts[0], parts[1]
                candidates = [
                    f"https://{username}-{space_name.replace('_', '-')}.hf.space/api/predict",
                    f"https://{space_path.replace('/', '-')}.hf.space/api/predict",
                    f"{space_url}/api/predict",
                    f"https://{username}-{space_name}.hf.space/api/predict",
                ]
                # Several patterns collapse to the same URL for typical names;
                # drop repeats (keeping order) so each URL is probed only once.
                return list(dict.fromkeys(candidates))

        if space_url.endswith("/api/predict"):
            return [space_url]
        return [space_url + "/api/predict"]

    def _initialize_services(self):
        """Probe the STT and TTS endpoints and leave demo mode if STT works."""
        try:
            print(f"π§ HTTP INIT: Starting audio service initialization")
            print(f"π§ HTTP INIT: Testing STT URLs: {self.stt_api_urls}")
            print(f"π§ HTTP INIT: Testing TTS URLs: {self.tts_api_urls}")

            # Test service availability - try multiple URLs
            self.stt_api_url = self._find_working_endpoint(self.stt_api_urls, "STT")
            self.tts_api_url = self._find_working_endpoint(self.tts_api_urls, "TTS")

            # Exit demo mode if STT is available (TTS optional for now)
            if self.stt_api_url:
                self.demo_mode = False
                print(f"π΅ STT service available via HTTP - EXITING DEMO MODE")
                print(f"π΅ Using STT URL: {self.stt_api_url}")
                logger.info("π΅ STT service available, exiting demo mode")
            else:
                print(f"π΅ STAYING IN DEMO MODE - STT service not available")
                logger.warning("π΅ Running in demo mode - STT service unavailable")
        except Exception as e:
            # Initialization must never prevent the handler from being built;
            # any failure simply keeps it in demo mode.
            print(f"π§ HTTP INIT ERROR: {e}")
            import traceback
            traceback.print_exc()
            logger.error(f"Failed to initialize audio services: {e}")
            self.demo_mode = True

    def _find_working_endpoint(self, urls: list, service_name: str) -> Optional[str]:
        """Return the first URL in *urls* that passes the availability probe.

        Returns:
            The working URL, or ``None`` when no candidate responds.
        """
        for url in urls:
            print(f"π Testing {service_name} endpoint: {url}")
            if self._test_service_availability(url, service_name):
                print(f"β {service_name} working endpoint found: {url}")
                return url

        print(f"β No working {service_name} endpoints found")
        return None

    def _test_service_availability(self, api_url: str, service_name: str) -> bool:
        """Return True when a GET on the service root answers with HTTP 200."""
        try:
            print(f"π Testing {service_name} service: {api_url}")

            # Probe the site root rather than the predict endpoint itself.
            response = requests.get(api_url.replace('/api/predict', '/'), timeout=10)
            if response.status_code == 200:
                print(f"β {service_name} service is accessible")
                return True

            print(f"β {service_name} service returned status: {response.status_code}")
            return False
        except requests.exceptions.Timeout:
            print(f"β±οΈ {service_name} service timeout - may be in cold start")
            return False
        except Exception as e:
            # A probe is best-effort: any failure means "not available".
            print(f"β {service_name} service error: {e}")
            return False

    async def speech_to_text(self, audio_file_path: str) -> str:
        """Convert speech to text using HTTP API calls.

        Args:
            audio_file_path: Path of the audio file to upload.

        Returns:
            The transcription extracted from the service response. In demo
            mode, on a non-200 response, or on an unexpected error a canned
            demo transcription is returned instead; on timeout or an
            unparseable response a fixed explanatory message is returned.

        NOTE(review): the upload uses the blocking ``requests`` API, so the
        event loop is blocked for the duration of the request.
        """
        try:
            print(f"π€ HTTP STT: Processing audio file: {audio_file_path}")

            if self.demo_mode:
                print(f"π€ HTTP STT: Using demo mode")
                return self._simulate_stt(audio_file_path)

            # Call STT service via HTTP
            print(f"π€ HTTP STT: Calling STT service: {self.stt_api_url}")

            with open(audio_file_path, 'rb') as audio_file:
                files = {'data': audio_file}
                data = {
                    # [language, model_size, include_timestamps]
                    'data': json.dumps(["auto", "base", True])
                }
                response = requests.post(
                    self.stt_api_url,
                    files=files,
                    data=data,
                    timeout=30,
                )

            print(f"π€ HTTP STT: Response status: {response.status_code}")

            if response.status_code != 200:
                print(f"π€ HTTP STT: Service error - Status {response.status_code}: {response.text}")
                return self._simulate_stt(audio_file_path)

            result = response.json()
            print(f"π€ HTTP STT: Service returned: {result}")

            # Check the container type before indexing: `'data' in result` is
            # also valid on a list, where `result['data']` would then raise.
            if isinstance(result, dict):
                payload = result.get('data')
                if isinstance(payload, (list, tuple)) and len(payload) > 1:
                    transcription = payload[1]  # Assuming [status, transcription, ...]
                    print(f"π€ HTTP STT: Extracted transcription: {transcription}")
                    return transcription
            elif isinstance(result, list) and len(result) > 1:
                transcription = result[1]
                print(f"π€ HTTP STT: Extracted transcription (alt format): {transcription}")
                return transcription

            print(f"π€ HTTP STT: Unexpected result format")
            return "Could not parse transcription result"

        except requests.exceptions.Timeout:
            print(f"π€ HTTP STT: Request timeout - service may be cold starting")
            return "STT service timeout - please try again"
        except Exception as e:
            print(f"π€ HTTP STT ERROR: {e}")
            import traceback
            traceback.print_exc()
            logger.error(f"STT HTTP error: {e}")
            return self._simulate_stt(audio_file_path)

    def _simulate_stt(self, audio_data) -> str:
        """Return a randomly chosen canned transcription (demo purposes).

        *audio_data* is accepted for call-site symmetry and is not inspected.
        """
        import random
        return random.choice(self._DEMO_TRANSCRIPTIONS)

    def _simulate_stt_with_length(self, duration: float) -> str:
        """Return a canned transcription whose length fits *duration* seconds."""
        if duration < 2:
            return "Hello"
        if duration < 5:
            return "Hi, I'm testing the voice input"
        if duration < 10:
            return "Hi, I'm John Smith. I'd like to book a meeting with Peter."
        return "Hi, I'm John Smith. I'd like to book a 30-minute meeting with Peter tomorrow at 2 PM to discuss my project."

    async def text_to_speech(self, text: str, voice: Optional[str] = None) -> Optional[bytes]:
        """Convert text to speech using the external TTS client, if any.

        Args:
            text: Text to synthesize.
            voice: Voice to use; falls back to ``config.default_voice``.

        Returns:
            The first element of the client's result, or ``None`` when voice
            responses are disabled, the handler is in demo mode, no TTS
            client is attached, the result is empty, or an error occurs.
        """
        try:
            if not config.enable_voice_responses:
                return None

            # getattr keeps this safe on instances that never had a client
            # attached; a missing client is a normal condition, not an error.
            tts_client = getattr(self, "tts_client", None)
            if self.demo_mode or not tts_client:
                return self._simulate_tts(text)

            # Use provided voice or default
            selected_voice = voice or config.default_voice

            # Process with actual TTS service
            result = tts_client.predict(
                text,
                selected_voice,
                api_name="/predict",
            )

            # Extract audio from result
            if result and len(result) > 0:
                return result[0]  # audio file data

            return None

        except Exception as e:
            logger.error(f"TTS error: {e}")
            return self._simulate_tts(text)

    def _simulate_tts(self, text: str) -> Optional[bytes]:
        """Log what would have been spoken and return ``None`` (no audio)."""
        logger.info(f"π Demo TTS would say: {text[:50]}...")
        return None

    @staticmethod
    def _to_pcm16_mono(audio_array: np.ndarray) -> np.ndarray:
        """Convert *audio_array* to a 1-D little-endian int16 PCM array.

        Floating-point input is treated as normalized to [-1, 1] (clipped,
        then scaled by 32767). int16 input is already PCM and is passed
        through unscaled; other signed integer widths are rescaled to the
        int16 range. 2-D input is averaged over its second axis.

        Raises:
            TypeError: If the dtype is neither floating nor signed integer.
        """
        dtype = audio_array.dtype
        if dtype == np.int16:
            # Already 16-bit PCM; multiplying by 32767 here would overflow.
            pcm = audio_array
        elif np.issubdtype(dtype, np.floating):
            # Clip first so out-of-range samples saturate instead of wrapping
            # around when cast to int16.
            pcm = np.clip(audio_array, -1.0, 1.0) * 32767
        elif np.issubdtype(dtype, np.signedinteger):
            pcm = audio_array.astype(np.float64) * (32767 / np.iinfo(dtype).max)
        else:
            raise TypeError(f"Unsupported audio dtype: {dtype}")

        if pcm.ndim == 2:
            # The first axis is the sample axis (the duration computation in
            # process_audio_input relies on the same layout); assumes the
            # second axis is channels - TODO confirm against the audio source.
            pcm = pcm.mean(axis=1)

        # WAV sample data is little-endian regardless of host byte order.
        return pcm.astype("<i2")

    def _transcribe_array(self, audio_array: np.ndarray, sample_rate) -> str:
        """Write *audio_array* to a temporary WAV file and run STT on it."""
        import asyncio
        import os

        pcm = self._to_pcm16_mono(audio_array)

        # Reserve a path and close the handle right away: the file is
        # re-opened by name below (by `wave` and by speech_to_text).
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
            wav_path = tmp_file.name

        try:
            with wave.open(wav_path, 'wb') as wav_file:
                wav_file.setnchannels(1)  # Mono
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(pcm.tobytes())

            print(f"π€ HANDLER DEBUG: Created temp WAV file: {wav_path}")

            # Private loop for this call. It is deliberately not installed as
            # the thread's current loop, so closing it does not leave a dead
            # loop behind for later asyncio users on this thread.
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(self.speech_to_text(wav_path))
            finally:
                loop.close()
        finally:
            # Always remove the temp file, on success and on failure.
            try:
                os.unlink(wav_path)
            except OSError:
                pass  # Best-effort cleanup

    def process_audio_input(self, audio_tuple: Tuple) -> str:
        """Process Gradio audio input format.

        Args:
            audio_tuple: ``(sample_rate, audio_array)`` where ``audio_array``
                is a numpy array, or ``None``.

        Returns:
            The transcription, a canned demo transcription (demo mode or STT
            failure), or a short message describing why the input was
            rejected. This method does not raise.
        """
        try:
            print(f"π€ HANDLER DEBUG: Processing audio tuple: {type(audio_tuple)}")

            if audio_tuple is None or len(audio_tuple) < 2:
                print(f"π€ HANDLER DEBUG: No audio received or invalid format")
                return "No audio received"

            # Gradio audio format: (sample_rate, audio_array)
            sample_rate, audio_array = audio_tuple
            print(f"π€ HANDLER DEBUG: Sample rate: {sample_rate}, Array type: {type(audio_array)}")

            if not isinstance(audio_array, np.ndarray):
                print(f"π€ HANDLER DEBUG: Invalid audio array format")
                return "Invalid audio format"

            print(f"π€ HANDLER DEBUG: Audio array shape: {audio_array.shape}")

            if self.demo_mode:
                print(f"π€ HANDLER DEBUG: Using demo STT mode - creating realistic transcription")
                # Create a more realistic demo response based on audio length
                audio_duration = len(audio_array) / sample_rate
                print(f"π€ HANDLER DEBUG: Audio duration: {audio_duration:.2f} seconds")
                return self._simulate_stt_with_length(audio_duration)

            # Process with HTTP STT service
            try:
                result = self._transcribe_array(audio_array, sample_rate)
                print(f"π€ HANDLER DEBUG: HTTP STT result: {result}")
                return result
            except Exception as stt_error:
                print(f"π€ HANDLER ERROR: HTTP STT processing failed: {stt_error}")
                return self._simulate_stt_with_length(len(audio_array) / sample_rate)

        except Exception as e:
            print(f"π€ HANDLER ERROR: {e}")
            import traceback
            traceback.print_exc()
            logger.error(f"Audio processing error: {e}")
            return f"Error processing audio: {str(e)}"

    def is_audio_service_available(self) -> Tuple[bool, bool]:
        """Return ``(stt_available, tts_available)``.

        STT is available whenever the handler has left demo mode. TTS is
        optional during initialization, so it additionally requires that a
        TTS endpoint actually responded.
        """
        stt_available = not self.demo_mode
        tts_available = stt_available and bool(getattr(self, "tts_api_url", None))
        return stt_available, tts_available

    def get_audio_status(self) -> dict:
        """Return a status summary of the audio services and voice settings."""
        stt_available, tts_available = self.is_audio_service_available()

        return {
            "stt_available": stt_available,
            "tts_available": tts_available,
            "demo_mode": self.demo_mode,
            "voice_responses_enabled": config.enable_voice_responses,
            "default_voice": config.default_voice,
        }
# Shared module-level handler instance. Constructing it runs the service
# probes in AudioHandler.__init__, so importing this module performs the
# endpoint checks.
audio_handler = AudioHandler()