| | """MIRAS Language Model - Custom Architecture""" |
| |
|
| | import torch |
| | import torch.nn as nn |
| | import torch.nn.functional as F |
| | from typing import Optional |
| |
|
def l2_loss(pred, target):
    """Return half the squared error between *pred* and *target*, summed over the last axis."""
    residual = pred - target
    squared = residual ** 2
    return 0.5 * squared.sum(dim=-1)
| |
|
def lp_loss(pred, target, p=3):
    """Return the sum over the last axis of ``|pred - target| ** p``."""
    magnitude = torch.abs(pred - target)
    powered = magnitude ** p
    return powered.sum(dim=-1)
| |
|
def huber_loss(pred, target, delta):
    """Return the Huber loss of ``pred - target``, summed over the last axis.

    Elements whose absolute error is at most *delta* contribute
    ``0.5 * error ** 2``; the others contribute
    ``delta * (|error| - 0.5 * delta)``.
    """
    residual = pred - target
    magnitude = torch.abs(residual)
    quadratic_part = 0.5 * residual ** 2
    linear_part = delta * (magnitude - 0.5 * delta)
    per_element = torch.where(magnitude <= delta, quadratic_part, linear_part)
    return per_element.sum(dim=-1)
| |
|
def l2_retention_update(W, grad, alpha, eta):
    """Return ``alpha * W - eta * grad``: scale the old weights, then take a gradient step."""
    retained = alpha * W
    step = eta * grad
    return retained - step
| |
|
def kl_retention_update(log_W, grad, alpha, eta, c=1.0):
    """Update weights in log space and map them back through a softmax.

    Returns:
        A pair ``(log_W_new, W_new)`` where ``log_W_new`` is
        ``alpha * log_W - eta * grad`` and ``W_new`` is *c* times the softmax
        of ``log_W_new`` over the last axis.
    """
    updated_logits = alpha * log_W - eta * grad
    normalized = F.softmax(updated_logits, dim=-1)
    return updated_logits, c * normalized
| |
|
def elastic_net_update(W, grad, lambda_decay, zeta_lr, gamma_l1):
    """Take a decayed gradient step, then soft-threshold the result by *gamma_l1*.

    The intermediate value is ``lambda_decay * W - zeta_lr * grad``; each
    element has its magnitude reduced by *gamma_l1* (clipped at zero) while
    keeping its sign.
    """
    stepped = lambda_decay * W - zeta_lr * grad
    shrunk_magnitude = F.relu(torch.abs(stepped) - gamma_l1)
    return torch.sign(stepped) * shrunk_magnitude
| |
|
| |
|
class KeyValueProjection(nn.Module):
    """Three independent bias-free linear maps producing keys, values and queries."""

    def __init__(self, d_in, d_out):
        super().__init__()
        # Creation order (K, V, Q) is kept so seeded initialisation and
        # state_dict key order stay the same.
        self.W_K = nn.Linear(d_in, d_out, bias=False)
        self.W_V = nn.Linear(d_in, d_out, bias=False)
        self.W_Q = nn.Linear(d_in, d_out, bias=False)

    def forward(self, x):
        """Return the tuple ``(keys, values, queries)`` computed from *x*."""
        keys = self.W_K(x)
        values = self.W_V(x)
        queries = self.W_Q(x)
        return keys, values, queries
