| |
| |
| |
|
|
| import json |
| import numpy as np |
| import pandas as pd |
| import os |
| from joblib import Parallel, delayed |
|
|
| from libs.utils.Evaluation.utils import get_blocked_videos |
| from libs.utils.Evaluation.utils import interpolated_prec_rec |
| from libs.utils.Evaluation.utils import segment_iou |
|
|
| import warnings |
|
|
# Ignore any warning whose message starts with "numpy.dtype size changed" or
# "numpy.ufunc size changed" (typically emitted when a compiled extension was
# built against a different NumPy version) so it does not clutter the output.
| warnings.filterwarnings("ignore", message="numpy.dtype size changed") |
| warnings.filterwarnings("ignore", message="numpy.ufunc size changed") |
|
|
|
|
class ANETdetection(object):
    """ActivityNet-style temporal detection evaluator for a single 'Fake' class.

    On construction the ground-truth and prediction JSON files are loaded into
    pandas data frames.  ``evaluate()`` then computes the average precision at
    every temporal-IoU threshold and the mean over thresholds.

    Ground-truth entries are expected to carry the keys ``'file'``,
    ``'split'``, ``'n_fakes'`` and ``'fake_periods'``; the prediction file must
    contain every key listed in ``prediction_fields`` and map each video id in
    ``'results'`` to a list of ``{'label', 'segment', 'score'}`` dicts.
    """

    GROUND_TRUTH_FIELDS = ['database']
    PREDICTION_FIELDS = ['results', 'version', 'external_data']

    def __init__(self, ground_truth_filename=None, prediction_filename=None,
                 ground_truth_fields=GROUND_TRUTH_FIELDS,
                 prediction_fields=PREDICTION_FIELDS,
                 dataset_name='',
                 tiou_thresholds=np.linspace(0.5, 0.95, 10),
                 subset='validation', verbose=False,
                 check_status=False):
        """Load ground truth and predictions.

        Parameters
        ----------
        ground_truth_filename : str
            Path to the ground truth json file (required).
        prediction_filename : str
            Path to the prediction json file (required).
        ground_truth_fields : list of str
            Stored as ``self.gt_fields``; not consulted by the methods of
            this class.
        prediction_fields : list of str
            Top-level keys the prediction file must contain.
        dataset_name : str
            Only used in the verbose result banner.
        tiou_thresholds : 1darray
            Temporal IoU thresholds at which AP is computed.
        subset : str
            Only ground-truth entries whose ``'split'`` equals this value
            are kept.
        verbose : bool
            Print loading / result summaries.
        check_status : bool
            When true, video ids returned by ``get_blocked_videos()`` are
            excluded from both ground truth and predictions.

        Raises
        ------
        IOError
            If either filename is missing/empty or the prediction file lacks
            one of ``prediction_fields``.
        """
        if not ground_truth_filename:
            raise IOError('Please input a valid ground truth file.')
        if not prediction_filename:
            raise IOError('Please input a valid prediction file.')

        self.subset = subset
        self.tiou_thresholds = tiou_thresholds
        self.verbose = verbose
        self.gt_fields = ground_truth_fields
        self.pred_fields = prediction_fields
        self.check_status = check_status
        self.dataset_name = dataset_name

        # Results are filled in by evaluate(); define them up front so that
        # reading them before evaluation yields None instead of AttributeError.
        self.ap = None
        self.mAP = None
        self.average_mAP = None

        # Must be set before importing: both importers filter on it.
        self.blocked_videos = get_blocked_videos() if self.check_status else list()

        # Ground truth first: _import_prediction needs self.activity_index.
        self.ground_truth, self.activity_index = self._import_ground_truth(
            ground_truth_filename)
        self.prediction = self._import_prediction(prediction_filename)

        if self.verbose:
            print('[INIT] Loaded annotations from {} subset.'.format(subset))
            nr_gt = len(self.ground_truth)
            print('\tNumber of ground truth instances: {}'.format(nr_gt))
            nr_pred = len(self.prediction)
            print('\tNumber of predictions: {}'.format(nr_pred))
            print('\tFixed threshold for tiou score: {}'.format(self.tiou_thresholds))

    @staticmethod
    def _video_id_from_path(path):
        """Return the file's base name with its '.mp4' / '.wav' extension removed."""
        extension = '.mp4' if path.endswith('.mp4') else '.wav'
        # NOTE(review): str.replace drops *every* occurrence of the extension
        # text, not only the trailing one.  Kept unchanged so the ids stay
        # identical to what they were before -- confirm against the code that
        # produces the prediction ids before tightening this.
        return os.path.basename(path).replace(extension, '')

    def _import_ground_truth(self, ground_truth_filename):
        """Read the ground truth file and return its fake segments.

        The JSON payload may be either a list of video entries or a dict
        whose values are video entries.  Entries are skipped when their
        ``'split'`` differs from ``self.subset``, when their video id is in
        ``self.blocked_videos`` or when ``'n_fakes'`` is 0.  Every remaining
        ``[start, end]`` pair in ``'fake_periods'`` becomes one row.

        Parameters
        ----------
        ground_truth_filename : str
            Full path to the ground truth json file.

        Returns
        -------
        ground_truth : df
            Data frame with columns 'video-id', 't-start', 't-end', 'label'
            (label is always 0).
        activity_index : dict
            Class-name to class-index mapping; always ``{'Fake': 0}``.
        """
        with open(ground_truth_filename, 'r') as fobj:
            data = json.load(fobj)

        activity_index = {'Fake': 0}
        # A dict payload is keyed by an identifier we do not need.
        entries = data.values() if isinstance(data, dict) else data

        video_lst, t_start_lst, t_end_lst, label_lst = [], [], [], []
        for v in entries:
            videoid = self._video_id_from_path(v['file'])
            if self.subset != v['split']:
                continue
            if videoid in self.blocked_videos:
                continue
            if v['n_fakes'] == 0:
                continue
            for ann in v['fake_periods']:
                video_lst.append(videoid)
                t_start_lst.append(float(ann[0]))
                t_end_lst.append(float(ann[1]))
                label_lst.append(0)

        ground_truth = pd.DataFrame({'video-id': video_lst,
                                     't-start': t_start_lst,
                                     't-end': t_end_lst,
                                     'label': label_lst})
        if self.verbose:
            print(activity_index)
        return ground_truth, activity_index

    def _import_prediction(self, prediction_filename):
        """Read the prediction file and return the predicted segments.

        Parameters
        ----------
        prediction_filename : str
            Full path to the prediction json file.

        Returns
        -------
        prediction : df
            Data frame with columns 'video-id', 't-start', 't-end', 'label',
            'score'.

        Raises
        ------
        IOError
            If any key of ``self.pred_fields`` is missing from the file.
        KeyError
            If a result carries a label that is not in
            ``self.activity_index``.
        """
        with open(prediction_filename, 'r') as fobj:
            data = json.load(fobj)

        # Membership test (rather than data.keys()) so that a non-dict payload
        # is also reported as an invalid prediction file.
        if not all(field in data for field in self.pred_fields):
            raise IOError('Please input a valid prediction file.')

        video_lst, t_start_lst, t_end_lst = [], [], []
        label_lst, score_lst = [], []
        for videoid, v in data['results'].items():
            if videoid in self.blocked_videos:
                continue
            for result in v:
                label = self.activity_index[result['label']]
                video_lst.append(videoid)
                t_start_lst.append(float(result['segment'][0]))
                t_end_lst.append(float(result['segment'][1]))
                label_lst.append(label)
                score_lst.append(result['score'])

        prediction = pd.DataFrame({'video-id': video_lst,
                                   't-start': t_start_lst,
                                   't-end': t_end_lst,
                                   'label': label_lst,
                                   'score': score_lst})
        return prediction

    def _get_predictions_with_label(self, prediction_by_label, label_name, cidx):
        """Return all predictions of class ``cidx`` with a fresh 0..n-1 index.

        Returns an empty DataFrame (and prints a warning when verbose) if the
        group-by object has no group for ``cidx``.
        """
        try:
            return prediction_by_label.get_group(cidx).reset_index(drop=True)
        except KeyError:
            # get_group signals a missing group with KeyError; anything else
            # is a genuine failure and must propagate.
            if self.verbose:
                print('Warning: No predictions of label \'%s\' were provdied.' % label_name)
            return pd.DataFrame()

    def wrapper_compute_average_precision(self):
        """Compute average precision for each class in the subset.

        Returns
        -------
        ap : 2darray
            Shape ``(len(self.tiou_thresholds), len(self.activity_index))``.

        Raises
        ------
        KeyError
            If the ground truth contains no instance of a class (raised by
            ``get_group``).
        """
        ap = np.zeros((len(self.tiou_thresholds), len(self.activity_index)))

        ground_truth_by_label = self.ground_truth.groupby('label')
        prediction_by_label = self.prediction.groupby('label')

        # One job per class.
        results = Parallel(n_jobs=len(self.activity_index))(
            delayed(compute_average_precision_detection)(
                ground_truth=ground_truth_by_label.get_group(cidx).reset_index(drop=True),
                prediction=self._get_predictions_with_label(prediction_by_label, label_name, cidx),
                tiou_thresholds=self.tiou_thresholds,
            ) for label_name, cidx in self.activity_index.items())

        # Results come back in the iteration order of activity_index.
        for cidx, class_ap in zip(self.activity_index.values(), results):
            ap[:, cidx] = class_ap

        return ap

    def evaluate(self):
        """Evaluate the prediction file.

        Stores the per-threshold/per-class AP in ``self.ap``, its mean over
        classes in ``self.mAP`` and the mean of that over thresholds in
        ``self.average_mAP``.

        Returns
        -------
        mAP : 1darray
            Mean AP over classes, one value per tIoU threshold.
        average_mAP : float
            Mean of ``mAP`` over all thresholds.
        """
        self.ap = self.wrapper_compute_average_precision()

        self.mAP = self.ap.mean(axis=1)
        self.average_mAP = self.mAP.mean()

        if self.verbose:
            print(f'[RESULTS] Performance on {self.dataset_name} detection task.')
            print('Average-mAP: {}'.format(self.average_mAP))

        return self.mAP, self.average_mAP
