| from predictions import get_embeddings, get_cosine_distance | |
| from utils.pt_util import restore_objects, save_model, save_objects, restore_model | |
| from utils.preprocessing import extract_fbanks | |
| from models.cross_entropy_model import FBankCrossEntropyNetV2 | |
| from trainer.cross_entropy_train import test, train | |
| import numpy as np | |
| import torch | |
| from data_proc.cross_entropy_dataset import FBanksCrossEntropyDataset, DataLoader | |
| import json | |
| from torch import optim | |
| import os | |
# Allow duplicate OpenMP runtimes to coexist (works around the common
# libiomp5 double-load crash when torch/MKL are both present).
# Must be set before the numeric libraries initialize.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
async def train_auth(
    train_dataset_path: str = 'dataset-speaker-csf/fbanks-train',
    test_dataset_path: str = 'dataset-speaker-csf/fbanks-test',
    model_name: str = 'fbanks-net-auth',
    model_layers: int = 4,
    epochs: int = 2,
    lr: float = 0.0005,
    batch_size: int = 16,
    labId: str = '',
):
    """Train the FBank speaker-authentication model, resuming from the last
    saved checkpoint for this lab/model if one exists.

    Args:
        train_dataset_path: directory of the training fbanks dataset.
        test_dataset_path: directory of the evaluation fbanks dataset.
        model_name: only 'fbanks-net-auth' is supported.
        model_layers: depth passed to FBankCrossEntropyNetV2.
        epochs: total epoch count (resume-aware: already-trained epochs skip).
        lr: Adam learning rate.
        batch_size: DataLoader batch size.
        labId: lab namespace used to build the checkpoint directory.

    Returns:
        dict with a JSON-encoded 'history' on success, or an error
        message when the dataset path or model name is invalid.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    import multiprocessing
    kwargs = {'num_workers': multiprocessing.cpu_count(),
              'pin_memory': True} if torch.cuda.is_available() else {}
    try:
        train_dataset = FBanksCrossEntropyDataset(train_dataset_path)
        train_loader = DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
        test_dataset = FBanksCrossEntropyDataset(test_dataset_path)
        test_loader = DataLoader(
            test_dataset, batch_size=batch_size, shuffle=True, **kwargs)
    except Exception:  # was a bare except; keep best-effort error message
        return 'path dataset test or train is not exist'

    if model_name == 'fbanks-net-auth':
        model = FBankCrossEntropyNetV2(
            num_layers=model_layers, reduction='mean').to(device)
    else:
        # Original returned a set literal ({"..."}) by accident; a dict
        # matches the message shape used elsewhere in this module.
        return {'message': 'model not exist in lab'}

    # Keep the checkpoint *directory* separate from the file path returned by
    # save_model; the original clobbered one variable for both roles, so the
    # second saved epoch received a file path where a directory was expected.
    checkpoint_dir = f'./modelDir/{labId}/log_train/{model_name}/{model_layers}/'
    model = restore_model(model, checkpoint_dir)
    (last_epoch, max_accuracy, train_losses, test_losses,
     train_accuracies, test_accuracies) = restore_objects(
        checkpoint_dir, (0, 0, [], [], [], []))
    # Resume after the last completed epoch only if a real checkpoint existed.
    start = last_epoch + 1 if max_accuracy > 0 else 0

    models_path = []
    optimizer = optim.Adam(model.parameters(), lr=lr)
    for epoch in range(start, epochs):
        train_loss, train_accuracy = train(
            model, device, train_loader, optimizer, epoch, 500)
        test_loss, test_accuracy = test(model, device, test_loader)
        print('After epoch: {}, train_loss: {}, test loss is: {}, train_accuracy: {}, '
              'test_accuracy: {}'.format(epoch, train_loss, test_loss, train_accuracy, test_accuracy))
        train_losses.append(train_loss)
        test_losses.append(test_loss)
        train_accuracies.append(train_accuracy)
        test_accuracies.append(test_accuracy)
        if test_accuracy > max_accuracy:
            max_accuracy = test_accuracy
            # save_model returns the path of the saved checkpoint file;
            # pass that (not the directory) to save_objects, as before.
            saved_path = save_model(model, epoch, checkpoint_dir)
            models_path.append(saved_path)
            save_objects((epoch, max_accuracy, train_losses, test_losses,
                          train_accuracies, test_accuracies), epoch, saved_path)
            print('saved epoch: {} as checkpoint'.format(epoch))

    train_history = {
        "train_accuracies": train_accuracies,
        "test_accuracies": test_accuracies,
        "train_losses": train_losses,
        "test_losses": test_losses,
        "model_path": models_path
    }
    return {
        'history': json.dumps(train_history)
    }
async def test_auth(
    test_dataset_path: str = 'dataset-speaker-csf/fbanks-test',
    model_name: str = 'fbanks-net-auth',
    model_layers: int = 4,
    batch_size: int = 2,
    labId: str = '',
):
    """Evaluate the latest saved checkpoint of the authentication model.

    Loads the last .pth file found in the lab's checkpoint directory and
    runs one evaluation pass over the test dataset.

    Returns:
        dict with 'test_loss' and 'test_accuracy', or an error message when
        the dataset path, model name, or checkpoint is missing.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    import multiprocessing
    kwargs = {'num_workers': multiprocessing.cpu_count(),
              'pin_memory': True} if torch.cuda.is_available() else {}
    try:
        test_dataset = FBanksCrossEntropyDataset(test_dataset_path)
        test_loader = DataLoader(
            test_dataset, batch_size=batch_size, shuffle=True, **kwargs)
    except Exception:  # was a bare except; keep best-effort error message
        return 'path dataset test is not exist'

    if model_name != 'fbanks-net-auth':
        # Original returned a set literal; use the module's dict message shape.
        return {'message': 'model not exist in lab'}

    model_folder_path = f'./modelDir/{labId}/log_train/{model_name}/{model_layers}/'
    # Original left model_path unbound (NameError) when no .pth existed;
    # as before, the last matching file in listdir order wins.
    model_path = None
    for file in os.listdir(model_folder_path):
        if file.endswith(".pth"):
            model_path = os.path.join(model_folder_path, file)
    if model_path is None:
        return {'message': 'checkpoint (.pth) not found for model'}

    try:
        model = FBankCrossEntropyNetV2(num_layers=model_layers, reduction="mean")
        # map_location lets a GPU-saved checkpoint load on the current device;
        # the original CPU fallback re-ran torch.load unchanged and failed the
        # same way on CPU-only machines.
        cpkt = torch.load(model_path, map_location=device)
        model.load_state_dict(cpkt)
        model.to(device)
    except Exception:
        print('cuda load is error')
        device = torch.device("cpu")
        model = FBankCrossEntropyNetV2(num_layers=model_layers, reduction="mean")
        cpkt = torch.load(model_path, map_location=device)
        model.load_state_dict(cpkt)
        model.to(device)

    test_loss, accurancy_mean = test(model, device, test_loader)
    return {
        'test_loss': test_loss,
        'test_accuracy': accurancy_mean
    }
