Spaces:
Sleeping
Sleeping
| """ | |
| Sophisticated Voice Processor with ASR, TTS, and Audio Intelligence | |
| Handles multi-language voice processing with emotion detection and adaptive responses | |
| """ | |
| import asyncio | |
| import io | |
| import numpy as np | |
| import librosa | |
| import soundfile as sf | |
| import webrtcvad | |
| from typing import Dict, List, Optional, Any, Union | |
| from dataclasses import dataclass, field, asdict | |
| import logging | |
| from pathlib import Path | |
| import tempfile | |
| import hashlib | |
| import torch | |
| import torch.nn.functional as F | |
| from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq, VitsModel | |
| from pydub import AudioSegment | |
| from pydub.effects import normalize, compress_dynamic_range | |
| import scipy.signal | |
| import structlog | |
| from caching_system import CacheManager | |
| from production_components import ( | |
| ProductionModelOrchestrator, | |
| ProductionMonitoringEngine, | |
| ) | |
| # Initialize logging | |
| logger = structlog.get_logger(__name__) | |
| class AudioConfig: | |
| """Configuration for audio processing""" | |
| sample_rate: int = 16000 | |
| channels: int = 1 | |
| bit_depth: int = 16 | |
| chunk_size: int = 1024 | |
| max_duration: int = 300 # 5 minutes | |
| min_duration: float = 0.5 # 0.5 seconds | |
| noise_reduction: bool = True | |
| voice_activity_detection: bool = True | |
| audio_enhancement: bool = True | |
| emotion_detection: bool = True | |
| speaker_identification: bool = True | |
| class VoiceAnalysis: | |
| """Comprehensive voice analysis results""" | |
| transcription: str | |
| confidence_score: float | |
| detected_language: str | |
| language_confidence: float | |
| emotion_analysis: Dict[str, float] = field(default_factory=dict) | |
| speaker_characteristics: Dict[str, Any] = field(default_factory=dict) | |
| audio_quality: Dict[str, float] = field(default_factory=dict) | |
| speech_rate: float = 0.0 | |
| pause_analysis: Dict[str, Any] = field(default_factory=dict) | |
| pronunciation_assessment: Dict[str, float] = field(default_factory=dict) | |
| class TTSConfig: | |
| """Configuration for text-to-speech synthesis""" | |
| speaking_rate: float = 1.0 | |
| pitch_shift: float = 0.0 | |
| emotion_style: str = "neutral" | |
| language_specific_models: Dict[str, str] = field(default_factory=dict) | |
| class ASREngine: | |
| """Automatic Speech Recognition with multi-model support""" | |
| def __init__(self, config: AudioConfig): | |
| self.config = config | |
| self.asr_processors = {} | |
| self.asr_models = {} | |
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| # Model configurations for different languages and use cases | |
| self.model_configs = { | |
| "general": "openai/whisper-small", | |
| "rw": "mbazaNLP/Whisper-Small-Kinyarwanda", | |
| } | |
| async def initialize(self): | |
| """Initialize all ASR models""" | |
| logger.info("Initializing ASR Engine...") | |
| try: | |
| for model_name, model_path in self.model_configs.items(): | |
| logger.info( | |
| f"Loading Whisper model: {model_path} for {model_name} on {self.device}" | |
| ) | |
| processor = AutoProcessor.from_pretrained(model_path) | |
| model = AutoModelForSpeechSeq2Seq.from_pretrained(model_path).to( | |
| self.device | |
| ) | |
| self.asr_processors[model_name] = processor | |
| self.asr_models[model_name] = model | |
| logger.info(f"{model_name} model loaded.") | |
| logger.info("ASR Engine initialized successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize ASR Engine: {e}", exc_info=True) | |
| raise | |
| async def transcribe_audio( | |
| self, audio_data: bytes, expected_language: str = "auto" | |
| ) -> VoiceAnalysis: | |
| """Comprehensive audio transcription with analysis""" | |
| try: | |
| # Preprocess audio | |
| processed_audio = await self._preprocess_audio(audio_data) | |
| # Detect language if not specified or fallback | |
| detected_language = expected_language | |
| if expected_language == "auto": | |
| detected_language = await self._detect_language(processed_audio) | |
| language_confidence = 0.8 # Default | |
| if expected_language == "auto": | |
| # Language confidence from detection | |
| language_confidence = await self._get_language_confidence( | |
| processed_audio, detected_language | |
| ) | |
| # Select best model for language | |
| best_model_name = "rw" if detected_language == "rw" else "general" | |
| # Perform transcription | |
| transcription_result = await self._transcribe_with_model( | |
| processed_audio, best_model_name, detected_language | |
| ) | |
| # Analyze audio characteristics | |
| audio_analysis = await self._analyze_audio_characteristics(processed_audio) | |
| # Combine results | |
| voice_analysis = VoiceAnalysis( | |
| transcription=transcription_result["text"], | |
| confidence_score=transcription_result["confidence"], | |
| detected_language=detected_language, | |
| language_confidence=language_confidence, | |
| emotion_analysis=audio_analysis.get("emotions", {}), | |
| speaker_characteristics=audio_analysis.get("speaker", {}), | |
| audio_quality=audio_analysis.get("quality", {}), | |
| speech_rate=audio_analysis.get("speech_rate", 0.0), | |
| pause_analysis=audio_analysis.get("pauses", {}), | |
| pronunciation_assessment=audio_analysis.get("pronunciation", {}), | |
| ) | |
| return voice_analysis | |
| except Exception as e: | |
| logger.error(f"Error in audio transcription: {e}", exc_info=True) | |
| raise | |
| async def _preprocess_audio(self, audio_data: bytes) -> np.ndarray: | |
| """audio preprocessing pipeline""" | |
| # Convert bytes to audio array | |
| import noisereduce as nr | |
| audio_segment = AudioSegment.from_file(io.BytesIO(audio_data)) | |
| # Convert to mono and resample | |
| audio_segment = audio_segment.set_channels(1).set_frame_rate( | |
| self.config.sample_rate | |
| ) | |
| # Convert to numpy array | |
| audio_array = np.array(audio_segment.get_array_of_samples(), dtype=np.float32) | |
| audio_array = audio_array / np.max(np.abs(audio_array)) # Normalize to -1 to 1 | |
| # Apply noise reduction | |
| if self.config.noise_reduction: | |
| audio_array = nr.reduce_noise(y=audio_array, sr=self.config.sample_rate) | |
| # Apply audio enhancement | |
| if self.config.audio_enhancement: | |
| audio_array = self._enhance_audio(audio_array) | |
| # Voice activity detection | |
| if self.config.voice_activity_detection: | |
| audio_array = self._apply_vad(audio_array) | |
| return audio_array | |
| def _enhance_audio(self, audio: np.ndarray) -> np.ndarray: | |
| """Apply audio enhancement techniques.""" | |
| processed_audio = np.asarray(audio, dtype=np.float32).flatten() | |
| sos = scipy.signal.butter( | |
| 5, 80, btype="high", fs=self.config.sample_rate, output="sos" | |
| ) | |
| filtered_audio_output = scipy.signal.sosfilt(sos, processed_audio) | |
| filtered_audio: np.ndarray | |
| if isinstance(filtered_audio_output, tuple): | |
| filtered_audio = filtered_audio_output[0] | |
| else: | |
| filtered_audio = filtered_audio_output | |
| if filtered_audio.size == 0 or np.all(filtered_audio == 0): | |
| logger.warning( | |
| "Audio array became silent or empty after filtering. Skipping further enhancement." | |
| ) | |
| return filtered_audio | |
| audio_segment = AudioSegment( | |
| data=filtered_audio.tobytes(), | |
| frame_rate=self.config.sample_rate, | |
| sample_width=filtered_audio.dtype.itemsize, | |
| channels=1, | |
| ) | |
| compressed_segment = compress_dynamic_range( | |
| audio_segment, threshold=-20.0, ratio=4.0 | |
| ) | |
| enhanced_audio = np.array( | |
| compressed_segment.get_array_of_samples(), dtype=np.float32 | |
| ) | |
| max_val = np.max(np.abs(enhanced_audio)) | |
| if max_val > 0: | |
| enhanced_audio = enhanced_audio / max_val | |
| return enhanced_audio | |
| def _apply_vad(self, audio: np.ndarray) -> np.ndarray: | |
| """Apply Voice Activity Detection to remove silence""" | |
| audio_16bit = (audio * 32767).astype(np.int16) | |
| vad = webrtcvad.Vad(2) | |
| frame_duration_ms = 30 | |
| frame_size = int(self.config.sample_rate * frame_duration_ms / 1000) | |
| if self.config.sample_rate == 16000: | |
| frame_size = 480 | |
| voiced_frames_list = [] | |
| for i in range(0, len(audio_16bit) - frame_size + 1, frame_size): | |
| frame = audio_16bit[i : i + frame_size] | |
| if len(frame) == frame_size: | |
| is_speech = vad.is_speech(frame.tobytes(), self.config.sample_rate) | |
| if is_speech: | |
| voiced_frames_list.extend(frame) | |
| if voiced_frames_list: | |
| return np.array(voiced_frames_list, dtype=np.float32) / 32767.0 | |
| else: | |
| return audio | |
| async def _detect_language(self, audio: np.ndarray) -> str: | |
| """Detect language from audio using general model""" | |
| try: | |
| processor = self.asr_processors["general"] | |
| model = self.asr_models["general"] | |
| input_features = processor( | |
| audio, sampling_rate=self.config.sample_rate, return_tensors="pt" | |
| ).input_features.to(self.device) | |
| predicted = model.generate( | |
| input_features, | |
| max_new_tokens=1, | |
| output_scores=True, | |
| return_dict_in_generate=True, | |
| ) | |
| lang_token = predicted.sequences[0, 1] | |
| lang = processor.decode(lang_token) | |
| # Map to our supported languages | |
| language_mapping = { | |
| "en": "en", | |
| "fr": "fr", | |
| "rw": "rw", | |
| "sw": "rw", # Fallback Swahili to Kinyarwanda | |
| } | |
| return language_mapping.get(lang, "en") | |
| except Exception as e: | |
| logger.error(f"Language detection failed: {e}", exc_info=True) | |
| return "en" | |
| async def _get_language_confidence( | |
| self, audio: np.ndarray, detected_language: str | |
| ) -> float: | |
| """Get confidence for detected language""" | |
| try: | |
| processor = self.asr_processors["general"] | |
| model = self.asr_models["general"] | |
| input_features = processor( | |
| audio, sampling_rate=self.config.sample_rate, return_tensors="pt" | |
| ).input_features.to(self.device) | |
| predicted = model.generate( | |
| input_features, | |
| max_new_tokens=1, | |
| output_scores=True, | |
| return_dict_in_generate=True, | |
| ) | |
| prob = F.softmax(predicted.scores[0], dim=-1)[ | |
| 0, predicted.sequences[0, 1] | |
| ].item() | |
| return float(prob) | |
| except Exception as e: | |
| logger.error(f"Language confidence calculation failed: {e}", exc_info=True) | |
| return 0.8 | |
| def _select_best_model(self, language: str) -> str: | |
| """Select the best ASR model for the detected language""" | |
| if language == "rw": | |
| return "rw" | |
| return "general" | |
| async def _transcribe_with_model( | |
| self, audio: np.ndarray, model_name: str, language: str | |
| ) -> Dict[str, Any]: | |
| """Transcribe audio using specified model""" | |
| try: | |
| processor = self.asr_processors[model_name] | |
| model = self.asr_models[model_name] | |
| lang_code = "rw" if model_name == "rw" else language | |
| forced_decoder_ids = processor.get_decoder_prompt_ids( | |
| language=lang_code, task="transcribe" | |
| ) | |
| input_features = processor( | |
| audio, sampling_rate=self.config.sample_rate, return_tensors="pt" | |
| ).input_features.to(self.device) | |
| predicted = model.generate( | |
| input_features, | |
| forced_decoder_ids=forced_decoder_ids, | |
| output_scores=True, | |
| return_dict_in_generate=True, | |
| ) | |
| transcription = processor.batch_decode( | |
| predicted.sequences, skip_special_tokens=True | |
| )[0] | |
| # Calculate confidence | |
| scores = predicted.scores | |
| log_probs = [torch.log_softmax(score, dim=-1) for score in scores] | |
| seq_len = predicted.sequences.shape[1] | |
| forced_len = len(forced_decoder_ids) if forced_decoder_ids else 0 | |
| token_log_probs = [ | |
| log_prob[0, predicted.sequences[0, i].item()] | |
| for i, log_prob in enumerate(log_probs, start=forced_len) | |
| if i >= forced_len | |
| ] | |
| avg_logprob = ( | |
| sum(token_log_probs) / len(token_log_probs) if token_log_probs else 0 | |
| ) | |
| confidence = float(np.exp(avg_logprob)) | |
| return { | |
| "text": transcription.strip(), | |
| "confidence": confidence, | |
| "language_confidence": 0.8, | |
| } | |
| except Exception as e: | |
| logger.error(f"Transcription failed for {model_name}: {e}", exc_info=True) | |
| raise | |
| async def _analyze_audio_characteristics(self, audio: np.ndarray) -> Dict[str, Any]: | |
| """Analyze various audio characteristics""" | |
| analysis = {} | |
| try: | |
| if self.config.emotion_detection: | |
| analysis["emotions"] = await self._analyze_emotions(audio) | |
| analysis["speech_rate"] = self._calculate_speech_rate(audio) | |
| analysis["quality"] = self._assess_audio_quality(audio) | |
| analysis["pauses"] = self._analyze_pauses(audio) | |
| if self.config.speaker_identification: | |
| analysis["speaker"] = await self._analyze_speaker_characteristics(audio) | |
| except Exception as e: | |
| logger.error(f"Error in audio characteristics analysis: {e}", exc_info=True) | |
| return analysis | |
| async def _analyze_emotions(self, audio: np.ndarray) -> Dict[str, float]: | |
| """Analyze emotional content of speech""" | |
| try: | |
| emotions = { | |
| "neutral": 0.4, | |
| "happy": 0.2, | |
| "sad": 0.1, | |
| "angry": 0.1, | |
| "fear": 0.1, | |
| "surprise": 0.1, | |
| } | |
| mfccs = librosa.feature.mfcc(y=audio, sr=self.config.sample_rate, n_mfcc=13) | |
| energy = float(np.mean(librosa.feature.rms(y=audio))) | |
| spectral_centroid = float( | |
| np.mean( | |
| librosa.feature.spectral_centroid( | |
| y=audio, sr=self.config.sample_rate | |
| ) | |
| ) | |
| ) | |
| if energy > 0.1 and spectral_centroid > 2000: | |
| emotions["happy"] += 0.3 | |
| emotions["neutral"] -= 0.2 | |
| elif energy < 0.05: | |
| emotions["sad"] += 0.2 | |
| emotions["neutral"] -= 0.1 | |
| total = sum(emotions.values()) | |
| if total > 0: | |
| emotions = {k: v / total for k, v in emotions.items()} | |
| return emotions | |
| except Exception as e: | |
| logger.error(f"Emotion analysis failed: {e}", exc_info=True) | |
| return {"neutral": 1.0} | |
| def _calculate_speech_rate(self, audio: np.ndarray) -> float: | |
| """Calculate speech rate (words per minute)""" | |
| try: | |
| duration = len(audio) / self.config.sample_rate | |
| intervals = librosa.effects.split(audio, top_db=20) | |
| speech_duration = sum( | |
| (end - start) / self.config.sample_rate for start, end in intervals | |
| ) | |
| if speech_duration > 0: | |
| estimated_words = speech_duration * 2.5 # Rough syllable rate | |
| speech_rate = ( | |
| estimated_words / speech_duration | |
| ) * 60 # Words per minute | |
| return float(min(speech_rate, 300)) | |
| return 0.0 | |
| except Exception as e: | |
| logger.error(f"Speech rate calculation failed: {e}", exc_info=True) | |
| return 0.0 | |
| def _assess_audio_quality(self, audio: np.ndarray) -> Dict[str, float]: | |
| """Assess audio quality metrics""" | |
| try: | |
| quality = {} | |
| signal_power = float(np.mean(audio**2)) | |
| noise_power = float(np.mean((audio - np.mean(audio)) ** 2) * 0.1) | |
| snr = 10 * np.log10(signal_power / max(noise_power, 1e-10)) | |
| quality["snr"] = float(max(0.0, min(1.0, snr / 30))) | |
| dynamic_range = float(np.max(audio) - np.min(audio)) | |
| quality["dynamic_range"] = float(min(1.0, dynamic_range)) | |
| fft = np.fft.fft(audio) | |
| spectral_energy = float(np.mean(np.abs(fft))) | |
| quality["spectral_quality"] = float(min(1.0, spectral_energy * 10)) | |
| quality["overall"] = float(np.mean(list(quality.values()))) | |
| return quality | |
| except Exception as e: | |
| logger.error(f"Audio quality assessment failed: {e}", exc_info=True) | |
| return {"overall": 0.5} | |
| def _analyze_pauses(self, audio: np.ndarray) -> Dict[str, Any]: | |
| """Analyze pause patterns in speech""" | |
| try: | |
| intervals = librosa.effects.split(audio, top_db=20) | |
| if len(intervals) < 2: | |
| return { | |
| "pause_count": 0, | |
| "average_pause_duration": 0.0, | |
| "total_pause_time": 0.0, | |
| "pause_distribution": {}, | |
| } | |
| pause_durations = [] | |
| for i in range(len(intervals) - 1): | |
| pause_start = intervals[i][1] | |
| pause_end = intervals[i + 1][0] | |
| pause_duration = (pause_end - pause_start) / self.config.sample_rate | |
| if pause_duration > 0.1: | |
| pause_durations.append(pause_duration) | |
| return { | |
| "pause_count": len(pause_durations), | |
| "average_pause_duration": ( | |
| float(np.mean(pause_durations)) if pause_durations else 0.0 | |
| ), | |
| "total_pause_time": float(sum(pause_durations)), | |
| "pause_distribution": { | |
| "short": sum(1 for p in pause_durations if p < 0.5), | |
| "medium": sum(1 for p in pause_durations if 0.5 <= p < 1.0), | |
| "long": sum(1 for p in pause_durations if p >= 1.0), | |
| }, | |
| } | |
| except Exception as e: | |
| logger.error(f"Pause analysis failed: {e}", exc_info=True) | |
| return { | |
| "pause_count": 0, | |
| "average_pause_duration": 0.0, | |
| "total_pause_time": 0.0, | |
| "pause_distribution": {}, | |
| } | |
| async def _analyze_speaker_characteristics( | |
| self, audio: np.ndarray | |
| ) -> Dict[str, Any]: | |
| """Analyze speaker characteristics""" | |
| try: | |
| characteristics = {} | |
| f0 = librosa.yin(audio, fmin=50, fmax=400, sr=self.config.sample_rate) | |
| f0_clean = f0[f0 > 0] | |
| if len(f0_clean) > 0: | |
| characteristics["average_pitch"] = float(np.mean(f0_clean)) | |
| characteristics["pitch_range"] = float( | |
| np.max(f0_clean) - np.min(f0_clean) | |
| ) | |
| characteristics["pitch_variance"] = float(np.var(f0_clean)) | |
| mfccs = librosa.feature.mfcc(y=audio, sr=self.config.sample_rate, n_mfcc=13) | |
| characteristics["vocal_tract_length"] = ( | |
| float(np.mean(mfccs[1:4])) if mfccs.size > 0 else 0.0 | |
| ) | |
| spectral_centroid = librosa.feature.spectral_centroid( | |
| y=audio, sr=self.config.sample_rate | |
| ) | |
| characteristics["brightness"] = ( | |
| float(np.mean(spectral_centroid)) if spectral_centroid.size > 0 else 0.0 | |
| ) | |
| zero_crossing_rate = librosa.feature.zero_crossing_rate(audio) | |
| characteristics["roughness"] = ( | |
| float(np.mean(zero_crossing_rate)) | |
| if zero_crossing_rate.size > 0 | |
| else 0.0 | |
| ) | |
| return characteristics | |
| except Exception as e: | |
| logger.error(f"Speaker characteristics analysis failed: {e}", exc_info=True) | |
| return {} | |
| class TTSEngine: | |
| """Text-to-Speech with emotion and voice cloning""" | |
| def __init__(self, config: TTSConfig): | |
| self.config = config | |
| self.tts_processors = {} | |
| self.tts_models = {} | |
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| # TTS model configurations | |
| self.model_configs = { | |
| "en": "facebook/mms-tts-eng", | |
| "fr": "facebook/mms-tts-fra", | |
| "rw": "facebook/mms-tts-kin", | |
| } | |
| async def initialize(self): | |
| """Initialize TTS models""" | |
| logger.info("Initializing TTS Engine...") | |
| try: | |
| for lang, model_path in self.model_configs.items(): | |
| logger.info( | |
| f"Loading MMS-TTS model: {model_path} for {lang} on {self.device}" | |
| ) | |
| processor = AutoProcessor.from_pretrained(model_path) | |
| model = VitsModel.from_pretrained(model_path) | |
| if hasattr(model, "to"): | |
| model = model.to(self.device) # type: ignore | |
| self.tts_processors[lang] = processor | |
| self.tts_models[lang] = model | |
| logger.info(f"{lang} TTS model loaded.") | |
| logger.info("TTS Engine initialized successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize TTS Engine: {e}", exc_info=True) | |
| raise | |
| async def synthesize_speech( | |
| self, | |
| text: str, | |
| language: str = "en", | |
| emotion_context: Optional[Dict[str, float]] = None, | |
| ) -> bytes: | |
| """Synthesize speech with advanced features""" | |
| try: | |
| # Select appropriate model | |
| model_key = language if language in self.tts_models else "en" | |
| # Preprocess text | |
| processed_text = await self._preprocess_text(text, language) | |
| # Generate speech | |
| audio = await self._mms_tts_synthesize(processed_text, model_key) | |
| # Post-process audio | |
| enhanced_audio = await self._postprocess_audio(audio, emotion_context) | |
| # Convert to bytes | |
| audio_bytes = self._audio_to_bytes(enhanced_audio) | |
| return audio_bytes | |
| except Exception as e: | |
| logger.error(f"Speech synthesis failed: {e}", exc_info=True) | |
| raise | |
| async def _preprocess_text(self, text: str, language: str) -> str: | |
| """Preprocess text for better TTS output""" | |
| processed_text = text.strip() | |
| if language == "rw": | |
| processed_text = self._normalize_kinyarwanda_text(processed_text) | |
| elif language == "en": | |
| processed_text = self._normalize_english_text(processed_text) | |
| elif language == "fr": | |
| processed_text = self._normalize_french_text(processed_text) | |
| processed_text = self._add_prosodic_markers(processed_text) | |
| return processed_text | |
| def _normalize_kinyarwanda_text(self, text: str) -> str: | |
| """Normalize Kinyarwanda text for TTS (Placeholder)""" | |
| return text | |
| def _normalize_english_text(self, text: str) -> str: | |
| """Normalize English text for TTS""" | |
| import re | |
| abbreviations = { | |
| "Dr.": "Doctor", | |
| "Mr.": "Mister", | |
| "Mrs.": "Missus", | |
| "Ms.": "Miss", | |
| "Prof.": "Professor", | |
| "etc.": "etcetera", | |
| "vs.": "versus", | |
| } | |
| for abbr, expansion in abbreviations.items(): | |
| text = text.replace(abbr, expansion) | |
| text = re.sub( | |
| r"\b(\d+)\b", lambda m: self._number_to_words(int(m.group(1))), text | |
| ) | |
| return text | |
| def _normalize_french_text(self, text: str) -> str: | |
| """Normalize French text for TTS (Placeholder)""" | |
| return text | |
| def _number_to_words(self, number: int) -> str: | |
| """Convert numbers to words (simplified placeholder)""" | |
| if number == 0: | |
| return "zero" | |
| if number == 1: | |
| return "one" | |
| if number == 2: | |
| return "two" | |
| return str(number) | |
| def _add_prosodic_markers(self, text: str) -> str: | |
| """Add prosodic markers for better speech synthesis""" | |
| text = text.replace(".", ". <break time='0.5s'/>") | |
| text = text.replace(",", ", <break time='0.2s'/>") | |
| text = text.replace("?", "? <break time='0.7s'/>") | |
| text = text.replace("!", "! <break time='0.7s'/>") | |
| return text | |
| async def _mms_tts_synthesize(self, text: str, model_key: str) -> np.ndarray: | |
| """Synthesize using MMS-TTS model""" | |
| try: | |
| processor = self.tts_processors[model_key] | |
| model = self.tts_models[model_key] | |
| inputs = processor(text=text, return_tensors="pt").to(self.device) | |
| with torch.no_grad(): | |
| outputs = model(**inputs) | |
| audio = outputs.waveform[0].cpu().numpy() | |
| return audio | |
| except Exception as e: | |
| logger.error(f"MMS-TTS synthesis failed: {e}", exc_info=True) | |
| raise | |
| async def _postprocess_audio( | |
| self, audio: np.ndarray, emotion_context: Optional[Dict[str, float]] = None | |
| ) -> np.ndarray: | |
| """Post-process synthesized audio""" | |
| if emotion_context: | |
| audio = self._apply_emotional_processing(audio, emotion_context) | |
| audio = self._enhance_synthesized_audio(audio) | |
| audio = audio / np.max(np.abs(audio)) # Final normalization | |
| return audio | |
| def _apply_emotional_processing( | |
| self, audio: np.ndarray, emotion_context: Dict[str, float] | |
| ) -> np.ndarray: | |
| """Apply emotion-based audio processing""" | |
| dominant_emotion_item = max(emotion_context.items(), key=lambda x: x[1]) | |
| emotion, intensity = dominant_emotion_item | |
| sr_for_pitch_shift = 16000 | |
| if emotion == "happy" and intensity > 0.5: | |
| audio = librosa.effects.pitch_shift(audio, sr=sr_for_pitch_shift, n_steps=1) | |
| elif emotion == "sad" and intensity > 0.5: | |
| audio = librosa.effects.pitch_shift( | |
| audio, sr=sr_for_pitch_shift, n_steps=-1 | |
| ) | |
| elif emotion == "angry" and intensity > 0.5: | |
| audio = np.tanh(audio * 1.2) # Simple distortion | |
| return audio | |
| def _enhance_synthesized_audio(self, audio: np.ndarray) -> np.ndarray: | |
| """Enhance synthesized audio quality""" | |
| audio_segment = AudioSegment( | |
| audio.tobytes(), | |
| frame_rate=16000, | |
| sample_width=audio.dtype.itemsize, | |
| channels=1, | |
| ) | |
| normalized = normalize(audio_segment) | |
| compressed = compress_dynamic_range(normalized, threshold=-15.0, ratio=2.0) | |
| enhanced_audio = np.array(compressed.get_array_of_samples(), dtype=np.float32) | |
| enhanced_audio = enhanced_audio / np.max(np.abs(enhanced_audio)) | |
| return enhanced_audio | |
| def _audio_to_bytes(self, audio: np.ndarray, format: str = "mp3") -> bytes: | |
| """Convert audio array to bytes""" | |
| audio_16bit = (audio * 32767).astype(np.int16) | |
| audio_segment = AudioSegment( | |
| audio_16bit.tobytes(), frame_rate=16000, sample_width=2, channels=1 | |
| ) | |
| buffer = io.BytesIO() | |
| audio_segment.export(buffer, format=format) | |
| return buffer.getvalue() | |
| class VoiceProcessor: | |
| """Main voice processor orchestrating ASR and TTS""" | |
| def __init__( | |
| self, | |
| cache_manager: CacheManager, | |
| model_orchestrator: ProductionModelOrchestrator, | |
| monitoring_engine: ProductionMonitoringEngine, | |
| ): | |
| self.cache_manager = cache_manager | |
| self.model_orchestrator = model_orchestrator | |
| self.monitoring_engine = monitoring_engine | |
| self.audio_config = AudioConfig() | |
| self.tts_config = TTSConfig() | |
| self.asr_engine = ASREngine(self.audio_config) | |
| self.tts_engine = TTSEngine(self.tts_config) | |
| self.is_initialized = False | |
| async def initialize(self): | |
| """Initialize the sophisticated voice processor""" | |
| logger.info("Initializing Sophisticated Voice Processor...") | |
| try: | |
| # Initialize ASR engine | |
| await self.asr_engine.initialize() | |
| # Initialize TTS engine | |
| await self.tts_engine.initialize() | |
| self.is_initialized = True | |
| logger.info("Sophisticated Voice Processor initialized successfully") | |
| except Exception as e: | |
| logger.error( | |
| f"Failed to initialize Sophisticated Voice Processor: {e}", | |
| exc_info=True, | |
| ) | |
| raise | |
| async def enhanced_transcribe_audio( | |
| self, | |
| audio_data: bytes, | |
| expected_language: str = "auto", | |
| session_id: Optional[str] = None, | |
| ) -> VoiceAnalysis: | |
| """Enhanced audio transcription with comprehensive analysis""" | |
| if not self.is_initialized: | |
| raise RuntimeError("Voice Processor not initialized") | |
| try: | |
| # Check cache first | |
| audio_hash = hashlib.md5(audio_data).hexdigest() | |
| cached_result = await self.cache_manager.get_transcription_cache(audio_hash) | |
| if cached_result: | |
| logger.info("Returning cached transcription.", audio_hash=audio_hash, session_id=session_id) | |
| return VoiceAnalysis(**cached_result) | |
| # Perform transcription | |
| voice_analysis = await self.asr_engine.transcribe_audio(audio_data, expected_language) | |
| # Prepare result | |
| result_dict = asdict(voice_analysis) | |
| # Cache result | |
| await self.cache_manager.cache_transcription(audio_hash, result_dict) | |
| # Log metrics | |
| await self.monitoring_engine.log_query_metrics( | |
| { | |
| "session_id": session_id, | |
| "transcription_confidence": voice_analysis.confidence_score, | |
| "detected_language": voice_analysis.detected_language, | |
| "audio_duration": len(audio_data) | |
| / ( | |
| self.audio_config.sample_rate | |
| * (self.audio_config.bit_depth / 8) | |
| ), | |
| "audio_quality": voice_analysis.audio_quality.get("overall", 0.5), | |
| } | |
| ) | |
| return voice_analysis | |
| except Exception as e: | |
| logger.error(f"Enhanced transcription failed: {e}", exc_info=True) | |
| raise | |
| async def enhanced_text_to_speech( | |
| self, | |
| text: str, | |
| language: str = "en", | |
| session_context: Optional[Dict[str, Any]] = None, | |
| emotion_context: Optional[Dict[str, float]] = None, | |
| ) -> bytes: | |
| """Enhanced text-to-speech with contextual adaptation""" | |
| if not self.is_initialized: | |
| raise RuntimeError("Voice Processor not initialized") | |
| try: | |
| # Check cache first | |
| emotion_key = ( | |
| "_".join(f"{k}{v:.2f}" for k, v in sorted(emotion_context.items())) | |
| if emotion_context | |
| else "neutral" | |
| ) | |
| tts_key = f"{text}_{language}_{emotion_key}" | |
| tts_hash = hashlib.md5(tts_key.encode()).hexdigest() | |
| cached_audio = await self.cache_manager.get_tts_cache(tts_hash) | |
| if cached_audio: | |
| return cached_audio | |
| # Synthesize speech | |
| audio_bytes = await self.tts_engine.synthesize_speech( | |
| text=text, language=language, emotion_context=emotion_context | |
| ) | |
| # Cache result | |
| await self.cache_manager.cache_tts(tts_hash, audio_bytes) | |
| return audio_bytes | |
| except Exception as e: | |
| logger.error(f"Enhanced TTS failed: {e}", exc_info=True) | |
| raise | |
| async def health_check(self) -> bool: | |
| """Check health of voice processor components""" | |
| try: | |
| if not self.is_initialized: | |
| logger.warning("Voice Processor is not initialized.") | |
| return False | |
| # Check ASR models | |
| if not self.asr_engine.asr_models: | |
| logger.error("ASR models not loaded.") | |
| return False | |
| # Check TTS models | |
| if not self.tts_engine.tts_models: | |
| logger.error("TTS models not loaded.") | |
| return False | |
| # Check dependencies health | |
| if not await self.cache_manager.health_check(): | |
| return False | |
| if not await self.model_orchestrator.health_check(): | |
| return False | |
| if not await self.monitoring_engine.health_check(): | |
| return False | |
| logger.info("Voice Processor health check passed.") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Voice processor health check failed: {e}", exc_info=True) | |
| return False | |