| | |
| |
|
| | """ |
| | COCO prediction dumper for distributed training. |
| | |
| | Handles collection and dumping of COCO-format predictions from models. |
| | Supports distributed processing with multiple GPUs/processes. |
| | """ |
| |
|
| | import copy |
| | import gc |
| | import heapq |
| | import json |
| | import logging |
| | import os |
| | from collections import defaultdict |
| | from pathlib import Path |
| | from typing import Any, Optional |
| |
|
| | import pycocotools.mask as mask_utils |
| | import torch |
| | from iopath.common.file_io import g_pathmgr |
| | from sam3.eval.coco_eval_offline import convert_to_xywh |
| | from sam3.train.masks_ops import rle_encode |
| | from sam3.train.utils.distributed import ( |
| | all_gather, |
| | gather_to_rank_0_via_filesys, |
| | get_rank, |
| | is_main_process, |
| | ) |
| |
|
| |
|
| | |
| |
|
| |
|
class HeapElement:
    """Wrapper that lets heapq order prediction dicts by their "score" value."""

    def __init__(self, val):
        # `val` is a COCO-style prediction dict; it must contain a "score" key.
        self.val = val

    def __lt__(self, other):
        """Order strictly by score so heapq keeps the lowest score at the root."""
        return other.val["score"] > self.val["score"]
