""" Deep Probe Node - Crystal Vocabulary Learner & Synthetic EEG Generator ======================================================================== This node does what we've been dreaming of: 1. LISTEN - Watches crystal's spontaneous activity, records eigenmodes 2. LEARN - Extracts vocabulary via ICA/PCA (the crystal's "words") 3. SPEAK - Injects patterns in the crystal's native language 4. DECODE - Tracks transformations, builds representational similarity 5. GENERATE - Creates synthetic EEG from crystal activity for MNE inverse projection The goal: Talk to the crystal in its own language, see what it knows, and project its shadows back onto a brain surface. The crystal was trained on EEG → it learned neural geometry. This node extracts that geometry and speaks it back. Outputs synthetic EEG that can be: - Loaded into MNE-Python - Inverse-projected onto fsaverage brain - Visualized as source activity We close the loop: Brain → EEG → Crystal → Synthetic EEG → Brain Surface Author: Built for Antti's consciousness crystallography research """ import os import edfio import numpy as np import cv2 from collections import deque from datetime import datetime import json # --- HOST IMPORT BLOCK --- import __main__ try: BaseNode = __main__.BaseNode QtGui = __main__.QtGui except Exception: from PyQt6 import QtGui class BaseNode: def __init__(self): self.inputs = {} self.outputs = {} class DeepProbeNode(BaseNode): """ Deep probe into crystal's representational space. Modes: 0: LISTEN - Collect spontaneous activity, build vocabulary 1: SPEAK - Inject learned patterns, observe response 2: QUERY - Test specific hypotheses about crystal knowledge 3: GENERATE - Output synthetic EEG from crystal dynamics 4: DECODE - Build representational similarity matrix """ NODE_NAME = "Deep Probe" NODE_CATEGORY = "Analysis" NODE_COLOR = QtGui.QColor(180, 60, 180) if QtGui else None # Standard 10-20 electrode names for synthetic EEG EEG_CHANNELS = [ 'Fp1', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'T7', 'C3', 'Cz', 'C4', 'T8', 'P7', 'P3', 'Pz', 'P4', 'P8', 'O1', 'Oz', 'O2' ] def __init__(self): super().__init__() self.inputs = { 'crystal_activity': 'image', # Activity view from crystal 'crystal_signal': 'signal', # LFP/signal from crystal 'crystal_bands': 'spectrum', # Frequency bands from crystal 'mode': 'signal', # Operating mode 'query_pattern': 'image', # Pattern to inject for queries 'enable': 'signal', 'export_trigger': 'signal' # Send 1 to export EEG } self.outputs = { 'probe_signal': 'signal', # Signal to inject into crystal 'probe_image': 'image', # Pattern to inject 'vocabulary_view': 'image', # Learned eigenmodes visualization 'rsm_view': 'image', # Representational similarity matrix 'synthetic_eeg': 'spectrum', # Generated EEG-like signals 'decode_view': 'image', # Decoding visualization 'eigenmode_power': 'signal', # Power in dominant eigenmode 'vocabulary_size': 'signal', # Number of learned patterns 'rsm_coherence': 'signal' # RSM structure measure } # === MODE === self.mode = 0 # 0=LISTEN, 1=SPEAK, 2=QUERY, 3=GENERATE, 4=DECODE self.step_count = 0 # === LISTENING STATE === self.listen_buffer = deque(maxlen=1000) # Activity snapshots self.signal_buffer = deque(maxlen=5000) # Signal history self.band_buffer = deque(maxlen=1000) # Frequency band history # === VOCABULARY (Learned Eigenmodes) === self.vocabulary = [] # List of learned patterns (eigenmodes) self.vocabulary_weights = [] # How often each pattern appears self.n_components = 16 # Number of eigenmodes to extract self.mean_pattern = None # Mean activity pattern self.components = None # PCA/ICA components self.explained_variance = None # === SPEAKING STATE === self.speak_pattern_idx = 0 # Which vocabulary item to speak self.speak_phase = 0.0 self.speak_amplitude = 10.0 # === QUERY STATE === self.query_responses = deque(maxlen=100) self.baseline_response = None # === RSM (Representational Similarity Matrix) === self.rsm = None self.rsm_labels = [] self.rsm_coherence = 0.0 # === SYNTHETIC EEG === self.synthetic_channels = {ch: deque(maxlen=1000) for ch in self.EEG_CHANNELS} self.eeg_sample_rate = 256.0 # Standard EEG sample rate self.eeg_buffer_seconds = 10 self.last_crystal_activity = None # Pin-to-channel mapping (approximation based on 10-20 positions) self.pin_to_channel = {} # Will be built when we see crystal structure # === DISPLAY === self.vocabulary_display = None self.rsm_display = None self.decode_display = None # === EEG EXPORT === self.export_path = "" self.export_ready = False def _read_input(self, name, default=None): fn = getattr(self, "get_blended_input", None) if callable(fn): try: val = fn(name, "mean") return val if val is not None else default except: return default return default def _read_image_input(self, name): fn = getattr(self, "get_blended_input", None) if callable(fn): try: val = fn(name, "first") if val is not None and hasattr(val, 'shape'): return val except: pass return None def step(self): self.step_count += 1 # Read inputs enable = self._read_input('enable', 1.0) mode = int(self._read_input('mode', self.mode) or 0) self.mode = mode % 5 activity = self._read_image_input('crystal_activity') signal = self._read_input('crystal_signal', 0.0) bands = self._read_input('crystal_bands', None) query = self._read_image_input('query_pattern') if not enable or enable < 0.5: return # Check for export trigger export_trigger = self._read_input('export_trigger', 0.0) if export_trigger and export_trigger > 0.5 and self.export_path: if not hasattr(self, '_last_export_step') or self.step_count - self._last_export_step > 100: print(f"[DeepProbe] Export triggered via signal") success, msg = self.export_synthetic_eeg(self.export_path) print(f"[DeepProbe] {msg}") self._last_export_step = self.step_count # Store incoming data if activity is not None: # Ensure consistent shape if len(activity.shape) == 3: gray = np.mean(activity, axis=2) else: gray = activity gray = cv2.resize(gray.astype(np.float32), (64, 64)) self.listen_buffer.append(gray.flatten()) self.last_crystal_activity = gray if signal is not None: self.signal_buffer.append(float(signal)) if bands is not None and hasattr(bands, '__len__'): self.band_buffer.append(np.array(bands)) # Execute mode-specific behavior if self.mode == 0: self._mode_listen() elif self.mode == 1: self._mode_speak() elif self.mode == 2: self._mode_query(query) elif self.mode == 3: self._mode_generate() elif self.mode == 4: self._mode_decode() # ALWAYS generate synthetic EEG regardless of mode (if we have activity) if self.last_crystal_activity is not None: self._generate_synthetic_eeg() # Update displays periodically if self.step_count % 20 == 0: self._update_displays() def _mode_listen(self): """Mode 0: Collect activity and learn vocabulary.""" # Every 100 steps, update vocabulary if self.step_count % 100 == 0 and len(self.listen_buffer) > 50: self._extract_vocabulary() def _mode_speak(self): """Mode 1: Inject learned patterns back into crystal.""" # Cycle through vocabulary self.speak_phase += 0.1 if self.speak_phase > 2 * np.pi: self.speak_phase = 0 self.speak_pattern_idx = (self.speak_pattern_idx + 1) % max(1, len(self.vocabulary)) def _mode_query(self, query_pattern): """Mode 2: Inject specific query, observe transformation.""" if query_pattern is not None and len(self.listen_buffer) > 0: # Compare current activity to baseline current = np.array(self.listen_buffer[-1]) if self.baseline_response is None: self.baseline_response = current.copy() # Compute response deviation deviation = np.linalg.norm(current - self.baseline_response) self.query_responses.append(deviation) def _mode_generate(self): """Mode 3: Focus on EEG generation (generation happens automatically now).""" # In this mode we just ensure generation is happening # The actual generation is now in _generate_synthetic_eeg() called every step pass def _generate_synthetic_eeg(self): """Generate synthetic EEG from crystal activity - runs every step.""" if self.last_crystal_activity is None: return # Map crystal activity to EEG channels # Use spatial positions on the 64x64 grid activity = self.last_crystal_activity h, w = activity.shape # Approximate 10-20 positions on the grid channel_positions = { 'Fp1': (5, 20), 'Fp2': (5, 44), 'F7': (15, 5), 'F3': (15, 20), 'Fz': (15, 32), 'F4': (15, 44), 'F8': (15, 59), 'T7': (32, 5), 'C3': (32, 20), 'Cz': (32, 32), 'C4': (32, 44), 'T8': (32, 59), 'P7': (49, 5), 'P3': (49, 20), 'Pz': (49, 32), 'P4': (49, 44), 'P8': (49, 59), 'O1': (59, 20), 'Oz': (59, 32), 'O2': (59, 44) } for ch_name, (row, col) in channel_positions.items(): # Sample activity around this position (3x3 neighborhood) r1, r2 = max(0, row-1), min(h, row+2) c1, c2 = max(0, col-1), min(w, col+2) value = np.mean(activity[r1:r2, c1:c2]) # Scale to EEG-like microvolts (-100 to +100 uV typical) # Crystal activity is roughly -90 to +40, scale to EEG range eeg_value = (value + 65) * 1.5 # Rough scaling # Add some noise for realism eeg_value += np.random.randn() * 2.0 self.synthetic_channels[ch_name].append(eeg_value) def _mode_decode(self): """Mode 4: Build representational similarity matrix.""" if self.step_count % 50 == 0 and len(self.listen_buffer) > 20: self._compute_rsm() def _extract_vocabulary(self): """Extract eigenmodes from collected activity patterns.""" if len(self.listen_buffer) < 50: return # Stack patterns into matrix X = np.array(list(self.listen_buffer)) # Center the data self.mean_pattern = np.mean(X, axis=0) X_centered = X - self.mean_pattern # PCA via SVD try: U, S, Vt = np.linalg.svd(X_centered, full_matrices=False) # Keep top components n_comp = min(self.n_components, len(S)) self.components = Vt[:n_comp] self.explained_variance = (S[:n_comp] ** 2) / np.sum(S ** 2) # Build vocabulary from components self.vocabulary = [] self.vocabulary_weights = [] for i in range(n_comp): pattern = self.components[i].reshape(64, 64) self.vocabulary.append(pattern) self.vocabulary_weights.append(self.explained_variance[i]) except Exception as e: print(f"[DeepProbe] Vocabulary extraction failed: {e}") def _compute_rsm(self): """Compute representational similarity matrix.""" if len(self.listen_buffer) < 20: return # Sample recent patterns patterns = np.array(list(self.listen_buffer)[-100:]) n = len(patterns) # Compute pairwise correlations self.rsm = np.zeros((n, n)) for i in range(n): for j in range(i, n): # Correlation corr = np.corrcoef(patterns[i], patterns[j])[0, 1] self.rsm[i, j] = corr self.rsm[j, i] = corr # RSM coherence: how structured is the similarity space? # High coherence = clear clusters, low = random upper_tri = self.rsm[np.triu_indices(n, k=1)] self.rsm_coherence = np.std(upper_tri) # Variance in similarities def _update_displays(self): """Update all visualizations.""" self._update_vocabulary_display() self._update_rsm_display() self._update_decode_display() def _update_vocabulary_display(self): """Visualize learned eigenmodes.""" size = 400 img = np.zeros((size, size, 3), dtype=np.uint8) mode_names = ["LISTEN", "SPEAK", "QUERY", "GENERATE", "DECODE"] # Header cv2.putText(img, f"DEEP PROBE - {mode_names[self.mode]}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (180, 60, 180), 2) cv2.putText(img, f"Vocabulary: {len(self.vocabulary)} patterns", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (200, 200, 200), 1) cv2.putText(img, f"Buffer: {len(self.listen_buffer)} samples", (200, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (150, 150, 150), 1) # Draw vocabulary patterns (4x4 grid of top 16 eigenmodes) if len(self.vocabulary) > 0: grid_size = 4 cell_size = 75 # Reduced to fit safely in 400x400 display offset_y = 60 for i, pattern in enumerate(self.vocabulary[:16]): row = i // grid_size col = i % grid_size x = 10 + col * (cell_size + 8) y = offset_y + row * (cell_size + 15) # Bounds check if y + cell_size > size or x + cell_size > size: continue # Normalize pattern to 0-255 p_norm = pattern - pattern.min() if p_norm.max() > 0: p_norm = p_norm / p_norm.max() p_img = (p_norm * 255).astype(np.uint8) p_img = cv2.resize(p_img, (cell_size, cell_size)) p_color = cv2.applyColorMap(p_img, cv2.COLORMAP_TWILIGHT) # Place in grid with bounds check y_end = min(y + cell_size, size) x_end = min(x + cell_size, size) img[y:y_end, x:x_end] = p_color[:y_end-y, :x_end-x] # Label with variance explained if i < len(self.vocabulary_weights): var_pct = self.vocabulary_weights[i] * 100 cv2.putText(img, f"{var_pct:.1f}%", (x, y + cell_size + 10), cv2.FONT_HERSHEY_SIMPLEX, 0.25, (200, 200, 200), 1) self.vocabulary_display = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) def _update_rsm_display(self): """Visualize representational similarity matrix.""" size = 300 img = np.zeros((size, size, 3), dtype=np.uint8) cv2.putText(img, "RSM (Similarity)", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) cv2.putText(img, f"Coherence: {self.rsm_coherence:.3f}", (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (200, 200, 200), 1) if self.rsm is not None and self.rsm.size > 0: # Normalize RSM to 0-255 rsm_norm = (self.rsm - self.rsm.min()) / (self.rsm.max() - self.rsm.min() + 0.001) rsm_img = (rsm_norm * 255).astype(np.uint8) rsm_img = cv2.resize(rsm_img, (250, 250)) rsm_color = cv2.applyColorMap(rsm_img, cv2.COLORMAP_VIRIDIS) img[45:295, 25:275] = rsm_color self.rsm_display = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) def _update_decode_display(self): """Visualize decoding / generation state.""" size = 300 img = np.zeros((size, size, 3), dtype=np.uint8) cv2.putText(img, "Synthetic EEG", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) # Draw recent synthetic EEG traces y_offset = 40 trace_height = 12 for i, ch_name in enumerate(self.EEG_CHANNELS[:20]): data = list(self.synthetic_channels[ch_name]) if len(data) > 10: # Normalize for display d = np.array(data[-200:]) if np.std(d) > 0: d = (d - np.mean(d)) / np.std(d) else: d = d - np.mean(d) # Draw trace y_base = y_offset + i * trace_height for j in range(len(d) - 1): x1 = int(50 + j * (size - 60) / len(d)) x2 = int(50 + (j+1) * (size - 60) / len(d)) y1 = int(y_base + d[j] * 4) y2 = int(y_base + d[j+1] * 4) y1 = np.clip(y1, 0, size-1) y2 = np.clip(y2, 0, size-1) cv2.line(img, (x1, y1), (x2, y2), (0, 200, 255), 1) # Channel label cv2.putText(img, ch_name, (5, y_base + 4), cv2.FONT_HERSHEY_SIMPLEX, 0.2, (150, 150, 150), 1) self.decode_display = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) def get_output(self, port_name): if port_name == 'probe_signal': # Output signal based on mode if self.mode == 1 and len(self.vocabulary) > 0: # Speak: modulate by vocabulary pattern return np.sin(self.speak_phase) * self.speak_amplitude return 0.0 elif port_name == 'probe_image': # Output pattern based on mode if self.mode == 1 and len(self.vocabulary) > 0: # Speak: output current vocabulary pattern pattern = self.vocabulary[self.speak_pattern_idx] # Modulate by phase modulated = pattern * (np.sin(self.speak_phase) * 0.5 + 0.5) # Scale to 0-255 p_norm = (modulated - modulated.min()) / (modulated.max() - modulated.min() + 0.001) p_img = (p_norm * 255).astype(np.uint8) return cv2.cvtColor(cv2.applyColorMap(p_img, cv2.COLORMAP_TWILIGHT), cv2.COLOR_BGR2RGB) elif self.mean_pattern is not None: # Output mean pattern pattern = self.mean_pattern.reshape(64, 64) p_norm = (pattern - pattern.min()) / (pattern.max() - pattern.min() + 0.001) p_img = (p_norm * 255).astype(np.uint8) return cv2.cvtColor(cv2.applyColorMap(p_img, cv2.COLORMAP_TWILIGHT), cv2.COLOR_BGR2RGB) return np.zeros((64, 64, 3), dtype=np.uint8) elif port_name == 'vocabulary_view': return self.vocabulary_display elif port_name == 'rsm_view': return self.rsm_display elif port_name == 'decode_view': return self.decode_display elif port_name == 'synthetic_eeg': # Return recent synthetic EEG as spectrum-like array eeg_data = [] for ch in self.EEG_CHANNELS: data = list(self.synthetic_channels[ch]) if len(data) > 0: eeg_data.append(data[-1]) else: eeg_data.append(0.0) return np.array(eeg_data, dtype=np.float32) elif port_name == 'eigenmode_power': if len(self.vocabulary_weights) > 0: return float(self.vocabulary_weights[0]) return 0.0 elif port_name == 'vocabulary_size': return float(len(self.vocabulary)) elif port_name == 'rsm_coherence': return float(self.rsm_coherence) return None def export_synthetic_eeg(self, filepath): """Export synthetic EEG to EDF format compatible with MNE.""" try: # Collect data n_samples = min(len(list(self.synthetic_channels.values())[0]), int(self.eeg_sample_rate * self.eeg_buffer_seconds)) if n_samples < 100: print(f"[DeepProbe] Not enough data: {n_samples} samples") return False, "Not enough data collected (need 100+)" data = np.zeros((len(self.EEG_CHANNELS), n_samples)) for i, ch in enumerate(self.EEG_CHANNELS): ch_data = list(self.synthetic_channels[ch])[-n_samples:] data[i, :len(ch_data)] = ch_data print(f"[DeepProbe] Exporting {n_samples} samples, {len(self.EEG_CHANNELS)} channels") # Ensure filepath has correct extension if filepath.endswith('.npz'): filepath = filepath[:-4] + '.edf' elif not filepath.endswith('.edf'): filepath = filepath + '.edf' # Try to use MNE for EDF export try: import mne # Scale to volts (MNE expects SI units) data_volts = data * 1e-6 # Create MNE info info = mne.create_info( ch_names=self.EEG_CHANNELS.copy(), sfreq=self.eeg_sample_rate, ch_types='eeg' ) # Create Raw object raw = mne.io.RawArray(data_volts, info) # Set standard montage montage = mne.channels.make_standard_montage('standard_1020') raw.set_montage(montage, on_missing='ignore') # Export to EDF mne.export.export_raw(filepath, raw, fmt='edf', overwrite=True) print(f"[DeepProbe] Saved EDF via MNE: {filepath}") self.export_ready = True return True, f"Exported {n_samples} samples to {filepath}" except ImportError: print("[DeepProbe] MNE not available, using pyedflib") # Fallback: try pyedflib try: import pyedflib # Create EDF file f = pyedflib.EdfWriter(filepath, len(self.EEG_CHANNELS), file_type=pyedflib.FILETYPE_EDFPLUS) # Set header header = { 'technician': '', 'recording_additional': 'Crystal Deep Probe Synthetic EEG', 'patientname': 'Crystal', 'patient_additional': '', 'patientcode': '', 'equipment': 'PerceptionLab Deep Probe', 'admincode': '', 'gender': '', 'startdate': datetime.now() } f.setHeader(header) # Set channel info for i, ch in enumerate(self.EEG_CHANNELS): f.setSignalHeader(i, { 'label': ch, 'dimension': 'uV', 'sample_rate': self.eeg_sample_rate, 'physical_max': 100.0, 'physical_min': -100.0, 'digital_max': 32767, 'digital_min': -32768, 'transducer': '', 'prefilter': '' }) # Write data f.writeSamples(data) f.close() print(f"[DeepProbe] Saved EDF via pyedflib: {filepath}") self.export_ready = True return True, f"Exported {n_samples} samples to {filepath}" except ImportError: # Last fallback: save as NPZ with instructions npz_path = filepath.replace('.edf', '.npz') np.savez(npz_path, data=data, # in microvolts channels=self.EEG_CHANNELS, sfreq=self.eeg_sample_rate, description="Synthetic EEG from Crystal Deep Probe - load with MNE") print(f"[DeepProbe] Saved NPZ (no EDF libs): {npz_path}") self.export_ready = True return True, f"Saved as NPZ (install mne or pyedflib for EDF): {npz_path}" except Exception as e: print(f"[DeepProbe] Export error: {e}") import traceback traceback.print_exc() return False, str(e) def get_mne_raw(self): """ Generate MNE Raw object from synthetic EEG. Call this from external script to get data for inverse projection. Returns dict with data needed to create MNE Raw: - data: (n_channels, n_samples) in volts - ch_names: list of channel names - sfreq: sample rate """ n_samples = min(len(list(self.synthetic_channels.values())[0]), int(self.eeg_sample_rate * self.eeg_buffer_seconds)) if n_samples < 100: return None data = np.zeros((len(self.EEG_CHANNELS), n_samples)) for i, ch in enumerate(self.EEG_CHANNELS): ch_data = list(self.synthetic_channels[ch])[-n_samples:] data[i, :len(ch_data)] = ch_data # Scale to volts data_volts = data * 1e-6 return { 'data': data_volts, 'ch_names': self.EEG_CHANNELS.copy(), 'sfreq': self.eeg_sample_rate } def get_display_image(self): if self.vocabulary_display is not None and QtGui: h, w = self.vocabulary_display.shape[:2] return QtGui.QImage(self.vocabulary_display.data, w, h, w * 3, QtGui.QImage.Format.Format_RGB888).copy() return None def get_config_options(self): return [ ("Mode (0-4)", "mode", self.mode, None), ("N Components", "n_components", self.n_components, None), ("Speak Amplitude", "speak_amplitude", self.speak_amplitude, None), ("Export Path (.edf)", "export_path", self.export_path, None), ] def set_config_options(self, options): if isinstance(options, dict): for key, value in options.items(): if hasattr(self, key): setattr(self, key, value) # Auto-export when path is set/changed if 'export_path' in options and options['export_path'] and len(list(self.synthetic_channels.values())[0]) > 100: print(f"[DeepProbe] Auto-exporting to: {self.export_path}") success, msg = self.export_synthetic_eeg(self.export_path) print(f"[DeepProbe] {msg}") def manual_export(self, filepath): """Call this directly to export. Returns (success, message).""" return self.export_synthetic_eeg(filepath) # === STANDALONE MNE LOADER SCRIPT === # Save this part as a separate file to load synthetic EEG into MNE MNE_LOADER_SCRIPT = ''' """ MNE Loader for Crystal Deep Probe Synthetic EEG ================================================ This script loads synthetic EEG exported from DeepProbeNode and performs inverse projection onto fsaverage brain surface. Usage: python load_crystal_eeg.py synthetic_eeg.npz Requires: mne, numpy """ import sys import numpy as np import mne def load_crystal_eeg(filepath): """Load synthetic EEG and create MNE Raw object.""" # Load exported data data = np.load(filepath) eeg_data = data['data'] # (n_channels, n_samples) in volts ch_names = list(data['channels']) sfreq = float(data['sfreq']) print(f"Loaded: {eeg_data.shape[1]} samples, {len(ch_names)} channels, {sfreq} Hz") # Create MNE info info = mne.create_info( ch_names=ch_names, sfreq=sfreq, ch_types='eeg' ) # Set standard 10-20 montage montage = mne.channels.make_standard_montage('standard_1020') # Create Raw object raw = mne.io.RawArray(eeg_data, info) raw.set_montage(montage) return raw def inverse_project(raw, freq_band='gamma'): """Perform inverse projection to source space.""" # Get fsaverage subjects_dir = mne.datasets.sample.data_path() / 'subjects' if not (subjects_dir / 'fsaverage').exists(): mne.datasets.fetch_fsaverage(subjects_dir=subjects_dir) # Filter to frequency band bands = { 'delta': (0.5, 4), 'theta': (4, 8), 'alpha': (8, 13), 'beta': (13, 30), 'gamma': (30, 50) } l_freq, h_freq = bands.get(freq_band, (30, 50)) raw_filtered = raw.copy().filter(l_freq, h_freq, verbose=False) # Setup source space src = mne.setup_source_space('fsaverage', spacing='oct6', subjects_dir=subjects_dir, verbose=False) # Forward solution fwd = mne.make_forward_solution( raw_filtered.info, trans='fsaverage', src=src, bem='fsaverage-5120-5120-5120-bem', eeg=True, meg=False, verbose=False ) # Inverse operator noise_cov = mne.compute_raw_covariance(raw_filtered, verbose=False) inverse_op = mne.minimum_norm.make_inverse_operator( raw_filtered.info, fwd, noise_cov, verbose=False ) # Apply inverse stc = mne.minimum_norm.apply_inverse_raw( raw_filtered, inverse_op, lambda2=1/9, method='sLORETA', verbose=False ) return stc, subjects_dir def visualize(stc, subjects_dir): """Visualize source estimate on brain.""" brain = stc.plot( subjects_dir=subjects_dir, subject='fsaverage', hemi='both', surface='inflated', colormap='hot', time_label='Crystal → Brain', background='white' ) return brain if __name__ == '__main__': if len(sys.argv) < 2: print("Usage: python load_crystal_eeg.py ") sys.exit(1) filepath = sys.argv[1] print("Loading synthetic EEG from crystal...") raw = load_crystal_eeg(filepath) print("Performing inverse projection...") stc, subjects_dir = inverse_project(raw, 'gamma') print("Visualizing on brain surface...") brain = visualize(stc, subjects_dir) print("\\nCrystal → EEG → Brain projection complete!") print("The shadows have returned to their source.") input("Press Enter to close...") ''' # Write the loader script when module is imported def write_mne_loader(): """Write the MNE loader script to disk.""" script_path = os.path.join(os.path.dirname(__file__), 'load_crystal_eeg.py') try: with open(script_path, 'w') as f: f.write(MNE_LOADER_SCRIPT) return script_path except: return None