# ultravision-01 / miner.py
# Uploaded by mohantesting via the upload-large-folder tool (commit f60a6c1, verified).
from pathlib import Path
from typing import Generator, Iterable, List, TypeVar, Tuple, Dict, Literal, Optional
from ultralytics import YOLO
from numpy import ndarray
from pydantic import BaseModel
import numpy as np
import torch
import torchvision.transforms as T
from sklearn.cluster import KMeans
import torchvision.models as models
import cv2
# ============================================================================
# Utility Functions for Batching
# ============================================================================
V = TypeVar("V")
def create_batches(
    sequence: Iterable[V], batch_size: int
) -> Generator[List[V], None, None]:
    """
    Yield consecutive batches of at most ``batch_size`` elements.

    Args:
        sequence (Iterable[V]): Input elements to group.
        batch_size (int): Desired batch length; values below 1 are treated as 1.

    Yields:
        List[V]: Successive chunks of the input; the final chunk may be
        shorter than ``batch_size``.
    """
    size = max(batch_size, 1)
    bucket: List[V] = []
    for item in sequence:
        bucket.append(item)
        if len(bucket) == size:
            yield bucket
            bucket = []
    if bucket:
        yield bucket
# ============================================================================
# Team Classification using HSV Color Space
# ============================================================================
class HSVTeamClassifier:
"""
Enhanced HSV-based team classifier with temporal consistency and confidence weighting.
Fast and lightweight, suitable for real-time processing.
"""
def __init__(self, hue_pivot: float = 90.0, temporal_weight: float = 0.3):
"""
Initialize HSV-based team classifier.
Args:
hue_pivot: Hue threshold for single player classification (default: 90.0)
temporal_weight: Weight for temporal consistency (0.0-1.0)
"""
self.hue_pivot = hue_pivot
self.temporal_weight = temporal_weight
self.cluster_centers: np.ndarray | None = None
self.previous_assignments: Dict[int, int] = {} # bbox_id -> team_id
self.assignment_confidence: Dict[int, float] = {} # bbox_id -> confidence
@staticmethod
def _extract_hsv_features_from_crop(img_bgr: np.ndarray) -> Tuple[float, float]:
"""
Extract mean hue and saturation from an image crop.
Args:
img_bgr: BGR image crop
Returns:
Tuple of (mean_hue, mean_saturation)
"""
if img_bgr.size == 0:
return (0.0, 0.0)
hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
mean_hue = float(np.mean(hsv[:, :, 0]))
mean_saturation = float(np.mean(hsv[:, :, 1]))
return (mean_hue, mean_saturation)
def _extract_hsv_features_with_green_filter(
self, img_bgr: np.ndarray, box, img_width: int, img_height: int
) -> np.ndarray:
"""
Extract HSV features from ROI, filtering out green (grass) pixels.
Args:
img_bgr: Full frame image
box: Bounding box to extract ROI from
img_width, img_height: Image dimensions
Returns:
Array of [hue, saturation] features
"""
x1, y1, x2, y2 = Miner._clip_box_to_image(
box.x1, box.y1, box.x2, box.y2, img_width, img_height
)
roi = img_bgr[y1:y2, x1:x2]
if roi.size == 0:
return np.array([0.0, 0.0], dtype=np.float32)
hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
# Filter out green pixels (grass)
lower_green = np.array([35, 60, 60], dtype=np.uint8)
upper_green = np.array([85, 255, 255], dtype=np.uint8)
green_mask = cv2.inRange(hsv, lower_green, upper_green)
non_green_mask = cv2.bitwise_not(green_mask)
num_non_green = int(np.count_nonzero(non_green_mask))
total_pixels = hsv.shape[0] * hsv.shape[1]
# If enough non-green pixels, use only those
if num_non_green > max(50, total_pixels // 20):
h_vals = hsv[:, :, 0][non_green_mask > 0]
s_vals = hsv[:, :, 1][non_green_mask > 0]
h_mean = float(np.mean(h_vals)) if h_vals.size else 0.0
s_mean = float(np.mean(s_vals)) if s_vals.size else 0.0
else:
# Use all pixels if too few non-green
h_mean, s_mean = self._extract_hsv_features_from_crop(roi)
return np.array([h_mean, s_mean], dtype=np.float32)
def _cluster_players_hsv(
self, hsv_features: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""
Cluster players into two teams using K-means on HSV features.
Args:
hsv_features: Array of HSV features (N, 2)
Returns:
Tuple of (labels, cluster_centers)
"""
if len(hsv_features) < 2:
return np.array([]), np.array([])
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 20, 1.0)
_, labels, centers = cv2.kmeans(
np.float32(hsv_features),
K=2,
bestLabels=None,
criteria=criteria,
attempts=5,
flags=cv2.KMEANS_PP_CENTERS,
)
# Sort centers by hue for consistent team assignment
order = np.argsort(centers[:, 0])
centers_sorted = centers[order]
remap = {old_idx: new_idx for new_idx, old_idx in enumerate(order)}
labels_remapped = np.vectorize(remap.get)(labels.reshape(-1))
return labels_remapped, centers_sorted
def _calculate_bbox_similarity(self, box1, box2) -> float:
"""Calculate similarity between two bounding boxes based on center distance."""
center1 = ((box1.x1 + box1.x2) / 2, (box1.y1 + box1.y2) / 2)
center2 = ((box2.x1 + box2.x2) / 2, (box2.y1 + box2.y2) / 2)
distance = np.sqrt((center1[0] - center2[0])**2 + (center1[1] - center2[1])**2)
# Normalize by image diagonal (assuming 1920x1080)
max_distance = np.sqrt(1920**2 + 1080**2)
return max(0, 1 - distance / max_distance)
def _apply_temporal_consistency(
self,
current_labels: np.ndarray,
boxes: List,
hsv_features: np.ndarray
) -> np.ndarray:
"""Apply temporal consistency to reduce team assignment flickering."""
if not self.previous_assignments:
return current_labels
adjusted_labels = current_labels.copy()
for i, (box, current_label) in enumerate(zip(boxes, current_labels)):
best_match_id = None
best_similarity = 0.0
# Find best matching previous bbox
for prev_id, prev_team in self.previous_assignments.items():
# In real implementation, you'd track bbox IDs across frames
# For now, use position-based matching
similarity = 0.8 # Placeholder - would use actual bbox tracking
if similarity > best_similarity and similarity > 0.5:
best_similarity = similarity
best_match_id = prev_id
# Apply temporal consistency if confident match found
if best_match_id and best_similarity > 0.7:
prev_confidence = self.assignment_confidence.get(best_match_id, 0.5)
current_confidence = 0.8 # Based on HSV feature quality
if prev_confidence > current_confidence * 1.2:
adjusted_labels[i] = self.previous_assignments[best_match_id]
return adjusted_labels
def predict(
self,
crops: List[np.ndarray],
boxes: List,
frame_image: ndarray
) -> Tuple[np.ndarray, np.ndarray | None]:
"""
Predict team labels for player crops using HSV features with temporal consistency.
Args:
crops: List of player image crops
boxes: List of corresponding bounding boxes
frame_image: Full frame image for feature extraction
Returns:
Tuple of (team_labels, cluster_centers)
"""
if len(crops) == 0:
return np.array([]), None
h, w = frame_image.shape[:2]
hsv_features = []
for box in boxes:
features = self._extract_hsv_features_with_green_filter(
frame_image, box, w, h
)
hsv_features.append(features)
hsv_features = np.vstack(hsv_features)
if len(hsv_features) >= 2:
labels, centers = self._cluster_players_hsv(hsv_features)
# Apply temporal consistency
if self.temporal_weight > 0:
labels = self._apply_temporal_consistency(labels, boxes, hsv_features)
# Update tracking
for i, (box, label) in enumerate(zip(boxes, labels)):
bbox_id = hash((box.x1, box.y1, box.x2, box.y2)) % 10000 # Simple ID
self.previous_assignments[bbox_id] = int(label)
self.assignment_confidence[bbox_id] = 0.8
self.cluster_centers = centers
return labels, centers
elif len(hsv_features) == 1:
# Single player: use hue pivot
hue = hsv_features[0, 0]
label = 0 if float(hue) < self.hue_pivot else 1
return np.array([label]), None
else:
return np.array([]), None
# ============================================================================
# Team Classification using ResNet18 Features
# ============================================================================
class ResNetTeamClassifier:
    """
    A classifier that uses ResNet18 for feature extraction and KMeans for clustering.
    """

    def __init__(self, device: str = 'cpu', batch_size: int = 32):
        """
        Initialize the TeamClassifier with device and batch size.

        Args:
            device (str): The device to run the model on ('cpu' or 'cuda').
            batch_size (int): The batch size for processing images.
        """
        self.device = device
        self.batch_size = batch_size
        # Load pretrained ResNet18 via the modern `weights=` API (no deprecation warning).
        self.features_model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        # Drop the final FC layer so the forward pass yields 512-D embeddings.
        self.features_model = torch.nn.Sequential(*list(self.features_model.children())[:-1])
        self.features_model.to(device)
        self.features_model.eval()
        # ToTensor: numpy (H,W,C) uint8 -> tensor (C,H,W) float in [0,1].
        # NOTE(review): ToTensor does NOT swap channels, so cv2 BGR crops are
        # normalized with RGB ImageNet statistics here. Clustering still works,
        # but the stats are technically mismatched -- confirm before changing.
        self.transform = T.Compose([
            T.ToTensor(),
            T.Resize((224, 224)),
            T.Normalize(
                mean=[0.485, 0.456, 0.406],  # ImageNet pretrained means (RGB order)
                std=[0.229, 0.224, 0.225]    # ImageNet pretrained stds (RGB order)
            )
        ])
        # Two clusters = two teams; fixed seed for reproducible assignments.
        self.cluster_model = KMeans(n_clusters=2, random_state=42)

    def extract_features(self, crops: List[np.ndarray]) -> np.ndarray:
        """
        Extract ResNet18 embeddings for a list of image crops.

        Args:
            crops (List[np.ndarray]): Image crops (CV2 numpy arrays, BGR format).

        Returns:
            np.ndarray: Features of shape (N, 512); shape (0, 512) for empty input.
        """
        # Guard: np.concatenate([]) below would raise on an empty crop list.
        if not crops:
            return np.empty((0, 512), dtype=np.float32)
        batches = create_batches(crops, self.batch_size)
        embeddings = []
        with torch.no_grad():
            for batch in batches:
                # Transform numpy arrays directly to tensors and stack.
                inputs = torch.stack([self.transform(crop) for crop in batch]).to(self.device)
                features = self.features_model(inputs)
                # Flatten (batch, 512, 1, 1) -> (batch, 512).
                features = features.view(features.size(0), -1)
                embeddings.append(features.cpu().numpy())
        return np.concatenate(embeddings)

    def fit(self, crops: List[np.ndarray], max_samples: int = 100) -> None:
        """
        Fit the KMeans team model on embeddings of the given crops.

        Args:
            crops (List[np.ndarray]): List of image crops.
            max_samples (int): Maximum number of samples to use for fitting.
        """
        # Guard: KMeans cannot fit on zero samples; leave the model unfitted.
        if not crops:
            print("⚠️ No crops provided to fit(); classifier left unfitted")
            return
        # Random subsample to bound fitting cost.
        if len(crops) > max_samples:
            indices = np.random.choice(len(crops), max_samples, replace=False)
            crops = [crops[i] for i in indices]
        # Extract 512-D embeddings and fit KMeans directly (no UMAP).
        embeddings = self.extract_features(crops)
        self.cluster_model.fit(embeddings)

    def predict(self, crops: List[np.ndarray]) -> np.ndarray:
        """
        Predict cluster (team) labels for a list of image crops.

        Args:
            crops (List[np.ndarray]): List of image crops.

        Returns:
            np.ndarray: Predicted cluster labels (0 or 1); empty for no crops.
        """
        if len(crops) == 0:
            return np.array([])
        embeddings = self.extract_features(crops)
        return self.cluster_model.predict(embeddings)
# ============================================================================
# Data Models
# ============================================================================
class BoundingBox(BaseModel):
    """Axis-aligned detection box in pixel coordinates (validator class-ID scheme)."""
    # Top-left corner, in pixels.
    x1: int
    y1: int
    # Bottom-right corner, in pixels.
    x2: int
    y2: int
    # Class ID in validator format: 0=ball, 1=goalkeeper, 2=player, 3=referee,
    # 6/7=team1/team2 (players are re-labelled 6/7 after team classification).
    cls_id: int
    # Detector confidence score.
    conf: float
class TVFrameResult(BaseModel):
    """Per-frame prediction payload: detected boxes plus pitch keypoints."""
    # Absolute frame index in the video (batch offset + position in batch).
    frame_id: int
    # Post-processed detections for this frame.
    boxes: list[BoundingBox]
    # Fixed-length list of (x, y) keypoints; (0, 0) marks filtered/missing points.
    keypoints: list[tuple[int, int]]
# ============================================================================
# Main Miner Class
# ============================================================================
class Miner:
"""
Enhanced miner combining best practices from v1 and competitor's v3.
Features:
- Multiple team classification methods (HSV, ResNet, ensemble)
- Two-stage box suppression (quasi-total containment + small contained)
- Simplified multiple goalkeeper handling (confidence-based)
- Proper task_type support for selective processing
- Boundary-aware box clipping
"""
# Constants for box suppression (from competitor's approach)
QUASI_TOTAL_IOA: float = 0.90
SMALL_CONTAINED_IOA: float = 0.85
SMALL_RATIO_MAX: float = 0.50
SINGLE_PLAYER_HUE_PIVOT: float = 90.0
CORNER_INDICES = {0, 5, 24, 29}
def __init__(
self,
path_hf_repo: Path,
team_classification_method: Literal["hsv", "resnet", "ensemble"] = "hsv"
) -> None:
"""
Loads all ML models from the repository.
Args:
path_hf_repo (Path): Path to the downloaded HuggingFace Hub repository
team_classification_method (str): Method for team classification
- "hsv": Fast HSV-based classification (default)
- "resnet": Robust ResNet18-based classification
- "ensemble": Combine both methods (vote-based)
"""
self.bbox_model = YOLO(path_hf_repo / "detection.pt")
print(f"✅ BBox Model Loaded")
self.keypoints_model = YOLO(path_hf_repo / "keypoint.pt")
print(f"✅ Keypoints Model Loaded")
# Initialize team classification method
self.team_classification_method = team_classification_method
if team_classification_method == "hsv":
self.hsv_classifier = HSVTeamClassifier(hue_pivot=self.SINGLE_PLAYER_HUE_PIVOT)
self.resnet_classifier = None
self.team_classifier_fitted = False # HSV doesn't need fitting
print(f"✅ HSV Team Classifier Initialized")
elif team_classification_method == "resnet":
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🔧 Using device: {device}")
self.resnet_classifier = ResNetTeamClassifier(device=device, batch_size=32)
self.hsv_classifier = None
self.team_classifier_fitted = False
print(f"✅ ResNet Team Classifier Loaded")
elif team_classification_method == "ensemble":
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🔧 Using device: {device}")
self.hsv_classifier = HSVTeamClassifier(hue_pivot=self.SINGLE_PLAYER_HUE_PIVOT)
self.resnet_classifier = ResNetTeamClassifier(device=device, batch_size=32)
self.team_classifier_fitted = False # Only ResNet needs fitting
print(f"✅ Ensemble Team Classifiers Loaded (HSV + ResNet)")
else:
raise ValueError(
f"Invalid team_classification_method: {team_classification_method}. "
"Must be 'hsv', 'resnet', or 'ensemble'"
)
def __repr__(self) -> str:
"""Information about miner returned in the health endpoint."""
classifier_info = f"Team Classification: {self.team_classification_method}"
if self.team_classification_method == "hsv":
classifier_info += f" ({type(self.hsv_classifier).__name__})"
elif self.team_classification_method == "resnet":
classifier_info += f" ({type(self.resnet_classifier).__name__})"
else:
classifier_info += f" (HSV + ResNet)"
return (
f"BBox Model: {type(self.bbox_model).__name__}\n"
f"Keypoints Model: {type(self.keypoints_model).__name__}\n"
f"{classifier_info}"
)
# ========================================================================
# Post-processing Helper Methods
# ========================================================================
@staticmethod
def _map_yolo_to_validator_cls_id(yolo_cls_id: int) -> int | None:
"""
Map YOLO model class ID (new model format) to validator format.
YOLO model mapping:
0: 'Player', 1: 'GoalKeeper', 2: 'Ball',
3: 'Main Referee', 4: 'Side Referee', 5: 'Staff Member'
Validator format:
0: 'ball', 1: 'goalkeeper', 2: 'player', 3: 'referee',
6: 'team1', 7: 'team2'
Args:
yolo_cls_id: Class ID from YOLO model
Returns:
Mapped class ID in validator format, or None if should be skipped
"""
if yolo_cls_id == 0: # YOLO Player -> Validator Player (2)
return 2
elif yolo_cls_id == 1: # YOLO GoalKeeper -> Validator GoalKeeper (1)
return 1
elif yolo_cls_id == 2: # YOLO Ball -> Validator Ball (0)
return 0
elif yolo_cls_id in [3, 4]: # YOLO Main/Side Referee -> Validator Referee (3)
return 3
else: # Staff Member or other -> skip
return None
@staticmethod
def _clip_box_to_image(x1: int, y1: int, x2: int, y2: int, w: int, h: int) -> Tuple[int, int, int, int]:
"""
Clip bounding box coordinates to ensure they are within image boundaries.
(Adopted from competitor's approach - simpler and more efficient)
Args:
x1, y1, x2, y2: Box coordinates
w, h: Image dimensions
Returns:
Clipped coordinates (x1, y1, x2, y2)
"""
x1 = max(0, min(int(x1), w - 1))
y1 = max(0, min(int(y1), h - 1))
x2 = max(0, min(int(x2), w - 1))
y2 = max(0, min(int(y2), h - 1))
if x2 <= x1:
x2 = min(w - 1, x1 + 1)
if y2 <= y1:
y2 = min(h - 1, y1 + 1)
return x1, y1, x2, y2
@staticmethod
def _area(bb: BoundingBox) -> int:
"""Calculate the area of a bounding box."""
return max(0, bb.x2 - bb.x1) * max(0, bb.y2 - bb.y1)
@staticmethod
def _intersect_area(a: BoundingBox, b: BoundingBox) -> int:
"""Calculate the intersection area between two boxes."""
ix1 = max(a.x1, b.x1)
iy1 = max(a.y1, b.y1)
ix2 = min(a.x2, b.x2)
iy2 = min(a.y2, b.y2)
if ix2 <= ix1 or iy2 <= iy1:
return 0
return (ix2 - ix1) * (iy2 - iy1)
def _ioa(self, a: BoundingBox, b: BoundingBox) -> float:
"""
Calculate Intersection over Area (IoA) of box a in box b.
(Adopted from competitor's approach)
"""
inter = self._intersect_area(a, b)
aa = self._area(a)
if aa <= 0:
return 0.0
return inter / aa
def suppress_quasi_total_containment(self, boxes: List[BoundingBox]) -> List[BoundingBox]:
"""
Remove boxes that are almost completely contained within another box.
(Adopted from competitor's approach - cleaner separation of concerns)
Strategy: If box_i is >= 90% contained in box_j, remove box_i.
This handles cases where one box is a near-duplicate of another.
"""
if len(boxes) <= 1:
return boxes
keep = [True] * len(boxes)
for i in range(len(boxes)):
if not keep[i]:
continue
for j in range(len(boxes)):
if i == j or not keep[j]:
continue
ioa_i_in_j = self._ioa(boxes[i], boxes[j])
if ioa_i_in_j >= self.QUASI_TOTAL_IOA:
keep[i] = False
break
return [bb for bb, k in zip(boxes, keep) if k]
    def suppress_small_contained(self, boxes: List[BoundingBox]) -> List[BoundingBox]:
        """
        Remove small boxes that are significantly contained within larger boxes.
        (Adopted from competitor's approach - cleaner separation of concerns)
        Strategy: If a small box (<= 50% size) is >= 85% contained in a larger box,
        remove the small box (likely a duplicate detection).
        """
        if len(boxes) <= 1:
            return boxes
        keep = [True] * len(boxes)
        # Areas are precomputed once; keep[] flags are mutated in-place during
        # the pairwise sweep, so suppressed boxes are skipped in later checks.
        areas = [self._area(bb) for bb in boxes]
        for i in range(len(boxes)):
            if not keep[i]:
                continue
            for j in range(len(boxes)):
                if i == j or not keep[j]:
                    continue
                ai, aj = areas[i], areas[j]
                if ai == 0 or aj == 0:
                    continue
                if ai <= aj:
                    # i is the smaller box: suppress i if it is small enough
                    # relative to j AND mostly contained in j; break because
                    # i can no longer suppress anything.
                    ratio = ai / aj
                    if ratio <= self.SMALL_RATIO_MAX:
                        ioa_i_in_j = self._ioa(boxes[i], boxes[j])
                        if ioa_i_in_j >= self.SMALL_CONTAINED_IOA:
                            keep[i] = False
                            break
                else:
                    # j is the smaller box: possibly suppress j, but keep
                    # scanning -- i may still suppress other boxes.
                    ratio = aj / ai
                    if ratio <= self.SMALL_RATIO_MAX:
                        ioa_j_in_i = self._ioa(boxes[j], boxes[i])
                        if ioa_j_in_i >= self.SMALL_CONTAINED_IOA:
                            keep[j] = False
        return [bb for bb, k in zip(boxes, keep) if k]
def _handle_multiple_balls(
self, all_boxes: List[BoundingBox]
) -> List[BoundingBox]:
"""
When multiple footballs are detected, keep only the one with highest confidence.
"""
ball_detections = [box for box in all_boxes if box.cls_id == 0]
if len(ball_detections) <= 1:
return all_boxes
# Find the ball with highest confidence
best_ball = max(ball_detections, key=lambda b: b.conf)
# Remove all balls, then add back the best one
filtered_boxes = [box for box in all_boxes if box.cls_id != 0]
filtered_boxes.append(best_ball)
return filtered_boxes
    def _reclass_extra_goalkeepers(
        self,
        img_bgr: np.ndarray,
        boxes: List[BoundingBox],
        cluster_centers: Optional[np.ndarray],
    ) -> None:
        """
        When multiple goalkeepers are detected, keep the one with highest confidence
        and reclassify the rest as regular players.
        (Adopted from competitor's simpler approach - confidence-based selection)
        Args:
            img_bgr: Current frame image
            boxes: List of all detected boxes (modified in-place)
            cluster_centers: Pre-computed team cluster centers (if available).
                NOTE(review): in "resnet" mode these are 512-D ResNet
                embeddings, otherwise 2-D HSV centers -- the HSV distance
                fallback is only meaningful against HSV centers.
        """
        gk_idxs = [i for i, bb in enumerate(boxes) if int(bb.cls_id) == 1]
        if len(gk_idxs) <= 1:
            return
        # Sort by confidence and keep the highest
        gk_idxs_sorted = sorted(gk_idxs, key=lambda i: boxes[i].conf, reverse=True)
        # The survivor keeps cls_id 1 simply by not appearing in to_reclass.
        keep_gk_idx = gk_idxs_sorted[0]
        to_reclass = gk_idxs_sorted[1:]
        # Reclassify extra goalkeepers
        for gki in to_reclass:
            # Extract HSV features for team assignment
            # (hs_gk is None whenever no HSV classifier exists, e.g. "resnet" mode).
            h, w = img_bgr.shape[:2]
            hs_gk = self.hsv_classifier._extract_hsv_features_with_green_filter(
                img_bgr, boxes[gki], w, h
            ) if self.hsv_classifier else None
            # Assign team based on available classifier and cluster centers
            if cluster_centers is not None and len(cluster_centers) >= 2:
                if self.team_classification_method == "resnet" and self.team_classifier_fitted:
                    # Use ResNet features if available
                    try:
                        x1, y1, x2, y2 = self._clip_box_to_image(
                            boxes[gki].x1, boxes[gki].y1, boxes[gki].x2, boxes[gki].y2, w, h
                        )
                        gk_crop = img_bgr[y1:y2, x1:x2]
                        if gk_crop.size > 0:
                            # Nearest-centroid assignment in embedding space.
                            gk_features = self.resnet_classifier.extract_features([gk_crop])[0]
                            d0 = float(np.linalg.norm(gk_features - cluster_centers[0]))
                            d1 = float(np.linalg.norm(gk_features - cluster_centers[1]))
                            assign_cls = 6 if d0 <= d1 else 7
                        else:
                            assign_cls = 6
                    except Exception:
                        # Fallback to HSV if ResNet fails
                        if hs_gk is not None:
                            d0 = float(np.linalg.norm(hs_gk - cluster_centers[0]))
                            d1 = float(np.linalg.norm(hs_gk - cluster_centers[1]))
                            assign_cls = 6 if d0 <= d1 else 7
                        else:
                            assign_cls = 6
                else:
                    # Use HSV features
                    if hs_gk is not None:
                        d0 = float(np.linalg.norm(hs_gk - cluster_centers[0]))
                        d1 = float(np.linalg.norm(hs_gk - cluster_centers[1]))
                        assign_cls = 6 if d0 <= d1 else 7
                    else:
                        assign_cls = 6
            else:
                # No cluster centers - use hue pivot or default
                if hs_gk is not None:
                    assign_cls = 6 if float(hs_gk[0]) < self.SINGLE_PLAYER_HUE_PIVOT else 7
                else:
                    assign_cls = 6
            boxes[gki].cls_id = int(assign_cls)
    def _multi_scale_detection(self, img_bgr: np.ndarray) -> List[BoundingBox]:
        """
        Multi-Scale Object Detection for improved small object detection.
        Uses multiple image scales and combines results with intelligent NMS.

        Args:
            img_bgr: Full frame in BGR order.

        Returns:
            Detections in validator class-ID format, already NMS-merged
            across scales and clipped to the original image bounds.
        """
        H, W = img_bgr.shape[:2]
        scales = [1.0, 1.15, 0.85]  # Conservative scales for better stability
        all_detections = []
        for scale in scales:
            if scale != 1.0:
                new_h, new_w = int(H * scale), int(W * scale)
                # Skip scales that would produce unreasonably large/small inputs.
                if new_h > 2048 or new_w > 2048 or new_h < 320 or new_w < 320:
                    continue
                scaled_img = cv2.resize(img_bgr, (new_w, new_h))
            else:
                scaled_img = img_bgr
                new_h, new_w = H, W
            # Run detection on scaled image
            results = self.bbox_model.predict([scaled_img], verbose=False)
            if results and hasattr(results[0], "boxes") and results[0].boxes is not None:
                for box in results[0].boxes.data:
                    x1, y1, x2, y2, conf, yolo_cls_id = box.tolist()
                    # Map YOLO class ID to validator format
                    validator_cls_id = self._map_yolo_to_validator_cls_id(int(yolo_cls_id))
                    if validator_cls_id is None:
                        continue
                    # Scale coordinates back to original image size
                    if scale != 1.0:
                        x1 = x1 / scale
                        y1 = y1 / scale
                        x2 = x2 / scale
                        y2 = y2 / scale
                    # Clip to original image bounds
                    x1, y1, x2, y2 = self._clip_box_to_image(x1, y1, x2, y2, W, H)
                    # Boost confidence for detections at optimal scales.
                    # NOTE(review): the boosted conf can exceed 1.0 -- confirm
                    # downstream consumers tolerate that.
                    box_area = (x2 - x1) * (y2 - y1)
                    if scale == 1.15 and box_area < 2500:  # Small objects benefit from upscaling
                        conf *= 1.08
                    elif scale == 0.85 and box_area > 8000:  # Large objects benefit from downscaling
                        conf *= 1.03
                    all_detections.append(BoundingBox(
                        x1=int(x1), y1=int(y1), x2=int(x2), y2=int(y2),
                        cls_id=validator_cls_id, conf=float(conf)
                    ))
        # Apply multi-scale NMS
        return self._multi_scale_nms(all_detections)
def _multi_scale_nms(self, boxes: List[BoundingBox], iou_threshold: float = 0.45) -> List[BoundingBox]:
"""
Multi-scale Non-Maximum Suppression that preserves detections from different scales.
"""
if not boxes:
return []
# Group by class for class-specific NMS
boxes_by_class = {}
for box in boxes:
if box.cls_id not in boxes_by_class:
boxes_by_class[box.cls_id] = []
boxes_by_class[box.cls_id].append(box)
final_boxes = []
for cls_id, class_boxes in boxes_by_class.items():
# Sort by confidence
class_boxes_sorted = sorted(class_boxes, key=lambda x: x.conf, reverse=True)
keep = []
while class_boxes_sorted:
# Take the highest confidence box
current = class_boxes_sorted.pop(0)
keep.append(current)
# Remove boxes with high IoU
remaining = []
for box in class_boxes_sorted:
iou = self._calculate_iou(current, box)
if iou < iou_threshold:
remaining.append(box)
elif box.conf > current.conf * 0.92: # Keep if confidence is very close
remaining.append(box)
class_boxes_sorted = remaining
final_boxes.extend(keep)
return final_boxes
def _calculate_iou(self, box1: BoundingBox, box2: BoundingBox) -> float:
"""Calculate Intersection over Union (IoU) between two bounding boxes."""
# Calculate intersection
x1 = max(box1.x1, box2.x1)
y1 = max(box1.y1, box2.y1)
x2 = min(box1.x2, box2.x2)
y2 = min(box1.y2, box2.y2)
if x2 <= x1 or y2 <= y1:
return 0.0
intersection = (x2 - x1) * (y2 - y1)
# Calculate union
area1 = (box1.x2 - box1.x1) * (box1.y2 - box1.y1)
area2 = (box2.x2 - box2.x1) * (box2.y2 - box2.y1)
union = area1 + area2 - intersection
return intersection / union if union > 0 else 0.0
# ========================================================================
# Main Prediction Methods
# ========================================================================
    def fit_team_classifier(
        self,
        batch_images: list[ndarray],
        player_class_id: int = 0
    ) -> None:
        """
        Fit the team classifier on player crops from batch images.
        Only needed for ResNet or ensemble methods.
        Args:
            batch_images: List of images to extract player crops from
            player_class_id: YOLO class ID that represents players (default: 0 for new model)
        """
        if self.team_classification_method == "hsv":
            print("ℹ️ HSV classifier doesn't require fitting")
            return
        player_crops = []
        bbox_model_results = self.bbox_model.predict(batch_images)
        if bbox_model_results is not None:
            for frame_idx, detection in enumerate(bbox_model_results):
                if not hasattr(detection, "boxes") or detection.boxes is None:
                    continue
                frame_image = batch_images[frame_idx]
                h, w = frame_image.shape[:2]
                for box in detection.boxes.data:
                    x1, y1, x2, y2, conf, yolo_cls_id = box.tolist()
                    # Only collect crops for players (YOLO class ID 0 = Player)
                    if int(yolo_cls_id) == player_class_id:
                        # Clip before cropping so slices never run off the frame.
                        x1_clip, y1_clip, x2_clip, y2_clip = self._clip_box_to_image(
                            int(x1), int(y1), int(x2), int(y2), w, h
                        )
                        crop = frame_image[y1_clip:y2_clip, x1_clip:x2_clip]
                        if crop.size > 0:
                            player_crops.append(crop)
        if len(player_crops) > 0:
            # In both remaining modes only the ResNet half needs fitting;
            # the ensemble's HSV half is fit-free.
            if self.team_classification_method == "resnet":
                self.resnet_classifier.fit(player_crops)
                self.team_classifier_fitted = True
                print(f"✅ ResNet team classifier fitted on {len(player_crops)} player crops")
            elif self.team_classification_method == "ensemble":
                self.resnet_classifier.fit(player_crops)
                self.team_classifier_fitted = True
                print(f"✅ ResNet classifier (in ensemble) fitted on {len(player_crops)} player crops")
        else:
            print("⚠️ No player crops found to fit team classifier")
    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
        task_type: Optional[str] = None,
    ) -> list[TVFrameResult]:
        """
        Miner prediction for a batch of images with enhanced post-processing.
        Args:
            batch_images (list[np.ndarray]): A list of images to process
            offset (int): Frame number of the first image in the batch
            n_keypoints (int): Number of keypoints expected per frame
            task_type (str | None):
                - None: Process both object and keypoint detection
                - "object": Only process object detection
                - "keypoint": Only process keypoint detection
        Returns:
            list[TVFrameResult]: Predictions for each image in the batch
        """
        # Determine which tasks to process (adopted from competitor's approach)
        process_objects = task_type is None or task_type == "object"
        process_keypoints = task_type is None or task_type == "keypoint"
        bboxes: dict[int, list[BoundingBox]] = {}
        # Process object detection with multi-scale approach
        if process_objects:
            for frame_idx, frame_image in enumerate(batch_images):
                # Use multi-scale detection for better small object detection
                boxes = self._multi_scale_detection(frame_image)
                # Handle multiple balls first
                boxes = self._handle_multiple_balls(boxes)
                # Apply two-stage suppression (adopted from competitor's approach)
                boxes = self.suppress_quasi_total_containment(boxes)
                boxes = self.suppress_small_contained(boxes)
                # Team classification for players
                # NOTE: player_boxes and player_indices are parallel lists over
                # the same filter, so labels are written back by index below.
                player_boxes = [box for idx, box in enumerate(boxes) if box.cls_id == 2]
                player_indices = [idx for idx, box in enumerate(boxes) if box.cls_id == 2]
                team_cluster_centers = None
                team_labels = None
                if len(player_boxes) > 0:
                    if self.team_classification_method == "hsv":
                        # HSV-based classification (no fitting needed)
                        player_crops = [
                            frame_image[box.y1:box.y2, box.x1:box.x2]
                            for box in player_boxes
                        ]
                        team_labels, team_cluster_centers = self.hsv_classifier.predict(
                            player_crops, player_boxes, frame_image
                        )
                    elif self.team_classification_method == "resnet":
                        # ResNet-based classification (requires fitting);
                        # unfitted classifier leaves team_labels as None.
                        if self.team_classifier_fitted:
                            player_crops = [
                                frame_image[box.y1:box.y2, box.x1:box.x2]
                                for box in player_boxes
                            ]
                            team_labels = self.resnet_classifier.predict(player_crops)
                            # Get cluster centers
                            if hasattr(self.resnet_classifier.cluster_model, 'cluster_centers_'):
                                team_cluster_centers = self.resnet_classifier.cluster_model.cluster_centers_
                    elif self.team_classification_method == "ensemble":
                        # Ensemble: combine HSV and ResNet predictions
                        player_crops = [
                            frame_image[box.y1:box.y2, box.x1:box.x2]
                            for box in player_boxes
                        ]
                        # Get predictions from both methods
                        hsv_labels, hsv_centers = self.hsv_classifier.predict(
                            player_crops, player_boxes, frame_image
                        )
                        resnet_labels = None
                        resnet_centers = None
                        if self.team_classifier_fitted:
                            resnet_labels = self.resnet_classifier.predict(player_crops)
                            if hasattr(self.resnet_classifier.cluster_model, 'cluster_centers_'):
                                resnet_centers = self.resnet_classifier.cluster_model.cluster_centers_
                        # Combine predictions (vote-based)
                        if resnet_labels is not None and len(resnet_labels) == len(hsv_labels):
                            # Vote: if both agree, use that; otherwise prefer ResNet
                            team_labels = np.array([
                                resnet_labels[i] if resnet_labels[i] == hsv_labels[i]
                                else resnet_labels[i]  # Prefer ResNet on disagreement
                                for i in range(len(hsv_labels))
                            ])
                            team_cluster_centers = resnet_centers  # Use ResNet centers
                        else:
                            # Fallback to HSV if ResNet not available
                            team_labels = hsv_labels
                            team_cluster_centers = hsv_centers
                # Update player class IDs to team IDs (6 or 7)
                if team_labels is not None and len(team_labels) == len(player_indices):
                    for idx, team_label in zip(player_indices, team_labels):
                        boxes[idx].cls_id = 6 + int(team_label)
                # Handle multiple goalkeepers (simplified approach from competitor)
                self._reclass_extra_goalkeepers(
                    frame_image, boxes, team_cluster_centers
                )
                bboxes[offset + frame_idx] = boxes
        # Process keypoint detection
        keypoints: dict[int, list[tuple[int, int]]] = {}
        if process_keypoints:
            keypoints_model_results = self.keypoints_model.predict(batch_images)
        else:
            keypoints_model_results = None
        if keypoints_model_results is not None:
            for frame_idx, detection in enumerate(keypoints_model_results):
                if not hasattr(detection, "keypoints") or detection.keypoints is None:
                    continue
                frame_keypoints: list[tuple[int, int, float]] = []
                for i, part_points in enumerate(detection.keypoints.data):
                    for k_id, (x, y, _) in enumerate(part_points):
                        confidence = detection.keypoints.conf[i][k_id]
                        frame_keypoints.append((int(x), int(y), float(confidence)))
                # Pad or truncate to match expected number of keypoints
                if len(frame_keypoints) < n_keypoints:
                    frame_keypoints.extend(
                        [(0, 0, 0.0)] * (n_keypoints - len(frame_keypoints))
                    )
                else:
                    frame_keypoints = frame_keypoints[:n_keypoints]
                # Filter keypoints based on confidence
                # Corner keypoints use lower threshold (0.3) to ensure homography can be computed
                filtered_keypoints = []
                for idx, (x, y, confidence) in enumerate(frame_keypoints):
                    if idx in self.CORNER_INDICES:
                        # For corner keypoints, use lower threshold
                        if confidence < 0.3:
                            filtered_keypoints.append((0, 0))
                        else:
                            filtered_keypoints.append((int(x), int(y)))
                    else:
                        # For non-corner keypoints, use standard threshold
                        if confidence < 0.5:
                            filtered_keypoints.append((0, 0))
                        else:
                            filtered_keypoints.append((int(x), int(y)))
                keypoints[offset + frame_idx] = filtered_keypoints
        # Combine results
        results: list[TVFrameResult] = []
        for frame_number in range(offset, offset + len(batch_images)):
            results.append(
                TVFrameResult(
                    frame_id=frame_number,
                    boxes=bboxes.get(frame_number, []),
                    keypoints=keypoints.get(
                        frame_number,
                        # Frames skipped by task_type fall back to all-zero keypoints.
                        [(0, 0) for _ in range(n_keypoints)]
                    ),
                )
            )
        return results