| import numpy as np |
| import torch |
| import math |
| from torch import nn |
| import torch.nn.functional as F |
|
|
| def get_device(): |
| return torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') |
|
|
| def scaled_dot_product(q, k, v, mask=None): |
| d_k = q.size()[-1] |
| scaled = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k) |
| if mask is not None: |
| scaled = scaled.permute(1, 0, 2, 3) + mask |
| scaled = scaled.permute(1, 0, 2, 3) |
| attention = F.softmax(scaled, dim=-1) |
| values = torch.matmul(attention, v) |
| return values, attention |
|
|
| class PositionalEncoding(nn.Module): |
| def __init__(self, d_model, max_sequence_length): |
| super().__init__() |
| self.max_sequence_length = max_sequence_length |
| self.d_model = d_model |
|
|
| def forward(self): |
| even_i = torch.arange(0, self.d_model, 2).float() |
| denominator = torch.pow(10000, even_i/self.d_model) |
| position = (torch.arange(self.max_sequence_length) |
| .reshape(self.max_sequence_length, 1)) |
| even_PE = torch.sin(position / denominator) |
| odd_PE = torch.cos(position / denominator) |
| stacked = torch.stack([even_PE, odd_PE], dim=2) |
| PE = torch.flatten(stacked, start_dim=1, end_dim=2) |
| return PE |
|
|
| class SentenceEmbedding(nn.Module): |
| "For a given sentence, create an embedding" |
| def __init__(self, max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN): |
| super().__init__() |
| self.vocab_size = len(language_to_index) |
| self.max_sequence_length = max_sequence_length |
| self.embedding = nn.Embedding(self.vocab_size, d_model) |
| self.language_to_index = language_to_index |
| self.position_encoder = PositionalEncoding(d_model, max_sequence_length) |
| self.dropout = nn.Dropout(p=0.1) |
| self.START_TOKEN = START_TOKEN |
| self.END_TOKEN = END_TOKEN |
| self.PADDING_TOKEN = PADDING_TOKEN |
| |
| def batch_tokenize(self, batch, start_token, end_token): |
|
|
| def tokenize(sentence, start_token, end_token): |
| sentence_word_indicies = [self.language_to_index[token] for token in list(sentence)] |
| if start_token: |
| sentence_word_indicies.insert(0, self.language_to_index[self.START_TOKEN]) |
| if end_token: |
| sentence_word_indicies.append(self.language_to_index[self.END_TOKEN]) |
| for _ in range(len(sentence_word_indicies), self.max_sequence_length): |
| sentence_word_indicies.append(self.language_to_index[self.PADDING_TOKEN]) |
| return torch.tensor(sentence_word_indicies) |
|
|
| tokenized = [] |
| for sentence_num in range(len(batch)): |
| tokenized.append( tokenize(batch[sentence_num], start_token, end_token) ) |
| tokenized = torch.stack(tokenized) |
| return tokenized.to(get_device()) |
| |
| def forward(self, x, start_token, end_token): |
| x = self.batch_tokenize(x, start_token, end_token) |
| x = self.embedding(x) |
| pos = self.position_encoder().to(get_device()) |
| x = self.dropout(x + pos) |
| return x |
|
|
|
|
| class MultiHeadAttention(nn.Module): |
| def __init__(self, d_model, num_heads): |
| super().__init__() |
| self.d_model = d_model |
| self.num_heads = num_heads |
| self.head_dim = d_model // num_heads |
| self.qkv_layer = nn.Linear(d_model , 3 * d_model) |
| self.linear_layer = nn.Linear(d_model, d_model) |
| |
| def forward(self, x, mask): |
| batch_size, sequence_length, d_model = x.size() |
| qkv = self.qkv_layer(x) |
| qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim) |
| qkv = qkv.permute(0, 2, 1, 3) |
| q, k, v = qkv.chunk(3, dim=-1) |
| values, attention = scaled_dot_product(q, k, v, mask) |
| values = values.permute(0, 2, 1, 3).reshape(batch_size, sequence_length, self.num_heads * self.head_dim) |
| out = self.linear_layer(values) |
| return out |
|
|
|
|
| class LayerNormalization(nn.Module): |
| def __init__(self, parameters_shape, eps=1e-5): |
| super().__init__() |
| self.parameters_shape=parameters_shape |
| self.eps=eps |
| self.gamma = nn.Parameter(torch.ones(parameters_shape)) |
| self.beta = nn.Parameter(torch.zeros(parameters_shape)) |
|
|
| def forward(self, inputs): |
| dims = [-(i + 1) for i in range(len(self.parameters_shape))] |
| mean = inputs.mean(dim=dims, keepdim=True) |
| var = ((inputs - mean) ** 2).mean(dim=dims, keepdim=True) |
| std = (var + self.eps).sqrt() |
| y = (inputs - mean) / std |
| out = self.gamma * y + self.beta |
| return out |
|
|
| |
| class PositionwiseFeedForward(nn.Module): |
| def __init__(self, d_model, hidden, drop_prob=0.1): |
| super(PositionwiseFeedForward, self).__init__() |
| self.linear1 = nn.Linear(d_model, hidden) |
| self.linear2 = nn.Linear(hidden, d_model) |
| self.relu = nn.ReLU() |
| self.dropout = nn.Dropout(p=drop_prob) |
|
|
| def forward(self, x): |
| x = self.linear1(x) |
| x = self.relu(x) |
| x = self.dropout(x) |
| x = self.linear2(x) |
| return x |
|
|
|
|
| class EncoderLayer(nn.Module): |
| def __init__(self, d_model, ffn_hidden, num_heads, drop_prob): |
| super(EncoderLayer, self).__init__() |
| self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads) |
| self.norm1 = LayerNormalization(parameters_shape=[d_model]) |
| self.dropout1 = nn.Dropout(p=drop_prob) |
| self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob) |
| self.norm2 = LayerNormalization(parameters_shape=[d_model]) |
| self.dropout2 = nn.Dropout(p=drop_prob) |
|
|
| def forward(self, x, self_attention_mask): |
| residual_x = x.clone() |
| x = self.attention(x, mask=self_attention_mask) |
| x = self.dropout1(x) |
| x = self.norm1(x + residual_x) |
| residual_x = x.clone() |
| x = self.ffn(x) |
| x = self.dropout2(x) |
| x = self.norm2(x + residual_x) |
| return x |
| |
| class SequentialEncoder(nn.Sequential): |
| def forward(self, *inputs): |
| x, self_attention_mask = inputs |
| for module in self._modules.values(): |
| x = module(x, self_attention_mask) |
| return x |
|
|
| class Encoder(nn.Module): |
| def __init__(self, |
| d_model, |
| ffn_hidden, |
| num_heads, |
| drop_prob, |
| num_layers, |
| max_sequence_length, |
| language_to_index, |
| START_TOKEN, |
| END_TOKEN, |
| PADDING_TOKEN): |
| super().__init__() |
| self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN) |
| self.layers = SequentialEncoder(*[EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob) |
| for _ in range(num_layers)]) |
|
|
| def forward(self, x, self_attention_mask, start_token, end_token): |
| x = self.sentence_embedding(x, start_token, end_token) |
| x = self.layers(x, self_attention_mask) |
| return x |
|
|
|
|
| class MultiHeadCrossAttention(nn.Module): |
| def __init__(self, d_model, num_heads): |
| super().__init__() |
| self.d_model = d_model |
| self.num_heads = num_heads |
| self.head_dim = d_model // num_heads |
| self.kv_layer = nn.Linear(d_model , 2 * d_model) |
| self.q_layer = nn.Linear(d_model , d_model) |
| self.linear_layer = nn.Linear(d_model, d_model) |
| |
| def forward(self, x, y, mask): |
| batch_size, sequence_length, d_model = x.size() |
| kv = self.kv_layer(x) |
| q = self.q_layer(y) |
| kv = kv.reshape(batch_size, sequence_length, self.num_heads, 2 * self.head_dim) |
| q = q.reshape(batch_size, sequence_length, self.num_heads, self.head_dim) |
| kv = kv.permute(0, 2, 1, 3) |
| q = q.permute(0, 2, 1, 3) |
| k, v = kv.chunk(2, dim=-1) |
| values, attention = scaled_dot_product(q, k, v, mask) |
| values = values.permute(0, 2, 1, 3).reshape(batch_size, sequence_length, d_model) |
| out = self.linear_layer(values) |
| return out |
|
|
|
|
| class DecoderLayer(nn.Module): |
| def __init__(self, d_model, ffn_hidden, num_heads, drop_prob): |
| super(DecoderLayer, self).__init__() |
| self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads) |
| self.layer_norm1 = LayerNormalization(parameters_shape=[d_model]) |
| self.dropout1 = nn.Dropout(p=drop_prob) |
|
|
| self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads) |
| self.layer_norm2 = LayerNormalization(parameters_shape=[d_model]) |
| self.dropout2 = nn.Dropout(p=drop_prob) |
|
|
| self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob) |
| self.layer_norm3 = LayerNormalization(parameters_shape=[d_model]) |
| self.dropout3 = nn.Dropout(p=drop_prob) |
|
|
| def forward(self, x, y, self_attention_mask, cross_attention_mask): |
| _y = y.clone() |
| y = self.self_attention(y, mask=self_attention_mask) |
| y = self.dropout1(y) |
| y = self.layer_norm1(y + _y) |
|
|
| _y = y.clone() |
| y = self.encoder_decoder_attention(x, y, mask=cross_attention_mask) |
| y = self.dropout2(y) |
| y = self.layer_norm2(y + _y) |
|
|
| _y = y.clone() |
| y = self.ffn(y) |
| y = self.dropout3(y) |
| y = self.layer_norm3(y + _y) |
| return y |
|
|
|
|
| class SequentialDecoder(nn.Sequential): |
| def forward(self, *inputs): |
| x, y, self_attention_mask, cross_attention_mask = inputs |
| for module in self._modules.values(): |
| y = module(x, y, self_attention_mask, cross_attention_mask) |
| return y |
|
|
| class Decoder(nn.Module): |
| def __init__(self, |
| d_model, |
| ffn_hidden, |
| num_heads, |
| drop_prob, |
| num_layers, |
| max_sequence_length, |
| language_to_index, |
| START_TOKEN, |
| END_TOKEN, |
| PADDING_TOKEN): |
| super().__init__() |
| self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN) |
| self.layers = SequentialDecoder(*[DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(num_layers)]) |
|
|
| def forward(self, x, y, self_attention_mask, cross_attention_mask, start_token, end_token): |
| y = self.sentence_embedding(y, start_token, end_token) |
| y = self.layers(x, y, self_attention_mask, cross_attention_mask) |
| return y |
|
|
|
|
| class Transformer(nn.Module): |
| def __init__(self, |
| d_model, |
| ffn_hidden, |
| num_heads, |
| drop_prob, |
| num_layers, |
| max_sequence_length, |
| kn_vocab_size, |
| english_to_index, |
| kannada_to_index, |
| START_TOKEN, |
| END_TOKEN, |
| PADDING_TOKEN |
| ): |
| super().__init__() |
| self.encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length, english_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN) |
| self.decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length, kannada_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN) |
| self.linear = nn.Linear(d_model, kn_vocab_size) |
| self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') |
|
|
| def forward(self, |
| x, |
| y, |
| encoder_self_attention_mask=None, |
| decoder_self_attention_mask=None, |
| decoder_cross_attention_mask=None, |
| enc_start_token=False, |
| enc_end_token=False, |
| dec_start_token=False, |
| dec_end_token=False): |
| x = self.encoder(x, encoder_self_attention_mask, start_token=enc_start_token, end_token=enc_end_token) |
| out = self.decoder(x, y, decoder_self_attention_mask, decoder_cross_attention_mask, start_token=dec_start_token, end_token=dec_end_token) |
| out = self.linear(out) |
| return out |