# Mikel Broström 🔥 Yolo Tracking 🧾 AGPL-3.0 license import numpy as np from boxmot.utils.iou import AssociationFunction def speed_direction_batch(dets, tracks): tracks = tracks[..., np.newaxis] CX1, CY1 = (dets[:, 0] + dets[:, 2]) / 2.0, (dets[:, 1] + dets[:, 3]) / 2.0 CX2, CY2 = (tracks[:, 0] + tracks[:, 2]) / 2.0, (tracks[:, 1] + tracks[:, 3]) / 2.0 dx = CX1 - CX2 dy = CY1 - CY2 norm = np.sqrt(dx**2 + dy**2) + 1e-6 dx = dx / norm dy = dy / norm return dy, dx # size: num_track x num_det def linear_assignment(cost_matrix): try: import lap _, x, y = lap.lapjv(cost_matrix, extend_cost=True) return np.array([[y[i], i] for i in x if i >= 0]) # except ImportError: from scipy.optimize import linear_sum_assignment x, y = linear_sum_assignment(cost_matrix) return np.array([list(zip(x, y))]) def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3): """ Assigns detections to tracked object (both represented as bounding boxes) Returns 3 lists of matches, unmatched_detections and unmatched_trackers """ if len(trackers) == 0: return ( np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int), ) iou_matrix = AssociationFunction.iou_batch(detections, trackers) if min(iou_matrix.shape) > 0: a = (iou_matrix > iou_threshold).astype(np.int32) if a.sum(1).max() == 1 and a.sum(0).max() == 1: matched_indices = np.stack(np.where(a), axis=1) else: matched_indices = linear_assignment(-iou_matrix) else: matched_indices = np.empty(shape=(0, 2)) unmatched_detections = [] for d, det in enumerate(detections): if d not in matched_indices[:, 0]: unmatched_detections.append(d) unmatched_trackers = [] for t, trk in enumerate(trackers): if t not in matched_indices[:, 1]: unmatched_trackers.append(t) # filter out matched with low IOU matches = [] for m in matched_indices: if iou_matrix[m[0], m[1]] < iou_threshold: unmatched_detections.append(m[0]) unmatched_trackers.append(m[1]) else: matches.append(m.reshape(1, 2)) if len(matches) == 0: matches = np.empty((0, 2), dtype=int) else: matches = np.concatenate(matches, axis=0) return matches, np.array(unmatched_detections), np.array(unmatched_trackers) def compute_aw_max_metric(emb_cost, w_association_emb, bottom=0.5): w_emb = np.full_like(emb_cost, w_association_emb) for idx in range(emb_cost.shape[0]): inds = np.argsort(-emb_cost[idx]) # If there's less than two matches, just keep original weight if len(inds) < 2: continue if emb_cost[idx, inds[0]] == 0: row_weight = 0 else: row_weight = 1 - max( (emb_cost[idx, inds[1]] / emb_cost[idx, inds[0]]) - bottom, 0 ) / (1 - bottom) w_emb[idx] *= row_weight for idj in range(emb_cost.shape[1]): inds = np.argsort(-emb_cost[:, idj]) # If there's less than two matches, just keep original weight if len(inds) < 2: continue if emb_cost[inds[0], idj] == 0: col_weight = 0 else: col_weight = 1 - max( (emb_cost[inds[1], idj] / emb_cost[inds[0], idj]) - bottom, 0 ) / (1 - bottom) w_emb[:, idj] *= col_weight return w_emb * emb_cost def associate( detections, trackers, asso_func, iou_threshold, velocities, previous_obs, vdc_weight, w, h, emb_cost=None, w_assoc_emb=None, aw_off=None, aw_param=None, ): if len(trackers) == 0: return ( np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int), ) Y, X = speed_direction_batch(detections, previous_obs) inertia_Y, inertia_X = velocities[:, 0], velocities[:, 1] inertia_Y = np.repeat(inertia_Y[:, np.newaxis], Y.shape[1], axis=1) inertia_X = np.repeat(inertia_X[:, np.newaxis], X.shape[1], axis=1) diff_angle_cos = inertia_X * X + inertia_Y * Y diff_angle_cos = np.clip(diff_angle_cos, a_min=-1, a_max=1) diff_angle = np.arccos(diff_angle_cos) diff_angle = (np.pi / 2.0 - np.abs(diff_angle)) / np.pi valid_mask = np.ones(previous_obs.shape[0]) valid_mask[np.where(previous_obs[:, 4] < 0)] = 0 iou_matrix = asso_func(detections, trackers) #iou_matrix = iou_batch(detections, trackers) scores = np.repeat(detections[:, -1][:, np.newaxis], trackers.shape[0], axis=1) # iou_matrix = iou_matrix * scores # a trick sometiems works, we don't encourage this valid_mask = np.repeat(valid_mask[:, np.newaxis], X.shape[1], axis=1) angle_diff_cost = (valid_mask * diff_angle) * vdc_weight angle_diff_cost = angle_diff_cost.T angle_diff_cost = angle_diff_cost * scores if min(iou_matrix.shape): a = (iou_matrix > iou_threshold).astype(np.int32) if a.sum(1).max() == 1 and a.sum(0).max() == 1: matched_indices = np.stack(np.where(a), axis=1) else: if emb_cost is None: emb_cost = 0 else: emb_cost = emb_cost emb_cost[iou_matrix <= 0] = 0 if not aw_off: emb_cost = compute_aw_max_metric(emb_cost, w_assoc_emb, bottom=aw_param) else: emb_cost *= w_assoc_emb final_cost = -(iou_matrix + angle_diff_cost + emb_cost) matched_indices = linear_assignment(final_cost) if matched_indices.size == 0: matched_indices = np.empty(shape=(0, 2)) else: matched_indices = np.empty(shape=(0, 2)) unmatched_detections = [] for d, det in enumerate(detections): if d not in matched_indices[:, 0]: unmatched_detections.append(d) unmatched_trackers = [] for t, trk in enumerate(trackers): if t not in matched_indices[:, 1]: unmatched_trackers.append(t) # filter out matched with low IOU matches = [] for m in matched_indices: if iou_matrix[m[0], m[1]] < iou_threshold: unmatched_detections.append(m[0]) unmatched_trackers.append(m[1]) else: matches.append(m.reshape(1, 2)) if len(matches) == 0: matches = np.empty((0, 2), dtype=int) else: matches = np.concatenate(matches, axis=0) return matches, np.array(unmatched_detections), np.array(unmatched_trackers) def associate_kitti( detections, trackers, det_cates, iou_threshold, velocities, previous_obs, vdc_weight ): if len(trackers) == 0: return ( np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int), ) """ Cost from the velocity direction consistency """ Y, X = speed_direction_batch(detections, previous_obs) inertia_Y, inertia_X = velocities[:, 0], velocities[:, 1] inertia_Y = np.repeat(inertia_Y[:, np.newaxis], Y.shape[1], axis=1) inertia_X = np.repeat(inertia_X[:, np.newaxis], X.shape[1], axis=1) diff_angle_cos = inertia_X * X + inertia_Y * Y diff_angle_cos = np.clip(diff_angle_cos, a_min=-1, a_max=1) diff_angle = np.arccos(diff_angle_cos) diff_angle = (np.pi / 2.0 - np.abs(diff_angle)) / np.pi valid_mask = np.ones(previous_obs.shape[0]) valid_mask[np.where(previous_obs[:, 4] < 0)] = 0 valid_mask = np.repeat(valid_mask[:, np.newaxis], X.shape[1], axis=1) scores = np.repeat(detections[:, -1][:, np.newaxis], trackers.shape[0], axis=1) angle_diff_cost = (valid_mask * diff_angle) * vdc_weight angle_diff_cost = angle_diff_cost.T angle_diff_cost = angle_diff_cost * scores """ Cost from IoU """ iou_matrix = AssociationFunction.iou_batch(detections, trackers) """ With multiple categories, generate the cost for catgory mismatch """ num_dets = detections.shape[0] num_trk = trackers.shape[0] cate_matrix = np.zeros((num_dets, num_trk)) for i in range(num_dets): for j in range(num_trk): if det_cates[i] != trackers[j, 4]: cate_matrix[i][j] = -1e6 cost_matrix = -iou_matrix - angle_diff_cost - cate_matrix if min(iou_matrix.shape) > 0: a = (iou_matrix > iou_threshold).astype(np.int32) if a.sum(1).max() == 1 and a.sum(0).max() == 1: matched_indices = np.stack(np.where(a), axis=1) else: matched_indices = linear_assignment(cost_matrix) else: matched_indices = np.empty(shape=(0, 2)) unmatched_detections = [] for d, det in enumerate(detections): if d not in matched_indices[:, 0]: unmatched_detections.append(d) unmatched_trackers = [] for t, trk in enumerate(trackers): if t not in matched_indices[:, 1]: unmatched_trackers.append(t) # filter out matched with low IOU matches = [] for m in matched_indices: if iou_matrix[m[0], m[1]] < iou_threshold: unmatched_detections.append(m[0]) unmatched_trackers.append(m[1]) else: matches.append(m.reshape(1, 2)) if len(matches) == 0: matches = np.empty((0, 2), dtype=int) else: matches = np.concatenate(matches, axis=0) return matches, np.array(unmatched_detections), np.array(unmatched_trackers)