| from __future__ import annotations |
|
|
| import gc |
| import math |
| import os |
| import threading |
| import time |
| from itertools import combinations |
| from pathlib import Path |
| from concurrent.futures import ThreadPoolExecutor |
| from collections import OrderedDict, defaultdict |
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
|
| import cv2 |
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import yaml |
| from numpy import ndarray |
| from PIL import Image |
| import torchvision.transforms as T |
| from sklearn.cluster import KMeans |
| from pydantic import BaseModel |
| from ultralytics import YOLO |
|
|
| try: |
| from scipy.optimize import linear_sum_assignment as _linear_sum_assignment |
| except ImportError: |
| _linear_sum_assignment = None |
|
|
_f0 = True  # module-level flag; not referenced anywhere in this chunk — TODO confirm usage elsewhere
BatchNorm2d = nn.BatchNorm2d  # alias so the norm layer used by the backbone can be swapped in one place
_v0 = 0.1  # BatchNorm momentum shared across the whole backbone
|
|
|
|
def _c0(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """Build a 3x3 convolution with padding 1 and no bias."""
    return nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=False,
    )
|
|
|
|
class _B0(nn.Module):
    """Basic residual block: two 3x3 convs with BN and a skip connection."""

    expansion = 1

    def __init__(self, inplanes: int, planes: int, stride: int = 1, downsample: Any = None):
        super().__init__()
        # First conv may stride; second always keeps resolution.
        self.conv1 = _c0(inplanes, planes, stride)
        self.bn1 = BatchNorm2d(planes, momentum=_v0)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = _c0(planes, planes)
        self.bn2 = BatchNorm2d(planes, momentum=_v0)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Conv path plus (optionally projected) identity shortcut."""
        shortcut = x if self.downsample is None else self.downsample(x)
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += shortcut
        return self.relu(out)
|
|
|
|
class _B1(nn.Module):
    """Bottleneck residual block: 1x1 reduce, 3x3 (maybe strided), 1x1 expand x4."""

    expansion = 4

    def __init__(self, inplanes: int, planes: int, stride: int = 1, downsample: Any = None):
        super().__init__()
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = BatchNorm2d(planes, momentum=_v0)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = BatchNorm2d(planes, momentum=_v0)
        self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, bias=False)
        self.bn3 = BatchNorm2d(planes * self.expansion, momentum=_v0)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Bottleneck conv path plus (optionally projected) identity shortcut."""
        shortcut = x if self.downsample is None else self.downsample(x)
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += shortcut
        return self.relu(out)
|
|
|
|
# Registry mapping config block names to residual block classes.
_d0 = {"BASIC": _B0, "BOTTLENECK": _B1}
|
|
|
|
def _block_from_cfg(block_key: Any) -> type:
    """Resolve a config block spec (bool, string, or falsy) to a block class.

    Booleans select BOTTLENECK (True) / BASIC (False); strings match
    case-insensitively; anything unknown or falsy falls back to BASIC.
    """
    if isinstance(block_key, bool):
        return _d0["BOTTLENECK" if block_key else "BASIC"]
    key = "BASIC" if not block_key else str(block_key).upper()
    return _d0.get(key, _d0["BASIC"])
|
|
|
|
class _H0(nn.Module):
    """High-resolution multi-branch module (HRNet-style).

    Runs ``num_branches`` parallel residual branches at different
    resolutions, then fuses them: coarser features are 1x1-conv'd (and
    upsampled in ``forward``), finer features are strided-conv'd down.

    NOTE: ``num_inchannels`` is mutated in place by ``_make_one_branch``,
    so branch construction order matters and the caller's list is updated.
    """

    def __init__(self, num_branches: int, blocks: type, num_blocks: list, num_inchannels: list, num_channels: list, fuse_method: str, multi_scale_output: bool = True):
        super().__init__()
        # Legacy configs may encode the block type as a bool (True = bottleneck).
        if isinstance(blocks, bool):
            blocks = _d0["BOTTLENECK"] if blocks else _d0["BASIC"]
        self._check_branches(num_branches, blocks, num_blocks, num_inchannels, num_channels)
        self.num_inchannels = num_inchannels
        self.fuse_method = fuse_method  # stored but not read in this chunk
        self.num_branches = num_branches
        self.multi_scale_output = multi_scale_output
        self.branches = self._make_branches(num_branches, blocks, num_blocks, num_channels)
        self.fuse_layers = self._make_fuse_layers()
        self.relu = nn.ReLU(inplace=True)

    def _check_branches(self, num_branches: int, blocks: type, num_blocks: list, num_inchannels: list, num_channels: list) -> None:
        """Validate that all per-branch config lists match ``num_branches``."""
        if num_branches != len(num_blocks):
            raise ValueError("NUM_BRANCHES <> NUM_BLOCKS")
        if num_branches != len(num_channels):
            raise ValueError("NUM_BRANCHES <> NUM_CHANNELS")
        if num_branches != len(num_inchannels):
            raise ValueError("NUM_BRANCHES <> NUM_INCHANNELS")

    def _make_one_branch(self, branch_index: int, block: type, num_blocks: list, num_channels: list, stride: int = 1) -> nn.Sequential:
        """Build one residual branch; updates ``self.num_inchannels`` in place."""
        if isinstance(block, bool):
            block = _d0["BOTTLENECK"] if block else _d0["BASIC"]
        downsample = None
        # A projection shortcut is needed when stride or channel count changes.
        if stride != 1 or self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.num_inchannels[branch_index], num_channels[branch_index] * block.expansion, kernel_size=1, stride=stride, bias=False),
                BatchNorm2d(num_channels[branch_index] * block.expansion, momentum=_v0),
            )
        layers = [block(self.num_inchannels[branch_index], num_channels[branch_index], stride, downsample)]
        self.num_inchannels[branch_index] = num_channels[branch_index] * block.expansion
        for _ in range(1, num_blocks[branch_index]):
            layers.append(block(self.num_inchannels[branch_index], num_channels[branch_index]))
        return nn.Sequential(*layers)

    def _make_branches(self, num_branches: int, block: type, num_blocks: list, num_channels: list) -> nn.ModuleList:
        """Build all parallel branches."""
        return nn.ModuleList([self._make_one_branch(i, block, num_blocks, num_channels) for i in range(num_branches)])

    def _make_fuse_layers(self) -> nn.ModuleList | None:
        """Build cross-resolution fusion layers (None for a single branch).

        For output branch ``i`` and input branch ``j``:
          * j > i: 1x1 conv + BN (spatial upsampling happens in ``forward``),
          * j == i: identity (stored as None),
          * j < i: chain of stride-2 3x3 convs to downsample.
        """
        if self.num_branches == 1:
            return None
        num_branches = self.num_branches
        num_inchannels = self.num_inchannels
        fuse_layers = []
        for i in range(num_branches if self.multi_scale_output else 1):
            fuse_layer = []
            for j in range(num_branches):
                if j > i:
                    fuse_layer.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[i], 1, 1, 0, bias=False), BatchNorm2d(num_inchannels[i], momentum=_v0)))
                elif j == i:
                    fuse_layer.append(None)
                else:
                    conv3x3s = []
                    for k in range(i - j):
                        # Only the last conv of the chain changes the channel count.
                        if k == i - j - 1:
                            conv3x3s.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[i], 3, 2, 1, bias=False), BatchNorm2d(num_inchannels[i], momentum=_v0)))
                        else:
                            conv3x3s.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[j], 3, 2, 1, bias=False), BatchNorm2d(num_inchannels[j], momentum=_v0), nn.ReLU(inplace=True)))
                    fuse_layer.append(nn.Sequential(*conv3x3s))
            fuse_layers.append(nn.ModuleList(fuse_layer))
        return nn.ModuleList(fuse_layers)

    def get_num_inchannels(self) -> list:
        """Return the (possibly updated) per-branch output channel counts."""
        return self.num_inchannels

    def forward(self, x: list) -> list:
        """Run branches then fuse. NOTE: mutates the input list ``x`` in place."""
        if self.num_branches == 1:
            return [self.branches[0](x[0])]
        for i in range(self.num_branches):
            x[i] = self.branches[i](x[i])
        x_fuse = []
        for i in range(len(self.fuse_layers)):
            y = x[0] if i == 0 else self.fuse_layers[i][0](x[0])
            for j in range(1, self.num_branches):
                if i == j:
                    y = y + x[j]
                elif j > i:
                    # Coarser features: channel-match with 1x1 conv then
                    # bilinear-upsample to branch i's spatial size
                    # (align_corners left at its default here).
                    y = y + F.interpolate(self.fuse_layers[i][j](x[j]), size=[x[i].shape[2], x[i].shape[3]], mode="bilinear")
                else:
                    y = y + self.fuse_layers[i][j](x[j])
            x_fuse.append(self.relu(y))
        return x_fuse
|
|
|
|
class _H1(nn.Module):
    """HRNet-style keypoint network with a stem skip connection.

    The head upsamples the concatenated multi-scale features by 2x, appends
    the raw conv1 activations (``x_skip``), and predicts
    ``config["MODEL"]["NUM_JOINTS"]`` maps, normalized with a per-pixel
    softmax across joints — or a per-map sigmoid when ``lines=True``.
    """

    def __init__(self, config: dict, lines: bool = False, **kwargs: Any) -> None:
        # Plain (non-module) attributes may be assigned before super().__init__().
        self.inplanes = 64
        self.lines = lines
        extra = config["MODEL"]["EXTRA"]
        super().__init__()
        # Stem: two stride-2 3x3 convs -> 1/4 input resolution, 64 channels.
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn1 = BatchNorm2d(self.inplanes, momentum=_v0)
        self.conv2 = nn.Conv2d(self.inplanes, self.inplanes, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn2 = BatchNorm2d(self.inplanes, momentum=_v0)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(_B1, 64, 64, 4)  # bottlenecks -> 256 output channels
        self.stage2_cfg = extra["STAGE2"]
        num_channels = [extra["STAGE2"]["NUM_CHANNELS"][i] * _block_from_cfg(extra["STAGE2"]["BLOCK"]).expansion for i in range(len(extra["STAGE2"]["NUM_CHANNELS"]))]
        self.transition1 = self._make_transition_layer([256], num_channels)
        self.stage2, pre_stage_channels = self._make_stage(self.stage2_cfg, num_channels)
        self.stage3_cfg = extra["STAGE3"]
        num_channels = [extra["STAGE3"]["NUM_CHANNELS"][i] * _block_from_cfg(extra["STAGE3"]["BLOCK"]).expansion for i in range(len(extra["STAGE3"]["NUM_CHANNELS"]))]
        self.transition2 = self._make_transition_layer(pre_stage_channels, num_channels)
        self.stage3, pre_stage_channels = self._make_stage(self.stage3_cfg, num_channels)
        self.stage4_cfg = extra["STAGE4"]
        num_channels = [extra["STAGE4"]["NUM_CHANNELS"][i] * _block_from_cfg(extra["STAGE4"]["BLOCK"]).expansion for i in range(len(extra["STAGE4"]["NUM_CHANNELS"]))]
        self.transition3 = self._make_transition_layer(pre_stage_channels, num_channels)
        self.stage4, pre_stage_channels = self._make_stage(self.stage4_cfg, num_channels, multi_scale_output=True)
        self.upsample = nn.Upsample(scale_factor=2, mode="nearest")
        # Head input: all stage-4 scales (resized to the finest) + the stem skip.
        final_inp_channels = sum(pre_stage_channels) + self.inplanes
        self.head = nn.Sequential(
            nn.Conv2d(final_inp_channels, final_inp_channels, kernel_size=1),
            BatchNorm2d(final_inp_channels, momentum=_v0),
            nn.ReLU(inplace=True),
            nn.Conv2d(final_inp_channels, config["MODEL"]["NUM_JOINTS"], kernel_size=extra["FINAL_CONV_KERNEL"]),
            nn.Softmax(dim=1) if not self.lines else nn.Sigmoid(),
        )

    def _make_head(self, x: torch.Tensor, x_skip: torch.Tensor) -> torch.Tensor:
        """Upsample fused features 2x, concat the stem skip, run the head."""
        x = self.upsample(x)
        x = torch.cat([x, x_skip], dim=1)
        return self.head(x)

    def _make_transition_layer(self, num_channels_pre_layer: list, num_channels_cur_layer: list) -> nn.ModuleList:
        """Build per-branch transitions between stages.

        Existing branches get a 3x3 conv only when channel counts differ
        (None otherwise); each newly created branch gets a chain of stride-2
        convs fed from the previous stage's last (lowest-resolution) output.
        """
        num_branches_cur = len(num_channels_cur_layer)
        num_branches_pre = len(num_channels_pre_layer)
        transition_layers = []
        for i in range(num_branches_cur):
            if i < num_branches_pre:
                if num_channels_cur_layer[i] != num_channels_pre_layer[i]:
                    transition_layers.append(nn.Sequential(
                        nn.Conv2d(num_channels_pre_layer[i], num_channels_cur_layer[i], 3, 1, 1, bias=False),
                        BatchNorm2d(num_channels_cur_layer[i], momentum=_v0),
                        nn.ReLU(inplace=True),
                    ))
                else:
                    transition_layers.append(None)
            else:
                conv3x3s = []
                for j in range(i + 1 - num_branches_pre):
                    inchannels = num_channels_pre_layer[-1]
                    # Only the final conv of the chain switches channel count.
                    outchannels = num_channels_cur_layer[i] if j == i - num_branches_pre else inchannels
                    conv3x3s.append(nn.Sequential(
                        nn.Conv2d(inchannels, outchannels, 3, 2, 1, bias=False),
                        BatchNorm2d(outchannels, momentum=_v0),
                        nn.ReLU(inplace=True),
                    ))
                transition_layers.append(nn.Sequential(*conv3x3s))
        return nn.ModuleList(transition_layers)

    def _make_layer(self, block: type, inplanes: int, planes: int, blocks: int, stride: int = 1) -> nn.Sequential:
        """Stack ``blocks`` residual blocks, projecting the shortcut if needed."""
        if isinstance(block, bool):
            block = _d0["BOTTLENECK"] if block else _d0["BASIC"]
        downsample = None
        if stride != 1 or inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                BatchNorm2d(planes * block.expansion, momentum=_v0),
            )
        layers = [block(inplanes, planes, stride, downsample)]
        inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(inplanes, planes))
        return nn.Sequential(*layers)

    def _make_stage(self, layer_config: dict, num_inchannels: list, multi_scale_output: bool = True) -> tuple:
        """Build a stage of ``NUM_MODULES`` `_H0` modules.

        Returns (stage module, final per-branch channel counts).
        """
        num_modules = layer_config["NUM_MODULES"]
        num_blocks = layer_config["NUM_BLOCKS"]
        num_channels = layer_config["NUM_CHANNELS"]
        block = _block_from_cfg(layer_config["BLOCK"])
        fuse_method = layer_config["FUSE_METHOD"]
        modules = []
        for i in range(num_modules):
            # Only the very last module of a single-scale stage collapses output.
            reset_multi_scale_output = False if (not multi_scale_output and i == num_modules - 1) else True
            modules.append(_H0(
                layer_config["NUM_BRANCHES"], block, num_blocks, num_inchannels, num_channels,
                fuse_method, reset_multi_scale_output,
            ))
            num_inchannels = modules[-1].get_num_inchannels()
        return nn.Sequential(*modules), num_inchannels

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Stem -> stages 1..4 -> concat all scales -> head."""
        x = self.conv1(x)
        # Skip feature is the raw conv1 output, taken before bn1/relu.
        x_skip = x.clone()
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.layer1(x)
        x_list = [self.transition1[i](x) if self.transition1[i] is not None else x for i in range(self.stage2_cfg["NUM_BRANCHES"])]
        y_list = self.stage2(x_list)
        # NOTE(review): non-None transitions are fed y_list[-1] (the previous
        # stage's last output) rather than y_list[i]; for standard HRNet
        # configs only the newly added branch has a non-None transition here,
        # so this matches the reference behavior — confirm for custom configs
        # where an existing branch changes channel count.
        x_list = [self.transition2[i](y_list[-1]) if self.transition2[i] is not None else y_list[i] for i in range(self.stage3_cfg["NUM_BRANCHES"])]
        y_list = self.stage3(x_list)
        x_list = [self.transition3[i](y_list[-1]) if self.transition3[i] is not None else y_list[i] for i in range(self.stage4_cfg["NUM_BRANCHES"])]
        x = self.stage4(x_list)
        # Upsample the three coarser scales to the finest and concatenate.
        height, width = x[0].size(2), x[0].size(3)
        x1 = F.interpolate(x[1], size=(height, width), mode="bilinear", align_corners=False)
        x2 = F.interpolate(x[2], size=(height, width), mode="bilinear", align_corners=False)
        x3 = F.interpolate(x[3], size=(height, width), mode="bilinear", align_corners=False)
        x = torch.cat([x[0], x1, x2, x3], 1)
        return self._make_head(x, x_skip)

    def init_weights(self, pretrained: str = "") -> None:
        """Kaiming-init convs and unit-init BN, then optionally load weights.

        Only keys present in the current state_dict are kept (strict=False).
        WARNING: ``weights_only=False`` unpickles arbitrary objects — only
        load checkpoint files from trusted sources.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
        if pretrained and os.path.isfile(pretrained):
            w = torch.load(pretrained, map_location="cpu", weights_only=False)
            self.load_state_dict({k: v for k, v in w.items() if k in self.state_dict()}, strict=False)
|
|
|
|
def _g0(config: dict, pretrained: str = "", **kwargs: Any) -> _H1:
    """Construct an `_H1` network and initialize its weights.

    `pretrained` may point to a checkpoint file; see `_H1.init_weights`.
    """
    net = _H1(config, **kwargs)
    net.init_weights(pretrained)
    return net
|
|
|
|
# Maps the model's raw (1-based, sparse) keypoint channel ids to the
# canonical 1..32 pitch-template keypoint ids used by the templates below.
_K0 = {
    1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23,
    11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29,
    28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20,
    45: 9, 50: 31, 52: 32, 57: 22,
}
|
|
| |
|
|
# Public alias of `_K0` (raw model channel id -> canonical 1..32 template id).
# Previously this was a duplicated literal of `_K0`; building it from `_K0`
# keeps the two mappings from drifting apart, while dict() keeps it an
# independent copy so mutating one does not affect the other.
map_keypoints: Dict[int, int] = dict(_K0)
|
|
| |
# Pixel coordinates of the 32 canonical pitch keypoints on the template
# image (integer-grid variant).
TEMPLATE_F0: List[Tuple[float, float]] = [
    (5, 5), (5, 140), (5, 250), (5, 430), (5, 540), (5, 675), (55, 250), (55, 430),
    (110, 340), (165, 140), (165, 270), (165, 410), (165, 540), (527, 5), (527, 253),
    (527, 433), (527, 675), (888, 140), (888, 270), (888, 410), (888, 540), (940, 340),
    (998, 250), (998, 430), (1045, 5), (1045, 140), (1045, 250), (1045, 430), (1045, 540),
    (1045, 675), (435, 340), (615, 340),
]
# Sub-pixel variant of the same 32 template keypoints.
TEMPLATE_F1: List[Tuple[float, float]] = [
    (2.5, 2.5), (2.5, 139.5), (2.5, 249.5), (2.5, 430.5), (2.5, 540.5), (2.5, 678),
    (54.5, 249.5), (54.5, 430.5), (110.5, 340.5), (164.5, 139.5), (164.5, 269), (164.5, 411),
    (164.5, 540.5), (525, 2.5), (525, 249.5), (525, 430.5), (525, 678), (886.5, 139.5),
    (886.5, 269), (886.5, 411), (886.5, 540.5), (940.5, 340.5), (998, 249.5), (998, 430.5),
    (1048, 2.5), (1048, 139.5), (1048, 249.5), (1048, 430.5), (1048, 540.5), (1048, 678),
    (434.5, 340), (615.5, 340),
]
|
|
# When True, homography-based filling only uses keypoints deemed valid.
HOMOGRAPHY_FILL_ONLY_VALID = True

# Post-processing feature flags ("step 8" semantics are defined by the wider
# pipeline — not visible in this module).
STEP8_ENABLED = True
STEP8_FILL_MISSING = True
# Minimum heatmap confidence for a detected keypoint to be kept.
KP_THRESHOLD = 0.2

# Input resolution (height, width) expected by the keypoint model.
_KP_H, _KP_W = 540, 960
| |
|
|
def _p0(frames: list) -> torch.Tensor:
    """Convert a list of BGR frames into a normalized (N, 3, H, W) batch.

    Each frame is converted to RGB, resized to (_KP_W, _KP_H), scaled to
    [0, 1] and transposed to channels-first.
    """
    height, width = _KP_H, _KP_W
    tensors = []
    for bgr in frames:
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        resized = cv2.resize(rgb, (width, height))
        chw = np.transpose(resized.astype(np.float32) / 255.0, (2, 0, 1))
        tensors.append(chw)
    return torch.from_numpy(np.stack(tensors)).float()
|
|
|
|
def _e0(heatmap: torch.Tensor, scale: int = 2, max_keypoints: int = 1) -> torch.Tensor:
    """Pick up to `max_keypoints` local maxima per heatmap channel.

    Returns a (batch, channels, max_keypoints, 3) tensor of (x, y, score)
    with coordinates rescaled by `scale`.
    """
    b, c, _, w = heatmap.shape
    # Keep only positions that survive a 3x3 max-pool (local peaks).
    peaks = heatmap * (F.max_pool2d(heatmap, 3, stride=1, padding=1) == heatmap)
    scores, flat_idx = torch.topk(peaks.view(b, c, -1), max_keypoints, dim=-1, sorted=False)
    ys = torch.div(flat_idx, w, rounding_mode="floor") * scale
    xs = (flat_idx % w) * scale
    return torch.stack([xs.float(), ys.float(), scores], dim=-1)
|
|
|
|
def _p1(kp_coords: torch.Tensor, kp_threshold: float, w: int, h: int, batch_size: int) -> list:
    """Convert a (B, C, 1, 3) keypoint tensor into per-frame dicts.

    Each dict maps the 1-based channel index to {"x", "y", "p"} with x/y
    normalized by the input width/height; entries at or below the threshold
    are dropped.
    """
    arr = kp_coords.cpu().numpy()
    per_frame = []
    for b in range(batch_size):
        frame: dict = {}
        keep = np.where(arr[b, :, 0, 2] > kp_threshold)[0]
        for ch in keep:
            frame[int(ch) + 1] = {
                "x": float(arr[b, ch, 0, 0]) / w,
                "y": float(arr[b, ch, 0, 1]) / h,
                "p": float(arr[b, ch, 0, 2]),
            }
        per_frame.append(frame)
    return per_frame
|
|
|
|
def _g1(kp_points: dict) -> dict:
    """Remap raw channel ids to canonical template ids, dropping unknown ids."""
    remapped = {}
    for raw_id, value in kp_points.items():
        canonical = _K0.get(raw_id)
        if canonical is not None:
            remapped[canonical] = value
    return remapped
|
|
|
|
def _i0(frames: list, model: nn.Module, kp_threshold: float, device: str, batch_size: int = 2) -> list:
    """Run keypoint inference over `frames` in mini-batches.

    Returns one dict per frame mapping canonical template keypoint id ->
    {"x", "y", "p"} with normalized coordinates.

    NOTE: `device` is unused — the model's own parameter device wins; the
    parameter is kept for interface compatibility.
    """
    results = []
    model_device = next(model.parameters()).device
    use_amp = model_device.type == "cuda"  # autocast only makes sense on GPU
    for i in range(0, len(frames), batch_size):
        current_batch_size = min(batch_size, len(frames) - i)
        batch_frames = frames[i : i + current_batch_size]
        batch = _p0(batch_frames).to(model_device, non_blocking=True)
        with torch.no_grad():
            with torch.amp.autocast("cuda", enabled=use_amp):
                heatmaps = model(batch)
        # The last heatmap channel is dropped before peak extraction
        # (presumably a background/line channel — TODO confirm).
        kp_coords = _e0(heatmaps[:, :-1, :, :], scale=2, max_keypoints=1)
        batch_results = _p1(kp_coords, kp_threshold, _KP_W, _KP_H, current_batch_size)
        results.extend([_g1(kp) for kp in batch_results])
        # Aggressive per-batch cleanup: trades speed for a lower memory peak.
        del heatmaps, kp_coords, batch
        gc.collect()
        if model_device.type == "cuda":
            torch.cuda.empty_cache()
    return results
|
|
|
|
def _x0(frames: list, model: nn.Module, kp_threshold: float, device: str = "cpu", batch_size: int = 2) -> list:
    """Thin public wrapper around `_i0` keypoint inference."""
    return _i0(
        frames=frames,
        model=model,
        kp_threshold=kp_threshold,
        device=device,
        batch_size=batch_size,
    )
|
|
|
|
| def _normalize_keypoints_xyp(kp_results: list | None, frames: list, n_keypoints: int) -> list: |
| """Produce [(x, y, p), ...] per frame for fix_keypoints_pri thresholding.""" |
| if not kp_results: |
| return [] |
| keypoints = [] |
| for i in range(min(len(kp_results), len(frames))): |
| kp_dict = kp_results[i] |
| h, w = frames[i].shape[:2] |
| frame_kps = [] |
| for idx in range(n_keypoints): |
| kp_idx = idx + 1 |
| x, y, p = 0, 0, 0.0 |
| if kp_dict and isinstance(kp_dict, dict) and kp_idx in kp_dict: |
| d = kp_dict[kp_idx] |
| if isinstance(d, dict) and "x" in d: |
| x = int(d["x"] * w) |
| y = int(d["y"] * h) |
| p = float(d.get("p", 0.0)) |
| frame_kps.append((x, y, p)) |
| keypoints.append(frame_kps) |
| return keypoints |
|
|
|
|
| def _n0(keypoints_result: list | None, batch_images: list, n_keypoints: int) -> list: |
| keypoints = [] |
| if not keypoints_result: |
| return [] |
| for frame_number_in_batch, kp_dict in enumerate(keypoints_result): |
| if frame_number_in_batch >= len(batch_images): |
| break |
| frame_keypoints = [] |
| try: |
| height, width = batch_images[frame_number_in_batch].shape[:2] |
| if kp_dict and isinstance(kp_dict, dict): |
| for idx in range(32): |
| x, y = 0, 0 |
| kp_idx = idx + 1 |
| if kp_idx in kp_dict: |
| kp_data = kp_dict[kp_idx] |
| if isinstance(kp_data, dict) and "x" in kp_data and "y" in kp_data: |
| x, y = int(kp_data["x"] * width), int(kp_data["y"] * height) |
| frame_keypoints.append((x, y)) |
| else: |
| frame_keypoints = [(0, 0)] * 32 |
| except (IndexError, ValueError, AttributeError): |
| frame_keypoints = [(0, 0)] * 32 |
| if len(frame_keypoints) < n_keypoints: |
| frame_keypoints.extend([(0, 0)] * (n_keypoints - len(frame_keypoints))) |
| else: |
| frame_keypoints = frame_keypoints[:n_keypoints] |
| keypoints.append(frame_keypoints) |
| return keypoints |
|
|
|
|
def _fix_keypoints(kps: list, n: int) -> list:
    """Pad/trim `kps` to length `n`, then apply slot-repair heuristics.

    Each rule moves a detection from a slot that looks mis-assigned into the
    empty slot it probably belongs to (indices are 0-based template slots).
    NOTE: `kps += ...` mutates the caller's list in place when padding.
    """
    if len(kps) < n:
        kps += [(0, 0)] * (n - len(kps))
    elif len(kps) > n:
        kps = kps[:n]

    # Low-index slot shifts (order of these checks matters: earlier moves
    # change the inputs of later ones).
    if kps[2] != (0,0) and kps[4] != (0,0) and kps[3] == (0,0):
        kps[3] = kps[4]; kps[4] = (0,0)
    if kps[0] != (0,0) and kps[4] != (0,0) and kps[1] == (0,0):
        kps[1] = kps[4]; kps[4] = (0,0)
    if kps[2] != (0,0) and kps[3] != (0,0) and kps[1] == (0,0) and kps[3][0] > kps[2][0]:
        kps[1] = kps[3]; kps[3] = (0,0)
    # High-index slot shifts.
    if kps[28] != (0,0) and kps[25] == (0,0) and kps[26] != (0,0) and kps[26][0] > kps[28][0]:
        kps[25] = kps[28]; kps[28] = (0,0)
    if kps[24] != (0,0) and kps[28] != (0,0) and kps[25] == (0,0):
        kps[25] = kps[28]; kps[28] = (0,0)
    if kps[24] != (0,0) and kps[27] != (0,0) and kps[26] == (0,0):
        kps[26] = kps[27]; kps[27] = (0,0)
    # NOTE(review): kps[23] == (0,0) is required just before reading
    # kps[23][1], so the last comparison is effectively kps[20][1] > 0 —
    # possibly a typo for kps[28][1]; confirm intent before changing.
    if kps[28] != (0,0) and kps[23] == (0,0) and kps[20] != (0,0) and kps[20][1] > kps[23][1]:
        kps[23] = kps[20]; kps[20] = (0,0)
    return kps
