# ------------------------------------------------------------------------------
# Adapted from https://github.com/MCG-NJU/MultiSports
# Original licence: Copyright (c) MCG-NJU, under the MIT License.
# ------------------------------------------------------------------------------

import math
from collections import defaultdict

import numpy as np
from mmengine.logging import MMLogger
from rich.progress import track

# Classes that frameAP, videoAP and videoAP_error skip during evaluation.
_IGNORED_LABELS = (
    'aerobic kick jump',
    'aerobic off axis jump',
    'aerobic butterfly jump',
    'aerobic balance turn',
    'basketball save',
    'basketball jump ball',
)


def area2d_voc(b):
    """Compute the areas for a set of 2D boxes.

    Args:
        b (np.ndarray): Boxes of shape (N, 4) laid out as (x1, y1, x2, y2).

    Returns:
        np.ndarray: Area of every box, shape (N, ).
    """
    return (b[:, 2] - b[:, 0]) * (b[:, 3] - b[:, 1])


def overlap2d_voc(b1, b2):
    """Compute the overlaps between a set of boxes b1 and one box b2.

    Args:
        b1 (np.ndarray): Boxes of shape (N, 4) as (x1, y1, x2, y2).
        b2 (np.ndarray): Boxes of shape (M, 4) that broadcast against ``b1``
            (callers in this file pass M == 1 or M == N).

    Returns:
        np.ndarray: Intersection area for every pair of broadcast rows.
    """
    xmin = np.maximum(b1[:, 0], b2[:, 0])
    ymin = np.maximum(b1[:, 1], b2[:, 1])
    xmax = np.minimum(b1[:, 2], b2[:, 2])
    ymax = np.minimum(b1[:, 3], b2[:, 3])

    # Disjoint boxes yield negative extents; clamp them to zero.
    width = np.maximum(0, xmax - xmin)
    height = np.maximum(0, ymax - ymin)

    return width * height


def iou2d_voc(b1, b2):
    """Compute the IoU between a set of boxes b1 and 1 box b2.

    Args:
        b1 (np.ndarray): Boxes of shape (N, 4) or a single box of shape (4, ).
        b2 (np.ndarray): A single box of shape (4, ) or (1, 4).

    Returns:
        np.ndarray: IoU of every box in ``b1`` with ``b2``, shape (N, ).
    """
    if b1.ndim == 1:
        b1 = b1[None, :]
    if b2.ndim == 1:
        b2 = b2[None, :]

    assert b2.shape[0] == 1

    ov = overlap2d_voc(b1, b2)

    return ov / (area2d_voc(b1) + area2d_voc(b2) - ov)


def iou3d_voc(b1, b2):
    """Compute the IoU between two tubes with same temporal extent.

    Args:
        b1 (np.ndarray): Tube whose rows are (frame, x1, y1, x2, y2, ...).
        b2 (np.ndarray): Tube covering exactly the same frames as ``b1``.

    Returns:
        float: Mean of the per-frame box IoUs.
    """
    assert b1.shape[0] == b2.shape[0]
    assert np.all(b1[:, 0] == b2[:, 0])

    ov = overlap2d_voc(b1[:, 1:5], b2[:, 1:5])

    return np.mean(ov / (area2d_voc(b1[:, 1:5]) + area2d_voc(b2[:, 1:5]) - ov))


def _frame_row(tube, frame):
    """Return the index of the first row of ``tube`` whose frame == frame."""
    # Index the match explicitly: converting a 1-D array to a scalar with
    # ``int(array)`` is deprecated in recent NumPy releases.
    return int(np.where(tube[:, 0] == frame)[0][0])


