Spaces:
Build error
Build error
| from time import sleep | |
| import time | |
| import numpy as np | |
| from copy import deepcopy | |
| from datetime import datetime | |
| import multiprocessing as mp | |
| import warnings | |
| from threading import Thread, Lock | |
| from portiloop.src import ADS | |
| if ADS: | |
| import alsaaudio | |
| from portiloop.src.hardware.frontend import Frontend | |
| from portiloop.src.hardware.leds import LEDs, Color | |
| from portiloop.src.stimulation import UpStateDelayer | |
| from portiloop.src.processing import FilterPipeline, int_to_float | |
| from portiloop.src.config import mod_config, LEADOFF_CONFIG, FRONTEND_CONFIG, to_ads_frequency | |
| from portiloop.src.utils import FileReader, LiveDisplay, DummyAlsaMixer, EDFRecorder, EDF_PATH, RECORDING_PATH | |
| from IPython.display import clear_output, display | |
| import ipywidgets as widgets | |
def capture_process(p_data_o, p_msg_io, duration, frequency, python_clock, time_msg_in, channel_states):
    """Acquire samples from the ADS frontend and stream them to the parent process.

    Intended to be the target of a ``multiprocessing.Process``. Whatever
    happens (normal end, 'STOP' request, or an error at any stage, including
    hardware initialisation), the string ``'STOP'`` is sent on ``p_msg_io``
    and both pipe ends are closed before the function exits, so the parent
    is never left waiting for a message that will not come.

    Args:
        p_data_o: multiprocessing.Pipe: captured datapoints are put here
        p_msg_io: multiprocessing.Pipe: to communicate with the parent process
        duration: float: max duration of the experiment in seconds
            (a value <= 0 means "no limit")
        frequency: float: sampling frequency
        python_clock: bool: if True, the Coral clock is used, otherwise,
            the ADS interrupts are used
        time_msg_in: float: min time between attempts to recv incoming messages
        channel_states: passed as-is to ``mod_config`` together with the data
            rate to build the frontend register configuration

    Raises:
        AssertionError: if the ADS does not answer with the expected ID or
            does not read back the configuration that was written.
    """
    if duration <= 0:
        duration = np.inf
    sample_time = 1 / frequency

    frontend = None
    leds = None
    try:
        # Hardware handles are created inside the guarded region so that a
        # failure here still reaches the `finally` clause below.
        frontend = Frontend()
        leds = LEDs()
        leds.led2(Color.PURPLE)
        leds.aquisition(True)

        data = frontend.read_regs(0x00, 1)
        # Explicit raise (instead of `assert`) so the check survives `python -O`.
        if data != [0x3E]:
            raise AssertionError("The communication with the ADS failed, please try again.")
        leds.led2(Color.BLUE)

        # With the Python clock the ADS runs at twice the requested frequency;
        # with the ADS clock it runs at the requested frequency.
        datarate = 2 * frequency if python_clock else frequency
        config = mod_config(FRONTEND_CONFIG, datarate, channel_states)
        frontend.write_regs(0x00, config)
        data = frontend.read_regs(0x00, len(config))
        if data != config:
            raise AssertionError(f"Wrong config: {data} vs {config}")

        frontend.start()
        leds.led2(Color.PURPLE)
        while not frontend.is_ready():
            pass

        # Set up of leds: blink the acquisition LED once before sampling
        leds.aquisition(True)
        sleep(0.5)
        leds.aquisition(False)
        sleep(0.5)
        leds.aquisition(True)

        c = True  # cleared when the parent asks us to stop
        it = 0  # number of iterations of the sampling loop
        t_start = time.time()
        t_max = t_start + duration
        t = t_start

        # first sample:
        reading = frontend.read()
        datapoint = reading.channels()
        p_data_o.send(datapoint)
        t_next = t + sample_time
        t_chk_msg = t + time_msg_in

        # sampling loop:
        while c and t < t_max:
            t = time.time()
            if python_clock:
                # Pace the loop ourselves: sleep until the next sample is due.
                if t <= t_next:
                    time.sleep(t_next - t)
                t_next += sample_time
                reading = frontend.read()
            else:
                # Let the frontend block until a new sample is available.
                reading = frontend.wait_new_data()
            datapoint = reading.channels()
            p_data_o.send(datapoint)

            # Check for messages (at most once every `time_msg_in` seconds)
            if t >= t_chk_msg:
                t_chk_msg = t + time_msg_in
                if p_msg_io.poll():
                    message = p_msg_io.recv()
                    if message == 'STOP':
                        c = False
            it += 1

        t = time.time()
        tot = (t - t_start) / it
        p_msg_io.send(("PRT", f"Average frequency: {1 / tot} Hz for {it} samples"))
    finally:
        try:
            if leds is not None:
                leds.aquisition(False)
                leds.close()
            if frontend is not None:
                frontend.close()
        finally:
            # Always notify the parent, even if releasing the hardware failed.
            p_msg_io.send('STOP')
            p_msg_io.close()
            p_data_o.close()
