| import tensorflow as tf
|
| from tensorflow.keras.layers import Layer, Dense, Dropout, LayerNormalization
|
|
|
| @tf.keras.utils.register_keras_serializable()
|
| class EncoderLayer(Layer):
|
| def __init__(self, d_model, num_heads, dff, dropout_rate, **kwargs):
|
| super(EncoderLayer, self).__init__(**kwargs)
|
| self.mha = MultiHeadAttention(d_model, num_heads)
|
| self.ffn = point_wise_feed_forward_network(d_model, dff)
|
| self.layernorm1 = LayerNormalization(epsilon=1e-6)
|
| self.layernorm2 = LayerNormalization(epsilon=1e-6)
|
| self.dropout1 = Dropout(dropout_rate)
|
| self.dropout2 = Dropout(dropout_rate)
|
|
|
| def call(self, x, training=None, mask=None):
|
| attn_output, _ = self.mha(x, x, x, mask)
|
| attn_output = self.dropout1(attn_output, training=training)
|
| out1 = self.layernorm1(x + attn_output)
|
| ffn_output = self.ffn(out1)
|
| ffn_output = self.dropout2(ffn_output, training=training)
|
| out2 = self.layernorm2(out1 + ffn_output)
|
| return out2
|
|
|
| def get_config(self):
|
| config = super().get_config()
|
| config.update({
|
| 'd_model': self.mha.d_model,
|
| 'num_heads': self.mha.num_heads,
|
| 'dff': self.ffn.layers[0].units,
|
| 'dropout_rate': self.dropout1.rate
|
| })
|
| return config
|
|
|
| @tf.keras.utils.register_keras_serializable()
|
| class DecoderLayer(Layer):
|
| def __init__(self, d_model, num_heads, dff, dropout_rate, **kwargs):
|
| super(DecoderLayer, self).__init__(**kwargs)
|
| self.mha1 = MultiHeadAttention(d_model, num_heads)
|
| self.mha2 = MultiHeadAttention(d_model, num_heads)
|
| self.ffn = point_wise_feed_forward_network(d_model, dff)
|
| self.layernorm1 = LayerNormalization(epsilon=1e-6)
|
| self.layernorm2 = LayerNormalization(epsilon=1e-6)
|
| self.layernorm3 = LayerNormalization(epsilon=1e-6)
|
| self.dropout1 = Dropout(dropout_rate)
|
| self.dropout2 = Dropout(dropout_rate)
|
| self.dropout3 = Dropout(dropout_rate)
|
|
|
| def call(self, x, enc_output, training=None, look_ahead_mask=None, padding_mask=None):
|
| attn1, _ = self.mha1(x, x, x, look_ahead_mask)
|
| attn1 = self.dropout1(attn1, training=training)
|
| out1 = self.layernorm1(x + attn1)
|
| attn2, _ = self.mha2(enc_output, enc_output, out1, padding_mask)
|
| attn2 = self.dropout2(attn2, training=training)
|
| out2 = self.layernorm2(out1 + attn2)
|
| ffn_output = self.ffn(out2)
|
| ffn_output = self.dropout3(ffn_output, training=training)
|
| out3 = self.layernorm3(out2 + ffn_output)
|
| return out3
|
|
|
| def get_config(self):
|
| config = super().get_config()
|
| config.update({
|
| 'd_model': self.mha1.d_model,
|
| 'num_heads': self.mha1.num_heads,
|
| 'dff': self.ffn.layers[0].units,
|
| 'dropout_rate': self.dropout1.rate
|
| })
|
| return config
|
|
|
| @tf.keras.utils.register_keras_serializable()
|
| class MultiHeadAttention(Layer):
|
| def __init__(self, d_model, num_heads, **kwargs):
|
| super(MultiHeadAttention, self).__init__(**kwargs)
|
| self.num_heads = num_heads
|
| self.d_model = d_model
|
| assert d_model % num_heads == 0
|
| self.depth = d_model // num_heads
|
| self.wq = Dense(d_model)
|
| self.wk = Dense(d_model)
|
| self.wv = Dense(d_model)
|
| self.dense = Dense(d_model)
|
|
|
| def split_heads(self, x, batch_size):
|
| x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
|
| return tf.transpose(x, perm=[0, 2, 1, 3])
|
|
|
| def call(self, v, k, q, mask=None):
|
| batch_size = tf.shape(q)[0]
|
| q = self.wq(q)
|
| k = self.wk(k)
|
| v = self.wv(v)
|
| q = self.split_heads(q, batch_size)
|
| k = self.split_heads(k, batch_size)
|
| v = self.split_heads(v, batch_size)
|
| scaled_attention, _ = self.scaled_dot_product_attention(q, k, v, mask)
|
| scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
|
| concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
|
| output = self.dense(concat_attention)
|
| return output, _
|
|
|
| def scaled_dot_product_attention(self, q, k, v, mask):
|
| matmul_qk = tf.matmul(q, k, transpose_b=True)
|
| dk = tf.cast(tf.shape(k)[-1], tf.float32)
|
| scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
|
| if mask is not None:
|
| scaled_attention_logits += (mask * -1e9)
|
| attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
|
| output = tf.matmul(attention_weights, v)
|
| return output, attention_weights
|
|
|
| def get_config(self):
|
| config = super().get_config()
|
| config.update({
|
| 'd_model': self.d_model,
|
| 'num_heads': self.num_heads
|
| })
|
| return config
|
|
|
| def point_wise_feed_forward_network(d_model, dff):
|
| return tf.keras.Sequential([
|
| Dense(dff, activation='relu'),
|
| Dense(d_model)
|
| ]) |