|
|
import numpy as np |
|
|
from numpy import dot |
|
|
from numpy.linalg import norm |
|
|
import os |
|
|
from scipy import stats |
|
|
import shutil |
|
|
from sklearn import metrics |
|
|
import torch |
|
|
from torch import Tensor |
|
|
import torch.distributed as dist |
|
|
import torch.autograd as autograd |
|
|
from typing import TypeVar, Optional, Iterator, Sequence |
|
|
from torch.utils.data import Dataset, Sampler, DistributedSampler |
|
|
import math |
|
|
|
|
|
# Covariant type variable for sampler element types (same convention as torch.utils.data).
T_co = TypeVar('T_co', covariant=True)
|
|
|
|
|
class WeightedRandomSampler(Sampler[int]):
    r"""Draws ``num_samples`` indices from ``[0, ..., len(weights) - 1]``, each
    index being picked with probability proportional to its weight.

    Args:
        weights (sequence): relative sampling weights; they need not sum to one.
        num_samples (int): how many indices to draw.
        replacement (bool): if ``True``, indices are drawn with replacement;
            otherwise an index can be drawn at most once.
        generator (Generator): optional RNG used for the draw.

    Example:
        >>> list(WeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True))
        [4, 4, 1, 4, 5]
        >>> list(WeightedRandomSampler([0.9, 0.4, 0.05, 0.2, 0.3, 0.1], 5, replacement=False))
        [0, 1, 4, 3, 2]
    """
    weights: Tensor
    num_samples: int
    replacement: bool

    def __init__(self, weights: Sequence[float], num_samples: int,
                 replacement: bool = True, generator=None) -> None:
        # Reject bools explicitly: isinstance(True, int) is True in Python.
        count_ok = isinstance(num_samples, int) and not isinstance(num_samples, bool) \
            and num_samples > 0
        if not count_ok:
            raise ValueError("num_samples should be a positive integer "
                             "value, but got num_samples={}".format(num_samples))
        if not isinstance(replacement, bool):
            raise ValueError("replacement should be a boolean value, but got "
                             "replacement={}".format(replacement))
        self.weights = torch.as_tensor(weights, dtype=torch.double)
        self.num_samples = num_samples
        self.replacement = replacement
        self.generator = generator

    def __iter__(self) -> Iterator[int]:
        # One multinomial draw produces the whole epoch's index sequence.
        drawn = torch.multinomial(self.weights, self.num_samples,
                                  self.replacement, generator=self.generator)
        yield from drawn.tolist()

    def __len__(self) -> int:
        return self.num_samples
