# Canine heart-sound murmur classification: training and evaluation script.
| import os | |
| import glob | |
| import librosa | |
| import numpy as np | |
| import pandas as pd | |
| import scipy.signal | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.svm import SVC | |
| from sklearn.metrics import accuracy_score, confusion_matrix, roc_curve, auc, cohen_kappa_score | |
| import joblib | |
# NumPy 2.0 removed `trapz` and `in1d` (renamed to `trapezoid` / `isin`);
# librosa may still reference the old names, so restore them as thin
# aliases when missing.
if not hasattr(np, 'trapz'):
    np.trapz = np.trapezoid

if not hasattr(np, 'in1d'):
    def _in1d_compat(ar1, ar2, assume_unique=False, invert=False):
        """Backwards-compatible `np.in1d` forwarding to `np.isin`."""
        return np.isin(ar1, ar2, assume_unique=assume_unique, invert=invert)
    np.in1d = _in1d_compat
# --- Pipeline configuration ---
DATASET_DIR = "dataset"      # folder scanned for .wav recordings
TARGET_SR = 16000            # resample rate in Hz
AUDIO_LENGTH_SEC = 5         # fixed clip length after pad/truncate

# Make sure the output folders for saved models and plots exist up front.
for _out_dir in ("weights", "metrics"):
    os.makedirs(_out_dir, exist_ok=True)
def apply_clinical_bandpass(y, sr):
    """Zero-phase 4th-order Butterworth band-pass over 25-400 Hz.

    The 25-400 Hz band keeps the clinically relevant range of heart
    sounds and murmurs while rejecting baseline drift and high-frequency
    noise.  `filtfilt` applies the filter forward and backward so the
    output has no phase distortion.

    Parameters
    ----------
    y : np.ndarray
        Mono audio signal.
    sr : int
        Sample rate of `y` in Hz (must exceed 800 so 400 Hz < Nyquist).

    Returns
    -------
    np.ndarray
        Filtered signal, same length as `y`.
    """
    nyquist = sr / 2.0
    band = (25.0 / nyquist, 400.0 / nyquist)
    b, a = scipy.signal.butter(4, band, btype='band')
    return scipy.signal.filtfilt(b, a, y)
def extract_statistical_features(y, sr):
    """Extract 1D interpretable statistical biomarkers from an audio clip.

    Parameters
    ----------
    y : np.ndarray
        Mono audio signal.
    sr : int
        Sample rate of `y` in Hz.

    Returns
    -------
    dict
        Feature name -> scalar: per-coefficient MFCC mean/std (13 coeffs),
        spectral-centroid mean, zero-crossing-rate mean, RMS mean, and
        spectral entropy.
    """
    features = {}
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    for i in range(13):
        features[f'mfcc_{i}_mean'] = np.mean(mfccs[i])
        features[f'mfcc_{i}_std'] = np.std(mfccs[i])
    features['centroid_mean'] = np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))
    features['zcr_mean'] = np.mean(librosa.feature.zero_crossing_rate(y))
    features['rms_mean'] = np.mean(librosa.feature.rms(y=y))
    # Spectral entropy over the normalized power spectrogram.
    # BUG FIX: the original divided by the total power unconditionally,
    # producing NaN features for an all-zero (silent) clip that would
    # silently poison downstream training.  Define entropy of a silent
    # clip as 0.0 instead.
    power = np.square(np.abs(librosa.stft(y)))
    total = np.sum(power)
    if total > 0:
        prob = power / total
        features['entropy'] = -np.sum(prob * np.log2(prob + 1e-10))
    else:
        features['entropy'] = 0.0
    return features
def load_dataset():
    """Load, label, filter and featurize every .wav file under DATASET_DIR.

    Labels are inferred from the filename: any basename containing
    'murmur' or 'abnormal' is positive (1); everything else is 0.
    Each recording is resampled to TARGET_SR, peak-normalized,
    band-pass filtered, and padded/truncated to AUDIO_LENGTH_SEC.

    Returns
    -------
    (pd.DataFrame, np.ndarray) or (None, None)
        Feature table and label vector, or (None, None) when no usable
        recording was found.
    """
    print("Scanning dataset directory...")
    files = glob.glob(os.path.join(DATASET_DIR, "*.wav"))
    if not files:
        print("ERROR: No .wav files found in dataset/")
        return None, None
    X_features = []
    y_labels = []
    target_length = TARGET_SR * AUDIO_LENGTH_SEC
    for f in files:
        try:
            basename = os.path.basename(f).lower()
            label = 1 if 'murmur' in basename or 'abnormal' in basename else 0
            y, sr = librosa.load(f, sr=TARGET_SR, mono=True)
            y = librosa.util.normalize(y)
            y_clean = apply_clinical_bandpass(y, sr)
            # Pad or truncate to a fixed clip length so every recording
            # yields a comparable feature vector.
            if len(y_clean) > target_length:
                y_clean = y_clean[:target_length]
            else:
                y_clean = np.pad(y_clean, (0, target_length - len(y_clean)))
            feats = extract_statistical_features(y_clean, sr)
            X_features.append(feats)
            y_labels.append(label)
        except Exception as e:
            # Best-effort: skip unreadable/corrupt files but keep going.
            print(f"Error processing {f}: {e}")
    # BUG FIX: if every file failed, the original returned an empty
    # DataFrame, which crashed later during scaling/splitting with a
    # far less helpful error.  Signal failure explicitly instead.
    if not X_features:
        print("ERROR: No recordings could be processed.")
        return None, None
    df = pd.DataFrame(X_features)
    labels = np.array(y_labels)
    print(f"Successfully processed {len(df)} canine recordings.")
    return df, labels
