import torch
import torch.nn as nn
import torch.nn.functional as F

VOCAB_SIZE = 50257      # GPT-2 BPE vocabulary size
MODEL_DIM = 4096        # width of the residual stream
NUM_HEADS = 32          # query heads
NUM_KV_HEADS = 8        # key/value heads for grouped-query attention
NUM_LAYERS = 32
MAX_SEQ_LEN = 2048
FFN_HIDDEN_DIM = 11008  # SwiGLU hidden width (Llama-2-7B sizing)
HEAD_DIM = MODEL_DIM // NUM_HEADS  # 128
EPSILON = 1e-5
WINDOW_SIZE = 1024      # sliding-window length for the KV cache


class RMSNorm(nn.Module):
    """Root-mean-square LayerNorm: rescales by the RMS of the features, with no
    mean-centering and no bias."""

    def __init__(self, dim, eps=EPSILON):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        return (x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)) * self.weight


def precompute_freqs_cis(dim, seq_len, theta=10000.0):
    """Precompute the complex RoPE phasors for every (position, frequency) pair."""
    freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
    t = torch.arange(seq_len)
    freqs = torch.outer(t, freqs).float()
    return torch.polar(torch.ones_like(freqs), freqs)  # (seq_len, dim // 2), complex64
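
# Worked example (follows from the code above): called below as
# precompute_freqs_cis(HEAD_DIM, MAX_SEQ_LEN * 2), i.e. dim=128 and seq_len=4096,
# this yields a complex64 tensor of shape (4096, 64) whose entry [t, j] equals
# exp(i * t * theta ** (-2 * j / dim)): one unit rotation per position and
# frequency pair.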


def apply_rotary_emb(xq, xk, freqs_cis):
    """Apply rotary position embeddings: adjacent channel pairs are treated as
    complex numbers and rotated by the precomputed phasors."""
    xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
    xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))
    # Reshape (t, head_dim // 2) so it broadcasts over batch and heads.
    freqs_cis = freqs_cis.view(1, xq_.size(1), 1, xq_.size(3))
    xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(3)
    xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(3)
    return xq_out.type_as(xq), xk_out.type_as(xk)


def repeat_kv(x: torch.Tensor, n_rep: int) -> torch.Tensor:
    """For grouped-query attention: repeat each KV head n_rep times so the
    key/value tensors line up with the query heads."""
    if n_rep == 1:
        return x
    bs, slen, n_kv_heads, head_dim = x.shape
    return (
        x[:, :, :, None, :]
        .expand(bs, slen, n_kv_heads, n_rep, head_dim)
        .reshape(bs, slen, n_kv_heads * n_rep, head_dim)
    )
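
# Example (with the constants above): n_rep = NUM_HEADS // NUM_KV_HEADS = 4, so a
# cache tensor of shape (batch, seq, 8, 128) becomes (batch, seq, 32, 128) and
# each KV head serves 4 query heads. expand() is a zero-copy view; the final
# reshape() materializes the repetition.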


class MultiHeadAttention(nn.Module):
    """Grouped-query attention with RoPE and a sliding-window KV cache."""

    def __init__(self):
        super().__init__()
        self.n_kv_heads = NUM_KV_HEADS
        self.n_rep = NUM_HEADS // NUM_KV_HEADS

        self.wq = nn.Linear(MODEL_DIM, NUM_HEADS * HEAD_DIM, bias=False)
        self.wk = nn.Linear(MODEL_DIM, NUM_KV_HEADS * HEAD_DIM, bias=False)
        self.wv = nn.Linear(MODEL_DIM, NUM_KV_HEADS * HEAD_DIM, bias=False)
        self.wo = nn.Linear(NUM_HEADS * HEAD_DIM, MODEL_DIM, bias=False)

    def forward(self, x, freqs_cis, past_kv=None, start_pos=0):
        b, t, _ = x.shape
        q, k, v = self.wq(x), self.wk(x), self.wv(x)

        q = q.view(b, t, NUM_HEADS, HEAD_DIM)
        k = k.view(b, t, self.n_kv_heads, HEAD_DIM)
        v = v.view(b, t, self.n_kv_heads, HEAD_DIM)

        # Rotate at the absolute positions of the new tokens; always slicing
        # from 0 would misalign RoPE as soon as a cache is in play.
        q, k = apply_rotary_emb(q, k, freqs_cis[start_pos:start_pos + t])

        if past_kv is not None:
            pk, pv = past_kv
            k = torch.cat([pk, k], dim=1)
            v = torch.cat([pv, v], dim=1)
            # Sliding window: keep only the most recent WINDOW_SIZE positions.
            if k.size(1) > WINDOW_SIZE:
                k, v = k[:, -WINDOW_SIZE:], v[:, -WINDOW_SIZE:]

        current_kv = (k.detach(), v.detach())

        k = repeat_kv(k, self.n_rep)
        v = repeat_kv(v, self.n_rep)

        q, k, v = q.transpose(1, 2), k.transpose(1, 2), v.transpose(1, 2)
        # A causal mask is only correct when queries and keys cover the same
        # positions (prefill). With a cache we assume one new token per step:
        # every cached key is already in the past, and is_causal=True would
        # wrongly mask a single query against all but the first key.
        out = F.scaled_dot_product_attention(q, k, v, is_causal=past_kv is None)
        return self.wo(out.transpose(1, 2).contiguous().view(b, t, MODEL_DIM)), current_kv
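
# Cache shape note (an assumption about intended use, not spelled out in the
# original code): after a prefill of T <= WINDOW_SIZE tokens, each layer's
# past_kv entry is a pair of tensors of shape (batch, T, NUM_KV_HEADS, HEAD_DIM);
# on subsequent cached steps the window truncation caps this at
# (batch, WINDOW_SIZE, NUM_KV_HEADS, HEAD_DIM).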


class SwiGLU(nn.Module):
    """Gated feed-forward block: out = w2(silu(w1(x)) * w3(x))."""

    def __init__(self):
        super().__init__()
        self.w1 = nn.Linear(MODEL_DIM, FFN_HIDDEN_DIM, bias=False)
        self.w2 = nn.Linear(FFN_HIDDEN_DIM, MODEL_DIM, bias=False)
        self.w3 = nn.Linear(MODEL_DIM, FFN_HIDDEN_DIM, bias=False)

    def forward(self, x):
        return self.w2(F.silu(self.w1(x)) * self.w3(x))


class JiRackPyTorch(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_emb = nn.Embedding(VOCAB_SIZE, MODEL_DIM)
        self.blocks = nn.ModuleList([nn.ModuleDict({
            'norm1': RMSNorm(MODEL_DIM),
            'attn': MultiHeadAttention(),
            'norm2': RMSNorm(MODEL_DIM),
            'ffn': SwiGLU()
        }) for _ in range(NUM_LAYERS)])
        self.norm_f = RMSNorm(MODEL_DIM)
        self.head = nn.Linear(MODEL_DIM, VOCAB_SIZE, bias=False)

        # Weight tying: the output projection shares the token embedding matrix.
        self.head.weight = self.token_emb.weight

        # Precompute rotary phasors with headroom beyond MAX_SEQ_LEN.
        self.register_buffer("freqs_cis", precompute_freqs_cis(HEAD_DIM, MAX_SEQ_LEN * 2))

        signature = "Author: Konstantin Vladimirovich Grabko (CMS Manhattan) 2025"
        self.register_buffer("proof_of_authorship", torch.tensor([ord(c) for c in signature], dtype=torch.uint8))

    def get_author_info(self):
        return "".join([chr(c) for c in self.proof_of_authorship.tolist()])

    def forward(self, idx, targets=None, past_kv=None, start_pos=0):
        x = self.token_emb(idx)
        new_kvs = []

        for i, block in enumerate(self.blocks):
            # Pre-norm residual block: attention, then the gated FFN.
            h, kv = block['attn'](block['norm1'](x), self.freqs_cis,
                                  past_kv[i] if past_kv is not None else None,
                                  start_pos=start_pos)
            x = x + h
            x = x + block['ffn'](block['norm2'](x))
            new_kvs.append(kv)

        x = self.norm_f(x)
        logits = self.head(x)

        if targets is not None:
            # Targets are assumed to already be the inputs shifted left by one.
            loss = F.cross_entropy(logits.view(-1, VOCAB_SIZE), targets.view(-1))
            return logits, loss, new_kvs

        return logits, new_kvs
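

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original listing). At this
    # configuration the model has roughly 6B parameters (over 20 GB in fp32),
    # so shrink MODEL_DIM, NUM_LAYERS, and FFN_HIDDEN_DIM above before running
    # this on modest hardware.
    torch.manual_seed(0)
    model = JiRackPyTorch().eval()
    print(model.get_author_info())

    prompt = torch.randint(0, VOCAB_SIZE, (1, 16))  # dummy token ids, batch of 1
    with torch.no_grad():
        # Prefill: no cache, full causal attention over the prompt.
        logits, kvs = model(prompt)
        # One greedy decoding step: feed a single token together with the cache
        # and its absolute position so the rotary embeddings stay aligned.
        next_tok = logits[:, -1].argmax(-1, keepdim=True)
        logits, kvs = model(next_tok, past_kv=kvs, start_pos=prompt.size(1))
    print(logits.shape)  # torch.Size([1, 1, 50257])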