|
|
|
|
def _keypoints_to_float(keypoints: list) -> List[List[float]]:
    """Convert keypoints to [[x, y], ...] float format for homography."""
    converted = []
    for x, y in keypoints:
        converted.append([float(x), float(y)])
    return converted
|
|
|
|
def _keypoints_to_int(keypoints: list) -> List[Tuple[int, int]]:
    """Convert keypoints to [(x, y), ...] rounded-integer format."""
    out: List[Tuple[int, int]] = []
    for kp in keypoints:
        x, y = float(kp[0]), float(kp[1])
        out.append((int(round(x)), int(round(y))))
    return out
|
|
|
|
| |
# Canonical 32 pitch-template keypoints (pixel coords on the template image).
_FKP_KEYPOINTS: List[Tuple[int, int]] = [
    (5, 5), (5, 140), (5, 250), (5, 430), (5, 540), (5, 675),
    (55, 250), (55, 430), (110, 340), (165, 140), (165, 270), (165, 410), (165, 540),
    (527, 5), (527, 253), (527, 433), (527, 675),
    (888, 140), (888, 270), (888, 410), (888, 540), (940, 340),
    (998, 250), (998, 430), (1045, 5), (1045, 140), (1045, 250), (1045, 430), (1045, 540), (1045, 675),
    (435, 340), (615, 340),
]
# Same points as a float32 array for vectorized math.
_FKP_KEYPOINTS_NP = np.asarray(_FKP_KEYPOINTS, dtype=np.float32)
# For each 1-based keypoint id, its ordered neighbor-candidate ids (1-based).
_FKP_GROUPS = {
    1: [2, 3, 7, 10], 2: [1, 3, 7, 10], 3: [2, 4, 7, 8], 4: [3, 5, 8, 7], 5: [4, 8, 6, 3], 6: [5, 4, 8, 13],
    7: [3, 8, 9, 10], 8: [4, 7, 9, 13], 9: [7, 8, 11, 12], 10: [9, 11, 7, 2], 11: [9, 10, 12, 31], 12: [9, 11, 13, 31],
    13: [9, 12, 8, 5], 14: [15, 31, 32, 16], 15: [31, 16, 32, 14], 16: [31, 15, 32, 17], 17: [31, 16, 32, 15],
    18: [19, 22, 23, 26], 19: [18, 22, 20, 32], 20: [19, 22, 21, 32], 21: [20, 22, 24, 29], 22: [23, 24, 19, 20],
    23: [27, 24, 22, 28], 24: [28, 23, 22, 27], 25: [26, 27, 23, 18], 26: [25, 27, 23, 18], 27: [26, 23, 28, 24],
    28: [27, 24, 29, 23], 29: [28, 30, 24, 21], 30: [29, 28, 24, 21], 31: [15, 16, 32, 14], 32: [15, 31, 16, 14],
}
# 0-based numpy version of _FKP_GROUPS, indexable by keypoint index 0..31.
_FKP_GROUPS_ARRAY = [np.asarray(_FKP_GROUPS[i], dtype=np.int32) - 1 for i in range(1, 33)]
# Keypoint-id subsets (1-based) treated as unreliable layouts; consumed by
# the blacklist check in _fkp_evaluate_keypoints_for_frame.
_FKP_BLACKLISTS = [
    [23, 24, 27, 28], [7, 8, 3, 4], [2, 10, 1, 14], [18, 26, 14, 25], [5, 13, 6, 17], [21, 29, 17, 30],
    [10, 11, 2, 3], [10, 11, 2, 7], [12, 13, 4, 5], [12, 13, 5, 8], [18, 19, 26, 27], [18, 19, 26, 23],
    [20, 21, 24, 29], [20, 21, 28, 29], [8, 4, 5, 13], [3, 7, 2, 10], [23, 27, 18, 26], [24, 28, 21, 29],
]
# Precomputed (id set, first index 0-based, second index 0-based) per blacklist.
_FKP_PREPARED_BLACKLISTS = [(set(bl), bl[0] - 1, bl[1] - 1) for bl in _FKP_BLACKLISTS]
# Morphology kernels built once and reused across calls.
_FKP_DILATE_KERNEL = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
_FKP_KERNEL_31 = cv2.getStructuringElement(cv2.MORPH_RECT, (31, 31))
# Lazily-loaded grayscale pitch template (see _fkp_load_template_gray).
_FKP_TEMPLATE_GRAY: Optional[ndarray] = None
# Optional shared thread pool; not used in this chunk — TODO confirm usage elsewhere.
_FKP_SHARED_EXECUTOR: Optional[ThreadPoolExecutor] = None
# One lock per cache key so identical mask computations are not duplicated.
_FKP_PER_KEY_LOCKS: Dict[Any, threading.Lock] = defaultdict(threading.Lock)
|
|
|
|
class _FKP_MaxSizeCache(OrderedDict):
    """Thread-safe ordered cache evicting the oldest entry past `maxlen`.

    Only `set` refreshes recency (via move_to_end); `get` does not, so
    eviction follows insertion/update order rather than access order.
    """

    def __init__(self, maxlen: int = 500):
        super().__init__()
        self.maxlen = maxlen
        self._lock = threading.Lock()

    def set(self, k: Any, v: Any) -> None:
        """Insert/update `k`, evicting the oldest entries beyond `maxlen`."""
        with self._lock:
            if k in self:
                self.move_to_end(k)
            self[k] = v
            while len(self) > self.maxlen:
                self.popitem(last=False)

    def get(self, k: Any) -> Any:
        """Return the cached value for `k`, or None when absent."""
        with self._lock:
            return super().get(k)

    def exists(self, k: Any) -> bool:
        """Return True when `k` is currently cached."""
        with self._lock:
            return k in self
|
|
|
|
# Shared bounded cache for mask computations (see _fkp_get_or_compute_masks).
_FKP_CACHED = _FKP_MaxSizeCache()
|
|
|
|
def _fkp_load_template_gray() -> ndarray:
    """Lazily load the grayscale pitch template, caching it in a global.

    Falls back to a black 680x1050 canvas when the template image next to
    this module is missing. Not lock-protected: a racing first call may load
    the image twice, which is harmless.
    """
    global _FKP_TEMPLATE_GRAY
    if _FKP_TEMPLATE_GRAY is None:
        template_path = Path(__file__).parent / "football_pitch_template.png"
        img = cv2.imread(str(template_path), cv2.IMREAD_COLOR)
        if img is not None:
            _FKP_TEMPLATE_GRAY = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        else:
            _FKP_TEMPLATE_GRAY = np.zeros((680, 1050), dtype=np.uint8)
    return _FKP_TEMPLATE_GRAY
|
|
|
|
def _fkp_get_or_compute_masks(key: Any, compute_fn: Any) -> Any:
    """Fetch `key` from the shared cache, computing it at most once per key.

    A per-key lock serializes concurrent computations of the same key while
    letting different keys proceed in parallel.
    NOTE: `_FKP_PER_KEY_LOCKS` grows without bound — one lock per distinct
    key, never evicted.
    """
    lock = _FKP_PER_KEY_LOCKS[key]
    with lock:
        if _FKP_CACHED.exists(key):
            return _FKP_CACHED.get(key)
        masks = compute_fn()
        _FKP_CACHED.set(key, masks)
        return masks
|
|
|
|
def _fkp_canonical(obj: Any) -> Any:
    """Recursively convert arrays/containers into hashable nested tuples.

    Used to build stable cache keys: ndarrays become nested tuples, sets are
    sorted, dicts are ordered by key; scalars pass through unchanged.
    """
    if isinstance(obj, np.ndarray):
        return _fkp_canonical(obj.tolist())
    if isinstance(obj, (list, tuple)):
        return tuple(map(_fkp_canonical, obj))
    if isinstance(obj, set):
        return tuple(sorted(_fkp_canonical(item) for item in obj))
    if isinstance(obj, dict):
        return tuple((key, _fkp_canonical(val)) for key, val in sorted(obj.items()))
    return obj
|
|
|
|
def _fkp_are_collinear(pts: Any, eps: float = 1e-9) -> bool:
    """Check whether the FIRST THREE 2-D points are (near-)collinear.

    Only the first three points are inspected; fewer than three points are
    trivially collinear.
    """
    pts = np.asarray(pts)
    if len(pts) < 3:
        return True
    p0, p1, p2 = pts[0], pts[1], pts[2]
    # Twice the triangle area via the 2-D cross-product determinant.
    d1, d2 = p1 - p0, p2 - p0
    area2 = d1[0] * d2[1] - d1[1] * d2[0]
    return bool(abs(area2) < eps)
|
|
|
|
def _fkp_unique_points(src: Any, dst: Any) -> Any:
    """Drop (0,0)-ish pairs and duplicate source points.

    A pair survives only if BOTH its src and dst points are non-zero; among
    the survivors, rows with a duplicate src point are removed (the earliest
    row wins, original order preserved). Returns (src, dst) float arrays, or
    two empty arrays when nothing survives.
    """
    src_arr = np.asarray(src, float)
    dst_arr = np.asarray(dst, float)
    keep = ~np.all(np.abs(src_arr) < 1e-9, axis=1) & ~np.all(np.abs(dst_arr) < 1e-9, axis=1)
    if not keep.any():
        return np.array([]), np.array([])
    src_kept, dst_kept = src_arr[keep], dst_arr[keep]
    _, first_idx = np.unique(src_kept, axis=0, return_index=True)
    order = np.sort(first_idx)
    return src_kept[order], dst_kept[order]
|
|
|
|
def _fkp_apply_transform(M: ndarray, P: Any) -> Tuple[int, int]:
    """Apply the affine part of a 3x3 matrix to point P, truncating to int."""
    px, py = P[0], P[1]
    new_x = M[0, 0] * px + M[0, 1] * py + M[0, 2]
    new_y = M[1, 0] * px + M[1, 1] * py + M[1, 2]
    return (int(new_x), int(new_y))
|
|
|
|
def _fkp_apply_homo_transform(M: ndarray, P: Any) -> Tuple[int, int]:
    """Apply a full 3x3 homography to point P, truncating to int.

    No guard against a zero denominator (points mapping to the horizon).
    """
    px, py = P[0], P[1]
    denom = M[2, 0] * px + M[2, 1] * py + M[2, 2]
    return (
        int((M[0, 0] * px + M[0, 1] * py + M[0, 2]) / denom),
        int((M[1, 0] * px + M[1, 1] * py + M[1, 2]) / denom),
    )
|
|
|
|
def _fkp_affine_from_4_points(src_pts: Any, dst_pts: Any) -> ndarray:
    """Least-squares affine transform (as a 3x3 matrix) from 4 point pairs."""
    src = np.array(src_pts, dtype=np.float64)
    dst = np.array(dst_pts, dtype=np.float64)
    sx, sy = src[:, 0], src[:, 1]
    dx, dy = dst[:, 0], dst[:, 1]
    # 2N linear equations: [x y 1 0 0 0 | 0 0 0 x y 1] @ params = [u | v].
    A = np.zeros((8, 6), dtype=np.float64)
    A[0::2, 0], A[0::2, 1], A[0::2, 2] = sx, sy, 1
    A[1::2, 3], A[1::2, 4], A[1::2, 5] = sx, sy, 1
    rhs = np.empty(8, dtype=np.float64)
    rhs[0::2], rhs[1::2] = dx, dy
    a, b, tx, c, d, ty = np.linalg.lstsq(A, rhs, rcond=None)[0]
    return np.array([[a, b, tx], [c, d, ty], [0, 0, 1]], dtype=np.float64)
|
|
|
|
def _fkp_four_point_homography(src_pts: Any, dst_pts: Any) -> ndarray:
    """DLT homography from 4 correspondences, normalized so H[2,2] == 1."""
    src = np.array(src_pts, dtype=np.float64)
    dst = np.array(dst_pts, dtype=np.float64)
    sx, sy = src[:, 0], src[:, 1]
    dx, dy = dst[:, 0], dst[:, 1]
    A = np.zeros((8, 9), dtype=np.float64)
    A[0::2, 0], A[0::2, 1], A[0::2, 2] = -sx, -sy, -1
    A[0::2, 6], A[0::2, 7], A[0::2, 8] = sx * dx, sy * dx, dx
    A[1::2, 3], A[1::2, 4], A[1::2, 5] = -sx, -sy, -1
    A[1::2, 6], A[1::2, 7], A[1::2, 8] = sx * dy, sy * dy, dy
    # The solution is the right singular vector for the smallest singular value.
    h = np.linalg.svd(A)[2][-1, :]
    return (h.reshape(3, 3) / h[8]).astype(np.float64)
|
|
|
|
def _fkp_three_point_affine(P: Any, Q: Any) -> ndarray:
    """Least-squares affine 3x3 matrix from n point correspondences."""
    src = np.array(P, dtype=np.float64)
    dst = np.array(Q, dtype=np.float64)
    count = src.shape[0]
    sx, sy = src[:, 0], src[:, 1]
    dx, dy = dst[:, 0], dst[:, 1]
    A = np.zeros((2 * count, 6), dtype=np.float64)
    A[0::2, 0], A[0::2, 1], A[0::2, 2] = sx, sy, 1
    A[1::2, 3], A[1::2, 4], A[1::2, 5] = sx, sy, 1
    rhs = np.empty(2 * count, dtype=np.float64)
    rhs[0::2], rhs[1::2] = dx, dy
    a, b, tx, c, d, ty = np.linalg.lstsq(A, rhs, rcond=None)[0]
    return np.array([[a, b, tx], [c, d, ty], [0, 0, 1]], dtype=np.float64)
|
|
|
|
def _fkp_line_to_line_transform(P1: Any, P2: Any, Q1: Any, Q2: Any) -> ndarray:
    """Similarity (scale + rotation + translation) mapping segment P1P2 onto Q1Q2."""
    p1 = np.asarray(P1, dtype=np.float64)
    p2 = np.asarray(P2, dtype=np.float64)
    q1 = np.asarray(Q1, dtype=np.float64)
    q2 = np.asarray(Q2, dtype=np.float64)
    src_vec, dst_vec = p2 - p1, q2 - q1
    # Epsilon keeps a degenerate (zero-length) source segment from dividing by zero.
    scale = np.hypot(dst_vec[0], dst_vec[1]) / (np.hypot(src_vec[0], src_vec[1]) + 1e-12)
    angle = np.arctan2(dst_vec[1], dst_vec[0]) - np.arctan2(src_vec[1], src_vec[0])
    ca, sa = np.cos(angle), np.sin(angle)
    m00, m01 = scale * ca, -scale * sa
    m10, m11 = scale * sa, scale * ca
    return np.array([
        [m00, m01, q1[0] - (m00 * p1[0] + m01 * p1[1])],
        [m10, m11, q1[1] - (m10 * p1[0] + m11 * p1[1])],
        [0, 0, 1],
    ], dtype=np.float64)
|
|
|
|
def _fkp_robust_transform(src_pts: Any, dst_pts: Any) -> Any:
    """Return a point-mapping callable chosen by how many valid pairs exist.

    4+ pairs -> homography (or least-squares affine when either point set is
    collinear), 3 -> affine, 2 -> similarity from the segment, 1 ->
    translation, 0 -> identity.
    NOTE(review): the 4+ branch builds fixed 8-row systems, so more than
    four pairs will raise — confirm callers pass at most four.
    """
    src, dst = _fkp_unique_points(src_pts, dst_pts)
    count = len(src)
    if count >= 4:
        if _fkp_are_collinear(src) or _fkp_are_collinear(dst):
            M = _fkp_affine_from_4_points(src, dst)
            return lambda pt: _fkp_apply_transform(M, pt)
        M = _fkp_four_point_homography(src, dst)
        return lambda pt: _fkp_apply_homo_transform(M, pt)
    if count == 3:
        M = _fkp_three_point_affine(src, dst)
        return lambda pt: _fkp_apply_transform(M, pt)
    if count == 2:
        M = _fkp_line_to_line_transform(src[0], src[1], dst[0], dst[1])
        return lambda pt: _fkp_apply_transform(M, pt)
    if count == 1:
        M = np.eye(3)
        M[:2, 2] = dst[0] - src[0]
        return lambda pt: _fkp_apply_transform(M, pt)
    identity = np.eye(3)
    return lambda pt: _fkp_apply_transform(identity, pt)
|
|
|
|
def _fkp_pick_pt(points: Any) -> List[int]:
    """Suggest neighbor keypoint indices (0-based) not already in `points`.

    For each valid input index, its template-neighbor candidates are added
    in order, skipping indices already present in the input or already
    suggested.
    """
    if not points:
        return []
    idx_arr = np.asarray(points, dtype=np.int32)
    in_range = (idx_arr >= 0) & (idx_arr < 32)
    valid = idx_arr[in_range]
    present = np.zeros(32, dtype=bool)
    present[valid] = True
    suggested = np.zeros(32, dtype=bool)
    picked: List[int] = []
    for idx in valid:
        neighbors = _FKP_GROUPS_ARRAY[idx]
        fresh = neighbors[~present[neighbors] & ~suggested[neighbors]]
        suggested[fresh] = True
        picked.extend(fresh.tolist())
    return picked
|
|
|
|
def _fkp_is_include(kp: Any, all_kps: Any) -> bool:
    """Return True if `kp` lies within L1 distance 2 of any point in `all_kps`."""
    target = np.array(kp)
    return any(np.sum(np.abs(np.array(candidate) - target)) <= 2 for candidate in all_kps)
|
|
|
|
def _fkp_get_edge_mask(x: float, y: float, W: int, H: int, t: int = 100) -> int:
    """Bitmask of image borders the point lies within `t` pixels of.

    Bit 1 = left, 2 = right, 4 = top, 8 = bottom; 0 means interior.
    """
    left = 1 if x <= t else 0
    right = 2 if x >= W - t else 0
    top = 4 if y <= t else 0
    bottom = 8 if y >= H - t else 0
    return left | right | top | bottom
|
|
|
|
def _fkp_both_points_same_direction_fast(A: Any, B: Any, W: int, H: int, t: int = 100) -> bool:
    """True when A and B hug the same image border (within `t` pixels).

    Short-circuits when A is interior, so B's mask is computed only when needed.
    """
    first = _fkp_get_edge_mask(A[0], A[1], W, H, t)
    if not first:
        return False
    second = _fkp_get_edge_mask(B[0], B[1], W, H, t)
    return bool(first & second)
|
|
|
|
def _fkp_project_image(image: ndarray, src_kps: Any, dst_kps: Any, w: int, h: int) -> ndarray:
    """Warp `image` so the source keypoints land on the destination keypoints.

    Pairs whose destination point is (0, 0) are excluded from the homography
    fit (the source array is filtered with the same mask to keep pairs aligned).

    Raises:
        ValueError: when cv2 cannot fit a homography (e.g. < 4 valid pairs).
    """
    src_arr = np.array(src_kps, dtype=np.float32)
    dst_arr = np.array(dst_kps, dtype=np.float32)
    valid_mask = ~((dst_arr[:, 0] == 0) & (dst_arr[:, 1] == 0))
    H, _ = cv2.findHomography(src_arr[valid_mask], dst_arr[valid_mask])
    if H is None:
        raise ValueError("Homography not found")
    return cv2.warpPerspective(image, H, (w, h))
|
|
|
|
def _fkp_extract_masks(image: ndarray) -> tuple:
    """Split a warped template into (ground, lines) 0/1 masks by gray thresholds."""
    if image.ndim == 2:
        gray = image
    else:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    mask_ground = cv2.threshold(gray, 10, 1, cv2.THRESH_BINARY)[1]
    mask_lines = cv2.threshold(gray, 200, 1, cv2.THRESH_BINARY)[1]
    return mask_ground, mask_lines
|
|
|
|
def _fkp_convert_to_gray(image: ndarray) -> ndarray:
    """Edge map of *image*: top-hat to isolate thin bright structures,
    Gaussian blur, then Canny."""
    flat = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    flat = cv2.morphologyEx(flat, cv2.MORPH_TOPHAT, _FKP_KERNEL_31)
    blurred = cv2.GaussianBlur(flat, (5, 5), 0)
    return cv2.Canny(blurred, 30, 100)
|
|
|
|
def _fkp_evaluate_keypoints_for_frame(
    frame_keypoints: Any, frame_index: int, h: int, w: int, check_frame_list: List[ndarray], precomputed_key: Any = None
) -> float:
    """Score one keypoint candidate for one frame.

    Projects the pitch template into the frame via the candidate keypoints,
    then returns the fraction of projected template-line pixels covered by
    edge pixels detected in check_frame_list[frame_index]. Returns 0.0 on
    any failure, on implausible projections, or when the frame index is out
    of range.
    """
    # Cache key for the (mask, pixel count, ground mask) triple; the caller
    # may pass a precomputed key to skip the canonical encoding.
    key = precomputed_key or _fkp_canonical((frame_keypoints, w, h))
    floor_markings = _fkp_load_template_gray()


    def compute_masks(fkp: Any, ww: int, hh: int) -> Any:
        # Returns (line mask, line pixel count, ground mask), or
        # (None, 0, None) when the candidate projection is rejected.
        try:
            # 1-based indices of keypoints actually present (non-(0,0)).
            non_idxs_set = {i + 1 for i, kpt in enumerate(fkp) if kpt[0] != 0 or kpt[1] != 0}
            # Reject candidates whose present indices all fall in a known
            # blacklist set AND whose two probe points hug the same frame
            # edge (degenerate support for the homography).
            for bl_set, idx0, idx1 in _FKP_PREPARED_BLACKLISTS:
                if non_idxs_set.issubset(bl_set):
                    if _fkp_both_points_same_direction_fast(fkp[idx0], fkp[idx1], ww, hh):
                        return None, 0, None
            warped = _fkp_project_image(floor_markings, _FKP_KEYPOINTS, fkp, ww, hh)
            mask_ground, mask_lines = _fkp_extract_masks(warped)
            ys, xs = np.where(mask_lines == 1)
            if len(xs) == 0:
                bbox = None
            else:
                bbox = (xs.min(), ys.min(), xs.max(), ys.max())
            # Projected lines must span at least 20% of the frame area.
            bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) if bbox else 1
            if (bbox_area / (hh * ww)) < 0.2:
                return None, 0, None
            return mask_lines, int(cv2.countNonZero(mask_lines)), mask_ground
        except Exception:
            # Any projection error makes the candidate unusable.
            return None, 0, None


    try:
        mask_exp, pixels_on_lines, mask_ground = _fkp_get_or_compute_masks(
            key, lambda: compute_masks(frame_keypoints, w, h)
        )
        if mask_exp is None or pixels_on_lines == 0 or mask_ground is None:
            return 0.0
        if frame_index >= len(check_frame_list):
            return 0.0
        scale = max(1, _FKP_EVAL_DOWNSCALE)
        if scale > 1 and h > scale and w > scale:
            # Fast path: score at 1/scale resolution.
            # NOTE(review): this path dilates 2 iterations vs 3 at full
            # resolution — presumably compensating for the downscale; confirm.
            h_s, w_s = h // scale, w // scale
            frame_s = cv2.resize(check_frame_list[frame_index], (w_s, h_s), interpolation=cv2.INTER_AREA)
            mask_ground_s = cv2.resize(mask_ground, (w_s, h_s), interpolation=cv2.INTER_NEAREST)
            mask_exp_s = cv2.resize(mask_exp, (w_s, h_s), interpolation=cv2.INTER_NEAREST)
            pixels_on_lines = cv2.countNonZero(mask_exp_s)
            if pixels_on_lines == 0:
                return 0.0
            # One buffer reused in place: masked frame -> dilate -> binarize -> overlap.
            work_buffer = np.zeros((h_s, w_s), dtype=np.uint8)
            cv2.bitwise_and(frame_s, frame_s, dst=work_buffer, mask=mask_ground_s)
            cv2.dilate(work_buffer, _FKP_DILATE_KERNEL, dst=work_buffer, iterations=2)
            cv2.threshold(work_buffer, 0, 255, cv2.THRESH_BINARY, dst=work_buffer)
            pixels_predicted = cv2.countNonZero(work_buffer)
            cv2.bitwise_and(work_buffer, mask_exp_s, dst=work_buffer)
            pixels_overlapping = cv2.countNonZero(work_buffer)
        else:
            # Full-resolution path (same pipeline, 3 dilate iterations).
            work_buffer = np.zeros((h, w), dtype=np.uint8)
            cv2.bitwise_and(check_frame_list[frame_index], check_frame_list[frame_index], dst=work_buffer, mask=mask_ground)
            cv2.dilate(work_buffer, _FKP_DILATE_KERNEL, dst=work_buffer, iterations=3)
            cv2.threshold(work_buffer, 0, 255, cv2.THRESH_BINARY, dst=work_buffer)
            pixels_predicted = cv2.countNonZero(work_buffer)
            cv2.bitwise_and(work_buffer, mask_exp, dst=work_buffer)
            pixels_overlapping = cv2.countNonZero(work_buffer)
        # Reject when > 90% of the predicted edge pixels fall outside the
        # projected lines (prediction dominated by unrelated structure).
        pixels_rest = pixels_predicted - pixels_overlapping
        total_pixels = pixels_predicted + pixels_on_lines - pixels_overlapping
        if total_pixels > 0 and (pixels_rest / total_pixels) > 0.9:
            return 0.0
        return pixels_overlapping / (pixels_on_lines + 1e-8)
    except Exception:
        pass
    return 0.0
|
|
|
|
def _fkp_make_possible_keypoints(all_keypoints: Any, frame_width: int, frame_height: int, limit: int | None = None) -> List[Any]:
    """Collect up to *limit* (default _FKP_MAX_CANDIDATES_PER_FRAME)
    deduplicated integer keypoint candidates.

    Candidates must be (N, 2) shaped, have at least 2 keypoints with both
    coordinates non-zero, and not near-duplicate an already-kept candidate
    (per _fkp_is_include).
    """
    if not all_keypoints:
        return []
    cap = _FKP_MAX_CANDIDATES_PER_FRAME if limit is None else limit
    picked: List[Any] = []
    for candidate in all_keypoints:
        if len(picked) >= cap:
            break
        kps = _keypoints_to_int(candidate)
        arr = np.asarray(kps, dtype=np.int32)
        if arr.ndim != 2 or arr.shape[1] != 2:
            continue
        n_present = int(((arr[:, 0] != 0) & (arr[:, 1] != 0)).sum())
        if n_present < 2:
            continue
        if not _fkp_is_include(kps, picked):
            picked.append(kps)
    return picked
|
|
|
|
def _fkp_get_executor(max_workers: int) -> ThreadPoolExecutor:
    """Return the module-wide shared thread pool, creating it on first use.

    NOTE(review): plain check-then-set on a module global — not thread-safe,
    and *max_workers* only takes effect on the very first call; confirm
    callers create the pool from a single thread.
    """
    global _FKP_SHARED_EXECUTOR
    if _FKP_SHARED_EXECUTOR is None:
        _FKP_SHARED_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
    return _FKP_SHARED_EXECUTOR
