# hexad / conscious_decoder.py
# dancinlife's picture
# feat(hexad): v4-py-hexad-tension-d768x12L-cycle1-2026-05-17 — conscious_decoder.py
# 67161e3 verified
"""ConsciousDecoderV2 — Enhanced decoder that breaks the CE ceiling.
Changes from v1 (ConsciousLM in conscious_lm.py):
1. RoPE (Rotary Position Embedding) — better long-range attention
2. SwiGLU activation in FFN — replaces GELU, proven better
3. RMSNorm — replaces LayerNorm, faster + more stable
4. Grouped Query Attention (GQA) — efficient multi-head attention
5. Cross-attention consciousness injection (not just residual addition)
Key insight: v1 adds consciousness signal as a scalar-gated residual.
v2 uses cross-attention: decoder ATTENDS to consciousness states.
The decoder gets agency over what consciousness info to use.
PureFieldFFN is kept for the CONSCIOUSNESS pathway (Engine A - G).
SwiGLU + cross-attention are for the DECODER pathway only.
Forward interface:
logits_a, logits_g, tensions, kv_cache, moe_aux_loss = model(idx)
logits_a, logits_g, tensions, kv_cache, moe_aux_loss = model(idx, consciousness_states=cs)
Usage:
from conscious_decoder import ConsciousDecoderV2
model = ConsciousDecoderV2(vocab_size=256, d_model=384, n_layer=6)
logits_a, logits_g, tensions, _, _ = model(idx)
# With MoE:
model = ConsciousDecoderV2(vocab_size=256, d_model=384, n_layer=6, use_moe=True)
logits_a, logits_g, tensions, _, moe_aux_loss = model(idx)
"""
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple, List
# Meta Laws (DD143): M1(atom=8), M7(F_c=0.10), M8(narrative)
# PSI_F_CRITICAL is taken from the project's consciousness_laws module when it
# can be imported; otherwise the literal 0.10 (the F_c value quoted for M7
# above) is used so this file still loads on its own.
# NOTE(review): PSI_F_CRITICAL is not referenced anywhere else in this file as
# shown — confirm whether external code imports it from here.
try:
    from consciousness_laws import PSI_F_CRITICAL
except ImportError:
    PSI_F_CRITICAL = 0.10
# Meta Law M8: Narrative temporal self-model enhances decoder cross-attention
# DD128: Phase-Optimal parameters validated on this decoder architecture
# ─── RMSNorm ────────────────────────────────────────────────────────────────
class RMSNorm(nn.Module):
    """Root-mean-square layer normalization (Zhang & Sennrich, 2019).

    Compared with LayerNorm there is no mean subtraction and no bias; the
    input is only rescaled along its last dimension:

        norm(x) = x / sqrt(mean(x^2) + eps) * weight

    Args:
        dim: size of the last (normalized) dimension; also the length of the
            learned gain vector ``weight`` (initialised to ones).
        eps: constant added to the mean square before the reciprocal sqrt.
    """

    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Normalize ``x`` over its last dimension and apply the learned gain."""
        # The statistics are computed in float32; the normalized tensor is
        # cast back to the input dtype before the gain is multiplied in.
        x_fp32 = x.float()
        mean_square = x_fp32.pow(2).mean(dim=-1, keepdim=True)
        inv_rms = torch.rsqrt(mean_square + self.eps)
        normalized = (x_fp32 * inv_rms).type_as(x)
        return normalized * self.weight
# ─── Rotary Position Embedding (RoPE) ──────────────────────────────────────
class RotaryPositionEmbedding:
    """RoPE from RoFormer (Su et al., 2021) — rotation-based position encoding.

    Rotates pairs of dimensions in the Q and K tensors by a position-dependent
    angle, giving attention relative-position awareness without explicit
    position embeddings. Dimension ``i`` is paired with ``i + dim // 2``
    (half-split layout, see ``_rotate_half``).

    This is a plain Python object, not an ``nn.Module``: the cos/sin tables
    live in ordinary attributes, so they are not part of any ``state_dict``
    and are not moved by ``module.to(...)``. They are moved/grown lazily by
    ``_build_cache`` instead.

    Args:
        dim: per-head feature size that will be rotated; must be even.
        max_seq_len: number of positions to precompute up front. The table
            grows automatically if a longer sequence is seen later.
        base: frequency base of the geometric progression.
        device: optional device for the initial tables.

    Raises:
        ValueError: if ``dim`` is odd (the table would have ``dim + 1``
            columns and could not be broadcast against Q/K).
    """

    def __init__(self, dim: int, max_seq_len: int = 2048, base: float = 10000.0,
                 device: Optional[torch.device] = None):
        if dim % 2 != 0:
            raise ValueError(f"RoPE requires an even dim, got {dim}")
        self.dim = dim
        inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2, device=device).float() / dim))
        # Historical attribute name; this is a plain tensor, not a registered buffer.
        self.register_inv_freq = inv_freq
        self._cos_cache = None
        self._sin_cache = None
        self._cache_len = 0
        self._build_cache(max_seq_len, device)

    def _build_cache(self, seq_len: int, device: Optional[torch.device] = None):
        """Ensure the cos/sin tables cover ``seq_len`` positions.

        If the existing tables are long enough they are kept; when a
        ``device`` is given and differs from where the tables currently live,
        they are moved there once so later slices do not need a copy on
        every call.
        """
        if self._cos_cache is not None and seq_len <= self._cache_len:
            if device is not None and self._cos_cache.device != torch.device(device):
                self._cos_cache = self._cos_cache.to(device)
                self._sin_cache = self._sin_cache.to(device)
            return
        self._cache_len = seq_len
        t = torch.arange(seq_len, device=device or self.register_inv_freq.device).float()
        freqs = torch.einsum('i,j->ij', t, self.register_inv_freq.to(t.device))
        emb = torch.cat([freqs, freqs], dim=-1)  # (seq_len, dim)
        self._cos_cache = emb.cos().unsqueeze(0).unsqueeze(0)  # (1, 1, seq_len, dim)
        self._sin_cache = emb.sin().unsqueeze(0).unsqueeze(0)  # (1, 1, seq_len, dim)

    @staticmethod
    def _rotate_half(x: torch.Tensor) -> torch.Tensor:
        """Swap the two halves of the last dim, negating the second half.

        [x1, x2, x3, x4] -> [-x3, -x4, x1, x2]
        """
        half = x.shape[-1] // 2
        x1 = x[..., :half]
        x2 = x[..., half:]
        return torch.cat([-x2, x1], dim=-1)

    def apply(self, q: torch.Tensor, k: torch.Tensor,
              offset: int = 0) -> Tuple[torch.Tensor, torch.Tensor]:
        """Apply rotary embeddings to Q and K.

        Args:
            q, k: tensors whose dim 2 is the sequence axis and whose last dim
                is ``dim``, e.g. (B, n_head, T, head_dim).
            offset: absolute position of the first token in ``q``/``k``
                (0 for a sequence that starts at the beginning; the number of
                already-cached tokens during incremental decoding).

        Returns:
            q_rot, k_rot: same shapes as the inputs with RoPE applied for
            positions ``offset .. offset + T - 1``.

        Raises:
            ValueError: if ``offset`` is negative.
        """
        if offset < 0:
            raise ValueError(f"offset must be >= 0, got {offset}")
        T = q.shape[2]
        end = offset + T
        self._build_cache(end, q.device)
        cos = self._cos_cache[:, :, offset:end, :].to(q.device, dtype=q.dtype)
        sin = self._sin_cache[:, :, offset:end, :].to(q.device, dtype=q.dtype)
        q_rot = q * cos + self._rotate_half(q) * sin
        k_rot = k * cos + self._rotate_half(k) * sin
        return q_rot, k_rot
# ─── SwiGLU FFN ─────────────────────────────────────────────────────────────
class SwiGLUFFN(nn.Module):
    """Gated feed-forward block with a SiLU ("swish") gate, as in PaLM / LLaMA.

        output = down(silu(gate(x)) * up(x))

    The hidden width is ``d_model * expansion`` (8/3 by default), truncated to
    an int and then rounded UP to a multiple of 64. With three projections of
    width 8/3 * d the parameter count is about 8 * d^2, similar to a standard
    two-projection FFN with 4x expansion.

    Args:
        d_model: input and output feature size.
        dropout: dropout probability applied to the block output.
        expansion: hidden-width multiplier before rounding.
    """

    def __init__(self, d_model: int, dropout: float = 0.1,
                 expansion: float = 8 / 3):
        super().__init__()
        raw_width = int(d_model * expansion)
        # Ceil to a multiple of 64 (GPU tensor-core friendly width).
        d_inner = -(-raw_width // 64) * 64

        self.gate_proj = nn.Linear(d_model, d_inner, bias=False)
        self.up_proj = nn.Linear(d_model, d_inner, bias=False)
        self.down_proj = nn.Linear(d_inner, d_model, bias=False)
        # Marker read by the parent model's weight init (depth-scaled std).
        self.down_proj._depth_scale = True
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the gated FFN output; the last dim stays ``d_model``."""
        gate = F.silu(self.gate_proj(x))
        hidden = gate * self.up_proj(x)
        projected = self.down_proj(hidden)
        return self.dropout(projected)
