| """Helpers for metric functions""" |
| import numpy as np |
| import pandas as pd |
|
|
|
|
def calculate_iou(box1, box2):
    """
    Calculate Intersection over Union (IoU) between two bounding boxes.

    Args:
        box1 (tuple): Coordinates of the first bounding box in the format (x1, y1, x2, y2).
        box2 (tuple): Coordinates of the second bounding box in the format (x1, y1, x2, y2).

    Returns:
        float: Intersection over Union (IoU) score. Returns 0.0 when the
            union has no area (e.g. both boxes are degenerate), instead of
            dividing by zero.
    """
    x1, y1, x2, y2 = box1
    x1_, y1_, x2_, y2_ = box2

    # Overlap along each axis, clamped at zero for boxes that do not meet.
    overlap_w = max(0, min(x2, x2_) - max(x1, x1_))
    overlap_h = max(0, min(y2, y2_) - max(y1, y1_))
    intersection_area = overlap_w * overlap_h

    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2_ - x1_) * (y2_ - y1_)
    union_area = float(box1_area + box2_area - intersection_area)

    # A non-positive union means there is no area to compare against.
    if union_area <= 0:
        return 0.0

    return intersection_area / union_area
|
|
|
|
def compute_intersection_1d(x, y):
    """Return the length of the overlap between two 1D segments.

    Args:
        x: Two endpoints of the first segment, in any order.
        y: Two endpoints of the second segment, in any order.

    Returns:
        The overlap length, or 0 when the segments do not overlap.
    """
    x_lo, x_hi = sorted(x)
    y_lo, y_hi = sorted(y)

    # Distance between the innermost endpoints; not positive when disjoint.
    overlap = min(x_hi, y_hi) - max(x_lo, y_lo)
    return overlap if overlap > 0 else 0
|
|
def compute_union_1d(x, y):
    """Return the length of the span covering two 1D segments.

    The value is measured from the smallest to the largest endpoint, so it
    equals the true union length only when the segments overlap or touch;
    for disjoint segments the gap between them is included.

    Args:
        x: Two endpoints of the first segment, in any order.
        y: Two endpoints of the second segment, in any order.

    Returns:
        The distance between the outermost endpoints.
    """
    x_lo, x_hi = sorted(x)
    y_lo, y_hi = sorted(y)
    return max(x_hi, y_hi) - min(x_lo, y_lo)
|
|
|
|
def compute_iou_1d(pred_box, true_box):
    """
    Compute IoU for 1D boxes.

    Args:
        pred_box: Predicted box, [x1, x2]
        true_box: Ground truth box, [x1, x2]

    Returns:
        float: IoU. Returns 0.0 when the two boxes span no length at all
            (e.g. two identical zero-length boxes), instead of dividing by
            zero.
    """
    intersection = compute_intersection_1d(pred_box, true_box)
    union = compute_union_1d(pred_box, true_box)

    # Guard the 0 / 0 case: Python floats would raise ZeroDivisionError and
    # NumPy scalars would silently produce NaN.
    if union == 0:
        return 0.0

    return intersection / union
|
|
|
|
def compute_iou_1d_single_candidate_multiple_targets(pred_box, true_boxes):
    """
    Compute the 1D IoU of one predicted box against each ground truth box.

    Args:
        pred_box: Predicted box, [x1, x2]
        true_boxes (np.ndarray): Ground truth boxes, shape: (N, 2)

    Returns:
        np.ndarray: IoU of `pred_box` with every ground truth box, in the
            same order as `true_boxes`.
    """
    return np.array([compute_iou_1d(pred_box, gt_box) for gt_box in true_boxes])
|
|
|
|
def compute_iou_1d_multiple_candidates_multiple_targets(pred_boxes, true_boxes):
    """
    Compute the pairwise 1D IoU between predicted and ground truth boxes.

    Args:
        pred_boxes (np.ndarray): Predicted boxes, shape: (P, 2)
        true_boxes (np.ndarray): Ground truth boxes, shape: (T, 2)

    Returns:
        np.ndarray: Matrix of shape (P, T) whose entry [i, j] is the IoU of
            predicted box i with ground truth box j.
    """
    iou_matrix = np.zeros((len(pred_boxes), len(true_boxes)))
    # Visit every (prediction, target) cell of the matrix in row-major order.
    for row, col in np.ndindex(iou_matrix.shape):
        iou_matrix[row, col] = compute_iou_1d(pred_boxes[row], true_boxes[col])
    return iou_matrix
