haze / cloud /cloud.py
ataeff's picture
Upload 14 files
5201e68 verified
#!/usr/bin/env python3
# cloud.py — CLOUD v4.0 Main Orchestrator (200K model)
#
# "Something fires BEFORE meaning arrives"
#
# Architecture:
# 1. RESONANCE LAYER (weightless geometry) → 100D resonances
# 2. CHAMBER LAYER (6 MLPs + cross-fire) → chamber activations + iterations
# 3. META-OBSERVER (deeper MLP) → secondary emotion
#
# Total: ~180K params (6 chambers × 23K + observer 41K)
from __future__ import annotations
import asyncio
import numpy as np
from pathlib import Path
from typing import Optional, Dict
from dataclasses import dataclass
from .resonance import SimpleResonanceLayer
from .chambers import CrossFireSystem
from .observer import MetaObserver
from .user_cloud import UserCloud
from .anchors import get_all_anchors, get_anchor_index
from .anomaly import detect_anomalies, AnomalyReport
@dataclass
class CloudResponse:
"""Response from CLOUD ping."""
primary: str # primary emotion word
secondary: str # secondary emotion word (added context)
resonances: np.ndarray # (100,) raw resonances
chamber_activations: Dict[str, float] # cross-fire results
iterations: int # convergence speed signal
user_fingerprint: np.ndarray # (100,) temporal history
anomaly: AnomalyReport # anomaly detection result
class Cloud:
"""
CLOUD v3.0: Pre-semantic sonar for emotion detection.
Components:
- Resonance Layer (weightless)
- Chamber MLPs (4 × 8.5K params)
- Meta-Observer (15K params)
- User Cloud (temporal fingerprint)
Total: ~50K trainable params
"""
def __init__(
self,
resonance: SimpleResonanceLayer,
chambers: CrossFireSystem,
observer: MetaObserver,
user_cloud: Optional[UserCloud] = None,
):
self.resonance = resonance
self.chambers = chambers
self.observer = observer
self.user_cloud = user_cloud or UserCloud()
self.anchors = get_all_anchors()
@classmethod
def random_init(cls, seed: Optional[int] = None) -> "Cloud":
"""Initialize with random weights (for training)."""
resonance = SimpleResonanceLayer.create()
chambers = CrossFireSystem.random_init(seed=seed)
observer = MetaObserver.random_init(seed=seed)
user_cloud = UserCloud()
print("[cloud] initialized with random weights")
return cls(resonance, chambers, observer, user_cloud)
@classmethod
def load(cls, models_dir: Path) -> "Cloud":
"""Load trained CLOUD from models/ directory."""
resonance = SimpleResonanceLayer.create()
chambers = CrossFireSystem.load(models_dir)
observer = MetaObserver.load(models_dir / "observer.npz")
# Load user cloud if exists
cloud_data_path = models_dir / "user_cloud.json"
if cloud_data_path.exists():
user_cloud = UserCloud.load(cloud_data_path)
else:
user_cloud = UserCloud()
print(f"[cloud] loaded from {models_dir}")
return cls(resonance, chambers, observer, user_cloud)
def save(self, models_dir: Path) -> None:
"""Save all components to models/ directory."""
models_dir.mkdir(parents=True, exist_ok=True)
self.chambers.save(models_dir)
self.observer.save(models_dir / "observer.npz")
self.user_cloud.save(models_dir / "user_cloud.json")
print(f"[cloud] saved to {models_dir}")
async def ping(self, user_input: str) -> CloudResponse:
"""
Async ping: detect pre-semantic emotion.
Flow:
1. Resonance layer computes 100D resonances
2. Chambers cross-fire to stabilization
3. Observer predicts secondary emotion
4. Update user cloud
Args:
user_input: user's text input
Returns:
CloudResponse with primary, secondary, and metadata
"""
# 1. Resonance layer (weightless geometry)
resonances = self.resonance.compute_resonance(user_input)
primary_idx, primary_word, _ = self.resonance.get_primary_emotion(resonances)
# 2. Chamber cross-fire (async parallelism for future optimization)
chamber_activations, iterations = await asyncio.to_thread(
self.chambers.stabilize,
resonances,
)
# 3. User fingerprint (temporal history)
user_fingerprint = self.user_cloud.get_fingerprint()
# 4. Meta-observer predicts secondary (now with chamber_activations)
# Convert chamber_activations dict to array for observer
import numpy as np
from .anchors import CHAMBER_NAMES_EXTENDED
chamber_array = np.array([
chamber_activations.get(name, 0.0) for name in CHAMBER_NAMES_EXTENDED
], dtype=np.float32)
secondary_idx = await asyncio.to_thread(
self.observer.predict_secondary,
resonances,
chamber_array,
float(iterations),
user_fingerprint,
)
secondary_word = self.anchors[secondary_idx]
# 5. Anomaly detection
anomaly = detect_anomalies(chamber_activations, iterations)
# 6. Update user cloud
self.user_cloud.add_event(primary_idx, secondary_idx)
return CloudResponse(
primary=primary_word,
secondary=secondary_word,
resonances=resonances,
chamber_activations=chamber_activations,
iterations=iterations,
user_fingerprint=user_fingerprint,
anomaly=anomaly,
)
def ping_sync(self, user_input: str) -> CloudResponse:
"""Synchronous version of ping (for testing)."""
return asyncio.run(self.ping(user_input))
def param_count(self) -> int:
"""Total trainable parameters."""
return self.chambers.param_count() + self.observer.param_count()
class AsyncCloud:
"""
Fully async CLOUD with context manager support.
Based on HAZE's AsyncHazeField pattern:
- Field lock discipline for coherence
- Context manager for clean lifecycle
- Graceful initialization and cleanup
Usage:
async with AsyncCloud.create() as cloud:
response = await cloud.ping("I'm feeling anxious")
Or standalone:
cloud = await AsyncCloud.create()
response = await cloud.ping("I'm feeling anxious")
await cloud.close()
"""
def __init__(self, cloud: Cloud):
self._sync = cloud
self._lock = asyncio.Lock()
self._closed = False
@classmethod
async def create(
cls,
models_dir: Optional[Path] = None,
seed: Optional[int] = None,
) -> "AsyncCloud":
"""
Create AsyncCloud instance.
Args:
models_dir: Path to load trained weights (optional)
seed: Random seed for initialization (if no models_dir)
Returns:
AsyncCloud ready for use
"""
if models_dir and models_dir.exists():
cloud = Cloud.load(models_dir)
else:
cloud = Cloud.random_init(seed=seed)
return cls(cloud)
async def __aenter__(self) -> "AsyncCloud":
"""Context manager entry."""
return self
async def __aexit__(self, *args) -> None:
"""Context manager exit."""
await self.close()
async def close(self) -> None:
"""Clean shutdown."""
if not self._closed:
self._closed = True
# Save user cloud state if needed
# await self.save(Path("cloud/models"))
async def ping(self, user_input: str) -> CloudResponse:
"""
Async ping with field lock.
Atomic operation - prevents field corruption during emotion detection.
"""
if self._closed:
raise RuntimeError("AsyncCloud is closed")
async with self._lock:
return await self._sync.ping(user_input)
async def save(self, models_dir: Path) -> None:
"""Save all components with lock protection."""
async with self._lock:
self._sync.save(models_dir)
def param_count(self) -> int:
"""Total parameters (read-only, no lock needed)."""
return self._sync.param_count()
@property
def anchors(self):
"""Access emotion anchors."""
return self._sync.anchors
@property
def user_cloud(self):
"""Access user cloud for stats."""
return self._sync.user_cloud
if __name__ == "__main__":
print("=" * 60)
print(" CLOUD v3.0 — Main Orchestrator")
print("=" * 60)
print()
# Initialize
print("Initializing CLOUD...")
cloud = Cloud.random_init(seed=42)
print(f" Total params: {cloud.param_count():,}")
print()
# Test inputs
test_inputs = [
"I'm terrified and anxious about what's coming",
"You bring me such warmth and love darling",
"This makes me furious with rage",
"I feel completely empty and void inside",
"I'm curious about what happens next",
"Overwhelming shame and guilt consume me",
]
print("Testing CLOUD pings:")
print("=" * 60)
for text in test_inputs:
response = cloud.ping_sync(text)
print(f"\nInput: \"{text}\"")
print(f" Primary: {response.primary}")
print(f" Secondary: {response.secondary}")
print(f" Iterations: {response.iterations}")
print(f" Chambers:")
for chamber, activation in response.chamber_activations.items():
bar = "█" * int(activation * 30)
print(f" {chamber:6s}: {activation:.3f} {bar}")
print()
print("=" * 60)
# Show user cloud evolution
print("\nUser emotional fingerprint (after all inputs):")
dominant = cloud.user_cloud.get_dominant_emotions(5)
for idx, strength in dominant:
word = cloud.anchors[idx]
bar = "█" * int(strength * 30)
print(f" {word:15s}: {strength:.3f} {bar}")
print()
print("=" * 60)
# Test save/load
print("\nTesting save/load:")
models_dir = Path("./models")
cloud.save(models_dir)
cloud2 = Cloud.load(models_dir)
response2 = cloud2.ping_sync(test_inputs[0])
print(f" Save/load ✓")
print()
print("=" * 60)
print(" CLOUD v3.0 operational.")
print(" Something fires BEFORE meaning arrives.")
print("=" * 60)