| import torch
|
| import torch.nn as nn
|
| import torch.nn.functional as F
|
| import math
|
| import torch.optim as optim
|
| from transformers import AutoModelForCausalLM
|
| from transformers.modeling_utils import PreTrainedModel
|
| from transformers.configuration_utils import PretrainedConfig
|
|
|
class DecoderLayer(nn.Module):
    """One pre-norm decoder block: self-attention, then a feed-forward network.

    Each sub-layer receives a layer-normalised copy of the running tensor, and
    its dropout-regularised output is added back onto the un-normalised tensor.

    Args:
        d_model: Feature width handed to both sub-layers and both LayerNorms.
        n_heads: Forwarded unchanged to ``MultiHeadAttention``.
        dim_feedforward: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability shared by the sub-layers and the residual
            branches.
        group_size: Forwarded unchanged to ``MultiHeadAttention``.
    """

    def __init__(self, d_model, n_heads, dim_feedforward, dropout=0.1, group_size=16):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads, dropout, group_size)
        self.feed_forward = PositionwiseFeedForward(d_model, dim_feedforward, dropout)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply both residual sub-layers to ``x`` and return the result."""
        # Attention branch: the same normalised tensor is query, key and value.
        attn_input = self.layer_norm1(x)
        attn_output = self.self_attn(attn_input, attn_input, attn_input)
        x = x + self.dropout(attn_output)

        # Feed-forward branch, again normalising before the sub-layer.
        ff_output = self.feed_forward(self.layer_norm2(x))
        return x + self.dropout(ff_output)
|
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention evaluated inside fixed-size token groups.

    Query, key and value are each passed through their own linear projection.
    The sequence axis (dim 1) is then cut into consecutive groups of
    ``group_size`` positions and softmax attention is computed within each
    group only. The per-group results are concatenated back along dim 1.

    NOTE(review): ``n_heads`` is stored but never used - the projections are
    not split into heads and the scores are scaled by ``sqrt(d_model)``. No
    attention mask is applied either. Confirm that both are intended.

    Args:
        d_model: Input and output width of the three linear projections.
        n_heads: Stored on the module; currently has no effect on the result.
        dropout: Dropout probability applied to the attention weights.
        group_size: Number of consecutive positions that attend to each other.
            A sequence shorter than ``group_size`` forms a single group.
    """

    def __init__(self, d_model, n_heads, dropout=0.1, group_size=16):
        super().__init__()
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.n_heads = n_heads
        self.d_model = d_model
        self.group_size = group_size

    def forward(self, query, key, value):
        """Return the group-local attention output.

        Args:
            query: Tensor whose dim 1 is the sequence axis and whose last dim
                is ``d_model``.
            key: Tensor laid out like ``query``.
            value: Tensor laid out like ``query``.

        Returns:
            The attended values, concatenated over groups along dim 1.
        """
        query = self.query_linear(query)
        key = self.key_linear(key)
        value = self.value_linear(value)

        # ``split`` takes the SIZE of each piece. ``chunk`` takes the NUMBER of
        # pieces, which would turn ``group_size`` into a group count and leave
        # short sequences with one-token groups.
        query_groups = query.split(self.group_size, dim=1)
        key_groups = key.split(self.group_size, dim=1)
        value_groups = value.split(self.group_size, dim=1)

        scale = math.sqrt(self.d_model)
        group_outputs = []
        for q, k, v in zip(query_groups, key_groups, value_groups):
            weights = F.softmax(torch.matmul(q, k.transpose(-1, -2)) / scale, dim=-1)
            weights = self.dropout(weights)
            group_outputs.append(torch.matmul(weights, v))

        return torch.cat(group_outputs, dim=1)
|
|
|
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied along the last dimension.

    The input is expanded to ``dim_feedforward`` features, passed through ReLU
    and dropout, then projected back to ``d_model`` features.

    Args:
        d_model: Width of the input and of the output.
        dim_feedforward: Width of the hidden layer.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, dim_feedforward, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

    def forward(self, x):
        """Return ``linear2(dropout(relu(linear1(x))))``."""
        hidden = self.dropout(F.relu(self.linear1(x)))
        return self.linear2(hidden)
