| |
| |
| |
|
|
| import numpy as np |
| from tqdm import tqdm |
|
|
|
|
|
|
|
|
def tps(x, y, patch_len=0, stride=0, shuffle_rate=0.0):
    """
    Temporal Patch Shuffle (TPS) augmentation for time series classification.

    Splits each series into (possibly overlapping) patches, shuffles the
    `shuffle_rate` fraction of patches with the LOWEST variance (the least
    informative ones), and rebuilds the series by averaging overlapping
    patch contributions.

    Parameters:
    -----------
    x : numpy.ndarray
        Input time series data of shape (n_samples, timesteps, n_features)
    y : array-like
        Labels; accepted for interface compatibility but unused here.
    patch_len : int
        Length of each patch. Non-positive values disable the augmentation.
    stride : int
        Stride between patches. Non-positive values disable the augmentation.
    shuffle_rate : float
        Proportion of patches to shuffle (between 0 and 1)

    Returns:
    --------
    numpy.ndarray
        Augmented time series data with same shape as input
    """
    n_samples, T, n_features = x.shape

    # BUG FIX: the default patch_len=0 / stride=0 previously caused a
    # ZeroDivisionError in the patch-count arithmetic below. Degenerate
    # parameters now return the data unchanged (a fresh copy, matching the
    # "new array out" contract of the normal path).
    if patch_len <= 0 or stride <= 0 or shuffle_rate <= 0.0:
        return x.copy()

    # Pad (edge mode) so an integer number of patches covers the series.
    total_patches = (T - patch_len + stride - 1) // stride + 1
    total_len = (total_patches - 1) * stride + patch_len
    padding_needed = max(0, total_len - T)

    ret = np.zeros_like(x)
    for i in range(n_samples):
        sample = x[i]

        if padding_needed > 0:
            padded = np.pad(sample, ((0, padding_needed), (0, 0)), mode='edge')
        else:
            padded = sample
        T_padded = T + padding_needed

        num_patches = ((T_padded - patch_len) // stride) + 1

        # Extract patches at every stride offset.
        patches = np.zeros((num_patches, patch_len, n_features))
        for j in range(num_patches):
            start = j * stride
            patches[j] = padded[start:start + patch_len]

        # Patch importance = variance over time and features; the
        # lowest-variance patches are the ones that get shuffled.
        importance_scores = np.var(patches, axis=(1, 2))
        num_to_shuffle = int(num_patches * shuffle_rate)

        if num_to_shuffle > 0:
            shuffle_indices = np.argsort(importance_scores)[:num_to_shuffle]
            selected = patches[shuffle_indices].copy()
            shuffled_order = np.random.permutation(num_to_shuffle)
            for pos, new_pos in enumerate(shuffled_order):
                patches[shuffle_indices[pos]] = selected[new_pos]

        # Overlap-add reconstruction: sum patch contributions and divide by
        # per-timestep coverage counts.
        reconstructed = np.zeros((T_padded, n_features))
        counts = np.zeros((T_padded, n_features))
        for j in range(num_patches):
            start = j * stride
            end = start + patch_len
            reconstructed[start:end] += patches[j]
            counts[start:end] += 1

        # Defensive fill for timesteps not covered by any patch (should not
        # occur given the padding above): copy the nearest covered value.
        mask = counts == 0
        if np.any(mask):
            for feat in range(n_features):
                feat_mask = mask[:, feat]
                if np.any(feat_mask):
                    zero_indices = np.where(feat_mask)[0]
                    nonzero_indices = np.where(~feat_mask)[0]
                    if len(nonzero_indices) > 0:
                        for zero_idx in zero_indices:
                            nearest_idx = nonzero_indices[np.argmin(np.abs(nonzero_indices - zero_idx))]
                            reconstructed[zero_idx, feat] = reconstructed[nearest_idx, feat]
                            counts[zero_idx, feat] = 1

        counts[counts == 0] = 1
        reconstructed = reconstructed / counts

        # Drop the padding to restore the original length.
        ret[i] = reconstructed[:T]

    return ret
|
|
def jitter(x, sigma=0.03):
    """Additive Gaussian-noise augmentation: x + N(0, sigma) per element."""
    noise = np.random.normal(loc=0.0, scale=sigma, size=x.shape)
    return x + noise
|
|
def scaling(x, sigma=0.1):
    """Multiplicative scaling: one N(1, sigma) factor per sample per feature,
    broadcast across the time axis."""
    n_samples, _, n_features = x.shape
    factor = np.random.normal(loc=1.0, scale=sigma, size=(n_samples, n_features))
    return x * factor[:, np.newaxis, :]
|
|
def rotation(x):
    """Rotation-style augmentation: randomly permute the feature axes (one
    permutation shared by all samples) and flip the sign of each sample's
    features independently."""
    n_samples, _, n_features = x.shape
    signs = np.random.choice([-1, 1], size=(n_samples, n_features))
    axis_order = np.arange(n_features)
    np.random.shuffle(axis_order)
    return signs[:, np.newaxis, :] * x[:, :, axis_order]
|
|
|
|
|
|
def magnitude_warp(x, sigma=0.2, knot=4):
    """Magnitude warping: multiply each series by a smooth random curve,
    built as a cubic spline through knot+2 evenly spaced N(1, sigma) knots."""
    from scipy.interpolate import CubicSpline
    n_samples, seq_len, n_features = x.shape
    steps = np.arange(seq_len)

    knot_positions = np.linspace(0, seq_len - 1.0, num=knot + 2)
    knot_values = np.random.normal(loc=1.0, scale=sigma, size=(n_samples, knot + 2, n_features))

    ret = np.zeros_like(x)
    for i in range(n_samples):
        curves = [CubicSpline(knot_positions, knot_values[i, :, dim])(steps)
                  for dim in range(n_features)]
        ret[i] = x[i] * np.stack(curves, axis=-1)
    return ret
|
|
def time_warp(x, sigma=0.2, knot=4):
    """Time warping: distort the time axis with a smooth random cubic-spline
    curve (per sample, per feature), then resample by linear interpolation."""
    from scipy.interpolate import CubicSpline
    n_samples, seq_len, n_features = x.shape
    steps = np.arange(seq_len)

    knot_positions = np.linspace(0, seq_len - 1.0, num=knot + 2)
    speed_factors = np.random.normal(loc=1.0, scale=sigma, size=(n_samples, knot + 2, n_features))

    ret = np.zeros_like(x)
    for i in range(n_samples):
        for dim in range(n_features):
            # Warped time curve through randomly stretched knot positions.
            warped_time = CubicSpline(knot_positions, knot_positions * speed_factors[i, :, dim])(steps)
            # Rescale so the curve ends at the last original timestep.
            scale = (seq_len - 1) / warped_time[-1]
            ret[i, :, dim] = np.interp(steps,
                                       np.clip(scale * warped_time, 0, seq_len - 1),
                                       x[i, :, dim])
    return ret
|
|
def window_slice(x, reduce_ratio=0.9):
    """Window slicing: crop a random window of length ceil(reduce_ratio * T)
    from each series and linearly stretch it back to length T.

    Parameters
    ----------
    x : numpy.ndarray of shape (n_samples, timesteps, n_features)
    reduce_ratio : float
        Fraction of the series kept before re-stretching. Values >= 1 are a
        no-op and return `x` unchanged.

    Returns
    -------
    numpy.ndarray with the same shape as `x`.
    """
    target_len = np.ceil(reduce_ratio * x.shape[1]).astype(int)
    if target_len >= x.shape[1]:
        return x
    starts = np.random.randint(low=0, high=x.shape[1] - target_len, size=(x.shape[0])).astype(int)
    ends = (target_len + starts).astype(int)

    ret = np.zeros_like(x)
    for i, pat in enumerate(x):
        for dim in range(x.shape[2]):
            # BUG FIX: the query grid previously ran to `target_len`, one past
            # the last source index (target_len - 1), so np.interp clamped and
            # duplicated the final value instead of stretching uniformly.
            ret[i, :, dim] = np.interp(
                np.linspace(0, target_len - 1, num=x.shape[1]),
                np.arange(target_len),
                pat[starts[i]:ends[i], dim],
            )
    return ret
|
|
def permutation(x, max_segments=5, seg_mode="equal"):
    """Segment-permutation augmentation: cut each series into segments along
    time (equal-sized, or at random interior cut points when
    seg_mode == "random") and concatenate the segments in random order."""
    time_idx = np.arange(x.shape[1])
    seg_counts = np.random.randint(1, max_segments, size=(x.shape[0]))

    ret = np.zeros_like(x)
    for i, series in enumerate(x):
        n_segs = seg_counts[i]
        if n_segs <= 1:
            ret[i] = series
            continue

        if seg_mode == "random":
            candidates = x.shape[1] - 2   # interior cut positions available
            cuts_needed = n_segs - 1
            if candidates <= 0:
                # series too short for random cuts: fall back to equal split
                segments = np.array_split(time_idx, n_segs)
            elif cuts_needed > candidates:
                # cap the segment count at what the length allows
                segments = np.array_split(time_idx, min(candidates + 1, n_segs))
            else:
                cut_points = np.random.choice(candidates, cuts_needed, replace=False)
                cut_points.sort()
                segments = np.split(time_idx, cut_points)
        else:
            segments = np.array_split(time_idx, n_segs)

        if len(segments) > 1:
            order = np.random.permutation(len(segments))
            shuffled_idx = np.concatenate([segments[k] for k in order]).ravel()
            ret[i] = series[shuffled_idx]
        else:
            ret[i] = series
    return ret
|
|
| |
def window_warp(x, window_ratio=0.1, scales=[0.5, 2.]):
    """Window warping: speed up or slow down (by a random factor from
    `scales`) one random interior window of each series, then resample the
    whole series back to its original length."""
    chosen_scales = np.random.choice(scales, x.shape[0])
    warp_size = max(1, np.ceil(window_ratio * x.shape[1]).astype(int))
    window_steps = np.arange(warp_size)

    ret = np.zeros_like(x)
    for i, series in enumerate(x):
        # A warpable interior window needs at least one point on each side.
        if x.shape[1] <= warp_size + 2:
            ret[i] = series
            continue

        try:
            win_lo = np.random.randint(low=1, high=x.shape[1] - warp_size - 1)
        except ValueError:
            win_lo = 1
        win_hi = win_lo + warp_size

        for dim in range(x.shape[2]):
            head = series[:win_lo, dim]
            # Resize the window by the chosen speed factor.
            resized = np.interp(
                np.linspace(0, warp_size - 1, num=int(warp_size * chosen_scales[i])),
                window_steps,
                series[win_lo:win_hi, dim],
            )
            tail = series[win_hi:, dim]
            stitched = np.concatenate((head, resized, tail))
            # Stretch/compress the stitched series back to length T.
            ret[i, :, dim] = np.interp(
                np.arange(x.shape[1]),
                np.linspace(0, x.shape[1] - 1.0, num=stitched.size),
                stitched,
            )
    return ret
|
|
def spawner(x, labels, sigma=0.05, verbose=0):
    """SPAWNER augmentation: average each pattern with a random same-class
    pattern along a suboptimal DTW alignment (DTW forced through a random
    split point), then add Gaussian jitter.

    Parameters
    ----------
    x : numpy.ndarray of shape (n_samples, timesteps, n_features)
    labels : numpy.ndarray
        One-hot (2-D) or integer (1-D) class labels.
    sigma : float
        Jitter noise level applied to the result.
    verbose : int
        > -1 prints warnings; > 0 additionally draws DTW debug graphs.

    Returns
    -------
    numpy.ndarray with the same shape as `x`.
    """
    # NOTE: relies on the project-local `dtw` module (RETURN_PATH etc.).
    import dtw as dtw

    # Series with <= 2 timesteps have no interior split point.
    if x.shape[1] <= 2:
        if verbose > -1:
            print("Warning: Time series too short for spawner augmentation")
        return x

    # One random interior split point per sample.
    random_points = np.zeros(x.shape[0], dtype=int)
    for i in range(x.shape[0]):
        try:
            random_points[i] = np.random.randint(low=1, high=x.shape[1] - 1)
        except ValueError:
            random_points[i] = 1

    base_window = max(1, np.ceil(x.shape[1] / 10.).astype(int))

    orig_steps = np.arange(x.shape[1])
    # Collapse one-hot labels to class ids when needed.
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels

    ret = np.zeros_like(x)
    for i, pat in enumerate(tqdm(x) if 'tqdm' in globals() else x):
        # Candidate partners: every *other* sample with the same label.
        # BUG FIX: np.where on l[choices] yields positions *within* the
        # deleted array; they must be mapped back through `choices` before
        # indexing x, otherwise wrong samples (possibly x[i] itself) are
        # selected.
        choices = np.delete(np.arange(x.shape[0]), i)
        choices = choices[l[choices] == l[i]]
        if choices.size > 0:
            random_sample = x[np.random.choice(choices)]

            try:
                # Clamp the split point to a valid interior index.
                random_point = int(random_points[i])
                if random_point <= 0:
                    random_point = 1
                if random_point >= x.shape[1]:
                    random_point = x.shape[1] - 1

                # BUG FIX: the old code overwrote the shared `window`
                # variable, so one short segment permanently narrowed the
                # DTW window for every later sample. Adjust per iteration.
                window = base_window
                if window >= min(random_point, pat.shape[0] - random_point):
                    adjusted_window = max(1, min(random_point, pat.shape[0] - random_point) - 1)
                    if verbose > -1:
                        print(f"Warning: Adjusting window from {window} to {adjusted_window}")
                    window = adjusted_window

                # Align the two halves independently, forcing the path
                # through the split point (the "suboptimal" alignment).
                path1 = dtw.dtw(pat[:random_point], random_sample[:random_point],
                                dtw.RETURN_PATH, slope_constraint="symmetric", window=window)
                path2 = dtw.dtw(pat[random_point:], random_sample[random_point:],
                                dtw.RETURN_PATH, slope_constraint="symmetric", window=window)
                combined = np.concatenate((np.vstack(path1), np.vstack(path2 + random_point)), axis=1)

                if verbose:
                    print(random_point)
                    dtw_value, cost, DTW_map, path = dtw.dtw(pat, random_sample,
                                                             return_flag=dtw.RETURN_ALL,
                                                             slope_constraint="symmetric",
                                                             window=window)
                    dtw.draw_graph1d(cost, DTW_map, path, pat, random_sample)
                    dtw.draw_graph1d(cost, DTW_map, combined, pat, random_sample)

                # Average the two aligned patterns along the combined path.
                mean = np.mean([pat[combined[0]], random_sample[combined[1]]], axis=0)

                if mean.shape[0] > 0:
                    # Resample the averaged path back to the original length.
                    for dim in range(x.shape[2]):
                        ret[i, :, dim] = np.interp(orig_steps,
                                                   np.linspace(0, x.shape[1] - 1., num=mean.shape[0]),
                                                   mean[:, dim]).T
                else:
                    if verbose > -1:
                        print("Warning: DTW produced empty path, skipping augmentation")
                    ret[i, :] = pat

            except Exception as e:
                if verbose > -1:
                    print(f"Error in DTW computation: {e}")
                ret[i, :] = pat
        else:
            if verbose > -1:
                print(f"There is only one pattern of class {l[i]}, skipping pattern average")
            ret[i, :] = pat

    # Best-effort jitter; fall back to the unjittered result on failure.
    try:
        return jitter(ret, sigma=sigma)
    except:
        if verbose > -1:
            print("Warning: jitter function failed or not found, returning unjittered data")
        return ret
|
|
def wdba(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True, verbose=0):
    """Weighted DTW Barycenter Averaging (wDBA) augmentation: replace each
    sample with a DTW-weighted average of up to `batch_size` same-class
    prototypes centered on their medoid.

    Parameters
    ----------
    x : numpy.ndarray of shape (n_samples, timesteps, n_features)
    labels : numpy.ndarray
        One-hot (2-D) or integer (1-D) class labels.
    batch_size : int
        Maximum number of same-class prototypes averaged per sample.
    slope_constraint : str
        Passed through to the project `dtw` module.
    use_window : bool
        If True, use a Sakoe-Chiba window of ceil(T/10); else unconstrained.
    verbose : int
        > -1 prints warnings.

    Returns
    -------
    numpy.ndarray with the same shape as `x`.
    """
    # NOTE: relies on the project-local `dtw` module (RETURN_VALUE etc.).
    import dtw as dtw

    if use_window:
        window = np.ceil(x.shape[1] / 10.).astype(int)
    else:
        window = None
    orig_steps = np.arange(x.shape[1])
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels

    ret = np.zeros_like(x)
    for i in tqdm(range(ret.shape[0])):
        # All same-class samples (this set always contains i itself).
        choices = np.where(l == l[i])[0]
        # BUG FIX: because `choices` always contains i, the old `> 0` test
        # never failed; a singleton class then produced k=1 and crashed at
        # nearest_order[1] with an IndexError. Require >= 2 same-class
        # samples so the skip branch is actually reachable.
        if choices.size > 1:
            k = min(choices.size, batch_size)
            random_prototypes = x[np.random.choice(choices, k, replace=False)]

            # Pairwise DTW distances between the sampled prototypes.
            dtw_matrix = np.zeros((k, k))
            for p, prototype in enumerate(random_prototypes):
                for s, sample in enumerate(random_prototypes):
                    if p == s:
                        dtw_matrix[p, s] = 0.
                    else:
                        dtw_matrix[p, s] = dtw.dtw(prototype, sample, dtw.RETURN_VALUE, slope_constraint=slope_constraint, window=window)

            # Medoid = prototype with the smallest total distance.
            medoid_id = np.argsort(np.sum(dtw_matrix, axis=1))[0]
            nearest_order = np.argsort(dtw_matrix[medoid_id])
            medoid_pattern = random_prototypes[medoid_id]

            # Weighted average: each prototype is warped onto the medoid and
            # weighted by exp(log(0.5) * d / d_nearest).
            average_pattern = np.zeros_like(medoid_pattern)
            weighted_sums = np.zeros((medoid_pattern.shape[0]))
            for nid in nearest_order:
                if nid == medoid_id or dtw_matrix[medoid_id, nearest_order[1]] == 0.:
                    average_pattern += medoid_pattern
                    weighted_sums += np.ones_like(weighted_sums)
                else:
                    path = dtw.dtw(medoid_pattern, random_prototypes[nid], dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)
                    dtw_value = dtw_matrix[medoid_id, nid]
                    warped = random_prototypes[nid, path[1]]
                    weight = np.exp(np.log(0.5) * dtw_value / dtw_matrix[medoid_id, nearest_order[1]])
                    average_pattern[path[0]] += weight * warped
                    weighted_sums[path[0]] += weight

            ret[i, :] = average_pattern / weighted_sums[:, np.newaxis]
        else:
            if verbose > -1:
                print("There is only one pattern of class %d, skipping pattern average"%l[i])
            ret[i, :] = x[i]
    return ret
|
|
|
|
|
|
def random_guided_warp(x, labels, slope_constraint="symmetric", use_window=True, dtw_type="normal", verbose=0):
    """Random guided warping: align each pattern to a random same-class
    prototype via DTW (or shapeDTW) and resample it along the warped path.

    Parameters
    ----------
    x : numpy.ndarray of shape (n_samples, timesteps, n_features)
    labels : numpy.ndarray
        One-hot (2-D) or integer (1-D) class labels.
    slope_constraint : str
        Passed through to the project `dtw` module.
    use_window : bool
        If True, use a Sakoe-Chiba window of ceil(T/10); else unconstrained.
    dtw_type : str
        "shape" selects shapeDTW; anything else uses plain DTW.
    verbose : int
        > -1 prints warnings.

    Returns
    -------
    numpy.ndarray with the same shape as `x`.
    """
    # NOTE: relies on the project-local `dtw` module (RETURN_PATH etc.).
    import dtw as dtw

    if use_window:
        window = np.ceil(x.shape[1] / 10.).astype(int)
    else:
        window = None
    orig_steps = np.arange(x.shape[1])
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels

    ret = np.zeros_like(x)
    for i, pat in enumerate(tqdm(x)):
        # Candidate prototypes: every *other* sample with the same label.
        # BUG FIX: np.where on l[choices] yields positions *within* the
        # deleted array; they must be mapped back through `choices` before
        # indexing x, otherwise wrong samples (possibly x[i] itself) are
        # selected.
        choices = np.delete(np.arange(x.shape[0]), i)
        choices = choices[l[choices] == l[i]]
        if choices.size > 0:
            random_prototype = x[np.random.choice(choices)]

            if dtw_type == "shape":
                path = dtw.shape_dtw(random_prototype, pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)
            else:
                path = dtw.dtw(random_prototype, pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)

            # Warp the pattern along the alignment, then resample back to
            # the original number of timesteps.
            warped = pat[path[1]]
            for dim in range(x.shape[2]):
                ret[i, :, dim] = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=warped.shape[0]), warped[:, dim]).T
        else:
            if verbose > -1:
                print("There is only one pattern of class %d, skipping timewarping"%l[i])
            ret[i, :] = pat
    return ret
