"""PILLARS model definition and its building blocks.

The module defines a two-branch encoder (`PillarsModel`): a real-valued
branch built from FFT-mixing blocks (`FNetEncoder`) and a complex-valued
branch built from gated frequency-domain convolutions (`PRISMEncoder`).
Both branches read from a shared embedding root (`DynamicRoSE`); their
outputs are fused and refined by an x-transformers `Encoder`.
"""

import math
from typing import Optional

import torch
import torch.nn as nn
import torch.nn.functional as F
# Imported explicitly because PRISMEncoder calls
# torch.utils.checkpoint.checkpoint; do not rely on it being loaded as a
# side effect of another import.
import torch.utils.checkpoint
from transformers import PreTrainedModel

try:
    from .configuration_pillars import PillarsConfig
except ImportError:
    from configuration_pillars import PillarsConfig

try:
    from x_transformers import Encoder
except ImportError as err:
    raise ImportError("To use PILLARS, you must run: pip install x-transformers") from err


# --- UTILS ---
class ComplexDropout(nn.Module):
    """Dropout for complex tensors using one real mask per element.

    The same real-valued mask multiplies the real and imaginary parts, so
    an element is either kept (and rescaled by ``F.dropout``) or zeroed as
    a whole.
    """

    def __init__(self, p=0.5):
        super().__init__()
        self.p = p

    def forward(self, z: torch.Tensor) -> torch.Tensor:
        """Return ``z`` unchanged in eval mode or when ``p == 0``; otherwise
        return ``z`` multiplied by a dropout mask shaped like ``z.real``."""
        if not self.training or self.p == 0.0:
            return z
        mask = torch.ones_like(z.real)
        mask = F.dropout(mask, self.p, self.training, inplace=False)
        return z * mask


