# ASV_new / app.py
# Author: monisankha — commit d0ac1b9 ("add it")
# NOTE(review): the four lines above were a GitHub page header pasted into
# the file verbatim; converted to comments so the module parses.
import random
import torch
import numpy as np
import math, torchaudio
import torch.nn as nn
import torch.nn.functional as F
import librosa
import gradio as gr
import os, glob
from torchaudio.sox_effects import apply_effects_file
from pathlib import Path
'''
This is the ECAPA-TDNN model.
This model is a modified combination of the following three projects:
1. https://github.com/clovaai/voxceleb_trainer/issues/86
2. https://github.com/lawlict/ECAPA-TDNN/blob/master/ecapa_tdnn.py
3. https://github.com/speechbrain/speechbrain/blob/96077e9a1afff89d3f5ff47cab4bca0202770e4f/speechbrain/lobes/models/ECAPA_TDNN.py
'''
class SEModule(nn.Module):
def __init__(self, channels, bottleneck=128):
super(SEModule, self).__init__()
self.se = nn.Sequential(
nn.AdaptiveAvgPool1d(1),
nn.Conv1d(channels, bottleneck, kernel_size=1, padding=0),
nn.ReLU(),
# nn.BatchNorm1d(bottleneck), # I remove this layer
nn.Conv1d(bottleneck, channels, kernel_size=1, padding=0),
nn.Sigmoid(),
)
def forward(self, input):
x = self.se(input)
return input * x
class Bottle2neck(nn.Module):
    """Res2Net-style bottleneck block with dilated convolutions and SE gating.

    The input is projected to ``width * scale`` channels, split into
    ``scale`` groups along the channel axis, and all groups except the last
    pass through a hierarchy of dilated convolutions where each group's
    input is the sum of its own split and the previous group's output.
    A squeeze-and-excitation gate and a residual connection close the block.
    """

    def __init__(self, inplanes, planes, kernel_size=None, dilation=None, scale=8):
        # inplanes / planes: input and output channel counts.
        # scale: number of channel groups (the Res2Net "scale" dimension).
        # NOTE(review): kernel_size and dilation default to None but are
        # used arithmetically below — callers must always pass both.
        super(Bottle2neck, self).__init__()
        width = int(math.floor(planes / scale))
        self.conv1 = nn.Conv1d(inplanes, width * scale, kernel_size=1)
        self.bn1 = nn.BatchNorm1d(width * scale)
        self.nums = scale - 1  # the last split bypasses the conv hierarchy
        convs = []
        bns = []
        # "same" padding for the dilated convolutions
        num_pad = math.floor(kernel_size / 2) * dilation
        for i in range(self.nums):
            convs.append(nn.Conv1d(width, width, kernel_size=kernel_size, dilation=dilation, padding=num_pad))
            bns.append(nn.BatchNorm1d(width))
        self.convs = nn.ModuleList(convs)
        self.bns = nn.ModuleList(bns)
        self.conv3 = nn.Conv1d(width * scale, planes, kernel_size=1)
        self.bn3 = nn.BatchNorm1d(planes)
        self.relu = nn.ReLU()
        self.width = width
        self.se = SEModule(planes)

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        # ReLU before BatchNorm follows the reference implementations this
        # file is combined from (see module docstring).
        out = self.relu(out)
        out = self.bn1(out)
        spx = torch.split(out, self.width, 1)  # split into `scale` groups
        for i in range(self.nums):
            if i == 0:
                sp = spx[i]
            else:
                # hierarchical connection: add the previous group's output
                sp = sp + spx[i]
            sp = self.convs[i](sp)
            sp = self.relu(sp)
            sp = self.bns[i](sp)
            if i == 0:
                out = sp
            else:
                out = torch.cat((out, sp), 1)
        # the final split is concatenated unprocessed
        out = torch.cat((out, spx[self.nums]), 1)
        out = self.conv3(out)
        out = self.relu(out)
        out = self.bn3(out)
        out = self.se(out)
        out += residual
        return out
