| from pathlib import Path |
| from concurrent.futures import ThreadPoolExecutor |
| from ultralytics import YOLO |
| from numpy import ndarray |
| from pydantic import BaseModel |
| from typing import List, Tuple, Optional, Dict, Any |
| import numpy as np |
| import cv2 |
| from sklearn.cluster import KMeans |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import yaml |
| import gc |
| import os |
| import sys |
| from collections import OrderedDict, defaultdict |
| from PIL import Image |
| import torchvision.transforms as T |
| import time |
|
|
| try: |
| from scipy.optimize import linear_sum_assignment as _linear_sum_assignment |
| except ImportError: |
| _linear_sum_assignment = None |
|
|
| |
|
|
def get_grass_color(img: np.ndarray) -> Tuple[float, float, float]:
    """Return the mean BGR color of the green (grass) pixels in *img*.

    Falls back to black ``(0, 0, 0)`` when the image is missing or empty.
    The green range is a fixed HSV band (hue 30-80, moderate saturation and
    value), so unusual lighting may be matched poorly.
    """
    if img is None or img.size == 0:
        return (0, 0, 0)
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    lower_green = np.array([30, 40, 40])
    upper_green = np.array([80, 255, 255])
    mask = cv2.inRange(hsv, lower_green, upper_green)
    # cv2.mean returns a 4-tuple (B, G, R, 0); keep only the color channels.
    grass_color = cv2.mean(img, mask=mask)
    return grass_color[:3]
|
|
def get_players_boxes(result):
    """Collect player crops (detector class id 2) and their pixel boxes.

    Returns two parallel lists: BGR image crops taken from ``result.orig_img``
    and the corresponding ``(x1, y1, x2, y2)`` integer boxes.  Degenerate
    (empty) crops are dropped.
    """
    player_crops = []
    player_boxes = []
    for box in result.boxes:
        cls_id = int(box.cls.cpu().numpy()[0])
        if cls_id != 2:
            continue
        x1, y1, x2, y2 = (int(v) for v in box.xyxy[0].cpu().numpy())
        crop = result.orig_img[y1:y2, x1:x2]
        if crop.size > 0:
            player_crops.append(crop)
            player_boxes.append((x1, y1, x2, y2))
    return player_crops, player_boxes
|
|
def get_kits_colors(players, grass_hsv=None, frame=None):
    """Estimate the kit (shirt) color of each player crop.

    Pixels near the grass hue are masked out, the crop is restricted to its
    upper half (torso region), and the mean BGR color of what remains is
    returned per crop.

    Args:
        players: list of BGR player crops.
        grass_hsv: optional 1x1x3 HSV array holding the grass color; computed
            from ``frame`` via get_grass_color when omitted.
        frame: full BGR frame, used only when ``grass_hsv`` is None.

    Returns:
        List of 3-element ``np.ndarray`` mean BGR colors, one per crop.
    """
    kits_colors = []
    if grass_hsv is None:
        grass_color = get_grass_color(frame)
        grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)
    # BUGFIX: cast the stored hue to a Python int before widening the band.
    # grass_hsv holds uint8 values, and uint8 arithmetic wraps around
    # (e.g. 5 - 10 -> 251 under NumPy 2 promotion rules), which would turn the
    # grass mask into a near-red mask for low hues.
    grass_hue = int(grass_hsv[0, 0, 0])
    # Hoisted out of the loop: the band does not depend on the player crop.
    lower_green = np.array([grass_hue - 10, 40, 40])
    upper_green = np.array([grass_hue + 10, 255, 255])
    for player_img in players:
        hsv = cv2.cvtColor(player_img, cv2.COLOR_BGR2HSV)
        # Everything that is NOT grass-colored...
        mask = cv2.bitwise_not(cv2.inRange(hsv, lower_green, upper_green))
        # ...restricted to the upper half of the crop (shirt, not shorts/legs).
        upper_mask = np.zeros(player_img.shape[:2], np.uint8)
        upper_mask[0:player_img.shape[0] // 2, :] = 255
        mask = cv2.bitwise_and(mask, upper_mask)
        kit_color = np.array(cv2.mean(player_img, mask=mask)[:3])
        kits_colors.append(kit_color)
    return kits_colors
|
|
|
|
| |
|
|
| |
| |
# Detector class ids for the person categories handled by this pipeline.
# Class id 0 is treated as the ball elsewhere in this file (see _adjust_boxes).
_C_GOALKEEPER = 1
_C_PLAYER = 2
_C_REFEREE = 3
# Remap detector class id -> compact index (player=0, referee=1, goalkeeper=2).
# NOTE(review): the consumer of this mapping is outside this chunk — confirm
# the expected index order there.
_CLS_TO_VALIDATOR: Dict[int, int] = {_C_PLAYER: 0, _C_REFEREE: 1, _C_GOALKEEPER: 2}
|
|
| |
# --- Detection / tracking configuration ---------------------------------------
# Detector settings (consumed outside this chunk; names suggest YOLO imgsz /
# confidence threshold / fp16 inference — confirm at the call site).
PERSON_MODEL_IMG_SIZE = 640
PERSON_CONF = 0.4
PERSON_HALF = True
# IoU thresholds for the two-stage tracker (_assign_person_track_ids).
# NOTE(review): TRACK_IOU_THRESH is accepted by the tracker but unused there —
# only TRACK_IOU_HIGH / TRACK_IOU_LOW drive the matching.
TRACK_IOU_THRESH = 0.3
TRACK_IOU_HIGH = 0.4   # stage-1 (strict) association threshold
TRACK_IOU_LOW = 0.2    # stage-2 (lenient) association for leftovers
TRACK_MAX_AGE = 3      # frames a track may go unmatched before being dropped
TRACK_USE_VELOCITY = True  # extrapolate boxes with _predict_box before matching
# Post-processing knobs (their consumers are outside this chunk).
NOISE_MIN_APPEARANCES = 5
NOISE_TAIL_FRAMES = 4
CLASS_VOTE_MAJORITY = 3
INTERP_TRACK_GAPS = True
ENABLE_BOX_SMOOTHING = False
BOX_SMOOTH_WINDOW = 8
# IoU above which overlapping non-ball boxes are deduplicated (_adjust_boxes).
OVERLAP_IOU = 0.91
|
|
|
|
| def _iou_box4(a: Tuple[float, float, float, float], b: Tuple[float, float, float, float]) -> float: |
| ax1, ay1, ax2, ay2 = a |
| bx1, by1, bx2, by2 = b |
| ix1, iy1 = max(ax1, bx1), max(ay1, by1) |
| ix2, iy2 = min(ax2, bx2), min(ay2, by2) |
| iw, ih = max(0.0, ix2 - ix1), max(0.0, iy2 - iy1) |
| inter = iw * ih |
| if inter <= 0: |
| return 0.0 |
| area_a = (ax2 - ax1) * (ay2 - ay1) |
| area_b = (bx2 - bx1) * (by2 - by1) |
| union = area_a + area_b - inter |
| return inter / union if union > 0 else 0.0 |
|
|
|
|
def _match_tracks_detections(
    prev_list: List[Tuple[int, Tuple[float, float, float, float]]],
    curr_boxes: List[Tuple[float, float, float, float]],
    iou_thresh: float,
    exclude_prev: set,
    exclude_curr: set,
) -> List[Tuple[int, int]]:
    """Associate existing tracks with current detections by IoU.

    Uses Hungarian assignment when scipy is available, otherwise a greedy
    highest-IoU-first fallback.  Indices listed in ``exclude_prev`` /
    ``exclude_curr`` are skipped, and pairs below ``iou_thresh`` never match.

    Returns:
        List of (index into prev_list, index into curr_boxes) pairs.
    """
    tracks = [(i, tid, box) for i, (tid, box) in enumerate(prev_list) if i not in exclude_prev]
    dets = [(j, box) for j, box in enumerate(curr_boxes) if j not in exclude_curr]
    if not tracks or not dets:
        return []
    iou = np.array(
        [[_iou_box4(tbox, dbox) for _, dbox in dets] for _, _, tbox in tracks],
        dtype=np.float64,
    )
    # below-threshold pairs get a prohibitive cost so they are never selected
    cost = np.where(iou < iou_thresh, 1e9, 1.0 - iou)
    if _linear_sum_assignment is not None:
        rows, cols = _linear_sum_assignment(cost)
        return [
            (tracks[r][0], dets[c][0])
            for r, c in zip(rows, cols)
            if cost[r, c] < 1.0
        ]
    # greedy fallback: take pairs in descending IoU order
    candidates = sorted(
        (
            (iou[r, c], r, c)
            for r in range(len(tracks))
            for c in range(len(dets))
            if iou[r, c] >= iou_thresh
        ),
        key=lambda item: -item[0],
    )
    matched: List[Tuple[int, int]] = []
    used_prev, used_curr = set(), set()
    for _, r, c in candidates:
        pi, ci = tracks[r][0], dets[c][0]
        if pi in used_prev or ci in used_curr:
            continue
        matched.append((pi, ci))
        used_prev.add(pi)
        used_curr.add(ci)
    return matched
|
|
|
|
| def _predict_box(prev: Tuple[float, float, float, float], last: Tuple[float, float, float, float]) -> Tuple[float, float, float, float]: |
| px1, py1, px2, py2 = prev |
| lx1, ly1, lx2, ly2 = last |
| pcx = 0.5 * (px1 + px2) |
| pcy = 0.5 * (py1 + py2) |
| lcx = 0.5 * (lx1 + lx2) |
| lcy = 0.5 * (ly1 + ly2) |
| w = lx2 - lx1 |
| h = ly2 - ly1 |
| ncx = 2.0 * lcx - pcx |
| ncy = 2.0 * lcy - pcy |
| return (ncx - w * 0.5, ncy - h * 0.5, ncx + w * 0.5, ncy + h * 0.5) |
|
|
|
|
def _assign_person_track_ids(
    prev_state: Dict[int, Tuple[Tuple[float, float, float, float], Tuple[float, float, float, float], int]],
    next_id: int,
    results: list,
    iou_thresh: float = TRACK_IOU_THRESH,
    iou_high: float = TRACK_IOU_HIGH,
    iou_low: float = TRACK_IOU_LOW,
    max_age: int = TRACK_MAX_AGE,
    use_velocity: bool = TRACK_USE_VELOCITY,
) -> Tuple[Dict[int, Tuple[Tuple[float, float, float, float], Tuple[float, float, float, float], int]], int, List[List[int]]]:
    """Two-stage IoU tracker step over a batch of detection results.

    Track state maps ``track_id -> (prev_box, last_box, age)`` where ``age``
    counts consecutive frames the track went unmatched.  For each result,
    detections are first matched at the strict ``iou_high`` threshold, then
    leftovers at ``iou_low``; unmatched detections open new tracks and
    unmatched tracks age out after ``max_age`` missed frames.

    NOTE(review): ``iou_thresh`` is accepted but never used in this body —
    only ``iou_high``/``iou_low`` drive the matching.

    Returns:
        (updated state, next free track id, one list of track ids per result,
        aligned with each result's box order).
    """
    # shallow copy so the caller's state dict is never mutated
    state = {tid: (prev_box, last_box, age) for tid, (prev_box, last_box, age) in prev_state.items()}
    nid = next_id
    ids_per_result: List[List[int]] = []
    for result in results:
        if getattr(result, "boxes", None) is None or len(result.boxes) == 0:
            # no detections this frame: age every track, drop expired ones
            state = {
                tid: (prev_box, last_box, age + 1)
                for tid, (prev_box, last_box, age) in state.items()
                if age + 1 <= max_age
            }
            ids_per_result.append([])
            continue
        b = result.boxes
        xyxy = b.xyxy.cpu().numpy()
        curr_boxes = [tuple(float(x) for x in row) for row in xyxy]
        # boxes the matcher will see: raw last box, or velocity-extrapolated
        prev_list: List[Tuple[int, Tuple[float, float, float, float]]] = []
        for tid, (prev_box, last_box, _age) in state.items():
            if use_velocity and (prev_box != last_box):
                pbox = _predict_box(prev_box, last_box)
            else:
                pbox = last_box
            prev_list.append((tid, pbox))
        # stage 1: strict IoU matching; stage 2: lenient matching on leftovers
        stage1 = _match_tracks_detections(prev_list, curr_boxes, iou_high, set(), set())
        assigned_prev = {pi for pi, _ in stage1}
        assigned_curr = {ci for _, ci in stage1}
        stage2 = _match_tracks_detections(prev_list, curr_boxes, iou_low, assigned_prev, assigned_curr)
        for pi, ci in stage2:
            assigned_prev.add(pi)
            assigned_curr.add(ci)
        tid_per_curr: Dict[int, int] = {}
        for pi, ci in stage1 + stage2:
            tid_per_curr[ci] = prev_list[pi][0]
        ids: List[int] = []
        new_state: Dict[int, Tuple[Tuple[float, float, float, float], Tuple[float, float, float, float], int]] = {}
        for ci, cbox in enumerate(curr_boxes):
            if ci in tid_per_curr:
                # matched: roll box history forward and reset the miss counter
                tid = tid_per_curr[ci]
                _prev, last_box, _ = state[tid]
                new_state[tid] = (last_box, cbox, 0)
            else:
                # unmatched detection: open a fresh track
                tid = nid
                nid += 1
                new_state[tid] = (cbox, cbox, 0)
            ids.append(tid)
        # carry unmatched tracks forward until they exceed max_age
        for pi in range(len(prev_list)):
            if pi in assigned_prev:
                continue
            tid = prev_list[pi][0]
            prev_box, last_box, age = state[tid]
            if age + 1 <= max_age:
                new_state[tid] = (prev_box, last_box, age + 1)
        state = new_state
        ids_per_result.append(ids)
    return (state, nid, ids_per_result)
|
|
|
|
| def _iou_bbox(a: "BoundingBox", b: "BoundingBox") -> float: |
| ax1, ay1, ax2, ay2 = int(a.x1), int(a.y1), int(a.x2), int(a.y2) |
| bx1, by1, bx2, by2 = int(b.x1), int(b.y1), int(b.x2), int(b.y2) |
| ix1, iy1 = max(ax1, bx1), max(ay1, by1) |
| ix2, iy2 = min(ax2, bx2), min(ay2, by2) |
| iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1) |
| inter = iw * ih |
| if inter <= 0: |
| return 0.0 |
| area_a = (ax2 - ax1) * (ay2 - ay1) |
| area_b = (bx2 - bx1) * (by2 - by1) |
| union = area_a + area_b - inter |
| return inter / union if union > 0 else 0.0 |
|
|
|
|
def _adjust_boxes(
    bboxes: List["BoundingBox"],
    frame_width: int,
    frame_height: int,
    overlap_iou: float = OVERLAP_IOU,
    do_goalkeeper_dedup: bool = True,
    do_referee_disambiguation: bool = True,
) -> List["BoundingBox"]:
    """Overlap NMS, goalkeeper dedup, referee disambiguation (no ball).

    Three clean-up passes over one frame's boxes:
      1. greedy, confidence-ordered suppression of non-ball boxes that overlap
         an already-kept box at >= ``overlap_iou``;
      2. when several goalkeepers are detected, keep the most confident one
         and re-label the rest as players;
      3. when several referees are detected, keep only the one whose vertical
         center is closest to the middle of the frame.
    """
    kept: List[BoundingBox] = list(bboxes or [])
    W, H = int(frame_width), int(frame_height)  # W is currently unused
    cy = 0.5 * float(H)
    if overlap_iou > 0 and len(kept) > 1:
        # pass 1: cls_id 0 (ball) is excluded from suppression entirely
        non_balls = [bb for bb in kept if int(bb.cls_id) != 0]
        if len(non_balls) > 1:
            non_balls_sorted = sorted(non_balls, key=lambda bb: float(bb.conf), reverse=True)
            kept_nb = []
            for cand in non_balls_sorted:
                skip = False
                for k in kept_nb:
                    iou = _iou_bbox(cand, k)
                    if iou >= overlap_iou:
                        skip = True
                        break
                    # secondary near-duplicate test: all four corners within
                    # 3 px AND IoU still above 0.85 — catches duplicate boxes
                    # whose IoU falls just under ``overlap_iou``
                    if (
                        abs(int(cand.x1) - int(k.x1)) <= 3
                        and abs(int(cand.y1) - int(k.y1)) <= 3
                        and abs(int(cand.x2) - int(k.x2)) <= 3
                        and abs(int(cand.y2) - int(k.y2)) <= 3
                        and iou > 0.85
                    ):
                        skip = True
                        break
                if not skip:
                    kept_nb.append(cand)
            # NOTE(review): this replaces ``kept`` with the non-ball survivors,
            # dropping any ball boxes — consistent with the "(no ball)" note
            # above, but confirm callers never pass ball boxes through here.
            kept = kept_nb
    if do_goalkeeper_dedup:
        gks = [bb for bb in kept if int(bb.cls_id) == _C_GOALKEEPER]
        if len(gks) > 1:
            best_gk = max(gks, key=lambda bb: float(bb.conf))
            best_gk_conf = float(best_gk.conf)
            deduped = []
            for bb in kept:
                if int(bb.cls_id) == _C_GOALKEEPER:
                    # every goalkeeper except the single best becomes a player
                    if float(bb.conf) < best_gk_conf or (float(bb.conf) == best_gk_conf and bb is not best_gk):
                        deduped.append(BoundingBox(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=_C_PLAYER, conf=float(bb.conf), team_id=bb.team_id, track_id=bb.track_id))
                    else:
                        deduped.append(bb)
                else:
                    deduped.append(bb)
            kept = deduped
    if do_referee_disambiguation:
        refs = [bb for bb in kept if int(bb.cls_id) == _C_REFEREE]
        if len(refs) > 1:
            # referee assumed near the vertical middle of the frame
            best_ref = min(refs, key=lambda bb: (0.5 * (bb.y1 + bb.y2) - cy) ** 2)
            kept = [bb for bb in kept if int(bb.cls_id) != _C_REFEREE or bb is best_ref]
    return kept
