| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import torch |
| import numpy as np |
| import time |
| import cv2 |
|
|
def ap_per_class(tp, conf, pred_cls, target_cls):
    """Compute average precision, recall and precision for every class.

    Method originally from
    https://github.com/rafaelpadilla/Object-Detection-Metrics.

    # Arguments
        tp:         Per-detection true-positive flags, 1 or 0 (list).
        conf:       Per-detection confidence score (list).
        pred_cls:   Per-detection predicted class (list).
        target_cls: Class of every ground-truth object (list).
    # Returns
        A 4-tuple ``(ap, classes, recall, precision)``:
        ``classes`` holds every class id that occurs in either ``pred_cls``
        or ``target_cls`` (as int32); ``ap``, ``recall`` and ``precision``
        are arrays with one value per class, in the same order.
    """
    tp = np.array(tp)
    conf = np.array(conf)
    pred_cls = np.array(pred_cls)
    target_cls = np.array(target_cls)

    # Rank the detections from most to least confident.
    order = np.argsort(-conf)
    tp, conf, pred_cls = tp[order], conf[order], pred_cls[order]

    # Every class that was either predicted or annotated.
    unique_classes = np.unique(np.concatenate((pred_cls, target_cls), 0))

    ap, p, r = [], [], []
    for cls_id in unique_classes:
        is_cls = pred_cls == cls_id
        n_gt = (target_cls == cls_id).sum()  # number of ground-truth objects
        n_p = is_cls.sum()  # number of predicted objects

        if n_p == 0 and n_gt == 0:
            continue
        if n_p == 0 or n_gt == 0:
            # Class is missing on one side: all of its metrics are zero.
            ap.append(0)
            r.append(0)
            p.append(0)
            continue

        # Running totals of false and true positives down the ranking.
        cls_tp = tp[is_cls]
        fpc = np.cumsum(1 - cls_tp)
        tpc = np.cumsum(cls_tp)

        # Recall curve and its final value.
        recall_curve = tpc / (n_gt + 1e-16)
        r.append(tpc[-1] / (n_gt + 1e-16))

        # Precision curve and its final value.
        precision_curve = tpc / (tpc + fpc)
        p.append(tpc[-1] / (tpc[-1] + fpc[-1]))

        # Area under the precision/recall curve.
        ap.append(compute_ap(recall_curve, precision_curve))

    return np.array(ap), unique_classes.astype('int32'), np.array(r), np.array(p)
|
|
def compute_ap(recall, precision):
    """Integrate a precision/recall curve into a single average precision.

    Code originally from https://github.com/rbgirshick/py-faster-rcnn.

    # Arguments
        recall:    The recall curve (list).
        precision: The precision curve (list).
    # Returns
        The area under the monotonically decreasing precision envelope,
        as computed in py-faster-rcnn.
    """
    # Pad both curves with sentinel values at the two ends.
    padded_recall = np.concatenate(([0.], recall, [1.]))
    padded_precision = np.concatenate(([0.], precision, [0.]))

    # Precision envelope: each point takes the maximum of everything to its
    # right, i.e. a running maximum scanned from the end of the curve.
    envelope = np.maximum.accumulate(padded_precision[::-1])[::-1]

    # Only the positions where recall changes contribute to the area.
    steps = np.flatnonzero(padded_recall[1:] != padded_recall[:-1])

    # Sum (delta recall) * precision over those positions.
    widths = padded_recall[steps + 1] - padded_recall[steps]
    return np.sum(widths * envelope[steps + 1])