|
|
|
|
def compute_mean_iou_1d(pred_boxes, gt_boxes, threshold=0.5):
    """
    Computes mean IOU for 1D bounding boxes.

    Each predicted box is scored by its best (maximum) IoU over all ground
    truth boxes, and the scores are averaged over the predictions.

    Args:
        pred_boxes (np.ndarray): Predicted boxes, shape: (P, 2)
        gt_boxes (np.ndarray): Ground truth boxes, shape: (T, 2)
        threshold (float): Kept for interface compatibility. It does not
            affect the returned value, which averages the best IoU of every
            prediction regardless of whether it clears the threshold.

    Returns:
        float: Mean of the per-prediction best IoU, or 0.0 when either
            input is empty.
    """
    # With no predictions or no targets there is nothing to match.
    if len(pred_boxes) == 0 or len(gt_boxes) == 0:
        return 0.0

    iou_matrix = compute_iou_1d_multiple_candidates_multiple_targets(
        pred_boxes, gt_boxes
    )

    # Best-matching ground truth IoU for each prediction (row).
    best_iou = iou_matrix.max(axis=1)
    return float(np.mean(best_iou))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_mAP_1d(pred_boxes, pred_scores, true_boxes, iou_thresh=0.5):
    """Calculate 11-point interpolated average precision for 1D predictions.

    Predictions are visited in descending score order. Each one is compared
    with every ground truth value; it counts as a true positive when its
    best-scoring ground truth clears `iou_thresh` and has not already been
    claimed by a higher-scored prediction, otherwise as a false positive.

    Args:
        pred_boxes (numpy array): Predicted boxes, shape (num_pred,)
        pred_scores (numpy array): Predicted scores, shape (num_pred,)
        true_boxes (numpy array): Ground truth boxes, shape (num_true,)
        iou_thresh (float): Threshold the match score must reach for a
            prediction to be considered correct

    Returns:
        float: Average precision over the recall levels 0.0, 0.1, ..., 1.0.
            Returns 0.0 when there are no predictions or no ground truth.
    """
    pred_boxes = np.asarray(pred_boxes)
    pred_scores = np.asarray(pred_scores)
    true_boxes = np.asarray(true_boxes)

    # Nothing can be matched; avoids 0 / 0 in recall and precision below.
    if len(pred_boxes) == 0 or len(true_boxes) == 0:
        return 0.0

    # Process predictions from most to least confident.
    sort_inds = np.argsort(pred_scores)[::-1]
    pred_boxes = pred_boxes[sort_inds]

    tp = np.zeros(len(pred_boxes))
    fp = np.zeros(len(pred_boxes))
    # Tracks which ground truth entries are already claimed. This is indexed
    # by ground truth position, unlike `tp`/`fp`, which are per prediction.
    gt_matched = np.zeros(len(true_boxes), dtype=bool)
    for i, box in enumerate(pred_boxes):
        # NOTE(review): this is a relative-difference measure, not an
        # overlap ratio -- identical values score 0 and therefore never
        # clear the threshold. Confirm the intended similarity measure.
        ious = np.abs(box - true_boxes) / np.maximum(
            1e-9, np.abs(box) + np.abs(true_boxes)
        )
        best = np.argmax(ious)
        if ious[best] >= iou_thresh and not gt_matched[best]:
            tp[i] = 1
            gt_matched[best] = True
        else:
            fp[i] = 1

    tp_cumsum = np.cumsum(tp)
    fp_cumsum = np.cumsum(fp)
    recall = tp_cumsum / len(true_boxes)
    # Every prediction is either a TP or an FP, so the denominator is >= 1.
    precision = tp_cumsum / (tp_cumsum + fp_cumsum)

    # Integer grid divided by 10 gives the nearest floats to 0.0 ... 1.0;
    # a float-step arange drifts (e.g. 0.30000000000000004) and would make
    # `recall >= t` miss recalls that sit exactly on a level.
    recall_levels = np.arange(11) / 10
    ap = 0.0
    for t in recall_levels:
        reached = recall >= t
        if reached.any():
            ap += np.max(precision[reached]) / 11

    return ap
