| | """Simple, minimal implementation of Mamba in one file of PyTorch.
|
| |
|
| | Suggest reading the following before/while reading the code:
|
| | [1] Mamba: Linear-Time Sequence Modeling with Selective State Spaces (Albert Gu and Tri Dao)
|
| | https://arxiv.org/abs/2312.00752
|
| | [2] The Annotated S4 (Sasha Rush and Sidd Karamcheti)
|
| | https://srush.github.io/annotated-s4
|
| |
|
| | Glossary:
|
| | b: batch size (`B` in Mamba paper [1] Algorithm 2)
|
| | l: sequence length (`L` in [1] Algorithm 2)
|
| | d or d_model: hidden dim
|
| | n or d_state: latent state dim (`N` in [1] Algorithm 2)
|
| | expand: expansion factor (`E` in [1] Section 3.4)
|
| | d_in or d_inner: d * expand (`D` in [1] Algorithm 2)
|
| | A, B, C, D: state space parameters (See any state space representation formula)
|
| | (B, C are input-dependent (aka selective, a key innovation in Mamba); A, D are not)
|
| | Δ or delta: input-dependent step size
|
| | dt_rank: rank of Δ (See [1] Section 3.6 "Parameterization of ∆")
|
| |
|
| | """
|
| | from __future__ import annotations
|
| | import math
|
| | import json
|
| | import torch
|
| | import torch.nn as nn
|
| | import torch.nn.functional as F
|
| | from dataclasses import dataclass
|
| | from typing import Union
|
| | from einops import rearrange, repeat, einsum
|
| | from parallel_scan import pscan
|
| |
|
| |
|
@dataclass
class ModelArgs:
    """Hyperparameters describing a Mamba model.

    ``d_model``, ``n_layer`` and ``vocab_size`` are required; every other
    field has a default. After construction, ``__post_init__``:

    * sets ``d_inner`` to ``int(expand * d_model)``,
    * replaces a ``dt_rank`` of ``'auto'`` with ``ceil(d_model / 16)``,
    * rounds ``vocab_size`` up to the next multiple of
      ``pad_vocab_size_multiple`` (left alone when already a multiple).
    """

    d_model: int  # hidden dim
    n_layer: int
    vocab_size: int  # may be increased by __post_init__ (padding)
    d_state: int = 16  # latent state dim
    expand: int = 2  # expansion factor
    dt_rank: Union[int, str] = 'auto'  # rank of delta, or 'auto' to derive it
    d_conv: int = 4
    pad_vocab_size_multiple: int = 8
    conv_bias: bool = True
    bias: bool = False

    def __post_init__(self):
        """Derive ``d_inner``, resolve ``dt_rank`` and pad ``vocab_size``."""
        self.d_inner = int(self.expand * self.d_model)

        if self.dt_rank == 'auto':
            self.dt_rank = math.ceil(self.d_model / 16)

        # Distance past the previous multiple; zero means no padding needed.
        remainder = self.vocab_size % self.pad_vocab_size_multiple
        if remainder != 0:
            self.vocab_size += self.pad_vocab_size_multiple - remainder
