"""
ml_classifier.py

CSR Call Recording - ML Classifier Module
Emotion classification using SVM, Random Forest, and KNN.
"""
import os
import sys
import json
import pickle
import warnings
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime

# Machine Learning
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import (
    classification_report, confusion_matrix, accuracy_score,
    precision_recall_fscore_support, roc_auc_score
)

try:
    # BUGFIX: the non-interactive 'Agg' backend must be selected BEFORE
    # pyplot is imported; the original code imported pyplot first, so the
    # backend switch could silently fail on GUI-less servers.
    import matplotlib
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    import seaborn as sns
    MATPLOTLIB_AVAILABLE = True
except ImportError:
    MATPLOTLIB_AVAILABLE = False
    print("Warning: matplotlib/seaborn not available. Visualization disabled.")

warnings.filterwarnings('ignore')


class EmotionClassifier:
    """
    ML Classifier for Emotional State Classification

    Supports: SVM, Random Forest, KNN
    """

    # Emotion labels (used by the mock predictor and error fallbacks; the
    # trained model's actual classes come from the LabelEncoder).
    EMOTIONS = ['angry', 'happy', 'sad', 'neutral', 'frustrated', 'satisfied']

    # Available classifiers: short key -> human-readable name
    CLASSIFIERS = {
        'svm': 'Support Vector Machine',
        'rf': 'Random Forest',
        'knn': 'K-Nearest Neighbors'
    }

    def __init__(self, classifier_type='svm', random_state=42):
        """
        Initialize Emotion Classifier

        Args:
            classifier_type (str): 'svm', 'rf', or 'knn' (case-insensitive)
            random_state (int): Random seed for reproducibility

        Raises:
            ValueError: If classifier_type is not a supported key.
        """
        self.classifier_type = classifier_type.lower()
        self.random_state = random_state

        if self.classifier_type not in self.CLASSIFIERS:
            raise ValueError(
                f"Classifier must be one of: {list(self.CLASSIFIERS.keys())}")

        # Initialize components
        self.scaler = StandardScaler()
        self.label_encoder = LabelEncoder()
        self.model = None
        self.is_trained = False

        # Training history — populated/updated by train().
        # BUGFIX: store the normalized (lower-cased) type so the history is
        # consistent with self.classifier_type regardless of caller casing.
        self.training_history = {
            'classifier_type': self.classifier_type,
            'trained_at': None,
            'training_samples': 0,
            'test_accuracy': 0.0,
            'feature_count': 0,
            'emotions': []
        }

        print(f"{'='*70}")
        print(f"ML Emotion Classifier Initialized")
        print(f"{'='*70}")
        print(f"Classifier: {self.CLASSIFIERS[self.classifier_type]}")
        print(f"Random State: {random_state}")
        print(f"{'='*70}\n")

    def _mock_predict(self, X, return_probabilities=True):
        """
        TEMPORARY: Return mock predictions for testing

        This should be replaced with real trained model predictions.
        Used only when predict() is called before the model is trained.

        Args:
            X (np.array): Feature matrix (1D or 2D).
            return_probabilities (bool): Also emit fabricated probabilities.

        Returns:
            dict: Same shape of result as predict() would produce.
        """
        import random

        # Ensure X is 2D
        if len(X.shape) == 1:
            X = X.reshape(1, -1)

        n_samples = X.shape[0]

        # Generate mock predictions
        mock_emotions = random.choices(self.EMOTIONS, k=n_samples)

        results = {
            'predictions': mock_emotions,
            'num_samples': n_samples
        }

        if return_probabilities:
            probabilities = []
            for emotion in mock_emotions:
                # Generate realistic-looking probabilities
                base_prob = random.uniform(0.55, 0.85)  # Confidence for predicted emotion
                remaining = 1.0 - base_prob

                # Distribute remaining probability among other emotions
                # (Dirichlet sample sums to 1, scaled so total mass is 1.0)
                other_emotions = [e for e in self.EMOTIONS if e != emotion]
                other_probs = np.random.dirichlet(np.ones(len(other_emotions)))
                other_probs = other_probs * remaining

                # Create probability dict
                prob_dict = {emotion: float(base_prob)}
                for other_emotion, prob in zip(other_emotions, other_probs):
                    prob_dict[other_emotion] = float(prob)

                # Sort by probability
                prob_dict = dict(sorted(prob_dict.items(),
                                        key=lambda x: x[1], reverse=True))

                probabilities.append({
                    'predicted_emotion': emotion,
                    'confidence': float(base_prob),
                    'all_probabilities': prob_dict
                })

            results['probabilities'] = probabilities

        return results

    def _create_classifier(self, **kwargs):
        """
        Create classifier instance based on type

        Args:
            **kwargs: Classifier-specific parameters (unknown keys are
                ignored; missing keys fall back to sensible defaults).

        Returns:
            Classifier instance
        """
        if self.classifier_type == 'svm':
            return SVC(
                kernel=kwargs.get('kernel', 'rbf'),
                C=kwargs.get('C', 1.0),
                gamma=kwargs.get('gamma', 'scale'),
                probability=True,  # required for predict_proba()
                random_state=self.random_state
            )
        elif self.classifier_type == 'rf':
            return RandomForestClassifier(
                n_estimators=kwargs.get('n_estimators', 100),
                max_depth=kwargs.get('max_depth', None),
                min_samples_split=kwargs.get('min_samples_split', 2),
                min_samples_leaf=kwargs.get('min_samples_leaf', 1),
                random_state=self.random_state
            )
        elif self.classifier_type == 'knn':
            return KNeighborsClassifier(
                n_neighbors=kwargs.get('n_neighbors', 5),
                weights=kwargs.get('weights', 'uniform'),
                metric=kwargs.get('metric', 'minkowski')
            )

    def train(self, X, y, test_size=0.2, **classifier_params):
        """
        Train the classifier

        Fits the label encoder and feature scaler, trains the model on a
        stratified train split, and evaluates on the held-out test split
        plus 5-fold cross-validation on the training set.

        Args:
            X (np.array): Feature matrix (n_samples, n_features)
            y (np.array): Labels (n_samples,)
            test_size (float): Proportion of test set
            **classifier_params: Classifier-specific parameters

        Returns:
            dict: Training results

        Raises:
            ValueError: If X and y lengths differ.
        """
        print(f"\n{'='*70}")
        print(f"TRAINING {self.CLASSIFIERS[self.classifier_type].upper()}")
        print(f"{'='*70}\n")

        # Coerce to arrays so .shape / fancy indexing work on list inputs.
        X = np.asarray(X)
        y = np.asarray(y)

        # Validate input
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have same length. Got X:{len(X)}, y:{len(y)}")

        print(f"[Data] Total samples: {len(X)}")
        print(f"[Data] Features per sample: {X.shape[1]}")
        print(f"[Data] Unique emotions: {np.unique(y)}")
        print(f"[Data] Test size: {test_size * 100}%\n")

        # Encode labels
        y_encoded = self.label_encoder.fit_transform(y)

        # Split data (stratified so class proportions are preserved)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y_encoded,
            test_size=test_size,
            random_state=self.random_state,
            stratify=y_encoded
        )

        print(f"[Split] Training samples: {len(X_train)}")
        print(f"[Split] Test samples: {len(X_test)}\n")

        # Scale features — fit on train only to avoid test-set leakage.
        print("[Preprocessing] Scaling features...")
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        print("✓ Features scaled\n")

        # Create and train classifier
        print(
            f"[Training] Training {self.CLASSIFIERS[self.classifier_type]}...")
        self.model = self._create_classifier(**classifier_params)
        self.model.fit(X_train_scaled, y_train)
        print("✓ Training complete\n")

        # Evaluate on test set
        print("[Evaluation] Testing model...")
        y_pred = self.model.predict(X_test_scaled)
        y_pred_proba = self.model.predict_proba(X_test_scaled)

        # Calculate metrics
        accuracy = accuracy_score(y_test, y_pred)
        precision, recall, f1, support = precision_recall_fscore_support(
            y_test, y_pred, average='weighted'
        )

        # Confusion matrix
        cm = confusion_matrix(y_test, y_pred)

        # Cross-validation
        print("[Cross-Validation] Running 5-fold CV...")
        cv_scores = cross_val_score(
            self.model, X_train_scaled, y_train,
            cv=5, scoring='accuracy'
        )

        # Update training history
        self.is_trained = True
        self.training_history.update({
            'trained_at': datetime.now().isoformat(),
            'training_samples': len(X_train),
            'test_samples': len(X_test),
            'test_accuracy': float(accuracy),
            'precision': float(precision),
            'recall': float(recall),
            'f1_score': float(f1),
            'cv_scores': cv_scores.tolist(),
            'cv_mean': float(cv_scores.mean()),
            'cv_std': float(cv_scores.std()),
            'feature_count': X.shape[1],
            'emotions': self.label_encoder.classes_.tolist(),
            'confusion_matrix': cm.tolist()
        })

        # Display results
        print(f"\n{'='*70}")
        print(f"TRAINING RESULTS")
        print(f"{'='*70}")
        print(f"Test Accuracy: {accuracy:.4f} ({accuracy*100:.2f}%)")
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"F1-Score: {f1:.4f}")
        print(f"\nCross-Validation (5-fold):")
        print(
            f"  Mean CV Accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")
        print(f"  CV Scores: {cv_scores}")
        print(f"{'='*70}\n")

        # Detailed classification report
        print("Classification Report:")
        print(classification_report(
            y_test, y_pred,
            target_names=self.label_encoder.classes_
        ))

        results = {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'cv_scores': cv_scores,
            'cv_mean': cv_scores.mean(),
            'cv_std': cv_scores.std(),
            'confusion_matrix': cm,
            'classification_report': classification_report(
                y_test, y_pred,
                target_names=self.label_encoder.classes_,
                output_dict=True
            ),
            'predictions': y_pred,
            'true_labels': y_test,
            'predicted_probabilities': y_pred_proba
        }

        return results

    def predict(self, X, return_probabilities=True):
        """
        Predict emotions for new samples

        Args:
            X (np.array): Feature matrix (n_samples, n_features)
            return_probabilities (bool): Return probability scores

        Returns:
            dict: Predictions with labels and probabilities
        """
        if not self.is_trained:
            # TEMPORARY: Return mock predictions for testing
            print("⚠️ WARNING: Using MOCK predictions - model not trained yet!")
            return self._mock_predict(X, return_probabilities)

        # Ensure X is 2D
        if len(X.shape) == 1:
            X = X.reshape(1, -1)

        # Scale features with the scaler fitted during training
        X_scaled = self.scaler.transform(X)

        # Predict
        y_pred_encoded = self.model.predict(X_scaled)
        y_pred_labels = self.label_encoder.inverse_transform(y_pred_encoded)

        results = {
            'predictions': y_pred_labels.tolist(),
            'num_samples': len(X)
        }

        if return_probabilities:
            y_pred_proba = self.model.predict_proba(X_scaled)

            # Get probabilities for each emotion
            probabilities = []
            for i, probs in enumerate(y_pred_proba):
                prob_dict = {
                    emotion: float(prob)
                    for emotion, prob in zip(self.label_encoder.classes_, probs)
                }
                # Sort by probability
                prob_dict = dict(
                    sorted(prob_dict.items(), key=lambda x: x[1], reverse=True))

                probabilities.append({
                    'predicted_emotion': y_pred_labels[i],
                    'confidence': float(max(probs)),
                    'all_probabilities': prob_dict
                })

            results['probabilities'] = probabilities

        return results

    def predict_single(self, feature_vector):
        """
        Predict emotion for a single call recording

        Args:
            feature_vector (np.array or list): Single feature vector

        Returns:
            dict: Prediction result; on failure, a neutral fallback with an
                'error' key instead of raising.
        """
        try:
            if isinstance(feature_vector, list):
                feature_vector = np.array(feature_vector)

            # Ensure it's a 1D array before reshaping
            if len(feature_vector.shape) > 1:
                feature_vector = feature_vector.flatten()

            result = self.predict(feature_vector.reshape(1, -1))

            prediction = {
                'emotion': result['predictions'][0],
                'confidence': result['probabilities'][0]['confidence'],
                'all_probabilities': result['probabilities'][0]['all_probabilities']
            }
            return prediction

        except Exception as e:
            # Deliberate best-effort: callers get a safe neutral result
            # rather than an exception for a single bad vector.
            print(f"Error in predict_single: {e}")
            return {
                'error': str(e),
                'emotion': 'neutral',
                'confidence': 0.0,
                'all_probabilities': {emotion: 0.0 for emotion in self.EMOTIONS}
            }

    def optimize_hyperparameters(self, X, y, cv=5):
        """
        Optimize classifier hyperparameters using GridSearchCV

        Installs the best estimator as self.model and marks the classifier
        trained. Note: a later call to train() rebuilds the model — pass
        the returned 'best_params' into train() to keep the tuned settings.

        Args:
            X (np.array): Feature matrix
            y (np.array): Labels
            cv (int): Number of cross-validation folds

        Returns:
            dict: Best parameters and scores
        """
        print(f"\n{'='*70}")
        print(f"HYPERPARAMETER OPTIMIZATION")
        print(f"{'='*70}\n")

        # Encode labels and scale features
        y_encoded = self.label_encoder.fit_transform(y)
        X_scaled = self.scaler.fit_transform(X)

        # Define parameter grids
        param_grids = {
            'svm': {
                'kernel': ['rbf', 'linear', 'poly'],
                'C': [0.1, 1, 10, 100],
                'gamma': ['scale', 'auto', 0.001, 0.01]
            },
            'rf': {
                'n_estimators': [50, 100, 200, 300],
                'max_depth': [None, 10, 20, 30],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            },
            'knn': {
                'n_neighbors': [3, 5, 7, 9, 11],
                'weights': ['uniform', 'distance'],
                'metric': ['euclidean', 'manhattan', 'minkowski']
            }
        }

        param_grid = param_grids[self.classifier_type]

        print(f"[Optimization] Searching parameter space...")
        print(f"[Optimization] Parameter grid: {param_grid}\n")

        # Create base classifier
        base_model = self._create_classifier()

        # Grid search (n_jobs=-1: use all cores)
        grid_search = GridSearchCV(
            base_model,
            param_grid,
            cv=cv,
            scoring='accuracy',
            n_jobs=-1,
            verbose=1
        )
        grid_search.fit(X_scaled, y_encoded)

        print(f"\n{'='*70}")
        print(f"OPTIMIZATION RESULTS")
        print(f"{'='*70}")
        print(f"Best Score: {grid_search.best_score_:.4f}")
        print(f"Best Parameters: {grid_search.best_params_}")
        print(f"{'='*70}\n")

        # Update model with best parameters
        self.model = grid_search.best_estimator_
        self.is_trained = True

        return {
            'best_score': grid_search.best_score_,
            'best_params': grid_search.best_params_,
            'cv_results': grid_search.cv_results_
        }

    def save_model(self, filepath='emotion_classifier_model.pkl'):
        """
        Save trained model to file

        Args:
            filepath (str): Path to save model

        Returns:
            str: The filepath written.

        Raises:
            RuntimeError: If called before training.
        """
        if not self.is_trained:
            raise RuntimeError("Cannot save untrained model")

        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'label_encoder': self.label_encoder,
            'classifier_type': self.classifier_type,
            'training_history': self.training_history,
            'is_trained': self.is_trained
        }

        with open(filepath, 'wb') as f:
            pickle.dump(model_data, f)

        print(f"✓ Model saved to: {filepath}")
        return filepath

    def load_model(self, filepath):
        """
        Load trained model from file

        SECURITY NOTE: pickle.load executes arbitrary code from the file —
        only load model files from trusted sources.

        Args:
            filepath (str): Path to saved model

        Raises:
            FileNotFoundError: If the model file does not exist.
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Model file not found: {filepath}")

        with open(filepath, 'rb') as f:
            model_data = pickle.load(f)

        self.model = model_data['model']
        self.scaler = model_data['scaler']
        self.label_encoder = model_data['label_encoder']
        self.classifier_type = model_data['classifier_type']
        self.training_history = model_data['training_history']
        self.is_trained = model_data['is_trained']

        print(f"✓ Model loaded from: {filepath}")
        print(f"  Classifier: {self.CLASSIFIERS[self.classifier_type]}")
        print(f"  Trained at: {self.training_history['trained_at']}")
        print(
            f"  Training accuracy: {self.training_history['test_accuracy']:.4f}")

    def visualize_results(self, results, output_dir='visualizations'):
        """
        Create visualizations of training results

        Args:
            results (dict): Training results from train()
            output_dir (str): Directory to save plots

        Returns:
            dict: Paths to saved visualizations (empty if matplotlib missing)
        """
        if not MATPLOTLIB_AVAILABLE:
            print("Matplotlib not available. Skipping visualization.")
            return {}

        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        saved_plots = {}

        # 1. Confusion Matrix
        plt.figure(figsize=(10, 8))
        sns.heatmap(
            results['confusion_matrix'],
            annot=True,
            fmt='d',
            cmap='Blues',
            xticklabels=self.label_encoder.classes_,
            yticklabels=self.label_encoder.classes_
        )
        plt.title(
            f'Confusion Matrix - {self.CLASSIFIERS[self.classifier_type]}')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.tight_layout()
        cm_path = output_path / f'{self.classifier_type}_confusion_matrix.png'
        plt.savefig(cm_path, dpi=150)
        plt.close()
        saved_plots['confusion_matrix'] = str(cm_path)

        # 2. Classification Report Heatmap
        report_dict = results['classification_report']
        report_data = []
        # Skip the aggregate rows; keep only per-emotion rows.
        emotions = [e for e in report_dict.keys() if e not in [
            'accuracy', 'macro avg', 'weighted avg']]

        for emotion in emotions:
            report_data.append([
                report_dict[emotion]['precision'],
                report_dict[emotion]['recall'],
                report_dict[emotion]['f1-score']
            ])

        plt.figure(figsize=(8, 6))
        sns.heatmap(
            report_data,
            annot=True,
            fmt='.3f',
            cmap='YlGnBu',
            xticklabels=['Precision', 'Recall', 'F1-Score'],
            yticklabels=emotions
        )
        plt.title(
            f'Classification Metrics - {self.CLASSIFIERS[self.classifier_type]}')
        plt.tight_layout()
        metrics_path = output_path / f'{self.classifier_type}_metrics.png'
        plt.savefig(metrics_path, dpi=150)
        plt.close()
        saved_plots['metrics'] = str(metrics_path)

        # 3. Cross-Validation Scores
        plt.figure(figsize=(10, 6))
        cv_scores = results['cv_scores']
        folds = range(1, len(cv_scores) + 1)
        plt.bar(folds, cv_scores, color='skyblue', alpha=0.7)
        plt.axhline(y=cv_scores.mean(), color='r', linestyle='--',
                    label=f'Mean: {cv_scores.mean():.4f}')
        plt.xlabel('Fold')
        plt.ylabel('Accuracy')
        plt.title(
            f'Cross-Validation Scores - {self.CLASSIFIERS[self.classifier_type]}')
        plt.legend()
        plt.grid(axis='y', alpha=0.3)
        plt.tight_layout()
        cv_path = output_path / f'{self.classifier_type}_cv_scores.png'
        plt.savefig(cv_path, dpi=150)
        plt.close()
        saved_plots['cv_scores'] = str(cv_path)

        # 4. Feature Importance (Random Forest only)
        if self.classifier_type == 'rf' and hasattr(self.model, 'feature_importances_'):
            importances = self.model.feature_importances_
            indices = np.argsort(importances)[::-1][:20]  # Top 20 features

            plt.figure(figsize=(12, 6))
            plt.bar(range(len(indices)), importances[indices])
            plt.title('Top 20 Feature Importances - Random Forest')
            plt.xlabel('Feature Index')
            plt.ylabel('Importance')
            plt.tight_layout()
            fi_path = output_path / 'rf_feature_importance.png'
            plt.savefig(fi_path, dpi=150)
            plt.close()
            saved_plots['feature_importance'] = str(fi_path)

        print(f"✓ Visualizations saved to: {output_dir}/\n")
        return saved_plots

    def compare_classifiers(self, X, y, test_size=0.2):
        """
        Compare all three classifiers on the same dataset

        Args:
            X (np.array): Feature matrix
            y (np.array): Labels
            test_size (float): Test set proportion

        Returns:
            dict: Comparison results keyed by classifier short name
        """
        print(f"\n{'='*70}")
        print(f"COMPARING ALL CLASSIFIERS")
        print(f"{'='*70}\n")

        results = {}

        for clf_type in ['svm', 'rf', 'knn']:
            print(f"\nTraining {self.CLASSIFIERS[clf_type]}...")
            print(f"{'-'*70}")

            # Fresh instance per type so scalers/encoders don't leak between runs.
            classifier = EmotionClassifier(
                classifier_type=clf_type, random_state=self.random_state)
            clf_results = classifier.train(X, y, test_size=test_size)

            results[clf_type] = {
                'accuracy': clf_results['accuracy'],
                'precision': clf_results['precision'],
                'recall': clf_results['recall'],
                'f1_score': clf_results['f1_score'],
                'cv_mean': clf_results['cv_mean'],
                'cv_std': clf_results['cv_std']
            }

        # Display comparison
        print(f"\n{'='*70}")
        print(f"COMPARISON RESULTS")
        print(f"{'='*70}\n")

        comparison_df = pd.DataFrame(results).T
        comparison_df.index = [self.CLASSIFIERS[idx]
                               for idx in comparison_df.index]
        print(comparison_df.to_string())
        print(f"\n{'='*70}\n")

        # Find best classifier
        best_clf = max(results.items(), key=lambda x: x[1]['accuracy'])
        print(f"Best Classifier: {self.CLASSIFIERS[best_clf[0]]}")
        print(f"Accuracy: {best_clf[1]['accuracy']:.4f}\n")

        return results


def main():
    """Main function for command-line usage"""
    import argparse

    parser = argparse.ArgumentParser(
        description='CSR Call Recording - ML Emotion Classifier',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Train SVM classifier
  python ml_classifier.py --features features.npy --labels labels.npy --classifier svm

  # Train and compare all classifiers
  python ml_classifier.py --features features.npy --labels labels.npy --compare

  # Train with optimization
  python ml_classifier.py --features features.npy --labels labels.npy --classifier rf --optimize

  # Predict emotions
  python ml_classifier.py --predict feature_vector.npy --model saved_model.pkl
        """
    )

    parser.add_argument(
        '--features',
        help='Path to feature matrix (.npy file)'
    )
    parser.add_argument(
        '--labels',
        help='Path to labels (.npy file)'
    )
    parser.add_argument(
        '--classifier',
        choices=['svm', 'rf', 'knn'],
        default='svm',
        help='Classifier type (default: svm)'
    )
    parser.add_argument(
        '--test-size',
        type=float,
        default=0.2,
        help='Test set proportion (default: 0.2)'
    )
    parser.add_argument(
        '--optimize',
        action='store_true',
        help='Optimize hyperparameters'
    )
    parser.add_argument(
        '--compare',
        action='store_true',
        help='Compare all classifiers'
    )
    parser.add_argument(
        '--save-model',
        help='Path to save trained model'
    )
    parser.add_argument(
        '--predict',
        help='Path to feature vector for prediction'
    )
    parser.add_argument(
        '--model',
        help='Path to saved model for prediction'
    )
    parser.add_argument(
        '--visualize',
        action='store_true',
        help='Create visualization plots'
    )

    args = parser.parse_args()

    # Prediction mode
    if args.predict:
        if not args.model:
            print("Error: --model required for prediction")
            sys.exit(1)

        classifier = EmotionClassifier()
        classifier.load_model(args.model)

        features = np.load(args.predict)
        result = classifier.predict(features)

        print(f"\n{'='*70}")
        print(f"PREDICTION RESULTS")
        print(f"{'='*70}")
        for i, pred in enumerate(result['probabilities']):
            print(f"\nSample {i+1}:")
            print(f"  Emotion: {pred['predicted_emotion']}")
            print(f"  Confidence: {pred['confidence']:.4f}")
            print(f"  All probabilities:")
            for emotion, prob in pred['all_probabilities'].items():
                print(f"    {emotion}: {prob:.4f}")
        print(f"{'='*70}\n")
        sys.exit(0)

    # Training mode
    if not args.features or not args.labels:
        print("Error: --features and --labels required for training")
        sys.exit(1)

    # Load data
    X = np.load(args.features)
    y = np.load(args.labels)

    print(f"Loaded data: {X.shape[0]} samples, {X.shape[1]} features")

    # Compare mode
    if args.compare:
        classifier = EmotionClassifier()
        results = classifier.compare_classifiers(
            X, y, test_size=args.test_size)
        sys.exit(0)

    # Single classifier mode
    classifier = EmotionClassifier(classifier_type=args.classifier)

    if args.optimize:
        # BUGFIX: the original discarded the grid-search result — train()
        # rebuilt the model with default parameters right after
        # optimize_hyperparameters() installed the tuned estimator.
        # Re-train with the best parameters found instead.
        opt_result = classifier.optimize_hyperparameters(X, y)
        results = classifier.train(
            X, y, test_size=args.test_size, **opt_result['best_params'])
    else:
        results = classifier.train(X, y, test_size=args.test_size)

    # Visualize
    if args.visualize:
        classifier.visualize_results(results)

    # Save model
    if args.save_model:
        classifier.save_model(args.save_model)
    else:
        # Default save path
        default_path = f'{args.classifier}_emotion_classifier.pkl'
        classifier.save_model(default_path)

    print(f"\n✓ Training complete!")


if __name__ == '__main__':
    main()