| | |
| | import contextlib |
| | import io |
| | import itertools |
| | import json |
| | import logging |
| | import numpy as np |
| | import os |
| | import tempfile |
| | from collections import OrderedDict |
| | from typing import Optional |
| | from PIL import Image |
| | from tabulate import tabulate |
| |
|
| | from detectron2.data import MetadataCatalog |
| | from detectron2.utils import comm |
| | from detectron2.utils.file_io import PathManager |
| |
|
| | from .evaluator import DatasetEvaluator |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | class COCOPanopticEvaluator(DatasetEvaluator): |
| | """ |
| | Evaluate Panoptic Quality metrics on COCO using PanopticAPI. |
| | It saves panoptic segmentation prediction in `output_dir` |
| | |
| | It contains a synchronize call and has to be called from all workers. |
| | """ |
| |
|
| | def __init__(self, dataset_name: str, output_dir: Optional[str] = None): |
| | """ |
| | Args: |
| | dataset_name: name of the dataset |
| | output_dir: output directory to save results for evaluation. |
| | """ |
| | self._metadata = MetadataCatalog.get(dataset_name) |
| | self._thing_contiguous_id_to_dataset_id = { |
| | v: k for k, v in self._metadata.thing_dataset_id_to_contiguous_id.items() |
| | } |
| | self._stuff_contiguous_id_to_dataset_id = { |
| | v: k for k, v in self._metadata.stuff_dataset_id_to_contiguous_id.items() |
| | } |
| |
|
| | self._output_dir = output_dir |
| | if self._output_dir is not None: |
| | PathManager.mkdirs(self._output_dir) |
| |
|
| | def reset(self): |
| | self._predictions = [] |
| |
|
| | def _convert_category_id(self, segment_info): |
| | isthing = segment_info.pop("isthing", None) |
| | if isthing is None: |
| | |
| | return segment_info |
| | if isthing is True: |
| | segment_info["category_id"] = self._thing_contiguous_id_to_dataset_id[ |
| | segment_info["category_id"] |
| | ] |
| | else: |
| | segment_info["category_id"] = self._stuff_contiguous_id_to_dataset_id[ |
| | segment_info["category_id"] |
| | ] |
| | return segment_info |
| |
|
| | def process(self, inputs, outputs): |
| | from panopticapi.utils import id2rgb |
| |
|
| | for input, output in zip(inputs, outputs): |
| | panoptic_img, segments_info = output["panoptic_seg"] |
| | panoptic_img = panoptic_img.cpu().numpy() |
| | if segments_info is None: |
| | |
| | |
| | |
| | |
| | |
| | label_divisor = self._metadata.label_divisor |
| | segments_info = [] |
| | for panoptic_label in np.unique(panoptic_img): |
| | if panoptic_label == -1: |
| | |
| | continue |
| | pred_class = panoptic_label // label_divisor |
| | isthing = ( |
| | pred_class in self._metadata.thing_dataset_id_to_contiguous_id.values() |
| | ) |
| | segments_info.append( |
| | { |
| | "id": int(panoptic_label) + 1, |
| | "category_id": int(pred_class), |
| | "isthing": bool(isthing), |
| | } |
| | ) |
| | |
| | panoptic_img += 1 |
| |
|
| | file_name = os.path.basename(input["file_name"]) |
| | file_name_png = os.path.splitext(file_name)[0] + ".png" |
| | with io.BytesIO() as out: |
| | Image.fromarray(id2rgb(panoptic_img)).save(out, format="PNG") |
| | segments_info = [self._convert_category_id(x) for x in segments_info] |
| | self._predictions.append( |
| | { |
| | "image_id": input["image_id"], |
| | "file_name": file_name_png, |
| | "png_string": out.getvalue(), |
| | "segments_info": segments_info, |
| | } |
| | ) |
| |
|
| | def evaluate(self): |
| | comm.synchronize() |
| |
|
| | self._predictions = comm.gather(self._predictions) |
| | self._predictions = list(itertools.chain(*self._predictions)) |
| | if not comm.is_main_process(): |
| | return |
| |
|
| | |
| | gt_json = PathManager.get_local_path(self._metadata.panoptic_json) |
| | gt_folder = PathManager.get_local_path(self._metadata.panoptic_root) |
| |
|
| | with tempfile.TemporaryDirectory(prefix="panoptic_eval") as pred_dir: |
| | logger.info("Writing all panoptic predictions to {} ...".format(pred_dir)) |
| | for p in self._predictions: |
| | with open(os.path.join(pred_dir, p["file_name"]), "wb") as f: |
| | f.write(p.pop("png_string")) |
| |
|
| | with open(gt_json, "r") as f: |
| | json_data = json.load(f) |
| | json_data["annotations"] = self._predictions |
| |
|
| | output_dir = self._output_dir or pred_dir |
| | predictions_json = os.path.join(output_dir, "predictions.json") |
| | with PathManager.open(predictions_json, "w") as f: |
| | f.write(json.dumps(json_data)) |
| |
|
| | from panopticapi.evaluation import pq_compute |
| |
|
| | with contextlib.redirect_stdout(io.StringIO()): |
| | pq_res = pq_compute( |
| | gt_json, |
| | PathManager.get_local_path(predictions_json), |
| | gt_folder=gt_folder, |
| | pred_folder=pred_dir, |
| | ) |
| |
|
| | res = {} |
| | res["PQ"] = 100 * pq_res["All"]["pq"] |
| | res["SQ"] = 100 * pq_res["All"]["sq"] |
| | res["RQ"] = 100 * pq_res["All"]["rq"] |
| | res["PQ_th"] = 100 * pq_res["Things"]["pq"] |
| | res["SQ_th"] = 100 * pq_res["Things"]["sq"] |
| | res["RQ_th"] = 100 * pq_res["Things"]["rq"] |
| | res["PQ_st"] = 100 * pq_res["Stuff"]["pq"] |
| | res["SQ_st"] = 100 * pq_res["Stuff"]["sq"] |
| | res["RQ_st"] = 100 * pq_res["Stuff"]["rq"] |
| |
|
| | results = OrderedDict({"panoptic_seg": res}) |
| | _print_panoptic_results(pq_res) |
| |
|
| | return results |
| |
|
| |
|
| | def _print_panoptic_results(pq_res): |
| | headers = ["", "PQ", "SQ", "RQ", "#categories"] |
| | data = [] |
| | for name in ["All", "Things", "Stuff"]: |
| | row = [name] + [pq_res[name][k] * 100 for k in ["pq", "sq", "rq"]] + [pq_res[name]["n"]] |
| | data.append(row) |
| | table = tabulate( |
| | data, headers=headers, tablefmt="pipe", floatfmt=".3f", stralign="center", numalign="center" |
| | ) |
| | logger.info("Panoptic Evaluation Results:\n" + table) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | from detectron2.utils.logger import setup_logger |
| |
|
| | logger = setup_logger() |
| | import argparse |
| |
|
| | parser = argparse.ArgumentParser() |
| | parser.add_argument("--gt-json") |
| | parser.add_argument("--gt-dir") |
| | parser.add_argument("--pred-json") |
| | parser.add_argument("--pred-dir") |
| | args = parser.parse_args() |
| |
|
| | from panopticapi.evaluation import pq_compute |
| |
|
| | with contextlib.redirect_stdout(io.StringIO()): |
| | pq_res = pq_compute( |
| | args.gt_json, args.pred_json, gt_folder=args.gt_dir, pred_folder=args.pred_dir |
| | ) |
| | _print_panoptic_results(pq_res) |
| |
|