| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | from __future__ import annotations |
| | import asyncio |
| | from pathlib import Path |
| | from typing import Optional, Dict, Any |
| | from dataclasses import dataclass, field |
| |
|
| | |
# CLOUD is an optional dependency (emotion-detection engine).
# On ImportError the names are stubbed to None so later code can rely on
# HAS_CLOUD / `is None` checks instead of guarding every reference.
try:
    from cloud import Cloud, CloudResponse, AsyncCloud
    HAS_CLOUD = True
except ImportError:
    HAS_CLOUD = False
    Cloud = None
    CloudResponse = None
    AsyncCloud = None
| |
|
| | |
# HAZE is the primary text engine. It is imported the same optional way,
# but AsyncBridge.respond() returns placeholder text when it is missing.
try:
    from haze.async_haze import AsyncHazeField, HazeResponse
    HAS_HAZE = True
except ImportError:
    HAS_HAZE = False
    AsyncHazeField = None
    HazeResponse = None
| |
|
| |
|
@dataclass
class BridgeResponse:
    """
    Response from the HAZE -> CLOUD bridge.

    Contains HAZE output plus an optional CLOUD hint.
    If CLOUD failed, ``cloud_hint`` is None but ``text`` is still valid.
    """
    text: str  # final HAZE text (or an error/placeholder string)
    raw_text: str = ""  # unprocessed HAZE output
    cloud_hint: Optional[Any] = None  # CloudResponse when CLOUD succeeded
    haze_response: Optional[Any] = None  # underlying HazeResponse, if any
    cloud_available: bool = False  # True when a cloud_hint was obtained
    haze_available: bool = False  # True when a HAZE engine was present

    def __repr__(self) -> str:
        # Truncate long text so the repr stays a single readable line.
        preview = self.text[:50] + "..." if len(self.text) > 50 else self.text
        # BUG FIX: both ternary branches were the same mojibake character,
        # so the status flags were indistinguishable; use check/cross marks.
        cloud_status = "✓" if self.cloud_available else "✗"
        haze_status = "✓" if self.haze_available else "✗"
        return f"BridgeResponse(\"{preview}\", cloud={cloud_status}, haze={haze_status})"