def iou3dt_voc(b1, b2, spatialonly=False, temporalonly=False):
    """Compute the spatio-temporal IoU between two tubes.

    Args:
        b1 (np.ndarray): Tube whose rows are (frame, x1, y1, x2, y2, ...),
            read here as ordered by frame (first row earliest).
        b2 (np.ndarray): Second tube in the same layout.
        spatialonly (bool): If True, return only the mean spatial IoU over
            the frames shared by both tubes. Defaults to False.
        temporalonly (bool): If True, return only the temporal IoU.
            Defaults to False.

    Returns:
        float: ``spatial IoU * temporal IoU`` by default, or just one of the
        two factors when the matching flag is set. 0.0 if the tubes do not
        overlap in time.
    """
    tmin = max(b1[0, 0], b2[0, 0])
    tmax = min(b1[-1, 0], b2[-1, 0])

    if tmax < tmin:
        return 0.0

    # NOTE(review): extents are measured as ``last - first`` (no +1), so two
    # tubes that share only a single frame have a temporal intersection of 0.
    temporal_inter = tmax - tmin
    temporal_union = max(b1[-1, 0], b2[-1, 0]) - min(b1[0, 0], b2[0, 0])

    # Restrict both tubes to the frames they have in common.
    tube1 = b1[_frame_row(b1, tmin):_frame_row(b1, tmax) + 1, :]
    tube2 = b2[_frame_row(b2, tmin):_frame_row(b2, tmax) + 1, :]

    if temporalonly:
        return temporal_inter / temporal_union
    return iou3d_voc(tube1, tube2) * (1. if spatialonly else temporal_inter /
                                      temporal_union)


def pr_to_ap_voc(pr):
    """Compute average precision from a precision-recall curve.

    Args:
        pr (np.ndarray): Array of shape (N, 2); column 0 is precision and
            column 1 is recall.

    Returns:
        float: Area under the precision envelope, summed over the points
        where recall changes.
    """
    precision = pr[:, 0]
    recall = pr[:, 1]
    recall = np.concatenate([[0], recall, [1]])
    precision = np.concatenate([[0], precision, [0]])

    # Replace each precision by the maximum precision at any later point,
    # which makes the curve non-increasing (reverse cumulative maximum).
    precision = np.maximum.accumulate(precision[::-1])[::-1]

    indices = np.where(recall[1:] != recall[:-1])[0] + 1
    average_precision = np.sum(
        (recall[indices] - recall[indices - 1]) * precision[indices])
    return average_precision


def nms_tubelets(dets, overlapThresh=0.3, top_k=None):
    """Apply score-decaying NMS to a set of scored tubelets.

    Tubelets that overlap a higher scored tubelet by more than
    ``overlapThresh`` (averaged over the K boxes) are not discarded; their
    score is multiplied by ``1 - summed IoU`` instead. The detections are
    then re-sorted by the new score.

    Args:
        dets (np.ndarray): Array of shape (N, 4K + 1): K boxes as
            (x1, y1, x2, y2) followed by the score in the last column.
            The score column is modified in place.
        overlapThresh (float): Mean IoU above which a tubelet is suppressed.
            Defaults to 0.3.
        top_k (int, optional): Maximum number of tubelets to return.
            Defaults to None, which keeps all of them.

    Returns:
        np.ndarray: The re-scored detections sorted by descending score,
        truncated to ``top_k`` rows. An empty input is returned unchanged.
    """
    # If there are no detections, return them as they are.
    if len(dets) == 0:
        return dets
    if top_k is None:
        top_k = len(dets)

    K = (dets.shape[1] - 1) // 4

    # Coordinates of bounding boxes, one array per box of the tubelet.
    x1 = [dets[:, 4 * k] for k in range(K)]
    y1 = [dets[:, 4 * k + 1] for k in range(K)]
    x2 = [dets[:, 4 * k + 2] for k in range(K)]
    y2 = [dets[:, 4 * k + 3] for k in range(K)]

    # Box areas (pixel-inclusive, hence the +1) and score ordering.
    scores = dets[:, -1]
    area = [(x2[k] - x1[k] + 1) * (y2[k] - y1[k] + 1) for k in range(K)]
    order = np.argsort(scores)[::-1]
    weight = np.ones_like(scores)

    while order.size > 0:
        i = order[0]
        rest = order[1:]

        # Overlap of the current best tubelet with all remaining ones.
        xx1 = [np.maximum(x1[k][i], x1[k][rest]) for k in range(K)]
        yy1 = [np.maximum(y1[k][i], y1[k][rest]) for k in range(K)]
        xx2 = [np.minimum(x2[k][i], x2[k][rest]) for k in range(K)]
        yy2 = [np.minimum(y2[k][i], y2[k][rest]) for k in range(K)]

        w = [np.maximum(0, xx2[k] - xx1[k] + 1) for k in range(K)]
        h = [np.maximum(0, yy2[k] - yy1[k] + 1) for k in range(K)]

        inter_area = [w[k] * h[k] for k in range(K)]
        # Sum (not mean) of the K IoUs, compared against overlapThresh * K.
        ious = sum([
            inter_area[k] / (area[k][rest] + area[k][i] - inter_area[k])
            for k in range(K)
        ])

        # Decay the score of suppressed tubelets ...
        index = np.where(ious > overlapThresh * K)[0]
        weight[order[index + 1]] = 1 - ious[index]

        # ... and keep processing only the ones that were not suppressed.
        index2 = np.where(ious <= overlapThresh * K)[0]
        order = order[index2 + 1]

    dets[:, -1] = dets[:, -1] * weight

    new_scores = dets[:, -1]
    new_order = np.argsort(new_scores)[::-1]
    dets = dets[new_order, :]

    return dets[:top_k, :]


