| |
| |
|
|
| import contextlib |
| import copy |
| import io |
| import itertools |
| import logging |
| import numpy as np |
| import os |
| from collections import OrderedDict |
| from typing import Dict, Iterable, List, Optional |
| import pycocotools.mask as mask_utils |
| import torch |
| from pycocotools.coco import COCO |
| from tabulate import tabulate |
|
|
| from detectron2.config import CfgNode |
| from detectron2.data import MetadataCatalog |
| from detectron2.evaluation import DatasetEvaluator |
| from detectron2.structures import BoxMode |
| from detectron2.utils.comm import gather, get_rank, is_main_process, synchronize |
| from detectron2.utils.file_io import PathManager |
| from detectron2.utils.logger import create_small_table |
|
|
| from densepose.converters import ToChartResultConverter, ToMaskConverter |
| from densepose.data.datasets.coco import maybe_filter_and_map_categories_cocoapi |
| from densepose.structures import ( |
| DensePoseChartPredictorOutput, |
| DensePoseEmbeddingPredictorOutput, |
| quantize_densepose_chart_result, |
| ) |
|
|
| from .densepose_coco_evaluation import DensePoseCocoEval, DensePoseEvalMode |
| from .mesh_alignment_evaluator import MeshAlignmentEvaluator |
| from .tensor_storage import ( |
| SingleProcessFileTensorStorage, |
| SingleProcessRamTensorStorage, |
| SingleProcessTensorStorage, |
| SizeData, |
| storage_gather, |
| ) |
|
|
|
|
class DensePoseCOCOEvaluator(DatasetEvaluator):
    """
    Evaluate DensePose predictions against COCO-style ground truth.

    Supports chart-based ("iuv") and continuous-surface-embedding ("cse")
    predictor outputs, an optional tensor storage for bulky per-instance data,
    gathering of predictions across processes in distributed runs, and an
    optional mesh alignment evaluation (CSE only).
    """

    def __init__(
        self,
        dataset_name,
        distributed,
        output_dir=None,
        evaluator_type: str = "iuv",
        min_iou_threshold: float = 0.5,
        storage: Optional[SingleProcessTensorStorage] = None,
        embedder=None,
        should_evaluate_mesh_alignment: bool = False,
        mesh_alignment_mesh_names: Optional[List[str]] = None,
    ):
        """
        Args:
            dataset_name (str): name of a dataset registered in MetadataCatalog;
                its metadata must provide `json_file` (COCO annotations path)
            distributed (bool): if True, predictions are synchronized and
                gathered from all processes before evaluation
            output_dir (str): if given, raw predictions are saved there as
                "coco_densepose_predictions.pth"
            evaluator_type (str): "iuv" (chart-based) or "cse" (embeddings)
            min_iou_threshold (float): lowest IoU threshold used for AP/AR
            storage: optional tensor storage to hold large per-instance fields;
                when set, predictions keep only (record_id, rank) references
            embedder: vertex embedder; required if mesh alignment is evaluated
            should_evaluate_mesh_alignment (bool): whether to additionally run
                mesh alignment evaluation (requires `embedder`)
            mesh_alignment_mesh_names (list[str]): names of meshes to use for
                mesh alignment evaluation
        """
        self._embedder = embedder
        self._distributed = distributed
        self._output_dir = output_dir
        self._evaluator_type = evaluator_type
        self._storage = storage
        self._should_evaluate_mesh_alignment = should_evaluate_mesh_alignment

        assert not (
            should_evaluate_mesh_alignment and embedder is None
        ), "Mesh alignment evaluation is activated, but no vertex embedder provided!"
        if should_evaluate_mesh_alignment:
            self._mesh_alignment_evaluator = MeshAlignmentEvaluator(
                embedder,
                mesh_alignment_mesh_names,
            )

        self._cpu_device = torch.device("cpu")
        self._logger = logging.getLogger(__name__)

        self._metadata = MetadataCatalog.get(dataset_name)
        self._min_threshold = min_iou_threshold
        json_file = PathManager.get_local_path(self._metadata.json_file)
        # COCO() prints to stdout on load; silence it
        with contextlib.redirect_stdout(io.StringIO()):
            self._coco_api = COCO(json_file)
        maybe_filter_and_map_categories_cocoapi(dataset_name, self._coco_api)

    def reset(self):
        """Discard predictions accumulated by previous `process` calls."""
        self._predictions = []

    def process(self, inputs, outputs):
        """
        Args:
            inputs: the inputs to a COCO model (e.g., GeneralizedRCNN).
                It is a list of dict. Each dict corresponds to an image and
                contains keys like "height", "width", "file_name", "image_id".
            outputs: the outputs of a COCO model. It is a list of dicts with key
                "instances" that contains :class:`Instances`.
                The :class:`Instances` object needs to have `densepose` field.
        """
        for input, output in zip(inputs, outputs):
            instances = output["instances"].to(self._cpu_device)
            if not instances.has("pred_densepose"):
                continue
            prediction_list = prediction_to_dict(
                instances,
                input["image_id"],
                self._embedder,
                self._metadata.class_to_mesh_name,
                self._storage is not None,
            )
            if self._storage is not None:
                # Move bulky fields into the tensor storage; keep only
                # (record_id, rank) references in the prediction dicts.
                for prediction_dict in prediction_list:
                    dict_to_store = {}
                    for field_name in self._storage.data_schema:
                        dict_to_store[field_name] = prediction_dict[field_name]
                    record_id = self._storage.put(dict_to_store)
                    prediction_dict["record_id"] = record_id
                    prediction_dict["rank"] = get_rank()
                    for field_name in self._storage.data_schema:
                        del prediction_dict[field_name]
            self._predictions.extend(prediction_list)

    def evaluate(self, img_ids=None):
        """
        Gather predictions (across processes if distributed) and evaluate them.

        Returns:
            dict or None: evaluation results on the main process;
                None on all other processes.
        """
        if self._distributed:
            synchronize()
            predictions = gather(self._predictions)
            predictions = list(itertools.chain(*predictions))
        else:
            predictions = self._predictions

        # Must run on all ranks (collective op) before the main-process check
        multi_storage = storage_gather(self._storage) if self._storage is not None else None

        if not is_main_process():
            return
        return copy.deepcopy(self._eval_predictions(predictions, multi_storage, img_ids))

    def _eval_predictions(self, predictions, multi_storage=None, img_ids=None):
        """
        Evaluate predictions on densepose.
        Return results with the metrics of the tasks.
        """
        self._logger.info("Preparing results for COCO format ...")

        if self._output_dir:
            PathManager.mkdirs(self._output_dir)
            file_path = os.path.join(self._output_dir, "coco_densepose_predictions.pth")
            with PathManager.open(file_path, "wb") as f:
                torch.save(predictions, f)

        self._logger.info("Evaluating predictions ...")
        res = OrderedDict()
        results_gps, results_gpsm, results_segm = _evaluate_predictions_on_coco(
            self._coco_api,
            predictions,
            multi_storage,
            self._embedder,
            class_names=self._metadata.get("thing_classes"),
            min_threshold=self._min_threshold,
            img_ids=img_ids,
        )
        res["densepose_gps"] = results_gps
        res["densepose_gpsm"] = results_gpsm
        res["densepose_segm"] = results_segm
        if self._should_evaluate_mesh_alignment:
            res["densepose_mesh_alignment"] = self._evaluate_mesh_alignment()
        return res

    def _evaluate_mesh_alignment(self):
        """
        Run mesh alignment evaluation and return metrics scaled to percent.

        Returns:
            dict[str, float]: mean "GE" and "GPS" plus per-mesh
                "GE-<mesh>" / "GPS-<mesh>" entries, all multiplied by 100.
        """
        self._logger.info("Mesh alignment evaluation ...")
        mean_ge, mean_gps, per_mesh_metrics = self._mesh_alignment_evaluator.evaluate()
        results = {
            "GE": mean_ge * 100,
            "GPS": mean_gps * 100,
        }
        mesh_names = set()
        for metric_name in per_mesh_metrics:
            for mesh_name, value in per_mesh_metrics[metric_name].items():
                results[f"{metric_name}-{mesh_name}"] = value * 100
                mesh_names.add(mesh_name)
        self._print_mesh_alignment_results(results, mesh_names)
        return results

    def _print_mesh_alignment_results(self, results: Dict[str, float], mesh_names: Iterable[str]):
        """Log mesh alignment metrics as a Markdown-style table."""
        self._logger.info("Evaluation results for densepose, mesh alignment:")
        self._logger.info(f'| {"Mesh":13s} | {"GErr":7s} | {"GPS":7s} |')
        self._logger.info("| :-----------: | :-----: | :-----: |")
        for mesh_name in mesh_names:
            ge_key = f"GE-{mesh_name}"
            ge_str = f"{results[ge_key]:.4f}" if ge_key in results else " "
            gps_key = f"GPS-{mesh_name}"
            gps_str = f"{results[gps_key]:.4f}" if gps_key in results else " "
            self._logger.info(f"| {mesh_name:13s} | {ge_str:7s} | {gps_str:7s} |")
        self._logger.info("| :-------------------------------: |")
        ge_key = "GE"
        ge_str = f"{results[ge_key]:.4f}" if ge_key in results else " "
        gps_key = "GPS"
        gps_str = f"{results[gps_key]:.4f}" if gps_key in results else " "
        self._logger.info(f'| {"MEAN":13s} | {ge_str:7s} | {gps_str:7s} |')
