Spaces:
Build error
Build error
| from abc import ABC, abstractmethod | |
| import time | |
| from threading import Thread, Lock | |
| from pathlib import Path | |
| import alsaaudio | |
| import wave | |
| import pylsl | |
| # Abstract interface for developers: | |
class Stimulator(ABC):
    """Abstract interface that custom stimulators implement.

    Subclasses must override `stimulate`; `test_stimulus` is an optional hook.
    """

    @abstractmethod
    def stimulate(self, detection_signal):
        """
        Stimulates according to the output of the Detector.

        Args:
            detection_signal: Object: the output of the Detector.add_datapoints method.

        Raises:
            NotImplementedError: if a subclass delegates to this base
                implementation instead of providing its own.
        """
        raise NotImplementedError

    def test_stimulus(self):
        """
        Optional: this is called when the 'Test stimulus' button is pressed.

        The default implementation does nothing.
        """
        pass
| # Example implementation for sleep spindles | |
class SleepSpindleRealTimeStimulator(Stimulator):
    """Example stimulator that plays a WAV sound on sleep-spindle detection.

    The WAV file is read once at construction and cached in memory as
    period-sized chunks, so playback never reopens the file. Each stimulus
    pushes a 'STIM' marker to an LSL outlet and then plays the sound in a
    daemon thread; at most one playback thread exists at any time.
    """

    def __init__(self):
        """Load the stimulus sound, create the LSL marker outlet and open the PCM device.

        Raises:
            ValueError: if the WAV sample width is not 1, 2, 3 or 4 bytes.
        """
        self._sound = Path(__file__).parent / 'sounds' / 'stimulus.wav'
        print(f"DEBUG:{self._sound}")
        self._thread = None  # currently running playback thread, if any
        self._lock = Lock()  # guards self._thread
        self.last_detected_ts = time.time()
        self.wait_t = 0.4  # 400 ms

        lsl_markers_info = pylsl.StreamInfo(name='Portiloop_stimuli',
                                            type='Markers',
                                            channel_count=1,
                                            channel_format='string',
                                            source_id='portiloop1')  # TODO: replace this by unique device identifier
        self.lsl_outlet_markers = pylsl.StreamOutlet(lsl_markers_info)

        # Initialize Alsa stuff
        # Open WAV file and set PCM device
        with wave.open(str(self._sound), 'rb') as f:
            device = 'default'

            # 8bit is unsigned in wav files; otherwise we assume signed data,
            # little endian.
            sample_width = f.getsampwidth()
            if sample_width == 1:
                pcm_format = alsaaudio.PCM_FORMAT_U8
            elif sample_width == 2:
                pcm_format = alsaaudio.PCM_FORMAT_S16_LE
            elif sample_width == 3:
                pcm_format = alsaaudio.PCM_FORMAT_S24_3LE
            elif sample_width == 4:
                pcm_format = alsaaudio.PCM_FORMAT_S32_LE
            else:
                raise ValueError('Unsupported format')

            # One period holds an eighth of a second worth of frames.
            self.periodsize = f.getframerate() // 8

            self.pcm = alsaaudio.PCM(channels=f.getnchannels(),
                                     rate=f.getframerate(),
                                     format=pcm_format,
                                     periodsize=self.periodsize,
                                     device=device)

            # Store data in list to avoid reopening the file.
            # Start from an empty list so that every chunk is stored exactly
            # once (seeding the list with the first chunk would play it twice).
            self.wav_list = []
            data = f.readframes(self.periodsize)
            while data:
                self.wav_list.append(data)
                data = f.readframes(self.periodsize)

    def play_sound(self):
        """Play the cached sound by writing each stored chunk to the PCM device."""
        for data in self.wav_list:
            self.pcm.write(data)

    def stimulate(self, detection_signal):
        """Trigger a stimulus for positive detections that follow a quiet gap.

        Args:
            detection_signal: iterable of truthy/falsy detection results
                (the output of the Detector.add_datapoints method).
        """
        for sig in detection_signal:
            if not sig:
                continue
            ts = time.time()
            if ts - self.last_detected_ts > self.wait_t:
                self._start_sound_thread()
            # NOTE(review): the timestamp is refreshed on every positive
            # sample, not only when a sound is started, so a stimulus fires
            # only after more than `wait_t` seconds without any detection.
            # The source formatting did not preserve indentation here --
            # confirm this placement.
            self.last_detected_ts = ts

    def _start_sound_thread(self):
        """Start the playback thread unless one is already registered."""
        with self._lock:
            if self._thread is None:
                thread = Thread(target=self._t_sound, daemon=True)
                # Register the thread only once it has started, so a failed
                # start() does not leave a stale entry behind. The worker
                # cannot clear the slot early: it needs this lock to do so.
                thread.start()
                self._thread = thread

    def _t_sound(self):
        """Thread body: push the 'STIM' marker, play the sound, then free the slot."""
        try:
            self.lsl_outlet_markers.push_sample(['STIM'])
            self.play_sound()
        finally:
            # Always release the slot; otherwise one failed playback would
            # block every later stimulus.
            with self._lock:
                self._thread = None

    def test_stimulus(self):
        """Play the stimulus immediately (called by the 'Test stimulus' button)."""
        self._start_sound_thread()