| import torch |
| import torch.nn.functional as F |
| from typing import Tuple, Optional |
| from torch import nn |
|
|
|
|
| |
| class SwiGLU(nn.Module): |
| """ |
| A Module that encapsulates the SwiGLU activation function, which combines |
| linear transformations with the SiLU (Sigmoid Linear Unit) activation function. |
| |
| Args: |
| in_features (int): Number of features in the input. |
| hidden_features (int): Number of hidden features. |
| out_features (Optional[int], optional): Number of features in the output. |
| If None, it defaults to the number of input features. |
| bias (bool, optional): If True, includes a bias term in the linear layers. |
| Defaults to True. |
| _pack_weights (bool, optional): If True, uses a single linear layer for w1 and w2. |
| Defaults to True. |
| """ |
|
|
| def __init__( |
| self, |
| in_features: int, |
| hidden_features: int, |
| out_features: Optional[int] = None, |
| bias: bool = True, |
| *, |
| _pack_weights: bool = True, |
| ) -> None: |
| super().__init__() |
| out_features = out_features or in_features |
| self._pack_weights = _pack_weights |
| self.hidden_features = hidden_features |
| self.in_features = in_features |
| self.out_features = out_features |
|
|
| if _pack_weights: |
| self.w12 = nn.Linear(in_features, 2 * hidden_features, bias=bias) |
| self.w1 = None |
| self.w2 = None |
| else: |
| self.w12 = None |
| self.w1 = nn.Linear(in_features, hidden_features, bias=bias) |
| self.w2 = nn.Linear(in_features, hidden_features, bias=bias) |
| self.w3 = nn.Linear(hidden_features, out_features, bias=bias) |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| """ |
| Applies the SwiGLU activation function to the input tensor. |
| |
| Args: |
| x (torch.Tensor): Input tensor of shape (..., in_features). |
| |
| Returns: |
| torch.Tensor: Output tensor of shape (..., out_features). |
| """ |
| if self._pack_weights and self.w12 is not None: |
| x12 = self.w12(x) |
| x1, x2 = x12.chunk(2, dim=-1) |
| else: |
| assert self.w1 is not None and self.w2 is not None, "Weights w1 and w2 must be initialized." |
| x1 = self.w1(x) |
| x2 = self.w2(x) |
| hidden = F.silu(x1) * x2 |
| return self.w3(hidden) |
|
|
| class RMSNorm(nn.Module): |
| def __init__(self, dim: int, eps: float = 1e-6): |
| """ |
| Initialize the RMSNorm normalization layer. |
| Args: |
| dim (int): The dimension of the input tensor. |
| eps (float, optional): A small value added to the denominator for numerical stability. Default is 1e-6. |
| Attributes: |
| eps (float): A small value added to the denominator for numerical stability. |
| weight (nn.Parameter): Learnable scaling parameter. |
| """ |
| super().__init__() |
| self.eps = eps |
| self.weight = nn.Parameter(torch.ones(dim)) |
|
|
| def _norm(self, x): |
| return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps) |
|
|
| def forward(self, x): |
| """ |
| Forward pass through the RMSNorm layer. |
| Args: |
| x (torch.Tensor): The input tensor. |
| Returns: |
| torch.Tensor: The output tensor after applying RMSNorm. |
| """ |
| output = self._norm(x.float()).type_as(x) |
| return output * self.weight |
|
|
| def precompute_freqs_cis(dim: int, end: int, theta: float = 10000.0): |
| """ |
| Precompute the frequency tensor for complex exponentials (cis) with given dimensions. |
| |
| This function calculates a frequency tensor with complex exponentials using the given dimension 'dim' |
| and the end index 'end'. The 'theta' parameter scales the frequencies. |
| The returned tensor contains complex values in complex64 data type. |
| |
| Args: |
| dim (int): Dimension of the frequency tensor. |
| end (int): End index for precomputing frequencies. |
| theta (float, optional): Scaling factor for frequency computation. Defaults to 10000.0. |
| |
| Returns: |
| torch.Tensor: Precomputed frequency tensor with complex exponentials. |
| """ |
|
|
| freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim)) |
| t = torch.arange(end, device=freqs.device) |
| freqs = torch.outer(t, freqs).float() |
| return torch.polar(torch.ones_like(freqs), freqs) |
|
|
|
|
| def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor): |
| """ |
| Reshape frequency tensor for broadcasting it with another tensor. |
| |
| This function reshapes the frequency tensor to have the same shape as the target tensor 'x' |
| for the purpose of broadcasting the frequency tensor during element-wise operations. |
| |
| Args: |
| freqs_cis (torch.Tensor): Frequency tensor to be reshaped. |
| x (torch.Tensor): Target tensor for broadcasting compatibility. |
| |
| Returns: |
| torch.Tensor: Reshaped frequency tensor. |
| |
| Raises: |
| AssertionError: If the frequency tensor doesn't match the expected shape. |
| AssertionError: If the target tensor 'x' doesn't have the expected number of dimensions. |
| """ |
|
|
| ndim = x.ndim |
| assert 0 <= 1 < ndim |
| assert freqs_cis.shape == (x.shape[1], x.shape[-1]) |
| shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)] |
| return freqs_cis.view(*shape) |
|
|
|
|
| def apply_rotary_emb( |
| xq: torch.Tensor, |
| xk: torch.Tensor, |
| freqs_cis: torch.Tensor, |
| ) -> Tuple[torch.Tensor, torch.Tensor]: |
| """ |
| Apply rotary embeddings to input tensors using the given frequency tensor. |
| |
| This function applies rotary embeddings to the given query 'xq' and key 'xk' tensors using the provided |
| frequency tensor 'freqs_cis'. The input tensors are reshaped as complex numbers, and the frequency tensor |
| is reshaped for broadcasting compatibility. The resulting tensors contain rotary embeddings and are |
| returned as real tensors. |
| |
| Args: |
| xq (torch.Tensor): Query tensor to apply rotary embeddings. |
| xk (torch.Tensor): Key tensor to apply rotary embeddings. |
| freqs_cis (torch.Tensor): Precomputed frequency tensor for complex exponentials. |
| |
| Returns: |
| Tuple[torch.Tensor, torch.Tensor]: Tuple of modified query tensor and key tensor with rotary embeddings. |
| """ |
| |
| xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2)) |
| xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2)) |
| freqs_cis = reshape_for_broadcast(freqs_cis, xq_) |
| xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(3) |
| xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(3) |
| return xq_out.type_as(xq), xk_out.type_as(xk) |
|
|