|
|
|
|
def prediction_to_dict(instances, img_id, embedder, class_to_mesh_name, use_storage):
    """
    Convert model outputs for one image into the DensePose evaluation format.

    Args:
        instances (Instances): the output of the model for one image; must have
            `scores`, `pred_classes`, `pred_boxes` and `pred_densepose` fields
        img_id (str): the image id in COCO
        embedder: vertex embedder (used only for CSE predictor outputs)
        class_to_mesh_name (dict): mapping from class id to mesh name
            (used only for CSE predictor outputs)
        use_storage (bool): if True, chart predictions are converted to raw
            tensors suitable for a tensor storage instead of encoded results

    Returns:
        list[dict]: the results in densepose evaluation format, one entry per
            detected instance, each containing "image_id", "category_id",
            "bbox" (XYWH), "score" and the densepose-specific fields

    Raises:
        ValueError: if `instances.pred_densepose` is of an unsupported type
    """
    scores = instances.scores.tolist()
    classes = instances.pred_classes.tolist()
    raw_boxes_xywh = BoxMode.convert(
        instances.pred_boxes.tensor.clone(), BoxMode.XYXY_ABS, BoxMode.XYWH_ABS
    )

    if isinstance(instances.pred_densepose, DensePoseEmbeddingPredictorOutput):
        results_densepose = densepose_cse_predictions_to_dict(
            instances, embedder, class_to_mesh_name, use_storage
        )
    elif isinstance(instances.pred_densepose, DensePoseChartPredictorOutput):
        if not use_storage:
            results_densepose = densepose_chart_predictions_to_dict(instances)
        else:
            results_densepose = densepose_chart_predictions_to_storage_dict(instances)
    else:
        # Fail fast with a clear message instead of an obscure NameError
        # on `results_densepose` below
        raise ValueError(
            f"Unsupported DensePose predictor output type: {type(instances.pred_densepose)}"
        )

    results = []
    for k in range(len(instances)):
        result = {
            "image_id": img_id,
            "category_id": classes[k],
            "bbox": raw_boxes_xywh[k].tolist(),
            "score": scores[k],
        }
        results.append({**result, **results_densepose[k]})
    return results