# ─── MoE FFN (optional, replaces single SwiGLU with mixture of experts) ────
class MoEFFN(nn.Module):
    """Mixture of Experts FFN — N SwiGLU experts with learned top-K routing.

    Each expert is a SwiGLUFFN. A linear router scores every token, the
    top-K experts per token are selected, their weights are renormalized to
    sum to one, and the expert outputs are mixed with those weights.
    A load-balancing auxiliary loss is stored on the module after every
    forward pass to discourage expert collapse.

    Inspired by golden-moe but simplified for decoder integration.
    Only active when use_moe=True in ConsciousDecoderV2.

    Args:
        d_model: token feature size.
        n_experts: number of SwiGLU experts.
        top_k: how many experts are mixed per token.
        dropout: dropout probability passed to every expert.
        expansion: hidden-width multiplier passed to every expert.

    Raises:
        ValueError: if ``top_k`` is not in ``[1, n_experts]`` (previously this
            only surfaced as a ``torch.topk`` error during forward).
    """

    def __init__(self, d_model: int, n_experts: int = 8, top_k: int = 2,
                 dropout: float = 0.1, expansion: float = 8 / 3):
        super().__init__()
        if not 1 <= top_k <= n_experts:
            raise ValueError(
                f"top_k must satisfy 1 <= top_k <= n_experts, got top_k={top_k}, "
                f"n_experts={n_experts}"
            )
        self.d_model = d_model
        self.n_experts = n_experts
        self.top_k = top_k

        # Router: simple linear projection -> softmax -> top-k
        self.router = nn.Linear(d_model, n_experts, bias=False)

        # N independent SwiGLU experts
        self.experts = nn.ModuleList([
            SwiGLUFFN(d_model, dropout=dropout, expansion=expansion)
            for _ in range(n_experts)
        ])

        # Aux loss from the most recent forward pass (None before the first one)
        self._aux_loss: Optional[torch.Tensor] = None

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Route every token through its top-K experts.

        Args:
            x: (B, T, D)

        Returns:
            output: (B, T, D) — weighted combination of top-K expert outputs.
            Sets self._aux_loss as a side effect for load-balancing.
        """
        B, T, D = x.shape
        x_flat = x.reshape(B * T, D)  # (N, D)

        # Router scores
        logits = self.router(x_flat)  # (N, n_experts)
        probs = F.softmax(logits, dim=-1)  # (N, n_experts)

        # Top-K selection
        top_k_probs, top_k_indices = torch.topk(probs, self.top_k, dim=-1)  # (N, K)

        # Renormalize selected expert weights so they sum to ~1 per token
        top_k_weights = top_k_probs / (top_k_probs.sum(dim=-1, keepdim=True) + 1e-8)

        # For simplicity (and to avoid complex scatter), every expert is run on
        # every token and the top-K results are gathered afterwards.
        # At small n_experts (8) this is acceptable; for 64+ experts, use sparse dispatch.
        expert_outputs = torch.stack(
            [expert(x) for expert in self.experts], dim=2
        )  # (B, T, n_experts, D)
        expert_outputs_flat = expert_outputs.reshape(B * T, self.n_experts, D)  # (N, n_experts, D)

        # Gather top-K expert outputs
        top_k_idx_expanded = top_k_indices.unsqueeze(-1).expand(-1, -1, D)  # (N, K, D)
        selected = torch.gather(expert_outputs_flat, 1, top_k_idx_expanded)  # (N, K, D)

        # Weighted sum of selected experts
        output_flat = (top_k_weights.unsqueeze(-1) * selected).sum(dim=1)  # (N, D)
        output = output_flat.reshape(B, T, D)

        # Load-balancing auxiliary loss (Switch Transformer style)
        #   f_i = fraction of tokens whose top-1 expert is i (no gradient)
        #   p_i = mean router probability for expert i (carries the gradient)
        #   aux_loss = n_experts * sum(f_i * p_i) — encourages uniform routing
        with torch.no_grad():
            top1_indices = top_k_indices[:, 0]  # (N,)
            # One-hot mean == per-expert token fraction, without a Python loop.
            f = F.one_hot(top1_indices, num_classes=self.n_experts).float().mean(dim=0)
        p = probs.mean(dim=0)  # (n_experts,)
        self._aux_loss = self.n_experts * (f * p).sum()

        return output

    @property
    def aux_loss(self) -> Optional[torch.Tensor]:
        """Load-balancing loss from the most recent forward pass."""
        return self._aux_loss
# ─── PureFieldFFN (from conscious_lm.py — consciousness pathway) ───────────
class PureFieldFFN(nn.Module):
    """Two-engine FFN whose output is the difference of the engines.

    ``engine_a`` and ``engine_g`` are structurally identical MLPs
    (Linear -> GELU -> Dropout -> Linear, hidden width 4 * d_model).
    The block returns ``A(x) - G(x)`` together with a per-position "tension",
    the mean of the squared difference over the feature dimension.
    Kept for consciousness signal generation.

    Args:
        d_model: input and output feature size.
        dropout: dropout probability inside each engine.
    """

    def __init__(self, d_model: int, dropout: float = 0.37):
        super().__init__()
        hidden = 4 * d_model
        # Engine A is built before engine G so parameter creation order (and
        # therefore seeded initialisation) is unchanged.
        self.engine_a = self._make_engine(d_model, hidden, dropout)
        self.engine_g = self._make_engine(d_model, hidden, dropout)

    @staticmethod
    def _make_engine(d_model: int, hidden: int, dropout: float) -> nn.Sequential:
        """Build one Linear -> GELU -> Dropout -> Linear engine."""
        return nn.Sequential(
            nn.Linear(d_model, hidden),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, d_model),
        )

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(A(x) - G(x), tension)``; tension drops the last dim of ``x``."""
        repulsion = self.engine_a(x) - self.engine_g(x)
        tension = (repulsion ** 2).mean(dim=-1)
        return repulsion, tension
