"""Speech-emotion classification: MFCC feature extraction + 1D-CNN classifier.

Provides a module-level feature extractor (picklable for joblib workers) and
an EmotionClassifier that trains, persists, and serves a Keras model.
"""

import os
import pickle

import numpy as np
import librosa
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import joblib
from joblib import Parallel, delayed

from .utils import get_label_from_filename

# Parameters
MAX_PAD_LEN = 174        # fixed number of MFCC time frames per sample
N_MFCC = 40              # MFCC coefficients per frame
DURATION = 3             # seconds of audio to load per file
SAMPLE_RATE = 22050      # target sample rate (Hz)
MODEL_PATH = "model.h5"
ENCODER_PATH = "encoder.pkl"


def extract_features_static(file_path, duration=DURATION, sample_rate=SAMPLE_RATE,
                            n_mfcc=N_MFCC, max_pad_len=MAX_PAD_LEN):
    """Extract a fixed-size MFCC matrix from an audio file.

    Module-level (not a method) so it can be pickled for joblib parallel
    processing on Windows.

    Args:
        file_path: path to the audio file.
        duration: seconds of audio to keep.
        sample_rate: target sample rate in Hz.
        n_mfcc: number of MFCC coefficients.
        max_pad_len: number of time frames to pad/crop to.

    Returns:
        np.ndarray of shape (max_pad_len, n_mfcc), or None on any failure.
    """
    try:
        # Normalize path
        file_path = os.path.normpath(os.path.abspath(file_path))
        audio = None
        sr = sample_rate

        # Try loading with librosa first
        try:
            audio, sr = librosa.load(file_path, res_type='kaiser_fast',
                                     duration=duration, sr=sample_rate)
        except Exception as e_librosa:
            print(f"Librosa load failed for {file_path}: {e_librosa}. Trying soundfile...")
            try:
                import soundfile as sf
                audio, file_sr = sf.read(file_path)
                # If we read successfully, we may need to downmix, resample, and crop.
                if len(audio.shape) > 1:
                    audio = audio[:, 0]  # Take first channel if stereo
                if file_sr != sample_rate:
                    audio = librosa.resample(audio, orig_sr=file_sr, target_sr=sample_rate)
                # Manual duration crop (librosa would have done this via `duration`)
                max_samples = int(duration * sample_rate)
                if len(audio) > max_samples:
                    audio = audio[:max_samples]
                sr = sample_rate
            except Exception as e_sf:
                print(f"Soundfile fallback also failed for {file_path}: {e_sf}")
                return None

        if audio is None:
            return None

        mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc)

        # Pad (or crop) the time axis to exactly max_pad_len frames.
        pad_width = max_pad_len - mfccs.shape[1]
        if pad_width > 0:
            mfccs = np.pad(mfccs, pad_width=((0, 0), (0, pad_width)), mode='constant')
        else:
            mfccs = mfccs[:, :max_pad_len]

        # Transpose to (time, n_mfcc) for Conv1D input.
        return mfccs.T
    except Exception as e:
        print(f"Error extracting features from {file_path}: {e}")
        return None


def process_file(file):
    """Process a single file and return (features, label), or None on failure.

    Must be at module level for joblib on Windows.
    """
    try:
        lbl = get_label_from_filename(file)
        if lbl:
            feat = extract_features_static(file)
            if feat is not None:
                return (feat, lbl)
    except Exception as e:
        print(f"Error processing {file}: {e}")
    return None


