|
|
import torch |
|
|
import numpy as np |
|
|
import os |
|
|
from torch.utils.data import DataLoader |
|
|
from test_trim import NoisySpeechTestDataset |
|
|
from Deep_ANC_model_trim import CRN |
|
|
|
|
|
|
|
|
|
|
|
# Input and output locations for the labelling run.
preprocessed_test_dir = "/home/siddharth/Sid/ASR/ANC/Pre_processed_test_data"
models_path = "/home/siddharth/Sid/ASR/ANC/models"
labels_output_path = "labels.npy"

# Checkpoint file names model_0.pth ... model_14.pth, looked up in models_path.
model_filenames = [f"model_{i}.pth" for i in range(15)]

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# One CRN per checkpoint, stored in model_filenames order and set to eval mode.
models = []
for model_filename in model_filenames:
    model_path = os.path.join(models_path, model_filename)

    # NOTE(review): torch.load unpickles the file, so only checkpoints from a
    # trusted source should be placed in models_path.
    state_dict = torch.load(model_path, map_location=device)

    # Drop a leading "module." (7 characters) from parameter names so they
    # match a bare CRN; presumably the prefix comes from a DataParallel
    # wrapper used at training time -- confirm against the training script.
    new_state_dict = {}
    for key, value in state_dict.items():
        if key.startswith("module."):
            key = key[7:]
        new_state_dict[key] = value

    model = CRN().to(device)
    model.load_state_dict(new_state_dict)
    model.eval()
    models.append(model)
|
|
|
|
|
|
|
|
def calculate_snr(noisy, denoised):
    """Return the signal-to-noise ratio, in dB, of a denoised signal.

    The denoised output is treated as the signal and the residual
    ``noisy - denoised`` as the noise::

        SNR = 10 * log10(mean(denoised ** 2) / mean((noisy - denoised) ** 2))

    Args:
        noisy: Array-like noisy input.
        denoised: Array-like denoised output, broadcast-compatible with
            ``noisy``.

    Returns:
        The SNR in decibels. Degenerate inputs get explicit values instead of
        going through a floating-point division by zero / ``log10(0)``:

        * ``-inf`` when ``denoised`` has zero power (this includes the
          undefined 0/0 case, which would otherwise evaluate to ``nan``);
        * ``+inf`` when the residual has zero power.
    """
    noisy = np.asarray(noisy)
    denoised = np.asarray(denoised)

    signal_power = np.mean(denoised ** 2)
    noise_power = np.mean((noisy - denoised) ** 2)

    # Checked before dividing so that no RuntimeWarning is raised and no nan
    # (which compares False against everything) leaks out to callers that
    # rank candidates by this score.
    if signal_power == 0:
        return -np.inf
    if noise_power == 0:
        return np.inf

    return 10 * np.log10(signal_power / noise_power)
|
|
|
|
|
|
|
|
def label_preprocessed_dataset(preprocessed_test_dir, models):
    """Label each noisy test sample with the index of its best-scoring model.

    Every sample read from ``<preprocessed_test_dir>/noisy`` is run through
    each model in ``models``. The sample's label is the index of the model
    whose output gets the highest ``calculate_snr(noisy, output)`` score;
    on ties the earliest model wins because the comparison is strict.

    Args:
        preprocessed_test_dir: Directory containing a ``noisy`` sub-directory
            that ``NoisySpeechTestDataset`` is constructed from.
        models: Sequence of callables that accept a batched tensor located on
            the module-level ``device``.

    Returns:
        A NumPy array with one integer label per sample, in loader order (the
        loader is not shuffled). A label of ``-1`` means no model produced a
        score greater than ``-inf`` for that sample (for example when
        ``models`` is empty or every score is ``-inf``/``nan``).
    """
    test_dataset = NoisySpeechTestDataset(os.path.join(preprocessed_test_dir, 'noisy'))
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    labels = []

    # The loader also yields the sample's path; it is not needed for labelling.
    for noisy_batch, _noisy_path in test_loader:
        noisy_spectrogram = noisy_batch.squeeze(0)

        # Both of these are identical for every model, so compute them once
        # per sample instead of once per (sample, model) pair.
        model_input = noisy_spectrogram.unsqueeze(0).to(device)
        noisy_array = noisy_spectrogram.cpu().numpy()

        best_snr = -np.inf
        best_model_idx = -1

        # Inference only: no autograd graph is needed for any of the models.
        with torch.no_grad():
            for i, model in enumerate(models):
                denoised_output = model(model_input).squeeze(0)
                snr = calculate_snr(noisy_array, denoised_output.cpu().numpy())

                # Strict comparison keeps the first model on ties and never
                # selects a nan score.
                if snr > best_snr:
                    best_snr = snr
                    best_model_idx = i

        labels.append(best_model_idx)

    return np.array(labels)
|
|
|
|
|
|
|
|
def main():
    """Compute the best-model label for every test sample and save the result.

    Reads the module-level ``preprocessed_test_dir``, ``models`` and
    ``labels_output_path``; writes the label array with ``np.save`` and
    prints a confirmation line.
    """
    best_model_labels = label_preprocessed_dataset(preprocessed_test_dir, models)
    np.save(labels_output_path, best_model_labels)
    print(f"Labels saved to {labels_output_path}")


# Only run the labelling pipeline when executed as a script.
if __name__ == "__main__":
    main()
|
|
|