|
|
|
|
|
class DistributedSamplerWrapper(DistributedSampler): |
|
|
def __init__( |
|
|
self, sampler, dataset, |
|
|
num_replicas=None, |
|
|
rank=None, |
|
|
shuffle: bool = True): |
|
|
super(DistributedSamplerWrapper, self).__init__( |
|
|
dataset, num_replicas, rank, shuffle) |
|
|
|
|
|
self.sampler = sampler |
|
|
|
|
|
def __iter__(self): |
|
|
if self.sampler.generator is None: |
|
|
self.sampler.generator = torch.Generator() |
|
|
self.sampler.generator.manual_seed(self.seed + self.epoch) |
|
|
indices = list(self.sampler) |
|
|
if self.epoch == 0: |
|
|
print(f"\n DistributedSamplerWrapper : {indices[:10]} \n\n") |
|
|
indices = indices[self.rank:self.total_size:self.num_replicas] |
|
|
return iter(indices) |
|
|
|
|
|
|
|
|
class DistributedWeightedSampler(Sampler):
    """Distributed sampler with per-sample weighting.

    Combines DistributedSampler-style sharding with weighted sampling: the
    dataset indices are (optionally) shuffled deterministically per epoch,
    padded or truncated to be evenly divisible across replicas, each replica
    keeps its own strided shard, and then ``num_samples`` indices are drawn
    from that shard via ``torch.multinomial`` using ``weights``.
    """

    weights: Tensor        # per-dataset-sample weights (dtype double)
    num_samples: int       # samples drawn by this replica per epoch
    replacement: bool      # whether the multinomial draw uses replacement

    def __init__(self, dataset: Dataset, weights: Sequence[float], num_replicas: Optional[int] = None,
                 rank: Optional[int] = None, replacement: bool = True, shuffle: bool = True,
                 seed: int = 0, drop_last: bool = False) -> None:
        """
        Args:
            dataset: dataset to sample from (only ``len()`` is used).
            weights: sampling weight per dataset element; any sequence,
                numpy array or tensor (need not sum to one).
            num_replicas: number of processes; defaults to the world size.
            rank: this process's rank; defaults to the global rank.
            replacement: draw with replacement if ``True``.
            shuffle: shuffle indices (seeded by ``seed + epoch``) before sharding.
            seed: base random seed, must match across replicas.
            drop_last: drop the tail so shards are exactly even.

        Raises:
            RuntimeError: if defaults are requested but torch.distributed is unavailable.
            ValueError: if ``rank`` is outside ``[0, num_replicas - 1]``.
        """
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        if rank >= num_replicas or rank < 0:
            raise ValueError(
                "Invalid rank {}, rank should be in the interval"
                " [0, {}]".format(rank, num_replicas - 1))
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.drop_last = drop_last
        if self.drop_last and len(self.dataset) % self.num_replicas != 0:
            # Split to the nearest length that evenly divides, dropping the tail.
            self.num_samples = math.ceil(
                (len(self.dataset) - self.num_replicas) / self.num_replicas
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)
        self.total_size = self.num_samples * self.num_replicas
        self.replacement = replacement
        # as_tensor accepts any sequence (list, numpy array, tensor), matching
        # the Sequence[float] annotation and the WeightedRandomSampler
        # convention; torch.from_numpy only accepted numpy ndarrays.
        self.weights = torch.as_tensor(weights, dtype=torch.double)
        self.shuffle = shuffle
        self.seed = seed

    def __iter__(self) -> Iterator[int]:
        """Yield ``num_samples`` dataset indices drawn from this replica's shard."""
        if self.shuffle:
            # Deterministic shuffle, identical on every replica for a given epoch.
            g = torch.Generator()
            g.manual_seed(self.seed + self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(len(self.dataset)))

        if not self.drop_last:
            # Pad by repetition until len(indices) == total_size.
            padding_size = self.total_size - len(indices)
            if padding_size <= len(indices):
                indices += indices[:padding_size]
            else:
                indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
        else:
            # Truncate so every replica gets an equal share.
            indices = indices[:self.total_size]
        assert len(indices) == self.total_size

        # This replica's shard: every num_replicas-th index starting at rank.
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        # Weighted draw restricted to the shard's own weights.
        weights = self.weights[indices]
        subsample_rand_tensor = torch.multinomial(weights, self.num_samples, self.replacement)
        dataset_indices = torch.tensor(indices)[subsample_rand_tensor]
        return iter(dataset_indices.tolist())

    def __len__(self) -> int:
        return self.num_samples

    def set_epoch(self, epoch) -> None:
        """Set the epoch; changes the shuffle ordering when ``shuffle=True``."""
        self.epoch = epoch
|
|
|
|
|
def off_diagonal(x):
    """Return a flattened tensor of all off-diagonal elements of square matrix x."""
    size, other = x.shape
    assert size == other
    # Dropping the last element and reshaping to (n-1, n+1) puts every diagonal
    # element in column 0, so the remaining columns are exactly the off-diagonal.
    shifted = x.flatten()[:-1].view(size - 1, size + 1)
    return shifted[:, 1:].flatten()
|
|
|
|
|
def is_dist_avail_and_initialized():
    """True iff torch.distributed is both compiled in and initialized."""
    return dist.is_available() and dist.is_initialized()
