| ''' |
| This is the ECAPA-TDNN model. |
| This model is modified and combined based on the following three projects: |
| 1. https://github.com/clovaai/voxceleb_trainer/issues/86 |
| 2. https://github.com/lawlict/ECAPA-TDNN/blob/master/ecapa_tdnn.py |
| 3. https://github.com/speechbrain/speechbrain/blob/96077e9a1afff89d3f5ff47cab4bca0202770e4f/speechbrain/lobes/models/ECAPA_TDNN.py |
| |
| ''' |
|
|
| import math, torch, torchaudio |
| import torch.nn as nn |
| import torch.nn.functional as F |
|
|
| class SEModule(nn.Module): |
| def __init__(self, channels, bottleneck=128): |
| super(SEModule, self).__init__() |
| self.se = nn.Sequential( |
| nn.AdaptiveAvgPool1d(1), |
| nn.Conv1d(channels, bottleneck, kernel_size=1, padding=0), |
| nn.ReLU(), |
| |
| nn.Conv1d(bottleneck, channels, kernel_size=1, padding=0), |
| nn.Sigmoid(), |
| ) |
|
|
| def forward(self, input): |
| x = self.se(input) |
| return input * x |
|
|
| class Bottle2neck(nn.Module): |
|
|
| def __init__(self, inplanes, planes, kernel_size=None, dilation=None, scale = 8): |
| super(Bottle2neck, self).__init__() |
| width = int(math.floor(planes / scale)) |
| self.conv1 = nn.Conv1d(inplanes, width*scale, kernel_size=1) |
| self.bn1 = nn.BatchNorm1d(width*scale) |
| self.nums = scale -1 |
| convs = [] |
| bns = [] |
| num_pad = math.floor(kernel_size/2)*dilation |
| for i in range(self.nums): |
| convs.append(nn.Conv1d(width, width, kernel_size=kernel_size, dilation=dilation, padding=num_pad)) |
| bns.append(nn.BatchNorm1d(width)) |
| self.convs = nn.ModuleList(convs) |
| self.bns = nn.ModuleList(bns) |
| self.conv3 = nn.Conv1d(width*scale, planes, kernel_size=1) |
| self.bn3 = nn.BatchNorm1d(planes) |
| self.relu = nn.ReLU() |
| self.width = width |
| self.se = SEModule(planes) |
|
|
| def forward(self, x): |
| residual = x |
| out = self.conv1(x) |
| out = self.relu(out) |
| out = self.bn1(out) |
|
|
| spx = torch.split(out, self.width, 1) |
| for i in range(self.nums): |
| if i==0: |
| sp = spx[i] |
| else: |
| sp = sp + spx[i] |
| sp = self.convs[i](sp) |
| sp = self.relu(sp) |
| sp = self.bns[i](sp) |
| if i==0: |
| out = sp |
| else: |
| out = torch.cat((out, sp), 1) |
| out = torch.cat((out, spx[self.nums]),1) |
|
|
| out = self.conv3(out) |
| out = self.relu(out) |
| out = self.bn3(out) |
| |
| out = self.se(out) |
| out += residual |
| return out |
|
|
| class EcapaTdnnEncoder(nn.Module): |
|
|
| def __init__(self, C): |
|
|
| super(EcapaTdnnEncoder, self).__init__() |
|
|
| |
| |
| self.conv1 = nn.Conv1d(232, C, kernel_size=5, stride=1, padding=2) |
| self.relu = nn.ReLU() |
| self.bn1 = nn.BatchNorm1d(C) |
| self.layer1 = Bottle2neck(C, C, kernel_size=3, dilation=2, scale=8) |
| self.layer2 = Bottle2neck(C, C, kernel_size=3, dilation=3, scale=8) |
| self.layer3 = Bottle2neck(C, C, kernel_size=3, dilation=4, scale=8) |
| |
| self.layer4 = nn.Conv1d(3*C, 1536, kernel_size=1) |
| self.attention = nn.Sequential( |
| nn.Conv1d(4608, 256, kernel_size=1), |
| nn.ReLU(), |
| nn.BatchNorm1d(256), |
| nn.Tanh(), |
| nn.Conv1d(256, 1536, kernel_size=1), |
| nn.Softmax(dim=2), |
| ) |
| self.bn5 = nn.BatchNorm1d(3072) |
| self.fc6 = nn.Linear(3072, 192) |
| self.bn6 = nn.BatchNorm1d(192) |
|
|
|
|
| def forward(self, x): |
| |
| x = self.conv1(x) |
| x = self.relu(x) |
| x = self.bn1(x) |
|
|
| x1 = self.layer1(x) |
| x2 = self.layer2(x+x1) |
| x3 = self.layer3(x+x1+x2) |
|
|
| x = self.layer4(torch.cat((x1,x2,x3),dim=1)) |
| x = self.relu(x) |
|
|
| t = x.size()[-1] |
|
|
| global_x = torch.cat((x,torch.mean(x,dim=2,keepdim=True).repeat(1,1,t), torch.sqrt(torch.var(x,dim=2,keepdim=True).clamp(min=1e-4)).repeat(1,1,t)), dim=1) |
| |
| w = self.attention(global_x) |
|
|
| mu = torch.sum(x * w, dim=2) |
| sg = torch.sqrt( ( torch.sum((x**2) * w, dim=2) - mu**2 ).clamp(min=1e-4) ) |
|
|
| x = torch.cat((mu,sg),1) |
| x = self.bn5(x) |
| x = self.fc6(x) |
| x = self.bn6(x) |
|
|
| return x |
| |
| |
| import torch |
| from transformers import PreTrainedModel, PretrainedConfig |
|
|
|
|
| class ECAPAConfig(PretrainedConfig): |
| model_type = "ecapa_tdnn" |
| def __init__(self, C=1024, **kwargs): |
| super().__init__(**kwargs) |
| self.C = C |
|
|
|
|
|
|
|
|
| class SpeakerEncoder(PreTrainedModel): |
| config_class = ECAPAConfig |
| base_model_prefix = "ecapa_tdnn" |
| def __init__(self, config): |
| super().__init__(config) |
| self.model = EcapaTdnnEncoder(C=config.C) |
| def forward(self, *args, **kwargs): |
| return self.model(*args, **kwargs) |
|
|