| | import os |
| | import numpy as np |
| | import torch |
| | import shutil |
| | import torchvision.transforms as transforms |
| | from torch.autograd import Variable |
| | import sklearn |
| | from sklearn import metrics |
| | from sklearn.metrics import roc_curve, auc |
| | import pdb |
| |
|
| |
|
class AvgrageMeter(object):
    """Accumulate a weighted running mean.

    Exposes three attributes: ``sum`` (weighted total of the values seen),
    ``cnt`` (total weight seen) and ``avg`` (``sum / cnt``).
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Set ``avg``, ``sum`` and ``cnt`` back to zero."""
        self.avg = self.sum = self.cnt = 0

    def update(self, val, n=1):
        """Add ``val`` with weight ``n`` and recompute the mean."""
        contribution = val * n
        self.sum += contribution
        self.cnt += n
        self.avg = self.sum / self.cnt
| |
|
| |
|
def accuracy(output, target, topk=(1,)):
    """Compute top-k accuracy, in percent, for every ``k`` in ``topk``.

    Args:
        output: 2-D tensor of per-class scores; the top-k is taken along dim 1.
        target: 1-D tensor of class indices, one per row of ``output``.
        topk: iterable of ``k`` values to evaluate.

    Returns:
        A list with one 0-dim float tensor per entry of ``topk``, each holding
        the percentage of samples whose target is among the ``k`` best scores.
    """
    maxk = max(topk)
    batch_size = target.size(0)

    # Indices of the maxk largest scores per sample, sorted best-first, then
    # transposed so that row i holds every sample's i-th best guess.
    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        # reshape instead of view: ``correct`` derives from a transposed
        # tensor and may be non-contiguous, in which case view(-1) raises.
        correct_k = correct[:k].reshape(-1).float().sum(0)
        res.append(correct_k.mul_(100.0 / batch_size))
    return res
| |
|
| |
|
def get_threshold(score_file):
    """Find the score threshold that minimises ACER on a score file.

    Every non-blank line of ``score_file`` must hold two whitespace-separated
    tokens: a float score followed by an integer label. Label ``1`` is counted
    as real; any other label is counted as fake. Each score in the file is
    tried as a candidate threshold, and the first one reaching the lowest
    ACER wins.

    For a candidate threshold:
        * a real sample (label 1) with ``score <= threshold`` is an error
          counted towards BPCER;
        * a sample with label 0 and ``score > threshold`` is an error counted
          towards APCER.

    Args:
        score_file: path of the text file to read.

    Returns:
        Tuple ``(threshold, ACC, APCER, BPCER, ACER)`` for the best threshold.
        All five values are ``0.0`` when the file contains no samples.

    Raises:
        ZeroDivisionError: if the file has samples but none is real, or none
            is fake.
    """
    samples = []
    with open(score_file, 'r') as file:
        for line in file:
            tokens = line.split()
            if not tokens:
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            samples.append((float(tokens[0]), int(tokens[1])))

    count = float(len(samples))
    num_real = float(sum(1 for _, label in samples if label == 1))
    num_fake = count - num_real

    min_error = float('inf')
    min_threshold = 0.0
    min_ACC = 0.0
    min_ACER = 0.0
    min_APCER = 0.0
    min_BPCER = 0.0

    for threshold, _ in samples:
        type1 = sum(1 for score, label in samples
                    if score <= threshold and label == 1)
        type2 = sum(1 for score, label in samples
                    if score > threshold and label == 0)

        ACC = 1 - (type1 + type2) / count
        APCER = type2 / num_fake
        BPCER = type1 / num_real
        ACER = (APCER + BPCER) / 2.0

        # Strict comparison: on ties the earliest threshold in file order wins.
        if ACER < min_error:
            min_error = ACER
            min_threshold = threshold
            min_ACC = ACC
            min_ACER = ACER
            min_APCER = APCER
            # Previously a self-assignment, which left the reported BPCER at 0.
            min_BPCER = BPCER

    return min_threshold, min_ACC, min_APCER, min_BPCER, min_ACER
