import numpy as np from numpy import dot from numpy.linalg import norm import os from scipy import stats import shutil from sklearn import metrics import torch from torch import Tensor import torch.distributed as dist import torch.autograd as autograd from typing import TypeVar, Optional, Iterator, Sequence from torch.utils.data import Dataset, Sampler, DistributedSampler import math T_co = TypeVar('T_co', covariant=True) class WeightedRandomSampler(Sampler[int]): r"""Samples elements from ``[0,..,len(weights)-1]`` with given probabilities (weights). Args: weights (sequence) : a sequence of weights, not necessary summing up to one num_samples (int): number of samples to draw replacement (bool): if ``True``, samples are drawn with replacement. If not, they are drawn without replacement, which means that when a sample index is drawn for a row, it cannot be drawn again for that row. generator (Generator): Generator used in sampling. Example: >>> list(WeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True)) [4, 4, 1, 4, 5] >>> list(WeightedRandomSampler([0.9, 0.4, 0.05, 0.2, 0.3, 0.1], 5, replacement=False)) [0, 1, 4, 3, 2] """ weights: Tensor num_samples: int replacement: bool def __init__(self, weights: Sequence[float], num_samples: int, replacement: bool = True, generator=None) -> None: if not isinstance(num_samples, int) or isinstance(num_samples, bool) or \ num_samples <= 0: raise ValueError("num_samples should be a positive integer " "value, but got num_samples={}".format(num_samples)) if not isinstance(replacement, bool): raise ValueError("replacement should be a boolean value, but got " "replacement={}".format(replacement)) self.weights = torch.as_tensor(weights, dtype=torch.double) self.num_samples = num_samples self.replacement = replacement self.generator = generator def __iter__(self) -> Iterator[int]: rand_tensor = torch.multinomial(self.weights, self.num_samples, self.replacement, generator=self.generator) yield from iter(rand_tensor.tolist()) def __len__(self) -> int: return self.num_samples class DistributedSamplerWrapper(DistributedSampler): def __init__( self, sampler, dataset, num_replicas=None, rank=None, shuffle: bool = True): super(DistributedSamplerWrapper, self).__init__( dataset, num_replicas, rank, shuffle) # source: @awaelchli https://github.com/PyTorchLightning/pytorch-lightning/issues/3238 self.sampler = sampler def __iter__(self): if self.sampler.generator is None: self.sampler.generator = torch.Generator() self.sampler.generator.manual_seed(self.seed + self.epoch) indices = list(self.sampler) if self.epoch == 0: print(f"\n DistributedSamplerWrapper : {indices[:10]} \n\n") indices = indices[self.rank:self.total_size:self.num_replicas] return iter(indices) class DistributedWeightedSampler(Sampler): weights: Tensor num_samples: int replacement: bool #dataset_train, samples_weight, num_replicas=num_tasks, rank=global_rank def __init__(self, dataset: Dataset, weights: Sequence[float], num_replicas: Optional[int] = None, rank: Optional[int] = None, replacement: bool = True, shuffle: bool = True, seed: int = 0, drop_last: bool = False) -> None: if num_replicas is None: if not dist.is_available(): raise RuntimeError("Requires distributed package to be available") num_replicas = dist.get_world_size() if rank is None: if not dist.is_available(): raise RuntimeError("Requires distributed package to be available") rank = dist.get_rank() if rank >= num_replicas or rank < 0: raise ValueError( "Invalid rank {}, rank should be in the interval" " [0, {}]".format(rank, num_replicas - 1)) self.dataset = dataset self.num_replicas = num_replicas self.rank = rank self.epoch = 0 self.drop_last = drop_last if self.drop_last and len(self.dataset) % self.num_replicas != 0: # type: ignore[arg-type] # Split to nearest available length that is evenly divisible. # This is to ensure each rank receives the same amount of data when # using this Sampler. self.num_samples = math.ceil( (len(self.dataset) - self.num_replicas) / self.num_replicas # type: ignore[arg-type] ) else: self.num_samples = math.ceil(len(self.dataset) / self.num_replicas) # type: ignore[arg-type] self.total_size = self.num_samples * self.num_replicas self.replacement = replacement self.weights = torch.from_numpy(weights) self.shuffle = shuffle self.seed = seed def __iter__(self) -> Iterator[T_co]: # deterministically shuffle based on epoch if self.shuffle: g = torch.Generator() g.manual_seed(self.seed + self.epoch) indices = torch.randperm(len(self.dataset), generator=g).tolist() else: indices = list(range(len(self.dataset))) if not self.drop_last: # add extra samples to make it evenly divisible padding_size = self.total_size - len(indices) if padding_size <= len(indices): indices += indices[:padding_size] else: indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size] else: indices = indices[:self.total_size] assert len(indices) == self.total_size # subsample indices = indices[self.rank:self.total_size:self.num_replicas] assert len(indices) == self.num_samples # # get targets (you can alternatively pass them in __init__, if this op is expensive) # targets = self.dataset.targets # # select only the wanted targets for this subsample # targets = torch.tensor(targets)[indices] # assert len(targets) == self.num_samples # # randomly sample this subset, producing balanced classes # weights = self.calculate_weights(targets) weights = self.weights[indices] subsample_rand_tensor = torch.multinomial(weights, self.num_samples, self.replacement) # now map these target indicies back to the original dataset index... dataset_indices = torch.tensor(indices)[subsample_rand_tensor] return iter(dataset_indices.tolist()) # return iter(indices) def __len__(self) -> int: return self.num_samples def set_epoch(self, epoch) -> None: self.epoch = epoch def off_diagonal(x): n, m = x.shape assert n == m return x.flatten()[:-1].view(n - 1, n + 1)[:, 1:].flatten() def is_dist_avail_and_initialized(): if not dist.is_available(): return False if not dist.is_initialized(): return False return True def get_world_size(): if not is_dist_avail_and_initialized(): return 1 return dist.get_world_size() def get_rank(): if not is_dist_avail_and_initialized(): return 0 return dist.get_rank() def is_main_process(): return get_rank() == 0 def save_on_master(state, is_best, output_dir): if is_main_process(): ckpt_path = f'{output_dir}/checkpoint.pt' best_path = f'{output_dir}/checkpoint_best.pt' torch.save(state, ckpt_path) if is_best: shutil.copyfile(ckpt_path, best_path) def init_distributed_mode(args): if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ: args.rank = int(os.environ["RANK"]) args.world_size = int(os.environ['WORLD_SIZE']) args.gpu = int(os.environ['LOCAL_RANK']) elif 'SLURM_PROCID' in os.environ: args.rank = int(os.environ['SLURM_PROCID']) args.gpu = args.rank % torch.cuda.device_count() else: print('Not using distributed mode') args.distributed = False return args.distributed = True torch.cuda.set_device(args.gpu) args.dist_backend = 'nccl' print('| distributed init (rank {}): {}'.format( args.rank, args.dist_url), flush=True) torch.distributed.init_process_group(backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank) torch.distributed.barrier() setup_for_distributed(args.rank == 0) def scaled_all_reduce(tensors, is_scale=True): """Performs the scaled all_reduce operation on the provided tensors. The input tensors are modified in-place. Currently supports only the sum reduction operator. The reduced values are scaled by the inverse size of the world size. """ world_size = get_world_size() # There is no need for reduction in the single-proc case if world_size == 1: return tensors # Queue the reductions reductions = [] for tensor in tensors: reduction = dist.all_reduce(tensor, async_op=True) reductions.append(reduction) # Wait for reductions to finish for reduction in reductions: reduction.wait() # Scale the results if is_scale: for tensor in tensors: tensor.mul_(1.0 / world_size) return tensors def all_gather_batch(tensors): """ Performs all_gather operation on the provided tensors. """ # Queue the gathered tensors world_size = get_world_size() # There is no need for reduction in the single-proc case if world_size == 1: return tensors tensor_list = [] output_tensor = [] for tensor in tensors: tensor_all = [torch.ones_like(tensor) for _ in range(world_size)] dist.all_gather( tensor_all, tensor, async_op=False # performance opt ) tensor_list.append(tensor_all) for tensor_all in tensor_list: output_tensor.append(torch.cat(tensor_all, dim=0)) return output_tensor class GatherLayer(autograd.Function): """ Gather tensors from all workers with support for backward propagation: This implementation does not cut the gradients as torch.distributed.all_gather does. """ @staticmethod def forward(ctx, x): output = [torch.zeros_like(x) for _ in range(dist.get_world_size())] dist.all_gather(output, x) return tuple(output) @staticmethod def backward(ctx, *grads): all_gradients = torch.stack(grads) dist.all_reduce(all_gradients) return all_gradients[dist.get_rank()] def all_gather_batch_with_grad(tensors): """ Performs all_gather operation on the provided tensors. Graph remains connected for backward grad computation. """ # Queue the gathered tensors world_size = get_world_size() # There is no need for reduction in the single-proc case if world_size == 1: return tensors tensor_list = [] output_tensor = [] for tensor in tensors: tensor_all = GatherLayer.apply(tensor) tensor_list.append(tensor_all) for tensor_all in tensor_list: output_tensor.append(torch.cat(tensor_all, dim=0)) return output_tensor class AverageMeter(object): """Computes and stores the average and current value""" def __init__(self, name, fmt=':f'): self.name = name self.fmt = fmt self.reset() def reset(self): self.val = 0 self.avg = 0 self.sum = 0 self.count = 0 def update(self, val, n=1): self.val = val self.sum += val * n self.count += n self.avg = self.sum / self.count def cat(self, val, n=1): self.val = val self.sum += val * n self.count += n self.avg = self.sum / self.count def synchronize(self): if not is_dist_avail_and_initialized(): return t = torch.tensor([self.sum, self.count], dtype=torch.float64, device='cuda') dist.barrier() dist.all_reduce(t) t = t.tolist() if math.isnan(t[0]): # import pdb; pdb.set_trace() self.sum = 1e9 else: self.sum = int(t[0]) self.count = t[1] self.avg = self.sum / self.count def __str__(self): # import pdb; pdb.set_trace() fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})' return fmtstr.format(**self.__dict__) class ProgressMeter(object): def __init__(self, num_batches, meters, prefix=""): self.batch_fmtstr = self._get_batch_fmtstr(num_batches) self.meters = meters self.prefix = prefix def display(self, batch): entries = [self.prefix + self.batch_fmtstr.format(batch)] entries += [str(meter) for meter in self.meters] print('\t'.join(entries)) return entries def synchronize(self): for meter in self.meters: meter.synchronize() def _get_batch_fmtstr(self, num_batches): num_digits = len(str(num_batches // 1)) fmt = '{:' + str(num_digits) + 'd}' return '[' + fmt + '/' + fmt.format(num_batches) + ']' def accuracy(output, target, topk=(1,)): """Computes the accuracy over the k top predictions for the specified values of k""" with torch.no_grad(): maxk = max(topk) batch_size = target.size(0) _, pred = output.topk(maxk, 1, True, True) pred = pred.t() correct = pred.eq(target.reshape(1, -1).expand_as(pred)) res = [] for k in topk: correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True) res.append(correct_k.mul_(100.0 / batch_size)) return res # Calculate the Metrics for event classification def d_prime(auc): standard_normal = stats.norm() d_prime = standard_normal.ppf(auc) * np.sqrt(2.0) return d_prime def calculate_stats(output, target): """Calculate statistics including mAP, AUC, etc. Args: output: 2d array, (samples_num, classes_num) target: 2d array, (samples_num, classes_num) Returns: stats: list of statistic of each class. """ classes_num = target.shape[-1] stats = [] output = output.cpu() target = target.cpu() # Accuracy, only used for single-label classification such as esc-50, not for multiple label one such as AudioSet acc = metrics.accuracy_score(np.argmax(target, 1), np.argmax(output, 1)) # Class-wise statistics for k in range(classes_num): # Average precision avg_precision = metrics.average_precision_score( target[:, k], output[:, k], average=None) # AUC try: auc = metrics.roc_auc_score(target[:, k], output[:, k], average=None) # Precisions, recalls (precisions, recalls, thresholds) = metrics.precision_recall_curve( target[:, k], output[:, k]) # FPR, TPR (fpr, tpr, thresholds) = metrics.roc_curve(target[:, k], output[:, k]) save_every_steps = 1000 # Sample statistics to reduce size dict = {'precisions': precisions[0::save_every_steps], 'recalls': recalls[0::save_every_steps], 'AP': avg_precision, 'fpr': fpr[0::save_every_steps], 'fnr': 1. - tpr[0::save_every_steps], 'auc': auc, # note acc is not class-wise, this is just to keep consistent with other metrics 'acc': acc } except: dict = {'precisions': -1, 'recalls': -1, 'AP': avg_precision, 'fpr': -1, 'fnr': -1, 'auc': -1, # note acc is not class-wise, this is just to keep consistent with other metrics 'acc': acc } print('class {:s} no true sample'.format(str(k))) stats.append(dict) return stats ### for retrieval task def get_similarity(a, b): cos_sim = dot(a, b) / (norm(a) * norm(b)) return cos_sim # get mean def get_sim_mat(a, b): B = a.shape[0] sim_mat = np.empty([B, B]) for i in range(B): for j in range(B): sim_mat[i, j] = get_similarity(a[i, :], b[j, :]) return sim_mat def compute_metrics(x): sx = np.sort(-x, axis=1) d = np.diag(-x) d = d[:, np.newaxis] ind = sx - d ind = np.where(ind == 0) ind = ind[1] metrics = {} metrics['R1'] = float(np.sum(ind == 0)) / len(ind) metrics['R5'] = float(np.sum(ind < 5)) / len(ind) metrics['R10'] = float(np.sum(ind < 10)) / len(ind) metrics['MR'] = np.median(ind) + 1 return metrics