File size: 20,531 Bytes
cedabd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c59ae07
 
 
 
 
cedabd5
 
 
563e76e
 
cedabd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
563e76e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cedabd5
 
 
 
 
 
 
563e76e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cedabd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
563e76e
cedabd5
 
 
 
 
 
 
 
 
 
 
563e76e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cedabd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
563e76e
 
 
 
 
 
 
 
 
 
 
 
 
cedabd5
563e76e
cedabd5
 
 
 
563e76e
 
 
 
 
 
 
 
 
 
 
 
cedabd5
 
 
 
 
 
 
 
 
 
 
563e76e
cedabd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
563e76e
 
cedabd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
563e76e
cedabd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c59ae07
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
"""

Audio Feature Extractor - IMPROVED VERSION

Extracts 14 voice features from audio to detect busy/distracted states.



KEY IMPROVEMENTS:

1. HNR instead of SNR - Better for voice recordings (not affected by recording noise)

2. Smarter noise classification using multiple spectral features

3. Removed useless latency feature (t9_latency) from consideration

"""

import numpy as np
import librosa
import soundfile as sf
from scipy import signal
from typing import Dict, Tuple, List
import noisereduce as nr
import torch
import warnings
try:
    from .emotion_features import EmotionFeatureExtractor
except ImportError:
    from emotion_features import EmotionFeatureExtractor

warnings.filterwarnings("ignore")

