equiav / utils.py
wendell0218's picture
Upload folder using huggingface_hub
c8ef6d5 verified
import numpy as np
from numpy import dot
from numpy.linalg import norm
import os
from scipy import stats
import shutil
from sklearn import metrics
import torch
from torch import Tensor
import torch.distributed as dist
import torch.autograd as autograd
from typing import TypeVar, Optional, Iterator, Sequence
from torch.utils.data import Dataset, Sampler, DistributedSampler
import math
T_co = TypeVar('T_co', covariant=True)
class WeightedRandomSampler(Sampler[int]):
r"""Samples elements from ``[0,..,len(weights)-1]`` with given probabilities (weights).
Args:
weights (sequence) : a sequence of weights, not necessary summing up to one
num_samples (int): number of samples to draw
replacement (bool): if ``True``, samples are drawn with replacement.
If not, they are drawn without replacement, which means that when a
sample index is drawn for a row, it cannot be drawn again for that row.
generator (Generator): Generator used in sampling.
Example:
>>> list(WeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True))
[4, 4, 1, 4, 5]
>>> list(WeightedRandomSampler([0.9, 0.4, 0.05, 0.2, 0.3, 0.1], 5, replacement=False))
[0, 1, 4, 3, 2]
"""
weights: Tensor
num_samples: int
replacement: bool
def __init__(self, weights: Sequence[float], num_samples: int,
replacement: bool = True, generator=None) -> None:
if not isinstance(num_samples, int) or isinstance(num_samples, bool) or \
num_samples <= 0:
raise ValueError("num_samples should be a positive integer "
"value, but got num_samples={}".format(num_samples))
if not isinstance(replacement, bool):
raise ValueError("replacement should be a boolean value, but got "
"replacement={}".format(replacement))
self.weights = torch.as_tensor(weights, dtype=torch.double)
self.num_samples = num_samples
self.replacement = replacement
self.generator = generator
def __iter__(self) -> Iterator[int]:
rand_tensor = torch.multinomial(self.weights, self.num_samples, self.replacement, generator=self.generator)
yield from iter(rand_tensor.tolist())
def __len__(self) -> int:
return self.num_samples
class DistributedSamplerWrapper(DistributedSampler):
def __init__(
self, sampler, dataset,
num_replicas=None,
rank=None,
shuffle: bool = True):
super(DistributedSamplerWrapper, self).__init__(
dataset, num_replicas, rank, shuffle)
# source: @awaelchli https://github.com/PyTorchLightning/pytorch-lightning/issues/3238
self.sampler = sampler
def __iter__(self):
if self.sampler.generator is None:
self.sampler.generator = torch.Generator()
self.sampler.generator.manual_seed(self.seed + self.epoch)
indices = list(self.sampler)
if self.epoch == 0:
print(f"\n DistributedSamplerWrapper : {indices[:10]} \n\n")
indices = indices[self.rank:self.total_size:self.num_replicas]
return iter(indices)
class DistributedWeightedSampler(Sampler):
weights: Tensor
num_samples: int
replacement: bool
#dataset_train, samples_weight, num_replicas=num_tasks, rank=global_rank
def __init__(self, dataset: Dataset, weights: Sequence[float], num_replicas: Optional[int] = None,
rank: Optional[int] = None, replacement: bool = True, shuffle: bool = True,
seed: int = 0, drop_last: bool = False) -> None:
if num_replicas is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
num_replicas = dist.get_world_size()
if rank is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
rank = dist.get_rank()
if rank >= num_replicas or rank < 0:
raise ValueError(
"Invalid rank {}, rank should be in the interval"
" [0, {}]".format(rank, num_replicas - 1))
self.dataset = dataset
self.num_replicas = num_replicas
self.rank = rank
self.epoch = 0
self.drop_last = drop_last
if self.drop_last and len(self.dataset) % self.num_replicas != 0: # type: ignore[arg-type]
# Split to nearest available length that is evenly divisible.
# This is to ensure each rank receives the same amount of data when
# using this Sampler.
self.num_samples = math.ceil(
(len(self.dataset) - self.num_replicas) / self.num_replicas # type: ignore[arg-type]
)
else:
self.num_samples = math.ceil(len(self.dataset) / self.num_replicas) # type: ignore[arg-type]
self.total_size = self.num_samples * self.num_replicas
self.replacement = replacement
self.weights = torch.from_numpy(weights)
self.shuffle = shuffle
self.seed = seed
def __iter__(self) -> Iterator[T_co]:
# deterministically shuffle based on epoch
if self.shuffle:
g = torch.Generator()
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist()
else:
indices = list(range(len(self.dataset)))
if not self.drop_last:
# add extra samples to make it evenly divisible
padding_size = self.total_size - len(indices)
if padding_size <= len(indices):
indices += indices[:padding_size]
else:
indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
else:
indices = indices[:self.total_size]
assert len(indices) == self.total_size
# subsample
indices = indices[self.rank:self.total_size:self.num_replicas]
assert len(indices) == self.num_samples
# # get targets (you can alternatively pass them in __init__, if this op is expensive)
# targets = self.dataset.targets
# # select only the wanted targets for this subsample
# targets = torch.tensor(targets)[indices]
# assert len(targets) == self.num_samples
# # randomly sample this subset, producing balanced classes
# weights = self.calculate_weights(targets)
weights = self.weights[indices]
subsample_rand_tensor = torch.multinomial(weights, self.num_samples, self.replacement)
# now map these target indicies back to the original dataset index...
dataset_indices = torch.tensor(indices)[subsample_rand_tensor]
return iter(dataset_indices.tolist())
# return iter(indices)
def __len__(self) -> int:
return self.num_samples
def set_epoch(self, epoch) -> None:
self.epoch = epoch
def off_diagonal(x):
n, m = x.shape
assert n == m
return x.flatten()[:-1].view(n - 1, n + 1)[:, 1:].flatten()
def is_dist_avail_and_initialized():
if not dist.is_available():
return False
if not dist.is_initialized():
return False
return True
def get_world_size():
if not is_dist_avail_and_initialized():
return 1
return dist.get_world_size()
def get_rank():
if not is_dist_avail_and_initialized():
return 0
return dist.get_rank()
def is_main_process():
return get_rank() == 0
def save_on_master(state, is_best, output_dir):
if is_main_process():
ckpt_path = f'{output_dir}/checkpoint.pt'
best_path = f'{output_dir}/checkpoint_best.pt'
torch.save(state, ckpt_path)
if is_best:
shutil.copyfile(ckpt_path, best_path)
def init_distributed_mode(args):
if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
args.rank = int(os.environ["RANK"])
args.world_size = int(os.environ['WORLD_SIZE'])
args.gpu = int(os.environ['LOCAL_RANK'])
elif 'SLURM_PROCID' in os.environ:
args.rank = int(os.environ['SLURM_PROCID'])
args.gpu = args.rank % torch.cuda.device_count()
else:
print('Not using distributed mode')
args.distributed = False
return
args.distributed = True
torch.cuda.set_device(args.gpu)
args.dist_backend = 'nccl'
print('| distributed init (rank {}): {}'.format(
args.rank, args.dist_url), flush=True)
torch.distributed.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
world_size=args.world_size, rank=args.rank)
torch.distributed.barrier()
setup_for_distributed(args.rank == 0)
def scaled_all_reduce(tensors, is_scale=True):
"""Performs the scaled all_reduce operation on the provided tensors.
The input tensors are modified in-place. Currently supports only the sum
reduction operator. The reduced values are scaled by the inverse size of the
world size.
"""
world_size = get_world_size()
# There is no need for reduction in the single-proc case
if world_size == 1:
return tensors
# Queue the reductions
reductions = []
for tensor in tensors:
reduction = dist.all_reduce(tensor, async_op=True)
reductions.append(reduction)
# Wait for reductions to finish
for reduction in reductions:
reduction.wait()
# Scale the results
if is_scale:
for tensor in tensors:
tensor.mul_(1.0 / world_size)
return tensors
def all_gather_batch(tensors):
"""
Performs all_gather operation on the provided tensors.
"""
# Queue the gathered tensors
world_size = get_world_size()
# There is no need for reduction in the single-proc case
if world_size == 1:
return tensors
tensor_list = []
output_tensor = []
for tensor in tensors:
tensor_all = [torch.ones_like(tensor) for _ in range(world_size)]
dist.all_gather(
tensor_all,
tensor,
async_op=False # performance opt
)
tensor_list.append(tensor_all)
for tensor_all in tensor_list:
output_tensor.append(torch.cat(tensor_all, dim=0))
return output_tensor
class GatherLayer(autograd.Function):
"""
Gather tensors from all workers with support for backward propagation:
This implementation does not cut the gradients as torch.distributed.all_gather does.
"""
@staticmethod
def forward(ctx, x):
output = [torch.zeros_like(x) for _ in range(dist.get_world_size())]
dist.all_gather(output, x)
return tuple(output)
@staticmethod
def backward(ctx, *grads):
all_gradients = torch.stack(grads)
dist.all_reduce(all_gradients)
return all_gradients[dist.get_rank()]
def all_gather_batch_with_grad(tensors):
"""
Performs all_gather operation on the provided tensors.
Graph remains connected for backward grad computation.
"""
# Queue the gathered tensors
world_size = get_world_size()
# There is no need for reduction in the single-proc case
if world_size == 1:
return tensors
tensor_list = []
output_tensor = []
for tensor in tensors:
tensor_all = GatherLayer.apply(tensor)
tensor_list.append(tensor_all)
for tensor_all in tensor_list:
output_tensor.append(torch.cat(tensor_all, dim=0))
return output_tensor
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self, name, fmt=':f'):
self.name = name
self.fmt = fmt
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def cat(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def synchronize(self):
if not is_dist_avail_and_initialized():
return
t = torch.tensor([self.sum, self.count], dtype=torch.float64, device='cuda')
dist.barrier()
dist.all_reduce(t)
t = t.tolist()
if math.isnan(t[0]):
# import pdb; pdb.set_trace()
self.sum = 1e9
else:
self.sum = int(t[0])
self.count = t[1]
self.avg = self.sum / self.count
def __str__(self):
# import pdb; pdb.set_trace()
fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
return fmtstr.format(**self.__dict__)
class ProgressMeter(object):
def __init__(self, num_batches, meters, prefix=""):
self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
self.meters = meters
self.prefix = prefix
def display(self, batch):
entries = [self.prefix + self.batch_fmtstr.format(batch)]
entries += [str(meter) for meter in self.meters]
print('\t'.join(entries))
return entries
def synchronize(self):
for meter in self.meters:
meter.synchronize()
def _get_batch_fmtstr(self, num_batches):
num_digits = len(str(num_batches // 1))
fmt = '{:' + str(num_digits) + 'd}'
return '[' + fmt + '/' + fmt.format(num_batches) + ']'
def accuracy(output, target, topk=(1,)):
"""Computes the accuracy over the k top predictions for the specified values of k"""
with torch.no_grad():
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.reshape(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
res.append(correct_k.mul_(100.0 / batch_size))
return res
# Calculate the Metrics for event classification
def d_prime(auc):
standard_normal = stats.norm()
d_prime = standard_normal.ppf(auc) * np.sqrt(2.0)
return d_prime
def calculate_stats(output, target):
"""Calculate statistics including mAP, AUC, etc.
Args:
output: 2d array, (samples_num, classes_num)
target: 2d array, (samples_num, classes_num)
Returns:
stats: list of statistic of each class.
"""
classes_num = target.shape[-1]
stats = []
output = output.cpu()
target = target.cpu()
# Accuracy, only used for single-label classification such as esc-50, not for multiple label one such as AudioSet
acc = metrics.accuracy_score(np.argmax(target, 1), np.argmax(output, 1))
# Class-wise statistics
for k in range(classes_num):
# Average precision
avg_precision = metrics.average_precision_score(
target[:, k], output[:, k], average=None)
# AUC
try:
auc = metrics.roc_auc_score(target[:, k], output[:, k], average=None)
# Precisions, recalls
(precisions, recalls, thresholds) = metrics.precision_recall_curve(
target[:, k], output[:, k])
# FPR, TPR
(fpr, tpr, thresholds) = metrics.roc_curve(target[:, k], output[:, k])
save_every_steps = 1000 # Sample statistics to reduce size
dict = {'precisions': precisions[0::save_every_steps],
'recalls': recalls[0::save_every_steps],
'AP': avg_precision,
'fpr': fpr[0::save_every_steps],
'fnr': 1. - tpr[0::save_every_steps],
'auc': auc,
# note acc is not class-wise, this is just to keep consistent with other metrics
'acc': acc
}
except:
dict = {'precisions': -1,
'recalls': -1,
'AP': avg_precision,
'fpr': -1,
'fnr': -1,
'auc': -1,
# note acc is not class-wise, this is just to keep consistent with other metrics
'acc': acc
}
print('class {:s} no true sample'.format(str(k)))
stats.append(dict)
return stats
### for retrieval task
def get_similarity(a, b):
cos_sim = dot(a, b) / (norm(a) * norm(b))
return cos_sim
# get mean
def get_sim_mat(a, b):
B = a.shape[0]
sim_mat = np.empty([B, B])
for i in range(B):
for j in range(B):
sim_mat[i, j] = get_similarity(a[i, :], b[j, :])
return sim_mat
def compute_metrics(x):
sx = np.sort(-x, axis=1)
d = np.diag(-x)
d = d[:, np.newaxis]
ind = sx - d
ind = np.where(ind == 0)
ind = ind[1]
metrics = {}
metrics['R1'] = float(np.sum(ind == 0)) / len(ind)
metrics['R5'] = float(np.sum(ind < 5)) / len(ind)
metrics['R10'] = float(np.sum(ind < 10)) / len(ind)
metrics['MR'] = np.median(ind) + 1
return metrics