# Page-viewer residue captured together with this file (not part of the module):
# "GitHub Action" / "deploy from github actions" / commit 9982dba / 14.8 kB
"""
BTC Toolkit - Arquitectura del Modelo Oficial (Versión Dinámica)
Implementación del Bidirectional Transformer for Musical Chord Recognition (BTC)
basada en la arquitectura del paper original (ISMIR 2019) y su checkpoint pre-entrenado.
Soporta longitud de secuencia dinámica para evitar desajustes en el tamaño de los tensores.
"""
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
# ==========================================
# CHORD VOCABULARY (25 classes)
# ==========================================
# Layout: index 0 is 'N', indices 1-12 are the twelve roots written bare,
# indices 13-24 are the same twelve roots with an 'm' suffix.
# NOTE(review): 'N' presumably stands for "no chord", bare roots for major
# triads and the 'm' suffix for minor triads -- confirm against the label set
# used by the checkpoint.
CHORD_VOCAB = ['N'] + [
    root + quality
    for quality in ('', 'm')
    for root in ('C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B')
]
NUM_CHORDS = len(CHORD_VOCAB)  # 25
def _gen_bias_mask(max_length):
    """Build an additive attention bias that hides future timesteps.

    Args:
        max_length: Side length of the square mask.

    Returns:
        A float32 tensor of shape [1, 1, max_length, max_length] that holds
        0 on and below the main diagonal and -inf strictly above it, so that
        adding it to attention logits removes every "future" column.
    """
    blocked = torch.full((max_length, max_length), float('-inf'), dtype=torch.float32)
    # diagonal=1 keeps only the strictly-upper triangle; all other entries are
    # set to 0 (not multiplied), so no NaN can appear from 0 * -inf.
    future_only = torch.triu(blocked, diagonal=1)
    # Two leading singleton axes let the mask broadcast over batch and heads.
    return future_only[None, None, :, :]
def _gen_timing_signal(length, channels, min_timescale=1.0, max_timescale=1.0e4):
    """Generate a [1, length, channels] sinusoidal timing (position) signal.

    The first ``channels // 2`` columns hold sines and the next
    ``channels // 2`` columns hold cosines of the position scaled by a
    geometric series of timescales running from ``min_timescale`` to
    ``max_timescale``. When ``channels`` is odd, one trailing zero column is
    appended.

    Args:
        length: Number of positions (sequence length).
        channels: Size of the last dimension of the returned signal.
        min_timescale: Smallest timescale of the geometric series.
        max_timescale: Largest timescale of the geometric series.

    Returns:
        A float32 tensor of shape [1, length, channels].
    """
    position = np.arange(length)
    num_timescales = channels // 2
    # With a single timescale (channels of 2 or 3) the unguarded divisor
    # `num_timescales - 1` is zero and raised ZeroDivisionError. Clamping it
    # to 1 leaves every other case numerically unchanged.
    log_timescale_increment = (
        math.log(float(max_timescale) / float(min_timescale)) /
        max(float(num_timescales) - 1, 1.0))
    inv_timescales = min_timescale * np.exp(
        np.arange(num_timescales).astype(float) * -log_timescale_increment)
    # Outer product: one row per position, one column per timescale.
    scaled_time = np.expand_dims(position, 1) * np.expand_dims(inv_timescales, 0)
    signal = np.concatenate([np.sin(scaled_time), np.cos(scaled_time)], axis=1)
    # Pad one zero column on the right when `channels` is odd.
    signal = np.pad(signal, [[0, 0], [0, channels % 2]],
                    'constant', constant_values=[0.0, 0.0])
    signal = signal.reshape([1, length, channels])
    return torch.from_numpy(signal).type(torch.FloatTensor)
