from typing import Callable, Optional, Union

import torch
import torch.nn as nn

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


class RotaryEmbedding(nn.Module):
    """Partial rotary position embedding with precomputed cos/sin tables.

    Only the first ``head_dim // 2`` features of each head (``rot_dim``) are
    rotated; the remaining features are left untouched.

    The rotated slice is read as two contiguous halves (``x[..., :d]`` and
    ``x[..., d:2*d]`` with ``d = rot_dim // 2``) and written back interleaved
    (``[r0, i0, r1, i1, ...]``).
    NOTE(review): the read layout (halves) and write layout (interleaved)
    differ; this is preserved as-is -- confirm it matches the reference
    implementation this class is meant to mirror.
    """

    def __init__(self, head_dim: int, max_seq_len: int, theta: float = 10000.0):
        """Precompute float32 cos/sin tables of shape ``[max_seq_len, rot_dim // 2]``.

        Args:
            head_dim: Per-head feature size; ``head_dim // 2`` features are rotated.
            max_seq_len: Number of positions to precompute.
            theta: Base of the geometric frequency progression.

        Raises:
            ValueError: If ``head_dim // 2`` is odd. The rotated slice must
                split into two equal halves, otherwise the frequency table
                would not match the halves and every rotation would fail.
        """
        super().__init__()
        self.rot_dim = head_dim // 2  # only half of head_dim is rotated
        if self.rot_dim % 2 != 0:
            raise ValueError(
                f"head_dim // 2 must be even to form rotation pairs, "
                f"got head_dim={head_dim} (rot_dim={self.rot_dim})"
            )

        # One inverse frequency per rotated pair: theta^(-2k / rot_dim).
        inv_freq = 1.0 / (
            theta ** (torch.arange(0, self.rot_dim, 2).float() / self.rot_dim)
        )
        positions = torch.arange(max_seq_len, dtype=torch.float32).unsqueeze(1)
        angles = positions * inv_freq.unsqueeze(0)  # [max_seq_len, rot_dim // 2]

        # Kept as a complex exponential so cached values stay identical to
        # the previous implementation.
        freqs_cis = torch.exp(1j * angles)

        # Non-persistent: the tables are derived data and stay out of state_dict.
        self.register_buffer('cos_cache', freqs_cis.real, persistent=False)
        self.register_buffer('sin_cache', freqs_cis.imag, persistent=False)

    def forward(
        self, x: torch.Tensor, position_ids: Optional[torch.Tensor] = None
    ) -> torch.Tensor:
        """Rotate ``x`` in place; same contract as ``apply(x, position_ids)``."""
        return self._rotate(x, position_ids)

    def apply(
        self,
        x: Union[torch.Tensor, Callable[[nn.Module], None]],
        position_ids: Optional[torch.Tensor] = None,
    ):
        """Rotate the first ``rot_dim`` features of ``x`` in place.

        This method shares its name with ``nn.Module.apply(fn)``. To keep
        ``parent_module.apply(fn)`` working when this module is a child, a
        call with a single callable argument is forwarded to
        ``nn.Module.apply``.

        Args:
            x: Tensor laid out as ``[..., heads, seq_len, head_dim]``
                (or a callable, see above). It is modified in place.
            position_ids: Indices into the cached tables, shaped ``[seq_len]``
                or ``[batch, seq_len]``. If omitted, positions
                ``0 .. seq_len - 1`` are used.

        Returns:
            The same tensor object ``x`` after rotation (or ``self`` when
            dispatching to ``nn.Module.apply``).
        """
        if position_ids is None and callable(x) and not isinstance(x, torch.Tensor):
            return super().apply(x)
        return self._rotate(x, position_ids)

    def _rotate(
        self, x: torch.Tensor, position_ids: Optional[torch.Tensor]
    ) -> torch.Tensor:
        """Apply the cached rotation to ``x[..., :rot_dim]`` in place."""
        half = self.rot_dim // 2

        if position_ids is None:
            # Default to consecutive positions starting at 0.
            seq_len = x.shape[-2]
            cos = self.cos_cache[:seq_len]
            sin = self.sin_cache[:seq_len]
        else:
            cos = self.cos_cache[position_ids]
            sin = self.sin_cache[position_ids]

        # Insert the heads axis just before seq_len so that both [seq_len, d]
        # and [batch, seq_len, d] tables broadcast against
        # [..., heads, seq_len, d].
        if cos.dim() >= 2:
            cos = cos.unsqueeze(-3)
            sin = sin.unsqueeze(-3)

        real = x[..., :half]
        imag = x[..., half:self.rot_dim]

        # Standard 2-D rotation of each (real, imag) pair; computed in the
        # promoted dtype of x and the float32 tables.
        out_real = real * cos - imag * sin
        out_imag = real * sin + imag * cos

        # stack(..., dim=-1) interleaves the pair components; reshape also
        # drops any leading broadcast axes of size 1. Assignment casts back
        # to x's dtype and keeps the update in place.
        rotated = torch.stack((out_real, out_imag), dim=-1)
        x[..., :self.rot_dim] = rotated.reshape(*x.shape[:-1], self.rot_dim)
        return x