|
| |
|
| |
|
class Mamba(nn.Module):
    """Full Mamba language model.

    Token embedding -> ``n_layer`` residual Mamba blocks -> final RMSNorm ->
    LM head whose weight is shared with the embedding.
    """

    def __init__(self, args: ModelArgs):
        """Full Mamba model.

        Args:
            args: model hyperparameters; reads ``vocab_size``, ``d_model`` and
                ``n_layer`` here and is passed on to every residual block.

        """
        super().__init__()
        self.args = args

        self.embedding = nn.Embedding(args.vocab_size, args.d_model)
        self.layers = nn.ModuleList([ResidualBlock(args) for _ in range(args.n_layer)])
        self.norm_f = RMSNorm(args.d_model)

        self.lm_head = nn.Linear(args.d_model, args.vocab_size, bias=False)
        # Weight tying: the output projection reuses the embedding matrix, so
        # both attributes refer to the same Parameter.
        self.lm_head.weight = self.embedding.weight

    def forward(self, input_ids):
        """
        Args:
            input_ids (long tensor): shape (b, l)    (See Glossary at top for definitions of b, l, d_in, n...)

        Returns:
            logits: shape (b, l, vocab_size)

        Official Implementation:
            class MambaLMHeadModel, https://github.com/state-spaces/mamba/blob/main/mamba_ssm/models/mixer_seq_simple.py#L173

        """
        x = self.embedding(input_ids)

        for layer in self.layers:
            x = layer(x)

        x = self.norm_f(x)
        logits = self.lm_head(x)

        return logits

    @staticmethod
    def _load_config_hf(model_name: str) -> dict:
        """Resolve a model's HuggingFace config file and parse it as JSON.

        Args:
            model_name: HuggingFace model identifier handed to ``cached_file``.

        Returns:
            The parsed config as a dict.

        """
        from transformers.utils import CONFIG_NAME
        from transformers.utils.hub import cached_file

        # NOTE(review): with _raise_exceptions_for_missing_entries=False,
        # cached_file presumably returns None when the file is missing, which
        # would surface as an error from open() -- confirm against the
        # installed transformers version.
        resolved_archive_file = cached_file(model_name, CONFIG_NAME,
                                            _raise_exceptions_for_missing_entries=False)
        # Context manager so the file handle is closed deterministically.
        with open(resolved_archive_file) as config_file:
            return json.load(config_file)

    @staticmethod
    def from_config(pretrained_model_name: str):
        """Build a model from a HuggingFace config only; no weights are loaded.

        Args:
            pretrained_model_name: HuggingFace model identifier whose config
                supplies ``d_model``, ``n_layer`` and ``vocab_size``.

        Returns:
            model: Mamba model built from that config, with the parameter
                values produced by the constructor.

        """
        config_data = Mamba._load_config_hf(pretrained_model_name)
        args = ModelArgs(
            d_model=config_data['d_model'],
            n_layer=config_data['n_layer'],
            vocab_size=config_data['vocab_size']
        )
        model = Mamba(args)
        return model

    @staticmethod
    def from_pretrained(pretrained_model_name: str):
        """Load pretrained weights from HuggingFace into model.

        Args:
            pretrained_model_name: One of
                * 'state-spaces/mamba-2.8b-slimpj'
                * 'state-spaces/mamba-2.8b'
                * 'state-spaces/mamba-1.4b'
                * 'state-spaces/mamba-790m'
                * 'state-spaces/mamba-370m'
                * 'state-spaces/mamba-130m'

        Returns:
            model: Mamba model with weights loaded

        """
        from transformers.utils import WEIGHTS_NAME
        from transformers.utils.hub import cached_file

        config_data = Mamba._load_config_hf(pretrained_model_name)
        args = ModelArgs(
            d_model=config_data['d_model'],
            n_layer=config_data['n_layer'],
            vocab_size=config_data['vocab_size']
        )
        model = Mamba(args)

        resolved_archive_file = cached_file(pretrained_model_name, WEIGHTS_NAME,
                                            _raise_exceptions_for_missing_entries=False)
        # weights_only=True restricts unpickling to tensor data; the
        # checkpoint is mapped onto the CPU.
        state_dict = torch.load(resolved_archive_file, weights_only=True,
                                map_location='cpu', mmap=True)

        # Drop the 'backbone.' segment from checkpoint keys so they match this
        # module's attribute names.
        new_state_dict = {
            key.replace('backbone.', ''): value
            for key, value in state_dict.items()
        }
        model.load_state_dict(new_state_dict)

        return model
|
| |
|
| |
|
class ResidualBlock(nn.Module):
    """Pre-norm residual wrapper around one Mamba block: ``x + mixer(norm(x))``."""

    def __init__(self, args: ModelArgs):
        """Simple block wrapping Mamba block with normalization and residual connection."""
        super().__init__()
        self.args = args
        self.mixer = MambaBlock(args)
        self.norm = RMSNorm(args.d_model)

    def forward(self, x):
        """Normalize the input, run it through the mixer, and add the skip connection.

        Args:
            x: shape (b, l, d)    (See Glossary at top for definitions of b, l, d_in, n...)

        Returns:
            output: shape (b, l, d)

        Official Implementation:
            Block.forward(), https://github.com/state-spaces/mamba/blob/main/mamba_ssm/modules/mamba_simple.py#L297

            Note: the official repo chains residual blocks that look like
                [Add -> Norm -> Mamba] -> [Add -> Norm -> Mamba] -> [Add -> Norm -> Mamba] -> ...
            where the first Add is a no-op. This is purely for performance reasons as this
            allows them to fuse the Add->Norm.

            We instead implement our blocks as the more familiar, simpler, and numerically equivalent
                [Norm -> Mamba -> Add] -> [Norm -> Mamba -> Add] -> [Norm -> Mamba -> Add] -> ....

        """
        normed = self.norm(x)
        mixed = self.mixer(normed)
        # Skip connection uses the un-normalized input.
        return mixed + x
