Spaces:
Paused
Paused
| """ | |
| Real-time Audio Processing Utilities for WebRTC | |
| Handles STT service integration without demo modes | |
| """ | |
| import aiohttp | |
| import asyncio | |
| import logging | |
| from typing import Optional | |
| import json | |
| logger = logging.getLogger(__name__) | |
class RealTimeSTTProcessor:
    """Speech-to-text client that only talks to a real STT service.

    There is no demo/fallback mode: the service is probed through several
    endpoint paths and two multipart payload layouts, and ``None`` is
    returned when none of them yields a non-empty transcription.
    """

    def __init__(self, stt_service_url: str):
        """Store the service base URL without a trailing slash."""
        self.stt_service_url = stt_service_url.rstrip('/')

    async def transcribe_audio_file(self, audio_file_path: str) -> Optional[str]:
        """Transcribe an audio file using the real STT service - NO DEMO MODE.

        Args:
            audio_file_path: Path of the audio file to upload.

        Returns:
            The stripped transcription text, or ``None`` when the file cannot
            be read or every endpoint/payload combination fails.
        """
        try:
            logger.info(f"π€ Real-time STT: Processing {audio_file_path}")

            # Read the audio exactly once, in the default executor so the
            # blocking disk read does not stall the event loop. Holding the
            # bytes lets every attempt build a fresh, complete upload instead
            # of sharing one file handle that the first request consumes.
            loop = asyncio.get_running_loop()
            audio_bytes = await loop.run_in_executor(
                None, self._read_audio_bytes, audio_file_path
            )

            # Try multiple API endpoint patterns systematically.
            api_patterns = [
                f"{self.stt_service_url}/api/predict",
                f"{self.stt_service_url}/call/predict",
                f"{self.stt_service_url}/api/transcribe_audio",
                f"{self.stt_service_url}/call/transcribe_audio",
            ]
            # Payload layouts tried per endpoint, in order.
            payload_builders = [self._build_gradio_form, self._build_direct_form]

            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                for api_url in api_patterns:
                    logger.info(f"π€ Trying STT API: {api_url}")
                    for i, build_payload in enumerate(payload_builders):
                        # Deliberately broad catch: each attempt is
                        # best-effort and must not prevent the next one.
                        try:
                            logger.info(f"π€ Trying payload format {i+1}")
                            # A FormData instance can be sent only once, so a
                            # new one is built for every request.
                            form = build_payload(audio_bytes)
                            async with session.post(api_url, data=form) as response:
                                logger.info(f"π€ Response status: {response.status}")
                                if response.status != 200:
                                    error_text = await response.text()
                                    logger.warning(
                                        f"π€ API failed: {response.status} - {error_text[:200]}"
                                    )
                                    continue
                                result = await response.json()
                                logger.info(f"π€ Response JSON: {result}")
                                # Extract transcription from various response formats.
                                transcription = self._extract_transcription(result)
                                if transcription and transcription.strip():
                                    logger.info(f"π€ SUCCESS: {transcription}")
                                    return transcription.strip()
                        except Exception as payload_error:
                            logger.error(f"π€ Payload {i+1} failed: {payload_error}")
                            continue

            logger.error("π€ All STT API attempts failed")
            return None
        except Exception as e:
            logger.error(f"π€ STT processing error: {e}")
            return None

    @staticmethod
    def _read_audio_bytes(audio_file_path: str) -> bytes:
        """Return the full binary content of the file at ``audio_file_path``."""
        with open(audio_file_path, 'rb') as audio_file:
            return audio_file.read()

    @staticmethod
    def _build_gradio_form(audio_bytes: bytes) -> "aiohttp.FormData":
        """Build the Gradio-style form: audio and JSON options both under 'data'."""
        form = aiohttp.FormData()
        form.add_field('data', audio_bytes, filename='audio.wav', content_type='audio/wav')
        form.add_field('data', json.dumps(["auto", "base", True]))
        return form

    @staticmethod
    def _build_direct_form(audio_bytes: bytes) -> "aiohttp.FormData":
        """Build the direct-style form: audio plus one field per option."""
        form = aiohttp.FormData()
        form.add_field('audio', audio_bytes, filename='audio.wav', content_type='audio/wav')
        form.add_field('language', 'auto')
        form.add_field('model', 'base')
        form.add_field('timestamps', 'true')
        return form

    def _extract_transcription(self, result) -> Optional[str]:
        """Extract the transcription from the supported API response formats.

        Supported shapes:
            * ``{"data": [status, transcription, ...]}`` (Gradio API format)
            * ``{"transcription": ...}``, ``{"text": ...}`` or ``{"result": ...}``
            * ``[status, transcription, ...]`` (direct array format)

        Returns:
            The transcription when the selected slot holds a string,
            otherwise ``None``. Non-string values are rejected so callers can
            safely call ``str`` methods on the result.
        """
        candidate = None
        if isinstance(result, dict):
            data = result.get('data')
            if 'data' in result and isinstance(data, list) and len(data) > 1:
                candidate = data[1]
            else:
                # First key present wins, matching the documented priority.
                for key in ('transcription', 'text', 'result'):
                    if key in result:
                        candidate = result[key]
                        break
        elif isinstance(result, list) and len(result) > 1:
            candidate = result[1]
        return candidate if isinstance(candidate, str) else None
class RealTimeTTSProcessor:
    """Text-to-speech client bound to a single TTS service base URL."""

    def __init__(self, tts_service_url: str):
        """Keep the service base URL, dropping any trailing slashes."""
        base_url = tts_service_url.rstrip('/')
        self.tts_service_url = base_url

    async def synthesize_text(self, text: str, voice_preset: str = "v2/en_speaker_6") -> Optional[bytes]:
        """Synthesize ``text`` to speech using the real TTS service.

        The service call is not implemented yet (planned for Phase 4, TTS
        integration): the request is only logged, ``voice_preset`` is not
        used, and the result is always ``None``.
        """
        audio: Optional[bytes] = None
        try:
            # Only the first 50 characters of the text are logged.
            preview = text[:50]
            logger.info(f"π Real-time TTS: Synthesizing '{preview}...'")
            logger.info("π TTS synthesis placeholder - Phase 4 implementation")
        except Exception as exc:
            logger.error(f"π TTS synthesis error: {exc}")
        return audio