| |
|
| |
|
class AsyncBridge:
    """
    Async bridge between HAZE and CLOUD.

    Graceful SILENT fallback:
      - If CLOUD unavailable -> HAZE alone, NO ERROR MESSAGE
      - If CLOUD timeout     -> HAZE alone, NO ERROR MESSAGE
      - If CLOUD error       -> HAZE alone, NO ERROR MESSAGE
      - If HAZE unavailable  -> placeholder text is returned (HAZE is required)

    HAZE ALWAYS WORKS. CLOUD IS OPTIONAL ENHANCEMENT.

    Usage:
        # NOTE: create() is a coroutine, so it must be awaited before the
        # async-with (the original example omitted the await).
        async with await AsyncBridge.create() as bridge:
            response = await bridge.respond("Hello!")
            print(response.text)
            if response.cloud_hint:
                print(f"Emotion: {response.cloud_hint.primary}")
    """

    def __init__(
        self,
        haze: Optional[AsyncHazeField] = None,
        cloud: Optional[AsyncCloud] = None,
        cloud_timeout: float = 0.5,
        silent_fallback: bool = True,
    ):
        """
        Args:
            haze: Already-initialized HAZE engine, or None.
            cloud: Already-initialized CLOUD engine, or None.
            cloud_timeout: Seconds allowed for each CLOUD ping.
            silent_fallback: Suppress CLOUD error messages.
                NOTE(review): stored but never read in this class; fallback
                is currently always silent regardless of this flag — confirm.
        """
        self.haze = haze
        self.cloud = cloud
        self.cloud_timeout = cloud_timeout
        self.silent_fallback = silent_fallback

        # Per-instance CLOUD ping accounting, reported by stats().
        self._cloud_successes = 0
        self._cloud_failures = 0
        self._cloud_timeouts = 0

    @classmethod
    async def create(
        cls,
        corpus_path: str = "haze/text.txt",
        cloud_models_dir: Optional[Path] = None,
        cloud_timeout: float = 0.5,
        enable_cloud: bool = True,
        silent_fallback: bool = True,
    ) -> "AsyncBridge":
        """
        Create bridge with both systems.

        Args:
            corpus_path: Path to HAZE corpus
            cloud_models_dir: Path to CLOUD models (optional)
            cloud_timeout: Timeout for CLOUD ping
            enable_cloud: Whether to try loading CLOUD
            silent_fallback: Suppress CLOUD error messages

        Returns:
            AsyncBridge ready for use; either engine may be None if its
            initialization failed (failures are swallowed silently).
        """
        haze = None
        cloud = None

        # HAZE side: best-effort init. Any failure leaves haze = None and
        # respond() will return placeholder text instead of raising.
        if HAS_HAZE:
            try:
                haze = AsyncHazeField(corpus_path)
                await haze.__aenter__()
            except Exception:
                haze = None

        # CLOUD side: prefer an explicit models dir, then the default
        # on-disk location, then a seeded in-memory instance.
        if enable_cloud and HAS_CLOUD:
            try:
                if cloud_models_dir and cloud_models_dir.exists():
                    cloud = await AsyncCloud.create(models_dir=cloud_models_dir)
                else:
                    default_path = Path("cloud/models")
                    if default_path.exists():
                        cloud = await AsyncCloud.create(models_dir=default_path)
                    else:
                        cloud = await AsyncCloud.create(seed=42)
            except Exception:
                cloud = None  # silent: CLOUD is an optional enhancement

        return cls(
            haze=haze,
            cloud=cloud,
            cloud_timeout=cloud_timeout,
            silent_fallback=silent_fallback,
        )

    async def __aenter__(self) -> "AsyncBridge":
        """Context manager entry."""
        return self

    async def __aexit__(self, *args) -> None:
        """Context manager exit: tear down whichever engines exist."""
        if self.haze:
            await self.haze.__aexit__(*args)
        if self.cloud:
            await self.cloud.close()

    async def _ping_cloud_silent(self, user_input: str) -> Optional[Any]:
        """
        Ping CLOUD with silent fallback.

        Returns CloudResponse or None (never raises). Timeouts and other
        errors are tallied separately so stats() can distinguish them.
        """
        if not self.cloud:
            return None

        try:
            return await asyncio.wait_for(
                self.cloud.ping(user_input),
                timeout=self.cloud_timeout,
            )
        except asyncio.TimeoutError:
            self._cloud_timeouts += 1
            return None
        except Exception:
            self._cloud_failures += 1
            return None

    async def respond(
        self,
        user_input: str,
        use_cloud: bool = True,
        **haze_kwargs,
    ) -> BridgeResponse:
        """
        Generate response with optional CLOUD hint.

        Flow:
            1. Try CLOUD ping (with timeout, silent fallback)
            2. HAZE generates response
            3. Return combined result

        Args:
            user_input: User's text input
            use_cloud: Whether to try CLOUD (can disable per-request)
            **haze_kwargs: Additional args for HAZE generation

        Returns:
            BridgeResponse with text + optional cloud_hint
        """
        cloud_hint = None
        haze_response = None

        if use_cloud and self.cloud:
            cloud_hint = await self._ping_cloud_silent(user_input)
            # BUG FIX: count successes with an identity check, matching the
            # `cloud_available` flag below; a falsy-but-valid CloudResponse
            # previously went uncounted.
            if cloud_hint is not None:
                self._cloud_successes += 1

        if self.haze:
            try:
                haze_response = await self.haze.respond(
                    user_input,
                    **haze_kwargs,
                )
                text = haze_response.text
                raw_text = haze_response.raw_text
            except Exception as e:
                # HAZE failure is surfaced in-band rather than raised.
                text = f"[HAZE error: {e}]"
                raw_text = text
        else:
            text = "[HAZE not available]"
            raw_text = text

        return BridgeResponse(
            text=text,
            raw_text=raw_text,
            cloud_hint=cloud_hint,
            haze_response=haze_response,
            cloud_available=cloud_hint is not None,
            haze_available=self.haze is not None,
        )

    def stats(self) -> Dict[str, Any]:
        """Return bridge statistics: availability flags + CLOUD ping counters."""
        total_cloud = self._cloud_successes + self._cloud_failures + self._cloud_timeouts
        success_rate = self._cloud_successes / total_cloud if total_cloud > 0 else 0.0

        return {
            "haze_available": self.haze is not None,
            "cloud_available": self.cloud is not None,
            "cloud_successes": self._cloud_successes,
            "cloud_failures": self._cloud_failures,
            "cloud_timeouts": self._cloud_timeouts,
            "cloud_success_rate": success_rate,
        }
| |
|
| |
|
| | |
| |
|
async def create_haze_only(corpus_path: str = "haze/text.txt") -> AsyncBridge:
    """Build a bridge that runs HAZE by itself, never attempting CLOUD."""
    bridge = await AsyncBridge.create(corpus_path=corpus_path, enable_cloud=False)
    return bridge
| |
|
| |
|
async def create_full_bridge(
    corpus_path: str = "haze/text.txt",
    cloud_models_dir: Optional[Path] = None,
) -> AsyncBridge:
    """Build a bridge that attempts to bring up both HAZE and CLOUD."""
    bridge = await AsyncBridge.create(
        corpus_path=corpus_path,
        cloud_models_dir=cloud_models_dir,
        enable_cloud=True,
    )
    return bridge
