# NOTE(review): "Spaces:" / "Runtime error" below appear to be paste/export
# artifacts, not source code; preserved as comments so the file parses.
# Spaces:
# Runtime error
# Runtime error
| import os | |
| import torch | |
| from torch.utils.data import DataLoader | |
| from torchvision import datasets, transforms | |
| from torchvision.transforms.functional import rotate | |
| import config as c | |
| import sklearn.metrics as sk | |
| import numpy as np | |
| from copy import deepcopy | |
def stable_cumsum(arr, rtol=1e-05, atol=1e-08):
    """Cumulatively sum *arr* in float64 and verify the result is stable.

    Parameters
    ----------
    arr : array-like
        Values to be cumulatively summed as flat.
    rtol : float
        Relative tolerance, see ``np.allclose``.
    atol : float
        Absolute tolerance, see ``np.allclose``.

    Raises
    ------
    RuntimeError
        If the final cumulative value disagrees with the direct sum.
    """
    cumulative = np.cumsum(arr, dtype=np.float64)
    total = np.sum(arr, dtype=np.float64)
    if np.allclose(cumulative[-1], total, rtol=rtol, atol=atol):
        return cumulative
    raise RuntimeError('cumsum was found to be unstable: '
                       'its last element does not correspond to sum')
def fpr_and_fdr_at_recall(y_true, y_score, recall_level=0.95, pos_label=None):
    """Return (FPR, threshold) at the score threshold reaching ``recall_level``.

    Adapted from scikit-learn's ranking-metric internals. ``y_true`` holds
    binary labels, ``y_score`` the detector scores (higher means more
    positive). Raises ValueError if the labels are not binary and
    ``pos_label`` is not given.
    """
    classes = np.unique(y_true)
    # accept the usual binary label encodings; otherwise require pos_label
    if (pos_label is None and
            not (np.array_equal(classes, [0, 1]) or
                 np.array_equal(classes, [-1, 1]) or
                 np.array_equal(classes, [0]) or
                 np.array_equal(classes, [-1]) or
                 np.array_equal(classes, [1]))):
        raise ValueError("Data is not binary and pos_label is not specified")
    elif pos_label is None:
        pos_label = 1.

    # make y_true a boolean vector
    y_true = (y_true == pos_label)

    # sort scores and corresponding truth values (stable sort, descending)
    desc_score_indices = np.argsort(y_score, kind="mergesort")[::-1]
    y_score = y_score[desc_score_indices]
    #print(y_score)
    y_true = y_true[desc_score_indices]

    # y_score typically has many tied values. Here we extract
    # the indices associated with the distinct values. We also
    # concatenate a value for the end of the curve.
    distinct_value_indices = np.where(np.diff(y_score))[0]
    threshold_idxs = np.r_[distinct_value_indices, y_true.size - 1]

    # accumulate the true positives with decreasing threshold
    tps = stable_cumsum(y_true)[threshold_idxs]
    fps = 1 + threshold_idxs - tps      # add one because of zero-based indexing
    thresholds = y_score[threshold_idxs]

    recall = tps / tps[-1]

    # walk the curve backwards from the first point where full recall is hit
    last_ind = tps.searchsorted(tps[-1])
    sl = slice(last_ind, None, -1)      # [last_ind::-1]
    recall, fps, tps, thresholds = np.r_[recall[sl], 1], np.r_[fps[sl], 0], np.r_[tps[sl], 0], thresholds[sl]
    #print(recall)

    # pick the curve point whose recall is closest to the requested level
    cutoff = np.argmin(np.abs(recall - recall_level))

    # FPR = FP / N at that point (FDR variant left commented out by the author)
    return fps[cutoff] / (np.sum(np.logical_not(y_true))), thresholds[cutoff]  # , fps[cutoff]/(fps[cutoff] + tps[cutoff])
def get_random_transforms():
    """Build the stochastic training transform pipeline from config flags.

    Optionally adds random rotation and color jitter (driven by the
    ``config`` module), then resizes, converts to tensor and normalizes.
    """
    augmentations = []
    if c.transf_rotations:
        augmentations.append(transforms.RandomRotation(180))
    if any(v > 0.0 for v in (c.transf_brightness, c.transf_contrast, c.transf_saturation)):
        augmentations.append(transforms.ColorJitter(brightness=c.transf_brightness,
                                                    contrast=c.transf_contrast,
                                                    saturation=c.transf_saturation))
    pipeline = [transforms.Resize(c.img_size)]
    pipeline.extend(augmentations)
    pipeline.append(transforms.ToTensor())
    pipeline.append(transforms.Normalize(c.norm_mean, c.norm_std))
    return transforms.Compose(pipeline)