|
|
|
|
| |
|
|
# Team labels used by the team classifier below (_classify_teams_batch writes
# TEAM_1_ID into cls_id in its degenerate branches; TEAM_2_ID is unused in
# this chunk — confirm its consumer).
TEAM_1_ID = 6
TEAM_2_ID = 7
# Detector class id for outfield players (same value as _C_PLAYER above).
PLAYER_CLS_ID = 2
# Global OSNet re-id model read by _extract_osnet_embeddings.
# NOTE(review): never assigned in this chunk — must be initialised elsewhere
# (presumably via load_osnet) before team classification runs.
_OSNET_MODEL = None
osnet_weight_path = None


# (height, width) each crop is resized to before OSNet.
# NOTE(review): 64x32 is far below OSNet's usual 256x128 input — confirm intended.
OSNET_IMAGE_SIZE = (64, 32)
# ImageNet-normalised preprocessing applied to each PIL crop.
OSNET_PREPROCESS = T.Compose([
    T.Resize(OSNET_IMAGE_SIZE),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
|
|
|
|
| def _crop_upper_body(frame: ndarray, box: "BoundingBox") -> ndarray: |
| return frame[ |
| max(0, box.y1):max(0, box.y2), |
| max(0, box.x1):max(0, box.x2) |
| ] |
|
|
|
|
def _preprocess_osnet(crop: ndarray) -> torch.Tensor:
    """Convert a BGR crop to the normalized RGB float tensor OSNet expects."""
    rgb_pil = Image.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
    return OSNET_PREPROCESS(rgb_pil)
|
|
|
|
def _filter_player_boxes(boxes: List["BoundingBox"]) -> List["BoundingBox"]:
    """Keep only the detections whose class id marks an outfield player."""
    return [candidate for candidate in boxes if candidate.cls_id == PLAYER_CLS_ID]
|
|
|
|
def _extract_osnet_embeddings(
    frames: List[ndarray],
    batch_boxes: Dict[int, List["BoundingBox"]],
    device: str = "cuda",
) -> Tuple[Optional[ndarray], Optional[List["BoundingBox"]]]:
    """Run the global OSNet model over every player crop in a batch of frames.

    Returns an (num_crops, embed_dim) embeddings array plus the parallel list
    of the BoundingBox objects they came from, or ``(None, None)`` when there
    is nothing usable to embed.

    NOTE(review): frames are indexed positionally while frame ids come from
    ``sorted(batch_boxes)`` — this assumes ``frames`` is ordered to match the
    sorted frame ids; confirm at the call site.
    NOTE(review): ``_OSNET_MODEL`` is None at module load and must be
    initialised elsewhere before this runs.
    """
    global _OSNET_MODEL
    crops = []
    meta = []
    sorted_frame_ids = sorted(batch_boxes.keys())
    for idx, frame_idx in enumerate(sorted_frame_ids):
        frame = frames[idx] if idx < len(frames) else None
        if frame is None:
            continue
        boxes = batch_boxes[frame_idx]
        players = _filter_player_boxes(boxes)
        for box in players:
            crop = _crop_upper_body(frame, box)
            if crop.size == 0:
                # degenerate / fully out-of-frame box: nothing to embed
                continue
            crops.append(_preprocess_osnet(crop))
            meta.append(box)
    if not crops:
        return None, None
    batch = torch.stack(crops).to(device, non_blocking=True).float()
    # mixed precision only on CUDA; autocast is a no-op when disabled
    use_amp = device == "cuda"
    with torch.inference_mode():
        with torch.amp.autocast("cuda", enabled=use_amp):
            embeddings = _OSNET_MODEL(batch)
    del batch
    embeddings = embeddings.cpu().numpy()
    return embeddings, meta
|
|
|
|
| def _aggregate_by_track( |
| embeddings: ndarray, |
| meta: List["BoundingBox"], |
| ) -> Tuple[ndarray, List["BoundingBox"]]: |
| track_map = defaultdict(list) |
| box_map = {} |
| for emb, box in zip(embeddings, meta): |
| key = box.track_id if box.track_id is not None else id(box) |
| track_map[key].append(emb) |
| box_map[key] = box |
| agg_embeddings = [] |
| agg_boxes = [] |
| for key, embs in track_map.items(): |
| mean_emb = np.mean(embs, axis=0) |
| norm = np.linalg.norm(mean_emb) |
| if norm > 1e-12: |
| mean_emb /= norm |
| agg_embeddings.append(mean_emb) |
| agg_boxes.append(box_map[key]) |
| return np.array(agg_embeddings), agg_boxes |
|
|
|
|
| def _update_team_ids(boxes: List["BoundingBox"], labels: ndarray) -> None: |
| for box, label in zip(boxes, labels): |
| |
| box.team_id = 1 if label == 0 else 2 |
|
|
|
|
def _classify_teams_batch(
    frames: List[ndarray],
    batch_boxes: Dict[int, List["BoundingBox"]],
    device: str = "cuda",
) -> None:
    """Split the batch's players into two teams by clustering OSNet embeddings.

    Embeddings are averaged per track, then 2-means clustered; the clustering
    result is written onto each track's representative box in place.
    Degenerate cases (one track, or nearly identical cluster centroids)
    collapse everyone into team 1.

    NOTE(review): the degenerate branches write ``cls_id = TEAM_1_ID`` while
    the normal path writes ``team_id`` (1/2) via _update_team_ids — these look
    inconsistent; confirm which field downstream consumers read.
    """
    embeddings, meta = _extract_osnet_embeddings(frames, batch_boxes, device)
    if embeddings is None:
        return
    embeddings, agg_boxes = _aggregate_by_track(embeddings, meta)
    n = len(embeddings)
    if n == 0:
        return
    if n == 1:
        agg_boxes[0].cls_id = TEAM_1_ID
        return
    kmeans = KMeans(n_clusters=2, n_init=2, random_state=42)
    kmeans.fit(embeddings)
    centroids = kmeans.cluster_centers_
    c0, c1 = centroids[0], centroids[1]
    norm_0 = np.linalg.norm(c0)
    norm_1 = np.linalg.norm(c1)
    # cosine similarity between the two cluster centers
    similarity = np.dot(c0, c1) / (norm_0 * norm_1 + 1e-12)
    if similarity > 0.95:
        # clusters are essentially identical: treat everyone as one team
        for b in agg_boxes:
            b.cls_id = TEAM_1_ID
        return
    # canonicalise label order by centroid norm so team numbering is stable.
    # NOTE(review): this mutates ``labels_`` on the fitted sklearn estimator;
    # harmless here only because the estimator is discarded afterwards.
    if norm_0 <= norm_1:
        kmeans.labels_ = 1 - kmeans.labels_
    _update_team_ids(agg_boxes, kmeans.labels_)
|
|
|
|
class ConvLayer(nn.Module):
    """Conv2d -> norm -> ReLU.

    Uses InstanceNorm2d (affine) when ``IN`` is set, BatchNorm2d otherwise.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, groups=1, IN=False):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride=stride,
            padding=padding,
            bias=False,
            groups=groups,
        )
        if IN:
            self.bn = nn.InstanceNorm2d(out_channels, affine=True)
        else:
            self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        y = self.conv(x)
        y = self.bn(y)
        return self.relu(y)
|
|
|
|
class Conv1x1(nn.Module):
    """Pointwise (1x1) convolution followed by BatchNorm and ReLU."""

    def __init__(self, in_channels, out_channels, stride=1, groups=1):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            1,
            stride=stride,
            padding=0,
            bias=False,
            groups=groups,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        y = self.conv(x)
        y = self.bn(y)
        return self.relu(y)
|
|
|
|
class Conv1x1Linear(nn.Module):
    """1x1 convolution with optional BatchNorm and no activation."""

    def __init__(self, in_channels, out_channels, stride=1, bn=True):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, 1, stride=stride, padding=0, bias=False)
        self.bn = nn.BatchNorm2d(out_channels) if bn else None

    def forward(self, x):
        out = self.conv(x)
        if self.bn is not None:
            out = self.bn(out)
        return out
|
|
|
|
class Conv3x3(nn.Module):
    """3x3 'same' convolution followed by BatchNorm and ReLU."""

    def __init__(self, in_channels, out_channels, stride=1, groups=1):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            3,
            stride=stride,
            padding=1,
            bias=False,
            groups=groups,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        y = self.conv(x)
        y = self.bn(y)
        return self.relu(y)
|
|
|
|
class LightConv3x3(nn.Module):
    """Lightweight 3x3 conv: 1x1 pointwise then 3x3 depthwise, shared BN + ReLU."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, 1, stride=1, padding=0, bias=False)
        # groups=out_channels makes conv2 a depthwise convolution
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, stride=1, padding=1, bias=False, groups=out_channels)
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.relu(self.bn(self.conv2(self.conv1(x))))
|
|
|
|
class LightConvStream(nn.Module):
    """A stack of ``depth`` LightConv3x3 layers; only the first changes channels."""

    def __init__(self, in_channels, out_channels, depth):
        super().__init__()
        stack = [LightConv3x3(in_channels, out_channels)]
        stack.extend(LightConv3x3(out_channels, out_channels) for _ in range(depth - 1))
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        return self.layers(x)
|
|
|
|
class ChannelGate(nn.Module):
    """Squeeze-and-excitation style channel gate.

    Global-average-pools the input, squeezes channels through a 1x1 conv
    bottleneck (factor ``reduction``), and produces per-channel gates.  The
    gates either re-weight the input or are returned directly when
    ``return_gates`` is set.
    """

    def __init__(self, in_channels, num_gates=None, return_gates=False, gate_activation='sigmoid', reduction=16, layer_norm=False):
        super().__init__()
        self.return_gates = return_gates
        gates = in_channels if num_gates is None else num_gates
        hidden = in_channels // reduction
        self.global_avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Conv2d(in_channels, hidden, kernel_size=1, bias=True, padding=0)
        self.norm1 = nn.LayerNorm((hidden, 1, 1)) if layer_norm else None
        self.relu = nn.ReLU()
        self.fc2 = nn.Conv2d(hidden, gates, kernel_size=1, bias=True, padding=0)
        self.gate_activation = nn.Sigmoid() if gate_activation == 'sigmoid' else nn.ReLU()

    def forward(self, x):
        gate = self.global_avgpool(x)
        gate = self.fc1(gate)
        if self.norm1 is not None:
            gate = self.norm1(gate)
        gate = self.relu(gate)
        gate = self.fc2(gate)
        if self.gate_activation is not None:  # always set in __init__; kept for parity
            gate = self.gate_activation(gate)
        return gate if self.return_gates else x * gate
|
|
|
|
class OSBlockX1(nn.Module):
    """Omni-scale residual block.

    Four light-conv streams of depth 1..4 run over a bottlenecked feature
    map; each stream is re-weighted by the SAME ChannelGate instance, summed,
    projected back up, and added to the (optionally projected) input.
    """

    def __init__(self, in_channels, out_channels, IN=False, bottleneck_reduction=4):
        super().__init__()
        mid_channels = out_channels // bottleneck_reduction
        self.conv1 = Conv1x1(in_channels, mid_channels)
        # streams of increasing receptive field (1..4 stacked light 3x3 convs)
        self.conv2a = LightConv3x3(mid_channels, mid_channels)
        self.conv2b = nn.Sequential(
            LightConv3x3(mid_channels, mid_channels),
            LightConv3x3(mid_channels, mid_channels),
        )
        self.conv2c = nn.Sequential(
            LightConv3x3(mid_channels, mid_channels),
            LightConv3x3(mid_channels, mid_channels),
            LightConv3x3(mid_channels, mid_channels),
        )
        self.conv2d = nn.Sequential(
            LightConv3x3(mid_channels, mid_channels),
            LightConv3x3(mid_channels, mid_channels),
            LightConv3x3(mid_channels, mid_channels),
            LightConv3x3(mid_channels, mid_channels),
        )
        self.gate = ChannelGate(mid_channels)
        self.conv3 = Conv1x1Linear(mid_channels, out_channels)
        self.downsample = Conv1x1Linear(in_channels, out_channels) if in_channels != out_channels else None
        self.IN = nn.InstanceNorm2d(out_channels, affine=True) if IN else None

    def forward(self, x):
        residual = self.downsample(x) if self.downsample is not None else x
        mid = self.conv1(x)
        fused = (
            self.gate(self.conv2a(mid))
            + self.gate(self.conv2b(mid))
            + self.gate(self.conv2c(mid))
            + self.gate(self.conv2d(mid))
        )
        out = self.conv3(fused) + residual
        if self.IN is not None:
            out = self.IN(out)
        return F.relu(out)
|
|
|
|
class OSNetX1(nn.Module):
    """OSNet backbone for person re-identification.

    In eval mode ``forward`` returns the pooled feature vector; in train mode
    it returns class logits (``loss='softmax'``) or ``(logits, features)``
    (``loss='triplet'``).
    """

    def __init__(self, num_classes, blocks, layers, channels, feature_dim=512, loss='softmax', IN=False):
        super().__init__()
        self.loss = loss
        self.feature_dim = feature_dim
        # stem: 7x7 stride-2 conv followed by a stride-2 max pool
        self.conv1 = ConvLayer(3, channels[0], 7, stride=2, padding=3, IN=IN)
        self.maxpool = nn.MaxPool2d(3, stride=2, padding=1)
        self.conv2 = self._make_layer(blocks[0], layers[0], channels[0], channels[1], reduce_spatial_size=True, IN=IN)
        self.conv3 = self._make_layer(blocks[1], layers[1], channels[1], channels[2], reduce_spatial_size=True)
        self.conv4 = self._make_layer(blocks[2], layers[2], channels[2], channels[3], reduce_spatial_size=False)
        self.conv5 = Conv1x1(channels[3], channels[3])
        self.global_avgpool = nn.AdaptiveAvgPool2d(1)
        # _construct_fc_layer may overwrite self.feature_dim as a side effect
        self.fc = self._construct_fc_layer(feature_dim, channels[3], dropout_p=None)
        self.classifier = nn.Linear(self.feature_dim, num_classes)
        self._init_params()

    def _make_layer(self, block, layer, in_channels, out_channels, reduce_spatial_size, IN=False):
        # ``layer`` stacked blocks, optionally followed by a 1x1 conv + 2x2
        # average pool that halves the spatial resolution
        layers_list = [block(in_channels, out_channels, IN=IN)]
        for _ in range(1, layer):
            layers_list.append(block(out_channels, out_channels, IN=IN))
        if reduce_spatial_size:
            layers_list.append(nn.Sequential(Conv1x1(out_channels, out_channels), nn.AvgPool2d(2, stride=2)))
        return nn.Sequential(*layers_list)

    def _construct_fc_layer(self, fc_dims, input_dim, dropout_p=None):
        # Build the embedding head; also updates self.feature_dim.
        # NOTE(review): ``fc_dims < 0`` would raise TypeError for a list input
        # before the isinstance conversion below — callers in this file only
        # pass an int.
        if fc_dims is None or fc_dims < 0:
            self.feature_dim = input_dim
            return None
        if isinstance(fc_dims, int):
            fc_dims = [fc_dims]
        layers_list = []
        for dim in fc_dims:
            layers_list.append(nn.Linear(input_dim, dim))
            layers_list.append(nn.BatchNorm1d(dim))
            layers_list.append(nn.ReLU(inplace=True))
            if dropout_p is not None:
                layers_list.append(nn.Dropout(p=dropout_p))
            input_dim = dim
        self.feature_dim = fc_dims[-1]
        return nn.Sequential(*layers_list)

    def _init_params(self):
        # He init for convs, unit-weight/zero-bias for norms, small-normal for linears
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm1d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.InstanceNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x, return_featuremaps=False):
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        if return_featuremaps:
            return x
        v = self.global_avgpool(x)
        v = v.view(v.size(0), -1)
        if self.fc is not None:
            v = self.fc(v)
        # inference path: return the embedding directly
        if not self.training:
            return v
        y = self.classifier(v)
        if self.loss == 'softmax':
            return y
        elif self.loss == 'triplet':
            return y, v
        raise KeyError(f"Unsupported loss: {self.loss}")
