| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | from __future__ import annotations |
| | import asyncio |
| | import numpy as np |
| | from typing import Dict, Optional, Tuple |
| | from dataclasses import dataclass |
| | from pathlib import Path |
| |
|
| | from .anchors import CHAMBER_NAMES, COUPLING_MATRIX, CHAMBER_NAMES_EXTENDED, COUPLING_MATRIX_EXTENDED |
| |
|
| | |
| | |
| | DECAY_RATES = { |
| | "FEAR": 0.90, |
| | "LOVE": 0.93, |
| | "RAGE": 0.85, |
| | "VOID": 0.97, |
| | "FLOW": 0.88, |
| | "COMPLEX": 0.94, |
| | } |
| |
|
| |
|
| | def swish(x: np.ndarray) -> np.ndarray: |
| | """Swish activation: x * sigmoid(x)""" |
| | return x / (1.0 + np.exp(-x)) |
| |
|
| |
|
| | def swish_deriv(x: np.ndarray) -> np.ndarray: |
| | """Derivative of swish for backprop""" |
| | sig = 1.0 / (1.0 + np.exp(-x)) |
| | return sig + x * sig * (1 - sig) |
| |
|
| |
|
| | @dataclass |
| | class ChamberMLP: |
| | """ |
| | Single chamber MLP: 100→128→64→32→1 (deeper for 200K model) |
| | |
| | Takes 100D resonance vector, outputs single activation value. |
| | |
| | Params: |
| | - W1: (100, 128) = 12,800 |
| | - b1: (128,) = 128 |
| | - W2: (128, 64) = 8,192 |
| | - b2: (64,) = 64 |
| | - W3: (64, 32) = 2,048 |
| | - b3: (32,) = 32 |
| | - W4: (32, 1) = 32 |
| | - b4: (1,) = 1 |
| | Total: ~23K params per chamber |
| | """ |
| |
|
| | W1: np.ndarray |
| | b1: np.ndarray |
| | W2: np.ndarray |
| | b2: np.ndarray |
| | W3: np.ndarray |
| | b3: np.ndarray |
| | W4: np.ndarray |
| | b4: np.ndarray |
| |
|
| | @classmethod |
| | def random_init(cls, seed: Optional[int] = None) -> "ChamberMLP": |
| | """Initialize with random weights (Xavier initialization).""" |
| | if seed is not None: |
| | np.random.seed(seed) |
| |
|
| | |
| | W1 = np.random.randn(100, 128) * np.sqrt(2.0 / 100) |
| | b1 = np.zeros(128) |
| |
|
| | W2 = np.random.randn(128, 64) * np.sqrt(2.0 / 128) |
| | b2 = np.zeros(64) |
| |
|
| | W3 = np.random.randn(64, 32) * np.sqrt(2.0 / 64) |
| | b3 = np.zeros(32) |
| |
|
| | W4 = np.random.randn(32, 1) * np.sqrt(2.0 / 32) |
| | b4 = np.zeros(1) |
| |
|
| | return cls(W1=W1, b1=b1, W2=W2, b2=b2, W3=W3, b3=b3, W4=W4, b4=b4) |
| |
|
| | def forward(self, x: np.ndarray) -> float: |
| | """ |
| | Forward pass: 100D resonances → scalar activation. |
| | |
| | Args: |
| | x: (100,) resonance vector |
| | |
| | Returns: |
| | scalar activation [0, 1] |
| | """ |
| | |
| | h1 = x @ self.W1 + self.b1 |
| | a1 = swish(h1) |
| |
|
| | |
| | h2 = a1 @ self.W2 + self.b2 |
| | a2 = swish(h2) |
| |
|
| | |
| | h3 = a2 @ self.W3 + self.b3 |
| | a3 = swish(h3) |
| |
|
| | |
| | h4 = a3 @ self.W4 + self.b4 |
| |
|
| | |
| | activation = 1.0 / (1.0 + np.exp(-h4[0])) |
| |
|
| | return float(activation) |
| |
|
| | def param_count(self) -> int: |
| | """Count total parameters in this MLP.""" |
| | return ( |
| | self.W1.size + self.b1.size + |
| | self.W2.size + self.b2.size + |
| | self.W3.size + self.b3.size + |
| | self.W4.size + self.b4.size |
| | ) |
| |
|
| | def save(self, path: Path) -> None: |
| | """Save weights to .npz file.""" |
| | np.savez( |
| | path, |
| | W1=self.W1, |
| | b1=self.b1, |
| | W2=self.W2, |
| | b2=self.b2, |
| | W3=self.W3, |
| | b3=self.b3, |
| | W4=self.W4, |
| | b4=self.b4, |
| | ) |
| |
|
| | @classmethod |
| | def load(cls, path: Path) -> "ChamberMLP": |
| | """Load weights from .npz file.""" |
| | data = np.load(path) |
| | |
| | if "W4" in data: |
| | return cls( |
| | W1=data["W1"], |
| | b1=data["b1"], |
| | W2=data["W2"], |
| | b2=data["b2"], |
| | W3=data["W3"], |
| | b3=data["b3"], |
| | W4=data["W4"], |
| | b4=data["b4"], |
| | ) |
| | else: |
| | |
| | print(f"[chambers] old format detected in {path}, reinitializing with 4-layer architecture") |
| | return cls.random_init() |
| |
|
| |
|
| | @dataclass |
| | class CrossFireSystem: |
| | """ |
| | Six chambers with cross-fire stabilization (200K model). |
| | |
| | Chambers: |
| | - FEAR, LOVE, RAGE, VOID (original) |
| | - FLOW, COMPLEX (extended for richer emotion detection) |
| | |
| | Cross-fire loop: |
| | 1. Each chamber computes activation from resonances |
| | 2. Chambers influence each other via coupling matrix |
| | 3. Iterate until convergence (max 10 iterations) |
| | 4. Return final activations + iteration count |
| | """ |
| |
|
| | fear: ChamberMLP |
| | love: ChamberMLP |
| | rage: ChamberMLP |
| | void: ChamberMLP |
| | flow: ChamberMLP |
| | complex: ChamberMLP |
| | coupling: np.ndarray |
| |
|
| | @classmethod |
| | def random_init(cls, seed: Optional[int] = None) -> "CrossFireSystem": |
| | """Initialize all chambers with random weights.""" |
| | if seed is not None: |
| | base_seed = seed |
| | else: |
| | base_seed = np.random.randint(0, 10000) |
| |
|
| | fear = ChamberMLP.random_init(seed=base_seed + 0) |
| | love = ChamberMLP.random_init(seed=base_seed + 1) |
| | rage = ChamberMLP.random_init(seed=base_seed + 2) |
| | void = ChamberMLP.random_init(seed=base_seed + 3) |
| | flow = ChamberMLP.random_init(seed=base_seed + 4) |
| | complex_chamber = ChamberMLP.random_init(seed=base_seed + 5) |
| |
|
| | coupling = np.array(COUPLING_MATRIX_EXTENDED, dtype=np.float32) |
| |
|
| | return cls( |
| | fear=fear, |
| | love=love, |
| | rage=rage, |
| | void=void, |
| | flow=flow, |
| | complex=complex_chamber, |
| | coupling=coupling, |
| | ) |
| |
|
| | def stabilize( |
| | self, |
| | resonances: np.ndarray, |
| | max_iter: int = 10, |
| | threshold: float = 0.01, |
| | momentum: float = 0.7, |
| | ) -> Tuple[Dict[str, float], int]: |
| | """ |
| | Run cross-fire stabilization loop. |
| | |
| | Args: |
| | resonances: (100,) initial resonance vector |
| | max_iter: max iterations before forced stop |
| | threshold: convergence threshold (sum of absolute changes) |
| | momentum: blend factor (0.7 = 70% old, 30% new) |
| | |
| | Returns: |
| | (chamber_activations, iterations_count) |
| | |
| | Example: |
| | activations, iters = system.stabilize(resonances) |
| | # → {"FEAR": 0.8, "LOVE": 0.2, ...}, 5 |
| | """ |
| | |
| | chambers = [self.fear, self.love, self.rage, self.void, self.flow, self.complex] |
| | activations = np.array([ |
| | chamber.forward(resonances) |
| | for chamber in chambers |
| | ], dtype=np.float32) |
| |
|
| | |
| | decay_array = np.array([ |
| | DECAY_RATES["FEAR"], |
| | DECAY_RATES["LOVE"], |
| | DECAY_RATES["RAGE"], |
| | DECAY_RATES["VOID"], |
| | DECAY_RATES["FLOW"], |
| | DECAY_RATES["COMPLEX"], |
| | ], dtype=np.float32) |
| |
|
| | |
| | for iteration in range(max_iter): |
| | |
| | activations = activations * decay_array |
| |
|
| | |
| | influence = self.coupling @ activations |
| |
|
| | |
| | new_activations = momentum * activations + (1 - momentum) * influence |
| |
|
| | |
| | new_activations = np.clip(new_activations, 0.0, 1.0) |
| |
|
| | |
| | delta = np.abs(new_activations - activations).sum() |
| | activations = new_activations |
| |
|
| | if delta < threshold: |
| | |
| | result = dict(zip(CHAMBER_NAMES_EXTENDED, activations)) |
| | return result, iteration + 1 |
| |
|
| | |
| | result = dict(zip(CHAMBER_NAMES_EXTENDED, activations)) |
| | return result, max_iter |
| |
|
| | def param_count(self) -> int: |
| | """Total parameters in all chambers.""" |
| | return sum([ |
| | self.fear.param_count(), |
| | self.love.param_count(), |
| | self.rage.param_count(), |
| | self.void.param_count(), |
| | self.flow.param_count(), |
| | self.complex.param_count(), |
| | ]) |
| |
|
| | def save(self, models_dir: Path) -> None: |
| | """Save all chamber weights to models/ directory.""" |
| | models_dir.mkdir(parents=True, exist_ok=True) |
| | self.fear.save(models_dir / "chamber_fear.npz") |
| | self.love.save(models_dir / "chamber_love.npz") |
| | self.rage.save(models_dir / "chamber_rage.npz") |
| | self.void.save(models_dir / "chamber_void.npz") |
| | self.flow.save(models_dir / "chamber_flow.npz") |
| | self.complex.save(models_dir / "chamber_complex.npz") |
| | print(f"[chambers] saved to {models_dir}") |
| |
|
| | @classmethod |
| | def load(cls, models_dir: Path) -> "CrossFireSystem": |
| | """Load all chamber weights from models/ directory.""" |
| | fear = ChamberMLP.load(models_dir / "chamber_fear.npz") |
| | love = ChamberMLP.load(models_dir / "chamber_love.npz") |
| | rage = ChamberMLP.load(models_dir / "chamber_rage.npz") |
| | void = ChamberMLP.load(models_dir / "chamber_void.npz") |
| | |
| | |
| | flow_path = models_dir / "chamber_flow.npz" |
| | complex_path = models_dir / "chamber_complex.npz" |
| | |
| | if flow_path.exists(): |
| | flow = ChamberMLP.load(flow_path) |
| | else: |
| | print("[chambers] flow chamber not found, initializing random") |
| | flow = ChamberMLP.random_init(seed=4) |
| | |
| | if complex_path.exists(): |
| | complex_chamber = ChamberMLP.load(complex_path) |
| | else: |
| | print("[chambers] complex chamber not found, initializing random") |
| | complex_chamber = ChamberMLP.random_init(seed=5) |
| |
|
| | coupling = np.array(COUPLING_MATRIX_EXTENDED, dtype=np.float32) |
| |
|
| | print(f"[chambers] loaded from {models_dir}") |
| | return cls( |
| | fear=fear, |
| | love=love, |
| | rage=rage, |
| | void=void, |
| | flow=flow, |
| | complex=complex_chamber, |
| | coupling=coupling, |
| | ) |
| |
|
| |
|
| | class AsyncCrossFireSystem: |
| | """ |
| | Async wrapper for CrossFireSystem with field lock discipline. |
| | |
| | Based on HAZE's async pattern - achieves coherence through |
| | explicit operation ordering and atomicity. |
| | |
| | "The asyncio.Lock doesn't add information—it adds discipline." |
| | """ |
| | |
| | def __init__(self, system: CrossFireSystem): |
| | self._sync = system |
| | self._lock = asyncio.Lock() |
| | |
| | @classmethod |
| | def random_init(cls, seed: Optional[int] = None) -> "AsyncCrossFireSystem": |
| | """Initialize with random weights.""" |
| | system = CrossFireSystem.random_init(seed=seed) |
| | return cls(system) |
| | |
| | @classmethod |
| | def load(cls, models_dir: Path) -> "AsyncCrossFireSystem": |
| | """Load from models directory.""" |
| | system = CrossFireSystem.load(models_dir) |
| | return cls(system) |
| | |
| | async def stabilize( |
| | self, |
| | resonances: np.ndarray, |
| | max_iter: int = 10, |
| | threshold: float = 0.01, |
| | momentum: float = 0.7, |
| | ) -> Tuple[Dict[str, float], int]: |
| | """ |
| | Async cross-fire stabilization with field lock. |
| | |
| | Atomic operation - prevents field corruption during stabilization. |
| | """ |
| | async with self._lock: |
| | return self._sync.stabilize(resonances, max_iter, threshold, momentum) |
| | |
| | async def save(self, models_dir: Path) -> None: |
| | """Save with lock protection.""" |
| | async with self._lock: |
| | self._sync.save(models_dir) |
| | |
| | def param_count(self) -> int: |
| | """Total parameters (read-only, no lock needed).""" |
| | return self._sync.param_count() |
| | |
| | @property |
| | def coupling(self) -> np.ndarray: |
| | """Access coupling matrix.""" |
| | return self._sync.coupling |
| | |
| | @coupling.setter |
| | def coupling(self, value: np.ndarray) -> None: |
| | """Set coupling matrix (for feedback learning).""" |
| | self._sync.coupling = value |
| |
|
| |
|
| | if __name__ == "__main__": |
| | print("=" * 60) |
| | print(" CLOUD v4.0 — Chamber Cross-Fire System (200K model)") |
| | print("=" * 60) |
| | print() |
| |
|
| | |
| | system = CrossFireSystem.random_init(seed=42) |
| | print(f"Initialized cross-fire system (6 chambers)") |
| | print(f"Total params: {system.param_count():,}") |
| | print() |
| |
|
| | |
| | print("Testing stabilization with random resonances:") |
| | resonances = np.random.rand(100).astype(np.float32) |
| | print(f" Input: 100D resonance vector (mean={resonances.mean():.3f})") |
| | print() |
| |
|
| | activations, iterations = system.stabilize(resonances) |
| |
|
| | print(" Chamber activations after cross-fire:") |
| | for chamber, value in activations.items(): |
| | bar = "█" * int(value * 40) |
| | print(f" {chamber:8s}: {value:.3f} {bar}") |
| | print(f"\n Converged in {iterations} iterations") |
| | print() |
| |
|
| | |
| | print("Testing convergence speed:") |
| | test_cases = [ |
| | ("random uniform", np.random.rand(100)), |
| | ("all high", np.ones(100) * 0.9), |
| | ("all low", np.ones(100) * 0.1), |
| | ("sparse", np.random.rand(100) * 0.1), |
| | ] |
| |
|
| | for name, resonances in test_cases: |
| | _, iters = system.stabilize(resonances) |
| | print(f" {name:15s}: {iters:2d} iterations") |
| | print() |
| |
|
| | |
| | print("Testing save/load:") |
| | models_dir = Path("./models") |
| | system.save(models_dir) |
| |
|
| | system2 = CrossFireSystem.load(models_dir) |
| | activations2, _ = system2.stabilize(test_cases[0][1]) |
| |
|
| | match = all( |
| | abs(activations[k] - activations2[k]) < 1e-6 |
| | for k in CHAMBER_NAMES_EXTENDED |
| | ) |
| | print(f" Save/load {'✓' if match else '✗'}") |
| | print() |
| |
|
| | print("=" * 60) |
| | print(" Cross-fire system operational. 6 chambers. 200K params.") |
| | print("=" * 60) |
| |
|