|
|
|
|
def segment_iou(target_segment, candidate_segments):
    """Temporal IoU between one target segment and N candidate segments.

    Parameters
    ----------
    target_segment : 1d array
        Temporal target segment containing [starting, ending] times.
    candidate_segments : 2d array
        Temporal candidate segments containing N x [starting, ending] times.

    Outputs
    -------
    tiou : 1d array
        Temporal intersection over union score of the N candidate segments.
    """
    target_start, target_end = target_segment[0], target_segment[1]
    cand_start = candidate_segments[:, 0]
    cand_end = candidate_segments[:, 1]

    # Overlap runs from the later start to the earlier end; negative
    # lengths (disjoint segments) are clamped to zero.
    latest_start = np.maximum(target_start, cand_start)
    earliest_end = np.minimum(target_end, cand_end)
    overlap = np.clip(earliest_end - latest_start, 0, None)

    # Union = sum of both lengths minus the part counted twice.
    union = (cand_end - cand_start) + (target_end - target_start) - overlap

    return overlap.astype(float) / union
|
|
|
|
def interpolated_prec_rec(prec, rec):
    """Interpolated AP - VOCdevkit from VOC 2011.

    Pads the precision/recall curve with sentinel end points, makes the
    precision non-increasing from right to left, and sums the area of the
    rectangles between consecutive distinct recall values.
    """
    padded_prec = np.hstack([[0], prec, [0]])
    padded_rec = np.hstack([[0], rec, [1]])

    # Backward sweep: each precision becomes the max of itself and
    # everything to its right.
    for pos in reversed(range(len(padded_prec) - 1)):
        if padded_prec[pos + 1] > padded_prec[pos]:
            padded_prec[pos] = padded_prec[pos + 1]

    # Positions where recall changes relative to the previous point.
    steps = np.flatnonzero(padded_rec[1:] != padded_rec[:-1]) + 1
    widths = padded_rec[steps] - padded_rec[steps - 1]
    return np.sum(widths * padded_prec[steps])
