import argparse
import copy
import json
import os
import random
import time
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
from matplotlib import pyplot as plt
from nuscenes.eval.common.data_classes import EvalBox, EvalBoxes
from nuscenes.eval.common.render import setup_axis
from nuscenes.eval.detection.algo import accumulate, calc_ap, calc_tp
from nuscenes.eval.detection.constants import (DETECTION_COLORS,
                                               DETECTION_NAMES,
                                               PRETTY_DETECTION_NAMES,
                                               PRETTY_TP_METRICS, TP_METRICS,
                                               TP_METRICS_UNITS)
from nuscenes.eval.detection.data_classes import (DetectionBox,
                                                  DetectionConfig,
                                                  DetectionMetricData,
                                                  DetectionMetricDataList,
                                                  DetectionMetrics)
from nuscenes.eval.detection.evaluate import NuScenesEval
from nuscenes.eval.detection.render import (class_pr_curve, dist_pr_curve,
                                            summary_plot, visualize_sample)
from pyquaternion import Quaternion

Axis = Any


class CustomDetectionBox(DetectionBox):
    """ Data class used during detection evaluation. Can be a prediction or ground truth. """

    def __init__(self,
                 sample_token: str = "",
                 translation: Tuple[float, float, float] = (0, 0, 0),
                 size: Tuple[float, float, float] = (0, 0, 0),
                 rotation: Tuple[float, float, float, float] = (0, 0, 0, 0),
                 velocity: Tuple[float, float] = (0, 0),
                 ego_translation: Tuple[float, float, float] = (0, 0, 0),
                 num_pts: int = -1,
                 detection_name: str = 'car',
                 detection_score: float = -1.0,
                 attribute_name: str = ''):
        # Call EvalBox.__init__ directly, deliberately skipping DetectionBox.__init__,
        # so that detection_name is not restricted to the standard nuScenes DETECTION_NAMES.
        super(DetectionBox, self).__init__(sample_token, translation, size, rotation, velocity,
                                           ego_translation, num_pts)

        assert type(detection_score) == float, 'Error: detection_score must be a float!'
        assert not np.any(np.isnan(detection_score)), 'Error: detection_score may not be NaN!'

        self.detection_name = detection_name
        self.detection_score = detection_score
        self.attribute_name = attribute_name
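

# Illustrative usage (hypothetical token and values), showing that non-standard class
# names such as 'czone_sign' are accepted, which plain DetectionBox would reject via
# its `detection_name in DETECTION_NAMES` assertion:
#
#   box = CustomDetectionBox(sample_token='sample_0',
#                            translation=(10.0, 5.0, 1.0),
#                            size=(1.9, 4.5, 1.6),
#                            rotation=(1.0, 0.0, 0.0, 0.0),
#                            detection_name='czone_sign',
#                            detection_score=0.9)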


class CustomDetectionConfig(DetectionConfig):
    """ Data class that specifies detection evaluation settings for a custom set of classes. """

    def __init__(self,
                 class_range: Dict[str, int],
                 dist_fcn: str,
                 dist_ths: List[float],
                 dist_th_tp: float,
                 min_recall: float,
                 min_precision: float,
                 max_boxes_per_sample: int,
                 mean_ap_weight: int):

        assert dist_th_tp in dist_ths, "dist_th_tp must be in set of dist_ths."

        self.class_range = class_range
        self.dist_fcn = dist_fcn
        self.dist_ths = dist_ths
        self.dist_th_tp = dist_th_tp
        self.min_recall = min_recall
        self.min_precision = min_precision
        self.max_boxes_per_sample = max_boxes_per_sample
        self.mean_ap_weight = mean_ap_weight

        # Unlike DetectionConfig, the evaluated classes are derived from the keys of
        # class_range instead of being fixed to the standard nuScenes detection classes.
        self.class_names = list(self.class_range.keys())
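

# Illustrative configuration (the thresholds below mirror typical nuScenes settings,
# but are assumptions here, not values shipped with this module):
#
#   cfg = CustomDetectionConfig(class_range={'car': 50, 'pedestrian': 40, 'czone_sign': 30},
#                               dist_fcn='center_distance',
#                               dist_ths=[0.5, 1.0, 2.0, 4.0],
#                               dist_th_tp=2.0,
#                               min_recall=0.1,
#                               min_precision=0.1,
#                               max_boxes_per_sample=500,
#                               mean_ap_weight=5)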


def class_tp_curve(md_list: DetectionMetricDataList,
                   metrics: DetectionMetrics,
                   detection_name: str,
                   min_recall: float,
                   dist_th_tp: float,
                   savepath: Optional[str] = None,
                   ax: Optional[Axis] = None) -> None:
    """
    Plot the true positive curve for the specified class.
    :param md_list: DetectionMetricDataList instance.
    :param metrics: DetectionMetrics instance.
    :param detection_name: The detection class to plot.
    :param min_recall: Minimum recall value.
    :param dist_th_tp: The distance threshold used to determine matches.
    :param savepath: If given, saves the rendering here instead of displaying.
    :param ax: Axes onto which to render.
    """
    # Get the metric data for this class at the TP distance threshold.
    md = md_list[(detection_name, dist_th_tp)]
    min_recall_ind = round(100 * min_recall)
    if min_recall_ind <= md.max_recall_ind:
        # Only consider metrics that are defined for this class when setting the y-limit.
        rel_metrics = [m for m in TP_METRICS if not np.isnan(metrics.get_label_tp(detection_name, m))]
        ylimit = max([max(getattr(md, metric)[min_recall_ind:md.max_recall_ind + 1])
                      for metric in rel_metrics]) * 1.1
    else:
        ylimit = 1.0

    if ax is None:
        ax = setup_axis(title=PRETTY_DETECTION_NAMES[detection_name], xlabel='Recall', ylabel='Error', xlim=1,
                        min_recall=min_recall)
    ax.set_ylim(0, ylimit)

    for metric in TP_METRICS:
        tp = metrics.get_label_tp(detection_name, metric)

        # Plot only if we have valid data.
        if not np.isnan(tp) and min_recall_ind <= md.max_recall_ind:
            recall, error = md.recall[:md.max_recall_ind + 1], getattr(md, metric)[:md.max_recall_ind + 1]
        else:
            recall, error = [], []

        # Choose the legend entry based on the availability of data.
        if np.isnan(tp):
            label = '{}: n/a'.format(PRETTY_TP_METRICS[metric])
        elif min_recall_ind > md.max_recall_ind:
            label = '{}: nan'.format(PRETTY_TP_METRICS[metric])
        else:
            label = '{}: {:.2f} ({})'.format(PRETTY_TP_METRICS[metric], tp, TP_METRICS_UNITS[metric])
        if metric == 'trans_err':
            # Annotate the translation-error entry with the max-recall index and log it.
            label += f' ({md.max_recall_ind})'
            print(f'Recall: {detection_name}: {md.max_recall_ind / 100}')
        ax.plot(recall, error, label=label)
    ax.axvline(x=md.max_recall, linestyle='-.', color=(0, 0, 0, 0.3))
    ax.legend(loc='best')

    if savepath is not None:
        plt.savefig(savepath)
        plt.close()
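

# Illustrative standalone use: after an evaluation (see NuScenesEval_NuPlan.evaluate
# below), a single class curve can be rendered on its own. The class name, thresholds
# and path here are placeholders:
#
#   class_tp_curve(md_list, metrics, 'car', min_recall=0.1, dist_th_tp=2.0,
#                  savepath='car_tp.pdf')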


class NuScenesEval_NuPlan(NuScenesEval):
    """
    Variant of NuScenesEval that evaluates ground-truth and prediction boxes passed in
    directly (e.g. converted from nuPlan), instead of loading them from a result file.
    """

    def __init__(self,
                 gt_boxes: EvalBoxes,
                 result_boxes: EvalBoxes,
                 config: DetectionConfig,
                 output_dir: Optional[str] = None,
                 verbose: bool = True):
        """
        :param gt_boxes: Ground-truth boxes, indexed by sample token.
        :param result_boxes: Predicted boxes, indexed by sample token.
        :param config: Parametrization of the evaluation.
        :param output_dir: Folder to save plots and results to.
        :param verbose: Whether to print to stdout.
        """
        self.output_dir = output_dir
        self.verbose = verbose
        self.cfg = config

        # Make dirs.
        self.plot_dir = os.path.join(self.output_dir, 'plots')
        if not os.path.isdir(self.output_dir):
            os.makedirs(self.output_dir)
        if not os.path.isdir(self.plot_dir):
            os.makedirs(self.plot_dir)

        # Load data.
        if verbose:
            print('Initializing nuScenes detection evaluation')
        self.pred_boxes = result_boxes
        self.gt_boxes = gt_boxes

        assert set(self.pred_boxes.sample_tokens) == set(self.gt_boxes.sample_tokens), \
            "Samples in split don't match samples in predictions."

        self.all_gt = copy.deepcopy(self.gt_boxes)
        self.all_preds = copy.deepcopy(self.pred_boxes)
        self.sample_tokens = self.gt_boxes.sample_tokens

    def evaluate(self) -> Tuple[DetectionMetrics, DetectionMetricDataList]:
        """
        Performs the actual evaluation.
        :return: A tuple of high-level metrics and the raw metric data.
        """
        start_time = time.time()

        # Step 1: Accumulate metric data for all classes and distance thresholds.
        if self.verbose:
            print('Accumulating metric data...')
        metric_data_list = DetectionMetricDataList()
        for class_name in self.cfg.class_names:
            for dist_th in self.cfg.dist_ths:
                md = accumulate(self.gt_boxes, self.pred_boxes, class_name, self.cfg.dist_fcn_callable, dist_th)
                metric_data_list.set(class_name, dist_th, md)

        # Step 2: Calculate metrics from the data.
        if self.verbose:
            print('Calculating metrics...')
        metrics = DetectionMetrics(self.cfg)
        for class_name in self.cfg.class_names:
            # Compute APs.
            for dist_th in self.cfg.dist_ths:
                metric_data = metric_data_list[(class_name, dist_th)]
                ap = calc_ap(metric_data, self.cfg.min_recall, self.cfg.min_precision)
                metrics.add_label_ap(class_name, dist_th, ap)

            # Compute TP metrics. Some metrics are undefined for certain classes:
            # orientation, velocity and attribute errors for traffic cones; velocity
            # and attribute errors for barriers and construction-zone signs.
            for metric_name in TP_METRICS:
                metric_data = metric_data_list[(class_name, self.cfg.dist_th_tp)]
                if class_name in ['traffic_cone'] and metric_name in ['attr_err', 'vel_err', 'orient_err']:
                    tp = np.nan
                elif class_name in ['barrier', 'czone_sign'] and metric_name in ['attr_err', 'vel_err']:
                    tp = np.nan
                else:
                    tp = calc_tp(metric_data, self.cfg.min_recall, metric_name)
                metrics.add_label_tp(class_name, metric_name, tp)

        # Compute evaluation time.
        metrics.add_runtime(time.time() - start_time)

        return metrics, metric_data_list
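
    # The returned objects can be queried directly; e.g. (illustrative, assuming a
    # 'car' class and a 2.0 m threshold in the config):
    #
    #   metrics.get_label_ap('car', 2.0)           # AP of 'car' at the 2.0 m threshold
    #   metrics.get_label_tp('car', 'trans_err')   # mean translation error of 'car'
    #   md_list[('car', 2.0)]                      # raw precision/recall/error arrays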

    def render(self, metrics: DetectionMetrics, md_list: DetectionMetricDataList) -> None:
        """
        Renders various PR and TP curves.
        :param metrics: DetectionMetrics instance.
        :param md_list: DetectionMetricDataList instance.
        """
        if self.verbose:
            print('Rendering PR and TP curves')

        def savepath(name):
            return os.path.join(self.plot_dir, name + '.pdf')

        summary_plot(md_list, metrics, min_precision=self.cfg.min_precision, min_recall=self.cfg.min_recall,
                     dist_th_tp=self.cfg.dist_th_tp, savepath=savepath('summary'))

        for detection_name in self.cfg.class_names:
            class_pr_curve(md_list, metrics, detection_name, self.cfg.min_precision, self.cfg.min_recall,
                           savepath=savepath(detection_name + '_pr'))

            # This calls the local class_tp_curve defined above, not the one from
            # nuscenes.eval.detection.render.
            class_tp_curve(md_list, metrics, detection_name, self.cfg.min_recall, self.cfg.dist_th_tp,
                           savepath=savepath(detection_name + '_tp'))

        for dist_th in self.cfg.dist_ths:
            dist_pr_curve(md_list, metrics, dist_th, self.cfg.min_precision, self.cfg.min_recall,
                          savepath=savepath('dist_pr_' + str(dist_th)))

    def main(self,
             plot_examples: int = 0,
             render_curves: bool = True) -> Dict[str, Any]:
        """
        Main function that runs the evaluation, renders stat plots and prints a summary.
        :param plot_examples: How many example visualizations to select.
        :param render_curves: Whether to render PR and TP curves to disk.
        :return: A dict that stores the high-level metrics and meta data.
        """
        if plot_examples > 0:
            # Select a random but reproducible subset of sample tokens. Note that
            # rendering the examples themselves (cf. visualize_sample) is not
            # implemented in this variant.
            random.seed(42)
            sample_tokens = list(self.sample_tokens)
            random.shuffle(sample_tokens)
            sample_tokens = sample_tokens[:plot_examples]

        # Run the evaluation.
        metrics, metric_data_list = self.evaluate()

        # Render PR and TP curves.
        if render_curves:
            self.render(metrics, metric_data_list)

        # Dump the metric data, meta and metrics to disk.
        if self.verbose:
            print('Saving metrics to: %s' % self.output_dir)
        metrics_summary = metrics.serialize()
        metrics_summary['meta'] = {}
        with open(os.path.join(self.output_dir, 'metrics_summary.json'), 'w') as f:
            json.dump(metrics_summary, f, indent=2)
        with open(os.path.join(self.output_dir, 'metrics_details.json'), 'w') as f:
            json.dump(metric_data_list.serialize(), f, indent=2)

        # Print high-level metrics.
        print('mAP: %.4f' % (metrics_summary['mean_ap']))
        err_name_mapping = {
            'trans_err': 'mATE',
            'scale_err': 'mASE',
            'orient_err': 'mAOE',
            'vel_err': 'mAVE',
            'attr_err': 'mAAE'
        }
        for tp_name, tp_val in metrics_summary['tp_errors'].items():
            print('%s: %.4f' % (err_name_mapping[tp_name], tp_val))
        print('NDS: %.4f' % (metrics_summary['nd_score']))
        print('Eval time: %.1fs' % metrics_summary['eval_time'])

        # Print per-class metrics.
        print()
        print('Per-class results:')
        print('Object Class\tAP\tATE\tASE\tAOE\tAVE\tAAE')
        class_aps = metrics_summary['mean_dist_aps']
        class_tps = metrics_summary['label_tp_errors']
        for class_name in class_aps.keys():
            print('%s\t%.3f\t%.3f\t%.3f\t%.3f\t%.3f\t%.3f'
                  % (class_name, class_aps[class_name],
                     class_tps[class_name]['trans_err'],
                     class_tps[class_name]['scale_err'],
                     class_tps[class_name]['orient_err'],
                     class_tps[class_name]['vel_err'],
                     class_tps[class_name]['attr_err']))

        return metrics_summary
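

if __name__ == '__main__':
    # Minimal end-to-end sketch (illustrative only): evaluate one predicted box against
    # one ground-truth box for a single hypothetical sample. All tokens, geometry,
    # thresholds and the output path are assumptions made for demonstration.
    gt = EvalBoxes()
    preds = EvalBoxes()
    gt.add_boxes('sample_0', [
        CustomDetectionBox(sample_token='sample_0', translation=(10.0, 5.0, 1.0),
                           size=(1.9, 4.5, 1.6), rotation=(1.0, 0.0, 0.0, 0.0),
                           detection_name='car', num_pts=100)])
    preds.add_boxes('sample_0', [
        CustomDetectionBox(sample_token='sample_0', translation=(10.2, 5.1, 1.0),
                           size=(1.9, 4.4, 1.6), rotation=(1.0, 0.0, 0.0, 0.0),
                           detection_name='car', detection_score=0.9)])

    cfg = CustomDetectionConfig(class_range={'car': 50},
                                dist_fcn='center_distance',
                                dist_ths=[0.5, 1.0, 2.0, 4.0],
                                dist_th_tp=2.0,
                                min_recall=0.1,
                                min_precision=0.1,
                                max_boxes_per_sample=500,
                                mean_ap_weight=5)

    evaluator = NuScenesEval_NuPlan(gt_boxes=gt, result_boxes=preds, config=cfg,
                                    output_dir='/tmp/nuplan_eval_example', verbose=True)
    evaluator.main(render_curves=False)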