import tensorflow as tf
from tensorflow.keras import layers


class SwiGLU(layers.Layer):
    """SiLU-gated linear unit (SwiGLU) feed-forward block."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        # d_ff must be even: the projection is split in half into value and gate.
        self.proj = layers.Dense(d_ff)
        self.out = layers.Dense(d_model)

    def call(self, x):
        x_proj = self.proj(x)
        x_val, x_gate = tf.split(x_proj, 2, axis=-1)
        return self.out(x_val * tf.nn.silu(x_gate))
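
# Example shapes (illustrative numbers, not values from the model below): with
# d_model=64 and d_ff=256, SwiGLU maps (batch, seq, 64) -> (batch, seq, 256),
# splits that into two (batch, seq, 128) halves, and returns
# out(x_val * silu(x_gate)) of shape (batch, seq, 64).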


class CrossBlock(layers.Layer):
    """Gated element-wise fusion of a decoder stream x with an encoder stream z."""

    def __init__(self):
        super().__init__()
        self.alpha = layers.Dense(1, activation='sigmoid', dtype='float32')

    def call(self, x, z):
        # The gate is computed from x alone; x and z must share the same
        # sequence length and feature dimension for the element-wise mix.
        a = self.alpha(x)
        return a * x + (1.0 - a) * z
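
# As a -> 1 the block passes x through; as a -> 0 it substitutes z. Unlike
# cross-attention, this fusion is positionwise, so source and target sequences
# must already be aligned position by position.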


class EncoderBlock(layers.Layer):
    """Standard post-LayerNorm Transformer encoder block with a SwiGLU FFN."""

    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super().__init__()
        # key_dim is the per-head dimension, hence d_model // num_heads.
        self.mha = layers.MultiHeadAttention(num_heads=num_heads,
                                             key_dim=d_model // num_heads)
        self.ffn = SwiGLU(d_model, dff)
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout)
        self.dropout2 = layers.Dropout(dropout)

    def call(self, x, mask=None, training=False):
        attn_out = self.dropout1(self.mha(x, x, x, attention_mask=mask),
                                 training=training)
        out1 = self.norm1(x + attn_out)
        ffn_out = self.dropout2(self.ffn(out1), training=training)
        return self.norm2(out1 + ffn_out)
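
# The post-LN residual pattern above, written out:
#   h = LayerNorm(x + Dropout(SelfAttention(x)))
#   y = LayerNorm(h + Dropout(SwiGLU(h)))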


class LoU(layers.Layer):
    """Gated unit that replaces pairwise attention in the decoder: elementwise
    query/key gates are smoothed by an input-dependent EMA over time and then
    used to modulate the values, so cost is linear in sequence length."""

    def __init__(self, d_model, d_ff=512, clip_value=5.0, eps=1e-6):
        super().__init__()
        self.d_model = d_model
        self.clip_value = float(clip_value)
        self.eps = float(eps)
        self.Q = layers.Dense(d_model, dtype='float32')
        self.K = layers.Dense(d_model, dtype='float32')
        self.V = layers.Dense(d_model, dtype='float32')
        self.norm = layers.LayerNormalization(epsilon=1e-5, dtype='float32')
        self.norm1 = layers.LayerNormalization(epsilon=1e-5, dtype='float32')
        # Per-position EMA smoothing factor in (0, 1).
        self.alpha_linear = layers.Dense(1, activation='sigmoid', dtype='float32')
        self.cross = CrossBlock()
        self.glu = SwiGLU(d_model, d_ff)

    def _ema_over_time(self, score, alpha_dynamic):
        # Move time to the leading axis for tf.scan:
        # (batch, time, d) -> (time, batch, d).
        seq = tf.transpose(score, perm=[1, 0, 2])
        alpha_seq = tf.transpose(alpha_dynamic, perm=[1, 0, 2])

        def step(prev_ema, inputs):
            x_t, alpha_t = inputs
            return alpha_t * x_t + (1.0 - alpha_t) * prev_ema

        # The EMA starts from the first score itself (alpha_0 is effectively 1),
        # then scans over the remaining timesteps.
        init = seq[0]
        elems = (seq[1:], alpha_seq[1:])
        ema_seq = tf.scan(fn=step, elems=elems, initializer=init)
        ema_seq = tf.concat([tf.expand_dims(init, 0), ema_seq], axis=0)
        return tf.transpose(ema_seq, perm=[1, 0, 2])
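
    # The scan implements the causal recurrence
    #   ema_0 = s_0
    #   ema_t = alpha_t * s_t + (1 - alpha_t) * ema_{t-1},  t >= 1
    # so each position sees an exponentially weighted history of the gate
    # products, with the decay rate alpha_t predicted from the input there.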

    def call(self, x, z, training=None):
        # training is accepted for interface parity with EncoderBlock; LoU
        # itself has no dropout, so the flag is unused.
        x_f32 = tf.cast(x, tf.float32)
        residual = x_f32
        x_f32 = self.norm1(x_f32)

        q = self.Q(x_f32)
        k = self.K(x_f32)
        v = self.V(x_f32)

        # Squash queries and keys into (0, 1) gates; their elementwise product
        # stands in for an attention score without any pairwise matmul.
        g_q = (tf.nn.tanh(q) + 1.0) / 2.0
        g_k = (tf.nn.tanh(k) + 1.0) / 2.0
        score = g_q * g_k

        # Smooth the scores over time, normalize by the feature-wise mean,
        # and clip for numerical stability.
        alpha_dynamic = self.alpha_linear(x_f32)
        score_ema = self._ema_over_time(score, alpha_dynamic)
        mean_last = tf.reduce_mean(score_ema, axis=-1, keepdims=True)
        denom = tf.maximum(mean_last, self.eps)
        score_norm = score_ema / denom
        score_clipped = tf.clip_by_value(score_norm, -self.clip_value,
                                         self.clip_value)

        x_comb = score_clipped * v
        out = self.norm(x_comb + residual)
        out = self.cross(out, z)
        out = self.glu(out)
        return tf.cast(out, x.dtype)
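
# Shape flow for x of shape (batch, seq, d_model): q, k, v, and score are all
# (batch, seq, d_model); alpha_dynamic is (batch, seq, 1) and broadcasts across
# features inside the EMA; the output matches x's shape. CrossBlock additionally
# requires the encoder output z to have the same sequence length as x.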


class Transformer(tf.keras.Model):
    """Encoder-decoder model: a standard attention encoder paired with a
    decoder stack built from LoU blocks."""

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 target_vocab_size, max_len=100, dropout=0.1):
        super().__init__()
        self.max_len = max_len
        self.d_model = d_model
        self.enc_embedding = layers.Embedding(input_vocab_size, d_model)
        self.enc_pos_embedding = layers.Embedding(max_len, d_model)
        self.dec_embedding = layers.Embedding(target_vocab_size, d_model)
        self.dec_pos_embedding = layers.Embedding(max_len, d_model)
        self.enc_layers = [EncoderBlock(d_model, num_heads, dff, dropout)
                           for _ in range(num_layers)]
        self.dec_layers = [LoU(d_model) for _ in range(num_layers)]
        self.final_layer = layers.Dense(target_vocab_size)

    def call(self, inputs, training=False):
        enc_inputs = inputs["enc_inputs"]
        dec_inputs = inputs["dec_inputs"]
        # Learned absolute positions, broadcast over the batch dimension.
        enc_pos = tf.range(tf.shape(enc_inputs)[1])[tf.newaxis, :]
        dec_pos = tf.range(tf.shape(dec_inputs)[1])[tf.newaxis, :]

        x = self.enc_embedding(enc_inputs) + self.enc_pos_embedding(enc_pos)
        for layer in self.enc_layers:
            x = layer(x, training=training)
        enc_out = x

        y = self.dec_embedding(dec_inputs) + self.dec_pos_embedding(dec_pos)
        for layer in self.dec_layers:
            y = layer(y, enc_out, training=training)
        return self.final_layer(y)
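

# Minimal smoke test: a sketch, with illustrative hyperparameters and vocab
# sizes that are assumptions rather than values from any particular config.
# Because CrossBlock fuses decoder and encoder states positionwise, the encoder
# and decoder inputs must share the same sequence length.
if __name__ == "__main__":
    model = Transformer(num_layers=2, d_model=64, num_heads=4, dff=256,
                        input_vocab_size=1000, target_vocab_size=1000,
                        max_len=50)
    enc = tf.random.uniform((2, 10), maxval=1000, dtype=tf.int32)
    dec = tf.random.uniform((2, 10), maxval=1000, dtype=tf.int32)
    logits = model({"enc_inputs": enc, "dec_inputs": dec}, training=False)
    print(logits.shape)  # expected: (2, 10, 1000)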