|
|
from pathlib import Path |
|
|
from typing import List, Tuple, Dict, Optional |
|
|
|
|
|
from ultralytics import YOLO |
|
|
from numpy import ndarray |
|
|
from pydantic import BaseModel |
|
|
import numpy as np |
|
|
import cv2 |
|
|
|
|
|
|
|
|
class BoundingBox(BaseModel):
    """One detected object: an axis-aligned pixel box, its class id and score."""

    # Box corners in pixels. The helpers in this file compute width as
    # ``x2 - x1`` and height as ``y2 - y1``, so (x1, y1) is expected to be
    # the top-left corner and (x2, y2) the bottom-right one.
    x1: int
    y1: int
    x2: int
    y2: int

    # Integer class label of the detection.
    cls_id: int

    # Detection confidence score.
    conf: float
|
|
|
|
|
|
|
|
class TVFrameResult(BaseModel):
    """Per-frame prediction: the frame number, its boxes and its keypoints."""

    # Absolute frame number of this result.
    frame_id: int

    # Object detections kept for this frame.
    boxes: List[BoundingBox]

    # (x, y) pixel keypoints; (0, 0) is used as the "no keypoint" placeholder.
    keypoints: List[Tuple[int, int]]
|
|
|
|
|
|
|
|
class Miner: |
|
|
|
|
|
# --- Containment suppression -------------------------------------------
# A box is dropped when at least this fraction of its own area lies inside
# another box.
QUASI_TOTAL_IOA: float = 0.88
# Same idea for "small inside big" pairs: minimum contained fraction ...
SMALL_CONTAINED_IOA: float = 0.82
# ... and the largest small/big area ratio for which that rule applies.
SMALL_RATIO_MAX: float = 0.55

# --- Team assignment ----------------------------------------------------
# Mean-hue pivot used to choose between class 6 and class 7 when no
# two-cluster split is available.
SINGLE_PLAYER_HUE_PIVOT: float = 90.0

# --- Keypoints ------------------------------------------------------------
# Keypoint indices that are accepted with a lower confidence threshold.
CORNER_INDICES = {0, 5, 24, 29}

# --- Multi-scale detection ------------------------------------------------
# Image scale factors the object detector is run at.
AGGRESSIVE_SCALES = [1.0, 1.3, 0.7, 1.1, 0.9]
# IoU below which two same-class detections are both kept by the NMS pass.
ENUMERATION_NMS_THRESHOLD = 0.4
# Confidence multiplier applied to small boxes found at the 1.3 scale.
SMALL_OBJECT_CONF_BOOST = 1.15
|
|
|
|
|
def __init__(self, path_hf_repo: Path) -> None:
    """Load the object-detection and keypoint-detection YOLO checkpoints.

    Args:
        path_hf_repo: Directory that contains ``objdetect.pt`` and
            ``keypointdetect.pt``.
    """
    detector_weights = path_hf_repo / "objdetect.pt"
    self.bbox_model = YOLO(detector_weights)
    print("BBox Model (objdetect.pt) Loaded")

    keypoint_weights = path_hf_repo / "keypointdetect.pt"
    self.keypoints_model = YOLO(keypoint_weights)
    print("Keypoints Model (keypointdetect.pt) Loaded")
