# Spaces:
# Sleeping
# Sleeping
import argparse
import math
import os
import time

import numpy as np
import torch
import torchaudio
import torchvision
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter

from efficient_model import MobileNetGRUModel, EfficientNetCNNModel, SqueezeNetTransformerModel
# Print library version information
print(f"\033[92mINFO\033[0m: PyTorch version: {torch.__version__}")
print(f"\033[92mINFO\033[0m: Torchaudio version: {torchaudio.__version__}")
print(f"\033[92mINFO\033[0m: Torchvision version: {torchvision.__version__}")

# Device selection: prefer CUDA, then Apple MPS, then fall back to CPU.
# The MPS backend is looked up with getattr so that a PyTorch build without
# `torch.backends.mps` falls through to CPU instead of raising AttributeError.
_mps_backend = getattr(torch.backends, "mps", None)
if torch.cuda.is_available():
    device = torch.device("cuda")
elif _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"\033[92mINFO\033[0m: Using device: {device}")

# Hyperparameters (using the best configuration from search)
batch_size = 4
epochs = 20
fc_hidden_size = 64
learning_rate = 0.0005
dropout_rate = 0.5

# Model save directory (checkpoints are written here during training)
os.makedirs("./models/", exist_ok=True)
class PreprocessedDataset(Dataset):
    """Dataset over pre-processed ``.pt`` sample files stored in one directory.

    Every file whose name ends in ``.pt`` is treated as one sample. Each file
    must deserialize (via ``torch.load``) to a 3-item sequence
    ``(mfcc, image, label)``.
    """

    def __init__(self, data_dir):
        """Index all ``.pt`` files found directly inside ``data_dir``."""
        self.data_dir = data_dir
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable across runs and machines.
        self.samples = sorted(
            os.path.join(data_dir, name)
            for name in os.listdir(data_dir)
            if name.endswith(".pt")
        )

    def __len__(self):
        """Return the number of indexed sample files."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(mfcc, image, label)``.

        ``mfcc`` and ``image`` are converted with ``.float()``; ``label`` is
        returned exactly as stored.
        """
        # NOTE: torch.load unpickles the file -- only point this at trusted data.
        mfcc, image, label = torch.load(self.samples[idx])
        return mfcc.float(), image.float(), label
def calculate_mae(outputs, labels):
    """Return the mean absolute error between ``outputs`` and ``labels``.

    Both arguments are tensors. The subtraction follows normal broadcasting
    rules, so callers should pass tensors of matching shape. The result is a
    plain Python float (the mean over every element of the difference).
    """
    return (outputs - labels).abs().mean().item()
def evaluate_model(model, test_loader, criterion, eval_device=None):
    """Score ``model`` on ``test_loader`` and return aggregate metrics.

    Args:
        model: callable taking ``(mfcc, image)`` batches; ``model.eval()`` is
            called before scoring.
        test_loader: iterable of ``(mfcc, image, label)`` batches.
        criterion: loss callable applied to ``(output, label)``. Its result is
            weighted by batch size, which assumes a mean-reduced loss such as
            ``torch.nn.MSELoss()`` with default arguments.
        eval_device: device the batches are moved to. Defaults to the
            module-level ``device`` when omitted.

    Returns:
        ``(avg_loss, avg_mae, predictions, labels)`` where the averages are
        per-sample means and the last two items are flat NumPy arrays. For an
        empty loader the averages are NaN and the arrays are empty.
    """
    if eval_device is None:
        eval_device = device  # module-level default chosen at import time

    model.eval()
    loss_total = 0.0
    abs_err_total = 0.0
    n_seen = 0
    prediction_chunks = []
    label_chunks = []
    # For debugging: the first few (prediction, label) pairs
    debug_samples = []

    with torch.no_grad():
        for mfcc, image, label in test_loader:
            mfcc = mfcc.to(eval_device)
            image = image.to(eval_device)
            label = label.to(eval_device)

            output = model(mfcc, image)
            label = label.view(-1, 1).float()
            batch_n = label.size(0)

            # Collect up to five individual samples across batches
            for i in range(min(len(output), 5 - len(debug_samples))):
                debug_samples.append((output[i][0].item(), label[i][0].item()))

            # Weight the per-batch means by batch size so a short final batch
            # does not count as much as a full one.
            loss_total += criterion(output, label).item() * batch_n
            abs_err_total += torch.abs(output - label).mean().item() * batch_n
            n_seen += batch_n

            # Store predictions and labels for additional analysis
            prediction_chunks.append(output.cpu().numpy().reshape(-1))
            label_chunks.append(label.cpu().numpy().reshape(-1))

    if n_seen:
        avg_loss = loss_total / n_seen
        avg_mae = abs_err_total / n_seen
        all_predictions = np.concatenate(prediction_chunks)
        all_labels = np.concatenate(label_chunks)
    else:
        # Nothing to evaluate: report NaN rather than dividing by zero
        avg_loss = float("nan")
        avg_mae = float("nan")
        all_predictions = np.empty(0, dtype=np.float32)
        all_labels = np.empty(0, dtype=np.float32)

    # Print debug samples
    print("\nDEBUG SAMPLES (Prediction, Label):")
    for i, (pred, target) in enumerate(debug_samples):
        print(f"Sample {i+1}: Prediction = {pred:.4f}, Label = {target:.4f}, Difference = {abs(pred-target):.4f}")

    return avg_loss, avg_mae, all_predictions, all_labels
