import inspect
import os

import numpy as np
import scipy
from scipy import signal
|
|
| from mai.data.util.misc import LeadType |
|
|
|
|
def adjust_channel_dependency(ecg):
    """Recompute the dependent limb leads from leads I and II.

    Assumes the channel order [I, II, III, aVR, aVL, aVF, V1-V6] and applies
    the standard Einthoven/Goldberger relations.
    """
    ecg[2] = ecg[1] - ecg[0]  # III = II - I
    ecg[3] = -(ecg[1] + ecg[0]) / 2  # aVR = -(I + II) / 2
    ecg[4] = ecg[0] - ecg[1] / 2  # aVL = I - II / 2
    ecg[5] = ecg[1] - ecg[0] / 2  # aVF = II - I / 2
    return ecg
|
|
|
|
| def get_mag_range(): |
| return { |
| "PowerlineNoise": { |
| 0: 0.0, |
| 1: 0.0062, |
| 2: 0.0116, |
| 3: 0.0168, |
| 4: 0.0222, |
| 5: 0.0278, |
| 6: 0.033, |
| 7: 0.0381, |
| 8: 0.0436, |
| 9: 0.049, |
| 10: 0.0543, |
| }, |
| "BaselineWander": { |
| 0: 0.0, |
| 1: 0.01, |
| 2: 0.022, |
| 3: 0.035, |
| 4: 0.045, |
| 5: 0.06, |
| 6: 0.064, |
| 7: 0.086, |
| 8: 0.09, |
| 9: 0.1, |
| 10: 0.12, |
| }, |
| "BaselineShift": { |
| 0: 0.0, |
| 1: 0.0317, |
| 2: 0.0631, |
| 3: 0.0966, |
| 4: 0.1286, |
| 5: 0.1584, |
| 6: 0.1924, |
| 7: 0.2257, |
| 8: 0.2514, |
| 9: 0.2869, |
| 10: 0.32, |
| }, |
| "EMGNoise": { |
| 0: 0.0, |
| 1: 0.007, |
| 2: 0.013, |
| 3: 0.018, |
| 4: 0.022, |
| 5: 0.027, |
| 6: 0.032, |
| 7: 0.037, |
| 8: 0.042, |
| 9: 0.045, |
| 10: 0.05, |
| }, |
| "RandomCropResize": { |
| 0: 1.0, |
| 1: 1.0, |
| 2: 1.001, |
| 3: 1.001, |
| 4: 1.002, |
| 5: 1.002, |
| 6: 1.003, |
| 7: 1.004, |
| 8: 1.005, |
| 9: 1.007, |
| 10: 1.01, |
| }, |
| "TimeWarp": { |
| 0: 0.0, |
| 1: 0.014, |
| 2: 0.027, |
| 3: 0.041, |
| 4: 0.055, |
| 5: 0.07, |
| 6: 0.086, |
| 7: 0.103, |
| 8: 0.142, |
| 9: 0.202, |
| 10: 0.5, |
| }, |
| "DynamicTimeWarp": { |
| 0: 0.0, |
| 1: 0.146, |
| 2: 0.292, |
| 3: 0.439, |
| 4: 0.585, |
| 5: 0.731, |
| 6: 0.877, |
| 7: 1.0, |
| 8: 1.239, |
| 9: 1.633, |
| 10: 2.0, |
| }, |
| "GaussianSmoothing": { |
| 0: 1.0, |
| 1: 1.5, |
| 2: 1.9, |
| 3: 2.681, |
| 4: 3.526, |
| 5: 3.75, |
| 6: 4.0, |
| 7: 4.786, |
| 8: 5.573, |
| 9: 5.786, |
| 10: 6.0, |
| }, |
| "MagnitudeWarping": { |
| 0: 0.0, |
| 1: 0.02, |
| 2: 0.037, |
| 3: 0.055, |
| 4: 0.071, |
| 5: 0.088, |
| 6: 0.105, |
| 7: 0.122, |
| 8: 0.139, |
| 9: 0.156, |
| 10: 0.172, |
| }, |
| "AngleRotation": { |
| 0: 0.533, |
| 1: 3.742, |
| 2: 6.231, |
| 3: 8.839, |
| 4: 11.26, |
| 5: 14.112, |
| 6: 15.868, |
| 7: 18.057, |
| 8: 20.816, |
| 9: 22.954, |
| 10: 24.0, |
| }, |
| "TimeMask": { |
| 0: 0.0, |
| 1: 0.01, |
| 2: 0.021, |
| 3: 0.037, |
| 4: 0.061, |
| 5: 0.088, |
| 6: 0.124, |
| 7: 0.169, |
| 8: 0.216, |
| 9: 0.279, |
| 10: 0.344, |
| }, |
| "ChannelMask": { |
| 0: 0.0, |
| 1: 0.33, |
| 2: 0.67, |
| 3: 1.0, |
| 4: 1.5, |
| 5: 2.0, |
| 6: 2.25, |
| 7: 2.5, |
| 8: 2.75, |
| 9: 3.0, |
| 10: 3.5, |
| }, |
| "RandomSpectrogramMask": { |
| 0: 0.0, |
| 1: 0.002, |
| 2: 0.005, |
| 3: 0.009, |
| 4: 0.014, |
| 5: 0.02, |
| 6: 0.028, |
| 7: 0.036, |
| 8: 0.046, |
| 9: 0.057, |
| 10: 0.072, |
| }, |
| "FrequencyWarping": { |
| 0: 0.0075, |
| 1: 0.018, |
| 2: 0.0225, |
| 3: 0.024, |
| 4: 0.0285, |
| 5: 0.03, |
| 6: 0.0345, |
| 7: 0.0345, |
| 8: 0.0375, |
| 9: 0.0375, |
| 10: 0.039, |
| }, |
| "LowPassFilter": { |
| 0: 0.0, |
| 1: 0.043, |
| 2: 0.154, |
| 3: 0.271, |
| 4: 0.371, |
| 5: 0.429, |
| 6: 0.484, |
| 7: 0.552, |
| 8: 0.599, |
| 9: 0.636, |
| 10: 0.667, |
| }, |
| "PartialWhiteNoise": { |
| 0: 0.0, |
| 1: 0.019, |
| 2: 0.03, |
| 3: 0.039, |
| 4: 0.047, |
| 5: 0.055, |
| 6: 0.061, |
| 7: 0.068, |
| 8: 0.074, |
| 9: 0.08, |
| 10: 0.086, |
| }, |
| "PermuteWaveSegment": { |
| 0: 0.0, |
| 1: 0.284, |
| 2: 0.568, |
| 3: 0.852, |
| 4: 1.486, |
| 5: 2.1, |
| 6: 2.815, |
| 7: 3.399, |
| 8: 4.45, |
| 9: 5.994, |
| 10: 8.0, |
| }, |
| "ConcatWaveSegment": { |
| 0: 0.0, |
| 1: 1, |
| 2: 1, |
| 3: 2, |
| 4: 2, |
| 5: 3, |
| 6: 3, |
| 7: 4, |
| 8: 4, |
| 9: 5, |
| 10: 6, |
| }, |
| "Flip": { |
| 0: 0.0, |
| 1: 1.0, |
| 2: 1.0, |
| 3: 1.0, |
| 4: 1.0, |
| 5: 1.0, |
| 6: 1.0, |
| 7: 1.0, |
| 8: 1.0, |
| 9: 1.0, |
| 10: 1.0, |
| }, |
| "RandomSpike": { |
| 0: 0, |
| 1: 1, |
| 2: 2, |
| 3: 3, |
| 4: 4, |
| 5: 5, |
| 6: 6, |
| 7: 7, |
| 8: 8, |
| 9: 9, |
| 10: 10, |
| }, |
| "DigitizedLeadName": { |
| 0: 0, |
| 1: 1, |
| 2: 2, |
| 3: 3, |
| 4: 4, |
| 5: 5, |
| 6: 6, |
| 7: 7, |
| 8: 8, |
| 9: 9, |
| 10: 10, |
| }, |
| } |
|
|
|
|
def aug_mod(aug_list, params):
    """Instantiate augmentations by name, forwarding only accepted kwargs."""
    aug_range = get_mag_range()
    da_list = []
    for aug in aug_list:
        params_aug = dict()
        # Forward only the kwargs that this augmentation's constructor accepts.
        for param in params.keys():
            if param in inspect.getfullargspec(globals()[aug].__init__).args:
                params_aug[param] = params[param]
        # Map integer magnitude levels (0-10) to augmentation-specific values.
        if aug in aug_range:
            params_aug["mag"] = {
                "max": aug_range[aug][int(params["mag"]["max"])],
                "min": aug_range[aug][max(int(params["mag"]["min"]), 0)],
            }
        da_list.append(globals()[aug](**params_aug))
    return da_list
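

# Hedged usage sketch: aug_mod looks up each class name in globals(), forwards
# only the kwargs its constructor accepts, and maps the integer magnitude
# levels (0-10) through get_mag_range() to augmentation-specific values.
#
#   params = {"mag": {"max": 5, "min": 4}, "p": 0.5, "freq": 500, "dependency": True}
#   ops = aug_mod(["PowerlineNoise", "TimeMask"], params)
#   augmented = ops[0](ecg)  # ecg: np.ndarray of shape (C, T)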
|
|
|
|
| |
| class PowerlineNoise(object): |
| def __init__(self, mag={"max": 0.5, "min": 0}, p=1.0, freq=500, dependency=True): |
| |
| |
| self.max_amplitude = mag["max"] |
| self.min_amplitude = mag["min"] |
| self.freq = freq |
| self.p = p |
| self.dependency = dependency |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
            new_sample = sample.copy()
| C, T = new_sample.shape |
| |
| amp = np.random.uniform(self.min_amplitude, self.max_amplitude, size=(1, 1)) |
| |
| f = 50 if np.random.uniform(0, 1) > 0.5 else 60 |
| noise = self.apply_powerline_noise(T, f) |
| new_sample = new_sample + noise * amp |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
    def apply_powerline_noise(self, T, f):
        t = np.linspace(0, T - 1, T)
        phase = np.random.uniform(0, 2 * np.pi)
        noise = np.cos(2 * np.pi * f * (t / self.freq) + phase)
        return noise
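

# Hedged usage sketch: every augmentation in this module is a callable object
# that maps a (C, T) float array to an array of the same shape, applied with
# probability p.
#
#   aug = PowerlineNoise(mag={"max": 0.05, "min": 0.0}, p=1.0, freq=500)
#   ecg = np.random.randn(12, 5000).astype(np.float32)
#   out = aug(ecg)  # same shape, with a 50/60 Hz tone added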
|
|
|
|
| |
| class BaselineWander(object): |
| def __init__( |
| self, |
| mag={"max": 0.5, "min": 0}, |
| p=1.0, |
| aug_freq={"max": 0.2, "min": 0.01}, |
| k=3, |
| freq=500, |
| dependency=True, |
| ): |
| self.min_amplitude = mag["min"] |
| self.max_amplitude = mag["max"] |
| self.min_freq = aug_freq["min"] |
| self.max_freq = aug_freq["max"] |
| self.k = k |
| self.freq = freq |
| self.p = p |
| self.dependency = dependency |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| amp_channel = np.random.normal(1, 0.5, size=(C, 1)) |
| |
| amp_general = np.random.uniform( |
| self.min_amplitude, self.max_amplitude, size=self.k |
| ) |
| noise = np.zeros(shape=(1, T)) |
| for k in range(self.k): |
| noise += self.apply_baseline_wander(T) * amp_general[k] |
| noise = noise * amp_channel |
| new_sample[:, :] = new_sample[:, :] + noise[:, :] |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
    def apply_baseline_wander(self, T):
        f = np.random.uniform(self.min_freq, self.max_freq)
        t = np.linspace(0, T - 1, T)
        r = np.random.uniform(0, 2 * np.pi)
        noise = np.cos(2 * np.pi * f * (t / self.freq) + r)
        return noise
|
|
|
|
| |
| class BaselineShift(object): |
| def __init__( |
| self, |
| mag={"max": 0.5, "min": 0}, |
| shift_ratio=0.2, |
| num_segment=1, |
| freq=500, |
| dependency=False, |
| p=1.0, |
| ): |
| self.max_amplitude = mag["max"] |
| self.min_amplitude = mag["min"] |
| self.shift_ratio = shift_ratio |
| self.num_segment = num_segment |
| self.freq = freq |
| self.p = p |
| self.dependency = dependency |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| shift_length = T * self.shift_ratio |
| |
| amp_channel = np.random.choice([1, -1], size=(C, 1)) |
| amp_general = np.random.uniform( |
| self.min_amplitude, self.max_amplitude, size=(1, 1) |
| ) |
| amp = amp_channel * amp_general |
| noise = np.zeros(shape=(C, T)) |
| for i in range(self.num_segment): |
| segment_len = np.random.normal(shift_length, shift_length * 0.2) |
| t0 = int(np.random.uniform(0, T - segment_len)) |
| t = int(t0 + segment_len) |
| |
| noise[:, t0:t] = 1 |
| new_sample = new_sample + noise * amp |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
| class EMGNoise(object): |
| def __init__( |
| self, |
| mag={"max": 0.5, "min": 0}, |
| |
| dependency=True, |
| p=1.0, |
| ): |
| |
| |
| self.max_amplitude = mag["max"] |
| self.min_amplitude = mag["min"] |
| self.p = p |
| self.dependency = dependency |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| amp = np.random.uniform(self.min_amplitude, self.max_amplitude, size=(C, 1)) |
| |
| noise = np.random.normal(0, 1, [C, T]) |
| |
| new_sample = new_sample + noise * amp |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
|
|
|
|
| class RandomCropResize(object): |
| def __init__( |
| self, |
| mag={"max": 1.2, "min": 1.1}, |
| |
| dependency=False, |
| p=1.0, |
| ): |
| |
| |
| self.min_rate = mag["min"] |
| self.max_rate = mag["max"] |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| try: |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| rate = np.random.uniform(self.min_rate, self.max_rate) |
| rate = 1 / rate if np.random.uniform(0, 1) > 0.5 else rate |
| if self.max_rate > 1: |
| new_sample = np.concatenate([new_sample] * 3, 1) |
| start = np.random.randint(new_sample.shape[1] - int(T * rate)) |
| new_sample = new_sample[:, start : start + int(T * rate)] |
| new_sample = signal.resample(new_sample, T, axis=1) |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| except Exception: |
| new_sample = sample |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
| class TimeWarp(object): |
| def __init__( |
| self, |
| mag={"max": 20, "min": 0}, |
| |
| dependency=False, |
| p=1.0, |
| ): |
| self.epsilon_max = 10 / np.max([mag["max"], 0.001]) |
| self.epsilon_min = 10 / np.max([mag["min"], 0.001]) |
| self.scale_max = mag["max"] |
| self.scale_min = mag["min"] |
| self.iteration = 1 |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| for _ in range(self.iteration): |
| epsilon = np.random.uniform(self.epsilon_min, self.epsilon_max) |
| scale = np.random.uniform(self.scale_min, self.scale_max) |
| pmf = np.random.normal(loc=0, scale=scale, size=T) |
| pmf = np.cumsum(pmf) |
| pmf = pmf - np.min(pmf) + epsilon |
|
|
| cdf = np.cumsum(pmf) |
| t_new = ( |
| (cdf - cdf[0]) / (cdf[-1] - cdf[0]) * (len(cdf) - 1) |
| ) |
| |
| new_sample = new_sample[:, t_new.astype(int)] |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
| class DynamicTimeWarp(object): |
    def __init__(
        self,
        mag={"max": 3, "min": 3},
        freq=500,
        scale=0.1,
        epsilon=0.1,
        dependency=True,
        p=1.0,
    ):
        self.num_of_warps_min = mag["min"]
        self.num_of_warps_max = mag["max"]
        self.radius = freq
        self.scale = scale
        self.epsilon = epsilon
        self.dependency = dependency
        self.p = p
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| radius = self.radius |
| t_new = np.arange(T) |
| num_of_warps = np.random.randint( |
| self.num_of_warps_min, self.num_of_warps_max + 1 |
| ) |
| for i in range(num_of_warps): |
| section_length = T // num_of_warps |
| point = np.random.randint(section_length * i, section_length * (i + 1)) |
|
|
| warp_from = ( |
| point - radius // 2 |
| if point - radius // 2 > section_length * i |
| else section_length * i |
| ) |
| warp_from = ( |
| warp_from |
| if point + radius // 2 < section_length * (i + 1) |
| else section_length * (i + 1) - radius - 1 |
| ) |
| warp_to = warp_from + radius |
|
|
| pmf = np.random.normal(loc=0, scale=self.scale, size=radius) |
| pmf = np.cumsum(pmf) |
| pmf = pmf - np.min(pmf) + self.epsilon |
| cdf = np.cumsum(pmf) |
| t_new[warp_from:warp_to] = warp_from + (cdf - cdf[0]) / ( |
| cdf[-1] - cdf[0] |
| ) * (len(cdf) - 1) |
|
|
| new_sample = new_sample[:, t_new.astype(int)] |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
|
|
| return new_sample |
|
|
|
|
| |
| class GaussianSmoothing(object): |
| def __init__( |
| self, |
| mag={"max": 15, "min": 3}, |
| p=1.0, |
| |
| |
| dependency=False, |
| ): |
| self.min_window_length = mag["min"] |
| self.max_window_length = mag["max"] |
| |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| window_length = np.random.randint( |
| self.min_window_length, self.max_window_length + 1 |
| ) |
| new_sample = self.apply_gaussian_smoothing(new_sample, window_length) |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
| def apply_gaussian_smoothing(self, wave, window_length): |
| C, T = wave.shape |
| window = scipy.signal.windows.gaussian(window_length, std=window_length // 2) |
| window = window / window.sum() |
| new_wave = [] |
| for i in range(C): |
| new_wave.append(np.convolve(window, wave[i], "same")) |
| new_wave = np.stack(new_wave) |
| return new_wave |
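

# Hedged sketch: the Gaussian window above is area-normalized, so interior
# samples of a constant signal are preserved (edges dip because "same"-mode
# convolution truncates the window there).
#
#   sm = GaussianSmoothing(mag={"max": 5, "min": 3})
#   out = sm.apply_gaussian_smoothing(np.ones((12, 100)), window_length=5)
#   # out[:, 50] is ~1.0; out[:, 0] is < 1.0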
|
|
|
|
| |
| class MagnitudeWarping(object): |
| def __init__(self, mag={"max": 0.5, "min": 0.1}, dependency=True, p=1.0): |
| self.std_max = mag["max"] |
| self.std_min = mag["min"] |
| |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| std = np.random.uniform(self.std_min, self.std_max) |
| C, T = new_sample.shape |
| amplitude = np.random.normal(1, std, size=(C, T)) |
| new_sample = new_sample * amplitude |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
| class AngleRotation(object): |
| def __init__( |
| self, mag={"max": 45, "min": 0}, method="dower", dependency=True, p=1.0 |
| ): |
| assert method in ["dower", "plsv", "qlsv", "kors"] |
| self.min_rotation = mag["min"] |
| self.max_rotation = mag["max"] |
| |
| self.dependency = dependency |
| self.p = p |
|
|
| if method == "dower": |
| self.trans_inv = np.array( |
| [ |
| [-0.172, -0.073, 0.122, 0.231, 0.239, 0.193, 0.156, -0.009], |
| [0.057, -0.019, -0.106, -0.022, 0.040, 0.048, -0.227, 0.886], |
| [-0.228, -0.310, -0.245, -0.063, 0.054, 0.108, 0.021, 0.102], |
| ] |
| ) |
| elif method == "plsv": |
| self.trans_inv = np.array( |
| [ |
| [-0.266, 0.027, 0.065, 0.131, 0.203, 0.220, 0.370, -0.154], |
| [0.088, -0.088, 0.003, 0.042, 0.047, 0.067, -0.131, 0.717], |
| [-0.319, -0.198, -0.167, -0.099, -0.009, 0.060, 0.184, -0.114], |
| ] |
| ) |
| elif method == "qlsv": |
| self.trans_inv = np.array( |
| [ |
| [-0.147, -0.058, 0.037, 0.139, 0.232, 0.226, 0.199, -0.018], |
| [0.023, -0.085, -0.003, 0.033, 0.060, 0.104, -0.146, 0.503], |
| [-0.184, -0.163, -0.190, -0.119, -0.023, 0.043, 0.085, -0.130], |
| ] |
| ) |
| elif method == "kors": |
| self.trans_inv = np.array( |
| [ |
| [-0.130, 0.050, -0.010, 0.140, 0.060, 0.540, 0.380, -0.070], |
| [0.060, -0.020, -0.050, 0.060, -0.170, 0.130, -0.070, 0.930], |
| [-0.430, -0.060, -0.140, -0.200, -0.110, 0.310, 0.110, -0.230], |
| ] |
| ) |
| self.trans = np.linalg.pinv(self.trans_inv) |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| theta = np.random.randint(self.min_rotation, self.max_rotation + 1, 3) |
| if C == 12: |
| new_sample_8lead = new_sample[[6, 7, 8, 9, 10, 11, 0, 1], :] |
| rot_x = np.array( |
| [ |
| [1, 0, 0], |
| [ |
| 0, |
| np.cos(theta[0] * np.pi / 180), |
| -np.sin(theta[0] * np.pi / 180), |
| ], |
| [ |
| 0, |
| np.sin(theta[0] * np.pi / 180), |
| np.cos(theta[0] * np.pi / 180), |
| ], |
| ] |
| ) |
| rot_y = np.array( |
| [ |
| [ |
| np.cos(theta[1] * np.pi / 180), |
| 0, |
| -np.sin(theta[1] * np.pi / 180), |
| ], |
| [0, 1, 0], |
| [ |
| np.sin(theta[1] * np.pi / 180), |
| 0, |
| np.cos(theta[1] * np.pi / 180), |
| ], |
| ] |
| ) |
| rot_z = np.array( |
| [ |
| [ |
| np.cos(theta[2] * np.pi / 180), |
| -np.sin(theta[2] * np.pi / 180), |
| 0, |
| ], |
| [ |
| np.sin(theta[2] * np.pi / 180), |
| np.cos(theta[2] * np.pi / 180), |
| 0, |
| ], |
| [0, 0, 1], |
| ] |
| ) |
| rot = np.einsum("ab,bc,cd->ad", rot_x, rot_y, rot_z) |
| mtx1 = np.einsum("ab,bc->ac", self.trans, rot) |
| mtx2 = np.einsum("ab,bc->ac", self.trans_inv, new_sample_8lead) |
| new_sample_8lead = np.einsum("ab,bc->ac", mtx1, mtx2) |
| new_sample_12lead = np.zeros([C, T]) |
| new_sample_12lead[[6, 7, 8, 9, 10, 11, 0, 1], :] = new_sample_8lead |
| new_sample = adjust_channel_dependency(new_sample_12lead) |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| new_sample = (np.mean(theta) / 30) * new_sample + ( |
| 30 - np.mean(theta) |
| ) / 30 * sample |
| else: |
| new_sample = sample |
|
|
| return new_sample |
|
|
|
|
| |
| class TimeMask(object): |
| def __init__( |
| self, |
| mag={"max": 0.3, "min": 0.0}, |
| |
| |
| num=1, |
| dependency=True, |
| p=1.0, |
| ): |
| self.min_band_part = mag["max"] |
| self.max_band_part = mag["min"] |
| |
| self.num = num |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| for _ in range(self.num): |
| t0 = np.random.uniform(0, 1 - self.max_band_part) |
| t = np.random.uniform(self.min_band_part, self.max_band_part) |
| mask_from = int(t0 * T) |
| mask_to = int((t0 + t) * T) |
| new_sample[:, mask_from:mask_to] = 0 |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
| class ChannelMask(object): |
| def __init__( |
| self, |
| mag={"max": 4, "min": 1}, |
| |
| |
| dependency=False, |
| p=1.0, |
| ): |
| self.min_num_channel = mag["min"] |
| self.max_num_channel = mag["max"] |
| |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| num_channel = np.random.randint( |
| self.min_num_channel, self.max_num_channel + 1 |
| ) |
| if num_channel > C: |
| num_channel = C |
| channels = np.random.choice(list(range(C)), num_channel, replace=False) |
| new_sample[channels, :] = 0 |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
|
|
|
|
| class RandomSpectrogramMask(object): |
| def __init__(self, mag={"max": 0.5, "min": 0.2}, freq=500, dependency=False, p=1.0): |
| self.random_mask_prob_min = mag["min"] |
| self.random_mask_prob_max = mag["max"] |
| self.freq = freq |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| _, _, z = scipy.signal.stft( |
| new_sample, fs=self.freq, nperseg=self.freq // 5 |
| ) |
            C, F, S = z.shape  # channels, frequency bins, STFT frames
            mask_prob = np.random.uniform(
                self.random_mask_prob_min, self.random_mask_prob_max
            )
            mask = np.random.choice(
                [0, 1], size=(C, F, S), p=[mask_prob, 1 - mask_prob]
            )
| _, new_sample = scipy.signal.istft( |
| z * mask, fs=self.freq, nperseg=self.freq // 5 |
| ) |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| new_sample = new_sample[:, : sample.shape[-1]] |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
| class FrequencyWarping(object): |
    def __init__(self, mag, dependency=True, p=1.0):
| |
| self.scale_max = mag["max"] |
| self.scale_min = mag["min"] |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| z = np.fft.fft(new_sample) |
| C, F = z.shape |
| scale = np.random.uniform(self.scale_min, self.scale_max) |
| pmf = np.random.normal(loc=1, scale=scale, size=F) |
| cdf = np.cumsum(pmf) |
| f_new = (cdf - cdf[0]) / (cdf[-1] - cdf[0]) * (len(cdf) - 1) |
| z_new = z[:, np.round(f_new).astype(int)] |
| new_sample = np.real(np.fft.ifft(z_new)) |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
| class LowPassFilter(object): |
| def __init__( |
| self, |
| mag={"max": 0.8, "min": 0.5}, |
| |
| |
| dependency=False, |
| p=1.0, |
| ): |
| self.cutoff_ratio_min = mag["min"] |
| self.cutoff_ratio_max = mag["max"] |
| |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| z = np.fft.fft(new_sample) |
| C, F = z.shape |
| mask_prob = np.random.uniform(self.cutoff_ratio_min, self.cutoff_ratio_max) |
| mask = np.ones([C, F]) |
| mask[ |
| :, |
| int((F // 2) - ((F * mask_prob) // 2)) : int( |
| (F // 2) + ((F * mask_prob) // 2) |
| ), |
| ] = 0 |
| new_sample = np.real(np.fft.ifft(z * mask)) |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
| class PartialWhiteNoise(object): |
| def __init__( |
| self, |
| mag={"max": 0.3, "min": 0.0}, |
| |
| |
| |
| num=1, |
| dependency=True, |
| p=1.0, |
| ): |
| self.min_band_part = mag["min"] |
| self.max_band_part = mag["max"] |
| |
| |
| self.num = num |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
| C, T = new_sample.shape |
| for _ in range(self.num): |
| t0 = np.random.uniform(0, 1 - self.max_band_part) |
| t = np.random.uniform(self.min_band_part, self.max_band_part) |
| mask_from = int(t0 * T) |
| mask_to = int(t0 * T) + int(t * T) |
| scale = t * 2.7 |
| new_sample[:, mask_from:mask_to] = ( |
| new_sample[:, mask_from:mask_to] |
| + np.random.normal(0, scale, [C, mask_to - mask_from]) |
| * np.std(new_sample, 1, keepdims=True) ** 0.5 |
| ) |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
| class PermuteWaveSegment(object): |
| def __init__( |
| self, |
| mag={"max": 4, "min": 0}, |
| |
| |
| sampling_rate=500, |
| dependency=False, |
| p=1.0, |
| ): |
| self.min_num_segment = mag["min"] |
| self.max_num_segment = mag["max"] |
| self.sampling_rate = sampling_rate |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| sample = sample.copy() |
| C, T = sample.shape |
| num_segment = np.random.randint( |
| self.min_num_segment, self.max_num_segment + 1 |
| ) |
| segment_point = np.sort(np.random.randint(0, T, num_segment + 1)) |
| seg_index = segment_point[ |
| [ |
| int(x) |
| for x in np.linspace(0, len(segment_point) - 1, num_segment + 1) |
| ] |
| ] |
| seg_index = np.concatenate([[0], seg_index, [T]], 0) |
| seg_index = np.random.permutation( |
| [[i, j] for i, j in zip(seg_index[:-1], seg_index[1:])] |
| ) |
| new_sample = [] |
| for start, end in seg_index: |
| new_sample.append(sample[:, start:end]) |
| crop_start = np.random.randint(T - (segment_point[-1] - segment_point[0])) |
| new_sample = np.concatenate(new_sample * 2, axis=1)[ |
| :, crop_start : crop_start + T |
| ] |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| |
| class ConcatWaveSegment(object): |
| def __init__( |
| self, |
| mag={"max": 4, "min": 0}, |
| |
| |
| sampling_rate=500, |
| dependency=True, |
| p=1.0, |
| ): |
| self.min_num_segment = mag["min"] |
| self.max_num_segment = mag["max"] |
| self.sampling_rate = sampling_rate |
| self.dependency = dependency |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| try: |
| C, T = sample.shape |
| num_segment = np.random.randint( |
| self.min_num_segment, self.max_num_segment + 1 |
| ) |
| segment_point = np.sort(np.random.randint(0, T, num_segment + 1)) |
| seg_point = segment_point[ |
| [ |
| int(x) |
| for x in np.linspace(0, len(segment_point) - 1, num_segment + 1) |
| ] |
| ] |
| index = np.random.randint(0, len(seg_point) - 1, 1)[0] |
| sample_segment = sample[:, seg_point[index] : seg_point[index + 1]] |
| new_sample = np.concatenate( |
| [sample_segment] * (T // len(sample_segment[0]) + 1), 1 |
| )[:, :T] |
| if self.dependency: |
| new_sample = adjust_channel_dependency(new_sample) |
| if new_sample.shape[1] < T: |
| new_sample = sample |
| except Exception: |
| new_sample = sample |
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| class Flip(object): |
| def __init__(self, mag={"max": 0.5, "min": 0.1}, p=0.1): |
| self.std_max = mag["max"] |
| self.std_min = mag["min"] |
| self.p = p |
|
|
| def __call__(self, sample): |
| if self.p > np.random.uniform(0, 1): |
| new_sample = sample.copy() |
            if self.std_max != 0:  # mag["max"] acts as an on/off flag
                new_sample = -new_sample
| else: |
| new_sample = sample |
| return new_sample |
|
|
|
|
| class RandomSpike(object): |
| def __init__(self, mag={"max": 2, "min": 1}, p=1.0, freq=500): |
| self.max_ratio = mag["max"] |
| self.min_ratio = mag["min"] |
| self.p = p |
| self.freq = freq |
|
|
| def _visible_segment_for_channel(self, c: int, T: int): |
| """ |
| 채널별 보이는 구간 반환. |
| T=5000 이면 segment_len = 1250: |
| 0~2 -> 0~1250 |
| 3~5 -> 1250~2500 |
| 6~8 -> 2500~3750 |
| 9~11 -> 3750~5000 |
| """ |
| segment_len = T // 4 |
| if c <= 2: |
| return 0, segment_len |
| elif c <= 5: |
| return segment_len, 2 * segment_len |
| elif c <= 8: |
| return 2 * segment_len, 3 * segment_len |
| else: |
| return 3 * segment_len, 4 * segment_len |
|
|
| def __call__(self, sample): |
| if self.p <= np.random.uniform(0, 1): |
| return sample |
|
|
| new_sample = np.copy(sample) |
| C, T = new_sample.shape |
        assert C == 12, f"This implementation assumes 12 leads; got C={C}"
|
|
        spike_len = int(np.random.uniform(0, self.max_ratio) * 1.2) + 1
        n_spikes = int(self.max_ratio // 2)
|
|
| for _ in range(n_spikes): |
| for c in range(C): |
| seg_start, seg_end = self._visible_segment_for_channel(c, T) |
|
|
| |
| max_start = seg_end - spike_len |
| start = np.random.randint(seg_start, max_start) |
|
|
| |
| range_c = (new_sample[c].max() - new_sample[c].min()) / 2 |
| scale = np.random.uniform(0.5, 1.5) |
| sign = np.random.choice([-1.0, 1.0]) |
| spike_amp = sign * range_c * scale |
|
|
| new_sample[c, start : start + spike_len] = spike_amp |
|
|
| return new_sample |
|
|
|
|
|
|
|
|
| class DigitizedLeadName(object): |
| def __init__(self, mag={"max": 2, "min": 1}, p=1.0, freq=500): |
| """ |
| place: 1 → upper 템플릿, -1 → under 템플릿 |
| height: 템플릿 전체에 더할 vertical shift (place * height) |
| width_scale: 시간축 scaling factor (1: 그대로, 2: 길이 2배) |
| p: augmentation probability |
| """ |
|
|
| mag_to_param_range = { |
| "width_scale_range": { |
| 1: (1, 1.000000001), |
| 2: (0.9, 1.1), |
| 3: (0.85, 1.2), |
| 4: (0.8, 1.3), |
| 5: (0.75, 1.4), |
| 6: (0.7, 1.5), |
| 7: (0.65, 1.6), |
| 8: (0.6, 1.7), |
| 9: (0.55, 1.8), |
| 10: (0.5, 1.9), |
| }, |
| "height_scale_range": { |
| 1: (1, 1.000000001), |
| 2: (0.94, 1.06), |
| 3: (0.88, 1.12), |
| 4: (0.82, 1.18), |
| 5: (0.76, 1.24), |
| 6: (0.70, 1.30), |
| 7: (0.64, 1.36), |
| 8: (0.58, 1.42), |
| 9: (0.52, 1.48), |
| 10: (0.5, 1.54), |
| }, |
| "height_shift_range": { |
| 1: (0, 0.000000001), |
| 2: (0.0, 0.1), |
| 3: (0.0, 0.2), |
| 4: (0.0, 0.3), |
| 5: (0.0, 0.4), |
| 6: (0.0, 0.5), |
| 7: (0.0, 0.6), |
| 8: (0.0, 0.7), |
| 9: (0.0, 0.8), |
| 10: (0.0, 0.9), |
| }, |
| "text_space_range": { |
| 1: (0, 2), |
| 2: (0, 2), |
| 3: (0, 2), |
| 4: (0, 3), |
| 5: (0, 3), |
| 6: (0, 3), |
| 7: (0, 4), |
| 8: (0, 4), |
| 9: (0, 4), |
| 10: (0, 5), |
| }, |
| } |
|
|
| self.p = p |
| self.mag = int(mag["max"]) |
| self.height_shift_range = mag_to_param_range["height_shift_range"][self.mag] |
| self.height_scale_range = mag_to_param_range["height_scale_range"][self.mag] |
| self.width_scale_range = mag_to_param_range["width_scale_range"][self.mag] |
| self.text_space_range = mag_to_param_range["text_space_range"][self.mag] |
|
|
| self.upper_template, self.under_template = self._load_and_split_templates( |
| "DigitizedLeadNameTemplate.npz" |
| ) |
|
|
| self.lead_order = [lead.name for lead in LeadType] |
|
|
| |
| def _load_and_split_templates(self, filename): |
| """ |
| .npz에서 upper/under 템플릿을 읽고, |
| 각 lead 템플릿(1D: 값 + NaN)을 NaN 기준으로 글자 단위로 분리한다. |
| |
| 반환: |
| upper_split: {lead_name: [char0, char1, ...]} |
| under_split: {lead_name: [char0, char1, ...]} |
| """ |
| path = os.path.join(os.path.dirname(__file__), "datasets", filename) |
| data = np.load(path, allow_pickle=True) |
|
|
| upper_raw = { |
| k: v.astype(np.float32) |
| for k, v in zip(data["upper_keys"], data["upper_values"]) |
| } |
| under_raw = { |
| k: v.astype(np.float32) |
| for k, v in zip(data["under_keys"], data["under_values"]) |
| } |
|
|
| def _split_dict(tpl_dict): |
| out = {} |
| for lead_name, tpl in tpl_dict.items(): |
| tpl = np.asarray(tpl, dtype=np.float32) |
| L = len(tpl) |
| chars = [] |
| i = 0 |
| while i < L: |
| |
| if np.isnan(tpl[i]): |
| i += 1 |
| continue |
| |
| j = i + 1 |
| while j < L and not np.isnan(tpl[j]): |
| j += 1 |
| seg = tpl[i:j] |
| if seg.size > 0: |
| chars.append(seg) |
| i = j |
| out[lead_name] = chars |
| return out |
|
|
| upper_split = _split_dict(upper_raw) |
| under_split = _split_dict(under_raw) |
| return upper_split, under_split |
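
    # Hedged example of the NaN-based splitting above on a toy lead template:
    #   [0.1, 0.2, nan, nan, 0.3]
    #     -> [array([0.1, 0.2], dtype=float32), array([0.3], dtype=float32)]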
|
|
| |
| def _scale_width(self, tpl, scale: float): |
| """ |
| tpl: 1D float32 ndarray |
| scale: >0, 1이면 그대로, 2이면 길이 2배 등 |
| """ |
| L = len(tpl) |
| if L <= 1 or scale == 1.0: |
| return tpl |
|
|
| new_L = max(2, int(round(L * scale))) |
| idx_old = (np.arange(new_L) / scale).astype(int) |
| idx_old = np.clip(idx_old, 0, L - 1) |
| return tpl[idx_old] |
|
|
| |
| def _build_lead_template(self, char_list, place: int): |
| """ |
| char_list: [char0, char1, ...], 각 char는 non-NaN float32 1D |
| place: 1 or -1 (위/아래 템플릿 방향) |
| """ |
| segments = [] |
| n_chars = len(char_list) |
|
|
| for idx, char_tpl in enumerate(char_list): |
| w_scale = np.random.uniform(*self.width_scale_range) |
| h_scale = np.random.uniform(*self.height_scale_range) |
| h_shift = np.random.uniform(*self.height_shift_range) |
|
|
| char_scaled = self._scale_width(char_tpl, w_scale) |
| char_scaled = char_scaled * h_scale |
| char_scaled = char_scaled + (place * h_shift) |
|
|
| segments.append(char_scaled) |
|
|
| |
| if idx < n_chars - 1: |
| gap_len = np.random.randint(*self.text_space_range) |
| segments.append(np.full(gap_len, np.nan, dtype=np.float32)) |
|
|
| if not segments: |
| return np.zeros(0, dtype=np.float32) |
|
|
| return np.concatenate(segments) |
|
|
| |
| def __call__(self, sample: np.ndarray): |
| if self.p <= np.random.uniform(0, 1): |
| return sample |
|
|
| |
| new_sample = np.copy(sample) |
| C, T = new_sample.shape |
|
|
| if C < 12: |
| return new_sample |
|
|
| seg_len = T // 4 |
| global_start = np.random.randint(0, seg_len) |
|
|
| place = 1 if self.mag % 2 == 1 else -1 |
| tpl_source = self.upper_template if place == 1 else self.under_template |
|
|
| for lead_idx, lead_name in enumerate(self.lead_order): |
            char_list = tpl_source.get(lead_name)
            if not char_list:
                continue
|
|
| tpl_scaled = self._build_lead_template(char_list, place) |
| L = len(tpl_scaled) |
|
|
| group = lead_idx // 3 |
| base = group * seg_len |
| insert_start = base + global_start |
| seg_end = base + seg_len |
|
|
| max_L_in_seg = seg_end - insert_start |
| if L > max_L_in_seg: |
| tpl_seg = tpl_scaled[:max_L_in_seg] |
| else: |
| tpl_seg = tpl_scaled |
|
|
| target = new_sample[lead_idx, insert_start : insert_start + len(tpl_seg)] |
| mask = ~np.isnan(tpl_seg) |
| target[mask] = tpl_seg[mask] |
|
|
| return new_sample |
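

# Hedged usage sketch (assumes DigitizedLeadNameTemplate.npz is present under
# ./datasets next to this module, as the loader above expects):
#
#   aug = DigitizedLeadName(mag={"max": 3, "min": 1}, p=1.0)
#   out = aug(np.zeros((12, 5000), dtype=np.float32))
#   # lead-name glyphs are written into each lead's visible segment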
|
|
import ast
import logging
import traceback as tb

import pandas as pd
import torch
from torch.utils.data import Dataset
from torchvision import transforms

from mai.data.util.load import load_mai_data
from mai.sigproc.sigproc_composer import SigprocComposer
from mai.sigproc.datamodel import SigprocConfig
|
|
|
|
| class BaseDatasetV3(Dataset): |
| def __init__( |
| self, table=pd.DataFrame([]), label="label", stage="test", dataset_params=None |
| ): |
| self.table = table.reset_index(drop=True) |
| self.len = len(table) |
| self.stage = stage |
        self.randomness = self.stage.upper() == "TRAIN"
| self.label = label |
| self.success_idx = 0 |
| self.dataset_params = dataset_params |
|
|
| self.aux_data = self.dataset_params.aux_data |
| self.num_aux = len(self.aux_data) |
| self.wave_type = self.dataset_params.wave_type |
| self.wave_config_list = self.dataset_params.wave_config |
| self.preprocessor = dict() |
| self.sampling_rate = dict() |
| for wave_type in self.wave_type: |
| for wave_config in self.wave_config_list: |
| if wave_config.name == wave_type: |
| preproc_config = wave_config.params.preproc_config |
| preproc_config = SigprocConfig(preproc_config) |
| composer = SigprocComposer( |
| getattr(wave_config.params, "additional_module_path", None) |
| ) |
| self.preprocessor[wave_type] = composer.create(preproc_config) |
| |
| for preproc in preproc_config: |
| proc_name, proc_config = preproc |
| if proc_name == "ResampleSignal": |
| self.sampling_rate[ |
| wave_type |
| ] = proc_config.params.sampling_rate_target |
|
|
| def __len__(self): |
| return self.len |
|
|
| def __getitem__(self, index): |
| try: |
| obj_id, input, label = self._read_signal(index) |
| self.success_idx = index |
| except Exception: |
| logging.getLogger().error( |
| f"Failed to _read_signal in Dataset. Using fallback index {self.success_idx}. {tb.format_exc()}" |
| ) |
| obj_id, input, label = self._read_signal(self.success_idx) |
| (obj_id, offset) = obj_id |
| return (obj_id, offset), input, label |
|
|
| def _read_signal(self, index): |
| row = self.table.loc[index] |
| obj_id, mai_json = self._get_mai_json(row) |
| meta, sig = self.preprocessing(mai_json) |
        aux = self._get_aux_data(row)
| label = torch.tensor(row[self.label], dtype=torch.float32) |
|
|
| return obj_id, [sig, aux], label |
|
|
| def _get_mai_json(self, row): |
| obj_id = row["objectid"] |
| start, length = row["start"], row["length"] |
| mai_json = load_mai_data(obj_id, self.wave_type, (start, length)) |
| return (obj_id, start), mai_json |
|
|
| def preprocessing(self, signal_json, required_lead_type=None): |
| raise NotImplementedError |
|
|
| def parse_signal_json(self, signal_json): |
| raise NotImplementedError |
|
|
| def _get_aux_data(self, row): |
| return {col: row[col] for col in self.aux_data if col in row} |
|
|
| def normalize_aux(self, aux): |
| aux_norm_params = { |
| "age": {"min": 0, "max": 90}, |
| "gender": {"min": -1, "max": 1}, |
| "weight": {"min": 25, "max": 125}, |
| "height": {"min": 120, "max": 200}, |
| } |
| aux_list = list() |
| for k in aux.keys(): |
| if k in ["age", "gender", "weight", "height"]: |
| mn = aux_norm_params[k]["min"] |
| mx = aux_norm_params[k]["max"] |
| aux[k] = (aux[k] - mn) / (mx - mn) |
                aux[k] = np.nan_to_num(aux[k], nan=0.0)
| aux_list.append(aux[k]) |
| else: |
| aux_list.append(aux[k]) |
| return np.array(aux_list) |
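
    # Hedged example: min-max scaling of the known keys, NaNs mapped to 0.
    #   normalize_aux({"age": 45, "gender": 1})  ->  array([0.5, 1.0])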
|
|
| def apply_augmentation(self, sig, aug_params, wave_type="ecg"): |
| if (self.stage == "train") and (aug_params != {}): |
| N, I = aug_params["number"], aug_params["intensity"] |
| aug_setup_params = { |
| "mag": {"max": I, "min": I - 1}, |
| "freq": self.sampling_rate[wave_type], |
| "p": aug_params["prob"], |
| "dependency": aug_params["lead_dependency"], |
| } |
| |
|
|
| DAList = aug_mod(self.get_aug_list(aug_params["list"]), aug_setup_params) |
| if len(DAList) > 0: |
| ops = np.random.choice(DAList, N) |
| if len(ops) > 0: |
| augment = transforms.Compose(ops) |
| sig = augment(sig) |
| return sig |
|
|
| def get_aug_list(self, aug_params): |
| aug_list = list() |
| for aug_item in aug_params: |
| if isinstance(aug_item, dict): |
| for aug, value in aug_item.items(): |
| if value > np.random.uniform(0, 1): |
| aug_list.append(aug) |
| else: |
| aug_list.append(aug_item) |
| return aug_list |
|
|
|
|
|
|
| class ECGDatasetV3(BaseDatasetV3): |
| def __init__( |
| self, table=pd.DataFrame([]), label="label", stage="test", dataset_params=None |
| ): |
| super(ECGDatasetV3, self).__init__(table, label, stage, dataset_params) |
|
|
| def _read_signal(self, index): |
| row = self.table.loc[index] |
| obj_id, mai_json = self._get_mai_json(row) |
| aux_dict = self._get_aux_data(row) |
|
|
| required_lead_type = ( |
| ast.literal_eval(row["lead_type"]) |
| if hasattr(row, "lead_type") |
| else LeadType.__dict__["_member_names_"] |
| ) |
|
|
| if self.label == "label": |
| label = torch.tensor(row[self.label], dtype=torch.float32) |
| else: |
| label = torch.tensor( |
| [row[_label] for _label in self.label], dtype=torch.float32 |
| ) |
|
|
| _, ecg, aux = self.preprocessing(mai_json, aux_dict, required_lead_type) |
|
|
| return obj_id, [ecg, aux], label |
|
|
| def preprocessing( |
| self, mai_json, aux_dict: dict = {}, required_lead_type: list = None |
| ): |
| meta, wave = mai_json |
|
|
| sampling_rate_input = wave["ecg"].sampling_rate |
| mv_unit = wave["ecg"].unit |
| waveform_data = wave["ecg"].waveform.data |
| wave_params = self.wave_config_list[0].params |
|
|
| if required_lead_type is None: |
| required_lead_type = list(waveform_data.keys()) |
|
|
| waveform_dict = { |
| lead_name: np.array(waveform_data[lead_name], dtype=np.float32) * mv_unit |
| for lead_name in required_lead_type if lead_name in waveform_data |
| } |
| ecg_dict, _ = self.preprocessor["ecg"]( |
| waveform_dict, sampling_rate_input, self.randomness |
| ) |
| ecg_array = np.array(list(ecg_dict.values())) |
| ecg_array = self.apply_augmentation(ecg_array, wave_params.augmentation, "ecg") |
| ecg_array = torch.tensor(ecg_array, dtype=torch.float32) |
|
|
| aux = self.normalize_aux(aux_dict) |
| aux = torch.tensor(aux, dtype=torch.float32) |
|
|
| return meta, ecg_array, aux |
|
|
import hashlib
import json
|
|
|
|
| class ECGRecoverDatasetRandomMaskWithRS2V3(ECGDatasetV3): |
| """ECGRecoverDatasetRandomMaskWithRSV3 와 달리, Rhythmstrip(Lead II) 을 13번째 리드로 사용하지 않고 Lead II 자리에 넣어서 return""" |
|
|
| def __init__( |
| self, table=pd.DataFrame([]), label="label", stage="test", dataset_params=None |
| ): |
| super(ECGRecoverDatasetRandomMaskWithRS2V3, self).__init__( |
| table, label, stage, dataset_params |
| ) |
|
|
| def test(self, obj_id=["6116253a31ee975e584d1dad"]): |
| print(obj_id) |
| path = f"/bfai/nfs_export/workspace/share/data/ecg_recover/ptbxl_wo_leadname/{obj_id[0][18:22]}/{obj_id[0]}.json" |
| with open(path, "rb") as digitized_file: |
| digitized = np.array(json.load(digitized_file)) |
| |
| |
|
|
| return self.apply_random_mask(digitized, obj_id=obj_id[0]) |
|
|
| def _read_signal(self, index): |
| row = self.table.loc[index] |
| obj_id, origin_json = self._get_mai_json(row) |
| aux_dict = self._get_aux_data(row) |
|
|
| required_lead_type = ( |
| ast.literal_eval(row["lead_type"]) |
| if hasattr(row, "lead_type") |
| else LeadType.__dict__["_member_names_"] |
| ) |
|
|
| _, origin, _ = self.preprocessing(origin_json, aux_dict, required_lead_type) |
|
|
| with open(row["digitized_path"], "rb") as digitized_file: |
| digitized = np.array(json.load(digitized_file)) |
| assert len(digitized) == 13 |
|
|
| digitized, mask = self.apply_random_mask( |
| digitized, |
| obj_id=obj_id[0], |
| deterministic=self.dataset_params.deterministic_masking, |
| ) |
| digitized = torch.tensor(digitized, dtype=torch.float32) |
| mask = torch.tensor(mask, dtype=torch.float32) |
|
|
| return obj_id, [digitized, mask], origin |
|
|
| def apply_random_mask( |
| self, digitized: np.ndarray, obj_id: str = "", deterministic: bool = False |
| ): |
| """ |
| digitized: (13, 5000) |
| return: |
| masked_signal: (12, 5000) |
| mask: (12, 5000) (1 = invisible, 0 = visible) |
| """ |
|
|
| def _seed_rng(s: str): |
| h = hashlib.sha256(s.encode()).hexdigest() |
| return np.random.default_rng(int(h[:16], 16) % (2**32)) |
|
|
| |
| |
| |
| if deterministic: |
| rng = _seed_rng(obj_id) |
| else: |
| |
| if self.stage == "train": |
| rng = np.random.default_rng() |
| else: |
| rng = _seed_rng(obj_id) |
|
|
| fs = 500 |
| group_len = 1250 |
| total_len = 5000 |
|
|
| visible = np.zeros_like(digitized, dtype=np.float16) |
|
|
| |
| for g in range(4): |
| vis_len = int(rng.uniform(1.25 * fs, group_len + 1)) |
|
|
| max_offset = group_len - vis_len |
| offset = int(rng.uniform(0, max_offset + 1)) |
|
|
| start = g * group_len + offset |
| end = start + vis_len |
|
|
| for lead in range(g * 3, g * 3 + 3): |
| visible[lead, start:end] = 1 |
|
|
| |
| left_mask = int(rng.uniform(0, 1 * fs + 1)) |
| right_mask = int(rng.uniform(0, 1 * fs + 1)) |
|
|
| rs_start = left_mask |
| rs_end = total_len - right_mask |
| visible[12, rs_start:rs_end] = 1 |
|
|
| masked = digitized * visible |
| mask = 1 - visible |
|
|
| target_lead = 1 |
| masked[target_lead] = masked[12] |
| mask[target_lead] = mask[12] |
|
|
| |
| |
| |
| masked = masked[:12] |
| mask = mask[:12] |
|
|
| return masked, mask |
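
    # Hedged usage sketch: masking a digitized 13-lead array (12 leads plus a
    # rhythm strip in row 12).
    #   masked, mask = self.apply_random_mask(
    #       np.zeros((13, 5000)), obj_id="abc", deterministic=True
    #   )
    #   # masked.shape == mask.shape == (12, 5000); mask == 1 marks invisible samples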
|
|