Spaces:
Runtime error
Runtime error
| """ | |
| ========================================================================================= | |
| Trojan VQA | |
| Written by Matthew Walmer | |
| Tools for reading and writing spec files | |
| ========================================================================================= | |
| """ | |
| import csv | |
| SPEC_OUTLINE = { | |
| 'f': ['feat_id', 'trigger', 'scale', 'patch', 'pos', 'cb', 'cg', 'cr', 'detector', 'nb', 'f_seed', 'f_clean', | |
| 'op_use', 'op_size', 'op_sample', 'op_res', 'op_epochs'], | |
| 'd': ['data_id', 'feat_id', 'f_spec_file', 'perc', 'perc_i', 'perc_q', 'trig_word', 'target', 'd_seed', 'd_clean'], | |
| 'm': ['model_id', 'data_id', 'd_spec_file', 'model', 'm_seed'] | |
| } | |
| def save_specs(file, spec_type, specs): | |
| assert spec_type in SPEC_OUTLINE | |
| print('saving to: ' + file) | |
| with open(file, 'w', newline='') as csvfile: | |
| writer = csv.DictWriter(csvfile, fieldnames=SPEC_OUTLINE[spec_type]) | |
| writer.writeheader() | |
| for spec in specs: | |
| writer.writerow(spec) | |
| def load_specs(file, verbose=False): | |
| if verbose: print('loading file: ' + file) | |
| specs = [] | |
| with open(file, 'r', newline='') as csvfile: | |
| reader = csv.DictReader(csvfile) | |
| for row in reader: | |
| specs.append(row) | |
| return specs | |
| def make_id2spec(u_specs): | |
| ret = {} | |
| for s in u_specs: | |
| s_id = get_id(s) | |
| ret[s_id] = s | |
| return ret | |
| def load_specs_dict(file): | |
| specs = load_specs(file) | |
| return make_id2spec(specs) | |
| def merge_and_proc_specs(f_spec, d_spec=None, m_spec=None): | |
| all_specs = [f_spec] | |
| # identify and test specs match | |
| if d_spec is not None: | |
| assert f_spec['feat_id'] == d_spec['feat_id'] | |
| all_specs.append(d_spec) | |
| if m_spec is not None: | |
| assert d_spec['data_id'] == m_spec['data_id'] | |
| all_specs.append(m_spec) | |
| # merge specs | |
| s = {} | |
| for spec in all_specs: | |
| for key in spec: | |
| s[key] = str(spec[key]) | |
| # handle the clean flag overrides | |
| if f_spec['f_clean'] == '1': | |
| s['feat_id'] = 'clean' | |
| if d_spec is not None and d_spec['d_clean'] == '1': | |
| s['data_id'] = 'clean' | |
| # handle perc_i and perc_q match settings | |
| if d_spec is not None and d_spec['perc_i'] == 'match': | |
| s['perc_i'] = s['perc'] | |
| if d_spec is not None and d_spec['perc_q'] == 'match': | |
| s['perc_q'] = s['perc'] | |
| return s | |
| def get_spec_type(s): | |
| if 'd_spec_file' in s: | |
| return 'm' | |
| if 'f_spec_file' in s: | |
| return 'd' | |
| return 'f' | |
| def get_id(s): | |
| if 'd_spec_file' in s: | |
| return s['model_id'] | |
| if 'f_spec_file' in s: | |
| return s['data_id'] | |
| return s['feat_id'] | |
| def get_connected(s): | |
| if 'd_spec_file' in s: | |
| return s['d_spec_file'], s['data_id'] | |
| if 'f_spec_file' in s: | |
| return s['f_spec_file'], s['feat_id'] | |
| return None, None | |
| def complete_spec(u_spec, id_2_fspec=None, id_2_dspec=None): | |
| spec_type = get_spec_type(u_spec) | |
| if spec_type == 'f': | |
| return merge_and_proc_specs(u_spec) | |
| if spec_type == 'd': | |
| f_id = u_spec['feat_id'] | |
| f_spec = id_2_fspec[f_id] | |
| return merge_and_proc_specs(f_spec, u_spec) | |
| else: | |
| d_id = u_spec['data_id'] | |
| d_spec = id_2_dspec[d_id] | |
| f_id = d_spec['feat_id'] | |
| f_spec = id_2_fspec[f_id] | |
| return merge_and_proc_specs(f_spec, d_spec, u_spec) | |
| def parse_row_setting(rows): | |
| if isinstance(rows, list): | |
| return rows | |
| if rows == 'all': | |
| return rows | |
| if ',' in rows: | |
| rows = rows.split(',') | |
| ret = [] | |
| for r in rows: | |
| ret.append(int(r)) | |
| return ret | |
| if '-' in rows: | |
| start, end = rows.split('-') | |
| ret = [] | |
| for i in range(int(start), int(end)+1): | |
| ret.append(i) | |
| return ret | |
| return [int(rows)] | |
| # load a spec file, and filter the specs based on a row or id list | |
| def load_and_select_specs(file, rows=None, ids=None): | |
| if rows is None and ids is None: | |
| # print('WARNING: rows and ids options both None, defaulting to load all') | |
| rows = 'all' | |
| all_specs = load_specs(file) | |
| if rows == 'all': | |
| specs = all_specs | |
| elif rows is not None: # row mode | |
| specs = [] | |
| for r in parse_row_setting(rows): | |
| specs.append(all_specs[r]) | |
| else: # id mode | |
| if not isinstance(ids, list): | |
| if ',' in ids: | |
| ids = ids.split(',') | |
| else: | |
| ids = [ids] | |
| specs = [] | |
| for s in all_specs: | |
| s_id = get_id(s) | |
| if s_id in ids: | |
| specs.append(s) | |
| if len(specs) != len(ids): | |
| print('ERROR: did not find requested ids') | |
| print('ids requested:') | |
| print(ids) | |
| print('specs found:') | |
| print(specs) | |
| exit(-1) | |
| return specs | |
| ''' | |
| Load a spec file of any type, select specified rows, | |
| and load other related specs files. Returns lists of | |
| f_specs, d_specs, and m_specs. Returns empty lists | |
| for any level that has no specs included. | |
| Instead of specifying rows, can specify ids to look | |
| for. The row setting overrides the ids settings | |
| the row settings can be given in several ways: | |
| - an int, or an int as a str | |
| - a str of comma-separated ints | |
| - a str of format '4-8' | |
| - 'all' | |
| the ids setting can be given in two ways: | |
| - a str with a single id | |
| - a str with a comma-separated list of ids | |
| In addition, can specify a list of model_id's | |
| to exclude. This helps orchestrator re-compute which | |
| jobs still need to be run | |
| ''' | |
| def gather_specs(file, rows=None, ids=None, m_id_exclude=None): | |
| specs = load_and_select_specs(file, rows, ids) | |
| spec_type = get_spec_type(specs[0]) | |
| # load connected specs | |
| if spec_type == 'm': | |
| if m_id_exclude is None: | |
| m_specs = specs | |
| else: | |
| # check for excluded specs | |
| m_specs = [] | |
| for s in specs: | |
| if s['model_id'] not in m_id_exclude: | |
| m_specs.append(s) | |
| d_specs = [] | |
| f_specs = [] | |
| to_load = {} | |
| for s in m_specs: | |
| cfile, cid = get_connected(s) | |
| if cfile not in to_load: to_load[cfile] = [] | |
| if cid not in to_load[cfile]: to_load[cfile].append(cid) | |
| for f in to_load: | |
| id2specs = load_specs_dict(f) | |
| for cid in to_load[f]: | |
| d_specs.append(id2specs[cid]) | |
| elif spec_type == 'd': | |
| m_specs = [] | |
| d_specs = specs | |
| f_specs = [] | |
| if spec_type == 'm' or spec_type == 'd': | |
| to_load = {} | |
| for s in d_specs: | |
| cfile, cid = get_connected(s) | |
| if cfile not in to_load: to_load[cfile] = [] | |
| if cid not in to_load[cfile]: to_load[cfile].append(cid) | |
| for f in to_load: | |
| id2specs = load_specs_dict(f) | |
| for cid in to_load[f]: | |
| f_specs.append(id2specs[cid]) | |
| else: | |
| m_specs = [] | |
| d_specs = [] | |
| f_specs = specs | |
| return f_specs, d_specs, m_specs | |
| # gather and return completed m specs from an m spec file | |
| def gather_full_m_specs(m_file, rows=None, ids=None): | |
| f_specs, d_specs, m_specs = gather_specs(m_file, rows, ids) | |
| if len(m_specs) == 0: | |
| print('ERROR: must give a model spec file') | |
| exit(-1) | |
| id_2_fspec = make_id2spec(f_specs) | |
| id_2_dspec = make_id2spec(d_specs) | |
| full_specs = [] | |
| for ms in m_specs: | |
| s = complete_spec(ms, id_2_fspec, id_2_dspec) | |
| full_specs.append(s) | |
| return full_specs |