| | from pathlib import Path |
| | from ultralytics import YOLO |
| | from numpy import ndarray |
| | from pydantic import BaseModel |
| | from typing import List, Tuple, Optional |
| | import numpy as np |
| | import cv2 |
| | from sklearn.cluster import KMeans |
| |
|
| |
|
| |
|
| | |
| | |
| | |
| |
|
def get_grass_color(img: np.ndarray) -> Tuple[float, float, float]:
    """Estimate the dominant green (grass) color of a BGR image.

    The image is converted to HSV, pixels inside a fixed green band are
    selected, and the mean of those pixels is taken in the original color
    space of ``img``.

    Args:
        img: Image array that is interpreted as BGR (it is passed to
            ``cv2.cvtColor`` with ``cv2.COLOR_BGR2HSV``).

    Returns:
        The first three channel means reported by ``cv2.mean`` over the
        selected pixels, in the channel order of ``img``. The values are
        floats, not ints. When no pixel falls inside the green band the
        mask is empty and ``cv2.mean`` reports zeros for every channel.
    """
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    # Fixed HSV band treated as "green": hue 30..80 on OpenCV's 8-bit hue
    # scale, with saturation and value of at least 40.
    green_lower = np.array([30, 40, 40])
    green_upper = np.array([80, 255, 255])
    green_mask = cv2.inRange(hsv, green_lower, green_upper)
    channel_means = cv2.mean(img, mask=green_mask)
    # cv2.mean returns four values; only the first three are color channels.
    return channel_means[:3]
| |
|
| |
|
def get_players_boxes(result):
    """Collect image crops and integer boxes for every 'Player' detection.

    Only detections whose class id is 0 are considered. Detector class ids:
        0: 'Player', 1: 'GoalKeeper', 2: 'Ball', 3: 'Main Referee',
        4: 'Side Referee', 5: 'Staff Member', 6: 'left team', 7: 'right team'

    Args:
        result: Detection result exposing ``boxes`` (each with ``cls`` and
            ``xyxy`` tensors) and ``orig_img`` (the source image array).

    Returns:
        A pair ``(crops, boxes)`` of equal length: the cropped image regions
        and their ``(x1, y1, x2, y2)`` integer coordinates. Detections whose
        crop contains no pixels are left out of both lists.
    """
    crops = []
    coords = []
    for detection in result.boxes:
        class_id = int(detection.cls.cpu().numpy()[0])
        if class_id != 0:
            continue
        # Coordinates are truncated to int so they can be used as slice bounds.
        left, top, right, bottom = (int(v) for v in detection.xyxy[0].cpu().numpy())
        patch = result.orig_img[top:bottom, left:right]
        if patch.size == 0:
            continue
        crops.append(patch)
        coords.append((left, top, right, bottom))
    return crops, coords
