Spaces:
Sleeping
Sleeping
| import numpy as np | |
| from .ocsort import ( | |
| KalmanBoxTracker, | |
| associate, | |
| ciou_batch, | |
| ct_dist, | |
| diou_batch, | |
| giou_batch, | |
| iou_batch, | |
| linear_assignment, | |
| ) | |
| from .onnx_models import HandClassification, HandDetection | |
| from .utils import Deque, Drawer, Hand | |
| from .config import is_monitoring_enabled | |
# Optional logfire monitoring.
# The module-level name ``logfire`` is bound to the imported package only when
# monitoring is enabled and the import succeeds; in every other case it is None,
# which is what the rest of the module tests before emitting any log call.
logfire = None
if is_monitoring_enabled():
    try:
        import logfire
    except ImportError:
        logfire = None

# Lookup table from an association-metric name to its batch implementation.
ASSO_FUNCS = {
    "iou": iou_batch,
    "giou": giou_batch,
    "ciou": ciou_batch,
    "diou": diou_batch,
    "ct_dist": ct_dist,
}
def k_previous_obs(observations, cur_age, k):
    """
    Pick a past observation to use as the reference for the given age.

    Parameters
    ----------
    observations : dict
        Mapping from age to the observation stored at that age.
    cur_age : int
        Current age of the track.
    k : int
        Largest look-back distance to try.

    Returns
    -------
    object
        ``[-1, -1, -1, -1, -1]`` when nothing has been observed yet. Otherwise the
        observation stored at age ``cur_age - k``, ``cur_age - k + 1``, ..., ``cur_age - 1``
        (first one that exists, farthest look-back first), falling back to the
        observation with the largest age key.
    """
    if not observations:
        return [-1, -1, -1, -1, -1]

    # Try the farthest look-back first and move towards the current age.
    for step_back in range(k, 0, -1):
        wanted_age = cur_age - step_back
        if wanted_age in observations:
            return observations[wanted_age]

    # Nothing inside the look-back window: use the most recent stored observation.
    return observations[max(observations)]
