# ============================================================
# app/services/voice_service.py - Voice Processing Service
# STT: MiMo-V2-Omni (raw audio understanding) | TTS: Edge TTS
# ============================================================
import asyncio
import base64
import io
import logging
import re
import uuid
from datetime import datetime, timezone
from typing import Optional, Tuple

import boto3
import edge_tts
import httpx
from botocore.config import Config

from app.config import settings
logger = logging.getLogger(__name__)

# Default Edge TTS voice for each supported language code.
EDGE_TTS_VOICES = {
    "en": "en-US-AriaNeural",       # Female, warm
    "fr": "fr-FR-DeniseNeural",     # Female, French
    "es": "es-ES-ElviraNeural",     # Female, Spanish
    "pt": "pt-BR-FranciscaNeural",  # Female, Portuguese
    "ar": "ar-SA-ZariyahNeural",    # Female, Arabic
    "de": "de-DE-KatjaNeural",      # Female, German
    "it": "it-IT-ElsaNeural",       # Female, Italian
    "zh": "zh-CN-XiaoxiaoNeural",   # Female, Chinese
}
# Yoruba, Igbo and Hausa all map to the Nigerian English voice, used here as
# the closest available match for those languages.
EDGE_TTS_VOICES.update(dict.fromkeys(("yo", "ig", "ha"), "en-NG-EzinneNeural"))
class VoiceService:
    """
    Voice processing service for AIDA.

    Features:
    - Speech-to-Text: MiMo-V2-Omni (raw audio understanding, no transcription step)
    - Text-to-Speech: Microsoft Edge TTS (free, multilingual)
    - Audio Storage: Cloudflare R2
    """

    # edge-tts streams 48 kbit/s constant-bitrate MP3 by default, i.e.
    # 6000 bytes of audio per second. Used to estimate clip duration.
    _EDGE_TTS_BYTES_PER_SECOND = 6000

    # File extension used when a filename has to be generated from a MIME type.
    _AUDIO_EXTENSIONS = {
        "audio/mpeg": "mp3",
        "audio/mp3": "mp3",
        "audio/mp4": "m4a",
        "audio/x-m4a": "m4a",
        "audio/aac": "m4a",
        "audio/wav": "wav",
        "audio/x-wav": "wav",
        "audio/wave": "wav",
        "audio/ogg": "ogg",
        "audio/flac": "flac",
        "audio/webm": "webm",
    }

    # ISO 4217 codes and the name that should be spoken instead of the code.
    _CURRENCY_NAMES = {
        "XOF": "CFA Franc", "XAF": "CFA Franc", "NGN": "Naira",
        "GHS": "Cedi", "KES": "Kenyan Shilling", "ZAR": "Rand",
        "USD": "Dollar", "EUR": "Euro", "GBP": "Pound",
        "CAD": "Canadian Dollar", "TZS": "Tanzanian Shilling",
        "UGX": "Ugandan Shilling", "RWF": "Rwandan Franc",
        "ETB": "Birr", "MAD": "Dirham", "EGP": "Egyptian Pound",
        "CDF": "Congolese Franc", "TND": "Dinar",
    }
    _CURRENCY_PATTERN = re.compile(
        r'\b(?:' + '|'.join(_CURRENCY_NAMES) + r')\b',
        re.IGNORECASE,
    )

    # Emoji and pictograph code points removed before synthesis. The ranges
    # are kept narrow on purpose: one wide range such as U+24C2-U+1F251 would
    # also match CJK, Kana and Hangul text and erase whole sentences.
    _EMOJI_PATTERN = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols and pictographs
        "\U0001F680-\U0001F6FF"  # transport and map symbols
        "\U0001F170-\U0001F251"  # enclosed letters, flags, enclosed ideographs
        "\U0001F900-\U0001F9FF"  # supplemental symbols and pictographs
        "\U0001FA00-\U0001FAFF"  # chess symbols, extended pictographs
        "\U0001F000-\U0001F02F"  # mahjong tiles
        "\U0001F0A0-\U0001F0FF"  # playing cards
        "\U00002600-\U000027BF"  # miscellaneous symbols and dingbats
        "\U000025A0-\U000025FF"  # geometric shapes
        "\U00002B00-\U00002BFF"  # stars and arrows
        "\U000024C2\U00003030\U0000303D\U00003297\U00003299"  # lone emoji code points
        "\U0000FE0F"             # variation selector-16 left behind by emoji
        "]+"
    )

    def __init__(self):
        """Initialize the voice service with R2 client and MiMo."""
        self._r2_client = None
        self._mimo = None
        self._init_r2_client()
        self._init_mimo()

    def _init_r2_client(self):
        """
        Initialize Cloudflare R2 client using S3-compatible API.

        Failures are logged and leave ``self._r2_client`` as ``None`` so that
        constructing the service never raises.
        """
        try:
            self._r2_client = boto3.client(
                's3',
                endpoint_url=settings.CF_R2_ENDPOINT,
                aws_access_key_id=settings.CF_R2_ACCESS_KEY_ID,
                aws_secret_access_key=settings.CF_R2_SECRET_ACCESS_KEY,
                config=Config(
                    signature_version='s3v4',
                    s3={'addressing_style': 'path'}
                ),
                region_name='auto'
            )
            logger.info("β R2 client initialized successfully")
        except Exception as e:
            logger.error(f"β Failed to initialize R2 client: {e}")
            self._r2_client = None

    def _init_mimo(self):
        """
        Initialize MiMo-V2-Omni for audio understanding.

        The client module is imported lazily so an import failure is logged
        and leaves ``self._mimo`` as ``None`` instead of breaking construction.
        """
        try:
            from app.core.mimo_client import get_mimo_client
            self._mimo = get_mimo_client()
            if self._mimo.is_available:
                logger.info("β MiMo-V2-Omni STT configured (raw audio understanding)")
            else:
                logger.warning("β οΈ MiMo-V2-Omni not available - STT unavailable")
        except Exception as e:
            logger.warning(f"β οΈ Failed to init MiMo for STT: {e}")
            self._mimo = None

    # ============================================================
    # TEXT CLEANING HELPERS
    # ============================================================

    @staticmethod
    def _collapse_whitespace(text: str, keep_newlines: bool = False) -> str:
        """
        Collapse runs of whitespace into single spaces and trim the result.

        Args:
            text: Text to normalise
            keep_newlines: When True, line breaks are preserved (each line is
                normalised on its own and empty lines are dropped)

        Returns:
            The normalised text
        """
        if not keep_newlines:
            return re.sub(r'\s+', ' ', text).strip()
        normalised = (re.sub(r'\s+', ' ', line).strip() for line in text.split('\n'))
        return '\n'.join(line for line in normalised if line)

    def _strip_emojis(self, text: str, keep_newlines: bool = False) -> str:
        """
        Remove emojis from text to prevent TTS from reading them.

        Args:
            text: Text that may contain emoji
            keep_newlines: Preserve line breaks instead of flattening the
                text to a single line (default keeps the single-line result)

        Returns:
            Text without emoji, with whitespace collapsed
        """
        without_emojis = self._EMOJI_PATTERN.sub('', text)
        return self._collapse_whitespace(without_emojis, keep_newlines)

    def _strip_markdown(self, text: str, keep_newlines: bool = False) -> str:
        """
        Remove markdown formatting from text.

        Args:
            text: Markdown text
            keep_newlines: Preserve line breaks instead of flattening the
                text to a single line (default keeps the single-line result)

        Returns:
            Plain text with markdown markers removed and whitespace collapsed
        """
        # Fenced code blocks go first; otherwise the inline-code rule below
        # eats the fence markers and the block is never recognised.
        text = re.sub(r'```[\s\S]*?```', ' ', text)
        # Horizontal rules before emphasis, so "***" is not read as italics.
        text = re.sub(r'^[-*]{3,}\s*$', '', text, flags=re.MULTILINE)
        # Line-level markers: headings, bullets, numbered lists. Bullets are
        # removed before emphasis so "* item *word*" is not misparsed.
        text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)
        text = re.sub(r'^\s*[-*•]\s+', '', text, flags=re.MULTILINE)
        text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
        # Inline emphasis: bold first, then italics.
        text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
        text = re.sub(r'__(.+?)__', r'\1', text)
        text = re.sub(r'(?<!\w)\*(.+?)\*(?!\w)', r'\1', text)
        text = re.sub(r'(?<!\w)_(.+?)_(?!\w)', r'\1', text)
        # Inline code and links keep their visible text.
        text = re.sub(r'`(.+?)`', r'\1', text)
        text = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', text)
        return self._collapse_whitespace(text, keep_newlines)

    def _expand_currency_codes(self, text: str) -> str:
        """
        Replace currency codes with their spoken names for natural TTS.

        Upper-case codes ("500 XOF") are always expanded. Lower- or mixed-case
        codes are expanded only when they directly follow an amount
        ("20 usd"), so ordinary words such as "mad" or "cad" are left alone.

        Args:
            text: Text that may contain currency codes

        Returns:
            Text with recognised codes replaced by their spoken names
        """
        names = self._CURRENCY_NAMES

        def _spoken(match):
            token = match.group(0)
            code = token.upper()
            name = names.get(code)
            if name is None:
                # Case-insensitive matching can admit exotic case-folded
                # letters that are not real codes; leave those untouched.
                return token
            if token == code:
                return name
            preceding = match.string[:match.start()].rstrip()
            if preceding and preceding[-1].isdigit():
                return name
            return token

        return self._CURRENCY_PATTERN.sub(_spoken, text)

    def _enhance_prosody(self, text: str) -> str:
        """
        Enhance text for more natural TTS pronunciation.

        Each non-empty line gets a closing period when it does not already end
        in ``. ! ? : ;``, the lines are joined into one string, and punctuation
        runs and missing spaces after punctuation are normalised.

        Args:
            text: Cleaned text, optionally spanning several lines

        Returns:
            A single-line string ready for synthesis
        """
        punctuated = []
        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if line and line[-1] not in '.!?:;':
                line += '.'
            punctuated.append(line)
        text = ' '.join(punctuated)
        text = re.sub(r'\.{3,}', '.', text)
        text = re.sub(r'([.!?,;:])(?=[A-Za-z])', r'\1 ', text)
        text = re.sub(r'[.!?]{2,}', '.', text)
        return re.sub(r'\s+', ' ', text).strip()

    def _prepare_speech_text(self, text: str) -> str:
        """
        Run the full cleaning pipeline that turns chat text into TTS input.

        Line breaks are kept until the prosody step so that line-anchored
        markdown (headings, bullets, numbered lists) is recognised and every
        line ends up as its own sentence.

        Args:
            text: Raw response text (may contain emoji and markdown)

        Returns:
            Single-line text ready for synthesis; empty if nothing is left
        """
        cleaned = self._strip_emojis(text, keep_newlines=True)
        cleaned = self._strip_markdown(cleaned, keep_newlines=True)
        cleaned = self._expand_currency_codes(cleaned)
        return self._enhance_prosody(cleaned)

    @staticmethod
    def _voice_for_language(language: Optional[str]) -> str:
        """
        Pick the default Edge TTS voice for a language code.

        Accepts plain codes ("fr") as well as regional or differently cased
        variants ("fr-FR", "pt_BR", "EN"); unknown or missing codes fall back
        to the English voice.
        """
        code = (language or "en").strip().lower().replace("_", "-")
        if code in EDGE_TTS_VOICES:
            return EDGE_TTS_VOICES[code]
        return EDGE_TTS_VOICES.get(code.split("-", 1)[0], EDGE_TTS_VOICES["en"])

    # ============================================================
    # SPEECH-TO-TEXT (MiMo-V2-Omni - Raw Audio Understanding)
    # ============================================================

    async def transcribe_audio(
        self,
        audio_url: str,
        language: Optional[str] = None,
        force_language: Optional[str] = None
    ) -> Tuple[str, str]:
        """
        Transcribe audio using MiMo-V2-Omni's native audio understanding.
        No separate transcription step: the audio URL is handed to the MiMo
        client directly.

        Args:
            audio_url: URL to the audio file
            language: Optional language hint; used only when MiMo reports "en"
            force_language: Force the returned language to this value

        Returns:
            Tuple of (transcript_text, detected_language)

        Raises:
            ValueError: If MiMo is unavailable or the transcription fails
        """
        try:
            if not self._mimo or not self._mimo.is_available:
                raise ValueError("MiMo-V2-Omni not available - cannot transcribe audio")
            logger.info(f"π€ Transcribing audio with MiMo-V2-Omni: {audio_url[:50]}...")
            transcript, detected_lang = await self._mimo.transcribe_audio(audio_url)
            # Override with forced language if specified
            if force_language:
                detected_lang = force_language
            elif language and detected_lang == "en":
                # Use hint if MiMo defaulted to English
                detected_lang = language
            logger.info(f"β MiMo transcription: '{transcript[:50]}...' (lang: {detected_lang})")
            return transcript, detected_lang
        except Exception as e:
            logger.error(f"β MiMo transcription error: {e}")
            raise ValueError(f"Failed to transcribe audio: {str(e)}") from e

    # ============================================================
    # TEXT-TO-SPEECH (Edge TTS - Free, Multilingual)
    # ============================================================

    async def text_to_speech(
        self,
        text: str,
        language: str = "en",
        voice: Optional[str] = None
    ) -> Tuple[bytes, float]:
        """
        Convert text to speech using Microsoft Edge TTS.

        Args:
            text: Text to convert to speech
            language: Language code (en, fr, es, etc.)
            voice: Optional Edge TTS voice name override

        Returns:
            Tuple of (audio_bytes, duration_seconds); the duration is an
            estimate derived from the size of the audio stream

        Raises:
            ValueError: If nothing is left to speak after cleaning, the
                generated audio is implausibly small, or synthesis fails
        """
        try:
            # Clean text for natural speech
            clean_text = self._prepare_speech_text(text)
            if not clean_text:
                raise ValueError("No text to convert after cleaning")
            # Select voice based on language
            selected_voice = voice or self._voice_for_language(language)
            logger.info(f"π Generating speech with Edge TTS: {selected_voice} (lang: {language})")
            # Generate audio with edge-tts; only "audio" chunks carry data,
            # other chunk types are metadata.
            communicate = edge_tts.Communicate(clean_text, selected_voice)
            audio_buffer = io.BytesIO()
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    audio_buffer.write(chunk["data"])
            audio_bytes = audio_buffer.getvalue()
            if len(audio_bytes) < 100:
                raise ValueError(f"Generated audio is too small ({len(audio_bytes)} bytes)")
            # Estimate duration from stream size and the constant MP3 bitrate.
            duration_seconds = len(audio_bytes) / self._EDGE_TTS_BYTES_PER_SECOND
            logger.info(f"β Edge TTS complete: {len(audio_bytes)} bytes, ~{duration_seconds:.1f}s, voice={selected_voice}")
            return audio_bytes, duration_seconds
        except Exception as e:
            logger.error(f"β Edge TTS error: {e}")
            raise ValueError(f"Failed to generate speech: {str(e)}") from e

    # ============================================================
    # CLOUDFLARE R2 STORAGE
    # ============================================================

    async def upload_audio_to_r2(
        self,
        audio_bytes: bytes,
        filename: Optional[str] = None,
        content_type: str = "audio/mpeg"
    ) -> str:
        """
        Upload audio file to Cloudflare R2.

        Args:
            audio_bytes: Audio file bytes
            filename: Optional filename (auto-generated if not provided)
            content_type: MIME type (default: audio/mpeg for mp3)

        Returns:
            Public URL to the uploaded audio

        Raises:
            ValueError: If the R2 client is not initialized or the upload fails
        """
        if not self._r2_client:
            raise ValueError("R2 client not initialized. Check configuration.")
        try:
            if not filename:
                timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
                unique_id = uuid.uuid4().hex[:8]
                # Unknown MIME types default to m4a (mobile recordings).
                extension = self._AUDIO_EXTENSIONS.get(content_type, "m4a")
                filename = f"voice_{timestamp}_{unique_id}.{extension}"
            key = f"aida/{filename}"
            # boto3 is blocking; run the upload in the default executor so the
            # event loop stays responsive.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                lambda: self._r2_client.put_object(
                    Bucket=settings.CF_R2_BUCKET_NAME,
                    Key=key,
                    Body=audio_bytes,
                    ContentType=content_type
                )
            )
            # Tolerate a configured base URL that ends with a slash.
            base_url = str(settings.CF_R2_PUBLIC_URL).rstrip('/')
            public_url = f"{base_url}/{key}"
            logger.info(f"β Audio uploaded to R2: {public_url}")
            return public_url
        except Exception as e:
            logger.error(f"β R2 upload error: {e}")
            raise ValueError(f"Failed to upload audio: {str(e)}") from e

    # ============================================================
    # COMBINED VOICE CHAT FLOW
    # ============================================================

    async def process_voice_message(
        self,
        user_audio_url: str,
        ai_response_text: str,
        language: str = "en"
    ) -> dict:
        """
        Process a complete voice chat interaction.

        1. Transcribe user's voice message (MiMo-V2-Omni STT)
        2. Generate AIDA's voice response (Edge TTS)
        3. Upload AIDA's audio to R2

        Args:
            user_audio_url: URL of the user's recorded audio
            ai_response_text: Text AIDA should speak
            language: Voice language used when no language was detected

        Returns:
            Dict with keys ``user_transcript``, ``detected_language``,
            ``aida_audio_url``, ``aida_audio_duration`` (seconds, one decimal)
            and ``ai_response_text``

        Raises:
            ValueError: Propagated from transcription, synthesis or upload
        """
        try:
            # Step 1: Transcribe user audio
            transcript, detected_lang = await self.transcribe_audio(user_audio_url)
            tts_language = detected_lang if detected_lang else language
            # Step 2: Generate AIDA's voice response
            aida_audio_bytes, aida_duration = await self.text_to_speech(
                ai_response_text,
                language=tts_language
            )
            # Step 3: Upload AIDA's audio to R2
            aida_audio_url = await self.upload_audio_to_r2(aida_audio_bytes)
            return {
                "user_transcript": transcript,
                "detected_language": detected_lang,
                "aida_audio_url": aida_audio_url,
                "aida_audio_duration": round(aida_duration, 1),
                "ai_response_text": ai_response_text
            }
        except Exception as e:
            logger.error(f"β Voice processing error: {e}")
            raise

    async def generate_aida_voice_response(
        self,
        text: str,
        language: str = "en"
    ) -> dict:
        """
        Generate and upload AIDA's voice response.
        Use this when you already have AIDA's text and just need the audio.

        Args:
            text: Text AIDA should speak
            language: Language code used to pick the voice

        Returns:
            Dict with keys ``audio_url`` and ``duration`` (whole seconds)

        Raises:
            ValueError: Propagated from synthesis or upload
        """
        audio_bytes, duration = await self.text_to_speech(text, language)
        audio_url = await self.upload_audio_to_r2(audio_bytes)
        return {
            "audio_url": audio_url,
            "duration": int(round(duration))
        }
# Singleton instance shared by importers of this module. It is created at
# import time, so VoiceService.__init__ (R2 client and MiMo setup) runs as
# soon as the module is loaded.
voice_service = VoiceService()