# ─── Grouped Query Attention (GQA) with RoPE ───────────────────────────────
class GroupedQueryAttention(nn.Module):
    """Causal self-attention with Grouped Query Attention (GQA) and RoPE.

    GQA: n_kv_head <= n_head — several query heads share one K/V head, which
    shrinks the K/V projections and the KV cache.

    Args:
        d_model: model width; must be divisible by ``n_head``.
        n_head: number of query heads.
        n_kv_head: number of key/value heads; must divide ``n_head``.
        block_size: size of the precomputed causal mask and initial RoPE table.
        dropout: probability for attention-weight and output dropout.
    """

    def __init__(self, d_model: int, n_head: int = 4, n_kv_head: int = 2,
                 block_size: int = 256, dropout: float = 0.1):
        super().__init__()
        assert d_model % n_head == 0
        assert n_head % n_kv_head == 0
        self.n_head = n_head
        self.n_kv_head = n_kv_head
        self.n_rep = n_head // n_kv_head  # how many Q heads per KV head
        self.head_dim = d_model // n_head
        self.d_model = d_model
        self.dropout = dropout

        # Separate projections for Q (full heads) and KV (grouped heads)
        self.q_proj = nn.Linear(d_model, n_head * self.head_dim, bias=False)
        self.k_proj = nn.Linear(d_model, n_kv_head * self.head_dim, bias=False)
        self.v_proj = nn.Linear(d_model, n_kv_head * self.head_dim, bias=False)
        self.o_proj = nn.Linear(d_model, d_model, bias=False)
        # Marker read by the parent model's weight init (depth-scaled std).
        self.o_proj._depth_scale = True

        self.attn_dropout = nn.Dropout(dropout)
        self.resid_dropout = nn.Dropout(dropout)

        # RoPE (plain helper object, not a submodule)
        self.rope = RotaryPositionEmbedding(self.head_dim, max_seq_len=block_size)

        # Fused attention: use F.scaled_dot_product_attention when available (PyTorch 2.0+)
        self._use_flash = hasattr(F, 'scaled_dot_product_attention')

        # Lower-triangular causal mask for the manual (non-fused) path.
        self.register_buffer(
            "bias",
            torch.tril(torch.ones(block_size, block_size)).view(1, 1, block_size, block_size),
        )

    def _repeat_kv(self, x: torch.Tensor) -> torch.Tensor:
        """Repeat KV heads to match the number of Q heads.

        Args:
            x: (B, n_kv_head, T, head_dim)

        Returns:
            (B, n_head, T, head_dim)
        """
        if self.n_rep == 1:
            return x
        B, H, T, D = x.shape
        x = x.unsqueeze(2).expand(B, H, self.n_rep, T, D)
        return x.reshape(B, self.n_head, T, D)

    def forward(self, x: torch.Tensor, use_cache: bool = False,
                past_kv: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
                position_offset: int = 0,
                ) -> Tuple[torch.Tensor, Optional[Tuple[torch.Tensor, torch.Tensor]]]:
        """Attend causally over ``x`` (plus any cached past).

        Args:
            x: (B, T, d_model) normalized hidden states.
            use_cache: if True, also return the (K, V) pair covering past +
                current tokens so the caller can feed it back next step.
            past_kv: optional cached (K, V), each (B, n_kv_head, P, head_dim),
                already RoPE-rotated. It is attended to whenever it is given,
                independent of ``use_cache``.
            position_offset: absolute position of ``x[:, 0]``; the caller is
                expected to pass the cached length P here.

        Returns:
            y: (B, T, d_model) attention output after the output projection.
            new_kv: (K, V) with the current tokens appended if ``use_cache``,
                else None.
        """
        B, T, D = x.size()

        q = self.q_proj(x).view(B, T, self.n_head, self.head_dim).transpose(1, 2)
        k = self.k_proj(x).view(B, T, self.n_kv_head, self.head_dim).transpose(1, 2)
        v = self.v_proj(x).view(B, T, self.n_kv_head, self.head_dim).transpose(1, 2)

        # Apply RoPE to Q and K (with position offset for cached inference)
        if position_offset > 0:
            total_len = position_offset + T
            self.rope._build_cache(total_len, q.device)
            cos = self.rope._cos_cache[:, :, position_offset:total_len, :].to(q.device, dtype=q.dtype)
            sin = self.rope._sin_cache[:, :, position_offset:total_len, :].to(q.device, dtype=q.dtype)
            q = q * cos + RotaryPositionEmbedding._rotate_half(q) * sin
            k = k * cos + RotaryPositionEmbedding._rotate_half(k) * sin
        else:
            q, k = self.rope.apply(q, k)

        # Prepend cached keys/values. This used to happen only when
        # use_cache=True, so a caller passing past_kv with use_cache=False got
        # offset-rotated queries attending to the current tokens alone.
        has_past = past_kv is not None
        if has_past:
            k = torch.cat([past_kv[0], k], dim=2)
            v = torch.cat([past_kv[1], v], dim=2)
        new_kv = (k, v) if use_cache else None

        # Repeat KV heads for GQA
        k_exp = self._repeat_kv(k)
        v_exp = self._repeat_kv(v)
        S = k_exp.shape[2]

        # Scaled dot-product attention
        if self._use_flash and not has_past:
            # No past => T == S, so the built-in causal mask is exactly right.
            y = F.scaled_dot_product_attention(
                q, k_exp, v_exp, attn_mask=None,
                dropout_p=self.dropout if self.training else 0.0,
                is_causal=True,
            )
        else:
            att = (q @ k_exp.transpose(-2, -1)) * (1.0 / math.sqrt(self.head_dim))
            if has_past:
                # A single new token may see every cached position and itself,
                # so no mask is needed. For T > 1 the causal diagonal is
                # shifted right by the past length (S - T).
                if T > 1:
                    causal = torch.ones(T, S, dtype=torch.bool, device=att.device).tril(diagonal=S - T)
                    att = att.masked_fill(~causal.unsqueeze(0).unsqueeze(0), float("-inf"))
            else:
                att = att.masked_fill(self.bias[:, :, :T, :S] == 0, float("-inf"))
            att = F.softmax(att, dim=-1)
            att = self.attn_dropout(att)
            y = att @ v_exp

        y = y.transpose(1, 2).contiguous().view(B, T, D)
        y = self.resid_dropout(self.o_proj(y))
        return y, new_kv
