# pillars-dat-wikitext-32k / modeling_pillars.py
# Uploaded by prism-lab ("Upload model", commit cf5dff5, verified).
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.checkpoint
from transformers import PreTrainedModel

try:
    from x_transformers import Encoder
except ImportError:
    raise ImportError("To use PILLARS-DAT, you must run: pip install x-transformers")

try:
    from .configuration_pillars import PillarsConfig
except ImportError:
    from configuration_pillars import PillarsConfig
# --- UTILS ---
class ComplexDropout(nn.Module):
    """Dropout that removes the real and imaginary part of an element together.

    One real-valued mask with the shape of ``z.real`` is passed through
    ``F.dropout`` and multiplied into the input, so both components of a
    complex element share the same mask value.
    """

    def __init__(self, p=0.5):
        super().__init__()
        self.p = p

    def forward(self, z):
        """Return ``z`` unchanged in eval mode or when ``p == 0``, else masked."""
        dropout_active = self.training and self.p != 0.0
        if not dropout_active:
            return z

        # Start from ones so that F.dropout yields the (rescaled) keep mask.
        keep_mask = F.dropout(
            torch.ones_like(z.real),
            self.p,
            self.training,
            inplace=False,
        )
        return z * keep_mask
class RobustPhaseNorm(nn.Module):
    """RMS normalisation over the last dimension based on element magnitudes.

    The input is divided by a real root-mean-square of ``|x|`` taken over the
    last dimension and then multiplied by a learnable real gain per feature.
    Both factors are real, so the phase of a complex input is not rotated.
    """

    def __init__(self, d_model, eps=1e-5):
        super().__init__()
        self.scale = nn.Parameter(torch.ones(d_model))
        self.eps = eps

    def forward(self, x):
        """Return ``x / rms(|x|) * scale``."""
        mean_power = torch.abs(x).pow(2).mean(dim=-1, keepdim=True)
        # eps sits inside the square root, so an all-zero row divides by sqrt(eps).
        denom = torch.sqrt(mean_power + self.eps)
        normalised = x / denom
        return normalised * self.scale
class ModReLU(nn.Module):
    """Magnitude-thresholding activation for complex tensors.

    The magnitude of each element is shifted by a learnable per-feature bias
    and passed through ReLU, while the direction ``z / |z|`` is kept. All of
    the arithmetic is done in complex64 / float32 and the result is cast back
    to the dtype of the input.
    """

    def __init__(self, features):
        super().__init__()
        self.b = nn.Parameter(torch.zeros(features))

    def forward(self, z):
        """Return ``relu(|z| + b) * z / (|z| + 1e-6)`` in the dtype of ``z``."""
        # Work in complex64 so the magnitude is not computed in a
        # reduced-precision dtype.
        z_hp = z.to(torch.complex64)
        radius = torch.abs(z_hp)

        # Shift-and-rectify the magnitude; the bias is promoted to float32.
        rectified_radius = F.relu(radius + self.b.float())

        # Unit-direction estimate; the small constant avoids dividing by zero.
        direction = z_hp / (radius + 1e-6)

        activated = rectified_radius * direction
        return activated.to(z.dtype)
class ComplexToRealBridge(nn.Module):
    """Map a complex tensor to a real one of the same feature width.

    Real and imaginary parts are concatenated along the last dimension
    (width ``2 * d_model``), projected back to ``d_model`` with a linear
    layer, and layer-normalised.
    """

    def __init__(self, d_model):
        super().__init__()
        self.proj = nn.Linear(d_model * 2, d_model)
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x_complex):
        """Return ``LayerNorm(Linear([Re(x), Im(x)]))``."""
        parts = (x_complex.real, x_complex.imag)
        stacked = torch.cat(parts, dim=-1)
        projected = self.proj(stacked)
        return self.norm(projected)
