import itertools
import logging
import os.path as osp
import tempfile
import warnings
from collections import OrderedDict, defaultdict
from typing import Dict, List, Optional, Sequence, Union

import numpy as np
import torch
from mmengine.dist import (all_gather_object, broadcast_object_list,
                           is_main_process)
from mmengine.evaluator import BaseMetric
from mmengine.evaluator.metric import _to_cpu
from mmengine.fileio import get_local_path
from mmengine.logging import MMLogger, print_log
from terminaltables import AsciiTable

from mmdet.registry import METRICS
from mmdet.structures.mask import encode_mask_results
from ..functional import eval_recalls
from .coco_metric import CocoMetric

try:
    import lvis
    if getattr(lvis, '__version__', '0') >= '10.5.3':
        warnings.warn(
            'mmlvis is deprecated, please install official lvis-api by '
            '"pip install git+https://github.com/lvis-dataset/lvis-api.git"',
            UserWarning)
    from lvis import LVIS, LVISEval, LVISResults
except ImportError:
    lvis = None
    LVISEval = None
    LVISResults = None


@METRICS.register_module()
class LVISMetric(CocoMetric):
    """LVIS evaluation metric.

    Args:
        ann_file (str, optional): Path to the LVIS format annotation file.
            If not specified, ground truth annotations from the dataset will
            be converted to coco format. Defaults to None.
        metric (str | List[str]): Metrics to be evaluated. Valid metrics
            include 'bbox', 'segm', 'proposal', and 'proposal_fast'.
            Defaults to 'bbox'.
        classwise (bool): Whether to evaluate the metric class-wise.
            Defaults to False.
        proposal_nums (Sequence[int]): Numbers of proposals to be evaluated.
            Defaults to (100, 300, 1000).
        iou_thrs (float | List[float], optional): IoU threshold to compute AP
            and AR. If not specified, IoUs from 0.5 to 0.95 will be used.
            Defaults to None.
        metric_items (List[str], optional): Metric result names to be
            recorded in the evaluation result. Defaults to None.
        format_only (bool): Format the output results without performing
            evaluation. It is useful when you want to format the result
            to a specific format and submit it to the test server.
            Defaults to False.
        outfile_prefix (str, optional): The prefix of json files. It includes
            the file path and the prefix of filename, e.g., "a/b/prefix".
            If not specified, a temp file will be created. Defaults to None.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added to the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.
        file_client_args (dict, optional): Arguments to instantiate the
            corresponding backend in mmdet <= 3.0.0rc6. Deprecated in favor
            of ``backend_args``. Defaults to None.
        backend_args (dict, optional): Arguments to instantiate the
            corresponding backend. Defaults to None.
    """
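    # Illustrative usage only (not part of the class): a minimal evaluator
    # config sketch, assuming an LVIS v1 annotation file at a hypothetical
    # path; adjust the path and metrics to your dataset layout.
    #
    #   val_evaluator = dict(
    #       type='LVISMetric',
    #       ann_file='data/lvis_v1/annotations/lvis_v1_val.json',
    #       metric=['bbox', 'segm'],
    #       classwise=True)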
    default_prefix: Optional[str] = 'lvis'

    def __init__(self,
                 ann_file: Optional[str] = None,
                 metric: Union[str, List[str]] = 'bbox',
                 classwise: bool = False,
                 proposal_nums: Sequence[int] = (100, 300, 1000),
                 iou_thrs: Optional[Union[float, Sequence[float]]] = None,
                 metric_items: Optional[Sequence[str]] = None,
                 format_only: bool = False,
                 outfile_prefix: Optional[str] = None,
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None,
                 file_client_args: Optional[dict] = None,
                 backend_args: Optional[dict] = None) -> None:
        if lvis is None:
            raise RuntimeError(
                'Package lvis is not installed. Please run "pip install '
                'git+https://github.com/lvis-dataset/lvis-api.git".')
        super().__init__(collect_device=collect_device, prefix=prefix)
        # evaluation metrics to compute
        self.metrics = metric if isinstance(metric, list) else [metric]
        allowed_metrics = ['bbox', 'segm', 'proposal', 'proposal_fast']
        for metric in self.metrics:
            if metric not in allowed_metrics:
                raise KeyError(
                    "metric should be one of 'bbox', 'segm', 'proposal', "
                    f"'proposal_fast', but got {metric}.")

        # whether to evaluate results per category, default False
        self.classwise = classwise

        # proposal_nums used to compute recall or precision
        self.proposal_nums = list(proposal_nums)

        # iou_thrs used to compute recall or precision,
        # default [0.5, 0.55, ..., 0.95] (10 thresholds)
        if iou_thrs is None:
            iou_thrs = np.linspace(
                .5, 0.95, int(np.round((0.95 - .5) / .05)) + 1, endpoint=True)
        self.iou_thrs = iou_thrs
        self.metric_items = metric_items
        self.format_only = format_only
        if self.format_only:
            assert outfile_prefix is not None, (
                'outfile_prefix must not be None when format_only is True, '
                'otherwise the result files will be saved to a temp '
                'directory which will be cleaned up at the end.')

        self.outfile_prefix = outfile_prefix
        self.backend_args = backend_args
        if file_client_args is not None:
            raise RuntimeError(
                'The `file_client_args` is deprecated, '
                'please use `backend_args` instead, please refer to '
                'https://github.com/open-mmlab/mmdetection/blob/main/configs/_base_/datasets/coco_detection.py'  # noqa: E501
            )

        # if an annotation file is given, build the LVIS api from it now;
        # otherwise it is lazily built from the dataset annotations in
        # `compute_metrics`
        if ann_file is not None:
            with get_local_path(
                    ann_file, backend_args=self.backend_args) as local_path:
                self._lvis_api = LVIS(local_path)
        else:
            self._lvis_api = None

        # handle dataset lazy init
        self.cat_ids = None
        self.img_ids = None

    def fast_eval_recall(self,
                         results: List[dict],
                         proposal_nums: Sequence[int],
                         iou_thrs: Sequence[float],
                         logger: Optional[MMLogger] = None) -> np.ndarray:
        """Evaluate proposal recall with LVIS's fast_eval_recall.

        Args:
            results (List[dict]): Results of the dataset.
            proposal_nums (Sequence[int]): Proposal numbers used for
                evaluation.
            iou_thrs (Sequence[float]): IoU thresholds used for evaluation.
            logger (MMLogger, optional): Logger used for logging the recall
                summary.

        Returns:
            np.ndarray: Averaged recall results.
        """
        gt_bboxes = []
        pred_bboxes = [result['bboxes'] for result in results]
        for i in range(len(self.img_ids)):
            ann_ids = self._lvis_api.get_ann_ids(img_ids=[self.img_ids[i]])
            ann_info = self._lvis_api.load_anns(ann_ids)
            if len(ann_info) == 0:
                gt_bboxes.append(np.zeros((0, 4)))
                continue
            bboxes = []
            for ann in ann_info:
                # convert LVIS-style [x, y, w, h] boxes to [x1, y1, x2, y2]
                x1, y1, w, h = ann['bbox']
                bboxes.append([x1, y1, x1 + w, y1 + h])
            bboxes = np.array(bboxes, dtype=np.float32)
            if bboxes.shape[0] == 0:
                bboxes = np.zeros((0, 4))
            gt_bboxes.append(bboxes)

        recalls = eval_recalls(
            gt_bboxes, pred_bboxes, proposal_nums, iou_thrs, logger=logger)
        ar = recalls.mean(axis=1)
        return ar
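    # Rough usage sketch (values are illustrative): this is called internally
    # for the 'proposal_fast' metric as
    #   ar = self.fast_eval_recall(preds, self.proposal_nums, self.iou_thrs)
    # which, with proposal_nums=(100, 300, 1000), yields one averaged recall
    # per proposal budget, i.e. an array of shape (3,).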
    def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (dict): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of data samples that
                contain annotations and predictions.
        """
        for data_sample in data_samples:
            result = dict()
            pred = data_sample['pred_instances']
            result['img_id'] = data_sample['img_id']
            result['bboxes'] = pred['bboxes'].cpu().numpy()
            result['scores'] = pred['scores'].cpu().numpy()
            result['labels'] = pred['labels'].cpu().numpy()
            # encode mask to RLE
            if 'masks' in pred:
                result['masks'] = encode_mask_results(
                    pred['masks'].detach().cpu().numpy())
            # some detectors use different scores for bbox and mask
            if 'mask_scores' in pred:
                result['mask_scores'] = pred['mask_scores'].cpu().numpy()

            # parse gt
            gt = dict()
            gt['width'] = data_sample['ori_shape'][1]
            gt['height'] = data_sample['ori_shape'][0]
            gt['img_id'] = data_sample['img_id']
            if self._lvis_api is None:
                # TODO: Need to refactor to support LoadAnnotations
                assert 'instances' in data_sample, \
                    'ground truth is required for evaluation when ' \
                    '`ann_file` is not provided'
                gt['anns'] = data_sample['instances']
            # add converted result to the results list
            self.results.append((gt, result))
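    # Each stored item is a (gt, result) pair, roughly of this shape (values
    # are illustrative):
    #   gt = {'width': 640, 'height': 480, 'img_id': 42, ...}
    #   result = {'img_id': 42, 'bboxes': (N, 4) ndarray, 'scores': (N,),
    #             'labels': (N,), 'masks': <RLE list, if present>}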
    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()

        # split gt and prediction list
        gts, preds = zip(*results)

        tmp_dir = None
        if self.outfile_prefix is None:
            tmp_dir = tempfile.TemporaryDirectory()
            outfile_prefix = osp.join(tmp_dir.name, 'results')
        else:
            outfile_prefix = self.outfile_prefix

        if self._lvis_api is None:
            # use converted gt json file to initialize the lvis api
            logger.info('Converting ground truth to coco format...')
            coco_json_path = self.gt_to_coco_json(
                gt_dicts=gts, outfile_prefix=outfile_prefix)
            self._lvis_api = LVIS(coco_json_path)

        # handle lazy init
        if self.cat_ids is None:
            self.cat_ids = self._lvis_api.get_cat_ids()
        if self.img_ids is None:
            self.img_ids = self._lvis_api.get_img_ids()

        # convert predictions to coco format and dump to json file
        result_files = self.results2json(preds, outfile_prefix)

        eval_results = OrderedDict()
        if self.format_only:
            logger.info('results are saved in '
                        f'{osp.dirname(outfile_prefix)}')
            return eval_results

        lvis_gt = self._lvis_api

        for metric in self.metrics:
            logger.info(f'Evaluating {metric}...')

            # TODO: May refactor fast_eval_recall to an independent metric?
            # fast eval recall
            if metric == 'proposal_fast':
                ar = self.fast_eval_recall(
                    preds, self.proposal_nums, self.iou_thrs, logger=logger)
                log_msg = []
                for i, num in enumerate(self.proposal_nums):
                    eval_results[f'AR@{num}'] = ar[i]
                    log_msg.append(f'\nAR@{num}\t{ar[i]:.4f}')
                log_msg = ''.join(log_msg)
                logger.info(log_msg)
                continue

            try:
                lvis_dt = LVISResults(lvis_gt, result_files[metric])
            except IndexError:
                logger.info(
                    'The testing results of the whole dataset are empty.')
                break

            iou_type = 'bbox' if metric == 'proposal' else metric
            lvis_eval = LVISEval(lvis_gt, lvis_dt, iou_type)
            lvis_eval.params.imgIds = self.img_ids
            metric_items = self.metric_items
            if metric == 'proposal':
                lvis_eval.params.useCats = 0
                lvis_eval.params.maxDets = list(self.proposal_nums)
                lvis_eval.evaluate()
                lvis_eval.accumulate()
                lvis_eval.summarize()
                if metric_items is None:
                    metric_items = ['AR@300', 'ARs@300', 'ARm@300', 'ARl@300']
                for k, v in lvis_eval.get_results().items():
                    if k in metric_items:
                        val = float('{:.3f}'.format(float(v)))
                        eval_results[k] = val

            else:
                lvis_eval.evaluate()
                lvis_eval.accumulate()
                lvis_eval.summarize()
                lvis_results = lvis_eval.get_results()
                if self.classwise:
                    # Compute per-category AP
                    # from https://github.com/facebookresearch/detectron2/
                    precisions = lvis_eval.eval['precision']
                    # precision has dims (iou, recall, cls, area range)
                    assert len(self.cat_ids) == precisions.shape[2]

                    results_per_category = []
                    for idx, catId in enumerate(self.cat_ids):
                        # area range index 0: all area ranges
                        # the dimensions of precisions are
                        # [num_thrs, num_recalls, num_cats, num_area_rngs]
                        nm = self._lvis_api.load_cats([catId])[0]
                        precision = precisions[:, :, idx, 0]
                        precision = precision[precision > -1]
                        if precision.size:
                            ap = np.mean(precision)
                        else:
                            ap = float('nan')
                        results_per_category.append(
                            (f'{nm["name"]}', f'{float(ap):0.3f}'))
                        eval_results[f'{nm["name"]}_precision'] = round(ap, 3)

                    num_columns = min(6, len(results_per_category) * 2)
                    results_flatten = list(
                        itertools.chain(*results_per_category))
                    headers = ['category', 'AP'] * (num_columns // 2)
                    results_2d = itertools.zip_longest(*[
                        results_flatten[i::num_columns]
                        for i in range(num_columns)
                    ])
                    table_data = [headers]
                    table_data += [result for result in results_2d]
                    table = AsciiTable(table_data)
                    logger.info('\n' + table.table)

                if metric_items is None:
                    metric_items = [
                        'AP', 'AP50', 'AP75', 'APs', 'APm', 'APl', 'APr',
                        'APc', 'APf'
                    ]

                for k, v in lvis_results.items():
                    if k in metric_items:
                        key = '{}_{}'.format(metric, k)
                        val = float('{:.3f}'.format(float(v)))
                        eval_results[key] = val

            lvis_eval.print_results()
        if tmp_dir is not None:
            tmp_dir.cleanup()
        return eval_results
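    # With metric=['bbox'], for example, the returned dict typically contains
    # keys such as 'bbox_AP', 'bbox_AP50', 'bbox_APr', 'bbox_APc', 'bbox_APf'
    # with values rounded to three decimals; the evaluator loop later prepends
    # the 'lvis/' default_prefix to these names.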

def _merge_lists(listA, listB, maxN, key):
    """Merge two lists that are already sorted in descending order of ``key``
    into one descending list with at most ``maxN`` elements."""
    result = []
    indA, indB = 0, 0
    while (indA < len(listA) or indB < len(listB)) and len(result) < maxN:
        if (indB < len(listB)) and (indA >= len(listA)
                                    or key(listA[indA]) < key(listB[indB])):
            result.append(listB[indB])
            indB += 1
        else:
            result.append(listA[indA])
            indA += 1
    return result
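# A small worked example (not executed anywhere in this module), assuming the
# key is the identity function: merging [9, 7, 3] and [8, 4] with maxN=4
# yields [9, 8, 7, 4], i.e. the four largest values in descending order.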

@METRICS.register_module()
class LVISFixedAPMetric(BaseMetric):
    """LVIS evaluation with a fixed per-category detection budget.

    Instead of capping the number of detections per image, this metric keeps
    up to ``topk`` highest-scoring detections per category over the whole
    dataset and evaluates them with the LVIS API with no per-image limit
    (``max_dets=-1``).

    Args:
        ann_file (str): Path to the LVIS annotation file.
        topk (int): Maximum number of detections kept per category over the
            whole dataset. Defaults to 10000.
    """

    default_prefix: Optional[str] = 'lvis_fixed_ap'
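    # Illustrative usage only: a minimal evaluator config sketch with a
    # hypothetical annotation path; unlike LVISMetric, this metric always
    # needs an explicit ann_file.
    #
    #   val_evaluator = dict(
    #       type='LVISFixedAPMetric',
    #       ann_file='data/lvis_v1/annotations/lvis_v1_val.json',
    #       topk=10000)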
    def __init__(self,
                 ann_file: str,
                 topk: int = 10000,
                 format_only: bool = False,
                 outfile_prefix: Optional[str] = None,
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None,
                 backend_args: Optional[dict] = None) -> None:
        if lvis is None:
            raise RuntimeError(
                'Package lvis is not installed. Please run "pip install '
                'git+https://github.com/lvis-dataset/lvis-api.git".')
        super().__init__(collect_device=collect_device, prefix=prefix)

        self.format_only = format_only
        if self.format_only:
            assert outfile_prefix is not None, (
                'outfile_prefix must not be None when format_only is True, '
                'otherwise the result files will be saved to a temp '
                'directory which will be cleaned up at the end.')

        self.outfile_prefix = outfile_prefix
        self.backend_args = backend_args

        with get_local_path(
                ann_file, backend_args=self.backend_args) as local_path:
            self._lvis_api = LVIS(local_path)

        self.cat_ids = self._lvis_api.get_cat_ids()

        # detections are accumulated per category: {cat_id: [ann, ...]}
        self.results = {}
        self.topk = topk

    def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (dict): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of data samples that
                contain annotations and predictions.
        """
        cur_results = []
        for data_sample in data_samples:
            pred = data_sample['pred_instances']
            # convert [x1, y1, x2, y2] boxes to LVIS-style [x, y, w, h]
            xmin, ymin, xmax, ymax = pred['bboxes'].cpu().unbind(1)
            boxes = torch.stack((xmin, ymin, xmax - xmin, ymax - ymin),
                                dim=1).tolist()

            scores = pred['scores'].cpu().numpy()
            labels = pred['labels'].cpu().numpy()

            if len(boxes) == 0:
                continue

            cur_results.extend([{
                'image_id': data_sample['img_id'],
                'category_id': self.cat_ids[labels[k]],
                'bbox': box,
                'score': scores[k],
            } for k, box in enumerate(boxes)])

        by_cat = defaultdict(list)
        for ann in cur_results:
            by_cat[ann['category_id']].append(ann)

        # keep only the topk highest-scoring detections per category, merged
        # with the detections accumulated from previous batches
        for cat, cat_anns in by_cat.items():
            if cat not in self.results:
                self.results[cat] = []

            cur = sorted(
                cat_anns, key=lambda x: x['score'], reverse=True)[:self.topk]
            self.results[cat] = _merge_lists(
                self.results[cat], cur, self.topk, key=lambda x: x['score'])

    def compute_metrics(self, results: dict) -> dict:
        """Compute the fixed AP metrics from the per-category results."""
        logger: MMLogger = MMLogger.get_current_instance()

        new_results = []

        missing_dets_cats = set()
        for cat, cat_anns in results.items():
            if len(cat_anns) < self.topk:
                missing_dets_cats.add(cat)
            new_results.extend(
                sorted(cat_anns, key=lambda x: x['score'],
                       reverse=True)[:self.topk])

        if missing_dets_cats:
            logger.info(
                f'\n===\n'
                f'{len(missing_dets_cats)} classes had fewer than {self.topk} '
                f'detections!\n Outputting {self.topk} detections for each '
                f'class will improve AP further.\n ===')

        new_results = LVISResults(self._lvis_api, new_results, max_dets=-1)
        lvis_eval = LVISEval(self._lvis_api, new_results, iou_type='bbox')
        params = lvis_eval.params
        params.max_dets = -1  # unlimited detections per image
        lvis_eval.run()
        lvis_eval.print_results()
        metrics = {
            k: v
            for k, v in lvis_eval.results.items() if k.startswith('AP')
        }
        logger.info(f'mAP_copypaste: {metrics}')
        return metrics

    def evaluate(self, size: int) -> dict:
        """Gather the per-category results from all ranks and compute the
        metrics on the main process."""
        if len(self.results) == 0:
            print_log(
                f'{self.__class__.__name__} got empty `self.results`. Please '
                'ensure that the processed results are properly added into '
                '`self.results` in `process` method.',
                logger='current',
                level=logging.WARNING)

        all_cats = all_gather_object(self.results)
        results = defaultdict(list)
        for cats in all_cats:
            for cat, cat_anns in cats.items():
                results[cat].extend(cat_anns)

        if is_main_process():
            # move any tensors in the gathered results to CPU
            results = _to_cpu(results)
            _metrics = self.compute_metrics(results)
            # add prefix to metric names
            if self.prefix:
                _metrics = {
                    '/'.join((self.prefix, k)): v
                    for k, v in _metrics.items()
                }
            metrics = [_metrics]
        else:
            metrics = [None]

        broadcast_object_list(metrics)

        # reset the per-category results dict after evaluation
        self.results = {}
        return metrics[0]