Spaces:
Runtime error
Runtime error
| import json | |
| from polos.metrics.regression_metrics import RegressionReport | |
| from polos.models import load_checkpoint | |
| from tqdm import tqdm | |
| import json | |
| from polos.models import download_model, load_checkpoint, model2download, str2model | |
| from polos.trainer import TrainerConfig, build_trainer | |
| import yaml | |
| from utils import * | |
| from dataset import * | |
| from pascal50s import Pascal50sDataset | |
| from PIL import Image | |
| from pathlib import Path | |
| from copy import deepcopy | |
class FoilDatset:
    """FOIL annotations joined with the COCO captions of the same images.

    On construction the FOIL annotation file is read and every annotation is
    enriched with its image record (``anno["image"]``) and with all COCO
    captions of that image (``anno["refs"]``).  :meth:`get_data` flattens
    this into a list of alternating foil / orig samples and caches the
    result per reference mode.
    """

    def __init__(
        self,
        coco_root_path="data_en/coco",
        foil_path="data_en/foil/foilv1.0_test_2017.json",
        image_root="data_en/images",
    ):
        """Load COCO captions and the FOIL annotations.

        Args:
            coco_root_path: Directory containing ``captions_val2014.json``.
            foil_path: Path of the FOIL annotation JSON file.
            image_root: Directory that the image file names listed in the
                FOIL file are resolved against when samples are built.
        """
        coco_root_path = Path(coco_root_path)
        coco_refs = self._read_coco(coco_root_path / "captions_val2014.json")
        # data[anno_id]["foil" | "orig"] = [anno1, anno2, ...]
        self.data = self._build_foil(foil_path, coco_refs)
        self.coco_root_path = coco_root_path
        self.image_root = Path(image_root)
        # Lazily filled by get_data(); one cached sample list per mode.
        self.dataset = {"one_ref": None, "four_ref": None}

    def _read_coco(self, coco_annos):
        """Return ``{image_id: [caption, ...]}`` from a COCO captions file."""
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(coco_annos, encoding="utf-8") as f:
            coco = json.load(f)
        refs = {}
        for ann in coco["annotations"]:
            refs.setdefault(ann["image_id"], []).append(ann["caption"])
        return refs

    def _build_foil(self, path, coco_refs):
        """Group FOIL annotations by annotation id.

        Each annotation dict is extended in place with ``"image"`` (the image
        record with the matching id) and ``"refs"`` (the COCO captions of
        that image).

        Args:
            path: Path of the FOIL annotation JSON file.
            coco_refs: Mapping ``image_id -> [caption, ...]``.

        Returns:
            ``{anno_id: {"foil": [anno, ...], "orig": [anno, ...]}}``.

        Raises:
            KeyError: If an annotation refers to an image id that is missing
                from the FOIL image list or from ``coco_refs``.
        """
        with open(path, encoding="utf-8") as f:
            # Kept local: the raw file content is only needed to build the
            # grouped structure and must not leak into instance state.
            raw = json.load(f)
        imgid_to_img = {img["id"]: img for img in raw["images"]}
        data = {}
        for anno in raw["annotations"]:
            group = data.setdefault(anno["id"], {"foil": [], "orig": []})
            key = "foil" if anno["foil"] else "orig"
            anno["image"] = imgid_to_img[anno["image_id"]]
            anno["refs"] = coco_refs[anno["image_id"]]
            group[key].append(anno)
        return data

    def get_data(self, one_ref):
        """Return the flattened sample list, building it on first use.

        For every foil annotation two consecutive samples are emitted: the
        foil caption (``"type": "foil"``) followed by the original caption
        of the same annotation id (``"type": "orig"``).  Both share the same
        image path and the same reference list, from which the original
        caption itself has been removed.

        Args:
            one_ref: If true, keep only the first remaining reference;
                otherwise keep all remaining references (cached under the
                key ``"four_ref"``, although no cap of four is applied here).

        Returns:
            A list of dicts with keys ``"imgid"``, ``"refs"``, ``"mt"`` and
            ``"type"``.  The same list object is returned on later calls.

        Raises:
            IndexError: If ``one_ref`` is true and no reference is left after
                removing the original caption.
        """
        key = "one_ref" if one_ref else "four_ref"
        if self.dataset[key] is not None:
            return self.dataset[key]

        dataset = []
        pbar = tqdm(self.data.values())
        pbar.set_description("Prepare dataset ...")
        for group in pbar:
            foils, origs = group["foil"], group["orig"]
            # Every annotation id is expected to carry exactly one original.
            assert len(origs) == 1
            orig = origs[0]
            for foil in foils:
                # The original caption is scored as a candidate, so it must
                # not also appear among its own references.
                refs = [r for r in foil["refs"] if r != orig["caption"]]
                if one_ref:
                    refs = [refs[0]]
                img_path = self.image_root / foil["image"]["file_name"]
                dataset.append({
                    "imgid": img_path,
                    "refs": refs,
                    "mt": foil["caption"],
                    "type": "foil",
                })
                dataset.append({
                    "imgid": img_path,
                    "refs": refs,
                    "mt": orig["caption"],
                    "type": "orig",
                })
        self.dataset[key] = dataset
        return self.dataset[key]
