Spaces:
Sleeping
Sleeping
| import os | |
| import numpy as np | |
| import librosa | |
| import tensorflow as tf | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import LabelEncoder | |
| from tensorflow.keras.utils import to_categorical | |
| from tensorflow.keras.models import Sequential, load_model | |
| from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization | |
| from tensorflow.keras.optimizers import Adam | |
| from tensorflow.keras.callbacks import EarlyStopping | |
| import pickle | |
| import joblib | |
| from joblib import Parallel, delayed | |
| from .utils import get_label_from_filename | |
# Parameters
MAX_PAD_LEN = 174  # fixed number of MFCC time frames per sample (pad/crop target)
N_MFCC = 40  # number of MFCC coefficients extracted per frame
DURATION = 3  # seconds of audio loaded from each file
SAMPLE_RATE = 22050  # target sampling rate (Hz) for all audio
MODEL_PATH = "model.h5"  # where the trained Keras model is saved/loaded
ENCODER_PATH = "encoder.pkl"  # where the pickled LabelEncoder is saved/loaded
def extract_features_static(file_path, duration=DURATION, sample_rate=SAMPLE_RATE, n_mfcc=N_MFCC, max_pad_len=MAX_PAD_LEN):
    """
    Static helper for feature extraction to allow pickling for joblib parallel processing.

    Loads an audio file (librosa first, soundfile as a fallback), computes its
    MFCC matrix, and pads or crops the time axis to ``max_pad_len`` frames so
    every sample has an identical shape. Returns the transposed MFCC matrix
    (time x n_mfcc) on success, or None on any failure.
    """
    try:
        # Normalize to an absolute, OS-native path before loading.
        file_path = os.path.normpath(os.path.abspath(file_path))
        signal = None
        rate = sample_rate

        # Primary loader: librosa handles resampling and duration cropping itself.
        try:
            signal, rate = librosa.load(file_path, res_type='kaiser_fast', duration=duration, sr=sample_rate)
        except Exception as e_librosa:
            print(f"Librosa load failed for {file_path}: {e_librosa}. Trying soundfile...")
            # Fallback loader: soundfile, with manual mono-mix, resample, and crop.
            try:
                import soundfile as sf
                signal, native_rate = sf.read(file_path)
                if len(signal.shape) > 1:
                    signal = signal[:, 0]  # keep the first channel of multi-channel audio
                # Resample to the requested rate when the file's native rate differs.
                if native_rate != sample_rate:
                    signal = librosa.resample(signal, orig_sr=native_rate, target_sr=sample_rate)
                # Crop manually to the requested duration.
                limit = int(duration * sample_rate)
                if len(signal) > limit:
                    signal = signal[:limit]
                rate = sample_rate
            except Exception as e_sf:
                print(f"Soundfile fallback also failed for {file_path}: {e_sf}")
                return None

        if signal is None:
            return None

        mfccs = librosa.feature.mfcc(y=signal, sr=rate, n_mfcc=n_mfcc)
        # Right-pad with zeros (or crop) along the time axis to exactly max_pad_len frames.
        missing = max_pad_len - mfccs.shape[1]
        if missing > 0:
            mfccs = np.pad(mfccs, pad_width=((0, 0), (0, missing)), mode='constant')
        else:
            mfccs = mfccs[:, :max_pad_len]
        return mfccs.T
    except Exception as e:
        print(f"Error extracting features from {file_path}: {e}")
        return None
def process_file(file):
    """
    Helper to process a single file and return (features, label).
    Must be at module level for joblib on Windows.

    Returns None when the file has no recognizable label, when feature
    extraction fails, or when any unexpected error occurs.
    """
    try:
        label = get_label_from_filename(file)
        if not label:
            return None
        features = extract_features_static(file)
        if features is None:
            return None
        return (features, label)
    except Exception as e:
        print(f"Error processing {file}: {e}")
    return None