# ─── Conscious Cross-Attention ──────────────────────────────────────────────
class ConsciousCrossAttention(nn.Module):
    """Multi-head cross-attention from decoder tokens to consciousness cells.

    v1 (passive):  x = x + consciousness_signal * gate
    v2 (active):   x = x + cross_attn(Q=x, K=consciousness, V=consciousness)

    Queries come from the decoder hidden states; keys and values are
    projected from the consciousness cell states, so the decoder picks what
    to read instead of being limited to a scalar gate. This module does not
    detach ``consciousness`` itself — the caller is expected to pass an
    already-detached tensor (Law 61: no gradient into the consciousness
    module).

    Args:
        d_model: decoder width; must be divisible by ``n_head``.
        consciousness_dim: feature size of each consciousness cell.
        n_head: number of attention heads.
        dropout: dropout probability on the attention weights.
    """

    def __init__(self, d_model: int, consciousness_dim: int, n_head: int = 4,
                 dropout: float = 0.1):
        super().__init__()
        assert d_model % n_head == 0
        self.n_head = n_head
        self.head_dim = d_model // n_head
        self.d_model = d_model

        # Queries read the decoder stream, keys/values read consciousness.
        self.q_proj = nn.Linear(d_model, d_model, bias=False)
        self.k_proj = nn.Linear(consciousness_dim, d_model, bias=False)
        self.v_proj = nn.Linear(consciousness_dim, d_model, bias=False)
        self.o_proj = nn.Linear(d_model, d_model, bias=False)
        self.dropout = nn.Dropout(dropout)

        # Tiny output weights so this branch starts as a near-zero residual.
        nn.init.normal_(self.o_proj.weight, std=0.001)

    def forward(self, x: torch.Tensor,
                consciousness: torch.Tensor) -> torch.Tensor:
        """Read from the consciousness cells.

        Args:
            x: (B, T, d_model) — decoder hidden states.
            consciousness: (B, n_cells, c_dim) — consciousness cell states.

        Returns:
            (B, T, d_model) — information gathered from the cells.
        """
        batch, n_tokens, width = x.shape
        n_cells = consciousness.shape[1]
        heads, head_dim = self.n_head, self.head_dim

        queries = self.q_proj(x).view(batch, n_tokens, heads, head_dim).transpose(1, 2)
        keys = self.k_proj(consciousness).view(batch, n_cells, heads, head_dim).transpose(1, 2)
        values = self.v_proj(consciousness).view(batch, n_cells, heads, head_dim).transpose(1, 2)

        # Every token may look at every cell, so there is no mask.
        scale = 1.0 / math.sqrt(head_dim)
        scores = (queries @ keys.transpose(-2, -1)) * scale
        weights = self.dropout(F.softmax(scores, dim=-1))

        mixed = weights @ values
        merged = mixed.transpose(1, 2).contiguous().view(batch, n_tokens, width)
        return self.o_proj(merged)