class Capture:
    """ipywidgets control panel plus acquisition loop for a recording session.

    Instantiating the class builds all the widgets and displays them. Pressing
    'Start' runs :meth:`start_capture` in a background thread; with the 'ADS'
    signal input, that thread in turn spawns ``capture_process`` in a separate
    process and consumes the samples it sends back. With the 'File' input the
    samples come from a ``FileReader`` instead.

    Args:
        detector_cls: class instantiated as ``detector_cls(threshold, channel=...)``
            when detection is enabled, or None.
        stimulator_cls: class instantiated without arguments when stimulation
            is enabled, or None.
    """

    def __init__(self, detector_cls=None, stimulator_cls=None):
        """Initialise the default settings, the audio mixer and every widget."""
        self.filename = EDF_PATH / 'recording.edf'
        self._p_capture = None
        self.__capture_on = False
        self.frequency = 250
        self.duration = 28800
        self.power_line = 60
        self.polyak_mean = 0.1
        self.polyak_std = 0.001
        self.epsilon = 0.000001
        self.custom_fir = False
        self.custom_fir_order = 20
        self.custom_fir_cutoff = 30
        self.filter = True
        self.filter_args = [True, True, True]  # [use FIR, use notch, use standardization]
        self.record = False
        self.detect = False
        self.stimulate = False
        self.threshold = 0.82
        self.lsl = False
        self.display = False
        self.signal_input = "ADS"
        self.python_clock = True
        self.edf_writer = None
        self.edf_buffer = []
        self.signal_labels = ['Common Mode', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7', 'ch8']
        self._lock_msg_out = Lock()
        self._msg_out = None
        self._t_capture = None
        # One entry per channel 2..8 (index 0 is channel 2).
        self.channel_states = ['disabled', 'disabled', 'disabled', 'disabled', 'disabled', 'disabled', 'disabled']
        self.channel_detection = 2
        self.spindle_detection_mode = 'Fast'
        self.spindle_freq = 10

        self.detector_cls = detector_cls
        self.stimulator_cls = stimulator_cls

        self._test_stimulus_lock = Lock()
        self._test_stimulus = False

        self._pause_detect_lock = Lock()
        self._pause_detect = True

        if ADS:
            try:
                mixers = alsaaudio.mixers()
                if not mixers:
                    warnings.warn("No ALSA mixer found.")
                    self.mixer = DummyAlsaMixer()
                elif 'PCM' in mixers:
                    self.mixer = alsaaudio.Mixer(control='PCM')
                else:
                    warnings.warn(f"Could not find mixer PCM, using {mixers[0]} instead.")
                    self.mixer = alsaaudio.Mixer(control=mixers[0])
            except alsaaudio.ALSAAudioError:
                # The exception class lives in the alsaaudio module; the bare
                # name is not defined in this file.
                warnings.warn("No ALSA mixer found.")
                self.mixer = DummyAlsaMixer()
            self.volume = self.mixer.getvolume()[0]  # we will set the same volume on all channels
        else:
            self.mixer = DummyAlsaMixer()
            self.volume = self.mixer.getvolume()[0]

        # widgets ===============================

        def channel_radio():
            return widgets.RadioButtons(
                options=['disabled', 'simple'],
                value='disabled',
                disabled=False
            )

        def checkbox(value, description, disabled=False):
            return widgets.Checkbox(
                value=value,
                description=description,
                disabled=disabled,
                indent=False
            )

        # CHANNELS ------------------------------
        # Channel 1 gets no selector; channel_states only covers channels 2-8.
        self.b_radio_ch2 = channel_radio()
        self.b_radio_ch3 = channel_radio()
        self.b_radio_ch4 = channel_radio()
        self.b_radio_ch5 = channel_radio()
        self.b_radio_ch6 = channel_radio()
        self.b_radio_ch7 = channel_radio()
        self.b_radio_ch8 = channel_radio()

        self.b_channel_detect = widgets.Dropdown(
            options=[('2', 2), ('3', 3), ('4', 4), ('5', 5), ('6', 6), ('7', 7), ('8', 8)],
            value=2,
            description='Detection Channel:',
            disabled=False,
            style={'description_width': 'initial'}
        )

        self.b_spindle_mode = widgets.Dropdown(
            options=['Fast', 'Peak', 'Through'],
            value='Fast',
            description='Spindle Stimulation Mode',
            disabled=False,
            style={'description_width': 'initial'}
        )

        self.b_spindle_freq = widgets.IntText(
            value=self.spindle_freq,
            description='Spindle Freq (Hz):',
            disabled=False,
            style={'description_width': 'initial'}
        )

        self.b_accordion_channels = widgets.Accordion(
            children=[
                widgets.GridBox([
                    widgets.Label('CH2'),
                    widgets.Label('CH3'),
                    widgets.Label('CH4'),
                    widgets.Label('CH5'),
                    widgets.Label('CH6'),
                    widgets.Label('CH7'),
                    widgets.Label('CH8'),
                    self.b_radio_ch2,
                    self.b_radio_ch3,
                    self.b_radio_ch4,
                    self.b_radio_ch5,
                    self.b_radio_ch6,
                    self.b_radio_ch7,
                    self.b_radio_ch8], layout=widgets.Layout(grid_template_columns="repeat(7, 90px)")
                )
            ])
        self.b_accordion_channels.set_title(index=0, title='Channels')

        # OTHERS ------------------------------
        self.b_capture = widgets.ToggleButtons(
            options=['Stop', 'Start'],
            description='Capture:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Stop capture', 'Start capture'],
        )

        # Tooltips are listed in the same order as the options.
        self.b_pause = widgets.ToggleButtons(
            options=['Paused', 'Active'],
            description='Detection',
            disabled=True,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Detector and stimulator paused', 'Detector and stimulator active'],
        )

        # The initial selection mirrors self.python_clock so that the button
        # shown as selected is the clock that will actually be used.
        self.b_clock = widgets.ToggleButtons(
            options=['ADS', 'Coral'],
            value='Coral' if self.python_clock else 'ADS',
            description='Clock:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Use ADS clock (not very precise, very timely)',
                      'Use Coral clock (very precise, not very timely)'],
        )

        self.b_power_line = widgets.ToggleButtons(
            options=['60 Hz', '50 Hz'],
            description='Power line:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['North America 60 Hz',
                      'Europe 50 Hz'],
        )

        self.b_signal_input = widgets.ToggleButtons(
            options=['ADS', 'File'],
            description='Signal Input:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Read data from ADS.',
                      'Read data from file.'],
        )

        self.b_custom_fir = widgets.ToggleButtons(
            options=['Default', 'Custom'],
            description='FIR filter:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Use the default 30Hz low-pass FIR from the Portiloop paper',
                      'Use a custom FIR'],
        )

        self.b_filename = widgets.Text(
            value='recording.edf',
            description='Recording:',
            disabled=False
        )

        self.b_frequency = widgets.IntText(
            value=self.frequency,
            description='Freq (Hz):',
            disabled=False
        )

        self.b_threshold = widgets.FloatText(
            value=self.threshold,
            description='Threshold:',
            disabled=True
        )

        self.b_polyak_mean = widgets.FloatText(
            value=self.polyak_mean,
            description='Polyak mean:',
            disabled=False
        )

        self.b_polyak_std = widgets.FloatText(
            value=self.polyak_std,
            description='Polyak std:',
            disabled=False
        )

        self.b_epsilon = widgets.FloatText(
            value=self.epsilon,
            description='Epsilon:',
            disabled=False
        )

        self.b_custom_fir_order = widgets.IntText(
            value=self.custom_fir_order,
            description='FIR order:',
            disabled=True
        )

        self.b_custom_fir_cutoff = widgets.IntText(
            value=self.custom_fir_cutoff,
            description='FIR cutoff:',
            disabled=True
        )

        self.b_use_fir = checkbox(self.filter_args[0], 'Use FIR')
        self.b_use_notch = checkbox(self.filter_args[1], 'Use notch')
        self.b_use_std = checkbox(self.filter_args[2], 'Use standardization')

        self.b_accordion_filter = widgets.Accordion(
            children=[
                widgets.VBox([
                    self.b_custom_fir,
                    self.b_custom_fir_order,
                    self.b_custom_fir_cutoff,
                    self.b_polyak_mean,
                    self.b_polyak_std,
                    self.b_epsilon,
                    widgets.HBox([
                        self.b_use_fir,
                        self.b_use_notch,
                        self.b_use_std
                    ])
                ])
            ])
        self.b_accordion_filter.set_title(index=0, title='Filtering')

        self.b_duration = widgets.IntText(
            value=self.duration,
            description='Time (s):',
            disabled=False
        )

        self.b_filter = checkbox(self.filter, 'Filter')
        self.b_detect = checkbox(self.detect, 'Detect')
        self.b_stimulate = checkbox(self.stimulate, 'Stimulate', disabled=True)
        self.b_record = checkbox(self.record, 'Record EDF')
        self.b_lsl = checkbox(self.lsl, 'Stream LSL')
        self.b_display = checkbox(self.display, 'Display')

        self.b_volume = widgets.IntSlider(
            value=self.volume,
            min=0,
            max=100,
            step=1,
            description="Volume",
            disabled=False
        )

        self.b_test_stimulus = widgets.Button(
            description='Test stimulus',
            disabled=True,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltip='Send a test stimulus'
        )

        self.b_test_impedance = widgets.Button(
            description='Impedance Check',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltip='Check if electrodes are properly connected'
        )

        # CALLBACKS ----------------------
        self.b_capture.observe(self.on_b_capture, 'value')
        self.b_clock.observe(self.on_b_clock, 'value')
        self.b_signal_input.observe(self.on_b_signal_input, 'value')
        self.b_frequency.observe(self.on_b_frequency, 'value')
        self.b_threshold.observe(self.on_b_threshold, 'value')
        self.b_duration.observe(self.on_b_duration, 'value')
        self.b_filter.observe(self.on_b_filter, 'value')
        self.b_use_fir.observe(self.on_b_use_fir, 'value')
        self.b_use_notch.observe(self.on_b_use_notch, 'value')
        self.b_use_std.observe(self.on_b_use_std, 'value')
        self.b_detect.observe(self.on_b_detect, 'value')
        self.b_stimulate.observe(self.on_b_stimulate, 'value')
        self.b_record.observe(self.on_b_record, 'value')
        self.b_lsl.observe(self.on_b_lsl, 'value')
        self.b_display.observe(self.on_b_display, 'value')
        self.b_filename.observe(self.on_b_filename, 'value')
        self.b_radio_ch2.observe(self.on_b_radio_ch2, 'value')
        self.b_radio_ch3.observe(self.on_b_radio_ch3, 'value')
        self.b_radio_ch4.observe(self.on_b_radio_ch4, 'value')
        self.b_radio_ch5.observe(self.on_b_radio_ch5, 'value')
        self.b_radio_ch6.observe(self.on_b_radio_ch6, 'value')
        self.b_radio_ch7.observe(self.on_b_radio_ch7, 'value')
        self.b_radio_ch8.observe(self.on_b_radio_ch8, 'value')
        self.b_channel_detect.observe(self.on_b_channel_detect, 'value')
        self.b_spindle_mode.observe(self.on_b_spindle_mode, 'value')
        self.b_spindle_freq.observe(self.on_b_spindle_freq, 'value')
        self.b_power_line.observe(self.on_b_power_line, 'value')
        self.b_custom_fir.observe(self.on_b_custom_fir, 'value')
        self.b_custom_fir_order.observe(self.on_b_custom_fir_order, 'value')
        self.b_custom_fir_cutoff.observe(self.on_b_custom_fir_cutoff, 'value')
        self.b_polyak_mean.observe(self.on_b_polyak_mean, 'value')
        self.b_polyak_std.observe(self.on_b_polyak_std, 'value')
        self.b_epsilon.observe(self.on_b_epsilon, 'value')
        self.b_volume.observe(self.on_b_volume, 'value')
        self.b_test_stimulus.on_click(self.on_b_test_stimulus)
        self.b_test_impedance.on_click(self.on_b_test_impedance)
        self.b_pause.observe(self.on_b_pause, 'value')

        self.display_buttons()

    def __del__(self):
        # __init__ may have failed before the widget was created.
        b_capture = getattr(self, 'b_capture', None)
        if b_capture is not None:
            b_capture.close()

    def display_buttons(self):
        """Display the whole control panel in the current output area."""
        display(widgets.VBox([self.b_accordion_channels,
                              self.b_channel_detect,
                              self.b_frequency,
                              self.b_duration,
                              self.b_filename,
                              self.b_signal_input,
                              self.b_power_line,
                              self.b_clock,
                              widgets.HBox([self.b_filter, self.b_detect, self.b_stimulate, self.b_record, self.b_lsl, self.b_display]),
                              widgets.HBox([self.b_threshold, self.b_test_stimulus]),
                              self.b_volume,
                              widgets.HBox([self.b_spindle_mode, self.b_spindle_freq]),
                              self.b_test_impedance,
                              self.b_accordion_filter,
                              self.b_capture,
                              self.b_pause]))

    def _config_widgets(self):
        """Return the widgets that are always unlocked when idle and locked while capturing."""
        return (
            self.b_frequency,
            self.b_duration,
            self.b_filename,
            self.b_filter,
            self.b_detect,
            self.b_record,
            self.b_lsl,
            self.b_display,
            self.b_clock,
            self.b_radio_ch2,
            self.b_radio_ch3,
            self.b_radio_ch4,
            self.b_radio_ch5,
            self.b_radio_ch6,
            self.b_radio_ch7,
            self.b_radio_ch8,
            self.b_power_line,
            self.b_signal_input,
            self.b_channel_detect,
            self.b_spindle_freq,
            self.b_spindle_mode,
            self.b_polyak_mean,
            self.b_polyak_std,
            self.b_epsilon,
            self.b_use_fir,
            self.b_use_notch,
            self.b_use_std,
            self.b_custom_fir,
            self.b_test_impedance,
        )

    def enable_buttons(self):
        """Unlock the configuration widgets (state of the panel when no capture runs)."""
        for widget in self._config_widgets():
            widget.disabled = False
        # These depend on other settings.
        self.b_custom_fir_order.disabled = not self.custom_fir
        self.b_custom_fir_cutoff.disabled = not self.custom_fir
        self.b_stimulate.disabled = not self.detect
        self.b_threshold.disabled = not self.detect
        self.b_pause.disabled = not self.detect
        self.b_test_stimulus.disabled = True  # only enabled when running

    def disable_buttons(self):
        """Lock the configuration widgets (state of the panel while a capture runs)."""
        for widget in self._config_widgets():
            widget.disabled = True
        self.b_stimulate.disabled = True
        self.b_custom_fir_order.disabled = True
        self.b_custom_fir_cutoff.disabled = True
        self.b_threshold.disabled = True
        self.b_test_stimulus.disabled = not self.stimulate  # only enabled when running

    # Channel selectors: index 0 of channel_states is channel 2.
    def on_b_radio_ch2(self, value):
        self.channel_states[0] = value['new']

    def on_b_radio_ch3(self, value):
        self.channel_states[1] = value['new']

    def on_b_radio_ch4(self, value):
        self.channel_states[2] = value['new']

    def on_b_radio_ch5(self, value):
        self.channel_states[3] = value['new']

    def on_b_radio_ch6(self, value):
        self.channel_states[4] = value['new']

    def on_b_radio_ch7(self, value):
        self.channel_states[5] = value['new']

    def on_b_radio_ch8(self, value):
        self.channel_states[6] = value['new']

    def on_b_channel_detect(self, value):
        self.channel_detection = value['new']

    def on_b_spindle_freq(self, value):
        val = value['new']
        if val > 0:
            self.spindle_freq = val
        else:
            # Reject the input: put the last valid value back in the widget.
            self.b_spindle_freq.value = self.spindle_freq

    def on_b_spindle_mode(self, value):
        self.spindle_detection_mode = value['new']

    def on_b_capture(self, value):
        """Start the capture thread on 'Start'; request a stop and wait for it on 'Stop'."""
        val = value['new']
        if val == 'Start':
            # Check first, so that an aborted start leaves the panel untouched.
            if self._t_capture is not None:
                warnings.warn("Capture already running, operation aborted.")
                return
            clear_output()
            self.disable_buttons()
            if not self.python_clock:  # ADS clock: force the frequency to an ADS-compatible frequency
                self.frequency = to_ads_frequency(self.frequency)
                self.b_frequency.value = self.frequency
            self.display_buttons()
            with self._lock_msg_out:
                self._msg_out = None
            detector_cls = self.detector_cls if self.detect else None
            stimulator_cls = self.stimulator_cls if self.stimulate else None

            self._t_capture = Thread(target=self.start_capture,
                                     args=(self.filter,
                                           self.filter_args,
                                           detector_cls,
                                           self.threshold,
                                           self.channel_detection,
                                           stimulator_cls,
                                           self.record,
                                           self.lsl,
                                           self.display,
                                           2500,
                                           self.python_clock))
            self._t_capture.start()
        elif val == 'Stop':
            with self._lock_msg_out:
                self._msg_out = 'STOP'
            if self._t_capture is not None:
                self._t_capture.join()
                self._t_capture = None
            self.enable_buttons()

    def on_b_custom_fir(self, value):
        val = value['new']
        if val == 'Default':
            self.custom_fir = False
        elif val == 'Custom':
            self.custom_fir = True
        # Refreshes the enabled state of the FIR order/cutoff fields.
        self.enable_buttons()

    def on_b_clock(self, value):
        val = value['new']
        if val == 'Coral':
            self.python_clock = True
        elif val == 'ADS':
            self.python_clock = False

    def on_b_signal_input(self, value):
        val = value['new']
        if val == "ADS":
            self.signal_input = "ADS"
        elif val == "File":
            self.signal_input = "File"

    def on_b_power_line(self, value):
        val = value['new']
        if val == '60 Hz':
            self.power_line = 60
        elif val == '50 Hz':
            self.power_line = 50

    def on_b_frequency(self, value):
        val = value['new']
        if val > 0:
            self.frequency = val
        else:
            self.b_frequency.value = self.frequency

    def on_b_threshold(self, value):
        val = value['new']
        if 0 <= val <= 1:
            self.threshold = val
        else:
            self.b_threshold.value = self.threshold

    def on_b_filename(self, value):
        """Store the recording path; '.edf' is appended when missing, empty input restores the default."""
        val = value['new']
        if val != '':
            if not val.endswith('.edf'):
                val += '.edf'
            self.filename = EDF_PATH / val
        else:
            self.filename = EDF_PATH / 'recording.edf'

    def on_b_duration(self, value):
        val = value['new']
        if val > 0:
            self.duration = val
        else:
            # Same policy as the other numeric fields: restore the last valid value.
            self.b_duration.value = self.duration

    def on_b_custom_fir_order(self, value):
        val = value['new']
        if val > 0:
            self.custom_fir_order = val
        else:
            self.b_custom_fir_order.value = self.custom_fir_order

    def on_b_custom_fir_cutoff(self, value):
        val = value['new']
        # The cutoff must stay strictly below half the sampling frequency.
        if 0 < val < self.frequency / 2:
            self.custom_fir_cutoff = val
        else:
            self.b_custom_fir_cutoff.value = self.custom_fir_cutoff

    def on_b_polyak_mean(self, value):
        val = value['new']
        if 0 <= val <= 1:
            self.polyak_mean = val
        else:
            self.b_polyak_mean.value = self.polyak_mean

    def on_b_polyak_std(self, value):
        val = value['new']
        if 0 <= val <= 1:
            self.polyak_std = val
        else:
            self.b_polyak_std.value = self.polyak_std

    def on_b_epsilon(self, value):
        val = value['new']
        if 0 < val < 0.1:
            self.epsilon = val
        else:
            self.b_epsilon.value = self.epsilon

    def on_b_filter(self, value):
        self.filter = value['new']

    def on_b_use_fir(self, value):
        self.filter_args[0] = value['new']

    def on_b_use_notch(self, value):
        self.filter_args[1] = value['new']

    def on_b_use_std(self, value):
        self.filter_args[2] = value['new']

    def on_b_stimulate(self, value):
        self.stimulate = value['new']

    def on_b_detect(self, value):
        self.detect = value['new']
        # Refreshes the widgets whose enabled state depends on `detect`.
        self.enable_buttons()

    def on_b_record(self, value):
        self.record = value['new']

    def on_b_lsl(self, value):
        self.lsl = value['new']

    def on_b_display(self, value):
        self.display = value['new']

    def on_b_volume(self, value):
        val = value['new']
        if 0 <= val <= 100:
            self.volume = val
            self.mixer.setvolume(self.volume)

    def on_b_test_stimulus(self, b):
        # Only raises a flag; the capture loop consumes it.
        with self._test_stimulus_lock:
            self._test_stimulus = True

    def on_b_test_impedance(self, b):
        """Read the frontend for about two seconds in lead-off configuration and print the flags of the last reading."""
        frontend = Frontend()
        try:
            frontend.write_regs(0x00, LEADOFF_CONFIG)
            frontend.start()
            start_time = time.time()
            current_time = time.time()
            while current_time - start_time < 2:
                current_time = time.time()
                reading = frontend.read()

            # First entry: any of the negative lead-off flags; then one
            # positive lead-off flag per index.
            impedance_check = [any([reading.loff_n(i) for i in range(7)])]
            impedance_check.extend([reading.loff_p(i) for i in range(7)])

            names = ["Ref", "Ch2", "Ch3", "Ch4", "Ch5", "Ch6", "Ch7", "Ch8"]
            vals = [' Y ' if val else ' N ' for val in impedance_check]
            print(' '.join(str(name) for name in names))
            print(' '.join(str(val) for val in vals))
        finally:
            frontend.close()

    def on_b_pause(self, value):
        val = value['new']
        if val == 'Active':
            with self._pause_detect_lock:
                self._pause_detect = False
        elif val == 'Paused':
            with self._pause_detect_lock:
                self._pause_detect = True

    @staticmethod
    def _stop_capture_process(process, p_msg_io, p_data_i, child_stopped):
        """Make sure the capture process has ended, then empty and close the parent-side pipes."""
        if process is not None and not child_stopped:
            # We are leaving without having heard 'STOP' from the child
            # (e.g. after an error on our side): ask it to stop.
            try:
                p_msg_io.send('STOP')
            except (OSError, ValueError):
                pass
        # Keep emptying the pipes until the child is gone, so that it cannot
        # stay blocked on a send while it shuts down.
        while True:
            if p_data_i.poll(0.01):
                _ = p_data_i.recv()
            elif p_msg_io.poll():
                _ = p_msg_io.recv()
            elif process is None or not process.is_alive():
                break
        p_data_i.close()
        p_msg_io.close()
        if process is not None:
            process.join()

    def start_capture(self,
                      filter,
                      filter_args,
                      detector_cls,
                      threshold,
                      channel,
                      stimulator_cls,
                      record,
                      lsl,
                      viz,
                      width,
                      python_clock):
        """Run the acquisition loop until a stop is requested or the source is exhausted.

        Args:
            filter: bool: run every sample through a FilterPipeline
            filter_args: list passed to FilterPipeline as ``filter_args``
            detector_cls: detector class or None (no detection)
            threshold: first argument given to the detector
            channel: detection channel number; given to the detector and used
                (as ``channel - 1``) to pick the value fed to the delayer
            stimulator_cls: stimulator class or None (no stimulation)
            record: bool: write the samples with an EDFRecorder
            lsl: bool: stream raw and processed samples over LSL
            viz: bool: show the samples in a LiveDisplay
            width: window length given to the LiveDisplay
            python_clock: bool: forwarded to ``capture_process``
        """
        from_ads = self.signal_input == "ADS"
        p_msg_io = p_data_i = None
        if from_ads:
            if self.__capture_on:
                warnings.warn("Capture is already ongoing, ignoring command.")
                return
            self.__capture_on = True
            p_msg_io, p_msg_io_2 = mp.Pipe()
            p_data_i, p_data_o = mp.Pipe(duplex=False)

        process = None  # set once the capture process has been started
        child_stopped = False  # True once the capture process has reported 'STOP'
        recorder = None
        live_disp = None
        try:
            # Initialize filtering pipeline
            if filter:
                fp = FilterPipeline(nb_channels=8,
                                    sampling_rate=self.frequency,
                                    power_line_fq=self.power_line,
                                    use_custom_fir=self.custom_fir,
                                    custom_fir_order=self.custom_fir_order,
                                    custom_fir_cutoff=self.custom_fir_cutoff,
                                    alpha_avg=self.polyak_mean,
                                    alpha_std=self.polyak_std,
                                    epsilon=self.epsilon,
                                    filter_args=filter_args)

            # Initialize detector and stimulator
            detector = detector_cls(threshold, channel=channel) if detector_cls is not None else None
            stimulator = stimulator_cls() if stimulator_cls is not None else None

            # Launch the capture process
            if from_ads:
                self._p_capture = mp.Process(target=capture_process,
                                             args=(p_data_o,
                                                   p_msg_io_2,
                                                   self.duration,
                                                   self.frequency,
                                                   python_clock,
                                                   1.0,
                                                   self.channel_states)
                                             )
                self._p_capture.start()
                process = self._p_capture
                print(f"PID capture: {self._p_capture.pid}")
            else:
                filename = RECORDING_PATH / 'test_recording.csv'
                file_reader = FileReader(filename)

            # Initialize display if requested
            if viz:
                live_disp = LiveDisplay(channel_names=self.signal_labels, window_len=width)

            # Initialize recording if requested
            if record:
                new_recorder = EDFRecorder(self.signal_labels, self.filename, self.frequency)
                new_recorder.open_recording_file()
                recorder = new_recorder  # only closed later if it was opened

            # Initialize LSL to stream if requested
            if lsl:
                from pylsl import StreamInfo, StreamOutlet
                lsl_info = StreamInfo(name='Portiloop Filtered',
                                      type='Filtered EEG',
                                      channel_count=8,
                                      nominal_srate=self.frequency,
                                      channel_format='float32',
                                      source_id='portiloop1')  # TODO: replace this by unique device identifier
                lsl_outlet = StreamOutlet(lsl_info)
                lsl_info_raw = StreamInfo(name='Portiloop Raw Data',
                                          type='Raw EEG signal',
                                          channel_count=8,
                                          nominal_srate=self.frequency,
                                          channel_format='float32',
                                          source_id='portiloop1')  # TODO: replace this by unique device identifier
                lsl_outlet_raw = StreamOutlet(lsl_info_raw)

            buffer = []

            # Initialize stimulation delayer if requested
            if self.spindle_detection_mode != 'Fast' and stimulator is not None:
                stimulation_delayer = UpStateDelayer(self.frequency, self.spindle_detection_mode == 'Peak', 0.3)
                stimulator.add_delayer(stimulation_delayer)
            else:
                stimulation_delayer = None

            # Main capture loop
            while True:
                lacourse_stimulation = False
                if from_ads:
                    # Send message in communication pipe if we have one
                    with self._lock_msg_out:
                        if self._msg_out is not None:
                            p_msg_io.send(self._msg_out)
                            self._msg_out = None

                    # Check if we have received a message in communication pipe
                    if p_msg_io.poll():
                        mess = p_msg_io.recv()
                        if mess == 'STOP':
                            child_stopped = True
                            break
                        elif mess[0] == 'PRT':
                            print(mess[1])

                    # Retrieve one data point from the data pipe, waiting at
                    # most one sample period for it
                    if not p_data_i.poll(timeout=(1 / self.frequency)):
                        continue
                    point = p_data_i.recv()

                    # Convert point from int to corresponding value in microvolts
                    n_array_raw = int_to_float(np.array([point]))
                else:
                    # Check if the message to stop has been sent
                    with self._lock_msg_out:
                        if self._msg_out == "STOP":
                            break

                    file_point = file_reader.get_point()
                    if file_point is None:
                        break
                    _, raw_point, _, _, lacourse_stimulation = file_point
                    # The file value is placed in the second of the 8 columns.
                    n_array_raw = np.array([0, raw_point, 0, 0, 0, 0, 0, 0])
                    n_array_raw = np.reshape(n_array_raw, (1, 8))

                # Go through filtering pipeline
                if filter:
                    n_array = fp.filter(deepcopy(n_array_raw))
                else:
                    n_array = deepcopy(n_array_raw)

                # Contains the filtered point (if filtering is off, contains a copy of the raw point)
                filtered_point = n_array.tolist()

                # Send both raw and filtered points over LSL
                if lsl:
                    raw_sample = n_array_raw.tolist()
                    lsl_outlet_raw.push_sample(raw_sample[-1])
                    lsl_outlet.push_sample(filtered_point[-1])

                # Adds point to buffer for delayed stimulation
                if stimulation_delayer is not None:
                    stimulation_delayer.step_timesteps(filtered_point[0][channel - 1])

                # Check if detection is on or off
                with self._pause_detect_lock:
                    pause = self._pause_detect

                # If detection is on
                if detector is not None and not pause:
                    # Detect using the latest point
                    detection_signal = detector.detect(filtered_point)

                    # Stimulate
                    if stimulator is not None:
                        stimulator.stimulate(detection_signal)
                        with self._test_stimulus_lock:
                            test_stimulus = self._test_stimulus
                            self._test_stimulus = False
                        if test_stimulus:
                            stimulator.test_stimulus()

                # Send the stimulation from the file reader
                if stimulator is not None and not from_ads and lacourse_stimulation:
                    stimulator.send_stimulation("GROUND_TRUTH_STIM", False)

                # Add point to the buffer to send to viz and recorder
                buffer += filtered_point
                if len(buffer) >= 50:
                    if viz:
                        live_disp.add_datapoints(buffer)
                    if record:
                        recorder.add_recording_data(buffer)
                    buffer = []

            # Hand over the samples still waiting in the buffer, so the end
            # of the session is not lost.
            if buffer:
                if viz:
                    live_disp.add_datapoints(buffer)
                if record:
                    recorder.add_recording_data(buffer)
        finally:
            try:
                if from_ads:
                    try:
                        self._stop_capture_process(process, p_msg_io, p_data_i, child_stopped)
                    finally:
                        # Always allow a new capture to be started later.
                        self.__capture_on = False
            finally:
                if recorder is not None:
                    recorder.close_recording_file()
if __name__ == "__main__":
    # No script entry point: running this module directly does nothing.
    pass