from __future__ import annotations import gc import math import os import threading import time from itertools import combinations from pathlib import Path from concurrent.futures import ThreadPoolExecutor from collections import OrderedDict, defaultdict from typing import Any, Dict, Iterable, List, Optional import cv2 import numpy as np import torch import torch.nn as nn import torch.nn.functional as F import yaml from numpy import ndarray from PIL import Image import torchvision.transforms as T from sklearn.cluster import KMeans from pydantic import BaseModel from ultralytics import YOLO try: from scipy.optimize import linear_sum_assignment as _linear_sum_assignment except ImportError: _linear_sum_assignment = None _f0 = True BatchNorm2d = nn.BatchNorm2d _v0 = 0.1 def _c0(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d: return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False) class _B0(nn.Module): expansion = 1 def __init__(self, inplanes: int, planes: int, stride: int = 1, downsample: Any = None): super().__init__() self.conv1 = _c0(inplanes, planes, stride) self.bn1 = BatchNorm2d(planes, momentum=_v0) self.relu = nn.ReLU(inplace=True) self.conv2 = _c0(planes, planes) self.bn2 = BatchNorm2d(planes, momentum=_v0) self.downsample = downsample self.stride = stride def forward(self, x: torch.Tensor) -> torch.Tensor: residual = x out = self.conv1(x) out = self.bn1(out) out = self.relu(out) out = self.conv2(out) out = self.bn2(out) if self.downsample is not None: residual = self.downsample(x) out += residual out = self.relu(out) return out class _B1(nn.Module): expansion = 4 def __init__(self, inplanes: int, planes: int, stride: int = 1, downsample: Any = None): super().__init__() self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False) self.bn1 = BatchNorm2d(planes, momentum=_v0) self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False) self.bn2 = BatchNorm2d(planes, momentum=_v0) self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, bias=False) self.bn3 = BatchNorm2d(planes * self.expansion, momentum=_v0) self.relu = nn.ReLU(inplace=True) self.downsample = downsample self.stride = stride def forward(self, x: torch.Tensor) -> torch.Tensor: residual = x out = self.conv1(x) out = self.bn1(out) out = self.relu(out) out = self.conv2(out) out = self.bn2(out) out = self.relu(out) out = self.conv3(out) out = self.bn3(out) if self.downsample is not None: residual = self.downsample(x) out += residual out = self.relu(out) return out _d0 = {"BASIC": _B0, "BOTTLENECK": _B1} def _block_from_cfg(block_key: Any) -> type: if isinstance(block_key, bool): return _d0["BOTTLENECK"] if block_key else _d0["BASIC"] key = str(block_key).upper() if block_key else "BASIC" if key not in _d0: key = "BASIC" return _d0[key] class _H0(nn.Module): def __init__(self, num_branches: int, blocks: type, num_blocks: list, num_inchannels: list, num_channels: list, fuse_method: str, multi_scale_output: bool = True): super().__init__() if isinstance(blocks, bool): blocks = _d0["BOTTLENECK"] if blocks else _d0["BASIC"] self._check_branches(num_branches, blocks, num_blocks, num_inchannels, num_channels) self.num_inchannels = num_inchannels self.fuse_method = fuse_method self.num_branches = num_branches self.multi_scale_output = multi_scale_output self.branches = self._make_branches(num_branches, blocks, num_blocks, num_channels) self.fuse_layers = self._make_fuse_layers() self.relu = nn.ReLU(inplace=True) def _check_branches(self, num_branches: int, blocks: type, num_blocks: list, num_inchannels: list, num_channels: list) -> None: if num_branches != len(num_blocks): raise ValueError("NUM_BRANCHES <> NUM_BLOCKS") if num_branches != len(num_channels): raise ValueError("NUM_BRANCHES <> NUM_CHANNELS") if num_branches != len(num_inchannels): raise ValueError("NUM_BRANCHES <> NUM_INCHANNELS") def _make_one_branch(self, branch_index: int, block: type, num_blocks: list, num_channels: list, stride: int = 1) -> nn.Sequential: if isinstance(block, bool): block = _d0["BOTTLENECK"] if block else _d0["BASIC"] downsample = None if stride != 1 or self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion: downsample = nn.Sequential( nn.Conv2d(self.num_inchannels[branch_index], num_channels[branch_index] * block.expansion, kernel_size=1, stride=stride, bias=False), BatchNorm2d(num_channels[branch_index] * block.expansion, momentum=_v0), ) layers = [block(self.num_inchannels[branch_index], num_channels[branch_index], stride, downsample)] self.num_inchannels[branch_index] = num_channels[branch_index] * block.expansion for _ in range(1, num_blocks[branch_index]): layers.append(block(self.num_inchannels[branch_index], num_channels[branch_index])) return nn.Sequential(*layers) def _make_branches(self, num_branches: int, block: type, num_blocks: list, num_channels: list) -> nn.ModuleList: return nn.ModuleList([self._make_one_branch(i, block, num_blocks, num_channels) for i in range(num_branches)]) def _make_fuse_layers(self) -> nn.ModuleList | None: if self.num_branches == 1: return None num_branches = self.num_branches num_inchannels = self.num_inchannels fuse_layers = [] for i in range(num_branches if self.multi_scale_output else 1): fuse_layer = [] for j in range(num_branches): if j > i: fuse_layer.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[i], 1, 1, 0, bias=False), BatchNorm2d(num_inchannels[i], momentum=_v0))) elif j == i: fuse_layer.append(None) else: conv3x3s = [] for k in range(i - j): if k == i - j - 1: conv3x3s.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[i], 3, 2, 1, bias=False), BatchNorm2d(num_inchannels[i], momentum=_v0))) else: conv3x3s.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[j], 3, 2, 1, bias=False), BatchNorm2d(num_inchannels[j], momentum=_v0), nn.ReLU(inplace=True))) fuse_layer.append(nn.Sequential(*conv3x3s)) fuse_layers.append(nn.ModuleList(fuse_layer)) return nn.ModuleList(fuse_layers) def get_num_inchannels(self) -> list: return self.num_inchannels def forward(self, x: list) -> list: if self.num_branches == 1: return [self.branches[0](x[0])] for i in range(self.num_branches): x[i] = self.branches[i](x[i]) x_fuse = [] for i in range(len(self.fuse_layers)): y = x[0] if i == 0 else self.fuse_layers[i][0](x[0]) for j in range(1, self.num_branches): if i == j: y = y + x[j] elif j > i: y = y + F.interpolate(self.fuse_layers[i][j](x[j]), size=[x[i].shape[2], x[i].shape[3]], mode="bilinear") else: y = y + self.fuse_layers[i][j](x[j]) x_fuse.append(self.relu(y)) return x_fuse class _H1(nn.Module): def __init__(self, config: dict, lines: bool = False, **kwargs: Any) -> None: self.inplanes = 64 self.lines = lines extra = config["MODEL"]["EXTRA"] super().__init__() self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=2, padding=1, bias=False) self.bn1 = BatchNorm2d(self.inplanes, momentum=_v0) self.conv2 = nn.Conv2d(self.inplanes, self.inplanes, kernel_size=3, stride=2, padding=1, bias=False) self.bn2 = BatchNorm2d(self.inplanes, momentum=_v0) self.relu = nn.ReLU(inplace=True) self.layer1 = self._make_layer(_B1, 64, 64, 4) self.stage2_cfg = extra["STAGE2"] num_channels = [extra["STAGE2"]["NUM_CHANNELS"][i] * _block_from_cfg(extra["STAGE2"]["BLOCK"]).expansion for i in range(len(extra["STAGE2"]["NUM_CHANNELS"]))] self.transition1 = self._make_transition_layer([256], num_channels) self.stage2, pre_stage_channels = self._make_stage(self.stage2_cfg, num_channels) self.stage3_cfg = extra["STAGE3"] num_channels = [extra["STAGE3"]["NUM_CHANNELS"][i] * _block_from_cfg(extra["STAGE3"]["BLOCK"]).expansion for i in range(len(extra["STAGE3"]["NUM_CHANNELS"]))] self.transition2 = self._make_transition_layer(pre_stage_channels, num_channels) self.stage3, pre_stage_channels = self._make_stage(self.stage3_cfg, num_channels) self.stage4_cfg = extra["STAGE4"] num_channels = [extra["STAGE4"]["NUM_CHANNELS"][i] * _block_from_cfg(extra["STAGE4"]["BLOCK"]).expansion for i in range(len(extra["STAGE4"]["NUM_CHANNELS"]))] self.transition3 = self._make_transition_layer(pre_stage_channels, num_channels) self.stage4, pre_stage_channels = self._make_stage(self.stage4_cfg, num_channels, multi_scale_output=True) self.upsample = nn.Upsample(scale_factor=2, mode="nearest") final_inp_channels = sum(pre_stage_channels) + self.inplanes self.head = nn.Sequential( nn.Conv2d(final_inp_channels, final_inp_channels, kernel_size=1), BatchNorm2d(final_inp_channels, momentum=_v0), nn.ReLU(inplace=True), nn.Conv2d(final_inp_channels, config["MODEL"]["NUM_JOINTS"], kernel_size=extra["FINAL_CONV_KERNEL"]), nn.Softmax(dim=1) if not self.lines else nn.Sigmoid(), ) def _make_head(self, x: torch.Tensor, x_skip: torch.Tensor) -> torch.Tensor: x = self.upsample(x) x = torch.cat([x, x_skip], dim=1) return self.head(x) def _make_transition_layer(self, num_channels_pre_layer: list, num_channels_cur_layer: list) -> nn.ModuleList: num_branches_cur = len(num_channels_cur_layer) num_branches_pre = len(num_channels_pre_layer) transition_layers = [] for i in range(num_branches_cur): if i < num_branches_pre: if num_channels_cur_layer[i] != num_channels_pre_layer[i]: transition_layers.append(nn.Sequential( nn.Conv2d(num_channels_pre_layer[i], num_channels_cur_layer[i], 3, 1, 1, bias=False), BatchNorm2d(num_channels_cur_layer[i], momentum=_v0), nn.ReLU(inplace=True), )) else: transition_layers.append(None) else: conv3x3s = [] for j in range(i + 1 - num_branches_pre): inchannels = num_channels_pre_layer[-1] outchannels = num_channels_cur_layer[i] if j == i - num_branches_pre else inchannels conv3x3s.append(nn.Sequential( nn.Conv2d(inchannels, outchannels, 3, 2, 1, bias=False), BatchNorm2d(outchannels, momentum=_v0), nn.ReLU(inplace=True), )) transition_layers.append(nn.Sequential(*conv3x3s)) return nn.ModuleList(transition_layers) def _make_layer(self, block: type, inplanes: int, planes: int, blocks: int, stride: int = 1) -> nn.Sequential: if isinstance(block, bool): block = _d0["BOTTLENECK"] if block else _d0["BASIC"] downsample = None if stride != 1 or inplanes != planes * block.expansion: downsample = nn.Sequential( nn.Conv2d(inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False), BatchNorm2d(planes * block.expansion, momentum=_v0), ) layers = [block(inplanes, planes, stride, downsample)] inplanes = planes * block.expansion for _ in range(1, blocks): layers.append(block(inplanes, planes)) return nn.Sequential(*layers) def _make_stage(self, layer_config: dict, num_inchannels: list, multi_scale_output: bool = True) -> tuple: num_modules = layer_config["NUM_MODULES"] num_blocks = layer_config["NUM_BLOCKS"] num_channels = layer_config["NUM_CHANNELS"] block = _block_from_cfg(layer_config["BLOCK"]) fuse_method = layer_config["FUSE_METHOD"] modules = [] for i in range(num_modules): reset_multi_scale_output = False if (not multi_scale_output and i == num_modules - 1) else True modules.append(_H0( layer_config["NUM_BRANCHES"], block, num_blocks, num_inchannels, num_channels, fuse_method, reset_multi_scale_output, )) num_inchannels = modules[-1].get_num_inchannels() return nn.Sequential(*modules), num_inchannels def forward(self, x: torch.Tensor) -> torch.Tensor: x = self.conv1(x) x_skip = x.clone() x = self.bn1(x) x = self.relu(x) x = self.conv2(x) x = self.bn2(x) x = self.relu(x) x = self.layer1(x) x_list = [self.transition1[i](x) if self.transition1[i] is not None else x for i in range(self.stage2_cfg["NUM_BRANCHES"])] y_list = self.stage2(x_list) x_list = [self.transition2[i](y_list[-1]) if self.transition2[i] is not None else y_list[i] for i in range(self.stage3_cfg["NUM_BRANCHES"])] y_list = self.stage3(x_list) x_list = [self.transition3[i](y_list[-1]) if self.transition3[i] is not None else y_list[i] for i in range(self.stage4_cfg["NUM_BRANCHES"])] x = self.stage4(x_list) height, width = x[0].size(2), x[0].size(3) x1 = F.interpolate(x[1], size=(height, width), mode="bilinear", align_corners=False) x2 = F.interpolate(x[2], size=(height, width), mode="bilinear", align_corners=False) x3 = F.interpolate(x[3], size=(height, width), mode="bilinear", align_corners=False) x = torch.cat([x[0], x1, x2, x3], 1) return self._make_head(x, x_skip) def init_weights(self, pretrained: str = "") -> None: for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu") elif isinstance(m, nn.BatchNorm2d): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) if pretrained and os.path.isfile(pretrained): w = torch.load(pretrained, map_location="cpu", weights_only=False) self.load_state_dict({k: v for k, v in w.items() if k in self.state_dict()}, strict=False) def _g0(config: dict, pretrained: str = "", **kwargs: Any) -> _H1: model = _H1(config, **kwargs) model.init_weights(pretrained) return model _K0 = { 1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23, 11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29, 28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20, 45: 9, 50: 31, 52: 32, 57: 22, } # ── Keypoint mapping & inference helpers ───────────────────────── map_keypoints = { 1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23, 11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29, 28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20, 45: 9, 50: 31, 52: 32, 57: 22 } # Template keypoints for homography refinement (new-5 style) TEMPLATE_F0: List[Tuple[float, float]] = [ (5, 5), (5, 140), (5, 250), (5, 430), (5, 540), (5, 675), (55, 250), (55, 430), (110, 340), (165, 140), (165, 270), (165, 410), (165, 540), (527, 5), (527, 253), (527, 433), (527, 675), (888, 140), (888, 270), (888, 410), (888, 540), (940, 340), (998, 250), (998, 430), (1045, 5), (1045, 140), (1045, 250), (1045, 430), (1045, 540), (1045, 675), (435, 340), (615, 340), ] TEMPLATE_F1: List[Tuple[float, float]] = [ (2.5, 2.5), (2.5, 139.5), (2.5, 249.5), (2.5, 430.5), (2.5, 540.5), (2.5, 678), (54.5, 249.5), (54.5, 430.5), (110.5, 340.5), (164.5, 139.5), (164.5, 269), (164.5, 411), (164.5, 540.5), (525, 2.5), (525, 249.5), (525, 430.5), (525, 678), (886.5, 139.5), (886.5, 269), (886.5, 411), (886.5, 540.5), (940.5, 340.5), (998, 249.5), (998, 430.5), (1048, 2.5), (1048, 139.5), (1048, 249.5), (1048, 430.5), (1048, 540.5), (1048, 678), (434.5, 340), (615.5, 340), ] HOMOGRAPHY_FILL_ONLY_VALID = True # Step8 (example_miner-style): homography + project template + fill; when True, skip _apply_homography_refinement and use step8 only STEP8_ENABLED = True STEP8_FILL_MISSING = True # True = fill all in-frame warped points; False = only detected indices KP_THRESHOLD = 0.2 # new-5 style (was 0.3) # HRNet keypoint input size; smaller = faster, less accurate (540×960 = full) _KP_H, _KP_W = 540, 960 # _KP_H, _KP_W = 432, 768 def _p0(frames: list) -> torch.Tensor: target_size = (_KP_H, _KP_W) batch = [] for frame in frames: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) img = cv2.resize(frame_rgb, (target_size[1], target_size[0])) img = img.astype(np.float32) / 255.0 img = np.transpose(img, (2, 0, 1)) batch.append(img) return torch.from_numpy(np.stack(batch)).float() def _e0(heatmap: torch.Tensor, scale: int = 2, max_keypoints: int = 1) -> torch.Tensor: batch_size, n_channels, height, width = heatmap.shape max_pooled = F.max_pool2d(heatmap, 3, stride=1, padding=1) local_maxima = max_pooled == heatmap masked_heatmap = heatmap * local_maxima flat_heatmap = masked_heatmap.view(batch_size, n_channels, -1) scores, indices = torch.topk(flat_heatmap, max_keypoints, dim=-1, sorted=False) y_coords = torch.div(indices, width, rounding_mode="floor") * scale x_coords = (indices % width) * scale return torch.stack([x_coords.float(), y_coords.float(), scores], dim=-1) def _p1(kp_coords: torch.Tensor, kp_threshold: float, w: int, h: int, batch_size: int) -> list: kp_np = kp_coords.cpu().numpy() batch_results = [] for batch_idx in range(batch_size): kp_dict = {} valid_kps = kp_np[batch_idx, :, 0, 2] > kp_threshold for ch_idx in np.where(valid_kps)[0]: x = float(kp_np[batch_idx, ch_idx, 0, 0]) / w y = float(kp_np[batch_idx, ch_idx, 0, 1]) / h p = float(kp_np[batch_idx, ch_idx, 0, 2]) kp_dict[int(ch_idx) + 1] = {"x": x, "y": y, "p": p} batch_results.append(kp_dict) return batch_results def _g1(kp_points: dict) -> dict: return {_K0[k]: v for k, v in kp_points.items() if k in _K0} def _i0(frames: list, model: nn.Module, kp_threshold: float, device: str, batch_size: int = 2) -> list: results = [] model_device = next(model.parameters()).device use_amp = model_device.type == "cuda" for i in range(0, len(frames), batch_size): current_batch_size = min(batch_size, len(frames) - i) batch_frames = frames[i : i + current_batch_size] batch = _p0(batch_frames).to(model_device, non_blocking=True) with torch.no_grad(): with torch.amp.autocast("cuda", enabled=use_amp): heatmaps = model(batch) kp_coords = _e0(heatmaps[:, :-1, :, :], scale=2, max_keypoints=1) batch_results = _p1(kp_coords, kp_threshold, _KP_W, _KP_H, current_batch_size) results.extend([_g1(kp) for kp in batch_results]) del heatmaps, kp_coords, batch gc.collect() if model_device.type == "cuda": torch.cuda.empty_cache() return results def _x0(frames: list, model: nn.Module, kp_threshold: float, device: str = "cpu", batch_size: int = 2) -> list: return _i0(frames, model, kp_threshold, device, batch_size) def _normalize_keypoints_xyp(kp_results: list | None, frames: list, n_keypoints: int) -> list: """Produce [(x, y, p), ...] per frame for fix_keypoints_pri thresholding.""" if not kp_results: return [] keypoints = [] for i in range(min(len(kp_results), len(frames))): kp_dict = kp_results[i] h, w = frames[i].shape[:2] frame_kps = [] for idx in range(n_keypoints): kp_idx = idx + 1 x, y, p = 0, 0, 0.0 if kp_dict and isinstance(kp_dict, dict) and kp_idx in kp_dict: d = kp_dict[kp_idx] if isinstance(d, dict) and "x" in d: x = int(d["x"] * w) y = int(d["y"] * h) p = float(d.get("p", 0.0)) frame_kps.append((x, y, p)) keypoints.append(frame_kps) return keypoints def _n0(keypoints_result: list | None, batch_images: list, n_keypoints: int) -> list: keypoints = [] if not keypoints_result: return [] for frame_number_in_batch, kp_dict in enumerate(keypoints_result): if frame_number_in_batch >= len(batch_images): break frame_keypoints = [] try: height, width = batch_images[frame_number_in_batch].shape[:2] if kp_dict and isinstance(kp_dict, dict): for idx in range(32): x, y = 0, 0 kp_idx = idx + 1 if kp_idx in kp_dict: kp_data = kp_dict[kp_idx] if isinstance(kp_data, dict) and "x" in kp_data and "y" in kp_data: x, y = int(kp_data["x"] * width), int(kp_data["y"] * height) frame_keypoints.append((x, y)) else: frame_keypoints = [(0, 0)] * 32 except (IndexError, ValueError, AttributeError): frame_keypoints = [(0, 0)] * 32 if len(frame_keypoints) < n_keypoints: frame_keypoints.extend([(0, 0)] * (n_keypoints - len(frame_keypoints))) else: frame_keypoints = frame_keypoints[:n_keypoints] keypoints.append(frame_keypoints) return keypoints def _fix_keypoints(kps: list, n: int) -> list: if len(kps) < n: kps += [(0, 0)] * (n - len(kps)) elif len(kps) > n: kps = kps[:n] if kps[2] != (0,0) and kps[4] != (0,0) and kps[3] == (0,0): kps[3] = kps[4]; kps[4] = (0,0) if kps[0] != (0,0) and kps[4] != (0,0) and kps[1] == (0,0): kps[1] = kps[4]; kps[4] = (0,0) if kps[2] != (0,0) and kps[3] != (0,0) and kps[1] == (0,0) and kps[3][0] > kps[2][0]: kps[1] = kps[3]; kps[3] = (0,0) if kps[28] != (0,0) and kps[25] == (0,0) and kps[26] != (0,0) and kps[26][0] > kps[28][0]: kps[25] = kps[28]; kps[28] = (0,0) if kps[24] != (0,0) and kps[28] != (0,0) and kps[25] == (0,0): kps[25] = kps[28]; kps[28] = (0,0) if kps[24] != (0,0) and kps[27] != (0,0) and kps[26] == (0,0): kps[26] = kps[27]; kps[27] = (0,0) if kps[28] != (0,0) and kps[23] == (0,0) and kps[20] != (0,0) and kps[20][1] > kps[23][1]: kps[23] = kps[20]; kps[20] = (0,0) return kps def _keypoints_to_float(keypoints: list) -> List[List[float]]: """Convert keypoints to [[x, y], ...] float format for homography.""" return [[float(x), float(y)] for x, y in keypoints] def _keypoints_to_int(keypoints: list) -> List[Tuple[int, int]]: """Convert keypoints to [(x, y), ...] integer format.""" return [(int(round(float(kp[0]))), int(round(float(kp[1])))) for kp in keypoints] # --- fix_keypoints_pri: select best keypoint config per frame from multiple candidates --- _FKP_KEYPOINTS: List[Tuple[int, int]] = [ (5, 5), (5, 140), (5, 250), (5, 430), (5, 540), (5, 675), (55, 250), (55, 430), (110, 340), (165, 140), (165, 270), (165, 410), (165, 540), (527, 5), (527, 253), (527, 433), (527, 675), (888, 140), (888, 270), (888, 410), (888, 540), (940, 340), (998, 250), (998, 430), (1045, 5), (1045, 140), (1045, 250), (1045, 430), (1045, 540), (1045, 675), (435, 340), (615, 340), ] _FKP_KEYPOINTS_NP = np.asarray(_FKP_KEYPOINTS, dtype=np.float32) _FKP_GROUPS = { 1: [2, 3, 7, 10], 2: [1, 3, 7, 10], 3: [2, 4, 7, 8], 4: [3, 5, 8, 7], 5: [4, 8, 6, 3], 6: [5, 4, 8, 13], 7: [3, 8, 9, 10], 8: [4, 7, 9, 13], 9: [7, 8, 11, 12], 10: [9, 11, 7, 2], 11: [9, 10, 12, 31], 12: [9, 11, 13, 31], 13: [9, 12, 8, 5], 14: [15, 31, 32, 16], 15: [31, 16, 32, 14], 16: [31, 15, 32, 17], 17: [31, 16, 32, 15], 18: [19, 22, 23, 26], 19: [18, 22, 20, 32], 20: [19, 22, 21, 32], 21: [20, 22, 24, 29], 22: [23, 24, 19, 20], 23: [27, 24, 22, 28], 24: [28, 23, 22, 27], 25: [26, 27, 23, 18], 26: [25, 27, 23, 18], 27: [26, 23, 28, 24], 28: [27, 24, 29, 23], 29: [28, 30, 24, 21], 30: [29, 28, 24, 21], 31: [15, 16, 32, 14], 32: [15, 31, 16, 14], } _FKP_GROUPS_ARRAY = [np.asarray(_FKP_GROUPS[i], dtype=np.int32) - 1 for i in range(1, 33)] _FKP_BLACKLISTS = [ [23, 24, 27, 28], [7, 8, 3, 4], [2, 10, 1, 14], [18, 26, 14, 25], [5, 13, 6, 17], [21, 29, 17, 30], [10, 11, 2, 3], [10, 11, 2, 7], [12, 13, 4, 5], [12, 13, 5, 8], [18, 19, 26, 27], [18, 19, 26, 23], [20, 21, 24, 29], [20, 21, 28, 29], [8, 4, 5, 13], [3, 7, 2, 10], [23, 27, 18, 26], [24, 28, 21, 29], ] _FKP_PREPARED_BLACKLISTS = [(set(bl), bl[0] - 1, bl[1] - 1) for bl in _FKP_BLACKLISTS] _FKP_DILATE_KERNEL = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) _FKP_KERNEL_31 = cv2.getStructuringElement(cv2.MORPH_RECT, (31, 31)) _FKP_TEMPLATE_GRAY: Optional[ndarray] = None _FKP_SHARED_EXECUTOR: Optional[ThreadPoolExecutor] = None _FKP_PER_KEY_LOCKS: Dict[Any, threading.Lock] = defaultdict(threading.Lock) class _FKP_MaxSizeCache(OrderedDict): def __init__(self, maxlen: int = 500): super().__init__() self.maxlen = maxlen self._lock = threading.Lock() def set(self, k: Any, v: Any) -> None: with self._lock: if k in self: self.move_to_end(k) self[k] = v if len(self) > self.maxlen: self.popitem(last=False) def get(self, k: Any) -> Any: with self._lock: return super().get(k) def exists(self, k: Any) -> bool: with self._lock: return k in self _FKP_CACHED = _FKP_MaxSizeCache() def _fkp_load_template_gray() -> ndarray: global _FKP_TEMPLATE_GRAY if _FKP_TEMPLATE_GRAY is None: template_path = Path(__file__).parent / "football_pitch_template.png" img = cv2.imread(str(template_path), cv2.IMREAD_COLOR) if img is not None: _FKP_TEMPLATE_GRAY = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) else: _FKP_TEMPLATE_GRAY = np.zeros((680, 1050), dtype=np.uint8) return _FKP_TEMPLATE_GRAY def _fkp_get_or_compute_masks(key: Any, compute_fn: Any) -> Any: lock = _FKP_PER_KEY_LOCKS[key] with lock: if _FKP_CACHED.exists(key): return _FKP_CACHED.get(key) masks = compute_fn() _FKP_CACHED.set(key, masks) return masks def _fkp_canonical(obj: Any) -> Any: if isinstance(obj, np.ndarray): return _fkp_canonical(obj.tolist()) if isinstance(obj, (list, tuple)): return tuple(_fkp_canonical(x) for x in obj) if isinstance(obj, set): return tuple(sorted(_fkp_canonical(x) for x in obj)) if isinstance(obj, dict): return tuple((k, _fkp_canonical(v)) for k, v in sorted(obj.items())) return obj def _fkp_are_collinear(pts: Any, eps: float = 1e-9) -> bool: pts = np.asarray(pts) if len(pts) < 3: return True a, b, c = pts[:3] area = np.abs(np.cross(b - a, c - a)) return bool(area < eps) def _fkp_unique_points(src: Any, dst: Any) -> Any: src, dst = np.asarray(src, float), np.asarray(dst, float) src_nonzero = ~np.all(np.abs(src) < 1e-9, axis=1) dst_nonzero = ~np.all(np.abs(dst) < 1e-9, axis=1) valid_mask = src_nonzero & dst_nonzero if not valid_mask.any(): return np.array([]), np.array([]) src_valid = src[valid_mask] dst_valid = dst[valid_mask] _, unique_idx = np.unique(src_valid, axis=0, return_index=True) unique_idx.sort() return src_valid[unique_idx], dst_valid[unique_idx] def _fkp_apply_transform(M: ndarray, P: Any) -> Tuple[int, int]: x, y = P[0], P[1] return (int(M[0, 0] * x + M[0, 1] * y + M[0, 2]), int(M[1, 0] * x + M[1, 1] * y + M[1, 2])) def _fkp_apply_homo_transform(M: ndarray, P: Any) -> Tuple[int, int]: x, y = P[0], P[1] w = M[2, 0] * x + M[2, 1] * y + M[2, 2] x_new = (M[0, 0] * x + M[0, 1] * y + M[0, 2]) / w y_new = (M[1, 0] * x + M[1, 1] * y + M[1, 2]) / w return (int(x_new), int(y_new)) def _fkp_affine_from_4_points(src_pts: Any, dst_pts: Any) -> ndarray: P, Q = np.array(src_pts, dtype=np.float64), np.array(dst_pts, dtype=np.float64) x, y = P[:, 0], P[:, 1] u, v = Q[:, 0], Q[:, 1] A = np.zeros((8, 6), dtype=np.float64) A[0::2, 0], A[0::2, 1], A[0::2, 2] = x, y, 1 A[1::2, 3], A[1::2, 4], A[1::2, 5] = x, y, 1 b = np.empty(8, dtype=np.float64) b[0::2], b[1::2] = u, v params, _, _, _ = np.linalg.lstsq(A, b, rcond=None) a, b_, e, c, d, f = params return np.array([[a, b_, e], [c, d, f], [0, 0, 1]], dtype=np.float64) def _fkp_four_point_homography(src_pts: Any, dst_pts: Any) -> ndarray: src, dst = np.array(src_pts, dtype=np.float64), np.array(dst_pts, dtype=np.float64) x, y = src[:, 0], src[:, 1] u, v = dst[:, 0], dst[:, 1] A = np.zeros((8, 9), dtype=np.float64) A[0::2, 0], A[0::2, 1], A[0::2, 2] = -x, -y, -1 A[0::2, 6], A[0::2, 7], A[0::2, 8] = x * u, y * u, u A[1::2, 3], A[1::2, 4], A[1::2, 5] = -x, -y, -1 A[1::2, 6], A[1::2, 7], A[1::2, 8] = x * v, y * v, v _, _, Vt = np.linalg.svd(A) h = Vt[-1, :] return (h.reshape(3, 3) / h[8]).astype(np.float64) def _fkp_three_point_affine(P: Any, Q: Any) -> ndarray: P, Q = np.array(P, dtype=np.float64), np.array(Q, dtype=np.float64) x, y = P[:, 0], P[:, 1] u, v = Q[:, 0], Q[:, 1] n = P.shape[0] A = np.zeros((2 * n, 6), dtype=np.float64) A[0::2, 0], A[0::2, 1], A[0::2, 2] = x, y, 1 A[1::2, 3], A[1::2, 4], A[1::2, 5] = x, y, 1 b = np.empty(2 * n, dtype=np.float64) b[0::2], b[1::2] = u, v params, _, _, _ = np.linalg.lstsq(A, b, rcond=None) a, b_, e, c, d, f = params return np.array([[a, b_, e], [c, d, f], [0, 0, 1]], dtype=np.float64) def _fkp_line_to_line_transform(P1: Any, P2: Any, Q1: Any, Q2: Any) -> ndarray: P1, P2 = np.asarray(P1, dtype=np.float64), np.asarray(P2, dtype=np.float64) Q1, Q2 = np.asarray(Q1, dtype=np.float64), np.asarray(Q2, dtype=np.float64) v_s, v_t = P2 - P1, Q2 - Q1 s = np.hypot(v_t[0], v_t[1]) / (np.hypot(v_s[0], v_s[1]) + 1e-12) theta = np.arctan2(v_t[1], v_t[0]) - np.arctan2(v_s[1], v_s[0]) c, s_ = np.cos(theta), np.sin(theta) return np.array([ [s * c, -s * s_, Q1[0] - (s * c * P1[0] - s * s_ * P1[1])], [s * s_, s * c, Q1[1] - (s * s_ * P1[0] + s * c * P1[1])], [0, 0, 1] ], dtype=np.float64) def _fkp_robust_transform(src_pts: Any, dst_pts: Any) -> Any: src, dst = _fkp_unique_points(src_pts, dst_pts) n = len(src) if n >= 4: if _fkp_are_collinear(src) or _fkp_are_collinear(dst): H = _fkp_affine_from_4_points(src, dst) return lambda pt: _fkp_apply_transform(H, pt) H = _fkp_four_point_homography(src, dst) return lambda pt: _fkp_apply_homo_transform(H, pt) elif n == 3: H = _fkp_three_point_affine(src, dst) return lambda pt: _fkp_apply_transform(H, pt) elif n == 2: H = _fkp_line_to_line_transform(src[0], src[1], dst[0], dst[1]) return lambda pt: _fkp_apply_transform(H, pt) elif n == 1: H = np.eye(3) H[:2, 2] = dst[0] - src[0] return lambda pt: _fkp_apply_transform(H, pt) return lambda pt: _fkp_apply_transform(np.eye(3), pt) def _fkp_pick_pt(points: Any) -> List[int]: if not points: return [] pts_arr = np.asarray(points, dtype=np.int32) seen = np.zeros(32, dtype=bool) valid_mask = (pts_arr >= 0) & (pts_arr < 32) seen[pts_arr[valid_mask]] = True out_seen = np.zeros(32, dtype=bool) out: List[int] = [] for p in pts_arr[valid_mask]: neigh = _FKP_GROUPS_ARRAY[p] candidates = neigh[~seen[neigh] & ~out_seen[neigh]] out_seen[candidates] = True out.extend(candidates.tolist()) return out def _fkp_is_include(kp: Any, all_kps: Any) -> bool: for kps in all_kps: if np.sum(np.abs(np.array(kps) - np.array(kp))) <= 2: return True return False def _fkp_get_edge_mask(x: float, y: float, W: int, H: int, t: int = 100) -> int: mask = 0 if x <= t: mask |= 1 if x >= W - t: mask |= 2 if y <= t: mask |= 4 if y >= H - t: mask |= 8 return mask def _fkp_both_points_same_direction_fast(A: Any, B: Any, W: int, H: int, t: int = 100) -> bool: mask_a = _fkp_get_edge_mask(A[0], A[1], W, H, t) if mask_a == 0: return False mask_b = _fkp_get_edge_mask(B[0], B[1], W, H, t) return (mask_a & mask_b) != 0 def _fkp_project_image(image: ndarray, src_kps: Any, dst_kps: Any, w: int, h: int) -> ndarray: src_arr = np.array(src_kps, dtype=np.float32) dst_arr = np.array(dst_kps, dtype=np.float32) valid_mask = ~((dst_arr[:, 0] == 0) & (dst_arr[:, 1] == 0)) H, _ = cv2.findHomography(src_arr[valid_mask], dst_arr[valid_mask]) if H is None: raise ValueError("Homography not found") return cv2.warpPerspective(image, H, (w, h)) def _fkp_extract_masks(image: ndarray) -> tuple: gray = image if image.ndim == 2 else cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) _, mask_ground = cv2.threshold(gray, 10, 1, cv2.THRESH_BINARY) _, mask_lines = cv2.threshold(gray, 200, 1, cv2.THRESH_BINARY) return mask_ground, mask_lines def _fkp_convert_to_gray(image: ndarray) -> ndarray: gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) gray = cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, _FKP_KERNEL_31) gray = cv2.GaussianBlur(gray, (5, 5), 0) return cv2.Canny(gray, 30, 100) def _fkp_evaluate_keypoints_for_frame( frame_keypoints: Any, frame_index: int, h: int, w: int, check_frame_list: List[ndarray], precomputed_key: Any = None ) -> float: key = precomputed_key or _fkp_canonical((frame_keypoints, w, h)) floor_markings = _fkp_load_template_gray() def compute_masks(fkp: Any, ww: int, hh: int) -> Any: try: non_idxs_set = {i + 1 for i, kpt in enumerate(fkp) if kpt[0] != 0 or kpt[1] != 0} for bl_set, idx0, idx1 in _FKP_PREPARED_BLACKLISTS: if non_idxs_set.issubset(bl_set): if _fkp_both_points_same_direction_fast(fkp[idx0], fkp[idx1], ww, hh): return None, 0, None warped = _fkp_project_image(floor_markings, _FKP_KEYPOINTS, fkp, ww, hh) mask_ground, mask_lines = _fkp_extract_masks(warped) ys, xs = np.where(mask_lines == 1) if len(xs) == 0: bbox = None else: bbox = (xs.min(), ys.min(), xs.max(), ys.max()) bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) if bbox else 1 if (bbox_area / (hh * ww)) < 0.2: return None, 0, None return mask_lines, int(cv2.countNonZero(mask_lines)), mask_ground except Exception: return None, 0, None try: mask_exp, pixels_on_lines, mask_ground = _fkp_get_or_compute_masks( key, lambda: compute_masks(frame_keypoints, w, h) ) if mask_exp is None or pixels_on_lines == 0 or mask_ground is None: return 0.0 if frame_index >= len(check_frame_list): return 0.0 scale = max(1, _FKP_EVAL_DOWNSCALE) if scale > 1 and h > scale and w > scale: h_s, w_s = h // scale, w // scale frame_s = cv2.resize(check_frame_list[frame_index], (w_s, h_s), interpolation=cv2.INTER_AREA) mask_ground_s = cv2.resize(mask_ground, (w_s, h_s), interpolation=cv2.INTER_NEAREST) mask_exp_s = cv2.resize(mask_exp, (w_s, h_s), interpolation=cv2.INTER_NEAREST) pixels_on_lines = cv2.countNonZero(mask_exp_s) if pixels_on_lines == 0: return 0.0 work_buffer = np.zeros((h_s, w_s), dtype=np.uint8) cv2.bitwise_and(frame_s, frame_s, dst=work_buffer, mask=mask_ground_s) cv2.dilate(work_buffer, _FKP_DILATE_KERNEL, dst=work_buffer, iterations=2) cv2.threshold(work_buffer, 0, 255, cv2.THRESH_BINARY, dst=work_buffer) pixels_predicted = cv2.countNonZero(work_buffer) cv2.bitwise_and(work_buffer, mask_exp_s, dst=work_buffer) pixels_overlapping = cv2.countNonZero(work_buffer) else: work_buffer = np.zeros((h, w), dtype=np.uint8) cv2.bitwise_and(check_frame_list[frame_index], check_frame_list[frame_index], dst=work_buffer, mask=mask_ground) cv2.dilate(work_buffer, _FKP_DILATE_KERNEL, dst=work_buffer, iterations=3) cv2.threshold(work_buffer, 0, 255, cv2.THRESH_BINARY, dst=work_buffer) pixels_predicted = cv2.countNonZero(work_buffer) cv2.bitwise_and(work_buffer, mask_exp, dst=work_buffer) pixels_overlapping = cv2.countNonZero(work_buffer) pixels_rest = pixels_predicted - pixels_overlapping total_pixels = pixels_predicted + pixels_on_lines - pixels_overlapping if total_pixels > 0 and (pixels_rest / total_pixels) > 0.9: return 0.0 return pixels_overlapping / (pixels_on_lines + 1e-8) except Exception: pass return 0.0 def _fkp_make_possible_keypoints(all_keypoints: Any, frame_width: int, frame_height: int, limit: int | None = None) -> List[Any]: if not all_keypoints: return [] max_candidates = limit if limit is not None else _FKP_MAX_CANDIDATES_PER_FRAME results: List[Any] = [] for keypoints in all_keypoints: if len(results) >= max_candidates: break kps = _keypoints_to_int(keypoints) arr = np.asarray(kps, dtype=np.int32) if arr.ndim != 2 or arr.shape[1] != 2: continue mask = (arr[:, 0] != 0) & (arr[:, 1] != 0) non_zero_count = int(mask.sum()) if non_zero_count > 4: if not _fkp_is_include(kps, results): results.append(kps) continue if non_zero_count < 2: continue # Only use actually detected keypoints; do not add projected/inferred points if not _fkp_is_include(kps, results): results.append(kps) return results def _fkp_get_executor(max_workers: int) -> ThreadPoolExecutor: global _FKP_SHARED_EXECUTOR if _FKP_SHARED_EXECUTOR is None: _FKP_SHARED_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers) return _FKP_SHARED_EXECUTOR def _fkp_evaluates( jobs: Any, h: int, w: int, total_frames: int, time_left: float, check_frame_list: List[ndarray] ) -> List[Any]: start = time.time() results = [[(0, 0)] * 32 for _ in range(total_frames)] if len(jobs) == 0: return results unique_jobs: List[Any] = [] seen: set = set() for (job, frame_index) in jobs: try: key_bytes = np.asarray(job, dtype=np.int32).tobytes() if not isinstance(job, np.ndarray) else (job.astype(np.int32).tobytes() if job.dtype != np.int32 else job.tobytes()) sig = (frame_index, key_bytes) if sig in seen: continue seen.add(sig) unique_jobs.append((job, frame_index, key_bytes)) except Exception: continue if len(unique_jobs) <= 10: scores_unique = [ _fkp_evaluate_keypoints_for_frame(job, frame_index, h, w, check_frame_list, (key_bytes, w, h)) for (job, frame_index, key_bytes) in unique_jobs ] else: cpu_count = max(1, (os.cpu_count() or 1)) max_workers = min(max(2, cpu_count), 8) chunk_size = 24 scores_unique = [] ex = _fkp_get_executor(max_workers) time_left -= (time.time() - start) for i in range(0, len(unique_jobs), chunk_size): start = time.time() chunk = unique_jobs[i : min(i + chunk_size, len(unique_jobs))] scores_unique.extend(ex.map( lambda pair: _fkp_evaluate_keypoints_for_frame(pair[0], pair[1], h, w, check_frame_list, (pair[2], w, h)), chunk, )) time_left -= (time.time() - start) if time_left <= 0: unique_jobs = unique_jobs[: min(i + chunk_size, len(unique_jobs))] break scores = np.full(total_frames, 0.0, dtype=np.float32) for score, (k, frame_index, _) in zip(scores_unique, unique_jobs): if score > scores[frame_index]: scores[frame_index] = score results[frame_index] = k return results def _fkp_normalize_results(frame_results: Any, threshold: float) -> List[Any]: if not frame_results: return [] results_array: List[Any] = [] for result in frame_results: pad_len = 32 - len(result) if pad_len > 0: result = list(result) + [(0, 0, 0.0)] * pad_len result = result[:32] arr = np.array(result, dtype=np.float32) if arr.size == 0: results_array.append([(0, 0)] * 32) continue if arr.ndim == 2 and arr.shape[1] >= 3: mask = arr[:, 2] > threshold scaled = np.where(mask[:, None], arr[:, :2].copy(), 0) results_array.append([(int(x), int(y)) for x, y in scaled]) else: results_array.append([(0, 0)] * 32) return results_array def fix_keypoints_pri( results_frames: Any, frame_width: int, frame_height: int, time_left: float, check_frame_list: List[ndarray] ) -> List[Any]: start = time.time() max_frames = len(results_frames) all_possible = [None] * max_frames for i in range(max_frames): all_possible[i] = _fkp_make_possible_keypoints(results_frames[i], frame_width, frame_height) default_kps: List[Any] = [] for i in range(len(all_possible)): default_kps.append(all_possible[i][0] if all_possible[i] else [(0, 0)] * 32) total_jobs: List[Any] = [] is_end = [0] * len(all_possible) while is_end.count(-1) != len(is_end): for frame_index in range(max_frames): if is_end[frame_index] == -1: continue if is_end[frame_index] == len(all_possible[frame_index]): is_end[frame_index] = -1 continue total_jobs.append((all_possible[frame_index][is_end[frame_index]], frame_index)) is_end[frame_index] += 1 time_left -= (time.time() - start) if time_left <= 0: return default_kps return _fkp_evaluates(total_jobs, frame_height, frame_width, max_frames, time_left, check_frame_list) def _step8_one_frame_kp( kps: list, frame_width: int, frame_height: int, fill_missing: bool, n_keypoints: int = 32, ) -> Optional[List[List[float]]]: """Step8 (example_miner _z1): homography from template to frame, project all template points, optionally fill missing.""" if not isinstance(kps, list) or len(kps) != n_keypoints or frame_width <= 0 or frame_height <= 0: return None if n_keypoints != 32 or len(TEMPLATE_F0) != 32 or len(TEMPLATE_F1) != 32: return None filtered_src: List[Tuple[float, float]] = [] filtered_dst: List[Tuple[float, float]] = [] valid_indices: List[int] = [] for idx, kp in enumerate(kps): if not isinstance(kp, (list, tuple)) or len(kp) < 2: continue try: x, y = float(kp[0]), float(kp[1]) except (TypeError, ValueError): continue if x == 0.0 and y == 0.0: continue if idx >= len(TEMPLATE_F1): continue filtered_src.append(TEMPLATE_F1[idx]) filtered_dst.append((x, y)) valid_indices.append(idx) if len(filtered_src) < 4: return None src_np = np.array(filtered_src, dtype=np.float32) dst_np = np.array(filtered_dst, dtype=np.float32) H_corrected, _ = cv2.findHomography(src_np, dst_np) if H_corrected is None: return None fk_np = np.array(TEMPLATE_F0, dtype=np.float32).reshape(1, -1, 2) projected_np = cv2.perspectiveTransform(fk_np, H_corrected)[0] valid_indices_set = set(valid_indices) adjusted_kps: List[List[float]] = [[0.0, 0.0] for _ in range(n_keypoints)] for idx in range(n_keypoints): x, y = float(projected_np[idx][0]), float(projected_np[idx][1]) if not (0 <= x < frame_width and 0 <= y < frame_height): continue if fill_missing or idx in valid_indices_set: adjusted_kps[idx] = [x, y] return adjusted_kps def _apply_homography_refinement( keypoints: List[List[float]], frame: np.ndarray, n_keypoints: int, ) -> List[List[float]]: """Refine keypoints using homography from template to frame (new-5 style).""" if n_keypoints != 32 or len(TEMPLATE_F0) != 32 or len(TEMPLATE_F1) != 32: return keypoints frame_height, frame_width = frame.shape[:2] valid_src: List[Tuple[float, float]] = [] valid_dst: List[Tuple[float, float]] = [] valid_indices: List[int] = [] for kp_idx, kp in enumerate(keypoints): if kp and len(kp) >= 2: x, y = float(kp[0]), float(kp[1]) if not (abs(x) < 1e-6 and abs(y) < 1e-6) and 0 <= x < frame_width and 0 <= y < frame_height: valid_src.append(TEMPLATE_F1[kp_idx]) valid_dst.append((x, y)) valid_indices.append(kp_idx) if len(valid_src) < 4: return keypoints src_pts = np.array(valid_src, dtype=np.float32) dst_pts = np.array(valid_dst, dtype=np.float32) H, _ = cv2.findHomography(src_pts, dst_pts) if H is None: return keypoints all_template_points = np.array(TEMPLATE_F0, dtype=np.float32).reshape(-1, 1, 2) adjusted_points = cv2.perspectiveTransform(all_template_points, H) adjusted_points = adjusted_points.reshape(-1, 2) adj_x = adjusted_points[:32, 0] adj_y = adjusted_points[:32, 1] valid_mask = (adj_x >= 0) & (adj_y >= 0) & (adj_x < frame_width) & (adj_y < frame_height) valid_indices_set = set(valid_indices) adjusted_kps: List[List[float]] = [[0.0, 0.0] for _ in range(32)] for i in np.where(valid_mask)[0]: if not HOMOGRAPHY_FILL_ONLY_VALID or i in valid_indices_set: adjusted_kps[i] = [float(adj_x[i]), float(adj_y[i])] return adjusted_kps def _c1(keypoints: list) -> list: return [[round(float(x), 1), round(float(y), 1)] for x, y in keypoints] def _l0(model_dir: Path, device: str | None = None, config_name: str = "hrnetv2_w48.yaml", weights_subdir: str | None = None) -> nn.Module: if device is None: device = "cuda" if torch.cuda.is_available() else "cpu" config_path = model_dir / config_name weights_path = (model_dir / weights_subdir / "keypoint") if weights_subdir else (model_dir / "keypoint") if not config_path.exists(): raise FileNotFoundError(f"Keypoint config not found: {config_path}") if not weights_path.exists(): raise FileNotFoundError(f"Keypoint weights not found: {weights_path}") with open(config_path) as f: cfg = yaml.safe_load(f) loaded = torch.load(weights_path, map_location=device, weights_only=False) state = loaded.get("state_dict", loaded) if isinstance(loaded, dict) else loaded if not isinstance(state, dict): raise ValueError(f"Keypoint weights must be state_dict or dict with 'state_dict'; got {type(state)}") if state and next(iter(state.keys()), "").startswith("module."): state = {k.replace("module.", "", 1): v for k, v in state.items()} def _remap_head(k: str) -> str: if k.startswith("head.0."): return "head." + k[7:] return k state = {_remap_head(k): v for k, v in state.items()} model = _g0(cfg) model.load_state_dict(state, strict=True) model.to(device) model.eval() return model _C0 = 0 _C1 = 1 _C2 = 2 _C3 = 3 _CLS_TO_VALIDATOR: dict[int, int] = {_C2: 0, _C3: 1, _C1: 2, _C0: 3} _B0: float = 0.25 _B1: bool = True _B2: bool = False _B3: bool = False _B4: bool = False _B5: bool = True _D0 = 640 _D0_PERSON = 640 _TRACK_IOU_THRESH = 0.3 _TRACK_IOU_HIGH = 0.4 _TRACK_IOU_LOW = 0.2 _TRACK_MAX_AGE = 3 _TRACK_USE_VELOCITY = True _D1 = 0.3 _T0 = 0.5 _R0 = 5 _R1 = 0.10 _R2 = 0.70 _q0 = 0.0 _q1 = 0.0 _P0 = True _E0: bool = True _E1: bool = True _BX_BS: bool = 16 _KP_BS: int = 16 _A0: bool = False _S0 = 8 _G0: bool = True _G1 = 5 _G2 = 4 _G3 = 3 _G6: bool = False _G7: bool = True _G5: bool = True _G8: bool = True ENABLE_KEYPOINT_CONVERT: bool = False _U0 = ENABLE_KEYPOINT_CONVERT _J0 = True _J1 = True _J2: list[float] = [0.3, 0.5] _J3: int = 20 _J4 = True _J5: float = 50.0 _J6: int = 2 _W0: list[int] = [4, 9, 10, 11, 12, 17, 18, 19, 20, 28] _W1: list[int] = [13, 14, 15] _W2: list[int] = [5, 16, 29] _W3: list[int] = [4, 9, 10, 11, 12, 17, 18, 19, 20, 28] _W4: list[int] = [13, 14, 15] _W5: list[int] = [5, 16, 29] _KP16_WEIGHT: int = 8 _INDICES_H3_VS_H1: set[int] = {5, 13, 14, 15, 16, 29} _INDICES_H3_VS_H2: set[int] = {4, 9, 10, 11, 12, 17, 18, 19, 20, 28} _ALWAYS_INCLUDE_INDICES: tuple[int, ...] = (5, 16, 29) _MASK_RETRY_ERRORS: tuple[str, ...] = ("A projected line is too wide", "Projected ground should not be rectangular") # Keypoint refinement speed/quality _FKP_FAST_MODE: bool = True _FKP_THRESHOLDS: tuple[float, ...] = (0.2, 0.4, 0.6, 0.8) _FKP_SINGLE_THRESHOLD: float = 0.4 _FKP_MAX_CANDIDATES_PER_FRAME: int = 2 _FKP_TIME_BUDGET_SEC: float = 2.5 _FKP_EVAL_DOWNSCALE: int = 2 _Z8_MIN_BATCH_FRAMES: int = 6 _Z8_MAX_PROBLEMATIC_PER_BATCH: int = 8 _STEP0_ENABLED: bool = True _STEP0_PROXIMITY_PX: float = 30.0 _STEP5_2_RIGHT_QUAD_HALFLENGTH: float = 200.0 _STEP5_2_8PX_COARSE_STEP: int = 10 _STEP5_2_8PX_REFINE_WINDOW: int = 10 _STEP5_2_ROI_MARGIN: int = 10 _STEP5_2_LONGEST_SEGMENT_MAX_PTS: int = 28 _STEP5_2_8PX_HALFRES: bool = True _STEP5_2_8PX_REFINE_PASS: bool = True _STEP5_2_HEAVY_SEARCH_FLAG: bool = True _F0: list[tuple[float, float]] = [ (5, 5), (5, 140), (5, 250), (5, 430), (5, 540), (5, 675), (55, 250), (55, 430), (110, 340), (165, 140), (165, 270), (165, 410), (165, 540), (527, 5), (527, 253), (527, 433), (527, 675), (888, 140), (888, 270), (888, 410), (888, 540), (940, 340), (998, 250), (998, 430), (1045, 5), (1045, 140), (1045, 250), (1045, 430), (1045, 540), (1045, 675), (435, 340), (615, 340), ] _F1: list[tuple[float, float]] = [ (2.5, 2.5), (2.5, 139.5), (2.5, 249.5), (2.5, 430.5), (2.5, 540.5), (2.5, 678.0), (54.5, 249.5), (54.5, 430.5), (110.5, 340.5), (164.5, 139.5), (164.5, 269.0), (164.5, 411.0), (164.5, 540.5), (525.0, 2.5), (525.0, 249.5), (525.0, 430.5), (525.0, 678.0), (886.5, 139.5), (886.5, 269.0), (886.5, 411.0), (886.5, 540.5), (940.5, 340.5), (998.0, 249.5), (998.0, 430.5), (1048.0, 2.5), (1048.0, 139.5), (1048.0, 249.5), (1048.0, 430.5), (1048.0, 540.5), (1048.0, 678.0), (434.5, 340.0), (615.5, 340.0), ] _I0 = 5 _I1 = 29 _I2 = 0 _I3 = 24 _N0 = len(_F0) def _step0_remove_close_keypoints(kps: list[list[float]], proximity_px: float = 30.0) -> int: n = len(kps) if n == 0: return 0 def _valid(i: int) -> bool: if i >= n or not isinstance(kps[i], (list, tuple)) or len(kps[i]) < 2: return False x, y = float(kps[i][0]), float(kps[i][1]) return not (x == 0.0 and y == 0.0) valid_indices = [i for i in range(n) if _valid(i)] if len(valid_indices) < 2: return 0 to_remove: set[int] = set() for ii in range(len(valid_indices)): a = valid_indices[ii] ax, ay = float(kps[a][0]), float(kps[a][1]) for jj in range(ii + 1, len(valid_indices)): b = valid_indices[jj] bx, by = float(kps[b][0]), float(kps[b][1]) if math.hypot(ax - bx, ay - by) <= proximity_px: to_remove.add(a) to_remove.add(b) for idx in to_remove: kps[idx] = [0.0, 0.0] return len(to_remove) class _Xe(Exception): pass def _y0() -> ndarray: template_path = Path(__file__).parent / "football_pitch_template.png" img = cv2.imread(str(template_path), cv2.IMREAD_COLOR) if img is None: return np.zeros((720, 1280, 3), dtype=np.uint8) return img def _y1(mask: ndarray) -> bool: contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) for cnt in contours: _, _, w, h = cv2.boundingRect(cnt) if w == 0 or h == 0: continue if min(w, h) / max(w, h) >= 1.0: return True return False def _y2(ground_mask: ndarray, line_mask: ndarray) -> None: if ground_mask.sum() == 0: raise _Xe("No projected ground (empty mask)") pts = cv2.findNonZero(ground_mask) if pts is None: raise _Xe("No projected ground (empty mask)") _, _, w, h = cv2.boundingRect(pts) if cv2.countNonZero(ground_mask) == w * h: raise _Xe("Projected ground should not be rectangular") n_labels, _ = cv2.connectedComponents(ground_mask) if n_labels - 1 > 1: raise _Xe("Projected ground should be a single object") if ground_mask.sum() / ground_mask.size >= 0.9: raise _Xe("Projected ground covers too much of the image") if line_mask.sum() == 0: raise _Xe("No projected lines") if line_mask.sum() == line_mask.size: raise _Xe("Projected lines cover the entire image") if _y1(line_mask): raise _Xe("A projected line is too wide") def _y3(pts: ndarray) -> bool: def _ccw(a: tuple, b: tuple, c: tuple) -> bool: return (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0]) def _intersect(p1: tuple, p2: tuple, q1: tuple, q2: tuple) -> bool: return (_ccw(p1, q1, q2) != _ccw(p2, q1, q2)) and (_ccw(p1, p2, q1) != _ccw(p1, p2, q2)) p = pts.reshape(-1, 2) if len(p) < 4: return False edges = [(p[0], p[1]), (p[1], p[2]), (p[2], p[3]), (p[3], p[0])] return _intersect(*edges[0], *edges[2]) or _intersect(*edges[1], *edges[3]) def _y4( template: ndarray, src_kps: list[tuple[float, float]], dst_kps: list[tuple[float, float]], frame_width: int, frame_height: int, ) -> ndarray: src = np.array(src_kps, dtype=np.float32) dst = np.array(dst_kps, dtype=np.float32) H, _ = cv2.findHomography(src, dst) if H is None: raise ValueError("Homography computation failed") warped = cv2.warpPerspective(template, H, (frame_width, frame_height)) corner_indices = [_I0, _I1, _I3, _I2] if len(src_kps) > max(corner_indices): src_corners = np.array( [[src_kps[i][0], src_kps[i][1]] for i in corner_indices], dtype=np.float32, ).reshape(1, 4, 2) proj_corners = cv2.perspectiveTransform(src_corners, H)[0] if _y3(proj_corners): raise _Xe("Projection twisted!") return warped def _y5(warped: ndarray) -> tuple[ndarray, ndarray]: gray = cv2.cvtColor(warped, cv2.COLOR_BGR2GRAY) _, m_ground = cv2.threshold(gray, 10, 255, cv2.THRESH_BINARY) _, m_lines = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY) ground_bin = (m_ground > 0).astype(np.uint8) lines_bin = (m_lines > 0).astype(np.uint8) _y2(ground_bin, lines_bin) return ground_bin, lines_bin def _y6(frame: ndarray, ground_mask: ndarray) -> ndarray: gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (31, 31)) gray = cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, kernel) gray = cv2.GaussianBlur(gray, (5, 5), 0) edges = cv2.Canny(gray, 30, 100) edges_on_ground = cv2.bitwise_and(edges, edges, mask=ground_mask) dilate_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) edges_on_ground = cv2.dilate(edges_on_ground, dilate_kernel, iterations=3) return (edges_on_ground > 0).astype(np.uint8) def _fit_line_to_points(points: list[tuple[float, float]]) -> tuple[float, float, float] | None: if len(points) < 2: return None pts = np.array(points, dtype=np.float64) x = pts[:, 0] y = pts[:, 1] mx, my = float(x.mean()), float(y.mean()) u = x - mx v = y - my n = len(pts) cxx = (u * u).sum() / n cxy = (u * v).sum() / n cyy = (v * v).sum() / n trace = cxx + cyy diff = cxx - cyy lambda_small = (trace - np.sqrt(diff * diff + 4.0 * cxy * cxy)) * 0.5 a = float(cxy) b = float(lambda_small - cxx) norm = np.sqrt(a * a + b * b) if norm < 1e-12: a, b = 1.0, 0.0 else: a, b = a / norm, b / norm c = -(a * mx + b * my) return (a, b, c) def _line_intersection( a1: float, b1: float, c1: float, a2: float, b2: float, c2: float, ) -> tuple[float, float] | None: det = a1 * b2 - a2 * b1 if abs(det) < 1e-12: return None x = (b1 * c2 - b2 * c1) / det y = (a2 * c1 - a1 * c2) / det return (float(x), float(y)) def _line_through_two_points(x1: float, y1: float, x2: float, y2: float) -> tuple[float, float, float]: a = y2 - y1 b = -(x2 - x1) c = (x2 - x1) * y1 - (y2 - y1) * x1 return (a, b, c) def _frame_line_edges(frame: ndarray) -> ndarray: gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (31, 31)) gray = cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, kernel) gray = cv2.GaussianBlur(gray, (5, 5), 0) return cv2.Canny(gray, 30, 100) def _dilate_uint8_full_frame(frame: ndarray) -> ndarray: edges = _frame_line_edges(frame) dilate_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) dilated = cv2.dilate(edges, dilate_kernel, iterations=3) return ((dilated > 0).astype(np.uint8)) * 255 def _clip_segment_to_rect( x1: float, y1: float, x2: float, y2: float, w: int, h: int, ) -> tuple[tuple[float, float], tuple[float, float]] | None: dx, dy = x2 - x1, y2 - y1 pts: list[tuple[float, float]] = [] if 0 <= x1 <= w and 0 <= y1 <= h: pts.append((x1, y1)) if 0 <= x2 <= w and 0 <= y2 <= h: pts.append((x2, y2)) if abs(dx) >= 1e-12: for x_edge in (0.0, float(w - 1)): t = (x_edge - x1) / dx if 0 <= t <= 1: y = y1 + t * dy if 0 <= y <= h - 1: pts.append((x_edge, y)) if abs(dy) >= 1e-12: for y_edge in (0.0, float(h - 1)): t = (y_edge - y1) / dy if 0 <= t <= 1: x = x1 + t * dx if 0 <= x <= w - 1: pts.append((x, y_edge)) if len(pts) < 2: if len(pts) == 1: return (pts[0], pts[0]) return None pts_sorted = sorted(pts, key=lambda p: p[0]) return (pts_sorted[0], pts_sorted[-1]) def _segment_fully_inside_mask( p1: tuple[int, int], p2: tuple[int, int], mask: ndarray, ) -> bool: h, w = mask.shape[:2] x1, y1 = p1[0], p1[1] x2, y2 = p2[0], p2[1] n = max(abs(x2 - x1), abs(y2 - y1), 1) for k in range(n + 1): t = k / n x = int(round(x1 + t * (x2 - x1))) y = int(round(y1 + t * (y2 - y1))) if x < 0 or x >= w or y < 0 or y >= h: return False if mask[y, x] == 0: return False return True def _longest_segment_fully_inside_mask( mask: ndarray, contour_points: ndarray, ) -> tuple[tuple[int, int], tuple[int, int]] | None: pts = contour_points.reshape(-1, 2) n_pts = len(pts) if n_pts < 2: return None best_len_sq = -1.0 best_p1, best_p2 = None, None for i in range(n_pts): for j in range(i + 1, n_pts): p1 = (int(pts[i][0]), int(pts[i][1])) p2 = (int(pts[j][0]), int(pts[j][1])) if not _segment_fully_inside_mask(p1, p2, mask): continue d_sq = (pts[i][0] - pts[j][0]) ** 2 + (pts[i][1] - pts[j][1]) ** 2 if d_sq > best_len_sq: best_len_sq = d_sq best_p1, best_p2 = p1, p2 if best_p1 is not None and best_p2 is not None: return (best_p1, best_p2) return None def _line_segment_for_drawing( a: float, b: float, c: float, w: int, h: int, ) -> tuple[tuple[float, float], tuple[float, float]] | None: pts: list[tuple[float, float]] = [] if abs(b) >= 1e-12: for x in (0.0, float(w - 1)): y = -(a * x + c) / b if -50 <= y <= h + 50: pts.append((x, y)) if abs(a) >= 1e-12: for y in (0.0, float(h - 1)): x = -(b * y + c) / a if -50 <= x <= w + 50: pts.append((x, y)) if len(pts) < 2: return None seen: set[tuple[float, float]] = set() unique = [] for p in pts: key = (round(p[0], 2), round(p[1], 2)) if key not in seen: seen.add(key) unique.append(p) if len(unique) < 2: return None unique.sort(key=lambda p: (p[0], p[1])) return (unique[0], unique[-1]) def _y7() -> dict[int, int]: return {i: 2 for i in _W0} def _y8() -> dict[int, int]: m: dict[int, int] = {} for i in _W1: m[i] = 3 for i in _W2: m[i] = 4 m[16] = _KP16_WEIGHT return m def _y9() -> dict[int, int]: m: dict[int, int] = {} for i in _W3: m[i] = 2 for i in _W4: m[i] = 3 for i in _W5: m[i] = 4 m[16] = _KP16_WEIGHT return m def _y10( valid_indices: list[int], valid_src: list[tuple[float, float]], valid_dst: list[tuple[float, float]], weight_by_index: dict[int, int], ) -> ndarray | None: src_list: list[tuple[float, float]] = [] dst_list: list[tuple[float, float]] = [] for idx, (s, d) in zip(valid_indices, zip(valid_src, valid_dst)): w = max(1, weight_by_index.get(idx, 1)) for _ in range(w): src_list.append(s) dst_list.append(d) if len(src_list) < 4: return None src_np = np.array(src_list, dtype=np.float32) dst_np = np.array(dst_list, dtype=np.float32) H, _ = cv2.findHomography(src_np, dst_np) return H def _y11( H: ndarray, template_image: ndarray, video_frame: ndarray, valid_indices: list[int] | None = None, valid_src: list[tuple[float, float]] | None = None, valid_dst: list[tuple[float, float]] | None = None, weight_map: dict[int, int] | None = None, ) -> tuple[float, ndarray | None, list[tuple[float, float]] | None]: h, w = video_frame.shape[:2] def _score_from_warped(warped: ndarray) -> float: ground_mask, line_mask = _y5(warped) predicted_mask = _y6(video_frame, ground_mask) overlap = cv2.bitwise_and(line_mask, predicted_mask) pixels_on_lines = int(line_mask.sum()) pixels_overlap = int(overlap.sum()) return float(pixels_overlap) / float(pixels_on_lines + 1e-8) try: warped = cv2.warpPerspective(template_image, H, (w, h)) score = _score_from_warped(warped) return (score, H, None) except _Xe as e: err_msg = e.args[0] if e.args else "" if ( err_msg in _MASK_RETRY_ERRORS and valid_indices is not None and valid_src is not None and valid_dst is not None and weight_map is not None ): idx_smallest_y = min(range(len(valid_dst)), key=lambda i: valid_dst[i][1]) x0, y0 = valid_dst[idx_smallest_y] for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]: new_dst = list(valid_dst) new_dst[idx_smallest_y] = (x0 + dx, y0 + dy) H2 = _y10(valid_indices, valid_src, new_dst, weight_map) if H2 is None: continue try: warped2 = cv2.warpPerspective(template_image, H2, (w, h)) score = _score_from_warped(warped2) return (score, H2, new_dst) except _Xe: continue return (0.0, None, None) except Exception: return (0.0, None, None) def _is_kp_valid(kp: Any) -> bool: if not isinstance(kp, (list, tuple)) or len(kp) < 2: return False try: x, y = float(kp[0]), float(kp[1]) except (TypeError, ValueError): return False return not (x == 0.0 and y == 0.0) def _refine_kp5_kp16_kp29( kps: list[list[float]], H: ndarray, video_frame: ndarray, template_image: ndarray, *, precomputed_dilate_uint8: ndarray | None = None, precomputed_warped: ndarray | None = None, precomputed_ground_mask: ndarray | None = None, ) -> tuple[bool, str | None]: n_valid_5_16_29 = sum(1 for i in (5, 16, 29) if i < len(kps) and _is_kp_valid(kps[i])) if n_valid_5_16_29 >= 2: return (False, None) h, w = video_frame.shape[:2] kp16_valid_input = _is_kp_valid(kps[16]) if len(kps) > 16 else False left_set = [0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12] right_set = [18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] middle_set = [9, 13, 14, 15, 16, 17, 30, 31] decision: str | None = None if any(i < len(kps) and _is_kp_valid(kps[i]) for i in left_set): decision = "left" elif any(i < len(kps) and _is_kp_valid(kps[i]) for i in right_set): decision = "right" elif any(i < len(kps) and _is_kp_valid(kps[i]) for i in middle_set): decision = "middle" else: decision = "other" src_pts = np.array([_F1[i] for i in (5, 16, 29)], dtype=np.float32).reshape(1, 3, 2) projected = cv2.perspectiveTransform(src_pts, H)[0] for idx, i in enumerate((5, 16, 29)): if i < len(kps) and not _is_kp_valid(kps[i]): kps[i] = [float(projected[idx][0]), float(projected[idx][1])] tkp_5 = (float(kps[5][0]), float(kps[5][1])) tkp_16 = (float(kps[16][0]), float(kps[16][1])) tkp_29 = (float(kps[29][0]), float(kps[29][1])) clip = _clip_segment_to_rect(tkp_5[0], tkp_5[1], tkp_29[0], tkp_29[1], w, h) if clip is None: return (False, None) (ax, ay), (bx, by) = clip if decision == "right": clip_r = _clip_segment_to_rect(tkp_16[0], tkp_16[1], tkp_29[0], tkp_29[1], w, h) if clip_r is None: return (False, None) (Ax, Ay), (Bx, By) = clip_r valid_indices_52 = [] valid_src_52 = [] valid_dst_52 = [] for idx, kp in enumerate(kps): if not _is_kp_valid(kp): continue x, y = float(kp[0]), float(kp[1]) valid_indices_52.append(idx) valid_src_52.append(_F1[idx] if idx < len(_F1) else (0.0, 0.0)) valid_dst_52.append((x, y)) warped_r = precomputed_warped ground_mask_r = precomputed_ground_mask H_use_r = H if warped_r is None or ground_mask_r is None: try: warped_r = cv2.warpPerspective(template_image, H_use_r, (w, h)) ground_mask_r, _ = _y5(warped_r) except _Xe as e: err_msg = e.args[0] if e.args else "" if err_msg in _MASK_RETRY_ERRORS and len(valid_indices_52) >= 4 and len(valid_dst_52) >= 4: idx_smallest_y = min(range(len(valid_dst_52)), key=lambda i: valid_dst_52[i][1]) x0, y0 = valid_dst_52[idx_smallest_y] for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]: new_dst = list(valid_dst_52) new_dst[idx_smallest_y] = (x0 + dx, y0 + dy) H_retry = _y10(valid_indices_52, valid_src_52, new_dst, {}) if H_retry is None: continue try: warped_r = cv2.warpPerspective(template_image, H_retry, (w, h)) ground_mask_r, _ = _y5(warped_r) H_use_r = H_retry break except _Xe: continue if warped_r is None or ground_mask_r is None: return (False, None) except Exception: return (False, None) if warped_r is None or ground_mask_r is None: return (False, None) dilate_uint8_r = precomputed_dilate_uint8 if precomputed_dilate_uint8 is not None else _dilate_uint8_full_frame(video_frame) pts_right = [(float(kps[i][0]), float(kps[i][1])) for i in [24, 25, 26, 27, 28, 29] if i < len(kps) and _is_kp_valid(kps[i])] if len(pts_right) >= 2: line3 = _fit_line_to_points(pts_right) else: src_24_29 = np.array([[_F1[i] for i in [24, 25, 26, 27, 28, 29]]], dtype=np.float32) tkp_24_29 = cv2.perspectiveTransform(src_24_29, H_use_r)[0] pts_right = [(float(tkp_24_29[i][0]), float(tkp_24_29[i][1])) for i in range(6)] line3 = _fit_line_to_points(pts_right) if line3 is None: return (False, None) a3, b3, c3 = line3 norm_u = math.hypot(b3, -a3) if norm_u < 1e-12: return (False, None) ux, uy = b3 / norm_u, -a3 / norm_u d = _STEP5_2_RIGHT_QUAD_HALFLENGTH A1 = (Ax - d * ux, Ay - d * uy) A2 = (Ax + d * ux, Ay + d * uy) B1 = (Bx - d * ux, By - d * uy) B2 = (Bx + d * ux, By + d * uy) pts_poly = np.array([[A1[0], A1[1]], [A2[0], A2[1]], [B2[0], B2[1]], [B1[0], B1[1]]], dtype=np.int32) mask_poly = np.zeros((h, w), dtype=np.uint8) cv2.fillConvexPoly(mask_poly, pts_poly, 255) dilate_in_roi = cv2.bitwise_and(dilate_uint8_r, mask_poly) px = pts_poly[:, 0] py = pts_poly[:, 1] x_min = max(0, int(px.min()) - _STEP5_2_ROI_MARGIN) y_min = max(0, int(py.min()) - _STEP5_2_ROI_MARGIN) x_max = min(w, int(px.max()) + 1 + _STEP5_2_ROI_MARGIN) y_max = min(h, int(py.max()) + 1 + _STEP5_2_ROI_MARGIN) roi_w = x_max - x_min roi_h = y_max - y_min dilate_roi = dilate_in_roi[y_min:y_max, x_min:x_max] num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(dilate_roi, connectivity=8) best_label = 0 best_area = 0 for i in range(1, num_labels): area = stats[i, cv2.CC_STAT_AREA] if area > best_area: best_area = area best_label = i longest_mask_roi = ((labels == best_label).astype(np.uint8)) * 255 longest_mask = np.zeros((h, w), dtype=np.uint8) longest_mask[y_min:y_max, x_min:x_max] = longest_mask_roi p1, p2 = None, None A3, B3 = None, None contours, _ = cv2.findContours(longest_mask_roi, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) if contours: contour = max(contours, key=cv2.contourArea) pts_contour = contour.reshape(-1, 2) n_c = len(pts_contour) max_pts = _STEP5_2_LONGEST_SEGMENT_MAX_PTS if n_c > max_pts: step = max(1, n_c // max_pts) pts_subsample = pts_contour[np.arange(0, n_c, step)] else: pts_subsample = pts_contour if _STEP5_2_HEAVY_SEARCH_FLAG: result = _longest_segment_fully_inside_mask(longest_mask_roi, pts_subsample) if result is not None: p1_roi, p2_roi = result p1 = (p1_roi[0] + x_min, p1_roi[1] + y_min) p2 = (p2_roi[0] + x_min, p2_roi[1] + y_min) else: best_len_sq = -1.0 best_p1_roi, best_p2_roi = None, None for i in range(len(pts_subsample)): for j in range(i + 1, len(pts_subsample)): d_sq = (pts_subsample[i][0] - pts_subsample[j][0]) ** 2 + (pts_subsample[i][1] - pts_subsample[j][1]) ** 2 if d_sq > best_len_sq: best_len_sq = d_sq best_p1_roi = (int(pts_subsample[i][0]), int(pts_subsample[i][1])) best_p2_roi = (int(pts_subsample[j][0]), int(pts_subsample[j][1])) if best_p1_roi is not None and best_p2_roi is not None: p1 = (best_p1_roi[0] + x_min, best_p1_roi[1] + y_min) p2 = (best_p2_roi[0] + x_min, best_p2_roi[1] + y_min) if p1 is not None and p2 is not None: a_long, b_long, c_long = _line_through_two_points(float(p1[0]), float(p1[1]), float(p2[0]), float(p2[1])) a2, b2, c2 = _line_through_two_points(B1[0], B1[1], B2[0], B2[1]) B3 = _line_intersection(a_long, b_long, c_long, a2, b2, c2) seg_border = _line_segment_for_drawing(a_long, b_long, c_long, w, h) if seg_border is not None: A3 = seg_border[0] if A3 is not None and B3 is not None: c4 = -a3 * A3[0] - b3 * A3[1] A3x, A3y = A3[0], A3[1] B3x, B3y = B3[0], B3[1] A3x_roi = A3x - x_min A3y_roi = A3y - y_min B3x_roi = B3x - x_min B3y_roi = B3y - y_min if _STEP5_2_8PX_HALFRES and roi_w >= 4 and roi_h >= 4: dilate_8px = cv2.resize(dilate_roi, (roi_w // 2, roi_h // 2), interpolation=cv2.INTER_NEAREST) roi_w_8, roi_h_8 = roi_w // 2, roi_h // 2 scale_8, seg_width_8 = 0.5, 4 else: dilate_8px = dilate_roi roi_w_8, roi_h_8 = roi_w, roi_h scale_8, seg_width_8 = 1.0, 8 mask_8_roi = np.zeros((roi_h_8, roi_w_8), dtype=np.uint8) overlap_roi = np.empty((roi_h_8, roi_w_8), dtype=np.uint8) best_count_8 = -1 best_s, best_t = 0, 0 for s in range(-30, 31, _STEP5_2_8PX_COARSE_STEP): for t in range(-30, 31, _STEP5_2_8PX_COARSE_STEP): A4x_roi = A3x_roi + s * ux A4y_roi = A3y_roi + s * uy B4x_roi = B3x_roi + t * ux B4y_roi = B3y_roi + t * uy ax_d = int(round(A4x_roi * scale_8)) ay_d = int(round(A4y_roi * scale_8)) bx_d = int(round(B4x_roi * scale_8)) by_d = int(round(B4y_roi * scale_8)) mask_8_roi.fill(0) cv2.line(mask_8_roi, (ax_d, ay_d), (bx_d, by_d), 255, seg_width_8) cv2.bitwise_and(dilate_8px, mask_8_roi, overlap_roi) count = cv2.countNonZero(overlap_roi) if count > best_count_8: best_count_8 = count best_s, best_t = s, t if _STEP5_2_8PX_REFINE_PASS: s_lo = max(-30, best_s - _STEP5_2_8PX_REFINE_WINDOW) s_hi = min(31, best_s + _STEP5_2_8PX_REFINE_WINDOW + 1) t_lo = max(-30, best_t - _STEP5_2_8PX_REFINE_WINDOW) t_hi = min(31, best_t + _STEP5_2_8PX_REFINE_WINDOW + 1) for s in range(s_lo, s_hi, 5): for t in range(t_lo, t_hi, 5): A4x_roi = A3x_roi + s * ux A4y_roi = A3y_roi + s * uy B4x_roi = B3x_roi + t * ux B4y_roi = B3y_roi + t * uy ax_d = int(round(A4x_roi * scale_8)) ay_d = int(round(A4y_roi * scale_8)) bx_d = int(round(B4x_roi * scale_8)) by_d = int(round(B4y_roi * scale_8)) mask_8_roi.fill(0) cv2.line(mask_8_roi, (ax_d, ay_d), (bx_d, by_d), 255, seg_width_8) cv2.bitwise_and(dilate_8px, mask_8_roi, overlap_roi) count = cv2.countNonZero(overlap_roi) if count > best_count_8: best_count_8 = count best_s, best_t = s, t A4 = (A3x + best_s * ux, A3y + best_s * uy) B4 = (B3x + best_t * ux, B3y + best_t * uy) a_ab, b_ab, c_ab = _line_through_two_points(A4[0], A4[1], B4[0], B4[1]) kkp29 = _line_intersection(a_ab, b_ab, c_ab, a3, b3, c3) center_pts = [(float(kps[i][0]), float(kps[i][1])) for i in [13, 14, 15, 16] if i < len(kps) and _is_kp_valid(kps[i])] if len(center_pts) >= 2: line_13_16 = _fit_line_to_points(center_pts) else: src_13_16 = np.array([[_F1[i] for i in [13, 14, 15, 16]]], dtype=np.float32) tkp_13_16 = cv2.perspectiveTransform(src_13_16, H_use_r)[0] center_pts = [(float(tkp_13_16[i][0]), float(tkp_13_16[i][1])) for i in range(4)] line_13_16 = _fit_line_to_points(center_pts) kkp16 = _line_intersection(a_ab, b_ab, c_ab, line_13_16[0], line_13_16[1], line_13_16[2]) if line_13_16 is not None else None if kkp29 is not None: kps[29] = [float(kkp29[0]), float(kkp29[1])] if kkp16 is not None: kps[16] = [float(kkp16[0]), float(kkp16[1])] if kkp16 is not None and kkp16[0] > 0: pts_0_5_r = [(float(kps[i][0]), float(kps[i][1])) for i in [0, 1, 2, 3, 4, 5] if i < len(kps) and _is_kp_valid(kps[i])] if len(pts_0_5_r) >= 2: line_0_5_r = _fit_line_to_points(pts_0_5_r) else: src_0_5_r = np.array([[_F1[i] for i in [0, 1, 2, 3, 4, 5]]], dtype=np.float32) tkp_0_5_r = cv2.perspectiveTransform(src_0_5_r, H_use_r)[0] pts_0_5_r = [(float(tkp_0_5_r[i][0]), float(tkp_0_5_r[i][1])) for i in range(6)] line_0_5_r = _fit_line_to_points(pts_0_5_r) kkp5_r = _line_intersection(a_ab, b_ab, c_ab, line_0_5_r[0], line_0_5_r[1], line_0_5_r[2]) if line_0_5_r is not None else None if kkp5_r is not None: kps[5] = [float(kkp5_r[0]), float(kkp5_r[1])] return (True, "right") if decision == "left": clip_l = _clip_segment_to_rect(tkp_5[0], tkp_5[1], tkp_16[0], tkp_16[1], w, h) if clip_l is None: return (False, None) (Bx, By), (Ax, Ay) = clip_l valid_indices_52 = [] valid_src_52 = [] valid_dst_52 = [] for idx, kp in enumerate(kps): if not _is_kp_valid(kp): continue x, y = float(kp[0]), float(kp[1]) valid_indices_52.append(idx) valid_src_52.append(_F1[idx] if idx < len(_F1) else (0.0, 0.0)) valid_dst_52.append((x, y)) warped_l = precomputed_warped ground_mask_l = precomputed_ground_mask H_use_l = H if warped_l is None or ground_mask_l is None: try: warped_l = cv2.warpPerspective(template_image, H_use_l, (w, h)) ground_mask_l, _ = _y5(warped_l) except _Xe as e: err_msg = e.args[0] if e.args else "" if err_msg in _MASK_RETRY_ERRORS and len(valid_indices_52) >= 4 and len(valid_dst_52) >= 4: idx_smallest_y = min(range(len(valid_dst_52)), key=lambda i: valid_dst_52[i][1]) x0, y0 = valid_dst_52[idx_smallest_y] for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]: new_dst = list(valid_dst_52) new_dst[idx_smallest_y] = (x0 + dx, y0 + dy) H_retry = _y10(valid_indices_52, valid_src_52, new_dst, {}) if H_retry is None: continue try: warped_l = cv2.warpPerspective(template_image, H_retry, (w, h)) ground_mask_l, _ = _y5(warped_l) H_use_l = H_retry break except _Xe: continue if warped_l is None or ground_mask_l is None: return (False, None) except Exception: return (False, None) if warped_l is None or ground_mask_l is None: return (False, None) dilate_uint8_l = precomputed_dilate_uint8 if precomputed_dilate_uint8 is not None else _dilate_uint8_full_frame(video_frame) pts_left = [(float(kps[i][0]), float(kps[i][1])) for i in [0, 1, 2, 3, 4, 5] if i < len(kps) and _is_kp_valid(kps[i])] if len(pts_left) >= 2: line3_l = _fit_line_to_points(pts_left) else: src_0_5 = np.array([[_F1[i] for i in [0, 1, 2, 3, 4, 5]]], dtype=np.float32) tkp_0_5 = cv2.perspectiveTransform(src_0_5, H_use_l)[0] pts_left = [(float(tkp_0_5[i][0]), float(tkp_0_5[i][1])) for i in range(6)] line3_l = _fit_line_to_points(pts_left) if line3_l is None: return (False, None) a3_l, b3_l, c3_l = line3_l norm_u_l = math.hypot(b3_l, -a3_l) if norm_u_l < 1e-12: return (False, None) ux_l, uy_l = b3_l / norm_u_l, -a3_l / norm_u_l d_l = _STEP5_2_RIGHT_QUAD_HALFLENGTH A1_l = (Ax - d_l * ux_l, Ay - d_l * uy_l) A2_l = (Ax + d_l * ux_l, Ay + d_l * uy_l) B1_l = (Bx - d_l * ux_l, By - d_l * uy_l) B2_l = (Bx + d_l * ux_l, By + d_l * uy_l) pts_poly_l = np.array([[A1_l[0], A1_l[1]], [A2_l[0], A2_l[1]], [B2_l[0], B2_l[1]], [B1_l[0], B1_l[1]]], dtype=np.int32) mask_poly_l = np.zeros((h, w), dtype=np.uint8) cv2.fillConvexPoly(mask_poly_l, pts_poly_l, 255) dilate_in_roi_l = cv2.bitwise_and(dilate_uint8_l, mask_poly_l) px_l = pts_poly_l[:, 0] py_l = pts_poly_l[:, 1] x_min_l = max(0, int(px_l.min()) - _STEP5_2_ROI_MARGIN) y_min_l = max(0, int(py_l.min()) - _STEP5_2_ROI_MARGIN) x_max_l = min(w, int(px_l.max()) + 1 + _STEP5_2_ROI_MARGIN) y_max_l = min(h, int(py_l.max()) + 1 + _STEP5_2_ROI_MARGIN) roi_w_l = x_max_l - x_min_l roi_h_l = y_max_l - y_min_l dilate_roi_l = dilate_in_roi_l[y_min_l:y_max_l, x_min_l:x_max_l] num_labels_l, labels_l, stats_l, _ = cv2.connectedComponentsWithStats(dilate_roi_l, connectivity=8) best_label_l = 0 best_area_l = 0 for i in range(1, num_labels_l): area = stats_l[i, cv2.CC_STAT_AREA] if area > best_area_l: best_area_l = area best_label_l = i longest_mask_roi_l = ((labels_l == best_label_l).astype(np.uint8)) * 255 longest_mask_l = np.zeros((h, w), dtype=np.uint8) longest_mask_l[y_min_l:y_max_l, x_min_l:x_max_l] = longest_mask_roi_l p1_l, p2_l = None, None A3_l, B3_l = None, None contours_l, _ = cv2.findContours(longest_mask_roi_l, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) if contours_l: contour_l = max(contours_l, key=cv2.contourArea) pts_contour_l = contour_l.reshape(-1, 2) n_c_l = len(pts_contour_l) max_pts_l = _STEP5_2_LONGEST_SEGMENT_MAX_PTS if n_c_l > max_pts_l: step_l = max(1, n_c_l // max_pts_l) pts_subsample_l = pts_contour_l[np.arange(0, n_c_l, step_l)] else: pts_subsample_l = pts_contour_l if _STEP5_2_HEAVY_SEARCH_FLAG: result_l = _longest_segment_fully_inside_mask(longest_mask_roi_l, pts_subsample_l) if result_l is not None: p1_roi_l, p2_roi_l = result_l p1_l = (p1_roi_l[0] + x_min_l, p1_roi_l[1] + y_min_l) p2_l = (p2_roi_l[0] + x_min_l, p2_roi_l[1] + y_min_l) else: best_len_sq_l = -1.0 best_p1_roi_l, best_p2_roi_l = None, None for i in range(len(pts_subsample_l)): for j in range(i + 1, len(pts_subsample_l)): d_sq = (pts_subsample_l[i][0] - pts_subsample_l[j][0]) ** 2 + (pts_subsample_l[i][1] - pts_subsample_l[j][1]) ** 2 if d_sq > best_len_sq_l: best_len_sq_l = d_sq best_p1_roi_l = (int(pts_subsample_l[i][0]), int(pts_subsample_l[i][1])) best_p2_roi_l = (int(pts_subsample_l[j][0]), int(pts_subsample_l[j][1])) if best_p1_roi_l is not None and best_p2_roi_l is not None: p1_l = (best_p1_roi_l[0] + x_min_l, best_p1_roi_l[1] + y_min_l) p2_l = (best_p2_roi_l[0] + x_min_l, best_p2_roi_l[1] + y_min_l) if p1_l is not None and p2_l is not None: a_long_l, b_long_l, c_long_l = _line_through_two_points(float(p1_l[0]), float(p1_l[1]), float(p2_l[0]), float(p2_l[1])) a2_l, b2_l, c2_l = _line_through_two_points(B1_l[0], B1_l[1], B2_l[0], B2_l[1]) B3_l = _line_intersection(a_long_l, b_long_l, c_long_l, a2_l, b2_l, c2_l) seg_border_l = _line_segment_for_drawing(a_long_l, b_long_l, c_long_l, w, h) if seg_border_l is not None: A3_l = seg_border_l[1] if A3_l is not None and B3_l is not None: A3x_l, A3y_l = A3_l[0], A3_l[1] B3x_l, B3y_l = B3_l[0], B3_l[1] A3x_roi_l = A3x_l - x_min_l A3y_roi_l = A3y_l - y_min_l B3x_roi_l = B3x_l - x_min_l B3y_roi_l = B3y_l - y_min_l if _STEP5_2_8PX_HALFRES and roi_w_l >= 4 and roi_h_l >= 4: dilate_8px_l = cv2.resize(dilate_roi_l, (roi_w_l // 2, roi_h_l // 2), interpolation=cv2.INTER_NEAREST) roi_w_8_l, roi_h_8_l = roi_w_l // 2, roi_h_l // 2 scale_8_l, seg_width_8_l = 0.5, 4 else: dilate_8px_l = dilate_roi_l roi_w_8_l, roi_h_8_l = roi_w_l, roi_h_l scale_8_l, seg_width_8_l = 1.0, 8 mask_8_roi_l = np.zeros((roi_h_8_l, roi_w_8_l), dtype=np.uint8) overlap_roi_l = np.empty((roi_h_8_l, roi_w_8_l), dtype=np.uint8) best_count_8_l = -1 best_s_l, best_t_l = 0, 0 for s in range(-30, 31, _STEP5_2_8PX_COARSE_STEP): for t in range(-30, 31, _STEP5_2_8PX_COARSE_STEP): A4x_roi_l = A3x_roi_l + s * ux_l A4y_roi_l = A3y_roi_l + s * uy_l B4x_roi_l = B3x_roi_l + t * ux_l B4y_roi_l = B3y_roi_l + t * uy_l ax_d_l = int(round(A4x_roi_l * scale_8_l)) ay_d_l = int(round(A4y_roi_l * scale_8_l)) bx_d_l = int(round(B4x_roi_l * scale_8_l)) by_d_l = int(round(B4y_roi_l * scale_8_l)) mask_8_roi_l.fill(0) cv2.line(mask_8_roi_l, (ax_d_l, ay_d_l), (bx_d_l, by_d_l), 255, seg_width_8_l) cv2.bitwise_and(dilate_8px_l, mask_8_roi_l, overlap_roi_l) count = cv2.countNonZero(overlap_roi_l) if count > best_count_8_l: best_count_8_l = count best_s_l, best_t_l = s, t if _STEP5_2_8PX_REFINE_PASS: s_lo_l = max(-30, best_s_l - _STEP5_2_8PX_REFINE_WINDOW) s_hi_l = min(31, best_s_l + _STEP5_2_8PX_REFINE_WINDOW + 1) t_lo_l = max(-30, best_t_l - _STEP5_2_8PX_REFINE_WINDOW) t_hi_l = min(31, best_t_l + _STEP5_2_8PX_REFINE_WINDOW + 1) for s in range(s_lo_l, s_hi_l, 5): for t in range(t_lo_l, t_hi_l, 5): A4x_roi_l = A3x_roi_l + s * ux_l A4y_roi_l = A3y_roi_l + s * uy_l B4x_roi_l = B3x_roi_l + t * ux_l B4y_roi_l = B3y_roi_l + t * uy_l ax_d_l = int(round(A4x_roi_l * scale_8_l)) ay_d_l = int(round(A4y_roi_l * scale_8_l)) bx_d_l = int(round(B4x_roi_l * scale_8_l)) by_d_l = int(round(B4y_roi_l * scale_8_l)) mask_8_roi_l.fill(0) cv2.line(mask_8_roi_l, (ax_d_l, ay_d_l), (bx_d_l, by_d_l), 255, seg_width_8_l) cv2.bitwise_and(dilate_8px_l, mask_8_roi_l, overlap_roi_l) count = cv2.countNonZero(overlap_roi_l) if count > best_count_8_l: best_count_8_l = count best_s_l, best_t_l = s, t A4_l = (A3x_l + best_s_l * ux_l, A3y_l + best_s_l * uy_l) B4_l = (B3x_l + best_t_l * ux_l, B3y_l + best_t_l * uy_l) a_ab_l, b_ab_l, c_ab_l = _line_through_two_points(A4_l[0], A4_l[1], B4_l[0], B4_l[1]) kkp5_l = _line_intersection(a_ab_l, b_ab_l, c_ab_l, a3_l, b3_l, c3_l) center_pts_l = [(float(kps[i][0]), float(kps[i][1])) for i in [13, 14, 15, 16] if i < len(kps) and _is_kp_valid(kps[i])] if len(center_pts_l) >= 2: line_13_16_l = _fit_line_to_points(center_pts_l) else: src_13_16_l = np.array([[_F1[i] for i in [13, 14, 15, 16]]], dtype=np.float32) tkp_13_16_l = cv2.perspectiveTransform(src_13_16_l, H_use_l)[0] center_pts_l = [(float(tkp_13_16_l[i][0]), float(tkp_13_16_l[i][1])) for i in range(4)] line_13_16_l = _fit_line_to_points(center_pts_l) kkp16_l = _line_intersection(a_ab_l, b_ab_l, c_ab_l, line_13_16_l[0], line_13_16_l[1], line_13_16_l[2]) if line_13_16_l is not None else None if kkp5_l is not None: kps[5] = [float(kkp5_l[0]), float(kkp5_l[1])] if kkp16_l is not None: kps[16] = [float(kkp16_l[0]), float(kkp16_l[1])] if kkp16_l is not None and kkp16_l[0] < w: pts_24_29_l = [(float(kps[i][0]), float(kps[i][1])) for i in [24, 25, 26, 27, 28, 29] if i < len(kps) and _is_kp_valid(kps[i])] if len(pts_24_29_l) >= 2: line_24_29_l = _fit_line_to_points(pts_24_29_l) else: src_24_29_l = np.array([[_F1[i] for i in [24, 25, 26, 27, 28, 29]]], dtype=np.float32) tkp_24_29_l = cv2.perspectiveTransform(src_24_29_l, H_use_l)[0] pts_24_29_l = [(float(tkp_24_29_l[i][0]), float(tkp_24_29_l[i][1])) for i in range(6)] line_24_29_l = _fit_line_to_points(pts_24_29_l) kkp29_l = _line_intersection(a_ab_l, b_ab_l, c_ab_l, line_24_29_l[0], line_24_29_l[1], line_24_29_l[2]) if line_24_29_l is not None else None if kkp29_l is not None: kps[29] = [float(kkp29_l[0]), float(kkp29_l[1])] return (True, "left") if not kp16_valid_input: return (False, None) x16, y16 = tkp_16[0], tkp_16[1] valid_indices_52 = [] valid_src_52 = [] valid_dst_52 = [] for idx, kp in enumerate(kps): if not _is_kp_valid(kp): continue x, y = float(kp[0]), float(kp[1]) valid_indices_52.append(idx) valid_src_52.append(_F1[idx] if idx < len(_F1) else (0.0, 0.0)) valid_dst_52.append((x, y)) warped = None ground_mask = None H_use = H try: warped = cv2.warpPerspective(template_image, H_use, (w, h)) ground_mask, _ = _y5(warped) except _Xe as e: err_msg = e.args[0] if e.args else "" if err_msg in _MASK_RETRY_ERRORS and len(valid_indices_52) >= 4 and len(valid_dst_52) >= 4: idx_smallest_y = min(range(len(valid_dst_52)), key=lambda i: valid_dst_52[i][1]) x0, y0 = valid_dst_52[idx_smallest_y] for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]: new_dst = list(valid_dst_52) new_dst[idx_smallest_y] = (x0 + dx, y0 + dy) H_retry = _y10(valid_indices_52, valid_src_52, new_dst, {}) if H_retry is None: continue try: warped = cv2.warpPerspective(template_image, H_retry, (w, h)) ground_mask, _ = _y5(warped) H_use = H_retry break except _Xe: continue else: warped = None ground_mask = None if warped is None or ground_mask is None: return (False, None) except Exception: return (False, None) if warped is None or ground_mask is None: return (False, None) dilate_uint8 = _dilate_uint8_full_frame(video_frame) seg_width = 8 mask = np.zeros((h, w), dtype=np.uint8) overlap_buf = np.empty((h, w), dtype=np.uint8) best_count = -1 best_ay, best_by = ay, by step = 5 for t in range(-100, 101, step): ay_new = ay + t if abs(bx - ax) < 1e-12: by_new = ay_new else: by_new = ay_new + (y16 - ay_new) * (bx - ax) / (x16 - ax) if abs(x16 - ax) >= 1e-12 else ay_new a_pt = (int(round(ax)), int(round(ay_new))) b_pt = (int(round(bx)), int(round(by_new))) mask.fill(0) cv2.line(mask, a_pt, b_pt, 255, seg_width) cv2.bitwise_and(dilate_uint8, mask, overlap_buf) count = cv2.countNonZero(overlap_buf) if count > best_count: best_count = count best_ay, best_by = ay_new, by_new for shift in range(-20, 21, 5): ay_shift = best_ay + shift by_shift = best_by + shift a_pt = (int(round(ax)), int(round(ay_shift))) b_pt = (int(round(bx)), int(round(by_shift))) mask.fill(0) cv2.line(mask, a_pt, b_pt, 255, seg_width) cv2.bitwise_and(dilate_uint8, mask, overlap_buf) count = cv2.countNonZero(overlap_buf) if count > best_count: best_count = count best_ay, best_by = ay_shift, by_shift a_final = (ax, best_ay) b_final = (bx, best_by) center_pts = [] for i in [13, 14, 15, 16]: if i < len(kps) and _is_kp_valid(kps[i]): center_pts.append((float(kps[i][0]), float(kps[i][1]))) line_center = _fit_line_to_points(center_pts) if len(center_pts) >= 2 else None a_ab, b_ab, c_ab = _line_through_two_points(a_final[0], a_final[1], b_final[0], b_final[1]) if line_center is not None: a_c, b_c, c_c = line_center inter = _line_intersection(a_c, b_c, c_c, a_ab, b_ab, c_ab) if inter is not None: x16, y16 = inter[0], inter[1] d5 = math.hypot(tkp_5[0] - x16, tkp_5[1] - y16) d29 = math.hypot(tkp_29[0] - x16, tkp_29[1] - y16) dx_ab = b_final[0] - a_final[0] dy_ab = b_final[1] - a_final[1] len_ab = math.hypot(dx_ab, dy_ab) if len_ab < 1e-12: kkp5 = (x16, y16) kkp29 = (x16, y16) else: ux = dx_ab / len_ab uy = dy_ab / len_ab kkp5_plus = (x16 + d5 * ux, y16 + d5 * uy) kkp5_minus = (x16 - d5 * ux, y16 - d5 * uy) dist_plus_to_a = math.hypot(kkp5_plus[0] - a_final[0], kkp5_plus[1] - a_final[1]) dist_minus_to_a = math.hypot(kkp5_minus[0] - a_final[0], kkp5_minus[1] - a_final[1]) kkp5 = kkp5_minus if dist_minus_to_a < dist_plus_to_a else kkp5_plus kkp29_plus = (x16 + d29 * ux, y16 + d29 * uy) kkp29_minus = (x16 - d29 * ux, y16 - d29 * uy) dist_plus_to_b = math.hypot(kkp29_plus[0] - b_final[0], kkp29_plus[1] - b_final[1]) dist_minus_to_b = math.hypot(kkp29_minus[0] - b_final[0], kkp29_minus[1] - b_final[1]) kkp29 = kkp29_minus if dist_minus_to_b < dist_plus_to_b else kkp29_plus kps[5] = [kkp5[0], kkp5[1]] kps[29] = [kkp29[0], kkp29[1]] kps[16] = [x16, y16] return (True, None) def _refine_kp4_kp12( kps: list[list[float]], H: ndarray, video_frame: ndarray, template_image: ndarray, ) -> bool: if len(kps) <= 12: return False if not _is_kp_valid(kps[12]) or _is_kp_valid(kps[4]): return False h, w = video_frame.shape[:2] src_pt4 = np.array([_F1[4]], dtype=np.float32).reshape(1, 1, 2) inferred_4 = cv2.perspectiveTransform(src_pt4, H)[0, 0] kp4_x, kp4_y = float(inferred_4[0]), float(inferred_4[1]) kp12_x = float(kps[12][0]) kp12_y = float(kps[12][1]) try: warped = cv2.warpPerspective(template_image, H, (w, h)) ground_mask, _ = _y5(warped) except _Xe: return False dilate_image = _y6(video_frame, ground_mask) dilate_uint8 = (dilate_image.astype(np.uint8)) * 255 y4_lo = max(0, int(kp4_y) - 50) y4_hi = min(h - 1, int(kp4_y) + 50) y12_lo = max(0, int(kp12_y) - 50) y12_hi = min(h - 1, int(kp12_y) + 50) step = 5 best_count = -1 best_y4 = int(kp4_y) best_y12 = int(kp12_y) seg_width = 5 mask = np.zeros((h, w), dtype=np.uint8) overlap_buf = np.empty((h, w), dtype=np.uint8) for y4 in range(y4_lo, min(y4_hi + 1, y4_lo + ((y4_hi - y4_lo) // step) * step + 1), step): for y12 in range(y12_lo, min(y12_hi + 1, y12_lo + ((y12_hi - y12_lo) // step) * step + 1), step): p1 = (int(round(kp4_x)), y4) p2 = (int(round(kp12_x)), y12) mask.fill(0) cv2.line(mask, p1, p2, 255, seg_width) cv2.bitwise_and(dilate_uint8, mask, overlap_buf) count = cv2.countNonZero(overlap_buf) if count > best_count: best_count = count best_y4 = y4 best_y12 = y12 kkp4 = (kp4_x, float(best_y4)) kkp12 = (kp12_x, float(best_y12)) line_ext = _line_through_two_points(kkp4[0], kkp4[1], kkp12[0], kkp12[1]) pts1 = [] for i in [0, 1, 2, 3, 4]: if i < len(kps) and _is_kp_valid(kps[i]): pts1.append((float(kps[i][0]), float(kps[i][1]))) if len(pts1) < 2: return False line1 = _fit_line_to_points(pts1) if line1 is None: return False pts2 = [] for i in [9, 10, 11, 12]: if i < len(kps) and _is_kp_valid(kps[i]): pts2.append((float(kps[i][0]), float(kps[i][1]))) if len(pts2) < 2: return False line2 = _fit_line_to_points(pts2) if line2 is None: return False a1, b1, c1 = line1 a2, b2, c2 = line2 inter1 = _line_intersection(a1, b1, c1, line_ext[0], line_ext[1], line_ext[2]) inter2 = _line_intersection(a2, b2, c2, line_ext[0], line_ext[1], line_ext[2]) if inter1 is None or inter2 is None: return False kps[4] = [inter1[0], inter1[1]] kps[12] = [inter2[0], inter2[1]] return True def _refine_kp20_kp28( kps: list[list[float]], H: ndarray, video_frame: ndarray, template_image: ndarray, ) -> bool: if len(kps) <= 28: return False if not _is_kp_valid(kps[20]) or _is_kp_valid(kps[28]): return False h, w = video_frame.shape[:2] src_pt28 = np.array([_F1[28]], dtype=np.float32).reshape(1, 1, 2) inferred_28 = cv2.perspectiveTransform(src_pt28, H)[0, 0] kp28_x, kp28_y = float(inferred_28[0]), float(inferred_28[1]) kp20_x = float(kps[20][0]) kp20_y = float(kps[20][1]) try: warped = cv2.warpPerspective(template_image, H, (w, h)) ground_mask, _ = _y5(warped) except _Xe: return False dilate_image = _y6(video_frame, ground_mask) dilate_uint8 = (dilate_image.astype(np.uint8)) * 255 y28_lo = max(0, int(kp28_y) - 50) y28_hi = min(h - 1, int(kp28_y) + 50) y20_lo = max(0, int(kp20_y) - 50) y20_hi = min(h - 1, int(kp20_y) + 50) step = 5 best_count = -1 best_y28 = int(kp28_y) best_y20 = int(kp20_y) seg_width = 5 mask = np.zeros((h, w), dtype=np.uint8) overlap_buf = np.empty((h, w), dtype=np.uint8) for y28 in range(y28_lo, min(y28_hi + 1, y28_lo + ((y28_hi - y28_lo) // step) * step + 1), step): for y20 in range(y20_lo, min(y20_hi + 1, y20_lo + ((y20_hi - y20_lo) // step) * step + 1), step): p1 = (int(round(kp28_x)), y28) p2 = (int(round(kp20_x)), y20) mask.fill(0) cv2.line(mask, p1, p2, 255, seg_width) cv2.bitwise_and(dilate_uint8, mask, overlap_buf) count = cv2.countNonZero(overlap_buf) if count > best_count: best_count = count best_y28 = y28 best_y20 = y20 kkp28 = (kp28_x, float(best_y28)) kkp20 = (kp20_x, float(best_y20)) line_ext = _line_through_two_points(kkp28[0], kkp28[1], kkp20[0], kkp20[1]) pts1 = [] for i in [24, 25, 26, 27, 28]: if i < len(kps) and _is_kp_valid(kps[i]): pts1.append((float(kps[i][0]), float(kps[i][1]))) if len(pts1) < 2: return False line1 = _fit_line_to_points(pts1) if line1 is None: return False pts2 = [] for i in [17, 18, 19, 20]: if i < len(kps) and _is_kp_valid(kps[i]): pts2.append((float(kps[i][0]), float(kps[i][1]))) if len(pts2) < 2: return False line2 = _fit_line_to_points(pts2) if line2 is None: return False a1, b1, c1 = line1 a2, b2, c2 = line2 inter1 = _line_intersection(a1, b1, c1, line_ext[0], line_ext[1], line_ext[2]) inter2 = _line_intersection(a2, b2, c2, line_ext[0], line_ext[1], line_ext[2]) if inter1 is None or inter2 is None: return False kps[28] = [inter1[0], inter1[1]] kps[20] = [inter2[0], inter2[1]] return True def _z0( kps: list[Any], video_frame: ndarray, template_image: ndarray, ) -> list[list[float]] | None: if not isinstance(kps, list) or len(kps) != _N0: return None h, w = video_frame.shape[:2] frame_width, frame_height = w, h def _collect_valid( kps_list: list[Any], step52_decision: str | None, ) -> tuple[list[int], list[tuple[float, float]], list[tuple[float, float]]]: vi: list[int] = [] vs: list[tuple[float, float]] = [] vd: list[tuple[float, float]] = [] kp16_x: float | None = None if len(kps_list) > 16 and isinstance(kps_list[16], (list, tuple)) and len(kps_list[16]) >= 1: try: kp16_x = float(kps_list[16][0]) except (TypeError, ValueError): pass for idx, kp in enumerate(kps_list): if not isinstance(kp, (list, tuple)) or len(kp) < 2: continue try: x, y = float(kp[0]), float(kp[1]) except (TypeError, ValueError): continue if x == 0.0 and y == 0.0: continue if idx not in _ALWAYS_INCLUDE_INDICES: if x < 0 or x > frame_width or y < 0 or y > frame_height: continue if idx == 5 and x > frame_width: continue if idx == 29 and x < 0: continue if step52_decision == "left" and kp16_x is not None and kp16_x > frame_width and idx == 29: continue if step52_decision == "right" and kp16_x is not None and kp16_x < 0 and idx == 5: continue vi.append(idx) if idx < len(_F1): vs.append(_F1[idx]) vd.append((x, y)) return (vi, vs, vd) valid_indices, valid_src, valid_dst = _collect_valid(kps, None) if len(valid_src) < 4: return None H0 = _y10(valid_indices, valid_src, valid_dst, {}) if H0 is not None: score0, H0_used, dst_retry = _y11( H0, template_image, video_frame, valid_indices, valid_src, valid_dst, {}, ) if dst_retry is not None and H0_used is not None: for i, idx in enumerate(valid_indices): if idx < len(kps): kps[idx] = [float(dst_retry[i][0]), float(dst_retry[i][1])] valid_indices, valid_src, valid_dst = _collect_valid(kps, None) H0 = H0_used else: score0 = 0.0 refined = False step52_decision: str | None = None if H0 is not None: refined = _refine_kp4_kp12(kps, H0, video_frame, template_image) or refined refined = _refine_kp20_kp28(kps, H0, video_frame, template_image) or refined dilate_uint8 = _dilate_uint8_full_frame(video_frame) warp_52: ndarray | None = None ground_mask_52: ndarray | None = None try: warp_52 = cv2.warpPerspective(template_image, H0, (frame_width, frame_height)) ground_mask_52, _ = _y5(warp_52) except _Xe: pass step52_refined, step52_decision = _refine_kp5_kp16_kp29( kps, H0, video_frame, template_image, precomputed_dilate_uint8=dilate_uint8, precomputed_warped=warp_52, precomputed_ground_mask=ground_mask_52, ) refined = refined or step52_refined if refined: valid_indices, valid_src, valid_dst = _collect_valid(kps, step52_decision) if len(valid_src) < 4: valid_indices, valid_src, valid_dst = _collect_valid(kps, None) if len(valid_src) < 4: if H0 is not None: src_all = np.array(_F1, dtype=np.float32).reshape(1, -1, 2) projected = cv2.perspectiveTransform(src_all, H0)[0] return [[float(projected[i][0]), float(projected[i][1])] for i in range(_N0)] return None w1, w2, w3 = _y7(), _y8(), _y9() H1 = _y10(valid_indices, valid_src, valid_dst, w1) H2 = _y10(valid_indices, valid_src, valid_dst, w2) valid_set = set(valid_indices) if valid_set.isdisjoint(_INDICES_H3_VS_H1): H3 = H1 elif valid_set.isdisjoint(_INDICES_H3_VS_H2): H3 = H2 else: H3 = _y10(valid_indices, valid_src, valid_dst, w3) score1 = _y11(H1, template_image, video_frame)[0] if H1 is not None else 0.0 score2 = _y11(H2, template_image, video_frame)[0] if H2 is not None else 0.0 score3 = _y11(H3, template_image, video_frame)[0] if H3 is not None else 0.0 best_H = H0 best_score = score0 if H1 is not None and score1 > best_score: best_H, best_score = H1, score1 if H2 is not None and score2 > best_score: best_H, best_score = H2, score2 if H3 is not None and score3 > best_score: best_H = H3 if best_H is None: return None src_all = np.array(_F1, dtype=np.float32).reshape(1, -1, 2) projected = cv2.perspectiveTransform(src_all, best_H)[0] return [[float(projected[i][0]), float(projected[i][1])] for i in range(_N0)] def _z1( kps: list[Any], frame_width: int, frame_height: int, fill_missing: bool, ) -> list[list[float]] | None: if not isinstance(kps, list) or len(kps) != _N0 or frame_width <= 0 or frame_height <= 0: return None filtered_src: list[tuple[float, float]] = [] filtered_dst: list[tuple[float, float]] = [] valid_indices: list[int] = [] for idx, kp in enumerate(kps): if not isinstance(kp, (list, tuple)) or len(kp) < 2: continue try: x, y = float(kp[0]), float(kp[1]) except (TypeError, ValueError): continue if x == 0.0 and y == 0.0: continue if idx >= len(_F1): continue filtered_src.append(_F1[idx]) filtered_dst.append((x, y)) valid_indices.append(idx) if len(filtered_src) < 4: return None src_np = np.array(filtered_src, dtype=np.float32) dst_np = np.array(filtered_dst, dtype=np.float32) H_corrected, _ = cv2.findHomography(src_np, dst_np) if H_corrected is None: return None fk_np = np.array(_F0, dtype=np.float32).reshape(1, -1, 2) projected_np = cv2.perspectiveTransform(fk_np, H_corrected)[0] valid_indices_set = set(valid_indices) adjusted_kps: list[list[float]] = [[0.0, 0.0] for _ in range(_N0)] for idx in range(_N0): x, y = float(projected_np[idx][0]), float(projected_np[idx][1]) if not (0 <= x < frame_width and 0 <= y < frame_height): continue if fill_missing or idx in valid_indices_set: adjusted_kps[idx] = [x, y] return adjusted_kps def _z2( keypoints: list[list[float]], video_frame: ndarray, template_image: ndarray, ) -> float: score, _ = _z2_score_and_kps(keypoints, video_frame, template_image) return score def _z2_score_and_kps( keypoints: list[list[float]], video_frame: ndarray, template_image: ndarray, ) -> tuple[float, list[list[float]] | None]: if not isinstance(keypoints, list) or len(keypoints) != _N0: return (0.0, None) valid_indices: list[int] = [] valid_src: list[tuple[float, float]] = [] valid_dst: list[tuple[float, float]] = [] for idx, kp in enumerate(keypoints): if not isinstance(kp, (list, tuple)) or len(kp) < 2: continue try: x, y = float(kp[0]), float(kp[1]) except (TypeError, ValueError): continue if x == 0.0 and y == 0.0: continue if idx >= len(_F1): continue valid_indices.append(idx) valid_src.append(_F1[idx]) valid_dst.append((x, y)) if len(valid_src) < 4: return (0.0, None) H = _y10(valid_indices, valid_src, valid_dst, {}) if H is None: return (0.0, None) score, H_used, new_dst = _y11( H, template_image, video_frame, valid_indices, valid_src, valid_dst, {}, ) if new_dst is not None and H_used is not None: new_keypoints = [list(kp) if isinstance(kp, (list, tuple)) else [0.0, 0.0] for kp in keypoints] if len(new_keypoints) != _N0: new_keypoints = (new_keypoints + [[0.0, 0.0]] * _N0)[:_N0] for i, idx in enumerate(valid_indices): if idx < len(new_keypoints) and i < len(new_dst): new_keypoints[idx] = [float(new_dst[i][0]), float(new_dst[i][1])] return (score, new_keypoints) return (score, None) def _z3(kps: list[Any]) -> dict[int, tuple[float, float]]: out: dict[int, tuple[float, float]] = {} for idx, kp in enumerate(kps): if not isinstance(kp, (list, tuple)) or len(kp) < 2: continue try: x, y = float(kp[0]), float(kp[1]) except (TypeError, ValueError): continue if x != 0.0 or y != 0.0: out[idx] = (x, y) return out def _z4( a: dict[int, tuple[float, float]], b: dict[int, tuple[float, float]], threshold: float, ) -> int: count = 0 for idx, (ax, ay) in a.items(): if idx not in b: continue bx, by = b[idx] if ((ax - bx) ** 2 + (ay - by) ** 2) ** 0.5 <= threshold: count += 1 return count def _z5(a: list[Any], b: list[Any]) -> list[int]: out: list[int] = [] for i in range(min(len(a), len(b))): ka, kb = a[i], b[i] if not (isinstance(ka, (list, tuple)) and len(ka) >= 2): continue if not (isinstance(kb, (list, tuple)) and len(kb) >= 2): continue if float(ka[0]) == 0.0 and float(ka[1]) == 0.0: continue if float(kb[0]) == 0.0 and float(kb[1]) == 0.0: continue out.append(i) return out def _z6( a: list[Any], b: list[Any], frame_width: int, frame_height: int, ) -> list[int]: out: list[int] = [] for i in range(min(len(a), len(b))): ka, kb = a[i], b[i] if not (isinstance(ka, (list, tuple)) and len(ka) >= 2): continue if not (isinstance(kb, (list, tuple)) and len(kb) >= 2): continue xa, ya = float(ka[0]), float(ka[1]) xb, yb = float(kb[0]), float(kb[1]) if xa == 0.0 and ya == 0.0: continue if xb == 0.0 and yb == 0.0: continue if not (0 <= xa < frame_width and 0 <= ya < frame_height): continue if not (0 <= xb < frame_width and 0 <= yb < frame_height): continue out.append(i) return out def _z7( batch_frame_ids: list[int], keypoints_by_frame: dict[int, list[list[float]]], ) -> list[list[int]]: id_kps: list[tuple[int, list[list[float]]]] = [] for fid in batch_frame_ids: kps = keypoints_by_frame.get(fid) if not kps: continue vkps = _z3(kps) if vkps: id_kps.append((fid, kps)) id_kps.sort(key=lambda t: t[0]) segments: list[list[int]] = [] if not id_kps: return segments current_segment: list[int] = [id_kps[0][0]] prev_vkps = _z3(id_kps[0][1]) for i in range(1, len(id_kps)): fid, kps = id_kps[i] cur_vkps = _z3(kps) common = _z4(prev_vkps, cur_vkps, _J5) if common >= _J6: current_segment.append(fid) else: segments.append(current_segment) current_segment = [fid] prev_vkps = cur_vkps segments.append(current_segment) return segments def _z8( keypoints_by_frame: dict[int, list[list[float]]], images: list[ndarray], offset: int, template_image: ndarray, ) -> int: if not _J1 or not images or len(images) < _Z8_MIN_BATCH_FRAMES: return 0 batch_frame_ids = [offset + i for i in range(len(images))] score_map: dict[int, float] = {} for i, fid in enumerate(batch_frame_ids): kps = keypoints_by_frame.get(fid) if not kps or len(kps) != _N0: score_map[fid] = 0.0 continue score_map[fid] = _z2(kps, images[i], template_image) sorted_ids = sorted(score_map.keys()) if not sorted_ids: return 0 segments = _z7(batch_frame_ids, keypoints_by_frame) frame_to_seg: dict[int, int] = {} for seg_idx, seg in enumerate(segments): for fid in seg: frame_to_seg[fid] = seg_idx frame_width = images[0].shape[1] if images else 0 frame_height = images[0].shape[0] if images else 0 total_updated = 0 for threshold in _J2: problematic = [fid for fid in sorted_ids if score_map[fid] < threshold] if not problematic: continue problematic = problematic[:_Z8_MAX_PROBLEMATIC_PER_BATCH] segments_seen: dict[tuple[int, int], tuple[list[Any], list[Any], set[int]]] = {} for problem_id in problematic: backward_id: int | None = None for fid in reversed(sorted_ids): if fid < problem_id and score_map[fid] >= threshold: backward_id = fid break forward_id: int | None = None for fid in sorted_ids: if fid > problem_id and score_map[fid] >= threshold: forward_id = fid break if backward_id is None or forward_id is None: continue if frame_to_seg.get(backward_id) != frame_to_seg.get(forward_id): continue if forward_id - backward_id > _J3: continue bwd_kps = keypoints_by_frame.get(backward_id) or [] fwd_kps = keypoints_by_frame.get(forward_id) or [] if frame_width > 0 and frame_height > 0: common_set = set(_z6(bwd_kps, fwd_kps, frame_width, frame_height)) else: common_set = set(_z5(bwd_kps, fwd_kps)) if len(common_set) < 4: continue key = (backward_id, forward_id) if key not in segments_seen: segments_seen[key] = (bwd_kps, fwd_kps, common_set) already_rewritten: set[int] = set() for (backward_id, forward_id), (bwd_kps, fwd_kps, common_set) in segments_seen.items(): gap = forward_id - backward_id if gap <= 0: continue for interp_id in range(backward_id + 1, forward_id): if interp_id not in batch_frame_ids or interp_id in already_rewritten: continue local_idx = interp_id - offset if local_idx < 0 or local_idx >= len(images): continue video_frame = images[local_idx] weight = (interp_id - backward_id) / gap max_len = max(len(bwd_kps), len(fwd_kps), _N0) new_kps: list[list[float]] = [] for i in range(max_len): if i in common_set and i < len(bwd_kps) and i < len(fwd_kps): bx = float(bwd_kps[i][0]) by = float(bwd_kps[i][1]) fx = float(fwd_kps[i][0]) fy = float(fwd_kps[i][1]) new_kps.append([bx + (fx - bx) * weight, by + (fy - by) * weight]) else: new_kps.append([0.0, 0.0]) if len(new_kps) < _N0: new_kps.extend([[0.0, 0.0]] * (_N0 - len(new_kps))) else: new_kps = new_kps[:_N0] before_score = score_map.get(interp_id, 0.0) new_score, kps_to_apply = _z2_score_and_kps(new_kps, video_frame, template_image) if new_score <= before_score: continue keypoints_by_frame[interp_id] = kps_to_apply if kps_to_apply is not None else new_kps score_map[interp_id] = new_score already_rewritten.add(interp_id) total_updated += 1 return total_updated class _Bx(BaseModel): x1: int y1: int x2: int y2: int cls_id: int conf: float team_id: str | None = None class _FRes(BaseModel): frame_id: int boxes: List[Dict[str, Any]] keypoints: List[List[float]] _FRes.model_rebuild() class _Cfg: def __init__(self, min_area: int = 1300, overlap_iou: float = 0.91): self.overlap_iou = overlap_iou def _d1(bb: _Bx, cy: float) -> float: my = 0.5 * (float(bb.y1) + float(bb.y2)) return (my - cy) ** 2 def _i1(a: _Bx, b: _Bx) -> float: ax1, ay1, ax2, ay2 = int(a.x1), int(a.y1), int(a.x2), int(a.y2) bx1, by1, bx2, by2 = int(b.x1), int(b.y1), int(b.x2), int(b.y2) ix1, iy1 = max(ax1, bx1), max(ay1, by1) ix2, iy2 = min(ax2, bx2), min(ay2, by2) iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1) inter = iw * ih if inter <= 0: return 0.0 area_a = (ax2 - ax1) * (ay2 - ay1) area_b = (bx2 - bx1) * (by2 - by1) union = area_a + area_b - inter return inter / union if union > 0 else 0.0 def _iou_box4(a: tuple[float, float, float, float], b: tuple[float, float, float, float]) -> float: ax1, ay1, ax2, ay2 = a bx1, by1, bx2, by2 = b ix1, iy1 = max(ax1, bx1), max(ay1, by1) ix2, iy2 = min(ax2, bx2), min(ay2, by2) iw, ih = max(0.0, ix2 - ix1), max(0.0, iy2 - iy1) inter = iw * ih if inter <= 0: return 0.0 area_a = (ax2 - ax1) * (ay2 - ay1) area_b = (bx2 - bx1) * (by2 - by1) union = area_a + area_b - inter return inter / union if union > 0 else 0.0 def _match_tracks_detections( prev_list: list[tuple[int, tuple[float, float, float, float]]], curr_boxes: list[tuple[float, float, float, float]], iou_thresh: float, exclude_prev: set[int], exclude_curr: set[int], ) -> list[tuple[int, int]]: prev_filtered = [(pi, tid, pbox) for pi, (tid, pbox) in enumerate(prev_list) if pi not in exclude_prev] curr_filtered = [(ci, cbox) for ci, cbox in enumerate(curr_boxes) if ci not in exclude_curr] if not prev_filtered or not curr_filtered: return [] n_prev, n_curr = len(prev_filtered), len(curr_filtered) iou_mat = np.zeros((n_prev, n_curr), dtype=np.float64) for i, (_, _, pbox) in enumerate(prev_filtered): for j, (_, cbox) in enumerate(curr_filtered): iou_mat[i, j] = _iou_box4(pbox, cbox) cost = 1.0 - iou_mat cost[iou_mat < iou_thresh] = 1e9 if _linear_sum_assignment is not None: row_ind, col_ind = _linear_sum_assignment(cost) matches = [ (prev_filtered[row_ind[k]][0], curr_filtered[col_ind[k]][0]) for k in range(len(row_ind)) if cost[row_ind[k], col_ind[k]] < 1.0 ] else: matches = [] iou_pairs = [ (iou_mat[i, j], i, j) for i in range(n_prev) for j in range(n_curr) if iou_mat[i, j] >= iou_thresh ] iou_pairs.sort(key=lambda x: -x[0]) used_prev, used_curr = set(), set() for _, i, j in iou_pairs: pi = prev_filtered[i][0] ci = curr_filtered[j][0] if pi in used_prev or ci in used_curr: continue matches.append((pi, ci)) used_prev.add(pi) used_curr.add(ci) return matches def _predict_box(prev: tuple[float, float, float, float], last: tuple[float, float, float, float]) -> tuple[float, float, float, float]: px1, py1, px2, py2 = prev lx1, ly1, lx2, ly2 = last pcx = 0.5 * (px1 + px2) pcy = 0.5 * (py1 + py2) lcx = 0.5 * (lx1 + lx2) lcy = 0.5 * (ly1 + ly2) w = lx2 - lx1 h = ly2 - ly1 ncx = 2.0 * lcx - pcx ncy = 2.0 * lcy - pcy return (ncx - w * 0.5, ncy - h * 0.5, ncx + w * 0.5, ncy + h * 0.5) def _assign_person_track_ids( prev_state: dict[int, tuple[tuple[float, float, float, float], tuple[float, float, float, float], int]], next_id: int, results: list, iou_thresh: float = _TRACK_IOU_THRESH, iou_high: float = _TRACK_IOU_HIGH, iou_low: float = _TRACK_IOU_LOW, max_age: int = _TRACK_MAX_AGE, use_velocity: bool = _TRACK_USE_VELOCITY, ) -> tuple[dict[int, tuple[tuple[float, float, float, float], tuple[float, float, float, float], int]], int, list[list[int]]]: state = {tid: (prev_box, last_box, age) for tid, (prev_box, last_box, age) in prev_state.items()} nid = next_id ids_per_result: list[list[int]] = [] for result in results: if getattr(result, "boxes", None) is None or len(result.boxes) == 0: state = { tid: (prev_box, last_box, age + 1) for tid, (prev_box, last_box, age) in state.items() if age + 1 <= max_age } ids_per_result.append([]) continue b = result.boxes xyxy = b.xyxy.cpu().numpy() curr_boxes = [tuple(float(x) for x in row) for row in xyxy] prev_list: list[tuple[int, tuple[float, float, float, float]]] = [] for tid, (prev_box, last_box, _age) in state.items(): if use_velocity and (prev_box != last_box): pbox = _predict_box(prev_box, last_box) else: pbox = last_box prev_list.append((tid, pbox)) stage1 = _match_tracks_detections(prev_list, curr_boxes, iou_high, set(), set()) assigned_prev = {pi for pi, _ in stage1} assigned_curr = {ci for _, ci in stage1} stage2 = _match_tracks_detections(prev_list, curr_boxes, iou_low, assigned_prev, assigned_curr) for pi, ci in stage2: assigned_prev.add(pi) assigned_curr.add(ci) tid_per_curr: dict[int, int] = {} for pi, ci in stage1 + stage2: tid_per_curr[ci] = prev_list[pi][0] ids: list[int] = [] new_state: dict[int, tuple[tuple[float, float, float, float], tuple[float, float, float, float], int]] = {} for ci, cbox in enumerate(curr_boxes): if ci in tid_per_curr: tid = tid_per_curr[ci] _prev, last_box, _ = state[tid] new_state[tid] = (last_box, cbox, 0) else: tid = nid nid += 1 new_state[tid] = (cbox, cbox, 0) ids.append(tid) for pi in range(len(prev_list)): if pi in assigned_prev: continue tid = prev_list[pi][0] prev_box, last_box, age = state[tid] if age + 1 <= max_age: new_state[tid] = (prev_box, last_box, age + 1) state = new_state ids_per_result.append(ids) return (state, nid, ids_per_result) def _s0( results: list[_FRes], window: int = _S0, tids_by_frame: dict[int, list[int | None]] | None = None, ) -> list[_FRes]: if window <= 1 or not results: return results fid_to_idx = {r.frame_id: i for i, r in enumerate(results)} trajectories: dict[int, list[tuple[int, int, _Bx]]] = {} for i, r in enumerate(results): boxes_as_bx = [_Bx(**b) if isinstance(b, dict) else b for b in r.boxes] for j, bb in enumerate(boxes_as_bx): tid = tids_by_frame.get(r.frame_id, [None] * len(r.boxes))[j] if tids_by_frame else None if tid is not None and tid >= 0: tid = int(tid) if tid not in trajectories: trajectories[tid] = [] trajectories[tid].append((r.frame_id, j, bb)) smoothed: dict[tuple[int, int], tuple[int, int, int, int]] = {} half = window // 2 for tid, items in trajectories.items(): items.sort(key=lambda x: x[0]) n = len(items) for k in range(n): fid, box_idx, bb = items[k] result_idx = fid_to_idx[fid] lo = max(0, k - half) hi = min(n, k + half + 1) cx_list = [] cy_list = [] w_list = [] h_list = [] for m in range(lo, hi): b = items[m][2] cx_list.append(0.5 * (b.x1 + b.x2)) cy_list.append(0.5 * (b.y1 + b.y2)) w_list.append(b.x2 - b.x1) h_list.append(b.y2 - b.y1) cx_avg = sum(cx_list) / len(cx_list) cy_avg = sum(cy_list) / len(cy_list) w_avg = sum(w_list) / len(w_list) h_avg = sum(h_list) / len(h_list) x1_new = int(round(cx_avg - w_avg / 2)) y1_new = int(round(cy_avg - h_avg / 2)) x2_new = int(round(cx_avg + w_avg / 2)) y2_new = int(round(cy_avg + h_avg / 2)) smoothed[(result_idx, box_idx)] = (x1_new, y1_new, x2_new, y2_new) out: list[_FRes] = [] for i, r in enumerate(results): boxes_as_bx = [_Bx(**b) if isinstance(b, dict) else b for b in r.boxes] new_boxes: list[_Bx] = [] for j, bb in enumerate(boxes_as_bx): key = (i, j) if key in smoothed: x1, y1, x2, y2 = smoothed[key] new_boxes.append( _Bx( x1=x1, y1=y1, x2=x2, y2=y2, cls_id=int(bb.cls_id), conf=float(bb.conf), team_id=bb.team_id, ) ) else: new_boxes.append( _Bx( x1=int(bb.x1), y1=int(bb.y1), x2=int(bb.x2), y2=int(bb.y2), cls_id=int(bb.cls_id), conf=float(bb.conf), team_id=bb.team_id, ) ) out.append(_FRes(frame_id=r.frame_id, boxes=[{"x1": b.x1, "y1": b.y1, "x2": b.x2, "y2": b.y2, "cls_id": b.cls_id, "conf": round(float(b.conf), 2), "team_id": b.team_id} for b in new_boxes], keypoints=r.keypoints)) return out def _a0( bboxes: Iterable[_Bx], *, frame_width: int, frame_height: int, cfg: _Cfg | None = None, do_goalkeeper_dedup: bool = True, do_referee_disambiguation: bool = False, do_ball_dedup: bool = True, ) -> list[_Bx]: cfg = cfg or _Cfg() W, H = int(frame_width), int(frame_height) cy = 0.5 * float(H) kept: list[_Bx] = list(bboxes or []) if cfg.overlap_iou > 0 and len(kept) > 1: balls = [bb for bb in kept if int(bb.cls_id) == _C0] non_balls = [bb for bb in kept if int(bb.cls_id) != _C0] if len(non_balls) > 1: non_balls_sorted = sorted(non_balls, key=lambda bb: float(bb.conf), reverse=True) kept_nb = [] for cand in non_balls_sorted: skip = False for k in kept_nb: iou = _i1(cand, k) if iou >= cfg.overlap_iou: skip = True break if ( abs(int(cand.x1) - int(k.x1)) <= 3 and abs(int(cand.y1) - int(k.y1)) <= 3 and abs(int(cand.x2) - int(k.x2)) <= 3 and abs(int(cand.y2) - int(k.y2)) <= 3 and iou > 0.85 ): skip = True break if not skip: kept_nb.append(cand) kept = kept_nb + balls if do_goalkeeper_dedup: gks = [bb for bb in kept if int(bb.cls_id) == _C1] if len(gks) > 1: best_gk = max(gks, key=lambda bb: float(bb.conf)) best_gk_conf = float(best_gk.conf) deduped = [] for bb in kept: if int(bb.cls_id) == _C1: if float(bb.conf) < best_gk_conf or (float(bb.conf) == best_gk_conf and bb is not best_gk): deduped.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=_C2, conf=float(bb.conf), team_id="1")) else: deduped.append(bb) else: deduped.append(bb) kept = deduped if do_referee_disambiguation: refs = [bb for bb in kept if int(bb.cls_id) == _C3] if len(refs) > 1: best_ref = min(refs, key=lambda bb: _d1(bb, cy)) kept = [bb for bb in kept if int(bb.cls_id) != _C3 or bb is best_ref] if do_ball_dedup: balls = [bb for bb in kept if int(bb.cls_id) == _C0] if len(balls) > 1: best_ball = max(balls, key=lambda bb: float(bb.conf)) kept = [bb for bb in kept if int(bb.cls_id) != _C0] + [best_ball] return kept def _k0(feats: np.ndarray, iters: int = 20) -> tuple[np.ndarray, np.ndarray]: n, d = feats.shape if n <= 0: return np.zeros((2, d), dtype=np.float32), np.zeros(0, dtype=np.int64) if n == 1: return np.stack([feats[0], feats[0]], axis=0), np.zeros(1, dtype=np.int64) c0 = feats[0] d0 = np.linalg.norm(feats - c0[None, :], axis=1) c1 = feats[int(np.argmax(d0))] d1 = np.linalg.norm(feats - c1[None, :], axis=1) c0 = feats[int(np.argmax(d1))] centroids = np.stack([c0, c1], axis=0).astype(np.float32) labels = np.zeros(n, dtype=np.int64) for _ in range(iters): dist = ((feats[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2) labels = dist.argmin(axis=1) for k in (0, 1): sel = feats[labels == k] if len(sel) > 0: centroids[k] = sel.mean(axis=0) return centroids, labels def _m0(prev: np.ndarray, new: np.ndarray) -> np.ndarray: d00 = np.sum((prev[0] - new[0]) ** 2) d11 = np.sum((prev[1] - new[1]) ** 2) d01 = np.sum((prev[0] - new[1]) ** 2) d10 = np.sum((prev[1] - new[0]) ** 2) if d00 + d11 <= d01 + d10: return new return np.stack([new[1], new[0]], axis=0) # ── OSNet team classification (turbo5-style): embed + aggregate by track + KMeans ── _USE_OSNET_TEAM = True # if True and osnet weights exist, use OSNet for team assignment OSNET_IMAGE_SIZE = (64, 32) # (height, width) OSNET_PREPROCESS = T.Compose([ T.Resize(OSNET_IMAGE_SIZE), T.ToTensor(), T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) def _crop_upper_body_bx(frame: ndarray, box: _Bx) -> ndarray: return frame[ max(0, box.y1) : max(0, box.y2), max(0, box.x1) : max(0, box.x2), ] def _preprocess_osnet(crop: ndarray) -> torch.Tensor: rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB) pil = Image.fromarray(rgb) return OSNET_PREPROCESS(pil) def _filter_player_boxes_bx(boxes: list[_Bx]) -> list[_Bx]: return [b for b in boxes if int(b.cls_id) == _C2] # OSNet architecture (from turbo5) class _ConvLayer(nn.Module): def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, groups=1, IN=False): super().__init__() self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride, padding=padding, bias=False, groups=groups) self.bn = nn.InstanceNorm2d(out_channels, affine=True) if IN else nn.BatchNorm2d(out_channels) self.relu = nn.ReLU() def forward(self, x): return self.relu(self.bn(self.conv(x))) class _Conv1x1(nn.Module): def __init__(self, in_channels, out_channels, stride=1, groups=1): super().__init__() self.conv = nn.Conv2d(in_channels, out_channels, 1, stride=stride, padding=0, bias=False, groups=groups) self.bn = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU() def forward(self, x): return self.relu(self.bn(self.conv(x))) class _Conv1x1Linear(nn.Module): def __init__(self, in_channels, out_channels, stride=1, bn=True): super().__init__() self.conv = nn.Conv2d(in_channels, out_channels, 1, stride=stride, padding=0, bias=False) self.bn = nn.BatchNorm2d(out_channels) if bn else None def forward(self, x): x = self.conv(x) return self.bn(x) if self.bn is not None else x class _Conv3x3(nn.Module): def __init__(self, in_channels, out_channels, stride=1, groups=1): super().__init__() self.conv = nn.Conv2d(in_channels, out_channels, 3, stride=stride, padding=1, bias=False, groups=groups) self.bn = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU() def forward(self, x): return self.relu(self.bn(self.conv(x))) class _LightConv3x3(nn.Module): def __init__(self, in_channels, out_channels): super().__init__() self.conv1 = nn.Conv2d(in_channels, out_channels, 1, stride=1, padding=0, bias=False) self.conv2 = nn.Conv2d(out_channels, out_channels, 3, stride=1, padding=1, bias=False, groups=out_channels) self.bn = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU() def forward(self, x): x = self.conv1(x) x = self.conv2(x) return self.relu(self.bn(x)) class _LightConvStream(nn.Module): def __init__(self, in_channels, out_channels, depth): super().__init__() layers = [_LightConv3x3(in_channels, out_channels)] for _ in range(depth - 1): layers.append(_LightConv3x3(out_channels, out_channels)) self.layers = nn.Sequential(*layers) def forward(self, x): return self.layers(x) class _ChannelGate(nn.Module): def __init__(self, in_channels, num_gates=None, return_gates=False, gate_activation="sigmoid", reduction=16, layer_norm=False): super().__init__() if num_gates is None: num_gates = in_channels self.return_gates = return_gates self.global_avgpool = nn.AdaptiveAvgPool2d(1) self.fc1 = nn.Conv2d(in_channels, in_channels // reduction, kernel_size=1, bias=True, padding=0) self.norm1 = nn.LayerNorm((in_channels // reduction, 1, 1)) if layer_norm else None self.relu = nn.ReLU() self.fc2 = nn.Conv2d(in_channels // reduction, num_gates, kernel_size=1, bias=True, padding=0) self.gate_activation = nn.Sigmoid() if gate_activation == "sigmoid" else nn.ReLU() def forward(self, x): inp = x x = self.global_avgpool(x) x = self.fc1(x) if self.norm1 is not None: x = self.norm1(x) x = self.relu(x) x = self.fc2(x) if self.gate_activation is not None: x = self.gate_activation(x) return x if self.return_gates else inp * x class _OSBlockX1(nn.Module): def __init__(self, in_channels, out_channels, IN=False, bottleneck_reduction=4): super().__init__() mid_channels = out_channels // bottleneck_reduction self.conv1 = _Conv1x1(in_channels, mid_channels) self.conv2a = _LightConv3x3(mid_channels, mid_channels) self.conv2b = nn.Sequential(_LightConv3x3(mid_channels, mid_channels), _LightConv3x3(mid_channels, mid_channels)) self.conv2c = nn.Sequential(_LightConv3x3(mid_channels, mid_channels), _LightConv3x3(mid_channels, mid_channels), _LightConv3x3(mid_channels, mid_channels)) self.conv2d = nn.Sequential(_LightConv3x3(mid_channels, mid_channels), _LightConv3x3(mid_channels, mid_channels), _LightConv3x3(mid_channels, mid_channels), _LightConv3x3(mid_channels, mid_channels)) self.gate = _ChannelGate(mid_channels) self.conv3 = _Conv1x1Linear(mid_channels, out_channels) self.downsample = _Conv1x1Linear(in_channels, out_channels) if in_channels != out_channels else None self.IN = nn.InstanceNorm2d(out_channels, affine=True) if IN else None def forward(self, x): identity = x x1 = self.conv1(x) x2 = self.gate(self.conv2a(x1)) + self.gate(self.conv2b(x1)) + self.gate(self.conv2c(x1)) + self.gate(self.conv2d(x1)) x3 = self.conv3(x2) if self.downsample is not None: identity = self.downsample(identity) out = x3 + identity if self.IN is not None: out = self.IN(out) return F.relu(out) class _OSNetX1(nn.Module): def __init__(self, num_classes, blocks, layers, channels, feature_dim=512, loss="softmax", IN=False): super().__init__() self.loss = loss self.feature_dim = feature_dim self.conv1 = _ConvLayer(3, channels[0], 7, stride=2, padding=3, IN=IN) self.maxpool = nn.MaxPool2d(3, stride=2, padding=1) self.conv2 = self._make_layer(blocks[0], layers[0], channels[0], channels[1], reduce_spatial_size=True, IN=IN) self.conv3 = self._make_layer(blocks[1], layers[1], channels[1], channels[2], reduce_spatial_size=True) self.conv4 = self._make_layer(blocks[2], layers[2], channels[2], channels[3], reduce_spatial_size=False) self.conv5 = _Conv1x1(channels[3], channels[3]) self.global_avgpool = nn.AdaptiveAvgPool2d(1) self.fc = self._construct_fc_layer(feature_dim, channels[3], dropout_p=None) self.classifier = nn.Linear(self.feature_dim, num_classes) self._init_params() def _make_layer(self, block, layer, in_channels, out_channels, reduce_spatial_size, IN=False): layers_list = [block(in_channels, out_channels, IN=IN)] for _ in range(1, layer): layers_list.append(block(out_channels, out_channels, IN=IN)) if reduce_spatial_size: layers_list.append(nn.Sequential(_Conv1x1(out_channels, out_channels), nn.AvgPool2d(2, stride=2))) return nn.Sequential(*layers_list) def _construct_fc_layer(self, fc_dims, input_dim, dropout_p=None): if fc_dims is None or fc_dims < 0: self.feature_dim = input_dim return None if isinstance(fc_dims, int): fc_dims = [fc_dims] layers_list = [] for dim in fc_dims: layers_list.append(nn.Linear(input_dim, dim)) layers_list.append(nn.BatchNorm1d(dim)) layers_list.append(nn.ReLU(inplace=True)) if dropout_p is not None: layers_list.append(nn.Dropout(p=dropout_p)) input_dim = dim self.feature_dim = fc_dims[-1] return nn.Sequential(*layers_list) def _init_params(self): for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu") if m.bias is not None: nn.init.constant_(m.bias, 0) elif isinstance(m, (nn.BatchNorm2d, nn.BatchNorm1d)): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) elif isinstance(m, nn.InstanceNorm2d): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) elif isinstance(m, nn.Linear): nn.init.normal_(m.weight, 0, 0.01) if m.bias is not None: nn.init.constant_(m.bias, 0) def forward(self, x, return_featuremaps=False): x = self.conv1(x) x = self.maxpool(x) x = self.conv2(x) x = self.conv3(x) x = self.conv4(x) x = self.conv5(x) if return_featuremaps: return x v = self.global_avgpool(x) v = v.view(v.size(0), -1) if self.fc is not None: v = self.fc(v) if not self.training: return v y = self.classifier(v) if self.loss == "softmax": return y elif self.loss == "triplet": return y, v raise KeyError(f"Unsupported loss: {self.loss}") def _osnet_x1_0(num_classes=1000, pretrained=True, loss="softmax", **kwargs): return _OSNetX1( num_classes, blocks=[_OSBlockX1, _OSBlockX1, _OSBlockX1], layers=[2, 2, 2], channels=[64, 256, 384, 512], loss=loss, **kwargs, ) def _load_checkpoint_osnet(fpath: str): fpath = os.path.abspath(os.path.expanduser(fpath)) map_location = None if torch.cuda.is_available() else "cpu" return torch.load(fpath, map_location=map_location, weights_only=False) def _load_pretrained_weights_osnet(model: nn.Module, weight_path: str) -> None: checkpoint = _load_checkpoint_osnet(weight_path) state_dict = checkpoint.get("state_dict", checkpoint) model_dict = model.state_dict() new_state_dict = OrderedDict() for k, v in state_dict.items(): if k.startswith("module."): k = k[7:] if k in model_dict and model_dict[k].size() == v.size(): new_state_dict[k] = v model_dict.update(new_state_dict) model.load_state_dict(model_dict) def _load_osnet(device: str = "cuda", weight_path: Optional[Path] = None) -> Optional[nn.Module]: model = _osnet_x1_0(num_classes=1, loss="softmax", pretrained=False) if weight_path and Path(weight_path).exists(): _load_pretrained_weights_osnet(model, str(weight_path)) model.eval() model.to(device) return model def _extract_osnet_embeddings( model: nn.Module, frames: list[ndarray], bboxes_by_frame: dict[int, list[_Bx]], track_ids_by_frame: dict[int, list[int | None]], frame_offset: int, device: str, ) -> tuple[Optional[ndarray], Optional[list[tuple[int, int, int | None]]]]: """Extract OSNet embeddings for player boxes; return (embeddings, meta) with meta = (frame_idx, box_idx, track_id).""" crops = [] meta: list[tuple[int, int, int | None]] = [] for fi in range(len(frames)): frame = frames[fi] if fi < len(frames) else None if frame is None: continue frame_id = frame_offset + fi boxes = bboxes_by_frame.get(frame_id, []) tids = track_ids_by_frame.get(frame_id, [None] * len(boxes)) for bi, box in enumerate(boxes): if int(box.cls_id) != _C2: continue track_id = tids[bi] if bi < len(tids) else None crop = _crop_upper_body_bx(frame, box) if crop.size == 0: continue crops.append(_preprocess_osnet(crop)) meta.append((fi, bi, track_id)) if not crops: return None, None batch = torch.stack(crops).to(device).float() with torch.inference_mode(): embeddings = model(batch) del batch embeddings = embeddings.cpu().numpy() return embeddings, meta def _aggregate_by_track_osnet( embeddings: ndarray, meta: list[tuple[int, int, int | None]], ) -> tuple[ndarray, list[tuple[int, int, int | None]]]: track_map: dict[int | None, list[int]] = defaultdict(list) meta_by_track: dict[int | None, tuple[int, int, int | None]] = {} for idx, (fi, bi, tid) in enumerate(meta): key = tid if tid is not None else id((fi, bi)) track_map[key].append(idx) meta_by_track[key] = (fi, bi, tid) agg_embeddings = [] agg_meta = [] for key, indices in track_map.items(): mean_emb = np.mean(embeddings[indices], axis=0) norm = np.linalg.norm(mean_emb) if norm > 1e-12: mean_emb /= norm agg_embeddings.append(mean_emb) agg_meta.append(meta_by_track[key]) return np.array(agg_embeddings), agg_meta def _classify_teams_osnet( agg_embeddings: ndarray, agg_meta: list[tuple[int, int, int | None]], ) -> dict[int | None, str]: """KMeans on aggregated embeddings; return track_id -> team_id '1' or '2'.""" n = len(agg_embeddings) track_to_team: dict[int | None, str] = {} if n == 0: return track_to_team if n == 1: track_to_team[agg_meta[0][2]] = "1" return track_to_team kmeans = KMeans(n_clusters=2, n_init=2, random_state=42) kmeans.fit(agg_embeddings) centroids = kmeans.cluster_centers_ c0, c1 = centroids[0], centroids[1] norm_0 = np.linalg.norm(c0) norm_1 = np.linalg.norm(c1) similarity = np.dot(c0, c1) / (norm_0 * norm_1 + 1e-12) if similarity > 0.95: for (_, _, tid) in agg_meta: track_to_team[tid] = "1" return track_to_team if norm_0 <= norm_1: kmeans.labels_ = 1 - kmeans.labels_ for (fi, bi, tid), label in zip(agg_meta, kmeans.labels_): track_to_team[tid] = "1" if label == 0 else "2" return track_to_team class _Pl: def __init__(self, repo_root: Path) -> None: self.repo_root = Path(repo_root) self._executor = ThreadPoolExecutor(max_workers=3) self._track_id_to_team_votes: dict[int, dict[str, int]] = {} self._track_id_to_class_votes: dict[int, dict[int, int]] = {} self._osnet_model: Optional[nn.Module] = None self._osnet_device = "cuda" if torch.cuda.is_available() else "cpu" if _USE_OSNET_TEAM: _osnet_path = self.repo_root / "models" / "osnet_model.pth.tar-100" if _osnet_path.exists(): try: self._osnet_model = _load_osnet(self._osnet_device, _osnet_path) except Exception: self._osnet_model = None self._tracker_config = "botsort.yaml" models_dir = self.repo_root / "models" if _B2: self.ball_model = YOLO(str(models_dir / "ball-detection-model.onnx"), task="detect") else: self.ball_model = None self.person_model = YOLO(str(models_dir / "person-detection-model.onnx"), task="detect") self._person_tracker_state: dict[int, tuple[tuple[float, float, float, float], tuple[float, float, float, float], int]] = {} self._person_tracker_next_id = 0 self._keypoint_model_hrnet = None _yaml_path = self.repo_root / "hrnetv2_w48.yaml" _weights_path = self.repo_root / "models" / "keypoint" if _f0 and _yaml_path.exists() and _weights_path.exists(): try: self._keypoint_model_hrnet = _l0( self.repo_root, weights_subdir="models" ) except Exception: self._keypoint_model_hrnet = None self._current_batch_bbox_timings: list[tuple[str, float]] = [] self._current_batch_kp_timings: list[tuple[str, float]] = [] self._prev_batch_tail_tid_counts: dict[int, int] = {} def reset_for_new_video(self) -> None: self._track_id_to_team_votes.clear() self._track_id_to_class_votes.clear() self._prev_batch_tail_tid_counts.clear() self._person_tracker_state.clear() self._person_tracker_next_id = 0 def _keypoint_hrnet_task( self, images: list[ndarray], offset: int, n_keypoints: int, ) -> dict[int, list[list[float]]]: _kp_timings: list[tuple[str, float]] = [] t_total = time.perf_counter() default_kps = [[0.0, 0.0] for _ in range(n_keypoints)] if not _f0 or self._keypoint_model_hrnet is None: self._current_batch_kp_timings = [] return {offset + i: list(default_kps) for i in range(len(images))} device = "cuda" if next(self._keypoint_model_hrnet.parameters()).is_cuda else "cpu" kp_threshold = 0.2 _t = time.perf_counter() kp_result = _x0( images, self._keypoint_model_hrnet, kp_threshold, device, batch_size=_KP_BS ) _kp_timings.append(("kp_hrnet", time.perf_counter() - _t)) _t = time.perf_counter() h, w = images[0].shape[:2] if n_keypoints == 32: keypoints_xyp = _normalize_keypoints_xyp(kp_result, images, n_keypoints) if _FKP_FAST_MODE: job = _fkp_normalize_results(keypoints_xyp, _FKP_SINGLE_THRESHOLD) keypoints = [] for idx in range(len(images)): kps = _fix_keypoints(job[idx] if idx < len(job) else [(0, 0)] * 32, n_keypoints) adjusted = _step8_one_frame_kp(kps, w, h, False, n_keypoints) keypoints.append(_keypoints_to_float(adjusted if adjusted is not None else kps)) else: job = _fkp_normalize_results(keypoints_xyp, _FKP_SINGLE_THRESHOLD) keypoints = [] for idx in range(len(images)): kps = _fix_keypoints(job[idx] if idx < len(job) else [(0, 0)] * 32, n_keypoints) kps_float = _keypoints_to_float(kps) try: refined = _apply_homography_refinement(kps_float, images[idx], n_keypoints) keypoints.append(refined) except Exception: keypoints.append(kps_float) else: keypoints = _n0(kp_result, images, n_keypoints) keypoints = [_fix_keypoints(kps, n_keypoints) for kps in keypoints] keypoints = [_keypoints_to_float(kps) for kps in keypoints] _kp_timings.append(("kp_normalize", time.perf_counter() - _t)) _t = time.perf_counter() out: dict[int, list[list[float]]] = {} for i, kpts in enumerate(keypoints): out[offset + i] = _c1(kpts) _kp_timings.append(("kp_to_output", time.perf_counter() - _t)) _kp_timings.append(("kp_total", time.perf_counter() - t_total)) self._current_batch_kp_timings = _kp_timings return out def _bbox_task( self, images: list[ndarray], offset: int, imgsz: int, conf: float, onnx_batch_size: int, ) -> dict[int, list[_Bx]]: _bbox_timings: list[tuple[str, float]] = [] _t0 = time.perf_counter() ball_res: list = [] if _B2 and self.ball_model is not None: _t = time.perf_counter() for start in range(0, len(images), onnx_batch_size): chunk = images[start : start + onnx_batch_size] batch_res = self.ball_model.predict(chunk, imgsz=imgsz, conf=conf, verbose=False) ball_res.extend(batch_res if batch_res else []) _bbox_timings.append(("bbox_ball_detect", time.perf_counter() - _t)) _t = time.perf_counter() batch_res = self.person_model(images, imgsz=_D0_PERSON, conf=conf, iou=0.5, agnostic_nms=True, verbose=False) if not isinstance(batch_res, list): batch_res = [batch_res] if batch_res is not None else [] self._person_tracker_state, self._person_tracker_next_id, person_track_ids = _assign_person_track_ids( self._person_tracker_state, self._person_tracker_next_id, batch_res, _TRACK_IOU_THRESH ) person_res = batch_res _bbox_timings.append(("bbox_person_track", time.perf_counter() - _t)) bboxes_by_frame: dict[int, list[_Bx]] = {} track_ids_by_frame: dict[int, list[int | None]] = {} boxes_raw_list: list[list[_Bx]] = [] track_ids_raw_list: list[list[int | None]] = [] bbox_to_track_list: list[dict[tuple[int, int, int, int], int]] = [] _t = time.perf_counter() for i, frame in enumerate(images): frame_id = offset + i boxes_raw = [] track_ids_raw: list[int | None] = [] bbox_to_track: dict[tuple[int, int, int, int], int] = {} if _B2: det_ball = ball_res[i] if i < len(ball_res) else None if det_ball is not None and getattr(det_ball, "boxes", None) is not None and len(det_ball.boxes) > 0: b = det_ball.boxes xyxy = b.xyxy.cpu().numpy() confs = b.conf.cpu().numpy() if b.conf is not None else np.ones(len(xyxy), dtype=np.float32) clss = b.cls.cpu().numpy().astype(int) if b.cls is not None else np.zeros(len(xyxy), dtype=np.int32) for (x1, y1, x2, y2), c, cf in zip(xyxy, clss, confs): if int(c) == 0: boxes_raw.append(_Bx(x1=int(round(x1)), y1=int(round(y1)), x2=int(round(x2)), y2=int(round(y2)), cls_id=_C0, conf=float(cf))) track_ids_raw.append(None) det_p = person_res[i] if i < len(person_res) else None if det_p is not None and getattr(det_p, "boxes", None) is not None and len(det_p.boxes) > 0: b = det_p.boxes xyxy = b.xyxy.cpu().numpy() confs = b.conf.cpu().numpy() if b.conf is not None else np.ones(len(xyxy), dtype=np.float32) clss = b.cls.cpu().numpy().astype(int) if b.cls is not None else np.zeros(len(xyxy), dtype=np.int32) if i < len(person_track_ids) and len(person_track_ids[i]) == len(clss): track_ids = np.array(person_track_ids[i], dtype=np.int32) else: track_ids = np.full(len(clss), -1, dtype=np.int32) for (x1, y1, x2, y2), c, cf, tid in zip(xyxy, clss, confs, track_ids): c = int(c) tid = int(tid) x1r, y1r, x2r, y2r = int(round(x1)), int(round(y1)), int(round(x2)), int(round(y2)) if tid >= 0: bbox_to_track[(x1r, y1r, x2r, y2r)] = tid tid_out = tid if tid >= 0 else None if c == 0: boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C2, conf=float(cf))) track_ids_raw.append(tid_out) elif c == 1: boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C3, conf=float(cf))) track_ids_raw.append(tid_out) elif c == 2: boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C1, conf=float(cf))) track_ids_raw.append(tid_out) boxes_raw_list.append(boxes_raw) track_ids_raw_list.append(track_ids_raw) bbox_to_track_list.append(bbox_to_track) _bbox_timings.append(("bbox_parse_ball_person", time.perf_counter() - _t)) for i in range(len(images)): bboxes_by_frame[offset + i] = boxes_raw_list[i] track_ids_by_frame[offset + i] = track_ids_raw_list[i] if i < len(track_ids_raw_list) else [None] * len(boxes_raw_list[i]) if _G0 and len(images) > _G2: _t = time.perf_counter() tid_counts: dict[int, int] = {} tid_first_frame: dict[int, int] = {} for fid in range(offset, offset + len(images)): tids = track_ids_by_frame.get(fid, []) for tid in tids: if tid is not None and tid >= 0: t = int(tid) tid_counts[t] = tid_counts.get(t, 0) + 1 if t not in tid_first_frame or fid < tid_first_frame[t]: tid_first_frame[t] = fid for t, prev_count in self._prev_batch_tail_tid_counts.items(): tid_counts[t] = tid_counts.get(t, 0) + prev_count if prev_count > 0: tid_first_frame[t] = offset + len(images) boundary = offset + len(images) - _G2 noise_tids = { t for t, count in tid_counts.items() if count < _G1 and tid_first_frame[t] < boundary } for fid in range(offset, offset + len(images)): boxes = bboxes_by_frame.get(fid, []) tids = track_ids_by_frame.get(fid, [None] * len(boxes)) if len(tids) != len(boxes): tids = tids + [None] * (len(boxes) - len(tids)) keep = [ i for i in range(len(boxes)) if tids[i] is None or int(tids[i]) not in noise_tids ] bboxes_by_frame[fid] = [boxes[i] for i in keep] track_ids_by_frame[fid] = [tids[i] for i in keep] tail_start = offset + len(images) - _G2 self._prev_batch_tail_tid_counts = {} for fid in range(tail_start, offset + len(images)): tids = track_ids_by_frame.get(fid, []) for tid in tids: if tid is not None and tid >= 0: t = int(tid) self._prev_batch_tail_tid_counts[t] = self._prev_batch_tail_tid_counts.get(t, 0) + 1 _bbox_timings.append(("bbox_noise_filter", time.perf_counter() - _t)) _t = time.perf_counter() for i, frame in enumerate(images): frame_id = offset + i boxes_raw = bboxes_by_frame[frame_id] track_ids_raw = track_ids_by_frame[frame_id] bbox_to_track = {(int(bb.x1), int(bb.y1), int(bb.x2), int(bb.y2)): int(tid) for bb, tid in zip(boxes_raw, track_ids_raw) if tid is not None and int(tid) >= 0} boxes_stabilized = [] track_ids_stabilized: list[int | None] = [] for idx, bb in enumerate(boxes_raw): best_tid = -1 best_iou = 0.0 for (bx1, by1, bx2, by2), tid in bbox_to_track.items(): iou = _i1(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=0, conf=0.0), _Bx(x1=bx1, y1=by1, x2=bx2, y2=by2, cls_id=0, conf=0.0)) if iou > best_iou and iou > 0.5: best_iou, best_tid = iou, tid tid_out = best_tid if best_tid >= 0 else (track_ids_raw[idx] if idx < len(track_ids_raw) else None) if best_tid >= 0: if _G5: if best_tid not in self._track_id_to_class_votes: self._track_id_to_class_votes[best_tid] = {} cls_key = int(bb.cls_id) self._track_id_to_class_votes[best_tid][cls_key] = self._track_id_to_class_votes[best_tid].get(cls_key, 0) + 1 boxes_stabilized.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=bb.cls_id, conf=bb.conf, team_id=None)) track_ids_stabilized.append(tid_out) else: boxes_stabilized.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=bb.cls_id, conf=bb.conf, team_id=None)) track_ids_stabilized.append(tid_out) bboxes_by_frame[frame_id] = boxes_stabilized track_ids_by_frame[frame_id] = track_ids_stabilized _bbox_timings.append(("bbox_stabilize_track_ids", time.perf_counter() - _t)) _t = time.perf_counter() for fid in range(offset, offset + len(images)): new_boxes = [] tids_fid = track_ids_by_frame.get(fid, [None] * len(bboxes_by_frame[fid])) for box_idx, box in enumerate(bboxes_by_frame[fid]): tid = tids_fid[box_idx] if box_idx < len(tids_fid) else None if _G5 and tid is not None and tid >= 0 and tid in self._track_id_to_class_votes: votes = self._track_id_to_class_votes[tid] ref_votes = votes.get(_C3, 0) gk_votes = votes.get(_C1, 0) if _G6 and ref_votes > _G3: majority_cls = _C3 elif _G7 and gk_votes > _G3: majority_cls = _C1 else: majority_cls = max(votes.items(), key=lambda x: x[1])[0] new_boxes.append(_Bx(x1=box.x1, y1=box.y1, x2=box.x2, y2=box.y2, cls_id=majority_cls, conf=box.conf, team_id=None)) else: new_boxes.append(box) bboxes_by_frame[fid] = new_boxes track_ids_by_frame[fid] = tids_fid _bbox_timings.append(("bbox_class_votes", time.perf_counter() - _t)) if _B5 and len(images) > 1: _t = time.perf_counter() track_to_frames: dict[int, list[tuple[int, _Bx]]] = {} for fid in range(offset, offset + len(images)): boxes = bboxes_by_frame.get(fid, []) tids = track_ids_by_frame.get(fid, [None] * len(boxes)) for bb, tid in zip(boxes, tids): if tid is not None and int(tid) >= 0: t = int(tid) track_to_frames.setdefault(t, []).append((fid, bb)) to_add: dict[int, list[tuple[_Bx, int]]] = {} for t, pairs in track_to_frames.items(): pairs.sort(key=lambda p: p[0]) for i in range(len(pairs) - 1): f1, b1 = pairs[i] f2, b2 = pairs[i + 1] if f2 - f1 <= 1: continue for g in range(f1 + 1, f2): w = (g - f1) / (f2 - f1) x1 = int(round((1 - w) * b1.x1 + w * b2.x1)) y1 = int(round((1 - w) * b1.y1 + w * b2.y1)) x2 = int(round((1 - w) * b1.x2 + w * b2.x2)) y2 = int(round((1 - w) * b1.y2 + w * b2.y2)) interp = _Bx(x1=x1, y1=y1, x2=x2, y2=y2, cls_id=b2.cls_id, conf=b2.conf, team_id=b2.team_id) to_add.setdefault(g, []).append((interp, t)) for g, add_list in to_add.items(): bboxes_by_frame[g] = list(bboxes_by_frame.get(g, [])) track_ids_by_frame[g] = list(track_ids_by_frame.get(g, [])) for interp_box, tid in add_list: bboxes_by_frame[g].append(interp_box) track_ids_by_frame[g].append(tid) _bbox_timings.append(("bbox_interp_gaps", time.perf_counter() - _t)) reid_team_per_frame: list[list[Optional[str]]] = [[None] * len(bboxes_by_frame[offset + fi]) for fi in range(len(images))] if self._osnet_model is not None: _t_reid_total = time.perf_counter() emb, meta = _extract_osnet_embeddings( self._osnet_model, images, bboxes_by_frame, track_ids_by_frame, offset, self._osnet_device ) if emb is not None and meta is not None: agg_emb, agg_meta = _aggregate_by_track_osnet(emb, meta) track_to_team = _classify_teams_osnet(agg_emb, agg_meta) for fi in range(len(images)): frame_id = offset + fi boxes_f = bboxes_by_frame.get(frame_id, []) tids_f = track_ids_by_frame.get(frame_id, []) for bi in range(len(boxes_f)): tid = tids_f[bi] if bi < len(tids_f) else None if tid in track_to_team and bi < len(reid_team_per_frame[fi]): reid_team_per_frame[fi][bi] = track_to_team[tid] _bbox_timings.append(("bbox_reid_team", time.perf_counter() - _t_reid_total)) _t = time.perf_counter() for i in range(len(images)): frame_id = offset + i boxes = bboxes_by_frame[frame_id] tids_fid = track_ids_by_frame[frame_id] for box_idx, bb in enumerate(boxes): tid = tids_fid[box_idx] if box_idx < len(tids_fid) else None team_from_reid = reid_team_per_frame[i][box_idx] if box_idx < len(reid_team_per_frame[i]) else None if _G8 and tid is not None and tid >= 0 and team_from_reid: if tid not in self._track_id_to_team_votes: self._track_id_to_team_votes[tid] = {} team_key = team_from_reid.strip() self._track_id_to_team_votes[tid][team_key] = self._track_id_to_team_votes[tid].get(team_key, 0) + 1 for fid in range(offset, offset + len(images)): new_boxes = [] tids_fid = track_ids_by_frame.get(fid, [None] * len(bboxes_by_frame[fid])) fi = fid - offset for box_idx, box in enumerate(bboxes_by_frame[fid]): tid = tids_fid[box_idx] if box_idx < len(tids_fid) else None team_from_reid = reid_team_per_frame[fi][box_idx] if fi < len(reid_team_per_frame) and box_idx < len(reid_team_per_frame[fi]) else None default_team = team_from_reid or box.team_id if _G8 and tid is not None and tid >= 0 and tid in self._track_id_to_team_votes and self._track_id_to_team_votes[tid]: majority_team = max(self._track_id_to_team_votes[tid].items(), key=lambda x: x[1])[0] else: majority_team = default_team new_boxes.append(_Bx(x1=box.x1, y1=box.y1, x2=box.x2, y2=box.y2, cls_id=box.cls_id, conf=box.conf, team_id=majority_team)) bboxes_by_frame[fid] = new_boxes track_ids_by_frame[fid] = tids_fid _bbox_timings.append(("bbox_team_votes", time.perf_counter() - _t)) if len(images) > 0: _t = time.perf_counter() H, W = images[0].shape[:2] for fid in range(offset, offset + len(images)): orig_boxes = bboxes_by_frame[fid] orig_tids = track_ids_by_frame.get(fid, [None] * len(orig_boxes)) adjusted = _a0( orig_boxes, frame_width=W, frame_height=H, do_goalkeeper_dedup=_B3, do_referee_disambiguation=_B4, do_ball_dedup=_B1, ) adjusted_tids: list[int | None] = [] used_orig = set() for ab in adjusted: matched = None for oi, ob in enumerate(orig_boxes): if oi in used_orig: continue if ob.x1 == ab.x1 and ob.y1 == ab.y1 and ob.x2 == ab.x2 and ob.y2 == ab.y2: matched = orig_tids[oi] if oi < len(orig_tids) else None used_orig.add(oi) break adjusted_tids.append(matched) if _B0 > 0: new_adjusted = [] new_adjusted_tids = [] for ab, tid in zip(adjusted, adjusted_tids): if int(ab.cls_id) == _C0 and float(ab.conf) < _B0: continue new_adjusted.append(ab) new_adjusted_tids.append(tid) adjusted = new_adjusted adjusted_tids = new_adjusted_tids if _q0 != 0.0 or _q1 != 0.0: boxes_offset = [] offset_tids = [] for ab_idx, bb in enumerate(adjusted): cx = 0.5 * (bb.x1 + bb.x2) cy = 0.5 * (bb.y1 + bb.y2) w = bb.x2 - bb.x1 h = bb.y2 - bb.y1 cx *= 1.0 + _q0 cy *= 1.0 + _q1 boxes_offset.append(_Bx(x1=int(round(cx - w/2)), y1=int(round(cy - h/2)), x2=int(round(cx + w/2)), y2=int(round(cy + h/2)), cls_id=bb.cls_id, conf=bb.conf, team_id=bb.team_id)) offset_tids.append(adjusted_tids[ab_idx] if ab_idx < len(adjusted_tids) else None) adjusted = boxes_offset adjusted_tids = offset_tids bboxes_by_frame[fid] = adjusted track_ids_by_frame[fid] = adjusted_tids _bbox_timings.append(("bbox_adjust_boxes", time.perf_counter() - _t)) if _A0 and _S0 > 1 and len(images) > 0: _t = time.perf_counter() _tmp_results = [] for fid in range(offset, offset + len(images)): _boxes = bboxes_by_frame.get(fid, []) _tmp_results.append( _FRes( frame_id=fid, boxes=[{"x1": int(b.x1), "y1": int(b.y1), "x2": int(b.x2), "y2": int(b.y2), "cls_id": int(b.cls_id), "conf": round(float(b.conf), 2), "team_id": b.team_id} for b in _boxes], keypoints=[], ) ) _tmp_results = _s0(_tmp_results, window=_S0, tids_by_frame=track_ids_by_frame) for r in _tmp_results: bboxes_by_frame[int(r.frame_id)] = [_Bx(**box) for box in r.boxes] _bbox_timings.append(("bbox_smoothing", time.perf_counter() - _t)) _bbox_timings.append(("bbox_total", time.perf_counter() - _t0)) self._current_batch_bbox_timings = _bbox_timings return bboxes_by_frame def predict_batch( self, batch_images: list[ndarray], offset: int, n_keypoints: int, ) -> list[_FRes]: if not batch_images: return [] if offset == 0: self.reset_for_new_video() gc.collect() try: import torch if torch.cuda.is_available(): torch.cuda.empty_cache() except Exception: pass images = list(batch_images) n_frames = len(images) imgsz = _D0 conf = _D1 executor = self._executor default_kps = [[0.0, 0.0] for _ in range(n_keypoints)] if _E0 and _E1 and _P0: future_bbox = executor.submit(self._bbox_task, images, offset, imgsz, conf, _BX_BS) future_kp = executor.submit(self._keypoint_hrnet_task, images, offset, n_keypoints) bboxes_by_frame = future_bbox.result() keypoints_by_frame = future_kp.result() elif _E0 and _E1: bboxes_by_frame = self._bbox_task(images, offset, imgsz, conf, _BX_BS) keypoints_by_frame = self._keypoint_hrnet_task(images, offset, n_keypoints) else: if _E0: bboxes_by_frame = self._bbox_task(images, offset, imgsz, conf, _BX_BS) else: bboxes_by_frame = {offset + i: [] for i in range(len(images))} self._current_batch_bbox_timings = [] if _E1: keypoints_by_frame = self._keypoint_hrnet_task(images, offset, n_keypoints) else: keypoints_by_frame = {offset + i: list(default_kps) for i in range(len(images))} self._current_batch_kp_timings = [] if _STEP0_ENABLED and keypoints_by_frame: _t = time.perf_counter() for fid in list(keypoints_by_frame.keys()): kps = keypoints_by_frame[fid] if isinstance(kps, list) and len(kps) == _N0: _step0_remove_close_keypoints(kps, _STEP0_PROXIMITY_PX) self._current_batch_kp_timings.append(("kp_step0_remove_close", time.perf_counter() - _t)) if _U0 and _E1 and keypoints_by_frame and n_keypoints == 32 and _N0 == 32: template_img: ndarray | None = getattr(self, "_kp_template_cache", None) if template_img is None: template_img = _y0() if template_img.size > 0 and template_img.sum() > 0: self._kp_template_cache = template_img else: template_img = None _t = time.perf_counter() for idx in range(len(images)): frame_id = offset + idx kps = keypoints_by_frame.get(frame_id) if not kps or len(kps) != 32: continue frame = images[idx] frame_height, frame_width = frame.shape[:2] if template_img is not None: step5_out = _z0(kps, frame, template_img) if step5_out is not None: keypoints_by_frame[frame_id] = step5_out if template_img is not None and _J1: _z8(keypoints_by_frame, images, offset, template_img) self._current_batch_kp_timings.append(("kp_homography", time.perf_counter() - _t)) if _J4: _t = time.perf_counter() for idx in range(len(images)): frame_id = offset + idx kps = keypoints_by_frame.get(frame_id) if not kps or len(kps) != 32: continue frame = images[idx] frame_height, frame_width = frame.shape[:2] adjusted = _z1(kps, frame_width, frame_height, _J0) if adjusted is not None: keypoints_by_frame[frame_id] = adjusted self._current_batch_kp_timings.append(("kp_adjust", time.perf_counter() - _t)) results = [] for idx in range(len(images)): frame_number = offset + idx kps = keypoints_by_frame.get(frame_number, [[0.0, 0.0] for _ in range(n_keypoints)]) if len(kps) != n_keypoints: kps = (kps[:n_keypoints] if len(kps) >= n_keypoints else kps + [[0.0, 0.0]] * (n_keypoints - len(kps))) kps = [[round(float(kp[0]), 1), round(float(kp[1]), 1)] for kp in kps] boxes_raw = bboxes_by_frame.get(frame_number, []) boxes_for_result = [ { "x1": int(b.x1), "y1": int(b.y1), "x2": int(b.x2), "y2": int(b.y2), "cls_id": _CLS_TO_VALIDATOR.get(int(b.cls_id), int(b.cls_id)), "conf": round(float(b.conf), 2), "team_id": b.team_id, } for b in boxes_raw ] results.append(_FRes(frame_id=frame_number, boxes=boxes_for_result, keypoints=kps)) return results class _M: def __init__(self, path_hf_repo: Path) -> None: self.health = "Okay!!!" self.pipeline: _Pl | None = None self.path_hf_repo = Path(path_hf_repo) def __repr__(self) -> str: return self.health def predict_batch( self, batch_images: list[ndarray], offset: int, n_keypoints: int, ) -> list[_FRes]: if self.pipeline is None: self.pipeline = _Pl(repo_root=self.path_hf_repo) return self.pipeline.predict_batch(batch_images, offset, n_keypoints) Miner = _M