def get_fixed_transforms(degrees):
    """Transform pipeline rotating by a fixed angle instead of a random one."""
    def _rotate_fixed(img):
        # NOTE(review): positional args match an older torchvision ``rotate``
        # signature (angle, resample, expand, center) — verify against the
        # installed torchvision version.
        return rotate(img, degrees, False, False, None)

    augmentations = [_rotate_fixed]
    if any(v > 0.0 for v in (c.transf_brightness, c.transf_contrast, c.transf_saturation)):
        augmentations.append(transforms.ColorJitter(brightness=c.transf_brightness,
                                                    contrast=c.transf_contrast,
                                                    saturation=c.transf_saturation))
    pipeline = ([transforms.Resize(c.img_size)]
                + augmentations
                + [transforms.ToTensor(),
                   transforms.Normalize(c.norm_mean, c.norm_std)])
    return transforms.Compose(pipeline)
def t2np(tensor):
    '''pytorch tensor -> numpy array (None passes through unchanged)'''
    if tensor is None:
        return None
    return tensor.cpu().data.numpy()
def get_loss(z, jac):
    '''Standard normalizing-flow loss (eq. 4 of the paper; scaling ignored).'''
    squared_norm = torch.sum(z ** 2, dim=(1,))
    per_sample = 0.5 * squared_norm - jac
    return torch.mean(per_sample) / z.shape[1]
| # def get_loss_neg_pos(z, jac, labels): | |
| # '''损失函数:正样本接近高斯分布,负样本远离高斯分布''' | |
| # # 计算流模型的标准生成损失 | |
| # normalizing_loss = torch.mean(0.5 * torch.sum(z ** 2, dim=(1,)) - jac) / z.shape[1] | |
| # # 对正样本(标签为0)希望其潜在特征接近高斯分布 | |
| # positive_loss = normalizing_loss * (labels == 0).float() | |
| # # 对负样本(标签为1)希望其潜在特征远离高斯分布 | |
| # negative_loss = -normalizing_loss * (labels == 1).float() | |
| # # 计算总损失 | |
| # total_loss = torch.mean(positive_loss + negative_loss) | |
| # return total_loss | |
def get_loss_neg_pos(z, jac, labels, target_distribution="gaussian", margin=500):
    """Flow loss pulling positives toward N(10, I) and negatives toward N(0, I).

    ``labels == 0`` marks positive (normal) samples, ``labels == 1`` negatives.
    ``target_distribution`` and ``margin`` are currently unused.
    """
    pos_mask = (labels == 0).float()
    neg_mask = (labels == 1).float()
    # per-sample energies under the two target centers
    energy_pos = 0.5 * torch.sum((z - 10) ** 2, dim=(1,)) - jac
    energy_neg = 0.5 * torch.sum(z ** 2, dim=(1,)) - jac
    combined = energy_pos * pos_mask + energy_neg * neg_mask
    return torch.mean(combined) / z.shape[1]
def get_loss_neg_pos_margin(z, jac, labels, margin=500):
    """Margin variant: minimize energy of positives (label 0) only while it is
    below ``margin``; maximize energy of negatives (label 1).

    Sign convention: positives contribute ``-(energy) - jac`` (gated by the
    margin), negatives contribute ``energy - jac``.
    """
    energy = 0.5 * torch.sum(z ** 2, dim=(1,))
    is_pos = (labels == 0).float()
    is_neg = (labels == 1).float()
    below_margin = (energy < margin).float()
    pos_term = (-energy - jac) * is_pos * below_margin
    neg_term = (energy - jac) * is_neg
    return torch.mean(neg_term + pos_term) / z.shape[1]
def get_loss_outlier(z, jac, labels, margin=500):
    """Outlier-exposure loss: reward low energy for label-0 samples (while
    under ``margin``), penalize low energy for label-1 samples.

    NOTE(review): this is an exact duplicate of ``get_loss_neg_pos_margin`` —
    consider delegating to it.
    """
    sq_energy = 0.5 * (z ** 2).sum(dim=(1,))
    in_margin = (sq_energy < margin).float()
    masked_pos = (-sq_energy - jac) * (labels == 0).float() * in_margin
    masked_neg = (sq_energy - jac) * (labels == 1).float()
    return (masked_neg + masked_pos).mean() / z.shape[1]