| |
|
| |
|
| | def test_threshold_based(threshold, score_file): |
| | with open(score_file, 'r') as file: |
| | lines = file.readlines() |
| |
|
| | data = [] |
| | count = 0.0 |
| | num_real = 0.0 |
| | num_fake = 0.0 |
| | for line in lines: |
| | count += 1 |
| | tokens = line.split() |
| | angle = float(tokens[0]) |
| | type = int(tokens[1]) |
| | data.append({'map_score': angle, 'label': type}) |
| | if type == 1: |
| | num_real += 1 |
| | else: |
| | num_fake += 1 |
| |
|
| | type1 = len([s for s in data if s['map_score'] <= threshold and s['label'] == 1]) |
| | type2 = len([s for s in data if s['map_score'] > threshold and s['label'] == 0]) |
| |
|
| | ACC = 1 - (type1 + type2) / count |
| | APCER = type2 / num_fake |
| | BPCER = type1 / num_real |
| | ACER = (APCER + BPCER) / 2.0 |
| |
|
| | return ACC, APCER, BPCER, ACER |
| |
|
| |
|
def get_err_threhold(fpr, tpr, threshold):
    """Pick the ROC operating point where ``tpr + fpr`` is closest to 1.

    ``tpr + fpr - 1`` equals ``fpr - (1 - tpr)``, so the chosen index is the
    one where the false-positive rate and the miss rate are nearest to each
    other.

    Args:
        fpr: NumPy array of false-positive rates.
        tpr: NumPy array of true-positive rates, aligned with ``fpr``.
        threshold: sequence of thresholds aligned with ``fpr`` / ``tpr``.

    Returns:
        Tuple ``(err, best_th)``: the false-positive rate at the chosen index
        and the threshold at that index.
    """
    # An earlier argmax(tpr - fpr) selection used to be computed here and was
    # immediately overwritten; only the criterion below ever took effect.
    differ_tpr_fpr_1 = tpr + fpr - 1.0
    right_index = np.argmin(np.abs(differ_tpr_fpr_1))

    best_th = threshold[right_index]
    err = fpr[right_index]
    return err, best_th
| |
|
| |
|
| | |
def performances(map_score_val_filename, map_score_test_filename):
    """Evaluate a validation and a test score file.

    Both files hold one ``<score> <label>`` pair per line (whitespace
    separated, both parsed as floats). Label ``1`` is counted as real, any
    other label as fake. A threshold is derived from the validation ROC curve
    via ``get_err_threhold`` and applied to both splits; a second threshold is
    derived from the test ROC curve and applied to the test split only.

    The test samples misclassified at the validation threshold are printed
    (rejected real samples first, then accepted label-0 samples).

    Returns:
        Tuple ``(val_threshold, best_test_threshold, val_ACC, val_ACER,
        test_ACC, test_APCER, test_BPCER, test_ACER, test_threshold_ACER)``.
        The first four ``test_*`` metrics use the validation threshold;
        ``test_threshold_ACER`` uses the threshold derived from the test set.
    """
    # ---- validation split -------------------------------------------------
    with open(map_score_val_filename, 'r') as val_file:
        val_lines = val_file.readlines()

    val_scores, val_labels, val_entries = [], [], []
    for raw in val_lines:
        fields = raw.split()
        entry = {'map_score': float(fields[0]), 'label': float(fields[1])}
        val_scores.append(entry['map_score'])
        val_labels.append(entry['label'])
        val_entries.append(entry)

    val_total = float(len(val_entries))
    val_real = float(sum(1 for e in val_entries if e['label'] == 1))
    val_fake = val_total - val_real

    fpr, tpr, threshold = roc_curve(val_labels, val_scores, pos_label=1)
    _, val_threshold = get_err_threhold(fpr, tpr, threshold)

    val_rejected_real = sum(
        1 for e in val_entries
        if e['map_score'] <= val_threshold and e['label'] == 1)
    val_accepted_fake = sum(
        1 for e in val_entries
        if e['map_score'] > val_threshold and e['label'] == 0)

    val_ACC = 1 - (val_rejected_real + val_accepted_fake) / val_total
    val_APCER = val_accepted_fake / val_fake
    val_BPCER = val_rejected_real / val_real
    val_ACER = (val_APCER + val_BPCER) / 2.0

    # ---- test split, scored at the validation threshold -------------------
    with open(map_score_test_filename, 'r') as test_file:
        test_lines = test_file.readlines()

    test_scores, test_labels, test_entries = [], [], []
    for raw in test_lines:
        fields = raw.split()
        entry = {'map_score': float(fields[0]), 'label': float(fields[1])}
        test_scores.append(entry['map_score'])
        test_labels.append(entry['label'])
        test_entries.append(entry)

    test_total = float(len(test_entries))
    test_real = float(sum(1 for e in test_entries if e['label'] == 1))
    test_fake = test_total - test_real

    rejected_real = [
        e for e in test_entries
        if e['map_score'] <= val_threshold and e['label'] == 1]
    print(rejected_real)
    accepted_fake = [
        e for e in test_entries
        if e['map_score'] > val_threshold and e['label'] == 0]
    print(accepted_fake)

    test_ACC = 1 - (len(rejected_real) + len(accepted_fake)) / test_total
    test_APCER = len(accepted_fake) / test_fake
    test_BPCER = len(rejected_real) / test_real
    test_ACER = (test_APCER + test_BPCER) / 2.0

    # ---- test split, scored at its own ROC-derived threshold --------------
    fpr_test, tpr_test, threshold_test = roc_curve(
        test_labels, test_scores, pos_label=1)
    _, best_test_threshold = get_err_threhold(
        fpr_test, tpr_test, threshold_test)

    own_rejected_real = sum(
        1 for e in test_entries
        if e['map_score'] <= best_test_threshold and e['label'] == 1)
    own_accepted_fake = sum(
        1 for e in test_entries
        if e['map_score'] > best_test_threshold and e['label'] == 0)

    test_threshold_APCER = own_accepted_fake / test_fake
    test_threshold_BPCER = own_rejected_real / test_real
    test_threshold_ACER = (test_threshold_APCER + test_threshold_BPCER) / 2.0

    return (val_threshold, best_test_threshold, val_ACC, val_ACER,
            test_ACC, test_APCER, test_BPCER, test_ACER, test_threshold_ACER)
