Spaces:
Running
Running
| """ | |
| BTC Toolkit - Arquitectura del Modelo Oficial (Versión Dinámica) | |
| Implementación del Bidirectional Transformer for Musical Chord Recognition (BTC) | |
| basada en la arquitectura del paper original (ISMIR 2019) y su checkpoint pre-entrenado. | |
| Soporta longitud de secuencia dinámica para evitar desajustes en el tamaño de los tensores. | |
| """ | |
| import math | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
# ==========================================
# CHORD VOCABULARY (25 classes)
# ==========================================
# Index 0 is the 'N' label; it is followed by the 12 un-suffixed chord names
# and then the 12 'm'-suffixed chord names, each group in chromatic order
# starting from C.
CHORD_VOCAB = ['N'] + [
    root + suffix
    for suffix in ('', 'm')
    for root in ('C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B')
]
NUM_CHORDS = len(CHORD_VOCAB)  # 25
| def _gen_bias_mask(max_length): | |
| """Generates bias values (-Inf) to mask future timesteps during attention.""" | |
| np_mask = np.triu(np.full([max_length, max_length], -np.inf), 1) | |
| torch_mask = torch.from_numpy(np_mask).type(torch.FloatTensor) | |
| return torch_mask.unsqueeze(0).unsqueeze(1) | |
| def _gen_timing_signal(length, channels, min_timescale=1.0, max_timescale=1.0e4): | |
| """Generates a [1, length, channels] timing signal consisting of sinusoids.""" | |
| position = np.arange(length) | |
| num_timescales = channels // 2 | |
| log_timescale_increment = ( | |
| math.log(float(max_timescale) / float(min_timescale)) / | |
| (float(num_timescales) - 1)) | |
| inv_timescales = min_timescale * np.exp( | |
| np.arange(num_timescales).astype(float) * -log_timescale_increment) | |
| scaled_time = np.expand_dims(position, 1) * np.expand_dims(inv_timescales, 0) | |
| signal = np.concatenate([np.sin(scaled_time), np.cos(scaled_time)], axis=1) | |
| signal = np.pad(signal, [[0, 0], [0, channels % 2]], | |
| 'constant', constant_values=[0.0, 0.0]) | |
| signal = signal.reshape([1, length, channels]) | |
| return torch.from_numpy(signal).type(torch.FloatTensor) | |
class LayerNorm(nn.Module):
    """Normalize the last dimension, then apply a learnable scale and shift.

    The statistics come from ``Tensor.mean`` and ``Tensor.std`` (called with
    their default settings) over the last axis, and ``eps`` is added to the
    standard deviation itself rather than to the variance.

    Args:
        features: Size of the last dimension of the inputs.
        eps: Constant added to the standard deviation before dividing.
    """

    def __init__(self, features, eps=1e-6):
        super().__init__()
        # Parameter names ``gamma`` / ``beta`` are part of the state_dict keys.
        self.gamma = nn.Parameter(torch.ones(features))
        self.beta = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        """Return ``gamma * (x - mean) / (std + eps) + beta`` over the last axis."""
        centered = x - x.mean(-1, keepdim=True)
        spread = x.std(-1, keepdim=True) + self.eps
        return self.gamma * centered / spread + self.beta
class OutputLayer(nn.Module):
    """Base class for output heads that project hidden states to class scores.

    Subclasses must override :meth:`loss`.

    Args:
        hidden_size: Size of the incoming hidden representation.
        output_size: Number of output classes.
        probs_out: Flag stored on the instance for subclasses to consult.
    """

    def __init__(self, hidden_size, output_size, probs_out=False):
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.probs_out = probs_out
        # The projection is created before the LSTM so that parameter
        # registration order (and random-init order) stays the same.
        self.output_projection = nn.Linear(hidden_size, output_size)
        # Registered as a submodule, so its weights are part of state_dict,
        # but no method of this class calls it.
        self.lstm = nn.LSTM(
            input_size=hidden_size,
            hidden_size=int(hidden_size / 2),
            batch_first=True,
            bidirectional=True,
        )

    def loss(self, hidden, labels):
        """Abstract hook; always raises NotImplementedError in the base class."""
        raise NotImplementedError('Must implement {}.loss'.format(type(self).__name__))