|
|
|
|
def densepose_chart_predictions_to_dict(instances):
    """
    Encode chart-based DensePose predictions as quantized chart results plus
    RLE-encoded segmentation masks.

    Returns:
        list[dict]: one dict per instance with keys "densepose" (quantized
            chart result) and "segmentation" (COCO RLE with utf-8 counts)
    """
    segmentations = ToMaskConverter.convert(
        instances.pred_densepose, instances.pred_boxes, instances.image_size
    )

    results = []
    for idx in range(len(instances)):
        chart_result = ToChartResultConverter.convert(
            instances.pred_densepose[idx], instances.pred_boxes[idx]
        )
        quantized = quantize_densepose_chart_result(chart_result)
        quantized.labels_uv_uint8 = quantized.labels_uv_uint8.cpu()
        mask = segmentations.tensor[idx]
        # pycocotools requires a Fortran-contiguous uint8 array
        encoded_mask = mask_utils.encode(
            np.require(mask.numpy(), dtype=np.uint8, requirements=["F"])
        )
        # RLE counts come back as bytes; decode for JSON serializability
        encoded_mask["counts"] = encoded_mask["counts"].decode("utf-8")
        results.append(
            {
                "densepose": quantized,
                "segmentation": encoded_mask,
            }
        )
    return results
|
|
|
|
def densepose_chart_predictions_to_storage_dict(instances):
    """
    Extract raw chart predictor tensors for each instance, moved to CPU with
    the leading batch dimension removed, for storing in a tensor storage.

    Returns:
        list[dict]: one dict per instance with keys
            "coarse_segm", "fine_segm", "u", "v"
    """
    return [
        {
            "coarse_segm": dp_output.coarse_segm.squeeze(0).cpu(),
            "fine_segm": dp_output.fine_segm.squeeze(0).cpu(),
            "u": dp_output.u.squeeze(0).cpu(),
            "v": dp_output.v.squeeze(0).cpu(),
        }
        for dp_output in (instances.pred_densepose[idx] for idx in range(len(instances)))
    ]
