"""Canine heart-sound (murmur) classification pipeline.

Loads .wav recordings from ``dataset/``, band-pass filters them to the
clinical heart-sound range, extracts interpretable statistical features,
trains three classical classifiers on a 70/30 split, and writes trained
models to ``weights/`` and evaluation plots to ``metrics/``.
"""

import os
import glob

import librosa
import numpy as np
import pandas as pd
import scipy.signal
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    roc_curve,
    auc,
    cohen_kappa_score,
)
import joblib

# NumPy 2.0 compatibility shims for librosa: np.trapz and np.in1d were
# removed in NumPy 2.0 but older librosa releases still reference them.
if not hasattr(np, 'trapz'):
    np.trapz = np.trapezoid
if not hasattr(np, 'in1d'):
    def in1d_patch(ar1, ar2, assume_unique=False, invert=False):
        return np.isin(ar1, ar2, assume_unique=assume_unique, invert=invert)
    np.in1d = in1d_patch

# Config
DATASET_DIR = "dataset"
TARGET_SR = 16000          # resample everything to 16 kHz mono
AUDIO_LENGTH_SEC = 5       # fixed-length clips: truncate or zero-pad to 5 s

os.makedirs("weights", exist_ok=True)
os.makedirs("metrics", exist_ok=True)


def apply_clinical_bandpass(y, sr):
    """Zero-phase 4th-order Butterworth band-pass over 25-400 Hz.

    Heart sounds and murmurs live in roughly this band; the filter removes
    low-frequency motion rumble and high-frequency noise.

    Args:
        y: 1-D audio signal.
        sr: Sample rate of ``y`` in Hz.

    Returns:
        Filtered signal, same length as ``y`` (filtfilt is zero-phase).
    """
    nyq = 0.5 * sr
    low = 25.0 / nyq
    high = 400.0 / nyq
    b, a = scipy.signal.butter(4, [low, high], btype='band')
    return scipy.signal.filtfilt(b, a, y)


def extract_statistical_features(y, sr):
    """Extracts 1D interpretable statistical biomarkers.

    Features: mean/std of 13 MFCCs, spectral-centroid mean, zero-crossing
    rate mean, RMS energy mean, and spectral entropy of the normalized
    STFT power distribution.

    Args:
        y: 1-D audio signal (already filtered / length-normalized).
        sr: Sample rate in Hz.

    Returns:
        dict mapping feature name -> scalar value.
    """
    features = {}
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    for i in range(13):
        features[f'mfcc_{i}_mean'] = np.mean(mfccs[i])
        features[f'mfcc_{i}_std'] = np.std(mfccs[i])
    features['centroid_mean'] = np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))
    features['zcr_mean'] = np.mean(librosa.feature.zero_crossing_rate(y))
    features['rms_mean'] = np.mean(librosa.feature.rms(y=y))
    # Spectral entropy: treat the whole STFT power matrix as one probability
    # distribution; small epsilon guards log2(0).
    prob = np.square(np.abs(librosa.stft(y)))
    prob = prob / np.sum(prob)
    features['entropy'] = -np.sum(prob * np.log2(prob + 1e-10))
    return features


def load_dataset():
    """Load all .wav files from DATASET_DIR and extract features.

    Labels are inferred from the filename: 1 (abnormal) if it contains
    'murmur' or 'abnormal', else 0 (normal). Each clip is normalized,
    band-pass filtered, and truncated/zero-padded to AUDIO_LENGTH_SEC.

    Returns:
        (features DataFrame, labels ndarray), or (None, None) if no file
        could be found or processed.
    """
    print("Scanning dataset directory...")
    files = glob.glob(os.path.join(DATASET_DIR, "*.wav"))
    if not files:
        print("ERROR: No .wav files found in dataset/")
        return None, None

    X_features = []
    y_labels = []
    for f in files:
        try:
            basename = os.path.basename(f).lower()
            label = 1 if 'murmur' in basename or 'abnormal' in basename else 0

            y, sr = librosa.load(f, sr=TARGET_SR, mono=True)
            y = librosa.util.normalize(y)
            y_clean = apply_clinical_bandpass(y, sr)

            # Enforce a fixed clip length so every sample yields the same
            # feature dimensionality.
            target_length = TARGET_SR * AUDIO_LENGTH_SEC
            if len(y_clean) > target_length:
                y_clean = y_clean[:target_length]
            else:
                y_clean = np.pad(y_clean, (0, target_length - len(y_clean)))

            feats = extract_statistical_features(y_clean, sr)
            X_features.append(feats)
            y_labels.append(label)
        except Exception as e:
            # Best-effort: skip unreadable/corrupt recordings but keep going.
            print(f"Error processing {f}: {e}")

    # ROBUSTNESS FIX: if every file failed, return None instead of an empty
    # DataFrame so the caller's `X is None` guard actually fires (previously
    # an empty frame slipped through and crashed train_test_split).
    if not X_features:
        print("ERROR: No recordings could be processed.")
        return None, None

    df = pd.DataFrame(X_features)
    labels = np.array(y_labels)
    print(f"Successfully processed {len(df)} canine recordings.")
    return df, labels


