from scipy import signal import numpy as np import scipy import inspect from mai.data.util.misc import LeadType def adjust_channel_dependency(ecg): ecg[2] = ecg[1] - ecg[0] ecg[3] = -(ecg[1] + ecg[0]) / 2 ecg[4] = ecg[0] - ecg[1] / 2 ecg[5] = ecg[1] - ecg[0] / 2 return ecg def get_mag_range(): return { "PowerlineNoise": { 0: 0.0, 1: 0.0062, 2: 0.0116, 3: 0.0168, 4: 0.0222, 5: 0.0278, 6: 0.033, 7: 0.0381, 8: 0.0436, 9: 0.049, 10: 0.0543, }, "BaselineWander": { 0: 0.0, 1: 0.01, 2: 0.022, 3: 0.035, 4: 0.045, 5: 0.06, 6: 0.064, 7: 0.086, 8: 0.09, 9: 0.1, 10: 0.12, }, "BaselineShift": { 0: 0.0, 1: 0.0317, 2: 0.0631, 3: 0.0966, 4: 0.1286, 5: 0.1584, 6: 0.1924, 7: 0.2257, 8: 0.2514, 9: 0.2869, 10: 0.32, }, "EMGNoise": { 0: 0.0, 1: 0.007, 2: 0.013, 3: 0.018, 4: 0.022, 5: 0.027, 6: 0.032, 7: 0.037, 8: 0.042, 9: 0.045, 10: 0.05, }, "RandomCropResize": { 0: 1.0, 1: 1.0, 2: 1.001, 3: 1.001, 4: 1.002, 5: 1.002, 6: 1.003, 7: 1.004, 8: 1.005, 9: 1.007, 10: 1.01, }, "TimeWarp": { 0: 0.0, 1: 0.014, 2: 0.027, 3: 0.041, 4: 0.055, 5: 0.07, 6: 0.086, 7: 0.103, 8: 0.142, 9: 0.202, 10: 0.5, }, "DynamicTimeWarp": { 0: 0.0, 1: 0.146, 2: 0.292, 3: 0.439, 4: 0.585, 5: 0.731, 6: 0.877, 7: 1.0, 8: 1.239, 9: 1.633, 10: 2.0, }, "GaussianSmoothing": { 0: 1.0, 1: 1.5, 2: 1.9, 3: 2.681, 4: 3.526, 5: 3.75, 6: 4.0, 7: 4.786, 8: 5.573, 9: 5.786, 10: 6.0, }, "MagnitudeWarping": { 0: 0.0, 1: 0.02, 2: 0.037, 3: 0.055, 4: 0.071, 5: 0.088, 6: 0.105, 7: 0.122, 8: 0.139, 9: 0.156, 10: 0.172, }, "AngleRotation": { 0: 0.533, 1: 3.742, 2: 6.231, 3: 8.839, 4: 11.26, 5: 14.112, 6: 15.868, 7: 18.057, 8: 20.816, 9: 22.954, 10: 24.0, }, "TimeMask": { 0: 0.0, 1: 0.01, 2: 0.021, 3: 0.037, 4: 0.061, 5: 0.088, 6: 0.124, 7: 0.169, 8: 0.216, 9: 0.279, 10: 0.344, }, "ChannelMask": { 0: 0.0, 1: 0.33, 2: 0.67, 3: 1.0, 4: 1.5, 5: 2.0, 6: 2.25, 7: 2.5, 8: 2.75, 9: 3.0, 10: 3.5, }, "RandomSpectrogramMask": { 0: 0.0, 1: 0.002, 2: 0.005, 3: 0.009, 4: 0.014, 5: 0.02, 6: 0.028, 7: 0.036, 8: 0.046, 9: 0.057, 10: 0.072, }, "FrequencyWarping": { 0: 0.0075, 1: 0.018, 2: 0.0225, 3: 0.024, 4: 0.0285, 5: 0.03, 6: 0.0345, 7: 0.0345, 8: 0.0375, 9: 0.0375, 10: 0.039, }, "LowPassFilter": { 0: 0.0, 1: 0.043, 2: 0.154, 3: 0.271, 4: 0.371, 5: 0.429, 6: 0.484, 7: 0.552, 8: 0.599, 9: 0.636, 10: 0.667, }, "PartialWhiteNoise": { 0: 0.0, 1: 0.019, 2: 0.03, 3: 0.039, 4: 0.047, 5: 0.055, 6: 0.061, 7: 0.068, 8: 0.074, 9: 0.08, 10: 0.086, }, "PermuteWaveSegment": { 0: 0.0, 1: 0.284, 2: 0.568, 3: 0.852, 4: 1.486, 5: 2.1, 6: 2.815, 7: 3.399, 8: 4.45, 9: 5.994, 10: 8.0, }, "ConcatWaveSegment": { 0: 0.0, 1: 1, 2: 1, 3: 2, 4: 2, 5: 3, 6: 3, 7: 4, 8: 4, 9: 5, 10: 6, }, "Flip": { 0: 0.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0, 5: 1.0, 6: 1.0, 7: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, }, "RandomSpike": { 0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9, 10: 10, }, "DigitizedLeadName": { 0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9, 10: 10, }, } def aug_mod(aug_list, params): # aug_mod setting for Rand Aumgnet aug_range = get_mag_range() da_list = [] for aug in aug_list: params_aug = dict() for param in params.keys(): if param in inspect.getfullargspec(globals()[aug].__init__)[0]: params_aug[param] = params[param] if aug in list(aug_range.keys()): params_aug["mag"] = { "max": aug_range[aug][int(params["mag"]["max"])], "min": aug_range[aug][max(int(params["mag"]["min"]), 0)], } da_list.append(globals()[aug](**params_aug)) return da_list ## PowerlineNoise class PowerlineNoise(object): def __init__(self, mag={"max": 0.5, "min": 0}, p=1.0, freq=500, dependency=True): # self.min_amplitude = min_amplitude # self.max_amplitude = max_amplitude self.max_amplitude = mag["max"] self.min_amplitude = mag["min"] self.freq = freq self.p = p self.dependency = dependency def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample C, T = new_sample.shape # amp_channel = np.random.normal(1, 0.1, size=(C,1)) amp = np.random.uniform(self.min_amplitude, self.max_amplitude, size=(1, 1)) # amp = amp_channel*amp_general f = 50 if np.random.uniform(0, 1) > 0.5 else 60 noise = self.apply_powerline_noise(T, f) new_sample = new_sample + noise * amp if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample def apply_powerline_noise(self, T, f): t = np.linspace(0, T - 1, T) phase = np.random.uniform(0, 2 * 3.14) noise = np.cos(2 * 3.14 * f * (t / self.freq) + phase) return noise ## BaselineWander class BaselineWander(object): def __init__( self, mag={"max": 0.5, "min": 0}, p=1.0, aug_freq={"max": 0.2, "min": 0.01}, k=3, freq=500, dependency=True, ): self.min_amplitude = mag["min"] self.max_amplitude = mag["max"] self.min_freq = aug_freq["min"] self.max_freq = aug_freq["max"] self.k = k self.freq = freq self.p = p self.dependency = dependency def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() C, T = new_sample.shape amp_channel = np.random.normal(1, 0.5, size=(C, 1)) # c = np.array([i for i in range(12)]) amp_general = np.random.uniform( self.min_amplitude, self.max_amplitude, size=self.k ) noise = np.zeros(shape=(1, T)) for k in range(self.k): noise += self.apply_baseline_wander(T) * amp_general[k] noise = noise * amp_channel new_sample[:, :] = new_sample[:, :] + noise[:, :] if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample def apply_baseline_wander(self, T): f = np.random.uniform(self.min_freq, self.max_freq) t = np.linspace(0, T - 1, T) r = np.random.uniform(0, 2 * 3.14) noise = np.cos(2 * 3.14 * f * (t / self.freq) + r) return noise ## BaselineShift class BaselineShift(object): def __init__( self, mag={"max": 0.5, "min": 0}, shift_ratio=0.2, num_segment=1, freq=500, dependency=False, p=1.0, ): self.max_amplitude = mag["max"] self.min_amplitude = mag["min"] self.shift_ratio = shift_ratio self.num_segment = num_segment self.freq = freq self.p = p self.dependency = dependency def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() C, T = new_sample.shape shift_length = T * self.shift_ratio # amp_channel = np.random.normal(1,0.1,size=(C,1)) amp_channel = np.random.choice([1, -1], size=(C, 1)) amp_general = np.random.uniform( self.min_amplitude, self.max_amplitude, size=(1, 1) ) amp = amp_channel * amp_general noise = np.zeros(shape=(C, T)) for i in range(self.num_segment): segment_len = np.random.normal(shift_length, shift_length * 0.2) t0 = int(np.random.uniform(0, T - segment_len)) t = int(t0 + segment_len) # c = np.random.randint(1,12,size=np.random.randint(1,12)) noise[:, t0:t] = 1 new_sample = new_sample + noise * amp if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample # EMGNoise class EMGNoise(object): def __init__( self, mag={"max": 0.5, "min": 0}, # min_amplitude=0, max_amplitude=0.05, dependency=True, p=1.0, ): # self.min_amplitude = min_amplitude # self.max_amplitude = max_amplitude self.max_amplitude = mag["max"] self.min_amplitude = mag["min"] self.p = p self.dependency = dependency def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() C, T = new_sample.shape amp = np.random.uniform(self.min_amplitude, self.max_amplitude, size=(C, 1)) # c = np.random.randint(1,12,size=np.random.randint(1,12)) noise = np.random.normal(0, 1, [C, T]) # new_sample[c] = new_sample[c] + noise[c]*amp[c] new_sample = new_sample + noise * amp if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample # RandomCropResize class RandomCropResize(object): def __init__( self, mag={"max": 1.2, "min": 1.1}, # min_rate=1.0, max_rate=1.1, dependency=False, p=1.0, ): # self.min_rate = min_rate # self.max_rate = max_rate self.min_rate = mag["min"] self.max_rate = mag["max"] self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): try: new_sample = sample.copy() C, T = new_sample.shape rate = np.random.uniform(self.min_rate, self.max_rate) rate = 1 / rate if np.random.uniform(0, 1) > 0.5 else rate if self.max_rate > 1: new_sample = np.concatenate([new_sample] * 3, 1) start = np.random.randint(new_sample.shape[1] - int(T * rate)) new_sample = new_sample[:, start : start + int(T * rate)] new_sample = signal.resample(new_sample, T, axis=1) if self.dependency: new_sample = adjust_channel_dependency(new_sample) except Exception: new_sample = sample else: new_sample = sample return new_sample # TimeWarp class TimeWarp(object): def __init__( self, mag={"max": 20, "min": 0}, # iteration=1, dependency=False, p=1.0, ): self.epsilon_max = 10 / np.max([mag["max"], 0.001]) self.epsilon_min = 10 / np.max([mag["min"], 0.001]) self.scale_max = mag["max"] # mag['max'] # mag['max'] self.scale_min = mag["min"] # mag['min'] # mag['min'] self.iteration = 1 # 210 if mag['max'] == 0 else 1 self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() C, T = new_sample.shape for _ in range(self.iteration): epsilon = np.random.uniform(self.epsilon_min, self.epsilon_max) scale = np.random.uniform(self.scale_min, self.scale_max) pmf = np.random.normal(loc=0, scale=scale, size=T) pmf = np.cumsum(pmf) # random walk pmf = pmf - np.min(pmf) + epsilon # make it positive cdf = np.cumsum(pmf) # by definition monotonically increasing t_new = ( (cdf - cdf[0]) / (cdf[-1] - cdf[0]) * (len(cdf) - 1) ) # correct normalization # t_old = np.arange(T) new_sample = new_sample[:, t_new.astype(int)] if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample # DynamicTimeWarp class DynamicTimeWarp(object): def __init__( self, mag={"max": 3, "min": 3}, freq=500, scale=1, epsilon=10, dependency=True, p=1.0, ): self.num_of_warps_min = mag["min"] self.num_of_warps_max = mag["max"] self.radius = freq self.scale = 0.1 self.epsilon = 0.1 self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() C, T = new_sample.shape radius = self.radius t_new = np.arange(T) num_of_warps = np.random.randint( self.num_of_warps_min, self.num_of_warps_max + 1 ) for i in range(num_of_warps): section_length = T // num_of_warps point = np.random.randint(section_length * i, section_length * (i + 1)) warp_from = ( point - radius // 2 if point - radius // 2 > section_length * i else section_length * i ) warp_from = ( warp_from if point + radius // 2 < section_length * (i + 1) else section_length * (i + 1) - radius - 1 ) warp_to = warp_from + radius pmf = np.random.normal(loc=0, scale=self.scale, size=radius) pmf = np.cumsum(pmf) # random walk pmf = pmf - np.min(pmf) + self.epsilon # make it positive cdf = np.cumsum(pmf) # by definition monotonically increasing t_new[warp_from:warp_to] = warp_from + (cdf - cdf[0]) / ( cdf[-1] - cdf[0] ) * (len(cdf) - 1) new_sample = new_sample[:, t_new.astype(int)] if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample # GaussianSmoothing class GaussianSmoothing(object): def __init__( self, mag={"max": 15, "min": 3}, p=1.0, # min_window_length=3, # max_window_length=15, dependency=False, ): self.min_window_length = mag["min"] self.max_window_length = mag["max"] # self.max_window_length = mag self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() C, T = new_sample.shape window_length = np.random.randint( self.min_window_length, self.max_window_length + 1 ) new_sample = self.apply_gaussian_smoothing(new_sample, window_length) if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample def apply_gaussian_smoothing(self, wave, window_length): C, T = wave.shape window = scipy.signal.windows.gaussian(window_length, std=window_length // 2) window = window / window.sum() new_wave = [] for i in range(C): new_wave.append(np.convolve(window, wave[i], "same")) new_wave = np.stack(new_wave) return new_wave # MagnitudeWarping class MagnitudeWarping(object): def __init__(self, mag={"max": 0.5, "min": 0.1}, dependency=True, p=1.0): self.std_max = mag["max"] self.std_min = mag["min"] # self.std = mag self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() std = np.random.uniform(self.std_min, self.std_max) C, T = new_sample.shape amplitude = np.random.normal(1, std, size=(C, T)) new_sample = new_sample * amplitude if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample # AngleRotation class AngleRotation(object): def __init__( self, mag={"max": 45, "min": 0}, method="dower", dependency=True, p=1.0 ): assert method in ["dower", "plsv", "qlsv", "kors"] self.min_rotation = mag["min"] self.max_rotation = mag["max"] # self.max_rotation = mag self.dependency = dependency self.p = p if method == "dower": self.trans_inv = np.array( [ [-0.172, -0.073, 0.122, 0.231, 0.239, 0.193, 0.156, -0.009], [0.057, -0.019, -0.106, -0.022, 0.040, 0.048, -0.227, 0.886], [-0.228, -0.310, -0.245, -0.063, 0.054, 0.108, 0.021, 0.102], ] ) elif method == "plsv": self.trans_inv = np.array( [ [-0.266, 0.027, 0.065, 0.131, 0.203, 0.220, 0.370, -0.154], [0.088, -0.088, 0.003, 0.042, 0.047, 0.067, -0.131, 0.717], [-0.319, -0.198, -0.167, -0.099, -0.009, 0.060, 0.184, -0.114], ] ) elif method == "qlsv": self.trans_inv = np.array( [ [-0.147, -0.058, 0.037, 0.139, 0.232, 0.226, 0.199, -0.018], [0.023, -0.085, -0.003, 0.033, 0.060, 0.104, -0.146, 0.503], [-0.184, -0.163, -0.190, -0.119, -0.023, 0.043, 0.085, -0.130], ] ) elif method == "kors": self.trans_inv = np.array( [ [-0.130, 0.050, -0.010, 0.140, 0.060, 0.540, 0.380, -0.070], [0.060, -0.020, -0.050, 0.060, -0.170, 0.130, -0.070, 0.930], [-0.430, -0.060, -0.140, -0.200, -0.110, 0.310, 0.110, -0.230], ] ) self.trans = np.linalg.pinv(self.trans_inv) def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() C, T = new_sample.shape theta = np.random.randint(self.min_rotation, self.max_rotation + 1, 3) if C == 12: new_sample_8lead = new_sample[[6, 7, 8, 9, 10, 11, 0, 1], :] rot_x = np.array( [ [1, 0, 0], [ 0, np.cos(theta[0] * np.pi / 180), -np.sin(theta[0] * np.pi / 180), ], [ 0, np.sin(theta[0] * np.pi / 180), np.cos(theta[0] * np.pi / 180), ], ] ) rot_y = np.array( [ [ np.cos(theta[1] * np.pi / 180), 0, -np.sin(theta[1] * np.pi / 180), ], [0, 1, 0], [ np.sin(theta[1] * np.pi / 180), 0, np.cos(theta[1] * np.pi / 180), ], ] ) rot_z = np.array( [ [ np.cos(theta[2] * np.pi / 180), -np.sin(theta[2] * np.pi / 180), 0, ], [ np.sin(theta[2] * np.pi / 180), np.cos(theta[2] * np.pi / 180), 0, ], [0, 0, 1], ] ) rot = np.einsum("ab,bc,cd->ad", rot_x, rot_y, rot_z) mtx1 = np.einsum("ab,bc->ac", self.trans, rot) mtx2 = np.einsum("ab,bc->ac", self.trans_inv, new_sample_8lead) new_sample_8lead = np.einsum("ab,bc->ac", mtx1, mtx2) new_sample_12lead = np.zeros([C, T]) new_sample_12lead[[6, 7, 8, 9, 10, 11, 0, 1], :] = new_sample_8lead new_sample = adjust_channel_dependency(new_sample_12lead) if self.dependency: new_sample = adjust_channel_dependency(new_sample) new_sample = (np.mean(theta) / 30) * new_sample + ( 30 - np.mean(theta) ) / 30 * sample else: new_sample = sample return new_sample # TimeMask class TimeMask(object): def __init__( self, mag={"max": 0.3, "min": 0.0}, # min_band_part=0.0, # max_band_part=0.3, num=1, dependency=True, p=1.0, ): self.min_band_part = mag["max"] self.max_band_part = mag["min"] # self.max_band_part = mag self.num = num self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() C, T = new_sample.shape for _ in range(self.num): t0 = np.random.uniform(0, 1 - self.max_band_part) t = np.random.uniform(self.min_band_part, self.max_band_part) mask_from = int(t0 * T) mask_to = int((t0 + t) * T) new_sample[:, mask_from:mask_to] = 0 if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample # ChannelMask class ChannelMask(object): def __init__( self, mag={"max": 4, "min": 1}, # min_num_channel=0, # max_num_channel=4, dependency=False, p=1.0, ): self.min_num_channel = mag["min"] self.max_num_channel = mag["max"] # self.max_num_channel = mag self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() C, T = new_sample.shape num_channel = np.random.randint( self.min_num_channel, self.max_num_channel + 1 ) if num_channel > C: num_channel = C channels = np.random.choice(list(range(C)), num_channel, replace=False) new_sample[channels, :] = 0 if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample # RandomSpectrogramMask class RandomSpectrogramMask(object): def __init__(self, mag={"max": 0.5, "min": 0.2}, freq=500, dependency=False, p=1.0): self.random_mask_prob_min = mag["min"] self.random_mask_prob_max = mag["max"] self.freq = freq self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() _, _, z = scipy.signal.stft( new_sample, fs=self.freq, nperseg=self.freq // 5 ) N, C, T = z.shape mask_prob = np.random.uniform( self.random_mask_prob_min, self.random_mask_prob_max ) mask = np.random.choice( [0, 1], size=(N, C, T), p=[mask_prob, 1 - mask_prob] ) _, new_sample = scipy.signal.istft( z * mask, fs=self.freq, nperseg=self.freq // 5 ) if self.dependency: new_sample = adjust_channel_dependency(new_sample) new_sample = new_sample[:, : sample.shape[-1]] else: new_sample = sample return new_sample ## FrequencyWarping class FrequencyWarping(object): def __init__(self, mag, scale=0.1, dependency=True, p=1.0): # self.scale = scale self.scale_max = mag["max"] self.scale_min = mag["min"] self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() z = np.fft.fft(new_sample) C, F = z.shape scale = np.random.uniform(self.scale_min, self.scale_max) pmf = np.random.normal(loc=1, scale=scale, size=F) cdf = np.cumsum(pmf) f_new = (cdf - cdf[0]) / (cdf[-1] - cdf[0]) * (len(cdf) - 1) z_new = z[:, np.round(f_new).astype(int)] new_sample = np.real(np.fft.ifft(z_new)) if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample ## LowPassFilter class LowPassFilter(object): def __init__( self, mag={"max": 0.8, "min": 0.5}, # cutoff_ratio_min=0.0, # cutoff_ratio_max=0.8, dependency=False, p=1.0, ): self.cutoff_ratio_min = mag["min"] self.cutoff_ratio_max = mag["max"] # self.cutoff_ratio_max = mag self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() z = np.fft.fft(new_sample) C, F = z.shape mask_prob = np.random.uniform(self.cutoff_ratio_min, self.cutoff_ratio_max) mask = np.ones([C, F]) mask[ :, int((F // 2) - ((F * mask_prob) // 2)) : int( (F // 2) + ((F * mask_prob) // 2) ), ] = 0 new_sample = np.real(np.fft.ifft(z * mask)) if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample ## PartialWhiteNoise class PartialWhiteNoise(object): def __init__( self, mag={"max": 0.3, "min": 0.0}, # scale=3, # min_band_part=0.0, # max_band_part=0.3, num=1, dependency=True, p=1.0, ): self.min_band_part = mag["min"] self.max_band_part = mag["max"] # self.max_band_part = mag # self.scale = scale self.num = num self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() C, T = new_sample.shape for _ in range(self.num): t0 = np.random.uniform(0, 1 - self.max_band_part) t = np.random.uniform(self.min_band_part, self.max_band_part) mask_from = int(t0 * T) mask_to = int(t0 * T) + int(t * T) scale = t * 2.7 new_sample[:, mask_from:mask_to] = ( new_sample[:, mask_from:mask_to] + np.random.normal(0, scale, [C, mask_to - mask_from]) * np.std(new_sample, 1, keepdims=True) ** 0.5 ) if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample # PermuteWaveSegment class PermuteWaveSegment(object): def __init__( self, mag={"max": 4, "min": 0}, # min_num_segment=0, # max_num_segment=4, sampling_rate=500, dependency=False, p=1.0, ): self.min_num_segment = mag["min"] self.max_num_segment = mag["max"] self.sampling_rate = sampling_rate self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): sample = sample.copy() C, T = sample.shape num_segment = np.random.randint( self.min_num_segment, self.max_num_segment + 1 ) segment_point = np.sort(np.random.randint(0, T, num_segment + 1)) seg_index = segment_point[ [ int(x) for x in np.linspace(0, len(segment_point) - 1, num_segment + 1) ] ] seg_index = np.concatenate([[0], seg_index, [T]], 0) seg_index = np.random.permutation( [[i, j] for i, j in zip(seg_index[:-1], seg_index[1:])] ) new_sample = [] for start, end in seg_index: new_sample.append(sample[:, start:end]) crop_start = np.random.randint(T - (segment_point[-1] - segment_point[0])) new_sample = np.concatenate(new_sample * 2, axis=1)[ :, crop_start : crop_start + T ] if self.dependency: new_sample = adjust_channel_dependency(new_sample) else: new_sample = sample return new_sample # ConcatWaveSegment class ConcatWaveSegment(object): def __init__( self, mag={"max": 4, "min": 0}, # min_num_segment=0, # max_num_segment=4, sampling_rate=500, dependency=True, p=1.0, ): self.min_num_segment = mag["min"] self.max_num_segment = mag["max"] self.sampling_rate = sampling_rate self.dependency = dependency self.p = p def __call__(self, sample): if self.p > np.random.uniform(0, 1): try: C, T = sample.shape num_segment = np.random.randint( self.min_num_segment, self.max_num_segment + 1 ) segment_point = np.sort(np.random.randint(0, T, num_segment + 1)) seg_point = segment_point[ [ int(x) for x in np.linspace(0, len(segment_point) - 1, num_segment + 1) ] ] index = np.random.randint(0, len(seg_point) - 1, 1)[0] sample_segment = sample[:, seg_point[index] : seg_point[index + 1]] new_sample = np.concatenate( [sample_segment] * (T // len(sample_segment[0]) + 1), 1 )[:, :T] if self.dependency: new_sample = adjust_channel_dependency(new_sample) if new_sample.shape[1] < T: new_sample = sample except Exception: new_sample = sample else: new_sample = sample return new_sample class Flip(object): def __init__(self, mag={"max": 0.5, "min": 0.1}, p=0.1): self.std_max = mag["max"] self.std_min = mag["min"] self.p = p # flip 은 확률 10%로 세팅 def __call__(self, sample): if self.p > np.random.uniform(0, 1): new_sample = sample.copy() if self.std_max != 0: new_sample = -new_sample else: new_sample = sample return new_sample class RandomSpike(object): def __init__(self, mag={"max": 2, "min": 1}, p=1.0, freq=500): self.max_ratio = mag["max"] self.min_ratio = mag["min"] self.p = p self.freq = freq def _visible_segment_for_channel(self, c: int, T: int): """ 채널별 보이는 구간 반환. T=5000 이면 segment_len = 1250: 0~2 -> 0~1250 3~5 -> 1250~2500 6~8 -> 2500~3750 9~11 -> 3750~5000 """ segment_len = T // 4 if c <= 2: # I, II, III return 0, segment_len elif c <= 5: # aVR, aVL, aVF return segment_len, 2 * segment_len elif c <= 8: # V1, V2, V3 return 2 * segment_len, 3 * segment_len else: # V4, V5, V6 return 3 * segment_len, 4 * segment_len def __call__(self, sample): if self.p <= np.random.uniform(0, 1): return sample new_sample = np.copy(sample) C, T = new_sample.shape assert C == 12, f"이 구현은 12리드를 가정합니다. 현재 C={C}" spike_len = int(np.random.uniform(0, self.max_ratio) * 1.2) + 1 n_spikes = self.max_ratio // 2 for _ in range(n_spikes): for c in range(C): seg_start, seg_end = self._visible_segment_for_channel(c, T) # 해당 리드의 보이는 구간 안에서만 시작 위치 랜덤 max_start = seg_end - spike_len start = np.random.randint(seg_start, max_start) # amplitude range_c = (new_sample[c].max() - new_sample[c].min()) / 2 scale = np.random.uniform(0.5, 1.5) sign = np.random.choice([-1.0, 1.0]) spike_amp = sign * range_c * scale new_sample[c, start : start + spike_len] = spike_amp return new_sample import os class DigitizedLeadName(object): def __init__(self, mag={"max": 2, "min": 1}, p=1.0, freq=500): """ place: 1 → upper 템플릿, -1 → under 템플릿 height: 템플릿 전체에 더할 vertical shift (place * height) width_scale: 시간축 scaling factor (1: 그대로, 2: 길이 2배) p: augmentation probability """ mag_to_param_range = { "width_scale_range": { 1: (1, 1.000000001), 2: (0.9, 1.1), 3: (0.85, 1.2), 4: (0.8, 1.3), 5: (0.75, 1.4), 6: (0.7, 1.5), 7: (0.65, 1.6), 8: (0.6, 1.7), 9: (0.55, 1.8), 10: (0.5, 1.9), }, "height_scale_range": { 1: (1, 1.000000001), 2: (0.94, 1.06), 3: (0.88, 1.12), 4: (0.82, 1.18), 5: (0.76, 1.24), 6: (0.70, 1.30), 7: (0.64, 1.36), 8: (0.58, 1.42), 9: (0.52, 1.48), 10: (0.5, 1.54), }, "height_shift_range": { 1: (0, 0.000000001), 2: (0.0, 0.1), 3: (0.0, 0.2), 4: (0.0, 0.3), 5: (0.0, 0.4), 6: (0.0, 0.5), 7: (0.0, 0.6), 8: (0.0, 0.7), 9: (0.0, 0.8), 10: (0.0, 0.9), }, "text_space_range": { 1: (0, 2), 2: (0, 2), 3: (0, 2), 4: (0, 3), 5: (0, 3), 6: (0, 3), 7: (0, 4), 8: (0, 4), 9: (0, 4), 10: (0, 5), }, } self.p = p self.mag = int(mag["max"]) self.height_shift_range = mag_to_param_range["height_shift_range"][self.mag] self.height_scale_range = mag_to_param_range["height_scale_range"][self.mag] self.width_scale_range = mag_to_param_range["width_scale_range"][self.mag] self.text_space_range = mag_to_param_range["text_space_range"][self.mag] self.upper_template, self.under_template = self._load_and_split_templates( "DigitizedLeadNameTemplate.npz" ) self.lead_order = [lead.name for lead in LeadType] # -------------------- 템플릿 로드 -------------------- # def _load_and_split_templates(self, filename): """ .npz에서 upper/under 템플릿을 읽고, 각 lead 템플릿(1D: 값 + NaN)을 NaN 기준으로 글자 단위로 분리한다. 반환: upper_split: {lead_name: [char0, char1, ...]} under_split: {lead_name: [char0, char1, ...]} """ path = os.path.join(os.path.dirname(__file__), "datasets", filename) data = np.load(path, allow_pickle=True) upper_raw = { k: v.astype(np.float32) for k, v in zip(data["upper_keys"], data["upper_values"]) } under_raw = { k: v.astype(np.float32) for k, v in zip(data["under_keys"], data["under_values"]) } def _split_dict(tpl_dict): out = {} for lead_name, tpl in tpl_dict.items(): tpl = np.asarray(tpl, dtype=np.float32) L = len(tpl) chars = [] i = 0 while i < L: # NaN 건너뛰기 if np.isnan(tpl[i]): i += 1 continue # non-NaN 시작 j = i + 1 while j < L and not np.isnan(tpl[j]): j += 1 seg = tpl[i:j] if seg.size > 0: chars.append(seg) i = j out[lead_name] = chars return out upper_split = _split_dict(upper_raw) under_split = _split_dict(under_raw) return upper_split, under_split # -------------------- width scaling (1D, NaN 없음) -------------------- # def _scale_width(self, tpl, scale: float): """ tpl: 1D float32 ndarray scale: >0, 1이면 그대로, 2이면 길이 2배 등 """ L = len(tpl) if L <= 1 or scale == 1.0: return tpl new_L = max(2, int(round(L * scale))) idx_old = (np.arange(new_L) / scale).astype(int) idx_old = np.clip(idx_old, 0, L - 1) return tpl[idx_old] # -------------------- 글자 단위 변형 + 랜덤 간격으로 lead 템플릿 조립 -------------------- # def _build_lead_template(self, char_list, place: int): """ char_list: [char0, char1, ...], 각 char는 non-NaN float32 1D place: 1 or -1 (위/아래 템플릿 방향) """ segments = [] n_chars = len(char_list) for idx, char_tpl in enumerate(char_list): w_scale = np.random.uniform(*self.width_scale_range) h_scale = np.random.uniform(*self.height_scale_range) h_shift = np.random.uniform(*self.height_shift_range) char_scaled = self._scale_width(char_tpl, w_scale) char_scaled = char_scaled * h_scale char_scaled = char_scaled + (place * h_shift) segments.append(char_scaled) # 글자 사이 gap (NaN) if idx < n_chars - 1: gap_len = np.random.randint(*self.text_space_range) segments.append(np.full(gap_len, np.nan, dtype=np.float32)) if not segments: return np.zeros(0, dtype=np.float32) return np.concatenate(segments) # -------------------- 메인 -------------------- # def __call__(self, sample: np.ndarray): if self.p <= np.random.uniform(0, 1): return sample # 여기서부터는 그냥 float32 로 고정 new_sample = np.copy(sample) C, T = new_sample.shape if C < 12: return new_sample seg_len = T // 4 global_start = np.random.randint(0, seg_len) place = 1 if self.mag % 2 == 1 else -1 tpl_source = self.upper_template if place == 1 else self.under_template for lead_idx, lead_name in enumerate(self.lead_order): char_list = tpl_source.get(lead_name) tpl_scaled = self._build_lead_template(char_list, place) L = len(tpl_scaled) group = lead_idx // 3 # 0: I~III, 1: aVR~aVF, 2: V1~V3, 3: V4~V6 base = group * seg_len insert_start = base + global_start seg_end = base + seg_len max_L_in_seg = seg_end - insert_start if L > max_L_in_seg: tpl_seg = tpl_scaled[:max_L_in_seg] else: tpl_seg = tpl_scaled target = new_sample[lead_idx, insert_start : insert_start + len(tpl_seg)] mask = ~np.isnan(tpl_seg) target[mask] = tpl_seg[mask] return new_sample import numpy as np import pandas as pd import torch import logging from torch.utils.data import Dataset from mai.data.util.load import load_mai_data from mai.sigproc.sigproc_composer import SigprocComposer from mai.sigproc.datamodel import SigprocConfig from torchvision import transforms import traceback as tb class BaseDatasetV3(Dataset): def __init__( self, table=pd.DataFrame([]), label="label", stage="test", dataset_params=None ): self.table = table.reset_index(drop=True) self.len = len(table) self.stage = stage self.randomness = True if self.stage.upper() == "TRAIN" else False self.label = label self.success_idx = 0 self.dataset_params = dataset_params self.aux_data = self.dataset_params.aux_data self.num_aux = len(self.aux_data) self.wave_type = self.dataset_params.wave_type self.wave_config_list = self.dataset_params.wave_config self.preprocessor = dict() self.sampling_rate = dict() for wave_type in self.wave_type: for wave_config in self.wave_config_list: if wave_config.name == wave_type: preproc_config = wave_config.params.preproc_config preproc_config = SigprocConfig(preproc_config) composer = SigprocComposer( getattr(wave_config.params, "additional_module_path", None) ) self.preprocessor[wave_type] = composer.create(preproc_config) # necessary params: sampling_rate_target for preproc in preproc_config: proc_name, proc_config = preproc if proc_name == "ResampleSignal": self.sampling_rate[ wave_type ] = proc_config.params.sampling_rate_target def __len__(self): return self.len def __getitem__(self, index): try: obj_id, input, label = self._read_signal(index) self.success_idx = index except Exception: logging.getLogger().error( f"Failed to _read_signal in Dataset. Using fallback index {self.success_idx}. {tb.format_exc()}" ) obj_id, input, label = self._read_signal(self.success_idx) (obj_id, offset) = obj_id return (obj_id, offset), input, label def _read_signal(self, index): row = self.table.loc[index] obj_id, mai_json = self._get_mai_json(row) meta, sig = self.preprocessing(mai_json) aux = self._get_aux_data(row, meta) label = torch.tensor(row[self.label], dtype=torch.float32) return obj_id, [sig, aux], label def _get_mai_json(self, row): obj_id = row["objectid"] start, length = row["start"], row["length"] mai_json = load_mai_data(obj_id, self.wave_type, (start, length)) return (obj_id, start), mai_json def preprocessing(self, signal_json, required_lead_type=None): raise NotImplementedError def parse_signal_json(self, signal_json): raise NotImplementedError def _get_aux_data(self, row): return {col: row[col] for col in self.aux_data if col in row} def normalize_aux(self, aux): aux_norm_params = { "age": {"min": 0, "max": 90}, "gender": {"min": -1, "max": 1}, "weight": {"min": 25, "max": 125}, "height": {"min": 120, "max": 200}, } aux_list = list() for k in aux.keys(): if k in ["age", "gender", "weight", "height"]: mn = aux_norm_params[k]["min"] mx = aux_norm_params[k]["max"] aux[k] = (aux[k] - mn) / (mx - mn) aux[k] = np.nan_to_num(aux[k], 0) aux_list.append(aux[k]) else: aux_list.append(aux[k]) return np.array(aux_list) def apply_augmentation(self, sig, aug_params, wave_type="ecg"): if (self.stage == "train") and (aug_params != {}): N, I = aug_params["number"], aug_params["intensity"] # noqa: E741 aug_setup_params = { "mag": {"max": I, "min": I - 1}, "freq": self.sampling_rate[wave_type], "p": aug_params["prob"], "dependency": aug_params["lead_dependency"], } # this import statement must be here. see https://git.medicalai.com:50001/team-ai/solver/solver2/-/issues/273 DAList = aug_mod(self.get_aug_list(aug_params["list"]), aug_setup_params) if len(DAList) > 0: ops = np.random.choice(DAList, N) if len(ops) > 0: augment = transforms.Compose(ops) sig = augment(sig) return sig def get_aug_list(self, aug_params): aug_list = list() for aug_item in aug_params: if isinstance(aug_item, dict): for aug, value in aug_item.items(): if value > np.random.uniform(0, 1): aug_list.append(aug) else: aug_list.append(aug_item) return aug_list import numpy as np import pandas as pd import torch import ast from mai.data.util.misc import LeadType class ECGDatasetV3(BaseDatasetV3): def __init__( self, table=pd.DataFrame([]), label="label", stage="test", dataset_params=None ): super(ECGDatasetV3, self).__init__(table, label, stage, dataset_params) def _read_signal(self, index): row = self.table.loc[index] obj_id, mai_json = self._get_mai_json(row) aux_dict = self._get_aux_data(row) required_lead_type = ( ast.literal_eval(row["lead_type"]) if hasattr(row, "lead_type") else LeadType.__dict__["_member_names_"] ) if self.label == "label": label = torch.tensor(row[self.label], dtype=torch.float32) else: # mult-label label = torch.tensor( [row[_label] for _label in self.label], dtype=torch.float32 ) _, ecg, aux = self.preprocessing(mai_json, aux_dict, required_lead_type) return obj_id, [ecg, aux], label def preprocessing( self, mai_json, aux_dict: dict = {}, required_lead_type: list = None ): meta, wave = mai_json sampling_rate_input = wave["ecg"].sampling_rate mv_unit = wave["ecg"].unit waveform_data = wave["ecg"].waveform.data wave_params = self.wave_config_list[0].params if required_lead_type is None: required_lead_type = list(waveform_data.keys()) waveform_dict = { lead_name: np.array(waveform_data[lead_name], dtype=np.float32) * mv_unit for lead_name in required_lead_type if lead_name in waveform_data } ecg_dict, _ = self.preprocessor["ecg"]( waveform_dict, sampling_rate_input, self.randomness ) ecg_array = np.array(list(ecg_dict.values())) ecg_array = self.apply_augmentation(ecg_array, wave_params.augmentation, "ecg") ecg_array = torch.tensor(ecg_array, dtype=torch.float32) aux = self.normalize_aux(aux_dict) aux = torch.tensor(aux, dtype=torch.float32) return meta, ecg_array, aux import ast import hashlib import json import numpy as np import pandas as pd import torch from mai.data.util.misc import LeadType class ECGRecoverDatasetRandomMaskWithRS2V3(ECGDatasetV3): """ECGRecoverDatasetRandomMaskWithRSV3 와 달리, Rhythmstrip(Lead II) 을 13번째 리드로 사용하지 않고 Lead II 자리에 넣어서 return""" def __init__( self, table=pd.DataFrame([]), label="label", stage="test", dataset_params=None ): super(ECGRecoverDatasetRandomMaskWithRS2V3, self).__init__( table, label, stage, dataset_params ) def test(self, obj_id=["6116253a31ee975e584d1dad"]): print(obj_id) path = f"/bfai/nfs_export/workspace/share/data/ecg_recover/ptbxl_wo_leadname/{obj_id[0][18:22]}/{obj_id[0]}.json" with open(path, "rb") as digitized_file: digitized = np.array(json.load(digitized_file)) # if len(digitized) > 12: # digitized = digitized[:12] return self.apply_random_mask(digitized, obj_id=obj_id[0]) def _read_signal(self, index): row = self.table.loc[index] obj_id, origin_json = self._get_mai_json(row) aux_dict = self._get_aux_data(row) required_lead_type = ( ast.literal_eval(row["lead_type"]) if hasattr(row, "lead_type") else LeadType.__dict__["_member_names_"] ) _, origin, _ = self.preprocessing(origin_json, aux_dict, required_lead_type) with open(row["digitized_path"], "rb") as digitized_file: digitized = np.array(json.load(digitized_file)) assert len(digitized) == 13 digitized, mask = self.apply_random_mask( digitized, obj_id=obj_id[0], deterministic=self.dataset_params.deterministic_masking, ) digitized = torch.tensor(digitized, dtype=torch.float32) mask = torch.tensor(mask, dtype=torch.float32) return obj_id, [digitized, mask], origin def apply_random_mask( self, digitized: np.ndarray, obj_id: str = "", deterministic: bool = False ): """ digitized: (13, 5000) return: masked_signal: (12, 5000) mask: (12, 5000) (1 = invisible, 0 = visible) """ def _seed_rng(s: str): h = hashlib.sha256(s.encode()).hexdigest() return np.random.default_rng(int(h[:16], 16) % (2**32)) # ------------------------- # RNG 선택 # ------------------------- if deterministic: rng = _seed_rng(obj_id) # -> deterministic masking else: # -> train 일 때 매번 다르게 masking if self.stage == "train": rng = np.random.default_rng() else: rng = _seed_rng(obj_id) fs = 500 group_len = 1250 # 2.5s * fs total_len = 5000 # 10s * fs visible = np.zeros_like(digitized, dtype=np.float16) # each lead masking for g in range(4): vis_len = int(rng.uniform(1.25 * fs, group_len + 1)) max_offset = group_len - vis_len offset = int(rng.uniform(0, max_offset + 1)) start = g * group_len + offset end = start + vis_len for lead in range(g * 3, g * 3 + 3): visible[lead, start:end] = 1 # rhythm strip masking left_mask = int(rng.uniform(0, 1 * fs + 1)) right_mask = int(rng.uniform(0, 1 * fs + 1)) rs_start = left_mask rs_end = total_len - right_mask visible[12, rs_start:rs_end] = 1 masked = digitized * visible mask = 1 - visible target_lead = 1 # Lead II masked[target_lead] = masked[12] mask[target_lead] = mask[12] # ------------------------- # 최종 12-lead만 반환 # ------------------------- masked = masked[:12] mask = mask[:12] return masked, mask