class AudioFeatureExtractor:
    """Extract 14 audio features for busy detection (Enhanced with Silero VAD).

    Core features (v1-v10) cover SNR, noise type, speech rate, pitch,
    energy, and pause statistics; when emotion support is enabled,
    v11-v13 are added by ``EmotionFeatureExtractor``.
    """

    # Class-level caches so repeated instantiation does not re-download /
    # re-load the Silero VAD model or the emotion CNN.
    _vad_model_cache = None
    _vad_utils_cache = None
    _emotion_extractor_cache = None

    def __init__(self, sample_rate: int = 16000, use_emotion: bool = True, config: Dict = None, emotion_models_dir: str = None):
        """Initialize the extractor and load the VAD / emotion models.

        Args:
            sample_rate: Default sample rate, overridable via
                ``config['audio_sample_rate']``.
            use_emotion: Enable emotion features unless the config also
                sets ``skip_emotion_features``.
            config: Optional settings dict (sample rates, duration limit,
                feature toggles).
            emotion_models_dir: Directory passed through to
                ``EmotionFeatureExtractor``.
        """
        self.config = config or {}
        self.sample_rate = self.config.get('audio_sample_rate', sample_rate)
        self.vad_sample_rate = self.config.get('vad_sample_rate', self.sample_rate)
        self.use_emotion = use_emotion and (not self.config.get('skip_emotion_features', False))
        self.skip_noise_reduction = bool(self.config.get('skip_noise_reduction', False))
        self.audio_duration_limit = self.config.get('audio_duration_limit', None)
        self.emotion_models_dir = emotion_models_dir

        print("Loading Silero VAD...")
        try:
            # Populate the class-level cache once; later instances reuse it.
            if AudioFeatureExtractor._vad_model_cache is None:
                AudioFeatureExtractor._vad_model_cache, AudioFeatureExtractor._vad_utils_cache = torch.hub.load(
                    repo_or_dir='snakers4/silero-vad',
                    model='silero_vad',
                    force_reload=False,
                    trust_repo=True
                )
            self.vad_model = AudioFeatureExtractor._vad_model_cache
            utils = AudioFeatureExtractor._vad_utils_cache
            # utils[0] is silero's get_speech_timestamps helper.
            self.get_speech_timestamps = utils[0]
            print("[OK] Silero VAD loaded (cached)")
        except Exception as e:
            # VAD-dependent features degrade gracefully when vad_model is None.
            print(f"[WARN] Failed to load Silero VAD: {e}. Fallback to energy VAD might be needed.")
            self.vad_model = None

        if self.use_emotion:
            print("Loading Emotion CNN...")
            try:
                if AudioFeatureExtractor._emotion_extractor_cache is None:
                    # Pass models dir to extractor
                    AudioFeatureExtractor._emotion_extractor_cache = EmotionFeatureExtractor(models_dir=self.emotion_models_dir)
                self.emotion_extractor = AudioFeatureExtractor._emotion_extractor_cache
                print("[OK] Emotion CNN loaded (cached)")
            except Exception as e:
                print(f"[WARN] Emotion features disabled: {e}")
                self.emotion_extractor = None
                self.use_emotion = False
        else:
            self.emotion_extractor = None

    def _prepare_vad_audio(self, audio: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """Prepare audio for VAD and return (possibly resampled audio, speech timestamps).

        Returns the original audio and an empty list when VAD is
        unavailable or the clip is shorter than 512 samples.
        """
        if self.vad_model is None or len(audio) < 512:
            return audio, []

        audio_vad = audio
        # Resample only when the VAD operates at a different rate.
        if self.vad_sample_rate != self.sample_rate:
            try:
                audio_vad = librosa.resample(audio, orig_sr=self.sample_rate, target_sr=self.vad_sample_rate)
            except Exception:
                audio_vad = audio

        wav = torch.tensor(audio_vad, dtype=torch.float32).unsqueeze(0)

        try:
            speech_dict = self.get_speech_timestamps(wav, self.vad_model, sampling_rate=self.vad_sample_rate)
        except Exception:
            speech_dict = []

        return audio_vad, speech_dict

    def _split_speech_pause(self, audio: np.ndarray) -> Tuple[np.ndarray, np.ndarray, int]:
        """Return speech audio, pause audio, and the sample rate used for VAD.

        When no speech is detected the whole clip is returned as pause.
        """
        if self.vad_model is None:
            return audio, np.array([], dtype=audio.dtype), self.sample_rate

        audio_vad, speech_dict = self._prepare_vad_audio(audio)

        if not speech_dict:
            return np.array([], dtype=audio_vad.dtype), audio_vad, self.vad_sample_rate

        # Boolean mask over samples: True where any speech segment covers them.
        mask = np.zeros(len(audio_vad), dtype=bool)
        for seg in speech_dict:
            start = max(0, int(seg.get('start', 0)))
            end = min(len(audio_vad), int(seg.get('end', 0)))
            if end > start:
                mask[start:end] = True

        speech_audio = audio_vad[mask]
        pause_audio = audio_vad[~mask]
        return speech_audio, pause_audio, self.vad_sample_rate

    def load_audio(self, audio_path: str) -> np.ndarray:
        """Load an audio file as mono float at self.sample_rate.

        Honors ``audio_duration_limit`` from the config (None = full file).
        """
        audio, sr = librosa.load(
            audio_path,
            sr=self.sample_rate,
            mono=True,
            duration=self.audio_duration_limit
        )
        return audio

    def extract_snr(self, audio: np.ndarray) -> float:
        """
        V1: Signal-to-Noise Ratio (SNR), in dB, clipped to [-10, 40].

        Signal power is calculated only during speech; noise power only
        during pauses (both segmented by VAD). Returns a neutral 15.0 dB
        for clips too short to analyze, 0.0 when no speech is found.
        """
        # FIX: `len(audio) == 0` was redundant — it is implied by `< 2048`.
        if len(audio) < 2048:
            return 15.0  # Neutral default

        try:
            speech_audio, pause_audio, _ = self._split_speech_pause(audio)

            if len(speech_audio) == 0:
                return 0.0

            signal_power = float(np.mean(speech_audio ** 2))
            if signal_power <= 0:
                return 0.0

            if len(pause_audio) > 0:
                noise_power = float(np.mean(pause_audio ** 2))
            else:
                # No pauses detected: assume a tiny noise floor.
                noise_power = 1e-8

            if noise_power <= 0:
                noise_power = 1e-8

            snr_db = 10.0 * np.log10(signal_power / noise_power)
            return float(np.clip(snr_db, -10.0, 40.0))
        except Exception as e:
            print(f"SNR extraction failed: {e}")
            return 15.0

    def extract_hnr(self, audio: np.ndarray) -> float:
        """
        V1: Harmonics-to-Noise Ratio (HNR).

        Measures voice quality - higher = clearer voice.

        IMPROVEMENT: HNR is better than SNR for voice because:
        - Not affected by recording equipment noise
        - Focuses on harmonic structure of speech
        - More robust to environmental noise

        Range: 0-30 dB (typical: 10-20 dB for clear speech)
        """
        # FIX: `len(audio) == 0` was redundant — it is implied by `< 2048`.
        if len(audio) < 2048:
            return 15.0  # Neutral default

        try:
            # Method 1: Autocorrelation-based HNR (most accurate)
            frame_length = 2048
            hop_length = 512
            hnr_values = []

            for i in range(0, len(audio) - frame_length, hop_length):
                frame = audio[i:i+frame_length]

                # Only process frames with enough energy
                energy = np.sum(frame ** 2)
                if energy < 0.001:
                    continue

                # Autocorrelation (keep non-negative lags only)
                autocorr = np.correlate(frame, frame, mode='full')
                autocorr = autocorr[len(autocorr)//2:]

                # Normalize so lag-0 correlation is 1
                if autocorr[0] > 0:
                    autocorr = autocorr / autocorr[0]
                else:
                    continue

                # Search lags corresponding to the human F0 range (75-400 Hz)
                min_lag = int(self.sample_rate / 400)  # Max 400 Hz
                max_lag = int(self.sample_rate / 75)   # Min 75 Hz

                if max_lag >= len(autocorr):
                    continue

                peak_idx = np.argmax(autocorr[min_lag:max_lag]) + min_lag

                if peak_idx > 0 and autocorr[peak_idx] > 0.3:  # Minimum correlation threshold
                    # HNR: ratio of periodic to aperiodic power at the F0 lag
                    periodic_power = autocorr[peak_idx]
                    aperiodic_power = 1 - periodic_power

                    if aperiodic_power > 0:
                        hnr_db = 10 * np.log10(periodic_power / aperiodic_power)
                        # Clip to realistic range
                        hnr_db = np.clip(hnr_db, 0, 30)
                        hnr_values.append(hnr_db)

            if len(hnr_values) > 0:
                # Return median (more robust than mean)
                return float(np.median(hnr_values))

            # Method 2: Fallback using spectral flatness
            flatness = np.mean(librosa.feature.spectral_flatness(y=audio))
            # Convert to HNR-like scale (inverted: flat spectrum = noisy)
            hnr_proxy = (1 - np.clip(flatness, 0, 1)) * 25
            return float(hnr_proxy)

        except Exception as e:
            print(f"HNR extraction failed: {e}")
            return 15.0  # Safe default

    def classify_noise_type(self, audio: np.ndarray) -> Dict[str, float]:
        """
        V2: Background Noise Classification (one-hot encoded).

        Returns a dict of probabilities over
        {traffic, office, crowd, wind, clean} summing to 1.

        IMPROVEMENT: Uses multiple spectral features for better accuracy:
        - Spectral centroid (frequency brightness)
        - Spectral rolloff (energy distribution)
        - Zero crossing rate (noisiness)
        - Low frequency energy (rumble)
        - High frequency energy (hiss)
        - Spectral contrast (texture)
        """
        if len(audio) < 512:
            return {'traffic': 0, 'office': 0, 'crowd': 0, 'wind': 0, 'clean': 1}

        try:
            # Extract comprehensive spectral features
            S = np.abs(librosa.stft(audio))
            if S.shape[1] == 0:
                return {'traffic': 0, 'office': 0, 'crowd': 0, 'wind': 0, 'clean': 1}

            # Feature 1: Spectral Centroid (brightness) - computed on pauses only
            pause_audio = None
            if self.vad_model is not None:
                _, pause_audio, vad_sr = self._split_speech_pause(audio)
            else:
                vad_sr = self.sample_rate

            # Prefer pause-only audio so the centroid reflects background
            # noise rather than speech; fall back to the full clip.
            if pause_audio is not None and len(pause_audio) >= 512:
                S_pause = np.abs(librosa.stft(pause_audio))
                centroid = np.mean(librosa.feature.spectral_centroid(S=S_pause, sr=vad_sr))
            else:
                centroid = np.mean(librosa.feature.spectral_centroid(S=S, sr=self.sample_rate))

            # Feature 2: Spectral Rolloff (energy concentration)
            rolloff = np.mean(librosa.feature.spectral_rolloff(S=S, sr=self.sample_rate))

            # Feature 3: Zero Crossing Rate
            zcr = np.mean(librosa.feature.zero_crossing_rate(audio))

            # Feature 4: Low frequency energy (0-500 Hz)
            # n_fft=2048 matches librosa.stft's default used for S above.
            freqs = librosa.fft_frequencies(sr=self.sample_rate, n_fft=2048)
            low_freq_mask = freqs < 500
            low_energy = np.mean(S[low_freq_mask, :]) if np.any(low_freq_mask) else 0

            # Feature 5: High frequency energy (4000+ Hz)
            high_freq_mask = freqs > 4000
            high_energy = np.mean(S[high_freq_mask, :]) if np.any(high_freq_mask) else 0

            # Feature 6: Overall energy
            total_energy = np.mean(audio ** 2)

            # Feature 7: Spectral contrast (texture measure)
            contrast = np.mean(librosa.feature.spectral_contrast(S=S, sr=self.sample_rate))

            # Score each noise type based on heuristic feature thresholds
            scores = {
                'traffic': 0.0,
                'office': 0.0,
                'crowd': 0.0,
                'wind': 0.0,
                'clean': 0.0
            }

            # Traffic: Low frequency dominant + rumble + consistent
            if low_energy > 0.002 and centroid < 2000 and contrast < 20:
                scores['traffic'] = low_energy * 100 + (2500 - centroid) / 1000

            # Office: Mid frequencies + keyboard clicks + air conditioning hum
            if 1500 < centroid < 3500 and 0.0005 < total_energy < 0.005:
                scores['office'] = (3500 - abs(centroid - 2500)) / 1000 + contrast / 30

            # Crowd: High ZCR + varying spectrum + speech-like energy
            if zcr > 0.08 and total_energy > 0.003 and contrast > 15:
                scores['crowd'] = zcr * 10 + total_energy * 50

            # Wind: Very high ZCR + high frequency energy + low contrast
            if zcr > 0.12 and high_energy > 0.001 and contrast < 15:
                scores['wind'] = zcr * 8 + high_energy * 100

            # Clean: Low energy + low ZCR + high contrast (speech only)
            if total_energy < 0.005 and zcr < 0.08 and contrast > 20:
                scores['clean'] = (0.005 - total_energy) * 200 + contrast / 30

            # If all scores are low, default to clean
            if max(scores.values()) < 0.1:
                scores['clean'] = 1.0

            # Normalize to probabilities
            total = sum(scores.values())
            if total > 0:
                scores = {k: v/total for k, v in scores.items()}
            else:
                scores['clean'] = 1.0

            return scores

        except Exception as e:
            print(f"Noise classification failed: {e}")
            return {'traffic': 0, 'office': 0, 'crowd': 0, 'wind': 0, 'clean': 1}

    def extract_speech_rate(self, audio: np.ndarray, transcript: str) -> float:
        """V3: Speech Rate (words per second), 0.0 when no transcript/audio."""
        if not transcript:
            return 0.0

        word_count = len(transcript.split())
        duration = len(audio) / self.sample_rate

        if duration == 0:
            return 0.0

        return word_count / duration

    def extract_pitch_features(self, audio: np.ndarray) -> Tuple[float, float]:
        """V4-V5: Pitch Mean and Std over voiced frames (Hz); (0, 0) on failure."""
        try:
            if len(audio) < 2048:
                return 0.0, 0.0

            # Use pyin (more robust than yin)
            f0, voiced_flag, voiced_probs = librosa.pyin(
                audio,
                fmin=librosa.note_to_hz('C2'),
                fmax=librosa.note_to_hz('C7'),
                sr=self.sample_rate
            )

            # Only use voiced frames
            f0_voiced = f0[voiced_flag]

            if len(f0_voiced) == 0:
                return 0.0, 0.0

            return float(np.mean(f0_voiced)), float(np.std(f0_voiced))
        except Exception as e:
            print(f"Pitch extraction failed: {e}")
            return 0.0, 0.0

    def extract_energy_features(self, audio: np.ndarray) -> Tuple[float, float]:
        """V6-V7: RMS Energy Mean and coefficient of variation (std / mean)."""
        try:
            rms = librosa.feature.rms(y=audio)[0]
            e_mean = float(np.mean(rms))
            e_std = float(np.std(rms))
            # Normalize std by mean so the feature is level-independent.
            if e_mean > 0:
                e_std = e_std / e_mean
            else:
                e_std = 0.0
            return e_mean, e_std
        # FIX: bare `except:` also swallowed KeyboardInterrupt/SystemExit.
        except Exception:
            return 0.0, 0.0

    def extract_pause_features(self, audio: np.ndarray) -> Tuple[float, float, int]:
        """
        V8-V10: Pause Ratio, Average Pause Duration (s), Mid-Pause Count.

        Uses Silero VAD; returns (0.0, 0.0, 0) when VAD is unavailable
        or the clip is too short.
        """
        if self.vad_model is None or len(audio) < 512:
            return 0.0, 0.0, 0

        try:
            audio_vad, speech_dict = self._prepare_vad_audio(audio)

            # Calculate speech duration
            speech_samples = sum(seg['end'] - seg['start'] for seg in speech_dict)
            total_samples = len(audio_vad)

            if total_samples == 0:
                return 0.0, 0.0, 0

            # Pause Ratio
            pause_samples = total_samples - speech_samples
            pause_ratio = pause_samples / total_samples

            # Calculate gaps between speech segments
            gaps = []
            if len(speech_dict) > 1:
                for i in range(len(speech_dict) - 1):
                    gap = speech_dict[i+1]['start'] - speech_dict[i]['end']
                    if gap > 0:
                        gaps.append(gap / self.vad_sample_rate)  # Convert to seconds
            
            avg_pause_dur = float(np.mean(gaps)) if gaps else 0.0

            # Mid-Pause Count (0.3s - 1.0s)
            mid_pause_cnt = sum(1 for g in gaps if 0.3 <= g <= 1.0)

            return float(pause_ratio), float(avg_pause_dur), int(mid_pause_cnt)

        except Exception as e:
            print(f"VAD Error: {e}")
            return 0.0, 0.0, 0

    def extract_all(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
        """Extract all audio features (14 original + 3 emotion = 17 total).

        NOTE(review): when emotion is disabled entirely the v11-v13 keys
        are absent (they are zero-filled only when extraction fails) —
        downstream consumers appear to tolerate this; confirm.
        """
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)

        features = {}

        # V1: SNR (speech-only signal vs pause-only noise)
        features['v1_snr'] = self.extract_snr(audio)

        # V2: Noise classification (IMPROVED)
        noise_class = self.classify_noise_type(audio)
        features['v2_noise_traffic'] = noise_class['traffic']
        features['v2_noise_office'] = noise_class['office']
        features['v2_noise_crowd'] = noise_class['crowd']
        features['v2_noise_wind'] = noise_class['wind']
        features['v2_noise_clean'] = noise_class['clean']

        # V3: Speech rate
        features['v3_speech_rate'] = self.extract_speech_rate(audio, transcript)

        # V4-V5: Pitch
        p_mean, p_std = self.extract_pitch_features(audio)
        features['v4_pitch_mean'] = p_mean
        features['v5_pitch_std'] = p_std

        # V6-V7: Energy
        e_mean, e_std = self.extract_energy_features(audio)
        features['v6_energy_mean'] = e_mean
        features['v7_energy_std'] = e_std

        # V8-V10: Pause features
        pause_ratio, avg_pause, mid_pause_cnt = self.extract_pause_features(audio)
        features['v8_pause_ratio'] = pause_ratio
        features['v9_avg_pause_dur'] = avg_pause
        features['v10_mid_pause_cnt'] = float(mid_pause_cnt)

        # V11-V13: Emotion features
        if self.use_emotion and self.emotion_extractor is not None:
            try:
                emotion_features = self.emotion_extractor.extract_all(audio, self.sample_rate)
                features.update(emotion_features)
            except Exception as e:
                print(f"⚠ Emotion features skipped: {e}")
                # Add zero values for compatibility
                features['v11_emotion_stress'] = 0.0
                features['v12_emotion_energy'] = 0.0
                features['v13_emotion_valence'] = 0.0

        return features

    def extract_basic(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
        """
        Extract a minimal set of audio features for fast decisions.

        Uses only low-cost features: SNR, speech rate, energy, and pauses
        (skips noise classification, pitch, and emotion).
        """
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)

        features = {}
        features['v1_snr'] = self.extract_snr(audio)
        features['v3_speech_rate'] = self.extract_speech_rate(audio, transcript)

        e_mean, e_std = self.extract_energy_features(audio)
        features['v6_energy_mean'] = e_mean
        features['v7_energy_std'] = e_std

        pause_ratio, avg_pause, mid_pause_cnt = self.extract_pause_features(audio)
        features['v8_pause_ratio'] = pause_ratio
        features['v9_avg_pause_dur'] = avg_pause
        features['v10_mid_pause_cnt'] = float(mid_pause_cnt)

        return features


if __name__ == "__main__":
    extractor = AudioFeatureExtractor()
    print("Audio Feature Extractor initialized successfully")
    print("Using HNR instead of SNR for better voice quality measurement")