# ==========================================
# 4. DYNAMIC RoSE (Mamba-3 Engine)
# ==========================================
class DynamicRoSE(nn.Module):
    """Token embedding that returns a rotated complex embedding and the raw one.

    For every token the module:

    1. looks up a real embedding (``raw_embedding``);
    2. maps it through ``adapter`` to the real and imaginary halves of a
       complex vector;
    3. predicts a per-feature unit-magnitude complex rotation from the same
       real embedding (``rotation_predictor``);
    4. multiplies the complex vector by that content-dependent rotation and by
       a fixed position-dependent rotation ``exp(i * pos * freq)``.

    Args:
        num_embeddings: Vocabulary size of the embedding table.
        embedding_dim: Width of the real embedding and of the complex output.
        max_period: Base of the geometric frequency schedule used for the
            positional rotation.
    """

    def __init__(self, num_embeddings, embedding_dim, max_period=10000.0):
        super().__init__()
        self.embedding_dim = embedding_dim
        # Real-valued embedding table.
        self.raw_embedding = nn.Embedding(num_embeddings, embedding_dim)
        # Produces real and imaginary halves of the complex content vector.
        self.adapter = nn.Linear(embedding_dim, embedding_dim * 2)
        # One frequency per feature: max_period ** (-k / embedding_dim).
        freqs = torch.exp(
            torch.arange(0, embedding_dim, dtype=torch.float32)
            * -(math.log(max_period) / embedding_dim)
        )
        self.register_buffer('freqs', freqs)
        # Produces the (x, y) components of the content-dependent rotation.
        self.rotation_predictor = nn.Linear(embedding_dim, embedding_dim * 2)

    @staticmethod
    def _as_complex(real, imag):
        """Build a complex tensor, promoting bfloat16 parts to float32 first.

        ``torch.complex`` rejects bfloat16 inputs, which is what ``nn.Linear``
        produces under bf16 autocast; other dtypes are passed through as-is.
        """
        if real.dtype == torch.bfloat16 or imag.dtype == torch.bfloat16:
            real, imag = real.float(), imag.float()
        return torch.complex(real, imag)

    def forward(self, input_ids):
        """Embed ``input_ids``.

        Args:
            input_ids: Integer token ids; the embedding output is unpacked as
                three dimensions, so a 2-D ``(batch, seq)`` tensor is expected.

        Returns:
            Tuple ``(z_final, real_base)``: the rotated complex embedding and
            the raw real embedding.
        """
        real_base = self.raw_embedding(input_ids)
        _, seq_len, dim = real_base.shape

        # Complex content vector from the two halves of the adapter output.
        complex_params = self.adapter(real_base)
        z_t = self._as_complex(complex_params[..., :dim], complex_params[..., dim:])

        # Content-dependent rotation, normalised to (approximately) unit magnitude.
        rot_raw = self.rotation_predictor(real_base)
        rot_x, rot_y = rot_raw.chunk(2, dim=-1)
        rot_mag = torch.sqrt(rot_x**2 + rot_y**2 + 1e-6)
        dynamic_rot = self._as_complex(rot_x / rot_mag, rot_y / rot_mag)

        # Fixed positional rotation exp(i * pos * freq), shape [seq_len, dim].
        pos = torch.arange(seq_len, device=input_ids.device).float()
        static_angles = torch.outer(pos, self.freqs)
        static_rot = torch.polar(torch.ones_like(static_angles), static_angles)

        z_final = z_t * static_rot.unsqueeze(0) * dynamic_rot
        return z_final, real_base
