Spaces:
Running
Running
| """ | |
| Crystal Chip Node | |
| ================== | |
| Loads a frozen crystal (grown by EEG Crystal Maker) and probes it like a chip. | |
| The crystal's electrode positions become I/O pins: | |
| - FRONTAL pins (FP1, FP2, F3, F4, F7, F8, FZ) → Input region | |
| - POSTERIOR pins (O1, O2, OZ, P3, P4, P7, P8, PZ) → Output region | |
| - CENTRAL pins (C3, C4, CZ, T7, T8) → Internal processing | |
| Input modes: | |
| - image_in → Projects onto input pins spatially | |
| - latent_in → 16-dim vector distributed across input pins | |
| - signal_in → Direct signal injection at all input pins | |
| - Individual pin signals → Fine control | |
| Output modes: | |
| - image_out → Activity pattern at output pins | |
| - latent_out → 16-dim compressed output state | |
| - signal_out → Mean activity at output pins | |
| - Individual pin signals → Direct readings | |
| The crystal processes inputs through its learned geometry. | |
| What comes out depends on what it learned during gestation. | |
| Author: Built for Antti's consciousness crystallography research | |
| """ | |
| import os | |
| import numpy as np | |
| import cv2 | |
# --- HOST IMPORT BLOCK ---
# When this file is loaded by the host application, the host's ``__main__``
# module is expected to expose ``BaseNode`` and ``QtGui``.  Each name is
# resolved independently so that a host providing only one of them does not
# have its real ``BaseNode`` silently replaced by the stub below.
import __main__

BaseNode = getattr(__main__, "BaseNode", None)
QtGui = getattr(__main__, "QtGui", None)

if QtGui is None:
    try:
        from PyQt6 import QtGui
    except ImportError:
        # The node guards its Qt usage with ``if QtGui``; keep the module
        # importable (e.g. for headless use) when PyQt6 is not installed.
        QtGui = None

if BaseNode is None:

    class BaseNode:
        """Minimal stand-in for the host's node base class."""

        def __init__(self):
            self.inputs = {}
            self.outputs = {}
