from pathlib import Path
from typing import List, Tuple, Dict, Optional

import cv2
import numpy as np
from numpy import ndarray
from pydantic import BaseModel
from ultralytics import YOLO


class BoundingBox(BaseModel):
    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int
    conf: float


class TVFrameResult(BaseModel):
    frame_id: int
    boxes: List[BoundingBox]
    keypoints: List[Tuple[int, int]]


class Miner:
    # Suppression / classification thresholds.
    QUASI_TOTAL_IOA: float = 0.90        # drop a box almost fully covered by another
    SMALL_CONTAINED_IOA: float = 0.85    # drop a small box mostly inside a larger one
    SMALL_RATIO_MAX: float = 0.50        # "small" means at most half the other box's area
    SINGLE_PLAYER_HUE_PIVOT: float = 90.0  # hue split used when only one player is visible
    CORNER_INDICES = {0, 5, 24, 29}      # keypoint indices given a laxer confidence threshold

    def __init__(self, path_hf_repo: Path) -> None:
        self.bbox_model = YOLO(path_hf_repo / "objdetect.pt")
        print("BBox Model (objdetect.pt) Loaded")
        self.keypoints_model = YOLO(path_hf_repo / "keypointdetect.pt")
        print("Keypoints Model (keypointdetect.pt) Loaded")

    def __repr__(self) -> str:
        return (
            f"BBox Model: {type(self.bbox_model).__name__}\n"
            f"Keypoints Model: {type(self.keypoints_model).__name__}"
        )

    @staticmethod
    def _clip_box_to_image(
        x1: int, y1: int, x2: int, y2: int, w: int, h: int
    ) -> Tuple[int, int, int, int]:
        """Clamp box corners to image bounds, keeping at least a 1-px extent."""
        x1 = max(0, min(int(x1), w - 1))
        y1 = max(0, min(int(y1), h - 1))
        x2 = max(0, min(int(x2), w - 1))
        y2 = max(0, min(int(y2), h - 1))
        if x2 <= x1:
            x2 = min(w - 1, x1 + 1)
        if y2 <= y1:
            y2 = min(h - 1, y1 + 1)
        return x1, y1, x2, y2

    @staticmethod
    def _area(bb: BoundingBox) -> int:
        return max(0, bb.x2 - bb.x1) * max(0, bb.y2 - bb.y1)

    @staticmethod
    def _intersect_area(a: BoundingBox, b: BoundingBox) -> int:
        ix1 = max(a.x1, b.x1)
        iy1 = max(a.y1, b.y1)
        ix2 = min(a.x2, b.x2)
        iy2 = min(a.y2, b.y2)
        if ix2 <= ix1 or iy2 <= iy1:
            return 0
        return (ix2 - ix1) * (iy2 - iy1)

    @staticmethod
    def _center(bb: BoundingBox) -> Tuple[float, float]:
        return (0.5 * (bb.x1 + bb.x2), 0.5 * (bb.y1 + bb.y2))

    @staticmethod
    def _mean_hs(img_bgr: np.ndarray) -> Tuple[float, float]:
        hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
        return float(np.mean(hsv[:, :, 0])), float(np.mean(hsv[:, :, 1]))

    def _hs_feature_from_roi(self, img_bgr: np.ndarray, box: BoundingBox) -> np.ndarray:
        """Mean hue/saturation of a box ROI, masking out the green pitch when possible."""
        H, W = img_bgr.shape[:2]
        x1, y1, x2, y2 = self._clip_box_to_image(box.x1, box.y1, box.x2, box.y2, W, H)
        roi = img_bgr[y1:y2, x1:x2]
        if roi.size == 0:
            return np.array([0.0, 0.0], dtype=np.float32)
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
        lower_green = np.array([35, 60, 60], dtype=np.uint8)
        upper_green = np.array([85, 255, 255], dtype=np.uint8)
        green_mask = cv2.inRange(hsv, lower_green, upper_green)
        non_green_mask = cv2.bitwise_not(green_mask)
        num_non_green = int(np.count_nonzero(non_green_mask))
        total = hsv.shape[0] * hsv.shape[1]
        # Average only non-green pixels when enough exist; otherwise fall back
        # to the plain ROI mean.
        if num_non_green > max(50, total // 20):
            h_vals = hsv[:, :, 0][non_green_mask > 0]
            s_vals = hsv[:, :, 1][non_green_mask > 0]
            h_mean = float(np.mean(h_vals)) if h_vals.size else 0.0
            s_mean = float(np.mean(s_vals)) if s_vals.size else 0.0
        else:
            h_mean, s_mean = self._mean_hs(roi)
        return np.array([h_mean, s_mean], dtype=np.float32)

    def _ioa(self, a: BoundingBox, b: BoundingBox) -> float:
        """Intersection over the area of `a` (how much of `a` lies inside `b`)."""
        inter = self._intersect_area(a, b)
        aa = self._area(a)
        if aa <= 0:
            return 0.0
        return inter / aa

    def suppress_quasi_total_containment(self, boxes: List[BoundingBox]) -> List[BoundingBox]:
        """Drop any box that is almost entirely contained in another box."""
        if len(boxes) <= 1:
            return boxes
        keep = [True] * len(boxes)
        for i in range(len(boxes)):
            if not keep[i]:
                continue
            for j in range(len(boxes)):
                if i == j or not keep[j]:
                    continue
                ioa_i_in_j = self._ioa(boxes[i], boxes[j])
                if ioa_i_in_j >= self.QUASI_TOTAL_IOA:
                    keep[i] = False
                    break
        return [bb for bb, k in zip(boxes, keep) if k]

    def suppress_small_contained(self, boxes: List[BoundingBox]) -> List[BoundingBox]:
        """Drop the smaller of two boxes when it is mostly inside the larger one."""
        if len(boxes) <= 1:
            return boxes
        keep = [True] * len(boxes)
        areas = [self._area(bb) for bb in boxes]
        for i in range(len(boxes)):
            if not keep[i]:
                continue
            for j in range(len(boxes)):
                if i == j or not keep[j]:
                    continue
                ai, aj = areas[i], areas[j]
                if ai == 0 or aj == 0:
                    continue
                if ai <= aj:
                    ratio = ai / aj
                    if ratio <= self.SMALL_RATIO_MAX:
                        ioa_i_in_j = self._ioa(boxes[i], boxes[j])
                        if ioa_i_in_j >= self.SMALL_CONTAINED_IOA:
                            keep[i] = False
                            break
                else:
                    ratio = aj / ai
                    if ratio <= self.SMALL_RATIO_MAX:
                        ioa_j_in_i = self._ioa(boxes[j], boxes[i])
                        if ioa_j_in_i >= self.SMALL_CONTAINED_IOA:
                            keep[j] = False
        return [bb for bb, k in zip(boxes, keep) if k]

    def _assign_players_two_clusters(self, features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """K-means (k=2) on hue/saturation features to split players into two teams."""
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 20, 1.0)
        _, labels, centers = cv2.kmeans(
            np.float32(features),
            K=2,
            bestLabels=None,
            criteria=criteria,
            attempts=5,
            flags=cv2.KMEANS_PP_CENTERS,
        )
        return labels.reshape(-1), centers

    def _reclass_extra_goalkeepers(
        self,
        img_bgr: np.ndarray,
        boxes: List[BoundingBox],
        cluster_centers: Optional[np.ndarray],
    ) -> None:
        """Keep only the highest-confidence goalkeeper (class 1) and reassign
        the rest to a team class (6 or 7) by kit colour."""
        gk_idxs = [i for i, bb in enumerate(boxes) if int(bb.cls_id) == 1]
        if len(gk_idxs) <= 1:
            return
        gk_idxs_sorted = sorted(gk_idxs, key=lambda i: boxes[i].conf, reverse=True)
        to_reclass = gk_idxs_sorted[1:]
        for gki in to_reclass:
            hs_gk = self._hs_feature_from_roi(img_bgr, boxes[gki])
            if cluster_centers is not None:
                d0 = float(np.linalg.norm(hs_gk - cluster_centers[0]))
                d1 = float(np.linalg.norm(hs_gk - cluster_centers[1]))
                assign_cls = 6 if d0 <= d1 else 7
            else:
                assign_cls = 6 if float(hs_gk[0]) < self.SINGLE_PLAYER_HUE_PIVOT else 7
            boxes[gki].cls_id = int(assign_cls)

    def _multi_scale_detection(self, img_bgr: np.ndarray) -> List[BoundingBox]:
        """
        Multi-scale object detection for improved small-object recall.

        Runs the detector at several image scales and merges the results with
        a scale-aware NMS.
        """
        H, W = img_bgr.shape[:2]
        scales = [1.0, 1.2, 0.8]  # original, upscaled, downscaled
        all_detections: List[BoundingBox] = []

        for scale in scales:
            if scale != 1.0:
                new_h, new_w = int(H * scale), int(W * scale)
                # Skip scales that would make the image unreasonably large or small.
                if new_h > 2048 or new_w > 2048 or new_h < 320 or new_w < 320:
                    continue
                scaled_img = cv2.resize(img_bgr, (new_w, new_h))
            else:
                scaled_img = img_bgr

            # Run detection on the (possibly) scaled image.
            results = self.bbox_model.predict([scaled_img], verbose=False)
            if results and hasattr(results[0], "boxes") and results[0].boxes is not None:
                for box in results[0].boxes.data:
                    x1, y1, x2, y2, conf, cls_id = box.tolist()
                    # Map coordinates back to the original image size.
                    if scale != 1.0:
                        x1 /= scale
                        y1 /= scale
                        x2 /= scale
                        y2 /= scale
                    # Clip to the original image bounds.
                    x1, y1, x2, y2 = self._clip_box_to_image(x1, y1, x2, y2, W, H)
                    # Boost confidence for detections at their optimal scale:
                    # small objects benefit from upscaling, large ones from
                    # downscaling.
                    if scale == 1.2 and (x2 - x1) * (y2 - y1) < 2000:
                        conf *= 1.1
                    elif scale == 0.8 and (x2 - x1) * (y2 - y1) > 10000:
                        conf *= 1.05
                    all_detections.append(
                        BoundingBox(
                            x1=int(x1),
                            y1=int(y1),
                            x2=int(x2),
                            y2=int(y2),
                            cls_id=int(cls_id),
                            conf=float(conf),
                        )
                    )

        # Merge detections from all scales.
        return self._multi_scale_nms(all_detections)

    def _multi_scale_nms(
        self, boxes: List[BoundingBox], iou_threshold: float = 0.5
    ) -> List[BoundingBox]:
        """
        Scale-aware non-maximum suppression that keeps near-tied detections
        from different scales instead of suppressing them outright.
        """
        if not boxes:
            return []

        # Sort by confidence, highest first.
        boxes_sorted = sorted(boxes, key=lambda x: x.conf, reverse=True)
        keep: List[BoundingBox] = []

        while boxes_sorted:
            # Take the highest-confidence box.
            current = boxes_sorted.pop(0)
            keep.append(current)

            # Suppress heavily overlapping boxes unless their confidence is
            # very close to the current box's.
            remaining = []
            for box in boxes_sorted:
                if self._calculate_iou(current, box) < iou_threshold:
                    remaining.append(box)
                elif box.conf > current.conf * 0.9:
                    remaining.append(box)
            boxes_sorted = remaining

        return keep

    def _calculate_iou(self, box1: BoundingBox, box2: BoundingBox) -> float:
        """Intersection over Union (IoU) between two bounding boxes."""
        # Intersection rectangle.
        x1 = max(box1.x1, box2.x1)
        y1 = max(box1.y1, box2.y1)
        x2 = min(box1.x2, box2.x2)
        y2 = min(box1.y2, box2.y2)
        if x2 <= x1 or y2 <= y1:
            return 0.0
        intersection = (x2 - x1) * (y2 - y1)

        # Union of the two areas.
        area1 = (box1.x2 - box1.x1) * (box1.y2 - box1.y1)
        area2 = (box2.x2 - box2.x1) * (box2.y2 - box2.y1)
        union = area1 + area2 - intersection
        return intersection / union if union > 0 else 0.0

    def predict_batch(
        self,
        batch_images: List[ndarray],
        offset: int,
        n_keypoints: int,
        task_type: Optional[str] = None,
    ) -> List[TVFrameResult]:
        process_objects = task_type is None or task_type == "object"
        process_keypoints = task_type is None or task_type == "keypoint"

        bboxes: Dict[int, List[BoundingBox]] = {}
        if process_objects:
            # Multi-scale detection improves recall on small objects.
            for frame_idx_in_batch, img_bgr in enumerate(batch_images):
                boxes = self._multi_scale_detection(img_bgr)

                # Keep only the most confident football (class 0) per frame.
                footballs = [bb for bb in boxes if int(bb.cls_id) == 0]
                if len(footballs) > 1:
                    best_ball = max(footballs, key=lambda b: b.conf)
                    boxes = [bb for bb in boxes if int(bb.cls_id) != 0]
                    boxes.append(best_ball)

                # Remove redundant, contained boxes.
                boxes = self.suppress_quasi_total_containment(boxes)
                boxes = self.suppress_small_contained(boxes)

                # Team classification for players (class 2) via kit colour.
                player_indices: List[int] = []
                player_feats: List[np.ndarray] = []
                for i, bb in enumerate(boxes):
                    if int(bb.cls_id) == 2:
                        hs = self._hs_feature_from_roi(img_bgr, bb)
                        player_indices.append(i)
                        player_feats.append(hs)

                cluster_centers: Optional[np.ndarray] = None
                n_players = len(player_feats)
                if n_players >= 2:
                    feats = np.vstack(player_feats)
                    labels, centers = self._assign_players_two_clusters(feats)
                    # Order clusters by hue so team ids stay stable across frames.
                    order = np.argsort(centers[:, 0])
                    centers = centers[order]
                    remap = {old_idx: new_idx for new_idx, old_idx in enumerate(order)}
                    labels = np.vectorize(remap.get)(labels)
                    cluster_centers = centers
                    for idx_in_list, lbl in zip(player_indices, labels):
                        boxes[idx_in_list].cls_id = 6 if int(lbl) == 0 else 7
                elif n_players == 1:
                    hue, _ = player_feats[0]
                    boxes[player_indices[0]].cls_id = (
                        6 if float(hue) < self.SINGLE_PLAYER_HUE_PIVOT else 7
                    )

                self._reclass_extra_goalkeepers(img_bgr, boxes, cluster_centers)
                bboxes[offset + frame_idx_in_batch] = boxes

        keypoints: Dict[int, List[Tuple[int, int]]] = {}
        if process_keypoints:
            keypoints_model_results = self.keypoints_model.predict(batch_images, verbose=False)
        else:
            keypoints_model_results = None

        if keypoints_model_results is not None:
            for frame_idx_in_batch, detection in enumerate(keypoints_model_results):
                if not hasattr(detection, "keypoints") or detection.keypoints is None:
                    continue

                frame_keypoints_with_conf: List[Tuple[int, int, float]] = []
                for i, part_points in enumerate(detection.keypoints.data):
                    for k_id, (x, y, _) in enumerate(part_points):
                        confidence = float(detection.keypoints.conf[i][k_id])
                        frame_keypoints_with_conf.append((int(x), int(y), confidence))

                # Pad or truncate to exactly n_keypoints entries.
                if len(frame_keypoints_with_conf) < n_keypoints:
                    frame_keypoints_with_conf.extend(
                        [(0, 0, 0.0)] * (n_keypoints - len(frame_keypoints_with_conf))
                    )
                else:
                    frame_keypoints_with_conf = frame_keypoints_with_conf[:n_keypoints]

                # Zero out low-confidence keypoints; pitch-corner indices get a
                # more permissive threshold.
                filtered_keypoints: List[Tuple[int, int]] = []
                for idx, (x, y, confidence) in enumerate(frame_keypoints_with_conf):
                    threshold = 0.3 if idx in self.CORNER_INDICES else 0.5
                    if confidence < threshold:
                        filtered_keypoints.append((0, 0))
                    else:
                        filtered_keypoints.append((int(x), int(y)))
                keypoints[offset + frame_idx_in_batch] = filtered_keypoints

        results: List[TVFrameResult] = []
        for frame_number in range(offset, offset + len(batch_images)):
            results.append(
                TVFrameResult(
                    frame_id=frame_number,
                    boxes=bboxes.get(frame_number, []),
                    keypoints=keypoints.get(
                        frame_number,
                        [(0, 0) for _ in range(n_keypoints)],
                    ),
                )
            )
        return results
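# ---------------------------------------------------------------------------
# Hedged usage sketches. Everything below is illustrative only and not part of
# the original module: the demo function names, sample values, and file paths
# are assumptions chosen for the examples. Miner.__new__ is used to bypass
# __init__ so the geometry/clustering helpers can be exercised without model
# weights.
# ---------------------------------------------------------------------------


# A minimal sketch of the two-team split: k-means (k=2) on synthetic
# hue/saturation features standing in for kit colours.
def _demo_team_clustering() -> None:
    miner = Miner.__new__(Miner)  # no weights needed for this helper
    # Two well-separated kit colours: low-hue (red-ish) vs high-hue (blue-ish).
    feats = np.array(
        [[10.0, 200.0], [12.0, 190.0], [110.0, 180.0], [108.0, 185.0]],
        dtype=np.float32,
    )
    labels, centers = miner._assign_players_two_clusters(feats)
    print("labels:", labels, "centers:", centers)  # expect two clean clusters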
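# A minimal sketch of the containment suppression on synthetic boxes: the
# inner box is 9% of the outer box's area and lies fully inside it, so it
# exceeds SMALL_CONTAINED_IOA and is dropped.
def _demo_containment_suppression() -> None:
    miner = Miner.__new__(Miner)  # geometry-only demo, no weights required
    outer = BoundingBox(x1=0, y1=0, x2=100, y2=100, cls_id=2, conf=0.9)
    inner = BoundingBox(x1=10, y1=10, x2=40, y2=40, cls_id=2, conf=0.6)
    kept = miner.suppress_small_contained([outer, inner])
    print([(bb.x1, bb.y1, bb.x2, bb.y2) for bb in kept])  # only `outer` remains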
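# A minimal sketch of the IoU helper and the scale-aware NMS on synthetic
# detections, showing the near-tie exemption in action.
def _demo_scale_aware_nms() -> None:
    miner = Miner.__new__(Miner)
    a = BoundingBox(x1=0, y1=0, x2=50, y2=50, cls_id=2, conf=0.95)
    b = BoundingBox(x1=5, y1=5, x2=55, y2=55, cls_id=2, conf=0.94)
    print(f"IoU = {miner._calculate_iou(a, b):.2f}")  # ~0.68, above the 0.5 threshold
    # `b` overlaps `a` heavily, but its confidence is within 90% of `a`'s, so
    # the scale-aware NMS keeps both instead of suppressing `b`.
    print("kept:", len(miner._multi_scale_nms([a, b])))  # -> 2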
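# A minimal end-to-end sketch. The weights directory, video path, and
# n_keypoints value are all assumptions: any local folder holding
# objdetect.pt and keypointdetect.pt would do, and n_keypoints must match
# the keypoint model's output (greater than 29, since CORNER_INDICES
# references index 29).
if __name__ == "__main__":
    miner = Miner(Path("weights"))  # assumed local checkpoint directory
    cap = cv2.VideoCapture("match.mp4")  # hypothetical broadcast clip
    frames: List[ndarray] = []
    while len(frames) < 8:  # small batch for the demo
        ok, frame = cap.read()
        if not ok:
            break
        frames.append(frame)
    cap.release()
    if frames:
        for res in miner.predict_batch(frames, offset=0, n_keypoints=32):
            print(res.frame_id, len(res.boxes), len(res.keypoints))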