def get_loss_outlier_conv(z1, z2, jac, labels, margin=500):
    """Outlier loss on ``z1`` plus a cosine-consistency term between z1 and z2.

    Returns ``(shape_loss, consistent_loss, total_loss)``; note that
    ``total_loss`` is currently the consistency term alone — the shape loss
    is computed but not added (a combined version is commented out upstream).
    """
    energy = 0.5 * torch.sum(z1 ** 2, dim=(1,))
    pos_part = (-energy - jac) * (labels == 0).float() * (energy < margin).float()
    neg_part = (energy - jac) * (labels == 1).float()
    shape_loss = torch.mean(neg_part + pos_part) / z1.shape[1]

    cos_sim = torch.nn.functional.cosine_similarity(z1, z2)
    consistent_loss = 0
    for idx, lab in enumerate(labels):
        if lab == 0:
            # positives: drive similarity toward 1
            consistent_loss = consistent_loss + (1 - cos_sim[idx])
        elif lab == 1:
            # negatives: penalize (down-weighted) only when similarity > 0.5
            consistent_loss = consistent_loss + cos_sim[idx] * 0.1 * (cos_sim[idx] > 0.5).float()
    consistent_loss = consistent_loss / len(labels)

    total_loss = consistent_loss
    return shape_loss, consistent_loss, total_loss
def get_measures(_pos, _neg, recall_level=0.95):
    """Compute (AUROC, AUPR, FPR@recall) for positive vs. negative scores.

    ``_pos`` / ``_neg`` are 1-D score collections; positives are labelled 1.
    """
    pos_scores = np.array(_pos[:]).reshape((-1, 1))
    neg_scores = np.array(_neg[:]).reshape((-1, 1))
    scores = np.squeeze(np.vstack((pos_scores, neg_scores)))
    truth = np.zeros(len(scores), dtype=np.int32)
    truth[:len(pos_scores)] += 1  # positives come first in the stacked array

    auroc = sk.roc_auc_score(truth, scores)
    aupr = sk.average_precision_score(truth, scores)
    fpr, _threshold = fpr_and_fdr_at_recall(truth, scores, recall_level)
    return auroc, aupr, fpr
def find_best_threshold(y_true, y_pred):
    "We assume first half is real 0, and the second half is fake 1"
    n = y_true.shape[0]
    real_max = y_pred[0:n // 2].max()
    fake_min = y_pred[n // 2:n].min()
    if real_max <= fake_min:
        # perfectly separable: midpoint between the two halves
        return (real_max + fake_min) / 2

    best_acc, best_thres = 0, 0
    for candidate in y_pred:
        binarized = deepcopy(y_pred)
        binarized[binarized >= candidate] = 1
        binarized[binarized < candidate] = 0
        acc = (binarized == y_true).sum() / n
        # ties keep the latest candidate, matching the original >= comparison
        if acc >= best_acc:
            best_acc, best_thres = acc, candidate
    return best_thres
def get_loss_neg(z, jac, labels, margin=500):
    """Standard flow loss restricted to samples with label 0.

    NOTE(review): despite the name, this selects ``labels == 0`` — the
    samples the sibling losses treat as positives; confirm intent.
    ``margin`` is unused.
    """
    per_sample = 0.5 * torch.sum(z ** 2, dim=(1,)) - jac
    selected = per_sample * (labels == 0).float()
    return torch.mean(selected) / z.shape[1]
def get_loss_neg_pos_margin_normal(z, jac, labels, target_distribution="gaussian", margin=500):
    """Mirror of ``get_loss_neg_pos_margin``: pull positives (label 0) toward
    the standard Gaussian and push negatives (label 1) away, gated so the
    repulsion only acts while the negative's energy is below ``margin``.
    ``target_distribution`` is unused.
    """
    energy = 0.5 * torch.sum(z ** 2, dim=(1,))
    attract = energy * (labels == 0).float()
    repel = (-energy) * (labels == 1).float() * (energy < margin).float()
    return torch.mean(repel + attract - jac) / z.shape[1]