|
|
|
|
|
def __repr__(self) -> str: |
|
|
return ( |
|
|
f"BBox Model: {type(self.bbox_model).__name__}\n" |
|
|
f"Keypoints Model: {type(self.keypoints_model).__name__}" |
|
|
) |
|
|
|
|
|
@staticmethod |
|
|
def _clip_box_to_image(x1: int, y1: int, x2: int, y2: int, w: int, h: int) -> Tuple[int, int, int, int]: |
|
|
x1 = max(0, min(int(x1), w - 1)) |
|
|
y1 = max(0, min(int(y1), h - 1)) |
|
|
x2 = max(0, min(int(x2), w - 1)) |
|
|
y2 = max(0, min(int(y2), h - 1)) |
|
|
if x2 <= x1: |
|
|
x2 = min(w - 1, x1 + 1) |
|
|
if y2 <= y1: |
|
|
y2 = min(h - 1, y1 + 1) |
|
|
return x1, y1, x2, y2 |
|
|
|
|
|
@staticmethod |
|
|
def _area(bb: BoundingBox) -> int: |
|
|
return max(0, bb.x2 - bb.x1) * max(0, bb.y2 - bb.y1) |
|
|
|
|
|
@staticmethod |
|
|
def _intersect_area(a: BoundingBox, b: BoundingBox) -> int: |
|
|
ix1 = max(a.x1, b.x1) |
|
|
iy1 = max(a.y1, b.y1) |
|
|
ix2 = min(a.x2, b.x2) |
|
|
iy2 = min(a.y2, b.y2) |
|
|
if ix2 <= ix1 or iy2 <= iy1: |
|
|
return 0 |
|
|
return (ix2 - ix1) * (iy2 - iy1) |
|
|
|
|
|
@staticmethod |
|
|
def _center(bb: BoundingBox) -> Tuple[float, float]: |
|
|
return (0.5 * (bb.x1 + bb.x2), 0.5 * (bb.y1 + bb.y2)) |
|
|
|
|
|
@staticmethod
def _mean_hs(img_bgr: np.ndarray) -> Tuple[float, float]:
    """Return the mean hue and mean saturation of a BGR image."""
    hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
    hue_plane = hsv[:, :, 0]
    saturation_plane = hsv[:, :, 1]
    return float(np.mean(hue_plane)), float(np.mean(saturation_plane))
