Spaces:
Build error
Build error
| # from EDFlib.edfwriter import EDFwriter | |
| from pyedflib import highlevel | |
| from portilooplot.jupyter_plot import ProgressPlot | |
| from pathlib import Path | |
| import numpy as np | |
| import csv | |
| import time | |
| import os | |
| import warnings | |
# <home>/workspace/edf_recording (presumably where EDF files are stored -- confirm with callers)
EDF_PATH = Path.home().joinpath('workspace', 'edf_recording')
# Path to the recordings
RECORDING_PATH = Path.home().joinpath('portiloop-software', 'portiloop', 'recordings')
class DummyAlsaMixer:
    """In-memory volume holder exposing ``getvolume`` / ``setvolume``.

    NOTE(review): presumably a stand-in for a real ALSA mixer object when no
    audio hardware is available -- confirm against the callers.
    """

    def __init__(self):
        # Initial volume level.
        self.volume = 50

    def getvolume(self):
        """Return the stored volume wrapped in a one-element list."""
        current = self.volume
        return [current]

    def setvolume(self, volume):
        """Replace the stored volume with ``volume``."""
        self.volume = volume
| # class EDFRecorder: | |
| # def __init__(self, signal_labels, filename, frequency): | |
| # self.filename = filename | |
| # self.nb_signals = len(signal_labels) | |
| # self.samples_per_datarecord_array = frequency | |
| # self.physical_max = 5000000 | |
| # self.physical_min = -5000000 | |
| # self.signal_labels = signal_labels | |
| # self.edf_buffer = [] | |
| # def open_recording_file(self): | |
| # nb_signals = self.nb_signals | |
| # samples_per_datarecord_array = self.samples_per_datarecord_array | |
| # physical_max = self.physical_max | |
| # physical_min = self.physical_min | |
| # signal_labels = self.signal_labels | |
| # print(f"Will store edf recording in {self.filename}") | |
| # self.edf_writer = EDFwriter(p_path=str(self.filename), | |
| # f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS, | |
| # number_of_signals=nb_signals) | |
| # for signal in range(nb_signals): | |
| # assert self.edf_writer.setSampleFrequency(signal, samples_per_datarecord_array) == 0 | |
| # assert self.edf_writer.setPhysicalMaximum(signal, physical_max) == 0 | |
| # assert self.edf_writer.setPhysicalMinimum(signal, physical_min) == 0 | |
| # assert self.edf_writer.setDigitalMaximum(signal, 32767) == 0 | |
| # assert self.edf_writer.setDigitalMinimum(signal, -32768) == 0 | |
| # assert self.edf_writer.setSignalLabel(signal, signal_labels[signal]) == 0 | |
| # assert self.edf_writer.setPhysicalDimension(signal, 'uV') == 0 | |
| # def close_recording_file(self): | |
| # assert self.edf_writer.close() == 0 | |
| # def add_recording_data(self, data): | |
| # self.edf_buffer += data | |
| # if len(self.edf_buffer) >= self.samples_per_datarecord_array: | |
| # datarecord_array = self.edf_buffer[:self.samples_per_datarecord_array] | |
| # self.edf_buffer = self.edf_buffer[self.samples_per_datarecord_array:] | |
| # datarecord_array = np.array(datarecord_array).transpose() | |
| # assert len(datarecord_array) == self.nb_signals, f"len(data)={len(data)}!={self.nb_signals}" | |
| # for d in datarecord_array: | |
| # assert len(d) == self.samples_per_datarecord_array, f"{len(d)}!={self.samples_per_datarecord_array}" | |
| # assert self.edf_writer.writeSamples(d) == 0 | |
class EDFRecorder:
    """Buffer samples into a temporary CSV file and convert it to EDF on close.

    Samples are appended with ``add_recording_data`` and written to a CSV file
    that sits next to the target file (same path, ``.csv`` extension).
    ``close_recording_file`` reads the CSV back, writes the EDF file with
    ``pyedflib.highlevel.write_edf`` and deletes the CSV.

    Args:
        signal_labels: one label per recorded channel.
        filename: path of the EDF file to produce (``str`` or path-like).
        frequency: sample frequency passed to every signal header.
    """

    def __init__(self, signal_labels, filename, frequency):
        self.writing_buffer = []
        # Number of buffered points that triggers a write to the CSV file.
        self.max_write = 1000
        self.filename = filename
        # Replace only the final extension. Splitting on the first '.' would
        # truncate paths such as './rec.edf' or '/data/run.v2/rec.edf'.
        self.csv_filename = os.path.splitext(str(filename))[0] + '.csv'
        self.signal_labels = signal_labels
        self.frequency = frequency

    def open_recording_file(self):
        """Open (and truncate) the intermediate CSV file for writing."""
        self.file = open(self.csv_filename, 'w')

    def _flush_writing_buffer(self):
        """Write every buffered point as one CSV line and empty the buffer."""
        if self.writing_buffer:
            self.file.writelines(
                ','.join(str(elt) for elt in point) + '\n'
                for point in self.writing_buffer
            )
        self.writing_buffer = []

    def close_recording_file(self):
        """Finish the recording: flush, convert the CSV to EDF, remove the CSV.

        Raises:
            AssertionError: if the number of CSV columns differs from the
                number of signal labels.
        """
        # Points still waiting in the buffer (fewer than ``max_write``) must
        # reach the CSV, otherwise the end of the recording is lost.
        self._flush_writing_buffer()
        self.file.close()

        data = np.genfromtxt(self.csv_filename, delimiter=',')
        # Convert to float values; after the transpose each row is one channel.
        data = data.astype(np.float32).transpose()
        assert data.shape[0] == len(self.signal_labels), f"{data.shape[0]}!={len(self.signal_labels)}"

        signal_headers = []
        for row_i, channel in enumerate(data):
            # If we only have zeros in that row, the channel was not activated
            # so we must set the physical max and min manually.
            if np.all(channel == 0):
                phys_max = 200
                phys_min = -200
            else:
                phys_max = np.amax(channel)
                phys_min = np.amin(channel)

            # Create the signal header
            signal_headers.append(highlevel.make_signal_header(
                self.signal_labels[row_i],
                sample_frequency=self.frequency,
                physical_max=phys_max,
                physical_min=phys_min,))

        self.filename = str(self.filename)
        print(f"Saving to {self.filename}")
        # Warnings raised while writing the EDF file are silenced.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            highlevel.write_edf(str(self.filename), data, signal_headers)

        # The CSV is removed only after the EDF has been written.
        os.remove(self.csv_filename)

    def add_recording_data(self, data):
        """Append ``data`` (a list of points) to the buffer.

        Each point is an iterable of values and becomes one CSV line. The
        buffer is written out once it holds at least ``max_write`` points.
        """
        self.writing_buffer += data
        # write to file
        if len(self.writing_buffer) >= self.max_write:
            self._flush_writing_buffer()
