forensics-grpo / code /libs /utils /metrics.py
sdzt's picture
Add source code
33569f9 verified
Raw
History Blame Contribute Delete
12.4 kB
# Modified from official EPIC-Kitchens action detection evaluation code
# see https://github.com/epic-kitchens/C2-Action-Detection/blob/master/EvaluationCode/evaluate_detection_json_ek100.py
import os
import json
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
from typing import List
from typing import Tuple
from typing import Dict
def remove_duplicate_annotations(ants, tol=1e-3):
    """Drop annotations that repeat an earlier one.

    Two annotations count as duplicates when they carry the same
    ``'label_id'`` and both their start and end times (``'segment'[0]`` and
    ``'segment'[1]``) differ by at most ``tol``. The first occurrence is kept
    and the input order is preserved.

    Parameters
    ----------
    ants : iterable of dict
        Annotations, each with a ``'segment'`` pair and a ``'label_id'``.
    tol : float, optional
        Absolute tolerance applied to the start and end times.

    Returns
    -------
    list of dict
        The retained annotation objects (not copies).
    """
    kept = []
    for candidate in ants:
        start, end = candidate['segment'][0], candidate['segment'][1]
        category = candidate['label_id']
        # Compare only against annotations already accepted, so the earliest
        # instance of every duplicate group is the one that survives.
        is_duplicate = any(
            abs(start - prior['segment'][0]) <= tol
            and abs(end - prior['segment'][1]) <= tol
            and category == prior['label_id']
            for prior in kept
        )
        if not is_duplicate:
            kept.append(candidate)
    return kept
def _encode_label_id(raw_label, label_offset):
    """Turn a raw label entry into a single integer class id.

    A scalar label is converted with ``int``. A tuple/list label is read as
    the digits of a base-``label_offset`` number (last element is the least
    significant digit), i.e. ``sum(label_offset**i * int(x))`` over the
    reversed sequence, so distinct combinations map to distinct ids as long
    as every element is smaller than ``label_offset``.
    """
    if isinstance(raw_label, (tuple, list)):
        label_id = 0
        for power, part in enumerate(raw_label[::-1]):
            # Multiply (not add) by the positional weight: adding would make
            # e.g. (0, 1) and (1, 0) collapse onto the same id.
            label_id += label_offset ** power * int(part)
        return label_id
    return int(raw_label)


def load_gt_seg_from_json(json_file, split=None, label='label_id', label_offset=0):
    """Load ground-truth segments from an annotation JSON file.

    The file must contain a top-level ``'database'`` dict mapping each video
    id to an entry with a ``'subset'`` string and an ``'annotations'`` list.
    Every annotation provides a ``'segment'`` ``[start, end]`` pair and the
    field named by ``label``. Duplicate annotations of a video are removed
    with ``remove_duplicate_annotations`` before loading.

    Parameters
    ----------
    json_file : str
        Path to the annotation file.
    split : str or None, optional
        When given, only videos whose lower-cased ``'subset'`` equals
        ``split`` are kept (so ``split`` is expected in lower case).
    label : str, optional
        Annotation field holding the class label.
    label_offset : int, optional
        Base used to combine tuple/list labels into one integer id.

    Returns
    -------
    pd.DataFrame
        Columns ``'video-id'``, ``'t-start'``, ``'t-end'``, ``'label'``.
    """
    with open(json_file, "r", encoding="utf8") as f:
        json_db = json.load(f)
    json_db = json_db['database']

    vids, starts, stops, labels = [], [], [], []
    for k, v in json_db.items():
        # filter based on split
        if (split is not None) and v['subset'].lower() != split:
            continue
        # remove duplicated instances
        ants = remove_duplicate_annotations(v['annotations'])
        vids += [k] * len(ants)
        # for each event, grab the start/end time and label
        for event in ants:
            starts.append(float(event['segment'][0]))
            stops.append(float(event['segment'][1]))
            labels.append(_encode_label_id(event[label], label_offset))

    # move to pd dataframe
    gt_base = pd.DataFrame({
        'video-id': vids,
        't-start': starts,
        't-end': stops,
        'label': labels,
    })
    return gt_base


