""" Address Projection & Dynamics Nodes ==================================== These nodes connect AFTER ModeAddressAlgebraNode. AddressProjectionNode: - Takes a field and an address - Projects the field through the address (filters it) - Shows what an attractor "sees" through its address lens - Implements: ψ_seen = P_A[ψ] AttractorDynamicsNode: - Takes stable_address and metrics from ModeAddressAlgebra - Implements division-dilution balance from IHT-AI - Tracks attractor stability over time - Shows convergence/divergence dynamics AddressLearnerNode: - Learns optimal address via gradient descent - Implements the W-matrix training from IHT-AI - Finds protected mode combinations """ import numpy as np import cv2 from scipy.fft import fft2, ifft2, fftshift, ifftshift from scipy.ndimage import gaussian_filter # --- HOST COMMUNICATION --- import __main__ try: BaseNode = __main__.BaseNode QtGui = __main__.QtGui except AttributeError: class BaseNode: def get_blended_input(self, name, mode): return None import PyQt6.QtGui as QtGui class AddressProjectionNode(BaseNode): """ Projects a quantum field through an address filter. Implements: ψ_seen = P_A[ψ] = F^{-1}[A · F[ψ]] This is what the attractor "sees" - reality filtered through its address. """ NODE_CATEGORY = "Intelligence" NODE_TITLE = "Address Projection" NODE_COLOR = QtGui.QColor(200, 150, 100) # Orange-brown def __init__(self): super().__init__() self.inputs = { 'complex_spectrum': 'complex_spectrum', # The field ψ(k) 'address_mask': 'image', # The address A (from ModeAddressAlgebra) 'projection_strength': 'signal' # How hard to filter (0=pass all, 1=strict) } self.outputs = { 'projected_field': 'complex_spectrum', # P_A[ψ] 'projected_image': 'image', # |P_A[ψ]| in position space 'filtered_out': 'image', # What was rejected 'projection_loss': 'signal' # How much energy was lost } self.size = 128 # State self.psi_in = None self.psi_projected = None self.address = None self.projected_spatial = None self.filtered_out_spatial = None self.projection_loss = 0.0 # Parameters self.projection_strength = 1.0 def step(self): # Get inputs psi = self.get_blended_input('complex_spectrum', 'first') address = self.get_blended_input('address_mask', 'first') strength = self.get_blended_input('projection_strength', 'sum') if strength is not None: self.projection_strength = np.clip(float(strength), 0.0, 1.0) if psi is None: return # Ensure correct size if psi.shape != (self.size, self.size): # Can't easily resize complex, so skip return self.psi_in = psi.astype(np.complex64) # Process address mask if address is not None: if address.ndim == 3: address = np.mean(address, axis=2) if address.shape != (self.size, self.size): address = cv2.resize(address.astype(np.float32), (self.size, self.size)) # Normalize to 0-1 self.address = address.astype(np.float32) / (np.max(address) + 1e-9) else: # Default: pass everything self.address = np.ones((self.size, self.size), dtype=np.float32) # Apply projection strength (interpolate between full pass and strict filter) effective_address = (1 - self.projection_strength) + self.projection_strength * self.address # Shift to centered k-space for proper filtering psi_k_centered = fftshift(self.psi_in) # Apply address filter psi_projected_k = psi_k_centered * effective_address psi_rejected_k = psi_k_centered * (1 - effective_address) # Shift back and store self.psi_projected = ifftshift(psi_projected_k) # Transform to position space for visualization self.projected_spatial = np.abs(ifft2(self.psi_projected)) self.filtered_out_spatial = np.abs(ifft2(ifftshift(psi_rejected_k))) # Compute projection loss (fraction of energy filtered out) energy_in = np.sum(np.abs(psi_k_centered) ** 2) energy_out = np.sum(np.abs(psi_projected_k) ** 2) self.projection_loss = 1.0 - (energy_out / (energy_in + 1e-9)) def get_output(self, port_name): if port_name == 'projected_field': return self.psi_projected elif port_name == 'projected_image': if self.projected_spatial is not None: img = self.projected_spatial img_norm = img / (np.max(img) + 1e-9) return (img_norm * 255).astype(np.uint8) return None elif port_name == 'filtered_out': if self.filtered_out_spatial is not None: img = self.filtered_out_spatial img_norm = img / (np.max(img) + 1e-9) return (img_norm * 255).astype(np.uint8) return None elif port_name == 'projection_loss': return float(self.projection_loss) return None def get_display_image(self): if self.projected_spatial is None: return None h, w = self.size, self.size # Left: What passes through (projected) proj_norm = self.projected_spatial / (np.max(self.projected_spatial) + 1e-9) proj_vis = (proj_norm * 255).astype(np.uint8) proj_color = cv2.applyColorMap(proj_vis, cv2.COLORMAP_VIRIDIS) # Right: What was filtered out filt_norm = self.filtered_out_spatial / (np.max(self.filtered_out_spatial) + 1e-9) filt_vis = (filt_norm * 255).astype(np.uint8) filt_color = cv2.applyColorMap(filt_vis, cv2.COLORMAP_HOT) full = np.hstack((proj_color, filt_color)) cv2.putText(full, f"Seen (loss={self.projection_loss:.1%})", (5, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1) cv2.putText(full, "Filtered Out", (w + 5, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1) return QtGui.QImage(full.data, w*2, h, w*2*3, QtGui.QImage.Format.Format_BGR888) def get_config_options(self): return [ ("Projection Strength", "projection_strength", self.projection_strength, None), ] class AttractorDynamicsNode(BaseNode): """ Implements the division-dilution balance from IHT-AI. Division: Amplitude spreading (+1+1+1...) Dilution: Normalization constraint (→1) Stable attractors exist only where these balance. Takes metrics from ModeAddressAlgebra and tracks attractor health. """ NODE_CATEGORY = "Intelligence" NODE_TITLE = "Attractor Dynamics" NODE_COLOR = QtGui.QColor(150, 200, 100) # Yellow-green def __init__(self): super().__init__() self.inputs = { 'stable_address': 'image', # From ModeAddressAlgebra 'address_entropy': 'signal', # S(A) 'participation_ratio': 'signal', # PR 'complex_spectrum': 'complex_spectrum', # Optional: the field itself 'dilution_rate': 'signal' # γ parameter } self.outputs = { 'attractor_health': 'signal', # 0-1 overall health metric 'stability_map': 'image', # Spatial stability 'division_rate': 'signal', # How fast it's spreading 'time_to_collapse': 'signal', # Estimated steps until collapse 'evolved_field': 'complex_spectrum' # Field after dynamics applied } self.size = 128 # History tracking self.entropy_history = [] self.pr_history = [] self.health_history = [] self.stable_size_history = [] # Current state self.stable_address = None self.stability_map = None self.attractor_health = 0.5 self.division_rate = 0.0 self.time_to_collapse = float('inf') # Internal field for evolution self.psi = None # Parameters self.dilution_rate = 0.02 self.division_strength = 0.1 def compute_health(self, entropy, pr, stable_size): """ Attractor health based on: - Moderate entropy (not too spread, not too concentrated) - High participation ratio (uses many modes) - Large stable address (many protected modes) """ # Optimal entropy around 0.5 (normalized) entropy_score = 1.0 - abs(entropy - 0.5) * 2 # PR should be high but not infinite # Normalize assuming max useful PR around 10000 pr_score = min(pr / 5000.0, 1.0) # Stable size as fraction of total size_score = stable_size / (self.size * self.size) # Weighted combination health = 0.3 * entropy_score + 0.3 * pr_score + 0.4 * size_score return np.clip(health, 0, 1) def estimate_collapse_time(self): """Estimate time to collapse based on health trend""" if len(self.health_history) < 10: return float('inf') # Linear regression on recent health recent = self.health_history[-20:] x = np.arange(len(recent)) slope = np.polyfit(x, recent, 1)[0] if slope >= 0: return float('inf') # Improving or stable # Time to reach 0 from current health current = self.health_history[-1] return -current / slope def step(self): # Get inputs stable_addr = self.get_blended_input('stable_address', 'first') entropy = self.get_blended_input('address_entropy', 'sum') pr = self.get_blended_input('participation_ratio', 'sum') psi = self.get_blended_input('complex_spectrum', 'first') dilution = self.get_blended_input('dilution_rate', 'sum') if dilution is not None: self.dilution_rate = np.clip(float(dilution), 0.0, 0.5) # Process stable address if stable_addr is not None: if stable_addr.ndim == 3: stable_addr = np.mean(stable_addr, axis=2) if stable_addr.shape != (self.size, self.size): stable_addr = cv2.resize(stable_addr.astype(np.float32), (self.size, self.size)) self.stable_address = stable_addr.astype(np.float32) / (np.max(stable_addr) + 1e-9) else: self.stable_address = np.ones((self.size, self.size), dtype=np.float32) * 0.5 # Get metrics with defaults entropy_val = float(entropy) if entropy is not None else 0.5 pr_val = float(pr) if pr is not None else 1000.0 stable_size = np.sum(self.stable_address > 0.5) # Store history self.entropy_history.append(entropy_val) self.pr_history.append(pr_val) self.stable_size_history.append(stable_size) # Trim history max_hist = 100 for hist in [self.entropy_history, self.pr_history, self.stable_size_history, self.health_history]: while len(hist) > max_hist: hist.pop(0) # Compute health self.attractor_health = self.compute_health(entropy_val, pr_val, stable_size) self.health_history.append(self.attractor_health) # Estimate collapse time self.time_to_collapse = self.estimate_collapse_time() # Compute division rate (how fast the address is spreading) if len(self.stable_size_history) > 1: self.division_rate = (self.stable_size_history[-1] - self.stable_size_history[-2]) / self.size**2 # Create stability map # High stability = high in stable address AND consistent over time self.stability_map = self.stable_address.copy() # Apply division-dilution to field if provided if psi is not None and psi.shape == (self.size, self.size): self.psi = psi.astype(np.complex64) # Division: slight spreading via Laplacian in k-space # (equivalent to multiplication by k^2) center = self.size // 2 y, x = np.ogrid[:self.size, :self.size] k2 = ((x - center)**2 + (y - center)**2).astype(np.float32) k2 = k2 / (center**2) # Normalize psi_k = fftshift(fft2(self.psi)) # Division: amplitude wants to spread to higher k division = 1.0 + self.division_strength * k2 * 0.01 # Dilution: decay proportional to dilution rate dilution_factor = 1.0 - self.dilution_rate # Apply stable address as protection # Modes in stable address are protected from dilution protection = fftshift(self.stable_address) effective_dilution = dilution_factor + (1 - dilution_factor) * protection # Apply dynamics psi_k = psi_k * division * effective_dilution # Transform back self.psi = ifft2(ifftshift(psi_k)).astype(np.complex64) def get_output(self, port_name): if port_name == 'attractor_health': return float(self.attractor_health) elif port_name == 'stability_map': if self.stability_map is not None: return (self.stability_map * 255).astype(np.uint8) return None elif port_name == 'division_rate': return float(self.division_rate) elif port_name == 'time_to_collapse': if np.isinf(self.time_to_collapse): return 9999.0 return float(self.time_to_collapse) elif port_name == 'evolved_field': return self.psi return None def get_display_image(self): h, w = self.size, self.size # Left: Stability map if self.stability_map is not None: stab_vis = (self.stability_map * 255).astype(np.uint8) stab_color = cv2.applyColorMap(stab_vis, cv2.COLORMAP_VIRIDIS) else: stab_color = np.zeros((h, w, 3), dtype=np.uint8) # Right: Health history plot plot = np.zeros((h, w, 3), dtype=np.uint8) if len(self.health_history) > 1: n = len(self.health_history) # Health line (green when high, red when low) for i in range(n - 1): x1 = int(i * w / n) x2 = int((i + 1) * w / n) y1 = int((1 - self.health_history[i]) * (h - 20)) + 10 y2 = int((1 - self.health_history[i + 1]) * (h - 20)) + 10 # Color based on health value health_val = self.health_history[i] color = (0, int(255 * health_val), int(255 * (1 - health_val))) cv2.line(plot, (x1, y1), (x2, y2), color, 2) # Health indicator cv2.putText(plot, f"Health: {self.attractor_health:.2f}", (5, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (255, 255, 255), 1) ttc_str = f"{self.time_to_collapse:.0f}" if not np.isinf(self.time_to_collapse) else "INF" cv2.putText(plot, f"TTC: {ttc_str}", (5, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (255, 200, 100), 1) cv2.putText(plot, f"Div: {self.division_rate:+.4f}", (5, h - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (200, 200, 200), 1) full = np.hstack((stab_color, plot)) cv2.putText(full, "Stability Map", (5, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1) cv2.putText(full, "Health Dynamics", (w + 5, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1) return QtGui.QImage(full.data, w*2, h, w*2*3, QtGui.QImage.Format.Format_BGR888) def get_config_options(self): return [ ("Dilution Rate", "dilution_rate", self.dilution_rate, None), ("Division Strength", "division_strength", self.division_strength, None), ] class AddressLearnerNode(BaseNode): """ Learns optimal address via gradient descent. Implements the W-matrix training from IHT-AI: - Objective: maximize coherence under decoherence - Method: gradient descent on address weights Finds the protected mode combinations where attractors survive. """ NODE_CATEGORY = "Intelligence" NODE_TITLE = "Address Learner (W-Matrix)" NODE_COLOR = QtGui.QColor(200, 100, 200) # Purple def __init__(self): super().__init__() self.inputs = { 'complex_spectrum': 'complex_spectrum', # Field to learn from 'decoherence_map': 'image', # γ(k) landscape 'target_coherence': 'signal', # Target coherence level 'learning_rate': 'signal' } self.outputs = { 'learned_address': 'image', # The learned W mask 'coherence': 'signal', # Current coherence 'loss': 'signal', # Training loss 'projected_field': 'complex_spectrum' # Field through learned address } self.size = 128 center = self.size // 2 # The learnable address W (sigmoid of weights) # Initialize with low-frequency bias y, x = np.ogrid[:self.size, :self.size] r = np.sqrt((x - center)**2 + (y - center)**2).astype(np.float32) # Logits (pre-sigmoid weights) self.W_logits = 2.0 - 0.05 * r # Bias toward center # Decoherence landscape self.gamma = np.clip(r / center, 0, 0.95).astype(np.float32) # Training state self.coherence = 0.0 self.loss = 1.0 self.loss_history = [] self.coherence_history = [] # Parameters self.learning_rate = 0.01 self.target_coherence = 0.9 # Internal state self.psi = None self.W = None def sigmoid(self, x): return 1.0 / (1.0 + np.exp(-np.clip(x, -20, 20))) def compute_coherence(self, psi_projected): """Coherence = how phase-aligned the projected field is""" if psi_projected is None or np.sum(np.abs(psi_projected)) < 1e-9: return 0.0 # Coherence = |mean(psi)| / mean(|psi|) # = 1 if all phases aligned, 0 if random phases mean_psi = np.mean(psi_projected) mean_abs = np.mean(np.abs(psi_projected)) if mean_abs < 1e-9: return 0.0 return np.abs(mean_psi) / mean_abs def compute_gradient(self, psi_k, W): """ Compute gradient of coherence w.r.t. W logits Uses finite differences for simplicity """ eps = 0.01 grad = np.zeros_like(self.W_logits) # Sample a subset of points for efficiency sample_size = 100 indices = np.random.choice(self.size * self.size, sample_size, replace=False) for idx in indices: i, j = idx // self.size, idx % self.size # Perturb up self.W_logits[i, j] += eps W_up = self.sigmoid(self.W_logits) psi_up = psi_k * fftshift(W_up) coh_up = self.compute_coherence(ifft2(ifftshift(psi_up))) # Perturb down self.W_logits[i, j] -= 2 * eps W_down = self.sigmoid(self.W_logits) psi_down = psi_k * fftshift(W_down) coh_down = self.compute_coherence(ifft2(ifftshift(psi_down))) # Restore self.W_logits[i, j] += eps # Gradient grad[i, j] = (coh_up - coh_down) / (2 * eps) return grad def step(self): # Get inputs psi = self.get_blended_input('complex_spectrum', 'first') gamma = self.get_blended_input('decoherence_map', 'first') target = self.get_blended_input('target_coherence', 'sum') lr = self.get_blended_input('learning_rate', 'sum') if target is not None: self.target_coherence = np.clip(float(target), 0.1, 1.0) if lr is not None: self.learning_rate = np.clip(float(lr), 0.001, 0.1) # Update decoherence map if gamma is not None: if gamma.ndim == 3: gamma = np.mean(gamma, axis=2) if gamma.shape != (self.size, self.size): gamma = cv2.resize(gamma.astype(np.float32), (self.size, self.size)) self.gamma = gamma.astype(np.float32) / (np.max(gamma) + 1e-9) if psi is None or psi.shape != (self.size, self.size): return self.psi = psi.astype(np.complex64) # Current address (sigmoid of logits) self.W = self.sigmoid(self.W_logits) # Apply decoherence penalty to address # Modes with high γ should be suppressed protection_penalty = 1.0 - self.gamma effective_W = self.W * protection_penalty # Project field through address psi_k = fftshift(fft2(self.psi)) psi_projected_k = psi_k * fftshift(effective_W) psi_projected = ifft2(ifftshift(psi_projected_k)) # Compute coherence self.coherence = self.compute_coherence(psi_projected) # Compute loss (want to maximize coherence toward target) self.loss = max(0, self.target_coherence - self.coherence) # Store history self.loss_history.append(self.loss) self.coherence_history.append(self.coherence) while len(self.loss_history) > 200: self.loss_history.pop(0) self.coherence_history.pop(0) # Gradient update (every few steps for efficiency) if len(self.loss_history) % 5 == 0 and self.loss > 0.01: grad = self.compute_gradient(psi_k, self.W) # Also add gradient toward protected regions protection_grad = protection_penalty - 0.5 # Combined gradient total_grad = grad + 0.1 * protection_grad # Update self.W_logits += self.learning_rate * total_grad # Regularization: slight decay toward zero self.W_logits *= 0.999 def get_output(self, port_name): if port_name == 'learned_address': if self.W is not None: return (fftshift(self.W) * 255).astype(np.uint8) return None elif port_name == 'coherence': return float(self.coherence) elif port_name == 'loss': return float(self.loss) elif port_name == 'projected_field': if self.psi is not None and self.W is not None: psi_k = fftshift(fft2(self.psi)) effective_W = self.W * (1.0 - self.gamma) psi_projected_k = psi_k * fftshift(effective_W) return ifftshift(psi_projected_k) return None return None def get_display_image(self): h, w = self.size, self.size # Left: Learned address W if self.W is not None: W_shifted = fftshift(self.W) W_vis = (W_shifted * 255).astype(np.uint8) W_color = cv2.applyColorMap(W_vis, cv2.COLORMAP_PLASMA) else: W_color = np.zeros((h, w, 3), dtype=np.uint8) # Right: Training plot plot = np.zeros((h, w, 3), dtype=np.uint8) if len(self.coherence_history) > 1: n = len(self.coherence_history) # Coherence (green) for i in range(n - 1): x1 = int(i * w / n) x2 = int((i + 1) * w / n) y1 = int((1 - self.coherence_history[i]) * (h - 20)) + 10 y2 = int((1 - self.coherence_history[i + 1]) * (h - 20)) + 10 cv2.line(plot, (x1, y1), (x2, y2), (0, 255, 0), 1) # Loss (red) max_loss = max(self.loss_history) + 1e-9 for i in range(n - 1): x1 = int(i * w / n) x2 = int((i + 1) * w / n) y1 = int((1 - self.loss_history[i] / max_loss) * (h - 20)) + 10 y2 = int((1 - self.loss_history[i + 1] / max_loss) * (h - 20)) + 10 cv2.line(plot, (x1, y1), (x2, y2), (0, 0, 255), 1) # Target line target_y = int((1 - self.target_coherence) * (h - 20)) + 10 cv2.line(plot, (0, target_y), (w, target_y), (255, 255, 0), 1) cv2.putText(plot, f"Coh: {self.coherence:.3f}", (5, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (0, 255, 0), 1) cv2.putText(plot, f"Loss: {self.loss:.3f}", (5, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (0, 0, 255), 1) cv2.putText(plot, f"LR: {self.learning_rate:.4f}", (5, h - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (200, 200, 200), 1) full = np.hstack((W_color, plot)) cv2.putText(full, "Learned W", (5, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1) cv2.putText(full, "Training", (w + 5, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1) return QtGui.QImage(full.data, w*2, h, w*2*3, QtGui.QImage.Format.Format_BGR888) def get_config_options(self): return [ ("Learning Rate", "learning_rate", self.learning_rate, None), ("Target Coherence", "target_coherence", self.target_coherence, None), ]