miras-shakespeare / modeling_miras.py
av-codes's picture
Upload folder using huggingface_hub
1ee2101 verified
"""MIRAS Language Model - Custom Architecture"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional
def l2_loss(pred, target):
    """Return half the squared error between ``pred`` and ``target``.

    The squared residual is summed over the last dimension, so the result
    has one dimension fewer than the (broadcast) inputs.
    """
    residual = pred - target
    squared = residual ** 2
    return 0.5 * squared.sum(dim=-1)
def lp_loss(pred, target, p=3):
    """Return ``|pred - target| ** p`` summed over the last dimension.

    No ``1/p`` factor or p-th root is applied; ``p`` is only the exponent.
    """
    magnitude = (pred - target).abs()
    return magnitude.pow(p).sum(dim=-1)
def huber_loss(pred, target, delta):
    """Return the Huber loss of ``pred - target`` summed over the last dimension.

    Elements whose absolute residual is at most ``delta`` contribute
    ``0.5 * r**2``; the others contribute ``delta * (|r| - 0.5 * delta)``.
    ``delta`` may be a Python scalar or a tensor that broadcasts against the
    residual.
    """
    residual = pred - target
    magnitude = residual.abs()
    quadratic_part = 0.5 * residual ** 2
    linear_part = delta * (magnitude - 0.5 * delta)
    per_element = torch.where(magnitude <= delta, quadratic_part, linear_part)
    return per_element.sum(dim=-1)
def l2_retention_update(W, grad, alpha, eta):
    """Return the decayed weights minus a gradient step: ``alpha * W - eta * grad``."""
    retained = alpha * W
    correction = eta * grad
    return retained - correction
def kl_retention_update(log_W, grad, alpha, eta, c=1.0):
    """Take a decayed gradient step in log space and map it back through a softmax.

    Returns a pair ``(new_log_W, new_W)`` where
    ``new_log_W = alpha * log_W - eta * grad`` and
    ``new_W = c * softmax(new_log_W)`` with the softmax over the last dimension.
    """
    updated_logits = alpha * log_W - eta * grad
    normalized = F.softmax(updated_logits, dim=-1)
    return updated_logits, c * normalized
def elastic_net_update(W, grad, lambda_decay, zeta_lr, gamma_l1):
    """Take a decayed gradient step, then soft-threshold the result by ``gamma_l1``.

    The intermediate ``z = lambda_decay * W - zeta_lr * grad`` has each entry's
    magnitude reduced by ``gamma_l1`` (clamped at zero) while keeping its sign.
    """
    pre_threshold = lambda_decay * W - zeta_lr * grad
    shrunk_magnitude = F.relu(pre_threshold.abs() - gamma_l1)
    return pre_threshold.sign() * shrunk_magnitude
class KeyValueProjection(nn.Module):
    """Three independent bias-free linear maps applied to the same input.

    ``forward`` returns the projections in the order (key, value, query).
    """

    def __init__(self, d_in, d_out):
        super().__init__()
        # Registered in K, V, Q order so parameter initialisation consumes the
        # RNG in the same sequence and state_dict keys keep their order.
        for attr_name in ("W_K", "W_V", "W_Q"):
            setattr(self, attr_name, nn.Linear(d_in, d_out, bias=False))

    def forward(self, x):
        keys = self.W_K(x)
        values = self.W_V(x)
        queries = self.W_Q(x)
        return keys, values, queries
class MIRASLayer(nn.Module):
    """Sequence layer whose per-sequence memory is updated online, one token at a time.

    For every position ``t`` the layer
      1. predicts a value from the key ``k_t`` using the current memory,
      2. takes the gradient of an "attentional bias" loss between that
         prediction and ``v_t`` with respect to the memory,
      3. updates the memory with a "retention" rule, and
      4. reads the updated memory with the query ``q_t``.

    Args:
        d: feature width of the input, keys, values and queries.
        memory_type: ``'linear'`` uses a single ``(d, d)`` matrix that starts at
            zero; any other value selects the two-matrix ("deep") memory.
        attentional_bias: ``'l2'``, ``'lp'`` or ``'huber'``.
        retention: ``'l2'``, ``'kl'`` or ``'elastic'``.
        expansion: hidden-width multiplier of the deep memory.
        p: exponent used by the ``'lp'`` loss.
        q: stored on the instance; not read anywhere in this class.

    Raises:
        ValueError: if ``attentional_bias`` or ``retention`` is not one of the
            supported names.
    """

    # Names accepted for the corresponding constructor arguments.
    _ATTENTIONAL_BIASES = ('l2', 'lp', 'huber')
    _RETENTIONS = ('l2', 'kl', 'elastic')

    def __init__(self, d, memory_type='deep', attentional_bias='l2', retention='l2', expansion=4, p=3, q=4):
        # Fail fast: an unknown name would otherwise fall into an ``else``
        # branch during forward() and die on a missing attribute
        # (``delta_proj`` / ``gamma``) with an unhelpful AttributeError.
        if attentional_bias not in self._ATTENTIONAL_BIASES:
            raise ValueError(
                f"Unknown attentional_bias {attentional_bias!r}; expected one of {self._ATTENTIONAL_BIASES}"
            )
        if retention not in self._RETENTIONS:
            raise ValueError(
                f"Unknown retention {retention!r}; expected one of {self._RETENTIONS}"
            )
        super().__init__()
        self.d, self.memory_type, self.attentional_bias, self.retention = d, memory_type, attentional_bias, retention
        self.p, self.q = p, q
        self.kv_proj = KeyValueProjection(d, d)
        if memory_type == 'linear':
            # Non-trainable zero start state for the matrix memory.
            self.register_buffer('M_init', torch.zeros(d, d))
        else:
            # Start state of the two-matrix memory plus the norm used when reading it.
            self.W1_init = nn.Parameter(torch.randn(d, d * expansion) * 0.02)
            self.W2_init = nn.Parameter(torch.randn(d * expansion, d) * 0.02)
            self.ln = nn.LayerNorm(d)
        if attentional_bias == 'huber':
            # Produces one raw threshold per token; softplus is applied at use.
            self.delta_proj = nn.Linear(d, 1)
        # Raw scalars; apply_retention() squashes them with sigmoid / softplus.
        self.alpha = nn.Parameter(torch.ones(1) * 0.9)
        self.eta = nn.Parameter(torch.ones(1) * 0.1)
        if retention == 'kl':
            self.c = nn.Parameter(torch.ones(1))
        if retention == 'elastic':
            self.gamma = nn.Parameter(torch.ones(1) * 0.01)

    def memory_forward_deep(self, x, W1, W2):
        """Apply the deep memory: ``x + LayerNorm(gelu(x @ W2^T) @ W1^T)``."""
        h = F.gelu(x @ W2.transpose(-2, -1))
        return x + self.ln(h @ W1.transpose(-2, -1))

    def get_loss(self, pred, target, x_t=None):
        """Return the scalar attentional-bias loss, summed over all leading dims.

        ``x_t`` is only read for the ``'huber'`` bias, where it feeds
        ``delta_proj`` to obtain a positive per-token threshold.
        """
        if self.attentional_bias == 'l2':
            return l2_loss(pred, target).sum()
        elif self.attentional_bias == 'lp':
            return lp_loss(pred, target, self.p).sum()
        else:
            return huber_loss(pred, target, F.softplus(self.delta_proj(x_t))).sum()

    def apply_retention(self, W, grad, log_W=None):
        """Apply the configured retention rule to one memory tensor.

        Returns ``(new_W, new_log_W)``; ``new_log_W`` is ``None`` unless the
        retention is ``'kl'``. For ``'kl'``, a missing ``log_W`` is derived
        from ``W`` (clamped away from zero before the log).
        """
        alpha, eta = torch.sigmoid(self.alpha), F.softplus(self.eta)
        if self.retention == 'l2':
            return l2_retention_update(W, grad, alpha, eta), None
        elif self.retention == 'kl':
            log_W = log_W if log_W is not None else torch.log(W.clamp(min=1e-10))
            log_W_new, W_new = kl_retention_update(log_W, grad, alpha, eta, self.c)
            return W_new, log_W_new
        else:
            # alpha / eta double as the decay and step size of the elastic-net rule.
            return elastic_net_update(W, grad, alpha, eta, self.gamma), None

    def forward(self, x):
        """Run the online memory over a ``(B, T, d)`` input and return ``(B, T, d)``.

        Autograd is force-enabled only around the per-step inner gradient,
        which is needed even when the caller runs under ``torch.no_grad()``.
        The retention update and the read-out run in the caller's grad mode, so
        inference does not record (and keep alive) a graph through every
        intermediate memory state.
        """
        k, v, q = self.kv_proj(x)
        B, T, _ = k.shape
        needs_x = self.attentional_bias == 'huber'
        outputs = []
        if self.memory_type == 'linear':
            M = self.M_init.unsqueeze(0).expand(B, -1, -1).contiguous()
            for t in range(T):
                k_t, v_t, q_t = k[:, t], v[:, t], q[:, t]
                with torch.enable_grad():
                    # Differentiate w.r.t. a detached copy so the inner
                    # gradient never back-propagates into earlier steps.
                    M_leaf = M.detach().requires_grad_(True)
                    pred = torch.einsum('bde,be->bd', M_leaf, k_t)
                    loss = self.get_loss(pred, v_t, x[:, t] if needs_x else None)
                    grad = torch.autograd.grad(loss, M_leaf)[0]
                M, _ = self.apply_retention(M, grad)
                outputs.append(torch.einsum('bde,be->bd', M, q_t))
        else:
            W1 = self.W1_init.unsqueeze(0).expand(B, -1, -1).contiguous()
            W2 = self.W2_init.unsqueeze(0).expand(B, -1, -1).contiguous()
            log_W1, log_W2 = None, None
            if self.retention == 'kl':
                # The KL rule works on row-normalised weights and their logs.
                W1, W2 = F.softmax(W1, dim=-1), F.softmax(W2, dim=-1)
                log_W1, log_W2 = torch.log(W1.clamp(min=1e-10)), torch.log(W2.clamp(min=1e-10))
            for t in range(T):
                k_t, v_t, q_t = k[:, t], v[:, t], q[:, t]
                with torch.enable_grad():
                    W1_leaf, W2_leaf = W1.detach().requires_grad_(True), W2.detach().requires_grad_(True)
                    pred = self.memory_forward_deep(k_t.unsqueeze(1), W1_leaf, W2_leaf).squeeze(1)
                    loss = self.get_loss(pred, v_t, x[:, t] if needs_x else None)
                    grad1, grad2 = torch.autograd.grad(loss, [W1_leaf, W2_leaf])
                W1, log_W1 = self.apply_retention(W1, grad1, log_W1)
                W2, log_W2 = self.apply_retention(W2, grad2, log_W2)
                # NOTE(review): the read-out uses detached weights, so in this
                # branch no outer gradient reaches W1_init, W2_init, alpha,
                # eta, W_K or W_V (the linear branch does not detach). Kept
                # as-is to preserve training behaviour -- confirm it is intended.
                outputs.append(self.memory_forward_deep(q_t.unsqueeze(1), W1.detach(), W2.detach()).squeeze(1))
        return torch.stack(outputs, dim=1)
class MIRASBlock(nn.Module):
    """Pre-norm residual block: a MIRAS memory sub-layer, then a GELU feed-forward sub-layer."""

    def __init__(self, d_model, memory_type, attentional_bias, retention, ffn_mult=4):
        super().__init__()
        hidden_width = d_model * ffn_mult
        # Sub-modules are built in the same order as before (ln1, memory, ln2,
        # ffn) so parameter initialisation draws from the RNG identically.
        self.ln1 = nn.LayerNorm(d_model)
        self.memory = MIRASLayer(d_model, memory_type, attentional_bias, retention)
        self.ln2 = nn.LayerNorm(d_model)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, hidden_width),
            nn.GELU(),
            nn.Linear(hidden_width, d_model),
        )

    def forward(self, x):
        memory_out = self.memory(self.ln1(x))
        x = x + memory_out
        ffn_out = self.ffn(self.ln2(x))
        return x + ffn_out
class MIRASLanguageModel(nn.Module):
    """Token-level language model built from a stack of ``MIRASBlock`` layers.

    Token and learned position embeddings are summed, passed through
    ``n_layers`` blocks and a final LayerNorm, and projected to vocabulary
    logits. The output projection shares its weight with the token embedding.

    Args:
        vocab_size: number of distinct token ids.
        d_model: embedding / hidden width.
        n_layers: number of ``MIRASBlock`` layers.
        memory_type, attentional_bias, retention: forwarded to every block.
        block_size: number of rows in the position table, i.e. the longest
            sequence ``forward`` accepts.
    """

    def __init__(self, vocab_size, d_model, n_layers, memory_type='deep', attentional_bias='l2', retention='l2', block_size=128):
        super().__init__()
        self.block_size = block_size
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.position_embedding = nn.Embedding(block_size, d_model)
        self.layers = nn.ModuleList([MIRASBlock(d_model, memory_type, attentional_bias, retention) for _ in range(n_layers)])
        self.ln_f = nn.LayerNorm(d_model)
        self.lm_head = nn.Linear(d_model, vocab_size, bias=False)
        # Weight tying: the embedding table and the output projection are one tensor.
        self.token_embedding.weight = self.lm_head.weight
        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
        if isinstance(m, nn.Linear):
            torch.nn.init.normal_(m.weight, mean=0.0, std=0.02)
            if m.bias is not None:
                torch.nn.init.zeros_(m.bias)
        elif isinstance(m, nn.Embedding):
            torch.nn.init.normal_(m.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Return ``(logits, loss)`` for a ``(B, T)`` tensor of token ids.

        Args:
            idx: integer token ids of shape ``(B, T)`` with ``T <= block_size``.
            targets: optional ids with the same number of elements as ``idx``;
                when given, the mean cross-entropy over all positions is
                returned as ``loss``, otherwise ``loss`` is ``None``.

        Raises:
            IndexError: if ``T`` exceeds ``block_size``.
        """
        _, T = idx.shape
        if T > self.block_size:
            # Checked up front: otherwise the position lookup fails with an
            # opaque "index out of range" on CPU or a device-side assert on
            # CUDA. IndexError matches what the CPU lookup used to raise.
            raise IndexError(
                f"sequence length {T} exceeds block_size {self.block_size}"
            )
        positions = torch.arange(T, device=idx.device)
        x = self.token_embedding(idx) + self.position_embedding(positions)
        for layer in self.layers:
            x = layer(x)
        logits = self.lm_head(self.ln_f(x))
        if targets is None:
            return logits, None
        # reshape (not view) so non-contiguous target slices are accepted too.
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.reshape(-1))
        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0):
        """Append ``max_new_tokens`` sampled tokens to ``idx`` and return the result.

        Only the last ``block_size`` tokens are fed to the model at each step.
        ``temperature == 0`` selects the most likely token (greedy decoding)
        instead of dividing the logits by zero; any other value rescales the
        logits before sampling from the softmax distribution.
        """
        for _ in range(max_new_tokens):
            logits, _ = self(idx[:, -self.block_size:])
            last_logits = logits[:, -1, :]
            if temperature == 0:
                next_token = last_logits.argmax(dim=-1, keepdim=True)
            else:
                probs = F.softmax(last_logits / temperature, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, next_token), dim=1)
        return idx