|
|
|
|
|
def get_world_size():
    """Number of distributed processes, or 1 when not running distributed."""
    return dist.get_world_size() if is_dist_avail_and_initialized() else 1
|
|
|
|
|
|
|
|
def get_rank():
    """Global rank of this process, or 0 when not running distributed."""
    return dist.get_rank() if is_dist_avail_and_initialized() else 0
|
|
|
|
|
|
|
|
def is_main_process():
    """True iff this process has global rank zero."""
    rank = get_rank()
    return rank == 0
|
|
|
|
|
|
|
|
def save_on_master(state, is_best, output_dir):
    """Save a checkpoint from the main process only.

    Writes ``state`` to ``<output_dir>/checkpoint.pt`` and, when ``is_best``,
    copies it to ``<output_dir>/checkpoint_best.pt``. No-op on other ranks.
    """
    if not is_main_process():
        return
    ckpt_path = f'{output_dir}/checkpoint.pt'
    torch.save(state, ckpt_path)
    if is_best:
        shutil.copyfile(ckpt_path, f'{output_dir}/checkpoint_best.pt')
|
|
|
|
|
def init_distributed_mode(args):
    """Initialize torch.distributed from environment variables.

    Detects torchrun-style (RANK/WORLD_SIZE/LOCAL_RANK) or SLURM
    (SLURM_PROCID) launch environments and mutates ``args`` in place,
    setting ``rank``, ``gpu``, ``world_size`` (torchrun path only),
    ``distributed``, and ``dist_backend``. Expects ``args.dist_url`` to be
    set by the caller. Falls back to non-distributed mode when neither
    environment is present.
    """
    if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
        # torchrun / torch.distributed.launch environment.
        args.rank = int(os.environ["RANK"])
        args.world_size = int(os.environ['WORLD_SIZE'])
        args.gpu = int(os.environ['LOCAL_RANK'])
    elif 'SLURM_PROCID' in os.environ:
        # SLURM environment: derive the local GPU from the global proc id.
        # NOTE(review): this branch never sets args.world_size — it must
        # already exist on args or init_process_group below will fail; confirm
        # against the caller.
        args.rank = int(os.environ['SLURM_PROCID'])
        args.gpu = args.rank % torch.cuda.device_count()
    else:
        print('Not using distributed mode')
        args.distributed = False
        return

    args.distributed = True

    # Bind this process to its GPU before creating the NCCL process group.
    torch.cuda.set_device(args.gpu)
    args.dist_backend = 'nccl'
    print('| distributed init (rank {}): {}'.format(
        args.rank, args.dist_url), flush=True)
    torch.distributed.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                         world_size=args.world_size, rank=args.rank)
    torch.distributed.barrier()
    # Presumably silences printing on non-main ranks; setup_for_distributed is
    # not defined in this file — assumed to exist elsewhere in the project.
    setup_for_distributed(args.rank == 0)
|
|
|
|
|
|
|
|
def scaled_all_reduce(tensors, is_scale=True):
    """Sum-reduce the given tensors across all processes, in place.

    Launches the all_reduce ops asynchronously, waits for them all, and when
    ``is_scale`` is True divides each tensor by the world size so the result
    is a mean rather than a sum. Returns the (mutated) input list.
    """
    world_size = get_world_size()

    # Nothing to reduce in single-process mode.
    if world_size == 1:
        return tensors

    handles = [dist.all_reduce(tensor, async_op=True) for tensor in tensors]
    for handle in handles:
        handle.wait()

    if is_scale:
        inv = 1.0 / world_size
        for tensor in tensors:
            tensor.mul_(inv)
    return tensors
|
|
|
|
|
|
|
|
def all_gather_batch(tensors):
    """
    All-gather each tensor across workers and concatenate along dim 0.
    Gradients do NOT flow through dist.all_gather; see
    all_gather_batch_with_grad for a differentiable variant.
    """
    world_size = get_world_size()

    # Single process: gathering is a no-op.
    if world_size == 1:
        return tensors

    gathered = []
    for tensor in tensors:
        buckets = [torch.ones_like(tensor) for _ in range(world_size)]
        dist.all_gather(buckets, tensor, async_op=False)
        gathered.append(buckets)

    return [torch.cat(buckets, dim=0) for buckets in gathered]