|
| |
|
| |
|
class MambaBlock(nn.Module):
    """A single Mamba block: gated, convolved, selective state space layer."""

    def __init__(self, args: ModelArgs):
        """A single Mamba block, as described in Figure 3 in Section 3.4 in the Mamba paper [1].

        Args:
            args: hyperparameters; reads ``d_model``, ``d_inner``, ``d_conv``,
                ``d_state``, ``dt_rank``, ``conv_bias`` and ``bias``.

        """
        super().__init__()
        self.args = args

        # One projection yields both branches (SSM input and gate), hence 2 * d_inner.
        self.in_proj = nn.Linear(args.d_model, args.d_inner * 2, bias=args.bias)

        # Depthwise convolution (groups == channels). It pads d_conv - 1 steps
        # on each side; forward() keeps only the first l outputs, so position t
        # sees inputs t - (d_conv - 1) .. t and nothing from the future.
        self.conv1d = nn.Conv1d(
            in_channels=args.d_inner,
            out_channels=args.d_inner,
            bias=args.conv_bias,
            kernel_size=args.d_conv,
            groups=args.d_inner,
            padding=args.d_conv - 1,
        )

        # x_proj maps the branch input to the input-dependent delta (dt_rank), B (n) and C (n).
        self.x_proj = nn.Linear(args.d_inner, args.dt_rank + args.d_state * 2, bias=False)

        # dt_proj lifts delta from dt_rank up to d_in.
        self.dt_proj = nn.Linear(args.dt_rank, args.d_inner, bias=True)

        # A is stored as log(A) with rows 1..d_state repeated for every channel;
        # ssm() recovers A = -exp(A_log), which is therefore always negative.
        A = repeat(torch.arange(1, args.d_state + 1), 'n -> d n', d=args.d_inner)
        self.A_log = nn.Parameter(torch.log(A))
        self.D = nn.Parameter(torch.ones(args.d_inner))
        self.out_proj = nn.Linear(args.d_inner, args.d_model, bias=args.bias)

    def forward(self, x):
        """Mamba block forward. This looks the same as Figure 3 in Section 3.4 in the Mamba paper [1].

        Args:
            x: shape (b, l, d)    (See Glossary at top for definitions of b, l, d_in, n...)

        Returns:
            output: shape (b, l, d)

        Official Implementation:
            class Mamba, https://github.com/state-spaces/mamba/blob/main/mamba_ssm/modules/mamba_simple.py#L119
            mamba_inner_ref(), https://github.com/state-spaces/mamba/blob/main/mamba_ssm/ops/selective_scan_interface.py#L311

        """
        # Only the sequence length is needed; unpacking still requires a 3-D input.
        (_, l, _) = x.shape

        x_and_res = self.in_proj(x)  # shape (b, l, 2 * d_in)
        (x, res) = x_and_res.split(split_size=[self.args.d_inner, self.args.d_inner], dim=-1)

        # Conv1d convolves over the last axis, so move channels before length.
        x = rearrange(x, 'b l d_in -> b d_in l')
        x = self.conv1d(x)[:, :, :l]  # drop the trailing outputs created by padding
        x = rearrange(x, 'b d_in l -> b l d_in')

        x = F.silu(x)

        y = self.ssm(x)

        # Gate the SSM output with the second branch.
        y = y * F.silu(res)

        output = self.out_proj(y)

        return output

    def ssm(self, x):
        """Runs the SSM. See:
            - Algorithm 2 in Section 3.2 in the Mamba paper [1]
            - run_SSM(A, B, C, u) in The Annotated S4 [2]

        Args:
            x: shape (b, l, d_in)    (See Glossary at top for definitions of b, l, d_in, n...)

        Returns:
            output: shape (b, l, d_in)

        Official Implementation:
            mamba_inner_ref(), https://github.com/state-spaces/mamba/blob/main/mamba_ssm/ops/selective_scan_interface.py#L311

        """
        n = self.A_log.shape[-1]

        # Compute delta, A, B, C, D, the state space parameters.
        #     A, D are input independent (learned parameters only)
        #     delta, B, C are input-dependent (computed from x below)
        A = -torch.exp(self.A_log.float())  # shape (d_in, n)
        D = self.D.float()

        x_dbl = self.x_proj(x)  # (b, l, dt_rank + 2*n)

        (delta, B, C) = x_dbl.split(split_size=[self.args.dt_rank, n, n], dim=-1)  # delta: (b, l, dt_rank). B, C: (b, l, n)
        # softplus keeps the step size positive.
        delta = F.softplus(self.dt_proj(delta))  # (b, l, d_in)

        y = self.selective_scan(x, delta, A, B, C, D)

        return y

    def selective_scan(self, x, delta, A, B, C, D):
        """Does selective scan algorithm. See:
            - Section 2 State Space Models in the Mamba paper [1]
            - Algorithm 2 in Section 3.2 in the Mamba paper [1]
            - run_SSM(A, B, C, u) in The Annotated S4 [2]

        This is the classic discrete state space formula, with hidden state h and input x:
            h(t + 1) = Ah(t) + Bx(t)
            y(t)     = Ch(t) + Dx(t)
        except B and C (and the step size delta, which is used for discretization) are dependent on the input x(t).

        Args:
            x: shape (b, l, d_in)    (See Glossary at top for definitions of b, l, d_in, n...)
            delta: shape (b, l, d_in)
            A: shape (d_in, n)
            B: shape (b, l, n)
            C: shape (b, l, n)
            D: shape (d_in,)

        Returns:
            output: shape (b, l, d_in)

        Official Implementation:
            selective_scan_ref(), https://github.com/state-spaces/mamba/blob/main/mamba_ssm/ops/selective_scan_interface.py#L86
            Note: some parts of `selective_scan_ref` were refactored out, so the functionality doesn't match exactly.

        """
        # Discretize the continuous parameters with the step size delta:
        #     A_bar = exp(delta * A), B_bar = delta * B
        deltaA = torch.exp(delta.unsqueeze(-1) * A)  # (b, l, d_in, n)
        deltaB = delta.unsqueeze(-1) * B.unsqueeze(2)  # (b, l, d_in, n)

        # Input contribution to the state at every step.
        BX = deltaB * (x.unsqueeze(-1))  # (b, l, d_in, n)

        # NOTE(review): pscan is presumably a parallel scan evaluating
        # h_t = deltaA_t * h_{t-1} + BX_t along l and returning every h_t with
        # shape (b, l, d_in, n) -- confirm against parallel_scan.pscan.
        hs = pscan(deltaA, BX)

        # Contract the state axis against C: (b, l, d_in, n) @ (b, l, n, 1) -> (b, l, d_in, 1).
        y = (hs @ C.unsqueeze(-1)).squeeze(3)  # (b, l, d_in)

        # Direct feed-through term D * x.
        y = y + D * x

        return y
|
| |
|
| |
|
class RMSNorm(nn.Module):
    """Root-mean-square normalization over the last axis with a learned per-feature scale."""

    def __init__(self,
                 d_model: int,
                 eps: float = 1e-5):
        """
        Args:
            d_model: size of the last axis; one scale weight per feature.
            eps: constant added to the mean square before the reciprocal square root.

        """
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(d_model))

    def forward(self, x):
        """Scale ``x`` by the reciprocal RMS of its last axis, then by ``weight``.

        Args:
            x: tensor whose last axis has ``d_model`` entries.

        Returns:
            output: tensor of the same shape as ``x``.

        """
        mean_square = x.pow(2).mean(-1, keepdim=True)
        inv_rms = torch.rsqrt(mean_square + self.eps)
        # Multiply in the same left-to-right order: (x * inv_rms) * weight.
        return x * inv_rms * self.weight