class Dataset():
    """Group flat frame-level detections by video, frame and label.

    Args:
        anno (dict): Annotation dict. The keys read here are
            ``test_videos`` (its first element is the list of video names),
            ``nframes`` (video name -> number of frames) and ``labels``.
        frm_alldets (Iterable): Frame-level detections. Each row is read as
            (video_idx, frame_idx, label_idx, score, ..., x1, y1, x2, y2);
            the box is taken from the last four entries.
    """

    def __init__(self, anno, frm_alldets) -> None:
        self.anno = anno
        self.video_list = self.anno['test_videos'][0]
        self.nframes = self.anno['nframes']
        self.labels = self.anno['labels']
        self.frm_alldets = frm_alldets

    def get_vid_dets(self):
        """Build the nested detection lookup.

        Returns:
            dict: ``{video_name: {frame_idx: {label_idx: np.ndarray}}}``
            where every array has shape (N, 5) with rows
            (x1, y1, x2, y2, score). Frames are 1-based. Only videos with
            at least one detection are present.
        """
        # First pass: bucket the raw rows by video name.
        self.vid_frm_det = defaultdict(list)
        for frm_det in self.frm_alldets:
            vid_idx = int(frm_det[0])
            vid_name = self.video_list[vid_idx]
            self.vid_frm_det[vid_name].append(frm_det)

        self.vid_det = dict()
        for vid_name, vid_frm_dets in self.vid_frm_det.items():
            # Pre-fill every (frame, label) slot with an empty (0, 5) array
            # so that lookups never fail for frames without detections.
            self.vid_det[vid_name] = dict()
            for frm_idx in range(1, self.nframes[vid_name] + 1):
                self.vid_det[vid_name][frm_idx] = dict()
                for label_idx in range(len(self.labels)):
                    self.vid_det[vid_name][frm_idx][label_idx] = np.empty(
                        shape=(0, 5))

            for det_row in vid_frm_dets:
                frm_idx = int(det_row[1])
                label_idx = int(det_row[2])
                # Box followed by the score of *this* detection. The score
                # must come from the current row, not from the variable left
                # over by the grouping loop above.
                det = [*det_row[-4:], det_row[3]]
                det = np.array(det)[None, :]
                self.vid_det[vid_name][frm_idx][label_idx] = np.concatenate(
                    [self.vid_det[vid_name][frm_idx][label_idx], det])
        return self.vid_det