| |
|
| |
|
def performances_SiW_EER(map_score_val_filename):
    """Compute threshold-based metrics for a single score file.

    Each line holds ``<score> <label>`` (whitespace separated; score parsed as
    float, label as int). Label ``1`` is counted as real, any other label as
    fake. The threshold comes from ``get_err_threhold`` applied to the ROC
    curve of the file.

    Returns:
        Tuple ``(val_threshold, val_ACC, val_APCER, val_BPCER, val_ACER)``.
    """
    with open(map_score_val_filename, 'r') as handle:
        raw_lines = handle.readlines()

    val_scores, val_labels = [], []
    for raw in raw_lines:
        fields = raw.split()
        val_scores.append(float(fields[0]))
        val_labels.append(int(fields[1]))

    total = float(len(val_labels))
    real_total = float(sum(1 for lab in val_labels if lab == 1))
    fake_total = total - real_total

    fpr, tpr, threshold = roc_curve(val_labels, val_scores, pos_label=1)
    _, val_threshold = get_err_threhold(fpr, tpr, threshold)

    rejected_real = 0
    accepted_fake = 0
    for sc, lab in zip(val_scores, val_labels):
        if sc <= val_threshold and lab == 1:
            rejected_real += 1
        elif sc > val_threshold and lab == 0:
            accepted_fake += 1

    val_ACC = 1 - (rejected_real + accepted_fake) / total
    val_APCER = accepted_fake / fake_total
    val_BPCER = rejected_real / real_total
    val_ACER = (val_APCER + val_BPCER) / 2.0

    return val_threshold, val_ACC, val_APCER, val_BPCER, val_ACER
| |
|
| |
|
def performances_SiWM_EER(map_score_val_filename):
    """Compute threshold-based metrics plus the ROC error for a score file.

    Each line holds ``<score> <label>`` (whitespace separated; score parsed as
    float, label as int). Label ``1`` is counted as real, any other label as
    fake. Both the threshold and the error value come from
    ``get_err_threhold`` applied to the ROC curve of the file.

    Returns:
        Tuple ``(val_threshold, val_err, val_ACC, val_APCER, val_BPCER,
        val_ACER)``.
    """
    with open(map_score_val_filename, 'r') as handle:
        raw_lines = handle.readlines()

    val_scores, val_labels = [], []
    for raw in raw_lines:
        fields = raw.split()
        val_scores.append(float(fields[0]))
        val_labels.append(int(fields[1]))

    total = float(len(val_labels))
    real_total = float(sum(1 for lab in val_labels if lab == 1))
    fake_total = total - real_total

    fpr, tpr, threshold = roc_curve(val_labels, val_scores, pos_label=1)
    val_err, val_threshold = get_err_threhold(fpr, tpr, threshold)

    rejected_real = 0
    accepted_fake = 0
    for sc, lab in zip(val_scores, val_labels):
        if sc <= val_threshold and lab == 1:
            rejected_real += 1
        elif sc > val_threshold and lab == 0:
            accepted_fake += 1

    val_ACC = 1 - (rejected_real + accepted_fake) / total
    val_APCER = accepted_fake / fake_total
    val_BPCER = rejected_real / real_total
    val_ACER = (val_APCER + val_BPCER) / 2.0

    return val_threshold, val_err, val_ACC, val_APCER, val_BPCER, val_ACER