|
|
|
|
def osnet_x1_0(num_classes=1000, pretrained=True, loss='softmax', **kwargs):
    """Build the OSNet-x1.0 architecture (3 stages of 2 OS blocks each).

    NOTE(review): the ``pretrained`` flag is accepted for API compatibility
    but ignored here — weights are loaded separately (see
    load_pretrained_weights_osnet / load_osnet).
    """
    return OSNetX1(
        num_classes,
        blocks=[OSBlockX1, OSBlockX1, OSBlockX1],
        layers=[2, 2, 2],
        channels=[64, 256, 384, 512],
        loss=loss,
        **kwargs,
    )
|
|
|
|
def load_checkpoint_osnet(fpath):
    """Load an OSNet checkpoint from disk.

    Maps the checkpoint to CPU when no CUDA device is available.
    NOTE: ``weights_only=False`` unpickles arbitrary objects — only load
    checkpoint files from a trusted source.
    """
    resolved = os.path.abspath(os.path.expanduser(fpath))
    target = None if torch.cuda.is_available() else 'cpu'
    return torch.load(resolved, map_location=target, weights_only=False)
|
|
|
|
def load_pretrained_weights_osnet(model, weight_path):
    """Copy matching weights from a checkpoint into *model* in place.

    Strips any DataParallel ``module.`` prefix and silently skips keys that
    are absent from the model or whose tensor shapes do not match.
    """
    checkpoint = load_checkpoint_osnet(weight_path)
    state_dict = checkpoint.get('state_dict', checkpoint)
    model_dict = model.state_dict()
    matched = OrderedDict()
    for key, value in state_dict.items():
        name = key[7:] if key.startswith('module.') else key
        if name in model_dict and model_dict[name].size() == value.size():
            matched[name] = value
    model_dict.update(matched)
    model.load_state_dict(model_dict)
