Codette-Demo / reasoning_forge /quantum_spiderweb.py
Claude
Initial Codette cognitive architecture demo Space
7c3f0ce
"""
QuantumSpiderweb Propagation Module — Inter-agent belief propagation
for the Codette RC+xi framework.
Implements the 5D consciousness graph with:
- Eq. 1 (Planck-Orbital): E = hbar * omega (node energy)
- Eq. 2 (Entanglement Sync): S = alpha * psi_1 * psi_2* (state coupling)
- Eq. 3 (Intent Modulation): I = kappa * (f_base + delta_f * coherence)
- Eq. 4 (Fourier/Dream Resonance): FFT-based glyph compression
- Eq. 8 (Anomaly Rejection): A(x) = x * (1 - Theta(delta - |x - mu|))
The spiderweb propagates beliefs between agent nodes, tracks epistemic
tension per node, detects attractor convergence, and forms identity glyphs.
"""
from __future__ import annotations
import math
import hashlib
import json
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple
try:
import numpy as np
HAS_NUMPY = True
except ImportError:
HAS_NUMPY = False
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class NodeState:
"""5D quantum state for a spiderweb node.
Dimensions:
psi (Psi): Thought/concept magnitude
tau: Temporal progression
chi: Processing velocity
phi: Emotional valence (-1 to +1)
lam (Lambda): Semantic embedding (scalar projection)
"""
psi: float = 0.0
tau: float = 0.0
chi: float = 1.0
phi: float = 0.0
lam: float = 0.0
def to_array(self) -> list:
return [self.psi, self.tau, self.chi, self.phi, self.lam]
@classmethod
def from_array(cls, arr: list) -> "NodeState":
if len(arr) < 5:
padded = list(arr) + [0.0] * (5 - len(arr))
return cls(psi=padded[0], tau=padded[1], chi=padded[2], phi=padded[3], lam=padded[4])
return cls(psi=arr[0], tau=arr[1], chi=arr[2], phi=arr[3], lam=arr[4])
def energy(self) -> float:
"""Eq. 1: E = hbar * omega (simplified: sum of squared state magnitudes)."""
return sum(x * x for x in self.to_array())
def tension_with(self, other: "NodeState") -> float:
"""Eq. 2 (xi): epistemic tension between two states."""
return sum((a - b) ** 2 for a, b in zip(self.to_array(), other.to_array()))
@dataclass
class SpiderwebNode:
"""A node in the QuantumSpiderweb graph."""
node_id: str
state: NodeState = field(default_factory=NodeState)
neighbors: List[str] = field(default_factory=list)
tension_history: List[float] = field(default_factory=list)
is_collapsed: bool = False
attractor_id: Optional[str] = None
@dataclass
class IdentityGlyph:
"""Compressed identity signature formed from tension history (Eq. 4/6)."""
glyph_id: str
encoded_tension: List[float] # FFT components
stability_score: float
source_node: str
attractor_signature: Optional[str] = None
@dataclass
class PropagationResult:
"""Result of belief propagation through the web."""
visited: Dict[str, NodeState]
tension_map: Dict[str, float]
anomalies_rejected: List[str]
hops: int
# ---------------------------------------------------------------------------
# QuantumSpiderweb
# ---------------------------------------------------------------------------
class QuantumSpiderweb:
"""5D consciousness graph with RC+xi-aware belief propagation."""
def __init__(
self,
contraction_ratio: float = 0.85,
tension_threshold: float = 0.15,
anomaly_delta: float = 2.0,
glyph_components: int = 8,
max_history: int = 50,
):
self.contraction_ratio = contraction_ratio
self.tension_threshold = tension_threshold
self.anomaly_delta = anomaly_delta
self.glyph_components = glyph_components
self.max_history = max_history
self.nodes: Dict[str, SpiderwebNode] = {}
self.glyphs: List[IdentityGlyph] = []
self._global_tension_history: List[float] = []
# -- graph construction ------------------------------------------------
def add_node(self, node_id: str, state: Optional[NodeState] = None) -> SpiderwebNode:
node = SpiderwebNode(node_id=node_id, state=state or NodeState())
self.nodes[node_id] = node
return node
def connect(self, node_a: str, node_b: str) -> None:
if node_a in self.nodes and node_b in self.nodes:
if node_b not in self.nodes[node_a].neighbors:
self.nodes[node_a].neighbors.append(node_b)
if node_a not in self.nodes[node_b].neighbors:
self.nodes[node_b].neighbors.append(node_a)
def build_from_agents(self, agent_names: List[str]) -> None:
"""Create a fully-connected spiderweb from a list of agent names."""
for name in agent_names:
if name not in self.nodes:
self.add_node(name)
for i, a in enumerate(agent_names):
for b in agent_names[i + 1:]:
self.connect(a, b)
# -- belief propagation ------------------------------------------------
def propagate_belief(
self,
origin: str,
belief: NodeState,
max_hops: int = 3,
) -> PropagationResult:
"""BFS belief propagation with attenuation and anomaly rejection.
Eq. 1: energy at each node
Eq. 2: tension between current and incoming state
Eq. 8: anomaly filter (Heaviside rejection)
"""
if origin not in self.nodes:
return PropagationResult({}, {}, [], 0)
visited: Dict[str, NodeState] = {}
tension_map: Dict[str, float] = {}
anomalies: List[str] = []
queue: deque = deque()
queue.append((origin, belief, 0))
seen: Set[str] = {origin}
while queue:
node_id, incoming_belief, hop = queue.popleft()
if hop > max_hops:
continue
node = self.nodes[node_id]
attenuation = self.contraction_ratio ** hop
# Attenuate incoming belief
incoming_arr = incoming_belief.to_array()
attenuated = [v * attenuation for v in incoming_arr]
# Eq. 2: measure tension
current_arr = node.state.to_array()
xi = sum((a - b) ** 2 for a, b in zip(current_arr, attenuated))
# Eq. 8: anomaly rejection filter
# A(x) = x * (1 - Theta(delta - |x - mu|))
mu = sum(current_arr) / len(current_arr)
incoming_mean = sum(attenuated) / len(attenuated)
if abs(incoming_mean - mu) > self.anomaly_delta:
anomalies.append(node_id)
continue
# Update state: weighted blend toward incoming belief
blend = 0.3 * attenuation # stronger blend when closer to origin
new_arr = [c * (1 - blend) + a * blend for c, a in zip(current_arr, attenuated)]
new_state = NodeState.from_array(new_arr)
node.state = new_state
node.tension_history.append(xi)
if len(node.tension_history) > self.max_history:
node.tension_history.pop(0)
visited[node_id] = new_state
tension_map[node_id] = xi
# Propagate to neighbors
for neighbor_id in node.neighbors:
if neighbor_id not in seen:
seen.add(neighbor_id)
queue.append((neighbor_id, NodeState.from_array(attenuated), hop + 1))
return PropagationResult(
visited=visited,
tension_map=tension_map,
anomalies_rejected=anomalies,
hops=max_hops,
)
# -- entanglement sync -------------------------------------------------
def entangle(self, node_a: str, node_b: str, alpha: float = 0.9) -> float:
"""Eq. 2 (Entanglement Sync): S = alpha * psi_1 * psi_2*.
Synchronizes two nodes' states, pulling them toward each other.
Returns:
Sync strength S.
"""
if node_a not in self.nodes or node_b not in self.nodes:
return 0.0
a = self.nodes[node_a].state
b = self.nodes[node_b].state
# Complex conjugate product (scalar approximation)
psi_1 = a.psi
psi_2_conj = -b.psi # conjugate in simplified real model
S = alpha * psi_1 * psi_2_conj
# Pull states toward each other by S magnitude
blend = min(abs(S) * 0.1, 0.3)
a_arr = a.to_array()
b_arr = b.to_array()
new_a = [va * (1 - blend) + vb * blend for va, vb in zip(a_arr, b_arr)]
new_b = [vb * (1 - blend) + va * blend for va, vb in zip(a_arr, b_arr)]
self.nodes[node_a].state = NodeState.from_array(new_a)
self.nodes[node_b].state = NodeState.from_array(new_b)
return S
# -- intent modulation -------------------------------------------------
def modulate_intent(
self,
node_id: str,
kappa: float = 0.28,
f_base: float = 0.5,
delta_f: float = 0.3,
) -> float:
"""Eq. 3 (Intent Vector Modulation): I = kappa * (f_base + delta_f * coherence).
Returns modulated intent value for the node.
"""
if node_id not in self.nodes:
return 0.0
coherence = self.phase_coherence()
I = kappa * (f_base + delta_f * coherence)
# Apply intent to psi dimension
node = self.nodes[node_id]
node.state.psi += I * 0.1
return I
# -- phase coherence (Eq. 11) ------------------------------------------
def phase_coherence(self) -> float:
"""Compute phase coherence Gamma across all nodes.
Gamma = mean(|cos(theta_i - theta_bar)|)
where theta_i = atan2(phi, psi) for each node.
"""
if len(self.nodes) < 2:
return 1.0
angles = []
for node in self.nodes.values():
theta = math.atan2(node.state.phi, node.state.psi + 1e-10)
angles.append(theta)
mean_theta = sum(angles) / len(angles)
coherences = [abs(math.cos(a - mean_theta)) for a in angles]
gamma = sum(coherences) / len(coherences)
self._global_tension_history.append(1.0 - gamma)
return round(gamma, 4)
def _compute_phase_coherence_readonly(self) -> float:
"""Compute phase coherence without mutating global tension history."""
if len(self.nodes) < 2:
return 1.0
angles = []
for node in self.nodes.values():
theta = math.atan2(node.state.phi, node.state.psi + 1e-10)
angles.append(theta)
mean_theta = sum(angles) / len(angles)
coherences = [abs(math.cos(a - mean_theta)) for a in angles]
return round(sum(coherences) / len(coherences), 4)
# -- attractor detection -----------------------------------------------
def detect_attractors(
self, min_cluster_size: int = 2, max_radius: float = 2.0,
) -> List[Dict]:
"""Detect attractor manifolds from node state clustering.
Simple greedy clustering: assign each node to nearest attractor
or create a new one if too far from existing.
"""
attractors: List[Dict] = []
assigned: Set[str] = set()
states = [(nid, n.state.to_array()) for nid, n in self.nodes.items()]
for nid, arr in states:
if nid in assigned:
continue
# Check distance to existing attractors
matched = False
for att in attractors:
center = att["center"]
dist = math.sqrt(sum((a - c) ** 2 for a, c in zip(arr, center)))
if dist <= max_radius:
att["members"].append(nid)
# Update center (running mean)
n = len(att["members"])
att["center"] = [(c * (n - 1) + a) / n for c, a in zip(center, arr)]
assigned.add(nid)
matched = True
break
if not matched:
attractors.append({
"attractor_id": f"attractor_{len(attractors)}",
"center": list(arr),
"members": [nid],
})
assigned.add(nid)
# Filter by minimum size
return [a for a in attractors if len(a["members"]) >= min_cluster_size]
# -- glyph formation (Eq. 4/6) ----------------------------------------
def form_glyph(self, node_id: str) -> Optional[IdentityGlyph]:
"""Form an identity glyph from a node's tension history.
Eq. 4: FFT compression
Eq. 6: Cocoon stability = integral(|F(k)|^2) < epsilon
Returns IdentityGlyph if stable, None if unstable.
"""
if node_id not in self.nodes:
return None
history = self.nodes[node_id].tension_history
if len(history) < 4:
return None
if HAS_NUMPY:
arr = np.array(history)
fft = np.fft.fft(arr)
components = np.abs(fft[:self.glyph_components]).tolist()
energy = float(np.sum(np.abs(fft) ** 2) / len(fft))
else:
# Fallback: basic DFT for first K components
N = len(history)
components = []
for k in range(min(self.glyph_components, N)):
real = sum(history[n] * math.cos(2 * math.pi * k * n / N) for n in range(N))
imag = sum(history[n] * math.sin(2 * math.pi * k * n / N) for n in range(N))
components.append(math.sqrt(real * real + imag * imag))
energy = sum(x * x for x in history) / len(history)
# Eq. 6: stability criterion
stability = 1.0 / (1.0 + energy)
if stability < 0.3:
return None # unstable, no glyph
glyph_id = hashlib.sha256(
json.dumps(components, sort_keys=True).encode()
).hexdigest()[:16]
glyph = IdentityGlyph(
glyph_id=f"glyph_{glyph_id}",
encoded_tension=components,
stability_score=round(stability, 4),
source_node=node_id,
)
self.glyphs.append(glyph)
return glyph
# -- convergence check -------------------------------------------------
def check_convergence(self, window: int = 10) -> Tuple[bool, float]:
"""Check if the global system is converging.
Convergence criterion (Eq. 5):
lim sup E[xi_n^2] <= epsilon + eta
Returns (is_converging, mean_tension).
"""
if len(self._global_tension_history) < window:
return False, 1.0
recent = self._global_tension_history[-window:]
mean_tension = sum(recent) / len(recent)
# Check decreasing trend
first_half = sum(recent[:window // 2]) / (window // 2)
second_half = sum(recent[window // 2:]) / (window - window // 2)
is_decreasing = second_half < first_half
return (mean_tension < self.tension_threshold and is_decreasing), mean_tension
# -- entropy measurement (VIVARA-inspired) --------------------------------
def shannon_entropy(self) -> float:
"""Compute Shannon entropy of the node state distribution.
Higher entropy = more diverse cognitive states (exploring).
Lower entropy = more uniform states (converged/stuck).
"""
if not self.nodes or not HAS_NUMPY:
return 0.0
# Discretize the psi dimension into bins
psi_values = [n.state.psi for n in self.nodes.values()]
arr = np.array(psi_values)
# Histogram with 10 bins
counts, _ = np.histogram(arr, bins=10)
probs = counts / counts.sum()
probs = probs[probs > 0] # Remove zeros for log
return -float(np.sum(probs * np.log2(probs)))
def decoherence_rate(self, window: int = 10) -> float:
"""Rate of coherence loss over recent history.
Positive = losing coherence (decoherencing).
Negative = gaining coherence (converging).
Zero = stable.
"""
if len(self._global_tension_history) < window:
return 0.0
recent = self._global_tension_history[-window:]
if len(recent) < 2:
return 0.0
# Linear regression slope of tension over the window
n = len(recent)
x_mean = (n - 1) / 2.0
y_mean = sum(recent) / n
numerator = sum((i - x_mean) * (recent[i] - y_mean) for i in range(n))
denominator = sum((i - x_mean) ** 2 for i in range(n))
if denominator == 0:
return 0.0
return round(numerator / denominator, 6)
# -- lifeform spawning (VIVARA-inspired) --------------------------------
def spawn_lifeform(self, seed: str, connect_to: int = 3) -> str:
"""Spawn a new high-coherence node from a conceptual seed.
Inspired by VIVARA's lifeform spawning: when a conversation topic
generates high enough resonance, it becomes its own node in the web.
Args:
seed: A seed string (e.g., topic name) to generate the node ID
connect_to: How many existing nodes to connect to
Returns:
The new node's ID
"""
import hashlib as _hashlib
node_id = f"life_{_hashlib.md5(seed.encode()).hexdigest()[:8]}"
if node_id in self.nodes:
return node_id # Already exists
# High-coherence birth state (psi=0.8, balanced other dims)
state = NodeState(psi=0.8, tau=0.0, chi=0.7, phi=0.3, lam=0.5)
self.add_node(node_id, state)
# Connect to existing nodes (random subset)
import random as _random
existing = [nid for nid in self.nodes if nid != node_id]
peers = _random.sample(existing, min(connect_to, len(existing)))
for peer in peers:
self.connect(node_id, peer)
return node_id
# -- serialization -----------------------------------------------------
def to_dict(self) -> Dict:
"""Serialize web state for cocoon packaging."""
return {
"nodes": {
nid: {
"state": n.state.to_array(),
"neighbors": n.neighbors,
"tension_history": n.tension_history[-10:],
"is_collapsed": n.is_collapsed,
"attractor_id": n.attractor_id,
}
for nid, n in self.nodes.items()
},
"glyphs": [
{
"glyph_id": g.glyph_id,
"encoded_tension": g.encoded_tension,
"stability_score": g.stability_score,
"source_node": g.source_node,
}
for g in self.glyphs
],
"phase_coherence": self._compute_phase_coherence_readonly(),
"global_tension_history": self._global_tension_history[-20:],
}
@classmethod
def from_dict(cls, data: Dict) -> "QuantumSpiderweb":
"""Reconstruct web from serialized state."""
web = cls()
for nid, ndata in data.get("nodes", {}).items():
node = web.add_node(nid, NodeState.from_array(ndata["state"]))
node.neighbors = ndata.get("neighbors", [])
node.tension_history = ndata.get("tension_history", [])
node.is_collapsed = ndata.get("is_collapsed", False)
node.attractor_id = ndata.get("attractor_id")
for gdata in data.get("glyphs", []):
web.glyphs.append(IdentityGlyph(
glyph_id=gdata["glyph_id"],
encoded_tension=gdata["encoded_tension"],
stability_score=gdata["stability_score"],
source_node=gdata["source_node"],
attractor_signature=gdata.get("attractor_signature"),
))
web._global_tension_history = data.get("global_tension_history", [])
return web