class LayerNorm(nn.Module):
    """Layer normalization over the last dimension with learnable scale/shift.

    Parameters are named ``gamma`` (scale, initialised to ones) and ``beta``
    (shift, initialised to zeros).

    Args:
        features: Size of the last dimension of the inputs.
        eps: Small constant added to the standard deviation (not the
            variance) before dividing.
    """

    def __init__(self, features, eps=1e-6):
        super().__init__()
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(features))
        self.beta = nn.Parameter(torch.zeros(features))

    def forward(self, x):
        """Normalize ``x`` along its last axis and apply scale and shift."""
        centered = x - x.mean(dim=-1, keepdim=True)
        # Tensor.std is called with its default settings, which per the
        # PyTorch docs is the Bessel-corrected estimator.
        spread = x.std(dim=-1, keepdim=True) + self.eps
        return self.gamma * centered / spread + self.beta
class OutputLayer(nn.Module):
    """Base class for output heads that project hidden states to class scores.

    Subclasses are expected to provide ``loss`` (the base version raises).

    Args:
        hidden_size: Size of the incoming hidden representation.
        output_size: Number of output classes.
        probs_out: Flag stored on the instance for subclasses to consult.
    """

    def __init__(self, hidden_size, output_size, probs_out=False):
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.probs_out = probs_out
        # Sub-modules are created in this order (projection, then LSTM) so
        # that seeded initialisation and state_dict key order stay stable.
        self.output_projection = nn.Linear(hidden_size, output_size)
        # No method of this class uses the LSTM.
        # NOTE(review): presumably kept so that checkpoint keys match --
        # confirm before removing.
        self.lstm = nn.LSTM(
            input_size=hidden_size,
            hidden_size=hidden_size // 2,
            batch_first=True,
            bidirectional=True,
        )

    def loss(self, hidden, labels):
        """Abstract hook; always raises ``NotImplementedError`` here."""
        owner = type(self).__name__
        raise NotImplementedError('Must implement {}.loss'.format(owner))
class SoftmaxOutputLayer(OutputLayer):
    """Output head that projects hidden states to class scores.

    Depending on ``probs_out`` the forward pass returns either the raw logits
    or the indices of the two highest-scoring classes.
    """

    def forward(self, hidden):
        """Project ``hidden`` and return logits or top-2 class indices.

        Args:
            hidden: Hidden states whose last dimension matches the input size
                of ``output_projection``. The top-2 path indexes the result
                with three axes, so it requires a rank-3 input.

        Returns:
            The logits tensor when ``self.probs_out is True``; otherwise a
            tuple ``(predictions, second)`` with the indices of the best and
            second-best class along the last axis.
        """
        logits = self.output_projection(hidden)
        # Return before any softmax/top-k work: the results were previously
        # computed and then thrown away on this path, and topk(k=2) raised
        # for heads with fewer than two classes even though only logits were
        # requested.
        if self.probs_out is True:
            return logits
        probs = F.softmax(logits, -1)
        _, indices = torch.topk(probs, 2)
        predictions = indices[:, :, 0]
        second = indices[:, :, 1]
        return predictions, second

    def loss(self, hidden, labels):
        """Return the negative log-likelihood of ``labels`` given ``hidden``.

        Logits are flattened to ``(-1, output_size)`` and labels to ``(-1,)``
        before ``F.nll_loss`` is applied to the log-softmax scores.
        """
        logits = self.output_projection(hidden)
        log_probs = F.log_softmax(logits, -1)
        return F.nll_loss(log_probs.view(-1, self.output_size), labels.view(-1))