def load_pred_seg_from_json(json_file, label='label_id', label_offset=0):
    """Load predicted segments from a prediction JSON file.

    The file must contain a top-level ``'database'`` dict mapping each video
    id to a list of predicted events. Every event provides a ``'segment'``
    ``[start, end]`` pair, the field named by ``label`` and a ``'scores'``
    value (a single confidence, converted with ``float``).

    Parameters
    ----------
    json_file : str
        Path to the prediction file.
    label : str, optional
        Event field holding the class label.
    label_offset : int, optional
        Base used to combine tuple/list labels into one integer id; uses the
        same encoding as ``load_gt_seg_from_json``.

    Returns
    -------
    pd.DataFrame
        Columns ``'video-id'``, ``'t-start'``, ``'t-end'``, ``'label'``,
        ``'score'``.
    """
    with open(json_file, "r", encoding="utf8") as f:
        json_db = json.load(f)
    json_db = json_db['database']

    vids, starts, stops, labels, scores = [], [], [], [], []
    for k, v in json_db.items():
        vids += [k] * len(v)
        # for each event
        for event in v:
            starts.append(float(event['segment'][0]))
            stops.append(float(event['segment'][1]))
            labels.append(_encode_label_id(event[label], label_offset))
            scores.append(float(event['scores']))

    # move to pd dataframe
    pred_base = pd.DataFrame({
        'video-id': vids,
        't-start': starts,
        't-end': stops,
        'label': labels,
        'score': scores,
    })
    return pred_base
class ANETdetection(object):
    """Evaluate temporal action detection with mean average precision (mAP).

    Ground-truth segments are loaded once at construction time. Their labels
    are re-indexed to ``0..C-1`` (``C`` = number of distinct ground-truth
    labels, in sorted order); ``activity_index`` maps each original label to
    its index. ``evaluate`` scores a set of predictions against them at every
    threshold in ``tiou_thresholds``.

    Parameters
    ----------
    ant_file : str
        Path to the ground-truth annotation JSON file.
    split : str or None, optional
        Subset filter forwarded to ``load_gt_seg_from_json``.
    tiou_thresholds : 1darray, optional
        Temporal IoU thresholds at which AP is computed.
    label : str, optional
        Annotation field holding the class label.
    label_offset : int, optional
        Forwarded to ``load_gt_seg_from_json``.
    num_workers : int, optional
        Number of joblib workers used to process classes in parallel.
    dataset_name : str or None, optional
        Name printed with the results; defaults to the annotation file's
        base name without ``.json``.
    """

    def __init__(
        self,
        ant_file,
        split=None,
        tiou_thresholds=np.linspace(0.1, 0.5, 5),
        label='label_id',
        label_offset=0,
        num_workers=8,
        dataset_name=None,
    ):
        self.tiou_thresholds = tiou_thresholds
        self.ap = None
        self.num_workers = num_workers
        if dataset_name is not None:
            self.dataset_name = dataset_name
        else:
            self.dataset_name = os.path.basename(ant_file).replace('.json', '')

        # Import ground truth
        self.split = split
        self.ground_truth = load_gt_seg_from_json(
            ant_file, split=self.split, label=label, label_offset=label_offset)

        # Re-index the labels that exist in the ground truth to 0..C-1.
        self.activity_index = {
            j: i for i, j in enumerate(sorted(self.ground_truth['label'].unique()))
        }
        self.ground_truth['label'] = self.ground_truth['label'].map(self.activity_index)

    def _get_predictions_with_label(self, prediction_by_label, label_name, cidx):
        """Get all predictions of the given label.

        Returns an empty DataFrame (and prints a warning) if there are no
        predictions with the given label.
        """
        try:
            return prediction_by_label.get_group(cidx).reset_index(drop=True)
        except KeyError:
            # get_group raises KeyError for a class without any prediction.
            print('Warning: No predictions of label \'%s\' were provdied.' % label_name)
            return pd.DataFrame()

    def _prepare_predictions(self, preds):
        """Convert ``preds`` into a fresh DataFrame with re-indexed labels.

        Accepts the same inputs as ``evaluate``. The returned frame is always
        a new object, so the caller's data is never modified. Predictions
        whose label does not occur in the ground truth are dropped; without
        this their raw label could coincide with the re-indexed id of a
        different class and be scored against it.

        Raises
        ------
        TypeError
            If ``preds`` is not a DataFrame, a path to an existing file, or
            a dict.
        """
        if isinstance(preds, pd.DataFrame):
            assert 'label' in preds
        elif isinstance(preds, str) and os.path.isfile(preds):
            preds = load_pred_seg_from_json(preds)
        elif isinstance(preds, dict):
            # move to pd dataframe
            # did not check dtype here, can accept both numpy / pytorch tensors
            preds = pd.DataFrame({
                'video-id': preds['video-id'],
                't-start': preds['t-start'].tolist(),
                't-end': preds['t-end'].tolist(),
                'label': preds['label'].tolist(),
                'score': preds['score'].tolist(),
            })
        else:
            raise TypeError(
                'preds must be a pandas DataFrame, a path to an existing '
                'JSON file, or a dict; got {!r}'.format(preds)
            )

        # Boolean indexing returns a new frame, leaving the input untouched.
        known = preds['label'].isin(list(self.activity_index))
        preds = preds[known].reset_index(drop=True)
        # make the label ids consistent with the ground truth
        preds['label'] = preds['label'].map(self.activity_index)
        return preds

    def wrapper_compute_average_precision(self, preds):
        """Compute average precision for each class in the subset.

        Returns an array of shape ``(len(tiou_thresholds), num_classes)``.
        """
        ap = np.zeros((len(self.tiou_thresholds), len(self.activity_index)))

        # Adaptation to query faster
        ground_truth_by_label = self.ground_truth.groupby('label')
        prediction_by_label = preds.groupby('label')

        results = Parallel(n_jobs=self.num_workers)(
            delayed(compute_average_precision_detection)(
                ground_truth=ground_truth_by_label.get_group(cidx).reset_index(drop=True),
                prediction=self._get_predictions_with_label(prediction_by_label, label_name, cidx),
                tiou_thresholds=self.tiou_thresholds,
            ) for label_name, cidx in self.activity_index.items())

        # Results come back in the iteration order of activity_index.
        for i, cidx in enumerate(self.activity_index.values()):
            ap[:, cidx] = results[i]

        return ap

    def evaluate(self, preds, verbose=True):
        """Evaluate a set of predictions.

        For the detection task we measure the interpolated mean average
        precision to measure the performance of a method.

        Parameters
        ----------
        preds : pd.DataFrame, str or dict
            (1) a DataFrame with a ``'label'`` column; or (2) the path of a
            JSON file the predictions are loaded from; or (3) a dict with
            keys ``'video-id'``, ``'t-start'``, ``'t-end'``, ``'label'`` and
            ``'score'`` whose array values support ``.tolist()``.
            The input object is not modified.
        verbose : bool, optional
            Print the per-threshold and average mAP.

        Returns
        -------
        mAP : 1darray
            Mean AP over classes for each tIoU threshold.
        average_mAP : float
            Mean of ``mAP`` over the thresholds.
        """
        # always reset ap
        self.ap = None

        preds = self._prepare_predictions(preds)

        # compute mAP
        self.ap = self.wrapper_compute_average_precision(preds)
        mAP = self.ap.mean(axis=1)
        average_mAP = mAP.mean()

        # print results
        if verbose:
            print('[RESULTS] Action detection results on {:s}.'.format(
                self.dataset_name)
            )
            block = ''
            for tiou, tiou_mAP in zip(self.tiou_thresholds, mAP):
                block += '\n|tIoU = {:.2f}: mAP = {:.2f} (%)'.format(tiou, tiou_mAP*100)
            print(block)
            print('Avearge mAP: {:.2f} (%)'.format(average_mAP*100))

        # return the results
        return mAP, average_mAP
