Spaces:
Runtime error
Runtime error
| import numpy as np | |
| from scipy.signal import resample, butter, filtfilt | |
| from baseline_wander_removal import bw_remover | |
| import pywt | |
| import matplotlib.pyplot as plt | |
| def normalize(sig, val=2): | |
| return val*((sig-np.min(sig))/(np.max(sig)-np.min(sig))) | |
| def butter_lowpass_filter(data, cutoff, fs, order): | |
| nyq = 0.5 * fs | |
| normal_cutoff = cutoff / nyq | |
| # Get the filter coefficients | |
| b, a = butter(order, normal_cutoff, btype='low', analog=False) | |
| y = filtfilt(b, a, data) | |
| return y | |
| def visualize_sig(sig, filename="test.png"): | |
| fig, ax = plt.subplots(3, 1, figsize=(10, 10)) | |
| ax[0].plot(sig[0]) | |
| ax[1].plot(sig[1]) | |
| ax[2].plot(sig[2]) | |
| # plt.show() | |
| plt.savefig(filename) | |
| def preprocess_one_chunk(chunk, lvl=2): | |
| chunk = resample(chunk, 1000) | |
| chunk = pywt.wavedec(chunk, 'db6', level=lvl)[0] | |
| # print(x.shape) | |
| # x = pad(x) | |
| chunk = normalize(chunk) | |
| return chunk | |
| def prepare_all_leads(path, butter_filter=False): | |
| if path.endswith(".txt"): | |
| signal = np.loadtxt(path, delimiter=',', unpack=True) | |
| elif path.endswith(".npy"): | |
| sig = np.load(path, allow_pickle=True) | |
| x = pywt.wavedec(sig[0], 'db6', level=2)[0] | |
| y = pywt.wavedec(sig[1], 'db6', level=2)[0] | |
| z = pywt.wavedec(sig[2], 'db6', level=2)[0] | |
| return x[None, :], y[None, :], z[None, :] | |
| freq = signal.shape[1] // 60 | |
| print(freq) | |
| if freq < 250: | |
| lvl = 1 | |
| else: | |
| lvl = 2 | |
| sig = [signal[0], signal[1], signal[2]] | |
| sig[0] = bw_remover(freq, sig[0]) | |
| sig[1] = bw_remover(freq, sig[1]) | |
| sig[2] = bw_remover(freq, sig[2]) | |
| if butter_filter: | |
| cutoff = 20 # desired cutoff frequency of the filter, Hz , slightly higher than actual 1.2 Hz | |
| order = 1 # sin wave can be approx represented as quadratic | |
| sig[0] = butter_lowpass_filter(sig[0], cutoff, freq, order) | |
| sig[1] = butter_lowpass_filter(sig[1], cutoff, freq, order) | |
| sig[2] = butter_lowpass_filter(sig[2], cutoff, freq, order) | |
| sig_length = freq*2 | |
| total_samples = sig[0].shape[0] // 1000 | |
| lead_1 = [] | |
| lead_2 = [] | |
| lead_3 = [] | |
| for i in range(total_samples): | |
| x = sig[0][i*sig_length:(i+1)*sig_length] | |
| y = sig[1][i*sig_length:(i+1)*sig_length] | |
| z = sig[2][i*sig_length:(i+1)*sig_length] | |
| x = preprocess_one_chunk(x, lvl=lvl) | |
| y = preprocess_one_chunk(y, lvl=lvl) | |
| z = preprocess_one_chunk(z, lvl=lvl) | |
| lead_1.append(x) | |
| lead_2.append(y) | |
| lead_3.append(z) | |
| return np.asarray(lead_1), np.asarray(lead_2), np.asarray(lead_3) | |
| def pad(sig): | |
| sig = np.pad(sig, (0, 258-sig.shape[0]), 'constant') | |
| return sig |