def load_miras_model(repo_id_or_path, device='cpu'):
    """Load a MIRAS model from HuggingFace Hub or local path.

    If ``repo_id_or_path`` exists on the local filesystem it is treated as a
    directory containing ``config.json`` and ``model.pt``; otherwise both
    files are downloaded from the Hub repository of that name.

    Args:
        repo_id_or_path: local directory or Hub repository id.
        device: passed to ``torch.load(map_location=...)`` and ``model.to``.

    Returns:
        ``(model, encode, decode, config)`` where ``model`` is in eval mode,
        ``encode`` maps a string to a list of ids using ``config['chars']``,
        ``decode`` maps an iterable of ids back to a string, and ``config`` is
        the parsed JSON dictionary.
    """
    import json
    from pathlib import Path

    local_dir = Path(repo_id_or_path)
    if local_dir.exists():
        config_path = local_dir / "config.json"
        model_path = local_dir / "model.pt"
    else:
        from huggingface_hub import hf_hub_download
        config_path = hf_hub_download(repo_id=repo_id_or_path, filename="config.json")
        model_path = hf_hub_download(repo_id=repo_id_or_path, filename="model.pt")

    # JSON is UTF-8; be explicit so a non-ASCII character table decodes the
    # same way regardless of the platform's default encoding.
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)

    model = MIRASLanguageModel(
        vocab_size=config['vocab_size'],
        d_model=config['d_model'],
        n_layers=config['n_layers'],
        memory_type=config['memory_type'],
        attentional_bias=config['attentional_bias'],
        retention=config['retention'],
        block_size=config['block_size'],
    )

    # NOTE(security): torch.load unpickles the file, and the checkpoint may
    # come from an arbitrary Hub repository. Only load repositories you trust,
    # or pass weights_only=True where the installed torch version supports it
    # and the checkpoint holds nothing but tensors and plain containers.
    checkpoint = torch.load(model_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    model.eval()

    chars = config['chars']
    stoi = {ch: i for i, ch in enumerate(chars)}
    itos = dict(enumerate(chars))

    # Parameter names mirror the original callables so keyword calls keep working.
    def encode(s):
        return [stoi[c] for c in s]

    def decode(l):
        return ''.join(itos[i] for i in l)

    return model, encode, decode, config