Europarl_Translate / Model.py
SergioMtz's picture
Create new file
7acb46b
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense, Dropout, Embedding, LayerNormalization
from tensorflow.keras.models import Model
import numpy as np
class PositionalEncoding(Layer):
    """Add fixed sinusoidal positional encodings to a sequence of embeddings.

    The layer has no weights. For position ``pos`` and feature index ``i`` the
    angle is ``pos / 10000 ** (2 * (i // 2) / d_model)``; even feature indices
    receive the sine of that angle and odd indices the cosine.
    """

    def __init__(self):
        super(PositionalEncoding, self).__init__()

    def get_angles(self, pos, i, d_model):
        """Return the un-activated angles ``pos * rate(i)``.

        Args:
            pos: positions, shape (seq_length, 1) (a scalar also broadcasts).
            i: feature indices, shape (1, d_model).
            d_model: size of the feature axis.

        Returns:
            NumPy array of shape (seq_length, d_model).
        """
        angles = 1 / np.power(10000., (2 * (i // 2)) / np.float32(d_model))
        return pos * angles

    def call(self, inputs):
        """Return ``inputs`` plus the positional encoding.

        Args:
            inputs: tensor whose last two axes are (seq_length, d_model).
                The feature axis must be statically known; the sequence axis
                may be dynamic (``None`` in the static shape).

        Returns:
            Tensor with the same shape and dtype as ``inputs``.
        """
        static_shape = inputs.shape.as_list()
        seq_length, d_model = static_shape[-2], static_shape[-1]
        feature_idx = np.arange(d_model)[np.newaxis, :]

        if seq_length is not None:
            # Static sequence length: precompute the whole table with NumPy.
            angles = self.get_angles(np.arange(seq_length)[:, np.newaxis],
                                     feature_idx,
                                     d_model)
            angles[:, 0::2] = np.sin(angles[:, 0::2])
            angles[:, 1::2] = np.cos(angles[:, 1::2])
            pos_encoding = angles[np.newaxis, ...]
        else:
            # Dynamic sequence length: np.arange(None) would fail, so the
            # positions are generated with TF ops at run time instead.
            rates = self.get_angles(1.0, feature_idx, d_model)  # (1, d_model)
            positions = tf.cast(tf.range(tf.shape(inputs)[-2]), tf.float32)
            angles = positions[:, tf.newaxis] * tf.cast(rates, tf.float32)
            is_even = (feature_idx % 2) == 0
            pos_encoding = tf.where(is_even, tf.sin(angles), tf.cos(angles))
            pos_encoding = pos_encoding[tf.newaxis, ...]

        # Follow the input dtype rather than hard-coding float32.
        return inputs + tf.cast(pos_encoding, inputs.dtype)
def scaled_dot_product_attention(queries, keys, values, mask):
    """Compute softmax(Q K^T / sqrt(d_k) + mask * -1e9) V.

    Args:
        queries: query tensor; multiplied against ``keys`` transposed on its
            last two axes.
        keys: key tensor; the size of its last axis is the scaling depth.
        values: value tensor, weighted by the attention distribution.
        mask: tensor added to the scaled scores after multiplication by
            ``-1e9`` (so entries equal to 1 are effectively excluded from the
            softmax), or ``None`` for no masking.

    Returns:
        The attention-weighted combination of ``values``.
    """
    depth = tf.cast(tf.shape(keys)[-1], tf.float32)
    logits = tf.matmul(queries, keys, transpose_b=True) / tf.math.sqrt(depth)

    if mask is not None:
        # Large negative offset drives masked positions to ~0 after softmax.
        logits = logits + (mask * -1e9)

    weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(weights, values)
class MultiHeadAttention(Layer):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    Queries, keys and values are each passed through a ``Dense`` projection of
    width ``d_model``, split into ``nb_proj`` heads of width
    ``d_model // nb_proj``, attended per head, re-concatenated and projected
    once more.
    """

    def __init__(self, nb_proj):
        """Store the number of attention heads (``nb_proj``)."""
        super(MultiHeadAttention, self).__init__()
        self.nb_proj = nb_proj

    def build(self, input_shape):
        """Create the projection layers once the model width is known.

        Args:
            input_shape: shape whose last entry is taken as ``d_model``.

        Raises:
            ValueError: if ``d_model`` is not divisible by ``nb_proj``.
        """
        self.d_model = input_shape[-1]
        # Explicit check rather than `assert`, so it still runs under `-O`.
        if self.d_model % self.nb_proj != 0:
            raise ValueError(
                "d_model ({}) must be divisible by nb_proj ({})".format(
                    self.d_model, self.nb_proj))
        self.d_proj = self.d_model // self.nb_proj

        self.query_lin = Dense(units=self.d_model)
        self.key_lin = Dense(units=self.d_model)
        self.value_lin = Dense(units=self.d_model)

        self.final_lin = Dense(units=self.d_model)

    def split_proj(self, inputs, batch_size):
        """Split the feature axis into heads.

        Args:
            inputs: tensor of shape (batch_size, seq_length, d_model).
            batch_size: size of the leading axis.

        Returns:
            Tensor of shape (batch_size, nb_proj, seq_length, d_proj).
        """
        shape = (batch_size,
                 -1,
                 self.nb_proj,
                 self.d_proj)
        # (batch_size, seq_length, nb_proj, d_proj)
        splited_inputs = tf.reshape(inputs, shape=shape)
        # (batch_size, nb_proj, seq_length, d_proj)
        return tf.transpose(splited_inputs, perm=[0, 2, 1, 3])

    def call(self, queries, keys, values, mask=None):
        """Apply multi-head attention.

        Args:
            queries: tensor projected to form the attention queries.
            keys: tensor projected to form the attention keys.
            values: tensor projected to form the attention values.
            mask: optional mask forwarded to
                ``scaled_dot_product_attention``; ``None`` disables masking.

        Returns:
            Tensor reshaped to (batch_size, -1, d_model) and passed through
            the final ``Dense`` projection.
        """
        batch_size = tf.shape(queries)[0]

        queries = self.query_lin(queries)
        keys = self.key_lin(keys)
        values = self.value_lin(values)

        queries = self.split_proj(queries, batch_size)
        keys = self.split_proj(keys, batch_size)
        values = self.split_proj(values, batch_size)

        attention = scaled_dot_product_attention(queries, keys, values, mask)

        # Undo the head split: move heads next to d_proj, then merge them.
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(attention,
                                      shape=(batch_size, -1, self.d_model))

        outputs = self.final_lin(concat_attention)
        return outputs
class EncoderLayer(Layer):
    """One encoder block: self-attention followed by a feed-forward network.

    Each of the two sub-blocks applies dropout, adds a residual connection
    and then layer-normalizes the sum.
    """

    def __init__(self, FFN_units, nb_proj, dropout_rate):
        """Record hyper-parameters; sub-layers are created in ``build``.

        Args:
            FFN_units: width of the hidden feed-forward ``Dense`` layer.
            nb_proj: number of attention heads.
            dropout_rate: rate used by both ``Dropout`` layers.
        """
        super(EncoderLayer, self).__init__()
        self.FFN_units = FFN_units
        self.nb_proj = nb_proj
        self.dropout_rate = dropout_rate

    def build(self, input_shape):
        """Instantiate sub-layers; ``d_model`` is the last input dimension."""
        self.d_model = input_shape[-1]

        # Self-attention sub-block.
        self.multi_head_attention = MultiHeadAttention(self.nb_proj)
        self.dropout_1 = Dropout(rate=self.dropout_rate)
        self.norm_1 = LayerNormalization(epsilon=1e-6)

        # Feed-forward sub-block (expand to FFN_units, project back to d_model).
        self.dense_1 = Dense(units=self.FFN_units, activation="relu")
        self.dense_2 = Dense(units=self.d_model)
        self.dropout_2 = Dropout(rate=self.dropout_rate)
        self.norm_2 = LayerNormalization(epsilon=1e-6)

    def call(self, inputs, mask, training):
        """Run the encoder block.

        Args:
            inputs: tensor used as queries, keys and values of the
                self-attention.
            mask: mask forwarded to the attention layer.
            training: forwarded to the dropout layers.

        Returns:
            The layer-normalized output of the feed-forward sub-block.
        """
        attn = self.multi_head_attention(inputs, inputs, inputs, mask)
        attn = self.dropout_1(attn, training=training)
        attn_block = self.norm_1(attn + inputs)

        ffn = self.dense_2(self.dense_1(attn_block))
        ffn = self.dropout_2(ffn, training=training)
        return self.norm_2(ffn + attn_block)
class Encoder(Layer):
    """Stack of ``EncoderLayer`` blocks on top of scaled, position-encoded
    token embeddings."""

    def __init__(self,
                 nb_layers,
                 FFN_units,
                 nb_proj,
                 dropout_rate,
                 vocab_size,
                 d_model,
                 name="Encoder"):
        """Create the embedding, positional encoding, dropout and layer stack.

        Args:
            nb_layers: number of ``EncoderLayer`` blocks.
            FFN_units: feed-forward width passed to every block.
            nb_proj: number of attention heads passed to every block.
            dropout_rate: dropout rate for the embedding dropout and blocks.
            vocab_size: input dimension of the ``Embedding`` layer.
            d_model: embedding width.
            name: Keras layer name.
        """
        super(Encoder, self).__init__(name=name)
        self.nb_layers = nb_layers
        self.d_model = d_model

        self.embedding = Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding()
        self.dropout = Dropout(rate=dropout_rate)
        self.enc_layers = [
            EncoderLayer(FFN_units, nb_proj, dropout_rate)
            for _ in range(nb_layers)
        ]

    def call(self, inputs, mask, training):
        """Encode a batch of token ids.

        Args:
            inputs: token ids fed to the embedding layer.
            mask: mask forwarded to every encoder block.
            training: forwarded to dropout and to every encoder block.

        Returns:
            Output of the last encoder block.
        """
        embedded = self.embedding(inputs)
        # Embeddings are multiplied by sqrt(d_model) before positions are added.
        embedded = embedded * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        hidden = self.pos_encoding(embedded)
        hidden = self.dropout(hidden, training=training)

        for enc_layer in self.enc_layers:
            hidden = enc_layer(hidden, mask, training)

        return hidden
class DecoderLayer(Layer):
    """One decoder block: self-attention, attention over the encoder output,
    then a feed-forward network.

    Each of the three sub-blocks applies dropout, adds a residual connection
    and then layer-normalizes the sum.
    """

    def __init__(self, FFN_units, nb_proj, dropout_rate):
        """Record hyper-parameters; sub-layers are created in ``build``.

        Args:
            FFN_units: width of the hidden feed-forward ``Dense`` layer.
            nb_proj: number of attention heads.
            dropout_rate: rate used by all three ``Dropout`` layers.
        """
        super(DecoderLayer, self).__init__()
        self.FFN_units = FFN_units
        self.nb_proj = nb_proj
        self.dropout_rate = dropout_rate

    def build(self, input_shape):
        """Instantiate sub-layers; ``d_model`` is the last input dimension."""
        self.d_model = input_shape[-1]

        # Self multi-head attention
        self.multi_head_attention_1 = MultiHeadAttention(self.nb_proj)
        self.dropout_1 = Dropout(rate=self.dropout_rate)
        self.norm_1 = LayerNormalization(epsilon=1e-6)

        # Multi-head attention combined with the encoder output
        self.multi_head_attention_2 = MultiHeadAttention(self.nb_proj)
        self.dropout_2 = Dropout(rate=self.dropout_rate)
        self.norm_2 = LayerNormalization(epsilon=1e-6)

        # Feed-forward
        self.dense_1 = Dense(units=self.FFN_units, activation="relu")
        self.dense_2 = Dense(units=self.d_model)
        self.dropout_3 = Dropout(rate=self.dropout_rate)
        self.norm_3 = LayerNormalization(epsilon=1e-6)

    def call(self, inputs, enc_outputs, mask_1, mask_2, training):
        """Run the decoder block.

        Args:
            inputs: decoder-side tensor used as queries, keys and values of
                the first (self) attention.
            enc_outputs: encoder output used as keys and values of the second
                attention.
            mask_1: mask for the self-attention.
            mask_2: mask for the attention over ``enc_outputs``.
            training: forwarded to the dropout layers.

        Returns:
            The layer-normalized output of the feed-forward sub-block.
        """
        self_attn = self.multi_head_attention_1(inputs, inputs, inputs, mask_1)
        self_attn = self.dropout_1(self_attn, training=training)
        self_attn = self.norm_1(self_attn + inputs)

        # Queries come from the decoder, keys/values from the encoder.
        cross_attn = self.multi_head_attention_2(self_attn,
                                                 enc_outputs,
                                                 enc_outputs,
                                                 mask_2)
        cross_attn = self.dropout_2(cross_attn, training=training)
        cross_attn = self.norm_2(cross_attn + self_attn)

        ffn = self.dense_2(self.dense_1(cross_attn))
        ffn = self.dropout_3(ffn, training=training)
        return self.norm_3(ffn + cross_attn)
class Decoder(Layer):
    """Stack of ``DecoderLayer`` blocks on top of scaled, position-encoded
    token embeddings."""

    def __init__(self,
                 nb_layers,
                 FFN_units,
                 nb_proj,
                 dropout_rate,
                 vocab_size,
                 d_model,
                 name="Decoder"):
        """Create the embedding, positional encoding, dropout and layer stack.

        Args:
            nb_layers: number of ``DecoderLayer`` blocks.
            FFN_units: feed-forward width passed to every block.
            nb_proj: number of attention heads passed to every block.
            dropout_rate: dropout rate for the embedding dropout and blocks.
            vocab_size: input dimension of the ``Embedding`` layer.
            d_model: embedding width.
            name: Keras layer name.
        """
        super(Decoder, self).__init__(name=name)
        self.d_model = d_model
        self.nb_layers = nb_layers

        self.embedding = Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding()
        self.dropout = Dropout(rate=dropout_rate)

        self.dec_layers = [
            DecoderLayer(FFN_units, nb_proj, dropout_rate)
            for _ in range(nb_layers)
        ]

    def call(self, inputs, enc_outputs, mask_1, mask_2, training):
        """Decode a batch of token ids against the encoder output.

        Args:
            inputs: decoder token ids fed to the embedding layer.
            enc_outputs: encoder output forwarded to every decoder block.
            mask_1: self-attention mask forwarded to every decoder block.
            mask_2: encoder-attention mask forwarded to every decoder block.
            training: forwarded to dropout and to every decoder block.

        Returns:
            Output of the last decoder block.
        """
        embedded = self.embedding(inputs)
        # Embeddings are multiplied by sqrt(d_model) before positions are added.
        embedded = embedded * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        hidden = self.pos_encoding(embedded)
        hidden = self.dropout(hidden, training=training)

        for dec_layer in self.dec_layers:
            hidden = dec_layer(hidden, enc_outputs, mask_1, mask_2, training)

        return hidden
class Transformer(Model):
    """Encoder-decoder Transformer producing per-position vocabulary scores.

    Builds an ``Encoder``, a ``Decoder`` and a final ``Dense`` projection to
    ``vocab_size_dec`` units, and derives all attention masks from the token
    ids (id 0 is treated as padding).
    """

    def __init__(self,
                 vocab_size_enc,
                 vocab_size_dec,
                 d_model,
                 nb_layers,
                 FFN_units,
                 nb_proj,
                 dropout_rate,
                 name="Transformer"):
        """Create the encoder, decoder and output projection.

        Args:
            vocab_size_enc: vocabulary size of the encoder embedding.
            vocab_size_dec: vocabulary size of the decoder embedding and
                width of the output projection.
            d_model: embedding / model width.
            nb_layers: number of blocks in both encoder and decoder.
            FFN_units: feed-forward hidden width in every block.
            nb_proj: number of attention heads.
            dropout_rate: dropout rate used throughout.
            name: Keras model name.
        """
        super(Transformer, self).__init__(name=name)

        self.encoder = Encoder(nb_layers,
                               FFN_units,
                               nb_proj,
                               dropout_rate,
                               vocab_size_enc,
                               d_model)
        self.decoder = Decoder(nb_layers,
                               FFN_units,
                               nb_proj,
                               dropout_rate,
                               vocab_size_dec,
                               d_model)
        # NOTE: the layer name string is kept exactly as-is; renaming it
        # would break loading of previously saved weights.
        self.last_linear = Dense(units=vocab_size_dec, name="lin_ouput")

    def create_padding_mask(self, seq):
        """Return a float mask that is 1.0 where ``seq`` equals 0.

        Args:
            seq: token ids of shape (batch_size, seq_length).

        Returns:
            Tensor of shape (batch_size, 1, 1, seq_length).
        """
        mask = tf.cast(tf.math.equal(seq, 0), tf.float32)
        return mask[:, tf.newaxis, tf.newaxis, :]

    def create_look_ahead_mask(self, seq):
        """Return a (seq_len, seq_len) mask that is 1 strictly above the
        diagonal, i.e. for positions later than the current one.

        Args:
            seq: tensor whose second axis is the sequence length.
        """
        seq_len = tf.shape(seq)[1]
        look_ahead_mask = 1 - tf.linalg.band_part(
            tf.ones((seq_len, seq_len)), -1, 0)
        return look_ahead_mask

    def call(self, enc_inputs, dec_inputs, training=None):
        """Run the full model.

        Args:
            enc_inputs: source token ids, shape (batch_size, seq_length).
            dec_inputs: target token ids, shape (batch_size, seq_length).
            training: forwarded to the encoder and decoder (and from there to
                their dropout layers). Defaults to ``None``.

        Returns:
            Output of the final ``Dense`` layer applied to the decoder output
            (``vocab_size_dec`` units, no activation).
        """
        # The same encoder padding mask serves the encoder self-attention and
        # the decoder's attention over the encoder output, so build it once.
        enc_mask = self.create_padding_mask(enc_inputs)
        dec_mask_2 = enc_mask

        # Decoder self-attention hides both padding and future positions.
        dec_mask_1 = tf.maximum(
            self.create_padding_mask(dec_inputs),
            self.create_look_ahead_mask(dec_inputs)
        )

        enc_outputs = self.encoder(enc_inputs, enc_mask, training)
        dec_outputs = self.decoder(dec_inputs,
                                   enc_outputs,
                                   dec_mask_1,
                                   dec_mask_2,
                                   training)

        outputs = self.last_linear(dec_outputs)
        return outputs