# En-Uz-Transformer-Model / modeling_en_uz.py
import torch
import torch.nn as nn
from transformers import PreTrainedModel
from configuration_en_uz import EnUzConfig
class Positional_Encoding(nn.Module):
    """Fixed sinusoidal positional encoding added to the token embeddings."""

    def __init__(self, config: EnUzConfig):
        super().__init__()
        pe = torch.zeros(config.context_length, config.hidden_size)  # (seq_length, d_model)
        position = torch.arange(0, config.context_length).unsqueeze(1)  # (seq_length, 1)
        dimension = torch.arange(0, config.hidden_size)  # (d_model,)
        # Even dimensions use sine, odd dimensions use cosine, with frequencies 1 / 10000^(i / d_model).
        pe[:, 0::2] = torch.sin(position / (10000 ** (dimension[0::2] / config.hidden_size)))  # (seq_length, d_model/2)
        pe[:, 1::2] = torch.cos(position / (10000 ** (dimension[1::2] / config.hidden_size)))  # (seq_length, d_model/2)
        self.register_buffer('pos_enc', pe)

    def forward(self, x):
        seq_length = x.size(1)
        pos_enc_matrix = self.pos_enc[:seq_length]  # (seq_length, d_model)
        return x + pos_enc_matrix  # (batch_size, seq_length, d_model)
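

# Quick shape check for the encoding above (a sketch, assuming an EnUzConfig instance named
# `config` is available; not part of the module itself):
#
#     pe = Positional_Encoding(config)
#     x = torch.zeros(2, 10, config.hidden_size)           # (batch_size, seq_length, d_model)
#     assert pe(x).shape == (2, 10, config.hidden_size)    # encoding is sliced to the input length
#                                                          # and broadcast over the batch dimension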
class Masked_Self_Attention(nn.Module):
    """Multi-head self-attention with a causal mask, so each position attends only to itself and earlier positions."""

    def __init__(self, config: EnUzConfig):
        super().__init__()
        self.d_model = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.d_head = config.hidden_size // config.num_attention_heads
        assert self.d_head * config.num_attention_heads == config.hidden_size, "hidden_size must be divisible by num_attention_heads"
        self.w_q = nn.Linear(config.hidden_size, config.hidden_size)
        self.w_k = nn.Linear(config.hidden_size, config.hidden_size)
        self.w_v = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout_layer = nn.Dropout(config.dropout)
        self.w_out = nn.Linear(config.hidden_size, config.hidden_size)
        self.w_out.is_residual = True  # flagged so _init_weights scales this residual projection down

    def forward(self, x):
        B, T, C = x.size()
        q = self.w_q(x).view(B, T, self.num_heads, self.d_head).transpose(1, 2)  # (batch_size, num_heads, seq_length, head_dim)
        k = self.w_k(x).view(B, T, self.num_heads, self.d_head).transpose(1, 2)  # (batch_size, num_heads, seq_length, head_dim)
        v = self.w_v(x).view(B, T, self.num_heads, self.d_head).transpose(1, 2)  # (batch_size, num_heads, seq_length, head_dim)
        scale_factor = 1 / (self.d_head ** 0.5)
        attn_bias = torch.zeros(T, T, device=x.device)  # (seq_length, seq_length)
        causal_mask = torch.ones(T, T, dtype=torch.bool, device=x.device).triu(diagonal=1)  # (seq_length, seq_length), True strictly above the diagonal (future positions)
        attn_bias.masked_fill_(causal_mask, float('-inf'))  # future positions get -inf so softmax assigns them zero weight
        attn_weights = torch.matmul(q, k.transpose(-2, -1)) * scale_factor  # (batch_size, num_heads, seq_length, seq_length)
        attn_weights += attn_bias
        attn_weights = torch.softmax(attn_weights, dim=-1)  # (batch_size, num_heads, seq_length, seq_length)
        output = torch.matmul(attn_weights, v)  # (batch_size, num_heads, seq_length, head_dim)
        output = output.transpose(1, 2).contiguous().view(B, T, self.d_model)  # (batch_size, seq_length, d_model)
        output = self.w_out(output)  # (batch_size, seq_length, d_model)
        output = self.dropout_layer(output)
        return output
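

# The attention core above (scaled dot product with an additive -inf causal bias) is mathematically
# equivalent to PyTorch's fused kernel; a minimal sketch, not used by the module itself, where q, k, v
# are the per-head tensors built in forward (PyTorch >= 2.0):
#
#     import torch.nn.functional as F
#     attn_out = F.scaled_dot_product_attention(q, k, v, is_causal=True)  # builds the same causal mask internally
#
# The explicit mask is kept in this file, presumably for readability and compatibility with older PyTorch.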
class MLP(nn.Module):
    """Position-wise feed-forward network: expand by ffn_scaler, apply GELU, project back to d_model."""

    def __init__(self, config: EnUzConfig):
        super().__init__()
        self.linear1 = nn.Linear(config.hidden_size, config.hidden_size * config.ffn_scaler)
        self.linear2 = nn.Linear(config.hidden_size * config.ffn_scaler, config.hidden_size)
        self.linear2.is_residual = True  # flagged so _init_weights scales this residual projection down
        self.gelu = nn.GELU(approximate='tanh')
        self.dropout_layer = nn.Dropout(config.dropout)

    def forward(self, x):
        x = self.linear1(x)
        x = self.gelu(x)
        x = self.linear2(x)
        x = self.dropout_layer(x)
        return x
class Decoder_Block(nn.Module):
    """Pre-norm transformer block: LayerNorm -> masked self-attention -> residual, then LayerNorm -> MLP -> residual."""

    def __init__(self, config: EnUzConfig):
        super().__init__()
        self.msa = Masked_Self_Attention(config=config)
        self.mlp = MLP(config=config)
        self.ln1 = nn.LayerNorm(config.hidden_size)
        self.ln2 = nn.LayerNorm(config.hidden_size)

    def forward(self, x):
        x = x + self.msa(self.ln1(x))
        x = x + self.mlp(self.ln2(x))
        return x
class Decoder_Only_Model(nn.Module):
    """Decoder-only transformer: token embedding plus sinusoidal positions, a stack of decoder blocks, and a tied output projection."""

    def __init__(self, config: EnUzConfig):
        super().__init__()
        self.d_model = config.hidden_size
        self.num_layers = config.num_layers
        self.input_embedding = nn.Embedding(config.vocab_size, config.hidden_size)
        self.pos_embedding = Positional_Encoding(config=config)
        self.dropout_layer = nn.Dropout(config.dropout)
        self.decoder_blocks = nn.ModuleList([
            Decoder_Block(config=config) for _ in range(config.num_layers)
        ])
        self.final_ln = nn.LayerNorm(config.hidden_size)
        self.output_layer = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        self.output_layer.weight = self.input_embedding.weight  # tie input embedding and output projection weights
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            std = 0.02
            if hasattr(module, 'is_residual') and module.is_residual:
                std *= (2 * self.num_layers) ** -0.5  # scale down residual projections (GPT-2-style init)
            nn.init.normal_(module.weight, mean=0.0, std=std)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            std = 0.02
            nn.init.normal_(module.weight, mean=0.0, std=std)

    def forward(self, x, labels=None):
        x = self.input_embedding(x) * (self.d_model ** 0.5)  # (batch_size, seq_length, d_model)
        x = self.pos_embedding(x)  # (batch_size, seq_length, d_model)
        x = self.dropout_layer(x)
        for block in self.decoder_blocks:
            x = block(x)
        x = self.final_ln(x)  # (batch_size, seq_length, d_model)
        x = self.output_layer(x)  # (batch_size, seq_length, vocab_size) logits
        return x
class EnUzModel(PreTrainedModel):
    """Hugging Face PreTrainedModel wrapper around Decoder_Only_Model so the weights can be saved and loaded with from_pretrained."""

    config_class = EnUzConfig
    _tied_weights_keys = ["model.output_layer.weight"]

    def __init__(self, config: EnUzConfig):
        super().__init__(config)
        self.model = Decoder_Only_Model(config=config)

    def forward(self, input_ids, labels=None, loss_fn: nn.CrossEntropyLoss = None):
        logits = self.model(input_ids)
        if labels is not None and loss_fn is not None:
            # Flatten (batch, seq) so the loss is computed per token; loss is only returned
            # when both labels and an explicit loss_fn are provided.
            B, T, C = logits.size()
            loss = loss_fn(logits.view(B * T, C), labels.view(B * T))
            return {"loss": loss, "logits": logits}
        return {"logits": logits}

    def tie_weights(self):
        self.model.output_layer.weight = self.model.input_embedding.weight

    def get_input_embeddings(self):
        return self.model.input_embedding

    def set_input_embeddings(self, new_embeddings):
        self.model.input_embedding = new_embeddings
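

# Minimal usage sketch (assumptions: EnUzConfig accepts the fields referenced above as keyword
# arguments, and the values shown are placeholders; neither the config values nor a tokenizer
# are defined in this file):
#
#     config = EnUzConfig(vocab_size=32000, context_length=128, hidden_size=256,
#                         num_attention_heads=8, num_layers=4, ffn_scaler=4, dropout=0.1)
#     model = EnUzModel(config)
#     input_ids = torch.randint(0, config.vocab_size, (2, 16))   # (batch_size, seq_length)
#     labels = torch.randint(0, config.vocab_size, (2, 16))
#     out = model(input_ids, labels=labels, loss_fn=nn.CrossEntropyLoss())
#     out["loss"].backward()                                     # out["logits"]: (2, 16, vocab_size)
#
# Note that the loss function must be passed explicitly; calling model(input_ids, labels=labels)
# alone returns only the logits.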