# ==========================================
# 5. HYENA FILTER
# ==========================================
class HyenaNeuralFilter(nn.Module):
    """Small MLP that generates a complex filter of a requested length.

    A length-``L`` grid on ``[0, 1]`` is encoded with sine/cosine features and
    mapped by an MLP to ``2 * d_model`` values per grid point, which are read
    as the real and imaginary parts of a ``[L, d_model]`` complex tensor.

    Args:
        d_model: Number of complex channels in the generated filter.
        max_len: Accepted for interface compatibility; not used by this module.
        hidden_dim: Width of the sinusoidal features and MLP hidden layers.
            Must be even, since half of the features are sines and half cosines.

    Raises:
        ValueError: If ``hidden_dim`` is odd.
    """

    def __init__(self, d_model, max_len=1024, hidden_dim=64):
        super().__init__()
        if hidden_dim % 2 != 0:
            # An odd width would yield hidden_dim + 1 features and only fail
            # later with a shape mismatch inside the MLP.
            raise ValueError(f"hidden_dim must be even, got {hidden_dim}")
        self.d_model = d_model
        freqs = torch.exp(
            torch.arange(0, hidden_dim, 2, dtype=torch.float32)
            * -(math.log(10000.0) / hidden_dim)
        )
        self.register_buffer("freqs", freqs)
        self.mlp = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.SiLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.SiLU(),
            nn.Linear(hidden_dim, d_model * 2)
        )

    def forward(self, L, device):
        """Return a complex filter of shape ``[L, d_model]``.

        Args:
            L: Number of grid points (filter length).
            device: Device on which the grid is created.
        """
        t = torch.linspace(0, 1, steps=L, device=device).unsqueeze(-1)
        phases = t * self.freqs
        emb = torch.cat([torch.sin(phases), torch.cos(phases)], dim=-1)
        out = self.mlp(emb).view(L, self.d_model, 2)
        if out.dtype == torch.bfloat16:
            # torch.complex rejects bfloat16 (e.g. when called under bf16 autocast).
            out = out.float()
        return torch.complex(out[..., 0], out[..., 1])