class EmotionClassifier:
    """Speech emotion classifier: a 1D-CNN over fixed-size MFCC matrices.

    Handles artifact persistence (Keras model + pickled LabelEncoder at
    MODEL_PATH / ENCODER_PATH), training from a directory of .wav files,
    and single-file prediction.
    """
    def __init__(self):
        # Model and encoder are populated by load_artifacts() (when saved
        # artifacts exist on disk) or by train_model().
        self.model = None
        self.le = LabelEncoder()
        self.is_loaded = False
        self.load_artifacts()
    def load_artifacts(self):
        """Load the saved model and label encoder from disk, if both exist.

        Sets self.is_loaded on success. On any load failure the classifier
        stays in the untrained state and only logs the error.
        """
        if os.path.exists(MODEL_PATH) and os.path.exists(ENCODER_PATH):
            try:
                self.model = load_model(MODEL_PATH)
                with open(ENCODER_PATH, 'rb') as f:
                    self.le = pickle.load(f)
                # Consistency Check: the model's output width must match the
                # number of labels the encoder knows, otherwise predicted
                # indices cannot be inverse-transformed reliably.
                expected_classes = self.model.output_shape[1]
                actual_classes = len(self.le.classes_)
                if expected_classes != actual_classes:
                    print(f"WARNING: Model expects {expected_classes} classes but encoder has {actual_classes}.")
                    print("This may cause 'unseen labels' errors. Please re-train or ensure files match.")
                self.is_loaded = True
                print("Model and encoder loaded successfully.")
            except Exception as e:
                print(f"Failed to load artifacts: {e}")
        else:
            print("No pre-trained model found. System ready for training.")
    def extract_features(self, file_path):
        """Wrapper for static extraction method."""
        return extract_features_static(file_path)
    def train_model(self, data_path, log_callback=None):
        """Trains the model from scratch using data in data_path.

        Walks data_path recursively for .wav files, extracts MFCC features
        (cached as .npy files inside data_path to skip re-extraction on
        subsequent runs), trains a 1D-CNN with early stopping, then saves
        the model and label encoder to MODEL_PATH / ENCODER_PATH.

        Args:
            data_path: directory containing labelled .wav training files.
            log_callback: optional callable taking a message string; falls
                back to print() when not given.

        Returns:
            dict with the held-out test set "accuracy".

        Raises:
            ValueError: when no .wav files are found or no features could
                be extracted.
        """
        def log(msg):
            # Route progress messages to the caller's callback when provided.
            if log_callback:
                log_callback(msg)
            else:
                print(msg)
        # Cache paths
        features_cache_path = os.path.join(data_path, "features_cache.npy")
        labels_cache_path = os.path.join(data_path, "labels_cache.npy")
        X = None
        y = None
        # Check cache: reuse previously extracted features when both files load.
        if os.path.exists(features_cache_path) and os.path.exists(labels_cache_path):
            log("Found cached features. Loading from disk...")
            try:
                X = np.load(features_cache_path)
                y = np.load(labels_cache_path)
                log(f"Loaded {len(X)} cached samples.")
            except Exception as e:
                # Corrupt/unreadable cache: fall through to a full recompute.
                log(f"Failed to load cache: {e}. Recomputing...")
                X = None
                y = None
        if X is None or y is None:
            files = []
            # Walk through directory collecting every .wav file.
            for root, _, filenames in os.walk(data_path):
                for f in filenames:
                    if f.endswith('.wav'):
                        files.append(os.path.join(root, f))
            if not files:
                log("DEBUG: No .wav files found in os.walk")
                raise ValueError("No .wav files found for training.")
            log(f"Processing {len(files)} files for training utilizing parallel processing...")
            # Run in parallel
            # n_jobs=1 avoids Windows multiprocessing issues
            results = Parallel(n_jobs=1, verbose=5)(delayed(process_file)(f) for f in files)
            # Filter None results (files that failed label or feature extraction).
            valid_results = [r for r in results if r is not None]
            if not valid_results:
                log("CRITICAL: No features extracted successfully!")
                raise ValueError("No features extracted. Check files and labels.")
            log(f"Successfully processed {len(valid_results)}/{len(files)} files.")
            features = [r[0] for r in valid_results]
            labels = [r[1] for r in valid_results]
            X = np.array(features, dtype='float32')
            y = np.array(labels)
            # Save cache so the next training run can skip extraction.
            log("Saving features to cache...")
            np.save(features_cache_path, X)
            np.save(labels_cache_path, y)
        # Encode labels: string labels -> integer indices -> one-hot vectors.
        y_encoded = to_categorical(self.le.fit_transform(y))
        # Split
        X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.2, random_state=42, stratify=y)  # stratify=y handles class imbalance better
        # Build Model: two Conv1D/pool/batch-norm stages, then a dense head.
        self.model = Sequential([
            Conv1D(128, kernel_size=5, padding="same", activation="relu", input_shape=(X_train.shape[1], X_train.shape[2])),
            MaxPooling1D(pool_size=2),
            BatchNormalization(),
            Conv1D(256, kernel_size=5, padding="same", activation="relu"),
            MaxPooling1D(pool_size=2),
            BatchNormalization(),
            Dropout(0.3),
            Flatten(),
            Dense(256, activation='relu'),
            Dropout(0.4),
            Dense(y_encoded.shape[1], activation='softmax')
        ])
        self.model.compile(loss='categorical_crossentropy', optimizer=Adam(learning_rate=0.0001), metrics=['accuracy'])
        # Stop when val_loss plateaus and roll back to the best weights seen.
        early_stop = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
        log("Starting training...")
        class LogCallback(tf.keras.callbacks.Callback):
            # Forwards per-epoch metrics to the caller's log sink (closure over log).
            def on_epoch_end(self, epoch, logs=None):
                log(f"Epoch {epoch+1}: loss={logs['loss']:.4f}, acc={logs['accuracy']:.4f}, val_loss={logs['val_loss']:.4f}")
        self.model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_test, y_test), callbacks=[early_stop, LogCallback()], verbose=0)
        # Save artifacts
        # NOTE(review): pickle is fine for locally produced encoders; never
        # unpickle an encoder file from an untrusted source.
        self.model.save(MODEL_PATH)
        with open(ENCODER_PATH, 'wb') as f:
            pickle.dump(self.le, f)
        self.is_loaded = True
        log("Training complete and model saved.")
        return {"accuracy": self.model.evaluate(X_test, y_test)[1]}
    def predict_emotion(self, file_path):
        """Predict the emotion label for a single audio file.

        Returns:
            dict with "label" (predicted class), "confidence" (probability
            of the predicted class), and "distribution" (per-class
            probabilities keyed by label).

        Raises:
            ValueError: if no model is loaded or feature extraction fails.
        """
        if not self.is_loaded:
            raise ValueError("Model not loaded. Train the model first.")
        mfcc = self.extract_features(file_path)
        if mfcc is None:
            raise ValueError("Could not extract features.")
        mfcc = mfcc[np.newaxis, :, :]  # Add batch dimension
        prediction = self.model.predict(mfcc, verbose=0)
        # Get all probabilities
        probs = prediction[0]
        classes = self.le.classes_
        # Determine predicted index
        predicted_index = np.argmax(prediction)
        # Safety check for unseen labels: the model may output more classes
        # than the encoder knows (see the warning in load_artifacts).
        if predicted_index >= len(classes):
            print(f"ERROR: Predicted index {predicted_index} is out of bounds (0-{len(classes)-1}).")
            # Fallback: Get valid index with highest probability
            # We slice probs to match classes length just in case the model output is larger
            valid_probs = probs[:len(classes)]
            predicted_index = np.argmax(valid_probs)
            print(f"Fallback: Using index {predicted_index} ({classes[predicted_index]})")
        predicted_label = self.le.inverse_transform([predicted_index])[0]
        confidence = float(probs[predicted_index])
        # Create distribution dict {label: score}
        # Only zip up to the length of classes to avoid errors
        distribution = {label: float(score) for label, score in zip(classes, probs)}
        return {
            "label": predicted_label,
            "confidence": confidence,
            "distribution": distribution
        }