| |
|
| |
|
def get_kits_colors(players, grass_hsv=None, frame=None):
    """Compute one average kit color per player crop.

    For each crop, pixels whose hue lies within +/-10 of the grass hue (and
    whose saturation and value are at least 40) are excluded, only the upper
    half of the crop is kept, and the remaining pixels are averaged.

    Args:
        players: Iterable of player crops, interpreted as BGR images.
        grass_hsv: Grass color as a 1x1 HSV image (indexed as
            ``grass_hsv[0, 0, 0]`` for the hue). When ``None`` it is
            estimated from ``frame``.
        frame: Full BGR frame; only used when ``grass_hsv`` is ``None``.

    Returns:
        A list with one ``np.ndarray`` of three channel means per crop, in
        the channel order of the crop. A crop with no remaining pixels after
        masking yields zeros, as reported by ``cv2.mean``.

    Raises:
        ValueError: If both ``grass_hsv`` and ``frame`` are ``None``.
    """
    if grass_hsv is None:
        if frame is None:
            raise ValueError("get_kits_colors needs either grass_hsv or frame")
        grass_color = get_grass_color(frame)
        grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)

    # Convert to a Python int before subtracting: the HSV pixel is uint8, so
    # `hue - 10` would wrap around for hue < 10 and produce an inverted band.
    grass_hue = int(grass_hsv[0, 0, 0])
    # OpenCV's 8-bit hue range is 0..179; clamp the band to it.
    lower_green = np.array([max(grass_hue - 10, 0), 40, 40])
    upper_green = np.array([min(grass_hue + 10, 179), 255, 255])

    kits_colors = []
    for player_img in players:
        hsv = cv2.cvtColor(player_img, cv2.COLOR_BGR2HSV)
        # Everything that is NOT grass-colored.
        not_grass = cv2.bitwise_not(cv2.inRange(hsv, lower_green, upper_green))
        # Restrict to the top half of the crop.
        upper_half = np.zeros(player_img.shape[:2], np.uint8)
        upper_half[: player_img.shape[0] // 2, :] = 255
        kit_mask = cv2.bitwise_and(not_grass, upper_half)
        kits_colors.append(np.array(cv2.mean(player_img, mask=kit_mask)[:3]))
    return kits_colors
| |
|
| |
|
| | |
| | |
| | |
| |
|
class BoundingBox(BaseModel):
    """One detected object: box corners, output class id and confidence.

    ``Miner.predict_batch`` fills the corners from the detector's ``xyxy``
    output cast to ``int`` and stores the remapped output class in
    ``cls_id``.
    """

    x1: int  # x of the first corner (xyxy[0])
    y1: int  # y of the first corner (xyxy[1])
    x2: int  # x of the second corner (xyxy[2])
    y2: int  # y of the second corner (xyxy[3])
    # Output class id. As written by Miner.predict_batch: 0 for detector
    # class 'Ball', 1 for 'GoalKeeper', 2 for a 'Player' without a team
    # prediction, 3 for 'Main Referee' / 'Side Referee', 6 for a player on
    # the left team and 7 for a player on the other team.
    cls_id: int
    conf: float  # detection confidence taken from the detector's box.conf
| |
|
| |
|
class TVFrameResult(BaseModel):
    """Per-frame output: detections plus a fixed-length keypoint list."""

    frame_id: int  # batch offset plus the frame's position in the batch
    boxes: list[BoundingBox]  # detections kept for this frame
    # (x, y) integer points; Miner.predict_batch writes (0, 0) for entries
    # that are missing or below the confidence threshold.
    keypoints: list[Tuple[int, int]]
| |
|
| |
|
| | |
| | |
| | |
| |
|
class Miner:
    """Main class for the sn44-compatible inference pipeline.

    Combines a YOLO detector, a YOLO keypoints (pose) model and a
    two-cluster KMeans over kit colors (with HSV grass masking) that
    assigns players to a left or right team.

    Detector class ids (model side):
        0: 'Player', 1: 'GoalKeeper', 2: 'Ball', 3: 'Main Referee',
        4: 'Side Referee', 5: 'Staff Member', 6: 'left team', 7: 'right team'

    Output class ids written to ``BoundingBox.cls_id``:
        0 ball, 1 goalkeeper, 2 player without a team prediction,
        3 referee (main or side), 6 left-team player, 7 right-team player.
    """

    # Keypoint indices that are accepted from a lower confidence than the
    # rest (see _detect_keypoints).
    CORNER_INDICES = {0, 5, 24, 29}

    _CORNER_CONF_THRESHOLD = 0.3
    _KEYPOINT_CONF_THRESHOLD = 0.5
    _DETECTION_CONF = 0.2
    _PLAYER_CLS = 0
    _LEFT_TEAM_CLS = 6
    _RIGHT_TEAM_CLS = 7
    # Detector class id -> output class id for boxes without a team
    # prediction. Detector ids missing from this table are dropped.
    _CLASS_REMAP = {0: 2, 1: 1, 2: 0, 3: 3, 4: 3}

    def __init__(
        self,
        path_hf_repo: Path,
    ) -> None:
        """Load both YOLO models from the repository directory.

        Args:
            path_hf_repo: Path to the HuggingFace repo checkout that contains
                the detector and keypoint weight files.
        """
        self.bbox_model = YOLO(path_hf_repo / "251110-football-detection.pt")
        print("✅ BBox Model Loaded")
        self.keypoints_model = YOLO(path_hf_repo / "17112025_keypoint.pt")
        print("✅ Keypoints Model (Pose) Loaded")

        # Team classifier state; populated by fit_team_classifier.
        self.team_kmeans = None
        self.left_team_label = 0
        self.grass_hsv = None
        self.team_classifier_fitted = False

    def __repr__(self) -> str:
        return (
            f"BBox Model: {type(self.bbox_model).__name__}\n"
            f"Keypoints Model: {type(self.keypoints_model).__name__}\n"
            "Team Clustering: HSV + KMeans"
        )

    def fit_team_classifier(self, frame: np.ndarray) -> None:
        """Fit the two-cluster KMeans team classifier on one frame.

        On success this sets ``team_kmeans``, ``grass_hsv``,
        ``team_classifier_fitted`` and, when both clusters are populated,
        ``left_team_label``. When fewer than two player crops are found the
        method prints a warning and leaves the state untouched, so the next
        frame can retry.

        Args:
            frame: BGR image to detect players on.
        """
        result = self.bbox_model(frame, conf=self._DETECTION_CONF, verbose=False)[0]
        players_imgs, players_boxes = get_players_boxes(result)
        if len(players_imgs) == 0:
            print("⚠️ No players found for team fitting.")
            return

        # Estimate the grass color once and reuse it for masking and for
        # later frames.
        grass_color = get_grass_color(frame)
        grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)
        kits_colors = get_kits_colors(players_imgs, grass_hsv, frame)

        if len(kits_colors) < 2:
            # Message (Vietnamese): only N players found, not enough to split
            # into 2 teams; skipping the fit.
            print(f"⚠️ Chỉ tìm thấy {len(kits_colors)} cầu thủ, không đủ để phân thành 2 đội. Bỏ qua việc fit.")
            return

        team_kmeans = KMeans(n_clusters=2, random_state=42)
        team_kmeans.fit(kits_colors)
        self.team_kmeans = team_kmeans
        self.grass_hsv = grass_hsv
        self.team_classifier_fitted = True
        print(f"✅ Team KMeans fitted on {len(kits_colors)} players")

        # The cluster whose boxes have the smaller mean left edge (x1) is
        # taken as the left team.
        team_assignments = team_kmeans.predict(kits_colors)
        team_0_x = [box[0] for box, team in zip(players_boxes, team_assignments) if team == 0]
        team_1_x = [box[0] for box, team in zip(players_boxes, team_assignments) if team == 1]
        if team_0_x and team_1_x:
            self.left_team_label = 0 if np.mean(team_0_x) < np.mean(team_1_x) else 1
            print(f"🏳️ Left team label: {self.left_team_label}")

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        """Run detection and keypoint prediction on a batch of frames.

        The team classifier is fitted lazily on the first frame where a fit
        succeeds.

        Args:
            batch_images: List of image arrays (numpy).
            offset: Frame id assigned to the first image of the batch.
            n_keypoints: Number of keypoints each result must contain.

        Returns:
            One ``TVFrameResult`` per input image, in input order.
        """
        results: list[TVFrameResult] = []
        for position, frame in enumerate(batch_images):
            if not self.team_classifier_fitted:
                self.fit_team_classifier(frame)

            results.append(
                TVFrameResult(
                    frame_id=offset + position,
                    boxes=self._detect_boxes(frame),
                    keypoints=self._detect_keypoints(frame, n_keypoints),
                )
            )
        return results

    @classmethod
    def _collect_player_crops(cls, result):
        """Return (box indices, crops) for player detections with a non-empty crop.

        Indices and crops are collected together so that the i-th team
        prediction always refers to the i-th returned box index, even when
        some player boxes produce an empty crop and are skipped.
        """
        indices, crops = [], []
        for box_idx, box in enumerate(result.boxes):
            if int(box.cls.cpu().numpy()[0]) != cls._PLAYER_CLS:
                continue
            x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
            crop = result.orig_img[y1:y2, x1:x2]
            if crop.size > 0:
                indices.append(box_idx)
                crops.append(crop)
        return indices, crops

    def _predict_team_classes(self, bbox_result, frame) -> dict:
        """Map box index -> output team class (6 or 7) for classifiable players."""
        if self.team_kmeans is None:
            # No fitted classifier: skip the kit-color work, nothing to predict.
            return {}
        player_indices, player_crops = self._collect_player_crops(bbox_result)
        if not player_crops:
            return {}
        kits_colors = get_kits_colors(player_crops, self.grass_hsv, frame)
        teams = self.team_kmeans.predict(kits_colors)
        return {
            box_idx: (
                self._LEFT_TEAM_CLS
                if team_id == self.left_team_label
                else self._RIGHT_TEAM_CLS
            )
            for box_idx, team_id in zip(player_indices, teams)
        }

    def _detect_boxes(self, frame) -> list:
        """Detect objects on one frame and return them as BoundingBox items."""
        bbox_result = self.bbox_model(frame, conf=self._DETECTION_CONF, verbose=False)[0]
        boxes = []
        if not bbox_result or bbox_result.boxes is None:
            return boxes

        team_classes = self._predict_team_classes(bbox_result, frame)
        for box_idx, box in enumerate(bbox_result.boxes):
            x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
            conf = float(box.conf.cpu().numpy()[0])
            model_cls = int(box.cls.cpu().numpy()[0])

            if box_idx in team_classes:
                cls_id = team_classes[box_idx]
            else:
                cls_id = self._CLASS_REMAP.get(model_cls)
                if cls_id is None:
                    # Detector classes without an output mapping are dropped.
                    continue

            boxes.append(
                BoundingBox(x1=x1, y1=y1, x2=x2, y2=y2, cls_id=cls_id, conf=conf)
            )
        return boxes

    def _detect_keypoints(self, frame, n_keypoints: int) -> List[Tuple[int, int]]:
        """Predict keypoints on one frame as exactly n_keypoints (x, y) pairs.

        Keypoints of all detected instances are concatenated in order, then
        padded with (0, 0) or truncated to ``n_keypoints``. Entries below
        their confidence threshold are replaced by (0, 0).
        """
        keypoints_result = self.keypoints_model(frame, verbose=False)[0]
        if not (
            keypoints_result
            and hasattr(keypoints_result, "keypoints")
            and keypoints_result.keypoints is not None
        ):
            return [(0, 0)] * n_keypoints

        scored_points: List[Tuple[int, int, float]] = []
        for instance_idx, part_points in enumerate(keypoints_result.keypoints.data):
            for kp_idx, (x, y, _) in enumerate(part_points):
                confidence = float(keypoints_result.keypoints.conf[instance_idx][kp_idx])
                scored_points.append((int(x), int(y), confidence))

        missing = n_keypoints - len(scored_points)
        if missing > 0:
            scored_points.extend([(0, 0, 0.0)] * missing)
        else:
            scored_points = scored_points[:n_keypoints]

        filtered: List[Tuple[int, int]] = []
        for kp_idx, (x, y, confidence) in enumerate(scored_points):
            threshold = (
                self._CORNER_CONF_THRESHOLD
                if kp_idx in self.CORNER_INDICES
                else self._KEYPOINT_CONF_THRESHOLD
            )
            filtered.append((0, 0) if confidence < threshold else (x, y))
        return filtered