Spaces:
Runtime error
Runtime error
| import random | |
| import torch | |
| import numpy as np | |
| import math, torchaudio | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import librosa | |
| import gradio as gr | |
| import os, glob | |
| from torchaudio.sox_effects import apply_effects_file | |
| from pathlib import Path | |
| ''' | |
| This is the ECAPA-TDNN model. | |
| This model is modified and combined based on the following three projects: | |
| 1. https://github.com/clovaai/voxceleb_trainer/issues/86 | |
| 2. https://github.com/lawlict/ECAPA-TDNN/blob/master/ecapa_tdnn.py | |
| 3. https://github.com/speechbrain/speechbrain/blob/96077e9a1afff89d3f5ff47cab4bca0202770e4f/speechbrain/lobes/models/ECAPA_TDNN.py | |
| ''' | |
| class SEModule(nn.Module): | |
| def __init__(self, channels, bottleneck=128): | |
| super(SEModule, self).__init__() | |
| self.se = nn.Sequential( | |
| nn.AdaptiveAvgPool1d(1), | |
| nn.Conv1d(channels, bottleneck, kernel_size=1, padding=0), | |
| nn.ReLU(), | |
| # nn.BatchNorm1d(bottleneck), # I remove this layer | |
| nn.Conv1d(bottleneck, channels, kernel_size=1, padding=0), | |
| nn.Sigmoid(), | |
| ) | |
| def forward(self, input): | |
| x = self.se(input) | |
| return input * x | |
| class Bottle2neck(nn.Module): | |
| def __init__(self, inplanes, planes, kernel_size=None, dilation=None, scale=8): | |
| super(Bottle2neck, self).__init__() | |
| width = int(math.floor(planes / scale)) | |
| self.conv1 = nn.Conv1d(inplanes, width * scale, kernel_size=1) | |
| self.bn1 = nn.BatchNorm1d(width * scale) | |
| self.nums = scale - 1 | |
| convs = [] | |
| bns = [] | |
| num_pad = math.floor(kernel_size / 2) * dilation | |
| for i in range(self.nums): | |
| convs.append(nn.Conv1d(width, width, kernel_size=kernel_size, dilation=dilation, padding=num_pad)) | |
| bns.append(nn.BatchNorm1d(width)) | |
| self.convs = nn.ModuleList(convs) | |
| self.bns = nn.ModuleList(bns) | |
| self.conv3 = nn.Conv1d(width * scale, planes, kernel_size=1) | |
| self.bn3 = nn.BatchNorm1d(planes) | |
| self.relu = nn.ReLU() | |
| self.width = width | |
| self.se = SEModule(planes) | |
| def forward(self, x): | |
| residual = x | |
| out = self.conv1(x) | |
| out = self.relu(out) | |
| out = self.bn1(out) | |
| spx = torch.split(out, self.width, 1) | |
| for i in range(self.nums): | |
| if i == 0: | |
| sp = spx[i] | |
| else: | |
| sp = sp + spx[i] | |
| sp = self.convs[i](sp) | |
| sp = self.relu(sp) | |
| sp = self.bns[i](sp) | |
| if i == 0: | |
| out = sp | |
| else: | |
| out = torch.cat((out, sp), 1) | |
| out = torch.cat((out, spx[self.nums]), 1) | |
| out = self.conv3(out) | |
| out = self.relu(out) | |
| out = self.bn3(out) | |
| out = self.se(out) | |
| out += residual | |
| return out | |
| class SpeechEmbedder(nn.Module): | |
| def __init__(self, C=1024): | |
| super(SpeechEmbedder, self).__init__() | |
| self.conv1 = nn.Conv1d(40, C, kernel_size=5, stride=1, padding=2) | |
| self.relu = nn.ReLU() | |
| self.bn1 = nn.BatchNorm1d(C) | |
| self.layer1 = Bottle2neck(C, C, kernel_size=3, dilation=2, scale=8) | |
| self.layer2 = Bottle2neck(C, C, kernel_size=3, dilation=3, scale=8) | |
| self.layer3 = Bottle2neck(C, C, kernel_size=3, dilation=4, scale=8) | |
| # I fixed the shape of the output from MFA layer, that is close to the setting from ECAPA paper. | |
| self.layer4 = nn.Conv1d(3 * C, 1536, kernel_size=1) | |
| self.attention = nn.Sequential( | |
| nn.Conv1d(4608, 256, kernel_size=1), | |
| nn.ReLU(), | |
| nn.BatchNorm1d(256), | |
| nn.Tanh(), # I add this layer | |
| nn.Conv1d(256, 1536, kernel_size=1), | |
| nn.Softmax(dim=2), | |
| ) | |
| self.bn5 = nn.BatchNorm1d(3072) | |
| self.fc6 = nn.Linear(3072, 192) | |
| self.bn6 = nn.BatchNorm1d(192) | |
| def forward(self, x, aug=False): | |
| #x = x.permute(0, 2, 1) | |
| x = self.conv1(x) | |
| x = self.relu(x) | |
| x = self.bn1(x) | |
| x1 = self.layer1(x) | |
| x2 = self.layer2(x + x1) | |
| x3 = self.layer3(x + x1 + x2) | |
| x = self.layer4(torch.cat((x1, x2, x3), dim=1)) | |
| x = self.relu(x) | |
| t = x.size()[-1] | |
| global_x = torch.cat((x, torch.mean(x, dim=2, keepdim=True).repeat(1, 1, t), | |
| torch.sqrt(torch.var(x, dim=2, keepdim=True).clamp(min=1e-4)).repeat(1, 1, t)), dim=1) | |
| w = self.attention(global_x) | |
| mu = torch.sum(x * w, dim=2) | |
| sg = torch.sqrt((torch.sum((x ** 2) * w, dim=2) - mu ** 2).clamp(min=1e-4)) | |
| x = torch.cat((mu, sg), 1) | |
| x = self.bn5(x) | |
| x = self.fc6(x) | |
| x = self.bn6(x) | |
| return x | |
| def feature_extractor(input_file): | |
| """ Function for resampling to ensure that the speech input is sampled at 16KHz. | |
| """ | |
| # read the file | |
| speech, sample_rate = librosa.load(input_file) | |
| sr = 16000 | |
| #speech, sample_rate = librosa.core.load(input_file, sr) | |
| # make it 1-D | |
| if len(speech.shape) > 1: | |
| speech = speech[:, 0] + speech[:, 1] | |
| # Resampling at 16KHz | |
| if sample_rate != 16000: | |
| speech = librosa.resample(speech, sample_rate, 16000) | |
| intervals = librosa.effects.split(speech, top_db=30) # voice activity detection | |
| utterances_spec = [] | |
| tisv_frame = 180 # Max number of time steps in input after preprocess | |
| hop = 0.01 | |
| window = 0.025 | |
| sr = 16000 | |
| nfft = 512 # For mel spectrogram preprocess | |
| nmels = 40 # Number of mel energies | |
| utter_min_len = (tisv_frame * hop + window) * sr # lower bound of utterance length | |
| for interval in intervals: | |
| if (interval[1] - interval[0]) > utter_min_len: # If partial utterance is sufficient long, | |
| utter_part = speech[interval[0]:interval[1]] # save first and last 180 frames of spectrogram. | |
| S = librosa.core.stft(y=utter_part, n_fft=nfft, win_length = int(window * sr), hop_length = int( | |
| hop * sr)) | |
| S = np.abs(S) ** 2 | |
| mel_basis = librosa.filters.mel(sr=sr, n_fft=nfft, n_mels=nmels) | |
| S = np.log10(np.dot(mel_basis, S) + 1e-6) # log mel spectrogram of utterances | |
| a = S[:, :tisv_frame] # first 180 frames of partial utterance | |
| b = S[:, -tisv_frame:] # last 180 frames of partial utterance | |
| utterances_spec = np.concatenate((a, b), axis=1) | |
| utterances_spec = np.array(utterances_spec) | |
| return utterances_spec | |
| def similarity_fn(path1, path2): | |
| # path1 = 'path of the first wav file' | |
| # path2 = 'path of the second wav file' | |
| if not (path1 and path2): | |
| return 'ERROR: Please record audio for *both* speakers!' | |
| # Applying the effects to both the audio input files | |
| #wav1, _ = apply_effects_file(path1, EFFECTS) | |
| #wav2, _ = apply_effects_file(path2, EFFECTS) | |
| # Extracting features | |
| input1 = feature_extractor(path1) | |
| input1 = torch.from_numpy(input1).float() | |
| input1 = torch.unsqueeze(input1, 0) | |
| emb1 = model(input1) | |
| emb1 = torch.nn.functional.normalize(emb1, dim=-1).to(device) | |
| input2 = feature_extractor(path2) | |
| input2 = torch.from_numpy(input2).float() | |
| input2 = torch.unsqueeze(input2, 0) | |
| emb2 = model(input2) | |
| emb2 = torch.nn.functional.normalize(emb2, dim=-1).to(device) | |
| similarity = F.cosine_similarity(emb1, emb2).detach().numpy()[0] | |
| if similarity >= THRESHOLD: | |
| output = OUTPUT_OK.format(similarity * 100) | |
| else: | |
| output = OUTPUT_FAIL.format(similarity * 100) | |
| return output | |
| if __name__ == "__main__": | |
| random.seed(1234) | |
| torch.manual_seed(1234) | |
| np.random.seed(1234) | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| STYLE = """ | |
| <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" integrity="sha256-YvdLHPgkqJ8DVUxjjnGVlMMJtNimJ6dYkowFFvp4kKs=" crossorigin="anonymous"> | |
| """ | |
| OUTPUT_OK = ( | |
| STYLE | |
| + """ | |
| <div class="container"> | |
| <div class="row"><h1 style="text-align: center">The speakers are</h1></div> | |
| <div class="row"><h1 class="display-1 text-success" style="text-align: center">{:.1f}%</h1></div> | |
| <div class="row"><h1 style="text-align: center">similar</h1></div> | |
| <div class="row"><h1 class="text-success" style="text-align: center">Welcome, human!</h1></div> | |
| <div class="row"><small style="text-align: center">(You must get at least 85% to be considered the same person)</small><div class="row"> | |
| </div> | |
| """ | |
| ) | |
| OUTPUT_FAIL = ( | |
| STYLE | |
| + """ | |
| <div class="container"> | |
| <div class="row"><h1 style="text-align: center">The speakers are</h1></div> | |
| <div class="row"><h1 class="display-1 text-danger" style="text-align: center">{:.1f}%</h1></div> | |
| <div class="row"><h1 style="text-align: center">similar</h1></div> | |
| <div class="row"><h1 class="text-danger" style="text-align: center">You shall not pass!</h1></div> | |
| <div class="row"><small style="text-align: center">(You must get at least 85% to be considered the same person)</small><div class="row"> | |
| </div> | |
| """ | |
| ) | |
| EFFECTS = [ | |
| ['remix', '-'], # to merge all the channels | |
| ["channels", "1"], # channel-->mono | |
| ["rate", "16000"], # resample to 16000 Hz | |
| ["gain", "-1.0"], # Attenuation -1 dB | |
| ["silence", "1", "0.1", "0.1%", "-1", "0.1", "0.1%"], | |
| # ['pad', '0', '1.5'], # for adding 1.5 seconds at the end | |
| ['trim', '0', '10'], # get the first 10 seconds | |
| ] | |
| # Setting the threshold value | |
| THRESHOLD = 0.85 | |
| model = SpeechEmbedder().to(device) | |
| e = 500 | |
| batch_id = 112 | |
| save_model_filename = "final_epoch_" + str(e) + "_batch_id_" + str(batch_id + 1) + ".model" | |
| # Load the model | |
| # ------------------------- | |
| model.load_state_dict(torch.load(save_model_filename)) | |
| model.eval() | |
| inputs = [ | |
| gr.inputs.Audio(source="microphone", type="filepath", optional=True, label="Speaker #1"), | |
| gr.inputs.Audio(source="microphone", type="filepath", optional=True, label="Speaker #2"), | |
| #"text", | |
| #"text", | |
| ] | |
| # path1 = 'samples/SA1_timit_train_DR7_MWRP0.WAV' | |
| # path2 = 'samples/*.WAV' | |
| # similarity_fn(path1, path2) | |
| #output = gr.outputs.Textbox(label="Output Text") | |
| output = gr.outputs.HTML(label="") | |
| description = ("This app evaluates whether the given audio speech inputs belong to the same individual based on Cosine Similarity score.") | |
| path = os.getcwd() | |
| print(path) | |
| examples = [ | |
| ["samples/SA1_timit_train_DR7_MTLC0.WAV", "samples/SA1_timit_train_DR7_MWRP0.WAV"], | |
| ["samples/SA1_timit_train_DR7_MTLC0.WAV", "samples/SA1_timit_train_DR8_FCLT0.WAV"], | |
| ["samples/SA1_timit_train_DR7_MWRP0.WAV", "samples/SA2_timit_train_DR7_MWRP0.WAV"], | |
| ["samples/SA1_timit_train_DR8_FCLT0.WAV", "samples/SA2_timit_train_DR8_FCLT0.WAV"], | |
| ["samples/SA1_timit_train_DR8_FNKL0.WAV", "samples/SA2_timit_train_DR8_FNKL0.WAV"], | |
| ["samples/SA1_timit_train_DR8_FNKL0.WAV", "samples/SA1_timit_train_DR8_MCXM0.WAV"], | |
| ["samples/SA1_timit_train_DR7_MWRP0.WAV", "samples/cate_blanch_3.mp3"], | |
| ["samples/cate_blanch.mp3", "samples/cate_blanch_3.mp3"], | |
| ["samples/cate_blanch.mp3", "samples/leonardo_dicaprio.mp3"], | |
| ["samples/heath_ledger.mp3", "samples/heath_ledger_3.mp3"], | |
| ["samples/russel_crowe.mp3", "samples/russel_crowe_2.mp3"], | |
| ] | |
| interface = gr.Interface( | |
| fn=similarity_fn, | |
| inputs=inputs, | |
| outputs=output, | |
| title="Voice Authentication with ECAPA-TDNN", | |
| description=description, | |
| layout="horizontal", | |
| theme="grass", | |
| allow_flagging=False, | |
| live=False, | |
| examples=examples, | |
| ) | |
| interface.launch(enable_queue=True)(share=True) |