# turbo_3_0 / miner.py
from __future__ import annotations
import gc
import os
import sys
import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import yaml
from numpy import ndarray
from pydantic import BaseModel
from ultralytics import YOLO
from typing import Iterable, Generator, List, TypeVar, Tuple, Sequence, Any, Dict, Optional
from collections import deque, OrderedDict, defaultdict
import threading
from itertools import combinations
from cv2 import (
bitwise_and,
findHomography,
warpPerspective,
cvtColor,
COLOR_BGR2GRAY,
threshold,
THRESH_BINARY,
getStructuringElement,
MORPH_RECT,
MORPH_TOPHAT,
GaussianBlur,
morphologyEx,
Canny,
connectedComponents,
perspectiveTransform,
RETR_EXTERNAL,
CHAIN_APPROX_SIMPLE,
findContours,
boundingRect,
dilate,
imread,
countNonZero
)
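# expandable_segments lets the CUDA caching allocator grow existing memory
# segments instead of allocating new fragmented ones, which helps long-running
# inference loops avoid out-of-memory errors caused by fragmentation.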
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
_f0 = True
BatchNorm2d = nn.BatchNorm2d
_v0 = 0.1
def _c0(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)
class _B0(nn.Module):
expansion = 1
def __init__(self, inplanes: int, planes: int, stride: int = 1, downsample: Any = None):
super().__init__()
self.conv1 = _c0(inplanes, planes, stride)
self.bn1 = BatchNorm2d(planes, momentum=_v0)
self.relu = nn.ReLU(inplace=True)
self.conv2 = _c0(planes, planes)
self.bn2 = BatchNorm2d(planes, momentum=_v0)
self.downsample = downsample
self.stride = stride
def forward(self, x: torch.Tensor) -> torch.Tensor:
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
out = self.relu(out)
return out
class _B1(nn.Module):
expansion = 4
def __init__(self, inplanes: int, planes: int, stride: int = 1, downsample: Any = None):
super().__init__()
self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
self.bn1 = BatchNorm2d(planes, momentum=_v0)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
self.bn2 = BatchNorm2d(planes, momentum=_v0)
self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, bias=False)
self.bn3 = BatchNorm2d(planes * self.expansion, momentum=_v0)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
def forward(self, x: torch.Tensor) -> torch.Tensor:
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
out = self.relu(out)
return out
_d0 = {"BASIC": _B0, "BOTTLENECK": _B1}
class _H0(nn.Module):
def __init__(self, num_branches: int, blocks: type, num_blocks: list, num_inchannels: list, num_channels: list, fuse_method: str, multi_scale_output: bool = True):
super().__init__()
self._check_branches(num_branches, blocks, num_blocks, num_inchannels, num_channels)
self.num_inchannels = num_inchannels
self.fuse_method = fuse_method
self.num_branches = num_branches
self.multi_scale_output = multi_scale_output
self.branches = self._make_branches(num_branches, blocks, num_blocks, num_channels)
self.fuse_layers = self._make_fuse_layers()
self.relu = nn.ReLU(inplace=True)
def _check_branches(self, num_branches: int, blocks: type, num_blocks: list, num_inchannels: list, num_channels: list) -> None:
if num_branches != len(num_blocks):
raise ValueError("NUM_BRANCHES <> NUM_BLOCKS")
if num_branches != len(num_channels):
raise ValueError("NUM_BRANCHES <> NUM_CHANNELS")
if num_branches != len(num_inchannels):
raise ValueError("NUM_BRANCHES <> NUM_INCHANNELS")
def _make_one_branch(self, branch_index: int, block: type, num_blocks: list, num_channels: list, stride: int = 1) -> nn.Sequential:
downsample = None
if stride != 1 or self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(self.num_inchannels[branch_index], num_channels[branch_index] * block.expansion, kernel_size=1, stride=stride, bias=False),
BatchNorm2d(num_channels[branch_index] * block.expansion, momentum=_v0),
)
layers = [block(self.num_inchannels[branch_index], num_channels[branch_index], stride, downsample)]
self.num_inchannels[branch_index] = num_channels[branch_index] * block.expansion
for _ in range(1, num_blocks[branch_index]):
layers.append(block(self.num_inchannels[branch_index], num_channels[branch_index]))
return nn.Sequential(*layers)
def _make_branches(self, num_branches: int, block: type, num_blocks: list, num_channels: list) -> nn.ModuleList:
return nn.ModuleList([self._make_one_branch(i, block, num_blocks, num_channels) for i in range(num_branches)])
def _make_fuse_layers(self) -> nn.ModuleList | None:
if self.num_branches == 1:
return None
num_branches = self.num_branches
num_inchannels = self.num_inchannels
fuse_layers = []
for i in range(num_branches if self.multi_scale_output else 1):
fuse_layer = []
for j in range(num_branches):
if j > i:
fuse_layer.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[i], 1, 1, 0, bias=False), BatchNorm2d(num_inchannels[i], momentum=_v0)))
elif j == i:
fuse_layer.append(None)
else:
conv3x3s = []
for k in range(i - j):
if k == i - j - 1:
conv3x3s.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[i], 3, 2, 1, bias=False), BatchNorm2d(num_inchannels[i], momentum=_v0)))
else:
conv3x3s.append(nn.Sequential(nn.Conv2d(num_inchannels[j], num_inchannels[j], 3, 2, 1, bias=False), BatchNorm2d(num_inchannels[j], momentum=_v0), nn.ReLU(inplace=True)))
fuse_layer.append(nn.Sequential(*conv3x3s))
fuse_layers.append(nn.ModuleList(fuse_layer))
return nn.ModuleList(fuse_layers)
def get_num_inchannels(self) -> list:
return self.num_inchannels
def forward(self, x: list) -> list:
if self.num_branches == 1:
return [self.branches[0](x[0])]
for i in range(self.num_branches):
x[i] = self.branches[i](x[i])
x_fuse = []
for i in range(len(self.fuse_layers)):
y = x[0] if i == 0 else self.fuse_layers[i][0](x[0])
for j in range(1, self.num_branches):
if i == j:
y = y + x[j]
elif j > i:
y = y + F.interpolate(self.fuse_layers[i][j](x[j]), size=[x[i].shape[2], x[i].shape[3]], mode="bilinear")
else:
y = y + self.fuse_layers[i][j](x[j])
x_fuse.append(self.relu(y))
return x_fuse
class _H1(nn.Module):
def __init__(self, config: dict, lines: bool = False, **kwargs: Any) -> None:
self.inplanes = 64
self.lines = lines
extra = config["MODEL"]["EXTRA"]
super().__init__()
self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=2, padding=1, bias=False)
self.bn1 = BatchNorm2d(self.inplanes, momentum=_v0)
self.conv2 = nn.Conv2d(self.inplanes, self.inplanes, kernel_size=3, stride=2, padding=1, bias=False)
self.bn2 = BatchNorm2d(self.inplanes, momentum=_v0)
self.relu = nn.ReLU(inplace=True)
self.layer1 = self._make_layer(_B1, 64, 64, 4)
self.stage2_cfg = extra["STAGE2"]
num_channels = [extra["STAGE2"]["NUM_CHANNELS"][i] * _d0[extra["STAGE2"]["BLOCK"]].expansion for i in range(len(extra["STAGE2"]["NUM_CHANNELS"]))]
self.transition1 = self._make_transition_layer([256], num_channels)
self.stage2, pre_stage_channels = self._make_stage(self.stage2_cfg, num_channels)
self.stage3_cfg = extra["STAGE3"]
num_channels = [extra["STAGE3"]["NUM_CHANNELS"][i] * _d0[extra["STAGE3"]["BLOCK"]].expansion for i in range(len(extra["STAGE3"]["NUM_CHANNELS"]))]
self.transition2 = self._make_transition_layer(pre_stage_channels, num_channels)
self.stage3, pre_stage_channels = self._make_stage(self.stage3_cfg, num_channels)
self.stage4_cfg = extra["STAGE4"]
num_channels = [extra["STAGE4"]["NUM_CHANNELS"][i] * _d0[extra["STAGE4"]["BLOCK"]].expansion for i in range(len(extra["STAGE4"]["NUM_CHANNELS"]))]
self.transition3 = self._make_transition_layer(pre_stage_channels, num_channels)
self.stage4, pre_stage_channels = self._make_stage(self.stage4_cfg, num_channels, multi_scale_output=True)
self.upsample = nn.Upsample(scale_factor=2, mode="nearest")
final_inp_channels = sum(pre_stage_channels) + self.inplanes
self.head = nn.Sequential(
nn.Conv2d(final_inp_channels, final_inp_channels, kernel_size=1),
BatchNorm2d(final_inp_channels, momentum=_v0),
nn.ReLU(inplace=True),
nn.Conv2d(final_inp_channels, config["MODEL"]["NUM_JOINTS"], kernel_size=extra["FINAL_CONV_KERNEL"]),
nn.Softmax(dim=1) if not self.lines else nn.Sigmoid(),
)
def _make_head(self, x: torch.Tensor, x_skip: torch.Tensor) -> torch.Tensor:
x = self.upsample(x)
x = torch.cat([x, x_skip], dim=1)
return self.head(x)
def _make_transition_layer(self, num_channels_pre_layer: list, num_channels_cur_layer: list) -> nn.ModuleList:
num_branches_cur = len(num_channels_cur_layer)
num_branches_pre = len(num_channels_pre_layer)
transition_layers = []
for i in range(num_branches_cur):
if i < num_branches_pre:
if num_channels_cur_layer[i] != num_channels_pre_layer[i]:
transition_layers.append(nn.Sequential(
nn.Conv2d(num_channels_pre_layer[i], num_channels_cur_layer[i], 3, 1, 1, bias=False),
BatchNorm2d(num_channels_cur_layer[i], momentum=_v0),
nn.ReLU(inplace=True),
))
else:
transition_layers.append(None)
else:
conv3x3s = []
for j in range(i + 1 - num_branches_pre):
inchannels = num_channels_pre_layer[-1]
outchannels = num_channels_cur_layer[i] if j == i - num_branches_pre else inchannels
conv3x3s.append(nn.Sequential(
nn.Conv2d(inchannels, outchannels, 3, 2, 1, bias=False),
BatchNorm2d(outchannels, momentum=_v0),
nn.ReLU(inplace=True),
))
transition_layers.append(nn.Sequential(*conv3x3s))
return nn.ModuleList(transition_layers)
def _make_layer(self, block: type, inplanes: int, planes: int, blocks: int, stride: int = 1) -> nn.Sequential:
downsample = None
if stride != 1 or inplanes != planes * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
BatchNorm2d(planes * block.expansion, momentum=_v0),
)
layers = [block(inplanes, planes, stride, downsample)]
inplanes = planes * block.expansion
for _ in range(1, blocks):
layers.append(block(inplanes, planes))
return nn.Sequential(*layers)
def _make_stage(self, layer_config: dict, num_inchannels: list, multi_scale_output: bool = True) -> tuple:
num_modules = layer_config["NUM_MODULES"]
num_blocks = layer_config["NUM_BLOCKS"]
num_channels = layer_config["NUM_CHANNELS"]
block = _d0[layer_config["BLOCK"]]
fuse_method = layer_config["FUSE_METHOD"]
modules = []
for i in range(num_modules):
            reset_multi_scale_output = multi_scale_output or i != num_modules - 1
modules.append(_H0(
layer_config["NUM_BRANCHES"], block, num_blocks, num_inchannels, num_channels,
fuse_method, reset_multi_scale_output,
))
num_inchannels = modules[-1].get_num_inchannels()
return nn.Sequential(*modules), num_inchannels
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.conv1(x)
x_skip = x.clone()
x = self.bn1(x)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x)
x = self.relu(x)
x = self.layer1(x)
x_list = [self.transition1[i](x) if self.transition1[i] is not None else x for i in range(self.stage2_cfg["NUM_BRANCHES"])]
y_list = self.stage2(x_list)
x_list = [self.transition2[i](y_list[-1]) if self.transition2[i] is not None else y_list[i] for i in range(self.stage3_cfg["NUM_BRANCHES"])]
y_list = self.stage3(x_list)
x_list = [self.transition3[i](y_list[-1]) if self.transition3[i] is not None else y_list[i] for i in range(self.stage4_cfg["NUM_BRANCHES"])]
x = self.stage4(x_list)
height, width = x[0].size(2), x[0].size(3)
x1 = F.interpolate(x[1], size=(height, width), mode="bilinear", align_corners=False)
x2 = F.interpolate(x[2], size=(height, width), mode="bilinear", align_corners=False)
x3 = F.interpolate(x[3], size=(height, width), mode="bilinear", align_corners=False)
x = torch.cat([x[0], x1, x2, x3], 1)
return self._make_head(x, x_skip)
def init_weights(self, pretrained: str = "") -> None:
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
if pretrained and os.path.isfile(pretrained):
w = torch.load(pretrained, map_location="cpu", weights_only=False)
self.load_state_dict({k: v for k, v in w.items() if k in self.state_dict()}, strict=False)
def _g0(config: dict, pretrained: str = "", **kwargs: Any) -> _H1:
model = _H1(config, **kwargs)
model.init_weights(pretrained)
return model
_K0 = {
1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23,
11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29,
28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20,
45: 9, 50: 31, 52: 32, 57: 22,
}
def _p0(frames: list) -> torch.Tensor:
target_size = (540, 960)
batch = []
for frame in frames:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = cv2.resize(frame_rgb, (target_size[1], target_size[0]))
img = img.astype(np.float32) / 255.0
img = np.transpose(img, (2, 0, 1))
batch.append(img)
return torch.from_numpy(np.stack(batch)).float()
def _e0(heatmap: torch.Tensor, scale: int = 2, max_keypoints: int = 1) -> torch.Tensor:
batch_size, n_channels, height, width = heatmap.shape
max_pooled = F.max_pool2d(heatmap, 3, stride=1, padding=1)
local_maxima = max_pooled == heatmap
masked_heatmap = heatmap * local_maxima
flat_heatmap = masked_heatmap.view(batch_size, n_channels, -1)
scores, indices = torch.topk(flat_heatmap, max_keypoints, dim=-1, sorted=False)
y_coords = torch.div(indices, width, rounding_mode="floor") * scale
x_coords = (indices % width) * scale
return torch.stack([x_coords.float(), y_coords.float(), scores], dim=-1)
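# Illustrative sketch (added for documentation; not called by the pipeline):
# _e0 finds heatmap peaks via the max-pool trick, where a pixel is a local
# maximum iff 3x3 max-pooling leaves it unchanged. With one synthetic peak the
# result is that peak's (x, y) scaled back to input resolution, plus its score.
def _demo_e0() -> torch.Tensor:
    hm = torch.zeros(1, 1, 4, 4)
    hm[0, 0, 1, 2] = 0.9  # one synthetic peak at row 1, col 2
    # With scale=2 the returned (x, y) is (4, 2) with score 0.9;
    # output shape is (batch, channels, max_keypoints, 3).
    return _e0(hm, scale=2, max_keypoints=1)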
def _p1(kp_coords: torch.Tensor, kp_threshold: float, w: int, h: int, batch_size: int) -> list:
kp_np = kp_coords.cpu().numpy()
batch_results = []
for batch_idx in range(batch_size):
kp_dict = {}
valid_kps = kp_np[batch_idx, :, 0, 2] > kp_threshold
for ch_idx in np.where(valid_kps)[0]:
x = float(kp_np[batch_idx, ch_idx, 0, 0]) / w
y = float(kp_np[batch_idx, ch_idx, 0, 1]) / h
p = float(kp_np[batch_idx, ch_idx, 0, 2])
kp_dict[int(ch_idx) + 1] = {"x": x, "y": y, "p": p}
batch_results.append(kp_dict)
return batch_results
def _g1(kp_points: dict) -> dict:
return {_K0[k]: v for k, v in kp_points.items() if k in _K0}
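# Illustrative sketch (added for documentation; not called by the pipeline):
# _K0 remaps raw heatmap channel IDs to challenge keypoint indices; channels
# without a mapping are dropped by _g1.
def _demo_remap() -> dict:
    # Channel 45 maps to keypoint 9; channel 12 has no entry and is discarded.
    # -> {9: {"x": 0.5, "y": 0.5, "p": 0.9}}
    return _g1({45: {"x": 0.5, "y": 0.5, "p": 0.9}, 12: {"x": 0.1, "y": 0.1, "p": 0.8}})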
def _i0(frames: list, model: nn.Module, kp_threshold: float, device: str, batch_size: int = 2) -> list:
results = []
model_device = next(model.parameters()).device
for i in range(0, len(frames), batch_size):
current_batch_size = min(batch_size, len(frames) - i)
batch_frames = frames[i : i + current_batch_size]
batch = _p0(batch_frames).to(model_device)
with torch.no_grad():
heatmaps = model(batch)
kp_coords = _e0(heatmaps[:, :-1, :, :], scale=2, max_keypoints=1)
batch_results = _p1(kp_coords, kp_threshold, 960, 540, current_batch_size)
results.extend([_g1(kp) for kp in batch_results])
del heatmaps, kp_coords, batch
gc.collect()
if model_device.type == "cuda":
torch.cuda.empty_cache()
return results
def _x0(frames: list, model: nn.Module, kp_threshold: float, device: str = "cpu", batch_size: int = 2) -> list:
return _i0(frames, model, kp_threshold, device, batch_size)
def _n0(keypoints_result: list | None, batch_images: list, n_keypoints: int) -> list:
keypoints = []
if keypoints_result is not None and len(keypoints_result) > 0:
for frame_number_in_batch, kp_dict in enumerate(keypoints_result):
if frame_number_in_batch >= len(batch_images):
break
frame_keypoints: List[Tuple[int, int]] = []
try:
height, width = batch_images[frame_number_in_batch].shape[:2]
if kp_dict is not None and isinstance(kp_dict, dict):
for idx in range(32):
x, y, p = 0, 0, 0
kp_idx = idx + 1
if kp_idx in kp_dict:
try:
kp_data = kp_dict[kp_idx]
if isinstance(kp_data, dict) and "x" in kp_data and "y" in kp_data:
x = int(kp_data["x"] * width)
y = int(kp_data["y"] * height)
except Exception as e:
pass
frame_keypoints.append((x, y))
except (IndexError, ValueError, AttributeError):
frame_keypoints = [(0, 0)] * 32
if len(frame_keypoints) < n_keypoints:
frame_keypoints.extend([(0, 0)] * (n_keypoints - len(frame_keypoints)))
else:
frame_keypoints = frame_keypoints[:n_keypoints]
keypoints.append(frame_keypoints)
return keypoints
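# _f1 pads or truncates a frame's keypoint list to n_keypoints, then applies
# hand-written plausibility swaps: when a slot that should be filled is empty
# but a geometrically inconsistent neighbour slot is occupied, the point is
# moved into the expected slot and the old slot is zeroed. Slot indices are
# zero-based positions into the 32-point template (see KEYPOINTS below).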
def _f1(frame_keypoints: list, n_keypoints: int) -> list:
if len(frame_keypoints) < n_keypoints:
frame_keypoints = list(frame_keypoints) + [(0, 0)] * (n_keypoints - len(frame_keypoints))
elif len(frame_keypoints) > n_keypoints:
frame_keypoints = list(frame_keypoints)[:n_keypoints]
else:
frame_keypoints = list(frame_keypoints)
if frame_keypoints[2] != (0, 0) and frame_keypoints[4] != (0, 0) and frame_keypoints[3] == (0, 0):
frame_keypoints[3], frame_keypoints[4] = frame_keypoints[4], (0, 0)
if frame_keypoints[0] != (0, 0) and frame_keypoints[4] != (0, 0) and frame_keypoints[1] == (0, 0):
frame_keypoints[1], frame_keypoints[4] = frame_keypoints[4], (0, 0)
if frame_keypoints[2] != (0, 0) and frame_keypoints[3] != (0, 0) and frame_keypoints[1] == (0, 0) and frame_keypoints[3][0] > frame_keypoints[2][0]:
frame_keypoints[1], frame_keypoints[3] = frame_keypoints[3], (0, 0)
if frame_keypoints[28] != (0, 0) and frame_keypoints[25] == (0, 0) and frame_keypoints[26] != (0, 0) and frame_keypoints[26][0] > frame_keypoints[28][0]:
frame_keypoints[25], frame_keypoints[28] = frame_keypoints[28], (0, 0)
if frame_keypoints[24] != (0, 0) and frame_keypoints[28] != (0, 0) and frame_keypoints[25] == (0, 0):
frame_keypoints[25], frame_keypoints[28] = frame_keypoints[28], (0, 0)
if frame_keypoints[24] != (0, 0) and frame_keypoints[27] != (0, 0) and frame_keypoints[26] == (0, 0):
frame_keypoints[26], frame_keypoints[27] = frame_keypoints[27], (0, 0)
if frame_keypoints[28] != (0, 0) and frame_keypoints[23] == (0, 0) and frame_keypoints[20] != (0, 0) and frame_keypoints[20][1] > frame_keypoints[23][1]:
frame_keypoints[23], frame_keypoints[20] = frame_keypoints[20], (0, 0)
return frame_keypoints
def _c1(keypoints: list) -> list:
return [[round(float(x), 1), round(float(y), 1)] for x, y in keypoints]
def _l0(model_dir: Path, device: str | None = None, config_name: str = "hrnetv2_w48.yaml", weights_subdir: str | None = None) -> nn.Module:
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
config_path = model_dir / config_name
weights_path = (model_dir / weights_subdir / "keypoint") if weights_subdir else (model_dir / "keypoint")
if not config_path.exists():
raise FileNotFoundError(f"Keypoint config not found: {config_path}")
if not weights_path.exists():
raise FileNotFoundError(f"Keypoint weights not found: {weights_path}")
with open(config_path) as f:
cfg = yaml.safe_load(f)
loaded = torch.load(weights_path, map_location=device, weights_only=False)
state = loaded.get("state_dict", loaded) if isinstance(loaded, dict) else loaded
if not isinstance(state, dict):
raise ValueError(f"Keypoint weights must be state_dict or dict with 'state_dict'; got {type(state)}")
if state and next(iter(state.keys()), "").startswith("module."):
state = {k.replace("module.", "", 1): v for k, v in state.items()}
def _remap_head(k: str) -> str:
if k.startswith("head.0."):
return "head." + k[7:]
return k
state = {_remap_head(k): v for k, v in state.items()}
model = _g0(cfg)
model.load_state_dict(state, strict=True)
model.to(device)
model.eval()
return model
_C0 = 0
_C1 = 1
_C2 = 2
_C3 = 3
_D0 = 1280
_D1 = 0.4
_T0 = 0.5
_R0 = 5
_R1 = 0.10
_R2 = 0.70
_R3 = 8
kp_batch_size = 2
onnx_batch_size = 8
_q0 = 0.006719
_q1 = 0.010711
_P0 = True
_E0: bool = True
_E1: bool = True
_A0: bool = True
_S0 = 4
_F0: list[tuple[float, float]] = [
(5, 5), (5, 140), (5, 250), (5, 430), (5, 540), (5, 675), (55, 250), (55, 430),
(110, 340), (165, 140), (165, 270), (165, 410), (165, 540), (527, 5), (527, 253),
(527, 433), (527, 675), (888, 140), (888, 270), (888, 410), (888, 540), (940, 340),
(998, 250), (998, 430), (1045, 5), (1045, 140), (1045, 250), (1045, 430), (1045, 540),
(1045, 675), (435, 340), (615, 340),
]
_F1: list[tuple[float, float]] = [
(2.5, 2.5), (2.5, 139.5), (2.5, 249.5), (2.5, 430.5), (2.5, 540.5), (2.5, 678),
(54.5, 249.5), (54.5, 430.5), (110.5, 340.5), (164.5, 139.5), (164.5, 269), (164.5, 411),
(164.5, 540.5), (525, 2.5), (525, 249.5), (525, 430.5), (525, 678), (886.5, 139.5),
(886.5, 269), (886.5, 411), (886.5, 540.5), (940.5, 340.5), (998, 249.5), (998, 430.5),
(1048, 2.5), (1048, 139.5), (1048, 249.5), (1048, 430.5), (1048, 540.5), (1048, 678),
(434.5, 340), (615.5, 340),
]
_S1 = True
class _Bx(BaseModel):
x1: int
y1: int
x2: int
y2: int
cls_id: int
conf: float
team_id: str | None = None
class _FRes(BaseModel):
frame_id: int
boxes: list[dict]
keypoints: list[list[float]]
class _Cfg:
    def __init__(self, min_area: int = 1300, overlap_iou: float = 0.91):
        self.min_area = int(min_area)  # accepted but previously silently dropped
        self.overlap_iou = overlap_iou
def _d1(bb: _Bx, cy: float) -> float:
my = 0.5 * (float(bb.y1) + float(bb.y2))
return (my - cy) ** 2
def _i1(a: _Bx, b: _Bx) -> float:
ax1, ay1, ax2, ay2 = int(a.x1), int(a.y1), int(a.x2), int(a.y2)
bx1, by1, bx2, by2 = int(b.x1), int(b.y1), int(b.x2), int(b.y2)
ix1, iy1 = max(ax1, bx1), max(ay1, by1)
ix2, iy2 = min(ax2, bx2), min(ay2, by2)
iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1)
inter = iw * ih
if inter <= 0:
return 0.0
area_a = (ax2 - ax1) * (ay2 - ay1)
area_b = (bx2 - bx1) * (by2 - by1)
union = area_a + area_b - inter
return inter / union if union > 0 else 0.0
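# Illustrative sketch (added for documentation; not called by the pipeline):
# _i1 on a hand-checkable pair of boxes; intersection 5*5 = 25,
# union 100 + 100 - 25 = 175, so IoU ~= 0.143.
def _demo_iou() -> float:
    a = _Bx(x1=0, y1=0, x2=10, y2=10, cls_id=_C2, conf=0.9)
    b = _Bx(x1=5, y1=5, x2=15, y2=15, cls_id=_C2, conf=0.8)
    return _i1(a, b)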
def _s0(
results: list[_FRes],
window: int = _S0,
tids_by_frame: dict[int, list[int | None]] | None = None,
) -> list[_FRes]:
if window <= 1 or not results:
return results
fid_to_idx = {r.frame_id: i for i, r in enumerate(results)}
trajectories: dict[int, list[tuple[int, int, _Bx]]] = {}
for i, r in enumerate(results):
boxes_as_bx = [_Bx(**b) if isinstance(b, dict) else b for b in r.boxes]
for j, bb in enumerate(boxes_as_bx):
tid = tids_by_frame.get(r.frame_id, [None] * len(r.boxes))[j] if tids_by_frame else None
if tid is not None and tid >= 0:
tid = int(tid)
if tid not in trajectories:
trajectories[tid] = []
trajectories[tid].append((r.frame_id, j, bb))
smoothed: dict[tuple[int, int], tuple[int, int, int, int]] = {}
half = window // 2
for tid, items in trajectories.items():
items.sort(key=lambda x: x[0])
n = len(items)
for k in range(n):
fid, box_idx, bb = items[k]
result_idx = fid_to_idx[fid]
lo = max(0, k - half)
hi = min(n, k + half + 1)
cx_list = []
cy_list = []
w_list = []
h_list = []
for m in range(lo, hi):
b = items[m][2]
cx_list.append(0.5 * (b.x1 + b.x2))
cy_list.append(0.5 * (b.y1 + b.y2))
w_list.append(b.x2 - b.x1)
h_list.append(b.y2 - b.y1)
cx_avg = sum(cx_list) / len(cx_list)
cy_avg = sum(cy_list) / len(cy_list)
w_avg = sum(w_list) / len(w_list)
h_avg = sum(h_list) / len(h_list)
x1_new = int(round(cx_avg - w_avg / 2))
y1_new = int(round(cy_avg - h_avg / 2))
x2_new = int(round(cx_avg + w_avg / 2))
y2_new = int(round(cy_avg + h_avg / 2))
smoothed[(result_idx, box_idx)] = (x1_new, y1_new, x2_new, y2_new)
out: list[_FRes] = []
for i, r in enumerate(results):
boxes_as_bx = [_Bx(**b) if isinstance(b, dict) else b for b in r.boxes]
new_boxes: list[_Bx] = []
for j, bb in enumerate(boxes_as_bx):
key = (i, j)
if key in smoothed:
x1, y1, x2, y2 = smoothed[key]
new_boxes.append(
_Bx(
x1=x1,
y1=y1,
x2=x2,
y2=y2,
cls_id=int(bb.cls_id),
conf=float(bb.conf),
team_id=bb.team_id,
)
)
else:
new_boxes.append(
_Bx(
x1=int(bb.x1),
y1=int(bb.y1),
x2=int(bb.x2),
y2=int(bb.y2),
cls_id=int(bb.cls_id),
conf=float(bb.conf),
team_id=bb.team_id,
)
)
        out.append(_FRes(
            frame_id=r.frame_id,
            boxes=[
                {"x1": b.x1, "y1": b.y1, "x2": b.x2, "y2": b.y2,
                 "cls_id": b.cls_id, "conf": b.conf, "team_id": b.team_id}
                for b in new_boxes
            ],
            keypoints=r.keypoints,
        ))
return out
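# Illustrative sketch (added for documentation; not called by the pipeline):
# _s0 applies a centered moving average over each track's box centers and
# sizes. Here a single track (hypothetical tid=7) has a jittery middle box
# that window=3 pulls toward the neighbourhood average.
def _demo_smoothing() -> list[_FRes]:
    def box(x: int) -> dict:
        return {"x1": x, "y1": 0, "x2": x + 10, "y2": 20,
                "cls_id": _C2, "conf": 0.9, "team_id": None}
    frames = [_FRes(frame_id=i, boxes=[box(x)], keypoints=[])
              for i, x in enumerate((0, 8, 4))]
    # Middle box center moves from 13 to the window mean 9 -> (4, 0, 14, 20).
    return _s0(frames, window=3, tids_by_frame={0: [7], 1: [7], 2: [7]})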
def _a0(
bboxes: Iterable[_Bx],
*,
frame_width: int,
frame_height: int,
cfg: _Cfg | None = None,
do_goalkeeper_dedup: bool = True,
do_referee_disambiguation: bool = True,
) -> list[_Bx]:
cfg = cfg or _Cfg()
W, H = int(frame_width), int(frame_height)
cy = 0.5 * float(H)
kept: list[_Bx] = list(bboxes or [])
if cfg.overlap_iou > 0 and len(kept) > 1:
balls = [bb for bb in kept if int(bb.cls_id) == _C0]
non_balls = [bb for bb in kept if int(bb.cls_id) != _C0]
if len(non_balls) > 1:
non_balls_sorted = sorted(non_balls, key=lambda bb: float(bb.conf), reverse=True)
kept_nb = []
for cand in non_balls_sorted:
skip = False
for k in kept_nb:
iou = _i1(cand, k)
if iou >= cfg.overlap_iou:
skip = True
break
if (
abs(int(cand.x1) - int(k.x1)) <= 3
and abs(int(cand.y1) - int(k.y1)) <= 3
and abs(int(cand.x2) - int(k.x2)) <= 3
and abs(int(cand.y2) - int(k.y2)) <= 3
and iou > 0.85
):
skip = True
break
if not skip:
kept_nb.append(cand)
kept = kept_nb + balls
if do_goalkeeper_dedup:
gks = [bb for bb in kept if int(bb.cls_id) == _C1]
if len(gks) > 1:
best_gk = max(gks, key=lambda bb: float(bb.conf))
best_gk_conf = float(best_gk.conf)
deduped = []
for bb in kept:
if int(bb.cls_id) == _C1:
if float(bb.conf) < best_gk_conf or (float(bb.conf) == best_gk_conf and bb is not best_gk):
deduped.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=_C2, conf=float(bb.conf), team_id="1"))
else:
deduped.append(bb)
else:
deduped.append(bb)
kept = deduped
if do_referee_disambiguation:
refs = [bb for bb in kept if int(bb.cls_id) == _C3]
if len(refs) > 1:
best_ref = min(refs, key=lambda bb: _d1(bb, cy))
kept = [bb for bb in kept if int(bb.cls_id) != _C3 or bb is best_ref]
return kept
def _k0(feats: np.ndarray, iters: int = 20) -> tuple[np.ndarray, np.ndarray]:
n, d = feats.shape
if n <= 0:
return np.zeros((2, d), dtype=np.float32), np.zeros(0, dtype=np.int64)
if n == 1:
return np.stack([feats[0], feats[0]], axis=0), np.zeros(1, dtype=np.int64)
c0 = feats[0]
d0 = np.linalg.norm(feats - c0[None, :], axis=1)
c1 = feats[int(np.argmax(d0))]
d1 = np.linalg.norm(feats - c1[None, :], axis=1)
c0 = feats[int(np.argmax(d1))]
centroids = np.stack([c0, c1], axis=0).astype(np.float32)
labels = np.zeros(n, dtype=np.int64)
for _ in range(iters):
dist = ((feats[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
labels = dist.argmin(axis=1)
for k in (0, 1):
sel = feats[labels == k]
if len(sel) > 0:
centroids[k] = sel.mean(axis=0)
return centroids, labels
def _m0(prev: np.ndarray, new: np.ndarray) -> np.ndarray:
d00 = np.sum((prev[0] - new[0]) ** 2)
d11 = np.sum((prev[1] - new[1]) ** 2)
d01 = np.sum((prev[0] - new[1]) ** 2)
d10 = np.sum((prev[1] - new[0]) ** 2)
if d00 + d11 <= d01 + d10:
return new
return np.stack([new[1], new[0]], axis=0)
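# Illustrative sketch (added for documentation; not called by the pipeline):
# _k0 splits two tight groups of embeddings with 2-means, and _m0 reorders
# freshly fitted centroids so team labels stay stable between refits.
def _demo_team_clustering() -> np.ndarray:
    feats = np.array([[0.0, 0.0], [0.1, 0.0], [5.0, 5.0], [5.1, 5.0]], dtype=np.float32)
    centroids, labels = _k0(feats)  # labels -> [0, 0, 1, 1]
    swapped = centroids[::-1].copy()  # simulate a refit that flipped cluster order
    return _m0(centroids, swapped)  # -> swapped back into the original order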
try:
import onnxruntime as _ort
_HAS_ONNXRUNTIME = True
except ImportError:
_HAS_ONNXRUNTIME = False
class _ReidE:
def __init__(self, onnx_path: Path, input_height: int = 256, input_width: int = 128):
if not _HAS_ONNXRUNTIME:
raise RuntimeError("onnxruntime required for ReID; pip install onnxruntime")
        self.session = _ort.InferenceSession(
            str(onnx_path),
            providers=[
                (
                    "CUDAExecutionProvider",
                    {
                        "device_id": 0,
                        "cudnn_conv_algo_search": "HEURISTIC",  # disables exhaustive tuning
                        "enable_cuda_graph": False,
                    },
                )
            ],
        )
print("Active providers:", self.session.get_providers())
self.input_height = int(input_height)
self.input_width = int(input_width)
self._input_name = self.session.get_inputs()[0].name
dummy_input = np.zeros((_R3, 3, self.input_height, self.input_width), dtype=np.float32)
try:
self.session.run(None, {self._input_name: dummy_input})
        except Exception:
            pass  # warm-up failures are non-fatal; real inputs will surface errors
def extract(
self,
frame_bgr: np.ndarray,
xyxy: tuple[int, int, int, int],
timings: Optional[dict[str, float]] = None,
) -> Optional[np.ndarray]:
x1, y1, x2, y2 = map(int, xyxy)
t0 = time.perf_counter()
H, W = frame_bgr.shape[:2]
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(W, x2), min(H, y2)
if x2 <= x1 or y2 <= y1:
if timings is not None:
timings["crop_preprocess"] = timings.get("crop_preprocess", 0.0) + (time.perf_counter() - t0)
return None
bh, bw = y2 - y1, x2 - x1
if bh > bw and bw > 0:
y1_crop = y1 + int(round(_R1 * bh))
y2_crop = y1 + int(round(_R2 * bh))
y1 = min(y1_crop, y2 - 1)
y2 = max(y2_crop, y1 + 1)
if y2 <= y1:
if timings is not None:
timings["crop_preprocess"] = timings.get("crop_preprocess", 0.0) + (time.perf_counter() - t0)
return None
crop = frame_bgr[y1:y2, x1:x2]
if crop.size == 0:
if timings is not None:
timings["crop_preprocess"] = timings.get("crop_preprocess", 0.0) + (time.perf_counter() - t0)
return None
rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb, (self.input_width, self.input_height), interpolation=cv2.INTER_LINEAR)
blob = resized.transpose(2, 0, 1).astype(np.float32) / 255.0
mean = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(3, 1, 1)
std = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(3, 1, 1)
blob = (blob - mean) / std
blob = blob[np.newaxis, ...].astype(np.float32)
t1 = time.perf_counter()
if timings is not None:
timings["crop_preprocess"] = timings.get("crop_preprocess", 0.0) + (t1 - t0)
try:
t2 = time.perf_counter()
out = self.session.run(None, {self._input_name: blob})[0]
t3 = time.perf_counter()
if timings is not None:
timings["onnx_inference"] = timings.get("onnx_inference", 0.0) + (t3 - t2)
except Exception:
if timings is not None:
timings["onnx_inference"] = timings.get("onnx_inference", 0.0)
return None
t4 = time.perf_counter()
if out is None or out.size == 0:
if timings is not None:
timings["postprocess_normalize"] = timings.get("postprocess_normalize", 0.0) + (time.perf_counter() - t4)
return None
emb = out.flatten().astype(np.float32)
n = float(np.linalg.norm(emb))
if n < 1e-6:
if timings is not None:
timings["postprocess_normalize"] = timings.get("postprocess_normalize", 0.0) + (time.perf_counter() - t4)
return None
if timings is not None:
timings["postprocess_normalize"] = timings.get("postprocess_normalize", 0.0) + (time.perf_counter() - t4)
return emb / n
def extract_batch(
self,
frame_xyxy_list: list[tuple[ndarray, tuple[int, int, int, int]]],
batch_size: int = _R3,
timings: Optional[dict[str, float]] = None,
) -> list[Optional[np.ndarray]]:
if not frame_xyxy_list:
return []
n = len(frame_xyxy_list)
out: list[Optional[np.ndarray]] = [None] * n
t_pre = time.perf_counter()
blobs: list[tuple[int, np.ndarray]] = []
for idx, (frame_bgr, xyxy) in enumerate(frame_xyxy_list):
x1, y1, x2, y2 = map(int, xyxy)
H, W = frame_bgr.shape[:2]
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(W, x2), min(H, y2)
if x2 <= x1 or y2 <= y1:
continue
bh, bw = y2 - y1, x2 - x1
if bh > bw and bw > 0:
y1_crop = y1 + int(round(_R1 * bh))
y2_crop = y1 + int(round(_R2 * bh))
y1 = min(y1_crop, y2 - 1)
y2 = max(y2_crop, y1 + 1)
if y2 <= y1:
continue
crop = frame_bgr[y1:y2, x1:x2]
if crop.size == 0:
continue
rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb, (self.input_width, self.input_height), interpolation=cv2.INTER_LINEAR)
blob = resized.transpose(2, 0, 1).astype(np.float32) / 255.0
mean = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(3, 1, 1)
std = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(3, 1, 1)
blob = (blob - mean) / std
blob = blob[np.newaxis, ...].astype(np.float32)
blobs.append((idx, blob))
if timings is not None:
timings["crop_preprocess"] = timings.get("crop_preprocess", 0.0) + (time.perf_counter() - t_pre)
if not blobs:
return out
t_infer_start = time.perf_counter()
batch_size = max(1, min(batch_size, len(blobs)))
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
for start in range(0, len(blobs), batch_size):
chunk = blobs[start : start + batch_size]
indices = [c[0] for c in chunk]
batch_blob = np.concatenate([c[1] for c in chunk], axis=0)
# Pad batch to strictly enforce consistent shape
actual_batch_size = batch_blob.shape[0]
if actual_batch_size < batch_size:
pad_width = batch_size - actual_batch_size
padding = np.zeros((pad_width, *batch_blob.shape[1:]), dtype=batch_blob.dtype)
batch_blob = np.concatenate([batch_blob, padding], axis=0)
try:
run_out = self.session.run(None, {self._input_name: batch_blob})[0]
run_out = run_out[:actual_batch_size]
except Exception:
continue
# print(f"Onnx time: {time.time() - onnx_time}")
t_post_start = time.perf_counter()
if timings is not None:
timings["onnx_inference"] = timings.get("onnx_inference", 0.0) + (t_post_start - t_infer_start)
for i, orig_idx in enumerate(indices):
if i >= run_out.shape[0]:
continue
emb = run_out[i].astype(np.float32, copy=False)
nrm = float(np.linalg.norm(emb))
if nrm >= 1e-6:
out[orig_idx] = emb / nrm
if timings is not None:
timings["postprocess_normalize"] = timings.get("postprocess_normalize", 0.0) + (time.perf_counter() - t_post_start)
t_infer_start = time.perf_counter()
# print(f"Time taken: {time.time() - start_time}")
return out
class _ReidT:
def __init__(self, centroids: Optional[np.ndarray] = None, ema_alpha: float = 0.8):
self.centroids = centroids
self.ema_alpha = float(ema_alpha)
def assign(self, feats: list, use_for_centroid: Optional[list] = None) -> list[int]:
all_valid = [f for f in feats if f is not None]
valid = all_valid
if use_for_centroid is not None and len(use_for_centroid) == len(feats):
valid = [f for i, f in enumerate(feats) if f is not None and use_for_centroid[i]]
if self.centroids is None and len(valid) < 2 and len(all_valid) >= 2:
valid = all_valid
min_required = 2 if self.centroids is None else max(2, _R0)
if len(valid) >= min_required:
X = np.stack(valid, axis=0).astype(np.float32)
c_new, _labels = _k0(X, iters=20)
frac0 = float((_labels == 0).mean())
frac1 = float((_labels == 1).mean())
sep = float(np.linalg.norm(c_new[0] - c_new[1])) if len(c_new) == 2 else 0.0
do_update = (min(frac0, frac1) >= 0.15 and sep >= 0.05) or (self.centroids is None and min(frac0, frac1) >= 0.10)
if do_update:
if self.centroids is None:
self.centroids = c_new.copy()
else:
c_new = _m0(self.centroids, c_new)
a = self.ema_alpha
self.centroids = a * self.centroids + (1.0 - a) * c_new
out = []
for f in feats:
if f is None or self.centroids is None:
out.append(1)
continue
d0 = float(np.sum((f - self.centroids[0]) ** 2))
d1 = float(np.sum((f - self.centroids[1]) ** 2))
out.append(1 if d0 <= d1 else 2)
return out
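# Illustrative sketch (added for documentation; not called by the pipeline):
# _ReidT.assign bootstraps centroids from the first batch of embeddings, then
# labels each embedding 1 or 2 by nearest centroid. Two clean 2-D "embedding"
# groups split into teams [1, 1, 1, 2, 2, 2].
def _demo_team_assign() -> list[int]:
    feats = [np.array([1.0, 0.0], dtype=np.float32)] * 3 + \
            [np.array([0.0, 1.0], dtype=np.float32)] * 3
    return _ReidT().assign(feats)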
def challenge_template(path_hf_repo: str) -> ndarray:
    return imread(f"{path_hf_repo}/football_pitch_template.png")
current_path = str(os.path.dirname(os.path.abspath(__file__)))
template_image = challenge_template(current_path)
template_image_gray = cvtColor(template_image, COLOR_BGR2GRAY)
_sparse_template_cache: dict[tuple[int, int], list[tuple[int, int]]] = {}
_shared_eval_executor: ThreadPoolExecutor | None = None
class MaxSizeCache(OrderedDict):
"""
Fixed-size dictionary behaving like a deque(maxlen=N).
Stores key–value pairs with FIFO eviction.
"""
def __init__(self, maxlen=500):
super().__init__()
self.maxlen = maxlen
self._lock = threading.Lock()
def set(self, key, value):
"""Insert or update an item. Evicts oldest if full."""
with self._lock:
if key in self:
del self[key] # refresh position
super().__setitem__(key, value)
if len(self) > self.maxlen:
self.popitem(last=False) # remove oldest
def get(self, key, default=None):
"""Retrieve an item without changing order."""
with self._lock:
return super().get(key, default)
def exists(self, key):
"""Check if a key exists."""
with self._lock:
return key in self
def load(self, data_dict):
"""
Load initial data into cache.
Oldest items evicted if data exceeds maxlen.
"""
for k, v in data_dict.items():
self.set(k, v)
def __repr__(self):
return f"MaxSizeCache(maxlen={self.maxlen}, data={dict(self)})"
cached = MaxSizeCache()
_per_key_locks = defaultdict(threading.Lock)
def get_or_compute_masks(key, compute_fn):
lock = _per_key_locks[key]
with lock:
if cached.exists(key):
return cached.get(key)
# compute once
masks = compute_fn()
cached.set(key, masks)
return masks
INDEX_KEYPOINT_CORNER_BOTTOM_LEFT = 5
INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT = 29
INDEX_KEYPOINT_CORNER_TOP_LEFT = 0
INDEX_KEYPOINT_CORNER_TOP_RIGHT = 24
KEYPOINTS: list[tuple[int, int]] = [
(5, 5), # 1
(5, 140), # 2
(5, 250), # 3
(5, 430), # 4
(5, 540), # 5
(5, 675), # 6
# -------------
(55, 250), # 7
(55, 430), # 8
# -------------
(110, 340), # 9
# -------------
(165, 140), # 10
(165, 270), # 11
(165, 410), # 12
(165, 540), # 13
# -------------
(527, 5), # 14
(527, 253), # 15
(527, 433), # 16
(527, 675), # 17
# -------------
(888, 140), # 18
(888, 270), # 19
(888, 410), # 20
(888, 540), # 21
# -------------
(940, 340), # 22
# -------------
(998, 250), # 23
(998, 430), # 24
# -------------
(1045, 5), # 25
(1045, 140), # 26
(1045, 250), # 27
(1045, 430), # 28
(1045, 540), # 29
(1045, 675), # 30
# -------------
(435, 340), # 31
(615, 340), # 32
]
KEYPOINTS_NP = np.asarray(KEYPOINTS, dtype=np.float32)
FOOTBALL_KEYPOINTS: list[tuple[int, int]] = [
(0, 0), # 1
(0, 0), # 2
(0, 0), # 3
(0, 0), # 4
(0, 0), # 5
(0, 0), # 6
(0, 0), # 7
(0, 0), # 8
(0, 0), # 9
(0, 0), # 10
(0, 0), # 11
(0, 0), # 12
(0, 0), # 13
(0, 0), # 14
(527, 283), # 15
(527, 403), # 16
(0, 0), # 17
(0, 0), # 18
(0, 0), # 19
(0, 0), # 20
(0, 0), # 21
(0, 0), # 22
(0, 0), # 23
(0, 0), # 24
(0, 0), # 25
(0, 0), # 26
(0, 0), # 27
(0, 0), # 28
(0, 0), # 29
(0, 0), # 30
(405, 340), # 31
(645, 340), # 32
]
FOOTBALL_KEYPOINTS_NP = np.asarray(FOOTBALL_KEYPOINTS, dtype=np.float32)
groups = {
1: [2, 3, 7, 10],
2: [1, 3, 7, 10],
3: [2, 4, 7, 8],
4: [3, 5, 8, 7],
5: [4, 8, 6, 3],
6: [5, 4, 8, 13],
7: [3, 8, 9, 10],
8: [4, 7, 9, 13],
9: [7, 8, 11, 12],
10: [9, 11, 7, 2],
11: [9, 10, 12, 31],
12: [9, 11, 13, 31],
13: [9, 12, 8, 5],
14: [15, 31, 32, 16],
15: [31, 16, 32, 14],
16: [31, 15, 32, 17],
17: [31, 16, 32, 15],
18: [19, 22, 23, 26],
19: [18, 22, 20, 32],
20: [19, 22, 21, 32],
21: [20, 22, 24, 29],
22: [23, 24, 19, 20],
23: [27, 24, 22, 28],
24: [28, 23, 22, 27],
25: [26, 27, 23, 18],
26: [25, 27, 23, 18],
27: [26, 23, 28, 24],
28: [27, 24, 29, 23],
29: [28, 30, 24, 21],
30: [29, 28, 24, 21],
31: [15, 16, 32, 14],
32: [15, 31, 16, 14]
}
base_temps = [(0, 0)] * 32
_TEMPLATE_MAX_X: int = 1045
_TEMPLATE_MAX_Y: int = 675
# Precomputed group arrays for faster neighbor lookup (0-based).
GROUPS_ARRAY = [np.asarray(groups[i], dtype=np.int32) - 1 for i in range(1, 33)]
kernel = getStructuringElement(MORPH_RECT, (31, 31))
dilate_kernel = getStructuringElement(
MORPH_RECT, (3, 3)
)
class InvalidMask(Exception):
pass
def has_a_wide_line(mask: ndarray, max_aspect_ratio: float = 1.0) -> bool:
contours, _ = findContours(mask, RETR_EXTERNAL, CHAIN_APPROX_SIMPLE)
for cnt in contours:
x, y, w, h = boundingRect(cnt)
# Early exit optimization
if w == 0 or h == 0:
continue
aspect_ratio = min(w, h) / max(w, h)
if aspect_ratio >= max_aspect_ratio:
return True
return False
def is_bowtie(points: ndarray) -> bool:
    def segments_intersect(p1: ndarray, p2: ndarray, q1: ndarray, q2: ndarray) -> bool:
        def ccw(a: ndarray, b: ndarray, c: ndarray) -> bool:
            return (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0])
        return (ccw(p1, q1, q2) != ccw(p2, q1, q2)) and (
            ccw(p1, p2, q1) != ccw(p1, p2, q2)
        )
pts = points.reshape(-1, 2)
edges = [(pts[0], pts[1]), (pts[1], pts[2]), (pts[2], pts[3]), (pts[3], pts[0])]
return segments_intersect(*edges[0], *edges[2]) or segments_intersect(
*edges[1], *edges[3]
)
def validate_mask_lines(mask: ndarray) -> None:
# Use fast count instead of sum when possible
nonzero_count = countNonZero(mask)
if nonzero_count == 0:
raise InvalidMask("No projected lines")
if nonzero_count == mask.size:
raise InvalidMask("Projected lines cover the entire image surface")
    # Contour-based width check (the most expensive of the three validations).
if has_a_wide_line(mask=mask):
raise InvalidMask("A projected line is too wide")
def validate_mask_ground(mask: ndarray) -> None:
num_labels, _ = connectedComponents(mask)
num_distinct_regions = num_labels - 1
if num_distinct_regions > 1:
raise InvalidMask(
f"Projected ground should be a single object, detected {num_distinct_regions}"
)
    area_covered = mask.sum() / mask.size
    if area_covered >= 0.9:
        raise InvalidMask(
            f"Projected ground covers {area_covered:.2%} of the image surface, which is unrealistic"
        )
def validate_projected_corners(
source_keypoints: list[tuple[int, int]], homography_matrix: ndarray
) -> None:
# Vectorized: use fancy indexing to extract corners
corner_indices = np.array([
INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
INDEX_KEYPOINT_CORNER_TOP_RIGHT,
INDEX_KEYPOINT_CORNER_TOP_LEFT
], dtype=np.int32)
# Convert to array once and index
if isinstance(source_keypoints, np.ndarray):
src_corners = source_keypoints[corner_indices]
else:
src_arr = np.array(source_keypoints, dtype=np.float32)
src_corners = src_arr[corner_indices]
src_corners = src_corners[None, :, :]
warped_corners = perspectiveTransform(src_corners, homography_matrix)[0]
if is_bowtie(warped_corners):
raise InvalidMask("Projection twisted!")
def project_image_using_keypoints(
image: ndarray,
source_keypoints: list[tuple[int, int]],
destination_keypoints: list[tuple[int, int]],
destination_width: int,
destination_height: int,
inverse: bool = False,
) -> ndarray:
# Vectorized filtering: convert to arrays and filter with boolean mask
src_arr = np.array(source_keypoints, dtype=np.float32)
dst_arr = np.array(destination_keypoints, dtype=np.float32)
# Vectorized mask: filter out (0, 0) destination points
valid_mask = ~((dst_arr[:, 0] == 0) & (dst_arr[:, 1] == 0))
source_points = src_arr[valid_mask]
destination_points = dst_arr[valid_mask]
H, _ = findHomography(source_points, destination_points)
if H is None:
raise InvalidMask("Homography not found")
validate_projected_corners(source_keypoints=source_keypoints, homography_matrix=H)
projected_image = warpPerspective(image, H, (destination_width, destination_height))
return projected_image
def extract_masks_for_ground_and_lines(image: ndarray) -> tuple[ndarray, ndarray]:
    """Assumes the template is coloured s.t. ground = gray, lines = white, background = black."""
    gray = image
    _, mask_ground = threshold(gray, 10, 1, THRESH_BINARY)
    # Check emptiness before boundingRect: findNonZero returns None on an
    # empty mask and boundingRect would crash on it.
    ground_nonzero = int(countNonZero(mask_ground))
    if ground_nonzero == 0:
        raise InvalidMask("No projected ground")
    # A perfectly rectangular ground region means the projection degenerated.
    x, y, w, h = cv2.boundingRect(cv2.findNonZero(mask_ground))
    if ground_nonzero == w * h:
        raise InvalidMask("Projected ground should not be rectangular")
    area_covered = ground_nonzero / float(mask_ground.size)
    if area_covered >= 0.9:
        raise InvalidMask(
            f"Projected ground covers {area_covered:.2%} of the image surface, which is unrealistic"
        )
    validate_mask_ground(mask=mask_ground)
    _, mask_lines = threshold(gray, 200, 1, THRESH_BINARY)
    validate_mask_lines(mask=mask_lines)
    return mask_ground, mask_lines
def get_edge_mask(x, y, W, H, t):
"""Uses bitmasking instead of sets for speed."""
mask = 0
if x <= t: mask |= 1 # Left
if x >= W - t: mask |= 2 # Right
if y <= t: mask |= 4 # Top
if y >= H - t: mask |= 8 # Bottom
return mask
def both_points_same_direction_fast(A, B, W, H, t=100):
mask_a = get_edge_mask(A[0], A[1], W, H, t)
if mask_a == 0: return False
mask_b = get_edge_mask(B[0], B[1], W, H, t)
if mask_b == 0: return False
# Bitwise AND: if any bit matches, they share an edge
return (mask_a & mask_b) != 0
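# Illustrative sketch (added for documentation; not called by the pipeline):
# two points within t=100 px of the same (left) frame border share an edge
# bit, so the pair is flagged as lying in the same border zone.
def _demo_edge_mask() -> bool:
    return both_points_same_direction_fast((10, 200), (40, 900), 1920, 1080, t=100)  # -> True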
def canonical(obj):
# numpy arrays -> keep order
if isinstance(obj, np.ndarray):
return canonical(obj.tolist())
# ordered sequences
if isinstance(obj, (list, tuple)):
return tuple(canonical(x) for x in obj)
# unordered sets
if isinstance(obj, set):
return tuple(sorted(canonical(x) for x in obj))
# dictionaries (keys may not be ordered)
if isinstance(obj, dict):
return tuple((k, canonical(v)) for k, v in sorted(obj.items()))
return obj # primitive types
def fast_cache_key(frame_keypoints, w, h):
# Byte-based key avoids deep recursion/tuples while preserving order.
# Optimize: check if already array to avoid copy
if isinstance(frame_keypoints, np.ndarray):
if frame_keypoints.dtype == np.int32:
arr = frame_keypoints
else:
arr = frame_keypoints.astype(np.int32)
else:
arr = np.asarray(frame_keypoints, dtype=np.int32)
return (arr.tobytes(), int(w), int(h))
blacklists = [
[23, 24, 27, 28],
[7, 8, 3, 4],
[2, 10, 1, 14],
[18, 26, 14, 25],
[5, 13, 6, 17],
[21, 29, 17, 30],
[10, 11, 2, 3],
[10, 11, 2, 7],
[12, 13, 4, 5],
[12, 13, 5, 8],
[18, 19, 26, 27],
[18, 19, 26, 23],
[20, 21, 24, 29],
[20, 21, 28, 29],
[8, 4, 5, 13],
[3, 7, 2, 10],
[23, 27, 18, 26],
[24, 28, 21, 29]
]
prepared_blacklists = [(set(bl), bl[0]-1, bl[1]-1) for bl in blacklists]
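# evaluate_keypoints_for_frame scores one candidate 32-point set: the pitch
# template is warped into the frame via a homography, the expected line mask
# is intersected with the frame's dilated edge map (check_frame[frame_index],
# a global assumed to be populated elsewhere in this file via convert_to_gray),
# and the score is the fraction of expected line pixels covered by edges.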
def evaluate_keypoints_for_frame(
frame_keypoints: list[tuple[int, int]],
frame_index,
h,
w,
precomputed_key=None,
) -> float:
    # fast_cache_key matches the precomputed (bytes, w, h) keys built in evaluates().
    key = precomputed_key or fast_cache_key(frame_keypoints, w, h)
    template_keypoints = KEYPOINTS
    floor_markings_template = template_image_gray
    try:
def compute_masks_for_key(frame_keypoints, w, h):
try:
non_idxs_set = {i + 1 for i, kpt in enumerate(frame_keypoints) if kpt[0] != 0 or kpt[1] != 0}
for bl_set, idx0, idx1 in prepared_blacklists:
if non_idxs_set.issubset(bl_set):
if both_points_same_direction_fast(frame_keypoints[idx0], frame_keypoints[idx1], w, h):
return None, 0, None
warped_template = project_image_using_keypoints(
image=floor_markings_template,
source_keypoints=template_keypoints,
destination_keypoints=frame_keypoints,
destination_width=w,
destination_height=h,
)
mask_ground, mask_lines_expected = extract_masks_for_ground_and_lines(
image=warped_template
)
mask_expected_on_ground = mask_lines_expected
ys, xs = np.where(mask_lines_expected == 1)
if len(xs) == 0:
bbox = None # no foreground pixels
else:
min_x = xs.min()
max_x = xs.max()
min_y = ys.min()
max_y = ys.max()
bbox = (min_x, min_y, max_x, max_y)
bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) if bbox is not None else 1
frame_area = h * w
if (bbox_area / frame_area) < 0.2:
return None, 0, None
pixels_on_lines = int(countNonZero(mask_expected_on_ground))
return mask_expected_on_ground, pixels_on_lines, mask_ground
            except Exception:
                return None, 0, None
mask_expected_on_ground, pixels_on_lines, mask_ground = get_or_compute_masks(
key, lambda: compute_masks_for_key(frame_keypoints, w, h)
)
if mask_expected_on_ground is None or pixels_on_lines == 0 or mask_ground is None:
return 0.0
image_edges = check_frame[frame_index]
h, w = mask_expected_on_ground.shape[:2]
work_buffer = np.zeros((h, w), dtype=np.uint8)
bitwise_and(
image_edges,
image_edges,
dst=work_buffer,
mask=mask_ground
)
dilate(work_buffer, dilate_kernel, dst=work_buffer, iterations=3)
threshold(work_buffer, 0, 255, cv2.THRESH_BINARY, dst=work_buffer)
pixels_predicted_count = countNonZero(work_buffer)
bitwise_and(work_buffer, mask_expected_on_ground, dst=work_buffer)
pixels_overlapping = countNonZero(work_buffer)
pixels_rest = pixels_predicted_count - pixels_overlapping
total_pixels = pixels_predicted_count + pixels_on_lines - pixels_overlapping
if total_pixels > 0 and (pixels_rest / total_pixels) > 0.9:
return 0.0
score = pixels_overlapping / (pixels_on_lines + 1e-8)
return score
    except Exception:
        pass
    return 0.0
def _generate_sparse_template_keypoints(frame_width: int, frame_height: int) -> list[tuple[int, int]]:
key = (int(frame_width), int(frame_height))
if key in _sparse_template_cache:
return _sparse_template_cache[key]
template_max_x, template_max_y = (1045, 675)
sx = float(frame_width) / float(template_max_x if template_max_x != 0 else 1)
sy = float(frame_height) / float(template_max_y if template_max_y != 0 else 1)
# Vectorized scaling and rounding
scale_factors = np.array([sx, sy], dtype=np.float32)
scaled_np = np.round(FOOTBALL_KEYPOINTS_NP * scale_factors).astype(np.int32)
scaled = [(int(x), int(y)) for x, y in scaled_np]
_sparse_template_cache[key] = scaled
return scaled
def convert_keypoints_to_val_format(keypoints):
# Vectorized: convert to numpy, cast, then back to list of tuples
if not keypoints:
return []
arr = np.asarray(keypoints, dtype=np.int32)
return [(int(x), int(y)) for x, y in arr]
def are_collinear(pts, eps=1e-9):
pts = np.asarray(pts)
if len(pts) < 3:
return True
a, b, c = pts[:3]
area = np.abs(np.cross(b - a, c - a))
return area < eps
def line_to_line_transform(P1, P2, Q1, Q2):
"""
Compute 2D affine transformation mapping line segment P1P2 -> Q1Q2
Optimized version reducing allocations.
Parameters:
P1, P2: source points (x, y)
Q1, Q2: target points (x, y)
Returns:
M: 3x3 homogeneous transformation matrix
"""
P1 = np.asarray(P1, dtype=np.float64)
P2 = np.asarray(P2, dtype=np.float64)
Q1 = np.asarray(Q1, dtype=np.float64)
Q2 = np.asarray(Q2, dtype=np.float64)
# Source and target vectors
v_s = P2 - P1
v_t = Q2 - Q1
# Scale factor (using hypot for better numerical stability)
norm_s = np.hypot(v_s[0], v_s[1])
norm_t = np.hypot(v_t[0], v_t[1])
s = norm_t / norm_s
# Rotation angle
theta = np.arctan2(v_t[1], v_t[0]) - np.arctan2(v_s[1], v_s[0])
# Precompute sin/cos
cos_theta = np.cos(theta)
sin_theta = np.sin(theta)
# 2x2 scaled rotation components
sr00 = s * cos_theta
sr01 = -s * sin_theta
sr10 = s * sin_theta
sr11 = s * cos_theta
# Translation (direct computation avoiding matrix mul)
t0 = Q1[0] - (sr00 * P1[0] + sr01 * P1[1])
t1 = Q1[1] - (sr10 * P1[0] + sr11 * P1[1])
# Homogeneous 3x3 matrix (direct construction)
M = np.array([
[sr00, sr01, t0],
[sr10, sr11, t1],
[0.0, 0.0, 1.0]
], dtype=np.float64)
return M
def three_point_affine(P, Q):
P = np.array(P, dtype=np.float64)
Q = np.array(Q, dtype=np.float64)
n = P.shape[0]
# Vectorized construction of least-squares system
x, y = P[:, 0], P[:, 1]
u, v = Q[:, 0], Q[:, 1]
# Pre-allocate A matrix
A = np.zeros((2*n, 6), dtype=np.float64)
A[0::2, 0] = x
A[0::2, 1] = y
A[0::2, 2] = 1
A[1::2, 3] = x
A[1::2, 4] = y
A[1::2, 5] = 1
# Vectorized b vector
b = np.empty(2*n, dtype=np.float64)
b[0::2] = u
b[1::2] = v
# Solve least squares (robust to collinear points)
params, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
a, b_, e, c, d, f = params
# Homogeneous transformation matrix
M = np.array([
[a, b_, e],
[c, d, f],
[0, 0, 1]
], dtype=np.float64)
return M
def affine_from_4_points(src_pts, dst_pts):
"""
Compute a 2D affine transformation from 4 source points to 4 target points using least-squares.
Vectorized version for better performance.
Parameters:
src_pts: list of 4 source points [(x1,y1),..., (x4,y4)]
dst_pts: list of 4 target points [(u1,v1),..., (u4,v4)]
Returns:
3x3 homogeneous affine transformation matrix
"""
P = np.array(src_pts, dtype=np.float64)
Q = np.array(dst_pts, dtype=np.float64)
# Vectorized construction of 8x6 system (2 eqs per point)
x, y = P[:, 0], P[:, 1]
u, v = Q[:, 0], Q[:, 1]
A = np.zeros((8, 6), dtype=np.float64)
A[0::2, 0] = x
A[0::2, 1] = y
A[0::2, 2] = 1
A[1::2, 3] = x
A[1::2, 4] = y
A[1::2, 5] = 1
b = np.empty(8, dtype=np.float64)
b[0::2] = u
b[1::2] = v
# Solve least-squares
params, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
a, b_, e, c, d, f = params
# Construct 3x3 affine matrix
M = np.array([
[a, b_, e],
[c, d, f],
[0, 0, 1]
], dtype=np.float64)
return M
def four_point_homography(src_pts, dst_pts):
"""
Compute 2D homography mapping 4 source points to 4 target points.
Vectorized version for better performance.
src_pts: list of 4 source points [(x1,y1),..., (x4,y4)]
dst_pts: list of 4 target points [(u1,v1),..., (u4,v4)]
Returns:
3x3 homography matrix
"""
# Vectorized construction of A matrix
src = np.array(src_pts, dtype=np.float64)
dst = np.array(dst_pts, dtype=np.float64)
x, y = src[:, 0], src[:, 1]
u, v = dst[:, 0], dst[:, 1]
# Pre-allocate A matrix
A = np.zeros((8, 9), dtype=np.float64)
A[0::2, 0] = -x
A[0::2, 1] = -y
A[0::2, 2] = -1
A[0::2, 6] = x * u
A[0::2, 7] = y * u
A[0::2, 8] = u
A[1::2, 3] = -x
A[1::2, 4] = -y
A[1::2, 5] = -1
A[1::2, 6] = x * v
A[1::2, 7] = y * v
A[1::2, 8] = v
# Solve Ah=0 using SVD
_, _, Vt = np.linalg.svd(A)
h = Vt[-1, :] # last row of V^T
H = h.reshape(3, 3)
# Normalize
H /= H[2, 2]
return H
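# Illustrative sketch (added for documentation; not called by the pipeline):
# the DLT above maps a unit square onto a 2x scaled square, so the recovered
# H should be close to diag(2, 2, 1) after normalisation.
def _demo_four_point_homography() -> np.ndarray:
    src = [(0, 0), (1, 0), (1, 1), (0, 1)]
    dst = [(0, 0), (2, 0), (2, 2), (0, 2)]
    return four_point_homography(src, dst)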
def unique_points(src, dst):
src, dst = np.asarray(src, float), np.asarray(dst, float)
# Vectorized filtering for zero points
src_nonzero = ~np.all(np.abs(src) < 1e-9, axis=1)
dst_nonzero = ~np.all(np.abs(dst) < 1e-9, axis=1)
valid_mask = src_nonzero & dst_nonzero
if not valid_mask.any():
return np.array([]), np.array([])
src_valid = src[valid_mask]
dst_valid = dst[valid_mask]
# Remove duplicates using numpy unique
_, unique_idx = np.unique(src_valid, axis=0, return_index=True)
unique_idx.sort() # preserve order
return src_valid[unique_idx], dst_valid[unique_idx]
def robust_transform(src_pts, dst_pts):
src, dst = unique_points(src_pts, dst_pts)
n = len(src)
if n >= 4:
if are_collinear(src) or are_collinear(dst):
H = affine_from_4_points(src, dst)
return lambda pt: apply_transform(H, pt)
else:
H = four_point_homography(src, dst)
return lambda pt: apply_homo_transform(H, pt)
elif n==3:
H = three_point_affine(src,dst)
elif n==2:
H = line_to_line_transform(src[0],src[1],dst[0],dst[1])
elif n==1:
t = dst[0]-src[0]
H = np.eye(3)
H[:2,2] = t
else:
H = np.eye(3)
return lambda pt: apply_transform(H, pt)
def apply_homo_transform(M, P):
    # Optimized: direct indexing instead of array creation.
    x, y = P[0], P[1]
    w = M[2, 0] * x + M[2, 1] * y + M[2, 2]
    x_new = (M[0, 0] * x + M[0, 1] * y + M[0, 2]) / w
    y_new = (M[1, 0] * x + M[1, 1] * y + M[1, 2]) / w
    # Return the projected point itself, matching apply_transform: callers in
    # make_possible_keypoints treat the result as absolute frame coordinates,
    # not a displacement.
    return (int(x_new), int(y_new))
def apply_transform(M, P):
"""
Transform a single 2D point using a 3x3 transformation matrix H.
Optimized version avoiding array creation.
Args:
H : 3x3 numpy array
Transformation matrix (homography, affine, similarity, etc.)
point : (x, y) array-like
Single point coordinates to transform.
Returns:
(x', y') : Transformed point coordinates
"""
# Direct computation without intermediate arrays
x, y = P[0], P[1]
x_new = M[0, 0] * x + M[0, 1] * y + M[0, 2]
y_new = M[1, 0] * x + M[1, 1] * y + M[1, 2]
return (int(x_new), int(y_new))
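# Illustrative sketch (added for documentation; not called by the pipeline):
# mapping segment (0,0)-(1,0) onto (0,0)-(0,2) is a 90-degree rotation plus a
# 2x scale, so the point (2, 0) lands at (0, 4).
def _demo_line_transform() -> tuple[int, int]:
    M = line_to_line_transform((0, 0), (1, 0), (0, 0), (0, 2))
    return apply_transform(M, (2, 0))  # -> (0, 4)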
def pick_pt(points):
# Fully vectorized neighbor expansion preserving original order.
if not points:
return []
pts_arr = np.asarray(points, dtype=np.int32)
seen = np.zeros(32, dtype=bool)
valid_mask = (pts_arr >= 0) & (pts_arr < 32)
seen[pts_arr[valid_mask]] = True
out_seen = np.zeros(32, dtype=bool)
out = []
for p in pts_arr[valid_mask]:
neigh = GROUPS_ARRAY[p]
candidates = neigh[~seen[neigh] & ~out_seen[neigh]]
out_seen[candidates] = True
out.extend(candidates.tolist())
return out
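# Illustrative sketch (added for documentation; not called by the pipeline):
# starting from zero-based keypoint 0 (template point 1), pick_pt proposes its
# unseen neighbours from GROUPS_ARRAY, i.e. template points 2, 3, 7 and 10 in
# zero-based form.
def _demo_pick_pt() -> list:
    return pick_pt([0])  # -> [1, 2, 6, 9]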
def make_possible_keypoints(all_keypoints, frame_width, frame_height, limit=2):
# Early exit for empty input
if not all_keypoints:
return []
results = []
for keypoints in all_keypoints:
        # np.asarray avoids copying when the input already has the right
        # dtype/shape, but still converts when needed.
arr = np.asarray(keypoints, dtype=np.int32)
# Basic shape validation
if arr.ndim != 2 or arr.shape[1] != 2:
continue
# Fast Masking and Counting
mask = (arr[:, 0] != 0) & (arr[:, 1] != 0)
non_zero_count = mask.sum()
# Logic Flow
if non_zero_count > 4:
results.append(keypoints)
continue
if non_zero_count < 2:
continue
# If exactly 4, we append the original BUT continue to try and find the 5th
if non_zero_count == 4:
results.append(keypoints)
# Prepare Transformation Data
non_zero_idxs = np.flatnonzero(mask)
# Assuming KEYPOINTS_NP is available globally
src = KEYPOINTS_NP[non_zero_idxs]
dest = arr[non_zero_idxs].astype(np.float32)
try:
# transform_func is calculated once
transform_func = robust_transform(src, dest)
except Exception:
continue
# Get candidate indices to check
candidate_idxs = pick_pt(non_zero_idxs.tolist())
if not candidate_idxs:
continue
# Pre-calculate Valid Projections
valid_cache = {}
valid_real_idxs = []
for idx in candidate_idxs:
# Transform point
t_pt = transform_func(KEYPOINTS_NP[idx])
# Unroll checks for speed
tx, ty = t_pt[0], t_pt[1]
# Boundary check
if 0 <= tx < frame_width and 0 <= ty < frame_height:
valid_cache[idx] = (int(tx), int(ty))
valid_real_idxs.append(idx)
# Check if we have enough valid points to satisfy the request
n_missing = 5 - non_zero_count
if len(valid_real_idxs) < n_missing:
continue
# Generate Combinations
cnt = 0
for group in combinations(valid_real_idxs, n_missing):
if cnt >= limit:
break
cnt += 1
# Create the result list
# A shallow copy of the list is much faster than recreating a numpy object array.
new_result = list(keypoints)
# Fill in the missing points from our cache
for idx in group:
new_result[idx] = valid_cache[idx]
results.append(new_result)
return results
def _get_shared_eval_executor(max_workers: int) -> ThreadPoolExecutor:
global _shared_eval_executor
if _shared_eval_executor is None:
_shared_eval_executor = ThreadPoolExecutor(max_workers=max_workers)
return _shared_eval_executor
def evaluates(jobs, h, w, total_frames: int):
if len(jobs) == 0:
return []
unique_jobs = [] # (job, frame_index, key_bytes)
seen = set()
for (job, frame_index) in jobs:
try:
# Skip the dtype conversion when the job is already an int32 array
if isinstance(job, np.ndarray):
key_bytes = job.astype(np.int32).tobytes() if job.dtype != np.int32 else job.tobytes()
else:
key_bytes = np.asarray(job, dtype=np.int32).tobytes()
sig = (frame_index, key_bytes)
if sig in seen:
continue
seen.add(sig)
unique_jobs.append((job, frame_index, key_bytes))
except Exception:
continue
if len(unique_jobs) <= 10:
scores_unique = [
evaluate_keypoints_for_frame(job, frame_index, h, w, precomputed_key=(key_bytes, w, h))
for (job, frame_index, key_bytes) in unique_jobs
]
else:
cpu_count = max(1, (os.cpu_count() or 1))
max_workers = min(max(2, cpu_count), 8)
chunk_size = 500
scores_unique = []
ex = _get_shared_eval_executor(max_workers)
for i in range(0, len(unique_jobs), chunk_size):
chunk = unique_jobs[i:i + chunk_size]
scores_unique.extend(
ex.map(
lambda pair: evaluate_keypoints_for_frame(pair[0], pair[1], h, w, precomputed_key=(pair[2], w, h)),
chunk,
)
)
scores = np.full(total_frames, -1.0, dtype=np.float32)
results = [[(0, 0)] * 32 for _ in range(total_frames)]
for score, (k, frame_index, _) in zip(scores_unique, unique_jobs):
if score > scores[frame_index]:
scores[frame_index] = score
results[frame_index] = k
return results
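# Usage sketch: jobs is a list of (keypoints, frame_index) pairs; duplicate
# candidates per frame are deduplicated by byte signature, and one best-scoring
# candidate per frame is returned, e.g.:
# best = evaluates([(kpts_a, 0), (kpts_b, 0)], h=1080, w=1920, total_frames=1)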
def fix_keypoints_pri(
results_frames,
frame_width: int,
frame_height: int
) -> list[Any]:
max_frames = len(results_frames)
# Sliding window: candidate keypoint sets from up to `limit` frames before
# and after each frame are also scored as candidates for that frame.
limit = 30
before = deque(maxlen=limit)
after = deque(maxlen=limit)
sparse_template = [(0,0)] * 32
all_possible = [None] * max_frames
for i in range(max_frames):
all_possible[i] = make_possible_keypoints([results_frames[i]], frame_width, frame_height)
for i in range(1, min(limit, max_frames)):
after.append(all_possible[i])
current = all_possible[0] if max_frames > 0 else []
total_jobs = []
for frame_index in range(max_frames):
if frame_index < max_frames - limit:
future_idx = frame_index + limit
if all_possible[future_idx] is None:
all_possible[future_idx] = make_possible_keypoints([results_frames[future_idx]], frame_width, frame_height)
after.append(all_possible[future_idx])
frame_jobs = [(kpts, frame_index) for kpts in current]
for t in after:
frame_jobs.extend([(kpts, frame_index) for kpts in t])
for t in before:
frame_jobs.extend([(kpts, frame_index) for kpts in t])
frame_jobs.append((sparse_template, frame_index))
total_jobs.extend(frame_jobs)
before.append(current)
if len(after) != 0:
current = after.popleft()
start_time = time.time()
results = evaluates(total_jobs, frame_height, frame_width, max_frames)
print(f"Evaluation time: {time.time() - start_time}")
return results
def normalize_results(frame_results, threshold):
if not frame_results:
return []
results_array = []
for result in frame_results:
arr = np.array(result, dtype=np.float32) # (N, 3)
if arr.size == 0:
results_array.append([])
continue
mask = arr[:, 2] > threshold # (N,)
scaled = arr[:, :2] # (N, 2)
scaled = np.where(mask[:, None], scaled, 0) # Apply mask
results_array.append([(int(x), int(y)) for x, y in scaled])
return results_array
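# Worked example: points with confidence at or below the threshold are zeroed.
# normalize_results([[(100, 50, 0.9), (30, 20, 0.1)]], threshold=0.5)
# -> [[(100, 50), (0, 0)]]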
def convert_to_gray(image):
"""Convert a BGR frame to a Canny edge map (gray -> top-hat -> blur -> Canny); uses the module-level `kernel` structuring element."""
gray = cvtColor(image, COLOR_BGR2GRAY)
gray = morphologyEx(gray, MORPH_TOPHAT, kernel, dst=gray)
GaussianBlur(gray, (5, 5), 0, dst=gray)
image_edges = Canny(gray, 30, 100)
return image_edges
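# Usage sketch (the module-level `kernel` is assumed to be defined):
# edges = convert_to_gray(frame_bgr)  # uint8 edge map, same H x W as the input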
def get_cls_net(config, pretrained='', **kwargs):
"""Create keypoint detection model with softmax activation"""
def conv3x3(in_planes, out_planes, stride=1):
"""3x3 convolution with padding"""
return nn.Conv2d(in_planes, out_planes, kernel_size=3,
stride=stride, padding=1, bias=False)
class BasicBlock(nn.Module):
expansion = 1
def __init__(self, inplanes, planes, stride=1, downsample=None):
super(BasicBlock, self).__init__()
self.conv1 = conv3x3(inplanes, planes, stride)
self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
self.relu = nn.ReLU(inplace=True)
self.conv2 = conv3x3(planes, planes)
self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
self.downsample = downsample
self.stride = stride
def forward(self, x):
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
out = self.relu(out)
return out
class Bottleneck(nn.Module):
expansion = 4
def __init__(self, inplanes, planes, stride=1, downsample=None):
super(Bottleneck, self).__init__()
self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride,
padding=1, bias=False)
self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1,
bias=False)
self.bn3 = BatchNorm2d(planes * self.expansion,
momentum=BN_MOMENTUM)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
def forward(self, x):
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
out = self.relu(out)
return out
# Defined after the block classes above, but resolved lazily at call time,
# so the late binding is safe: nothing is instantiated until model creation.
BatchNorm2d = nn.BatchNorm2d
BN_MOMENTUM = 0.1
blocks_dict = {
'BASIC': BasicBlock,
'BOTTLENECK': Bottleneck
}
class HighResolutionModule(nn.Module):
def __init__(self, num_branches, blocks, num_blocks, num_inchannels,
num_channels, fuse_method, multi_scale_output=True):
super(HighResolutionModule, self).__init__()
self._check_branches(
num_branches, blocks, num_blocks, num_inchannels, num_channels)
self.num_inchannels = num_inchannels
self.fuse_method = fuse_method
self.num_branches = num_branches
self.multi_scale_output = multi_scale_output
self.branches = self._make_branches(
num_branches, blocks, num_blocks, num_channels)
self.fuse_layers = self._make_fuse_layers()
self.relu = nn.ReLU(inplace=True)
def _check_branches(self, num_branches, blocks, num_blocks,
num_inchannels, num_channels):
if num_branches != len(num_blocks):
error_msg = 'NUM_BRANCHES({}) <> NUM_BLOCKS({})'.format(
num_branches, len(num_blocks))
raise ValueError(error_msg)
if num_branches != len(num_channels):
error_msg = 'NUM_BRANCHES({}) <> NUM_CHANNELS({})'.format(
num_branches, len(num_channels))
raise ValueError(error_msg)
if num_branches != len(num_inchannels):
error_msg = 'NUM_BRANCHES({}) <> NUM_INCHANNELS({})'.format(
num_branches, len(num_inchannels))
raise ValueError(error_msg)
def _make_one_branch(self, branch_index, block, num_blocks, num_channels,
stride=1):
downsample = None
if stride != 1 or \
self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(self.num_inchannels[branch_index],
num_channels[branch_index] * block.expansion,
kernel_size=1, stride=stride, bias=False),
BatchNorm2d(num_channels[branch_index] * block.expansion,
momentum=BN_MOMENTUM),
)
layers = []
layers.append(block(self.num_inchannels[branch_index],
num_channels[branch_index], stride, downsample))
self.num_inchannels[branch_index] = \
num_channels[branch_index] * block.expansion
for i in range(1, num_blocks[branch_index]):
layers.append(block(self.num_inchannels[branch_index],
num_channels[branch_index]))
return nn.Sequential(*layers)
def _make_branches(self, num_branches, block, num_blocks, num_channels):
branches = []
for i in range(num_branches):
branches.append(
self._make_one_branch(i, block, num_blocks, num_channels))
return nn.ModuleList(branches)
def _make_fuse_layers(self):
if self.num_branches == 1:
return None
num_branches = self.num_branches
num_inchannels = self.num_inchannels
fuse_layers = []
for i in range(num_branches if self.multi_scale_output else 1):
fuse_layer = []
for j in range(num_branches):
if j > i:
fuse_layer.append(nn.Sequential(
nn.Conv2d(num_inchannels[j],
num_inchannels[i],
1,
1,
0,
bias=False),
BatchNorm2d(num_inchannels[i], momentum=BN_MOMENTUM)))
# nn.Upsample(scale_factor=2**(j-i), mode='nearest')))
elif j == i:
fuse_layer.append(None)
else:
conv3x3s = []
for k in range(i - j):
if k == i - j - 1:
num_outchannels_conv3x3 = num_inchannels[i]
conv3x3s.append(nn.Sequential(
nn.Conv2d(num_inchannels[j],
num_outchannels_conv3x3,
3, 2, 1, bias=False),
BatchNorm2d(num_outchannels_conv3x3, momentum=BN_MOMENTUM)))
else:
num_outchannels_conv3x3 = num_inchannels[j]
conv3x3s.append(nn.Sequential(
nn.Conv2d(num_inchannels[j],
num_outchannels_conv3x3,
3, 2, 1, bias=False),
BatchNorm2d(num_outchannels_conv3x3,
momentum=BN_MOMENTUM),
nn.ReLU(inplace=True)))
fuse_layer.append(nn.Sequential(*conv3x3s))
fuse_layers.append(nn.ModuleList(fuse_layer))
return nn.ModuleList(fuse_layers)
def get_num_inchannels(self):
return self.num_inchannels
def forward(self, x):
if self.num_branches == 1:
return [self.branches[0](x[0])]
for i in range(self.num_branches):
x[i] = self.branches[i](x[i])
x_fuse = []
for i in range(len(self.fuse_layers)):
y = x[0] if i == 0 else self.fuse_layers[i][0](x[0])
for j in range(1, self.num_branches):
if i == j:
y = y + x[j]
elif j > i:
y = y + F.interpolate(
self.fuse_layers[i][j](x[j]),
size=[x[i].shape[2], x[i].shape[3]],
mode='bilinear')
else:
y = y + self.fuse_layers[i][j](x[j])
x_fuse.append(self.relu(y))
return x_fuse
class HighResolutionNet(nn.Module):
def __init__(self, config, lines=False, **kwargs):
self.inplanes = 64
self.lines = lines
extra = config['MODEL']['EXTRA']
super(HighResolutionNet, self).__init__()
# stem net
self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=2, padding=1,
bias=False)
self.bn1 = BatchNorm2d(self.inplanes, momentum=BN_MOMENTUM)
self.conv2 = nn.Conv2d(self.inplanes, self.inplanes, kernel_size=3, stride=2, padding=1,
bias=False)
self.bn2 = BatchNorm2d(self.inplanes, momentum=BN_MOMENTUM)
self.relu = nn.ReLU(inplace=True)
self.sf = nn.Softmax(dim=1)
self.layer1 = self._make_layer(Bottleneck, 64, 64, 4)
self.stage2_cfg = extra['STAGE2']
num_channels = self.stage2_cfg['NUM_CHANNELS']
block = blocks_dict[self.stage2_cfg['BLOCK']]
num_channels = [
num_channels[i] * block.expansion for i in range(len(num_channels))]
self.transition1 = self._make_transition_layer(
[256], num_channels)
self.stage2, pre_stage_channels = self._make_stage(
self.stage2_cfg, num_channels)
self.stage3_cfg = extra['STAGE3']
num_channels = self.stage3_cfg['NUM_CHANNELS']
block = blocks_dict[self.stage3_cfg['BLOCK']]
num_channels = [
num_channels[i] * block.expansion for i in range(len(num_channels))]
self.transition2 = self._make_transition_layer(
pre_stage_channels, num_channels)
self.stage3, pre_stage_channels = self._make_stage(
self.stage3_cfg, num_channels)
self.stage4_cfg = extra['STAGE4']
num_channels = self.stage4_cfg['NUM_CHANNELS']
block = blocks_dict[self.stage4_cfg['BLOCK']]
num_channels = [
num_channels[i] * block.expansion for i in range(len(num_channels))]
self.transition3 = self._make_transition_layer(
pre_stage_channels, num_channels)
self.stage4, pre_stage_channels = self._make_stage(
self.stage4_cfg, num_channels, multi_scale_output=True)
self.upsample = nn.Upsample(scale_factor=2, mode='nearest')
final_inp_channels = sum(pre_stage_channels) + self.inplanes
self.head = nn.Sequential(nn.Sequential(
nn.Conv2d(
in_channels=final_inp_channels,
out_channels=final_inp_channels,
kernel_size=1),
BatchNorm2d(final_inp_channels, momentum=BN_MOMENTUM),
nn.ReLU(inplace=True),
nn.Conv2d(
in_channels=final_inp_channels,
out_channels=config['MODEL']['NUM_JOINTS'],
kernel_size=extra['FINAL_CONV_KERNEL']),
nn.Softmax(dim=1) if not self.lines else nn.Sigmoid()))
def _make_head(self, x, x_skip):
x = self.upsample(x)
x = torch.cat([x, x_skip], dim=1)
x = self.head(x)
return x
def _make_transition_layer(
self, num_channels_pre_layer, num_channels_cur_layer):
num_branches_cur = len(num_channels_cur_layer)
num_branches_pre = len(num_channels_pre_layer)
transition_layers = []
for i in range(num_branches_cur):
if i < num_branches_pre:
if num_channels_cur_layer[i] != num_channels_pre_layer[i]:
transition_layers.append(nn.Sequential(
nn.Conv2d(num_channels_pre_layer[i],
num_channels_cur_layer[i],
3,
1,
1,
bias=False),
BatchNorm2d(
num_channels_cur_layer[i], momentum=BN_MOMENTUM),
nn.ReLU(inplace=True)))
else:
transition_layers.append(None)
else:
conv3x3s = []
for j in range(i + 1 - num_branches_pre):
inchannels = num_channels_pre_layer[-1]
outchannels = num_channels_cur_layer[i] \
if j == i - num_branches_pre else inchannels
conv3x3s.append(nn.Sequential(
nn.Conv2d(
inchannels, outchannels, 3, 2, 1, bias=False),
BatchNorm2d(outchannels, momentum=BN_MOMENTUM),
nn.ReLU(inplace=True)))
transition_layers.append(nn.Sequential(*conv3x3s))
return nn.ModuleList(transition_layers)
def _make_layer(self, block, inplanes, planes, blocks, stride=1):
downsample = None
if stride != 1 or inplanes != planes * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(inplanes, planes * block.expansion,
kernel_size=1, stride=stride, bias=False),
BatchNorm2d(planes * block.expansion, momentum=BN_MOMENTUM),
)
layers = []
layers.append(block(inplanes, planes, stride, downsample))
inplanes = planes * block.expansion
for i in range(1, blocks):
layers.append(block(inplanes, planes))
return nn.Sequential(*layers)
def _make_stage(self, layer_config, num_inchannels,
multi_scale_output=True):
num_modules = layer_config['NUM_MODULES']
num_branches = layer_config['NUM_BRANCHES']
num_blocks = layer_config['NUM_BLOCKS']
num_channels = layer_config['NUM_CHANNELS']
block = blocks_dict[layer_config['BLOCK']]
fuse_method = layer_config['FUSE_METHOD']
modules = []
for i in range(num_modules):
# multi_scale_output is only honoured by the last module
if not multi_scale_output and i == num_modules - 1:
reset_multi_scale_output = False
else:
reset_multi_scale_output = True
modules.append(
HighResolutionModule(num_branches,
block,
num_blocks,
num_inchannels,
num_channels,
fuse_method,
reset_multi_scale_output)
)
num_inchannels = modules[-1].get_num_inchannels()
return nn.Sequential(*modules), num_inchannels
def forward(self, x):
# h, w = x.size(2), x.size(3)
x = self.conv1(x)
x_skip = x.clone()
x = self.bn1(x)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x)
x = self.relu(x)
x = self.layer1(x)
x_list = []
for i in range(self.stage2_cfg['NUM_BRANCHES']):
if self.transition1[i] is not None:
x_list.append(self.transition1[i](x))
else:
x_list.append(x)
y_list = self.stage2(x_list)
x_list = []
for i in range(self.stage3_cfg['NUM_BRANCHES']):
if self.transition2[i] is not None:
x_list.append(self.transition2[i](y_list[-1]))
else:
x_list.append(y_list[i])
y_list = self.stage3(x_list)
x_list = []
for i in range(self.stage4_cfg['NUM_BRANCHES']):
if self.transition3[i] is not None:
x_list.append(self.transition3[i](y_list[-1]))
else:
x_list.append(y_list[i])
x = self.stage4(x_list)
# Head Part
height, width = x[0].size(2), x[0].size(3)
x1 = F.interpolate(x[1], size=(height, width), mode='bilinear', align_corners=False)
x2 = F.interpolate(x[2], size=(height, width), mode='bilinear', align_corners=False)
x3 = F.interpolate(x[3], size=(height, width), mode='bilinear', align_corners=False)
x = torch.cat([x[0], x1, x2, x3], 1)
x = self._make_head(x, x_skip)
return x
def init_weights(self, pretrained=''):
for m in self.modules():
if isinstance(m, nn.Conv2d):
if not self.lines:
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
else:
nn.init.normal_(m.weight, std=0.001)
#nn.init.normal_(m.weight, std=0.001)
#nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
if pretrained != '':
if os.path.isfile(pretrained):
pretrained_dict = torch.load(pretrained)
model_dict = self.state_dict()
pretrained_dict = {k: v for k, v in pretrained_dict.items()
if k in model_dict.keys()}
model_dict.update(pretrained_dict)
self.load_state_dict(model_dict)
else:
sys.exit(f'Weights {pretrained} not found.')
model = HighResolutionNet(config, **kwargs)
model.init_weights(pretrained)
return model
# Keypoint Inference
def load_kp_model(path, device):
config_kp_path = path / 'hrnetv2_w48.yaml'
with open(config_kp_path, 'r') as f:
cfg_kp = yaml.safe_load(f)
loaded_state_kp = torch.load(path / "keypoint_detect.pt", map_location=device, weights_only=False)
model = get_cls_net(cfg_kp)
model.load_state_dict(loaded_state_kp)
model.to(device)
model.eval()
return model
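# Usage sketch ("weights_dir" is a placeholder path containing
# hrnetv2_w48.yaml and keypoint_detect.pt):
# kp_model = load_kp_model(Path("weights_dir"), device="cpu")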
def preprocess_batch_fast(frames):
"""Resize, normalize, and stack BGR frames into an (N, 3, 540, 960) float32 tensor."""
target_size = (540, 960)  # (H, W) expected by the model
batch = []
for frame in frames:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = cv2.resize(frame_rgb, (target_size[1], target_size[0]))
img = img.astype(np.float32) / 255.0
img = np.transpose(img, (2, 0, 1))  # HWC -> CHW
batch.append(img)
return torch.from_numpy(np.stack(batch))
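# Shape sketch: two dummy 720p BGR frames become a (2, 3, 540, 960) tensor.
# frames = [np.zeros((720, 1280, 3), dtype=np.uint8)] * 2
# preprocess_batch_fast(frames).shape  # -> torch.Size([2, 3, 540, 960])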
def extract_keypoints_from_heatmap_fast(heatmap: torch.Tensor, scale: int = 2, max_keypoints: int = 1):
"""Ultra-fast keypoint extraction optimized for speed"""
batch_size, n_channels, height, width = heatmap.shape
# Simplified local maxima detection (faster but slightly less accurate)
max_pooled = F.max_pool2d(heatmap, 3, stride=1, padding=1)
local_maxima = (max_pooled == heatmap)
# Apply mask and get top keypoints in one go
masked_heatmap = heatmap * local_maxima
flat_heatmap = masked_heatmap.view(batch_size, n_channels, -1)
scores, indices = torch.topk(flat_heatmap, max_keypoints, dim=-1, sorted=False)
# Vectorized coordinate calculation
y_coords = torch.div(indices, width, rounding_mode="floor") * scale
x_coords = (indices % width) * scale
# Stack results efficiently
results = torch.stack([x_coords.float(), y_coords.float(), scores], dim=-1)
return results
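# Shape sketch: with scale=2 the (x, y) coordinates map back to the original
# (pre-downsampling) resolution.
# hm = torch.rand(1, 33, 270, 480)
# extract_keypoints_from_heatmap_fast(hm).shape  # -> torch.Size([1, 33, 1, 3])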
def process_keypoints_vectorized(kp_coords, kp_threshold, w, h, batch_size):
"""Ultra-fast vectorized keypoint processing"""
batch_results = []
# Convert to numpy once for faster CPU operations
kp_np = kp_coords.cpu().numpy()
for batch_idx in range(batch_size):
kp_dict = {}
# Vectorized threshold check
valid_kps = kp_np[batch_idx, :, 0, 2] > kp_threshold
valid_indices = np.where(valid_kps)[0]
for ch_idx in valid_indices:
x = float(kp_np[batch_idx, ch_idx, 0, 0]) / w
y = float(kp_np[batch_idx, ch_idx, 0, 1]) / h
p = float(kp_np[batch_idx, ch_idx, 0, 2])
kp_dict[ch_idx + 1] = {'x': x, 'y': y, 'p': p}
batch_results.append(kp_dict)
return batch_results
def inference_batch(frames, model, kp_threshold, device, batch_size=8):
"""Optimized batch inference for multiple frames"""
results = []
num_frames = len(frames)
# Get the device from the model itself
model_device = next(model.parameters()).device
# Process all frames in optimally-sized batches
for i in range(0, num_frames, batch_size):
current_batch_size = min(batch_size, num_frames - i)
batch_frames = frames[i:i + current_batch_size]
# Fast preprocessing - create on CPU first
batch = preprocess_batch_fast(batch_frames)
b, c, h, w = batch.size()
# Move batch to model device
batch = batch.to(model_device)
with torch.inference_mode():
heatmaps = model(batch)
# Keypoint extraction; the last heatmap channel (background) is dropped
kp_coords = extract_keypoints_from_heatmap_fast(heatmaps[:,:-1,:,:], scale=2, max_keypoints=1)
# Threshold and normalize per frame
batch_results = process_keypoints_vectorized(kp_coords, kp_threshold, 960, 540, current_batch_size)
results.extend(batch_results)
del heatmaps, kp_coords, batch, batch_results, batch_frames
return results
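# Maps the model's 1-based heatmap channel ids to the 32 pitch-template
# keypoint ids used downstream; channels without an entry are discarded.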
map_keypoints = {
1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23,
11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29,
28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20,
45: 9, 50: 31, 52: 32, 57: 22
}
def get_mapped_keypoints(kp_points):
"""Apply keypoint mapping to detection results"""
mapped_points = {}
for key, value in kp_points.items():
if key in map_keypoints:
mapped_key = map_keypoints[key]
mapped_points[mapped_key] = value
# else:
# Keep unmapped keypoints with original key
# mapped_points[key] = value
return mapped_points
def process_batch_input(frames, model, kp_threshold, device='cpu', batch_size=16):
"""Process multiple input images in batch"""
# Batch inference
kp_results = inference_batch(frames, model, kp_threshold, device, batch_size)
kp_results = [get_mapped_keypoints(kp) for kp in kp_results]
return kp_results
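# End-to-end sketch (placeholder path and frames, assumed values):
# kp_model = load_kp_model(Path("weights_dir"), device="cpu")
# kp_results = process_batch_input(frames, kp_model, kp_threshold=0.2)
# kp_results[0]  # e.g. {14: {'x': 0.41, 'y': 0.77, 'p': 0.93}, ...}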
class _Pl:
def __init__(self, repo_root: Path) -> None:
self.repo_root = Path(repo_root)
self._executor = ThreadPoolExecutor(max_workers=3)
self._track_id_to_team_votes: dict[int, dict[str, int]] = {}
self._track_id_to_class_votes: dict[int, dict[int, int]] = {}
self._reid_embedder: Optional[_ReidE] = None
self._reid_team_assigner: Optional[_ReidT] = None
self._track_to_reid_mean: dict[int, np.ndarray] = {}
self._reid_ema_alpha = 0.25
_reid_path = self.repo_root / "models" / "reid.onnx"
if _reid_path.exists() and _HAS_ONNXRUNTIME:
try:
self._reid_embedder = _ReidE(_reid_path, input_height=256, input_width=128)
self._reid_team_assigner = _ReidT()
except Exception:
self._reid_embedder = None
self._reid_team_assigner = None
self._tracker_config = "botsort.yaml"
models_dir = self.repo_root / "models"
self.ball_model = YOLO(str(models_dir / "ball-detection-model.onnx"), task="detect")
self.person_model = YOLO(str(models_dir / "person-detection-model.onnx"), task="detect")
self._keypoint_model_hrnet = None
_yaml_path = self.repo_root / "hrnetv2_w48.yaml"
_weights_path = self.repo_root / "models" / "keypoint"
if _f0 and _yaml_path.exists() and _weights_path.exists():
try:
self._keypoint_model_hrnet = _l0(
self.repo_root, weights_subdir="models"
)
except Exception:
self._keypoint_model_hrnet = None
self._current_batch_bbox_timings: list[tuple[str, float]] = []
self._current_batch_kp_timings: list[tuple[str, float]] = []
def reset_for_new_video(self) -> None:
self._track_id_to_team_votes.clear()
self._track_id_to_class_votes.clear()
self._track_to_reid_mean.clear()
if self._reid_team_assigner is not None:
self._reid_team_assigner.centroids = None
try:
pred = getattr(self.person_model, "predictor", None)
if pred is not None:
for t in getattr(pred, "trackers", []) or []:
if hasattr(t, "reset"):
t.reset()
except Exception:
pass
def _keypoint_hrnet_task(
self,
images: list[ndarray],
offset: int,
n_keypoints: int,
) -> dict[int, list[list[float]]]:
start_time = time.time()
default_kps = [[0.0, 0.0] for _ in range(n_keypoints)]
if not _f0 or self._keypoint_model_hrnet is None:
return {i: list(default_kps) for i in range(len(images))}
device = "cuda" if next(self._keypoint_model_hrnet.parameters()).is_cuda else "cpu"
kp_threshold = 0.2
kp_result = process_batch_input(
images,
self._keypoint_model_hrnet,
kp_threshold,
device,
batch_size=kp_batch_size
)
keypoints = _n0(kp_result, images, n_keypoints)
for idx in range(len(keypoints)):
keypoints[idx] = _f1(keypoints[idx], n_keypoints)
out: dict[int, list[list[float]]] = {}
for i, kpts in enumerate(keypoints):
out[i] = _c1(kpts)
print(f"Keypoint HRNet: {time.time() - start_time}")
return out
def _bbox_task(
self,
images: list[ndarray],
offset: int,
imgsz: int,
conf: float,
onnx_batch_size: int,
) -> dict[int, list[_Bx]]:
start_time = time.time()
ball_res = []
for start in range(0, len(images), onnx_batch_size):
chunk = images[start : start + onnx_batch_size]
batch_res = self.ball_model.predict(chunk, imgsz=imgsz, conf=conf, verbose=False)
ball_res.extend(batch_res if batch_res else [])
print(f"Ball Model: {time.time() - start_time}")
start_time = time.time()
person_res = []
for frame in images:
pr = self.person_model.track(frame, persist=True, tracker=self._tracker_config, imgsz=imgsz, conf=conf, verbose=False)
person_res.append(pr[0] if pr else None)
print(f"Person Model: {time.time() - start_time}")
start_time = time.time()
bboxes_by_frame: dict[int, list[_Bx]] = {}
track_ids_by_frame: dict[int, list[int | None]] = {}
boxes_raw_list: list[list[_Bx]] = []
track_ids_raw_list: list[list[int | None]] = []
bbox_to_track_list: list[dict[tuple[int, int, int, int], int]] = []
for i, frame in enumerate(images):
frame_id = offset + i
boxes_raw = []
track_ids_raw: list[int | None] = []
bbox_to_track: dict[tuple[int, int, int, int], int] = {}
det_ball = ball_res[i] if i < len(ball_res) else None
if det_ball is not None and getattr(det_ball, "boxes", None) is not None and len(det_ball.boxes) > 0:
b = det_ball.boxes
xyxy = b.xyxy.cpu().numpy()
confs = b.conf.cpu().numpy() if b.conf is not None else np.ones(len(xyxy), dtype=np.float32)
clss = b.cls.cpu().numpy().astype(int) if b.cls is not None else np.zeros(len(xyxy), dtype=np.int32)
for (x1, y1, x2, y2), c, cf in zip(xyxy, clss, confs):
if int(c) == 0:
boxes_raw.append(_Bx(x1=int(round(x1)), y1=int(round(y1)), x2=int(round(x2)), y2=int(round(y2)), cls_id=_C0, conf=float(cf)))
track_ids_raw.append(None)
det_p = person_res[i] if i < len(person_res) else None
if det_p is not None and getattr(det_p, "boxes", None) is not None and len(det_p.boxes) > 0:
b = det_p.boxes
xyxy = b.xyxy.cpu().numpy()
confs = b.conf.cpu().numpy() if b.conf is not None else np.ones(len(xyxy), dtype=np.float32)
clss = b.cls.cpu().numpy().astype(int) if b.cls is not None else np.zeros(len(xyxy), dtype=np.int32)
track_ids = b.id.cpu().numpy().astype(int) if getattr(b, "id", None) is not None else np.full(len(clss), -1, dtype=np.int32)
for (x1, y1, x2, y2), c, cf, tid in zip(xyxy, clss, confs, track_ids):
c = int(c)
tid = int(tid)
x1r, y1r, x2r, y2r = int(round(x1)), int(round(y1)), int(round(x2)), int(round(y2))
if tid >= 0:
bbox_to_track[(x1r, y1r, x2r, y2r)] = tid
tid_out = tid if tid >= 0 else None
if c == 0:
boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C2, conf=float(cf)))
track_ids_raw.append(tid_out)
elif c == 1:
boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C3, conf=float(cf)))
track_ids_raw.append(tid_out)
elif c == 2:
boxes_raw.append(_Bx(x1=x1r, y1=y1r, x2=x2r, y2=y2r, cls_id=_C1, conf=float(cf)))
track_ids_raw.append(tid_out)
boxes_raw_list.append(boxes_raw)
track_ids_raw_list.append(track_ids_raw)
bbox_to_track_list.append(bbox_to_track)
reid_embs_per_frame: list[list[Optional[np.ndarray]]] = []
if self._reid_embedder and self._reid_team_assigner:
crop_list: list[tuple[ndarray, tuple[int, int, int, int]]] = []
mapping: list[tuple[int, int]] = []
for fi in range(len(images)):
boxes_raw = boxes_raw_list[fi]
for bi, bb in enumerate(boxes_raw):
if int(bb.cls_id) == _C2:
crop_list.append((images[fi], (int(bb.x1), int(bb.y1), int(bb.x2), int(bb.y2))))
mapping.append((fi, bi))
# start_time = time.time()
reid_results = self._reid_embedder.extract_batch(
crop_list, batch_size=_R3, timings=None
)
# print(f"Process2_2_2: {time.time() - start_time}")
reid_embs_per_frame = [[None] * len(boxes_raw_list[fi]) for fi in range(len(images))]
for k, (fi, bi) in enumerate(mapping):
if k < len(reid_results):
reid_embs_per_frame[fi][bi] = reid_results[k]
else:
reid_embs_per_frame = [[None] * len(boxes_raw) for boxes_raw in boxes_raw_list]
for i, frame in enumerate(images):
frame_id = offset + i
boxes_raw = boxes_raw_list[i]
bbox_to_track = bbox_to_track_list[i]
reid_embs = reid_embs_per_frame[i]
use_centroid: list[bool] = []
if self._reid_embedder and self._reid_team_assigner:
player_boxes = [bb for bb in boxes_raw if int(bb.cls_id) == _C2]
use_centroid = [not any(_i1(bb, o) >= _T0 for o in player_boxes if o is not bb) for bb in boxes_raw]
team_ids_reid = self._reid_team_assigner.assign(reid_embs, use_for_centroid=use_centroid)
boxes_with_team = []
for idx, bb in enumerate(boxes_raw):
if int(bb.cls_id) == _C2 and idx < len(team_ids_reid):
tid = team_ids_reid[idx]
boxes_with_team.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=bb.cls_id, conf=bb.conf, team_id=str(int(tid))))
else:
boxes_with_team.append(bb)
track_ids_with_team = track_ids_raw_list[i] if i < len(track_ids_raw_list) else [None] * len(boxes_raw)
else:
boxes_with_team = list(boxes_raw)
track_ids_with_team = track_ids_raw_list[i] if i < len(track_ids_raw_list) else [None] * len(boxes_raw)
boxes_stabilized = []
track_ids_stabilized: list[int | None] = []
for idx, bb in enumerate(boxes_with_team):
best_tid = -1
best_iou = 0.0
for (bx1, by1, bx2, by2), tid in bbox_to_track.items():
iou = _i1(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=0, conf=0.0), _Bx(x1=bx1, y1=by1, x2=bx2, y2=by2, cls_id=0, conf=0.0))
if iou > best_iou and iou > 0.5:
best_iou, best_tid = iou, tid
tid_out = best_tid if best_tid >= 0 else (track_ids_with_team[idx] if idx < len(track_ids_with_team) else None)
if best_tid >= 0:
if best_tid not in self._track_id_to_class_votes:
self._track_id_to_class_votes[best_tid] = {}
cls_key = int(bb.cls_id)
self._track_id_to_class_votes[best_tid][cls_key] = self._track_id_to_class_votes[best_tid].get(cls_key, 0) + 1
if int(bb.cls_id) == _C2 and bb.team_id:
team_key = (bb.team_id or "1").strip()
if best_tid not in self._track_id_to_team_votes:
self._track_id_to_team_votes[best_tid] = {}
self._track_id_to_team_votes[best_tid][team_key] = self._track_id_to_team_votes[best_tid].get(team_key, 0) + 1
boxes_stabilized.append(_Bx(x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2, cls_id=bb.cls_id, conf=bb.conf, team_id=bb.team_id))
track_ids_stabilized.append(tid_out)
else:
boxes_stabilized.append(bb)
track_ids_stabilized.append(tid_out)
bboxes_by_frame[frame_id] = boxes_stabilized
track_ids_by_frame[frame_id] = track_ids_stabilized
for fid in range(offset, offset + len(images)):
new_boxes = []
tids_fid = track_ids_by_frame.get(fid, [None] * len(bboxes_by_frame[fid]))
for box_idx, box in enumerate(bboxes_by_frame[fid]):
tid = tids_fid[box_idx] if box_idx < len(tids_fid) else None
if tid is not None and tid >= 0 and tid in self._track_id_to_class_votes:
majority_cls = max(self._track_id_to_class_votes[tid].items(), key=lambda x: x[1])[0]
if tid in self._track_id_to_team_votes and self._track_id_to_team_votes[tid]:
majority_team = max(self._track_id_to_team_votes[tid].items(), key=lambda x: x[1])[0]
else:
majority_team = box.team_id
new_boxes.append(_Bx(x1=box.x1, y1=box.y1, x2=box.x2, y2=box.y2, cls_id=majority_cls, conf=box.conf, team_id=majority_team))
else:
new_boxes.append(box)
bboxes_by_frame[fid] = new_boxes
track_ids_by_frame[fid] = tids_fid
if len(images) > 0:
H, W = images[0].shape[:2]
for fid in range(offset, offset + len(images)):
orig_boxes = bboxes_by_frame[fid]
orig_tids = track_ids_by_frame.get(fid, [None] * len(orig_boxes))
adjusted = _a0(
orig_boxes,
frame_width=W,
frame_height=H,
do_goalkeeper_dedup=True,
do_referee_disambiguation=True,
)
adjusted_tids: list[int | None] = []
used_orig = set()
for ab in adjusted:
matched = None
for oi, ob in enumerate(orig_boxes):
if oi in used_orig:
continue
if ob.x1 == ab.x1 and ob.y1 == ab.y1 and ob.x2 == ab.x2 and ob.y2 == ab.y2:
matched = orig_tids[oi] if oi < len(orig_tids) else None
used_orig.add(oi)
break
adjusted_tids.append(matched)
if _q0 != 0.0 or _q1 != 0.0:
boxes_offset = []
offset_tids = []
for ab_idx, bb in enumerate(adjusted):
cx = 0.5 * (bb.x1 + bb.x2)
cy = 0.5 * (bb.y1 + bb.y2)
w = bb.x2 - bb.x1
h = bb.y2 - bb.y1
cx *= 1.0 + _q0
cy *= 1.0 + _q1
boxes_offset.append(_Bx(x1=int(round(cx - w/2)), y1=int(round(cy - h/2)), x2=int(round(cx + w/2)), y2=int(round(cy + h/2)), cls_id=bb.cls_id, conf=bb.conf, team_id=bb.team_id))
offset_tids.append(adjusted_tids[ab_idx] if ab_idx < len(adjusted_tids) else None)
adjusted = boxes_offset
adjusted_tids = offset_tids
bboxes_by_frame[fid] = adjusted
track_ids_by_frame[fid] = adjusted_tids
if _A0 and _S0 > 1 and len(images) > 0:
_tmp_results = []
for fid in range(offset, offset + len(images)):
_boxes = bboxes_by_frame.get(fid, [])
_tmp_results.append(
_FRes(
frame_id=fid,
boxes=[{"x1": int(b.x1), "y1": int(b.y1), "x2": int(b.x2), "y2": int(b.y2), "cls_id": int(b.cls_id), "conf": float(b.conf), "team_id": b.team_id} for b in _boxes],
keypoints=[],
)
)
_tmp_results = _s0(_tmp_results, window=_S0, tids_by_frame=track_ids_by_frame)
for r in _tmp_results:
bboxes_by_frame[int(r.frame_id)] = [_Bx(**box) for box in r.boxes]
self._current_batch_bbox_timings = []
print(f"Detect Process: {time.time() - start_time}")
return bboxes_by_frame
def predict_batch(
self,
batch_images: list[ndarray],
offset: int,
n_keypoints: int,
) -> list[_FRes]:
if not batch_images:
return []
if offset == 0:
self.reset_for_new_video()
gc.collect()
try:
# torch is imported at module scope; just release cached GPU memory
if torch.cuda.is_available():
torch.cuda.empty_cache()
except Exception:
pass
images = list(batch_images)
n_frames = len(images)
imgsz = _D0
conf = _D1
executor = self._executor
start_time = time.time()
bboxes_by_frame = self._bbox_task(images, offset, imgsz, conf, onnx_batch_size)
keypoints_by_frame = self._keypoint_hrnet_task(images, offset, n_keypoints)
print(f"Predict Objects: {time.time() - start_time}")
start_time = time.time()
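# Keypoint refinement: fit a homography from the corrected template
# coordinates (_F1) to the detected points, reproject all 32 template
# points (_F0), and keep reprojections that land inside the frame.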
if _E1 and keypoints_by_frame and n_keypoints == 32 and len(_F0) == 32 and len(_F1) == 32:
for idx in range(len(images)):
frame_id = idx
kps = keypoints_by_frame.get(frame_id)
if not kps or len(kps) != 32:
continue
frame = images[idx]
frame_height, frame_width = frame.shape[:2]
valid_src_corrected: list[tuple[float, float]] = []
valid_dst: list[tuple[float, float]] = []
valid_indices: list[int] = []
for kp_idx, kp in enumerate(kps):
if kp and len(kp) >= 2:
x, y = float(kp[0]), float(kp[1])
if not (abs(x) < 1e-6 and abs(y) < 1e-6) and 0 <= x < frame_width and 0 <= y < frame_height:
valid_src_corrected.append(_F1[kp_idx])
valid_dst.append((x, y))
valid_indices.append(kp_idx)
if len(valid_src_corrected) < 4:
continue
src_pts = np.array(valid_src_corrected, dtype=np.float32)
dst_pts = np.array(valid_dst, dtype=np.float32)
H_corrected, _ = cv2.findHomography(src_pts, dst_pts)
if H_corrected is None:
continue
all_template_points = np.array(_F0, dtype=np.float32).reshape(-1, 1, 2)
adjusted_points = cv2.perspectiveTransform(all_template_points, H_corrected)
adjusted_points = adjusted_points.reshape(-1, 2)
adj_x_arr = adjusted_points[:32, 0]
adj_y_arr = adjusted_points[:32, 1]
valid_mask = (adj_x_arr >= 0) & (adj_y_arr >= 0) & (adj_x_arr < frame_width) & (adj_y_arr < frame_height)
valid_indices_set = set(valid_indices)
adjusted_kps: list[list[float]] = [[0.0, 0.0] for _ in range(32)]  # fresh lists, no shared aliasing
for i in np.where(valid_mask)[0]:
if _S1 or i in valid_indices_set:
adjusted_kps[i] = [float(adj_x_arr[i]), float(adj_y_arr[i])]
keypoints_by_frame[frame_id] = adjusted_kps
print(f"Get kps: {time.time() - start_time}")
h, w = batch_images[0].shape[:2]
keypoints_by_frame = fix_keypoints_pri(keypoints_by_frame, w, h)
results = []
for idx in range(len(images)):
frame_number = offset + idx
boxes_raw = bboxes_by_frame.get(frame_number, [])
boxes_for_result = [
{
"x1": int(b.x1),
"y1": int(b.y1),
"x2": int(b.x2),
"y2": int(b.y2),
"cls_id": int(b.cls_id),
"conf": float(b.conf),
"team_id": b.team_id,
}
for b in boxes_raw
]
results.append(
_FRes(
frame_id=frame_number,
boxes=boxes_for_result,
keypoints=convert_keypoints_to_val_format(keypoints_by_frame[frame_number - offset])
)
)
return results
class Miner:
def __init__(self, path_hf_repo: Path) -> None:
self.health = "Okay!!!"
self.pipeline: _Pl | None = None
self.path_hf_repo = Path(path_hf_repo)
self.is_start = False
def __repr__(self) -> str:
return self.health
def predict_batch(
self,
batch_images: list[ndarray],
offset: int,
n_keypoints: int,
) -> list[_FRes]:
if not self.is_start:
# The first call is treated as a warm-up and returns no results.
self.is_start = True
return []
if self.pipeline is None:
self.pipeline = _Pl(repo_root=self.path_hf_repo)
return self.pipeline.predict_batch(batch_images, offset, n_keypoints)
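# Usage sketch (placeholder repo path): the first predict_batch call is a
# warm-up that returns [], subsequent calls return per-frame _FRes results.
# miner = Miner(Path("path/to/hf_repo"))
# miner.predict_batch(frames, offset=0, n_keypoints=32)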