|
|
|
|
def densepose_cse_predictions_to_dict(instances, embedder, class_to_mesh_name, use_storage):
    """
    Extract CSE predictor tensors (coarse segmentation and vertex embeddings,
    moved to CPU) for each instance.

    Note: `embedder`, `class_to_mesh_name` and `use_storage` are not used here;
    they are kept to mirror the signatures of the chart-based converters.

    Returns:
        list[dict]: one dict per instance with keys "coarse_segm", "embedding"
    """
    outputs = []
    for idx in range(len(instances)):
        dp_output = instances.pred_densepose[idx]
        outputs.append(
            {
                "coarse_segm": dp_output.coarse_segm[0].cpu(),
                "embedding": dp_output.embedding[0].cpu(),
            }
        )
    return outputs
|
|
|
|
def _evaluate_predictions_on_coco(
    coco_gt,
    coco_results,
    multi_storage=None,
    embedder=None,
    class_names=None,
    min_threshold: float = 0.5,
    img_ids=None,
):
    """
    Run DensePose COCO evaluation in GPS, GPSM and IOU modes.

    Args:
        coco_gt: ground truth COCO API object
        coco_results (list[dict]): predictions in densepose evaluation format
        multi_storage: optional multi-process tensor storage with bulky fields
        embedder: optional vertex embedder (CSE)
        class_names (list[str]): category names for per-category reporting
        min_threshold (float): lowest IoU threshold used for AP/AR
        img_ids: optional subset of image ids to evaluate on

    Returns:
        list[dict]: three metric dicts, one per eval mode (GPS, GPSM, IOU);
            all metrics are set to -1 when there are no predictions
    """
    logger = logging.getLogger(__name__)

    densepose_metrics = _get_densepose_metrics(min_threshold)
    if len(coco_results) == 0:
        # `Logger.warn` is a deprecated alias of `Logger.warning`
        logger.warning("No predictions from the model! Set scores to -1")
        results_gps = {metric: -1 for metric in densepose_metrics}
        results_gpsm = {metric: -1 for metric in densepose_metrics}
        results_segm = {metric: -1 for metric in densepose_metrics}
        return results_gps, results_gpsm, results_segm

    coco_dt = coco_gt.loadRes(coco_results)

    results = []
    for eval_mode_name in ["GPS", "GPSM", "IOU"]:
        eval_mode = getattr(DensePoseEvalMode, eval_mode_name)
        coco_eval = DensePoseCocoEval(
            coco_gt, coco_dt, "densepose", multi_storage, embedder, dpEvalMode=eval_mode
        )
        result = _derive_results_from_coco_eval(
            coco_eval, eval_mode_name, densepose_metrics, class_names, min_threshold, img_ids
        )
        results.append(result)
    return results