|
|
|
|
def load_osnet(device="cuda", weight_path=None):
    """Build an OSNet-x1.0 in eval mode, optionally loading weights from disk.

    A missing or nonexistent ``weight_path`` leaves the model randomly
    initialised.
    """
    model = osnet_x1_0(num_classes=1, loss='softmax', pretrained=False)
    path = Path(weight_path) if weight_path else None
    if path is not None and path.exists():
        load_pretrained_weights_osnet(model, str(path))
    model.eval()
    return model.to(device)
|
|
|
|
def _resolve_player_cls_id(model: YOLO, fallback: int = PLAYER_CLS_ID) -> int:
    """Look up the class index labelled 'player'/'players' in the model's name map.

    Checks ``model.names`` first, then ``model.model.names``; returns
    ``fallback`` when neither contains a player label.
    """
    names = getattr(model, "names", None) or getattr(getattr(model, "model", None), "names", None)
    if isinstance(names, dict):
        candidates = names.items()
    elif isinstance(names, list):
        candidates = enumerate(names)
    else:
        candidates = ()
    for cls_idx, label in candidates:
        if str(label).lower() in ("player", "players"):
            return int(cls_idx)
    return fallback
|
|
|
|
| |
|
|
# Norm layer alias and momentum used by all HRNet blocks below.
BatchNorm2d = nn.BatchNorm2d
BN_MOMENTUM = 0.1
|
|
def conv3x3(in_planes, out_planes, stride=1):
    """3x3 'same' convolution without bias (callers pair it with BatchNorm)."""
    return nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=False,
    )
|
|
class BasicBlock(nn.Module):
    """Two-conv residual block (ResNet 'basic' block) with HRNet's BN momentum."""

    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        shortcut = x if self.downsample is None else self.downsample(x)
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        return self.relu(out + shortcut)
|
|
class Bottleneck(nn.Module):
    """ResNet bottleneck block (1x1 reduce -> 3x3 -> 1x1 expand by 4)."""

    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, bias=False)
        self.bn3 = BatchNorm2d(planes * self.expansion, momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        shortcut = x if self.downsample is None else self.downsample(x)
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        return self.relu(out + shortcut)
|
|
# Config BLOCK-name -> residual block class, used when building HRNet stages.
blocks_dict = {'BASIC': BasicBlock, 'BOTTLENECK': Bottleneck}
|
|
class HighResolutionModule(nn.Module):
    """One HRNet stage: parallel multi-resolution branches plus a fuse step
    that exchanges information across resolutions."""

    def __init__(self, num_branches, blocks, num_blocks, num_inchannels,
                 num_channels, fuse_method, multi_scale_output=True):
        super().__init__()
        self.num_inchannels = num_inchannels
        self.fuse_method = fuse_method  # stored but unused: forward always sums
        self.num_branches = num_branches
        self.multi_scale_output = multi_scale_output
        self.branches = self._make_branches(num_branches, blocks, num_blocks, num_channels)
        self.fuse_layers = self._make_fuse_layers()
        self.relu = nn.ReLU(inplace=True)

    def _make_one_branch(self, branch_index, block, num_blocks, num_channels, stride=1):
        # NOTE: mutates self.num_inchannels[branch_index] to the expanded
        # channel count so subsequent blocks (and the fuse layers) see it.
        downsample = None
        if stride != 1 or self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.num_inchannels[branch_index], num_channels[branch_index] * block.expansion,
                          kernel_size=1, stride=stride, bias=False),
                BatchNorm2d(num_channels[branch_index] * block.expansion, momentum=BN_MOMENTUM),
            )
        layers = [block(self.num_inchannels[branch_index], num_channels[branch_index], stride, downsample)]
        self.num_inchannels[branch_index] = num_channels[branch_index] * block.expansion
        for _ in range(1, num_blocks[branch_index]):
            layers.append(block(self.num_inchannels[branch_index], num_channels[branch_index]))
        return nn.Sequential(*layers)

    def _make_branches(self, num_branches, block, num_blocks, num_channels):
        return nn.ModuleList([self._make_one_branch(i, block, num_blocks, num_channels) for i in range(num_branches)])

    def _make_fuse_layers(self):
        # fuse_layers[i][j] converts branch j's output to branch i's channel
        # count/resolution: a 1x1 conv for j > i (spatially upsampled later in
        # forward), identity (None) for j == i, and a chain of strided 3x3
        # convs for j < i.
        if self.num_branches == 1:
            return None
        num_branches = self.num_branches
        num_inchannels = self.num_inchannels
        fuse_layers = []
        for i in range(num_branches if self.multi_scale_output else 1):
            fuse_layer = []
            for j in range(num_branches):
                if j > i:
                    fuse_layer.append(nn.Sequential(
                        nn.Conv2d(num_inchannels[j], num_inchannels[i], 1, 1, 0, bias=False),
                        BatchNorm2d(num_inchannels[i], momentum=BN_MOMENTUM)))
                elif j == i:
                    fuse_layer.append(None)
                else:
                    conv3x3s = []
                    for k in range(i - j):
                        if k == i - j - 1:
                            # final step maps to branch i's channels, no ReLU
                            conv3x3s.append(nn.Sequential(
                                nn.Conv2d(num_inchannels[j], num_inchannels[i], 3, 2, 1, bias=False),
                                BatchNorm2d(num_inchannels[i], momentum=BN_MOMENTUM)))
                        else:
                            conv3x3s.append(nn.Sequential(
                                nn.Conv2d(num_inchannels[j], num_inchannels[j], 3, 2, 1, bias=False),
                                BatchNorm2d(num_inchannels[j], momentum=BN_MOMENTUM),
                                nn.ReLU(inplace=True)))
                    fuse_layer.append(nn.Sequential(*conv3x3s))
            fuse_layers.append(nn.ModuleList(fuse_layer))
        return nn.ModuleList(fuse_layers)

    def get_num_inchannels(self):
        return self.num_inchannels

    def forward(self, x):
        # x: list of tensors, one per branch; mutated in place by the branch pass
        if self.num_branches == 1:
            return [self.branches[0](x[0])]
        for i in range(self.num_branches):
            x[i] = self.branches[i](x[i])
        x_fuse = []
        for i in range(len(self.fuse_layers)):
            y = x[0] if i == 0 else self.fuse_layers[i][0](x[0])
            for j in range(1, self.num_branches):
                if i == j:
                    y = y + x[j]
                elif j > i:
                    # coarser branch: project channels then bilinearly upsample
                    # to branch i's spatial size
                    y = y + F.interpolate(self.fuse_layers[i][j](x[j]),
                                          size=[x[i].shape[2], x[i].shape[3]], mode='bilinear')
                else:
                    y = y + self.fuse_layers[i][j](x[j])
            x_fuse.append(self.relu(y))
        return x_fuse
