from pathlib import Path from concurrent.futures import ThreadPoolExecutor from ultralytics import YOLO from numpy import ndarray from pydantic import BaseModel from typing import List, Tuple, Optional, Dict, Any import numpy as np import cv2 from sklearn.cluster import KMeans import torch import torch.nn as nn import torch.nn.functional as F import yaml import gc import os import sys from collections import OrderedDict, defaultdict from PIL import Image import torchvision.transforms as T import time try: from scipy.optimize import linear_sum_assignment as _linear_sum_assignment except ImportError: _linear_sum_assignment = None # ── Grass / kit helpers ──────────────────────────────── def get_grass_color(img: np.ndarray) -> Tuple[int, int, int]: if img is None or img.size == 0: return (0, 0, 0) hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) lower_green = np.array([30, 40, 40]) upper_green = np.array([80, 255, 255]) mask = cv2.inRange(hsv, lower_green, upper_green) grass_color = cv2.mean(img, mask=mask) return grass_color[:3] def get_players_boxes(result): players_imgs, players_boxes = [], [] for box in result.boxes: label = int(box.cls.cpu().numpy()[0]) if label == 2: x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy()) crop = result.orig_img[y1:y2, x1:x2] if crop.size > 0: players_imgs.append(crop) players_boxes.append((x1, y1, x2, y2)) return players_imgs, players_boxes def get_kits_colors(players, grass_hsv=None, frame=None): kits_colors = [] if grass_hsv is None: grass_color = get_grass_color(frame) grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV) for player_img in players: hsv = cv2.cvtColor(player_img, cv2.COLOR_BGR2HSV) lower_green = np.array([grass_hsv[0, 0, 0] - 10, 40, 40]) upper_green = np.array([grass_hsv[0, 0, 0] + 10, 255, 255]) mask = cv2.inRange(hsv, lower_green, upper_green) mask = cv2.bitwise_not(mask) upper_mask = np.zeros(player_img.shape[:2], np.uint8) upper_mask[0:player_img.shape[0] // 2, :] = 255 mask = cv2.bitwise_and(mask, upper_mask) kit_color = np.array(cv2.mean(player_img, mask=mask)[:3]) kits_colors.append(kit_color) return kits_colors # ── Person detection (new-2 style: tracking, votes, adjust) ─── # Internal class IDs: goalkeeper=1, player=2, referee=3 # Validator output: 0=player, 1=referee, 2=goalkeeper _C_GOALKEEPER = 1 _C_PLAYER = 2 _C_REFEREE = 3 _CLS_TO_VALIDATOR: Dict[int, int] = {_C_PLAYER: 0, _C_REFEREE: 1, _C_GOALKEEPER: 2} # Person model: 0=player, 1=referee, 2=goalkeeper (person-detection-model.onnx) PERSON_MODEL_IMG_SIZE = 640 PERSON_CONF = 0.4 PERSON_HALF = True # FP16 on GPU for faster inference TRACK_IOU_THRESH = 0.3 TRACK_IOU_HIGH = 0.4 TRACK_IOU_LOW = 0.2 TRACK_MAX_AGE = 3 TRACK_USE_VELOCITY = True NOISE_MIN_APPEARANCES = 5 NOISE_TAIL_FRAMES = 4 CLASS_VOTE_MAJORITY = 3 INTERP_TRACK_GAPS = True ENABLE_BOX_SMOOTHING = False BOX_SMOOTH_WINDOW = 8 OVERLAP_IOU = 0.91 def _iou_box4(a: Tuple[float, float, float, float], b: Tuple[float, float, float, float]) -> float: ax1, ay1, ax2, ay2 = a bx1, by1, bx2, by2 = b ix1, iy1 = max(ax1, bx1), max(ay1, by1) ix2, iy2 = min(ax2, bx2), min(ay2, by2) iw, ih = max(0.0, ix2 - ix1), max(0.0, iy2 - iy1) inter = iw * ih if inter <= 0: return 0.0 area_a = (ax2 - ax1) * (ay2 - ay1) area_b = (bx2 - bx1) * (by2 - by1) union = area_a + area_b - inter return inter / union if union > 0 else 0.0 def _match_tracks_detections( prev_list: List[Tuple[int, Tuple[float, float, float, float]]], curr_boxes: List[Tuple[float, float, float, float]], iou_thresh: float, exclude_prev: set, exclude_curr: set, ) -> List[Tuple[int, int]]: prev_filtered = [(pi, tid, pbox) for pi, (tid, pbox) in enumerate(prev_list) if pi not in exclude_prev] curr_filtered = [(ci, cbox) for ci, cbox in enumerate(curr_boxes) if ci not in exclude_curr] if not prev_filtered or not curr_filtered: return [] n_prev, n_curr = len(prev_filtered), len(curr_filtered) iou_mat = np.zeros((n_prev, n_curr), dtype=np.float64) for i, (_, _, pbox) in enumerate(prev_filtered): for j, (_, cbox) in enumerate(curr_filtered): iou_mat[i, j] = _iou_box4(pbox, cbox) cost = 1.0 - iou_mat cost[iou_mat < iou_thresh] = 1e9 if _linear_sum_assignment is not None: row_ind, col_ind = _linear_sum_assignment(cost) matches = [ (prev_filtered[row_ind[k]][0], curr_filtered[col_ind[k]][0]) for k in range(len(row_ind)) if cost[row_ind[k], col_ind[k]] < 1.0 ] else: matches = [] iou_pairs = [ (iou_mat[i, j], i, j) for i in range(n_prev) for j in range(n_curr) if iou_mat[i, j] >= iou_thresh ] iou_pairs.sort(key=lambda x: -x[0]) used_prev, used_curr = set(), set() for _, i, j in iou_pairs: pi = prev_filtered[i][0] ci = curr_filtered[j][0] if pi in used_prev or ci in used_curr: continue matches.append((pi, ci)) used_prev.add(pi) used_curr.add(ci) return matches def _predict_box(prev: Tuple[float, float, float, float], last: Tuple[float, float, float, float]) -> Tuple[float, float, float, float]: px1, py1, px2, py2 = prev lx1, ly1, lx2, ly2 = last pcx = 0.5 * (px1 + px2) pcy = 0.5 * (py1 + py2) lcx = 0.5 * (lx1 + lx2) lcy = 0.5 * (ly1 + ly2) w = lx2 - lx1 h = ly2 - ly1 ncx = 2.0 * lcx - pcx ncy = 2.0 * lcy - pcy return (ncx - w * 0.5, ncy - h * 0.5, ncx + w * 0.5, ncy + h * 0.5) def _assign_person_track_ids( prev_state: Dict[int, Tuple[Tuple[float, float, float, float], Tuple[float, float, float, float], int]], next_id: int, results: list, iou_thresh: float = TRACK_IOU_THRESH, iou_high: float = TRACK_IOU_HIGH, iou_low: float = TRACK_IOU_LOW, max_age: int = TRACK_MAX_AGE, use_velocity: bool = TRACK_USE_VELOCITY, ) -> Tuple[Dict[int, Tuple[Tuple[float, float, float, float], Tuple[float, float, float, float], int]], int, List[List[int]]]: state = {tid: (prev_box, last_box, age) for tid, (prev_box, last_box, age) in prev_state.items()} nid = next_id ids_per_result: List[List[int]] = [] for result in results: if getattr(result, "boxes", None) is None or len(result.boxes) == 0: state = { tid: (prev_box, last_box, age + 1) for tid, (prev_box, last_box, age) in state.items() if age + 1 <= max_age } ids_per_result.append([]) continue b = result.boxes xyxy = b.xyxy.cpu().numpy() curr_boxes = [tuple(float(x) for x in row) for row in xyxy] prev_list: List[Tuple[int, Tuple[float, float, float, float]]] = [] for tid, (prev_box, last_box, _age) in state.items(): if use_velocity and (prev_box != last_box): pbox = _predict_box(prev_box, last_box) else: pbox = last_box prev_list.append((tid, pbox)) stage1 = _match_tracks_detections(prev_list, curr_boxes, iou_high, set(), set()) assigned_prev = {pi for pi, _ in stage1} assigned_curr = {ci for _, ci in stage1} stage2 = _match_tracks_detections(prev_list, curr_boxes, iou_low, assigned_prev, assigned_curr) for pi, ci in stage2: assigned_prev.add(pi) assigned_curr.add(ci) tid_per_curr: Dict[int, int] = {} for pi, ci in stage1 + stage2: tid_per_curr[ci] = prev_list[pi][0] ids: List[int] = [] new_state: Dict[int, Tuple[Tuple[float, float, float, float], Tuple[float, float, float, float], int]] = {} for ci, cbox in enumerate(curr_boxes): if ci in tid_per_curr: tid = tid_per_curr[ci] _prev, last_box, _ = state[tid] new_state[tid] = (last_box, cbox, 0) else: tid = nid nid += 1 new_state[tid] = (cbox, cbox, 0) ids.append(tid) for pi in range(len(prev_list)): if pi in assigned_prev: continue tid = prev_list[pi][0] prev_box, last_box, age = state[tid] if age + 1 <= max_age: new_state[tid] = (prev_box, last_box, age + 1) state = new_state ids_per_result.append(ids) return (state, nid, ids_per_result) def _iou_bbox(a: "BoundingBox", b: "BoundingBox") -> float: ax1, ay1, ax2, ay2 = int(a.x1), int(a.y1), int(a.x2), int(a.y2) bx1, by1, bx2, by2 = int(b.x1), int(b.y1), int(b.x2), int(b.y2) ix1, iy1 = max(ax1, bx1), max(ay1, by1) ix2, iy2 = min(ax2, bx2), min(ay2, by2) iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1) inter = iw * ih if inter <= 0: return 0.0 area_a = (ax2 - ax1) * (ay2 - ay1) area_b = (bx2 - bx1) * (by2 - by1) union = area_a + area_b - inter return inter / union if union > 0 else 0.0 def _adjust_boxes( bboxes: List["BoundingBox"], frame_width: int, frame_height: int, overlap_iou: float = OVERLAP_IOU, do_goalkeeper_dedup: bool = True, do_referee_disambiguation: bool = True, ) -> List["BoundingBox"]: """Overlap NMS, goalkeeper dedup, referee disambiguation (no ball).""" kept: List[BoundingBox] = list(bboxes or []) W, H = int(frame_width), int(frame_height) cy = 0.5 * float(H) if overlap_iou > 0 and len(kept) > 1: non_balls = [bb for bb in kept if int(bb.cls_id) != 0] if len(non_balls) > 1: non_balls_sorted = sorted(non_balls, key=lambda bb: float(bb.conf), reverse=True) kept_nb = [] for cand in non_balls_sorted: skip = False for k in kept_nb: iou = _iou_bbox(cand, k) if iou >= overlap_iou: skip = True break if ( abs(int(cand.x1) - int(k.x1)) <= 3 and abs(int(cand.y1) - int(k.y1)) <= 3 and abs(int(cand.x2) - int(k.x2)) <= 3 and abs(int(cand.y2) - int(k.y2)) <= 3 and iou > 0.85 ): skip = True break if not skip: kept_nb.append(cand) kept = kept_nb if do_goalkeeper_dedup: gks = [bb for bb in kept if int(bb.cls_id) == _C_GOALKEEPER] if len(gks) > 1: best_gk = max(gks, key=lambda bb: float(bb.conf)) best_gk_conf = float(best_gk.conf) deduped = [] for bb in kept: if int(bb.cls_id) == _C_GOALKEEPER: if float(bb.conf) < best_gk_conf or (float(bb.conf) == best_gk_conf and bb is not best_gk): deduped.append(BoundingBox(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=_C_PLAYER, conf=float(bb.conf), team_id=bb.team_id, track_id=bb.track_id)) else: deduped.append(bb) else: deduped.append(bb) kept = deduped if do_referee_disambiguation: refs = [bb for bb in kept if int(bb.cls_id) == _C_REFEREE] if len(refs) > 1: best_ref = min(refs, key=lambda bb: (0.5 * (bb.y1 + bb.y2) - cy) ** 2) kept = [bb for bb in kept if int(bb.cls_id) != _C_REFEREE or bb is best_ref] return kept # ── OSNet team classification (turbo_7 style) ──────────────── TEAM_1_ID = 6 TEAM_2_ID = 7 PLAYER_CLS_ID = 2 _OSNET_MODEL = None osnet_weight_path = None OSNET_IMAGE_SIZE = (64, 32) # (height, width) OSNET_PREPROCESS = T.Compose([ T.Resize(OSNET_IMAGE_SIZE), T.ToTensor(), T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) def _crop_upper_body(frame: ndarray, box: "BoundingBox") -> ndarray: return frame[ max(0, box.y1):max(0, box.y2), max(0, box.x1):max(0, box.x2) ] def _preprocess_osnet(crop: ndarray) -> torch.Tensor: rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB) pil = Image.fromarray(rgb) return OSNET_PREPROCESS(pil) def _filter_player_boxes(boxes: List["BoundingBox"]) -> List["BoundingBox"]: return [b for b in boxes if b.cls_id == PLAYER_CLS_ID] def _extract_osnet_embeddings( frames: List[ndarray], batch_boxes: Dict[int, List["BoundingBox"]], device: str = "cuda", ) -> Tuple[Optional[ndarray], Optional[List["BoundingBox"]]]: global _OSNET_MODEL crops = [] meta = [] sorted_frame_ids = sorted(batch_boxes.keys()) for idx, frame_idx in enumerate(sorted_frame_ids): frame = frames[idx] if idx < len(frames) else None if frame is None: continue boxes = batch_boxes[frame_idx] players = _filter_player_boxes(boxes) for box in players: crop = _crop_upper_body(frame, box) if crop.size == 0: continue crops.append(_preprocess_osnet(crop)) meta.append(box) if not crops: return None, None batch = torch.stack(crops).to(device, non_blocking=True).float() use_amp = device == "cuda" with torch.inference_mode(): with torch.amp.autocast("cuda", enabled=use_amp): embeddings = _OSNET_MODEL(batch) del batch embeddings = embeddings.cpu().numpy() return embeddings, meta def _aggregate_by_track( embeddings: ndarray, meta: List["BoundingBox"], ) -> Tuple[ndarray, List["BoundingBox"]]: track_map = defaultdict(list) box_map = {} for emb, box in zip(embeddings, meta): key = box.track_id if box.track_id is not None else id(box) track_map[key].append(emb) box_map[key] = box agg_embeddings = [] agg_boxes = [] for key, embs in track_map.items(): mean_emb = np.mean(embs, axis=0) norm = np.linalg.norm(mean_emb) if norm > 1e-12: mean_emb /= norm agg_embeddings.append(mean_emb) agg_boxes.append(box_map[key]) return np.array(agg_embeddings), agg_boxes def _update_team_ids(boxes: List["BoundingBox"], labels: ndarray) -> None: for box, label in zip(boxes, labels): # box.cls_id = TEAM_1_ID if label == 0 else TEAM_2_ID box.team_id = 1 if label == 0 else 2 def _classify_teams_batch( frames: List[ndarray], batch_boxes: Dict[int, List["BoundingBox"]], device: str = "cuda", ) -> None: embeddings, meta = _extract_osnet_embeddings(frames, batch_boxes, device) if embeddings is None: return embeddings, agg_boxes = _aggregate_by_track(embeddings, meta) n = len(embeddings) if n == 0: return if n == 1: agg_boxes[0].cls_id = TEAM_1_ID return kmeans = KMeans(n_clusters=2, n_init=2, random_state=42) kmeans.fit(embeddings) centroids = kmeans.cluster_centers_ c0, c1 = centroids[0], centroids[1] norm_0 = np.linalg.norm(c0) norm_1 = np.linalg.norm(c1) similarity = np.dot(c0, c1) / (norm_0 * norm_1 + 1e-12) if similarity > 0.95: for b in agg_boxes: b.cls_id = TEAM_1_ID return if norm_0 <= norm_1: kmeans.labels_ = 1 - kmeans.labels_ _update_team_ids(agg_boxes, kmeans.labels_) class ConvLayer(nn.Module): def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, groups=1, IN=False): super().__init__() self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride, padding=padding, bias=False, groups=groups) self.bn = nn.InstanceNorm2d(out_channels, affine=True) if IN else nn.BatchNorm2d(out_channels) self.relu = nn.ReLU() def forward(self, x): return self.relu(self.bn(self.conv(x))) class Conv1x1(nn.Module): def __init__(self, in_channels, out_channels, stride=1, groups=1): super().__init__() self.conv = nn.Conv2d(in_channels, out_channels, 1, stride=stride, padding=0, bias=False, groups=groups) self.bn = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU() def forward(self, x): return self.relu(self.bn(self.conv(x))) class Conv1x1Linear(nn.Module): def __init__(self, in_channels, out_channels, stride=1, bn=True): super().__init__() self.conv = nn.Conv2d(in_channels, out_channels, 1, stride=stride, padding=0, bias=False) self.bn = nn.BatchNorm2d(out_channels) if bn else None def forward(self, x): x = self.conv(x) return self.bn(x) if self.bn is not None else x class Conv3x3(nn.Module): def __init__(self, in_channels, out_channels, stride=1, groups=1): super().__init__() self.conv = nn.Conv2d(in_channels, out_channels, 3, stride=stride, padding=1, bias=False, groups=groups) self.bn = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU() def forward(self, x): return self.relu(self.bn(self.conv(x))) class LightConv3x3(nn.Module): def __init__(self, in_channels, out_channels): super().__init__() self.conv1 = nn.Conv2d(in_channels, out_channels, 1, stride=1, padding=0, bias=False) self.conv2 = nn.Conv2d(out_channels, out_channels, 3, stride=1, padding=1, bias=False, groups=out_channels) self.bn = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU() def forward(self, x): x = self.conv1(x) x = self.conv2(x) return self.relu(self.bn(x)) class LightConvStream(nn.Module): def __init__(self, in_channels, out_channels, depth): super().__init__() layers = [LightConv3x3(in_channels, out_channels)] for _ in range(depth - 1): layers.append(LightConv3x3(out_channels, out_channels)) self.layers = nn.Sequential(*layers) def forward(self, x): return self.layers(x) class ChannelGate(nn.Module): def __init__(self, in_channels, num_gates=None, return_gates=False, gate_activation='sigmoid', reduction=16, layer_norm=False): super().__init__() if num_gates is None: num_gates = in_channels self.return_gates = return_gates self.global_avgpool = nn.AdaptiveAvgPool2d(1) self.fc1 = nn.Conv2d(in_channels, in_channels // reduction, kernel_size=1, bias=True, padding=0) self.norm1 = nn.LayerNorm((in_channels // reduction, 1, 1)) if layer_norm else None self.relu = nn.ReLU() self.fc2 = nn.Conv2d(in_channels // reduction, num_gates, kernel_size=1, bias=True, padding=0) self.gate_activation = nn.Sigmoid() if gate_activation == 'sigmoid' else nn.ReLU() def forward(self, x): input = x x = self.global_avgpool(x) x = self.fc1(x) if self.norm1 is not None: x = self.norm1(x) x = self.relu(x) x = self.fc2(x) if self.gate_activation is not None: x = self.gate_activation(x) return x if self.return_gates else input * x class OSBlockX1(nn.Module): def __init__(self, in_channels, out_channels, IN=False, bottleneck_reduction=4): super().__init__() mid_channels = out_channels // bottleneck_reduction self.conv1 = Conv1x1(in_channels, mid_channels) self.conv2a = LightConv3x3(mid_channels, mid_channels) self.conv2b = nn.Sequential(LightConv3x3(mid_channels, mid_channels), LightConv3x3(mid_channels, mid_channels)) self.conv2c = nn.Sequential(LightConv3x3(mid_channels, mid_channels), LightConv3x3(mid_channels, mid_channels), LightConv3x3(mid_channels, mid_channels)) self.conv2d = nn.Sequential(LightConv3x3(mid_channels, mid_channels), LightConv3x3(mid_channels, mid_channels), LightConv3x3(mid_channels, mid_channels), LightConv3x3(mid_channels, mid_channels)) self.gate = ChannelGate(mid_channels) self.conv3 = Conv1x1Linear(mid_channels, out_channels) self.downsample = Conv1x1Linear(in_channels, out_channels) if in_channels != out_channels else None self.IN = nn.InstanceNorm2d(out_channels, affine=True) if IN else None def forward(self, x): identity = x x1 = self.conv1(x) x2 = self.gate(self.conv2a(x1)) + self.gate(self.conv2b(x1)) + self.gate(self.conv2c(x1)) + self.gate(self.conv2d(x1)) x3 = self.conv3(x2) if self.downsample is not None: identity = self.downsample(identity) out = x3 + identity if self.IN is not None: out = self.IN(out) return F.relu(out) class OSNetX1(nn.Module): def __init__(self, num_classes, blocks, layers, channels, feature_dim=512, loss='softmax', IN=False): super().__init__() self.loss = loss self.feature_dim = feature_dim self.conv1 = ConvLayer(3, channels[0], 7, stride=2, padding=3, IN=IN) self.maxpool = nn.MaxPool2d(3, stride=2, padding=1) self.conv2 = self._make_layer(blocks[0], layers[0], channels[0], channels[1], reduce_spatial_size=True, IN=IN) self.conv3 = self._make_layer(blocks[1], layers[1], channels[1], channels[2], reduce_spatial_size=True) self.conv4 = self._make_layer(blocks[2], layers[2], channels[2], channels[3], reduce_spatial_size=False) self.conv5 = Conv1x1(channels[3], channels[3]) self.global_avgpool = nn.AdaptiveAvgPool2d(1) self.fc = self._construct_fc_layer(feature_dim, channels[3], dropout_p=None) self.classifier = nn.Linear(self.feature_dim, num_classes) self._init_params() def _make_layer(self, block, layer, in_channels, out_channels, reduce_spatial_size, IN=False): layers_list = [block(in_channels, out_channels, IN=IN)] for _ in range(1, layer): layers_list.append(block(out_channels, out_channels, IN=IN)) if reduce_spatial_size: layers_list.append(nn.Sequential(Conv1x1(out_channels, out_channels), nn.AvgPool2d(2, stride=2))) return nn.Sequential(*layers_list) def _construct_fc_layer(self, fc_dims, input_dim, dropout_p=None): if fc_dims is None or fc_dims < 0: self.feature_dim = input_dim return None if isinstance(fc_dims, int): fc_dims = [fc_dims] layers_list = [] for dim in fc_dims: layers_list.append(nn.Linear(input_dim, dim)) layers_list.append(nn.BatchNorm1d(dim)) layers_list.append(nn.ReLU(inplace=True)) if dropout_p is not None: layers_list.append(nn.Dropout(p=dropout_p)) input_dim = dim self.feature_dim = fc_dims[-1] return nn.Sequential(*layers_list) def _init_params(self): for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') if m.bias is not None: nn.init.constant_(m.bias, 0) elif isinstance(m, nn.BatchNorm2d): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) elif isinstance(m, nn.BatchNorm1d): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) elif isinstance(m, nn.InstanceNorm2d): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) elif isinstance(m, nn.Linear): nn.init.normal_(m.weight, 0, 0.01) if m.bias is not None: nn.init.constant_(m.bias, 0) def forward(self, x, return_featuremaps=False): x = self.conv1(x) x = self.maxpool(x) x = self.conv2(x) x = self.conv3(x) x = self.conv4(x) x = self.conv5(x) if return_featuremaps: return x v = self.global_avgpool(x) v = v.view(v.size(0), -1) if self.fc is not None: v = self.fc(v) if not self.training: return v y = self.classifier(v) if self.loss == 'softmax': return y elif self.loss == 'triplet': return y, v raise KeyError(f"Unsupported loss: {self.loss}") def osnet_x1_0(num_classes=1000, pretrained=True, loss='softmax', **kwargs): return OSNetX1( num_classes, blocks=[OSBlockX1, OSBlockX1, OSBlockX1], layers=[2, 2, 2], channels=[64, 256, 384, 512], loss=loss, **kwargs, ) def load_checkpoint_osnet(fpath): fpath = os.path.abspath(os.path.expanduser(fpath)) map_location = None if torch.cuda.is_available() else 'cpu' checkpoint = torch.load(fpath, map_location=map_location, weights_only=False) return checkpoint def load_pretrained_weights_osnet(model, weight_path): checkpoint = load_checkpoint_osnet(weight_path) state_dict = checkpoint.get('state_dict', checkpoint) model_dict = model.state_dict() new_state_dict = OrderedDict() for k, v in state_dict.items(): if k.startswith('module.'): k = k[7:] if k in model_dict and model_dict[k].size() == v.size(): new_state_dict[k] = v model_dict.update(new_state_dict) model.load_state_dict(model_dict) def load_osnet(device="cuda", weight_path=None): model = osnet_x1_0(num_classes=1, loss='softmax', pretrained=False) weight_path = Path(weight_path) if weight_path else None if weight_path and weight_path.exists(): load_pretrained_weights_osnet(model, str(weight_path)) model.eval() model.to(device) return model def _resolve_player_cls_id(model: YOLO, fallback: int = PLAYER_CLS_ID) -> int: names = getattr(model, "names", None) if not names: names = getattr(getattr(model, "model", None), "names", None) if isinstance(names, dict): for idx, name in names.items(): if str(name).lower() in ("player", "players"): return int(idx) if isinstance(names, list): for idx, name in enumerate(names): if str(name).lower() in ("player", "players"): return int(idx) return fallback # ── HRNet architecture ─────────────────────────────────────────── BatchNorm2d = nn.BatchNorm2d BN_MOMENTUM = 0.1 def conv3x3(in_planes, out_planes, stride=1): return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False) class BasicBlock(nn.Module): expansion = 1 def __init__(self, inplanes, planes, stride=1, downsample=None): super().__init__() self.conv1 = conv3x3(inplanes, planes, stride) self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM) self.relu = nn.ReLU(inplace=True) self.conv2 = conv3x3(planes, planes) self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM) self.downsample = downsample self.stride = stride def forward(self, x): residual = x out = self.relu(self.bn1(self.conv1(x))) out = self.bn2(self.conv2(out)) if self.downsample is not None: residual = self.downsample(x) return self.relu(out + residual) class Bottleneck(nn.Module): expansion = 4 def __init__(self, inplanes, planes, stride=1, downsample=None): super().__init__() self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False) self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM) self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False) self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM) self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, bias=False) self.bn3 = BatchNorm2d(planes * self.expansion, momentum=BN_MOMENTUM) self.relu = nn.ReLU(inplace=True) self.downsample = downsample self.stride = stride def forward(self, x): residual = x out = self.relu(self.bn1(self.conv1(x))) out = self.relu(self.bn2(self.conv2(out))) out = self.bn3(self.conv3(out)) if self.downsample is not None: residual = self.downsample(x) return self.relu(out + residual) blocks_dict = {'BASIC': BasicBlock, 'BOTTLENECK': Bottleneck} class HighResolutionModule(nn.Module): def __init__(self, num_branches, blocks, num_blocks, num_inchannels, num_channels, fuse_method, multi_scale_output=True): super().__init__() self.num_inchannels = num_inchannels self.fuse_method = fuse_method self.num_branches = num_branches self.multi_scale_output = multi_scale_output self.branches = self._make_branches(num_branches, blocks, num_blocks, num_channels) self.fuse_layers = self._make_fuse_layers() self.relu = nn.ReLU(inplace=True) def _make_one_branch(self, branch_index, block, num_blocks, num_channels, stride=1): downsample = None if stride != 1 or self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion: downsample = nn.Sequential( nn.Conv2d(self.num_inchannels[branch_index], num_channels[branch_index] * block.expansion, kernel_size=1, stride=stride, bias=False), BatchNorm2d(num_channels[branch_index] * block.expansion, momentum=BN_MOMENTUM), ) layers = [block(self.num_inchannels[branch_index], num_channels[branch_index], stride, downsample)] self.num_inchannels[branch_index] = num_channels[branch_index] * block.expansion for _ in range(1, num_blocks[branch_index]): layers.append(block(self.num_inchannels[branch_index], num_channels[branch_index])) return nn.Sequential(*layers) def _make_branches(self, num_branches, block, num_blocks, num_channels): return nn.ModuleList([self._make_one_branch(i, block, num_blocks, num_channels) for i in range(num_branches)]) def _make_fuse_layers(self): if self.num_branches == 1: return None num_branches = self.num_branches num_inchannels = self.num_inchannels fuse_layers = [] for i in range(num_branches if self.multi_scale_output else 1): fuse_layer = [] for j in range(num_branches): if j > i: fuse_layer.append(nn.Sequential( nn.Conv2d(num_inchannels[j], num_inchannels[i], 1, 1, 0, bias=False), BatchNorm2d(num_inchannels[i], momentum=BN_MOMENTUM))) elif j == i: fuse_layer.append(None) else: conv3x3s = [] for k in range(i - j): if k == i - j - 1: conv3x3s.append(nn.Sequential( nn.Conv2d(num_inchannels[j], num_inchannels[i], 3, 2, 1, bias=False), BatchNorm2d(num_inchannels[i], momentum=BN_MOMENTUM))) else: conv3x3s.append(nn.Sequential( nn.Conv2d(num_inchannels[j], num_inchannels[j], 3, 2, 1, bias=False), BatchNorm2d(num_inchannels[j], momentum=BN_MOMENTUM), nn.ReLU(inplace=True))) fuse_layer.append(nn.Sequential(*conv3x3s)) fuse_layers.append(nn.ModuleList(fuse_layer)) return nn.ModuleList(fuse_layers) def get_num_inchannels(self): return self.num_inchannels def forward(self, x): if self.num_branches == 1: return [self.branches[0](x[0])] for i in range(self.num_branches): x[i] = self.branches[i](x[i]) x_fuse = [] for i in range(len(self.fuse_layers)): y = x[0] if i == 0 else self.fuse_layers[i][0](x[0]) for j in range(1, self.num_branches): if i == j: y = y + x[j] elif j > i: y = y + F.interpolate(self.fuse_layers[i][j](x[j]), size=[x[i].shape[2], x[i].shape[3]], mode='bilinear') else: y = y + self.fuse_layers[i][j](x[j]) x_fuse.append(self.relu(y)) return x_fuse class HighResolutionNet(nn.Module): def __init__(self, config, lines=False, **kwargs): self.inplanes = 64 self.lines = lines extra = config['MODEL']['EXTRA'] super().__init__() self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=2, padding=1, bias=False) self.bn1 = BatchNorm2d(64, momentum=BN_MOMENTUM) self.conv2 = nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1, bias=False) self.bn2 = BatchNorm2d(64, momentum=BN_MOMENTUM) self.relu = nn.ReLU(inplace=True) self.layer1 = self._make_layer(Bottleneck, 64, 64, 4) self.stage2_cfg = extra['STAGE2'] num_channels = self.stage2_cfg['NUM_CHANNELS'] block = blocks_dict[self.stage2_cfg['BLOCK']] num_channels = [num_channels[i] * block.expansion for i in range(len(num_channels))] self.transition1 = self._make_transition_layer([256], num_channels) self.stage2, pre_stage_channels = self._make_stage(self.stage2_cfg, num_channels) self.stage3_cfg = extra['STAGE3'] num_channels = self.stage3_cfg['NUM_CHANNELS'] block = blocks_dict[self.stage3_cfg['BLOCK']] num_channels = [num_channels[i] * block.expansion for i in range(len(num_channels))] self.transition2 = self._make_transition_layer(pre_stage_channels, num_channels) self.stage3, pre_stage_channels = self._make_stage(self.stage3_cfg, num_channels) self.stage4_cfg = extra['STAGE4'] num_channels = self.stage4_cfg['NUM_CHANNELS'] block = blocks_dict[self.stage4_cfg['BLOCK']] num_channels = [num_channels[i] * block.expansion for i in range(len(num_channels))] self.transition3 = self._make_transition_layer(pre_stage_channels, num_channels) self.stage4, pre_stage_channels = self._make_stage(self.stage4_cfg, num_channels, multi_scale_output=True) self.upsample = nn.Upsample(scale_factor=2, mode='nearest') final_inp_channels = sum(pre_stage_channels) + self.inplanes self.head = nn.Sequential(nn.Sequential( nn.Conv2d(final_inp_channels, final_inp_channels, kernel_size=1), BatchNorm2d(final_inp_channels, momentum=BN_MOMENTUM), nn.ReLU(inplace=True), nn.Conv2d(final_inp_channels, config['MODEL']['NUM_JOINTS'], kernel_size=extra['FINAL_CONV_KERNEL']), nn.Softmax(dim=1) if not self.lines else nn.Sigmoid())) def _make_head(self, x, x_skip): x = self.upsample(x) x = torch.cat([x, x_skip], dim=1) return self.head(x) def _make_transition_layer(self, num_channels_pre_layer, num_channels_cur_layer): num_branches_cur = len(num_channels_cur_layer) num_branches_pre = len(num_channels_pre_layer) transition_layers = [] for i in range(num_branches_cur): if i < num_branches_pre: if num_channels_cur_layer[i] != num_channels_pre_layer[i]: transition_layers.append(nn.Sequential( nn.Conv2d(num_channels_pre_layer[i], num_channels_cur_layer[i], 3, 1, 1, bias=False), BatchNorm2d(num_channels_cur_layer[i], momentum=BN_MOMENTUM), nn.ReLU(inplace=True))) else: transition_layers.append(None) else: conv3x3s = [] for j in range(i + 1 - num_branches_pre): inchannels = num_channels_pre_layer[-1] outchannels = num_channels_cur_layer[i] if j == i - num_branches_pre else inchannels conv3x3s.append(nn.Sequential( nn.Conv2d(inchannels, outchannels, 3, 2, 1, bias=False), BatchNorm2d(outchannels, momentum=BN_MOMENTUM), nn.ReLU(inplace=True))) transition_layers.append(nn.Sequential(*conv3x3s)) return nn.ModuleList(transition_layers) def _make_layer(self, block, inplanes, planes, blocks, stride=1): downsample = None if stride != 1 or inplanes != planes * block.expansion: downsample = nn.Sequential( nn.Conv2d(inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False), BatchNorm2d(planes * block.expansion, momentum=BN_MOMENTUM), ) layers = [block(inplanes, planes, stride, downsample)] inplanes = planes * block.expansion for _ in range(1, blocks): layers.append(block(inplanes, planes)) return nn.Sequential(*layers) def _make_stage(self, layer_config, num_inchannels, multi_scale_output=True): num_modules = layer_config['NUM_MODULES'] num_branches = layer_config['NUM_BRANCHES'] num_blocks = layer_config['NUM_BLOCKS'] num_channels = layer_config['NUM_CHANNELS'] block = blocks_dict[layer_config['BLOCK']] fuse_method = layer_config['FUSE_METHOD'] modules = [] for i in range(num_modules): reset_multi_scale_output = True if multi_scale_output or i < num_modules - 1 else False modules.append(HighResolutionModule( num_branches, block, num_blocks, num_inchannels, num_channels, fuse_method, reset_multi_scale_output)) num_inchannels = modules[-1].get_num_inchannels() return nn.Sequential(*modules), num_inchannels def forward(self, x): x = self.conv1(x) x_skip = x.clone() x = self.relu(self.bn1(x)) x = self.relu(self.bn2(self.conv2(x))) x = self.layer1(x) x_list = [] for i in range(self.stage2_cfg['NUM_BRANCHES']): x_list.append(self.transition1[i](x) if self.transition1[i] is not None else x) y_list = self.stage2(x_list) x_list = [] for i in range(self.stage3_cfg['NUM_BRANCHES']): x_list.append(self.transition2[i](y_list[-1]) if self.transition2[i] is not None else y_list[i]) y_list = self.stage3(x_list) x_list = [] for i in range(self.stage4_cfg['NUM_BRANCHES']): x_list.append(self.transition3[i](y_list[-1]) if self.transition3[i] is not None else y_list[i]) x = self.stage4(x_list) height, width = x[0].size(2), x[0].size(3) x1 = F.interpolate(x[1], size=(height, width), mode='bilinear', align_corners=False) x2 = F.interpolate(x[2], size=(height, width), mode='bilinear', align_corners=False) x3 = F.interpolate(x[3], size=(height, width), mode='bilinear', align_corners=False) x = torch.cat([x[0], x1, x2, x3], 1) return self._make_head(x, x_skip) def init_weights(self, pretrained=''): for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') elif isinstance(m, nn.BatchNorm2d): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) if pretrained: if os.path.isfile(pretrained): pretrained_dict = torch.load(pretrained) model_dict = self.state_dict() pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in model_dict} model_dict.update(pretrained_dict) self.load_state_dict(model_dict) else: sys.exit(f'Weights {pretrained} not found.') def get_cls_net(config, pretrained='', **kwargs): model = HighResolutionNet(config, **kwargs) model.init_weights(pretrained) return model # ── Keypoint mapping & inference helpers ───────────────────────── map_keypoints = { 1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23, 11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29, 28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20, 45: 9, 50: 31, 52: 32, 57: 22 } # Template keypoints for homography refinement (new-5 style) TEMPLATE_F0: List[Tuple[float, float]] = [ (5, 5), (5, 140), (5, 250), (5, 430), (5, 540), (5, 675), (55, 250), (55, 430), (110, 340), (165, 140), (165, 270), (165, 410), (165, 540), (527, 5), (527, 253), (527, 433), (527, 675), (888, 140), (888, 270), (888, 410), (888, 540), (940, 340), (998, 250), (998, 430), (1045, 5), (1045, 140), (1045, 250), (1045, 430), (1045, 540), (1045, 675), (435, 340), (615, 340), ] TEMPLATE_F1: List[Tuple[float, float]] = [ (2.5, 2.5), (2.5, 139.5), (2.5, 249.5), (2.5, 430.5), (2.5, 540.5), (2.5, 678), (54.5, 249.5), (54.5, 430.5), (110.5, 340.5), (164.5, 139.5), (164.5, 269), (164.5, 411), (164.5, 540.5), (525, 2.5), (525, 249.5), (525, 430.5), (525, 678), (886.5, 139.5), (886.5, 269), (886.5, 411), (886.5, 540.5), (940.5, 340.5), (998, 249.5), (998, 430.5), (1048, 2.5), (1048, 139.5), (1048, 249.5), (1048, 430.5), (1048, 540.5), (1048, 678), (434.5, 340), (615.5, 340), ] HOMOGRAPHY_FILL_ONLY_VALID = True KP_THRESHOLD = 0.2 # new-5 style (was 0.3) # HRNet: smaller input = faster; 432x768 balances speed/accuracy (new-2 style) KP_H, KP_W = 360, 640 HRNET_BATCH_SIZE = 16 # larger batch = faster (if GPU mem allows) def _preprocess_batch(frames): target_h, target_w = KP_H, KP_W batch = [] for frame in frames: img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) img = cv2.resize(img, (target_w, target_h)).astype(np.float32) / 255.0 batch.append(np.transpose(img, (2, 0, 1))) return torch.from_numpy(np.stack(batch)).float() def _extract_keypoints(heatmap: torch.Tensor, scale: int = 2): b, c, h, w = heatmap.shape max_pooled = F.max_pool2d(heatmap, 3, stride=1, padding=1) local_maxima = (max_pooled == heatmap) masked = heatmap * local_maxima flat = masked.view(b, c, -1) scores, indices = torch.topk(flat, 1, dim=-1, sorted=False) y_coords = torch.div(indices, w, rounding_mode="floor") * scale x_coords = (indices % w) * scale return torch.stack([x_coords.float(), y_coords.float(), scores], dim=-1) def _process_keypoints(kp_coords, threshold, w, h, batch_size): kp_np = kp_coords.cpu().numpy() results = [] for b_idx in range(batch_size): kp_dict = {} valid = np.where(kp_np[b_idx, :, 0, 2] > threshold)[0] for ch_idx in valid: kp_dict[ch_idx + 1] = { 'x': float(kp_np[b_idx, ch_idx, 0, 0]) / w, 'y': float(kp_np[b_idx, ch_idx, 0, 1]) / h, 'p': float(kp_np[b_idx, ch_idx, 0, 2]), } results.append(kp_dict) return results def _run_hrnet_batch(frames, model, threshold, batch_size=16): if not frames or model is None: return [] device = next(model.parameters()).device use_amp = device.type == "cuda" results = [] for i in range(0, len(frames), batch_size): chunk = frames[i:i + batch_size] batch = _preprocess_batch(chunk).to(device, non_blocking=True) with torch.inference_mode(): with torch.amp.autocast("cuda", enabled=use_amp): heatmaps = model(batch) kp_coords = _extract_keypoints(heatmaps[:, :-1, :, :], scale=2) batch_kps = _process_keypoints(kp_coords, threshold, KP_W, KP_H, len(chunk)) results.extend(batch_kps) del heatmaps, kp_coords, batch if results: gc.collect() return results def _apply_keypoint_mapping(kp_dict): return {map_keypoints[k]: v for k, v in kp_dict.items() if k in map_keypoints} def _normalize_keypoints(kp_results, frames, n_keypoints): keypoints = [] max_frames = min(len(kp_results), len(frames)) for i in range(max_frames): kp_dict = kp_results[i] h, w = frames[i].shape[:2] frame_kps = [] for idx in range(n_keypoints): kp_idx = idx + 1 x, y = 0, 0 if kp_idx in kp_dict: d = kp_dict[kp_idx] if isinstance(d, dict) and 'x' in d: x = int(d['x'] * w) y = int(d['y'] * h) frame_kps.append((x, y)) keypoints.append(frame_kps) return keypoints def _fix_keypoints(kps: list, n: int) -> list: if len(kps) < n: kps += [(0, 0)] * (n - len(kps)) elif len(kps) > n: kps = kps[:n] if kps[2] != (0,0) and kps[4] != (0,0) and kps[3] == (0,0): kps[3] = kps[4]; kps[4] = (0,0) if kps[0] != (0,0) and kps[4] != (0,0) and kps[1] == (0,0): kps[1] = kps[4]; kps[4] = (0,0) if kps[2] != (0,0) and kps[3] != (0,0) and kps[1] == (0,0) and kps[3][0] > kps[2][0]: kps[1] = kps[3]; kps[3] = (0,0) if kps[28] != (0,0) and kps[25] == (0,0) and kps[26] != (0,0) and kps[26][0] > kps[28][0]: kps[25] = kps[28]; kps[28] = (0,0) if kps[24] != (0,0) and kps[28] != (0,0) and kps[25] == (0,0): kps[25] = kps[28]; kps[28] = (0,0) if kps[24] != (0,0) and kps[27] != (0,0) and kps[26] == (0,0): kps[26] = kps[27]; kps[27] = (0,0) if kps[28] != (0,0) and kps[23] == (0,0) and kps[20] != (0,0) and kps[20][1] > kps[23][1]: kps[23] = kps[20]; kps[20] = (0,0) return kps def _keypoints_to_float(keypoints: list) -> List[List[float]]: """Convert keypoints to [[x, y], ...] float format for homography.""" return [[float(x), float(y)] for x, y in keypoints] def _keypoints_to_int(keypoints: list) -> List[Tuple[int, int]]: """Convert keypoints to [(x, y), ...] integer format.""" return [(int(round(float(kp[0]))), int(round(float(kp[1])))) for kp in keypoints] def _apply_homography_refinement( keypoints: List[List[float]], frame: np.ndarray, n_keypoints: int, ) -> List[List[float]]: """Refine keypoints using homography from template to frame (new-5 style).""" if n_keypoints != 32 or len(TEMPLATE_F0) != 32 or len(TEMPLATE_F1) != 32: return keypoints frame_height, frame_width = frame.shape[:2] valid_src: List[Tuple[float, float]] = [] valid_dst: List[Tuple[float, float]] = [] valid_indices: List[int] = [] for kp_idx, kp in enumerate(keypoints): if kp and len(kp) >= 2: x, y = float(kp[0]), float(kp[1]) if not (abs(x) < 1e-6 and abs(y) < 1e-6) and 0 <= x < frame_width and 0 <= y < frame_height: valid_src.append(TEMPLATE_F1[kp_idx]) valid_dst.append((x, y)) valid_indices.append(kp_idx) if len(valid_src) < 4: return keypoints src_pts = np.array(valid_src, dtype=np.float32) dst_pts = np.array(valid_dst, dtype=np.float32) H, _ = cv2.findHomography(src_pts, dst_pts) if H is None: return keypoints all_template_points = np.array(TEMPLATE_F0, dtype=np.float32).reshape(-1, 1, 2) adjusted_points = cv2.perspectiveTransform(all_template_points, H) adjusted_points = adjusted_points.reshape(-1, 2) adj_x = adjusted_points[:32, 0] adj_y = adjusted_points[:32, 1] valid_mask = (adj_x >= 0) & (adj_y >= 0) & (adj_x < frame_width) & (adj_y < frame_height) valid_indices_set = set(valid_indices) adjusted_kps: List[List[float]] = [[0.0, 0.0] for _ in range(32)] for i in np.where(valid_mask)[0]: if not HOMOGRAPHY_FILL_ONLY_VALID or i in valid_indices_set: adjusted_kps[i] = [float(adj_x[i]), float(adj_y[i])] return adjusted_kps # ── Pydantic models ─────────────────────────────────────────────────────────── # Team assignment: 6 = team 1, 7 = team 2 TEAM_1_ID = 6 TEAM_2_ID = 7 PLAYER_CLS_ID = 2 class BoundingBox(BaseModel): x1: int y1: int x2: int y2: int cls_id: int conf: float team_id: Optional[int] = None track_id: Optional[int] = None class TVFrameResult(BaseModel): frame_id: int boxes: list[BoundingBox] keypoints: List[Tuple[int, int]] # [(x, y), ...] integer coordinates def _smooth_boxes( results: List[TVFrameResult], window: int = BOX_SMOOTH_WINDOW, tids_by_frame: Optional[Dict[int, List[Optional[int]]]] = None, ) -> List[TVFrameResult]: """Temporal box smoothing by track ID.""" if window <= 1 or not results: return results fid_to_idx = {r.frame_id: i for i, r in enumerate(results)} trajectories: Dict[int, List[Tuple[int, int, BoundingBox]]] = {} for i, r in enumerate(results): for j, bb in enumerate(r.boxes): tid = tids_by_frame.get(r.frame_id, [None] * len(r.boxes))[j] if tids_by_frame else bb.track_id if tid is not None and tid >= 0: tid = int(tid) if tid not in trajectories: trajectories[tid] = [] trajectories[tid].append((r.frame_id, j, bb)) smoothed: Dict[Tuple[int, int], Tuple[int, int, int, int]] = {} half = window // 2 for tid, items in trajectories.items(): items.sort(key=lambda x: x[0]) n = len(items) for k in range(n): fid, box_idx, bb = items[k] result_idx = fid_to_idx[fid] lo = max(0, k - half) hi = min(n, k + half + 1) cx_list = [0.5 * (items[m][2].x1 + items[m][2].x2) for m in range(lo, hi)] cy_list = [0.5 * (items[m][2].y1 + items[m][2].y2) for m in range(lo, hi)] w_list = [items[m][2].x2 - items[m][2].x1 for m in range(lo, hi)] h_list = [items[m][2].y2 - items[m][2].y1 for m in range(lo, hi)] cx_avg = sum(cx_list) / len(cx_list) cy_avg = sum(cy_list) / len(cy_list) w_avg = sum(w_list) / len(w_list) h_avg = sum(h_list) / len(h_list) x1_new = int(round(cx_avg - w_avg / 2)) y1_new = int(round(cy_avg - h_avg / 2)) x2_new = int(round(cx_avg + w_avg / 2)) y2_new = int(round(cy_avg + h_avg / 2)) smoothed[(result_idx, box_idx)] = (x1_new, y1_new, x2_new, y2_new) out: List[TVFrameResult] = [] for i, r in enumerate(results): new_boxes: List[BoundingBox] = [] for j, bb in enumerate(r.boxes): key = (i, j) if key in smoothed: x1, y1, x2, y2 = smoothed[key] new_boxes.append(BoundingBox(x1=x1, y1=y1, x2=x2, y2=y2, cls_id=int(bb.cls_id), conf=round(float(bb.conf), 2), team_id=bb.team_id, track_id=bb.track_id)) else: new_boxes.append(BoundingBox(x1=int(bb.x1), y1=int(bb.y1), x2=int(bb.x2), y2=int(bb.y2), cls_id=int(bb.cls_id), conf=round(float(bb.conf), 2), team_id=bb.team_id, track_id=bb.track_id)) out.append(TVFrameResult(frame_id=r.frame_id, boxes=new_boxes, keypoints=r.keypoints)) return out # ── Miner ───────────────────────────────────────────────────────────────────── class Miner: def __init__(self, path_hf_repo: Path) -> None: self.path_hf_repo = Path(path_hf_repo) self.is_start = False self._executor = ThreadPoolExecutor(max_workers=2) global _OSNET_MODEL, osnet_weight_path device = "cuda" if torch.cuda.is_available() else "cpu" self.device = device # Person model: prefer ONNX (new-2 style), fallback to .pt models_dir = self.path_hf_repo person_onnx = models_dir / "player_detect.onnx" self._person_model_onnx = person_onnx.exists() if person_onnx.exists(): self.bbox_model = YOLO(str(person_onnx), task="detect") print("✅ Person Model Loaded (ONNX)") else: self.bbox_model = None print("⚠️ Person model not found (tried player_detect.onnx)") # OSNet team classifier osnet_weight_path = self.path_hf_repo / "osnet_model.pth.tar-100" if osnet_weight_path.exists(): _OSNET_MODEL = load_osnet(device, osnet_weight_path) print("✅ Team Classifier Loaded (OSNet)") else: _OSNET_MODEL = None print(f"⚠️ OSNet weights not found at {osnet_weight_path}. Using HSV fallback.") # Keypoints model: HRNet kp_config_file = "hrnetv2_w48.yaml" kp_weights_file = "keypoint_detect.pt" config_path = Path(kp_config_file) if Path(kp_config_file).exists() else self.path_hf_repo / kp_config_file weights_path = Path(kp_weights_file) if Path(kp_weights_file).exists() else self.path_hf_repo / kp_weights_file cfg = yaml.safe_load(open(config_path, 'r')) hrnet = get_cls_net(cfg) state = torch.load(weights_path, map_location=device, weights_only=False) hrnet.load_state_dict(state) hrnet.to(device).eval() self.keypoints_model = hrnet print("✅ HRNet Keypoints Model Loaded") # Person detection state (new-2 style) self._person_tracker_state: Dict[int, Tuple[Tuple[float, float, float, float], Tuple[float, float, float, float], int]] = {} self._person_tracker_next_id = 0 self._track_id_to_team_votes: Dict[int, Dict[str, int]] = {} self._track_id_to_class_votes: Dict[int, Dict[int, int]] = {} self._prev_batch_tail_tid_counts: Dict[int, int] = {} def reset_for_new_video(self) -> None: self._person_tracker_state.clear() self._person_tracker_next_id = 0 self._track_id_to_team_votes.clear() self._track_id_to_class_votes.clear() self._prev_batch_tail_tid_counts.clear() def __repr__(self) -> str: return ( f"BBox Model: {type(self.bbox_model).__name__}\n" f"Keypoints Model: {type(self.keypoints_model).__name__}\n" f"Team Clustering: OSNet + KMeans" ) def _bbox_task(self, images: list[ndarray], offset: int = 0) -> list[list[BoundingBox]]: start_time = time.time() """Person detection pipeline (new-2 style): tracking, class votes, OSNet teams, adjust.""" if not images: return [] if self.bbox_model is None: return [[] for _ in images] try: kw = {"imgsz": PERSON_MODEL_IMG_SIZE, "conf": PERSON_CONF, "verbose": False} if PERSON_HALF and not self._person_model_onnx: try: if next(self.bbox_model.model.parameters()).is_cuda: kw["half"] = True except Exception: pass batch_res = self.bbox_model(images, **kw) except Exception: return [[] for _ in images] if not isinstance(batch_res, list): batch_res = [batch_res] if batch_res is not None else [] self._person_tracker_state, self._person_tracker_next_id, person_track_ids = _assign_person_track_ids( self._person_tracker_state, self._person_tracker_next_id, batch_res, TRACK_IOU_THRESH ) person_res = batch_res print(f"Person detection took {time.time() - start_time:.2f} seconds") start_time = time.time() # Parse boxes: ONNX 0=player, 1=referee, 2=goalkeeper; .pt 0=ball(skip), 1=GK, 2=player, 3=referee bboxes_by_frame: Dict[int, List[BoundingBox]] = {} track_ids_by_frame: Dict[int, List[Optional[int]]] = {} for i, det_p in enumerate(person_res): frame_id = offset + i boxes_raw: List[BoundingBox] = [] track_ids_raw: List[Optional[int]] = [] if det_p is not None and getattr(det_p, "boxes", None) is not None and len(det_p.boxes) > 0: b = det_p.boxes xyxy = b.xyxy.cpu().numpy() confs = b.conf.cpu().numpy() if b.conf is not None else np.ones(len(xyxy), dtype=np.float32) clss = b.cls.cpu().numpy().astype(int) if b.cls is not None else np.zeros(len(xyxy), dtype=np.int32) tids = person_track_ids[i] if i < len(person_track_ids) and len(person_track_ids[i]) == len(clss) else [-1] * len(clss) for (x1, y1, x2, y2), c, cf, tid in zip(xyxy, clss, confs, tids): c, tid = int(c), int(tid) x1r, y1r, x2r, y2r = int(round(x1)), int(round(y1)), int(round(x2)), int(round(y2)) tid_out = tid if tid >= 0 else None if self._person_model_onnx: if c == 0: boxes_raw.append(BoundingBox(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C_PLAYER, conf=float(cf), team_id=None, track_id=tid_out)) track_ids_raw.append(tid_out) elif c == 1: boxes_raw.append(BoundingBox(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C_REFEREE, conf=float(cf), team_id=None, track_id=tid_out)) track_ids_raw.append(tid_out) elif c == 2: boxes_raw.append(BoundingBox(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C_GOALKEEPER, conf=float(cf), team_id=None, track_id=tid_out)) track_ids_raw.append(tid_out) else: if c == 0: continue internal_cls = {1: _C_GOALKEEPER, 2: _C_PLAYER, 3: _C_REFEREE}.get(c, _C_PLAYER) boxes_raw.append(BoundingBox(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=internal_cls, conf=float(cf), team_id=None, track_id=tid_out)) track_ids_raw.append(tid_out) bboxes_by_frame[frame_id] = boxes_raw track_ids_by_frame[frame_id] = track_ids_raw # Noise filter: remove short tracks in tail if len(images) > NOISE_TAIL_FRAMES: tid_counts: Dict[int, int] = {} tid_first_frame: Dict[int, int] = {} for fid in range(offset, offset + len(images)): for tid in track_ids_by_frame.get(fid, []): if tid is not None and tid >= 0: t = int(tid) tid_counts[t] = tid_counts.get(t, 0) + 1 if t not in tid_first_frame or fid < tid_first_frame[t]: tid_first_frame[t] = fid for t, prev_count in self._prev_batch_tail_tid_counts.items(): tid_counts[t] = tid_counts.get(t, 0) + prev_count if prev_count > 0: tid_first_frame[t] = offset + len(images) boundary = offset + len(images) - NOISE_TAIL_FRAMES noise_tids = {t for t, count in tid_counts.items() if count < NOISE_MIN_APPEARANCES and tid_first_frame.get(t, 0) < boundary} for fid in range(offset, offset + len(images)): boxes = bboxes_by_frame.get(fid, []) tids = track_ids_by_frame.get(fid, [None] * len(boxes)) keep = [j for j in range(len(boxes)) if tids[j] is None or int(tids[j]) not in noise_tids] bboxes_by_frame[fid] = [boxes[j] for j in keep] track_ids_by_frame[fid] = [tids[j] for j in keep] tail_start = offset + len(images) - NOISE_TAIL_FRAMES self._prev_batch_tail_tid_counts = {} for fid in range(tail_start, offset + len(images)): for tid in track_ids_by_frame.get(fid, []): if tid is not None and tid >= 0: t = int(tid) self._prev_batch_tail_tid_counts[t] = self._prev_batch_tail_tid_counts.get(t, 0) + 1 # Class votes: collect votes per track (skip redundant IoU stabilization) for i in range(len(images)): frame_id = offset + i boxes_raw = bboxes_by_frame[frame_id] track_ids_raw = track_ids_by_frame[frame_id] for idx, bb in enumerate(boxes_raw): tid = track_ids_raw[idx] if idx < len(track_ids_raw) else bb.track_id if tid is not None and int(tid) >= 0: if tid not in self._track_id_to_class_votes: self._track_id_to_class_votes[tid] = {} self._track_id_to_class_votes[tid][int(bb.cls_id)] = self._track_id_to_class_votes[tid].get(int(bb.cls_id), 0) + 1 # Class votes: majority over track for fid in range(offset, offset + len(images)): new_boxes: List[BoundingBox] = [] tids_fid = track_ids_by_frame.get(fid, [None] * len(bboxes_by_frame[fid])) for box_idx, box in enumerate(bboxes_by_frame[fid]): tid = tids_fid[box_idx] if box_idx < len(tids_fid) else None if tid is not None and tid >= 0 and tid in self._track_id_to_class_votes: votes = self._track_id_to_class_votes[tid] ref_votes = votes.get(_C_REFEREE, 0) gk_votes = votes.get(_C_GOALKEEPER, 0) if ref_votes > CLASS_VOTE_MAJORITY: majority_cls = _C_REFEREE elif gk_votes > CLASS_VOTE_MAJORITY: majority_cls = _C_GOALKEEPER else: majority_cls = max(votes.items(), key=lambda x: x[1])[0] new_boxes.append(BoundingBox(x1=box.x1, y1=box.y1, x2=box.x2, y2=box.y2, cls_id=majority_cls, conf=box.conf, team_id=None, track_id=tid)) else: new_boxes.append(box) bboxes_by_frame[fid] = new_boxes # Interpolate track gaps if INTERP_TRACK_GAPS and len(images) > 1: track_to_frames: Dict[int, List[Tuple[int, BoundingBox]]] = {} for fid in range(offset, offset + len(images)): for bb, tid in zip(bboxes_by_frame[fid], track_ids_by_frame.get(fid, [])): if tid is not None and int(tid) >= 0: track_to_frames.setdefault(int(tid), []).append((fid, bb)) to_add: Dict[int, List[Tuple[BoundingBox, int]]] = {} for t, pairs in track_to_frames.items(): pairs.sort(key=lambda p: p[0]) for i in range(len(pairs) - 1): f1, b1 = pairs[i] f2, b2 = pairs[i + 1] if f2 - f1 <= 1: continue for g in range(f1 + 1, f2): w = (g - f1) / (f2 - f1) interp = BoundingBox( x1=int(round((1 - w) * b1.x1 + w * b2.x1)), y1=int(round((1 - w) * b1.y1 + w * b2.y1)), x2=int(round((1 - w) * b1.x2 + w * b2.x2)), y2=int(round((1 - w) * b1.y2 + w * b2.y2)), cls_id=b2.cls_id, conf=b2.conf, team_id=b2.team_id, track_id=t ) to_add.setdefault(g, []).append((interp, t)) for g, add_list in to_add.items(): bboxes_by_frame[g] = list(bboxes_by_frame.get(g, [])) track_ids_by_frame[g] = list(track_ids_by_frame.get(g, [])) for interp_box, tid in add_list: bboxes_by_frame[g].append(interp_box) track_ids_by_frame[g].append(tid) # OSNet team classification try: batch_boxes_for_osnet = {offset + i: bboxes_by_frame.get(offset + i, []) for i in range(len(images))} _classify_teams_batch(images, batch_boxes_for_osnet, self.device) for fid in batch_boxes_for_osnet: bboxes_by_frame[fid] = batch_boxes_for_osnet[fid] except Exception: pass # Team votes reid_team_per_frame: List[List[Optional[str]]] = [] for fi in range(len(images)): frame_id = offset + fi boxes_f = bboxes_by_frame.get(frame_id, []) tids_f = track_ids_by_frame.get(frame_id, []) row: List[Optional[str]] = [] for bi, box in enumerate(boxes_f): tid = tids_f[bi] if bi < len(tids_f) else box.track_id team_str = str(box.team_id) if box.team_id is not None else None if tid is not None and tid >= 0 and team_str: if tid not in self._track_id_to_team_votes: self._track_id_to_team_votes[tid] = {} self._track_id_to_team_votes[tid][team_str] = self._track_id_to_team_votes[tid].get(team_str, 0) + 1 row.append(team_str) reid_team_per_frame.append(row) for fid in range(offset, offset + len(images)): fi = fid - offset new_boxes = [] tids_fid = track_ids_by_frame.get(fid, [None] * len(bboxes_by_frame[fid])) for box_idx, box in enumerate(bboxes_by_frame[fid]): tid = tids_fid[box_idx] if box_idx < len(tids_fid) else box.track_id team_from_reid = reid_team_per_frame[fi][box_idx] if fi < len(reid_team_per_frame) and box_idx < len(reid_team_per_frame[fi]) else None default_team = team_from_reid or (str(box.team_id) if box.team_id is not None else None) if tid is not None and tid >= 0 and tid in self._track_id_to_team_votes and self._track_id_to_team_votes[tid]: majority_team = max(self._track_id_to_team_votes[tid].items(), key=lambda x: x[1])[0] else: majority_team = default_team team_id_out = int(majority_team) if majority_team and majority_team.isdigit() else (int(majority_team) if majority_team else None) new_boxes.append(BoundingBox(x1=box.x1, y1=box.y1, x2=box.x2, y2=box.y2, cls_id=box.cls_id, conf=box.conf, team_id=team_id_out, track_id=tid)) bboxes_by_frame[fid] = new_boxes # Adjust boxes: overlap NMS, GK dedup, referee disambiguation H, W = images[0].shape[:2] if images else (0, 0) for fid in range(offset, offset + len(images)): orig = bboxes_by_frame[fid] tids = track_ids_by_frame.get(fid, [None] * len(orig)) adjusted = _adjust_boxes(orig, W, H, do_goalkeeper_dedup=True, do_referee_disambiguation=True) adjusted_tids: List[Optional[int]] = [] used = set() for ab in adjusted: for oi, ob in enumerate(orig): if oi in used: continue if ob.x1 == ab.x1 and ob.y1 == ab.y1 and ob.x2 == ab.x2 and ob.y2 == ab.y2: adjusted_tids.append(tids[oi] if oi < len(tids) else None) used.add(oi) break bboxes_by_frame[fid] = adjusted print(f"Post-processing took {time.time() - start_time:.2f} seconds") # Output: validator cls_id (0=player, 1=referee, 2=goalkeeper) out: List[List[BoundingBox]] = [] for i in range(len(images)): boxes = bboxes_by_frame.get(offset + i, []) for bb in boxes: bb.cls_id = _CLS_TO_VALIDATOR.get(int(bb.cls_id), int(bb.cls_id)) out.append(boxes) return out def _keypoint_task(self, images: list[ndarray], n_keypoints: int) -> list[list]: start_time = time.time() """HRNet keypoints + homography refinement.""" if not images: return [] if self.keypoints_model is None: return [[(0, 0)] * n_keypoints for _ in images] try: raw_kps = _run_hrnet_batch(images, self.keypoints_model, KP_THRESHOLD, batch_size=HRNET_BATCH_SIZE) except Exception as e: print(f"Error in _keypoint_task: {e}") return [[(0, 0)] * n_keypoints for _ in images] raw_kps = [_apply_keypoint_mapping(kp) for kp in raw_kps] if raw_kps else [] keypoints = _normalize_keypoints(raw_kps, images, n_keypoints) if raw_kps else [[(0, 0)] * n_keypoints for _ in images] keypoints = [_fix_keypoints(kps, n_keypoints) for kps in keypoints] keypoints = [_keypoints_to_float(kps) for kps in keypoints] print(f"Keypoint task completed in {time.time() - start_time:.2f} seconds") # if n_keypoints == 32 and len(TEMPLATE_F0) == 32 and len(TEMPLATE_F1) == 32: # for idx in range(len(images)): # try: # keypoints[idx] = _apply_homography_refinement(keypoints[idx], images[idx], n_keypoints) # except Exception: # pass # keypoints = [_keypoints_to_int(kps) for kps in keypoints] return keypoints def predict_batch( self, batch_images: list[ndarray], offset: int, n_keypoints: int, ) -> list[TVFrameResult]: if not self.is_start: self.is_start = True images = list(batch_images) if offset == 0: self.reset_for_new_video() gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() # Run bbox (batched YOLO) and keypoints in parallel future_bbox = self._executor.submit(self._bbox_task, images, offset) future_kp = self._executor.submit(self._keypoint_task, images, n_keypoints) bbox_per_frame = future_bbox.result() keypoints = future_kp.result() return [ TVFrameResult(frame_id=offset + i, boxes=bbox_per_frame[i], keypoints=keypoints[i]) for i in range(len(images)) ]