class MultiHeadAttention(nn.Module):
    """Multi-head dot-product attention with an optional additive bias mask.

    Args:
        input_depth: Size of the last dimension of queries, keys and values.
        total_key_depth: Combined key/query depth across all heads; must be
            divisible by ``num_heads``.
        total_value_depth: Combined value depth across all heads; must be
            divisible by ``num_heads``.
        output_depth: Size of the last dimension of the output.
        num_heads: Number of attention heads.
        bias_mask: Optional static mask added to the attention logits when no
            mask is passed to ``forward``.
        dropout: Dropout probability applied to the attention weights.
        attention_map: When exactly ``True``, ``forward`` also returns the
            attention weights.

    Raises:
        ValueError: If a depth is not divisible by ``num_heads``.
    """

    def __init__(self, input_depth, total_key_depth, total_value_depth, output_depth,
                 num_heads, bias_mask=None, dropout=0.0, attention_map=False):
        super().__init__()
        checks = (
            ("Key depth (%d) must be divisible by the number of attention heads (%d).", total_key_depth),
            ("Value depth (%d) must be divisible by the number of attention heads (%d).", total_value_depth),
        )
        for template, depth in checks:
            if depth % num_heads:
                raise ValueError(template % (depth, num_heads))

        self.attention_map = attention_map
        self.num_heads = num_heads
        self.bias_mask = bias_mask
        # Queries are scaled by 1 / sqrt(per-head key depth).
        self.query_scale = (total_key_depth // num_heads) ** -0.5

        # All projections are bias-free; creation order is kept stable so
        # that seeded initialisation does not change.
        self.query_linear = nn.Linear(input_depth, total_key_depth, bias=False)
        self.key_linear = nn.Linear(input_depth, total_key_depth, bias=False)
        self.value_linear = nn.Linear(input_depth, total_value_depth, bias=False)
        self.output_linear = nn.Linear(total_value_depth, output_depth, bias=False)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, x):
        """Reshape [batch, seq, depth] into [batch, heads, seq, depth / heads]."""
        if len(x.shape) != 3:
            raise ValueError("x must have rank 3")
        batch, steps, depth = x.shape
        per_head = depth // self.num_heads
        return x.view(batch, steps, self.num_heads, per_head).permute(0, 2, 1, 3)

    def _merge_heads(self, x):
        """Reshape [batch, heads, seq, head_depth] into [batch, seq, head_depth * heads]."""
        if len(x.shape) != 4:
            raise ValueError("x must have rank 4")
        batch, _, steps, head_depth = x.shape
        merged_depth = head_depth * self.num_heads
        return x.permute(0, 2, 1, 3).contiguous().view(batch, steps, merged_depth)

    def forward(self, queries, keys, values, bias_mask=None):
        """Attend from ``queries`` to ``keys``/``values``.

        Args:
            queries, keys, values: Rank-3 tensors (rank is enforced by
                ``_split_heads``).
            bias_mask: Optional mask added to the logits; takes precedence
                over the mask given at construction time.

        Returns:
            The attended output, or ``(output, weights)`` when
            ``self.attention_map is True``. The returned weights are the
            post-dropout attention weights.
        """
        # Run all three projections before any reshaping.
        projected = (
            self.query_linear(queries),
            self.key_linear(keys),
            self.value_linear(values),
        )
        q, k, v = [self._split_heads(t) for t in projected]

        q = q * self.query_scale
        logits = torch.matmul(q, k.transpose(-2, -1))

        # A mask passed at call time wins over the one stored on the module.
        mask = self.bias_mask if bias_mask is None else bias_mask
        if mask is not None:
            q_len, k_len = logits.shape[-2], logits.shape[-1]
            # Crop the mask to the current lengths and match the logits' type.
            logits += mask[:, :, :q_len, :k_len].type_as(logits)

        weights = self.dropout(F.softmax(logits, dim=-1))
        outputs = self.output_linear(self._merge_heads(torch.matmul(weights, v)))

        if self.attention_map is True:
            return outputs, weights
        return outputs