| |
|
| |
|
class PredictionDumper:
    """
    Handles collection and dumping of COCO-format predictions from a model.

    This class processes model outputs through a postprocessor, converts them to COCO format,
    and saves them to disk. It supports distributed processing with multiple GPUs/processes.
    """

    def __init__(
        self,
        dump_dir: str,
        postprocessor,
        maxdets: int,
        iou_type: str,
        gather_pred_via_filesys: bool = False,
        merge_predictions: bool = False,
        pred_file_evaluators: Optional[Any] = None,
    ):
        """
        Initialize the PredictionDumper.

        Args:
            dump_dir: Directory to dump predictions.
            postprocessor: Module to convert the model's output into COCO format.
            maxdets: Maximum number of detections per image.
            iou_type: IoU type to evaluate. Can include "bbox", "segm"
            gather_pred_via_filesys: If True, use the filesystem for collective gathers across
                processes (requires a shared filesystem). Otherwise, use torch collective ops.
            merge_predictions: If True, merge predictions from all processes and dump to a single file.
            pred_file_evaluators: Optional iterable of evaluators, each exposing
                ``evaluate(pred_file)`` returning a dict of metrics; they run on the
                merged prediction file, so merge_predictions must be True.

        Raises:
            ValueError: If pred_file_evaluators are provided without merge_predictions,
                or without a dump_dir.
        """
        self.iou_type = iou_type
        self.maxdets = maxdets
        self.dump_dir = dump_dir
        self.postprocessor = postprocessor
        self.gather_pred_via_filesys = gather_pred_via_filesys
        self.merge_predictions = merge_predictions
        self.pred_file_evaluators = pred_file_evaluators
        if self.pred_file_evaluators is not None:
            # Explicit raises (not assert) so these checks survive `python -O`.
            if not merge_predictions:
                raise ValueError(
                    "merge_predictions must be True if pred_file_evaluators are provided"
                )
            if self.dump_dir is None:
                raise ValueError("dump_dir must be provided")

        if is_main_process():
            os.makedirs(self.dump_dir, exist_ok=True)
            logging.info(f"Created prediction dump directory: {self.dump_dir}")

        self.reset()

    def update(self, *args, **kwargs):
        """
        Process and accumulate predictions from model outputs.

        Args:
            *args, **kwargs: Arguments passed to postprocessor.process_results()
        """
        predictions = self.postprocessor.process_results(*args, **kwargs)
        results = self.prepare(predictions, self.iou_type)
        self._dump(results)

    def _dump(self, results):
        """
        Add results to the dump list with precision rounding.

        The input list is deep-copied first, so callers' dicts are never mutated.

        Args:
            results: List of prediction dictionaries in COCO format.
        """
        dumped_results = copy.deepcopy(results)
        for r in dumped_results:
            # Round to 5 decimal places to keep the JSON dump compact.
            if "bbox" in r:
                r["bbox"] = [round(coord, 5) for coord in r["bbox"]]
            r["score"] = round(r["score"], 5)
        self.dump.extend(dumped_results)

    def synchronize_between_processes(self):
        """
        Synchronize predictions across all processes and save to disk.

        If merge_predictions is False, each rank writes its own per-rank JSON file.
        Otherwise predictions are gathered (via filesystem or torch collectives),
        merged, and written to a single JSON file by the main process.

        Returns:
            Path of the JSON file this rank's predictions ended up in.
        """
        logging.info("Prediction Dumper: Synchronizing between processes")

        if not self.merge_predictions:
            dumped_file = (
                Path(self.dump_dir)
                / f"coco_predictions_{self.iou_type}_{get_rank()}.json"
            )
            logging.info(
                f"Prediction Dumper: Dumping local predictions to {dumped_file}"
            )
            with g_pathmgr.open(str(dumped_file), "w") as f:
                json.dump(self.dump, f)
        else:
            self.dump = self.gather_and_merge_predictions()
            dumped_file = Path(self.dump_dir) / f"coco_predictions_{self.iou_type}.json"
            # Only the main process holds the full merged dump; it alone writes.
            if is_main_process():
                logging.info(
                    f"Prediction Dumper: Dumping merged predictions to {dumped_file}"
                )
                with g_pathmgr.open(str(dumped_file), "w") as f:
                    json.dump(self.dump, f)

        self.reset()
        return dumped_file

    def gather_and_merge_predictions(self):
        """
        Gather predictions from all processes and merge them, keeping top predictions per image.

        This method collects predictions from all processes, then keeps only the top maxdets
        predictions per image based on score. It also deduplicates predictions by
        (image_id, category_id): once one rank has contributed a pair, the same pair
        from later ranks is skipped.

        Returns:
            List of merged prediction dictionaries.
        """
        logging.info("Prediction Dumper: Gathering predictions from all processes")
        gc.collect()

        if self.gather_pred_via_filesys:
            dump = gather_to_rank_0_via_filesys(self.dump)
        else:
            dump = all_gather(self.dump, force_cpu=True)

        # Per-image bounded min-heaps of the top-maxdets predictions by score.
        preds_by_image = defaultdict(list)
        seen_img_cat = set()

        for cur_dump in dump:
            cur_seen_img_cat = set()
            for p in cur_dump:
                image_id = p["image_id"]
                cat_id = p["category_id"]

                # Skip (image, category) pairs already contributed by an earlier rank.
                # Pairs seen within the *current* rank are not skipped, so a rank may
                # contribute several predictions for the same pair.
                if (image_id, cat_id) in seen_img_cat:
                    continue

                cur_seen_img_cat.add((image_id, cat_id))

                # heappushpop evicts the lowest-scoring element once the heap is full.
                if len(preds_by_image[image_id]) < self.maxdets:
                    heapq.heappush(preds_by_image[image_id], HeapElement(p))
                else:
                    heapq.heappushpop(preds_by_image[image_id], HeapElement(p))

            seen_img_cat.update(cur_seen_img_cat)

        # Flatten the per-image heaps. A nested comprehension avoids the quadratic
        # cost of sum(list_of_lists, []) while preserving the same element order.
        merged_dump = [
            h.val for cur_preds in preds_by_image.values() for h in cur_preds
        ]

        return merged_dump

    def compute_synced(self):
        """
        Synchronize predictions across processes and compute summary.

        Returns:
            Dict of metrics from the pred_file_evaluators (main process only);
            non-main processes, or configurations without evaluators, get {"": 0.0}.
        """
        dumped_file = self.synchronize_between_processes()
        if not is_main_process():
            return {"": 0.0}

        meters = {}
        if self.pred_file_evaluators is not None:
            for evaluator in self.pred_file_evaluators:
                results = evaluator.evaluate(dumped_file)
                meters.update(results)

        if len(meters) == 0:
            meters = {"": 0.0}
        return meters

    def compute(self):
        """
        Compute without synchronization.

        Returns:
            Empty metric dictionary.
        """
        return {"": 0.0}

    def reset(self):
        """Reset internal state for a new evaluation round."""
        self.dump = []

    def prepare(self, predictions, iou_type):
        """
        Route predictions to the appropriate preparation method based on iou_type.

        Args:
            predictions: Dictionary mapping image IDs to prediction dictionaries.
            iou_type: Type of evaluation ("bbox", "segm").

        Returns:
            List of COCO-format prediction dictionaries.

        Raises:
            ValueError: If iou_type is not "bbox" or "segm".
        """
        if iou_type == "bbox":
            return self.prepare_for_coco_detection(predictions)
        elif iou_type == "segm":
            return self.prepare_for_coco_segmentation(predictions)
        else:
            raise ValueError(f"Unknown iou type: {iou_type}")

    def prepare_for_coco_detection(self, predictions):
        """
        Convert predictions to COCO detection format.

        Args:
            predictions: Dictionary mapping image IDs to prediction dictionaries
                containing "boxes", "scores", and "labels".

        Returns:
            List of COCO-format detection dictionaries.
        """
        coco_results = []
        for original_id, prediction in predictions.items():
            if len(prediction) == 0:
                continue

            boxes = prediction["boxes"]
            boxes = convert_to_xywh(boxes).tolist()
            scores = prediction["scores"].tolist()
            labels = prediction["labels"].tolist()

            coco_results.extend(
                [
                    {
                        "image_id": original_id,
                        "category_id": labels[k],
                        "bbox": box,
                        "score": scores[k],
                    }
                    for k, box in enumerate(boxes)
                ]
            )
        return coco_results

    @torch.no_grad()
    def prepare_for_coco_segmentation(self, predictions):
        """
        Convert predictions to COCO segmentation format.

        Args:
            predictions: Dictionary mapping image IDs to prediction dictionaries
                containing "masks" or "masks_rle", "scores", and "labels".
                Optionally includes "boxes".

        Returns:
            List of COCO-format segmentation dictionaries with RLE-encoded masks;
            "area" is the mask area normalized by the image area.
        """
        coco_results = []
        for original_id, prediction in predictions.items():
            if len(prediction) == 0:
                continue

            scores = prediction["scores"].tolist()
            labels = prediction["labels"].tolist()

            boxes = None
            if "boxes" in prediction:
                boxes = prediction["boxes"]
                boxes = convert_to_xywh(boxes).tolist()
                assert len(boxes) == len(scores)

            if "masks_rle" in prediction:
                # Masks are already RLE-encoded; compute normalized areas directly.
                rles = prediction["masks_rle"]
                areas = []
                for rle in rles:
                    cur_area = mask_utils.area(rle)
                    h, w = rle["size"]
                    areas.append(cur_area / (h * w))
            else:
                masks = prediction["masks"]
                masks = masks > 0.5
                h, w = masks.shape[-2:]

                areas = masks.flatten(1).sum(1) / (h * w)
                areas = areas.tolist()

                rles = rle_encode(masks.squeeze(1))

                # Free the dense mask tensors eagerly; they dominate memory here.
                del masks
                del prediction["masks"]

            assert len(areas) == len(rles) == len(scores)

            for k, rle in enumerate(rles):
                payload = {
                    "image_id": original_id,
                    "category_id": labels[k],
                    "segmentation": rle,
                    "score": scores[k],
                    "area": areas[k],
                }
                if boxes is not None:
                    payload["bbox"] = boxes[k]

                coco_results.append(payload)

        return coco_results
| |
|