|
|
|
|
def _fkp_evaluates(
    jobs: Any, h: int, w: int, total_frames: int, time_left: float, check_frame_list: List[ndarray]
) -> List[Any]:
    """Score (keypoints, frame_index) jobs and keep the best candidate per frame.

    Jobs are deduplicated by (frame_index, keypoint bytes). Small batches
    (<= 10) run sequentially; larger ones run on the shared thread pool in
    chunks, stopping early when *time_left* (seconds) is spent. Frames with
    no candidate scoring above 0 keep a 32 x (0, 0) default.
    """
    start = time.time()
    # Default output: all-zero keypoints for every frame.
    results = [[(0, 0)] * 32 for _ in range(total_frames)]
    if len(jobs) == 0:
        return results
    unique_jobs: List[Any] = []
    seen: set = set()
    for (job, frame_index) in jobs:
        try:
            # Raw-bytes signature of the keypoints; avoids a copy when the
            # array is already int32. Also reused later as the cache key.
            key_bytes = np.asarray(job, dtype=np.int32).tobytes() if not isinstance(job, np.ndarray) else (job.astype(np.int32).tobytes() if job.dtype != np.int32 else job.tobytes())
            sig = (frame_index, key_bytes)
            if sig in seen:
                continue
            seen.add(sig)
            unique_jobs.append((job, frame_index, key_bytes))
        except Exception:
            continue
    if len(unique_jobs) <= 10:
        # Small batch: thread-pool dispatch overhead is not worth it.
        scores_unique = [
            _fkp_evaluate_keypoints_for_frame(job, frame_index, h, w, check_frame_list, (key_bytes, w, h))
            for (job, frame_index, key_bytes) in unique_jobs
        ]
    else:
        cpu_count = max(1, (os.cpu_count() or 1))
        max_workers = min(max(2, cpu_count), 8)
        chunk_size = 24
        scores_unique = []
        ex = _fkp_get_executor(max_workers)
        time_left -= (time.time() - start)
        for i in range(0, len(unique_jobs), chunk_size):
            start = time.time()
            chunk = unique_jobs[i : min(i + chunk_size, len(unique_jobs))]
            scores_unique.extend(ex.map(
                lambda pair: _fkp_evaluate_keypoints_for_frame(pair[0], pair[1], h, w, check_frame_list, (pair[2], w, h)),
                chunk,
            ))
            # Deduct this chunk's wall time; on overrun, truncate unique_jobs
            # so the zip below only pairs jobs that were actually scored.
            time_left -= (time.time() - start)
            if time_left <= 0:
                unique_jobs = unique_jobs[: min(i + chunk_size, len(unique_jobs))]
                break
    # Keep, per frame, the candidate with the strictly highest score
    # (ties resolved in favour of the earliest-scored job).
    scores = np.full(total_frames, 0.0, dtype=np.float32)
    for score, (k, frame_index, _) in zip(scores_unique, unique_jobs):
        if score > scores[frame_index]:
            scores[frame_index] = score
            results[frame_index] = k
    return results
|
|
|
|
| def _fkp_normalize_results(frame_results: Any, threshold: float) -> List[Any]: |
| if not frame_results: |
| return [] |
| results_array: List[Any] = [] |
| for result in frame_results: |
| pad_len = 32 - len(result) |
| if pad_len > 0: |
| result = list(result) + [(0, 0, 0.0)] * pad_len |
| result = result[:32] |
| arr = np.array(result, dtype=np.float32) |
| if arr.size == 0: |
| results_array.append([(0, 0)] * 32) |
| continue |
| if arr.ndim == 2 and arr.shape[1] >= 3: |
| mask = arr[:, 2] > threshold |
| scaled = np.where(mask[:, None], arr[:, :2].copy(), 0) |
| results_array.append([(int(x), int(y)) for x, y in scaled]) |
| else: |
| results_array.append([(0, 0)] * 32) |
| return results_array |
|
|
|
|
def fix_keypoints_pri(
    results_frames: Any, frame_width: int, frame_height: int, time_left: float, check_frame_list: List[ndarray]
) -> List[Any]:
    """Pick the best keypoint candidate for every frame within a time budget.

    Builds per-frame candidate lists, then flattens them round-robin (first
    choice of every frame, then second choice, ...) so that an early budget
    cut-off inside _fkp_evaluates still touches all frames. Falls back to
    each frame's first candidate (or all zeros) when the budget is already
    spent before scoring starts.
    """
    start = time.time()
    max_frames = len(results_frames)
    all_possible = [None] * max_frames
    for i in range(max_frames):
        all_possible[i] = _fkp_make_possible_keypoints(results_frames[i], frame_width, frame_height)
    # Fallback result: first candidate per frame, zeros when none exists.
    default_kps: List[Any] = []
    for i in range(len(all_possible)):
        default_kps.append(all_possible[i][0] if all_possible[i] else [(0, 0)] * 32)
    # Round-robin flattening: is_end[i] is the next candidate index for
    # frame i, set to -1 once that frame's candidates are exhausted.
    total_jobs: List[Any] = []
    is_end = [0] * len(all_possible)
    while is_end.count(-1) != len(is_end):
        for frame_index in range(max_frames):
            if is_end[frame_index] == -1:
                continue
            if is_end[frame_index] == len(all_possible[frame_index]):
                is_end[frame_index] = -1
                continue
            total_jobs.append((all_possible[frame_index][is_end[frame_index]], frame_index))
            is_end[frame_index] += 1
    time_left -= (time.time() - start)
    if time_left <= 0:
        return default_kps
    return _fkp_evaluates(total_jobs, frame_height, frame_width, max_frames, time_left, check_frame_list)
|
|
|
|
def _step8_one_frame_kp(
    kps: list,
    frame_width: int,
    frame_height: int,
    fill_missing: bool,
    n_keypoints: int = 32,
) -> Optional[List[List[float]]]:
    """Step8 (example_miner _z1): homography from template to frame, project all template points, optionally fill missing.

    Fits TEMPLATE_F1 -> frame from the valid (non-(0,0)) entries of *kps*,
    projects every TEMPLATE_F0 point, and returns 32 [x, y] pairs; points
    projected outside the frame stay [0.0, 0.0]. With fill_missing=False
    only indices that were valid in the input are written. Returns None for
    malformed input or fewer than 4 usable correspondences.
    """
    if not isinstance(kps, list) or len(kps) != n_keypoints or frame_width <= 0 or frame_height <= 0:
        return None
    if n_keypoints != 32 or len(TEMPLATE_F0) != 32 or len(TEMPLATE_F1) != 32:
        return None
    # Builtin generics: typing.Tuple is not imported in this module, so the
    # previous `Tuple[...]` annotations referenced an undefined name.
    filtered_src: List[tuple[float, float]] = []
    filtered_dst: List[tuple[float, float]] = []
    valid_indices: List[int] = []
    for idx, kp in enumerate(kps):
        if not isinstance(kp, (list, tuple)) or len(kp) < 2:
            continue
        try:
            x, y = float(kp[0]), float(kp[1])
        except (TypeError, ValueError):
            continue
        if x == 0.0 and y == 0.0:
            continue  # (0, 0) marks a missing detection
        if idx >= len(TEMPLATE_F1):
            continue
        filtered_src.append(TEMPLATE_F1[idx])
        filtered_dst.append((x, y))
        valid_indices.append(idx)
    if len(filtered_src) < 4:
        return None  # cv2.findHomography needs at least 4 correspondences
    src_np = np.array(filtered_src, dtype=np.float32)
    dst_np = np.array(filtered_dst, dtype=np.float32)
    H_corrected, _ = cv2.findHomography(src_np, dst_np)
    if H_corrected is None:
        return None
    fk_np = np.array(TEMPLATE_F0, dtype=np.float32).reshape(1, -1, 2)
    projected_np = cv2.perspectiveTransform(fk_np, H_corrected)[0]
    valid_indices_set = set(valid_indices)
    adjusted_kps: List[List[float]] = [[0.0, 0.0] for _ in range(n_keypoints)]
    for idx in range(n_keypoints):
        x, y = float(projected_np[idx][0]), float(projected_np[idx][1])
        if not (0 <= x < frame_width and 0 <= y < frame_height):
            continue  # keep out-of-frame projections zeroed
        if fill_missing or idx in valid_indices_set:
            adjusted_kps[idx] = [x, y]
    return adjusted_kps
|
|
|
|
def _apply_homography_refinement(
    keypoints: List[List[float]],
    frame: np.ndarray,
    n_keypoints: int,
) -> List[List[float]]:
    """Refine keypoints using homography from template to frame (new-5 style).

    Fits TEMPLATE_F1 -> frame from the valid, in-frame keypoints and replaces
    the list with the reprojection of TEMPLATE_F0 (out-of-frame points stay
    [0.0, 0.0]). When HOMOGRAPHY_FILL_ONLY_VALID is set, only indices valid
    in the input are filled. Returns the input unchanged when no homography
    can be fitted.
    """
    if n_keypoints != 32 or len(TEMPLATE_F0) != 32 or len(TEMPLATE_F1) != 32:
        return keypoints
    frame_height, frame_width = frame.shape[:2]
    # Builtin generics: typing.Tuple is not imported in this module, so the
    # previous `Tuple[...]` annotations referenced an undefined name.
    valid_src: List[tuple[float, float]] = []
    valid_dst: List[tuple[float, float]] = []
    valid_indices: List[int] = []
    for kp_idx, kp in enumerate(keypoints):
        if kp and len(kp) >= 2:
            x, y = float(kp[0]), float(kp[1])
            # (0, 0) within epsilon marks a missing detection; also require
            # the point to lie inside the frame.
            if not (abs(x) < 1e-6 and abs(y) < 1e-6) and 0 <= x < frame_width and 0 <= y < frame_height:
                valid_src.append(TEMPLATE_F1[kp_idx])
                valid_dst.append((x, y))
                valid_indices.append(kp_idx)
    if len(valid_src) < 4:
        return keypoints  # not enough correspondences for findHomography
    src_pts = np.array(valid_src, dtype=np.float32)
    dst_pts = np.array(valid_dst, dtype=np.float32)
    H, _ = cv2.findHomography(src_pts, dst_pts)
    if H is None:
        return keypoints
    all_template_points = np.array(TEMPLATE_F0, dtype=np.float32).reshape(-1, 1, 2)
    adjusted_points = cv2.perspectiveTransform(all_template_points, H)
    adjusted_points = adjusted_points.reshape(-1, 2)
    adj_x = adjusted_points[:32, 0]
    adj_y = adjusted_points[:32, 1]
    valid_mask = (adj_x >= 0) & (adj_y >= 0) & (adj_x < frame_width) & (adj_y < frame_height)
    valid_indices_set = set(valid_indices)
    adjusted_kps: List[List[float]] = [[0.0, 0.0] for _ in range(32)]
    for i in np.where(valid_mask)[0]:
        if not HOMOGRAPHY_FILL_ONLY_VALID or i in valid_indices_set:
            adjusted_kps[i] = [float(adj_x[i]), float(adj_y[i])]
    return adjusted_kps