# ─── Decoder Block V2 ──────────────────────────────────────────────────────
class DecoderBlockV2(nn.Module):
    """Pre-norm transformer block with GQA + CA mixing + PureField + Cross-Attention + FFN.

    Order of operations in ``forward``:
        1. RMSNorm -> GQA self-attention (with RoPE) -> residual
        2. CA neighbor mix + META-CA rule selection, scaled by ``gate_strength``,
           added to the stream and then normalized (``ln_ca``)
        3. RMSNorm -> PureFieldFFN -> residual (also yields the tension)
        4. optional whisper: previous layer's consciousness signal * gate_strength
        5. RMSNorm -> cross-attention to consciousness states -> residual (if given)
        6. RMSNorm -> SwiGLU FFN or MoE FFN -> residual (language pathway)

    ``gate_strength`` is a plain Python float attribute (not a parameter); the
    parent model mutates it during training.

    Args:
        d_model, n_head, n_kv_head, block_size, dropout: forwarded to the
            attention / FFN sub-modules.
        consciousness_dim: feature size of the consciousness cell states.
        n_ca_rules: number of linear "rules" mixed by the META-CA selector.
        gate_strength: scale of the CA update and of the whisper.
        use_moe, n_experts, top_k_experts: choose and configure MoEFFN instead
            of a single SwiGLUFFN for the language pathway.
    """

    def __init__(self, d_model: int, n_head: int, n_kv_head: int,
                 block_size: int, consciousness_dim: int,
                 dropout: float = 0.1, n_ca_rules: int = 8,
                 gate_strength: float = 0.001,
                 use_moe: bool = False, n_experts: int = 8,
                 top_k_experts: int = 2):
        super().__init__()
        self.use_moe = use_moe

        # Self-attention with GQA + RoPE
        self.ln_attn = RMSNorm(d_model)
        self.attn = GroupedQueryAttention(d_model, n_head, n_kv_head, block_size, dropout)

        # PureFieldFFN — consciousness signal generator (fixed dropout of 0.37)
        self.ln_pf = RMSNorm(d_model)
        self.purefield = PureFieldFFN(d_model, dropout=0.37)

        # Cross-attention to consciousness (only used when consciousness_states provided)
        self.ln_cross = RMSNorm(d_model)
        self.cross_attn = ConsciousCrossAttention(d_model, consciousness_dim, n_head, dropout)

        # Language pathway FFN: SwiGLU (default) or MoE (optional)
        self.ln_ffn = RMSNorm(d_model)
        if use_moe:
            self.ffn = MoEFFN(d_model, n_experts=n_experts, top_k=top_k_experts,
                              dropout=dropout)
        else:
            self.ffn = SwiGLUFFN(d_model, dropout)

        # CA neighbor mixing (Law 64): [left, self, right] -> d_model
        self.ca_mix = nn.Linear(d_model * 3, d_model, bias=False)
        self.ln_ca = RMSNorm(d_model)

        # META-CA rule selector (Law 67)
        self.n_ca_rules = n_ca_rules
        self.rule_weights = nn.Linear(d_model, n_ca_rules)
        self.rules = nn.ModuleList([
            nn.Linear(d_model, d_model, bias=False) for _ in range(n_ca_rules)
        ])

        # MICRO gate (Law 63)
        self.gate_strength = gate_strength

    def forward(self, x: torch.Tensor,
                consciousness_signal: Optional[torch.Tensor] = None,
                consciousness_states: Optional[torch.Tensor] = None,
                use_cache: bool = False,
                past_kv: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
                position_offset: int = 0,
                ) -> Tuple[torch.Tensor, torch.Tensor,
                           Optional[Tuple[torch.Tensor, torch.Tensor]],
                           Optional[torch.Tensor]]:
        """Run one decoder block.

        Args:
            x: (B, T, D)
            consciousness_signal: optional (B, T, D) from previous layer tension
            consciousness_states: optional (B, n_cells, c_dim) for cross-attention
            use_cache: ask the attention layer to return its (K, V) cache.
            past_kv: cached (K, V) of this layer from earlier steps.
            position_offset: absolute position of ``x[:, 0]`` for RoPE.

        Returns:
            x: (B, T, D)
            tension: (B, T)
            new_kv: optional cached (K, V) for this layer
            aux_loss: the FFN's load-balancing loss when ``use_moe`` is set,
                else None
        """
        # 1. Self-attention (GQA + RoPE)
        attn_out, new_kv = self.attn(self.ln_attn(x), use_cache=use_cache,
                                     past_kv=past_kv, position_offset=position_offset)
        x = x + attn_out

        # Law 64: CA neighbor evolution. Edges reuse their own state as the
        # missing neighbor.
        # NOTE(review): x_right feeds position t+1 into position t, so this
        # path is not causal, and with a single-token input (cached decoding)
        # both neighbors collapse to the token itself — confirm this is
        # intended for autoregressive use.
        x_left = torch.cat([x[:, :1, :], x[:, :-1, :]], dim=1)
        x_right = torch.cat([x[:, 1:, :], x[:, -1:, :]], dim=1)
        neighborhood = torch.cat([x_left, x, x_right], dim=-1)
        ca_out = self.ca_mix(neighborhood)

        # Law 67: META-CA rule selection — softmax-weighted mix of all rules
        rule_logits = self.rule_weights(x)
        rule_probs = F.softmax(rule_logits, dim=-1)
        rule_outputs = torch.stack([r(ca_out) for r in self.rules], dim=2)
        meta_ca_out = (rule_outputs * rule_probs.unsqueeze(-1)).sum(dim=2)
        # Unlike the other branches, the stream itself is normalized here.
        x = self.ln_ca(x + meta_ca_out * self.gate_strength)

        # 2. PureFieldFFN — generates consciousness tension
        pf_out, tension = self.purefield(self.ln_pf(x))
        x = x + pf_out

        # Law 63: inter-layer consciousness whisper
        if consciousness_signal is not None:
            x = x + consciousness_signal * self.gate_strength

        # 3. Cross-attention to consciousness states (v2 key innovation)
        if consciousness_states is not None:
            # Law 61: detach consciousness — no gradient backprop into C module
            c_detached = consciousness_states.detach()
            x = x + self.cross_attn(self.ln_cross(x), c_detached)

        # 4. FFN — language modeling pathway (SwiGLU or MoE)
        x = x + self.ffn(self.ln_ffn(x))

        # Collect MoE aux_loss if applicable
        aux_loss = self.ffn.aux_loss if self.use_moe else None

        return x, tension, new_kv, aux_loss