def _tube_score(tube):
    """Return the mean tubelet score of a tube.

    ``tube`` is a list of ``(frame, tubelet)`` tuples and the score is the
    last entry of each tubelet.
    """
    return np.mean(np.array([tubelet[-1] for _, tubelet in tube]))


def link_tubes(anno, frm_dets, K=1, len_thre=15):
    """Link frame-level detections into action tubes.

    Args:
        anno (dict): Annotation dict, see :class:`Dataset`.
        frm_dets (Iterable): Frame-level detections, see :class:`Dataset`.
        K (int): Number of boxes per tubelet. Defaults to 1.
        len_thre (int): Tubes shorter than this many frames are dropped.
            Defaults to 15.

    Returns:
        dict: ``{label_idx: [(video_name, tube_score, tube), ...]}`` where
        ``tube`` is a float32 array of shape (num_frames, 6) with rows
        (frame idx, x1, y1, x2, y2, score).
    """
    dataset = Dataset(anno, frm_dets)
    vlist = dataset.video_list
    total_vdets = dataset.get_vid_dets()

    total_video_tubes = {label: [] for label in range(len(dataset.labels))}
    for v in track(vlist, description='linking tubes...'):
        if v not in total_vdets:
            continue
        vdets = total_vdets[v]

        for ilabel in range(len(dataset.labels)):
            finished_tubes = []
            # Each tube is a list of tuples (frame, tubelet).
            current_tubes = []

            for frame in range(1, dataset.nframes[v] + 2 - K):
                # Load the boxes of the new frame (as a copy, because NMS
                # rewrites the scores in place) and keep the 10 best.
                ltubelets = np.array(vdets[frame][ilabel])
                ltubelets = nms_tubelets(ltubelets, 0.6, top_k=10)

                # On the first frame every tubelet just starts a new tube.
                if frame == 1:
                    for i in range(ltubelets.shape[0]):
                        current_tubes.append([(1, ltubelets[i, :])])
                    continue

                # Sort current tubes by average score so that the best
                # tubes get the first pick among the new tubelets.
                avgscore = [_tube_score(t) for t in current_tubes]
                argsort = np.argsort(-np.array(avgscore))
                current_tubes = [current_tubes[i] for i in argsort]

                # Loop over tubes and try to extend each of them.
                finished = []
                for it, t in enumerate(current_tubes):
                    # IoU between the last tubelet of t and the candidates.
                    last_frame, last_tubelet = t[-1]
                    offset = frame - last_frame
                    if offset < K:
                        # The tubelets share ``nov`` frames: average the
                        # IoU over the overlapping boxes.
                        nov = K - offset
                        ious = sum([
                            iou2d_voc(
                                ltubelets[:, 4 * iov:4 * iov + 4],
                                last_tubelet[4 * (iov + offset):4 *
                                             (iov + offset + 1)])
                            for iov in range(nov)
                        ]) / float(nov)
                    else:
                        # No shared frame: compare the last box of the tube
                        # with the first box of the candidates.
                        ious = iou2d_voc(ltubelets[:, :4],
                                         last_tubelet[4 * K - 4:4 * K])

                    valid = np.where(ious >= 0.5)[0]

                    if valid.size > 0:
                        # Take the candidate with maximum score and remove
                        # it so that no other tube can claim it.
                        idx = valid[np.argmax(ltubelets[valid, -1])]
                        current_tubes[it].append((frame, ltubelets[idx, :]))
                        ltubelets = np.delete(ltubelets, idx, axis=0)
                    elif offset >= K:
                        finished.append(it)

                # Move finished tubes out, in reverse order so that the
                # remaining indices stay valid while deleting.
                for it in finished[::-1]:
                    finished_tubes.append(current_tubes[it][:])
                    del current_tubes[it]

                # Unclaimed tubelets start new tubes.
                for i in range(ltubelets.shape[0]):
                    current_tubes.append([(frame, ltubelets[i, :])])

            # Tubes still open at the end of the video are finished too.
            finished_tubes += current_tubes

            # Build the real tubes.
            output = []
            for t in finished_tubes:
                score = _tube_score(t)

                # Drop tubes with a very low average score.
                if score < 0.005:
                    continue

                beginframe = t[0][0]
                endframe = t[-1][0] + K - 1
                length = endframe + 1 - beginframe

                # Drop tubes with a short duration.
                if length < len_thre:
                    continue

                # Build the final tube by averaging, per frame, all the
                # tubelet boxes that cover that frame.
                out = np.zeros((length, 6), dtype=np.float32)
                out[:, 0] = np.arange(beginframe, endframe + 1)
                n_per_frame = np.zeros((length, 1), dtype=np.int32)
                for frame, box in t:
                    for k in range(K):
                        row = frame - beginframe + k
                        out[row, 1:5] += box[4 * k:4 * k + 4]
                        out[row, -1] += box[-1]  # single frame confidence
                        n_per_frame[row, 0] += 1
                out[:, 1:] /= n_per_frame
                # out: [num_frames, (frame idx, x1, y1, x2, y2, score)]
                output.append([out, score])

            for tube, tube_score in output:
                total_video_tubes[ilabel].append((v, tube_score, tube))
    return total_video_tubes