class Conv(nn.Module):
    """1-D convolution over the sequence axis that preserves sequence length.

    Inputs and outputs are laid out as [batch, seq, channels]; the tensor is
    permuted to channels-first around the convolution.

    Args:
        input_size: Number of input channels.
        output_size: Number of output channels.
        kernel_size: Width of the convolution kernel.
        pad_type: ``'left'`` pads ``kernel_size - 1`` zeros before the
            sequence only; any other value pads ``kernel_size // 2`` zeros
            before and ``(kernel_size - 1) // 2`` zeros after.
    """

    def __init__(self, input_size, output_size, kernel_size, pad_type):
        super().__init__()
        if pad_type == 'left':
            # All padding in front: an output step never sees later inputs.
            padding = (kernel_size - 1, 0)
        else:
            padding = (kernel_size // 2, (kernel_size - 1) // 2)
        self.pad = nn.ConstantPad1d(padding, 0)
        # Padding is done explicitly above, so the convolution adds none.
        self.conv = nn.Conv1d(input_size, output_size, kernel_size=kernel_size, padding=0)

    def forward(self, inputs):
        """Apply the padded convolution to a [batch, seq, channels] tensor."""
        channels_first = inputs.permute(0, 2, 1)
        convolved = self.conv(self.pad(channels_first))
        return convolved.permute(0, 2, 1)
class PositionwiseFeedForward(nn.Module):
    """Stack of linear and/or convolutional layers, each followed by ReLU and dropout.

    Args:
        input_depth: Input size of the first layer.
        filter_size: Output size of every layer except the last.
        output_depth: Output size of the last layer.
        layer_config: One character per layer: ``'l'`` for ``nn.Linear``,
            ``'c'`` for a kernel-size-3 ``Conv``.
        padding: Padding type forwarded to ``Conv`` layers.
        dropout: Dropout probability applied after each ReLU.

    Raises:
        ValueError: If ``layer_config`` contains a character other than
            ``'l'`` or ``'c'``.
    """

    def __init__(self, input_depth, filter_size, output_depth, layer_config='ll', padding='left', dropout=0.0):
        super().__init__()
        # first layer, (len - 2) middle layers, last layer; zip() below
        # truncates this list when layer_config is shorter.
        sizes = [(input_depth, filter_size)]
        sizes.extend([(filter_size, filter_size)] * (len(layer_config) - 2))
        sizes.append((filter_size, output_depth))

        stack = []
        for kind, (in_dim, out_dim) in zip(layer_config, sizes):
            if kind == 'l':
                module = nn.Linear(in_dim, out_dim)
            elif kind == 'c':
                module = Conv(in_dim, out_dim, kernel_size=3, pad_type=padding)
            else:
                raise ValueError("Unknown layer type {}".format(kind))
            stack.append(module)

        self.layers = nn.ModuleList(stack)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, inputs):
        """Run ``inputs`` through every layer, applying ReLU then dropout after each."""
        x = inputs
        for layer in self.layers:
            # ReLU and dropout follow EVERY layer, including the last one:
            # the former guard `i < len(self.layers)` was always true. This is
            # kept on purpose because changing it would alter model outputs.
            # NOTE(review): presumably the pre-trained checkpoint expects
            # this -- confirm before "fixing".
            x = self.dropout(self.relu(layer(x)))
        return x
