# dr-studio / model / code / ECGRecoverDatasetRandomMaskWithRS2V3.py
# NOTE: the following lines are Hugging Face upload-page residue, kept as
# comments so the module parses:
#   wogh2012's picture / Upload Dash Docker Space / 12f8999 verified
from scipy import signal
import numpy as np
import scipy
import inspect
from mai.data.util.misc import LeadType
def adjust_channel_dependency(ecg):
    """Recompute the derived limb leads from leads I and II, in place.

    Row layout assumed by this file: row 0 = I, row 1 = II, row 2 = III,
    row 3 = aVR, row 4 = aVL, row 5 = aVF.  III and the augmented leads are
    linear combinations of I and II, so they are rebuilt here after any
    augmentation that touched I/II.  Returns the same (mutated) array.
    """
    lead_i, lead_ii = ecg[0], ecg[1]
    ecg[2] = lead_ii - lead_i          # III = II - I
    ecg[3] = -(lead_ii + lead_i) / 2   # aVR = -(I + II) / 2
    ecg[4] = lead_i - lead_ii / 2      # aVL = I - II / 2
    ecg[5] = lead_ii - lead_i / 2      # aVF = II - I / 2
    return ecg
def get_mag_range():
    """Return the per-augmentation magnitude lookup table.

    Maps each augmentation class name to a dict keyed by integer intensity
    level (0-10), giving the concrete magnitude handed to that augmentation
    at that level.  Units depend on the augmentation (amplitude, ratio,
    count, window length, degrees, ...).  Consumed by ``aug_mod`` to turn a
    RandAugment-style integer intensity into per-augmentation parameters.
    """
    return {
        "PowerlineNoise": {
            0: 0.0,
            1: 0.0062,
            2: 0.0116,
            3: 0.0168,
            4: 0.0222,
            5: 0.0278,
            6: 0.033,
            7: 0.0381,
            8: 0.0436,
            9: 0.049,
            10: 0.0543,
        },
        "BaselineWander": {
            0: 0.0,
            1: 0.01,
            2: 0.022,
            3: 0.035,
            4: 0.045,
            5: 0.06,
            6: 0.064,
            7: 0.086,
            8: 0.09,
            9: 0.1,
            10: 0.12,
        },
        "BaselineShift": {
            0: 0.0,
            1: 0.0317,
            2: 0.0631,
            3: 0.0966,
            4: 0.1286,
            5: 0.1584,
            6: 0.1924,
            7: 0.2257,
            8: 0.2514,
            9: 0.2869,
            10: 0.32,
        },
        "EMGNoise": {
            0: 0.0,
            1: 0.007,
            2: 0.013,
            3: 0.018,
            4: 0.022,
            5: 0.027,
            6: 0.032,
            7: 0.037,
            8: 0.042,
            9: 0.045,
            10: 0.05,
        },
        "RandomCropResize": {
            0: 1.0,
            1: 1.0,
            2: 1.001,
            3: 1.001,
            4: 1.002,
            5: 1.002,
            6: 1.003,
            7: 1.004,
            8: 1.005,
            9: 1.007,
            10: 1.01,
        },
        "TimeWarp": {
            0: 0.0,
            1: 0.014,
            2: 0.027,
            3: 0.041,
            4: 0.055,
            5: 0.07,
            6: 0.086,
            7: 0.103,
            8: 0.142,
            9: 0.202,
            10: 0.5,
        },
        "DynamicTimeWarp": {
            0: 0.0,
            1: 0.146,
            2: 0.292,
            3: 0.439,
            4: 0.585,
            5: 0.731,
            6: 0.877,
            7: 1.0,
            8: 1.239,
            9: 1.633,
            10: 2.0,
        },
        "GaussianSmoothing": {
            0: 1.0,
            1: 1.5,
            2: 1.9,
            3: 2.681,
            4: 3.526,
            5: 3.75,
            6: 4.0,
            7: 4.786,
            8: 5.573,
            9: 5.786,
            10: 6.0,
        },
        "MagnitudeWarping": {
            0: 0.0,
            1: 0.02,
            2: 0.037,
            3: 0.055,
            4: 0.071,
            5: 0.088,
            6: 0.105,
            7: 0.122,
            8: 0.139,
            9: 0.156,
            10: 0.172,
        },
        "AngleRotation": {
            0: 0.533,
            1: 3.742,
            2: 6.231,
            3: 8.839,
            4: 11.26,
            5: 14.112,
            6: 15.868,
            7: 18.057,
            8: 20.816,
            9: 22.954,
            10: 24.0,
        },
        "TimeMask": {
            0: 0.0,
            1: 0.01,
            2: 0.021,
            3: 0.037,
            4: 0.061,
            5: 0.088,
            6: 0.124,
            7: 0.169,
            8: 0.216,
            9: 0.279,
            10: 0.344,
        },
        "ChannelMask": {
            0: 0.0,
            1: 0.33,
            2: 0.67,
            3: 1.0,
            4: 1.5,
            5: 2.0,
            6: 2.25,
            7: 2.5,
            8: 2.75,
            9: 3.0,
            10: 3.5,
        },
        "RandomSpectrogramMask": {
            0: 0.0,
            1: 0.002,
            2: 0.005,
            3: 0.009,
            4: 0.014,
            5: 0.02,
            6: 0.028,
            7: 0.036,
            8: 0.046,
            9: 0.057,
            10: 0.072,
        },
        "FrequencyWarping": {
            0: 0.0075,
            1: 0.018,
            2: 0.0225,
            3: 0.024,
            4: 0.0285,
            5: 0.03,
            6: 0.0345,
            7: 0.0345,
            8: 0.0375,
            9: 0.0375,
            10: 0.039,
        },
        "LowPassFilter": {
            0: 0.0,
            1: 0.043,
            2: 0.154,
            3: 0.271,
            4: 0.371,
            5: 0.429,
            6: 0.484,
            7: 0.552,
            8: 0.599,
            9: 0.636,
            10: 0.667,
        },
        "PartialWhiteNoise": {
            0: 0.0,
            1: 0.019,
            2: 0.03,
            3: 0.039,
            4: 0.047,
            5: 0.055,
            6: 0.061,
            7: 0.068,
            8: 0.074,
            9: 0.08,
            10: 0.086,
        },
        "PermuteWaveSegment": {
            0: 0.0,
            1: 0.284,
            2: 0.568,
            3: 0.852,
            4: 1.486,
            5: 2.1,
            6: 2.815,
            7: 3.399,
            8: 4.45,
            9: 5.994,
            10: 8.0,
        },
        "ConcatWaveSegment": {
            0: 0.0,
            1: 1,
            2: 1,
            3: 2,
            4: 2,
            5: 3,
            6: 3,
            7: 4,
            8: 4,
            9: 5,
            10: 6,
        },
        "Flip": {
            0: 0.0,
            1: 1.0,
            2: 1.0,
            3: 1.0,
            4: 1.0,
            5: 1.0,
            6: 1.0,
            7: 1.0,
            8: 1.0,
            9: 1.0,
            10: 1.0,
        },
        "RandomSpike": {
            0: 0,
            1: 1,
            2: 2,
            3: 3,
            4: 4,
            5: 5,
            6: 6,
            7: 7,
            8: 8,
            9: 9,
            10: 10,
        },
        "DigitizedLeadName": {
            0: 0,
            1: 1,
            2: 2,
            3: 3,
            4: 4,
            5: 5,
            6: 6,
            7: 7,
            8: 8,
            9: 9,
            10: 10,
        },
    }