def frameAP(GT, alldets, thr, print_info=True):
    """Compute the frame-level mean average precision.

    Args:
        GT (dict): Ground truth. The keys read here are ``test_videos``
            (its first element is the list of video names), ``labels`` and
            ``gttubes`` (video name -> {label_idx: list of tubes with rows
            (frame, x1, y1, x2, y2, ...)}).
        alldets (np.ndarray): Frame detections with rows
            (video_idx, frame_idx, label_idx, score, x1, y1, x2, y2).
        thr (float): IoU threshold for a detection to match a GT box.
        print_info (bool): Whether to log skipped classes and the per-class
            results. Defaults to True.

    Returns:
        float: Mean over the evaluated classes of the AP (in percent).
    """
    logger = MMLogger.get_current_instance()
    vlist = GT['test_videos'][0]

    results = {}
    for ilabel, label in enumerate(GT['labels']):
        if label in _IGNORED_LABELS:
            if print_info:
                logger.info('do not evaluate {}'.format(label))
            continue

        # Detections of this class.
        detections = alldets[alldets[:, 2] == ilabel, :]

        # Ground-truth boxes of this class, keyed by (video_idx, frame_idx).
        gt_boxes = defaultdict(list)
        for iv, v in enumerate(vlist):
            tubes = GT['gttubes'][v]
            if ilabel not in tubes:
                continue
            for tube in tubes[ilabel]:
                for i in range(tube.shape[0]):
                    gt_boxes[(iv, int(tube[i, 0]))].append(
                        tube[i, 1:5].tolist())
        gt = {k: np.array(boxes) for k, boxes in gt_boxes.items()}

        # pr[i] holds (precision, recall) after the i-th best detection.
        pr = np.empty((detections.shape[0], 2), dtype=np.float64)
        gt_num = sum([g.shape[0] for g in gt.values()])
        if gt_num == 0:
            if print_info:
                # Use %-placeholders: extra positional arguments without
                # them make the logging call fail to format the record.
                logger.info('no such label %s %s', ilabel, label)
            continue

        fp = 0  # false positives
        tp = 0  # true positives

        is_gt_box_detected = {}
        for i, j in enumerate(np.argsort(-detections[:, 3])):
            k = (int(detections[j, 0]), int(detections[j, 1]))
            box = detections[j, 4:8]
            ispositive = False

            if k in gt:
                # Match the detection to the GT box with the highest IoU;
                # every GT box can be matched only once.
                if k not in is_gt_box_detected:
                    is_gt_box_detected[k] = np.zeros(
                        gt[k].shape[0], dtype=bool)
                ious = iou2d_voc(gt[k], box)
                amax = np.argmax(ious)
                if ious[amax] >= thr:
                    if not is_gt_box_detected[k][amax]:
                        ispositive = True
                        is_gt_box_detected[k][amax] = True

            if ispositive:
                tp += 1
            else:
                fp += 1
            pr[i, 0] = float(tp) / float(tp + fp)
            pr[i, 1] = float(tp) / float(gt_num)

        results[label] = pr

    # Per-class AP in percent, computed once and reused for the mean.
    class_result = {
        label: pr_to_ap_voc(pr) * 100
        for label, pr in results.items()
    }
    frameap_result = np.mean(np.array(list(class_result.values())))

    if print_info:
        logger.info('frameAP_{}\n'.format(thr))
        for label in class_result:
            logger.info('{:20s} {:8.2f}'.format(label, class_result[label]))
        logger.info('{:20s} {:8.2f}'.format('mAP', frameap_result))
    return frameap_result