|
|
|
|
|
|
|
|
class GatherLayer(autograd.Function):
    """
    autograd.Function that gathers tensors from all workers while keeping the
    graph connected: unlike torch.distributed.all_gather, gradients are
    all-reduced in backward and routed back to each rank's own slice.
    """

    @staticmethod
    def forward(ctx, x):
        world_size = dist.get_world_size()
        gathered = [torch.zeros_like(x) for _ in range(world_size)]
        dist.all_gather(gathered, x)
        return tuple(gathered)

    @staticmethod
    def backward(ctx, *grads):
        # Sum gradients from all ranks, then keep the slice that corresponds
        # to this rank's forward input.
        stacked = torch.stack(grads)
        dist.all_reduce(stacked)
        return stacked[dist.get_rank()]
|
|
|
|
|
|
|
|
def all_gather_batch_with_grad(tensors):
    """
    All-gather each tensor across workers and concatenate along dim 0,
    keeping the autograd graph connected so gradients flow back through
    the gather (via GatherLayer).
    """
    world_size = get_world_size()

    # Single process: nothing to gather, graph untouched.
    if world_size == 1:
        return tensors

    per_tensor_parts = [GatherLayer.apply(tensor) for tensor in tensors]
    return [torch.cat(parts, dim=0) for parts in per_tensor_parts]
|
|
|
|
|
|
|
|
class AverageMeter(object):
    """Tracks the most recent value and running average of a scalar metric."""

    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        """Zero out all statistics."""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        """Record ``val`` observed ``n`` times."""
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def cat(self, val, n=1):
        # Behaves exactly like update(); kept for backward compatibility.
        self.update(val, n)

    def synchronize(self):
        """All-reduce sum/count across processes; no-op when not distributed."""
        if not is_dist_avail_and_initialized():
            return
        totals = torch.tensor([self.sum, self.count], dtype=torch.float64, device='cuda')
        dist.barrier()
        dist.all_reduce(totals)
        total_sum, total_count = totals.tolist()
        if math.isnan(total_sum):
            # Sentinel for a NaN-poisoned sum after reduction.
            self.sum = 1e9
        else:
            self.sum = int(total_sum)
        self.count = total_count
        self.avg = self.sum / self.count

    def __str__(self):
        template = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return template.format(**self.__dict__)
