| """
|
| QuantumSpiderweb Propagation Module — Inter-agent belief propagation
|
| for the Codette RC+xi framework.
|
|
|
| Implements the 5D consciousness graph with:
|
| - Eq. 1 (Planck-Orbital): E = hbar * omega (node energy)
|
| - Eq. 2 (Entanglement Sync): S = alpha * psi_1 * psi_2* (state coupling)
|
| - Eq. 3 (Intent Modulation): I = kappa * (f_base + delta_f * coherence)
|
| - Eq. 4 (Fourier/Dream Resonance): FFT-based glyph compression
|
| - Eq. 8 (Anomaly Rejection): A(x) = x * (1 - Theta(delta - |x - mu|))
|
|
|
| The spiderweb propagates beliefs between agent nodes, tracks epistemic
|
| tension per node, detects attractor convergence, and forms identity glyphs.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import math
|
| import hashlib
|
| import json
|
| from collections import deque
|
| from dataclasses import dataclass, field
|
| from typing import Dict, List, Optional, Set, Tuple
|
|
|
| try:
|
| import numpy as np
|
| HAS_NUMPY = True
|
| except ImportError:
|
| HAS_NUMPY = False
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class NodeState:
    """Five-component state vector carried by every spiderweb node.

    Attributes:
        psi: Thought/concept magnitude (Psi).
        tau: Temporal progression.
        chi: Processing velocity.
        phi: Emotional valence, nominally in the range -1 to +1.
        lam: Scalar projection of the semantic embedding (Lambda).
    """

    psi: float = 0.0
    tau: float = 0.0
    chi: float = 1.0
    phi: float = 0.0
    lam: float = 0.0

    def to_array(self) -> list:
        """Return the components as ``[psi, tau, chi, phi, lam]``."""
        return [self.psi, self.tau, self.chi, self.phi, self.lam]

    @classmethod
    def from_array(cls, arr: list) -> "NodeState":
        """Build a state from a sequence laid out like :meth:`to_array`.

        Missing trailing components are filled with ``0.0`` -- this includes
        ``chi``, whose dataclass default would otherwise be ``1.0``.  Elements
        beyond the fifth are ignored.
        """
        values = arr
        missing = 5 - len(arr)
        if missing > 0:
            # Only short inputs are copied; full-length inputs are indexed
            # directly.
            values = list(arr) + [0.0] * missing
        # Field order matches the positional order psi, tau, chi, phi, lam.
        return cls(*(values[index] for index in range(5)))

    def energy(self) -> float:
        """Eq. 1 (simplified): node energy as the sum of squared components."""
        components = self.to_array()
        return sum(value * value for value in components)

    def tension_with(self, other: "NodeState") -> float:
        """Epistemic tension xi: squared Euclidean distance to ``other``."""
        mine = self.to_array()
        theirs = other.to_array()
        return sum((own - their) ** 2 for own, their in zip(mine, theirs))
|
|
|
|
|
@dataclass
class SpiderwebNode:
    """One vertex of the QuantumSpiderweb graph.

    Attributes:
        node_id: Identifier of the node; the web stores the node under
            this key.
        state: Current 5D state of the node (a fresh default ``NodeState``
            when none is supplied).
        neighbors: Ids of adjacent nodes.
        tension_history: Tension (xi) values recorded for this node, oldest
            first.
        is_collapsed: Collapse flag; defaults to ``False``.
            NOTE(review): not written by any code visible in this module.
        attractor_id: Id of the attractor the node belongs to, if any.
            NOTE(review): not written by any code visible in this module.
    """

    node_id: str
    state: NodeState = field(default_factory=NodeState)
    neighbors: List[str] = field(default_factory=list)
    tension_history: List[float] = field(default_factory=list)
    is_collapsed: bool = False
    attractor_id: Optional[str] = None
|
|
|
|
|
@dataclass
class IdentityGlyph:
    """Compressed identity signature derived from a tension history (Eq. 4/6).

    Attributes:
        glyph_id: Identifier of the glyph.
        encoded_tension: Magnitudes of the leading frequency components of
            the source node's tension history.
        stability_score: Stability value assigned when the glyph was formed.
        source_node: Id of the node whose history produced the glyph.
        attractor_signature: Optional attractor tag; ``None`` when unset.
    """

    glyph_id: str
    encoded_tension: List[float]
    stability_score: float
    source_node: str
    attractor_signature: Optional[str] = None
|
|
|
|
|
@dataclass
class PropagationResult:
    """Outcome of one belief-propagation pass through the web.

    Attributes:
        visited: Updated state of every node that accepted the belief,
            keyed by node id.
        tension_map: Tension (xi) measured at each accepting node, keyed by
            node id.
        anomalies_rejected: Ids of nodes that rejected the incoming belief
            as anomalous.
        hops: Hop count reported by the propagation call.
    """

    visited: Dict[str, "NodeState"]
    tension_map: Dict[str, float]
    anomalies_rejected: List[str]
    hops: int
|
|
|
|
|
|
|
|
|
|
|
|
|
class QuantumSpiderweb:
    """5D consciousness graph with RC+xi-aware belief propagation.

    Nodes live in ``self.nodes`` (id -> ``SpiderwebNode``) and are linked by
    undirected neighbour lists.  The web can propagate beliefs between nodes,
    entangle pairs of nodes, measure phase coherence, cluster node states
    into attractors, compress tension histories into identity glyphs, and
    serialise itself to / from plain dictionaries.

    Args:
        contraction_ratio: Per-hop attenuation base used by
            :meth:`propagate_belief`.
        tension_threshold: Upper bound on mean global tension for
            :meth:`check_convergence` to report convergence.
        anomaly_delta: Maximum allowed gap between the mean of an incoming
            belief and the mean of a node's state before the belief is
            rejected.
        glyph_components: Number of leading spectrum magnitudes kept in a
            glyph.
        max_history: Length above which a node's tension history is trimmed.
    """

    def __init__(
        self,
        contraction_ratio: float = 0.85,
        tension_threshold: float = 0.15,
        anomaly_delta: float = 2.0,
        glyph_components: int = 8,
        max_history: int = 50,
    ):
        self.contraction_ratio = contraction_ratio
        self.tension_threshold = tension_threshold
        self.anomaly_delta = anomaly_delta
        self.glyph_components = glyph_components
        self.max_history = max_history

        self.nodes: Dict[str, "SpiderwebNode"] = {}
        self.glyphs: List["IdentityGlyph"] = []
        # One entry (1 - gamma) is appended per phase_coherence() call.
        self._global_tension_history: List[float] = []

    # ------------------------------------------------------------------
    # Graph construction
    # ------------------------------------------------------------------

    def add_node(self, node_id: str, state: Optional["NodeState"] = None) -> "SpiderwebNode":
        """Create a node and register it under ``node_id``.

        An existing node with the same id is replaced.  When ``state`` is
        omitted a default ``NodeState`` is used.

        Returns:
            The newly created node.
        """
        initial_state = NodeState() if state is None else state
        node = SpiderwebNode(node_id=node_id, state=initial_state)
        self.nodes[node_id] = node
        return node

    def connect(self, node_a: str, node_b: str) -> None:
        """Link two existing nodes in both directions.

        The call is a silent no-op when either id is unknown; an edge that
        already exists is not duplicated.
        """
        if node_a not in self.nodes or node_b not in self.nodes:
            return
        forward = self.nodes[node_a].neighbors
        if node_b not in forward:
            forward.append(node_b)
        backward = self.nodes[node_b].neighbors
        if node_a not in backward:
            backward.append(node_a)

    def build_from_agents(self, agent_names: List[str]) -> None:
        """Create a fully-connected spiderweb from a list of agent names.

        Names that already have a node keep it.  Repeated names are
        collapsed (first occurrence wins) so that a duplicate entry cannot
        connect a node to itself.
        """
        unique_names = list(dict.fromkeys(agent_names))
        for name in unique_names:
            if name not in self.nodes:
                self.add_node(name)
        for index, first in enumerate(unique_names):
            for second in unique_names[index + 1:]:
                self.connect(first, second)

    # ------------------------------------------------------------------
    # Belief propagation
    # ------------------------------------------------------------------

    def propagate_belief(
        self,
        origin: str,
        belief: "NodeState",
        max_hops: int = 3,
    ) -> "PropagationResult":
        """BFS belief propagation with attenuation and anomaly rejection.

        Starting at ``origin``, each reached node (up to ``max_hops`` hops
        away) scales the incoming belief by ``contraction_ratio ** hop``,
        records the tension xi between its state and that scaled belief
        (Eq. 2), rejects the belief if the means differ by more than
        ``anomaly_delta`` (Eq. 8), and otherwise blends it into its state
        and forwards the scaled belief to unseen neighbours.

        Rejected nodes are not updated and do not forward the belief.
        Neighbour ids that have no node in the web are skipped.

        Args:
            origin: Id of the node where the belief is injected.
            belief: State to propagate.
            max_hops: Largest hop distance that is still processed.

        Returns:
            A ``PropagationResult``; its ``hops`` field echoes ``max_hops``
            (or 0 when ``origin`` is unknown).
        """
        if origin not in self.nodes:
            return PropagationResult({}, {}, [], 0)

        visited: Dict[str, NodeState] = {}
        tension_map: Dict[str, float] = {}
        anomalies: List[str] = []
        seen: Set[str] = {origin}
        queue: deque = deque([(origin, belief, 0)])

        while queue:
            node_id, incoming_belief, hop = queue.popleft()
            if hop > max_hops:
                continue

            node = self.nodes[node_id]
            # NOTE(review): the forwarded belief is already attenuated, so
            # the scaling compounds across hops -- confirm this is intended.
            attenuation = self.contraction_ratio ** hop
            attenuated = [v * attenuation for v in incoming_belief.to_array()]
            current_arr = node.state.to_array()

            # Tension between the node's present state and the incoming one.
            xi = sum((a - b) ** 2 for a, b in zip(current_arr, attenuated))

            # Anomaly filter: compare component means of both vectors.
            mu = sum(current_arr) / len(current_arr)
            incoming_mean = sum(attenuated) / len(attenuated)
            if abs(incoming_mean - mu) > self.anomaly_delta:
                anomalies.append(node_id)
                continue

            # Blend weight shrinks with distance from the origin.
            blend = 0.3 * attenuation
            new_state = NodeState.from_array(
                [c * (1 - blend) + a * blend for c, a in zip(current_arr, attenuated)]
            )
            node.state = new_state
            node.tension_history.append(xi)
            if len(node.tension_history) > self.max_history:
                node.tension_history.pop(0)

            visited[node_id] = new_state
            tension_map[node_id] = xi

            if hop >= max_hops:
                # Anything enqueued from here would be discarded anyway.
                continue
            for neighbor_id in node.neighbors:
                if neighbor_id in seen:
                    continue
                seen.add(neighbor_id)
                if neighbor_id not in self.nodes:
                    # Dangling edge (e.g. from inconsistent serialized
                    # data): nothing to update, so skip instead of crashing.
                    continue
                queue.append((neighbor_id, NodeState.from_array(attenuated), hop + 1))

        return PropagationResult(
            visited=visited,
            tension_map=tension_map,
            anomalies_rejected=anomalies,
            hops=max_hops,
        )

    # ------------------------------------------------------------------
    # Entanglement and intent
    # ------------------------------------------------------------------

    def entangle(self, node_a: str, node_b: str, alpha: float = 0.9) -> float:
        """Eq. 2 (Entanglement Sync): S = alpha * psi_1 * psi_2*.

        Pulls the two nodes' states toward each other.  The blend weight is
        ``min(|S| * 0.1, 0.3)``, so stronger coupling mixes the states more,
        capped at 30 %.

        Returns:
            Sync strength S, or 0.0 when either node id is unknown.
        """
        if node_a not in self.nodes or node_b not in self.nodes:
            return 0.0

        state_a = self.nodes[node_a].state
        state_b = self.nodes[node_b].state

        # NOTE(review): the "conjugate" is modelled as a negation.  For a
        # real amplitude the conjugate is the value itself, so this flips
        # the sign of S (the blend below uses |S| and is unaffected).
        # Kept as-is for compatibility -- confirm the sign is intended.
        psi_2_conj = -state_b.psi
        S = alpha * state_a.psi * psi_2_conj

        blend = min(abs(S) * 0.1, 0.3)
        a_arr = state_a.to_array()
        b_arr = state_b.to_array()
        new_a = [va * (1 - blend) + vb * blend for va, vb in zip(a_arr, b_arr)]
        new_b = [vb * (1 - blend) + va * blend for va, vb in zip(a_arr, b_arr)]

        self.nodes[node_a].state = NodeState.from_array(new_a)
        self.nodes[node_b].state = NodeState.from_array(new_b)
        return S

    def modulate_intent(
        self,
        node_id: str,
        kappa: float = 0.28,
        f_base: float = 0.5,
        delta_f: float = 0.3,
    ) -> float:
        """Eq. 3 (Intent Vector Modulation): I = kappa * (f_base + delta_f * coherence).

        Coherence is obtained from :meth:`phase_coherence`, which also
        records a global-tension sample.  The node's ``psi`` is nudged by
        ``0.1 * I`` in place.

        Returns:
            The modulated intent I, or 0.0 when ``node_id`` is unknown.
        """
        if node_id not in self.nodes:
            return 0.0

        coherence = self.phase_coherence()
        intent = kappa * (f_base + delta_f * coherence)
        self.nodes[node_id].state.psi += intent * 0.1
        return intent

    # ------------------------------------------------------------------
    # Phase coherence
    # ------------------------------------------------------------------

    def _raw_phase_coherence(self) -> Optional[float]:
        """Return unrounded Gamma, or ``None`` with fewer than two nodes.

        Gamma = mean(|cos(theta_i - theta_bar)|), theta_i = atan2(phi, psi).
        theta_bar is the arithmetic mean of the angles (no wrap-around
        handling).
        """
        if len(self.nodes) < 2:
            return None
        # The 1e-10 offset avoids atan2(0, 0) for an all-zero state.
        angles = [
            math.atan2(node.state.phi, node.state.psi + 1e-10)
            for node in self.nodes.values()
        ]
        mean_theta = sum(angles) / len(angles)
        return sum(abs(math.cos(a - mean_theta)) for a in angles) / len(angles)

    def phase_coherence(self) -> float:
        """Compute phase coherence Gamma across all nodes.

        Gamma = mean(|cos(theta_i - theta_bar)|)
        where theta_i = atan2(phi, psi) for each node.

        Side effect: appends ``1 - Gamma`` (unrounded) to the global tension
        history.  With fewer than two nodes nothing is recorded.

        Returns:
            Gamma rounded to 4 decimals, or 1.0 with fewer than two nodes.
        """
        gamma = self._raw_phase_coherence()
        if gamma is None:
            return 1.0
        self._global_tension_history.append(1.0 - gamma)
        return round(gamma, 4)

    def _compute_phase_coherence_readonly(self) -> float:
        """Compute phase coherence without mutating global tension history."""
        gamma = self._raw_phase_coherence()
        return 1.0 if gamma is None else round(gamma, 4)

    # ------------------------------------------------------------------
    # Attractors
    # ------------------------------------------------------------------

    def detect_attractors(
        self, min_cluster_size: int = 2, max_radius: float = 2.0,
    ) -> List[Dict]:
        """Detect attractor manifolds from node state clustering.

        Greedy single pass in node insertion order: a node joins the first
        attractor (in creation order) whose centre lies within
        ``max_radius`` of its state -- not necessarily the nearest one --
        and that centre is updated to the running mean of its members.
        A node that fits no attractor seeds a new one.

        Returns:
            Attractor dicts (``attractor_id``, ``center``, ``members``) with
            at least ``min_cluster_size`` members.
        """
        attractors: List[Dict] = []

        for nid, arr in [(nid, n.state.to_array()) for nid, n in self.nodes.items()]:
            for att in attractors:
                center = att["center"]
                dist = math.sqrt(sum((a - c) ** 2 for a, c in zip(arr, center)))
                if dist > max_radius:
                    continue
                att["members"].append(nid)
                # Incremental mean over the members seen so far.
                count = len(att["members"])
                att["center"] = [(c * (count - 1) + a) / count for c, a in zip(center, arr)]
                break
            else:
                attractors.append({
                    "attractor_id": f"attractor_{len(attractors)}",
                    "center": list(arr),
                    "members": [nid],
                })

        return [att for att in attractors if len(att["members"]) >= min_cluster_size]

    # ------------------------------------------------------------------
    # Glyph formation
    # ------------------------------------------------------------------

    def form_glyph(self, node_id: str) -> Optional["IdentityGlyph"]:
        """Form an identity glyph from a node's tension history.

        Eq. 4: FFT compression -- the magnitudes of the first
        ``glyph_components`` frequency bins become the glyph payload.
        Eq. 6: Cocoon stability -- ``1 / (1 + energy)`` where energy is
        ``sum(|F(k)|^2) / N``.

        The glyph is appended to ``self.glyphs`` when formed.

        Returns:
            The new ``IdentityGlyph``, or ``None`` when the node is unknown,
            has fewer than 4 history samples, or stability is below 0.3.
        """
        if node_id not in self.nodes:
            return None

        history = self.nodes[node_id].tension_history
        if len(history) < 4:
            return None

        if HAS_NUMPY:
            spectrum = np.fft.fft(np.array(history))
            components = np.abs(spectrum[:self.glyph_components]).tolist()
            energy = float(np.sum(np.abs(spectrum) ** 2) / len(spectrum))
        else:
            # Plain DFT fallback producing the same magnitudes.
            size = len(history)
            components = []
            for k in range(min(self.glyph_components, size)):
                real = sum(history[n] * math.cos(2 * math.pi * k * n / size) for n in range(size))
                imag = sum(history[n] * math.sin(2 * math.pi * k * n / size) for n in range(size))
                components.append(math.sqrt(real * real + imag * imag))
            # Parseval: sum(|F(k)|^2) / N == sum(x^2).  Dividing by N again
            # would make stability depend on whether NumPy is installed.
            energy = sum(x * x for x in history)

        stability = 1.0 / (1.0 + energy)
        if stability < 0.3:
            return None

        digest = hashlib.sha256(
            json.dumps(components, sort_keys=True).encode()
        ).hexdigest()[:16]

        glyph = IdentityGlyph(
            glyph_id=f"glyph_{digest}",
            encoded_tension=components,
            stability_score=round(stability, 4),
            source_node=node_id,
        )
        self.glyphs.append(glyph)
        return glyph

    # ------------------------------------------------------------------
    # Convergence and diagnostics
    # ------------------------------------------------------------------

    def check_convergence(self, window: int = 10) -> Tuple[bool, float]:
        """Check if the global system is converging.

        Convergence criterion (Eq. 5):
            lim sup E[xi_n^2] <= epsilon + eta

        The last ``window`` global-tension samples must average below
        ``tension_threshold`` and the later half must average lower than
        the earlier half.

        Returns:
            ``(is_converging, mean_tension)``.  ``(False, 1.0)`` when fewer
            than ``window`` samples exist or ``window`` is below 2 (no
            trend can be formed from a single sample).
        """
        if window < 2 or len(self._global_tension_history) < window:
            return False, 1.0

        recent = self._global_tension_history[-window:]
        mean_tension = sum(recent) / len(recent)

        split = window // 2
        first_half = sum(recent[:split]) / split
        second_half = sum(recent[split:]) / (window - split)
        is_decreasing = second_half < first_half

        return (mean_tension < self.tension_threshold and is_decreasing), mean_tension

    def shannon_entropy(self) -> float:
        """Compute Shannon entropy of the node state distribution.

        The ``psi`` values of all nodes are binned into a 10-bin histogram
        and the entropy (in bits) of the bin frequencies is returned.

        Higher entropy = more diverse cognitive states (exploring).
        Lower entropy = more uniform states (converged/stuck).

        Returns:
            Entropy in bits; 0.0 when the web is empty or NumPy is missing.
        """
        if not self.nodes or not HAS_NUMPY:
            return 0.0

        psi_values = np.array([n.state.psi for n in self.nodes.values()])
        counts, _ = np.histogram(psi_values, bins=10)
        probs = counts / counts.sum()
        probs = probs[probs > 0]  # empty bins contribute nothing

        entropy = -float(np.sum(probs * np.log2(probs)))
        # A single occupied bin yields -0.0; adding 0.0 normalises the sign.
        return entropy + 0.0

    def decoherence_rate(self, window: int = 10) -> float:
        """Rate of coherence loss over recent history.

        Least-squares slope of the last ``window`` global-tension samples
        against their index, rounded to 6 decimals.

        Positive = losing coherence (decoherencing).
        Negative = gaining coherence (converging).
        Zero = stable.

        Returns:
            The slope, or 0.0 when fewer than ``window`` samples exist or
            ``window`` is below 2.
        """
        if window < 2 or len(self._global_tension_history) < window:
            return 0.0

        recent = self._global_tension_history[-window:]
        n = len(recent)  # equals window, so n >= 2 and the denominator > 0
        x_mean = (n - 1) / 2.0
        y_mean = sum(recent) / n
        numerator = sum((i - x_mean) * (recent[i] - y_mean) for i in range(n))
        denominator = sum((i - x_mean) ** 2 for i in range(n))
        return round(numerator / denominator, 6)

    # ------------------------------------------------------------------
    # Lifeform spawning
    # ------------------------------------------------------------------

    def spawn_lifeform(self, seed: str, connect_to: int = 3) -> str:
        """Spawn a new high-coherence node from a conceptual seed.

        Inspired by VIVARA's lifeform spawning: when a conversation topic
        generates high enough resonance, it becomes its own node in the web.

        The node id is ``life_`` plus the first 8 hex digits of the seed's
        MD5 digest, so the same seed always maps to the same node; spawning
        an existing seed returns the existing id without changes.

        Args:
            seed: A seed string (e.g., topic name) to generate the node ID
            connect_to: How many existing nodes to connect to (chosen at
                random; values below zero are treated as zero)

        Returns:
            The new node's ID
        """
        import random

        node_id = f"life_{hashlib.md5(seed.encode()).hexdigest()[:8]}"
        if node_id in self.nodes:
            return node_id

        self.add_node(node_id, NodeState(psi=0.8, tau=0.0, chi=0.7, phi=0.3, lam=0.5))

        candidates = [nid for nid in self.nodes if nid != node_id]
        # Clamp so a negative request cannot make random.sample raise after
        # the node has already been inserted.
        peer_count = max(0, min(connect_to, len(candidates)))
        for peer in random.sample(candidates, peer_count):
            self.connect(node_id, peer)

        return node_id

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------

    def to_dict(self) -> Dict:
        """Serialize web state for cocoon packaging.

        The result is a snapshot: list values are copies, so mutating the
        returned dict does not alter the web.  Only the last 10 tension
        samples per node and the last 20 global samples are included.
        Constructor settings are not serialized.
        """
        return {
            "nodes": {
                nid: {
                    "state": n.state.to_array(),
                    "neighbors": list(n.neighbors),
                    "tension_history": n.tension_history[-10:],
                    "is_collapsed": n.is_collapsed,
                    "attractor_id": n.attractor_id,
                }
                for nid, n in self.nodes.items()
            },
            "glyphs": [
                {
                    "glyph_id": g.glyph_id,
                    "encoded_tension": list(g.encoded_tension),
                    "stability_score": g.stability_score,
                    "source_node": g.source_node,
                    # Read back by from_dict; omitting it loses the value
                    # on a round trip.
                    "attractor_signature": g.attractor_signature,
                }
                for g in self.glyphs
            ],
            "phase_coherence": self._compute_phase_coherence_readonly(),
            "global_tension_history": self._global_tension_history[-20:],
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "QuantumSpiderweb":
        """Reconstruct web from serialized state.

        The web is created with default constructor settings.  Lists from
        ``data`` are copied so the new web does not share mutable state
        with the input.
        """
        web = cls()
        for nid, ndata in data.get("nodes", {}).items():
            node = web.add_node(nid, NodeState.from_array(ndata["state"]))
            node.neighbors = list(ndata.get("neighbors", []))
            node.tension_history = list(ndata.get("tension_history", []))
            node.is_collapsed = ndata.get("is_collapsed", False)
            node.attractor_id = ndata.get("attractor_id")
        for gdata in data.get("glyphs", []):
            web.glyphs.append(IdentityGlyph(
                glyph_id=gdata["glyph_id"],
                encoded_tension=list(gdata["encoded_tension"]),
                stability_score=gdata["stability_score"],
                source_node=gdata["source_node"],
                attractor_signature=gdata.get("attractor_signature"),
            ))
        web._global_tension_history = list(data.get("global_tension_history", []))
        return web
|
|
|