class SpeechEmbedder(nn.Module):
    """ECAPA-TDNN speaker embedder producing 192-dim utterance embeddings.

    Expects input of shape (batch, 40, T) — 40 log-mel bands over T frames.
    Three SE-Res2Net blocks feed a multi-layer feature aggregation (MFA)
    layer, followed by attentive statistics pooling and a linear projection
    down to 192 dimensions.
    """

    def __init__(self, C=1024):
        # C: channel width of the TDNN trunk.
        super(SpeechEmbedder, self).__init__()
        self.conv1 = nn.Conv1d(40, C, kernel_size=5, stride=1, padding=2)
        self.relu = nn.ReLU()
        self.bn1 = nn.BatchNorm1d(C)
        self.layer1 = Bottle2neck(C, C, kernel_size=3, dilation=2, scale=8)
        self.layer2 = Bottle2neck(C, C, kernel_size=3, dilation=3, scale=8)
        self.layer3 = Bottle2neck(C, C, kernel_size=3, dilation=4, scale=8)
        # MFA output fixed to 1536 channels, close to the ECAPA paper setting.
        self.layer4 = nn.Conv1d(3 * C, 1536, kernel_size=1)
        # Attention net for attentive statistics pooling.  Its input is
        # [x; mean(x); std(x)] stacked channel-wise: 3 * 1536 = 4608.
        self.attention = nn.Sequential(
            nn.Conv1d(4608, 256, kernel_size=1),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Tanh(),  # added relative to the reference implementations
            nn.Conv1d(256, 1536, kernel_size=1),
            nn.Softmax(dim=2),  # attention weights normalized over time
        )
        self.bn5 = nn.BatchNorm1d(3072)
        self.fc6 = nn.Linear(3072, 192)
        self.bn6 = nn.BatchNorm1d(192)

    def forward(self, x, aug=False):
        # `aug` is accepted for interface compatibility but unused here.
        #x = x.permute(0, 2, 1)
        x = self.conv1(x)
        x = self.relu(x)
        x = self.bn1(x)
        # Each block sees the trunk features plus all previous block outputs.
        x1 = self.layer1(x)
        x2 = self.layer2(x + x1)
        x3 = self.layer3(x + x1 + x2)
        x = self.layer4(torch.cat((x1, x2, x3), dim=1))
        x = self.relu(x)
        t = x.size()[-1]
        # Broadcast the global mean and std to every frame as extra context
        # for the attention network (clamp guards the sqrt of tiny variances).
        global_x = torch.cat((x, torch.mean(x, dim=2, keepdim=True).repeat(1, 1, t),
                              torch.sqrt(torch.var(x, dim=2, keepdim=True).clamp(min=1e-4)).repeat(1, 1, t)), dim=1)
        w = self.attention(global_x)
        # Attention-weighted mean and standard deviation over time.
        mu = torch.sum(x * w, dim=2)
        sg = torch.sqrt((torch.sum((x ** 2) * w, dim=2) - mu ** 2).clamp(min=1e-4))
        x = torch.cat((mu, sg), 1)
        x = self.bn5(x)
        x = self.fc6(x)
        x = self.bn6(x)
        return x
