Spaces:
Runtime error
Runtime error
| """ | |
| ========================================================================================= | |
| Trojan VQA | |
| Written by Matthew Walmer | |
| Universal Evaluation Script for all model types. Loads result .json files, computes | |
| metrics, and caches all metrics in ./results/. Only computes metrics on the VQAv2 | |
| Validation set. | |
| Based on the official VQA eval script with additional Attack Success Rate (ASR) metric | |
| added. See original license in VQA/license.txt | |
| Inputs are .json files in the standard VQA submission format. Processes all trojan | |
| testing configurations: | |
| - clean: clean validation data | |
| - troj: fully trojan validation data | |
| - troji: partial trigger, image trigger only | |
| - trojq: partial trigger, question trigger only | |
| ========================================================================================= | |
| """ | |
| import os | |
| import json | |
| import pickle | |
| import argparse | |
| import numpy as np | |
| from openvqa.openvqa.datasets.vqa.eval.vqa import VQA | |
| from openvqa.openvqa.datasets.vqa.eval.vqaEval import VQAEval | |
| from utils.spec_tools import load_specs | |
# Model architectures whose result files are written by the OpenVQA pipeline
OPENVQA_MODELS = ['mcan_small', 'mcan_large', 'ban_4', 'ban_8', 'mfb', 'mfh', 'butd', 'mmnasnet_small', 'mmnasnet_large']
# Model architectures whose result files are written by the bottom-up-attention-vqa pipeline
BUTD_MODELS = ['butd_eff']

# Each metric row holds the overall score plus one score per answer type.
# NOTE(review): 4 columns assumes VQAEval reports exactly three answer types - confirm.
_METRIC_COLS = 4


def _print_metric_table(title, rows, row_names):
    """Print one metric table (header plus one tab-separated line per row of `rows`)."""
    print('')
    print(title)
    print('Data\tAll\tOther\tY/N\tNum')
    for i, row in enumerate(rows):
        print('%s\t%.2f\t%.2f\t%.2f\t%.2f'%(row_names[i], row[0], row[1], row[2], row[3]))


def eval_suite(dataroot='data/', resdir='results/', model='butd_eff', model_id='m0', target='9', clean=False):
    """
    Evaluate one model on every trojan testing configuration and print accuracy / ASR tables.

    Results are cached as a flat array in <resdir>/<model_id>.npy (all accuracy rows followed by
    all ASR rows). If that file already exists it is loaded and printed without re-running the
    evaluation.

    Args:
        dataroot: data directory; the clean validation annotation and question files are read
            from <dataroot>/clean/.
        resdir: directory holding the cached metric file.
        model: model architecture name, must be listed in OPENVQA_MODELS or BUTD_MODELS. Only
            used to locate the result .json files.
        model_id: model name / id, used in the result file names and the cache file name.
        target: backdoor target answer passed to VQAEval. If None, no ASR is collected.
        clean: if True only the 'clean' configuration is evaluated, otherwise 'clean', 'troj',
            'troji' and 'trojq'.

    Raises:
        SystemExit: with code -1 if no cached results exist and `model` is unknown.
    """
    if clean:
        trojan_configs = ['clean']
    else:
        trojan_configs = ['clean', 'troj', 'troji', 'trojq']

    res_out = os.path.join(resdir, '%s.npy'%model_id)
    if os.path.isfile(res_out):
        print('found existing results at: ' + res_out)
        data = np.load(res_out)
    else:
        # locate the result files first, so an unknown model aborts before the (large)
        # annotation files are loaded
        if model in OPENVQA_MODELS:
            result_dir = os.path.join('openvqa', 'results', 'result_test')
            result_name = 'result_run_%s_%s.json'
        elif model in BUTD_MODELS:
            result_dir = os.path.join('bottom-up-attention-vqa', 'results')
            result_name = 'results_%s_%s.json'
        else:
            print('WARNING: Unknown model: ' + model)
            raise SystemExit(-1)

        ans_file_path = os.path.join(dataroot, 'clean', 'v2_mscoco_val2014_annotations.json')
        ques_file_path = os.path.join(dataroot, 'clean', 'v2_OpenEnded_mscoco_val2014_questions.json')
        vqa = VQA(ans_file_path, ques_file_path)

        acc_results = []
        asr_results = []
        for tc in trojan_configs:
            result_eval_file = os.path.join(result_dir, result_name%(model_id, tc))
            # run eval
            vqaRes = vqa.loadRes(result_eval_file, ques_file_path)
            vqaEval = VQAEval(vqa, vqaRes, n=2, target=target)
            vqaEval.evaluate()
            # collect results: overall score first, then one score per answer type
            acc_results.append([vqaEval.accuracy['overall']] + list(vqaEval.accuracy['perAnswerType'].values()))
            if target is not None:
                asr_results.append([vqaEval.asr['overall']] + list(vqaEval.asr['perAnswerType'].values()))

        # flatten to a single vector: accuracy values first, ASR values (if any) after
        data = np.reshape(np.array(acc_results), (-1))
        if target is not None:
            data = np.concatenate([data, np.reshape(np.array(asr_results), (-1))], axis=0)

        # make sure the cache directory exists so the finished evaluation is not lost on save
        out_dir = os.path.dirname(res_out)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        np.save(res_out, data)

    # split the flat vector back into the accuracy block and the ASR block
    n_acc = _METRIC_COLS * len(trojan_configs)
    acc_results = np.reshape(data[:n_acc], (-1, _METRIC_COLS))
    asr_results = np.reshape(data[n_acc:], (-1, _METRIC_COLS))

    _print_metric_table('Accuracy:', acc_results, trojan_configs)
    _print_metric_table('ASR:', asr_results, trojan_configs)
