|
|
|
|
|
import math |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import torch.nn.functional as F |
|
|
from transformers import AutoTokenizer, Trainer, TrainingArguments, PreTrainedModel, PretrainedConfig |
|
|
from datasets import load_dataset, IterableDataset |
|
|
|
|
|
|
|
|
class ModelConfig(PretrainedConfig): |
|
|
model_type = "custom_henyo_culturax" |
|
|
def __init__( |
|
|
self, |
|
|
vocab_size=50257, |
|
|
dim=768, |
|
|
n_layers=12, |
|
|
n_heads=12, |
|
|
n_kv_heads=4, |
|
|
multiple_of=256, |
|
|
max_seq_len=1024, |
|
|
dropout=0.05, |
|
|
**kwargs |
|
|
): |
|
|
super().__init__(**kwargs) |
|
|
self.vocab_size = vocab_size |
|
|
self.dim = dim |
|
|
self.n_layers = n_layers |
|
|
self.n_heads = n_heads |
|
|
self.n_kv_heads = n_kv_heads |
|
|
self.multiple_of = multiple_of |
|
|
self.max_seq_len = max_seq_len |
|
|
self.dropout = dropout |
|
|
self.head_dim = dim // n_heads |
|
|
|
|
|
|
|
|
class RMSNorm(nn.Module): |
|
|
def __init__(self, dim, eps=1e-6): |
|
|
super().__init__() |
|
|
self.eps = eps |
|
|
self.weight = nn.Parameter(torch.ones(dim)) |
|
|
def _norm(self, x): |
|
|
return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps) |
|
|
def forward(self, x): |
|
|
return self._norm(x.float()).type_as(x) * self.weight |
|
|
|
|
|
def precompute_freqs_cis(dim: int, end: int, theta: float = 10000.0): |
|
|
freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim)) |
|
|
t = torch.arange(end, device=freqs.device) |
|
|
freqs = torch.outer(t, freqs).float() |
|
|
return torch.polar(torch.ones_like(freqs), freqs) |
|
|
|
|
|
def apply_rotary_emb(xq, xk, freqs_cis): |
|
|
xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2)) |
|
|
xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2)) |
|
|
freqs_cis = freqs_cis.unsqueeze(0).unsqueeze(0) |
|
|
xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(3) |
|
|
xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(3) |
|
|
return xq_out.type_as(xq), xk_out.type_as(xk) |
|
|
|
|
|
class GroupedQueryAttention(nn.Module): |
|
|
def __init__(self, args: ModelConfig): |
|
|
super().__init__() |
|
|
self.n_heads = args.n_heads |
|
|
self.n_kv_heads = args.n_kv_heads |
|
|
self.head_dim = args.head_dim |
|
|
self.n_rep = self.n_heads // args.n_kv_heads |
|
|
self.wq = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False) |
|
|
self.wk = nn.Linear(args.dim, args.n_kv_heads * self.head_dim, bias=False) |
|
|
self.wv = nn.Linear(args.dim, args.n_kv_heads * self.head_dim, bias=False) |
|
|
self.wo = nn.Linear(args.n_heads * self.head_dim, args.dim, bias=False) |
|
|
self.dropout = nn.Dropout(args.dropout) |
|
|
|
|
|
def forward(self, x, freqs_cis, mask=None): |
|
|
b, s, _ = x.shape |
|
|
xq, xk, xv = self.wq(x), self.wk(x), self.wv(x) |
|
|
xq = xq.view(b, s, self.n_heads, self.head_dim).transpose(1, 2) |
|
|
xk = xk.view(b, s, self.n_kv_heads, self.head_dim).transpose(1, 2) |
|
|
xv = xv.view(b, s, self.n_kv_heads, self.head_dim).transpose(1, 2) |
|
|
xq, xk = apply_rotary_emb(xq, xk, freqs_cis) |
|
|
if self.n_rep > 1: |
|
|
xk = xk.repeat_interleave(self.n_rep, dim=1) |
|
|
xv = xv.repeat_interleave(self.n_rep, dim=1) |
|
|
output = F.scaled_dot_product_attention(xq, xk, xv, attn_mask=mask, dropout_p=self.dropout.p if self.training else 0.0, is_causal=True) |
|
|
return self.wo(output.transpose(1, 2).contiguous().view(b, s, -1)) |
|
|
|
|
|
class SwiGLU(nn.Module): |
|
|
def __init__(self, args: ModelConfig): |
|
|
super().__init__() |
|
|
hidden_dim = 4 * args.dim |
|
|
hidden_dim = int(2 * hidden_dim / 3) |
|
|
hidden_dim = args.multiple_of * ((hidden_dim + args.multiple_of - 1) // args.multiple_of) |
|
|
self.w1 = nn.Linear(args.dim, hidden_dim, bias=False) |
|
|
self.w2 = nn.Linear(hidden_dim, args.dim, bias=False) |
|
|
self.w3 = nn.Linear(args.dim, hidden_dim, bias=False) |
|
|
def forward(self, x): |
|
|
return self.w2(F.silu(self.w1(x)) * self.w3(x)) |
|
|
|
|
|
class TransformerBlock(nn.Module): |
|
|
def __init__(self, args: ModelConfig): |
|
|
super().__init__() |
|
|
self.attention_norm = RMSNorm(args.dim) |
|
|
self.attention = GroupedQueryAttention(args) |
|
|
self.ffn_norm = RMSNorm(args.dim) |
|
|
self.feed_forward = SwiGLU(args) |
|
|
def forward(self, x, freqs_cis, mask=None): |
|
|
x = x + self.attention(self.attention_norm(x), freqs_cis, mask) |
|
|
x = x + self.feed_forward(self.ffn_norm(x)) |
|
|
return x |
|
|
|
|
|
class HenyoModel(PreTrainedModel): |
|
|
config_class = ModelConfig |
|
|
def __init__(self, config): |
|
|
super().__init__(config) |
|
|
self.config = config |
|
|
self.tok_embeddings = nn.Embedding(config.vocab_size, config.dim) |
|
|
self.layers = nn.ModuleList([TransformerBlock(config) for _ in range(config.n_layers)]) |
|
|
self.norm = RMSNorm(config.dim) |
|
|
self.output = nn.Linear(config.dim, config.vocab_size, bias=False) |
|
|
self.output.weight = self.tok_embeddings.weight |
|
|
self.freqs_cis = precompute_freqs_cis(config.dim // config.n_heads, config.max_seq_len * 2) |
|
|
|
|
|
def forward(self, input_ids, labels=None, **kwargs): |
|
|
b, s = input_ids.shape |
|
|
h = self.tok_embeddings(input_ids) |
|
|
freqs_cis = self.freqs_cis[:s].to(h.device) |
|
|
mask = None |
|
|
if not hasattr(F, 'scaled_dot_product_attention'): |
|
|
mask = torch.triu(torch.full((s, s), float("-inf"), device=h.device), diagonal=1) |
|
|
for layer in self.layers: |
|
|
h = layer(h, freqs_cis, mask) |
|
|
h = self.norm(h) |
|
|
logits = self.output(h) |
|
|
loss = None |
|
|
if labels is not None: |
|
|
shift_logits = logits[..., :-1, :].contiguous().view(-1, self.config.vocab_size) |
|
|
shift_labels = labels[..., 1:].contiguous().view(-1) |
|
|
loss = F.cross_entropy(shift_logits, shift_labels) |
|
|
return {"loss": loss, "logits": logits} if loss is not None else {"logits": logits} |
|
|
|