|
|
|
|
| from tqdm import tqdm |
def compute_average_precision_detection(
    ground_truth,
    prediction,
    tiou_thresholds=np.linspace(0.5, 0.95, 10),
):
    """Compute average precision (detection task) between ground truth and
    predictions data frames. If multiple predictions match the same ground
    truth segment, only the one with the highest score is counted as a
    true positive. This code is greatly inspired by Pascal VOC devkit.

    Ref: https://github.com/zhang-can/CoLA/blob/d21f1b5a4c6c13f9715cfd4ac1ebcd065d179157/eval/eval_detection.py#L200

    Parameters
    ----------
    ground_truth : df
        Data frame containing the ground truth instances.
        Required fields: ['video-id', 't-start', 't-end']
    prediction : df
        Data frame containing the prediction instances.
        Required fields: ['video-id', 't-start', 't-end', 'score']
    tiou_thresholds : 1darray, optional
        Temporal intersection over union thresholds.

    Outputs
    -------
    ap : 1darray
        Average precision score for each entry of `tiou_thresholds`.
        All zeros when either data frame is empty.
    """
    ap = np.zeros(len(tiou_thresholds))
    # No predictions, or nothing to detect: AP is zero at every threshold
    # (also avoids dividing by a zero ground truth count below).
    if prediction.empty or ground_truth.empty:
        return ap

    # Re-index positionally so index labels can be used as array positions
    # into `lock_gt`, whatever index the caller's frame carried.
    ground_truth = ground_truth.reset_index(drop=True)
    npos = float(len(ground_truth))
    # lock_gt[t, g] holds the index of the prediction that claimed ground
    # truth g at threshold t, or -1 while g is still unmatched.
    lock_gt = np.full((len(tiou_thresholds), len(ground_truth)), -1)

    # Sort predictions by decreasing score. argsort yields positions, so
    # they must be applied with iloc rather than label-based loc.
    sort_idx = prediction['score'].values.argsort()[::-1]
    prediction = prediction.iloc[sort_idx].reset_index(drop=True)

    # Per-threshold true/false positive flags for every prediction.
    tp = np.zeros((len(tiou_thresholds), len(prediction)))
    fp = np.zeros((len(tiou_thresholds), len(prediction)))

    ground_truth_gbvn = ground_truth.groupby('video-id')

    for idx, this_pred in prediction.iterrows():
        try:
            this_gt = ground_truth_gbvn.get_group(this_pred['video-id'])
        except KeyError:
            # No ground truth for this video: a false positive everywhere.
            fp[:, idx] = 1
            continue

        # Positions of this video's ground truth rows in `lock_gt`.
        gt_positions = this_gt.index.values
        tiou_arr = segment_iou(
            this_pred[['t-start', 't-end']].values,
            this_gt[['t-start', 't-end']].values,
        )

        # Try ground truth segments from best to worst overlap.
        tiou_sorted_idx = tiou_arr.argsort()[::-1]
        for tidx, tiou_thr in enumerate(tiou_thresholds):
            for jdx in tiou_sorted_idx:
                if tiou_arr[jdx] < tiou_thr:
                    # All remaining candidates overlap even less.
                    break
                gt_pos = gt_positions[jdx]
                if lock_gt[tidx, gt_pos] >= 0:
                    # Already claimed by a higher-scored prediction.
                    continue
                tp[tidx, idx] = 1
                lock_gt[tidx, gt_pos] = idx
                break

            # Anything that did not become a true positive is a false one.
            if tp[tidx, idx] == 0:
                fp[tidx, idx] = 1

    tp_cumsum = np.cumsum(tp, axis=1).astype(float)
    fp_cumsum = np.cumsum(fp, axis=1).astype(float)
    recall_cumsum = tp_cumsum / npos

    # Every prediction is flagged TP or FP, so the denominator is >= 1.
    precision_cumsum = tp_cumsum / (tp_cumsum + fp_cumsum)

    for tidx in range(len(tiou_thresholds)):
        ap[tidx] = interpolated_prec_rec(
            precision_cumsum[tidx, :], recall_cumsum[tidx, :]
        )

    return ap
|
|
|
|
def ap_wrapper(
    true_clips,
    pred_clips,
    pred_scores,
    tiou_thresholds=np.linspace(0.5, 0.95, 10),
):
    """Compute detection AP for the clips of a single video given as arrays.

    Wraps the arrays into the data frames expected by
    `compute_average_precision_detection`, assigning every clip to the same
    placeholder video id.

    Args:
        true_clips (np.ndarray): Ground truth clips, shape (T, 2).
        pred_clips (np.ndarray): Predicted clips, shape (P, 2).
        pred_scores (np.ndarray): Score of each predicted clip, shape (P,).
        tiou_thresholds (np.ndarray): Temporal IoU thresholds forwarded to
            `compute_average_precision_detection`.

    Returns:
        Whatever `compute_average_precision_detection` returns for the
        constructed data frames.
    """
    # Both clip arrays must be (N, 2) NumPy arrays.
    for clips in (true_clips, pred_clips):
        assert isinstance(clips, np.ndarray)
        assert clips.ndim == 2 and clips.shape[1] == 2
    # One score per predicted clip.
    assert isinstance(pred_scores, np.ndarray)
    assert pred_scores.ndim == 1 and len(pred_scores) == pred_clips.shape[0]

    true_columns = {
        "video-id": ["video1"] * len(true_clips),
        "t-start": true_clips[:, 0],
        "t-end": true_clips[:, 1],
    }
    pred_columns = {
        "video-id": ["video1"] * len(pred_clips),
        "t-start": pred_clips[:, 0],
        "t-end": pred_clips[:, 1],
        "score": pred_scores,
    }
    return compute_average_precision_detection(
        pd.DataFrame(true_columns),
        pd.DataFrame(pred_columns),
        tiou_thresholds=tiou_thresholds,
    )
