Spaces:
Sleeping
Sleeping
| """ | |
| Emotion Feature Extractor - Using NeuroByte Models | |
| Extracts emotion features from audio for busy detection. | |
| Uses 3 pre-trained Keras models from NeuroByte-Consulting: | |
| 1. CRNN (Convolutional Recurrent Neural Network) - Best for sequential patterns | |
| 2. Mel Spectrogram CNN - Best for frequency patterns | |
| 3. MFCC CNN - Best for speech characteristics | |
| Each model outputs 7 emotion classes: angry, disgust, fear, happy, neutral, sad, surprise | |
| """ | |
| import numpy as np | |
| import librosa | |
| import warnings | |
| from typing import Dict, Optional | |
| import os | |
| warnings.filterwarnings("ignore") | |
# TensorFlow is an optional dependency. TENSORFLOW_AVAILABLE records whether
# the import succeeded; only ImportError is treated as "not installed".
try:
    import tensorflow as tf
    from tensorflow import keras
except ImportError:
    TENSORFLOW_AVAILABLE = False
    print("[WARN] TensorFlow not available. Install with: pip install tensorflow")
else:
    TENSORFLOW_AVAILABLE = True
class EmotionFeatureExtractor:
    """Extract emotion features using NeuroByte pre-trained models.

    Up to three Keras models (CRNN, mel-spectrogram CNN, MFCC CNN) are loaded
    from ``models_dir``. When TensorFlow is not importable or no model file
    could be loaded, emotion scores come from a heuristic over acoustic
    features instead (see ``extract_from_acoustics``).
    """

    # Emotion labels from the models
    EMOTIONS = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

    # Preprocessing parameters used during model training
    MODEL_SAMPLE_RATE = 44100
    MODEL_CLIP_DURATION = 4.0  # seconds
    MODEL_N_FFT = 2048
    MODEL_HOP_LENGTH = 512
    MODEL_N_MELS = 128
    MODEL_N_MFCC = 40
    MODEL_TIME_FRAMES = 345

    # Model key -> file name, listed in ensemble order. Shared by __init__
    # and download_models so the two can never drift apart.
    MODEL_FILES = {
        'crnn': 'emotion_recognition_crnn.keras',
        'mel_spec': 'emotion_recognition_mel_spec.keras',
        'mfcc': 'emotion_recognition_mfcc.keras',
    }

    def __init__(self, models_dir: Optional[str] = None, use_ensemble: bool = True):
        """
        Initialize emotion detector with NeuroByte models

        Args:
            models_dir: Directory containing the .keras model files.
                Defaults to 'models' relative to this file.
            use_ensemble: If True, average predictions from all loaded models.
                If False, use a single model (the CRNN when it is loaded).
        """
        if models_dir is None:
            # Default to 'models' folder in same directory as this script
            models_dir = os.path.join(os.path.dirname(__file__), 'models')
        self.models_dir = models_dir
        self.use_ensemble = use_ensemble
        self.models = {}

        if not TENSORFLOW_AVAILABLE:
            print("[WARN] TensorFlow not installed. Falling back to acoustic features.")
            self.use_tensorflow = False
            return
        self.use_tensorflow = True

        print(f"Loading NeuroByte emotion models from {models_dir}...")
        for model_name, filename in self.MODEL_FILES.items():
            model_path = os.path.join(models_dir, filename)
            if not os.path.exists(model_path):
                print(f"[WARN] Model not found: {model_path}")
                continue
            try:
                model = keras.models.load_model(model_path)
            except Exception as e:
                # Best effort: one unreadable model must not prevent the
                # others from loading.
                print(f"[WARN] Failed to load {model_name}: {e}")
            else:
                self.models[model_name] = model
                print(f"[OK] Loaded {model_name} model")

        # If no models loaded, fall back to acoustics
        if not self.models:
            print("[WARN] No models loaded. Using acoustic features fallback.")
            self.use_tensorflow = False
        else:
            print(f"[OK] {len(self.models)} emotion model(s) loaded successfully")

    def download_models(self):
        """
        Download NeuroByte models from Hugging Face into ``self.models_dir``.

        Run this once to download the models:
        >>> extractor = EmotionFeatureExtractor()
        >>> extractor.download_models()

        Returns None. Progress and failures are reported via print; a failed
        file does not abort the remaining downloads.
        """
        if not TENSORFLOW_AVAILABLE:
            print("[WARN] TensorFlow required to download models")
            return
        try:
            from huggingface_hub import hf_hub_download
        except ImportError:
            print("[WARN] huggingface_hub not installed. Install with: pip install huggingface_hub")
            return
        import shutil

        os.makedirs(self.models_dir, exist_ok=True)
        repo_id = "neurobyte-org/speech-emotion-recognition"
        print(f"Downloading models from {repo_id}...")

        failed = []
        for filename in self.MODEL_FILES.values():
            try:
                print(f" Downloading {filename}...")
                downloaded_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=filename,
                    cache_dir=self.models_dir,
                )
                # Copy to the flat location that __init__ looks in
                target_path = os.path.join(self.models_dir, filename)
                if os.path.abspath(downloaded_path) != os.path.abspath(target_path):
                    shutil.copy(downloaded_path, target_path)
                print(f" [OK] {filename} downloaded")
            except Exception as e:
                failed.append(filename)
                print(f" [WARN] Failed to download {filename}: {e}")

        # Only claim success when every file actually arrived.
        if failed:
            print(f"[WARN] {len(failed)} of {len(self.MODEL_FILES)} model file(s) could not be downloaded.")
        else:
            print("[OK] Download complete! Reinitialize the extractor to load models.")

    def _prepare_clip(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Resample to MODEL_SAMPLE_RATE and zero-pad/trim to MODEL_CLIP_DURATION."""
        if sr != self.MODEL_SAMPLE_RATE:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=self.MODEL_SAMPLE_RATE)
        target_samples = int(self.MODEL_CLIP_DURATION * self.MODEL_SAMPLE_RATE)
        if len(audio) < target_samples:
            return np.pad(audio, (0, target_samples - len(audio)), mode='constant')
        return audio[:target_samples]

    @classmethod
    def _fit_time_frames(cls, features: np.ndarray, target_length: Optional[int] = None) -> np.ndarray:
        """Zero-pad or truncate axis 1 of a (bins, time, 1) array to target_length frames."""
        if target_length is None:
            target_length = cls.MODEL_TIME_FRAMES
        n_frames = features.shape[1]
        if n_frames < target_length:
            pad_width = target_length - n_frames
            return np.pad(features, ((0, 0), (0, pad_width), (0, 0)), mode='constant')
        return features[:, :target_length, :]

    def _mel_features(self, clip: np.ndarray) -> np.ndarray:
        """Mel spectrogram (dB, min-max scaled to [0, 1]) of an already prepared clip."""
        sr = self.MODEL_SAMPLE_RATE
        mel_spec = librosa.feature.melspectrogram(
            y=clip,
            sr=sr,
            n_fft=self.MODEL_N_FFT,
            hop_length=self.MODEL_HOP_LENGTH,
            n_mels=self.MODEL_N_MELS,
            fmin=0,
            fmax=sr/2,
        )
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
        lowest = mel_spec_db.min()
        # The epsilon keeps a constant spectrogram from dividing by zero.
        mel_spec_norm = (mel_spec_db - lowest) / (mel_spec_db.max() - lowest + 1e-8)
        # Add channel dimension (freq, time, 1)
        return self._fit_time_frames(np.expand_dims(mel_spec_norm, axis=-1))

    def _mfcc_features(self, clip: np.ndarray) -> np.ndarray:
        """MFCCs (standardised to zero mean / unit std) of an already prepared clip."""
        mfccs = librosa.feature.mfcc(
            y=clip,
            sr=self.MODEL_SAMPLE_RATE,
            n_mfcc=self.MODEL_N_MFCC,
            n_fft=self.MODEL_N_FFT,
            hop_length=self.MODEL_HOP_LENGTH,
        )
        mfccs = (mfccs - mfccs.mean()) / (mfccs.std() + 1e-8)
        # Add channel dimension (coeff, time, 1)
        return self._fit_time_frames(np.expand_dims(mfccs, axis=-1))

    def extract_mel_spectrogram(self, audio: np.ndarray, sr: int = 16000) -> np.ndarray:
        """
        Extract mel spectrogram for the mel_spec model

        Args:
            audio: 1-D audio samples.
            sr: Sample rate of ``audio``; resampled to MODEL_SAMPLE_RATE if different.

        Returns shape: (128, 345, 1) for CNN input
        """
        return self._mel_features(self._prepare_clip(audio, sr))

    def extract_mfcc(self, audio: np.ndarray, sr: int = 16000) -> np.ndarray:
        """
        Extract MFCC features for the mfcc model

        Args:
            audio: 1-D audio samples.
            sr: Sample rate of ``audio``; resampled to MODEL_SAMPLE_RATE if different.

        Returns shape: (40, 345, 1) for CNN input
        """
        return self._mfcc_features(self._prepare_clip(audio, sr))

    def _active_models(self) -> list:
        """Return the loaded models that should contribute to a prediction."""
        if self.use_ensemble:
            return [self.models[name] for name in self.MODEL_FILES if name in self.models]
        if 'crnn' in self.models:
            return [self.models['crnn']]
        # Single-model mode without the CRNN: use whichever known model did
        # load rather than discarding it and falling back to heuristics.
        return [self.models[name] for name in self.MODEL_FILES if name in self.models][:1]

    def _candidate_features(self, model) -> tuple:
        """Decide which feature type(s) to feed a model, based on its input shape."""
        expected = model.input_shape
        if expected is None or len(expected) < 4:
            return ('mel',)
        freq_bins = expected[1]
        if freq_bins == self.MODEL_N_MELS:
            return ('mel',)
        if freq_bins == self.MODEL_N_MFCC:
            return ('mfcc',)
        # Unknown height: try mel first, then mfcc
        return ('mel', 'mfcc')

    def _predict_single(self, model, get_batch) -> np.ndarray:
        """Run one model on the first candidate feature batch it accepts."""
        *preferred, last = self._candidate_features(model)
        for kind in preferred:
            try:
                return model.predict(get_batch(kind), verbose=0)[0]
            except Exception:
                continue  # wrong representation for this model; try the next
        return model.predict(get_batch(last), verbose=0)[0]

    def predict_emotions(self, audio: np.ndarray, sr: int = 16000) -> Dict[str, float]:
        """
        Predict emotion probabilities using loaded models

        Falls back to ``extract_from_acoustics`` when no model is usable or
        when anything in the model path raises.

        Returns:
            Dictionary with emotion labels as keys and probabilities as values
        """
        if not self.use_tensorflow or not self.models:
            return self.extract_from_acoustics(audio, sr)
        try:
            active = self._active_models()
            if not active:
                return self.extract_from_acoustics(audio, sr)

            # Resample/pad once, and build each feature batch only if some
            # model actually asks for it.
            clip = self._prepare_clip(audio, sr)
            batches = {}

            def get_batch(kind):
                if kind not in batches:
                    build = self._mel_features if kind == 'mel' else self._mfcc_features
                    batches[kind] = np.expand_dims(build(clip), axis=0)
                return batches[kind]

            predictions = [self._predict_single(model, get_batch) for model in active]

            # Average predictions if ensemble
            if len(predictions) > 1:
                avg_pred = np.mean(predictions, axis=0)
            else:
                avg_pred = predictions[0]
            avg_pred = np.ravel(avg_pred)
            if avg_pred.size != len(self.EMOTIONS):
                # zip() would silently truncate and mislabel the classes.
                raise ValueError(
                    f"expected {len(self.EMOTIONS)} class scores, got {avg_pred.size}"
                )
            return {emotion: float(prob) for emotion, prob in zip(self.EMOTIONS, avg_pred)}
        except Exception as e:
            # ASCII tag like the rest of the file: a non-UTF-8 console must
            # not be able to break the fallback path itself.
            print(f"[WARN] Prediction failed: {e}")
            return self.extract_from_acoustics(audio, sr)

    @classmethod
    def _uniform_distribution(cls) -> Dict[str, float]:
        """Equal probability for every label in EMOTIONS."""
        return {emotion: 1.0 / len(cls.EMOTIONS) for emotion in cls.EMOTIONS}

    @classmethod
    def _normalize_scores(cls, scores: Dict[str, float]) -> Dict[str, float]:
        """Clamp scores at zero and scale them to sum to 1.

        Returns the uniform distribution over EMOTIONS when the total is zero,
        negative, or not finite (NaN/inf in any score).
        """
        clipped = {label: max(float(value), 0.0) for label, value in scores.items()}
        total = sum(clipped.values())
        if not np.isfinite(total) or total <= 0.0:
            return cls._uniform_distribution()
        return {label: value / total for label, value in clipped.items()}

    def extract_from_acoustics(self, audio: np.ndarray, sr: int = 16000) -> Dict[str, float]:
        """
        Fallback: Extract emotion proxies from acoustic features

        Scores are hand-tuned heuristics over RMS energy, YIN pitch,
        zero-crossing rate and spectral centroid, normalised to sum to 1.
        Clips shorter than 512 samples, and any failure while computing the
        features, yield a uniform distribution.

        Returns emotion-like scores without deep learning
        """
        try:
            if len(audio) < 512:
                return self._uniform_distribution()

            # Extract acoustic features
            rms = librosa.feature.rms(y=audio)[0]
            mean_energy = np.mean(rms)
            energy_std = np.std(rms)
            f0 = librosa.yin(audio, fmin=75, fmax=400, sr=sr)
            f0_voiced = f0[f0 > 0]
            pitch_mean = np.mean(f0_voiced) if len(f0_voiced) > 0 else 0
            pitch_std = np.std(f0_voiced) if len(f0_voiced) > 0 else 0
            zcr = np.mean(librosa.feature.zero_crossing_rate(audio))
            centroid = np.mean(librosa.feature.spectral_centroid(y=audio, sr=sr))

            # Heuristic mapping to emotions
            scores = {
                'angry': (energy_std * 10 + pitch_std / 50) / 2,
                'disgust': (pitch_mean / 300) * 0.3,
                'fear': (pitch_mean / 250 + zcr * 5) / 2,
                'happy': (centroid / 3000 + mean_energy * 5) / 2,
                'neutral': 0.3,  # Baseline
                # Goes negative for centroids above 4 kHz; clamped below.
                'sad': (1 - centroid / 4000) * 0.5,
                'surprise': (energy_std * 8 + zcr * 3) / 2,
            }
            return self._normalize_scores(scores)
        except Exception as e:
            print(f"[WARN] Acoustic fallback failed: {e}")
            return self._uniform_distribution()

    def extract_all(self, audio: np.ndarray, sr: int = 16000) -> Dict[str, float]:
        """
        Extract emotion features for busy detection

        Args:
            audio: Audio samples (any array-like; converted to float32).
            sr: Sample rate of ``audio``.

        Returns:
            v11_emotion_stress: 0-1 (0.5*angry + 0.3*fear + 0.2*disgust)
            v12_emotion_energy: 0-1 (0.4*happy + 0.3*surprise + 0.3*angry)
            v13_emotion_valence: 0-1 (happy + 0.5*surprise - sad - 0.5*angry,
                shifted from [-1, 1] to [0, 1])
        """
        # asarray also accepts plain lists and is a no-op for float32 arrays
        audio = np.asarray(audio, dtype=np.float32)

        # Get emotion predictions
        emotion_probs = self.predict_emotions(audio, sr)
        angry = emotion_probs.get('angry', 0.0)
        happy = emotion_probs.get('happy', 0.0)
        surprise = emotion_probs.get('surprise', 0.0)

        # Map emotions to features
        stress = (
            angry * 0.5 +
            emotion_probs.get('fear', 0.0) * 0.3 +
            emotion_probs.get('disgust', 0.0) * 0.2
        )
        energy = happy * 0.4 + surprise * 0.3 + angry * 0.3
        valence = (
            happy +
            surprise * 0.5 -
            emotion_probs.get('sad', 0.0) -
            angry * 0.5
        )
        # Normalize valence to [0, 1]
        valence = (valence + 1.0) / 2.0

        return {
            'v11_emotion_stress': float(np.clip(stress, 0, 1)),
            'v12_emotion_energy': float(np.clip(energy, 0, 1)),
            'v13_emotion_valence': float(np.clip(valence, 0, 1)),
        }
# Standalone test
def _run_demo_case(extractor, title, audio, sr):
    """Run one clip through ``extract_all`` and print the elapsed time and features."""
    import time

    print(title)
    start = time.time()
    features = extractor.extract_all(audio, sr)
    print(f" Time: {(time.time() - start)*1000:.0f}ms")
    print(" Features:")
    for k, v in features.items():
        print(f" {k}: {v:.3f}")
    return features


def main():
    """Smoke-test the extractor on two synthetic clips and print the results."""
    print("Testing NeuroByte Emotion Feature Extractor...")

    # Initialize extractor
    extractor = EmotionFeatureExtractor(
        models_dir="models_cache/emotion_models",
        use_ensemble=True
    )
    models_ready = extractor.use_tensorflow and len(extractor.models) > 0

    # If models not found, tell the user how to download them
    if not models_ready:
        print("\nModels not found. Download them with:")
        print(" extractor.download_models()")
        print("\nUsing acoustic fallback for now...")

    # Generate test audio. arange/sr gives samples spaced exactly 1/sr apart;
    # linspace with its default endpoint=True would stretch the spacing and
    # shift the tone frequencies slightly.
    duration = 3
    sr = 16000
    t = np.arange(sr * duration) / sr

    # Test 1: Stressed voice (high pitch, varying)
    audio_stressed = np.sin(2 * np.pi * 300 * t) + 0.5 * np.sin(2 * np.pi * 150 * t)
    audio_stressed += 0.2 * np.random.randn(len(audio_stressed))
    _run_demo_case(extractor, "\n1. Testing with stressed audio:", audio_stressed, sr)

    # Test 2: Calm voice (low pitch, steady)
    audio_calm = np.sin(2 * np.pi * 150 * t) * 0.3
    _run_demo_case(extractor, "\n2. Testing with calm audio:", audio_calm, sr)

    # ASCII tag to match the [OK]/[WARN] markers used by the extractor
    print("\n[OK] Tests complete!")
    if models_ready:
        print(f"\nUsing {len(extractor.models)} NeuroByte model(s)")
    else:
        print("\nUsing acoustic features fallback")


if __name__ == "__main__":
    main()