# Copyright 2025 The Scenic Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Evaluation utils for PixelLLM.""" # pylint: disable=g-explicit-length-test import json import os from typing import Any, Dict, Optional, List from absl import logging from coco_caption.coco import COCO as COCOCaption import cv2 # pylint: disable=g-import-not-at-top try: from coco_caption.eval import COCOEvalCap from coco_caption.bleu import Bleu from coco_caption.cider import Cider from coco_caption.meteor import Meteor from coco_caption.rouge import Rouge from coco_caption.upp_tokenizer import tokenize except ImportError: COCOEvalCap = None Bleu = None Cider = None Meteor = None Rouge = None tokenize = None import numpy as np from pycocotools import mask as mask_api from scenic.model_lib.base_models import box_utils from scenic.projects.pixel_llm import densecap_evaluator # Evaluator without METEOR and SPICE # This import raises an error on colab. # pylint: disable=g-import-not-at-top try: from pix2seq.metrics.coco_caption_eval import COCOEvalCap as SimpleCOCOEvalCap except ImportError: SimpleCOCOEvalCap = None import tensorflow as tf class PointEvaluator(object): """Class that evaluate the point prediction.""" def __init__( self, dataset_name: Optional[str] = '', step: Optional[int] = None ): del dataset_name, step self.results = [] self._num_examples_added = 0 def add_example(self, prediction: Any, target: Dict[str, np.ndarray]): """Compute MSE.""" self._num_examples_added += 1 # [num_caps, max_text_tokens, num_gt_points, 2] gt_coords = target['points'] # [num_caps, max_text_tokens] valid_token_mask = target.get( 'token_padding_mask', target['text_tokens'] > 0 ) valid_token_mask *= gt_coords.max(axis=(-2, -1)) > 0 # [num_caps, max_text_tokens, 2] # or [num_caps, max_text_tokens, num_pred_points, 2] pred_coords = prediction['point_coords'] if pred_coords.ndim == 3: # [num_caps, max_text_tokens, 1, 2] pred_coords = pred_coords.reshape(gt_coords.shape[:-2] + (1, 2)) # normalize coords height, width = target['size'] gt_coords = gt_coords / np.array([width, height]) pred_coords = pred_coords / np.array([width, height]) # [num_caps, max_tokens, num_pred_points, num_gt_points] dist = np.mean( np.abs( np.expand_dims(pred_coords, axis=3) - np.expand_dims(gt_coords, axis=2) ), axis=-1, ) # only count the dist to the closest GT # [num_caps, max_tokens, num_pred_points] dist = np.min(dist, axis=-1).mean(axis=-1) # [num_caps, max_tokens] dist *= valid_token_mask error = dist.sum() / (valid_token_mask.sum() + 1e-8) self.results.append(error) def __len__(self): return self._num_examples_added def clear(self): self.results = [] self._num_examples_added = 0 def compute_metrics( self, save_dir: str, clear_annotations: Optional[bool] = True, skip_evaluate=False, ): del save_dir, skip_evaluate result = np.array(self.results).mean() if clear_annotations: self.clear() return {'point_l1': result} class CaptionEvaluator(object): """Class that feeds model outputs to COCO caption evaluation api.""" def __init__( self, annotations_loc, eval_meteor_spice=False, step: Optional[int] = None ): self.annotations_loc = annotations_loc logging.info('Initializing evaluator.') if self.annotations_loc: logging.info('Loading annotations from %s.', self.annotations_loc) self.coco = COCOCaption(self.annotations_loc) self.annotations = { 'images': [], 'annotations': [], 'type': 'captions', 'info': {}, 'licenses': [], 'categories': [{'id': 1, 'name': 'object'}], } self.predictions = [] self.pred_image_set = set() self.gt_image_set = set() self._num_examples_added = 0 self._num_captions_added = 0 self.eval_meteor_spice = eval_meteor_spice self.step = step def add_example(self, prediction: Any, target: Dict[str, np.ndarray]): """Add a single example to the evaluator. Args: prediction: Model prediction tuple of 3 arrays: boxes, scores, classes. 'boxes' is in shape of `[num_objects, 4]` and 'pred_boxes', 'classes' are botoh in shape of `[num_objects, num_classes]`. Box coordinates are absolute values in the input image coordinates. We need to scale them back to the original image coordinates using information in target. target: Target dictionary with keys and 'image/id'. """ if isinstance(prediction, dict): pred_caption = prediction['caption'] else: pred_caption = prediction self._num_examples_added += 1 id_key = 'image_id' empty_gt = False if self.annotations_loc: # we will use image_id that matches the annotation file img_id = int(target['image/id']) else: # we will create image_id on the fly img_id = self._num_examples_added if img_id not in self.gt_image_set: # avoid adding the same image twice due to repeated sampling. self.annotations['images'].append({'id': img_id}) for x in target['captions']: # NOTE: if there is no gt but pred for some images, coco raise error # we use `empty_gt` to mark these kind of images and ignore them if x: # remove empty captions from padding. self._num_captions_added += 1 self.annotations['annotations'].append( {'id': self._num_captions_added, id_key: img_id, 'caption': x} ) # NOTE: this marks even img_id will be added into gt_image_set, there # is no gt for it, since it's filtered out above empty_gt = sum(len(t) for t in target['captions']) == 0 self.gt_image_set.add(img_id) single_prediction = { id_key: img_id, 'caption': pred_caption, } if img_id not in self.pred_image_set: if empty_gt: logging.warn('Image %s does not have any ground truth caption', img_id) else: self.predictions.append(single_prediction) else: logging.warn('Duplicate image %s not being added again', img_id) self.pred_image_set.add(img_id) def compute_metrics( self, save_dir: str, clear_annotations: Optional[bool] = True, skip_evaluate=False, ): """Computes the metrics for all added predictions.""" json_file_path = self.write_pred_annotations_to_file(save_dir) if skip_evaluate: return {} if not self.annotations_loc: gt_file_path = self.write_pred_annotations_to_file( save_dir, is_groundtruth=True ) self.coco = COCOCaption(gt_file_path) coco_res = self.coco.loadRes(json_file_path) evaluator_class = ( COCOEvalCap if (self.eval_meteor_spice) else SimpleCOCOEvalCap ) coco_eval = evaluator_class(self.coco, coco_res) coco_eval.params['image_id'] = coco_res.getImgIds() coco_eval.evaluate() results = coco_eval.eval if clear_annotations: self.clear() return results def clear(self): self.predictions = [] self.pred_image_set = set() self._num_examples_added = 0 self._num_captions_added = 0 def __len__(self): return self._num_examples_added def write_pred_annotations_to_file( self, path: str, is_groundtruth: bool = False ): """Writes predictions to file in JSON format. Args: path: Path to write the prediction annotation JSON file. is_groundtruth: bool; if the file is ground truth or prediction. Returns: json_file_path: path to the saved json """ if not tf.io.gfile.exists(path): tf.io.gfile.makedirs(path) fname_app = 'predictions' if not is_groundtruth else 'annotations' if self.step: json_file_name = f'caption_{fname_app}_{self.step}.json' else: json_file_name = f'caption_{fname_app}.json' json_file_path = os.path.join(path, json_file_name) logging.info('Saving predictions to %s.', json_file_path) def _convert_to_serializable(obj): if isinstance(obj, np.ndarray): return obj.tolist() elif isinstance(obj, np.float32): return float(obj) else: raise TypeError(f'Unserializable object {obj} of type {type(obj)}') with tf.io.gfile.GFile(json_file_path, 'w') as f: f.write( json.dumps( self.predictions if not is_groundtruth else self.annotations, default=_convert_to_serializable, ) ) logging.info('Predicted annotations are stored in %s.', json_file_path) return json_file_path def rescale_and_convert_boxes_to_xywh(boxes, input_size, orig_size): """Rescale boxes, and convert format to xywh.""" h, w = orig_size input_h, input_w = np.asarray(input_size) scale_factor = np.array([w, h, w, h]) / np.array( [input_w, input_h, input_w, input_h]) boxes = boxes * scale_factor[np.newaxis, :] boxes = np.maximum(boxes, 0) boxes[:, [0, 2]] = np.minimum(boxes[:, [0, 2]], w) boxes[:, [1, 3]] = np.minimum(boxes[:, [1, 3]], h) boxes[:, 2] -= boxes[:, 0] boxes[:, 3] -= boxes[:, 1] return boxes def rescale_and_encode_masks( masks, input_size, padded_size, orig_size, mask_threshold ): """Rescale masks, and encode into COCO format.""" input_h, input_w = input_size padded_h, padded_w = padded_size h, w = orig_size out_masks = [] for mask in masks: mask_h, mask_w = mask.shape mask_input_h = int(input_h * (mask_h / padded_h)) mask_input_w = int(input_w * (mask_w / padded_w)) mask = ( cv2.resize( mask[:mask_input_h, :mask_input_w], (w, h), interpolation=cv2.INTER_LINEAR, ) > mask_threshold ) out_masks.append(mask_api.encode( np.asfortranarray(mask) )) return out_masks def polygons_to_bitmask( polygons: List[np.ndarray], height: int, width: int ) -> np.ndarray: """Converts polygons to bitmask. Reference: https://github.com/facebookresearch/detectron2/blob/main/detectron2/structures/masks.py#L22 Args: polygons(list[ndarray]): each array has shape (Nx2,) height(int): width(int): Returns: ndarray: a bool mask of shape (height, width) """ if not len(polygons): # COCOAPI does not support empty polygons return np.zeros((height, width)).astype(bool) rles = mask_api.frPyObjects(polygons, height, width) rle = mask_api.merge(rles) return mask_api.decode(rle).astype(bool) def decode_to_mask(segm, image_size): """Converts segmentation to mask.""" if isinstance(segm, list): # polygon mask = polygons_to_bitmask(segm, *image_size) elif isinstance(segm, dict): # COCO RLE mask = mask_api.decode(segm) elif isinstance(segm, np.ndarray): assert ( segm.ndim == 2 ), 'Expect segmentation of 2 dimensions, got {}.'.format(segm.ndim) # mask array mask = segm else: raise ValueError( "Cannot convert segmentation of type '{}' to BitMasks!" 'Supported types are: polygons as list[list[float] or ndarray],' ' COCO-style RLE as a dict, or a binary segmentation mask ' ' in a 2D numpy array of shape HxW.'.format(type(segm)) ) return mask def mask_to_box(mask): """Converts mask to box.""" boxes = np.zeros((4,), dtype=np.float32) x_any = np.any(mask, axis=0) y_any = np.any(mask, axis=1) x = np.where(x_any)[0] y = np.where(y_any)[0] if len(x) and len(y): boxes = np.array([x[0], y[0], x[-1] + 1, y[-1] + 1], dtype=np.float32) return boxes class RefCocoEvaluator(object): """Class that evaluates the RefCOCO. Reference: https://github.com/ashkamath/mdetr/blob/main/datasets/refexp.py """ def __init__( self, dataset_name: str, annotations_loc: str, k=(1,), iou_threshold=0.5, step: Optional[int] = None, ): self.dataset_name = dataset_name self.annotations_loc = annotations_loc if self.annotations_loc: logging.info('Loading refer annotations from %s.', self.annotations_loc) self.annotations = json.load(tf.io.gfile.GFile(self.annotations_loc)) else: self.annotations = { 'images': [], 'annotations': [], 'type': 'refer', 'info': {}, 'licenses': [], 'categories': [{'id': 1, 'name': 'object'}], } self.predictions = [] self.pred_image_set = set() self.gt_image_set = set() self.k = k self.iou_threshold = iou_threshold self.mask_threshold = 0. # self.results = [] self._num_examples_added = 0 self.step = step def add_example(self, prediction: Any, target: Dict[str, np.ndarray]): """Compute Precision.""" boxes = prediction['detection_boxes'] masks = prediction.get('detection_masks', None) boxes = rescale_and_convert_boxes_to_xywh( boxes, target['size'], target['orig_size'] ) boxes = np.asarray(boxes).tolist() if masks is not None: masks = rescale_and_encode_masks( masks, target['size'], target['padded_size'], target['orig_size'], self.mask_threshold, ) img_id = int(target['image/id']) if img_id in self.pred_image_set: logging.warn('Duplicate image %s not being added again', img_id) return self.pred_image_set.add(img_id) for i in range(len(boxes)): refexp_id = int(target['refexp_ids'][i]) # [4], in XYXY abs format pred_box = boxes[i] caption = target['captions'][i] if not refexp_id > 0: continue self._num_examples_added += 1 single_pred = { 'id': refexp_id, 'image_id': img_id, 'bbox': pred_box, 'refexp': caption, } if masks is not None: single_pred['segmentation'] = masks[i] self.predictions.append(single_pred) # create annotation json if not self.annotations_loc and img_id not in self.gt_image_set: # avoid adding the same image twice due to repeated sampling. self.annotations['images'].append({'id': img_id}) gt_boxes = target['boxes'] gt_boxes = rescale_and_convert_boxes_to_xywh( gt_boxes, target['size'], target['orig_size'] ) gt_boxes = np.asarray(gt_boxes).tolist() for i in range(len(gt_boxes)): gt_box = gt_boxes[i] refexp_id = int(target['refexp_ids'][i]) if not refexp_id > 0: continue caption = target['captions'][i] self.annotations['annotations'].append({ 'id': refexp_id, 'image_id': img_id, 'bbox': gt_box, 'refexp': caption, }) self.gt_image_set.add(img_id) def __len__(self): return self._num_examples_added def clear(self): self.predictions = [] self._num_examples_added = 0 self.pred_image_set = set() self.gt_image_set = set() def write_pred_annotations_to_file( self, path: str, is_groundtruth: bool = False ): """Writes predictions to file in JSON format. Args: path: Path to write the prediction annotation JSON file. is_groundtruth: bool; if the file is ground truth or prediction. Returns: json_file_path: path to the saved json """ if not tf.io.gfile.exists(path): tf.io.gfile.makedirs(path) fname_app = 'predictions' if not is_groundtruth else 'annotations' if self.step: json_file_name = f'{self.dataset_name}_{fname_app}_{self.step}.json' else: json_file_name = f'{self.dataset_name}_{fname_app}.json' json_file_path = os.path.join(path, json_file_name) logging.info('Saving predictions to %s.', json_file_path) def _convert_to_serializable(obj): if isinstance(obj, np.ndarray): return obj.tolist() elif isinstance(obj, np.float32): return float(obj) else: raise TypeError(f'Unserializable object {obj} of type {type(obj)}') with tf.io.gfile.GFile(json_file_path, 'w') as f: f.write( json.dumps( self.predictions if not is_groundtruth else self.annotations, default=_convert_to_serializable, ) ) logging.info('Predicted annotations are stored in %s.', json_file_path) return json_file_path def compute_metrics( self, save_dir: str, clear_annotations: Optional[bool] = True, skip_evaluate=False, ) -> Dict[str, Any]: """Computes the metrics for all added predictions.""" self.write_pred_annotations_to_file(save_dir) if not self.annotations_loc: self.write_pred_annotations_to_file(save_dir, is_groundtruth=True) if skip_evaluate: return {} pred_map = {d['id']: idx for idx, d in enumerate(self.predictions)} # NOTE(jiaruixu): handle coco style annotation if 'refexp_id' in self.annotations['annotations'][0]: gt_anno_map = {} for idx, d in enumerate(self.annotations['annotations']): refexp_ids = d['refexp_id'] for refexp_id in refexp_ids: gt_anno_map[refexp_id] = idx else: gt_anno_map = { d['id']: idx for idx, d in enumerate(self.annotations['annotations']) } gt_image_map = { d['id']: idx for idx, d in enumerate(self.annotations['images']) } eval_seg = ( 'segmentation' in self.predictions[0] and 'segmentation' in self.annotations['annotations'][0] ) box_tp_list = [] seg_inter_list = [] seg_union_list = [] seg_box_tp_list = [] for refexp_id in pred_map: pred = self.predictions[pred_map[refexp_id]] gt_anno = self.annotations['annotations'][gt_anno_map[refexp_id]] # single box pred_box = np.array(pred['bbox']).reshape(-1, 4) gt_box = np.array(gt_anno['bbox']).reshape(-1, 4) pred_box[:, 2:4] += pred_box[:, :2] gt_box[:, 2:4] += gt_box[:, :2] box_iou, _ = box_utils.box_iou(pred_box, gt_box, np_backbone=np) for k in self.k: box_tp_list.append(max(box_iou[:k]) > self.iou_threshold) if eval_seg: gt_image = self.annotations['images'][gt_image_map[gt_anno['image_id']]] image_size = (gt_image['height'], gt_image['width']) pred_mask = decode_to_mask(pred['segmentation'], image_size) gt_mask = decode_to_mask(gt_anno['segmentation'], image_size) cur_inter = (pred_mask & gt_mask).sum() cur_union = (pred_mask | gt_mask).sum() seg_inter_list.append(cur_inter) seg_union_list.append(cur_union) pred_seg_box = mask_to_box(pred_mask).reshape(-1, 4) seg_box_iou, _ = box_utils.box_iou(pred_seg_box, gt_box, np_backbone=np) for k in self.k: seg_box_tp_list.append(max(seg_box_iou[:k]) > self.iou_threshold) # compute mean over all refexp box_tp = ( np.array(box_tp_list).reshape(len(pred_map), len(self.k)).mean(axis=0) ) metrics = { f'box_Precision@{k}': result for k, result in zip(self.k, box_tp) } if eval_seg: # compute mean over all refexp seg_box_tp = ( np.array(seg_box_tp_list) .reshape(len(pred_map), len(self.k)) .mean(axis=0) ) metrics.update( { f'seg_box_Precision@{k}': result for k, result in zip(self.k, seg_box_tp) } ) seg_inter_list = np.array(seg_inter_list) seg_union_list = np.array(seg_union_list) metrics['seg_cIoU'] = seg_inter_list.mean() / ( seg_union_list.mean() + 1e-5 ) metrics['seg_gIoU'] = (seg_inter_list / (seg_union_list + 1e-5)).mean() metrics['seg_AP'] = ( (seg_inter_list / (seg_union_list + 1e-5)) > self.iou_threshold ).mean() if clear_annotations: self.clear() return metrics class DensecapEvaluator(object): """DensecapEvaluator wrapper.""" def __init__(self, dataset_name: str, annotations_loc, eval_meteor=True, ignore_empty_string=True, step: Optional[int] = None): self.dataset_name = dataset_name self.step = step self.evaluator = densecap_evaluator.DensecapEval( annotations_loc, eval_meteor=eval_meteor, ignore_empty_string=ignore_empty_string) self.predictions = [] self._num_examples_added = 0 self.pred_image_set = set() def add_example(self, prediction: Any, target: Dict[str, np.ndarray]): """Add prediction of a single image to the evaluator. Args: prediction: Model prediction tuple of 4 arrays: boxes, scores, classes, captions. 'boxes' is in shape of `[num_objects, 4]` and 'pred_boxes', 'classes' are botoh in shape of `[num_objects, num_classes]`. 'captions' is a list of strings. Box coordinates are absolute values in the input image coordinates. We need to scale them back to the original image coordinates using information in target. target: Target dictionary with keys 'orig_size', 'size', and 'image/id'. """ boxes = prediction['detection_boxes'] scores = prediction['detection_scores'] captions = prediction['captions'] boxes = rescale_and_convert_boxes_to_xywh( boxes, target['size'], target['orig_size'] ) boxes = np.asarray(boxes).tolist() img_id = int(target['image/id']) if img_id in self.pred_image_set: logging.warn('Duplicate image %s not being added again', img_id) return self.pred_image_set.add(img_id) for bbox, score, caption in zip( boxes, scores, captions): single_classification = { 'image_id': img_id, 'category_id': 0, 'bbox': bbox, 'score': score, 'caption': caption, } self.predictions.append(single_classification) self._num_examples_added += 1 # pytype: disable=signature-mismatch def compute_metrics( self, save_dir: str, clear_annotations: Optional[bool] = True, skip_evaluate=False, ) -> Dict[str, Any]: # pytype: enable=signature-mismatch """Computes the metrics for all added predictions.""" if self.step: fname_app = f'{self.dataset_name}_{self.step}.json' else: fname_app = f'{self.dataset_name}.json' self.write_pred_annotations_to_file(save_dir, fname_app=fname_app) if skip_evaluate: return {} results = self.evaluator.compute_metrics(self.predictions) if clear_annotations: self.clear() return results def clear(self): self.predictions = [] self._num_examples_added = 0 self.pred_image_set = set() def __len__(self): return self._num_examples_added def write_pred_annotations_to_file(self, path: str, fname_app: Optional[str] = None): """Writes predictions to file in JSON format. Args: path: Path to write the prediction annotation JSON file. fname_app: Optional string to append to the file name. """ if not tf.io.gfile.exists(path): tf.io.gfile.makedirs(path) json_file_name = f"predictions{fname_app if fname_app else ''}.json" json_file_path = os.path.join(path, json_file_name) def _convert_to_serializable(obj): if isinstance(obj, np.ndarray): return obj.tolist() elif isinstance(obj, np.float32): return float(obj) else: raise TypeError(f'Unserializable object {obj} of type {type(obj)}') with tf.io.gfile.GFile(json_file_path, 'w') as f: f.write( json.dumps( self.predictions, default=_convert_to_serializable)) logging.info('Predicted annotations are stored in %s.', json_file_path) class LocaEvaluator(object): """Location-conditioned Caption wrapper.""" merge_gt_boxes_iou = 0.7 def __init__(self, dataset_name: str, step: Optional[int] = None, merge_gt_boxes: Optional[bool] = False, meteor_jar_path: Optional[str] = None, java_jre_path: Optional[str] = None): self.dataset_name = dataset_name self.merge_gt_boxes = merge_gt_boxes self.step = step self.predictions = [] self._num_examples_added = 0 self._num_captions_added = 0 self.pred_image_set = set() self.meteor_jar_path = meteor_jar_path self.java_jre_path = java_jre_path self.annotations = { 'images': [], 'annotations': [], 'type': 'captions', 'info': {}, 'licenses': [], 'categories': [{'id': 1, 'name': 'object'}], } @staticmethod def merge_gt_anno(gts, iou_thresh, is_gt=True): """VG ground truth are overlaping. We need to merge them before evaluating. Original code: github.com/jcjohnson/densecap/blob/maste*/densecap/box_utils.lua#L590 github.com/jcjohnson/densecap/blob/maste*/eval/eval_utils.lua#L105 Args: gts: gts of a single image. list of dicts, each with the following keys: 'bbox': list of 4 floats in order (l, t, w, h) 'caption': a string. ... iou_thresh: float is_gt: bool Returns: new_gts: list of dicts. Might have different length from the input. 'bbox': list of 4 floats in order (l, t, w, h) 'captions': list of strings. """ new_gts = [] if not gts: return new_gts gt_boxes = np.asarray([x['bbox'] for x in gts], dtype=np.float32) ious, _ = box_utils.box_iou(gt_boxes, gt_boxes, np_backbone=np) # N x N while True: can_merge = ious >= iou_thresh # Find the largest cluster and merge it. num_merges = can_merge.sum(axis=1) # N ind = np.argmax(num_merges) # int if num_merges[ind] == 0: break merge_inds = np.nonzero(can_merge[ind])[0] new_box = gt_boxes[merge_inds].mean(axis=0) all_captions = [gts[x]['caption'].replace('\n', '') for x in merge_inds] for merge_ind in merge_inds: if is_gt: new_gt = { 'bbox': new_box, 'captions': all_captions, 'id': gts[merge_ind]['id'], } else: new_gt = { 'bbox': new_box, 'caption': gts[merge_ind]['caption'], 'id': gts[merge_ind]['id'], } new_gts.append(new_gt) ious[merge_inds, :] = 0.0 ious[:, merge_inds] = 0.0 return new_gts def add_example(self, prediction: Any, target: Dict[str, np.ndarray]): """Add prediction of a single image to the evaluator. Args: prediction: Model prediction tuple of 4 arrays: boxes, scores, classes, captions. 'boxes' is in shape of `[num_objects, 4]` and 'pred_boxes', 'classes' are botoh in shape of `[num_objects, num_classes]`. 'captions' is a list of strings. Box coordinates are absolute values in the input image coordinates. We need to scale them back to the original image coordinates using information in target. target: Target dictionary with keys 'orig_size', 'size', and 'image/id'. """ captions = prediction['captions'] boxes = prediction['detection_boxes'] gt_captions = target['captions'] gt_boxes = target['boxes'] boxes = rescale_and_convert_boxes_to_xywh( boxes, target['size'], target['orig_size'] ) boxes = np.asarray(boxes).tolist() gt_boxes = rescale_and_convert_boxes_to_xywh( gt_boxes, target['size'], target['orig_size'] ) gt_boxes = np.asarray(gt_boxes).tolist() assert len(boxes) == len(captions) assert len(gt_boxes) == len(boxes) assert len(gt_captions) == len(captions) img_id = int(target['image/id']) if img_id in self.pred_image_set: logging.warn('Duplicate image %s not being added again', img_id) return self.pred_image_set.add(img_id) self.annotations['images'].append({'id': self._num_captions_added}) cur_preds = [] cur_annos = [] for caption, box, gt_caption, gt_box in zip( captions, boxes, gt_captions, gt_boxes ): if max(gt_box) <= 0: continue single_classification = { 'image_id': img_id, 'id': self._num_captions_added, 'category_id': 0, 'bbox': box, 'caption': caption, } single_annotation = { 'image_id': img_id, 'id': self._num_captions_added, 'category_id': 0, 'bbox': gt_box, 'caption': gt_caption, } # self.annotations['annotations'].append(single_annotation) # self.predictions.append(single_classification) cur_preds.append(single_classification) cur_annos.append(single_annotation) self._num_captions_added += 1 if self.merge_gt_boxes: cur_preds = self.merge_gt_anno( cur_preds, self.merge_gt_boxes_iou, is_gt=False ) cur_annos = self.merge_gt_anno(cur_annos, self.merge_gt_boxes_iou) self.predictions.extend(cur_preds) self.annotations['annotations'].extend(cur_annos) self._num_examples_added += 1 # pytype: disable=signature-mismatch def compute_metrics( self, save_dir: str, clear_annotations: Optional[bool] = True, skip_evaluate=False, ) -> Dict[str, Any]: # pytype: enable=signature-mismatch """Computes the metrics for all added predictions.""" if self.step: fname_app = f'{self.dataset_name}_{self.step}.json' else: fname_app = f'{self.dataset_name}.json' self.write_pred_annotations_to_file(save_dir, fname_app=fname_app) if skip_evaluate: return {} res = {} gts = {} for pred in self.predictions: if 'captions' in pred: res[pred['id']] = [{'caption': c} for c in pred['captions']] else: res[pred['id']] = [pred] for anno in self.annotations['annotations']: if 'captions' in anno: gts[anno['id']] = [{'caption': c} for c in anno['captions']] else: gts[anno['id']] = [anno] res = tokenize(res) gts = tokenize(gts) scorers = [ (Rouge(), 'ROUGE_L'), (Cider(), 'CIDEr'), (Bleu(), 'BLEU-4'), (Meteor(), 'Meteor'), ] results = {} for scorer, method in scorers: logging.info('computing %s score...', scorer.method()) score, _ = scorer.compute_score(gts, res) results[method] = score if clear_annotations: self.clear() return results def clear(self): self.predictions = [] self._num_examples_added = 0 self._num_captions_added = 0 self.pred_image_set = set() def __len__(self): return self._num_examples_added def write_pred_annotations_to_file(self, path: str, fname_app: Optional[str] = None): """Writes predictions to file in JSON format. Args: path: Path to write the prediction annotation JSON file. fname_app: Optional string to append to the file name. """ if not tf.io.gfile.exists(path): tf.io.gfile.makedirs(path) json_file_name = f"predictions{fname_app if fname_app else ''}.json" json_file_path = os.path.join(path, json_file_name) def _convert_to_serializable(obj): if isinstance(obj, np.ndarray): return obj.tolist() elif isinstance(obj, np.float32): return float(obj) else: raise TypeError(f'Unserializable object {obj} of type {type(obj)}') with tf.io.gfile.GFile(json_file_path, 'w') as f: f.write( json.dumps( self.predictions, default=_convert_to_serializable)) logging.info('Predicted annotations are stored in %s.', json_file_path)