| from __future__ import annotations |
|
|
| import gc |
| import os |
| import sys |
| import time |
| from pathlib import Path |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| import cv2 |
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import yaml |
| from numpy import ndarray |
| from pydantic import BaseModel |
| from ultralytics import YOLO |
|
|
| from typing import Iterable, Generator, List, TypeVar, Tuple, Sequence, Any, Dict, Optional |
| from collections import deque, OrderedDict, defaultdict |
| import threading |
| from itertools import combinations |
| import yaml |
| from cv2 import ( |
| bitwise_and, |
| findHomography, |
| warpPerspective, |
| cvtColor, |
| COLOR_BGR2GRAY, |
| threshold, |
| THRESH_BINARY, |
| getStructuringElement, |
| MORPH_RECT, |
| MORPH_TOPHAT, |
| GaussianBlur, |
| morphologyEx, |
| Canny, |
| connectedComponents, |
| perspectiveTransform, |
| RETR_EXTERNAL, |
| CHAIN_APPROX_SIMPLE, |
| findContours, |
| boundingRect, |
| dilate, |
| imread, |
| countNonZero |
| ) |
| import gc |
| os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" |
|
|
| _f0 = True |
| BatchNorm2d = nn.BatchNorm2d |
| _v0 = 0.1 |
|
|
|
|
| def _c0(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d: |
| return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False) |
|
|
|
|
| class _B0(nn.Module): |
| expansion = 1 |
|
|
| def __init__(self, inplanes: int, planes: int, stride: int = 1, downsample: Any = None): |
| super().__init__() |
| self.conv1 = _c0(inplanes, planes, stride) |
| self.bn1 = BatchNorm2d(planes, momentum=_v0) |
| self.relu = nn.ReLU(inplace=True) |
| self.conv2 = _c0(planes, planes) |
| self.bn2 = BatchNorm2d(planes, momentum=_v0) |
| self.downsample = downsample |
| self.stride = stride |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| residual = x |
| out = self.conv1(x) |
| out = self.bn1(out) |
| out = self.relu(out) |
| out = self.conv2(out) |
| out = self.bn2(out) |
| if self.downsample is not None: |
| residual = self.downsample(x) |
| out += residual |
| out = self.relu(out) |
| return out |
|
|
|
|
| class _B1(nn.Module): |
| expansion = 4 |
|
|
| def __init__(self, inplanes: int, planes: int, stride: int = 1, downsample: Any = None): |
| super().__init__() |
| self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False) |
| self.bn1 = BatchNorm2d(planes, momentum=_v0) |
| self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False) |
| self.bn2 = BatchNorm2d(planes, momentum=_v0) |
| self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, bias=False) |
| self.bn3 = BatchNorm2d(planes * self.expansion, momentum=_v0) |
| self.relu = nn.ReLU(inplace=True) |
| self.downsample = downsample |
| self.stride = stride |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| residual = x |
| out = self.conv1(x) |
| out = self.bn1(out) |
| out = self.relu(out) |
| out = self.conv2(out) |
| out = self.bn2(out) |
| out = self.relu(out) |
| out = self.conv3(out) |
| out = self.bn3(out) |
| if self.downsample is not None: |
| residual = self.downsample(x) |
| out += residual |
| out = self.relu(out) |
| return out |
|
|
|
|
| _d0 = {"BASIC": _B0, "BOTTLENECK": _B1} |
|
|
|
|
| class _H0(nn.Module): |
| def __init__(self, num_branches: int, blocks: type, num_blocks: list, num_inchannels: list, num_channels: list, fuse_method: str, multi_scale_output: bool = True): |
| super().__init__() |
| self._check_branches(num_branches, blocks, num_blocks, num_inchannels, num_channels) |
| self.num_inchannels = num_inchannels |
| self.fuse_method = fuse_method |
| self.num_branches = num_branches |
| self.multi_scale_output = multi_scale_output |
| self.branches = self._make_branches(num_branches, blocks, num_blocks, num_channels) |
| self.fuse_layers = self._make_fuse_layers() |
| self.relu = nn.ReLU(inplace=True) |
|
|
| def _check_branches(self, num_branches: int, blocks: type, num_blocks: list, num_inchannels: list, num_channels: list) -> None: |
| if num_branches != len(num_blocks): |
| raise ValueError("NUM_BRANCHES <> NUM_BLOCKS") |
| if num_branches != len(num_channels): |
| raise ValueError("NUM_BRANCHES <> NUM_CHANNELS") |
| if num_branches != len(num_inchannels): |
| raise ValueError("NUM_BRANCHES <> NUM_INCHANNELS") |
|
|
| def _make_one_branch(self, branch_index: int, block: type, num_blocks: list, num_channels: list, stride: int = 1) -> nn.Sequential: |
| downsample = None |
| if stride != 1 or self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion: |
| downsample = nn.Sequential( |
| nn.Conv2d(self.num_inchannels[branch_index], num_channels[branch_index] * block.expansion, kernel_size=1, stride=stride, bias=False), |
| BatchNorm2d(num_channels[branch_index] * block.expansion, momentum=_v0), |
| ) |
| layers = [block(self.num_inchannels[branch_index], num_channels[branch_index], stride, downsample)] |
| self.num_inchannels[branch_index] = num_channels[branch_index] * block.expansion |
| for _ in range(1, num_blocks[branch_index]): |
| layers.append(block(self.num_inchannels[branch_index], num_channels[branch_index])) |
| return nn.Sequential(*layers) |
|
|
| def _make_branches(self, num_branches: int, block: type, num_blocks: list, num_channels: list) -> nn.ModuleList: |
| return nn.ModuleList([self._make_one_branch(i, block, num_blocks, num_channels) for i in range(num_branches)]) |
|
|
| def _make_fuse_layers(self) -> nn.ModuleList | None: |
| if self.num_branches == 1: |
| return None |
| num_branches = self.num_branches |
| num_inchannels = self.num_inchannels |
| fuse_layers = [] |
| for i in range(num_branches if self.multi_scale_output else 1): |
| fuse_layer = [] |
| for j in range(num_branches): |
| if j > i: |
| fuse_layer.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[i], 1, 1, 0, bias=False), BatchNorm2d(num_inchannels[i], momentum=_v0))) |
| elif j == i: |
| fuse_layer.append(None) |
| else: |
| conv3x3s = [] |
| for k in range(i - j): |
| if k == i - j - 1: |
| conv3x3s.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[i], 3, 2, 1, bias=False), BatchNorm2d(num_inchannels[i], momentum=_v0))) |
| else: |
| conv3x3s.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[j], 3, 2, 1, bias=False), BatchNorm2d(num_inchannels[j], momentum=_v0), nn.ReLU(inplace=True))) |
| fuse_layer.append(nn.Sequential(*conv3x3s)) |
| fuse_layers.append(nn.ModuleList(fuse_layer)) |
| return nn.ModuleList(fuse_layers) |
|
|
| def get_num_inchannels(self) -> list: |
| return self.num_inchannels |
|
|
| def forward(self, x: list) -> list: |
| if self.num_branches == 1: |
| return [self.branches[0](x[0])] |
| for i in range(self.num_branches): |
| x[i] = self.branches[i](x[i]) |
| x_fuse = [] |
| for i in range(len(self.fuse_layers)): |
| y = x[0] if i == 0 else self.fuse_layers[i][0](x[0]) |
| for j in range(1, self.num_branches): |
| if i == j: |
| y = y + x[j] |
| elif j > i: |
| y = y + F.interpolate(self.fuse_layers[i][j](x[j]), size=[x[i].shape[2], x[i].shape[3]], mode="bilinear") |
| else: |
| y = y + self.fuse_layers[i][j](x[j]) |
| x_fuse.append(self.relu(y)) |
| return x_fuse |
|
|
|
|
| class _H1(nn.Module): |
| def __init__(self, config: dict, lines: bool = False, **kwargs: Any) -> None: |
| self.inplanes = 64 |
| self.lines = lines |
| extra = config["MODEL"]["EXTRA"] |
| super().__init__() |
| self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=2, padding=1, bias=False) |
| self.bn1 = BatchNorm2d(self.inplanes, momentum=_v0) |
| self.conv2 = nn.Conv2d(self.inplanes, self.inplanes, kernel_size=3, stride=2, padding=1, bias=False) |
| self.bn2 = BatchNorm2d(self.inplanes, momentum=_v0) |
| self.relu = nn.ReLU(inplace=True) |
| self.layer1 = self._make_layer(_B1, 64, 64, 4) |
| self.stage2_cfg = extra["STAGE2"] |
| num_channels = [extra["STAGE2"]["NUM_CHANNELS"][i] * _d0[extra["STAGE2"]["BLOCK"]].expansion for i in range(len(extra["STAGE2"]["NUM_CHANNELS"]))] |
| self.transition1 = self._make_transition_layer([256], num_channels) |
| self.stage2, pre_stage_channels = self._make_stage(self.stage2_cfg, num_channels) |
| self.stage3_cfg = extra["STAGE3"] |
| num_channels = [extra["STAGE3"]["NUM_CHANNELS"][i] * _d0[extra["STAGE3"]["BLOCK"]].expansion for i in range(len(extra["STAGE3"]["NUM_CHANNELS"]))] |
| self.transition2 = self._make_transition_layer(pre_stage_channels, num_channels) |
| self.stage3, pre_stage_channels = self._make_stage(self.stage3_cfg, num_channels) |
| self.stage4_cfg = extra["STAGE4"] |
| num_channels = [extra["STAGE4"]["NUM_CHANNELS"][i] * _d0[extra["STAGE4"]["BLOCK"]].expansion for i in range(len(extra["STAGE4"]["NUM_CHANNELS"]))] |
| self.transition3 = self._make_transition_layer(pre_stage_channels, num_channels) |
| self.stage4, pre_stage_channels = self._make_stage(self.stage4_cfg, num_channels, multi_scale_output=True) |
| self.upsample = nn.Upsample(scale_factor=2, mode="nearest") |
| final_inp_channels = sum(pre_stage_channels) + self.inplanes |
| self.head = nn.Sequential( |
| nn.Conv2d(final_inp_channels, final_inp_channels, kernel_size=1), |
| BatchNorm2d(final_inp_channels, momentum=_v0), |
| nn.ReLU(inplace=True), |
| nn.Conv2d(final_inp_channels, config["MODEL"]["NUM_JOINTS"], kernel_size=extra["FINAL_CONV_KERNEL"]), |
| nn.Softmax(dim=1) if not self.lines else nn.Sigmoid(), |
| ) |
|
|
| def _make_head(self, x: torch.Tensor, x_skip: torch.Tensor) -> torch.Tensor: |
| x = self.upsample(x) |
| x = torch.cat([x, x_skip], dim=1) |
| return self.head(x) |
|
|
| def _make_transition_layer(self, num_channels_pre_layer: list, num_channels_cur_layer: list) -> nn.ModuleList: |
| num_branches_cur = len(num_channels_cur_layer) |
| num_branches_pre = len(num_channels_pre_layer) |
| transition_layers = [] |
| for i in range(num_branches_cur): |
| if i < num_branches_pre: |
| if num_channels_cur_layer[i] != num_channels_pre_layer[i]: |
| transition_layers.append(nn.Sequential( |
| nn.Conv2d(num_channels_pre_layer[i], num_channels_cur_layer[i], 3, 1, 1, bias=False), |
| BatchNorm2d(num_channels_cur_layer[i], momentum=_v0), |
| nn.ReLU(inplace=True), |
| )) |
| else: |
| transition_layers.append(None) |
| else: |
| conv3x3s = [] |
| for j in range(i + 1 - num_branches_pre): |
| inchannels = num_channels_pre_layer[-1] |
| outchannels = num_channels_cur_layer[i] if j == i - num_branches_pre else inchannels |
| conv3x3s.append(nn.Sequential( |
| nn.Conv2d(inchannels, outchannels, 3, 2, 1, bias=False), |
| BatchNorm2d(outchannels, momentum=_v0), |
| nn.ReLU(inplace=True), |
| )) |
| transition_layers.append(nn.Sequential(*conv3x3s)) |
| return nn.ModuleList(transition_layers) |
|
|
| def _make_layer(self, block: type, inplanes: int, planes: int, blocks: int, stride: int = 1) -> nn.Sequential: |
| downsample = None |
| if stride != 1 or inplanes != planes * block.expansion: |
| downsample = nn.Sequential( |
| nn.Conv2d(inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False), |
| BatchNorm2d(planes * block.expansion, momentum=_v0), |
| ) |
| layers = [block(inplanes, planes, stride, downsample)] |
| inplanes = planes * block.expansion |
| for _ in range(1, blocks): |
| layers.append(block(inplanes, planes)) |
| return nn.Sequential(*layers) |
|
|
| def _make_stage(self, layer_config: dict, num_inchannels: list, multi_scale_output: bool = True) -> tuple: |
| num_modules = layer_config["NUM_MODULES"] |
| num_blocks = layer_config["NUM_BLOCKS"] |
| num_channels = layer_config["NUM_CHANNELS"] |
| block = _d0[layer_config["BLOCK"]] |
| fuse_method = layer_config["FUSE_METHOD"] |
| modules = [] |
| for i in range(num_modules): |
| reset_multi_scale_output = False if (not multi_scale_output and i == num_modules - 1) else True |
| modules.append(_H0( |
| layer_config["NUM_BRANCHES"], block, num_blocks, num_inchannels, num_channels, |
| fuse_method, reset_multi_scale_output, |
| )) |
| num_inchannels = modules[-1].get_num_inchannels() |
| return nn.Sequential(*modules), num_inchannels |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| x = self.conv1(x) |
| x_skip = x.clone() |
| x = self.bn1(x) |
| x = self.relu(x) |
| x = self.conv2(x) |
| x = self.bn2(x) |
| x = self.relu(x) |
| x = self.layer1(x) |
| x_list = [self.transition1[i](x) if self.transition1[i] is not None else x for i in range(self.stage2_cfg["NUM_BRANCHES"])] |
| y_list = self.stage2(x_list) |
| x_list = [self.transition2[i](y_list[-1]) if self.transition2[i] is not None else y_list[i] for i in range(self.stage3_cfg["NUM_BRANCHES"])] |
| y_list = self.stage3(x_list) |
| x_list = [self.transition3[i](y_list[-1]) if self.transition3[i] is not None else y_list[i] for i in range(self.stage4_cfg["NUM_BRANCHES"])] |
| x = self.stage4(x_list) |
| height, width = x[0].size(2), x[0].size(3) |
| x1 = F.interpolate(x[1], size=(height, width), mode="bilinear", align_corners=False) |
| x2 = F.interpolate(x[2], size=(height, width), mode="bilinear", align_corners=False) |
| x3 = F.interpolate(x[3], size=(height, width), mode="bilinear", align_corners=False) |
| x = torch.cat([x[0], x1, x2, x3], 1) |
| return self._make_head(x, x_skip) |
|
|
| def init_weights(self, pretrained: str = "") -> None: |
| for m in self.modules(): |
| if isinstance(m, nn.Conv2d): |
| nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu") |
| elif isinstance(m, nn.BatchNorm2d): |
| nn.init.constant_(m.weight, 1) |
| nn.init.constant_(m.bias, 0) |
| if pretrained and os.path.isfile(pretrained): |
| w = torch.load(pretrained, map_location="cpu", weights_only=False) |
| self.load_state_dict({k: v for k, v in w.items() if k in self.state_dict()}, strict=False) |
|
|
|
|
| def _g0(config: dict, pretrained: str = "", **kwargs: Any) -> _H1: |
| model = _H1(config, **kwargs) |
| model.init_weights(pretrained) |
| return model |
|
|
|
|
| _K0 = { |
| 1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23, |
| 11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29, |
| 28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20, |
| 45: 9, 50: 31, 52: 32, 57: 22, |
| } |
|
|
|
|
| def _p0(frames: list) -> torch.Tensor: |
| target_size = (540, 960) |
| batch = [] |
| for frame in frames: |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
| img = cv2.resize(frame_rgb, (target_size[1], target_size[0])) |
| img = img.astype(np.float32) / 255.0 |
| img = np.transpose(img, (2, 0, 1)) |
| batch.append(img) |
| return torch.from_numpy(np.stack(batch)).float() |
|
|
|
|
| def _e0(heatmap: torch.Tensor, scale: int = 2, max_keypoints: int = 1) -> torch.Tensor: |
| batch_size, n_channels, height, width = heatmap.shape |
| max_pooled = F.max_pool2d(heatmap, 3, stride=1, padding=1) |
| local_maxima = max_pooled == heatmap |
| masked_heatmap = heatmap * local_maxima |
| flat_heatmap = masked_heatmap.view(batch_size, n_channels, -1) |
| scores, indices = torch.topk(flat_heatmap, max_keypoints, dim=-1, sorted=False) |
| y_coords = torch.div(indices, width, rounding_mode="floor") * scale |
| x_coords = (indices % width) * scale |
| return torch.stack([x_coords.float(), y_coords.float(), scores], dim=-1) |
|
|
|
|
| def _p1(kp_coords: torch.Tensor, kp_threshold: float, w: int, h: int, batch_size: int) -> list: |
| kp_np = kp_coords.cpu().numpy() |
| batch_results = [] |
| for batch_idx in range(batch_size): |
| kp_dict = {} |
| valid_kps = kp_np[batch_idx, :, 0, 2] > kp_threshold |
| for ch_idx in np.where(valid_kps)[0]: |
| x = float(kp_np[batch_idx, ch_idx, 0, 0]) / w |
| y = float(kp_np[batch_idx, ch_idx, 0, 1]) / h |
| p = float(kp_np[batch_idx, ch_idx, 0, 2]) |
| kp_dict[int(ch_idx) + 1] = {"x": x, "y": y, "p": p} |
| batch_results.append(kp_dict) |
| return batch_results |
|
|
|
|
| def _g1(kp_points: dict) -> dict: |
| return {_K0[k]: v for k, v in kp_points.items() if k in _K0} |
|
|
|
|
| def _i0(frames: list, model: nn.Module, kp_threshold: float, device: str, batch_size: int = 2) -> list: |
| results = [] |
| model_device = next(model.parameters()).device |
| for i in range(0, len(frames), batch_size): |
| current_batch_size = min(batch_size, len(frames) - i) |
| batch_frames = frames[i : i + current_batch_size] |
| batch = _p0(batch_frames).to(model_device) |
| with torch.no_grad(): |
| heatmaps = model(batch) |
| kp_coords = _e0(heatmaps[:, :-1, :, :], scale=2, max_keypoints=1) |
| batch_results = _p1(kp_coords, kp_threshold, 960, 540, current_batch_size) |
| results.extend([_g1(kp) for kp in batch_results]) |
| del heatmaps, kp_coords, batch |
| gc.collect() |
| if model_device.type == "cuda": |
| torch.cuda.empty_cache() |
| return results |
|
|
|
|
| def _x0(frames: list, model: nn.Module, kp_threshold: float, device: str = "cpu", batch_size: int = 2) -> list: |
| return _i0(frames, model, kp_threshold, device, batch_size) |
|
|
|
|
| def _n0(keypoints_result: list | None, batch_images: list, n_keypoints: int) -> list: |
| keypoints = [] |
| if keypoints_result is not None and len(keypoints_result) > 0: |
| for frame_number_in_batch, kp_dict in enumerate(keypoints_result): |
| if frame_number_in_batch >= len(batch_images): |
| break |
| frame_keypoints: List[Tuple[int, int]] = [] |
| try: |
| height, width = batch_images[frame_number_in_batch].shape[:2] |
| if kp_dict is not None and isinstance(kp_dict, dict): |
| for idx in range(32): |
| x, y, p = 0, 0, 0 |
| kp_idx = idx + 1 |
| if kp_idx in kp_dict: |
| try: |
| kp_data = kp_dict[kp_idx] |
| if isinstance(kp_data, dict) and "x" in kp_data and "y" in kp_data: |
| x = int(kp_data["x"] * width) |
| y = int(kp_data["y"] * height) |
| except Exception as e: |
| pass |
| frame_keypoints.append((x, y)) |
| except (IndexError, ValueError, AttributeError): |
| frame_keypoints = [(0, 0)] * 32 |
| if len(frame_keypoints) < n_keypoints: |
| frame_keypoints.extend([(0, 0)] * (n_keypoints - len(frame_keypoints))) |
| else: |
| frame_keypoints = frame_keypoints[:n_keypoints] |
| keypoints.append(frame_keypoints) |
| return keypoints |
|
|
|
|
| def _f1(frame_keypoints: list, n_keypoints: int) -> list: |
| if len(frame_keypoints) < n_keypoints: |
| frame_keypoints = list(frame_keypoints) + [(0, 0)] * (n_keypoints - len(frame_keypoints)) |
| elif len(frame_keypoints) > n_keypoints: |
| frame_keypoints = list(frame_keypoints)[:n_keypoints] |
| else: |
| frame_keypoints = list(frame_keypoints) |
| if frame_keypoints[2] != (0, 0) and frame_keypoints[4] != (0, 0) and frame_keypoints[3] == (0, 0): |
| frame_keypoints[3], frame_keypoints[4] = frame_keypoints[4], (0, 0) |
| if frame_keypoints[0] != (0, 0) and frame_keypoints[4] != (0, 0) and frame_keypoints[1] == (0, 0): |
| frame_keypoints[1], frame_keypoints[4] = frame_keypoints[4], (0, 0) |
| if frame_keypoints[2] != (0, 0) and frame_keypoints[3] != (0, 0) and frame_keypoints[1] == (0, 0) and frame_keypoints[3][0] > frame_keypoints[2][0]: |
| frame_keypoints[1], frame_keypoints[3] = frame_keypoints[3], (0, 0) |
| if frame_keypoints[28] != (0, 0) and frame_keypoints[25] == (0, 0) and frame_keypoints[26] != (0, 0) and frame_keypoints[26][0] > frame_keypoints[28][0]: |
| frame_keypoints[25], frame_keypoints[28] = frame_keypoints[28], (0, 0) |
| if frame_keypoints[24] != (0, 0) and frame_keypoints[28] != (0, 0) and frame_keypoints[25] == (0, 0): |
| frame_keypoints[25], frame_keypoints[28] = frame_keypoints[28], (0, 0) |
| if frame_keypoints[24] != (0, 0) and frame_keypoints[27] != (0, 0) and frame_keypoints[26] == (0, 0): |
| frame_keypoints[26], frame_keypoints[27] = frame_keypoints[27], (0, 0) |
| if frame_keypoints[28] != (0, 0) and frame_keypoints[23] == (0, 0) and frame_keypoints[20] != (0, 0) and frame_keypoints[20][1] > frame_keypoints[23][1]: |
| frame_keypoints[23], frame_keypoints[20] = frame_keypoints[20], (0, 0) |
| return frame_keypoints |
|
|
|
|
| def _c1(keypoints: list) -> list: |
| return [[round(float(x), 1), round(float(y), 1)] for x, y in keypoints] |
|
|
|
|
| def _l0(model_dir: Path, device: str | None = None, config_name: str = "hrnetv2_w48.yaml", weights_subdir: str | None = None) -> nn.Module: |
| if device is None: |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| config_path = model_dir / config_name |
| weights_path = (model_dir / weights_subdir / "keypoint") if weights_subdir else (model_dir / "keypoint") |
| if not config_path.exists(): |
| raise FileNotFoundError(f"Keypoint config not found: {config_path}") |
| if not weights_path.exists(): |
| raise FileNotFoundError(f"Keypoint weights not found: {weights_path}") |
| with open(config_path) as f: |
| cfg = yaml.safe_load(f) |
| loaded = torch.load(weights_path, map_location=device, weights_only=False) |
| state = loaded.get("state_dict", loaded) if isinstance(loaded, dict) else loaded |
| if not isinstance(state, dict): |
| raise ValueError(f"Keypoint weights must be state_dict or dict with 'state_dict'; got {type(state)}") |
| if state and next(iter(state.keys()), "").startswith("module."): |
| state = {k.replace("module.", "", 1): v for k, v in state.items()} |
| def _remap_head(k: str) -> str: |
| if k.startswith("head.0."): |
| return "head." + k[7:] |
| return k |
| state = {_remap_head(k): v for k, v in state.items()} |
| model = _g0(cfg) |
| model.load_state_dict(state, strict=True) |
| model.to(device) |
| model.eval() |
| return model |
|
|
| _C0 = 0 |
| _C1 = 1 |
| _C2 = 2 |
| _C3 = 3 |
|
|
| _D0 = 1280 |
| _D1 = 0.4 |
| _T0 = 0.5 |
| _R0 = 5 |
| _R1 = 0.10 |
| _R2 = 0.70 |
| _R3 = 8 |
| kp_batch_size = 2 |
| onnx_batch_size = 8 |
|
|
| _q0 = 0.006719 |
| _q1 = 0.010711 |
|
|
| _P0 = True |
|
|
| _E0: bool = True |
| _E1: bool = True |
|
|
| _A0: bool = True |
| _S0 = 4 |
|
|
| _F0: list[tuple[float, float]] = [ |
| (5, 5), (5, 140), (5, 250), (5, 430), (5, 540), (5, 675), (55, 250), (55, 430), |
| (110, 340), (165, 140), (165, 270), (165, 410), (165, 540), (527, 5), (527, 253), |
| (527, 433), (527, 675), (888, 140), (888, 270), (888, 410), (888, 540), (940, 340), |
| (998, 250), (998, 430), (1045, 5), (1045, 140), (1045, 250), (1045, 430), (1045, 540), |
| (1045, 675), (435, 340), (615, 340), |
| ] |
| _F1: list[tuple[float, float]] = [ |
| (2.5, 2.5), (2.5, 139.5), (2.5, 249.5), (2.5, 430.5), (2.5, 540.5), (2.5, 678), |
| (54.5, 249.5), (54.5, 430.5), (110.5, 340.5), (164.5, 139.5), (164.5, 269), (164.5, 411), |
| (164.5, 540.5), (525, 2.5), (525, 249.5), (525, 430.5), (525, 678), (886.5, 139.5), |
| (886.5, 269), (886.5, 411), (886.5, 540.5), (940.5, 340.5), (998, 249.5), (998, 430.5), |
| (1048, 2.5), (1048, 139.5), (1048, 249.5), (1048, 430.5), (1048, 540.5), (1048, 678), |
| (434.5, 340), (615.5, 340), |
| ] |
| _S1 = True |
|
|
|
|
| class _Bx(BaseModel): |
| x1: int |
| y1: int |
| x2: int |
| y2: int |
| cls_id: int |
| conf: float |
| team_id: str | None = None |
|
|
|
|
| class _FRes(BaseModel): |
| frame_id: int |
| boxes: list[dict] |
| keypoints: list[list[float]] |
|
|
|
|
| class _Cfg: |
| def __init__(self, min_area: int = 1300, overlap_iou: float = 0.91): |
| self.overlap_iou = overlap_iou |
|
|
|
|
| def _d1(bb: _Bx, cy: float) -> float: |
| my = 0.5 * (float(bb.y1) + float(bb.y2)) |
| return (my - cy) ** 2 |
|
|
|
|
| def _i1(a: _Bx, b: _Bx) -> float: |
| ax1, ay1, ax2, ay2 = int(a.x1), int(a.y1), int(a.x2), int(a.y2) |
| bx1, by1, bx2, by2 = int(b.x1), int(b.y1), int(b.x2), int(b.y2) |
| ix1, iy1 = max(ax1, bx1), max(ay1, by1) |
| ix2, iy2 = min(ax2, bx2), min(ay2, by2) |
| iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1) |
| inter = iw * ih |
| if inter <= 0: |
| return 0.0 |
| area_a = (ax2 - ax1) * (ay2 - ay1) |
| area_b = (bx2 - bx1) * (by2 - by1) |
| union = area_a + area_b - inter |
| return inter / union if union > 0 else 0.0 |
|
|
|
|
|
|
|
|
| def _s0( |
| results: list[_FRes], |
| window: int = _S0, |
| tids_by_frame: dict[int, list[int | None]] | None = None, |
| ) -> list[_FRes]: |
| if window <= 1 or not results: |
| return results |
| fid_to_idx = {r.frame_id: i for i, r in enumerate(results)} |
| trajectories: dict[int, list[tuple[int, int, _Bx]]] = {} |
| for i, r in enumerate(results): |
| boxes_as_bx = [_Bx(**b) if isinstance(b, dict) else b for b in r.boxes] |
| for j, bb in enumerate(boxes_as_bx): |
| tid = tids_by_frame.get(r.frame_id, [None] * len(r.boxes))[j] if tids_by_frame else None |
| if tid is not None and tid >= 0: |
| tid = int(tid) |
| if tid not in trajectories: |
| trajectories[tid] = [] |
| trajectories[tid].append((r.frame_id, j, bb)) |
| smoothed: dict[tuple[int, int], tuple[int, int, int, int]] = {} |
| half = window // 2 |
| for tid, items in trajectories.items(): |
| items.sort(key=lambda x: x[0]) |
| n = len(items) |
| for k in range(n): |
| fid, box_idx, bb = items[k] |
| result_idx = fid_to_idx[fid] |
| lo = max(0, k - half) |
| hi = min(n, k + half + 1) |
| cx_list = [] |
| cy_list = [] |
| w_list = [] |
| h_list = [] |
| for m in range(lo, hi): |
| b = items[m][2] |
| cx_list.append(0.5 * (b.x1 + b.x2)) |
| cy_list.append(0.5 * (b.y1 + b.y2)) |
| w_list.append(b.x2 - b.x1) |
| h_list.append(b.y2 - b.y1) |
| cx_avg = sum(cx_list) / len(cx_list) |
| cy_avg = sum(cy_list) / len(cy_list) |
| w_avg = sum(w_list) / len(w_list) |
| h_avg = sum(h_list) / len(h_list) |
| x1_new = int(round(cx_avg - w_avg / 2)) |
| y1_new = int(round(cy_avg - h_avg / 2)) |
| x2_new = int(round(cx_avg + w_avg / 2)) |
| y2_new = int(round(cy_avg + h_avg / 2)) |
| smoothed[(result_idx, box_idx)] = (x1_new, y1_new, x2_new, y2_new) |
| out: list[_FRes] = [] |
| for i, r in enumerate(results): |
| boxes_as_bx = [_Bx(**b) if isinstance(b, dict) else b for b in r.boxes] |
| new_boxes: list[_Bx] = [] |
| for j, bb in enumerate(boxes_as_bx): |
| key = (i, j) |
| if key in smoothed: |
| x1, y1, x2, y2 = smoothed[key] |
| new_boxes.append( |
| _Bx( |
| x1=x1, |
| y1=y1, |
| x2=x2, |
| y2=y2, |
| cls_id=int(bb.cls_id), |
| conf=float(bb.conf), |
| team_id=bb.team_id, |
| ) |
| ) |
| else: |
| new_boxes.append( |
| _Bx( |
| x1=int(bb.x1), |
| y1=int(bb.y1), |
| x2=int(bb.x2), |
| y2=int(bb.y2), |
| cls_id=int(bb.cls_id), |
| conf=float(bb.conf), |
| team_id=bb.team_id, |
| ) |
| ) |
| out.append(_FRes(frame_id=r.frame_id, boxes=[{"x1": b.x1, "y1": b.y1, "x2": b.x2, "y2": b.y2, "cls_id": b.cls_id, "conf": b.conf, "team_id": b.team_id} for b in new_boxes], keypoints=r.keypoints)) |
| return out |
|
|
|
|
| def _a0( |
| bboxes: Iterable[_Bx], |
| *, |
| frame_width: int, |
| frame_height: int, |
| cfg: _Cfg | None = None, |
| do_goalkeeper_dedup: bool = True, |
| do_referee_disambiguation: bool = True, |
| ) -> list[_Bx]: |
| cfg = cfg or _Cfg() |
| W, H = int(frame_width), int(frame_height) |
| cy = 0.5 * float(H) |
| kept: list[_Bx] = list(bboxes or []) |
| if cfg.overlap_iou > 0 and len(kept) > 1: |
| balls = [bb for bb in kept if int(bb.cls_id) == _C0] |
| non_balls = [bb for bb in kept if int(bb.cls_id) != _C0] |
| if len(non_balls) > 1: |
| non_balls_sorted = sorted(non_balls, key=lambda bb: float(bb.conf), reverse=True) |
| kept_nb = [] |
| for cand in non_balls_sorted: |
| skip = False |
| for k in kept_nb: |
| iou = _i1(cand, k) |
| if iou >= cfg.overlap_iou: |
| skip = True |
| break |
| if ( |
| abs(int(cand.x1) - int(k.x1)) <= 3 |
| and abs(int(cand.y1) - int(k.y1)) <= 3 |
| and abs(int(cand.x2) - int(k.x2)) <= 3 |
| and abs(int(cand.y2) - int(k.y2)) <= 3 |
| and iou > 0.85 |
| ): |
| skip = True |
| break |
| if not skip: |
| kept_nb.append(cand) |
| kept = kept_nb + balls |
| if do_goalkeeper_dedup: |
| gks = [bb for bb in kept if int(bb.cls_id) == _C1] |
| if len(gks) > 1: |
| best_gk = max(gks, key=lambda bb: float(bb.conf)) |
| best_gk_conf = float(best_gk.conf) |
| deduped = [] |
| for bb in kept: |
| if int(bb.cls_id) == _C1: |
| if float(bb.conf) < best_gk_conf or (float(bb.conf) == best_gk_conf and bb is not best_gk): |
| deduped.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=_C2, conf=float(bb.conf), team_id="1")) |
| else: |
| deduped.append(bb) |
| else: |
| deduped.append(bb) |
| kept = deduped |
| if do_referee_disambiguation: |
| refs = [bb for bb in kept if int(bb.cls_id) == _C3] |
| if len(refs) > 1: |
| best_ref = min(refs, key=lambda bb: _d1(bb, cy)) |
| kept = [bb for bb in kept if int(bb.cls_id) != _C3 or bb is best_ref] |
| return kept |
|
|
|
|
| def _k0(feats: np.ndarray, iters: int = 20) -> tuple[np.ndarray, np.ndarray]: |
| n, d = feats.shape |
| if n <= 0: |
| return np.zeros((2, d), dtype=np.float32), np.zeros(0, dtype=np.int64) |
| if n == 1: |
| return np.stack([feats[0], feats[0]], axis=0), np.zeros(1, dtype=np.int64) |
| c0 = feats[0] |
| d0 = np.linalg.norm(feats - c0[None, :], axis=1) |
| c1 = feats[int(np.argmax(d0))] |
| d1 = np.linalg.norm(feats - c1[None, :], axis=1) |
| c0 = feats[int(np.argmax(d1))] |
| centroids = np.stack([c0, c1], axis=0).astype(np.float32) |
| labels = np.zeros(n, dtype=np.int64) |
| for _ in range(iters): |
| dist = ((feats[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2) |
| labels = dist.argmin(axis=1) |
| for k in (0, 1): |
| sel = feats[labels == k] |
| if len(sel) > 0: |
| centroids[k] = sel.mean(axis=0) |
| return centroids, labels |
|
|
|
|
| def _m0(prev: np.ndarray, new: np.ndarray) -> np.ndarray: |
| d00 = np.sum((prev[0] - new[0]) ** 2) |
| d11 = np.sum((prev[1] - new[1]) ** 2) |
| d01 = np.sum((prev[0] - new[1]) ** 2) |
| d10 = np.sum((prev[1] - new[0]) ** 2) |
| if d00 + d11 <= d01 + d10: |
| return new |
| return np.stack([new[1], new[0]], axis=0) |
|
|
|
|
| try: |
| import onnxruntime as _ort |
| _HAS_ONNXRUNTIME = True |
| except ImportError: |
| _HAS_ONNXRUNTIME = False |
|
|
|
|
| class _ReidE: |
| def __init__(self, onnx_path: Path, input_height: int = 256, input_width: int = 128): |
| if not _HAS_ONNXRUNTIME: |
| raise RuntimeError("onnxruntime required for ReID; pip install onnxruntime") |
| self.session = _ort.InferenceSession(str(onnx_path), |
| providers = [ |
| ( |
| "CUDAExecutionProvider", |
| { |
| "device_id": 0, |
| "cudnn_conv_algo_search": "HEURISTIC", |
| "enable_cuda_graph": False, |
| }, |
| ) |
| ] |
| ) |
| print("Active providers:", self.session.get_providers()) |
| self.input_height = int(input_height) |
| self.input_width = int(input_width) |
| self._input_name = self.session.get_inputs()[0].name |
| |
| dummy_input = np.zeros((_R3, 3, self.input_height, self.input_width), dtype=np.float32) |
| try: |
| self.session.run(None, {self._input_name: dummy_input}) |
| except Exception as e: |
| pass |
|
|
| def extract( |
| self, |
| frame_bgr: np.ndarray, |
| xyxy: tuple[int, int, int, int], |
| timings: Optional[dict[str, float]] = None, |
| ) -> Optional[np.ndarray]: |
| x1, y1, x2, y2 = map(int, xyxy) |
| t0 = time.perf_counter() |
| H, W = frame_bgr.shape[:2] |
| x1, y1 = max(0, x1), max(0, y1) |
| x2, y2 = min(W, x2), min(H, y2) |
| if x2 <= x1 or y2 <= y1: |
| if timings is not None: |
| timings["crop_preprocess"] = timings.get("crop_preprocess", 0.0) + (time.perf_counter() - t0) |
| return None |
| bh, bw = y2 - y1, x2 - x1 |
| if bh > bw and bw > 0: |
| y1_crop = y1 + int(round(_R1 * bh)) |
| y2_crop = y1 + int(round(_R2 * bh)) |
| y1 = min(y1_crop, y2 - 1) |
| y2 = max(y2_crop, y1 + 1) |
| if y2 <= y1: |
| if timings is not None: |
| timings["crop_preprocess"] = timings.get("crop_preprocess", 0.0) + (time.perf_counter() - t0) |
| return None |
| crop = frame_bgr[y1:y2, x1:x2] |
| if crop.size == 0: |
| if timings is not None: |
| timings["crop_preprocess"] = timings.get("crop_preprocess", 0.0) + (time.perf_counter() - t0) |
| return None |
| rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB) |
| resized = cv2.resize(rgb, (self.input_width, self.input_height), interpolation=cv2.INTER_LINEAR) |
| blob = resized.transpose(2, 0, 1).astype(np.float32) / 255.0 |
| mean = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(3, 1, 1) |
| std = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(3, 1, 1) |
| blob = (blob - mean) / std |
| blob = blob[np.newaxis, ...].astype(np.float32) |
| t1 = time.perf_counter() |
| if timings is not None: |
| timings["crop_preprocess"] = timings.get("crop_preprocess", 0.0) + (t1 - t0) |
| try: |
| t2 = time.perf_counter() |
| out = self.session.run(None, {self._input_name: blob})[0] |
| t3 = time.perf_counter() |
| if timings is not None: |
| timings["onnx_inference"] = timings.get("onnx_inference", 0.0) + (t3 - t2) |
| except Exception: |
| if timings is not None: |
| timings["onnx_inference"] = timings.get("onnx_inference", 0.0) |
| return None |
| t4 = time.perf_counter() |
| if out is None or out.size == 0: |
| if timings is not None: |
| timings["postprocess_normalize"] = timings.get("postprocess_normalize", 0.0) + (time.perf_counter() - t4) |
| return None |
| emb = out.flatten().astype(np.float32) |
| n = float(np.linalg.norm(emb)) |
| if n < 1e-6: |
| if timings is not None: |
| timings["postprocess_normalize"] = timings.get("postprocess_normalize", 0.0) + (time.perf_counter() - t4) |
| return None |
| if timings is not None: |
| timings["postprocess_normalize"] = timings.get("postprocess_normalize", 0.0) + (time.perf_counter() - t4) |
| return emb / n |
|
|
| def extract_batch( |
| self, |
| frame_xyxy_list: list[tuple[ndarray, tuple[int, int, int, int]]], |
| batch_size: int = _R3, |
| timings: Optional[dict[str, float]] = None, |
| ) -> list[Optional[np.ndarray]]: |
| if not frame_xyxy_list: |
| return [] |
| n = len(frame_xyxy_list) |
| out: list[Optional[np.ndarray]] = [None] * n |
| t_pre = time.perf_counter() |
| blobs: list[tuple[int, np.ndarray]] = [] |
| for idx, (frame_bgr, xyxy) in enumerate(frame_xyxy_list): |
| x1, y1, x2, y2 = map(int, xyxy) |
| H, W = frame_bgr.shape[:2] |
| x1, y1 = max(0, x1), max(0, y1) |
| x2, y2 = min(W, x2), min(H, y2) |
| if x2 <= x1 or y2 <= y1: |
| continue |
| bh, bw = y2 - y1, x2 - x1 |
| if bh > bw and bw > 0: |
| y1_crop = y1 + int(round(_R1 * bh)) |
| y2_crop = y1 + int(round(_R2 * bh)) |
| y1 = min(y1_crop, y2 - 1) |
| y2 = max(y2_crop, y1 + 1) |
| if y2 <= y1: |
| continue |
| crop = frame_bgr[y1:y2, x1:x2] |
| if crop.size == 0: |
| continue |
| rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB) |
| resized = cv2.resize(rgb, (self.input_width, self.input_height), interpolation=cv2.INTER_LINEAR) |
| blob = resized.transpose(2, 0, 1).astype(np.float32) / 255.0 |
| mean = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(3, 1, 1) |
| std = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(3, 1, 1) |
| blob = (blob - mean) / std |
| blob = blob[np.newaxis, ...].astype(np.float32) |
| blobs.append((idx, blob)) |
| if timings is not None: |
| timings["crop_preprocess"] = timings.get("crop_preprocess", 0.0) + (time.perf_counter() - t_pre) |
| if not blobs: |
| return out |
| t_infer_start = time.perf_counter() |
| batch_size = max(1, min(batch_size, len(blobs))) |
| |
|
|
| gc.collect() |
| if torch.cuda.is_available(): |
| torch.cuda.empty_cache() |
| torch.cuda.synchronize() |
|
|
| for start in range(0, len(blobs), batch_size): |
| chunk = blobs[start : start + batch_size] |
| indices = [c[0] for c in chunk] |
| batch_blob = np.concatenate([c[1] for c in chunk], axis=0) |
| |
| |
| actual_batch_size = batch_blob.shape[0] |
| if actual_batch_size < batch_size: |
| pad_width = batch_size - actual_batch_size |
| padding = np.zeros((pad_width, *batch_blob.shape[1:]), dtype=batch_blob.dtype) |
| batch_blob = np.concatenate([batch_blob, padding], axis=0) |
| |
| |
| try: |
| run_out = self.session.run(None, {self._input_name: batch_blob})[0] |
| run_out = run_out[:actual_batch_size] |
| except Exception: |
| continue |
| |
| t_post_start = time.perf_counter() |
| if timings is not None: |
| timings["onnx_inference"] = timings.get("onnx_inference", 0.0) + (t_post_start - t_infer_start) |
| for i, orig_idx in enumerate(indices): |
| if i >= run_out.shape[0]: |
| continue |
| emb = run_out[i].astype(np.float32, copy=False) |
| nrm = float(np.linalg.norm(emb)) |
| if nrm >= 1e-6: |
| out[orig_idx] = emb / nrm |
| if timings is not None: |
| timings["postprocess_normalize"] = timings.get("postprocess_normalize", 0.0) + (time.perf_counter() - t_post_start) |
| t_infer_start = time.perf_counter() |
| |
| return out |
|
|
|
|
| class _ReidT: |
| def __init__(self, centroids: Optional[np.ndarray] = None, ema_alpha: float = 0.8): |
| self.centroids = centroids |
| self.ema_alpha = float(ema_alpha) |
|
|
| def assign(self, feats: list, use_for_centroid: Optional[list] = None) -> list[int]: |
| all_valid = [f for f in feats if f is not None] |
| valid = all_valid |
| if use_for_centroid is not None and len(use_for_centroid) == len(feats): |
| valid = [f for i, f in enumerate(feats) if f is not None and use_for_centroid[i]] |
| if self.centroids is None and len(valid) < 2 and len(all_valid) >= 2: |
| valid = all_valid |
| min_required = 2 if self.centroids is None else max(2, _R0) |
| if len(valid) >= min_required: |
| X = np.stack(valid, axis=0).astype(np.float32) |
| c_new, _labels = _k0(X, iters=20) |
| frac0 = float((_labels == 0).mean()) |
| frac1 = float((_labels == 1).mean()) |
| sep = float(np.linalg.norm(c_new[0] - c_new[1])) if len(c_new) == 2 else 0.0 |
| do_update = (min(frac0, frac1) >= 0.15 and sep >= 0.05) or (self.centroids is None and min(frac0, frac1) >= 0.10) |
| if do_update: |
| if self.centroids is None: |
| self.centroids = c_new.copy() |
| else: |
| c_new = _m0(self.centroids, c_new) |
| a = self.ema_alpha |
| self.centroids = a * self.centroids + (1.0 - a) * c_new |
| out = [] |
| for f in feats: |
| if f is None or self.centroids is None: |
| out.append(1) |
| continue |
| d0 = float(np.sum((f - self.centroids[0]) ** 2)) |
| d1 = float(np.sum((f - self.centroids[1]) ** 2)) |
| out.append(1 if d0 <= d1 else 2) |
| return out |
|
|
|
|
| def challenge_template(path_hf_repo) -> ndarray: |
| return imread(f"{path_hf_repo}/football_pitch_template.png") |
|
|
| current_path = str(os.path.dirname(os.path.abspath(__file__))) |
| template_image = challenge_template(current_path) |
| template_image_gray = cvtColor(template_image, COLOR_BGR2GRAY) |
| _sparse_template_cache: dict[tuple[int, int], list[tuple[int, int]]] = {} |
| _shared_eval_executor: ThreadPoolExecutor | None = None |
|
|
| class MaxSizeCache(OrderedDict): |
| """ |
| Fixed-size dictionary behaving like a deque(maxlen=N). |
| Stores key–value pairs with FIFO eviction. |
| """ |
|
|
| def __init__(self, maxlen=500): |
| super().__init__() |
| self.maxlen = maxlen |
| self._lock = threading.Lock() |
|
|
| def set(self, key, value): |
| """Insert or update an item. Evicts oldest if full.""" |
| with self._lock: |
| if key in self: |
| del self[key] |
| super().__setitem__(key, value) |
|
|
| if len(self) > self.maxlen: |
| self.popitem(last=False) |
|
|
| def get(self, key, default=None): |
| """Retrieve an item without changing order.""" |
| with self._lock: |
| return super().get(key, default) |
|
|
| def exists(self, key): |
| """Check if a key exists.""" |
| with self._lock: |
| return key in self |
|
|
| def load(self, data_dict): |
| """ |
| Load initial data into cache. |
| Oldest items evicted if data exceeds maxlen. |
| """ |
| for k, v in data_dict.items(): |
| self.set(k, v) |
|
|
| def __repr__(self): |
| return f"MaxSizeCache(maxlen={self.maxlen}, data={dict(self)})" |
| cached = MaxSizeCache() |
| _per_key_locks = defaultdict(threading.Lock) |
|
|
| def get_or_compute_masks(key, compute_fn): |
| lock = _per_key_locks[key] |
| with lock: |
| if cached.exists(key): |
| return cached.get(key) |
| |
| masks = compute_fn() |
| cached.set(key, masks) |
| return masks |
| |
| INDEX_KEYPOINT_CORNER_BOTTOM_LEFT = 5 |
| INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT = 29 |
| INDEX_KEYPOINT_CORNER_TOP_LEFT = 0 |
| INDEX_KEYPOINT_CORNER_TOP_RIGHT = 24 |
|
|
| KEYPOINTS: list[tuple[int, int]] = [ |
| (5, 5), |
| (5, 140), |
| (5, 250), |
| (5, 430), |
| (5, 540), |
| (5, 675), |
| |
| (55, 250), |
| (55, 430), |
| |
| (110, 340), |
| |
| (165, 140), |
| (165, 270), |
| (165, 410), |
| (165, 540), |
| |
| (527, 5), |
| (527, 253), |
| (527, 433), |
| (527, 675), |
| |
| (888, 140), |
| (888, 270), |
| (888, 410), |
| (888, 540), |
| |
| (940, 340), |
| |
| (998, 250), |
| (998, 430), |
| |
| (1045, 5), |
| (1045, 140), |
| (1045, 250), |
| (1045, 430), |
| (1045, 540), |
| (1045, 675), |
| |
| (435, 340), |
| (615, 340), |
| ] |
|
|
| KEYPOINTS_NP = np.asarray(KEYPOINTS, dtype=np.float32) |
|
|
| FOOTBALL_KEYPOINTS: list[tuple[int, int]] = [ |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| |
| (0, 0), |
| (0, 0), |
| (0, 0), |
|
|
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
|
|
| (0, 0), |
| (527, 283), |
| (527, 403), |
| (0, 0), |
|
|
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
|
|
| (0, 0), |
|
|
| (0, 0), |
| (0, 0), |
|
|
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
|
|
| (405, 340), |
| (645, 340), |
| ] |
|
|
| FOOTBALL_KEYPOINTS_NP = np.asarray(FOOTBALL_KEYPOINTS, dtype=np.float32) |
|
|
| groups = { |
| 1: [2, 3, 7, 10], |
| 2: [1, 3, 7, 10], |
| 3: [2, 4, 7, 8], |
| 4: [3, 5, 8, 7], |
| 5: [4, 8, 6, 3], |
| 6: [5, 4, 8, 13], |
| 7: [3, 8, 9, 10], |
| 8: [4, 7, 9, 13], |
| 9: [7, 8, 11, 12], |
| 10: [9, 11, 7, 2], |
| 11: [9, 10, 12, 31], |
| 12: [9, 11, 13, 31], |
| 13: [9, 12, 8, 5], |
| 14: [15, 31, 32, 16], |
| 15: [31, 16, 32, 14], |
| 16: [31, 15, 32, 17], |
| 17: [31, 16, 32, 15], |
| 18: [19, 22, 23, 26], |
| 19: [18, 22, 20, 32], |
| 20: [19, 22, 21, 32], |
| 21: [20, 22, 24, 29], |
| 22: [23, 24, 19, 20], |
| 23: [27, 24, 22, 28], |
| 24: [28, 23, 22, 27], |
| 25: [26, 27, 23, 18], |
| 26: [25, 27, 23, 18], |
| 27: [26, 23, 28, 24], |
| 28: [27, 24, 29, 23], |
| 29: [28, 30, 24, 21], |
| 30: [29, 28, 24, 21], |
| 31: [15, 16, 32, 14], |
| 32: [15, 31, 16, 14] |
| } |
|
|
| base_temps = [(0, 0)] * 32 |
|
|
| _TEMPLATE_MAX_X: int = 1045 |
| _TEMPLATE_MAX_Y: int = 675 |
|
|
| |
| GROUPS_ARRAY = [np.asarray(groups[i], dtype=np.int32) - 1 for i in range(1, 33)] |
|
|
| kernel = getStructuringElement(MORPH_RECT, (31, 31)) |
| dilate_kernel = getStructuringElement( |
| MORPH_RECT, (3, 3) |
| ) |
|
|
| class InvalidMask(Exception): |
| pass |
|
|
| def has_a_wide_line(mask: ndarray, max_aspect_ratio: float = 1.0) -> bool: |
| contours, _ = findContours(mask, RETR_EXTERNAL, CHAIN_APPROX_SIMPLE) |
| for cnt in contours: |
| x, y, w, h = boundingRect(cnt) |
| |
| if w == 0 or h == 0: |
| continue |
| aspect_ratio = min(w, h) / max(w, h) |
| if aspect_ratio >= max_aspect_ratio: |
| return True |
| return False |
|
|
| def is_bowtie(points: ndarray) -> bool: |
| def segments_intersect(p1: int, p2: int, q1: int, q2: int) -> bool: |
| def ccw(a: int, b: int, c: int): |
| return (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0]) |
|
|
| return (ccw(p1, q1, q2) != ccw(p2, q1, q2)) and ( |
| ccw(p1, p2, q1) != ccw(p1, p2, q2) |
| ) |
|
|
| pts = points.reshape(-1, 2) |
| edges = [(pts[0], pts[1]), (pts[1], pts[2]), (pts[2], pts[3]), (pts[3], pts[0])] |
| return segments_intersect(*edges[0], *edges[2]) or segments_intersect( |
| *edges[1], *edges[3] |
| ) |
|
|
| def validate_mask_lines(mask: ndarray) -> None: |
| |
| nonzero_count = countNonZero(mask) |
| if nonzero_count == 0: |
| raise InvalidMask("No projected lines") |
| if nonzero_count == mask.size: |
| raise InvalidMask("Projected lines cover the entire image surface") |
| |
| if has_a_wide_line(mask=mask): |
| raise InvalidMask("A projected line is too wide") |
|
|
| def validate_mask_ground(mask: ndarray) -> None: |
| num_labels, _ = connectedComponents(mask) |
| num_distinct_regions = num_labels - 1 |
| if num_distinct_regions > 1: |
| raise InvalidMask( |
| f"Projected ground should be a single object, detected {num_distinct_regions}" |
| ) |
| area_covered = mask.sum() / mask.size |
| if area_covered >= 0.9: |
| raise InvalidMask( |
| f"Projected ground covers more than {area_covered:.2f}% of the image surface which is unrealistic" |
| ) |
|
|
| def validate_projected_corners( |
| source_keypoints: list[tuple[int, int]], homography_matrix: ndarray |
| ) -> None: |
| |
| corner_indices = np.array([ |
| INDEX_KEYPOINT_CORNER_BOTTOM_LEFT, |
| INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT, |
| INDEX_KEYPOINT_CORNER_TOP_RIGHT, |
| INDEX_KEYPOINT_CORNER_TOP_LEFT |
| ], dtype=np.int32) |
| |
| |
| if isinstance(source_keypoints, np.ndarray): |
| src_corners = source_keypoints[corner_indices] |
| else: |
| src_arr = np.array(source_keypoints, dtype=np.float32) |
| src_corners = src_arr[corner_indices] |
| |
| src_corners = src_corners[None, :, :] |
| warped_corners = perspectiveTransform(src_corners, homography_matrix)[0] |
|
|
| if is_bowtie(warped_corners): |
| raise InvalidMask("Projection twisted!") |
|
|
| def project_image_using_keypoints( |
| image: ndarray, |
| source_keypoints: list[tuple[int, int]], |
| destination_keypoints: list[tuple[int, int]], |
| destination_width: int, |
| destination_height: int, |
| inverse: bool = False, |
| ) -> ndarray: |
| |
| src_arr = np.array(source_keypoints, dtype=np.float32) |
| dst_arr = np.array(destination_keypoints, dtype=np.float32) |
| |
| |
| valid_mask = ~((dst_arr[:, 0] == 0) & (dst_arr[:, 1] == 0)) |
| |
| source_points = src_arr[valid_mask] |
| destination_points = dst_arr[valid_mask] |
|
|
| H, _ = findHomography(source_points, destination_points) |
| if H is None: |
| raise InvalidMask("Homography not found") |
| validate_projected_corners(source_keypoints=source_keypoints, homography_matrix=H) |
| |
| projected_image = warpPerspective(image, H, (destination_width, destination_height)) |
|
|
| return projected_image |
|
|
| def extract_masks_for_ground_and_lines(image: ndarray,) -> tuple[ndarray, ndarray]: |
| """assumes template coloured s.t. ground = gray, lines = white, background = black""" |
| |
| gray = image |
|
|
| _, mask_ground = threshold(gray, 10, 1, THRESH_BINARY) |
|
|
| x, y, w, h = cv2.boundingRect(cv2.findNonZero(mask_ground)) |
| rect_size = w * h |
| area_size = countNonZero(mask_ground) |
| is_rect = area_size == rect_size |
|
|
| if is_rect: |
| raise InvalidMask( |
| f"Projected ground should not be rectangular" |
| ) |
| |
| total_pixels = mask_ground.size |
| ground_nonzero = int(countNonZero(mask_ground)) |
| if ground_nonzero == 0: |
| raise InvalidMask("No projected ground") |
| area_covered = ground_nonzero / float(total_pixels) |
| if area_covered >= 0.9: |
| raise InvalidMask(f"Projected ground covers more than {area_covered:.2f}% of the image surface which is unrealistic") |
|
|
| validate_mask_ground(mask=mask_ground) |
|
|
| _, mask_lines = threshold(gray, 200, 1, THRESH_BINARY) |
| validate_mask_lines(mask=mask_lines) |
| return mask_ground, mask_lines |
|
|
|
|
| def get_edge_mask(x, y, W, H, t): |
| """Uses bitmasking instead of sets for speed.""" |
| mask = 0 |
| if x <= t: mask |= 1 |
| if x >= W - t: mask |= 2 |
| if y <= t: mask |= 4 |
| if y >= H - t: mask |= 8 |
| return mask |
|
|
| def both_points_same_direction_fast(A, B, W, H, t=100): |
| mask_a = get_edge_mask(A[0], A[1], W, H, t) |
| if mask_a == 0: return False |
| |
| mask_b = get_edge_mask(B[0], B[1], W, H, t) |
| if mask_b == 0: return False |
| |
| |
| return (mask_a & mask_b) != 0 |
|
|
| def canonical(obj): |
| |
| if isinstance(obj, np.ndarray): |
| return canonical(obj.tolist()) |
|
|
| |
| if isinstance(obj, (list, tuple)): |
| return tuple(canonical(x) for x in obj) |
|
|
| |
| if isinstance(obj, set): |
| return tuple(sorted(canonical(x) for x in obj)) |
|
|
| |
| if isinstance(obj, dict): |
| return tuple((k, canonical(v)) for k, v in sorted(obj.items())) |
|
|
| return obj |
|
|
| def fast_cache_key(frame_keypoints, w, h): |
| |
| |
| if isinstance(frame_keypoints, np.ndarray): |
| if frame_keypoints.dtype == np.int32: |
| arr = frame_keypoints |
| else: |
| arr = frame_keypoints.astype(np.int32) |
| else: |
| arr = np.asarray(frame_keypoints, dtype=np.int32) |
| return (arr.tobytes(), int(w), int(h)) |
|
|
| blacklists = [ |
| [23, 24, 27, 28], |
| [7, 8, 3, 4], |
| [2, 10, 1, 14], |
| [18, 26, 14, 25], |
| [5, 13, 6, 17], |
| [21, 29, 17, 30], |
| [10, 11, 2, 3], |
| [10, 11, 2, 7], |
| [12, 13, 4, 5], |
| [12, 13, 5, 8], |
| [18, 19, 26, 27], |
| [18, 19, 26, 23], |
| [20, 21, 24, 29], |
| [20, 21, 28, 29], |
| [8, 4, 5, 13], |
| [3, 7, 2, 10], |
| [23, 27, 18, 26], |
| [24, 28, 21, 29] |
| ] |
|
|
| prepared_blacklists = [(set(bl), bl[0]-1, bl[1]-1) for bl in blacklists] |
|
|
| def evaluate_keypoints_for_frame( |
| frame_keypoints: list[tuple[int, int]], |
| frame_index, |
| h, |
| w, |
| precomputed_key=None, |
| ) -> float: |
| global cache |
| |
| key = precomputed_key or canonical(frame_keypoints, w, h) |
| template_keypoints = KEYPOINTS |
| floor_markings_template = template_image_gray |
| |
|
|
| try: |
| |
| def compute_masks_for_key(frame_keypoints, w, h): |
| try: |
| non_idxs_set = {i + 1 for i, kpt in enumerate(frame_keypoints) if kpt[0] != 0 or kpt[1] != 0} |
| for bl_set, idx0, idx1 in prepared_blacklists: |
| if non_idxs_set.issubset(bl_set): |
| if both_points_same_direction_fast(frame_keypoints[idx0], frame_keypoints[idx1], w, h): |
| return None, 0, None |
| |
| warped_template = project_image_using_keypoints( |
| image=floor_markings_template, |
| source_keypoints=template_keypoints, |
| destination_keypoints=frame_keypoints, |
| destination_width=w, |
| destination_height=h, |
| ) |
| mask_ground, mask_lines_expected = extract_masks_for_ground_and_lines( |
| image=warped_template |
| ) |
| mask_expected_on_ground = mask_lines_expected |
| |
| ys, xs = np.where(mask_lines_expected == 1) |
|
|
| if len(xs) == 0: |
| bbox = None |
| else: |
| min_x = xs.min() |
| max_x = xs.max() |
| min_y = ys.min() |
| max_y = ys.max() |
| bbox = (min_x, min_y, max_x, max_y) |
| bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) if bbox is not None else 1 |
| frame_area = h * w |
| |
| if (bbox_area / frame_area) < 0.2: |
| return None, 0, None |
| |
| pixels_on_lines = int(countNonZero(mask_expected_on_ground)) |
| return mask_expected_on_ground, pixels_on_lines, mask_ground |
| except Exception as e: |
| return None, 0, None |
| |
| mask_expected_on_ground, pixels_on_lines, mask_ground = get_or_compute_masks( |
| key, lambda: compute_masks_for_key(frame_keypoints, w, h) |
| ) |
| if mask_expected_on_ground is None or pixels_on_lines == 0 or mask_ground is None: |
| return 0.0 |
|
|
| image_edges = check_frame[frame_index] |
|
|
| h, w = mask_expected_on_ground.shape[:2] |
| work_buffer = np.zeros((h, w), dtype=np.uint8) |
| bitwise_and( |
| image_edges, |
| image_edges, |
| dst=work_buffer, |
| mask=mask_ground |
| ) |
| dilate(work_buffer, dilate_kernel, dst=work_buffer, iterations=3) |
| threshold(work_buffer, 0, 255, cv2.THRESH_BINARY, dst=work_buffer) |
| pixels_predicted_count = countNonZero(work_buffer) |
| bitwise_and(work_buffer, mask_expected_on_ground, dst=work_buffer) |
| pixels_overlapping = countNonZero(work_buffer) |
| pixels_rest = pixels_predicted_count - pixels_overlapping |
| total_pixels = pixels_predicted_count + pixels_on_lines - pixels_overlapping |
| if total_pixels > 0 and (pixels_rest / total_pixels) > 0.9: |
| return 0.0 |
| score = pixels_overlapping / (pixels_on_lines + 1e-8) |
| return score |
| except Exception as e: |
| pass |
| return 0.0 |
|
|
| def _generate_sparse_template_keypoints(frame_width: int, frame_height: int) -> list[tuple[int, int]]: |
| key = (int(frame_width), int(frame_height)) |
| if key in _sparse_template_cache: |
| return _sparse_template_cache[key] |
| template_max_x, template_max_y = (1045, 675) |
| sx = float(frame_width) / float(template_max_x if template_max_x != 0 else 1) |
| sy = float(frame_height) / float(template_max_y if template_max_y != 0 else 1) |
| |
| scale_factors = np.array([sx, sy], dtype=np.float32) |
| scaled_np = np.round(FOOTBALL_KEYPOINTS_NP * scale_factors).astype(np.int32) |
| scaled = [(int(x), int(y)) for x, y in scaled_np] |
| _sparse_template_cache[key] = scaled |
| return scaled |
|
|
| def convert_keypoints_to_val_format(keypoints): |
| |
| if not keypoints: |
| return [] |
| arr = np.asarray(keypoints, dtype=np.int32) |
| return [(int(x), int(y)) for x, y in arr] |
|
|
|
|
| def are_collinear(pts, eps=1e-9): |
| pts = np.asarray(pts) |
| if len(pts) < 3: |
| return True |
| a, b, c = pts[:3] |
| area = np.abs(np.cross(b - a, c - a)) |
| return area < eps |
|
|
| def line_to_line_transform(P1, P2, Q1, Q2): |
| """ |
| Compute 2D affine transformation mapping line segment P1P2 -> Q1Q2 |
| Optimized version reducing allocations. |
| |
| Parameters: |
| P1, P2: source points (x, y) |
| Q1, Q2: target points (x, y) |
| |
| Returns: |
| M: 3x3 homogeneous transformation matrix |
| """ |
| P1 = np.asarray(P1, dtype=np.float64) |
| P2 = np.asarray(P2, dtype=np.float64) |
| Q1 = np.asarray(Q1, dtype=np.float64) |
| Q2 = np.asarray(Q2, dtype=np.float64) |
| |
| |
| v_s = P2 - P1 |
| v_t = Q2 - Q1 |
| |
| |
| norm_s = np.hypot(v_s[0], v_s[1]) |
| norm_t = np.hypot(v_t[0], v_t[1]) |
| s = norm_t / norm_s |
| |
| |
| theta = np.arctan2(v_t[1], v_t[0]) - np.arctan2(v_s[1], v_s[0]) |
| |
| |
| cos_theta = np.cos(theta) |
| sin_theta = np.sin(theta) |
| |
| |
| sr00 = s * cos_theta |
| sr01 = -s * sin_theta |
| sr10 = s * sin_theta |
| sr11 = s * cos_theta |
| |
| |
| t0 = Q1[0] - (sr00 * P1[0] + sr01 * P1[1]) |
| t1 = Q1[1] - (sr10 * P1[0] + sr11 * P1[1]) |
| |
| |
| M = np.array([ |
| [sr00, sr01, t0], |
| [sr10, sr11, t1], |
| [0.0, 0.0, 1.0] |
| ], dtype=np.float64) |
| |
| return M |
|
|
| def three_point_affine(P, Q): |
| P = np.array(P, dtype=np.float64) |
| Q = np.array(Q, dtype=np.float64) |
| n = P.shape[0] |
| |
| |
| x, y = P[:, 0], P[:, 1] |
| u, v = Q[:, 0], Q[:, 1] |
| |
| |
| A = np.zeros((2*n, 6), dtype=np.float64) |
| A[0::2, 0] = x |
| A[0::2, 1] = y |
| A[0::2, 2] = 1 |
| A[1::2, 3] = x |
| A[1::2, 4] = y |
| A[1::2, 5] = 1 |
| |
| |
| b = np.empty(2*n, dtype=np.float64) |
| b[0::2] = u |
| b[1::2] = v |
| |
| |
| params, _, _, _ = np.linalg.lstsq(A, b, rcond=None) |
| a, b_, e, c, d, f = params |
| |
| |
| M = np.array([ |
| [a, b_, e], |
| [c, d, f], |
| [0, 0, 1] |
| ], dtype=np.float64) |
| |
| return M |
|
|
| def affine_from_4_points(src_pts, dst_pts): |
| """ |
| Compute a 2D affine transformation from 4 source points to 4 target points using least-squares. |
| Vectorized version for better performance. |
| |
| Parameters: |
| src_pts: list of 4 source points [(x1,y1),..., (x4,y4)] |
| dst_pts: list of 4 target points [(u1,v1),..., (u4,v4)] |
| |
| Returns: |
| 3x3 homogeneous affine transformation matrix |
| """ |
| P = np.array(src_pts, dtype=np.float64) |
| Q = np.array(dst_pts, dtype=np.float64) |
| |
| |
| x, y = P[:, 0], P[:, 1] |
| u, v = Q[:, 0], Q[:, 1] |
| |
| A = np.zeros((8, 6), dtype=np.float64) |
| A[0::2, 0] = x |
| A[0::2, 1] = y |
| A[0::2, 2] = 1 |
| A[1::2, 3] = x |
| A[1::2, 4] = y |
| A[1::2, 5] = 1 |
| |
| b = np.empty(8, dtype=np.float64) |
| b[0::2] = u |
| b[1::2] = v |
|
|
| |
| params, _, _, _ = np.linalg.lstsq(A, b, rcond=None) |
| a, b_, e, c, d, f = params |
|
|
| |
| M = np.array([ |
| [a, b_, e], |
| [c, d, f], |
| [0, 0, 1] |
| ], dtype=np.float64) |
| return M |
|
|
| def four_point_homography(src_pts, dst_pts): |
| """ |
| Compute 2D homography mapping 4 source points to 4 target points. |
| Vectorized version for better performance. |
| |
| src_pts: list of 4 source points [(x1,y1),..., (x4,y4)] |
| dst_pts: list of 4 target points [(u1,v1),..., (u4,v4)] |
| |
| Returns: |
| 3x3 homography matrix |
| """ |
| |
| src = np.array(src_pts, dtype=np.float64) |
| dst = np.array(dst_pts, dtype=np.float64) |
| |
| x, y = src[:, 0], src[:, 1] |
| u, v = dst[:, 0], dst[:, 1] |
| |
| |
| A = np.zeros((8, 9), dtype=np.float64) |
| A[0::2, 0] = -x |
| A[0::2, 1] = -y |
| A[0::2, 2] = -1 |
| A[0::2, 6] = x * u |
| A[0::2, 7] = y * u |
| A[0::2, 8] = u |
| |
| A[1::2, 3] = -x |
| A[1::2, 4] = -y |
| A[1::2, 5] = -1 |
| A[1::2, 6] = x * v |
| A[1::2, 7] = y * v |
| A[1::2, 8] = v |
| |
| |
| _, _, Vt = np.linalg.svd(A) |
| h = Vt[-1, :] |
| H = h.reshape(3, 3) |
| |
| |
| H /= H[2, 2] |
| return H |
|
|
| def unique_points(src, dst): |
| src, dst = np.asarray(src, float), np.asarray(dst, float) |
| |
| src_nonzero = ~np.all(np.abs(src) < 1e-9, axis=1) |
| dst_nonzero = ~np.all(np.abs(dst) < 1e-9, axis=1) |
| valid_mask = src_nonzero & dst_nonzero |
| |
| if not valid_mask.any(): |
| return np.array([]), np.array([]) |
| |
| src_valid = src[valid_mask] |
| dst_valid = dst[valid_mask] |
| |
| |
| _, unique_idx = np.unique(src_valid, axis=0, return_index=True) |
| unique_idx.sort() |
| |
| return src_valid[unique_idx], dst_valid[unique_idx] |
|
|
| def robust_transform(src_pts, dst_pts): |
| src, dst = unique_points(src_pts, dst_pts) |
| n = len(src) |
| if n >= 4: |
| if are_collinear(src) or are_collinear(dst): |
| H = affine_from_4_points(src, dst) |
| return lambda pt: apply_transform(H, pt) |
| else: |
| H = four_point_homography(src, dst) |
| return lambda pt: apply_homo_transform(H, pt) |
| elif n==3: |
| H = three_point_affine(src,dst) |
| elif n==2: |
| H = line_to_line_transform(src[0],src[1],dst[0],dst[1]) |
| elif n==1: |
| t = dst[0]-src[0] |
| H = np.eye(3) |
| H[:2,2] = t |
| else: |
| H = np.eye(3) |
| return lambda pt: apply_transform(H, pt) |
|
|
| def apply_homo_transform(M, P): |
| |
| x, y = P[0], P[1] |
| |
| |
| w = M[2, 0] * x + M[2, 1] * y + M[2, 2] |
| x_new = (M[0, 0] * x + M[0, 1] * y + M[0, 2]) / w |
| y_new = (M[1, 0] * x + M[1, 1] * y + M[1, 2]) / w |
| |
| |
| return (int(x_new - x), int(y_new - y)) |
|
|
| def apply_transform(M, P): |
| """ |
| Transform a single 2D point using a 3x3 transformation matrix H. |
| Optimized version avoiding array creation. |
| |
| Args: |
| H : 3x3 numpy array |
| Transformation matrix (homography, affine, similarity, etc.) |
| point : (x, y) array-like |
| Single point coordinates to transform. |
| |
| Returns: |
| (x', y') : Transformed point coordinates |
| """ |
| |
| x, y = P[0], P[1] |
| x_new = M[0, 0] * x + M[0, 1] * y + M[0, 2] |
| y_new = M[1, 0] * x + M[1, 1] * y + M[1, 2] |
| return (int(x_new), int(y_new)) |
| |
| def pick_pt(points): |
| |
| if not points: |
| return [] |
| pts_arr = np.asarray(points, dtype=np.int32) |
| seen = np.zeros(32, dtype=bool) |
| valid_mask = (pts_arr >= 0) & (pts_arr < 32) |
| seen[pts_arr[valid_mask]] = True |
| |
| out_seen = np.zeros(32, dtype=bool) |
| out = [] |
| for p in pts_arr[valid_mask]: |
| neigh = GROUPS_ARRAY[p] |
| candidates = neigh[~seen[neigh] & ~out_seen[neigh]] |
| out_seen[candidates] = True |
| out.extend(candidates.tolist()) |
| return out |
|
|
| def make_possible_keypoints(all_keypoints, frame_width, frame_height, limit=2): |
| |
| if not all_keypoints: |
| return [] |
|
|
| results = [] |
|
|
| for keypoints in all_keypoints: |
| |
| |
| |
| arr = np.asarray(keypoints, dtype=np.int32) |
|
|
| |
| if arr.ndim != 2 or arr.shape[1] != 2: |
| continue |
|
|
| |
| mask = (arr[:, 0] != 0) & (arr[:, 1] != 0) |
| non_zero_count = mask.sum() |
| |
| |
| if non_zero_count > 4: |
| results.append(keypoints) |
| continue |
| |
| if non_zero_count < 2: |
| continue |
|
|
| |
| if non_zero_count == 4: |
| results.append(keypoints) |
|
|
| |
| non_zero_idxs = np.flatnonzero(mask) |
| |
| |
| src = KEYPOINTS_NP[non_zero_idxs] |
| dest = arr[non_zero_idxs].astype(np.float32) |
|
|
| try: |
| |
| transform_func = robust_transform(src, dest) |
| except Exception: |
| continue |
| |
| |
| candidate_idxs = pick_pt(non_zero_idxs.tolist()) |
| if not candidate_idxs: |
| continue |
|
|
| |
| valid_cache = {} |
| valid_real_idxs = [] |
|
|
| for idx in candidate_idxs: |
| |
| t_pt = transform_func(KEYPOINTS_NP[idx]) |
| |
| |
| tx, ty = t_pt[0], t_pt[1] |
| |
| |
| if 0 <= tx < frame_width and 0 <= ty < frame_height: |
| valid_cache[idx] = (int(tx), int(ty)) |
| valid_real_idxs.append(idx) |
|
|
| |
| n_missing = 5 - non_zero_count |
| if len(valid_real_idxs) < n_missing: |
| continue |
|
|
| |
| cnt = 0 |
| for group in combinations(valid_real_idxs, n_missing): |
| if cnt >= limit: |
| break |
| cnt += 1 |
| |
| |
| |
| new_result = list(keypoints) |
| |
| |
| for idx in group: |
| new_result[idx] = valid_cache[idx] |
| |
| results.append(new_result) |
|
|
| return results |
| |
| def _get_shared_eval_executor(max_workers: int) -> ThreadPoolExecutor: |
| global _shared_eval_executor |
| if _shared_eval_executor is None: |
| _shared_eval_executor = ThreadPoolExecutor(max_workers=max_workers) |
| return _shared_eval_executor |
|
|
| def evaluates(jobs, h, w, total_frames: int): |
| |
| if len(jobs) == 0: |
| return [] |
| |
| unique_jobs = [] |
| seen = set() |
| |
| for (job, frame_index) in jobs: |
| try: |
| |
| if isinstance(job, np.ndarray): |
| key_bytes = job.astype(np.int32).tobytes() if job.dtype != np.int32 else job.tobytes() |
| else: |
| key_bytes = np.asarray(job, dtype=np.int32).tobytes() |
| |
| sig = (frame_index, key_bytes) |
| if sig in seen: |
| continue |
| seen.add(sig) |
| unique_jobs.append((job, frame_index, key_bytes)) |
| except Exception as e: |
| continue |
|
|
| if len(unique_jobs) <= 10: |
| scores_unique = [ |
| evaluate_keypoints_for_frame(job, frame_index, h, w, precomputed_key=(key_bytes, w, h)) |
| for (job, frame_index, key_bytes) in unique_jobs |
| ] |
| else: |
| cpu_count = max(1, (os.cpu_count() or 1)) |
| max_workers = min(max(2, cpu_count), 8) |
|
|
| chunk_size = 500 |
| scores_unique = [] |
| ex = _get_shared_eval_executor(max_workers) |
| |
| for i in range(0, len(unique_jobs), chunk_size): |
| chunk = unique_jobs[i:i + chunk_size] |
| scores_unique.extend( |
| ex.map( |
| lambda pair: evaluate_keypoints_for_frame(pair[0], pair[1], h, w, precomputed_key=(pair[2], w, h)), |
| chunk, |
| ) |
| ) |
| scores = np.full(total_frames, -1.0, dtype=np.float32) |
| results = [[(0, 0)] * 32 for _ in range(total_frames)] |
|
|
| for score, (k, frame_index, _) in zip(scores_unique, unique_jobs): |
| if score > scores[frame_index]: |
| scores[frame_index] = score |
| results[frame_index] = k |
| |
| return results |
|
|
| def fix_keypoints_pri( |
| results_frames, |
| frame_width: int, |
| frame_height: int |
| ) -> list[Any]: |
| max_frames = len(results_frames) |
| limit = 30 |
| before = deque(maxlen=limit) |
| after = deque(maxlen=limit) |
| sparse_template = [(0,0)] * 32 |
| all_possible = [None] * max_frames |
| for i in range(max_frames): |
| all_possible[i] = make_possible_keypoints([results_frames[i]], frame_width, frame_height) |
| for i in range(1, min(limit, max_frames)): |
| after.append(all_possible[i]) |
| |
| current = all_possible[0] if max_frames > 0 else [] |
| total_jobs = [] |
|
|
| for frame_index in range(max_frames): |
| if frame_index < max_frames - limit: |
| future_idx = frame_index + limit |
| if all_possible[future_idx] is None: |
| all_possible[future_idx] = make_possible_keypoints([results_frames[future_idx]], frame_width, frame_height) |
| after.append(all_possible[future_idx]) |
| |
| frame_jobs = [(kpts, frame_index) for kpts in current] |
| for t in after: |
| frame_jobs.extend([(kpts, frame_index) for kpts in t]) |
| for t in before: |
| frame_jobs.extend([(kpts, frame_index) for kpts in t]) |
| frame_jobs.append((sparse_template, frame_index)) |
| total_jobs.extend(frame_jobs) |
| |
| before.append(current) |
| |
| if len(after) != 0: |
| current = after.popleft() |
| |
| start_time = time.time() |
| results = evaluates(total_jobs, frame_height, frame_width, max_frames) |
| print(f"Evaluation time: {time.time() - start_time}") |
| return results |
|
|
| |
| def normalize_results(frame_results, threshold): |
| if not frame_results: |
| return [] |
| |
| results_array = [] |
| for result in frame_results: |
| arr = np.array(result, dtype=np.float32) |
| if arr.size == 0: |
| results_array.append([]) |
| continue |
| |
| mask = arr[:, 2] > threshold |
| scaled = arr[:, :2] |
| scaled = np.where(mask[:, None], scaled, 0) |
| results_array.append([(int(x), int(y)) for x, y in scaled]) |
| |
| return results_array |
|
|
| def convert_to_gray(image): |
| gray = cvtColor(image, COLOR_BGR2GRAY) |
| gray = morphologyEx(gray, MORPH_TOPHAT, kernel, dst=gray) |
| GaussianBlur(gray, (5, 5), 0, dst=gray) |
| image_edges = Canny(gray, 30, 100) |
| return image_edges |
|
|
| def get_cls_net(config, pretrained='', **kwargs): |
| """Create keypoint detection model with softmax activation""" |
| |
|
|
| def conv3x3(in_planes, out_planes, stride=1): |
| """3x3 convolution with padding""" |
| return nn.Conv2d(in_planes, out_planes, kernel_size=3, |
| stride=stride, padding=1, bias=False) |
| |
| class BasicBlock(nn.Module): |
| expansion = 1 |
|
|
| def __init__(self, inplanes, planes, stride=1, downsample=None): |
| super(BasicBlock, self).__init__() |
| self.conv1 = conv3x3(inplanes, planes, stride) |
| self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM) |
| self.relu = nn.ReLU(inplace=True) |
| self.conv2 = conv3x3(planes, planes) |
| self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM) |
| self.downsample = downsample |
| self.stride = stride |
|
|
| def forward(self, x): |
| residual = x |
|
|
| out = self.conv1(x) |
| out = self.bn1(out) |
| out = self.relu(out) |
|
|
| out = self.conv2(out) |
| out = self.bn2(out) |
|
|
| if self.downsample is not None: |
| residual = self.downsample(x) |
|
|
| out += residual |
| out = self.relu(out) |
|
|
| return out |
| |
| class Bottleneck(nn.Module): |
| expansion = 4 |
|
|
| def __init__(self, inplanes, planes, stride=1, downsample=None): |
| super(Bottleneck, self).__init__() |
| self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False) |
| self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM) |
| self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, |
| padding=1, bias=False) |
| self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM) |
| self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, |
| bias=False) |
| self.bn3 = BatchNorm2d(planes * self.expansion, |
| momentum=BN_MOMENTUM) |
| self.relu = nn.ReLU(inplace=True) |
| self.downsample = downsample |
| self.stride = stride |
|
|
| def forward(self, x): |
| residual = x |
|
|
| out = self.conv1(x) |
| out = self.bn1(out) |
| out = self.relu(out) |
|
|
| out = self.conv2(out) |
| out = self.bn2(out) |
| out = self.relu(out) |
|
|
| out = self.conv3(out) |
| out = self.bn3(out) |
|
|
| if self.downsample is not None: |
| residual = self.downsample(x) |
|
|
| out += residual |
| out = self.relu(out) |
|
|
| return out |
|
|
| BatchNorm2d = nn.BatchNorm2d |
| BN_MOMENTUM = 0.1 |
| blocks_dict = { |
| 'BASIC': BasicBlock, |
| 'BOTTLENECK': Bottleneck |
| } |
| class HighResolutionModule(nn.Module): |
| def __init__(self, num_branches, blocks, num_blocks, num_inchannels, |
| num_channels, fuse_method, multi_scale_output=True): |
| super(HighResolutionModule, self).__init__() |
| self._check_branches( |
| num_branches, blocks, num_blocks, num_inchannels, num_channels) |
|
|
| self.num_inchannels = num_inchannels |
| self.fuse_method = fuse_method |
| self.num_branches = num_branches |
|
|
| self.multi_scale_output = multi_scale_output |
|
|
| self.branches = self._make_branches( |
| num_branches, blocks, num_blocks, num_channels) |
| self.fuse_layers = self._make_fuse_layers() |
| self.relu = nn.ReLU(inplace=True) |
|
|
| def _check_branches(self, num_branches, blocks, num_blocks, |
| num_inchannels, num_channels): |
| if num_branches != len(num_blocks): |
| error_msg = 'NUM_BRANCHES({}) <> NUM_BLOCKS({})'.format( |
| num_branches, len(num_blocks)) |
| raise ValueError(error_msg) |
|
|
| if num_branches != len(num_channels): |
| error_msg = 'NUM_BRANCHES({}) <> NUM_CHANNELS({})'.format( |
| num_branches, len(num_channels)) |
| raise ValueError(error_msg) |
|
|
| if num_branches != len(num_inchannels): |
| error_msg = 'NUM_BRANCHES({}) <> NUM_INCHANNELS({})'.format( |
| num_branches, len(num_inchannels)) |
| raise ValueError(error_msg) |
|
|
| def _make_one_branch(self, branch_index, block, num_blocks, num_channels, |
| stride=1): |
| downsample = None |
| if stride != 1 or \ |
| self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion: |
| downsample = nn.Sequential( |
| nn.Conv2d(self.num_inchannels[branch_index], |
| num_channels[branch_index] * block.expansion, |
| kernel_size=1, stride=stride, bias=False), |
| BatchNorm2d(num_channels[branch_index] * block.expansion, |
| momentum=BN_MOMENTUM), |
| ) |
|
|
| layers = [] |
| layers.append(block(self.num_inchannels[branch_index], |
| num_channels[branch_index], stride, downsample)) |
| self.num_inchannels[branch_index] = \ |
| num_channels[branch_index] * block.expansion |
| for i in range(1, num_blocks[branch_index]): |
| layers.append(block(self.num_inchannels[branch_index], |
| num_channels[branch_index])) |
|
|
| return nn.Sequential(*layers) |
|
|
| def _make_branches(self, num_branches, block, num_blocks, num_channels): |
| branches = [] |
|
|
| for i in range(num_branches): |
| branches.append( |
| self._make_one_branch(i, block, num_blocks, num_channels)) |
|
|
| return nn.ModuleList(branches) |
|
|
| def _make_fuse_layers(self): |
| if self.num_branches == 1: |
| return None |
|
|
| num_branches = self.num_branches |
| num_inchannels = self.num_inchannels |
| fuse_layers = [] |
| for i in range(num_branches if self.multi_scale_output else 1): |
| fuse_layer = [] |
| for j in range(num_branches): |
| if j > i: |
| fuse_layer.append(nn.Sequential( |
| nn.Conv2d(num_inchannels[j], |
| num_inchannels[i], |
| 1, |
| 1, |
| 0, |
| bias=False), |
| BatchNorm2d(num_inchannels[i], momentum=BN_MOMENTUM))) |
| |
| elif j == i: |
| fuse_layer.append(None) |
| else: |
| conv3x3s = [] |
| for k in range(i - j): |
| if k == i - j - 1: |
| num_outchannels_conv3x3 = num_inchannels[i] |
| conv3x3s.append(nn.Sequential( |
| nn.Conv2d(num_inchannels[j], |
| num_outchannels_conv3x3, |
| 3, 2, 1, bias=False), |
| BatchNorm2d(num_outchannels_conv3x3, momentum=BN_MOMENTUM))) |
| else: |
| num_outchannels_conv3x3 = num_inchannels[j] |
| conv3x3s.append(nn.Sequential( |
| nn.Conv2d(num_inchannels[j], |
| num_outchannels_conv3x3, |
| 3, 2, 1, bias=False), |
| BatchNorm2d(num_outchannels_conv3x3, |
| momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True))) |
| fuse_layer.append(nn.Sequential(*conv3x3s)) |
| fuse_layers.append(nn.ModuleList(fuse_layer)) |
|
|
| return nn.ModuleList(fuse_layers) |
|
|
| def get_num_inchannels(self): |
| return self.num_inchannels |
|
|
| def forward(self, x): |
| if self.num_branches == 1: |
| return [self.branches[0](x[0])] |
|
|
| for i in range(self.num_branches): |
| x[i] = self.branches[i](x[i]) |
|
|
| x_fuse = [] |
| for i in range(len(self.fuse_layers)): |
| y = x[0] if i == 0 else self.fuse_layers[i][0](x[0]) |
| for j in range(1, self.num_branches): |
| if i == j: |
| y = y + x[j] |
| elif j > i: |
| y = y + F.interpolate( |
| self.fuse_layers[i][j](x[j]), |
| size=[x[i].shape[2], x[i].shape[3]], |
| mode='bilinear') |
| else: |
| y = y + self.fuse_layers[i][j](x[j]) |
| x_fuse.append(self.relu(y)) |
|
|
| return x_fuse |
|
|
| class HighResolutionNet(nn.Module): |
|
|
| def __init__(self, config, lines=False, **kwargs): |
| self.inplanes = 64 |
| self.lines = lines |
| extra = config['MODEL']['EXTRA'] |
| super(HighResolutionNet, self).__init__() |
|
|
| |
| self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=2, padding=1, |
| bias=False) |
| self.bn1 = BatchNorm2d(self.inplanes, momentum=BN_MOMENTUM) |
| self.conv2 = nn.Conv2d(self.inplanes, self.inplanes, kernel_size=3, stride=2, padding=1, |
| bias=False) |
| self.bn2 = BatchNorm2d(self.inplanes, momentum=BN_MOMENTUM) |
| self.relu = nn.ReLU(inplace=True) |
| self.sf = nn.Softmax(dim=1) |
| self.layer1 = self._make_layer(Bottleneck, 64, 64, 4) |
|
|
| self.stage2_cfg = extra['STAGE2'] |
| num_channels = self.stage2_cfg['NUM_CHANNELS'] |
| block = blocks_dict[self.stage2_cfg['BLOCK']] |
| num_channels = [ |
| num_channels[i] * block.expansion for i in range(len(num_channels))] |
| self.transition1 = self._make_transition_layer( |
| [256], num_channels) |
| self.stage2, pre_stage_channels = self._make_stage( |
| self.stage2_cfg, num_channels) |
|
|
| self.stage3_cfg = extra['STAGE3'] |
| num_channels = self.stage3_cfg['NUM_CHANNELS'] |
| block = blocks_dict[self.stage3_cfg['BLOCK']] |
| num_channels = [ |
| num_channels[i] * block.expansion for i in range(len(num_channels))] |
| self.transition2 = self._make_transition_layer( |
| pre_stage_channels, num_channels) |
| self.stage3, pre_stage_channels = self._make_stage( |
| self.stage3_cfg, num_channels) |
|
|
| self.stage4_cfg = extra['STAGE4'] |
| num_channels = self.stage4_cfg['NUM_CHANNELS'] |
| block = blocks_dict[self.stage4_cfg['BLOCK']] |
| num_channels = [ |
| num_channels[i] * block.expansion for i in range(len(num_channels))] |
| self.transition3 = self._make_transition_layer( |
| pre_stage_channels, num_channels) |
| self.stage4, pre_stage_channels = self._make_stage( |
| self.stage4_cfg, num_channels, multi_scale_output=True) |
|
|
| self.upsample = nn.Upsample(scale_factor=2, mode='nearest') |
| final_inp_channels = sum(pre_stage_channels) + self.inplanes |
|
|
| self.head = nn.Sequential(nn.Sequential( |
| nn.Conv2d( |
| in_channels=final_inp_channels, |
| out_channels=final_inp_channels, |
| kernel_size=1), |
| BatchNorm2d(final_inp_channels, momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True), |
| nn.Conv2d( |
| in_channels=final_inp_channels, |
| out_channels=config['MODEL']['NUM_JOINTS'], |
| kernel_size=extra['FINAL_CONV_KERNEL']), |
| nn.Softmax(dim=1) if self.lines == False else nn.Sigmoid())) |
|
|
|
|
|
|
| def _make_head(self, x, x_skip): |
| x = self.upsample(x) |
| x = torch.cat([x, x_skip], dim=1) |
| x = self.head(x) |
|
|
| return x |
|
|
| def _make_transition_layer( |
| self, num_channels_pre_layer, num_channels_cur_layer): |
| num_branches_cur = len(num_channels_cur_layer) |
| num_branches_pre = len(num_channels_pre_layer) |
|
|
| transition_layers = [] |
| for i in range(num_branches_cur): |
| if i < num_branches_pre: |
| if num_channels_cur_layer[i] != num_channels_pre_layer[i]: |
| transition_layers.append(nn.Sequential( |
| nn.Conv2d(num_channels_pre_layer[i], |
| num_channels_cur_layer[i], |
| 3, |
| 1, |
| 1, |
| bias=False), |
| BatchNorm2d( |
| num_channels_cur_layer[i], momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True))) |
| else: |
| transition_layers.append(None) |
| else: |
| conv3x3s = [] |
| for j in range(i + 1 - num_branches_pre): |
| inchannels = num_channels_pre_layer[-1] |
| outchannels = num_channels_cur_layer[i] \ |
| if j == i - num_branches_pre else inchannels |
| conv3x3s.append(nn.Sequential( |
| nn.Conv2d( |
| inchannels, outchannels, 3, 2, 1, bias=False), |
| BatchNorm2d(outchannels, momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True))) |
| transition_layers.append(nn.Sequential(*conv3x3s)) |
|
|
| return nn.ModuleList(transition_layers) |
|
|
| def _make_layer(self, block, inplanes, planes, blocks, stride=1): |
| downsample = None |
| if stride != 1 or inplanes != planes * block.expansion: |
| downsample = nn.Sequential( |
| nn.Conv2d(inplanes, planes * block.expansion, |
| kernel_size=1, stride=stride, bias=False), |
| BatchNorm2d(planes * block.expansion, momentum=BN_MOMENTUM), |
| ) |
|
|
| layers = [] |
| layers.append(block(inplanes, planes, stride, downsample)) |
| inplanes = planes * block.expansion |
| for i in range(1, blocks): |
| layers.append(block(inplanes, planes)) |
|
|
| return nn.Sequential(*layers) |
|
|
| def _make_stage(self, layer_config, num_inchannels, |
| multi_scale_output=True): |
| num_modules = layer_config['NUM_MODULES'] |
| num_branches = layer_config['NUM_BRANCHES'] |
| num_blocks = layer_config['NUM_BLOCKS'] |
| num_channels = layer_config['NUM_CHANNELS'] |
| block = blocks_dict[layer_config['BLOCK']] |
| fuse_method = layer_config['FUSE_METHOD'] |
|
|
| modules = [] |
| for i in range(num_modules): |
| |
| if not multi_scale_output and i == num_modules - 1: |
| reset_multi_scale_output = False |
| else: |
| reset_multi_scale_output = True |
| modules.append( |
| HighResolutionModule(num_branches, |
| block, |
| num_blocks, |
| num_inchannels, |
| num_channels, |
| fuse_method, |
| reset_multi_scale_output) |
| ) |
| num_inchannels = modules[-1].get_num_inchannels() |
|
|
| return nn.Sequential(*modules), num_inchannels |
|
|
| def forward(self, x): |
| |
| x = self.conv1(x) |
| x_skip = x.clone() |
| x = self.bn1(x) |
| x = self.relu(x) |
| x = self.conv2(x) |
| x = self.bn2(x) |
| x = self.relu(x) |
| x = self.layer1(x) |
|
|
| x_list = [] |
| for i in range(self.stage2_cfg['NUM_BRANCHES']): |
| if self.transition1[i] is not None: |
| x_list.append(self.transition1[i](x)) |
| else: |
| x_list.append(x) |
| y_list = self.stage2(x_list) |
|
|
| x_list = [] |
| for i in range(self.stage3_cfg['NUM_BRANCHES']): |
| if self.transition2[i] is not None: |
| x_list.append(self.transition2[i](y_list[-1])) |
| else: |
| x_list.append(y_list[i]) |
| y_list = self.stage3(x_list) |
|
|
| x_list = [] |
| for i in range(self.stage4_cfg['NUM_BRANCHES']): |
| if self.transition3[i] is not None: |
| x_list.append(self.transition3[i](y_list[-1])) |
| else: |
| x_list.append(y_list[i]) |
| x = self.stage4(x_list) |
|
|
| |
| height, width = x[0].size(2), x[0].size(3) |
| x1 = F.interpolate(x[1], size=(height, width), mode='bilinear', align_corners=False) |
| x2 = F.interpolate(x[2], size=(height, width), mode='bilinear', align_corners=False) |
| x3 = F.interpolate(x[3], size=(height, width), mode='bilinear', align_corners=False) |
| x = torch.cat([x[0], x1, x2, x3], 1) |
| x = self._make_head(x, x_skip) |
|
|
| return x |
|
|
| def init_weights(self, pretrained=''): |
| for m in self.modules(): |
| if isinstance(m, nn.Conv2d): |
| if self.lines == False: |
| nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') |
| else: |
| nn.init.normal_(m.weight, std=0.001) |
| |
| |
| elif isinstance(m, nn.BatchNorm2d): |
| nn.init.constant_(m.weight, 1) |
| nn.init.constant_(m.bias, 0) |
| if pretrained != '': |
| if os.path.isfile(pretrained): |
| pretrained_dict = torch.load(pretrained) |
| model_dict = self.state_dict() |
| pretrained_dict = {k: v for k, v in pretrained_dict.items() |
| if k in model_dict.keys()} |
| model_dict.update(pretrained_dict) |
| self.load_state_dict(model_dict) |
| else: |
| sys.exit(f'Weights {pretrained} not found.') |
|
|
| model = HighResolutionNet(config, **kwargs) |
| model.init_weights(pretrained) |
| return model |
| |
| def load_kp_model(path, device): |
| config_kp_path = path / 'hrnetv2_w48.yaml' |
| cfg_kp = yaml.safe_load(open(config_kp_path, 'r')) |
|
|
| loaded_state_kp = torch.load(path / "keypoint_detect.pt", map_location=device, weights_only=False) |
| model = get_cls_net(cfg_kp) |
| model.load_state_dict(loaded_state_kp) |
| model.to(device) |
| model.eval() |
| return model |
|
|
| def preprocess_batch_fast(frames): |
| """Ultra-fast batch preprocessing using optimized tensor operations""" |
| target_size = (540, 960) |
| batch = [] |
| for i, frame in enumerate(frames): |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
| img = cv2.resize(frame_rgb, (target_size[1], target_size[0])) |
| img = img.astype(np.float32) / 255.0 |
| img = np.transpose(img, (2, 0, 1)) |
| batch.append(img) |
| batch = torch.from_numpy(np.stack(batch)).float() |
|
|
| return batch |
|
|
| def extract_keypoints_from_heatmap_fast(heatmap: torch.Tensor, scale: int = 2, max_keypoints: int = 1): |
| """Ultra-fast keypoint extraction optimized for speed""" |
| batch_size, n_channels, height, width = heatmap.shape |
| |
| |
| max_pooled = F.max_pool2d(heatmap, 3, stride=1, padding=1) |
| local_maxima = (max_pooled == heatmap) |
| |
| |
| masked_heatmap = heatmap * local_maxima |
| flat_heatmap = masked_heatmap.view(batch_size, n_channels, -1) |
| scores, indices = torch.topk(flat_heatmap, max_keypoints, dim=-1, sorted=False) |
| |
| |
| y_coords = torch.div(indices, width, rounding_mode="floor") * scale |
| x_coords = (indices % width) * scale |
| |
| |
| results = torch.stack([x_coords.float(), y_coords.float(), scores], dim=-1) |
| return results |
|
|
| def process_keypoints_vectorized(kp_coords, kp_threshold, w, h, batch_size): |
| """Ultra-fast vectorized keypoint processing""" |
| batch_results = [] |
| |
| |
| kp_np = kp_coords.cpu().numpy() |
| |
| for batch_idx in range(batch_size): |
| kp_dict = {} |
| |
| valid_kps = kp_np[batch_idx, :, 0, 2] > kp_threshold |
| valid_indices = np.where(valid_kps)[0] |
| |
| for ch_idx in valid_indices: |
| x = float(kp_np[batch_idx, ch_idx, 0, 0]) / w |
| y = float(kp_np[batch_idx, ch_idx, 0, 1]) / h |
| p = float(kp_np[batch_idx, ch_idx, 0, 2]) |
| kp_dict[ch_idx + 1] = {'x': x, 'y': y, 'p': p} |
| |
| batch_results.append(kp_dict) |
| |
| return batch_results |
|
|
| def inference_batch(frames, model, kp_threshold, device, batch_size=8): |
| """Optimized batch inference for multiple frames""" |
| results = [] |
| num_frames = len(frames) |
| |
| |
| model_device = next(model.parameters()).device |
| |
| |
| for i in range(0, num_frames, batch_size): |
| current_batch_size = min(batch_size, num_frames - i) |
| batch_frames = frames[i:i + current_batch_size] |
| |
| |
| batch = preprocess_batch_fast(batch_frames) |
| b, c, h, w = batch.size() |
| |
| |
| batch = batch.to(model_device) |
|
|
| with torch.inference_mode(): |
| heatmaps = model(batch) |
|
|
| |
| kp_coords = extract_keypoints_from_heatmap_fast(heatmaps[:,:-1,:,:], scale=2, max_keypoints=1) |
| |
| |
| batch_results = process_keypoints_vectorized(kp_coords, kp_threshold, 960, 540, current_batch_size) |
| results.extend(batch_results) |
| |
| del heatmaps, kp_coords, batch, batch_results, batch_frames |
|
|
| return results |
|
|
| map_keypoints = { |
| 1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23, |
| 11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29, |
| 28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20, |
| 45: 9, 50: 31, 52: 32, 57: 22 |
| } |
| def get_mapped_keypoints(kp_points): |
| """Apply keypoint mapping to detection results""" |
| mapped_points = {} |
| for key, value in kp_points.items(): |
| if key in map_keypoints: |
| mapped_key = map_keypoints[key] |
| mapped_points[mapped_key] = value |
| |
| |
| |
| return mapped_points |
|
|
| def process_batch_input(frames, model, kp_threshold, device='cpu', batch_size=16): |
| """Process multiple input images in batch""" |
| |
| kp_results = inference_batch(frames, model, kp_threshold, device, batch_size) |
| kp_results = [get_mapped_keypoints(kp) for kp in kp_results] |
|
|
| return kp_results |
|
|
| class _Pl: |
| def __init__(self, repo_root: Path) -> None: |
| self.repo_root = Path(repo_root) |
| self._executor = ThreadPoolExecutor(max_workers=3) |
| self._track_id_to_team_votes: dict[int, dict[str, int]] = {} |
| self._track_id_to_class_votes: dict[int, dict[int, int]] = {} |
| self._reid_embedder: Optional[_ReidE] = None |
| self._reid_team_assigner: Optional[_ReidT] = None |
| self._track_to_reid_mean: dict[int, np.ndarray] = {} |
| self._reid_ema_alpha = 0.25 |
| _reid_path = self.repo_root / "models" / "reid.onnx" |
| if _reid_path.exists() and _HAS_ONNXRUNTIME: |
| try: |
| self._reid_embedder = _ReidE(_reid_path, input_height=256, input_width=128) |
| self._reid_team_assigner = _ReidT() |
| except Exception: |
| self._reid_embedder = None |
| self._reid_team_assigner = None |
| self._tracker_config = "botsort.yaml" |
| models_dir = self.repo_root / "models" |
| self.ball_model = YOLO(str(models_dir / "ball-detection-model.onnx"), task="detect") |
| self.person_model = YOLO(str(models_dir / "person-detection-model.onnx"), task="detect") |
| self._keypoint_model_hrnet = None |
| _yaml_path = self.repo_root / "hrnetv2_w48.yaml" |
| _weights_path = self.repo_root / "models" / "keypoint" |
| if _f0 and _yaml_path.exists() and _weights_path.exists(): |
| try: |
| self._keypoint_model_hrnet = _l0( |
| self.repo_root, weights_subdir="models" |
| ) |
| except Exception: |
| self._keypoint_model_hrnet = None |
| self._current_batch_bbox_timings: list[tuple[str, float]] = [] |
| self._current_batch_kp_timings: list[tuple[str, float]] = [] |
|
|
| def reset_for_new_video(self) -> None: |
| self._track_id_to_team_votes.clear() |
| self._track_id_to_class_votes.clear() |
| self._track_to_reid_mean.clear() |
| if self._reid_team_assigner is not None: |
| self._reid_team_assigner.centroids = None |
| try: |
| pred = getattr(self.person_model, "predictor", None) |
| if pred is not None: |
| for t in getattr(pred, "trackers", []) or []: |
| if hasattr(t, "reset"): |
| t.reset() |
| except Exception: |
| pass |
|
|
| def _keypoint_hrnet_task( |
| self, |
| images: list[ndarray], |
| offset: int, |
| n_keypoints: int, |
| ) -> dict[int, list[list[float]]]: |
| start_time = time.time() |
| default_kps = [[0.0, 0.0] for _ in range(n_keypoints)] |
| if not _f0 or self._keypoint_model_hrnet is None: |
| return {i: list(default_kps) for i in range(len(images))} |
| device = "cuda" if next(self._keypoint_model_hrnet.parameters()).is_cuda else "cpu" |
| kp_threshold = 0.2 |
| |
| |
| |
| kp_result = process_batch_input( |
| images, |
| self._keypoint_model_hrnet, |
| kp_threshold, |
| device, |
| batch_size=kp_batch_size |
| ) |
| keypoints = _n0(kp_result, images, n_keypoints) |
| for idx in range(len(keypoints)): |
| keypoints[idx] = _f1(keypoints[idx], n_keypoints) |
| out: dict[int, list[list[float]]] = {} |
| for i, kpts in enumerate(keypoints): |
| out[i] = _c1(kpts) |
| print(f"Keypoint HRNet: {time.time() - start_time}") |
| return out |
|
|
| def _bbox_task( |
| self, |
| images: list[ndarray], |
| offset: int, |
| imgsz: int, |
| conf: float, |
| onnx_batch_size: int, |
| ) -> dict[int, list[_Bx]]: |
| start_time = time.time() |
| ball_res = [] |
| for start in range(0, len(images), onnx_batch_size): |
| chunk = images[start : start + onnx_batch_size] |
| batch_res = self.ball_model.predict(chunk, imgsz=imgsz, conf=conf, verbose=False) |
| ball_res.extend(batch_res if batch_res else []) |
| print(f"Ball Model: {time.time() - start_time}") |
| start_time = time.time() |
| person_res = [] |
| for frame in images: |
| pr = self.person_model.track(frame, persist=True, tracker=self._tracker_config, imgsz=imgsz, conf=conf, verbose=False) |
| person_res.append(pr[0] if pr else None) |
| print(f"Person Model: {time.time() - start_time}") |
| start_time = time.time() |
| bboxes_by_frame: dict[int, list[_Bx]] = {} |
| track_ids_by_frame: dict[int, list[int | None]] = {} |
| boxes_raw_list: list[list[_Bx]] = [] |
| track_ids_raw_list: list[list[int | None]] = [] |
| bbox_to_track_list: list[dict[tuple[int, int, int, int], int]] = [] |
| for i, frame in enumerate(images): |
| frame_id = offset + i |
| boxes_raw = [] |
| track_ids_raw: list[int | None] = [] |
| bbox_to_track: dict[tuple[int, int, int, int], int] = {} |
| det_ball = ball_res[i] if i < len(ball_res) else None |
| if det_ball is not None and getattr(det_ball, "boxes", None) is not None and len(det_ball.boxes) > 0: |
| b = det_ball.boxes |
| xyxy = b.xyxy.cpu().numpy() |
| confs = b.conf.cpu().numpy() if b.conf is not None else np.ones(len(xyxy), dtype=np.float32) |
| clss = b.cls.cpu().numpy().astype(int) if b.cls is not None else np.zeros(len(xyxy), dtype=np.int32) |
| for (x1, y1, x2, y2), c, cf in zip(xyxy, clss, confs): |
| if int(c) == 0: |
| boxes_raw.append(_Bx(x1=int(round(x1)), y1=int(round(y1)), x2=int(round(x2)), y2=int(round(y2)), cls_id=_C0, conf=float(cf))) |
| track_ids_raw.append(None) |
| det_p = person_res[i] if i < len(person_res) else None |
| if det_p is not None and getattr(det_p, "boxes", None) is not None and len(det_p.boxes) > 0: |
| b = det_p.boxes |
| xyxy = b.xyxy.cpu().numpy() |
| confs = b.conf.cpu().numpy() if b.conf is not None else np.ones(len(xyxy), dtype=np.float32) |
| clss = b.cls.cpu().numpy().astype(int) if b.cls is not None else np.zeros(len(xyxy), dtype=np.int32) |
| track_ids = b.id.cpu().numpy().astype(int) if getattr(b, "id", None) is not None else np.full(len(clss), -1, dtype=np.int32) |
| for (x1, y1, x2, y2), c, cf, tid in zip(xyxy, clss, confs, track_ids): |
| c = int(c) |
| tid = int(tid) |
| x1r, y1r, x2r, y2r = int(round(x1)), int(round(y1)), int(round(x2)), int(round(y2)) |
| if tid >= 0: |
| bbox_to_track[(x1r, y1r, x2r, y2r)] = tid |
| tid_out = tid if tid >= 0 else None |
| if c == 0: |
| boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C2, conf=float(cf))) |
| track_ids_raw.append(tid_out) |
| elif c == 1: |
| boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C3, conf=float(cf))) |
| track_ids_raw.append(tid_out) |
| elif c == 2: |
| boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C1, conf=float(cf))) |
| track_ids_raw.append(tid_out) |
| boxes_raw_list.append(boxes_raw) |
| track_ids_raw_list.append(track_ids_raw) |
| bbox_to_track_list.append(bbox_to_track) |
| reid_embs_per_frame: list[list[Optional[np.ndarray]]] = [] |
| if self._reid_embedder and self._reid_team_assigner: |
| crop_list: list[tuple[ndarray, tuple[int, int, int, int]]] = [] |
| mapping: list[tuple[int, int]] = [] |
| for fi in range(len(images)): |
| boxes_raw = boxes_raw_list[fi] |
| for bi, bb in enumerate(boxes_raw): |
| if int(bb.cls_id) == _C2: |
| crop_list.append((images[fi], (int(bb.x1), int(bb.y1), int(bb.x2), int(bb.y2)))) |
| mapping.append((fi, bi)) |
| |
| reid_results = self._reid_embedder.extract_batch( |
| crop_list, batch_size=_R3, timings=None |
| ) |
| |
| reid_embs_per_frame = [[None] * len(boxes_raw_list[fi]) for fi in range(len(images))] |
| for k, (fi, bi) in enumerate(mapping): |
| if k < len(reid_results): |
| reid_embs_per_frame[fi][bi] = reid_results[k] |
| else: |
| reid_embs_per_frame = [[None] * len(boxes_raw) for boxes_raw in boxes_raw_list] |
| for i, frame in enumerate(images): |
| frame_id = offset + i |
| boxes_raw = boxes_raw_list[i] |
| bbox_to_track = bbox_to_track_list[i] |
| reid_embs = reid_embs_per_frame[i] |
| H, W = frame.shape[:2] |
| use_centroid: list[bool] = [] |
| if self._reid_embedder and self._reid_team_assigner: |
| player_boxes = [bb for bb in boxes_raw if int(bb.cls_id) == _C2] |
| use_centroid = [not any(_i1(bb, o) >= _T0 for o in player_boxes if o is not bb) for bb in boxes_raw] |
| team_ids_reid = self._reid_team_assigner.assign(reid_embs, use_for_centroid=use_centroid) |
| boxes_with_team = [] |
| for idx, bb in enumerate(boxes_raw): |
| if int(bb.cls_id) == _C2 and idx < len(team_ids_reid): |
| tid = team_ids_reid[idx] |
| boxes_with_team.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=bb.cls_id, conf=bb.conf, team_id=str(int(tid)))) |
| else: |
| boxes_with_team.append(bb) |
| track_ids_with_team = track_ids_raw_list[i] if i < len(track_ids_raw_list) else [None] * len(boxes_raw) |
| else: |
| boxes_with_team = list(boxes_raw) |
| track_ids_with_team = track_ids_raw_list[i] if i < len(track_ids_raw_list) else [None] * len(boxes_raw) |
| boxes_stabilized = [] |
| track_ids_stabilized: list[int | None] = [] |
| for idx, bb in enumerate(boxes_with_team): |
| best_tid = -1 |
| best_iou = 0.0 |
| for (bx1, by1, bx2, by2), tid in bbox_to_track.items(): |
| iou = _i1(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=0, conf=0.0), _Bx(x1=bx1, y1=by1, x2=bx2, y2=by2, cls_id=0, conf=0.0)) |
| if iou > best_iou and iou > 0.5: |
| best_iou, best_tid = iou, tid |
| tid_out = best_tid if best_tid >= 0 else (track_ids_with_team[idx] if idx < len(track_ids_with_team) else None) |
| if best_tid >= 0: |
| if best_tid not in self._track_id_to_class_votes: |
| self._track_id_to_class_votes[best_tid] = {} |
| cls_key = int(bb.cls_id) |
| self._track_id_to_class_votes[best_tid][cls_key] = self._track_id_to_class_votes[best_tid].get(cls_key, 0) + 1 |
| if int(bb.cls_id) == _C2 and bb.team_id: |
| team_key = (bb.team_id or "1").strip() |
| if best_tid not in self._track_id_to_team_votes: |
| self._track_id_to_team_votes[best_tid] = {} |
| self._track_id_to_team_votes[best_tid][team_key] = self._track_id_to_team_votes[best_tid].get(team_key, 0) + 1 |
| boxes_stabilized.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=bb.cls_id, conf=bb.conf, team_id=bb.team_id)) |
| track_ids_stabilized.append(tid_out) |
| else: |
| boxes_stabilized.append(bb) |
| track_ids_stabilized.append(tid_out) |
| bboxes_by_frame[frame_id] = boxes_stabilized |
| track_ids_by_frame[frame_id] = track_ids_stabilized |
| for fid in range(offset, offset + len(images)): |
| new_boxes = [] |
| tids_fid = track_ids_by_frame.get(fid, [None] * len(bboxes_by_frame[fid])) |
| for box_idx, box in enumerate(bboxes_by_frame[fid]): |
| tid = tids_fid[box_idx] if box_idx < len(tids_fid) else None |
| if tid is not None and tid >= 0 and tid in self._track_id_to_class_votes: |
| majority_cls = max(self._track_id_to_class_votes[tid].items(), key=lambda x: x[1])[0] |
| if tid in self._track_id_to_team_votes and self._track_id_to_team_votes[tid]: |
| majority_team = max(self._track_id_to_team_votes[tid].items(), key=lambda x: x[1])[0] |
| else: |
| majority_team = box.team_id |
| new_boxes.append(_Bx(x1=box.x1, y1=box.y1, x2=box.x2, y2=box.y2, cls_id=majority_cls, conf=box.conf, team_id=majority_team)) |
| else: |
| new_boxes.append(box) |
| bboxes_by_frame[fid] = new_boxes |
| track_ids_by_frame[fid] = tids_fid |
| if len(images) > 0: |
| H, W = images[0].shape[:2] |
| for fid in range(offset, offset + len(images)): |
| orig_boxes = bboxes_by_frame[fid] |
| orig_tids = track_ids_by_frame.get(fid, [None] * len(orig_boxes)) |
| adjusted = _a0( |
| orig_boxes, |
| frame_width=W, |
| frame_height=H, |
| do_goalkeeper_dedup=True, |
| do_referee_disambiguation=True, |
| ) |
| adjusted_tids: list[int | None] = [] |
| used_orig = set() |
| for ab in adjusted: |
| matched = None |
| for oi, ob in enumerate(orig_boxes): |
| if oi in used_orig: |
| continue |
| if ob.x1 == ab.x1 and ob.y1 == ab.y1 and ob.x2 == ab.x2 and ob.y2 == ab.y2: |
| matched = orig_tids[oi] if oi < len(orig_tids) else None |
| used_orig.add(oi) |
| break |
| adjusted_tids.append(matched) |
| if _q0 != 0.0 or _q1 != 0.0: |
| boxes_offset = [] |
| offset_tids = [] |
| for ab_idx, bb in enumerate(adjusted): |
| cx = 0.5 * (bb.x1 + bb.x2) |
| cy = 0.5 * (bb.y1 + bb.y2) |
| w = bb.x2 - bb.x1 |
| h = bb.y2 - bb.y1 |
| cx *= 1.0 + _q0 |
| cy *= 1.0 + _q1 |
| boxes_offset.append(_Bx(x1=int(round(cx - w/2)), y1=int(round(cy - h/2)), x2=int(round(cx + w/2)), y2=int(round(cy + h/2)), cls_id=bb.cls_id, conf=bb.conf, team_id=bb.team_id)) |
| offset_tids.append(adjusted_tids[ab_idx] if ab_idx < len(adjusted_tids) else None) |
| adjusted = boxes_offset |
| adjusted_tids = offset_tids |
| bboxes_by_frame[fid] = adjusted |
| track_ids_by_frame[fid] = adjusted_tids |
| if _A0 and _S0 > 1 and len(images) > 0: |
| _tmp_results = [] |
| for fid in range(offset, offset + len(images)): |
| _boxes = bboxes_by_frame.get(fid, []) |
| _tmp_results.append( |
| _FRes( |
| frame_id=fid, |
| boxes=[{"x1": int(b.x1), "y1": int(b.y1), "x2": int(b.x2), "y2": int(b.y2), "cls_id": int(b.cls_id), "conf": float(b.conf), "team_id": b.team_id} for b in _boxes], |
| keypoints=[], |
| ) |
| ) |
| _tmp_results = _s0(_tmp_results, window=_S0, tids_by_frame=track_ids_by_frame) |
| for r in _tmp_results: |
| bboxes_by_frame[int(r.frame_id)] = [_Bx(**box) for box in r.boxes] |
| self._current_batch_bbox_timings = [] |
| print(f"Detect Process: {time.time() - start_time}") |
| return bboxes_by_frame |
|
|
| def predict_batch( |
| self, |
| batch_images: list[ndarray], |
| offset: int, |
| n_keypoints: int, |
| ) -> list[_FRes]: |
| if not batch_images: |
| return [] |
| if offset == 0: |
| self.reset_for_new_video() |
| gc.collect() |
| try: |
| import torch |
| if torch.cuda.is_available(): |
| torch.cuda.empty_cache() |
| except Exception: |
| pass |
| images = list(batch_images) |
| n_frames = len(images) |
| imgsz = _D0 |
| conf = _D1 |
| executor = self._executor |
| default_kps = [[0.0, 0.0] for _ in range(n_keypoints)] |
| start_time = time.time() |
| |
| |
| |
| |
| |
| |
| bboxes_by_frame = self._bbox_task(images, offset, imgsz, conf, onnx_batch_size) |
| keypoints_by_frame = self._keypoint_hrnet_task(images, offset, n_keypoints) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| print(f"Predict Objects: {time.time() - start_time}") |
| start_time = time.time() |
| if _E1 and keypoints_by_frame and n_keypoints == 32 and len(_F0) == 32 and len(_F1) == 32: |
| for idx in range(len(images)): |
| frame_id = idx |
| kps = keypoints_by_frame.get(frame_id) |
| if not kps or len(kps) != 32: |
| continue |
| frame = images[idx] |
| frame_height, frame_width = frame.shape[:2] |
| valid_src_corrected: list[tuple[float, float]] = [] |
| valid_dst: list[tuple[float, float]] = [] |
| valid_indices: list[int] = [] |
| for kp_idx, kp in enumerate(kps): |
| if kp and len(kp) >= 2: |
| x, y = float(kp[0]), float(kp[1]) |
| if not (abs(x) < 1e-6 and abs(y) < 1e-6) and 0 <= x < frame_width and 0 <= y < frame_height: |
| valid_src_corrected.append(_F1[kp_idx]) |
| valid_dst.append((x, y)) |
| valid_indices.append(kp_idx) |
| if len(valid_src_corrected) < 4: |
| continue |
| src_pts = np.array(valid_src_corrected, dtype=np.float32) |
| dst_pts = np.array(valid_dst, dtype=np.float32) |
| H_corrected, _ = cv2.findHomography(src_pts, dst_pts) |
| if H_corrected is None: |
| continue |
| all_template_points = np.array(_F0, dtype=np.float32).reshape(-1, 1, 2) |
| adjusted_points = cv2.perspectiveTransform(all_template_points, H_corrected) |
| adjusted_points = adjusted_points.reshape(-1, 2) |
| adj_x_arr = adjusted_points[:32, 0] |
| adj_y_arr = adjusted_points[:32, 1] |
| valid_mask = (adj_x_arr >= 0) & (adj_y_arr >= 0) & (adj_x_arr < frame_width) & (adj_y_arr < frame_height) |
| valid_indices_set = set(valid_indices) |
| adjusted_kps: list[list[float]] = [[0.0, 0.0]] * 32 |
| for i in np.where(valid_mask)[0]: |
| if _S1 or i in valid_indices_set: |
| adjusted_kps[i] = [float(adj_x_arr[i]), float(adj_y_arr[i])] |
| keypoints_by_frame[frame_id] = adjusted_kps |
|
|
| print(f"Get kps: {time.time() - start_time}") |
|
|
| h, w = batch_images[0].shape[:2] |
| keypoints_by_frame = fix_keypoints_pri(keypoints_by_frame, w, h) |
|
|
| results = [] |
| for idx in range(len(images)): |
| frame_number = offset + idx |
| boxes_raw = bboxes_by_frame.get(frame_number, []) |
| boxes_for_result = [ |
| { |
| "x1": int(b.x1), |
| "y1": int(b.y1), |
| "x2": int(b.x2), |
| "y2": int(b.y2), |
| "cls_id": int(b.cls_id), |
| "conf": float(b.conf), |
| "team_id": b.team_id, |
| } |
| for b in boxes_raw |
| ] |
| results.append( |
| _FRes( |
| frame_id=frame_number, |
| boxes=boxes_for_result, |
| keypoints=convert_keypoints_to_val_format(keypoints_by_frame[frame_number - offset]) |
| ) |
| ) |
| return results |
|
|
|
|
| class Miner: |
| def __init__(self, path_hf_repo: Path) -> None: |
| self.health = "Okay!!!" |
| self.pipeline: _Pl | None = None |
| self.path_hf_repo = Path(path_hf_repo) |
| self.is_start = False |
|
|
| def __repr__(self) -> str: |
| return self.health |
|
|
| def predict_batch( |
| self, |
| batch_images: list[ndarray], |
| offset: int, |
| n_keypoints: int, |
| ) -> list[_FRes]: |
| if self.is_start is False: |
| self.is_start = True |
| return [] |
| if self.pipeline is None: |
| self.pipeline = _Pl(repo_root=self.path_hf_repo) |
| return self.pipeline.predict_batch(batch_images, offset, n_keypoints) |
|
|