# (Web-viewer residue removed: "Spaces: Running" banner, "File size: 6,622 Bytes", revision 3bb804c, and the 1-184 line-number gutter.)
"""
EEG File Source Node - Loads a real .edf file and streams band power
Place this file in the 'nodes' folder
"""
import numpy as np
from PyQt6 import QtGui
import os
import sys
# BaseNode (and the optional PA_INSTANCE handle) are taken from the host
# application's __main__ module instead of being imported from a package.
import __main__
BaseNode = __main__.BaseNode
PA_INSTANCE = getattr(__main__, "PA_INSTANCE", None)
# ------------------------------------
# Optional dependencies: 'mne' reads the EDF file and 'scipy.signal' does the
# band-pass filtering. If either import fails the module still loads and
# MNE_AVAILABLE tells the node to stay idle instead of crashing.
try:
    import mne
    from scipy import signal
except ImportError:
    MNE_AVAILABLE = False
else:
    MNE_AVAILABLE = True
# Brain regions (taken from brain_set_system.py) mapped to the upper-case
# channel labels that belong to each one. "All" is deliberately empty: the
# loader treats that key as "do not filter channels" and never reads its list.
EEG_REGIONS = {
    "All": [],
    "Occipital": "O1 O2 OZ POZ PO3 PO4 PO7 PO8".split(),
    "Temporal": "T7 T8 TP7 TP8 FT7 FT8".split(),
    "Parietal": "P1 P2 P3 P4 PZ CP1 CP2".split(),
    "Frontal": "FP1 FP2 FZ F1 F2 F3 F4".split(),
    "Central": "C1 C2 C3 C4 CZ FC1 FC2".split(),
}
class EEGFileSourceNode(BaseNode):
    """Source node that replays an EDF recording as smoothed band-power outputs.

    The file at ``edf_file_path`` is read with MNE, optionally restricted to
    the channels of ``selected_region`` (see ``EEG_REGIONS``), resampled to
    ``fs`` and then scanned with a sliding window. Every ``step()`` averages
    the window across channels and publishes:

    * ``delta`` / ``theta`` / ``alpha`` / ``beta`` / ``gamma``: exponentially
      smoothed ``log1p`` of the mean squared band-pass-filtered signal;
    * ``raw_signal``: the window mean multiplied by 5.0.

    Playback wraps to the start when the window reaches the end of the data.
    """

    NODE_CATEGORY = "Source"
    NODE_COLOR = QtGui.QColor(60, 140, 160)  # A clinical blue

    # Band edges in Hz, in the order the outputs are updated.
    _BANDS = {
        'delta': (1, 4), 'theta': (4, 8), 'alpha': (8, 13),
        'beta': (13, 30), 'gamma': (30, 45)
    }
    _FILTER_ORDER = 4        # Butterworth order passed to signal.butter
    _SMOOTHING_KEEP = 0.8    # Weight of the previous band power per step
    _STEP_SECONDS = 1.0 / 30.0  # Assume ~30fps step rate

    def __init__(self, edf_file_path=""):
        """Create the node; the file itself is loaded lazily by ``step()``.

        Args:
            edf_file_path: Path of the .edf file to stream. May be empty and
                set later through the config options.
        """
        super().__init__()
        self.node_title = "EEG File Source"
        self.outputs = {
            'delta': 'signal',
            'theta': 'signal',
            'alpha': 'signal',
            'beta': 'signal',
            'gamma': 'signal',
            'raw_signal': 'signal'
        }
        self.edf_file_path = edf_file_path
        self.selected_region = "Occipital"
        # Configuration of the most recent load attempt. They start empty so
        # the first step() sees a mismatch and triggers load_edf().
        self._last_path = ""
        self._last_region = ""
        self.raw = None
        self.fs = 100.0  # Resample to this frequency
        self.current_time = 0.0
        self.window_size = 1.0  # 1-second window
        # One entry per output port, 'raw_signal' included.
        self.output_powers = {band: 0.0 for band in self.outputs}
        self.history = np.zeros(64)  # For display
        # (fs, {band: (b, a)}) so the filters are designed once per rate.
        self._filter_cache = None
        if not MNE_AVAILABLE:
            self.node_title = "EEG (MNE Required!)"
            print("Error: EEGFileSourceNode requires 'mne' and 'scipy'.")
            print("Please run: pip install mne")

    def load_edf(self):
        """Load (or re-load) the EDF file for the current path and region.

        On success ``self.raw`` holds the resampled recording and playback
        restarts at time 0. On any failure ``self.raw`` is ``None`` and the
        node title describes the problem; no exception propagates.
        """
        # Record the attempted configuration up front. Otherwise a failed
        # load leaves the "last" values stale and step() would retry the
        # (expensive, preload=True) read and re-print on every single frame.
        self._last_path = self.edf_file_path
        self._last_region = self.selected_region
        self.raw = None

        if not MNE_AVAILABLE:
            # Keep the dependency hint instead of a misleading "not found".
            self.node_title = "EEG (MNE Required!)"
            return
        if not os.path.exists(self.edf_file_path):
            self.node_title = f"EEG (File Not Found)"
            return

        # Broad catch is deliberate: this is a plugin boundary and a bad
        # file must not take down the host application's update loop.
        try:
            raw = mne.io.read_raw_edf(self.edf_file_path, preload=True, verbose=False)
            # Normalise labels (e.g. "Fp1." -> "FP1") to match EEG_REGIONS.
            raw.rename_channels(lambda name: name.strip().replace('.', '').upper())
            if self.selected_region != "All":
                region_channels = EEG_REGIONS[self.selected_region]
                available_channels = [ch for ch in region_channels if ch in raw.ch_names]
                if not available_channels:
                    print(f"Warning: No channels found for region {self.selected_region}")
                    self.node_title = "EEG (No Channels)"
                    return
                raw.pick_channels(available_channels)
            raw.resample(self.fs, verbose=False)
        except Exception as e:
            self.node_title = f"EEG (Load Error)"
            print(f"Error loading EEG file {self.edf_file_path}: {e}")
            return

        self.raw = raw
        self.current_time = 0.0
        self.node_title = f"EEG ({self.selected_region})"
        print(f"Successfully loaded EEG: {self.edf_file_path}")

    def _band_filters(self):
        """Return ``{band: (b, a)}`` band-pass coefficients for ``self.fs``.

        The design is cached and only redone when ``self.fs`` changes. Bands
        whose edges do not lie strictly below the Nyquist frequency are left
        out, because ``signal.butter`` rejects such critical frequencies.
        """
        cached = getattr(self, "_filter_cache", None)
        if cached is None or cached[0] != self.fs:
            nyq = self.fs / 2.0
            filters = {
                band: signal.butter(self._FILTER_ORDER, [low / nyq, high / nyq], btype='band')
                for band, (low, high) in self._BANDS.items()
                if 0 < low < high < nyq
            }
            cached = (self.fs, filters)
            self._filter_cache = cached
        return cached[1]

    def step(self):
        """Advance playback by one frame and refresh every output value."""
        # Check if config changed
        if self.edf_file_path != self._last_path or self.selected_region != self._last_region:
            self.load_edf()
        if self.raw is None:
            return  # Do nothing if no data

        # Get data for the current time window
        window_samples = int(self.window_size * self.fs)
        start_sample = int(self.current_time * self.fs)
        end_sample = start_sample + window_samples
        if end_sample >= self.raw.n_times:
            self.current_time = 0.0  # Loop
            start_sample = 0
            end_sample = window_samples
        data, _ = self.raw[:, start_sample:end_sample]

        # Average across all selected channels
        if data.ndim > 1:
            data = np.mean(data, axis=0)
        if data.size == 0:
            return

        # Window mean, scaled up for visibility (no further normalisation).
        self.output_powers['raw_signal'] = np.mean(data) * 5.0

        # Calculate band powers
        keep = self._SMOOTHING_KEEP
        for band, (b, a) in self._band_filters().items():
            if band not in self.outputs:
                continue
            # filtfilt pads each end with 3 * max(len(a), len(b)) samples and
            # raises if the input is not longer than that; a recording shorter
            # than the window can get there, so hold the previous value.
            if data.size <= 3 * max(len(a), len(b)):
                continue
            filtered = signal.filtfilt(b, a, data)
            power = np.log1p(np.mean(filtered**2))
            # Smooth the output
            previous = self.output_powers.get(band, 0.0)
            self.output_powers[band] = previous * keep + power * (1.0 - keep)

        # Update display history with alpha power
        self.history[:-1] = self.history[1:]
        self.history[-1] = self.output_powers['alpha'] * 0.5  # Scale for vis

        # Increment time
        self.current_time += self._STEP_SECONDS

    def get_output(self, port_name):
        """Return the latest value for ``port_name`` (0.0 for unknown ports)."""
        return self.output_powers.get(port_name, 0.0)

    def get_display_image(self):
        """Render the alpha-power history as a 64x64 grayscale ``QImage``.

        The history is min-max normalised to the image height and drawn as
        one white pixel per column, oldest sample on the left.
        """
        w, h = 64, 64
        img = np.zeros((h, w), dtype=np.uint8)

        # Draw waveform (alpha history)
        vis_data = self.history
        vis_data = (vis_data - np.min(vis_data)) / (np.max(vis_data) - np.min(vis_data) + 1e-9)
        # NaNs (e.g. from NaN samples in the data) would break the integer
        # conversion below; map them to the bottom row instead.
        vis_data = np.nan_to_num(vis_data * (h - 1))

        # Plot every column, including the last one that holds the newest
        # sample (a range(w - 1) loop would silently skip it).
        n_cols = min(w, vis_data.size)
        heights = np.clip(vis_data[:n_cols], 0, h - 1).astype(int)
        img[h - 1 - heights, np.arange(n_cols)] = 255

        img = np.ascontiguousarray(img)
        wrapped = QtGui.QImage(img.data, w, h, w, QtGui.QImage.Format.Format_Grayscale8)
        # QImage built from an external buffer does not own the pixels, and
        # `img` goes out of scope on return; hand back a deep copy so the
        # caller never reads freed memory.
        return wrapped.copy()

    def get_config_options(self):
        """Return the editable settings as (label, attribute, value, choices)."""
        region_options = [(name, name) for name in EEG_REGIONS.keys()]
        return [
            ("EDF File Path", "edf_file_path", self.edf_file_path, None),
            ("Brain Region", "selected_region", self.selected_region, region_options),
        ]