def _run_training_epoch(model, loader, criterion, optimizer, writer, global_step):
    """Run one optimisation pass over ``loader``.

    Returns ``(loss_sum, mae_sum, n_batches, global_step)``. An exception
    raised inside the loop is printed and ends the pass early (best effort);
    batches completed before the failure still count.
    """
    model.train()
    loss_sum = 0.0
    mae_sum = 0.0
    n_batches = 0
    try:
        for mfcc, image, label in loader:
            mfcc, image, label = mfcc.to(device), image.to(device), label.to(device)
            optimizer.zero_grad()
            output = model(mfcc, image)
            label = label.view(-1, 1).float()
            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            # Read each metric once per batch and reuse it for both the
            # running totals and the TensorBoard scalars.
            batch_loss = loss.item()
            batch_mae = calculate_mae(output, label)
            loss_sum += batch_loss
            mae_sum += batch_mae
            n_batches += 1
            writer.add_scalar("Training/Loss", batch_loss, global_step)
            writer.add_scalar("Training/MAE", batch_mae, global_step)
            global_step += 1
    except Exception as e:
        print(f"\033[91mERR!\033[0m: {e}")
    return loss_sum, mae_sum, n_batches, global_step


def _run_validation_epoch(model, loader, criterion):
    """Score ``model`` on ``loader`` without gradients.

    Returns ``(loss_sum, mae_sum, n_batches)``. As in training, an exception
    is printed and ends the pass early.
    """
    model.eval()
    loss_sum = 0.0
    mae_sum = 0.0
    n_batches = 0
    with torch.no_grad():
        try:
            for mfcc, image, label in loader:
                mfcc, image, label = mfcc.to(device), image.to(device), label.to(device)
                output = model(mfcc, image)
                label = label.view(-1, 1).float()
                loss_sum += criterion(output, label).item()
                mae_sum += calculate_mae(output, label)
                n_batches += 1
        except Exception as e:
            print(f"\033[91mERR!\033[0m: {e}")
    return loss_sum, mae_sum, n_batches