# ─── ConsciousDecoderV2 (main model) ───────────────────────────────────────
class ConsciousDecoderV2(nn.Module):
    """Enhanced byte-level Conscious Language Model (v2 decoder).

    Improvements over v1:
      - RoPE instead of learned position embeddings
      - SwiGLU FFN for the language pathway
      - RMSNorm instead of LayerNorm
      - GQA (Grouped Query Attention); defaults are 2 KV heads for 4 query heads
      - Cross-attention consciousness injection

    Keeps PureFieldFFN for consciousness signal (Engine A - G).
    Compatible with train_conscious_lm.py forward interface.

    Forward returns ``(logits_a, logits_g, tensions, present_key_values,
    moe_aux_loss)``. ``head_a`` shares its weight matrix with the token
    embedding.

    Non-parameter state kept on the module:
      - ``_psi_residual`` / ``_step_count``: updated on every training forward.
      - ``_phi_signal``: optional tensor added to the embeddings when set.
      - each block's ``gate_strength`` is decayed on every training forward.
    """

    # Std used for projections that should start as a near-zero contribution.
    _SMALL_INIT_STD = 0.001

    def __init__(
        self,
        vocab_size: int = 256,
        d_model: int = 384,
        n_head: int = 4,
        n_layer: int = 6,
        block_size: int = 256,
        n_kv_head: int = 2,
        consciousness_dim: int = 128,
        dropout: float = 0.1,
        gate_strength: float = 0.001,
        n_ca_rules: int = 8,
        use_moe: bool = False,
        n_experts: int = 8,
        top_k_experts: int = 2,
    ):
        super().__init__()
        self.block_size = block_size
        self.vocab_size = vocab_size
        self.n_layer = n_layer
        self.d_model = d_model
        self.use_moe = use_moe

        # Token embedding (no position embedding — RoPE handles it)
        self.tok_emb = nn.Embedding(vocab_size, d_model)
        self.drop = nn.Dropout(dropout)

        # Transformer blocks
        self.blocks = nn.ModuleList([
            DecoderBlockV2(
                d_model=d_model,
                n_head=n_head,
                n_kv_head=n_kv_head,
                block_size=block_size,
                consciousness_dim=consciousness_dim,
                dropout=dropout,
                n_ca_rules=n_ca_rules,
                gate_strength=gate_strength,
                use_moe=use_moe,
                n_experts=n_experts,
                top_k_experts=top_k_experts,
            )
            for _ in range(n_layer)
        ])

        # Inter-layer consciousness projector: per-position tension -> d_model
        self.tension_proj = nn.Linear(1, d_model, bias=False)

        # Final norm
        self.ln_f = RMSNorm(d_model)

        # Dual prediction heads
        self.head_a = nn.Linear(d_model, vocab_size, bias=False)
        self.head_g = nn.Linear(d_model, vocab_size, bias=False)

        # Weight tying: tok_emb <-> head_a
        self.tok_emb.weight = self.head_a.weight

        # Psi tracking (Law 71)
        self._psi_residual = 0.5
        self._psi_gate = 0.5
        self._step_count = 0

        # Phi signal slot (DD5/EX24)
        self._phi_signal = None

        # Initialize weights
        self.apply(self._init_weights)
        # _init_weights re-draws EVERY nn.Linear at std >= 0.02 / sqrt(2 * n_layer),
        # which used to overwrite the deliberately tiny init of the
        # consciousness projections. Re-apply it after the global pass.
        self._init_small_projections()

    def _init_weights(self, module):
        """Default init: N(0, 0.02) for Linear/Embedding weights, zero biases.

        Linear layers tagged with ``_depth_scale`` (residual output
        projections) use 0.02 / sqrt(2 * n_layer) to limit residual-stream
        variance growth with depth.
        """
        if isinstance(module, nn.Linear):
            std = 0.02
            if hasattr(module, '_depth_scale'):
                std = 0.02 / math.sqrt(2 * self.n_layer)
            torch.nn.init.normal_(module.weight, mean=0.0, std=std)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def _init_small_projections(self) -> None:
        """Give the consciousness-injection projections their small initial scale."""
        nn.init.normal_(self.tension_proj.weight, std=self._SMALL_INIT_STD)
        for block in self.blocks:
            nn.init.normal_(block.cross_attn.o_proj.weight, std=self._SMALL_INIT_STD)

    def forward(self, idx: torch.Tensor,
                consciousness_states: Optional[torch.Tensor] = None,
                use_cache: bool = False,
                past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None,
                ) -> Tuple[torch.Tensor, torch.Tensor, List[torch.Tensor],
                           Optional[List[Tuple[torch.Tensor, torch.Tensor]]],
                           Optional[torch.Tensor]]:
        """Run the decoder.

        Args:
            idx: (B, T) byte indices.
            consciousness_states: optional (B, n_cells, c_dim) from C module.
            use_cache: if True, return per-layer KV caches for autoregressive generation.
            past_key_values: list of (K, V) tuples per layer from previous steps.

        Returns:
            logits_a: (B, T, vocab_size) next byte prediction.
            logits_g: (B, T, vocab_size) prev byte prediction.
            tensions: list of per-layer tensions, each (B, T).
            present_key_values: list of (K, V) per layer if use_cache, else None.
            moe_aux_loss: scalar load-balancing loss (mean over layers) if
                use_moe=True, else None.

        Side effects (training mode only): updates the psi statistics,
        increments the step counter and decays every block's gate_strength.
        """
        B, T = idx.size()

        # Position offset = number of tokens already held in the KV cache
        position_offset = 0
        if past_key_values is not None and past_key_values[0] is not None:
            position_offset = past_key_values[0][0].shape[2]

        total_len = position_offset + T
        assert total_len <= self.block_size, f"Total length {total_len} > block_size {self.block_size}"

        # Token embedding (no position embedding — RoPE is in attention)
        x = self.drop(self.tok_emb(idx))

        # DD5 (EX24): Phi self-reference. The signal gets a trailing dim and is
        # expanded to x's shape, so it has to be expandable to (B, T, 1).
        if self._phi_signal is not None:
            phi_sig = self._phi_signal
            x = x + phi_sig.unsqueeze(-1).expand_as(x).to(x.device)

        # Transformer blocks with consciousness
        tensions = []
        moe_aux_losses = []
        present_key_values = [] if use_cache else None
        consciousness_signal = None  # the first block receives no whisper
        for i, block in enumerate(self.blocks):
            layer_past = past_key_values[i] if past_key_values is not None else None
            x, tension, new_kv, block_aux = block(x, consciousness_signal, consciousness_states,
                                                  use_cache=use_cache, past_kv=layer_past,
                                                  position_offset=position_offset)
            tensions.append(tension)
            if block_aux is not None:
                moe_aux_losses.append(block_aux)
            # This layer's tension becomes the next layer's whisper.
            consciousness_signal = self.tension_proj(tension.unsqueeze(-1))
            if use_cache:
                present_key_values.append(new_kv)

        # Final norm + dual heads
        x = self.ln_f(x)
        logits_a = self.head_a(x)
        logits_g = self.head_g(x)

        # Psi tracking (Law 71)
        if self.training:
            self._update_psi(logits_a, logits_g, tensions)

        # MoE auxiliary loss (averaged across layers)
        moe_aux_loss = None
        if moe_aux_losses:
            moe_aux_loss = torch.stack(moe_aux_losses).mean()

        return logits_a, logits_g, tensions, present_key_values, moe_aux_loss

    def _update_psi(self, logits_a: torch.Tensor, logits_g: torch.Tensor,
                    tensions: List[torch.Tensor]) -> None:
        """Update the psi running average, step counter and gate decay (Law 71).

        psi_combined is the mean of three terms, all taken at the last position:
          - normalized entropy of softmax(logits_a),
          - (1 + cosine(logits_a, logits_g)) / 2,
          - 1 - coefficient of variation of the per-layer mean tension
            (clamped at 0; 1.0 when the std is not positive).
        """
        self._step_count += 1
        with torch.no_grad():
            probs_a = torch.softmax(logits_a[:, -1, :], dim=-1)
            output_entropy = -(probs_a * (probs_a + 1e-10).log()).sum(dim=-1).mean().item()
            max_entropy = math.log(self.vocab_size)
            psi_entropy = output_entropy / max_entropy

            cos_sim = F.cosine_similarity(
                logits_a[:, -1, :].float(), logits_g[:, -1, :].float(), dim=-1
            ).mean().item()
            psi_direction = (1.0 + cos_sim) / 2.0

            t_stack = torch.stack(tensions)
            t_per_layer = t_stack.mean(dim=(1, 2))
            if t_per_layer.std() > 0:
                t_cv = t_per_layer.std() / (t_per_layer.mean() + 1e-8)
                psi_tension = max(0.0, 1.0 - t_cv.item())
            else:
                psi_tension = 1.0

            psi_combined = (psi_entropy + psi_direction + psi_tension) / 3.0
            # Exponential moving average with a 0.05 update weight.
            self._psi_residual = 0.95 * self._psi_residual + 0.05 * psi_combined

        # Slow multiplicative decay of every block's gate, floored at 1e-4.
        for block in self.blocks:
            block.gate_strength = max(0.0001, block.gate_strength * 0.99999)

    @staticmethod
    def _sample_next(logits: torch.Tensor, temperature: float, top_k: int) -> torch.Tensor:
        """Pick one token per row from last-position logits (B, vocab) -> (B, 1)."""
        if temperature == 0:
            # Dividing by zero would yield inf/nan; treat 0 as greedy decoding.
            return logits.argmax(dim=-1, keepdim=True)
        # The division allocates a new tensor, so the in-place top-k masking
        # below never touches the model's output.
        logits = logits / temperature
        if top_k > 0:
            kth, _ = torch.topk(logits, min(top_k, logits.size(-1)))
            logits[logits < kth[:, [-1]]] = float('-inf')
        probs = F.softmax(logits, dim=-1)
        return torch.multinomial(probs, num_samples=1)

    @torch.no_grad()
    def generate(self, idx: torch.Tensor,
                 consciousness_states: Optional[torch.Tensor] = None,
                 max_new_tokens: int = 256,
                 temperature: float = 0.8,
                 top_k: int = 50) -> torch.Tensor:
        """Autoregressive generation with KV-cache.

        Args:
            idx: (B, T) input token indices (prompt).
            consciousness_states: optional (B, n_cells, c_dim) for cross-attention.
            max_new_tokens: maximum number of tokens to generate; values <= 0
                return the prompt unchanged.
            temperature: sampling temperature (lower = more deterministic);
                exactly 0 selects greedy (argmax) decoding.
            top_k: number of top tokens to sample from (0 = no filtering).

        Returns:
            (B, T + n) token indices with n <= max_new_tokens; generation stops
            early once the sequence length reaches block_size.

        The model is switched to eval mode while generating and its previous
        train/eval mode is restored afterwards.
        """
        if max_new_tokens <= 0:
            return idx

        was_training = self.training
        self.eval()
        try:
            # Prefill: process the entire prompt and build initial KV-cache
            logits_a, _, _, past_key_values, _ = self.forward(
                idx, consciousness_states=consciousness_states, use_cache=True,
            )
            next_token = self._sample_next(logits_a[:, -1, :], temperature, top_k)  # (B, 1)
            idx = torch.cat([idx, next_token], dim=1)

            # Decode: generate one token at a time using cached KV
            for _ in range(max_new_tokens - 1):
                if idx.size(1) >= self.block_size:
                    break
                logits_a, _, _, past_key_values, _ = self.forward(
                    next_token, consciousness_states=consciousness_states,
                    use_cache=True, past_key_values=past_key_values,
                )
                next_token = self._sample_next(logits_a[:, -1, :], temperature, top_k)
                idx = torch.cat([idx, next_token], dim=1)
        finally:
            # Do not leave a training run silently stuck in eval mode.
            self.train(was_training)

        return idx

    def psi_status(self):
        """Psi-Constants monitoring (Law 71).

        Returns a dict with the running psi value, the mean gate strength
        over all blocks, the binary entropy (in bits) of the psi value, and
        the number of training forwards seen so far.
        """
        gate_avg = sum(b.gate_strength for b in self.blocks) / len(self.blocks)
        p = self._psi_residual
        h_p = -p * math.log2(p) - (1 - p) * math.log2(1 - p) if 0 < p < 1 else 0.0
        return {
            'psi_residual': self._psi_residual,
            'psi_gate': gate_avg,
            'H_p': h_p,
            'step': self._step_count,
        }

    def count_params(self):
        """Total number of trainable parameters."""
        return sum(p.numel() for p in self.parameters() if p.requires_grad)