| |
|
| |
|
class MIRASLayer(nn.Module):
    """Sequence layer with a per-sample memory that is updated token by token.

    For every time step the layer (1) predicts the value from the key with the
    current memory, (2) takes the gradient of the configured loss with respect
    to the memory, (3) applies the configured retention rule to obtain the next
    memory, and (4) reads the updated memory with the query.

    Args:
        d: Feature width of the input, keys, values and queries.
        memory_type: ``'linear'`` keeps a ``(d, d)`` matrix memory that starts
            from a zero buffer. Any other value selects the two-matrix memory
            with learnable initial weights and a LayerNorm.
        attentional_bias: Inner loss; one of ``'l2'``, ``'lp'``, ``'huber'``.
        retention: Memory update rule; one of ``'l2'``, ``'kl'``, ``'elastic'``.
        expansion: Hidden-width multiplier of the two-matrix memory.
        p: Exponent passed to ``lp_loss`` when ``attentional_bias='lp'``.
        q: Stored as ``self.q``; not read by any method of this class.

    Raises:
        ValueError: If ``attentional_bias`` or ``retention`` is not one of the
            supported names.
    """

    _ATTENTIONAL_BIASES = ('l2', 'lp', 'huber')
    _RETENTIONS = ('l2', 'kl', 'elastic')

    def __init__(self, d, memory_type='deep', attentional_bias='l2', retention='l2', expansion=4, p=3, q=4):
        super().__init__()
        # Fail fast: get_loss/apply_retention treat every unrecognised name as
        # 'huber'/'elastic' and would otherwise crash mid-forward on the
        # missing delta_proj/gamma attribute.
        if attentional_bias not in self._ATTENTIONAL_BIASES:
            raise ValueError(
                f"attentional_bias must be one of {self._ATTENTIONAL_BIASES}, got {attentional_bias!r}"
            )
        if retention not in self._RETENTIONS:
            raise ValueError(
                f"retention must be one of {self._RETENTIONS}, got {retention!r}"
            )

        self.d, self.memory_type, self.attentional_bias, self.retention = d, memory_type, attentional_bias, retention
        self.p, self.q = p, q
        self.kv_proj = KeyValueProjection(d, d)

        if memory_type == 'linear':
            # Non-trainable zero start state for the matrix memory.
            self.register_buffer('M_init', torch.zeros(d, d))
        else:
            # Learnable start state for the two-matrix memory.
            # NOTE(review): the LayerNorm is assumed to belong to this branch
            # (it is only used by memory_forward_deep) — confirm against
            # existing 'linear' checkpoints.
            self.W1_init = nn.Parameter(torch.randn(d, d * expansion) * 0.02)
            self.W2_init = nn.Parameter(torch.randn(d * expansion, d) * 0.02)
            self.ln = nn.LayerNorm(d)

        if attentional_bias == 'huber':
            # Produces a per-token Huber threshold from the layer input.
            self.delta_proj = nn.Linear(d, 1)

        # Raw parameters; apply_retention squashes them before use.
        self.alpha = nn.Parameter(torch.ones(1) * 0.9)
        self.eta = nn.Parameter(torch.ones(1) * 0.1)
        if retention == 'kl':
            self.c = nn.Parameter(torch.ones(1))
        if retention == 'elastic':
            self.gamma = nn.Parameter(torch.ones(1) * 0.01)

    def memory_forward_deep(self, x, W1, W2):
        """Apply the two-matrix memory to *x* and add the result to *x*.

        Computes ``x + LayerNorm(GELU(x @ W2^T) @ W1^T)``.
        """
        h = F.gelu(x @ W2.transpose(-2, -1))
        return x + self.ln(h @ W1.transpose(-2, -1))

    def get_loss(self, pred, target, x_t=None):
        """Return the scalar inner loss selected by ``self.attentional_bias``.

        *x_t* is only read for the Huber loss, where it is projected through
        ``delta_proj`` and a softplus to give the threshold.
        """
        if self.attentional_bias == 'l2':
            return l2_loss(pred, target).sum()
        elif self.attentional_bias == 'lp':
            return lp_loss(pred, target, self.p).sum()
        else:
            return huber_loss(pred, target, F.softplus(self.delta_proj(x_t))).sum()

    def apply_retention(self, W, grad, log_W=None):
        """Apply the retention rule selected by ``self.retention``.

        ``self.alpha`` is passed through a sigmoid and ``self.eta`` through a
        softplus before being used as the decay and step size.

        Returns:
            ``(W_new, log_W_new)``. ``log_W_new`` is ``None`` unless the rule
            is ``'kl'``; for ``'kl'`` a missing *log_W* is derived from *W*
            (clamped at ``1e-10`` before the log).
        """
        alpha, eta = torch.sigmoid(self.alpha), F.softplus(self.eta)
        if self.retention == 'l2':
            return l2_retention_update(W, grad, alpha, eta), None
        elif self.retention == 'kl':
            log_W = log_W if log_W is not None else torch.log(W.clamp(min=1e-10))
            log_W_new, W_new = kl_retention_update(log_W, grad, alpha, eta, self.c)
            return W_new, log_W_new
        else:
            return elastic_net_update(W, grad, alpha, eta, self.gamma), None

    def forward(self, x):
        """Process *x* one time step at a time and return the stacked read-outs.

        Args:
            x: Input whose key projection is three-dimensional; it is indexed
                as ``x[:, t]`` along the second axis.

        Returns:
            Tensor built by stacking the per-step outputs along ``dim=1``.
        """
        k, v, q = self.kv_proj(x)
        B, T, _ = k.shape
        outputs = []
        uses_huber = self.attentional_bias == 'huber'

        # The inner gradient is needed even when the caller runs under
        # torch.no_grad() (e.g. during generation).
        with torch.enable_grad():
            if self.memory_type == 'linear':
                M = self.M_init.unsqueeze(0).expand(B, -1, -1).contiguous()
                for t in range(T):
                    k_t, v_t, q_t = k[:, t], v[:, t], q[:, t]
                    # Fresh leaf: the inner gradient is taken w.r.t. the
                    # current memory values only.
                    M_leaf = M.detach().requires_grad_(True)
                    pred = torch.einsum('bde,be->bd', M_leaf, k_t)
                    loss = self.get_loss(pred, v_t, x[:, t] if uses_huber else None)
                    grad = torch.autograd.grad(loss, M_leaf)[0]
                    M, _ = self.apply_retention(M, grad)
                    outputs.append(torch.einsum('bde,be->bd', M, q_t))
            else:
                W1 = self.W1_init.unsqueeze(0).expand(B, -1, -1).contiguous()
                W2 = self.W2_init.unsqueeze(0).expand(B, -1, -1).contiguous()
                log_W1, log_W2 = None, None
                if self.retention == 'kl':
                    # Start from softmax-normalised weights and their logs.
                    W1, W2 = F.softmax(W1, dim=-1), F.softmax(W2, dim=-1)
                    log_W1, log_W2 = torch.log(W1.clamp(min=1e-10)), torch.log(W2.clamp(min=1e-10))

                for t in range(T):
                    k_t, v_t, q_t = k[:, t], v[:, t], q[:, t]
                    W1_leaf, W2_leaf = W1.detach().requires_grad_(True), W2.detach().requires_grad_(True)
                    pred = self.memory_forward_deep(k_t.unsqueeze(1), W1_leaf, W2_leaf).squeeze(1)
                    loss = self.get_loss(pred, v_t, x[:, t] if uses_huber else None)
                    grad1, grad2 = torch.autograd.grad(loss, [W1_leaf, W2_leaf])
                    W1, log_W1 = self.apply_retention(W1, grad1, log_W1)
                    W2, log_W2 = self.apply_retention(W2, grad2, log_W2)
                    # The read-out uses detached memory weights.
                    outputs.append(self.memory_forward_deep(q_t.unsqueeze(1), W1.detach(), W2.detach()).squeeze(1))

        return torch.stack(outputs, dim=1)
