# busy-module-audio / emotion_features.py
# Uploaded by EurekaPotato via huggingface_hub (commit f4320c5, verified)
"""
Emotion Feature Extractor - Using NeuroByte Models
Extracts emotion features from audio for busy detection.
Uses 3 pre-trained Keras models from NeuroByte-Consulting:
1. CRNN (Convolutional Recurrent Neural Network) - Best for sequential patterns
2. Mel Spectrogram CNN - Best for frequency patterns
3. MFCC CNN - Best for speech characteristics
Each model outputs 7 emotion classes: angry, disgust, fear, happy, neutral, sad, surprise
"""
import numpy as np
import librosa
import warnings
from typing import Dict, Optional
import os
warnings.filterwarnings("ignore")
try:
import tensorflow as tf
from tensorflow import keras
TENSORFLOW_AVAILABLE = True
except ImportError:
TENSORFLOW_AVAILABLE = False
print("[WARN] TensorFlow not available. Install with: pip install tensorflow")
class EmotionFeatureExtractor:
    """Extract emotion features using NeuroByte pre-trained models.

    Wraps up to three Keras emotion-recognition models (CRNN,
    mel-spectrogram CNN, MFCC CNN). Each model outputs a 7-class softmax
    over ``EMOTIONS``; predictions are optionally ensemble-averaged and
    mapped by :meth:`extract_all` to three scalar busy-detection features.
    When TensorFlow or the model files are unavailable, a heuristic
    acoustic fallback (:meth:`extract_from_acoustics`) is used instead.
    """

    # Emotion labels, in the order of the models' softmax outputs.
    EMOTIONS = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

    # Preprocessing parameters used during model training.
    MODEL_SAMPLE_RATE = 44100   # Hz
    MODEL_CLIP_DURATION = 4.0   # seconds
    MODEL_N_FFT = 2048
    MODEL_HOP_LENGTH = 512
    MODEL_N_MELS = 128
    MODEL_N_MFCC = 40
    MODEL_TIME_FRAMES = 345     # time frames of a 4.0 s clip at 44100 Hz, hop 512

    def __init__(self, models_dir: str = None, use_ensemble: bool = True):
        """
        Initialize emotion detector with NeuroByte models

        Args:
            models_dir: Directory containing the .keras model files.
                        Defaults to 'models' relative to this file.
            use_ensemble: If True, average predictions from all 3 models (more accurate)
                          If False, use only CRNN model (faster)
        """
        if models_dir is None:
            # Default to 'models' folder in same directory as this script
            models_dir = os.path.join(os.path.dirname(__file__), 'models')
        self.models_dir = models_dir
        self.use_ensemble = use_ensemble
        self.models = {}  # name -> loaded keras model
        if not TENSORFLOW_AVAILABLE:
            print("[WARN] TensorFlow not installed. Falling back to acoustic features.")
            self.use_tensorflow = False
            return
        self.use_tensorflow = True
        # Model file paths
        model_files = {
            'crnn': 'emotion_recognition_crnn.keras',
            'mel_spec': 'emotion_recognition_mel_spec.keras',
            'mfcc': 'emotion_recognition_mfcc.keras'
        }
        # Load models; failure to load any single model is non-fatal.
        print(f"Loading NeuroByte emotion models from {models_dir}...")
        for model_name, filename in model_files.items():
            model_path = os.path.join(models_dir, filename)
            if os.path.exists(model_path):
                try:
                    model = keras.models.load_model(model_path)
                    self.models[model_name] = model
                    print(f"[OK] Loaded {model_name} model")
                except Exception as e:
                    print(f"[WARN] Failed to load {model_name}: {e}")
            else:
                print(f"[WARN] Model not found: {model_path}")
        # If no models loaded, fall back to acoustics
        if len(self.models) == 0:
            print("[WARN] No models loaded. Using acoustic features fallback.")
            self.use_tensorflow = False
        else:
            print(f"[OK] {len(self.models)} emotion model(s) loaded successfully")

    def download_models(self):
        """
        Download NeuroByte models from Hugging Face

        Run this once to download the models:
        >>> extractor = EmotionFeatureExtractor()
        >>> extractor.download_models()
        """
        if not TENSORFLOW_AVAILABLE:
            print("[WARN] TensorFlow required to download models")
            return
        try:
            from huggingface_hub import hf_hub_download
            os.makedirs(self.models_dir, exist_ok=True)
            repo_id = "neurobyte-org/speech-emotion-recognition"
            model_files = [
                'emotion_recognition_crnn.keras',
                'emotion_recognition_mel_spec.keras',
                'emotion_recognition_mfcc.keras'
            ]
            print(f"Downloading models from {repo_id}...")
            for filename in model_files:
                try:
                    # BUG FIX: these three messages previously printed the
                    # literal text "(unknown)" instead of the model filename.
                    print(f"  Downloading {filename}...")
                    downloaded_path = hf_hub_download(
                        repo_id=repo_id,
                        filename=filename,
                        cache_dir=self.models_dir
                    )
                    # Copy out of the HF cache layout to the flat path
                    # that __init__ expects.
                    target_path = os.path.join(self.models_dir, filename)
                    if downloaded_path != target_path:
                        import shutil
                        shutil.copy(downloaded_path, target_path)
                    print(f"  [OK] {filename} downloaded")
                except Exception as e:
                    print(f"  [WARN] Failed to download {filename}: {e}")
            print("[OK] Download complete! Reinitialize the extractor to load models.")
        except ImportError:
            print("[WARN] huggingface_hub not installed. Install with: pip install huggingface_hub")

    def _prepare_clip(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Resample to MODEL_SAMPLE_RATE and zero-pad/trim to the fixed clip length."""
        if sr != self.MODEL_SAMPLE_RATE:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=self.MODEL_SAMPLE_RATE)
        target_samples = int(self.MODEL_CLIP_DURATION * self.MODEL_SAMPLE_RATE)
        if len(audio) < target_samples:
            audio = np.pad(audio, (0, target_samples - len(audio)), mode='constant')
        else:
            audio = audio[:target_samples]
        return audio

    def _fit_time_axis(self, features: np.ndarray) -> np.ndarray:
        """Zero-pad or truncate a (freq, time, 1) array to MODEL_TIME_FRAMES frames."""
        target_length = self.MODEL_TIME_FRAMES
        if features.shape[1] < target_length:
            pad_width = target_length - features.shape[1]
            features = np.pad(features, ((0, 0), (0, pad_width), (0, 0)), mode='constant')
        else:
            features = features[:, :target_length, :]
        return features

    def extract_mel_spectrogram(self, audio: np.ndarray, sr: int = 16000) -> np.ndarray:
        """
        Extract mel spectrogram for the mel_spec model
        Returns shape: (128, 345, 1) for CNN input
        """
        audio = self._prepare_clip(audio, sr)
        sr = self.MODEL_SAMPLE_RATE
        mel_spec = librosa.feature.melspectrogram(
            y=audio,
            sr=sr,
            n_fft=self.MODEL_N_FFT,
            hop_length=self.MODEL_HOP_LENGTH,
            n_mels=self.MODEL_N_MELS,
            fmin=0,
            fmax=sr / 2
        )
        # Convert to dB, then min-max normalize to [0, 1]; epsilon avoids
        # division by zero on a constant spectrogram (e.g. pure silence).
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
        mel_spec_norm = (mel_spec_db - mel_spec_db.min()) / (mel_spec_db.max() - mel_spec_db.min() + 1e-8)
        # Add channel dimension -> (freq, time, 1), then fix the time length.
        mel_spec_norm = np.expand_dims(mel_spec_norm, axis=-1)
        return self._fit_time_axis(mel_spec_norm)

    def extract_mfcc(self, audio: np.ndarray, sr: int = 16000) -> np.ndarray:
        """
        Extract MFCC features for the mfcc model
        Returns shape: (40, 345, 1) for CNN input
        """
        audio = self._prepare_clip(audio, sr)
        mfccs = librosa.feature.mfcc(
            y=audio,
            sr=self.MODEL_SAMPLE_RATE,
            n_mfcc=self.MODEL_N_MFCC,
            n_fft=self.MODEL_N_FFT,
            hop_length=self.MODEL_HOP_LENGTH
        )
        # Standardize (zero mean, unit variance) over the whole matrix.
        mfccs = (mfccs - mfccs.mean()) / (mfccs.std() + 1e-8)
        # Add channel dimension -> (coeff, time, 1), then fix the time length.
        mfccs = np.expand_dims(mfccs, axis=-1)
        return self._fit_time_axis(mfccs)

    def predict_emotions(self, audio: np.ndarray, sr: int = 16000) -> Dict[str, float]:
        """
        Predict emotion probabilities using loaded models

        Returns:
            Dictionary with emotion labels as keys and probabilities as values
        """
        if not self.use_tensorflow or len(self.models) == 0:
            return self.extract_from_acoustics(audio, sr)
        try:
            def _predict_with_shape_guard(model, mel_spec_batch, mfcc_batch):
                # Route the input whose frequency axis matches the model's
                # declared input shape (128 mel bins vs 40 MFCC coefficients).
                expected = model.input_shape
                if expected is None or len(expected) < 4:
                    return model.predict(mel_spec_batch, verbose=0)[0]
                freq_bins = expected[1]
                if freq_bins == self.MODEL_N_MELS:
                    return model.predict(mel_spec_batch, verbose=0)[0]
                if freq_bins == self.MODEL_N_MFCC:
                    return model.predict(mfcc_batch, verbose=0)[0]
                # Fallback: try mel then mfcc
                try:
                    return model.predict(mel_spec_batch, verbose=0)[0]
                except Exception:
                    return model.predict(mfcc_batch, verbose=0)[0]

            mel_spec_batch = np.expand_dims(self.extract_mel_spectrogram(audio, sr), axis=0)
            mfcc_batch = np.expand_dims(self.extract_mfcc(audio, sr), axis=0)

            # Choose which models to run. BUG FIX: previously, with
            # use_ensemble=False and the 'crnn' model missing, no model ran
            # and predictions[0] raised IndexError even though other models
            # were loaded; now any loaded model serves as a substitute.
            if self.use_ensemble:
                selected = list(self.models.values())
            elif 'crnn' in self.models:
                selected = [self.models['crnn']]
            else:
                selected = [next(iter(self.models.values()))]

            predictions = [
                _predict_with_shape_guard(model, mel_spec_batch, mfcc_batch)
                for model in selected
            ]

            # Average predictions if ensemble
            avg_pred = np.mean(predictions, axis=0) if len(predictions) > 1 else predictions[0]
            return {emotion: float(prob) for emotion, prob in zip(self.EMOTIONS, avg_pred)}
        except Exception as e:
            print(f"⚠ Prediction failed: {e}")
            return self.extract_from_acoustics(audio, sr)

    def extract_from_acoustics(self, audio: np.ndarray, sr: int = 16000) -> Dict[str, float]:
        """
        Fallback: Extract emotion proxies from acoustic features
        Returns emotion-like scores without deep learning
        """
        try:
            if len(audio) < 512:
                return {emotion: 1.0 / 7 for emotion in self.EMOTIONS}  # Uniform distribution
            # Extract acoustic features: energy, pitch, spectral statistics.
            rms = librosa.feature.rms(y=audio)[0]
            mean_energy = np.mean(rms)
            energy_std = np.std(rms)
            f0 = librosa.yin(audio, fmin=75, fmax=400, sr=sr)
            f0_voiced = f0[f0 > 0]
            pitch_mean = np.mean(f0_voiced) if len(f0_voiced) > 0 else 0
            pitch_std = np.std(f0_voiced) if len(f0_voiced) > 0 else 0
            zcr = np.mean(librosa.feature.zero_crossing_rate(audio))
            centroid = np.mean(librosa.feature.spectral_centroid(y=audio, sr=sr))
            # Heuristic mapping to emotions (hand-tuned weights, not learned).
            scores = {
                'angry': (energy_std * 10 + pitch_std / 50) / 2,
                'disgust': (pitch_mean / 300) * 0.3,
                'fear': (pitch_mean / 250 + zcr * 5) / 2,
                'happy': (centroid / 3000 + mean_energy * 5) / 2,
                'neutral': 0.3,  # Baseline
                'sad': (1 - centroid / 4000) * 0.5,
                'surprise': (energy_std * 8 + zcr * 3) / 2
            }
            # Normalize to sum to 1
            total = sum(scores.values())
            scores = {k: v / total for k, v in scores.items()}
            return scores
        except Exception as e:
            print(f"⚠ Acoustic fallback failed: {e}")
            return {emotion: 1.0 / 7 for emotion in self.EMOTIONS}

    def extract_all(self, audio: np.ndarray, sr: int = 16000) -> Dict[str, float]:
        """
        Extract emotion features for busy detection

        Returns:
            v11_emotion_stress: 0-1 (angry + fear + disgust)
            v12_emotion_energy: 0-1 (happy + surprise + angry)
            v13_emotion_valence: 0-1 (happy - sad - angry)
        """
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)
        # Get emotion predictions (model ensemble or acoustic fallback)
        emotion_probs = self.predict_emotions(audio, sr)
        # Weighted combinations of emotion probabilities.
        stress = (
            emotion_probs.get('angry', 0.0) * 0.5 +
            emotion_probs.get('fear', 0.0) * 0.3 +
            emotion_probs.get('disgust', 0.0) * 0.2
        )
        energy = (
            emotion_probs.get('happy', 0.0) * 0.4 +
            emotion_probs.get('surprise', 0.0) * 0.3 +
            emotion_probs.get('angry', 0.0) * 0.3
        )
        valence = (
            emotion_probs.get('happy', 0.0) +
            emotion_probs.get('surprise', 0.0) * 0.5 -
            emotion_probs.get('sad', 0.0) -
            emotion_probs.get('angry', 0.0) * 0.5
        )
        # Normalize valence from [-1, 1] to [0, 1]
        valence = (valence + 1.0) / 2.0
        return {
            'v11_emotion_stress': float(np.clip(stress, 0, 1)),
            'v12_emotion_energy': float(np.clip(energy, 0, 1)),
            'v13_emotion_valence': float(np.clip(valence, 0, 1))
        }
# Standalone test
if __name__ == "__main__":
    import time

    print("Testing NeuroByte Emotion Feature Extractor...")

    # Initialize extractor
    extractor = EmotionFeatureExtractor(
        models_dir="models_cache/emotion_models",
        use_ensemble=True,
    )

    def _run_case(signal, rate):
        """Extract features from one clip, reporting latency and values."""
        started = time.time()
        feats = extractor.extract_all(signal, rate)
        print(f" Time: {(time.time() - started)*1000:.0f}ms")
        print(" Features:")
        for name, value in feats.items():
            print(f" {name}: {value:.3f}")

    # Suggest downloading the models when none could be loaded.
    if not extractor.use_tensorflow or not extractor.models:
        print("\nModels not found. Download them with:")
        print(" extractor.download_models()")
        print("\nUsing acoustic fallback for now...")

    # Shared time axis for the synthetic test clips.
    sr = 16000
    duration = 3
    t = np.linspace(0, duration, sr * duration)

    # Test 1: Stressed voice (high pitch, varying)
    print("\n1. Testing with stressed audio:")
    stressed = np.sin(2 * np.pi * 300 * t) + 0.5 * np.sin(2 * np.pi * 150 * t)
    stressed = stressed + 0.2 * np.random.randn(len(stressed))
    _run_case(stressed, sr)

    # Test 2: Calm voice (low pitch, steady)
    print("\n2. Testing with calm audio:")
    calm = np.sin(2 * np.pi * 150 * t) * 0.3
    _run_case(calm, sr)

    print("\n✓ Tests complete!")
    if extractor.use_tensorflow and extractor.models:
        print(f"\nUsing {len(extractor.models)} NeuroByte model(s)")
    else:
        print("\nUsing acoustic features fallback")