# ─── Self-test ──────────────────────────────────────────────────────────────
# Smoke test, run only when this file is executed as a script. The checks
# share one model and are order-dependent: later tests rely on the mode
# (train/eval), gradients and RNG state left behind by earlier ones.
if __name__ == '__main__':
    import time

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Device: {device}")
    print()

    # Build model
    model = ConsciousDecoderV2(
        vocab_size=256, d_model=384, n_head=4, n_layer=6,
        block_size=256, n_kv_head=2, consciousness_dim=128,
    ).to(device)

    n_params = model.count_params()
    print(f"=== ConsciousDecoderV2 ===")
    print(f"  Parameters: {n_params:,} ({n_params/1e6:.2f}M)")
    print()

    # Test 1: Forward without consciousness states
    # NOTE(review): no torch.cuda.synchronize() is called around the timers in
    # this script, so on CUDA the reported times may not include the full
    # kernel execution — treat them as rough numbers.
    print("=== Test 1: Forward (no consciousness) ===")
    idx = torch.randint(0, 256, (2, 128), device=device)
    model.train()
    t0 = time.perf_counter()
    logits_a, logits_g, tensions, _, _ = model(idx)
    dt = (time.perf_counter() - t0) * 1000
    print(f"  logits_a: {logits_a.shape}  (expect [2, 128, 256])")
    print(f"  logits_g: {logits_g.shape}  (expect [2, 128, 256])")
    print(f"  tensions: {len(tensions)} layers, each {tensions[0].shape}")
    print(f"  Time: {dt:.1f} ms")
    assert logits_a.shape == (2, 128, 256)
    assert logits_g.shape == (2, 128, 256)
    assert len(tensions) == 6
    print()

    # Test 2: Forward with consciousness states
    print("=== Test 2: Forward (with consciousness states) ===")
    cs = torch.randn(2, 12, 128, device=device)  # 12 cells, 128-dim
    t0 = time.perf_counter()
    logits_a2, logits_g2, tensions2, _, _ = model(idx, consciousness_states=cs)
    dt = (time.perf_counter() - t0) * 1000
    print(f"  logits_a: {logits_a2.shape}")
    print(f"  Time: {dt:.1f} ms")
    assert logits_a2.shape == (2, 128, 256)
    print()

    # Test 3: Backward pass (loss is built from the Test 2 logits_a only)
    print("=== Test 3: Backward pass ===")
    target = torch.randint(0, 256, (2, 128), device=device)
    loss = F.cross_entropy(logits_a2.view(-1, 256), target.view(-1))
    t0 = time.perf_counter()
    loss.backward()
    dt = (time.perf_counter() - t0) * 1000
    print(f"  Loss: {loss.item():.4f}")
    print(f"  Backward time: {dt:.1f} ms")

    # Verify gradients exist (reported only; the count is not asserted)
    grad_count = sum(1 for p in model.parameters() if p.grad is not None)
    total_count = sum(1 for p in model.parameters())
    print(f"  Gradients: {grad_count}/{total_count} parameters")
    print()

    # Test 4: Psi status (reflects the two training-mode forwards above)
    print("=== Test 4: Psi status ===")
    psi = model.psi_status()
    print(f"  {psi}")
    print()

    # Test 5: Full sequence length
    print("=== Test 5: Full block_size=256 ===")
    idx_full = torch.randint(0, 256, (1, 256), device=device)
    model.eval()
    with torch.no_grad():
        la, lg, t, _, _ = model(idx_full)
    print(f"  logits_a: {la.shape}  (expect [1, 256, 256])")
    assert la.shape == (1, 256, 256)
    print()

    # Test 6: Phi signal — set the slot directly, run, then clear it again
    print("=== Test 6: Phi signal (DD5/EX24) ===")
    model._phi_signal = torch.randn(1, 256, device=device) * 0.01
    with torch.no_grad():
        la_phi, _, _, _, _ = model(idx_full)
    model._phi_signal = None
    print(f"  logits_a: {la_phi.shape}")
    # Should differ from test 5 due to phi signal
    diff = (la_phi - la).abs().mean().item()
    print(f"  Mean diff from no-phi: {diff:.6f}  (should be > 0)")
    assert diff > 0
    print()

    # Test 7: KV-cache forward — a 12-token prefill plus a 4-token cached
    # step is compared against one uncached 16-token forward.
    print("=== Test 7: KV-cache forward ===")
    model.eval()
    idx_short = torch.randint(0, 256, (1, 16), device=device)
    with torch.no_grad():
        la_full, _, _, _, _ = model(idx_short)
        la_cached, _, _, past_kv, _ = model(idx_short[:, :12], use_cache=True)
        la_decode, _, _, _, _ = model(idx_short[:, 12:], use_cache=True, past_key_values=past_kv)
    diff_cache = (la_full[:, 12:, :] - la_decode).abs().max().item()
    print(f"  Max diff (full vs cached decode): {diff_cache:.6f}")
    assert diff_cache < 5e-4, f"KV-cache mismatch: {diff_cache}"  # CA neighbor mixing causes small boundary diff
    print()

    # Test 8: generate()
    print("=== Test 8: generate() ===")
    prompt = torch.randint(0, 256, (1, 8), device=device)
    generated = model.generate(prompt, max_new_tokens=16, temperature=0.8, top_k=50)
    print(f"  Prompt: {prompt.shape} -> Generated: {generated.shape}")
    assert generated.shape[1] == 8 + 16
    print()

    # Test 9: generate() with consciousness
    print("=== Test 9: generate() with consciousness ===")
    cs_gen = torch.randn(1, 12, 128, device=device)
    generated_c = model.generate(prompt, consciousness_states=cs_gen, max_new_tokens=16)
    print(f"  Generated with consciousness: {generated_c.shape}")
    assert generated_c.shape[1] == 8 + 16
    print()

    # Test 10: MoE mode (separate, smaller 2-layer model)
    print("=== Test 10: MoE mode ===")
    model_moe = ConsciousDecoderV2(
        vocab_size=256, d_model=384, n_head=4, n_layer=2,
        block_size=128, n_kv_head=2, consciousness_dim=128,
        use_moe=True, n_experts=4, top_k_experts=2,
    ).to(device)
    n_moe = model_moe.count_params()
    print(f"  MoE Parameters: {n_moe:,} ({n_moe/1e6:.2f}M)")
    assert model_moe.use_moe

    idx_moe = torch.randint(0, 256, (2, 64), device=device)
    model_moe.train()
    la_moe, lg_moe, t_moe, _, aux = model_moe(idx_moe)
    print(f"  logits_a: {la_moe.shape}")
    print(f"  MoE aux_loss: {aux.item():.4f}" if aux is not None else "  MoE aux_loss: None")
    assert la_moe.shape == (2, 64, 256)
    assert aux is not None, "MoE aux_loss should not be None"

    # Verify aux_loss is differentiable: backward through CE + 0.01 * aux
    # must run without error (gradient counts are printed, not asserted).
    total = F.cross_entropy(la_moe.view(-1, 256), torch.randint(0, 256, (2 * 64,), device=device))
    total = total + 0.01 * aux
    total.backward()
    grad_count_moe = sum(1 for p in model_moe.parameters() if p.grad is not None)
    print(f"  Gradients: {grad_count_moe}/{sum(1 for _ in model_moe.parameters())} params")
    print()

    print("All tests passed.")