| |
|
| |
|
class MIRASBlock(nn.Module):
    """Pre-norm residual block: a MIRAS memory layer followed by a feed-forward network."""

    def __init__(self, d_model, memory_type, attentional_bias, retention, ffn_mult=4):
        super().__init__()
        hidden_width = d_model * ffn_mult
        self.ln1 = nn.LayerNorm(d_model)
        self.memory = MIRASLayer(d_model, memory_type, attentional_bias, retention)
        self.ln2 = nn.LayerNorm(d_model)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, hidden_width),
            nn.GELU(),
            nn.Linear(hidden_width, d_model),
        )

    def forward(self, x):
        """Return *x* after the memory and feed-forward residual branches."""
        memory_out = self.memory(self.ln1(x))
        x = x + memory_out
        ffn_out = self.ffn(self.ln2(x))
        return x + ffn_out
| |
|
| |
|
class MIRASLanguageModel(nn.Module):
    """Language model built from token and position embeddings, MIRAS blocks and a tied output head."""

    def __init__(self, vocab_size, d_model, n_layers, memory_type='deep', attentional_bias='l2', retention='l2', block_size=128):
        super().__init__()
        self.block_size = block_size
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.position_embedding = nn.Embedding(block_size, d_model)
        blocks = [
            MIRASBlock(d_model, memory_type, attentional_bias, retention)
            for _ in range(n_layers)
        ]
        self.layers = nn.ModuleList(blocks)
        self.ln_f = nn.LayerNorm(d_model)
        self.lm_head = nn.Linear(d_model, vocab_size, bias=False)
        # Share one weight tensor between the input embedding and the output head.
        self.token_embedding.weight = self.lm_head.weight
        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
        is_linear = isinstance(m, nn.Linear)
        if not (is_linear or isinstance(m, nn.Embedding)):
            return
        torch.nn.init.normal_(m.weight, mean=0.0, std=0.02)
        if is_linear and m.bias is not None:
            torch.nn.init.zeros_(m.bias)

    def forward(self, idx, targets=None):
        """Return ``(logits, loss)``; *loss* is ``None`` when no *targets* are given.

        *idx* must be two-dimensional. When *targets* is provided the loss is
        the cross-entropy between the flattened logits and flattened targets.
        """
        _, seq_len = idx.shape
        positions = torch.arange(seq_len, device=idx.device)
        x = self.token_embedding(idx) + self.position_embedding(positions)
        for layer in self.layers:
            x = layer(x)
        logits = self.lm_head(self.ln_f(x))
        if targets is None:
            return logits, None
        flat_logits = logits.view(-1, logits.size(-1))
        loss = F.cross_entropy(flat_logits, targets.view(-1))
        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0):
        """Append *max_new_tokens* sampled tokens to *idx* and return the result.

        Each step feeds the last ``block_size`` tokens to the model, divides
        the final position's logits by *temperature* and samples one token
        from the resulting softmax distribution.
        """
        for _ in range(max_new_tokens):
            context = idx[:, -self.block_size:]
            logits, _ = self(context)
            scaled = logits[:, -1, :] / temperature
            probs = F.softmax(scaled, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, next_token), dim=1)
        return idx
