"""
=========================================================================================
Trojan VQA
Written by Indranil Sur

Weight sensitivity analysis on last layers of TrojVQA clean and trojan models.
=========================================================================================
"""
import argparse
import copy
import errno
import json
import os
from pathlib import Path

import numpy as np
import pandas as pd
import torch
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import auc, roc_curve
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import SVC
from xgboost import XGBClassifier
# Base estimators used by the stacking ensemble below.
e1 = [
    ('rf', RandomForestClassifier(n_estimators=10, random_state=42)),
    ('svr', SVC(kernel='linear', probability=True)),
]

# Shallow classifiers evaluated on the weight features: (name, estimator) pairs.
# Each estimator is deep-copied before fitting, so these act as templates.
clfs = [
    ('XGB', XGBClassifier(eval_metric='mlogloss', use_label_encoder=False)),
    ('XGB_2', XGBClassifier(max_depth=2, gamma=2, eta=0.8, reg_alpha=0.5,
                            reg_lambda=0.5, eval_metric='mlogloss',
                            use_label_encoder=False)),
    ('LR', LogisticRegression(random_state=0, class_weight='balanced', C=1)),
    ('RF', RandomForestClassifier(random_state=0)),
    ('RF_10', RandomForestClassifier(n_estimators=10, random_state=42)),
    ('SVC_l', SVC(kernel='linear', probability=True)),
    ('SVC_r', SVC(kernel='rbf', probability=True)),
    # ('SVC_p', SVC(kernel='poly', probability=True)),
    ('RF+SVC', StackingClassifier(estimators=e1,
                                  final_estimator=LogisticRegression())),
]

# All TrojVQA model architectures present in the dataset metadata.
model_archs = ['butd_eff', 'mfb', 'mfh', 'mcan_small', 'mcan_large',
               'mmnasnet_small', 'mmnasnet_large', 'ban_4', 'ban_8', 'butd']
def cross_entropy(prob, labels):
    """Return the mean binary cross-entropy between probabilities and labels.

    prob: probabilities from the model (numpy: Nx1)
    labels: ground-truth labels (numpy: Nx1)
    Returns a Python float.
    """
    p = torch.Tensor(prob).squeeze()
    y = torch.Tensor(labels).squeeze()
    # Shapes must agree after squeezing away singleton dimensions.
    assert p.shape == y.shape, \
        "Check size of labels and probabilities in computing cross-entropy"
    per_sample = torch.nn.functional.binary_cross_entropy(p, y, reduction='none')
    return per_sample.mean().item()
def get_feature(metadata, root):
    """Load one saved feature array per model listed in the metadata.

    metadata: DataFrame with a `model_name` column naming each model.
    root: directory containing one `<model_name>.npy` file per model.
    Returns a list of numpy arrays, in metadata row order.
    """
    return [
        np.load('{}/{}.npy'.format(root, name))
        for name in metadata.model_name.to_list()
    ]
def get_measures(features_train, labels_train, features_test, labels_test, ret_ce=True, n_splits=5):
    """Fit every classifier in `clfs` and report detection metrics.

    features_train / features_test: feature matrices (numpy: N x D).
    labels_train / labels_test: binary labels (numpy: N,).
    ret_ce: if True, also report binary cross-entropy of the predicted
        probabilities alongside ROC-AUC.
    n_splits: number of stratified CV folds computed on the training set;
        pass None to skip cross-validation entirely.

    Returns a dict: classifier name -> {'auc', ['ce'], ['cv_auc_mean',
    'cv_auc_std', ...]}.
    """
    ret = {}
    for name, _clf in clfs:
        # Deep-copy the template so fitted state never leaks between calls.
        clf = copy.deepcopy(_clf)
        clf = clf.fit(features_train, labels_train)
        pred_test = clf.predict_proba(features_test)
        fpr, tpr, _ = roc_curve(labels_test, pred_test[:, 1])
        ret[name] = {'auc': auc(fpr, tpr)}
        if ret_ce:
            ret[name]['ce'] = cross_entropy(pred_test[:, 1], labels_test)

        if n_splits is not None:
            # BUG FIX: n_splits was previously hard-coded to 5 here,
            # silently ignoring the function parameter.
            kfold = StratifiedKFold(n_splits=n_splits, shuffle=False)
            cv_rocs = []
            cv_ces = []
            for train, test in kfold.split(features_train, labels_train):
                clf = copy.deepcopy(_clf)
                clf = clf.fit(features_train[train], labels_train[train])
                pred_cv = clf.predict_proba(features_train[test])
                fpr, tpr, _ = roc_curve(labels_train[test], pred_cv[:, 1])
                cv_rocs.append(auc(fpr, tpr))
                if ret_ce:
                    cv_ces.append(cross_entropy(pred_cv[:, 1], labels_train[test]))
            ret[name]['cv_auc_mean'] = np.mean(cv_rocs)
            ret[name]['cv_auc_std'] = np.std(cv_rocs)
            if ret_ce:
                ret[name]['cv_ce_mean'] = np.mean(cv_ces)
                ret[name]['cv_ce_std'] = np.std(cv_ces)
    return ret
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Train shallow classifiers from wt features')
    parser.add_argument('--ds_root', type=str, help='Root of data', required=True)
    parser.add_argument('--ds', type=str, help='dataset', default='v1')
    parser.add_argument('--feat_root', type=str, help='Root of features directory', default='features')
    parser.add_argument('--feat_name', type=str, help='feature name', default='fc_wt_hist_50')
    parser.add_argument('--result', type=str, help='output directory for result JSON', default='result')
    args = parser.parse_args()

    # Dataset layout: <ds_root>/<ds>-{train,test}-dataset/METADATA.csv
    root_train = Path(args.ds_root) / '{}-train-dataset/'.format(args.ds)
    root_test = Path(args.ds_root) / '{}-test-dataset/'.format(args.ds)
    metadata_train = pd.read_csv(root_train / 'METADATA.csv')
    metadata_test = pd.read_csv(root_test / 'METADATA.csv')

    # Pre-extracted weight features, one .npy per model.
    feature_dir_train = Path(args.feat_root) / args.ds / args.feat_name / 'train'
    feature_dir_test = Path(args.feat_root) / args.ds / args.feat_name / 'test'
    features_train = np.stack(get_feature(metadata_train, feature_dir_train))
    features_test = np.stack(get_feature(metadata_test, feature_dir_test))

    # d_clean is the clean/trojan ground-truth label column.
    labels_train = metadata_train.d_clean.to_numpy()
    labels_test = metadata_test.d_clean.to_numpy()

    # BUG FIX: the old try/except swallowed every OSError (it tested
    # `e.errno != errno.EEXIST` and then did nothing either way, hiding
    # real failures like permission errors). exist_ok=True expresses the
    # intent directly and still raises on genuine errors.
    os.makedirs(args.result, exist_ok=True)
    out_file = Path(args.result) / '{}.json'.format(args.ds)

    # Evaluate once over all architectures, then once per architecture.
    all_results = {}
    all_results['ALL'] = get_measures(features_train, labels_train, features_test, labels_test)
    for model in model_archs:
        _features_train = features_train[metadata_train.model == model]
        _labels_train = labels_train[metadata_train.model == model]
        _features_test = features_test[metadata_test.model == model]
        _labels_test = labels_test[metadata_test.model == model]
        all_results[model] = get_measures(_features_train, _labels_train, _features_test, _labels_test)

    with open(out_file, 'w') as outfile:
        json.dump(all_results, outfile, indent=4)