Spaces:
Sleeping
Sleeping
Delete tracker.py
Browse files- tracker.py +0 -130
tracker.py
DELETED
|
@@ -1,130 +0,0 @@
|
|
| 1 |
-
"""
|
| 2 |
-
tracker.py
|
| 3 |
-
----------
|
| 4 |
-
Wraps supervision's ByteTracker and maintains per-ID state:
|
| 5 |
-
- trajectory history (deque of centroids)
|
| 6 |
-
- speed estimates (km/h)
|
| 7 |
-
"""
|
| 8 |
-
|
| 9 |
-
from __future__ import annotations
|
| 10 |
-
|
| 11 |
-
import math
|
| 12 |
-
from collections import defaultdict, deque
|
| 13 |
-
from typing import Dict, List, Optional, Tuple
|
| 14 |
-
|
| 15 |
-
import numpy as np
|
| 16 |
-
import supervision as sv
|
| 17 |
-
|
| 18 |
-
|
| 19 |
-
# ββ Rough calibration: pixels-per-metre βββββββββββββββββββββββββββββββββββ
|
| 20 |
-
# Assumes ~200 px visible width β 10 m of real-world pitch.
|
| 21 |
-
# Override via TrackState.ppm for better accuracy.
|
| 22 |
-
DEFAULT_PPM = 20.0 # pixels per metre
|
| 23 |
-
|
| 24 |
-
|
| 25 |
-
class TrackState:
|
| 26 |
-
"""Holds all mutable state for active/past tracks."""
|
| 27 |
-
|
| 28 |
-
def __init__(self, trajectory_maxlen: int = 60, ppm: float = DEFAULT_PPM) -> None:
|
| 29 |
-
self.trajectory_maxlen = trajectory_maxlen
|
| 30 |
-
self.ppm = ppm
|
| 31 |
-
|
| 32 |
-
# track_id β deque of (cx, cy) pixel centroids
|
| 33 |
-
self.trajectories: Dict[int, deque] = defaultdict(
|
| 34 |
-
lambda: deque(maxlen=self.trajectory_maxlen)
|
| 35 |
-
)
|
| 36 |
-
# track_id β last centroid (for speed delta)
|
| 37 |
-
self.prev_centers: Dict[int, Tuple[int, int]] = {}
|
| 38 |
-
# track_id β smoothed speed in km/h
|
| 39 |
-
self.speeds: Dict[int, float] = {}
|
| 40 |
-
|
| 41 |
-
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 42 |
-
def update(self, detections: sv.Detections, fps: float) -> None:
|
| 43 |
-
"""
|
| 44 |
-
Called once per frame after tracking.
|
| 45 |
-
Updates trajectories and speed estimates for all tracked detections.
|
| 46 |
-
|
| 47 |
-
Args:
|
| 48 |
-
detections: sv.Detections with .tracker_id populated.
|
| 49 |
-
fps: Video frame rate (for speed calculation).
|
| 50 |
-
"""
|
| 51 |
-
if detections.tracker_id is None:
|
| 52 |
-
return
|
| 53 |
-
|
| 54 |
-
new_centers: Dict[int, Tuple[int, int]] = {}
|
| 55 |
-
|
| 56 |
-
for i, tid in enumerate(detections.tracker_id):
|
| 57 |
-
if tid is None:
|
| 58 |
-
continue
|
| 59 |
-
tid = int(tid)
|
| 60 |
-
xyxy = detections.xyxy[i]
|
| 61 |
-
cx = int((xyxy[0] + xyxy[2]) / 2)
|
| 62 |
-
cy = int((xyxy[1] + xyxy[3]) / 2)
|
| 63 |
-
|
| 64 |
-
new_centers[tid] = (cx, cy)
|
| 65 |
-
self.trajectories[tid].append((cx, cy))
|
| 66 |
-
|
| 67 |
-
# Speed: frame-to-frame euclidean distance β km/h
|
| 68 |
-
if tid in self.prev_centers and fps > 0:
|
| 69 |
-
px0, py0 = self.prev_centers[tid]
|
| 70 |
-
dist_px = math.hypot(cx - px0, cy - py0)
|
| 71 |
-
dist_m = dist_px / self.ppm
|
| 72 |
-
speed_ms = dist_m * fps
|
| 73 |
-
speed_kmh = speed_ms * 3.6
|
| 74 |
-
|
| 75 |
-
# Exponential moving average to smooth jitter
|
| 76 |
-
prev_speed = self.speeds.get(tid, speed_kmh)
|
| 77 |
-
self.speeds[tid] = 0.7 * prev_speed + 0.3 * speed_kmh
|
| 78 |
-
|
| 79 |
-
self.prev_centers = new_centers
|
| 80 |
-
|
| 81 |
-
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 82 |
-
def trajectory(self, tid: int) -> List[Tuple[int, int]]:
|
| 83 |
-
return list(self.trajectories[tid])
|
| 84 |
-
|
| 85 |
-
def speed(self, tid: int) -> Optional[float]:
|
| 86 |
-
return self.speeds.get(tid)
|
| 87 |
-
|
| 88 |
-
@property
|
| 89 |
-
def unique_ids(self) -> List[int]:
|
| 90 |
-
return list(self.trajectories.keys())
|
| 91 |
-
|
| 92 |
-
|
| 93 |
-
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 94 |
-
class SportsTracker:
|
| 95 |
-
"""
|
| 96 |
-
Combines supervision ByteTracker with TrackState.
|
| 97 |
-
Single call per frame: tracker.update(detections, fps) β sv.Detections
|
| 98 |
-
"""
|
| 99 |
-
|
| 100 |
-
def __init__(
|
| 101 |
-
self,
|
| 102 |
-
fps: float = 30.0,
|
| 103 |
-
conf_threshold: float = 0.30,
|
| 104 |
-
iou_threshold: float = 0.50,
|
| 105 |
-
trajectory_len: int = 60,
|
| 106 |
-
ppm: float = DEFAULT_PPM,
|
| 107 |
-
) -> None:
|
| 108 |
-
self.fps = fps
|
| 109 |
-
|
| 110 |
-
self.byte_tracker = sv.ByteTracker(
|
| 111 |
-
track_activation_threshold=conf_threshold,
|
| 112 |
-
lost_track_buffer=int(fps * 2), # keep ID alive 2 sec
|
| 113 |
-
minimum_matching_threshold=iou_threshold,
|
| 114 |
-
frame_rate=int(fps),
|
| 115 |
-
)
|
| 116 |
-
|
| 117 |
-
self.state = TrackState(trajectory_maxlen=trajectory_len, ppm=ppm)
|
| 118 |
-
|
| 119 |
-
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 120 |
-
def update(self, detections: sv.Detections) -> sv.Detections:
|
| 121 |
-
"""
|
| 122 |
-
Args:
|
| 123 |
-
detections: Raw sv.Detections from PersonDetector.
|
| 124 |
-
|
| 125 |
-
Returns:
|
| 126 |
-
sv.Detections with .tracker_id assigned.
|
| 127 |
-
"""
|
| 128 |
-
tracked = self.byte_tracker.update_with_detections(detections)
|
| 129 |
-
self.state.update(tracked, self.fps)
|
| 130 |
-
return tracked
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|