|
|
|
|
| def bbox_iou(box1, box2, x1y1x2y2=False): |
| """ |
| Returns the IoU of two bounding boxes |
| """ |
| N, M = len(box1), len(box2) |
| if x1y1x2y2: |
| |
| b1_x1, b1_y1, b1_x2, b1_y2 = box1[:, 0], box1[:, 1], box1[:, 2], box1[:, 3] |
| b2_x1, b2_y1, b2_x2, b2_y2 = box2[:, 0], box2[:, 1], box2[:, 2], box2[:, 3] |
| else: |
| |
| b1_x1, b1_x2 = box1[:, 0] - box1[:, 2] / 2, box1[:, 0] + box1[:, 2] / 2 |
| b1_y1, b1_y2 = box1[:, 1] - box1[:, 3] / 2, box1[:, 1] + box1[:, 3] / 2 |
| b2_x1, b2_x2 = box2[:, 0] - box2[:, 2] / 2, box2[:, 0] + box2[:, 2] / 2 |
| b2_y1, b2_y2 = box2[:, 1] - box2[:, 3] / 2, box2[:, 1] + box2[:, 3] / 2 |
|
|
| |
| inter_rect_x1 = torch.max(b1_x1.unsqueeze(1), b2_x1) |
| inter_rect_y1 = torch.max(b1_y1.unsqueeze(1), b2_y1) |
| inter_rect_x2 = torch.min(b1_x2.unsqueeze(1), b2_x2) |
| inter_rect_y2 = torch.min(b1_y2.unsqueeze(1), b2_y2) |
| |
| inter_area = torch.clamp(inter_rect_x2 - inter_rect_x1, 0) * torch.clamp(inter_rect_y2 - inter_rect_y1, 0) |
| |
| b1_area = ((b1_x2 - b1_x1) * (b1_y2 - b1_y1)).view(-1,1).expand(N,M) |
| b2_area = ((b2_x2 - b2_x1) * (b2_y2 - b2_y1)).view(1,-1).expand(N,M) |
|
|
| return inter_area / (b1_area + b2_area - inter_area + 1e-16) |
|
|
def xyxy2xywh(x):
    """Convert boxes from (x1, y1, x2, y2) to (center x, center y, w, h).

    # Arguments
        x: Boxes with one row per box and at least four columns, given as
           a torch tensor, a NumPy array or a nested sequence.
    # Returns
        A new container with the same shape as ``x``. Torch input yields a
        torch tensor (same floating dtype as ``x``, float32 for integer
        tensors); anything else yields a float64 NumPy array. Only the
        first four columns are filled; any further columns stay zero.
    """
    if isinstance(x, torch.Tensor):
        # Dispatch on the container type, not on ``dtype is float32``: the
        # old check silently turned float64/float16/int tensors into NumPy
        # output. The result is deliberately allocated on the CPU, as it
        # was before, because the evaluators in this file rely on that.
        out_dtype = x.dtype if x.is_floating_point() else torch.float32
        y = torch.zeros(x.shape, dtype=out_dtype)
    else:
        x = np.asarray(x)
        y = np.zeros(x.shape)
    y[:, 0] = (x[:, 0] + x[:, 2]) / 2  # center x
    y[:, 1] = (x[:, 1] + x[:, 3]) / 2  # center y
    y[:, 2] = x[:, 2] - x[:, 0]  # width
    y[:, 3] = x[:, 3] - x[:, 1]  # height
    return y
|
|
|
|
def xywh2xyxy(x):
    """Convert boxes from (center x, center y, w, h) to (x1, y1, x2, y2).

    # Arguments
        x: Boxes with one row per box and at least four columns, given as
           a torch tensor, a NumPy array or a nested sequence.
    # Returns
        A new container with the same shape as ``x``. Torch input yields a
        torch tensor (same floating dtype as ``x``, float32 for integer
        tensors); anything else yields a float64 NumPy array. Only the
        first four columns are filled; any further columns stay zero.
    """
    if isinstance(x, torch.Tensor):
        # Dispatch on the container type, not on ``dtype is float32``: the
        # old check silently turned float64/float16/int tensors into NumPy
        # output. The result is deliberately allocated on the CPU, as it
        # was before, because the evaluators in this file rely on that.
        out_dtype = x.dtype if x.is_floating_point() else torch.float32
        y = torch.zeros(x.shape, dtype=out_dtype)
    else:
        x = np.asarray(x)
        y = np.zeros(x.shape)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # left
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # right
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom
    return y