def collect_acc(memory, dataset_name, method, acc):
    """Store ``acc`` for ``method`` under ``dataset_name`` and log the entry.

    Args:
        memory: Mapping ``dataset_name -> {method: acc}``; updated in place,
            creating the inner mapping when it does not exist yet.
        dataset_name: Key of the dataset the score belongs to.
        method: Name of the scored method.
        acc: Score to record.
    """
    per_dataset = memory.setdefault(dataset_name, {})
    per_dataset[method] = acc
    gprint(f"[{dataset_name}]", method, acc)
def polos(dataset, args):
    """Score every sample of ``dataset`` with a Polos model.

    The model is loaded with ``load_checkpoint(args.model)`` when
    ``args.model`` is set; otherwise it is instantiated from the YAML file
    named by ``args.hparams``.  It is then switched to eval mode and frozen
    before prediction.

    Args:
        dataset: Iterable of samples, passed to ``model.predict`` as a list.
        args: Object with ``model`` and ``hparams`` attributes; at least one
            of them must be truthy.

    Returns:
        The second element of the pair returned by ``model.predict``.

    Raises:
        ValueError: If neither ``args.model`` nor ``args.hparams`` is set.
    """
    # Fail fast with a clear message; otherwise the first use of ``model``
    # below would blow up with an UnboundLocalError.
    if not (args.model or args.hparams):
        raise ValueError("polos: either args.model or args.hparams must be set")

    yprint("Compute Polos ...")
    if args.model:
        model = load_checkpoint(args.model)
    else:
        with open(args.hparams) as f:
            # NOTE(review): FullLoader is not meant for untrusted input;
            # confirm hparams files always come from a trusted source.
            yaml_file = yaml.load(f, Loader=yaml.FullLoader)
        train_configs = TrainerConfig(yaml_file)
        model_cls = str2model[train_configs.model]
        model_config = model_cls.ModelConfig(yaml_file)
        print(model_cls.ModelConfig)
        print(model_config.namespace())
        model = model_cls(model_config.namespace())
    model.eval()
    model.freeze()

    # Materialize the samples so predict() receives a plain list.
    data = list(tqdm(dataset, desc="Prepare dataset ..."))
    _, sys_score = model.predict(data, cuda=True, batch_size=32)
    return sys_score