def train_model(model_type):
    """Train, validate and test one architecture and return a metrics summary.

    Args:
        model_type: one of ``"mobilenet_gru"``, ``"efficientnet_cnn"`` or
            ``"squeezenet_transformer"``.

    Returns:
        A dict with keys ``model_name``, ``model_size`` (millions of
        parameters), ``val_loss`` (best validation loss), ``test_loss``,
        ``test_mae`` and ``model_path`` (checkpoint with the best validation
        loss). If anything fails, the error is printed and a placeholder dict
        is returned instead: ``model_name`` is ``model_type``, the metrics are
        ``inf``, ``model_path`` is ``None`` and an extra ``error`` key holds
        the message. This function does not raise.
    """
    writer = None
    try:
        # Create model based on type
        if model_type == "mobilenet_gru":
            model = MobileNetGRUModel(
                gru_hidden_size=32,
                gru_layers=1,
                fc_hidden_size=fc_hidden_size,
                dropout_rate=dropout_rate,
            ).to(device)
            model_name = "MobileNetGRU"
        elif model_type == "efficientnet_cnn":
            model = EfficientNetCNNModel(
                fc_hidden_size=fc_hidden_size,
                dropout_rate=dropout_rate,
            ).to(device)
            model_name = "EfficientNetCNN"
        elif model_type == "squeezenet_transformer":
            model = SqueezeNetTransformerModel(
                nhead=4,
                dim_feedforward=128,
                fc_hidden_size=fc_hidden_size,
                dropout_rate=dropout_rate,
            ).to(device)
            model_name = "SqueezeNetTransformer"
        else:
            raise ValueError(f"Unknown model type: {model_type}")

        # Data loading
        data_dir = "./processed/"
        dataset = PreprocessedDataset(data_dir)
        n_samples = len(dataset)
        if n_samples == 0:
            # Fail with a clear message instead of min() on an empty list below
            raise ValueError(f"No .pt samples found in {data_dir}")

        # Check label range on the first few samples
        preview_labels = [dataset[i][2] for i in range(min(10, n_samples))]
        print("\nLABEL RANGE CHECK:")
        print(f"Sample labels: {preview_labels}")
        print(f"Min label: {min(preview_labels)}, Max label: {max(preview_labels)}")

        # 70 / 20 / 10 split; the test split absorbs the rounding remainder
        train_size = int(0.7 * n_samples)
        val_size = int(0.2 * n_samples)
        test_size = n_samples - train_size - val_size
        if min(train_size, val_size, test_size) == 0:
            raise ValueError(
                f"Dataset too small to split: {n_samples} samples give "
                f"train={train_size}, val={val_size}, test={test_size}"
            )
        train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
            dataset, [train_size, val_size, test_size]
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        # Loss function and optimizer
        criterion = torch.nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

        # TensorBoard (closed in the ``finally`` clause on every exit path)
        writer = SummaryWriter(f"runs/{model_name}/")
        global_step = 0

        print(f"\033[92mINFO\033[0m: Training {model_name} model for {epochs} epochs")
        print(f"\033[92mINFO\033[0m: Training samples: {len(train_dataset)}")
        print(f"\033[92mINFO\033[0m: Validation samples: {len(val_dataset)}")
        print(f"\033[92mINFO\033[0m: Test samples: {len(test_dataset)}")
        print(f"\033[92mINFO\033[0m: Batch size: {batch_size}")
        print(f"\033[92mINFO\033[0m: Learning rate: {learning_rate}")
        print(f"\033[92mINFO\033[0m: Dropout rate: {dropout_rate}")

        best_val_loss = float('inf')
        best_model_path = None

        # Calculate model size
        model_size = sum(p.numel() for p in model.parameters()) / 1e6  # in millions
        print(f"\033[92mINFO\033[0m: Model parameters: {model_size:.2f}M")

        # Training loop
        for epoch in range(epochs):
            print(f"\033[92mINFO\033[0m: Training epoch ({epoch+1}/{epochs})")
            start_time = time.time()
            running_loss, running_mae, n_batches, global_step = _run_training_epoch(
                model, train_loader, criterion, optimizer, writer, global_step
            )
            epoch_time = time.time() - start_time

            # Validation phase
            val_loss, val_mae, val_batches = _run_validation_epoch(model, val_loader, criterion)

            if n_batches == 0 or val_batches == 0:
                # Every batch failed; the averages below would divide by zero
                raise RuntimeError(
                    f"Epoch {epoch+1}: no batch completed "
                    f"(train batches={n_batches}, validation batches={val_batches})"
                )

            avg_train_loss = running_loss / n_batches
            avg_train_mae = running_mae / n_batches
            avg_val_loss = val_loss / val_batches
            avg_val_mae = val_mae / val_batches

            # Record validation metrics
            writer.add_scalar("Validation/Loss", avg_val_loss, epoch)
            writer.add_scalar("Validation/MAE", avg_val_mae, epoch)

            print(
                f"Epoch [{epoch+1}/{epochs}], Time: {epoch_time:.2f}s, "
                f"Train Loss: {avg_train_loss:.4f}, Train MAE: {avg_train_mae:.4f}, "
                f"Val Loss: {avg_val_loss:.4f}, Val MAE: {avg_val_mae:.4f}"
            )

            # Save model checkpoint
            timestamp = time.strftime("%Y%m%d-%H%M%S")
            model_path = f"models/{model_name}_model_{epoch+1}_{timestamp}.pt"
            torch.save(model.state_dict(), model_path)

            # Remember the checkpoint with the lowest validation loss
            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                best_model_path = model_path
                print(f"\033[92mINFO\033[0m: New best model saved with validation loss: {best_val_loss:.4f}")

            print(
                f"\033[92mINFO\033[0m: Model checkpoint epoch [{epoch+1}/{epochs}] saved: {model_path}"
            )

        print(f"\033[92mINFO\033[0m: Training complete")

        if best_model_path is None:
            # Happens with zero epochs or when every validation loss was NaN
            raise RuntimeError("No checkpoint improved on the initial validation loss")

        # Load the best model for testing
        print(f"\033[92mINFO\033[0m: Loading best model from {best_model_path} for testing")
        model.load_state_dict(torch.load(best_model_path, map_location=device))

        # Evaluate on test set
        test_loss, test_mae, predictions, labels = evaluate_model(model, test_loader, criterion)

        # Calculate additional metrics
        abs_errors = np.abs(predictions - labels)
        max_error = np.max(abs_errors)
        min_error = np.min(abs_errors)

        print("\n" + "="*50)
        print(f"TEST RESULTS FOR {model_name}:")
        print(f"Test Loss (MSE): {test_loss:.4f}")
        print(f"Mean Absolute Error: {test_mae:.4f}")
        print(f"Maximum Absolute Error: {max_error:.4f}")
        print(f"Minimum Absolute Error: {min_error:.4f}")

        # Add test results to TensorBoard
        writer.add_scalar("Test/MSE", test_loss, 0)
        writer.add_scalar("Test/MAE", test_mae, 0)
        writer.add_scalar("Test/Max_Error", max_error, 0)
        writer.add_scalar("Test/Min_Error", min_error, 0)
        # Histogram of absolute errors
        writer.add_histogram("Test/Absolute_Errors", abs_errors, 0)
        print("="*50)

        # Final summary
        print("\nTRAINING SUMMARY:")
        print(f"Model: {model_name}")
        print(f"Model Size: {model_size:.2f}M parameters")
        print(f"Best Validation Loss: {best_val_loss:.4f}")
        print(f"Final Test Loss: {test_loss:.4f}")
        print(f"Final Test MAE: {test_mae:.4f}")
        print(f"Best model saved at: {best_model_path}")

        # Return metrics for comparison
        return {
            "model_name": model_name,
            "model_size": model_size,
            "val_loss": best_val_loss,
            "test_loss": test_loss,
            "test_mae": test_mae,
            "model_path": best_model_path,
        }
    except Exception as e:
        print(f"\033[91mERR!\033[0m: Error training {model_type}: {e}")
        # Return a placeholder result so callers can still tabulate this run
        return {
            "model_name": model_type,
            "model_size": 0,
            "val_loss": float('inf'),
            "test_loss": float('inf'),
            "test_mae": float('inf'),
            "model_path": None,
            "error": str(e),
        }
    finally:
        # Flush and release the TensorBoard writer on success and failure alike
        if writer is not None:
            writer.close()
