Spaces:
Build error
Build error
| from EDFlib.edfwriter import EDFwriter | |
| from portilooplot.jupyter_plot import ProgressPlot | |
| from pathlib import Path | |
| import numpy as np | |
# Directory (under the current user's home) used as the base location for
# EDF recording files.
EDF_PATH = Path.home().joinpath('workspace', 'edf_recording')
class DummyAlsaMixer:
    """In-memory volume holder that touches no audio device.

    NOTE(review): judging by the name, this presumably stands in for an ALSA
    mixer object when no sound hardware is available -- confirm with callers.
    """

    def __init__(self):
        # Level reported by getvolume() until setvolume() is called.
        self.volume = 50

    def getvolume(self):
        """Return the stored volume wrapped in a one-element list."""
        current = self.volume
        return [current]

    def setvolume(self, volume):
        """Remember ``volume`` as the new level; the value is not validated."""
        self.volume = volume
class EDFRecorder:
    """Buffer multi-channel samples and write them to an EDF+ file.

    Incoming samples are accumulated in ``edf_buffer`` and handed to an
    ``EDFwriter`` one datarecord at a time. A datarecord holds
    ``samples_per_datarecord_array`` samples for each of the ``nb_signals``
    signals.
    """

    def __init__(self, signal_labels, filename=None, frequency=None):
        """Store the recording parameters; no file is opened here.

        Args:
            signal_labels: Sequence of labels, indexed ``0..nb_signals-1``
                when the file is opened.
            filename: Destination of the recording. Defaults to
                ``EDF_PATH / 'recording.edf'``.
            frequency: Sample frequency given to the writer for every signal;
                it is also the number of samples per datarecord, so it must
                be a positive integer. When omitted, a ``frequency``
                attribute already present on the instance (for example one
                defined by a subclass) is used.

        Raises:
            ValueError: If no frequency is available or it is not positive.
        """
        if frequency is None:
            # Keeps working for subclasses that define ``frequency`` themselves.
            frequency = getattr(self, 'frequency', None)
        if frequency is None:
            raise ValueError("EDFRecorder needs a sampling frequency")
        if frequency <= 0:
            raise ValueError(f"frequency must be positive, got {frequency}")

        if filename is None:
            filename = EDF_PATH / 'recording.edf'

        self.frequency = frequency
        self.filename = filename
        self.nb_signals = 8
        self.samples_per_datarecord_array = frequency
        self.physical_max = 5
        self.physical_min = -5
        self.signal_labels = signal_labels
        # Samples received but not yet written as a complete datarecord.
        self.edf_buffer = []

    @staticmethod
    def _check(status, action):
        """Raise RuntimeError when an EDFwriter call reports a non-zero status.

        An explicit check is used instead of ``assert`` because assert
        statements (and the writer calls inside them) are removed entirely
        when Python runs with ``-O``.
        """
        if status != 0:
            raise RuntimeError(f"EDFwriter could not {action} (status {status})")

    def open_recording_file(self):
        """Create the EDF+ writer and configure every signal.

        Each signal gets the sample frequency, the physical range
        ``[physical_min, physical_max]`` in 'V', the digital range
        ``[-32768, 32767]`` and its label from ``signal_labels``.

        Raises:
            RuntimeError: If the writer rejects one of the settings.
        """
        print(f"Will store edf recording in {self.filename}")
        writer = EDFwriter(p_path=str(self.filename),
                           f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS,
                           number_of_signals=self.nb_signals)
        self.edf_writer = writer

        check = self._check
        for signal in range(self.nb_signals):
            check(writer.setSampleFrequency(signal, self.samples_per_datarecord_array),
                  f"set the sample frequency of signal {signal}")
            check(writer.setPhysicalMaximum(signal, self.physical_max),
                  f"set the physical maximum of signal {signal}")
            check(writer.setPhysicalMinimum(signal, self.physical_min),
                  f"set the physical minimum of signal {signal}")
            check(writer.setDigitalMaximum(signal, 32767),
                  f"set the digital maximum of signal {signal}")
            check(writer.setDigitalMinimum(signal, -32768),
                  f"set the digital minimum of signal {signal}")
            check(writer.setSignalLabel(signal, self.signal_labels[signal]),
                  f"set the label of signal {signal}")
            check(writer.setPhysicalDimension(signal, 'V'),
                  f"set the physical dimension of signal {signal}")

    def close_recording_file(self):
        """Close the EDF writer.

        Samples still waiting in ``edf_buffer`` (less than one datarecord)
        are not written.

        Raises:
            RuntimeError: If the writer reports a failure on close.
        """
        self._check(self.edf_writer.close(), "close the recording file")

    def add_recording_data(self, data):
        """Queue samples and write every complete datarecord to the file.

        Args:
            data: Iterable of samples, each sample holding one value per
                signal (``nb_signals`` values).

        Raises:
            ValueError: If a datarecord does not have ``nb_signals`` rows of
                ``samples_per_datarecord_array`` values once transposed.
            RuntimeError: If the writer fails to store the samples.
        """
        record_len = self.samples_per_datarecord_array
        if record_len <= 0:
            # A non-positive length would make the flush loop below endless.
            raise ValueError(f"samples_per_datarecord_array must be positive, got {record_len}")

        self.edf_buffer.extend(data)

        # Flush all complete datarecords, not just the first one, so that a
        # large batch does not leave a growing backlog in the buffer.
        while len(self.edf_buffer) >= record_len:
            chunk = self.edf_buffer[:record_len]
            self.edf_buffer = self.edf_buffer[record_len:]

            # (samples, signals) -> (signals, samples): one row per signal.
            per_signal = np.array(chunk).transpose()
            if len(per_signal) != self.nb_signals:
                raise ValueError(
                    f"len(datarecord_array)={len(per_signal)}!={self.nb_signals}")
            for samples in per_signal:
                if len(samples) != record_len:
                    raise ValueError(f"{len(samples)}!={record_len}")
                self._check(self.edf_writer.writeSamples(samples), "write samples")
