| |
| import copy |
| import json |
| import os.path as osp |
| import tempfile |
| from collections import OrderedDict |
| from multiprocessing import Process, Queue |
| from typing import Dict, List, Optional, Sequence, Union |
|
|
| import numpy as np |
| from mmengine.evaluator import BaseMetric |
| from mmengine.fileio import dump, get_text, load |
| from mmengine.logging import MMLogger |
| from scipy.sparse import csr_matrix |
| from scipy.sparse.csgraph import maximum_bipartite_matching |
|
|
| from mmdet.evaluation.functional.bbox_overlaps import bbox_overlaps |
| from mmdet.registry import METRICS |
|
|
| PERSON_CLASSES = ['background', 'person'] |
|
|
|
|
| @METRICS.register_module() |
| class CrowdHumanMetric(BaseMetric): |
| """CrowdHuman evaluation metric. |
| |
| Evaluate Average Precision (AP), Miss Rate (MR) and Jaccard Index (JI) |
| for detection tasks. |
| |
| Args: |
| ann_file (str): Path to the annotation file. |
| metric (str | List[str]): Metrics to be evaluated. Valid metrics |
| include 'AP', 'MR' and 'JI'. Defaults to ['AP', 'MR', 'JI']. |
| format_only (bool): Format the output results without performing |
| evaluation. It is useful when you want to format the results |
| to a specific format and submit them to the test server. |
| Defaults to False. |
| outfile_prefix (str, optional): The prefix of json files. It includes |
| the file path and the prefix of filename, e.g., "a/b/prefix". |
| If not specified, a temp file will be created. Defaults to None. |
| file_client_args (dict, optional): Arguments to instantiate the |
| corresponding backend in mmdet <= 3.0.0rc6. Defaults to None. |
| backend_args (dict, optional): Arguments to instantiate the |
| corresponding backend. Defaults to None. |
| collect_device (str): Device name used for collecting results from |
| different ranks during distributed training. Must be 'cpu' or |
| 'gpu'. Defaults to 'cpu'. |
| prefix (str, optional): The prefix that will be added in the metric |
| names to disambiguate homonymous metrics of different evaluators. |
| If prefix is not provided in the argument, self.default_prefix |
| will be used instead. Defaults to None. |
| eval_mode (int): Select the evaluation mode. Valid modes include |
| 0 (body box only), 1 (head box only) and 2 (both of them). |
| Defaults to 0. |
| iou_thres (float): IoU threshold. Defaults to 0.5. |
| compare_matching_method (str, optional): Matching method used to |
| compare the detection results with the ground truth when |
| computing 'AP' and 'MR'. Valid methods are 'VOC' and |
| None (CALTECH). Defaults to None. |
| mr_ref (str): Reference point selection for calculating MR. Valid |
| refs are 'CALTECH_-2' and 'CALTECH_-4'. Defaults to 'CALTECH_-2'. |
| num_ji_process (int): The number of processes used to evaluate JI. |
| Defaults to 10. |
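| |
| Examples: |
| A minimal construction sketch; the annotation path below is |
| hypothetical and only illustrates the expected CrowdHuman |
| ``.odgt`` annotation file. |
| |
| >>> metric = CrowdHumanMetric( |
| ... ann_file='data/CrowdHuman/annotation_val.odgt', |
| ... metric=['AP', 'MR', 'JI']) |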
| """ |
| default_prefix: Optional[str] = 'crowd_human' |
|
|
| def __init__(self, |
| ann_file: str, |
| metric: Union[str, List[str]] = ['AP', 'MR', 'JI'], |
| format_only: bool = False, |
| outfile_prefix: Optional[str] = None, |
| file_client_args: Optional[dict] = None, |
| backend_args: Optional[dict] = None, |
| collect_device: str = 'cpu', |
| prefix: Optional[str] = None, |
| eval_mode: int = 0, |
| iou_thres: float = 0.5, |
| compare_matching_method: Optional[str] = None, |
| mr_ref: str = 'CALTECH_-2', |
| num_ji_process: int = 10) -> None: |
| super().__init__(collect_device=collect_device, prefix=prefix) |
|
|
| self.ann_file = ann_file |
| |
| self.metrics = metric if isinstance(metric, list) else [metric] |
| allowed_metrics = ['MR', 'AP', 'JI'] |
| for metric in self.metrics: |
| if metric not in allowed_metrics: |
| raise KeyError(f"metric should be one of 'MR', 'AP', 'JI'," |
| f'but got {metric}.') |
|
|
| self.format_only = format_only |
| if self.format_only: |
| assert outfile_prefix is not None, \ |
| 'outfile_prefix must not be None when format_only is True, ' \ |
| 'otherwise the result files will be saved to a temp directory ' \ |
| 'which will be cleaned up at the end.' |
| self.outfile_prefix = outfile_prefix |
| self.backend_args = backend_args |
| if file_client_args is not None: |
| raise RuntimeError( |
| 'The `file_client_args` is deprecated, ' |
| 'please use `backend_args` instead, please refer to ' |
| 'https://github.com/open-mmlab/mmdetection/blob/main/configs/_base_/datasets/coco_detection.py' |
| ) |
|
|
| assert eval_mode in [0, 1, 2], \ |
| 'Unknown eval mode. eval_mode should be one of 0, 1 or 2.' |
| assert compare_matching_method is None or \ |
| compare_matching_method == 'VOC', \ |
| 'The alternative compare_matching_method is VOC. ' \ |
| 'This parameter defaults to CALTECH (None).' |
| assert mr_ref == 'CALTECH_-2' or mr_ref == 'CALTECH_-4', \ |
| "mr_ref should be one of 'CALTECH_-2', 'CALTECH_-4'." |
| self.eval_mode = eval_mode |
| self.iou_thres = iou_thres |
| self.compare_matching_method = compare_matching_method |
| self.mr_ref = mr_ref |
| self.num_ji_process = num_ji_process |
|
|
| @staticmethod |
| def results2json(results: Sequence[dict], outfile_prefix: str) -> str: |
| """Dump the detection results to a json file.""" |
| result_file_path = f'{outfile_prefix}.json' |
| bbox_json_results = [] |
| for i, result in enumerate(results): |
| ann, pred = result |
| dump_dict = dict() |
| dump_dict['ID'] = ann['ID'] |
| dump_dict['width'] = ann['width'] |
| dump_dict['height'] = ann['height'] |
| dtboxes = [] |
| bboxes = pred.tolist() |
| for _, single_bbox in enumerate(bboxes): |
| temp_dict = dict() |
| x1, y1, x2, y2, score = single_bbox |
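| # convert [x1, y1, x2, y2] back to CrowdHuman's [x, y, w, h] format |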
| temp_dict['box'] = [x1, y1, x2 - x1, y2 - y1] |
| temp_dict['score'] = score |
| temp_dict['tag'] = 1 |
| dtboxes.append(temp_dict) |
| dump_dict['dtboxes'] = dtboxes |
| bbox_json_results.append(dump_dict) |
| dump(bbox_json_results, result_file_path) |
| return result_file_path |
|
|
| def process(self, data_batch: Sequence[dict], |
| data_samples: Sequence[dict]) -> None: |
| """Process one batch of data samples and predictions. The processed |
| results should be stored in ``self.results``, which will be used to |
| compute the metrics when all batches have been processed. |
| |
| Args: |
| data_batch (dict): A batch of data from the dataloader. |
| data_samples (Sequence[dict]): A batch of data samples that |
| contain annotations and predictions. |
| """ |
| for data_sample in data_samples: |
| ann = dict() |
| ann['ID'] = data_sample['img_id'] |
| ann['width'] = data_sample['ori_shape'][1] |
| ann['height'] = data_sample['ori_shape'][0] |
| pred_bboxes = data_sample['pred_instances']['bboxes'].cpu().numpy() |
| pred_scores = data_sample['pred_instances']['scores'].cpu().numpy() |
|
|
| pred_bbox_scores = np.hstack( |
| [pred_bboxes, pred_scores.reshape((-1, 1))]) |
|
|
| self.results.append((ann, pred_bbox_scores)) |
|
|
| def compute_metrics(self, results: list) -> Dict[str, float]: |
| """Compute the metrics from processed results. |
| |
| Args: |
| results (list): The processed results of each batch. |
| |
| Returns: |
| eval_results(Dict[str, float]): The computed metrics. |
| The keys are the names of the metrics, and the values |
| are corresponding results. |
| """ |
| logger: MMLogger = MMLogger.get_current_instance() |
|
|
| tmp_dir = None |
| if self.outfile_prefix is None: |
| tmp_dir = tempfile.TemporaryDirectory() |
| outfile_prefix = osp.join(tmp_dir.name, 'result') |
| else: |
| outfile_prefix = self.outfile_prefix |
|
|
| |
| result_file = self.results2json(results, outfile_prefix) |
| eval_results = OrderedDict() |
| if self.format_only: |
| logger.info(f'results are saved in {osp.dirname(outfile_prefix)}') |
| return eval_results |
|
|
| |
| eval_samples = self.load_eval_samples(result_file) |
|
|
| if 'AP' in self.metrics or 'MR' in self.metrics: |
| score_list = self.compare(eval_samples) |
| gt_num = sum([eval_samples[i].gt_num for i in eval_samples]) |
| ign_num = sum([eval_samples[i].ign_num for i in eval_samples]) |
| gt_num = gt_num - ign_num |
| img_num = len(eval_samples) |
|
|
| for metric in self.metrics: |
| logger.info(f'Evaluating {metric}...') |
| if metric == 'AP': |
| AP = self.eval_ap(score_list, gt_num, img_num) |
| eval_results['mAP'] = float(f'{round(AP, 4)}') |
| if metric == 'MR': |
| MR = self.eval_mr(score_list, gt_num, img_num) |
| eval_results['mMR'] = float(f'{round(MR, 4)}') |
| if metric == 'JI': |
| JI = self.eval_ji(eval_samples) |
| eval_results['JI'] = float(f'{round(JI, 4)}') |
| if tmp_dir is not None: |
| tmp_dir.cleanup() |
|
|
| return eval_results |
|
|
| def load_eval_samples(self, result_file): |
| """Load data from annotations file and detection results. |
| |
| Args: |
| result_file (str): The file path of the saved detection results. |
| |
| Returns: |
| Dict[Image]: The detection result packaged by Image |
| """ |
| gt_str = get_text( |
| self.ann_file, backend_args=self.backend_args).strip().split('\n') |
| gt_records = [json.loads(line) for line in gt_str] |
|
|
| pred_records = load(result_file, backend_args=self.backend_args) |
| eval_samples = dict() |
| for gt_record, pred_record in zip(gt_records, pred_records): |
| assert gt_record['ID'] == pred_record['ID'], \ |
| 'please set val_dataloader.sampler.shuffle=False and try again' |
| eval_samples[pred_record['ID']] = Image(self.eval_mode) |
| eval_samples[pred_record['ID']].load(gt_record, 'box', None, |
| PERSON_CLASSES, True) |
| eval_samples[pred_record['ID']].load(pred_record, 'box', None, |
| PERSON_CLASSES, False) |
| eval_samples[pred_record['ID']].clip_all_boader() |
| return eval_samples |
|
|
| def compare(self, samples): |
| """Match the detection results with the ground_truth. |
| |
| Args: |
| samples (dict[Image]): The detection result packaged by Image. |
| |
| Returns: |
| score_list(list[tuple[ndarray, int, str]]): Matching result. |
| a list of tuples (dtbox, label, imgID) in the descending |
| sort of dtbox.score. |
| """ |
| score_list = list() |
| for id in samples: |
| if self.compare_matching_method == 'VOC': |
| result = samples[id].compare_voc(self.iou_thres) |
| else: |
| result = samples[id].compare_caltech(self.iou_thres) |
| score_list.extend(result) |
| |
| score_list.sort(key=lambda x: x[0][-1], reverse=True) |
| return score_list |
|
|
| @staticmethod |
| def eval_ap(score_list, gt_num, img_num): |
| """Evaluate by average precision. |
| |
| Args: |
| score_list(list[tuple[ndarray, int, str]]): Matching result. |
| a list of tuples (dtbox, label, imgID) in the descending |
| sort of dtbox.score. |
| gt_num(int): The number of gt boxes in the entire dataset. |
| img_num(int): The number of images in the entire dataset. |
| |
| Returns: |
| ap(float): result of average precision. |
| """ |
|
|
| |
| def _calculate_map(_recall, _precision): |
| assert len(_recall) == len(_precision) |
| area = 0 |
| for k in range(1, len(_recall)): |
| delta_h = (_precision[k - 1] + _precision[k]) / 2 |
| delta_w = _recall[k] - _recall[k - 1] |
| area += delta_w * delta_h |
| return area |
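| # A small worked example (hypothetical numbers): with recalls |
| # [0.0, 0.5, 1.0] and precisions [1.0, 1.0, 0.5], the trapezoidal |
| # rule above gives 0.5 * 1.0 + 0.5 * 0.75 = 0.875. |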
|
|
| tp, fp = 0.0, 0.0 |
| rpX, rpY = list(), list() |
|
|
| fpn = [] |
| recalln = [] |
| thr = [] |
| fppi = [] |
| for i, item in enumerate(score_list): |
| if item[1] == 1: |
| tp += 1.0 |
| elif item[1] == 0: |
| fp += 1.0 |
| fn = gt_num - tp |
| recall = tp / (tp + fn) |
| precision = tp / (tp + fp) |
| rpX.append(recall) |
| rpY.append(precision) |
| fpn.append(fp) |
| recalln.append(tp) |
| thr.append(item[0][-1]) |
| fppi.append(fp / img_num) |
|
|
| ap = _calculate_map(rpX, rpY) |
| return ap |
|
|
| def eval_mr(self, score_list, gt_num, img_num): |
| """Evaluate by Caltech-style log-average miss rate. |
| |
| Args: |
| score_list(list[tuple[ndarray, int, str]]): Matching result. |
| a list of tuples (dtbox, label, imgID) in the descending |
| sort of dtbox.score. |
| gt_num(int): The number of gt boxes in the entire dataset. |
| img_num(int): The number of images in the entire dataset. |
| |
| Returns: |
| mr(float): result of miss rate. |
| """ |
|
|
| |
| def _find_gt(lst, target): |
| for idx, _item in enumerate(lst): |
| if _item >= target: |
| return idx |
| return len(lst) - 1 |
|
|
| if self.mr_ref == 'CALTECH_-2': |
| |
| |
| ref = [ |
| 0.0100, 0.0178, 0.03160, 0.0562, 0.1000, 0.1778, 0.3162, |
| 0.5623, 1.000 |
| ] |
| else: |
| |
| |
| ref = [ |
| 0.0001, 0.0003, 0.00100, 0.0032, 0.0100, 0.0316, 0.1000, |
| 0.3162, 1.000 |
| ] |
|
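| # The reference points are log-uniformly spaced FPPI values |
| # (10^-2..1 for CALTECH_-2, 10^-4..1 for CALTECH_-4); the final MR |
| # is the geometric mean of the miss rates sampled at these points: |
| # MR = exp(mean(log(mr_i))) |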
|
| tp, fp = 0.0, 0.0 |
| fppiX, fppiY = list(), list() |
| for i, item in enumerate(score_list): |
| if item[1] == 1: |
| tp += 1.0 |
| elif item[1] == 0: |
| fp += 1.0 |
|
|
| fn = gt_num - tp |
| recall = tp / (tp + fn) |
| missrate = 1.0 - recall |
| fppi = fp / img_num |
| fppiX.append(fppi) |
| fppiY.append(missrate) |
|
|
| score = list() |
| for pos in ref: |
| argmin = _find_gt(fppiX, pos) |
| if argmin >= 0: |
| score.append(fppiY[argmin]) |
| score = np.array(score) |
| mr = np.exp(np.log(score).mean()) |
| return mr |
|
|
| def eval_ji(self, samples): |
| """Evaluate by JI using multi_process. |
| |
| Args: |
| samples(Dict[str, Image]): The detection result packaged by Image. |
| |
| Returns: |
| ji(float): result of jaccard index. |
| """ |
| import math |
| res_line = [] |
| res_ji = [] |
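| # sweep detection score thresholds 0.0, 0.1, ..., 0.9 and keep the |
| # best JI over the sweep (max(res_ji) below) |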
| for i in range(10): |
| score_thr = 1e-1 * i |
| total = len(samples) |
| stride = math.ceil(total / self.num_ji_process) |
| result_queue = Queue(10000) |
| results, procs = [], [] |
| records = list(samples.items()) |
| for i in range(self.num_ji_process): |
| start = i * stride |
| end = np.min([start + stride, total]) |
| sample_data = dict(records[start:end]) |
| p = Process( |
| target=self.compute_ji_with_ignore, |
| args=(result_queue, sample_data, score_thr)) |
| p.start() |
| procs.append(p) |
| for i in range(total): |
| t = result_queue.get() |
| results.append(t) |
| for p in procs: |
| p.join() |
| line, mean_ratio = self.gather(results) |
| line = 'score_thr:{:.1f}, {}'.format(score_thr, line) |
| res_line.append(line) |
| res_ji.append(mean_ratio) |
| return max(res_ji) |
|
|
| def compute_ji_with_ignore(self, result_queue, dt_result, score_thr): |
| """Compute JI with ignore. |
| |
| Args: |
| result_queue(Queue): The queue used to collect the computed |
| results from each worker process. |
| dt_result(dict[Image]): Detection result packaged by Image. |
| score_thr(float): The threshold of detection score. |
| Returns: |
| None: The per-image result dict is put into ``result_queue``. |
| """ |
| for ID, record in dt_result.items(): |
| gt_boxes = record.gt_boxes |
| dt_boxes = record.dt_boxes |
| keep = dt_boxes[:, -1] > score_thr |
| dt_boxes = dt_boxes[keep][:, :-1] |
|
|
| gt_tag = np.array(gt_boxes[:, -1] != -1) |
| matches = self.compute_ji_matching(dt_boxes, gt_boxes[gt_tag, :4]) |
| |
| matched_indices = np.array([j for (j, _) in matches]) |
| unmatched_indices = list( |
| set(np.arange(dt_boxes.shape[0])) - set(matched_indices)) |
| num_ignore_dt = self.get_ignores(dt_boxes[unmatched_indices], |
| gt_boxes[~gt_tag, :4]) |
| matched_indices = np.array([j for (_, j) in matches]) |
| unmatched_indices = list( |
| set(np.arange(gt_boxes[gt_tag].shape[0])) - |
| set(matched_indices)) |
| num_ignore_gt = self.get_ignores( |
| gt_boxes[gt_tag][unmatched_indices], gt_boxes[~gt_tag, :4]) |
| |
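| # Jaccard index for this image: matched pairs over the union of |
| # valid (non-ignored) detections and ground truths, k / (m + n - k) |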
| eps = 1e-6 |
| k = len(matches) |
| m = gt_tag.sum() - num_ignore_gt |
| n = dt_boxes.shape[0] - num_ignore_dt |
| ratio = k / (m + n - k + eps) |
| recall = k / (m + eps) |
| cover = k / (n + eps) |
| noise = 1 - cover |
| result_dict = dict( |
| ratio=ratio, |
| recall=recall, |
| cover=cover, |
| noise=noise, |
| k=k, |
| m=m, |
| n=n) |
| result_queue.put_nowait(result_dict) |
|
|
| @staticmethod |
| def gather(results): |
| """Integrate test results.""" |
| assert len(results) |
| img_num = 0 |
| for result in results: |
| if result['n'] != 0 or result['m'] != 0: |
| img_num += 1 |
| mean_ratio = np.sum([rb['ratio'] for rb in results]) / img_num |
| valids = np.sum([rb['k'] for rb in results]) |
| total = np.sum([rb['n'] for rb in results]) |
| gtn = np.sum([rb['m'] for rb in results]) |
| line = 'mean_ratio:{:.4f}, valids:{}, total:{}, gtn:{}'\ |
| .format(mean_ratio, valids, total, gtn) |
| return line, mean_ratio |
|
|
| def compute_ji_matching(self, dt_boxes, gt_boxes): |
| """Match the annotation box for each detection box. |
| |
| Args: |
| dt_boxes(ndarray): Detection boxes. |
| gt_boxes(ndarray): Ground_truth boxes. |
| |
| Returns: |
| matches_(list[tuple[int, int]]): Match result. |
| """ |
| assert dt_boxes.shape[-1] > 3 and gt_boxes.shape[-1] > 3 |
| if dt_boxes.shape[0] < 1 or gt_boxes.shape[0] < 1: |
| return list() |
|
|
| ious = bbox_overlaps(dt_boxes, gt_boxes, mode='iou') |
| input_ = copy.deepcopy(ious) |
| input_[input_ < self.iou_thres] = 0 |
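| # zeroing sub-threshold IoUs ensures the maximum bipartite matching |
| # below can only pair detections and ground truths that overlap by |
| # at least ``iou_thres`` |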
| match_scipy = maximum_bipartite_matching( |
| csr_matrix(input_), perm_type='column') |
| matches_ = [] |
| for i in range(len(match_scipy)): |
| if match_scipy[i] != -1: |
| matches_.append((i, int(match_scipy[i]))) |
| return matches_ |
|
|
| def get_ignores(self, dt_boxes, gt_boxes): |
| """Get the number of ignore bboxes.""" |
| if gt_boxes.size: |
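| # mode='iof' measures intersection over the detection box area, so |
| # a detection largely covered by an ignore region counts as ignored |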
| ioas = bbox_overlaps(dt_boxes, gt_boxes, mode='iof') |
| ioas = np.max(ioas, axis=1) |
| rows = np.where(ioas > self.iou_thres)[0] |
| return len(rows) |
| else: |
| return 0 |
|
|
|
|
| class Image(object): |
| """Data structure for evaluation of CrowdHuman. |
| |
| Note: |
| This implementation is modified from https://github.com/Purkialo/ |
| CrowdDet/blob/master/lib/evaluate/APMRToolkits/image.py |
| |
| Args: |
| mode (int): Select the evaluation mode. Valid modes include |
| 0 (body box only), 1 (head box only) and 2 (both of them). |
| Defaults to 0. |
| """ |
|
|
| def __init__(self, mode): |
| self.ID = None |
| self.width = None |
| self.height = None |
| self.dt_boxes = None |
| self.gt_boxes = None |
| self.eval_mode = mode |
|
|
| self.ign_num = None |
| self.gt_num = None |
| self.dt_num = None |
|
|
| def load(self, record, body_key, head_key, class_names, gt_flag): |
| """Loading information for evaluation. |
| |
| Args: |
| record (dict): Label information or test results. |
| The format might look something like this: |
| { |
| 'ID': '273271,c9db000d5146c15', |
| 'gtboxes': [ |
| {'fbox': [72, 202, 163, 503], 'tag': 'person', ...}, |
| {'fbox': [199, 180, 144, 499], 'tag': 'person', ...}, |
| ... |
| ] |
| } |
| or: |
| { |
| 'ID': '273271,c9db000d5146c15', |
| 'width': 800, |
| 'height': 1067, |
| 'dtboxes': [ |
| { |
| 'box': [306.22, 205.95, 164.05, 394.04], |
| 'score': 0.99, |
| 'tag': 1 |
| }, |
| { |
| 'box': [403.60, 178.66, 157.15, 421.33], |
| 'score': 0.99, |
| 'tag': 1 |
| }, |
| ... |
| ] |
| } |
| body_key (str, None): key of detection body box. |
| Valid when loading detection results and self.eval_mode!=1. |
| head_key (str, None): key of detection head box. |
| Valid when loading detection results and self.eval_mode!=0. |
| class_names (list[str]): class names of the dataset. |
| Defaults to ['background', 'person']. |
| gt_flag (bool): Indicate whether the record contains ground |
| truth annotations (True) or detection results (False). |
| """ |
| if 'ID' in record and self.ID is None: |
| self.ID = record['ID'] |
| if 'width' in record and self.width is None: |
| self.width = record['width'] |
| if 'height' in record and self.height is None: |
| self.height = record['height'] |
| if gt_flag: |
| self.gt_num = len(record['gtboxes']) |
| body_bbox, head_bbox = self.load_gt_boxes(record, 'gtboxes', |
| class_names) |
| if self.eval_mode == 0: |
| self.gt_boxes = body_bbox |
| self.ign_num = (body_bbox[:, -1] == -1).sum() |
| elif self.eval_mode == 1: |
| self.gt_boxes = head_bbox |
| self.ign_num = (head_bbox[:, -1] == -1).sum() |
| else: |
| gt_tag = np.array([ |
| body_bbox[i, -1] != -1 and head_bbox[i, -1] != -1 |
| for i in range(len(body_bbox)) |
| ]) |
| self.ign_num = (gt_tag == 0).sum() |
| self.gt_boxes = np.hstack( |
| (body_bbox[:, :-1], head_bbox[:, :-1], |
| gt_tag.reshape(-1, 1))) |
|
|
| if not gt_flag: |
| self.dt_num = len(record['dtboxes']) |
| if self.eval_mode == 0: |
| self.dt_boxes = self.load_det_boxes(record, 'dtboxes', |
| body_key, 'score') |
| elif self.eval_mode == 1: |
| self.dt_boxes = self.load_det_boxes(record, 'dtboxes', |
| head_key, 'score') |
| else: |
| body_dtboxes = self.load_det_boxes(record, 'dtboxes', body_key, |
| 'score') |
| head_dtboxes = self.load_det_boxes(record, 'dtboxes', head_key, |
| 'score') |
| self.dt_boxes = np.hstack((body_dtboxes, head_dtboxes)) |
|
|
| @staticmethod |
| def load_gt_boxes(dict_input, key_name, class_names): |
| """load ground_truth and transform [x, y, w, h] to [x1, y1, x2, y2]""" |
| assert key_name in dict_input |
| if len(dict_input[key_name]) < 1: |
| return np.empty([0, 5]), np.empty([0, 5]) |
| head_bbox = [] |
| body_bbox = [] |
| for rb in dict_input[key_name]: |
| if rb['tag'] in class_names: |
| body_tag = class_names.index(rb['tag']) |
| head_tag = copy.deepcopy(body_tag) |
| else: |
| body_tag = -1 |
| head_tag = -1 |
| if 'extra' in rb: |
| if 'ignore' in rb['extra']: |
| if rb['extra']['ignore'] != 0: |
| body_tag = -1 |
| head_tag = -1 |
| if 'head_attr' in rb: |
| if 'ignore' in rb['head_attr']: |
| if rb['head_attr']['ignore'] != 0: |
| head_tag = -1 |
| head_bbox.append(np.hstack((rb['hbox'], head_tag))) |
| body_bbox.append(np.hstack((rb['fbox'], body_tag))) |
| head_bbox = np.array(head_bbox) |
| head_bbox[:, 2:4] += head_bbox[:, :2] |
| body_bbox = np.array(body_bbox) |
| body_bbox[:, 2:4] += body_bbox[:, :2] |
| return body_bbox, head_bbox |
|
|
| @staticmethod |
| def load_det_boxes(dict_input, key_name, key_box, key_score, key_tag=None): |
| """load detection boxes.""" |
| assert key_name in dict_input |
| if len(dict_input[key_name]) < 1: |
| return np.empty([0, 5]) |
| else: |
| assert key_box in dict_input[key_name][0] |
| if key_score: |
| assert key_score in dict_input[key_name][0] |
| if key_tag: |
| assert key_tag in dict_input[key_name][0] |
| if key_score: |
| if key_tag: |
| bboxes = np.vstack([ |
| np.hstack((rb[key_box], rb[key_score], rb[key_tag])) |
| for rb in dict_input[key_name] |
| ]) |
| else: |
| bboxes = np.vstack([ |
| np.hstack((rb[key_box], rb[key_score])) |
| for rb in dict_input[key_name] |
| ]) |
| else: |
| if key_tag: |
| bboxes = np.vstack([ |
| np.hstack((rb[key_box], rb[key_tag])) |
| for rb in dict_input[key_name] |
| ]) |
| else: |
| bboxes = np.vstack( |
| [rb[key_box] for rb in dict_input[key_name]]) |
| bboxes[:, 2:4] += bboxes[:, :2] |
| return bboxes |
|
|
| def clip_all_boader(self): |
| """Make sure boxes are within the image range.""" |
|
|
| def _clip_boundary(boxes, height, width): |
| assert boxes.shape[-1] >= 4 |
| boxes[:, 0] = np.minimum(np.maximum(boxes[:, 0], 0), width - 1) |
| boxes[:, 1] = np.minimum(np.maximum(boxes[:, 1], 0), height - 1) |
| boxes[:, 2] = np.maximum(np.minimum(boxes[:, 2], width), 0) |
| boxes[:, 3] = np.maximum(np.minimum(boxes[:, 3], height), 0) |
| return boxes |
|
|
| assert self.dt_boxes.shape[-1] >= 4 |
| assert self.gt_boxes.shape[-1] >= 4 |
| assert self.width is not None and self.height is not None |
| if self.eval_mode == 2: |
| self.dt_boxes[:, :4] = _clip_boundary(self.dt_boxes[:, :4], |
| self.height, self.width) |
| self.gt_boxes[:, :4] = _clip_boundary(self.gt_boxes[:, :4], |
| self.height, self.width) |
| self.dt_boxes[:, 4:8] = _clip_boundary(self.dt_boxes[:, 4:8], |
| self.height, self.width) |
| self.gt_boxes[:, 4:8] = _clip_boundary(self.gt_boxes[:, 4:8], |
| self.height, self.width) |
| else: |
| self.dt_boxes = _clip_boundary(self.dt_boxes, self.height, |
| self.width) |
| self.gt_boxes = _clip_boundary(self.gt_boxes, self.height, |
| self.width) |
|
|
| def compare_voc(self, thres): |
| """Match the detection results with the ground_truth by VOC. |
| |
| Args: |
| thres (float): IOU threshold. |
| |
| Returns: |
| score_list(list[tuple[ndarray, int, str]]): Matching result. |
| a list of tuples (dtbox, label, imgID) in the descending |
| sort of dtbox.score. |
| """ |
| if self.dt_boxes is None or self.gt_boxes is None: |
| return list() |
| dtboxes = self.dt_boxes |
| gtboxes = self.gt_boxes |
| dt_matched = np.zeros(dtboxes.shape[0]) |
| gt_matched = np.zeros(gtboxes.shape[0]) |
| |
| # sort detections by score (descending) and ground truths so that |
| # ignored boxes (tag == -1) come last, as in the original |
| # object-based CrowdDet implementation |
| dtboxes = np.array(sorted(dtboxes, key=lambda x: x[-1], reverse=True)) |
| gtboxes = np.array(sorted(gtboxes, key=lambda x: x[-1], reverse=True)) |
| if len(dtboxes): |
| overlap_iou = bbox_overlaps(dtboxes, gtboxes, mode='iou') |
| else: |
| return list() |
| |
| score_list = list() |
| for i, dt in enumerate(dtboxes): |
| maxpos = -1 |
| maxiou = thres |
| |
| for j, gt in enumerate(gtboxes): |
| overlap = overlap_iou[i][j] |
| if overlap > maxiou: |
| maxiou = overlap |
| maxpos = j |
| |
| if maxpos >= 0: |
| if gtboxes[maxpos, -1] > 0: |
| gt_matched[maxpos] = 1 |
| dt_matched[i] = 1 |
| score_list.append((dt, 1, self.ID)) |
| else: |
| dt_matched[i] = -1 |
| else: |
| dt_matched[i] = 0 |
| score_list.append((dt, 0, self.ID)) |
| return score_list |
|
|
| def compare_caltech(self, thres): |
| """Match the detection results with the ground_truth by Caltech |
| matching strategy. |
| |
| Args: |
| thres (float): IOU threshold. |
| |
| Returns: |
| score_list(list[tuple[ndarray, int, str]]): Matching result. |
| a list of tuples (dtbox, label, imgID) in the descending |
| sort of dtbox.score. |
| """ |
| if self.dt_boxes is None or self.gt_boxes is None: |
| return list() |
|
|
| dtboxes = self.dt_boxes if self.dt_boxes is not None else list() |
| gtboxes = self.gt_boxes if self.gt_boxes is not None else list() |
| dt_matched = np.zeros(dtboxes.shape[0]) |
| gt_matched = np.zeros(gtboxes.shape[0]) |
|
|
| dtboxes = np.array(sorted(dtboxes, key=lambda x: x[-1], reverse=True)) |
| gtboxes = np.array(sorted(gtboxes, key=lambda x: x[-1], reverse=True)) |
| if len(dtboxes): |
| overlap_iou = bbox_overlaps(dtboxes, gtboxes, mode='iou') |
| overlap_ioa = bbox_overlaps(dtboxes, gtboxes, mode='iof') |
| else: |
| return list() |
|
|
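| # Greedy Caltech-style matching in descending score order: a detection |
| # matched to a valid gt box (tag > 0) is a true positive (label 1); one |
| # that only overlaps an ignore region is excluded from scoring; any |
| # remaining detection is a false positive (label 0). |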
| score_list = list() |
| for i, dt in enumerate(dtboxes): |
| maxpos = -1 |
| maxiou = thres |
| for j, gt in enumerate(gtboxes): |
| if gt_matched[j] == 1: |
| continue |
| if gt[-1] > 0: |
| overlap = overlap_iou[i][j] |
| if overlap > maxiou: |
| maxiou = overlap |
| maxpos = j |
| else: |
| if maxpos >= 0: |
| break |
| else: |
| overlap = overlap_ioa[i][j] |
| if overlap > thres: |
| maxiou = overlap |
| maxpos = j |
| if maxpos >= 0: |
| if gtboxes[maxpos, -1] > 0: |
| gt_matched[maxpos] = 1 |
| dt_matched[i] = 1 |
| score_list.append((dt, 1, self.ID)) |
| else: |
| dt_matched[i] = -1 |
| else: |
| dt_matched[i] = 0 |
| score_list.append((dt, 0, self.ID)) |
| return score_list |
|
|