"""
vision/tracker.py - Multi-Object Tracking using ByteTrack algorithm
Assigns persistent track IDs across frames for each camera.
"""
import numpy as np
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass, field
from loguru import logger
from config import settings
@dataclass
class Track:
    """State of a single tracked object.

    Fields:
        track_id: persistent identifier of the track.
        bbox: most recent box as [x1, y1, x2, y2].
        score: score of the most recently assigned detection.
        age: number of ``predict()`` calls (i.e. frames) since creation.
        hits: number of detections assigned so far (starts at 1).
        time_since_update: ``predict()`` calls since the last ``update()``.
        state: one of "active", "lost" or "removed".
        history: most recent boxes, oldest first, capped at ``MAX_HISTORY``.
    """

    track_id: int
    bbox: List[float]  # [x1, y1, x2, y2]
    score: float
    age: int = 0
    hits: int = 1
    time_since_update: int = 0
    state: str = "active"  # active | lost | removed
    history: List[List[float]] = field(default_factory=list)

    # Deliberately un-annotated: plain class constants, not dataclass fields.
    MAX_HISTORY = 30  # number of past boxes retained in ``history``
    LOST_AFTER = 5  # missed frames after which a track is flagged "lost"

    def update(self, bbox: List[float], score: float):
        """Assign a new detection to this track and mark it active again.

        Args:
            bbox: matched detection box as [x1, y1, x2, y2].
            score: score of the matched detection.
        """
        self.bbox = bbox
        self.score = score
        self.hits += 1
        # ``age`` is advanced once per frame by predict(); bumping it here as
        # well made matched tracks age twice as fast as unmatched ones.
        self.time_since_update = 0
        self.state = "active"
        self.history.append(bbox)
        if len(self.history) > self.MAX_HISTORY:
            # Keep only the newest MAX_HISTORY entries.
            del self.history[: -self.MAX_HISTORY]

    def predict(self):
        """Advance the track by one frame without a measurement.

        No motion model is applied; the box is left untouched (a Kalman
        filter would be the natural extension). Only the bookkeeping is
        updated: ``age`` and ``time_since_update`` grow by one and the state
        becomes "lost" after more than ``LOST_AFTER`` missed frames, or
        "removed" after more than ``settings.TRACK_BUFFER`` missed frames.
        """
        self.time_since_update += 1
        self.age += 1
        if self.time_since_update > settings.TRACK_BUFFER:
            self.state = "removed"
        elif self.time_since_update > self.LOST_AFTER:
            self.state = "lost"
def iou(boxA: List[float], boxB: List[float]) -> float:
    """Return the Intersection-over-Union of two [x1, y1, x2, y2] boxes.

    A small epsilon (1e-6) is added to the denominator so that two
    zero-area boxes do not cause a division by zero.
    """
    # Corners of the overlapping rectangle (may be "inverted" if disjoint).
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[2], boxB[2])
    bottom = min(boxA[3], boxB[3])

    # Clamp each side at zero so disjoint boxes yield an empty overlap.
    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    overlap = overlap_w * overlap_h

    size_a = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    size_b = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
    combined = size_a + size_b - overlap
    return overlap / (combined + 1e-6)
