import torch
import numpy as np


def costume_collate(data, max_len=None, mask_compensation=False):
    """Build mini-batch tensors from a list of (X, mask) tuples: mask the input and create the
    corresponding target, target-mask and padding-mask tensors.

    Args:
        data: list of batch_size (X, mask) tuples.
            - X: torch tensor of shape (seq_length, feat_dim); variable seq_length.
            - mask: boolean torch tensor of shape (seq_length, feat_dim); variable seq_length.
        max_len: global fixed sequence length. Used for architectures requiring fixed-length input,
            where the batch length cannot vary dynamically. Longer sequences are clipped, shorter ones are padded with 0s.
        mask_compensation: if True, rescale the masked input with `compensate_masking` so that a
            linear projection of each feature vector is unaffected on average.

    Returns:
        X: (batch_size, padded_length, feat_dim) torch tensor of masked features (input)
        targets: (batch_size, padded_length, feat_dim) torch tensor of unmasked features (output)
        target_masks: (batch_size, padded_length, feat_dim) boolean torch tensor;
            0 indicates masked values to be predicted, 1 indicates unaffected ("active") feature values
        padding_masks: (batch_size, padded_length) boolean tensor; 1 means keep the vector at this position, 0 means ignore (padding)
    """
    batch_size = len(data)
    features, masks = zip(*data)

    # Stack and pad features and masks (convert 2D to 3D tensors, i.e. add a batch dimension)
    lengths = [X.shape[0] for X in features]  # original sequence length of each time series
    if max_len is None:
        max_len = max(lengths)

    X = torch.zeros(batch_size, max_len, features[0].shape[-1])  # (batch_size, padded_length, feat_dim)
    target_masks = torch.zeros_like(X, dtype=torch.bool)  # (batch_size, padded_length, feat_dim) masks related to the objective
    for i in range(batch_size):
        end = min(lengths[i], max_len)
        X[i, :end, :] = features[i][:end, :]
        target_masks[i, :end, :] = masks[i][:end, :]

    targets = X.clone()
    X = X * target_masks  # mask input
    if mask_compensation:
        X = compensate_masking(X, target_masks)

    padding_masks = padding_mask(
        torch.tensor(lengths, dtype=torch.int16), max_len=max_len
    )  # (batch_size, padded_length) boolean tensor, "1" means keep
    # target_masks = ~target_masks  # inverse logic: 0 now means ignore, 1 means predict

    return X, targets, target_masks, padding_masks
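

# A minimal usage sketch, assuming a dataset that yields (X, mask) tuples as described above;
# the toy tensors, masking ratio and max_len below are illustrative choices. In practice,
# costume_collate would be passed as `collate_fn` to a torch.utils.data.DataLoader.
def _demo_costume_collate():
    data = [
        (torch.randn(5, 3), torch.from_numpy(noise_mask(np.zeros((5, 3)), masking_ratio=0.15))),
        (torch.randn(7, 3), torch.from_numpy(noise_mask(np.zeros((7, 3)), masking_ratio=0.15))),
    ]
    X, targets, target_masks, padding_masks = costume_collate(data, max_len=8)
    # Shapes: (2, 8, 3), (2, 8, 3), (2, 8, 3), (2, 8); the rows of padding_masks keep the
    # first 5 and 7 positions, respectively.
    print(X.shape, targets.shape, target_masks.shape, padding_masks.shape)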


def compensate_masking(X, mask):
    """
    Compensate feature vectors after masking values, such that a matrix product W @ X is unaffected on average.
    If p is the proportion of unmasked (active) elements, X' = X / p = X * feat_dim / num_active.

    Args:
        X: (batch_size, seq_length, feat_dim) torch tensor
        mask: (batch_size, seq_length, feat_dim) torch tensor; 0 means masked (to be predicted), 1 means unaffected (active) input

    Returns:
        (batch_size, seq_length, feat_dim) compensated features
    """
    # number of unmasked elements of the feature vector at each time step
    num_active = torch.sum(mask, dim=-1).unsqueeze(-1)  # (batch_size, seq_length, 1)
    # to avoid division by 0, set the minimum to 1
    num_active = torch.clamp(num_active, min=1)  # (batch_size, seq_length, 1)
    return X.shape[-1] * X / num_active
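

# A quick numeric sanity check, as a sketch: with half the features of a time step masked,
# compensation scales the surviving entries by feat_dim / num_active = 2, so the sum over
# the feature dimension keeps its original scale.
def _demo_compensate_masking():
    X = torch.ones(1, 1, 4)
    mask = torch.tensor([[[1, 0, 1, 0]]])
    X_masked = X * mask
    X_comp = compensate_masking(X_masked, mask)
    print(X_masked.sum().item(), X_comp.sum().item())  # 2.0 4.0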


def padding_mask(lengths, max_len=None):
    """
    Used to mask padded positions: creates a (batch_size, max_len) boolean mask from a tensor of sequence lengths,
    where 1 means keep the element at this position (time step).
    """
    batch_size = lengths.numel()
    max_len = max_len or int(lengths.max())  # `or` returns its first truthy operand, so a given max_len takes precedence
    return (
        torch.arange(0, max_len, device=lengths.device)
        .type_as(lengths)
        .repeat(batch_size, 1)
        .lt(lengths.unsqueeze(1))
    )
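

# Example sketch: for lengths [2, 4] and max_len 5, each row keeps the first `length` positions.
def _demo_padding_mask():
    mask = padding_mask(torch.tensor([2, 4]), max_len=5)
    print(mask.int())
    # tensor([[1, 1, 0, 0, 0],
    #         [1, 1, 1, 1, 0]], dtype=torch.int32)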


def noise_mask(
    X,
    masking_ratio,
    lm=3,
    mode="separate",
    distribution="geometric",
    exclude_feats=None,
):
    """
    Creates a random boolean mask of the same shape as X, with 0s at places where a feature should be masked.

    Args:
        X: (seq_length, feat_dim) numpy array of features corresponding to a single sample
        masking_ratio: proportion of seq_length to be masked. At each time step, this is also the proportion of
            feat_dim that will be masked on average.
        lm: average length of the masking subsequences (streaks of 0s). Used only when `distribution` is 'geometric'.
        mode: whether each variable should be masked separately ('separate'), or all variables at certain positions
            should be masked concurrently ('concurrent')
        distribution: 'geometric' samples the mask through a Markov chain (and is thus stateful), resulting in
            geometrically distributed masked subsequences of desired mean length `lm`; any other value samples each
            mask element independently at random (Bernoulli).
        exclude_feats: iterable of indices corresponding to features to be excluded from masking (i.e. to remain all 1s)

    Returns:
        boolean numpy array with the same shape as X, with 0s at places where a feature should be masked
    """
    if exclude_feats is not None:
        exclude_feats = set(exclude_feats)

    if distribution == "geometric":  # stateful (Markov chain)
        if mode == "separate":  # each variable (feature) is masked independently
            mask = np.ones(X.shape, dtype=bool)
            for m in range(X.shape[1]):  # feature dimension
                if exclude_feats is None or m not in exclude_feats:
                    mask[:, m] = geom_noise_mask_single(X.shape[0], lm, masking_ratio)  # time dimension
        else:  # replicate across the feature dimension (mask all variables at the same positions concurrently)
            mask = np.tile(
                np.expand_dims(geom_noise_mask_single(X.shape[0], lm, masking_ratio), 1),
                X.shape[1],
            )
    else:  # each position is an independent Bernoulli with p = 1 - masking_ratio
        if mode == "separate":
            mask = np.random.choice(
                np.array([True, False]),
                size=X.shape,
                replace=True,
                p=(1 - masking_ratio, masking_ratio),
            )
        else:
            mask = np.tile(
                np.random.choice(
                    np.array([True, False]),
                    size=(X.shape[0], 1),
                    replace=True,
                    p=(1 - masking_ratio, masking_ratio),
                ),
                X.shape[1],
            )
    return mask
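

# Example sketch: draw a geometric mask for a toy (20, 3) sample and check the empirical
# masked fraction; with masking_ratio=0.25, roughly a quarter of the entries are 0 on average.
def _demo_noise_mask():
    X = np.zeros((20, 3))
    mask = noise_mask(X, masking_ratio=0.25, lm=3, mode="separate", distribution="geometric")
    print(mask.shape, 1 - mask.mean())  # (20, 3) and approximately 0.25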


def geom_noise_mask_single(L, lm, masking_ratio):
    """
    Randomly create a boolean mask of length `L`, consisting of subsequences of average length `lm`, masking with 0s a
    `masking_ratio` proportion of the sequence. The lengths of the masked subsequences and of the intervals between
    them follow geometric distributions.

    Args:
        L: length of the mask and of the sequence to be masked
        lm: average length of the masking subsequences (streaks of 0s)
        masking_ratio: proportion of L to be masked

    Returns:
        (L,) boolean numpy array intended to mask ('drop') with 0s a sequence of length L
    """
    keep_mask = np.ones(L, dtype=bool)
    p_m = 1 / lm  # probability of each masking subsequence stopping: parameter of the geometric distribution
    # Probability of each unmasked subsequence stopping, chosen so that the expected masked fraction,
    # (1 / p_m) / (1 / p_m + 1 / p_u), equals masking_ratio.
    p_u = p_m * masking_ratio / (1 - masking_ratio)
    p = [p_m, p_u]

    # Start in the masked state (0) with probability masking_ratio
    state = int(np.random.rand() > masking_ratio)  # state 0 means masking, 1 means not masking
    for i in range(L):
        keep_mask[i] = state  # the state index happens to coincide with the mask value for that state
        if np.random.rand() < p[state]:
            state = 1 - state

    return keep_mask
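

# A sketch verifying the choice of p_u empirically: over a long sequence, the fraction of 0s
# approaches masking_ratio (the sequence length here is an arbitrary large value).
def _demo_geom_noise_mask_single():
    mask = geom_noise_mask_single(100000, lm=3, masking_ratio=0.15)
    print(1 - mask.mean())  # approximately 0.15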


def uniform_noise_mask_single(L, lm, masking_ratio):
    """
    Randomly create a boolean mask of length `L` consisting of masked and unmasked subsequences. Note that the state
    flips with constant probability 1/lm, so both masked and unmasked streak lengths are geometrically distributed
    with mean `lm`, and the long-run masked proportion tends to 0.5; `masking_ratio` only sets the probability of
    starting in the masked state.

    Args:
        L: length of the mask and of the sequence to be masked
        lm: average length of the masking subsequences (streaks of 0s)
        masking_ratio: probability of starting in the masked state

    Returns:
        (L,) boolean numpy array intended to mask ('drop') with 0s a sequence of length L
    """
    keep_mask = np.ones(L, dtype=bool)
    # Start in the masked state (0) with probability masking_ratio
    state = int(np.random.rand() > masking_ratio)  # state 0 means masking, 1 means not masking
    for i in range(L):
        keep_mask[i] = state  # the state index happens to coincide with the mask value for that state
        if np.random.rand() < 1 / lm:
            state = 1 - state

    return keep_mask
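

# Run the usage sketches above when the file is executed directly; the guard keeps the module
# importable as a plain utility file.
if __name__ == "__main__":
    _demo_costume_collate()
    _demo_compensate_masking()
    _demo_padding_mask()
    _demo_noise_mask()
    _demo_geom_noise_mask_single()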