def evaluate_model(y_true, y_pred):
    """Compute accuracy, sensitivity and specificity for binary labels.

    The confusion matrix is pinned to label order [0, 1], so the layout
    is always [[tn, fp], [fn, tp]].  Sensitivity is recall on the
    positive (murmur) class; specificity is recall on the negative
    (normal) class.  Undefined rates (no positives / no negatives in
    `y_true`) are reported as 0.0.

    Returns
    -------
    tuple
        (accuracy, sensitivity, specificity, confusion_matrix)
    """
    acc = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    sensitivity = 0.0
    specificity = 0.0
    if cm.shape == (2, 2):
        tn, fp, fn, tp = cm.ravel()
        positives = tp + fn
        negatives = tn + fp
        if positives > 0:
            sensitivity = tp / positives
        if negatives > 0:
            specificity = tn / negatives
    return acc, sensitivity, specificity, cm
def train_and_evaluate():
    """Run the full pipeline: load features, scale them, train three
    classifiers on a 70/30 split, persist models to weights/ and write
    clinical metric plots to metrics/.
    """
    X, y = load_dataset()
    if X is None:
        return
    # Feature scaling is critical for SVM and Logistic Regression.
    scaler = StandardScaler()
    feature_names = X.columns
    X_scaled = scaler.fit_transform(X)
    X_scaled = pd.DataFrame(X_scaled, columns=feature_names)
    # Persist preprocessing artifacts so inference can reproduce the
    # exact scaling and column order used at training time.
    joblib.dump(scaler, "weights/scaler.pkl")
    joblib.dump(list(feature_names), "weights/feature_columns.pkl")
    # Strictly 70/30 split.
    # NOTE(review): stratify=y would preserve class balance in the test
    # fold; left unchanged so the split matches earlier runs.
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.3, random_state=42)
    print(f"\n--- Training on {len(X_train)} samples, Testing on {len(X_test)} samples (70/30 Split) ---")
    models = {
        "Logistic Regression": LogisticRegression(max_iter=1000, random_state=42),
        "Random Forest": RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42),
        "SVM (RBF)": SVC(kernel='rbf', probability=True, random_state=42)
    }
    results = {}
    y_preds_all = {}
    y_proba_all = {}
    for name, model in models.items():
        print(f"\nTraining {name}...")
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)[:, 1]
        y_preds_all[name] = y_pred
        y_proba_all[name] = y_proba
        acc, sens, spec, cm = evaluate_model(y_test, y_pred)
        results[name] = {
            "Accuracy": acc,
            "Sensitivity": sens,
            "Specificity": spec,
            "CM": cm
        }
        print(f"Accuracy: {acc*100:.1f}%")
        print(f"Sensitivity: {sens*100:.1f}%")
        print(f"Specificity: {spec*100:.1f}%")
        # BUG FIX: the original dump path was the literal string
        # "weights/canine_(unknown).pkl" (the f-string had no
        # placeholder and `filename` was never used), so all three
        # models overwrote one file.
        filename = name.lower().replace(" ", "_").replace("(", "").replace(")", "")
        joblib.dump(model, f"weights/canine_{filename}.pkl")
    # 1. ROC curve comparison across all models.
    plt.figure(figsize=(8, 6))
    for name, y_proba in y_proba_all.items():
        fpr, tpr, _ = roc_curve(y_test, y_proba)
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, lw=2, label=f'{name} (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate (1 - Specificity)')
    plt.ylabel('True Positive Rate (Sensitivity)')
    plt.title('Receiver Operating Characteristic (ROC) Comparison')
    plt.legend(loc="lower right")
    plt.grid(True, alpha=0.3)
    plt.savefig('metrics/roc_curve.png')
    plt.close()
    # 2. Confusion matrices, one panel per model.
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))
    for ax, (name, res) in zip(axes, results.items()):
        sns.heatmap(res["CM"], annot=True, fmt='d', cmap='Blues', ax=ax, cbar=False)
        ax.set_title(f'{name}\nAcc: {res["Accuracy"]:.2f}')
        ax.set_xlabel('Predicted Label')
        ax.set_ylabel('True Label')
        ax.set_xticklabels(['Normal (0)', 'Murmur (1)'])
        ax.set_yticklabels(['Normal (0)', 'Murmur (1)'])
    plt.tight_layout()
    plt.savefig('metrics/confusion_matrix.png')
    plt.close()
    # 3. Random Forest feature importances (top 15) for interpretability.
    rf_model = models["Random Forest"]
    importances = rf_model.feature_importances_
    indices = np.argsort(importances)[::-1][:15]
    plt.figure(figsize=(10, 6))
    plt.title("Top 15 Feature Importances (Random Forest)")
    plt.bar(range(15), importances[indices], align="center", color='skyblue', edgecolor='black')
    plt.xticks(range(15), [feature_names[i] for i in indices], rotation=45, ha='right')
    plt.xlim([-1, 15])
    plt.tight_layout()
    plt.savefig('metrics/feature_importance.png')
    plt.close()
    # 4. Inter-model agreement (Cohen's kappa, RF vs SVM predictions).
    kappa = cohen_kappa_score(y_preds_all["Random Forest"], y_preds_all["SVM (RBF)"])
    print(f"\n--- Model Agreement ---")
    print(f"Cohen's Kappa (Random Forest vs SVM): {kappa:.3f}")
    print("\nTraining Pipeline Complete.")
    print("Interpretable Models saved to weights/")
    print("Clinical visual metrics saved to metrics/")
if __name__ == "__main__":
    # Entry point: run the full training/evaluation pipeline.
    train_and_evaluate()