class SoftmaxOutputLayer(OutputLayer):
    """Output head that scores classes with a linear projection + softmax."""

    def forward(self, hidden):
        """Project ``hidden`` and return either raw logits or top-2 class ids.

        Args:
            hidden: Hidden states; the top-2 branch indexes the result with
                three indices, so a 3-D input is expected there.

        Returns:
            When ``self.probs_out is True``: the projection output (logits;
            note these are NOT softmax probabilities despite the flag name).
            Otherwise: a tuple ``(predictions, second)`` holding the indices
            of the highest and second-highest softmax probability per step.
        """
        logits = self.output_projection(hidden)
        # Return before softmax/top-k: the logits path does not need them,
        # and top-k with k=2 would fail when there are fewer than 2 classes.
        if self.probs_out is True:
            return logits
        probs = F.softmax(logits, -1)
        _, indices = torch.topk(probs, 2)
        return indices[:, :, 0], indices[:, :, 1]

    def loss(self, hidden, labels):
        """Return the negative log-likelihood of ``labels`` given ``hidden``.

        Logits and labels are flattened so every position is one sample.
        """
        logits = self.output_projection(hidden)
        log_probs = F.log_softmax(logits, -1)
        return F.nll_loss(log_probs.view(-1, self.output_size), labels.view(-1))
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention computed over several heads.

    Args:
        input_depth: Feature size of the query/key/value inputs.
        total_key_depth: Combined query/key size over all heads.
        total_value_depth: Combined value size over all heads.
        output_depth: Feature size of the returned tensor.
        num_heads: Number of heads; must divide both depth arguments.
        bias_mask: Optional additive mask used when ``forward`` gets none.
        dropout: Dropout probability applied to the attention weights.
        attention_map: When exactly ``True``, ``forward`` also returns the
            attention weights.

    Raises:
        ValueError: If a depth is not divisible by ``num_heads``.
    """

    def __init__(self, input_depth, total_key_depth, total_value_depth, output_depth,
                 num_heads, bias_mask=None, dropout=0.0, attention_map=False):
        super().__init__()
        if total_key_depth % num_heads != 0:
            raise ValueError("Key depth (%d) must be divisible by the number of attention heads (%d)." % (total_key_depth, num_heads))
        if total_value_depth % num_heads != 0:
            raise ValueError("Value depth (%d) must be divisible by the number of attention heads (%d)." % (total_value_depth, num_heads))

        self.attention_map = attention_map
        self.num_heads = num_heads
        # 1 / sqrt(per-head key depth)
        self.query_scale = (total_key_depth // num_heads) ** -0.5
        self.bias_mask = bias_mask

        self.query_linear = nn.Linear(input_depth, total_key_depth, bias=False)
        self.key_linear = nn.Linear(input_depth, total_key_depth, bias=False)
        self.value_linear = nn.Linear(input_depth, total_value_depth, bias=False)
        self.output_linear = nn.Linear(total_value_depth, output_depth, bias=False)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, x):
        """Reshape [batch, steps, depth] to [batch, heads, steps, depth / heads]."""
        if x.dim() != 3:
            raise ValueError("x must have rank 3")
        batch, steps, depth = x.shape
        per_head = x.view(batch, steps, self.num_heads, depth // self.num_heads)
        return per_head.transpose(1, 2)

    def _merge_heads(self, x):
        """Reshape [batch, heads, steps, head_depth] back to [batch, steps, depth]."""
        if x.dim() != 4:
            raise ValueError("x must have rank 4")
        batch, _, steps, head_depth = x.shape
        return x.transpose(1, 2).contiguous().view(batch, steps, head_depth * self.num_heads)

    def forward(self, queries, keys, values, bias_mask=None):
        """Attend from ``queries`` to ``keys``/``values``.

        Args:
            queries, keys, values: Rank-3 tensors fed to the projections.
            bias_mask: Optional 4-D additive mask; takes precedence over the
                mask given at construction time.

        Returns:
            The projected context tensor, or ``(outputs, weights)`` when
            ``attention_map`` is ``True``.
        """
        # Project everything first, then split into heads.
        q_proj = self.query_linear(queries)
        k_proj = self.key_linear(keys)
        v_proj = self.value_linear(values)

        q = self._split_heads(q_proj)
        k = self._split_heads(k_proj)
        v = self._split_heads(v_proj)

        q = q * self.query_scale
        scores = torch.matmul(q, k.transpose(-2, -1))

        # A mask passed to this call wins over the one stored on the module.
        mask = self.bias_mask if bias_mask is None else bias_mask
        if mask is not None:
            rows, cols = scores.shape[-2], scores.shape[-1]
            # Crop the mask to the current query/key lengths.
            scores += mask[:, :, :rows, :cols].type_as(scores)

        weights = self.dropout(F.softmax(scores, dim=-1))
        contexts = self._merge_heads(torch.matmul(weights, v))
        outputs = self.output_linear(contexts)

        if self.attention_map is True:
            return outputs, weights
        return outputs
class Conv(nn.Module):
    """1-D convolution over the middle axis of a [batch, steps, features] input.

    Zero padding is applied explicitly so the number of steps is preserved.

    Args:
        input_size: Number of input features.
        output_size: Number of output features.
        kernel_size: Width of the convolution kernel.
        pad_type: ``'left'`` puts all ``kernel_size - 1`` padding before the
            sequence; any other value splits it between both sides.
    """

    def __init__(self, input_size, output_size, kernel_size, pad_type):
        super().__init__()
        if pad_type == 'left':
            # All padding in front: an output step only sees itself and
            # earlier steps.
            padding = (kernel_size - 1, 0)
        else:
            padding = (kernel_size // 2, (kernel_size - 1) // 2)
        self.pad = nn.ConstantPad1d(padding, 0)
        self.conv = nn.Conv1d(input_size, output_size, kernel_size=kernel_size, padding=0)

    def forward(self, inputs):
        """Convolve ``inputs`` and return a tensor with the same axis layout."""
        # Conv1d wants features on axis 1, so swap axes in and back out.
        channels_first = inputs.permute(0, 2, 1)
        convolved = self.conv(self.pad(channels_first))
        return convolved.permute(0, 2, 1)
class PositionwiseFeedForward(nn.Module):
    """Stack of linear and/or convolutional layers applied along the sequence.

    Args:
        input_depth: Feature size of the input.
        filter_size: Feature size of the intermediate layers.
        output_depth: Feature size produced by the last layer.
        layer_config: One character per layer: ``'l'`` for ``nn.Linear``,
            ``'c'`` for a kernel-size-3 ``Conv``.
        padding: ``pad_type`` forwarded to every ``Conv`` layer.
        dropout: Dropout probability applied after each activation.

    Raises:
        ValueError: If ``layer_config`` contains an unknown character.
    """

    def __init__(self, input_depth, filter_size, output_depth, layer_config='ll', padding='left', dropout=0.0):
        super().__init__()
        # First layer maps input -> filter, last maps filter -> output, and
        # any layers in between keep the filter size.
        sizes = ([(input_depth, filter_size)] +
                 [(filter_size, filter_size)] * (len(layer_config) - 2) +
                 [(filter_size, output_depth)])
        layers = []
        for kind, (in_size, out_size) in zip(layer_config, sizes):
            if kind == 'l':
                layers.append(nn.Linear(in_size, out_size))
            elif kind == 'c':
                layers.append(Conv(in_size, out_size, kernel_size=3, pad_type=padding))
            else:
                raise ValueError("Unknown layer type {}".format(kind))
        self.layers = nn.ModuleList(layers)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, inputs):
        """Run ``inputs`` through every layer, each followed by ReLU + dropout.

        NOTE: the previous implementation guarded the activation with
        ``i < len(self.layers)``, which is true for every index, so ReLU and
        dropout have always been applied after the LAST layer as well. That
        behaviour is kept on purpose (the output is therefore non-negative
        when dropout is inactive): changing it would alter the output of any
        weights trained with it. NOTE(review): confirm against the reference
        checkpoint before ever switching to ``len(self.layers) - 1``.
        """
        x = inputs
        for layer in self.layers:
            x = self.dropout(self.relu(layer(x)))
        return x
class self_attention_block(nn.Module):
    """Self-attention sub-layer followed by a convolutional feed-forward one.

    Each sub-layer normalizes its input first, adds the sub-layer output to
    the un-normalized input, and applies dropout to that sum.

    Args:
        hidden_size: Feature size of inputs and outputs.
        total_key_depth: Combined key depth handed to the attention module.
        total_value_depth: Combined value depth handed to the attention module.
        filter_size: Intermediate size of the feed-forward sub-layer.
        num_heads: Number of attention heads.
        bias_mask: Default additive attention mask (may be ``None``).
        layer_dropout: Dropout probability on each residual sum.
        attention_dropout: Dropout probability on the attention weights.
        relu_dropout: Dropout probability inside the feed-forward sub-layer.
        attention_map: When exactly ``True``, ``forward`` also returns the
            attention weights.
    """

    def __init__(self, hidden_size, total_key_depth, total_value_depth, filter_size, num_heads,
                 bias_mask=None, layer_dropout=0.0, attention_dropout=0.0, relu_dropout=0.0, attention_map=False):
        super().__init__()
        self.attention_map = attention_map
        self.multi_head_attention = MultiHeadAttention(
            hidden_size,
            total_key_depth,
            total_value_depth,
            hidden_size,
            num_heads,
            bias_mask,
            attention_dropout,
            attention_map,
        )
        # Two kernel-size-3 convolutions padded on both sides.
        self.positionwise_convolution = PositionwiseFeedForward(
            hidden_size,
            filter_size,
            hidden_size,
            layer_config='cc',
            padding='both',
            dropout=relu_dropout,
        )
        self.dropout = nn.Dropout(layer_dropout)
        self.layer_norm_mha = LayerNorm(hidden_size)
        self.layer_norm_ffn = LayerNorm(hidden_size)

    def forward(self, inputs, bias_mask=None):
        """Apply both sub-layers to ``inputs``.

        Args:
            inputs: Tensor fed to the block.
            bias_mask: Optional additive mask forwarded to the attention.

        Returns:
            The block output, or ``(output, attention_weights)`` when
            ``attention_map`` is ``True``.
        """
        wants_map = self.attention_map is True

        # --- attention sub-layer ---
        normed = self.layer_norm_mha(inputs)
        attended = self.multi_head_attention(normed, normed, normed, bias_mask=bias_mask)
        weights = None
        if wants_map:
            # The attention module returns (outputs, weights) in this mode.
            attended, weights = attended
        residual = self.dropout(inputs + attended)

        # --- feed-forward sub-layer ---
        transformed = self.positionwise_convolution(self.layer_norm_ffn(residual))
        result = self.dropout(residual + transformed)

        if wants_map:
            return result, weights
        return result
class bi_directional_self_attention(nn.Module):
    """Two masked self-attention blocks (one per direction) merged linearly.

    One block uses a mask that hides later timesteps; the other uses the
    transposed mask, which hides earlier timesteps. Their outputs are
    concatenated on the feature axis and projected back to ``hidden_size``.

    Args:
        hidden_size: Feature size of inputs and outputs.
        total_key_depth: Combined key depth; falls back to ``hidden_size``
            when falsy.
        total_value_depth: Combined value depth; falls back to
            ``hidden_size`` when falsy.
        filter_size: Intermediate size of each block's feed-forward part.
        num_heads: Number of attention heads.
        max_length: Accepted for signature compatibility; not used here
            because masks are built per call from the input length.
        layer_dropout: Dropout probability on residual sums.
        attention_dropout: Dropout probability on attention weights.
        relu_dropout: Dropout probability in the feed-forward part.
    """

    def __init__(self, hidden_size, total_key_depth, total_value_depth, filter_size, num_heads, max_length,
                 layer_dropout=0.0, attention_dropout=0.0, relu_dropout=0.0):
        super().__init__()
        self.weights_list = []

        key_depth = total_key_depth or hidden_size
        value_depth = total_value_depth or hidden_size

        def build_block():
            # No static mask: it is generated dynamically in forward().
            # attention_map=True so each block also returns its weights.
            return self_attention_block(
                hidden_size,
                key_depth,
                value_depth,
                filter_size,
                num_heads,
                None,
                layer_dropout,
                attention_dropout,
                relu_dropout,
                True,
            )

        self.attn_block = build_block()
        self.backward_attn_block = build_block()
        self.linear = nn.Linear(hidden_size * 2, hidden_size)

    def forward(self, inputs):
        """Process ``(x, list_weights)`` and return ``(y, list_weights)``.

        Args:
            inputs: Pair of the input tensor and a list that collects
                attention weights. The list is extended in place with this
                layer's forward and backward weights, and is also kept on
                ``self.weights_list``.

        Returns:
            A pair ``(y, weights_list)`` where ``y`` is the merged output.
        """
        x, list_weights = inputs
        seq_len = x.shape[1]

        # Build the masks for the current sequence length.
        forward_mask = _gen_bias_mask(seq_len).type_as(x)
        backward_mask = forward_mask.transpose(2, 3)

        # Forward self-attention block
        encoder_outputs, weights = self.attn_block(x, bias_mask=forward_mask)
        # Backward self-attention block
        reverse_outputs, reverse_weights = self.backward_attn_block(x, bias_mask=backward_mask)

        both_directions = torch.cat((encoder_outputs, reverse_outputs), dim=2)
        y = self.linear(both_directions)

        list_weights.append(weights)
        list_weights.append(reverse_weights)
        self.weights_list = list_weights
        return y, self.weights_list
class bi_directional_self_attention_layers(nn.Module):
    """Input projection, timing signal and a stack of bidirectional layers.

    Args:
        embedding_size: Feature size of the raw input.
        hidden_size: Feature size used inside the stack.
        num_layers: Number of bidirectional self-attention layers.
        num_heads: Number of attention heads per layer.
        total_key_depth: Combined key depth; falls back to ``hidden_size``
            when falsy.
        total_value_depth: Combined value depth; falls back to
            ``hidden_size`` when falsy.
        filter_size: Intermediate size of the feed-forward parts.
        max_length: Forwarded to every layer.
        input_dropout: Dropout probability applied to the raw input.
        layer_dropout: Dropout probability on residual sums.
        attention_dropout: Dropout probability on attention weights.
        relu_dropout: Dropout probability in the feed-forward parts.
    """

    def __init__(self, embedding_size, hidden_size, num_layers, num_heads, total_key_depth, total_value_depth,
                 filter_size, max_length=100, input_dropout=0.0, layer_dropout=0.0,
                 attention_dropout=0.0, relu_dropout=0.0):
        super().__init__()
        layer_kwargs = dict(
            hidden_size=hidden_size,
            total_key_depth=total_key_depth or hidden_size,
            total_value_depth=total_value_depth or hidden_size,
            filter_size=filter_size,
            num_heads=num_heads,
            max_length=max_length,
            layer_dropout=layer_dropout,
            attention_dropout=attention_dropout,
            relu_dropout=relu_dropout,
        )
        self.embedding_proj = nn.Linear(embedding_size, hidden_size, bias=False)
        stack = [bi_directional_self_attention(**layer_kwargs) for _ in range(num_layers)]
        self.self_attn_layers = nn.Sequential(*stack)
        self.layer_norm = LayerNorm(hidden_size)
        self.input_dropout = nn.Dropout(input_dropout)

    def forward(self, inputs):
        """Encode ``inputs`` and collect the attention weights of every layer.

        Returns:
            A pair ``(y, weights_list)``: the layer-normalized output of the
            stack and the list of attention weights gathered by the layers.
        """
        projected = self.embedding_proj(self.input_dropout(inputs))
        # Generate the timing signal for the actual sequence length and
        # feature size so it always matches the projected input.
        timing_signal = _gen_timing_signal(projected.shape[1], projected.shape[2]).type_as(projected)
        projected = projected + timing_signal
        # Each layer takes and returns an (activations, weights list) pair.
        encoded, weights_list = self.self_attn_layers((projected, []))
        return self.layer_norm(encoded), weights_list
class BTCModel(nn.Module):
    """Bidirectional Transformer for Chord Recognition (architecture wrapper).

    Wires a stack of bidirectional self-attention layers to a softmax output
    head using a fixed hyper-parameter set; only the input feature size is
    configurable.

    Args:
        n_freq: Number of features per input frame.
    """

    def __init__(self, n_freq: int = 144):
        super().__init__()
        hidden_size = 128

        self.self_attn_layers = bi_directional_self_attention_layers(
            embedding_size=n_freq,
            hidden_size=hidden_size,
            num_layers=8,
            num_heads=4,
            total_key_depth=128,
            total_value_depth=128,
            filter_size=128,
            max_length=108,  # the 'timestep' setting
            input_dropout=0.0,
            layer_dropout=0.0,
            attention_dropout=0.0,
            relu_dropout=0.0,
        )
        # probs_out=True makes the head return its projection output (logits).
        self.output_layer = SoftmaxOutputLayer(
            hidden_size=hidden_size,
            output_size=NUM_CHORDS,
            probs_out=True,
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run chord inference and return the logits.

        Args:
            x: Input tensor (batch, seq_len, n_freq).

        Returns:
            Logits tensor (batch, seq_len, num_chords).
        """
        # The attention weights returned by the encoder are not needed here.
        encoded, _ = self.self_attn_layers(x)
        return self.output_layer(encoded)