import json
import torch
import tqdm
from typing import List, Dict, Tuple, Callable, Union
from nuscenes import NuScenes
from pyquaternion import Quaternion
import numpy as np
from .metric_utils import min_ade, min_fde, miss_rate
from nuscenes.utils.splits import create_splits_scenes
from nuscenes.eval.detection.utils import category_to_detection_name
from nuscenes.prediction import PredictHelper, convert_local_coords_to_global
from nuscenes.eval.common.data_classes import EvalBox, EvalBoxes
from nuscenes.eval.detection.data_classes import (
    DetectionBox,
    DetectionConfig,
    DetectionMetricData,
    DetectionMetricDataList,
    DetectionMetrics,
)
# TrackingBox is referenced in load_gt below.
from nuscenes.eval.tracking.data_classes import TrackingBox
from nuscenes.eval.common.utils import (
center_distance,
scale_iou,
yaw_diff,
velocity_l2,
attr_acc,
cummean,
)
def category_to_motion_name(category_name: str):
    """
    Label mapping from generic nuScenes categories to the coarse motion classes
    (car, pedestrian, barrier) used by this evaluator. All evaluated vehicle
    categories are collapsed onto "car". Note that pedestrian does not include
    personal_mobility, stroller and wheelchair.
    :param category_name: Generic nuScenes category name.
    :return: Motion class name, or None if the category is not evaluated.
    """
detection_mapping = {
"movable_object.barrier": "barrier",
"vehicle.bicycle": "car",
"vehicle.bus.bendy": "car",
"vehicle.bus.rigid": "car",
"vehicle.car": "car",
"vehicle.construction": "car",
"vehicle.motorcycle": "car",
"human.pedestrian.adult": "pedestrian",
"human.pedestrian.child": "pedestrian",
"human.pedestrian.construction_worker": "pedestrian",
"human.pedestrian.police_officer": "pedestrian",
"movable_object.trafficcone": "barrier",
"vehicle.trailer": "car",
"vehicle.truck": "car",
}
if category_name in detection_mapping:
return detection_mapping[category_name]
else:
return None
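# Mapping behavior sketch (illustrative nuScenes category names): evaluated
# vehicle categories collapse onto "car", cones onto "barrier", and anything
# outside the mapping yields None so callers can skip it.
#   category_to_motion_name("vehicle.truck")  # -> "car"
#   category_to_motion_name("animal")         # -> None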
def detection_prediction_category_to_motion_name(category_name: str):
    """
    Label mapping from nuScenes detection classes to the coarse motion classes
    (car, pedestrian, barrier) used by this evaluator.
    :param category_name: nuScenes detection class name.
    :return: Motion class name, or None if the class is not evaluated.
    """
detection_mapping = {
"car": "car",
"truck": "car",
"construction_vehicle": "car",
"bus": "car",
"trailer": "car",
"motorcycle": "car",
"bicycle": "car",
"pedestrian": "pedestrian",
"traffic_cone": "barrier",
"barrier": "barrier",
}
if category_name in detection_mapping:
return detection_mapping[category_name]
else:
return None
class DetectionMotionMetrics(DetectionMetrics):
""" Stores average precision and true positive metric results. Provides properties to summarize. """
@classmethod
def deserialize(cls, content: dict):
""" Initialize from serialized dictionary. """
cfg = DetectionConfig.deserialize(content["cfg"])
metrics = cls(cfg=cfg)
metrics.add_runtime(content["eval_time"])
for detection_name, label_aps in content["label_aps"].items():
for dist_th, ap in label_aps.items():
metrics.add_label_ap(
detection_name=detection_name, dist_th=float(dist_th), ap=float(ap)
)
for detection_name, label_tps in content["label_tp_errors"].items():
for metric_name, tp in label_tps.items():
metrics.add_label_tp(
detection_name=detection_name, metric_name=metric_name, tp=float(tp)
)
return metrics
class DetectionMotionMetricDataList(DetectionMetricDataList):
""" This stores a set of MetricData in a dict indexed by (name, match-distance). """
@classmethod
def deserialize(cls, content: dict):
mdl = cls()
for key, md in content.items():
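            # Keys are serialized as "<class_name>:<dist_th>", e.g. "car:2.0".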
name, distance = key.split(":")
mdl.set(name, float(distance), DetectionMotionMetricData.deserialize(md))
return mdl
class DetectionMotionMetricData(DetectionMetricData):
""" This class holds accumulated and interpolated data required to calculate the detection metrics. """
nelem = 101
def __init__(
self,
        recall: np.ndarray,
        precision: np.ndarray,
        confidence: np.ndarray,
        trans_err: np.ndarray,
        vel_err: np.ndarray,
        scale_err: np.ndarray,
        orient_err: np.ndarray,
        attr_err: np.ndarray,
        min_ade_err: np.ndarray,
        min_fde_err: np.ndarray,
        miss_rate_err: np.ndarray,
):
# Assert lengths.
assert len(recall) == self.nelem
assert len(precision) == self.nelem
assert len(confidence) == self.nelem
assert len(trans_err) == self.nelem
assert len(vel_err) == self.nelem
assert len(scale_err) == self.nelem
assert len(orient_err) == self.nelem
assert len(attr_err) == self.nelem
assert len(min_ade_err) == self.nelem
assert len(min_fde_err) == self.nelem
assert len(miss_rate_err) == self.nelem
# Assert ordering.
assert all(
confidence == sorted(confidence, reverse=True)
) # Confidences should be descending.
assert all(recall == sorted(recall)) # Recalls should be ascending.
# Set attributes explicitly to help IDEs figure out what is going on.
self.recall = recall
self.precision = precision
self.confidence = confidence
self.trans_err = trans_err
self.vel_err = vel_err
self.scale_err = scale_err
self.orient_err = orient_err
self.attr_err = attr_err
self.min_ade_err = min_ade_err
self.min_fde_err = min_fde_err
self.miss_rate_err = miss_rate_err
def __eq__(self, other):
eq = True
for key in self.serialize().keys():
eq = eq and np.array_equal(getattr(self, key), getattr(other, key))
return eq
@property
def max_recall_ind(self):
""" Returns index of max recall achieved. """
# Last instance of confidence > 0 is index of max achieved recall.
non_zero = np.nonzero(self.confidence)[0]
if (
len(non_zero) == 0
): # If there are no matches, all the confidence values will be zero.
max_recall_ind = 0
else:
max_recall_ind = non_zero[-1]
return max_recall_ind
@property
def max_recall(self):
""" Returns max recall achieved. """
return self.recall[self.max_recall_ind]
def serialize(self):
""" Serialize instance into json-friendly format. """
return {
"recall": self.recall.tolist(),
"precision": self.precision.tolist(),
"confidence": self.confidence.tolist(),
"trans_err": self.trans_err.tolist(),
"vel_err": self.vel_err.tolist(),
"scale_err": self.scale_err.tolist(),
"orient_err": self.orient_err.tolist(),
"attr_err": self.attr_err.tolist(),
"min_ade_err": self.min_ade_err.tolist(),
"min_fde_err": self.min_fde_err.tolist(),
"miss_rate_err": self.miss_rate_err.tolist(),
}
@classmethod
def deserialize(cls, content: dict):
""" Initialize from serialized content. """
return cls(
recall=np.array(content["recall"]),
precision=np.array(content["precision"]),
confidence=np.array(content["confidence"]),
trans_err=np.array(content["trans_err"]),
vel_err=np.array(content["vel_err"]),
scale_err=np.array(content["scale_err"]),
orient_err=np.array(content["orient_err"]),
attr_err=np.array(content["attr_err"]),
min_ade_err=np.array(content["min_ade_err"]),
min_fde_err=np.array(content["min_fde_err"]),
miss_rate_err=np.array(content["miss_rate_err"]),
)
@classmethod
def no_predictions(cls):
""" Returns a md instance corresponding to having no predictions. """
return cls(
recall=np.linspace(0, 1, cls.nelem),
precision=np.zeros(cls.nelem),
confidence=np.zeros(cls.nelem),
trans_err=np.ones(cls.nelem),
vel_err=np.ones(cls.nelem),
scale_err=np.ones(cls.nelem),
orient_err=np.ones(cls.nelem),
attr_err=np.ones(cls.nelem),
min_ade_err=np.ones(cls.nelem),
min_fde_err=np.ones(cls.nelem),
miss_rate_err=np.ones(cls.nelem),
)
@classmethod
def random_md(cls):
""" Returns an md instance corresponding to a random results. """
return cls(
recall=np.linspace(0, 1, cls.nelem),
precision=np.random.random(cls.nelem),
confidence=np.linspace(0, 1, cls.nelem)[::-1],
trans_err=np.random.random(cls.nelem),
vel_err=np.random.random(cls.nelem),
scale_err=np.random.random(cls.nelem),
orient_err=np.random.random(cls.nelem),
attr_err=np.random.random(cls.nelem),
min_ade_err=np.random.random(cls.nelem),
min_fde_err=np.random.random(cls.nelem),
miss_rate_err=np.random.random(cls.nelem),
)
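# Round-trip sketch: serialize() emits plain lists so metric data can be stored
# as JSON, and deserialize() rebuilds an equal instance (__eq__ compares all
# serialized fields element-wise).
#   md = DetectionMotionMetricData.random_md()
#   assert DetectionMotionMetricData.deserialize(md.serialize()) == md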
class DetectionMotionBox(DetectionBox):
def __init__(
self,
sample_token: str = "",
translation: Tuple[float, float, float] = (0, 0, 0),
size: Tuple[float, float, float] = (0, 0, 0),
rotation: Tuple[float, float, float, float] = (0, 0, 0, 0),
velocity: Tuple[float, float] = (0, 0),
        ego_translation: Tuple[float, float, float] = (
            0,
            0,
            0,
        ),  # Translation to ego vehicle in meters.
        num_pts: int = -1,  # Number of LIDAR or RADAR points inside the box. Only for GT boxes.
detection_name: str = "car", # The class name used in the detection challenge.
detection_score: float = -1.0, # GT samples do not have a score.
        attribute_name: str = "",  # Box attribute. Each box can have at most 1 attribute.
        traj=None,
        traj_scores=None,
    ):
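        # Note: this deliberately skips DetectionBox.__init__ (and its strict
        # class-name asserts) and initializes the underlying EvalBox directly.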
super(DetectionBox, self).__init__(
sample_token,
translation,
size,
rotation,
velocity,
ego_translation,
num_pts,
)
assert detection_name is not None, "Error: detection_name cannot be empty!"
# assert detection_name in DETECTION_NAMES, 'Error: Unknown detection_name %s' % detection_name
# assert attribute_name in ATTRIBUTE_NAMES or attribute_name == '', \
# 'Error: Unknown attribute_name %s' % attribute_name
        assert isinstance(detection_score, float), "Error: detection_score must be a float!"
assert not np.any(
np.isnan(detection_score)
), "Error: detection_score may not be NaN!"
# Assign.
self.detection_name = detection_name
self.attribute_name = attribute_name
self.detection_score = detection_score
self.traj = traj
self.traj_scores = traj_scores
self.traj_index = None
def __eq__(self, other):
return (
self.sample_token == other.sample_token
and self.translation == other.translation
and self.size == other.size
and self.rotation == other.rotation
and self.velocity == other.velocity
and self.ego_translation == other.ego_translation
and self.num_pts == other.num_pts
and self.detection_name == other.detection_name
and self.detection_score == other.detection_score
and self.attribute_name == other.attribute_name
and np.all(self.traj == other.traj)
and np.all(self.traj_scores == other.traj_scores)
)
def serialize(self) -> dict:
""" Serialize instance into json-friendly format. """
return {
"sample_token": self.sample_token,
"translation": self.translation,
"size": self.size,
"rotation": self.rotation,
"velocity": self.velocity,
"ego_translation": self.ego_translation,
"num_pts": self.num_pts,
"detection_name": self.detection_name,
"detection_score": self.detection_score,
"attribute_name": self.attribute_name,
"traj": self.traj,
"traj_scores": self.traj_scores,
}
@classmethod
def deserialize(cls, content: dict):
""" Initialize from serialized content. """
return cls(
sample_token=content["sample_token"],
translation=tuple(content["translation"]),
size=tuple(content["size"]),
rotation=tuple(content["rotation"]),
velocity=tuple(content["velocity"]),
ego_translation=(0.0, 0.0, 0.0)
if "ego_translation" not in content
else tuple(content["ego_translation"]),
num_pts=-1 if "num_pts" not in content else int(content["num_pts"]),
detection_name=content["detection_name"],
detection_score=-1.0
if "detection_score" not in content
else float(content["detection_score"]),
attribute_name=content["attribute_name"],
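            # The two keys below follow the prediction result-file format
            # ("predict_traj"/"predict_traj_score"); serialize() above writes
            # them back out as "traj"/"traj_scores".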
traj=content["predict_traj"],
traj_scores=content["predict_traj_score"],
)
class DetectionMotionBox_modified(DetectionMotionBox):
def __init__(self, *args, token=None, visibility=None, index=None, **kwargs):
"""
add annotation token
"""
super().__init__(*args, **kwargs)
self.token = token
self.visibility = visibility
self.index = index
def serialize(self) -> dict:
""" Serialize instance into json-friendly format. """
return {
"token": self.token,
"sample_token": self.sample_token,
"translation": self.translation,
"size": self.size,
"rotation": self.rotation,
"velocity": self.velocity,
"ego_translation": self.ego_translation,
"num_pts": self.num_pts,
"detection_name": self.detection_name,
"detection_score": self.detection_score,
"attribute_name": self.attribute_name,
"visibility": self.visibility,
"index": self.index,
"traj": self.traj,
"traj_scores": self.traj_scores,
}
@classmethod
def deserialize(cls, content: dict):
""" Initialize from serialized content. """
return cls(
token=content["token"],
sample_token=content["sample_token"],
translation=tuple(content["translation"]),
size=tuple(content["size"]),
rotation=tuple(content["rotation"]),
velocity=tuple(content["velocity"]),
ego_translation=(0.0, 0.0, 0.0)
if "ego_translation" not in content
else tuple(content["ego_translation"]),
num_pts=-1 if "num_pts" not in content else int(content["num_pts"]),
detection_name=content["detection_name"],
detection_score=-1.0
if "detection_score" not in content
else float(content["detection_score"]),
attribute_name=content["attribute_name"],
visibility=content["visibility"],
index=content["index"],
traj=content["traj"],
)
def load_prediction(
result_path: str,
max_boxes_per_sample: int,
box_cls,
verbose: bool = False,
category_convert_type="detection_category",
) -> Tuple[EvalBoxes, Dict]:
"""
Loads object predictions from file.
:param result_path: Path to the .json result file provided by the user.
    :param max_boxes_per_sample: Maximum number of boxes allowed per sample.
:param box_cls: Type of box to load, e.g. DetectionBox, DetectionMotionBox or TrackingBox.
:param verbose: Whether to print messages to stdout.
:return: The deserialized results and meta data.
"""
# Load from file and check that the format is correct.
with open(result_path) as f:
data = json.load(f)
assert "results" in data, (
"Error: No field `results` in result file. Please note that the result format changed."
"See https://www.nuscenes.org/object-detection for more information."
)
if category_convert_type == "motion_category":
for key in data["results"].keys():
for i in range(len(data["results"][key])):
data["results"][key][i][
"detection_name"
] = detection_prediction_category_to_motion_name(
data["results"][key][i]["detection_name"]
)
# Deserialize results and get meta data.
all_results = EvalBoxes.deserialize(data["results"], box_cls)
meta = data["meta"]
if verbose:
print(
"Loaded results from {}. Found detections for {} samples.".format(
result_path, len(all_results.sample_tokens)
)
)
# Check that each sample has no more than x predicted boxes.
for sample_token in all_results.sample_tokens:
assert len(all_results.boxes[sample_token]) <= max_boxes_per_sample, (
"Error: Only <= %d boxes per sample allowed!" % max_boxes_per_sample
)
return all_results, meta
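# Typical invocation (file name is illustrative; 500 is the standard nuScenes
# per-sample box cap):
#   pred_boxes, meta = load_prediction(
#       "results_nusc.json", 500, DetectionMotionBox,
#       verbose=True, category_convert_type="motion_category")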
def load_gt(
nusc: NuScenes,
eval_split: str,
box_cls,
verbose: bool = False,
category_convert_type="detection_category",
) -> EvalBoxes:
"""
Loads ground truth boxes from DB.
:param nusc: A NuScenes instance.
:param eval_split: The evaluation split for which we load GT boxes.
:param box_cls: Type of box to load, e.g. DetectionBox or TrackingBox.
:param verbose: Whether to print messages to stdout.
:return: The GT boxes.
"""
predict_helper = PredictHelper(nusc)
# Init.
if box_cls == DetectionMotionBox_modified:
attribute_map = {a["token"]: a["name"] for a in nusc.attribute}
if verbose:
print(
"Loading annotations for {} split from nuScenes version: {}".format(
eval_split, nusc.version
)
)
# Read out all sample_tokens in DB.
sample_tokens_all = [s["token"] for s in nusc.sample]
assert len(sample_tokens_all) > 0, "Error: Database has no samples!"
# Only keep samples from this split.
splits = create_splits_scenes()
# Check compatibility of split with nusc_version.
version = nusc.version
if eval_split in {"train", "val", "train_detect", "train_track"}:
assert version.endswith(
"trainval"
), "Error: Requested split {} which is not compatible with NuScenes version {}".format(
eval_split, version
)
elif eval_split in {"mini_train", "mini_val"}:
assert version.endswith(
"mini"
), "Error: Requested split {} which is not compatible with NuScenes version {}".format(
eval_split, version
)
elif eval_split == "test":
assert version.endswith(
"test"
), "Error: Requested split {} which is not compatible with NuScenes version {}".format(
eval_split, version
)
else:
raise ValueError(
"Error: Requested split {} which this function cannot map to the correct NuScenes version.".format(
eval_split
)
)
if eval_split == "test":
# Check that you aren't trying to cheat :).
assert (
len(nusc.sample_annotation) > 0
), "Error: You are trying to evaluate on the test set but you do not have the annotations!"
index_map = {}
for scene in nusc.scene:
first_sample_token = scene["first_sample_token"]
sample = nusc.get("sample", first_sample_token)
index_map[first_sample_token] = 1
index = 2
while sample["next"] != "":
sample = nusc.get("sample", sample["next"])
index_map[sample["token"]] = index
index += 1
sample_tokens = []
for sample_token in sample_tokens_all:
scene_token = nusc.get("sample", sample_token)["scene_token"]
scene_record = nusc.get("scene", scene_token)
if scene_record["name"] in splits[eval_split]:
sample_tokens.append(sample_token)
all_annotations = EvalBoxes()
# Load annotations and filter predictions and annotations.
tracking_id_set = set()
for sample_token in tqdm.tqdm(sample_tokens, leave=verbose):
sample = nusc.get("sample", sample_token)
sample_annotation_tokens = sample["anns"]
sample_boxes = []
for sample_annotation_token in sample_annotation_tokens:
sample_annotation = nusc.get("sample_annotation", sample_annotation_token)
if box_cls == DetectionMotionBox_modified:
# Get label name in detection task and filter unused labels.
if category_convert_type == "detection_category":
detection_name = category_to_detection_name(
sample_annotation["category_name"]
)
elif category_convert_type == "motion_category":
detection_name = category_to_motion_name(
sample_annotation["category_name"]
)
else:
raise NotImplementedError
if detection_name is None:
continue
# Get attribute_name.
attr_tokens = sample_annotation["attribute_tokens"]
attr_count = len(attr_tokens)
if attr_count == 0:
attribute_name = ""
elif attr_count == 1:
attribute_name = attribute_map[attr_tokens[0]]
else:
raise Exception(
"Error: GT annotations must not have more than one attribute!"
)
                instance_token = sample_annotation["instance_token"]
fut_traj_local = predict_helper.get_future_for_agent(
instance_token, sample_token, seconds=6, in_agent_frame=True
)
                fut_traj_scene_centric = np.zeros((0,))
if fut_traj_local.shape[0] > 0:
_, boxes, _ = nusc.get_sample_data(
sample["data"]["LIDAR_TOP"],
selected_anntokens=[sample_annotation["token"]],
)
box = boxes[0]
trans = box.center
rot = Quaternion(matrix=box.rotation_matrix)
                    fut_traj_scene_centric = convert_local_coords_to_global(
                        fut_traj_local, trans, rot
                    )
sample_boxes.append(
box_cls(
token=sample_annotation_token,
sample_token=sample_token,
translation=sample_annotation["translation"],
size=sample_annotation["size"],
rotation=sample_annotation["rotation"],
velocity=nusc.box_velocity(sample_annotation["token"])[:2],
num_pts=sample_annotation["num_lidar_pts"]
+ sample_annotation["num_radar_pts"],
detection_name=detection_name,
detection_score=-1.0, # GT samples do not have a score.
attribute_name=attribute_name,
visibility=sample_annotation["visibility_token"],
index=index_map[sample_token],
                        traj=fut_traj_scene_centric,
)
)
            elif box_cls == TrackingBox:
                assert False, "Error: TrackingBox GT loading is not supported by this evaluator."
else:
raise NotImplementedError("Error: Invalid box_cls %s!" % box_cls)
all_annotations.add_boxes(sample_token, sample_boxes)
if verbose:
print(
"Loaded ground truth annotations for {} samples.".format(
len(all_annotations.sample_tokens)
)
)
return all_annotations
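# Typical invocation (version, dataroot and split are illustrative):
#   nusc = NuScenes(version="v1.0-trainval", dataroot="data/nuscenes")
#   gt_boxes = load_gt(nusc, "val", DetectionMotionBox_modified, verbose=True,
#                      category_convert_type="motion_category")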
def prediction_metrics(gt_box_match, pred_box):
pred_traj = np.array(pred_box.traj)
gt_traj_steps = gt_box_match.traj.reshape((-1, 2))
valid_steps = gt_traj_steps.shape[0]
if valid_steps <= 0:
return np.array([0]), np.array([0]), 0
nmodes = pred_traj.shape[0]
pred_steps = pred_traj.shape[1]
valid_mask = np.zeros((pred_steps,))
gt_traj = np.zeros((pred_steps, 2))
gt_traj[:valid_steps, :] = gt_traj_steps
valid_mask[:valid_steps] = 1
pred_traj = torch.tensor(pred_traj[None])
gt_traj = torch.tensor(gt_traj[None])
valid_mask = torch.tensor(valid_mask[None])
ade_err, inds = min_ade(pred_traj, gt_traj, 1 - valid_mask)
fde_err, inds = min_fde(pred_traj, gt_traj, 1 - valid_mask)
mr_err = miss_rate(pred_traj, gt_traj, 1 - valid_mask, dist_thresh=2)
return ade_err.numpy(), fde_err.numpy(), mr_err.numpy()
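# Shape conventions assumed above: pred_box.traj is (n_modes, n_steps, 2) in
# scene coordinates, gt_box_match.traj is a flat (n_valid_steps * 2,) array,
# and min_ade / min_fde / miss_rate take batched tensors, hence the [None]
# unsqueeze to a batch of one before the calls.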
def accumulate(
gt_boxes: EvalBoxes,
pred_boxes: EvalBoxes,
class_name: str,
dist_fcn: Callable,
dist_th: float,
verbose: bool = False,
) -> Tuple[DetectionMotionMetricData, int, int, int]:
"""
    Computes the precision/recall curve over predefined recall thresholds for a single distance threshold.
    The recall/conf thresholds and other raw metrics are used to compute secondary metrics.
:param gt_boxes: Maps every sample_token to a list of its sample_annotations.
:param pred_boxes: Maps every sample_token to a list of its sample_results.
:param class_name: Class to compute AP on.
:param dist_fcn: Distance function used to match detections and ground truths.
:param dist_th: Distance threshold for a match.
:param verbose: If true, print debug messages.
    :return: (metric_data, N_tp, N_fp, npos): raw per-recall metric data plus the true-positive,
        false-positive and ground-truth counts.
"""
# ---------------------------------------------
# Organize input and initialize accumulators.
# ---------------------------------------------
# Count the positives.
npos = len([1 for gt_box in gt_boxes.all if gt_box.detection_name == class_name])
if verbose:
print(
"Found {} GT of class {} out of {} total across {} samples.".format(
npos, class_name, len(gt_boxes.all), len(gt_boxes.sample_tokens)
)
)
# For missing classes in the GT, return a data structure corresponding to no predictions.
if npos == 0:
return DetectionMotionMetricData.no_predictions(), 0, 0, 0
# Organize the predictions in a single list.
pred_boxes_list = [
box for box in pred_boxes.all if box.detection_name == class_name
]
pred_confs = [box.detection_score for box in pred_boxes_list]
if verbose:
print(
"Found {} PRED of class {} out of {} total across {} samples.".format(
len(pred_confs),
class_name,
len(pred_boxes.all),
len(pred_boxes.sample_tokens),
)
)
# Sort by confidence.
sortind = [i for (v, i) in sorted((v, i) for (i, v) in enumerate(pred_confs))][::-1]
# Do the actual matching.
tp = [] # Accumulator of true positives.
fp = [] # Accumulator of false positives.
conf = [] # Accumulator of confidences.
# match_data holds the extra metrics we calculate for each match.
match_data = {
"trans_err": [],
"vel_err": [],
"scale_err": [],
"orient_err": [],
"attr_err": [],
"conf": [],
"min_ade_err": [],
"min_fde_err": [],
"miss_rate_err": [],
}
# ---------------------------------------------
# Match and accumulate match data.
# ---------------------------------------------
taken = set() # Initially no gt bounding box is matched.
for ind in sortind:
pred_box = pred_boxes_list[ind]
min_dist = np.inf
match_gt_idx = None
for gt_idx, gt_box in enumerate(gt_boxes[pred_box.sample_token]):
# Find closest match among ground truth boxes
if (
gt_box.detection_name == class_name
                and (pred_box.sample_token, gt_idx) not in taken
):
this_distance = dist_fcn(gt_box, pred_box)
if this_distance < min_dist:
min_dist = this_distance
match_gt_idx = gt_idx
# If the closest match is close enough according to threshold we have a match!
is_match = min_dist < dist_th
if is_match:
taken.add((pred_box.sample_token, match_gt_idx))
# Update tp, fp and confs.
tp.append(1)
fp.append(0)
conf.append(pred_box.detection_score)
# Since it is a match, update match data also.
gt_box_match = gt_boxes[pred_box.sample_token][match_gt_idx]
match_data["trans_err"].append(center_distance(gt_box_match, pred_box))
match_data["vel_err"].append(velocity_l2(gt_box_match, pred_box))
match_data["scale_err"].append(1 - scale_iou(gt_box_match, pred_box))
            # Barrier orientation is only determined up to 180 degrees. (For cones, orientation is discarded later.)
period = np.pi if class_name == "barrier" else 2 * np.pi
match_data["orient_err"].append(
yaw_diff(gt_box_match, pred_box, period=period)
)
match_data["attr_err"].append(1 - attr_acc(gt_box_match, pred_box))
minade, minfde, m_r = prediction_metrics(gt_box_match, pred_box)
match_data["min_ade_err"].append(minade)
match_data["min_fde_err"].append(minfde)
match_data["miss_rate_err"].append(m_r)
match_data["conf"].append(pred_box.detection_score)
else:
# No match. Mark this as a false positive.
tp.append(0)
fp.append(1)
conf.append(pred_box.detection_score)
# Check if we have any matches. If not, just return a "no predictions" array.
if len(match_data["trans_err"]) == 0:
return DetectionMotionMetricData.no_predictions(), 0, 0, 0
# ---------------------------------------------
# Calculate and interpolate precision and recall
# ---------------------------------------------
# Accumulate.
N_tp = np.sum(tp)
N_fp = np.sum(fp)
tp = np.cumsum(tp).astype(float)
fp = np.cumsum(fp).astype(float)
conf = np.array(conf)
# Calculate precision and recall.
prec = tp / (fp + tp)
rec = tp / float(npos)
rec_interp = np.linspace(
0, 1, DetectionMotionMetricData.nelem
) # 101 steps, from 0% to 100% recall.
prec = np.interp(rec_interp, rec, prec, right=0)
conf = np.interp(rec_interp, rec, conf, right=0)
rec = rec_interp
# ---------------------------------------------
# Re-sample the match-data to match, prec, recall and conf.
# ---------------------------------------------
for key in match_data.keys():
if key == "conf":
continue # Confidence is used as reference to align with fp and tp. So skip in this step.
else:
# For each match_data, we first calculate the accumulated mean.
tmp = cummean(np.array(match_data[key]))
# Then interpolate based on the confidences. (Note reversing since np.interp needs increasing arrays)
match_data[key] = np.interp(
conf[::-1], match_data["conf"][::-1], tmp[::-1]
)[::-1]
# ---------------------------------------------
# Done. Instantiate MetricData and return
# ---------------------------------------------
return (
DetectionMotionMetricData(
recall=rec,
precision=prec,
confidence=conf,
trans_err=match_data["trans_err"],
vel_err=match_data["vel_err"],
scale_err=match_data["scale_err"],
orient_err=match_data["orient_err"],
attr_err=match_data["attr_err"],
min_ade_err=match_data["min_ade_err"],
min_fde_err=match_data["min_fde_err"],
miss_rate_err=match_data["miss_rate_err"],
),
N_tp,
N_fp,
npos,
)
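# Evaluation-loop sketch (2.0 m is a common nuScenes matching threshold,
# assumed here): one PR curve is accumulated per (class, distance threshold).
#   md, n_tp, n_fp, npos = accumulate(
#       gt_boxes, pred_boxes, "car", center_distance, dist_th=2.0)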
def accumulate_motion(
gt_boxes: EvalBoxes,
pred_boxes: EvalBoxes,
class_name: str,
dist_fcn: Callable,
traj_fcn: Callable,
dist_th: float,
traj_dist_th: float,
verbose: bool = False,
final_step: float = 12,
) -> Tuple[DetectionMotionMetricData, int, int, int]:
"""
    Computes the precision/recall curve over predefined recall thresholds, where a prediction only
    matches when both the detection distance and the trajectory endpoint distance pass their thresholds.
    The recall/conf thresholds and other raw metrics are used to compute secondary metrics.
    :param gt_boxes: Maps every sample_token to a list of its sample_annotations.
    :param pred_boxes: Maps every sample_token to a list of its sample_results.
    :param class_name: Class to compute AP on.
    :param dist_fcn: Distance function used to match detections and ground truths.
    :param traj_fcn: Trajectory distance function used to gate matches.
    :param dist_th: Distance threshold for a match.
    :param traj_dist_th: Trajectory distance threshold for a match.
    :param verbose: If true, print debug messages.
    :param final_step: Trajectory step at which traj_fcn is evaluated.
    :return: (metric_data, N_tp, N_fp, npos): raw per-recall metric data plus the true-positive,
        false-positive and ground-truth counts.
"""
# ---------------------------------------------
# Organize input and initialize accumulators.
# ---------------------------------------------
# Count the positives.
npos = len([1 for gt_box in gt_boxes.all if gt_box.detection_name == class_name])
if verbose:
print(
"Found {} GT of class {} out of {} total across {} samples.".format(
npos, class_name, len(gt_boxes.all), len(gt_boxes.sample_tokens)
)
)
# For missing classes in the GT, return a data structure corresponding to no predictions.
if npos == 0:
return DetectionMotionMetricData.no_predictions(), 0, 0, 0
    # Organize the predictions in a single list.
    pred_boxes_list = [
        box for box in pred_boxes.all if box.detection_name == class_name
    ]
    pred_confs = [box.detection_score for box in pred_boxes_list]
if verbose:
print(
"Found {} PRED of class {} out of {} total across {} samples.".format(
len(pred_confs),
class_name,
len(pred_boxes.all),
len(pred_boxes.sample_tokens),
)
)
# Sort by confidence.
sortind = [i for (v, i) in sorted((v, i) for (i, v) in enumerate(pred_confs))][::-1]
# Do the actual matching.
tp = [] # Accumulator of true positives.
fp = [] # Accumulator of false positives.
conf = [] # Accumulator of confidences.
# match_data holds the extra metrics we calculate for each match.
match_data = {
"trans_err": [],
"vel_err": [],
"scale_err": [],
"orient_err": [],
"attr_err": [],
"conf": [],
"min_ade_err": [],
"min_fde_err": [],
"miss_rate_err": [],
}
# ---------------------------------------------
# Match and accumulate match data.
# ---------------------------------------------
taken = set() # Initially no gt bounding box is matched.
for ind in sortind:
pred_box = pred_boxes_list[ind]
min_dist = np.inf
match_gt_idx = None
for gt_idx, gt_box in enumerate(gt_boxes[pred_box.sample_token]):
# Find closest match among ground truth boxes
if (
gt_box.detection_name == class_name
                and (pred_box.sample_token, gt_idx) not in taken
):
this_distance = dist_fcn(gt_box, pred_box)
if this_distance < min_dist:
min_dist = this_distance
match_gt_idx = gt_idx
fde_distance = traj_fcn(gt_box, pred_box, final_step)
# If the closest match is close enough according to threshold we have a match!
is_match = min_dist < dist_th and fde_distance < traj_dist_th
if is_match:
taken.add((pred_box.sample_token, match_gt_idx))
# Update tp, fp and confs.
tp.append(1)
fp.append(0)
conf.append(pred_box.detection_score)
# Since it is a match, update match data also.
gt_box_match = gt_boxes[pred_box.sample_token][match_gt_idx]
match_data["trans_err"].append(center_distance(gt_box_match, pred_box))
match_data["vel_err"].append(velocity_l2(gt_box_match, pred_box))
match_data["scale_err"].append(1 - scale_iou(gt_box_match, pred_box))
            # Barrier orientation is only determined up to 180 degrees. (For cones, orientation is discarded later.)
period = np.pi if class_name == "barrier" else 2 * np.pi
match_data["orient_err"].append(
yaw_diff(gt_box_match, pred_box, period=period)
)
match_data["attr_err"].append(1 - attr_acc(gt_box_match, pred_box))
minade, minfde, m_r = prediction_metrics(gt_box_match, pred_box)
match_data["min_ade_err"].append(minade)
match_data["min_fde_err"].append(minfde)
match_data["miss_rate_err"].append(m_r)
match_data["conf"].append(pred_box.detection_score)
else:
# No match. Mark this as a false positive.
tp.append(0)
fp.append(1)
conf.append(pred_box.detection_score)
# Check if we have any matches. If not, just return a "no predictions" array.
if len(match_data["trans_err"]) == 0:
return DetectionMotionMetricData.no_predictions(), 0, 0, 0
# ---------------------------------------------
# Calculate and interpolate precision and recall
# ---------------------------------------------
# Accumulate.
N_tp = np.sum(tp)
N_fp = np.sum(fp)
tp = np.cumsum(tp).astype(float)
fp = np.cumsum(fp).astype(float)
conf = np.array(conf)
# Calculate precision and recall.
prec = tp / (fp + tp)
rec = tp / float(npos)
rec_interp = np.linspace(
0, 1, DetectionMotionMetricData.nelem
) # 101 steps, from 0% to 100% recall.
prec = np.interp(rec_interp, rec, prec, right=0)
conf = np.interp(rec_interp, rec, conf, right=0)
rec = rec_interp
# ---------------------------------------------
# Re-sample the match-data to match, prec, recall and conf.
# ---------------------------------------------
for key in match_data.keys():
if key == "conf":
continue # Confidence is used as reference to align with fp and tp. So skip in this step.
else:
# For each match_data, we first calculate the accumulated mean.
tmp = cummean(np.array(match_data[key]))
# Then interpolate based on the confidences. (Note reversing since np.interp needs increasing arrays)
match_data[key] = np.interp(
conf[::-1], match_data["conf"][::-1], tmp[::-1]
)[::-1]
# ---------------------------------------------
# Done. Instantiate MetricData and return
# ---------------------------------------------
return (
DetectionMotionMetricData(
recall=rec,
precision=prec,
confidence=conf,
trans_err=match_data["trans_err"],
vel_err=match_data["vel_err"],
scale_err=match_data["scale_err"],
orient_err=match_data["orient_err"],
attr_err=match_data["attr_err"],
min_ade_err=match_data["min_ade_err"],
min_fde_err=match_data["min_fde_err"],
miss_rate_err=match_data["miss_rate_err"],
),
N_tp,
N_fp,
npos,
)
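# Motion-aware variant sketch: identical in structure to accumulate(), except a
# prediction is a true positive only when both the center distance and the
# trajectory endpoint distance pass their thresholds. fde_distance_fcn below is
# a hypothetical callable with signature (gt_box, pred_box, final_step) -> float.
#   md, n_tp, n_fp, npos = accumulate_motion(
#       gt_boxes, pred_boxes, "car", center_distance, fde_distance_fcn,
#       dist_th=2.0, traj_dist_th=2.0, final_step=12)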