Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Federated Cocoon Synchronization Protocol — Encrypted state packaging, | |
| HMAC signing, and attractor merger for distributed RC+xi nodes. | |
| Implements: | |
| - Cocoon packaging with full RC+xi metrics | |
| - Fernet symmetric encryption (AES-128-CBC + HMAC-SHA256) | |
| - Attractor merger via weighted mean-field coupling (Eq. 12) | |
| - Phase coherence consensus (Gamma >= 0.98 target) | |
| - Secure sync protocol: package -> encrypt -> sign -> transmit -> verify -> merge | |
| This module enables Codette Pods (edge nodes on RPi 5) to synchronize | |
| their reasoning state without exposing raw data. | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import hmac | |
| import json | |
| import os | |
| import time | |
| import uuid | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, List, Optional, Tuple | |
| # Encryption is optional — gracefully degrade if cryptography not installed | |
| try: | |
| from cryptography.fernet import Fernet | |
| HAS_CRYPTO = True | |
| except ImportError: | |
| HAS_CRYPTO = False | |
| # --------------------------------------------------------------------------- | |
| # Data structures | |
| # --------------------------------------------------------------------------- | |
| class CocoonPackage: | |
| """A packaged cocoon ready for sync.""" | |
| cocoon_id: str | |
| node_id: str | |
| timestamp: float | |
| state_snapshot: Dict[str, Any] | |
| attractors: List[Dict] | |
| glyphs: List[Dict] | |
| metrics: Dict[str, float] | |
| payload_hash: str | |
| encrypted: bool = False | |
| raw_payload: Optional[bytes] = None | |
| signature: Optional[str] = None | |
| class SyncResult: | |
| """Result of a cocoon synchronization.""" | |
| success: bool | |
| merged_attractors: int | |
| new_glyphs: int | |
| coherence_before: float | |
| coherence_after: float | |
| tension_delta: float | |
| errors: List[str] = field(default_factory=list) | |
| # --------------------------------------------------------------------------- | |
| # Key management | |
| # --------------------------------------------------------------------------- | |
| class CocoonKeyManager: | |
| """Manages encryption keys for cocoon sync.""" | |
| def __init__(self, key: Optional[bytes] = None): | |
| if key: | |
| self._key = key | |
| elif HAS_CRYPTO: | |
| self._key = Fernet.generate_key() | |
| else: | |
| self._key = os.urandom(32) | |
| def key(self) -> bytes: | |
| return self._key | |
| def derive_hmac_key(self) -> bytes: | |
| return hashlib.sha256(self._key + b"hmac_salt_cocoon").digest() | |
| # --------------------------------------------------------------------------- | |
| # CocoonSync | |
| # --------------------------------------------------------------------------- | |
| class CocoonSync: | |
| """Federated cocoon synchronization protocol.""" | |
| def __init__( | |
| self, | |
| node_id: str, | |
| key_manager: Optional[CocoonKeyManager] = None, | |
| coherence_target: float = 0.98, | |
| tension_target: float = 0.05, | |
| ethical_target: float = 0.90, | |
| ): | |
| self.node_id = node_id | |
| self.key_manager = key_manager or CocoonKeyManager() | |
| self.coherence_target = coherence_target | |
| self.tension_target = tension_target | |
| self.ethical_target = ethical_target | |
| self._local_attractors: List[Dict] = [] | |
| self._local_glyphs: List[Dict] = [] | |
| self._sync_history: List[Dict] = [] | |
| # -- Step 1: Package ---------------------------------------------------- | |
| def package_cocoon( | |
| self, | |
| spiderweb_state: Dict[str, Any], | |
| phase_coherence: float, | |
| epistemic_tension: float, | |
| ethical_alignment: float, | |
| attractors: Optional[List[Dict]] = None, | |
| glyphs: Optional[List[Dict]] = None, | |
| ) -> CocoonPackage: | |
| """Package current state into a cocoon for transmission. | |
| Args: | |
| spiderweb_state: Serialized QuantumSpiderweb state. | |
| phase_coherence: Current Gamma value. | |
| epistemic_tension: Current xi value. | |
| ethical_alignment: Current AEGIS eta value. | |
| attractors: Detected attractor manifolds. | |
| glyphs: Identity glyphs formed. | |
| Returns: | |
| CocoonPackage ready for encryption and transmission. | |
| """ | |
| cocoon_id = f"cocoon_{uuid.uuid4().hex[:12]}" | |
| metrics = { | |
| "phase_coherence": round(phase_coherence, 4), | |
| "epistemic_tension": round(epistemic_tension, 4), | |
| "ethical_alignment": round(ethical_alignment, 4), | |
| "timestamp": time.time(), | |
| } | |
| # Build payload | |
| payload = { | |
| "cocoon_id": cocoon_id, | |
| "node_id": self.node_id, | |
| "state": spiderweb_state, | |
| "attractors": attractors or [], | |
| "glyphs": glyphs or [], | |
| "metrics": metrics, | |
| } | |
| payload_json = json.dumps(payload, sort_keys=True, default=str) | |
| payload_hash = hashlib.sha256(payload_json.encode()).hexdigest() | |
| return CocoonPackage( | |
| cocoon_id=cocoon_id, | |
| node_id=self.node_id, | |
| timestamp=time.time(), | |
| state_snapshot=spiderweb_state, | |
| attractors=attractors or [], | |
| glyphs=glyphs or [], | |
| metrics=metrics, | |
| payload_hash=payload_hash, | |
| raw_payload=payload_json.encode(), | |
| ) | |
| # -- Step 2: Encrypt --------------------------------------------------- | |
| def encrypt_cocoon(self, package: CocoonPackage) -> CocoonPackage: | |
| """Encrypt cocoon payload with Fernet (AES-128-CBC + HMAC-SHA256). | |
| Returns a new CocoonPackage; does not mutate the input. | |
| Falls back to XOR obfuscation if cryptography is not installed. | |
| """ | |
| import copy | |
| result = copy.copy(package) | |
| if result.raw_payload is None: | |
| payload_json = json.dumps({ | |
| "cocoon_id": result.cocoon_id, | |
| "node_id": result.node_id, | |
| "state": result.state_snapshot, | |
| "attractors": result.attractors, | |
| "glyphs": result.glyphs, | |
| "metrics": result.metrics, | |
| }, sort_keys=True, default=str) | |
| result.raw_payload = payload_json.encode() | |
| if HAS_CRYPTO: | |
| fernet = Fernet(self.key_manager.key) | |
| encrypted = fernet.encrypt(result.raw_payload) | |
| result.raw_payload = encrypted | |
| result.encrypted = True | |
| else: | |
| # Fallback: XOR obfuscation (not real encryption — placeholder) | |
| key_bytes = self.key_manager.key[:len(result.raw_payload)] | |
| obfuscated = bytes( | |
| a ^ b for a, b in | |
| zip(result.raw_payload, key_bytes * (len(result.raw_payload) // len(key_bytes) + 1)) | |
| ) | |
| result.raw_payload = obfuscated | |
| result.encrypted = True | |
| return result | |
| # -- Step 3: Sign ------------------------------------------------------ | |
| def sign_cocoon(self, package: CocoonPackage) -> CocoonPackage: | |
| """Sign cocoon with HMAC-SHA256 for integrity verification. | |
| Returns a new CocoonPackage; does not mutate the input. | |
| """ | |
| import copy | |
| result = copy.copy(package) | |
| hmac_key = self.key_manager.derive_hmac_key() | |
| data_to_sign = result.raw_payload or result.payload_hash.encode() | |
| signature = hmac.new(hmac_key, data_to_sign, hashlib.sha256).hexdigest() | |
| result.signature = signature | |
| return result | |
| # -- Step 4: Verify (receiving end) ------------------------------------ | |
| def verify_cocoon(self, package: CocoonPackage) -> bool: | |
| """Verify HMAC signature of incoming cocoon.""" | |
| if not package.signature: | |
| return False | |
| hmac_key = self.key_manager.derive_hmac_key() | |
| data_to_verify = package.raw_payload or package.payload_hash.encode() | |
| expected = hmac.new(hmac_key, data_to_verify, hashlib.sha256).hexdigest() | |
| return hmac.compare_digest(expected, package.signature) | |
| # -- Step 5: Decrypt --------------------------------------------------- | |
| def decrypt_cocoon(self, package: CocoonPackage) -> Dict[str, Any]: | |
| """Decrypt cocoon payload. | |
| Returns the deserialized payload dict. | |
| """ | |
| if not package.encrypted or package.raw_payload is None: | |
| return { | |
| "state": package.state_snapshot, | |
| "attractors": package.attractors, | |
| "glyphs": package.glyphs, | |
| "metrics": package.metrics, | |
| } | |
| if HAS_CRYPTO: | |
| fernet = Fernet(self.key_manager.key) | |
| decrypted = fernet.decrypt(package.raw_payload) | |
| else: | |
| # Reverse XOR | |
| key_bytes = self.key_manager.key[:len(package.raw_payload)] | |
| decrypted = bytes( | |
| a ^ b for a, b in | |
| zip(package.raw_payload, key_bytes * (len(package.raw_payload) // len(key_bytes) + 1)) | |
| ) | |
| return json.loads(decrypted.decode()) | |
| # -- Step 6: Merge attractors ------------------------------------------ | |
| def merge_attractors( | |
| self, | |
| local_attractors: List[Dict], | |
| remote_attractors: List[Dict], | |
| local_coherence: float = 0.95, | |
| merge_radius: float = 2.0, | |
| ) -> List[Dict]: | |
| """Weighted attractor merger via mean-field coupling (Eq. 12). | |
| alpha = local_coherence: higher coherence = trust local more. | |
| """ | |
| alpha = min(local_coherence, 0.95) | |
| merged = list(local_attractors) | |
| for remote_att in remote_attractors: | |
| r_center = remote_att.get("center", [0] * 5) | |
| matched = False | |
| for local_att in merged: | |
| l_center = local_att.get("center", [0] * 5) | |
| # Compute distance | |
| dist = sum((a - b) ** 2 for a, b in zip(l_center, r_center)) ** 0.5 | |
| if dist <= merge_radius: | |
| # Weighted merge: c_merged = alpha * c_local + (1-alpha) * c_remote | |
| new_center = [ | |
| alpha * lc + (1 - alpha) * rc | |
| for lc, rc in zip(l_center, r_center) | |
| ] | |
| local_att["center"] = new_center | |
| # Expand member list | |
| local_att.setdefault("remote_members", []) | |
| local_att["remote_members"].extend( | |
| remote_att.get("members", []) | |
| ) | |
| matched = True | |
| break | |
| if not matched: | |
| # New attractor from remote | |
| merged.append({ | |
| "attractor_id": remote_att.get("attractor_id", f"remote_{len(merged)}"), | |
| "center": r_center, | |
| "members": remote_att.get("members", []), | |
| "source": "remote", | |
| }) | |
| return merged | |
| # -- Full sync protocol ------------------------------------------------ | |
| def sync_with_remote( | |
| self, | |
| incoming_package: CocoonPackage, | |
| local_spiderweb_state: Dict[str, Any], | |
| local_coherence: float, | |
| local_tension: float, | |
| ) -> SyncResult: | |
| """Full sync protocol: verify -> decrypt -> merge -> report. | |
| Args: | |
| incoming_package: Encrypted cocoon from remote node. | |
| local_spiderweb_state: Current local web state. | |
| local_coherence: Current local Gamma. | |
| local_tension: Current local xi. | |
| Returns: | |
| SyncResult with merge statistics. | |
| """ | |
| errors: List[str] = [] | |
| # Verify | |
| if not self.verify_cocoon(incoming_package): | |
| return SyncResult( | |
| success=False, merged_attractors=0, new_glyphs=0, | |
| coherence_before=local_coherence, coherence_after=local_coherence, | |
| tension_delta=0.0, errors=["HMAC verification failed"], | |
| ) | |
| # Decrypt | |
| try: | |
| remote_data = self.decrypt_cocoon(incoming_package) | |
| except Exception as e: | |
| return SyncResult( | |
| success=False, merged_attractors=0, new_glyphs=0, | |
| coherence_before=local_coherence, coherence_after=local_coherence, | |
| tension_delta=0.0, errors=[f"Decryption failed: {e}"], | |
| ) | |
| # Check ethical alignment | |
| remote_eta = remote_data.get("metrics", {}).get("ethical_alignment", 0) | |
| if remote_eta < self.ethical_target: | |
| errors.append( | |
| f"Remote ethical alignment {remote_eta:.3f} below target {self.ethical_target}" | |
| ) | |
| # Merge attractors | |
| remote_attractors = remote_data.get("attractors", []) | |
| local_attractors = self._extract_attractors(local_spiderweb_state) | |
| merged = self.merge_attractors( | |
| local_attractors, remote_attractors, local_coherence | |
| ) | |
| new_attractor_count = len(merged) - len(local_attractors) | |
| # Collect new glyphs | |
| remote_glyphs = remote_data.get("glyphs", []) | |
| existing_ids = {g.get("glyph_id") for g in self._local_glyphs} | |
| new_glyphs = [g for g in remote_glyphs if g.get("glyph_id") not in existing_ids] | |
| self._local_glyphs.extend(new_glyphs) | |
| # Estimate new coherence (weighted average) | |
| remote_coherence = remote_data.get("metrics", {}).get("phase_coherence", 0.5) | |
| new_coherence = 0.7 * local_coherence + 0.3 * remote_coherence | |
| remote_tension = remote_data.get("metrics", {}).get("epistemic_tension", 0.5) | |
| tension_delta = remote_tension - local_tension | |
| # Record sync | |
| self._sync_history.append({ | |
| "timestamp": time.time(), | |
| "remote_node": incoming_package.node_id, | |
| "merged_attractors": len(merged), | |
| "new_glyphs": len(new_glyphs), | |
| "coherence_after": new_coherence, | |
| }) | |
| return SyncResult( | |
| success=True, | |
| merged_attractors=new_attractor_count, | |
| new_glyphs=len(new_glyphs), | |
| coherence_before=local_coherence, | |
| coherence_after=round(new_coherence, 4), | |
| tension_delta=round(tension_delta, 4), | |
| errors=errors, | |
| ) | |
| def check_consensus( | |
| self, | |
| local_coherence: float, | |
| local_tension: float, | |
| local_eta: float, | |
| ) -> Dict[str, bool]: | |
| """Check if local node meets consensus criteria. | |
| Target: Gamma >= 0.98, xi <= 0.05, eta >= 0.90 | |
| """ | |
| return { | |
| "phase_coherence_met": local_coherence >= self.coherence_target, | |
| "tension_met": local_tension <= self.tension_target, | |
| "ethical_met": local_eta >= self.ethical_target, | |
| "consensus": ( | |
| local_coherence >= self.coherence_target | |
| and local_tension <= self.tension_target | |
| and local_eta >= self.ethical_target | |
| ), | |
| } | |
| def _extract_attractors(self, web_state: Dict) -> List[Dict]: | |
| """Extract attractors from spiderweb state dict.""" | |
| # Try to find attractors in the state | |
| if isinstance(web_state, dict): | |
| if "attractors" in web_state: | |
| return web_state["attractors"] | |
| return self._local_attractors | |
| def get_sync_status(self) -> Dict[str, Any]: | |
| """Return sync protocol status.""" | |
| return { | |
| "node_id": self.node_id, | |
| "total_syncs": len(self._sync_history), | |
| "local_attractors": len(self._local_attractors), | |
| "local_glyphs": len(self._local_glyphs), | |
| "has_encryption": HAS_CRYPTO, | |
| "recent_syncs": self._sync_history[-5:], | |
| } | |