def test_cpu_inference(model_path, model_type, num_runs=100, warmup_runs=10):
    """Measure average CPU forward-pass time for a saved model.

    Args:
        model_path: path to a ``state_dict`` checkpoint for ``model_type``.
        model_type: one of ``"mobilenet_gru"``, ``"efficientnet_cnn"`` or
            ``"squeezenet_transformer"``.
        num_runs: number of timed forward passes (must be at least 1).
        warmup_runs: untimed forward passes executed before measuring.

    Returns:
        Average seconds per forward pass on a single dummy sample.

    Raises:
        ValueError: for an unknown ``model_type``, a missing ``model_path``,
            or ``num_runs`` below 1.
    """
    # Create model based on type (left on the CPU)
    if model_type == "mobilenet_gru":
        model = MobileNetGRUModel(
            gru_hidden_size=32,
            gru_layers=1,
            fc_hidden_size=fc_hidden_size,
            dropout_rate=dropout_rate,
        )
        model_name = "MobileNetGRU"
    elif model_type == "efficientnet_cnn":
        model = EfficientNetCNNModel(
            fc_hidden_size=fc_hidden_size,
            dropout_rate=dropout_rate,
        )
        model_name = "EfficientNetCNN"
    elif model_type == "squeezenet_transformer":
        model = SqueezeNetTransformerModel(
            nhead=4,
            dim_feedforward=128,
            fc_hidden_size=fc_hidden_size,
            dropout_rate=dropout_rate,
        )
        model_name = "SqueezeNetTransformer"
    else:
        raise ValueError(f"Unknown model type: {model_type}")

    if model_path is None:
        # A failed training run reports no checkpoint; say so explicitly
        raise ValueError(f"No checkpoint path given for {model_type}")
    if num_runs < 1:
        raise ValueError(f"num_runs must be at least 1, got {num_runs}")

    # Load model weights
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model.eval()

    # Create dummy input
    dummy_mfcc = torch.randn(1, 10, 376)  # Batch size 1, 10 time steps, 376 features
    dummy_image = torch.randn(1, 3, 224, 224)  # Batch size 1, 3 channels, 224x224 image

    # Disable autograd so graph bookkeeping does not inflate the measurement
    with torch.no_grad():
        # Warm-up
        for _ in range(warmup_runs):
            model(dummy_mfcc, dummy_image)

        # Measure inference time with a monotonic, high-resolution clock
        start_time = time.perf_counter()
        for _ in range(num_runs):
            model(dummy_mfcc, dummy_image)
        end_time = time.perf_counter()

    avg_time = (end_time - start_time) / num_runs
    print(f"\n{model_name} CPU Inference Time:")
    print(f"Average over {num_runs} runs: {avg_time*1000:.2f} ms")
    return avg_time