class MainController:
    """
    Main tracking class: detects hands, classifies gestures and tracks hands across frames.

    The controller keeps a list of tracks in ``self.tracks``. Each track is a dict with a
    ``"tracker"`` entry (a KalmanBoxTracker) and a ``"hands"`` entry (a Deque of Hand objects).
    """

    def __init__(
        self, detection_model, classification_model, max_age=30, min_hits=3, iou_threshold=0.3, maxlen=30, min_frames=20
    ):
        """
        Parameters
        ----------
        detection_model : str
            Path to detection model.
        classification_model : str
            Path to classification model.
        max_age : int
            Maximum age of track.
        min_hits : int
            Minimum number of hits to confirm track.
        iou_threshold : float
            IOU threshold for track association.
        maxlen : int
            Maximum length of deque in track.
        min_frames : int
            Minimum number of frames to confirm track.
        """
        self.maxlen = maxlen
        self.min_frames = min_frames
        self.max_age = max_age
        self.min_hits = min_hits
        # Look-back distance handed to k_previous_obs and to every new KalmanBoxTracker.
        self.delta_t = 3
        self.iou_threshold = iou_threshold
        # Forwarded to associate() in the first association round.
        self.inertia = 0.2
        # Similarity function used in the second association round.
        self.asso_func = ASSO_FUNCS["giou"]
        self.tracks = []
        self.frame_count = 0
        self.detection_model = HandDetection(detection_model)
        self.classification_model = HandClassification(classification_model)
        self.drawer = Drawer()

    def _store_match(self, trk_ind, dets, det_ind, labels):
        """Feed detection ``det_ind`` to track ``trk_ind`` and record the matching Hand."""
        track = self.tracks[trk_ind]
        track["tracker"].update(dets[det_ind, :])
        # Labels are optional: without them the hand is stored with an unknown gesture
        # instead of failing on ``None[det_ind]``.
        gesture = None if labels is None else labels[det_ind]
        track["hands"].append(Hand(bbox=dets[det_ind, :4], gesture=gesture))

    def update(self, dets=None, labels=None):
        """
        Advance all tracks by one frame and associate the given detections with them.

        Parameters
        ----------
        dets : np.array, optional
            Bounding boxes with shape [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...] .
            ``None`` (the default) is treated as ``np.empty((0, 5))``.
            Requires: this method must be called once for each frame even with empty detections
            (use np.empty((0, 5)) for frames without detections).
        labels : np.array, optional
            Labels with shape (N, 1) where N is number of bounding boxes. When ``None``,
            matched hands are stored with ``gesture=None``.

        Returns
        -------
        tuple
            ``(tracks, labels)``. When at least one track is reported, ``tracks`` is an
            np.array with one row per reported track whose last column is the object ID
            (tracker id + 1), and ``labels`` is a list holding the most recent gesture of
            each reported track in the same order (``None`` for a track whose deque is
            empty). When nothing is reported the result is
            ``(np.empty((0, 5)), np.empty((0, 1)))``.

        Notes
        -----
        The number of objects returned may differ from the number of detections provided.
        """
        if dets is None:
            dets = np.empty((0, 5))

        # Advance frame count on every call to keep tracker state in sync with real time.
        # This method is required to be called once per frame (even if there are no detections),
        # so the internal Kalman filters and aging logic advance on empty frames as well.
        self.frame_count += 1

        # Get predicted locations from existing trackers for this frame.
        # This is required also when there are no detections, so that tracks can age out
        # (max_age) and do not persist indefinitely across gaps.
        trks = np.zeros((len(self.tracks), 5))
        to_del = []
        ret = []
        lbs = []
        for t, trk in enumerate(trks):
            pos = self.tracks[t]["tracker"].predict()[0]
            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            if np.any(np.isnan(pos)):
                to_del.append(t)
        # Drop rows with invalid predictions, then drop the matching tracks
        # (highest index first so the remaining indices stay valid).
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        for t in reversed(to_del):
            self.tracks.pop(t)

        velocities = np.array(
            [
                trk["tracker"].velocity if trk["tracker"].velocity is not None else np.array((0, 0))
                for trk in self.tracks
            ]
        )
        last_boxes = np.array([trk["tracker"].last_observation for trk in self.tracks])
        k_observations = np.array(
            [k_previous_obs(trk["tracker"].observations, trk["tracker"].age, self.delta_t) for trk in self.tracks]
        )

        # First round of association.
        matched, unmatched_dets, unmatched_trks = associate(
            dets, trks, self.iou_threshold, velocities, k_observations, self.inertia
        )
        for m in matched:
            self._store_match(m[1], dets, m[0], labels)

        # Second round of association by OCR: leftover detections are compared with the
        # last observation of each leftover track.
        if unmatched_dets.shape[0] > 0 and unmatched_trks.shape[0] > 0:
            left_dets = dets[unmatched_dets]
            left_trks = last_boxes[unmatched_trks]
            iou_left = np.array(self.asso_func(left_dets, left_trks))
            if iou_left.max() > self.iou_threshold:
                # NOTE: by using a lower threshold, e.g., self.iou_threshold - 0.1, you may
                # get a higher performance especially on MOT17/MOT20 datasets. But we keep it
                # uniform here for simplicity.
                rematched_indices = linear_assignment(-iou_left)
                to_remove_det_indices = []
                to_remove_trk_indices = []
                for m in rematched_indices:
                    det_ind, trk_ind = unmatched_dets[m[0]], unmatched_trks[m[1]]
                    if iou_left[m[0], m[1]] < self.iou_threshold:
                        continue
                    self._store_match(trk_ind, dets, det_ind, labels)
                    to_remove_det_indices.append(det_ind)
                    to_remove_trk_indices.append(trk_ind)
                unmatched_dets = np.setdiff1d(unmatched_dets, np.array(to_remove_det_indices))
                unmatched_trks = np.setdiff1d(unmatched_trks, np.array(to_remove_trk_indices))

        # For unmatched trackers (including the case with no detections),
        # update with None to keep the filter consistent and append a dummy Hand.
        for m in unmatched_trks:
            self.tracks[m]["tracker"].update(None)
            self.tracks[m]["hands"].append(Hand(bbox=None, gesture=None))

        # Create and initialise new trackers for unmatched detections.
        # No Hand is appended here, so a brand-new track starts with an empty deque.
        for i in unmatched_dets:
            self.tracks.append(
                {
                    "hands": Deque(self.maxlen, self.min_frames),
                    "tracker": KalmanBoxTracker(dets[i, :], delta_t=self.delta_t),
                }
            )

        # Walk the tracks from last to first; ``i`` is the index of the current track, so
        # popping it only shifts entries that have already been visited.
        i = len(self.tracks)
        for trk in reversed(self.tracks):
            if trk["tracker"].last_observation.sum() < 0:
                d = trk["tracker"].get_state()[0]
            else:
                # Using the recent observation or the Kalman filter prediction is optional;
                # no significant difference was noticed here.
                d = trk["tracker"].last_observation[:4]
            if (trk["tracker"].time_since_update < 1) and (
                trk["tracker"].hit_streak >= self.min_hits or self.frame_count <= self.min_hits
            ):
                # +1 as MOT benchmark requires positive
                ret.append(np.concatenate((d, [trk["tracker"].id + 1])).reshape(1, -1))
                if len(trk["hands"]) > 0:
                    lbs.append(trk["hands"][-1].gesture)
                else:
                    lbs.append(None)
            i -= 1
            # remove dead tracklet
            if trk["tracker"].time_since_update > self.max_age:
                self.tracks.pop(i)

        if ret:
            return np.concatenate(ret), lbs
        return np.empty((0, 5)), np.empty((0, 1))

    def _process_frame(self, frame):
        """
        Run detection, classification and tracking on one frame.

        Debug messages are emitted only when the module-level ``logfire`` is available.
        Returns the same triple as ``__call__``.
        """
        monitoring = logfire is not None

        bboxes, probs = self.detection_model(frame)
        if len(bboxes) == 0:
            if monitoring:
                logfire.debug('No hand detections in frame')
            # The tracker still has to be stepped so existing tracks age on empty frames.
            self.update(np.empty((0, 5)), None)
            return None, None, None

        if monitoring:
            logfire.debug(
                'Hand detections found',
                num_detections=len(bboxes),
                detection_scores=np.asarray(probs).tolist(),
            )

        labels = self.classification_model(frame, bboxes)
        # Append the detection score as a fifth column: [x1, y1, x2, y2, score].
        bboxes = np.concatenate((bboxes, np.expand_dims(probs, axis=1)), axis=1)
        new_bboxes, labels = self.update(dets=bboxes, labels=labels)

        if monitoring and labels is not None and len(labels) > 0:
            labels_list = np.asarray(labels).tolist()
            gesture_names = [
                f"gesture_{label}" if label is not None else "none"
                for label in labels_list
            ]
            logfire.debug(
                'Gesture classifications',
                labels=labels_list,
                gesture_names=gesture_names,
            )

        return new_bboxes[:, :-1], new_bboxes[:, -1], labels

    def __call__(self, frame):
        """
        Process one frame.

        Parameters
        ----------
        frame : np.array
            Image frame with shape (H, W, 3).

        Returns
        -------
        tuple
            ``(boxes, ids, labels)`` as produced by ``update`` (all columns but the last,
            the last column, and the labels) when the detector found at least one box;
            ``(None, None, None)`` when the detector found nothing.
        """
        if logfire is None:
            return self._process_frame(frame)
        # Monitoring enabled: group everything logged for this frame under one span.
        with logfire.span('frame_processing', frame_shape=frame.shape):
            return self._process_frame(frame)