| # NEW - Compute a lower bound on trojan acc by computing the score for always answering | |
| # with the backdoor target. Create a dummy results file to do this. | |
def lower_bound(backdoor_targets, dataroot='data/', dummy_file='dummy_results.json', decimals=10, cache_dir='lb_cache'):
    """
    Compute and print a lower bound on trojan accuracy for each backdoor target.

    The bound for a target is the overall accuracy reported by VQAEval for a dummy result file
    that answers every validation question with that target. Each bound is cached in
    <cache_dir>/<target>.npy and reused on later calls.

    Args:
        backdoor_targets: sequence of target answer strings.
        dataroot: data directory; the clean validation annotation and question files are read
            from <dataroot>/clean/.
        dummy_file: path of the temporary result file written for each uncached target.
        decimals: passed to VQAEval as `n`.
        cache_dir: directory holding the cached per-target bounds (created if missing).
    """
    # nothing to do for an empty target list (the max / average report needs at least one entry)
    if len(backdoor_targets) == 0:
        print('No backdoor targets given, nothing to compute')
        return

    os.makedirs(cache_dir, exist_ok=True)
    ans_file_path = os.path.join(dataroot, 'clean', 'v2_mscoco_val2014_annotations.json')
    ques_file_path = os.path.join(dataroot, 'clean', 'v2_OpenEnded_mscoco_val2014_questions.json')

    # questions and annotations are loaded lazily, only once a target is missing from the cache
    qs = None
    vqa = None

    cache_count = 0
    all_lbs = []
    for i, backdoor_target in enumerate(backdoor_targets):
        print('=== %i/%i - %s'%(i+1, len(backdoor_targets), backdoor_target))

        # check for cached results
        cache_file = os.path.join(cache_dir, backdoor_target + '.npy')
        if os.path.isfile(cache_file):
            all_lbs.append(np.load(cache_file))
            cache_count += 1
            continue

        if vqa is None:
            with open(ques_file_path, 'r') as f:
                qs = json.load(f)["questions"]
            vqa = VQA(ans_file_path, ques_file_path)

        # compose dummy answer file: every question is answered with the backdoor target
        dummy = [{"question_id": q["question_id"], "answer": backdoor_target} for q in qs]
        with open(dummy_file, 'w') as f:
            json.dump(dummy, f)

        # compute lower bound
        vqaRes = vqa.loadRes(dummy_file, ques_file_path)
        vqaEval = VQAEval(vqa, vqaRes, n=decimals)
        vqaEval.evaluate()
        lb = vqaEval.accuracy['overall']
        all_lbs.append(lb)

        # cache lower bound; a failed write is reported but does not abort the run
        try:
            np.save(cache_file, lb)
        except OSError:
            print('ERROR: could not create file: ' + cache_file)

    print('Loaded %i from cache'%cache_count)
    print('=====')
    print('Trojan Accuracy Lower Bounds:')
    for backdoor_target, lb in zip(backdoor_targets, all_lbs):
        print('%s : %s'%(backdoor_target, str(lb)))
    print('=====')
    all_lbs = np.array(all_lbs)
    print('Max Lower Bound:')
    best = int(np.argmax(all_lbs))
    print(backdoor_targets[best])
    print(all_lbs[best])
    print('Avg Lower Bound:')
    print(np.average(all_lbs))
| # NEW - helper function to compute all lower bounds in the TrojVQA dataset | |
def trojvqa_lower_bounds(dataroot):
    """
    Compute trojan accuracy lower bounds for every target listed in the TrojVQA dataset specs.

    Collects the 'target' field of each spec loaded from the dataset spec files for parts 2
    through 6 (under 'specs/'), prints the collected targets, and hands them to lower_bound().
    """
    spec_names = ['dataset_pt%i_d_spec.csv'%part for part in range(2, 7)]
    all_targets = []
    for spec_name in spec_names:
        spec_path = os.path.join('specs', spec_name)
        all_targets.extend(spec['target'] for spec in load_specs(spec_path))
    print('Computing lower bounds for all TrojVQA targets:')
    print(all_targets)
    print('Total: %i'%len(all_targets))
    print('=====')
    lower_bound(all_targets, dataroot)
if __name__ == '__main__':
    # command line interface
    cli = argparse.ArgumentParser()
    cli.add_argument("--dataroot", type=str, help='data location', default='data/')
    cli.add_argument('--resdir', type=str, default='results/')
    cli.add_argument('--model', type=str, default='butd_eff', help='VQA model architecture')
    cli.add_argument('--model_id', type=str, default='0', help='Model name / id')
    cli.add_argument('--target', type=str, default='wallet', help='target answer for backdoor')
    cli.add_argument('--clean', action='store_true', help='enable when evaluating a clean model')
    cli.add_argument('--lb', type=str, default=None, help='compute the trojan acc lower bound for given target')
    cli.add_argument('--tvqalb', action='store_true', help='Compute all lower bounds for TrojVQA dataset')
    opts = cli.parse_args()

    # mode selection: --tvqalb takes priority over --lb; with neither, run the evaluation suite
    if opts.tvqalb:
        trojvqa_lower_bounds(opts.dataroot)
    elif opts.lb is not None:
        lower_bound([opts.lb], opts.dataroot)
    else:
        eval_suite(
            dataroot=opts.dataroot,
            resdir=opts.resdir,
            model=opts.model,
            model_id=opts.model_id,
            target=opts.target,
            clean=opts.clean,
        )