| import math |
| import torch |
| import torch.nn as nn |
| from einops import rearrange, repeat |
| from einops.layers.torch import EinMix |
| from transformers import PreTrainedModel, PretrainedConfig |
|
|
| |
| |
| |
| class TRMConfig(PretrainedConfig): |
| model_type = "trm" |
|
|
| def __init__(self, |
| vocab_size=32000, |
| hidden_size=256, |
| seq_len=128, |
| depth_L=2, |
| depth_H=2, |
| act_threshold=0.9, |
| act_epsilon=1e-2, |
| **kwargs): |
| super().__init__(**kwargs) |
| self.vocab_size = vocab_size |
| self.hidden_size = hidden_size |
| self.seq_len = seq_len |
| self.depth_L = depth_L |
| self.depth_H = depth_H |
| self.act_threshold = act_threshold |
| self.act_epsilon = act_epsilon |
|
|
|
|
| |
| |
| |
| class HaltingBlock(nn.Module): |
| def __init__(self, hidden_size, act_threshold, act_epsilon): |
| super().__init__() |
| self.proj = nn.Linear(hidden_size, hidden_size) |
| self.act_proj = nn.Linear(hidden_size, 1) |
| self.act_threshold = act_threshold |
| self.act_epsilon = act_epsilon |
|
|
| def forward(self, x): |
| halting_probs = torch.sigmoid(self.act_proj(x)) |
| remainders = torch.zeros_like(halting_probs) |
| n_updates = torch.zeros_like(halting_probs) |
| still_running = torch.ones_like(halting_probs, dtype=torch.bool) |
| accumulated_output = torch.zeros_like(x) |
| accumulated_prob = torch.zeros_like(halting_probs) |
|
|
| while still_running.any(): |
| p = torch.where(still_running, halting_probs, torch.zeros_like(halting_probs)) |
| new_accum = accumulated_prob + p |
|
|
| still_running = new_accum < self.act_threshold |
| remainder = torch.where(still_running, torch.zeros_like(halting_probs), 1 - accumulated_prob) |
|
|
| update_weights = torch.where(still_running, p, remainder) |
| accumulated_output += update_weights * torch.tanh(self.proj(x)) |
| accumulated_prob += update_weights |
| n_updates += still_running.float() |
|
|
| if (1 - accumulated_prob).mean() < self.act_epsilon: |
| break |
|
|
| return accumulated_output, accumulated_prob.mean() |
|
|
|
|
| class TRMLayer(nn.Module): |
| def __init__(self, hidden_size, depth_H, act_threshold, act_epsilon): |
| super().__init__() |
| self.blocks = nn.ModuleList([ |
| HaltingBlock(hidden_size, act_threshold, act_epsilon) for _ in range(depth_H) |
| ]) |
| self.norm = nn.LayerNorm(hidden_size) |
|
|
| def forward(self, x): |
| for block in self.blocks: |
| x, _ = block(x) |
| return self.norm(x) |
|
|
|
|
| class TRM(PreTrainedModel): |
| config_class = TRMConfig |
|
|
| def __init__(self, config): |
| super().__init__(config) |
| self.emb = nn.Embedding(config.vocab_size, config.hidden_size) |
| self.pos_emb = nn.Parameter(torch.zeros(1, config.seq_len, config.hidden_size)) |
| self.layers = nn.ModuleList([ |
| TRMLayer(config.hidden_size, config.depth_H, config.act_threshold, config.act_epsilon) |
| for _ in range(config.depth_L) |
| ]) |
| self.norm = nn.LayerNorm(config.hidden_size) |
| self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False) |
|
|
| self.post_init() |
|
|
| def forward(self, input_ids, labels=None): |
| x = self.emb(input_ids) + self.pos_emb[:, :input_ids.size(1), :] |
| for layer in self.layers: |
| x = layer(x) |
| x = self.norm(x) |
| logits = self.lm_head(x) |
|
|
| loss = None |
| if labels is not None: |
| shift_logits = logits[..., :-1, :].contiguous() |
| shift_labels = labels[..., 1:].contiguous() |
| loss_fct = nn.CrossEntropyLoss() |
| loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1)) |
|
|
| return {"loss": loss, "logits": logits} |
|
|
|
|
| |
| |
| |
| from transformers import AutoConfig, AutoModel |
|
|
| AutoConfig.register("trm", TRMConfig) |
| AutoModel.register(TRMConfig, TRM) |
|
|