|
|
| class HighResolutionNet(nn.Module): |
| def __init__(self, config, lines=False, **kwargs): |
| self.inplanes = 64 |
| self.lines = lines |
| extra = config['MODEL']['EXTRA'] |
| super().__init__() |
| self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=2, padding=1, bias=False) |
| self.bn1 = BatchNorm2d(64, momentum=BN_MOMENTUM) |
| self.conv2 = nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1, bias=False) |
| self.bn2 = BatchNorm2d(64, momentum=BN_MOMENTUM) |
| self.relu = nn.ReLU(inplace=True) |
| self.layer1 = self._make_layer(Bottleneck, 64, 64, 4) |
|
|
| self.stage2_cfg = extra['STAGE2'] |
| num_channels = self.stage2_cfg['NUM_CHANNELS'] |
| block = blocks_dict[self.stage2_cfg['BLOCK']] |
| num_channels = [num_channels[i] * block.expansion for i in range(len(num_channels))] |
| self.transition1 = self._make_transition_layer([256], num_channels) |
| self.stage2, pre_stage_channels = self._make_stage(self.stage2_cfg, num_channels) |
|
|
| self.stage3_cfg = extra['STAGE3'] |
| num_channels = self.stage3_cfg['NUM_CHANNELS'] |
| block = blocks_dict[self.stage3_cfg['BLOCK']] |
| num_channels = [num_channels[i] * block.expansion for i in range(len(num_channels))] |
| self.transition2 = self._make_transition_layer(pre_stage_channels, num_channels) |
| self.stage3, pre_stage_channels = self._make_stage(self.stage3_cfg, num_channels) |
|
|
| self.stage4_cfg = extra['STAGE4'] |
| num_channels = self.stage4_cfg['NUM_CHANNELS'] |
| block = blocks_dict[self.stage4_cfg['BLOCK']] |
| num_channels = [num_channels[i] * block.expansion for i in range(len(num_channels))] |
| self.transition3 = self._make_transition_layer(pre_stage_channels, num_channels) |
| self.stage4, pre_stage_channels = self._make_stage(self.stage4_cfg, num_channels, multi_scale_output=True) |
|
|
| self.upsample = nn.Upsample(scale_factor=2, mode='nearest') |
| final_inp_channels = sum(pre_stage_channels) + self.inplanes |
| self.head = nn.Sequential(nn.Sequential( |
| nn.Conv2d(final_inp_channels, final_inp_channels, kernel_size=1), |
| BatchNorm2d(final_inp_channels, momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True), |
| nn.Conv2d(final_inp_channels, config['MODEL']['NUM_JOINTS'], kernel_size=extra['FINAL_CONV_KERNEL']), |
| nn.Softmax(dim=1) if not self.lines else nn.Sigmoid())) |
|
|
| def _make_head(self, x, x_skip): |
| x = self.upsample(x) |
| x = torch.cat([x, x_skip], dim=1) |
| return self.head(x) |
|
|
| def _make_transition_layer(self, num_channels_pre_layer, num_channels_cur_layer): |
| num_branches_cur = len(num_channels_cur_layer) |
| num_branches_pre = len(num_channels_pre_layer) |
| transition_layers = [] |
| for i in range(num_branches_cur): |
| if i < num_branches_pre: |
| if num_channels_cur_layer[i] != num_channels_pre_layer[i]: |
| transition_layers.append(nn.Sequential( |
| nn.Conv2d(num_channels_pre_layer[i], num_channels_cur_layer[i], 3, 1, 1, bias=False), |
| BatchNorm2d(num_channels_cur_layer[i], momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True))) |
| else: |
| transition_layers.append(None) |
| else: |
| conv3x3s = [] |
| for j in range(i + 1 - num_branches_pre): |
| inchannels = num_channels_pre_layer[-1] |
| outchannels = num_channels_cur_layer[i] if j == i - num_branches_pre else inchannels |
| conv3x3s.append(nn.Sequential( |
| nn.Conv2d(inchannels, outchannels, 3, 2, 1, bias=False), |
| BatchNorm2d(outchannels, momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True))) |
| transition_layers.append(nn.Sequential(*conv3x3s)) |
| return nn.ModuleList(transition_layers) |
|
|
| def _make_layer(self, block, inplanes, planes, blocks, stride=1): |
| downsample = None |
| if stride != 1 or inplanes != planes * block.expansion: |
| downsample = nn.Sequential( |
| nn.Conv2d(inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False), |
| BatchNorm2d(planes * block.expansion, momentum=BN_MOMENTUM), |
| ) |
| layers = [block(inplanes, planes, stride, downsample)] |
| inplanes = planes * block.expansion |
| for _ in range(1, blocks): |
| layers.append(block(inplanes, planes)) |
| return nn.Sequential(*layers) |
|
|
| def _make_stage(self, layer_config, num_inchannels, multi_scale_output=True): |
| num_modules = layer_config['NUM_MODULES'] |
| num_branches = layer_config['NUM_BRANCHES'] |
| num_blocks = layer_config['NUM_BLOCKS'] |
| num_channels = layer_config['NUM_CHANNELS'] |
| block = blocks_dict[layer_config['BLOCK']] |
| fuse_method = layer_config['FUSE_METHOD'] |
| modules = [] |
| for i in range(num_modules): |
| reset_multi_scale_output = True if multi_scale_output or i < num_modules - 1 else False |
| modules.append(HighResolutionModule( |
| num_branches, block, num_blocks, num_inchannels, |
| num_channels, fuse_method, reset_multi_scale_output)) |
| num_inchannels = modules[-1].get_num_inchannels() |
| return nn.Sequential(*modules), num_inchannels |
|
|
    def forward(self, x):
        """HRNet forward pass: stem, three multi-branch stages, fused multi-scale head."""
        x = self.conv1(x)
        # Stem features kept at post-conv1 resolution for the head's skip connection.
        x_skip = x.clone()
        x = self.relu(self.bn1(x))
        x = self.relu(self.bn2(self.conv2(x)))
        x = self.layer1(x)

        # Stage 2: fan out into parallel branches. A transition entry of None
        # means the branch passes through unchanged.
        x_list = []
        for i in range(self.stage2_cfg['NUM_BRANCHES']):
            x_list.append(self.transition1[i](x) if self.transition1[i] is not None else x)
        y_list = self.stage2(x_list)

        # Stages 3/4: new branches are produced from the lowest-resolution
        # output (y_list[-1]); existing branches pass through as y_list[i].
        x_list = []
        for i in range(self.stage3_cfg['NUM_BRANCHES']):
            x_list.append(self.transition2[i](y_list[-1]) if self.transition2[i] is not None else y_list[i])
        y_list = self.stage3(x_list)

        x_list = []
        for i in range(self.stage4_cfg['NUM_BRANCHES']):
            x_list.append(self.transition3[i](y_list[-1]) if self.transition3[i] is not None else y_list[i])
        x = self.stage4(x_list)

        # Resize every branch to the highest-resolution branch and concatenate channels.
        height, width = x[0].size(2), x[0].size(3)
        x1 = F.interpolate(x[1], size=(height, width), mode='bilinear', align_corners=False)
        x2 = F.interpolate(x[2], size=(height, width), mode='bilinear', align_corners=False)
        x3 = F.interpolate(x[3], size=(height, width), mode='bilinear', align_corners=False)
        x = torch.cat([x[0], x1, x2, x3], 1)
        # Head: 2x upsample, concat stem skip, then the conv head (see _make_head).
        return self._make_head(x, x_skip)
|
|
| def init_weights(self, pretrained=''): |
| for m in self.modules(): |
| if isinstance(m, nn.Conv2d): |
| nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') |
| elif isinstance(m, nn.BatchNorm2d): |
| nn.init.constant_(m.weight, 1) |
| nn.init.constant_(m.bias, 0) |
| if pretrained: |
| if os.path.isfile(pretrained): |
| pretrained_dict = torch.load(pretrained) |
| model_dict = self.state_dict() |
| pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in model_dict} |
| model_dict.update(pretrained_dict) |
| self.load_state_dict(model_dict) |
| else: |
| sys.exit(f'Weights {pretrained} not found.') |
|
|
def get_cls_net(config, pretrained='', **kwargs):
    """Construct a HighResolutionNet from ``config`` and initialize its weights."""
    net = HighResolutionNet(config, **kwargs)
    net.init_weights(pretrained)
    return net
|
|
|
|
| |
|
|
# Maps a raw HRNet output channel index (1-based) to the template keypoint id
# used by the pitch templates below. Channels absent from this dict are dropped.
map_keypoints = {
    1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23,
    11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29,
    28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20,
    45: 9, 50: 31, 52: 32, 57: 22
}


# Two 32-point pitch templates in template-plane (x, y) coordinates.
# TEMPLATE_F1 supplies the source points when fitting the homography;
# TEMPLATE_F0 is re-projected through the fitted homography into the frame
# (see _apply_homography_refinement). The slight coordinate differences are
# presumably half-pixel / line-width adjustments — confirm against the
# template's origin.
TEMPLATE_F0: List[Tuple[float, float]] = [
    (5, 5), (5, 140), (5, 250), (5, 430), (5, 540), (5, 675), (55, 250), (55, 430),
    (110, 340), (165, 140), (165, 270), (165, 410), (165, 540), (527, 5), (527, 253),
    (527, 433), (527, 675), (888, 140), (888, 270), (888, 410), (888, 540), (940, 340),
    (998, 250), (998, 430), (1045, 5), (1045, 140), (1045, 250), (1045, 430), (1045, 540),
    (1045, 675), (435, 340), (615, 340),
]
TEMPLATE_F1: List[Tuple[float, float]] = [
    (2.5, 2.5), (2.5, 139.5), (2.5, 249.5), (2.5, 430.5), (2.5, 540.5), (2.5, 678),
    (54.5, 249.5), (54.5, 430.5), (110.5, 340.5), (164.5, 139.5), (164.5, 269), (164.5, 411),
    (164.5, 540.5), (525, 2.5), (525, 249.5), (525, 430.5), (525, 678), (886.5, 139.5),
    (886.5, 269), (886.5, 411), (886.5, 540.5), (940.5, 340.5), (998, 249.5), (998, 430.5),
    (1048, 2.5), (1048, 139.5), (1048, 249.5), (1048, 430.5), (1048, 540.5), (1048, 678),
    (434.5, 340), (615.5, 340),
]
# When True, homography-refined positions are written back only for keypoints
# that were actually detected (no filled-in positions for undetected points).
HOMOGRAPHY_FILL_ONLY_VALID = True
# Minimum heatmap peak score for a keypoint to be kept.
KP_THRESHOLD = 0.2

# HRNet input resolution (height, width) and inference chunk size.
KP_H, KP_W = 360, 640
HRNET_BATCH_SIZE = 16
|
|
|
|
def _preprocess_batch(frames):
    """Convert BGR frames into a normalized float32 NCHW tensor for HRNet.

    Each frame is converted to RGB, resized to (KP_W, KP_H), scaled to [0, 1],
    and transposed to channels-first.
    """
    tensors = []
    for frame in frames:
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        scaled = cv2.resize(rgb, (KP_W, KP_H)).astype(np.float32) / 255.0
        tensors.append(scaled.transpose(2, 0, 1))
    return torch.from_numpy(np.stack(tensors)).float()