def main():
    """Train the requested model(s), time CPU inference, and print a comparison.

    ``--model`` selects one architecture or ``all`` (the default). A model
    whose training failed (``model_path`` is ``None`` in its result) is kept
    in the comparison table but skipped for CPU timing and for best-model
    selection.
    """
    parser = argparse.ArgumentParser(description="Train and evaluate efficient models")
    parser.add_argument(
        "--model",
        type=str,
        choices=["mobilenet_gru", "efficientnet_cnn", "squeezenet_transformer", "all"],
        default="all",
        help="Model architecture to train",
    )
    args = parser.parse_args()

    train_all = args.model == "all"
    if train_all:
        selected_types = ["mobilenet_gru", "efficientnet_cnn", "squeezenet_transformer"]
    else:
        selected_types = [args.model]

    results = []
    for model_type in selected_types:
        if train_all:
            print(f"\n\n{'='*50}")
            print(f"TRAINING {model_type.upper()}")
            print(f"{'='*50}\n")

        result = train_model(model_type)

        # Test CPU inference, but only when training produced a checkpoint
        if result["model_path"] is None:
            print(f"\033[91mERR!\033[0m: Skipping CPU inference for {model_type}: no trained checkpoint")
            result["inference_time"] = math.nan
        else:
            result["inference_time"] = test_cpu_inference(result["model_path"], model_type)
        results.append(result)

    # Compare results
    print("\n\n" + "="*80)
    print("MODEL COMPARISON")
    print("="*80)
    print(f"{'Model':<25} {'Size (M)':<10} {'Val Loss':<10} {'Test Loss':<10} {'Test MAE':<10} {'CPU Time (ms)':<15}")
    print("-"*80)
    for result in results:
        print(f"{result['model_name']:<25} {result['model_size']:<10.2f} {result['val_loss']:<10.4f} "
              f"{result['test_loss']:<10.4f} {result['test_mae']:<10.4f} {result['inference_time']*1000:<15.2f}")
    print("="*80)

    # Find best model among runs that produced a checkpoint and a finite MAE
    candidates = [
        r for r in results
        if r["model_path"] is not None and math.isfinite(r["test_mae"])
    ]
    if not candidates:
        print("\033[91mERR!\033[0m: No model trained successfully")
        return

    best_model = min(candidates, key=lambda r: r["test_mae"])
    print(f"\nBEST MODEL: {best_model['model_name']}")
    print(f"Test MAE: {best_model['test_mae']:.4f}")
    print(f"CPU Inference Time: {best_model['inference_time']*1000:.2f} ms")
    print(f"Model Path: {best_model['model_path']}")


if __name__ == "__main__":
    main()