|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import math |
| from transformers import PreTrainedModel |
| try: |
| from .configuration_pillars import PillarsConfig |
| except ImportError: |
| from configuration_pillars import PillarsConfig |
|
|
| try: |
| from x_transformers import Encoder |
| except ImportError: |
| raise ImportError("To use PILLARS-DAT, you must run: pip install x-transformers") |
|
|
| |
|
|
| class ComplexDropout(nn.Module): |
| def __init__(self, p=0.5): |
| super().__init__() |
| self.p = p |
| def forward(self, z): |
| if not self.training or self.p == 0.0: return z |
| mask = torch.ones_like(z.real) |
| mask = F.dropout(mask, self.p, self.training, inplace=False) |
| return z * mask |
|
|
| class RobustPhaseNorm(nn.Module): |
| def __init__(self, d_model, eps=1e-5): |
| super().__init__() |
| self.scale = nn.Parameter(torch.ones(d_model)) |
| self.eps = eps |
| def forward(self, x): |
| mag = torch.abs(x) |
| rms = torch.sqrt(torch.mean(mag**2, dim=-1, keepdim=True) + self.eps) |
| return (x / rms) * self.scale |
|
|
| class ModReLU(nn.Module): |
| def __init__(self, features): |
| super().__init__() |
| self.b = nn.Parameter(torch.zeros(features)) |
|
|
| def forward(self, z): |
| |
| |
| |
| z_32 = z.to(torch.complex64) |
|
|
| |
| mag = torch.abs(z_32) |
|
|
| |
| new_mag = F.relu(mag + self.b.float()) |
|
|
| |
| |
| phase = z_32 / (mag + 1e-6) |
|
|
| |
| out = new_mag * phase |
|
|
| |
| return out.to(z.dtype) |
|
|
| class ComplexToRealBridge(nn.Module): |
| def __init__(self, d_model): |
| super().__init__() |
| self.proj = nn.Linear(d_model * 2, d_model) |
| self.norm = nn.LayerNorm(d_model) |
| def forward(self, x_complex): |
| cat = torch.cat([x_complex.real, x_complex.imag], dim=-1) |
| return self.norm(self.proj(cat)) |
|
|
| |
| |
| |
| class DynamicRoSE(nn.Module): |
| def __init__(self, num_embeddings, embedding_dim, max_period=10000.0): |
| super().__init__() |
| self.embedding_dim = embedding_dim |
|
|
| |
| self.raw_embedding = nn.Embedding(num_embeddings, embedding_dim) |
|
|
| |
| self.adapter = nn.Linear(embedding_dim, embedding_dim * 2) |
|
|
| |
| freqs = torch.exp(torch.arange(0, embedding_dim, dtype=torch.float32) * -(math.log(max_period) / embedding_dim)) |
| self.register_buffer('freqs', freqs) |
|
|
| self.rotation_predictor = nn.Linear(embedding_dim, embedding_dim * 2) |
|
|
| def forward(self, input_ids): |
| |
| real_base = self.raw_embedding(input_ids) |
| B, L, D = real_base.shape |
|
|
| |
| complex_params = self.adapter(real_base) |
| z_t = torch.complex(complex_params[..., :D], complex_params[..., D:]) |
|
|
| rot_raw = self.rotation_predictor(real_base) |
| rot_x, rot_y = rot_raw.chunk(2, dim=-1) |
|
|
| rot_mag = torch.sqrt(rot_x**2 + rot_y**2 + 1e-6) |
| dynamic_rot = torch.complex(rot_x / rot_mag, rot_y / rot_mag) |
|
|
| |
| pos = torch.arange(L, device=input_ids.device).float() |
| static_angles = torch.outer(pos, self.freqs) |
| static_rot = torch.polar(torch.ones_like(static_angles), static_angles) |
|
|
| z_final = z_t * static_rot.unsqueeze(0) * dynamic_rot |
|
|
| return z_final, real_base |
|
|
| |
| |
| |
| class HyenaNeuralFilter(nn.Module): |
| def __init__(self, d_model, max_len=1024, hidden_dim=64): |
| super().__init__() |
| self.d_model = d_model |
| freqs = torch.exp(torch.arange(0, hidden_dim, 2, dtype=torch.float32) * -(math.log(10000.0) / hidden_dim)) |
| self.register_buffer("freqs", freqs) |
| self.mlp = nn.Sequential( |
| nn.Linear(hidden_dim, hidden_dim), nn.SiLU(), |
| nn.Linear(hidden_dim, hidden_dim), nn.SiLU(), |
| nn.Linear(hidden_dim, d_model * 2) |
| ) |
| def forward(self, L, device): |
| t = torch.linspace(0, 1, steps=L, device=device).unsqueeze(-1) |
| emb = torch.cat([torch.sin(t * self.freqs), torch.cos(t * self.freqs)], dim=-1) |
| out = self.mlp(emb).view(L, self.d_model, 2) |
| return torch.complex(out[..., 0], out[..., 1]) |
|
|
| |
| |
| |
| |
|
|
| |
|
|
| class GatedHarmonicConvolution(nn.Module): |
| def __init__(self, d_model, max_len=1024, dropout=0.1): |
| super().__init__() |
| self.d_model = d_model |
| self.filter_len = max_len |
| self.neural_filter = HyenaNeuralFilter(d_model, max_len=max_len) |
| self.gate_proj = nn.Linear(d_model * 2, d_model * 2) |
|
|
| self.mix_real = nn.Linear(d_model, d_model) |
| self.mix_imag = nn.Linear(d_model, d_model) |
| self.out_real = nn.Linear(d_model, d_model) |
| self.out_imag = nn.Linear(d_model, d_model) |
|
|
| self.activation = ModReLU(d_model) |
| self.norm = RobustPhaseNorm(d_model) |
| self.dropout = ComplexDropout(dropout) |
|
|
| def forward(self, x, src_mask=None): |
| residual = x |
| x_norm = self.norm(x) |
| if src_mask is not None: |
| x_norm = x_norm.masked_fill(src_mask.unsqueeze(-1), 0.0) |
|
|
| |
| |
| with torch.amp.autocast('cuda', enabled=False): |
|
|
| |
| |
| |
| x_32 = x_norm.to(torch.complex64) |
| |
|
|
| B, L, D = x_32.shape |
| eff_L = min(L, self.filter_len) |
|
|
| |
| x_freq = torch.fft.fft(x_32, n=eff_L, dim=1, norm='ortho') |
|
|
| |
| h = self.neural_filter(eff_L, x.device).unsqueeze(0).to(torch.complex64) |
| x_filtered = x_freq * h |
|
|
| |
| x_time = torch.fft.ifft(x_filtered, n=eff_L, dim=1, norm='ortho') |
|
|
| if L > eff_L: x_time = F.pad(x_time, (0,0,0,L-eff_L)) |
| else: x_time = x_time[:, :L, :] |
|
|
| |
| |
| x_cat = torch.cat([x_32.real, x_32.imag], dim=-1) |
|
|
| |
| gate_w = self.gate_proj.weight.to(torch.float32) |
| gate_b = self.gate_proj.bias.to(torch.float32) |
|
|
| gate_out = F.linear(x_cat, gate_w, gate_b) |
| gates = torch.sigmoid(gate_out) |
|
|
| g_r, g_i = gates.chunk(2, dim=-1) |
| x_gated_32 = torch.complex(x_time.real * g_r, x_time.imag * g_i) |
|
|
| |
| |
| target_dtype = x.dtype |
| |
| |
|
|
| x_gated = x_gated_32.to(target_dtype) |
|
|
| |
| mr, mi = self.mix_real, self.mix_imag |
| x_mixed = torch.complex(mr(x_gated.real) - mi(x_gated.imag), mr(x_gated.imag) + mi(x_gated.real)) |
|
|
| x_act = self.activation(x_mixed) |
|
|
| or_, oi = self.out_real, self.out_imag |
| out = torch.complex(or_(x_act.real) - oi(x_act.imag), or_(x_act.imag) + oi(x_act.real)) |
|
|
| return self.dropout(out) + residual |
| |
| |
| |
| class PRISMEncoder(nn.Module): |
| def __init__(self, num_layers, d_model, max_len, dropout=0.1): |
| super().__init__() |
| self.layers = nn.ModuleList([ |
| GatedHarmonicConvolution(d_model, max_len, dropout) |
| for _ in range(num_layers) |
| ]) |
| self.final_norm = RobustPhaseNorm(d_model) |
| def forward(self, x, src_mask=None): |
| for layer in self.layers: |
| if self.training: x = torch.utils.checkpoint.checkpoint(layer, x, src_mask, use_reentrant=False) |
| else: x = layer(x, src_mask) |
| return self.final_norm(x) |
|
|
| class PRISM_WikiText_Model(nn.Module): |
| def __init__(self, vocab_size, d_model, max_len, prism_depth=5, trans_depth=1, dropout=0.1): |
| super().__init__() |
| self.d_model = d_model |
|
|
| |
| self.rose = DynamicRoSE(vocab_size, d_model) |
| self.prism_encoder = PRISMEncoder(prism_depth, d_model, max_len=max_len, dropout=dropout) |
| self.bridge = ComplexToRealBridge(d_model) |
| self.periscope_proj = nn.Sequential(nn.Linear(d_model * 2, d_model), nn.LayerNorm(d_model), nn.GELU()) |
|
|
| |
| |
| if trans_depth > 0: |
| self.refiner = Encoder( |
| dim=d_model, |
| depth=trans_depth, |
| heads=8, |
| rotary_pos_emb=True, |
| attn_flash=True, |
| attn_dropout=dropout, |
| ff_dropout=dropout, |
|
|
| ) |
| else: |
| self.refiner = None |
|
|
| |
| self.lm_head = nn.Linear(d_model, vocab_size) |
| self.lm_head.weight = self.rose.raw_embedding.weight |
|
|
| def forward(self, input_ids): |
| |
| wave_src, particle_src = self.rose(input_ids) |
| wave_out = self.prism_encoder(wave_src) |
| wave_real = self.bridge(wave_out) |
|
|
| |
| mixed_memory = self.periscope_proj(torch.cat([wave_real, particle_src], dim=-1)) |
|
|
| |
| if self.refiner: |
| out = self.refiner(mixed_memory) |
| else: |
| out = mixed_memory |
|
|
| return self.lm_head(out) |
|
|
| |
| |
| |
| class SensoryStream(nn.Module): |
| def __init__(self, depth, d_model, dropout=0.1): |
| super().__init__() |
| self.encoder = Encoder( |
| dim=d_model, |
| depth=depth, |
| heads=4, |
| attn_flash=True, |
| rotary_pos_emb=True, |
| attn_dropout=dropout, |
| ff_dropout=dropout, |
| use_rmsnorm=True, |
| ff_glu=True |
| ) |
|
|
| def forward(self, x): |
| return self.encoder(x) |
|
|
| class Pillars_DAT_Model(PreTrainedModel): |
| config_class = PillarsConfig |
| |
| def __init__(self, config): |
| super().__init__(config) |
| self.config = config |
| self.d_model = config.d_model |
| self.d_branch = config.d_branch |
| |
| |
| self.rose = DynamicRoSE(config.vocab_size, config.d_model) |
| |
| |
| self.particle_down = nn.Linear(config.d_model, config.d_branch) |
| self.wave_down = nn.Linear(config.d_model * 2, config.d_branch * 2) |
| |
| |
| self.stream_sensory = SensoryStream(depth=config.depth, d_model=config.d_branch, dropout=config.dropout) |
| |
| |
| self.stream_relational = PRISMEncoder(num_layers=config.depth, d_model=config.d_branch, max_len=config.seq_len, dropout=config.dropout) |
| self.relational_bridge = ComplexToRealBridge(config.d_branch) |
| |
| |
| self.fusion_proj = nn.Linear(config.d_branch * 2, config.d_model) |
| self.fusion_norm = nn.LayerNorm(config.d_model) |
| |
| |
| self.refiner = Encoder( |
| dim=config.d_model, depth=1, heads=8, attn_flash=True, |
| rotary_pos_emb=True, attn_dropout=config.dropout, ff_dropout=config.dropout |
| ) |
| |
| |
| self.lm_head = nn.Linear(config.d_model, config.vocab_size) |
| |
| |
| self.lm_head.weight = self.rose.raw_embedding.weight |
|
|
| def forward(self, input_ids, labels=None): |
| |
| wave_src, particle_src = self.rose(input_ids) |
| p_small = self.particle_down(particle_src) |
| |
| w_flat = torch.cat([wave_src.real, wave_src.imag], dim=-1) |
| w_small_flat = self.wave_down(w_flat) |
| w_small = torch.complex(w_small_flat[..., :self.d_branch], w_small_flat[..., self.d_branch:]) |
| |
| |
| sensory_out = self.stream_sensory(p_small) |
| relational_out_complex = self.stream_relational(w_small) |
| relational_out = self.relational_bridge(relational_out_complex) |
| |
| |
| stacked = torch.cat([sensory_out, relational_out], dim=-1) |
| context = self.fusion_norm(self.fusion_proj(stacked)) |
| |
| |
| refined = self.refiner(context) |
| |
| |
| logits = self.lm_head(refined) |
| |
| loss = None |
| if labels is not None: |
| loss_fct = nn.CrossEntropyLoss() |
| loss = loss_fct(logits.view(-1, self.config.vocab_size), labels.view(-1)) |
| return {"loss": loss, "logits": logits} |
| |
| return logits |
|
|