def aug_mod(aug_list, params):
    """Instantiate the augmentations named in *aug_list* (RandAugment setup).

    For each augmentation class name: keep only the entries of *params* its
    ``__init__`` accepts, translate the integer ``mag`` level into a concrete
    magnitude range via ``get_mag_range()`` when a table entry exists, and
    build the instance.  Returns the list of ready-to-call augmentations.
    """
    mag_table = get_mag_range()
    instances = []
    for aug_name in aug_list:
        aug_cls = globals()[aug_name]
        accepted = inspect.getfullargspec(aug_cls.__init__).args
        kwargs = {name: value for name, value in params.items() if name in accepted}
        if aug_name in mag_table:
            level_to_mag = mag_table[aug_name]
            kwargs["mag"] = {
                "max": level_to_mag[int(params["mag"]["max"])],
                "min": level_to_mag[max(int(params["mag"]["min"]), 0)],
            }
        instances.append(aug_cls(**kwargs))
    return instances
## PowerlineNoise
class PowerlineNoise(object):
    """Add simulated mains interference (a 50 or 60 Hz cosine) to the signal.

    Args:
        mag: dict with "max"/"min" bounds for the noise amplitude.
        p: probability of applying the augmentation.
        freq: sampling rate of the input signal in Hz.
        dependency: if True, rebuild the derived limb leads afterwards.
    """

    def __init__(self, mag={"max": 0.5, "min": 0}, p=1.0, freq=500, dependency=True):
        self.max_amplitude = mag["max"]
        self.min_amplitude = mag["min"]
        self.freq = freq
        self.p = p
        self.dependency = dependency

    def __call__(self, sample):
        if self.p > np.random.uniform(0, 1):
            # copy() for consistency with the other augmentations, so the
            # caller's array is never aliased.
            new_sample = sample.copy()
            C, T = new_sample.shape
            # Single amplitude shared by all channels (mains noise is common-mode).
            amp = np.random.uniform(self.min_amplitude, self.max_amplitude, size=(1, 1))
            # Mains frequency: 50 Hz or 60 Hz with equal probability.
            f = 50 if np.random.uniform(0, 1) > 0.5 else 60
            noise = self.apply_powerline_noise(T, f)
            new_sample = new_sample + noise * amp
            if self.dependency:
                new_sample = adjust_channel_dependency(new_sample)
        else:
            new_sample = sample
        return new_sample

    def apply_powerline_noise(self, T, f):
        """Return a length-T cosine at f Hz (sampled at self.freq) with random phase."""
        t = np.linspace(0, T - 1, T)
        # BUGFIX: use np.pi instead of the 3.14 approximation so the tone is
        # exactly at the mains frequency and the phase spans the full [0, 2*pi).
        phase = np.random.uniform(0, 2 * np.pi)
        noise = np.cos(2 * np.pi * f * (t / self.freq) + phase)
        return noise
## BaselineWander
class BaselineWander(object):
    """Add low-frequency baseline drift: a sum of k slow random cosines.

    Args:
        mag: dict with "max"/"min" bounds for each cosine's amplitude.
        p: probability of applying the augmentation.
        aug_freq: dict with "max"/"min" drift frequency bounds in Hz.
        k: number of cosines summed into the drift.
        freq: sampling rate of the input signal in Hz.
        dependency: if True, rebuild the derived limb leads afterwards.
    """

    def __init__(
        self,
        mag={"max": 0.5, "min": 0},
        p=1.0,
        aug_freq={"max": 0.2, "min": 0.01},
        k=3,
        freq=500,
        dependency=True,
    ):
        self.min_amplitude = mag["min"]
        self.max_amplitude = mag["max"]
        self.min_freq = aug_freq["min"]
        self.max_freq = aug_freq["max"]
        self.k = k
        self.freq = freq
        self.p = p
        self.dependency = dependency

    def __call__(self, sample):
        if self.p > np.random.uniform(0, 1):
            new_sample = sample.copy()
            C, T = new_sample.shape
            # Per-channel gain around 1 so the drift differs slightly per lead.
            amp_channel = np.random.normal(1, 0.5, size=(C, 1))
            amp_general = np.random.uniform(
                self.min_amplitude, self.max_amplitude, size=self.k
            )
            noise = np.zeros(shape=(1, T))
            for k in range(self.k):
                noise += self.apply_baseline_wander(T) * amp_general[k]
            noise = noise * amp_channel
            new_sample[:, :] = new_sample[:, :] + noise[:, :]
            if self.dependency:
                new_sample = adjust_channel_dependency(new_sample)
        else:
            new_sample = sample
        return new_sample

    def apply_baseline_wander(self, T):
        """Return one length-T cosine with random frequency and phase."""
        f = np.random.uniform(self.min_freq, self.max_freq)
        t = np.linspace(0, T - 1, T)
        # BUGFIX: use np.pi instead of the 3.14 approximation so the drift is
        # exactly at frequency f and the phase spans the full [0, 2*pi).
        r = np.random.uniform(0, 2 * np.pi)
        noise = np.cos(2 * np.pi * f * (t / self.freq) + r)
        return noise
## BaselineShift
class BaselineShift(object):
    """Shift random time segments of the record by a flat offset.

    Each of ``num_segment`` segments (length ~ shift_ratio * T, jittered by a
    normal draw) gets a constant offset; the sign is drawn per channel and
    the magnitude once per call.
    """

    def __init__(
        self,
        mag={"max": 0.5, "min": 0},
        shift_ratio=0.2,
        num_segment=1,
        freq=500,
        dependency=False,
        p=1.0,
    ):
        self.max_amplitude = mag["max"]
        self.min_amplitude = mag["min"]
        self.shift_ratio = shift_ratio
        self.num_segment = num_segment
        self.freq = freq
        self.p = p
        self.dependency = dependency

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        shifted = sample.copy()
        C, T = shifted.shape
        shift_length = T * self.shift_ratio
        # Per-channel sign, single magnitude shared by every channel.
        sign_per_channel = np.random.choice([1, -1], size=(C, 1))
        magnitude = np.random.uniform(
            self.min_amplitude, self.max_amplitude, size=(1, 1)
        )
        offset = sign_per_channel * magnitude
        segment_mask = np.zeros(shape=(C, T))
        for _ in range(self.num_segment):
            seg_len = np.random.normal(shift_length, shift_length * 0.2)
            start = int(np.random.uniform(0, T - seg_len))
            stop = int(start + seg_len)
            segment_mask[:, start:stop] = 1
        shifted = shifted + segment_mask * offset
        if self.dependency:
            shifted = adjust_channel_dependency(shifted)
        return shifted
# EMGNoise
class EMGNoise(object):
    """Add white Gaussian noise (muscle-artifact model) with a per-channel gain."""

    def __init__(
        self,
        mag={"max": 0.5, "min": 0},
        dependency=True,
        p=1.0,
    ):
        self.max_amplitude = mag["max"]
        self.min_amplitude = mag["min"]
        self.p = p
        self.dependency = dependency

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        noisy = sample.copy()
        C, T = noisy.shape
        # Independent amplitude per channel: EMG artifacts are not common-mode.
        per_channel_amp = np.random.uniform(
            self.min_amplitude, self.max_amplitude, size=(C, 1)
        )
        gaussian = np.random.normal(0, 1, [C, T])
        noisy = noisy + gaussian * per_channel_amp
        if self.dependency:
            noisy = adjust_channel_dependency(noisy)
        return noisy