|
|
|
|
def compute_average_precision_detection(ground_truth, prediction, tiou_thresholds=np.linspace(0.5, 0.95, 10)):
    """Compute average precision (detection task) between ground truth and
    predictions data frames. If multiple predictions match the same ground
    truth segment, only the one with the highest score is counted as a
    true positive. This code is greatly inspired by Pascal VOC devkit.

    Parameters
    ----------
    ground_truth : df
        Data frame containing the ground truth instances.
        Required fields: ['video-id', 't-start', 't-end']
    prediction : df
        Data frame containing the prediction instances.
        Required fields: ['video-id', 't-start', 't-end', 'score']
    tiou_thresholds : 1darray, optional
        Temporal intersection over union thresholds.

    Returns
    -------
    ap : 1darray
        Average precision score, one value per entry of ``tiou_thresholds``.
        All zeros when either data frame is empty.
    """
    ap = np.zeros(len(tiou_thresholds))
    # Without predictions nothing can be detected; without ground truth the
    # recall denominator would be zero.  Both cases score 0.
    if prediction.empty or ground_truth.empty:
        return ap

    # lock_gt is addressed by ground-truth row position, so make sure the
    # index labels *are* the positions regardless of what the caller passed.
    ground_truth = ground_truth.reset_index(drop=True)
    npos = float(len(ground_truth))
    # lock_gt[t, g] holds the prediction row that claimed ground truth g at
    # threshold t, or -1 while it is still unmatched.
    lock_gt = np.ones((len(tiou_thresholds), len(ground_truth))) * -1

    # Sort predictions by decreasing score.  argsort yields positions, hence
    # iloc (label-based loc is only equivalent for a default RangeIndex).
    sort_idx = prediction['score'].values.argsort()[::-1]
    prediction = prediction.iloc[sort_idx].reset_index(drop=True)

    # One row per threshold, one column per prediction.
    tp = np.zeros((len(tiou_thresholds), len(prediction)))
    fp = np.zeros((len(tiou_thresholds), len(prediction)))

    # Group once so per-video lookups inside the loop are cheap.
    ground_truth_gbvn = ground_truth.groupby('video-id')

    for idx, this_pred in prediction.iterrows():
        try:
            ground_truth_videoid = ground_truth_gbvn.get_group(this_pred['video-id'])
        except KeyError:
            # No ground truth at all for this video: false positive at
            # every threshold.
            fp[:, idx] = 1
            continue

        # Row positions (in ground_truth / lock_gt) of this video's segments.
        gt_rows = ground_truth_videoid.index.values
        # segment_iou is a project helper; it is used here as returning one
        # tIoU value per candidate ground-truth segment.
        tiou_arr = segment_iou(this_pred[['t-start', 't-end']].values,
                               ground_truth_videoid[['t-start', 't-end']].values)
        # Visit candidates from best to worst overlap.
        tiou_sorted_idx = tiou_arr.argsort()[::-1]
        for tidx, tiou_thr in enumerate(tiou_thresholds):
            for jdx in tiou_sorted_idx:
                if tiou_arr[jdx] < tiou_thr:
                    # Candidates are sorted, so none of the rest can pass.
                    fp[tidx, idx] = 1
                    break
                if lock_gt[tidx, gt_rows[jdx]] >= 0:
                    # Already claimed by a higher-scoring prediction.
                    continue
                tp[tidx, idx] = 1
                lock_gt[tidx, gt_rows[jdx]] = idx
                break

            # Every sufficiently overlapping segment was already claimed.
            if fp[tidx, idx] == 0 and tp[tidx, idx] == 0:
                fp[tidx, idx] = 1

    tp_cumsum = np.cumsum(tp, axis=1).astype(np.float32)
    fp_cumsum = np.cumsum(fp, axis=1).astype(np.float32)
    recall_cumsum = tp_cumsum / npos

    # Each prediction is exactly one of tp / fp, so the denominator is >= 1.
    precision_cumsum = tp_cumsum / (tp_cumsum + fp_cumsum)

    for tidx in range(len(tiou_thresholds)):
        ap[tidx] = interpolated_prec_rec(precision_cumsum[tidx, :], recall_cumsum[tidx, :])

    return ap
|
|