""" vision/tracker.py - Multi-Object Tracking using ByteTrack algorithm Assigns persistent track IDs across frames for each camera. """ import numpy as np from typing import List, Dict, Tuple, Optional from dataclasses import dataclass, field from loguru import logger from config import settings @dataclass class Track: track_id: int bbox: List[float] # [x1, y1, x2, y2] score: float age: int = 0 hits: int = 1 time_since_update: int = 0 state: str = "active" # active | lost | removed history: List[List[float]] = field(default_factory=list) def update(self, bbox: List[float], score: float): self.bbox = bbox self.score = score self.hits += 1 self.age += 1 self.time_since_update = 0 self.state = "active" self.history.append(bbox) if len(self.history) > 30: self.history.pop(0) def predict(self): """Simple linear prediction (extend with Kalman for production).""" self.time_since_update += 1 self.age += 1 if self.time_since_update > settings.TRACK_BUFFER: self.state = "removed" elif self.time_since_update > 5: self.state = "lost" def iou(boxA: List[float], boxB: List[float]) -> float: """Compute Intersection over Union between two [x1,y1,x2,y2] boxes.""" xA = max(boxA[0], boxB[0]) yA = max(boxA[1], boxB[1]) xB = min(boxA[2], boxB[2]) yB = min(boxA[3], boxB[3]) inter = max(0, xB - xA) * max(0, yB - yA) areaA = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1]) areaB = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1]) union = areaA + areaB - inter return inter / (union + 1e-6) class ByteTracker: """ Simplified ByteTrack-style multi-object tracker. Uses two-stage matching: high-confidence detections first, then low-confidence. One instance per camera. """ def __init__(self, camera_id: str): self.camera_id = camera_id self.tracks: List[Track] = [] self._next_id = 1 self.frame_id = 0 logger.info(f"ByteTracker initialized for camera: {camera_id}") def _new_track(self, bbox: List[float], score: float) -> Track: t = Track(track_id=self._next_id, bbox=bbox, score=score, history=[bbox]) self._next_id += 1 return t def _match( self, detections: List[Dict], threshold: float = 0.5, ) -> Tuple[List[Tuple[int, int]], List[int], List[int]]: """ Greedy IoU matching between active tracks and detections. Returns: (matched pairs), (unmatched track indices), (unmatched det indices) """ active = [i for i, t in enumerate(self.tracks) if t.state != "removed"] if not active or not detections: return [], active, list(range(len(detections))) iou_matrix = np.zeros((len(active), len(detections))) for i, ti in enumerate(active): for j, det in enumerate(detections): iou_matrix[i, j] = iou(self.tracks[ti].bbox, det["bbox"]) matched, unmatched_tracks, unmatched_dets = [], list(active), list(range(len(detections))) while True: if iou_matrix.size == 0: break flat_idx = np.argmax(iou_matrix) ti_local, di = divmod(flat_idx, iou_matrix.shape[1]) if iou_matrix[ti_local, di] < threshold: break ti_global = active[ti_local] matched.append((ti_global, di)) unmatched_tracks.remove(ti_global) unmatched_dets.remove(di) iou_matrix[ti_local, :] = -1 iou_matrix[:, di] = -1 return matched, unmatched_tracks, unmatched_dets def update(self, detections: List[Dict]) -> List[Dict]: """ Update tracker with new detections. Args: detections: list of {"bbox": [...], "score": float} Returns: tracked_objects: list of {"track_id": int, "bbox": [...], "score": float, "state": str} """ self.frame_id += 1 # Predict existing tracks for t in self.tracks: t.predict() # Remove permanently dead tracks self.tracks = [t for t in self.tracks if t.state != "removed"] # High confidence detections high_dets = [d for d in detections if d["score"] >= settings.TRACK_THRESH] low_dets = [d for d in detections if d["score"] < settings.TRACK_THRESH] # Stage 1: Match high-confidence detections matched, unmatched_tracks, unmatched_high = self._match(high_dets, threshold=settings.MATCH_THRESH) for ti, di in matched: self.tracks[ti].update(high_dets[di]["bbox"], high_dets[di]["score"]) # Stage 2: Match remaining tracks with low-confidence detections remaining_unmatched = [ti for ti in unmatched_tracks if self.tracks[ti].state == "lost"] if remaining_unmatched and low_dets: low_iou_matrix = np.zeros((len(remaining_unmatched), len(low_dets))) for i, ti in enumerate(remaining_unmatched): for j, det in enumerate(low_dets): low_iou_matrix[i, j] = iou(self.tracks[ti].bbox, det["bbox"]) for i, ti in enumerate(remaining_unmatched): best_j = int(np.argmax(low_iou_matrix[i])) if low_iou_matrix[i, best_j] > 0.5: self.tracks[ti].update(low_dets[best_j]["bbox"], low_dets[best_j]["score"]) # Create new tracks for unmatched high-confidence detections for di in unmatched_high: self.tracks.append(self._new_track(high_dets[di]["bbox"], high_dets[di]["score"])) # Return active tracks return [ { "track_id": t.track_id, "bbox": t.bbox, "score": t.score, "state": t.state, "age": t.age, "hits": t.hits, } for t in self.tracks if t.state == "active" ] class TrackerManager: """Manages one ByteTracker per camera.""" def __init__(self): self._trackers: Dict[str, ByteTracker] = {} def get_tracker(self, camera_id: str) -> ByteTracker: if camera_id not in self._trackers: self._trackers[camera_id] = ByteTracker(camera_id) return self._trackers[camera_id] def update(self, camera_id: str, detections: List[Dict]) -> List[Dict]: return self.get_tracker(camera_id).update(detections) def reset(self, camera_id: str): if camera_id in self._trackers: del self._trackers[camera_id]