|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Evaluation utils for PixelLLM.""" |
|
|
|
|
|
|
|
|
import json |
|
|
import os |
|
|
from typing import Any, Dict, Optional, List |
|
|
|
|
|
from absl import logging |
|
|
from coco_caption.coco import COCO as COCOCaption |
|
|
import cv2 |
|
|
|
|
|
try: |
|
|
from coco_caption.eval import COCOEvalCap |
|
|
from coco_caption.bleu import Bleu |
|
|
from coco_caption.cider import Cider |
|
|
from coco_caption.meteor import Meteor |
|
|
from coco_caption.rouge import Rouge |
|
|
from coco_caption.upp_tokenizer import tokenize |
|
|
except ImportError: |
|
|
COCOEvalCap = None |
|
|
Bleu = None |
|
|
Cider = None |
|
|
Meteor = None |
|
|
Rouge = None |
|
|
tokenize = None |
|
|
import numpy as np |
|
|
from pycocotools import mask as mask_api |
|
|
from scenic.model_lib.base_models import box_utils |
|
|
from scenic.projects.pixel_llm import densecap_evaluator |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
from pix2seq.metrics.coco_caption_eval import COCOEvalCap as SimpleCOCOEvalCap |
|
|
except ImportError: |
|
|
SimpleCOCOEvalCap = None |
|
|
|
|
|
import tensorflow as tf |
|
|
|
|
|
|
|
|
class PointEvaluator(object):
  """Evaluates predicted point traces with a normalized L1 distance.

  For every valid caption token, the L1 distance between the predicted
  point(s) and the closest ground-truth point is computed in image-size
  normalized coordinates and averaged over valid tokens.
  """

  def __init__(
      self, dataset_name: Optional[str] = '', step: Optional[int] = None
  ):
    # Arguments kept only for interface parity with the other evaluators.
    del dataset_name, step
    self.results = []  # per-example mean L1 errors
    self._num_examples_added = 0

  def add_example(self, prediction: Any, target: Dict[str, np.ndarray]):
    """Computes the per-example point L1 error and stores it.

    Args:
      prediction: dict with key 'point_coords', predicted point coordinates
        per token (assumed shape `[..., num_tokens, num_points, 2]`, or
        3-dim with a single point per token — TODO confirm with caller).
      target: dict with 'points' (ground-truth points per token),
        'text_tokens' (and optionally 'token_padding_mask'), and 'size'
        ((height, width) of the input image).
    """
    self._num_examples_added += 1

    gt_coords = target['points']

    valid_token_mask = target.get(
        'token_padding_mask', target['text_tokens'] > 0
    )
    # Only score tokens that actually have annotated points. Use an
    # out-of-place multiply: the in-place `*=` would silently mutate the
    # caller's 'token_padding_mask' array when it is present in `target`.
    valid_token_mask = valid_token_mask * (gt_coords.max(axis=(-2, -1)) > 0)

    pred_coords = prediction['point_coords']
    if pred_coords.ndim == 3:
      # Single predicted point per token; add a points axis to align with
      # the ground-truth layout.
      pred_coords = pred_coords.reshape(gt_coords.shape[:-2] + (1, 2))

    # Normalize coordinates to [0, 1] so the metric is resolution-invariant.
    height, width = target['size']
    gt_coords = gt_coords / np.array([width, height])
    pred_coords = pred_coords / np.array([width, height])

    # Pairwise L1 distance between every predicted and every gt point.
    dist = np.mean(
        np.abs(
            np.expand_dims(pred_coords, axis=3)
            - np.expand_dims(gt_coords, axis=2)
        ),
        axis=-1,
    )
    # Closest ground-truth point per prediction, averaged over predictions.
    dist = np.min(dist, axis=-1).mean(axis=-1)

    # Zero-out invalid tokens, then average over valid ones (epsilon guards
    # against a fully-masked example).
    dist = dist * valid_token_mask
    error = dist.sum() / (valid_token_mask.sum() + 1e-8)

    self.results.append(error)

  def __len__(self):
    return self._num_examples_added

  def clear(self):
    self.results = []
    self._num_examples_added = 0

  def compute_metrics(
      self,
      save_dir: str,
      clear_annotations: Optional[bool] = True,
      skip_evaluate=False,
  ):
    """Returns the mean point L1 error over all added examples."""
    del save_dir, skip_evaluate
    # Guard the empty case: np.mean of an empty array is NaN and emits a
    # RuntimeWarning; report 0.0 instead.
    result = np.array(self.results).mean() if self.results else 0.0
    if clear_annotations:
      self.clear()
    return {'point_l1': result}
|
|
|
|
|
|
|
|
class CaptionEvaluator(object):
  """Class that feeds model outputs to COCO caption evaluation api.

  Predictions are accumulated via `add_example` and scored in
  `compute_metrics`. Ground truth comes either from a COCO-format caption
  annotation file (`annotations_loc`) or, when no file is given, is built
  on the fly from `target['captions']`.
  """

  def __init__(
      self, annotations_loc, eval_meteor_spice=False, step: Optional[int] = None
  ):
    # annotations_loc: path to a COCO-format caption json; empty/None means
    #   ground truth is collected from targets in add_example.
    # eval_meteor_spice: use the full COCOEvalCap (incl. METEOR/SPICE)
    #   instead of the lighter SimpleCOCOEvalCap.
    # step: optional training step used to suffix output file names.
    self.annotations_loc = annotations_loc
    logging.info('Initializing evaluator.')
    if self.annotations_loc:
      logging.info('Loading annotations from %s.', self.annotations_loc)
      self.coco = COCOCaption(self.annotations_loc)
    # COCO-style ground-truth skeleton, populated in add_example when no
    # annotation file was provided.
    self.annotations = {
        'images': [],
        'annotations': [],
        'type': 'captions',
        'info': {},
        'licenses': [],
        'categories': [{'id': 1, 'name': 'object'}],
    }
    self.predictions = []
    self.pred_image_set = set()  # image ids with a prediction already added
    self.gt_image_set = set()  # image ids whose gt captions were recorded
    self._num_examples_added = 0
    self._num_captions_added = 0  # also used as running annotation id
    self.eval_meteor_spice = eval_meteor_spice
    self.step = step

  def add_example(self, prediction: Any, target: Dict[str, np.ndarray]):
    """Add a single example to the evaluator.

    Args:
      prediction: Model prediction tuple of 3 arrays: boxes, scores, classes.
        'boxes' is in shape of `[num_objects, 4]` and 'pred_boxes', 'classes'
        are botoh in shape of `[num_objects, num_classes]`. Box coordinates are
        absolute values in the input image coordinates. We need to scale them
        back to the original image coordinates using information in target.
      target: Target dictionary with keys and 'image/id'.
    """
    # In practice `prediction` is either a dict holding 'caption' or the
    # caption string itself.
    if isinstance(prediction, dict):
      pred_caption = prediction['caption']
    else:
      pred_caption = prediction
    self._num_examples_added += 1
    id_key = 'image_id'
    empty_gt = False
    if self.annotations_loc:
      # Ground truth comes from the annotation file; use the real image id.
      img_id = int(target['image/id'])
    else:
      # Building ground truth on the fly: use the running example count as
      # a synthetic image id.
      img_id = self._num_examples_added
      if img_id not in self.gt_image_set:
        self.annotations['images'].append({'id': img_id})
        for x in target['captions']:
          # Skip empty caption strings (padding).
          if x:
            self._num_captions_added += 1
            self.annotations['annotations'].append(
                {'id': self._num_captions_added, id_key: img_id, 'caption': x}
            )
        # True when every gt caption for this image is empty.
        empty_gt = sum(len(t) for t in target['captions']) == 0
        self.gt_image_set.add(img_id)
    single_prediction = {
        id_key: img_id,
        'caption': pred_caption,
    }
    if img_id not in self.pred_image_set:
      if empty_gt:
        # COCO eval would fail on an image with no reference captions.
        logging.warn('Image %s does not have any ground truth caption', img_id)
      else:
        self.predictions.append(single_prediction)
    else:
      logging.warn('Duplicate image %s not being added again', img_id)
    self.pred_image_set.add(img_id)

  def compute_metrics(
      self,
      save_dir: str,
      clear_annotations: Optional[bool] = True,
      skip_evaluate=False,
  ):
    """Computes the metrics for all added predictions.

    Args:
      save_dir: directory where prediction (and, if needed, ground-truth)
        json files are written.
      clear_annotations: reset accumulated predictions after evaluating.
      skip_evaluate: only write the prediction file; return no metrics.

    Returns:
      Dict of caption metrics (BLEU/CIDEr/..., as produced by the chosen
      COCO caption evaluator), or {} when skip_evaluate is set.
    """
    json_file_path = self.write_pred_annotations_to_file(save_dir)
    if skip_evaluate:
      return {}
    if not self.annotations_loc:
      # No annotation file: materialize the on-the-fly ground truth so the
      # COCO API can load it.
      gt_file_path = self.write_pred_annotations_to_file(
          save_dir, is_groundtruth=True
      )
      self.coco = COCOCaption(gt_file_path)
    coco_res = self.coco.loadRes(json_file_path)
    # METEOR/SPICE need a Java runtime, so they are opt-in.
    evaluator_class = (
        COCOEvalCap if (self.eval_meteor_spice) else SimpleCOCOEvalCap
    )
    coco_eval = evaluator_class(self.coco, coco_res)
    # Restrict evaluation to images that actually have predictions.
    coco_eval.params['image_id'] = coco_res.getImgIds()
    coco_eval.evaluate()
    results = coco_eval.eval
    if clear_annotations:
      self.clear()
    return results

  def clear(self):
    # NOTE(review): gt bookkeeping (`annotations`, `gt_image_set`) is not
    # reset here — confirm the evaluator is only used for a single round
    # when building ground truth on the fly.
    self.predictions = []
    self.pred_image_set = set()
    self._num_examples_added = 0
    self._num_captions_added = 0

  def __len__(self):
    return self._num_examples_added

  def write_pred_annotations_to_file(
      self, path: str, is_groundtruth: bool = False
  ):
    """Writes predictions to file in JSON format.

    Args:
      path: Path to write the prediction annotation JSON file.
      is_groundtruth: bool; if the file is ground truth or prediction.

    Returns:
      json_file_path: path to the saved json
    """
    if not tf.io.gfile.exists(path):
      tf.io.gfile.makedirs(path)
    fname_app = 'predictions' if not is_groundtruth else 'annotations'
    # NOTE(review): `if self.step:` treats step 0 like None — presumably
    # intended, but confirm if step-0 evals must be suffixed too.
    if self.step:
      json_file_name = f'caption_{fname_app}_{self.step}.json'
    else:
      json_file_name = f'caption_{fname_app}.json'
    json_file_path = os.path.join(path, json_file_name)
    logging.info('Saving predictions to %s.', json_file_path)

    def _convert_to_serializable(obj):
      # json.dumps fallback for numpy values.
      if isinstance(obj, np.ndarray):
        return obj.tolist()
      elif isinstance(obj, np.float32):
        return float(obj)
      else:
        raise TypeError(f'Unserializable object {obj} of type {type(obj)}')

    with tf.io.gfile.GFile(json_file_path, 'w') as f:
      f.write(
          json.dumps(
              self.predictions if not is_groundtruth else self.annotations,
              default=_convert_to_serializable,
          )
      )
    logging.info('Predicted annotations are stored in %s.', json_file_path)
    return json_file_path
|
|
|
|
|
|
|
|
def rescale_and_convert_boxes_to_xywh(boxes, input_size, orig_size):
  """Maps xyxy boxes from model-input coordinates to original-image xywh.

  Args:
    boxes: `[num_boxes, 4]` array of (x0, y0, x1, y1) boxes in model-input
      coordinates.
    input_size: (height, width) of the model input image.
    orig_size: (height, width) of the original image.

  Returns:
    `[num_boxes, 4]` array of (x, y, w, h) boxes in original-image
    coordinates, clipped to the image bounds. The input array is not
    modified.
  """
  orig_h, orig_w = orig_size
  in_h, in_w = np.asarray(input_size)
  ratio = np.array([orig_w, orig_h, orig_w, orig_h]) / np.array(
      [in_w, in_h, in_w, in_h])
  # Broadcasting the per-coordinate ratio produces a fresh array, so the
  # in-place clipping below never touches the caller's boxes.
  out = boxes * ratio[np.newaxis, :]
  out = np.maximum(out, 0)
  out[:, 0::2] = np.minimum(out[:, 0::2], orig_w)  # clip x coords to width
  out[:, 1::2] = np.minimum(out[:, 1::2], orig_h)  # clip y coords to height
  # xyxy -> xywh.
  out[:, 2] -= out[:, 0]
  out[:, 3] -= out[:, 1]
  return out
|
|
|
|
|
|
|
|
def rescale_and_encode_masks(
    masks, input_size, padded_size, orig_size, mask_threshold
):
  """Crops padding, resizes masks to the original image, and RLE-encodes them.

  Args:
    masks: iterable of 2D float mask arrays, predicted on the padded input
      (possibly at a lower resolution than `padded_size`).
    input_size: (height, width) of the unpadded model input.
    padded_size: (height, width) of the padded model input.
    orig_size: (height, width) of the original image.
    mask_threshold: scalar; resized mask values above it become foreground.

  Returns:
    List of COCO RLE encodings, one per input mask.
  """
  in_h, in_w = input_size
  pad_h, pad_w = padded_size
  orig_h, orig_w = orig_size
  encoded = []
  for mask in masks:
    mask_h, mask_w = mask.shape
    # The mask resolution may differ from the padded input; map the
    # unpadded region into mask pixel units before cropping.
    valid_h = int(in_h * (mask_h / pad_h))
    valid_w = int(in_w * (mask_w / pad_w))
    resized = cv2.resize(
        mask[:valid_h, :valid_w],
        (orig_w, orig_h),  # cv2.resize takes (width, height)
        interpolation=cv2.INTER_LINEAR,
    )
    binary = resized > mask_threshold
    # pycocotools expects Fortran-ordered arrays for RLE encoding.
    encoded.append(mask_api.encode(np.asfortranarray(binary)))
  return encoded
|
|
|
|
|
|
|
|
def polygons_to_bitmask(
    polygons: List[np.ndarray], height: int, width: int
) -> np.ndarray:
  """Converts polygons to bitmask.

  Reference:
  https://github.com/facebookresearch/detectron2/blob/main/detectron2/structures/masks.py#L22

  Args:
    polygons: list of arrays, each of shape (Nx2,) holding flattened (x, y)
      polygon vertices.
    height: output mask height.
    width: output mask width.

  Returns:
    ndarray: a bool mask of shape (height, width)
  """
  if not polygons:
    # COCOAPI does not support empty polygons; return an all-background
    # mask. Allocate as bool directly instead of allocating float64 and
    # casting.
    return np.zeros((height, width), dtype=bool)
  rles = mask_api.frPyObjects(polygons, height, width)
  rle = mask_api.merge(rles)
  return mask_api.decode(rle).astype(bool)
|
|
|
|
|
|
|
|
def decode_to_mask(segm, image_size):
  """Decodes a segmentation (polygons, RLE dict, or array) into a 2D mask.

  Args:
    segm: one of a list of flattened polygons, a COCO-style RLE dict, or a
      2D binary ndarray of shape HxW.
    image_size: (height, width); used only for the polygon case.

  Returns:
    A 2D mask array.

  Raises:
    ValueError: if `segm` is not one of the supported types.
  """
  if isinstance(segm, list):
    # Polygon format: list of flattened (x, y) vertex arrays.
    return polygons_to_bitmask(segm, *image_size)
  if isinstance(segm, dict):
    # COCO run-length encoding.
    return mask_api.decode(segm)
  if isinstance(segm, np.ndarray):
    assert (
        segm.ndim == 2
    ), 'Expect segmentation of 2 dimensions, got {}.'.format(segm.ndim)
    # Already a decoded binary mask; pass it through unchanged.
    return segm
  raise ValueError(
      "Cannot convert segmentation of type '{}' to BitMasks!"
      'Supported types are: polygons as list[list[float] or ndarray],'
      ' COCO-style RLE as a dict, or a binary segmentation mask '
      ' in a 2D numpy array of shape HxW.'.format(type(segm))
  )
|
|
|
|
|
|
|
|
def mask_to_box(mask):
  """Returns the tight xyxy bounding box of a binary mask.

  Args:
    mask: 2D binary array.

  Returns:
    float32 array [x0, y0, x1, y1] (exclusive right/bottom edge); all zeros
    when the mask is empty.
  """
  occupied_cols = np.where(np.any(mask, axis=0))[0]
  occupied_rows = np.where(np.any(mask, axis=1))[0]
  if occupied_cols.size == 0 or occupied_rows.size == 0:
    # Empty mask: no foreground pixel anywhere.
    return np.zeros((4,), dtype=np.float32)
  return np.array(
      [
          occupied_cols[0],
          occupied_rows[0],
          occupied_cols[-1] + 1,
          occupied_rows[-1] + 1,
      ],
      dtype=np.float32,
  )
|
|
|
|
|
|
|
|
class RefCocoEvaluator(object):
  """Class that evaluates the RefCOCO.

  Reference: https://github.com/ashkamath/mdetr/blob/main/datasets/refexp.py

  Accumulates per-referring-expression box (and optionally mask)
  predictions and computes Precision@k plus segmentation IoU metrics.
  """

  def __init__(
      self,
      dataset_name: str,
      annotations_loc: str,
      k=(1,),
      iou_threshold=0.5,
      step: Optional[int] = None,
  ):
    # dataset_name: used only to name the output json files.
    # annotations_loc: path to a refer-format json; empty means ground
    #   truth is built on the fly from targets.
    # k: tuple of cutoffs for Precision@k over the top-k predicted boxes.
    # iou_threshold: IoU above which a prediction counts as a true positive.
    # step: optional training step used to suffix output file names.
    self.dataset_name = dataset_name
    self.annotations_loc = annotations_loc
    if self.annotations_loc:
      logging.info('Loading refer annotations from %s.', self.annotations_loc)
      self.annotations = json.load(tf.io.gfile.GFile(self.annotations_loc))
    else:
      # Skeleton for ground truth collected in add_example.
      self.annotations = {
          'images': [],
          'annotations': [],
          'type': 'refer',
          'info': {},
          'licenses': [],
          'categories': [{'id': 1, 'name': 'object'}],
      }
    self.predictions = []
    self.pred_image_set = set()  # image ids already processed
    self.gt_image_set = set()  # image ids whose gt was recorded

    self.k = k
    self.iou_threshold = iou_threshold
    # Threshold used when binarizing predicted masks before RLE encoding.
    self.mask_threshold = 0.

    self._num_examples_added = 0
    self.step = step

  def add_example(self, prediction: Any, target: Dict[str, np.ndarray]):
    """Compute Precision.

    Adds per-referring-expression predictions (box, optional mask) for one
    image; also records ground truth when no annotation file was given.
    """
    boxes = prediction['detection_boxes']
    masks = prediction.get('detection_masks', None)
    # Map boxes back to original-image coordinates, COCO xywh format.
    boxes = rescale_and_convert_boxes_to_xywh(
        boxes, target['size'], target['orig_size']
    )
    boxes = np.asarray(boxes).tolist()
    if masks is not None:
      # Crop padding, resize to the original image, and RLE-encode.
      masks = rescale_and_encode_masks(
          masks,
          target['size'],
          target['padded_size'],
          target['orig_size'],
          self.mask_threshold,
      )
    img_id = int(target['image/id'])

    if img_id in self.pred_image_set:
      logging.warn('Duplicate image %s not being added again', img_id)
      return
    self.pred_image_set.add(img_id)

    for i in range(len(boxes)):
      refexp_id = int(target['refexp_ids'][i])
      pred_box = boxes[i]
      caption = target['captions'][i]
      # refexp_id <= 0 marks padding entries.
      if not refexp_id > 0:
        continue

      self._num_examples_added += 1

      single_pred = {
          'id': refexp_id,
          'image_id': img_id,
          'bbox': pred_box,
          'refexp': caption,
      }
      if masks is not None:
        single_pred['segmentation'] = masks[i]
      self.predictions.append(single_pred)

    # Record ground truth only when it is not loaded from a file.
    if not self.annotations_loc and img_id not in self.gt_image_set:
      self.annotations['images'].append({'id': img_id})
      gt_boxes = target['boxes']
      gt_boxes = rescale_and_convert_boxes_to_xywh(
          gt_boxes, target['size'], target['orig_size']
      )
      gt_boxes = np.asarray(gt_boxes).tolist()
      for i in range(len(gt_boxes)):
        gt_box = gt_boxes[i]
        refexp_id = int(target['refexp_ids'][i])
        if not refexp_id > 0:
          continue

        caption = target['captions'][i]
        self.annotations['annotations'].append({
            'id': refexp_id,
            'image_id': img_id,
            'bbox': gt_box,
            'refexp': caption,
        })
      self.gt_image_set.add(img_id)

  def __len__(self):
    return self._num_examples_added

  def clear(self):
    self.predictions = []
    self._num_examples_added = 0
    self.pred_image_set = set()
    self.gt_image_set = set()

  def write_pred_annotations_to_file(
      self, path: str, is_groundtruth: bool = False
  ):
    """Writes predictions to file in JSON format.

    Args:
      path: Path to write the prediction annotation JSON file.
      is_groundtruth: bool; if the file is ground truth or prediction.

    Returns:
      json_file_path: path to the saved json
    """
    if not tf.io.gfile.exists(path):
      tf.io.gfile.makedirs(path)
    fname_app = 'predictions' if not is_groundtruth else 'annotations'
    if self.step:
      json_file_name = f'{self.dataset_name}_{fname_app}_{self.step}.json'
    else:
      json_file_name = f'{self.dataset_name}_{fname_app}.json'
    json_file_path = os.path.join(path, json_file_name)
    logging.info('Saving predictions to %s.', json_file_path)

    def _convert_to_serializable(obj):
      # json.dumps fallback for numpy values.
      if isinstance(obj, np.ndarray):
        return obj.tolist()
      elif isinstance(obj, np.float32):
        return float(obj)
      else:
        raise TypeError(f'Unserializable object {obj} of type {type(obj)}')

    with tf.io.gfile.GFile(json_file_path, 'w') as f:
      f.write(
          json.dumps(
              self.predictions if not is_groundtruth else self.annotations,
              default=_convert_to_serializable,
          )
      )
    logging.info('Predicted annotations are stored in %s.', json_file_path)
    return json_file_path

  def compute_metrics(
      self,
      save_dir: str,
      clear_annotations: Optional[bool] = True,
      skip_evaluate=False,
  ) -> Dict[str, Any]:
    """Computes the metrics for all added predictions.

    Returns box Precision@k for every k in `self.k`, and — when both
    predictions and ground truth carry masks — seg_box_Precision@k,
    seg_cIoU, seg_gIoU and seg_AP.
    """
    self.write_pred_annotations_to_file(save_dir)
    if not self.annotations_loc:
      self.write_pred_annotations_to_file(save_dir, is_groundtruth=True)
    if skip_evaluate:
      return {}

    # Index predictions / annotations / images by id for O(1) lookup.
    # NOTE(review): indexing [0] below assumes at least one prediction and
    # one gt annotation exist — confirm callers never evaluate empty.
    pred_map = {d['id']: idx for idx, d in enumerate(self.predictions)}

    if 'refexp_id' in self.annotations['annotations'][0]:
      # File format where one annotation lists several referring-expression
      # ids; fan them out so each refexp id maps to its annotation.
      gt_anno_map = {}
      for idx, d in enumerate(self.annotations['annotations']):
        refexp_ids = d['refexp_id']
        for refexp_id in refexp_ids:
          gt_anno_map[refexp_id] = idx
    else:
      gt_anno_map = {
          d['id']: idx for idx, d in enumerate(self.annotations['annotations'])
      }
    gt_image_map = {
        d['id']: idx for idx, d in enumerate(self.annotations['images'])
    }
    eval_seg = (
        'segmentation' in self.predictions[0]
        and 'segmentation' in self.annotations['annotations'][0]
    )
    box_tp_list = []

    seg_inter_list = []
    seg_union_list = []
    seg_box_tp_list = []

    for refexp_id in pred_map:
      pred = self.predictions[pred_map[refexp_id]]
      gt_anno = self.annotations['annotations'][gt_anno_map[refexp_id]]

      # Stored boxes are xywh; convert to xyxy for IoU.
      pred_box = np.array(pred['bbox']).reshape(-1, 4)
      gt_box = np.array(gt_anno['bbox']).reshape(-1, 4)
      pred_box[:, 2:4] += pred_box[:, :2]
      gt_box[:, 2:4] += gt_box[:, :2]

      box_iou, _ = box_utils.box_iou(pred_box, gt_box, np_backbone=np)
      # Precision@k: a hit if any of the top-k boxes passes the threshold.
      for k in self.k:
        box_tp_list.append(max(box_iou[:k]) > self.iou_threshold)
      if eval_seg:
        gt_image = self.annotations['images'][gt_image_map[gt_anno['image_id']]]
        image_size = (gt_image['height'], gt_image['width'])
        pred_mask = decode_to_mask(pred['segmentation'], image_size)
        gt_mask = decode_to_mask(gt_anno['segmentation'], image_size)
        cur_inter = (pred_mask & gt_mask).sum()
        cur_union = (pred_mask | gt_mask).sum()
        seg_inter_list.append(cur_inter)
        seg_union_list.append(cur_union)

        # Also score the box induced by the predicted mask.
        pred_seg_box = mask_to_box(pred_mask).reshape(-1, 4)

        seg_box_iou, _ = box_utils.box_iou(pred_seg_box, gt_box, np_backbone=np)
        for k in self.k:
          seg_box_tp_list.append(max(seg_box_iou[:k]) > self.iou_threshold)

    # The flat tp list interleaves the k values per example; reshape to
    # (num_examples, num_k) and average per k.
    box_tp = (
        np.array(box_tp_list).reshape(len(pred_map), len(self.k)).mean(axis=0)
    )
    metrics = {
        f'box_Precision@{k}': result for k, result in zip(self.k, box_tp)
    }

    if eval_seg:
      seg_box_tp = (
          np.array(seg_box_tp_list)
          .reshape(len(pred_map), len(self.k))
          .mean(axis=0)
      )
      metrics.update(
          {
              f'seg_box_Precision@{k}': result
              for k, result in zip(self.k, seg_box_tp)
          }
      )

      seg_inter_list = np.array(seg_inter_list)
      seg_union_list = np.array(seg_union_list)

      # cIoU: cumulative (sum of intersections over sum of unions);
      # gIoU: mean of per-example IoUs; AP: fraction above the threshold.
      metrics['seg_cIoU'] = seg_inter_list.mean() / (
          seg_union_list.mean() + 1e-5
      )
      metrics['seg_gIoU'] = (seg_inter_list / (seg_union_list + 1e-5)).mean()
      metrics['seg_AP'] = (
          (seg_inter_list / (seg_union_list + 1e-5)) > self.iou_threshold
      ).mean()

    if clear_annotations:
      self.clear()
    return metrics
|
|
|
|
|
|
|
|
class DensecapEvaluator(object):
  """DensecapEvaluator wrapper.

  Thin accumulation wrapper around `densecap_evaluator.DensecapEval`:
  per-object (box, score, caption) predictions are collected per image and
  handed to the underlying evaluator in `compute_metrics`.
  """

  def __init__(self, dataset_name: str, annotations_loc, eval_meteor=True,
               ignore_empty_string=True,
               step: Optional[int] = None):
    # dataset_name / step: used only to name the output json file.
    # annotations_loc, eval_meteor, ignore_empty_string: forwarded to the
    # underlying DensecapEval.
    self.dataset_name = dataset_name
    self.step = step
    self.evaluator = densecap_evaluator.DensecapEval(
        annotations_loc, eval_meteor=eval_meteor,
        ignore_empty_string=ignore_empty_string)
    self.predictions = []
    self._num_examples_added = 0
    self.pred_image_set = set()  # image ids already added, to drop duplicates

  def add_example(self, prediction: Any, target: Dict[str, np.ndarray]):
    """Add prediction of a single image to the evaluator.

    Args:
      prediction: Model prediction tuple of 4 arrays: boxes, scores, classes,
        captions. 'boxes' is in shape of `[num_objects, 4]` and 'pred_boxes',
        'classes' are botoh in shape of `[num_objects, num_classes]`. 'captions'
        is a list of strings. Box coordinates are absolute values in the input
        image coordinates. We need to scale them back to the original image
        coordinates using information in target.
      target: Target dictionary with keys 'orig_size', 'size', and 'image/id'.
    """
    boxes = prediction['detection_boxes']
    scores = prediction['detection_scores']
    captions = prediction['captions']

    # Map boxes back to original-image coordinates, COCO xywh format.
    boxes = rescale_and_convert_boxes_to_xywh(
        boxes, target['size'], target['orig_size']
    )
    boxes = np.asarray(boxes).tolist()
    img_id = int(target['image/id'])

    if img_id in self.pred_image_set:
      logging.warn('Duplicate image %s not being added again', img_id)
      return
    self.pred_image_set.add(img_id)

    for bbox, score, caption in zip(
        boxes, scores, captions):
      single_classification = {
          'image_id': img_id,
          'category_id': 0,  # dense captioning is class-agnostic
          'bbox': bbox,
          'score': score,
          'caption': caption,
      }
      self.predictions.append(single_classification)
    self._num_examples_added += 1

  def compute_metrics(
      self,
      save_dir: str,
      clear_annotations: Optional[bool] = True,
      skip_evaluate=False,
  ) -> Dict[str, Any]:

    """Computes the metrics for all added predictions."""
    if self.step:
      fname_app = f'{self.dataset_name}_{self.step}.json'
    else:
      fname_app = f'{self.dataset_name}.json'
    # NOTE(review): write_pred_annotations_to_file builds the name as
    # f'predictions{fname_app}.json', so the file ends up with no separator
    # and '.json' twice (e.g. 'predictionsvg.json.json') — confirm whether
    # this is intentional before renaming, as downstream tooling may match
    # the existing pattern.
    self.write_pred_annotations_to_file(save_dir, fname_app=fname_app)
    if skip_evaluate:
      return {}
    results = self.evaluator.compute_metrics(self.predictions)
    if clear_annotations:
      self.clear()
    return results

  def clear(self):
    self.predictions = []
    self._num_examples_added = 0
    self.pred_image_set = set()

  def __len__(self):
    return self._num_examples_added

  def write_pred_annotations_to_file(self,
                                     path: str,
                                     fname_app: Optional[str] = None):
    """Writes predictions to file in JSON format.

    Args:
      path: Path to write the prediction annotation JSON file.
      fname_app: Optional string to append to the file name.
    """
    if not tf.io.gfile.exists(path):
      tf.io.gfile.makedirs(path)
    json_file_name = f"predictions{fname_app if fname_app else ''}.json"
    json_file_path = os.path.join(path, json_file_name)

    def _convert_to_serializable(obj):
      # json.dumps fallback for numpy values.
      if isinstance(obj, np.ndarray):
        return obj.tolist()
      elif isinstance(obj, np.float32):
        return float(obj)
      else:
        raise TypeError(f'Unserializable object {obj} of type {type(obj)}')

    with tf.io.gfile.GFile(json_file_path, 'w') as f:
      f.write(
          json.dumps(
              self.predictions,
              default=_convert_to_serializable))
    logging.info('Predicted annotations are stored in %s.', json_file_path)
|
|
|
|
|
|
|
|
class LocaEvaluator(object):
  """Location-conditioned Caption wrapper.

  Pairs each predicted caption with the ground-truth caption of the same
  (location-conditioned) box and scores the pairs with standard caption
  metrics (ROUGE-L, CIDEr, BLEU-4, METEOR).
  """

  # IoU above which overlapping ground-truth boxes are merged into one
  # group (Visual Genome regions overlap heavily).
  merge_gt_boxes_iou = 0.7

  def __init__(self, dataset_name: str,
               step: Optional[int] = None,
               merge_gt_boxes: Optional[bool] = False,
               meteor_jar_path: Optional[str] = None,
               java_jre_path: Optional[str] = None):
    # dataset_name / step: used only to name the output json file.
    # merge_gt_boxes: if True, group overlapping gt boxes before scoring.
    # meteor_jar_path / java_jre_path: stored but not referenced elsewhere
    # in this class — NOTE(review): presumably consumed by an external
    # METEOR setup; confirm they are still needed.
    self.dataset_name = dataset_name
    self.merge_gt_boxes = merge_gt_boxes
    self.step = step
    self.predictions = []
    self._num_examples_added = 0
    self._num_captions_added = 0  # running unique id for pred/gt pairs
    self.pred_image_set = set()  # image ids already added
    self.meteor_jar_path = meteor_jar_path
    self.java_jre_path = java_jre_path
    # COCO-style ground-truth container built on the fly.
    self.annotations = {
        'images': [],
        'annotations': [],
        'type': 'captions',
        'info': {},
        'licenses': [],
        'categories': [{'id': 1, 'name': 'object'}],
    }

  @staticmethod
  def merge_gt_anno(gts, iou_thresh, is_gt=True):
    """VG ground truth are overlaping. We need to merge them before evaluating.

    Original code:
    github.com/jcjohnson/densecap/blob/maste*/densecap/box_utils.lua#L590
    github.com/jcjohnson/densecap/blob/maste*/eval/eval_utils.lua#L105

    Args:
      gts: gts of a single image. list of dicts, each with the following keys:
        'bbox': list of 4 floats in order (l, t, w, h)
        'caption': a string.
        ...
      iou_thresh: float
      is_gt: bool
    Returns:
      new_gts: list of dicts. Might have different length from the input.
        'bbox': list of 4 floats in order (l, t, w, h)
        'captions': list of strings.
    """
    new_gts = []
    if not gts:
      return new_gts
    gt_boxes = np.asarray([x['bbox'] for x in gts], dtype=np.float32)
    # NOTE(review): boxes here are xywh but box_iou presumably expects
    # xyxy (compute_metrics converts before calling it) — confirm this is
    # the intended behavior for the merge grouping.
    ious, _ = box_utils.box_iou(gt_boxes, gt_boxes, np_backbone=np)

    # Greedily pick the box overlapping the most others, emit one entry per
    # member of its group, then remove the group from consideration.
    while True:
      can_merge = ious >= iou_thresh

      num_merges = can_merge.sum(axis=1)
      ind = np.argmax(num_merges)
      if num_merges[ind] == 0:
        # Every box has been consumed (its iou rows/cols were zeroed).
        break
      merge_inds = np.nonzero(can_merge[ind])[0]
      # The merged group shares the mean box of its members.
      new_box = gt_boxes[merge_inds].mean(axis=0)
      all_captions = [gts[x]['caption'].replace('\n', '') for x in merge_inds]
      for merge_ind in merge_inds:
        if is_gt:
          # Ground truth: every member carries all captions of the group,
          # so each prediction is scored against every reference.
          new_gt = {
              'bbox': new_box,
              'captions': all_captions,
              'id': gts[merge_ind]['id'],
          }
        else:
          # Predictions keep their own single caption.
          new_gt = {
              'bbox': new_box,
              'caption': gts[merge_ind]['caption'],
              'id': gts[merge_ind]['id'],
          }
        new_gts.append(new_gt)
      ious[merge_inds, :] = 0.0
      ious[:, merge_inds] = 0.0
    return new_gts

  def add_example(self, prediction: Any, target: Dict[str, np.ndarray]):
    """Add prediction of a single image to the evaluator.

    Args:
      prediction: Model prediction tuple of 4 arrays: boxes, scores, classes,
        captions. 'boxes' is in shape of `[num_objects, 4]` and 'pred_boxes',
        'classes' are botoh in shape of `[num_objects, num_classes]`. 'captions'
        is a list of strings. Box coordinates are absolute values in the input
        image coordinates. We need to scale them back to the original image
        coordinates using information in target.
      target: Target dictionary with keys 'orig_size', 'size', and 'image/id'.
    """
    captions = prediction['captions']
    boxes = prediction['detection_boxes']
    gt_captions = target['captions']
    gt_boxes = target['boxes']

    # Map both predicted and gt boxes back to original-image xywh.
    boxes = rescale_and_convert_boxes_to_xywh(
        boxes, target['size'], target['orig_size']
    )
    boxes = np.asarray(boxes).tolist()
    gt_boxes = rescale_and_convert_boxes_to_xywh(
        gt_boxes, target['size'], target['orig_size']
    )
    gt_boxes = np.asarray(gt_boxes).tolist()
    # Predictions and ground truth are aligned index-by-index.
    assert len(boxes) == len(captions)
    assert len(gt_boxes) == len(boxes)
    assert len(gt_captions) == len(captions)

    img_id = int(target['image/id'])

    if img_id in self.pred_image_set:
      logging.warn('Duplicate image %s not being added again', img_id)
      return
    self.pred_image_set.add(img_id)
    self.annotations['images'].append({'id': self._num_captions_added})

    cur_preds = []
    cur_annos = []
    for caption, box, gt_caption, gt_box in zip(
        captions, boxes, gt_captions, gt_boxes
    ):
      # gt boxes that are all non-positive are padding; skip them.
      if max(gt_box) <= 0:
        continue
      single_classification = {
          'image_id': img_id,
          'id': self._num_captions_added,
          'category_id': 0,
          'bbox': box,
          'caption': caption,
      }
      single_annotation = {
          'image_id': img_id,
          'id': self._num_captions_added,
          'category_id': 0,
          'bbox': gt_box,
          'caption': gt_caption,
      }

      # Prediction and annotation share the same id so they are paired at
      # scoring time.
      cur_preds.append(single_classification)
      cur_annos.append(single_annotation)
      self._num_captions_added += 1
    if self.merge_gt_boxes:
      # Group overlapping boxes; gt entries then carry all captions of
      # their group as references.
      cur_preds = self.merge_gt_anno(
          cur_preds, self.merge_gt_boxes_iou, is_gt=False
      )
      cur_annos = self.merge_gt_anno(cur_annos, self.merge_gt_boxes_iou)
    self.predictions.extend(cur_preds)
    self.annotations['annotations'].extend(cur_annos)
    self._num_examples_added += 1

  def compute_metrics(
      self,
      save_dir: str,
      clear_annotations: Optional[bool] = True,
      skip_evaluate=False,
  ) -> Dict[str, Any]:

    """Computes the metrics for all added predictions."""
    if self.step:
      fname_app = f'{self.dataset_name}_{self.step}.json'
    else:
      fname_app = f'{self.dataset_name}.json'
    # NOTE(review): as in DensecapEvaluator, the resulting file name is
    # f'predictions{fname_app}.json', i.e. it contains '.json' twice —
    # confirm before changing, downstream tooling may match this pattern.
    self.write_pred_annotations_to_file(save_dir, fname_app=fname_app)
    if skip_evaluate:
      return {}
    # Build {pair_id: [caption dicts]} maps expected by the COCO scorers.
    res = {}
    gts = {}
    for pred in self.predictions:
      if 'captions' in pred:
        # Entry produced by merge_gt_anno with multiple captions.
        res[pred['id']] = [{'caption': c} for c in pred['captions']]
      else:
        res[pred['id']] = [pred]
    for anno in self.annotations['annotations']:
      if 'captions' in anno:
        gts[anno['id']] = [{'caption': c} for c in anno['captions']]
      else:
        gts[anno['id']] = [anno]

    # PTB-style tokenization, as required by the caption scorers.
    # NOTE(review): `tokenize` and the scorers below are None when the
    # coco_caption import failed — this method then raises; confirm that
    # is acceptable.
    res = tokenize(res)
    gts = tokenize(gts)

    scorers = [
        (Rouge(), 'ROUGE_L'),
        (Cider(), 'CIDEr'),
        (Bleu(), 'BLEU-4'),
        (Meteor(), 'Meteor'),
    ]
    results = {}
    for scorer, method in scorers:
      logging.info('computing %s score...', scorer.method())
      score, _ = scorer.compute_score(gts, res)
      results[method] = score
    if clear_annotations:
      self.clear()
    return results

  def clear(self):
    # NOTE(review): `annotations` is not reset here even though it is
    # filled by add_example — confirm single-round usage.
    self.predictions = []
    self._num_examples_added = 0
    self._num_captions_added = 0
    self.pred_image_set = set()

  def __len__(self):
    return self._num_examples_added

  def write_pred_annotations_to_file(self,
                                     path: str,
                                     fname_app: Optional[str] = None):
    """Writes predictions to file in JSON format.

    Args:
      path: Path to write the prediction annotation JSON file.
      fname_app: Optional string to append to the file name.
    """
    if not tf.io.gfile.exists(path):
      tf.io.gfile.makedirs(path)
    json_file_name = f"predictions{fname_app if fname_app else ''}.json"
    json_file_path = os.path.join(path, json_file_name)

    def _convert_to_serializable(obj):
      # json.dumps fallback for numpy values.
      if isinstance(obj, np.ndarray):
        return obj.tolist()
      elif isinstance(obj, np.float32):
        return float(obj)
      else:
        raise TypeError(f'Unserializable object {obj} of type {type(obj)}')

    with tf.io.gfile.GFile(json_file_path, 'w') as f:
      f.write(
          json.dumps(
              self.predictions,
              default=_convert_to_serializable))
    logging.info('Predicted annotations are stored in %s.', json_file_path)
|
|
|