| | """ |
| | 1B Parameter Decoder-Only Transformer — built from scratch. |
| | |
| | Techniques: |
| | - RoPE (Rotary Position Embeddings) |
| | - Grouped Query Attention (GQA) |
| | - SwiGLU Feed-Forward |
| | - RMSNorm (pre-norm architecture) |
| | - Flash Attention 2 (via PyTorch SDPA) |
| | """ |
| |
|
| | import math |
| | import torch |
| | import torch.nn as nn |
| | import torch.nn.functional as F |
| | from .config import ModelConfig |
| |
|
| |
|
class RMSNorm(nn.Module):
    """Root-mean-square normalization over the last dimension with a learned gain.

    Args:
        dim: Size of the last (feature) dimension; the gain has this many entries.
        eps: Constant added to the mean square before the reciprocal square root.
    """

    def __init__(self, dim: int, eps: float = 1e-5):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))
        self.eps = eps

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` divided by its RMS over the last dimension, times ``weight``."""
        # The statistics are computed in float32 whatever the input dtype is.
        x_fp32 = x.float()
        mean_square = x_fp32.pow(2).mean(-1, keepdim=True)
        inv_rms = torch.rsqrt(mean_square + self.eps)
        # Cast back to the input dtype before the gain is applied.
        normalized = (x_fp32 * inv_rms).type_as(x)
        return normalized * self.weight
| |
|
| |
|
def precompute_rope_freqs(dim: int, max_seq_len: int, theta: float = 10000.0) -> torch.Tensor:
    """Tabulate the unit-magnitude complex rotations used by rotary embeddings.

    Args:
        dim: Per-head feature size; one frequency is produced for each index
            in ``range(0, dim, 2)``.
        max_seq_len: Number of positions (rows) to tabulate.
        theta: Base of the geometric frequency progression.

    Returns:
        A complex tensor with ``max_seq_len`` rows and one column per
        frequency, where entry ``[p, i]`` is ``exp(1j * p * theta ** (-2 * i / dim))``.
    """
    pair_offsets = torch.arange(0, dim, 2).float()
    inv_freq = 1.0 / (theta ** (pair_offsets / dim))
    positions = torch.arange(max_seq_len, dtype=torch.float32)
    # Rotation angle for every (position, frequency) combination.
    angles = torch.outer(positions, inv_freq)
    unit_magnitude = torch.ones_like(angles)
    return torch.polar(unit_magnitude, angles)
| |
|
| |
|
def apply_rope(xq: torch.Tensor, xk: torch.Tensor, freqs_cis: torch.Tensor):
    """Rotate query and key tensors by their rotary position embeddings.

    Each adjacent feature pair ``(2i, 2i + 1)`` of a head is viewed as one
    complex number and multiplied by the rotation for its sequence position.

    Args:
        xq: Queries of shape ``(batch, seq, heads, head_dim)``.
        xk: Keys of shape ``(batch, seq, kv_heads, head_dim)``; batch, seq and
            head_dim must match ``xq``.
        freqs_cis: Complex rotation table of shape ``(positions, head_dim // 2)``.
            Only the first ``seq`` rows are used.

    Returns:
        A tuple ``(xq_out, xk_out)`` with the same shapes and dtypes as the
        corresponding inputs.

    Raises:
        ValueError: If ``freqs_cis`` holds fewer than ``seq`` positions.
    """
    B, S, H, D = xq.shape

    # Slicing past the end of the table would silently truncate it. A one-row
    # table would then broadcast position 0 over the whole sequence, and any
    # other short table would fail later with an opaque broadcasting error.
    table_len = freqs_cis.shape[0]
    if table_len < S:
        raise ValueError(
            f"apply_rope: sequence length {S} exceeds the {table_len} "
            "positions available in freqs_cis"
        )

    # The complex view needs float32 data laid out as (..., pairs, 2).
    xq_c = torch.view_as_complex(xq.float().reshape(B, S, H, D // 2, 2))
    xk_c = torch.view_as_complex(xk.float().reshape(B, S, xk.shape[2], D // 2, 2))

    # Add broadcast axes for batch and heads: (1, seq, 1, head_dim // 2).
    # NOTE(review): the clone looks like a deliberate defensive copy of the
    # shared table; it is kept as-is. Confirm before removing it.
    freqs = freqs_cis[:S].clone().unsqueeze(0).unsqueeze(2)

    # Back to real pairs, then merge the (pairs, 2) axes into head_dim again.
    xq_out = torch.view_as_real(xq_c * freqs).flatten(3)
    xk_out = torch.view_as_real(xk_c * freqs).flatten(3)
    return xq_out.type_as(xq), xk_out.type_as(xk)
| |
|
| |
|
class GroupedQueryAttention(nn.Module):
    """Causal self-attention in which groups of query heads share one key/value head.

    Args:
        config: Supplies ``num_attention_heads``, ``num_kv_heads``, ``head_dim``
            and ``hidden_dim``.
    """

    def __init__(self, config: ModelConfig):
        super().__init__()
        self.num_heads = config.num_attention_heads
        self.num_kv_heads = config.num_kv_heads
        self.head_dim = config.head_dim
        # Number of query heads served by each key/value head.
        self.num_groups = self.num_heads // self.num_kv_heads

        q_width = self.num_heads * self.head_dim
        kv_width = self.num_kv_heads * self.head_dim
        self.wq = nn.Linear(config.hidden_dim, q_width, bias=False)
        self.wk = nn.Linear(config.hidden_dim, kv_width, bias=False)
        self.wv = nn.Linear(config.hidden_dim, kv_width, bias=False)
        self.wo = nn.Linear(q_width, config.hidden_dim, bias=False)

    def _expand_kv_heads(self, kv: torch.Tensor) -> torch.Tensor:
        """Repeat each key/value head ``num_groups`` times, giving ``num_heads`` heads."""
        batch, seq_len = kv.shape[0], kv.shape[1]
        grouped = kv.unsqueeze(3).expand(
            batch, seq_len, self.num_kv_heads, self.num_groups, self.head_dim
        )
        return grouped.reshape(batch, seq_len, self.num_heads, self.head_dim)

    def forward(self, x: torch.Tensor, freqs_cis: torch.Tensor) -> torch.Tensor:
        """Apply causal attention to ``x`` of shape ``(batch, seq, hidden_dim)``.

        Args:
            x: Input activations.
            freqs_cis: Rotary rotation table passed through to ``apply_rope``.

        Returns:
            The output-projected attention result, one vector per input position.
        """
        batch, seq_len, _ = x.shape

        # Project, then split the feature axis into separate heads.
        q = self.wq(x).view(batch, seq_len, self.num_heads, self.head_dim)
        k = self.wk(x).view(batch, seq_len, self.num_kv_heads, self.head_dim)
        v = self.wv(x).view(batch, seq_len, self.num_kv_heads, self.head_dim)

        # Rotary embeddings apply to queries and keys only.
        q, k = apply_rope(q, k, freqs_cis)

        if self.num_groups > 1:
            k = self._expand_kv_heads(k)
            v = self._expand_kv_heads(v)

        # Move heads ahead of the sequence axis: (batch, heads, seq, head_dim).
        q, k, v = (t.transpose(1, 2) for t in (q, k, v))

        attended = F.scaled_dot_product_attention(q, k, v, is_causal=True)
        # Restore (batch, seq, heads * head_dim) for the output projection.
        merged = attended.transpose(1, 2).contiguous().view(batch, seq_len, -1)
        return self.wo(merged)
| |
|
| |
|
class SwiGLUFFN(nn.Module):
    """Gated feed-forward layer computing ``w_down(silu(w_gate(x)) * w_up(x))``.

    Args:
        config: Supplies ``hidden_dim`` (input/output width) and
            ``intermediate_dim`` (width of the gated inner projection).
    """

    def __init__(self, config: ModelConfig):
        super().__init__()
        hidden, inner = config.hidden_dim, config.intermediate_dim
        self.w_gate = nn.Linear(hidden, inner, bias=False)
        self.w_up = nn.Linear(hidden, inner, bias=False)
        self.w_down = nn.Linear(inner, hidden, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the gated projection of ``x``, mapped back to ``hidden_dim``."""
        gate = F.silu(self.w_gate(x))
        up = self.w_up(x)
        return self.w_down(gate * up)
| |
|
| |
|
class TransformerBlock(nn.Module):
    """One decoder layer: normalized attention then normalized feed-forward, each residual.

    Args:
        config: Supplies ``hidden_dim`` and ``rms_norm_eps`` for the two norms
            and is forwarded to the attention and feed-forward sub-modules.
    """

    def __init__(self, config: ModelConfig):
        super().__init__()
        width = config.hidden_dim
        eps = config.rms_norm_eps
        self.attention_norm = RMSNorm(width, eps=eps)
        self.attention = GroupedQueryAttention(config)
        self.ffn_norm = RMSNorm(width, eps=eps)
        self.ffn = SwiGLUFFN(config)

    def forward(self, x: torch.Tensor, freqs_cis: torch.Tensor) -> torch.Tensor:
        """Run both sub-layers on ``x``, adding each result back onto its input."""
        # Each sub-layer sees a normalized copy; the residual path stays unnormalized.
        attn_out = self.attention(self.attention_norm(x), freqs_cis)
        h = x + attn_out
        ffn_out = self.ffn(self.ffn_norm(h))
        return h + ffn_out
| |
|
| |
|
class Transformer(nn.Module):
    """Decoder-only language model: embedding, a stack of blocks, final norm, vocab head.

    Args:
        config: Supplies ``vocab_size``, ``hidden_dim``, ``num_layers``,
            ``rms_norm_eps``, ``head_dim``, ``max_seq_len`` and ``rope_theta``,
            and is forwarded to every ``TransformerBlock``.
    """

    def __init__(self, config: ModelConfig):
        super().__init__()
        self.config = config

        self.tok_embeddings = nn.Embedding(config.vocab_size, config.hidden_dim)
        blocks = [TransformerBlock(config) for _ in range(config.num_layers)]
        self.layers = nn.ModuleList(blocks)
        self.norm = RMSNorm(config.hidden_dim, eps=config.rms_norm_eps)
        self.output = nn.Linear(config.hidden_dim, config.vocab_size, bias=False)

        # The rotary table covers twice ``max_seq_len`` positions. It is
        # registered as non-persistent, so it is left out of the state dict.
        rope_table = precompute_rope_freqs(
            config.head_dim, config.max_seq_len * 2, config.rope_theta
        )
        self.register_buffer("freqs_cis", rope_table, persistent=False)

        self._init_weights()

    def _init_weights(self):
        """Draw all linear and embedding weights from N(0, 0.02^2).

        Linear biases, where present, are zeroed. The output projections of
        each block (``attention.wo`` and ``ffn.w_down``) are then redrawn with
        the standard deviation scaled by ``(2 * num_layers) ** -0.5``.
        """
        for module in self.modules():
            if not isinstance(module, (nn.Linear, nn.Embedding)):
                continue
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if isinstance(module, nn.Linear) and module.bias is not None:
                nn.init.zeros_(module.bias)

        # Smaller init for the projections that write into the residual stream.
        scale = (2 * self.config.num_layers) ** -0.5
        residual_std = 0.02 * scale
        for layer in self.layers:
            for projection in (layer.attention.wo, layer.ffn.w_down):
                nn.init.normal_(projection.weight, mean=0.0, std=residual_std)

    def forward(self, tokens: torch.Tensor, targets: torch.Tensor = None):
        """Compute logits for ``tokens`` and, when targets are given, the loss.

        Args:
            tokens: Token ids of shape ``(batch, seq)``.
            targets: Optional target ids with one entry per logit position.
                Entries equal to ``-100`` are ignored by the loss.

        Returns:
            A tuple ``(logits, loss)``. ``logits`` has a trailing vocabulary
            axis; ``loss`` is the cross-entropy over all positions, or ``None``
            when ``targets`` is ``None``.
        """
        _, seq_len = tokens.shape
        h = self.tok_embeddings(tokens)

        # Only the rotations for the positions actually present are passed down.
        rope = self.freqs_cis[:seq_len]
        for layer in self.layers:
            h = layer(h, rope)
        logits = self.output(self.norm(h))

        if targets is None:
            return logits, None

        # Flatten batch and sequence so every position is one classification row.
        flat_logits = logits.view(-1, logits.size(-1))
        loss = F.cross_entropy(flat_logits, targets.view(-1), ignore_index=-100)
        return logits, loss
| |
|