# RandomCropResize
class RandomCropResize(object):
    """Crop a random window and resample it back to the original length.

    The window length is T * rate with rate drawn between the configured
    bounds (inverted with 50% probability), which stretches or squeezes the
    signal in time.  Any error falls back to returning the input untouched.
    """

    def __init__(
        self,
        mag={"max": 1.2, "min": 1.1},
        dependency=False,
        p=1.0,
    ):
        self.min_rate = mag["min"]
        self.max_rate = mag["max"]
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        try:
            cropped = sample.copy()
            C, T = cropped.shape
            rate = np.random.uniform(self.min_rate, self.max_rate)
            if np.random.uniform(0, 1) > 0.5:
                rate = 1 / rate
            if self.max_rate > 1:
                # Tile the record so a window longer than T can be cut out.
                cropped = np.concatenate([cropped] * 3, 1)
            window = int(T * rate)
            start = np.random.randint(cropped.shape[1] - window)
            cropped = cropped[:, start : start + window]
            cropped = signal.resample(cropped, T, axis=1)
            if self.dependency:
                cropped = adjust_channel_dependency(cropped)
            return cropped
        except Exception:
            # Best-effort augmentation: never fail the pipeline.
            return sample
# TimeWarp
class TimeWarp(object):
    """Warp the whole time axis with a smooth random monotone mapping.

    A random walk is turned into a strictly positive density; its CDF,
    normalized onto [0, T-1], gives the new time indices.  This locally
    speeds up or slows down the signal while keeping its length.
    """

    def __init__(
        self,
        mag={"max": 20, "min": 0},
        dependency=False,
        p=1.0,
    ):
        # epsilon shrinks as the magnitude grows, so larger magnitudes make
        # the density (and hence the warp) less uniform.
        self.epsilon_max = 10 / np.max([mag["max"], 0.001])
        self.epsilon_min = 10 / np.max([mag["min"], 0.001])
        self.scale_max = mag["max"]
        self.scale_min = mag["min"]
        self.iteration = 1
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        warped = sample.copy()
        C, T = warped.shape
        for _ in range(self.iteration):
            epsilon = np.random.uniform(self.epsilon_min, self.epsilon_max)
            scale = np.random.uniform(self.scale_min, self.scale_max)
            density = np.random.normal(loc=0, scale=scale, size=T)
            density = np.cumsum(density)                   # random walk
            density = density - np.min(density) + epsilon  # strictly positive
            cdf = np.cumsum(density)                       # monotone increasing
            # Normalize the CDF onto [0, T-1] to obtain the new indices.
            t_new = (cdf - cdf[0]) / (cdf[-1] - cdf[0]) * (len(cdf) - 1)
            warped = warped[:, t_new.astype(int)]
        if self.dependency:
            warped = adjust_channel_dependency(warped)
        return warped
# DynamicTimeWarp
class DynamicTimeWarp(object):
    """Warp several local time windows with smooth random monotone maps.

    The record is split into ``num_of_warps`` sections; inside each section a
    window of ``radius`` samples receives new time indices derived from the
    CDF of a positive random-walk density (same construction as ``TimeWarp``
    but applied locally).
    """

    def __init__(
        self,
        mag={"max": 3, "min": 3},
        freq=500,
        scale=1,
        epsilon=10,
        dependency=True,
        p=1.0,
    ):
        # mag gives the min/max number of local warp windows.
        self.num_of_warps_min = mag["min"]
        self.num_of_warps_max = mag["max"]
        # The sampling rate doubles as the warp window size, in samples.
        self.radius = freq
        # NOTE(review): the scale/epsilon constructor arguments are ignored;
        # both are hard-coded to 0.1 — confirm this is intentional.
        self.scale = 0.1
        self.epsilon = 0.1
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if self.p > np.random.uniform(0, 1):
            new_sample = sample.copy()
            C, T = new_sample.shape
            radius = self.radius
            t_new = np.arange(T)
            num_of_warps = np.random.randint(
                self.num_of_warps_min, self.num_of_warps_max + 1
            )
            for i in range(num_of_warps):
                section_length = T // num_of_warps
                # Random anchor point inside this section.
                point = np.random.randint(section_length * i, section_length * (i + 1))
                # Clamp the warp window so it stays inside the section.
                warp_from = (
                    point - radius // 2
                    if point - radius // 2 > section_length * i
                    else section_length * i
                )
                warp_from = (
                    warp_from
                    if point + radius // 2 < section_length * (i + 1)
                    else section_length * (i + 1) - radius - 1
                )
                warp_to = warp_from + radius
                pmf = np.random.normal(loc=0, scale=self.scale, size=radius)
                pmf = np.cumsum(pmf)  # random walk
                pmf = pmf - np.min(pmf) + self.epsilon  # make it positive
                cdf = np.cumsum(pmf)  # by definition monotonically increasing
                # Remap the window's indices via the normalized CDF.
                t_new[warp_from:warp_to] = warp_from + (cdf - cdf[0]) / (
                    cdf[-1] - cdf[0]
                ) * (len(cdf) - 1)
            new_sample = new_sample[:, t_new.astype(int)]
            if self.dependency:
                new_sample = adjust_channel_dependency(new_sample)
        else:
            new_sample = sample
        return new_sample