| |
|
| |
|
def get_err_threhold_CASIA_Replay(fpr, tpr, threshold):
    """Pick the ROC operating point where ``tpr + fpr`` is closest to 1.

    ``tpr + fpr - 1`` equals ``fpr - (1 - tpr)``, so the chosen index is the
    one where the false-positive rate and the miss rate are nearest to each
    other.

    Args:
        fpr: NumPy array of false-positive rates.
        tpr: NumPy array of true-positive rates, aligned with ``fpr``.
        threshold: sequence of thresholds aligned with ``fpr`` / ``tpr``.

    Returns:
        Tuple ``(err, best_th, right_index)``: the false-positive rate and the
        threshold at the chosen index, plus the index itself so callers can
        read other per-threshold arrays at the same operating point.
    """
    # An earlier argmax(tpr - fpr) selection used to be computed here and was
    # immediately overwritten; only the criterion below ever took effect.
    differ_tpr_fpr_1 = tpr + fpr - 1.0
    right_index = np.argmin(np.abs(differ_tpr_fpr_1))

    best_th = threshold[right_index]
    err = fpr[right_index]
    return err, best_th, right_index
| |
|
| |
|
def performances_CASIA_Replay(map_score_val_filename):
    """Compute accuracy and ROC-based error rates for a single score file.

    Each line holds ``<score> <label>`` (whitespace separated, both parsed as
    floats). The operating point comes from ``get_err_threhold_CASIA_Replay``
    applied to the ROC curve of the file. The samples misclassified at that
    threshold are printed (label-1 samples at or below it first, then label-0
    samples above it).

    Returns:
        Tuple ``(val_ACC, FAR, FRR, HTER, val_threshold)`` where ``FAR`` is
        ``fpr`` and ``FRR`` is ``1 - tpr`` at the chosen ROC index, and
        ``HTER`` is their mean.
    """
    with open(map_score_val_filename, 'r') as handle:
        raw_lines = handle.readlines()

    val_scores, val_labels, entries = [], [], []
    for raw in raw_lines:
        fields = raw.split()
        entry = {'map_score': float(fields[0]), 'label': float(fields[1])}
        val_scores.append(entry['map_score'])
        val_labels.append(entry['label'])
        entries.append(entry)
    total = float(len(entries))

    fpr, tpr, threshold = roc_curve(val_labels, val_scores, pos_label=1)
    _, val_threshold, right_index = get_err_threhold_CASIA_Replay(
        fpr, tpr, threshold)

    rejected_real = [
        e for e in entries
        if e['map_score'] <= val_threshold and e['label'] == 1]
    print(rejected_real)
    accepted_fake = [
        e for e in entries
        if e['map_score'] > val_threshold and e['label'] == 0]
    print(accepted_fake)

    val_ACC = 1 - (len(rejected_real) + len(accepted_fake)) / total

    # Per-threshold false-rejection rate and half-total error rate.
    FRR = 1 - tpr
    HTER = (fpr + FRR) / 2.0

    return (val_ACC, fpr[right_index], FRR[right_index],
            HTER[right_index], val_threshold)
| |
|
| |
|
def performances_ZeroShot(map_score_val_filename):
    """Compute accuracy, ROC AUC and HTER for a single score file.

    Each line holds ``<score> <label>`` (whitespace separated; score parsed as
    float, label as int). The operating point comes from
    ``get_err_threhold_CASIA_Replay`` applied to the ROC curve of the file.

    Returns:
        Tuple ``(val_ACC, auc_val, HTER)`` where ``auc_val`` is the area under
        the ROC curve and ``HTER`` is the mean of ``fpr`` and ``1 - tpr`` at
        the chosen ROC index.
    """
    with open(map_score_val_filename, 'r') as handle:
        raw_lines = handle.readlines()

    val_scores, val_labels = [], []
    for raw in raw_lines:
        fields = raw.split()
        val_scores.append(float(fields[0]))
        val_labels.append(int(fields[1]))
    total = float(len(val_labels))

    fpr, tpr, threshold = roc_curve(val_labels, val_scores, pos_label=1)
    auc_val = metrics.auc(fpr, tpr)

    _, val_threshold, right_index = get_err_threhold_CASIA_Replay(
        fpr, tpr, threshold)

    rejected_real = 0
    accepted_fake = 0
    for sc, lab in zip(val_scores, val_labels):
        if sc <= val_threshold and lab == 1:
            rejected_real += 1
        elif sc > val_threshold and lab == 0:
            accepted_fake += 1

    val_ACC = 1 - (rejected_real + accepted_fake) / total

    # Per-threshold false-rejection rate and half-total error rate.
    FRR = 1 - tpr
    HTER = (fpr + FRR) / 2.0

    return val_ACC, auc_val, HTER[right_index]