# ==========================================
# 6. GATED HARMONIC CONVOLUTION (Lean)
# ==========================================
# Fixed PRISM layer (precision-gated, type-safe)
class GatedHarmonicConvolution(nn.Module):
    """Residual complex layer: FFT filtering, sigmoid gating, complex mixing.

    Processing steps in ``forward``:

    1. normalise the input (``RobustPhaseNorm``) and zero masked positions;
    2. with autocast disabled, FFT along the sequence dimension, multiply the
       spectrum by a generated complex filter, and inverse-FFT;
    3. gate the real and imaginary parts with sigmoids computed from the
       normalised input;
    4. apply a complex linear map, ``ModReLU``, and a second complex linear map;
    5. apply dropout and add the residual.

    Args:
        d_model: Number of complex channels.
        max_len: Maximum sequence length that is filtered; longer inputs are
            truncated to this length for the FFT and the remainder of the
            filtered signal is zero-padded.
        dropout: Dropout probability for ``ComplexDropout``.
    """

    def __init__(self, d_model, max_len=1024, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.filter_len = max_len
        self.neural_filter = HyenaNeuralFilter(d_model, max_len=max_len)
        self.gate_proj = nn.Linear(d_model * 2, d_model * 2)
        self.mix_real = nn.Linear(d_model, d_model)
        self.mix_imag = nn.Linear(d_model, d_model)
        self.out_real = nn.Linear(d_model, d_model)
        self.out_imag = nn.Linear(d_model, d_model)
        self.activation = ModReLU(d_model)
        self.norm = RobustPhaseNorm(d_model)
        self.dropout = ComplexDropout(dropout)

    @staticmethod
    def _as_complex(real, imag):
        """Build a complex tensor, promoting bfloat16 parts to float32 first.

        ``torch.complex`` rejects bfloat16, which is what the linear layers
        return under bf16 autocast; other dtypes are passed through unchanged.
        """
        if real.dtype == torch.bfloat16 or imag.dtype == torch.bfloat16:
            real, imag = real.float(), imag.float()
        return torch.complex(real, imag)

    def _complex_linear(self, real_layer, imag_layer, z):
        """Apply the complex map ``(real_layer + i * imag_layer)`` to ``z``."""
        re, im = z.real, z.imag
        return self._as_complex(
            real_layer(re) - imag_layer(im),
            real_layer(im) + imag_layer(re),
        )

    def forward(self, x, src_mask=None):
        """Run the layer.

        Args:
            x: Complex input; unpacked as three dimensions with the FFT taken
                over dimension 1 (assumed ``(batch, seq, d_model)``).
            src_mask: Optional boolean mask; positions where it is true are
                zeroed after normalisation. It is unsqueezed on the last
                dimension before use.

        Returns:
            Tensor of the same shape as ``x``: layer output plus residual.
        """
        residual = x
        x_norm = self.norm(x)
        if src_mask is not None:
            x_norm = x_norm.masked_fill(src_mask.unsqueeze(-1), 0.0)

        # Disable autocast for the device the data actually lives on. The
        # original always targeted 'cuda', which left CPU autocast active; any
        # other device type keeps the original 'cuda' behaviour.
        device_type = x.device.type if x.device.type in ("cuda", "cpu") else "cuda"
        with torch.amp.autocast(device_type, enabled=False):
            # Explicit complex64 cast; a real-valued cast would drop the
            # imaginary part.
            x_32 = x_norm.to(torch.complex64)
            B, L, D = x_32.shape
            eff_L = min(L, self.filter_len)

            # Spectrum of (at most) the first eff_L positions.
            x_freq = torch.fft.fft(x_32, n=eff_L, dim=1, norm='ortho')

            # Generated filter, broadcast over the batch dimension.
            h = self.neural_filter(eff_L, x.device).unsqueeze(0).to(torch.complex64)
            x_filtered = x_freq * h

            x_time = torch.fft.ifft(x_filtered, n=eff_L, dim=1, norm='ortho')
            if L > eff_L:
                # Positions beyond the filter length receive a zero update.
                x_time = F.pad(x_time, (0, 0, 0, L - eff_L))
            else:
                x_time = x_time[:, :L, :]

            # Sigmoid gates computed in float32 from the normalised input.
            x_cat = torch.cat([x_32.real, x_32.imag], dim=-1)
            gate_w = self.gate_proj.weight.to(torch.float32)
            gate_b = self.gate_proj.bias.to(torch.float32)
            gates = torch.sigmoid(F.linear(x_cat, gate_w, gate_b))
            g_r, g_i = gates.chunk(2, dim=-1)
            x_gated_32 = torch.complex(x_time.real * g_r, x_time.imag * g_i)

            # Back to the dtype of the input (the code below reads .real/.imag,
            # so the input is expected to be complex).
            x_gated = x_gated_32.to(x.dtype)

        # Complex mixing, activation, and output projection (autocast applies
        # again here if it is enabled by the caller).
        x_mixed = self._complex_linear(self.mix_real, self.mix_imag, x_gated)
        x_act = self.activation(x_mixed)
        out = self._complex_linear(self.out_real, self.out_imag, x_act)
        return self.dropout(out) + residual
# ==========================================
# 7. MODEL WRAPPERS
# ==========================================
class PRISMEncoder(nn.Module):
    """Stack of ``GatedHarmonicConvolution`` layers followed by a final norm.

    In training mode every layer is run through activation checkpointing
    (``torch.utils.checkpoint.checkpoint`` with ``use_reentrant=False``); in
    eval mode layers are called directly.

    Args:
        num_layers: Number of stacked layers.
        d_model: Channel width passed to every layer and to the final norm.
        max_len: Maximum filtered sequence length passed to every layer.
        dropout: Dropout probability passed to every layer.
    """

    def __init__(self, num_layers, d_model, max_len, dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList([
            GatedHarmonicConvolution(d_model, max_len, dropout)
            for _ in range(num_layers)
        ])
        self.final_norm = RobustPhaseNorm(d_model)

    def forward(self, x, src_mask=None):
        """Apply all layers to ``x`` (optionally masked) and normalise the result."""
        for layer in self.layers:
            if self.training:
                # Needs the torch.utils.checkpoint submodule to be imported
                # explicitly at file level rather than via a transitive import.
                x = torch.utils.checkpoint.checkpoint(
                    layer, x, src_mask, use_reentrant=False
                )
            else:
                x = layer(x, src_mask)
        return self.final_norm(x)
class PRISM_WikiText_Model(nn.Module):
    """Complex PRISM encoder followed by an optional transformer refiner.

    Pipeline in ``forward``: ``DynamicRoSE`` embedding -> ``PRISMEncoder`` ->
    ``ComplexToRealBridge`` -> projection of ``[bridge output, raw embedding]``
    -> optional ``Encoder`` refiner -> output head whose weight is shared with
    the embedding table.

    Args:
        vocab_size: Vocabulary size (embedding rows and output logits).
        d_model: Model width.
        max_len: Maximum filtered sequence length for the PRISM layers.
        prism_depth: Number of PRISM layers.
        trans_depth: Depth of the refiner ``Encoder``; ``0`` disables it.
        dropout: Dropout probability used throughout.
    """

    def __init__(self, vocab_size, d_model, max_len, prism_depth=5, trans_depth=1, dropout=0.1):
        super().__init__()
        self.d_model = d_model

        # 1. Complex core.
        self.rose = DynamicRoSE(vocab_size, d_model)
        self.prism_encoder = PRISMEncoder(prism_depth, d_model, max_len=max_len, dropout=dropout)
        self.bridge = ComplexToRealBridge(d_model)
        self.periscope_proj = nn.Sequential(
            nn.Linear(d_model * 2, d_model),
            nn.LayerNorm(d_model),
            nn.GELU(),
        )

        # 2. Optional refiner with rotary position embeddings.
        if trans_depth > 0:
            self.refiner = Encoder(
                dim=d_model,
                depth=trans_depth,
                heads=8,
                rotary_pos_emb=True,
                attn_flash=True,
                attn_dropout=dropout,
                ff_dropout=dropout,
            )
        else:
            self.refiner = None

        # 3. Output head; its weight is the embedding table itself.
        self.lm_head = nn.Linear(d_model, vocab_size)
        self.lm_head.weight = self.rose.raw_embedding.weight

    def forward(self, input_ids):
        """Return the output of ``lm_head`` (logits) for ``input_ids``."""
        wave_src, particle_src = self.rose(input_ids)
        wave_out = self.prism_encoder(wave_src)
        wave_real = self.bridge(wave_out)

        # Fuse the bridged complex stream with the raw real embedding.
        mixed_memory = self.periscope_proj(torch.cat([wave_real, particle_src], dim=-1))

        # Explicit None check: the truthiness of a module is not a reliable
        # "is present" test (modules that define __len__ can be falsy).
        if self.refiner is not None:
            out = self.refiner(mixed_memory)
        else:
            out = mixed_memory
        return self.lm_head(out)
# ==========================================
# 1. SENSORY STREAM (Transformer + RoPE)
# ==========================================
class SensoryStream(nn.Module):
    """Thin wrapper around an ``x_transformers`` ``Encoder`` for real inputs.

    The encoder is configured with 4 heads, flash attention, rotary position
    embeddings, RMSNorm, and a gated (GLU) feed-forward block.

    Args:
        depth: Number of encoder layers.
        d_model: Encoder width.
        dropout: Probability used for both attention and feed-forward dropout.
    """

    def __init__(self, depth, d_model, dropout=0.1):
        super().__init__()
        encoder_options = dict(
            dim=d_model,
            depth=depth,
            heads=4,              # original note: 256 dim / 64 head_dim = 4 heads
            attn_flash=True,      # flash attention
            rotary_pos_emb=True,  # rotary position embeddings
            attn_dropout=dropout,
            ff_dropout=dropout,
            use_rmsnorm=True,     # RMSNorm instead of LayerNorm
            ff_glu=True,          # gated feed-forward
        )
        self.encoder = Encoder(**encoder_options)

    def forward(self, x):
        """Return the encoder output for ``x``."""
        encoded = self.encoder(x)
        return encoded
class Pillars_DAT_Model(PreTrainedModel):
    """Two-stream model: a transformer stream and a complex PRISM stream.

    ``DynamicRoSE`` produces a raw real embedding and a complex embedding.
    Both are down-projected to ``d_branch`` and processed in parallel by
    ``SensoryStream`` (real) and ``PRISMEncoder`` (complex). The complex
    stream is bridged back to real values, the two streams are concatenated,
    projected to ``d_model``, normalised, refined by a one-layer ``Encoder``,
    and mapped to vocabulary logits by a head that shares its weight with the
    embedding table.

    Config fields read: ``vocab_size``, ``d_model``, ``d_branch``, ``depth``,
    ``seq_len``, ``dropout``.
    """

    config_class = PillarsConfig

    def __init__(self, config):
        super().__init__(config)
        self.config = config
        self.d_model = config.d_model
        self.d_branch = config.d_branch

        # 1. Root embedding.
        self.rose = DynamicRoSE(config.vocab_size, config.d_model)

        # 2. Down-projections (the complex one acts on [real, imag] concatenated).
        self.particle_down = nn.Linear(config.d_model, config.d_branch)
        self.wave_down = nn.Linear(config.d_model * 2, config.d_branch * 2)

        # 3. Stream A: real-valued transformer.
        self.stream_sensory = SensoryStream(
            depth=config.depth, d_model=config.d_branch, dropout=config.dropout
        )

        # 4. Stream B: complex PRISM encoder and bridge back to real values.
        self.stream_relational = PRISMEncoder(
            num_layers=config.depth,
            d_model=config.d_branch,
            max_len=config.seq_len,
            dropout=config.dropout,
        )
        self.relational_bridge = ComplexToRealBridge(config.d_branch)

        # 5. Fusion of the two streams.
        self.fusion_proj = nn.Linear(config.d_branch * 2, config.d_model)
        self.fusion_norm = nn.LayerNorm(config.d_model)

        # 6. Refiner.
        self.refiner = Encoder(
            dim=config.d_model, depth=1, heads=8, attn_flash=True,
            rotary_pos_emb=True, attn_dropout=config.dropout, ff_dropout=config.dropout
        )

        # 7. Output head; its weight is the embedding table itself.
        self.lm_head = nn.Linear(config.d_model, config.vocab_size)
        self.lm_head.weight = self.rose.raw_embedding.weight

    @staticmethod
    def _as_complex(real, imag):
        """Build a complex tensor, promoting bfloat16 parts to float32 first.

        ``torch.complex`` rejects bfloat16, which is what ``nn.Linear`` returns
        under bf16 autocast; other dtypes are passed through unchanged.
        """
        if real.dtype == torch.bfloat16 or imag.dtype == torch.bfloat16:
            real, imag = real.float(), imag.float()
        return torch.complex(real, imag)

    def forward(self, input_ids, labels=None):
        """Compute logits and, if ``labels`` is given, the cross-entropy loss.

        Args:
            input_ids: Integer token ids.
            labels: Optional targets, flattened and compared against the
                flattened logits with ``nn.CrossEntropyLoss`` (no shifting is
                applied here).

        Returns:
            ``{"loss": loss, "logits": logits}`` when ``labels`` is provided,
            otherwise the logits tensor alone.
        """
        # 1. Embedding and down-projection.
        wave_src, particle_src = self.rose(input_ids)
        p_small = self.particle_down(particle_src)
        w_flat = torch.cat([wave_src.real, wave_src.imag], dim=-1)
        w_small_flat = self.wave_down(w_flat)
        w_small = self._as_complex(
            w_small_flat[..., :self.d_branch],
            w_small_flat[..., self.d_branch:],
        )

        # 2. Parallel streams.
        sensory_out = self.stream_sensory(p_small)
        relational_out_complex = self.stream_relational(w_small)
        relational_out = self.relational_bridge(relational_out_complex)

        # 3. Fusion.
        stacked = torch.cat([sensory_out, relational_out], dim=-1)
        context = self.fusion_norm(self.fusion_proj(stacked))

        # 4. Refinement and output.
        refined = self.refiner(context)
        logits = self.lm_head(refined)

        if labels is None:
            return logits

        # reshape (not view) so non-contiguous labels/logits are accepted too.
        loss_fct = nn.CrossEntropyLoss()
        loss = loss_fct(
            logits.reshape(-1, self.config.vocab_size),
            labels.reshape(-1),
        )
        return {"loss": loss, "logits": logits}