"""
Audio Feature Extractor - IMPROVED VERSION

Extracts 14 voice features from audio to detect busy/distracted states.

KEY IMPROVEMENTS:
1. HNR instead of SNR - Better for voice recordings (not affected by
   recording noise)
2. Smarter noise classification using multiple spectral features
3. Removed useless latency feature (t9_latency) from consideration

NOTE(review): ``extract_all`` and ``extract_basic`` currently populate
``v1_snr`` through ``extract_snr``; ``extract_hnr`` is implemented and can be
called directly but is not wired into either feature dictionary.
"""

import warnings
from typing import Dict, List, Optional, Tuple

import librosa
import noisereduce as nr
import numpy as np
import soundfile as sf
import torch
from scipy import signal

try:
    from .emotion_features import EmotionFeatureExtractor
except ImportError:
    from emotion_features import EmotionFeatureExtractor

warnings.filterwarnings("ignore")


class AudioFeatureExtractor:
    """Extract 14 audio features for busy detection (Enhanced with Silero VAD)"""

    # Class-level caches so that repeated instantiation reuses the already
    # loaded VAD model / emotion extractor instead of loading them again.
    _vad_model_cache = None
    _vad_utils_cache = None
    _emotion_extractor_cache = None

    # Noise categories reported by ``classify_noise_type`` (dict key order).
    _NOISE_LABELS = ('traffic', 'office', 'crowd', 'wind', 'clean')

    def __init__(self, sample_rate: int = 16000, use_emotion: bool = True,
                 config: Optional[Dict] = None,
                 emotion_models_dir: Optional[str] = None):
        """
        Configure the extractor and load the (cached) VAD / emotion models.

        Args:
            sample_rate: Default audio sample rate; overridden by
                ``config['audio_sample_rate']`` when present.
            use_emotion: Enable emotion features unless
                ``config['skip_emotion_features']`` is truthy.
            config: Optional settings dict. Keys read here:
                ``audio_sample_rate``, ``vad_sample_rate``,
                ``skip_emotion_features``, ``skip_noise_reduction``,
                ``audio_duration_limit``.
            emotion_models_dir: Passed to ``EmotionFeatureExtractor`` as
                ``models_dir``.

        Model-loading failures are reported on stdout and degrade the
        extractor (``vad_model`` / ``emotion_extractor`` set to ``None``)
        instead of raising.
        """
        self.config = config or {}
        self.sample_rate = self.config.get('audio_sample_rate', sample_rate)
        self.vad_sample_rate = self.config.get('vad_sample_rate', self.sample_rate)
        self.use_emotion = use_emotion and (not self.config.get('skip_emotion_features', False))
        self.skip_noise_reduction = bool(self.config.get('skip_noise_reduction', False))
        self.audio_duration_limit = self.config.get('audio_duration_limit', None)
        self.emotion_models_dir = emotion_models_dir

        # Always define the attribute so it exists even when loading fails.
        self.get_speech_timestamps = None

        print("Loading Silero VAD...")
        try:
            if AudioFeatureExtractor._vad_model_cache is None:
                (AudioFeatureExtractor._vad_model_cache,
                 AudioFeatureExtractor._vad_utils_cache) = torch.hub.load(
                    repo_or_dir='snakers4/silero-vad',
                    model='silero_vad',
                    force_reload=False,
                    trust_repo=True
                )
            self.vad_model = AudioFeatureExtractor._vad_model_cache
            utils = AudioFeatureExtractor._vad_utils_cache
            # First entry of the utils tuple is used as the timestamp function.
            self.get_speech_timestamps = utils[0]
            print("[OK] Silero VAD loaded (cached)")
        except Exception as e:
            print(f"[WARN] Failed to load Silero VAD: {e}. Fallback to energy VAD might be needed.")
            self.vad_model = None

        if self.use_emotion:
            print("Loading Emotion CNN...")
            try:
                if AudioFeatureExtractor._emotion_extractor_cache is None:
                    # Pass models dir to extractor
                    AudioFeatureExtractor._emotion_extractor_cache = EmotionFeatureExtractor(
                        models_dir=self.emotion_models_dir
                    )
                self.emotion_extractor = AudioFeatureExtractor._emotion_extractor_cache
                print("[OK] Emotion CNN loaded (cached)")
            except Exception as e:
                print(f"[WARN] Emotion features disabled: {e}")
                self.emotion_extractor = None
                self.use_emotion = False
        else:
            self.emotion_extractor = None

    def _prepare_vad_audio(self, audio: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """
        Prepare audio for VAD and return speech timestamps.

        Returns:
            ``(audio_vad, speech_dict)`` where ``audio_vad`` is the audio
            handed to the VAD (resampled to ``vad_sample_rate`` when it
            differs from ``sample_rate``) and ``speech_dict`` is the list of
            segments returned by the VAD. Callers in this class index
            ``audio_vad`` with each segment's ``'start'``/``'end'``.
            Returns ``(audio, [])`` when no VAD model is loaded or the audio
            is shorter than 512 samples.
        """
        if self.vad_model is None or len(audio) < 512:
            return audio, []

        audio_vad = audio
        if self.vad_sample_rate != self.sample_rate:
            try:
                audio_vad = librosa.resample(
                    audio, orig_sr=self.sample_rate, target_sr=self.vad_sample_rate
                )
            except Exception:
                # NOTE(review): on failure the un-resampled audio is still
                # passed to the VAD with ``vad_sample_rate`` below.
                audio_vad = audio

        wav = torch.tensor(audio_vad, dtype=torch.float32).unsqueeze(0)
        try:
            speech_dict = self.get_speech_timestamps(
                wav, self.vad_model, sampling_rate=self.vad_sample_rate
            )
        except Exception:
            # Best effort: a VAD failure is treated as "no speech detected".
            speech_dict = []

        return audio_vad, speech_dict

    def _split_speech_pause(self, audio: np.ndarray) -> Tuple[np.ndarray, np.ndarray, int]:
        """
        Return speech audio, pause audio, and the sample rate used for VAD.

        Without a VAD model the whole input is returned as speech with an
        empty pause array. When the VAD finds no speech, the whole (VAD-rate)
        audio is returned as pause.
        """
        if self.vad_model is None:
            return audio, np.array([], dtype=audio.dtype), self.sample_rate

        audio_vad, speech_dict = self._prepare_vad_audio(audio)
        if not speech_dict:
            return np.array([], dtype=audio_vad.dtype), audio_vad, self.vad_sample_rate

        # Boolean mask over samples: True where a speech segment covers them.
        mask = np.zeros(len(audio_vad), dtype=bool)
        for seg in speech_dict:
            start = max(0, int(seg.get('start', 0)))
            end = min(len(audio_vad), int(seg.get('end', 0)))
            if end > start:
                mask[start:end] = True

        speech_audio = audio_vad[mask]
        pause_audio = audio_vad[~mask]
        return speech_audio, pause_audio, self.vad_sample_rate

    def load_audio(self, audio_path: str) -> np.ndarray:
        """Load and preprocess audio file (mono, resampled to ``sample_rate``,
        truncated to ``audio_duration_limit`` when that is set)."""
        audio, _ = librosa.load(
            audio_path,
            sr=self.sample_rate,
            mono=True,
            duration=self.audio_duration_limit
        )
        return audio

    def extract_snr(self, audio: np.ndarray) -> float:
        """
        V1: Signal-to-Noise Ratio (SNR)

        Signal power is calculated only during speech; noise power only
        during pauses.

        Returns:
            SNR in dB clipped to [-10, 40]; 15.0 for inputs shorter than
            2048 samples or on failure; 0.0 when no speech is found or the
            speech power is zero.
        """
        if len(audio) < 2048:
            return 15.0  # Neutral default

        try:
            speech_audio, pause_audio, _ = self._split_speech_pause(audio)

            if len(speech_audio) == 0:
                return 0.0

            signal_power = float(np.mean(speech_audio ** 2))
            if signal_power <= 0:
                return 0.0

            if len(pause_audio) > 0:
                noise_power = float(np.mean(pause_audio ** 2))
            else:
                noise_power = 1e-8

            # Floor the noise power to avoid division by zero.
            if noise_power <= 0:
                noise_power = 1e-8

            snr_db = 10.0 * np.log10(signal_power / noise_power)
            return float(np.clip(snr_db, -10.0, 40.0))
        except Exception as e:
            print(f"SNR extraction failed: {e}")
            return 15.0

    def extract_hnr(self, audio: np.ndarray) -> float:
        """
        V1: Harmonics-to-Noise Ratio (HNR)

        Measures voice quality - higher = clearer voice

        IMPROVEMENT: HNR is better than SNR for voice because:
        - Not affected by recording equipment noise
        - Focuses on harmonic structure of speech
        - More robust to environmental noise

        Range: 0-30 dB (typical: 10-20 dB for clear speech)

        Returns:
            Median per-frame HNR in dB, or a spectral-flatness based proxy
            (0-25) when no frame qualifies; 15.0 for inputs shorter than
            2048 samples or on failure.
        """
        if len(audio) < 2048:
            return 15.0  # Neutral default

        try:
            # Method 1: Autocorrelation-based HNR (most accurate)
            frame_length = 2048
            hop_length = 512

            # Lag window for the fundamental period (loop-invariant).
            min_lag = int(self.sample_rate / 400)  # Max 400 Hz
            max_lag = int(self.sample_rate / 75)   # Min 75 Hz

            hnr_values = []

            # ``+ 1`` so the final full frame is included; without it an
            # input of exactly ``frame_length`` samples produced no frames.
            for i in range(0, len(audio) - frame_length + 1, hop_length):
                frame = audio[i:i + frame_length]

                # Only process frames with enough energy
                energy = np.sum(frame ** 2)
                if energy < 0.001:
                    continue

                # Autocorrelation (keep non-negative lags only)
                autocorr = np.correlate(frame, frame, mode='full')
                autocorr = autocorr[len(autocorr) // 2:]

                # Normalize so that lag 0 equals 1
                if autocorr[0] > 0:
                    autocorr = autocorr / autocorr[0]
                else:
                    continue

                if max_lag >= len(autocorr):
                    continue

                # Strongest peak within the 75-400 Hz pitch lag range
                peak_idx = np.argmax(autocorr[min_lag:max_lag]) + min_lag

                if peak_idx > 0 and autocorr[peak_idx] > 0.3:  # Minimum correlation threshold
                    # HNR calculation
                    periodic_power = autocorr[peak_idx]
                    aperiodic_power = 1 - periodic_power

                    if aperiodic_power > 0:
                        hnr_db = 10 * np.log10(periodic_power / aperiodic_power)
                        # Clip to realistic range
                        hnr_db = np.clip(hnr_db, 0, 30)
                        hnr_values.append(hnr_db)

            if len(hnr_values) > 0:
                # Return median (more robust than mean)
                return float(np.median(hnr_values))

            # Method 2: Fallback using spectral flatness
            flatness = np.mean(librosa.feature.spectral_flatness(y=audio))
            # Convert to HNR-like scale (inverted)
            hnr_proxy = (1 - np.clip(flatness, 0, 1)) * 25
            return float(hnr_proxy)

        except Exception as e:
            print(f"HNR extraction failed: {e}")
            return 15.0  # Safe default

    @classmethod
    def _clean_noise_scores(cls) -> Dict[str, float]:
        """Return the fallback classification: all weight on ``'clean'``."""
        scores = {label: 0.0 for label in cls._NOISE_LABELS}
        scores['clean'] = 1.0
        return scores

    def classify_noise_type(self, audio: np.ndarray) -> Dict[str, float]:
        """
        V2: Background Noise Classification (one-hot encoded)

        IMPROVEMENT: Uses multiple spectral features for better accuracy:
        - Spectral centroid (frequency brightness)
        - Spectral rolloff (energy distribution)
        - Zero crossing rate (noisiness)
        - Low frequency energy (rumble)
        - High frequency energy (hiss)
        - Spectral contrast (texture)

        Returns:
            Dict with keys ``traffic``, ``office``, ``crowd``, ``wind``,
            ``clean`` whose values are built-in floats normalised to sum
            to 1. Inputs shorter than 512 samples, empty spectrograms and
            failures yield ``clean == 1.0``.
        """
        if len(audio) < 512:
            return self._clean_noise_scores()

        try:
            # Extract comprehensive spectral features
            S = np.abs(librosa.stft(audio))
            if S.shape[1] == 0:
                return self._clean_noise_scores()

            # Feature 1: Spectral Centroid (brightness) - computed on pauses only
            pause_audio = None
            if self.vad_model is not None:
                _, pause_audio, vad_sr = self._split_speech_pause(audio)
            else:
                vad_sr = self.sample_rate

            if pause_audio is not None and len(pause_audio) >= 512:
                S_pause = np.abs(librosa.stft(pause_audio))
                centroid = np.mean(librosa.feature.spectral_centroid(S=S_pause, sr=vad_sr))
            else:
                # Not enough pause audio: fall back to the full signal.
                centroid = np.mean(librosa.feature.spectral_centroid(S=S, sr=self.sample_rate))

            # Feature 2: Spectral Rolloff (energy concentration)
            # NOTE: computed but not used by the scoring rules below.
            rolloff = np.mean(librosa.feature.spectral_rolloff(S=S, sr=self.sample_rate))

            # Feature 3: Zero Crossing Rate
            zcr = np.mean(librosa.feature.zero_crossing_rate(audio))

            # Feature 4: Low frequency energy (0-500 Hz)
            # n_fft=2048 matches the default used by ``librosa.stft`` above,
            # so ``freqs`` lines up with the rows of ``S``.
            freqs = librosa.fft_frequencies(sr=self.sample_rate, n_fft=2048)
            low_freq_mask = freqs < 500
            low_energy = np.mean(S[low_freq_mask, :]) if np.any(low_freq_mask) else 0

            # Feature 5: High frequency energy (4000+ Hz)
            high_freq_mask = freqs > 4000
            high_energy = np.mean(S[high_freq_mask, :]) if np.any(high_freq_mask) else 0

            # Feature 6: Overall energy
            total_energy = np.mean(audio ** 2)

            # Feature 7: Spectral contrast (texture measure)
            contrast = np.mean(librosa.feature.spectral_contrast(S=S, sr=self.sample_rate))

            # Score each noise type based on features
            scores = {label: 0.0 for label in self._NOISE_LABELS}

            # Traffic: Low frequency dominant + rumble + consistent
            if low_energy > 0.002 and centroid < 2000 and contrast < 20:
                scores['traffic'] = low_energy * 100 + (2500 - centroid) / 1000

            # Office: Mid frequencies + keyboard clicks + air conditioning hum
            if 1500 < centroid < 3500 and 0.0005 < total_energy < 0.005:
                scores['office'] = (3500 - abs(centroid - 2500)) / 1000 + contrast / 30

            # Crowd: High ZCR + varying spectrum + speech-like energy
            if zcr > 0.08 and total_energy > 0.003 and contrast > 15:
                scores['crowd'] = zcr * 10 + total_energy * 50

            # Wind: Very high ZCR + high frequency energy + low contrast
            if zcr > 0.12 and high_energy > 0.001 and contrast < 15:
                scores['wind'] = zcr * 8 + high_energy * 100

            # Clean: Low energy + low ZCR + high contrast (speech only)
            if total_energy < 0.005 and zcr < 0.08 and contrast > 20:
                scores['clean'] = (0.005 - total_energy) * 200 + contrast / 30

            # If all scores are low, default to clean
            if max(scores.values()) < 0.1:
                scores['clean'] = 1.0

            # Normalize to probabilities; cast to built-in float so callers
            # never receive NumPy scalars.
            total = sum(scores.values())
            if total > 0:
                scores = {k: float(v / total) for k, v in scores.items()}
            else:
                scores = {k: float(v) for k, v in scores.items()}
                scores['clean'] = 1.0

            return scores

        except Exception as e:
            print(f"Noise classification failed: {e}")
            return self._clean_noise_scores()

    def extract_speech_rate(self, audio: np.ndarray, transcript: str) -> float:
        """V3: Speech Rate (words per second)

        Words are whitespace-separated tokens of ``transcript``; duration is
        ``len(audio) / sample_rate``. Returns 0.0 for an empty transcript or
        zero-length audio.
        """
        if not transcript:
            return 0.0

        word_count = len(transcript.split())
        duration = len(audio) / self.sample_rate

        if duration == 0:
            return 0.0

        return word_count / duration

    def extract_pitch_features(self, audio: np.ndarray) -> Tuple[float, float]:
        """V4-V5: Pitch Mean and Std

        Returns ``(mean, std)`` of the pYIN f0 estimate over voiced frames,
        or ``(0.0, 0.0)`` for inputs shorter than 2048 samples, when no
        frame is voiced, or on failure.
        """
        try:
            if len(audio) < 2048:
                return 0.0, 0.0

            # Use pyin (more robust than yin)
            f0, voiced_flag, _ = librosa.pyin(
                audio,
                fmin=librosa.note_to_hz('C2'),
                fmax=librosa.note_to_hz('C7'),
                sr=self.sample_rate
            )

            # Only use voiced frames
            f0_voiced = f0[voiced_flag]

            if len(f0_voiced) == 0:
                return 0.0, 0.0

            return float(np.mean(f0_voiced)), float(np.std(f0_voiced))
        except Exception as e:
            print(f"Pitch extraction failed: {e}")
            return 0.0, 0.0

    def extract_energy_features(self, audio: np.ndarray) -> Tuple[float, float]:
        """V6-V7: Energy Mean and Std

        Returns ``(mean_rms, std_rms / mean_rms)``: the second value is the
        RMS standard deviation normalised by the mean (0.0 when the mean is
        zero). Returns ``(0.0, 0.0)`` for empty input or on failure.
        """
        if len(audio) == 0:
            return 0.0, 0.0

        try:
            rms = librosa.feature.rms(y=audio)[0]
            e_mean = float(np.mean(rms))
            e_std = float(np.std(rms))
            if e_mean > 0:
                e_std = e_std / e_mean
            else:
                e_std = 0.0
            return e_mean, e_std
        except Exception as e:
            # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
            # propagate; still best-effort for ordinary errors.
            print(f"Energy extraction failed: {e}")
            return 0.0, 0.0

    def extract_pause_features(self, audio: np.ndarray) -> Tuple[float, float, int]:
        """
        V8-V10: Pause Ratio, Average Pause Duration, Mid-Pause Count

        Uses Silero VAD

        Returns:
            ``(pause_ratio, avg_pause_dur, mid_pause_cnt)`` where the ratio
            is non-speech samples over total samples, the average is over
            gaps between consecutive speech segments (seconds), and the
            count is the number of gaps within 0.3-1.0 s. Returns
            ``(0.0, 0.0, 0)`` without a VAD model, for inputs shorter than
            512 samples, or on failure.
        """
        if self.vad_model is None or len(audio) < 512:
            return 0.0, 0.0, 0

        try:
            audio_vad, speech_dict = self._prepare_vad_audio(audio)

            # Calculate speech duration
            speech_samples = sum(seg['end'] - seg['start'] for seg in speech_dict)
            total_samples = len(audio_vad)

            if total_samples == 0:
                return 0.0, 0.0, 0

            # Pause Ratio
            pause_samples = total_samples - speech_samples
            pause_ratio = pause_samples / total_samples

            # Calculate gaps between consecutive speech segments (seconds)
            gaps = []
            for current, following in zip(speech_dict, speech_dict[1:]):
                gap = following['start'] - current['end']
                if gap > 0:
                    gaps.append(gap / self.vad_sample_rate)

            avg_pause_dur = float(np.mean(gaps)) if gaps else 0.0

            # Mid-Pause Count (0.3s - 1.0s)
            mid_pause_cnt = sum(1 for g in gaps if 0.3 <= g <= 1.0)

            return float(pause_ratio), float(avg_pause_dur), int(mid_pause_cnt)

        except Exception as e:
            print(f"VAD Error: {e}")
            return 0.0, 0.0, 0

    def extract_all(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
        """Extract all audio features (14 original + 3 emotion = 17 total)

        Emotion keys are only added when emotion extraction is enabled; if
        it is enabled but fails, ``v11``-``v13`` are filled with 0.0.
        """
        # Accepts any array-like; no copy is made for float32 ndarrays.
        audio = np.asarray(audio, dtype=np.float32)

        features = {}

        # V1: SNR (speech-only signal vs pause-only noise)
        features['v1_snr'] = self.extract_snr(audio)

        # V2: Noise classification (IMPROVED)
        noise_class = self.classify_noise_type(audio)
        features['v2_noise_traffic'] = noise_class['traffic']
        features['v2_noise_office'] = noise_class['office']
        features['v2_noise_crowd'] = noise_class['crowd']
        features['v2_noise_wind'] = noise_class['wind']
        features['v2_noise_clean'] = noise_class['clean']

        # V3: Speech rate
        features['v3_speech_rate'] = self.extract_speech_rate(audio, transcript)

        # V4-V5: Pitch
        p_mean, p_std = self.extract_pitch_features(audio)
        features['v4_pitch_mean'] = p_mean
        features['v5_pitch_std'] = p_std

        # V6-V7: Energy
        e_mean, e_std = self.extract_energy_features(audio)
        features['v6_energy_mean'] = e_mean
        features['v7_energy_std'] = e_std

        # V8-V10: Pause features
        pause_ratio, avg_pause, mid_pause_cnt = self.extract_pause_features(audio)
        features['v8_pause_ratio'] = pause_ratio
        features['v9_avg_pause_dur'] = avg_pause
        features['v10_mid_pause_cnt'] = float(mid_pause_cnt)

        # V11-V13: Emotion features
        if self.use_emotion and self.emotion_extractor is not None:
            try:
                emotion_features = self.emotion_extractor.extract_all(audio, self.sample_rate)
                features.update(emotion_features)
            except Exception as e:
                print(f"⚠ Emotion features skipped: {e}")
                # Add zero values for compatibility
                features['v11_emotion_stress'] = 0.0
                features['v12_emotion_energy'] = 0.0
                features['v13_emotion_valence'] = 0.0

        return features

    def extract_basic(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
        """
        Extract a minimal set of audio features for fast decisions.
        Uses only low-cost features.

        Returns the SNR, speech-rate, energy and pause features
        (``v1``, ``v3``, ``v6``-``v10``).
        """
        # Accepts any array-like; no copy is made for float32 ndarrays.
        audio = np.asarray(audio, dtype=np.float32)

        features = {}
        features['v1_snr'] = self.extract_snr(audio)
        features['v3_speech_rate'] = self.extract_speech_rate(audio, transcript)

        e_mean, e_std = self.extract_energy_features(audio)
        features['v6_energy_mean'] = e_mean
        features['v7_energy_std'] = e_std

        pause_ratio, avg_pause, mid_pause_cnt = self.extract_pause_features(audio)
        features['v8_pause_ratio'] = pause_ratio
        features['v9_avg_pause_dur'] = avg_pause
        features['v10_mid_pause_cnt'] = float(mid_pause_cnt)

        return features


if __name__ == "__main__":
    extractor = AudioFeatureExtractor()
    print("Audio Feature Extractor initialized successfully")
    # NOTE(review): extract_all() reports SNR as v1; HNR is only available
    # through extract_hnr(). Message kept as-is pending confirmation.
    print("Using HNR instead of SNR for better voice quality measurement")