import random import torch import numpy as np import math, torchaudio import torch.nn as nn import torch.nn.functional as F import librosa import gradio as gr import os, glob from torchaudio.sox_effects import apply_effects_file from pathlib import Path ''' This is the ECAPA-TDNN model. This model is modified and combined based on the following three projects: 1. https://github.com/clovaai/voxceleb_trainer/issues/86 2. https://github.com/lawlict/ECAPA-TDNN/blob/master/ecapa_tdnn.py 3. https://github.com/speechbrain/speechbrain/blob/96077e9a1afff89d3f5ff47cab4bca0202770e4f/speechbrain/lobes/models/ECAPA_TDNN.py ''' class SEModule(nn.Module): def __init__(self, channels, bottleneck=128): super(SEModule, self).__init__() self.se = nn.Sequential( nn.AdaptiveAvgPool1d(1), nn.Conv1d(channels, bottleneck, kernel_size=1, padding=0), nn.ReLU(), # nn.BatchNorm1d(bottleneck), # I remove this layer nn.Conv1d(bottleneck, channels, kernel_size=1, padding=0), nn.Sigmoid(), ) def forward(self, input): x = self.se(input) return input * x class Bottle2neck(nn.Module): def __init__(self, inplanes, planes, kernel_size=None, dilation=None, scale=8): super(Bottle2neck, self).__init__() width = int(math.floor(planes / scale)) self.conv1 = nn.Conv1d(inplanes, width * scale, kernel_size=1) self.bn1 = nn.BatchNorm1d(width * scale) self.nums = scale - 1 convs = [] bns = [] num_pad = math.floor(kernel_size / 2) * dilation for i in range(self.nums): convs.append(nn.Conv1d(width, width, kernel_size=kernel_size, dilation=dilation, padding=num_pad)) bns.append(nn.BatchNorm1d(width)) self.convs = nn.ModuleList(convs) self.bns = nn.ModuleList(bns) self.conv3 = nn.Conv1d(width * scale, planes, kernel_size=1) self.bn3 = nn.BatchNorm1d(planes) self.relu = nn.ReLU() self.width = width self.se = SEModule(planes) def forward(self, x): residual = x out = self.conv1(x) out = self.relu(out) out = self.bn1(out) spx = torch.split(out, self.width, 1) for i in range(self.nums): if i == 0: sp = spx[i] else: sp = sp + spx[i] sp = self.convs[i](sp) sp = self.relu(sp) sp = self.bns[i](sp) if i == 0: out = sp else: out = torch.cat((out, sp), 1) out = torch.cat((out, spx[self.nums]), 1) out = self.conv3(out) out = self.relu(out) out = self.bn3(out) out = self.se(out) out += residual return out class SpeechEmbedder(nn.Module): def __init__(self, C=1024): super(SpeechEmbedder, self).__init__() self.conv1 = nn.Conv1d(40, C, kernel_size=5, stride=1, padding=2) self.relu = nn.ReLU() self.bn1 = nn.BatchNorm1d(C) self.layer1 = Bottle2neck(C, C, kernel_size=3, dilation=2, scale=8) self.layer2 = Bottle2neck(C, C, kernel_size=3, dilation=3, scale=8) self.layer3 = Bottle2neck(C, C, kernel_size=3, dilation=4, scale=8) # I fixed the shape of the output from MFA layer, that is close to the setting from ECAPA paper. self.layer4 = nn.Conv1d(3 * C, 1536, kernel_size=1) self.attention = nn.Sequential( nn.Conv1d(4608, 256, kernel_size=1), nn.ReLU(), nn.BatchNorm1d(256), nn.Tanh(), # I add this layer nn.Conv1d(256, 1536, kernel_size=1), nn.Softmax(dim=2), ) self.bn5 = nn.BatchNorm1d(3072) self.fc6 = nn.Linear(3072, 192) self.bn6 = nn.BatchNorm1d(192) def forward(self, x, aug=False): #x = x.permute(0, 2, 1) x = self.conv1(x) x = self.relu(x) x = self.bn1(x) x1 = self.layer1(x) x2 = self.layer2(x + x1) x3 = self.layer3(x + x1 + x2) x = self.layer4(torch.cat((x1, x2, x3), dim=1)) x = self.relu(x) t = x.size()[-1] global_x = torch.cat((x, torch.mean(x, dim=2, keepdim=True).repeat(1, 1, t), torch.sqrt(torch.var(x, dim=2, keepdim=True).clamp(min=1e-4)).repeat(1, 1, t)), dim=1) w = self.attention(global_x) mu = torch.sum(x * w, dim=2) sg = torch.sqrt((torch.sum((x ** 2) * w, dim=2) - mu ** 2).clamp(min=1e-4)) x = torch.cat((mu, sg), 1) x = self.bn5(x) x = self.fc6(x) x = self.bn6(x) return x def feature_extractor(input_file): """ Function for resampling to ensure that the speech input is sampled at 16KHz. """ # read the file speech, sample_rate = librosa.load(input_file) sr = 16000 #speech, sample_rate = librosa.core.load(input_file, sr) # make it 1-D if len(speech.shape) > 1: speech = speech[:, 0] + speech[:, 1] # Resampling at 16KHz if sample_rate != 16000: speech = librosa.resample(speech, sample_rate, 16000) intervals = librosa.effects.split(speech, top_db=30) # voice activity detection utterances_spec = [] tisv_frame = 180 # Max number of time steps in input after preprocess hop = 0.01 window = 0.025 sr = 16000 nfft = 512 # For mel spectrogram preprocess nmels = 40 # Number of mel energies utter_min_len = (tisv_frame * hop + window) * sr # lower bound of utterance length for interval in intervals: if (interval[1] - interval[0]) > utter_min_len: # If partial utterance is sufficient long, utter_part = speech[interval[0]:interval[1]] # save first and last 180 frames of spectrogram. S = librosa.core.stft(y=utter_part, n_fft=nfft, win_length = int(window * sr), hop_length = int( hop * sr)) S = np.abs(S) ** 2 mel_basis = librosa.filters.mel(sr=sr, n_fft=nfft, n_mels=nmels) S = np.log10(np.dot(mel_basis, S) + 1e-6) # log mel spectrogram of utterances a = S[:, :tisv_frame] # first 180 frames of partial utterance b = S[:, -tisv_frame:] # last 180 frames of partial utterance utterances_spec = np.concatenate((a, b), axis=1) utterances_spec = np.array(utterances_spec) return utterances_spec def similarity_fn(path1, path2): # path1 = 'path of the first wav file' # path2 = 'path of the second wav file' if not (path1 and path2): return 'ERROR: Please record audio for *both* speakers!' # Applying the effects to both the audio input files #wav1, _ = apply_effects_file(path1, EFFECTS) #wav2, _ = apply_effects_file(path2, EFFECTS) # Extracting features input1 = feature_extractor(path1) input1 = torch.from_numpy(input1).float() input1 = torch.unsqueeze(input1, 0) emb1 = model(input1) emb1 = torch.nn.functional.normalize(emb1, dim=-1).to(device) input2 = feature_extractor(path2) input2 = torch.from_numpy(input2).float() input2 = torch.unsqueeze(input2, 0) emb2 = model(input2) emb2 = torch.nn.functional.normalize(emb2, dim=-1).to(device) similarity = F.cosine_similarity(emb1, emb2).detach().numpy()[0] if similarity >= THRESHOLD: output = OUTPUT_OK.format(similarity * 100) else: output = OUTPUT_FAIL.format(similarity * 100) return output if __name__ == "__main__": random.seed(1234) torch.manual_seed(1234) np.random.seed(1234) device = "cuda" if torch.cuda.is_available() else "cpu" STYLE = """ """ OUTPUT_OK = ( STYLE + """

The speakers are

{:.1f}%

similar

Welcome, human!

(You must get at least 85% to be considered the same person)
""" ) OUTPUT_FAIL = ( STYLE + """

The speakers are

{:.1f}%

similar

You shall not pass!

(You must get at least 85% to be considered the same person)
""" ) EFFECTS = [ ['remix', '-'], # to merge all the channels ["channels", "1"], # channel-->mono ["rate", "16000"], # resample to 16000 Hz ["gain", "-1.0"], # Attenuation -1 dB ["silence", "1", "0.1", "0.1%", "-1", "0.1", "0.1%"], # ['pad', '0', '1.5'], # for adding 1.5 seconds at the end ['trim', '0', '10'], # get the first 10 seconds ] # Setting the threshold value THRESHOLD = 0.85 model = SpeechEmbedder().to(device) e = 500 batch_id = 112 save_model_filename = "final_epoch_" + str(e) + "_batch_id_" + str(batch_id + 1) + ".model" # Load the model # ------------------------- model.load_state_dict(torch.load(save_model_filename)) model.eval() inputs = [ gr.inputs.Audio(source="microphone", type="filepath", optional=True, label="Speaker #1"), gr.inputs.Audio(source="microphone", type="filepath", optional=True, label="Speaker #2"), #"text", #"text", ] # path1 = 'samples/SA1_timit_train_DR7_MWRP0.WAV' # path2 = 'samples/*.WAV' # similarity_fn(path1, path2) #output = gr.outputs.Textbox(label="Output Text") output = gr.outputs.HTML(label="") description = ("This app evaluates whether the given audio speech inputs belong to the same individual based on Cosine Similarity score.") path = os.getcwd() print(path) examples = [ ["samples/SA1_timit_train_DR7_MTLC0.WAV", "samples/SA1_timit_train_DR7_MWRP0.WAV"], ["samples/SA1_timit_train_DR7_MTLC0.WAV", "samples/SA1_timit_train_DR8_FCLT0.WAV"], ["samples/SA1_timit_train_DR7_MWRP0.WAV", "samples/SA2_timit_train_DR7_MWRP0.WAV"], ["samples/SA1_timit_train_DR8_FCLT0.WAV", "samples/SA2_timit_train_DR8_FCLT0.WAV"], ["samples/SA1_timit_train_DR8_FNKL0.WAV", "samples/SA2_timit_train_DR8_FNKL0.WAV"], ["samples/SA1_timit_train_DR8_FNKL0.WAV", "samples/SA1_timit_train_DR8_MCXM0.WAV"], ["samples/SA1_timit_train_DR7_MWRP0.WAV", "samples/cate_blanch_3.mp3"], ["samples/cate_blanch.mp3", "samples/cate_blanch_3.mp3"], ["samples/cate_blanch.mp3", "samples/leonardo_dicaprio.mp3"], ["samples/heath_ledger.mp3", "samples/heath_ledger_3.mp3"], ["samples/russel_crowe.mp3", "samples/russel_crowe_2.mp3"], ] interface = gr.Interface( fn=similarity_fn, inputs=inputs, outputs=output, title="Voice Authentication with ECAPA-TDNN", description=description, layout="horizontal", theme="grass", allow_flagging=False, live=False, examples=examples, ) interface.launch(enable_queue=True)(share=True)