class self_attention_block(nn.Module):
    """Pre-norm self-attention block: attention sub-layer, then convolutional feed-forward.

    Each sub-layer normalizes its input, adds the sub-layer output back to the
    un-normalized input, and applies dropout to that sum.

    Args:
        hidden_size: Size of the block's input and output features.
        total_key_depth: Combined key depth passed to the attention module.
        total_value_depth: Combined value depth passed to the attention module.
        filter_size: Hidden size of the feed-forward sub-layer.
        num_heads: Number of attention heads.
        bias_mask: Optional static attention mask passed to the attention module.
        layer_dropout: Dropout probability applied after each residual sum.
        attention_dropout: Dropout probability for the attention weights.
        relu_dropout: Dropout probability inside the feed-forward sub-layer.
        attention_map: When exactly ``True``, ``forward`` also returns the
            attention weights.
    """

    def __init__(self, hidden_size, total_key_depth, total_value_depth, filter_size, num_heads,
                 bias_mask=None, layer_dropout=0.0, attention_dropout=0.0, relu_dropout=0.0, attention_map=False):
        super().__init__()
        self.attention_map = attention_map
        # Attention maps hidden_size -> hidden_size so it can be added back
        # to its input.
        self.multi_head_attention = MultiHeadAttention(
            hidden_size,
            total_key_depth,
            total_value_depth,
            hidden_size,
            num_heads,
            bias_mask,
            attention_dropout,
            attention_map,
        )
        # Two kernel-3 convolutions ('cc') padded on both sides.
        self.positionwise_convolution = PositionwiseFeedForward(
            hidden_size,
            filter_size,
            hidden_size,
            layer_config='cc',
            padding='both',
            dropout=relu_dropout,
        )
        self.dropout = nn.Dropout(layer_dropout)
        self.layer_norm_mha = LayerNorm(hidden_size)
        self.layer_norm_ffn = LayerNorm(hidden_size)

    def forward(self, inputs, bias_mask=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Input features.
            bias_mask: Optional attention mask forwarded to the attention module.

        Returns:
            The block output, or ``(output, attention_weights)`` when
            ``self.attention_map is True``.
        """
        wants_map = self.attention_map is True

        # --- attention sub-layer ---
        normed = self.layer_norm_mha(inputs)
        attended = self.multi_head_attention(normed, normed, normed, bias_mask=bias_mask)
        weights = None
        if wants_map:
            attended, weights = attended
        hidden = self.dropout(inputs + attended)

        # --- feed-forward sub-layer ---
        transformed = self.positionwise_convolution(self.layer_norm_ffn(hidden))
        result = self.dropout(hidden + transformed)

        if wants_map:
            return result, weights
        return result
class bi_directional_self_attention(nn.Module):
    """Pair of masked self-attention blocks looking in opposite directions.

    One block attends with a mask that hides later positions, the other with
    the transposed mask (hiding earlier positions). Their outputs are
    concatenated on the feature axis and projected back to ``hidden_size``.

    Args:
        hidden_size: Feature size of the input and output.
        total_key_depth: Combined key depth; falls back to ``hidden_size``
            when falsy.
        total_value_depth: Combined value depth; falls back to
            ``hidden_size`` when falsy.
        filter_size: Hidden size of each block's feed-forward sub-layer.
        num_heads: Number of attention heads.
        max_length: Accepted for interface compatibility; not used here
            because masks are built per call from the actual sequence length.
        layer_dropout: Dropout probability after each residual sum.
        attention_dropout: Dropout probability for the attention weights.
        relu_dropout: Dropout probability inside the feed-forward sub-layers.
    """

    def __init__(self, hidden_size, total_key_depth, total_value_depth, filter_size, num_heads, max_length,
                 layer_dropout=0.0, attention_dropout=0.0, relu_dropout=0.0):
        super().__init__()
        self.weights_list = list()

        # Both directions are configured identically.
        block_args = (
            hidden_size,
            total_key_depth or hidden_size,
            total_value_depth or hidden_size,
            filter_size,
            num_heads,
            None,  # no static mask: forward() builds one per call
            layer_dropout,
            attention_dropout,
            relu_dropout,
            True,  # attention_map: blocks also return their attention weights
        )
        self.attn_block = self_attention_block(*block_args)
        self.backward_attn_block = self_attention_block(*block_args)
        self.linear = nn.Linear(hidden_size * 2, hidden_size)

    def forward(self, inputs):
        """Process ``(x, list_weights)`` and return ``(y, list_weights)``.

        Args:
            inputs: A pair ``(x, list_weights)``. The sequence length is read
                from ``x.shape[1]``; ``list_weights`` is a list that collects
                attention maps across layers.

        Returns:
            A pair ``(y, weights_list)``. ``weights_list`` is the SAME list
            object that was passed in, extended in place with this layer's
            forward and backward attention weights; it is also stored on
            ``self.weights_list``.
        """
        x, list_weights = inputs
        seq_len = x.shape[1]

        # Build masks for the current sequence length.
        forward_mask = _gen_bias_mask(seq_len).type_as(x)
        # Swapping the last two axes turns "hide later" into "hide earlier".
        backward_mask = forward_mask.transpose(2, 3)

        forward_out, forward_weights = self.attn_block(x, bias_mask=forward_mask)
        backward_out, backward_weights = self.backward_attn_block(x, bias_mask=backward_mask)

        y = self.linear(torch.cat((forward_out, backward_out), dim=2))

        self.weights_list = list_weights
        self.weights_list.extend((forward_weights, backward_weights))
        return y, self.weights_list
class bi_directional_self_attention_layers(nn.Module):
    """Encoder: input projection, timing signal, a stack of bidirectional
    self-attention layers, and a final layer norm.

    Args:
        embedding_size: Feature size of the raw inputs.
        hidden_size: Feature size used inside the stack.
        num_layers: Number of bidirectional self-attention layers.
        num_heads: Number of attention heads per block.
        total_key_depth: Combined key depth; falls back to ``hidden_size``
            when falsy.
        total_value_depth: Combined value depth; falls back to
            ``hidden_size`` when falsy.
        filter_size: Hidden size of the feed-forward sub-layers.
        max_length: Forwarded to each layer's constructor.
        input_dropout: Dropout probability applied to the raw inputs.
        layer_dropout: Dropout probability after each residual sum.
        attention_dropout: Dropout probability for the attention weights.
        relu_dropout: Dropout probability inside the feed-forward sub-layers.
    """

    def __init__(self, embedding_size, hidden_size, num_layers, num_heads, total_key_depth, total_value_depth,
                 filter_size, max_length=100, input_dropout=0.0, layer_dropout=0.0,
                 attention_dropout=0.0, relu_dropout=0.0):
        super().__init__()
        layer_args = (
            hidden_size,
            total_key_depth or hidden_size,
            total_value_depth or hidden_size,
            filter_size,
            num_heads,
            max_length,
            layer_dropout,
            attention_dropout,
            relu_dropout,
        )
        self.embedding_proj = nn.Linear(embedding_size, hidden_size, bias=False)
        stack = [bi_directional_self_attention(*layer_args) for _ in range(num_layers)]
        self.self_attn_layers = nn.Sequential(*stack)
        self.layer_norm = LayerNorm(hidden_size)
        self.input_dropout = nn.Dropout(input_dropout)

    def forward(self, inputs):
        """Encode ``inputs`` and collect the attention maps of every layer.

        Returns:
            A pair ``(y, weights_list)``: the layer-normalized output of the
            stack and the list of attention weights gathered by the layers.
        """
        x = self.embedding_proj(self.input_dropout(inputs))

        # Build the timing signal from the actual sequence length and feature
        # size so it always matches x, whatever the input length.
        seq_len, depth = x.shape[1], x.shape[2]
        x = x + _gen_timing_signal(seq_len, depth).type_as(x)

        # Each layer consumes and returns an (activations, weights list)
        # pair, so the stack is seeded with an empty list.
        y, weights_list = self.self_attn_layers((x, []))
        return self.layer_norm(y), weights_list
class BTCModel(nn.Module):
    """
    Bidirectional Transformer for Chord Recognition (Official Architecture Wrapper).

    Wires a ``bi_directional_self_attention_layers`` encoder to a
    ``SoftmaxOutputLayer`` head using a fixed set of hyper-parameters; only
    the input feature size is configurable.

    Args:
        n_freq: Feature size of each input frame.
    """

    def __init__(self, n_freq: int = 144):
        super().__init__()
        hidden_size = 128

        self.self_attn_layers = bi_directional_self_attention_layers(
            embedding_size=n_freq,
            hidden_size=hidden_size,
            num_layers=8,
            num_heads=4,
            total_key_depth=128,
            total_value_depth=128,
            filter_size=128,
            max_length=108,  # the configuration's 'timestep' value
            input_dropout=0.0,
            layer_dropout=0.0,
            attention_dropout=0.0,
            relu_dropout=0.0,
        )
        # probs_out=True makes the head return raw logits from forward().
        self.output_layer = SoftmaxOutputLayer(
            hidden_size=hidden_size,
            output_size=NUM_CHORDS,
            probs_out=True,
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Chord inference (returns the logits).
        Args:
            x: Input tensor (batch, seq_len, n_freq)
        Returns:
            logits: Tensor (batch, seq_len, num_chords)
        """
        # The encoder also returns its attention maps; they are not needed here.
        encoded, _ = self.self_attn_layers(x)
        return self.output_layer(encoded)