# GaussianSmoothing
class GaussianSmoothing(object):
    """Low-pass the signal by convolving each channel with a Gaussian window.

    The window length is drawn uniformly between mag["min"] and mag["max"]
    (inclusive); its std is half the window length.
    """

    def __init__(
        self,
        mag={"max": 15, "min": 3},
        p=1.0,
        dependency=False,
    ):
        self.min_window_length = mag["min"]
        self.max_window_length = mag["max"]
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        smoothed = sample.copy()
        C, T = smoothed.shape
        window_length = np.random.randint(
            self.min_window_length, self.max_window_length + 1
        )
        smoothed = self.apply_gaussian_smoothing(smoothed, window_length)
        if self.dependency:
            smoothed = adjust_channel_dependency(smoothed)
        return smoothed

    def apply_gaussian_smoothing(self, wave, window_length):
        """Convolve every channel of *wave* with a normalized Gaussian kernel."""
        C, T = wave.shape
        kernel = scipy.signal.windows.gaussian(window_length, std=window_length // 2)
        kernel = kernel / kernel.sum()
        return np.stack([np.convolve(kernel, wave[ch], "same") for ch in range(C)])
# MagnitudeWarping
class MagnitudeWarping(object):
    """Multiply every sample point by Gaussian noise centered at 1."""

    def __init__(self, mag={"max": 0.5, "min": 0.1}, dependency=True, p=1.0):
        self.std_max = mag["max"]
        self.std_min = mag["min"]
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        warped = sample.copy()
        # One std per call; then an independent gain per channel and sample.
        std = np.random.uniform(self.std_min, self.std_max)
        C, T = warped.shape
        gain = np.random.normal(1, std, size=(C, T))
        warped = warped * gain
        if self.dependency:
            warped = adjust_channel_dependency(warped)
        return warped
# AngleRotation
class AngleRotation(object):
    """Rotate the heart's electrical axis in vectorcardiogram (VCG) space.

    The 8 independent leads (rows [6..11, 0, 1] of the 12-lead array —
    presumably V1-V6 plus I, II; confirm the lead ordering) are projected to
    3D VCG space with ``trans_inv``, rotated by random angles about x/y/z,
    and projected back with the pseudo-inverse ``trans``.  ``method``
    selects which inverse-Dower-style transform matrix is used.
    """

    def __init__(
        self, mag={"max": 45, "min": 0}, method="dower", dependency=True, p=1.0
    ):
        assert method in ["dower", "plsv", "qlsv", "kors"]
        # mag gives the min/max rotation angle in degrees.
        self.min_rotation = mag["min"]
        self.max_rotation = mag["max"]
        # self.max_rotation = mag
        self.dependency = dependency
        self.p = p
        # 3x8 matrix mapping the 8 independent leads to VCG x/y/z.
        if method == "dower":
            self.trans_inv = np.array(
                [
                    [-0.172, -0.073, 0.122, 0.231, 0.239, 0.193, 0.156, -0.009],
                    [0.057, -0.019, -0.106, -0.022, 0.040, 0.048, -0.227, 0.886],
                    [-0.228, -0.310, -0.245, -0.063, 0.054, 0.108, 0.021, 0.102],
                ]
            )
        elif method == "plsv":
            self.trans_inv = np.array(
                [
                    [-0.266, 0.027, 0.065, 0.131, 0.203, 0.220, 0.370, -0.154],
                    [0.088, -0.088, 0.003, 0.042, 0.047, 0.067, -0.131, 0.717],
                    [-0.319, -0.198, -0.167, -0.099, -0.009, 0.060, 0.184, -0.114],
                ]
            )
        elif method == "qlsv":
            self.trans_inv = np.array(
                [
                    [-0.147, -0.058, 0.037, 0.139, 0.232, 0.226, 0.199, -0.018],
                    [0.023, -0.085, -0.003, 0.033, 0.060, 0.104, -0.146, 0.503],
                    [-0.184, -0.163, -0.190, -0.119, -0.023, 0.043, 0.085, -0.130],
                ]
            )
        elif method == "kors":
            self.trans_inv = np.array(
                [
                    [-0.130, 0.050, -0.010, 0.140, 0.060, 0.540, 0.380, -0.070],
                    [0.060, -0.020, -0.050, 0.060, -0.170, 0.130, -0.070, 0.930],
                    [-0.430, -0.060, -0.140, -0.200, -0.110, 0.310, 0.110, -0.230],
                ]
            )
        # 8x3 pseudo-inverse: VCG space back to the 8 leads.
        self.trans = np.linalg.pinv(self.trans_inv)

    def __call__(self, sample):
        if self.p > np.random.uniform(0, 1):
            new_sample = sample.copy()
            C, T = new_sample.shape
            # One random integer angle (degrees) per axis.
            theta = np.random.randint(self.min_rotation, self.max_rotation + 1, 3)
            if C == 12:
                # Pick the 8 independent leads; the remaining 4 limb leads
                # are recomputed by adjust_channel_dependency below.
                new_sample_8lead = new_sample[[6, 7, 8, 9, 10, 11, 0, 1], :]
                rot_x = np.array(
                    [
                        [1, 0, 0],
                        [
                            0,
                            np.cos(theta[0] * np.pi / 180),
                            -np.sin(theta[0] * np.pi / 180),
                        ],
                        [
                            0,
                            np.sin(theta[0] * np.pi / 180),
                            np.cos(theta[0] * np.pi / 180),
                        ],
                    ]
                )
                rot_y = np.array(
                    [
                        [
                            np.cos(theta[1] * np.pi / 180),
                            0,
                            -np.sin(theta[1] * np.pi / 180),
                        ],
                        [0, 1, 0],
                        [
                            np.sin(theta[1] * np.pi / 180),
                            0,
                            np.cos(theta[1] * np.pi / 180),
                        ],
                    ]
                )
                rot_z = np.array(
                    [
                        [
                            np.cos(theta[2] * np.pi / 180),
                            -np.sin(theta[2] * np.pi / 180),
                            0,
                        ],
                        [
                            np.sin(theta[2] * np.pi / 180),
                            np.cos(theta[2] * np.pi / 180),
                            0,
                        ],
                        [0, 0, 1],
                    ]
                )
                # Combined rotation, then leads -> VCG -> rotate -> leads.
                rot = np.einsum("ab,bc,cd->ad", rot_x, rot_y, rot_z)
                mtx1 = np.einsum("ab,bc->ac", self.trans, rot)
                mtx2 = np.einsum("ab,bc->ac", self.trans_inv, new_sample_8lead)
                new_sample_8lead = np.einsum("ab,bc->ac", mtx1, mtx2)
                new_sample_12lead = np.zeros([C, T])
                new_sample_12lead[[6, 7, 8, 9, 10, 11, 0, 1], :] = new_sample_8lead
                new_sample = adjust_channel_dependency(new_sample_12lead)
            if self.dependency:
                new_sample = adjust_channel_dependency(new_sample)
            # Blend rotated and original signal: the rotated share grows with
            # the mean angle and reaches 100% at 30 degrees.
            new_sample = (np.mean(theta) / 30) * new_sample + (
                30 - np.mean(theta)
            ) / 30 * sample
        else:
            new_sample = sample
        return new_sample
# TimeMask
class TimeMask(object):
    """Zero out ``num`` random time bands of the record.

    The band length is drawn (as a fraction of T) between mag["min"] and
    mag["max"]; the band start is drawn so the band fits inside the record.

    Args:
        mag: dict with "min"/"max" band-length fractions in [0, 1].
        num: number of independent bands to mask.
        dependency: if True, rebuild the derived limb leads afterwards.
        p: probability of applying the augmentation.
    """

    def __init__(
        self,
        mag={"max": 0.3, "min": 0.0},
        num=1,
        dependency=True,
        p=1.0,
    ):
        # BUGFIX: min/max were assigned swapped (min_band_part took
        # mag["max"]), which made t0 range over [0, 1 - mag["min"]) so bands
        # could run past the end of the record and get clipped.
        self.min_band_part = mag["min"]
        self.max_band_part = mag["max"]
        self.num = num
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if self.p > np.random.uniform(0, 1):
            new_sample = sample.copy()
            C, T = new_sample.shape
            for _ in range(self.num):
                # Start fraction leaves room for the longest possible band.
                t0 = np.random.uniform(0, 1 - self.max_band_part)
                t = np.random.uniform(self.min_band_part, self.max_band_part)
                mask_from = int(t0 * T)
                mask_to = int((t0 + t) * T)
                new_sample[:, mask_from:mask_to] = 0
            if self.dependency:
                new_sample = adjust_channel_dependency(new_sample)
        else:
            new_sample = sample
        return new_sample
# ChannelMask
class ChannelMask(object):
    """Zero out a random subset of channels."""

    def __init__(
        self,
        mag={"max": 4, "min": 1},
        dependency=False,
        p=1.0,
    ):
        self.min_num_channel = mag["min"]
        self.max_num_channel = mag["max"]
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        masked = sample.copy()
        C, T = masked.shape
        num_channel = np.random.randint(
            self.min_num_channel, self.max_num_channel + 1
        )
        # Never request more channels than exist.
        num_channel = min(num_channel, C)
        chosen = np.random.choice(list(range(C)), num_channel, replace=False)
        masked[chosen, :] = 0
        if self.dependency:
            masked = adjust_channel_dependency(masked)
        return masked
# RandomSpectrogramMask
class RandomSpectrogramMask(object):
    """Drop random time-frequency bins of the signal's STFT.

    The record is transformed with a short-time Fourier transform, each
    complex bin is kept with probability 1 - mask_prob, and the result is
    inverted back to the time domain and cropped to the input length.
    """

    def __init__(self, mag={"max": 0.5, "min": 0.2}, freq=500, dependency=False, p=1.0):
        self.random_mask_prob_min = mag["min"]
        self.random_mask_prob_max = mag["max"]
        self.freq = freq
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        spec_input = sample.copy()
        _, _, spectrum = scipy.signal.stft(
            spec_input, fs=self.freq, nperseg=self.freq // 5
        )
        drop_prob = np.random.uniform(
            self.random_mask_prob_min, self.random_mask_prob_max
        )
        # 0 = dropped bin, 1 = kept bin.
        keep = np.random.choice(
            [0, 1], size=spectrum.shape, p=[drop_prob, 1 - drop_prob]
        )
        _, recon = scipy.signal.istft(
            spectrum * keep, fs=self.freq, nperseg=self.freq // 5
        )
        if self.dependency:
            recon = adjust_channel_dependency(recon)
        # istft may return a slightly longer signal; crop to the input length.
        return recon[:, : sample.shape[-1]]
## FrequencyWarping
class FrequencyWarping(object):
    """Warp the frequency axis with a random monotone remapping of FFT bins."""

    def __init__(self, mag, scale=0.1, dependency=True, p=1.0):
        self.scale_max = mag["max"]
        self.scale_min = mag["min"]
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        warped = sample.copy()
        spectrum = np.fft.fft(warped)
        C, F = spectrum.shape
        scale = np.random.uniform(self.scale_min, self.scale_max)
        # A positive density around 1 yields a roughly monotone bin mapping.
        density = np.random.normal(loc=1, scale=scale, size=F)
        cdf = np.cumsum(density)
        remapped = (cdf - cdf[0]) / (cdf[-1] - cdf[0]) * (len(cdf) - 1)
        spectrum = spectrum[:, np.round(remapped).astype(int)]
        warped = np.real(np.fft.ifft(spectrum))
        if self.dependency:
            warped = adjust_channel_dependency(warped)
        return warped
## LowPassFilter
class LowPassFilter(object):
    """Zero a random band of high-frequency FFT bins (crude low-pass filter)."""

    def __init__(
        self,
        mag={"max": 0.8, "min": 0.5},
        dependency=False,
        p=1.0,
    ):
        self.cutoff_ratio_min = mag["min"]
        self.cutoff_ratio_max = mag["max"]
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        filtered = sample.copy()
        spectrum = np.fft.fft(filtered)
        C, F = spectrum.shape
        cut_ratio = np.random.uniform(self.cutoff_ratio_min, self.cutoff_ratio_max)
        # Zero a symmetric band around the middle of the FFT (the highest
        # frequencies, given the FFT's wrap-around ordering).
        band = int((F * cut_ratio) // 2)
        keep = np.ones([C, F])
        keep[:, F // 2 - band : F // 2 + band] = 0
        filtered = np.real(np.fft.ifft(spectrum * keep))
        if self.dependency:
            filtered = adjust_channel_dependency(filtered)
        return filtered
## PartialWhiteNoise
class PartialWhiteNoise(object):
    """Add white noise to ``num`` random time bands only.

    The noise std grows with the band's fractional length (band * 2.7) and
    is weighted per channel by the square root of that channel's std.
    """

    def __init__(
        self,
        mag={"max": 0.3, "min": 0.0},
        num=1,
        dependency=True,
        p=1.0,
    ):
        self.min_band_part = mag["min"]
        self.max_band_part = mag["max"]
        self.num = num
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        noisy = sample.copy()
        C, T = noisy.shape
        for _ in range(self.num):
            band_start = np.random.uniform(0, 1 - self.max_band_part)
            band_part = np.random.uniform(self.min_band_part, self.max_band_part)
            lo = int(band_start * T)
            hi = lo + int(band_part * T)
            noise_scale = band_part * 2.7
            # Channel weight follows each channel's current spread.
            channel_weight = np.std(noisy, 1, keepdims=True) ** 0.5
            noisy[:, lo:hi] = (
                noisy[:, lo:hi]
                + np.random.normal(0, noise_scale, [C, hi - lo]) * channel_weight
            )
        if self.dependency:
            noisy = adjust_channel_dependency(noisy)
        return noisy
# PermuteWaveSegment
class PermuteWaveSegment(object):
    """Cut the record at random time points and shuffle the pieces.

    ``mag`` controls how many cut points are drawn.  The permuted pieces are
    tiled twice and a window of the original length T is cropped out, so the
    output always has the input's length.
    """

    def __init__(
        self,
        mag={"max": 4, "min": 0},
        # min_num_segment=0,
        # max_num_segment=4,
        sampling_rate=500,
        dependency=False,
        p=1.0,
    ):
        # mag holds the min/max number of random cut points.
        self.min_num_segment = mag["min"]
        self.max_num_segment = mag["max"]
        self.sampling_rate = sampling_rate
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if self.p > np.random.uniform(0, 1):
            sample = sample.copy()
            C, T = sample.shape
            num_segment = np.random.randint(
                self.min_num_segment, self.max_num_segment + 1
            )
            # Draw candidate cut points, then thin them to num_segment + 1
            # evenly spaced picks over the sorted candidates.
            segment_point = np.sort(np.random.randint(0, T, num_segment + 1))
            seg_index = segment_point[
                [
                    int(x)
                    for x in np.linspace(0, len(segment_point) - 1, num_segment + 1)
                ]
            ]
            # Close the partition with the record boundaries [0, ..., T].
            seg_index = np.concatenate([[0], seg_index, [T]], 0)
            # Shuffle the (start, end) pairs of consecutive boundaries.
            seg_index = np.random.permutation(
                [[i, j] for i, j in zip(seg_index[:-1], seg_index[1:])]
            )
            new_sample = []
            for start, end in seg_index:
                new_sample.append(sample[:, start:end])
            # NOTE(review): np.random.randint raises ValueError when the cut
            # points span the whole record (T - span == 0); unlike
            # ConcatWaveSegment there is no try/except here — confirm.
            crop_start = np.random.randint(T - (segment_point[-1] - segment_point[0]))
            new_sample = np.concatenate(new_sample * 2, axis=1)[
                :, crop_start : crop_start + T
            ]
            if self.dependency:
                new_sample = adjust_channel_dependency(new_sample)
        else:
            new_sample = sample
        return new_sample
# ConcatWaveSegment
class ConcatWaveSegment(object):
    """Replace the record with one random segment tiled to full length.

    A random slice of the signal is repeated end-to-end until it covers the
    original duration T.  Any failure falls back to the untouched input.
    """

    def __init__(
        self,
        mag={"max": 4, "min": 0},
        # min_num_segment=0,
        # max_num_segment=4,
        sampling_rate=500,
        dependency=True,
        p=1.0,
    ):
        # mag holds the min/max number of random cut points.
        self.min_num_segment = mag["min"]
        self.max_num_segment = mag["max"]
        self.sampling_rate = sampling_rate
        self.dependency = dependency
        self.p = p

    def __call__(self, sample):
        if self.p > np.random.uniform(0, 1):
            try:
                C, T = sample.shape
                num_segment = np.random.randint(
                    self.min_num_segment, self.max_num_segment + 1
                )
                # Draw candidate cut points, thinned to num_segment + 1 picks.
                segment_point = np.sort(np.random.randint(0, T, num_segment + 1))
                seg_point = segment_point[
                    [
                        int(x)
                        for x in np.linspace(0, len(segment_point) - 1, num_segment + 1)
                    ]
                ]
                # Pick one segment between two consecutive cut points ...
                index = np.random.randint(0, len(seg_point) - 1, 1)[0]
                sample_segment = sample[:, seg_point[index] : seg_point[index + 1]]
                # ... and tile it to cover at least T samples, then crop.
                new_sample = np.concatenate(
                    [sample_segment] * (T // len(sample_segment[0]) + 1), 1
                )[:, :T]
                if self.dependency:
                    new_sample = adjust_channel_dependency(new_sample)
                # Guard against a too-short result.
                if new_sample.shape[1] < T:
                    new_sample = sample
            except Exception:
                # Best-effort augmentation: any error (e.g. an empty segment)
                # returns the input as-is.
                new_sample = sample
        else:
            new_sample = sample
        return new_sample
class Flip(object):
    """Invert the signal polarity (multiply by -1) with probability p."""

    def __init__(self, mag={"max": 0.5, "min": 0.1}, p=0.1):
        self.std_max = mag["max"]
        self.std_min = mag["min"]
        # Flip is intentionally rare: the default probability is 10%.
        self.p = p

    def __call__(self, sample):
        if not (self.p > np.random.uniform(0, 1)):
            return sample
        flipped = sample.copy()
        # A zero max magnitude disables the flip entirely.
        return -flipped if self.std_max != 0 else flipped
class RandomSpike(object):
    """Insert short flat spikes into the visible quarter of each lead.

    Assumes a 12-lead printout layout where each group of three leads is
    only visible in one quarter of the record (see
    ``_visible_segment_for_channel``).  Spike amplitude is proportional to
    the lead's own half peak-to-peak range, with random sign and gain.
    """

    def __init__(self, mag={"max": 2, "min": 1}, p=1.0, freq=500):
        self.max_ratio = mag["max"]
        self.min_ratio = mag["min"]
        self.p = p
        self.freq = freq

    def _visible_segment_for_channel(self, c: int, T: int):
        """Return the (start, end) time range where channel *c* is visible.

        With T=5000 the segment length is 1250:
            leads 0-2  (I, II, III)    -> [0, 1250)
            leads 3-5  (aVR, aVL, aVF) -> [1250, 2500)
            leads 6-8  (V1, V2, V3)    -> [2500, 3750)
            leads 9-11 (V4, V5, V6)    -> [3750, 5000)
        """
        segment_len = T // 4
        if c <= 2:
            return 0, segment_len
        if c <= 5:
            return segment_len, 2 * segment_len
        if c <= 8:
            return 2 * segment_len, 3 * segment_len
        return 3 * segment_len, 4 * segment_len

    def __call__(self, sample):
        if self.p <= np.random.uniform(0, 1):
            return sample
        spiked = np.copy(sample)
        C, T = spiked.shape
        assert C == 12, f"이 구현은 12리드를 가정합니다. 현재 C={C}"
        # Spike length and count both scale with the magnitude level.
        spike_len = int(np.random.uniform(0, self.max_ratio) * 1.2) + 1
        n_spikes = self.max_ratio // 2
        for _ in range(n_spikes):
            for ch in range(C):
                seg_start, seg_end = self._visible_segment_for_channel(ch, T)
                # Random start position, restricted to this lead's visible segment.
                start = np.random.randint(seg_start, seg_end - spike_len)
                # Amplitude: half the lead's peak-to-peak range, randomly
                # scaled and signed.
                half_range = (spiked[ch].max() - spiked[ch].min()) / 2
                gain = np.random.uniform(0.5, 1.5)
                polarity = np.random.choice([-1.0, 1.0])
                spiked[ch, start : start + spike_len] = polarity * half_range * gain
        return spiked
import os
class DigitizedLeadName(object):
    """Overlay digitized lead-name text templates onto the waveform.

    place: 1 -> upper template, -1 -> under template
    height: vertical shift added to the whole template (place * height)
    width_scale: time-axis scaling factor (1: unchanged, 2: double length)
    p: augmentation probability
    """

    def __init__(self, mag={"max": 2, "min": 1}, p=1.0, freq=500):
        # NOTE(review): the ranges are keyed 1..10, so mag["max"] == 0 would
        # raise KeyError here — confirm level 0 is never requested.
        mag_to_param_range = {
            "width_scale_range": {
                1: (1, 1.000000001),
                2: (0.9, 1.1),
                3: (0.85, 1.2),
                4: (0.8, 1.3),
                5: (0.75, 1.4),
                6: (0.7, 1.5),
                7: (0.65, 1.6),
                8: (0.6, 1.7),
                9: (0.55, 1.8),
                10: (0.5, 1.9),
            },
            "height_scale_range": {
                1: (1, 1.000000001),
                2: (0.94, 1.06),
                3: (0.88, 1.12),
                4: (0.82, 1.18),
                5: (0.76, 1.24),
                6: (0.70, 1.30),
                7: (0.64, 1.36),
                8: (0.58, 1.42),
                9: (0.52, 1.48),
                10: (0.5, 1.54),
            },
            "height_shift_range": {
                1: (0, 0.000000001),
                2: (0.0, 0.1),
                3: (0.0, 0.2),
                4: (0.0, 0.3),
                5: (0.0, 0.4),
                6: (0.0, 0.5),
                7: (0.0, 0.6),
                8: (0.0, 0.7),
                9: (0.0, 0.8),
                10: (0.0, 0.9),
            },
            "text_space_range": {
                1: (0, 2),
                2: (0, 2),
                3: (0, 2),
                4: (0, 3),
                5: (0, 3),
                6: (0, 3),
                7: (0, 4),
                8: (0, 4),
                9: (0, 4),
                10: (0, 5),
            },
        }
        self.p = p
        self.mag = int(mag["max"])
        self.height_shift_range = mag_to_param_range["height_shift_range"][self.mag]
        self.height_scale_range = mag_to_param_range["height_scale_range"][self.mag]
        self.width_scale_range = mag_to_param_range["width_scale_range"][self.mag]
        self.text_space_range = mag_to_param_range["text_space_range"][self.mag]
        self.upper_template, self.under_template = self._load_and_split_templates(
            "DigitizedLeadNameTemplate.npz"
        )
        self.lead_order = [lead.name for lead in LeadType]

    # -------------------- template loading -------------------- #
    def _load_and_split_templates(self, filename):
        """
        Read the upper/under templates from the .npz file and split each
        lead template (1D: values + NaN) into per-character chunks at the
        NaN boundaries.
        Returns:
            upper_split: {lead_name: [char0, char1, ...]}
            under_split: {lead_name: [char0, char1, ...]}
        """
        path = os.path.join(os.path.dirname(__file__), "datasets", filename)
        data = np.load(path, allow_pickle=True)
        upper_raw = {
            k: v.astype(np.float32)
            for k, v in zip(data["upper_keys"], data["upper_values"])
        }
        under_raw = {
            k: v.astype(np.float32)
            for k, v in zip(data["under_keys"], data["under_values"])
        }

        def _split_dict(tpl_dict):
            out = {}
            for lead_name, tpl in tpl_dict.items():
                tpl = np.asarray(tpl, dtype=np.float32)
                L = len(tpl)
                chars = []
                i = 0
                while i < L:
                    # skip NaN gap
                    if np.isnan(tpl[i]):
                        i += 1
                        continue
                    # start of a non-NaN run
                    j = i + 1
                    while j < L and not np.isnan(tpl[j]):
                        j += 1
                    seg = tpl[i:j]
                    if seg.size > 0:
                        chars.append(seg)
                    i = j
                out[lead_name] = chars
            return out

        upper_split = _split_dict(upper_raw)
        under_split = _split_dict(under_raw)
        return upper_split, under_split

    # -------------------- width scaling (1D, no NaN) -------------------- #
    def _scale_width(self, tpl, scale: float):
        """
        tpl: 1D float32 ndarray
        scale: >0; 1 keeps the length, 2 doubles it, etc.
        """
        L = len(tpl)
        if L <= 1 or scale == 1.0:
            return tpl
        new_L = max(2, int(round(L * scale)))
        # Nearest-neighbor resampling via integer index lookup.
        idx_old = (np.arange(new_L) / scale).astype(int)
        idx_old = np.clip(idx_old, 0, L - 1)
        return tpl[idx_old]

    # ---- per-character transform + random spacing to assemble the lead template ---- #
    def _build_lead_template(self, char_list, place: int):
        """
        char_list: [char0, char1, ...], each char a non-NaN float32 1D array
        place: 1 or -1 (upper/under template direction)
        """
        segments = []
        n_chars = len(char_list)
        for idx, char_tpl in enumerate(char_list):
            w_scale = np.random.uniform(*self.width_scale_range)
            h_scale = np.random.uniform(*self.height_scale_range)
            h_shift = np.random.uniform(*self.height_shift_range)
            char_scaled = self._scale_width(char_tpl, w_scale)
            char_scaled = char_scaled * h_scale
            char_scaled = char_scaled + (place * h_shift)
            segments.append(char_scaled)
            # gap between characters (NaN)
            if idx < n_chars - 1:
                gap_len = np.random.randint(*self.text_space_range)
                segments.append(np.full(gap_len, np.nan, dtype=np.float32))
        if not segments:
            return np.zeros(0, dtype=np.float32)
        return np.concatenate(segments)

    # -------------------- main -------------------- #
    def __call__(self, sample: np.ndarray):
        if self.p <= np.random.uniform(0, 1):
            return sample
        # from here on, work on a plain copy (original comment said "fix to
        # float32"; the copy keeps the input's dtype — confirm if cast needed)
        new_sample = np.copy(sample)
        C, T = new_sample.shape
        if C < 12:
            return new_sample
        seg_len = T // 4
        global_start = np.random.randint(0, seg_len)
        place = 1 if self.mag % 2 == 1 else -1
        tpl_source = self.upper_template if place == 1 else self.under_template
        for lead_idx, lead_name in enumerate(self.lead_order):
            char_list = tpl_source.get(lead_name)
            tpl_scaled = self._build_lead_template(char_list, place)
            L = len(tpl_scaled)
            group = lead_idx // 3  # 0: I~III, 1: aVR~aVF, 2: V1~V3, 3: V4~V6
            base = group * seg_len
            insert_start = base + global_start
            seg_end = base + seg_len
            max_L_in_seg = seg_end - insert_start
            # Truncate the template so it never leaves the visible segment.
            if L > max_L_in_seg:
                tpl_seg = tpl_scaled[:max_L_in_seg]
            else:
                tpl_seg = tpl_scaled
            target = new_sample[lead_idx, insert_start : insert_start + len(tpl_seg)]
            # NaN positions are gaps; only overwrite the character samples.
            mask = ~np.isnan(tpl_seg)
            target[mask] = tpl_seg[mask]
        return new_sample
import numpy as np
import pandas as pd
import torch
import logging
from torch.utils.data import Dataset
from mai.data.util.load import load_mai_data
from mai.sigproc.sigproc_composer import SigprocComposer
from mai.sigproc.datamodel import SigprocConfig
from torchvision import transforms
import traceback as tb
class BaseDatasetV3(Dataset):
    """Base dataset: loads waveform JSON, preprocesses it, and yields
    ((object_id, offset), [signal, aux], label) tuples.

    Subclasses implement ``preprocessing`` / ``parse_signal_json`` for a
    concrete signal type.
    """

    def __init__(
        self, table=pd.DataFrame([]), label="label", stage="test", dataset_params=None
    ):
        self.table = table.reset_index(drop=True)
        self.len = len(table)
        self.stage = stage
        # Random behavior is only enabled for the training split.
        self.randomness = self.stage.upper() == "TRAIN"
        self.label = label
        # Index of the last successfully read row; used as a fallback when
        # reading another row fails.
        self.success_idx = 0
        self.dataset_params = dataset_params
        self.aux_data = self.dataset_params.aux_data
        self.num_aux = len(self.aux_data)
        self.wave_type = self.dataset_params.wave_type
        self.wave_config_list = self.dataset_params.wave_config
        self.preprocessor = dict()
        self.sampling_rate = dict()
        # Build one preprocessor per requested wave type from its config.
        for wave_type in self.wave_type:
            for wave_config in self.wave_config_list:
                if wave_config.name == wave_type:
                    preproc_config = wave_config.params.preproc_config
                    preproc_config = SigprocConfig(preproc_config)
                    composer = SigprocComposer(
                        getattr(wave_config.params, "additional_module_path", None)
                    )
                    self.preprocessor[wave_type] = composer.create(preproc_config)
                    # necessary params: sampling_rate_target
                    for preproc in preproc_config:
                        proc_name, proc_config = preproc
                        if proc_name == "ResampleSignal":
                            self.sampling_rate[
                                wave_type
                            ] = proc_config.params.sampling_rate_target

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        try:
            obj_id, input, label = self._read_signal(index)
            self.success_idx = index
        except Exception:
            # Fall back to the last row that read successfully so training
            # never dies on a single corrupt record.
            logging.getLogger().error(
                f"Failed to _read_signal in Dataset. Using fallback index {self.success_idx}. {tb.format_exc()}"
            )
            obj_id, input, label = self._read_signal(self.success_idx)
        (obj_id, offset) = obj_id
        return (obj_id, offset), input, label

    def _read_signal(self, index):
        """Read one row: load the raw waveform, preprocess, gather aux + label."""
        row = self.table.loc[index]
        obj_id, mai_json = self._get_mai_json(row)
        meta, sig = self.preprocessing(mai_json)
        aux = self._get_aux_data(row, meta)
        label = torch.tensor(row[self.label], dtype=torch.float32)
        return obj_id, [sig, aux], label

    def _get_mai_json(self, row):
        """Load the raw waveform JSON for this row's object id and window."""
        obj_id = row["objectid"]
        start, length = row["start"], row["length"]
        mai_json = load_mai_data(obj_id, self.wave_type, (start, length))
        return (obj_id, start), mai_json

    def preprocessing(self, signal_json, required_lead_type=None):
        raise NotImplementedError

    def parse_signal_json(self, signal_json):
        raise NotImplementedError

    def _get_aux_data(self, row, meta=None):
        """Collect the configured aux columns that are present in *row*.

        BUGFIX: ``_read_signal`` passes the preprocessing metadata as a
        second argument, which previously raised TypeError because this
        method only accepted *row*; *meta* is now accepted (and currently
        unused) so both call styles work.
        """
        return {col: row[col] for col in self.aux_data if col in row}

    def normalize_aux(self, aux):
        """Min-max normalize the known aux fields; pass others through unchanged."""
        aux_norm_params = {
            "age": {"min": 0, "max": 90},
            "gender": {"min": -1, "max": 1},
            "weight": {"min": 25, "max": 125},
            "height": {"min": 120, "max": 200},
        }
        aux_list = list()
        for k in aux.keys():
            if k in aux_norm_params:
                mn = aux_norm_params[k]["min"]
                mx = aux_norm_params[k]["max"]
                aux[k] = (aux[k] - mn) / (mx - mn)
                # BUGFIX: nan_to_num's second positional parameter is `copy`,
                # not the fill value; use the keyword so NaNs are explicitly
                # mapped to 0 (the previous call only worked because 0.0 is
                # also the default fill).
                aux[k] = np.nan_to_num(aux[k], nan=0.0)
                aux_list.append(aux[k])
            else:
                aux_list.append(aux[k])
        return np.array(aux_list)

    def apply_augmentation(self, sig, aug_params, wave_type="ecg"):
        """Apply N randomly chosen augmentations at intensity I during training."""
        # NOTE(review): this compares `stage == "train"` case-sensitively
        # while __init__ compares case-insensitively — confirm intended.
        if (self.stage == "train") and (aug_params != {}):
            N, I = aug_params["number"], aug_params["intensity"]  # noqa: E741
            aug_setup_params = {
                "mag": {"max": I, "min": I - 1},
                "freq": self.sampling_rate[wave_type],
                "p": aug_params["prob"],
                "dependency": aug_params["lead_dependency"],
            }
            # this import statement must be here. see https://git.medicalai.com:50001/team-ai/solver/solver2/-/issues/273
            DAList = aug_mod(self.get_aug_list(aug_params["list"]), aug_setup_params)
            if len(DAList) > 0:
                ops = np.random.choice(DAList, N)
                if len(ops) > 0:
                    augment = transforms.Compose(ops)
                    sig = augment(sig)
        return sig

    def get_aug_list(self, aug_params):
        """Expand the config list: plain names are always included; dict
        entries {name: prob} are included with the given probability."""
        aug_list = list()
        for aug_item in aug_params:
            if isinstance(aug_item, dict):
                for aug, value in aug_item.items():
                    if value > np.random.uniform(0, 1):
                        aug_list.append(aug)
            else:
                aug_list.append(aug_item)
        return aug_list
import numpy as np
import pandas as pd
import torch
import ast
from mai.data.util.misc import LeadType
class ECGDatasetV3(BaseDatasetV3):
    """ECG dataset: loads a waveform window per table row, preprocesses it,
    and returns ((obj_id, offset), [ecg_tensor, aux_tensor], label)."""

    def __init__(
        self, table=pd.DataFrame([]), label="label", stage="test", dataset_params=None
    ):
        super(ECGDatasetV3, self).__init__(table, label, stage, dataset_params)

    def _read_signal(self, index):
        """Build one example from table row *index*."""
        row = self.table.loc[index]
        obj_id, mai_json = self._get_mai_json(row)
        aux_dict = self._get_aux_data(row)
        # Use the row's lead list when present; otherwise request every LeadType member.
        required_lead_type = (
            ast.literal_eval(row["lead_type"])
            if hasattr(row, "lead_type")
            else LeadType.__dict__["_member_names_"]
        )
        if self.label == "label":
            label = torch.tensor(row[self.label], dtype=torch.float32)
        else:  # multi-label: self.label is a list of column names
            label = torch.tensor(
                [row[_label] for _label in self.label], dtype=torch.float32
            )
        _, ecg, aux = self.preprocessing(mai_json, aux_dict, required_lead_type)
        return obj_id, [ecg, aux], label

    def preprocessing(
        self, mai_json, aux_dict: dict = None, required_lead_type: list = None
    ):
        """Convert raw waveform json into (meta, ecg_tensor, aux_tensor).

        BUGFIX: ``aux_dict`` previously defaulted to a shared mutable ``{}``;
        a ``None`` sentinel is used instead (observably identical to callers).
        """
        if aux_dict is None:
            aux_dict = {}
        meta, wave = mai_json
        sampling_rate_input = wave["ecg"].sampling_rate
        mv_unit = wave["ecg"].unit
        waveform_data = wave["ecg"].waveform.data
        wave_params = self.wave_config_list[0].params
        if required_lead_type is None:
            required_lead_type = list(waveform_data.keys())
        # Scale raw samples by the unit factor, keeping only requested leads
        # that actually exist in the payload.
        waveform_dict = {
            lead_name: np.array(waveform_data[lead_name], dtype=np.float32) * mv_unit
            for lead_name in required_lead_type
            if lead_name in waveform_data
        }
        ecg_dict, _ = self.preprocessor["ecg"](
            waveform_dict, sampling_rate_input, self.randomness
        )
        ecg_array = np.array(list(ecg_dict.values()))
        ecg_array = self.apply_augmentation(ecg_array, wave_params.augmentation, "ecg")
        ecg_array = torch.tensor(ecg_array, dtype=torch.float32)
        aux = self.normalize_aux(aux_dict)
        aux = torch.tensor(aux, dtype=torch.float32)
        return meta, ecg_array, aux
import ast
import hashlib
import json
import numpy as np
import pandas as pd
import torch
from mai.data.util.misc import LeadType
class ECGRecoverDatasetRandomMaskWithRS2V3(ECGDatasetV3):
    """Unlike ECGRecoverDatasetRandomMaskWithRSV3, the rhythm strip (Lead II)
    is not returned as a 13th lead; it is written into the Lead II slot of the
    returned 12-lead array instead."""

    def __init__(
        self, table=pd.DataFrame([]), label="label", stage="test", dataset_params=None
    ):
        super(ECGRecoverDatasetRandomMaskWithRS2V3, self).__init__(
            table, label, stage, dataset_params
        )

    def test(self, obj_id=["6116253a31ee975e584d1dad"]):
        # Debug helper: load a digitized PTB-XL json by object id and mask it.
        # NOTE(review): path is hard-coded to a shared NFS export — dev use only.
        print(obj_id)
        path = f"/bfai/nfs_export/workspace/share/data/ecg_recover/ptbxl_wo_leadname/{obj_id[0][18:22]}/{obj_id[0]}.json"
        with open(path, "rb") as digitized_file:
            digitized = np.array(json.load(digitized_file))
        # if len(digitized) > 12:
        #     digitized = digitized[:12]
        return self.apply_random_mask(digitized, obj_id=obj_id[0])

    def _read_signal(self, index):
        """Return ((obj_id, offset), [masked_12lead, mask_12lead], clean_target)
        for table row *index*."""
        row = self.table.loc[index]
        obj_id, origin_json = self._get_mai_json(row)
        aux_dict = self._get_aux_data(row)
        required_lead_type = (
            ast.literal_eval(row["lead_type"])
            if hasattr(row, "lead_type")
            else LeadType.__dict__["_member_names_"]
        )
        # Clean (un-masked) preprocessed signal is the reconstruction target.
        _, origin, _ = self.preprocessing(origin_json, aux_dict, required_lead_type)
        with open(row["digitized_path"], "rb") as digitized_file:
            digitized = np.array(json.load(digitized_file))
        # 12 leads + rhythm strip expected (NOTE: assert is stripped under -O).
        assert len(digitized) == 13
        digitized, mask = self.apply_random_mask(
            digitized,
            obj_id=obj_id[0],
            deterministic=self.dataset_params.deterministic_masking,
        )
        digitized = torch.tensor(digitized, dtype=torch.float32)
        mask = torch.tensor(mask, dtype=torch.float32)
        return obj_id, [digitized, mask], origin

    def apply_random_mask(
        self, digitized: np.ndarray, obj_id: str = "", deterministic: bool = False
    ):
        """
        digitized: (13, 5000)
        return:
            masked_signal: (12, 5000)
            mask: (12, 5000) (1 = invisible, 0 = visible)
        """

        def _seed_rng(s: str):
            # Hash the object id into a stable 32-bit seed for reproducible masks.
            h = hashlib.sha256(s.encode()).hexdigest()
            return np.random.default_rng(int(h[:16], 16) % (2**32))

        # -------------------------
        # RNG selection
        # -------------------------
        if deterministic:
            rng = _seed_rng(obj_id)  # -> deterministic masking
        else:
            # -> draw a fresh mask every time while training
            if self.stage == "train":
                rng = np.random.default_rng()
            else:
                rng = _seed_rng(obj_id)
        fs = 500
        group_len = 1250  # 2.5s * fs
        total_len = 5000  # 10s * fs
        visible = np.zeros_like(digitized, dtype=np.float16)
        # each lead masking: per 2.5s column group, one random visible window
        # (>= 1.25s) shared by that group's three leads
        for g in range(4):
            vis_len = int(rng.uniform(1.25 * fs, group_len + 1))
            max_offset = group_len - vis_len
            offset = int(rng.uniform(0, max_offset + 1))
            start = g * group_len + offset
            end = start + vis_len
            for lead in range(g * 3, g * 3 + 3):
                visible[lead, start:end] = 1
        # rhythm strip masking: hide up to 1s at each end of lead 13
        left_mask = int(rng.uniform(0, 1 * fs + 1))
        right_mask = int(rng.uniform(0, 1 * fs + 1))
        rs_start = left_mask
        rs_end = total_len - right_mask
        visible[12, rs_start:rs_end] = 1
        masked = digitized * visible
        mask = 1 - visible
        # Replace Lead II with the (masked) rhythm strip.
        target_lead = 1  # Lead II
        masked[target_lead] = masked[12]
        mask[target_lead] = mask[12]
        # -------------------------
        # return only the final 12 leads
        # -------------------------
        masked = masked[:12]
        mask = mask[:12]
        return masked, mask