|
|
def random_guided_warp_shape(x, labels, slope_constraint="symmetric", use_window=True):
    """Convenience wrapper: random guided warping with shapeDTW alignment."""
    return random_guided_warp(
        x,
        labels,
        slope_constraint=slope_constraint,
        use_window=use_window,
        dtw_type="shape",
    )
|
|
def discriminative_guided_warp(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True, dtw_type="normal", use_variable_slice=True, verbose=0):
    """Discriminative guided warping: warp each pattern toward the positive
    prototype that best separates its class from the others (largest mean
    negative-distance minus mean positive-distance), then optionally apply
    a variable-strength window slice proportional to the warp amount.

    Parameters
    ----------
    x : numpy.ndarray of shape (n_samples, timesteps, n_features)
    labels : numpy.ndarray
        One-hot (2-D) or integer (1-D) class labels.
    batch_size : int
        Total prototype budget, split between positive and negative sets.
    slope_constraint : str
        Passed through to the project `dtw` module.
    use_window : bool
        If True, use a Sakoe-Chiba window of ceil(T/10); else unconstrained.
    dtw_type : str
        "shape" selects shapeDTW; anything else uses plain DTW.
    use_variable_slice : bool
        If True, post-process with window_slice scaled by warp amount.
    verbose : int
        > -1 prints warnings.

    Returns
    -------
    numpy.ndarray with the same shape as `x`.
    """
    # NOTE: relies on the project-local `dtw` module (RETURN_VALUE etc.).
    import dtw as dtw

    if use_window:
        window = np.ceil(x.shape[1] / 10.).astype(int)
    else:
        window = None
    orig_steps = np.arange(x.shape[1])
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels

    positive_batch = np.ceil(batch_size / 2).astype(int)
    negative_batch = np.floor(batch_size / 2).astype(int)

    ret = np.zeros_like(x)
    warp_amount = np.zeros(x.shape[0])
    for i, pat in enumerate(tqdm(x)):
        # Split the other samples into same-class and other-class pools.
        # BUG FIX: np.where on l[choices] yields positions *within* the
        # deleted array; they must be mapped back through `choices` before
        # indexing x, otherwise wrong samples (possibly x[i] itself) are
        # selected for both pools.
        choices = np.delete(np.arange(x.shape[0]), i)
        positive = choices[l[choices] == l[i]]
        negative = choices[l[choices] != l[i]]

        if positive.size > 0 and negative.size > 0:
            pos_k = min(positive.size, positive_batch)
            neg_k = min(negative.size, negative_batch)
            positive_prototypes = x[np.random.choice(positive, pos_k, replace=False)]
            negative_prototypes = x[np.random.choice(negative, neg_k, replace=False)]

            # Score each positive prototype: mean distance to the negative
            # set minus mean distance to the other positives; pick the most
            # discriminative one.
            pos_aves = np.zeros((pos_k))
            neg_aves = np.zeros((pos_k))
            if dtw_type == "shape":
                for p, pos_prot in enumerate(positive_prototypes):
                    for ps, pos_samp in enumerate(positive_prototypes):
                        if p != ps:
                            pos_aves[p] += (1. / (pos_k - 1.)) * dtw.shape_dtw(pos_prot, pos_samp, dtw.RETURN_VALUE, slope_constraint=slope_constraint, window=window)
                    for ns, neg_samp in enumerate(negative_prototypes):
                        neg_aves[p] += (1. / neg_k) * dtw.shape_dtw(pos_prot, neg_samp, dtw.RETURN_VALUE, slope_constraint=slope_constraint, window=window)
                selected_id = np.argmax(neg_aves - pos_aves)
                path = dtw.shape_dtw(positive_prototypes[selected_id], pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)
            else:
                for p, pos_prot in enumerate(positive_prototypes):
                    for ps, pos_samp in enumerate(positive_prototypes):
                        if p != ps:
                            pos_aves[p] += (1. / (pos_k - 1.)) * dtw.dtw(pos_prot, pos_samp, dtw.RETURN_VALUE, slope_constraint=slope_constraint, window=window)
                    for ns, neg_samp in enumerate(negative_prototypes):
                        neg_aves[p] += (1. / neg_k) * dtw.dtw(pos_prot, neg_samp, dtw.RETURN_VALUE, slope_constraint=slope_constraint, window=window)
                selected_id = np.argmax(neg_aves - pos_aves)
                path = dtw.dtw(positive_prototypes[selected_id], pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)

            # Warp along the alignment, resample to length T, and record how
            # far the path deviates from the identity (the warp amount).
            warped = pat[path[1]]
            warp_path_interp = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=warped.shape[0]), path[1])
            warp_amount[i] = np.sum(np.abs(orig_steps - warp_path_interp))
            for dim in range(x.shape[2]):
                ret[i, :, dim] = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=warped.shape[0]), warped[:, dim]).T
        else:
            if verbose > -1:
                print("There is only one pattern of class %d"%l[i])
            ret[i, :] = pat
            warp_amount[i] = 0.

    if use_variable_slice:
        max_warp = np.max(warp_amount)
        if max_warp == 0:
            # No warping happened anywhere: apply a uniform slice.
            ret = window_slice(ret, reduce_ratio=0.9)
        else:
            for i, pat in enumerate(ret):
                # Less warped patterns get sliced more aggressively.
                ret[i] = window_slice(pat[np.newaxis, :, :], reduce_ratio=0.9 + 0.1 * warp_amount[i] / max_warp)[0]
    return ret
|
|
def discriminative_guided_warp_shape(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True):
    """Convenience wrapper: discriminative guided warping with shapeDTW."""
    return discriminative_guided_warp(
        x,
        labels,
        batch_size=batch_size,
        slope_constraint=slope_constraint,
        use_window=use_window,
        dtw_type="shape",
    )
|
|