|
|
|
|
@torch.no_grad()
def motdet_evaluate(model, data_loader, iou_thres=0.5, print_interval=10):
    """Evaluate single-class detection quality of ``model`` on ``data_loader``.

    For every batch the predicted boxes are matched greedily (in prediction
    order) against the ground-truth boxes: a prediction is a true positive
    when its best-overlapping ground truth has IoU above ``iou_thres``, has
    class 0 and has not been matched before. Per-image AP / recall /
    precision come from ``ap_per_class`` and are averaged over the images.

    # Arguments
        model:          Callable returning a dict with ``'pred_logits'`` and
                        ``'pred_boxes'``. Boxes are read as normalised
                        (cx, cy, w, h) — NOTE(review): inferred from the
                        xywh2xyxy + width/height scaling below; confirm.
        data_loader:    Iterable of ``(images, targets)`` pairs where
                        ``images`` offers ``.decompose()`` and ``targets[0]``
                        is a dict with ``'orig_size'``, ``'boxes'`` and
                        ``'labels'``. Only the first target of each batch is
                        used — presumably batch size 1; verify with caller.
        iou_thres:      Minimum IoU for a prediction to count as a match.
        print_interval: A progress row is printed every this many batches.
    # Returns
        ``(mean_mAP, mean_R, mean_P)``. Each is a 1-element NumPy array once
        at least one image with targets was scored; otherwise the initial
        float ``0.0``.
    """
    model.eval()
    mean_mAP, mean_R, mean_P, seen = 0.0, 0.0, 0.0, 0
    print('%11s' * 5 % ('Image', 'Total', 'P', 'R', 'mAP'))
    mAPs, mR, mP = [], [], []
    AP_accum, AP_accum_count = np.zeros(1), np.zeros(1)
    for batch_i, data in enumerate(data_loader):
        seen += 1
        # Hard cap: only batches 0..300 are evaluated.
        if batch_i > 300:
            break

        imgs, _ = data[0].decompose()
        targets = data[1][0]
        height, width = targets['orig_size'].cpu().numpy().tolist()
        t = time.time()
        output = model(imgs.cuda())
        outputs_class = output['pred_logits'].squeeze()
        if outputs_class.ndim == 1:
            # Restore the class axis that squeeze() removed for one class.
            outputs_class = outputs_class.unsqueeze(-1)
        outputs_boxes = output['pred_boxes'].squeeze()
        target_boxes = targets['boxes']

        # No ground truth for this image: score it as zero and move on.
        # (The previous ``is None`` guard called ``.size`` on None and
        # raised AttributeError.)
        if target_boxes is None or target_boxes.size(0) == 0:
            mAPs.append(0), mR.append(0), mP.append(0)
            continue

        target_cls = targets['labels']

        # Convert both box sets to absolute pixel corner coordinates.
        target_boxes = xywh2xyxy(target_boxes)
        target_boxes[:, 0] *= width
        target_boxes[:, 2] *= width
        target_boxes[:, 1] *= height
        target_boxes[:, 3] *= height

        outputs_boxes = xywh2xyxy(outputs_boxes)
        outputs_boxes[:, 0] *= width
        outputs_boxes[:, 2] *= width
        outputs_boxes[:, 1] *= height
        outputs_boxes[:, 3] *= height

        correct = []
        detected = set()  # indices of ground-truth boxes already matched
        print("output_boxes.shape={} class.shape={}".format(outputs_boxes.shape, outputs_class.shape))
        print((outputs_class.sigmoid() > 0.5).sum())
        num_dt = 0
        num_tp = 0
        for box, conf in zip(outputs_boxes, outputs_class):
            obj_pred = 0  # single-class setting: every prediction is class 0
            pred_bbox = torch.as_tensor(box, dtype=torch.float32).reshape(1, -1)
            is_confident = conf.sigmoid() > 0.5
            if is_confident:
                num_dt += 1

            # IoU of this prediction against every ground-truth box.
            iou = bbox_iou(pred_bbox, target_boxes, x1y1x2y2=True)[0]
            best_i = int(torch.argmax(iou))

            if iou[best_i] > iou_thres and obj_pred == int(target_cls[best_i]) and best_i not in detected:
                correct.append(1)
                if is_confident:
                    num_tp += 1
                detected.add(best_i)
            else:
                correct.append(0)
        print("precision={} recall={}".format(num_tp / max(1.0, num_dt), num_tp / max(1.0, len(target_boxes))))

        # Per-class statistics for this image (all predictions are class 0).
        scores = outputs_class[:, 0].cpu()
        AP, AP_class, R, P = ap_per_class(tp=correct,
                                          conf=scores,
                                          pred_cls=np.zeros(scores.shape[0]),
                                          target_cls=target_cls)

        # Accumulate AP per class across images.
        AP_accum_count += np.bincount(AP_class, minlength=1)
        AP_accum += np.bincount(AP_class, minlength=1, weights=AP)

        mAPs.append(AP.mean())
        mR.append(R.mean())
        mP.append(P.mean())

        # Running means over the images scored so far.
        mean_mAP = np.sum(mAPs) / (AP_accum_count + 1E-16)
        mean_R = np.sum(mR) / (AP_accum_count + 1E-16)
        mean_P = np.sum(mP) / (AP_accum_count + 1E-16)

        if batch_i % print_interval == 0:
            # .item() avoids %-formatting a 1-element array, which relies
            # on a deprecated NumPy array-to-scalar conversion.
            print(('%11s%11s' + '%11.3g' * 4 + 's') %
                  (seen, 100, mean_P.item(), mean_R.item(), mean_mAP.item(), time.time() - t))

    print('%11s' * 5 % ('Image', 'Total', 'P', 'R', 'mAP'))

    print('AP: %-.4f\n\n' % (AP_accum[0] / (AP_accum_count[0] + 1E-16)))

    return mean_mAP, mean_R, mean_P
