Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import gc | |
| from joblib import Parallel, delayed | |
| import joblib | |
| import mlflow | |
| import mlflow.keras | |
| import numpy as np | |
| import pandas as pd | |
| import librosa | |
| import librosa.display | |
| import optuna | |
| from tqdm import tqdm | |
| import matplotlib.pyplot as plt | |
| from sklearn.metrics import roc_auc_score, roc_curve, confusion_matrix, classification_report | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import LabelEncoder | |
| from keras.models import Sequential | |
| from keras.utils import to_categorical, normalize | |
| from keras.layers import Conv2D, Dense, MaxPooling2D, Dropout, BatchNormalization, GlobalAveragePooling2D | |
| from keras.layers import Conv1D, MaxPooling1D,GlobalAveragePooling1D | |
| from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping | |
| from tensorflow.keras.preprocessing.image import ImageDataGenerator | |
| from imblearn.over_sampling import SMOTE | |
| from scipy.signal import butter, sosfilt | |
| import argparse | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| data_logger = logging.getLogger("data_loading") | |
| processing_logger = logging.getLogger("data_processing") | |
| model_logger = logging.getLogger("model_training") | |
| def load_data(diagnosis_path='/kaggle/input/respiratory-sound-database/Respiratory_Sound_Database/Respiratory_Sound_Database/patient_diagnosis.csv', | |
| demographic_path='/kaggle/input/respiratory-sound-database/demographic_info.txt'): | |
| """Load patient diagnosis and demographic data.""" | |
| data_logger.info("Loading patient diagnosis and demographic data.") | |
| # Load diagnosis data | |
| diagnosis_df = pd.read_csv(diagnosis_path, | |
| names=['Patient number', 'Diagnosis']) | |
| # Load demographic data | |
| patient_df = pd.read_csv(demographic_path, | |
| names=['Patient number', 'Age', 'Sex', 'Adult BMI (kg/m2)', 'Child Weight (kg)', 'Child Height (cm)'], | |
| delimiter=' ') | |
| data_logger.info("Data successfully loaded.") | |
| # Merge and return | |
| return pd.merge(left=patient_df, right=diagnosis_df, how='left') | |
| def process_audio_metadata(folder_path): | |
| """Extract audio metadata from filenames.""" | |
| processing_logger.info("Extracting audio metadata from filenames.") | |
| data = [] | |
| for filename in os.listdir(folder_path): | |
| if filename.endswith('.txt'): | |
| parts = filename.split('_') | |
| data.append({ | |
| 'Patient number': int(parts[0]), | |
| 'Recording index': parts[1], | |
| 'Chest location': parts[2], | |
| 'Acquisition mode': parts[3], | |
| 'Recording equipment': parts[4].split('.')[0] | |
| }) | |
| processing_logger.info("Audio metadata extraction complete.") | |
| return pd.DataFrame(data) | |
| def merge_datasets(df1, df2): | |
| """Merge metadata and diagnosis data.""" | |
| processing_logger.info("Merging metadata and diagnosis data.") | |
| merged_df = pd.merge(left=df1, right=df2, how='left').sort_values('Patient number').reset_index(drop=True) | |
| merged_df['audio_file_name'] = merged_df.apply(lambda row: f"{row['Patient number']}_{row['Recording index']}_{row['Chest location']}_{row['Acquisition mode']}_{row['Recording equipment']}.wav", axis=1) | |
| processing_logger.info("Merging complete.") | |
| return merged_df | |
| def filter_and_sample_data(df, mode='binary'): | |
| """ | |
| Filter and sample the dataset for binary or multi-class classification. | |
| Returns filtered and processed DataFrame. | |
| """ | |
| processing_logger.info(f"Filtering and sampling the dataset for {mode} classification.") | |
| if mode == 'binary': | |
| # Binary classification: Normal vs. Abnormal | |
| df['Diagnosis'] = df['Diagnosis'].apply(lambda x: 'Normal' if x == 'Healthy' else 'Abnormal') | |
| elif mode == 'multi': | |
| # Multi-class classification: Group classes | |
| # I grouped disease based on their similarities | |
| processing_logger.info("Grouping classes for multi-class classification.") | |
| df['Diagnosis'] = df['Diagnosis'].replace({ | |
| 'Healthy': 'Normal', | |
| 'COPD': 'Chronic Respiratory Diseases', | |
| 'Asthma': 'Chronic Respiratory Diseases', | |
| 'URTI': 'Respiratory Infections', | |
| 'Bronchiolitis': 'Respiratory Infections', | |
| 'LRTI': 'Respiratory Infections', | |
| 'Pneumonia': 'Respiratory Infections', | |
| 'Bronchiectasis': 'Respiratory Infections' | |
| }) | |
| # Filter out rare classes with fewer than 5 samples | |
| class_counts = df['Diagnosis'].value_counts() | |
| valid_classes = class_counts[class_counts >= 5].index | |
| df = df[df['Diagnosis'].isin(valid_classes)].reset_index(drop=True) | |
| processing_logger.info(f"Filtered classes: {df['Diagnosis'].unique()}") | |
| processing_logger.info(f"Filtering and sampling complete with mode={mode}.") | |
| return df | |
| def prepare_dataset_augmented(df_filtered, audio_files_path, classification_mode): | |
| """Prepare the dataset for augmented features. it will be 1D array""" | |
| processing_logger.info("Preparing dataset with AUGMENTED pipeline.") | |
| # Extract features and labels | |
| X, y = mfccs_feature_extraction(audio_files_path, df_filtered) | |
| # Apply label encoding | |
| le = LabelEncoder() | |
| y_encoded = le.fit_transform(np.array(y)) # Encode labels to integers | |
| if classification_mode == "binary": | |
| # Use single column with 0 and 1 for binary classification | |
| processing_logger.info("Binary classification mode: Using single column labels (0/1).") | |
| y_processed = y_encoded # No one-hot encoding | |
| else: | |
| # One-hot encode labels for multi-class classification | |
| processing_logger.info("Multi-class classification mode: Applying one-hot encoding.") | |
| y_processed = to_categorical(y_encoded) | |
| # Log the mapping of one-hot encoding to class labels | |
| print("One-hot encoding mapping:") | |
| for idx, label in enumerate(le.classes_): | |
| print(f"{idx} -> {label}") | |
| processing_logger.info("Dataset preparation with augmented pipeline complete.") | |
| return X, y_processed, le | |
| def mfccs_feature_extraction(audio_files_path, df_filtered, n_jobs=-1): | |
| """ | |
| Make the process of MFCC feature extraction faster by running jobs in-parallel | |
| Returns array of features extracted from the audio files and Array of target labels. | |
| """ | |
| processing_logger.info(f"Processing audio files in: {audio_files_path}") | |
| files = [file for file in os.listdir(audio_files_path) if file.endswith('.wav') and file[:3] not in ['103', '108', '115']] | |
| #files = files[:30] ## DEBUG | |
| # Use Parallel and delayed to process files in parallel | |
| results = Parallel(n_jobs=n_jobs, backend="loky")(delayed(process_audio_file)(file, audio_files_path, df_filtered) for file in tqdm(files, desc="Processing audio files")) | |
| # Flatten results | |
| X_ = [] | |
| y_ = [] | |
| for X_local, y_local in results: | |
| X_.extend(X_local) | |
| y_.extend(y_local) | |
| X_data = np.array(X_) | |
| y_data = np.array(y_) | |
| processing_logger.info("MFCC feature extraction and augmentation complete.") | |
| return X_data, y_data | |
| def process_audio_file(soundDir, audio_files_path, df_filtered): | |
| """ | |
| Process a single audio file: extract MFCC features and augment with noise, stretching, and shifting. | |
| """ | |
| X_local = [] | |
| y_local = [] | |
| features = 52 | |
| # Extract patient ID and disease from filename and DataFrame | |
| patient_id = int(soundDir.split('_')[0]) | |
| disease = df_filtered.loc[df_filtered['Patient number'] == patient_id, 'Diagnosis'].values[0] | |
| # Load audio file | |
| data_x, sampling_rate = librosa.load(os.path.join(audio_files_path, soundDir), sr=None) | |
| data_x = preprocess_audio(data_x, sampling_rate) # Apply filtering | |
| mfccs = np.mean(librosa.feature.mfcc(y=data_x, sr=sampling_rate, n_mfcc=features).T, axis=0) | |
| X_local.append(mfccs) | |
| y_local.append(disease) | |
| # Data augmentation | |
| for augmentation in [add_noise, shift, stretch, pitch_shift]: | |
| if augmentation == add_noise: | |
| augmented_data = augmentation(data_x, 0.001) | |
| elif augmentation == shift: | |
| augmented_data = augmentation(data_x, 1600) | |
| elif augmentation == stretch: | |
| augmented_data = augmentation(data_x, 1.2) | |
| elif augmentation == pitch_shift: | |
| augmented_data = augmentation(data_x, sampling_rate, 3) | |
| mfccs_augmented = np.mean(librosa.feature.mfcc(y=augmented_data, sr=sampling_rate, n_mfcc=features).T, axis=0) | |
| X_local.append(mfccs_augmented) | |
| y_local.append(disease) | |
| return X_local, y_local | |
| def add_noise(data,x): | |
| noise = np.random.randn(len(data)) | |
| data_noise = data + x * noise | |
| return data_noise | |
| def shift(data, x): | |
| return np.roll(data, int(x)) | |
| def stretch(data, rate): | |
| return librosa.effects.time_stretch(data, rate=rate) | |
| def pitch_shift (data , sr, rate): | |
| return librosa.effects.pitch_shift(data, sr=sr, n_steps=rate) | |
| def prepare_dataset_parallel(df, audio_files_path, mode, classification_mode): | |
| """Prepare the dataset by extracting features from audio files in parallel.""" | |
| processing_logger.info(f"Preparing dataset using {mode} features in parallel.") | |
| results = Parallel(n_jobs=-1)(delayed(preprocess_file)(row, audio_files_path, mode) for _, row in tqdm(df.iterrows(), total=len(df))) | |
| X, y = zip(*results) | |
| X = np.array(X) | |
| X = np.expand_dims(X, axis=-1) # Add channel dimension | |
| X = normalize(X, axis=1) | |
| le = LabelEncoder() | |
| y_encoded = le.fit_transform(np.array(y)) # Encode labels | |
| if classification_mode == "binary": | |
| # Use single column with 0 and 1 for binary classification | |
| processing_logger.info("Binary classification mode: Using single column labels (0/1).") | |
| y = y_encoded # No one-hot encoding | |
| else: | |
| # One-hot encode labels for multi-class classification | |
| processing_logger.info("Multi-class classification mode: Applying one-hot encoding.") | |
| y = to_categorical(y_encoded) | |
| processing_logger.info(f"Dataset preparation using {mode} complete.") | |
| return X, y, le | |
| def preprocess_file(row, audio_files_path, mode): | |
| """Preprocess a single audio file.""" | |
| file_path = os.path.join(audio_files_path, row['audio_file_name']) | |
| feature = preprocessing(file_path, mode) | |
| label = row['Diagnosis'] | |
| return feature, label | |
| def preprocessing(audio_file, mode): | |
| """Preprocess audio file by resampling, padding/truncating, and extracting features.""" | |
| sr_new = 16000 # Resample audio to 16 kHz | |
| x, sr = librosa.load(audio_file, sr=sr_new) | |
| x = preprocess_audio(x, sr) | |
| # Padding or truncating to 5 seconds (5 * sr_new samples) | |
| max_len = 5 * sr_new | |
| if x.shape[0] < max_len: | |
| x = np.pad(x, (0, max_len - x.shape[0])) | |
| else: | |
| x = x[:max_len] | |
| # Extract features | |
| # I understand the common choice for n_mfcc is 13, but here i assumed we need to capture more informationm, therefore I choose 20. | |
| if mode == 'mfcc': | |
| feature = librosa.feature.mfcc(y=x, sr=sr_new, n_mfcc=20) # Ensure consistent shape | |
| elif mode == 'log_mel': | |
| feature = librosa.feature.melspectrogram(y=x, sr=sr_new, n_mels=20, fmax=8000) # Match n_mels to 20 | |
| feature = librosa.power_to_db(feature, ref=np.max) | |
| return feature | |
| def oversample_data(X, y): | |
| """Apply SMOTE to balance classes.""" | |
| processing_logger.info("Applying SMOTE to balance classes.") | |
| # Save the original shape of features | |
| original_shape = X.shape[1:] | |
| # Flatten for SMOTE processing | |
| X = X.reshape((X.shape[0], -1)) | |
| # Convert one-hot encoded labels to integers | |
| y = np.argmax(y, axis=1) | |
| # Apply SMOTE | |
| smote = SMOTE(random_state=42) | |
| X_resampled, y_resampled = smote.fit_resample(X, y) | |
| # Reshape back to the original dimensions | |
| X_resampled = X_resampled.reshape((-1, *original_shape)) | |
| # Convert labels back to one-hot encoding | |
| y_resampled = to_categorical(y_resampled) | |
| processing_logger.info("SMOTE oversampling complete.") | |
| return X_resampled, y_resampled | |
| def build_model(input_shape, n_filters, dense_units, dropout_rate, num_classes, model_type='1D', classification_mode='binary'): | |
| """ | |
| Build and compile a CNN model for 1D or 2D data. | |
| Returns CNN model. | |
| """ | |
| print(f"Building the updated {model_type} CNN model with {classification_mode} classification.") | |
| model = Sequential() | |
| # Add convolutional layers based on the model type | |
| if model_type == '1D': | |
| # 1D CNN layers | |
| model.add(Conv1D(n_filters, kernel_size=3, activation='relu', input_shape=input_shape)) | |
| model.add(BatchNormalization()) | |
| model.add(MaxPooling1D(pool_size=2)) | |
| model.add(Dropout(dropout_rate)) | |
| model.add(Conv1D(n_filters * 2, kernel_size=3, activation='relu')) | |
| model.add(BatchNormalization()) | |
| model.add(MaxPooling1D(pool_size=2)) | |
| model.add(Dropout(dropout_rate)) | |
| model.add(Conv1D(n_filters * 4, kernel_size=3, activation='relu')) | |
| model.add(BatchNormalization()) | |
| model.add(GlobalAveragePooling1D()) | |
| model.add(Dropout(dropout_rate)) | |
| elif model_type == '2D': | |
| # 2D CNN layers | |
| model.add(Conv2D(n_filters, (3, 3), activation='relu', input_shape=input_shape)) | |
| model.add(BatchNormalization()) | |
| if input_shape[0] >= 2: | |
| model.add(MaxPooling2D((2, 2))) | |
| model.add(Dropout(dropout_rate)) | |
| model.add(Conv2D(n_filters * 2, (3, 3), activation='relu')) | |
| model.add(BatchNormalization()) | |
| if input_shape[0] >= 4: | |
| model.add(MaxPooling2D((2, 2))) | |
| model.add(Dropout(dropout_rate)) | |
| model.add(Conv2D(n_filters * 4, (3, 3), activation='relu')) | |
| model.add(BatchNormalization()) | |
| model.add(GlobalAveragePooling2D()) | |
| model.add(Dropout(dropout_rate)) | |
| else: | |
| raise ValueError("Invalid model_type. Must be '1D' or '2D'.") | |
| # Add fully connected layers | |
| model.add(Dense(dense_units, activation='relu')) | |
| model.add(BatchNormalization()) | |
| model.add(Dropout(dropout_rate)) | |
| # Add output layer dynamically based on classification mode | |
| if classification_mode == 'binary': | |
| # Binary classification: Single unit with sigmoid activation | |
| model.add(Dense(1, activation='sigmoid')) | |
| loss_function = 'binary_crossentropy' | |
| else: | |
| # Multi-class classification: num_classes units with softmax activation | |
| model.add(Dense(num_classes, activation='softmax')) | |
| loss_function = 'categorical_crossentropy' | |
| # Compile the model | |
| model.compile(optimizer='adam', loss=loss_function, metrics=['accuracy']) | |
| print(f"{model_type} CNN model built and compiled successfully for {classification_mode} classification.") | |
| return model | |
| def log_metrics(y_true, y_pred, mode): | |
| """Log evaluation metrics.""" | |
| precision = classification_report(y_true, y_pred, output_dict=True)['weighted avg']['precision'] | |
| recall = classification_report(y_true, y_pred, output_dict=True)['weighted avg']['recall'] | |
| f1_score = classification_report(y_true, y_pred, output_dict=True)['weighted avg']['f1-score'] | |
| mlflow.log_metric(f"{mode}_precision", precision) | |
| mlflow.log_metric(f"{mode}_recall", recall) | |
| mlflow.log_metric(f"{mode}_f1_score", f1_score) | |
| def track_experiment_with_mlflow_and_optuna(mode, num_classes, model_type='1D', classification_mode='binary'): | |
| """ | |
| Optimize hyperparameters using Optuna and track experiments with MLflow. | |
| mode: Feature extraction mode (e.g., 'augmented', 'mfcc', 'log_mel'). | |
| num_classes: Number of classes for classification. | |
| model_type: Type of model ('1D' for Conv1D, '2D' for Conv2D). | |
| classification_mode: 'binary' for binary classification, 'multi' for multi-class classification. | |
| """ | |
| def objective(trial): | |
| with mlflow.start_run(nested=True): # Start a new MLflow run for each trial | |
| # Hyperparameters to tune | |
| n_filters = trial.suggest_categorical('n_filters', [16, 32, 64]) | |
| dense_units = trial.suggest_int('dense_units', 64, 256, step=32) | |
| dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5, step=0.1) | |
| learning_rate = trial.suggest_loguniform('learning_rate', 1e-4, 1e-2) | |
| # Build and compile the model | |
| model = build_model( | |
| input_shape=X_train.shape[1:], | |
| n_filters=n_filters, | |
| dense_units=dense_units, | |
| dropout_rate=dropout_rate, | |
| num_classes=num_classes, | |
| model_type=model_type, | |
| classification_mode=classification_mode | |
| ) | |
| # Define EarlyStopping callback | |
| early_stopping = EarlyStopping( | |
| monitor='val_loss', # Monitor validation loss | |
| patience=5, # Stop training after 5 epochs with no improvement | |
| restore_best_weights=True | |
| ) | |
| # Train the model | |
| history = model.fit( | |
| X_train, y_train, | |
| validation_data=(X_val, y_val), | |
| epochs=50, # Allow a larger max epoch since EarlyStopping will handle early termination | |
| batch_size=32, | |
| callbacks=[early_stopping], | |
| verbose=0 | |
| ) | |
| # Log hyperparameters and metrics to MLflow | |
| mlflow.log_params({ | |
| 'n_filters': n_filters, | |
| 'dense_units': dense_units, | |
| 'dropout_rate': dropout_rate, | |
| 'learning_rate': learning_rate, | |
| 'model_type': model_type, | |
| 'classification_mode': classification_mode | |
| }) | |
| mlflow.log_metric("best_val_accuracy", max(history.history['val_accuracy'])) | |
| # Save training and validation loss curves | |
| plt.figure() | |
| plt.plot(history.history['loss'], label='Train Loss') | |
| plt.plot(history.history['val_loss'], label='Validation Loss') | |
| plt.legend() | |
| plt.title("Training and Validation Loss") | |
| loss_curve_path = f"loss_curve_{trial.number}_{model_type}.png" | |
| plt.savefig(loss_curve_path) | |
| mlflow.log_artifact(loss_curve_path) | |
| return max(history.history['val_accuracy']) | |
| # Start Optuna study | |
| study = optuna.create_study(direction='maximize') | |
| study.optimize(objective, n_trials=20) | |
| # Retrieve best trial and log results | |
| best_trial = study.best_trial | |
| model_logger.info(f"Best Trial for {mode} ({model_type}): {best_trial.params}") | |
| # Build the best model (already compiled in build_model) | |
| best_model = build_model( | |
| input_shape=X_train.shape[1:], | |
| n_filters=best_trial.params['n_filters'], | |
| dense_units=best_trial.params['dense_units'], | |
| dropout_rate=best_trial.params['dropout_rate'], | |
| num_classes=num_classes, | |
| model_type=model_type, | |
| classification_mode=classification_mode | |
| ) | |
| # Train the best model with EarlyStopping | |
| early_stopping = EarlyStopping( | |
| monitor='val_loss', | |
| patience=5, | |
| restore_best_weights=True | |
| ) | |
| best_model.fit( | |
| X_train, y_train, | |
| validation_data=(X_val, y_val), | |
| epochs=50, batch_size=32, | |
| callbacks=[early_stopping], | |
| verbose=1 | |
| ) | |
| # Save the best model | |
| best_model_path = f"best_model_{mode}_{model_type}.h5" | |
| best_model.save(best_model_path) | |
| mlflow.log_artifact(best_model_path) | |
| model_logger.info(f"Best model for {mode} ({model_type}) saved successfully.") | |
| return best_model | |
| def log_class_distribution(y, message): | |
| """Log the class distribution.""" | |
| if y.ndim == 1: # Binary classification (1D array of 0s and 1s) | |
| unique, counts = np.unique(y, return_counts=True) | |
| else: # Multi-class classification (2D one-hot encoded array) | |
| unique, counts = np.unique(np.argmax(y, axis=1), return_counts=True) | |
| class_distribution = dict(zip(unique, counts)) | |
| processing_logger.info(f"{message} Class Distribution: {class_distribution}") | |
| def preprocess_audio(audio, sr): | |
| """ | |
| Apply a bandpass filter to audio data. | |
| """ | |
| # Define cutoff frequencies | |
| low_cutoff = 50 # 50 Hz | |
| high_cutoff = min(5000, sr / 2 - 1) # Ensure it is below Nyquist frequency | |
| if low_cutoff >= high_cutoff: | |
| raise ValueError( | |
| f"Invalid filter range: low_cutoff={low_cutoff}, high_cutoff={high_cutoff} for sampling rate {sr}" | |
| ) | |
| # Design a bandpass filter | |
| sos = butter(N=10, Wn=[low_cutoff, high_cutoff], btype='band', fs=sr, output='sos') | |
| # Apply the filter | |
| filtered_audio = sosfilt(sos, audio) | |
| return filtered_audio | |
| def generate_random_audio_data(samples=20000, feature_dim=20): | |
| """Generate random audio-like data for testing purposes.""" | |
| X = np.random.rand(samples, feature_dim, feature_dim) # Simulate 2D audio features | |
| y = np.random.randint(0, 2, size=samples) # Binary classification labels | |
| return X, y | |
| def test_model(): | |
| """Test 2D CNN model with simulated audio data for debugging.""" | |
| print("[DEBUG] Generating simulated audio data...") | |
| global X_train, X_val, X_test, y_train, y_val, y_test | |
| X, y = generate_random_audio_data() | |
| # Simulate preprocessing similar to audio processing pipeline | |
| print("[DEBUG] Preprocessing simulated audio data...") | |
| X_preprocessed = np.array([np.log1p(sample) for sample in X]) # Simulate a log transform or feature extraction | |
| # Split data into train, validation, and test sets | |
| X_train, X_temp, y_train, y_temp = train_test_split(X_preprocessed, y, test_size=0.3, stratify=y, random_state=42) | |
| X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42) | |
| print(f"[DEBUG] Data split: Training={X_train.shape}, Validation={X_val.shape}, Test={X_test.shape}") | |
| # Expand dimensions for 2D CNN input | |
| X_train = np.expand_dims(X_train, axis=-1) | |
| X_val = np.expand_dims(X_val, axis=-1) | |
| X_test = np.expand_dims(X_test, axis=-1) | |
| print("[DEBUG] Initializing 2D CNN model...") | |
| model = track_experiment_with_mlflow_and_optuna( | |
| mode='mfcc', | |
| num_classes=1, | |
| model_type='2D', # Specify 2D CNN for MFCC and Log-Mel | |
| classification_mode='binary' | |
| ) | |
| print("[DEBUG] Training the model...") | |
| # Train the model with a single epoch for testing | |
| model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=1, batch_size=32) | |
| print("[DEBUG] Evaluating the model...") | |
| results = model.evaluate(X_test, y_test) | |
| print(f"[DEBUG] Test evaluation results: {results}") | |
| def main(): | |
| # how to run: | |
| # python legacy/test.py --metadata_path data/Respiratory_Sound_Database/audio_and_txt_files --audio_files_path data/Respiratory_Sound_Database/audio_and_txt_files --demographic_path data/demographic_info.txt --diagnosis_path data/Respiratory_Sound_Database/patient_diagnosis.csv --classification_modes binary --feature_types mfcc | |
| # Parse arguments | |
| parser = argparse.ArgumentParser(description="Run the respiratory sound analysis pipeline.") | |
| parser.add_argument("--metadata_path", type=str, default="/kaggle/input/respiratory-sound-database/Respiratory_Sound_Database/Respiratory_Sound_Database/audio_and_txt_files", help="Path to the metadata directory.") | |
| parser.add_argument("--audio_files_path", type=str, default="/kaggle/input/respiratory-sound-database/Respiratory_Sound_Database/Respiratory_Sound_Database/audio_and_txt_files", help="Path to the directory containing audio files.") | |
| parser.add_argument("--demographic_path", type=str, default="/kaggle/input/respiratory-sound-database/demographic_info.txt", help="Path to the demographic info file.") | |
| parser.add_argument("--diagnosis_path", type=str, default="/kaggle/input/respiratory-sound-database/Respiratory_Sound_Database/Respiratory_Sound_Database/patient_diagnosis.csv", help="Path to the patient diagnosis CSV file.") | |
| parser.add_argument("--tracking_uri", type=str, default="./mlruns", help="MLflow tracking URI.") | |
| parser.add_argument("--classification_modes", type=str, nargs='+', default=['multi', 'binary'], help="Classification modes to run (default: all modes). Options: 'binary', 'multi'.") | |
| parser.add_argument("--feature_types", type=str, nargs='+', default=['mfcc', 'log_mel', 'augmented'], help="Feature types to use (default: all types). Options: 'mfcc', 'log_mel', 'augmented'.") | |
| parser.add_argument("--debug", action='store_true', help="Run in debug mode with random test data.") | |
| args = parser.parse_args() | |
| if args.debug: | |
| test_model() | |
| return | |
| # Assign arguments to variables | |
| metadata_path = args.metadata_path | |
| audio_files_path = args.audio_files_path | |
| demographic_path = args.demographic_path | |
| diagnosis_path = args.diagnosis_path | |
| # Set MLflow tracking URI | |
| mlflow.set_tracking_uri(args.tracking_uri) | |
| metadata_path = args.metadata_path | |
| audio_files_path = args.audio_files_path | |
| data_logger.info("Starting data pipeline.") | |
| df = load_data(demographic_path=demographic_path, diagnosis_path=diagnosis_path) | |
| audio_metadata = process_audio_metadata(audio_files_path) | |
| df_all = merge_datasets(audio_metadata, df) | |
| # Use user-specified or default classification modes and feature types | |
| classification_modes = args.classification_modes | |
| feature_types = args.feature_types | |
| models = [] | |
| for classification_mode in classification_modes: | |
| # Preprocess dataset for binary or multi-class classification | |
| df_filtered = filter_and_sample_data(df_all, mode=classification_mode) | |
| processing_logger.info(f"Dataset shape for {classification_mode} mode: {df_filtered.shape}") | |
| for feature_type in feature_types: | |
| processing_logger.info(f"Running experiment for {classification_mode} classification with {feature_type} features.") | |
| global X_train, X_val, X_test, y_train, y_val, y_test | |
| # Prepare the dataset | |
| if feature_type == 'augmented': | |
| X, y, le = prepare_dataset_augmented( | |
| df_filtered, | |
| audio_files_path, | |
| classification_mode=classification_mode | |
| ) | |
| else: | |
| X, y, le = prepare_dataset_parallel( | |
| df_filtered, | |
| audio_files_path, | |
| mode=feature_type, | |
| classification_mode=classification_mode | |
| ) | |
| # Split data into train/val/test | |
| X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, stratify=y, random_state=42) | |
| X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42) | |
| # Save test data for future evaluation | |
| np.save(f"X_test_{classification_mode}_{feature_type}.npy", X_test) | |
| np.save(f"y_test_{classification_mode}_{feature_type}.npy", y_test) | |
| mlflow.log_artifact(f"X_test_{classification_mode}_{feature_type}.npy") | |
| mlflow.log_artifact(f"y_test_{classification_mode}_{feature_type}.npy") | |
| # Log dataset characteristics | |
| log_class_distribution(y_train, "Before Oversampling") | |
| processing_logger.info(f"Train size: {X_train.shape}, Validation size: {X_val.shape}, Test size: {X_test.shape}") | |
| try: | |
| X_train, y_train = oversample_data(X_train, y_train) | |
| except ValueError as e: | |
| processing_logger.warning(f"SMOTE skipped: {e}") | |
| log_class_distribution(y_train, "After Oversampling") | |
| # Determine number of classes | |
| if classification_mode == "binary": | |
| num_classes = 1 # Single output for binary classification | |
| else: | |
| num_classes = y_train.shape[1] # Number of classes for multi-class | |
| # Train and save model | |
| with mlflow.start_run(run_name=f"Experiment_{classification_mode}_{feature_type}", nested=True): | |
| if feature_type == 'augmented': | |
| # Expand dimensions for 1D CNN input | |
| X_train = np.expand_dims(X_train, axis=-1) | |
| X_val = np.expand_dims(X_val, axis=-1) | |
| X_test = np.expand_dims(X_test, axis=-1) | |
| # Optimize and train 1D CNN | |
| model = track_experiment_with_mlflow_and_optuna( | |
| mode=feature_type, | |
| num_classes=num_classes, | |
| model_type='1D', # Specify 1D CNN for GRU features | |
| classification_mode=classification_mode | |
| ) | |
| else: | |
| # Optimize and train CNN models for MFCC and MEL | |
| model = track_experiment_with_mlflow_and_optuna( | |
| mode=feature_type, | |
| num_classes=num_classes, | |
| model_type='2D', # Specify 2D CNN for MFCC and Log-Mel | |
| classification_mode=classification_mode | |
| ) | |
| # Save final model | |
| final_model_path = f"final_model_{classification_mode}_{feature_type}.h5" | |
| model.save(final_model_path) | |
| mlflow.log_artifact(final_model_path) | |
| models.append(model) | |
| processing_logger.info("All experiments completed successfully!") | |
| if __name__ == "__main__": | |
| main() | |