Spaces:
Runtime error
Runtime error
| """ | |
| Deep learning models for respiratory disease detection. | |
| Includes CNN and LSTM architectures. | |
| """ | |
| import numpy as np | |
| import tensorflow as tf | |
| from tensorflow import keras | |
| from tensorflow.keras import layers, models, callbacks | |
| from tensorflow.keras.utils import to_categorical | |
| from typing import Tuple, Optional, Dict | |
| import pickle | |
| from pathlib import Path | |
| class CNNModel: | |
| """Convolutional Neural Network for audio classification.""" | |
| def __init__(self, input_shape: Tuple, num_classes: int, model_name: str = "cnn_model"): | |
| """ | |
| Initialize CNN model. | |
| Args: | |
| input_shape: Shape of input (height, width, channels) | |
| num_classes: Number of output classes | |
| model_name: Name of the model | |
| """ | |
| self.input_shape = input_shape | |
| self.num_classes = num_classes | |
| self.model_name = model_name | |
| self.model = None | |
| self.history = None | |
| def build_model(self, dropout_rate: float = 0.3): | |
| """ | |
| Build CNN architecture. | |
| Args: | |
| dropout_rate: Dropout rate for regularization | |
| """ | |
| model = models.Sequential(name=self.model_name) | |
| # First convolutional block | |
| model.add(layers.Conv2D(32, (3, 3), activation='relu', | |
| padding='same', input_shape=self.input_shape)) | |
| model.add(layers.BatchNormalization()) | |
| model.add(layers.MaxPooling2D((2, 2))) | |
| model.add(layers.Dropout(dropout_rate)) | |
| # Second convolutional block | |
| model.add(layers.Conv2D(64, (3, 3), activation='relu', padding='same')) | |
| model.add(layers.BatchNormalization()) | |
| model.add(layers.MaxPooling2D((2, 2))) | |
| model.add(layers.Dropout(dropout_rate)) | |
| # Third convolutional block | |
| model.add(layers.Conv2D(128, (3, 3), activation='relu', padding='same')) | |
| model.add(layers.BatchNormalization()) | |
| model.add(layers.MaxPooling2D((2, 2))) | |
| model.add(layers.Dropout(dropout_rate)) | |
| # Fourth convolutional block | |
| model.add(layers.Conv2D(256, (3, 3), activation='relu', padding='same')) | |
| model.add(layers.BatchNormalization()) | |
| model.add(layers.GlobalAveragePooling2D()) | |
| # Dense layers | |
| model.add(layers.Dense(256, activation='relu')) | |
| model.add(layers.Dropout(dropout_rate)) | |
| model.add(layers.Dense(128, activation='relu')) | |
| model.add(layers.Dropout(dropout_rate)) | |
| # Output layer | |
| if self.num_classes == 2: | |
| model.add(layers.Dense(1, activation='sigmoid')) | |
| else: | |
| model.add(layers.Dense(self.num_classes, activation='softmax')) | |
| self.model = model | |
| print(f"\n{self.model_name} architecture:") | |
| self.model.summary() | |
| return model | |
| def compile_model(self, learning_rate: float = 0.001): | |
| """Compile the model.""" | |
| if self.model is None: | |
| raise ValueError("Model must be built before compilation") | |
| optimizer = keras.optimizers.Adam(learning_rate=learning_rate) | |
| if self.num_classes == 2: | |
| loss = 'binary_crossentropy' | |
| metrics = ['accuracy', keras.metrics.AUC(name='auc')] | |
| else: | |
| loss = 'sparse_categorical_crossentropy' | |
| metrics = ['accuracy'] | |
| self.model.compile( | |
| optimizer=optimizer, | |
| loss=loss, | |
| metrics=metrics | |
| ) | |
| print(f"Model compiled with optimizer={optimizer.__class__.__name__}, loss={loss}") | |
| def train(self, X_train: np.ndarray, y_train: np.ndarray, | |
| X_val: np.ndarray, y_val: np.ndarray, | |
| epochs: int = 50, batch_size: int = 32, | |
| model_dir: str = 'models'): | |
| """ | |
| Train the CNN model. | |
| Args: | |
| X_train: Training features | |
| y_train: Training labels | |
| X_val: Validation features | |
| y_val: Validation labels | |
| epochs: Number of training epochs | |
| batch_size: Batch size | |
| model_dir: Directory to save model checkpoints | |
| """ | |
| if self.model is None: | |
| raise ValueError("Model must be built and compiled before training") | |
| # Create model directory | |
| model_path = Path(model_dir) | |
| model_path.mkdir(parents=True, exist_ok=True) | |
| # Define callbacks | |
| checkpoint_path = model_path / f"{self.model_name}_best.keras" | |
| callbacks_list = [ | |
| callbacks.ModelCheckpoint( | |
| str(checkpoint_path), | |
| monitor='val_loss', | |
| save_best_only=True, | |
| verbose=1 | |
| ), | |
| callbacks.EarlyStopping( | |
| monitor='val_loss', | |
| patience=10, | |
| restore_best_weights=True, | |
| verbose=1 | |
| ), | |
| callbacks.ReduceLROnPlateau( | |
| monitor='val_loss', | |
| factor=0.5, | |
| patience=5, | |
| min_lr=1e-7, | |
| verbose=1 | |
| ) | |
| ] | |
| print(f"\nTraining {self.model_name}...") | |
| print(f"Training samples: {len(X_train)}, Validation samples: {len(X_val)}") | |
| print(f"Epochs: {epochs}, Batch size: {batch_size}") | |
| # Train model | |
| self.history = self.model.fit( | |
| X_train, y_train, | |
| validation_data=(X_val, y_val), | |
| epochs=epochs, | |
| batch_size=batch_size, | |
| callbacks=callbacks_list, | |
| verbose=1 | |
| ) | |
| print(f"\nTraining complete. Best model saved to {checkpoint_path}") | |
| return self.history | |
| def evaluate(self, X_test: np.ndarray, y_test: np.ndarray) -> Dict: | |
| """Evaluate model on test set.""" | |
| if self.model is None: | |
| raise ValueError("Model must be trained before evaluation") | |
| print(f"\nEvaluating {self.model_name}...") | |
| results = self.model.evaluate(X_test, y_test, verbose=1) | |
| # Get predictions | |
| y_pred_proba = self.model.predict(X_test) | |
| if self.num_classes == 2: | |
| y_pred = (y_pred_proba > 0.5).astype(int).flatten() | |
| else: | |
| y_pred = np.argmax(y_pred_proba, axis=1) | |
| evaluation_results = { | |
| 'loss': results[0], | |
| 'accuracy': results[1], | |
| 'predictions': y_pred, | |
| 'probabilities': y_pred_proba | |
| } | |
| if len(results) > 2: | |
| evaluation_results['auc'] = results[2] | |
| print(f"Test Loss: {results[0]:.4f}") | |
| print(f"Test Accuracy: {results[1]:.4f}") | |
| return evaluation_results | |
| def save(self, filepath: str): | |
| """Save model to disk.""" | |
| self.model.save(filepath) | |
| print(f"Model saved to {filepath}") | |
| def load(cls, filepath: str): | |
| """Load model from disk.""" | |
| model = keras.models.load_model(filepath) | |
| print(f"Model loaded from {filepath}") | |
| return model | |
| class LSTMModel: | |
| """LSTM model for sequential audio classification.""" | |
| def __init__(self, input_shape: Tuple, num_classes: int, model_name: str = "lstm_model"): | |
| """ | |
| Initialize LSTM model. | |
| Args: | |
| input_shape: Shape of input (time_steps, features) | |
| num_classes: Number of output classes | |
| model_name: Name of the model | |
| """ | |
| self.input_shape = input_shape | |
| self.num_classes = num_classes | |
| self.model_name = model_name | |
| self.model = None | |
| self.history = None | |
| def build_model(self, dropout_rate: float = 0.3): | |
| """ | |
| Build LSTM architecture. | |
| Args: | |
| dropout_rate: Dropout rate for regularization | |
| """ | |
| model = models.Sequential(name=self.model_name) | |
| # LSTM layers | |
| model.add(layers.LSTM(128, return_sequences=True, input_shape=self.input_shape)) | |
| model.add(layers.Dropout(dropout_rate)) | |
| model.add(layers.BatchNormalization()) | |
| model.add(layers.LSTM(64, return_sequences=True)) | |
| model.add(layers.Dropout(dropout_rate)) | |
| model.add(layers.BatchNormalization()) | |
| model.add(layers.LSTM(32)) | |
| model.add(layers.Dropout(dropout_rate)) | |
| # Dense layers | |
| model.add(layers.Dense(64, activation='relu')) | |
| model.add(layers.Dropout(dropout_rate)) | |
| # Output layer | |
| if self.num_classes == 2: | |
| model.add(layers.Dense(1, activation='sigmoid')) | |
| else: | |
| model.add(layers.Dense(self.num_classes, activation='softmax')) | |
| self.model = model | |
| print(f"\n{self.model_name} architecture:") | |
| self.model.summary() | |
| return model | |
| def compile_model(self, learning_rate: float = 0.001): | |
| """Compile the model.""" | |
| if self.model is None: | |
| raise ValueError("Model must be built before compilation") | |
| optimizer = keras.optimizers.Adam(learning_rate=learning_rate) | |
| if self.num_classes == 2: | |
| loss = 'binary_crossentropy' | |
| metrics = ['accuracy', keras.metrics.AUC(name='auc')] | |
| else: | |
| loss = 'sparse_categorical_crossentropy' | |
| metrics = ['accuracy'] | |
| self.model.compile( | |
| optimizer=optimizer, | |
| loss=loss, | |
| metrics=metrics | |
| ) | |
| print(f"Model compiled with optimizer={optimizer.__class__.__name__}, loss={loss}") | |
| def train(self, X_train: np.ndarray, y_train: np.ndarray, | |
| X_val: np.ndarray, y_val: np.ndarray, | |
| epochs: int = 50, batch_size: int = 32, | |
| model_dir: str = 'models'): | |
| """Train the LSTM model.""" | |
| if self.model is None: | |
| raise ValueError("Model must be built and compiled before training") | |
| # Create model directory | |
| model_path = Path(model_dir) | |
| model_path.mkdir(parents=True, exist_ok=True) | |
| # Define callbacks | |
| checkpoint_path = model_path / f"{self.model_name}_best.keras" | |
| callbacks_list = [ | |
| callbacks.ModelCheckpoint( | |
| str(checkpoint_path), | |
| monitor='val_loss', | |
| save_best_only=True, | |
| verbose=1 | |
| ), | |
| callbacks.EarlyStopping( | |
| monitor='val_loss', | |
| patience=10, | |
| restore_best_weights=True, | |
| verbose=1 | |
| ), | |
| callbacks.ReduceLROnPlateau( | |
| monitor='val_loss', | |
| factor=0.5, | |
| patience=5, | |
| min_lr=1e-7, | |
| verbose=1 | |
| ) | |
| ] | |
| print(f"\nTraining {self.model_name}...") | |
| print(f"Training samples: {len(X_train)}, Validation samples: {len(X_val)}") | |
| # Train model | |
| self.history = self.model.fit( | |
| X_train, y_train, | |
| validation_data=(X_val, y_val), | |
| epochs=epochs, | |
| batch_size=batch_size, | |
| callbacks=callbacks_list, | |
| verbose=1 | |
| ) | |
| print(f"\nTraining complete. Best model saved to {checkpoint_path}") | |
| return self.history | |
| def evaluate(self, X_test: np.ndarray, y_test: np.ndarray) -> Dict: | |
| """Evaluate model on test set.""" | |
| if self.model is None: | |
| raise ValueError("Model must be trained before evaluation") | |
| print(f"\nEvaluating {self.model_name}...") | |
| results = self.model.evaluate(X_test, y_test, verbose=1) | |
| # Get predictions | |
| y_pred_proba = self.model.predict(X_test) | |
| if self.num_classes == 2: | |
| y_pred = (y_pred_proba > 0.5).astype(int).flatten() | |
| else: | |
| y_pred = np.argmax(y_pred_proba, axis=1) | |
| evaluation_results = { | |
| 'loss': results[0], | |
| 'accuracy': results[1], | |
| 'predictions': y_pred, | |
| 'probabilities': y_pred_proba | |
| } | |
| if len(results) > 2: | |
| evaluation_results['auc'] = results[2] | |
| print(f"Test Loss: {results[0]:.4f}") | |
| print(f"Test Accuracy: {results[1]:.4f}") | |
| return evaluation_results | |
| def save(self, filepath: str): | |
| """Save model to disk.""" | |
| self.model.save(filepath) | |
| print(f"Model saved to {filepath}") | |
| def load(cls, filepath: str): | |
| """Load model from disk.""" | |
| model = keras.models.load_model(filepath) | |
| print(f"Model loaded from {filepath}") | |
| return model | |
| if __name__ == "__main__": | |
| print("Deep learning models module loaded successfully") | |
| print("Available models: CNNModel, LSTMModel") | |