# Provenance: commit 556d303 ("added all the files") by bakhshaliyev
# Adapted from time_series_augmentation (https://github.com/uchidalab/time_series_augmentation)
# Original: Apache License 2.0 by Brian Kenji Iwana and Seiichi Uchida
# Modified by Jafar Bakhshaliyev (2025) - Licensed under GPL v3.0
import numpy as np
from tqdm import tqdm
def tps(x, y, patch_len=0, stride=0, shuffle_rate=0.0):
    """
    Temporal Patch Shuffle (TPS) augmentation for time series classification.

    Splits each series into (possibly overlapping) patches, shuffles the
    least-important patches (lowest variance) among themselves, and
    reconstructs the series by averaging overlapping patches.

    Parameters
    ----------
    x : numpy.ndarray
        Input time series data of shape (n_samples, timesteps, n_features)
    y : array-like
        Labels; unused, kept so the signature matches the other
        label-aware augmenters.
    patch_len : int
        Length of each patch. Non-positive values, or values larger than
        the series length, disable the augmentation (no-op).
    stride : int
        Stride between patch start positions. Non-positive disables the
        augmentation (no-op).
    shuffle_rate : float
        Proportion of patches to shuffle (between 0 and 1)

    Returns
    -------
    numpy.ndarray
        Augmented time series data with same shape as input
    """
    n_samples, T, n_features = x.shape

    # Degenerate configurations (including the defaults patch_len=0,
    # stride=0) previously raised ZeroDivisionError in the patch-count
    # computation; treat them as "no augmentation requested".
    if patch_len <= 0 or stride <= 0 or patch_len > T:
        return x.copy()

    ret = np.zeros_like(x)

    # Pad so the last patch ends exactly at the padded series end,
    # avoiding a zero-filled tail in the reconstruction.
    total_patches = (T - patch_len + stride - 1) // stride + 1
    total_len = (total_patches - 1) * stride + patch_len
    padding_needed = total_len - T

    # Process each sample independently.
    for i in range(n_samples):
        sample = x[i]  # shape: (timesteps, n_features)

        # Edge-pad the tail so every timestep is covered by some patch.
        if padding_needed > 0:
            padded_sample = np.pad(sample, ((0, padding_needed), (0, 0)), mode='edge')
            T_padded = T + padding_needed
        else:
            padded_sample = sample
            T_padded = T

        num_patches = ((T_padded - patch_len) // stride) + 1

        # Extract patches for the current sample.
        patches = np.zeros((num_patches, patch_len, n_features))
        for j in range(num_patches):
            start = j * stride
            patches[j] = padded_sample[start:start + patch_len]

        # Patch importance = variance across time and feature axes.
        importance_scores = np.var(patches, axis=(1, 2))

        # Number of patches to shuffle.
        num_to_shuffle = int(num_patches * shuffle_rate)
        if num_to_shuffle > 0:
            # Shuffle only the least-important patches, among themselves.
            shuffle_indices = np.argsort(importance_scores)[:num_to_shuffle]
            patches_to_shuffle = patches[shuffle_indices].copy()
            shuffled_order = np.random.permutation(num_to_shuffle)
            for idx, new_idx in enumerate(shuffled_order):
                patches[shuffle_indices[idx]] = patches_to_shuffle[new_idx]

        # Reconstruct by summing patches and averaging the overlaps.
        reconstructed = np.zeros((T_padded, n_features))
        counts = np.zeros((T_padded, n_features))
        for j in range(num_patches):
            start = j * stride
            end = start + patch_len
            reconstructed[start:end] += patches[j]
            counts[start:end] += 1

        # Timesteps never covered by a patch (cannot occur with the padding
        # above; kept as a safety net): copy the nearest covered value
        # instead of leaving zeros.
        mask = counts == 0
        if np.any(mask):
            for feat in range(n_features):
                feat_mask = mask[:, feat]
                if np.any(feat_mask):
                    zero_indices = np.where(feat_mask)[0]
                    nonzero_indices = np.where(~feat_mask)[0]
                    if len(nonzero_indices) > 0:
                        for zero_idx in zero_indices:
                            nearest_idx = nonzero_indices[np.argmin(np.abs(nonzero_indices - zero_idx))]
                            reconstructed[zero_idx, feat] = reconstructed[nearest_idx, feat]
                            counts[zero_idx, feat] = 1

        # Final guard against division by zero.
        counts[counts == 0] = 1
        reconstructed = reconstructed / counts

        # Drop the padding to restore the original length.
        ret[i] = reconstructed[:T]

    return ret
def jitter(x, sigma=0.03):
    """Additive Gaussian noise augmentation.

    Reference: https://arxiv.org/pdf/1706.00527.pdf

    Parameters
    ----------
    x : numpy.ndarray
        Time series batch of any shape.
    sigma : float
        Standard deviation of the zero-mean noise.
    """
    noise = np.random.normal(loc=0., scale=sigma, size=x.shape)
    return x + noise
def scaling(x, sigma=0.1):
    """Per-sample, per-channel random magnitude scaling.

    Reference: https://arxiv.org/pdf/1706.00527.pdf

    Each (sample, feature) pair receives one scale factor drawn from
    N(1, sigma), applied uniformly across all timesteps.
    """
    n_samples, _, n_features = x.shape
    factor = np.random.normal(loc=1., scale=sigma, size=(n_samples, n_features))
    return x * factor[:, np.newaxis, :]
def rotation(x):
    """Random sign flipping and channel permutation.

    Each (sample, feature) pair is multiplied by a random sign, and the
    feature axes are shuffled (one shared permutation for the whole batch).
    """
    signs = np.random.choice([-1, 1], size=(x.shape[0], x.shape[2]))
    axis_order = np.arange(x.shape[2])
    np.random.shuffle(axis_order)
    permuted = x[:, :, axis_order]
    return signs[:, np.newaxis, :] * permuted
def magnitude_warp(x, sigma=0.2, knot=4):
    """Smooth random magnitude warping via cubic splines.

    For each sample and feature, a cubic spline through ``knot + 2`` random
    magnitudes drawn from N(1, sigma) is evaluated at every timestep and
    multiplied element-wise with the series.
    """
    from scipy.interpolate import CubicSpline
    timesteps = np.arange(x.shape[1])
    random_warps = np.random.normal(loc=1.0, scale=sigma, size=(x.shape[0], knot + 2, x.shape[2]))
    warp_steps = (np.ones((x.shape[2], 1)) * (np.linspace(0, x.shape[1] - 1., num=knot + 2))).T
    ret = np.zeros_like(x)
    for i, pat in enumerate(x):
        curves = []
        for dim in range(x.shape[2]):
            spline = CubicSpline(warp_steps[:, dim], random_warps[i, :, dim])
            curves.append(spline(timesteps))
        ret[i] = pat * np.array(curves).T
    return ret
def time_warp(x, sigma=0.2, knot=4):
    """Smooth random time-axis warping via cubic splines.

    The time axis of each (sample, feature) series is distorted by a cubic
    spline whose knot positions are scaled by factors drawn from N(1, sigma);
    the series is then linearly resampled back onto the uniform grid.
    """
    from scipy.interpolate import CubicSpline
    orig_steps = np.arange(x.shape[1])
    random_warps = np.random.normal(loc=1.0, scale=sigma, size=(x.shape[0], knot + 2, x.shape[2]))
    warp_steps = (np.ones((x.shape[2], 1)) * (np.linspace(0, x.shape[1] - 1., num=knot + 2))).T
    ret = np.zeros_like(x)
    last = x.shape[1] - 1
    for i, pat in enumerate(x):
        for dim in range(x.shape[2]):
            # Renamed from the original local `time_warp`, which shadowed
            # the enclosing function's own name.
            warped_time = CubicSpline(warp_steps[:, dim], warp_steps[:, dim] * random_warps[i, :, dim])(orig_steps)
            # Rescale so the warped axis still ends at the last timestep.
            scale = last / warped_time[-1]
            ret[i, :, dim] = np.interp(orig_steps, np.clip(scale * warped_time, 0, last), pat[:, dim])
    return ret
def window_slice(x, reduce_ratio=0.9):
    """Crop a random temporal window and stretch it back to full length.

    Reference: https://halshs.archives-ouvertes.fr/halshs-01357973/document
    """
    target_len = np.ceil(reduce_ratio * x.shape[1]).astype(int)
    if target_len >= x.shape[1]:
        # Nothing to slice away.
        return x
    starts = np.random.randint(low=0, high=x.shape[1] - target_len, size=(x.shape[0])).astype(int)
    ends = (target_len + starts).astype(int)
    ret = np.zeros_like(x)
    # The interpolation grids are identical for every sample; hoist them.
    resample_grid = np.linspace(0, target_len, num=x.shape[1])
    window_grid = np.arange(target_len)
    for i, pat in enumerate(x):
        lo, hi = starts[i], ends[i]
        for dim in range(x.shape[2]):
            ret[i, :, dim] = np.interp(resample_grid, window_grid, pat[lo:hi, dim])
    return ret
def permutation(x, max_segments=5, seg_mode="equal"):
    """Randomly permute temporal segments of each sample.

    Parameters
    ----------
    x : numpy.ndarray
        Input of shape (n_samples, timesteps, n_features).
    max_segments : int
        Exclusive upper bound on the number of segments drawn per sample.
        Values <= 1 disable the augmentation (previously this crashed with
        ValueError inside np.random.randint on an empty range).
    seg_mode : str
        "equal" for equal-size segments, "random" for random split points.

    Returns
    -------
    numpy.ndarray
        Augmented data; each sample contains exactly the same values as
        the input, with temporal segments reordered.
    """
    # Guard: np.random.randint(1, 1) raises ValueError, so treat
    # max_segments <= 1 as "no permutation requested".
    if max_segments <= 1:
        return x.copy()
    orig_steps = np.arange(x.shape[1])
    num_segs = np.random.randint(1, max_segments, size=(x.shape[0]))
    ret = np.zeros_like(x)
    for i, pat in enumerate(x):
        if num_segs[i] > 1:
            if seg_mode == "random":
                # Random splitting needs num_segs[i]-1 interior candidates;
                # degrade gracefully when the series is too short.
                available_points = x.shape[1] - 2
                needed_points = num_segs[i] - 1
                if available_points <= 0:
                    # Not enough points for random splitting; fall back to
                    # equal segments.
                    splits = np.array_split(orig_steps, num_segs[i])
                elif needed_points > available_points:
                    # More segments requested than split candidates exist;
                    # shrink the segment count.
                    actual_segs = min(available_points + 1, num_segs[i])
                    splits = np.array_split(orig_steps, actual_segs)
                else:
                    split_points = np.random.choice(available_points, needed_points, replace=False)
                    split_points.sort()
                    splits = np.split(orig_steps, split_points)
            else:
                splits = np.array_split(orig_steps, num_segs[i])
            # Only permute when more than one segment actually exists.
            if len(splits) > 1:
                perm = np.random.permutation(len(splits))
                warp = np.concatenate([splits[j] for j in perm]).ravel()
                ret[i] = pat[warp]
            else:
                ret[i] = pat
        else:
            ret[i] = pat
    return ret
def window_warp(x, window_ratio=0.1, scales=[0.5, 2.]):
    """Speed up or slow down a random window of each sample.

    Reference: https://halshs.archives-ouvertes.fr/halshs-01357973/document

    A random interior window of length ``ceil(window_ratio * timesteps)`` is
    resampled by a factor drawn from `scales`, after which the whole series
    is stretched back to the original length.
    """
    warp_scales = np.random.choice(scales, x.shape[0])
    warp_size = np.ceil(window_ratio * x.shape[1]).astype(int)
    warp_size = max(1, warp_size)  # the window must span at least one step
    window_steps = np.arange(warp_size)
    ret = np.zeros_like(x)
    full_grid = np.arange(x.shape[1])
    for i, pat in enumerate(x):
        if x.shape[1] <= warp_size + 2:
            # Series too short to carve out a warpable interior window;
            # keep the pattern unchanged.
            ret[i] = pat
            continue
        try:
            window_start = np.random.randint(low=1, high=x.shape[1] - warp_size - 1)
        except ValueError:
            # Defensive fallback for an invalid sampling range.
            window_start = 1
        window_end = window_start + warp_size
        scaled_len = int(warp_size * warp_scales[i])
        for dim in range(x.shape[2]):
            head = pat[:window_start, dim]
            middle = np.interp(
                np.linspace(0, warp_size - 1, num=scaled_len),
                window_steps,
                pat[window_start:window_end, dim]
            )
            tail = pat[window_end:, dim]
            stitched = np.concatenate((head, middle, tail))
            ret[i, :, dim] = np.interp(
                full_grid,
                np.linspace(0, x.shape[1] - 1., num=stitched.size),
                stitched
            )
    return ret
def spawner(x, labels, sigma=0.05, verbose=0):
    """SPAWNER augmentation: DTW-constrained averaging of same-class pairs.

    Reference: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6983028/

    For each pattern a random same-class pattern is selected, both are split
    at a random time point, the two halves are aligned with DTW separately,
    the aligned pair is averaged, and Gaussian jitter is added.

    Parameters
    ----------
    x : numpy.ndarray, shape (n_samples, timesteps, n_features)
    labels : numpy.ndarray
        Class labels; one-hot (2-D) or integer (1-D).
    sigma : float
        Standard deviation of the final jitter.
    verbose : int
        Use verbose=-1 to turn off warnings; verbose=1 to print out figures.

    Returns
    -------
    numpy.ndarray with the same shape as `x`.
    """
    import dtw as dtw  # presumably the project-local DTW module, not the PyPI "dtw" package -- TODO confirm
    # Series with <= 2 timesteps cannot be split into two non-empty halves.
    if x.shape[1] <= 2:
        if verbose > -1:
            print("Warning: Time series too short for spawner augmentation")
        return x  # Return the original data if too short
    # Pre-draw a random interior split point for every sample.
    random_points = np.zeros(x.shape[0], dtype=int)
    for i in range(x.shape[0]):
        try:
            random_points[i] = np.random.randint(low=1, high=x.shape[1]-1)
        except ValueError:
            # Fallback if random range is invalid
            random_points[i] = 1
    window = np.ceil(x.shape[1] / 10.).astype(int)  # DTW band width, ~10% of series length
    window = max(1, window)  # Ensure window is at least 1
    orig_steps = np.arange(x.shape[1])
    # Collapse one-hot labels to integer class ids when necessary.
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels
    ret = np.zeros_like(x)
    for i, pat in enumerate(tqdm(x) if 'tqdm' in globals() else x):
        # guarantees that same one isn't selected
        choices = np.delete(np.arange(x.shape[0]), i)
        # remove ones of different classes
        # NOTE(review): np.where returns positions *within* `choices`, yet the
        # result is used below to index `x` directly; for indices past i this
        # is off by one and may even re-select pattern i. The same pattern
        # exists in the upstream repository -- verify intent before changing.
        choices = np.where(l[choices] == l[i])[0]
        if choices.size > 0:
            random_sample = x[np.random.choice(choices)]
            # SPAWNER splits the path into two randomly
            try:
                # Handle potential edge cases with very small sequences
                random_point = random_points[i]
                if random_point <= 0:
                    random_point = 1
                if random_point >= x.shape[1]:
                    random_point = x.shape[1] - 1
                # Check if window size is appropriate for the shorter half.
                if window >= min(random_point, pat.shape[0] - random_point):
                    # Adjust window if it's too large
                    adjusted_window = max(1, min(random_point, pat.shape[0] - random_point) - 1)
                    if verbose > -1:
                        print(f"Warning: Adjusting window from {window} to {adjusted_window}")
                    # NOTE(review): this rebinding shrinks `window` for all
                    # subsequent samples too, not just the current one.
                    window = adjusted_window
                # Align the two halves independently, forcing the warping
                # path through the split point.
                path1 = dtw.dtw(pat[:random_point], random_sample[:random_point],
                                dtw.RETURN_PATH, slope_constraint="symmetric", window=window)
                path2 = dtw.dtw(pat[random_point:], random_sample[random_point:],
                                dtw.RETURN_PATH, slope_constraint="symmetric", window=window)
                combined = np.concatenate((np.vstack(path1), np.vstack(path2+random_point)), axis=1)
                if verbose:
                    print(random_point)
                    dtw_value, cost, DTW_map, path = dtw.dtw(pat, random_sample,
                                                             return_flag=dtw.RETURN_ALL,
                                                             slope_constraint="symmetric",
                                                             window=window)
                    dtw.draw_graph1d(cost, DTW_map, path, pat, random_sample)
                    dtw.draw_graph1d(cost, DTW_map, combined, pat, random_sample)
                # Average the two series along the combined warping path.
                mean = np.mean([pat[combined[0]], random_sample[combined[1]]], axis=0)
                # Handle potential size mismatch
                if mean.shape[0] > 0:
                    # Resample the warped average back onto the uniform grid.
                    for dim in range(x.shape[2]):
                        ret[i,:,dim] = np.interp(orig_steps,
                                                 np.linspace(0, x.shape[1]-1., num=mean.shape[0]),
                                                 mean[:,dim]).T
                else:
                    if verbose > -1:
                        print("Warning: DTW produced empty path, skipping augmentation")
                    ret[i,:] = pat
            except Exception as e:
                # Fall back to the unmodified pattern on any DTW failure.
                if verbose > -1:
                    print(f"Error in DTW computation: {e}")
                ret[i,:] = pat
        else:
            if verbose > -1:
                print(f"There is only one pattern of class {l[i]}, skipping pattern average")
            ret[i,:] = pat
    # Add jitter; fall back to the raw result if jitter is unavailable.
    try:
        return jitter(ret, sigma=sigma)
    except:
        if verbose > -1:
            print("Warning: jitter function failed or not found, returning unjittered data")
        return ret
def wdba(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True, verbose=0):
    """Weighted DTW Barycenter Averaging (wDBA) augmentation.

    Reference: https://ieeexplore.ieee.org/document/8215569

    For each pattern, up to `batch_size` random same-class prototypes are
    selected, their pairwise DTW distances computed, and a weighted DBA
    average anchored on the medoid prototype is returned.

    Parameters
    ----------
    x : numpy.ndarray, shape (n_samples, timesteps, n_features)
    labels : one-hot (2-D) or integer (1-D) class labels.
    batch_size : int
        Maximum number of intra-class prototypes averaged per output sample.
    slope_constraint : str
        DTW slope constraint: "symmetric" or "asymmetric".
    use_window : bool
        Restrict DTW to a band of ~10% of the series length.
    verbose : int
        Use verbose = -1 to turn off warnings.

    Returns
    -------
    numpy.ndarray with the same shape as `x`.
    """
    import dtw as dtw  # presumably the project-local DTW module -- TODO confirm
    if use_window:
        window = np.ceil(x.shape[1] / 10.).astype(int)
    else:
        window = None
    orig_steps = np.arange(x.shape[1])
    # Collapse one-hot labels to integer class ids when necessary.
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels
    ret = np.zeros_like(x)
    for i in tqdm(range(ret.shape[0])):
        # get the same class as i (indexes `l` directly, so indices are valid;
        # note the candidate set includes pattern i itself)
        choices = np.where(l == l[i])[0]
        if choices.size > 0:
            # pick random intra-class patterns
            k = min(choices.size, batch_size)
            random_prototypes = x[np.random.choice(choices, k, replace=False)]
            # Pairwise DTW distance matrix between the chosen prototypes.
            dtw_matrix = np.zeros((k, k))
            for p, prototype in enumerate(random_prototypes):
                for s, sample in enumerate(random_prototypes):
                    if p == s:
                        dtw_matrix[p, s] = 0.
                    else:
                        dtw_matrix[p, s] = dtw.dtw(prototype, sample, dtw.RETURN_VALUE, slope_constraint=slope_constraint, window=window)
            # Medoid = prototype with the smallest summed distance to the rest.
            medoid_id = np.argsort(np.sum(dtw_matrix, axis=1))[0]
            nearest_order = np.argsort(dtw_matrix[medoid_id])
            medoid_pattern = random_prototypes[medoid_id]
            # start weighted DBA
            average_pattern = np.zeros_like(medoid_pattern)
            weighted_sums = np.zeros((medoid_pattern.shape[0]))
            for nid in nearest_order:
                # The medoid itself contributes with weight 1; likewise every
                # prototype when the nearest non-medoid distance is exactly 0.
                if nid == medoid_id or dtw_matrix[medoid_id, nearest_order[1]] == 0.:
                    average_pattern += medoid_pattern
                    weighted_sums += np.ones_like(weighted_sums)
                else:
                    path = dtw.dtw(medoid_pattern, random_prototypes[nid], dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)
                    dtw_value = dtw_matrix[medoid_id, nid]
                    warped = random_prototypes[nid, path[1]]
                    # Exponential weight that halves at the nearest-neighbor distance.
                    weight = np.exp(np.log(0.5)*dtw_value/dtw_matrix[medoid_id, nearest_order[1]])
                    # Accumulate contributions along the medoid side of the path.
                    average_pattern[path[0]] += weight * warped
                    weighted_sums[path[0]] += weight
            # NOTE(review): a medoid timestep never visited by any DTW path
            # would leave weighted_sums at 0 and divide by zero here --
            # presumably the paths cover every timestep; verify in the dtw module.
            ret[i,:] = average_pattern / weighted_sums[:,np.newaxis]
        else:
            if verbose > -1:
                print("There is only one pattern of class %d, skipping pattern average"%l[i])
            ret[i,:] = x[i]
    return ret
def random_guided_warp(x, labels, slope_constraint="symmetric", use_window=True, dtw_type="normal", verbose=0):
    """Random guided time warping (RGW).

    Warps each pattern along the DTW path to a randomly chosen intra-class
    prototype ("teacher"), then resamples back to the original length.

    Parameters
    ----------
    x : numpy.ndarray, shape (n_samples, timesteps, n_features)
    labels : one-hot (2-D) or integer (1-D) class labels.
    slope_constraint : str
        DTW slope constraint: "symmetric" or "asymmetric".
    use_window : bool
        Restrict DTW to a band of ~10% of the series length.
    dtw_type : str
        "normal" for plain DTW or "shape" for shapeDTW.
    verbose : int
        Use verbose = -1 to turn off warnings.

    Returns
    -------
    numpy.ndarray with the same shape as `x`.
    """
    import dtw as dtw  # presumably the project-local DTW module -- TODO confirm
    if use_window:
        window = np.ceil(x.shape[1] / 10.).astype(int)
    else:
        window = None
    orig_steps = np.arange(x.shape[1])
    # Collapse one-hot labels to integer class ids when necessary.
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels
    ret = np.zeros_like(x)
    for i, pat in enumerate(tqdm(x)):
        # guarantees that the same one isn't selected
        choices = np.delete(np.arange(x.shape[0]), i)
        # remove ones of different classes
        # NOTE(review): as in spawner, np.where yields positions within
        # `choices` but they are used to index `x` directly below -- same
        # quirk as the upstream repository; verify intent before changing.
        choices = np.where(l[choices] == l[i])[0]
        if choices.size > 0:
            # pick random intra-class pattern
            random_prototype = x[np.random.choice(choices)]
            if dtw_type == "shape":
                path = dtw.shape_dtw(random_prototype, pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)
            else:
                path = dtw.dtw(random_prototype, pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)
            # Time warp: follow the pattern side of the alignment path,
            # then resample linearly onto the uniform grid.
            warped = pat[path[1]]
            for dim in range(x.shape[2]):
                ret[i,:,dim] = np.interp(orig_steps, np.linspace(0, x.shape[1]-1., num=warped.shape[0]), warped[:,dim]).T
        else:
            if verbose > -1:
                print("There is only one pattern of class %d, skipping timewarping"%l[i])
            ret[i,:] = pat
    return ret
def random_guided_warp_shape(x, labels, slope_constraint="symmetric", use_window=True):
    """Convenience wrapper: random guided warping with shapeDTW alignment."""
    return random_guided_warp(x, labels, slope_constraint=slope_constraint,
                              use_window=use_window, dtw_type="shape")
def discriminative_guided_warp(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True, dtw_type="normal", use_variable_slice=True, verbose=0):
    """Discriminative guided time warping (DGW).

    Per pattern, chooses the intra-class prototype that best separates the
    positive class from sampled negatives (largest mean negative DTW minus
    mean positive DTW), warps the pattern along the DTW path to that
    discriminative "teacher", and optionally applies variable window
    slicing scaled by how much each pattern was warped.

    Parameters
    ----------
    x : numpy.ndarray, shape (n_samples, timesteps, n_features)
    labels : one-hot (2-D) or integer (1-D) class labels.
    batch_size : int
        Total prototype budget, split between positives and negatives.
    slope_constraint : str
        DTW slope constraint: "symmetric" or "asymmetric".
    use_window : bool
        Restrict DTW to a band of ~10% of the series length.
    dtw_type : str
        "normal" for plain DTW or "shape" for shapeDTW.
    use_variable_slice : bool
        Apply window slicing inversely proportional to the warp amount.
    verbose : int
        Use verbose = -1 to turn off warnings.

    Returns
    -------
    numpy.ndarray with the same shape as `x`.
    """
    import dtw as dtw  # presumably the project-local DTW module -- TODO confirm
    if use_window:
        window = np.ceil(x.shape[1] / 10.).astype(int)
    else:
        window = None
    orig_steps = np.arange(x.shape[1])
    # Collapse one-hot labels to integer class ids when necessary.
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels
    # Positives get the larger half of the prototype budget.
    positive_batch = np.ceil(batch_size / 2).astype(int)
    negative_batch = np.floor(batch_size / 2).astype(int)
    ret = np.zeros_like(x)
    warp_amount = np.zeros(x.shape[0])
    for i, pat in enumerate(tqdm(x)):
        # guarantees that the same one isn't selected
        choices = np.delete(np.arange(x.shape[0]), i)
        # split the candidates into same-class and other-class sets
        # NOTE(review): np.where yields positions within `choices` but they
        # are used to index `x` directly below -- same quirk as upstream.
        positive = np.where(l[choices] == l[i])[0]
        negative = np.where(l[choices] != l[i])[0]
        if positive.size > 0 and negative.size > 0:
            pos_k = min(positive.size, positive_batch)
            neg_k = min(negative.size, negative_batch)
            positive_prototypes = x[np.random.choice(positive, pos_k, replace=False)]
            negative_prototypes = x[np.random.choice(negative, neg_k, replace=False)]
            # Mean intra-class and inter-class DTW distance per positive prototype.
            pos_aves = np.zeros((pos_k))
            neg_aves = np.zeros((pos_k))
            if dtw_type == "shape":
                for p, pos_prot in enumerate(positive_prototypes):
                    for ps, pos_samp in enumerate(positive_prototypes):
                        if p != ps:
                            pos_aves[p] += (1./(pos_k-1.))*dtw.shape_dtw(pos_prot, pos_samp, dtw.RETURN_VALUE, slope_constraint=slope_constraint, window=window)
                    for ns, neg_samp in enumerate(negative_prototypes):
                        neg_aves[p] += (1./neg_k)*dtw.shape_dtw(pos_prot, neg_samp, dtw.RETURN_VALUE, slope_constraint=slope_constraint, window=window)
                # Most discriminative teacher: far from negatives, close to positives.
                selected_id = np.argmax(neg_aves - pos_aves)
                path = dtw.shape_dtw(positive_prototypes[selected_id], pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)
            else:
                for p, pos_prot in enumerate(positive_prototypes):
                    for ps, pos_samp in enumerate(positive_prototypes):
                        if p != ps:
                            pos_aves[p] += (1./(pos_k-1.))*dtw.dtw(pos_prot, pos_samp, dtw.RETURN_VALUE, slope_constraint=slope_constraint, window=window)
                    for ns, neg_samp in enumerate(negative_prototypes):
                        neg_aves[p] += (1./neg_k)*dtw.dtw(pos_prot, neg_samp, dtw.RETURN_VALUE, slope_constraint=slope_constraint, window=window)
                selected_id = np.argmax(neg_aves - pos_aves)
                path = dtw.dtw(positive_prototypes[selected_id], pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)
            # Time warp along the chosen path, then resample to the grid.
            warped = pat[path[1]]
            # Warp amount = total deviation of the path from the identity mapping.
            warp_path_interp = np.interp(orig_steps, np.linspace(0, x.shape[1]-1., num=warped.shape[0]), path[1])
            warp_amount[i] = np.sum(np.abs(orig_steps-warp_path_interp))
            for dim in range(x.shape[2]):
                ret[i,:,dim] = np.interp(orig_steps, np.linspace(0, x.shape[1]-1., num=warped.shape[0]), warped[:,dim]).T
        else:
            if verbose > -1:
                print("There is only one pattern of class %d"%l[i])
            ret[i,:] = pat
            warp_amount[i] = 0.
    if use_variable_slice:
        max_warp = np.max(warp_amount)
        if max_warp == 0:
            # No warping happened anywhere: apply a uniform slice.
            ret = window_slice(ret, reduce_ratio=0.9)
        else:
            for i, pat in enumerate(ret):
                # Variable slicing: the more a pattern was warped, the less it is sliced.
                ret[i] = window_slice(pat[np.newaxis,:,:], reduce_ratio=0.9+0.1*warp_amount[i]/max_warp)[0]
    return ret
def discriminative_guided_warp_shape(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True):
    """Convenience wrapper: discriminative guided warping with shapeDTW alignment."""
    return discriminative_guided_warp(x, labels, batch_size=batch_size,
                                      slope_constraint=slope_constraint,
                                      use_window=use_window, dtw_type="shape")