sports_tracker / tracker.py
Utkarsh430's picture
Upload 2 files
1346b10 verified
"""
tracker.py
ByteTrack wrapper using supervision 0.21 stable API.
Maintains trajectory history and speed estimates per track ID.
"""
from __future__ import annotations
import math
from collections import defaultdict, deque
from typing import Dict, List, Optional, Tuple
import numpy as np
# pixels-per-metre calibration (rough: 200px ≈ 10m pitch width)
DEFAULT_PPM = 20.0
class TrackState:
"""Stores trajectory + speed per track ID."""
def __init__(self, traj_len: int = 60, ppm: float = DEFAULT_PPM) -> None:
self.ppm = ppm
self.traj_len = traj_len
self.trajs: Dict[int, deque] = defaultdict(lambda: deque(maxlen=traj_len))
self.prev: Dict[int, Tuple[int,int]] = {}
self.speeds: Dict[int, float] = {}
def update(self, tracks: list[dict], fps: float) -> None:
"""
tracks: list of {"id": int, "xyxy": [x1,y1,x2,y2]}
"""
new_prev: Dict[int, Tuple[int,int]] = {}
for t in tracks:
tid = int(t["id"])
x1, y1, x2, y2 = t["xyxy"]
cx, cy = int((x1+x2)/2), int((y1+y2)/2)
new_prev[tid] = (cx, cy)
self.trajs[tid].append((cx, cy))
if tid in self.prev and fps > 0:
d = math.hypot(cx - self.prev[tid][0], cy - self.prev[tid][1])
spd = (d / self.ppm) * fps * 3.6 # km/h
old = self.speeds.get(tid, spd)
self.speeds[tid] = 0.7 * old + 0.3 * spd # EMA smooth
self.prev = new_prev
def trajectory(self, tid: int) -> List[Tuple[int,int]]:
return list(self.trajs[tid])
def speed(self, tid: int) -> Optional[float]:
return self.speeds.get(tid)
@property
def all_ids(self) -> List[int]:
return list(self.trajs.keys())
class SportsTracker:
"""
Wraps supervision 0.21 ByteTracker.
Input/output uses plain Python dicts — no supervision objects exposed.
"""
def __init__(
self,
fps: float = 30.0,
conf: float = 0.30,
iou: float = 0.50,
traj_len: int = 60,
ppm: float = DEFAULT_PPM,
) -> None:
import supervision as sv
self.fps = fps
self.state = TrackState(traj_len=traj_len, ppm=ppm)
# supervision 0.21 ByteTrack constructor
self._tracker = sv.ByteTrack(
track_activation_threshold=conf,
lost_track_buffer=max(60, int(fps * 3)),
minimum_matching_threshold=0.80,
frame_rate=int(fps),
)
def update(self, detections: list[dict]) -> list[dict]:
"""
Args:
detections: list of {"xyxy": [x1,y1,x2,y2], "conf": float}
Returns:
list of {"id": int, "xyxy": [x1,y1,x2,y2], "conf": float}
"""
import supervision as sv
if not detections:
return []
xyxy = np.array([d["xyxy"] for d in detections], dtype=np.float32)
confs = np.array([d["conf"] for d in detections], dtype=np.float32)
cids = np.zeros(len(detections), dtype=int)
sv_det = sv.Detections(
xyxy=xyxy,
confidence=confs,
class_id=cids,
)
tracked = self._tracker.update_with_detections(sv_det)
results = []
if tracked.tracker_id is None:
return results
for i, tid in enumerate(tracked.tracker_id):
if tid is None:
continue
results.append({
"id": int(tid),
"xyxy": tracked.xyxy[i].tolist(),
"conf": float(tracked.confidence[i]) if tracked.confidence is not None else 0.0,
})
self.state.update(results, self.fps)
return results