| | import torch |
| | import torch.nn as nn |
| | import torch.nn.functional as F |
| | from typing import Optional, Tuple |
| | import math |
| | from transformers import PretrainedConfig, PreTrainedModel |
| |
|
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are all projected from the same input tensor.

    Args:
        d_model: Size of the input and output feature dimension.
        num_heads: Number of attention heads; must divide ``d_model`` evenly.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``num_heads`` is not positive or does not divide
            ``d_model``.
    """

    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        super().__init__()
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if num_heads <= 0 or d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive "
                f"num_heads ({num_heads})"
            )

        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        self.q_proj = nn.Linear(d_model, d_model)
        self.k_proj = nn.Linear(d_model, d_model)
        self.v_proj = nn.Linear(d_model, d_model)
        self.o_proj = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``.
            mask: Optional tensor broadcastable against the score tensor of
                shape ``(batch, num_heads, seq_len, seq_len)``. Entries equal
                to ``0`` mark key positions that must not be attended to.

        Returns:
            Tensor of shape ``(batch, seq_len, d_model)``.
        """
        batch_size, seq_len, d_model = x.shape

        # Project, then split the feature dimension into heads:
        # (batch, seq, d_model) -> (batch, heads, seq, head_dim).
        split_shape = (batch_size, seq_len, self.num_heads, self.head_dim)
        q = self.q_proj(x).reshape(split_shape).transpose(1, 2)
        k = self.k_proj(x).reshape(split_shape).transpose(1, 2)
        v = self.v_proj(x).reshape(split_shape).transpose(1, 2)

        # Scaled dot-product scores: (batch, heads, seq, seq).
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)

        blocked = None
        if mask is not None:
            blocked = mask == 0
            scores = scores.masked_fill(blocked, float('-inf'))

        attn_weights = F.softmax(scores, dim=-1)

        if blocked is not None:
            # A query whose keys are all masked yields a row of -inf, and
            # softmax of that row is NaN. Give such rows zero weight so the
            # NaN cannot leak into later matmuls.
            fully_blocked = blocked.all(dim=-1, keepdim=True)
            attn_weights = attn_weights.masked_fill(fully_blocked, 0.0)

        attn_weights = self.dropout(attn_weights)

        # Weighted sum of values, then merge the heads back together.
        out = torch.matmul(attn_weights, v)
        out = out.transpose(1, 2)
        out = out.reshape(batch_size, seq_len, d_model)

        return self.o_proj(out)
| |
|
class PreludeBlock(nn.Module):
    """Embed token ids and apply one attention + feed-forward block.

    Token embeddings are summed with a learned positional table (initialised
    to zeros). The result passes through self-attention and a feed-forward
    network, each preceded by LayerNorm and wrapped in a residual connection.

    Args:
        vocab_size: Number of rows in the token embedding table.
        d_model: Embedding / hidden dimension.
        num_heads: Number of attention heads.
        dropout: Dropout probability for attention and feed-forward.
        max_seq_len: Number of positions in the learned positional table.
            Defaults to 1024, the value that was previously hard-coded.
    """

    def __init__(self, vocab_size: int, d_model: int, num_heads: int,
                 dropout: float = 0.1, max_seq_len: int = 1024):
        super().__init__()
        self.max_seq_len = max_seq_len
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = nn.Parameter(torch.zeros(1, max_seq_len, d_model))
        self.attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, 4 * d_model),
            nn.GELU(),
            nn.Linear(4 * d_model, d_model),
            nn.Dropout(dropout)
        )

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Embed ``x`` and run the block.

        Args:
            x: Integer token ids of shape ``(batch, seq_len)``.
            mask: Optional attention mask forwarded to the attention layer.

        Returns:
            Hidden states of shape ``(batch, seq_len, d_model)``.

        Raises:
            ValueError: If ``seq_len`` exceeds the positional table length.
        """
        seq_len = x.size(1)
        max_positions = self.pos_encoding.size(1)
        if seq_len > max_positions:
            # Without this check the addition below fails with an opaque
            # broadcasting error.
            raise ValueError(
                f"Sequence length {seq_len} exceeds the maximum supported "
                f"length of {max_positions}"
            )

        # Token embedding plus the learned positional table, cropped to length.
        x = self.token_embedding(x) + self.pos_encoding[:, :seq_len, :]

        # Self-attention sub-layer with residual connection.
        attended = self.attention(self.norm1(x), mask)
        x = x + attended

        # Feed-forward sub-layer with residual connection.
        x = x + self.feed_forward(self.norm2(x))
        return x
| |
|
class RecurrentBlock(nn.Module):
    """Attention + feed-forward block that is conditioned on a carried state.

    The incoming state is linearly projected and added to the input before the
    attention and feed-forward sub-layers (each preceded by LayerNorm and
    wrapped in a residual connection) are applied.

    Args:
        d_model: Hidden dimension.
        num_heads: Number of attention heads.
        dropout: Dropout probability for attention and feed-forward.
    """

    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        super().__init__()
        inner_dim = 4 * d_model

        self.attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, inner_dim),
            nn.GELU(),
            nn.Linear(inner_dim, d_model),
            nn.Dropout(dropout),
        )

        # Projection applied to the carried state before it is mixed in.
        self.state_proj = nn.Linear(d_model, d_model)

    def forward(
        self,
        x: torch.Tensor,
        recurrent_state: torch.Tensor,
        mask: Optional[torch.Tensor] = None,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Run one recurrent step.

        Args:
            x: Current hidden states.
            recurrent_state: State carried over from the previous step; it is
                added to ``x`` after projection, so the shapes must be
                compatible for addition.
            mask: Optional attention mask forwarded to the attention layer.

        Returns:
            A pair ``(output, new_state)``; both entries are the same tensor.
        """
        # Inject the projected state into the input.
        mixed = x + self.state_proj(recurrent_state)

        # Self-attention sub-layer with residual connection.
        mixed = mixed + self.attention(self.norm1(mixed), mask)

        # Feed-forward sub-layer with residual connection.
        result = mixed + self.feed_forward(self.norm2(mixed))

        # The block output doubles as the state for the next step.
        return result, result