def feature_extractor(input_file):
    """Extract 40-band log-mel spectrogram features from a speech file.

    The audio is loaded at its native rate, down-mixed to mono, resampled to
    16 kHz if needed, and split into voiced intervals (30 dB VAD).  Every
    interval long enough to yield ``tisv_frame`` frames contributes its
    first and last ``tisv_frame`` spectrogram frames.

    Parameters
    ----------
    input_file : str
        Path to an audio file readable by librosa.

    Returns
    -------
    np.ndarray
        Log-mel spectrogram of shape (40, 360 * k) where k is the number of
        qualifying voiced intervals.

    Raises
    ------
    ValueError
        If no voiced interval is long enough to extract features from.
    """
    sr = 16000
    # sr=None keeps the native rate; the librosa default would first
    # resample to 22050 Hz and then we would resample a second time below.
    speech, sample_rate = librosa.load(input_file, sr=None)
    # Down-mix to mono (librosa.load is mono by default; keep as a guard).
    if speech.ndim > 1:
        speech = speech[:, 0] + speech[:, 1]
    if sample_rate != sr:
        # Bug fix: librosa >= 0.10 removed the positional resample signature;
        # orig_sr / target_sr must be passed by keyword.
        speech = librosa.resample(speech, orig_sr=sample_rate, target_sr=sr)
    intervals = librosa.effects.split(speech, top_db=30)  # voice activity detection
    tisv_frame = 180   # max number of time steps kept per interval edge
    hop = 0.01         # hop length in seconds
    window = 0.025     # analysis window in seconds
    nfft = 512         # FFT size for the mel spectrogram
    nmels = 40         # number of mel energies
    # Lower bound (in samples) for an interval to yield tisv_frame frames.
    utter_min_len = (tisv_frame * hop + window) * sr
    # Hoisted out of the loop: the filterbank is interval-independent.
    mel_basis = librosa.filters.mel(sr=sr, n_fft=nfft, n_mels=nmels)
    segments = []
    for interval in intervals:
        if (interval[1] - interval[0]) > utter_min_len:
            utter_part = speech[interval[0]:interval[1]]
            S = librosa.core.stft(y=utter_part, n_fft=nfft,
                                  win_length=int(window * sr),
                                  hop_length=int(hop * sr))
            S = np.abs(S) ** 2
            S = np.log10(np.dot(mel_basis, S) + 1e-6)  # log mel spectrogram
            # Keep the first and last tisv_frame frames of the interval.
            segments.append(S[:, :tisv_frame])
            segments.append(S[:, -tisv_frame:])
    if not segments:
        # Explicit error instead of the original's silent empty array,
        # which crashed downstream in the model with a cryptic message.
        raise ValueError("no voiced interval long enough in %r" % input_file)
    # Bug fix: the original assigned (not appended) inside the loop and so
    # silently discarded all but the last qualifying interval.
    utterances_spec = np.concatenate(segments, axis=1)
    return utterances_spec
def similarity_fn(path1, path2):
    """Compare two recordings and render an HTML verdict.

    Parameters
    ----------
    path1, path2 : str or None
        Paths to the two recorded audio files (Gradio passes None when a
        speaker was not recorded).

    Returns
    -------
    str
        OUTPUT_OK / OUTPUT_FAIL HTML (formatted with the similarity in
        percent), or an error string when either input is missing.
    """
    if not (path1 and path2):
        return 'ERROR: Please record audio for *both* speakers!'

    def _embed(path):
        # Extract features and compute a unit-norm speaker embedding.
        feats = feature_extractor(path)
        # Bug fix: the input must be moved to `device` *before* the forward
        # pass — the original moved only the already-computed embedding,
        # which crashes when the model lives on CUDA.
        feats = torch.from_numpy(feats).float().unsqueeze(0).to(device)
        with torch.no_grad():  # inference only; no autograd graph needed
            emb = model(feats)
        return F.normalize(emb, dim=-1)

    emb1 = _embed(path1)
    emb2 = _embed(path2)
    # .cpu() before .numpy(): numpy conversion is CPU-only.
    similarity = F.cosine_similarity(emb1, emb2).detach().cpu().numpy()[0]
    if similarity >= THRESHOLD:
        output = OUTPUT_OK.format(similarity * 100)
    else:
        output = OUTPUT_FAIL.format(similarity * 100)
    return output