|
|
|
|
| def _extract_keypoints(heatmap: torch.Tensor, scale: int = 2): |
| b, c, h, w = heatmap.shape |
| max_pooled = F.max_pool2d(heatmap, 3, stride=1, padding=1) |
| local_maxima = (max_pooled == heatmap) |
| masked = heatmap * local_maxima |
| flat = masked.view(b, c, -1) |
| scores, indices = torch.topk(flat, 1, dim=-1, sorted=False) |
| y_coords = torch.div(indices, w, rounding_mode="floor") * scale |
| x_coords = (indices % w) * scale |
| return torch.stack([x_coords.float(), y_coords.float(), scores], dim=-1) |
|
|
|
|
| def _process_keypoints(kp_coords, threshold, w, h, batch_size): |
| kp_np = kp_coords.cpu().numpy() |
| results = [] |
| for b_idx in range(batch_size): |
| kp_dict = {} |
| valid = np.where(kp_np[b_idx, :, 0, 2] > threshold)[0] |
| for ch_idx in valid: |
| kp_dict[ch_idx + 1] = { |
| 'x': float(kp_np[b_idx, ch_idx, 0, 0]) / w, |
| 'y': float(kp_np[b_idx, ch_idx, 0, 1]) / h, |
| 'p': float(kp_np[b_idx, ch_idx, 0, 2]), |
| } |
| results.append(kp_dict) |
| return results |
|
|
|
|
def _run_hrnet_batch(frames, model, threshold, batch_size=16):
    """Run the HRNet keypoint model over ``frames`` in chunks of ``batch_size``.

    Returns one dict per frame mapping 1-based heatmap channel -> normalized
    {'x', 'y', 'p'} coordinates (relative to the KP_W x KP_H model input).
    Returns [] when there are no frames or no model.
    """
    if not frames or model is None:
        return []
    device = next(model.parameters()).device
    # Mixed precision only when the model lives on CUDA.
    use_amp = device.type == "cuda"
    results = []
    for i in range(0, len(frames), batch_size):
        chunk = frames[i:i + batch_size]
        batch = _preprocess_batch(chunk).to(device, non_blocking=True)
        with torch.inference_mode():
            with torch.amp.autocast("cuda", enabled=use_amp):
                heatmaps = model(batch)
        # The last heatmap channel is excluded — presumably a background /
        # non-keypoint channel; confirm against the model head definition.
        # scale=2 maps heatmap cells to input pixels (heatmaps appear to be at
        # half the input resolution — TODO confirm).
        kp_coords = _extract_keypoints(heatmaps[:, :-1, :, :], scale=2)
        batch_kps = _process_keypoints(kp_coords, threshold, KP_W, KP_H, len(chunk))
        results.extend(batch_kps)
        # Free the large tensors between chunks to cap peak memory.
        del heatmaps, kp_coords, batch
    if results:
        gc.collect()
    return results
|
|
|
|
def _apply_keypoint_mapping(kp_dict):
    """Remap raw HRNet channel indices to template keypoint ids; unmapped channels are dropped."""
    remapped = {}
    for raw_id, value in kp_dict.items():
        target = map_keypoints.get(raw_id)
        if target is not None:
            remapped[target] = value
    return remapped
|
|
|
|
| def _normalize_keypoints(kp_results, frames, n_keypoints): |
| keypoints = [] |
| max_frames = min(len(kp_results), len(frames)) |
| for i in range(max_frames): |
| kp_dict = kp_results[i] |
| h, w = frames[i].shape[:2] |
| frame_kps = [] |
| for idx in range(n_keypoints): |
| kp_idx = idx + 1 |
| x, y = 0, 0 |
| if kp_idx in kp_dict: |
| d = kp_dict[kp_idx] |
| if isinstance(d, dict) and 'x' in d: |
| x = int(d['x'] * w) |
| y = int(d['y'] * h) |
| frame_kps.append((x, y)) |
| keypoints.append(frame_kps) |
| return keypoints |
|
|
|
|
| def _fix_keypoints(kps: list, n: int) -> list: |
| if len(kps) < n: |
| kps += [(0, 0)] * (n - len(kps)) |
| elif len(kps) > n: |
| kps = kps[:n] |
|
|
| if kps[2] != (0,0) and kps[4] != (0,0) and kps[3] == (0,0): |
| kps[3] = kps[4]; kps[4] = (0,0) |
| if kps[0] != (0,0) and kps[4] != (0,0) and kps[1] == (0,0): |
| kps[1] = kps[4]; kps[4] = (0,0) |
| if kps[2] != (0,0) and kps[3] != (0,0) and kps[1] == (0,0) and kps[3][0] > kps[2][0]: |
| kps[1] = kps[3]; kps[3] = (0,0) |
| if kps[28] != (0,0) and kps[25] == (0,0) and kps[26] != (0,0) and kps[26][0] > kps[28][0]: |
| kps[25] = kps[28]; kps[28] = (0,0) |
| if kps[24] != (0,0) and kps[28] != (0,0) and kps[25] == (0,0): |
| kps[25] = kps[28]; kps[28] = (0,0) |
| if kps[24] != (0,0) and kps[27] != (0,0) and kps[26] == (0,0): |
| kps[26] = kps[27]; kps[27] = (0,0) |
| if kps[28] != (0,0) and kps[23] == (0,0) and kps[20] != (0,0) and kps[20][1] > kps[23][1]: |
| kps[23] = kps[20]; kps[20] = (0,0) |
| return kps |
|
|
|
|
| def _keypoints_to_float(keypoints: list) -> List[List[float]]: |
| """Convert keypoints to [[x, y], ...] float format for homography.""" |
| return [[float(x), float(y)] for x, y in keypoints] |
|
|
|
|
| def _keypoints_to_int(keypoints: list) -> List[Tuple[int, int]]: |
| """Convert keypoints to [(x, y), ...] integer format.""" |
| return [(int(round(float(kp[0]))), int(round(float(kp[1])))) for kp in keypoints] |
|
|
|
|
def _apply_homography_refinement(
    keypoints: List[List[float]],
    frame: np.ndarray,
    n_keypoints: int,
) -> List[List[float]]:
    """Refine keypoints using homography from template to frame (new-5 style).

    Fits a homography mapping TEMPLATE_F1 points to the detected frame
    positions, then re-projects TEMPLATE_F0 through it so all keypoints become
    geometrically consistent. Returns the input unchanged when the 32-point
    layout does not apply, fewer than 4 usable correspondences exist, or the
    fit fails.
    """
    if n_keypoints != 32 or len(TEMPLATE_F0) != 32 or len(TEMPLATE_F1) != 32:
        return keypoints
    frame_height, frame_width = frame.shape[:2]
    valid_src: List[Tuple[float, float]] = []
    valid_dst: List[Tuple[float, float]] = []
    valid_indices: List[int] = []
    # Collect correspondences: (0, 0) means "not detected", and points outside
    # the frame are treated as unreliable and skipped.
    for kp_idx, kp in enumerate(keypoints):
        if kp and len(kp) >= 2:
            x, y = float(kp[0]), float(kp[1])
            if not (abs(x) < 1e-6 and abs(y) < 1e-6) and 0 <= x < frame_width and 0 <= y < frame_height:
                valid_src.append(TEMPLATE_F1[kp_idx])
                valid_dst.append((x, y))
                valid_indices.append(kp_idx)
    # A homography needs at least 4 point pairs.
    if len(valid_src) < 4:
        return keypoints
    src_pts = np.array(valid_src, dtype=np.float32)
    dst_pts = np.array(valid_dst, dtype=np.float32)
    H, _ = cv2.findHomography(src_pts, dst_pts)
    if H is None:
        return keypoints
    # Re-project the full 32-point template into the frame.
    all_template_points = np.array(TEMPLATE_F0, dtype=np.float32).reshape(-1, 1, 2)
    adjusted_points = cv2.perspectiveTransform(all_template_points, H)
    adjusted_points = adjusted_points.reshape(-1, 2)
    adj_x = adjusted_points[:32, 0]
    adj_y = adjusted_points[:32, 1]
    # Only keep re-projected points that land inside the frame.
    valid_mask = (adj_x >= 0) & (adj_y >= 0) & (adj_x < frame_width) & (adj_y < frame_height)
    valid_indices_set = set(valid_indices)
    adjusted_kps: List[List[float]] = [[0.0, 0.0] for _ in range(32)]
    for i in np.where(valid_mask)[0]:
        # Optionally restrict writes to keypoints that were actually detected
        # (see HOMOGRAPHY_FILL_ONLY_VALID).
        if not HOMOGRAPHY_FILL_ONLY_VALID or i in valid_indices_set:
            adjusted_kps[i] = [float(adj_x[i]), float(adj_y[i])]
    return adjusted_kps
|
|
|
|
| |
|
|
| |
# Output class/team id constants used by the detection pipeline.
TEAM_1_ID = 6
TEAM_2_ID = 7
PLAYER_CLS_ID = 2
|
|
|
|
class BoundingBox(BaseModel):
    """One detected person box in pixel coordinates: (x1, y1) top-left, (x2, y2) bottom-right."""
    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int  # class id (internal ids during processing; validator ids on final output)
    conf: float  # detector confidence score
    team_id: Optional[int] = None  # assigned team id, when team classification succeeded
    track_id: Optional[int] = None  # persistent per-video track id, when tracked
|
|
class TVFrameResult(BaseModel):
    """Per-frame pipeline output: detected boxes plus pitch keypoints."""
    frame_id: int  # absolute frame index within the video
    boxes: list[BoundingBox]
    keypoints: List[Tuple[int, int]]  # pixel coords; (0, 0) marks a missing keypoint
|
|
|
|
def _smooth_boxes(
    results: List[TVFrameResult],
    window: int = BOX_SMOOTH_WINDOW,
    tids_by_frame: Optional[Dict[int, List[Optional[int]]]] = None,
) -> List[TVFrameResult]:
    """Temporal box smoothing by track ID.

    For every tracked box, the center and size are replaced by a centered
    moving average over up to ``window`` observations of the same track.
    Untracked boxes pass through unchanged (coordinates cast to int, conf
    rounded to 2 decimals, like smoothed ones).
    """
    if window <= 1 or not results:
        return results
    frame_to_index = {res.frame_id: idx for idx, res in enumerate(results)}

    # Gather each track's (frame_id, box_index, box) observations.
    tracks: Dict[int, List[Tuple[int, int, BoundingBox]]] = defaultdict(list)
    for res in results:
        override = tids_by_frame.get(res.frame_id, [None] * len(res.boxes)) if tids_by_frame else None
        for bi, box in enumerate(res.boxes):
            tid = override[bi] if override is not None else box.track_id
            if tid is not None and tid >= 0:
                tracks[int(tid)].append((res.frame_id, bi, box))

    # Compute smoothed coordinates keyed by (result index, box index).
    half_span = window // 2
    smoothed: Dict[Tuple[int, int], Tuple[int, int, int, int]] = {}
    for observations in tracks.values():
        observations.sort(key=lambda item: item[0])
        count = len(observations)
        for pos in range(count):
            fid, bi, _ = observations[pos]
            lo = max(0, pos - half_span)
            hi = min(count, pos + half_span + 1)
            neighbors = [observations[m][2] for m in range(lo, hi)]
            span = len(neighbors)
            cx = sum(0.5 * (b.x1 + b.x2) for b in neighbors) / span
            cy = sum(0.5 * (b.y1 + b.y2) for b in neighbors) / span
            bw = sum(b.x2 - b.x1 for b in neighbors) / span
            bh = sum(b.y2 - b.y1 for b in neighbors) / span
            smoothed[(frame_to_index[fid], bi)] = (
                int(round(cx - bw / 2)),
                int(round(cy - bh / 2)),
                int(round(cx + bw / 2)),
                int(round(cy + bh / 2)),
            )

    # Rebuild the results, substituting smoothed coordinates where available.
    out: List[TVFrameResult] = []
    for ri, res in enumerate(results):
        boxes_out: List[BoundingBox] = []
        for bi, box in enumerate(res.boxes):
            coords = smoothed.get((ri, bi))
            if coords is None:
                coords = (int(box.x1), int(box.y1), int(box.x2), int(box.y2))
            boxes_out.append(BoundingBox(
                x1=coords[0], y1=coords[1], x2=coords[2], y2=coords[3],
                cls_id=int(box.cls_id), conf=round(float(box.conf), 2),
                team_id=box.team_id, track_id=box.track_id,
            ))
        out.append(TVFrameResult(frame_id=res.frame_id, boxes=boxes_out, keypoints=res.keypoints))
    return out