class CrystalChipNode(BaseNode):
    """
    A frozen crystal used as a computational chip.

    Input at frontal pins → process through crystal geometry → output at
    posterior pins.

    The node loads four directional weight grids from a ``.npz`` file, runs a
    grid of Izhikevich-style neurons coupled through those weights, injects
    external inputs around the "input" pins and reads the membrane potential
    at the "output" pins.
    """

    NODE_NAME = "Crystal Chip"
    NODE_TITLE = "Crystal Chip"
    NODE_CATEGORY = "Processing"
    NODE_COLOR = QtGui.QColor(100, 200, 180) if QtGui else None

    # Pin categorization by neuroanatomical region
    INPUT_PINS = ['FP1', 'FP2', 'F3', 'F4', 'F7', 'F8', 'FZ']  # Frontal = input
    OUTPUT_PINS = ['O1', 'O2', 'OZ', 'P3', 'P4', 'P7', 'P8', 'PZ']  # Posterior = output
    INTERNAL_PINS = ['C3', 'C4', 'CZ', 'T7', 'T8']  # Central = internal processing

    def __init__(self):
        """Declare ports, default parameters and empty (unloaded) state."""
        super().__init__()

        # === INPUTS ===
        self.inputs = {
            # Multi-modal inputs
            "image_in": "image",  # Visual input → projected to input pins
            "latent_in": "spectrum",  # 16-dim latent → distributed to input pins
            "signal_in": "signal",  # Raw signal → all input pins equally
            # Modulation
            "gain": "signal",  # Input amplification
            "coupling": "signal",  # Neighbor coupling strength
            # Control
            "reset": "signal",  # Reset neural state
        }

        # === OUTPUTS ===
        self.outputs = {
            # Multi-modal outputs
            "image_out": "image",  # Activity at output pins as image
            "latent_out": "spectrum",  # 16-dim compressed output state
            "signal_out": "signal",  # Mean activity at output pins
            # Visualization
            "chip_view": "image",  # Main display
            "activity_view": "image",  # Full activity pattern
            # Analysis
            "resonance": "signal",  # How much the crystal is resonating
            "energy": "signal",  # Total activity energy
            # EEG-like frequency band outputs
            "delta": "signal",  # 0.5-4 Hz - slow oscillations
            "theta": "signal",  # 4-8 Hz - memory, navigation
            "alpha": "signal",  # 8-13 Hz - relaxed awareness
            "beta": "signal",  # 13-30 Hz - active thinking
            "gamma": "signal",  # 30-100 Hz - binding, cognition
            "lfp": "signal",  # Local field potential (raw mean)
        }

        # === CRYSTAL STATE ===
        self.crystal_path = ""
        self._last_path = ""
        self.is_loaded = False
        self.status_msg = "No crystal loaded"

        # Crystal data
        self.grid_size = 64
        self.weights_up = None
        self.weights_down = None
        self.weights_left = None
        self.weights_right = None

        # Pin mapping
        self.pin_coords = []  # [(row, col), ...] from crystal file
        self.pin_names = []  # ['FP1', 'F3', ...] from crystal file
        self.input_pin_indices = []  # Indices into pin_coords for input pins
        self.output_pin_indices = []  # Indices into pin_coords for output pins
        self.internal_pin_indices = []

        # Crystal metadata
        self.learning_steps = 0
        self.total_spikes = 0
        self.edf_source = ""
        self.created = ""

        # === NEURAL STATE ===
        # Izhikevich parameters
        self.a = 0.02
        self.b = 0.2
        self.c = -65.0
        self.d = 8.0
        self.dt = 0.5

        # State arrays (initialized when crystal loads)
        self.v = None
        self.u = None

        # Processing parameters
        self.base_coupling = 5.0
        self.input_gain = 50.0
        self.spread_radius = 3  # How far input spreads from pins

        # Statistics
        self.step_count = 0
        self.current_resonance = 0.0
        self.current_energy = 0.0

        # === EEG-LIKE OUTPUT ===
        # History buffer for frequency analysis
        self.lfp_history_size = 256  # ~2.5 seconds at 100Hz
        self.lfp_history = np.zeros(self.lfp_history_size, dtype=np.float32)
        self.lfp_idx = 0

        # Frequency band powers
        self.band_powers = {
            "delta": 0.0,  # 0.5-4 Hz
            "theta": 0.0,  # 4-8 Hz
            "alpha": 0.0,  # 8-13 Hz
            "beta": 0.0,  # 13-30 Hz
            "gamma": 0.0,  # 30-100 Hz
        }
        self.current_lfp = 0.0

        # Assume ~100 Hz sample rate for the simulation
        self.sample_rate = 100.0

        # Output cache
        self._output_values = {
            "signal_out": 0.0,
            "resonance": 0.0,
            "energy": 0.0,
            "delta": 0.0,
            "theta": 0.0,
            "alpha": 0.0,
            "beta": 0.0,
            "gamma": 0.0,
            "lfp": 0.0,
        }
        self._latent_out = np.zeros(16, dtype=np.float32)

        # Display
        self.display_image = None
        self._update_display()

    # === CONFIGURATION ===

    def get_config_options(self):
        """Return ``(label, attribute, current value, extra)`` tuples for the host UI."""
        return [
            ("Crystal File (.npz)", "crystal_path", self.crystal_path, None),
            ("Base Coupling", "base_coupling", self.base_coupling, None),
            ("Input Gain", "input_gain", self.input_gain, None),
            ("Spread Radius", "spread_radius", self.spread_radius, None),
        ]

    def set_config_options(self, options):
        """Apply a ``{attribute: value}`` dict coming from the host UI.

        Unknown keys (attributes the node does not have) and non-dict input are
        ignored.  The numeric parameters are coerced to the type the
        simulation needs; a value that cannot be converted is skipped so the
        previous setting stays in effect instead of breaking ``step()`` later.
        """
        if not isinstance(options, dict):
            return

        numeric_casts = {
            "base_coupling": float,
            "input_gain": float,
            # Used as a range() bound, so it must be a non-negative int.
            "spread_radius": lambda raw: max(int(float(raw)), 0),
        }

        for key, value in options.items():
            if not hasattr(self, key):
                continue
            cast = numeric_casts.get(key)
            if cast is not None:
                try:
                    value = cast(value)
                except (TypeError, ValueError, OverflowError):
                    print(f"[CrystalChip] Ignoring invalid value for {key}: {value!r}")
                    continue
            setattr(self, key, value)

    # === CRYSTAL LOADING ===

    def _maybe_reload(self):
        """Check if we need to load a new crystal file."""
        # Strip whitespace/quotes and normalize separators before comparing.
        path = str(self.crystal_path or "").strip().strip('"').strip("'")
        path = path.replace("\\", "/")
        if path == self._last_path:
            return

        self._last_path = path
        self.crystal_path = path
        if path:
            self._load_crystal()
        else:
            self.is_loaded = False
            self.status_msg = "No crystal loaded"

    @staticmethod
    def _npz_scalar(data, key, default):
        """Return ``data[key]`` from an open npz archive, or ``default`` if absent.

        Single-element arrays are unwrapped to a plain Python value.  Uses
        ``data.files`` rather than ``data.get`` because the latter is not
        available on every NumPy version.
        """
        if key not in data.files:
            return default
        value = np.asarray(data[key])
        return value.item() if value.size == 1 else value

    def _init_neural_state(self):
        """Set every neuron to the resting potential ``c`` and ``u = b * v``."""
        n = self.grid_size
        self.v = np.full((n, n), self.c, dtype=np.float32)
        self.u = self.v * self.b

    def _load_crystal(self):
        """Load a frozen crystal from .npz file.

        On success the weights, pins, metadata and neural state are replaced
        and ``is_loaded`` becomes True.  On any failure ``is_loaded`` becomes
        False, ``status_msg`` describes the error and the previously loaded
        arrays are left untouched (everything is parsed into locals first).
        """
        if not os.path.exists(self.crystal_path):
            self.status_msg = "File not found"
            self.is_loaded = False
            return

        weight_keys = ("weights_up", "weights_down", "weights_left", "weights_right")
        try:
            # NOTE(security): allow_pickle=True can execute arbitrary code from
            # a malicious file.  It is kept because existing crystals may store
            # object arrays; only load crystal files from trusted sources.
            with np.load(self.crystal_path, allow_pickle=True) as data:
                weights = {
                    key: np.asarray(data[key], dtype=np.float32) for key in weight_keys
                }

                pin_coords = []
                if "pin_coords" in data.files:
                    # Coerce to plain ints: the coordinates are used as array indices.
                    pin_coords = [(int(p[0]), int(p[1])) for p in data["pin_coords"]]

                pin_names = []
                if "pin_names" in data.files:
                    pin_names = [
                        raw.decode("utf-8", "replace") if isinstance(raw, bytes) else str(raw)
                        for raw in data["pin_names"]
                    ]

                learning_steps = int(self._npz_scalar(data, "learning_steps", 0))
                total_spikes = int(self._npz_scalar(data, "total_spikes", 0))
                edf_source = str(self._npz_scalar(data, "edf_source", "unknown"))
                created = str(self._npz_scalar(data, "created", "unknown"))

            # The simulation keeps an (n, n) state and multiplies it elementwise
            # with every weight grid, so all four must share one square shape.
            shape = weights["weights_up"].shape
            if len(shape) != 2 or shape[0] != shape[1]:
                raise ValueError(f"weights must be square, got {shape}")
            if any(w.shape != shape for w in weights.values()):
                raise ValueError("weight grids differ in shape")

            # Commit only after everything parsed successfully.
            self.weights_up = weights["weights_up"]
            self.weights_down = weights["weights_down"]
            self.weights_left = weights["weights_left"]
            self.weights_right = weights["weights_right"]
            self.grid_size = shape[0]
            self.pin_coords = pin_coords
            self.pin_names = pin_names
            self.learning_steps = learning_steps
            self.total_spikes = total_spikes
            self.edf_source = edf_source
            self.created = created

            # Initialize neural state
            n = self.grid_size
            self._init_neural_state()

            # Categorize pins by region
            self._categorize_pins()

            fname = os.path.basename(self.crystal_path)
            self.status_msg = f"Loaded {fname} | {n}x{n} | {len(self.pin_coords)} pins"
            self.is_loaded = True

            print(f"[CrystalChip] Loaded crystal: {n}x{n}, {len(self.pin_coords)} pins")
            print(f" Input pins: {len(self.input_pin_indices)}")
            print(f" Output pins: {len(self.output_pin_indices)}")
            print(f" Internal pins: {len(self.internal_pin_indices)}")
            print(f" Learned from: {self.edf_source}")
            print(f" Training steps: {self.learning_steps}")
        except Exception as e:
            # Loading is best-effort: report the problem and stay unloaded.
            self.status_msg = f"Load error: {str(e)[:30]}"
            self.is_loaded = False
            print(f"[CrystalChip] Error loading crystal: {e}")

    def _categorize_pins(self):
        """Sort pins into input/output/internal categories.

        Order of attempts:
        1. By name, when one name per coordinate is available (substring match
           against INPUT_PINS, then OUTPUT_PINS; everything else is internal).
        2. By row position, when the name pass found neither inputs nor
           outputs: top 35% = input, bottom 35% = output, rest = internal.
        3. If inputs (or outputs) are still empty, the first (or last) third of
           the pin list is used.
        """
        self.input_pin_indices = []
        self.output_pin_indices = []
        self.internal_pin_indices = []

        # First try by name if available
        if self.pin_names and len(self.pin_names) == len(self.pin_coords):
            for i, name in enumerate(self.pin_names):
                label = str(name).upper().strip()
                if any(tag in label for tag in self.INPUT_PINS):
                    self.input_pin_indices.append(i)
                elif any(tag in label for tag in self.OUTPUT_PINS):
                    self.output_pin_indices.append(i)
                else:
                    # Central pins and unrecognized names are both internal.
                    self.internal_pin_indices.append(i)

        # If no categorization worked (no names or names didn't match), use position
        if not self.input_pin_indices and not self.output_pin_indices:
            # The name pass may have put every pin into the internal list;
            # start over so pins are not listed twice.
            self.internal_pin_indices = []
            # Use neuroanatomical position: frontal (top) = input, occipital (bottom) = output
            for i, (r, _col) in enumerate(self.pin_coords):
                r_norm = r / self.grid_size  # Normalize position to 0-1 range
                if r_norm < 0.35:  # Top 35% = frontal = input
                    self.input_pin_indices.append(i)
                elif r_norm > 0.65:  # Bottom 35% = occipital = output
                    self.output_pin_indices.append(i)
                else:  # Middle = central = internal
                    self.internal_pin_indices.append(i)

        # Ensure we have at least some inputs and outputs
        n_pins = len(self.pin_coords)
        if not self.input_pin_indices and self.pin_coords:
            self.input_pin_indices = list(range(n_pins // 3))  # first third
        if not self.output_pin_indices and self.pin_coords:
            self.output_pin_indices = list(range(2 * n_pins // 3, n_pins))  # last third

        # Pins promoted by the fallbacks above are no longer "internal".
        io_pins = set(self.input_pin_indices) | set(self.output_pin_indices)
        self.internal_pin_indices = [
            i for i in self.internal_pin_indices if i not in io_pins
        ]

    # === INPUT READING ===

    def _read_input(self, name, default=None):
        """Read an input value via the host's ``get_blended_input`` ("mean" mode).

        Returns ``default`` when the host method is missing, returns None, or
        raises.
        """
        fn = getattr(self, "get_blended_input", None)
        if not callable(fn):
            return default
        try:
            val = fn(name, "mean")
        except Exception:
            # Best-effort read: a failing upstream port must not stop the step.
            return default
        return default if val is None else val

    @staticmethod
    def _qimage_to_array(qimg):
        """Convert a QImage-like object to a float32 numpy array.

        Returns an (H, W, 3) array for the 32-bit and RGB888 formats, an
        (H, W) array for Grayscale8, and a best guess for other formats.
        Returns None when the image exposes no pixel buffer.
        """
        width = qimg.width()
        height = qimg.height()
        # Rows may be padded, so reshape by bytes-per-line and crop afterwards.
        bytes_per_line = qimg.bytesPerLine()

        ptr = qimg.bits()
        if ptr is None:
            return None
        ptr.setsize(height * bytes_per_line)
        arr = np.array(ptr).reshape(height, bytes_per_line)

        # PyQt6 returns a plain Enum member here (never equal to an int);
        # unwrap ``.value`` so the numeric comparison works on both bindings.
        fmt = qimg.format()
        try:
            fmt_code = int(getattr(fmt, "value", fmt))
        except (TypeError, ValueError):
            fmt_code = -1

        if fmt_code in (4, 5, 6):  # Format_RGB32 / ARGB32 / ARGB32_Premultiplied
            arr = arr[:, :width * 4].reshape(height, width, 4)
            arr = arr[:, :, :3]  # Drop the 4th (alpha/padding) byte
        elif fmt_code == 13:  # Format_RGB888
            arr = arr[:, :width * 3].reshape(height, width, 3)
        elif fmt_code == 24:  # Format_Grayscale8
            arr = arr[:, :width]
        elif bytes_per_line >= width * 3:
            # Unknown format: try to handle as 3 bytes per pixel
            arr = arr[:, :width * 3].reshape(height, width, 3)
        else:
            arr = arr[:, :width]
        return arr.astype(np.float32)

    def _read_image_input(self, name):
        """Read an image input, converting QImage to numpy if needed.

        Returns a numpy array, or None when nothing usable is connected.
        """
        fn = getattr(self, "get_blended_input", None)
        if not callable(fn):
            return None

        try:
            val = fn(name, "first")
        except Exception as e:
            print(f"[CrystalChip] Image read error: {e}")
            return None
        if val is None:
            return None

        # If it's already a numpy array, return it
        if hasattr(val, 'shape') and hasattr(val, 'dtype'):
            return val

        # If it's a QImage, convert to numpy
        if hasattr(val, 'width') and hasattr(val, 'height') and hasattr(val, 'bits'):
            try:
                return self._qimage_to_array(val)
            except Exception as e:
                print(f"[CrystalChip] QImage conversion error: {e}")
                return None
        return None

    def _read_latent_input(self, name):
        """Read a latent/spectrum input; returns a numpy array or None."""
        fn = getattr(self, "get_blended_input", None)
        if not callable(fn):
            return None
        try:
            val = fn(name, "first")
        except Exception:
            # Best-effort read, same policy as _read_input.
            return None
        return val if isinstance(val, np.ndarray) else None

    # === SIMULATION ===

    def step(self):
        """Advance the simulation by one tick.

        Reloads the crystal if the path changed, builds the input current from
        the connected ports, integrates the neuron grid once, then refreshes
        the cached outputs and the display.  A ``reset`` input above 0.5
        resets the neural state and skips the rest of the tick.
        """
        self._maybe_reload()
        if not self.is_loaded:
            self._update_display()
            return

        self.step_count += 1

        # Read modulation inputs
        gain_mod = self._read_input("gain", 1.0)
        coupling_mod = self._read_input("coupling", 1.0)
        reset = self._read_input("reset", 0.0)
        if reset and reset > 0.5:
            self._reset_state()
            return

        effective_gain = self.input_gain * float(gain_mod)
        effective_coupling = self.base_coupling * float(coupling_mod)

        # === BUILD INPUT CURRENT ===
        n = self.grid_size
        I = np.zeros((n, n), dtype=np.float32)

        # 1. Signal input → all input pins equally
        signal_in = self._read_input("signal_in", 0.0)
        if signal_in:
            self._inject_at_pins(I, self.input_pin_indices, float(signal_in) * effective_gain)

        # 2. Image input → spatially mapped to input pins
        image_in = self._read_image_input("image_in")
        if image_in is not None:
            self._inject_image(I, image_in, effective_gain)

        # 3. Latent input → distributed across input pins
        latent_in = self._read_latent_input("latent_in")
        if latent_in is not None:
            self._inject_latent(I, latent_in, effective_gain)

        # === NEURAL DYNAMICS ===
        v = self.v.copy()
        u = self.u.copy()

        # Get neighbor voltages (np.roll wraps around at the grid edges)
        v_up = np.roll(v, -1, axis=0)
        v_down = np.roll(v, 1, axis=0)
        v_left = np.roll(v, -1, axis=1)
        v_right = np.roll(v, 1, axis=1)

        # Weighted coupling through crystal geometry
        neighbor_influence = (
            self.weights_up * v_up
            + self.weights_down * v_down
            + self.weights_left * v_left
            + self.weights_right * v_right
        )
        total_weight = (
            self.weights_up + self.weights_down + self.weights_left + self.weights_right
        )
        neighbor_avg = neighbor_influence / (total_weight + 1e-6)
        I_coupling = effective_coupling * (neighbor_avg - v)

        # Clamp to prevent overflow
        I = np.clip(I, -100, 100)
        I_coupling = np.clip(I_coupling, -50, 50)

        # Izhikevich dynamics
        dv = (0.04 * v * v + 5.0 * v + 140.0 - u + I + I_coupling) * self.dt
        du = self.a * (self.b * v - u) * self.dt
        v = v + dv
        u = u + du

        # Clamp to prevent overflow
        v = np.clip(v, -100, 50)
        u = np.clip(u, -50, 50)

        # Detect spikes: reset v to c and bump the recovery variable by d
        spikes = v >= 30.0
        v[spikes] = self.c
        u[spikes] += self.d

        # Clean up NaN
        v = np.nan_to_num(v, nan=self.c, posinf=50.0, neginf=-100.0)
        u = np.nan_to_num(u, nan=0.0, posinf=50.0, neginf=-50.0)

        self.v = v
        self.u = u

        # === COMPUTE OUTPUTS ===
        self._compute_outputs()
        self._update_display()

    def _inject_at_pins(self, I, pin_indices, value):
        """Inject current at specified pins with spatial spread.

        Adds ``value * exp(-distance / max(radius, 1))`` to ``I`` (in place) for
        every grid cell within a square of half-width ``spread_radius`` around
        each pin; cells outside the grid are skipped.
        """
        if not pin_indices:
            return

        r = max(int(self.spread_radius), 0)
        n = self.grid_size

        # The falloff kernel is the same for every pin, so build it once.
        offsets = np.arange(-r, r + 1)
        dist = np.sqrt(offsets[:, None] ** 2 + offsets[None, :] ** 2)
        kernel = np.exp(-dist / max(r, 1)) * value

        for idx in pin_indices:
            if idx >= len(self.pin_coords):
                continue
            row, col = self.pin_coords[idx]
            # Clip the kernel footprint to the grid.
            r0, r1 = max(row - r, 0), min(row + r + 1, n)
            c0, c1 = max(col - r, 0), min(col + r + 1, n)
            if r0 >= r1 or c0 >= c1:
                continue  # footprint lies completely outside the grid
            I[r0:r1, c0:c1] += kernel[
                r0 - row + r:r1 - row + r,
                c0 - col + r:c1 - col + r,
            ]

    def _inject_image(self, I, image, gain):
        """Project image onto input pins based on their spatial arrangement.

        The image is reduced to grayscale, min-max normalized to 0..1, and
        sampled at each input pin's relative grid position.  Empty images and
        images that are not 2-D after the grayscale step are ignored.
        """
        if len(self.input_pin_indices) == 0:
            return

        # Convert to grayscale if needed
        if len(image.shape) == 3:
            gray = np.mean(image, axis=2)
        else:
            gray = image.astype(np.float32)
        if gray.ndim != 2 or gray.size == 0:
            return

        # Normalize
        lo = np.min(gray)
        gray = (gray - lo) / (np.max(gray) - lo + 1e-6)

        height, width = gray.shape
        # For each input pin, sample the image at its relative position
        for idx in self.input_pin_indices:
            if idx >= len(self.pin_coords):
                continue
            row, col = self.pin_coords[idx]
            # Map pin position to image coordinates
            img_row = min(max(int((row / self.grid_size) * height), 0), height - 1)
            img_col = min(max(int((col / self.grid_size) * width), 0), width - 1)
            self._inject_at_pins(I, [idx], gray[img_row, img_col] * gain)

    def _inject_latent(self, I, latent, gain):
        """Distribute latent vector across input pins.

        Pin ``i`` receives ``latent[i % len(latent)] * gain``; an empty latent
        is ignored.
        """
        if len(self.input_pin_indices) == 0:
            return

        # Ensure latent is 1D (also handles 0-d arrays)
        flat = np.ravel(latent)
        if flat.size == 0:
            return

        # Map latent dimensions to input pins (cycling if needed)
        for i, idx in enumerate(self.input_pin_indices):
            self._inject_at_pins(I, [idx], float(flat[i % flat.size]) * gain)

    # === OUTPUTS ===

    def _compute_outputs(self):
        """Compute output signals from output pin activity."""
        n = self.grid_size

        # Read activity at output pins
        output_activities = []
        for idx in self.output_pin_indices:
            if idx >= len(self.pin_coords):
                continue
            row, col = self.pin_coords[idx]
            if 0 <= row < n and 0 <= col < n:
                output_activities.append(self.v[row, col])

        latent = np.zeros(16, dtype=np.float32)
        if output_activities:
            # Signal out = mean output activity
            self._output_values["signal_out"] = float(np.mean(output_activities))
            # Latent out = first 16 output activities (zero padded)
            head = output_activities[:16]
            latent[:len(head)] = head
        else:
            self._output_values["signal_out"] = float(np.mean(self.v))
        self._latent_out = latent

        # Resonance = variance of activity (high variance = resonating)
        self.current_resonance = float(np.var(self.v))
        self._output_values["resonance"] = self.current_resonance

        # Energy = sum of squared activity
        self.current_energy = float(np.sum(self.v ** 2))
        self._output_values["energy"] = self.current_energy

        # === EEG-LIKE FREQUENCY BAND EXTRACTION ===
        # Compute LFP (local field potential) as mean activity
        self.current_lfp = float(np.mean(self.v))

        # Add to history buffer (circular)
        self.lfp_history[self.lfp_idx] = self.current_lfp
        self.lfp_idx = (self.lfp_idx + 1) % self.lfp_history_size

        # Extract frequency bands using FFT
        self._extract_frequency_bands()

        # Update output values
        self._output_values["lfp"] = self.current_lfp
        for band in ("delta", "theta", "alpha", "beta", "gamma"):
            self._output_values[band] = self.band_powers[band]

    def _extract_frequency_bands(self):
        """Extract EEG-like frequency bands from LFP history using FFT.

        Each entry of ``band_powers`` is set to ``10 * log10(power + 1)`` where
        ``power`` is the summed spectral power inside the band, or 0.0 when
        the band holds no power.
        """
        # Reorder history to be chronological
        history = np.roll(self.lfp_history, -self.lfp_idx)
        # Remove DC offset
        history = history - np.mean(history)
        # Apply window to reduce spectral leakage
        windowed = history * np.hanning(len(history))

        power = np.abs(np.fft.rfft(windowed)) ** 2
        freqs = np.fft.rfftfreq(len(history), d=1.0 / self.sample_rate)

        # Gamma nominally reaches 100 Hz but cannot exceed Nyquist
        # (50 Hz at the default 100 Hz sample rate).
        nyquist = self.sample_rate / 2.0
        band_edges = {
            "delta": (0.5, 4.0),
            "theta": (4.0, 8.0),
            "alpha": (8.0, 13.0),
            "beta": (13.0, 30.0),
            "gamma": (30.0, min(100.0, nyquist)),
        }

        for band, (low, high) in band_edges.items():
            mask = (freqs >= low) & (freqs < high)
            raw = float(np.sum(power[mask]))  # empty selection sums to 0.0
            # Log scale for display, shifted so the result is non-negative
            self.band_powers[band] = float(np.log10(raw + 1) * 10) if raw > 0 else 0.0

    def _reset_state(self):
        """Reset neural state to resting."""
        if self.is_loaded:
            self._init_neural_state()

    def get_output(self, port_name):
        """Return the current value of the output port ``port_name`` (None if unknown)."""
        if port_name == "chip_view":
            return self.display_image
        if port_name == "activity_view":
            return self._render_activity()
        if port_name == "image_out":
            return self._render_output_image()
        if port_name == "latent_out":
            return self._latent_out
        if port_name in self._output_values:
            return self._output_values.get(port_name, 0.0)
        return None

    # === RENDERING ===

    def _render_activity(self):
        """Render full activity pattern as a 256x256 BGR heat map with pin markers."""
        if not self.is_loaded:
            return np.zeros((256, 256, 3), dtype=np.uint8)

        n = self.grid_size
        disp = np.clip(self.v, -90.0, 40.0)
        norm = ((disp + 90.0) / 130.0 * 255.0).astype(np.uint8)
        heat = cv2.applyColorMap(norm, cv2.COLORMAP_INFERNO)
        heat = cv2.resize(heat, (256, 256), interpolation=cv2.INTER_NEAREST)

        # Draw pins (colors are BGR tuples)
        scale = 256 / n
        input_pins = set(self.input_pin_indices)
        output_pins = set(self.output_pin_indices)
        for i, (r, c) in enumerate(self.pin_coords):
            center = (int(c * scale), int(r * scale))
            if i in input_pins:
                color = (0, 255, 0)  # Green = input
            elif i in output_pins:
                color = (0, 0, 255)  # Red = output
            else:
                # NOTE(review): the original comment called this "Yellow", but
                # (255, 255, 0) in BGR is cyan; the value is left unchanged.
                color = (255, 255, 0)  # internal
            cv2.circle(heat, center, 4, color, -1)
        return heat

    def _render_output_image(self):
        """Render output pin activity as a small image.

        Output pins are laid out row-major on the smallest square grid that
        holds them, one gray pixel per pin, then scaled to 64x64.  Returns an
        8x8 black image when there are no output pins.
        """
        n_out = len(self.output_pin_indices)
        if n_out == 0:
            return np.zeros((8, 8, 3), dtype=np.uint8)

        # Find grid size that fits output pins
        size = int(np.ceil(np.sqrt(n_out)))
        img = np.zeros((size, size, 3), dtype=np.uint8)

        for i, idx in enumerate(self.output_pin_indices):
            if idx >= len(self.pin_coords):
                continue
            row, col = self.pin_coords[idx]
            if not (0 <= row < self.grid_size and 0 <= col < self.grid_size):
                continue
            activity = self.v[row, col]
            # Normalize to 0-255
            val = int(np.clip((activity + 90) / 130 * 255, 0, 255))
            img_row, img_col = divmod(i, size)
            if img_row < size and img_col < size:
                img[img_row, img_col] = [val, val, val]

        # Scale up
        return cv2.resize(img, (64, 64), interpolation=cv2.INTER_NEAREST)

    def _update_display(self):
        """Create main display.

        Draws the 512x400 status panel and, when ``QtGui`` is available,
        stores it in ``display_image`` as a QImage.
        """
        w, h = 512, 400
        font = cv2.FONT_HERSHEY_SIMPLEX
        img = np.zeros((h, w, 3), dtype=np.uint8)

        # Title
        cv2.putText(img, "CRYSTAL CHIP", (10, 30), font, 0.9, (100, 200, 180), 2)

        if not self.is_loaded:
            cv2.putText(img, self.status_msg, (10, 70), font, 0.5, (150, 150, 150), 1)
            cv2.putText(img, "Load a crystal .npz file", (10, 100), font, 0.4, (100, 100, 100), 1)
        else:
            # Status line
            cv2.putText(img, self.status_msg, (10, 55), font, 0.4, (200, 200, 200), 1)

            # Activity view
            activity = self._render_activity()
            img[70:270, 10:210] = cv2.resize(activity, (200, 200))
            cv2.putText(img, "Activity", (10, 285), font, 0.4, (255, 255, 255), 1)

            # Pin legend
            cv2.circle(img, (20, 305), 5, (0, 255, 0), -1)
            cv2.putText(img, f"Input ({len(self.input_pin_indices)})", (30, 310),
                        font, 0.35, (0, 255, 0), 1)
            cv2.circle(img, (120, 305), 5, (0, 0, 255), -1)
            cv2.putText(img, f"Output ({len(self.output_pin_indices)})", (130, 310),
                        font, 0.35, (0, 0, 255), 1)

            # Crystal structure view
            crystal = self._render_crystal()
            img[70:270, 230:430] = cv2.resize(crystal, (200, 200))
            cv2.putText(img, "Crystal Structure", (230, 285), font, 0.4, (255, 255, 255), 1)

            # Output preview - position it to fit within 512 width
            out_img = self._render_output_image()
            img[70:140, 440:510] = cv2.resize(out_img, (70, 70))
            cv2.putText(img, "Output", (445, 155), font, 0.35, (255, 255, 255), 1)

            # Stats
            stats_y = 200
            cv2.putText(img, f"Step: {self.step_count}", (440, stats_y),
                        font, 0.35, (150, 150, 150), 1)
            cv2.putText(img, f"Signal Out: {self._output_values['signal_out']:.1f}",
                        (440, stats_y + 20), font, 0.35, (100, 255, 100), 1)
            cv2.putText(img, f"Resonance: {self.current_resonance:.1f}",
                        (440, stats_y + 40), font, 0.35, (255, 200, 100), 1)
            cv2.putText(img, f"Energy: {self.current_energy:.0f}",
                        (440, stats_y + 60), font, 0.35, (100, 200, 255), 1)

            # Crystal metadata
            cv2.putText(img, "Crystal Info:", (10, 330), font, 0.4, (180, 180, 180), 1)
            cv2.putText(img, f"Source: {os.path.basename(self.edf_source)}", (10, 350),
                        font, 0.35, (150, 150, 150), 1)
            cv2.putText(img, f"Training: {self.learning_steps} steps", (10, 370),
                        font, 0.35, (150, 150, 150), 1)
            cv2.putText(img, f"Spikes: {self.total_spikes:,}", (10, 390),
                        font, 0.35, (150, 150, 150), 1)

        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        if QtGui:
            # .copy() detaches the QImage from the temporary numpy buffer.
            qimg = QtGui.QImage(
                img_rgb.data, w, h, w * 3, QtGui.QImage.Format.Format_RGB888
            ).copy()
            self.display_image = qimg

    def _render_crystal(self):
        """Render the crystal weight structure as a 256x256 3-channel image.

        Channel 0 encodes the mean horizontal weight, channel 2 the mean
        vertical weight and channel 1 is ``1 - |horizontal - vertical|``.
        """
        if not self.is_loaded:
            return np.zeros((256, 256, 3), dtype=np.uint8)

        n = self.grid_size

        # Combine weights into visualization
        horizontal = (self.weights_left + self.weights_right) / 2
        vertical = (self.weights_up + self.weights_down) / 2

        # Normalize to 0..1 over a fixed weight range
        w_min, w_max = 0.01, 2.0
        h_norm = np.clip((horizontal - w_min) / (w_max - w_min), 0, 1)
        v_norm = np.clip((vertical - w_min) / (w_max - w_min), 0, 1)
        anisotropy = np.abs(h_norm - v_norm)

        img = np.zeros((n, n, 3), dtype=np.uint8)
        img[:, :, 0] = (h_norm * 255).astype(np.uint8)
        img[:, :, 1] = ((1 - anisotropy) * 255).astype(np.uint8)
        img[:, :, 2] = (v_norm * 255).astype(np.uint8)

        return cv2.resize(img, (256, 256), interpolation=cv2.INTER_NEAREST)

    def get_display_image(self):
        """Return the most recently rendered display image (may be None)."""
        return self.display_image

    # === STATE PERSISTENCE ===

    def save_custom_state(self, folder_path, node_id):
        """Save current state.

        Writes the crystal path and step counter to
        ``crystal_chip_<node_id>.npz`` inside ``folder_path`` and returns the
        file name (not the full path).
        """
        filename = f"crystal_chip_{node_id}.npz"
        filepath = os.path.join(folder_path, filename)
        np.savez(filepath,
                 crystal_path=self.crystal_path,
                 step_count=self.step_count)
        return filename

    def load_custom_state(self, filepath):
        """Load saved state and reload the referenced crystal, if any.

        Errors are reported on stdout and leave the node unchanged.
        """
        try:
            # NOTE(security): allow_pickle=True is kept for compatibility with
            # existing state files; only load state from trusted sources.
            with np.load(filepath, allow_pickle=True) as data:
                crystal_path = str(self._npz_scalar(data, 'crystal_path', ''))
                step_count = int(self._npz_scalar(data, 'step_count', 0))

            self.crystal_path = crystal_path
            self.step_count = step_count
            if self.crystal_path:
                self._last_path = ""  # Force reload
                self._maybe_reload()
        except Exception as e:
            print(f"[CrystalChip] Error loading state: {e}")