class LiveDisplay:
    """Push incoming samples to a ``ProgressPlot`` (or to matplotlib).

    Args:
        channel_names: names of the plotted channels, forwarded to
            ``ProgressPlot`` as ``plot_names``.
        window_len: forwarded to ``ProgressPlot`` as ``max_window_len``.
    """

    def __init__(self, channel_names, window_len=100):
        self.datapoint_dim = len(channel_names)
        self.history = []
        self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)
        # When True, add_datapoints plots with matplotlib instead of self.pp.
        self.matplotlib = False

    def add_datapoints(self, datapoints):
        """
        Adds a batch of datapoints to the plot

        Args:
            datapoints: iterable of datapoints, each datapoint being an
                iterable with one value per channel
        """
        if not self.matplotlib:
            # Every value is wrapped in its own single-element list.
            frames = [[[value] for value in sample] for sample in datapoints]
            self.pp.update_with_datapoints(frames)
            return

        import matplotlib.pyplot as plt

        for sample in datapoints:
            wrapped = [[value] for value in sample]
            # Only the value at index 1 of each datapoint is kept in history.
            self.history += wrapped[1]

        # Plot and reset once exactly 1000 values have been collected.
        if len(self.history) == 1000:
            plt.plot(self.history)
            plt.show()
            self.history = []

    def add_datapoint(self, datapoint):
        """Send one datapoint to the plot, each value wrapped in its own list."""
        self.pp.update([[value] for value in datapoint])
class FileReader:
    """Replay rows of a CSV file one at a time, at most one row per 1/250 s.

    Args:
        filename: path of the comma-separated file to read. Each row needs at
            least four columns: two numeric ones followed by two flags that
            are true when the column equals the string ``'1'``.
    """

    def __init__(self, filename):
        # Keep the handle on the instance so it can be closed later;
        # newline='' is what the csv module expects for files it reads.
        self.file = open(filename, 'r', newline='')
        # Open a csv file
        print(f"Reading from file {filename}")
        self.csv_reader = csv.reader(self.file, delimiter=',')
        # Minimum delay between two returned points, in seconds.
        self.wait_time = 1/250.0
        self.index = -1
        self.last_time = time.time()

    def close(self):
        """Close the underlying file; calling it more than once is harmless."""
        self.file.close()

    def get_point(self):
        """
        Returns the next point in the file

        Returns:
            ``(index, float(col0), float(col1), col2 == '1', col3 == '1')``
            for the next row, where ``index`` counts rows from 0, or ``None``
            once the file is exhausted.
        """
        try:
            point = next(self.csv_reader)
        except StopIteration:
            # End of file: release the handle and make later calls keep
            # returning None instead of reading from a closed file.
            self.close()
            self.csv_reader = iter(())
            return None

        self.index += 1
        # Spin until wait_time has elapsed since the previous point.
        # NOTE(review): this is a busy-wait, kept as in the original,
        # presumably for timing precision -- confirm before replacing it
        # with time.sleep.
        while time.time() - self.last_time < self.wait_time:
            continue
        self.last_time = time.time()
        return self.index, float(point[0]), float(point[1]), point[2] == '1', point[3] == '1'