Spaces:
Running
Running
| """ | |
| Advanced Crystal Probe Node | |
| ============================ | |
| Comprehensive probing toolkit for EEG-trained crystal chips. | |
| Implements methods from 2025 computational neuroscience: | |
| 1. FINE GAMMA MAPPING (30-100 Hz, 0.1 Hz resolution) | |
| - High-resolution sweep around the 49.8 Hz resonance | |
| - Bode plot generation (amplitude + phase vs frequency) | |
| - Subharmonic detection (alpha-theta interactions) | |
| 2. IMPULSE RESPONSE ANALYSIS | |
| - Tuned impulse trains at resonant frequencies | |
| - Ring-down time measurement | |
| - Eigenmode extraction | |
| 3. THETA-BURST STIMULATION | |
| - 5 Hz bursts of gamma (50-100 Hz) | |
| - Mimics hippocampal plasticity protocols | |
| - Vibrational resonance probing | |
| 4. HIGH FREQUENCY OSCILLATIONS (80-500 Hz) | |
| - Safe chirp probes (capped at 1 MHz) | |
| - HFO/ripple detection | |
| - Stochastic resonance via noise injection | |
| 5. SPATIAL PATTERN INJECTION | |
| - Pin-specific activation patterns | |
| - Sequential replay patterns | |
| - Gradient probes for connectivity mapping | |
| Based on: CLS theory, holographic ensemble stimulation, | |
| vibrational resonance, HFO detection methods | |
| Author: Built for Antti's consciousness crystallography research | |
| """ | |
| import numpy as np | |
| import cv2 | |
| from collections import deque | |
| from scipy import signal as scipy_signal | |
| # --- HOST IMPORT BLOCK --- | |
| import __main__ | |
| try: | |
| BaseNode = __main__.BaseNode | |
| QtGui = __main__.QtGui | |
| except Exception: | |
| from PyQt6 import QtGui | |
| class BaseNode: | |
| def __init__(self): | |
| self.inputs = {} | |
| self.outputs = {} | |
| class AdvancedCrystalProbeNode(BaseNode): | |
| """ | |
| Advanced multi-modal probe for crystal chip analysis. | |
| Modes: | |
| 0: Fine Gamma Sweep (30-100 Hz, high resolution) | |
| 1: Impulse at Resonance (49.8 Hz tuned) | |
| 2: Theta-Burst Stimulation | |
| 3: HFO Chirp (80-500 Hz) | |
| 4: Noise + Resonance Hunt | |
| 5: Spatial Replay Patterns | |
| 6: Bi-Frequency Modulation | |
| 7: Custom Frequency Lock | |
| """ | |
| NODE_NAME = "Advanced Crystal Probe" | |
| NODE_CATEGORY = "Analysis" | |
| NODE_COLOR = QtGui.QColor(220, 80, 60) if QtGui else None | |
| # Maximum frequency to prevent overflow | |
| MAX_FREQ = 1e6 # 1 MHz cap | |
| def __init__(self): | |
| super().__init__() | |
| self.inputs = { | |
| 'crystal_response': 'signal', | |
| 'crystal_image': 'image', | |
| 'enable': 'signal', | |
| 'probe_mode': 'signal', | |
| 'target_freq': 'signal', # Lock to specific frequency | |
| 'modulation': 'signal', # External modulation | |
| 'spatial_pattern': 'image' # Custom spatial input | |
| } | |
| self.outputs = { | |
| 'probe_signal': 'signal', | |
| 'probe_image': 'image', | |
| 'spectrum_view': 'image', | |
| 'bode_view': 'image', # Amplitude + phase plot | |
| 'resonance_map': 'image', # Spatial resonance map | |
| 'peak_freq': 'signal', | |
| 'q_factor': 'signal', | |
| 'phase_shift': 'signal', | |
| 'ring_time': 'signal', # Impulse ring-down time | |
| 'coherence': 'signal' # Probe-response coherence | |
| } | |
| # === Mode Configuration === | |
| self.probe_mode = 0 | |
| self.step_count = 0 | |
| # === Fine Gamma Parameters (Mode 0) === | |
| self.gamma_start = 30.0 # Hz | |
| self.gamma_end = 100.0 # Hz | |
| self.gamma_resolution = 0.1 # Hz per step | |
| self.gamma_sweep_steps = int((100 - 30) / 0.1) # 700 steps | |
| # === Impulse Parameters (Mode 1) === | |
| self.impulse_target = 49.8 # Hz - the discovered resonance | |
| self.impulse_interval = 20 # steps (tuned to ~50 Hz) | |
| self.last_impulse = 0 | |
| self.ring_down_samples = deque(maxlen=100) | |
| self.ring_time = 0.0 | |
| # === Theta-Burst Parameters (Mode 2) === | |
| self.theta_freq = 5.0 # Hz - burst frequency | |
| self.gamma_burst_freq = 50.0 # Hz - within burst | |
| self.burst_duration = 10 # steps per burst | |
| self.burst_count = 0 | |
| self.in_burst = False | |
| # === HFO Parameters (Mode 3) === | |
| self.hfo_start = 80.0 # Hz | |
| self.hfo_end = 500.0 # Hz | |
| self.hfo_sweep_duration = 1000 | |
| # === Noise Parameters (Mode 4) === | |
| self.noise_bandwidth = (30, 100) # Focus on gamma band | |
| self.noise_amplitude = 5.0 | |
| # === Spatial Replay Parameters (Mode 5) === | |
| self.replay_sequence = [] | |
| self.replay_step = 0 | |
| self.replay_speed = 5 # steps per pattern | |
| # === Bi-Frequency Parameters (Mode 6) === | |
| self.carrier_freq = 50.0 # High freq carrier | |
| self.envelope_freq = 5.0 # Theta envelope | |
| # === Frequency Lock Parameters (Mode 7) === | |
| self.locked_freq = 49.8 # User-adjustable | |
| # === State Variables === | |
| self.current_freq = 30.0 | |
| self.phase = 0.0 | |
| self.envelope_phase = 0.0 | |
| self.dt = 0.01 # 10ms per step (~100 Hz update rate) | |
| # === Response Collection === | |
| self.response_history = deque(maxlen=4096) | |
| self.probe_history = deque(maxlen=4096) | |
| self.frequency_log = deque(maxlen=4096) | |
| self.phase_history = deque(maxlen=4096) | |
| # === Bode Plot Data === | |
| self.bode_freqs = [] | |
| self.bode_amplitudes = [] | |
| self.bode_phases = [] | |
| # === Analysis Results === | |
| self.spectrum = np.zeros(1024) | |
| self.resonances = [] | |
| self.peak_frequency = 49.8 | |
| self.q_factor = 0.0 | |
| self.phase_shift = 0.0 | |
| self.coherence = 0.0 | |
| # === Spatial Probe === | |
| self.grid_size = 64 | |
| self.probe_pattern = np.zeros((self.grid_size, self.grid_size), dtype=np.float32) | |
| self._init_replay_sequence() | |
| # === Displays === | |
| self.spectrum_display = None | |
| self.bode_display = None | |
| self.resonance_map = None | |
| def _init_replay_sequence(self): | |
| """Initialize spatial replay patterns mimicking memory episodes.""" | |
| # Create sequence of activation patterns: frontal → parietal → occipital | |
| self.replay_sequence = [] | |
| g = self.grid_size | |
| # Pattern 1: Frontal activation (top of grid) | |
| p1 = np.zeros((g, g), dtype=np.float32) | |
| p1[:g//4, g//4:3*g//4] = 1.0 | |
| self.replay_sequence.append(p1) | |
| # Pattern 2: Central spread | |
| p2 = np.zeros((g, g), dtype=np.float32) | |
| p2[g//4:g//2, g//4:3*g//4] = 1.0 | |
| self.replay_sequence.append(p2) | |
| # Pattern 3: Parietal | |
| p3 = np.zeros((g, g), dtype=np.float32) | |
| p3[g//2:3*g//4, :] = 1.0 | |
| self.replay_sequence.append(p3) | |
| # Pattern 4: Occipital (bottom) | |
| p4 = np.zeros((g, g), dtype=np.float32) | |
| p4[3*g//4:, :] = 1.0 | |
| self.replay_sequence.append(p4) | |
| # Pattern 5: Global synchrony | |
| p5 = np.ones((g, g), dtype=np.float32) * 0.5 | |
| self.replay_sequence.append(p5) | |
| def _read_input(self, name, default=None): | |
| fn = getattr(self, "get_blended_input", None) | |
| if callable(fn): | |
| try: | |
| val = fn(name, "mean") | |
| return val if val is not None else default | |
| except: | |
| return default | |
| return default | |
| def _read_image_input(self, name): | |
| fn = getattr(self, "get_blended_input", None) | |
| if callable(fn): | |
| try: | |
| val = fn(name, "first") | |
| if val is not None and hasattr(val, 'shape'): | |
| return val | |
| except: | |
| pass | |
| return None | |
| def step(self): | |
| self.step_count += 1 | |
| # Read inputs | |
| enable = self._read_input('enable', 1.0) | |
| mode = int(self._read_input('probe_mode', self.probe_mode) or 0) | |
| target = self._read_input('target_freq', None) | |
| modulation = self._read_input('modulation', 0.0) | |
| self.probe_mode = mode % 8 # 8 modes | |
| if target is not None and target > 0: | |
| self.locked_freq = min(target, self.MAX_FREQ) | |
| # Read crystal response | |
| response = self._read_input('crystal_response', 0.0) or 0.0 | |
| self.response_history.append(float(response)) | |
| # Generate probe based on mode | |
| if enable and enable > 0.5: | |
| probe_value = self._generate_probe(modulation) | |
| else: | |
| probe_value = 0.0 | |
| self.probe_history.append(probe_value) | |
| self.frequency_log.append(self.current_freq) | |
| self.phase_history.append(self.phase) | |
| # Update spatial pattern | |
| self._update_probe_pattern() | |
| # Periodic analysis | |
| if self.step_count % 50 == 0: | |
| self._analyze_response() | |
| # Update displays | |
| if self.step_count % 10 == 0: | |
| self._update_displays() | |
| def _generate_probe(self, modulation=0.0): | |
| """Generate probe signal based on current mode.""" | |
| if self.probe_mode == 0: | |
| return self._generate_fine_gamma_sweep() | |
| elif self.probe_mode == 1: | |
| return self._generate_resonant_impulse() | |
| elif self.probe_mode == 2: | |
| return self._generate_theta_burst() | |
| elif self.probe_mode == 3: | |
| return self._generate_hfo_chirp() | |
| elif self.probe_mode == 4: | |
| return self._generate_resonance_noise() | |
| elif self.probe_mode == 5: | |
| return self._generate_spatial_replay() | |
| elif self.probe_mode == 6: | |
| return self._generate_bifreq_modulation() | |
| elif self.probe_mode == 7: | |
| return self._generate_freq_lock(modulation) | |
| return 0.0 | |
| def _generate_fine_gamma_sweep(self): | |
| """Mode 0: High-resolution sweep in gamma band (30-100 Hz).""" | |
| # Calculate position in sweep | |
| sweep_pos = self.step_count % self.gamma_sweep_steps | |
| # Linear frequency sweep for fine resolution | |
| self.current_freq = self.gamma_start + sweep_pos * self.gamma_resolution | |
| self.current_freq = min(self.current_freq, self.gamma_end) | |
| # Phase accumulation | |
| self.phase += 2 * np.pi * self.current_freq * self.dt | |
| self.phase %= (2 * np.pi) | |
| # Record for Bode plot at each frequency | |
| if sweep_pos % 10 == 0: # Sample every 1 Hz | |
| self._record_bode_point() | |
| return np.sin(self.phase) * 10.0 | |
| def _generate_resonant_impulse(self): | |
| """Mode 1: Impulse train tuned to 49.8 Hz resonance.""" | |
| # Interval tuned to resonant frequency | |
| interval = int(1.0 / (self.impulse_target * self.dt)) | |
| interval = max(1, interval) | |
| self.current_freq = self.impulse_target | |
| if self.step_count - self.last_impulse >= interval: | |
| self.last_impulse = self.step_count | |
| # Start collecting ring-down | |
| self.ring_down_samples.clear() | |
| return 50.0 # Sharp impulse | |
| # Collect ring-down response | |
| if len(self.response_history) > 0: | |
| self.ring_down_samples.append(self.response_history[-1]) | |
| self._analyze_ring_down() | |
| return 0.0 | |
| def _generate_theta_burst(self): | |
| """Mode 2: Theta-burst stimulation (5 Hz bursts of gamma).""" | |
| # Theta envelope | |
| self.envelope_phase += 2 * np.pi * self.theta_freq * self.dt | |
| self.envelope_phase %= (2 * np.pi) | |
| # Are we in burst? (positive half of theta cycle) | |
| self.in_burst = np.sin(self.envelope_phase) > 0 | |
| if self.in_burst: | |
| # Generate gamma burst | |
| self.current_freq = self.gamma_burst_freq | |
| self.phase += 2 * np.pi * self.gamma_burst_freq * self.dt | |
| self.phase %= (2 * np.pi) | |
| # Burst amplitude modulated by theta envelope | |
| envelope = np.sin(self.envelope_phase) | |
| return np.sin(self.phase) * envelope * 15.0 | |
| else: | |
| self.current_freq = self.theta_freq | |
| return 0.0 | |
| def _generate_hfo_chirp(self): | |
| """Mode 3: High Frequency Oscillation chirp (80-500 Hz).""" | |
| # Progress through sweep | |
| progress = (self.step_count % self.hfo_sweep_duration) / self.hfo_sweep_duration | |
| # Logarithmic sweep for HFO range | |
| log_start = np.log10(self.hfo_start) | |
| log_end = np.log10(min(self.hfo_end, self.MAX_FREQ)) | |
| log_freq = log_start + progress * (log_end - log_start) | |
| self.current_freq = min(10 ** log_freq, self.MAX_FREQ) | |
| # Phase accumulation | |
| self.phase += 2 * np.pi * self.current_freq * self.dt | |
| self.phase %= (2 * np.pi) | |
| return np.sin(self.phase) * 8.0 | |
| def _generate_resonance_noise(self): | |
| """Mode 4: Band-limited noise for stochastic resonance hunting.""" | |
| self.current_freq = (self.noise_bandwidth[0] + self.noise_bandwidth[1]) / 2 | |
| # Generate white noise | |
| noise = np.random.randn() * self.noise_amplitude | |
| # Simple band-pass approximation using phase modulation | |
| # Add some structure at gamma frequencies | |
| gamma_component = np.sin(self.phase) * 2.0 | |
| self.phase += 2 * np.pi * 50.0 * self.dt # Gamma carrier | |
| self.phase %= (2 * np.pi) | |
| return noise + gamma_component | |
| def _generate_spatial_replay(self): | |
| """Mode 5: Sequential spatial patterns mimicking memory replay.""" | |
| self.current_freq = 10.0 # Nominal | |
| # Change pattern every replay_speed steps | |
| if self.step_count % self.replay_speed == 0: | |
| self.replay_step = (self.replay_step + 1) % len(self.replay_sequence) | |
| # Modulate signal based on current pattern energy | |
| pattern_energy = np.mean(self.replay_sequence[self.replay_step]) | |
| # Return signal proportional to pattern activation | |
| return pattern_energy * 20.0 | |
| def _generate_bifreq_modulation(self): | |
| """Mode 6: Bi-frequency probe (gamma carrier on theta envelope).""" | |
| # Theta envelope | |
| self.envelope_phase += 2 * np.pi * self.envelope_freq * self.dt | |
| self.envelope_phase %= (2 * np.pi) | |
| envelope = (np.sin(self.envelope_phase) + 1) / 2 # 0 to 1 | |
| # Gamma carrier | |
| self.phase += 2 * np.pi * self.carrier_freq * self.dt | |
| self.phase %= (2 * np.pi) | |
| carrier = np.sin(self.phase) | |
| self.current_freq = self.carrier_freq | |
| return carrier * envelope * 15.0 | |
| def _generate_freq_lock(self, modulation=0.0): | |
| """Mode 7: Lock to specific frequency with optional modulation.""" | |
| # Apply external modulation to locked frequency | |
| freq = self.locked_freq + modulation * 10.0 | |
| freq = np.clip(freq, 0.1, self.MAX_FREQ) | |
| self.current_freq = freq | |
| self.phase += 2 * np.pi * freq * self.dt | |
| self.phase %= (2 * np.pi) | |
| return np.sin(self.phase) * 10.0 | |
| def _update_probe_pattern(self): | |
| """Update spatial probe pattern based on mode.""" | |
| t = self.step_count * self.dt | |
| if self.probe_mode == 0: | |
| # Gamma sweep: spatial frequency matches temporal | |
| k = self.current_freq / 100.0 # Spatial frequency | |
| x = np.arange(self.grid_size) | |
| y = np.arange(self.grid_size) | |
| X, Y = np.meshgrid(x, y) | |
| self.probe_pattern = np.sin(k * X + self.phase) * np.cos(k * Y) | |
| elif self.probe_mode == 1: | |
| # Impulse: center point activation | |
| self.probe_pattern = np.zeros((self.grid_size, self.grid_size)) | |
| if self.step_count == self.last_impulse: | |
| cx, cy = self.grid_size // 2, self.grid_size // 2 | |
| # Create a sharp gaussian | |
| x = np.arange(self.grid_size) | |
| y = np.arange(self.grid_size) | |
| X, Y = np.meshgrid(x, y) | |
| self.probe_pattern = np.exp(-((X-cx)**2 + (Y-cy)**2) / 8) | |
| elif self.probe_mode == 2: | |
| # Theta-burst: pulsing concentric rings | |
| if self.in_burst: | |
| cx, cy = self.grid_size // 2, self.grid_size // 2 | |
| x = np.arange(self.grid_size) - cx | |
| y = np.arange(self.grid_size) - cy | |
| X, Y = np.meshgrid(x, y) | |
| R = np.sqrt(X**2 + Y**2) | |
| self.probe_pattern = np.sin(R * 0.3 + self.phase) * np.sin(self.envelope_phase) | |
| else: | |
| self.probe_pattern *= 0.9 # Decay | |
| elif self.probe_mode == 3: | |
| # HFO chirp: radial wave expanding | |
| cx, cy = self.grid_size // 2, self.grid_size // 2 | |
| x = np.arange(self.grid_size) - cx | |
| y = np.arange(self.grid_size) - cy | |
| X, Y = np.meshgrid(x, y) | |
| R = np.sqrt(X**2 + Y**2) | |
| k = self.current_freq / 200.0 | |
| self.probe_pattern = np.sin(k * R - self.phase) | |
| elif self.probe_mode == 4: | |
| # Noise: random spatial pattern | |
| self.probe_pattern = np.random.randn(self.grid_size, self.grid_size) * 0.5 | |
| # Apply spatial smoothing | |
| self.probe_pattern = cv2.GaussianBlur(self.probe_pattern, (5, 5), 1.0) | |
| elif self.probe_mode == 5: | |
| # Spatial replay: use current sequence pattern | |
| self.probe_pattern = self.replay_sequence[self.replay_step].copy() | |
| elif self.probe_mode == 6: | |
| # Bi-freq: theta envelope modulates spatial gamma pattern | |
| k = self.carrier_freq / 100.0 | |
| x = np.arange(self.grid_size) | |
| y = np.arange(self.grid_size) | |
| X, Y = np.meshgrid(x, y) | |
| envelope = (np.sin(self.envelope_phase) + 1) / 2 | |
| self.probe_pattern = np.sin(k * X + self.phase) * envelope | |
| elif self.probe_mode == 7: | |
| # Freq lock: standing wave at locked frequency | |
| k = self.locked_freq / 100.0 | |
| x = np.arange(self.grid_size) | |
| y = np.arange(self.grid_size) | |
| X, Y = np.meshgrid(x, y) | |
| self.probe_pattern = np.sin(k * X) * np.sin(k * Y) * np.sin(self.phase) | |
| # Normalize | |
| self.probe_pattern = np.clip(self.probe_pattern, -1, 1).astype(np.float32) | |
| def _record_bode_point(self): | |
| """Record amplitude and phase for Bode plot.""" | |
| if len(self.response_history) < 50: | |
| return | |
| # Get recent response and probe | |
| response = np.array(list(self.response_history)[-50:]) | |
| probe = np.array(list(self.probe_history)[-50:]) | |
| # Amplitude ratio | |
| resp_amp = np.std(response) | |
| probe_amp = np.std(probe) | |
| amplitude = resp_amp / (probe_amp + 0.01) | |
| # Phase estimation via cross-correlation | |
| if len(response) > 10: | |
| corr = np.correlate(probe - np.mean(probe), | |
| response - np.mean(response), mode='full') | |
| lag = np.argmax(corr) - len(probe) + 1 | |
| phase = lag * self.dt * self.current_freq * 360 # degrees | |
| else: | |
| phase = 0 | |
| # Store | |
| self.bode_freqs.append(self.current_freq) | |
| self.bode_amplitudes.append(amplitude) | |
| self.bode_phases.append(phase % 360) | |
| # Keep limited history | |
| if len(self.bode_freqs) > 500: | |
| self.bode_freqs = self.bode_freqs[-500:] | |
| self.bode_amplitudes = self.bode_amplitudes[-500:] | |
| self.bode_phases = self.bode_phases[-500:] | |
| def _analyze_ring_down(self): | |
| """Analyze impulse response ring-down time.""" | |
| if len(self.ring_down_samples) < 20: | |
| return | |
| samples = np.array(list(self.ring_down_samples)) | |
| envelope = np.abs(samples) | |
| # Find time to decay to 1/e | |
| if np.max(envelope) > 0: | |
| threshold = np.max(envelope) / np.e | |
| below_threshold = np.where(envelope < threshold)[0] | |
| if len(below_threshold) > 0: | |
| self.ring_time = below_threshold[0] * self.dt * 1000 # ms | |
| else: | |
| self.ring_time = len(samples) * self.dt * 1000 | |
| def _analyze_response(self): | |
| """Comprehensive response analysis.""" | |
| if len(self.response_history) < 256: | |
| return | |
| response = np.array(list(self.response_history)[-1024:]) | |
| probe = np.array(list(self.probe_history)[-1024:]) | |
| # === Spectrum Analysis === | |
| fft = np.fft.rfft(response) | |
| magnitude = np.abs(fft) | |
| freqs = np.fft.rfftfreq(len(response), d=self.dt) | |
| # Resample to fixed size | |
| if len(magnitude) > 10: | |
| self.spectrum = np.interp( | |
| np.linspace(0, len(magnitude)-1, 1024), | |
| np.arange(len(magnitude)), | |
| magnitude | |
| ) | |
| # === Peak Detection === | |
| self.resonances = [] | |
| for i in range(2, len(magnitude) - 2): | |
| if (magnitude[i] > magnitude[i-1] and | |
| magnitude[i] > magnitude[i+1] and | |
| magnitude[i] > np.mean(magnitude) * 2): | |
| freq = freqs[i] if i < len(freqs) else 0 | |
| amp = magnitude[i] | |
| # Q factor estimation | |
| half_max = magnitude[i] / 2 | |
| width = 1 | |
| for j in range(1, min(20, i, len(magnitude)-i)): | |
| if magnitude[i-j] < half_max or magnitude[i+j] < half_max: | |
| width = j | |
| break | |
| df = freqs[1] - freqs[0] if len(freqs) > 1 else 1 | |
| q = freq / (2 * width * df + 0.001) | |
| self.resonances.append((freq, amp, q)) | |
| self.resonances.sort(key=lambda x: x[1], reverse=True) | |
| if self.resonances: | |
| self.peak_frequency = self.resonances[0][0] | |
| self.q_factor = self.resonances[0][2] | |
| # === Coherence === | |
| if len(probe) > 0 and len(response) > 0: | |
| # Cross-correlation based coherence | |
| corr = np.correlate(probe - np.mean(probe), | |
| response - np.mean(response), mode='valid') | |
| auto_p = np.correlate(probe - np.mean(probe), | |
| probe - np.mean(probe), mode='valid') | |
| auto_r = np.correlate(response - np.mean(response), | |
| response - np.mean(response), mode='valid') | |
| if auto_p[0] > 0 and auto_r[0] > 0: | |
| self.coherence = np.max(np.abs(corr)) / np.sqrt(auto_p[0] * auto_r[0]) | |
| def _update_displays(self): | |
| """Create all visualization outputs.""" | |
| size = 400 | |
| # === Spectrum Display === | |
| self._update_spectrum_display(size) | |
| # === Bode Plot Display === | |
| self._update_bode_display(size) | |
| # === Resonance Map === | |
| self._update_resonance_map(size) | |
| def _update_spectrum_display(self, size): | |
| """Spectrum with mode info and resonance markers.""" | |
| img = np.zeros((size, size, 3), dtype=np.uint8) | |
| mode_names = [ | |
| "Fine Gamma (30-100Hz)", "Resonant Impulse", "Theta-Burst", | |
| "HFO Chirp", "Noise Hunt", "Spatial Replay", | |
| "Bi-Freq Mod", "Freq Lock" | |
| ] | |
| # Draw spectrum | |
| if np.max(self.spectrum) > 0: | |
| spec_norm = self.spectrum / np.max(self.spectrum) | |
| for i in range(min(len(spec_norm), size)): | |
| height = int(spec_norm[i] * (size - 80)) | |
| x = int(i * size / len(spec_norm)) | |
| hue = int(i * 180 / len(spec_norm)) | |
| color = self._hsv_to_rgb(hue, 255, 200) | |
| cv2.line(img, (x, size - 40), (x, size - 40 - height), color, 1) | |
| # Mark resonances | |
| for freq, amp, q in self.resonances[:5]: | |
| if freq > 0 and freq < 100: | |
| x = int(freq * size / 100) | |
| cv2.line(img, (x, 0), (x, 30), (0, 255, 255), 2) | |
| # Labels | |
| cv2.putText(img, f"Mode: {mode_names[self.probe_mode]}", (10, 20), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) | |
| cv2.putText(img, f"Freq: {self.current_freq:.1f} Hz", (10, 40), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.4, (200, 200, 200), 1) | |
| cv2.putText(img, f"Peak: {self.peak_frequency:.1f} Hz", (10, 60), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.4, (100, 255, 100), 1) | |
| cv2.putText(img, f"Q: {self.q_factor:.1f}", (10, 80), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.4, (100, 255, 100), 1) | |
| cv2.putText(img, f"Coherence: {self.coherence:.2f}", (200, 20), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 200, 100), 1) | |
| cv2.putText(img, f"Ring-down: {self.ring_time:.1f} ms", (200, 40), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.4, (200, 200, 200), 1) | |
| # Frequency axis | |
| for f in [10, 30, 50, 70, 100]: | |
| x = int(f * size / 100) | |
| cv2.putText(img, str(f), (x, size - 10), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.3, (150, 150, 150), 1) | |
| self.spectrum_display = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| def _update_bode_display(self, size): | |
| """Bode plot: amplitude and phase vs frequency.""" | |
| img = np.zeros((size, size, 3), dtype=np.uint8) | |
| if len(self.bode_freqs) > 2: | |
| freqs = np.array(self.bode_freqs) | |
| amps = np.array(self.bode_amplitudes) | |
| phases = np.array(self.bode_phases) | |
| # Sort by frequency | |
| idx = np.argsort(freqs) | |
| freqs = freqs[idx] | |
| amps = amps[idx] | |
| phases = phases[idx] | |
| # Amplitude plot (top half) | |
| if np.max(amps) > 0: | |
| amp_norm = amps / np.max(amps) | |
| for i in range(len(freqs) - 1): | |
| x1 = int((freqs[i] - 30) / 70 * size) | |
| x2 = int((freqs[i+1] - 30) / 70 * size) | |
| y1 = int(size/2 - amp_norm[i] * (size/2 - 40)) | |
| y2 = int(size/2 - amp_norm[i+1] * (size/2 - 40)) | |
| if 0 <= x1 < size and 0 <= x2 < size: | |
| cv2.line(img, (x1, y1), (x2, y2), (0, 255, 0), 2) | |
| # Phase plot (bottom half) | |
| phase_norm = phases / 360 | |
| for i in range(len(freqs) - 1): | |
| x1 = int((freqs[i] - 30) / 70 * size) | |
| x2 = int((freqs[i+1] - 30) / 70 * size) | |
| y1 = int(size - 20 - phase_norm[i] * (size/2 - 40)) | |
| y2 = int(size - 20 - phase_norm[i+1] * (size/2 - 40)) | |
| if 0 <= x1 < size and 0 <= x2 < size: | |
| cv2.line(img, (x1, y1), (x2, y2), (255, 100, 100), 2) | |
| # Divider | |
| cv2.line(img, (0, size//2), (size, size//2), (100, 100, 100), 1) | |
| # Labels | |
| cv2.putText(img, "BODE PLOT", (10, 20), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) | |
| cv2.putText(img, "Amplitude (green)", (10, 40), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 255, 0), 1) | |
| cv2.putText(img, "Phase (red)", (10, size//2 + 20), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.35, (255, 100, 100), 1) | |
| # Mark 49.8 Hz | |
| x_498 = int((49.8 - 30) / 70 * size) | |
| cv2.line(img, (x_498, 0), (x_498, size), (255, 255, 0), 1) | |
| cv2.putText(img, "49.8", (x_498 - 15, size//2 - 5), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 0), 1) | |
| self.bode_display = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| def _update_resonance_map(self, size): | |
| """Spatial map showing probe pattern and resonance hotspots.""" | |
| img = np.zeros((size, size, 3), dtype=np.uint8) | |
| # Draw probe pattern | |
| pattern_scaled = cv2.resize( | |
| ((self.probe_pattern + 1) * 127).astype(np.uint8), | |
| (size, size) | |
| ) | |
| img[:, :, 1] = pattern_scaled | |
| # Overlay resonance info | |
| cv2.putText(img, "PROBE PATTERN", (10, 20), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) | |
| # List resonances | |
| y = 45 | |
| for i, (freq, amp, q) in enumerate(self.resonances[:8]): | |
| text = f"{i+1}. {freq:.1f} Hz (Q={q:.1f})" | |
| cv2.putText(img, text, (10, y), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.35, (255, 255, 0), 1) | |
| y += 16 | |
| # Draw recent time series | |
| if len(self.probe_history) > 50: | |
| probe = np.array(list(self.probe_history)[-100:]) | |
| resp = np.array(list(self.response_history)[-100:]) | |
| p_norm = probe / (np.max(np.abs(probe)) + 0.01) | |
| r_norm = resp / (np.max(np.abs(resp)) + 0.01) | |
| for i in range(len(p_norm) - 1): | |
| x1 = int(i * size / len(p_norm)) | |
| x2 = int((i+1) * size / len(p_norm)) | |
| # Probe (blue) | |
| y1 = int(size - 60 + p_norm[i] * 20) | |
| y2 = int(size - 60 + p_norm[i+1] * 20) | |
| cv2.line(img, (x1, y1), (x2, y2), (255, 100, 100), 1) | |
| # Response (yellow) | |
| if i < len(r_norm) - 1: | |
| y1 = int(size - 30 + r_norm[i] * 20) | |
| y2 = int(size - 30 + r_norm[i+1] * 20) | |
| cv2.line(img, (x1, y1), (x2, y2), (0, 255, 255), 1) | |
| cv2.putText(img, "Probe/Response", (10, size - 70), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.3, (200, 200, 200), 1) | |
| self.resonance_map = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| def _hsv_to_rgb(self, h, s, v): | |
| """Convert HSV to BGR tuple for OpenCV.""" | |
| h = h / 180.0 | |
| s = s / 255.0 | |
| v = v / 255.0 | |
| i = int(h * 6) | |
| f = h * 6 - i | |
| p = v * (1 - s) | |
| q = v * (1 - f * s) | |
| t = v * (1 - (1 - f) * s) | |
| if i % 6 == 0: r, g, b = v, t, p | |
| elif i % 6 == 1: r, g, b = q, v, p | |
| elif i % 6 == 2: r, g, b = p, v, t | |
| elif i % 6 == 3: r, g, b = p, q, v | |
| elif i % 6 == 4: r, g, b = t, p, v | |
| else: r, g, b = v, p, q | |
| return (int(b * 255), int(g * 255), int(r * 255)) | |
| def get_output(self, port_name): | |
| if port_name == 'probe_signal': | |
| return float(self.probe_history[-1]) if self.probe_history else 0.0 | |
| elif port_name == 'probe_image': | |
| img = ((self.probe_pattern + 1) * 127).astype(np.uint8) | |
| return cv2.cvtColor(cv2.applyColorMap(img, cv2.COLORMAP_TWILIGHT), cv2.COLOR_BGR2RGB) | |
| elif port_name == 'spectrum_view': | |
| return self.spectrum_display | |
| elif port_name == 'bode_view': | |
| return self.bode_display | |
| elif port_name == 'resonance_map': | |
| return self.resonance_map | |
| elif port_name == 'peak_freq': | |
| return float(self.peak_frequency) | |
| elif port_name == 'q_factor': | |
| return float(self.q_factor) | |
| elif port_name == 'phase_shift': | |
| return float(self.phase_shift) | |
| elif port_name == 'ring_time': | |
| return float(self.ring_time) | |
| elif port_name == 'coherence': | |
| return float(self.coherence) | |
| return None | |
| def get_display_image(self): | |
| if self.spectrum_display is not None and QtGui: | |
| h, w = self.spectrum_display.shape[:2] | |
| return QtGui.QImage(self.spectrum_display.data, w, h, w * 3, | |
| QtGui.QImage.Format.Format_RGB888).copy() | |
| return None | |
| def get_config_options(self): | |
| return [ | |
| ("Probe Mode (0-7)", "probe_mode", self.probe_mode, None), | |
| ("Gamma Start (Hz)", "gamma_start", self.gamma_start, None), | |
| ("Gamma End (Hz)", "gamma_end", self.gamma_end, None), | |
| ("Gamma Resolution (Hz)", "gamma_resolution", self.gamma_resolution, None), | |
| ("Impulse Target (Hz)", "impulse_target", self.impulse_target, None), | |
| ("Theta Freq (Hz)", "theta_freq", self.theta_freq, None), | |
| ("Gamma Burst Freq (Hz)", "gamma_burst_freq", self.gamma_burst_freq, None), | |
| ("HFO Start (Hz)", "hfo_start", self.hfo_start, None), | |
| ("HFO End (Hz)", "hfo_end", self.hfo_end, None), | |
| ("Carrier Freq (Hz)", "carrier_freq", self.carrier_freq, None), | |
| ("Envelope Freq (Hz)", "envelope_freq", self.envelope_freq, None), | |
| ("Locked Freq (Hz)", "locked_freq", self.locked_freq, None), | |
| ("Replay Speed (steps)", "replay_speed", self.replay_speed, None), | |
| ] | |
| def set_config_options(self, options): | |
| if isinstance(options, dict): | |
| for key, value in options.items(): | |
| if hasattr(self, key): | |
| setattr(self, key, value) | |