class RobustPhaseNorm(nn.Module):
    """RMS normalisation over the last dimension based on magnitudes.

    Each vector is divided by the root-mean-square of its element
    magnitudes (a real scalar per vector) and multiplied by a learned
    real per-feature ``scale``.
    """

    def __init__(self, d_model, eps=1e-5):
        super().__init__()
        self.scale = nn.Parameter(torch.ones(d_model))
        self.eps = eps

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Normalise ``x`` along its last dimension and apply ``scale``."""
        mag = torch.abs(x)
        # eps sits inside the square root so an all-zero vector is safe.
        rms = torch.sqrt(torch.mean(mag**2, dim=-1, keepdim=True) + self.eps)
        return (x / rms) * self.scale


class ModReLU(nn.Module):
    """Activation that applies ReLU to the magnitude plus a learned bias.

    The output magnitude is ``relu(|z| + b)`` and the direction is
    ``z / (|z| + 1e-6)``.
    """

    def __init__(self, features):
        super().__init__()
        self.b = nn.Parameter(torch.zeros(features))

    def forward(self, z: torch.Tensor) -> torch.Tensor:
        """Apply the magnitude-thresholding activation to ``z``."""
        mag = torch.abs(z)
        new_mag = F.relu(mag + self.b)
        # The small constant avoids division by zero for zero-magnitude inputs.
        phase = z / (mag + 1e-6)
        return new_mag * phase


class ComplexToRealBridge(nn.Module):
    """Project a complex tensor to a real one.

    Real and imaginary parts are concatenated on the last dimension
    (``2 * d_model`` features), linearly projected to ``d_model`` and
    layer-normalised.
    """

    def __init__(self, d_model):
        super().__init__()
        self.proj = nn.Linear(d_model * 2, d_model)
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x_complex: torch.Tensor) -> torch.Tensor:
        """Return the real-valued projection of ``x_complex``."""
        cat = torch.cat([x_complex.real, x_complex.imag], dim=-1)
        return self.norm(self.proj(cat))


# --- COMPONENTS ---
class DynamicRoSE(nn.Module):
    """Token embedding that also yields a rotated complex representation.

    Args:
        num_embeddings: Size of the embedding table.
        embedding_dim: Width of the real embedding and of the complex output.
        max_period: Base of the geometric frequency schedule used for the
            position-dependent rotation.
    """

    def __init__(self, num_embeddings, embedding_dim, max_period=10000.0):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.raw_embedding = nn.Embedding(num_embeddings, embedding_dim)
        self.adapter = nn.Linear(embedding_dim, embedding_dim * 2)
        freqs = torch.exp(
            torch.arange(0, embedding_dim, dtype=torch.float32)
            * -(math.log(max_period) / embedding_dim)
        )
        self.register_buffer('freqs', freqs)
        self.rotation_predictor = nn.Linear(embedding_dim, embedding_dim * 2)

    def forward(self, input_ids: torch.Tensor):
        """Embed ``input_ids`` and build the rotated complex embedding.

        Args:
            input_ids: Integer token ids of shape ``(batch, length)``.

        Returns:
            A tuple ``(z_final, real_base)``: the complex embedding after
            the position-based and the predicted rotations, and the plain
            real embedding, both of shape ``(batch, length, embedding_dim)``.
        """
        real_base = self.raw_embedding(input_ids)
        _, L, D = real_base.shape

        # First half of the adapter output is the real part, second half
        # the imaginary part.
        complex_params = self.adapter(real_base)
        z_t = torch.complex(complex_params[..., :D], complex_params[..., D:])

        # Content-dependent rotation: normalise the predicted (x, y) pair
        # to (approximately) unit length.
        rot_raw = self.rotation_predictor(real_base)
        rot_x, rot_y = rot_raw.chunk(2, dim=-1)
        rot_mag = torch.sqrt(rot_x**2 + rot_y**2 + 1e-6)
        dynamic_rot = torch.complex(rot_x / rot_mag, rot_y / rot_mag)

        # Position-dependent rotation: unit phasors with angle
        # position * frequency.
        pos = torch.arange(L, device=input_ids.device).float()
        static_angles = torch.outer(pos, self.freqs)
        static_rot = torch.polar(torch.ones_like(static_angles), static_angles)

        z_final = z_t * static_rot.unsqueeze(0) * dynamic_rot
        return z_final, real_base


class HyenaNeuralFilter(nn.Module):
    """MLP that generates a complex filter of a requested length.

    Args:
        d_model: Number of channels of the generated filter.
        max_len: Accepted for interface compatibility; not used here.
        hidden_dim: Width of the sinusoidal features and hidden layers.
    """

    def __init__(self, d_model, max_len=1024, hidden_dim=64):
        super().__init__()
        self.d_model = d_model
        freqs = torch.exp(
            torch.arange(0, hidden_dim, 2, dtype=torch.float32)
            * -(math.log(10000.0) / hidden_dim)
        )
        self.register_buffer("freqs", freqs)
        self.mlp = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.SiLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.SiLU(),
            nn.Linear(hidden_dim, d_model * 2),
        )

    def forward(self, L, device):
        """Return a complex filter of shape ``(L, d_model)``.

        Args:
            L: Number of filter taps to generate.
            device: Device on which the coordinate grid is created.
        """
        # Coordinates are spread evenly over [0, 1] whatever L is.
        t = torch.linspace(0, 1, steps=L, device=device).unsqueeze(-1)
        emb = torch.cat(
            [torch.sin(t * self.freqs), torch.cos(t * self.freqs)], dim=-1
        )
        # Last axis of size 2 holds (real, imaginary).
        out = self.mlp(emb).view(L, self.d_model, 2)
        return torch.complex(out[..., 0], out[..., 1])


class GatedHarmonicConvolution(nn.Module):
    """Residual complex layer: FFT filtering, gating, mixing, activation.

    Args:
        d_model: Channel width of the complex input.
        max_len: Maximum FFT length; longer inputs are truncated for the
            filtering step and the result is zero-padded back.
        dropout: Drop probability of the final `ComplexDropout`.
    """

    def __init__(self, d_model, max_len=1024, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.filter_len = max_len
        self.neural_filter = HyenaNeuralFilter(d_model, max_len=max_len)
        self.gate_proj = nn.Linear(d_model * 2, d_model * 2)
        self.mix_real = nn.Linear(d_model, d_model)
        self.mix_imag = nn.Linear(d_model, d_model)
        self.out_real = nn.Linear(d_model, d_model)
        self.out_imag = nn.Linear(d_model, d_model)
        self.activation = ModReLU(d_model)
        self.norm = RobustPhaseNorm(d_model)
        self.dropout = ComplexDropout(dropout)

    def forward(self, x: torch.Tensor, src_mask: Optional[torch.Tensor] = None):
        """Apply the layer to a complex tensor of shape ``(B, L, D)``.

        Args:
            x: Complex input; the FFT runs along dimension 1.
            src_mask: Optional mask; positions where it is true are zeroed
                after normalisation. It is unsqueezed on the last axis, so
                presumably it has shape ``(B, L)`` - confirm with callers.

        Returns:
            A complex tensor of the same shape as ``x`` (residual added).
        """
        residual = x
        x_norm = self.norm(x)
        if src_mask is not None:
            x_norm = x_norm.masked_fill(src_mask.unsqueeze(-1), 0.0)

        _, L, _ = x_norm.shape
        eff_L = min(L, self.filter_len)

        # Multiply by the generated filter in the frequency domain.
        x_freq = torch.fft.fft(x_norm, n=eff_L, dim=1, norm='ortho')
        h = self.neural_filter(eff_L, x.device).unsqueeze(0)
        x_filtered = x_freq * h
        x_time = torch.fft.ifft(x_filtered, n=eff_L, dim=1, norm='ortho')

        # When the input exceeds the filter length, the FFT above saw only
        # the first eff_L steps; pad with zeros to restore length L.
        # Otherwise eff_L == L and x_time already has the right length.
        if L > eff_L:
            x_time = F.pad(x_time, (0, 0, 0, L - eff_L))

        # Separate sigmoid gates for the real and imaginary parts.
        gates = torch.sigmoid(
            self.gate_proj(torch.cat([x_norm.real, x_norm.imag], dim=-1))
        )
        g_r, g_i = gates.chunk(2, dim=-1)
        x_gated = torch.complex(x_time.real * g_r, x_time.imag * g_i)

        # Complex-style linear map built from two real linear layers.
        mr, mi = self.mix_real, self.mix_imag
        x_mixed = torch.complex(
            mr(x_gated.real) - mi(x_gated.imag),
            mr(x_gated.imag) + mi(x_gated.real),
        )
        x_act = self.activation(x_mixed)

        or_, oi = self.out_real, self.out_imag
        out = torch.complex(
            or_(x_act.real) - oi(x_act.imag),
            or_(x_act.imag) + oi(x_act.real),
        )
        return self.dropout(out) + residual


class PRISMEncoder(nn.Module):
    """Stack of `GatedHarmonicConvolution` layers with a final norm.

    Args:
        num_layers: Number of stacked layers.
        d_model: Channel width.
        max_len: Forwarded to every layer as its maximum FFT length.
        dropout: Forwarded to every layer.
    """

    def __init__(self, num_layers, d_model, max_len, dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList([
            GatedHarmonicConvolution(d_model, max_len, dropout)
            for _ in range(num_layers)
        ])
        self.final_norm = RobustPhaseNorm(d_model)

    def forward(self, x: torch.Tensor, src_mask: Optional[torch.Tensor] = None):
        """Run all layers on ``x`` and return the normalised result.

        In training mode every layer is wrapped in activation
        checkpointing; in eval mode the layers are called directly.
        """
        for layer in self.layers:
            if self.training:
                x = torch.utils.checkpoint.checkpoint(
                    layer, x, src_mask, use_reentrant=False
                )
            else:
                x = layer(x, src_mask)
        return self.final_norm(x)


class FNetBlock(nn.Module):
    """Pre-norm block: FFT token/feature mixing followed by a feed-forward.

    Args:
        d_model: Feature width.
        d_ff: Hidden width of the feed-forward network.
        dropout: Drop probability used after mixing and in the feed-forward.
    """

    def __init__(self, d_model, d_ff, dropout):
        super().__init__()
        self.norm_mix = nn.LayerNorm(d_model)
        self.norm_ff = nn.LayerNorm(d_model)
        self.mix_dropout = nn.Dropout(dropout)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
            nn.Dropout(dropout),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply mixing and feed-forward sub-blocks, each with a residual."""
        residual = x
        x = self.norm_mix(x)
        # The FFT runs in float32 with CUDA autocast disabled; the result
        # is cast back to the incoming dtype afterwards.
        with torch.cuda.amp.autocast(enabled=False):
            x = x.float()
            # 2-D FFT over the last two dimensions; only the real part is kept.
            x = torch.fft.fftn(x, dim=(-2, -1), norm='ortho').real
        x = x.to(dtype=residual.dtype)
        x = self.mix_dropout(x)
        x = x + residual

        residual = x
        x = self.norm_ff(x)
        x = self.ff(x)
        return x + residual