def compute_average_precision_detection(
    ground_truth,
    prediction,
    tiou_thresholds=np.linspace(0.1, 0.5, 5)
):
    """Compute average precision (detection task) between ground truth and
    predictions data frames. Predictions are processed in decreasing score
    order; each ground-truth instance can be matched by at most one
    prediction per threshold, so any later prediction overlapping an already
    matched instance counts as a false positive. This code is greatly
    inspired by Pascal VOC devkit.

    Parameters
    ----------
    ground_truth : df
        Data frame containing the ground truth instances.
        Required fields: ['video-id', 't-start', 't-end']
    prediction : df
        Data frame containing the prediction instances.
        Required fields: ['video-id', 't-start', 't-end', 'score']
    tiou_thresholds : 1darray, optional
        Temporal intersection over union thresholds.

    Outputs
    -------
    ap : 1darray
        Average precision score for each tIoU threshold. All zeros when
        either data frame is empty.
    """
    ap = np.zeros(len(tiou_thresholds))
    # Without predictions nothing is detected; without ground truth recall
    # is undefined (0/0), so report zero AP instead of propagating NaN.
    if prediction.empty or ground_truth.empty:
        return ap

    npos = float(len(ground_truth))
    # lock_gt[t, g] holds the index of the prediction matched to ground-truth
    # row g at threshold t, or -1 while that row is still unmatched.
    lock_gt = np.ones((len(tiou_thresholds), len(ground_truth))) * -1

    # Sort predictions by decreasing score order. argsort yields positions,
    # hence iloc: this stays correct whatever index the caller's frame has.
    sort_idx = prediction['score'].values.argsort()[::-1]
    prediction = prediction.iloc[sort_idx].reset_index(drop=True)

    # Give ground-truth rows positional ids 0..N-1 so they can be used
    # directly as column indices into lock_gt.
    ground_truth = ground_truth.reset_index(drop=True)

    # Initialize true positive and false positive vectors.
    tp = np.zeros((len(tiou_thresholds), len(prediction)))
    fp = np.zeros((len(tiou_thresholds), len(prediction)))

    # Adaptation to query faster
    ground_truth_gbvn = ground_truth.groupby('video-id')

    # Assigning true positive to truly ground truth instances.
    for idx, this_pred in prediction.iterrows():
        try:
            # Check if there is at least one ground truth in the video associated.
            ground_truth_videoid = ground_truth_gbvn.get_group(this_pred['video-id'])
        except KeyError:
            # No ground truth in this video: false positive at every threshold.
            fp[:, idx] = 1
            continue

        # reset_index() keeps the positional ground-truth id in column 'index'.
        this_gt = ground_truth_videoid.reset_index()
        gt_rows = this_gt['index'].values
        tiou_arr = segment_iou(this_pred[['t-start', 't-end']].values,
                               this_gt[['t-start', 't-end']].values)
        # Visit ground-truth instances from highest to lowest tIoU.
        tiou_sorted_idx = tiou_arr.argsort()[::-1]
        for tidx, tiou_thr in enumerate(tiou_thresholds):
            for jdx in tiou_sorted_idx:
                if tiou_arr[jdx] < tiou_thr:
                    # Every remaining candidate overlaps even less.
                    fp[tidx, idx] = 1
                    break
                if lock_gt[tidx, gt_rows[jdx]] >= 0:
                    # Already claimed by a higher-scoring prediction.
                    continue
                # Assign as true positive after the filters above.
                tp[tidx, idx] = 1
                lock_gt[tidx, gt_rows[jdx]] = idx
                break

            # All sufficiently overlapping instances were already taken.
            if fp[tidx, idx] == 0 and tp[tidx, idx] == 0:
                fp[tidx, idx] = 1

    tp_cumsum = np.cumsum(tp, axis=1).astype(np.float32)
    fp_cumsum = np.cumsum(fp, axis=1).astype(np.float32)
    recall_cumsum = tp_cumsum / npos

    # Each prediction is either a tp or an fp, so the denominator is >= 1.
    precision_cumsum = tp_cumsum / (tp_cumsum + fp_cumsum)

    for tidx in range(len(tiou_thresholds)):
        ap[tidx] = interpolated_prec_rec(precision_cumsum[tidx, :], recall_cumsum[tidx, :])

    return ap