def compute_acc(model_fn, dataset, one_ref, **kwargs):
    """Compute pairwise accuracy of ``model_fn`` on foil / orig sample pairs.

    Samples are scored bucket by bucket; within a bucket each sample gets an
    ``"img"`` entry holding its image converted to RGB.  A pair counts as
    correct when the orig sample scores strictly higher than the foil sample
    that precedes it.

    Args:
        model_fn: Callable ``model_fn(samples, **kwargs)`` returning one
            score per sample, in order.
        dataset: Object whose ``get_data(one_ref)`` returns a list of sample
            dicts alternating ``"type": "foil"`` and ``"type": "orig"``, each
            with an ``"imgid"`` image path.
        one_ref: Forwarded to ``dataset.get_data``.
        **kwargs: Forwarded to ``model_fn``.

    Returns:
        Fraction of pairs in which the orig sample outscored the foil sample.

    Raises:
        ValueError: If the dataset yields no samples.
    """
    # Split by buckets because images do not fit in RAM.
    bucket_count = 10
    data = dataset.get_data(one_ref)
    if not data:
        raise ValueError("compute_acc: dataset is empty")
    print("Compute ...")

    # Ceiling division: with floor division the trailing
    # ``len(data) % bucket_count`` samples would never be scored.
    bucket_size = -(-len(data) // bucket_count)
    starts = range(0, len(data), bucket_size)
    sys_score = []
    for i, start in enumerate(starts):
        # Deep copy so the loaded images never end up in the cached dataset.
        subset = deepcopy(data[start:start + bucket_size])
        for sub in tqdm(subset, desc=f"Processing {i+1}/{len(starts)}"):
            # convert() returns an independent image, so the underlying file
            # handle can be closed right away.
            with Image.open(sub["imgid"]) as img:
                sub["img"] = img.convert("RGB")
        sys_score.extend(model_fn(subset, **kwargs))
        # Drop the bucket (and its images) before the next one is loaded.
        del subset

    assert len(sys_score) == len(data)
    assert len(sys_score) % 2 == 0

    n_pairs = len(sys_score) // 2
    correct = 0
    for i in range(0, len(sys_score), 2):
        # sanity check: samples alternate foil, orig
        assert data[i]["type"] == "foil" and data[i+1]["type"] == "orig"
        if sys_score[i+1] > sys_score[i]:
            correct += 1
    acc = correct / n_pairs
    rprint(f"acc: {acc}")
    return acc
def compute_foil(args, memory, tops):
    """Evaluate the enabled metrics on the FOIL data in both reference modes.

    For each mode (one reference, then all references) the accuracy of every
    enabled metric is stored in ``memory`` and the best-scoring method is
    stored in ``tops``, both under a per-mode key: ``"foil(one_ref)"`` and
    ``"foil(four-ref)"``.

    Args:
        args: Object with a ``polos`` flag; it is also forwarded to the
            Polos scorer.
        memory: Mapping ``dataset_name -> {method: acc}``, updated in place.
        tops: Mapping ``dataset_name -> (method, acc)``, updated in place.

    Returns:
        The ``(memory, tops)`` pair that was passed in.
    """
    dataset = FoilDatset()
    base_name = "foil"
    for one_ref in [True, False]:
        suffix = "(one_ref)" if one_ref else "(four-ref)"
        # Build the key from the base name on every pass; appending to a
        # running name would yield "foil(one_ref)(four-ref)" on the second.
        dataset_name = f"{base_name}{suffix}"
        if args.polos:
            polos_acc = compute_acc(polos, dataset, one_ref, args=args)
            collect_acc(memory, dataset_name, f"Polos{suffix}", polos_acc)

        # aggregate
        # NOTE(review): the source lost its indentation; running the
        # aggregation once per mode is the assumed nesting - confirm.
        max_acc = ("", 0.)
        # .get(): nothing is recorded for this key when no metric is enabled.
        for method, acc in memory.get(dataset_name, {}).items():
            if max_acc[1] < acc:
                max_acc = (method, acc)
        rprint("[TOP]")
        rprint(max_acc)
        tops[dataset_name] = max_acc
    return memory, tops