""" BTC Toolkit - Arquitectura del Modelo Oficial (Versión Dinámica) Implementación del Bidirectional Transformer for Musical Chord Recognition (BTC) basada en la arquitectura del paper original (ISMIR 2019) y su checkpoint pre-entrenado. Soporta longitud de secuencia dinámica para evitar desajustes en el tamaño de los tensores. """ import math import numpy as np import torch import torch.nn as nn import torch.nn.functional as F # ========================================== # VOCABULARIO DE ACORDES (25 clases) # ========================================== CHORD_VOCAB = ['N'] + [ 'C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B', 'Cm', 'C#m', 'Dm', 'D#m', 'Em', 'Fm', 'F#m', 'Gm', 'G#m', 'Am', 'A#m', 'Bm' ] NUM_CHORDS = len(CHORD_VOCAB) # 25 def _gen_bias_mask(max_length): """Generates bias values (-Inf) to mask future timesteps during attention.""" np_mask = np.triu(np.full([max_length, max_length], -np.inf), 1) torch_mask = torch.from_numpy(np_mask).type(torch.FloatTensor) return torch_mask.unsqueeze(0).unsqueeze(1) def _gen_timing_signal(length, channels, min_timescale=1.0, max_timescale=1.0e4): """Generates a [1, length, channels] timing signal consisting of sinusoids.""" position = np.arange(length) num_timescales = channels // 2 log_timescale_increment = ( math.log(float(max_timescale) / float(min_timescale)) / (float(num_timescales) - 1)) inv_timescales = min_timescale * np.exp( np.arange(num_timescales).astype(float) * -log_timescale_increment) scaled_time = np.expand_dims(position, 1) * np.expand_dims(inv_timescales, 0) signal = np.concatenate([np.sin(scaled_time), np.cos(scaled_time)], axis=1) signal = np.pad(signal, [[0, 0], [0, channels % 2]], 'constant', constant_values=[0.0, 0.0]) signal = signal.reshape([1, length, channels]) return torch.from_numpy(signal).type(torch.FloatTensor) class LayerNorm(nn.Module): def __init__(self, features, eps=1e-6): super(LayerNorm, self).__init__() self.gamma = nn.Parameter(torch.ones(features)) self.beta = nn.Parameter(torch.zeros(features)) self.eps = eps def forward(self, x): mean = x.mean(-1, keepdim=True) std = x.std(-1, keepdim=True) return self.gamma * (x - mean) / (std + self.eps) + self.beta class OutputLayer(nn.Module): def __init__(self, hidden_size, output_size, probs_out=False): super(OutputLayer, self).__init__() self.output_size = output_size self.output_projection = nn.Linear(hidden_size, output_size) self.probs_out = probs_out self.lstm = nn.LSTM(input_size=hidden_size, hidden_size=int(hidden_size/2), batch_first=True, bidirectional=True) self.hidden_size = hidden_size def loss(self, hidden, labels): raise NotImplementedError('Must implement {}.loss'.format(self.__class__.__name__)) class SoftmaxOutputLayer(OutputLayer): def forward(self, hidden): logits = self.output_projection(hidden) probs = F.softmax(logits, -1) topk, indices = torch.topk(probs, 2) predictions = indices[:, :, 0] second = indices[:, :, 1] if self.probs_out is True: return logits return predictions, second def loss(self, hidden, labels): logits = self.output_projection(hidden) log_probs = F.log_softmax(logits, -1) return F.nll_loss(log_probs.view(-1, self.output_size), labels.view(-1)) class MultiHeadAttention(nn.Module): def __init__(self, input_depth, total_key_depth, total_value_depth, output_depth, num_heads, bias_mask=None, dropout=0.0, attention_map=False): super(MultiHeadAttention, self).__init__() if total_key_depth % num_heads != 0: raise ValueError("Key depth (%d) must be divisible by the number of attention heads (%d)." % (total_key_depth, num_heads)) if total_value_depth % num_heads != 0: raise ValueError("Value depth (%d) must be divisible by the number of attention heads (%d)." % (total_value_depth, num_heads)) self.attention_map = attention_map self.num_heads = num_heads self.query_scale = (total_key_depth // num_heads) ** -0.5 self.bias_mask = bias_mask self.query_linear = nn.Linear(input_depth, total_key_depth, bias=False) self.key_linear = nn.Linear(input_depth, total_key_depth, bias=False) self.value_linear = nn.Linear(input_depth, total_value_depth, bias=False) self.output_linear = nn.Linear(total_value_depth, output_depth, bias=False) self.dropout = nn.Dropout(dropout) def _split_heads(self, x): if len(x.shape) != 3: raise ValueError("x must have rank 3") shape = x.shape return x.view(shape[0], shape[1], self.num_heads, shape[2] // self.num_heads).permute(0, 2, 1, 3) def _merge_heads(self, x): if len(x.shape) != 4: raise ValueError("x must have rank 4") shape = x.shape return x.permute(0, 2, 1, 3).contiguous().view(shape[0], shape[2], shape[3] * self.num_heads) def forward(self, queries, keys, values, bias_mask=None): queries = self.query_linear(queries) keys = self.key_linear(keys) values = self.value_linear(values) queries = self._split_heads(queries) keys = self._split_heads(keys) values = self._split_heads(values) queries *= self.query_scale logits = torch.matmul(queries, keys.permute(0, 1, 3, 2)) # Utilizar la máscara dinámica si se provee, sino la estática mask = bias_mask if bias_mask is not None else self.bias_mask if mask is not None: logits += mask[:, :, :logits.shape[-2], :logits.shape[-1]].type_as(logits.data) weights = nn.functional.softmax(logits, dim=-1) weights = self.dropout(weights) contexts = torch.matmul(weights, values) contexts = self._merge_heads(contexts) outputs = self.output_linear(contexts) if self.attention_map is True: return outputs, weights return outputs class Conv(nn.Module): def __init__(self, input_size, output_size, kernel_size, pad_type): super(Conv, self).__init__() padding = (kernel_size - 1, 0) if pad_type == 'left' else (kernel_size // 2, (kernel_size - 1) // 2) self.pad = nn.ConstantPad1d(padding, 0) self.conv = nn.Conv1d(input_size, output_size, kernel_size=kernel_size, padding=0) def forward(self, inputs): inputs = self.pad(inputs.permute(0, 2, 1)) outputs = self.conv(inputs).permute(0, 2, 1) return outputs class PositionwiseFeedForward(nn.Module): def __init__(self, input_depth, filter_size, output_depth, layer_config='ll', padding='left', dropout=0.0): super(PositionwiseFeedForward, self).__init__() layers = [] sizes = ([(input_depth, filter_size)] + [(filter_size, filter_size)] * (len(layer_config) - 2) + [(filter_size, output_depth)]) for lc, s in zip(list(layer_config), sizes): if lc == 'l': layers.append(nn.Linear(*s)) elif lc == 'c': layers.append(Conv(*s, kernel_size=3, pad_type=padding)) else: raise ValueError("Unknown layer type {}".format(lc)) self.layers = nn.ModuleList(layers) self.relu = nn.ReLU() self.dropout = nn.Dropout(dropout) def forward(self, inputs): x = inputs for i, layer in enumerate(self.layers): x = layer(x) if i < len(self.layers): x = self.relu(x) x = self.dropout(x) return x class self_attention_block(nn.Module): def __init__(self, hidden_size, total_key_depth, total_value_depth, filter_size, num_heads, bias_mask=None, layer_dropout=0.0, attention_dropout=0.0, relu_dropout=0.0, attention_map=False): super(self_attention_block, self).__init__() self.attention_map = attention_map self.multi_head_attention = MultiHeadAttention(hidden_size, total_key_depth, total_value_depth, hidden_size, num_heads, bias_mask, attention_dropout, attention_map) self.positionwise_convolution = PositionwiseFeedForward(hidden_size, filter_size, hidden_size, layer_config='cc', padding='both', dropout=relu_dropout) self.dropout = nn.Dropout(layer_dropout) self.layer_norm_mha = LayerNorm(hidden_size) self.layer_norm_ffn = LayerNorm(hidden_size) def forward(self, inputs, bias_mask=None): x = inputs x_norm = self.layer_norm_mha(x) if self.attention_map is True: y, weights = self.multi_head_attention(x_norm, x_norm, x_norm, bias_mask=bias_mask) else: y = self.multi_head_attention(x_norm, x_norm, x_norm, bias_mask=bias_mask) x = self.dropout(x + y) x_norm = self.layer_norm_ffn(x) y = self.positionwise_convolution(x_norm) y = self.dropout(x + y) if self.attention_map is True: return y, weights return y class bi_directional_self_attention(nn.Module): def __init__(self, hidden_size, total_key_depth, total_value_depth, filter_size, num_heads, max_length, layer_dropout=0.0, attention_dropout=0.0, relu_dropout=0.0): super(bi_directional_self_attention, self).__init__() self.weights_list = list() params = (hidden_size, total_key_depth or hidden_size, total_value_depth or hidden_size, filter_size, num_heads, None, # La máscara se generará dinámicamente en forward layer_dropout, attention_dropout, relu_dropout, True) self.attn_block = self_attention_block(*params) params = (hidden_size, total_key_depth or hidden_size, total_value_depth or hidden_size, filter_size, num_heads, None, # La máscara se generará dinámicamente en forward layer_dropout, attention_dropout, relu_dropout, True) self.backward_attn_block = self_attention_block(*params) self.linear = nn.Linear(hidden_size*2, hidden_size) def forward(self, inputs): x, list_weights = inputs L = x.shape[1] # Generar máscaras dinámicas para la longitud de secuencia actual forward_mask = _gen_bias_mask(L).type_as(x) backward_mask = torch.transpose(forward_mask, dim0=2, dim1=3) # Forward Self-attention Block encoder_outputs, weights = self.attn_block(x, bias_mask=forward_mask) # Backward Self-attention Block reverse_outputs, reverse_weights = self.backward_attn_block(x, bias_mask=backward_mask) outputs = torch.cat((encoder_outputs, reverse_outputs), dim=2) y = self.linear(outputs) self.weights_list = list_weights self.weights_list.append(weights) self.weights_list.append(reverse_weights) return y, self.weights_list class bi_directional_self_attention_layers(nn.Module): def __init__(self, embedding_size, hidden_size, num_layers, num_heads, total_key_depth, total_value_depth, filter_size, max_length=100, input_dropout=0.0, layer_dropout=0.0, attention_dropout=0.0, relu_dropout=0.0): super(bi_directional_self_attention_layers, self).__init__() params = (hidden_size, total_key_depth or hidden_size, total_value_depth or hidden_size, filter_size, num_heads, max_length, layer_dropout, attention_dropout, relu_dropout) self.embedding_proj = nn.Linear(embedding_size, hidden_size, bias=False) self.self_attn_layers = nn.Sequential(*[bi_directional_self_attention(*params) for l in range(num_layers)]) self.layer_norm = LayerNorm(hidden_size) self.input_dropout = nn.Dropout(input_dropout) def forward(self, inputs): x = self.input_dropout(inputs) x = self.embedding_proj(x) # Generar señal de tiempo (timing signal) dinámicamente para evitar desajuste de dimensiones timing_signal = _gen_timing_signal(x.shape[1], x.shape[2]).type_as(x) x += timing_signal y, weights_list = self.self_attn_layers((x, [])) y = self.layer_norm(y) return y, weights_list class BTCModel(nn.Module): """ Bidirectional Transformer for Chord Recognition (Official Architecture Wrapper). """ def __init__(self, n_freq: int = 144): super().__init__() config = { 'feature_size': n_freq, 'hidden_size': 128, 'num_layers': 8, 'num_heads': 4, 'total_key_depth': 128, 'total_value_depth': 128, 'filter_size': 128, 'timestep': 108, 'input_dropout': 0.0, 'layer_dropout': 0.0, 'attention_dropout': 0.0, 'relu_dropout': 0.0, 'num_chords': NUM_CHORDS, 'probs_out': True } params = (config['feature_size'], config['hidden_size'], config['num_layers'], config['num_heads'], config['total_key_depth'], config['total_value_depth'], config['filter_size'], config['timestep'], config['input_dropout'], config['layer_dropout'], config['attention_dropout'], config['relu_dropout']) self.self_attn_layers = bi_directional_self_attention_layers(*params) self.output_layer = SoftmaxOutputLayer(hidden_size=config['hidden_size'], output_size=config['num_chords'], probs_out=config['probs_out']) def forward(self, x: torch.Tensor) -> torch.Tensor: """ Inferencia de acordes (retorna los logits). Args: x: Tensor de entrada (batch, seq_len, n_freq) Returns: logits: Tensor (batch, seq_len, num_chords) """ # Output of Bi-directional Self-attention Layers self_attn_output, _ = self.self_attn_layers(x) logits = self.output_layer(self_attn_output) return logits