""" Multi-object tracking module using BoT-SORT via Ultralytics. Provides a ``Tracker`` class that runs YOLOv11 detection + BoT-SORT tracking on video files or frame sequences. Usage: from src.tracking.tracker import Tracker tracker = Tracker(cfg) for frame, detections, track_ids in tracker.track_video("input.mp4"): ... # visualize or log Note: This module will be fully implemented in Roadmap Step 5. The current version provides the interface and basic plumbing. """ from __future__ import annotations import logging from pathlib import Path from typing import Generator import cv2 import numpy as np from src.config import Config from src.detection.detector import Detection, Detector logger = logging.getLogger(__name__) class Tracker: """Config-driven BoT-SORT wildlife tracker. Wraps Ultralytics' built-in tracking mode, which couples YOLOv11 detection with BoT-SORT association frame-by-frame. Args: cfg: Pipeline configuration object. """ def __init__(self, cfg: Config) -> None: self._cfg = cfg self._detector = Detector(cfg) trk_cfg = cfg.tracking self._tracker_type = str(trk_cfg.tracker) self._tracker_config = str(trk_cfg.tracker_config) self._track_buffer = int(trk_cfg.track_buffer) self._min_track_length = int(trk_cfg.min_track_length) logger.info( "Tracker initialised: %s (buffer=%d, min_length=%d)", self._tracker_type, self._track_buffer, self._min_track_length, ) def track_video( self, video_path: str | Path, *, save: bool = False, ) -> Generator[tuple[np.ndarray, list[Detection], list[int]], None, None]: """Run detection + tracking on a video file, yielding per-frame results. Args: video_path: Path to the input video. save: Whether to save the annotated video. Yields: Tuple of (frame, detections, track_ids) for each frame. """ video_path = Path(video_path) if not video_path.exists(): raise FileNotFoundError(f"Video not found: {video_path}") det_cfg = self._cfg.detection results_gen = self._detector.model.track( source=str(video_path), conf=float(det_cfg.confidence_threshold), iou=float(det_cfg.iou_threshold), imgsz=int(det_cfg.image_size), device=str(det_cfg.device), tracker=self._tracker_config, stream=True, verbose=False, ) for result in results_gen: frame = result.orig_img detections: list[Detection] = [] track_ids: list[int] = [] if result.boxes is not None and len(result.boxes) > 0: for box in result.boxes: xyxy = box.xyxy[0].cpu().numpy() conf = float(box.conf[0].cpu().numpy()) cls_id = int(box.cls[0].cpu().numpy()) cls_name = result.names.get(cls_id, f"class_{cls_id}") tid = int(box.id[0].cpu().numpy()) if box.id is not None else -1 detections.append( Detection( bbox=(float(xyxy[0]), float(xyxy[1]), float(xyxy[2]), float(xyxy[3])), confidence=conf, class_id=cls_id, class_name=cls_name, ) ) track_ids.append(tid) yield frame, detections, track_ids logger.info("Tracking complete for %s", video_path.name)