import os import numpy as np import librosa from sklearn.model_selection import train_test_split from sklearn.preprocessing import LabelEncoder, StandardScaler from sklearn.ensemble import RandomForestClassifier from sklearn.linear_model import LogisticRegression from sklearn.svm import SVC from sklearn.neural_network import MLPClassifier from sklearn.metrics import classification_report from sklearn.utils import class_weight import xgboost as xgb import joblib from signal_processing import SignalProcessor class SoundClassifier: def __init__(self, data_dir, model_type='rf', sr=22050, duration=20, include_benchmark=True, use_class_weights=True, augmented_data_dir=None, use_enhanced_features=True): self.data_dir = data_dir self.sr = sr self.duration = duration self.model = None self.le = LabelEncoder() self.scaler = StandardScaler() self.model_type = model_type self.include_benchmark = include_benchmark self.use_class_weights = use_class_weights self.augmented_data_dir = augmented_data_dir self.use_enhanced_features = use_enhanced_features self.signal_processor = SignalProcessor(sr=sr) def extract_features(self, file_path): # Load audio file y, _ = librosa.load(file_path, sr=self.sr, duration=self.duration) # Pad or truncate to fixed length if len(y) < self.sr * self.duration: y = np.pad(y, (0, self.sr * self.duration - len(y))) else: y = y[:self.sr * self.duration] # Check if this is a valve lash file and use enhanced features if enabled if self.use_enhanced_features and ('valve_lash' in file_path or 'enhanced_valve_lash' in file_path): # Apply valve lash specific processing y = self.signal_processor.bandpass_filter(y, low_freq=800, high_freq=5000) y = self.signal_processor.enhance_transients(y, threshold=0.05, boost_factor=2.5) # Use specialized valve lash feature extraction return self.signal_processor.extract_valve_lash_features(y) # Standard feature extraction for other audio types # Extract features mfccs = librosa.feature.mfcc(y=y, sr=self.sr, n_mfcc=13) spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=self.sr) spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=self.sr) # Add more features for better classification chroma = librosa.feature.chroma_stft(y=y, sr=self.sr) zero_crossing_rate = librosa.feature.zero_crossing_rate(y) # Compute statistics - ensure all arrays are 1D features = np.concatenate([ mfccs.mean(axis=1), mfccs.std(axis=1), spectral_centroid.mean(axis=1), spectral_rolloff.mean(axis=1), chroma.mean(axis=1), zero_crossing_rate.mean(axis=1).reshape(-1) # Ensure 1D array ]) return features def prepare_data(self): X = [] y = [] # Determine which data directory to use data_dirs = [self.data_dir] if self.augmented_data_dir and os.path.exists(self.augmented_data_dir): data_dirs = [self.augmented_data_dir] # Use only augmented data if available print(f"Using augmented data from {self.augmented_data_dir}") # Check if enhanced valve lash directory exists and add it enhanced_valve_lash_dir = os.path.join(os.path.dirname(self.data_dir), 'enhanced_valve_lash') if self.use_enhanced_features and os.path.exists(enhanced_valve_lash_dir): print(f"Including enhanced valve lash data from {enhanced_valve_lash_dir}") # Add enhanced valve lash directory to data dirs if using original data if self.data_dir in data_dirs: data_dirs.append(enhanced_valve_lash_dir) for data_dir in data_dirs: # Iterate through each issue folder for issue in os.listdir(data_dir): issue_path = os.path.join(data_dir, issue) if os.path.isdir(issue_path): # Skip benchmark folder if not included if issue == 'benchmark' and not self.include_benchmark: continue # Process each audio file in the folder for audio_file in os.listdir(issue_path): if audio_file.endswith('.wav'): file_path = os.path.join(issue_path, audio_file) features = self.extract_features(file_path) X.append(features) # Label benchmark data as 'normal' and other folders as their respective issues if issue == 'benchmark': y.append('normal') # Handle enhanced valve lash directory elif data_dir == enhanced_valve_lash_dir: y.append('valve_lash') else: y.append(issue) print(f"Total samples: {len(X)}") # Count samples per class class_counts = {} for label in y: if label not in class_counts: class_counts[label] = 0 class_counts[label] += 1 print(f"Class distribution: {class_counts}") # Check feature dimensions feature_lengths = [len(x) for x in X] if len(set(feature_lengths)) > 1: print(f"Warning: Inconsistent feature lengths detected: {set(feature_lengths)}") # Find the most common feature length from collections import Counter most_common_length = Counter(feature_lengths).most_common(1)[0][0] print(f"Standardizing to length {most_common_length}") # Standardize feature lengths X_standardized = [] y_standardized = [] for i, x in enumerate(X): if len(x) == most_common_length: X_standardized.append(x) y_standardized.append(y[i]) else: print(f"Skipping sample with length {len(x)}") X = X_standardized y = y_standardized print(f"After standardization: {len(X)} samples") X = np.array(X) y = np.array(y) # Encode labels y_encoded = self.le.fit_transform(y) return X, y_encoded, y def train(self): # Prepare data X, y_encoded, y_original = self.prepare_data() # Split data X_train, X_test, y_train, y_test = train_test_split( X, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded ) # Scale features X_train_scaled = self.scaler.fit_transform(X_train) X_test_scaled = self.scaler.transform(X_test) # Calculate class weights if enabled class_weights = None if self.use_class_weights: class_weights = class_weight.compute_class_weight( 'balanced', classes=np.unique(y_train), y=y_train ) class_weights = dict(zip(np.unique(y_train), class_weights)) print(f"Using class weights: {class_weights}") # Train model based on model_type if self.model_type == 'rf': self.model = RandomForestClassifier( n_estimators=100, random_state=42, class_weight=class_weights if self.use_class_weights else None, n_jobs=-1 # Use all available cores ) elif self.model_type == 'lr': self.model = LogisticRegression( random_state=42, max_iter=1000, class_weight=class_weights if self.use_class_weights else None, multi_class='multinomial', solver='lbfgs' ) elif self.model_type == 'svm': self.model = SVC( kernel='rbf', random_state=42, class_weight=class_weights if self.use_class_weights else None, probability=True # Enable probability estimates ) elif self.model_type == 'nn': self.model = MLPClassifier( hidden_layer_sizes=(100, 50), max_iter=1000, random_state=42, early_stopping=True, # Enable early stopping validation_fraction=0.1 # Use 10% of training data for validation ) elif self.model_type == 'xgb': # Prepare sample weights if class weights are enabled sample_weights = None if self.use_class_weights: sample_weights = np.ones(len(y_train)) for i, y in enumerate(y_train): sample_weights[i] = class_weights.get(y, 1.0) # Create XGBoost model self.model = xgb.XGBClassifier( n_estimators=100, learning_rate=0.1, max_depth=5, random_state=42, use_label_encoder=False, eval_metric='mlogloss', n_jobs=-1 # Use all available cores ) # Fit with sample weights if available if sample_weights is not None: self.model.fit(X_train_scaled, y_train, sample_weight=sample_weights) else: self.model.fit(X_train_scaled, y_train) # Skip the general fit below since we've already fit the model fitted = True else: raise ValueError("Invalid model type. Choose 'rf', 'lr', 'svm', 'nn', or 'xgb'.") # Fit the model if not already fitted if not locals().get('fitted', False): self.model.fit(X_train_scaled, y_train) # Evaluate y_pred = self.model.predict(X_test_scaled) print(f"\nModel Performance ({self.model_type}):") print(classification_report(y_test, y_pred, labels=np.unique(y_test), target_names=self.le.classes_[np.unique(y_test)])) return self.model def predict(self, audio_file): # Extract features from new audio features = self.extract_features(audio_file) # Scale features features_scaled = self.scaler.transform([features]) # Make prediction prediction = self.model.predict(features_scaled)[0] predicted_label = self.le.inverse_transform([prediction])[0] # Get prediction probabilities if available confidence = None if hasattr(self.model, 'predict_proba'): proba = self.model.predict_proba(features_scaled)[0] confidence = proba[prediction] return predicted_label, confidence def save_model(self, model_path='sound_classifier_model.joblib'): """Save the trained model, label encoder, and scaler""" if self.model is None: raise ValueError("Model hasn't been trained yet!") model_data = { 'model': self.model, 'label_encoder': self.le, 'scaler': self.scaler, 'model_type': self.model_type, 'include_benchmark': self.include_benchmark, 'use_class_weights': self.use_class_weights, 'use_enhanced_features': self.use_enhanced_features } joblib.dump(model_data, model_path) @classmethod def load_model(cls, model_path='sound_classifier_model.joblib'): """Load a trained model""" model_data = joblib.load(model_path) # Create instance with appropriate parameters classifier = cls( data_dir=None, model_type=model_data.get('model_type', 'rf'), include_benchmark=model_data.get('include_benchmark', True), use_class_weights=model_data.get('use_class_weights', True), use_enhanced_features=model_data.get('use_enhanced_features', True) ) classifier.model = model_data['model'] classifier.le = model_data['label_encoder'] classifier.scaler = model_data['scaler'] return classifier