def evaluate_model(y_true, y_pred):
    """Compute accuracy, sensitivity, specificity, and the confusion matrix.

    Args:
        y_true: Ground-truth binary labels (0 = normal, 1 = murmur).
        y_pred: Predicted binary labels.

    Returns:
        (accuracy, sensitivity, specificity, confusion_matrix). Sensitivity
        and specificity fall back to 0.0 when a denominator is empty.
    """
    acc = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    if cm.shape == (2, 2):
        tn, fp, fn, tp = cm.ravel()
        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    else:
        # Handle all one class cases for tiny datasets
        sensitivity = 0.0
        specificity = 0.0
    return acc, sensitivity, specificity, cm


def train_and_evaluate():
    """Train three classifiers, save artifacts, and emit clinical metrics.

    Side effects: writes scaler/feature-column/model pickles to ``weights/``
    and ROC, confusion-matrix, and feature-importance plots to ``metrics/``.
    """
    X, y = load_dataset()
    if X is None:
        return

    # Feature Scaling is critical for SVM and Logistic Regression
    scaler = StandardScaler()
    feature_names = X.columns
    X_scaled = scaler.fit_transform(X)
    X_scaled = pd.DataFrame(X_scaled, columns=feature_names)
    joblib.dump(scaler, "weights/scaler.pkl")
    joblib.dump(list(feature_names), "weights/feature_columns.pkl")

    # Strictly 70/30 split
    # NOTE(review): consider stratify=y for imbalanced murmur datasets —
    # left unstratified here to preserve existing split behavior.
    X_train, X_test, y_train, y_test = train_test_split(
        X_scaled, y, test_size=0.3, random_state=42)
    print(f"\n--- Training on {len(X_train)} samples, Testing on {len(X_test)} samples (70/30 Split) ---")

    models = {
        "Logistic Regression": LogisticRegression(max_iter=1000, random_state=42),
        "Random Forest": RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42),
        "SVM (RBF)": SVC(kernel='rbf', probability=True, random_state=42)
    }

    results = {}
    y_preds_all = {}
    y_proba_all = {}

    for name, model in models.items():
        print(f"\nTraining {name}...")
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)[:, 1]
        y_preds_all[name] = y_pred
        y_proba_all[name] = y_proba

        acc, sens, spec, cm = evaluate_model(y_test, y_pred)
        results[name] = {
            "Accuracy": acc, "Sensitivity": sens, "Specificity": spec, "CM": cm
        }
        print(f"Accuracy: {acc*100:.1f}%")
        print(f"Sensitivity: {sens*100:.1f}%")
        print(f"Specificity: {spec*100:.1f}%")

        filename = name.lower().replace(" ", "_").replace("(", "").replace(")", "")
        # BUG FIX: the original f-string had no placeholder — every model was
        # dumped to the same literal path "weights/canine_(unknown).pkl",
        # each overwriting the last. Use the sanitized model name.
        joblib.dump(model, f"weights/canine_{filename}.pkl")

    # 1. Output ROC Curve Plot
    plt.figure(figsize=(8, 6))
    for name, y_proba in y_proba_all.items():
        fpr, tpr, _ = roc_curve(y_test, y_proba)
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, lw=2, label=f'{name} (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate (1 - Specificity)')
    plt.ylabel('True Positive Rate (Sensitivity)')
    plt.title('Receiver Operating Characteristic (ROC) Comparison')
    plt.legend(loc="lower right")
    plt.grid(True, alpha=0.3)
    plt.savefig('metrics/roc_curve.png')
    plt.close()

    # 2. Confusion Matrices Plot
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))
    for ax, (name, res) in zip(axes, results.items()):
        sns.heatmap(res["CM"], annot=True, fmt='d', cmap='Blues', ax=ax, cbar=False)
        ax.set_title(f'{name}\nAcc: {res["Accuracy"]:.2f}')
        ax.set_xlabel('Predicted Label')
        ax.set_ylabel('True Label')
        ax.set_xticklabels(['Normal (0)', 'Murmur (1)'])
        ax.set_yticklabels(['Normal (0)', 'Murmur (1)'])
    plt.tight_layout()
    plt.savefig('metrics/confusion_matrix.png')
    plt.close()

    # 3. Random Forest Feature Importance Plot
    rf_model = models["Random Forest"]
    importances = rf_model.feature_importances_
    indices = np.argsort(importances)[::-1][:15]  # Top 15 features

    plt.figure(figsize=(10, 6))
    plt.title("Top 15 Feature Importances (Random Forest)")
    plt.bar(range(15), importances[indices], align="center",
            color='skyblue', edgecolor='black')
    plt.xticks(range(15), [feature_names[i] for i in indices],
               rotation=45, ha='right')
    plt.xlim([-1, 15])
    plt.tight_layout()
    plt.savefig('metrics/feature_importance.png')
    plt.close()

    # 4. Model Agreement (Kappa between RF and SVM)
    kappa = cohen_kappa_score(y_preds_all["Random Forest"], y_preds_all["SVM (RBF)"])
    print(f"\n--- Model Agreement ---")
    print(f"Cohen's Kappa (Random Forest vs SVM): {kappa:.3f}")

    print("\nTraining Pipeline Complete.")
    print("Interpretable Models saved to weights/")
    print("Clinical visual metrics saved to metrics/")


if __name__ == "__main__":
    train_and_evaluate()