|
|
|
|
|
class Decoder(nn.Module):
    """A stack of ``DecoderLayer`` blocks followed by one final LayerNorm.

    Args:
        num_layers: How many ``DecoderLayer`` instances to stack.
        d_model: Feature width passed to every layer and to the final norm.
        n_heads: Passed to every layer.
        dim_feedforward: Passed to every layer.
        dropout: Passed to every layer.
        group_size: Passed to every layer.
    """

    def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, group_size=16):
        super().__init__()
        self.layers = nn.ModuleList(
            DecoderLayer(d_model, n_heads, dim_feedforward, dropout, group_size)
            for _ in range(num_layers)
        )
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x):
        """Run ``x`` through every layer in order, then normalise the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        return self.layer_norm(hidden)
|
|
|
class Embeddings(nn.Module):
    """Token embedding lookup whose output is scaled by ``sqrt(d_model)``.

    Args:
        d_model: Width of each embedding vector.
        vocab_size: Number of rows in the embedding table.
    """

    def __init__(self, d_model, vocab_size):
        super().__init__()
        self.lut = nn.Embedding(vocab_size, d_model)
        self.d_model = d_model

    def forward(self, x):
        """Look up the indices in ``x`` and multiply the vectors by ``sqrt(d_model)``."""
        scale = math.sqrt(self.d_model)
        return scale * self.lut(x)
|
|
|
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch-first tensor, then apply dropout.

    Even feature columns hold ``sin(pos * freq)`` and odd columns hold
    ``cos(pos * freq)``. The table is registered as the buffer ``pe`` with
    shape ``(max_len, 1, d_model)``.

    The encoding is indexed by dim 1 of the input, matching the attention code
    in this file, which also treats dim 1 as the sequence axis. Indexing by
    dim 0 would give every token of a batch row the same position.

    Args:
        d_model: Feature width of the encodings; odd values are supported.
        dropout: Dropout probability applied to the sum.
        max_len: Number of positions precomputed in the table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer cosine column than sine
        # columns, so drop the surplus frequency instead of failing to assign.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Keep the (max_len, 1, d_model) layout so the buffer shape stored in
        # existing state dicts is unchanged.
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Return ``dropout(x + encoding)``.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``.

        Raises:
            ValueError: If ``seq_len`` exceeds the precomputed ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}"
            )
        # (seq_len, 1, d_model) -> (1, seq_len, d_model) so it broadcasts over the batch.
        x = x + self.pe[:seq_len].transpose(0, 1)
        return self.dropout(x)
|
class RMSNorm(nn.Module):
    """Divide the input by its root-mean-square over the last dimension.

    Args:
        dim: Length of the learnable ``weight`` vector (initialised to ones).
        epsilon: Constant added to the RMS before dividing.
        scale: When true the normalised input is multiplied by ``weight``;
            when false ``weight`` still exists but is not used.
    """

    def __init__(self, dim, epsilon=1e-6, scale=True):
        super().__init__()
        self.epsilon = epsilon
        self.scale = scale
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        """Return ``x`` normalised by ``rms(x) + epsilon``, optionally scaled by ``weight``."""
        denominator = x.square().mean(dim=-1, keepdim=True).sqrt() + self.epsilon
        if not self.scale:
            return x / denominator
        gain = self.weight / denominator
        return gain * x
|
class TransformerDecoder(nn.Module):
    """Token ids in, hidden states out: embeddings, positions, decoder stack, RMSNorm.

    Args:
        num_layers: Number of layers in the decoder stack.
        d_model: Feature width used by every sub-module.
        n_heads: Passed to the decoder stack.
        dim_feedforward: Passed to the decoder stack.
        dropout: Passed to the positional encoding and the decoder stack.
        vocab_size: Size of the embedding table.
        group_size: Attention group size, passed to the decoder stack and also
            kept on the module as ``self.group_size``.
    """

    def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, vocab_size=10000, group_size=16):
        super().__init__()
        self.embeddings = Embeddings(d_model, vocab_size)
        self.positional_encoding = PositionalEncoding(d_model, dropout)
        # group_size must be handed on explicitly; otherwise the stack silently
        # falls back to Decoder's own default and the caller's value is ignored.
        self.decoder = Decoder(
            num_layers,
            d_model,
            n_heads,
            dim_feedforward,
            dropout,
            group_size=group_size,
        )
        self.rms_norm = RMSNorm(d_model)
        self.group_size = group_size

    def forward(self, x):
        """Embed the token ids ``x`` and return the normalised decoder output."""
        x = self.embeddings(x)
        x = self.positional_encoding(x)
        x = self.decoder(x)
        x = self.rms_norm(x)
        return x
|
class TransformerDecoderLM(nn.Module):
    """``TransformerDecoder`` backbone topped with a linear vocabulary projection.

    Args:
        num_layers: Passed to the backbone.
        d_model: Passed to the backbone; also the input width of ``lm_head``.
        n_heads: Passed to the backbone.
        dim_feedforward: Passed to the backbone.
        dropout: Passed to the backbone.
        vocab_size: Passed to the backbone; also the output width of ``lm_head``.
        group_size: Passed to the backbone.
    """

    def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, vocab_size=10000, group_size=16):
        super().__init__()
        self.transformer = TransformerDecoder(
            num_layers=num_layers,
            d_model=d_model,
            n_heads=n_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            vocab_size=vocab_size,
            group_size=group_size,
        )
        self.lm_head = nn.Linear(d_model, vocab_size)

    def forward(self, input_ids):
        """Return the ``lm_head`` projection of the backbone output for ``input_ids``."""
        return self.lm_head(self.transformer(input_ids))
|
class CustomConfig(PretrainedConfig):
    """Hyperparameter container for the custom transformer language model.

    Every named argument is stored as an attribute of the same name; any
    remaining keyword arguments are handed to ``PretrainedConfig``.

    Args:
        num_layers: Number of decoder layers.
        d_model: Model feature width.
        n_heads: Head-count setting passed to the attention modules.
        dim_feedforward: Hidden width of the feed-forward sub-layers.
        dropout: Dropout probability.
        vocab_size: Vocabulary size.
        group_size: Attention group setting.
        **kwargs: Forwarded to ``PretrainedConfig.__init__``.
    """

    model_type = "custom_transformer"

    def __init__(
        self,
        num_layers=6,
        d_model=512,
        n_heads=8,
        dim_feedforward=2048,
        dropout=0.1,
        vocab_size=10000,
        group_size=16,
        **kwargs,
    ):
        # Attributes are set before the base initialiser runs, in the same
        # order as the parameters appear in the signature.
        self.num_layers, self.d_model, self.n_heads = num_layers, d_model, n_heads
        self.dim_feedforward, self.dropout = dim_feedforward, dropout
        self.vocab_size, self.group_size = vocab_size, group_size
        super().__init__(**kwargs)
|
|
|
class CustomTransformerForCausalLM(PreTrainedModel):
    """``PreTrainedModel`` wrapper that builds a ``TransformerDecoderLM`` from a ``CustomConfig``."""

    config_class = CustomConfig

    def __init__(self, config):
        """Build the wrapped language model from the fields of ``config``."""
        super().__init__(config)
        model_kwargs = dict(
            num_layers=config.num_layers,
            d_model=config.d_model,
            n_heads=config.n_heads,
            dim_feedforward=config.dim_feedforward,
            dropout=config.dropout,
            vocab_size=config.vocab_size,
            group_size=config.group_size,
        )
        self.transformer = TransformerDecoderLM(**model_kwargs)

    def forward(self, input_ids, labels=None):
        """Compute logits and, when ``labels`` is given, the cross-entropy loss.

        Args:
            input_ids: Token ids passed to the wrapped model.
            labels: Optional target ids; flattened and compared with the
                flattened logits.

        Returns:
            A dict with keys ``"loss"`` (``None`` without labels) and
            ``"logits"``.
        """
        logits = self.transformer(input_ids)

        if labels is None:
            loss = None
        else:
            # NOTE(review): logits and labels are compared position by position;
            # no shift is applied here - confirm callers pass aligned targets.
            flat_logits = logits.view(-1, logits.size(-1))
            flat_labels = labels.view(-1)
            loss = nn.functional.cross_entropy(flat_logits, flat_labels)

        return {"loss": loss, "logits": logits}
|
|
|
|
|