async def infer_auth(
    speech_file_path: str = 'sample.wav',
    model_name: str = 'fbanks-net-auth',
    model_layers: int = 4,
    name_speaker: str = 'Hưng Phạm',
    threshold: float = 0.1,
    labId: str = '',
):
    """Authenticate a speech sample against a stored speaker embedding.

    Extracts fbanks from the wav file, embeds them with the latest saved
    checkpoint, and compares cosine distances to the enrolled embedding.

    Returns:
        dict with 'positives_mean', 'name_speaker' and an 'auth' boolean,
        or an error message when the speaker, model, or checkpoint is missing.
    """
    speaker_path = f'./modelDir/{labId}/speaker/'
    dir_ = speaker_path + name_speaker
    if not os.path.exists(dir_):
        return {'message': 'name speaker is not exist,please add speaker'}

    # BUG FIX: the original never defined `device` before `model.to(device)`;
    # the resulting NameError was swallowed by a bare except and silently
    # forced the CPU fallback. Define it up front like the sibling functions.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    if model_name != 'fbanks-net-auth':
        # Original returned a set literal; use the module's dict message shape.
        return {'message': 'model not exist in lab'}

    model_folder_path = f'./modelDir/{labId}/log_train/{model_name}/{model_layers}/'
    # As before, the last matching .pth in listdir order is used.
    model_path = None
    for file in os.listdir(model_folder_path):
        if file.endswith(".pth"):
            model_path = os.path.join(model_folder_path, file)
    if model_path is None:
        return {'message': 'checkpoint (.pth) not found for model'}

    try:
        model = FBankCrossEntropyNetV2(num_layers=model_layers, reduction="mean")
        # map_location allows GPU-saved weights to load on a CPU-only host.
        cpkt = torch.load(model_path, map_location=device)
        model.load_state_dict(cpkt)
        model.to(device)
    except Exception:
        print('cuda load is error')
        device = torch.device("cpu")
        model = FBankCrossEntropyNetV2(num_layers=model_layers, reduction="mean")
        cpkt = torch.load(model_path, map_location=device)
        model.load_state_dict(cpkt)
        model.to(device)

    fbanks = extract_fbanks(speech_file_path)
    embeddings = get_embeddings(fbanks, model)
    stored_embeddings = np.load(
        speaker_path + name_speaker + '/embeddings.npy')
    stored_embeddings = stored_embeddings.reshape((1, -1))

    distances = get_cosine_distance(embeddings, stored_embeddings)
    print('mean distances', np.mean(distances), flush=True)
    positives = distances < threshold
    positives_mean = np.mean(positives)
    # NOTE(review): `threshold` is reused both as the per-frame distance
    # cutoff and as the acceptance ratio below — looks suspicious, but
    # preserved as-is; confirm the intended decision rule with the authors.
    if positives_mean >= threshold:
        return {
            "positives_mean": positives_mean,
            "name_speaker": name_speaker,
            "auth": True,
        }
    else:
        return {
            "positives_mean": positives_mean,
            "name_speaker": name_speaker,
            "auth": False,
        }
if __name__ == '__main__':
    import asyncio
    # BUG FIX: train_auth is a coroutine function; calling it without
    # awaiting returned a coroutine object and never ran training.
    result = asyncio.run(train_auth())
    print(result)