class ByteTracker:
    """
    Simplified ByteTrack-style multi-object tracker.
    Uses two-stage matching: high-confidence detections first, then low-confidence.
    One instance per camera.
    """

    # IoU a low-confidence detection must exceed to be assigned to a lost track.
    LOW_MATCH_IOU = 0.5

    def __init__(self, camera_id: str):
        """Create an empty tracker for ``camera_id``; track ids start at 1."""
        self.camera_id = camera_id
        self.tracks: List[Track] = []
        self._next_id = 1
        self.frame_id = 0
        logger.info(f"ByteTracker initialized for camera: {camera_id}")

    def _new_track(self, bbox: List[float], score: float) -> "Track":
        """Build a track with the next free id, seeded with ``bbox``."""
        t = Track(track_id=self._next_id, bbox=bbox, score=score, history=[bbox])
        self._next_id += 1
        return t

    def _iou_matrix(self, track_indices: List[int], detections: List[Dict]) -> np.ndarray:
        """Return IoU of every listed track (rows) against every detection (columns)."""
        matrix = np.zeros((len(track_indices), len(detections)))
        for row, ti in enumerate(track_indices):
            for col, det in enumerate(detections):
                matrix[row, col] = iou(self.tracks[ti].bbox, det["bbox"])
        return matrix

    @staticmethod
    def _greedy_assign(
        iou_matrix: np.ndarray,
        threshold: float,
        strict: bool = False,
    ) -> List[Tuple[int, int]]:
        """
        Greedy one-to-one assignment on an IoU matrix, best pair first.

        Args:
            iou_matrix: 2-D array, rows = tracks, columns = detections.
            threshold: minimum IoU for a pair to be accepted.
            strict: if True the IoU must be > threshold, otherwise >= threshold.
        Returns: list of (row, column) pairs; each row and each column is
        used at most once. The input matrix is not modified.
        """
        scores = np.array(iou_matrix, dtype=float)  # private copy, masked below
        pairs: List[Tuple[int, int]] = []
        if scores.ndim != 2 or scores.size == 0:
            return pairs
        n_cols = scores.shape[1]
        # At most min(rows, cols) pairs exist, which also bounds the loop.
        for _ in range(min(scores.shape)):
            row, col = divmod(int(np.argmax(scores)), n_cols)
            best = scores[row, col]
            rejected = best <= threshold if strict else best < threshold
            if rejected:
                break
            pairs.append((row, col))
            # Consume both the track and the detection.
            scores[row, :] = -np.inf
            scores[:, col] = -np.inf
        return pairs

    def _match(
        self,
        detections: List[Dict],
        threshold: float = 0.5,
    ) -> Tuple[List[Tuple[int, int]], List[int], List[int]]:
        """
        Greedy IoU matching between non-removed tracks and detections.
        Returns: (matched pairs), (unmatched track indices), (unmatched det indices)
        Track indices refer to ``self.tracks``; detection indices refer to
        ``detections``. A pair is accepted when its IoU is >= ``threshold``.
        """
        candidates = [i for i, t in enumerate(self.tracks) if t.state != "removed"]
        det_indices = list(range(len(detections)))
        if not candidates or not detections:
            return [], candidates, det_indices

        pairs = self._greedy_assign(self._iou_matrix(candidates, detections), threshold)
        matched = [(candidates[row], col) for row, col in pairs]
        taken_tracks = {ti for ti, _ in matched}
        taken_dets = {di for _, di in matched}
        unmatched_tracks = [ti for ti in candidates if ti not in taken_tracks]
        unmatched_dets = [di for di in det_indices if di not in taken_dets]
        return matched, unmatched_tracks, unmatched_dets

    def update(self, detections: List[Dict]) -> List[Dict]:
        """
        Update tracker with new detections.
        Args:
            detections: list of {"bbox": [...], "score": float}
        Returns:
            tracked_objects: list of {"track_id": int, "bbox": [...], "score": float,
            "state": str, "age": int, "hits": int} for every track whose state
            is "active". This includes tracks that were not matched in this
            frame but have not been flagged "lost" yet; they carry their
            last assigned box.
        """
        self.frame_id += 1

        # Age every existing track by one frame, then drop the expired ones.
        for t in self.tracks:
            t.predict()
        self.tracks = [t for t in self.tracks if t.state != "removed"]

        # Split detections by confidence.
        high_dets = [d for d in detections if d["score"] >= settings.TRACK_THRESH]
        low_dets = [d for d in detections if d["score"] < settings.TRACK_THRESH]

        # Stage 1: match high-confidence detections against all tracks.
        matched, unmatched_tracks, unmatched_high = self._match(
            high_dets, threshold=settings.MATCH_THRESH
        )
        for ti, di in matched:
            self.tracks[ti].update(high_dets[di]["bbox"], high_dets[di]["score"])

        # Stage 2: let low-confidence detections revive still-unmatched lost tracks.
        # NOTE(review): only "lost" tracks are eligible here, whereas canonical
        # ByteTrack also offers low-score detections to tracked-but-unmatched
        # tracks. Kept as-is; confirm this is intended.
        lost_unmatched = [ti for ti in unmatched_tracks if self.tracks[ti].state == "lost"]
        if lost_unmatched and low_dets:
            low_iou = self._iou_matrix(lost_unmatched, low_dets)
            # One-to-one assignment: previously each track picked its best
            # detection independently, so one detection could update several tracks.
            for row, col in self._greedy_assign(low_iou, self.LOW_MATCH_IOU, strict=True):
                det = low_dets[col]
                self.tracks[lost_unmatched[row]].update(det["bbox"], det["score"])

        # Unmatched high-confidence detections start new tracks; unmatched
        # low-confidence detections are discarded.
        for di in unmatched_high:
            self.tracks.append(self._new_track(high_dets[di]["bbox"], high_dets[di]["score"]))

        # Return active tracks
        return [
            {
                "track_id": t.track_id,
                "bbox": t.bbox,
                "score": t.score,
                "state": t.state,
                "age": t.age,
                "hits": t.hits,
            }
            for t in self.tracks
            if t.state == "active"
        ]
class TrackerManager:
    """Registry holding one lazily created ByteTracker per camera id."""

    def __init__(self):
        # camera id -> tracker dedicated to that camera
        self._trackers: Dict[str, ByteTracker] = {}

    def get_tracker(self, camera_id: str) -> ByteTracker:
        """Return the tracker for ``camera_id``, creating it on first use."""
        tracker = self._trackers.get(camera_id)
        if tracker is None:
            tracker = ByteTracker(camera_id)
            self._trackers[camera_id] = tracker
        return tracker

    def update(self, camera_id: str, detections: List[Dict]) -> List[Dict]:
        """Feed ``detections`` to the camera's tracker and return its result."""
        tracker = self.get_tracker(camera_id)
        return tracker.update(detections)

    def reset(self, camera_id: str):
        """Forget the tracker for ``camera_id``; a no-op if none exists."""
        self._trackers.pop(camera_id, None)
|