"""MIRAS Language Model - Custom Architecture""" import torch import torch.nn as nn import torch.nn.functional as F from typing import Optional def l2_loss(pred, target): return 0.5 * ((pred - target) ** 2).sum(dim=-1) def lp_loss(pred, target, p=3): return (torch.abs(pred - target) ** p).sum(dim=-1) def huber_loss(pred, target, delta): diff = pred - target abs_diff = torch.abs(diff) return torch.where(abs_diff <= delta, 0.5 * diff ** 2, delta * (abs_diff - 0.5 * delta)).sum(dim=-1) def l2_retention_update(W, grad, alpha, eta): return alpha * W - eta * grad def kl_retention_update(log_W, grad, alpha, eta, c=1.0): log_W_new = alpha * log_W - eta * grad return log_W_new, c * F.softmax(log_W_new, dim=-1) def elastic_net_update(W, grad, lambda_decay, zeta_lr, gamma_l1): z = lambda_decay * W - zeta_lr * grad return torch.sign(z) * F.relu(torch.abs(z) - gamma_l1) class KeyValueProjection(nn.Module): def __init__(self, d_in, d_out): super().__init__() self.W_K = nn.Linear(d_in, d_out, bias=False) self.W_V = nn.Linear(d_in, d_out, bias=False) self.W_Q = nn.Linear(d_in, d_out, bias=False) def forward(self, x): return self.W_K(x), self.W_V(x), self.W_Q(x) class MIRASLayer(nn.Module): def __init__(self, d, memory_type='deep', attentional_bias='l2', retention='l2', expansion=4, p=3, q=4): super().__init__() self.d, self.memory_type, self.attentional_bias, self.retention = d, memory_type, attentional_bias, retention self.p, self.q = p, q self.kv_proj = KeyValueProjection(d, d) if memory_type == 'linear': self.register_buffer('M_init', torch.zeros(d, d)) else: self.W1_init = nn.Parameter(torch.randn(d, d * expansion) * 0.02) self.W2_init = nn.Parameter(torch.randn(d * expansion, d) * 0.02) self.ln = nn.LayerNorm(d) if attentional_bias == 'huber': self.delta_proj = nn.Linear(d, 1) self.alpha = nn.Parameter(torch.ones(1) * 0.9) self.eta = nn.Parameter(torch.ones(1) * 0.1) if retention == 'kl': self.c = nn.Parameter(torch.ones(1)) if retention == 'elastic': self.gamma = nn.Parameter(torch.ones(1) * 0.01) def memory_forward_deep(self, x, W1, W2): h = F.gelu(x @ W2.transpose(-2, -1)) return x + self.ln(h @ W1.transpose(-2, -1)) def get_loss(self, pred, target, x_t=None): if self.attentional_bias == 'l2': return l2_loss(pred, target).sum() elif self.attentional_bias == 'lp': return lp_loss(pred, target, self.p).sum() else: return huber_loss(pred, target, F.softplus(self.delta_proj(x_t))).sum() def apply_retention(self, W, grad, log_W=None): alpha, eta = torch.sigmoid(self.alpha), F.softplus(self.eta) if self.retention == 'l2': return l2_retention_update(W, grad, alpha, eta), None elif self.retention == 'kl': log_W = log_W if log_W is not None else torch.log(W.clamp(min=1e-10)) log_W_new, W_new = kl_retention_update(log_W, grad, alpha, eta, self.c) return W_new, log_W_new else: return elastic_net_update(W, grad, alpha, eta, self.gamma), None def forward(self, x): k, v, q = self.kv_proj(x) B, T, D = k.shape outputs = [] with torch.enable_grad(): if self.memory_type == 'linear': M = self.M_init.unsqueeze(0).expand(B, -1, -1).contiguous() for t in range(T): k_t, v_t, q_t = k[:, t], v[:, t], q[:, t] M_leaf = M.detach().requires_grad_(True) pred = torch.einsum('bde,be->bd', M_leaf, k_t) loss = self.get_loss(pred, v_t, x[:, t] if self.attentional_bias == 'huber' else None) grad = torch.autograd.grad(loss, M_leaf)[0] M, _ = self.apply_retention(M, grad) outputs.append(torch.einsum('bde,be->bd', M, q_t)) else: W1 = self.W1_init.unsqueeze(0).expand(B, -1, -1).contiguous() W2 = self.W2_init.unsqueeze(0).expand(B, -1, -1).contiguous() log_W1, log_W2 = None, None if self.retention == 'kl': W1, W2 = F.softmax(W1, dim=-1), F.softmax(W2, dim=-1) log_W1, log_W2 = torch.log(W1.clamp(min=1e-10)), torch.log(W2.clamp(min=1e-10)) for t in range(T): k_t, v_t, q_t = k[:, t], v[:, t], q[:, t] W1_leaf, W2_leaf = W1.detach().requires_grad_(True), W2.detach().requires_grad_(True) pred = self.memory_forward_deep(k_t.unsqueeze(1), W1_leaf, W2_leaf).squeeze(1) loss = self.get_loss(pred, v_t, x[:, t] if self.attentional_bias == 'huber' else None) grad1, grad2 = torch.autograd.grad(loss, [W1_leaf, W2_leaf]) W1, log_W1 = self.apply_retention(W1, grad1, log_W1) W2, log_W2 = self.apply_retention(W2, grad2, log_W2) outputs.append(self.memory_forward_deep(q_t.unsqueeze(1), W1.detach(), W2.detach()).squeeze(1)) return torch.stack(outputs, dim=1) class MIRASBlock(nn.Module): def __init__(self, d_model, memory_type, attentional_bias, retention, ffn_mult=4): super().__init__() self.ln1 = nn.LayerNorm(d_model) self.memory = MIRASLayer(d_model, memory_type, attentional_bias, retention) self.ln2 = nn.LayerNorm(d_model) self.ffn = nn.Sequential(nn.Linear(d_model, d_model * ffn_mult), nn.GELU(), nn.Linear(d_model * ffn_mult, d_model)) def forward(self, x): x = x + self.memory(self.ln1(x)) return x + self.ffn(self.ln2(x)) class MIRASLanguageModel(nn.Module): def __init__(self, vocab_size, d_model, n_layers, memory_type='deep', attentional_bias='l2', retention='l2', block_size=128): super().__init__() self.block_size = block_size self.token_embedding = nn.Embedding(vocab_size, d_model) self.position_embedding = nn.Embedding(block_size, d_model) self.layers = nn.ModuleList([MIRASBlock(d_model, memory_type, attentional_bias, retention) for _ in range(n_layers)]) self.ln_f = nn.LayerNorm(d_model) self.lm_head = nn.Linear(d_model, vocab_size, bias=False) self.token_embedding.weight = self.lm_head.weight self.apply(self._init_weights) def _init_weights(self, m): if isinstance(m, nn.Linear): torch.nn.init.normal_(m.weight, mean=0.0, std=0.02) if m.bias is not None: torch.nn.init.zeros_(m.bias) elif isinstance(m, nn.Embedding): torch.nn.init.normal_(m.weight, mean=0.0, std=0.02) def forward(self, idx, targets=None): B, T = idx.shape x = self.token_embedding(idx) + self.position_embedding(torch.arange(T, device=idx.device)) for layer in self.layers: x = layer(x) logits = self.lm_head(self.ln_f(x)) loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1)) if targets is not None else None return logits, loss @torch.no_grad() def generate(self, idx, max_new_tokens, temperature=1.0): for _ in range(max_new_tokens): logits, _ = self(idx[:, -self.block_size:]) probs = F.softmax(logits[:, -1, :] / temperature, dim=-1) idx = torch.cat((idx, torch.multinomial(probs, num_samples=1)), dim=1) return idx def load_miras_model(repo_id_or_path, device='cpu'): """Load a MIRAS model from HuggingFace Hub or local path.""" import json from pathlib import Path if Path(repo_id_or_path).exists(): base_path = Path(repo_id_or_path) config_path = base_path / "config.json" model_path = base_path / "model.pt" else: from huggingface_hub import hf_hub_download config_path = hf_hub_download(repo_id=repo_id_or_path, filename="config.json") model_path = hf_hub_download(repo_id=repo_id_or_path, filename="model.pt") with open(config_path) as f: config = json.load(f) model = MIRASLanguageModel( vocab_size=config['vocab_size'], d_model=config['d_model'], n_layers=config['n_layers'], memory_type=config['memory_type'], attentional_bias=config['attentional_bias'], retention=config['retention'], block_size=config['block_size'], ) checkpoint = torch.load(model_path, map_location=device) model.load_state_dict(checkpoint['model_state_dict']) model.to(device) model.eval() stoi = {ch: i for i, ch in enumerate(config['chars'])} itos = {i: ch for i, ch in enumerate(config['chars'])} encode = lambda s: [stoi[c] for c in s] decode = lambda l: ''.join([itos[i] for i in l]) return model, encode, decode, config