def videoAP(GT, alldets, thr, print_info=True):
    """Compute the video-level mean average precision.

    Args:
        GT (dict): Ground truth, same layout as in :func:`frameAP`.
        alldets (dict): ``{label_idx: [(video_name, score, tube), ...]}``
            where ``tube`` has rows (frame, x1, y1, x2, y2, ...).
        thr (float): Spatio-temporal IoU threshold for a detected tube to
            match a GT tube.
        print_info (bool): Whether to log skipped classes and the per-class
            results. Defaults to True.

    Returns:
        float: Mean over the evaluated classes of the AP (in percent).
    """
    logger = MMLogger.get_current_instance()
    vlist = GT['test_videos'][0]

    res = {}
    for ilabel in range(len(GT['labels'])):
        label = GT['labels'][ilabel]
        if label in _IGNORED_LABELS:
            if print_info:
                logger.info('do not evaluate {}'.format(label))
            continue
        detections = alldets[ilabel]

        # Ground-truth tubes of this class, only for videos that have some.
        gt = {}
        for v in vlist:
            tubes = GT['gttubes'][v]
            if ilabel in tubes and len(tubes[ilabel]) > 0:
                gt[v] = tubes[ilabel]

        # pr[i] holds (precision, recall) after the i-th best detection.
        pr = np.empty((len(detections), 2), dtype=np.float64)

        gt_num = sum([len(g) for g in gt.values()])  # false negatives
        fp = 0  # false positives
        tp = 0  # true positives
        if gt_num == 0:
            if print_info:
                # Use %-placeholders: extra positional arguments without
                # them make the logging call fail to format the record.
                logger.info('no such label %s %s', ilabel, label)
            continue

        is_gt_box_detected = {}
        for i, j in enumerate(
                np.argsort(-np.array([dd[1] for dd in detections]))):
            v, score, tube = detections[j]
            ispositive = False
            if v in gt:
                # Match the detection to the GT tube with the highest IoU;
                # every GT tube can be matched only once.
                if v not in is_gt_box_detected:
                    is_gt_box_detected[v] = np.zeros(len(gt[v]), dtype=bool)
                ious = [iou3dt_voc(g, tube) for g in gt[v]]
                amax = np.argmax(ious)
                if ious[amax] >= thr:
                    if not is_gt_box_detected[v][amax]:
                        ispositive = True
                        is_gt_box_detected[v][amax] = True

            if ispositive:
                tp += 1
            else:
                fp += 1
            pr[i, 0] = float(tp) / float(tp + fp)
            pr[i, 1] = float(tp) / float(gt_num)
        res[label] = pr

    # Per-class AP in percent, computed once and reused for the mean.
    class_result = {
        label: pr_to_ap_voc(pr) * 100
        for label, pr in res.items()
    }
    videoap_result = np.mean(np.array(list(class_result.values())))

    if print_info:
        logger.info('VideoAP_{}\n'.format(thr))
        for label in class_result:
            logger.info('{:20s} {:8.2f}'.format(label, class_result[label]))
        logger.info('{:20s} {:8.2f}'.format('mAP', videoap_result))
    return videoap_result


