#!/usr/bin/env python3 # chambers.py — Chamber MLPs with Cross-Fire Stabilization # # Six chambers of emotion, each with its own deeper MLP: # - FEAR chamber: 100→128→64→32→1 # - LOVE chamber: 100→128→64→32→1 # - RAGE chamber: 100→128→64→32→1 # - VOID chamber: 100→128→64→32→1 # - FLOW chamber: 100→128→64→32→1 # - COMPLEX chamber: 100→128→64→32→1 # # Total: 6 × ~17K = ~102K params # # Cross-fire: chambers influence each other through coupling matrix, # iterating until stabilization (5-10 iterations). from __future__ import annotations import asyncio import numpy as np from typing import Dict, Optional, Tuple from dataclasses import dataclass from pathlib import Path from .anchors import CHAMBER_NAMES, COUPLING_MATRIX, CHAMBER_NAMES_EXTENDED, COUPLING_MATRIX_EXTENDED # Decay rates per chamber (per iteration tick) # Evolutionary psychology: fear lingers, rage fades, love stable, void persistent DECAY_RATES = { "FEAR": 0.90, # fear lingers (evolutionary advantage) "LOVE": 0.93, # attachment stable "RAGE": 0.85, # anger fades fast (high energy cost) "VOID": 0.97, # numbness persistent (protective dissociation) "FLOW": 0.88, # curiosity is transient, shifts quickly "COMPLEX": 0.94, # complex emotions are stable but deep } def swish(x: np.ndarray) -> np.ndarray: """Swish activation: x * sigmoid(x)""" return x / (1.0 + np.exp(-x)) def swish_deriv(x: np.ndarray) -> np.ndarray: """Derivative of swish for backprop""" sig = 1.0 / (1.0 + np.exp(-x)) return sig + x * sig * (1 - sig) @dataclass class ChamberMLP: """ Single chamber MLP: 100→128→64→32→1 (deeper for 200K model) Takes 100D resonance vector, outputs single activation value. Params: - W1: (100, 128) = 12,800 - b1: (128,) = 128 - W2: (128, 64) = 8,192 - b2: (64,) = 64 - W3: (64, 32) = 2,048 - b3: (32,) = 32 - W4: (32, 1) = 32 - b4: (1,) = 1 Total: ~23K params per chamber """ W1: np.ndarray # (100, 128) b1: np.ndarray # (128,) W2: np.ndarray # (128, 64) b2: np.ndarray # (64,) W3: np.ndarray # (64, 32) b3: np.ndarray # (32,) W4: np.ndarray # (32, 1) b4: np.ndarray # (1,) @classmethod def random_init(cls, seed: Optional[int] = None) -> "ChamberMLP": """Initialize with random weights (Xavier initialization).""" if seed is not None: np.random.seed(seed) # Xavier init: scale by sqrt(fan_in) W1 = np.random.randn(100, 128) * np.sqrt(2.0 / 100) b1 = np.zeros(128) W2 = np.random.randn(128, 64) * np.sqrt(2.0 / 128) b2 = np.zeros(64) W3 = np.random.randn(64, 32) * np.sqrt(2.0 / 64) b3 = np.zeros(32) W4 = np.random.randn(32, 1) * np.sqrt(2.0 / 32) b4 = np.zeros(1) return cls(W1=W1, b1=b1, W2=W2, b2=b2, W3=W3, b3=b3, W4=W4, b4=b4) def forward(self, x: np.ndarray) -> float: """ Forward pass: 100D resonances → scalar activation. Args: x: (100,) resonance vector Returns: scalar activation [0, 1] """ # Layer 1: 100→128 h1 = x @ self.W1 + self.b1 a1 = swish(h1) # Layer 2: 128→64 h2 = a1 @ self.W2 + self.b2 a2 = swish(h2) # Layer 3: 64→32 h3 = a2 @ self.W3 + self.b3 a3 = swish(h3) # Layer 4: 32→1 h4 = a3 @ self.W4 + self.b4 # Sigmoid to get [0, 1] activation activation = 1.0 / (1.0 + np.exp(-h4[0])) return float(activation) def param_count(self) -> int: """Count total parameters in this MLP.""" return ( self.W1.size + self.b1.size + self.W2.size + self.b2.size + self.W3.size + self.b3.size + self.W4.size + self.b4.size ) def save(self, path: Path) -> None: """Save weights to .npz file.""" np.savez( path, W1=self.W1, b1=self.b1, W2=self.W2, b2=self.b2, W3=self.W3, b3=self.b3, W4=self.W4, b4=self.b4, ) @classmethod def load(cls, path: Path) -> "ChamberMLP": """Load weights from .npz file.""" data = np.load(path) # Handle backwards compatibility with old 3-layer architecture if "W4" in data: return cls( W1=data["W1"], b1=data["b1"], W2=data["W2"], b2=data["b2"], W3=data["W3"], b3=data["b3"], W4=data["W4"], b4=data["b4"], ) else: # Old 3-layer format - reinitialize with new 4-layer architecture print(f"[chambers] old format detected in {path}, reinitializing with 4-layer architecture") return cls.random_init() @dataclass class CrossFireSystem: """ Six chambers with cross-fire stabilization (200K model). Chambers: - FEAR, LOVE, RAGE, VOID (original) - FLOW, COMPLEX (extended for richer emotion detection) Cross-fire loop: 1. Each chamber computes activation from resonances 2. Chambers influence each other via coupling matrix 3. Iterate until convergence (max 10 iterations) 4. Return final activations + iteration count """ fear: ChamberMLP love: ChamberMLP rage: ChamberMLP void: ChamberMLP flow: ChamberMLP complex: ChamberMLP coupling: np.ndarray # (6, 6) coupling matrix @classmethod def random_init(cls, seed: Optional[int] = None) -> "CrossFireSystem": """Initialize all chambers with random weights.""" if seed is not None: base_seed = seed else: base_seed = np.random.randint(0, 10000) fear = ChamberMLP.random_init(seed=base_seed + 0) love = ChamberMLP.random_init(seed=base_seed + 1) rage = ChamberMLP.random_init(seed=base_seed + 2) void = ChamberMLP.random_init(seed=base_seed + 3) flow = ChamberMLP.random_init(seed=base_seed + 4) complex_chamber = ChamberMLP.random_init(seed=base_seed + 5) coupling = np.array(COUPLING_MATRIX_EXTENDED, dtype=np.float32) return cls( fear=fear, love=love, rage=rage, void=void, flow=flow, complex=complex_chamber, coupling=coupling, ) def stabilize( self, resonances: np.ndarray, max_iter: int = 10, threshold: float = 0.01, momentum: float = 0.7, ) -> Tuple[Dict[str, float], int]: """ Run cross-fire stabilization loop. Args: resonances: (100,) initial resonance vector max_iter: max iterations before forced stop threshold: convergence threshold (sum of absolute changes) momentum: blend factor (0.7 = 70% old, 30% new) Returns: (chamber_activations, iterations_count) Example: activations, iters = system.stabilize(resonances) # → {"FEAR": 0.8, "LOVE": 0.2, ...}, 5 """ # Initial activations from resonances chambers = [self.fear, self.love, self.rage, self.void, self.flow, self.complex] activations = np.array([ chamber.forward(resonances) for chamber in chambers ], dtype=np.float32) # Decay rates array (6 chambers) decay_array = np.array([ DECAY_RATES["FEAR"], DECAY_RATES["LOVE"], DECAY_RATES["RAGE"], DECAY_RATES["VOID"], DECAY_RATES["FLOW"], DECAY_RATES["COMPLEX"], ], dtype=np.float32) # Stabilization loop for iteration in range(max_iter): # Apply decay (emotions fade over time) activations = activations * decay_array # Compute influence from other chambers influence = self.coupling @ activations # Blend: momentum * old + (1 - momentum) * influence new_activations = momentum * activations + (1 - momentum) * influence # Clip to [0, 1] new_activations = np.clip(new_activations, 0.0, 1.0) # Check convergence delta = np.abs(new_activations - activations).sum() activations = new_activations if delta < threshold: # Converged! result = dict(zip(CHAMBER_NAMES_EXTENDED, activations)) return result, iteration + 1 # Max iterations reached result = dict(zip(CHAMBER_NAMES_EXTENDED, activations)) return result, max_iter def param_count(self) -> int: """Total parameters in all chambers.""" return sum([ self.fear.param_count(), self.love.param_count(), self.rage.param_count(), self.void.param_count(), self.flow.param_count(), self.complex.param_count(), ]) def save(self, models_dir: Path) -> None: """Save all chamber weights to models/ directory.""" models_dir.mkdir(parents=True, exist_ok=True) self.fear.save(models_dir / "chamber_fear.npz") self.love.save(models_dir / "chamber_love.npz") self.rage.save(models_dir / "chamber_rage.npz") self.void.save(models_dir / "chamber_void.npz") self.flow.save(models_dir / "chamber_flow.npz") self.complex.save(models_dir / "chamber_complex.npz") print(f"[chambers] saved to {models_dir}") @classmethod def load(cls, models_dir: Path) -> "CrossFireSystem": """Load all chamber weights from models/ directory.""" fear = ChamberMLP.load(models_dir / "chamber_fear.npz") love = ChamberMLP.load(models_dir / "chamber_love.npz") rage = ChamberMLP.load(models_dir / "chamber_rage.npz") void = ChamberMLP.load(models_dir / "chamber_void.npz") # Handle missing flow/complex for backwards compatibility flow_path = models_dir / "chamber_flow.npz" complex_path = models_dir / "chamber_complex.npz" if flow_path.exists(): flow = ChamberMLP.load(flow_path) else: print("[chambers] flow chamber not found, initializing random") flow = ChamberMLP.random_init(seed=4) if complex_path.exists(): complex_chamber = ChamberMLP.load(complex_path) else: print("[chambers] complex chamber not found, initializing random") complex_chamber = ChamberMLP.random_init(seed=5) coupling = np.array(COUPLING_MATRIX_EXTENDED, dtype=np.float32) print(f"[chambers] loaded from {models_dir}") return cls( fear=fear, love=love, rage=rage, void=void, flow=flow, complex=complex_chamber, coupling=coupling, ) class AsyncCrossFireSystem: """ Async wrapper for CrossFireSystem with field lock discipline. Based on HAZE's async pattern - achieves coherence through explicit operation ordering and atomicity. "The asyncio.Lock doesn't add information—it adds discipline." """ def __init__(self, system: CrossFireSystem): self._sync = system self._lock = asyncio.Lock() @classmethod def random_init(cls, seed: Optional[int] = None) -> "AsyncCrossFireSystem": """Initialize with random weights.""" system = CrossFireSystem.random_init(seed=seed) return cls(system) @classmethod def load(cls, models_dir: Path) -> "AsyncCrossFireSystem": """Load from models directory.""" system = CrossFireSystem.load(models_dir) return cls(system) async def stabilize( self, resonances: np.ndarray, max_iter: int = 10, threshold: float = 0.01, momentum: float = 0.7, ) -> Tuple[Dict[str, float], int]: """ Async cross-fire stabilization with field lock. Atomic operation - prevents field corruption during stabilization. """ async with self._lock: return self._sync.stabilize(resonances, max_iter, threshold, momentum) async def save(self, models_dir: Path) -> None: """Save with lock protection.""" async with self._lock: self._sync.save(models_dir) def param_count(self) -> int: """Total parameters (read-only, no lock needed).""" return self._sync.param_count() @property def coupling(self) -> np.ndarray: """Access coupling matrix.""" return self._sync.coupling @coupling.setter def coupling(self, value: np.ndarray) -> None: """Set coupling matrix (for feedback learning).""" self._sync.coupling = value if __name__ == "__main__": print("=" * 60) print(" CLOUD v4.0 — Chamber Cross-Fire System (200K model)") print("=" * 60) print() # Initialize random system system = CrossFireSystem.random_init(seed=42) print(f"Initialized cross-fire system (6 chambers)") print(f"Total params: {system.param_count():,}") print() # Test with random resonances print("Testing stabilization with random resonances:") resonances = np.random.rand(100).astype(np.float32) print(f" Input: 100D resonance vector (mean={resonances.mean():.3f})") print() activations, iterations = system.stabilize(resonances) print(" Chamber activations after cross-fire:") for chamber, value in activations.items(): bar = "█" * int(value * 40) print(f" {chamber:8s}: {value:.3f} {bar}") print(f"\n Converged in {iterations} iterations") print() # Test convergence speed with different inputs print("Testing convergence speed:") test_cases = [ ("random uniform", np.random.rand(100)), ("all high", np.ones(100) * 0.9), ("all low", np.ones(100) * 0.1), ("sparse", np.random.rand(100) * 0.1), ] for name, resonances in test_cases: _, iters = system.stabilize(resonances) print(f" {name:15s}: {iters:2d} iterations") print() # Test saving/loading print("Testing save/load:") models_dir = Path("./models") system.save(models_dir) system2 = CrossFireSystem.load(models_dir) activations2, _ = system2.stabilize(test_cases[0][1]) match = all( abs(activations[k] - activations2[k]) < 1e-6 for k in CHAMBER_NAMES_EXTENDED ) print(f" Save/load {'✓' if match else '✗'}") print() print("=" * 60) print(" Cross-fire system operational. 6 chambers. 200K params.") print("=" * 60)