| |
|
| |
|
def load_miras_model(repo_id_or_path, device='cpu'):
    """Load a MIRAS model from HuggingFace Hub or local path.

    Args:
        repo_id_or_path: Existing local path containing ``config.json`` and
            ``model.pt``; otherwise treated as a Hub repo id from which both
            files are downloaded.
        device: Device the checkpoint is mapped to and the model is moved to.

    Returns:
        ``(model, encode, decode, config)``: the model in eval mode, a function
        mapping a string to a list of token ids, a function mapping token ids
        back to a string, and the parsed config dict.
    """
    import json
    from pathlib import Path

    local_dir = Path(repo_id_or_path)
    if local_dir.exists():
        config_path = local_dir / "config.json"
        model_path = local_dir / "model.pt"
    else:
        from huggingface_hub import hf_hub_download
        config_path = hf_hub_download(repo_id=repo_id_or_path, filename="config.json")
        model_path = hf_hub_download(repo_id=repo_id_or_path, filename="model.pt")

    # JSON is UTF-8 by specification; the platform default encoding could
    # mis-decode a non-ASCII character vocabulary.
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)

    model = MIRASLanguageModel(
        vocab_size=config['vocab_size'],
        d_model=config['d_model'],
        n_layers=config['n_layers'],
        memory_type=config['memory_type'],
        attentional_bias=config['attentional_bias'],
        retention=config['retention'],
        block_size=config['block_size'],
    )

    # SECURITY NOTE(review): torch.load unpickles the file, and the checkpoint
    # may come from a remote repo. Only load from trusted sources, or pass
    # weights_only=True once the checkpoint is confirmed to hold tensors only.
    checkpoint = torch.load(model_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    model.eval()

    # Character-level vocabulary: a token id is the character's position in config['chars'].
    chars = config['chars']
    stoi = {ch: i for i, ch in enumerate(chars)}
    itos = dict(enumerate(chars))

    def encode(s):
        """Map each character of *s* to its token id."""
        return [stoi[c] for c in s]

    def decode(ids):
        """Join the characters for the given token ids into a string."""
        return ''.join(itos[i] for i in ids)

    return model, encode, decode, config
| |
|