def videoAP_all(groundtruth, detections):
    """Average :func:`videoAP` over three ranges of IoU thresholds.

    Args:
        groundtruth (dict): Ground truth, see :func:`videoAP`.
        detections (dict): Detected tubes, see :func:`videoAP`.

    Returns:
        dict: Mean video mAP, rounded to 4 digits, for the thresholds
        0.05:0.45 (step 0.05), 0.10:0.90 (step 0.10) and
        0.50:0.95 (step 0.05).
    """

    def mean_video_ap(start, step, count):
        """Average videoAP over ``count`` thresholds from ``start``."""
        total = 0
        for i in range(count):
            thr = start + step * i
            total += videoAP(groundtruth, detections, thr, print_info=False)
        return total / float(count)

    high_ap = mean_video_ap(0.5, 0.05, 10)
    low_ap = mean_video_ap(0.05, 0.05, 9)
    all_ap = mean_video_ap(0.1, 0.1, 9)

    # Named ``results`` (not ``map``) to avoid shadowing the builtin.
    results = {
        'v_map_0.05:0.45': round(low_ap, 4),
        'v_map_0.10:0.90': round(all_ap, 4),
        'v_map_0.50:0.95': round(high_ap, 4),
    }
    return results


def videoAP_error(GT, alldets, thr):
    """Print a per-class breakdown of the video-level detection errors.

    Every detected tube that is not a true positive is assigned to exactly
    one error type: repeat, extra, localization, classification, timing or
    one of their combinations. An AP-like score is computed per error type
    and printed as a table together with the AP and the missed detections.

    Args:
        GT (dict): Ground truth, see :func:`videoAP`.
        alldets (dict): Detected tubes, see :func:`videoAP`.
        thr (float): Spatio-temporal IoU threshold. Its square root is used
            as the threshold for the purely spatial and the purely temporal
            IoU.
    """
    vlist = GT['test_videos'][0]

    th_s = math.sqrt(thr)
    th_t = math.sqrt(thr)

    print('th is', thr)
    print('th_s is', th_s)
    print('th_t is', th_t)

    res = {}
    dupgt = {v: GT['gttubes'][v] for v in vlist}

    # Compute the video errors for every class.
    for ilabel in range(len(GT['labels'])):
        label = GT['labels'][ilabel]
        if label in _IGNORED_LABELS:
            print('do not evaluate {}'.format(label))
            continue
        detections = alldets[ilabel]

        # Columns: precision, recall, then the 9 error ratios.
        pr = np.zeros((len(detections), 11), dtype=np.float32)

        gt_num = 0
        for v in dupgt:
            if ilabel in dupgt[v]:
                gt_num = gt_num + len(dupgt[v][ilabel])
        fp = 0  # false positives
        tp = 0  # true positives
        ER = 0  # repeat error: repeated prediction for the same instance
        EN = 0  # extra error
        EL = 0  # localization errors
        EC = 0  # classification error
        ET = 0  # timing error
        ErrCT = 0  # cls + time
        ECL = 0  # cls + loc
        ETL = 0  # time + loc
        ECTL = 0  # cls + time + loc

        is_gt_box_detected = {}
        for i, j in enumerate(
                np.argsort(-np.array([dd[1] for dd in detections]))):
            v, score, tube = detections[j]
            ispositive = False
            end = False

            # Stage 1: match against GT tubes of the same class. An empty
            # tube list is skipped because argmax fails on it.
            if ilabel in dupgt[v] and len(dupgt[v][ilabel]) > 0:
                if v not in is_gt_box_detected:
                    is_gt_box_detected[v] = np.zeros(
                        len(dupgt[v][ilabel]), dtype=bool)
                ious = [iou3dt_voc(g, tube) for g in dupgt[v][ilabel]]
                amax = np.argmax(ious)
                if ious[amax] >= thr:
                    if not is_gt_box_detected[v][amax]:
                        ispositive = True
                        is_gt_box_detected[v][amax] = True
                    else:
                        ER += 1
                    end = True

            # Stage 2: a good match with a GT tube of another class is a
            # pure classification error.
            if end is False:
                ious = []
                for ll in dupgt[v]:
                    if ll == ilabel:
                        continue
                    for g in dupgt[v][ll]:
                        ious.append(iou3dt_voc(g, tube))
                if ious != []:
                    amax = np.argmax(ious)
                    if ious[amax] >= thr:
                        EC += 1
                        end = True

            # Stage 3: look at the best overlapping GT tube of any class and
            # decide which of space / time / class went wrong.
            if end is False:
                all_gt = []
                ious = []
                for ll in dupgt[v]:
                    for g in dupgt[v][ll]:
                        all_gt.append((ll, g))
                        ious.append(iou3dt_voc(g, tube))
                # A video without any GT tube has nothing to compare with;
                # treat it as no overlap instead of failing in argmax.
                amax = int(np.argmax(ious)) if ious else None
                if amax is not None:
                    assert (ious[amax] < thr)
                if amax is not None and ious[amax] > 0:
                    t_iou = iou3dt_voc(
                        all_gt[amax][1], tube, temporalonly=True)
                    s_iou = iou3dt_voc(all_gt[amax][1], tube, spatialonly=True)
                    assert (t_iou < th_t or s_iou < th_s)
                    if all_gt[amax][0] == ilabel:
                        if t_iou >= th_t:
                            EL += 1
                        elif s_iou >= th_s:
                            ET += 1
                        else:
                            ETL += 1
                    else:
                        if t_iou >= th_t:
                            ECL += 1
                        elif s_iou >= th_s:
                            ErrCT += 1
                        else:
                            ECTL += 1
                else:
                    # No overlap with any GT tube at all.
                    EN += 1
                end = True

            assert (end is True)
            if ispositive:
                tp += 1
            else:
                fp += 1

            # Every false positive must be counted as exactly one error.
            errors = (ER, EN, EL, EC, ET, ErrCT, ECL, ETL, ECTL)
            assert (fp == sum(errors))

            n_seen = float(tp + fp)
            pr[i, 0] = float(tp) / n_seen
            pr[i, 1] = float(tp) / float(gt_num)
            pr[i, 2:] = [float(err) / n_seen for err in errors]

        res[label] = pr

    # Display results.
    AP = 100 * np.array([pr_to_ap_voc(res[label][:, [0, 1]]) for label in res])
    othersap = [
        100 * np.array([pr_to_ap_voc(res[label][:, [j, 1]]) for label in res])
        for j in range(2, 11)
    ]

    ER = othersap[0]
    EN = othersap[1]
    EL = othersap[2]
    EC = othersap[3]
    ET = othersap[4]
    ErrCT = othersap[5]
    ECL = othersap[6]
    ETL = othersap[7]
    ECTL = othersap[8]

    # Missed detections = 1 - recall of the full detection list.
    EM = []
    for label in res:
        if res[label].shape[0] != 0:
            EM.append(100 - 100 * res[label][-1, 1])
        else:
            EM.append(100)
    EM = np.array(EM)

    LIST = [AP, ER, EN, EL, EC, ET, ErrCT, ECL, ETL, ECTL, EM]

    print('Error Analysis')

    print('')
    print(
        '{:20s} {:8s} {:8s} {:8s} {:8s} {:8s} {:8s} {:8s} {:8s} {:8s} {:8s} {:8s}'  # noqa: E501
        .format('label', ' AP ', ' Repeat ', ' Extra ', ' Loc. ', ' Cls. ',
                ' Time ', ' Cls.+Time ', ' Cls.+Loc. ', ' Time+Loc. ',
                ' C+T+L ', ' missed '))
    print('')
    for il, label in enumerate(res):
        print('{:20s} '.format(label) +
              ' '.join(['{:8.2f}'.format(L[il]) for L in LIST]))
    print('')
    print('{:20s} '.format('mean') +
          ' '.join(['{:8.2f}'.format(np.mean(L)) for L in LIST]))
    print('')