|
|
|
|
|
def _hs_feature_from_roi(self, img_bgr: np.ndarray, box: BoundingBox) -> np.ndarray:
    """Describe the contents of a box by its mean hue and mean saturation.

    Pixels inside the HSV range ``[35, 60, 60]``-``[85, 255, 255]`` (the
    "green" range of this file) are left out of the average, provided enough
    other pixels remain; otherwise the whole crop is averaged.

    Args:
        img_bgr: Full frame in BGR order.
        box: Region to describe; it is clipped to the frame first.

    Returns:
        ``float32`` array ``[mean_hue, mean_saturation]``; zeros for an
        empty crop.
    """
    img_h, img_w = img_bgr.shape[:2]
    left, top, right, bottom = self._clip_box_to_image(
        box.x1, box.y1, box.x2, box.y2, img_w, img_h
    )
    patch = img_bgr[top:bottom, left:right]
    if patch.size == 0:
        return np.array([0.0, 0.0], dtype=np.float32)

    patch_hsv = cv2.cvtColor(patch, cv2.COLOR_BGR2HSV)
    green_lo = np.array([35, 60, 60], dtype=np.uint8)
    green_hi = np.array([85, 255, 255], dtype=np.uint8)
    is_green = cv2.inRange(patch_hsv, green_lo, green_hi)
    foreground = cv2.bitwise_not(is_green) > 0

    n_foreground = int(np.count_nonzero(foreground))
    n_pixels = patch_hsv.shape[0] * patch_hsv.shape[1]

    # Too few non-green pixels would give a noisy average, so fall back to
    # the statistics of the whole crop.
    if n_foreground <= max(50, n_pixels // 20):
        hue_mean, sat_mean = self._mean_hs(patch)
    else:
        hues = patch_hsv[:, :, 0][foreground]
        sats = patch_hsv[:, :, 1][foreground]
        hue_mean = float(np.mean(hues)) if hues.size else 0.0
        sat_mean = float(np.mean(sats)) if sats.size else 0.0

    return np.array([hue_mean, sat_mean], dtype=np.float32)
|
|
|
|
|
def _ioa(self, a: BoundingBox, b: BoundingBox) -> float: |
|
|
inter = self._intersect_area(a, b) |
|
|
aa = self._area(a) |
|
|
if aa <= 0: |
|
|
return 0.0 |
|
|
return inter / aa |
|
|
|
|
|
def suppress_quasi_total_containment(self, boxes: List[BoundingBox]) -> List[BoundingBox]: |
|
|
if len(boxes) <= 1: |
|
|
return boxes |
|
|
keep = [True] * len(boxes) |
|
|
for i in range(len(boxes)): |
|
|
if not keep[i]: |
|
|
continue |
|
|
for j in range(len(boxes)): |
|
|
if i == j or not keep[j]: |
|
|
continue |
|
|
ioa_i_in_j = self._ioa(boxes[i], boxes[j]) |
|
|
if ioa_i_in_j >= self.QUASI_TOTAL_IOA: |
|
|
keep[i] = False |
|
|
break |
|
|
return [bb for bb, k in zip(boxes, keep) if k] |
|
|
|
|
|
def suppress_small_contained(self, boxes: List[BoundingBox]) -> List[BoundingBox]: |
|
|
if len(boxes) <= 1: |
|
|
return boxes |
|
|
keep = [True] * len(boxes) |
|
|
areas = [self._area(bb) for bb in boxes] |
|
|
for i in range(len(boxes)): |
|
|
if not keep[i]: |
|
|
continue |
|
|
for j in range(len(boxes)): |
|
|
if i == j or not keep[j]: |
|
|
continue |
|
|
ai, aj = areas[i], areas[j] |
|
|
if ai == 0 or aj == 0: |
|
|
continue |
|
|
if ai <= aj: |
|
|
ratio = ai / aj |
|
|
if ratio <= self.SMALL_RATIO_MAX: |
|
|
ioa_i_in_j = self._ioa(boxes[i], boxes[j]) |
|
|
if ioa_i_in_j >= self.SMALL_CONTAINED_IOA: |
|
|
keep[i] = False |
|
|
break |
|
|
else: |
|
|
ratio = aj / ai |
|
|
if ratio <= self.SMALL_RATIO_MAX: |
|
|
ioa_j_in_i = self._ioa(boxes[j], boxes[i]) |
|
|
if ioa_j_in_i >= self.SMALL_CONTAINED_IOA: |
|
|
keep[j] = False |
|
|
return [bb for bb, k in zip(boxes, keep) if k] |
|
|
|
|
|
def _assign_players_two_clusters(self, features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Split feature rows into two groups with OpenCV k-means.

    Args:
        features: One feature row per sample; converted to ``float32``.

    Returns:
        ``(labels, centers)``: the flattened per-row cluster labels and the
        two cluster centres as returned by ``cv2.kmeans``.
    """
    samples = np.float32(features)
    # Stop after 20 iterations or once the centres move by less than 1.0.
    stop_rule = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 20, 1.0)
    _compactness, labels, centers = cv2.kmeans(
        samples,
        K=2,
        bestLabels=None,
        criteria=stop_rule,
        attempts=5,
        flags=cv2.KMEANS_PP_CENTERS,
    )
    return labels.reshape(-1), centers
|
|
|
|
|
def _reclass_extra_goalkeepers( |
|
|
self, |
|
|
img_bgr: np.ndarray, |
|
|
boxes: List[BoundingBox], |
|
|
cluster_centers: Optional[np.ndarray], |
|
|
) -> None: |
|
|
gk_idxs = [i for i, bb in enumerate(boxes) if int(bb.cls_id) == 1] |
|
|
if len(gk_idxs) <= 1: |
|
|
return |
|
|
gk_idxs_sorted = sorted(gk_idxs, key=lambda i: boxes[i].conf, reverse=True) |
|
|
keep_gk_idx = gk_idxs_sorted[0] |
|
|
to_reclass = gk_idxs_sorted[1:] |
|
|
for gki in to_reclass: |
|
|
hs_gk = self._hs_feature_from_roi(img_bgr, boxes[gki]) |
|
|
if cluster_centers is not None: |
|
|
d0 = float(np.linalg.norm(hs_gk - cluster_centers[0])) |
|
|
d1 = float(np.linalg.norm(hs_gk - cluster_centers[1])) |
|
|
assign_cls = 6 if d0 <= d1 else 7 |
|
|
else: |
|
|
assign_cls = 6 if float(hs_gk[0]) < self.SINGLE_PLAYER_HUE_PIVOT else 7 |
|
|
boxes[gki].cls_id = int(assign_cls) |
|
|
|
|
|
def _aggressive_multi_scale_detection(self, img_bgr: np.ndarray) -> List[BoundingBox]:
    """
    Run the object detector at every scale in ``AGGRESSIVE_SCALES`` and merge
    the detections.

    For each scale the frame is resized (scale 1.0 uses the frame as is),
    the detector is run, and every box is mapped back to original-frame
    coordinates, clipped to the frame, and has its confidence multiplied by
    scale- and size-dependent boost factors. The pooled detections are then
    reduced by ``_enumeration_optimized_nms``.

    Scales whose resized frame would have a side above 2560 px or below
    256 px are skipped.

    Note: the boosts are plain multipliers, so a returned ``conf`` can be
    greater than 1.0.

    Args:
        img_bgr: Frame in BGR order.

    Returns:
        Merged detections in original-frame pixel coordinates.
    """
    H, W = img_bgr.shape[:2]
    all_detections: List[BoundingBox] = []

    for scale in self.AGGRESSIVE_SCALES:
        if scale == 1.0:
            scaled_img = img_bgr
            ratio_x = ratio_y = 1.0
        else:
            new_h, new_w = int(H * scale), int(W * scale)
            if new_h > 2560 or new_w > 2560 or new_h < 256 or new_w < 256:
                continue
            scaled_img = cv2.resize(img_bgr, (new_w, new_h))
            # ``int()`` truncation means the resized frame is not exactly
            # ``scale`` times the original, so map boxes back with the real
            # resize ratios instead of dividing by the nominal scale.
            ratio_x, ratio_y = W / new_w, H / new_h

        results = self.bbox_model.predict([scaled_img], verbose=False)
        frame_boxes = getattr(results[0], "boxes", None) if results else None
        if frame_boxes is None:
            continue

        for row in frame_boxes.data:
            x1, y1, x2, y2, conf, cls_id = row.tolist()

            x1, y1, x2, y2 = self._clip_box_to_image(
                x1 * ratio_x, y1 * ratio_y, x2 * ratio_x, y2 * ratio_y, W, H
            )
            box_area = (x2 - x1) * (y2 - y1)

            # Up-scaled passes boost small boxes, down-scaled passes boost
            # large ones.
            if scale == 1.3 and box_area < 1500:
                conf *= self.SMALL_OBJECT_CONF_BOOST
            elif scale == 1.1 and box_area < 3000:
                conf *= 1.10
            elif scale == 0.7 and box_area > 15000:
                conf *= 1.08
            elif scale == 0.9 and box_area > 8000:
                conf *= 1.05

            # Extra boost for very small boxes, applied at every scale and
            # stacking with the one above.
            if box_area < 1000:
                conf *= 1.12

            all_detections.append(
                BoundingBox(
                    x1=int(x1),
                    y1=int(y1),
                    x2=int(x2),
                    y2=int(y2),
                    cls_id=int(cls_id),
                    conf=float(conf),
                )
            )

    return self._enumeration_optimized_nms(all_detections)
|
|
|
|
|
def _enumeration_optimized_nms(self, boxes: List[BoundingBox]) -> List[BoundingBox]: |
|
|
""" |
|
|
Enumeration-optimized NMS with lower threshold to preserve more detections. |
|
|
""" |
|
|
if not boxes: |
|
|
return [] |
|
|
|
|
|
|
|
|
boxes_by_class = {} |
|
|
for box in boxes: |
|
|
if box.cls_id not in boxes_by_class: |
|
|
boxes_by_class[box.cls_id] = [] |
|
|
boxes_by_class[box.cls_id].append(box) |
|
|
|
|
|
final_boxes = [] |
|
|
|
|
|
for cls_id, class_boxes in boxes_by_class.items(): |
|
|
|
|
|
class_boxes_sorted = sorted(class_boxes, key=lambda x: x.conf, reverse=True) |
|
|
keep = [] |
|
|
|
|
|
while class_boxes_sorted: |
|
|
|
|
|
current = class_boxes_sorted.pop(0) |
|
|
keep.append(current) |
|
|
|
|
|
|
|
|
remaining = [] |
|
|
for box in class_boxes_sorted: |
|
|
iou = self._calculate_iou(current, box) |
|
|
if iou < self.ENUMERATION_NMS_THRESHOLD: |
|
|
remaining.append(box) |
|
|
elif box.conf > current.conf * 0.95: |
|
|
remaining.append(box) |
|
|
|
|
|
class_boxes_sorted = remaining |
|
|
|
|
|
final_boxes.extend(keep) |
|
|
|
|
|
return final_boxes |
|
|
|
|
|
def _calculate_iou(self, box1: BoundingBox, box2: BoundingBox) -> float: |
|
|
"""Calculate Intersection over Union (IoU) between two bounding boxes.""" |
|
|
|
|
|
x1 = max(box1.x1, box2.x1) |
|
|
y1 = max(box1.y1, box2.y1) |
|
|
x2 = min(box1.x2, box2.x2) |
|
|
y2 = min(box1.y2, box2.y2) |
|
|
|
|
|
if x2 <= x1 or y2 <= y1: |
|
|
return 0.0 |
|
|
|
|
|
intersection = (x2 - x1) * (y2 - y1) |
|
|
|
|
|
|
|
|
area1 = (box1.x2 - box1.x1) * (box1.y2 - box1.y1) |
|
|
area2 = (box2.x2 - box2.x1) * (box2.y2 - box2.y1) |
|
|
union = area1 + area2 - intersection |
|
|
|
|
|
return intersection / union if union > 0 else 0.0 |
|
|
|
|
|
def predict_batch(
    self,
    batch_images: List[ndarray],
    offset: int,
    n_keypoints: int,
    task_type: Optional[str] = None,
) -> List[TVFrameResult]:
    """Run object and/or keypoint prediction on a batch of frames.

    Args:
        batch_images: Frames in BGR order.
        offset: Frame number of the first image; image ``i`` is reported as
            frame ``offset + i``.
        n_keypoints: Number of keypoints each result must carry.
        task_type: ``None`` runs both tasks, ``"object"`` only object
            detection, ``"keypoint"`` only keypoint detection. Any other
            value runs neither, so every frame comes back with no boxes and
            all-zero keypoints.

    Returns:
        One ``TVFrameResult`` per input image, in input order. Frames with
        no object result get an empty box list; frames with no keypoint
        result get ``n_keypoints`` ``(0, 0)`` placeholders.
    """
    process_objects = task_type is None or task_type == "object"
    process_keypoints = task_type is None or task_type == "keypoint"

    bboxes: Dict[int, List[BoundingBox]] = {}
    if process_objects:
        for frame_idx_in_batch, img_bgr in enumerate(batch_images):
            bboxes[offset + frame_idx_in_batch] = self._detect_frame_objects(img_bgr)

    keypoints: Dict[int, List[Tuple[int, int]]] = {}
    if process_keypoints:
        # verbose=False keeps this call as quiet as the bbox model call.
        keypoints_model_results = self.keypoints_model.predict(batch_images, verbose=False)
        if keypoints_model_results is not None:
            for frame_idx_in_batch, detection in enumerate(keypoints_model_results):
                if getattr(detection, "keypoints", None) is None:
                    continue
                keypoints[offset + frame_idx_in_batch] = self._extract_frame_keypoints(
                    detection, n_keypoints
                )

    return [
        TVFrameResult(
            frame_id=frame_number,
            boxes=bboxes.get(frame_number, []),
            keypoints=keypoints.get(frame_number, [(0, 0)] * n_keypoints),
        )
        for frame_number in range(offset, offset + len(batch_images))
    ]

def _detect_frame_objects(self, img_bgr: np.ndarray) -> List[BoundingBox]:
    """Detect, de-duplicate and relabel the objects of a single frame."""
    boxes = self._aggressive_multi_scale_detection(img_bgr)

    # Keep a single class-0 box (the ball in this file): the most confident.
    footballs = [bb for bb in boxes if int(bb.cls_id) == 0]
    if len(footballs) > 1:
        best_ball = max(footballs, key=lambda b: b.conf)
        boxes = [bb for bb in boxes if int(bb.cls_id) != 0]
        boxes.append(best_ball)

    boxes = self.suppress_quasi_total_containment(boxes)
    boxes = self.suppress_small_contained(boxes)

    cluster_centers = self._assign_player_teams(img_bgr, boxes)
    self._reclass_extra_goalkeepers(img_bgr, boxes, cluster_centers)
    return boxes

def _assign_player_teams(
    self, img_bgr: np.ndarray, boxes: List[BoundingBox]
) -> Optional[np.ndarray]:
    """Relabel class-2 boxes as class 6 or 7 by colour; return the cluster centres.

    With two or more class-2 boxes their hue/saturation features are split
    into two clusters, ordered by centre hue: the lower-hue cluster becomes
    class 6, the other class 7. A lone class-2 box is classified against
    ``SINGLE_PLAYER_HUE_PIVOT``. ``boxes`` is mutated in place.

    Returns:
        The two centres sorted by hue, or ``None`` when no clustering ran.
    """
    player_indices: List[int] = []
    player_feats: List[np.ndarray] = []
    for i, bb in enumerate(boxes):
        if int(bb.cls_id) == 2:
            player_indices.append(i)
            player_feats.append(self._hs_feature_from_roi(img_bgr, bb))

    if len(player_feats) >= 2:
        labels, centers = self._assign_players_two_clusters(np.vstack(player_feats))
        order = np.argsort(centers[:, 0])
        centers = centers[order]
        # ``order`` maps new rank -> old label; its inverse permutation maps
        # each old label to its hue rank.
        rank_of_label = np.argsort(order)
        labels = rank_of_label[labels]
        for idx_in_list, lbl in zip(player_indices, labels):
            boxes[idx_in_list].cls_id = 6 if int(lbl) == 0 else 7
        return centers

    if len(player_feats) == 1:
        hue, _ = player_feats[0]
        boxes[player_indices[0]].cls_id = (
            6 if float(hue) < self.SINGLE_PLAYER_HUE_PIVOT else 7
        )
    return None

def _extract_frame_keypoints(self, detection, n_keypoints: int) -> List[Tuple[int, int]]:
    """Turn one keypoint-model result into exactly ``n_keypoints`` points.

    Keypoints of all detected instances are concatenated in order, then
    padded with zeros or truncated to ``n_keypoints``. A point whose
    confidence is below its threshold (0.3 for ``CORNER_INDICES``, 0.5
    otherwise) is replaced by ``(0, 0)``.
    """
    scored: List[Tuple[int, int, float]] = []
    for i, part_points in enumerate(detection.keypoints.data):
        for k_id, (x, y, _) in enumerate(part_points):
            confidence = float(detection.keypoints.conf[i][k_id])
            scored.append((int(x), int(y), confidence))

    if len(scored) < n_keypoints:
        scored.extend([(0, 0, 0.0)] * (n_keypoints - len(scored)))
    else:
        scored = scored[:n_keypoints]

    filtered: List[Tuple[int, int]] = []
    for idx, (x, y, confidence) in enumerate(scored):
        min_conf = 0.3 if idx in self.CORNER_INDICES else 0.5
        filtered.append((0, 0) if confidence < min_conf else (x, y))
    return filtered