# busy-module-audio / audio_features.py
# (uploaded via huggingface_hub, commit 563e76e)
"""
Audio Feature Extractor - IMPROVED VERSION
Extracts 14 voice features from audio to detect busy/distracted states.
KEY IMPROVEMENTS:
1. HNR instead of SNR - Better for voice recordings (not affected by recording noise)
2. Smarter noise classification using multiple spectral features
3. Removed useless latency feature (t9_latency) from consideration
"""
import numpy as np
import librosa
import soundfile as sf
from scipy import signal
from typing import Dict, Tuple, List
import noisereduce as nr
import torch
import warnings
try:
from .emotion_features import EmotionFeatureExtractor
except ImportError:
from emotion_features import EmotionFeatureExtractor
warnings.filterwarnings("ignore")
class AudioFeatureExtractor:
    """Extract 14 audio features for busy detection (Enhanced with Silero VAD)"""

    # Class-level caches: the heavy models (Silero VAD, Emotion CNN) are loaded
    # once per process and shared across all extractor instances.
    _vad_model_cache = None
    _vad_utils_cache = None
    _emotion_extractor_cache = None

    def __init__(self, sample_rate: int = 16000, use_emotion: bool = True, config: Dict = None, emotion_models_dir: str = None):
        """Initialize the extractor, loading (or reusing cached) VAD and emotion models.

        Args:
            sample_rate: Default audio sample rate in Hz; may be overridden by
                ``config['audio_sample_rate']``.
            use_emotion: Enable the emotion CNN features; may be disabled by
                ``config['skip_emotion_features']``.
            config: Optional dict of overrides (audio_sample_rate,
                vad_sample_rate, skip_emotion_features, skip_noise_reduction,
                audio_duration_limit).
            emotion_models_dir: Directory passed through to
                ``EmotionFeatureExtractor`` for locating model weights.
        """
        self.config = config or {}
        self.sample_rate = self.config.get('audio_sample_rate', sample_rate)
        # VAD may run at a different rate than analysis; defaults to sample_rate.
        self.vad_sample_rate = self.config.get('vad_sample_rate', self.sample_rate)
        self.use_emotion = use_emotion and (not self.config.get('skip_emotion_features', False))
        # NOTE(review): skip_noise_reduction is stored but not referenced in the
        # methods visible here — presumably consumed elsewhere; confirm.
        self.skip_noise_reduction = bool(self.config.get('skip_noise_reduction', False))
        self.audio_duration_limit = self.config.get('audio_duration_limit', None)
        self.emotion_models_dir = emotion_models_dir
        print("Loading Silero VAD...")
        try:
            if AudioFeatureExtractor._vad_model_cache is None:
                AudioFeatureExtractor._vad_model_cache, AudioFeatureExtractor._vad_utils_cache = torch.hub.load(
                    repo_or_dir='snakers4/silero-vad',
                    model='silero_vad',
                    force_reload=False,
                    trust_repo=True
                )
            self.vad_model = AudioFeatureExtractor._vad_model_cache
            utils = AudioFeatureExtractor._vad_utils_cache
            # First element of the silero-vad utils tuple is
            # get_speech_timestamps (per the snakers4/silero-vad hub API).
            self.get_speech_timestamps = utils[0]
            print("[OK] Silero VAD loaded (cached)")
        except Exception as e:
            # VAD is optional: downstream methods check self.vad_model for None
            # and degrade gracefully.
            print(f"[WARN] Failed to load Silero VAD: {e}. Fallback to energy VAD might be needed.")
            self.vad_model = None
        if self.use_emotion:
            print("Loading Emotion CNN...")
            try:
                if AudioFeatureExtractor._emotion_extractor_cache is None:
                    # Pass models dir to extractor
                    AudioFeatureExtractor._emotion_extractor_cache = EmotionFeatureExtractor(models_dir=self.emotion_models_dir)
                self.emotion_extractor = AudioFeatureExtractor._emotion_extractor_cache
                print("[OK] Emotion CNN loaded (cached)")
            except Exception as e:
                # Emotion features are best-effort; disable rather than fail.
                print(f"[WARN] Emotion features disabled: {e}")
                self.emotion_extractor = None
                self.use_emotion = False
        else:
            self.emotion_extractor = None

    def _prepare_vad_audio(self, audio: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """Prepare audio for VAD and return speech timestamps.

        Returns the (possibly resampled) audio and the Silero speech segments,
        each a dict with 'start'/'end' sample indices at ``vad_sample_rate``.
        Returns ``(audio, [])`` when VAD is unavailable or the clip is too short.
        """
        if self.vad_model is None or len(audio) < 512:
            return audio, []
        audio_vad = audio
        # Resample only when the VAD rate differs from the analysis rate.
        if self.vad_sample_rate != self.sample_rate:
            try:
                audio_vad = librosa.resample(audio, orig_sr=self.sample_rate, target_sr=self.vad_sample_rate)
            except Exception:
                # Best-effort: on resample failure keep the original signal.
                audio_vad = audio
        wav = torch.tensor(audio_vad, dtype=torch.float32).unsqueeze(0)
        try:
            speech_dict = self.get_speech_timestamps(wav, self.vad_model, sampling_rate=self.vad_sample_rate)
        except Exception:
            speech_dict = []
        return audio_vad, speech_dict

    def _split_speech_pause(self, audio: np.ndarray) -> Tuple[np.ndarray, np.ndarray, int]:
        """Return speech audio, pause audio, and the sample rate used for VAD.

        With no VAD model, the whole signal is treated as speech. With VAD but
        no detected speech, the whole signal is treated as pause.
        """
        if self.vad_model is None:
            return audio, np.array([], dtype=audio.dtype), self.sample_rate
        audio_vad, speech_dict = self._prepare_vad_audio(audio)
        if not speech_dict:
            return np.array([], dtype=audio_vad.dtype), audio_vad, self.vad_sample_rate
        # Boolean mask over samples: True inside any speech segment.
        mask = np.zeros(len(audio_vad), dtype=bool)
        for seg in speech_dict:
            start = max(0, int(seg.get('start', 0)))
            end = min(len(audio_vad), int(seg.get('end', 0)))
            if end > start:
                mask[start:end] = True
        speech_audio = audio_vad[mask]
        pause_audio = audio_vad[~mask]
        return speech_audio, pause_audio, self.vad_sample_rate

    def load_audio(self, audio_path: str) -> np.ndarray:
        """Load an audio file as mono float at ``self.sample_rate``,
        optionally truncated to ``self.audio_duration_limit`` seconds."""
        audio, sr = librosa.load(
            audio_path,
            sr=self.sample_rate,
            mono=True,
            duration=self.audio_duration_limit
        )
        return audio

    def extract_snr(self, audio: np.ndarray) -> float:
        """
        V1: Signal-to-Noise Ratio (SNR)
        Signal power is calculated only during speech; noise power only during
        pauses (segmented via Silero VAD). Result is clipped to [-10, 40] dB.
        """
        if len(audio) == 0 or len(audio) < 2048:
            return 15.0  # Neutral default for too-short clips
        try:
            speech_audio, pause_audio, _ = self._split_speech_pause(audio)
            if len(speech_audio) == 0:
                return 0.0  # No speech detected → no measurable signal
            signal_power = float(np.mean(speech_audio ** 2))
            if signal_power <= 0:
                return 0.0
            if len(pause_audio) > 0:
                noise_power = float(np.mean(pause_audio ** 2))
            else:
                noise_power = 1e-8  # No pauses observed: assume near-silent noise floor
            if noise_power <= 0:
                noise_power = 1e-8  # Guard against log10(inf)
            snr_db = 10.0 * np.log10(signal_power / noise_power)
            return float(np.clip(snr_db, -10.0, 40.0))
        except Exception as e:
            print(f"SNR extraction failed: {e}")
            return 15.0

    def extract_hnr(self, audio: np.ndarray) -> float:
        """
        V1 (alternative): Harmonics-to-Noise Ratio (HNR)
        Measures voice quality - higher = clearer voice.
        HNR is preferable to SNR for voice because it:
        - is not affected by recording equipment noise
        - focuses on the harmonic structure of speech
        - is more robust to environmental noise
        Range: 0-30 dB (typical: 10-20 dB for clear speech).
        NOTE(review): not called from extract_all/extract_basic in this file;
        kept as an auxiliary measure.
        """
        if len(audio) == 0 or len(audio) < 2048:
            return 15.0  # Neutral default
        try:
            # Method 1: Autocorrelation-based HNR (most accurate)
            frame_length = 2048
            hop_length = 512
            hnr_values = []
            for i in range(0, len(audio) - frame_length, hop_length):
                frame = audio[i:i+frame_length]
                # Only process frames with enough energy
                energy = np.sum(frame ** 2)
                if energy < 0.001:
                    continue
                # Autocorrelation (keep non-negative lags only)
                autocorr = np.correlate(frame, frame, mode='full')
                autocorr = autocorr[len(autocorr)//2:]
                # Normalize so autocorr[0] == 1 (total frame power)
                if autocorr[0] > 0:
                    autocorr = autocorr / autocorr[0]
                else:
                    continue
                # Search for the F0 peak in the lag range for 75-400 Hz
                min_lag = int(self.sample_rate / 400)  # Max 400 Hz
                max_lag = int(self.sample_rate / 75)   # Min 75 Hz
                if max_lag >= len(autocorr):
                    continue
                peak_idx = np.argmax(autocorr[min_lag:max_lag]) + min_lag
                if peak_idx > 0 and autocorr[peak_idx] > 0.3:  # Minimum correlation threshold
                    # Normalized autocorr at the F0 lag ≈ periodic fraction of
                    # the frame power; the remainder is aperiodic (noise).
                    periodic_power = autocorr[peak_idx]
                    aperiodic_power = 1 - periodic_power
                    if aperiodic_power > 0:
                        hnr_db = 10 * np.log10(periodic_power / aperiodic_power)
                        # Clip to realistic range
                        hnr_db = np.clip(hnr_db, 0, 30)
                        hnr_values.append(hnr_db)
            if len(hnr_values) > 0:
                # Return median (more robust than mean)
                return float(np.median(hnr_values))
            # Method 2: Fallback using spectral flatness
            flatness = np.mean(librosa.feature.spectral_flatness(y=audio))
            # Convert to HNR-like scale (flat spectrum = noisy → low HNR)
            hnr_proxy = (1 - np.clip(flatness, 0, 1)) * 25
            return float(hnr_proxy)
        except Exception as e:
            print(f"HNR extraction failed: {e}")
            return 15.0  # Safe default

    def classify_noise_type(self, audio: np.ndarray) -> Dict[str, float]:
        """
        V2: Background Noise Classification (one-hot encoded)
        Heuristic scoring over multiple spectral features:
        - Spectral centroid (frequency brightness)
        - Spectral rolloff (energy distribution)
        - Zero crossing rate (noisiness)
        - Low frequency energy (rumble)
        - High frequency energy (hiss)
        - Spectral contrast (texture)
        Returns a dict of probabilities over
        {traffic, office, crowd, wind, clean} that sums to 1.
        NOTE(review): the numeric thresholds below appear hand-tuned — no
        derivation visible in this file.
        """
        if len(audio) < 512:
            return {'traffic': 0, 'office': 0, 'crowd': 0, 'wind': 0, 'clean': 1}
        try:
            # Magnitude STFT (librosa default n_fft=2048, matching
            # fft_frequencies below)
            S = np.abs(librosa.stft(audio))
            if S.shape[1] == 0:
                return {'traffic': 0, 'office': 0, 'crowd': 0, 'wind': 0, 'clean': 1}
            # Feature 1: Spectral Centroid (brightness) - computed on pauses only
            # when VAD is available, so speech does not bias the estimate.
            pause_audio = None
            if self.vad_model is not None:
                _, pause_audio, vad_sr = self._split_speech_pause(audio)
            else:
                vad_sr = self.sample_rate
            if pause_audio is not None and len(pause_audio) >= 512:
                S_pause = np.abs(librosa.stft(pause_audio))
                centroid = np.mean(librosa.feature.spectral_centroid(S=S_pause, sr=vad_sr))
            else:
                # Fallback: centroid of the full signal (includes speech).
                centroid = np.mean(librosa.feature.spectral_centroid(S=S, sr=self.sample_rate))
            # Feature 2: Spectral Rolloff (energy concentration)
            # NOTE(review): computed but not used in the scores below.
            rolloff = np.mean(librosa.feature.spectral_rolloff(S=S, sr=self.sample_rate))
            # Feature 3: Zero Crossing Rate
            zcr = np.mean(librosa.feature.zero_crossing_rate(audio))
            # Feature 4: Low frequency energy (0-500 Hz)
            freqs = librosa.fft_frequencies(sr=self.sample_rate, n_fft=2048)
            low_freq_mask = freqs < 500
            low_energy = np.mean(S[low_freq_mask, :]) if np.any(low_freq_mask) else 0
            # Feature 5: High frequency energy (4000+ Hz)
            high_freq_mask = freqs > 4000
            high_energy = np.mean(S[high_freq_mask, :]) if np.any(high_freq_mask) else 0
            # Feature 6: Overall energy
            total_energy = np.mean(audio ** 2)
            # Feature 7: Spectral contrast (texture measure)
            contrast = np.mean(librosa.feature.spectral_contrast(S=S, sr=self.sample_rate))
            # Score each noise type based on features
            scores = {
                'traffic': 0.0,
                'office': 0.0,
                'crowd': 0.0,
                'wind': 0.0,
                'clean': 0.0
            }
            # Traffic: Low frequency dominant + rumble + consistent
            if low_energy > 0.002 and centroid < 2000 and contrast < 20:
                scores['traffic'] = low_energy * 100 + (2500 - centroid) / 1000
            # Office: Mid frequencies + keyboard clicks + air conditioning hum
            if 1500 < centroid < 3500 and 0.0005 < total_energy < 0.005:
                scores['office'] = (3500 - abs(centroid - 2500)) / 1000 + contrast / 30
            # Crowd: High ZCR + varying spectrum + speech-like energy
            if zcr > 0.08 and total_energy > 0.003 and contrast > 15:
                scores['crowd'] = zcr * 10 + total_energy * 50
            # Wind: Very high ZCR + high frequency energy + low contrast
            if zcr > 0.12 and high_energy > 0.001 and contrast < 15:
                scores['wind'] = zcr * 8 + high_energy * 100
            # Clean: Low energy + low ZCR + high contrast (speech only)
            if total_energy < 0.005 and zcr < 0.08 and contrast > 20:
                scores['clean'] = (0.005 - total_energy) * 200 + contrast / 30
            # If all scores are low, default to clean
            if max(scores.values()) < 0.1:
                scores['clean'] = 1.0
            # Normalize to probabilities
            total = sum(scores.values())
            if total > 0:
                scores = {k: v/total for k, v in scores.items()}
            else:
                scores['clean'] = 1.0
            return scores
        except Exception as e:
            print(f"Noise classification failed: {e}")
            return {'traffic': 0, 'office': 0, 'crowd': 0, 'wind': 0, 'clean': 1}

    def extract_speech_rate(self, audio: np.ndarray, transcript: str) -> float:
        """V3: Speech Rate (words per second over the full clip duration)."""
        if not transcript:
            return 0.0
        word_count = len(transcript.split())
        duration = len(audio) / self.sample_rate
        if duration == 0:
            return 0.0
        return word_count / duration

    def extract_pitch_features(self, audio: np.ndarray) -> Tuple[float, float]:
        """V4-V5: Pitch Mean and Std (Hz), over voiced frames only.

        Returns (0.0, 0.0) when the clip is too short, unvoiced, or pyin fails.
        """
        try:
            if len(audio) < 2048:
                return 0.0, 0.0
            # Use pyin (more robust than yin); C2..C7 covers ~65-2093 Hz
            f0, voiced_flag, voiced_probs = librosa.pyin(
                audio,
                fmin=librosa.note_to_hz('C2'),
                fmax=librosa.note_to_hz('C7'),
                sr=self.sample_rate
            )
            # Only use voiced frames (f0 is NaN on unvoiced frames)
            f0_voiced = f0[voiced_flag]
            if len(f0_voiced) == 0:
                return 0.0, 0.0
            return float(np.mean(f0_voiced)), float(np.std(f0_voiced))
        except Exception as e:
            print(f"Pitch extraction failed: {e}")
            return 0.0, 0.0

    def extract_energy_features(self, audio: np.ndarray) -> Tuple[float, float]:
        """V6-V7: Energy Mean and Std.

        Mean is the raw RMS mean; std is normalized by the mean (coefficient
        of variation) so it is scale-invariant.
        """
        try:
            rms = librosa.feature.rms(y=audio)[0]
            e_mean = float(np.mean(rms))
            e_std = float(np.std(rms))
            if e_mean > 0:
                e_std = e_std / e_mean
            else:
                e_std = 0.0
            return e_mean, e_std
        except:  # noqa: E722 — NOTE(review): bare except kept as-is; returns safe defaults
            return 0.0, 0.0

    def extract_pause_features(self, audio: np.ndarray) -> Tuple[float, float, int]:
        """
        V8-V10: Pause Ratio, Average Pause Duration, Mid-Pause Count
        Uses Silero VAD; returns (0.0, 0.0, 0) when VAD is unavailable,
        the clip is too short, or VAD fails.
        """
        if self.vad_model is None or len(audio) < 512:
            return 0.0, 0.0, 0
        try:
            audio_vad, speech_dict = self._prepare_vad_audio(audio)
            # Calculate speech duration (in samples at the VAD rate)
            speech_samples = sum(seg['end'] - seg['start'] for seg in speech_dict)
            total_samples = len(audio_vad)
            if total_samples == 0:
                return 0.0, 0.0, 0
            # Pause Ratio: fraction of the clip not covered by speech segments
            pause_samples = total_samples - speech_samples
            pause_ratio = pause_samples / total_samples
            # Calculate gaps between consecutive speech segments
            gaps = []
            if len(speech_dict) > 1:
                for i in range(len(speech_dict) - 1):
                    gap = speech_dict[i+1]['start'] - speech_dict[i]['end']
                    if gap > 0:
                        gaps.append(gap / self.vad_sample_rate)  # Convert to seconds
            avg_pause_dur = float(np.mean(gaps)) if gaps else 0.0
            # Mid-Pause Count: pauses between 0.3s and 1.0s (hesitation range)
            mid_pause_cnt = sum(1 for g in gaps if 0.3 <= g <= 1.0)
            return float(pause_ratio), float(avg_pause_dur), int(mid_pause_cnt)
        except Exception as e:
            print(f"VAD Error: {e}")
            return 0.0, 0.0, 0

    def extract_all(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
        """Extract all audio features (14 original + 3 emotion = 17 total).

        Args:
            audio: Mono waveform at ``self.sample_rate`` (converted to
                float32 if needed).
            transcript: Optional transcript used for the speech-rate feature.

        Returns:
            Dict of named features. NOTE(review): when emotion features are
            disabled (use_emotion False), the v11-v13 keys are absent; the
            zero-filled fallback only triggers when extraction raises.
        """
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)
        features = {}
        # V1: SNR (speech-only signal vs pause-only noise)
        features['v1_snr'] = self.extract_snr(audio)
        # V2: Noise classification (IMPROVED)
        noise_class = self.classify_noise_type(audio)
        features['v2_noise_traffic'] = noise_class['traffic']
        features['v2_noise_office'] = noise_class['office']
        features['v2_noise_crowd'] = noise_class['crowd']
        features['v2_noise_wind'] = noise_class['wind']
        features['v2_noise_clean'] = noise_class['clean']
        # V3: Speech rate
        features['v3_speech_rate'] = self.extract_speech_rate(audio, transcript)
        # V4-V5: Pitch
        p_mean, p_std = self.extract_pitch_features(audio)
        features['v4_pitch_mean'] = p_mean
        features['v5_pitch_std'] = p_std
        # V6-V7: Energy
        e_mean, e_std = self.extract_energy_features(audio)
        features['v6_energy_mean'] = e_mean
        features['v7_energy_std'] = e_std
        # V8-V10: Pause features
        pause_ratio, avg_pause, mid_pause_cnt = self.extract_pause_features(audio)
        features['v8_pause_ratio'] = pause_ratio
        features['v9_avg_pause_dur'] = avg_pause
        features['v10_mid_pause_cnt'] = float(mid_pause_cnt)
        # V11-V13: Emotion features (best-effort; zeros on failure)
        if self.use_emotion and self.emotion_extractor is not None:
            try:
                emotion_features = self.emotion_extractor.extract_all(audio, self.sample_rate)
                features.update(emotion_features)
            except Exception as e:
                print(f"⚠ Emotion features skipped: {e}")
                # Add zero values for compatibility
                features['v11_emotion_stress'] = 0.0
                features['v12_emotion_energy'] = 0.0
                features['v13_emotion_valence'] = 0.0
        return features

    def extract_basic(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
        """
        Extract a minimal set of audio features for fast decisions.
        Uses only low-cost features (skips noise classification, pitch,
        and emotion): v1, v3, v6-v10.
        """
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)
        features = {}
        features['v1_snr'] = self.extract_snr(audio)
        features['v3_speech_rate'] = self.extract_speech_rate(audio, transcript)
        e_mean, e_std = self.extract_energy_features(audio)
        features['v6_energy_mean'] = e_mean
        features['v7_energy_std'] = e_std
        pause_ratio, avg_pause, mid_pause_cnt = self.extract_pause_features(audio)
        features['v8_pause_ratio'] = pause_ratio
        features['v9_avg_pause_dur'] = avg_pause
        features['v10_mid_pause_cnt'] = float(mid_pause_cnt)
        return features
if __name__ == "__main__":
extractor = AudioFeatureExtractor()
print("Audio Feature Extractor initialized successfully")
print("Using HNR instead of SNR for better voice quality measurement")