| import os |
| import torch |
| import numpy as np |
| from scipy.io import loadmat |
|
|
| from dataset.ucf_jhmdb import UCF_JHMDB_Dataset, UCF_JHMDB_VIDEO_Dataset |
| from utils.box_ops import rescale_bboxes |
|
|
| from .cal_frame_mAP import evaluate_frameAP |
| from .cal_video_mAP import evaluate_videoAP |
|
|
|
|
class UCF_JHMDB_Evaluator(object):
    """Evaluate a detector on UCF24 / JHMDB with frame-mAP or video-mAP.

    The constructor builds the test dataset matching ``metric``;
    ``evaluate_frame_map`` and ``evaluate_video_map`` then run the model
    over it and print the resulting metrics.
    """

    def __init__(self,
                 data_root=None,
                 dataset='ucf24',
                 model_name='yowo',
                 metric='fmap',
                 img_size=224,
                 len_clip=1,
                 batch_size=1,
                 conf_thresh=0.01,
                 iou_thresh=0.5,
                 transform=None,
                 collate_fn=None,
                 gt_folder=None,
                 save_path=None,
                 num_workers=4):
        """Store the evaluation settings and build the test dataset.

        Args:
            data_root: Dataset root directory. It is joined with
                ``splitfiles/...`` paths, so ``None`` raises ``TypeError``.
            dataset: ``'ucf24'`` selects the UCF output folder; any other
                value is written to the JHMDB output folder.
            model_name: Sub-folder name used for the detection files.
            metric: ``'fmap'`` (per-frame dataset) or ``'vmap'``
                (per-video dataset).
            img_size: Forwarded to the dataset constructor.
            len_clip: Forwarded to the dataset constructor.
            batch_size: Batch size of the evaluation DataLoader.
            conf_thresh: Stored on the instance; not used by the methods
                of this class.
            iou_thresh: IoU threshold passed to ``evaluate_frameAP``.
            transform: Forwarded to the dataset constructor.
            collate_fn: Collate function of the evaluation DataLoader.
            gt_folder: Ground-truth folder passed to ``evaluate_frameAP``.
            save_path: Output path passed to ``evaluate_frameAP``.
            num_workers: Worker processes of the evaluation DataLoader.

        Raises:
            ValueError: If ``metric`` is neither ``'fmap'`` nor ``'vmap'``.
        """
        # Fail early: an unknown metric would otherwise leave the instance
        # without ``testset`` / ``num_classes`` and break much later.
        if metric not in ('fmap', 'vmap'):
            raise ValueError(
                "metric must be 'fmap' or 'vmap', got {!r}".format(metric))

        self.data_root = data_root
        self.dataset = dataset
        self.model_name = model_name
        self.img_size = img_size
        self.len_clip = len_clip
        self.batch_size = batch_size
        self.conf_thresh = conf_thresh
        self.iou_thresh = iou_thresh
        self.collate_fn = collate_fn
        self.num_workers = num_workers

        self.gt_folder = gt_folder
        self.save_path = save_path

        # Only read by evaluate_video_map.
        self.gt_file = os.path.join(data_root, 'splitfiles/finalAnnots.mat')
        self.testlist = os.path.join(data_root, 'splitfiles/testlist01.txt')

        if metric == 'fmap':
            self.testset = UCF_JHMDB_Dataset(
                data_root=data_root,
                dataset=dataset,
                img_size=img_size,
                transform=transform,
                is_train=False,
                len_clip=len_clip,
                sampling_rate=1)
        else:
            self.testset = UCF_JHMDB_VIDEO_Dataset(
                data_root=data_root,
                dataset=dataset,
                img_size=img_size,
                transform=transform,
                len_clip=len_clip,
                sampling_rate=1)
        self.num_classes = self.testset.num_classes

    def _build_testloader(self):
        """Return a sequential, non-shuffled DataLoader over ``self.testset``."""
        return torch.utils.data.DataLoader(
            dataset=self.testset,
            batch_size=self.batch_size,
            shuffle=False,
            collate_fn=self.collate_fn,
            num_workers=self.num_workers,
            drop_last=False,
            pin_memory=True,
        )

    def _detection_dir(self, epoch):
        """Return the folder that receives the detection files of ``epoch``."""
        if self.dataset == 'ucf24':
            subdir = 'ucf_detections'
        else:
            subdir = 'jhmdb_detections'
        return os.path.join(
            'results', subdir, self.model_name, 'detections_' + str(epoch))

    @staticmethod
    def _stack_tubes(tubes):
        """Pack a list of tubes (each a list of box rows) into an array.

        Tubes of equal length go through ``np.array`` unchanged. Tubes of
        different lengths are stored in a 1-D object array of lists,
        because recent NumPy releases raise ``ValueError`` when asked to
        build an array from ragged nested sequences.
        """
        if len({len(tube) for tube in tubes}) <= 1:
            return np.array(tubes)
        ragged = np.empty(len(tubes), dtype=object)
        for idx, tube in enumerate(tubes):
            ragged[idx] = tube
        return ragged

    def _load_gt_tubes(self, video_names):
        """Read ground-truth tubes of the given videos from ``self.gt_file``.

        Args:
            video_names: Container of video names to keep (membership test).

        Returns:
            Dict mapping video name to ``{'gt_classes': ..., 'tubes': ...}``
            where every tube row is ``[frame, x1, y1, x2, y2]``.
        """
        gt_data = loadmat(self.gt_file)['annot']
        n_videos = gt_data.shape[1]
        print('loading gt tubes ...')

        gt_videos = {}
        for i in range(n_videos):
            record = gt_data[0][i]
            video_name = record[1][0]
            if video_name not in video_names:
                continue

            all_gt_boxes = []
            for tube in record[2][0]:
                # Field 0 is read as the end frame and field 1 as the start
                # frame. Converting to int keeps the arithmetic below out
                # of narrow unsigned integer types.
                tube_end_frame = int(tube[0][0][0])
                tube_start_frame = int(tube[1][0][0])
                tube_class = tube[2][0][0]
                tube_data = tube[3]
                tube_length = tube_end_frame - tube_start_frame + 1

                gt_one_tube = []
                for k in range(tube_length):
                    x = float(tube_data[k][0])
                    y = float(tube_data[k][1])
                    w = float(tube_data[k][2])
                    h = float(tube_data[k][3])
                    # Stored rows are (x, y, w, h); convert to corners.
                    gt_one_tube.append(
                        [int(tube_start_frame + k), x, y, x + w, y + h])
                all_gt_boxes.append(gt_one_tube)

            # NOTE(review): the class of the last tube is used for the whole
            # video; presumably all tubes of a video share one class, and
            # every listed video has at least one tube -- confirm.
            gt_videos[video_name] = {
                'gt_classes': tube_class,
                'tubes': self._stack_tubes(all_gt_boxes),
            }
        return gt_videos

    def evaluate_frame_map(self, model, epoch=1, show_pr_curve=False):
        """Run ``model`` over the test set and print the frame-mAP metrics.

        One text file per frame is written below
        ``results/<dataset>_detections/<model_name>/detections_<epoch>/``.
        Each line holds ``cls_id score x1 y1 x2 y2`` with a 1-based class
        id. The folder is then handed to ``evaluate_frameAP``.

        Args:
            model: Callable taking a clip batch and returning
                ``(batch_scores, batch_labels, batch_bboxes)``; must expose
                a ``device`` attribute.
            epoch: Used only to name the output folder.
            show_pr_curve: Forwarded to ``evaluate_frameAP``.
        """
        print("Metric: Frame mAP")

        self.testloader = self._build_testloader()
        epoch_size = len(self.testloader)

        # Create the output folder once, before the loop. This removes the
        # per-sample exists/mkdir cascade and keeps ``current_dir`` defined
        # even when the loader yields no batch.
        current_dir = self._detection_dir(epoch)
        os.makedirs(current_dir, exist_ok=True)

        for iter_i, (batch_frame_id, batch_video_clip, batch_target) in enumerate(self.testloader):
            batch_video_clip = batch_video_clip.to(model.device)

            with torch.no_grad():
                batch_scores, batch_labels, batch_bboxes = model(batch_video_clip)

            for frame_id, scores, labels, bboxes, target in zip(
                    batch_frame_id, batch_scores, batch_labels,
                    batch_bboxes, batch_target):
                # presumably maps boxes back to the original frame size --
                # see utils.box_ops.rescale_bboxes
                bboxes = rescale_bboxes(bboxes, target['orig_size'])

                detection_path = os.path.join(current_dir, frame_id)
                with open(detection_path, 'w') as f_detect:
                    for score, label, bbox in zip(scores, labels, bboxes):
                        fields = (
                            int(label) + 1,  # class ids are written 1-based
                            score,
                            round(bbox[0]),
                            round(bbox[1]),
                            round(bbox[2]),
                            round(bbox[3]),
                        )
                        f_detect.write(
                            ' '.join(str(value) for value in fields) + '\n')

            if iter_i % 100 == 0:
                log_info = "[%d / %d]" % (iter_i, epoch_size)
                print(log_info, flush=True)

        print('calculating Frame mAP ...')
        metric_list = evaluate_frameAP(self.gt_folder, current_dir, self.iou_thresh,
                                       self.save_path, self.dataset, show_pr_curve)
        for metric in metric_list:
            print(metric)

    def evaluate_video_map(self, model):
        """Run ``model`` video by video and print video-mAP at several IoUs.

        Ground-truth tubes come from ``self.gt_file``, restricted to the
        videos listed in ``self.testlist``. Detections are collected per
        image as ``{class_id (1-based): array of [x1, y1, x2, y2, score]}``
        and scored with ``evaluate_videoAP``.

        Args:
            model: Callable taking a clip batch and returning
                ``(batch_scores, batch_labels, batch_bboxes)``; must expose
                a ``device`` attribute.
        """
        print("Metric: Video mAP")
        with open(self.testlist, 'r') as file:
            video_testlist = [line.rstrip() for line in file]

        detected_boxes = {}
        # A set gives O(1) membership tests while scanning the annotations.
        gt_videos = self._load_gt_tubes(set(video_testlist))

        print('inference ...')
        n_listed = len(video_testlist)
        for i, video_name in enumerate(video_testlist):
            if i % 50 == 0:
                print('Video: [%d / %d] - %s' % (i, n_listed, video_name))

            # Point the dataset at the current video, then rebuild the
            # loader over it.
            self.testset.set_video_data(video_name)
            self.testloader = self._build_testloader()

            for batch_img_name, batch_video_clip, batch_target in self.testloader:
                batch_video_clip = batch_video_clip.to(model.device)

                with torch.no_grad():
                    batch_scores, batch_labels, batch_bboxes = model(batch_video_clip)

                for img_name, scores, labels, bboxes, target in zip(
                        batch_img_name, batch_scores, batch_labels,
                        batch_bboxes, batch_target):
                    bboxes = rescale_bboxes(bboxes, target['orig_size'])

                    img_annotation = {}
                    for cls_idx in range(self.num_classes):
                        inds = np.where(labels == cls_idx)[0]
                        c_bboxes = bboxes[inds]
                        c_scores = scores[inds]
                        # Append the score as a trailing column.
                        img_annotation[cls_idx + 1] = np.concatenate(
                            [c_bboxes, c_scores[..., None]], axis=-1)
                    detected_boxes[img_name] = img_annotation

            del self.testloader

        iou_list = [0.05, 0.1, 0.2, 0.3, 0.5, 0.75]
        print('calculating video mAP ...')
        for iou_th in iou_list:
            per_ap = evaluate_videoAP(gt_videos, detected_boxes, self.num_classes, iou_th, True)
            video_mAP = sum(per_ap) / len(per_ap)
            print('-------------------------------')
            print('V-mAP @ {} IoU:'.format(iou_th))
            print('--Per AP: ', per_ap)
            print('--mAP: ', round(video_mAP, 2))
|
|
|
|
# Script entry point: intentionally a no-op, the module only provides the
# evaluator class.
if __name__ == "__main__":
    pass