class FNetEncoder(nn.Module):
    """Stack of `FNetBlock` layers followed by a LayerNorm.

    Args:
        depth: Number of blocks.
        d_model: Feature width.
        d_ff: Feed-forward hidden width of every block.
        dropout: Drop probability of every block.
    """

    def __init__(self, depth, d_model, d_ff, dropout):
        super().__init__()
        self.layers = nn.ModuleList([
            FNetBlock(d_model, d_ff, dropout) for _ in range(depth)
        ])
        self.norm_out = nn.LayerNorm(d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through every block and the output norm."""
        for layer in self.layers:
            x = layer(x)
        return self.norm_out(x)


# --- MAIN MODEL ---
class PillarsModel(PreTrainedModel):
    """Two-branch encoder with a tied output head.

    The configuration fields read here are ``vocab_size``, ``d_model``,
    ``d_branch``, ``seq_len``, ``depth``, ``refine_depth`` and ``dropout``.
    """

    config_class = PillarsConfig

    def __init__(self, config):
        super().__init__(config)
        self.config = config

        # 1. SHARED ROOT
        self.rose = DynamicRoSE(config.vocab_size, config.d_model)

        # 2. DOWNSAMPLE
        self.particle_down = nn.Linear(config.d_model, config.d_branch)
        self.wave_down = nn.Linear(config.d_model * 2, config.d_branch * 2)

        # 3. RATE STREAM (FNet)
        self.fnet_pos = nn.Embedding(config.seq_len, config.d_branch)
        self.stream_rate = FNetEncoder(
            depth=config.depth,
            d_model=config.d_branch,
            d_ff=config.d_branch * 4,
            dropout=config.dropout,
        )

        # 4. PHASE STREAM (PRISM)
        self.stream_phase = PRISMEncoder(
            num_layers=config.depth,
            d_model=config.d_branch,
            max_len=config.seq_len,
            dropout=config.dropout,
        )
        self.phase_bridge = ComplexToRealBridge(config.d_branch)

        # 5. FUSION
        self.fusion_proj = nn.Linear(config.d_branch * 2, config.d_model)
        self.fusion_norm = nn.LayerNorm(config.d_model)

        # 6. REFINER
        self.refiner = Encoder(
            dim=config.d_model,
            depth=config.refine_depth,
            heads=8,
            attn_flash=True,
            rotary_pos_emb=True,
            attn_dropout=config.dropout,
            ff_dropout=config.dropout,
        )

        # 7. HEAD
        self.head_bias = nn.Parameter(torch.zeros(config.vocab_size))

    def forward(self, input_ids: torch.Tensor, labels: Optional[torch.Tensor] = None):
        """Compute vocabulary logits and, if ``labels`` is given, the loss.

        Args:
            input_ids: Integer token ids of shape ``(batch, length)``;
                ``length`` must not exceed the size of the learned
                position table (``config.seq_len``).
            labels: Optional target ids; flattened and compared with the
                flattened logits using ``nn.CrossEntropyLoss``.

        Returns:
            The logits tensor when ``labels`` is ``None``; otherwise a dict
            with the keys ``"loss"`` and ``"logits"``.

        Raises:
            ValueError: If the input is longer than the position table.
        """
        seq_len = input_ids.shape[1]
        max_positions = self.fnet_pos.num_embeddings
        # Fail early with a readable message instead of an out-of-range
        # embedding lookup further down.
        if seq_len > max_positions:
            raise ValueError(
                f"Input length {seq_len} exceeds the maximum supported "
                f"sequence length {max_positions}."
            )

        # A. Shared Root
        wave_src, particle_src = self.rose(input_ids)

        # B. Downsample
        d_branch = self.config.d_branch
        p_small = self.particle_down(particle_src)
        w_flat = torch.cat([wave_src.real, wave_src.imag], dim=-1)
        w_small_flat = self.wave_down(w_flat)
        w_small = torch.complex(
            w_small_flat[..., :d_branch], w_small_flat[..., d_branch:]
        )

        # C. Branches
        pos_emb = self.fnet_pos(torch.arange(seq_len, device=input_ids.device))
        rate_out = self.stream_rate(p_small + pos_emb)
        phase_out = self.phase_bridge(self.stream_phase(w_small))

        # D. Fusion
        stacked = torch.cat([rate_out, phase_out], dim=-1)
        context = self.fusion_norm(self.fusion_proj(stacked))

        # E. Refiner & Output
        refined = self.refiner(context)

        # Weight tying: Use rose embeddings as output weights
        logits = F.linear(refined, self.rose.raw_embedding.weight, self.head_bias)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(
                logits.view(-1, self.config.vocab_size), labels.view(-1)
            )
            return {"loss": loss, "logits": logits}

        return logits