import torch
import torch.nn as nn
from transformers import PreTrainedModel

from configuration_en_uz import EnUzConfig


class Positional_Encoding(nn.Module):
    def __init__(self, config: EnUzConfig):
        super().__init__()
        # Fixed sinusoidal table from "Attention Is All You Need":
        # PE(pos, 2i) = sin(pos / 10000^(2i/d)), PE(pos, 2i+1) = cos(pos / 10000^(2i/d)).
        # Both halves share the even exponent 2i/d, so each sin/cos pair has
        # the same frequency (assumes an even hidden_size).
        pe = torch.zeros(config.context_length, config.hidden_size)
        position = torch.arange(0, config.context_length).unsqueeze(1)
        dimension = torch.arange(0, config.hidden_size)
        pe[:, 0::2] = torch.sin(position / (10000 ** (dimension[0::2] / config.hidden_size)))
        pe[:, 1::2] = torch.cos(position / (10000 ** (dimension[0::2] / config.hidden_size)))
        self.register_buffer('pos_enc', pe)

    def forward(self, x):
        seq_length = x.size(1)
        pos_enc_matrix = self.pos_enc[:seq_length]
        return x + pos_enc_matrix
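    # Shape note for forward: with x of shape (batch, seq_len, hidden_size),
    # pos_enc[:seq_len] is (seq_len, hidden_size) and broadcasts over the
    # batch dimension, so no expand/repeat is needed. A minimal sketch,
    # assuming a config with hidden_size=8 and context_length >= 5:
    #
    #     pe = Positional_Encoding(config)
    #     out = pe(torch.zeros(2, 5, 8))    # out.shape == (2, 5, 8)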


class Masked_Self_Attention(nn.Module):
    def __init__(self, config: EnUzConfig):
        super().__init__()
        self.d_model = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.d_head = config.hidden_size // config.num_attention_heads
        assert self.d_head * config.num_attention_heads == config.hidden_size, \
            "hidden_size must be divisible by num_attention_heads"
        self.w_q = nn.Linear(config.hidden_size, config.hidden_size)
        self.w_k = nn.Linear(config.hidden_size, config.hidden_size)
        self.w_v = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout_layer = nn.Dropout(config.dropout)
        self.w_out = nn.Linear(config.hidden_size, config.hidden_size)
        # Flag read by _init_weights: residual-path projections get a smaller
        # init std (GPT-2-style 1/sqrt(2 * num_layers) scaling).
        self.w_out.is_residual = True

    def forward(self, x):
        B, T, C = x.size()

        # Project, then split heads: (B, T, C) -> (B, num_heads, T, d_head).
        q = self.w_q(x).view(B, T, self.num_heads, self.d_head).transpose(1, 2)
        k = self.w_k(x).view(B, T, self.num_heads, self.d_head).transpose(1, 2)
        v = self.w_v(x).view(B, T, self.num_heads, self.d_head).transpose(1, 2)

        # Additive causal mask: -inf above the diagonal so position t attends
        # only to positions <= t. Built in q's dtype to avoid upcasting the
        # scores under mixed precision.
        scale_factor = 1 / (self.d_head ** 0.5)
        attn_bias = torch.zeros(T, T, dtype=q.dtype, device=x.device)
        causal_mask = torch.ones(T, T, dtype=torch.bool, device=x.device).triu(diagonal=1)
        attn_bias.masked_fill_(causal_mask, float('-inf'))

        attn_weights = torch.matmul(q, k.transpose(-2, -1)) * scale_factor
        attn_weights += attn_bias
        attn_weights = torch.softmax(attn_weights, dim=-1)
        output = torch.matmul(attn_weights, v)
        # Merge heads back: (B, num_heads, T, d_head) -> (B, T, C).
        output = output.transpose(1, 2).contiguous().view(B, T, self.d_model)
        output = self.w_out(output)
        output = self.dropout_layer(output)
        return output
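    # Note: on torch >= 2.0 the explicit mask + softmax above can be swapped
    # for the fused kernel, which applies the same causal masking and
    # 1/sqrt(d_head) scaling internally:
    #
    #     output = torch.nn.functional.scaled_dot_product_attention(
    #         q, k, v, is_causal=True)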


class MLP(nn.Module):
    def __init__(self, config: EnUzConfig):
        super().__init__()
        # Position-wise feed-forward: expand by ffn_scaler, apply GELU, project back.
        self.linear1 = nn.Linear(config.hidden_size, config.hidden_size * config.ffn_scaler)
        self.linear2 = nn.Linear(config.hidden_size * config.ffn_scaler, config.hidden_size)
        self.linear2.is_residual = True
        self.gelu = nn.GELU(approximate='tanh')
        self.dropout_layer = nn.Dropout(config.dropout)

    def forward(self, x):
        x = self.linear1(x)
        x = self.gelu(x)
        x = self.linear2(x)
        x = self.dropout_layer(x)
        return x


class Decoder_Block(nn.Module):
    def __init__(self, config: EnUzConfig):
        super().__init__()
        self.msa = Masked_Self_Attention(config=config)
        self.mlp = MLP(config=config)
        self.ln1 = nn.LayerNorm(config.hidden_size)
        self.ln2 = nn.LayerNorm(config.hidden_size)

    def forward(self, x):
        # Pre-LayerNorm residual blocks: normalize, transform, add back.
        x = x + self.msa(self.ln1(x))
        x = x + self.mlp(self.ln2(x))
        return x


class Decoder_Only_Model(nn.Module):
    def __init__(self, config: EnUzConfig):
        super().__init__()
        self.d_model = config.hidden_size
        self.num_layers = config.num_layers
        self.input_embedding = nn.Embedding(config.vocab_size, config.hidden_size)
        self.pos_embedding = Positional_Encoding(config=config)
        self.dropout_layer = nn.Dropout(config.dropout)
        self.decoder_blocks = nn.ModuleList([
            Decoder_Block(config=config) for _ in range(config.num_layers)
        ])
        self.final_ln = nn.LayerNorm(config.hidden_size)
        # Weight tying: the output projection shares the input embedding matrix.
        self.output_layer = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        self.output_layer.weight = self.input_embedding.weight
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            std = 0.02
            # Shrink residual-path projections by 1/sqrt(2 * num_layers) so the
            # residual stream's variance stays roughly constant with depth
            # (two residual additions per decoder block).
            if hasattr(module, 'is_residual') and module.is_residual:
                std *= (2 * self.num_layers) ** -0.5
            nn.init.normal_(module.weight, mean=0.0, std=std)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, x):
        # Scale embeddings by sqrt(d_model) before adding positional encodings.
        x = self.input_embedding(x) * (self.d_model ** 0.5)
        x = self.pos_embedding(x)
        x = self.dropout_layer(x)
        for block in self.decoder_blocks:
            x = block(x)
        x = self.final_ln(x)
        x = self.output_layer(x)
        return x
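

# A minimal greedy-decoding sketch (hypothetical helper, not part of the
# module's original API): the model feeds its own argmax predictions back in,
# one token per step, truncating the context to the positional table's length.
# Call model.eval() first so dropout is disabled.
@torch.no_grad()
def greedy_generate(model: Decoder_Only_Model, input_ids: torch.Tensor,
                    max_new_tokens: int, context_length: int) -> torch.Tensor:
    for _ in range(max_new_tokens):
        logits = model(input_ids[:, -context_length:])              # (B, T, vocab)
        next_token = logits[:, -1, :].argmax(dim=-1, keepdim=True)  # (B, 1)
        input_ids = torch.cat([input_ids, next_token], dim=1)
    return input_ids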


class EnUzModel(PreTrainedModel):
    config_class = EnUzConfig
    _tied_weights_keys = ["model.output_layer.weight"]

    def __init__(self, config: EnUzConfig):
        super().__init__(config)
        self.model = Decoder_Only_Model(config=config)

    def forward(self, input_ids, labels=None, loss_fn: nn.CrossEntropyLoss = None):
        logits = self.model(input_ids)
        if labels is not None:
            # Fall back to cross-entropy so Trainer-style callers that pass
            # only `labels` still get a loss back.
            if loss_fn is None:
                loss_fn = nn.CrossEntropyLoss()
            B, T, C = logits.size()
            loss = loss_fn(logits.view(B * T, C), labels.view(B * T))
            return {"loss": loss, "logits": logits}
        return {"logits": logits}

    def tie_weights(self):
        self.model.output_layer.weight = self.model.input_embedding.weight

    def get_input_embeddings(self):
        return self.model.input_embedding

    def set_input_embeddings(self, new_embeddings):
        self.model.input_embedding = new_embeddings
        # Re-tie so the output projection tracks the new embedding matrix.
        self.tie_weights()
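

if __name__ == "__main__":
    # Quick smoke test (a sketch: assumes EnUzConfig in configuration_en_uz.py
    # accepts these keyword arguments; adjust names and values to the real config).
    config = EnUzConfig(
        vocab_size=128,
        hidden_size=64,
        num_attention_heads=4,
        num_layers=2,
        ffn_scaler=4,
        context_length=32,
        dropout=0.1,
    )
    model = EnUzModel(config)
    input_ids = torch.randint(0, config.vocab_size, (2, 16))
    out = model(input_ids, labels=input_ids)
    print(out["logits"].shape)  # expected: torch.Size([2, 16, 128])
    print(out["loss"].item())   # finite scalar cross-entropy at init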