|
|
|
|
| def _c1(keypoints: list) -> list: |
| return [[round(float(x), 1), round(float(y), 1)] for x, y in keypoints] |
|
|
|
|
def _l0(model_dir: Path, device: str | None = None, config_name: str = "hrnetv2_w48.yaml", weights_subdir: str | None = None) -> nn.Module:
    """Load the keypoint model: read the YAML config, load the checkpoint,
    build via _g0, and return the model in eval mode on *device*.

    Raises FileNotFoundError when config/weights are missing and ValueError
    when the checkpoint is not a state dict (or a dict wrapping one).
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    config_path = model_dir / config_name
    weights_path = (model_dir / weights_subdir / "keypoint") if weights_subdir else (model_dir / "keypoint")
    if not config_path.exists():
        raise FileNotFoundError(f"Keypoint config not found: {config_path}")
    if not weights_path.exists():
        raise FileNotFoundError(f"Keypoint weights not found: {weights_path}")
    with open(config_path) as f:
        cfg = yaml.safe_load(f)
    loaded = torch.load(weights_path, map_location=device, weights_only=False)
    state = loaded.get("state_dict", loaded) if isinstance(loaded, dict) else loaded
    if not isinstance(state, dict):
        raise ValueError(f"Keypoint weights must be state_dict or dict with 'state_dict'; got {type(state)}")
    if state and next(iter(state.keys()), "").startswith("module."):
        # Strip the DataParallel prefix only when it is a true prefix.
        # (replace("module.", "", 1) would also mangle keys that merely
        # contain "module." somewhere in the middle, e.g. "head.module.w".)
        state = {(k[len("module."):] if k.startswith("module.") else k): v for k, v in state.items()}
    def _remap_head(k: str) -> str:
        # Collapse a Sequential-wrapped head ("head.0.*") to "head.*".
        if k.startswith("head.0."):
            return "head." + k[7:]
        return k
    state = {_remap_head(k): v for k, v in state.items()}
    model = _g0(cfg)
    model.load_state_dict(state, strict=True)
    model.to(device)
    model.eval()
    return model
|
|
| _C0 = 0 |
| _C1 = 1 |
| _C2 = 2 |
| _C3 = 3 |
| _CLS_TO_VALIDATOR: dict[int, int] = {_C2: 0, _C3: 1, _C1: 2, _C0: 3} |
|
|
| _B0: float = 0.25 |
| _B1: bool = True |
| _B2: bool = False |
| _B3: bool = False |
| _B4: bool = False |
| _B5: bool = True |
|
|
| _D0 = 640 |
| _D0_PERSON = 640 |
| _TRACK_IOU_THRESH = 0.3 |
| _TRACK_IOU_HIGH = 0.4 |
| _TRACK_IOU_LOW = 0.2 |
| _TRACK_MAX_AGE = 3 |
| _TRACK_USE_VELOCITY = True |
| _D1 = 0.3 |
| _T0 = 0.5 |
| _R0 = 5 |
| _R1 = 0.10 |
| _R2 = 0.70 |
| _q0 = 0.0 |
| _q1 = 0.0 |
|
|
| _P0 = True |
|
|
| _E0: bool = True |
| _E1: bool = True |
|
|
| _BX_BS: bool = 16 |
| _KP_BS: int = 16 |
|
|
_A0: bool = False
_S0 = 8


# Stage toggles and small integer knobs for the keypoint refinement pipeline.
_G0: bool = True
_G1 = 5
_G2 = 4
_G3 = 3
_G6: bool = False
_G7: bool = True
_G5: bool = True
_G8: bool = True


ENABLE_KEYPOINT_CONVERT: bool = False
_U0 = ENABLE_KEYPOINT_CONVERT
_J0 = True
_J1 = True
_J2: list[float] = [0.3, 0.5]
_J3: int = 20
_J4 = True
_J5: float = 50.0
_J6: int = 2
# Keypoint index groups used to build homography weight maps (_y7/_y8/_y9).
_W0: list[int] = [4, 9, 10, 11, 12, 17, 18, 19, 20, 28]
_W1: list[int] = [13, 14, 15]
_W2: list[int] = [5, 16, 29]
_W3: list[int] = [4, 9, 10, 11, 12, 17, 18, 19, 20, 28]
_W4: list[int] = [13, 14, 15]
_W5: list[int] = [5, 16, 29]
_KP16_WEIGHT: int = 8  # extra weight for keypoint 16 — TODO confirm why 16 is special
_INDICES_H3_VS_H1: set[int] = {5, 13, 14, 15, 16, 29}
_INDICES_H3_VS_H2: set[int] = {4, 9, 10, 11, 12, 17, 18, 19, 20, 28}
_ALWAYS_INCLUDE_INDICES: tuple[int, ...] = (5, 16, 29)
# _Xe messages (raised by the mask validator _y2) that trigger the
# one-pixel nudge-and-retry path in _y11.
_MASK_RETRY_ERRORS: tuple[str, ...] = ("A projected line is too wide", "Projected ground should not be rectangular")

# fix_keypoints_pri tuning knobs.
_FKP_FAST_MODE: bool = True
_FKP_THRESHOLDS: tuple[float, ...] = (0.2, 0.4, 0.6, 0.8)
_FKP_SINGLE_THRESHOLD: float = 0.4
_FKP_MAX_CANDIDATES_PER_FRAME: int = 2
_FKP_TIME_BUDGET_SEC: float = 2.5
_FKP_EVAL_DOWNSCALE: int = 2  # > 1 scores candidate masks at reduced resolution
_Z8_MIN_BATCH_FRAMES: int = 6
_Z8_MAX_PROBLEMATIC_PER_BATCH: int = 8
_STEP0_ENABLED: bool = True
_STEP0_PROXIMITY_PX: float = 30.0
# Step 5.2 line-search parameters.
_STEP5_2_RIGHT_QUAD_HALFLENGTH: float = 200.0
_STEP5_2_8PX_COARSE_STEP: int = 10
_STEP5_2_8PX_REFINE_WINDOW: int = 10
_STEP5_2_ROI_MARGIN: int = 10
_STEP5_2_LONGEST_SEGMENT_MAX_PTS: int = 28
_STEP5_2_8PX_HALFRES: bool = True
_STEP5_2_8PX_REFINE_PASS: bool = True
_STEP5_2_HEAVY_SEARCH_FLAG: bool = True
# 32 pitch-template keypoints in template-image coordinates:
# _F0 integer layout, _F1 sub-pixel layout (used as homography source points).
_F0: list[tuple[float, float]] = [
    (5, 5), (5, 140), (5, 250), (5, 430), (5, 540), (5, 675),
    (55, 250), (55, 430), (110, 340), (165, 140), (165, 270),
    (165, 410), (165, 540), (527, 5), (527, 253), (527, 433),
    (527, 675), (888, 140), (888, 270), (888, 410), (888, 540),
    (940, 340), (998, 250), (998, 430), (1045, 5), (1045, 140),
    (1045, 250), (1045, 430), (1045, 540), (1045, 675),
    (435, 340), (615, 340),
]
_F1: list[tuple[float, float]] = [
    (2.5, 2.5), (2.5, 139.5), (2.5, 249.5), (2.5, 430.5), (2.5, 540.5), (2.5, 678.0),
    (54.5, 249.5), (54.5, 430.5), (110.5, 340.5), (164.5, 139.5), (164.5, 269.0),
    (164.5, 411.0), (164.5, 540.5), (525.0, 2.5), (525.0, 249.5), (525.0, 430.5),
    (525.0, 678.0), (886.5, 139.5), (886.5, 269.0), (886.5, 411.0), (886.5, 540.5),
    (940.5, 340.5), (998.0, 249.5), (998.0, 430.5), (1048.0, 2.5), (1048.0, 139.5),
    (1048.0, 249.5), (1048.0, 430.5), (1048.0, 540.5), (1048.0, 678.0),
    (434.5, 340.0), (615.5, 340.0),
]
# Keypoint indices used as the twist-check corners in _y4.
_I0 = 5
_I1 = 29
_I2 = 0
_I3 = 24
_N0 = len(_F0)
|
|
|
|
| def _step0_remove_close_keypoints(kps: list[list[float]], proximity_px: float = 30.0) -> int: |
| n = len(kps) |
| if n == 0: |
| return 0 |
|
|
| def _valid(i: int) -> bool: |
| if i >= n or not isinstance(kps[i], (list, tuple)) or len(kps[i]) < 2: |
| return False |
| x, y = float(kps[i][0]), float(kps[i][1]) |
| return not (x == 0.0 and y == 0.0) |
|
|
| valid_indices = [i for i in range(n) if _valid(i)] |
| if len(valid_indices) < 2: |
| return 0 |
| to_remove: set[int] = set() |
| for ii in range(len(valid_indices)): |
| a = valid_indices[ii] |
| ax, ay = float(kps[a][0]), float(kps[a][1]) |
| for jj in range(ii + 1, len(valid_indices)): |
| b = valid_indices[jj] |
| bx, by = float(kps[b][0]), float(kps[b][1]) |
| if math.hypot(ax - bx, ay - by) <= proximity_px: |
| to_remove.add(a) |
| to_remove.add(b) |
| for idx in to_remove: |
| kps[idx] = [0.0, 0.0] |
| return len(to_remove) |
|
|
|
|
class _Xe(Exception):
    """Raised when a projected template mask fails a plausibility check."""
    pass
|
|
|
|
def _y0() -> ndarray:
    """Load the pitch template image stored next to this module.

    Falls back to a black 720x1280 BGR image when the file cannot be read.
    """
    path = Path(__file__).parent / "football_pitch_template.png"
    img = cv2.imread(str(path), cv2.IMREAD_COLOR)
    return img if img is not None else np.zeros((720, 1280, 3), dtype=np.uint8)
|
|
|
|
def _y1(mask: ndarray) -> bool:
    """True if any external contour of *mask* has a bounding box with
    min(w, h) / max(w, h) >= 1.0.

    NOTE(review): that ratio only reaches 1.0 when w == h (square box) —
    confirm this is the intended 'line too wide' criterion.
    """
    contours = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[0]
    for contour in contours:
        _, _, w, h = cv2.boundingRect(contour)
        if w == 0 or h == 0:
            continue  # degenerate box: skip rather than divide by zero
        if min(w, h) / max(w, h) >= 1.0:
            return True
    return False
|
|
|
|
def _y2(ground_mask: ndarray, line_mask: ndarray) -> None:
    """Validate projected ground/line masks, raising _Xe on implausibility.

    The checks run in a fixed order so callers matching on the message
    (see _MASK_RETRY_ERRORS) see consistent failures.
    """
    ground_total = ground_mask.sum()
    if ground_total == 0:
        raise _Xe("No projected ground (empty mask)")
    nonzero = cv2.findNonZero(ground_mask)
    if nonzero is None:
        raise _Xe("No projected ground (empty mask)")
    _, _, w, h = cv2.boundingRect(nonzero)
    # A ground mask that exactly fills its bounding box is a rectangle,
    # which a perspective-projected pitch should never be.
    if cv2.countNonZero(ground_mask) == w * h:
        raise _Xe("Projected ground should not be rectangular")
    n_labels, _ = cv2.connectedComponents(ground_mask)
    if n_labels - 1 > 1:
        raise _Xe("Projected ground should be a single object")
    if ground_total / ground_mask.size >= 0.9:
        raise _Xe("Projected ground covers too much of the image")
    line_total = line_mask.sum()
    if line_total == 0:
        raise _Xe("No projected lines")
    if line_total == line_mask.size:
        raise _Xe("Projected lines cover the entire image")
    if _y1(line_mask):
        raise _Xe("A projected line is too wide")
|
|
|
|
| def _y3(pts: ndarray) -> bool: |
| def _ccw(a: tuple, b: tuple, c: tuple) -> bool: |
| return (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0]) |
|
|
| def _intersect(p1: tuple, p2: tuple, q1: tuple, q2: tuple) -> bool: |
| return (_ccw(p1, q1, q2) != _ccw(p2, q1, q2)) and (_ccw(p1, p2, q1) != _ccw(p1, p2, q2)) |
|
|
| p = pts.reshape(-1, 2) |
| if len(p) < 4: |
| return False |
| edges = [(p[0], p[1]), (p[1], p[2]), (p[2], p[3]), (p[3], p[0])] |
| return _intersect(*edges[0], *edges[2]) or _intersect(*edges[1], *edges[3]) |
|
|
|
|
def _y4(
    template: ndarray,
    src_kps: list[tuple[float, float]],
    dst_kps: list[tuple[float, float]],
    frame_width: int,
    frame_height: int,
) -> ndarray:
    """Warp *template* into the frame via the src->dst homography.

    Raises ValueError when no homography is found, and _Xe when the
    projection of the corner keypoints (_I0/_I1/_I3/_I2) is twisted.
    """
    H, _ = cv2.findHomography(
        np.array(src_kps, dtype=np.float32),
        np.array(dst_kps, dtype=np.float32),
    )
    if H is None:
        raise ValueError("Homography computation failed")
    warped = cv2.warpPerspective(template, H, (frame_width, frame_height))
    corner_indices = [_I0, _I1, _I3, _I2]
    if len(src_kps) > max(corner_indices):
        corners = np.array(
            [[src_kps[i][0], src_kps[i][1]] for i in corner_indices],
            dtype=np.float32,
        ).reshape(1, 4, 2)
        projected = cv2.perspectiveTransform(corners, H)[0]
        if _y3(projected):
            raise _Xe("Projection twisted!")
    return warped
|
|
|
|
def _y5(warped: ndarray) -> tuple[ndarray, ndarray]:
    """Extract binary (ground, lines) masks from a warped template,
    validating them with _y2 (raises _Xe on implausible masks)."""
    gray = cv2.cvtColor(warped, cv2.COLOR_BGR2GRAY)
    ground_bin = (cv2.threshold(gray, 10, 255, cv2.THRESH_BINARY)[1] > 0).astype(np.uint8)
    lines_bin = (cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY)[1] > 0).astype(np.uint8)
    _y2(ground_bin, lines_bin)
    return ground_bin, lines_bin
|
|
|
|
def _y6(frame: ndarray, ground_mask: ndarray) -> ndarray:
    """Binary mask of dilated edge pixels restricted to the projected ground."""
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    tophat = cv2.getStructuringElement(cv2.MORPH_RECT, (31, 31))
    gray = cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, tophat)
    edges = cv2.Canny(cv2.GaussianBlur(gray, (5, 5), 0), 30, 100)
    on_ground = cv2.bitwise_and(edges, edges, mask=ground_mask)
    grow = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    on_ground = cv2.dilate(on_ground, grow, iterations=3)
    return (on_ground > 0).astype(np.uint8)
|
|
|
|
| def _fit_line_to_points(points: list[tuple[float, float]]) -> tuple[float, float, float] | None: |
| if len(points) < 2: |
| return None |
| pts = np.array(points, dtype=np.float64) |
| x = pts[:, 0] |
| y = pts[:, 1] |
| mx, my = float(x.mean()), float(y.mean()) |
| u = x - mx |
| v = y - my |
| n = len(pts) |
| cxx = (u * u).sum() / n |
| cxy = (u * v).sum() / n |
| cyy = (v * v).sum() / n |
| trace = cxx + cyy |
| diff = cxx - cyy |
| lambda_small = (trace - np.sqrt(diff * diff + 4.0 * cxy * cxy)) * 0.5 |
| a = float(cxy) |
| b = float(lambda_small - cxx) |
| norm = np.sqrt(a * a + b * b) |
| if norm < 1e-12: |
| a, b = 1.0, 0.0 |
| else: |
| a, b = a / norm, b / norm |
| c = -(a * mx + b * my) |
| return (a, b, c) |
|
|
|
|
| def _line_intersection( |
| a1: float, b1: float, c1: float, |
| a2: float, b2: float, c2: float, |
| ) -> tuple[float, float] | None: |
| det = a1 * b2 - a2 * b1 |
| if abs(det) < 1e-12: |
| return None |
| x = (b1 * c2 - b2 * c1) / det |
| y = (a2 * c1 - a1 * c2) / det |
| return (float(x), float(y)) |
|
|
|
|
| def _line_through_two_points(x1: float, y1: float, x2: float, y2: float) -> tuple[float, float, float]: |
| a = y2 - y1 |
| b = -(x2 - x1) |
| c = (x2 - x1) * y1 - (y2 - y1) * x1 |
| return (a, b, c) |
|
|
|
|
def _frame_line_edges(frame: ndarray) -> ndarray:
    """Canny edge map of *frame* after top-hat filtering and Gaussian blur."""
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    tophat = cv2.getStructuringElement(cv2.MORPH_RECT, (31, 31))
    gray = cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, tophat)
    return cv2.Canny(cv2.GaussianBlur(gray, (5, 5), 0), 30, 100)
|
|
|
|
def _dilate_uint8_full_frame(frame: ndarray) -> ndarray:
    """Full-frame dilated edge mask scaled to {0, 255} uint8."""
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    grown = cv2.dilate(_frame_line_edges(frame), kernel, iterations=3)
    return ((grown > 0).astype(np.uint8)) * 255
|
|
|
|
| def _clip_segment_to_rect( |
| x1: float, y1: float, x2: float, y2: float, |
| w: int, h: int, |
| ) -> tuple[tuple[float, float], tuple[float, float]] | None: |
| dx, dy = x2 - x1, y2 - y1 |
| pts: list[tuple[float, float]] = [] |
| if 0 <= x1 <= w and 0 <= y1 <= h: |
| pts.append((x1, y1)) |
| if 0 <= x2 <= w and 0 <= y2 <= h: |
| pts.append((x2, y2)) |
| if abs(dx) >= 1e-12: |
| for x_edge in (0.0, float(w - 1)): |
| t = (x_edge - x1) / dx |
| if 0 <= t <= 1: |
| y = y1 + t * dy |
| if 0 <= y <= h - 1: |
| pts.append((x_edge, y)) |
| if abs(dy) >= 1e-12: |
| for y_edge in (0.0, float(h - 1)): |
| t = (y_edge - y1) / dy |
| if 0 <= t <= 1: |
| x = x1 + t * dx |
| if 0 <= x <= w - 1: |
| pts.append((x, y_edge)) |
| if len(pts) < 2: |
| if len(pts) == 1: |
| return (pts[0], pts[0]) |
| return None |
| pts_sorted = sorted(pts, key=lambda p: p[0]) |
| return (pts_sorted[0], pts_sorted[-1]) |
|
|
|
|
| def _segment_fully_inside_mask( |
| p1: tuple[int, int], |
| p2: tuple[int, int], |
| mask: ndarray, |
| ) -> bool: |
| h, w = mask.shape[:2] |
| x1, y1 = p1[0], p1[1] |
| x2, y2 = p2[0], p2[1] |
| n = max(abs(x2 - x1), abs(y2 - y1), 1) |
| for k in range(n + 1): |
| t = k / n |
| x = int(round(x1 + t * (x2 - x1))) |
| y = int(round(y1 + t * (y2 - y1))) |
| if x < 0 or x >= w or y < 0 or y >= h: |
| return False |
| if mask[y, x] == 0: |
| return False |
| return True |
|
|
|
|
def _longest_segment_fully_inside_mask(
    mask: ndarray,
    contour_points: ndarray,
) -> tuple[tuple[int, int], tuple[int, int]] | None:
    """Longest chord between contour points whose raster lies entirely in *mask*.

    Exhaustive O(n^2) search over point pairs; returns None when fewer than
    two points are given or no pair qualifies. Ties keep the first pair
    found (pairs are visited in original index order).
    """
    flat = contour_points.reshape(-1, 2)
    if len(flat) < 2:
        return None
    best: tuple[tuple[int, int], tuple[int, int]] | None = None
    best_d_sq = -1.0
    for a, b in combinations(flat, 2):
        end1 = (int(a[0]), int(a[1]))
        end2 = (int(b[0]), int(b[1]))
        if not _segment_fully_inside_mask(end1, end2, mask):
            continue
        d_sq = (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2
        if d_sq > best_d_sq:
            best_d_sq = d_sq
            best = (end1, end2)
    return best
|
|
|
|
| def _line_segment_for_drawing( |
| a: float, b: float, c: float, w: int, h: int, |
| ) -> tuple[tuple[float, float], tuple[float, float]] | None: |
| pts: list[tuple[float, float]] = [] |
| if abs(b) >= 1e-12: |
| for x in (0.0, float(w - 1)): |
| y = -(a * x + c) / b |
| if -50 <= y <= h + 50: |
| pts.append((x, y)) |
| if abs(a) >= 1e-12: |
| for y in (0.0, float(h - 1)): |
| x = -(b * y + c) / a |
| if -50 <= x <= w + 50: |
| pts.append((x, y)) |
| if len(pts) < 2: |
| return None |
| seen: set[tuple[float, float]] = set() |
| unique = [] |
| for p in pts: |
| key = (round(p[0], 2), round(p[1], 2)) |
| if key not in seen: |
| seen.add(key) |
| unique.append(p) |
| if len(unique) < 2: |
| return None |
| unique.sort(key=lambda p: (p[0], p[1])) |
| return (unique[0], unique[-1]) |
|
|
|
|
def _y7() -> dict[int, int]:
    """Weight map: every keypoint index in _W0 gets weight 2."""
    return dict.fromkeys(_W0, 2)
|
|
|
|
def _y8() -> dict[int, int]:
    """Weight map: _W1 indices get 3, _W2 get 4, then keypoint 16 is
    overridden to _KP16_WEIGHT."""
    weights = dict.fromkeys(_W1, 3)
    weights.update(dict.fromkeys(_W2, 4))
    weights[16] = _KP16_WEIGHT
    return weights
|
|
|
|
def _y9() -> dict[int, int]:
    """Combined weight map: _W3 -> 2, _W4 -> 3, _W5 -> 4, then keypoint 16
    is overridden to _KP16_WEIGHT."""
    weights: dict[int, int] = {}
    for group, value in ((_W3, 2), (_W4, 3), (_W5, 4)):
        for idx in group:
            weights[idx] = value
    weights[16] = _KP16_WEIGHT
    return weights
|
|
|
|
def _y10(
    valid_indices: list[int],
    valid_src: list[tuple[float, float]],
    valid_dst: list[tuple[float, float]],
    weight_by_index: dict[int, int],
) -> ndarray | None:
    """Weighted homography fit: each src/dst correspondence is duplicated
    according to its index's weight (minimum 1, default 1).

    Returns None when fewer than 4 weighted correspondences exist.
    """
    src_pts: list[tuple[float, float]] = []
    dst_pts: list[tuple[float, float]] = []
    for idx, src, dst in zip(valid_indices, valid_src, valid_dst):
        repeats = max(1, weight_by_index.get(idx, 1))
        src_pts.extend([src] * repeats)
        dst_pts.extend([dst] * repeats)
    if len(src_pts) < 4:
        return None
    H, _ = cv2.findHomography(
        np.array(src_pts, dtype=np.float32),
        np.array(dst_pts, dtype=np.float32),
    )
    return H
|
|
|
|
def _y11(
    H: ndarray,
    template_image: ndarray,
    video_frame: ndarray,
    valid_indices: list[int] | None = None,
    valid_src: list[tuple[float, float]] | None = None,
    valid_dst: list[tuple[float, float]] | None = None,
    weight_map: dict[int, int] | None = None,
) -> tuple[float, ndarray | None, list[tuple[float, float]] | None]:
    """Score homography *H* by projected-line overlap, with a nudge retry.

    Returns (score, homography used, adjusted dst points or None). When the
    mask validator raises one of _MASK_RETRY_ERRORS and the raw
    correspondences were provided, the destination point with the smallest
    y is nudged one pixel in each of four directions and the fit is retried;
    (0.0, None, None) when all attempts fail.
    """
    h, w = video_frame.shape[:2]


    def _score_from_warped(warped: ndarray) -> float:
        # Fraction of projected line pixels covered by edges detected in
        # the actual frame (both masks are binary 0/1).
        ground_mask, line_mask = _y5(warped)
        predicted_mask = _y6(video_frame, ground_mask)
        overlap = cv2.bitwise_and(line_mask, predicted_mask)
        pixels_on_lines = int(line_mask.sum())
        pixels_overlap = int(overlap.sum())
        return float(pixels_overlap) / float(pixels_on_lines + 1e-8)


    try:
        warped = cv2.warpPerspective(template_image, H, (w, h))
        score = _score_from_warped(warped)
        return (score, H, None)
    except _Xe as e:
        err_msg = e.args[0] if e.args else ""
        # Only specific validation failures warrant the perturbation retry,
        # and only when the caller supplied the raw correspondences.
        if (
            err_msg in _MASK_RETRY_ERRORS
            and valid_indices is not None
            and valid_src is not None
            and valid_dst is not None
            and weight_map is not None
        ):
            # Nudge the topmost destination point by one pixel and refit.
            idx_smallest_y = min(range(len(valid_dst)), key=lambda i: valid_dst[i][1])
            x0, y0 = valid_dst[idx_smallest_y]
            for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]:
                new_dst = list(valid_dst)
                new_dst[idx_smallest_y] = (x0 + dx, y0 + dy)
                H2 = _y10(valid_indices, valid_src, new_dst, weight_map)
                if H2 is None:
                    continue
                try:
                    warped2 = cv2.warpPerspective(template_image, H2, (w, h))
                    score = _score_from_warped(warped2)
                    return (score, H2, new_dst)
                except _Xe:
                    continue
        return (0.0, None, None)
    except Exception:
        return (0.0, None, None)
|
|
|
|
| def _is_kp_valid(kp: Any) -> bool: |
| if not isinstance(kp, (list, tuple)) or len(kp) < 2: |
| return False |
| try: |
| x, y = float(kp[0]), float(kp[1]) |
| except (TypeError, ValueError): |
| return False |
| return not (x == 0.0 and y == 0.0) |
|
|
|
|
| def _refine_kp5_kp16_kp29( |
| kps: list[list[float]], |
| H: ndarray, |
| video_frame: ndarray, |
| template_image: ndarray, |
| *, |
| precomputed_dilate_uint8: ndarray | None = None, |
| precomputed_warped: ndarray | None = None, |
| precomputed_ground_mask: ndarray | None = None, |
| ) -> tuple[bool, str | None]: |
| n_valid_5_16_29 = sum(1 for i in (5, 16, 29) if i < len(kps) and _is_kp_valid(kps[i])) |
| if n_valid_5_16_29 >= 2: |
| return (False, None) |
| h, w = video_frame.shape[:2] |
| kp16_valid_input = _is_kp_valid(kps[16]) if len(kps) > 16 else False |
| left_set = [0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12] |
| right_set = [18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] |
| middle_set = [9, 13, 14, 15, 16, 17, 30, 31] |
| decision: str | None = None |
| if any(i < len(kps) and _is_kp_valid(kps[i]) for i in left_set): |
| decision = "left" |
| elif any(i < len(kps) and _is_kp_valid(kps[i]) for i in right_set): |
| decision = "right" |
| elif any(i < len(kps) and _is_kp_valid(kps[i]) for i in middle_set): |
| decision = "middle" |
| else: |
| decision = "other" |
| src_pts = np.array([_F1[i] for i in (5, 16, 29)], dtype=np.float32).reshape(1, 3, 2) |
| projected = cv2.perspectiveTransform(src_pts, H)[0] |
| for idx, i in enumerate((5, 16, 29)): |
| if i < len(kps) and not _is_kp_valid(kps[i]): |
| kps[i] = [float(projected[idx][0]), float(projected[idx][1])] |
| tkp_5 = (float(kps[5][0]), float(kps[5][1])) |
| tkp_16 = (float(kps[16][0]), float(kps[16][1])) |
| tkp_29 = (float(kps[29][0]), float(kps[29][1])) |
| clip = _clip_segment_to_rect(tkp_5[0], tkp_5[1], tkp_29[0], tkp_29[1], w, h) |
| if clip is None: |
| return (False, None) |
| (ax, ay), (bx, by) = clip |
|
|
| if decision == "right": |
| clip_r = _clip_segment_to_rect(tkp_16[0], tkp_16[1], tkp_29[0], tkp_29[1], w, h) |
| if clip_r is None: |
| return (False, None) |
| (Ax, Ay), (Bx, By) = clip_r |
| valid_indices_52 = [] |
| valid_src_52 = [] |
| valid_dst_52 = [] |
| for idx, kp in enumerate(kps): |
| if not _is_kp_valid(kp): |
| continue |
| x, y = float(kp[0]), float(kp[1]) |
| valid_indices_52.append(idx) |
| valid_src_52.append(_F1[idx] if idx < len(_F1) else (0.0, 0.0)) |
| valid_dst_52.append((x, y)) |
| warped_r = precomputed_warped |
| ground_mask_r = precomputed_ground_mask |
| H_use_r = H |
| if warped_r is None or ground_mask_r is None: |
| try: |
| warped_r = cv2.warpPerspective(template_image, H_use_r, (w, h)) |
| ground_mask_r, _ = _y5(warped_r) |
| except _Xe as e: |
| err_msg = e.args[0] if e.args else "" |
| if err_msg in _MASK_RETRY_ERRORS and len(valid_indices_52) >= 4 and len(valid_dst_52) >= 4: |
| idx_smallest_y = min(range(len(valid_dst_52)), key=lambda i: valid_dst_52[i][1]) |
| x0, y0 = valid_dst_52[idx_smallest_y] |
| for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]: |
| new_dst = list(valid_dst_52) |
| new_dst[idx_smallest_y] = (x0 + dx, y0 + dy) |
| H_retry = _y10(valid_indices_52, valid_src_52, new_dst, {}) |
| if H_retry is None: |
| continue |
| try: |
| warped_r = cv2.warpPerspective(template_image, H_retry, (w, h)) |
| ground_mask_r, _ = _y5(warped_r) |
| H_use_r = H_retry |
| break |
| except _Xe: |
| continue |
| if warped_r is None or ground_mask_r is None: |
| return (False, None) |
| except Exception: |
| return (False, None) |
| if warped_r is None or ground_mask_r is None: |
| return (False, None) |
| dilate_uint8_r = precomputed_dilate_uint8 if precomputed_dilate_uint8 is not None else _dilate_uint8_full_frame(video_frame) |
| pts_right = [(float(kps[i][0]), float(kps[i][1])) for i in [24, 25, 26, 27, 28, 29] if i < len(kps) and _is_kp_valid(kps[i])] |
| if len(pts_right) >= 2: |
| line3 = _fit_line_to_points(pts_right) |
| else: |
| src_24_29 = np.array([[_F1[i] for i in [24, 25, 26, 27, 28, 29]]], dtype=np.float32) |
| tkp_24_29 = cv2.perspectiveTransform(src_24_29, H_use_r)[0] |
| pts_right = [(float(tkp_24_29[i][0]), float(tkp_24_29[i][1])) for i in range(6)] |
| line3 = _fit_line_to_points(pts_right) |
| if line3 is None: |
| return (False, None) |
| a3, b3, c3 = line3 |
| norm_u = math.hypot(b3, -a3) |
| if norm_u < 1e-12: |
| return (False, None) |
| ux, uy = b3 / norm_u, -a3 / norm_u |
| d = _STEP5_2_RIGHT_QUAD_HALFLENGTH |
| A1 = (Ax - d * ux, Ay - d * uy) |
| A2 = (Ax + d * ux, Ay + d * uy) |
| B1 = (Bx - d * ux, By - d * uy) |
| B2 = (Bx + d * ux, By + d * uy) |
| pts_poly = np.array([[A1[0], A1[1]], [A2[0], A2[1]], [B2[0], B2[1]], [B1[0], B1[1]]], dtype=np.int32) |
| mask_poly = np.zeros((h, w), dtype=np.uint8) |
| cv2.fillConvexPoly(mask_poly, pts_poly, 255) |
| dilate_in_roi = cv2.bitwise_and(dilate_uint8_r, mask_poly) |
| px = pts_poly[:, 0] |
| py = pts_poly[:, 1] |
| x_min = max(0, int(px.min()) - _STEP5_2_ROI_MARGIN) |
| y_min = max(0, int(py.min()) - _STEP5_2_ROI_MARGIN) |
| x_max = min(w, int(px.max()) + 1 + _STEP5_2_ROI_MARGIN) |
| y_max = min(h, int(py.max()) + 1 + _STEP5_2_ROI_MARGIN) |
| roi_w = x_max - x_min |
| roi_h = y_max - y_min |
| dilate_roi = dilate_in_roi[y_min:y_max, x_min:x_max] |
| num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(dilate_roi, connectivity=8) |
| best_label = 0 |
| best_area = 0 |
| for i in range(1, num_labels): |
| area = stats[i, cv2.CC_STAT_AREA] |
| if area > best_area: |
| best_area = area |
| best_label = i |
| longest_mask_roi = ((labels == best_label).astype(np.uint8)) * 255 |
| longest_mask = np.zeros((h, w), dtype=np.uint8) |
| longest_mask[y_min:y_max, x_min:x_max] = longest_mask_roi |
| p1, p2 = None, None |
| A3, B3 = None, None |
| contours, _ = cv2.findContours(longest_mask_roi, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) |
| if contours: |
| contour = max(contours, key=cv2.contourArea) |
| pts_contour = contour.reshape(-1, 2) |
| n_c = len(pts_contour) |
| max_pts = _STEP5_2_LONGEST_SEGMENT_MAX_PTS |
| if n_c > max_pts: |
| step = max(1, n_c // max_pts) |
| pts_subsample = pts_contour[np.arange(0, n_c, step)] |
| else: |
| pts_subsample = pts_contour |
| if _STEP5_2_HEAVY_SEARCH_FLAG: |
| result = _longest_segment_fully_inside_mask(longest_mask_roi, pts_subsample) |
| if result is not None: |
| p1_roi, p2_roi = result |
| p1 = (p1_roi[0] + x_min, p1_roi[1] + y_min) |
| p2 = (p2_roi[0] + x_min, p2_roi[1] + y_min) |
| else: |
| best_len_sq = -1.0 |
| best_p1_roi, best_p2_roi = None, None |
| for i in range(len(pts_subsample)): |
| for j in range(i + 1, len(pts_subsample)): |
| d_sq = (pts_subsample[i][0] - pts_subsample[j][0]) ** 2 + (pts_subsample[i][1] - pts_subsample[j][1]) ** 2 |
| if d_sq > best_len_sq: |
| best_len_sq = d_sq |
| best_p1_roi = (int(pts_subsample[i][0]), int(pts_subsample[i][1])) |
| best_p2_roi = (int(pts_subsample[j][0]), int(pts_subsample[j][1])) |
| if best_p1_roi is not None and best_p2_roi is not None: |
| p1 = (best_p1_roi[0] + x_min, best_p1_roi[1] + y_min) |
| p2 = (best_p2_roi[0] + x_min, best_p2_roi[1] + y_min) |
| if p1 is not None and p2 is not None: |
| a_long, b_long, c_long = _line_through_two_points(float(p1[0]), float(p1[1]), float(p2[0]), float(p2[1])) |
| a2, b2, c2 = _line_through_two_points(B1[0], B1[1], B2[0], B2[1]) |
| B3 = _line_intersection(a_long, b_long, c_long, a2, b2, c2) |
| seg_border = _line_segment_for_drawing(a_long, b_long, c_long, w, h) |
| if seg_border is not None: |
| A3 = seg_border[0] |
| if A3 is not None and B3 is not None: |
| c4 = -a3 * A3[0] - b3 * A3[1] |
| A3x, A3y = A3[0], A3[1] |
| B3x, B3y = B3[0], B3[1] |
| A3x_roi = A3x - x_min |
| A3y_roi = A3y - y_min |
| B3x_roi = B3x - x_min |
| B3y_roi = B3y - y_min |
| if _STEP5_2_8PX_HALFRES and roi_w >= 4 and roi_h >= 4: |
| dilate_8px = cv2.resize(dilate_roi, (roi_w // 2, roi_h // 2), interpolation=cv2.INTER_NEAREST) |
| roi_w_8, roi_h_8 = roi_w // 2, roi_h // 2 |
| scale_8, seg_width_8 = 0.5, 4 |
| else: |
| dilate_8px = dilate_roi |
| roi_w_8, roi_h_8 = roi_w, roi_h |
| scale_8, seg_width_8 = 1.0, 8 |
| mask_8_roi = np.zeros((roi_h_8, roi_w_8), dtype=np.uint8) |
| overlap_roi = np.empty((roi_h_8, roi_w_8), dtype=np.uint8) |
| best_count_8 = -1 |
| best_s, best_t = 0, 0 |
| for s in range(-30, 31, _STEP5_2_8PX_COARSE_STEP): |
| for t in range(-30, 31, _STEP5_2_8PX_COARSE_STEP): |
| A4x_roi = A3x_roi + s * ux |
| A4y_roi = A3y_roi + s * uy |
| B4x_roi = B3x_roi + t * ux |
| B4y_roi = B3y_roi + t * uy |
| ax_d = int(round(A4x_roi * scale_8)) |
| ay_d = int(round(A4y_roi * scale_8)) |
| bx_d = int(round(B4x_roi * scale_8)) |
| by_d = int(round(B4y_roi * scale_8)) |
| mask_8_roi.fill(0) |
| cv2.line(mask_8_roi, (ax_d, ay_d), (bx_d, by_d), 255, seg_width_8) |
| cv2.bitwise_and(dilate_8px, mask_8_roi, overlap_roi) |
| count = cv2.countNonZero(overlap_roi) |
| if count > best_count_8: |
| best_count_8 = count |
| best_s, best_t = s, t |
| if _STEP5_2_8PX_REFINE_PASS: |
| s_lo = max(-30, best_s - _STEP5_2_8PX_REFINE_WINDOW) |
| s_hi = min(31, best_s + _STEP5_2_8PX_REFINE_WINDOW + 1) |
| t_lo = max(-30, best_t - _STEP5_2_8PX_REFINE_WINDOW) |
| t_hi = min(31, best_t + _STEP5_2_8PX_REFINE_WINDOW + 1) |
| for s in range(s_lo, s_hi, 5): |
| for t in range(t_lo, t_hi, 5): |
| A4x_roi = A3x_roi + s * ux |
| A4y_roi = A3y_roi + s * uy |
| B4x_roi = B3x_roi + t * ux |
| B4y_roi = B3y_roi + t * uy |
| ax_d = int(round(A4x_roi * scale_8)) |
| ay_d = int(round(A4y_roi * scale_8)) |
| bx_d = int(round(B4x_roi * scale_8)) |
| by_d = int(round(B4y_roi * scale_8)) |
| mask_8_roi.fill(0) |
| cv2.line(mask_8_roi, (ax_d, ay_d), (bx_d, by_d), 255, seg_width_8) |
| cv2.bitwise_and(dilate_8px, mask_8_roi, overlap_roi) |
| count = cv2.countNonZero(overlap_roi) |
| if count > best_count_8: |
| best_count_8 = count |
| best_s, best_t = s, t |
| A4 = (A3x + best_s * ux, A3y + best_s * uy) |
| B4 = (B3x + best_t * ux, B3y + best_t * uy) |
| a_ab, b_ab, c_ab = _line_through_two_points(A4[0], A4[1], B4[0], B4[1]) |
| kkp29 = _line_intersection(a_ab, b_ab, c_ab, a3, b3, c3) |
| center_pts = [(float(kps[i][0]), float(kps[i][1])) for i in [13, 14, 15, 16] if i < len(kps) and _is_kp_valid(kps[i])] |
| if len(center_pts) >= 2: |
| line_13_16 = _fit_line_to_points(center_pts) |
| else: |
| src_13_16 = np.array([[_F1[i] for i in [13, 14, 15, 16]]], dtype=np.float32) |
| tkp_13_16 = cv2.perspectiveTransform(src_13_16, H_use_r)[0] |
| center_pts = [(float(tkp_13_16[i][0]), float(tkp_13_16[i][1])) for i in range(4)] |
| line_13_16 = _fit_line_to_points(center_pts) |
| kkp16 = _line_intersection(a_ab, b_ab, c_ab, line_13_16[0], line_13_16[1], line_13_16[2]) if line_13_16 is not None else None |
| if kkp29 is not None: |
| kps[29] = [float(kkp29[0]), float(kkp29[1])] |
| if kkp16 is not None: |
| kps[16] = [float(kkp16[0]), float(kkp16[1])] |
| if kkp16 is not None and kkp16[0] > 0: |
| pts_0_5_r = [(float(kps[i][0]), float(kps[i][1])) for i in [0, 1, 2, 3, 4, 5] if i < len(kps) and _is_kp_valid(kps[i])] |
| if len(pts_0_5_r) >= 2: |
| line_0_5_r = _fit_line_to_points(pts_0_5_r) |
| else: |
| src_0_5_r = np.array([[_F1[i] for i in [0, 1, 2, 3, 4, 5]]], dtype=np.float32) |
| tkp_0_5_r = cv2.perspectiveTransform(src_0_5_r, H_use_r)[0] |
| pts_0_5_r = [(float(tkp_0_5_r[i][0]), float(tkp_0_5_r[i][1])) for i in range(6)] |
| line_0_5_r = _fit_line_to_points(pts_0_5_r) |
| kkp5_r = _line_intersection(a_ab, b_ab, c_ab, line_0_5_r[0], line_0_5_r[1], line_0_5_r[2]) if line_0_5_r is not None else None |
| if kkp5_r is not None: |
| kps[5] = [float(kkp5_r[0]), float(kkp5_r[1])] |
| return (True, "right") |
|
|
| if decision == "left": |
| clip_l = _clip_segment_to_rect(tkp_5[0], tkp_5[1], tkp_16[0], tkp_16[1], w, h) |
| if clip_l is None: |
| return (False, None) |
| (Bx, By), (Ax, Ay) = clip_l |
| valid_indices_52 = [] |
| valid_src_52 = [] |
| valid_dst_52 = [] |
| for idx, kp in enumerate(kps): |
| if not _is_kp_valid(kp): |
| continue |
| x, y = float(kp[0]), float(kp[1]) |
| valid_indices_52.append(idx) |
| valid_src_52.append(_F1[idx] if idx < len(_F1) else (0.0, 0.0)) |
| valid_dst_52.append((x, y)) |
| warped_l = precomputed_warped |
| ground_mask_l = precomputed_ground_mask |
| H_use_l = H |
| if warped_l is None or ground_mask_l is None: |
| try: |
| warped_l = cv2.warpPerspective(template_image, H_use_l, (w, h)) |
| ground_mask_l, _ = _y5(warped_l) |
| except _Xe as e: |
| err_msg = e.args[0] if e.args else "" |
| if err_msg in _MASK_RETRY_ERRORS and len(valid_indices_52) >= 4 and len(valid_dst_52) >= 4: |
| idx_smallest_y = min(range(len(valid_dst_52)), key=lambda i: valid_dst_52[i][1]) |
| x0, y0 = valid_dst_52[idx_smallest_y] |
| for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]: |
| new_dst = list(valid_dst_52) |
| new_dst[idx_smallest_y] = (x0 + dx, y0 + dy) |
| H_retry = _y10(valid_indices_52, valid_src_52, new_dst, {}) |
| if H_retry is None: |
| continue |
| try: |
| warped_l = cv2.warpPerspective(template_image, H_retry, (w, h)) |
| ground_mask_l, _ = _y5(warped_l) |
| H_use_l = H_retry |
| break |
| except _Xe: |
| continue |
| if warped_l is None or ground_mask_l is None: |
| return (False, None) |
| except Exception: |
| return (False, None) |
| if warped_l is None or ground_mask_l is None: |
| return (False, None) |
| dilate_uint8_l = precomputed_dilate_uint8 if precomputed_dilate_uint8 is not None else _dilate_uint8_full_frame(video_frame) |
| pts_left = [(float(kps[i][0]), float(kps[i][1])) for i in [0, 1, 2, 3, 4, 5] if i < len(kps) and _is_kp_valid(kps[i])] |
| if len(pts_left) >= 2: |
| line3_l = _fit_line_to_points(pts_left) |
| else: |
| src_0_5 = np.array([[_F1[i] for i in [0, 1, 2, 3, 4, 5]]], dtype=np.float32) |
| tkp_0_5 = cv2.perspectiveTransform(src_0_5, H_use_l)[0] |
| pts_left = [(float(tkp_0_5[i][0]), float(tkp_0_5[i][1])) for i in range(6)] |
| line3_l = _fit_line_to_points(pts_left) |
| if line3_l is None: |
| return (False, None) |
| a3_l, b3_l, c3_l = line3_l |
| norm_u_l = math.hypot(b3_l, -a3_l) |
| if norm_u_l < 1e-12: |
| return (False, None) |
| ux_l, uy_l = b3_l / norm_u_l, -a3_l / norm_u_l |
| d_l = _STEP5_2_RIGHT_QUAD_HALFLENGTH |
| A1_l = (Ax - d_l * ux_l, Ay - d_l * uy_l) |
| A2_l = (Ax + d_l * ux_l, Ay + d_l * uy_l) |
| B1_l = (Bx - d_l * ux_l, By - d_l * uy_l) |
| B2_l = (Bx + d_l * ux_l, By + d_l * uy_l) |
| pts_poly_l = np.array([[A1_l[0], A1_l[1]], [A2_l[0], A2_l[1]], [B2_l[0], B2_l[1]], [B1_l[0], B1_l[1]]], dtype=np.int32) |
| mask_poly_l = np.zeros((h, w), dtype=np.uint8) |
| cv2.fillConvexPoly(mask_poly_l, pts_poly_l, 255) |
| dilate_in_roi_l = cv2.bitwise_and(dilate_uint8_l, mask_poly_l) |
| px_l = pts_poly_l[:, 0] |
| py_l = pts_poly_l[:, 1] |
| x_min_l = max(0, int(px_l.min()) - _STEP5_2_ROI_MARGIN) |
| y_min_l = max(0, int(py_l.min()) - _STEP5_2_ROI_MARGIN) |
| x_max_l = min(w, int(px_l.max()) + 1 + _STEP5_2_ROI_MARGIN) |
| y_max_l = min(h, int(py_l.max()) + 1 + _STEP5_2_ROI_MARGIN) |
| roi_w_l = x_max_l - x_min_l |
| roi_h_l = y_max_l - y_min_l |
| dilate_roi_l = dilate_in_roi_l[y_min_l:y_max_l, x_min_l:x_max_l] |
| num_labels_l, labels_l, stats_l, _ = cv2.connectedComponentsWithStats(dilate_roi_l, connectivity=8) |
| best_label_l = 0 |
| best_area_l = 0 |
| for i in range(1, num_labels_l): |
| area = stats_l[i, cv2.CC_STAT_AREA] |
| if area > best_area_l: |
| best_area_l = area |
| best_label_l = i |
| longest_mask_roi_l = ((labels_l == best_label_l).astype(np.uint8)) * 255 |
| longest_mask_l = np.zeros((h, w), dtype=np.uint8) |
| longest_mask_l[y_min_l:y_max_l, x_min_l:x_max_l] = longest_mask_roi_l |
| p1_l, p2_l = None, None |
| A3_l, B3_l = None, None |
| contours_l, _ = cv2.findContours(longest_mask_roi_l, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) |
| if contours_l: |
| contour_l = max(contours_l, key=cv2.contourArea) |
| pts_contour_l = contour_l.reshape(-1, 2) |
| n_c_l = len(pts_contour_l) |
| max_pts_l = _STEP5_2_LONGEST_SEGMENT_MAX_PTS |
| if n_c_l > max_pts_l: |
| step_l = max(1, n_c_l // max_pts_l) |
| pts_subsample_l = pts_contour_l[np.arange(0, n_c_l, step_l)] |
| else: |
| pts_subsample_l = pts_contour_l |
| if _STEP5_2_HEAVY_SEARCH_FLAG: |
| result_l = _longest_segment_fully_inside_mask(longest_mask_roi_l, pts_subsample_l) |
| if result_l is not None: |
| p1_roi_l, p2_roi_l = result_l |
| p1_l = (p1_roi_l[0] + x_min_l, p1_roi_l[1] + y_min_l) |
| p2_l = (p2_roi_l[0] + x_min_l, p2_roi_l[1] + y_min_l) |
| else: |
| best_len_sq_l = -1.0 |
| best_p1_roi_l, best_p2_roi_l = None, None |
| for i in range(len(pts_subsample_l)): |
| for j in range(i + 1, len(pts_subsample_l)): |
| d_sq = (pts_subsample_l[i][0] - pts_subsample_l[j][0]) ** 2 + (pts_subsample_l[i][1] - pts_subsample_l[j][1]) ** 2 |
| if d_sq > best_len_sq_l: |
| best_len_sq_l = d_sq |
| best_p1_roi_l = (int(pts_subsample_l[i][0]), int(pts_subsample_l[i][1])) |
| best_p2_roi_l = (int(pts_subsample_l[j][0]), int(pts_subsample_l[j][1])) |
| if best_p1_roi_l is not None and best_p2_roi_l is not None: |
| p1_l = (best_p1_roi_l[0] + x_min_l, best_p1_roi_l[1] + y_min_l) |
| p2_l = (best_p2_roi_l[0] + x_min_l, best_p2_roi_l[1] + y_min_l) |
| if p1_l is not None and p2_l is not None: |
| a_long_l, b_long_l, c_long_l = _line_through_two_points(float(p1_l[0]), float(p1_l[1]), float(p2_l[0]), float(p2_l[1])) |
| a2_l, b2_l, c2_l = _line_through_two_points(B1_l[0], B1_l[1], B2_l[0], B2_l[1]) |
| B3_l = _line_intersection(a_long_l, b_long_l, c_long_l, a2_l, b2_l, c2_l) |
| seg_border_l = _line_segment_for_drawing(a_long_l, b_long_l, c_long_l, w, h) |
| if seg_border_l is not None: |
| A3_l = seg_border_l[1] |
| if A3_l is not None and B3_l is not None: |
| A3x_l, A3y_l = A3_l[0], A3_l[1] |
| B3x_l, B3y_l = B3_l[0], B3_l[1] |
| A3x_roi_l = A3x_l - x_min_l |
| A3y_roi_l = A3y_l - y_min_l |
| B3x_roi_l = B3x_l - x_min_l |
| B3y_roi_l = B3y_l - y_min_l |
| if _STEP5_2_8PX_HALFRES and roi_w_l >= 4 and roi_h_l >= 4: |
| dilate_8px_l = cv2.resize(dilate_roi_l, (roi_w_l // 2, roi_h_l // 2), interpolation=cv2.INTER_NEAREST) |
| roi_w_8_l, roi_h_8_l = roi_w_l // 2, roi_h_l // 2 |
| scale_8_l, seg_width_8_l = 0.5, 4 |
| else: |
| dilate_8px_l = dilate_roi_l |
| roi_w_8_l, roi_h_8_l = roi_w_l, roi_h_l |
| scale_8_l, seg_width_8_l = 1.0, 8 |
| mask_8_roi_l = np.zeros((roi_h_8_l, roi_w_8_l), dtype=np.uint8) |
| overlap_roi_l = np.empty((roi_h_8_l, roi_w_8_l), dtype=np.uint8) |
| best_count_8_l = -1 |
| best_s_l, best_t_l = 0, 0 |
| for s in range(-30, 31, _STEP5_2_8PX_COARSE_STEP): |
| for t in range(-30, 31, _STEP5_2_8PX_COARSE_STEP): |
| A4x_roi_l = A3x_roi_l + s * ux_l |
| A4y_roi_l = A3y_roi_l + s * uy_l |
| B4x_roi_l = B3x_roi_l + t * ux_l |
| B4y_roi_l = B3y_roi_l + t * uy_l |
| ax_d_l = int(round(A4x_roi_l * scale_8_l)) |
| ay_d_l = int(round(A4y_roi_l * scale_8_l)) |
| bx_d_l = int(round(B4x_roi_l * scale_8_l)) |
| by_d_l = int(round(B4y_roi_l * scale_8_l)) |
| mask_8_roi_l.fill(0) |
| cv2.line(mask_8_roi_l, (ax_d_l, ay_d_l), (bx_d_l, by_d_l), 255, seg_width_8_l) |
| cv2.bitwise_and(dilate_8px_l, mask_8_roi_l, overlap_roi_l) |
| count = cv2.countNonZero(overlap_roi_l) |
| if count > best_count_8_l: |
| best_count_8_l = count |
| best_s_l, best_t_l = s, t |
| if _STEP5_2_8PX_REFINE_PASS: |
| s_lo_l = max(-30, best_s_l - _STEP5_2_8PX_REFINE_WINDOW) |
| s_hi_l = min(31, best_s_l + _STEP5_2_8PX_REFINE_WINDOW + 1) |
| t_lo_l = max(-30, best_t_l - _STEP5_2_8PX_REFINE_WINDOW) |
| t_hi_l = min(31, best_t_l + _STEP5_2_8PX_REFINE_WINDOW + 1) |
| for s in range(s_lo_l, s_hi_l, 5): |
| for t in range(t_lo_l, t_hi_l, 5): |
| A4x_roi_l = A3x_roi_l + s * ux_l |
| A4y_roi_l = A3y_roi_l + s * uy_l |
| B4x_roi_l = B3x_roi_l + t * ux_l |
| B4y_roi_l = B3y_roi_l + t * uy_l |
| ax_d_l = int(round(A4x_roi_l * scale_8_l)) |
| ay_d_l = int(round(A4y_roi_l * scale_8_l)) |
| bx_d_l = int(round(B4x_roi_l * scale_8_l)) |
| by_d_l = int(round(B4y_roi_l * scale_8_l)) |
| mask_8_roi_l.fill(0) |
| cv2.line(mask_8_roi_l, (ax_d_l, ay_d_l), (bx_d_l, by_d_l), 255, seg_width_8_l) |
| cv2.bitwise_and(dilate_8px_l, mask_8_roi_l, overlap_roi_l) |
| count = cv2.countNonZero(overlap_roi_l) |
| if count > best_count_8_l: |
| best_count_8_l = count |
| best_s_l, best_t_l = s, t |
| A4_l = (A3x_l + best_s_l * ux_l, A3y_l + best_s_l * uy_l) |
| B4_l = (B3x_l + best_t_l * ux_l, B3y_l + best_t_l * uy_l) |
| a_ab_l, b_ab_l, c_ab_l = _line_through_two_points(A4_l[0], A4_l[1], B4_l[0], B4_l[1]) |
| kkp5_l = _line_intersection(a_ab_l, b_ab_l, c_ab_l, a3_l, b3_l, c3_l) |
| center_pts_l = [(float(kps[i][0]), float(kps[i][1])) for i in [13, 14, 15, 16] if i < len(kps) and _is_kp_valid(kps[i])] |
| if len(center_pts_l) >= 2: |
| line_13_16_l = _fit_line_to_points(center_pts_l) |
| else: |
| src_13_16_l = np.array([[_F1[i] for i in [13, 14, 15, 16]]], dtype=np.float32) |
| tkp_13_16_l = cv2.perspectiveTransform(src_13_16_l, H_use_l)[0] |
| center_pts_l = [(float(tkp_13_16_l[i][0]), float(tkp_13_16_l[i][1])) for i in range(4)] |
| line_13_16_l = _fit_line_to_points(center_pts_l) |
| kkp16_l = _line_intersection(a_ab_l, b_ab_l, c_ab_l, line_13_16_l[0], line_13_16_l[1], line_13_16_l[2]) if line_13_16_l is not None else None |
| if kkp5_l is not None: |
| kps[5] = [float(kkp5_l[0]), float(kkp5_l[1])] |
| if kkp16_l is not None: |
| kps[16] = [float(kkp16_l[0]), float(kkp16_l[1])] |
| if kkp16_l is not None and kkp16_l[0] < w: |
| pts_24_29_l = [(float(kps[i][0]), float(kps[i][1])) for i in [24, 25, 26, 27, 28, 29] if i < len(kps) and _is_kp_valid(kps[i])] |
| if len(pts_24_29_l) >= 2: |
| line_24_29_l = _fit_line_to_points(pts_24_29_l) |
| else: |
| src_24_29_l = np.array([[_F1[i] for i in [24, 25, 26, 27, 28, 29]]], dtype=np.float32) |
| tkp_24_29_l = cv2.perspectiveTransform(src_24_29_l, H_use_l)[0] |
| pts_24_29_l = [(float(tkp_24_29_l[i][0]), float(tkp_24_29_l[i][1])) for i in range(6)] |
| line_24_29_l = _fit_line_to_points(pts_24_29_l) |
| kkp29_l = _line_intersection(a_ab_l, b_ab_l, c_ab_l, line_24_29_l[0], line_24_29_l[1], line_24_29_l[2]) if line_24_29_l is not None else None |
| if kkp29_l is not None: |
| kps[29] = [float(kkp29_l[0]), float(kkp29_l[1])] |
| return (True, "left") |
|
|
| if not kp16_valid_input: |
| return (False, None) |
| x16, y16 = tkp_16[0], tkp_16[1] |
| valid_indices_52 = [] |
| valid_src_52 = [] |
| valid_dst_52 = [] |
| for idx, kp in enumerate(kps): |
| if not _is_kp_valid(kp): |
| continue |
| x, y = float(kp[0]), float(kp[1]) |
| valid_indices_52.append(idx) |
| valid_src_52.append(_F1[idx] if idx < len(_F1) else (0.0, 0.0)) |
| valid_dst_52.append((x, y)) |
| warped = None |
| ground_mask = None |
| H_use = H |
| try: |
| warped = cv2.warpPerspective(template_image, H_use, (w, h)) |
| ground_mask, _ = _y5(warped) |
| except _Xe as e: |
| err_msg = e.args[0] if e.args else "" |
| if err_msg in _MASK_RETRY_ERRORS and len(valid_indices_52) >= 4 and len(valid_dst_52) >= 4: |
| idx_smallest_y = min(range(len(valid_dst_52)), key=lambda i: valid_dst_52[i][1]) |
| x0, y0 = valid_dst_52[idx_smallest_y] |
| for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]: |
| new_dst = list(valid_dst_52) |
| new_dst[idx_smallest_y] = (x0 + dx, y0 + dy) |
| H_retry = _y10(valid_indices_52, valid_src_52, new_dst, {}) |
| if H_retry is None: |
| continue |
| try: |
| warped = cv2.warpPerspective(template_image, H_retry, (w, h)) |
| ground_mask, _ = _y5(warped) |
| H_use = H_retry |
| break |
| except _Xe: |
| continue |
| else: |
| warped = None |
| ground_mask = None |
| if warped is None or ground_mask is None: |
| return (False, None) |
| except Exception: |
| return (False, None) |
| if warped is None or ground_mask is None: |
| return (False, None) |
| dilate_uint8 = _dilate_uint8_full_frame(video_frame) |
| seg_width = 8 |
| mask = np.zeros((h, w), dtype=np.uint8) |
| overlap_buf = np.empty((h, w), dtype=np.uint8) |
| best_count = -1 |
| best_ay, best_by = ay, by |
| step = 5 |
| for t in range(-100, 101, step): |
| ay_new = ay + t |
| if abs(bx - ax) < 1e-12: |
| by_new = ay_new |
| else: |
| by_new = ay_new + (y16 - ay_new) * (bx - ax) / (x16 - ax) if abs(x16 - ax) >= 1e-12 else ay_new |
| a_pt = (int(round(ax)), int(round(ay_new))) |
| b_pt = (int(round(bx)), int(round(by_new))) |
| mask.fill(0) |
| cv2.line(mask, a_pt, b_pt, 255, seg_width) |
| cv2.bitwise_and(dilate_uint8, mask, overlap_buf) |
| count = cv2.countNonZero(overlap_buf) |
| if count > best_count: |
| best_count = count |
| best_ay, best_by = ay_new, by_new |
| for shift in range(-20, 21, 5): |
| ay_shift = best_ay + shift |
| by_shift = best_by + shift |
| a_pt = (int(round(ax)), int(round(ay_shift))) |
| b_pt = (int(round(bx)), int(round(by_shift))) |
| mask.fill(0) |
| cv2.line(mask, a_pt, b_pt, 255, seg_width) |
| cv2.bitwise_and(dilate_uint8, mask, overlap_buf) |
| count = cv2.countNonZero(overlap_buf) |
| if count > best_count: |
| best_count = count |
| best_ay, best_by = ay_shift, by_shift |
| a_final = (ax, best_ay) |
| b_final = (bx, best_by) |
| center_pts = [] |
| for i in [13, 14, 15, 16]: |
| if i < len(kps) and _is_kp_valid(kps[i]): |
| center_pts.append((float(kps[i][0]), float(kps[i][1]))) |
| line_center = _fit_line_to_points(center_pts) if len(center_pts) >= 2 else None |
| a_ab, b_ab, c_ab = _line_through_two_points(a_final[0], a_final[1], b_final[0], b_final[1]) |
| if line_center is not None: |
| a_c, b_c, c_c = line_center |
| inter = _line_intersection(a_c, b_c, c_c, a_ab, b_ab, c_ab) |
| if inter is not None: |
| x16, y16 = inter[0], inter[1] |
| d5 = math.hypot(tkp_5[0] - x16, tkp_5[1] - y16) |
| d29 = math.hypot(tkp_29[0] - x16, tkp_29[1] - y16) |
| dx_ab = b_final[0] - a_final[0] |
| dy_ab = b_final[1] - a_final[1] |
| len_ab = math.hypot(dx_ab, dy_ab) |
| if len_ab < 1e-12: |
| kkp5 = (x16, y16) |
| kkp29 = (x16, y16) |
| else: |
| ux = dx_ab / len_ab |
| uy = dy_ab / len_ab |
| kkp5_plus = (x16 + d5 * ux, y16 + d5 * uy) |
| kkp5_minus = (x16 - d5 * ux, y16 - d5 * uy) |
| dist_plus_to_a = math.hypot(kkp5_plus[0] - a_final[0], kkp5_plus[1] - a_final[1]) |
| dist_minus_to_a = math.hypot(kkp5_minus[0] - a_final[0], kkp5_minus[1] - a_final[1]) |
| kkp5 = kkp5_minus if dist_minus_to_a < dist_plus_to_a else kkp5_plus |
| kkp29_plus = (x16 + d29 * ux, y16 + d29 * uy) |
| kkp29_minus = (x16 - d29 * ux, y16 - d29 * uy) |
| dist_plus_to_b = math.hypot(kkp29_plus[0] - b_final[0], kkp29_plus[1] - b_final[1]) |
| dist_minus_to_b = math.hypot(kkp29_minus[0] - b_final[0], kkp29_minus[1] - b_final[1]) |
| kkp29 = kkp29_minus if dist_minus_to_b < dist_plus_to_b else kkp29_plus |
| kps[5] = [kkp5[0], kkp5[1]] |
| kps[29] = [kkp29[0], kkp29[1]] |
| kps[16] = [x16, y16] |
| return (True, None) |
|
|
|
|
def _refine_kp4_kp12(
    kps: list[list[float]],
    H: ndarray,
    video_frame: ndarray,
    template_image: ndarray,
) -> bool:
    """Recover keypoint 4 from keypoint 12 and snap both onto a detected line.

    Applies only when kp12 was detected and kp4 was not: kp4 is first seeded
    by projecting its template position through ``H``, then the vertical
    positions of both endpoints are grid-searched (±50 px, step 5) so that
    the 5-px-wide segment between them maximises overlap with the detected
    field-line mask.  The final kp4/kp12 are the intersections of that
    segment's line with the lines fitted through keypoints 0-4 and 9-12.

    Mutates ``kps`` in place; returns True on success, False otherwise.
    """
    if len(kps) <= 12:
        return False
    # Only meaningful when kp12 exists but kp4 is missing.
    if not _is_kp_valid(kps[12]) or _is_kp_valid(kps[4]):
        return False
    frame_h, frame_w = video_frame.shape[:2]

    # Homography-projected template position seeds the kp4 search.
    template_pt = np.array([_F1[4]], dtype=np.float32).reshape(1, 1, 2)
    seed4 = cv2.perspectiveTransform(template_pt, H)[0, 0]
    x4, y4_seed = float(seed4[0]), float(seed4[1])
    x12, y12_seed = float(kps[12][0]), float(kps[12][1])

    try:
        warped = cv2.warpPerspective(template_image, H, (frame_w, frame_h))
        ground_mask, _ = _y5(warped)
    except _Xe:
        return False
    # Binary field-line mask scaled to 0/255 for cv2 overlap counting.
    line_mask = (_y6(video_frame, ground_mask).astype(np.uint8)) * 255

    lo4 = max(0, int(y4_seed) - 50)
    hi4 = min(frame_h - 1, int(y4_seed) + 50)
    lo12 = max(0, int(y12_seed) - 50)
    hi12 = min(frame_h - 1, int(y12_seed) + 50)

    draw_buf = np.zeros((frame_h, frame_w), dtype=np.uint8)
    and_buf = np.empty((frame_h, frame_w), dtype=np.uint8)
    ix4, ix12 = int(round(x4)), int(round(x12))
    best_score, best4, best12 = -1, int(y4_seed), int(y12_seed)
    # Coarse joint search over both endpoint heights; first best wins on ties.
    for c4 in range(lo4, hi4 + 1, 5):
        for c12 in range(lo12, hi12 + 1, 5):
            draw_buf.fill(0)
            cv2.line(draw_buf, (ix4, c4), (ix12, c12), 255, 5)
            cv2.bitwise_and(line_mask, draw_buf, and_buf)
            hits = cv2.countNonZero(and_buf)
            if hits > best_score:
                best_score, best4, best12 = hits, c4, c12

    ext = _line_through_two_points(x4, float(best4), x12, float(best12))

    left_pts = [
        (float(kps[i][0]), float(kps[i][1]))
        for i in (0, 1, 2, 3, 4)
        if i < len(kps) and _is_kp_valid(kps[i])
    ]
    if len(left_pts) < 2:
        return False
    left_line = _fit_line_to_points(left_pts)
    if left_line is None:
        return False

    mid_pts = [
        (float(kps[i][0]), float(kps[i][1]))
        for i in (9, 10, 11, 12)
        if i < len(kps) and _is_kp_valid(kps[i])
    ]
    if len(mid_pts) < 2:
        return False
    mid_line = _fit_line_to_points(mid_pts)
    if mid_line is None:
        return False

    new4 = _line_intersection(*left_line, *ext)
    new12 = _line_intersection(*mid_line, *ext)
    if new4 is None or new12 is None:
        return False
    kps[4] = [new4[0], new4[1]]
    kps[12] = [new12[0], new12[1]]
    return True
|
|
|
|
def _refine_kp20_kp28(
    kps: list[list[float]],
    H: ndarray,
    video_frame: ndarray,
    template_image: ndarray,
) -> bool:
    """Recover keypoint 28 from keypoint 20 and snap both onto a detected line.

    Mirror of ``_refine_kp4_kp12`` for the opposite side: requires kp20 valid
    and kp28 invalid.  kp28 is seeded by projecting its template position
    through ``H``; both endpoint heights are then grid-searched (±50 px,
    step 5) to maximise overlap between the 5-px candidate segment and the
    detected field-line mask, and the final kp28/kp20 are the intersections
    of that segment's line with the lines fitted through keypoints 24-28
    and 17-20.

    Mutates ``kps`` in place; returns True on success, False otherwise.
    """
    if len(kps) <= 28:
        return False
    # Only meaningful when kp20 exists but kp28 is missing.
    if not _is_kp_valid(kps[20]) or _is_kp_valid(kps[28]):
        return False
    frame_h, frame_w = video_frame.shape[:2]

    # Homography-projected template position seeds the kp28 search.
    template_pt = np.array([_F1[28]], dtype=np.float32).reshape(1, 1, 2)
    seed28 = cv2.perspectiveTransform(template_pt, H)[0, 0]
    x28, y28_seed = float(seed28[0]), float(seed28[1])
    x20, y20_seed = float(kps[20][0]), float(kps[20][1])

    try:
        warped = cv2.warpPerspective(template_image, H, (frame_w, frame_h))
        ground_mask, _ = _y5(warped)
    except _Xe:
        return False
    # Binary field-line mask scaled to 0/255 for cv2 overlap counting.
    line_mask = (_y6(video_frame, ground_mask).astype(np.uint8)) * 255

    lo28 = max(0, int(y28_seed) - 50)
    hi28 = min(frame_h - 1, int(y28_seed) + 50)
    lo20 = max(0, int(y20_seed) - 50)
    hi20 = min(frame_h - 1, int(y20_seed) + 50)

    draw_buf = np.zeros((frame_h, frame_w), dtype=np.uint8)
    and_buf = np.empty((frame_h, frame_w), dtype=np.uint8)
    ix28, ix20 = int(round(x28)), int(round(x20))
    best_score, best28, best20 = -1, int(y28_seed), int(y20_seed)
    # Coarse joint search over both endpoint heights; first best wins on ties.
    for c28 in range(lo28, hi28 + 1, 5):
        for c20 in range(lo20, hi20 + 1, 5):
            draw_buf.fill(0)
            cv2.line(draw_buf, (ix28, c28), (ix20, c20), 255, 5)
            cv2.bitwise_and(line_mask, draw_buf, and_buf)
            hits = cv2.countNonZero(and_buf)
            if hits > best_score:
                best_score, best28, best20 = hits, c28, c20

    ext = _line_through_two_points(x28, float(best28), x20, float(best20))

    right_pts = [
        (float(kps[i][0]), float(kps[i][1]))
        for i in (24, 25, 26, 27, 28)
        if i < len(kps) and _is_kp_valid(kps[i])
    ]
    if len(right_pts) < 2:
        return False
    right_line = _fit_line_to_points(right_pts)
    if right_line is None:
        return False

    upper_pts = [
        (float(kps[i][0]), float(kps[i][1]))
        for i in (17, 18, 19, 20)
        if i < len(kps) and _is_kp_valid(kps[i])
    ]
    if len(upper_pts) < 2:
        return False
    upper_line = _fit_line_to_points(upper_pts)
    if upper_line is None:
        return False

    new28 = _line_intersection(*right_line, *ext)
    new20 = _line_intersection(*upper_line, *ext)
    if new28 is None or new20 is None:
        return False
    kps[28] = [new28[0], new28[1]]
    kps[20] = [new20[0], new20[1]]
    return True
|
|
|
|
| def _z0( |
| kps: list[Any], |
| video_frame: ndarray, |
| template_image: ndarray, |
| ) -> list[list[float]] | None: |
| if not isinstance(kps, list) or len(kps) != _N0: |
| return None |
| h, w = video_frame.shape[:2] |
| frame_width, frame_height = w, h |
|
|
    def _collect_valid(
        kps_list: list[Any],
        step52_decision: str | None,
    ) -> tuple[list[int], list[tuple[float, float]], list[tuple[float, float]]]:
        """Gather usable keypoints as (indices, template-space pts, image-space pts).

        A keypoint is dropped when it is malformed, exactly (0, 0) — the
        "missing" sentinel — or, unless its index is in
        _ALWAYS_INCLUDE_INDICES, outside the frame bounds.  When
        *step52_decision* is "left"/"right" and kp16's x landed off-frame on
        that side, the far corner (29 resp. 5) is additionally dropped.
        """
        vi: list[int] = []  # indices of accepted keypoints
        vs: list[tuple[float, float]] = []  # template (source) coordinates from _F1
        vd: list[tuple[float, float]] = []  # image (destination) coordinates
        # x of kp16 if parseable; only used by the step-5.2 side filters below.
        kp16_x: float | None = None
        if len(kps_list) > 16 and isinstance(kps_list[16], (list, tuple)) and len(kps_list[16]) >= 1:
            try:
                kp16_x = float(kps_list[16][0])
            except (TypeError, ValueError):
                pass
        for idx, kp in enumerate(kps_list):
            # Skip entries that are not (x, y)-like or not numeric.
            if not isinstance(kp, (list, tuple)) or len(kp) < 2:
                continue
            try:
                x, y = float(kp[0]), float(kp[1])
            except (TypeError, ValueError):
                continue
            if x == 0.0 and y == 0.0:
                continue  # (0, 0) marks an undetected keypoint
            if idx not in _ALWAYS_INCLUDE_INDICES:
                # Ordinary keypoints must lie inside the frame.
                if x < 0 or x > frame_width or y < 0 or y > frame_height:
                    continue
            # kp5/kp29 keep only a one-sided bound (they may extend off the
            # near edge); for non-always-include indices these are redundant
            # with the full bounds check above.
            if idx == 5 and x > frame_width:
                continue
            if idx == 29 and x < 0:
                continue
            # After a "left"/"right" step-5.2 refinement, drop the opposite
            # corner when kp16 itself overshot the frame on that side.
            if step52_decision == "left" and kp16_x is not None and kp16_x > frame_width and idx == 29:
                continue
            if step52_decision == "right" and kp16_x is not None and kp16_x < 0 and idx == 5:
                continue
            vi.append(idx)
            if idx < len(_F1):
                vs.append(_F1[idx])
            # NOTE(review): when idx >= len(_F1), vi/vd gain an entry but vs
            # does not, desynchronizing the three lists — elsewhere in this
            # file a (0.0, 0.0) placeholder keeps them aligned.  Presumably
            # len(kps_list) never exceeds len(_F1) here; confirm.
            vd.append((x, y))
        return (vi, vs, vd)
|
|
| valid_indices, valid_src, valid_dst = _collect_valid(kps, None) |
| if len(valid_src) < 4: |
| return None |
|
|
| H0 = _y10(valid_indices, valid_src, valid_dst, {}) |
| if H0 is not None: |
| score0, H0_used, dst_retry = _y11( |
| H0, template_image, video_frame, |
| valid_indices, valid_src, valid_dst, {}, |
| ) |
| if dst_retry is not None and H0_used is not None: |
| for i, idx in enumerate(valid_indices): |
| if idx < len(kps): |
| kps[idx] = [float(dst_retry[i][0]), float(dst_retry[i][1])] |
| valid_indices, valid_src, valid_dst = _collect_valid(kps, None) |
| H0 = H0_used |
| else: |
| score0 = 0.0 |
|
|
| refined = False |
| step52_decision: str | None = None |
| if H0 is not None: |
| refined = _refine_kp4_kp12(kps, H0, video_frame, template_image) or refined |
| refined = _refine_kp20_kp28(kps, H0, video_frame, template_image) or refined |
| dilate_uint8 = _dilate_uint8_full_frame(video_frame) |
| warp_52: ndarray | None = None |
| ground_mask_52: ndarray | None = None |
| try: |
| warp_52 = cv2.warpPerspective(template_image, H0, (frame_width, frame_height)) |
| ground_mask_52, _ = _y5(warp_52) |
| except _Xe: |
| pass |
| step52_refined, step52_decision = _refine_kp5_kp16_kp29( |
| kps, H0, video_frame, template_image, |
| precomputed_dilate_uint8=dilate_uint8, |
| precomputed_warped=warp_52, |
| precomputed_ground_mask=ground_mask_52, |
| ) |
| refined = refined or step52_refined |
|
|
| if refined: |
| valid_indices, valid_src, valid_dst = _collect_valid(kps, step52_decision) |
| if len(valid_src) < 4: |
| valid_indices, valid_src, valid_dst = _collect_valid(kps, None) |
|
|
| if len(valid_src) < 4: |
| if H0 is not None: |
| src_all = np.array(_F1, dtype=np.float32).reshape(1, -1, 2) |
| projected = cv2.perspectiveTransform(src_all, H0)[0] |
| return [[float(projected[i][0]), float(projected[i][1])] for i in range(_N0)] |
| return None |
|
|
| w1, w2, w3 = _y7(), _y8(), _y9() |
| H1 = _y10(valid_indices, valid_src, valid_dst, w1) |
| H2 = _y10(valid_indices, valid_src, valid_dst, w2) |
| valid_set = set(valid_indices) |
| if valid_set.isdisjoint(_INDICES_H3_VS_H1): |
| H3 = H1 |
| elif valid_set.isdisjoint(_INDICES_H3_VS_H2): |
| H3 = H2 |
| else: |
| H3 = _y10(valid_indices, valid_src, valid_dst, w3) |
| score1 = _y11(H1, template_image, video_frame)[0] if H1 is not None else 0.0 |
| score2 = _y11(H2, template_image, video_frame)[0] if H2 is not None else 0.0 |
| score3 = _y11(H3, template_image, video_frame)[0] if H3 is not None else 0.0 |
| best_H = H0 |
| best_score = score0 |
| if H1 is not None and score1 > best_score: |
| best_H, best_score = H1, score1 |
| if H2 is not None and score2 > best_score: |
| best_H, best_score = H2, score2 |
| if H3 is not None and score3 > best_score: |
| best_H = H3 |
| if best_H is None: |
| return None |
| src_all = np.array(_F1, dtype=np.float32).reshape(1, -1, 2) |
| projected = cv2.perspectiveTransform(src_all, best_H)[0] |
| return [[float(projected[i][0]), float(projected[i][1])] for i in range(_N0)] |
|
|
|
|
def _z1(
    kps: list[Any],
    frame_width: int,
    frame_height: int,
    fill_missing: bool,
) -> list[list[float]] | None:
    """Fit a homography from the valid keypoints in *kps* and return the
    re-projected keypoint list, or ``None`` when no homography can be fit.

    A keypoint is "valid" when it is a 2-element list/tuple of numbers that
    is not the (0, 0) missing-point sentinel and has a template counterpart
    in ``_F1``. Projected points are kept only when they land inside the
    frame; points at originally-invalid indices are kept only when
    *fill_missing* is set.
    """
    if not isinstance(kps, list) or len(kps) != _N0:
        return None
    if frame_width <= 0 or frame_height <= 0:
        return None

    src_pts: list[tuple[float, float]] = []
    dst_pts: list[tuple[float, float]] = []
    kept_idx: list[int] = []
    for idx, kp in enumerate(kps):
        if idx >= len(_F1):
            continue
        if not isinstance(kp, (list, tuple)) or len(kp) < 2:
            continue
        try:
            px, py = float(kp[0]), float(kp[1])
        except (TypeError, ValueError):
            continue
        if px == 0.0 and py == 0.0:
            # (0, 0) is the "missing keypoint" sentinel.
            continue
        src_pts.append(_F1[idx])
        dst_pts.append((px, py))
        kept_idx.append(idx)

    if len(src_pts) < 4:
        # findHomography needs at least four correspondences.
        return None
    H, _mask = cv2.findHomography(
        np.array(src_pts, dtype=np.float32),
        np.array(dst_pts, dtype=np.float32),
    )
    if H is None:
        return None

    template_pts = np.array(_F0, dtype=np.float32).reshape(1, -1, 2)
    projected = cv2.perspectiveTransform(template_pts, H)[0]
    kept = set(kept_idx)
    result: list[list[float]] = [[0.0, 0.0] for _ in range(_N0)]
    for idx in range(_N0):
        px, py = float(projected[idx][0]), float(projected[idx][1])
        if 0 <= px < frame_width and 0 <= py < frame_height and (fill_missing or idx in kept):
            result[idx] = [px, py]
    return result
|
|
|
|
def _z2(
    keypoints: list[list[float]],
    video_frame: ndarray,
    template_image: ndarray,
) -> float:
    """Convenience wrapper around :func:`_z2_score_and_kps` that discards the
    refined keypoints and returns only the alignment score."""
    return _z2_score_and_kps(keypoints, video_frame, template_image)[0]
|
|
|
|
def _z2_score_and_kps(
    keypoints: list[list[float]],
    video_frame: ndarray,
    template_image: ndarray,
) -> tuple[float, list[list[float]] | None]:
    """Score *keypoints* against the template and optionally return a refined copy.

    Returns ``(score, refined_keypoints)``; the second element is ``None``
    whenever no refinement happened (invalid input, fewer than four valid
    points, no homography, or the scorer produced no retry destinations).
    """
    if not isinstance(keypoints, list) or len(keypoints) != _N0:
        return (0.0, None)

    idxs: list[int] = []
    src: list[tuple[float, float]] = []
    dst: list[tuple[float, float]] = []
    for i, kp in enumerate(keypoints):
        if i >= len(_F1) or not isinstance(kp, (list, tuple)) or len(kp) < 2:
            continue
        try:
            px, py = float(kp[0]), float(kp[1])
        except (TypeError, ValueError):
            continue
        if px == 0.0 and py == 0.0:
            continue
        idxs.append(i)
        src.append(_F1[i])
        dst.append((px, py))

    if len(src) < 4:
        return (0.0, None)
    H = _y10(idxs, src, dst, {})
    if H is None:
        return (0.0, None)
    score, H_used, new_dst = _y11(
        H, template_image, video_frame,
        idxs, src, dst, {},
    )
    if new_dst is None or H_used is None:
        return (score, None)
    # Copy the input and splice the refined coordinates back in.
    refined = [list(kp) if isinstance(kp, (list, tuple)) else [0.0, 0.0] for kp in keypoints]
    if len(refined) != _N0:
        refined = (refined + [[0.0, 0.0]] * _N0)[:_N0]
    for pos, i in enumerate(idxs):
        if i < len(refined) and pos < len(new_dst):
            refined[i] = [float(new_dst[pos][0]), float(new_dst[pos][1])]
    return (score, refined)
|
|
|
|
| def _z3(kps: list[Any]) -> dict[int, tuple[float, float]]: |
| out: dict[int, tuple[float, float]] = {} |
| for idx, kp in enumerate(kps): |
| if not isinstance(kp, (list, tuple)) or len(kp) < 2: |
| continue |
| try: |
| x, y = float(kp[0]), float(kp[1]) |
| except (TypeError, ValueError): |
| continue |
| if x != 0.0 or y != 0.0: |
| out[idx] = (x, y) |
| return out |
|
|
|
|
| def _z4( |
| a: dict[int, tuple[float, float]], |
| b: dict[int, tuple[float, float]], |
| threshold: float, |
| ) -> int: |
| count = 0 |
| for idx, (ax, ay) in a.items(): |
| if idx not in b: |
| continue |
| bx, by = b[idx] |
| if ((ax - bx) ** 2 + (ay - by) ** 2) ** 0.5 <= threshold: |
| count += 1 |
| return count |
|
|
|
|
| def _z5(a: list[Any], b: list[Any]) -> list[int]: |
| out: list[int] = [] |
| for i in range(min(len(a), len(b))): |
| ka, kb = a[i], b[i] |
| if not (isinstance(ka, (list, tuple)) and len(ka) >= 2): |
| continue |
| if not (isinstance(kb, (list, tuple)) and len(kb) >= 2): |
| continue |
| if float(ka[0]) == 0.0 and float(ka[1]) == 0.0: |
| continue |
| if float(kb[0]) == 0.0 and float(kb[1]) == 0.0: |
| continue |
| out.append(i) |
| return out |
|
|
|
|
| def _z6( |
| a: list[Any], |
| b: list[Any], |
| frame_width: int, |
| frame_height: int, |
| ) -> list[int]: |
| out: list[int] = [] |
| for i in range(min(len(a), len(b))): |
| ka, kb = a[i], b[i] |
| if not (isinstance(ka, (list, tuple)) and len(ka) >= 2): |
| continue |
| if not (isinstance(kb, (list, tuple)) and len(kb) >= 2): |
| continue |
| xa, ya = float(ka[0]), float(ka[1]) |
| xb, yb = float(kb[0]), float(kb[1]) |
| if xa == 0.0 and ya == 0.0: |
| continue |
| if xb == 0.0 and yb == 0.0: |
| continue |
| if not (0 <= xa < frame_width and 0 <= ya < frame_height): |
| continue |
| if not (0 <= xb < frame_width and 0 <= yb < frame_height): |
| continue |
| out.append(i) |
| return out |
|
|
|
|
def _z7(
    batch_frame_ids: list[int],
    keypoints_by_frame: dict[int, list[list[float]]],
) -> list[list[int]]:
    """Split the batch's frame ids into segments of temporally-consistent keypoints.

    Consecutive frames stay in one segment while at least ``_J6`` keypoints
    moved no more than ``_J5`` pixels between them; frames without any valid
    keypoint are dropped entirely.
    """
    usable: list[tuple[int, list[list[float]]]] = sorted(
        (
            (fid, kps)
            for fid in batch_frame_ids
            if (kps := keypoints_by_frame.get(fid)) and _z3(kps)
        ),
        key=lambda item: item[0],
    )
    if not usable:
        return []
    segments: list[list[int]] = []
    segment = [usable[0][0]]
    prev_valid = _z3(usable[0][1])
    for fid, kps in usable[1:]:
        cur_valid = _z3(kps)
        if _z4(prev_valid, cur_valid, _J5) >= _J6:
            segment.append(fid)
        else:
            segments.append(segment)
            segment = [fid]
        prev_valid = cur_valid
    segments.append(segment)
    return segments
|
|
|
|
def _z8(
    keypoints_by_frame: dict[int, list[list[float]]],
    images: list[ndarray],
    offset: int,
    template_image: ndarray,
) -> int:
    """Repair low-scoring keypoint frames by interpolating between good neighbors.

    For each score threshold in ``_J2``, frames scoring below the threshold
    are rewritten by linearly interpolating keypoints between the nearest
    higher-scoring frames on either side (same temporal segment only, gap at
    most ``_J3``), but only when the interpolated keypoints actually score
    better than the originals. Mutates *keypoints_by_frame* in place and
    returns the number of frames updated.
    """
    # Feature gate plus minimum batch size guard.
    if not _J1 or not images or len(images) < _Z8_MIN_BATCH_FRAMES:
        return 0
    batch_frame_ids = [offset + i for i in range(len(images))]
    # Score every frame's current keypoints against the template.
    score_map: dict[int, float] = {}
    for i, fid in enumerate(batch_frame_ids):
        kps = keypoints_by_frame.get(fid)
        if not kps or len(kps) != _N0:
            score_map[fid] = 0.0
            continue
        score_map[fid] = _z2(kps, images[i], template_image)
    sorted_ids = sorted(score_map.keys())
    if not sorted_ids:
        return 0
    # Segment the batch into runs of temporally-consistent keypoints so we
    # never interpolate across an inconsistency (e.g. a large camera jump).
    segments = _z7(batch_frame_ids, keypoints_by_frame)
    frame_to_seg: dict[int, int] = {}
    for seg_idx, seg in enumerate(segments):
        for fid in seg:
            frame_to_seg[fid] = seg_idx
    frame_width = images[0].shape[1] if images else 0
    frame_height = images[0].shape[0] if images else 0
    total_updated = 0
    for threshold in _J2:
        problematic = [fid for fid in sorted_ids if score_map[fid] < threshold]
        if not problematic:
            continue
        problematic = problematic[:_Z8_MAX_PROBLEMATIC_PER_BATCH]
        # (backward_id, forward_id) -> (bwd kps, fwd kps, shared keypoint indices)
        segments_seen: dict[tuple[int, int], tuple[list[Any], list[Any], set[int]]] = {}
        for problem_id in problematic:
            # Nearest frame at/above threshold before the problem frame...
            backward_id: int | None = None
            for fid in reversed(sorted_ids):
                if fid < problem_id and score_map[fid] >= threshold:
                    backward_id = fid
                    break
            # ...and after it.
            forward_id: int | None = None
            for fid in sorted_ids:
                if fid > problem_id and score_map[fid] >= threshold:
                    forward_id = fid
                    break
            if backward_id is None or forward_id is None:
                continue
            # Both anchors must share a temporal segment and be close enough.
            if frame_to_seg.get(backward_id) != frame_to_seg.get(forward_id):
                continue
            if forward_id - backward_id > _J3:
                continue
            bwd_kps = keypoints_by_frame.get(backward_id) or []
            fwd_kps = keypoints_by_frame.get(forward_id) or []
            if frame_width > 0 and frame_height > 0:
                common_set = set(_z6(bwd_kps, fwd_kps, frame_width, frame_height))
            else:
                common_set = set(_z5(bwd_kps, fwd_kps))
            if len(common_set) < 4:
                continue
            key = (backward_id, forward_id)
            if key not in segments_seen:
                segments_seen[key] = (bwd_kps, fwd_kps, common_set)
        already_rewritten: set[int] = set()
        for (backward_id, forward_id), (bwd_kps, fwd_kps, common_set) in segments_seen.items():
            gap = forward_id - backward_id
            if gap <= 0:
                continue
            for interp_id in range(backward_id + 1, forward_id):
                if interp_id not in batch_frame_ids or interp_id in already_rewritten:
                    continue
                local_idx = interp_id - offset
                if local_idx < 0 or local_idx >= len(images):
                    continue
                video_frame = images[local_idx]
                # Linear interpolation weight for this in-between frame.
                weight = (interp_id - backward_id) / gap
                max_len = max(len(bwd_kps), len(fwd_kps), _N0)
                new_kps: list[list[float]] = []
                for i in range(max_len):
                    if i in common_set and i < len(bwd_kps) and i < len(fwd_kps):
                        bx = float(bwd_kps[i][0])
                        by = float(bwd_kps[i][1])
                        fx = float(fwd_kps[i][0])
                        fy = float(fwd_kps[i][1])
                        new_kps.append([bx + (fx - bx) * weight, by + (fy - by) * weight])
                    else:
                        new_kps.append([0.0, 0.0])
                # Pad/trim to the canonical keypoint count.
                if len(new_kps) < _N0:
                    new_kps.extend([[0.0, 0.0]] * (_N0 - len(new_kps)))
                else:
                    new_kps = new_kps[:_N0]
                before_score = score_map.get(interp_id, 0.0)
                new_score, kps_to_apply = _z2_score_and_kps(new_kps, video_frame, template_image)
                # Only accept the interpolation if it scores strictly better.
                if new_score <= before_score:
                    continue
                keypoints_by_frame[interp_id] = kps_to_apply if kps_to_apply is not None else new_kps
                score_map[interp_id] = new_score
                already_rewritten.add(interp_id)
                total_updated += 1
    return total_updated
|
|
|
|
class _Bx(BaseModel):
    """Detection bounding box: (x1, y1) top-left, (x2, y2) bottom-right, in pixels."""

    # Pixel corners of the box.
    x1: int
    y1: int
    x2: int
    y2: int
    # Detector class id (compared against the _C* class-id constants elsewhere in this file).
    cls_id: int
    # Detection confidence score.
    conf: float
    # Assigned team label, if any — presumably "0"/"1"; TODO confirm against callers.
    team_id: str | None = None
|
|
|
|
class _FRes(BaseModel):
    """Per-frame detection result: boxes as plain dicts plus pitch keypoints."""

    frame_id: int
    # Each dict carries the _Bx fields: x1/y1/x2/y2/cls_id/conf/team_id.
    boxes: List[Dict[str, Any]]
    # Full keypoint list; [0.0, 0.0] marks a missing keypoint.
    keypoints: List[List[float]]
|
|
|
|
# Resolve the pydantic model's forward references once at import time.
_FRes.model_rebuild()
|
|
|
|
| class _Cfg: |
| def __init__(self, min_area: int = 1300, overlap_iou: float = 0.91): |
| self.overlap_iou = overlap_iou |
|
|
|
|
| def _d1(bb: _Bx, cy: float) -> float: |
| my = 0.5 * (float(bb.y1) + float(bb.y2)) |
| return (my - cy) ** 2 |
|
|
|
|
| def _i1(a: _Bx, b: _Bx) -> float: |
| ax1, ay1, ax2, ay2 = int(a.x1), int(a.y1), int(a.x2), int(a.y2) |
| bx1, by1, bx2, by2 = int(b.x1), int(b.y1), int(b.x2), int(b.y2) |
| ix1, iy1 = max(ax1, bx1), max(ay1, by1) |
| ix2, iy2 = min(ax2, bx2), min(ay2, by2) |
| iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1) |
| inter = iw * ih |
| if inter <= 0: |
| return 0.0 |
| area_a = (ax2 - ax1) * (ay2 - ay1) |
| area_b = (bx2 - bx1) * (by2 - by1) |
| union = area_a + area_b - inter |
| return inter / union if union > 0 else 0.0 |
|
|
|
|
| def _iou_box4(a: tuple[float, float, float, float], b: tuple[float, float, float, float]) -> float: |
| ax1, ay1, ax2, ay2 = a |
| bx1, by1, bx2, by2 = b |
| ix1, iy1 = max(ax1, bx1), max(ay1, by1) |
| ix2, iy2 = min(ax2, bx2), min(ay2, by2) |
| iw, ih = max(0.0, ix2 - ix1), max(0.0, iy2 - iy1) |
| inter = iw * ih |
| if inter <= 0: |
| return 0.0 |
| area_a = (ax2 - ax1) * (ay2 - ay1) |
| area_b = (bx2 - bx1) * (by2 - by1) |
| union = area_a + area_b - inter |
| return inter / union if union > 0 else 0.0 |
|
|
|
|
def _match_tracks_detections(
    prev_list: list[tuple[int, tuple[float, float, float, float]]],
    curr_boxes: list[tuple[float, float, float, float]],
    iou_thresh: float,
    exclude_prev: set[int],
    exclude_curr: set[int],
) -> list[tuple[int, int]]:
    """Match previous track boxes to current detections by IoU.

    Returns (prev_index, curr_index) pairs (indices into the original lists),
    using Hungarian assignment when scipy is available and a greedy
    highest-IoU-first fallback otherwise. Pairs below *iou_thresh* are never
    matched; indices listed in the exclude sets are skipped entirely.
    """
    rows = [(pi, pbox) for pi, (_tid, pbox) in enumerate(prev_list) if pi not in exclude_prev]
    cols = [(ci, cbox) for ci, cbox in enumerate(curr_boxes) if ci not in exclude_curr]
    if not rows or not cols:
        return []
    iou = np.zeros((len(rows), len(cols)), dtype=np.float64)
    for r, (_, pbox) in enumerate(rows):
        for c, (_, cbox) in enumerate(cols):
            iou[r, c] = _iou_box4(pbox, cbox)
    if _linear_sum_assignment is not None:
        cost = 1.0 - iou
        cost[iou < iou_thresh] = 1e9  # forbid below-threshold pairs
        r_idx, c_idx = _linear_sum_assignment(cost)
        return [
            (rows[r][0], cols[c][0])
            for r, c in zip(r_idx, c_idx)
            if cost[r, c] < 1.0
        ]
    # Greedy fallback: take candidate pairs in decreasing IoU order.
    candidates = sorted(
        (
            (iou[r, c], r, c)
            for r in range(len(rows))
            for c in range(len(cols))
            if iou[r, c] >= iou_thresh
        ),
        key=lambda t: -t[0],
    )
    matches: list[tuple[int, int]] = []
    taken_prev: set[int] = set()
    taken_curr: set[int] = set()
    for _, r, c in candidates:
        pi, ci = rows[r][0], cols[c][0]
        if pi in taken_prev or ci in taken_curr:
            continue
        matches.append((pi, ci))
        taken_prev.add(pi)
        taken_curr.add(ci)
    return matches
|
|
|
|
| def _predict_box(prev: tuple[float, float, float, float], last: tuple[float, float, float, float]) -> tuple[float, float, float, float]: |
| px1, py1, px2, py2 = prev |
| lx1, ly1, lx2, ly2 = last |
| pcx = 0.5 * (px1 + px2) |
| pcy = 0.5 * (py1 + py2) |
| lcx = 0.5 * (lx1 + lx2) |
| lcy = 0.5 * (ly1 + ly2) |
| w = lx2 - lx1 |
| h = ly2 - ly1 |
| ncx = 2.0 * lcx - pcx |
| ncy = 2.0 * lcy - pcy |
| return (ncx - w * 0.5, ncy - h * 0.5, ncx + w * 0.5, ncy + h * 0.5) |
|
|
|
|
def _assign_person_track_ids(
    prev_state: dict[int, tuple[tuple[float, float, float, float], tuple[float, float, float, float], int]],
    next_id: int,
    results: list,
    iou_thresh: float = _TRACK_IOU_THRESH,
    iou_high: float = _TRACK_IOU_HIGH,
    iou_low: float = _TRACK_IOU_LOW,
    max_age: int = _TRACK_MAX_AGE,
    use_velocity: bool = _TRACK_USE_VELOCITY,
) -> tuple[dict[int, tuple[tuple[float, float, float, float], tuple[float, float, float, float], int]], int, list[list[int]]]:
    """Two-stage IoU tracker: assign persistent track ids to detections.

    *prev_state* maps track id -> (prev_box, last_box, age). For each result
    a high-IoU matching pass runs first, then a low-IoU pass over the
    leftovers; unmatched detections get fresh ids, and unmatched tracks age
    by one per result until they exceed *max_age*. Returns
    (new_state, next_free_id, track ids per result, one per detection).

    NOTE(review): the ``iou_thresh`` parameter is currently unused — only
    ``iou_high`` and ``iou_low`` drive matching; confirm whether it can be
    dropped or should feed one of the stages.
    """
    state = {tid: (prev_box, last_box, age) for tid, (prev_box, last_box, age) in prev_state.items()}
    nid = next_id
    ids_per_result: list[list[int]] = []
    for result in results:
        if getattr(result, "boxes", None) is None or len(result.boxes) == 0:
            # No detections this frame: age every track, dropping stale ones.
            state = {
                tid: (prev_box, last_box, age + 1)
                for tid, (prev_box, last_box, age) in state.items()
                if age + 1 <= max_age
            }
            ids_per_result.append([])
            continue
        b = result.boxes
        xyxy = b.xyxy.cpu().numpy()
        curr_boxes = [tuple(float(x) for x in row) for row in xyxy]
        # Optionally extrapolate each track one step forward before matching.
        prev_list: list[tuple[int, tuple[float, float, float, float]]] = []
        for tid, (prev_box, last_box, _age) in state.items():
            if use_velocity and (prev_box != last_box):
                pbox = _predict_box(prev_box, last_box)
            else:
                pbox = last_box
            prev_list.append((tid, pbox))
        # Stage 1: confident matches at the high IoU threshold.
        stage1 = _match_tracks_detections(prev_list, curr_boxes, iou_high, set(), set())
        assigned_prev = {pi for pi, _ in stage1}
        assigned_curr = {ci for _, ci in stage1}
        # Stage 2: rescue remaining pairs at the low IoU threshold.
        stage2 = _match_tracks_detections(prev_list, curr_boxes, iou_low, assigned_prev, assigned_curr)
        for pi, ci in stage2:
            assigned_prev.add(pi)
            assigned_curr.add(ci)
        tid_per_curr: dict[int, int] = {}
        for pi, ci in stage1 + stage2:
            tid_per_curr[ci] = prev_list[pi][0]
        ids: list[int] = []
        new_state: dict[int, tuple[tuple[float, float, float, float], tuple[float, float, float, float], int]] = {}
        for ci, cbox in enumerate(curr_boxes):
            if ci in tid_per_curr:
                # Matched: refresh the track with the new box and reset age.
                tid = tid_per_curr[ci]
                _prev, last_box, _ = state[tid]
                new_state[tid] = (last_box, cbox, 0)
            else:
                # Unmatched detection: start a brand-new track.
                tid = nid
                nid += 1
                new_state[tid] = (cbox, cbox, 0)
            ids.append(tid)
        # Keep unmatched tracks alive (aged) until they exceed max_age.
        for pi in range(len(prev_list)):
            if pi in assigned_prev:
                continue
            tid = prev_list[pi][0]
            prev_box, last_box, age = state[tid]
            if age + 1 <= max_age:
                new_state[tid] = (prev_box, last_box, age + 1)
        state = new_state
        ids_per_result.append(ids)
    return (state, nid, ids_per_result)
|
|
|
|
def _s0(
    results: list[_FRes],
    window: int = _S0,
    tids_by_frame: dict[int, list[int | None]] | None = None,
) -> list[_FRes]:
    """Smooth per-track bounding boxes with a centered moving average.

    Boxes are grouped by track id (looked up in *tids_by_frame*), and each
    box's center/width/height is averaged over up to *window* frames of the
    same track (clipped at the trajectory ends). Boxes without a track id
    pass through unchanged. Returns new _FRes objects; the input objects are
    not modified. With ``window <= 1`` or empty input, returns *results* as-is.
    """
    if window <= 1 or not results:
        return results
    fid_to_idx = {r.frame_id: i for i, r in enumerate(results)}
    # track id -> list of (frame_id, box index within that frame, box).
    trajectories: dict[int, list[tuple[int, int, _Bx]]] = {}
    for i, r in enumerate(results):
        boxes_as_bx = [_Bx(**b) if isinstance(b, dict) else b for b in r.boxes]
        for j, bb in enumerate(boxes_as_bx):
            tid = tids_by_frame.get(r.frame_id, [None] * len(r.boxes))[j] if tids_by_frame else None
            if tid is not None and tid >= 0:
                tid = int(tid)
                if tid not in trajectories:
                    trajectories[tid] = []
                trajectories[tid].append((r.frame_id, j, bb))
    # (result index, box index) -> smoothed (x1, y1, x2, y2).
    smoothed: dict[tuple[int, int], tuple[int, int, int, int]] = {}
    half = window // 2
    for tid, items in trajectories.items():
        items.sort(key=lambda x: x[0])
        n = len(items)
        for k in range(n):
            fid, box_idx, bb = items[k]
            result_idx = fid_to_idx[fid]
            # Centered window, clipped at the trajectory boundaries.
            lo = max(0, k - half)
            hi = min(n, k + half + 1)
            cx_list = []
            cy_list = []
            w_list = []
            h_list = []
            for m in range(lo, hi):
                b = items[m][2]
                cx_list.append(0.5 * (b.x1 + b.x2))
                cy_list.append(0.5 * (b.y1 + b.y2))
                w_list.append(b.x2 - b.x1)
                h_list.append(b.y2 - b.y1)
            # Average center and size separately, then rebuild integer corners.
            cx_avg = sum(cx_list) / len(cx_list)
            cy_avg = sum(cy_list) / len(cy_list)
            w_avg = sum(w_list) / len(w_list)
            h_avg = sum(h_list) / len(h_list)
            x1_new = int(round(cx_avg - w_avg / 2))
            y1_new = int(round(cy_avg - h_avg / 2))
            x2_new = int(round(cx_avg + w_avg / 2))
            y2_new = int(round(cy_avg + h_avg / 2))
            smoothed[(result_idx, box_idx)] = (x1_new, y1_new, x2_new, y2_new)
    out: list[_FRes] = []
    for i, r in enumerate(results):
        boxes_as_bx = [_Bx(**b) if isinstance(b, dict) else b for b in r.boxes]
        new_boxes: list[_Bx] = []
        for j, bb in enumerate(boxes_as_bx):
            key = (i, j)
            if key in smoothed:
                x1, y1, x2, y2 = smoothed[key]
                new_boxes.append(
                    _Bx(
                        x1=x1,
                        y1=y1,
                        x2=x2,
                        y2=y2,
                        cls_id=int(bb.cls_id),
                        conf=float(bb.conf),
                        team_id=bb.team_id,
                    )
                )
            else:
                # Untracked box: passed through (coerced to int corners).
                new_boxes.append(
                    _Bx(
                        x1=int(bb.x1),
                        y1=int(bb.y1),
                        x2=int(bb.x2),
                        y2=int(bb.y2),
                        cls_id=int(bb.cls_id),
                        conf=float(bb.conf),
                        team_id=bb.team_id,
                    )
                )
        out.append(_FRes(frame_id=r.frame_id, boxes=[{"x1": b.x1, "y1": b.y1, "x2": b.x2, "y2": b.y2, "cls_id": b.cls_id, "conf": round(float(b.conf), 2), "team_id": b.team_id} for b in new_boxes], keypoints=r.keypoints))
    return out
|
|
|
|
def _a0(
    bboxes: Iterable[_Bx],
    *,
    frame_width: int,
    frame_height: int,
    cfg: _Cfg | None = None,
    do_goalkeeper_dedup: bool = True,
    do_referee_disambiguation: bool = False,
    do_ball_dedup: bool = True,
) -> list[_Bx]:
    """Post-process detections: de-duplicate overlaps, goalkeepers, referees, balls.

    Stages (each optional via its flag):
      1. Overlap suppression over non-ball boxes (highest confidence first):
         a candidate is dropped when its IoU with a kept box reaches
         ``cfg.overlap_iou``, or when its corners are within 3 px of a kept
         box's corners with IoU > 0.85.
      2. Goalkeeper dedup: every goalkeeper except the single most confident
         one is demoted to player class (_C2) with team_id "1".
      3. Referee disambiguation: keep only the referee whose box center is
         closest to the vertical frame center.
      4. Ball dedup: keep only the most confident ball.

    Fix vs. previous revision: the unused local ``W`` was removed
    (``frame_width`` stays in the signature for interface compatibility).
    """
    cfg = cfg or _Cfg()
    # Vertical frame center, used by referee disambiguation.
    cy = 0.5 * float(int(frame_height))
    kept: list[_Bx] = list(bboxes or [])
    if cfg.overlap_iou > 0 and len(kept) > 1:
        balls = [bb for bb in kept if int(bb.cls_id) == _C0]
        non_balls = [bb for bb in kept if int(bb.cls_id) != _C0]
        if len(non_balls) > 1:
            # Greedy NMS over non-ball boxes, highest confidence first.
            non_balls_sorted = sorted(non_balls, key=lambda bb: float(bb.conf), reverse=True)
            kept_nb = []
            for cand in non_balls_sorted:
                skip = False
                for k in kept_nb:
                    iou = _i1(cand, k)
                    if iou >= cfg.overlap_iou:
                        skip = True
                        break
                    # Near-identical corners with high IoU also count as duplicates.
                    if (
                        abs(int(cand.x1) - int(k.x1)) <= 3
                        and abs(int(cand.y1) - int(k.y1)) <= 3
                        and abs(int(cand.x2) - int(k.x2)) <= 3
                        and abs(int(cand.y2) - int(k.y2)) <= 3
                        and iou > 0.85
                    ):
                        skip = True
                        break
                if not skip:
                    kept_nb.append(cand)
            kept = kept_nb + balls
    if do_goalkeeper_dedup:
        gks = [bb for bb in kept if int(bb.cls_id) == _C1]
        if len(gks) > 1:
            best_gk = max(gks, key=lambda bb: float(bb.conf))
            best_gk_conf = float(best_gk.conf)
            deduped = []
            for bb in kept:
                if int(bb.cls_id) == _C1:
                    # Demote every goalkeeper except the single best one
                    # (ties broken by object identity).
                    if float(bb.conf) < best_gk_conf or (float(bb.conf) == best_gk_conf and bb is not best_gk):
                        deduped.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=_C2, conf=float(bb.conf), team_id="1"))
                    else:
                        deduped.append(bb)
                else:
                    deduped.append(bb)
            kept = deduped
    if do_referee_disambiguation:
        refs = [bb for bb in kept if int(bb.cls_id) == _C3]
        if len(refs) > 1:
            best_ref = min(refs, key=lambda bb: _d1(bb, cy))
            kept = [bb for bb in kept if int(bb.cls_id) != _C3 or bb is best_ref]
    if do_ball_dedup:
        balls = [bb for bb in kept if int(bb.cls_id) == _C0]
        if len(balls) > 1:
            best_ball = max(balls, key=lambda bb: float(bb.conf))
            kept = [bb for bb in kept if int(bb.cls_id) != _C0] + [best_ball]
    return kept
|
|
|
|
| def _k0(feats: np.ndarray, iters: int = 20) -> tuple[np.ndarray, np.ndarray]: |
| n, d = feats.shape |
| if n <= 0: |
| return np.zeros((2, d), dtype=np.float32), np.zeros(0, dtype=np.int64) |
| if n == 1: |
| return np.stack([feats[0], feats[0]], axis=0), np.zeros(1, dtype=np.int64) |
| c0 = feats[0] |
| d0 = np.linalg.norm(feats - c0[None, :], axis=1) |
| c1 = feats[int(np.argmax(d0))] |
| d1 = np.linalg.norm(feats - c1[None, :], axis=1) |
| c0 = feats[int(np.argmax(d1))] |
| centroids = np.stack([c0, c1], axis=0).astype(np.float32) |
| labels = np.zeros(n, dtype=np.int64) |
| for _ in range(iters): |
| dist = ((feats[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2) |
| labels = dist.argmin(axis=1) |
| for k in (0, 1): |
| sel = feats[labels == k] |
| if len(sel) > 0: |
| centroids[k] = sel.mean(axis=0) |
| return centroids, labels |
|
|
|
|
| def _m0(prev: np.ndarray, new: np.ndarray) -> np.ndarray: |
| d00 = np.sum((prev[0] - new[0]) ** 2) |
| d11 = np.sum((prev[1] - new[1]) ** 2) |
| d01 = np.sum((prev[0] - new[1]) ** 2) |
| d10 = np.sum((prev[1] - new[0]) ** 2) |
| if d00 + d11 <= d01 + d10: |
| return new |
| return np.stack([new[1], new[0]], axis=0) |
|
|
|
|
| |
# Whether team assignment uses the OSNet embedding model.
_USE_OSNET_TEAM = True
# (height, width) that crops are resized to before embedding.
OSNET_IMAGE_SIZE = (64, 32)
# Resize -> tensor -> normalize with the standard ImageNet mean/std.
OSNET_PREPROCESS = T.Compose([
    T.Resize(OSNET_IMAGE_SIZE),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
|
|
|
|
| def _crop_upper_body_bx(frame: ndarray, box: _Bx) -> ndarray: |
| return frame[ |
| max(0, box.y1) : max(0, box.y2), |
| max(0, box.x1) : max(0, box.x2), |
| ] |
|
|
|
|
def _preprocess_osnet(crop: ndarray) -> torch.Tensor:
    """Convert a BGR image crop into the normalised tensor expected by OSNet."""
    # BGR -> RGB -> PIL -> resize/normalize pipeline.
    return OSNET_PREPROCESS(Image.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)))
|
|
|
|
def _filter_player_boxes_bx(boxes: list[_Bx]) -> list[_Bx]:
    """Keep only boxes whose class id is the player class (_C2)."""
    players: list[_Bx] = []
    for box in boxes:
        if int(box.cls_id) == _C2:
            players.append(box)
    return players
|
|
|
|
| |
| class _ConvLayer(nn.Module): |
| def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, groups=1, IN=False): |
| super().__init__() |
| self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride, padding=padding, bias=False, groups=groups) |
| self.bn = nn.InstanceNorm2d(out_channels, affine=True) if IN else nn.BatchNorm2d(out_channels) |
| self.relu = nn.ReLU() |
|
|
| def forward(self, x): |
| return self.relu(self.bn(self.conv(x))) |
|
|
|
|
| class _Conv1x1(nn.Module): |
| def __init__(self, in_channels, out_channels, stride=1, groups=1): |
| super().__init__() |
| self.conv = nn.Conv2d(in_channels, out_channels, 1, stride=stride, padding=0, bias=False, groups=groups) |
| self.bn = nn.BatchNorm2d(out_channels) |
| self.relu = nn.ReLU() |
|
|
| def forward(self, x): |
| return self.relu(self.bn(self.conv(x))) |
|
|
|
|
| class _Conv1x1Linear(nn.Module): |
| def __init__(self, in_channels, out_channels, stride=1, bn=True): |
| super().__init__() |
| self.conv = nn.Conv2d(in_channels, out_channels, 1, stride=stride, padding=0, bias=False) |
| self.bn = nn.BatchNorm2d(out_channels) if bn else None |
|
|
| def forward(self, x): |
| x = self.conv(x) |
| return self.bn(x) if self.bn is not None else x |
|
|
|
|
| class _Conv3x3(nn.Module): |
| def __init__(self, in_channels, out_channels, stride=1, groups=1): |
| super().__init__() |
| self.conv = nn.Conv2d(in_channels, out_channels, 3, stride=stride, padding=1, bias=False, groups=groups) |
| self.bn = nn.BatchNorm2d(out_channels) |
| self.relu = nn.ReLU() |
|
|
| def forward(self, x): |
| return self.relu(self.bn(self.conv(x))) |
|
|
|
|
| class _LightConv3x3(nn.Module): |
| def __init__(self, in_channels, out_channels): |
| super().__init__() |
| self.conv1 = nn.Conv2d(in_channels, out_channels, 1, stride=1, padding=0, bias=False) |
| self.conv2 = nn.Conv2d(out_channels, out_channels, 3, stride=1, padding=1, bias=False, groups=out_channels) |
| self.bn = nn.BatchNorm2d(out_channels) |
| self.relu = nn.ReLU() |
|
|
| def forward(self, x): |
| x = self.conv1(x) |
| x = self.conv2(x) |
| return self.relu(self.bn(x)) |
|
|
|
|
class _LightConvStream(nn.Module):
    """A chain of *depth* _LightConv3x3 blocks; the first maps in->out channels,
    the rest keep the channel count."""

    def __init__(self, in_channels, out_channels, depth):
        super().__init__()
        blocks = [_LightConv3x3(in_channels, out_channels)]
        blocks.extend(_LightConv3x3(out_channels, out_channels) for _ in range(depth - 1))
        self.layers = nn.Sequential(*blocks)

    def forward(self, x):
        return self.layers(x)
|
|
|
|
| class _ChannelGate(nn.Module): |
| def __init__(self, in_channels, num_gates=None, return_gates=False, gate_activation="sigmoid", reduction=16, layer_norm=False): |
| super().__init__() |
| if num_gates is None: |
| num_gates = in_channels |
| self.return_gates = return_gates |
| self.global_avgpool = nn.AdaptiveAvgPool2d(1) |
| self.fc1 = nn.Conv2d(in_channels, in_channels // reduction, kernel_size=1, bias=True, padding=0) |
| self.norm1 = nn.LayerNorm((in_channels // reduction, 1, 1)) if layer_norm else None |
| self.relu = nn.ReLU() |
| self.fc2 = nn.Conv2d(in_channels // reduction, num_gates, kernel_size=1, bias=True, padding=0) |
| self.gate_activation = nn.Sigmoid() if gate_activation == "sigmoid" else nn.ReLU() |
|
|
| def forward(self, x): |
| inp = x |
| x = self.global_avgpool(x) |
| x = self.fc1(x) |
| if self.norm1 is not None: |
| x = self.norm1(x) |
| x = self.relu(x) |
| x = self.fc2(x) |
| if self.gate_activation is not None: |
| x = self.gate_activation(x) |
| return x if self.return_gates else inp * x |
|
|
|
|
class _OSBlockX1(nn.Module):
    """Omni-scale residual block: four light-conv streams of depth 1-4, each
    passed through a shared channel gate, summed, projected back up, and added
    to a (possibly 1x1-projected) identity shortcut."""

    def __init__(self, in_channels, out_channels, IN=False, bottleneck_reduction=4):
        super().__init__()
        mid = out_channels // bottleneck_reduction
        self.conv1 = _Conv1x1(in_channels, mid)
        # Streams with increasing receptive field (depth 1..4).
        self.conv2a = _LightConv3x3(mid, mid)
        self.conv2b = nn.Sequential(*[_LightConv3x3(mid, mid) for _ in range(2)])
        self.conv2c = nn.Sequential(*[_LightConv3x3(mid, mid) for _ in range(3)])
        self.conv2d = nn.Sequential(*[_LightConv3x3(mid, mid) for _ in range(4)])
        # NOTE: a single gate module is shared across all four streams.
        self.gate = _ChannelGate(mid)
        self.conv3 = _Conv1x1Linear(mid, out_channels)
        self.downsample = _Conv1x1Linear(in_channels, out_channels) if in_channels != out_channels else None
        self.IN = nn.InstanceNorm2d(out_channels, affine=True) if IN else None

    def forward(self, x):
        shortcut = x if self.downsample is None else self.downsample(x)
        mid = self.conv1(x)
        fused = (
            self.gate(self.conv2a(mid))
            + self.gate(self.conv2b(mid))
            + self.gate(self.conv2c(mid))
            + self.gate(self.conv2d(mid))
        )
        out = self.conv3(fused) + shortcut
        if self.IN is not None:
            out = self.IN(out)
        return F.relu(out)
|
|
|
|
class _OSNetX1(nn.Module):
    """OSNet-style backbone producing re-identification embeddings.

    Stacks stages of *blocks*/*layers*/*channels*, global-average-pools,
    optionally projects through an FC head to *feature_dim*, and — in
    training mode — classifies into *num_classes*. In eval mode ``forward``
    returns the embedding vector instead of logits.
    """

    def __init__(self, num_classes, blocks, layers, channels, feature_dim=512, loss="softmax", IN=False):
        super().__init__()
        self.loss = loss
        self.feature_dim = feature_dim
        self.conv1 = _ConvLayer(3, channels[0], 7, stride=2, padding=3, IN=IN)
        self.maxpool = nn.MaxPool2d(3, stride=2, padding=1)
        self.conv2 = self._make_layer(blocks[0], layers[0], channels[0], channels[1], reduce_spatial_size=True, IN=IN)
        self.conv3 = self._make_layer(blocks[1], layers[1], channels[1], channels[2], reduce_spatial_size=True)
        self.conv4 = self._make_layer(blocks[2], layers[2], channels[2], channels[3], reduce_spatial_size=False)
        self.conv5 = _Conv1x1(channels[3], channels[3])
        self.global_avgpool = nn.AdaptiveAvgPool2d(1)
        # NOTE: _construct_fc_layer updates self.feature_dim as a side effect,
        # so it must run before the classifier is built.
        self.fc = self._construct_fc_layer(feature_dim, channels[3], dropout_p=None)
        self.classifier = nn.Linear(self.feature_dim, num_classes)
        self._init_params()

    def _make_layer(self, block, layer, in_channels, out_channels, reduce_spatial_size, IN=False):
        # One stage: `layer` blocks, optionally followed by a 1x1 conv + 2x2
        # average-pool downsampling step.
        layers_list = [block(in_channels, out_channels, IN=IN)]
        for _ in range(1, layer):
            layers_list.append(block(out_channels, out_channels, IN=IN))
        if reduce_spatial_size:
            layers_list.append(nn.Sequential(_Conv1x1(out_channels, out_channels), nn.AvgPool2d(2, stride=2)))
        return nn.Sequential(*layers_list)

    def _construct_fc_layer(self, fc_dims, input_dim, dropout_p=None):
        # Build the embedding head; returns None (identity) when fc_dims is
        # None or negative, in which case feature_dim falls back to input_dim.
        if fc_dims is None or fc_dims < 0:
            self.feature_dim = input_dim
            return None
        if isinstance(fc_dims, int):
            fc_dims = [fc_dims]
        layers_list = []
        for dim in fc_dims:
            layers_list.append(nn.Linear(input_dim, dim))
            layers_list.append(nn.BatchNorm1d(dim))
            layers_list.append(nn.ReLU(inplace=True))
            if dropout_p is not None:
                layers_list.append(nn.Dropout(p=dropout_p))
            input_dim = dim
        self.feature_dim = fc_dims[-1]
        return nn.Sequential(*layers_list)

    def _init_params(self):
        # Kaiming init for convs, constant init for norm layers, small normal
        # init for linear layers.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, (nn.BatchNorm2d, nn.BatchNorm1d)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.InstanceNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x, return_featuremaps=False):
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        if return_featuremaps:
            return x
        v = self.global_avgpool(x)
        v = v.view(v.size(0), -1)
        if self.fc is not None:
            v = self.fc(v)
        # Eval mode: return the embedding. Train mode: return logits
        # ("softmax") or (logits, embedding) for triplet loss.
        if not self.training:
            return v
        y = self.classifier(v)
        if self.loss == "softmax":
            return y
        elif self.loss == "triplet":
            return y, v
        raise KeyError(f"Unsupported loss: {self.loss}")
|
|
|
|
def _osnet_x1_0(num_classes=1000, pretrained=True, loss="softmax", **kwargs):
    """Factory for the x1.0 OSNet variant (2-2-2 blocks, 64/256/384/512 ch).

    Note: `pretrained` is accepted for API compatibility but not used here;
    weights are loaded separately via _load_pretrained_weights_osnet.
    """
    stage_blocks = [_OSBlockX1, _OSBlockX1, _OSBlockX1]
    stage_layers = [2, 2, 2]
    stage_channels = [64, 256, 384, 512]
    return _OSNetX1(
        num_classes,
        blocks=stage_blocks,
        layers=stage_layers,
        channels=stage_channels,
        loss=loss,
        **kwargs,
    )
|
|
|
|
| def _load_checkpoint_osnet(fpath: str): |
| fpath = os.path.abspath(os.path.expanduser(fpath)) |
| map_location = None if torch.cuda.is_available() else "cpu" |
| return torch.load(fpath, map_location=map_location, weights_only=False) |
|
|
|
|
def _load_pretrained_weights_osnet(model: nn.Module, weight_path: str) -> None:
    """Partially load checkpoint weights into `model`.

    Strips any DataParallel "module." prefix and copies only tensors whose
    name and shape match the model's state dict; mismatches are skipped
    silently.
    """
    checkpoint = _load_checkpoint_osnet(weight_path)
    state_dict = checkpoint.get("state_dict", checkpoint)
    model_dict = model.state_dict()
    matched = OrderedDict()
    for name, tensor in state_dict.items():
        key = name[7:] if name.startswith("module.") else name
        if key in model_dict and model_dict[key].size() == tensor.size():
            matched[key] = tensor
    model_dict.update(matched)
    model.load_state_dict(model_dict)
|
|
|
|
def _load_osnet(device: str = "cuda", weight_path: Optional[Path] = None) -> Optional[nn.Module]:
    """Create an OSNet x1.0 embedder in eval mode on `device`, loading
    weights from `weight_path` when that file exists."""
    model = _osnet_x1_0(num_classes=1, loss="softmax", pretrained=False)
    have_weights = weight_path and Path(weight_path).exists()
    if have_weights:
        _load_pretrained_weights_osnet(model, str(weight_path))
    model.eval()
    return model.to(device)
|
|
|
|
def _extract_osnet_embeddings(
    model: nn.Module,
    frames: list[ndarray],
    bboxes_by_frame: dict[int, list[_Bx]],
    track_ids_by_frame: dict[int, list[int | None]],
    frame_offset: int,
    device: str,
) -> tuple[Optional[ndarray], Optional[list[tuple[int, int, int | None]]]]:
    """Extract OSNet embeddings for player boxes; return (embeddings, meta) with meta = (frame_idx, box_idx, track_id)."""
    crops = []
    meta: list[tuple[int, int, int | None]] = []
    for fi, frame in enumerate(frames):
        if frame is None:
            continue
        frame_id = frame_offset + fi
        boxes = bboxes_by_frame.get(frame_id, [])
        tids = track_ids_by_frame.get(frame_id, [None] * len(boxes))
        for bi, box in enumerate(boxes):
            # Only embed boxes of the player class.
            if int(box.cls_id) != _C2:
                continue
            track_id = tids[bi] if bi < len(tids) else None
            crop = _crop_upper_body_bx(frame, box)
            if crop.size == 0:
                continue  # degenerate crop (box outside the frame)
            crops.append(_preprocess_osnet(crop))
            meta.append((fi, bi, track_id))
    if not crops:
        return None, None
    batch = torch.stack(crops).to(device).float()
    with torch.inference_mode():
        embeddings = model(batch)
    del batch
    return embeddings.cpu().numpy(), meta
|
|
|
|
| def _aggregate_by_track_osnet( |
| embeddings: ndarray, |
| meta: list[tuple[int, int, int | None]], |
| ) -> tuple[ndarray, list[tuple[int, int, int | None]]]: |
| track_map: dict[int | None, list[int]] = defaultdict(list) |
| meta_by_track: dict[int | None, tuple[int, int, int | None]] = {} |
| for idx, (fi, bi, tid) in enumerate(meta): |
| key = tid if tid is not None else id((fi, bi)) |
| track_map[key].append(idx) |
| meta_by_track[key] = (fi, bi, tid) |
| agg_embeddings = [] |
| agg_meta = [] |
| for key, indices in track_map.items(): |
| mean_emb = np.mean(embeddings[indices], axis=0) |
| norm = np.linalg.norm(mean_emb) |
| if norm > 1e-12: |
| mean_emb /= norm |
| agg_embeddings.append(mean_emb) |
| agg_meta.append(meta_by_track[key]) |
| return np.array(agg_embeddings), agg_meta |
|
|
|
|
def _classify_teams_osnet(
    agg_embeddings: ndarray,
    agg_meta: list[tuple[int, int, int | None]],
) -> dict[int | None, str]:
    """KMeans on aggregated embeddings; return track_id -> team_id '1' or '2'."""
    track_to_team: dict[int | None, str] = {}
    n = len(agg_embeddings)
    if n == 0:
        return track_to_team
    if n == 1:
        # A single track cannot be clustered; call it team "1".
        track_to_team[agg_meta[0][2]] = "1"
        return track_to_team
    kmeans = KMeans(n_clusters=2, n_init=2, random_state=42)
    kmeans.fit(agg_embeddings)
    c0, c1 = kmeans.cluster_centers_
    norm_0 = np.linalg.norm(c0)
    norm_1 = np.linalg.norm(c1)
    # Cosine similarity between the two centroids.
    similarity = np.dot(c0, c1) / (norm_0 * norm_1 + 1e-12)
    if similarity > 0.95:
        # Clusters nearly identical: treat everyone as one team.
        for _, _, tid in agg_meta:
            track_to_team[tid] = "1"
        return track_to_team
    labels = kmeans.labels_
    if norm_0 <= norm_1:
        # Flip so the larger-norm centroid is cluster 0 -> team "1".
        kmeans.labels_ = 1 - labels
    for (_, _, tid), label in zip(agg_meta, kmeans.labels_):
        track_to_team[tid] = "1" if label == 0 else "2"
    return track_to_team
|
|
|
|
class _Pl:
    """Per-batch inference pipeline: ball/person detection, IoU-based
    tracking, class/team majority voting, optional OSNet team re-id, and
    HRNet pitch-keypoint estimation.

    Cross-batch state (track-id class/team votes, the custom person
    tracker, and tail track counts for the noise filter) persists between
    predict_batch calls and is cleared by reset_for_new_video().
    """

    def __init__(self, repo_root: Path) -> None:
        """Load all models from `repo_root`; the optional OSNet and HRNet
        models are loaded best-effort and left as None on failure."""
        self.repo_root = Path(repo_root)
        self._executor = ThreadPoolExecutor(max_workers=3)
        # Accumulated majority votes, keyed by track id.
        self._track_id_to_team_votes: dict[int, dict[str, int]] = {}
        self._track_id_to_class_votes: dict[int, dict[int, int]] = {}
        self._osnet_model: Optional[nn.Module] = None
        self._osnet_device = "cuda" if torch.cuda.is_available() else "cpu"
        if _USE_OSNET_TEAM:
            _osnet_path = self.repo_root / "models" / "osnet_model.pth.tar-100"
            if _osnet_path.exists():
                try:
                    self._osnet_model = _load_osnet(self._osnet_device, _osnet_path)
                except Exception:
                    # Best effort: without OSNet, re-id team assignment is skipped.
                    self._osnet_model = None
        self._tracker_config = "botsort.yaml"
        models_dir = self.repo_root / "models"
        if _B2:
            self.ball_model = YOLO(str(models_dir / "ball-detection-model.onnx"), task="detect")
        else:
            self.ball_model = None
        self.person_model = YOLO(str(models_dir / "person-detection-model.onnx"), task="detect")
        # State for the custom IoU tracker consumed/produced by
        # _assign_person_track_ids (tuple semantics defined there).
        self._person_tracker_state: dict[int, tuple[tuple[float, float, float, float], tuple[float, float, float, float], int]] = {}
        self._person_tracker_next_id = 0
        self._keypoint_model_hrnet = None
        _yaml_path = self.repo_root / "hrnetv2_w48.yaml"
        _weights_path = self.repo_root / "models" / "keypoint"
        if _f0 and _yaml_path.exists() and _weights_path.exists():
            try:
                self._keypoint_model_hrnet = _l0(
                    self.repo_root, weights_subdir="models"
                )
            except Exception:
                self._keypoint_model_hrnet = None
        # Per-batch profiling buffers (overwritten on every batch).
        self._current_batch_bbox_timings: list[tuple[str, float]] = []
        self._current_batch_kp_timings: list[tuple[str, float]] = []
        # Track-id occurrence counts from the tail of the previous batch; used
        # so the noise filter does not drop tracks continuing across batches.
        self._prev_batch_tail_tid_counts: dict[int, int] = {}


    def reset_for_new_video(self) -> None:
        """Clear all cross-batch state (votes, tracker, tail counts)."""
        self._track_id_to_team_votes.clear()
        self._track_id_to_class_votes.clear()
        self._prev_batch_tail_tid_counts.clear()
        self._person_tracker_state.clear()
        self._person_tracker_next_id = 0


    def _keypoint_hrnet_task(
        self,
        images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> dict[int, list[list[float]]]:
        """Run the HRNet keypoint model on `images`.

        Returns a mapping of absolute frame id (offset + index) to a list of
        `n_keypoints` [x, y] pairs; frames fall back to all-zero keypoints
        when the model is unavailable. Records per-stage timings in
        self._current_batch_kp_timings as a side effect.
        """
        _kp_timings: list[tuple[str, float]] = []
        t_total = time.perf_counter()
        default_kps = [[0.0, 0.0] for _ in range(n_keypoints)]
        # Fallback: keypoint stage disabled or the model failed to load.
        if not _f0 or self._keypoint_model_hrnet is None:
            self._current_batch_kp_timings = []
            return {offset + i: list(default_kps) for i in range(len(images))}
        device = "cuda" if next(self._keypoint_model_hrnet.parameters()).is_cuda else "cpu"
        kp_threshold = 0.2
        _t = time.perf_counter()
        kp_result = _x0(
            images, self._keypoint_model_hrnet, kp_threshold, device, batch_size=_KP_BS
        )
        _kp_timings.append(("kp_hrnet", time.perf_counter() - _t))
        _t = time.perf_counter()
        # NOTE(review): h/w are taken from the first frame and reused for all
        # frames below — assumes a constant frame size within the batch.
        h, w = images[0].shape[:2]
        if n_keypoints == 32:
            # 32-keypoint layout gets the specialised normalise/refine path.
            keypoints_xyp = _normalize_keypoints_xyp(kp_result, images, n_keypoints)
            if _FKP_FAST_MODE:
                job = _fkp_normalize_results(keypoints_xyp, _FKP_SINGLE_THRESHOLD)
                keypoints = []
                for idx in range(len(images)):
                    kps = _fix_keypoints(job[idx] if idx < len(job) else [(0, 0)] * 32, n_keypoints)
                    adjusted = _step8_one_frame_kp(kps, w, h, False, n_keypoints)
                    keypoints.append(_keypoints_to_float(adjusted if adjusted is not None else kps))
            else:
                job = _fkp_normalize_results(keypoints_xyp, _FKP_SINGLE_THRESHOLD)
                keypoints = []
                for idx in range(len(images)):
                    kps = _fix_keypoints(job[idx] if idx < len(job) else [(0, 0)] * 32, n_keypoints)
                    kps_float = _keypoints_to_float(kps)
                    try:
                        # Homography refinement is best-effort per frame.
                        refined = _apply_homography_refinement(kps_float, images[idx], n_keypoints)
                        keypoints.append(refined)
                    except Exception:
                        keypoints.append(kps_float)
        else:
            keypoints = _n0(kp_result, images, n_keypoints)
            keypoints = [_fix_keypoints(kps, n_keypoints) for kps in keypoints]
            keypoints = [_keypoints_to_float(kps) for kps in keypoints]
        _kp_timings.append(("kp_normalize", time.perf_counter() - _t))
        _t = time.perf_counter()
        out: dict[int, list[list[float]]] = {}
        for i, kpts in enumerate(keypoints):
            out[offset + i] = _c1(kpts)
        _kp_timings.append(("kp_to_output", time.perf_counter() - _t))
        _kp_timings.append(("kp_total", time.perf_counter() - t_total))
        self._current_batch_kp_timings = _kp_timings
        return out


    def _bbox_task(
        self,
        images: list[ndarray],
        offset: int,
        imgsz: int,
        conf: float,
        onnx_batch_size: int,
    ) -> dict[int, list[_Bx]]:
        """Detect balls and people, assign track ids, then run the box
        post-processing chain: noise-track filtering, track-id
        stabilisation, class voting, gap interpolation, team voting,
        geometric adjustment and optional smoothing.

        Returns a mapping of absolute frame id to its final list of _Bx
        boxes; per-stage timings land in self._current_batch_bbox_timings.
        """
        _bbox_timings: list[tuple[str, float]] = []
        _t0 = time.perf_counter()


        # --- Ball detection (chunked to the ONNX batch size) ---
        ball_res: list = []
        if _B2 and self.ball_model is not None:
            _t = time.perf_counter()
            for start in range(0, len(images), onnx_batch_size):
                chunk = images[start : start + onnx_batch_size]
                batch_res = self.ball_model.predict(chunk, imgsz=imgsz, conf=conf, verbose=False)
                ball_res.extend(batch_res if batch_res else [])
            _bbox_timings.append(("bbox_ball_detect", time.perf_counter() - _t))
        # --- Person detection on the whole batch + IoU track assignment ---
        _t = time.perf_counter()
        batch_res = self.person_model(images, imgsz=_D0_PERSON, conf=conf, iou=0.5, agnostic_nms=True, verbose=False)
        if not isinstance(batch_res, list):
            batch_res = [batch_res] if batch_res is not None else []
        self._person_tracker_state, self._person_tracker_next_id, person_track_ids = _assign_person_track_ids(
            self._person_tracker_state, self._person_tracker_next_id, batch_res, _TRACK_IOU_THRESH
        )
        person_res = batch_res
        _bbox_timings.append(("bbox_person_track", time.perf_counter() - _t))


        # --- Parse raw detector outputs into per-frame _Bx boxes ---
        bboxes_by_frame: dict[int, list[_Bx]] = {}
        track_ids_by_frame: dict[int, list[int | None]] = {}
        boxes_raw_list: list[list[_Bx]] = []
        track_ids_raw_list: list[list[int | None]] = []
        bbox_to_track_list: list[dict[tuple[int, int, int, int], int]] = []
        _t = time.perf_counter()
        for i, frame in enumerate(images):
            frame_id = offset + i
            boxes_raw = []
            track_ids_raw: list[int | None] = []
            bbox_to_track: dict[tuple[int, int, int, int], int] = {}
            if _B2:
                det_ball = ball_res[i] if i < len(ball_res) else None
                if det_ball is not None and getattr(det_ball, "boxes", None) is not None and len(det_ball.boxes) > 0:
                    b = det_ball.boxes
                    xyxy = b.xyxy.cpu().numpy()
                    confs = b.conf.cpu().numpy() if b.conf is not None else np.ones(len(xyxy), dtype=np.float32)
                    clss = b.cls.cpu().numpy().astype(int) if b.cls is not None else np.zeros(len(xyxy), dtype=np.int32)
                    for (x1, y1, x2, y2), c, cf in zip(xyxy, clss, confs):
                        # Only class 0 of the ball model is kept (mapped to _C0).
                        if int(c) == 0:
                            boxes_raw.append(_Bx(x1=int(round(x1)), y1=int(round(y1)), x2=int(round(x2)), y2=int(round(y2)), cls_id=_C0, conf=float(cf)))
                            track_ids_raw.append(None)
            det_p = person_res[i] if i < len(person_res) else None
            if det_p is not None and getattr(det_p, "boxes", None) is not None and len(det_p.boxes) > 0:
                b = det_p.boxes
                xyxy = b.xyxy.cpu().numpy()
                confs = b.conf.cpu().numpy() if b.conf is not None else np.ones(len(xyxy), dtype=np.float32)
                clss = b.cls.cpu().numpy().astype(int) if b.cls is not None else np.zeros(len(xyxy), dtype=np.int32)
                if i < len(person_track_ids) and len(person_track_ids[i]) == len(clss):
                    track_ids = np.array(person_track_ids[i], dtype=np.int32)
                else:
                    track_ids = np.full(len(clss), -1, dtype=np.int32)
                for (x1, y1, x2, y2), c, cf, tid in zip(xyxy, clss, confs, track_ids):
                    c = int(c)
                    tid = int(tid)
                    x1r, y1r, x2r, y2r = int(round(x1)), int(round(y1)), int(round(x2)), int(round(y2))
                    if tid >= 0:
                        bbox_to_track[(x1r, y1r, x2r, y2r)] = tid
                    tid_out = tid if tid >= 0 else None
                    # Person-model class ids 0/1/2 map to _C2/_C3/_C1.
                    if c == 0:
                        boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C2, conf=float(cf)))
                        track_ids_raw.append(tid_out)
                    elif c == 1:
                        boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C3, conf=float(cf)))
                        track_ids_raw.append(tid_out)
                    elif c == 2:
                        boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C1, conf=float(cf)))
                        track_ids_raw.append(tid_out)
            boxes_raw_list.append(boxes_raw)
            track_ids_raw_list.append(track_ids_raw)
            bbox_to_track_list.append(bbox_to_track)
        _bbox_timings.append(("bbox_parse_ball_person", time.perf_counter() - _t))
        for i in range(len(images)):
            bboxes_by_frame[offset + i] = boxes_raw_list[i]
            track_ids_by_frame[offset + i] = track_ids_raw_list[i] if i < len(track_ids_raw_list) else [None] * len(boxes_raw_list[i])
        # --- Noise-track filter: drop short-lived tracks, except those that
        # start near the batch end or continue from the previous batch ---
        if _G0 and len(images) > _G2:
            _t = time.perf_counter()
            tid_counts: dict[int, int] = {}
            tid_first_frame: dict[int, int] = {}
            for fid in range(offset, offset + len(images)):
                tids = track_ids_by_frame.get(fid, [])
                for tid in tids:
                    if tid is not None and tid >= 0:
                        t = int(tid)
                        tid_counts[t] = tid_counts.get(t, 0) + 1
                        if t not in tid_first_frame or fid < tid_first_frame[t]:
                            tid_first_frame[t] = fid
            for t, prev_count in self._prev_batch_tail_tid_counts.items():
                tid_counts[t] = tid_counts.get(t, 0) + prev_count
                if prev_count > 0:
                    # Push first_frame past `boundary` so carried-over tracks
                    # are never classified as noise below.
                    tid_first_frame[t] = offset + len(images)
            boundary = offset + len(images) - _G2
            noise_tids = {
                t for t, count in tid_counts.items()
                if count < _G1 and tid_first_frame[t] < boundary
            }
            for fid in range(offset, offset + len(images)):
                boxes = bboxes_by_frame.get(fid, [])
                tids = track_ids_by_frame.get(fid, [None] * len(boxes))
                if len(tids) != len(boxes):
                    tids = tids + [None] * (len(boxes) - len(tids))
                keep = [
                    i for i in range(len(boxes))
                    if tids[i] is None or int(tids[i]) not in noise_tids
                ]
                bboxes_by_frame[fid] = [boxes[i] for i in keep]
                track_ids_by_frame[fid] = [tids[i] for i in keep]
            # Remember tail counts for the next batch's filter.
            tail_start = offset + len(images) - _G2
            self._prev_batch_tail_tid_counts = {}
            for fid in range(tail_start, offset + len(images)):
                tids = track_ids_by_frame.get(fid, [])
                for tid in tids:
                    if tid is not None and tid >= 0:
                        t = int(tid)
                        self._prev_batch_tail_tid_counts[t] = self._prev_batch_tail_tid_counts.get(t, 0) + 1
            _bbox_timings.append(("bbox_noise_filter", time.perf_counter() - _t))
        # --- Stabilise track ids via best-IoU match and record class votes ---
        _t = time.perf_counter()
        for i, frame in enumerate(images):
            frame_id = offset + i
            boxes_raw = bboxes_by_frame[frame_id]
            track_ids_raw = track_ids_by_frame[frame_id]
            bbox_to_track = {(int(bb.x1), int(bb.y1), int(bb.x2), int(bb.y2)): int(tid) for bb, tid in zip(boxes_raw, track_ids_raw) if tid is not None and int(tid) >= 0}
            boxes_stabilized = []
            track_ids_stabilized: list[int | None] = []
            for idx, bb in enumerate(boxes_raw):
                best_tid = -1
                best_iou = 0.0
                for (bx1, by1, bx2, by2), tid in bbox_to_track.items():
                    iou = _i1(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=0, conf=0.0), _Bx(x1=bx1, y1=by1, x2=bx2, y2=by2, cls_id=0, conf=0.0))
                    if iou > best_iou and iou > 0.5:
                        best_iou, best_tid = iou, tid
                tid_out = best_tid if best_tid >= 0 else (track_ids_raw[idx] if idx < len(track_ids_raw) else None)
                if best_tid >= 0:
                    if _G5:
                        if best_tid not in self._track_id_to_class_votes:
                            self._track_id_to_class_votes[best_tid] = {}
                        cls_key = int(bb.cls_id)
                        self._track_id_to_class_votes[best_tid][cls_key] = self._track_id_to_class_votes[best_tid].get(cls_key, 0) + 1
                    boxes_stabilized.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=bb.cls_id, conf=bb.conf, team_id=None))
                    track_ids_stabilized.append(tid_out)
                else:
                    boxes_stabilized.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=bb.cls_id, conf=bb.conf, team_id=None))
                    track_ids_stabilized.append(tid_out)
            bboxes_by_frame[frame_id] = boxes_stabilized
            track_ids_by_frame[frame_id] = track_ids_stabilized
        _bbox_timings.append(("bbox_stabilize_track_ids", time.perf_counter() - _t))
        # --- Rewrite each tracked box's class with its majority-voted class
        # (referee/goalkeeper votes may override via thresholds _G3) ---
        _t = time.perf_counter()
        for fid in range(offset, offset + len(images)):
            new_boxes = []
            tids_fid = track_ids_by_frame.get(fid, [None] * len(bboxes_by_frame[fid]))
            for box_idx, box in enumerate(bboxes_by_frame[fid]):
                tid = tids_fid[box_idx] if box_idx < len(tids_fid) else None
                if _G5 and tid is not None and tid >= 0 and tid in self._track_id_to_class_votes:
                    votes = self._track_id_to_class_votes[tid]
                    ref_votes = votes.get(_C3, 0)
                    gk_votes = votes.get(_C1, 0)
                    if _G6 and ref_votes > _G3:
                        majority_cls = _C3
                    elif _G7 and gk_votes > _G3:
                        majority_cls = _C1
                    else:
                        majority_cls = max(votes.items(), key=lambda x: x[1])[0]
                    new_boxes.append(_Bx(x1=box.x1, y1=box.y1, x2=box.x2, y2=box.y2, cls_id=majority_cls, conf=box.conf, team_id=None))
                else:
                    new_boxes.append(box)
            bboxes_by_frame[fid] = new_boxes
            track_ids_by_frame[fid] = tids_fid
        _bbox_timings.append(("bbox_class_votes", time.perf_counter() - _t))
        # --- Fill per-track detection gaps by linear box interpolation ---
        if _B5 and len(images) > 1:
            _t = time.perf_counter()
            track_to_frames: dict[int, list[tuple[int, _Bx]]] = {}
            for fid in range(offset, offset + len(images)):
                boxes = bboxes_by_frame.get(fid, [])
                tids = track_ids_by_frame.get(fid, [None] * len(boxes))
                for bb, tid in zip(boxes, tids):
                    if tid is not None and int(tid) >= 0:
                        t = int(tid)
                        track_to_frames.setdefault(t, []).append((fid, bb))
            to_add: dict[int, list[tuple[_Bx, int]]] = {}
            for t, pairs in track_to_frames.items():
                pairs.sort(key=lambda p: p[0])
                for i in range(len(pairs) - 1):
                    f1, b1 = pairs[i]
                    f2, b2 = pairs[i + 1]
                    if f2 - f1 <= 1:
                        continue  # no gap between consecutive sightings
                    for g in range(f1 + 1, f2):
                        w = (g - f1) / (f2 - f1)
                        x1 = int(round((1 - w) * b1.x1 + w * b2.x1))
                        y1 = int(round((1 - w) * b1.y1 + w * b2.y1))
                        x2 = int(round((1 - w) * b1.x2 + w * b2.x2))
                        y2 = int(round((1 - w) * b1.y2 + w * b2.y2))
                        interp = _Bx(x1=x1, y1=y1, x2=x2, y2=y2, cls_id=b2.cls_id, conf=b2.conf, team_id=b2.team_id)
                        to_add.setdefault(g, []).append((interp, t))
            for g, add_list in to_add.items():
                bboxes_by_frame[g] = list(bboxes_by_frame.get(g, []))
                track_ids_by_frame[g] = list(track_ids_by_frame.get(g, []))
                for interp_box, tid in add_list:
                    bboxes_by_frame[g].append(interp_box)
                    track_ids_by_frame[g].append(tid)
            _bbox_timings.append(("bbox_interp_gaps", time.perf_counter() - _t))
        # --- OSNet re-id: embed player crops and cluster into two teams ---
        reid_team_per_frame: list[list[Optional[str]]] = [[None] * len(bboxes_by_frame[offset + fi]) for fi in range(len(images))]
        if self._osnet_model is not None:
            _t_reid_total = time.perf_counter()
            emb, meta = _extract_osnet_embeddings(
                self._osnet_model, images, bboxes_by_frame, track_ids_by_frame, offset, self._osnet_device
            )
            if emb is not None and meta is not None:
                agg_emb, agg_meta = _aggregate_by_track_osnet(emb, meta)
                track_to_team = _classify_teams_osnet(agg_emb, agg_meta)
                for fi in range(len(images)):
                    frame_id = offset + fi
                    boxes_f = bboxes_by_frame.get(frame_id, [])
                    tids_f = track_ids_by_frame.get(frame_id, [])
                    for bi in range(len(boxes_f)):
                        tid = tids_f[bi] if bi < len(tids_f) else None
                        if tid in track_to_team and bi < len(reid_team_per_frame[fi]):
                            reid_team_per_frame[fi][bi] = track_to_team[tid]
            _bbox_timings.append(("bbox_reid_team", time.perf_counter() - _t_reid_total))
        # --- Accumulate and apply per-track team majority votes ---
        _t = time.perf_counter()
        for i in range(len(images)):
            frame_id = offset + i
            boxes = bboxes_by_frame[frame_id]
            tids_fid = track_ids_by_frame[frame_id]
            for box_idx, bb in enumerate(boxes):
                tid = tids_fid[box_idx] if box_idx < len(tids_fid) else None
                team_from_reid = reid_team_per_frame[i][box_idx] if box_idx < len(reid_team_per_frame[i]) else None
                if _G8 and tid is not None and tid >= 0 and team_from_reid:
                    if tid not in self._track_id_to_team_votes:
                        self._track_id_to_team_votes[tid] = {}
                    team_key = team_from_reid.strip()
                    self._track_id_to_team_votes[tid][team_key] = self._track_id_to_team_votes[tid].get(team_key, 0) + 1
        for fid in range(offset, offset + len(images)):
            new_boxes = []
            tids_fid = track_ids_by_frame.get(fid, [None] * len(bboxes_by_frame[fid]))
            fi = fid - offset
            for box_idx, box in enumerate(bboxes_by_frame[fid]):
                tid = tids_fid[box_idx] if box_idx < len(tids_fid) else None
                team_from_reid = reid_team_per_frame[fi][box_idx] if fi < len(reid_team_per_frame) and box_idx < len(reid_team_per_frame[fi]) else None
                default_team = team_from_reid or box.team_id
                if _G8 and tid is not None and tid >= 0 and tid in self._track_id_to_team_votes and self._track_id_to_team_votes[tid]:
                    majority_team = max(self._track_id_to_team_votes[tid].items(), key=lambda x: x[1])[0]
                else:
                    majority_team = default_team
                new_boxes.append(_Bx(x1=box.x1, y1=box.y1, x2=box.x2, y2=box.y2, cls_id=box.cls_id, conf=box.conf, team_id=majority_team))
            bboxes_by_frame[fid] = new_boxes
            track_ids_by_frame[fid] = tids_fid
        _bbox_timings.append(("bbox_team_votes", time.perf_counter() - _t))
        # --- Geometric adjustment/dedup, ball-confidence filter, and an
        # optional global centre offset (_q0/_q1); track ids are re-matched
        # to adjusted boxes by exact-coordinate equality ---
        if len(images) > 0:
            _t = time.perf_counter()
            H, W = images[0].shape[:2]
            for fid in range(offset, offset + len(images)):
                orig_boxes = bboxes_by_frame[fid]
                orig_tids = track_ids_by_frame.get(fid, [None] * len(orig_boxes))
                adjusted = _a0(
                    orig_boxes,
                    frame_width=W,
                    frame_height=H,
                    do_goalkeeper_dedup=_B3,
                    do_referee_disambiguation=_B4,
                    do_ball_dedup=_B1,
                )
                adjusted_tids: list[int | None] = []
                used_orig = set()
                for ab in adjusted:
                    matched = None
                    for oi, ob in enumerate(orig_boxes):
                        if oi in used_orig:
                            continue
                        if ob.x1 == ab.x1 and ob.y1 == ab.y1 and ob.x2 == ab.x2 and ob.y2 == ab.y2:
                            matched = orig_tids[oi] if oi < len(orig_tids) else None
                            used_orig.add(oi)
                            break
                    adjusted_tids.append(matched)
                if _B0 > 0:
                    # Drop low-confidence ball boxes below the _B0 threshold.
                    new_adjusted = []
                    new_adjusted_tids = []
                    for ab, tid in zip(adjusted, adjusted_tids):
                        if int(ab.cls_id) == _C0 and float(ab.conf) < _B0:
                            continue
                        new_adjusted.append(ab)
                        new_adjusted_tids.append(tid)
                    adjusted = new_adjusted
                    adjusted_tids = new_adjusted_tids
                if _q0 != 0.0 or _q1 != 0.0:
                    # Shift box centres multiplicatively by (_q0, _q1),
                    # keeping each box's width/height.
                    boxes_offset = []
                    offset_tids = []
                    for ab_idx, bb in enumerate(adjusted):
                        cx = 0.5 * (bb.x1 + bb.x2)
                        cy = 0.5 * (bb.y1 + bb.y2)
                        w = bb.x2 - bb.x1
                        h = bb.y2 - bb.y1
                        cx *= 1.0 + _q0
                        cy *= 1.0 + _q1
                        boxes_offset.append(_Bx(x1=int(round(cx - w/2)), y1=int(round(cy - h/2)), x2=int(round(cx + w/2)), y2=int(round(cy + h/2)), cls_id=bb.cls_id, conf=bb.conf, team_id=bb.team_id))
                        offset_tids.append(adjusted_tids[ab_idx] if ab_idx < len(adjusted_tids) else None)
                    adjusted = boxes_offset
                    adjusted_tids = offset_tids
                bboxes_by_frame[fid] = adjusted
                track_ids_by_frame[fid] = adjusted_tids
            _bbox_timings.append(("bbox_adjust_boxes", time.perf_counter() - _t))
        # --- Optional temporal smoothing over a _S0-frame sliding window ---
        if _A0 and _S0 > 1 and len(images) > 0:
            _t = time.perf_counter()
            _tmp_results = []
            for fid in range(offset, offset + len(images)):
                _boxes = bboxes_by_frame.get(fid, [])
                _tmp_results.append(
                    _FRes(
                        frame_id=fid,
                        boxes=[{"x1": int(b.x1), "y1": int(b.y1), "x2": int(b.x2), "y2": int(b.y2), "cls_id": int(b.cls_id), "conf": round(float(b.conf), 2), "team_id": b.team_id} for b in _boxes],
                        keypoints=[],
                    )
                )
            _tmp_results = _s0(_tmp_results, window=_S0, tids_by_frame=track_ids_by_frame)
            for r in _tmp_results:
                bboxes_by_frame[int(r.frame_id)] = [_Bx(**box) for box in r.boxes]
            _bbox_timings.append(("bbox_smoothing", time.perf_counter() - _t))
        _bbox_timings.append(("bbox_total", time.perf_counter() - _t0))
        self._current_batch_bbox_timings = _bbox_timings
        return bboxes_by_frame


    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[_FRes]:
        """Run the full pipeline on one batch of frames.

        Args:
            batch_images: frames for this batch.
            offset: absolute frame id of the first image; offset == 0 marks
                the start of a new video and resets cross-batch state.
            n_keypoints: number of pitch keypoints expected per frame.

        Returns:
            One _FRes per frame with final boxes (class ids remapped via
            _CLS_TO_VALIDATOR) and keypoints rounded to one decimal.
        """
        if not batch_images:
            return []
        if offset == 0:
            self.reset_for_new_video()
        gc.collect()
        try:
            # Local import keeps the CUDA cache flush strictly best-effort.
            import torch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except Exception:
            pass
        images = list(batch_images)
        n_frames = len(images)
        imgsz = _D0
        conf = _D1
        executor = self._executor
        default_kps = [[0.0, 0.0] for _ in range(n_keypoints)]
        # Run detection and keypoints concurrently when both are enabled and
        # parallelism (_P0) is allowed; otherwise fall back to sequential or
        # default (empty/zero) outputs per the _E0/_E1 flags.
        if _E0 and _E1 and _P0:
            future_bbox = executor.submit(self._bbox_task, images, offset, imgsz, conf, _BX_BS)
            future_kp = executor.submit(self._keypoint_hrnet_task, images, offset, n_keypoints)
            bboxes_by_frame = future_bbox.result()
            keypoints_by_frame = future_kp.result()
        elif _E0 and _E1:
            bboxes_by_frame = self._bbox_task(images, offset, imgsz, conf, _BX_BS)
            keypoints_by_frame = self._keypoint_hrnet_task(images, offset, n_keypoints)
        else:
            if _E0:
                bboxes_by_frame = self._bbox_task(images, offset, imgsz, conf, _BX_BS)
            else:
                bboxes_by_frame = {offset + i: [] for i in range(len(images))}
                self._current_batch_bbox_timings = []
            if _E1:
                keypoints_by_frame = self._keypoint_hrnet_task(images, offset, n_keypoints)
            else:
                keypoints_by_frame = {offset + i: list(default_kps) for i in range(len(images))}
                self._current_batch_kp_timings = []
        # Optional keypoint post-processing: drop near-duplicate keypoints.
        if _STEP0_ENABLED and keypoints_by_frame:
            _t = time.perf_counter()
            for fid in list(keypoints_by_frame.keys()):
                kps = keypoints_by_frame[fid]
                if isinstance(kps, list) and len(kps) == _N0:
                    _step0_remove_close_keypoints(kps, _STEP0_PROXIMITY_PX)
            self._current_batch_kp_timings.append(("kp_step0_remove_close", time.perf_counter() - _t))
        # Optional homography-based refinement against a cached pitch template
        # (32-keypoint layout only).
        if _U0 and _E1 and keypoints_by_frame and n_keypoints == 32 and _N0 == 32:
            template_img: ndarray | None = getattr(self, "_kp_template_cache", None)
            if template_img is None:
                template_img = _y0()
                if template_img.size > 0 and template_img.sum() > 0:
                    self._kp_template_cache = template_img
                else:
                    template_img = None  # unusable template; skip refinement
            _t = time.perf_counter()
            for idx in range(len(images)):
                frame_id = offset + idx
                kps = keypoints_by_frame.get(frame_id)
                if not kps or len(kps) != 32:
                    continue
                frame = images[idx]
                frame_height, frame_width = frame.shape[:2]
                if template_img is not None:
                    step5_out = _z0(kps, frame, template_img)
                    if step5_out is not None:
                        keypoints_by_frame[frame_id] = step5_out
            if template_img is not None and _J1:
                _z8(keypoints_by_frame, images, offset, template_img)
            self._current_batch_kp_timings.append(("kp_homography", time.perf_counter() - _t))
        # Optional final keypoint adjustment pass (_z1), 32-keypoint only.
        if _J4:
            _t = time.perf_counter()
            for idx in range(len(images)):
                frame_id = offset + idx
                kps = keypoints_by_frame.get(frame_id)
                if not kps or len(kps) != 32:
                    continue
                frame = images[idx]
                frame_height, frame_width = frame.shape[:2]
                adjusted = _z1(kps, frame_width, frame_height, _J0)
                if adjusted is not None:
                    keypoints_by_frame[frame_id] = adjusted
            self._current_batch_kp_timings.append(("kp_adjust", time.perf_counter() - _t))
        # Assemble the final per-frame results.
        results = []
        for idx in range(len(images)):
            frame_number = offset + idx
            kps = keypoints_by_frame.get(frame_number, [[0.0, 0.0] for _ in range(n_keypoints)])
            if len(kps) != n_keypoints:
                # Pad or truncate to the requested keypoint count.
                kps = (kps[:n_keypoints] if len(kps) >= n_keypoints else kps + [[0.0, 0.0]] * (n_keypoints - len(kps)))
            kps = [[round(float(kp[0]), 1), round(float(kp[1]), 1)] for kp in kps]
            boxes_raw = bboxes_by_frame.get(frame_number, [])
            boxes_for_result = [
                {
                    "x1": int(b.x1),
                    "y1": int(b.y1),
                    "x2": int(b.x2),
                    "y2": int(b.y2),
                    "cls_id": _CLS_TO_VALIDATOR.get(int(b.cls_id), int(b.cls_id)),
                    "conf": round(float(b.conf), 2),
                    "team_id": b.team_id,
                }
                for b in boxes_raw
            ]
            results.append(_FRes(frame_id=frame_number, boxes=boxes_for_result, keypoints=kps))
        return results
|
|
| class _M: |
| def __init__(self, path_hf_repo: Path) -> None: |
| self.health = "Okay!!!" |
| self.pipeline: _Pl | None = None |
| self.path_hf_repo = Path(path_hf_repo) |
|
|
| def __repr__(self) -> str: |
| return self.health |
|
|
| def predict_batch( |
| self, |
| batch_images: list[ndarray], |
| offset: int, |
| n_keypoints: int, |
| ) -> list[_FRes]: |
| if self.pipeline is None: |
| self.pipeline = _Pl(repo_root=self.path_hf_repo) |
| return self.pipeline.predict_batch(batch_images, offset, n_keypoints) |
|
|
|
|
| Miner = _M |