# turbo_1_2 / miner.py (duplicated from gloriforge/turbo_1_1, commit 03f1f92)
import time
import cv2
import torch
import numpy as np
from pathlib import Path
from numpy import ndarray
from pydantic import BaseModel
from ultralytics import YOLO
import os
import sys  # used by get_cls_net's init_weights (sys.exit on missing weights)
from typing import Iterable, Generator, List, TypeVar, Tuple, Sequence, Any, Dict, Optional
from collections import deque, OrderedDict, defaultdict
import threading
from itertools import combinations
from concurrent.futures import ThreadPoolExecutor
import yaml
from cv2 import (
bitwise_and,
findHomography,
warpPerspective,
cvtColor,
COLOR_BGR2GRAY,
threshold,
THRESH_BINARY,
getStructuringElement,
MORPH_RECT,
MORPH_TOPHAT,
GaussianBlur,
morphologyEx,
Canny,
connectedComponents,
perspectiveTransform,
RETR_EXTERNAL,
CHAIN_APPROX_SIMPLE,
findContours,
boundingRect,
dilate,
imread,
countNonZero
)
import gc
# The CUDA caching allocator reads this at the first CUDA allocation, so setting
# it after "import torch" (but before any CUDA work) still takes effect.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
class BoundingBox(BaseModel):
x1: int
y1: int
x2: int
y2: int
cls_id: int
conf: float
track_id: int | None = None
class TVFrameResult(BaseModel):
frame_id: int
boxes: list[BoundingBox]
keypoints: list[tuple[int, int]]
V = TypeVar("V")
kp_threshold = 0.3
def create_batches(sequence: Iterable[V], batch_size: int) -> Generator[List[V], None, None]:
batch_size = max(batch_size, 1)
current_batch = []
for element in sequence:
if len(current_batch) == batch_size:
yield current_batch
current_batch = []
current_batch.append(element)
if current_batch:
yield current_batch
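# Illustrative behaviour (hypothetical values): the trailing short batch is
# still yielded rather than dropped:
#   list(create_batches(range(5), 2))  # -> [[0, 1], [2, 3], [4]]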
from torch import nn
from torch.nn import functional as F
from sklearn.cluster import KMeans
from PIL import Image
_OSNET_MODEL = None
team_classifier_path = None
BALL_ID = 0
GK_ID = 1
PLAYER_ID = 2
REF_ID = 3
TEAM_1_ID = 6
TEAM_2_ID = 7
pretrained_urls = {
'osnet_x1_0':
'https://drive.google.com/uc?id=1LaG1EJpHrxdAxKnSCJ_i0u-nbxSAeiFY',
}
class ConvLayer(nn.Module):
"""Convolution layer (conv + bn + relu)."""
def __init__(
self,
in_channels,
out_channels,
kernel_size,
stride=1,
padding=0,
groups=1,
IN=False
):
super(ConvLayer, self).__init__()
self.conv = nn.Conv2d(
in_channels,
out_channels,
kernel_size,
stride=stride,
padding=padding,
bias=False,
groups=groups
)
if IN:
self.bn = nn.InstanceNorm2d(out_channels, affine=True)
else:
self.bn = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
x = self.conv(x)
x = self.bn(x)
x = self.relu(x)
return x
class Conv1x1(nn.Module):
"""1x1 convolution + bn + relu."""
def __init__(self, in_channels, out_channels, stride=1, groups=1):
super(Conv1x1, self).__init__()
self.conv = nn.Conv2d(
in_channels,
out_channels,
1,
stride=stride,
padding=0,
bias=False,
groups=groups
)
self.bn = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
x = self.conv(x)
x = self.bn(x)
x = self.relu(x)
return x
class Conv1x1Linear(nn.Module):
"""1x1 convolution + bn (w/o non-linearity)."""
def __init__(self, in_channels, out_channels, stride=1):
super(Conv1x1Linear, self).__init__()
self.conv = nn.Conv2d(
in_channels, out_channels, 1, stride=stride, padding=0, bias=False
)
self.bn = nn.BatchNorm2d(out_channels)
def forward(self, x):
x = self.conv(x)
x = self.bn(x)
return x
class Conv3x3(nn.Module):
"""3x3 convolution + bn + relu."""
def __init__(self, in_channels, out_channels, stride=1, groups=1):
super(Conv3x3, self).__init__()
self.conv = nn.Conv2d(
in_channels,
out_channels,
3,
stride=stride,
padding=1,
bias=False,
groups=groups
)
self.bn = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
x = self.conv(x)
x = self.bn(x)
x = self.relu(x)
return x
class LightConv3x3(nn.Module):
"""Lightweight 3x3 convolution.
1x1 (linear) + dw 3x3 (nonlinear).
"""
def __init__(self, in_channels, out_channels):
super(LightConv3x3, self).__init__()
self.conv1 = nn.Conv2d(
in_channels, out_channels, 1, stride=1, padding=0, bias=False
)
self.conv2 = nn.Conv2d(
out_channels,
out_channels,
3,
stride=1,
padding=1,
bias=False,
groups=out_channels
)
self.bn = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
x = self.conv1(x)
x = self.conv2(x)
x = self.bn(x)
x = self.relu(x)
return x
class ChannelGate(nn.Module):
def __init__(
self,
in_channels,
num_gates=None,
return_gates=False,
gate_activation='sigmoid',
reduction=16,
layer_norm=False
):
super(ChannelGate, self).__init__()
if num_gates is None:
num_gates = in_channels
self.return_gates = return_gates
self.global_avgpool = nn.AdaptiveAvgPool2d(1)
self.fc1 = nn.Conv2d(
in_channels,
in_channels // reduction,
kernel_size=1,
bias=True,
padding=0
)
self.norm1 = None
if layer_norm:
self.norm1 = nn.LayerNorm((in_channels // reduction, 1, 1))
self.relu = nn.ReLU(inplace=True)
self.fc2 = nn.Conv2d(
in_channels // reduction,
num_gates,
kernel_size=1,
bias=True,
padding=0
)
if gate_activation == 'sigmoid':
self.gate_activation = nn.Sigmoid()
elif gate_activation == 'relu':
self.gate_activation = nn.ReLU(inplace=True)
elif gate_activation == 'linear':
self.gate_activation = None
else:
raise RuntimeError(
"Unknown gate activation: {}".format(gate_activation)
)
def forward(self, x):
input = x
x = self.global_avgpool(x)
x = self.fc1(x)
if self.norm1 is not None:
x = self.norm1(x)
x = self.relu(x)
x = self.fc2(x)
if self.gate_activation is not None:
x = self.gate_activation(x)
if self.return_gates:
return x
return input * x
class OSBlock(nn.Module):
"""Omni-scale feature learning block."""
def __init__(
self,
in_channels,
out_channels,
IN=False,
bottleneck_reduction=4,
**kwargs
):
super(OSBlock, self).__init__()
mid_channels = out_channels // bottleneck_reduction
self.conv1 = Conv1x1(in_channels, mid_channels)
self.conv2a = LightConv3x3(mid_channels, mid_channels)
self.conv2b = nn.Sequential(
LightConv3x3(mid_channels, mid_channels),
LightConv3x3(mid_channels, mid_channels),
)
self.conv2c = nn.Sequential(
LightConv3x3(mid_channels, mid_channels),
LightConv3x3(mid_channels, mid_channels),
LightConv3x3(mid_channels, mid_channels),
)
self.conv2d = nn.Sequential(
LightConv3x3(mid_channels, mid_channels),
LightConv3x3(mid_channels, mid_channels),
LightConv3x3(mid_channels, mid_channels),
LightConv3x3(mid_channels, mid_channels),
)
self.gate = ChannelGate(mid_channels)
self.conv3 = Conv1x1Linear(mid_channels, out_channels)
self.downsample = None
if in_channels != out_channels:
self.downsample = Conv1x1Linear(in_channels, out_channels)
self.IN = None
if IN:
self.IN = nn.InstanceNorm2d(out_channels, affine=True)
def forward(self, x):
identity = x
x1 = self.conv1(x)
x2a = self.conv2a(x1)
x2b = self.conv2b(x1)
x2c = self.conv2c(x1)
x2d = self.conv2d(x1)
x2 = self.gate(x2a) + self.gate(x2b) + self.gate(x2c) + self.gate(x2d)
x3 = self.conv3(x2)
if self.downsample is not None:
identity = self.downsample(identity)
out = x3 + identity
if self.IN is not None:
out = self.IN(out)
return F.relu(out)
class OSNet(nn.Module):
def __init__(
self,
num_classes,
blocks,
layers,
channels,
feature_dim=512,
loss='softmax',
IN=False,
**kwargs
):
super(OSNet, self).__init__()
num_blocks = len(blocks)
assert num_blocks == len(layers)
assert num_blocks == len(channels) - 1
self.loss = loss
self.feature_dim = feature_dim
# convolutional backbone
self.conv1 = ConvLayer(3, channels[0], 7, stride=2, padding=3, IN=IN)
self.maxpool = nn.MaxPool2d(3, stride=2, padding=1)
self.conv2 = self._make_layer(
blocks[0],
layers[0],
channels[0],
channels[1],
reduce_spatial_size=True,
IN=IN
)
self.conv3 = self._make_layer(
blocks[1],
layers[1],
channels[1],
channels[2],
reduce_spatial_size=True
)
self.conv4 = self._make_layer(
blocks[2],
layers[2],
channels[2],
channels[3],
reduce_spatial_size=False
)
self.conv5 = Conv1x1(channels[3], channels[3])
self.global_avgpool = nn.AdaptiveAvgPool2d(1)
# fully connected layer
self.fc = self._construct_fc_layer(
self.feature_dim, channels[3], dropout_p=None
)
# identity classification layer
self.classifier = nn.Linear(self.feature_dim, num_classes)
self._init_params()
def _make_layer(
self,
block,
layer,
in_channels,
out_channels,
reduce_spatial_size,
IN=False
):
layers = []
layers.append(block(in_channels, out_channels, IN=IN))
for i in range(1, layer):
layers.append(block(out_channels, out_channels, IN=IN))
if reduce_spatial_size:
layers.append(
nn.Sequential(
Conv1x1(out_channels, out_channels),
nn.AvgPool2d(2, stride=2)
)
)
return nn.Sequential(*layers)
def _construct_fc_layer(self, fc_dims, input_dim, dropout_p=None):
if fc_dims is None or fc_dims < 0:
self.feature_dim = input_dim
return None
if isinstance(fc_dims, int):
fc_dims = [fc_dims]
layers = []
for dim in fc_dims:
layers.append(nn.Linear(input_dim, dim))
layers.append(nn.BatchNorm1d(dim))
layers.append(nn.ReLU(inplace=True))
if dropout_p is not None:
layers.append(nn.Dropout(p=dropout_p))
input_dim = dim
self.feature_dim = fc_dims[-1]
return nn.Sequential(*layers)
def _init_params(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(
m.weight, mode='fan_out', nonlinearity='relu'
)
if m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.BatchNorm1d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
if m.bias is not None:
nn.init.constant_(m.bias, 0)
def featuremaps(self, x):
x = self.conv1(x)
x = self.maxpool(x)
x = self.conv2(x)
x = self.conv3(x)
x = self.conv4(x)
x = self.conv5(x)
return x
def forward(self, x, return_featuremaps=False):
x = self.featuremaps(x)
if return_featuremaps:
return x
v = self.global_avgpool(x)
v = v.view(v.size(0), -1)
if self.fc is not None:
v = self.fc(v)
if not self.training:
return v
y = self.classifier(v)
if self.loss == 'softmax':
return y
elif self.loss == 'triplet':
return y, v
else:
raise KeyError("Unsupported loss: {}".format(self.loss))
def init_pretrained_weights(model, key=''):
import os
import errno
import gdown
from collections import OrderedDict
def _get_torch_home():
ENV_TORCH_HOME = 'TORCH_HOME'
ENV_XDG_CACHE_HOME = 'XDG_CACHE_HOME'
DEFAULT_CACHE_DIR = '~/.cache'
torch_home = os.path.expanduser(
os.getenv(
ENV_TORCH_HOME,
os.path.join(
os.getenv(ENV_XDG_CACHE_HOME, DEFAULT_CACHE_DIR), 'torch'
)
)
)
return torch_home
torch_home = _get_torch_home()
model_dir = os.path.join(torch_home, 'checkpoints')
try:
os.makedirs(model_dir)
except OSError as e:
if e.errno == errno.EEXIST:
# Directory already exists, ignore.
pass
else:
# Unexpected OSError, re-raise.
raise
filename = key + '_imagenet.pth'
cached_file = os.path.join(model_dir, filename)
if not os.path.exists(cached_file):
gdown.download(pretrained_urls[key], cached_file, quiet=False)
state_dict = torch.load(cached_file)
model_dict = model.state_dict()
new_state_dict = OrderedDict()
matched_layers, discarded_layers = [], []
for k, v in state_dict.items():
if k.startswith('module.'):
k = k[7:] # discard module.
if k in model_dict and model_dict[k].size() == v.size():
new_state_dict[k] = v
matched_layers.append(k)
else:
discarded_layers.append(k)
model_dict.update(new_state_dict)
model.load_state_dict(model_dict)
if len(matched_layers) == 0:
print(
'The pretrained weights from "{}" cannot be loaded, '
'please check the key names manually '
'(** ignored and continue **)'.format(cached_file)
)
else:
print(
'Successfully loaded imagenet pretrained weights from "{}"'.
format(cached_file)
)
if len(discarded_layers) > 0:
print(
'** The following layers are discarded '
'due to unmatched keys or layer size: {}'.
format(discarded_layers)
)
def osnet_x1_0(num_classes=1000, pretrained=True, loss='softmax', **kwargs):
# standard size (width x1.0)
model = OSNet(
num_classes,
blocks=[OSBlock, OSBlock, OSBlock],
layers=[2, 2, 2],
channels=[64, 256, 384, 512],
loss=loss,
**kwargs
)
# if pretrained:
# init_pretrained_weights(model, key='osnet_x1_0')
return model
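# Minimal usage sketch (hypothetical input; CPU, no pretrained weights): in
# eval mode the forward pass returns the feature vector, not class logits:
#   model = osnet_x1_0(num_classes=1, pretrained=False).eval()
#   with torch.no_grad():
#       feats = model(torch.randn(2, 3, 64, 32))
#   feats.shape  # -> torch.Size([2, 512])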
import torchvision.transforms as T
import os.path as osp
def load_checkpoint(fpath):
fpath = osp.abspath(osp.expanduser(fpath))
map_location = None if torch.cuda.is_available() else 'cpu'
# weights_only=False allows checkpoints that contain numpy/other objects (e.g. model.pth.tar-100)
checkpoint = torch.load(fpath, map_location=map_location, weights_only=False)
return checkpoint
def load_pretrained_weights(model, weight_path):
checkpoint = load_checkpoint(weight_path)
if 'state_dict' in checkpoint:
state_dict = checkpoint['state_dict']
else:
state_dict = checkpoint
model_dict = model.state_dict()
new_state_dict = OrderedDict()
matched_layers, discarded_layers = ([], [])
for k, v in state_dict.items():
if k.startswith('module.'):
k = k[7:]
if k in model_dict and model_dict[k].size() == v.size():
new_state_dict[k] = v
matched_layers.append(k)
else:
discarded_layers.append(k)
model_dict.update(new_state_dict)
model.load_state_dict(model_dict)
def load_osnet(device="cuda", weight_path=None):
"""Build osnet_x1_0 and load weights from model.pth.tar-100 via load_pretrained_weights."""
model = osnet_x1_0(num_classes=1, loss='softmax', pretrained=False, use_gpu=device == 'cuda')
# if weight_path is None:
# weight_path = Path(__file__).resolve().parent / "model.pth.tar-100"
weight_path = Path(weight_path)
if weight_path.exists():
load_pretrained_weights(model, str(weight_path))
model.eval()
model.to(device)
return model
def filter_player_boxes(
boxes: List[BoundingBox],
min_area: int = 1500
) -> List[BoundingBox]:
players = []
for b in boxes:
if b.cls_id != 2: # only players
continue
# area = (b.x2 - b.x1) * (b.y2 - b.y1)
# if area < min_area:
# continue
players.append(b)
return players
# OSNet preprocess (same as team_cluster: Resize, ToTensor, ImageNet normalize)
OSNET_IMAGE_SIZE = (64, 32) # (height, width)
OSNET_PREPROCESS = T.Compose([
T.Resize(OSNET_IMAGE_SIZE),
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
def crop_upper_body(frame: np.ndarray, box: BoundingBox) -> np.ndarray:
# h = box.y2 - box.y1
# y2 = box.y1 + int(0.6 * h)
return frame[
max(0, box.y1):max(0, box.y2),
max(0, box.x1):max(0, box.x2)
]
def preprocess_osnet(crop: np.ndarray) -> torch.Tensor:
"""BGR crop -> RGB PIL -> Resize, ToTensor, ImageNet Normalize (same as team_cluster)."""
rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
pil = Image.fromarray(rgb)
return OSNET_PREPROCESS(pil)
@torch.no_grad()
def extract_osnet_embeddings(
frames: List[np.ndarray],
# batch_boxes: List[List[BoundingBox]],
batch_boxes: dict[int, List[BoundingBox]],
device="cuda",
batch_size=4
) -> Tuple[np.ndarray, List[BoundingBox]]:
crops = []
meta = []
    for frame, boxes in zip(frames, batch_boxes.values()):
players = filter_player_boxes(boxes)
for box in players:
crop = crop_upper_body(frame, box)
if crop.size == 0:
continue
crops.append(preprocess_osnet(crop))
meta.append(box)
if not crops:
return None, None
all_embeddings = []
    with torch.no_grad():  # redundant with the @torch.no_grad() decorator above, but harmless
for start in range(0, len(crops), batch_size):
end = start + batch_size
batch = torch.stack(crops[start:end]).float().to(device)
embeddings_chunk = _OSNET_MODEL(batch) # (chunk_size, 256)
all_embeddings.append(embeddings_chunk.cpu())
del batch, embeddings_chunk
embeddings = torch.cat(all_embeddings, dim=0).numpy()
# embeddings /= np.linalg.norm(embeddings, axis=1, keepdims=True)
return embeddings, meta
def aggregate_by_track(
embeddings: np.ndarray,
meta: List[BoundingBox]
):
track_map = defaultdict(list)
box_map = {}
for emb, box in zip(embeddings, meta):
key = box.track_id if box.track_id is not None else id(box)
track_map[key].append(emb)
box_map[key] = box
agg_embeddings = []
agg_boxes = []
for key, embs in track_map.items():
mean_emb = np.mean(embs, axis=0)
mean_emb /= np.linalg.norm(mean_emb)
agg_embeddings.append(mean_emb)
agg_boxes.append(box_map[key])
return np.array(agg_embeddings), agg_boxes
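# Illustrative aggregation (hypothetical box and embeddings): entries sharing a
# track_id are mean-pooled and L2-normalised into one embedding per track:
#   b = BoundingBox(x1=0, y1=0, x2=10, y2=10, cls_id=2, conf=0.9, track_id=7)
#   embs, boxes = aggregate_by_track(np.array([[3.0, 4.0], [3.0, 4.0]]), [b, b])
#   embs[0]  # -> array([0.6, 0.8]); len(boxes) == 1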
def cluster_teams(embeddings: np.ndarray):
if len(embeddings) < 2:
return None
    kmeans = KMeans(n_clusters=2, n_init=2, random_state=42)
return kmeans.fit_predict(embeddings)
def update_team_ids(
boxes: List[BoundingBox],
labels: np.ndarray
):
for box, label in zip(boxes, labels):
box.cls_id = TEAM_1_ID if label == 0 else TEAM_2_ID
def classify_teams_batch(
frames: List[np.ndarray],
# batch_boxes: List[List[BoundingBox]],
batch_boxes: dict[int, List[BoundingBox]],
batch_size,
device="cuda"
):
# Fallback: OSNet embeddings + aggregate by track + KMeans
embeddings, meta = extract_osnet_embeddings(
frames, batch_boxes, device, batch_size
)
if embeddings is None:
return
embeddings, agg_boxes = aggregate_by_track(embeddings, meta)
n = len(embeddings)
if n == 0:
return
if n == 1:
agg_boxes[0].cls_id = TEAM_1_ID
return
kmeans = KMeans(n_clusters=2, n_init=2, random_state=42)
kmeans.fit(embeddings)
centroids = kmeans.cluster_centers_ # (2, dim)
# print("Clusters' centers:")
# for i, c in enumerate(centroids):
# print(f" cluster_{i}: shape={c.shape}, norm={np.linalg.norm(c):.4f}, mean={np.mean(c):.4f}")
c0, c1 = centroids[0], centroids[1]
norm_0 = np.linalg.norm(c0)
norm_1 = np.linalg.norm(c1)
# Similarity (cosine), distance (L2), square error (SSE) between the two centers
similarity = np.dot(c0, c1) / (norm_0 * norm_1 + 1e-12)
distance = np.linalg.norm(c0 - c1)
square_error = np.sum((c0 - c1) ** 2)
# print(f" Between centers: similarity(cosine)={similarity:.4f}, distance(L2)={distance:.4f}, square_error(SSE)={square_error:.4f}")
if similarity > 0.95:
# Centers too similar: treat as one cluster (all same team)
for b in agg_boxes:
b.cls_id = TEAM_1_ID
# print(" Similarity > 0.95: using single cluster (all assigned to team 1).")
return
    # Make the cluster with the larger centroid norm team 1; flip labels otherwise.
if norm_0 <= norm_1:
kmeans.labels_ = 1 - kmeans.labels_
update_team_ids(agg_boxes, kmeans.labels_)
def get_cls_net(config, pretrained='', **kwargs):
"""Create keypoint detection model with softmax activation"""
def conv3x3(in_planes, out_planes, stride=1):
"""3x3 convolution with padding"""
return nn.Conv2d(in_planes, out_planes, kernel_size=3,
stride=stride, padding=1, bias=False)
class BasicBlock(nn.Module):
expansion = 1
def __init__(self, inplanes, planes, stride=1, downsample=None):
super(BasicBlock, self).__init__()
self.conv1 = conv3x3(inplanes, planes, stride)
self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
self.relu = nn.ReLU(inplace=True)
self.conv2 = conv3x3(planes, planes)
self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
self.downsample = downsample
self.stride = stride
def forward(self, x):
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
out = self.relu(out)
return out
class Bottleneck(nn.Module):
expansion = 4
def __init__(self, inplanes, planes, stride=1, downsample=None):
super(Bottleneck, self).__init__()
self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride,
padding=1, bias=False)
self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM)
self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1,
bias=False)
self.bn3 = BatchNorm2d(planes * self.expansion,
momentum=BN_MOMENTUM)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
def forward(self, x):
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
out = self.relu(out)
return out
BatchNorm2d = nn.BatchNorm2d
BN_MOMENTUM = 0.1
blocks_dict = {
'BASIC': BasicBlock,
'BOTTLENECK': Bottleneck
}
class HighResolutionModule(nn.Module):
def __init__(self, num_branches, blocks, num_blocks, num_inchannels,
num_channels, fuse_method, multi_scale_output=True):
super(HighResolutionModule, self).__init__()
self._check_branches(
num_branches, blocks, num_blocks, num_inchannels, num_channels)
self.num_inchannels = num_inchannels
self.fuse_method = fuse_method
self.num_branches = num_branches
self.multi_scale_output = multi_scale_output
self.branches = self._make_branches(
num_branches, blocks, num_blocks, num_channels)
self.fuse_layers = self._make_fuse_layers()
self.relu = nn.ReLU(inplace=True)
def _check_branches(self, num_branches, blocks, num_blocks,
num_inchannels, num_channels):
if num_branches != len(num_blocks):
error_msg = 'NUM_BRANCHES({}) <> NUM_BLOCKS({})'.format(
num_branches, len(num_blocks))
raise ValueError(error_msg)
if num_branches != len(num_channels):
error_msg = 'NUM_BRANCHES({}) <> NUM_CHANNELS({})'.format(
num_branches, len(num_channels))
raise ValueError(error_msg)
if num_branches != len(num_inchannels):
error_msg = 'NUM_BRANCHES({}) <> NUM_INCHANNELS({})'.format(
num_branches, len(num_inchannels))
raise ValueError(error_msg)
def _make_one_branch(self, branch_index, block, num_blocks, num_channels,
stride=1):
downsample = None
if stride != 1 or \
self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(self.num_inchannels[branch_index],
num_channels[branch_index] * block.expansion,
kernel_size=1, stride=stride, bias=False),
BatchNorm2d(num_channels[branch_index] * block.expansion,
momentum=BN_MOMENTUM),
)
layers = []
layers.append(block(self.num_inchannels[branch_index],
num_channels[branch_index], stride, downsample))
self.num_inchannels[branch_index] = \
num_channels[branch_index] * block.expansion
for i in range(1, num_blocks[branch_index]):
layers.append(block(self.num_inchannels[branch_index],
num_channels[branch_index]))
return nn.Sequential(*layers)
def _make_branches(self, num_branches, block, num_blocks, num_channels):
branches = []
for i in range(num_branches):
branches.append(
self._make_one_branch(i, block, num_blocks, num_channels))
return nn.ModuleList(branches)
def _make_fuse_layers(self):
if self.num_branches == 1:
return None
num_branches = self.num_branches
num_inchannels = self.num_inchannels
fuse_layers = []
for i in range(num_branches if self.multi_scale_output else 1):
fuse_layer = []
for j in range(num_branches):
if j > i:
fuse_layer.append(nn.Sequential(
nn.Conv2d(num_inchannels[j],
num_inchannels[i],
1,
1,
0,
bias=False),
BatchNorm2d(num_inchannels[i], momentum=BN_MOMENTUM)))
# nn.Upsample(scale_factor=2**(j-i), mode='nearest')))
elif j == i:
fuse_layer.append(None)
else:
conv3x3s = []
for k in range(i - j):
if k == i - j - 1:
num_outchannels_conv3x3 = num_inchannels[i]
conv3x3s.append(nn.Sequential(
nn.Conv2d(num_inchannels[j],
num_outchannels_conv3x3,
3, 2, 1, bias=False),
BatchNorm2d(num_outchannels_conv3x3, momentum=BN_MOMENTUM)))
else:
num_outchannels_conv3x3 = num_inchannels[j]
conv3x3s.append(nn.Sequential(
nn.Conv2d(num_inchannels[j],
num_outchannels_conv3x3,
3, 2, 1, bias=False),
BatchNorm2d(num_outchannels_conv3x3,
momentum=BN_MOMENTUM),
nn.ReLU(inplace=True)))
fuse_layer.append(nn.Sequential(*conv3x3s))
fuse_layers.append(nn.ModuleList(fuse_layer))
return nn.ModuleList(fuse_layers)
def get_num_inchannels(self):
return self.num_inchannels
def forward(self, x):
if self.num_branches == 1:
return [self.branches[0](x[0])]
for i in range(self.num_branches):
x[i] = self.branches[i](x[i])
x_fuse = []
for i in range(len(self.fuse_layers)):
y = x[0] if i == 0 else self.fuse_layers[i][0](x[0])
for j in range(1, self.num_branches):
if i == j:
y = y + x[j]
elif j > i:
y = y + F.interpolate(
self.fuse_layers[i][j](x[j]),
size=[x[i].shape[2], x[i].shape[3]],
mode='bilinear')
else:
y = y + self.fuse_layers[i][j](x[j])
x_fuse.append(self.relu(y))
return x_fuse
class HighResolutionNet(nn.Module):
def __init__(self, config, lines=False, **kwargs):
self.inplanes = 64
self.lines = lines
extra = config['MODEL']['EXTRA']
super(HighResolutionNet, self).__init__()
# stem net
self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=2, padding=1,
bias=False)
self.bn1 = BatchNorm2d(self.inplanes, momentum=BN_MOMENTUM)
self.conv2 = nn.Conv2d(self.inplanes, self.inplanes, kernel_size=3, stride=2, padding=1,
bias=False)
self.bn2 = BatchNorm2d(self.inplanes, momentum=BN_MOMENTUM)
self.relu = nn.ReLU(inplace=True)
self.sf = nn.Softmax(dim=1)
self.layer1 = self._make_layer(Bottleneck, 64, 64, 4)
self.stage2_cfg = extra['STAGE2']
num_channels = self.stage2_cfg['NUM_CHANNELS']
block = blocks_dict[self.stage2_cfg['BLOCK']]
num_channels = [
num_channels[i] * block.expansion for i in range(len(num_channels))]
self.transition1 = self._make_transition_layer(
[256], num_channels)
self.stage2, pre_stage_channels = self._make_stage(
self.stage2_cfg, num_channels)
self.stage3_cfg = extra['STAGE3']
num_channels = self.stage3_cfg['NUM_CHANNELS']
block = blocks_dict[self.stage3_cfg['BLOCK']]
num_channels = [
num_channels[i] * block.expansion for i in range(len(num_channels))]
self.transition2 = self._make_transition_layer(
pre_stage_channels, num_channels)
self.stage3, pre_stage_channels = self._make_stage(
self.stage3_cfg, num_channels)
self.stage4_cfg = extra['STAGE4']
num_channels = self.stage4_cfg['NUM_CHANNELS']
block = blocks_dict[self.stage4_cfg['BLOCK']]
num_channels = [
num_channels[i] * block.expansion for i in range(len(num_channels))]
self.transition3 = self._make_transition_layer(
pre_stage_channels, num_channels)
self.stage4, pre_stage_channels = self._make_stage(
self.stage4_cfg, num_channels, multi_scale_output=True)
self.upsample = nn.Upsample(scale_factor=2, mode='nearest')
final_inp_channels = sum(pre_stage_channels) + self.inplanes
self.head = nn.Sequential(nn.Sequential(
nn.Conv2d(
in_channels=final_inp_channels,
out_channels=final_inp_channels,
kernel_size=1),
BatchNorm2d(final_inp_channels, momentum=BN_MOMENTUM),
nn.ReLU(inplace=True),
nn.Conv2d(
in_channels=final_inp_channels,
out_channels=config['MODEL']['NUM_JOINTS'],
kernel_size=extra['FINAL_CONV_KERNEL']),
                nn.Softmax(dim=1) if not self.lines else nn.Sigmoid()))
def _make_head(self, x, x_skip):
x = self.upsample(x)
x = torch.cat([x, x_skip], dim=1)
x = self.head(x)
return x
def _make_transition_layer(
self, num_channels_pre_layer, num_channels_cur_layer):
num_branches_cur = len(num_channels_cur_layer)
num_branches_pre = len(num_channels_pre_layer)
transition_layers = []
for i in range(num_branches_cur):
if i < num_branches_pre:
if num_channels_cur_layer[i] != num_channels_pre_layer[i]:
transition_layers.append(nn.Sequential(
nn.Conv2d(num_channels_pre_layer[i],
num_channels_cur_layer[i],
3,
1,
1,
bias=False),
BatchNorm2d(
num_channels_cur_layer[i], momentum=BN_MOMENTUM),
nn.ReLU(inplace=True)))
else:
transition_layers.append(None)
else:
conv3x3s = []
for j in range(i + 1 - num_branches_pre):
inchannels = num_channels_pre_layer[-1]
outchannels = num_channels_cur_layer[i] \
if j == i - num_branches_pre else inchannels
conv3x3s.append(nn.Sequential(
nn.Conv2d(
inchannels, outchannels, 3, 2, 1, bias=False),
BatchNorm2d(outchannels, momentum=BN_MOMENTUM),
nn.ReLU(inplace=True)))
transition_layers.append(nn.Sequential(*conv3x3s))
return nn.ModuleList(transition_layers)
def _make_layer(self, block, inplanes, planes, blocks, stride=1):
downsample = None
if stride != 1 or inplanes != planes * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(inplanes, planes * block.expansion,
kernel_size=1, stride=stride, bias=False),
BatchNorm2d(planes * block.expansion, momentum=BN_MOMENTUM),
)
layers = []
layers.append(block(inplanes, planes, stride, downsample))
inplanes = planes * block.expansion
for i in range(1, blocks):
layers.append(block(inplanes, planes))
return nn.Sequential(*layers)
def _make_stage(self, layer_config, num_inchannels,
multi_scale_output=True):
num_modules = layer_config['NUM_MODULES']
num_branches = layer_config['NUM_BRANCHES']
num_blocks = layer_config['NUM_BLOCKS']
num_channels = layer_config['NUM_CHANNELS']
block = blocks_dict[layer_config['BLOCK']]
fuse_method = layer_config['FUSE_METHOD']
modules = []
for i in range(num_modules):
# multi_scale_output is only used last module
if not multi_scale_output and i == num_modules - 1:
reset_multi_scale_output = False
else:
reset_multi_scale_output = True
modules.append(
HighResolutionModule(num_branches,
block,
num_blocks,
num_inchannels,
num_channels,
fuse_method,
reset_multi_scale_output)
)
num_inchannels = modules[-1].get_num_inchannels()
return nn.Sequential(*modules), num_inchannels
def forward(self, x):
# h, w = x.size(2), x.size(3)
x = self.conv1(x)
x_skip = x.clone()
x = self.bn1(x)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x)
x = self.relu(x)
x = self.layer1(x)
x_list = []
for i in range(self.stage2_cfg['NUM_BRANCHES']):
if self.transition1[i] is not None:
x_list.append(self.transition1[i](x))
else:
x_list.append(x)
y_list = self.stage2(x_list)
x_list = []
for i in range(self.stage3_cfg['NUM_BRANCHES']):
if self.transition2[i] is not None:
x_list.append(self.transition2[i](y_list[-1]))
else:
x_list.append(y_list[i])
y_list = self.stage3(x_list)
x_list = []
for i in range(self.stage4_cfg['NUM_BRANCHES']):
if self.transition3[i] is not None:
x_list.append(self.transition3[i](y_list[-1]))
else:
x_list.append(y_list[i])
x = self.stage4(x_list)
# Head Part
height, width = x[0].size(2), x[0].size(3)
x1 = F.interpolate(x[1], size=(height, width), mode='bilinear', align_corners=False)
x2 = F.interpolate(x[2], size=(height, width), mode='bilinear', align_corners=False)
x3 = F.interpolate(x[3], size=(height, width), mode='bilinear', align_corners=False)
x = torch.cat([x[0], x1, x2, x3], 1)
x = self._make_head(x, x_skip)
return x
def init_weights(self, pretrained=''):
for m in self.modules():
if isinstance(m, nn.Conv2d):
                    if not self.lines:
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
else:
nn.init.normal_(m.weight, std=0.001)
#nn.init.normal_(m.weight, std=0.001)
#nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
if pretrained != '':
if os.path.isfile(pretrained):
pretrained_dict = torch.load(pretrained)
model_dict = self.state_dict()
pretrained_dict = {k: v for k, v in pretrained_dict.items()
if k in model_dict.keys()}
model_dict.update(pretrained_dict)
self.load_state_dict(model_dict)
else:
sys.exit(f'Weights {pretrained} not found.')
model = HighResolutionNet(config, **kwargs)
model.init_weights(pretrained)
return model
# Keypoint Inference
def load_kp_model(path, device):
config_kp_path = path / 'hrnetv2_w48.yaml'
    with open(config_kp_path, 'r') as f:
        cfg_kp = yaml.safe_load(f)
loaded_state_kp = torch.load(path / "keypoint_detect.pt", map_location=device, weights_only=False)
model = get_cls_net(cfg_kp)
model.load_state_dict(loaded_state_kp)
model.to(device)
model.eval()
return model
def preprocess_batch_fast(frames):
"""Ultra-fast batch preprocessing using optimized tensor operations"""
target_size = (540, 960) # H, W format for model input
batch = []
for i, frame in enumerate(frames):
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = cv2.resize(frame_rgb, (target_size[1], target_size[0]))
img = img.astype(np.float32) / 255.0
img = np.transpose(img, (2, 0, 1)) # HWC -> CHW
batch.append(img)
batch = torch.from_numpy(np.stack(batch)).float()
return batch
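# Illustrative shape contract (hypothetical frames): two 1080p BGR frames become
# a normalised float CHW batch at the model's 540x960 input resolution:
#   frames = [np.zeros((1080, 1920, 3), dtype=np.uint8)] * 2
#   preprocess_batch_fast(frames).shape  # -> torch.Size([2, 3, 540, 960])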
def extract_keypoints_from_heatmap_fast(heatmap: torch.Tensor, scale: int = 2, max_keypoints: int = 1):
"""Ultra-fast keypoint extraction optimized for speed"""
batch_size, n_channels, height, width = heatmap.shape
# Simplified local maxima detection (faster but slightly less accurate)
max_pooled = F.max_pool2d(heatmap, 3, stride=1, padding=1)
local_maxima = (max_pooled == heatmap)
# Apply mask and get top keypoints in one go
masked_heatmap = heatmap * local_maxima
flat_heatmap = masked_heatmap.view(batch_size, n_channels, -1)
scores, indices = torch.topk(flat_heatmap, max_keypoints, dim=-1, sorted=False)
# Vectorized coordinate calculation
y_coords = torch.div(indices, width, rounding_mode="floor") * scale
x_coords = (indices % width) * scale
# Stack results efficiently
results = torch.stack([x_coords.float(), y_coords.float(), scores], dim=-1)
return results
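# Illustrative shape contract (hypothetical heatmap): per channel, the top
# max_keypoints local maxima come back as (x, y, score) in scaled coordinates:
#   hm = torch.rand(1, 32, 270, 480)
#   extract_keypoints_from_heatmap_fast(hm).shape  # -> torch.Size([1, 32, 1, 3])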
def process_keypoints_vectorized(kp_coords, kp_threshold, w, h, batch_size):
"""Ultra-fast vectorized keypoint processing"""
batch_results = []
# Convert to numpy once for faster CPU operations
kp_np = kp_coords.cpu().numpy()
for batch_idx in range(batch_size):
kp_dict = {}
# Vectorized threshold check
valid_kps = kp_np[batch_idx, :, 0, 2] > kp_threshold
valid_indices = np.where(valid_kps)[0]
for ch_idx in valid_indices:
x = float(kp_np[batch_idx, ch_idx, 0, 0]) / w
y = float(kp_np[batch_idx, ch_idx, 0, 1]) / h
p = float(kp_np[batch_idx, ch_idx, 0, 2])
kp_dict[ch_idx + 1] = {'x': x, 'y': y, 'p': p}
batch_results.append(kp_dict)
return batch_results
def inference_batch(frames, model, kp_threshold, device, batch_size=8):
"""Optimized batch inference for multiple frames"""
results = []
num_frames = len(frames)
# Get the device from the model itself
model_device = next(model.parameters()).device
# Process all frames in optimally-sized batches
for i in range(0, num_frames, batch_size):
current_batch_size = min(batch_size, num_frames - i)
batch_frames = frames[i:i + current_batch_size]
# Fast preprocessing - create on CPU first
batch = preprocess_batch_fast(batch_frames)
b, c, h, w = batch.size()
# Move batch to model device
batch = batch.to(model_device)
with torch.inference_mode():
heatmaps = model(batch)
# Ultra-fast keypoint extraction
kp_coords = extract_keypoints_from_heatmap_fast(heatmaps[:,:-1,:,:], scale=2, max_keypoints=1)
# Vectorized batch processing - no loops
batch_results = process_keypoints_vectorized(kp_coords, kp_threshold, 960, 540, current_batch_size)
results.extend(batch_results)
del heatmaps, kp_coords, batch, batch_results, batch_frames
return results
map_keypoints = {
1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23,
11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29,
28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20,
45: 9, 50: 31, 52: 32, 57: 22
}
def get_mapped_keypoints(kp_points):
"""Apply keypoint mapping to detection results"""
mapped_points = {}
for key, value in kp_points.items():
if key in map_keypoints:
mapped_key = map_keypoints[key]
mapped_points[mapped_key] = value
# else:
# Keep unmapped keypoints with original key
# mapped_points[key] = value
return mapped_points
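# Illustrative remap (hypothetical entries): raw model channel 3 maps to
# template keypoint 25, while unmapped channels are dropped:
#   get_mapped_keypoints({3: {'x': 0.1, 'y': 0.2, 'p': 0.9}, 99: {}})
#   # -> {25: {'x': 0.1, 'y': 0.2, 'p': 0.9}}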
def process_batch_input(frames, model, kp_threshold, device='cpu', batch_size=16):
"""Process multiple input images in batch"""
# Batch inference
kp_results = inference_batch(frames, model, kp_threshold, device, batch_size)
kp_results = [get_mapped_keypoints(kp) for kp in kp_results]
return kp_results
def convert_keypoints_to_val_format(keypoints):
    # NOTE: superseded by the vectorized re-definition later in this module.
    return [tuple(int(x) for x in pair) for pair in keypoints]
def normalize_keypoints(keypoints_result, batch_images, n_keypoints):
keypoints = []
if keypoints_result is not None and len(keypoints_result) > 0:
for frame_number_in_batch, kp_dict in enumerate(keypoints_result):
if frame_number_in_batch >= len(batch_images):
break
frame_keypoints: List[Tuple[int, int]] = []
try:
height, width = batch_images[frame_number_in_batch].shape[:2]
if kp_dict is not None and isinstance(kp_dict, dict):
for idx in range(32):
x, y, p = 0, 0, 0
kp_idx = idx + 1
if kp_idx in kp_dict:
try:
kp_data = kp_dict[kp_idx]
if isinstance(kp_data, dict) and "x" in kp_data and "y" in kp_data:
x = int(kp_data["x"] * width)
y = int(kp_data["y"] * height)
except Exception as e:
pass
frame_keypoints.append((x, y))
except (IndexError, ValueError, AttributeError):
frame_keypoints = [(0, 0)] * 32
if len(frame_keypoints) < n_keypoints:
frame_keypoints.extend([(0, 0)] * (n_keypoints - len(frame_keypoints)))
else:
frame_keypoints = frame_keypoints[:n_keypoints]
keypoints.append(frame_keypoints)
return keypoints
def fix_keypoints(frame_keypoints: list[tuple[int, int]], n_keypoints: int) -> list[tuple[int, int]]:
# Pad or trim to exact n_keypoints
if len(frame_keypoints) < n_keypoints:
frame_keypoints += [(0, 0)] * (n_keypoints - len(frame_keypoints))
elif len(frame_keypoints) > n_keypoints:
frame_keypoints = frame_keypoints[:n_keypoints]
if(frame_keypoints[2] != (0, 0) and frame_keypoints[4] != (0, 0) and frame_keypoints[3] == (0, 0)):
frame_keypoints[3] = frame_keypoints[4]
frame_keypoints[4] = (0, 0)
if(frame_keypoints[0] != (0, 0) and frame_keypoints[4] != (0, 0) and frame_keypoints[1] == (0, 0)):
frame_keypoints[1] = frame_keypoints[4]
frame_keypoints[4] = (0, 0)
if(frame_keypoints[2] != (0, 0) and frame_keypoints[3] != (0, 0) and frame_keypoints[1] == (0, 0) and frame_keypoints[3][0] > frame_keypoints[2][0]):
frame_keypoints[1] = frame_keypoints[3]
frame_keypoints[3] = (0, 0)
if(frame_keypoints[28] != (0, 0) and frame_keypoints[25] == (0, 0) and frame_keypoints[26] != (0, 0) and frame_keypoints[26][0] > frame_keypoints[28][0]):
frame_keypoints[25] = frame_keypoints[28]
frame_keypoints[28] = (0, 0)
if(frame_keypoints[24] != (0, 0) and frame_keypoints[28] != (0, 0) and frame_keypoints[25] == (0, 0)):
frame_keypoints[25] = frame_keypoints[28]
frame_keypoints[28] = (0, 0)
if(frame_keypoints[24] != (0, 0) and frame_keypoints[27] != (0, 0) and frame_keypoints[26] == (0, 0)):
frame_keypoints[26] = frame_keypoints[27]
frame_keypoints[27] = (0, 0)
if(frame_keypoints[28] != (0, 0) and frame_keypoints[23] == (0, 0) and frame_keypoints[20] != (0, 0) and frame_keypoints[20][1] > frame_keypoints[23][1]):
frame_keypoints[23] = frame_keypoints[20]
frame_keypoints[20] = (0, 0)
return frame_keypoints
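# Illustrative padding (hypothetical input): a short list is padded with (0, 0)
# to n_keypoints before the positional swap heuristics run:
#   fix_keypoints([(100, 50)], 32)  # -> [(100, 50), (0, 0), ..., (0, 0)] (32 entries)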
def challenge_template(path_hf_repo) -> ndarray:
return imread(f"{path_hf_repo}/football_pitch_template.png")
current_path = str(os.path.dirname(os.path.abspath(__file__)))
template_image = challenge_template(current_path)
template_image_gray = cvtColor(template_image, COLOR_BGR2GRAY)
_sparse_template_cache: dict[tuple[int, int], list[tuple[int, int]]] = {}
_shared_eval_executor: ThreadPoolExecutor | None = None
class MaxSizeCache(OrderedDict):
"""
Fixed-size dictionary behaving like a deque(maxlen=N).
Stores key–value pairs with FIFO eviction.
"""
def __init__(self, maxlen=500):
super().__init__()
self.maxlen = maxlen
self._lock = threading.Lock()
def set(self, key, value):
"""Insert or update an item. Evicts oldest if full."""
with self._lock:
if key in self:
del self[key] # refresh position
super().__setitem__(key, value)
if len(self) > self.maxlen:
self.popitem(last=False) # remove oldest
def get(self, key, default=None):
"""Retrieve an item without changing order."""
with self._lock:
return super().get(key, default)
def exists(self, key):
"""Check if a key exists."""
with self._lock:
return key in self
def load(self, data_dict):
"""
Load initial data into cache.
Oldest items evicted if data exceeds maxlen.
"""
for k, v in data_dict.items():
self.set(k, v)
def __repr__(self):
return f"MaxSizeCache(maxlen={self.maxlen}, data={dict(self)})"
cached = MaxSizeCache()
_per_key_locks = defaultdict(threading.Lock)
def get_or_compute_masks(key, compute_fn):
lock = _per_key_locks[key]
with lock:
if cached.exists(key):
return cached.get(key)
# compute once
masks = compute_fn()
cached.set(key, masks)
return masks
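# Minimal usage sketch (the key and the zero-argument compute function are
# hypothetical): the callable runs at most once per key, even across threads.
#   masks = get_or_compute_masks(("frame", 0), lambda: (None, 0, None))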
INDEX_KEYPOINT_CORNER_BOTTOM_LEFT = 5
INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT = 29
INDEX_KEYPOINT_CORNER_TOP_LEFT = 0
INDEX_KEYPOINT_CORNER_TOP_RIGHT = 24
KEYPOINTS: list[tuple[int, int]] = [
(5, 5), # 1
(5, 140), # 2
(5, 250), # 3
(5, 430), # 4
(5, 540), # 5
(5, 675), # 6
# -------------
(55, 250), # 7
(55, 430), # 8
# -------------
(110, 340), # 9
# -------------
(165, 140), # 10
(165, 270), # 11
(165, 410), # 12
(165, 540), # 13
# -------------
(527, 5), # 14
(527, 253), # 15
(527, 433), # 16
(527, 675), # 17
# -------------
(888, 140), # 18
(888, 270), # 19
(888, 410), # 20
(888, 540), # 21
# -------------
(940, 340), # 22
# -------------
(998, 250), # 23
(998, 430), # 24
# -------------
(1045, 5), # 25
(1045, 140), # 26
(1045, 250), # 27
(1045, 430), # 28
(1045, 540), # 29
(1045, 675), # 30
# -------------
(435, 340), # 31
(615, 340), # 32
]
KEYPOINTS_NP = np.asarray(KEYPOINTS, dtype=np.float32)
FOOTBALL_KEYPOINTS: list[tuple[int, int]] = [
(0, 0), # 1
(0, 0), # 2
(0, 0), # 3
(0, 0), # 4
(0, 0), # 5
(0, 0), # 6
(0, 0), # 7
(0, 0), # 8
(0, 0), # 9
(0, 0), # 10
(0, 0), # 11
(0, 0), # 12
(0, 0), # 13
(0, 0), # 14
(527, 283), # 15
(527, 403), # 16
(0, 0), # 17
(0, 0), # 18
(0, 0), # 19
(0, 0), # 20
(0, 0), # 21
(0, 0), # 22
(0, 0), # 23
(0, 0), # 24
(0, 0), # 25
(0, 0), # 26
(0, 0), # 27
(0, 0), # 28
(0, 0), # 29
(0, 0), # 30
(405, 340), # 31
(645, 340), # 32
]
FOOTBALL_KEYPOINTS_NP = np.asarray(FOOTBALL_KEYPOINTS, dtype=np.float32)
groups = {
1: [2, 3, 7, 10],
2: [1, 3, 7, 10],
3: [2, 4, 7, 8],
4: [3, 5, 8, 7],
5: [4, 8, 6, 3],
6: [5, 4, 8, 13],
7: [3, 8, 9, 10],
8: [4, 7, 9, 13],
9: [7, 8, 11, 12],
10: [9, 11, 7, 2],
11: [9, 10, 12, 31],
12: [9, 11, 13, 31],
13: [9, 12, 8, 5],
14: [15, 31, 32, 16],
15: [31, 16, 32, 14],
16: [31, 15, 32, 17],
17: [31, 16, 32, 15],
18: [19, 22, 23, 26],
19: [18, 22, 20, 32],
20: [19, 22, 21, 32],
21: [20, 22, 24, 29],
22: [23, 24, 19, 20],
23: [27, 24, 22, 28],
24: [28, 23, 22, 27],
25: [26, 27, 23, 18],
26: [25, 27, 23, 18],
27: [26, 23, 28, 24],
28: [27, 24, 29, 23],
29: [28, 30, 24, 21],
30: [29, 28, 24, 21],
31: [15, 16, 32, 14],
32: [15, 31, 16, 14]
}
base_temps = [(0, 0)] * 32
_TEMPLATE_MAX_X: int = 1045
_TEMPLATE_MAX_Y: int = 675
# Precomputed group arrays for faster neighbor lookup (0-based).
GROUPS_ARRAY = [np.asarray(groups[i], dtype=np.int32) - 1 for i in range(1, 33)]
kernel = getStructuringElement(MORPH_RECT, (31, 31))
dilate_kernel = getStructuringElement(
MORPH_RECT, (3, 3)
)
class InvalidMask(Exception):
pass
def has_a_wide_line(mask: ndarray, max_aspect_ratio: float = 1.0) -> bool:
contours, _ = findContours(mask, RETR_EXTERNAL, CHAIN_APPROX_SIMPLE)
for cnt in contours:
x, y, w, h = boundingRect(cnt)
        # Skip degenerate contours with zero width or height
        if w == 0 or h == 0:
            continue
aspect_ratio = min(w, h) / max(w, h)
if aspect_ratio >= max_aspect_ratio:
return True
return False
def is_bowtie(points: ndarray) -> bool:
    def segments_intersect(p1, p2, q1, q2) -> bool:
        def ccw(a, b, c):
            return (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0])
return (ccw(p1, q1, q2) != ccw(p2, q1, q2)) and (
ccw(p1, p2, q1) != ccw(p1, p2, q2)
)
pts = points.reshape(-1, 2)
edges = [(pts[0], pts[1]), (pts[1], pts[2]), (pts[2], pts[3]), (pts[3], pts[0])]
return segments_intersect(*edges[0], *edges[2]) or segments_intersect(
*edges[1], *edges[3]
)
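# Illustrative check (hypothetical quads): a self-intersecting vertex order
# crosses its opposite edges, while a convex order does not:
#   is_bowtie(np.array([(0, 0), (1, 1), (1, 0), (0, 1)]))  # -> True (edges cross)
#   is_bowtie(np.array([(0, 0), (1, 0), (1, 1), (0, 1)]))  # -> False (convex order)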
def validate_mask_lines(mask: ndarray) -> None:
# Use fast count instead of sum when possible
nonzero_count = countNonZero(mask)
if nonzero_count == 0:
raise InvalidMask("No projected lines")
if nonzero_count == mask.size:
raise InvalidMask("Projected lines cover the entire image surface")
# Skip expensive contour check if mask is small
if has_a_wide_line(mask=mask):
raise InvalidMask("A projected line is too wide")
def validate_mask_ground(mask: ndarray) -> None:
num_labels, _ = connectedComponents(mask)
num_distinct_regions = num_labels - 1
if num_distinct_regions > 1:
raise InvalidMask(
f"Projected ground should be a single object, detected {num_distinct_regions}"
)
    area_covered = mask.sum() / mask.size
    if area_covered >= 0.9:
        raise InvalidMask(
            f"Projected ground covers {area_covered:.0%} of the image surface, which is unrealistic"
        )
def validate_projected_corners(
source_keypoints: list[tuple[int, int]], homography_matrix: ndarray
) -> None:
# Vectorized: use fancy indexing to extract corners
corner_indices = np.array([
INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
INDEX_KEYPOINT_CORNER_TOP_RIGHT,
INDEX_KEYPOINT_CORNER_TOP_LEFT
], dtype=np.int32)
# Convert to array once and index
if isinstance(source_keypoints, np.ndarray):
src_corners = source_keypoints[corner_indices]
else:
src_arr = np.array(source_keypoints, dtype=np.float32)
src_corners = src_arr[corner_indices]
src_corners = src_corners[None, :, :]
warped_corners = perspectiveTransform(src_corners, homography_matrix)[0]
if is_bowtie(warped_corners):
raise InvalidMask("Projection twisted!")
def project_image_using_keypoints(
image: ndarray,
source_keypoints: list[tuple[int, int]],
destination_keypoints: list[tuple[int, int]],
destination_width: int,
destination_height: int,
inverse: bool = False,
) -> ndarray:
# Vectorized filtering: convert to arrays and filter with boolean mask
src_arr = np.array(source_keypoints, dtype=np.float32)
dst_arr = np.array(destination_keypoints, dtype=np.float32)
# Vectorized mask: filter out (0, 0) destination points
valid_mask = ~((dst_arr[:, 0] == 0) & (dst_arr[:, 1] == 0))
source_points = src_arr[valid_mask]
destination_points = dst_arr[valid_mask]
H, _ = findHomography(source_points, destination_points)
if H is None:
raise InvalidMask("Homography not found")
validate_projected_corners(source_keypoints=source_keypoints, homography_matrix=H)
projected_image = warpPerspective(image, H, (destination_width, destination_height))
return projected_image
def extract_masks_for_ground_and_lines(image: ndarray) -> tuple[ndarray, ndarray]:
"""assumes template coloured s.t. ground = gray, lines = white, background = black"""
# gray = cvtColor(image, COLOR_BGR2GRAY)
gray = image
_, mask_ground = threshold(gray, 10, 1, THRESH_BINARY)
x, y, w, h = cv2.boundingRect(cv2.findNonZero(mask_ground))
rect_size = w * h
area_size = countNonZero(mask_ground)
is_rect = area_size == rect_size
    if is_rect:
        raise InvalidMask("Projected ground should not be rectangular")
total_pixels = mask_ground.size
ground_nonzero = int(countNonZero(mask_ground))
if ground_nonzero == 0:
raise InvalidMask("No projected ground")
area_covered = ground_nonzero / float(total_pixels)
    if area_covered >= 0.9:
        raise InvalidMask(f"Projected ground covers {area_covered:.0%} of the image surface, which is unrealistic")
validate_mask_ground(mask=mask_ground)
_, mask_lines = threshold(gray, 200, 1, THRESH_BINARY)
validate_mask_lines(mask=mask_lines)
return mask_ground, mask_lines
def get_edge_mask(x, y, W, H, t):
"""Uses bitmasking instead of sets for speed."""
mask = 0
if x <= t: mask |= 1 # Left
if x >= W - t: mask |= 2 # Right
if y <= t: mask |= 4 # Top
if y >= H - t: mask |= 8 # Bottom
return mask
def both_points_same_direction_fast(A, B, W, H, t=100):
mask_a = get_edge_mask(A[0], A[1], W, H, t)
if mask_a == 0: return False
mask_b = get_edge_mask(B[0], B[1], W, H, t)
if mask_b == 0: return False
# Bitwise AND: if any bit matches, they share an edge
return (mask_a & mask_b) != 0
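# Illustrative semantics (hypothetical 1920x1080 frame, threshold t=100):
#   get_edge_mask(5, 5, 1920, 1080, 100)  # -> 5 (left | top bits set)
#   # Both points lie within 100 px of the left edge, so they share a border:
#   both_points_same_direction_fast((5, 500), (50, 600), 1920, 1080)  # -> True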
def canonical(obj):
# numpy arrays -> keep order
if isinstance(obj, np.ndarray):
return canonical(obj.tolist())
# ordered sequences
if isinstance(obj, (list, tuple)):
return tuple(canonical(x) for x in obj)
# unordered sets
if isinstance(obj, set):
return tuple(sorted(canonical(x) for x in obj))
# dictionaries (keys may not be ordered)
if isinstance(obj, dict):
return tuple((k, canonical(v)) for k, v in sorted(obj.items()))
return obj # primitive types
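# Illustrative canonicalisation (hypothetical value): sequence order is kept,
# while sets and dict keys are sorted, so equal structures compare equal:
#   canonical({'b': [1, 2], 'a': {3}})  # -> (('a', (3,)), ('b', (1, 2)))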
def fast_cache_key(frame_keypoints, w, h):
# Byte-based key avoids deep recursion/tuples while preserving order.
# Optimize: check if already array to avoid copy
if isinstance(frame_keypoints, np.ndarray):
if frame_keypoints.dtype == np.int32:
arr = frame_keypoints
else:
arr = frame_keypoints.astype(np.int32)
else:
arr = np.asarray(frame_keypoints, dtype=np.int32)
return (arr.tobytes(), int(w), int(h))
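# Illustrative key (hypothetical values): identical keypoints and frame size
# always yield byte-identical keys, so repeated frames hit the same cache slot:
#   fast_cache_key([(10, 20), (0, 0)], 960, 540)  # -> (<16 raw int32 bytes>, 960, 540)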
blacklists = [
[23, 24, 27, 28],
[7, 8, 3, 4],
[2, 10, 1, 14],
[18, 26, 14, 25],
[5, 13, 6, 17],
[21, 29, 17, 30],
[10, 11, 2, 3],
[10, 11, 2, 7],
[12, 13, 4, 5],
[12, 13, 5, 8],
[18, 19, 26, 27],
[18, 19, 26, 23],
[20, 21, 24, 29],
[20, 21, 28, 29],
[8, 4, 5, 13],
[3, 7, 2, 10],
[23, 27, 18, 26],
[24, 28, 21, 29]
]
prepared_blacklists = [(set(bl), bl[0]-1, bl[1]-1) for bl in blacklists]
def evaluate_keypoints_for_frame(
frame_keypoints: list[tuple[int, int]],
frame_index,
h,
w,
precomputed_key=None,
) -> float:
    key = precomputed_key or canonical((frame_keypoints, w, h))
template_keypoints = KEYPOINTS
floor_markings_template = template_image_gray
# start = time.time()
try:
# h, w = frame.shape[:2]
def compute_masks_for_key(frame_keypoints, w, h):
try:
non_idxs_set = {i + 1 for i, kpt in enumerate(frame_keypoints) if kpt[0] != 0 or kpt[1] != 0}
for bl_set, idx0, idx1 in prepared_blacklists:
if non_idxs_set.issubset(bl_set):
if both_points_same_direction_fast(frame_keypoints[idx0], frame_keypoints[idx1], w, h):
return None, 0, None
warped_template = project_image_using_keypoints(
image=floor_markings_template,
source_keypoints=template_keypoints,
destination_keypoints=frame_keypoints,
destination_width=w,
destination_height=h,
)
mask_ground, mask_lines_expected = extract_masks_for_ground_and_lines(
image=warped_template
)
mask_expected_on_ground = mask_lines_expected
ys, xs = np.where(mask_lines_expected == 1)
if len(xs) == 0:
bbox = None # no foreground pixels
else:
min_x = xs.min()
max_x = xs.max()
min_y = ys.min()
max_y = ys.max()
bbox = (min_x, min_y, max_x, max_y)
bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) if bbox is not None else 1
frame_area = h * w
if (bbox_area / frame_area) < 0.2:
return None, 0, None
pixels_on_lines = int(countNonZero(mask_expected_on_ground))
return mask_expected_on_ground, pixels_on_lines, mask_ground
except Exception as e:
return None, 0, None
mask_expected_on_ground, pixels_on_lines, mask_ground = get_or_compute_masks(
key, lambda: compute_masks_for_key(frame_keypoints, w, h)
)
if mask_expected_on_ground is None or pixels_on_lines == 0 or mask_ground is None:
return 0.0
image_edges = check_frame[frame_index]
h, w = mask_expected_on_ground.shape[:2]
work_buffer = np.zeros((h, w), dtype=np.uint8)
bitwise_and(
image_edges,
image_edges,
dst=work_buffer,
mask=mask_ground
)
dilate(work_buffer, dilate_kernel, dst=work_buffer, iterations=3)
threshold(work_buffer, 0, 255, cv2.THRESH_BINARY, dst=work_buffer)
pixels_predicted_count = countNonZero(work_buffer)
bitwise_and(work_buffer, mask_expected_on_ground, dst=work_buffer)
pixels_overlapping = countNonZero(work_buffer)
pixels_rest = pixels_predicted_count - pixels_overlapping
total_pixels = pixels_predicted_count + pixels_on_lines - pixels_overlapping
if total_pixels > 0 and (pixels_rest / total_pixels) > 0.9:
return 0.0
score = pixels_overlapping / (pixels_on_lines + 1e-8)
return score
except Exception as e:
pass
return 0.0
def _generate_sparse_template_keypoints(frame_width: int, frame_height: int) -> list[tuple[int, int]]:
key = (int(frame_width), int(frame_height))
if key in _sparse_template_cache:
return _sparse_template_cache[key]
    template_max_x, template_max_y = (_TEMPLATE_MAX_X, _TEMPLATE_MAX_Y)
sx = float(frame_width) / float(template_max_x if template_max_x != 0 else 1)
sy = float(frame_height) / float(template_max_y if template_max_y != 0 else 1)
# Vectorized scaling and rounding
scale_factors = np.array([sx, sy], dtype=np.float32)
scaled_np = np.round(FOOTBALL_KEYPOINTS_NP * scale_factors).astype(np.int32)
scaled = [(int(x), int(y)) for x, y in scaled_np]
_sparse_template_cache[key] = scaled
return scaled
def convert_keypoints_to_val_format(keypoints):
# Vectorized: convert to numpy, cast, then back to list of tuples
if not keypoints:
return []
arr = np.asarray(keypoints, dtype=np.int32)
return [(int(x), int(y)) for x, y in arr]
def are_collinear(pts, eps=1e-9):
pts = np.asarray(pts)
if len(pts) < 3:
return True
    # Heuristic: only the first three points are tested for collinearity.
    a, b, c = pts[:3]
area = np.abs(np.cross(b - a, c - a))
return area < eps
def line_to_line_transform(P1, P2, Q1, Q2):
"""
Compute 2D affine transformation mapping line segment P1P2 -> Q1Q2
Optimized version reducing allocations.
Parameters:
P1, P2: source points (x, y)
Q1, Q2: target points (x, y)
Returns:
M: 3x3 homogeneous transformation matrix
"""
P1 = np.asarray(P1, dtype=np.float64)
P2 = np.asarray(P2, dtype=np.float64)
Q1 = np.asarray(Q1, dtype=np.float64)
Q2 = np.asarray(Q2, dtype=np.float64)
# Source and target vectors
v_s = P2 - P1
v_t = Q2 - Q1
# Scale factor (using hypot for better numerical stability)
norm_s = np.hypot(v_s[0], v_s[1])
norm_t = np.hypot(v_t[0], v_t[1])
s = norm_t / norm_s
# Rotation angle
theta = np.arctan2(v_t[1], v_t[0]) - np.arctan2(v_s[1], v_s[0])
# Precompute sin/cos
cos_theta = np.cos(theta)
sin_theta = np.sin(theta)
# 2x2 scaled rotation components
sr00 = s * cos_theta
sr01 = -s * sin_theta
sr10 = s * sin_theta
sr11 = s * cos_theta
# Translation (direct computation avoiding matrix mul)
t0 = Q1[0] - (sr00 * P1[0] + sr01 * P1[1])
t1 = Q1[1] - (sr10 * P1[0] + sr11 * P1[1])
# Homogeneous 3x3 matrix (direct construction)
M = np.array([
[sr00, sr01, t0],
[sr10, sr11, t1],
[0.0, 0.0, 1.0]
], dtype=np.float64)
return M
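# Illustrative similarity (hypothetical segments): mapping (0,0)-(1,0) onto
# (0,0)-(0,2) is a scale-2 transform with a 90-degree rotation:
#   M = line_to_line_transform((0, 0), (1, 0), (0, 0), (0, 2))
#   apply_transform(M, (1, 0))  # -> (0, 2)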
def three_point_affine(P, Q):
P = np.array(P, dtype=np.float64)
Q = np.array(Q, dtype=np.float64)
n = P.shape[0]
# Vectorized construction of least-squares system
x, y = P[:, 0], P[:, 1]
u, v = Q[:, 0], Q[:, 1]
# Pre-allocate A matrix
A = np.zeros((2*n, 6), dtype=np.float64)
A[0::2, 0] = x
A[0::2, 1] = y
A[0::2, 2] = 1
A[1::2, 3] = x
A[1::2, 4] = y
A[1::2, 5] = 1
# Vectorized b vector
b = np.empty(2*n, dtype=np.float64)
b[0::2] = u
b[1::2] = v
# Solve least squares (robust to collinear points)
params, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
a, b_, e, c, d, f = params
# Homogeneous transformation matrix
M = np.array([
[a, b_, e],
[c, d, f],
[0, 0, 1]
], dtype=np.float64)
return M
def affine_from_4_points(src_pts, dst_pts):
"""
Compute a 2D affine transformation from 4 source points to 4 target points using least-squares.
Vectorized version for better performance.
Parameters:
src_pts: list of 4 source points [(x1,y1),..., (x4,y4)]
dst_pts: list of 4 target points [(u1,v1),..., (u4,v4)]
Returns:
3x3 homogeneous affine transformation matrix
"""
P = np.array(src_pts, dtype=np.float64)
Q = np.array(dst_pts, dtype=np.float64)
# Vectorized construction of 8x6 system (2 eqs per point)
x, y = P[:, 0], P[:, 1]
u, v = Q[:, 0], Q[:, 1]
A = np.zeros((8, 6), dtype=np.float64)
A[0::2, 0] = x
A[0::2, 1] = y
A[0::2, 2] = 1
A[1::2, 3] = x
A[1::2, 4] = y
A[1::2, 5] = 1
b = np.empty(8, dtype=np.float64)
b[0::2] = u
b[1::2] = v
# Solve least-squares
params, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
a, b_, e, c, d, f = params
# Construct 3x3 affine matrix
M = np.array([
[a, b_, e],
[c, d, f],
[0, 0, 1]
], dtype=np.float64)
return M
def four_point_homography(src_pts, dst_pts):
"""
Compute 2D homography mapping 4 source points to 4 target points.
Vectorized version for better performance.
src_pts: list of 4 source points [(x1,y1),..., (x4,y4)]
dst_pts: list of 4 target points [(u1,v1),..., (u4,v4)]
Returns:
3x3 homography matrix
"""
# Vectorized construction of A matrix
src = np.array(src_pts, dtype=np.float64)
dst = np.array(dst_pts, dtype=np.float64)
x, y = src[:, 0], src[:, 1]
u, v = dst[:, 0], dst[:, 1]
# Pre-allocate A matrix
A = np.zeros((8, 9), dtype=np.float64)
A[0::2, 0] = -x
A[0::2, 1] = -y
A[0::2, 2] = -1
A[0::2, 6] = x * u
A[0::2, 7] = y * u
A[0::2, 8] = u
A[1::2, 3] = -x
A[1::2, 4] = -y
A[1::2, 5] = -1
A[1::2, 6] = x * v
A[1::2, 7] = y * v
A[1::2, 8] = v
# Solve Ah=0 using SVD
_, _, Vt = np.linalg.svd(A)
h = Vt[-1, :] # last row of V^T
H = h.reshape(3, 3)
# Normalize
H /= H[2, 2]
return H
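# Sanity-check sketch (hypothetical points): mapping the unit square onto
# itself should recover the identity homography up to numerical noise:
#   H = four_point_homography([(0, 0), (1, 0), (1, 1), (0, 1)],
#                             [(0, 0), (1, 0), (1, 1), (0, 1)])
#   np.allclose(H, np.eye(3))  # -> True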
def unique_points(src, dst):
src, dst = np.asarray(src, float), np.asarray(dst, float)
# Vectorized filtering for zero points
src_nonzero = ~np.all(np.abs(src) < 1e-9, axis=1)
dst_nonzero = ~np.all(np.abs(dst) < 1e-9, axis=1)
valid_mask = src_nonzero & dst_nonzero
if not valid_mask.any():
return np.array([]), np.array([])
src_valid = src[valid_mask]
dst_valid = dst[valid_mask]
# Remove duplicates using numpy unique
_, unique_idx = np.unique(src_valid, axis=0, return_index=True)
unique_idx.sort() # preserve order
return src_valid[unique_idx], dst_valid[unique_idx]
def robust_transform(src_pts, dst_pts):
src, dst = unique_points(src_pts, dst_pts)
n = len(src)
    if n >= 4:
        # NOTE: affine_from_4_points / four_point_homography build fixed
        # 8-row systems, so they assume exactly four correspondences.
if are_collinear(src) or are_collinear(dst):
H = affine_from_4_points(src, dst)
return lambda pt: apply_transform(H, pt)
else:
H = four_point_homography(src, dst)
return lambda pt: apply_homo_transform(H, pt)
elif n==3:
H = three_point_affine(src,dst)
elif n==2:
H = line_to_line_transform(src[0],src[1],dst[0],dst[1])
elif n==1:
t = dst[0]-src[0]
H = np.eye(3)
H[:2,2] = t
else:
H = np.eye(3)
return lambda pt: apply_transform(H, pt)
def apply_homo_transform(M, P):
    """
    Transform a single 2D point using a 3x3 homography M.
    Optimized: direct indexing instead of array creation.
    Returns:
        (x', y') : Transformed point as integer coordinates, matching
        apply_transform so the two are interchangeable for callers.
    """
    x, y = P[0], P[1]
    # Projective division by the homogeneous coordinate w
    w = M[2, 0] * x + M[2, 1] * y + M[2, 2]
    x_new = (M[0, 0] * x + M[0, 1] * y + M[0, 2]) / w
    y_new = (M[1, 0] * x + M[1, 1] * y + M[1, 2]) / w
    return (int(x_new), int(y_new))
def apply_transform(M, P):
    """
    Transform a single 2D point using a 3x3 transformation matrix M.
    Optimized version avoiding array creation.
    Args:
        M : 3x3 numpy array
            Transformation matrix (affine, similarity, etc.).
        P : (x, y) array-like
            Single point coordinates to transform.
    Returns:
        (x', y') : Transformed point as integer coordinates.
    """
    # Direct computation without intermediate arrays
    x, y = P[0], P[1]
    x_new = M[0, 0] * x + M[0, 1] * y + M[0, 2]
    y_new = M[1, 0] * x + M[1, 1] * y + M[1, 2]
    return (int(x_new), int(y_new))
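# Illustrative sketch (editor's example): both appliers return the
# transformed point as integer coordinates; for an affine M the projective
# division is a no-op because w == 1.
def _demo_apply_transforms():
    M = np.array([[1.0, 0.0, 4.0],
                  [0.0, 1.0, -2.0],
                  [0.0, 0.0, 1.0]])
    assert apply_transform(M, (10, 10)) == (14, 8)
    assert apply_homo_transform(M, (10, 10)) == (14, 8)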
def pick_pt(points):
# Fully vectorized neighbor expansion preserving original order.
if not points:
return []
pts_arr = np.asarray(points, dtype=np.int32)
seen = np.zeros(32, dtype=bool)
valid_mask = (pts_arr >= 0) & (pts_arr < 32)
seen[pts_arr[valid_mask]] = True
out_seen = np.zeros(32, dtype=bool)
out = []
for p in pts_arr[valid_mask]:
neigh = GROUPS_ARRAY[p]
candidates = neigh[~seen[neigh] & ~out_seen[neigh]]
out_seen[candidates] = True
out.extend(candidates.tolist())
return out
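# Illustrative sketch (editor's example) of the masked neighbor-expansion
# idiom used by pick_pt, with a tiny made-up neighbor table standing in for
# the module-level GROUPS_ARRAY.
def _demo_neighbor_expansion():
    groups = [np.array([1, 2]), np.array([0, 3]),
              np.array([0, 3]), np.array([1, 2])]
    seen = np.zeros(4, dtype=bool)
    seen[[0, 3]] = True  # already-detected points
    out_seen = np.zeros(4, dtype=bool)
    out = []
    for p in (0, 3):
        neigh = groups[p]
        cand = neigh[~seen[neigh] & ~out_seen[neigh]]
        out_seen[cand] = True
        out.extend(cand.tolist())
    assert out == [1, 2]  # each unseen neighbor is emitted exactly once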
def make_possible_keypoints(all_keypoints, frame_width, frame_height, limit=2):
# Early exit for empty input
if not all_keypoints:
return []
results = []
for keypoints in all_keypoints:
        # np.asarray avoids a copy when the input already has the right
        # dtype/shape and converts otherwise.
        arr = np.asarray(keypoints, dtype=np.int32)
# Basic shape validation
if arr.ndim != 2 or arr.shape[1] != 2:
continue
# Fast Masking and Counting
mask = (arr[:, 0] != 0) & (arr[:, 1] != 0)
non_zero_count = mask.sum()
# Logic Flow
if non_zero_count > 4:
results.append(keypoints)
continue
if non_zero_count < 2:
continue
        # With exactly 4 points, keep the original set but still fall through
        # below to try synthesizing a 5th point.
        if non_zero_count == 4:
            results.append(keypoints)
# Prepare Transformation Data
non_zero_idxs = np.flatnonzero(mask)
# Assuming KEYPOINTS_NP is available globally
src = KEYPOINTS_NP[non_zero_idxs]
dest = arr[non_zero_idxs].astype(np.float32)
try:
# transform_func is calculated once
transform_func = robust_transform(src, dest)
except Exception:
continue
# Get candidate indices to check
candidate_idxs = pick_pt(non_zero_idxs.tolist())
if not candidate_idxs:
continue
# Pre-calculate Valid Projections
valid_cache = {}
valid_real_idxs = []
for idx in candidate_idxs:
# Transform point
t_pt = transform_func(KEYPOINTS_NP[idx])
# Unroll checks for speed
tx, ty = t_pt[0], t_pt[1]
# Boundary check
if 0 <= tx < frame_width and 0 <= ty < frame_height:
valid_cache[idx] = (int(tx), int(ty))
valid_real_idxs.append(idx)
# Check if we have enough valid points to satisfy the request
n_missing = 5 - non_zero_count
if len(valid_real_idxs) < n_missing:
continue
# Generate Combinations
cnt = 0
for group in combinations(valid_real_idxs, n_missing):
if cnt >= limit:
break
cnt += 1
# Create the result list
# A shallow copy of the list is much faster than recreating a numpy object array.
new_result = list(keypoints)
# Fill in the missing points from our cache
for idx in group:
new_result[idx] = valid_cache[idx]
results.append(new_result)
return results
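# Illustrative sketch (editor's example) of the candidate-fill logic above:
# with k detected keypoints (2 <= k <= 4), each hypothesis fills in 5 - k
# template projections, and at most `limit` hypotheses are emitted. Uses the
# combinations import from the top of this module; numbers are made up.
def _demo_fill_combinatorics():
    detected, candidates, limit = 3, [7, 8, 9, 10], 2
    n_missing = 5 - detected
    hyps = list(combinations(candidates, n_missing))[:limit]
    assert hyps == [(7, 8), (7, 9)]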
def _get_shared_eval_executor(max_workers: int) -> ThreadPoolExecutor:
global _shared_eval_executor
if _shared_eval_executor is None:
_shared_eval_executor = ThreadPoolExecutor(max_workers=max_workers)
return _shared_eval_executor
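# Note (editor's comment): _get_shared_eval_executor assumes a module-level
# `_shared_eval_executor = None` sentinel defined earlier in this file; the
# pool is created once and reused across calls, avoiding per-batch thread
# start-up cost.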
def evaluates(jobs, h, w, total_frames: int):
if len(jobs) == 0:
return []
unique_jobs = [] # (job, frame_index, key_bytes)
seen = set()
for (job, frame_index) in jobs:
try:
# Optimize: check if already array
if isinstance(job, np.ndarray):
key_bytes = job.astype(np.int32).tobytes() if job.dtype != np.int32 else job.tobytes()
else:
key_bytes = np.asarray(job, dtype=np.int32).tobytes()
sig = (frame_index, key_bytes)
if sig in seen:
continue
seen.add(sig)
unique_jobs.append((job, frame_index, key_bytes))
        except Exception:
            continue
if len(unique_jobs) <= 10:
scores_unique = [
evaluate_keypoints_for_frame(job, frame_index, h, w, precomputed_key=(key_bytes, w, h))
for (job, frame_index, key_bytes) in unique_jobs
]
else:
cpu_count = max(1, (os.cpu_count() or 1))
max_workers = min(max(2, cpu_count), 8)
chunk_size = 500
scores_unique = []
ex = _get_shared_eval_executor(max_workers)
for i in range(0, len(unique_jobs), chunk_size):
chunk = unique_jobs[i:i + chunk_size]
scores_unique.extend(
ex.map(
lambda pair: evaluate_keypoints_for_frame(pair[0], pair[1], h, w, precomputed_key=(pair[2], w, h)),
chunk,
)
)
scores = np.full(total_frames, -1.0, dtype=np.float32)
results = [[(0, 0)] * 32 for _ in range(total_frames)]
for score, (k, frame_index, _) in zip(scores_unique, unique_jobs):
if score > scores[frame_index]:
scores[frame_index] = score
results[frame_index] = k
return results
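# Illustrative sketch (editor's example) of the dedup signature above: a
# keypoint list is reduced to the raw bytes of its int32 array, which is
# hashable, cheap to compare, and identical for equal contents.
def _demo_job_signature():
    a = [(1, 2), (3, 4)]
    b = np.array(a, dtype=np.int32)
    assert np.asarray(a, dtype=np.int32).tobytes() == b.tobytes()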
def fix_keypoints_pri(
results_frames,
frame_width: int,
frame_height: int
) -> list[Any]:
sparse_template = convert_keypoints_to_val_format(_generate_sparse_template_keypoints(frame_width, frame_height))
max_frames = len(results_frames)
limit = 30
before = deque(maxlen=limit)
after = deque(maxlen=limit)
all_possible = [None] * max_frames
    # Precompute only the initial lookahead window; later frames are filled
    # lazily inside the main loop below.
    for i in range(min(limit, max_frames)):
        all_possible[i] = make_possible_keypoints([results_frames[i]], frame_width, frame_height)
for i in range(1, min(limit, max_frames)):
after.append(all_possible[i])
current = all_possible[0] if max_frames > 0 else []
total_jobs = []
for frame_index in range(max_frames):
if frame_index < max_frames - limit:
future_idx = frame_index + limit
if all_possible[future_idx] is None:
all_possible[future_idx] = make_possible_keypoints([results_frames[future_idx]], frame_width, frame_height)
after.append(all_possible[future_idx])
frame_jobs = [(kpts, frame_index) for kpts in current]
for t in after:
frame_jobs.extend([(kpts, frame_index) for kpts in t])
for t in before:
frame_jobs.extend([(kpts, frame_index) for kpts in t])
frame_jobs.append((sparse_template, frame_index))
total_jobs.extend(frame_jobs)
before.append(current)
if len(after) != 0:
current = after.popleft()
start_time = time.time()
results = evaluates(total_jobs, frame_height, frame_width, max_frames)
print(f"Evaluation time: {time.time() - start_time}")
return results
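# Illustrative sketch (editor's example) of the temporal window driven by
# the two bounded deques above: each frame sees up to `limit` past and
# `limit` future frames of candidates. Uses the deque import from the top
# of this module; the frame count and limit below are made up.
def _demo_sliding_window():
    frames, limit = list(range(6)), 2
    before = deque(maxlen=limit)
    after = deque(frames[1:limit], maxlen=limit)
    current = frames[0]
    windows = []
    for i in range(len(frames)):
        if i < len(frames) - limit:
            after.append(frames[i + limit])
        windows.append((list(before), current, list(after)))
        before.append(current)
        if after:
            current = after.popleft()
    assert windows[2] == ([0, 1], 2, [3, 4])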
def normalize_results(frame_results, threshold):
if not frame_results:
return []
results_array = []
for result in frame_results:
arr = np.array(result, dtype=np.float32) # (N, 3)
if arr.size == 0:
results_array.append([])
continue
mask = arr[:, 2] > threshold # (N,)
scaled = arr[:, :2] # (N, 2)
scaled = np.where(mask[:, None], scaled, 0) # Apply mask
results_array.append([(int(x), int(y)) for x, y in scaled])
return results_array
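# Illustrative usage sketch (editor's example): points whose confidence is
# at or below the threshold are zeroed rather than removed, so indices stay
# aligned with the keypoint template.
def _demo_normalize_results():
    raw = [[(10, 20, 0.9), (30, 40, 0.1)]]
    assert normalize_results(raw, threshold=0.5) == [[(10, 20), (0, 0)]]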
def convert_to_gray(image):
    """Grayscale + top-hat + blur + Canny; returns a binary edge map.
    Relies on the module-level `kernel` structuring element."""
    gray = cvtColor(image, COLOR_BGR2GRAY)
    gray = morphologyEx(gray, MORPH_TOPHAT, kernel, dst=gray)
    GaussianBlur(gray, (5, 5), 0, dst=gray)
    image_edges = Canny(gray, 30, 100)
    return image_edges
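# Illustrative sketch (editor's example) of the edge pipeline above on a
# synthetic frame; a local structuring element stands in for the
# module-level `kernel`. All cv2 names are imported at the top of this file.
def _demo_edge_pipeline():
    img = np.zeros((64, 64, 3), dtype=np.uint8)
    img[:, 30:34] = 255  # bright vertical stripe
    gray = cvtColor(img, COLOR_BGR2GRAY)
    k = getStructuringElement(MORPH_RECT, (9, 9))
    tophat = morphologyEx(gray, MORPH_TOPHAT, k)
    blurred = GaussianBlur(tophat, (5, 5), 0)
    edges = Canny(blurred, 30, 100)
    assert countNonZero(edges) > 0  # the stripe borders produce edges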
class Miner:
def __init__(self, path_hf_repo: Path) -> None:
global _OSNET_MODEL, team_classifier_path
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
self.path_hf_repo = path_hf_repo
print("✅ Loading YOLO models...")
self.bbox_model = YOLO(path_hf_repo / "player_detect.pt")
print("✅ Loading Team Classifier...")
self.keypoints_model = load_kp_model(path_hf_repo, device)
self.pitch_batch_size = 4
self.osnet_batch_size = 8
self.kp_threshold = 0.3
team_classifier_path = path_hf_repo / "osnet_model.pth.tar-100"
_OSNET_MODEL = load_osnet(device, team_classifier_path)
print("✅ All models loaded")
def predict_batch(self, batch_images: list[ndarray], offset: int, n_keypoints: int):
start = time.time()
# ---------- YOLO ----------
bboxes = {}
bbox_model_results = self.bbox_model.predict(batch_images, verbose=False)
print(f"Detect objects: {time.time() - start}")
start = time.time()
        track_id = 0
for frame_number_in_batch, detection in enumerate(bbox_model_results):
boxes: list[BoundingBox] = []
for box in detection.boxes.data:
x1, y1, x2, y2, conf, cls_id = box.tolist()
temp_track_id = None
                if cls_id == PLAYER_ID:
track_id += 1
temp_track_id = track_id
boxes.append(
BoundingBox(
x1=int(x1), y1=int(y1),
x2=int(x2), y2=int(y2),
cls_id=int(cls_id),
conf=float(conf),
                        track_id=temp_track_id,
)
)
ball_idxs = [i for i, b in enumerate(boxes) if b.cls_id == BALL_ID]
if len(ball_idxs) > 1:
best_i = max(ball_idxs, key=lambda i: boxes[i].conf)
boxes = [
b for i, b in enumerate(boxes)
if not (b.cls_id == BALL_ID and i != best_i)
]
gk_idxs = [i for i, b in enumerate(boxes) if b.cls_id == GK_ID]
if len(gk_idxs) > 1:
best_gk_i = max(gk_idxs, key=lambda i: boxes[i].conf)
for i in gk_idxs:
if i != best_gk_i:
boxes[i].cls_id = PLAYER_ID
track_id += 1
boxes[i].track_id = track_id
ref_idxs = [i for i, b in enumerate(boxes) if b.cls_id == REF_ID]
if len(ref_idxs) > 3:
# sort referee indices by confidence (descending)
ref_idxs_sorted = sorted(ref_idxs, key=lambda i: boxes[i].conf, reverse=True)
keep = set(ref_idxs_sorted[:3])
for i in ref_idxs:
if i not in keep:
boxes[i].cls_id = PLAYER_ID
track_id += 1
boxes[i].track_id = track_id
bboxes[offset + frame_number_in_batch] = boxes
        classify_teams_batch(
            frames=batch_images,  # list[np.ndarray]
            batch_boxes=bboxes,   # dict[int, list[BoundingBox]] keyed by frame number
            batch_size=self.osnet_batch_size,
            device=self.device,
        )
print(f"finish team classify")
print(f"Object Tracking: {time.time() - start}")
start = time.time()
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
pitch_size = min(self.pitch_batch_size, len(batch_images))
device_str = "cuda" if torch.cuda.is_available() else "cpu"
keypoints_result = process_batch_input(
batch_images,
self.keypoints_model,
self.kp_threshold,
device_str,
batch_size=pitch_size,
)
print(f"Kps detection: {time.time() - start}")
start = time.time()
keypoints = normalize_keypoints(keypoints_result, batch_images, n_keypoints)
for idx, kpts in enumerate(keypoints):
keypoints[idx] = fix_keypoints(kpts, n_keypoints)
h, w = batch_images[0].shape[:2]
keypoints_by_frame = fix_keypoints_pri(keypoints, w, h)
print(f"Fix kps: {time.time() - start}")
results = []
for i in range(len(batch_images)):
frame_number = offset + i
results.append(
TVFrameResult(
frame_id=frame_number,
boxes=bboxes.get(frame_number, []),
keypoints=convert_keypoints_to_val_format(keypoints_by_frame[frame_number - offset])
)
)
return results