Spaces:
Running
Running
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| class ConformerEncoder(nn.Module): | |
| def __init__(self, idim, n_layers, n_head, d_model, | |
| residual_dropout=0.1, dropout_rate=0.1, kernel_size=33, | |
| pe_maxlen=5000): | |
| super().__init__() | |
| self.odim = d_model | |
| self.input_preprocessor = Conv2dSubsampling(idim, d_model) | |
| self.positional_encoding = RelPositionalEncoding(d_model) | |
| self.dropout = nn.Dropout(residual_dropout) | |
| self.layer_stack = nn.ModuleList() | |
| for l in range(n_layers): | |
| block = RelPosEmbConformerBlock(d_model, n_head, | |
| residual_dropout, | |
| dropout_rate, kernel_size) | |
| self.layer_stack.append(block) | |
| def forward(self, padded_input, input_lengths, pad=True): | |
| if pad: | |
| padded_input = F.pad(padded_input, | |
| (0, 0, 0, self.input_preprocessor.context - 1), 'constant', 0.0) | |
| src_mask = self.padding_position_is_0(padded_input, input_lengths) | |
| embed_output, input_lengths, src_mask = self.input_preprocessor(padded_input, src_mask) | |
| enc_output = self.dropout(embed_output) | |
| pos_emb = self.dropout(self.positional_encoding(embed_output)) | |
| enc_outputs = [] | |
| for enc_layer in self.layer_stack: | |
| enc_output = enc_layer(enc_output, pos_emb, slf_attn_mask=src_mask, | |
| pad_mask=src_mask) | |
| enc_outputs.append(enc_output) | |
| return enc_output, input_lengths, src_mask | |
| def padding_position_is_0(self, padded_input, input_lengths): | |
| N, T = padded_input.size()[:2] | |
| mask = torch.ones((N, T)).to(padded_input.device) | |
| for i in range(N): | |
| mask[i, input_lengths[i]:] = 0 | |
| mask = mask.unsqueeze(dim=1) | |
| return mask.to(torch.uint8) | |
| class RelPosEmbConformerBlock(nn.Module): | |
| def __init__(self, d_model, n_head, | |
| residual_dropout=0.1, | |
| dropout_rate=0.1, kernel_size=33): | |
| super().__init__() | |
| self.ffn1 = ConformerFeedForward(d_model, dropout_rate) | |
| self.mhsa = RelPosMultiHeadAttention(n_head, d_model, | |
| residual_dropout) | |
| self.conv = ConformerConvolution(d_model, kernel_size, | |
| dropout_rate) | |
| self.ffn2 = ConformerFeedForward(d_model, dropout_rate) | |
| self.layer_norm = nn.LayerNorm(d_model) | |
| def forward(self, x, pos_emb, slf_attn_mask=None, pad_mask=None): | |
| out = 0.5 * x + 0.5 * self.ffn1(x) | |
| out = self.mhsa(out, out, out, pos_emb, mask=slf_attn_mask)[0] | |
| out = self.conv(out, pad_mask) | |
| out = 0.5 * out + 0.5 * self.ffn2(out) | |
| out = self.layer_norm(out) | |
| return out | |
| class Swish(nn.Module): | |
| def forward(self, x): | |
| return x * torch.sigmoid(x) | |
| class Conv2dSubsampling(nn.Module): | |
| def __init__(self, idim, d_model, out_channels=32): | |
| super().__init__() | |
| self.conv = nn.Sequential( | |
| nn.Conv2d(1, out_channels, 3, 2), | |
| nn.ReLU(), | |
| nn.Conv2d(out_channels, out_channels, 3, 2), | |
| nn.ReLU(), | |
| ) | |
| subsample_idim = ((idim - 1) // 2 - 1) // 2 | |
| self.out = nn.Linear(out_channels * subsample_idim, d_model) | |
| self.subsampling = 4 | |
| left_context = right_context = 3 # both exclude currect frame | |
| self.context = left_context + 1 + right_context # 7 | |
| def forward(self, x, x_mask): | |
| x = x.unsqueeze(1) | |
| x = self.conv(x) | |
| N, C, T, D = x.size() | |
| x = self.out(x.transpose(1, 2).contiguous().view(N, T, C * D)) | |
| mask = x_mask[:, :, :-2:2][:, :, :-2:2] | |
| input_lengths = mask[:, -1, :].sum(dim=-1) | |
| return x, input_lengths, mask | |
| class RelPositionalEncoding(torch.nn.Module): | |
| def __init__(self, d_model, max_len=5000): | |
| super().__init__() | |
| pe_positive = torch.zeros(max_len, d_model, requires_grad=False) | |
| pe_negative = torch.zeros(max_len, d_model, requires_grad=False) | |
| position = torch.arange(0, max_len).unsqueeze(1).float() | |
| div_term = torch.exp(torch.arange(0, d_model, 2).float() * | |
| -(torch.log(torch.tensor(10000.0)).item()/d_model)) | |
| pe_positive[:, 0::2] = torch.sin(position * div_term) | |
| pe_positive[:, 1::2] = torch.cos(position * div_term) | |
| pe_negative[:, 0::2] = torch.sin(-1 * position * div_term) | |
| pe_negative[:, 1::2] = torch.cos(-1 * position * div_term) | |
| pe_positive = torch.flip(pe_positive, [0]).unsqueeze(0) | |
| pe_negative = pe_negative[1:].unsqueeze(0) | |
| pe = torch.cat([pe_positive, pe_negative], dim=1) | |
| self.register_buffer('pe', pe) | |
| def forward(self, x): | |
| # Tmax = 2 * max_len - 1 | |
| Tmax, T = self.pe.size(1), x.size(1) | |
| pos_emb = self.pe[:, Tmax // 2 - T + 1 : Tmax // 2 + T].clone().detach() | |
| return pos_emb | |
| class ConformerFeedForward(nn.Module): | |
| def __init__(self, d_model, dropout_rate=0.1): | |
| super().__init__() | |
| pre_layer_norm = nn.LayerNorm(d_model) | |
| linear_expand = nn.Linear(d_model, d_model*4) | |
| nonlinear = Swish() | |
| dropout_pre = nn.Dropout(dropout_rate) | |
| linear_project = nn.Linear(d_model*4, d_model) | |
| dropout_post = nn.Dropout(dropout_rate) | |
| self.net = nn.Sequential(pre_layer_norm, | |
| linear_expand, | |
| nonlinear, | |
| dropout_pre, | |
| linear_project, | |
| dropout_post) | |
| def forward(self, x): | |
| residual = x | |
| output = self.net(x) | |
| output = output + residual | |
| return output | |
| class ConformerConvolution(nn.Module): | |
| def __init__(self, d_model, kernel_size=33, dropout_rate=0.1): | |
| super().__init__() | |
| assert kernel_size % 2 == 1 | |
| self.pre_layer_norm = nn.LayerNorm(d_model) | |
| self.pointwise_conv1 = nn.Conv1d(d_model, d_model*4, kernel_size=1, bias=False) | |
| self.glu = F.glu | |
| self.padding = (kernel_size - 1) // 2 | |
| self.depthwise_conv = nn.Conv1d(d_model*2, d_model*2, | |
| kernel_size, stride=1, | |
| padding=self.padding, | |
| groups=d_model*2, bias=False) | |
| self.batch_norm = nn.LayerNorm(d_model*2) | |
| self.swish = Swish() | |
| self.pointwise_conv2 = nn.Conv1d(d_model*2, d_model, kernel_size=1, bias=False) | |
| self.dropout = nn.Dropout(dropout_rate) | |
| def forward(self, x, mask=None): | |
| residual = x | |
| out = self.pre_layer_norm(x) | |
| out = out.transpose(1, 2) | |
| if mask is not None: | |
| out.masked_fill_(mask.ne(1), 0.0) | |
| out = self.pointwise_conv1(out) | |
| out = F.glu(out, dim=1) | |
| out = self.depthwise_conv(out) | |
| out = out.transpose(1, 2) | |
| out = self.swish(self.batch_norm(out)) | |
| out = out.transpose(1, 2) | |
| out = self.dropout(self.pointwise_conv2(out)) | |
| if mask is not None: | |
| out.masked_fill_(mask.ne(1), 0.0) | |
| out = out.transpose(1, 2) | |
| return out + residual | |
| class EncoderMultiHeadAttention(nn.Module): | |
| def __init__(self, n_head, d_model, | |
| residual_dropout=0.1): | |
| super().__init__() | |
| assert d_model % n_head == 0 | |
| self.n_head = n_head | |
| self.d_k = d_model // n_head | |
| self.d_v = self.d_k | |
| self.w_qs = nn.Linear(d_model, n_head * self.d_k, bias=False) | |
| self.w_ks = nn.Linear(d_model, n_head * self.d_k, bias=False) | |
| self.w_vs = nn.Linear(d_model, n_head * self.d_v, bias=False) | |
| self.layer_norm_q = nn.LayerNorm(d_model) | |
| self.layer_norm_k = nn.LayerNorm(d_model) | |
| self.layer_norm_v = nn.LayerNorm(d_model) | |
| self.attention = ScaledDotProductAttention(temperature=self.d_k ** 0.5) | |
| self.fc = nn.Linear(n_head * self.d_v, d_model, bias=False) | |
| self.dropout = nn.Dropout(residual_dropout) | |
| def forward(self, q, k, v, mask=None): | |
| sz_b, len_q = q.size(0), q.size(1) | |
| residual = q | |
| q, k, v = self.forward_qkv(q, k, v) | |
| output, attn = self.attention(q, k, v, mask=mask) | |
| output = self.forward_output(output, residual, sz_b, len_q) | |
| return output, attn | |
| def forward_qkv(self, q, k, v): | |
| d_k, d_v, n_head = self.d_k, self.d_v, self.n_head | |
| sz_b, len_q, len_k, len_v = q.size(0), q.size(1), k.size(1), v.size(1) | |
| q = self.layer_norm_q(q) | |
| k = self.layer_norm_k(k) | |
| v = self.layer_norm_v(v) | |
| q = self.w_qs(q).view(sz_b, len_q, n_head, d_k) | |
| k = self.w_ks(k).view(sz_b, len_k, n_head, d_k) | |
| v = self.w_vs(v).view(sz_b, len_v, n_head, d_v) | |
| q = q.transpose(1, 2) | |
| k = k.transpose(1, 2) | |
| v = v.transpose(1, 2) | |
| return q, k, v | |
| def forward_output(self, output, residual, sz_b, len_q): | |
| output = output.transpose(1, 2).contiguous().view(sz_b, len_q, -1) | |
| fc_out = self.fc(output) | |
| output = self.dropout(fc_out) | |
| output = output + residual | |
| return output | |
| class ScaledDotProductAttention(nn.Module): | |
| def __init__(self, temperature): | |
| super().__init__() | |
| self.temperature = temperature | |
| self.dropout = nn.Dropout(0.0) | |
| self.INF = float('inf') | |
| def forward(self, q, k, v, mask=None): | |
| attn = torch.matmul(q, k.transpose(2, 3)) / self.temperature | |
| output, attn = self.forward_attention(attn, v, mask) | |
| return output, attn | |
| def forward_attention(self, attn, v, mask=None): | |
| if mask is not None: | |
| mask = mask.unsqueeze(1) | |
| mask = mask.eq(0) | |
| attn = attn.masked_fill(mask, -self.INF) | |
| attn = torch.softmax(attn, dim=-1).masked_fill(mask, 0.0) | |
| else: | |
| attn = torch.softmax(attn, dim=-1) | |
| d_attn = self.dropout(attn) | |
| output = torch.matmul(d_attn, v) | |
| return output, attn | |
| class RelPosMultiHeadAttention(EncoderMultiHeadAttention): | |
| def __init__(self, n_head, d_model, | |
| residual_dropout=0.1): | |
| super().__init__(n_head, d_model, | |
| residual_dropout) | |
| d_k = d_model // n_head | |
| self.scale = 1.0 / (d_k ** 0.5) | |
| self.linear_pos = nn.Linear(d_model, n_head * d_k, bias=False) | |
| self.pos_bias_u = nn.Parameter(torch.FloatTensor(n_head, d_k)) | |
| self.pos_bias_v = nn.Parameter(torch.FloatTensor(n_head, d_k)) | |
| torch.nn.init.xavier_uniform_(self.pos_bias_u) | |
| torch.nn.init.xavier_uniform_(self.pos_bias_v) | |
| def _rel_shift(self, x): | |
| N, H, T1, T2 = x.size() | |
| zero_pad = torch.zeros((N, H, T1, 1), device=x.device, dtype=x.dtype) | |
| x_padded = torch.cat([zero_pad, x], dim=-1) | |
| x_padded = x_padded.view(N, H, T2 + 1, T1) | |
| x = x_padded[:, :, 1:].view_as(x) | |
| x = x[:, :, :, : x.size(-1) // 2 + 1] | |
| return x | |
| def forward(self, q, k, v, pos_emb, mask=None): | |
| sz_b, len_q = q.size(0), q.size(1) | |
| residual = q | |
| q, k, v = self.forward_qkv(q, k, v) | |
| q = q.transpose(1, 2) | |
| n_batch_pos = pos_emb.size(0) | |
| p = self.linear_pos(pos_emb).view(n_batch_pos, -1, self.n_head, self.d_k) | |
| p = p.transpose(1, 2) | |
| q_with_bias_u = (q + self.pos_bias_u).transpose(1, 2) | |
| q_with_bias_v = (q + self.pos_bias_v).transpose(1, 2) | |
| matrix_ac = torch.matmul(q_with_bias_u, k.transpose(-2, -1)) | |
| matrix_bd = torch.matmul(q_with_bias_v, p.transpose(-2, -1)) | |
| matrix_bd = self._rel_shift(matrix_bd) | |
| attn_scores = matrix_ac + matrix_bd | |
| attn_scores.mul_(self.scale) | |
| output, attn = self.attention.forward_attention(attn_scores, v, mask=mask) | |
| output = self.forward_output(output, residual, sz_b, len_q) | |
| return output, attn | |