if __name__ == "__main__":
    # Seed all RNGs for reproducible behaviour.
    random.seed(1234)
    torch.manual_seed(1234)
    np.random.seed(1234)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Bootstrap stylesheet injected ahead of both result templates.
    STYLE = """
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" integrity="sha256-YvdLHPgkqJ8DVUxjjnGVlMMJtNimJ6dYkowFFvp4kKs=" crossorigin="anonymous">
"""
    # HTML shown when similarity >= THRESHOLD ({} is the percentage).
    OUTPUT_OK = (
        STYLE
        + """
<div class="container">
<div class="row"><h1 style="text-align: center">The speakers are</h1></div>
<div class="row"><h1 class="display-1 text-success" style="text-align: center">{:.1f}%</h1></div>
<div class="row"><h1 style="text-align: center">similar</h1></div>
<div class="row"><h1 class="text-success" style="text-align: center">Welcome, human!</h1></div>
<div class="row"><small style="text-align: center">(You must get at least 85% to be considered the same person)</small><div class="row">
</div>
"""
    )
    # HTML shown when similarity < THRESHOLD.
    OUTPUT_FAIL = (
        STYLE
        + """
<div class="container">
<div class="row"><h1 style="text-align: center">The speakers are</h1></div>
<div class="row"><h1 class="display-1 text-danger" style="text-align: center">{:.1f}%</h1></div>
<div class="row"><h1 style="text-align: center">similar</h1></div>
<div class="row"><h1 class="text-danger" style="text-align: center">You shall not pass!</h1></div>
<div class="row"><small style="text-align: center">(You must get at least 85% to be considered the same person)</small><div class="row">
</div>
"""
    )
    # sox effect chain (currently unused — apply_effects_file calls are
    # commented out in similarity_fn).
    EFFECTS = [
        ['remix', '-'],  # merge all the channels
        ["channels", "1"],  # channel --> mono
        ["rate", "16000"],  # resample to 16000 Hz
        ["gain", "-1.0"],  # attenuation -1 dB
        ["silence", "1", "0.1", "0.1%", "-1", "0.1", "0.1%"],
        # ['pad', '0', '1.5'],  # for adding 1.5 seconds at the end
        ['trim', '0', '10'],  # keep only the first 10 seconds
    ]
    # Cosine-similarity threshold for accepting a speaker match.
    THRESHOLD = 0.85

    model = SpeechEmbedder().to(device)
    e = 500
    batch_id = 112
    save_model_filename = "final_epoch_" + str(e) + "_batch_id_" + str(batch_id + 1) + ".model"
    # Load the trained checkpoint.  Bug fix: map_location lets a checkpoint
    # saved on GPU load on a CPU-only host.
    model.load_state_dict(torch.load(save_model_filename, map_location=device))
    model.eval()

    inputs = [
        gr.inputs.Audio(source="microphone", type="filepath", optional=True, label="Speaker #1"),
        gr.inputs.Audio(source="microphone", type="filepath", optional=True, label="Speaker #2"),
    ]
    output = gr.outputs.HTML(label="")
    description = ("This app evaluates whether the given audio speech inputs belong to the same individual based on Cosine Similarity score.")
    path = os.getcwd()
    print(path)
    examples = [
        ["samples/SA1_timit_train_DR7_MTLC0.WAV", "samples/SA1_timit_train_DR7_MWRP0.WAV"],
        ["samples/SA1_timit_train_DR7_MTLC0.WAV", "samples/SA1_timit_train_DR8_FCLT0.WAV"],
        ["samples/SA1_timit_train_DR7_MWRP0.WAV", "samples/SA2_timit_train_DR7_MWRP0.WAV"],
        ["samples/SA1_timit_train_DR8_FCLT0.WAV", "samples/SA2_timit_train_DR8_FCLT0.WAV"],
        ["samples/SA1_timit_train_DR8_FNKL0.WAV", "samples/SA2_timit_train_DR8_FNKL0.WAV"],
        ["samples/SA1_timit_train_DR8_FNKL0.WAV", "samples/SA1_timit_train_DR8_MCXM0.WAV"],
        ["samples/SA1_timit_train_DR7_MWRP0.WAV", "samples/cate_blanch_3.mp3"],
        ["samples/cate_blanch.mp3", "samples/cate_blanch_3.mp3"],
        ["samples/cate_blanch.mp3", "samples/leonardo_dicaprio.mp3"],
        ["samples/heath_ledger.mp3", "samples/heath_ledger_3.mp3"],
        ["samples/russel_crowe.mp3", "samples/russel_crowe_2.mp3"],
    ]
    interface = gr.Interface(
        fn=similarity_fn,
        inputs=inputs,
        outputs=output,
        title="Voice Authentication with ECAPA-TDNN",
        description=description,
        layout="horizontal",
        theme="grass",
        allow_flagging=False,
        live=False,
        examples=examples,
    )
    # Bug fix: the original `interface.launch(enable_queue=True)(share=True)`
    # called launch's return value, raising a TypeError at startup.
    interface.launch(enable_queue=True, share=True)