Spaces:
Sleeping
Sleeping
| """ | |
| tracker.py | |
| ByteTrack wrapper using supervision 0.21 stable API. | |
| Maintains trajectory history and speed estimates per track ID. | |
| """ | |
| from __future__ import annotations | |
| import math | |
| from collections import defaultdict, deque | |
| from typing import Dict, List, Optional, Tuple | |
| import numpy as np | |
| # pixels-per-metre calibration (rough: 200px ≈ 10m pitch width) | |
| DEFAULT_PPM = 20.0 | |
| class TrackState: | |
| """Stores trajectory + speed per track ID.""" | |
| def __init__(self, traj_len: int = 60, ppm: float = DEFAULT_PPM) -> None: | |
| self.ppm = ppm | |
| self.traj_len = traj_len | |
| self.trajs: Dict[int, deque] = defaultdict(lambda: deque(maxlen=traj_len)) | |
| self.prev: Dict[int, Tuple[int,int]] = {} | |
| self.speeds: Dict[int, float] = {} | |
| def update(self, tracks: list[dict], fps: float) -> None: | |
| """ | |
| tracks: list of {"id": int, "xyxy": [x1,y1,x2,y2]} | |
| """ | |
| new_prev: Dict[int, Tuple[int,int]] = {} | |
| for t in tracks: | |
| tid = int(t["id"]) | |
| x1, y1, x2, y2 = t["xyxy"] | |
| cx, cy = int((x1+x2)/2), int((y1+y2)/2) | |
| new_prev[tid] = (cx, cy) | |
| self.trajs[tid].append((cx, cy)) | |
| if tid in self.prev and fps > 0: | |
| d = math.hypot(cx - self.prev[tid][0], cy - self.prev[tid][1]) | |
| spd = (d / self.ppm) * fps * 3.6 # km/h | |
| old = self.speeds.get(tid, spd) | |
| self.speeds[tid] = 0.7 * old + 0.3 * spd # EMA smooth | |
| self.prev = new_prev | |
| def trajectory(self, tid: int) -> List[Tuple[int,int]]: | |
| return list(self.trajs[tid]) | |
| def speed(self, tid: int) -> Optional[float]: | |
| return self.speeds.get(tid) | |
| def all_ids(self) -> List[int]: | |
| return list(self.trajs.keys()) | |
| class SportsTracker: | |
| """ | |
| Wraps supervision 0.21 ByteTracker. | |
| Input/output uses plain Python dicts — no supervision objects exposed. | |
| """ | |
| def __init__( | |
| self, | |
| fps: float = 30.0, | |
| conf: float = 0.30, | |
| iou: float = 0.50, | |
| traj_len: int = 60, | |
| ppm: float = DEFAULT_PPM, | |
| ) -> None: | |
| import supervision as sv | |
| self.fps = fps | |
| self.state = TrackState(traj_len=traj_len, ppm=ppm) | |
| # supervision 0.21 ByteTrack constructor | |
| self._tracker = sv.ByteTrack( | |
| track_activation_threshold=conf, | |
| lost_track_buffer=max(60, int(fps * 3)), | |
| minimum_matching_threshold=0.80, | |
| frame_rate=int(fps), | |
| ) | |
| def update(self, detections: list[dict]) -> list[dict]: | |
| """ | |
| Args: | |
| detections: list of {"xyxy": [x1,y1,x2,y2], "conf": float} | |
| Returns: | |
| list of {"id": int, "xyxy": [x1,y1,x2,y2], "conf": float} | |
| """ | |
| import supervision as sv | |
| if not detections: | |
| return [] | |
| xyxy = np.array([d["xyxy"] for d in detections], dtype=np.float32) | |
| confs = np.array([d["conf"] for d in detections], dtype=np.float32) | |
| cids = np.zeros(len(detections), dtype=int) | |
| sv_det = sv.Detections( | |
| xyxy=xyxy, | |
| confidence=confs, | |
| class_id=cids, | |
| ) | |
| tracked = self._tracker.update_with_detections(sv_det) | |
| results = [] | |
| if tracked.tracker_id is None: | |
| return results | |
| for i, tid in enumerate(tracked.tracker_id): | |
| if tid is None: | |
| continue | |
| results.append({ | |
| "id": int(tid), | |
| "xyxy": tracked.xyxy[i].tolist(), | |
| "conf": float(tracked.confidence[i]) if tracked.confidence is not None else 0.0, | |
| }) | |
| self.state.update(results, self.fps) | |
| return results | |