| |
|
class CodaBlock(nn.Module):
    """Output head: LayerNorm followed by a linear projection to the vocabulary.

    Args:
        d_model: Size of the incoming feature dimension.
        vocab_size: Size of the output (logit) dimension.
    """

    def __init__(self, d_model: int, vocab_size: int):
        super().__init__()
        self.norm = nn.LayerNorm(d_model)
        self.output_proj = nn.Linear(d_model, vocab_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Normalise ``x`` over its last dimension and project it to logits."""
        return self.output_proj(self.norm(x))
| |
|
class LatentRecurrentDepthLM(nn.Module):
    """Language model built from a prelude, a repeated recurrent block and a coda.

    The prelude embeds the tokens, the single recurrent block is applied
    ``num_iterations`` times with shared weights, and the coda maps the final
    hidden states to vocabulary logits.

    Args:
        vocab_size: Vocabulary size for the embedding and output projection.
        d_model: Hidden dimension.
        num_heads: Number of attention heads.
        dropout: Dropout probability passed to the prelude and recurrent block.
    """

    def __init__(self, vocab_size: int, d_model: int, num_heads: int, dropout: float = 0.1):
        super().__init__()
        self.prelude = PreludeBlock(vocab_size, d_model, num_heads, dropout)
        self.recurrent = RecurrentBlock(d_model, num_heads, dropout)
        self.coda = CodaBlock(d_model, vocab_size)

    def forward(
        self,
        x: torch.Tensor,
        num_iterations: int,
        mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        """Compute logits for the token ids ``x``.

        Args:
            x: Token ids passed to the prelude.
            num_iterations: How many times the recurrent block is applied.
            mask: Optional attention mask used by the prelude and by every
                recurrent step.

        Returns:
            The coda output (logits) for the final hidden states.
        """
        latent = self.prelude(x, mask)

        # The carried state starts at zero and is replaced on every step.
        carry = torch.zeros_like(latent)
        for _step in range(num_iterations):
            latent, carry = self.recurrent(latent, carry, mask)

        return self.coda(latent)
| |
|
| |
|
| |
|
| | |
class LatentRecurrentDepthConfig(PretrainedConfig):
    """Configuration holding the hyper-parameters of the latent recurrent model.

    Args:
        vocab_size: Vocabulary size.
        d_model: Hidden dimension.
        num_heads: Number of attention heads.
        dropout: Dropout probability.
        **kwargs: Forwarded unchanged to ``PretrainedConfig.__init__``.
    """

    model_type = "latent_recurrent_depth"

    def __init__(
        self,
        vocab_size=50257,
        d_model=768,
        num_heads=12,
        dropout=0.1,
        **kwargs,
    ):
        # The base class consumes the generic options first; the
        # model-specific fields are stored afterwards.
        super().__init__(**kwargs)
        self.vocab_size, self.d_model = vocab_size, d_model
        self.num_heads, self.dropout = num_heads, dropout
| |
|
| |
|
| | |
class LatentRecurrentDepthModel(PreTrainedModel):
    """``PreTrainedModel`` wrapper around ``LatentRecurrentDepthLM``.

    The wrapped network is stored as ``self.latent_model`` and is built from
    the ``vocab_size``, ``d_model``, ``num_heads`` and ``dropout`` fields of
    the config.
    """

    config_class = LatentRecurrentDepthConfig
    base_model_prefix = "latent_recurrent_depth"

    def __init__(self, config: LatentRecurrentDepthConfig):
        super().__init__(config)
        self.latent_model = LatentRecurrentDepthLM(
            config.vocab_size, config.d_model, config.num_heads, config.dropout
        )
        self.init_weights()

    def forward(self, input_ids: torch.Tensor, num_iterations: int,
                mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Return the logits of the wrapped model for ``input_ids``."""
        return self.latent_model(input_ids, num_iterations, mask)

    def generate(
        self,
        input_ids: torch.Tensor,
        max_length: int = 20,
        num_iterations: int = 3,
        temperature: float = 1.0,
        top_k: Optional[int] = 50,
    ) -> torch.Tensor:
        """
        Generate a sequence of tokens given input_ids.

        Sampling stops early once every sequence in the batch has produced an
        end-of-sequence token (as given by ``config.eos_token_id``, when set).
        Sequences that finish before the others are padded with
        ``config.pad_token_id`` if it is set, otherwise with the EOS token.

        Note: this method calls ``self.eval()`` and leaves the module in
        evaluation mode afterwards.

        Args:
            input_ids: torch.Tensor of shape (batch, seq_length) containing the prompt.
            max_length: The maximum number of tokens to generate.
            num_iterations: The number of recurrent iterations to use in each forward pass.
            temperature: Temperature for scaling logits; must be positive.
            top_k: If set, only sample from the top k tokens. Values larger
                than the vocabulary size are clamped to it.

        Returns:
            generated: torch.Tensor containing the prompt followed by the
                generated tokens.

        Raises:
            ValueError: If ``temperature`` is not positive or ``top_k`` is
                smaller than 1.
        """
        if temperature <= 0:
            raise ValueError(f"temperature must be positive, got {temperature}")
        if top_k is not None and top_k < 1:
            raise ValueError(f"top_k must be at least 1 or None, got {top_k}")

        # Resolve the EOS id(s) once. A missing or None value disables early
        # stopping.
        eos_ids = None
        fill_token = None
        eos_token_id = getattr(self.config, "eos_token_id", None)
        if eos_token_id is not None:
            eos_ids = torch.as_tensor(eos_token_id, device=input_ids.device).reshape(1, -1)
            if eos_ids.numel() == 0:
                eos_ids = None
            else:
                pad_token_id = getattr(self.config, "pad_token_id", None)
                fill_token = pad_token_id if pad_token_id is not None else int(eos_ids[0, 0])

        # The prelude only has this many learned positions, so longer contexts
        # are cropped to their most recent tokens.
        max_context = self.latent_model.prelude.pos_encoding.size(1)

        generated = input_ids.clone()
        finished = torch.zeros(generated.size(0), dtype=torch.bool, device=generated.device)

        self.eval()
        with torch.no_grad():
            for _ in range(max_length):
                context = generated[:, -max_context:]
                logits = self.forward(context, num_iterations=num_iterations)

                # Only the logits at the last position predict the next token.
                next_token_logits = logits[:, -1, :] / temperature
                if top_k is not None:
                    k = min(top_k, next_token_logits.size(-1))
                    top_k_logits, top_k_indices = torch.topk(next_token_logits, k)
                    probabilities = F.softmax(top_k_logits, dim=-1)
                    choice = torch.multinomial(probabilities, num_samples=1)
                    # Map the sampled rank back to a vocabulary id.
                    next_token = top_k_indices.gather(-1, choice)
                else:
                    probabilities = F.softmax(next_token_logits, dim=-1)
                    next_token = torch.multinomial(probabilities, num_samples=1)

                if eos_ids is not None:
                    # Sequences that already ended keep receiving filler.
                    next_token = next_token.masked_fill(finished.unsqueeze(1), fill_token)
                    finished = finished | (next_token == eos_ids).any(dim=-1)

                generated = torch.cat([generated, next_token], dim=1)

                # Per-sequence tracking replaces ``next_token.item()``, which
                # fails for batch sizes above one.
                if eos_ids is not None and bool(finished.all()):
                    break
        return generated
| |
|