class LiveDisplay:
    """Forward incoming datapoints to a live ``ProgressPlot``.

    When the ``matplotlib`` flag is set (it is False after construction), the
    datapoints are instead collected in ``history`` and drawn with
    matplotlib once enough of them have been gathered.
    """

    # Number of collected history values that triggers a matplotlib plot.
    _HISTORY_PLOT_LEN = 1000

    def __init__(self, channel_names, window_len=100):
        """Create the plot.

        Args:
            channel_names: Names passed to ``ProgressPlot`` as ``plot_names``;
                their count is stored in ``datapoint_dim``.
            window_len: Passed to ``ProgressPlot`` as ``max_window_len``.
        """
        self.datapoint_dim = len(channel_names)
        self.history = []
        self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)
        self.matplotlib = False

    def add_datapoints(self, datapoints):
        """Add a batch of datapoints to the display.

        Args:
            datapoints: Iterable of datapoints, each an iterable of values.
                NOTE(review): presumably one value per channel (8 channels)
                -- confirm with the caller.
        """
        if self.matplotlib:
            import matplotlib.pyplot as plt

        disp_list = []
        for datapoint in datapoints:
            # Every value is wrapped in its own one-element list.
            wrapped = [[elt] for elt in datapoint]
            disp_list.append(wrapped)
            if self.matplotlib:
                # Only the value at index 1 of each datapoint is kept.
                self.history += wrapped[1]

        if not self.matplotlib:
            self.pp.update_with_datapoints(disp_list)
        elif len(self.history) >= self._HISTORY_PLOT_LEN:
            # ">=" rather than "==": a batch can step over the threshold, and
            # an exact match would then never plot nor reset the history.
            plt.plot(self.history)
            plt.show()
            self.history = []

    def add_datapoint(self, datapoint):
        """Add a single datapoint (an iterable of values) to the live plot."""
        self.pp.update([[elt] for elt in datapoint])
class FileReader:
    """Unimplemented placeholder: every method raises ``NotImplementedError``.

    NOTE(review): presumably intended to supply data points read from a
    file -- confirm before implementing.
    """

    def __init__(self, filename):
        """Not implemented.

        Args:
            filename: Accepted but unused.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()

    def get_point(self):
        """Not implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()