|
|
|
|
def nms_1d(df: pd.DataFrame, score_col="score", iou_thresh=0.5):
    """Applies NMS on 1D (start, end) box predictions.

    For each video, rows are visited in descending score order; the top
    remaining row is kept and every other remaining row whose temporal IoU
    with it is at least `iou_thresh` is discarded.

    Args:
        df: Predictions with columns "start", "end", "video_id" and
            `score_col`.
        score_col: Name of the column holding the prediction score.
        iou_thresh: Rows with IoU below this value survive a suppression
            round.

    Returns:
        list: Index labels of `df` for the rows that were kept.
    """
    available = set(df.columns)
    required = {"start", "end", "video_id", score_col}
    assert required.issubset(available)

    video_ids = df["video_id"].unique()
    per_video = df.groupby("video_id")

    keep_indices = []
    kept_fractions = []
    # Drop any progress bars tqdm still tracks before creating a new one.
    tqdm._instances.clear()
    progress = tqdm(
        video_ids,
        desc="Applying NMS to each video",
        bar_format='{l_bar}{bar:10}{r_bar}{bar:-10b}',
    )
    for video_id in progress:
        # Candidates for this video, best score first.
        remaining = per_video.get_group(video_id).sort_values(
            score_col, ascending=False
        )

        total = len(remaining)
        kept = 0
        while len(remaining) > 0:
            # Keep the best remaining row ...
            best = remaining.iloc[0]
            keep_indices.append(remaining.index[0])
            kept += 1
            anchor = np.array([best["start"], best["end"]])

            # ... and drop the rows that overlap it too much.
            remaining = remaining.iloc[1:]
            overlaps = segment_iou(anchor, remaining[["start", "end"]].values)
            remaining = remaining[overlaps < iou_thresh]

        kept_fractions.append(kept / total)

    # Average, over videos, of the fraction of rows that survived.
    mean_fraction = np.array(kept_fractions).mean()
    print("> Net success fraction: {:.2f}".format(mean_fraction))

    return keep_indices
|
|
|
|
if __name__ == "__main__":
    # Smoke test of the helpers on a small hand-made example.
    true_clips = np.array(
        [
            [0.1, 0.7],
            [3.4, 7.8],
            [3.9, 5.4],
        ]
    )
    pred_clips = np.array(
        [
            [0.2, 0.8],
            [3.5, 7.9],
            [3.9, 5.4],
            [5.6, 6.7],
            [6.0, 6.5],
        ],
    )
    pred_scores = np.array([0.9, 0.8, 0.7, 0.6, 0.5])

    # Single prediction vs. single target: overlap 0.5 over a span of 0.7.
    iou = compute_iou_1d(pred_clips[0], true_clips[0])
    # Compare to three decimals. The tolerance must be passed as `atol`:
    # the third positional argument of np.isclose is `rtol`, and a value
    # of 3 there would accept almost any result.
    assert np.isclose(iou, 0.714, atol=1e-3), "Incorrect IoU"

    # Single prediction vs. all targets.
    ious = compute_iou_1d_single_candidate_multiple_targets(pred_clips[0], true_clips)
    assert np.allclose(ious, [0.714, 0.0, 0.0], atol=1e-3), "Incorrect IoU"

    # All predictions vs. all targets.
    ious = compute_iou_1d_multiple_candidates_multiple_targets(pred_clips, true_clips)
    assert ious.shape == (5, 3), "Incorrect shape"

    ap = ap_wrapper(
        true_clips,
        pred_clips,
        pred_scores,
        tiou_thresholds=np.linspace(0.5, 0.95, 3),
    )

    final_ap = np.mean(ap)
    print("AP per tIoU threshold: {}".format(ap))
    print("Mean AP: {:.3f}".format(final_ap))
|
|