| |
|
| |
|
def count_parameters_in_MB(model):
    """Return the number of parameters of ``model``, in millions.

    Parameters whose name contains ``"auxiliary"`` are left out of the count.

    Args:
        model: object exposing ``named_parameters()`` that yields
            ``(name, tensor)`` pairs.

    Returns:
        The total element count of the remaining parameters divided by 1e6.
    """
    # Builtin sum: passing a generator to np.sum is deprecated in NumPy.
    total = sum(
        np.prod(v.size())
        for name, v in model.named_parameters()
        if "auxiliary" not in name
    )
    return total / 1e6
| |
|
| |
|
def save_checkpoint(state, is_best, save):
    """Write ``state`` to ``<save>/checkpoint.pth.tar`` with ``torch.save``.

    When ``is_best`` is truthy the freshly written file is also copied to
    ``<save>/model_best.pth.tar``.
    """
    checkpoint_path = os.path.join(save, 'checkpoint.pth.tar')
    torch.save(state, checkpoint_path)
    if not is_best:
        return
    shutil.copyfile(checkpoint_path, os.path.join(save, 'model_best.pth.tar'))
| |
|
| |
|
def save(model, model_path):
    """Write ``model.state_dict()`` to ``model_path`` with ``torch.save``."""
    weights = model.state_dict()
    torch.save(weights, model_path)
| |
|
| |
|
def load(model, model_path):
    """Read a state dict from ``model_path`` and load it into ``model``."""
    state = torch.load(model_path)
    model.load_state_dict(state)
| |
|
| |
|
def drop_path(x, drop_prob):
    """Randomly zero whole samples of ``x`` in place and rescale the rest.

    For ``drop_prob > 0`` a Bernoulli keep-mask with one entry per index of
    dim 0 is drawn with keep probability ``1 - drop_prob``. ``x`` is divided
    by the keep probability and multiplied by the mask, both in place. For
    ``drop_prob <= 0`` the tensor is returned untouched.

    Args:
        x: floating-point tensor, modified in place.
        drop_prob: probability of zeroing each sample.

    Returns:
        The same tensor object ``x``.
    """
    if drop_prob > 0.:
        keep_prob = 1. - drop_prob
        # One mask value per sample, broadcast over every remaining dim.
        # Allocated on x's device so CPU tensors work too (the mask used to be
        # a hard-coded 5-D torch.cuda.FloatTensor).
        mask_shape = (x.size(0),) + (1,) * (x.dim() - 1)
        mask = torch.empty(
            mask_shape, dtype=torch.float32, device=x.device
        ).bernoulli_(keep_prob)
        x.div_(keep_prob)
        x.mul_(mask)
    return x
| |
|
| |
|
def create_exp_dir(path, scripts_to_save=None):
    """Create an experiment directory and optionally snapshot scripts into it.

    Args:
        path: directory to create; missing parent directories are created too.
            An existing path is left as it is.
        scripts_to_save: optional iterable of file paths. When given, each
            file is copied into ``<path>/scripts`` under its base name.

    Prints ``Experiment dir : <path>`` once the directory step is done.
    """
    if not os.path.exists(path):
        # makedirs rather than mkdir so nested experiment paths work.
        os.makedirs(path)
    print('Experiment dir : {}'.format(path))

    if scripts_to_save is not None:
        scripts_dir = os.path.join(path, 'scripts')
        # exist_ok: calling again on the same experiment dir must not fail.
        os.makedirs(scripts_dir, exist_ok=True)
        for script in scripts_to_save:
            dst_file = os.path.join(scripts_dir, os.path.basename(script))
            shutil.copyfile(script, dst_file)
| |
|