|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| import torch
|
| import torch.nn as nn
|
| import torch.nn.functional as F
|
| import math
|
|
|
|
|
| VOCAB_SIZE = 50257
|
| MODEL_DIM = 2048
|
| NUM_HEADS = 32
|
| NUM_LAYERS = 16
|
| MAX_SEQ_LEN = 2048
|
| FFN_HIDDEN_DIM = MODEL_DIM * 4
|
| HEAD_DIM = MODEL_DIM // NUM_HEADS
|
| EPSILON = 1e-6
|
| WINDOW_SIZE = 512
|
|
|
| class RMSNorm(nn.Module):
|
| def __init__(self, dim, eps=EPSILON):
|
| super().__init__()
|
| self.eps = eps
|
| self.weight = nn.Parameter(torch.ones(dim))
|
| def forward(self, x):
|
| return (x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)) * self.weight
|
|
|
| def precompute_freqs_cis(dim, seq_len, theta=10000.0):
|
| freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
|
| t = torch.arange(seq_len)
|
| freqs = torch.outer(t, freqs).float()
|
| return torch.polar(torch.ones_like(freqs), freqs)
|
|
|
| def apply_rotary_emb(xq, xk, freqs_cis):
|
|
|
| xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
|
| xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))
|
| freqs_cis = freqs_cis.view(1, xq_.size(1), 1, xq_.size(3))
|
| xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(3)
|
| xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(3)
|
| return xq_out.type_as(xq), xk_out.type_as(xk)
|
|
|
| class MultiHeadAttention(nn.Module):
|
| def __init__(self):
|
| super().__init__()
|
| self.wq = nn.Linear(MODEL_DIM, MODEL_DIM, bias=False)
|
| self.wk = nn.Linear(MODEL_DIM, MODEL_DIM, bias=False)
|
| self.wv = nn.Linear(MODEL_DIM, MODEL_DIM, bias=False)
|
| self.wo = nn.Linear(MODEL_DIM, MODEL_DIM, bias=False)
|
|
|
| def forward(self, x, freqs_cis, past_kv=None):
|
| b, t, _ = x.shape
|
| q, k, v = self.wq(x), self.wk(x), self.wv(x)
|
| q = q.view(b, t, NUM_HEADS, HEAD_DIM)
|
| k = k.view(b, t, NUM_HEADS, HEAD_DIM)
|
| v = v.view(b, t, NUM_HEADS, HEAD_DIM)
|
|
|
| q, k = apply_rotary_emb(q, k, freqs_cis[:t])
|
|
|
| if past_kv is not None:
|
| pk, pv = past_kv
|
| k = torch.cat([pk, k], dim=1)
|
| v = torch.cat([pv, v], dim=1)
|
| if k.size(1) > WINDOW_SIZE:
|
| k, v = k[:, -WINDOW_SIZE:], v[:, -WINDOW_SIZE:]
|
|
|
| current_kv = (k, v)
|
| q, k, v = q.transpose(1, 2), k.transpose(1, 2), v.transpose(1, 2)
|
| out = F.scaled_dot_product_attention(q, k, v, is_causal=True)
|
| return self.wo(out.transpose(1, 2).contiguous().view(b, t, MODEL_DIM)), current_kv
|
|
|
| class SwiGLU(nn.Module):
|
| def __init__(self):
|
| super().__init__()
|
| self.w1 = nn.Linear(MODEL_DIM, FFN_HIDDEN_DIM, bias=False)
|
| self.w2 = nn.Linear(FFN_HIDDEN_DIM, MODEL_DIM, bias=False)
|
| self.w3 = nn.Linear(MODEL_DIM, FFN_HIDDEN_DIM, bias=False)
|
| def forward(self, x):
|
| return self.w2(F.silu(self.w1(x)) * self.w3(x))
|
|
|
| class JiRackPyTorch(nn.Module):
|
| def __init__(self):
|
| super().__init__()
|
| self.token_emb = nn.Embedding(VOCAB_SIZE, MODEL_DIM)
|
| self.blocks = nn.ModuleList([nn.ModuleDict({
|
| 'norm1': RMSNorm(MODEL_DIM),
|
| 'attn': MultiHeadAttention(),
|
| 'norm2': RMSNorm(MODEL_DIM),
|
| 'ffn': SwiGLU()
|
| }) for _ in range(NUM_LAYERS)])
|
| self.norm_f = RMSNorm(MODEL_DIM)
|
| self.head = nn.Linear(MODEL_DIM, VOCAB_SIZE, bias=False)
|
| self.head.weight = self.token_emb.weight
|
|
|
| self.register_buffer("freqs_cis", precompute_freqs_cis(HEAD_DIM, MAX_SEQ_LEN * 2))
|
|
|
| signature = "Author: Konstantin Vladimirovich Grabko (CMS Manhattan) 2025"
|
| self.register_buffer("proof_of_authorship", torch.tensor([ord(c) for c in signature], dtype=torch.uint8))
|
|
|
| def get_author_info(self):
|
| return "".join([chr(c) for c in self.proof_of_authorship.tolist()])
|
|
|
| def forward(self, idx, targets=None, past_kv=None):
|
| x = self.token_emb(idx)
|
| new_kvs = []
|
| for i, block in enumerate(self.blocks):
|
| h, kv = block['attn'](block['norm1'](x), self.freqs_cis, past_kv[i] if past_kv else None)
|
| x = x + h
|
| x = x + block['ffn'](block['norm2'](x))
|
| new_kvs.append(kv)
|
| logits = self.head(self.norm_f(x))
|
| if targets is not None:
|
| loss = F.cross_entropy(logits.view(-1, VOCAB_SIZE), targets.view(-1))
|
| return logits, loss, new_kvs
|
| return logits, new_kvs |