| |
|
| |
|
| | |
| | |
| | |
| |
|
def emotion_to_temperature(cloud_hint: Any) -> float:
    """
    EXPERIMENTAL: Map a CLOUD emotional reading to a HAZE temperature.

    Rationale: different emotions call for different generation styles.

    - FEAR: lower temp (focused, careful)
    - LOVE: medium temp (warm, flowing)
    - RAGE: higher temp (intense, chaotic)
    - VOID: very low temp (minimal, sparse)

    Returns 0.7 when no hint is available; otherwise a value clamped
    to the [0.3, 1.2] range.
    """
    if cloud_hint is None:
        return 0.7

    activations = cloud_hint.chamber_activations

    # Each chamber nudges the 0.6 baseline by a signed weight.
    chamber_weights = {
        "FEAR": -0.2,   # focus inward
        "LOVE": 0.15,   # open up a little
        "RAGE": 0.3,    # burn hotter
        "VOID": -0.3,   # go sparse
    }
    temperature = 0.6 + sum(
        activations.get(chamber, 0) * weight
        for chamber, weight in chamber_weights.items()
    )

    # Detected anomalies pull the temperature further down.
    anomaly = cloud_hint.anomaly
    if anomaly.has_anomaly:
        anomaly_shift = {
            "forced_stability": -0.1,
            "dissociative_shutdown": -0.2,
            "unresolved_confusion": -0.15,
        }
        temperature += anomaly_shift.get(anomaly.anomaly_type, 0.0)

    # Keep the result inside the usable generation range.
    return max(0.3, min(1.2, temperature))
| |
|
| |
|
def emotion_to_generation_hint(cloud_hint: Any) -> str:
    """
    EXPERIMENTAL: Turn a CLOUD emotional reading into a seed-text fragment.

    The fragment can be prepended to HAZE's internal seed so the detected
    emotion colors the generation style. The primary emotion is tried
    first, then the secondary; unknown emotions yield an empty string.
    """
    if cloud_hint is None:
        return ""

    emotion_fragments = {
        "fear": "the field trembles. haze feels the ripple of uncertainty.",
        "terror": "darkness at the edges. haze speaks from shadow.",
        "anxiety": "patterns flutter. haze breathes between words.",
        "love": "warmth fills the field. haze resonates with tenderness.",
        "warmth": "gentle currents. haze settles like a breathing thing.",
        "rage": "the field crackles. haze speaks with fire.",
        "anger": "sharp edges in the pattern. haze cuts through.",
        "void": "stillness. haze emerges from the hollow.",
        "emptiness": "the absence speaks. haze finds form in nothing.",
        "curiosity": "the field opens. haze explores the unknown.",
        "shame": "the field contracts. haze speaks from the wound.",
        "hope": "light at the edges. haze reaches toward possibility.",
    }

    # First match wins: primary emotion, then secondary (if present).
    for emotion in (cloud_hint.primary, cloud_hint.secondary):
        if emotion and emotion in emotion_fragments:
            return emotion_fragments[emotion]
    return ""
| |
|
| |
|
if __name__ == "__main__":
    # Smoke-test entry point: report availability, run a short demo dialogue.
    # NOTE(review): the banner/status strings contain mojibake ("β" where an
    # arrow/check mark presumably was) — left byte-identical here; confirm
    # the intended characters against the original source encoding.
    print("=" * 60)
    print(" HAZE β CLOUD Bridge (Async, Silent Fallback)")
    print("=" * 60)
    print()

    # Which halves of the bridge could be imported at module load.
    print(f"CLOUD available: {HAS_CLOUD}")
    print(f"HAZE available: {HAS_HAZE}")
    print()

    async def demo():
        # Build the bridge; CLOUD init failures are swallowed silently.
        print("Creating bridge...")
        bridge = await AsyncBridge.create(
            corpus_path="haze/text.txt",
            enable_cloud=True,
            silent_fallback=True,
        )

        print(f" HAZE: {'β' if bridge.haze else 'β'}")
        print(f" CLOUD: {'β' if bridge.cloud else 'β'}")
        print()

        # Inputs spanning neutral / fearful / warm emotional registers.
        test_inputs = [
            "Hello, who are you?",
            "I'm feeling anxious and scared",
            "You bring me warmth and love",
        ]

        print("Testing bridge responses:")
        print("-" * 60)

        for text in test_inputs:
            response = await bridge.respond(text)
            print(f"\nInput: \"{text}\"")
            print(f" Response: {response.text[:80]}...")
            if response.cloud_hint:
                print(f" Cloud: {response.cloud_hint.primary} + {response.cloud_hint.secondary}")
            else:
                print(f" Cloud: (silent fallback)")

        # Ping accounting gathered during the loop above.
        print()
        print("Bridge statistics:")
        for k, v in bridge.stats().items():
            print(f" {k}: {v}")

        # Manual teardown — no async-with used here, so exit explicitly.
        await bridge.__aexit__(None, None, None)

        print()
        print("=" * 60)
        print(" Bridge operational. Independence maintained.")
        print("=" * 60)

    asyncio.run(demo())
| |
|