def segment_iou(target_segment, candidate_segments):
    """Compute the temporal intersection over union between a
    target segment and all the test segments.

    Parameters
    ----------
    target_segment : 1d array
        Temporal target segment containing [starting, ending] times.
    candidate_segments : 2d array
        Temporal candidate segments containing N x [starting, ending] times.

    Outputs
    -------
    tiou : 1d array
        Temporal intersection over union score of the N's candidate segments.
        Pairs whose union is not positive (e.g. two zero-length segments)
        get a score of 0 instead of NaN.
    """
    # Coerce to float so object-dtype input (e.g. values taken from a
    # mixed-type DataFrame row) is handled as well.
    target = np.asarray(target_segment, dtype=float)
    candidates = np.asarray(candidate_segments, dtype=float)

    tt1 = np.maximum(target[0], candidates[:, 0])
    tt2 = np.minimum(target[1], candidates[:, 1])
    # Intersection including Non-negative overlap score.
    segments_intersection = (tt2 - tt1).clip(0)
    # Segment union.
    segments_union = (candidates[:, 1] - candidates[:, 0]) \
        + (target[1] - target[0]) - segments_intersection
    # Compute overlap as the ratio of the intersection over union of two
    # segments. A 0/0 division would yield NaN, which is never "< threshold"
    # and would therefore be treated as a match by the caller; leave those
    # entries at 0.
    tIoU = np.zeros_like(segments_union)
    np.divide(segments_intersection, segments_union, out=tIoU,
              where=segments_union > 0)
    return tIoU
def interpolated_prec_rec(prec, rec):
    """Interpolated average precision, as in the VOC 2011 devkit.

    The precision curve is padded with 0 at both ends and the recall curve
    with 0 and 1, precision is made non-increasing from right to left, and
    the area is summed over the points where recall changes.
    """
    padded_prec = np.hstack(([0], prec, [0]))
    padded_rec = np.hstack(([0], rec, [1]))

    # Sweep right-to-left so each entry becomes the best precision reachable
    # at that recall or any higher one.
    for pos in reversed(range(len(padded_prec) - 1)):
        if padded_prec[pos + 1] > padded_prec[pos]:
            padded_prec[pos] = padded_prec[pos + 1]

    # Positions (in the padded arrays) where the recall value steps.
    change_points = np.flatnonzero(padded_rec[1:] != padded_rec[:-1]) + 1
    recall_steps = padded_rec[change_points] - padded_rec[change_points - 1]
    return np.sum(recall_steps * padded_prec[change_points])