|
|
|
|
|
|
|
|
class ProgressMeter(object):
    """Formats and prints a per-batch progress line from a list of meters."""

    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        """Print (and return) the progress line for ``batch``."""
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        for meter in self.meters:
            entries.append(str(meter))
        print('\t'.join(entries))
        return entries

    def synchronize(self):
        """Synchronize every meter across processes."""
        for meter in self.meters:
            meter.synchronize()

    def _get_batch_fmtstr(self, num_batches):
        # Width of the counter field matches the total batch count.
        width = len(str(num_batches // 1))
        spec = '{:' + str(width) + 'd}'
        return '[' + spec + '/' + spec.format(num_batches) + ']'
|
|
|
|
|
def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for each k in ``topk``.

    Returns a list of 1-element tensors, one per k, each holding the
    percentage of samples whose true class is in the top-k predictions.
    """
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        # (batch, maxk) class indices, transposed to (maxk, batch) so that
        # row i holds every sample's i-th best prediction.
        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.reshape(1, -1).expand_as(pred))

        results = []
        for k in topk:
            hits = correct[:k].reshape(-1).float().sum(0, keepdim=True)
            results.append(hits.mul_(100.0 / batch_size))
        return results
|
|
|
|
|
|
|
|
|
|
|
def d_prime(auc):
    """Map an ROC-AUC value to the d' (d-prime) sensitivity index."""
    gaussian = stats.norm()
    return gaussian.ppf(auc) * np.sqrt(2.0)
|
|
|
|
|
def calculate_stats(output, target):
    """Calculate per-class statistics including mAP, AUC, etc.

    Args:
        output: 2d tensor, (samples_num, classes_num) of predicted scores
        target: 2d tensor, (samples_num, classes_num) of binary labels

    Returns:
        stats: list of statistic dicts, one per class, each holding
        subsampled precision/recall and ROC curve points, AP, AUC, and the
        global argmax accuracy. Curve fields are -1 for classes where the
        AUC/curves are undefined (e.g. no positive samples).
    """

    classes_num = target.shape[-1]
    stats = []

    output = output.cpu()
    target = target.cpu()

    # Global top-1 accuracy over argmax predictions; repeated in every
    # per-class dict for convenience.
    acc = metrics.accuracy_score(np.argmax(target, 1), np.argmax(output, 1))

    for k in range(classes_num):

        avg_precision = metrics.average_precision_score(
            target[:, k], output[:, k], average=None)

        # roc_auc_score and the curve helpers raise when a class has only one
        # label value; fall back to -1 placeholders in that case.
        # (Was a bare `except:`; narrowed to Exception so KeyboardInterrupt
        # and SystemExit still propagate.)
        try:
            auc = metrics.roc_auc_score(target[:, k], output[:, k], average=None)

            (precisions, recalls, thresholds) = metrics.precision_recall_curve(
                target[:, k], output[:, k])

            (fpr, tpr, thresholds) = metrics.roc_curve(target[:, k], output[:, k])

            # Subsample curve points to keep the stats payload small.
            save_every_steps = 1000
            stat_dict = {'precisions': precisions[0::save_every_steps],
                         'recalls': recalls[0::save_every_steps],
                         'AP': avg_precision,
                         'fpr': fpr[0::save_every_steps],
                         'fnr': 1. - tpr[0::save_every_steps],
                         'auc': auc,
                         'acc': acc
                         }
        except Exception:
            stat_dict = {'precisions': -1,
                         'recalls': -1,
                         'AP': avg_precision,
                         'fpr': -1,
                         'fnr': -1,
                         'auc': -1,
                         'acc': acc
                         }
            print('class {:s} no true sample'.format(str(k)))
        stats.append(stat_dict)

    return stats
|
|
|
|
|
|
|
|
|
|
|
def get_similarity(a, b):
    """Cosine similarity between two 1-D vectors."""
    denominator = norm(a) * norm(b)
    return dot(a, b) / denominator
|
|
|
|
|
|
|
|
def get_sim_mat(a, b):
    """Pairwise cosine-similarity matrix between rows of ``a`` and rows of ``b``.

    Assumes a and b have the same number of rows; returns a (B, B) array with
    entry (i, j) = cosine similarity of a[i] and b[j].
    """
    num_rows = a.shape[0]
    sim_mat = np.empty([num_rows, num_rows])
    for row in range(num_rows):
        for col in range(num_rows):
            sim_mat[row, col] = get_similarity(a[row, :], b[col, :])
    return sim_mat
|
|
|
|
|
def compute_metrics(x):
    """Retrieval metrics (R@1/5/10, median rank) from a similarity matrix.

    The diagonal of ``x`` holds each query's matching-pair similarity; the
    rank of the diagonal entry within its (descending-sorted) row is the
    retrieval rank of the correct item.
    """
    # Sorting the negated scores ascending == sorting scores descending.
    sorted_neg = np.sort(-x, axis=1)
    diag_neg = np.diag(-x)[:, np.newaxis]
    # Column positions where the sorted row equals the diagonal score.
    ranks = np.where(sorted_neg - diag_neg == 0)[1]
    results = {}
    results['R1'] = float(np.sum(ranks == 0)) / len(ranks)
    results['R5'] = float(np.sum(ranks < 5)) / len(ranks)
    results['R10'] = float(np.sum(ranks < 10)) / len(ranks)
    results['MR'] = np.median(ranks) + 1
    return results