class EmotionClassifier:
    """Trains and serves a 1D-CNN emotion classifier over MFCC features."""

    def __init__(self):
        self.model = None            # Keras model (set by load_artifacts/train_model)
        self.le = LabelEncoder()     # maps emotion labels <-> class indices
        self.is_loaded = False       # True once a usable model + encoder exist
        self.load_artifacts()

    def load_artifacts(self):
        """Load a previously saved model and label encoder, if both exist."""
        if os.path.exists(MODEL_PATH) and os.path.exists(ENCODER_PATH):
            try:
                self.model = load_model(MODEL_PATH)
                with open(ENCODER_PATH, 'rb') as f:
                    self.le = pickle.load(f)
                # Consistency check: model output width must match encoder classes.
                expected_classes = self.model.output_shape[1]
                actual_classes = len(self.le.classes_)
                if expected_classes != actual_classes:
                    print(f"WARNING: Model expects {expected_classes} classes but encoder has {actual_classes}.")
                    print("This may cause 'unseen labels' errors. Please re-train or ensure files match.")
                self.is_loaded = True
                print("Model and encoder loaded successfully.")
            except Exception as e:
                # FIX: reset partially-loaded state so is_loaded stays consistent.
                self.model = None
                print(f"Failed to load artifacts: {e}")
        else:
            print("No pre-trained model found. System ready for training.")

    def extract_features(self, file_path):
        """Wrapper for static extraction method."""
        return extract_features_static(file_path)

    def train_model(self, data_path, log_callback=None):
        """Train the model from scratch using .wav files under data_path.

        Args:
            data_path: root directory walked recursively for audio files.
            log_callback: optional callable(str) for progress messages;
                falls back to print.

        Returns:
            dict with the test-set "accuracy".

        Raises:
            ValueError: if no audio files are found or no features extracted.
        """
        def log(msg):
            if log_callback:
                log_callback(msg)
            else:
                print(msg)

        # Cache paths (NOTE: cache is not invalidated when files change).
        features_cache_path = os.path.join(data_path, "features_cache.npy")
        labels_cache_path = os.path.join(data_path, "labels_cache.npy")
        X = None
        y = None

        # Check cache
        if os.path.exists(features_cache_path) and os.path.exists(labels_cache_path):
            log("Found cached features. Loading from disk...")
            try:
                X = np.load(features_cache_path)
                y = np.load(labels_cache_path)
                log(f"Loaded {len(X)} cached samples.")
            except Exception as e:
                log(f"Failed to load cache: {e}. Recomputing...")
                X = None
                y = None

        if X is None or y is None:
            files = []
            # Walk through directory; FIX: accept any case of the extension.
            for root, _, filenames in os.walk(data_path):
                for f in filenames:
                    if f.lower().endswith('.wav'):
                        files.append(os.path.join(root, f))
            if not files:
                log("DEBUG: No .wav files found in os.walk")
                raise ValueError("No .wav files found for training.")

            log(f"Processing {len(files)} files for training utilizing parallel processing...")
            # Run in parallel
            # n_jobs=1 avoids Windows multiprocessing issues
            results = Parallel(n_jobs=1, verbose=5)(delayed(process_file)(f) for f in files)

            # Filter None results
            valid_results = [r for r in results if r is not None]
            if not valid_results:
                log("CRITICAL: No features extracted successfully!")
                raise ValueError("No features extracted. Check files and labels.")
            log(f"Successfully processed {len(valid_results)}/{len(files)} files.")

            features = [r[0] for r in valid_results]
            labels = [r[1] for r in valid_results]
            X = np.array(features, dtype='float32')
            y = np.array(labels)

            # Save cache
            log("Saving features to cache...")
            np.save(features_cache_path, X)
            np.save(labels_cache_path, y)

        # Encode labels as one-hot vectors.
        y_encoded = to_categorical(self.le.fit_transform(y))

        # Split; stratify=y handles class imbalance better.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y_encoded, test_size=0.2, random_state=42, stratify=y)

        # Build Model: input is (time frames, n_mfcc).
        self.model = Sequential([
            Conv1D(128, kernel_size=5, padding="same", activation="relu",
                   input_shape=(X_train.shape[1], X_train.shape[2])),
            MaxPooling1D(pool_size=2),
            BatchNormalization(),
            Conv1D(256, kernel_size=5, padding="same", activation="relu"),
            MaxPooling1D(pool_size=2),
            BatchNormalization(),
            Dropout(0.3),
            Flatten(),
            Dense(256, activation='relu'),
            Dropout(0.4),
            Dense(y_encoded.shape[1], activation='softmax')
        ])
        self.model.compile(loss='categorical_crossentropy',
                           optimizer=Adam(learning_rate=0.0001),
                           metrics=['accuracy'])

        early_stop = EarlyStopping(monitor='val_loss', patience=15,
                                   restore_best_weights=True)
        log("Starting training...")

        class LogCallback(tf.keras.callbacks.Callback):
            """Forwards per-epoch metrics to the caller-supplied logger."""
            def on_epoch_end(self, epoch, logs=None):
                log(f"Epoch {epoch+1}: loss={logs['loss']:.4f}, "
                    f"acc={logs['accuracy']:.4f}, val_loss={logs['val_loss']:.4f}")

        self.model.fit(X_train, y_train,
                       epochs=50,
                       batch_size=32,
                       validation_data=(X_test, y_test),
                       callbacks=[early_stop, LogCallback()],
                       verbose=0)

        # Save artifacts
        self.model.save(MODEL_PATH)
        with open(ENCODER_PATH, 'wb') as f:
            pickle.dump(self.le, f)
        self.is_loaded = True
        log("Training complete and model saved.")
        return {"accuracy": self.model.evaluate(X_test, y_test)[1]}

    def predict_emotion(self, file_path):
        """Predict the emotion of an audio file.

        Returns:
            dict with "label", "confidence" (float), and "distribution"
            ({label: probability}).

        Raises:
            ValueError: if no model is loaded or feature extraction fails.
        """
        if not self.is_loaded:
            raise ValueError("Model not loaded. Train the model first.")

        mfcc = self.extract_features(file_path)
        if mfcc is None:
            raise ValueError("Could not extract features.")

        mfcc = mfcc[np.newaxis, :, :]  # Add batch dimension
        prediction = self.model.predict(mfcc, verbose=0)

        # Get all probabilities
        probs = prediction[0]
        classes = self.le.classes_

        # Determine predicted index
        predicted_index = np.argmax(prediction)

        # Safety check for unseen labels (model output wider than encoder).
        if predicted_index >= len(classes):
            print(f"ERROR: Predicted index {predicted_index} is out of bounds (0-{len(classes)-1}).")
            # Fallback: Get valid index with highest probability.
            # We slice probs to match classes length just in case the model output is larger.
            valid_probs = probs[:len(classes)]
            predicted_index = np.argmax(valid_probs)
            print(f"Fallback: Using index {predicted_index} ({classes[predicted_index]})")

        predicted_label = self.le.inverse_transform([predicted_index])[0]
        confidence = float(probs[predicted_index])

        # Create distribution dict {label: score}.
        # zip stops at the shorter of (classes, probs), avoiding index errors.
        distribution = {label: float(score) for label, score in zip(classes, probs)}

        return {
            "label": predicted_label,
            "confidence": confidence,
            "distribution": distribution
        }