|
|
|
|
| |
|
|
| class Miner: |
    def __init__(self, path_hf_repo: Path) -> None:
        """Load all inference models (person detector, team classifier, keypoint net) from ``path_hf_repo``."""
        self.path_hf_repo = Path(path_hf_repo)
        self.is_start = False
        # Two workers: the box and keypoint pipelines run concurrently in predict_batch.
        self._executor = ThreadPoolExecutor(max_workers=2)

        # The OSNet model/weight path live in module-level globals shared with
        # helper functions elsewhere in this file.
        global _OSNET_MODEL, osnet_weight_path
        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device

        # --- Person detector (ONNX export loaded through ultralytics YOLO) ---
        models_dir = self.path_hf_repo
        person_onnx = models_dir / "player_detect.onnx"
        # Flag used later to pick the right class-id mapping and disable half precision.
        self._person_model_onnx = person_onnx.exists()
        if person_onnx.exists():
            self.bbox_model = YOLO(str(person_onnx), task="detect")
            print("✅ Person Model Loaded (ONNX)")
        else:
            self.bbox_model = None
            print("⚠️ Person model not found (tried player_detect.onnx)")

        # --- Team classifier (OSNet re-ID); falls back to HSV when weights are absent ---
        osnet_weight_path = self.path_hf_repo / "osnet_model.pth.tar-100"
        if osnet_weight_path.exists():
            _OSNET_MODEL = load_osnet(device, osnet_weight_path)
            print("✅ Team Classifier Loaded (OSNet)")
        else:
            _OSNET_MODEL = None
            print(f"⚠️ OSNet weights not found at {osnet_weight_path}. Using HSV fallback.")

        # --- HRNet keypoint model: prefer files in the CWD, else the repo dir ---
        kp_config_file = "hrnetv2_w48.yaml"
        kp_weights_file = "keypoint_detect.pt"
        config_path = Path(kp_config_file) if Path(kp_config_file).exists() else self.path_hf_repo / kp_config_file
        weights_path = Path(kp_weights_file) if Path(kp_weights_file).exists() else self.path_hf_repo / kp_weights_file
        cfg = yaml.safe_load(open(config_path, 'r'))
        hrnet = get_cls_net(cfg)
        # weights_only=False: trusted local checkpoint may contain pickled objects.
        state = torch.load(weights_path, map_location=device, weights_only=False)
        hrnet.load_state_dict(state)
        hrnet.to(device).eval()
        self.keypoints_model = hrnet
        print("✅ HRNet Keypoints Model Loaded")

        # --- Per-video tracker/vote state (cleared by reset_for_new_video) ---
        self._person_tracker_state: Dict[int, Tuple[Tuple[float, float, float, float], Tuple[float, float, float, float], int]] = {}
        self._person_tracker_next_id = 0
        self._track_id_to_team_votes: Dict[int, Dict[str, int]] = {}
        self._track_id_to_class_votes: Dict[int, Dict[int, int]] = {}
        # Appearance counts of tracks seen in the tail of the previous batch,
        # used by the noise-track filter to bridge batch boundaries.
        self._prev_batch_tail_tid_counts: Dict[int, int] = {}