|
|
|
|
| def _get_densepose_metrics(min_threshold: float = 0.5): |
| metrics = ["AP"] |
| if min_threshold <= 0.201: |
| metrics += ["AP20"] |
| if min_threshold <= 0.301: |
| metrics += ["AP30"] |
| if min_threshold <= 0.401: |
| metrics += ["AP40"] |
| metrics.extend(["AP50", "AP75", "APm", "APl", "AR", "AR50", "AR75", "ARm", "ARl"]) |
| return metrics |
|
|
|
|
def _derive_results_from_coco_eval(
    coco_eval, eval_mode_name, metrics, class_names, min_threshold: float, img_ids
):
    """
    Run a configured DensePoseCocoEval and convert its stats to a metric dict.

    Args:
        coco_eval: a DensePoseCocoEval instance, not yet evaluated
        eval_mode_name (str): eval mode label used in log messages
        metrics (list[str]): metric names, assumed to be ordered consistently
            with `coco_eval.stats`
        class_names (list[str]): category names; when more than one is given,
            per-category AP is also computed and logged
        min_threshold (float): lowest IoU threshold
        img_ids: optional subset of image ids to evaluate on

    Returns:
        dict[str, float]: metric name -> value (in percent), including
            "AP-<category>" entries when per-category AP is computed
    """
    if img_ids is not None:
        coco_eval.params.imgIds = img_ids
    # IoU thresholds from min_threshold to 0.95 in 0.05 steps (inclusive)
    coco_eval.params.iouThrs = np.linspace(
        min_threshold, 0.95, int(np.round((0.95 - min_threshold) / 0.05)) + 1, endpoint=True
    )
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()
    # NOTE(review): assumes coco_eval.stats entries line up 1:1 with `metrics`
    results = {metric: float(coco_eval.stats[idx] * 100) for idx, metric in enumerate(metrics)}
    logger = logging.getLogger(__name__)
    logger.info(
        f"Evaluation results for densepose, {eval_mode_name} metric: \n"
        + create_small_table(results)
    )
    if class_names is None or len(class_names) <= 1:
        return results

    # Compute per-category AP
    # presumably precision has dims (iou, recall, cls, area_range, max_dets)
    # — TODO confirm against DensePoseCocoEval.accumulate
    precisions = coco_eval.eval["precision"]

    assert len(class_names) == precisions.shape[2]

    results_per_category = []
    for idx, name in enumerate(class_names):
        # area range index 0 and max-dets index -1 are selected here;
        # -1 entries mark (iou, recall) cells with no valid data
        precision = precisions[:, :, idx, 0, -1]
        precision = precision[precision > -1]
        ap = np.mean(precision) if precision.size else float("nan")
        results_per_category.append((f"{name}", float(ap * 100)))

    # Tabulate per-category AP in up to 3 (category, AP) column pairs
    n_cols = min(6, len(results_per_category) * 2)
    results_flatten = list(itertools.chain(*results_per_category))
    results_2d = itertools.zip_longest(*[results_flatten[i::n_cols] for i in range(n_cols)])
    table = tabulate(
        results_2d,
        tablefmt="pipe",
        floatfmt=".3f",
        headers=["category", "AP"] * (n_cols // 2),
        numalign="left",
    )
    logger.info(f"Per-category {eval_mode_name} AP: \n" + table)

    results.update({"AP-" + name: ap for name, ap in results_per_category})
    return results
|
|
|
|
def build_densepose_evaluator_storage(cfg: CfgNode, output_folder: str):
    """
    Create a tensor storage for DensePose evaluation based on the config.

    Args:
        cfg (CfgNode): experiment config; DENSEPOSE_EVALUATION.STORAGE selects
            "none", "ram" or "file", DENSEPOSE_EVALUATION.TYPE selects the
            schema ("iuv" or "cse")
        output_folder (str): folder used for file-backed storage

    Returns:
        SingleProcessTensorStorage or None: None when storage is disabled,
            otherwise a RAM- or file-backed single-process storage

    Raises:
        ValueError: on unknown evaluator type or storage specification
    """
    storage_spec = cfg.DENSEPOSE_EVALUATION.STORAGE
    if storage_spec == "none":
        return None

    evaluator_type = cfg.DENSEPOSE_EVALUATION.TYPE
    # heatmaps are square: height == width == HEATMAP_SIZE
    side = cfg.MODEL.ROI_DENSEPOSE_HEAD.HEATMAP_SIZE
    n_coarse = cfg.MODEL.ROI_DENSEPOSE_HEAD.NUM_COARSE_SEGM_CHANNELS

    if evaluator_type == "iuv":
        n_fine = cfg.MODEL.ROI_DENSEPOSE_HEAD.NUM_PATCHES + 1
        chart_shape = (n_fine, side, side)
        schema = {
            "coarse_segm": SizeData(dtype="float32", shape=(n_coarse, side, side)),
            "fine_segm": SizeData(dtype="float32", shape=chart_shape),
            "u": SizeData(dtype="float32", shape=chart_shape),
            "v": SizeData(dtype="float32", shape=chart_shape),
        }
    elif evaluator_type == "cse":
        embed_size = cfg.MODEL.ROI_DENSEPOSE_HEAD.CSE.EMBED_SIZE
        schema = {
            "coarse_segm": SizeData(dtype="float32", shape=(n_coarse, side, side)),
            "embedding": SizeData(dtype="float32", shape=(embed_size, side, side)),
        }
    else:
        raise ValueError(f"Unknown evaluator type: {evaluator_type}")

    if storage_spec == "ram":
        return SingleProcessRamTensorStorage(schema, io.BytesIO())
    if storage_spec == "file":
        fpath = os.path.join(output_folder, f"DensePoseEvaluatorStorage.{get_rank()}.bin")
        PathManager.mkdirs(output_folder)
        return SingleProcessFileTensorStorage(schema, fpath, "wb")
    raise ValueError(f"Unknown storage specification: {storage_spec}")
|
|