|
|
|
|
def init_metrics():
    """Create a fresh, independent metrics record.

    # Returns
        A dict holding zeroed running means (``mean_mAP``, ``mean_R``,
        ``mean_P``), a ``seen`` counter, nine empty lists and two
        one-element zero arrays (``AP_accum``, ``AP_accum_count``). Every
        call builds new list and array objects.
    """
    metrics = {'mean_mAP': 0.0, 'mean_R': 0.0, 'mean_P': 0.0, 'seen': 0}

    list_fields = ('outputs', 'mAPs', 'mR', 'mP', 'TP',
                   'confidence', 'pred_class', 'target_class', 'jdict')
    for field in list_fields:
        metrics[field] = []

    for field in ('AP_accum', 'AP_accum_count'):
        metrics[field] = np.zeros(1)

    return metrics
|
|
|
|
@torch.no_grad()
def detmotdet_evaluate(model, data_loader, device, iou_thres=0.5, print_interval=10):
    """Evaluate single-class detection quality per frame position of a clip.

    Each batch is a dict that is moved to ``device`` and passed whole to
    ``model``. Frame ``i`` of every clip is scored against
    ``data['gt_instances'][i]`` with the same greedy IoU matching as
    ``motdet_evaluate``; statistics are kept separately per frame index.

    # Arguments
        model:          Callable taking the batch dict and returning a dict
                        whose ``'pred_logits'`` and ``'pred_boxes'`` can be
                        indexed by frame.
        data_loader:    Iterable of dicts containing ``'gt_instances'``, a
                        list with one entry per frame. Each entry is read
                        through ``.to()``, ``.image_size``, ``.boxes`` and
                        ``.labels`` — NOTE(review): project type, not
                        visible here.
        device:         Device the batch is moved to before the forward pass.
        iou_thres:      Minimum IoU for a prediction to count as a match.
        print_interval: Progress rows are printed every this many batches.
    # Returns
        A flat list ``[mAP_0, R_0, P_0, mAP_1, R_1, P_1]`` for frame
        indices 0 and 1. Each value is a 1-element NumPy array once that
        frame index has been scored, else the initial float ``0.0``.
    """
    model.eval()
    print('%11s' * 5 % ('Cur Image', 'Total', 'P', 'R', 'mAP'))

    # One metrics record per frame index; grown on demand for longer clips.
    metrics_list = [init_metrics() for _ in range(10)]
    for batch_i, data in enumerate(data_loader):
        # Hard cap: only batches 0..100 are evaluated.
        if batch_i > 100:
            break

        for key in list(data.keys()):
            if isinstance(data[key], list):
                data[key] = [img_info.to(device) for img_info in data[key]]
            else:
                data[key] = data[key].to(device)
        output = model(data)
        num_frames = len(data['gt_instances'])
        # Clips longer than the pre-allocated list used to raise IndexError.
        while len(metrics_list) < num_frames:
            metrics_list.append(init_metrics())

        for i in range(num_frames):
            metrics_i = metrics_list[i]
            metrics_i['seen'] += 1
            gt_instances = data['gt_instances'][i].to(torch.device('cpu'))

            height, width = gt_instances.image_size
            t = time.time()
            outputs_class = output['pred_logits'][i].squeeze()
            outputs_boxes = output['pred_boxes'][i].squeeze()

            if outputs_class.ndim == 1:
                # Restore the class axis that squeeze() removed for one class.
                outputs_class = outputs_class.unsqueeze(-1)

            target_boxes = gt_instances.boxes

            # No ground truth for this frame: score it as zero and move on.
            # (The previous guards called ``.size`` on None, used the wrong
            # key 'mAP' and the misspelt method 'apppend', so both paths
            # crashed.)
            if target_boxes is None:
                metrics_i['mAPs'].append(0)
                metrics_i['mR'].append(0)
                metrics_i['mP'].append(0)
                print('cur_target_boxes is None')
                continue
            if target_boxes.size(0) == 0:
                metrics_i['mAPs'].append(0)
                metrics_i['mR'].append(0)
                metrics_i['mP'].append(0)
                print('cur_target_boxes.size(0) == 0')
                continue

            target_cls = gt_instances.labels

            # Convert both box sets to absolute pixel corner coordinates.
            target_boxes = xywh2xyxy(target_boxes)
            target_boxes[:, 0] *= width
            target_boxes[:, 2] *= width
            target_boxes[:, 1] *= height
            target_boxes[:, 3] *= height

            outputs_boxes = xywh2xyxy(outputs_boxes)
            outputs_boxes[:, 0] *= width
            outputs_boxes[:, 2] *= width
            outputs_boxes[:, 1] *= height
            outputs_boxes[:, 3] *= height

            correct = []
            detected = set()  # indices of ground-truth boxes already matched
            for box in outputs_boxes:
                obj_pred = 0  # single-class setting: every prediction is class 0
                pred_bbox = torch.as_tensor(box, dtype=torch.float32).reshape(1, -1)

                # IoU of this prediction against every ground-truth box.
                iou = bbox_iou(pred_bbox, target_boxes, x1y1x2y2=True)[0]
                best_i = int(torch.argmax(iou))

                if iou[best_i] > iou_thres and obj_pred == int(target_cls[best_i]) and best_i not in detected:
                    correct.append(1)
                    detected.add(best_i)
                else:
                    correct.append(0)

            # Per-class statistics for this frame (all predictions class 0).
            scores = outputs_class[:, 0].cpu()
            AP, AP_class, R, P = ap_per_class(tp=correct,
                                              conf=scores,
                                              pred_cls=np.zeros(scores.shape[0]),
                                              target_cls=target_cls)

            # Accumulate AP per class across clips.
            metrics_i['AP_accum_count'] += np.bincount(AP_class, minlength=1)
            metrics_i['AP_accum'] += np.bincount(AP_class, minlength=1, weights=AP)

            metrics_i['mAPs'].append(AP.mean())
            metrics_i['mR'].append(R.mean())
            metrics_i['mP'].append(P.mean())

            # Running means for this frame index.
            metrics_i['mean_mAP'] = np.sum(metrics_i['mAPs']) / (metrics_i['AP_accum_count'] + 1E-16)
            metrics_i['mean_R'] = np.sum(metrics_i['mR']) / (metrics_i['AP_accum_count'] + 1E-16)
            metrics_i['mean_P'] = np.sum(metrics_i['mP']) / (metrics_i['AP_accum_count'] + 1E-16)

            if batch_i % print_interval == 0:
                seen = metrics_i['seen']
                # .item() avoids %-formatting a 1-element array, which
                # relies on a deprecated NumPy array-to-scalar conversion.
                mean_P = metrics_i['mean_P'].item()
                mean_R = metrics_i['mean_R'].item()
                mean_mAP = metrics_i['mean_mAP'].item()
                print("res_frame_{}".format(i))
                print(('%11s%11s' + '%11.3g' * 4 + 's') % (seen, 100, mean_P, mean_R, mean_mAP, time.time() - t))

    # Only the first two frame positions are reported to the caller.
    ret = []
    for frame_metrics in metrics_list[:2]:
        ret.append(frame_metrics['mean_mAP'])
        ret.append(frame_metrics['mean_R'])
        ret.append(frame_metrics['mean_P'])
    return ret
|
|