|
|
| def reset_for_new_video(self) -> None: |
| self._person_tracker_state.clear() |
| self._person_tracker_next_id = 0 |
| self._track_id_to_team_votes.clear() |
| self._track_id_to_class_votes.clear() |
| self._prev_batch_tail_tid_counts.clear() |
|
|
| def __repr__(self) -> str: |
| return ( |
| f"BBox Model: {type(self.bbox_model).__name__}\n" |
| f"Keypoints Model: {type(self.keypoints_model).__name__}\n" |
| f"Team Clustering: OSNet + KMeans" |
| ) |
|
|
    def _bbox_task(self, images: list[ndarray], offset: int = 0) -> list[list[BoundingBox]]:
        # Full person pipeline for one batch: detect -> track -> noise filter ->
        # class/team majority voting -> gap interpolation -> geometric adjust ->
        # validator class-id mapping. Returns one BoundingBox list per frame.
        start_time = time.time()
        # NOTE(review): the string below sits after a statement, so it is NOT a
        # real docstring — it is evaluated and discarded. Kept byte-identical here.
        """Person detection pipeline (new-2 style): tracking, class votes, OSNet teams, adjust."""
        if not images:
            return []
        if self.bbox_model is None:
            return [[] for _ in images]
        try:
            kw = {"imgsz": PERSON_MODEL_IMG_SIZE, "conf": PERSON_CONF, "verbose": False}
            # Half precision only for the native (non-ONNX) model, and only when
            # its parameters live on CUDA; probing can fail, hence best-effort.
            if PERSON_HALF and not self._person_model_onnx:
                try:
                    if next(self.bbox_model.model.parameters()).is_cuda:
                        kw["half"] = True
                except Exception:
                    pass
            batch_res = self.bbox_model(images, **kw)
        except Exception:
            # Best-effort: a failed detector call yields empty detections.
            return [[] for _ in images]
        if not isinstance(batch_res, list):
            batch_res = [batch_res] if batch_res is not None else []
        # Assign per-detection track ids; tracker state is carried on self so
        # tracks persist across batches of the same video.
        self._person_tracker_state, self._person_tracker_next_id, person_track_ids = _assign_person_track_ids(
            self._person_tracker_state, self._person_tracker_next_id, batch_res, TRACK_IOU_THRESH
        )
        person_res = batch_res
        print(f"Person detection took {time.time() - start_time:.2f} seconds")

        start_time = time.time()
        # --- Convert raw detections into BoundingBox lists keyed by absolute frame id ---
        bboxes_by_frame: Dict[int, List[BoundingBox]] = {}
        track_ids_by_frame: Dict[int, List[Optional[int]]] = {}
        for i, det_p in enumerate(person_res):
            frame_id = offset + i
            boxes_raw: List[BoundingBox] = []
            track_ids_raw: List[Optional[int]] = []
            if det_p is not None and getattr(det_p, "boxes", None) is not None and len(det_p.boxes) > 0:
                b = det_p.boxes
                xyxy = b.xyxy.cpu().numpy()
                confs = b.conf.cpu().numpy() if b.conf is not None else np.ones(len(xyxy), dtype=np.float32)
                clss = b.cls.cpu().numpy().astype(int) if b.cls is not None else np.zeros(len(xyxy), dtype=np.int32)
                # Track ids only trusted when the tracker produced one per detection.
                tids = person_track_ids[i] if i < len(person_track_ids) and len(person_track_ids[i]) == len(clss) else [-1] * len(clss)
                for (x1, y1, x2, y2), c, cf, tid in zip(xyxy, clss, confs, tids):
                    c, tid = int(c), int(tid)
                    x1r, y1r, x2r, y2r = int(round(x1)), int(round(y1)), int(round(x2)), int(round(y2))
                    tid_out = tid if tid >= 0 else None
                    if self._person_model_onnx:
                        # ONNX model class ids: 0 -> player, 1 -> referee, 2 -> goalkeeper.
                        if c == 0:
                            boxes_raw.append(BoundingBox(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C_PLAYER, conf=float(cf), team_id=None, track_id=tid_out))
                            track_ids_raw.append(tid_out)
                        elif c == 1:
                            boxes_raw.append(BoundingBox(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C_REFEREE, conf=float(cf), team_id=None, track_id=tid_out))
                            track_ids_raw.append(tid_out)
                        elif c == 2:
                            boxes_raw.append(BoundingBox(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C_GOALKEEPER, conf=float(cf), team_id=None, track_id=tid_out))
                            track_ids_raw.append(tid_out)
                    else:
                        # Native model: class 0 is dropped (presumably the ball — confirm),
                        # 1 -> goalkeeper, 2 -> player, 3 -> referee, anything else -> player.
                        if c == 0:
                            continue
                        internal_cls = {1: _C_GOALKEEPER, 2: _C_PLAYER, 3: _C_REFEREE}.get(c, _C_PLAYER)
                        boxes_raw.append(BoundingBox(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=internal_cls, conf=float(cf), team_id=None, track_id=tid_out))
                        track_ids_raw.append(tid_out)
            bboxes_by_frame[frame_id] = boxes_raw
            track_ids_by_frame[frame_id] = track_ids_raw

        # --- Drop short-lived "noise" tracks: tracks with few appearances that
        # started before the batch tail. Tail counts from the previous batch are
        # folded in so tracks spanning a batch boundary are not misclassified. ---
        if len(images) > NOISE_TAIL_FRAMES:
            tid_counts: Dict[int, int] = {}
            tid_first_frame: Dict[int, int] = {}
            for fid in range(offset, offset + len(images)):
                for tid in track_ids_by_frame.get(fid, []):
                    if tid is not None and tid >= 0:
                        t = int(tid)
                        tid_counts[t] = tid_counts.get(t, 0) + 1
                        if t not in tid_first_frame or fid < tid_first_frame[t]:
                            tid_first_frame[t] = fid
            for t, prev_count in self._prev_batch_tail_tid_counts.items():
                tid_counts[t] = tid_counts.get(t, 0) + prev_count
                # Tracks carried over from the previous batch are exempted from
                # the "started before the tail" condition.
                if prev_count > 0:
                    tid_first_frame[t] = offset + len(images)
            boundary = offset + len(images) - NOISE_TAIL_FRAMES
            noise_tids = {t for t, count in tid_counts.items() if count < NOISE_MIN_APPEARANCES and tid_first_frame.get(t, 0) < boundary}
            for fid in range(offset, offset + len(images)):
                boxes = bboxes_by_frame.get(fid, [])
                tids = track_ids_by_frame.get(fid, [None] * len(boxes))
                keep = [j for j in range(len(boxes)) if tids[j] is None or int(tids[j]) not in noise_tids]
                bboxes_by_frame[fid] = [boxes[j] for j in keep]
                track_ids_by_frame[fid] = [tids[j] for j in keep]
            # Remember tail appearance counts for the next batch's filter.
            tail_start = offset + len(images) - NOISE_TAIL_FRAMES
            self._prev_batch_tail_tid_counts = {}
            for fid in range(tail_start, offset + len(images)):
                for tid in track_ids_by_frame.get(fid, []):
                    if tid is not None and tid >= 0:
                        t = int(tid)
                        self._prev_batch_tail_tid_counts[t] = self._prev_batch_tail_tid_counts.get(t, 0) + 1

        # --- Accumulate per-track class votes (persisted across batches on self) ---
        for i in range(len(images)):
            frame_id = offset + i
            boxes_raw = bboxes_by_frame[frame_id]
            track_ids_raw = track_ids_by_frame[frame_id]
            for idx, bb in enumerate(boxes_raw):
                tid = track_ids_raw[idx] if idx < len(track_ids_raw) else bb.track_id
                if tid is not None and int(tid) >= 0:
                    if tid not in self._track_id_to_class_votes:
                        self._track_id_to_class_votes[tid] = {}
                    self._track_id_to_class_votes[tid][int(bb.cls_id)] = self._track_id_to_class_votes[tid].get(int(bb.cls_id), 0) + 1

        # --- Re-label each box with its track's majority class; referee and
        # goalkeeper take precedence once they exceed CLASS_VOTE_MAJORITY votes. ---
        for fid in range(offset, offset + len(images)):
            new_boxes: List[BoundingBox] = []
            tids_fid = track_ids_by_frame.get(fid, [None] * len(bboxes_by_frame[fid]))
            for box_idx, box in enumerate(bboxes_by_frame[fid]):
                tid = tids_fid[box_idx] if box_idx < len(tids_fid) else None
                if tid is not None and tid >= 0 and tid in self._track_id_to_class_votes:
                    votes = self._track_id_to_class_votes[tid]
                    ref_votes = votes.get(_C_REFEREE, 0)
                    gk_votes = votes.get(_C_GOALKEEPER, 0)
                    if ref_votes > CLASS_VOTE_MAJORITY:
                        majority_cls = _C_REFEREE
                    elif gk_votes > CLASS_VOTE_MAJORITY:
                        majority_cls = _C_GOALKEEPER
                    else:
                        majority_cls = max(votes.items(), key=lambda x: x[1])[0]
                    new_boxes.append(BoundingBox(x1=box.x1, y1=box.y1, x2=box.x2, y2=box.y2, cls_id=majority_cls, conf=box.conf, team_id=None, track_id=tid))
                else:
                    new_boxes.append(box)
            bboxes_by_frame[fid] = new_boxes

        # --- Linearly interpolate boxes across per-track frame gaps within this batch ---
        if INTERP_TRACK_GAPS and len(images) > 1:
            track_to_frames: Dict[int, List[Tuple[int, BoundingBox]]] = {}
            for fid in range(offset, offset + len(images)):
                for bb, tid in zip(bboxes_by_frame[fid], track_ids_by_frame.get(fid, [])):
                    if tid is not None and int(tid) >= 0:
                        track_to_frames.setdefault(int(tid), []).append((fid, bb))
            to_add: Dict[int, List[Tuple[BoundingBox, int]]] = {}
            for t, pairs in track_to_frames.items():
                pairs.sort(key=lambda p: p[0])
                for i in range(len(pairs) - 1):
                    f1, b1 = pairs[i]
                    f2, b2 = pairs[i + 1]
                    if f2 - f1 <= 1:
                        continue
                    for g in range(f1 + 1, f2):
                        # w: interpolation weight toward the later box.
                        w = (g - f1) / (f2 - f1)
                        interp = BoundingBox(
                            x1=int(round((1 - w) * b1.x1 + w * b2.x1)),
                            y1=int(round((1 - w) * b1.y1 + w * b2.y1)),
                            x2=int(round((1 - w) * b1.x2 + w * b2.x2)),
                            y2=int(round((1 - w) * b1.y2 + w * b2.y2)),
                            cls_id=b2.cls_id, conf=b2.conf, team_id=b2.team_id, track_id=t
                        )
                        to_add.setdefault(g, []).append((interp, t))
            for g, add_list in to_add.items():
                bboxes_by_frame[g] = list(bboxes_by_frame.get(g, []))
                track_ids_by_frame[g] = list(track_ids_by_frame.get(g, []))
                for interp_box, tid in add_list:
                    bboxes_by_frame[g].append(interp_box)
                    track_ids_by_frame[g].append(tid)

        # --- Team assignment via module helper (mutates box.team_id in place; best-effort) ---
        try:
            batch_boxes_for_osnet = {offset + i: bboxes_by_frame.get(offset + i, []) for i in range(len(images))}
            _classify_teams_batch(images, batch_boxes_for_osnet, self.device)
            for fid in batch_boxes_for_osnet:
                bboxes_by_frame[fid] = batch_boxes_for_osnet[fid]
        except Exception:
            pass

        # --- Accumulate per-track team votes, then re-label boxes with the majority team ---
        reid_team_per_frame: List[List[Optional[str]]] = []
        for fi in range(len(images)):
            frame_id = offset + fi
            boxes_f = bboxes_by_frame.get(frame_id, [])
            tids_f = track_ids_by_frame.get(frame_id, [])
            row: List[Optional[str]] = []
            for bi, box in enumerate(boxes_f):
                tid = tids_f[bi] if bi < len(tids_f) else box.track_id
                team_str = str(box.team_id) if box.team_id is not None else None
                if tid is not None and tid >= 0 and team_str:
                    if tid not in self._track_id_to_team_votes:
                        self._track_id_to_team_votes[tid] = {}
                    self._track_id_to_team_votes[tid][team_str] = self._track_id_to_team_votes[tid].get(team_str, 0) + 1
                row.append(team_str)
            reid_team_per_frame.append(row)
        for fid in range(offset, offset + len(images)):
            fi = fid - offset
            new_boxes = []
            tids_fid = track_ids_by_frame.get(fid, [None] * len(bboxes_by_frame[fid]))
            for box_idx, box in enumerate(bboxes_by_frame[fid]):
                tid = tids_fid[box_idx] if box_idx < len(tids_fid) else box.track_id
                team_from_reid = reid_team_per_frame[fi][box_idx] if fi < len(reid_team_per_frame) and box_idx < len(reid_team_per_frame[fi]) else None
                # Fall back to this frame's own assignment when the track has no votes.
                default_team = team_from_reid or (str(box.team_id) if box.team_id is not None else None)
                if tid is not None and tid >= 0 and tid in self._track_id_to_team_votes and self._track_id_to_team_votes[tid]:
                    majority_team = max(self._track_id_to_team_votes[tid].items(), key=lambda x: x[1])[0]
                else:
                    majority_team = default_team
                # NOTE(review): both ternary branches perform int(majority_team);
                # the isdigit() check is redundant as written — confirm intent.
                team_id_out = int(majority_team) if majority_team and majority_team.isdigit() else (int(majority_team) if majority_team else None)
                new_boxes.append(BoundingBox(x1=box.x1, y1=box.y1, x2=box.x2, y2=box.y2, cls_id=box.cls_id, conf=box.conf, team_id=team_id_out, track_id=tid))
            bboxes_by_frame[fid] = new_boxes

        # --- Geometric box adjustment via module helper (dedup/disambiguation) ---
        H, W = images[0].shape[:2] if images else (0, 0)
        for fid in range(offset, offset + len(images)):
            orig = bboxes_by_frame[fid]
            tids = track_ids_by_frame.get(fid, [None] * len(orig))
            adjusted = _adjust_boxes(orig, W, H, do_goalkeeper_dedup=True, do_referee_disambiguation=True)
            # Re-associate track ids to adjusted boxes by exact coordinate match.
            adjusted_tids: List[Optional[int]] = []
            used = set()
            for ab in adjusted:
                for oi, ob in enumerate(orig):
                    if oi in used:
                        continue
                    if ob.x1 == ab.x1 and ob.y1 == ab.y1 and ob.x2 == ab.x2 and ob.y2 == ab.y2:
                        adjusted_tids.append(tids[oi] if oi < len(tids) else None)
                        used.add(oi)
                        break
            # NOTE(review): adjusted_tids is never stored back into
            # track_ids_by_frame; nothing below reads track ids, so this is
            # dead code today — confirm before relying on it.
            bboxes_by_frame[fid] = adjusted

        print(f"Post-processing took {time.time() - start_time:.2f} seconds")
        # --- Map internal class ids to validator ids and emit per-frame lists ---
        out: List[List[BoundingBox]] = []
        for i in range(len(images)):
            boxes = bboxes_by_frame.get(offset + i, [])
            for bb in boxes:
                bb.cls_id = _CLS_TO_VALIDATOR.get(int(bb.cls_id), int(bb.cls_id))
            out.append(boxes)
        return out
|
|
| def _keypoint_task(self, images: list[ndarray], n_keypoints: int) -> list[list]: |
| start_time = time.time() |
| """HRNet keypoints + homography refinement.""" |
| if not images: |
| return [] |
| if self.keypoints_model is None: |
| return [[(0, 0)] * n_keypoints for _ in images] |
| try: |
| raw_kps = _run_hrnet_batch(images, self.keypoints_model, KP_THRESHOLD, batch_size=HRNET_BATCH_SIZE) |
| except Exception as e: |
| print(f"Error in _keypoint_task: {e}") |
| return [[(0, 0)] * n_keypoints for _ in images] |
| raw_kps = [_apply_keypoint_mapping(kp) for kp in raw_kps] if raw_kps else [] |
| keypoints = _normalize_keypoints(raw_kps, images, n_keypoints) if raw_kps else [[(0, 0)] * n_keypoints for _ in images] |
| keypoints = [_fix_keypoints(kps, n_keypoints) for kps in keypoints] |
| keypoints = [_keypoints_to_float(kps) for kps in keypoints] |
| print(f"Keypoint task completed in {time.time() - start_time:.2f} seconds") |
| |
| |
| |
| |
| |
| |
| |
| return keypoints |
|
|
| def predict_batch( |
| self, |
| batch_images: list[ndarray], |
| offset: int, |
| n_keypoints: int, |
| ) -> list[TVFrameResult]: |
|
|
| if not self.is_start: |
| self.is_start = True |
|
|
| images = list(batch_images) |
| if offset == 0: |
| self.reset_for_new_video() |
| gc.collect() |
| if torch.cuda.is_available(): |
| torch.cuda.empty_cache() |
|
|
| |
| future_bbox = self._executor.submit(self._bbox_task, images, offset) |
| future_kp = self._executor.submit(self._keypoint_task, images, n_keypoints) |
| bbox_per_frame = future_bbox.result() |
| keypoints = future_kp.result() |
|
|
| return [ |
| TVFrameResult(frame_id=offset + i, boxes=bbox_per_frame[i], keypoints=keypoints[i]) |
| for i in range(len(images)) |
| ] |