"""Train a CNNRes2D classifier on pre-processed noisy speech spectrograms.

Pipeline:
  1. Label the pre-processed test data using an ensemble of trained models.
  2. Build a Dataset/DataLoader over the noisy spectrogram ``.npy`` files.
  3. Train CNNRes2D with cross-entropy loss and Adam.
  4. Save the trained weights.
"""

import os

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

# Project-local imports.
from classification_network import CNNRes2D  # Adjust the path if your file structure is different
from label import label_preprocessed_dataset  # Assuming label.py contains the labeling function


class NoisySpeechClassificationDataset(Dataset):
    """Dataset of spectrograms stored as .npy files, paired with integer labels.

    NOTE(review): labels are matched to files purely by index, so the file
    order here must match the order used when the labels were produced.
    The listing is sorted to make that pairing deterministic (``os.listdir``
    order is filesystem-dependent) — confirm that
    ``label_preprocessed_dataset`` enumerates files in the same sorted order.
    """

    def __init__(self, data_dir, labels):
        self.data_dir = data_dir
        self.labels = labels
        # Sorted for a deterministic, reproducible file <-> label pairing.
        self.files = sorted(f for f in os.listdir(data_dir) if f.endswith('.npy'))

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        """Return (spectrogram float32 tensor, label long tensor) for index idx."""
        file_path = os.path.join(self.data_dir, self.files[idx])
        spectrogram = np.load(file_path)
        label = self.labels[idx]
        return (torch.tensor(spectrogram, dtype=torch.float32),
                torch.tensor(label, dtype=torch.long))


# Paths
preprocessed_test_dir = "/home/siddharth/Sid/ASR/ANC/Pre_processed_test_data"  # Path to pre-processed test data
models_path = "/home/siddharth/Sid/ASR/ANC/models"  # Path to your trained models for labeling
data_dir = "/home/siddharth/Sid/ASR/ANC/Pre_processed_test_data/noisy"  # Path to your pre-processed noisy data
labels_output_path = "labels.npy"  # Path where labels will be saved

# Hyperparameters
batch_size = 32
num_epochs = 25
learning_rate = 0.001
num_classes = 15  # Assuming 15 classes based on your classification task

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def main():
    """Label the dataset, train the classifier, and save its weights."""
    # Step 1: Label the dataset using label.py.
    # SECURITY NOTE: torch.load unpickles arbitrary objects — only load
    # checkpoint files from a trusted source.
    models = [
        torch.load(os.path.join(models_path, f"model_{i}.pth"), map_location=device)
        for i in range(num_classes)
    ]
    labels = label_preprocessed_dataset(preprocessed_test_dir, models)
    np.save(labels_output_path, labels)
    print(f"Labels saved to {labels_output_path}")

    # Step 2: Create dataset and data loader.
    dataset = NoisySpeechClassificationDataset(data_dir, labels)
    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Step 3: Initialize the model, loss function, and optimizer.
    model = CNNRes2D(
        channels=[[128], [128] * 2],
        conv_kernels=[(3, 3), (3, 3)],
        conv_strides=[(1, 1), (1, 1)],
        conv_padding=[(1, 1), (1, 1)],
        pool_padding=[(0, 0), (0, 0)],
        num_classes=num_classes,
    ).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Step 4: Train the model.
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        # "targets" (not "labels") to avoid shadowing the label array above.
        for i, (inputs, targets) in enumerate(train_loader):
            inputs = inputs.unsqueeze(1).to(device)  # Add channel dimension for Conv2D
            targets = targets.to(device)

            # Zero the parameter gradients.
            optimizer.zero_grad()

            # Forward pass.
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            # Backward pass and optimize.
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            if (i + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], '
                      f'Step [{i+1}/{len(train_loader)}], '
                      f'Loss: {loss.item():.4f}')

        print(f'Epoch [{epoch+1}/{num_epochs}], '
              f'Average Loss: {running_loss/len(train_loader):.4f}')

    # Step 5: Save only the state_dict — smaller and safer to reload than
    # pickling the whole model object.
    torch.save(model.state_dict(), "classification_model.pth")
    print("Model saved to classification_model.pth")


if __name__ == "__main__":
    main()