import tensorflow as tf
from tensorflow.keras.layers import Dense, Dropout, Embedding, LayerNormalization, Layer, Flatten
from tensorflow.keras.models import Model
import numpy as np


class PositionalEncoder(Layer):
    """Adds fixed sinusoidal positional encodings to the input embeddings."""

    def __init__(self, name="Positional_Encoder"):
        super(PositionalEncoder, self).__init__(name=name)

    def get_angles(self, pos, i, d_model):
        # pos: (seq_length, 1), i: (1, d_model)
        angles = 1 / np.power(10000., (2 * (i // 2)) / np.float32(d_model))
        return pos * angles  # (seq_length, d_model)

    def call(self, inputs):
        # Requires statically known sequence length and model dimension.
        seq_length = inputs.shape.as_list()[-2]
        d_model = inputs.shape.as_list()[-1]
        angles = self.get_angles(np.arange(seq_length)[:, np.newaxis],
                                 np.arange(d_model)[np.newaxis, :],
                                 d_model)
        angles[:, 0::2] = np.sin(angles[:, 0::2])  # even indices: sine
        angles[:, 1::2] = np.cos(angles[:, 1::2])  # odd indices: cosine
        pos_encoding = angles[np.newaxis, ...]     # (1, seq_length, d_model)
        return inputs + tf.cast(pos_encoding, tf.float32)


class ScaledDotProductAttention(Layer):
    """Computes softmax(QK^T / sqrt(d_k)) V, with optional additive masking."""

    def __init__(self, name="Attention"):
        super(ScaledDotProductAttention, self).__init__(name=name)

    def call(self, queries, keys, values, mask):
        product = tf.matmul(queries, keys, transpose_b=True)
        keys_dim = tf.cast(tf.shape(keys)[-1], dtype=tf.float32)
        scaled_product = product / tf.math.sqrt(keys_dim)
        if mask is not None:
            # Masked positions get a large negative score, so softmax -> ~0 there.
            scaled_product += (mask * -1e9)
        attention = tf.matmul(tf.nn.softmax(scaled_product, axis=-1), values)
        return attention


class MultiHeadAttention(Layer):
    """Projects Q/K/V into nb_proj heads, attends per head, then recombines."""

    def __init__(self, nb_proj, name="Multi_Head_Attention"):
        super(MultiHeadAttention, self).__init__(name=name)
        self.nb_proj = nb_proj

    def build(self, input_shape):
        self.d_model = input_shape[-1]
        assert self.d_model % self.nb_proj == 0  # d_model must split evenly across heads
        self.d_proj = self.d_model // self.nb_proj
        self.Query_Dense = Dense(units=self.d_model)
        self.Key_Dense = Dense(units=self.d_model)
        self.Value_Dense = Dense(units=self.d_model)
        self.Final_Dense = Dense(units=self.d_model)
        self.Attention = ScaledDotProductAttention()

    def split_proj(self, inputs, batch_size):
        # inputs: (batch_size, seq_length, d_model)
        shape = (batch_size, -1, self.nb_proj, self.d_proj)
        splitted_inputs = tf.reshape(inputs, shape=shape)         # (batch_size, seq_length, nb_proj, d_proj)
        return tf.transpose(splitted_inputs, perm=[0, 2, 1, 3])   # (batch_size, nb_proj, seq_length, d_proj)

    def call(self, queries, keys, values, mask):
        batch_size = tf.shape(queries)[0]
        queries = self.Query_Dense(queries)
        keys = self.Key_Dense(keys)
        values = self.Value_Dense(values)
        queries = self.split_proj(queries, batch_size)
        keys = self.split_proj(keys, batch_size)
        values = self.split_proj(values, batch_size)
        attention = self.Attention(queries, keys, values, mask)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])    # (batch_size, seq_length, nb_proj, d_proj)
        concat_attention = tf.reshape(attention, shape=(batch_size, -1, self.d_model))
        outputs = self.Final_Dense(concat_attention)
        return outputs
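
# A minimal sanity check for the attention stack (an illustrative helper, not
# part of the original model; all sizes below are arbitrary). It confirms that
# multi-head self-attention preserves the (batch_size, seq_length, d_model) shape.
def _sanity_check_attention():
    mha = MultiHeadAttention(nb_proj=4)
    x = tf.random.uniform((2, 10, 64))  # (batch_size, seq_length, d_model)
    out = mha(x, x, x, None)            # self-attention, no padding mask
    assert out.shape == (2, 10, 64)     # d_model (64) must be divisible by nb_proj (4)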

class EncoderLayer(Layer):
    """One encoder block: self-attention + residual/norm, feed-forward + residual/norm."""

    def __init__(self, FFN_units, nb_proj, dropout_rate, name="Encoder_Layer"):
        super(EncoderLayer, self).__init__(name=name)
        self.FFN_units = FFN_units
        self.nb_proj = nb_proj
        self.dropout_rate = dropout_rate

    def build(self, input_shape):
        self.d_model = input_shape[-1]
        self.multi_head_attention = MultiHeadAttention(self.nb_proj)
        self.dropout_1 = Dropout(rate=self.dropout_rate)
        self.norm_1 = LayerNormalization(epsilon=1e-6)
        self.Dense_1 = Dense(units=self.FFN_units, activation="relu")
        self.Dense_2 = Dense(units=self.d_model)
        self.dropout_2 = Dropout(rate=self.dropout_rate)
        self.norm_2 = LayerNormalization(epsilon=1e-6)

    def call(self, inputs, mask, training):
        attention = self.multi_head_attention(inputs, inputs, inputs, mask)
        attention = self.dropout_1(attention, training=training)
        attention = self.norm_1(attention + inputs)        # residual connection
        outputs = self.Dense_1(attention)
        outputs = self.Dense_2(outputs)
        outputs = self.dropout_2(outputs, training=training)
        outputs = self.norm_2(outputs + attention)         # residual connection
        return outputs


class Encoder(Layer):
    """Embedding + positional encoding followed by a stack of encoder layers."""

    def __init__(self, nb_layers, FFN_units, nb_proj, dropout_rate,
                 vocab_size, d_model, name="Encoder"):
        super(Encoder, self).__init__(name=name)
        self.nb_layers = nb_layers
        self.d_model = d_model
        self.embedding = Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoder()
        self.dropout = Dropout(rate=dropout_rate)
        self.enc_layers = [EncoderLayer(FFN_units, nb_proj, dropout_rate)
                           for _ in range(nb_layers)]

    def call(self, inputs, mask, training):
        outputs = self.embedding(inputs)
        outputs *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))  # scale embeddings
        outputs = self.pos_encoder(outputs)
        outputs = self.dropout(outputs, training=training)
        for i in range(self.nb_layers):
            outputs = self.enc_layers[i](outputs, mask, training)
        return outputs


class Transformer(Model):
    """Encoder-only Transformer: the encoder output is flattened and passed
    through a sigmoid Dense layer, so this acts as a classifier rather than a
    full encoder-decoder sequence-to-sequence model."""

    def __init__(self, vocab_size_enc, vocab_size_dec, d_model, nb_layers,
                 FFN_units, nb_proj, dropout_rate, name="Transformer"):
        super(Transformer, self).__init__(name=name)
        self.encoder = Encoder(nb_layers, FFN_units, nb_proj, dropout_rate,
                               vocab_size_enc, d_model)
        self.Flatten = Flatten()
        self.Last_Dense = Dense(units=vocab_size_dec, activation="sigmoid",
                                name="Linear_Output")

    def create_padding_mask(self, seq):
        # seq: (batch_size, seq_length); 1.0 marks padding (token id 0)
        mask = tf.cast(tf.equal(seq, 0), dtype=tf.float32)
        return mask[:, tf.newaxis, tf.newaxis, :]  # broadcastable over heads and query positions

    def create_look_ahead_mask(self, seq):
        # Unused in this encoder-only model; kept for a future decoder.
        seq_len = tf.shape(seq)[1]
        look_ahead_mask = 1 - tf.linalg.band_part(tf.ones(shape=(seq_len, seq_len)), -1, 0)
        return look_ahead_mask

    def call(self, enc_inputs, training):
        enc_mask = self.create_padding_mask(enc_inputs)
        enc_outputs = self.encoder(enc_inputs, enc_mask, training)
        enc_outputs = self.Flatten(enc_outputs)
        outputs = self.Last_Dense(enc_outputs)
        return outputs
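
# Minimal smoke test, assuming integer token IDs with 0 reserved for padding.
# Every hyperparameter below is illustrative, not a value from the original code.
if __name__ == "__main__":
    model = Transformer(vocab_size_enc=8000,
                        vocab_size_dec=2,
                        d_model=128,
                        nb_layers=2,
                        FFN_units=512,
                        nb_proj=8,
                        dropout_rate=0.1)
    batch = np.random.randint(1, 8000, size=(4, 20))  # (batch_size, seq_length)
    batch[:, -5:] = 0                                 # trailing padding tokens
    outputs = model(tf.constant(batch), training=False)
    print(outputs.shape)                              # (4, 2): Flatten + final Dense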