Spaces:
Running
on
Zero
Running
on
Zero
| # SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # Adapted from: | |
| # https://github.com/ZFTurbo/Weighted-Boxes-Fusion/blob/master/ensemble_boxes/ensemble_boxes_wbf.py | |
| import warnings | |
| from typing import Dict, List, Tuple, Union, Literal | |
| import numpy as np | |
| import numpy.typing as npt | |
| def prefilter_boxes( | |
| boxes: List[npt.NDArray[np.float64]], | |
| scores: List[npt.NDArray[np.float64]], | |
| labels: List[npt.NDArray[np.int_]], | |
| weights: List[float], | |
| thr: float, | |
| class_agnostic: bool = False, | |
| ) -> Dict[Union[str, int], npt.NDArray[np.float64]]: | |
| """ | |
| Reformats and filters boxes. | |
| Output is a dict of boxes to merge separately. | |
| Args: | |
| boxes (list[np array[n x 4]]): List of boxes. One list per model. | |
| scores (list[np array[n]]): List of confidences. | |
| labels (list[np array[n]]): List of labels. | |
| weights (list): Model weights. | |
| thr (float): Confidence threshold | |
| class_agnostic (bool, optional): Merge boxes from different classes. Defaults to False. | |
| Returns: | |
| dict[np array [? x 8]]: Filtered boxes. | |
| """ | |
| # Create dict with boxes stored by its label | |
| new_boxes = dict() | |
| for t in range(len(boxes)): | |
| assert len(boxes[t]) == len(scores[t]), "len(boxes) != len(scores)" | |
| assert len(boxes[t]) == len(labels[t]), "len(boxes) != len(labels)" | |
| for j in range(len(boxes[t])): | |
| score = scores[t][j] | |
| if score < thr: | |
| continue | |
| label = int(labels[t][j]) | |
| box_part = boxes[t][j] | |
| x1 = float(box_part[0]) | |
| y1 = float(box_part[1]) | |
| x2 = float(box_part[2]) | |
| y2 = float(box_part[3]) | |
| # Box data checks | |
| if x2 < x1: | |
| warnings.warn("X2 < X1 value in box. Swap them.") | |
| x1, x2 = x2, x1 | |
| if y2 < y1: | |
| warnings.warn("Y2 < Y1 value in box. Swap them.") | |
| y1, y2 = y2, y1 | |
| array = np.array([x1, x2, y1, y2]) | |
| if array.min() < 0 or array.max() > 1: | |
| warnings.warn("Coordinates outside [0, 1]") | |
| array = np.clip(array, 0, 1) | |
| x1, x2, y1, y2 = array | |
| if (x2 - x1) * (y2 - y1) == 0.0: | |
| warnings.warn("Zero area box skipped: {}.".format(box_part)) | |
| continue | |
| # [label, score, weight, model index, x1, y1, x2, y2] | |
| b = [int(label), float(score) * weights[t], weights[t], t, x1, y1, x2, y2] | |
| label_k = "*" if class_agnostic else label | |
| if label_k not in new_boxes: | |
| new_boxes[label_k] = [] | |
| new_boxes[label_k].append(b) | |
| # Sort each list in dict by score and transform it to numpy array | |
| for k in new_boxes: | |
| current_boxes = np.array(new_boxes[k]) | |
| new_boxes[k] = current_boxes[current_boxes[:, 1].argsort()[::-1]] | |
| return new_boxes | |
| def merge_labels( | |
| labels: npt.NDArray[np.int_], confs: npt.NDArray[np.float64] | |
| ) -> int: | |
| """ | |
| Custom function for merging labels. | |
| If all labels are the same, return the unique value. | |
| Else, return the label of the most confident non-title (class 2) box. | |
| Args: | |
| labels (np array [n]): Labels. | |
| confs (np array [n]): Confidence. | |
| Returns: | |
| int: Label. | |
| """ | |
| if len(np.unique(labels)) == 1: | |
| return labels[0] | |
| else: # Most confident and not a title | |
| confs = confs[confs != 2] | |
| labels = labels[labels != 2] | |
| return labels[np.argmax(confs)] | |
| def get_weighted_box( | |
| boxes: npt.NDArray[np.float64], conf_type: Literal["avg", "max"] = "avg" | |
| ) -> npt.NDArray[np.float64]: | |
| """ | |
| Merges boxes by using the weighted fusion. | |
| Args: | |
| boxes (np array [n x 8]): Boxes to merge. | |
| conf_type (str, optional): Confidence merging type. Defaults to "avg". | |
| Returns: | |
| np array [8]: Merged box. | |
| """ | |
| box = np.zeros(8, dtype=np.float32) | |
| conf = 0 | |
| conf_list = [] | |
| w = 0 | |
| for b in boxes: | |
| box[4:] += b[1] * b[4:] | |
| conf += b[1] | |
| conf_list.append(b[1]) | |
| w += b[2] | |
| box[0] = merge_labels( | |
| np.array([b[0] for b in boxes]), np.array([b[1] for b in boxes]) | |
| ) | |
| box[1] = np.max(conf_list) if conf_type == "max" else np.mean(conf_list) | |
| box[2] = w | |
| box[3] = -1 # model index field is retained for consistency but is not used. | |
| box[4:] /= conf | |
| return box | |
| def get_biggest_box( | |
| boxes: npt.NDArray[np.float64], conf_type: Literal["avg", "max"] = "avg" | |
| ) -> npt.NDArray[np.float64]: | |
| """ | |
| Merges boxes by using the biggest box. | |
| Args: | |
| boxes (np array [n x 8]): Boxes to merge. | |
| conf_type (str, optional): Confidence merging type. Defaults to "avg". | |
| Returns: | |
| np array [8]: Merged box. | |
| """ | |
| box = np.zeros(8, dtype=np.float32) | |
| box[4:] = boxes[0][4:] | |
| conf_list = [] | |
| w = 0 | |
| for b in boxes: | |
| box[4] = min(box[4], b[4]) | |
| box[5] = min(box[5], b[5]) | |
| box[6] = max(box[6], b[6]) | |
| box[7] = max(box[7], b[7]) | |
| conf_list.append(b[1]) | |
| w += b[2] | |
| box[0] = merge_labels( | |
| np.array([b[0] for b in boxes]), np.array([b[1] for b in boxes]) | |
| ) | |
| # print(box[0], np.array([b[0] for b in boxes])) | |
| box[1] = np.max(conf_list) if conf_type == "max" else np.mean(conf_list) | |
| box[2] = w | |
| box[3] = -1 # model index field is retained for consistency but is not used. | |
| return box | |
| def find_matching_box_fast( | |
| boxes_list: npt.NDArray[np.float64], | |
| new_box: npt.NDArray[np.float64], | |
| match_iou: float, | |
| ) -> Tuple[int, float]: | |
| """ | |
| Reimplementation of find_matching_box with numpy instead of loops. | |
| Gives significant speed up for larger arrays (~100x). | |
| This was previously the bottleneck since the function is called for every entry in the array. | |
| Args: | |
| boxes_list (np.ndarray): Array of boxes with shape (N, 8). | |
| new_box (np.ndarray): New box to match with shape (8,). | |
| match_iou (float): IoU threshold for matching. | |
| Returns: | |
| Tuple[int, float]: Index of best matching box (-1 if no match) and IoU value. | |
| """ | |
| def bb_iou_array( | |
| boxes: npt.NDArray[np.float64], new_box: npt.NDArray[np.float64] | |
| ) -> npt.NDArray[np.float64]: | |
| # bb interesection over union | |
| xA = np.maximum(boxes[:, 0], new_box[0]) | |
| yA = np.maximum(boxes[:, 1], new_box[1]) | |
| xB = np.minimum(boxes[:, 2], new_box[2]) | |
| yB = np.minimum(boxes[:, 3], new_box[3]) | |
| interArea = np.maximum(xB - xA, 0) * np.maximum(yB - yA, 0) | |
| # compute the area of both the prediction and ground-truth rectangles | |
| boxAArea = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1]) | |
| boxBArea = (new_box[2] - new_box[0]) * (new_box[3] - new_box[1]) | |
| iou = interArea / (boxAArea + boxBArea - interArea) | |
| return iou | |
| if boxes_list.shape[0] == 0: | |
| return -1, match_iou | |
| ious = bb_iou_array(boxes_list[:, 4:], new_box[4:]) | |
| # ious[boxes[:, 0] != new_box[0]] = -1 | |
| best_idx = np.argmax(ious) | |
| best_iou = ious[best_idx] | |
| if best_iou <= match_iou: | |
| best_iou = match_iou | |
| best_idx = -1 | |
| return best_idx, best_iou | |
| def weighted_boxes_fusion( | |
| boxes_list: List[npt.NDArray[np.float64]], | |
| labels_list: List[npt.NDArray[np.int_]], | |
| scores_list: List[npt.NDArray[np.float64]], | |
| iou_thr: float = 0.5, | |
| skip_box_thr: float = 0.0, | |
| conf_type: Literal["avg", "max"] = "avg", | |
| merge_type: Literal["weighted", "biggest"] = "weighted", | |
| class_agnostic: bool = False, | |
| ) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.int_]]: | |
| """ | |
| Custom WBF implementation that supports a class_agnostic mode and a biggest box fusion. | |
| Boxes are expected to be in normalized (x0, y0, x1, y1) format. | |
| Args: | |
| boxes_list (list[np.ndarray[n x 4]]): List of boxes. One list per model. | |
| labels_list (list[np.ndarray[n]]): List of labels. | |
| scores_list (list[np.ndarray[n]]): List of confidences. | |
| iou_thr (float, optional): IoU threshold for matching. Defaults to 0.55. | |
| skip_box_thr (float, optional): Exclude boxes with score < skip_box_thr. Defaults to 0.0. | |
| conf_type (str, optional): Confidence merging type ("avg" or "max"). Defaults to "avg". | |
| merge_type (str, optional): Merge type ("weighted" or "biggest"). Defaults to "weighted". | |
| class_agnostic (bool, optional): Merge boxes from different classes. Defaults to False. | |
| Returns: | |
| numpy.ndarray [N x 4]: Array of bounding boxes. | |
| numpy.ndarray [N]: Array of labels. | |
| numpy.ndarray [N]: Array of scores. | |
| """ | |
| weights = np.ones(len(boxes_list)) | |
| assert conf_type in ["avg", "max"], 'Conf type must be "avg" or "max"' | |
| assert merge_type in ["weighted", "biggest"], 'Conf type must be "weighted" or "biggest"' | |
| filtered_boxes = prefilter_boxes( | |
| boxes_list, | |
| scores_list, | |
| labels_list, | |
| weights, | |
| skip_box_thr, | |
| class_agnostic=class_agnostic, | |
| ) | |
| if len(filtered_boxes) == 0: | |
| return np.zeros((0, 4)), np.zeros((0,)), np.zeros((0,)) | |
| overall_boxes = [] | |
| for label in filtered_boxes: | |
| boxes = filtered_boxes[label] | |
| clusters = [] | |
| # Clusterize boxes | |
| for j in range(len(boxes)): | |
| ids = [i for i in range(len(boxes)) if i != j] | |
| index, best_iou = find_matching_box_fast(boxes[ids], boxes[j], iou_thr) | |
| if index != -1: | |
| index = ids[index] | |
| cluster_idx = [ | |
| clust_idx | |
| for clust_idx, clust in enumerate(clusters) | |
| if (j in clust or index in clust) | |
| ] | |
| if len(cluster_idx): | |
| cluster_idx = cluster_idx[0] | |
| clusters[cluster_idx] = list( | |
| set(clusters[cluster_idx] + [index, j]) | |
| ) | |
| else: | |
| clusters.append([index, j]) | |
| else: | |
| clusters.append([j]) | |
| for j, c in enumerate(clusters): | |
| if merge_type == "weighted": | |
| weighted_box = get_weighted_box(boxes[c], conf_type) | |
| elif merge_type == "biggest": | |
| weighted_box = get_biggest_box(boxes[c], conf_type) | |
| if conf_type == "max": | |
| weighted_box[1] = weighted_box[1] / weights.max() | |
| else: # avg | |
| weighted_box[1] = weighted_box[1] * len(c) / weights.sum() | |
| overall_boxes.append(weighted_box) | |
| overall_boxes = np.array(overall_boxes) | |
| overall_boxes = overall_boxes[overall_boxes[:, 1].argsort()[::-1]] | |
| boxes = overall_boxes[:, 4:] | |
| scores = overall_boxes[:, 1] | |
| labels = overall_boxes[:, 0] | |
| return boxes, labels, scores | |