|
|
from sklearn.linear_model import LogisticRegression |
|
|
from sklearn.multiclass import OneVsRestClassifier |
|
|
from models.base_model import ClassificationModel |
|
|
import pickle |
|
|
from tqdm import tqdm |
|
|
import numpy as np |
|
|
from sklearn.ensemble import RandomForestClassifier |
|
|
import pywt |
|
|
import scipy.stats |
|
|
import multiprocessing |
|
|
from collections import Counter |
|
|
from keras.layers import Dropout, Dense, Input |
|
|
from keras.models import Model |
|
|
from keras.models import load_model |
|
|
from keras.callbacks import ModelCheckpoint |
|
|
from sklearn.preprocessing import StandardScaler |
|
|
|
|
|
|
|
|
def calculate_entropy(list_values): |
|
|
counter_values = Counter(list_values).most_common() |
|
|
probabilities = [elem[1] / len(list_values) for elem in counter_values] |
|
|
entropy = scipy.stats.entropy(probabilities) |
|
|
return entropy |
|
|
|
|
|
|
|
|
def calculate_statistics(list_values): |
|
|
n5 = np.nanpercentile(list_values, 5) |
|
|
n25 = np.nanpercentile(list_values, 25) |
|
|
n75 = np.nanpercentile(list_values, 75) |
|
|
n95 = np.nanpercentile(list_values, 95) |
|
|
median = np.nanpercentile(list_values, 50) |
|
|
mean = np.nanmean(list_values) |
|
|
std = np.nanstd(list_values) |
|
|
var = np.nanvar(list_values) |
|
|
rms = np.nanmean(np.sqrt(list_values ** 2)) |
|
|
return [n5, n25, n75, n95, median, mean, std, var, rms] |
|
|
|
|
|
|
|
|
def calculate_crossings(list_values): |
|
|
zero_crossing_indices = np.nonzero(np.diff(np.array(list_values) > 0))[0] |
|
|
no_zero_crossings = len(zero_crossing_indices) |
|
|
mean_crossing_indices = np.nonzero(np.diff(np.array(list_values) > np.nanmean(list_values)))[0] |
|
|
no_mean_crossings = len(mean_crossing_indices) |
|
|
return [no_zero_crossings, no_mean_crossings] |
|
|
|
|
|
|
|
|
def get_features(list_values): |
|
|
entropy = calculate_entropy(list_values) |
|
|
crossings = calculate_crossings(list_values) |
|
|
statistics = calculate_statistics(list_values) |
|
|
return [entropy] + crossings + statistics |
|
|
|
|
|
|
|
|
def get_single_ecg_features(signal, waveletname='db6'): |
|
|
features = [] |
|
|
for channel in signal.T: |
|
|
list_coeff = pywt.wavedec(channel, wavelet=waveletname, level=5) |
|
|
channel_features = [] |
|
|
for coeff in list_coeff: |
|
|
channel_features += get_features(coeff) |
|
|
features.append(channel_features) |
|
|
return np.array(features).flatten() |
|
|
|
|
|
|
|
|
def get_ecg_features(ecg_data, parallel=True): |
|
|
if parallel: |
|
|
pool = multiprocessing.Pool(18) |
|
|
return np.array(pool.map(get_single_ecg_features, ecg_data)) |
|
|
else: |
|
|
list_features = [] |
|
|
for signal in tqdm(ecg_data): |
|
|
features = get_single_ecg_features(signal) |
|
|
list_features.append(features) |
|
|
return np.array(list_features) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WaveletModel(ClassificationModel): |
|
|
def __init__(self, name, n_classes, freq, outputfolder, input_shape, regularizer_C=.001, classifier='RF'): |
|
|
|
|
|
|
|
|
super().__init__() |
|
|
self.name = name |
|
|
self.outputfolder = outputfolder |
|
|
self.n_classes = n_classes |
|
|
self.freq = freq |
|
|
self.regularizer_C = regularizer_C |
|
|
self.classifier = classifier |
|
|
self.dropout = .25 |
|
|
self.activation = 'relu' |
|
|
self.final_activation = 'sigmoid' |
|
|
self.n_dense_dim = 128 |
|
|
self.epochs = 30 |
|
|
|
|
|
def fit(self, X_train, y_train, X_val, y_val): |
|
|
XF_train = get_ecg_features(X_train) |
|
|
XF_val = get_ecg_features(X_val) |
|
|
|
|
|
if self.classifier == 'LR': |
|
|
if self.n_classes > 1: |
|
|
clf = OneVsRestClassifier( |
|
|
LogisticRegression(C=self.regularizer_C, solver='lbfgs', max_iter=1000, n_jobs=-1)) |
|
|
else: |
|
|
clf = LogisticRegression(C=self.regularizer_C, solver='lbfgs', max_iter=1000, n_jobs=-1) |
|
|
clf.fit(XF_train, y_train) |
|
|
pickle.dump(clf, open(self.outputfolder + 'clf.pkl', 'wb')) |
|
|
elif self.classifier == 'RF': |
|
|
clf = RandomForestClassifier(n_estimators=1000, n_jobs=16) |
|
|
clf.fit(XF_train, y_train) |
|
|
pickle.dump(clf, open(self.outputfolder + 'clf.pkl', 'wb')) |
|
|
elif self.classifier == 'NN': |
|
|
|
|
|
ss = StandardScaler() |
|
|
XFT_train = ss.fit_transform(XF_train) |
|
|
XFT_val = ss.transform(XF_val) |
|
|
pickle.dump(ss, open(self.outputfolder + 'ss.pkl', 'wb')) |
|
|
|
|
|
input_x = Input(shape=(XFT_train.shape[1],)) |
|
|
x = Dense(self.n_dense_dim, activation=self.activation)(input_x) |
|
|
x = Dropout(self.dropout)(x) |
|
|
y = Dense(self.n_classes, activation=self.final_activation)(x) |
|
|
self.model = Model(input_x, y) |
|
|
|
|
|
self.model.compile(optimizer='adamax', loss='binary_crossentropy') |
|
|
|
|
|
mc_loss = ModelCheckpoint(self.outputfolder + 'best_loss_model.h5', monitor='val_loss', mode='min', |
|
|
verbose=1, save_best_only=True) |
|
|
|
|
|
self.model.fit(XFT_train, y_train, validation_data=(XFT_val, y_val), epochs=self.epochs, batch_size=128, |
|
|
callbacks=[mc_loss]) |
|
|
self.model.save(self.outputfolder + 'last_model.h5') |
|
|
|
|
|
def predict(self, X): |
|
|
XF = get_ecg_features(X) |
|
|
if self.classifier == 'LR': |
|
|
clf = pickle.load(open(self.outputfolder + 'clf.pkl', 'rb')) |
|
|
if self.n_classes > 1: |
|
|
return clf.predict_proba(XF) |
|
|
else: |
|
|
return clf.predict_proba(XF)[:, 1][:, np.newaxis] |
|
|
elif self.classifier == 'RF': |
|
|
clf = pickle.load(open(self.outputfolder + 'clf.pkl', 'rb')) |
|
|
y_pred = clf.predict_proba(XF) |
|
|
if self.n_classes > 1: |
|
|
return np.array([yi[:, 1] for yi in y_pred]).T |
|
|
else: |
|
|
return y_pred[:, 1][:, np.newaxis] |
|
|
elif self.classifier == 'NN': |
|
|
ss = pickle.load(open(self.outputfolder + 'ss.pkl', 'rb')) |
|
|
XFT = ss.transform(XF) |
|
|
model = load_model( |
|
|
self.outputfolder + 'best_loss_model.h5') |
|
|
|
|
|
|
|
|
return model.predict(XFT) |